How to use get_service_port method in testcontainers-python

Best Python code snippet using testcontainers-python_python

commands.py

Source:commands.py Github

copy

Full Screen

...13 # ------------------------------------14 # bigbuild get port <user-id> <app-id>15 # ------------------------------------16 if args.get("get") and args.get("port"):17 return get_service_port(uid=args.get('<user-id>'), aid=args.get('<app-id>'))18 # ------------------------------------19 # bigbuild (describe|deploy|destroy|log) app <user-id> <app-id>20 # ------------------------------------21 if args.get("describe") and args.get("app"):22 return describe_app(uid=args.get('<user-id>'), aid=args.get('<app-id>'))23 if args.get("destroy") and args.get("app"):24 return destroy_app(uid=args.get('<user-id>'), aid=args.get('<app-id>'))25 26 if args.get("log") and args.get("app"):27 return log_app(uid=args.get('<user-id>'), aid=args.get('<app-id>'))28 # ------------------------------------29 # bigbuild list apps <user-id>30 # ------------------------------------31 if args.get("list"):32 return list_apps(uid=args.get('<user-id>'))33 # ------------------------------------34 # bigbuild scale <user-id> <app-id> [-r=<num>]35 # ------------------------------------36 if args.get("scale"):37 return scale_app(uid=args.get('<user-id>'), aid=args.get('<app-id>'), r=args.get('-r'))38 if args.get("test"):39 return test_app()40def test_app():41 k8utils.get_service_port('kube-dj', 'user--wonyoung1026')42def get_service_port(uid, aid):43 namespace = USER_NAMESPACE_PREFIX + uid44 r = k8utils.get_service_port(ns=namespace, name=aid)45 print(r[1] if r[0] else r)46def describe_app(uid, aid):47 namespace = USER_NAMESPACE_PREFIX + uid48 r = k8utils.get_deployment_description(ns=namespace, name=aid)49 print(r[1] if r[0] else r)50def deploy_app(uid, aid, image, port, replicas):51 # TODO: accept multiple ports and their types52 namespace = USER_NAMESPACE_PREFIX + uid53 54 print("[INFO] Setting namespace")55 r = k8utils.set_namespace(namespace)56 if not r[0]:57 print(r)58 return59 60 print("[INFO] Creating PV")61 r = k8utils.create_pv(name=aid, ns=namespace)62 if not r[0]:63 print(r)64 return65 print("[INFO] Creating PVC")66 r = k8utils.create_pvc(name=aid, ns=namespace)67 if not r[0]:68 print(r)69 return70 print("[INFO] Creating deployment")71 r = k8utils.create_deployment(72 name=aid, ns=namespace, image=image, 73 port=port, replicas=replicas74 )75 if not r[0]:76 print(r)77 return78 print("[INFO] Creating service")79 r = k8utils.create_service(name=aid, ns=namespace)80 if not r[0]:81 print(r)82 return83 84 print("[INFO] Fetching node port number")85 r = k8utils.get_service_port(ns=namespace, name=aid)86 if not r[0]:87 print(r)88 return89 90 print("Deployed on port#"+r[1])91def destroy_app(uid, aid):92 namespace=USER_NAMESPACE_PREFIX+uid93 # print("[INFO] Deleting deployment")94 r = k8utils.delete_deployment(ns=namespace, name=aid)95 if not r[0]:96 print(r)97 return98 # print("[INFO] Deleting service")99 r = k8utils.delete_service(ns=namespace, name=aid)...

Full Screen

Full Screen

server.py

Source:server.py Github

copy

Full Screen

...19 async def serve(cls):20 loop = asyncio.get_running_loop()21 service = cls()22 host = get_service_host("reducer")23 port = get_service_port("reducer")24 service.setUp()25 server = Server([service], loop=loop)26 print(f'Server running at {host}:{port}')27 await server.start(host, port)28 await server.wait_closed()29 def setUp(self):30 fizz_channel = Channel(get_service_host("fizz"),31 get_service_port("fizz"),32 loop=asyncio.get_running_loop())33 self.fizz_client = FizzStub(fizz_channel)34 buzz_channel = Channel(get_service_host("buzz"),35 get_service_port("buzz"),36 loop=asyncio.get_running_loop())37 self.buzz_client = BuzzStub(buzz_channel)38 async def Ask(self, stream: Stream):39 fizz_result: FizzResponse40 buzz_result: BuzzResponse41 query: ReduceQuery = await stream.recv_message()42 fizz_future = self.fizz_client.Ask(FizzQuery(number=query.number))43 buzz_future = self.buzz_client.Ask(BuzzQuery(number=query.number))44 fizz_result, buzz_result = await asyncio.gather(45 fizz_future,46 buzz_future,47 loop=asyncio.get_running_loop(),48 )49 message = ""50 if fizz_result.isFizz:51 message += "Fizz"52 if buzz_result.isBuzz:53 message += "Buzz"54 await stream.send_message(ReduceResult(message=message))55def get_service_host(service_name: str) -> str:56 return os.environ.get(f"{service_name.upper()}_SERVICE_HOST", "127.0.0.1")57def get_service_port(service_name: str) -> int:58 return int(os.environ.get(f"{service_name.upper()}_SERVICE_PORT", "50051"))59if __name__ == "__main__":...

Full Screen

Full Screen

test_docker_compose.py

Source:test_docker_compose.py Github

copy

Full Screen

...6 def test_services_up(self):7 with DockerCompose(filepath='.') as compose:8 time.sleep(10)9 postgres_host = compose.get_service_host("postgres", 5432)10 postgres_port = compose.get_service_port("postgres", 5432)11 assert postgres_host == "0.0.0.0"12 assert postgres_port == "5432"13 airflow_webserver_host = compose.get_service_host("webserver", 8080)14 airflow_webserver_port = compose.get_service_port("webserver", 8080)15 assert airflow_webserver_host == "0.0.0.0"16 assert airflow_webserver_port == "8080"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run testcontainers-python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful