Best Python code snippet using autotest_python
test_p2p_daemon.py
Source:test_p2p_daemon.py  
...
from multiaddr import Multiaddr

from hivemind.p2p import P2P, P2PHandlerError
from hivemind.proto import dht_pb2
from hivemind.utils.serializer import MSGPackSerializer


def is_process_running(pid: int) -> bool:
    """Return True if ``ps -p <pid>`` exits with status 0."""
    return subprocess.run(["ps", "-p", str(pid)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0


async def replicate_if_needed(p2p: P2P, replicate: bool) -> P2P:
    """Return a replica attached to ``p2p``'s daemon if ``replicate`` is set, else ``p2p`` itself."""
    return await P2P.replicate(p2p.daemon_listen_maddr) if replicate else p2p


@pytest.mark.asyncio
async def test_daemon_killed_on_del():
    """The daemon child process must be gone after ``shutdown()``."""
    p2p_daemon = await P2P.create()
    child_pid = p2p_daemon._child.pid
    assert is_process_running(child_pid)
    await p2p_daemon.shutdown()
    assert not is_process_running(child_pid)


@pytest.mark.parametrize(
    "host_maddrs",
    [
        [Multiaddr("/ip4/127.0.0.1/tcp/0")],
        [Multiaddr("/ip4/127.0.0.1/udp/0/quic")],
        [Multiaddr("/ip4/127.0.0.1/tcp/0"), Multiaddr("/ip4/127.0.0.1/udp/0/quic")],
    ],
)
@pytest.mark.asyncio
async def test_transports(host_maddrs: List[Multiaddr]):
    """A client bootstrapped from the server must see exactly one peer, and vice versa."""
    server = await P2P.create(quic=True, host_maddrs=host_maddrs)
    client = None
    try:
        peers = await server.list_peers()
        assert len(peers) == 0

        client = await P2P.create(
            quic=True, host_maddrs=host_maddrs, initial_peers=await server.get_visible_maddrs()
        )
        await client.wait_for_at_least_n_peers(1)

        peers = await client.list_peers()
        assert len(peers) == 1
        peers = await server.list_peers()
        assert len(peers) == 1
    finally:
        # Shut both daemons down explicitly so that no child process outlives the test,
        # even when one of the assertions above fails.
        if client is not None:
            await client.shutdown()
        await server.shutdown()


@pytest.mark.asyncio
async def test_daemon_replica_does_not_affect_primary():
    """Shutting down a replica must leave the primary's daemon process running."""
    p2p_daemon = await P2P.create()
    p2p_replica = await P2P.replicate(p2p_daemon.daemon_listen_maddr)

    child_pid = p2p_daemon._child.pid
    assert is_process_running(child_pid)

    await p2p_replica.shutdown()
    assert is_process_running(child_pid)

    await p2p_daemon.shutdown()
    assert not is_process_running(child_pid)


@pytest.mark.parametrize(
    "should_cancel,replicate",
    [
        (True, False),
        (True, True),
        (False, False),
        (False, True),
    ],
)
@pytest.mark.asyncio
async def test_call_protobuf_handler(should_cancel, replicate, handle_name="handle"):
    """Call a protobuf handler, optionally cancelling the call while the handler is sleeping.

    With ``should_cancel`` the client task is cancelled after 0.25 s and the handler must have
    observed ``asyncio.CancelledError``; otherwise the response must equal the expected one.
    """
    handler_cancelled = False
    server_primary = await P2P.create()
    server = await replicate_if_needed(server_primary, replicate)

    async def ping_handler(request, context):
        try:
            await asyncio.sleep(2)
        except asyncio.CancelledError:
            nonlocal handler_cancelled
            handler_cancelled = True
        return dht_pb2.PingResponse(peer=dht_pb2.NodeInfo(node_id=server.id.to_bytes()), available=True)

    server_pid = server_primary._child.pid
    await server.add_protobuf_handler(handle_name, ping_handler, dht_pb2.PingRequest)
    assert is_process_running(server_pid)

    client_primary = await P2P.create(initial_peers=await server.get_visible_maddrs())
    client = await replicate_if_needed(client_primary, replicate)
    client_pid = client_primary._child.pid
    assert is_process_running(client_pid)
    await client.wait_for_at_least_n_peers(1)

    ping_request = dht_pb2.PingRequest(peer=dht_pb2.NodeInfo(node_id=client.id.to_bytes()), validate=True)
    expected_response = dht_pb2.PingResponse(peer=dht_pb2.NodeInfo(node_id=server.id.to_bytes()), available=True)

    if should_cancel:
        call_task = asyncio.create_task(
            client.call_protobuf_handler(server.id, handle_name, ping_request, dht_pb2.PingResponse)
        )
        await asyncio.sleep(0.25)

        call_task.cancel()

        # give the cancellation time to reach the handler
        await asyncio.sleep(0.25)
        assert handler_cancelled
    else:
        actual_response = await client.call_protobuf_handler(
            server.id, handle_name, ping_request, dht_pb2.PingResponse
        )
        assert actual_response == expected_response
        assert not handler_cancelled

    await server.shutdown()
    await server_primary.shutdown()
    assert not is_process_running(server_pid)

    await client_primary.shutdown()
    assert not is_process_running(client_pid)


@pytest.mark.asyncio
async def test_call_protobuf_handler_error(handle_name="handle"):
    """An exception raised inside a handler must reach the caller as ``P2PHandlerError``."""

    async def error_handler(request, context):
        raise ValueError("boom")

    server = await P2P.create()
    server_pid = server._child.pid
    await server.add_protobuf_handler(handle_name, error_handler, dht_pb2.PingRequest)
    assert is_process_running(server_pid)

    client = await P2P.create(initial_peers=await server.get_visible_maddrs())
    client_pid = client._child.pid
    assert is_process_running(client_pid)
    await client.wait_for_at_least_n_peers(1)

    ping_request = dht_pb2.PingRequest(peer=dht_pb2.NodeInfo(node_id=client.id.to_bytes()), validate=True)

    with pytest.raises(P2PHandlerError) as excinfo:
        await client.call_protobuf_handler(server.id, handle_name, ping_request, dht_pb2.PingResponse)
    assert "boom" in str(excinfo.value)

    await server.shutdown()
    await client.shutdown()


async def handle_square_stream(_, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Stream handler: read msgpack values until the stream ends and reply with each value squared."""
    with closing(writer):
        while True:
            try:
                x = MSGPackSerializer.loads(await P2P.receive_raw_data(reader))
            except asyncio.IncompleteReadError:
                # the peer closed the stream: nothing more to process
                break

            result = x ** 2

            await P2P.send_raw_data(MSGPackSerializer.dumps(result), writer)


async def validate_square_stream(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Send ten random integers below 100 and assert that every reply is the square of the input."""
    with closing(writer):
        for _ in range(10):
            x = np.random.randint(100)

            await P2P.send_raw_data(MSGPackSerializer.dumps(x), writer)
            result = MSGPackSerializer.loads(await P2P.receive_raw_data(reader))

            assert result == x ** 2


@pytest.mark.asyncio
async def test_call_peer_single_process():
    """Run the square-stream exchange between two P2P instances created in this process."""
    server = await P2P.create()
    server_pid = server._child.pid
    assert is_process_running(server_pid)

    handler_name = "square"
    await server.add_binary_stream_handler(handler_name, handle_square_stream)

    client = await P2P.create(initial_peers=await server.get_visible_maddrs())
    client_pid = client._child.pid
    assert is_process_running(client_pid)

    await client.wait_for_at_least_n_peers(1)

    _, reader, writer = await client.call_binary_stream_handler(server.id, handler_name)
    await validate_square_stream(reader, writer)

    await server.shutdown()
    assert not is_process_running(server_pid)

    await client.shutdown()
    assert not is_process_running(client_pid)


async def run_server(handler_name, server_side, response_received):
    """Serve the square handler until ``response_received.value`` becomes non-zero.

    The server's peer id and visible multiaddrs are sent through ``server_side`` (in that order)
    so that the client in the other process can connect.
    """
    server = await P2P.create()
    server_pid = server._child.pid
    assert is_process_running(server_pid)

    await server.add_binary_stream_handler(handler_name, handle_square_stream)

    server_side.send(server.id)
    server_side.send(await server.get_visible_maddrs())
    while response_received.value == 0:
        await asyncio.sleep(0.5)

    await server.shutdown()
    assert not is_process_running(server_pid)


def server_target(handler_name, server_side, response_received):
    """Process entry point: run ``run_server`` in a fresh event loop."""
    asyncio.run(run_server(handler_name, server_side, response_received))


@pytest.mark.asyncio
async def test_call_peer_different_processes():
    """Run the square-stream exchange against a server living in a separate process."""
    handler_name = "square"

    server_side, client_side = mp.Pipe()
    response_received = mp.Value(np.ctypeslib.as_ctypes_type(np.int32))
    response_received.value = 0

    proc = mp.Process(target=server_target, args=(handler_name, server_side, response_received))
    proc.start()

    try:
        peer_id = client_side.recv()
        peer_maddrs = client_side.recv()

        client = await P2P.create(initial_peers=peer_maddrs)
        client_pid = client._child.pid
        assert is_process_running(client_pid)

        await client.wait_for_at_least_n_peers(1)

        _, reader, writer = await client.call_binary_stream_handler(peer_id, handler_name)
        await validate_square_stream(reader, writer)
    finally:
        # Always release the server: it polls this flag, so a failure above would otherwise
        # leave the child process (and its daemon) running forever.
        response_received.value = 1

    await client.shutdown()
    assert not is_process_running(client_pid)

    proc.join()
    assert proc.exitcode == 0


@pytest.mark.asyncio
async def test_error_closes_connection():
    """A handler exception must close that stream without taking the server daemon down."""

    async def handle_raising_error(_, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        with closing(writer):
            command = await P2P.receive_raw_data(reader)
            if command == b"raise_error":
                raise Exception("The handler has failed")
            else:
                await P2P.send_raw_data(b"okay", writer)

    server = await P2P.create()
    server_pid = server._child.pid
    assert is_process_running(server_pid)

    handler_name = "handler"
    await server.add_binary_stream_handler(handler_name, handle_raising_error)

    client = await P2P.create(initial_peers=await server.get_visible_maddrs())
    client_pid = client._child.pid
    assert is_process_running(client_pid)

    await client.wait_for_at_least_n_peers(1)

    _, reader, writer = await client.call_binary_stream_handler(server.id, handler_name)
    with closing(writer):
        await P2P.send_raw_data(b"raise_error", writer)
        with pytest.raises(asyncio.IncompleteReadError):  # Means that the connection is closed
            await P2P.receive_raw_data(reader)

    # Although the handler raised an exception, the server did not crash and is ready for the next requests
    assert is_process_running(server_pid)

    _, reader, writer = await client.call_binary_stream_handler(server.id, handler_name)
    with closing(writer):
        await P2P.send_raw_data(b"behave_normally", writer)
        assert await P2P.receive_raw_data(reader) == b"okay"

    await server.shutdown()
    assert not is_process_running(server_pid)

    await client.shutdown()
    assert not is_process_running(client_pid)


@pytest.mark.asyncio
async def test_handlers_on_different_replicas():
    async def handler(_, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, key: str) -> None:
        with closing(writer):
            await P2P.send_raw_data(key, writer)

    server_primary = await P2P.create()
    server_id = server_primary.id
    await server_primary.add_binary_stream_handler("handle_primary", partial(handler, key=b"primary"))

    server_replica1 = await replicate_if_needed(server_primary, True)
    await server_replica1.add_binary_stream_handler("handle1", partial(handler, key=b"replica1"))

    server_replica2 = await replicate_if_needed(server_primary, True)
    await server_replica2.add_binary_stream_handler("handle2", partial(handler, key=b"replica2"))

    client = await P2P.create(initial_peers=await server_primary.get_visible_maddrs())
    await client.wait_for_at_least_n_peers(1)
    ...
yeshup_tests.py
Source:yeshup_tests.py  
...72    #p.stdin.close()73    p.communicate()74    code = p.wait()75    return code76def is_process_running(pid):77    try:78        os.kill(pid,0)79        return True80    except ProcessLookupError:81        return False82    83def test_simple_yeshup(trp):84    #print("run basic")85    #x = run_it([sys.argv[0], "launch", "basic"])86    #print(f"exit code: {x}")87    # start the main process in a thread, this will block88    # then can exit it asynchronously89    get_pid_queue = queue.Queue()90    91    def run_parent():92        # __file__ refers to this file, yeshup_test.py93        p = run_it_stdin([__file__, "launch", "basic"])94        #p.wait()95        #print("launched")96        # parse the two pids97        for line in p.stdout:98            #print(line)99            line = str(line, 'ascii')100            #print("here")101            #print(type(line))102            if line.startswith("parent pid:"):103                parent_pid = int(line[11:])104                break105            else:106                print(line)107        for line in p.stdout:108            line = str(line, 'ascii')109            #print("here")110            #print(type(line))111            if line.startswith("child pid:"):112                child_pid = int(line[10:])113                break114            else:115                print(line)116        #print((parent_pid, child_pid))117        get_pid_queue.put((parent_pid, child_pid))118        for line in p.stdout:119            print(line)120        p.wait()121    # need to get the stdout back from the run parent122    parent_runner_thread = threading.Thread(target=run_parent)123    parent_runner_thread.start()124    #print("queue get")125    (parent_pid,child_pid) = get_pid_queue.get()126    if trp is None:127        # display if they are running128        print(f"parent: {parent_pid} running {is_process_running(parent_pid)}")129        print(f"child: {child_pid} running {is_process_running(child_pid)}")130    else:131        
trp.assert_true(f"parent running ", is_process_running(parent_pid))132        trp.assert_true(f"child running ", is_process_running(child_pid))133    # exit the process134    #c = close_stdin(p)135    #print("kill")136    os.kill(parent_pid, signal.SIGTERM)137    parent_runner_thread.join()138    139    # p.wait()140    # display again if the two processes are running141    # the child doesn't always exit fast enough without this142    # it's only needed for reliable automated testing143    time.sleep(0.01)144    if trp is None:145        print(f"parent: {parent_pid} running {is_process_running(parent_pid)}")146        print(f"child: {child_pid} running {is_process_running(child_pid)}")147    else:148        trp.assert_false(f"parent not running ", is_process_running(parent_pid))149        trp.assert_false(f"child not running ", is_process_running(child_pid))150    #return c151    152def launch_basic():153    #print("launch basic")154    print(f"parent pid: {os.getpid()}", flush=True)155    def child_process():156        # this is the line which makes sure the child exits157        # when the parent does. it has to be run in the child process158        # to make this more general, can call this then call exec*159        # it's reset on fork, but not on exec*160        yeshup.yeshup_me()161        print(f"child pid: {os.getpid()}", flush=True)162        #sys.stdout.flush()163        #for line in sys.stdin:...oc.py
Source:oc.py  
...24    @is_connected.setter25    def is_connected(self, value):26        self._is_connected = value27    @property28    def is_process_running(self):29        return self._is_process_running30    @is_process_running.setter31    def is_process_running(self, value):32        self._is_process_running = value33    @property34    def check_process_enabled(self):35        return self._check_process_enabled36    @check_process_enabled.setter37    def check_process_enabled(self, value):38        self._check_process_enabled = value39    @staticmethod40    def get_server_cert():41        result = subprocess.getoutput(f"echo {settings.current_profile['password']} | openconnect "42                                      f"--no-dtls --authgroup MGT {settings.current_profile['server']}"43                                      f" --passwd-on-stdin")44        cert = re.findall('--servercert (.+)\n', result)45        if not cert:...menu.py
Source:menu.py  
...55            ready_data = True56        elif key == '3':57            ns_loop(data, answers, zet_path)58        elif key == '4':59            if not app.is_process_running():60                app.start("OscGraph.exe")61            if not gen.is_process_running():62                gen.start("DAC_OCX.exe")63        elif key == '0':64            if app.is_process_running():65                app.kill()66            if gen.is_process_running():67                gen.kill()68            sys.exit()69if __name__ == "__main__":70    zet_path = "C:/Users/Public/Documents/ZETLab/artem/result"  # ÐУТЬ Ð ÐÐÐÐЫÐ71    try:72        rmtree(zet_path)73    except FileNotFoundError:74        pass75    print("Find path to oscgraph:", zet_path)76    app = Application(backend="uia")77    gen = Application(backend="uia")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
