Best Python code snippet using localstack_python
test_ipam.py
Source:test_ipam.py  
...65        try:66            self.neutron_client.list_ports(67                network_id=networks['networks'][0]['id'])68        except Exception as e:69            self.docker_client.disconnect_container_from_network(70                container_id,71                container_net_id)72            message = ("Failed to list neutron ports: %s")73            self.fail(message % e.args[0])74        # Disconnect container from network, this release ip address.75        self.docker_client.disconnect_container_from_network(container_id,76                                                             container_net_id)77        # Delete docker network, if no endpoint, will release the pool78        # and delete the subnetpool in Neutron.79        self.docker_client.stop(container=container_id)80        self.docker_client.remove_network(container_net_id)81        subnetpools = self.neutron_client.list_subnetpools(name=pool_name)82        self.assertEqual(0, len(subnetpools['subnetpools']))83    def test_container_ipam_request_address(self):84        fake_ipam = {85            "Driver": "kuryr",86            "Options": {},87            "Config": [88                {89                    "Subnet": "10.12.0.0/16",90                    "IPRange": "10.12.0.0/24",91                    "Gateway": "10.12.0.1"92                }93            ]94        }95        container_net_name = lib_utils.get_random_string(8)96        container_net = self.docker_client.create_network(97            name=container_net_name,98            driver='kuryr',99            ipam=fake_ipam)100        container_net_id = container_net.get('Id')101        try:102            networks = self.neutron_client.list_networks(103                tags=utils.make_net_tags(container_net_id))104        except Exception as e:105            self.docker_client.remove_network(container_net_id)106            message = ("Failed to list neutron networks: %s")107            self.fail(message % e.args[0])108        # Currently we cannot get IPAM pool from docker client.109        pool_name = "kuryrPool-" + "10.12.0.0/24"110        subnetpools = self.neutron_client.list_subnetpools(name=pool_name)111        self.assertEqual(1, len(subnetpools['subnetpools']))112        subnets = self.neutron_client.list_subnets(113            network_id=networks['networks'][0]['id'],114            cidr="10.12.0.0/24")115        self.assertEqual(1, len(subnets['subnets']))116        # Boot a container, and connect to the docker network.117        container_name = lib_utils.get_random_string(8)118        container = self.docker_client.create_container(119            image='kuryr/busybox',120            command='/bin/sleep 600',121            hostname='kuryr_test_container',122            name=container_name)123        warn_msg = container.get('Warning')124        container_id = container.get('Id')125        self.assertIsNone(warn_msg, 'Warn in creating container')126        self.assertIsNotNone(container_id, 'Create container id must not '127                                           'be None')128        self.docker_client.start(container=container_id)129        self.docker_client.connect_container_to_network(container_id,130                                                        container_net_id)131        try:132            ports = self.neutron_client.list_ports(133                network_id=networks['networks'][0]['id'])134        except Exception as e:135            self.docker_client.disconnect_container_from_network(136                container_id,137                container_net_id)138            message = ("Failed to list neutron ports: %s")139            self.fail(message % e.args[0])140        # DHCP port and container endpoint141        self.assertEqual(2, len(ports['ports']))142        # Find the kuryr port143        kuryr_port_param = {"network_id": networks['networks'][0]['id']}144        kuryr_ports = self.neutron_client.list_ports(145            **kuryr_port_param)146        kuryr_port = [port for port in kuryr_ports['ports'] if147                      (lib_const.DEVICE_OWNER in port['tags'] or148                       port['name'] ==149                       utils.get_neutron_port_name(port['device_id']))]150        self.assertEqual(1, len(kuryr_port))151        # Disconnect container from network, this release ip address.152        self.docker_client.disconnect_container_from_network(container_id,153                                                             container_net_id)154        # Cleanup resources155        self.docker_client.stop(container=container_id)156        self.docker_client.remove_network(container_net_id)157    def test_container_ipam_release_address(self):158        fake_ipam = {159            "Driver": "kuryr",160            "Options": {},161            "Config": [162                {163                    "Subnet": "10.13.0.0/16",164                    "IPRange": "10.13.0.0/24",165                    "Gateway": "10.13.0.1"166                }167            ]168        }169        container_net_name = lib_utils.get_random_string(8)170        container_net = self.docker_client.create_network(171            name=container_net_name,172            driver='kuryr',173            ipam=fake_ipam)174        container_net_id = container_net.get('Id')175        try:176            networks = self.neutron_client.list_networks(177                tags=utils.make_net_tags(container_net_id))178        except Exception as e:179            self.docker_client.remove_network(container_net_id)180            message = ("Failed to list neutron networks: %s")181            self.fail(message % e.args[0])182        self.assertEqual(1, len(networks['networks']))183        # Boot a container, and connect to the docker network.184        container_name = lib_utils.get_random_string(8)185        container = self.docker_client.create_container(186            image='kuryr/busybox',187            command='/bin/sleep 600',188            hostname='kuryr_test_container',189            name=container_name)190        warn_msg = container.get('Warning')191        container_id = container.get('Id')192        self.assertIsNone(warn_msg, 'Warn in creating container')193        self.assertIsNotNone(container_id, 'Create container id must not '194                                           'be None')195        self.docker_client.start(container=container_id)196        self.docker_client.connect_container_to_network(container_id,197                                                        container_net_id)198        try:199            ports = self.neutron_client.list_ports(200                network_id=networks['networks'][0]['id'])201        except Exception as e:202            self.docker_client.disconnect_container_from_network(203                container_id,204                container_net_id)205            message = ("Failed to list neutron ports: %s")206            self.fail(message % e.args[0])207        # A dhcp port gets created as well; dhcp is enabled by default208        self.assertEqual(2, len(ports['ports']))209        # Find the kuryr port210        kuryr_port_param = {"network_id": networks['networks'][0]['id']}211        kuryr_ports = self.neutron_client.list_ports(212            **kuryr_port_param)213        kuryr_port = [port for port in kuryr_ports['ports'] if214                      (lib_const.DEVICE_OWNER in port['tags'] or215                       port['name'] ==216                       utils.get_neutron_port_name(port['device_id']))]217        self.assertEqual(1, len(kuryr_port))218        # Disconnect container from network, this release ip address.219        self.docker_client.disconnect_container_from_network(container_id,220                                                             container_net_id)221        ports = self.neutron_client.list_ports(222            network_id=networks['networks'][0]['id'])223        # DHCP port leave behind.224        self.assertEqual(1, len(ports['ports']))225        kuryr_ports = self.neutron_client.list_ports(226            **kuryr_port_param)227        kuryr_port = [port for port in kuryr_ports['ports'] if228                      (lib_const.DEVICE_OWNER in port['tags'] or229                       port['name'] ==230                       utils.get_neutron_port_name(port['device_id']))]231        self.assertEqual(0, len(kuryr_port))232        # Cleanup resources233        self.docker_client.stop(container=container_id)234        self.docker_client.remove_network(container_net_id)235    def test_container_ipam_release_address_with_existing_network(self):236        # pre-created Neutron network and subnet237        neutron_net_name = lib_utils.get_random_string(8)238        neutron_network = self.neutron_client.create_network(239            {'network': {'name': neutron_net_name,240                         "admin_state_up": True}})241        neutron_subnet_name = lib_utils.get_random_string(8)242        subnet_param = [{243            'name': neutron_subnet_name,244            'network_id': neutron_network['network']['id'],245            'ip_version': 4,246            'cidr': "10.10.0.0/24",247            'enable_dhcp': True,248        }]249        self.neutron_client.create_subnet({'subnets': subnet_param})250        fake_ipam = {251            "Driver": "kuryr",252            "Options": {},253            "Config": [254                {255                    "Subnet": "10.10.0.0/16",256                    "IPRange": "10.10.0.0/24",257                    "Gateway": "10.10.0.1"258                }259            ]260        }261        # Create docker network using existing Neutron network262        options = {'neutron.net.name': neutron_net_name}263        container_net_name = lib_utils.get_random_string(8)264        container_net = self.docker_client.create_network(265            name=container_net_name,266            driver='kuryr',267            options=options,268            ipam=fake_ipam)269        container_net_id = container_net.get('Id')270        try:271            networks = self.neutron_client.list_networks(272                tags=utils.make_net_tags(container_net_id))273        except Exception as e:274            self.docker_client.remove_network(container_net_id)275            message = ("Failed to list neutron networks: %s")276            self.fail(message % e.args[0])277        self.assertEqual(1, len(networks['networks']))278        # Boot a container, and connect to the docker network.279        container_name = lib_utils.get_random_string(8)280        container = self.docker_client.create_container(281            image='kuryr/busybox',282            command='/bin/sleep 600',283            hostname='kuryr_test_container',284            name=container_name)285        warn_msg = container.get('Warning')286        container_id = container.get('Id')287        self.assertIsNone(warn_msg, 'Warn in creating container')288        self.assertIsNotNone(container_id, 'Create container id must not '289                                           'be None')290        self.docker_client.start(container=container_id)291        self.docker_client.connect_container_to_network(container_id,292                                                        container_net_id)293        try:294            ports = self.neutron_client.list_ports(295                network_id=neutron_network['network']['id'])296        except Exception as e:297            self.docker_client.disconnect_container_from_network(298                container_id,299                container_net_id)300            message = ("Failed to list neutron ports: %s")301            self.fail(message % e.args[0])302        # A dhcp port gets created as well; dhcp is enabled by default303        self.assertEqual(2, len(ports['ports']))304        # Find the kuryr port305        kuryr_port_param = {"network_id": neutron_network['network']['id']}306        kuryr_ports = self.neutron_client.list_ports(307            **kuryr_port_param)308        kuryr_port = [port for port in kuryr_ports['ports'] if309                      (lib_const.DEVICE_OWNER in port['tags'] or310                       port['name'] ==311                       utils.get_neutron_port_name(port['device_id']))]312        self.assertEqual(1, len(kuryr_port))313        # Disconnect container from network, this release ip address.314        self.docker_client.disconnect_container_from_network(container_id,315                                                             container_net_id)316        ports = self.neutron_client.list_ports(317            network_id=neutron_network['network']['id'])318        self.assertEqual(1, len(ports['ports']))319        kuryr_ports = self.neutron_client.list_ports(320            **kuryr_port_param)321        kuryr_port = [port for port in kuryr_ports['ports'] if322                      (lib_const.DEVICE_OWNER in port['tags'] or323                       port['name'] ==324                       utils.get_neutron_port_name(port['device_id']))]325        self.assertEqual(0, len(kuryr_port))326        # Cleanup resources327        self.docker_client.stop(container=container_id)328        self.docker_client.remove_network(container_net_id)329        self.neutron_client.delete_network(neutron_network['network']['id'])330    def test_container_ipam_request_address_with_existing_port(self):331        # pre-created Neutron network and subnet and port332        neutron_net_name = lib_utils.get_random_string(8)333        neutron_network = self.neutron_client.create_network(334            {'network': {'name': neutron_net_name,335                         "admin_state_up": True}})336        neutron_subnet_name = lib_utils.get_random_string(8)337        subnet_param = [{338            'name': neutron_subnet_name,339            'network_id': neutron_network['network']['id'],340            'ip_version': 4,341            'cidr': "10.14.0.0/24",342        }]343        neutron_subnet = self.neutron_client.create_subnet(344            {'subnets': subnet_param})345        neutron_v6_subnet_name = lib_utils.get_random_string(8)346        v6_subnet_param = [{347            'name': neutron_v6_subnet_name,348            'network_id': neutron_network['network']['id'],349            'ip_version': 6,350            'cidr': "fe81::/64",351        }]352        neutron_v6_subnet = self.neutron_client.create_subnet(353            {'subnets': v6_subnet_param})354        existing_neutron_port = self.neutron_client.create_port(355            {'port': {'network_id': neutron_network['network']['id']}})356        fixed_ips = {fip['subnet_id']: fip['ip_address']357                     for fip in existing_neutron_port['port']['fixed_ips']}358        ipv4_address = fixed_ips[neutron_subnet['subnets'][0]['id']]359        ipv6_address = fixed_ips[neutron_v6_subnet['subnets'][0]['id']]360        fake_ipam = {361            "Driver": "kuryr",362            "Options": {},363            "Config": [364                {365                    "Subnet": "10.14.0.0/24",366                    "Gateway": "10.14.0.1"367                },368                {369                    "Subnet": "fe81::/64",370                    "Gateway": "fe81::1"371                },372            ]373        }374        # Create docker network using existing Neutron network375        options = {'neutron.net.name': neutron_net_name}376        container_net_name = lib_utils.get_random_string(8)377        container_net = self.docker_client.create_network(378            name=container_net_name,379            driver='kuryr',380            enable_ipv6=True,381            options=options,382            ipam=fake_ipam)383        container_net_id = container_net.get('Id')384        try:385            networks = self.neutron_client.list_networks(386                tags=utils.make_net_tags(container_net_id))387        except Exception as e:388            self.docker_client.remove_network(container_net_id)389            message = ("Failed to list neutron networks: %s")390            self.fail(message % e.args[0])391        self.assertEqual(1, len(networks['networks']))392        # Boot a container, and connect to the docker network.393        container_name = lib_utils.get_random_string(8)394        container = self.docker_client.create_container(395            image='kuryr/busybox',396            command='/bin/sleep 600',397            hostname='kuryr_test_container',398            name=container_name)399        warn_msg = container.get('Warning')400        container_id = container.get('Id')401        self.assertIsNone(warn_msg, 'Warn in creating container')402        self.assertIsNotNone(container_id, 'Create container id must not '403                                           'be None')404        self.docker_client.start(container=container_id)405        self.docker_client.connect_container_to_network(406            container_id, container_net_id, ipv4_address=ipv4_address,407            ipv6_address=ipv6_address)408        try:409            ports = self.neutron_client.list_ports(410                network_id=neutron_network['network']['id'])411        except Exception as e:412            self.docker_client.disconnect_container_from_network(413                container_id,414                container_net_id)415            message = ("Failed to list neutron ports: %s")416            self.fail(message % e.args[0])417        # A dhcp port gets created as well; dhcp is enabled by default418        self.assertEqual(2, len(ports['ports']))419        # Find the existing neutron port420        neutron_port_param = {"network_id": neutron_network['network']['id']}421        neutron_ports = self.neutron_client.list_ports(422            **neutron_port_param)423        neutron_port = [port for port in neutron_ports['ports'] if424                        (const.KURYR_EXISTING_NEUTRON_PORT in port['tags'] or425                         port['name'] ==426                         utils.get_neutron_port_name(port['device_id']))]427        self.assertEqual(1, len(neutron_port))428        # Disconnect container from network.429        self.docker_client.disconnect_container_from_network(container_id,430                                                             container_net_id)431        ports = self.neutron_client.list_ports(432            network_id=neutron_network['network']['id'])433        self.assertEqual(2, len(ports['ports']))434        neutron_ports = self.neutron_client.list_ports(435            **neutron_port_param)436        neutron_port = [port for port in neutron_ports['ports'] if437                        (const.KURYR_EXISTING_NEUTRON_PORT in port['tags'] or438                         port['name'] ==439                         utils.get_neutron_port_name(port['device_id']))]440        self.assertEqual(0, len(neutron_port))441        # Cleanup resources442        self.docker_client.stop(container=container_id)443        self.docker_client.remove_network(container_net_id)444        self.neutron_client.delete_port(existing_neutron_port['port']['id'])445        self.neutron_client.delete_subnet(neutron_subnet['subnets'][0]['id'])446        self.neutron_client.delete_subnet(447            neutron_v6_subnet['subnets'][0]['id'])448        self.neutron_client.delete_network(neutron_network['network']['id'])449    def test_container_ipam_release_address_with_existing_port_same_ip(self):450        ipv4_address = "10.15.0.10"451        # pre-created the first Neutron network and subnet and port452        neutron_net_name = lib_utils.get_random_string(8)453        neutron_network = self.neutron_client.create_network(454            {'network': {'name': neutron_net_name,455                         "admin_state_up": True}})456        neutron_subnet_name = lib_utils.get_random_string(8)457        subnet_param = [{458            'name': neutron_subnet_name,459            'network_id': neutron_network['network']['id'],460            'ip_version': 4,461            'cidr': "10.15.0.0/24",462        }]463        neutron_subnet = self.neutron_client.create_subnet(464            {'subnets': subnet_param})465        existing_neutron_port = self.neutron_client.create_port(466            {'port': {'network_id': neutron_network['network']['id'],467                      'fixed_ips': [{'ip_address': ipv4_address}]}})468        fake_ipam = {469            "Driver": "kuryr",470            "Options": {471                'neutron.subnet.name': neutron_subnet_name472            },473            "Config": [474                {475                    "Subnet": "10.15.0.0/24",476                    "Gateway": "10.15.0.1"477                }478            ]479        }480        # Create docker network using existing Neutron network481        options = {'neutron.net.name': neutron_net_name,482                   'neutron.subnet.name': neutron_subnet_name}483        container_net_name = lib_utils.get_random_string(8)484        container_net = self.docker_client.create_network(485            name=container_net_name,486            driver='kuryr',487            options=options,488            ipam=fake_ipam)489        container_net_id = container_net.get('Id')490        # pre-created the second Neutron network and subnet and port491        neutron_net_name2 = lib_utils.get_random_string(8)492        neutron_network2 = self.neutron_client.create_network(493            {'network': {'name': neutron_net_name2,494                         "admin_state_up": True}})495        neutron_subnet_name2 = lib_utils.get_random_string(8)496        subnet_param2 = [{497            'name': neutron_subnet_name2,498            'network_id': neutron_network2['network']['id'],499            'ip_version': 4,500            'cidr': "10.15.0.0/24",501        }]502        neutron_subnet2 = self.neutron_client.create_subnet(503            {'subnets': subnet_param2})504        existing_neutron_port2 = self.neutron_client.create_port(505            {'port': {'network_id': neutron_network2['network']['id'],506                      'fixed_ips': [{'ip_address': ipv4_address}]}})507        fake_ipam2 = {508            "Driver": "kuryr",509            "Options": {510                'neutron.subnet.name': neutron_subnet_name2511            },512            "Config": [513                {514                    "Subnet": "10.15.0.0/24",515                    "Gateway": "10.15.0.1"516                }517            ]518        }519        # Create docker network using existing Neutron network520        options = {'neutron.net.name': neutron_net_name2,521                   'neutron.subnet.name': neutron_subnet_name2}522        container_net_name2 = lib_utils.get_random_string(8)523        container_net2 = self.docker_client.create_network(524            name=container_net_name2,525            driver='kuryr',526            options=options,527            ipam=fake_ipam2)528        container_net_id2 = container_net2.get('Id')529        # Boot the first container, and connect to the first docker network.530        endpoint_config = self.docker_client.create_endpoint_config(531            ipv4_address=ipv4_address)532        network_config = self.docker_client.create_networking_config({533            container_net_id: endpoint_config})534        container_name = lib_utils.get_random_string(8)535        container = self.docker_client.create_container(536            image='kuryr/busybox',537            command='/bin/sleep 600',538            hostname='kuryr_test_container',539            name=container_name,540            networking_config=network_config)541        container_id = container.get('Id')542        self.docker_client.start(container=container_id)543        # Boot the second container, and connect to the second docker network.544        endpoint_config = self.docker_client.create_endpoint_config(545            ipv4_address=ipv4_address)546        network_config = self.docker_client.create_networking_config({547            container_net_id2: endpoint_config})548        container_name2 = lib_utils.get_random_string(8)549        container2 = self.docker_client.create_container(550            image='kuryr/busybox',551            command='/bin/sleep 600',552            hostname='kuryr_test_container2',553            name=container_name2,554            networking_config=network_config)555        container_id2 = container2.get('Id')556        self.docker_client.start(container=container_id2)557        # Assert both existing neutron ports active558        for port_id in (existing_neutron_port['port']['id'],559                        existing_neutron_port2['port']['id']):560            utils.wait_for_port_active(561                self.neutron_client, port_id, 60)562            neutron_port = self.neutron_client.show_port(port_id)563            self.assertEqual('ACTIVE', neutron_port['port']['status'])564        # Disconnect the first container from network and565        # assert the first neutron port is down and the second is still active566        self.docker_client.disconnect_container_from_network(container_id,567                                                             container_net_id)568        existing_neutron_port = self.neutron_client.show_port(569            existing_neutron_port['port']['id'])570        self.assertEqual('DOWN', existing_neutron_port['port']['status'])571        existing_neutron_port2 = self.neutron_client.show_port(572            existing_neutron_port2['port']['id'])573        self.assertEqual('ACTIVE', existing_neutron_port2['port']['status'])574        # Disconnect the second container from network and575        # assert both neutron ports are down.576        self.docker_client.disconnect_container_from_network(container_id2,577                                                             container_net_id2)578        for port_id in (existing_neutron_port['port']['id'],579                        existing_neutron_port2['port']['id']):580            neutron_port = self.neutron_client.show_port(port_id)581            self.assertEqual('DOWN', neutron_port['port']['status'])582        # Cleanup resources583        self.docker_client.stop(container=container_id)584        self.docker_client.stop(container=container_id2)585        self.docker_client.remove_network(container_net_id)586        self.docker_client.remove_network(container_net_id2)587        self.neutron_client.delete_port(existing_neutron_port['port']['id'])588        self.neutron_client.delete_port(existing_neutron_port2['port']['id'])589        self.neutron_client.delete_subnet(neutron_subnet['subnets'][0]['id'])590        self.neutron_client.delete_subnet(neutron_subnet2['subnets'][0]['id'])...api_network_test.py
Source:api_network_test.py  
...82            container['Id']83        ]84        with pytest.raises(docker.errors.APIError):85            self.client.connect_container_to_network(container, net_id)86        self.client.disconnect_container_from_network(container, net_id)87        network_data = self.client.inspect_network(net_id)88        assert not network_data.get('Containers')89        with pytest.raises(docker.errors.APIError):90            self.client.disconnect_container_from_network(container, net_id)91    @requires_api_version('1.22')92    def test_connect_and_force_disconnect_container(self):93        net_name, net_id = self.create_network()94        container = self.client.create_container(TEST_IMG, 'top')95        self.tmp_containers.append(container)96        self.client.start(container)97        network_data = self.client.inspect_network(net_id)98        assert not network_data.get('Containers')99        self.client.connect_container_to_network(container, net_id)100        network_data = self.client.inspect_network(net_id)101        assert list(network_data['Containers'].keys()) == \102            [container['Id']]103        self.client.disconnect_container_from_network(container, net_id, True)104        network_data = self.client.inspect_network(net_id)105        assert not network_data.get('Containers')106        with pytest.raises(docker.errors.APIError):107            self.client.disconnect_container_from_network(108                container, net_id, force=True109            )110    @requires_api_version('1.22')111    def test_connect_with_aliases(self):112        net_name, net_id = self.create_network()113        container = self.client.create_container(TEST_IMG, 'top')114        self.tmp_containers.append(container)115        self.client.start(container)116        self.client.connect_container_to_network(117            container, net_id, aliases=['foo', 'bar'])118        container_data = self.client.inspect_container(container)119        aliases = (120            container_data['NetworkSettings']['Networks'][net_name]['Aliases']121        )122        assert 'foo' in aliases123        assert 'bar' in aliases124    def test_connect_on_container_create(self):125        net_name, net_id = self.create_network()126        container = self.client.create_container(127            image=TEST_IMG,128            command='top',129            host_config=self.client.create_host_config(network_mode=net_name),130        )131        self.tmp_containers.append(container)132        self.client.start(container)133        network_data = self.client.inspect_network(net_id)134        assert list(network_data['Containers'].keys()) == \135            [container['Id']]136        self.client.disconnect_container_from_network(container, net_id)137        network_data = self.client.inspect_network(net_id)138        assert not network_data.get('Containers')139    @requires_api_version('1.22')140    def test_create_with_aliases(self):141        net_name, net_id = self.create_network()142        container = self.client.create_container(143            image=TEST_IMG,144            command='top',145            host_config=self.client.create_host_config(146                network_mode=net_name,147            ),148            networking_config=self.client.create_networking_config({149                net_name: self.client.create_endpoint_config(150                    aliases=['foo', 'bar'],151                ),152            }),153        )154        self.tmp_containers.append(container)155        self.client.start(container)156        container_data = self.client.inspect_container(container)157        aliases = (158            container_data['NetworkSettings']['Networks'][net_name]['Aliases']159        )160        assert 'foo' in aliases161        assert 'bar' in aliases162    @requires_api_version('1.22')163    def test_create_with_ipv4_address(self):164        net_name, net_id = self.create_network(165            ipam=IPAMConfig(166                driver='default',167                pool_configs=[IPAMPool(subnet="132.124.0.0/16")],168            ),169        )170        container = self.client.create_container(171            image=TEST_IMG, command='top',172            host_config=self.client.create_host_config(network_mode=net_name),173            networking_config=self.client.create_networking_config({174                net_name: self.client.create_endpoint_config(175                    ipv4_address='132.124.0.23'176                )177            })178        )179        self.tmp_containers.append(container)180        self.client.start(container)181        net_settings = self.client.inspect_container(container)[182            'NetworkSettings'183        ]184        assert net_settings['Networks'][net_name]['IPAMConfig']['IPv4Address']\185            == '132.124.0.23'186    @requires_api_version('1.22')187    def test_create_with_ipv6_address(self):188        net_name, net_id = self.create_network(189            ipam=IPAMConfig(190                driver='default',191                pool_configs=[IPAMPool(subnet="2001:389::1/64")],192            ),193        )194        container = self.client.create_container(195            image=TEST_IMG, command='top',196            host_config=self.client.create_host_config(network_mode=net_name),197            networking_config=self.client.create_networking_config({198                net_name: self.client.create_endpoint_config(199                    ipv6_address='2001:389::f00d'200                )201            })202        )203        self.tmp_containers.append(container)204        self.client.start(container)205        net_settings = self.client.inspect_container(container)[206            'NetworkSettings'207        ]208        assert net_settings['Networks'][net_name]['IPAMConfig']['IPv6Address']\209            == '2001:389::f00d'210    @requires_api_version('1.24')211    def test_create_with_linklocal_ips(self):212        container = self.client.create_container(213            TEST_IMG, 'top',214            networking_config=self.client.create_networking_config(215                {216                    'bridge': self.client.create_endpoint_config(217                        link_local_ips=['169.254.8.8']218                    )219                }220            ),221            host_config=self.client.create_host_config(network_mode='bridge')222        )223        self.tmp_containers.append(container)224        self.client.start(container)225        container_data = self.client.inspect_container(container)226        net_cfg = container_data['NetworkSettings']['Networks']['bridge']227        assert 'IPAMConfig' in net_cfg228        assert 'LinkLocalIPs' in net_cfg['IPAMConfig']229        assert net_cfg['IPAMConfig']['LinkLocalIPs'] == ['169.254.8.8']230    @requires_api_version('1.32')231    def test_create_with_driveropt(self):232        container = self.client.create_container(233            TEST_IMG, 'top',234            networking_config=self.client.create_networking_config(235                {236                    'bridge': self.client.create_endpoint_config(237                        driver_opt={'com.docker-py.setting': 'on'}238                    )239                }240            ),241            host_config=self.client.create_host_config(network_mode='bridge')242        )243        self.tmp_containers.append(container)244        self.client.start(container)245        container_data = self.client.inspect_container(container)246        net_cfg = container_data['NetworkSettings']['Networks']['bridge']247        assert 'DriverOpts' in net_cfg248        assert 'com.docker-py.setting' in net_cfg['DriverOpts']249        assert net_cfg['DriverOpts']['com.docker-py.setting'] == 'on'250    @requires_api_version('1.22')251    def test_create_with_links(self):252        net_name, net_id = self.create_network()253        container = self.create_and_start(254            host_config=self.client.create_host_config(network_mode=net_name),255            networking_config=self.client.create_networking_config({256                net_name: self.client.create_endpoint_config(257                    links=[('docker-py-test-upstream', 'bar')],258                ),259            }),260        )261        net_settings = self.client.inspect_container(container)[262            'NetworkSettings'263        ]264        assert net_settings['Networks'][net_name]['Links'] == [265            'docker-py-test-upstream:bar'266        ]267        self.create_and_start(268            name='docker-py-test-upstream',269            host_config=self.client.create_host_config(network_mode=net_name),270        )271        self.execute(container, ['nslookup', 'bar'])272    def test_create_check_duplicate(self):273        net_name, net_id = self.create_network()274        with pytest.raises(docker.errors.APIError):275            self.client.create_network(net_name, check_duplicate=True)276        net_id = self.client.create_network(net_name, check_duplicate=False)277        self.tmp_networks.append(net_id['Id'])278    @requires_api_version('1.22')279    def test_connect_with_links(self):280        net_name, net_id = self.create_network()281        container = self.create_and_start(282            host_config=self.client.create_host_config(network_mode=net_name))283        self.client.disconnect_container_from_network(container, net_name)284        self.client.connect_container_to_network(285            container, net_name,286            links=[('docker-py-test-upstream', 'bar')])287        net_settings = self.client.inspect_container(container)[288            'NetworkSettings'289        ]290        assert net_settings['Networks'][net_name]['Links'] == [291            'docker-py-test-upstream:bar'292        ]293        self.create_and_start(294            name='docker-py-test-upstream',295            host_config=self.client.create_host_config(network_mode=net_name),296        )297        self.execute(container, ['nslookup', 'bar'])298    @requires_api_version('1.22')299    def test_connect_with_ipv4_address(self):300        net_name, net_id = self.create_network(301            ipam=IPAMConfig(302                driver='default',303                pool_configs=[304                    IPAMPool(305                        subnet="172.28.0.0/16", iprange="172.28.5.0/24",306                        gateway="172.28.5.254"307                    )308                ]309            )310        )311        container = self.create_and_start(312            host_config=self.client.create_host_config(network_mode=net_name))313        self.client.disconnect_container_from_network(container, net_name)314        self.client.connect_container_to_network(315            container, net_name, ipv4_address='172.28.5.24'316        )317        container_data = self.client.inspect_container(container)318        net_data = container_data['NetworkSettings']['Networks'][net_name]319        assert net_data['IPAMConfig']['IPv4Address'] == '172.28.5.24'320    @requires_api_version('1.22')321    def test_connect_with_ipv6_address(self):322        net_name, net_id = self.create_network(323            ipam=IPAMConfig(324                driver='default',325                pool_configs=[326                    IPAMPool(327                        subnet="2001:389::1/64", iprange="2001:389::0/96",328                        gateway="2001:389::ffff"329                    )330                ]331            )332        )333        container = self.create_and_start(334            host_config=self.client.create_host_config(network_mode=net_name))335        self.client.disconnect_container_from_network(container, net_name)336        self.client.connect_container_to_network(337            container, net_name, ipv6_address='2001:389::f00d'338        )339        container_data = self.client.inspect_container(container)340        net_data = container_data['NetworkSettings']['Networks'][net_name]341        assert net_data['IPAMConfig']['IPv6Address'] == '2001:389::f00d'342    @requires_api_version('1.23')343    def test_create_internal_networks(self):344        _, net_id = self.create_network(internal=True)345        net = self.client.inspect_network(net_id)346        assert net['Internal'] is True347    @requires_api_version('1.23')348    def test_create_network_with_labels(self):349        _, net_id = self.create_network(labels={...test_docker_compose.py
Source:test_docker_compose.py  
...59    caplog.set_level("INFO")60    with docker_compose_cm(docker_compose_yml=other_docker_compose_yml) as docker_compose:61        container = next(iter(docker_compose))62        network = network_name_from_yml(other_docker_compose_yml)63        disconnect_container_from_network(container=container, network=network)64        # Connecting multiple times is idempotent65        connect_container_to_network(container=container, network=network)66        assert f"Connected {container} to network {network}." in caplog.text67        connect_container_to_network(container=container, network=network)68        assert f"Unable to connect {container} to network {network}." in caplog.text69def test_disconnect_container_from_network(docker_compose_cm, other_docker_compose_yml, caplog):70    caplog.set_level("INFO")71    with docker_compose_cm(docker_compose_yml=other_docker_compose_yml) as docker_compose:72        container = next(iter(docker_compose))73        network = network_name_from_yml(other_docker_compose_yml)74        # Disconnecting multiple times is idempotent75        disconnect_container_from_network(container=container, network=network)76        assert f"Disconnected {container} from network {network}." in caplog.text77        disconnect_container_from_network(container=container, network=network)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
