# Python code snippet (scraped listing).
# Source: service_test.py — docker-compose integration tests.
...49    container = service.create_container(**override_options)50    return service.start_container(container)51class ServiceTest(DockerClientTestCase):52    def test_containers(self):53        foo = self.create_service('foo')54        bar = self.create_service('bar')55        create_and_start_container(foo)56        assert len(foo.containers()) == 157        assert foo.containers()[0].name.startswith('composetest_foo_')58        assert len(bar.containers()) == 059        create_and_start_container(bar)60        create_and_start_container(bar)61        assert len(foo.containers()) == 162        assert len(bar.containers()) == 263        names = [c.name for c in bar.containers()]64        assert len(names) == 265        assert all(name.startswith('composetest_bar_') for name in names)66    def test_containers_one_off(self):67        db = self.create_service('db')68        container = db.create_container(one_off=True)69        assert db.containers(stopped=True) == []70        assert db.containers(one_off=OneOffFilter.only, stopped=True) == [container]71    def test_project_is_added_to_container_name(self):72        service = self.create_service('web')73        create_and_start_container(service)74        assert service.containers()[0].name.startswith('composetest_web_')75    def test_create_container_with_one_off(self):76        db = self.create_service('db')77        container = db.create_container(one_off=True)78        assert container.name.startswith('composetest_db_run_')79    def test_create_container_with_one_off_when_existing_container_is_running(self):80        db = self.create_service('db')81        db.start()82        container = db.create_container(one_off=True)83        assert container.name.startswith('composetest_db_run_')84    def test_create_container_with_unspecified_volume(self):85        service = self.create_service('db', volumes=[VolumeSpec.parse('/var/db')])86        container = service.create_container()87        
service.start_container(container)88        assert container.get_mount('/var/db')89    def test_create_container_with_volume_driver(self):90        service = self.create_service('db', volume_driver='foodriver')91        container = service.create_container()92        service.start_container(container)93        assert 'foodriver' == container.get('HostConfig.VolumeDriver')94    @pytest.mark.skipif(SWARM_SKIP_CPU_SHARES, reason='Swarm --cpu-shares bug')95    def test_create_container_with_cpu_shares(self):96        service = self.create_service('db', cpu_shares=73)97        container = service.create_container()98        service.start_container(container)99        assert container.get('HostConfig.CpuShares') == 73100    def test_create_container_with_cpu_quota(self):101        service = self.create_service('db', cpu_quota=40000, cpu_period=150000)102        container = service.create_container()103        container.start()104        assert container.get('HostConfig.CpuQuota') == 40000105        assert container.get('HostConfig.CpuPeriod') == 150000106    @pytest.mark.xfail(raises=OperationFailedError, reason='not supported by kernel')107    def test_create_container_with_cpu_rt(self):108        service = self.create_service('db', cpu_rt_runtime=40000, cpu_rt_period=150000)109        container = service.create_container()110        container.start()111        assert container.get('HostConfig.CpuRealtimeRuntime') == 40000112        assert container.get('HostConfig.CpuRealtimePeriod') == 150000113    def test_create_container_with_cpu_count(self):114        self.require_api_version('1.25')115        service = self.create_service('db', cpu_count=2)116        container = service.create_container()117        service.start_container(container)118        assert container.get('HostConfig.CpuCount') == 2119    @pytest.mark.skipif(not IS_WINDOWS_PLATFORM, reason='cpu_percent is not supported for Linux')120    def test_create_container_with_cpu_percent(self):121        
self.require_api_version('1.25')122        service = self.create_service('db', cpu_percent=12)123        container = service.create_container()124        service.start_container(container)125        assert container.get('HostConfig.CpuPercent') == 12126    def test_create_container_with_cpus(self):127        self.require_api_version('1.25')128        service = self.create_service('db', cpus=1)129        container = service.create_container()130        service.start_container(container)131        assert container.get('HostConfig.NanoCpus') == 1000000000132    def test_create_container_with_shm_size(self):133        self.require_api_version('1.22')134        service = self.create_service('db', shm_size=67108864)135        container = service.create_container()136        service.start_container(container)137        assert container.get('HostConfig.ShmSize') == 67108864138    def test_create_container_with_init_bool(self):139        self.require_api_version('1.25')140        service = self.create_service('db', init=True)141        container = service.create_container()142        service.start_container(container)143        assert container.get('HostConfig.Init') is True144    @pytest.mark.xfail(True, reason='Option has been removed in Engine 17.06.0')145    def test_create_container_with_init_path(self):146        self.require_api_version('1.25')147        docker_init_path = find_executable('docker-init')148        service = self.create_service('db', init=docker_init_path)149        container = service.create_container()150        service.start_container(container)151        assert container.get('HostConfig.InitPath') == docker_init_path152    @pytest.mark.xfail(True, reason='Some kernels/configs do not support pids_limit')153    def test_create_container_with_pids_limit(self):154        self.require_api_version('1.23')155        service = self.create_service('db', pids_limit=10)156        container = service.create_container()157        
service.start_container(container)158        assert container.get('HostConfig.PidsLimit') == 10159    def test_create_container_with_extra_hosts_list(self):160        extra_hosts = ['somehost:162.242.195.82', 'otherhost:50.31.209.229']161        service = self.create_service('db', extra_hosts=extra_hosts)162        container = service.create_container()163        service.start_container(container)164        assert set(container.get('HostConfig.ExtraHosts')) == set(extra_hosts)165    def test_create_container_with_extra_hosts_dicts(self):166        extra_hosts = {'somehost': '162.242.195.82', 'otherhost': '50.31.209.229'}167        extra_hosts_list = ['somehost:162.242.195.82', 'otherhost:50.31.209.229']168        service = self.create_service('db', extra_hosts=extra_hosts)169        container = service.create_container()170        service.start_container(container)171        assert set(container.get('HostConfig.ExtraHosts')) == set(extra_hosts_list)172    def test_create_container_with_cpu_set(self):173        service = self.create_service('db', cpuset='0')174        container = service.create_container()175        service.start_container(container)176        assert container.get('HostConfig.CpusetCpus') == '0'177    def test_create_container_with_read_only_root_fs(self):178        read_only = True179        service = self.create_service('db', read_only=read_only)180        container = service.create_container()181        service.start_container(container)182        assert container.get('HostConfig.ReadonlyRootfs') == read_only183    @pytest.mark.xfail(True, reason='Getting "Your kernel does not support '184                                    'cgroup blkio weight and weight_device" on daemon start '185                                    'on Linux kernel 5.3.x')186    def test_create_container_with_blkio_config(self):187        blkio_config = {188            'weight': 300,189            'weight_device': [{'path': '/dev/sda', 'weight': 200}],190            
'device_read_bps': [{'path': '/dev/sda', 'rate': 1024 * 1024 * 100}],191            'device_read_iops': [{'path': '/dev/sda', 'rate': 1000}],192            'device_write_bps': [{'path': '/dev/sda', 'rate': 1024 * 1024}],193            'device_write_iops': [{'path': '/dev/sda', 'rate': 800}]194        }195        service = self.create_service('web', blkio_config=blkio_config)196        container = service.create_container()197        assert container.get('HostConfig.BlkioWeight') == 300198        assert container.get('HostConfig.BlkioWeightDevice') == [{199            'Path': '/dev/sda', 'Weight': 200200        }]201        assert container.get('HostConfig.BlkioDeviceReadBps') == [{202            'Path': '/dev/sda', 'Rate': 1024 * 1024 * 100203        }]204        assert container.get('HostConfig.BlkioDeviceWriteBps') == [{205            'Path': '/dev/sda', 'Rate': 1024 * 1024206        }]207        assert container.get('HostConfig.BlkioDeviceReadIOps') == [{208            'Path': '/dev/sda', 'Rate': 1000209        }]210        assert container.get('HostConfig.BlkioDeviceWriteIOps') == [{211            'Path': '/dev/sda', 'Rate': 800212        }]213    def test_create_container_with_security_opt(self):214        security_opt = [SecurityOpt.parse('label:disable')]215        service = self.create_service('db', security_opt=security_opt)216        container = service.create_container()217        service.start_container(container)218        assert set(container.get('HostConfig.SecurityOpt')) == {o.repr() for o in security_opt}219    @pytest.mark.xfail(True, reason='Not supported on most drivers')220    def test_create_container_with_storage_opt(self):221        storage_opt = {'size': '1G'}222        service = self.create_service('db', storage_opt=storage_opt)223        container = service.create_container()224        service.start_container(container)225        assert container.get('HostConfig.StorageOpt') == storage_opt226    def 
test_create_container_with_oom_kill_disable(self):227        self.require_api_version('1.20')228        service = self.create_service('db', oom_kill_disable=True)229        container = service.create_container()230        assert container.get('HostConfig.OomKillDisable') is True231    def test_create_container_with_mac_address(self):232        service = self.create_service('db', mac_address='02:42:ac:11:65:43')233        container = service.create_container()234        service.start_container(container)235        assert container.inspect()['Config']['MacAddress'] == '02:42:ac:11:65:43'236    def test_create_container_with_device_cgroup_rules(self):237        service = self.create_service('db', device_cgroup_rules=['c 7:128 rwm'])238        container = service.create_container()239        assert container.get('HostConfig.DeviceCgroupRules') == ['c 7:128 rwm']240    def test_create_container_with_specified_volume(self):241        host_path = '/tmp/host-path'242        container_path = '/container-path'243        service = self.create_service(244            'db',245            volumes=[VolumeSpec(host_path, container_path, 'rw')])246        container = service.create_container()247        service.start_container(container)248        assert container.get_mount(container_path)249        # Match the last component ("host-path"), because boot2docker symlinks /tmp250        actual_host_path = container.get_mount(container_path)['Source']251        assert path.basename(actual_host_path) == path.basename(host_path), (252            "Last component differs: {}, {}".format(actual_host_path, host_path)253        )254    def test_create_container_with_host_mount(self):255        host_path = '/tmp/host-path'256        container_path = '/container-path'257        create_custom_host_file(self.client, path.join(host_path, 'a.txt'), 'test')258        service = self.create_service(259            'db',260            volumes=[261                MountSpec(type='bind', source=host_path, 
target=container_path, read_only=True)262            ]263        )264        container = service.create_container()265        service.start_container(container)266        mount = container.get_mount(container_path)267        assert mount268        assert path.basename(mount['Source']) == path.basename(host_path)269        assert mount['RW'] is False270    def test_create_container_with_tmpfs_mount(self):271        container_path = '/container-tmpfs'272        service = self.create_service(273            'db',274            volumes=[MountSpec(type='tmpfs', target=container_path)]275        )276        container = service.create_container()277        service.start_container(container)278        mount = container.get_mount(container_path)279        assert mount280        assert mount['Type'] == 'tmpfs'281    def test_create_container_with_tmpfs_mount_tmpfs_size(self):282        container_path = '/container-tmpfs'283        service = self.create_service(284            'db',285            volumes=[MountSpec(type='tmpfs', target=container_path, tmpfs={'size': 5368709})]286        )287        container = service.create_container()288        service.start_container(container)289        mount = container.get_mount(container_path)290        assert mount291        print(container.dictionary)292        assert mount['Type'] == 'tmpfs'293        assert container.get('HostConfig.Mounts')[0]['TmpfsOptions'] == {294            'SizeBytes': 5368709295        }296    def test_create_container_with_volume_mount(self):297        container_path = '/container-volume'298        volume_name = 'composetest_abcde'299        self.client.create_volume(volume_name)300        service = self.create_service(301            'db',302            volumes=[MountSpec(type='volume', source=volume_name, target=container_path)]303        )304        container = service.create_container()305        service.start_container(container)306        mount = container.get_mount(container_path)307        assert 
mount308        assert mount['Name'] == volume_name309    def test_create_container_with_legacy_mount(self):310        # Ensure mounts are converted to volumes if API version < 1.30311        # Needed to support long syntax in the 3.2 format312        client = docker_client({}, version='1.25')313        container_path = '/container-volume'314        volume_name = 'composetest_abcde'315        self.client.create_volume(volume_name)316        service = Service('db', client=client, volumes=[317            MountSpec(type='volume', source=volume_name, target=container_path)318        ], image=BUSYBOX_IMAGE_WITH_TAG, command=['top'], project='composetest')319        container = service.create_container()320        service.start_container(container)321        mount = container.get_mount(container_path)322        assert mount323        assert mount['Name'] == volume_name324    def test_create_container_with_legacy_tmpfs_mount(self):325        # Ensure tmpfs mounts are converted to tmpfs entries if API version < 1.30326        # Needed to support long syntax in the 3.2 format327        client = docker_client({}, version='1.25')328        container_path = '/container-tmpfs'329        service = Service('db', client=client, volumes=[330            MountSpec(type='tmpfs', target=container_path)331        ], image=BUSYBOX_IMAGE_WITH_TAG, command=['top'], project='composetest')332        container = service.create_container()333        service.start_container(container)334        mount = container.get_mount(container_path)335        assert mount is None336        assert container_path in container.get('HostConfig.Tmpfs')337    def test_create_container_with_healthcheck_config(self):338        one_second = parse_nanoseconds_int('1s')339        healthcheck = {340            'test': ['true'],341            'interval': 2 * one_second,342            'timeout': 5 * one_second,343            'retries': 5,344            'start_period': 2 * one_second345        }346        service = 
self.create_service('db', healthcheck=healthcheck)347        container = service.create_container()348        remote_healthcheck = container.get('Config.Healthcheck')349        assert remote_healthcheck['Test'] == healthcheck['test']350        assert remote_healthcheck['Interval'] == healthcheck['interval']351        assert remote_healthcheck['Timeout'] == healthcheck['timeout']352        assert remote_healthcheck['Retries'] == healthcheck['retries']353        assert remote_healthcheck['StartPeriod'] == healthcheck['start_period']354    def test_recreate_preserves_volume_with_trailing_slash(self):355        """When the Compose file specifies a trailing slash in the container path, make356        sure we copy the volume over when recreating.357        """358        service = self.create_service('data', volumes=[VolumeSpec.parse('/data/')])359        old_container = create_and_start_container(service)360        volume_path = old_container.get_mount('/data')['Source']361        new_container = service.recreate_container(old_container)362        assert new_container.get_mount('/data')['Source'] == volume_path363    def test_recreate_volume_to_mount(self):364        # https://github.com/docker/compose/issues/6280365        service = Service(366            project='composetest',367            name='db',368            client=self.client,369            build={'context': 'tests/fixtures/dockerfile-with-volume'},370            volumes=[MountSpec.parse({371                'type': 'volume',372                'target': '/data',373            })]374        )375        old_container = create_and_start_container(service)376        new_container = service.recreate_container(old_container)377        assert new_container.get_mount('/data')['Source']378    def test_duplicate_volume_trailing_slash(self):379        """380        When an image specifies a volume, and the Compose file specifies a host path381        but adds a trailing slash, make sure that we don't create duplicate 
binds.382        """383        host_path = '/tmp/data'384        container_path = '/data'385        volumes = [VolumeSpec.parse('{}:{}/'.format(host_path, container_path))]386        tmp_container = self.client.create_container(387            'busybox', 'true',388            volumes={container_path: {}},389            labels={'com.docker.compose.test_image': 'true'},390            host_config={}391        )392        image = self.client.commit(tmp_container)['Id']393        service = self.create_service('db', image=image, volumes=volumes)394        old_container = create_and_start_container(service)395        assert old_container.get('Config.Volumes') == {container_path: {}}396        service = self.create_service('db', image=image, volumes=volumes)397        new_container = service.recreate_container(old_container)398        assert new_container.get('Config.Volumes') == {container_path: {}}399        assert service.containers(stopped=False) == [new_container]400    def test_create_container_with_volumes_from(self):401        volume_service = self.create_service('data')402        volume_container_1 = volume_service.create_container()403        volume_container_2 = Container.create(404            self.client,405            image=BUSYBOX_IMAGE_WITH_TAG,406            command=["top"],407            labels={LABEL_PROJECT: 'composetest'},408            host_config={},409            environment=['affinity:container=={}'.format(volume_container_1.id)],410        )411        host_service = self.create_service(412            'host',413            volumes_from=[414                VolumeFromSpec(volume_service, 'rw', 'service'),415                VolumeFromSpec(volume_container_2, 'rw', 'container')416            ],417            environment=['affinity:container=={}'.format(volume_container_1.id)],418        )419        host_container = host_service.create_container()420        host_service.start_container(host_container)421        assert volume_container_1.id + ':rw' in 
host_container.get('HostConfig.VolumesFrom')422        assert volume_container_2.id + ':rw' in host_container.get('HostConfig.VolumesFrom')423    def test_execute_convergence_plan_recreate(self):424        service = self.create_service(425            'db',426            environment={'FOO': '1'},427            volumes=[VolumeSpec.parse('/etc')],428            entrypoint=['top'],429            command=['-d', '1']430        )431        old_container = service.create_container()432        assert old_container.get('Config.Entrypoint') == ['top']433        assert old_container.get('Config.Cmd') == ['-d', '1']434        assert 'FOO=1' in old_container.get('Config.Env')435        assert old_container.name.startswith('composetest_db_')436        service.start_container(old_container)437        old_container.inspect()  # reload volume data438        volume_path = old_container.get_mount('/etc')['Source']439        num_containers_before = len(self.client.containers(all=True))440        service.options['environment']['FOO'] = '2'441        new_container, = service.execute_convergence_plan(442            ConvergencePlan('recreate', [old_container]))443        assert new_container.get('Config.Entrypoint') == ['top']444        assert new_container.get('Config.Cmd') == ['-d', '1']445        assert 'FOO=2' in new_container.get('Config.Env')446        assert new_container.name.startswith('composetest_db_')447        assert new_container.get_mount('/etc')['Source'] == volume_path448        if not is_cluster(self.client):449            assert (450                'affinity:container==%s' % old_container.id in451                new_container.get('Config.Env')452            )453        else:454            # In Swarm, the env marker is consumed and the container should be deployed455            # on the same node.456            assert old_container.get('Node.Name') == new_container.get('Node.Name')457        assert len(self.client.containers(all=True)) == num_containers_before458        
assert old_container.id != new_container.id459        with pytest.raises(APIError):460            self.client.inspect_container(old_container.id)461    def test_execute_convergence_plan_recreate_change_mount_target(self):462        service = self.create_service(463            'db',464            volumes=[MountSpec(target='/app1', type='volume')],465            entrypoint=['top'], command=['-d', '1']466        )467        old_container = create_and_start_container(service)468        assert (469            [mount['Destination'] for mount in old_container.get('Mounts')] ==470            ['/app1']471        )472        service.options['volumes'] = [MountSpec(target='/app2', type='volume')]473        new_container, = service.execute_convergence_plan(474            ConvergencePlan('recreate', [old_container])475        )476        assert (477            [mount['Destination'] for mount in new_container.get('Mounts')] ==478            ['/app2']479        )480    def test_execute_convergence_plan_recreate_twice(self):481        service = self.create_service(482            'db',483            volumes=[VolumeSpec.parse('/etc')],484            entrypoint=['top'],485            command=['-d', '1'])486        orig_container = service.create_container()487        service.start_container(orig_container)488        orig_container.inspect()  # reload volume data489        volume_path = orig_container.get_mount('/etc')['Source']490        # Do this twice to reproduce the bug491        for _ in range(2):492            new_container, = service.execute_convergence_plan(493                ConvergencePlan('recreate', [orig_container]))494            assert new_container.get_mount('/etc')['Source'] == volume_path495            if not is_cluster(self.client):496                assert ('affinity:container==%s' % orig_container.id in497                        new_container.get('Config.Env'))498            else:499                # In Swarm, the env marker is consumed and the container should 
be deployed500                # on the same node.501                assert orig_container.get('Node.Name') == new_container.get('Node.Name')502            orig_container = new_container503    def test_execute_convergence_plan_recreate_twice_with_mount(self):504        service = self.create_service(505            'db',506            volumes=[MountSpec(target='/etc', type='volume')],507            entrypoint=['top'],508            command=['-d', '1']509        )510        orig_container = service.create_container()511        service.start_container(orig_container)512        orig_container.inspect()  # reload volume data513        volume_path = orig_container.get_mount('/etc')['Source']514        # Do this twice to reproduce the bug515        for _ in range(2):516            new_container, = service.execute_convergence_plan(517                ConvergencePlan('recreate', [orig_container])518            )519            assert new_container.get_mount('/etc')['Source'] == volume_path520            if not is_cluster(self.client):521                assert ('affinity:container==%s' % orig_container.id in522                        new_container.get('Config.Env'))523            else:524                # In Swarm, the env marker is consumed and the container should be deployed525                # on the same node.526                assert orig_container.get('Node.Name') == new_container.get('Node.Name')527            orig_container = new_container528    def test_execute_convergence_plan_when_containers_are_stopped(self):529        service = self.create_service(530            'db',531            environment={'FOO': '1'},532            volumes=[VolumeSpec.parse('/var/db')],533            entrypoint=['top'],534            command=['-d', '1']535        )536        service.create_container()537        containers = service.containers(stopped=True)538        assert len(containers) == 1539        container, = containers540        assert not container.is_running541        
service.execute_convergence_plan(ConvergencePlan('start', [container]))542        containers = service.containers()543        assert len(containers) == 1544        container.inspect()545        assert container == containers[0]546        assert container.is_running547    def test_execute_convergence_plan_with_image_declared_volume(self):548        service = Service(549            project='composetest',550            name='db',551            client=self.client,552            build={'context': 'tests/fixtures/dockerfile-with-volume'},553        )554        old_container = create_and_start_container(service)555        assert [mount['Destination'] for mount in old_container.get('Mounts')] == ['/data']556        volume_path = old_container.get_mount('/data')['Source']557        new_container, = service.execute_convergence_plan(558            ConvergencePlan('recreate', [old_container]))559        assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']560        assert new_container.get_mount('/data')['Source'] == volume_path561    def test_execute_convergence_plan_with_image_declared_volume_renew(self):562        service = Service(563            project='composetest',564            name='db',565            client=self.client,566            build={'context': 'tests/fixtures/dockerfile-with-volume'},567        )568        old_container = create_and_start_container(service)569        assert [mount['Destination'] for mount in old_container.get('Mounts')] == ['/data']570        volume_path = old_container.get_mount('/data')['Source']571        new_container, = service.execute_convergence_plan(572            ConvergencePlan('recreate', [old_container]), renew_anonymous_volumes=True573        )574        assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']575        assert new_container.get_mount('/data')['Source'] != volume_path576    def test_execute_convergence_plan_when_image_volume_masks_config(self):577        
service = self.create_service(578            'db',579            build={'context': 'tests/fixtures/dockerfile-with-volume'},580        )581        old_container = create_and_start_container(service)582        assert [mount['Destination'] for mount in old_container.get('Mounts')] == ['/data']583        volume_path = old_container.get_mount('/data')['Source']584        service.options['volumes'] = [VolumeSpec.parse('/tmp:/data')]585        with mock.patch('compose.service.log') as mock_log:586            new_container, = service.execute_convergence_plan(587                ConvergencePlan('recreate', [old_container]))588        mock_log.warning.assert_called_once_with(mock.ANY)589        _, args, kwargs = mock_log.warning.mock_calls[0]590        assert "Service \"db\" is using volume \"/data\" from the previous container" in args[0]591        assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']592        assert new_container.get_mount('/data')['Source'] == volume_path593    def test_execute_convergence_plan_when_host_volume_is_removed(self):594        host_path = '/tmp/host-path'595        service = self.create_service(596            'db',597            build={'context': 'tests/fixtures/dockerfile-with-volume'},598            volumes=[VolumeSpec(host_path, '/data', 'rw')])599        old_container = create_and_start_container(service)600        assert (601            [mount['Destination'] for mount in old_container.get('Mounts')] ==602            ['/data']603        )604        service.options['volumes'] = []605        with mock.patch('compose.service.log', autospec=True) as mock_log:606            new_container, = service.execute_convergence_plan(607                ConvergencePlan('recreate', [old_container]))608        assert not mock_log.warn.called609        assert (610            [mount['Destination'] for mount in new_container.get('Mounts')] ==611            ['/data']612        )613        assert 
new_container.get_mount('/data')['Source'] != host_path614    def test_execute_convergence_plan_anonymous_volume_renew(self):615        service = self.create_service(616            'db',617            image='busybox',618            volumes=[VolumeSpec(None, '/data', 'rw')])619        old_container = create_and_start_container(service)620        assert (621            [mount['Destination'] for mount in old_container.get('Mounts')] ==622            ['/data']623        )624        volume_path = old_container.get_mount('/data')['Source']625        new_container, = service.execute_convergence_plan(626            ConvergencePlan('recreate', [old_container]),627            renew_anonymous_volumes=True628        )629        assert (630            [mount['Destination'] for mount in new_container.get('Mounts')] ==631            ['/data']632        )633        assert new_container.get_mount('/data')['Source'] != volume_path634    def test_execute_convergence_plan_anonymous_volume_recreate_then_renew(self):635        service = self.create_service(636            'db',637            image='busybox',638            volumes=[VolumeSpec(None, '/data', 'rw')])639        old_container = create_and_start_container(service)640        assert (641            [mount['Destination'] for mount in old_container.get('Mounts')] ==642            ['/data']643        )644        volume_path = old_container.get_mount('/data')['Source']645        mid_container, = service.execute_convergence_plan(646            ConvergencePlan('recreate', [old_container]),647        )648        assert (649            [mount['Destination'] for mount in mid_container.get('Mounts')] ==650            ['/data']651        )652        assert mid_container.get_mount('/data')['Source'] == volume_path653        new_container, = service.execute_convergence_plan(654            ConvergencePlan('recreate', [mid_container]),655            renew_anonymous_volumes=True656        )657        assert (658            [mount['Destination'] 
for mount in new_container.get('Mounts')] ==659            ['/data']660        )661        assert new_container.get_mount('/data')['Source'] != volume_path662    def test_execute_convergence_plan_without_start(self):663        service = self.create_service(664            'db',665            build={'context': 'tests/fixtures/dockerfile-with-volume'}666        )667        containers = service.execute_convergence_plan(ConvergencePlan('create', []), start=False)668        service_containers = service.containers(stopped=True)669        assert len(service_containers) == 1670        assert not service_containers[0].is_running671        containers = service.execute_convergence_plan(672            ConvergencePlan('recreate', containers),673            start=False)674        service_containers = service.containers(stopped=True)675        assert len(service_containers) == 1676        assert not service_containers[0].is_running677        service.execute_convergence_plan(ConvergencePlan('start', containers), start=False)678        service_containers = service.containers(stopped=True)679        assert len(service_containers) == 1680        assert not service_containers[0].is_running681    def test_execute_convergence_plan_image_with_volume_is_removed(self):682        service = self.create_service(683            'db', build={'context': 'tests/fixtures/dockerfile-with-volume'}684        )685        old_container = create_and_start_container(service)686        assert (687            [mount['Destination'] for mount in old_container.get('Mounts')] ==688            ['/data']689        )690        volume_path = old_container.get_mount('/data')['Source']691        old_container.stop()692        self.client.remove_image(service.image(), force=True)693        service.ensure_image_exists()694        with pytest.raises(ImageNotFound):695            service.execute_convergence_plan(696                ConvergencePlan('recreate', [old_container])697            )698        
old_container.inspect()  # retrieve new name from server699        new_container, = service.execute_convergence_plan(700            ConvergencePlan('recreate', [old_container]),701            reset_container_image=True702        )703        assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']704        assert new_container.get_mount('/data')['Source'] == volume_path705    def test_start_container_passes_through_options(self):706        db = self.create_service('db')707        create_and_start_container(db, environment={'FOO': 'BAR'})708        assert db.containers()[0].environment['FOO'] == 'BAR'709    def test_start_container_inherits_options_from_constructor(self):710        db = self.create_service('db', environment={'FOO': 'BAR'})711        create_and_start_container(db)712        assert db.containers()[0].environment['FOO'] == 'BAR'713    @no_cluster('No legacy links support in Swarm')714    def test_start_container_creates_links(self):715        db = self.create_service('db')716        web = self.create_service('web', links=[(db, None)])717        db1 = create_and_start_container(db)718        db2 = create_and_start_container(db)719        create_and_start_container(web)720        assert set(get_links(web.containers()[0])) == {721            db1.name, db1.name_without_project,722            db2.name, db2.name_without_project,723            'db'724        }725    @no_cluster('No legacy links support in Swarm')726    def test_start_container_creates_links_with_names(self):727        db = self.create_service('db')728        web = self.create_service('web', links=[(db, 'custom_link_name')])729        db1 = create_and_start_container(db)730        db2 = create_and_start_container(db)731        create_and_start_container(web)732        assert set(get_links(web.containers()[0])) == {733            db1.name, db1.name_without_project,734            db2.name, db2.name_without_project,735            'custom_link_name'736        }737    
    @no_cluster('No legacy links support in Swarm')
    def test_start_container_with_external_links(self):
        """External links (bare names and `name:alias`) show up as container links."""
        db = self.create_service('db')
        db_ctnrs = [create_and_start_container(db) for _ in range(3)]
        web = self.create_service(
            'web', external_links=[
                db_ctnrs[0].name,
                db_ctnrs[1].name,
                '{}:db_3'.format(db_ctnrs[2].name)
            ]
        )
        create_and_start_container(web)
        # The aliased link appears under its alias, not the container name.
        assert set(get_links(web.containers()[0])) == {
            db_ctnrs[0].name,
            db_ctnrs[1].name,
            'db_3'
        }

    @no_cluster('No legacy links support in Swarm')
    def test_start_normal_container_does_not_create_links_to_its_own_service(self):
        db = self.create_service('db')
        create_and_start_container(db)
        create_and_start_container(db)
        c = create_and_start_container(db)
        # Regular (non one-off) containers are not linked to their siblings.
        assert set(get_links(c)) == set()

    @no_cluster('No legacy links support in Swarm')
    def test_start_one_off_container_creates_links_to_its_own_service(self):
        db = self.create_service('db')
        db1 = create_and_start_container(db)
        db2 = create_and_start_container(db)
        c = create_and_start_container(db, one_off=OneOffFilter.only)
        # A one-off container IS linked to the running containers of its service.
        assert set(get_links(c)) == {
            db1.name, db1.name_without_project,
            db2.name, db2.name_without_project,
            'db'
        }

    def test_start_container_builds_images(self):
        # Service has a build context and no pre-built image: starting it
        # must trigger a build and tag the result.
        service = Service(
            name='test',
            client=self.client,
            build={'context': 'tests/fixtures/simple-dockerfile'},
            project='composetest',
        )
        container = create_and_start_container(service)
        container.wait()
        assert b'success' in container.logs()
        assert len(self.client.images(name='composetest_test')) >= 1

    def test_start_container_uses_tagged_image_if_it_exists(self):
        # Pre-build the tagged image; the bogus build context below would fail
        # if a build were attempted, so a successful start proves the existing
        # tagged image was used instead of rebuilding.
        self.check_build('tests/fixtures/simple-dockerfile', tag='composetest_test')
        service = Service(
            name='test',
            client=self.client,
            build={'context': 'this/does/not/exist/and/will/throw/error'},
            project='composetest',
        )
        container = create_and_start_container(service)
        container.wait()
        assert b'success' in container.logs()

    def test_start_container_creates_ports(self):
        service = self.create_service('web', ports=[8000])
        container = create_and_start_container(service).inspect()
        assert list(container['NetworkSettings']['Ports'].keys()) == ['8000/tcp']
        # No host port was requested, so an ephemeral one must be assigned.
        assert container['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort'] != '8000'

    def test_build(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        # Force the classic build path (no CLI delegation, no BuildKit).
        service = self.create_service('web',
                                      build={'context': base_dir},
                                      environment={
                                          'COMPOSE_DOCKER_CLI_BUILD': '0',
                                          'DOCKER_BUILDKIT': '0',
                                      })
        service.build()
        self.addCleanup(self.client.remove_image, service.image_name)
        assert self.client.inspect_image('composetest_web')

    def test_build_cli(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        service = self.create_service('web',
                                      build={'context': base_dir},
                                      environment={
                                          'DOCKER_BUILDKIT': '1',
                                      })
        # cli=True delegates the build to the docker CLI (BuildKit path).
        service.build(cli=True)
        self.addCleanup(self.client.remove_image, service.image_name)
        assert self.client.inspect_image('composetest_web')

    def test_build_cli_with_build_labels(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        service = self.create_service('web',
                                      build={
                                          'context': base_dir,
                                          'labels': {'com.docker.compose.test': 'true'}},
                                      )
        service.build(cli=True)
        self.addCleanup(self.client.remove_image, service.image_name)
        # Build labels must be baked into the resulting image config.
        image = self.client.inspect_image('composetest_web')
        assert image['Config']['Labels']['com.docker.compose.test']

    def test_build_cli_with_build_error(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write('\n'.join([
                "FROM busybox",
                "RUN exit 2",
            ]))
        service = self.create_service('web',
                                      build={
                                          'context': base_dir,
                                          'labels': {'com.docker.compose.test': 'true'}},
                                      )
        # A failing RUN step must surface as a BuildError from the CLI path.
        with pytest.raises(BuildError):
            service.build(cli=True)

    def test_up_build_cli(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        web = self.create_service('web',
                                  build={'context': base_dir},
                                  environment={
                                      'DOCKER_BUILDKIT': '1',
                                  })
        project = Project('composetest', [web], self.client)
        # Force a (CLI/BuildKit) build as part of bringing the project up.
        project.up(do_build=BuildAction.force)
        containers = project.containers(['web'])
        assert len(containers) == 1
        assert containers[0].name.startswith('composetest_web_')

    def test_build_non_ascii_filename(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        # b'\xE2' is not valid UTF-8; packaging the build context must still work.
        with open(os.path.join(base_dir.encode('utf8'), b'foo\xE2bar'), 'w') as f:
            f.write("hello world\n")
        service = self.create_service('web', build={'context': str(base_dir)})
        service.build()
        self.addCleanup(self.client.remove_image, service.image_name)
        assert self.client.inspect_image('composetest_web')

    def test_build_with_image_name(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        # An explicit `image:` overrides the default <project>_<service> tag.
        image_name = 'examples/composetest:latest'
        self.addCleanup(self.client.remove_image, image_name)
        self.create_service('web', build={'context': base_dir}, image=image_name).build()
        assert self.client.inspect_image(image_name)

    def test_build_with_git_url(self):
        # The build context may be a remote git URL instead of a local path.
        build_url = "https://github.com/dnephin/docker-build-from-url.git"
        service = self.create_service('buildwithurl', build={'context': build_url})
        self.addCleanup(self.client.remove_image, service.image_name)
        service.build()
        assert service.image()
    def test_build_with_build_args(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
            f.write("ARG build_version\n")
            f.write("RUN echo ${build_version}\n")
        service = self.create_service('buildwithargs',
                                      build={'context': str(base_dir),
                                             'args': {"build_version": "1"}})
        service.build()
        self.addCleanup(self.client.remove_image, service.image_name)
        assert service.image()
        # The ARG value is visible in the image's recorded build command.
        assert "build_version=1" in service.image()['ContainerConfig']['Cmd']

    def test_build_with_build_args_override(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
            f.write("ARG build_version\n")
            f.write("RUN echo ${build_version}\n")
        service = self.create_service('buildwithargs',
                                      build={'context': str(base_dir),
                                             'args': {"build_version": "1"}})
        # An explicit override takes precedence over the configured build arg.
        service.build(build_args_override={'build_version': '2'})
        self.addCleanup(self.client.remove_image, service.image_name)
        assert service.image()
        assert "build_version=2" in service.image()['ContainerConfig']['Cmd']

    def test_build_with_build_labels(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write('FROM busybox\n')
        service = self.create_service('buildlabels', build={
            'context': str(base_dir),
            'labels': {'com.docker.compose.test': 'true'}
        })
        service.build()
        self.addCleanup(self.client.remove_image, service.image_name)
        assert service.image()
        assert service.image()['Config']['Labels']['com.docker.compose.test'] == 'true'

    @no_cluster('Container networks not on Swarm')
    def test_build_with_network(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write('FROM busybox\n')
            f.write('RUN ping -c1 google.local\n')
        # Host container whose /etc/hosts resolves google.local; building in
        # its network namespace makes the ping above succeed.
        net_container = self.client.create_container(
            'busybox', 'top', host_config=self.client.create_host_config(
                extra_hosts={'google.local': '127.0.0.1'}
            ), name='composetest_build_network'
        )
        self.addCleanup(self.client.remove_container, net_container, force=True)
        self.client.start(net_container)
        service = self.create_service('buildwithnet', build={
            'context': str(base_dir),
            'network': 'container:{}'.format(net_container['Id'])
        })
        service.build()
        self.addCleanup(self.client.remove_image, service.image_name)
        assert service.image()

    @no_cluster('Not supported on UCP 2.2.0-beta1')  # FIXME: remove once support is added
    def test_build_with_target(self):
        self.require_api_version('1.30')
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write('FROM busybox as one\n')
            f.write('LABEL com.docker.compose.test=true\n')
            f.write('LABEL com.docker.compose.test.target=one\n')
            f.write('FROM busybox as two\n')
            f.write('LABEL com.docker.compose.test.target=two\n')
        service = self.create_service('buildtarget', build={
            'context': str(base_dir),
            'target': 'one'
        })
        service.build()
        assert service.image()
        # Only the requested multi-stage target ("one") should be built.
        assert service.image()['Config']['Labels']['com.docker.compose.test.target'] == 'one'

    def test_build_with_extra_hosts(self):
        self.require_api_version('1.27')
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            # The pings only resolve if extra_hosts reach the build containers.
            f.write('\n'.join([
                'FROM busybox',
                'RUN ping -c1 foobar',
                'RUN ping -c1 baz',
            ]))
        service = self.create_service('build_extra_hosts', build={
            'context': str(base_dir),
            'extra_hosts': {
                'foobar': '127.0.0.1',
                'baz': '127.0.0.1'
            }
        })
        service.build()
        assert service.image()

    def test_build_with_gzip(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            # COPY + cat verify the gzipped context round-trips intact.
            f.write('\n'.join([
                'FROM busybox',
                'COPY . /src',
                'RUN cat /src/hello.txt'
            ]))
        with open(os.path.join(base_dir, 'hello.txt'), 'w') as f:
            f.write('hello world\n')
        service = self.create_service('build_gzip', build={
            'context': str(base_dir),
        })
        service.build(gzip=True)
        assert service.image()

    def test_build_with_isolation(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write('FROM busybox\n')
        service = self.create_service('build_isolation', build={
            'context': str(base_dir),
            'isolation': 'default',
        })
        service.build()
        assert service.image()

    def test_build_with_illegal_leading_chars(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write('FROM busybox\nRUN echo "Embodiment of Scarlet Devil"\n')
        service = Service(
            'build_leading_slug', client=self.client,
            project='___-composetest', build={
                'context': str(base_dir)
            }
        )
        # Characters illegal in image names are stripped from the project slug.
        assert service.image_name == 'composetest_build_leading_slug'
        service.build()
        assert service.image()

    def test_start_container_stays_unprivileged(self):
        service = self.create_service('web')
        container = create_and_start_container(service).inspect()
        assert container['HostConfig']['Privileged'] is False

    def test_start_container_becomes_privileged(self):
        service = self.create_service('web', privileged=True)
        container = create_and_start_container(service).inspect()
        assert container['HostConfig']['Privileged'] is True

    def test_expose_does_not_publish_ports(self):
        service = self.create_service('web', expose=["8000"])
        container = create_and_start_container(service).inspect()
        # Exposed but unpublished: the port has no host binding.
        assert container['NetworkSettings']['Ports'] == {'8000/tcp': None}

    def test_start_container_creates_port_with_explicit_protocol(self):
        service = self.create_service('web', ports=['8000/udp'])
        container = create_and_start_container(service).inspect()
        assert list(container['NetworkSettings']['Ports'].keys()) == ['8000/udp']

    def test_start_container_creates_fixed_external_ports(self):
        service = self.create_service('web', ports=['8000:8000'])
        container = create_and_start_container(service).inspect()
        assert '8000/tcp' in container['NetworkSettings']['Ports']
        assert container['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort'] == '8000'

    def test_start_container_creates_fixed_external_ports_when_it_is_different_to_internal_port(self):
        service = self.create_service('web', ports=['8001:8000'])
        container = create_and_start_container(service).inspect()
        assert '8000/tcp' in container['NetworkSettings']['Ports']
        assert container['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort'] == '8001'

    def test_port_with_explicit_interface(self):
        service = self.create_service('web', ports=[
            '127.0.0.1:8001:8000',
            '0.0.0.0:9001:9000/udp',
        ])
        container = create_and_start_container(service).inspect()
        assert container['NetworkSettings']['Ports']['8000/tcp'] == [{
            'HostIp': '127.0.0.1',
            'HostPort': '8001',
        }]
        assert container['NetworkSettings']['Ports']['9000/udp'][0]['HostPort'] == '9001'
        # Swarm clusters may rewrite the 0.0.0.0 HostIp, so only check it
        # against a single engine.
        if not is_cluster(self.client):
            assert container['NetworkSettings']['Ports']['9000/udp'][0]['HostIp'] == '0.0.0.0'
        # self.assertEqual(container['NetworkSettings']['Ports'], {
        #     '8000/tcp': [
        #         {
        #             'HostIp': '127.0.0.1',
        #             'HostPort': '8001',
        #         },
        #     ],
        #     '9000/udp': [
        #         {
        #             'HostIp': '0.0.0.0',
        #             'HostPort': '9001',
        #         },
        #     ],
        # })

    def test_create_with_image_id(self):
        # A service may reference an image by (short) ID rather than by tag.
        pull_busybox(self.client)
        image_id = self.client.inspect_image(BUSYBOX_IMAGE_WITH_TAG)['Id'][:12]
        service = self.create_service('foo', image=image_id)
        service.create_container()

    def test_scale(self):
        service = self.create_service('web')
        service.scale(1)
        assert len(service.containers()) == 1
        # Ensure containers don't have stdout or stdin connected
        container = service.containers()[0]
        config = container.inspect()['Config']
        assert not config['AttachStderr']
        assert not config['AttachStdout']
        assert not config['AttachStdin']
        service.scale(3)
        assert len(service.containers()) == 3
        service.scale(1)
        assert len(service.containers()) == 1
        service.scale(0)
        assert len(service.containers()) == 0

    @pytest.mark.skipif(
        SWARM_SKIP_CONTAINERS_ALL,
        reason='Swarm /containers/json bug'
    )
    def test_scale_with_stopped_containers(self):
        """
        Given there are some stopped containers and scale is called with a
        desired number that is the same as the number of stopped containers,
        test that those containers are restarted and not removed/recreated.
        """
        service = self.create_service('web')
        service.create_container(number=1)
        service.create_container(number=2)
        # Reset the shared writer so the scale output goes to our mock stderr.
        ParallelStreamWriter.instance = None
        with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
            service.scale(2)
        for container in service.containers():
            assert container.is_running
            assert container.number in [1, 2]
        captured_output = mock_stderr.getvalue()
        assert 'Creating' not in captured_output
        assert 'Starting' in captured_output

    def test_scale_with_stopped_containers_and_needing_creation(self):
        """
        Given there are some stopped containers and scale is called with a
        desired number that is greater than the number of stopped containers,
        test that those containers are restarted and required number are created.
        """
        service = self.create_service('web')
        next_number = service._next_container_number()
        service.create_container(number=next_number, quiet=True)
        for container in service.containers():
            assert not container.is_running
        # Reset the shared writer so the scale output goes to our mock stderr.
        ParallelStreamWriter.instance = None
        with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
            service.scale(2)
        assert len(service.containers()) == 2
        for container in service.containers():
            assert container.is_running
        captured_output = mock_stderr.getvalue()
        assert 'Creating' in captured_output
        assert 'Starting' in captured_output

    def test_scale_with_api_error(self):
        """Test that when scaling if the API returns an error, that error is handled
        and the remaining threads continue.
        """
        service = self.create_service('web')
        next_number = service._next_container_number()
        service.create_container(number=next_number, quiet=True)
        with mock.patch(
            'compose.container.Container.create',
            side_effect=APIError(
                message="testing",
                response={},
                explanation="Boom")):
            with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
                with pytest.raises(OperationFailedError):
                    service.scale(3)
        # The pre-existing container survives the failed scale-up.
        assert len(service.containers()) == 1
        assert service.containers()[0].is_running
        assert "ERROR: for composetest_web_" in mock_stderr.getvalue()
        assert "Cannot create container for service web: Boom" in mock_stderr.getvalue()

    def test_scale_with_unexpected_exception(self):
        """Test that when scaling if the API returns an error, that is not of type
        APIError, that error is re-raised.
        """
        service = self.create_service('web')
        next_number = service._next_container_number()
        service.create_container(number=next_number, quiet=True)
        with mock.patch(
            'compose.container.Container.create',
            side_effect=ValueError("BOOM")
        ):
            with pytest.raises(ValueError):
                service.scale(3)
        assert len(service.containers()) == 1
        assert service.containers()[0].is_running

    @mock.patch('compose.service.log')
    def test_scale_with_desired_number_already_achieved(self, mock_log):
        """
        Test that calling scale with a desired number that is equal to the
        number of containers already running results in no change.
        """
        service = self.create_service('web')
        next_number = service._next_container_number()
        container = service.create_container(number=next_number, quiet=True)
        container.start()
        container.inspect()
        assert container.is_running
        assert len(service.containers()) == 1
        service.scale(1)
        assert len(service.containers()) == 1
        # Re-inspect to confirm the same container is still running untouched.
        container.inspect()
        assert container.is_running
        captured_output = mock_log.info.call_args[0]
        assert 'Desired container number already achieved' in captured_output

    @mock.patch('compose.service.log')
    def test_scale_with_custom_container_name_outputs_warning(self, mock_log):
        """Test that calling scale on a service that has a custom container name
        results in warning output.
        """
        service = self.create_service('app', container_name='custom-container')
        assert service.custom_container_name == 'custom-container'
        with pytest.raises(OperationFailedError):
            service.scale(3)
        captured_output = mock_log.warning.call_args[0][0]
        assert len(service.containers()) == 1
        assert "Remove the custom name to scale the service." in captured_output

    def test_scale_sets_ports(self):
        service = self.create_service('web', ports=['8000'])
        service.scale(2)
        containers = service.containers()
        assert len(containers) == 2
        for container in containers:
            assert list(container.get('HostConfig.PortBindings')) == ['8000/tcp']

    def test_scale_with_immediate_exit(self):
        # `true` exits immediately; scale must still report both containers.
        service = self.create_service('web', image='busybox', command='true')
        service.scale(2)
        assert len(service.containers(stopped=True)) == 2

    def test_network_mode_none(self):
        service = self.create_service('web', network_mode=NetworkMode('none'))
        container = create_and_start_container(service)
        assert container.get('HostConfig.NetworkMode') == 'none'

    def test_network_mode_bridged(self):
        service = self.create_service('web', network_mode=NetworkMode('bridge'))
        container = create_and_start_container(service)
        assert container.get('HostConfig.NetworkMode') == 'bridge'

    def test_network_mode_host(self):
        service = self.create_service('web', network_mode=NetworkMode('host'))
        container = create_and_start_container(service)
        assert container.get('HostConfig.NetworkMode') == 'host'

    def test_pid_mode_none_defined(self):
        service = self.create_service('web', pid_mode=None)
        container = create_and_start_container(service)
        assert container.get('HostConfig.PidMode') == ''

    def test_pid_mode_host(self):
        service = self.create_service('web', pid_mode=PidMode('host'))
        container = create_and_start_container(service)
        assert container.get('HostConfig.PidMode') == 'host'

    def test_ipc_mode_none_defined(self):
        service = self.create_service('web', ipc_mode=None)
        container = create_and_start_container(service)
        print(container.get('HostConfig.IpcMode'))
        # Daemon default when nothing is specified.
        assert container.get('HostConfig.IpcMode') == 'shareable'

    def test_ipc_mode_host(self):
        service = self.create_service('web', ipc_mode=IpcMode('host'))
        container = create_and_start_container(service)
        assert container.get('HostConfig.IpcMode') == 'host'

    def test_userns_mode_none_defined(self):
        service = self.create_service('web', userns_mode=None)
        container = create_and_start_container(service)
        assert container.get('HostConfig.UsernsMode') == ''

    def test_userns_mode_host(self):
        service = self.create_service('web', userns_mode='host')
        container = create_and_start_container(service)
        assert container.get('HostConfig.UsernsMode') == 'host'

    def test_dns_no_value(self):
        service = self.create_service('web')
        container = create_and_start_container(service)
        assert container.get('HostConfig.Dns') is None

    def test_dns_list(self):
        service = self.create_service('web', dns=['8.8.8.8', '9.9.9.9'])
        container = create_and_start_container(service)
        assert container.get('HostConfig.Dns') == ['8.8.8.8', '9.9.9.9']

    def test_mem_swappiness(self):
        service = self.create_service('web', mem_swappiness=11)
        container = create_and_start_container(service)
        assert container.get('HostConfig.MemorySwappiness') == 11

    def test_mem_reservation(self):
        service = self.create_service('web', mem_reservation='20m')
        container = create_and_start_container(service)
        # '20m' is parsed into bytes before reaching the daemon.
        assert container.get('HostConfig.MemoryReservation') == 20 * 1024 * 1024

    def test_restart_always_value(self):
        service = self.create_service('web', restart={'Name': 'always'})
        container = create_and_start_container(service)
        assert container.get('HostConfig.RestartPolicy.Name') == 'always'

    def test_oom_score_adj_value(self):
        service = self.create_service('web', oom_score_adj=500)
        container = create_and_start_container(service)
        assert container.get('HostConfig.OomScoreAdj') == 500

    def test_group_add_value(self):
        service = self.create_service('web', group_add=["root", "1"])
        container = create_and_start_container(service)
        host_container_groupadd = container.get('HostConfig.GroupAdd')
        assert "root" in host_container_groupadd
        assert "1" in host_container_groupadd

    def test_dns_opt_value(self):
        service = self.create_service('web', dns_opt=["use-vc", "no-tld-query"])
        container = create_and_start_container(service)
        dns_opt = container.get('HostConfig.DnsOptions')
        assert 'use-vc' in dns_opt
        assert 'no-tld-query' in dns_opt

    def test_restart_on_failure_value(self):
        service = self.create_service('web', restart={
            'Name': 'on-failure',
            'MaximumRetryCount': 5
        })
        container = create_and_start_container(service)
        assert container.get('HostConfig.RestartPolicy.Name') == 'on-failure'
        assert container.get('HostConfig.RestartPolicy.MaximumRetryCount') == 5

    def test_cap_add_list(self):
        service = self.create_service('web', cap_add=['SYS_ADMIN', 'NET_ADMIN'])
        container = create_and_start_container(service)
        assert container.get('HostConfig.CapAdd') == ['SYS_ADMIN', 'NET_ADMIN']

    def test_cap_drop_list(self):
        service = self.create_service('web', cap_drop=['SYS_ADMIN', 'NET_ADMIN'])
        container = create_and_start_container(service)
        assert container.get('HostConfig.CapDrop') == ['SYS_ADMIN', 'NET_ADMIN']

    def test_dns_search(self):
        service = self.create_service('web', dns_search=['dc1.example.com', 'dc2.example.com'])
        container = create_and_start_container(service)
        assert container.get('HostConfig.DnsSearch') == ['dc1.example.com', 'dc2.example.com']

    def test_tmpfs(self):
        service = self.create_service('web', tmpfs=['/run'])
        container = create_and_start_container(service)
        assert container.get('HostConfig.Tmpfs') == {'/run': ''}

    def test_working_dir_param(self):
        service = self.create_service('container', working_dir='/working/dir/sample')
        container = service.create_container()
        assert container.get('Config.WorkingDir') == '/working/dir/sample'

    def test_split_env(self):
        # KEY=VALUE strings must split on the first '=' only.
        service = self.create_service(
            'web',
            environment=['NORMAL=F1', 'CONTAINS_EQUALS=F=2', 'TRAILING_EQUALS='])
        env = create_and_start_container(service).environment
        for k, v in {'NORMAL': 'F1', 'CONTAINS_EQUALS': 'F=2', 'TRAILING_EQUALS': ''}.items():
            assert env[k] == v

    def test_env_from_file_combined_with_env(self):
        service = self.create_service(
            'web',
            environment=['ONE=1', 'TWO=2', 'THREE=3'],
            env_file=['tests/fixtures/env/one.env', 'tests/fixtures/env/two.env'])
        env = create_and_start_container(service).environment
        # Inline environment entries win; env_file supplies the rest.
        for k, v in {
            'ONE': '1',
            'TWO': '2',
            'THREE': '3',
            'FOO': 'baz',
            'DOO': 'dah'
        }.items():
            assert env[k] == v

    def test_build_with_cachefrom(self):
        base_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, base_dir)
        with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
            f.write("FROM busybox\n")
        # 'build1' does not exist; a missing cache_from image must not fail the build.
        service = self.create_service('cache_from',
                                      build={'context': base_dir,
                                             'cache_from': ['build1']})
        service.build()
        self.addCleanup(self.client.remove_image, service.image_name)
        assert service.image()

    @mock.patch.dict(os.environ)
    def test_resolve_env(self):
        os.environ['FILE_DEF'] = 'E1'
        os.environ['FILE_DEF_EMPTY'] = 'E2'
        os.environ['ENV_DEF'] = 'E3'
        service = self.create_service(
            'web',
            environment={
                'FILE_DEF': 'F1',
                'FILE_DEF_EMPTY': '',
                'ENV_DEF': None,
                'NO_DEF': None
            }
        )
        env = create_and_start_container(service).environment
        # Explicit values (even empty) win over os.environ; None falls back to
        # os.environ, and stays unset when the variable is absent there too.
        for k, v in {
            'FILE_DEF': 'F1',
            'FILE_DEF_EMPTY': '',
            'ENV_DEF': 'E3',
            'NO_DEF': None
        }.items():
            assert env[k] == v

    def test_with_high_enough_api_version_we_get_default_network_mode(self):
        # TODO: remove this test once minimum docker version is 1.8.x
        with mock.patch.object(self.client, '_version', '1.20'):
            service = self.create_service('web')
            service_config = service._get_container_host_config({})
            assert service_config['NetworkMode'] == 'default'

    def test_labels(self):
        labels_dict = {
            'com.example.description': "Accounting webapp",
            'com.example.department': "Finance",
            'com.example.label-with-empty-value': "",
        }
        # Compose's own bookkeeping labels are always added alongside the
        # user-supplied ones.
        compose_labels = {
            LABEL_ONE_OFF: 'False',
            LABEL_PROJECT: 'composetest',
            LABEL_SERVICE: 'web',
            LABEL_VERSION: __version__,
            LABEL_CONTAINER_NUMBER: '1'
        }
        expected = dict(labels_dict, **compose_labels)
        service = self.create_service('web', labels=labels_dict)
        ctnr = create_and_start_container(service)
        labels = ctnr.labels.items()
        for pair in expected.items():
            assert pair in labels

    def test_empty_labels(self):
        labels_dict = {'foo': '', 'bar': ''}
        service = self.create_service('web', labels=labels_dict)
        labels = create_and_start_container(service).labels.items()
        for name in labels_dict:
            assert (name, '') in labels

    def test_stop_signal(self):
        stop_signal = 'SIGINT'
        service = self.create_service('web', stop_signal=stop_signal)
        container = create_and_start_container(service)
        assert container.stop_signal == stop_signal

    def test_custom_container_name(self):
        service = self.create_service('web', container_name='my-web-container')
        assert service.custom_container_name == 'my-web-container'
        container = create_and_start_container(service)
        assert container.name == 'my-web-container'
        # One-off containers must not reuse the custom name (it would collide).
        one_off_container = service.create_container(one_off=True)
        assert one_off_container.name != 'my-web-container'
@pytest.mark.skipif(True, reason="Broken on 1.11.0 - 17.03.0")1428    def test_log_drive_invalid(self):1429        service = self.create_service('web', logging={'driver': 'xxx'})1430        expected_error_msg = "logger: no log driver named 'xxx' is registered"1431        with pytest.raises(APIError) as excinfo:1432            create_and_start_container(service)1433        assert re.search(expected_error_msg, excinfo.value)1434    def test_log_drive_empty_default_jsonfile(self):1435        service = self.create_service('web')1436        log_config = create_and_start_container(service).log_config1437        assert 'json-file' == log_config['Type']1438        assert not log_config['Config']1439    def test_log_drive_none(self):1440        service = self.create_service('web', logging={'driver': 'none'})1441        log_config = create_and_start_container(service).log_config1442        assert 'none' == log_config['Type']1443        assert not log_config['Config']1444    def test_devices(self):1445        service = self.create_service('web', devices=["/dev/random:/dev/mapped-random"])1446        device_config = create_and_start_container(service).get('HostConfig.Devices')1447        device_dict = {1448            'PathOnHost': '/dev/random',1449            'CgroupPermissions': 'rwm',1450            'PathInContainer': '/dev/mapped-random'1451        }1452        assert 1 == len(device_config)1453        assert device_dict == device_config[0]1454    def test_duplicate_containers(self):1455        service = self.create_service('web')1456        options = service._get_container_create_options({}, service._next_container_number())1457        original = Container.create(service.client, **options)1458        assert set(service.containers(stopped=True)) == {original}1459        assert set(service.duplicate_containers()) == set()1460        options['name'] = 'temporary_container_name'1461        duplicate = Container.create(service.client, **options)1462        assert 
set(service.containers(stopped=True)) == {original, duplicate}1463        assert set(service.duplicate_containers()) == {duplicate}1464def converge(service, strategy=ConvergenceStrategy.changed):1465    """Create a converge plan from a strategy and execute the plan."""1466    plan = service.convergence_plan(strategy)1467    return service.execute_convergence_plan(plan, timeout=1)1468class ConfigHashTest(DockerClientTestCase):1469    def test_no_config_hash_when_one_off(self):1470        web = self.create_service('web')1471        container = web.create_container(one_off=True)1472        assert LABEL_CONFIG_HASH not in container.labels1473    def test_no_config_hash_when_overriding_options(self):1474        web = self.create_service('web')1475        container = web.create_container(environment={'FOO': '1'})1476        assert LABEL_CONFIG_HASH not in container.labels1477    def test_config_hash_with_custom_labels(self):1478        web = self.create_service('web', labels={'foo': '1'})1479        container = converge(web)[0]1480        assert LABEL_CONFIG_HASH in container.labels1481        assert 'foo' in container.labels1482    def test_config_hash_sticks_around(self):1483        web = self.create_service('web', command=["top"])1484        container = converge(web)[0]1485        assert LABEL_CONFIG_HASH in container.labels1486        web = self.create_service('web', command=["top", "-d", "1"])1487        container = converge(web)[0]...start-services.py
Source: start-services.py
...45	for service in services:46		if service.name == name:47			return service48	return None49def create_service(name, replicas, postfix):50	service = get_service(name)51	if service is not None:52		update_replicas(service, replicas)53	else:54		run_service(name, replicas, postfix)55def create_service_telegraf(name, postfix):56	subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["create_service_telegraf", name])57def create_service_telegraf_on_master(name, postfix):58	subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["create_service_telegraf_on_master", name])59def rm_service(name, postfix):60	# subprocess.run(["docker", "service", "rm", name])61	subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["rm_service", name])62def create_secrets(name):63	# subprocess.run(["docker", "service", "rm", name])64	subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["create_secrets" + "_" + name])65def create_network():66	client.networks.create("smartmeter", driver="overlay")67## RUN SCENARIO ##68def run(steps):69	if not isinstance(steps[0], list):70		steps = [steps]71	for step in steps:72		print()73		print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")74		print(step)75		if step[0] == "create_service" :76			create_service(step[1], step[2], postfix)77		elif step[0] == "rm_service" :78			rm_service(step[1], postfix)79		elif step[0] == "create_secrets" :80			create_secrets(step[1])81		elif step[0] == "create_service_telegraf" :82			create_service_telegraf(step[1], postfix)83		elif step[0] == "create_service_telegraf_on_master" :84			create_service_telegraf_on_master(step[1], postfix)85		else:86			call(step[0], step[1], step[2:])87def run_or_kill(steps):88	if not isinstance(steps[0], list):89		steps = [steps]90	# Collect all existing services names...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
