Best Python code snippet using playwright-python
test_debug_server.py
Source:test_debug_server.py  
...18            killed = True19            break20    return killed21def setup_server(start_server, find_free_port, no_eval=False, stdout=False, env=None):22    port = find_free_port()23    args = ["+DEBUG_LOG", "+NO_EVAL"] if no_eval else ["+DEBUG_LOG"]24    s = start_server(port, "test_debug_server", args, stdout=stdout, env=env)25    assert s.poll() is None26    uri = "ws://localhost:{0}".format(port)27    return s, uri28def test_continue_stop(start_server, find_free_port):29    s, uri = setup_server(start_server, find_free_port, True)30    async def test_logic():31        client = hgdb.HGDBClient(uri, None)32        await client.connect()33        await client.continue_()34        await client.stop()35    asyncio.get_event_loop().run_until_complete(test_logic())36    # check if process exit...test_jupyter_kernel_raw.py
Source:test_jupyter_kernel_raw.py  
# ... (the first lines of the file are elided in the excerpt; names such as
# `socket`, `closing`, `json` and `NamedTemporaryFile` are used below and are
# presumably imported there)
import logging
import zmq
from aiozmq import create_zmq_stream
import asyncio


def find_free_port() -> int:
    """Return a TCP port number that was free at the moment of the call.

    A throw-away socket is bound to port 0 so the operating system picks the
    port; the socket is closed before returning.  Because the port is released
    on return, another process may claim it before the caller binds it.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(('', 0))
        # Kept in the original order: the option is set after the bind.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = sock.getsockname()
        return port


async def test_kernel() -> None:
    """Write a kernel connection file with five freshly picked ports.

    Only the set-up part of this test is visible in the excerpt.
    """
    transport = 'tcp'
    ip = '127.0.0.1'

    # One independent port per kernel channel.
    shell_port = find_free_port()
    control_port = find_free_port()
    stdin_port = find_free_port()
    iopub_port = find_free_port()
    heartbeat_port = find_free_port()

    configuration = {
        'transport': transport,
        'ip': ip,
        'shell_port': shell_port,
        'control_port': control_port,
        'stdin_port': stdin_port,
        'iopub_port': iopub_port,
        'hb_port': heartbeat_port,
        'key': 'a7d9f4f3-acad37f08d4fe05069a03422',
        'signature_scheme': 'hmac-sha256',
    }
    configuration_json = json.dumps(configuration)

    with NamedTemporaryFile() as configuration_file:
        configuration_file.write(str.encode(configuration_json))
        # ... (the remainder of this test is elided in the excerpt)


# ... next snippet on the page:
# test_hgdb.py
Source:test_hgdb.py  
...24    p = subprocess.Popen(args, stdout=subprocess.PIPE if not log else sys.stdout,25                         stderr=subprocess.PIPE if not log else sys.stdout)26    time.sleep(wait)27    return p28def find_free_port():29    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:30        s.bind(('', 0))31        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)32        return s.getsockname()[1]33def start_program(port, **kwargs):34    dirname = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))35    hgdb = os.path.join(dirname, "hgdb")36    # use a fake db for arguments37    args = [hgdb, ":" + str(port),  "no-db", "--no-db-connection"]38    extra_args = []39    for n, v in kwargs.items():40        extra_args.append("--" + n)41        extra_args.append(v)42    args = args + extra_args43    p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)44    return p45def test_set_breakpoint_continue():46    port = find_free_port()47    # start the server48    s = start_server(port, "test_debug_server")49    # run the debugger50    p = start_program(port)51    # continue52    out = p.communicate(input=b"b test.py:1\nc\n")[0]53    out = out.decode("ascii")54    assert "Breakpoint 2 at test.py:1" in out55    s.kill()56    p.kill()57def test_rewind_time():58    port = find_free_port()59    # start the server60    s = start_server(port, "test_debug_server", supports_rewind=True)61    # run the debugger62    p = start_program(port)63    # continue64    out = p.communicate(input=b"b test.py:1\nc\ngo 200\ninfo time\n")[0]65    out = out.decode("ascii")66    assert "201" in out67    s.kill()68    p.kill()69def test_repl():70    port = find_free_port()71    # start the server72    s = start_server(port, "test_debug_server")73    # run the debugger74    p = start_program(port)75    # continue76    out = p.communicate(input=b"p 41 + mod.a\n")[0]77    out = out.decode("ascii")78    assert "42" in out79    s.kill()80    p.kill()81def 
test_step_over():82    port = find_free_port()83    # start the server84    s = start_server(port, "test_debug_server")85    # run the debugger86    p = start_program(port)87    # continue88    out = p.communicate(input=b"n\nn\nn\n")[0]89    out = out.decode("ascii")90    assert "Breakpoint 2 at test.py:1" in out91    assert "Breakpoint 8 at test.py:1" in out92    assert "Breakpoint 0 at test.py:2" in out93    s.kill()94    p.kill()95def test_set_value():96    port = find_free_port()97    # start the server98    s = start_server(port, "test_debug_server")99    p = start_program(port)100    # continue101    out = p.communicate(input=b"n\nset a=100\np a\nn\nn\np a + 1\n")[0]102    out = out.decode("ascii")103    assert "100" in out104    assert "101" in out105    s.kill()106    p.kill()107def test_data_breakpoint():108    port = find_free_port()109    s = start_server(port, "test_debug_server")110    p = start_program(port)111    # continue112    out = p.communicate(input=b"w c\nc\nc\ninfo watchpoint\n")[0]113    out = out.decode("ascii")114    assert "Watchpoint 0 at test.py:2" in out115    assert "Watchpoint 3 at test.py:5" in out116    assert "3\ttest.py:5\tc" in out117    s.kill()118    p.kill()119def test_src_mapping():120    port = find_free_port()121    # start the server122    s = start_server(port, "test_debug_server")123    # run the debugger124    p = start_program(port, map=":/tmp")125    # continue126    out = p.communicate(input=b"b /tmp/test.py:1\nc\n")[0]127    out = out.decode("ascii")128    assert "Breakpoint 2 at test.py:1" in out129    s.kill()130    p.kill()131if __name__ == "__main__":...test_ws_server.py
Source:test_ws_server.py  
# ... (the first lines of the file are elided in the excerpt)
import asyncio
import time
import websockets


def test_echo(start_server, find_free_port):
    """Send one message to ``test_ws_server`` and expect it echoed back.

    The server is terminated in a ``finally`` block so it is not leaked when
    the client coroutine raises.
    """
    port = find_free_port()
    s = start_server(port, "test_ws_server", wait=0.05)

    async def send_msg():
        uri = "ws://localhost:{0}".format(port)
        payload = "hello world"
        async with websockets.connect(uri) as ws:
            await ws.send(payload)
            echo = await ws.recv()
            assert echo == payload

    try:
        asyncio.get_event_loop().run_until_complete(send_msg())
    finally:
        # kill the server
        s.terminate()
        while s.poll() is None:
            # Sleep briefly between polls instead of spinning at full CPU.
            time.sleep(0.01)


def test_shutdown(start_server, find_free_port):
    """Send ``"stop"`` and require the server to exit within one second."""
    port = find_free_port()
    s = start_server(port, "test_ws_server", wait=0.05)

    async def send_msg():
        uri = "ws://localhost:{0}".format(port)
        payload = "stop"
        async with websockets.connect(uri) as ws:
            await ws.send(payload)

    try:
        asyncio.get_event_loop().run_until_complete(send_msg())
        # check if process exit
        deadline = time.time() + 1
        killed = False
        while time.time() < deadline:
            if s.poll() is not None:
                killed = True
                break
            # Sleep briefly between polls instead of spinning at full CPU.
            time.sleep(0.01)
    finally:
        # Clean up a server that is still running, whether the wait timed out
        # or the client coroutine raised.
        if s.poll() is None:
            s.terminate()
    assert killed


def test_topic_pub(start_server, find_free_port):
    """Exercise two simultaneous client connections to ``test_ws_server``.

    Only the part of this test up to the end of ``send_msg`` is visible in
    the excerpt.
    """
    port = find_free_port()
    s = start_server(port, "test_ws_server", wait=0.05)

    async def send_msg():
        uri = "ws://localhost:{0}".format(port)
        payload1 = "42"
        payload2 = "hello world"
        async with websockets.connect(uri) as ws1:
            async with websockets.connect(uri) as ws2:
                await ws1.send(payload1)
                echo1 = await ws1.recv()
                assert echo1 == payload1
                await ws2.send(payload2)
                # ws1 receives two messages after ws2 sends; the first is
                # discarded and the second must be payload2 repeated twice.
                await ws1.recv()
                echo2 = await ws1.recv()
                assert echo2 == payload2 * 2

    # ... (the remainder of this test is elided in the excerpt)


# LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework,
its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial gives A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!
