Best Python code snippet using tempest_python
test_security_groups_basic_ops.py
Source:test_security_groups_basic_ops.py  
...215        self.assertEqual(216            sorted([s['name'] for s in security_groups]),217            sorted([s['name'] for s in server['security_groups']]))218        return server219    def _create_tenant_servers(self, tenant, num=1):220        for i in range(num):221            name = 'server-{tenant}-gen-{num}-'.format(222                   tenant=tenant.creds.tenant_name,223                   num=i224            )225            name = data_utils.rand_name(name)226            server = self._create_server(name, tenant)227            tenant.servers.append(server)228    def _set_access_point(self, tenant):229        """230        creates a server in a secgroup with rule allowing external ssh231        in order to access tenant internal network232        workaround ip namespace233        """234        secgroups = tenant.security_groups.values()235        name = 'server-{tenant}-access_point-'.format(236            tenant=tenant.creds.tenant_name)237        name = data_utils.rand_name(name)238        server = self._create_server(name, tenant,239                                     security_groups=secgroups)240        tenant.access_point = server241        self._assign_floating_ips(tenant, server)242    def _assign_floating_ips(self, tenant, server):243        public_network_id = CONF.network.public_network_id244        floating_ip = self._create_floating_ip(245            server, public_network_id,246            client=tenant.manager.network_client)247        self.floating_ips.setdefault(server['id'], floating_ip)248    def _create_tenant_network(self, tenant):249        network, subnet, router = self.create_networks(250            client=tenant.manager.network_client)251        tenant.set_network(network, subnet, router)252    def _set_compute_context(self, tenant):253        self.servers_client = tenant.manager.servers_client254        return self.servers_client255    def _deploy_tenant(self, tenant_or_id):256        """257        creates:258            
network259            subnet260            router (if public not defined)261            access security group262            access-point server263        """264        if not isinstance(tenant_or_id, self.TenantProperties):265            tenant = self.tenants[tenant_or_id]266        else:267            tenant = tenant_or_id268        self._set_compute_context(tenant)269        self._create_tenant_keypairs(tenant)270        self._create_tenant_network(tenant)271        self._create_tenant_security_groups(tenant)272        self._set_access_point(tenant)273    def _get_server_ip(self, server, floating=False):274        """275        returns the ip (floating/internal) of a server276        """277        if floating:278            server_ip = self.floating_ips[server['id']].floating_ip_address279        else:280            server_ip = None281            network_name = self.tenants[server['tenant_id']].network.name282            if network_name in server['addresses']:283                server_ip = server['addresses'][network_name][0]['addr']284        return server_ip285    def _connect_to_access_point(self, tenant):286        """287        create ssh connection to tenant access point288        """289        access_point_ssh = \290            self.floating_ips[tenant.access_point['id']].floating_ip_address291        private_key = tenant.keypair['private_key']292        access_point_ssh = self._ssh_to_server(access_point_ssh,293                                               private_key=private_key)294        return access_point_ssh295    def _check_connectivity(self, access_point, ip, should_succeed=True):296        if should_succeed:297            msg = "Timed out waiting for %s to become reachable" % ip298        else:299            msg = "%s is reachable" % ip300        try:301            self.assertTrue(self._check_remote_connectivity(access_point, ip,302                                                            should_succeed),303                            msg)304  
      except test.exceptions.SSHTimeout:305            raise306        except Exception:307            debug.log_net_debug()308            raise309    def _test_in_tenant_block(self, tenant):310        access_point_ssh = self._connect_to_access_point(tenant)311        for server in tenant.servers:312            self._check_connectivity(access_point=access_point_ssh,313                                     ip=self._get_server_ip(server),314                                     should_succeed=False)315    def _test_in_tenant_allow(self, tenant):316        ruleset = dict(317            protocol='icmp',318            remote_group_id=tenant.security_groups['default'].id,319            direction='ingress'320        )321        self._create_security_group_rule(322            secgroup=tenant.security_groups['default'],323            **ruleset324        )325        access_point_ssh = self._connect_to_access_point(tenant)326        for server in tenant.servers:327            self._check_connectivity(access_point=access_point_ssh,328                                     ip=self._get_server_ip(server))329    def _test_cross_tenant_block(self, source_tenant, dest_tenant):330        """331        if public router isn't defined, then dest_tenant access is via332        floating-ip333        """334        access_point_ssh = self._connect_to_access_point(source_tenant)335        ip = self._get_server_ip(dest_tenant.access_point,336                                 floating=self.floating_ip_access)337        self._check_connectivity(access_point=access_point_ssh, ip=ip,338                                 should_succeed=False)339    def _test_cross_tenant_allow(self, source_tenant, dest_tenant):340        """341        check for each direction:342        creating rule for tenant incoming traffic enables only 1way traffic343        """344        ruleset = dict(345            protocol='icmp',346            direction='ingress'347        )348        self._create_security_group_rule(349       
     secgroup=dest_tenant.security_groups['default'],350            client=dest_tenant.manager.network_client,351            **ruleset352        )353        access_point_ssh = self._connect_to_access_point(source_tenant)354        ip = self._get_server_ip(dest_tenant.access_point,355                                 floating=self.floating_ip_access)356        self._check_connectivity(access_point_ssh, ip)357        # test that reverse traffic is still blocked358        self._test_cross_tenant_block(dest_tenant, source_tenant)359        # allow reverse traffic and check360        self._create_security_group_rule(361            secgroup=source_tenant.security_groups['default'],362            client=source_tenant.manager.network_client,363            **ruleset364        )365        access_point_ssh_2 = self._connect_to_access_point(dest_tenant)366        ip = self._get_server_ip(source_tenant.access_point,367                                 floating=self.floating_ip_access)368        self._check_connectivity(access_point_ssh_2, ip)369    def _verify_mac_addr(self, tenant):370        """371        verify that VM (tenant's access point) has the same ip,mac as listed in372        port list373        """374        access_point_ssh = self._connect_to_access_point(tenant)375        mac_addr = access_point_ssh.get_mac_address()376        mac_addr = mac_addr.strip().lower()377        # Get the fixed_ips and mac_address fields of all ports. 
Select378        # only those two columns to reduce the size of the response.379        port_list = self._list_ports(fields=['fixed_ips', 'mac_address'])380        port_detail_list = [381            (port['fixed_ips'][0]['subnet_id'],382             port['fixed_ips'][0]['ip_address'],383             port['mac_address'].lower())384            for port in port_list if port['fixed_ips']385        ]386        server_ip = self._get_server_ip(tenant.access_point)387        subnet_id = tenant.subnet.id388        self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)389    @test.attr(type='smoke')390    @test.services('compute', 'network')391    def test_cross_tenant_traffic(self):392        try:393            # deploy new tenant394            self._deploy_tenant(self.alt_tenant)395            self._verify_network_details(self.alt_tenant)396            self._verify_mac_addr(self.alt_tenant)397            # cross tenant check398            source_tenant = self.primary_tenant399            dest_tenant = self.alt_tenant400            self._test_cross_tenant_block(source_tenant, dest_tenant)401            self._test_cross_tenant_allow(source_tenant, dest_tenant)402        except Exception:403            for tenant in self.tenants.values():404                self._log_console_output(servers=tenant.servers)405            raise406    @test.attr(type='smoke')407    @test.services('compute', 'network')408    def test_in_tenant_traffic(self):409        try:410            self._create_tenant_servers(self.primary_tenant, num=1)411            # in-tenant check412            self._test_in_tenant_block(self.primary_tenant)413            self._test_in_tenant_allow(self.primary_tenant)414        except Exception:415            for tenant in self.tenants.values():416                self._log_console_output(servers=tenant.servers)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
