Best Python code snippet using localstack_python
test_allocation.py
Source:test_allocation.py  
1def test_inactive_agent(super_client, new_context):2	host = super_client.reload(new_context.host)3	agent = host.agent()4	c = new_context.create_container()5	assert c.state == "running"6	agent = super_client.wait_success(agent.deactivate())7	assert agent.state == "inactive"8	c = new_context.create_container_no_success()9	assert c.transitioning == "error"10	assert c.transitioningMessage == "Allocation failed: No healthy hosts with sufficient " "resources available"11	assert c.state == "error"12def test_allocation_with_shared_storage_pool(super_client, new_context):13	count = 314	client = new_context.client15	host2 = register_simulated_host(client)16	register_simulated_host(client)17	hosts = [new_context.host, host2]18	hosts = wait_all_success(super_client, hosts)19	sp = add_storage_pool(new_context, [new_context.host.uuid, host2.uuid])20	sp_name = sp.name21	for h in hosts:22		assert h.state == "active"23		assert h.agent().state == "active"24		assert len(h.storagePools()) == 225		assert h.storagePools()[0].state == "active"26		assert h.storagePools()[1].state == "active"27	v1 = client.create_volume(name=random_str(), driver=sp_name)28	v1 = client.wait_success(v1)29	assert v1.state == "inactive"30	data_volume_mounts = {"/con/path": v1.id}31	containers = []32	for _ in range(len(hosts) * count):33		c = client.create_container(imageUuid=new_context.image_uuid, dataVolumeMounts=data_volume_mounts)34		containers.append(c)35		time.sleep(1)36	wait_all_success(super_client, containers, timeout=60)37	for c in containers:38		new_context.wait_for_state(c, "running")39def test_allocate_to_host_with_pool(new_context, super_client):40	client = new_context.client41	host = new_context.host42	host2 = register_simulated_host(client)43	sp = add_storage_pool(new_context)44	sp_name = sp.name45	assert len(host.storagePools()) == 246	assert len(host2.storagePools()) == 147	c = new_context.create_container_no_success(imageUuid=new_context.image_uuid, volumeDriver=sp_name, requestedHostId=host2.id, dataVolume=["vol1:/con/path"])48	c = super_client.reload(c)49	assert c.state == "error"50	assert c.transitioning == "error"51	assert c.transitioningMessage.startswith("Allocation failed: valid host(s) [")52def test_allocation_stay_associated_to_host(super_client, context):53	c = context.create_container()54	c = context.client.wait_success(c.stop())55	assert c.state == "stopped"56	assert len(c.hosts()) == 157def test_port_constraint(new_context):58	host1 = new_context.host59	client = new_context.client60	image_uuid = new_context.image_uuid61	containers = []62	try:63		c = client.wait_success(client.create_container(imageUuid=image_uuid, requestedHostId=host1.id, ports=["8081:81/tcp"]))64		containers.append(c)65		c2 = client.wait_transitioning(client.create_container(imageUuid=image_uuid, ports=["8081:81/tcp"]))66		assert c2.transitioning == "error"67		assert "Allocation failed: host needs ports 8081/tcp available" in c2.transitioningMessage68		assert c2.state == "error"69		c3 = new_context.super_create_container(imageUuid=image_uuid, ports=["8082:81/tcp"])70		containers.append(c3)71		c4 = client.wait_success(client.create_container(imageUuid=image_uuid, ports=["8081:81/udp"]))72		containers.append(c4)73		c5 = client.wait_transitioning(client.create_container(imageUuid=image_uuid, ports=["8081:81/udp"]))74		assert c5.transitioning == "error"75		assert "Allocation failed: host needs ports 8081/udp available" in c5.transitioningMessage76		assert c5.state == "error"77		c6 = client.wait_success(client.create_container(imageUuid=image_uuid, requestedHostId=host1.id, ports=["127.2.2.2:8081:81/tcp"]))78		containers.append(c6)79		c7 = client.wait_transitioning(client.create_container(imageUuid=image_uuid, ports=["127.2.2.2:8081:81/tcp"]))80		assert c7.transitioning == "error"81		assert "Allocation failed: host needs ports 8081/tcp available" in c7.transitioningMessage82		assert c7.state == "error"83		host2 = register_simulated_host(new_context.client)84		c8 = client.wait_success(client.create_container(imageUuid=image_uuid, ports=["8081:81/tcp"]))85		assert c8.hosts()[0].id == host2.id86		containers.append(c8)87	finally:88		for c in containers:89			if c is not None:90				new_context.delete(c)91def test_conflicting_ports_in_deployment_unit(new_context):92	client = new_context.client93	image_uuid = new_context.image_uuid94	client.wait_success(client.create_container(name="reset", imageUuid=image_uuid))95	env = client.create_stack(name=random_str())96	env = client.wait_success(env)97	assert env.state == "active"98	launch_config = {"imageUuid": image_uuid, "ports": ["5555:6666"]}99	secondary_lc = {"imageUuid": image_uuid, "name": "secondary", "ports": ["5555:6666"]}100	svc = client.create_service(name=random_str(), stackId=env.id, launchConfig=launch_config, secondaryLaunchConfigs=[secondary_lc])101	svc = client.wait_success(svc)102	assert svc.state == "inactive"103	svc = svc.activate()104	c = _wait_for_compose_instance_error(client, svc, env)105	assert "Port 5555/tcp requested more than once." in c.transitioningMessage106	env.remove()107def test_simultaneous_port_allocation(new_context):108	client = new_context.client109	image_uuid = new_context.image_uuid110	env = client.create_stack(name=random_str())111	env = client.wait_success(env)112	assert env.state == "active"113	launch_config = {"imageUuid": image_uuid, "ports": ["5555:6666"]}114	svc = client.create_service(name=random_str(), stackId=env.id, launchConfig=launch_config, scale=2)115	svc = client.wait_success(svc)116	assert svc.state == "inactive"117	svc = svc.activate()118	c = _wait_for_compose_instance_error(client, svc, env)119	assert "host needs ports 5555/tcp available" in c.transitioningMessage120def _wait_for_compose_instance_error(client, service, env):121	name = env.name + "-" + service.name + "%"122	def check():123		containers = client.list_container(name_like=name, state="error")124		if len(containers) > 0:125			return containers[0]126	container = wait_for(check)127	return container128def test_request_host_override(new_context):129	host = new_context.host130	c = None131	c2 = None132	try:133		c = new_context.super_create_container(validHostIds=[host.id], ports=["8081:81/tcp"])134		c2 = new_context.super_create_container(requestedHostId=host.id, ports=["8081:81/tcp"])135	finally:136		if c is not None:137			new_context.delete(c)138		if c2 is not None:139			new_context.delete(c2)140def test_host_affinity(super_client, new_context):141	host = new_context.host142	host2 = register_simulated_host(new_context)143	host = super_client.update(host, labels={"size": "huge", "latency": "long"})144	host2 = super_client.update(host2, labels={"size": "tiny", "latency": "short"})145	containers = []146	try:147		c = new_context.create_container(environment={"constraint:size==huge": ""})148		assert c.hosts()[0].id == host.id149		containers.append(c)150		c = new_context.create_container(labels={"io.rancher.scheduler.affinity:host_label": "size=huge"})151		assert c.hosts()[0].id == host.id152		containers.append(c)153		c = new_context.create_container(environment={"constraint:size!=huge": ""})154		assert c.hosts()[0].id == host2.id155		containers.append(c)156		c = new_context.create_container(labels={"io.rancher.scheduler.affinity:host_label_ne": "size=huge"})157		assert c.hosts()[0].id == host2.id158		containers.append(c)159		c = new_context.create_container(environment={"constraint:size==huge": "", "constraint:latency==~short": ""})160		assert c.hosts()[0].id == host.id161		containers.append(c)162		c = new_context.create_container(labels={"io.rancher.scheduler.affinity:host_label": "size=huge", "io.rancher.scheduler.affinity:host_label_soft_ne": "latency=short"})163		assert c.hosts()[0].id == host.id164		containers.append(c)165		c = new_context.create_container(environment={"constraint:latency!=~long": ""})166		assert c.hosts()[0].id == host2.id167		containers.append(c)168		c = new_context.create_container(labels={"io.rancher.scheduler.affinity:host_label_soft_ne": "latency=long"})169		assert c.hosts()[0].id == host2.id170		containers.append(c)171	finally:172		for c in containers:173			new_context.delete(c)174def test_container_affinity(new_context):175	register_simulated_host(new_context)176	containers = []177	try:178		name1 = "affinity" + random_str()179		c1 = new_context.create_container(name=name1)180		containers.append(c1)181		c2 = new_context.create_container(environment={"affinity:container==" + name1: ""})182		containers.append(c2)183		assert c2.hosts()[0].id == c1.hosts()[0].id184		c3 = new_context.create_container(labels={"io.rancher.scheduler.affinity:container": name1})185		containers.append(c3)186		assert c3.hosts()[0].id == c1.hosts()[0].id187		c4 = new_context.create_container(environment={"affinity:container==" + c1.uuid: ""})188		containers.append(c4)189		assert c4.hosts()[0].id == c1.hosts()[0].id190		c5 = new_context.create_container(labels={"io.rancher.scheduler.affinity:container": c1.uuid})191		containers.append(c5)192		assert c5.hosts()[0].id == c1.hosts()[0].id193		c6 = new_context.create_container(environment={"affinity:container!=" + name1: ""})194		containers.append(c6)195		assert c6.hosts()[0].id != c1.hosts()[0].id196		c7 = new_context.create_container(labels={"io.rancher.scheduler.affinity:container_ne": name1})197		containers.append(c7)198		assert c7.hosts()[0].id != c1.hosts()[0].id199	finally:200		for c in containers:201			new_context.delete(c)202def test_container_label_affinity(new_context):203	register_simulated_host(new_context)204	containers = []205	try:206		c1_label = random_str()207		c1 = new_context.create_container(labels={"foo": c1_label})208		containers.append(c1)209		c2 = new_context.create_container(environment={"affinity:foo==" + c1_label: ""})210		containers.append(c2)211		assert c2.hosts()[0].id == c1.hosts()[0].id212		c3 = new_context.create_container(labels={"io.rancher.scheduler.affinity:container_label": "foo=" + c1_label})213		containers.append(c3)214		assert c3.hosts()[0].id == c1.hosts()[0].id215		c4_label = random_str()216		c4 = new_context.create_container(environment={"affinity:foo!=" + c1_label: ""}, labels={"foo": c4_label})217		containers.append(c4)218		assert c4.hosts()[0].id != c1.hosts()[0].id219		c5 = new_context.create_container(environment={"affinity:foo!=" + c1_label: "", "affinity:foo!=~" + c4_label: ""})220		containers.append(c5)221		assert c5.hosts()[0].id == c4.hosts()[0].id222		c6 = new_context.create_container(environment={"affinity:foo!=" + c1_label: ""}, labels={"io.rancher.scheduler.affinity:container_label_soft_ne": "foo=" + c4_label})223		containers.append(c6)224		assert c6.hosts()[0].id == c4.hosts()[0].id225	finally:226		for c in containers:227			new_context.delete(c)228def test_volumes_from_constraint(new_context):229	register_simulated_host(new_context)230	register_simulated_host(new_context)231	containers = []232	try:233		c1 = new_context.create_container_no_success(startOnCreate=False)234		c2 = new_context.create_container_no_success(startOnCreate=False, dataVolumesFrom=[c1.id])235		c1 = c1.start()236		c2 = c2.start()237		c1 = new_context.wait_for_state(c1, "running")238		c2 = new_context.wait_for_state(c2, "running")239		containers.append(c1)240		containers.append(c2)241		assert c1.hosts()[0].id == c2.hosts()[0].id242		c3 = new_context.create_container_no_success(startOnCreate=False)243		c4 = new_context.create_container_no_success(startOnCreate=False, dataVolumesFrom=[c3.id])244		c4 = c4.start()245		c4 = new_context.client.wait_transitioning(c4)246		assert c4.transitioning == "error"247		assert c4.transitioningMessage == "volumeFrom instance is not " "running : Dependencies readiness" " error"248	finally:249		for c in containers:250			new_context.delete(c)251def test_network_mode_constraint(new_context):252	client = new_context.client253	register_simulated_host(new_context)254	register_simulated_host(new_context)255	containers = []256	try:257		c1 = new_context.create_container(startOnCreate=False)258		c2 = new_context.create_container(startOnCreate=False, networkMode="container", networkContainerId=c1.id)259		c1 = client.wait_success(c1.start())260		c2 = client.wait_success(c2.start())261		assert c1.state == "running"262		containers.append(c1)263		assert c1.state == "running"264		containers.append(c2)265		assert c1.hosts()[0].id == c2.hosts()[0].id266	finally:267		for c in containers:...test_link.py
Source:test_link.py  
1from common_fixtures import *2def test_link_instance_stop_start(super_client, client, context):3	target1 = context.create_container(ports=["180", "122/udp"])4	target2 = context.create_container(ports=["280", "222/udp"])5	c = context.create_container(instanceLinks={"target1_link": target1.id, "target2_link": target2.id})6	assert c.state == "running"7	assert len(c.instanceLinks()) > 08def _find_agent_instance_ip(nsp, source):9	assert source is not None10	vnet_id = source.nics()[0].vnetId11	assert vnet_id is not None12	for agent_instance in nsp.instances():13		if agent_instance.nics()[0].vnetId == vnet_id:14			assert agent_instance.primaryIpAddress is not None15			return agent_instance.primaryIpAddress16	assert False, "Failed to find agent instance for " + source.id17def test_link_create(client, super_client, context):18	target1 = context.create_container(ports=["180", "122/udp"])19	target2 = context.create_container(ports=["280", "222/udp"])20	c = context.create_container(instanceLinks={"target1_link": target1.id, "target2_link": target2.id})21	assert c.state == "running"22	assert len(c.instanceLinks()) == 223	assert len(target1.targetInstanceLinks()) == 124	assert len(target2.targetInstanceLinks()) == 125	links = c.instanceLinks()26	names = set([x.linkName for x in links])27	assert names == set(["target1_link", "target2_link"])28def test_link_update(client, context):29	target1 = context.create_container()30	target2 = context.create_container()31	c = context.create_container(instanceLinks={"target1_link": target1.id})32	link = c.instanceLinks()[0]33	assert link.targetInstanceId == target1.id34	link.targetInstanceId = target2.id35	link = client.update(link, link)36	assert link.state == "updating-active"37	link = client.wait_success(link)38	assert link.targetInstanceId == target2.id39	assert link.state == "active"40def test_link_remove(client, context):41	target1 = context.create_container()42	c = client.create_container(imageUuid=context.image_uuid, startOnCreate=False, instanceLinks={"target1_link": target1.id})43	c = client.wait_success(c)44	links = c.instanceLinks()45	assert len(links) == 146	link = links[0]47	assert link.state == "inactive"48	c = client.wait_success(c.start())49	link = client.reload(link)50	assert c.state == "running"51	assert link.state == "active"52	c = client.wait_success(c.stop())53	link = client.reload(link)54	assert c.state == "stopped"55	assert link.state == "inactive"56	c = client.wait_success(client.delete(c))57	link = client.reload(link)58	assert c.state == "removed"59	assert link.state != "active"60def test_null_links(context):61	c = context.create_container(instanceLinks={"null_link": None})62	links = c.instanceLinks()63	assert len(links) == 164	assert links[0].state == "active"65	assert links[0].linkName == "null_link"66	assert links[0].targetInstanceId is None67def test_link_timeout(super_client, client, context):68	t = client.create_container(imageUuid=context.image_uuid, startOnCreate=False)69	c = super_client.create_container(accountId=context.project.id, imageUuid=context.image_uuid, instanceLinks={"t": t.id}, data={"linkWaitTime": 100})70	c = client.wait_transitioning(c)71	assert c.state == "running"72def test_link_remove_instance_restart(client, super_client, context):73	target1 = context.create_container()74	c = client.create_container(imageUuid=context.image_uuid, startOnCreate=False, instanceLinks={"target1_link": target1.id})75	c = client.wait_success(c)76	links = c.instanceLinks()77	assert len(links) == 178	link = links[0]79	assert link.state == "inactive"80	c = client.wait_success(c.start())81	link = client.reload(link)82	assert c.state == "running"83	assert link.state == "active"84	c = client.wait_success(c.stop())85	assert c.state == "stopped"86	link = client.reload(link)87	link = super_client.wait_success(link.remove())88	assert link.state == "removed"...test_container.py
Source:test_container.py  
...10from src.exceptions import CoffeeMachineException11from src.models.container import Container12from src.utils import Refillable13@fixture()14def create_container(request) -> Container:15    """Create Container object for testing."""16    try:17        _container_capacity = request.param18    except AttributeError:19        _container_capacity = TestContainer.capacity_default20    _container = Container(capacity=_container_capacity)21    return _container22class TestContainer:23    capacity_default: Refillable = 100024    def test_initialization(self, create_container: Container) -> None:25        """Test Container object initialization"""26        pass27    def test_initial_attribute_values(self, create_container: Container) -> None:28        """Test checking the initial attribute values of the Container"""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
