How to use the create_service method in tempest

Best Python code snippet using tempest_python

service_test.py

Source: service_test.py (GitHub)

copy

Full Screen

...49 container = service.create_container(**override_options)50 return service.start_container(container)51class ServiceTest(DockerClientTestCase):52 def test_containers(self):53 foo = self.create_service('foo')54 bar = self.create_service('bar')55 create_and_start_container(foo)56 assert len(foo.containers()) == 157 assert foo.containers()[0].name.startswith('composetest_foo_')58 assert len(bar.containers()) == 059 create_and_start_container(bar)60 create_and_start_container(bar)61 assert len(foo.containers()) == 162 assert len(bar.containers()) == 263 names = [c.name for c in bar.containers()]64 assert len(names) == 265 assert all(name.startswith('composetest_bar_') for name in names)66 def test_containers_one_off(self):67 db = self.create_service('db')68 container = db.create_container(one_off=True)69 assert db.containers(stopped=True) == []70 assert db.containers(one_off=OneOffFilter.only, stopped=True) == [container]71 def test_project_is_added_to_container_name(self):72 service = self.create_service('web')73 create_and_start_container(service)74 assert service.containers()[0].name.startswith('composetest_web_')75 def test_create_container_with_one_off(self):76 db = self.create_service('db')77 container = db.create_container(one_off=True)78 assert container.name.startswith('composetest_db_run_')79 def test_create_container_with_one_off_when_existing_container_is_running(self):80 db = self.create_service('db')81 db.start()82 container = db.create_container(one_off=True)83 assert container.name.startswith('composetest_db_run_')84 def test_create_container_with_unspecified_volume(self):85 service = self.create_service('db', volumes=[VolumeSpec.parse('/var/db')])86 container = service.create_container()87 service.start_container(container)88 assert container.get_mount('/var/db')89 def test_create_container_with_volume_driver(self):90 service = self.create_service('db', volume_driver='foodriver')91 container = service.create_container()92 
service.start_container(container)93 assert 'foodriver' == container.get('HostConfig.VolumeDriver')94 @pytest.mark.skipif(SWARM_SKIP_CPU_SHARES, reason='Swarm --cpu-shares bug')95 def test_create_container_with_cpu_shares(self):96 service = self.create_service('db', cpu_shares=73)97 container = service.create_container()98 service.start_container(container)99 assert container.get('HostConfig.CpuShares') == 73100 def test_create_container_with_cpu_quota(self):101 service = self.create_service('db', cpu_quota=40000, cpu_period=150000)102 container = service.create_container()103 container.start()104 assert container.get('HostConfig.CpuQuota') == 40000105 assert container.get('HostConfig.CpuPeriod') == 150000106 @pytest.mark.xfail(raises=OperationFailedError, reason='not supported by kernel')107 def test_create_container_with_cpu_rt(self):108 service = self.create_service('db', cpu_rt_runtime=40000, cpu_rt_period=150000)109 container = service.create_container()110 container.start()111 assert container.get('HostConfig.CpuRealtimeRuntime') == 40000112 assert container.get('HostConfig.CpuRealtimePeriod') == 150000113 def test_create_container_with_cpu_count(self):114 self.require_api_version('1.25')115 service = self.create_service('db', cpu_count=2)116 container = service.create_container()117 service.start_container(container)118 assert container.get('HostConfig.CpuCount') == 2119 @pytest.mark.skipif(not IS_WINDOWS_PLATFORM, reason='cpu_percent is not supported for Linux')120 def test_create_container_with_cpu_percent(self):121 self.require_api_version('1.25')122 service = self.create_service('db', cpu_percent=12)123 container = service.create_container()124 service.start_container(container)125 assert container.get('HostConfig.CpuPercent') == 12126 def test_create_container_with_cpus(self):127 self.require_api_version('1.25')128 service = self.create_service('db', cpus=1)129 container = service.create_container()130 service.start_container(container)131 assert 
container.get('HostConfig.NanoCpus') == 1000000000132 def test_create_container_with_shm_size(self):133 self.require_api_version('1.22')134 service = self.create_service('db', shm_size=67108864)135 container = service.create_container()136 service.start_container(container)137 assert container.get('HostConfig.ShmSize') == 67108864138 def test_create_container_with_init_bool(self):139 self.require_api_version('1.25')140 service = self.create_service('db', init=True)141 container = service.create_container()142 service.start_container(container)143 assert container.get('HostConfig.Init') is True144 @pytest.mark.xfail(True, reason='Option has been removed in Engine 17.06.0')145 def test_create_container_with_init_path(self):146 self.require_api_version('1.25')147 docker_init_path = find_executable('docker-init')148 service = self.create_service('db', init=docker_init_path)149 container = service.create_container()150 service.start_container(container)151 assert container.get('HostConfig.InitPath') == docker_init_path152 @pytest.mark.xfail(True, reason='Some kernels/configs do not support pids_limit')153 def test_create_container_with_pids_limit(self):154 self.require_api_version('1.23')155 service = self.create_service('db', pids_limit=10)156 container = service.create_container()157 service.start_container(container)158 assert container.get('HostConfig.PidsLimit') == 10159 def test_create_container_with_extra_hosts_list(self):160 extra_hosts = ['somehost:162.242.195.82', 'otherhost:50.31.209.229']161 service = self.create_service('db', extra_hosts=extra_hosts)162 container = service.create_container()163 service.start_container(container)164 assert set(container.get('HostConfig.ExtraHosts')) == set(extra_hosts)165 def test_create_container_with_extra_hosts_dicts(self):166 extra_hosts = {'somehost': '162.242.195.82', 'otherhost': '50.31.209.229'}167 extra_hosts_list = ['somehost:162.242.195.82', 'otherhost:50.31.209.229']168 service = self.create_service('db', 
extra_hosts=extra_hosts)169 container = service.create_container()170 service.start_container(container)171 assert set(container.get('HostConfig.ExtraHosts')) == set(extra_hosts_list)172 def test_create_container_with_cpu_set(self):173 service = self.create_service('db', cpuset='0')174 container = service.create_container()175 service.start_container(container)176 assert container.get('HostConfig.CpusetCpus') == '0'177 def test_create_container_with_read_only_root_fs(self):178 read_only = True179 service = self.create_service('db', read_only=read_only)180 container = service.create_container()181 service.start_container(container)182 assert container.get('HostConfig.ReadonlyRootfs') == read_only183 @pytest.mark.xfail(True, reason='Getting "Your kernel does not support '184 'cgroup blkio weight and weight_device" on daemon start '185 'on Linux kernel 5.3.x')186 def test_create_container_with_blkio_config(self):187 blkio_config = {188 'weight': 300,189 'weight_device': [{'path': '/dev/sda', 'weight': 200}],190 'device_read_bps': [{'path': '/dev/sda', 'rate': 1024 * 1024 * 100}],191 'device_read_iops': [{'path': '/dev/sda', 'rate': 1000}],192 'device_write_bps': [{'path': '/dev/sda', 'rate': 1024 * 1024}],193 'device_write_iops': [{'path': '/dev/sda', 'rate': 800}]194 }195 service = self.create_service('web', blkio_config=blkio_config)196 container = service.create_container()197 assert container.get('HostConfig.BlkioWeight') == 300198 assert container.get('HostConfig.BlkioWeightDevice') == [{199 'Path': '/dev/sda', 'Weight': 200200 }]201 assert container.get('HostConfig.BlkioDeviceReadBps') == [{202 'Path': '/dev/sda', 'Rate': 1024 * 1024 * 100203 }]204 assert container.get('HostConfig.BlkioDeviceWriteBps') == [{205 'Path': '/dev/sda', 'Rate': 1024 * 1024206 }]207 assert container.get('HostConfig.BlkioDeviceReadIOps') == [{208 'Path': '/dev/sda', 'Rate': 1000209 }]210 assert container.get('HostConfig.BlkioDeviceWriteIOps') == [{211 'Path': '/dev/sda', 'Rate': 800212 
}]213 def test_create_container_with_security_opt(self):214 security_opt = [SecurityOpt.parse('label:disable')]215 service = self.create_service('db', security_opt=security_opt)216 container = service.create_container()217 service.start_container(container)218 assert set(container.get('HostConfig.SecurityOpt')) == {o.repr() for o in security_opt}219 @pytest.mark.xfail(True, reason='Not supported on most drivers')220 def test_create_container_with_storage_opt(self):221 storage_opt = {'size': '1G'}222 service = self.create_service('db', storage_opt=storage_opt)223 container = service.create_container()224 service.start_container(container)225 assert container.get('HostConfig.StorageOpt') == storage_opt226 def test_create_container_with_oom_kill_disable(self):227 self.require_api_version('1.20')228 service = self.create_service('db', oom_kill_disable=True)229 container = service.create_container()230 assert container.get('HostConfig.OomKillDisable') is True231 def test_create_container_with_mac_address(self):232 service = self.create_service('db', mac_address='02:42:ac:11:65:43')233 container = service.create_container()234 service.start_container(container)235 assert container.inspect()['Config']['MacAddress'] == '02:42:ac:11:65:43'236 def test_create_container_with_device_cgroup_rules(self):237 service = self.create_service('db', device_cgroup_rules=['c 7:128 rwm'])238 container = service.create_container()239 assert container.get('HostConfig.DeviceCgroupRules') == ['c 7:128 rwm']240 def test_create_container_with_specified_volume(self):241 host_path = '/tmp/host-path'242 container_path = '/container-path'243 service = self.create_service(244 'db',245 volumes=[VolumeSpec(host_path, container_path, 'rw')])246 container = service.create_container()247 service.start_container(container)248 assert container.get_mount(container_path)249 # Match the last component ("host-path"), because boot2docker symlinks /tmp250 actual_host_path = 
container.get_mount(container_path)['Source']251 assert path.basename(actual_host_path) == path.basename(host_path), (252 "Last component differs: {}, {}".format(actual_host_path, host_path)253 )254 def test_create_container_with_host_mount(self):255 host_path = '/tmp/host-path'256 container_path = '/container-path'257 create_custom_host_file(self.client, path.join(host_path, 'a.txt'), 'test')258 service = self.create_service(259 'db',260 volumes=[261 MountSpec(type='bind', source=host_path, target=container_path, read_only=True)262 ]263 )264 container = service.create_container()265 service.start_container(container)266 mount = container.get_mount(container_path)267 assert mount268 assert path.basename(mount['Source']) == path.basename(host_path)269 assert mount['RW'] is False270 def test_create_container_with_tmpfs_mount(self):271 container_path = '/container-tmpfs'272 service = self.create_service(273 'db',274 volumes=[MountSpec(type='tmpfs', target=container_path)]275 )276 container = service.create_container()277 service.start_container(container)278 mount = container.get_mount(container_path)279 assert mount280 assert mount['Type'] == 'tmpfs'281 def test_create_container_with_tmpfs_mount_tmpfs_size(self):282 container_path = '/container-tmpfs'283 service = self.create_service(284 'db',285 volumes=[MountSpec(type='tmpfs', target=container_path, tmpfs={'size': 5368709})]286 )287 container = service.create_container()288 service.start_container(container)289 mount = container.get_mount(container_path)290 assert mount291 print(container.dictionary)292 assert mount['Type'] == 'tmpfs'293 assert container.get('HostConfig.Mounts')[0]['TmpfsOptions'] == {294 'SizeBytes': 5368709295 }296 def test_create_container_with_volume_mount(self):297 container_path = '/container-volume'298 volume_name = 'composetest_abcde'299 self.client.create_volume(volume_name)300 service = self.create_service(301 'db',302 volumes=[MountSpec(type='volume', source=volume_name, 
target=container_path)]303 )304 container = service.create_container()305 service.start_container(container)306 mount = container.get_mount(container_path)307 assert mount308 assert mount['Name'] == volume_name309 def test_create_container_with_legacy_mount(self):310 # Ensure mounts are converted to volumes if API version < 1.30311 # Needed to support long syntax in the 3.2 format312 client = docker_client({}, version='1.25')313 container_path = '/container-volume'314 volume_name = 'composetest_abcde'315 self.client.create_volume(volume_name)316 service = Service('db', client=client, volumes=[317 MountSpec(type='volume', source=volume_name, target=container_path)318 ], image=BUSYBOX_IMAGE_WITH_TAG, command=['top'], project='composetest')319 container = service.create_container()320 service.start_container(container)321 mount = container.get_mount(container_path)322 assert mount323 assert mount['Name'] == volume_name324 def test_create_container_with_legacy_tmpfs_mount(self):325 # Ensure tmpfs mounts are converted to tmpfs entries if API version < 1.30326 # Needed to support long syntax in the 3.2 format327 client = docker_client({}, version='1.25')328 container_path = '/container-tmpfs'329 service = Service('db', client=client, volumes=[330 MountSpec(type='tmpfs', target=container_path)331 ], image=BUSYBOX_IMAGE_WITH_TAG, command=['top'], project='composetest')332 container = service.create_container()333 service.start_container(container)334 mount = container.get_mount(container_path)335 assert mount is None336 assert container_path in container.get('HostConfig.Tmpfs')337 def test_create_container_with_healthcheck_config(self):338 one_second = parse_nanoseconds_int('1s')339 healthcheck = {340 'test': ['true'],341 'interval': 2 * one_second,342 'timeout': 5 * one_second,343 'retries': 5,344 'start_period': 2 * one_second345 }346 service = self.create_service('db', healthcheck=healthcheck)347 container = service.create_container()348 remote_healthcheck = 
container.get('Config.Healthcheck')349 assert remote_healthcheck['Test'] == healthcheck['test']350 assert remote_healthcheck['Interval'] == healthcheck['interval']351 assert remote_healthcheck['Timeout'] == healthcheck['timeout']352 assert remote_healthcheck['Retries'] == healthcheck['retries']353 assert remote_healthcheck['StartPeriod'] == healthcheck['start_period']354 def test_recreate_preserves_volume_with_trailing_slash(self):355 """When the Compose file specifies a trailing slash in the container path, make356 sure we copy the volume over when recreating.357 """358 service = self.create_service('data', volumes=[VolumeSpec.parse('/data/')])359 old_container = create_and_start_container(service)360 volume_path = old_container.get_mount('/data')['Source']361 new_container = service.recreate_container(old_container)362 assert new_container.get_mount('/data')['Source'] == volume_path363 def test_recreate_volume_to_mount(self):364 # https://github.com/docker/compose/issues/6280365 service = Service(366 project='composetest',367 name='db',368 client=self.client,369 build={'context': 'tests/fixtures/dockerfile-with-volume'},370 volumes=[MountSpec.parse({371 'type': 'volume',372 'target': '/data',373 })]374 )375 old_container = create_and_start_container(service)376 new_container = service.recreate_container(old_container)377 assert new_container.get_mount('/data')['Source']378 def test_duplicate_volume_trailing_slash(self):379 """380 When an image specifies a volume, and the Compose file specifies a host path381 but adds a trailing slash, make sure that we don't create duplicate binds.382 """383 host_path = '/tmp/data'384 container_path = '/data'385 volumes = [VolumeSpec.parse('{}:{}/'.format(host_path, container_path))]386 tmp_container = self.client.create_container(387 'busybox', 'true',388 volumes={container_path: {}},389 labels={'com.docker.compose.test_image': 'true'},390 host_config={}391 )392 image = self.client.commit(tmp_container)['Id']393 service = 
self.create_service('db', image=image, volumes=volumes)394 old_container = create_and_start_container(service)395 assert old_container.get('Config.Volumes') == {container_path: {}}396 service = self.create_service('db', image=image, volumes=volumes)397 new_container = service.recreate_container(old_container)398 assert new_container.get('Config.Volumes') == {container_path: {}}399 assert service.containers(stopped=False) == [new_container]400 def test_create_container_with_volumes_from(self):401 volume_service = self.create_service('data')402 volume_container_1 = volume_service.create_container()403 volume_container_2 = Container.create(404 self.client,405 image=BUSYBOX_IMAGE_WITH_TAG,406 command=["top"],407 labels={LABEL_PROJECT: 'composetest'},408 host_config={},409 environment=['affinity:container=={}'.format(volume_container_1.id)],410 )411 host_service = self.create_service(412 'host',413 volumes_from=[414 VolumeFromSpec(volume_service, 'rw', 'service'),415 VolumeFromSpec(volume_container_2, 'rw', 'container')416 ],417 environment=['affinity:container=={}'.format(volume_container_1.id)],418 )419 host_container = host_service.create_container()420 host_service.start_container(host_container)421 assert volume_container_1.id + ':rw' in host_container.get('HostConfig.VolumesFrom')422 assert volume_container_2.id + ':rw' in host_container.get('HostConfig.VolumesFrom')423 def test_execute_convergence_plan_recreate(self):424 service = self.create_service(425 'db',426 environment={'FOO': '1'},427 volumes=[VolumeSpec.parse('/etc')],428 entrypoint=['top'],429 command=['-d', '1']430 )431 old_container = service.create_container()432 assert old_container.get('Config.Entrypoint') == ['top']433 assert old_container.get('Config.Cmd') == ['-d', '1']434 assert 'FOO=1' in old_container.get('Config.Env')435 assert old_container.name.startswith('composetest_db_')436 service.start_container(old_container)437 old_container.inspect() # reload volume data438 volume_path = 
old_container.get_mount('/etc')['Source']439 num_containers_before = len(self.client.containers(all=True))440 service.options['environment']['FOO'] = '2'441 new_container, = service.execute_convergence_plan(442 ConvergencePlan('recreate', [old_container]))443 assert new_container.get('Config.Entrypoint') == ['top']444 assert new_container.get('Config.Cmd') == ['-d', '1']445 assert 'FOO=2' in new_container.get('Config.Env')446 assert new_container.name.startswith('composetest_db_')447 assert new_container.get_mount('/etc')['Source'] == volume_path448 if not is_cluster(self.client):449 assert (450 'affinity:container==%s' % old_container.id in451 new_container.get('Config.Env')452 )453 else:454 # In Swarm, the env marker is consumed and the container should be deployed455 # on the same node.456 assert old_container.get('Node.Name') == new_container.get('Node.Name')457 assert len(self.client.containers(all=True)) == num_containers_before458 assert old_container.id != new_container.id459 with pytest.raises(APIError):460 self.client.inspect_container(old_container.id)461 def test_execute_convergence_plan_recreate_change_mount_target(self):462 service = self.create_service(463 'db',464 volumes=[MountSpec(target='/app1', type='volume')],465 entrypoint=['top'], command=['-d', '1']466 )467 old_container = create_and_start_container(service)468 assert (469 [mount['Destination'] for mount in old_container.get('Mounts')] ==470 ['/app1']471 )472 service.options['volumes'] = [MountSpec(target='/app2', type='volume')]473 new_container, = service.execute_convergence_plan(474 ConvergencePlan('recreate', [old_container])475 )476 assert (477 [mount['Destination'] for mount in new_container.get('Mounts')] ==478 ['/app2']479 )480 def test_execute_convergence_plan_recreate_twice(self):481 service = self.create_service(482 'db',483 volumes=[VolumeSpec.parse('/etc')],484 entrypoint=['top'],485 command=['-d', '1'])486 orig_container = service.create_container()487 
service.start_container(orig_container)488 orig_container.inspect() # reload volume data489 volume_path = orig_container.get_mount('/etc')['Source']490 # Do this twice to reproduce the bug491 for _ in range(2):492 new_container, = service.execute_convergence_plan(493 ConvergencePlan('recreate', [orig_container]))494 assert new_container.get_mount('/etc')['Source'] == volume_path495 if not is_cluster(self.client):496 assert ('affinity:container==%s' % orig_container.id in497 new_container.get('Config.Env'))498 else:499 # In Swarm, the env marker is consumed and the container should be deployed500 # on the same node.501 assert orig_container.get('Node.Name') == new_container.get('Node.Name')502 orig_container = new_container503 def test_execute_convergence_plan_recreate_twice_with_mount(self):504 service = self.create_service(505 'db',506 volumes=[MountSpec(target='/etc', type='volume')],507 entrypoint=['top'],508 command=['-d', '1']509 )510 orig_container = service.create_container()511 service.start_container(orig_container)512 orig_container.inspect() # reload volume data513 volume_path = orig_container.get_mount('/etc')['Source']514 # Do this twice to reproduce the bug515 for _ in range(2):516 new_container, = service.execute_convergence_plan(517 ConvergencePlan('recreate', [orig_container])518 )519 assert new_container.get_mount('/etc')['Source'] == volume_path520 if not is_cluster(self.client):521 assert ('affinity:container==%s' % orig_container.id in522 new_container.get('Config.Env'))523 else:524 # In Swarm, the env marker is consumed and the container should be deployed525 # on the same node.526 assert orig_container.get('Node.Name') == new_container.get('Node.Name')527 orig_container = new_container528 def test_execute_convergence_plan_when_containers_are_stopped(self):529 service = self.create_service(530 'db',531 environment={'FOO': '1'},532 volumes=[VolumeSpec.parse('/var/db')],533 entrypoint=['top'],534 command=['-d', '1']535 )536 
service.create_container()537 containers = service.containers(stopped=True)538 assert len(containers) == 1539 container, = containers540 assert not container.is_running541 service.execute_convergence_plan(ConvergencePlan('start', [container]))542 containers = service.containers()543 assert len(containers) == 1544 container.inspect()545 assert container == containers[0]546 assert container.is_running547 def test_execute_convergence_plan_with_image_declared_volume(self):548 service = Service(549 project='composetest',550 name='db',551 client=self.client,552 build={'context': 'tests/fixtures/dockerfile-with-volume'},553 )554 old_container = create_and_start_container(service)555 assert [mount['Destination'] for mount in old_container.get('Mounts')] == ['/data']556 volume_path = old_container.get_mount('/data')['Source']557 new_container, = service.execute_convergence_plan(558 ConvergencePlan('recreate', [old_container]))559 assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']560 assert new_container.get_mount('/data')['Source'] == volume_path561 def test_execute_convergence_plan_with_image_declared_volume_renew(self):562 service = Service(563 project='composetest',564 name='db',565 client=self.client,566 build={'context': 'tests/fixtures/dockerfile-with-volume'},567 )568 old_container = create_and_start_container(service)569 assert [mount['Destination'] for mount in old_container.get('Mounts')] == ['/data']570 volume_path = old_container.get_mount('/data')['Source']571 new_container, = service.execute_convergence_plan(572 ConvergencePlan('recreate', [old_container]), renew_anonymous_volumes=True573 )574 assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']575 assert new_container.get_mount('/data')['Source'] != volume_path576 def test_execute_convergence_plan_when_image_volume_masks_config(self):577 service = self.create_service(578 'db',579 build={'context': 'tests/fixtures/dockerfile-with-volume'},580 )581 
old_container = create_and_start_container(service)582 assert [mount['Destination'] for mount in old_container.get('Mounts')] == ['/data']583 volume_path = old_container.get_mount('/data')['Source']584 service.options['volumes'] = [VolumeSpec.parse('/tmp:/data')]585 with mock.patch('compose.service.log') as mock_log:586 new_container, = service.execute_convergence_plan(587 ConvergencePlan('recreate', [old_container]))588 mock_log.warning.assert_called_once_with(mock.ANY)589 _, args, kwargs = mock_log.warning.mock_calls[0]590 assert "Service \"db\" is using volume \"/data\" from the previous container" in args[0]591 assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']592 assert new_container.get_mount('/data')['Source'] == volume_path593 def test_execute_convergence_plan_when_host_volume_is_removed(self):594 host_path = '/tmp/host-path'595 service = self.create_service(596 'db',597 build={'context': 'tests/fixtures/dockerfile-with-volume'},598 volumes=[VolumeSpec(host_path, '/data', 'rw')])599 old_container = create_and_start_container(service)600 assert (601 [mount['Destination'] for mount in old_container.get('Mounts')] ==602 ['/data']603 )604 service.options['volumes'] = []605 with mock.patch('compose.service.log', autospec=True) as mock_log:606 new_container, = service.execute_convergence_plan(607 ConvergencePlan('recreate', [old_container]))608 assert not mock_log.warn.called609 assert (610 [mount['Destination'] for mount in new_container.get('Mounts')] ==611 ['/data']612 )613 assert new_container.get_mount('/data')['Source'] != host_path614 def test_execute_convergence_plan_anonymous_volume_renew(self):615 service = self.create_service(616 'db',617 image='busybox',618 volumes=[VolumeSpec(None, '/data', 'rw')])619 old_container = create_and_start_container(service)620 assert (621 [mount['Destination'] for mount in old_container.get('Mounts')] ==622 ['/data']623 )624 volume_path = old_container.get_mount('/data')['Source']625 
new_container, = service.execute_convergence_plan(626 ConvergencePlan('recreate', [old_container]),627 renew_anonymous_volumes=True628 )629 assert (630 [mount['Destination'] for mount in new_container.get('Mounts')] ==631 ['/data']632 )633 assert new_container.get_mount('/data')['Source'] != volume_path634 def test_execute_convergence_plan_anonymous_volume_recreate_then_renew(self):635 service = self.create_service(636 'db',637 image='busybox',638 volumes=[VolumeSpec(None, '/data', 'rw')])639 old_container = create_and_start_container(service)640 assert (641 [mount['Destination'] for mount in old_container.get('Mounts')] ==642 ['/data']643 )644 volume_path = old_container.get_mount('/data')['Source']645 mid_container, = service.execute_convergence_plan(646 ConvergencePlan('recreate', [old_container]),647 )648 assert (649 [mount['Destination'] for mount in mid_container.get('Mounts')] ==650 ['/data']651 )652 assert mid_container.get_mount('/data')['Source'] == volume_path653 new_container, = service.execute_convergence_plan(654 ConvergencePlan('recreate', [mid_container]),655 renew_anonymous_volumes=True656 )657 assert (658 [mount['Destination'] for mount in new_container.get('Mounts')] ==659 ['/data']660 )661 assert new_container.get_mount('/data')['Source'] != volume_path662 def test_execute_convergence_plan_without_start(self):663 service = self.create_service(664 'db',665 build={'context': 'tests/fixtures/dockerfile-with-volume'}666 )667 containers = service.execute_convergence_plan(ConvergencePlan('create', []), start=False)668 service_containers = service.containers(stopped=True)669 assert len(service_containers) == 1670 assert not service_containers[0].is_running671 containers = service.execute_convergence_plan(672 ConvergencePlan('recreate', containers),673 start=False)674 service_containers = service.containers(stopped=True)675 assert len(service_containers) == 1676 assert not service_containers[0].is_running677 
service.execute_convergence_plan(ConvergencePlan('start', containers), start=False)678 service_containers = service.containers(stopped=True)679 assert len(service_containers) == 1680 assert not service_containers[0].is_running681 def test_execute_convergence_plan_image_with_volume_is_removed(self):682 service = self.create_service(683 'db', build={'context': 'tests/fixtures/dockerfile-with-volume'}684 )685 old_container = create_and_start_container(service)686 assert (687 [mount['Destination'] for mount in old_container.get('Mounts')] ==688 ['/data']689 )690 volume_path = old_container.get_mount('/data')['Source']691 old_container.stop()692 self.client.remove_image(service.image(), force=True)693 service.ensure_image_exists()694 with pytest.raises(ImageNotFound):695 service.execute_convergence_plan(696 ConvergencePlan('recreate', [old_container])697 )698 old_container.inspect() # retrieve new name from server699 new_container, = service.execute_convergence_plan(700 ConvergencePlan('recreate', [old_container]),701 reset_container_image=True702 )703 assert [mount['Destination'] for mount in new_container.get('Mounts')] == ['/data']704 assert new_container.get_mount('/data')['Source'] == volume_path705 def test_start_container_passes_through_options(self):706 db = self.create_service('db')707 create_and_start_container(db, environment={'FOO': 'BAR'})708 assert db.containers()[0].environment['FOO'] == 'BAR'709 def test_start_container_inherits_options_from_constructor(self):710 db = self.create_service('db', environment={'FOO': 'BAR'})711 create_and_start_container(db)712 assert db.containers()[0].environment['FOO'] == 'BAR'713 @no_cluster('No legacy links support in Swarm')714 def test_start_container_creates_links(self):715 db = self.create_service('db')716 web = self.create_service('web', links=[(db, None)])717 db1 = create_and_start_container(db)718 db2 = create_and_start_container(db)719 create_and_start_container(web)720 assert set(get_links(web.containers()[0])) 
== {721 db1.name, db1.name_without_project,722 db2.name, db2.name_without_project,723 'db'724 }725 @no_cluster('No legacy links support in Swarm')726 def test_start_container_creates_links_with_names(self):727 db = self.create_service('db')728 web = self.create_service('web', links=[(db, 'custom_link_name')])729 db1 = create_and_start_container(db)730 db2 = create_and_start_container(db)731 create_and_start_container(web)732 assert set(get_links(web.containers()[0])) == {733 db1.name, db1.name_without_project,734 db2.name, db2.name_without_project,735 'custom_link_name'736 }737 @no_cluster('No legacy links support in Swarm')738 def test_start_container_with_external_links(self):739 db = self.create_service('db')740 db_ctnrs = [create_and_start_container(db) for _ in range(3)]741 web = self.create_service(742 'web', external_links=[743 db_ctnrs[0].name,744 db_ctnrs[1].name,745 '{}:db_3'.format(db_ctnrs[2].name)746 ]747 )748 create_and_start_container(web)749 assert set(get_links(web.containers()[0])) == {750 db_ctnrs[0].name,751 db_ctnrs[1].name,752 'db_3'753 }754 @no_cluster('No legacy links support in Swarm')755 def test_start_normal_container_does_not_create_links_to_its_own_service(self):756 db = self.create_service('db')757 create_and_start_container(db)758 create_and_start_container(db)759 c = create_and_start_container(db)760 assert set(get_links(c)) == set()761 @no_cluster('No legacy links support in Swarm')762 def test_start_one_off_container_creates_links_to_its_own_service(self):763 db = self.create_service('db')764 db1 = create_and_start_container(db)765 db2 = create_and_start_container(db)766 c = create_and_start_container(db, one_off=OneOffFilter.only)767 assert set(get_links(c)) == {768 db1.name, db1.name_without_project,769 db2.name, db2.name_without_project,770 'db'771 }772 def test_start_container_builds_images(self):773 service = Service(774 name='test',775 client=self.client,776 build={'context': 'tests/fixtures/simple-dockerfile'},777 
project='composetest',778 )779 container = create_and_start_container(service)780 container.wait()781 assert b'success' in container.logs()782 assert len(self.client.images(name='composetest_test')) >= 1783 def test_start_container_uses_tagged_image_if_it_exists(self):784 self.check_build('tests/fixtures/simple-dockerfile', tag='composetest_test')785 service = Service(786 name='test',787 client=self.client,788 build={'context': 'this/does/not/exist/and/will/throw/error'},789 project='composetest',790 )791 container = create_and_start_container(service)792 container.wait()793 assert b'success' in container.logs()794 def test_start_container_creates_ports(self):795 service = self.create_service('web', ports=[8000])796 container = create_and_start_container(service).inspect()797 assert list(container['NetworkSettings']['Ports'].keys()) == ['8000/tcp']798 assert container['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort'] != '8000'799 def test_build(self):800 base_dir = tempfile.mkdtemp()801 self.addCleanup(shutil.rmtree, base_dir)802 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:803 f.write("FROM busybox\n")804 service = self.create_service('web',805 build={'context': base_dir},806 environment={807 'COMPOSE_DOCKER_CLI_BUILD': '0',808 'DOCKER_BUILDKIT': '0',809 })810 service.build()811 self.addCleanup(self.client.remove_image, service.image_name)812 assert self.client.inspect_image('composetest_web')813 def test_build_cli(self):814 base_dir = tempfile.mkdtemp()815 self.addCleanup(shutil.rmtree, base_dir)816 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:817 f.write("FROM busybox\n")818 service = self.create_service('web',819 build={'context': base_dir},820 environment={821 'DOCKER_BUILDKIT': '1',822 })823 service.build(cli=True)824 self.addCleanup(self.client.remove_image, service.image_name)825 assert self.client.inspect_image('composetest_web')826 def test_build_cli_with_build_labels(self):827 base_dir = tempfile.mkdtemp()828 
self.addCleanup(shutil.rmtree, base_dir)829 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:830 f.write("FROM busybox\n")831 service = self.create_service('web',832 build={833 'context': base_dir,834 'labels': {'com.docker.compose.test': 'true'}},835 )836 service.build(cli=True)837 self.addCleanup(self.client.remove_image, service.image_name)838 image = self.client.inspect_image('composetest_web')839 assert image['Config']['Labels']['com.docker.compose.test']840 def test_build_cli_with_build_error(self):841 base_dir = tempfile.mkdtemp()842 self.addCleanup(shutil.rmtree, base_dir)843 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:844 f.write('\n'.join([845 "FROM busybox",846 "RUN exit 2",847 ]))848 service = self.create_service('web',849 build={850 'context': base_dir,851 'labels': {'com.docker.compose.test': 'true'}},852 )853 with pytest.raises(BuildError):854 service.build(cli=True)855 def test_up_build_cli(self):856 base_dir = tempfile.mkdtemp()857 self.addCleanup(shutil.rmtree, base_dir)858 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:859 f.write("FROM busybox\n")860 web = self.create_service('web',861 build={'context': base_dir},862 environment={863 'DOCKER_BUILDKIT': '1',864 })865 project = Project('composetest', [web], self.client)866 project.up(do_build=BuildAction.force)867 containers = project.containers(['web'])868 assert len(containers) == 1869 assert containers[0].name.startswith('composetest_web_')870 def test_build_non_ascii_filename(self):871 base_dir = tempfile.mkdtemp()872 self.addCleanup(shutil.rmtree, base_dir)873 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:874 f.write("FROM busybox\n")875 with open(os.path.join(base_dir.encode('utf8'), b'foo\xE2bar'), 'w') as f:876 f.write("hello world\n")877 service = self.create_service('web', build={'context': str(base_dir)})878 service.build()879 self.addCleanup(self.client.remove_image, service.image_name)880 assert 
self.client.inspect_image('composetest_web')881 def test_build_with_image_name(self):882 base_dir = tempfile.mkdtemp()883 self.addCleanup(shutil.rmtree, base_dir)884 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:885 f.write("FROM busybox\n")886 image_name = 'examples/composetest:latest'887 self.addCleanup(self.client.remove_image, image_name)888 self.create_service('web', build={'context': base_dir}, image=image_name).build()889 assert self.client.inspect_image(image_name)890 def test_build_with_git_url(self):891 build_url = "https://github.com/dnephin/docker-build-from-url.git"892 service = self.create_service('buildwithurl', build={'context': build_url})893 self.addCleanup(self.client.remove_image, service.image_name)894 service.build()895 assert service.image()896 def test_build_with_build_args(self):897 base_dir = tempfile.mkdtemp()898 self.addCleanup(shutil.rmtree, base_dir)899 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:900 f.write("FROM busybox\n")901 f.write("ARG build_version\n")902 f.write("RUN echo ${build_version}\n")903 service = self.create_service('buildwithargs',904 build={'context': str(base_dir),905 'args': {"build_version": "1"}})906 service.build()907 self.addCleanup(self.client.remove_image, service.image_name)908 assert service.image()909 assert "build_version=1" in service.image()['ContainerConfig']['Cmd']910 def test_build_with_build_args_override(self):911 base_dir = tempfile.mkdtemp()912 self.addCleanup(shutil.rmtree, base_dir)913 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:914 f.write("FROM busybox\n")915 f.write("ARG build_version\n")916 f.write("RUN echo ${build_version}\n")917 service = self.create_service('buildwithargs',918 build={'context': str(base_dir),919 'args': {"build_version": "1"}})920 service.build(build_args_override={'build_version': '2'})921 self.addCleanup(self.client.remove_image, service.image_name)922 assert service.image()923 assert "build_version=2" in 
service.image()['ContainerConfig']['Cmd']924 def test_build_with_build_labels(self):925 base_dir = tempfile.mkdtemp()926 self.addCleanup(shutil.rmtree, base_dir)927 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:928 f.write('FROM busybox\n')929 service = self.create_service('buildlabels', build={930 'context': str(base_dir),931 'labels': {'com.docker.compose.test': 'true'}932 })933 service.build()934 self.addCleanup(self.client.remove_image, service.image_name)935 assert service.image()936 assert service.image()['Config']['Labels']['com.docker.compose.test'] == 'true'937 @no_cluster('Container networks not on Swarm')938 def test_build_with_network(self):939 base_dir = tempfile.mkdtemp()940 self.addCleanup(shutil.rmtree, base_dir)941 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:942 f.write('FROM busybox\n')943 f.write('RUN ping -c1 google.local\n')944 net_container = self.client.create_container(945 'busybox', 'top', host_config=self.client.create_host_config(946 extra_hosts={'google.local': '127.0.0.1'}947 ), name='composetest_build_network'948 )949 self.addCleanup(self.client.remove_container, net_container, force=True)950 self.client.start(net_container)951 service = self.create_service('buildwithnet', build={952 'context': str(base_dir),953 'network': 'container:{}'.format(net_container['Id'])954 })955 service.build()956 self.addCleanup(self.client.remove_image, service.image_name)957 assert service.image()958 @no_cluster('Not supported on UCP 2.2.0-beta1') # FIXME: remove once support is added959 def test_build_with_target(self):960 self.require_api_version('1.30')961 base_dir = tempfile.mkdtemp()962 self.addCleanup(shutil.rmtree, base_dir)963 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:964 f.write('FROM busybox as one\n')965 f.write('LABEL com.docker.compose.test=true\n')966 f.write('LABEL com.docker.compose.test.target=one\n')967 f.write('FROM busybox as two\n')968 f.write('LABEL com.docker.compose.test.target=two\n')969 
service = self.create_service('buildtarget', build={970 'context': str(base_dir),971 'target': 'one'972 })973 service.build()974 assert service.image()975 assert service.image()['Config']['Labels']['com.docker.compose.test.target'] == 'one'976 def test_build_with_extra_hosts(self):977 self.require_api_version('1.27')978 base_dir = tempfile.mkdtemp()979 self.addCleanup(shutil.rmtree, base_dir)980 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:981 f.write('\n'.join([982 'FROM busybox',983 'RUN ping -c1 foobar',984 'RUN ping -c1 baz',985 ]))986 service = self.create_service('build_extra_hosts', build={987 'context': str(base_dir),988 'extra_hosts': {989 'foobar': '127.0.0.1',990 'baz': '127.0.0.1'991 }992 })993 service.build()994 assert service.image()995 def test_build_with_gzip(self):996 base_dir = tempfile.mkdtemp()997 self.addCleanup(shutil.rmtree, base_dir)998 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:999 f.write('\n'.join([1000 'FROM busybox',1001 'COPY . /src',1002 'RUN cat /src/hello.txt'1003 ]))1004 with open(os.path.join(base_dir, 'hello.txt'), 'w') as f:1005 f.write('hello world\n')1006 service = self.create_service('build_gzip', build={1007 'context': str(base_dir),1008 })1009 service.build(gzip=True)1010 assert service.image()1011 def test_build_with_isolation(self):1012 base_dir = tempfile.mkdtemp()1013 self.addCleanup(shutil.rmtree, base_dir)1014 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:1015 f.write('FROM busybox\n')1016 service = self.create_service('build_isolation', build={1017 'context': str(base_dir),1018 'isolation': 'default',1019 })1020 service.build()1021 assert service.image()1022 def test_build_with_illegal_leading_chars(self):1023 base_dir = tempfile.mkdtemp()1024 self.addCleanup(shutil.rmtree, base_dir)1025 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:1026 f.write('FROM busybox\nRUN echo "Embodiment of Scarlet Devil"\n')1027 service = Service(1028 'build_leading_slug', 
client=self.client,1029 project='___-composetest', build={1030 'context': str(base_dir)1031 }1032 )1033 assert service.image_name == 'composetest_build_leading_slug'1034 service.build()1035 assert service.image()1036 def test_start_container_stays_unprivileged(self):1037 service = self.create_service('web')1038 container = create_and_start_container(service).inspect()1039 assert container['HostConfig']['Privileged'] is False1040 def test_start_container_becomes_privileged(self):1041 service = self.create_service('web', privileged=True)1042 container = create_and_start_container(service).inspect()1043 assert container['HostConfig']['Privileged'] is True1044 def test_expose_does_not_publish_ports(self):1045 service = self.create_service('web', expose=["8000"])1046 container = create_and_start_container(service).inspect()1047 assert container['NetworkSettings']['Ports'] == {'8000/tcp': None}1048 def test_start_container_creates_port_with_explicit_protocol(self):1049 service = self.create_service('web', ports=['8000/udp'])1050 container = create_and_start_container(service).inspect()1051 assert list(container['NetworkSettings']['Ports'].keys()) == ['8000/udp']1052 def test_start_container_creates_fixed_external_ports(self):1053 service = self.create_service('web', ports=['8000:8000'])1054 container = create_and_start_container(service).inspect()1055 assert '8000/tcp' in container['NetworkSettings']['Ports']1056 assert container['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort'] == '8000'1057 def test_start_container_creates_fixed_external_ports_when_it_is_different_to_internal_port(self):1058 service = self.create_service('web', ports=['8001:8000'])1059 container = create_and_start_container(service).inspect()1060 assert '8000/tcp' in container['NetworkSettings']['Ports']1061 assert container['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort'] == '8001'1062 def test_port_with_explicit_interface(self):1063 service = self.create_service('web', ports=[1064 
'127.0.0.1:8001:8000',1065 '0.0.0.0:9001:9000/udp',1066 ])1067 container = create_and_start_container(service).inspect()1068 assert container['NetworkSettings']['Ports']['8000/tcp'] == [{1069 'HostIp': '127.0.0.1',1070 'HostPort': '8001',1071 }]1072 assert container['NetworkSettings']['Ports']['9000/udp'][0]['HostPort'] == '9001'1073 if not is_cluster(self.client):1074 assert container['NetworkSettings']['Ports']['9000/udp'][0]['HostIp'] == '0.0.0.0'1075 # self.assertEqual(container['NetworkSettings']['Ports'], {1076 # '8000/tcp': [1077 # {1078 # 'HostIp': '127.0.0.1',1079 # 'HostPort': '8001',1080 # },1081 # ],1082 # '9000/udp': [1083 # {1084 # 'HostIp': '0.0.0.0',1085 # 'HostPort': '9001',1086 # },1087 # ],1088 # })1089 def test_create_with_image_id(self):1090 pull_busybox(self.client)1091 image_id = self.client.inspect_image(BUSYBOX_IMAGE_WITH_TAG)['Id'][:12]1092 service = self.create_service('foo', image=image_id)1093 service.create_container()1094 def test_scale(self):1095 service = self.create_service('web')1096 service.scale(1)1097 assert len(service.containers()) == 11098 # Ensure containers don't have stdout or stdin connected1099 container = service.containers()[0]1100 config = container.inspect()['Config']1101 assert not config['AttachStderr']1102 assert not config['AttachStdout']1103 assert not config['AttachStdin']1104 service.scale(3)1105 assert len(service.containers()) == 31106 service.scale(1)1107 assert len(service.containers()) == 11108 service.scale(0)1109 assert len(service.containers()) == 01110 @pytest.mark.skipif(1111 SWARM_SKIP_CONTAINERS_ALL,1112 reason='Swarm /containers/json bug'1113 )1114 def test_scale_with_stopped_containers(self):1115 """1116 Given there are some stopped containers and scale is called with a1117 desired number that is the same as the number of stopped containers,1118 test that those containers are restarted and not removed/recreated.1119 """1120 service = self.create_service('web')1121 
service.create_container(number=1)1122 service.create_container(number=2)1123 ParallelStreamWriter.instance = None1124 with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:1125 service.scale(2)1126 for container in service.containers():1127 assert container.is_running1128 assert container.number in [1, 2]1129 captured_output = mock_stderr.getvalue()1130 assert 'Creating' not in captured_output1131 assert 'Starting' in captured_output1132 def test_scale_with_stopped_containers_and_needing_creation(self):1133 """1134 Given there are some stopped containers and scale is called with a1135 desired number that is greater than the number of stopped containers,1136 test that those containers are restarted and required number are created.1137 """1138 service = self.create_service('web')1139 next_number = service._next_container_number()1140 service.create_container(number=next_number, quiet=True)1141 for container in service.containers():1142 assert not container.is_running1143 ParallelStreamWriter.instance = None1144 with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:1145 service.scale(2)1146 assert len(service.containers()) == 21147 for container in service.containers():1148 assert container.is_running1149 captured_output = mock_stderr.getvalue()1150 assert 'Creating' in captured_output1151 assert 'Starting' in captured_output1152 def test_scale_with_api_error(self):1153 """Test that when scaling if the API returns an error, that error is handled1154 and the remaining threads continue.1155 """1156 service = self.create_service('web')1157 next_number = service._next_container_number()1158 service.create_container(number=next_number, quiet=True)1159 with mock.patch(1160 'compose.container.Container.create',1161 side_effect=APIError(1162 message="testing",1163 response={},1164 explanation="Boom")):1165 with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:1166 with pytest.raises(OperationFailedError):1167 service.scale(3)1168 
assert len(service.containers()) == 11169 assert service.containers()[0].is_running1170 assert "ERROR: for composetest_web_" in mock_stderr.getvalue()1171 assert "Cannot create container for service web: Boom" in mock_stderr.getvalue()1172 def test_scale_with_unexpected_exception(self):1173 """Test that when scaling if the API returns an error, that is not of type1174 APIError, that error is re-raised.1175 """1176 service = self.create_service('web')1177 next_number = service._next_container_number()1178 service.create_container(number=next_number, quiet=True)1179 with mock.patch(1180 'compose.container.Container.create',1181 side_effect=ValueError("BOOM")1182 ):1183 with pytest.raises(ValueError):1184 service.scale(3)1185 assert len(service.containers()) == 11186 assert service.containers()[0].is_running1187 @mock.patch('compose.service.log')1188 def test_scale_with_desired_number_already_achieved(self, mock_log):1189 """1190 Test that calling scale with a desired number that is equal to the1191 number of containers already running results in no change.1192 """1193 service = self.create_service('web')1194 next_number = service._next_container_number()1195 container = service.create_container(number=next_number, quiet=True)1196 container.start()1197 container.inspect()1198 assert container.is_running1199 assert len(service.containers()) == 11200 service.scale(1)1201 assert len(service.containers()) == 11202 container.inspect()1203 assert container.is_running1204 captured_output = mock_log.info.call_args[0]1205 assert 'Desired container number already achieved' in captured_output1206 @mock.patch('compose.service.log')1207 def test_scale_with_custom_container_name_outputs_warning(self, mock_log):1208 """Test that calling scale on a service that has a custom container name1209 results in warning output.1210 """1211 service = self.create_service('app', container_name='custom-container')1212 assert service.custom_container_name == 'custom-container'1213 with 
pytest.raises(OperationFailedError):1214 service.scale(3)1215 captured_output = mock_log.warning.call_args[0][0]1216 assert len(service.containers()) == 11217 assert "Remove the custom name to scale the service." in captured_output1218 def test_scale_sets_ports(self):1219 service = self.create_service('web', ports=['8000'])1220 service.scale(2)1221 containers = service.containers()1222 assert len(containers) == 21223 for container in containers:1224 assert list(container.get('HostConfig.PortBindings')) == ['8000/tcp']1225 def test_scale_with_immediate_exit(self):1226 service = self.create_service('web', image='busybox', command='true')1227 service.scale(2)1228 assert len(service.containers(stopped=True)) == 21229 def test_network_mode_none(self):1230 service = self.create_service('web', network_mode=NetworkMode('none'))1231 container = create_and_start_container(service)1232 assert container.get('HostConfig.NetworkMode') == 'none'1233 def test_network_mode_bridged(self):1234 service = self.create_service('web', network_mode=NetworkMode('bridge'))1235 container = create_and_start_container(service)1236 assert container.get('HostConfig.NetworkMode') == 'bridge'1237 def test_network_mode_host(self):1238 service = self.create_service('web', network_mode=NetworkMode('host'))1239 container = create_and_start_container(service)1240 assert container.get('HostConfig.NetworkMode') == 'host'1241 def test_pid_mode_none_defined(self):1242 service = self.create_service('web', pid_mode=None)1243 container = create_and_start_container(service)1244 assert container.get('HostConfig.PidMode') == ''1245 def test_pid_mode_host(self):1246 service = self.create_service('web', pid_mode=PidMode('host'))1247 container = create_and_start_container(service)1248 assert container.get('HostConfig.PidMode') == 'host'1249 def test_ipc_mode_none_defined(self):1250 service = self.create_service('web', ipc_mode=None)1251 container = create_and_start_container(service)1252 
print(container.get('HostConfig.IpcMode'))1253 assert container.get('HostConfig.IpcMode') == 'shareable'1254 def test_ipc_mode_host(self):1255 service = self.create_service('web', ipc_mode=IpcMode('host'))1256 container = create_and_start_container(service)1257 assert container.get('HostConfig.IpcMode') == 'host'1258 def test_userns_mode_none_defined(self):1259 service = self.create_service('web', userns_mode=None)1260 container = create_and_start_container(service)1261 assert container.get('HostConfig.UsernsMode') == ''1262 def test_userns_mode_host(self):1263 service = self.create_service('web', userns_mode='host')1264 container = create_and_start_container(service)1265 assert container.get('HostConfig.UsernsMode') == 'host'1266 def test_dns_no_value(self):1267 service = self.create_service('web')1268 container = create_and_start_container(service)1269 assert container.get('HostConfig.Dns') is None1270 def test_dns_list(self):1271 service = self.create_service('web', dns=['8.8.8.8', '9.9.9.9'])1272 container = create_and_start_container(service)1273 assert container.get('HostConfig.Dns') == ['8.8.8.8', '9.9.9.9']1274 def test_mem_swappiness(self):1275 service = self.create_service('web', mem_swappiness=11)1276 container = create_and_start_container(service)1277 assert container.get('HostConfig.MemorySwappiness') == 111278 def test_mem_reservation(self):1279 service = self.create_service('web', mem_reservation='20m')1280 container = create_and_start_container(service)1281 assert container.get('HostConfig.MemoryReservation') == 20 * 1024 * 10241282 def test_restart_always_value(self):1283 service = self.create_service('web', restart={'Name': 'always'})1284 container = create_and_start_container(service)1285 assert container.get('HostConfig.RestartPolicy.Name') == 'always'1286 def test_oom_score_adj_value(self):1287 service = self.create_service('web', oom_score_adj=500)1288 container = create_and_start_container(service)1289 assert 
container.get('HostConfig.OomScoreAdj') == 5001290 def test_group_add_value(self):1291 service = self.create_service('web', group_add=["root", "1"])1292 container = create_and_start_container(service)1293 host_container_groupadd = container.get('HostConfig.GroupAdd')1294 assert "root" in host_container_groupadd1295 assert "1" in host_container_groupadd1296 def test_dns_opt_value(self):1297 service = self.create_service('web', dns_opt=["use-vc", "no-tld-query"])1298 container = create_and_start_container(service)1299 dns_opt = container.get('HostConfig.DnsOptions')1300 assert 'use-vc' in dns_opt1301 assert 'no-tld-query' in dns_opt1302 def test_restart_on_failure_value(self):1303 service = self.create_service('web', restart={1304 'Name': 'on-failure',1305 'MaximumRetryCount': 51306 })1307 container = create_and_start_container(service)1308 assert container.get('HostConfig.RestartPolicy.Name') == 'on-failure'1309 assert container.get('HostConfig.RestartPolicy.MaximumRetryCount') == 51310 def test_cap_add_list(self):1311 service = self.create_service('web', cap_add=['SYS_ADMIN', 'NET_ADMIN'])1312 container = create_and_start_container(service)1313 assert container.get('HostConfig.CapAdd') == ['SYS_ADMIN', 'NET_ADMIN']1314 def test_cap_drop_list(self):1315 service = self.create_service('web', cap_drop=['SYS_ADMIN', 'NET_ADMIN'])1316 container = create_and_start_container(service)1317 assert container.get('HostConfig.CapDrop') == ['SYS_ADMIN', 'NET_ADMIN']1318 def test_dns_search(self):1319 service = self.create_service('web', dns_search=['dc1.example.com', 'dc2.example.com'])1320 container = create_and_start_container(service)1321 assert container.get('HostConfig.DnsSearch') == ['dc1.example.com', 'dc2.example.com']1322 def test_tmpfs(self):1323 service = self.create_service('web', tmpfs=['/run'])1324 container = create_and_start_container(service)1325 assert container.get('HostConfig.Tmpfs') == {'/run': ''}1326 def test_working_dir_param(self):1327 service = 
self.create_service('container', working_dir='/working/dir/sample')1328 container = service.create_container()1329 assert container.get('Config.WorkingDir') == '/working/dir/sample'1330 def test_split_env(self):1331 service = self.create_service(1332 'web',1333 environment=['NORMAL=F1', 'CONTAINS_EQUALS=F=2', 'TRAILING_EQUALS='])1334 env = create_and_start_container(service).environment1335 for k, v in {'NORMAL': 'F1', 'CONTAINS_EQUALS': 'F=2', 'TRAILING_EQUALS': ''}.items():1336 assert env[k] == v1337 def test_env_from_file_combined_with_env(self):1338 service = self.create_service(1339 'web',1340 environment=['ONE=1', 'TWO=2', 'THREE=3'],1341 env_file=['tests/fixtures/env/one.env', 'tests/fixtures/env/two.env'])1342 env = create_and_start_container(service).environment1343 for k, v in {1344 'ONE': '1',1345 'TWO': '2',1346 'THREE': '3',1347 'FOO': 'baz',1348 'DOO': 'dah'1349 }.items():1350 assert env[k] == v1351 def test_build_with_cachefrom(self):1352 base_dir = tempfile.mkdtemp()1353 self.addCleanup(shutil.rmtree, base_dir)1354 with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:1355 f.write("FROM busybox\n")1356 service = self.create_service('cache_from',1357 build={'context': base_dir,1358 'cache_from': ['build1']})1359 service.build()1360 self.addCleanup(self.client.remove_image, service.image_name)1361 assert service.image()1362 @mock.patch.dict(os.environ)1363 def test_resolve_env(self):1364 os.environ['FILE_DEF'] = 'E1'1365 os.environ['FILE_DEF_EMPTY'] = 'E2'1366 os.environ['ENV_DEF'] = 'E3'1367 service = self.create_service(1368 'web',1369 environment={1370 'FILE_DEF': 'F1',1371 'FILE_DEF_EMPTY': '',1372 'ENV_DEF': None,1373 'NO_DEF': None1374 }1375 )1376 env = create_and_start_container(service).environment1377 for k, v in {1378 'FILE_DEF': 'F1',1379 'FILE_DEF_EMPTY': '',1380 'ENV_DEF': 'E3',1381 'NO_DEF': None1382 }.items():1383 assert env[k] == v1384 def test_with_high_enough_api_version_we_get_default_network_mode(self):1385 # TODO: remove this 
test once minimum docker version is 1.8.x1386 with mock.patch.object(self.client, '_version', '1.20'):1387 service = self.create_service('web')1388 service_config = service._get_container_host_config({})1389 assert service_config['NetworkMode'] == 'default'1390 def test_labels(self):1391 labels_dict = {1392 'com.example.description': "Accounting webapp",1393 'com.example.department': "Finance",1394 'com.example.label-with-empty-value': "",1395 }1396 compose_labels = {1397 LABEL_ONE_OFF: 'False',1398 LABEL_PROJECT: 'composetest',1399 LABEL_SERVICE: 'web',1400 LABEL_VERSION: __version__,1401 LABEL_CONTAINER_NUMBER: '1'1402 }1403 expected = dict(labels_dict, **compose_labels)1404 service = self.create_service('web', labels=labels_dict)1405 ctnr = create_and_start_container(service)1406 labels = ctnr.labels.items()1407 for pair in expected.items():1408 assert pair in labels1409 def test_empty_labels(self):1410 labels_dict = {'foo': '', 'bar': ''}1411 service = self.create_service('web', labels=labels_dict)1412 labels = create_and_start_container(service).labels.items()1413 for name in labels_dict:1414 assert (name, '') in labels1415 def test_stop_signal(self):1416 stop_signal = 'SIGINT'1417 service = self.create_service('web', stop_signal=stop_signal)1418 container = create_and_start_container(service)1419 assert container.stop_signal == stop_signal1420 def test_custom_container_name(self):1421 service = self.create_service('web', container_name='my-web-container')1422 assert service.custom_container_name == 'my-web-container'1423 container = create_and_start_container(service)1424 assert container.name == 'my-web-container'1425 one_off_container = service.create_container(one_off=True)1426 assert one_off_container.name != 'my-web-container'1427 @pytest.mark.skipif(True, reason="Broken on 1.11.0 - 17.03.0")1428 def test_log_drive_invalid(self):1429 service = self.create_service('web', logging={'driver': 'xxx'})1430 expected_error_msg = "logger: no log driver named 
'xxx' is registered"1431 with pytest.raises(APIError) as excinfo:1432 create_and_start_container(service)1433 assert re.search(expected_error_msg, excinfo.value)1434 def test_log_drive_empty_default_jsonfile(self):1435 service = self.create_service('web')1436 log_config = create_and_start_container(service).log_config1437 assert 'json-file' == log_config['Type']1438 assert not log_config['Config']1439 def test_log_drive_none(self):1440 service = self.create_service('web', logging={'driver': 'none'})1441 log_config = create_and_start_container(service).log_config1442 assert 'none' == log_config['Type']1443 assert not log_config['Config']1444 def test_devices(self):1445 service = self.create_service('web', devices=["/dev/random:/dev/mapped-random"])1446 device_config = create_and_start_container(service).get('HostConfig.Devices')1447 device_dict = {1448 'PathOnHost': '/dev/random',1449 'CgroupPermissions': 'rwm',1450 'PathInContainer': '/dev/mapped-random'1451 }1452 assert 1 == len(device_config)1453 assert device_dict == device_config[0]1454 def test_duplicate_containers(self):1455 service = self.create_service('web')1456 options = service._get_container_create_options({}, service._next_container_number())1457 original = Container.create(service.client, **options)1458 assert set(service.containers(stopped=True)) == {original}1459 assert set(service.duplicate_containers()) == set()1460 options['name'] = 'temporary_container_name'1461 duplicate = Container.create(service.client, **options)1462 assert set(service.containers(stopped=True)) == {original, duplicate}1463 assert set(service.duplicate_containers()) == {duplicate}1464def converge(service, strategy=ConvergenceStrategy.changed):1465 """Create a converge plan from a strategy and execute the plan."""1466 plan = service.convergence_plan(strategy)1467 return service.execute_convergence_plan(plan, timeout=1)1468class ConfigHashTest(DockerClientTestCase):1469 def test_no_config_hash_when_one_off(self):1470 web = 
self.create_service('web')1471 container = web.create_container(one_off=True)1472 assert LABEL_CONFIG_HASH not in container.labels1473 def test_no_config_hash_when_overriding_options(self):1474 web = self.create_service('web')1475 container = web.create_container(environment={'FOO': '1'})1476 assert LABEL_CONFIG_HASH not in container.labels1477 def test_config_hash_with_custom_labels(self):1478 web = self.create_service('web', labels={'foo': '1'})1479 container = converge(web)[0]1480 assert LABEL_CONFIG_HASH in container.labels1481 assert 'foo' in container.labels1482 def test_config_hash_sticks_around(self):1483 web = self.create_service('web', command=["top"])1484 container = converge(web)[0]1485 assert LABEL_CONFIG_HASH in container.labels1486 web = self.create_service('web', command=["top", "-d", "1"])1487 container = converge(web)[0]...

Full Screen

Full Screen

start-services.py

Source:start-services.py Github

copy

Full Screen

...45 for service in services:46 if service.name == name:47 return service48 return None49def create_service(name, replicas, postfix):50 service = get_service(name)51 if service is not None:52 update_replicas(service, replicas)53 else:54 run_service(name, replicas, postfix)55def create_service_telegraf(name, postfix):56 subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["create_service_telegraf", name])57def create_service_telegraf_on_master(name, postfix):58 subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["create_service_telegraf_on_master", name])59def rm_service(name, postfix):60 # subprocess.run(["docker", "service", "rm", name])61 subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["rm_service", name])62def create_secrets(name):63 # subprocess.run(["docker", "service", "rm", name])64 subprocess.run(["bash", "start-services_exec.sh"] + global_params + ["create_secrets" + "_" + name])65def create_network():66 client.networks.create("smartmeter", driver="overlay")67## RUN SCENARIO ##68def run(steps):69 if not isinstance(steps[0], list):70 steps = [steps]71 for step in steps:72 print()73 print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")74 print(step)75 if step[0] == "create_service" :76 create_service(step[1], step[2], postfix)77 elif step[0] == "rm_service" :78 rm_service(step[1], postfix)79 elif step[0] == "create_secrets" :80 create_secrets(step[1])81 elif step[0] == "create_service_telegraf" :82 create_service_telegraf(step[1], postfix)83 elif step[0] == "create_service_telegraf_on_master" :84 create_service_telegraf_on_master(step[1], postfix)85 else:86 call(step[0], step[1], step[2:])87def run_or_kill(steps):88 if not isinstance(steps[0], list):89 steps = [steps]90 # Collect all existing services names...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful