Best Python code snippet using tempest_python
test_cleanup_services.py
Source:test_cleanup_services.py  
...87                mocked_func = self.useFixture(fixtures.MockPatch(88                    func, return_value=mocked_response))89            mocked_fixtures.append(mocked_func)90        return mocked_fixtures91    def run_function_with_mocks(self, function_to_run, functions_to_mock):92        """Mock a service client function for testing.93        :param function_to_run: The service client function to call.94        :param functions_to_mock: a list of tuples containing the function95               to mock, the response body, and the response status.96               EX:97               ('tempest.lib.common.rest_client.RestClient.get',98                {'users': ['']},99                200)100        """101        mocked_fixtures = self._create_fixtures(functions_to_mock)102        func_return = function_to_run()103        return func_return, mocked_fixtures104class BaseCmdServiceTests(MockFunctionsBase):105    def setUp(self):106        super(BaseCmdServiceTests, self).setUp()107        self.useFixture(fake_config.ConfigFixture())108        self.patchobject(config, 'TempestConfigPrivate',109                         fake_config.FakePrivate)110        self.useFixture(fixtures.MockPatch(111            'tempest.cmd.cleanup_service._get_network_id',112            return_value=''))113        cleanup_service.init_conf()114        self.conf_values = {"flavors": cleanup_service.CONF_FLAVORS[0],115                            "images": cleanup_service.CONF_IMAGES[0],116                            "projects": cleanup_service.CONF_PROJECTS[0],117                            "users": cleanup_service.CONF_USERS[0],118                            "networks": cleanup_service.CONF_PUB_NETWORK,119                            "security_groups":120                                cleanup_service.CONF_PROJECTS[0],121                            "ports": cleanup_service.CONF_PUB_NETWORK,122                            "routers": cleanup_service.CONF_PUB_ROUTER,123                            "subnetpools": cleanup_service.CONF_PROJECTS[0],124                            }125    saved_state = {126        # Static list to ensure global service saved items are not deleted127        "users": {u'32rwef64245tgr20121qw324bgg': u'Lightning'},128        "flavors": {u'42': u'm1.tiny'},129        "images": {u'34yhwr-4t3q': u'stratus-0.3.2-x86_64-disk'},130        "roles": {u'3efrt74r45hn': u'president'},131        "projects": {u'f38ohgp93jj032': u'manhattan'},132        "domains": {u'default': u'Default'},133        # Static list to ensure project service saved items are not deleted134        "snapshots": {u'1ad4c789-7e8w-4dwg-afc5': u'saved-snapshot'},135        "servers": {u'7a6d4v7w-36ds-4216': u'saved-server'},136        "server_groups": {u'as6d5f7g-46ca-475e': u'saved-server-group'},137        "keypairs": {u'saved-key-pair': {138            u'fingerprint': u'7e:eb:ab:24',139            u'name': u'saved-key-pair'140        }},141        "volumes": {u'aa77asdf-1234': u'saved-volume'},142        "networks": {u'6722fc13-4319': {143            u'id': u'6722fc13-4319',144            u'name': u'saved-network'145        }},146        "floatingips": {u'9e82d248-408a': {147            u'id': u'9e82d248-408a',148            u'status': u'ACTIVE'149        }},150        "routers": {u'4s5w34hj-id44': u'saved-router'},151        "metering_label_rules": {u'93a973ce-4dc5': {152            u'direction': u'ingress',153            u'id': u'93a973ce-4dc5'154        }},155        "metering_labels": {u'723b346ce866-4c7q': u'saved-label'},156        "ports": {u'aa74aa4v-741a': u'saved-port'},157        "security_groups": {u'7q844add-3697': u'saved-sec-group'},158        "subnets": {u'55ttda4a-2584': u'saved-subnet'},159        "subnetpools": {u'8acf64c1-43fc': u'saved-subnet-pool'}160    }161    # Mocked methods162    get_method = 'tempest.lib.common.rest_client.RestClient.get'163    delete_method = 'tempest.lib.common.rest_client.RestClient.delete'164    log_method = 'tempest.cmd.cleanup_service.LOG.exception'165    # Override parameters166    service_class = 'BaseService'167    response = None168    service_name = 'default'169    def _create_cmd_service(self, service_type, is_save_state=False,170                            is_preserve=False, is_dry_run=False):171        creds = fake_credentials.FakeKeystoneV3Credentials()172        os = clients.Manager(creds)173        return getattr(cleanup_service, service_type)(174            os,175            is_save_state=is_save_state,176            is_preserve=is_preserve,177            is_dry_run=is_dry_run,178            data={},179            saved_state_json=self.saved_state180            )181    def _test_delete(self, mocked_fixture_tuple_list, fail=False):182        serv = self._create_cmd_service(self.service_class)183        resp, fixtures = self.run_function_with_mocks(184            serv.run,185            mocked_fixture_tuple_list,186        )187        for fixture in fixtures:188            if fixture.mock.return_value == 'validate':189                fixture.mock.assert_called()190            elif fail is False and fixture.mock.return_value == 'exception':191                fixture.mock.assert_not_called()192            elif self.service_name in self.saved_state.keys():193                fixture.mock.assert_called_once()194                for key in self.saved_state[self.service_name].keys():195                    self.assertNotIn(key, fixture.mock.call_args[0][0])196            else:197                fixture.mock.assert_called_once()198        self.assertFalse(serv.data)199    def _test_dry_run_true(self, mocked_fixture_tuple_list):200        serv = self._create_cmd_service(self.service_class, is_dry_run=True)201        _, fixtures = self.run_function_with_mocks(202            serv.run,203            mocked_fixture_tuple_list204        )205        for fixture in fixtures:206            if fixture.mock.return_value == 'delete':207                fixture.mock.assert_not_called()208            elif self.service_name in self.saved_state.keys():209                fixture.mock.assert_called_once()210                for key in self.saved_state[self.service_name].keys():211                    self.assertNotIn(key, fixture.mock.call_args[0][0])212            else:213                fixture.mock.assert_called_once()214    def _test_saved_state_true(self, mocked_fixture_tuple_list):215        serv = self._create_cmd_service(self.service_class, is_save_state=True)216        _, fixtures = self.run_function_with_mocks(217            serv.run,218            mocked_fixture_tuple_list219        )220        for item in self.response[self.service_name]:221            self.assertIn(item['id'],222                          serv.data[self.service_name])223        for fixture in fixtures:224            fixture.mock.assert_called_once()225    def _test_is_preserve_true(self, mocked_fixture_tuple_list):226        serv = self._create_cmd_service(self.service_class, is_preserve=True)227        resp, fixtures = self.run_function_with_mocks(228            serv.list,229            mocked_fixture_tuple_list230        )231        for fixture in fixtures:232            fixture.mock.assert_called_once()233        self.assertIn(resp[0], self.response[self.service_name])234        for rsp in resp:235            self.assertNotIn(rsp['id'], self.conf_values.values())236            self.assertNotIn(rsp['name'], self.conf_values.values())237class TestSnapshotService(BaseCmdServiceTests):238    service_class = 'SnapshotService'239    service_name = 'snapshots'240    response = {241        "snapshots": [242            {243                "status": "available",244                "metadata": {245                    "name": "test"246                },247                "name": "test-volume-snapshot",248                "user_id": "40c2102f4a554b848d96b14f3eec39ed",249                "volume_id": "173f7b48-c4c1-4e70-9acc-086b39073506",250                "created_at": "2015-11-29T02:25:51.000000",251                "size": 1,252                "updated_at": "2015-11-20T05:36:40.000000",253                "os-extended-snapshot-attributes:progress": "100%",254                "id": "b1323cda-8e4b-41c1-afc5-2fc791809c8c",255                "description": "volume snapshot"256            },257            {258                "status": "available",259                "name": "saved-snapshot",260                "id": "1ad4c789-7e8w-4dwg-afc5",261                "description": "snapshot in saved state"262            }263        ]264    }265    def test_delete_fail(self):266        delete_mock = [(self.get_method, self.response, 200),267                       (self.delete_method, 'error', None),268                       (self.log_method, 'exception', None)]269        self._test_delete(delete_mock, fail=True)270    def test_delete_pass(self):271        delete_mock = [(self.get_method, self.response, 200),272                       (self.delete_method, None, 202),273                       (self.log_method, 'exception', None)]274        self._test_delete(delete_mock)275    def test_dry_run(self):276        dry_mock = [(self.get_method, self.response, 200),277                    (self.delete_method, "delete", None)]278        self._test_dry_run_true(dry_mock)279    def test_save_state(self):280        self._test_saved_state_true([(self.get_method, self.response, 200)])281class TestServerService(BaseCmdServiceTests):282    service_class = 'ServerService'283    service_name = 'servers'284    response = {285        "servers": [286            {287                "id": "22c91117-08de-4894-9aa9-6ef382400985",288                "links": [289                    {290                        "href": "http://openstack.example.com/v2/6f70-6ef0985",291                        "rel": "self"292                    },293                    {294                        "href": "http://openstack.example.com/6f70656e7-6ef35",295                        "rel": "bookmark"296                    }297                ],298                "name": "new-server-test"299            },300            {301                "id": "7a6d4v7w-36ds-4216",302                "links": [303                    {304                        "href": "http://openstack.example.com/v2/6f70-6ef0985",305                        "rel": "self"306                    },307                    {308                        "href": "http://openstack.example.com/6f70656e7-6ef35",309                        "rel": "bookmark"310                    }311                ],312                "name": "saved-server"313            }314        ]315    }316    def test_delete_fail(self):317        delete_mock = [(self.get_method, self.response, 200),318                       (self.delete_method, 'error', None),319                       (self.log_method, 'exception', None)]320        self._test_delete(delete_mock, fail=True)321    def test_delete_pass(self):322        delete_mock = [(self.get_method, self.response, 200),323                       (self.delete_method, None, 204),324                       (self.log_method, 'exception', None)]325        self._test_delete(delete_mock)326    def test_dry_run(self):327        dry_mock = [(self.get_method, self.response, 200),328                    (self.delete_method, "delete", None)]329        self._test_dry_run_true(dry_mock)330    def test_save_state(self):331        self._test_saved_state_true([(self.get_method, self.response, 200)])332class TestServerGroupService(BaseCmdServiceTests):333    service_class = 'ServerGroupService'334    service_name = 'server_groups'335    validate_response = ('tempest.lib.services.compute.server_groups_client'336                         '.ServerGroupsClient.validate_response')337    response = {338        "server_groups": [339            {340                "id": "616fb98f-46ca-475e-917e-2563e5a8cd19",341                "name": "test",342                "policy": "anti-affinity",343                "rules": {"max_server_per_host": 3},344                "members": [],345                "project_id": "6f70656e737461636b20342065766572",346                "user_id": "fake"347            },348            {349                "id": "as6d5f7g-46ca-475e",350                "name": "saved-server-group"351            }352        ]353    }354    def test_delete_fail(self):355        delete_mock = [(self.get_method, self.response, 200),356                       (self.validate_response, 'validate', None),357                       (self.delete_method, 'error', None),358                       (self.log_method, 'exception', None)]359        self._test_delete(delete_mock, fail=True)360    def test_delete_pass(self):361        delete_mock = [(self.get_method, self.response, 200),362                       (self.validate_response, 'validate', None),363                       (self.delete_method, None, 204),364                       (self.log_method, 'exception', None)]365        self._test_delete(delete_mock)366    def test_dry_run(self):367        dry_mock = [(self.get_method, self.response, 200),368                    (self.validate_response, 'validate', None),369                    (self.delete_method, "delete", None)]370        self._test_dry_run_true(dry_mock)371    def test_save_state(self):372        self._test_saved_state_true([(self.get_method, self.response, 200),373                                     (self.validate_response, 'validate', None)374                                     ])375class TestKeyPairService(BaseCmdServiceTests):376    service_class = 'KeyPairService'377    service_name = 'keypairs'378    validate_response = ('tempest.lib.services.compute.keypairs_client'379                         '.KeyPairsClient.validate_response')380    response = {381        "keypairs": [382            {383                "keypair": {384                    "fingerprint": "7e:eb:ab:24:ba:d1:e1:88:ae:9a:fb:66:53:bd",385                    "name": "keypair-5d935425-31d5-48a7-a0f1-e76e9813f2c3",386                    "type": "ssh",387                    "public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCkF\n"388                }389            },390            {391                "keypair": {392                    "fingerprint": "7e:eb:ab:24",393                    "name": "saved-key-pair"394                }395            }396        ]397    }398    def _test_saved_state_true(self, mocked_fixture_tuple_list):399        serv = self._create_cmd_service(self.service_class, is_save_state=True)400        _, fixtures = self.run_function_with_mocks(401            serv.run,402            mocked_fixture_tuple_list403        )404        for item in self.response[self.service_name]:405            self.assertTrue(item['keypair']['name'],406                            serv.data[self.service_name])407        for fixture in fixtures:408            fixture.mock.assert_called_once()409    def test_delete_fail(self):410        delete_mock = [(self.get_method, self.response, 200),411                       (self.validate_response, 'validate', None),412                       (self.delete_method, 'error', None),413                       (self.log_method, 'exception', None)]414        self._test_delete(delete_mock, fail=True)415    def test_delete_pass(self):416        delete_mock = [(self.get_method, self.response, 200),417                       (self.validate_response, 'validate', None),418                       (self.delete_method, None, 204),419                       (self.log_method, 'exception', None)]420        self._test_delete(delete_mock)421    def test_dry_run(self):422        dry_mock = [(self.get_method, self.response, 200),423                    (self.validate_response, 'validate', None),424                    (self.delete_method, "delete", None)]425        self._test_dry_run_true(dry_mock)426    def test_save_state(self):427        self._test_saved_state_true([428            (self.get_method, self.response, 200),429            (self.validate_response, 'validate', None)430        ])431class TestVolumeService(BaseCmdServiceTests):432    service_class = 'VolumeService'433    service_name = 'volumes'434    response = {435        "volumes": [436            {437                "id": "efa54464-8fab-47cd-a05a-be3e6b396188",438                "links": [439                    {440                        "href": "http://127.0.0.1:37097/v3/89af/volumes/efa54",441                        "rel": "self"442                    },443                    {444                        "href": "http://127.0.0.1:37097/89af/volumes/efa54464",445                        "rel": "bookmark"446                    }447                ],448                "name": "volume-name"449            },450            {451                "id": "aa77asdf-1234",452                "name": "saved-volume"453            }454        ]455    }456    def test_delete_fail(self):457        delete_mock = [(self.get_method, self.response, 200),458                       (self.delete_method, 'error', None),459                       (self.log_method, 'exception', None)]460        self._test_delete(delete_mock, fail=True)461    def test_delete_pass(self):462        delete_mock = [(self.get_method, self.response, 200),463                       (self.delete_method, None, 202),464                       (self.log_method, 'exception', None)]465        self._test_delete(delete_mock)466    def test_dry_run(self):467        dry_mock = [(self.get_method, self.response, 200),468                    (self.delete_method, "delete", None)]469        self._test_dry_run_true(dry_mock)470    def test_save_state(self):471        self._test_saved_state_true([(self.get_method, self.response, 200)])472# Begin network service classes473class TestNetworkService(BaseCmdServiceTests):474    service_class = 'NetworkService'475    service_name = 'networks'476    response = {477        "networks": [478            {479                "admin_state_up": True,480                "availability_zone_hints": [],481                "availability_zones": [482                    "nova"483                ],484                "created_at": "2016-03-08T20:19:41",485                "dns_domain": "my-domain.org.",486                "id": "d32019d3-bc6e-4319-9c1d-6722fc136a22",487                "l2_adjacency": False,488                "mtu": 1500,489                "name": "net1",490                "port_security_enabled": True,491                "project_id": "4fd44f30292945e481c7b8a0c8908869",492                "qos_policy_id": "6a8454ade84346f59e8d40665f878b2e",493                "revision_number": 1,494                "router:external": False,495                "shared": False,496                "status": "ACTIVE",497                "subnets": [498                    "54d6f61d-db07-451c-9ab3-b9609b6b6f0b"499                ],500                "tenant_id": "4fd44f30292945e481c7b8a0c8908869",501                "updated_at": "2016-03-08T20:19:41",502                "vlan_transparent": True,503                "description": "",504                "is_default": False505            },506            {507                "id": "6722fc13-4319",508                "name": "saved-network"509            }510        ]511    }512    def test_delete_fail(self):513        delete_mock = [(self.get_method, self.response, 200),514                       (self.delete_method, 'error', None),515                       (self.log_method, 'exception', None)]516        self._test_delete(delete_mock, fail=True)517    def test_delete_pass(self):518        delete_mock = [(self.get_method, self.response, 200),519                       (self.delete_method, None, 204),520                       (self.log_method, 'exception', None)]521        self._test_delete(delete_mock)522    def test_dry_run(self):523        dry_mock = [(self.get_method, self.response, 200),524                    (self.delete_method, "delete", None)]525        self._test_dry_run_true(dry_mock)526    def test_save_state(self):527        self._test_saved_state_true([(self.get_method, self.response, 200)])528    def test_preserve_list(self):529        self.response['networks'].append(530            {531                "admin_state_up": True,532                "availability_zone_hints": [],533                "availability_zones": [534                    "nova"535                ],536                "created_at": "2017-03-08T20:19:41",537                "dns_domain": "my-domain.org.",538                "id": cleanup_service.CONF_PUB_NETWORK,539                "name": "net2",540                "port_security_enabled": True,541                "project_id": "4fd44f30292945e481c7b8a0c8908869",542                "qos_policy_id": "6a8454ade84346f59e8d40665f878b2e",543                "revision_number": 1,544                "status": "ACTIVE",545                "subnets": [546                    "54d6f61d-db07-451c-9ab3-b9609b6b6f0b"547                ],548                "tenant_id": "4fd44f30292945e481c7b8a0c8908869",549                "updated_at": "2018-03-08T20:19:41",550                "vlan_transparent": True,551                "is_default": False552            })553        self._test_is_preserve_true([(self.get_method, self.response, 200)])554class TestNetworkFloatingIpService(BaseCmdServiceTests):555    service_class = 'NetworkFloatingIpService'556    service_name = 'floatingips'557    response = {558        "floatingips": [559            {560                "router_id": "d23abc8d-2991-4a55-ba98-2aaea84cc72f",561                "description": "for test",562                "dns_domain": "my-domain.org.",563                "dns_name": "myfip",564                "created_at": "2016-12-21T10:55:50Z",565                "updated_at": "2016-12-21T10:55:53Z",566                "revision_number": 1,567                "project_id": "4969c491a3c74ee4af974e6d800c62de",568                "tenant_id": "4969c491a3c74ee4af974e6d800c62de",569                "floating_network_id": "376da547-b977-4cfe-9cba-275c80debf57",570                "fixed_ip_address": "10.0.0.3",571                "floating_ip_address": "172.24.4.228",572                "port_id": "ce705c24-c1ef-408a-bda3-7bbd946164ab",573                "id": "2f245a7b-796b-4f26-9cf9-9e82d248fda7",574                "status": "ACTIVE",575                "port_details": {576                    "status": "ACTIVE",577                    "name": "",578                    "admin_state_up": True,579                    "network_id": "02dd8479-ef26-4398-a102-d19d0a7b3a1f",580                    "device_owner": "compute:nova",581                    "mac_address": "fa:16:3e:b1:3b:30",582                    "device_id": "8e3941b4-a6e9-499f-a1ac-2a4662025cba"583                },584                "tags": ["tag1,tag2"],585                "port_forwardings": []586            },587            {588                "id": "9e82d248-408a",589                "status": "ACTIVE"590            }591        ]592    }593    def test_delete_fail(self):594        delete_mock = [(self.get_method, self.response, 200),595                       (self.delete_method, 'error', None),596                       (self.log_method, 'exception', None)]597        self._test_delete(delete_mock, fail=True)598    def test_delete_pass(self):599        delete_mock = [(self.get_method, self.response, 200),600                       (self.delete_method, None, 204),601                       (self.log_method, 'exception', None)]602        self._test_delete(delete_mock)603    def test_dry_run(self):604        dry_mock = [(self.get_method, self.response, 200),605                    (self.delete_method, "delete", None)]606        self._test_dry_run_true(dry_mock)607    def test_save_state(self):608        self._test_saved_state_true([(self.get_method, self.response, 200)])609class TestNetworkRouterService(BaseCmdServiceTests):610    service_class = 'NetworkRouterService'611    service_name = 'routers'612    validate_response = ('tempest.lib.services.network.routers_client'613                         '.RoutersClient.validate_response')614    response = {615        "routers": [616            {617                "admin_state_up": True,618                "availability_zone_hints": [],619                "availability_zones": [620                    "nova"621                ],622                "created_at": "2018-03-19T19:17:04Z",623                "description": "",624                "distributed": False,625                "external_gateway_info": {626                    "enable_snat": True,627                    "external_fixed_ips": [628                        {629                            "ip_address": "172.24.4.3",630                            "subnet_id": "b930d7f6-ceb7-40a0-8b81-a425dd994ccf"631                        },632                        {633                            "ip_address": "2001:db8::c",634                            "subnet_id": "0c56df5d-ace5-46c8-8f4c-45fa4e334d18"635                        }636                    ],637                    "network_id": "ae34051f-aa6c-4c75-abf5-50dc9ac99ef3"638                },639                "flavor_id": "f7b14d9a-b0dc-4fbe-bb14-a0f4970a69e0",640                "ha": False,641                "id": "915a14a6-867b-4af7-83d1-70efceb146f9",642                "name": "router2",643                "revision_number": 1,644                "routes": [645                    {646                        "destination": "179.24.1.0/24",647                        "nexthop": "172.24.3.99"648                    }649                ],650                "status": "ACTIVE",651                "updated_at": "2018-03-19T19:17:22Z",652                "project_id": "0bd18306d801447bb457a46252d82d13",653                "tenant_id": "0bd18306d801447bb457a46252d82d13",654                "tags": ["tag1,tag2"]655            },656            {657                "id": "4s5w34hj-id44",658                "name": "saved-router"659            }660        ],661        # "ports" key is added to the response in order to simplify unit662        # testing - it's because NetworkRouterService's delete method lists663        # ports before deleting any router664        "ports": []665    }666    def _test_delete(self, mocked_fixture_tuple_list, fail=False):667        serv = self._create_cmd_service(self.service_class)668        resp, fixtures = self.run_function_with_mocks(669            serv.run,670            mocked_fixture_tuple_list,671        )672        for fixture in fixtures:673            if fail is False and fixture.mock.return_value == 'exception':674                fixture.mock.assert_not_called()675            elif self.service_name in self.saved_state.keys():676                fixture.mock.assert_called()677                for key in self.saved_state[self.service_name].keys():678                    self.assertNotIn(key, fixture.mock.call_args[0][0])679        self.assertFalse(serv.data)680    def test_delete_fail(self):681        delete_mock = [(self.get_method, self.response, 200),682                       (self.delete_method, 'error', None),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
