How to use get_free_tcp_port method in localstack

Best Python code snippet using localstack_python

server_sockets_test.py

Source:server_sockets_test.py Github

copy

Full Screen

...45 server.terminate()46 def test_default_socket(self):47 self._test_connect([], "allow", [], "hello", ":", EXIT_OK)48 def test_tcp_socket(self):49 port = get_free_tcp_port()50 self._test_connect(["--bind-tcp=0.0.0.0:%i" % port], "allow", [], "hello", "tcp/127.0.0.1:%i/" % port, EXIT_OK)51 self._test_connect(["--bind-tcp=0.0.0.0:%i" % port], "allow", [], "hello", "ws/127.0.0.1:%i/" % port, EXIT_OK)52 def test_ws_socket(self):53 port = get_free_tcp_port()54 self._test_connect(["--bind-ws=0.0.0.0:%i" % port], "allow", [], "hello", "ws/127.0.0.1:%i/" % port, EXIT_OK)55 def test_ssl(self):56 server = None57 display_no = self.find_free_display_no()58 display = ":%s" % display_no59 tcp_port = get_free_tcp_port()60 ws_port = get_free_tcp_port()61 wss_port = get_free_tcp_port()62 ssl_port = get_free_tcp_port()63 try:64 tmpdir = tempfile.mkdtemp(suffix='ssl-xpra')65 certfile = os.path.join(tmpdir, "self.pem")66 openssl_command = [67 "openssl", "req", "-new", "-newkey", "rsa:4096", "-days", "2", "-nodes", "-x509",68 "-subj", "/C=US/ST=Denial/L=Springfield/O=Dis/CN=localhost",69 "-keyout", certfile, "-out", certfile,70 ]71 openssl = self.run_command(openssl_command)72 assert pollwait(openssl, 10)==0, "openssl certificate generation failed"73 cert_data = load_binary_file(certfile)74 log("generated cert data: %s", repr_ellipsized(cert_data))75 if not cert_data:76 #cannot run openssl? (happens from rpmbuild)...

Full Screen

Full Screen

server.py

Source:server.py Github

copy

Full Screen

...18from user_srv.settings import settings19def on_exit(signo, frame):20 logger.info("进程中断")21 sys.exit(0)22def get_free_tcp_port():23 tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)24 tcp.bind(("", 0))25 _, port = tcp.getsockname()26 tcp.close()27 return port28def serve():29 parser = argparse.ArgumentParser()30 parser.add_argument('--ip',31 nargs="?",32 type=str,33 default="192.168.245.1",34 help="binding ip"35 )36 parser.add_argument('--port',37 nargs="?",38 type=int,39 default=0,40 help="the listening port"41 )42 args = parser.parse_args()43 if args.port == 0:44 port = get_free_tcp_port()45 else:46 port = args.port47 logger.add("logs/user_srv_{time}.log")48 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))49 # 注册用户服务50 user_pb2_grpc.add_UserServicer_to_server(UserServicer(), server)51 # 注册健康检查52 health_servicer = health.HealthServicer()53 health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)54 server.add_insecure_port(f'{args.ip}:{port}')55 """56 windows下支持的信号是有限的:57 SIGINT ctrl+c 中断58 SIGTERM kill发出的软件终止59 """60 # 主进程退出信号监听61 signal.signal(signal.SIGINT, on_exit)62 signal.signal(signal.SIGTERM, on_exit)63 logger.info(f"启动服务:{args.ip}:{port}")64 server.start()65 logger.info(f"服务注册开始")66 register = consul.ConsulRegister(settings.CONSUL_HOST, settings.CONSUL_PORT)67 # 没有注册成功68 if not register.register(name=settings.SERVICE_NAME, id=settings.SERVICE_NAME,69 address=args.ip, port=port, tags=settings.SERVICE_TAGS, check=None):70 logger.info(f"服务注册失败")71 sys.exit(0)72 logger.info(f"服务注册成功")73 server.wait_for_termination()74# @logger.catch75# def my_function(x, y, z):76# # An error? It's caught anyway!77# return 1 / (x + y + z)78if __name__ == "__main__":79 # my_function(0,0,0)80 # logger.debug("调试信息")81 # logger.info("普通信息")82 # logger.warning("警告信息")83 # logger.error("错误信息")84 # logger.critical("严重错误信息")85 # print(get_free_tcp_port())86 logging.basicConfig()...

Full Screen

Full Screen

base_unit_test.py

Source:base_unit_test.py Github

copy

Full Screen

...3 def setUp(self):4 # generate random docker container name5 self.container_name = self.generate_random_string(15)6 # get a free TCP port for the container7 self.port = self.get_free_tcp_port()8 def tearDown(self):9 from os import system10 system("docker kill {}".format(self.container_name))11 @staticmethod12 def generate_random_string(k: int):13 from random import choice14 from string import ascii_lowercase, digits15 return ''.join(choice(ascii_lowercase + digits) for _ in range(k))16 @staticmethod17 def get_free_tcp_port():18 from socket import socket, AF_INET, SOCK_STREAM19 tcp = socket(AF_INET, SOCK_STREAM)20 tcp.bind(('', 0))21 addr, port = tcp.getsockname()22 tcp.close()23 return port24if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful