Best Python code snippet using tempest_python
test_monitor.py
Source:test_monitor.py  
...16TESTDESCRIPTION = "TESTDESCRIPTION"17def setup_basic_test(request, bigip):18    monitor1 = bigip.ltm.monitor19    return monitor120def delete_resource(resources):21    for resource in resources.get_collection():22        rn = resource.name23        if rn != 'http' and rn != 'http_head_f5' and rn != 'https'\24                and rn != 'https_443' and rn != 'https_head_f5'\25                and rn != 'diameter' and rn != 'dns' and rn != 'external'\26                and rn != 'firepass' and rn != 'ftp' and rn != 'gateway_icmp'\27                and rn != 'icmp' and rn != 'imap' and rn != 'inband'\28                and rn != 'ldap' and rn != 'module_score' and rn != 'mssql'\29                and rn != 'mysql' and rn != 'nntp' and rn != 'none'\30                and rn != 'oracle' and rn != 'pop3' and rn != 'postgresql'\31                and rn != 'radius' and rn != 'radius_accounting'\32                and rn != 'real_server' and rn != 'rpc' and rn != 'sasp'\33                and rn != 'scripted' and rn != 'sip' and rn != 'smb'\34                and rn != 'smtp' and rn != 'snmp_dca'\35                and rn != 'snmp_dca_base' and rn != 'soap'\36                and rn != 'tcp' and rn != 'tcp_echo'\37                and rn != 'tcp_half_open' and rn != 'udp'\38                and rn != 'virtual_location' and rn != 'wap' and rn != 'wmi':39            pp(resource.__dict__)40            resource.delete()41def setup_http_test(request, bigip, partition, name):42    def teardown():43        delete_resource(hc1)44    request.addfinalizer(teardown)45    hc1 = bigip.ltm.monitor.https46    pp(hc1._meta_data)47    http1 = hc1.http48    http1.create(name=name, partition=partition)49    return http1, hc150class TestMonitor(object):51    def test_get_collection(self, request, bigip):52        m1 = setup_basic_test(request, bigip)53        list_of_references = m1.get_collection()54        assert len(list_of_references) == 3955class TestMonitorHTTP(object):56    def test_http_create_refresh_update_delete_load(self, request, bigip):57        http1, hc1 = setup_http_test(request, bigip, 'Common', 'test1')58        assert http1.name == 'test1'59        http1.description = TESTDESCRIPTION60        http1.update()61        assert http1.description == TESTDESCRIPTION62        http1.description = ''63        http1.refresh()64        assert http1.description == TESTDESCRIPTION65        http2 = hc1.http66        http2.load(partition='Common', name='test1')67        assert http2.selfLink == http1.selfLink68# End HTTP Tests69# Begin HTTPS Tests70def setup_https_test(request, bigip, partition, name):71    def teardown():72        delete_resource(hc1)73    request.addfinalizer(teardown)74    hc1 = bigip.ltm.monitor.https_s75    https1 = hc1.https76    https1.create(name=name, partition=partition)77    return https1, hc178class TestMonitorHTTPS(object):79    def test_https_create_refresh_update_delete_load(self, request, bigip):80        https1, hc1 = setup_https_test(request, bigip, 'Common', 'httpstest')81        assert https1.name == 'httpstest'82        https1.description = TESTDESCRIPTION83        https1.update()84        assert https1.description == TESTDESCRIPTION85        https1.description = ''86        https1.refresh()87        assert https1.description == TESTDESCRIPTION88        https2 = hc1.https89        https2.load(partition='Common', name='httpstest')90        assert https2.selfLink == https1.selfLink91# End HTTPS Tests92# Begin Diameter Tests93def setup_diameter_test(request, bigip, partition, name):94    def teardown():95        delete_resource(hc1)96    request.addfinalizer(teardown)97    hc1 = bigip.ltm.monitor.diameters98    diameter1 = hc1.diameter99    diameter1.create(name=name, partition=partition)100    return diameter1, hc1101class TestMonitorDiameter(object):102    def test_diameter_create_refresh_update_delete_load(self, request, bigip):103        diameter1, hc1 = setup_diameter_test(request, bigip, 'Common',104                                             'diametertest')105        assert diameter1.name == 'diametertest'106        diameter1.description = TESTDESCRIPTION107        diameter1.update()108        assert diameter1.description == TESTDESCRIPTION109        diameter1.description = ''110        diameter1.refresh()111        assert diameter1.description == TESTDESCRIPTION112        diameter2 = hc1.diameter113        diameter2.load(partition='Common', name='diametertest')114        assert diameter2.selfLink == diameter1.selfLink115# End Diameter Tests116# Begin DNS Tests117def setup_dns_test(request, bigip, partition, name, qname):118    def teardown():119        delete_resource(hc1)120    request.addfinalizer(teardown)121    hc1 = bigip.ltm.monitor.dns_s122    dns1 = hc1.dns123    dns1.create(name=name, partition=partition, qname=qname)124    return dns1, hc1125class TestMonitorDNS(object):126    def test_dns_create_refresh_update_delete_load(self, request, bigip):127        dns1, hc1 = setup_dns_test(request, bigip, 'Common', 'dnstest', 'aqna')128        assert dns1.name == 'dnstest'129        dns1.description = TESTDESCRIPTION130        dns1.update()131        assert dns1.description == TESTDESCRIPTION132        dns1.description = ''133        dns1.refresh()134        assert dns1.description == TESTDESCRIPTION135        dns2 = hc1.dns136        dns2.load(partition='Common', name='dnstest')137        assert dns2.selfLink == dns1.selfLink138# End DNS Tests139# Begin External140def setup_external_test(request, bigip, partition, name):141    def teardown():142        delete_resource(hc1)143    request.addfinalizer(teardown)144    hc1 = bigip.ltm.monitor.externals145    external1 = hc1.external146    external1.create(name=name, partition=partition)147    return external1, hc1148class TestMonitorExternal(object):149    def test_external_create_refresh_update_delete_load(self, request, bigip):150        external1, hc1 = setup_external_test(request, bigip, 'Common',151                                             'externaltest')152        assert external1.name == 'externaltest'153        external1.description = TESTDESCRIPTION154        external1.update()155        assert external1.description == TESTDESCRIPTION156        external1.description = ''157        external1.refresh()158        assert external1.description == TESTDESCRIPTION159        external2 = hc1.external160        external2.load(partition='Common', name='externaltest')161        assert external2.selfLink == external1.selfLink162# End External Tests163# Begin FirePass164def setup_firepass_test(request, bigip, partition, name):165    def teardown():166        delete_resource(hc1)167    request.addfinalizer(teardown)168    hc1 = bigip.ltm.monitor.firepass_s169    firepass1 = hc1.firepass170    firepass1.create(name=name, partition=partition)171    return firepass1, hc1172class TestMonitorFirePass(object):173    def test_firepass_create_refresh_update_delete_load(self, request, bigip):174        firepass1, hc1 = setup_firepass_test(request, bigip, 'Common',175                                             'firepasstest')176        assert firepass1.name == 'firepasstest'177        firepass1.description = TESTDESCRIPTION178        firepass1.update()179        assert firepass1.description == TESTDESCRIPTION180        firepass1.description = ''181        firepass1.refresh()182        assert firepass1.description == TESTDESCRIPTION183        firepass2 = hc1.firepass184        firepass2.load(partition='Common', name='firepasstest')185        assert firepass2.selfLink == firepass1.selfLink186# End FirePass Tests187# Begin FTP Tests188def setup_ftp_test(request, bigip, partition, name):189    def teardown():190        delete_resource(hc1)191    request.addfinalizer(teardown)192    hc1 = bigip.ltm.monitor.ftps193    ftp1 = hc1.ftp194    ftp1.create(name=name, partition=partition)195    return ftp1, hc1196class TestMonitorFTP(object):197    def test_ftp_create_refresh_update_delete_load(self, request, bigip):198        ftp1, hc1 = setup_ftp_test(request, bigip, 'Common', 'ftptest')199        assert ftp1.name == 'ftptest'200        ftp1.description = TESTDESCRIPTION201        ftp1.update()202        assert ftp1.description == TESTDESCRIPTION203        ftp1.description = ''204        ftp1.refresh()205        assert ftp1.description == TESTDESCRIPTION206        ftp2 = hc1.ftp207        ftp2.load(partition='Common', name='ftptest')208        assert ftp2.selfLink == ftp1.selfLink209# End FTP Tests210# Begin GateWay-ICMP211def setup_gateway_icmp_test(request, bigip, partition, name):212    def teardown():213        delete_resource(hc1)214    request.addfinalizer(teardown)215    hc1 = bigip.ltm.monitor.gateway_icmps216    gateway_icmp1 = hc1.gateway_icmp217    gateway_icmp1.create(name=name, partition=partition)218    return gateway_icmp1, hc1219class TestMonitorGateWay_ICMP(object):220    def test_gateway_icmp_create_refresh_update_delete_load(self,221                                                            request,222                                                            bigip):223        gateway_icmp1, hc1 =\224            setup_gateway_icmp_test(request,225                                    bigip,226                                    'Common',227                                    'gateway_icmptest')228        assert gateway_icmp1.name == 'gateway_icmptest'229        gateway_icmp1.description = TESTDESCRIPTION230        gateway_icmp1.update()231        assert gateway_icmp1.description == TESTDESCRIPTION232        gateway_icmp1.description = ''233        gateway_icmp1.refresh()234        assert gateway_icmp1.description == TESTDESCRIPTION235        gateway_icmp2 = hc1.gateway_icmp236        gateway_icmp2.load(partition='Common', name='gateway_icmptest')237        assert gateway_icmp2.selfLink == gateway_icmp1.selfLink238# End GateWay-ICMP239# Begin ICMP240def setup_icmp_test(request, bigip, partition, name):241    def teardown():242        delete_resource(hc1)243    request.addfinalizer(teardown)244    hc1 = bigip.ltm.monitor.icmps245    icmp1 = hc1.icmp246    icmp1.create(name=name, partition=partition)247    return icmp1, hc1248class TestMonitorICMP(object):249    def test_icmp_create_refresh_update_delete_load(self, request, bigip):250        icmp1, hc1 = setup_icmp_test(request, bigip, 'Common', 'icmptest')251        assert icmp1.name == 'icmptest'252        icmp1.description = TESTDESCRIPTION253        icmp1.update()254        assert icmp1.description == TESTDESCRIPTION255        icmp1.description = ''256        icmp1.refresh()257        assert icmp1.description == TESTDESCRIPTION258        icmp2 = hc1.icmp259        icmp2.load(partition='Common', name='icmptest')260        assert icmp2.selfLink == icmp1.selfLink261# End ICMP262# Begin IMAP263def setup_imap_test(request, bigip, partition, name):264    def teardown():265        delete_resource(hc1)266    request.addfinalizer(teardown)267    hc1 = bigip.ltm.monitor.imaps268    imap1 = hc1.imap269    imap1.create(name=name, partition=partition)270    return imap1, hc1271class TestMonitorIMAP(object):272    def test_imap_create_refresh_update_delete_load(self, request, bigip):273        imap1, hc1 = setup_imap_test(request, bigip, 'Common', 'imaptest')274        assert imap1.name == 'imaptest'275        imap1.description = TESTDESCRIPTION276        imap1.update()277        assert imap1.description == TESTDESCRIPTION278        imap1.description = ''279        imap1.refresh()280        assert imap1.description == TESTDESCRIPTION281        imap2 = hc1.imap282        imap2.load(partition='Common', name='imaptest')283        assert imap2.selfLink == imap1.selfLink284# End IMAP285# Begin InBand286def setup_inband_test(request, bigip, partition, name):287    def teardown():288        delete_resource(hc1)289    request.addfinalizer(teardown)290    hc1 = bigip.ltm.monitor.inbands291    inband1 = hc1.inband292    inband1.create(name=name, partition=partition)293    return inband1, hc1294class TestMonitorInBand(object):295    def test_inband_create_refresh_update_delete_load(self, request, bigip):296        inband1, hc1 =\297            setup_inband_test(request, bigip, 'Common', 'inbandtest')298        assert inband1.name == 'inbandtest'299        inband1.description = TESTDESCRIPTION300        inband1.update()301        assert inband1.description == TESTDESCRIPTION302        inband1.description = ''303        inband1.refresh()304        assert inband1.description == TESTDESCRIPTION305        inband2 = hc1.inband306        inband2.load(partition='Common', name='inbandtest')307        assert inband2.selfLink == inband1.selfLink308# End InBand309# Begin LDAP310def setup_ldap_test(request, bigip, partition, name):311    def teardown():312        delete_resource(hc1)313    request.addfinalizer(teardown)314    hc1 = bigip.ltm.monitor.ldaps315    ldap1 = hc1.ldap316    ldap1.create(name=name, partition=partition)317    return ldap1, hc1318class TestMonitorLDAP(object):319    def test_ldap_create_refresh_update_delete_load(self, request, bigip):320        ldap1, hc1 = setup_ldap_test(request, bigip, 'Common', 'ldaptest')321        assert ldap1.name == 'ldaptest'322        ldap1.description = TESTDESCRIPTION323        ldap1.update()324        assert ldap1.description == TESTDESCRIPTION325        ldap1.description = ''326        ldap1.refresh()327        assert ldap1.description == TESTDESCRIPTION328        ldap2 = hc1.ldap329        ldap2.load(partition='Common', name='ldaptest')330        assert ldap2.selfLink == ldap1.selfLink331# End LDAP Tests332# Begin Module-Score333def setup_module_score_test(request, bigip, partition, name, snmpaddr):334    def teardown():335        delete_resource(hc1)336    request.addfinalizer(teardown)337    hc1 = bigip.ltm.monitor.module_scores338    module_score1 = hc1.module_score339    creation_params = {'name': name,340                       'partition': partition,341                       'snmp-ip-address': snmpaddr}342    module_score1.create(**creation_params)343    return module_score1, hc1344class TestMonitorModule_Score(object):345    def test_module_score_create_refresh_update_delete_load(self, request,346                                                            bigip):347        module_score1, hc1 =\348            setup_module_score_test(request,349                                    bigip,350                                    'Common',351                                    'module_scoretest',352                                    '9.9.9.9')353        assert module_score1.name == 'module_scoretest'354        module_score1.description = TESTDESCRIPTION355        module_score1.update()356        assert module_score1.description == TESTDESCRIPTION357        module_score1.description = ''358        module_score1.refresh()359        assert module_score1.description == TESTDESCRIPTION360        module_score2 = hc1.module_score361        module_score2.load(partition='Common', name='module_scoretest')362        assert module_score2.selfLink == module_score1.selfLink363# End Module-Score364# Begin MSSQL365def setup_mssql_test(request, bigip, partition, name):366    def teardown():367        delete_resource(hc1)368    request.addfinalizer(teardown)369    hc1 = bigip.ltm.monitor.mssqls370    mssql1 = hc1.mssql371    creation_params = {'name': name,372                       'partition': partition}373    mssql1.create(**creation_params)374    return mssql1, hc1375class TestMonitorMSSQL(object):376    def test_mssql_create_refresh_update_delete_load(self, request, bigip):377        mssql1, hc1 = setup_mssql_test(request, bigip, 'Common', 'mssqltest')378        assert mssql1.name == 'mssqltest'379        mssql1.description = TESTDESCRIPTION380        mssql1.update()381        assert mssql1.description == TESTDESCRIPTION382        mssql1.description = ''383        mssql1.refresh()384        assert mssql1.description == TESTDESCRIPTION385        mssql2 = hc1.mssql386        mssql2.load(partition='Common', name='mssqltest')387        assert mssql2.selfLink == mssql1.selfLink388# End MSSQL389# Begin MYSQL390def setup_mysql_test(request, bigip, partition, name):391    def teardown():392        delete_resource(hc1)393    request.addfinalizer(teardown)394    hc1 = bigip.ltm.monitor.mysqls395    mysql1 = hc1.mysql396    creation_params = {'name': name,397                       'partition': partition}398    mysql1.create(**creation_params)399    return mysql1, hc1400class TestMonitorMYSQL(object):401    def test_mysql_create_refresh_update_delete_load(self, request, bigip):402        mysql1, hc1 = setup_mysql_test(request, bigip, 'Common', 'mysqltest')403        assert mysql1.name == 'mysqltest'404        mysql1.description = TESTDESCRIPTION405        mysql1.update()406        assert mysql1.description == TESTDESCRIPTION407        mysql1.description = ''408        mysql1.refresh()409        assert mysql1.description == TESTDESCRIPTION410        mysql2 = hc1.mysql411        mysql2.load(partition='Common', name='mysqltest')412        assert mysql2.selfLink == mysql1.selfLink413# End MYSQL414# Begin NNTP415def setup_nntp_test(request, bigip, partition, name):416    def teardown():417        delete_resource(hc1)418    request.addfinalizer(teardown)419    hc1 = bigip.ltm.monitor.nntps420    nntp1 = hc1.nntp421    creation_params = {'name': name,422                       'partition': partition}423    nntp1.create(**creation_params)424    return nntp1, hc1425class TestMonitorNNTP(object):426    def test_nntp_create_refresh_update_delete_load(self, request, bigip):427        nntp1, hc1 = setup_nntp_test(request, bigip, 'Common', 'nntptest')428        assert nntp1.name == 'nntptest'429        nntp1.description = TESTDESCRIPTION430        nntp1.update()431        assert nntp1.description == TESTDESCRIPTION432        nntp1.description = ''433        nntp1.refresh()434        assert nntp1.description == TESTDESCRIPTION435        nntp2 = hc1.nntp436        nntp2.load(partition='Common', name='nntptest')437        assert nntp2.selfLink == nntp1.selfLink438# End NNTP439# Begin NONE440'''441def setup_none_test(request, bigip, partition, name):442    def teardown():443        delete_resource(hc1)444    request.addfinalizer(teardown)445    hc1 = bigip.ltm.monitor.nones446    none1 = hc1.none447    creation_params = {'name': name,448                       'partition': partition}449    none1.create(**creation_params)450    return none1, hc1451class TestMonitorNONE(object):452    def test_none_create_refresh_update_delete_load(self, request,453                                                            bigip):454        none1, hc1 = setup_none_test(request, bigip, 'Common', 'nonetest')455        assert none1.name == 'nonetest'456        none1.description = TESTDESCRIPTION457        none1.update()458        assert none1.description == TESTDESCRIPTION459        none1.description = ''460        none1.refresh()461        assert none1.description == TESTDESCRIPTION462        none2 = hc1.none463        none2.load(partition='Common', name='nonetest')464        assert none2.selfLink == none1.selfLink465'''466# End NONE467# Begin Oracle468def setup_oracle_test(request, bigip, partition, name):469    def teardown():470        delete_resource(hc1)471    request.addfinalizer(teardown)472    hc1 = bigip.ltm.monitor.oracles473    oracle1 = hc1.oracle474    creation_params = {'name': name,475                       'partition': partition}476    oracle1.create(**creation_params)477    return oracle1, hc1478class TestMonitorOracle(object):479    def test_oracle_create_refresh_update_delete_load(self, request, bigip):480        oracle1, hc1 =\481            setup_oracle_test(request, bigip, 'Common', 'oracletest')482        assert oracle1.name == 'oracletest'483        oracle1.description = TESTDESCRIPTION484        oracle1.update()485        assert oracle1.description == TESTDESCRIPTION486        oracle1.description = ''487        oracle1.refresh()488        assert oracle1.description == TESTDESCRIPTION489        oracle2 = hc1.oracle490        oracle2.load(partition='Common', name='oracletest')491        assert oracle2.selfLink == oracle1.selfLink492# End Oracle493# Begin POP3494def setup_pop3_test(request, bigip, partition, name):495    def teardown():496        delete_resource(hc1)497    request.addfinalizer(teardown)498    hc1 = bigip.ltm.monitor.pop3s499    pop31 = hc1.pop3500    creation_params = {'name': name,501                       'partition': partition}502    pop31.create(**creation_params)503    return pop31, hc1504class TestMonitorPOP3(object):505    def test_pop3_create_refresh_update_delete_load(self, request, bigip):506        pop31, hc1 = setup_pop3_test(request, bigip, 'Common', 'pop3test')507        assert pop31.name == 'pop3test'508        pop31.description = TESTDESCRIPTION509        pop31.update()510        assert pop31.description == TESTDESCRIPTION511        pop31.description = ''512        pop31.refresh()513        assert pop31.description == TESTDESCRIPTION514        pop32 = hc1.pop3515        pop32.load(partition='Common', name='pop3test')516        assert pop32.selfLink == pop31.selfLink517# End POP3518# Begin PostGRESQL519def setup_postgresql_test(request, bigip, partition, name):520    def teardown():521        delete_resource(hc1)522    request.addfinalizer(teardown)523    hc1 = bigip.ltm.monitor.postgresqls524    postgresql1 = hc1.postgresql525    creation_params = {'name': name,526                       'partition': partition}527    postgresql1.create(**creation_params)528    return postgresql1, hc1529class TestMonitorPostGRESQL(object):530    def test_postgresql_create_refresh_update_delete_load(self, request,531                                                          bigip):532        postgresql1, hc1 =\533            setup_postgresql_test(request, bigip, 'Common', 'postgresqltest')534        assert postgresql1.name == 'postgresqltest'535        postgresql1.description = TESTDESCRIPTION536        postgresql1.update()537        assert postgresql1.description == TESTDESCRIPTION538        postgresql1.description = ''539        postgresql1.refresh()540        assert postgresql1.description == TESTDESCRIPTION541        postgresql2 = hc1.postgresql542        postgresql2.load(partition='Common', name='postgresqltest')543        assert postgresql2.selfLink == postgresql1.selfLink544# End PostGRESQL545# Begin Radius546def setup_radius_test(request, bigip, partition, name):547    def teardown():548        delete_resource(hc1)549    request.addfinalizer(teardown)550    hc1 = bigip.ltm.monitor.radius_s551    radius1 = hc1.radius552    creation_params = {'name': name,553                       'partition': partition}554    radius1.create(**creation_params)555    return radius1, hc1556class TestMonitorRadius(object):557    def test_radius_create_refresh_update_delete_load(self, request, bigip):558        radius1, hc1 =\559            setup_radius_test(request, bigip, 'Common', 'radiustest')560        assert radius1.name == 'radiustest'561        radius1.description = TESTDESCRIPTION562        radius1.update()563        assert radius1.description == TESTDESCRIPTION564        radius1.description = ''565        radius1.refresh()566        assert radius1.description == TESTDESCRIPTION567        radius2 = hc1.radius568        radius2.load(partition='Common', name='radiustest')569        assert radius2.selfLink == radius1.selfLink570# End Radius571# Begin Radius_Accounting572def setup_radius_accounting_test(request, bigip, partition, name):573    def teardown():574        delete_resource(hc1)575    request.addfinalizer(teardown)576    hc1 = bigip.ltm.monitor.radius_accountings577    radius_accounting1 = hc1.radius_accounting578    creation_params = {'name': name,579                       'partition': partition}580    radius_accounting1.create(**creation_params)581    return radius_accounting1, hc1582class TestMonitorRadius_Accounting(object):583    def test_radius_accounting_create_refresh_update_delete_load(self, request,584                                                                 bigip):585        radius_accounting1, hc1 =\586            setup_radius_accounting_test(request,587                                         bigip,588                                         'Common',589                                         'radius_accountingtest')590        assert radius_accounting1.name == 'radius_accountingtest'591        radius_accounting1.description = TESTDESCRIPTION592        radius_accounting1.update()593        assert radius_accounting1.description == TESTDESCRIPTION594        radius_accounting1.description = ''595        radius_accounting1.refresh()596        assert radius_accounting1.description == TESTDESCRIPTION597        radius_accounting2 = hc1.radius_accounting598        radius_accounting2.load(partition='Common',599                                name='radius_accountingtest')600        assert radius_accounting2.selfLink == radius_accounting1.selfLink601# End Radius_Accounting602# Begin Real_Server603def setup_real_server_test(request, bigip, partition, name):604    def teardown():605        delete_resource(hc1)606    request.addfinalizer(teardown)607    hc1 = bigip.ltm.monitor.real_servers608    real_server1 = hc1.real_server609    creation_params = {'name': name,610                       'partition': partition}611    real_server1.create(**creation_params)612    return real_server1, hc1613class TestMonitorReal_Server(object):614    def test_real_server_create_refresh_update_delete_load(self, request,615                                                           bigip):616        real_server1, hc1 =\617            setup_real_server_test(request, bigip, 'Common', 'real_servertest')618        assert real_server1.name == 'real_servertest'619        real_server1.description = TESTDESCRIPTION620        real_server1.update()621        assert real_server1.description == TESTDESCRIPTION622        real_server1.description = ''623        real_server1.refresh()624        assert real_server1.description == TESTDESCRIPTION625        real_server2 = hc1.real_server626        real_server2.load(partition='Common', name='real_servertest')627        assert real_server2.selfLink == real_server1.selfLink628# End Real_Server629# Begin RPC630def setup_rpc_test(request, bigip, partition, name):631    def teardown():632        delete_resource(hc1)633    request.addfinalizer(teardown)634    hc1 = bigip.ltm.monitor.rpcs635    rpc1 = hc1.rpc636    creation_params = {'name': name,637                       'partition': partition}638    rpc1.create(**creation_params)639    return rpc1, hc1640class TestMonitorRPC(object):641    def test_rpc_create_refresh_update_delete_load(self, request, bigip):642        rpc1, hc1 = setup_rpc_test(request, bigip, 'Common', 'rpctest')643        assert rpc1.name == 'rpctest'644        rpc1.description = TESTDESCRIPTION645        rpc1.update()646        assert rpc1.description == TESTDESCRIPTION647        rpc1.description = ''648        rpc1.refresh()649        assert rpc1.description == TESTDESCRIPTION650        rpc2 = hc1.rpc651        rpc2.load(partition='Common', name='rpctest')652        assert rpc2.selfLink == rpc1.selfLink653# End RPC654# Begin SASP655def setup_sasp_test(request, bigip, partition, name, primaryAddress='1.1.1.1'):656    def teardown():657        delete_resource(hc1)658    request.addfinalizer(teardown)659    hc1 = bigip.ltm.monitor.sasps660    sasp1 = hc1.sasp661    creation_params = {'name': name,662                       'partition': partition,663                       'primaryAddress': primaryAddress}664    sasp1.create(**creation_params)665    return sasp1, hc1666class TestMonitorSASP(object):667    def test_sasp_create_refresh_update_delete_load(self, request, bigip):668        sasp1, hc1 = setup_sasp_test(request, bigip, 'Common', 'sasptest')669        assert sasp1.name == 'sasptest'670        sasp1.description = TESTDESCRIPTION671        pp(sasp1.__dict__)672        sasp1.update()673        assert sasp1.description == TESTDESCRIPTION674        sasp1.description = ''675        sasp1.refresh()676        assert sasp1.description == TESTDESCRIPTION677        sasp2 = hc1.sasp678        sasp2.load(partition='Common', name='sasptest')679        assert sasp2.selfLink == sasp1.selfLink680# End SASP681# Begin Scripted682def setup_scripted_test(request, bigip, partition, name):683    def teardown():684        delete_resource(hc1)685    request.addfinalizer(teardown)686    hc1 = bigip.ltm.monitor.scripteds687    scripted1 = hc1.scripted688    creation_params = {'name': name,689                       'partition': partition}690    scripted1.create(**creation_params)691    return scripted1, hc1692class TestMonitorScripted(object):693    def test_scripted_create_refresh_update_delete_load(self, request, bigip):694        scripted1, hc1 =\695            setup_scripted_test(request, bigip, 'Common', 'scriptedtest')696        assert scripted1.name == 'scriptedtest'697        scripted1.description = TESTDESCRIPTION698        scripted1.update()699        assert scripted1.description == TESTDESCRIPTION700        scripted1.description = ''701        scripted1.refresh()702        assert scripted1.description == TESTDESCRIPTION703        scripted2 = hc1.scripted704        scripted2.load(partition='Common', name='scriptedtest')705        assert scripted2.selfLink == scripted1.selfLink706# End Scripted707# Begin SIP708def setup_sip_test(request, bigip, partition, name):709    def teardown():710        delete_resource(hc1)711    request.addfinalizer(teardown)712    hc1 = bigip.ltm.monitor.sips713    sip1 = hc1.sip714    creation_params = {'name': name,715                       'partition': partition}716    sip1.create(**creation_params)717    return sip1, hc1718class TestMonitorSIP(object):719    def test_sip_create_refresh_update_delete_load(self, request, bigip):720        sip1, hc1 = setup_sip_test(request, bigip, 'Common', 'siptest')721        assert sip1.name == 'siptest'722        sip1.description = TESTDESCRIPTION723        sip1.update()724        assert sip1.description == TESTDESCRIPTION725        sip1.description = ''726        sip1.refresh()727        assert sip1.description == TESTDESCRIPTION728        sip2 = hc1.sip729        sip2.load(partition='Common', name='siptest')730        assert sip2.selfLink == sip1.selfLink731# End SIP732# Begin SMB733def setup_smb_test(request, bigip, partition, name):734    def teardown():735        delete_resource(hc1)736    request.addfinalizer(teardown)737    hc1 = bigip.ltm.monitor.smbs738    smb1 = hc1.smb739    creation_params = {'name': name,740                       'partition': partition}741    smb1.create(**creation_params)742    return smb1, hc1743class TestMonitorSMB(object):744    def test_smb_create_refresh_update_delete_load(self, request, bigip):745        smb1, hc1 = setup_smb_test(request, bigip, 'Common', 'smbtest')746        assert smb1.name == 'smbtest'747        smb1.description = TESTDESCRIPTION748        smb1.update()749        assert smb1.description == TESTDESCRIPTION750        smb1.description = ''751        smb1.refresh()752        assert smb1.description == TESTDESCRIPTION753        smb2 = hc1.smb754        smb2.load(partition='Common', name='smbtest')755        assert smb2.selfLink == smb1.selfLink756# End SMB757# Begin SMTP758def setup_smtp_test(request, bigip, partition, name):759    def teardown():760        delete_resource(hc1)761    request.addfinalizer(teardown)762    hc1 = bigip.ltm.monitor.smtps763    smtp1 = hc1.smtp764    creation_params = {'name': name,765                       'partition': partition}766    smtp1.create(**creation_params)767    return smtp1, hc1768class TestMonitorSMTP(object):769    def test_smtp_create_refresh_update_delete_load(self, request, bigip):770        smtp1, hc1 = setup_smtp_test(request, bigip, 'Common', 'smtptest')771        assert smtp1.name == 'smtptest'772        smtp1.description = TESTDESCRIPTION773        smtp1.update()774        assert smtp1.description == TESTDESCRIPTION775        smtp1.description = ''776        smtp1.refresh()777        assert smtp1.description == TESTDESCRIPTION778        smtp2 = hc1.smtp779        smtp2.load(partition='Common', name='smtptest')780        assert smtp2.selfLink == smtp1.selfLink781# End SMTP782# Begin SNMP_DCA783def setup_snmp_dca_test(request, bigip, partition, name):784    def teardown():785        delete_resource(hc1)786    request.addfinalizer(teardown)787    hc1 = bigip.ltm.monitor.snmp_dcas788    snmp_dca1 = hc1.snmp_dca789    creation_params = {'name': name,790                       'partition': partition}791    snmp_dca1.create(**creation_params)792    return snmp_dca1, hc1793class TestMonitorSNMP_DCA(object):794    def test_snmp_dca_create_refresh_update_delete_load(self, request, bigip):795        snmp_dca1, hc1 =\796            setup_snmp_dca_test(request, bigip, 'Common', 'snmp_dcatest')797        assert snmp_dca1.name == 'snmp_dcatest'798        snmp_dca1.description = TESTDESCRIPTION799        snmp_dca1.update()800        assert snmp_dca1.description == TESTDESCRIPTION801        snmp_dca1.description = ''802        snmp_dca1.refresh()803        assert snmp_dca1.description == TESTDESCRIPTION804        snmp_dca2 = hc1.snmp_dca805        snmp_dca2.load(partition='Common', name='snmp_dcatest')806        assert snmp_dca2.selfLink == snmp_dca1.selfLink807# End SNMP_DCA808# Begin SNMP_DCA_Base809def setup_snmp_dca_base_test(request, bigip, partition, name):810    def teardown():811        delete_resource(hc1)812    request.addfinalizer(teardown)813    hc1 = bigip.ltm.monitor.snmp_dca_bases814    snmp_dca_base1 = hc1.snmp_dca_base815    creation_params = {'name': name,816                       'partition': partition}817    snmp_dca_base1.create(**creation_params)818    return snmp_dca_base1, hc1819class TestMonitorSNMP_DCA_Base(object):820    def test_snmp_dca_base_create_refresh_update_delete_load(self, request,821                                                             bigip):822        snmp_dca_base1, hc1 =\823            setup_snmp_dca_base_test(request,824                                     bigip,825                                     'Common',826                                     'snmp_dca_basetest')827        assert snmp_dca_base1.name == 'snmp_dca_basetest'828        snmp_dca_base1.description = TESTDESCRIPTION829        snmp_dca_base1.update()830        assert snmp_dca_base1.description == TESTDESCRIPTION831        snmp_dca_base1.description = ''832        snmp_dca_base1.refresh()833        assert snmp_dca_base1.description == TESTDESCRIPTION834        snmp_dca_base2 = hc1.snmp_dca_base835        snmp_dca_base2.load(partition='Common', name='snmp_dca_basetest')836        assert snmp_dca_base2.selfLink == snmp_dca_base1.selfLink837# End SNMP_DCA_Base838# Begin SOAP839def setup_soap_test(request, bigip, partition, name):840    def teardown():841        delete_resource(hc1)842    request.addfinalizer(teardown)843    hc1 = bigip.ltm.monitor.soaps844    soap1 = hc1.soap845    creation_params = {'name': name,846                       'partition': partition}847    soap1.create(**creation_params)848    return soap1, hc1849class TestMonitorSOAP(object):850    def test_soap_create_refresh_update_delete_load(self, request, bigip):851        soap1, hc1 = setup_soap_test(request, bigip, 'Common', 'soaptest')852        assert soap1.name == 'soaptest'853        soap1.description = TESTDESCRIPTION854        soap1.update()855        assert soap1.description == TESTDESCRIPTION856        soap1.description = ''857        soap1.refresh()858        assert soap1.description == TESTDESCRIPTION859        soap2 = hc1.soap860        soap2.load(partition='Common', name='soaptest')861        assert soap2.selfLink == soap1.selfLink862# End SOAP863# Begin TCP864def setup_tcp_test(request, bigip, partition, name):865    def teardown():866        delete_resource(hc1)867    request.addfinalizer(teardown)868    hc1 = bigip.ltm.monitor.tcps869    tcp1 = hc1.tcp870    creation_params = {'name': name,871                       'partition': partition}872    tcp1.create(**creation_params)873    return tcp1, hc1874class TestMonitorTCP(object):875    def test_tcp_create_refresh_update_delete_load(self, request, bigip):876        tcp1, hc1 = setup_tcp_test(request, bigip, 'Common', 'tcptest')877        assert tcp1.name == 'tcptest'878        tcp1.description = TESTDESCRIPTION879        tcp1.update()880        assert tcp1.description == TESTDESCRIPTION881        tcp1.description = ''882        tcp1.refresh()883        assert tcp1.description == TESTDESCRIPTION884        tcp2 = hc1.tcp885        tcp2.load(partition='Common', name='tcptest')886        assert tcp2.selfLink == tcp1.selfLink887# End TCP888# Begin TCP_Echo889def setup_tcp_echo_test(request, bigip, partition, name):890    def teardown():891        delete_resource(hc1)892    request.addfinalizer(teardown)893    hc1 = bigip.ltm.monitor.tcp_echos894    tcp_echo1 = hc1.tcp_echo895    creation_params = {'name': name,896                       'partition': partition}897    tcp_echo1.create(**creation_params)898    return tcp_echo1, hc1899class TestMonitorTCP_Echo(object):900    def test_tcp_echo_create_refresh_update_delete_load(self, request, bigip):901        tcp_echo1, hc1 =\902            setup_tcp_echo_test(request, bigip, 'Common', 'tcp_echotest')903        assert tcp_echo1.name == 'tcp_echotest'904        tcp_echo1.description = TESTDESCRIPTION905        tcp_echo1.update()906        assert tcp_echo1.description == TESTDESCRIPTION907        tcp_echo1.description = ''908        tcp_echo1.refresh()909        assert tcp_echo1.description == TESTDESCRIPTION910        tcp_echo2 = hc1.tcp_echo911        tcp_echo2.load(partition='Common', name='tcp_echotest')912        assert tcp_echo2.selfLink == tcp_echo1.selfLink913# End TCP_Echo914# Begin TCP_Half_Open915def setup_tcp_half_open_test(request, bigip, partition, name):916    def teardown():917        delete_resource(hc1)918    request.addfinalizer(teardown)919    hc1 = bigip.ltm.monitor.tcp_half_opens920    tcp_half_open1 = hc1.tcp_half_open921    creation_params = {'name': name,922                       'partition': partition}923    tcp_half_open1.create(**creation_params)924    return tcp_half_open1, hc1925class TestMonitorTCP_Half_Open(object):926    def test_tcp_half_open_create_refresh_update_delete_load(self, request,927                                                             bigip):928        tcp_half_open1, hc1 =\929            setup_tcp_half_open_test(request,930                                     bigip,931                                     'Common',932                                     'tcp_half_opentest')933        assert tcp_half_open1.name == 'tcp_half_opentest'934        tcp_half_open1.description = TESTDESCRIPTION935        tcp_half_open1.update()936        assert tcp_half_open1.description == TESTDESCRIPTION937        tcp_half_open1.description = ''938        tcp_half_open1.refresh()939        assert tcp_half_open1.description == TESTDESCRIPTION940        tcp_half_open2 = hc1.tcp_half_open941        tcp_half_open2.load(partition='Common', name='tcp_half_opentest')942        assert tcp_half_open2.selfLink == tcp_half_open1.selfLink943# End TCP_Half_Open944# Begin UDP945def setup_udp_test(request, bigip, partition, name):946    def teardown():947        delete_resource(hc1)948    request.addfinalizer(teardown)949    hc1 = bigip.ltm.monitor.udps950    udp1 = hc1.udp951    creation_params = {'name': name,952                       'partition': partition}953    udp1.create(**creation_params)954    return udp1, hc1955class TestMonitorUDP(object):956    def test_udp_create_refresh_update_delete_load(self, request, bigip):957        udp1, hc1 = setup_udp_test(request, bigip, 'Common', 'udptest')958        assert udp1.name == 'udptest'959        udp1.description = TESTDESCRIPTION960        udp1.update()961        assert udp1.description == TESTDESCRIPTION962        udp1.description = ''963        udp1.refresh()964        assert udp1.description == TESTDESCRIPTION965        udp2 = hc1.udp966        udp2.load(partition='Common', name='udptest')967        assert udp2.selfLink == udp1.selfLink968# End UDP969# Begin Virtual_Location970def setup_virtual_location_test(request, bigip, partition, name, pool='tp1'):971    def teardown():972        delete_resource(hc1)973    request.addfinalizer(teardown)974    hc1 = bigip.ltm.monitor.virtual_locations975    virtual_location1 = hc1.virtual_location976    creation_params = {'name': name,977                       'partition': partition,978                       'pool': pool}979    virtual_location1.create(**creation_params)980    return virtual_location1, hc1981class TestMonitorVirtual_Location(object):982    def test_virtual_location_create_refresh_update_delete_load(self, request,983                                                                bigip):984        virtual_location1, hc1 =\985            setup_virtual_location_test(request,986                                        bigip,987                                        'Common',988                                        'virtual_locationtest')989        assert virtual_location1.name == 'virtual_locationtest'990        virtual_location1.description = TESTDESCRIPTION991        virtual_location1.update()992        assert virtual_location1.description == TESTDESCRIPTION993        virtual_location1.description = ''994        virtual_location1.refresh()995        assert virtual_location1.description == TESTDESCRIPTION996        virtual_location2 = hc1.virtual_location997        virtual_location2.load(partition='Common', name='virtual_locationtest')998        assert virtual_location2.selfLink == virtual_location1.selfLink999# End Virtual_Location1000# Begin WAP1001def setup_wap_test(request, bigip, partition, name):1002    def teardown():1003        delete_resource(hc1)1004    request.addfinalizer(teardown)1005    hc1 = bigip.ltm.monitor.waps1006    wap1 = hc1.wap1007    creation_params = {'name': name,1008                       'partition': partition}1009    wap1.create(**creation_params)1010    return wap1, hc11011class TestMonitorWAP(object):1012    def test_wap_create_refresh_update_delete_load(self, request, bigip):1013        wap1, hc1 = setup_wap_test(request, bigip, 'Common', 'waptest')1014        assert wap1.name == 'waptest'1015        wap1.description = TESTDESCRIPTION1016        wap1.update()1017        assert wap1.description == TESTDESCRIPTION1018        wap1.description = ''1019        wap1.refresh()1020        assert wap1.description == TESTDESCRIPTION1021        wap2 = hc1.wap1022        wap2.load(partition='Common', name='waptest')1023        assert wap2.selfLink == wap1.selfLink1024# End WAP1025# Begin WMI1026def setup_wmi_test(request, bigip, partition, name):1027    def teardown():1028        delete_resource(hc1)1029    request.addfinalizer(teardown)1030    hc1 = bigip.ltm.monitor.wmis1031    wmi1 = hc1.wmi1032    creation_params = {'name': name,1033                       'partition': partition}1034    wmi1.create(**creation_params)1035    return wmi1, hc11036class TestMonitorWMI(object):1037    def test_wmi_create_refresh_update_delete_load(self, request, bigip):1038        wmi1, hc1 = setup_wmi_test(request, bigip, 'Common', 'wmitest')1039        assert wmi1.name == 'wmitest'1040        wmi1.description = TESTDESCRIPTION1041        wmi1.update()1042        assert wmi1.description == TESTDESCRIPTION...test_scenario.py
Source:test_scenario.py  
1#    Licensed under the Apache License, Version 2.0 (the "License"); you may2#    not use this file except in compliance with the License. You may obtain3#    a copy of the License at4#5#         http://www.apache.org/licenses/LICENSE-2.06#7#    Unless required by applicable law or agreed to in writing, software8#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10#    License for the specific language governing permissions and limitations11#    under the License.12import testtools13from tempest import config14from tempest.lib import decorators15from sahara_tempest_plugin.tests.cli import clusters16from sahara_tempest_plugin.tests.cli import cluster_templates17from sahara_tempest_plugin.tests.cli import images18from sahara_tempest_plugin.tests.cli import node_group_templates19from sahara_tempest_plugin.tests.cli import plugins20from sahara_tempest_plugin.tests.cli import job_binaries21from sahara_tempest_plugin.tests.cli import jobs22from sahara_tempest_plugin.tests.cli import job_templates23from sahara_tempest_plugin.tests.cli import data_sources24from sahara_tempest_plugin.tests.cli import job_types25TEMPEST_CONF = config.CONF26NODE_GROUP_TEMPLATE = 'node group template'27class Scenario(images.SaharaImageCLITest,28               node_group_templates.SaharaNodeGroupCLITest,29               cluster_templates.SaharaClusterTemplateCLITest,30               clusters.SaharaClusterCLITest,31               plugins.SaharaPluginCLITest,32               job_binaries.SaharaJobBinaryCLITest,33               jobs.SaharaJobCLITest,34               job_templates.SaharaJobTemplateCLITest,35               data_sources.SaharaDataSourceCLITest,36               job_types.SaharaJobTypeCLITest):37    def test_plugin_cli(self):38        self.openstack_plugin_list()39        self.openstack_plugin_show()40        self.openstack_plugin_configs_get()41        if TEMPEST_CONF.data_processing.plugin_update_support:42            self.openstack_plugin_update()43    def test_node_group_cli(self):44        master_ngt = self.openstack_node_group_template_create('master', '4')45        worker_ngt = self.openstack_node_group_template_create('worker', '3')46        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, master_ngt)47        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE, worker_ngt)48        self.filter_node_group_list_with_plugin()49        self.openstack_node_group_template_list()50        new_master_ngt = self.openstack_node_group_template_update(51            master_ngt, update_field='name')52        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE,53                        new_master_ngt)54        self.openstack_node_group_template_show(new_master_ngt)55        self.openstack_node_group_template_delete(new_master_ngt)56        self.negative_try_to_delete_protected_node_group(worker_ngt)57        self.openstack_node_group_template_delete(worker_ngt)58        self.wait_for_resource_deletion(new_master_ngt, NODE_GROUP_TEMPLATE)59        self.wait_for_resource_deletion(worker_ngt, NODE_GROUP_TEMPLATE)60        self.negative_delete_removed_node_group(worker_ngt)61    def test_cluster_template_cli(self):62        cluster_template_cmd = 'cluster template'63        ng_master = (64            self.openstack_node_group_template_create('tmp-master', '4'))65        ng_worker = (66            self.openstack_node_group_template_create('tmp-worker', '3'))67        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE,68                        ng_master)69        self.addCleanup(self.delete_resource, NODE_GROUP_TEMPLATE,70                        ng_worker)71        cluster_template_name = (72            self.openstack_cluster_template_create(ng_master, ng_worker))73        self.addCleanup(self.delete_resource, cluster_template_cmd,74                        cluster_template_name)75        self.openstack_cluster_template_list()76        self.openstack_cluster_template_show(cluster_template_name)77        new_cluster_template_name = self.openstack_cluster_template_update(78            cluster_template_name)79        self.addCleanup(self.delete_resource, cluster_template_cmd,80                        new_cluster_template_name)81        self.openstack_cluster_template_delete(new_cluster_template_name)82        self.wait_for_resource_deletion(new_cluster_template_name,83                                        cluster_template_cmd)84        self.openstack_node_group_template_delete(ng_master)85        self.openstack_node_group_template_delete(ng_worker)86        self.wait_for_resource_deletion(ng_master, NODE_GROUP_TEMPLATE)87        self.wait_for_resource_deletion(ng_worker, NODE_GROUP_TEMPLATE)88    @decorators.skip_because(bug="1629295")89    def test_cluster_cli(self):90        image_name = self.openstack_image_register(91            TEMPEST_CONF.data_processing.test_image_name,92            TEMPEST_CONF.data_processing.test_ssh_user)93        self.openstack_image_tags_set(image_name)94        self.openstack_image_tags_remove(image_name)95        self.openstack_image_tags_add(image_name)96        self.openstack_image_show(image_name)97        self.openstack_image_list()98        flavors_client = self.client_manager_admin.flavors_client99        flavor_ref = flavors_client.create_flavor(name='sahara-flavor',100                                                  ram=512, vcpus=1, disk=4,101                                                  id=20)['flavor']102        self.addCleanup(flavors_client.delete_flavor, flavor_ref['id'])103        self.addCleanup(self.openstack_image_unregister, image_name)104        ng_master = self.openstack_node_group_template_create('cli-cluster'105                                                              '-master',106                                                              'sahara-flavor')107        ng_worker = self.openstack_node_group_template_create('cli-cluster'108                                                              '-worker',109                                                              'sahara-flavor')110        self.addCleanup(self.delete_resource, 'node group template',111                        ng_master)112        self.addCleanup(self.delete_resource, 'node group template',113                        ng_worker)114        cluster_template_name = (115            self.openstack_cluster_template_create(ng_master, ng_worker))116        self.addCleanup(self.delete_resource, 'cluster template',117                        cluster_template_name)118        cluster_name = (119            self.openstack_cluster_create(cluster_template_name, image_name))120        self.addCleanup(self.delete_resource, 'cluster',121                        cluster_name)122        self._run_job_on_cluster(cluster_name)123        self.openstack_cluster_list()124        self.openstack_cluster_show(cluster_name)125        self.openstack_cluster_update(cluster_name)126        self.openstack_cluster_verification_show(cluster_name)127        self.openstack_cluster_verification_start(cluster_name)128        self.openstack_cluster_scale(cluster_name, ng_worker)129        self.openstack_cluster_delete(cluster_name)130        self.wait_for_resource_deletion(cluster_name, 'cluster')131        self.openstack_cluster_template_delete(cluster_template_name)132        self.wait_for_resource_deletion(cluster_template_name, 'cluster '133                                                               'template')134        self.openstack_node_group_template_delete(ng_master)135        self.openstack_node_group_template_delete(ng_worker)136        self.wait_for_resource_deletion(ng_master, 'node group template')137        self.wait_for_resource_deletion(ng_worker, 'node group template')138        self.openstack_image_unregister(image_name)139        self.negative_unregister_not_existing_image(image_name)140    @testtools.skipIf(TEMPEST_CONF.data_processing.api_version_saharaclient !=141                      '1.1', "Full job binaries testing requires API v1.1")142    def test_job_binary_cli(self):143        job_binary_name, original_file = self.openstack_job_binary_create()144        self.addCleanup(self.delete_resource, 'job binary', job_binary_name)145        self.openstack_job_binary_list()146        self.openstack_job_binary_show(job_binary_name)147        self.openstack_job_binary_update(job_binary_name, flag='description')148        self.openstack_job_binary_download(job_binary_name, original_file)149        self.filter_job_binaries_in_list()150        self.negative_try_to_update_protected_jb(job_binary_name)151        self.openstack_job_binary_update(job_binary_name, flag='unprotected')152        self.openstack_job_binary_delete(job_binary_name)153        self.negative_delete_removed_job_binary(job_binary_name)154    def test_job_template_cli(self):155        job_binary_name, _ = self.openstack_job_binary_create(156            job_internal=False)157        self.addCleanup(self.delete_resource, 'job binary', job_binary_name)158        job_template_name = self.openstack_job_template_create(job_binary_name)159        self.addCleanup(self.delete_resource, 'job template',160                        job_template_name)161        self.openstack_job_template_list()162        self.openstack_job_template_show(job_template_name)163        self.openstack_job_template_update(job_template_name)164        self.openstack_job_template_delete(job_template_name)165        self.openstack_job_binary_delete(job_binary_name)166    def test_data_source_cli(self):167        data_source_name = self.openstack_data_source_create()168        self.addCleanup(self.delete_resource, 'data source', data_source_name)169        self.openstack_data_source_list()170        self.openstack_data_source_show(data_source_name)171        self.openstack_data_source_update(data_source_name)172        self.openstack_data_source_delete(data_source_name)173    def test_job_type_cli(self):174        self.openstack_job_type_list()175        self.openstack_job_type_configs_get(flag='file')176        self.filter_job_type_in_list()177    def _run_job_on_cluster(self, cluster_name):178        job_template_name = self.openstack_job_template_name()179        self.addCleanup(self.delete_resource, 'job template',180                        job_template_name)181        input_file = self.openstack_data_source_create()182        output_file = self.openstack_data_source_create()183        self.addCleanup(self.delete_resource, 'data source', input_file)184        self.addCleanup(self.delete_resource, 'data source', output_file)185        job_id = self.openstack_job_execute(cluster_name, job_template_name,186                                            input_file, output_file)187        self.addCleanup(self.delete_resource, 'job', job_id)188        self.openstack_job_list()189        self.openstack_job_show(job_id)190        self.openstack_job_update(job_id)191        self.openstack_job_delete(job_id)192        self.openstack_data_source_delete(input_file)193        self.openstack_data_source_delete(output_file)...clients.py
Source:clients.py  
...24from tempest_lib import exceptions as exc25class Client(object):26    def is_resource_deleted(self, method, *args, **kwargs):27        raise NotImplementedError28    def delete_resource(self, method, *args, **kwargs):29        # TODO(sreshetniak): make timeout configurable30        with fixtures.Timeout(300, gentle=True):31            while True:32                if self.is_resource_deleted(method, *args, **kwargs):33                    break34                time.sleep(5)35class SaharaClient(Client):36    def __init__(self, *args, **kwargs):37        self.sahara_client = sahara_client.Client('1.1', *args, **kwargs)38    def create_node_group_template(self, *args, **kwargs):39        data = self.sahara_client.node_group_templates.create(*args, **kwargs)40        return data.id41    def delete_node_group_template(self, node_group_template_id):42        return self.delete_resource(43            self.sahara_client.node_group_templates.delete,44            node_group_template_id)45    def create_cluster_template(self, *args, **kwargs):46        data = self.sahara_client.cluster_templates.create(*args, **kwargs)47        return data.id48    def delete_cluster_template(self, cluster_template_id):49        return self.delete_resource(50            self.sahara_client.cluster_templates.delete,51            cluster_template_id)52    def create_cluster(self, *args, **kwargs):53        data = self.sahara_client.clusters.create(*args, **kwargs)54        return data.id55    def delete_cluster(self, cluster_id):56        return self.delete_resource(57            self.sahara_client.clusters.delete,58            cluster_id)59    def scale_cluster(self, cluster_id, body):60        return self.sahara_client.clusters.scale(cluster_id, body)61    def create_datasource(self, *args, **kwargs):62        data = self.sahara_client.data_sources.create(*args, **kwargs)63        return data.id64    def delete_datasource(self, datasource_id):65        return self.delete_resource(66            self.sahara_client.data_sources.delete,67            datasource_id)68    def create_job_binary_internal(self, *args, **kwargs):69        data = self.sahara_client.job_binary_internals.create(*args, **kwargs)70        return data.id71    def delete_job_binary_internal(self, job_binary_internal_id):72        return self.delete_resource(73            self.sahara_client.job_binary_internals.delete,74            job_binary_internal_id)75    def create_job_binary(self, *args, **kwargs):76        data = self.sahara_client.job_binaries.create(*args, **kwargs)77        return data.id78    def delete_job_binary(self, job_binary_id):79        return self.delete_resource(80            self.sahara_client.job_binaries.delete,81            job_binary_id)82    def create_job(self, *args, **kwargs):83        data = self.sahara_client.jobs.create(*args, **kwargs)84        return data.id85    def delete_job(self, job_id):86        return self.delete_resource(87            self.sahara_client.jobs.delete,88            job_id)89    def run_job(self, *args, **kwargs):90        data = self.sahara_client.job_executions.create(*args, **kwargs)91        return data.id92    def delete_job_execution(self, job_execution_id):93        return self.delete_resource(94            self.sahara_client.job_executions.delete,95            job_execution_id)96    def get_cluster_status(self, cluster_id):97        data = self.sahara_client.clusters.get(cluster_id)98        return str(data.status)99    def get_job_status(self, exec_id):100        data = self.sahara_client.job_executions.get(exec_id)101        return str(data.info['status'])102    def is_resource_deleted(self, method, *args, **kwargs):103        try:104            method(*args, **kwargs)105        except saharaclient_base.APIException as ex:106            return ex.error_code == 404107        return False108class NovaClient(Client):109    def __init__(self, *args, **kwargs):110        self.nova_client = nova_client.Client('1.1', *args, **kwargs)111    def get_image_id(self, image_name):112        if uuidutils.is_uuid_like(image_name):113            return image_name114        for image in self.nova_client.images.list():115            if image.name == image_name:116                return image.id117        raise exc.NotFound(image_name)118class NeutronClient(Client):119    def __init__(self, *args, **kwargs):120        self.neutron_client = neutron_client.Client('2.0', *args, **kwargs)121    def get_network_id(self, network_name):122        if uuidutils.is_uuid_like(network_name):123            return network_name124        networks = self.neutron_client.list_networks(name=network_name)125        networks = networks['networks']126        if len(networks) < 1:127            raise exc.NotFound(network_name)128        return networks[0]['id']129class SwiftClient(Client):130    def __init__(self, *args, **kwargs):131        self.swift_client = swift_client.Connection(auth_version='2.0',132                                                    *args, **kwargs)133    def create_container(self, container_name):134        return self.swift_client.put_container(container_name)135    def delete_container(self, container_name):136        return self.delete_resource(137            self.swift_client.delete_container,138            container_name)139    def upload_data(self, container_name, object_name, data):140        return self.swift_client.put_object(container_name, object_name, data)141    def delete_object(self, container_name, object_name):142        return self.delete_resource(143            self.swift_client.delete_object,144            container_name,145            object_name)146    def is_resource_deleted(self, method, *args, **kwargs):147        try:148            method(*args, **kwargs)149        except swift_exc.ClientException as ex:150            return ex.http_status == 404...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
