How to use the find_free_port method in avocado

Best Python code snippet using avocado_python

test_debug_server.py

Source:test_debug_server.py Github

copy

Full Screen

...18 killed = True19 break20 return killed21def setup_server(start_server, find_free_port, no_eval=False, stdout=False, env=None):22 port = find_free_port()23 args = ["+DEBUG_LOG", "+NO_EVAL"] if no_eval else ["+DEBUG_LOG"]24 s = start_server(port, "test_debug_server", args, stdout=stdout, env=env)25 assert s.poll() is None26 uri = "ws://localhost:{0}".format(port)27 return s, uri28def test_continue_stop(start_server, find_free_port):29 s, uri = setup_server(start_server, find_free_port, True)30 async def test_logic():31 client = hgdb.HGDBClient(uri, None)32 await client.connect()33 await client.continue_()34 await client.stop()35 asyncio.get_event_loop().run_until_complete(test_logic())36 # check if process exit...

Full Screen

Full Screen

test_jupyter_kernel_raw.py

Source:test_jupyter_kernel_raw.py Github

copy

Full Screen

...6import logging7import zmq8from aiozmq import create_zmq_stream9import asyncio10def find_free_port() -> int:11 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:12 sock.bind(('', 0))13 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)14 return sock.getsockname()[1]15async def test_kernel() -> None:16 transport = 'tcp'17 ip = '127.0.0.1'18 shell_port = find_free_port()19 control_port = find_free_port()20 stdin_port = find_free_port()21 iopub_port = find_free_port()22 heartbeat_port = find_free_port()23 configuration = {24 'transport': transport,25 'ip': ip,26 'shell_port': shell_port,27 'control_port': control_port,28 'stdin_port': stdin_port,29 'iopub_port': iopub_port,30 'hb_port': heartbeat_port,31 'key': 'a7d9f4f3-acad37f08d4fe05069a03422',32 'signature_scheme': 'hmac-sha256'33 }34 configuration_json = json.dumps(configuration)35 with NamedTemporaryFile() as configuration_file:36 configuration_file.write(str.encode(configuration_json))...

Full Screen

Full Screen

test_hgdb.py

Source:test_hgdb.py Github

copy

Full Screen

...24 p = subprocess.Popen(args, stdout=subprocess.PIPE if not log else sys.stdout,25 stderr=subprocess.PIPE if not log else sys.stdout)26 time.sleep(wait)27 return p28def find_free_port():29 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:30 s.bind(('', 0))31 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)32 return s.getsockname()[1]33def start_program(port, **kwargs):34 dirname = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))35 hgdb = os.path.join(dirname, "hgdb")36 # use a fake db for arguments37 args = [hgdb, ":" + str(port), "no-db", "--no-db-connection"]38 extra_args = []39 for n, v in kwargs.items():40 extra_args.append("--" + n)41 extra_args.append(v)42 args = args + extra_args43 p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)44 return p45def test_set_breakpoint_continue():46 port = find_free_port()47 # start the server48 s = start_server(port, "test_debug_server")49 # run the debugger50 p = start_program(port)51 # continue52 out = p.communicate(input=b"b test.py:1\nc\n")[0]53 out = out.decode("ascii")54 assert "Breakpoint 2 at test.py:1" in out55 s.kill()56 p.kill()57def test_rewind_time():58 port = find_free_port()59 # start the server60 s = start_server(port, "test_debug_server", supports_rewind=True)61 # run the debugger62 p = start_program(port)63 # continue64 out = p.communicate(input=b"b test.py:1\nc\ngo 200\ninfo time\n")[0]65 out = out.decode("ascii")66 assert "201" in out67 s.kill()68 p.kill()69def test_repl():70 port = find_free_port()71 # start the server72 s = start_server(port, "test_debug_server")73 # run the debugger74 p = start_program(port)75 # continue76 out = p.communicate(input=b"p 41 + mod.a\n")[0]77 out = out.decode("ascii")78 assert "42" in out79 s.kill()80 p.kill()81def test_step_over():82 port = find_free_port()83 # start the server84 s = start_server(port, "test_debug_server")85 # run the debugger86 p = start_program(port)87 # continue88 out = 
p.communicate(input=b"n\nn\nn\n")[0]89 out = out.decode("ascii")90 assert "Breakpoint 2 at test.py:1" in out91 assert "Breakpoint 8 at test.py:1" in out92 assert "Breakpoint 0 at test.py:2" in out93 s.kill()94 p.kill()95def test_set_value():96 port = find_free_port()97 # start the server98 s = start_server(port, "test_debug_server")99 p = start_program(port)100 # continue101 out = p.communicate(input=b"n\nset a=100\np a\nn\nn\np a + 1\n")[0]102 out = out.decode("ascii")103 assert "100" in out104 assert "101" in out105 s.kill()106 p.kill()107def test_data_breakpoint():108 port = find_free_port()109 s = start_server(port, "test_debug_server")110 p = start_program(port)111 # continue112 out = p.communicate(input=b"w c\nc\nc\ninfo watchpoint\n")[0]113 out = out.decode("ascii")114 assert "Watchpoint 0 at test.py:2" in out115 assert "Watchpoint 3 at test.py:5" in out116 assert "3\ttest.py:5\tc" in out117 s.kill()118 p.kill()119def test_src_mapping():120 port = find_free_port()121 # start the server122 s = start_server(port, "test_debug_server")123 # run the debugger124 p = start_program(port, map=":/tmp")125 # continue126 out = p.communicate(input=b"b /tmp/test.py:1\nc\n")[0]127 out = out.decode("ascii")128 assert "Breakpoint 2 at test.py:1" in out129 s.kill()130 p.kill()131if __name__ == "__main__":...

Full Screen

Full Screen

test_ws_server.py

Source:test_ws_server.py Github

copy

Full Screen

...4import asyncio5import time6import websockets7def test_echo(start_server, find_free_port):8 port = find_free_port()9 s = start_server(port, "test_ws_server", wait=0.05)10 async def send_msg():11 uri = "ws://localhost:{0}".format(port)12 payload = "hello world"13 async with websockets.connect(uri) as ws:14 await ws.send(payload)15 echo = await ws.recv()16 assert echo == payload17 asyncio.get_event_loop().run_until_complete(send_msg())18 # kill the server19 s.terminate()20 while s.poll() is None:21 pass22def test_shutdown(start_server, find_free_port):23 port = find_free_port()24 s = start_server(port, "test_ws_server", wait=0.05)25 async def send_msg():26 uri = "ws://localhost:{0}".format(port)27 payload = "stop"28 async with websockets.connect(uri) as ws:29 await ws.send(payload)30 asyncio.get_event_loop().run_until_complete(send_msg())31 # check if process exit32 t = time.time()33 killed = False34 while time.time() < t + 1:35 if s.poll() is not None:36 killed = True37 break38 if not killed:39 s.terminate()40 assert killed41def test_topic_pub(start_server, find_free_port):42 port = find_free_port()43 s = start_server(port, "test_ws_server", wait=0.05)44 async def send_msg():45 uri = "ws://localhost:{0}".format(port)46 payload1 = "42"47 payload2 = "hello world"48 async with websockets.connect(uri) as ws1:49 async with websockets.connect(uri) as ws2:50 await ws1.send(payload1)51 echo1 = await ws1.recv()52 assert echo1 == payload153 await ws2.send(payload2)54 await ws1.recv()55 echo2 = await ws1.recv()56 assert(echo2 == payload2 * 2)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful