Best Python code snippet using robotframework-anywherelibrary
smit_deploy.py
Source:smit_deploy.py  
...187        os.chdir('casetup')188        ca = casetup.deployca.DeployCA()189        if args.noexten:190            args.extensions = [' ']191        ca.init_config(package=args.package[0], CN=get_value(args.CN), C=get_value(args.C), O=get_value(args.O),192                       L=get_value(args.L),193                       ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),194                       SELFSIGN='y', CAPATH=get_value(args.capath),195                       WORKPATH=get_value(args.capath), SK=get_value(args.sk), CSR=get_value(args.csr),196                       ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),197                       CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),198                       SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),199                       MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),200                       OPENSSL_PATH=get_value(args.opensslpath))201        ca.setup(args.all)202    elif args.package[0] == 'p43':203        if args.new:  # Create a new OCSP server.204            os.chdir('casetup')205            ocsp_deploy = casetup.deployca.DeployCA()206            if args.noexten:207                args.extensions = [' ']208            ocsp_deploy.init_config(package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),209                                    O=get_value(args.O), L=get_value(args.L),210                                    ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),211                                    SELFSIGN='n', CAPATH=get_value(args.capath),212                                    WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),213                                    ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),214                                    CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),215                                    SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),216                                    OCSPPORT=get_value(args.ocspport),217                                    MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),218                                    OPENSSL_PATH=get_value(args.opensslpath))219            ocsp_deploy.ocsp_setup()220            os.chdir('../appca')221        else:  # Run existing OCSP server.222            os.chdir('appca')223        app = appca.installca.AppCA()224        app.build()225        ocsp = appca.ocsp.OCSP()226        if not args.config or args.config[0] == '':227            args.config = ['ocspcnf']228        ocsp.init_config(config=args.config[0], OCSPSK=get_value(args.sk), OCSPCERT=get_value(args.certpath),229                         CERTDB=get_value(args.certdb), CACERT=get_value(args.cacert),230                         OCSPPORT=get_value(args.ocspport))231        ocsp.start()232    elif args.package[0] == 'p44':233        os.chdir('security')234        cert = security.certmngr.CertManager()235        # ca = casetup.deployca.DeployCA()236        if args.noexten:237            args.extensions = [' ']238        if not args.config or args.config[0] == '':239            args.config = ['certcnf']240        cert.init_config(config=args.config[0], package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),241                         O=get_value(args.O), L=get_value(args.L),242                         ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),243                         SELFSIGN='n', CAPATH=get_value(args.capath),244                         WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),245                         ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),246                         CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),247                         SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),248                         MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),249                         OPENSSL_PATH=get_value(args.opensslpath))250        cert.create_csr()251    elif args.package[0] == 'p45':252        os.chdir('security')253        cert = security.certmngr.CertManager()254        # ca = casetup.deployca.DeployCA()255        if args.noexten:256            args.extensions = [' ']257        if not args.config or args.config[0] == '':258            args.config = ['certcnf']259        cert.init_config(config=args.config[0], package=args.package[0], CN=get_value(args.CN),260                         C=get_value(args.C), O=get_value(args.O), L=get_value(args.L),261                         ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),262                         SELFSIGN='n', CAPATH=get_value(args.capath),263                         WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),264                         ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),265                         CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),266                         SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),267                         MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),268                         OPENSSL_PATH=get_value(args.opensslpath))269        cert.gen_sig()270    elif args.package[0] == 'p46':271        os.chdir('security')272        cert = security.certmngr.CertManager()273        # ca = casetup.deployca.DeployCA()274        if args.noexten:275            args.extensions = [' ']276        if not args.config or args.config[0] == '':277            args.config = ['certcnf']278        cert.init_config(config=args.config[0], package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),279                         O=get_value(args.O), L=get_value(args.L),280                         ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),281                         SELFSIGN='n', CAPATH=get_value(args.capath),282                         WORKPATH=get_value(args.workpath), SK=get_value(args.sk), CSR=get_value(args.csr),283                         ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),284                         CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),285                         SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),286                         MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),287                         OPENSSL_PATH=get_value(args.opensslpath))288        cert.create_cert()289    elif args.package[0] == 'p5':290        if args.new:  # create new private CA.291            os.chdir('casetup')292            cadeploy = casetup.deployca.DeployCA()293            if args.noexten:294                args.extensions = [' ']295            cadeploy.init_config(package=args.package[0], CN=get_value(args.CN), C=get_value(args.C),296                                 O=get_value(args.O),297                                 L=get_value(args.L),298                                 ST=get_value(args.ST), OU=get_value(args.OU), emailAddress=get_value(args.email),299                                 SELFSIGN='y', CAPATH=get_value(args.capath),300                                 WORKPATH=get_value(args.capath), SK=get_value(args.sk), CSR=get_value(args.csr),301                                 ECCPARAM=get_value(args.eccparam), CERT=get_value(args.certpath),302                                 CERTDB=get_value(args.certdb), CERTS=get_value(args.certs), MSG=get_value(args.msg),303                                 SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp),304                                 OCSPPORT=get_value(args.ocspport),305                                 MCERT=get_value(args.mcert), EXTENSIONS=get_value(args.extensions),306                                 OPENSSL_PATH=get_value(args.opensslpath))307            cadeploy.setup(args.all)308            os.chdir('../appca')309        else:310            os.chdir('appca')311        ca = appca.ca.CA()312        if not args.config or args.config[0] == '':313            args.config = ['appcacnf']314        ca.init_config(config=args.config[0], OCSPSK=get_value(args.sk), OCSPCERT=get_value(args.certpath),315                       CACERT=get_value(args.cacert), OCSPPORT=get_value(args.ocspport),316                       CACHAIN=get_value(args.cachain), OCSP=get_value(args.ocsp), IP=get_value(args.caip),317                       PORT=get_value(args.caport), SIGCERT=get_value(args.sigcert), SIGCHAIN=get_value(args.sigchain),318                       SELFSIGN='n', OPENSSL_PATH=get_value(args.opensslpath))319        ca.start()320    elif args.package[0] == 'p6':321        os.chdir('appserver')322        server = appserver.server.Server()323        if not args.config or args.config[0] == '':324            args.config = ['servercnf']325        server.init_config(config=args.config[0], CN=get_value(args.CN), C=get_value(args.C), O=get_value(args.O),326                           L=get_value(args.L), ST=get_value(args.ST), OU=get_value(args.OU),327                           emailAddress=get_value(args.email), SK=get_value(args.sk), CSR=get_value(args.csr),328                           CERT=get_value(args.certpath), MSG=get_value(args.msg), CACERT=get_value(args.cacert),329                           SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), CAIP=get_value(args.caip),330                           CAPORT=get_value(args.caport), SERVERIP=get_value(args.serverip),331                           SERVERPORT=get_value(args.serverport), TYPE='server', CERT_REQS='CERT_REQUIRED')332        server.start(args.all)333    elif args.package[0] == 'p7':334        os.chdir('appclient')335        client = appclient.client.Client()336        if not args.config or args.config[0] == '':337            args.config = ['clientcnf']338        client.init_config(config=args.config[0], CN=get_value(args.CN), C=get_value(args.C), O=get_value(args.O),339                           L=get_value(args.L), ST=get_value(args.ST), OU=get_value(args.OU),340                           emailAddress=get_value(args.email), SK=get_value(args.sk), CSR=get_value(args.csr),341                           CERT=get_value(args.certpath), MSG=get_value(args.msg), CACERT=get_value(args.cacert),342                           SIG=get_value(args.sig), CACHAIN=get_value(args.cachain), CAIP=get_value(args.caip),343                           CAPORT=get_value(args.caport), SERVERIP=get_value(args.serverip),344                           SERVERPORT=get_value(args.serverport), TYPE='client', CERT_REQS='CERT_REQUIRED')345        client.start(args.all)346    elif args.package[0] == 'p81':347        os.chdir('testbed')348        if args.exp_config is not None:349            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':350                print ('Error: you should spcified two configuration files for the testbed configuration.\n'351                       'E.g.,: -exp_config client_config_file sink_config_file'352                       '\nNote: order is important, you should specify client configuration then sink configuration.')353                return354        else:355            args.exp_config = ['client_expcnf', 'sink_expcnf']356        exp = testbed.setupexp.SetupExp()357        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),358                               CERT=get_value(args.certpath), CSR=get_value(args.csr),359                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),360                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),361                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),362                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),363                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),364                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),365                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')366        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),367                             CERT=get_value(args.certpath), CSR=get_value(args.csr),368                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),369                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),370                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),371                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),372                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),373                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),374                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),375                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),376                             TYPE='server')377        exp.setup()378    elif args.package[0] == 'p82':379        os.chdir('testbed')380        if args.exp_config is not None:381            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':382                print ('Error: you should spcified two configuration files for the testbed configuration. \n'383                       'E.g.,: -exp_config client_config_file sink_config_file'384                       '\nNote: order is important, you should specify client configuration then sink configuration.')385                return386        else:387            args.exp_config = ['client_expcnf', 'sink_expcnf']388        exp = testbed.setupexp.SetupExp()389        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),390                               CERT=get_value(args.certpath), CSR=get_value(args.csr),391                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),392                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),393                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),394                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),395                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),396                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),397                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')398        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),399                             CERT=get_value(args.certpath), CSR=get_value(args.csr),400                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),401                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),402                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),403                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),404                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),405                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),406                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),407                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),408                             TYPE='server')409        exp.start_sink(dtls=args.dtls)410    elif args.package[0] == 'p83':411        os.chdir('testbed')412        if args.exp_config is not None:413            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':414                print ('Error: you should spcified two configuration files for the testbed configuration. \n'415                       'E.g.,: -exp_config client_config_file sink_config_file'416                       '\nNote: order is important, you should specify client configuration then sink configuration.')417                return418        else:419            args.exp_config = ['client_expcnf', 'sink_expcnf']420        exp = testbed.setupexp.SetupExp()421        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),422                               CERT=get_value(args.certpath), CSR=get_value(args.csr),423                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),424                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),425                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),426                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),427                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),428                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),429                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')430        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),431                             CERT=get_value(args.certpath), CSR=get_value(args.csr),432                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),433                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),434                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),435                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),436                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),437                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),438                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),439                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),440                             TYPE='server')441        exp.start_router()442    elif args.package[0] == 'p84':  # start client443        os.chdir('testbed')444        if args.exp_config is not None:445            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':446                print ('Error: you should spcified two configuration files for the testbed configuration.\n'447                       'E.g.,: -exp_config client_config_file sink_config_file'448                       '\nNote: order is important, you should specify client configuration then sink configuration.')449                return450        else:451            args.exp_config = ['client_expcnf', 'sink_expcnf']452        if args.inet is None:453            args.inet = ['6']454        elif args.inet[0] != '4' and args.inet[0] != '6':455            print ("Error: invalid parameter of \'inet\'. The value should be eigher 4 or 6")456            return457        exp = testbed.setupexp.SetupExp()458        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),459                               CERT=get_value(args.certpath), CSR=get_value(args.csr),460                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),461                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),462                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),463                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),464                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),465                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),466                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')467        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),468                             CERT=get_value(args.certpath), CSR=get_value(args.csr),469                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),470                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),471                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),472                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),473                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),474                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),475                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),476                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),477                             TYPE='server')478        if args.update_client:479            exp.upload_to_clients(args.inet[0])480        if args.nostart is None or args.nostart != True:481            exp.start_clients()482    elif args.package[0] == 'p85':  # collect data483        os.chdir('testbed')484        if args.exp_config is not None:485            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':486                print ('Error: you should spcified two configuration files for the testbed configuration. \n'487                       'E.g.,: -exp_config client_config_file sink_config_file'488                       '\nNote: order is important, you should specify client configuration then sink configuration.')489                return490        else:491            args.exp_config = ['client_expcnf', 'sink_expcnf']492        if args.inet is None:493            args.inet = ['6']494        elif args.inet[0] != '4' and args.inet[0] != '6':495            print ("Error: invalid parameter of \'inet\'. The value should be eigher 4 or 6")496            return497        exp = testbed.setupexp.SetupExp()498        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),499                               CERT=get_value(args.certpath), CSR=get_value(args.csr),500                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),501                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),502                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),503                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),504                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),505                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),506                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')507        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),508                             CERT=get_value(args.certpath), CSR=get_value(args.csr),509                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),510                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),511                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),512                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),513                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),514                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),515                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),516                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),517                             TYPE='server')518        exp.collect_data(args.inet[0])519    elif args.package[0] == 'p86':  # analyze data520        os.chdir('testbed')521        if args.exp_config is not None:522            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':523                print ('Error: you should spcified two configuration files for the testbed configuration. \n'524                       'E.g.,: -exp_config client_config_file sink_config_file'525                       '\nNote: order is important, you should specify client configuration then sink configuration.')526                return527        else:528            args.exp_config = ['client_expcnf', 'sink_expcnf']529        exp = testbed.setupexp.SetupExp()530        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),531                               CERT=get_value(args.certpath), CSR=get_value(args.csr),532                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),533                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),534                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),535                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),536                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),537                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),538                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')539        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),540                             CERT=get_value(args.certpath), CSR=get_value(args.csr),541                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),542                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),543                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),544                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),545                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),546                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),547                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),548                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),549                             TYPE='server')550        exp.analyze()551    elif args.package[0] == 'p87':  # stop experiment552        os.chdir('testbed')553        if args.exp_config is not None:554            if len(args.exp_config) != 2 or args.exp_config[0] == '' or args.exp_config[1] == '':555                print ('Error: you should spcified two configuration files for the testbed configuration. \n'556                       'E.g.,: -exp_config client_config_file sink_config_file'557                       '\nNote: order is important, you should specify client configuration then sink configuration.')558                return559        else:560            args.exp_config = ['client_expcnf', 'sink_expcnf']561        if args.inet is not None:562            args.inet = ['6']563        elif args.inet[0] != '4' and args.inet[0] != '6':564            print ("Error: invalid parameter of \'inet\'. The value should be eigher 4 or 6")565            return566        exp = testbed.setupexp.SetupExp()567        exp.init_client_config(exp_config=args.exp_config[0], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),568                               CERT=get_value(args.certpath), CSR=get_value(args.csr),569                               CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),570                               SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),571                               CERT_REQS=get_value(args.certreq), TIMEZONE=get_value(args.timezone),572                               SYNCTIME=get_value(args.synctime), REFLOWPAN=get_value(args.rflowpan),573                               SYSWAIT=get_value(args.syswait), PAYLOADLEN=get_value(args.payloadlen),574                               DATE=get_value(args.date), SENDTIME=get_value(args.sendtime),575                               SENDRATE=get_value(args.sendrate), DEVNUM=get_value(args.devnum), TYPE='client')576        exp.init_sink_config(exp_config=args.exp_config[1], CAIP=get_value(args.caip), CAPORT=get_value(args.caport),577                             CERT=get_value(args.certpath), CSR=get_value(args.csr),578                             CACERT=get_value(args.cacert), SK=get_value(args.sk), SERVERIP=get_value(args.serverip),579                             SERVERPORT=get_value(args.serverport), CACHAIN=get_value(args.cacert),580                             CERT_REQS=get_value(args.certreq), ROUTER_IP6=get_value(args.router_ip6),581                             CLIENT_IP6=get_value(args.client_ip6), CLIENT_IP4=get_value(args.client_ip4),582                             DATE=get_value(args.date), CLIENT_WKD=get_value(args.client_workdir),583                             ROUTER_WKD=get_value(args.router_workdir), SINK2CLIENT=get_value(args.sink2client),584                             PASSWORD=get_value(args.passwd), USER=get_value(args.user),585                             SINK_INTERFACE=get_value(args.sink_interface), CLIENT_SCRIPT=get_value(args.client_script),586                             TYPE='server')587        exp.stop(args.inet[0])588    else:589        print('Invalid argument.')590def get_value(inlist):591    if inlist:592        return inlist[0]593    else:594        return None595if __name__ == '__main__':...SATEncodingTest.py
Source:SATEncodingTest.py  
...18        m = Model(v1 == 3)19        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)20        s.solve()21        self.assertTrue(s.is_sat())22        self.assertEqual(v1.get_value(), 3)23    def testEq2(self):24        v1 = Variable(1, 5)25        v2 = Variable(3, 8)26        m = Model(v1 == v2)27        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)28        s.solve()29        self.assertTrue(s.is_sat())30        self.assertEqual(v1.get_value(), v2.get_value())31    def testEq3(self):32        v1 = Variable(1, 4)33        v2 = Variable(4, 6)34        m = Model(v1 == v2)35        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)36        s.solve()37        self.assertTrue(s.is_sat())38        self.assertEqual(v1.get_value(), v2.get_value())39    def testEqGap(self):40        v1 = Variable([1, 3, 5, 7])41        v2 = Variable(4, 8)42        m = Model(v1 == v2)43        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)44        s.solve()45        self.assertTrue(s.is_sat())46        self.assertEqual(v1.get_value(), v2.get_value())47    def testSmallDomainEq(self):48        v1 = Variable(2)49        m = Model(v1 == 1)50        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)51        s.solve()52        self.assertTrue(s.is_sat())53        self.assertEqual(v1.get_value(), 1)54    def testSmallDomainLt(self):55        v1 = Variable(2)56        m = Model(v1 < 1)57        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)58        s.solve()59        self.assertTrue(s.is_sat())60        self.assertLess(v1.get_value(), 1)61    def testSmallDomainLe(self):62        v1 = Variable(2)63        m = Model(v1 <= 1)64        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)65        s.solve()66        self.assertTrue(s.is_sat())67        self.assertLessEqual(v1.get_value(), 1)68    def testSmallDomainGt(self):69        v1 = Variable(2)70        m = Model(v1 > 0)71        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)72        s.solve()73        self.assertTrue(s.is_sat())74        self.assertEqual(v1.get_value(), 1)75    def testSmallDomainGe(self):76        v1 = Variable(2)77        m = Model(v1 >= 1)78        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)79        s.solve()80        self.assertTrue(s.is_sat())81        self.assertEqual(v1.get_value(), 1)82    # def testConstantDomain(self):83    #     v1 = Variable(1, 4)84    #     v2 = Variable([2])85    #     m = Model(v1 == v2)86    #     s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)87    #     s.solve()88    #     self.assertTrue(s.is_sat())89    #     self.assertEqual(v1.get_value(), v2.get_value())90    def testOffsetDomain(self):91        v1 = Variable(1, 4)92        v2 = Variable(1, 5)93        m = Model(v1 + 3 == v2)94        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)95        s.solve()96        self.assertTrue(s.is_sat())97        self.assertEqual(v1.get_value() + 3, v2.get_value())98    def testNe1(self):99        v1 = Variable(1, 2)100        m = Model(v1 != 1)101        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)102        s.solve()103        self.assertTrue(s.is_sat())104        self.assertNotEqual(v1.get_value(), 1)105    def testNe2(self):106        v1 = Variable(1, 2)107        v2 = Variable(1, 2)108        m = Model(v1 != v2)109        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)110        s.solve()111        self.assertTrue(s.is_sat())112        self.assertNotEqual(v1.get_value(), v2.get_value())113    def testGt(self):114        v1 = Variable(1, 3)115        v2 = Variable(1, 3)116        m = Model(v1 > v2)117        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)118        s.solve()119        self.assertTrue(s.is_sat())120        self.assertGreater(v1.get_value(), v2.get_value())121    def testGe(self):122        v1 = Variable(1, 3)123        v2 = Variable(1, 3)124        m = Model(v1 >= v2)125        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)126        s.solve()127        self.assertTrue(s.is_sat())128        self.assertGreaterEqual(v1.get_value(), v2.get_value())129    def testLt(self):130        v1 = Variable(1, 3)131        v2 = Variable(1, 3)132        m = Model(v1 < v2)133        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)134        s.solve()135        self.assertTrue(s.is_sat())136        self.assertLess(v1.get_value(), v2.get_value())137    def testLe(self):138        v1 = Variable(1, 3)139        v2 = Variable(1, 3)140        m = Model(v1 <= v2)141        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)142        s.solve()143        self.assertTrue(s.is_sat())144        self.assertLessEqual(v1.get_value(), v2.get_value())145    def testLtGap(self):146        v1 = Variable([1, 3, 5])147        v2 = Variable([3])148        m = Model(v1 < v2)149        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)150        s.solve()151        self.assertTrue(s.is_sat())152        self.assertLess(v1.get_value(), v2.get_value())153    def testAddEqConstant(self):154        v1 = Variable(5)155        v2 = Variable(5)156        m = Model(v1 + v2 == 6)157        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)158        s.solve()159        self.assertTrue(s.is_sat())160        self.assertEqual(v1.get_value() + v2.get_value(), 6)161    def testAddEq(self):162        v1 = Variable(5)163        v2 = Variable(5)164        v3 = Variable(5)165        m = Model(v1 + v2 == v3)166        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)167        s.solve()168        self.assertTrue(s.is_sat())169        self.assertEqual(v1.get_value() + v2.get_value(), v3.get_value())170    def testAddGapsSat(self):171        v1 = Variable([0, 4, 8])172        v2 = Variable()173        v3 = Variable(10)174        m = Model(v1 + v2 == v3, v3 == 5)175        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)176        s.solve()177        self.assertTrue(s.is_sat())178        self.assertEqual(v1.get_value() + v2.get_value(), v3.get_value())179        self.assertEqual(v3.get_value(), 5)180    def testAddGapsUnsat(self):181        v1 = Variable([0, 4, 8])182        v2 = Variable()183        v3 = Variable(10)184        m = Model(v1 + v2 == v3, v3 == 3)185        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)186        s.solve()187        self.assertTrue(s.is_unsat())188    def testSubEq(self):189        v1 = Variable(5)190        v2 = Variable(5)191        v3 = Variable(5)192        m = Model(v1 - v2 == v3)193        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)194        s.solve()195        self.assertTrue(s.is_sat())196        self.assertEqual(v1.get_value() - v2.get_value(), v3.get_value())197    def testAddMultiple(self):198        v1 = Variable(5)199        v2 = Variable(5)200        v3 = Variable(5)201        v4 = Variable(5)202        v5 = Variable(5)203        m = Model(v1 + v2 + v3 + v4 == v5)204        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)205        s.solve()206        self.assertTrue(s.is_sat())207        self.assertEqual(v1.get_value() + v2.get_value() + v3.get_value() + v4.get_value(), v5.get_value())208    def testSumEq(self):209        v1 = Variable(5)210        v2 = Variable(5)211        v3 = Variable(5)212        m = Model(Sum([v1, v2]) == v3)213        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)214        s.solve()215        self.assertTrue(s.is_sat())216        self.assertEqual(v1.get_value() + v2.get_value(), v3.get_value())217    def testSumArray(self):218        vs = VarArray(4, 1, 5)219        v_sum = Variable(5)220        m = Model(Sum(vs) == v_sum)221        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)222        s.solve()223        self.assertTrue(s.is_sat())224        self.assertEqual(sum(v.get_value() for v in vs), v_sum.get_value())225    def testSumSingleCoef(self):226        v1 = Variable(5)227        m = Model(Sum([v1], [-2]) == -4)228        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)229        s.solve()230        self.assertTrue(s.is_sat())231        self.assertEqual(v1.get_value() * -2, -4)232    def testSumCoefEq(self):233        v1 = Variable(5)234        v2 = Variable(5)235        v3 = Variable(5)236        m = Model(Sum([v1, v2], [2, 2]) == v3)237        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)238        s.solve()239        self.assertTrue(s.is_sat())240        self.assertEqual(v1.get_value() * 2 + v2.get_value() * 2, v3.get_value())241    def testSumCoefNeg(self):242        v1 = Variable(5)243        v2 = Variable(5)244        m = Model(Sum([v1, v2], [-1, -1]) == -3)245        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)246        s.solve()247        self.assertTrue(s.is_sat())248        self.assertEqual(v1.get_value() * -1 + v2.get_value() * -1, -3)249    def testMulConstant(self):250        v1 = Variable(5)251        v2 = Variable(5)252        m = Model(v1 * 3 == v2)253        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)254        s.solve()255        self.assertTrue(s.is_sat())256        self.assertEqual(v1.get_value() * 3, v2.get_value())257    def testMulVars(self):258        v1 = Variable(5)259        v2 = Variable(5)260        m = Model(v1 * v2 == 3)261        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)262        s.solve()263        self.assertTrue(s.is_sat())264        self.assertEqual(v1.get_value() * v2.get_value(), 3)265    def testMulPredicate(self):266        v1 = Variable(5)267        v2 = Variable(5)268        v3 = Variable(5)269        m = Model(v1 * v2 == v3)270        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)271        s.solve()272        self.assertTrue(s.is_sat())273        self.assertEqual(v1.get_value() * v2.get_value(), v3.get_value())274    def testMulGapsSat(self):275        v1 = Variable([0, 4, 8])276        v2 = Variable()277        v3 = Variable(10)278        m = Model(v1 * v2 == v3, v3 == 4)279        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)280        s.solve()281        self.assertTrue(s.is_sat())282        self.assertEqual(v1.get_value() * v2.get_value(), v3.get_value())283        self.assertEqual(v3.get_value(), 4)284    def testMulGapsUnsat(self):285        v1 = Variable([0, 4, 8])286        v2 = Variable()287        v3 = Variable(10)288        m = Model(v1 * v2 == v3, v3 == 3)289        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)290        s.solve()291        self.assertTrue(s.is_unsat())292    # def testDivConstant(self):293    #     v1 = Variable(5)294    #     v2 = Variable(5)295    #     m = Model(v1 / 3 == v2)296    #     s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)297    #     s.solve()298    #     self.assertTrue(s.is_sat())299    #     self.assertEqual(v1.get_value() / 3, v2.get_value())300    def testAllDiff(self):301        vs = VarArray(5, 1, 5)302        m = Model(AllDiff(vs))303        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)304        s.solve()305        values = [v.get_value() for v in vs]306        self.assertItemsEqual(range(1, 6), values)307    def testMaximise(self):308        v1 = Variable(5)309        v2 = Variable(5)310        m = Model(v1 < v2, Maximise(v1))311        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)312        s.solve()313        self.assertTrue(s.is_sat())314        self.assertEqual(v1.get_value(), 3)315        self.assertEqual(v2.get_value(), 4)316    def testMinimise(self):317        v1 = Variable(5)318        v2 = Variable(5)319        m = Model(v1 < v2, Minimise(v2))320        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)321        s.solve()322        self.assertTrue(s.is_sat())323        self.assertEqual(v1.get_value(), 0)324        self.assertEqual(v2.get_value(), 1)325    def testFormula(self):326        a, b, c = VarArray(3)327        m = Model(((a == True) | (b == False)),328                  ((b == True) | (a == False)),329                  (c == True))330        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)331        s.solve()332        self.assertTrue(s.is_sat())333        self.assertTrue(c.get_value())334        self.assertEqual(a.get_value(), b.get_value())335    def testFormula2(self):336        a, b, c = VarArray(3)337        m = Model(((a == True) | (b == False)) &338                  ((b == True) | (a == False)) &339                  (c == True))340        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)341        s.solve()342        self.assertTrue(s.is_sat())343        self.assertTrue(c.get_value())344        self.assertEqual(a.get_value(), b.get_value())345    # FIXME Encoding of Table constraint is not supported yet.346    # def testTable(self):347    #     v1, v2 = VarArray(2, 1, 3)348    #     t = Table([v1, v2], [[1, 1], [2, 2], [3, 3]])349    #     m = Model(t)350    #     s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)351    #     s.solve()352    #     self.assertTrue(s.is_sat())353    #     self.assertNotEqual(v1.get_value(), v2.get_value())354    def testMax(self):355        v1 = Variable(5)356        v2 = Variable(5)357        m = Model(Max([v1, v2]) < 2)358        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)359        s.solve()360        self.assertTrue(s.is_sat())361        self.assertLess(v1.get_value(), 2)362        self.assertLess(v2.get_value(), 2)363    def testMaxNegVars(self):364        v1 = Variable(-5, -1)365        v2 = Variable(-5, -1)366        m = Model(Max([v1, v2]) < -2)367        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)368        s.solve()369        self.assertTrue(s.is_sat())370        self.assertLess(v1.get_value(), -2)371        self.assertLess(v2.get_value(), -2)372    # def testLargeDomain(self):373    #     v1 = Variable(1000)374    #     v2 = Variable(1000)375    #     m = Model(v1 == v2)376    #     s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)377    #     s.solve()378    #     self.assertTrue(s.is_sat())379    # def testAllDiffLargeDomain(self):380    #     vs = VarArray(100, 1, 1000)381    #     m = Model(AllDiff(vs))382    #     s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)383    #     s.solve()384    #     self.assertTrue(s.is_sat())385    # ---------------- Absolute ----------------386    def testAbsEq(self):387        v1 = Variable(-4, -1)388        m = Model(Abs(v1) == 2)389        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)390        s.solve()391        self.assertTrue(s.is_sat())392        self.assertEqual(v1.get_value(), -2)393    def testAbsGt(self):394        v1 = Variable(-4, -1)395        m = Model(Abs(v1) > 2)396        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)397        s.solve()398        self.assertTrue(s.is_sat())399        self.assertLess(v1.get_value(), -2)400    def testAbsLt(self):401        v1 = Variable(-4, -1)402        m = Model(Abs(v1) < 2)403        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)404        s.solve()405        self.assertTrue(s.is_sat())406        self.assertGreater(v1.get_value(), -2)407    # ---------------- Reification of inequalities ----------------408    # Note that some of these tests require that the order encoding of the domain409    # is enabled. For now, this will just take a copy and set the bit to true,410    # but should find a better solution for parameterized test-cases in future.411    def testReifEqTrue(self):412        v1 = Variable(5)413        v2 = Variable()414        m = Model(v2 == (v1 == 3), v2 == 1)415        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)416        s.solve()417        self.assertTrue(s.is_sat())418        self.assertEqual(v1.get_value(), 3)419        self.assertEqual(v2.get_value(), 1)420    def testReifEqFalse(self):421        v1 = Variable(5)422        v2 = Variable()423        m = Model(v2 == (v1 == 3), v2 == 0)424        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)425        s.solve()426        self.assertTrue(s.is_sat())427        self.assertNotEqual(v1.get_value(), 3)428        self.assertEqual(v2.get_value(), 0)429    def testReifNeTrue(self):430        v1 = Variable(5)431        v2 = Variable()432        m = Model(v2 == (v1 != 3), v2 == 1)433        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)434        s.solve()435        self.assertTrue(s.is_sat())436        self.assertNotEqual(v1.get_value(), 3)437        self.assertEqual(v2.get_value(), 1)438    def testReifNeFalse(self):439        v1 = Variable(5)440        v2 = Variable()441        m = Model(v2 == (v1 != 3), v2 == 0)442        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)443        s.solve()444        self.assertTrue(s.is_sat())445        self.assertEqual(v1.get_value(), 3)446        self.assertEqual(v2.get_value(), 0)447    def testReifLeTrue(self):448        v1 = Variable(5)449        v2 = Variable()450        m = Model(v2 == (v1 <= 3), v2 == 1)451        e = copy(SATEncodingTest.encoding)452        e.order = True453        s = SATEncodingTest.solver(m, encoding=e)454        s.solve()455        self.assertTrue(s.is_sat())456        self.assertLessEqual(v1.get_value(), 3)457        self.assertEqual(v2.get_value(), 1)458    def testReifLeFalse(self):459        v1 = Variable(5)460        v2 = Variable()461        m = Model(v2 == (v1 <= 3), v2 == 0)462        e = copy(SATEncodingTest.encoding)463        e.order = True464        s = SATEncodingTest.solver(m, encoding=e)465        s.solve()466        self.assertTrue(s.is_sat())467        self.assertGreater(v1.get_value(), 3)468        self.assertEqual(v2.get_value(), 0)469    def testReifLtTrue(self):470        v1 = Variable(5)471        v2 = Variable()472        m = Model(v2 == (v1 < 3), v2 == 1)473        e = copy(SATEncodingTest.encoding)474        e.order = True475        s = SATEncodingTest.solver(m, encoding=e)476        s.solve()477        self.assertTrue(s.is_sat())478        self.assertLess(v1.get_value(), 3)479        self.assertEqual(v2.get_value(), 1)480    def testReifLtFalse(self):481        v1 = Variable(5)482        v2 = Variable()483        m = Model(v2 == (v1 < 3), v2 == 0)484        e = copy(SATEncodingTest.encoding)485        e.order = True486        s = SATEncodingTest.solver(m, encoding=e)487        s.solve()488        self.assertTrue(s.is_sat())489        self.assertGreaterEqual(v1.get_value(), 3)490        self.assertEqual(v2.get_value(), 0)491    def testReifGeTrue(self):492        v1 = Variable(5)493        v2 = Variable()494        m = Model(v2 == (v1 >= 3), v2 == 1)495        e = copy(SATEncodingTest.encoding)496        e.order = True497        s = SATEncodingTest.solver(m, encoding=e)498        s.solve()499        self.assertTrue(s.is_sat())500        self.assertGreaterEqual(v1.get_value(), 3)501        self.assertEqual(v2.get_value(), 1)502    def testReifGeFalse(self):503        v1 = Variable(5)504        v2 = Variable()505        m = Model(v2 == (v1 >= 3), v2 == 0)506        e = copy(SATEncodingTest.encoding)507        e.order = True508        s = SATEncodingTest.solver(m, encoding=e)509        s.solve()510        self.assertTrue(s.is_sat())511        self.assertLess(v1.get_value(), 3)512        self.assertEqual(v2.get_value(), 0)513    def testReifGtTrue(self):514        v1 = Variable(5)515        v2 = Variable()516        m = Model(v2 == (v1 > 3), v2 == 1)517        e = copy(SATEncodingTest.encoding)518        e.order = True519        s = SATEncodingTest.solver(m, encoding=e)520        s.solve()521        self.assertTrue(s.is_sat())522        self.assertGreater(v1.get_value(), 3)523        self.assertEqual(v2.get_value(), 1)524    def testReifGtFalse(self):525        v1 = Variable(5)526        v2 = Variable()527        m = Model(v2 == (v1 > 3), v2 == 0)528        e = copy(SATEncodingTest.encoding)529        e.order = True530        s = SATEncodingTest.solver(m, encoding=e)531        s.solve()532        self.assertTrue(s.is_sat())533        self.assertLessEqual(v1.get_value(), 3)534        self.assertEqual(v2.get_value(), 0)535    # ---------------- Reification of inequalities between two expressions ----------------536    def testReif2ExprEqTrue(self):537        v1 = Variable(5)538        v2 = Variable()539        v3 = Variable(5)540        m = Model(v2 == (v1 == v3 + 2), v2 == 1)541        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)542        s.solve()543        self.assertTrue(s.is_sat())544        self.assertEqual(v1.get_value(), v3.get_value() + 2)545        self.assertEqual(v2.get_value(), 1)546    def testReif2ExprEqFalse(self):547        v1 = Variable(5)548        v2 = Variable()549        v3 = Variable(5)550        m = Model(v2 == (v1 == v3 + 2), v2 == 0)551        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)552        s.solve()553        self.assertTrue(s.is_sat())554        self.assertNotEqual(v1.get_value(), v3.get_value() + 2)555        self.assertEqual(v2.get_value(), 0)556    def testReif2ExprNeTrue(self):557        v1 = Variable(5)558        v2 = Variable()559        v3 = Variable(5)560        m = Model(v2 == (v1 != v3 + 2), v2 == 1)561        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)562        s.solve()563        self.assertTrue(s.is_sat())564        self.assertNotEqual(v1.get_value(), v3.get_value() + 2)565        self.assertEqual(v2.get_value(), 1)566    def testReif2ExprNeFalse(self):567        v1 = Variable(5)568        v2 = Variable()569        v3 = Variable(5)570        m = Model(v2 == (v1 != v3 + 2), v2 == 0)571        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)572        s.solve()573        self.assertTrue(s.is_sat())574        self.assertEqual(v1.get_value(), v3.get_value() + 2)575        self.assertEqual(v2.get_value(), 0)576    def testReif2ExprLeTrue(self):577        v1 = Variable(5)578        v2 = Variable()579        v3 = Variable(5)580        m = Model(v2 == (v1 <= v3 + 2), v2 == 1)581        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)582        s.solve()583        self.assertTrue(s.is_sat())584        self.assertLessEqual(v1.get_value(), v3.get_value() + 2)585        self.assertEqual(v2.get_value(), 1)586    def testReif2ExprLeFalse(self):587        v1 = Variable(5)588        v2 = Variable()589        v3 = Variable(5)590        m = Model(v2 == (v1 <= v3 + 2), v2 == 0)591        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)592        s.solve()593        self.assertTrue(s.is_sat())594        self.assertGreater(v1.get_value(), v3.get_value() + 2)595        self.assertEqual(v2.get_value(), 0)596    def testReif2ExprLtTrue(self):597        v1 = Variable(5)598        v2 = Variable()599        v3 = Variable(5)600        m = Model(v2 == (v1 < v3 + 2), v2 == 1)601        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)602        s.solve()603        self.assertTrue(s.is_sat())604        self.assertLess(v1.get_value(), v3.get_value() + 2)605        self.assertEqual(v2.get_value(), 1)606    def testReif2ExprLtFalse(self):607        v1 = Variable(5)608        v2 = Variable()609        v3 = Variable(5)610        m = Model(v2 == (v1 < v3 + 2), v2 == 0)611        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)612        s.solve()613        self.assertTrue(s.is_sat())614        self.assertGreaterEqual(v1.get_value(), v3.get_value() + 2)615        self.assertEqual(v2.get_value(), 0)616    def testReif2ExprGeTrue(self):617        v1 = Variable(5)618        v2 = Variable()619        v3 = Variable(5)620        m = Model(v2 == (v1 >= v3 + 2), v2 == 1)621        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)622        s.solve()623        self.assertTrue(s.is_sat())624        self.assertGreaterEqual(v1.get_value(), v3.get_value() + 2)625        self.assertEqual(v2.get_value(), 1)626    def testReif2ExprGeFalse(self):627        v1 = Variable(5)628        v2 = Variable()629        v3 = Variable(5)630        m = Model(v2 == (v1 >= v3 + 2), v2 == 0)631        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)632        s.solve()633        self.assertTrue(s.is_sat())634        self.assertLess(v1.get_value(), v3.get_value() + 2)635        self.assertEqual(v2.get_value(), 0)636    def testReif2ExprGtTrue(self):637        v1 = Variable(5)638        v2 = Variable()639        v3 = Variable(5)640        m = Model(v2 == (v1 > v3 + 2), v2 == 1)641        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)642        s.solve()643        self.assertTrue(s.is_sat())644        self.assertGreater(v1.get_value(), v3.get_value() + 2)645        self.assertEqual(v2.get_value(), 1)646    def testReif2ExprGtFalse(self):647        v1 = Variable(5)648        v2 = Variable()649        v3 = Variable(5)650        m = Model(v2 == (v1 > v3 + 2), v2 == 0)651        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)652        s.solve()653        self.assertTrue(s.is_sat())654        self.assertLessEqual(v1.get_value(), v3.get_value() + 2)655        self.assertEqual(v2.get_value(), 0)656    def testReif2ExprMul(self):657        v1 = Variable(5)658        v2 = Variable()659        v3 = Variable(1, 5)660        v4 = Variable(1, 5)661        m = Model(v2 == (v1 == v3 * v4), v2 == 1)662        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)663        s.solve()664        self.assertTrue(s.is_sat())665        self.assertEqual(v1.get_value(), v3.get_value() * v4.get_value())666        self.assertEqual(v2.get_value(), 1)667    def testReif2ExprFactorDomain(self):668        v1 = Variable(5)669        v2 = Variable()670        v3 = Variable(5)671        m = Model(v2 == (v1 == v3 * 2), v2 == 1)672        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)673        s.solve()674        self.assertTrue(s.is_sat())675        self.assertEqual(v1.get_value(), v3.get_value() * 2)676        self.assertEqual(v2.get_value(), 1)677    # ---------------- Modulus ----------------678    def testModConstant(self):679        v1 = Variable(10)680        v2 = Variable(5)681        m = Model(v2 == (v1 % 3), v1 == 4)682        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)683        s.solve()684        self.assertTrue(s.is_sat())685        self.assertEqual(v1.get_value(), 4)686        self.assertEqual(v2.get_value(), 1)687    def testModNegConstant(self):688        v1 = Variable(-5, 5)689        v2 = Variable(-5, 5)690        m = Model(v2 == (v1 % 3), v1 == -5)691        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)692        s.solve()693        self.assertTrue(s.is_sat())694        self.assertEqual(v1.get_value(), -5)695        self.assertEqual(v2.get_value(), -2)696    def testModTwoVars(self):697        v1 = Variable(-10, 10)698        v2 = Variable(-5, 5)699        v3 = Variable(-5, 5)700        m = Model(v3 == (v1 % v2), v1 == -5, v2 == 3)701        s = SATEncodingTest.solver(m, encoding=SATEncodingTest.encoding)702        s.solve()703        self.assertTrue(s.is_sat())704        self.assertEqual(v1.get_value(), -5)705        self.assertEqual(v2.get_value(), 3)...parse_patient_files_full.py
Source:parse_patient_files_full.py  
...10class ParsePatientFiles:11	def __init__(self):12		# Open a file13		base_dir = os.path.dirname(os.path.abspath(__file__))14	def get_value(self, sheet, row, col, type='', nullable=True):15		val = sheet.cell_value(row, col)16		if val == '':17			if nullable:18				return None19			elif type == 'convert_x_to_boolean':20				return False21			else:22				print('val: ', val, ' | type:', type, ' | nullable:', nullable)23				print('ERROR')24		else:25			if type == int:26				return int(val)27			elif type == 'convert_float_to_date':28				return self.convert_float_to_date(val)29			elif type == 'convert_x_to_boolean':30				return self.convert_x_to_boolean(val)31			else:32				return val33	def convert_float_to_date(self, date):34		if date != '':35			serial = date36			seconds = (serial - 25569) * 86400.037			date = datetime.datetime.utcfromtimestamp(seconds)38			return date.strftime('%m/%d/%y')39	def convert_x_to_boolean(self, x):40		if x == 'X':41			return True42		else: 43			return False44	def get_patient(self, sheet):45		patient = {}46		patient['patient_id'] = self.get_value(sheet,2,4,int,False)47		patient['first_name'] = self.get_value(sheet,2,1,'',False)48		patient['last_name'] = self.get_value(sheet,2,2,'',False)49		patient['gender'] = self.get_value(sheet,2,10,'',False)50		patient['age'] = self.get_value(sheet,2,7,int,False)51		patient['relationship'] = self.get_value(sheet,4,4)52		patient['first_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')53		# print(OrderedDict(sorted(patient.items(), key=lambda t: t[0])))54		return patient55	def get_visit(self, sheet, patient):56		visit = {}57		visit['patient_id'] = int(patient['patient_id'])58		visit['visit_date'] = self.get_value(sheet,0,1,'convert_float_to_date',False)59		visit['next_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')60		visit['pcp_name'] = self.get_value(sheet,4,1) + ' ' + self.get_value(sheet,4,2)61		visit['case_status'] = self.get_value(sheet,6,1)62		visit['health_risk_assessment'] = self.get_value(sheet,6,4)63		visit['biometrics'] = self.get_value(sheet,6,7)64		visit['coach_initials'] = self.get_value(sheet,6,10)65		visit['comments'] = self.get_value(sheet,8,1)66		# print(OrderedDict(sorted(visit.items(), key=lambda t: t[0])))67		return visit68	def get_diagnosis(self, sheet, patient, visit):69		diagnosis = {}70		diagnosis['patient_id'] = int(patient['patient_id'])71		diagnosis['visit_date'] = visit['visit_date']72		diagnosis['case_status'] = visit['case_status']73		diagnosis['overweight'] = self.get_value(sheet,12,0,'convert_x_to_boolean',False)74		diagnosis['obese'] = self.get_value(sheet,13,0,'convert_x_to_boolean',False)75		diagnosis['hypertension'] = self.get_value(sheet,14,0,'convert_x_to_boolean',False)76		diagnosis['cad'] = self.get_value(sheet,15,0,'convert_x_to_boolean',False)77		diagnosis['chf'] = self.get_value(sheet,16,0,'convert_x_to_boolean',False)78		diagnosis['hyperlipidemia'] = self.get_value(sheet,17,0,'convert_x_to_boolean',False)79		diagnosis['prediabetes'] = self.get_value(sheet,18,0,'convert_x_to_boolean',False)80		diagnosis['diabetes'] = self.get_value(sheet,19,0,'convert_x_to_boolean',False)81		diagnosis['asthma'] = self.get_value(sheet,20,0,'convert_x_to_boolean',False)82		diagnosis['copd'] = self.get_value(sheet,21,0,'convert_x_to_boolean',False)83		diagnosis['depression'] = self.get_value(sheet,22,0,'convert_x_to_boolean',False)84		diagnosis['nicotine_use'] = self.get_value(sheet,23,0,'convert_x_to_boolean',False)85		# print(OrderedDict(sorted(diagnosis.items(), key=lambda t: t[0])))86		return diagnosis87	def get_weight_management(self, sheet, patient, visit):88		weight_management = {}89		weight_management['patient_id'] = int(patient['patient_id'])90		weight_management['visit_date'] = visit['visit_date']91		weight_management['height_measured'] = self.get_value(sheet,12,4,int)92		weight_management['height_baseline'] = self.get_value(sheet,12,5,int)93		weight_management['waist_measured'] = self.get_value(sheet,13,4,int)94		weight_management['waist_baseline'] = self.get_value(sheet,13,5,int)95		weight_management['waist_progress'] = self.get_value(sheet,13,6,int)96		weight_management['waist_goal'] = self.get_value(sheet,13,7)97		weight_management['waist_date_goal_achieved'] = self.get_value(sheet,13,8,'convert_float_to_date')98		weight_management['waist_progress_notes'] = self.get_value(sheet,13,9)99		weight_management['weight_measured'] = self.get_value(sheet,14,4,int)100		weight_management['weight_baseline'] = self.get_value(sheet,14,5,int)101		weight_management['weight_progress'] = self.get_value(sheet,14,6,int)102		weight_management['weight_goal'] = self.get_value(sheet,14,7)103		weight_management['weight_date_goal_achieved'] = self.get_value(sheet,14,8,'convert_float_to_date')104		weight_management['weight_progress_notes'] = self.get_value(sheet,4,9)105		weight_management['bmi_measured'] = self.get_value(sheet,15,4,int)106		weight_management['bmi_baseline'] = self.get_value(sheet,15,5,int)107		weight_management['bmi_progress'] = self.get_value(sheet,15,6,int)108		weight_management['bmi_date_goal_achieved'] = self.get_value(sheet,15,8,'convert_float_to_date')109		weight_management['bmi_progress_notes'] = self.get_value(sheet,15,9)110		# print(OrderedDict(sorted(weight_management.items(), key=lambda t: t[0])))111		return weight_management112	def get_blood_pressure_control(self, sheet, patient, visit):113		blood_pressure_control = {}114		blood_pressure_control['patient_id'] = int(patient['patient_id'])115		blood_pressure_control['visit_date'] = visit['visit_date']116		blood_pressure_control['systolic_measured'] = self.get_value(sheet,16,4,int)117		blood_pressure_control['systolic_baseline'] = self.get_value(sheet,16,5,int)118		blood_pressure_control['systolic_progress'] = self.get_value(sheet,16,6,int)119		blood_pressure_control['systolic_goal'] = self.get_value(sheet,16,7,int)120		blood_pressure_control['systolic_date_goal_achieved'] = self.get_value(sheet,16,8,'convert_float_to_date')121		blood_pressure_control['systolic_progress_notes'] = self.get_value(sheet,16,9)122		blood_pressure_control['diastolic_measured'] = self.get_value(sheet,17,4,int)123		blood_pressure_control['diastolic_baseline'] = self.get_value(sheet,17,5,int)124		blood_pressure_control['diastolic_progress'] = self.get_value(sheet,17,6,int)125		blood_pressure_control['diastolic_goal'] = self.get_value(sheet,17,7,int)126		blood_pressure_control['diastolic_date_goal_achieved'] = self.get_value(sheet,17,8,'convert_float_to_date')127		blood_pressure_control['diastolic_progress_notes'] = self.get_value(sheet,17,9)128		# print(OrderedDict(sorted(blood_pressure_control.items(), key=lambda t: t[0])))129		return blood_pressure_control130	def get_lipid_management(self, sheet, patient, visit):131		lipid_management = {}132		lipid_management['patient_id'] = int(patient['patient_id'])133		lipid_management['visit_date'] = visit['visit_date']134		lipid_management['tc_measured'] = self.get_value(sheet,18,4,int)135		lipid_management['tc_baseline'] = self.get_value(sheet,18,5,int)136		lipid_management['tc_progress'] = self.get_value(sheet,18,6,int)137		lipid_management['tc_date_goal_achieved'] = self.get_value(sheet,18,8,'convert_float_to_date')138		lipid_management['tc_progress_notes'] = self.get_value(sheet,18,9)139		lipid_management['ldl_measured'] = self.get_value(sheet,19,4,int)140		lipid_management['ldl_baseline'] = self.get_value(sheet,19,5,int)141		lipid_management['ldl_progress'] = self.get_value(sheet,19,6,int)142		lipid_management['ldl_goal'] = self.get_value(sheet,19,7,int)143		lipid_management['ldl_date_goal_achieved'] = self.get_value(sheet,19,8,'convert_float_to_date')144		lipid_management['ldl_progress_notes'] = self.get_value(sheet,19,9)145		lipid_management['hdl_measured'] = self.get_value(sheet,20,4,int)146		lipid_management['hdl_baseline'] = self.get_value(sheet,20,5,int)147		lipid_management['hdl_progress'] = self.get_value(sheet,20,6,int)148		lipid_management['hdl_goal'] = self.get_value(sheet,20,7,int)149		lipid_management['hdl_date_goal_achieved'] = self.get_value(sheet,20,8,'convert_float_to_date')150		lipid_management['hdl_progress_notes'] = self.get_value(sheet,20,9)151		lipid_management['tgs_measured'] = self.get_value(sheet,21,4,int)152		lipid_management['tgs_baseline'] = self.get_value(sheet,21,5,int)153		lipid_management['tgs_progress'] = self.get_value(sheet,21,6,int)154		lipid_management['tgs_date_goal_achieved'] = self.get_value(sheet,21,8,'convert_float_to_date')155		lipid_management['tgs_progress_notes'] = self.get_value(sheet,21,9)156		# print(OrderedDict(sorted(lipid_management.items(), key=lambda t: t[0])))157		return lipid_management158	def diabetes(self, sheet, patient, visit):159		diabetes = {}160		diabetes['patient_id'] = int(patient['patient_id'])161		diabetes['visit_date'] = visit['visit_date']162		diabetes['fbg_measured'] = self.get_value(sheet,22,4,int)163		diabetes['fbg_baseline'] = self.get_value(sheet,22,5,int)164		diabetes['fbg_progress'] = self.get_value(sheet,22,6,int)165		diabetes['fbg_date_goal_achieved'] = self.get_value(sheet,22,8,'convert_float_to_date')166		diabetes['fbg_progress_notes'] = self.get_value(sheet,22,9)167		diabetes['hb_measured'] = self.get_value(sheet,23,4,int)168		diabetes['hb_baseline'] = self.get_value(sheet,23,5,int)169		diabetes['hb_progress'] = self.get_value(sheet,23,6,int)170		diabetes['hb_date_goal_achieved'] = self.get_value(sheet,23,8,'convert_float_to_date')171		diabetes['hb_progress_notes'] = self.get_value(sheet,23,9)172		diabetes['retinal_measured'] = self.get_value(sheet,24,4)173		diabetes['retinal_baseline'] = self.get_value(sheet,24,5)174		diabetes['retinal_progress'] = self.get_value(sheet,24,6,int)175		diabetes['retinal_goal'] = self.get_value(sheet,24,7)176		diabetes['retinal_date_goal_achieved'] = self.get_value(sheet,24,8,'convert_float_to_date')177		diabetes['retinal_progress_notes'] = self.get_value(sheet,24,9)178		diabetes['renal_measured'] = self.get_value(sheet,25,4)179		diabetes['renal_baseline'] = self.get_value(sheet,25,5)180		diabetes['renal_progress'] = self.get_value(sheet,25,6,int)181		diabetes['renal_goal'] = self.get_value(sheet,25,7)182		diabetes['renal_date_goal_achieved'] = self.get_value(sheet,25,8,'convert_float_to_date')183		diabetes['renal_progress_notes'] = self.get_value(sheet,25,9)184		diabetes['foot_measured'] = self.get_value(sheet,26,4)185		diabetes['foot_baseline'] = self.get_value(sheet,26,5)186		diabetes['foot_progress'] = self.get_value(sheet,26,6,int)187		diabetes['foot_goal'] = self.get_value(sheet,26,7)188		diabetes['foot_date_goal_achieved'] = self.get_value(sheet,26,8,'convert_float_to_date')189		diabetes['foot_progress_notes'] = self.get_value(sheet,26,9)190		# print(OrderedDict(sorted(diabetes.items(), key=lambda t: t[0])))191		return diabetes192	def get_compliance(self, sheet, patient, visit):193		compliance = {}194		compliance['patient_id'] = int(patient['patient_id'])195		compliance['visit_date'] = visit['visit_date']196		compliance['meds_measured'] = self.get_value(sheet,27,4)197		compliance['meds_baseline'] = self.get_value(sheet,27,5)198		compliance['meds_progress'] = self.get_value(sheet,27,6,int)199		compliance['meds_date_goal_achieved'] = self.get_value(sheet,27,8,'convert_float_to_date')200		compliance['meds_progress_notes'] = self.get_value(sheet,27,9)201		compliance['diet_measured'] = self.get_value(sheet,28,4)202		compliance['diet_baseline'] = self.get_value(sheet,28,5)203		compliance['diet_progress'] = self.get_value(sheet,28,6,int)204		compliance['diet_date_goal_achieved'] = self.get_value(sheet,28,8,'convert_float_to_date')205		compliance['diet_progress_notes'] = self.get_value(sheet,28,9)206		compliance['exercise_measured'] = self.get_value(sheet,29,4)207		compliance['exercise_baseline'] = self.get_value(sheet,29,5)208		compliance['exercise_progress'] = self.get_value(sheet,29,6,int)209		compliance['exercise_date_goal_achieved'] = self.get_value(sheet,29,8,'convert_float_to_date')210		compliance['exercise_progress_notes'] = self.get_value(sheet,29,9)211		compliance['nicotine_measured'] = self.get_value(sheet,30,4)212		compliance['nicotine_baseline'] = self.get_value(sheet,30,5)213		compliance['nicotine_progress'] = self.get_value(sheet,30,6,int)214		compliance['nicotine_goal'] = self.get_value(sheet,30,7)215		compliance['nicotine_date_goal_achieved'] = self.get_value(sheet,30,8,'convert_float_to_date')216		compliance['nicotine_progress_notes'] = self.get_value(sheet,30,9)217		# print(OrderedDict(sorted(compliance.items(), key=lambda t: t[0])))218		return compliance219	220########################################################################################################################221	# initialize database and update/insert data222	def process_files(self, patientFiles_path):223		db = CreateDB('patients.db','patients_schema.sql')224		path = os.path.join(patientFiles_path, '*.xlsm')225		for fname in glob.glob(path):226			print("processing ", os.path.basename(fname), "...")227			workbook = xlrd.open_workbook(fname)228			worksheets = workbook.sheet_names()229			for worksheet_name in worksheets:230				print("worksheet: ", worksheet_name)...parsePatientFiles_old.py
Source:parsePatientFiles_old.py  
...12		# Open a file13		base_dir = os.path.dirname(os.path.abspath(__file__))14		# path = "/Users/alyssafreeman/Documents/git/kensa_python/carePlanDashboard/patientFiles/*.xlsm"15		# dirs = os.listdir(path)16	def get_value(self, sheet, row, col, type='', nullable=True):17		val = sheet.cell_value(row, col)18		if val == '':19			if nullable:20				return None21			else:22				print('val: ', val, ' | type:', type, ' | nullable:', nullable)23				print('ERROR')24		else:25			if type == int:26				return int(val)27			elif type == 'convert_float_to_date':28				return self.convert_float_to_date(val)29			elif type == 'convert_x_to_boolean':30				return self.convert_x_to_boolean31			else:32				return val33	def convert_float_to_date(self, date):34		if date != '':35			serial = date36			seconds = (serial - 25569) * 86400.037			date = datetime.datetime.utcfromtimestamp(seconds)38			return date.strftime('%m/%d/%y')39	def convert_x_to_boolean(self, x):40		if x == 'X':41			return True42		else: 43			return False44	def get_patient(self, sheet):45		patient = {}46		patient['patient_id'] = self.get_value(sheet,2,4,int,False)47		patient['first_name'] = self.get_value(sheet,2,1,'',False)48		patient['last_name'] = self.get_value(sheet,2,2,'',False)49		patient['gender'] = self.get_value(sheet,2,10,'',False)50		patient['age'] = self.get_value(sheet,2,7,int,False)51		patient['relationship'] = self.get_value(sheet,4,4)52		patient['first_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')53		db.insert_data('patient',patient)54		# print(OrderedDict(sorted(patient.items(), key=lambda t: t[0])))55		return sheet56	def get_visit(self, sheet):57		visit = {}58		visit['visit_date'] = self.get_value(sheet,0,1,'convert_float_to_date',False)59		visit['next_appt_date'] = self.get_value(sheet,0,10,'convert_float_to_date')60		visit['pcp_name'] = self.get_value(sheet,4,1) + ' ' + self.get_value(sheet,4,2)61		visit['case_status'] = self.get_value(sheet,6,1)62		visit['health_risk_assessment'] = self.get_value(sheet,6,4)63		visit['biometrics'] = self.get_value(sheet,6,7)64		visit['coach_initials'] = self.get_value(sheet,6,10)65		visit['comments'] = self.get_value(sheet,8,1)66		# print(OrderedDict(sorted(visit.items(), key=lambda t: t[0])))67		return sheet68	def get_diagnosis(self, sheet):69		diagnosis = {}70		diagnosis['overweight'] = self.get_value(sheet,12,0,'convert_x_to_boolean')71		diagnosis['obese'] = self.get_value(sheet,13,0,'convert_x_to_boolean')72		diagnosis['hypertension'] = self.get_value(sheet,14,0,'convert_x_to_boolean')73		diagnosis['cad'] = self.get_value(sheet,15,0,'convert_x_to_boolean')74		diagnosis['chf'] = self.get_value(sheet,16,0,'convert_x_to_boolean')75		diagnosis['hyperlipidemia'] = self.get_value(sheet,17,0,'convert_x_to_boolean')76		diagnosis['prediabetes'] = self.get_value(sheet,18,0,'convert_x_to_boolean')77		diagnosis['diabetes'] = self.get_value(sheet,19,0,'convert_x_to_boolean')78		diagnosis['asthma'] = self.get_value(sheet,20,0,'convert_x_to_boolean')79		diagnosis['copd'] = self.get_value(sheet,21,0,'convert_x_to_boolean')80		diagnosis['depression'] = self.get_value(sheet,22,0,'convert_x_to_boolean')81		diagnosis['nicotine_use'] = self.get_value(sheet,23,0,'convert_x_to_boolean')82		# print(OrderedDict(sorted(diagnosis.items(), key=lambda t: t[0])))83		return sheet84	def get_weight_management(self, sheet):85		weight_management = {}86		weight_management['height_measured'] = self.get_value(sheet,12,4,int)87		weight_management['height_baseline'] = self.get_value(sheet,12,5,int)88		weight_management['waist_measured'] = self.get_value(sheet,13,4,int)89		weight_management['waist_baseline'] = self.get_value(sheet,13,5,int)90		weight_management['waist_progress'] = self.get_value(sheet,13,6,int)91		weight_management['waist_goal'] = self.get_value(sheet,13,7)92		weight_management['waist_date_goal_achieved'] = self.get_value(sheet,13,8,'convert_float_to_date')93		weight_management['waist_progress_notes'] = self.get_value(sheet,13,9)94		weight_management['weight_measured'] = self.get_value(sheet,14,4,int)95		weight_management['weight_baseline'] = self.get_value(sheet,14,5,int)96		weight_management['weight_progress'] = self.get_value(sheet,14,6,int)97		weight_management['weight_goal'] = self.get_value(sheet,14,7)98		weight_management['weight_date_goal_achieved'] = self.get_value(sheet,14,8,'convert_float_to_date')99		weight_management['weight_progress_notes'] = self.get_value(sheet,4,9)100		weight_management['bmi_measured'] = self.get_value(sheet,15,4,int)101		weight_management['bmi_baseline'] = self.get_value(sheet,15,5,int)102		weight_management['bmi_progress'] = self.get_value(sheet,15,6,int)103		weight_management['bmi_date_goal_achieved'] = self.get_value(sheet,15,8,'convert_float_to_date')104		weight_management['bmi_progress_notes'] = self.get_value(sheet,15,9)105		# print(OrderedDict(sorted(weight_management.items(), key=lambda t: t[0])))106		return sheet107	def get_blood_pressure_control(self, sheet):108		blood_pressure_control = {}109		blood_pressure_control['systolic_measured'] = self.get_value(sheet,16,4,int)110		blood_pressure_control['systolic_baseline'] = self.get_value(sheet,16,5,int)111		blood_pressure_control['systolic_progress'] = self.get_value(sheet,16,6,int)112		blood_pressure_control['systolic_goal'] = self.get_value(sheet,16,7,int)113		blood_pressure_control['systolic_date_goal_achieved'] = self.get_value(sheet,16,8,'convert_float_to_date')114		blood_pressure_control['systolic_progress_notes'] = self.get_value(sheet,16,9)115		blood_pressure_control['diastolic_measured'] = self.get_value(sheet,17,4,int)116		blood_pressure_control['diastolic_baseline'] = self.get_value(sheet,17,5,int)117		blood_pressure_control['diastolic_progress'] = self.get_value(sheet,17,6,int)118		blood_pressure_control['diastolic_goal'] = self.get_value(sheet,17,7,int)119		blood_pressure_control['diastolic_date_goal_achieved'] = self.get_value(sheet,17,8,'convert_float_to_date')120		blood_pressure_control['diastolic_progress_notes'] = self.get_value(sheet,17,9)121		# print(OrderedDict(sorted(blood_pressure_control.items(), key=lambda t: t[0])))122		return sheet123	def get_lipid_management(self, sheet):124		lipid_management = {}125		lipid_management['tc_measured'] = self.get_value(sheet,18,4,int)126		lipid_management['tc_baseline'] = self.get_value(sheet,18,5,int)127		lipid_management['tc_progress'] = self.get_value(sheet,18,6,int)128		lipid_management['tc_date_goal_achieved'] = self.get_value(sheet,18,8,'convert_float_to_date')129		lipid_management['tc_progress_notes'] = self.get_value(sheet,18,9)130		lipid_management['ldl_measured'] = self.get_value(sheet,19,4,int)131		lipid_management['ldl_baseline'] = self.get_value(sheet,19,5,int)132		lipid_management['ldl_progress'] = self.get_value(sheet,19,6,int)133		lipid_management['ldl_goal'] = self.get_value(sheet,19,7,int)134		lipid_management['ldl_date_goal_achieved'] = self.get_value(sheet,19,8,'convert_float_to_date')135		lipid_management['ldl_progress_notes'] = self.get_value(sheet,19,9)136		lipid_management['hdl_measured'] = self.get_value(sheet,20,4,int)137		lipid_management['hdl_baseline'] = self.get_value(sheet,20,5,int)138		lipid_management['hdl_progress'] = self.get_value(sheet,20,6,int)139		lipid_management['hdl_goal'] = self.get_value(sheet,20,7,int)140		lipid_management['hdl_date_goal_achieved'] = self.get_value(sheet,20,8,'convert_float_to_date')141		lipid_management['hdl_progress_notes'] = self.get_value(sheet,20,9)142		lipid_management['tgs_measured'] = self.get_value(sheet,21,4,int)143		lipid_management['tgs_baseline'] = self.get_value(sheet,21,5,int)144		lipid_management['tgs_progress'] = self.get_value(sheet,21,6,int)145		lipid_management['tgs_date_goal_achieved'] = self.get_value(sheet,21,8,'convert_float_to_date')146		lipid_management['tgs_progress_notes'] = self.get_value(sheet,21,9)147		# print(OrderedDict(sorted(lipid_management.items(), key=lambda t: t[0])))148		return sheet149	def get_fbg_measured(self, sheet):150		fbg_measured = {}151		fbg_measured['fbg_measured'] = self.get_value(sheet,22,4,int)152		fbg_measured['fbg_baseline'] = self.get_value(sheet,22,5,int)153		fbg_measured['fbg_progress'] = self.get_value(sheet,22,6,int)154		fbg_measured['fbg_date_goal_achieved'] = self.get_value(sheet,22,8,'convert_float_to_date')155		fbg_measured['fbg_progress_notes'] = self.get_value(sheet,22,9)156		fbg_measured['hb_measured'] = self.get_value(sheet,23,4,int)157		fbg_measured['hb_baseline'] = self.get_value(sheet,23,5,int)158		fbg_measured['hb_progress'] = self.get_value(sheet,23,6,int)159		fbg_measured['hb_date_goal_achieved'] = self.get_value(sheet,23,8,'convert_float_to_date')160		fbg_measured['hb_progress_notes'] = self.get_value(sheet,23,9)161		fbg_measured['retinal_measured'] = self.get_value(sheet,24,4)162		fbg_measured['retinal_baseline'] = self.get_value(sheet,24,5)163		fbg_measured['retinal_progress'] = self.get_value(sheet,24,6,int)164		fbg_measured['retinal_goal'] = self.get_value(sheet,24,7)165		fbg_measured['retinal_date_goal_achieved'] = self.get_value(sheet,24,8,'convert_float_to_date')166		fbg_measured['retinal_progress_notes'] = self.get_value(sheet,24,9)167		fbg_measured['renal_measured'] = self.get_value(sheet,25,4)168		fbg_measured['renal_baseline'] = self.get_value(sheet,25,5)169		fbg_measured['renal_progress'] = self.get_value(sheet,25,6,int)170		fbg_measured['renal_goal'] = self.get_value(sheet,25,7)171		fbg_measured['renal_date_goal_achieved'] = self.get_value(sheet,25,8,'convert_float_to_date')172		fbg_measured['renal_progress_notes'] = self.get_value(sheet,25,9)173		fbg_measured['foot_measured'] = self.get_value(sheet,26,4)174		fbg_measured['foot_baseline'] = self.get_value(sheet,26,5)175		fbg_measured['foot_progress'] = self.get_value(sheet,26,6,int)176		fbg_measured['foot_goal'] = self.get_value(sheet,26,7)177		fbg_measured['foot_date_goal_achieved'] = self.get_value(sheet,26,8,'convert_float_to_date')178		fbg_measured['foot_progress_notes'] = self.get_value(sheet,26,9)179		# print(OrderedDict(sorted(fbg_measured.items(), key=lambda t: t[0])))180		return sheet181	def get_compliance(self, sheet):182		compliance = {}183		compliance['meds_measured'] = self.get_value(sheet,27,4)184		compliance['meds_baseline'] = self.get_value(sheet,27,5)185		compliance['meds_progress'] = self.get_value(sheet,27,6,int)186		compliance['meds_date_goal_achieved'] = self.get_value(sheet,27,8,'convert_float_to_date')187		compliance['meds_progress_notes'] = self.get_value(sheet,27,9)188		compliance['diet_measured'] = self.get_value(sheet,28,4)189		compliance['diet_baseline'] = self.get_value(sheet,28,5)190		compliance['diet_progress'] = self.get_value(sheet,28,6,int)191		compliance['diet_date_goal_achieved'] = self.get_value(sheet,28,8,'convert_float_to_date')192		compliance['diet_progress_notes'] = self.get_value(sheet,28,9)193		compliance['exercise_measured'] = self.get_value(sheet,29,4)194		compliance['exercise_baseline'] = self.get_value(sheet,29,5)195		compliance['exercise_progress'] = self.get_value(sheet,29,6,int)196		compliance['exercise_date_goal_achieved'] = self.get_value(sheet,29,8,'convert_float_to_date')197		compliance['exercise_progress_notes'] = self.get_value(sheet,29,9)198		compliance['nicotine_measured'] = self.get_value(sheet,30,4)199		compliance['nicotine_baseline'] = self.get_value(sheet,30,5)200		compliance['nicotine_progress'] = self.get_value(sheet,30,6,int)201		compliance['nicotine_goal'] = self.get_value(sheet,30,7)202		compliance['nicotine_date_goal_achieved'] = self.get_value(sheet,30,8,'convert_float_to_date')203		compliance['nicotine_progress_notes'] = self.get_value(sheet,30,9)204		# print(OrderedDict(sorted(compliance.items(), key=lambda t: t[0])))205		return sheet206	207########################################################################################################################208# initialize database and update/insert data209db = DBInit('patient.db','patients_schema.sql')210pf = ParsePatientFiles()211# path = "/Users/alyssafreeman/Documents/git/kensa_python/carePlanDashboard/patientFiles/*.xlsm"212# path = os.path.join(db.base_dir, "/patientFiles/*.xlsm")213path = db.base_dir + "/patientFiles/*.xlsm"214for fname in glob.glob(path):215	print("processing ", os.path.basename(fname), "...")216	workbook = xlrd.open_workbook(fname)217	worksheets = workbook.sheet_names()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
