Best Python code snippet using fMBT_python
server.py
Source:server.py  
"""Tiny TCP echo server with four interchangeable I/O strategies.

Modes: one process per connection (``basic``), or a single process that
multiplexes with ``select()``, ``poll()`` or ``epoll()``.  In the multiplexed
modes a readable connection is either served inline or handed to a single
worker process through a ``multiprocessing.Queue``.
"""
from multiprocessing.reduction import reduce_socket
import multiprocessing
import select
import socket
import sys

try:
    import argparse
except ImportError:
    # argparse is only available in Python 2.7+
    sys.stderr.write('pip install -U argparse\n')
    sys.exit(1)


def handle_conn(conn, addr):
    """Echo at most 32 bytes read from ``conn`` back to it, then close it.

    ``addr`` is accepted for symmetry with ``socket.accept()`` but is unused.
    """
    data = conn.recv(32)
    conn.sendall(data)
    conn.close()


def queued_handle_conn(queue):
    """Worker loop: rebuild each queued socket and serve it with handle_conn.

    Every queue item is a ``(rebuild_func, hints, addr)`` tuple as produced
    by ``_dispatch``.  The loop never returns; the parent terminates the
    process.
    """
    while True:
        rebuild_func, hints, addr = queue.get()
        conn = rebuild_func(*hints)
        handle_conn(conn, addr)


def _start_worker():
    """Start the single worker process and return ``(queue, worker)``."""
    queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=queued_handle_conn,
                                     args=(queue,))
    worker.start()
    return queue, worker


def _stop_worker(worker):
    """Terminate ``worker`` if it was started and is still running."""
    if worker is not None and worker.is_alive():
        worker.terminate()


def _accept_pending(socket_):
    """Accept until ``accept()`` fails; return the new non-blocking sockets.

    The listening socket is non-blocking in the multiplexed modes, so
    ``accept()`` raises ``socket.error`` once the backlog is empty.  Only
    that exception ends the loop, so KeyboardInterrupt and programming
    errors are no longer swallowed.
    """
    accepted = []
    while True:
        try:
            conn, _addr = socket_.accept()
        except socket.error:
            return accepted
        conn.setblocking(0)
        accepted.append(conn)


def _dispatch(conn, addr, queue):
    """Serve ``conn`` inline, or hand it to the worker when ``queue`` is set."""
    if queue is None:
        handle_conn(conn, addr)
        return
    # Behind-the-scene: 'conn' is serialized and sent to
    # worker process via socket (IPC).
    rebuild_func, hints = reduce_socket(conn)
    queue.put((rebuild_func, hints, addr))


def _report_max_peers(max_peers):
    """Print the peak number of simultaneously tracked connections."""
    sys.stdout.write('Max. number of connections: %s\n' % (max_peers,))


def basic_server(socket_):
    """Serve every accepted connection in its own child process.

    Runs until interrupted; children still alive at that point are
    terminated.
    """
    children = []
    try:
        while True:
            conn, addr = socket_.accept()
            p = multiprocessing.Process(target=handle_conn, args=(conn, addr))
            p.start()
            children.append(p)
            # The child owns the connection now.  Drop the parent's handle so
            # descriptors do not pile up for the lifetime of the server.
            conn.close()
    finally:
        for p in children:
            if p.is_alive():
                p.terminate()


def select_server(socket_, timeout=1, use_worker=False):
    '''Single process select() with non-blocking accept() and recv().

    ``timeout`` is in seconds.  With ``use_worker`` readable connections are
    queued to one worker process instead of being served inline.
    '''
    peers = []
    max_peers = 0
    queue = worker = None
    try:
        if use_worker:
            queue, worker = _start_worker()
        while True:
            max_peers = max(max_peers, len(peers))
            readable, _w, _e = select.select(peers + [socket_], [], [],
                                             timeout)
            for s in readable:
                if s is socket_:
                    peers.extend(_accept_pending(socket_))
                else:
                    peers.remove(s)
                    _dispatch(s, s.getpeername(), queue)
    finally:
        _stop_worker(worker)
        _report_max_peers(max_peers)


def poll_server(socket_, timeout=1, use_worker=False):
    '''Single process poll() with non-blocking accept() and recv().

    ``timeout`` is in seconds, like the sibling servers; it is converted to
    the milliseconds that ``poll.poll()`` expects.
    '''
    peers = {}  # {fileno: socket}
    flag = (select.POLLIN |
            select.POLLERR |
            select.POLLHUP)
    max_peers = 0
    queue = worker = None
    # poll() counts in milliseconds whereas select()/epoll() count in seconds.
    timeout_ms = None if timeout is None else timeout * 1000
    try:
        if use_worker:
            queue, worker = _start_worker()
        poller = select.poll()
        poller.register(socket_, select.POLLIN)
        while True:
            max_peers = max(max_peers, len(peers))
            for fd, event in poller.poll(timeout_ms):
                if fd == socket_.fileno():
                    for conn in _accept_pending(socket_):
                        peers[conn.fileno()] = conn
                        poller.register(conn, flag)
                elif event & select.POLLIN:
                    poller.unregister(fd)
                    # pop() so handled sockets stop counting as peers.
                    conn = peers.pop(fd)
                    _dispatch(conn, conn.getpeername(), queue)
                elif event & select.POLLERR or event & select.POLLHUP:
                    poller.unregister(fd)
                    peers.pop(fd).close()
    finally:
        _stop_worker(worker)
        _report_max_peers(max_peers)


def epoll_server(socket_, timeout=1, use_worker=False):
    '''Single process epoll() with non-blocking accept() and recv().

    ``timeout`` is in seconds.  Sockets are registered with EPOLLET, so the
    listen backlog is drained completely on every notification.
    '''
    peers = {}  # {fileno: socket}
    flag = (select.EPOLLIN |
            select.EPOLLET |
            select.EPOLLERR |
            select.EPOLLHUP)
    max_peers = 0
    queue = worker = None
    epoll = None
    try:
        if use_worker:
            queue, worker = _start_worker()
        epoll = select.epoll()
        epoll.register(socket_, select.EPOLLIN | select.EPOLLET)
        while True:
            max_peers = max(max_peers, len(peers))
            for fd, event in epoll.poll(timeout=timeout):
                if fd == socket_.fileno():
                    for conn in _accept_pending(socket_):
                        peers[conn.fileno()] = conn
                        epoll.register(conn, flag)
                elif event & select.EPOLLIN:
                    epoll.unregister(fd)
                    # pop() so handled sockets stop counting as peers.
                    conn = peers.pop(fd)
                    _dispatch(conn, conn.getpeername(), queue)
                elif event & select.EPOLLERR or event & select.EPOLLHUP:
                    epoll.unregister(fd)
                    peers.pop(fd).close()
    finally:
        _stop_worker(worker)
        # epoll stays None when setup failed before it was created.
        if epoll is not None:
            epoll.close()
        _report_max_peers(max_peers)


def main():
    """Parse the command line and run the selected server on 127.0.0.1:8000."""
    HOST, PORT = '127.0.0.1', 8000
    MODES = ('basic', 'select', 'poll', 'epoll')
    argparser = argparse.ArgumentParser()
    argparser.add_argument('mode', help=('Operating mode of the server: %s'
                                         % ', '.join(MODES)))
    argparser.add_argument('--backlog', type=int, default=0,
                           help='socket.listen() backlog')
    argparser.add_argument('--timeout', type=int, default=1000,
                           help='select/poll/epoll timeout in ms')
    argparser.add_argument('--worker', action='store_true',
                           help=('Spawn a worker to process request in '
                                 'select/poll/epoll mode. '
                                 'NOTE: The sole purpose of this option is '
                                 'experiment, it does not really help shorten '
                                 'the response time.'))
    args = argparser.parse_args()
    if args.mode not in MODES:
        msg = 'Availble operating modes: %s' % ', '.join(MODES)
        sys.stderr.write(msg + '\n')
        sys.exit(1)
    socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        socket_.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        socket_.bind((HOST, PORT))
        if args.mode in ('select', 'poll', 'epoll'):
            socket_.setblocking(0)
        # Float division: on Python 2 "/ 1000" truncates every value below
        # 1000 ms to 0, which turns the event loop into a busy loop.
        timeout = args.timeout / 1000.0
        socket_.listen(args.backlog)
        if args.mode == 'basic':
            basic_server(socket_)
        elif args.mode == 'select':
            select_server(socket_, timeout, use_worker=args.worker)
        elif args.mode == 'poll':
            poll_server(socket_, timeout, use_worker=args.worker)
        elif args.mode == 'epoll':
            epoll_server(socket_, timeout, use_worker=args.worker)
    except KeyboardInterrupt:
        pass
    finally:
        socket_.close()


if __name__ == '__main__':
    # NOTE(review): the scraped listing elides this body ("..."); main() is
    # the only entry point defined above -- confirm against the real file.
    main()

# (next snippet on the page: test_workers.py)
Source:test_workers.py  
"""
Tests for the workers API of Chariots.
"""
import time

import pytest
from flaky import flaky
from redis import Redis

from chariots.pipelines import PipelinesServer, Pipeline
from chariots.pipelines.nodes import Node
from chariots.testing import TestPipelinesClient
from chariots.workers import JobStatus, RQWorkerPool
from chariots.errors import VersionError
from chariots._helpers.test_helpers import IsPair, RQWorkerContext, build_keras_pipeline, \
    do_keras_pipeline_predictions_test


def _is_pair_pipeline(**pipeline_kwargs):
    """Build the one-node ``inner_pipe`` pipeline wrapping the IsPair op."""
    nodes = [
        Node(IsPair(), input_nodes=['__pipeline_input__'], output_nodes='__pipeline_output__'),
    ]
    return Pipeline(nodes, name='inner_pipe', **pipeline_kwargs)


def _some_app_client(pipe, tmpdir, opstore_func, **server_kwargs):
    """Serve ``pipe`` from a ``some_app`` server backed by an RQ worker pool; return a test client."""
    app = PipelinesServer([pipe], op_store_client=opstore_func(tmpdir), import_name='some_app',
                          worker_pool=RQWorkerPool(redis=Redis()), **server_kwargs)
    return TestPipelinesClient(app)


def do_async_pipeline_test(test_client, pipe, use_worker=None):
    """Run the IsPair pipeline asynchronously and check the result once the job is done."""
    numbers = list(range(20))
    queued = test_client.call_pipeline(pipe, pipeline_input=numbers, use_worker=use_worker)
    assert queued.job_status == JobStatus.queued

    # Give the worker time to pick the job up before fetching it.
    time.sleep(5)
    finished = test_client.fetch_job(queued.job_id, pipe)
    assert finished.job_status == JobStatus.done
    assert finished.value == [n % 2 == 0 for n in numbers]


def test_app_async(tmpdir, opstore_func):
    """Async execution enabled by the app-level `use_workers` setting."""
    with RQWorkerContext():
        pipe1 = _is_pair_pipeline()
        test_client = _some_app_client(pipe1, tmpdir, opstore_func, use_workers=True)
        do_async_pipeline_test(test_client, pipe1)


def test_app_async_pipeline(tmpdir, opstore_func):
    """Async execution enabled by the pipeline-level `use_worker` setting."""
    with RQWorkerContext():
        pipe1 = _is_pair_pipeline(use_worker=True)
        test_client = _some_app_client(pipe1, tmpdir, opstore_func)
        do_async_pipeline_test(test_client, pipe1)


def test_app_async_request(tmpdir, opstore_func):
    """Async execution requested by the client through `use_worker`."""
    with RQWorkerContext():
        pipe1 = _is_pair_pipeline()
        test_client = _some_app_client(pipe1, tmpdir, opstore_func)
        do_async_pipeline_test(test_client, pipe1, use_worker=True)


def test_app_async_conflicting_config(tmpdir, opstore_func):
    """
    Behavior when the `use_workers` settings conflict (at least one True and one False):
    the call is expected to come back already done.
    """
    with RQWorkerContext():
        pipe1 = _is_pair_pipeline(use_worker=True)
        test_client = _some_app_client(pipe1, tmpdir, opstore_func, use_workers=False)
        numbers = list(range(20))
        response = test_client.call_pipeline(pipe1, pipeline_input=numbers, use_worker=True)
        assert response.job_status == JobStatus.done
        assert response.value == [n % 2 == 0 for n in numbers]


# this needs to be flaky because it might take a little bit longer
@flaky(3, 1)
def test_complex_sk_training_pipeline_async(complex_sk_pipelines, tmpdir, opstore_func):
    """Async execution with a more complex sklearn-based set of pipelines."""
    with RQWorkerContext():
        train_transform, train_pipe, pred_pipe = complex_sk_pipelines
        train_transform.use_worker = True
        train_pipe.use_worker = True
        my_app = PipelinesServer(app_pipelines=[train_transform, train_pipe, pred_pipe],
                                 op_store_client=opstore_func(tmpdir), import_name='my_app',
                                 worker_pool=RQWorkerPool(redis=Redis()))
        test_client = TestPipelinesClient(my_app)

        # Both training pipelines run on the worker, one after the other.
        for training_pipeline in (train_transform, train_pipe):
            job = test_client.call_pipeline(training_pipeline)
            time.sleep(5.)
            job = test_client.fetch_job(job.job_id, training_pipeline)
            assert job.job_status == JobStatus.done

        test_client.load_pipeline(pred_pipe)
        samples = [[start, start + 1, start + 2] for start in (100, 101, 102)]
        prediction = test_client.call_pipeline(pred_pipe, pipeline_input=samples)
        assert prediction.job_status == JobStatus.done
        assert len(prediction.value) == 3
        for offset, predicted in enumerate(prediction.value):
            assert abs(101 + offset - predicted) < 1e-5

        # Retrain and save the transform; loading the predictor must then fail.
        retrain = test_client.call_pipeline(train_transform)
        test_client.save_pipeline(train_transform)
        time.sleep(5.)
        retrain = test_client.fetch_job(retrain.job_id, train_transform)
        assert retrain.job_status == JobStatus.done
        with pytest.raises(VersionError):
            test_client.load_pipeline(pred_pipe)


@flaky(5, 1)
def test_train_keras_pipeline_async(tmpdir, opstore_func):
    """Workers with a keras training pipeline."""
    with RQWorkerContext():
        train_pipeline, pred_pipeline = build_keras_pipeline(train_async=True)
        my_app = PipelinesServer(app_pipelines=[train_pipeline, pred_pipeline],
                                 op_store_client=opstore_func(tmpdir),
                                 import_name='my_app', worker_pool=RQWorkerPool(redis=Redis()))
        client = TestPipelinesClient(my_app)
        client.call_pipeline(train_pipeline)
        training_job = client.call_pipeline(train_pipeline)
        time.sleep(10)
        training_job = client.fetch_job(training_job.job_id, train_pipeline)
        assert training_job.job_status == JobStatus.done
        client.load_pipeline(pred_pipeline)
        # NOTE(review): the scraped listing is cut off here ("..."); any
        # remaining statements of this test are not visible.

# (next snippet on the page: test_subpool.py)
Source:test_subpool.py  
1from __future__ import print_function2from future import standard_library3standard_library.install_aliases()4from builtins import range5from bson import ObjectId6import urllib.request, urllib.error, urllib.parse7import json8import time9import os10import pytest11@pytest.mark.parametrize(["use_worker"], [[False], [True]])12def test_subpool_simple(worker, use_worker):13    # Check that a subpool can be used both in an outside of a Job context14    if use_worker:15        worker.start()16    else:17        from tests.tasks.general import SubPool18    def run(params):19        if use_worker:20            return worker.send_task("tests.tasks.general.SubPool", params)21        else:22            return SubPool().run(params)23    # Check that sequential sleeps work24    start_time = time.time()25    result = run({26        "pool_size": 1, "inner_params": [1, 1]27    })28    total_time = time.time() - start_time29    assert result == [1, 1]30    assert total_time > 231    # py.test doesn't use gevent so we don't get the benefits of the hub32    if use_worker:33        # Parallel sleeps34        start_time = time.time()35        result = run({36            "pool_size": 20, "inner_params": [1] * 2037        })38        total_time = time.time() - start_time39        assert result == [1] * 2040        assert total_time < 241@pytest.mark.parametrize(["p_imap"], [42    [True],43    [False]44])45def test_subpool_exception(worker, p_imap):46    # An exception in the subpool is raised outside the pool47    worker.send_task("tests.tasks.general.SubPool", {48        "pool_size": 20, "inner_params": ["exception"], "imap": p_imap49    }, accept_statuses=["failed"])50    job = worker.mongodb_jobs.mrq_jobs.find_one()51    assert job52    assert job["status"] == "failed"53    assert "__INNER_EXCEPTION_LINE__" in job["traceback"]54@pytest.mark.parametrize(["p_size"], [55    [0],56    [1],57    [2],58    [100]59])60def test_subpool_import(worker, p_size):61    """ This tests that the 
patch_import() function does its job of preventing a gevent crash62    like explained in https://code.google.com/p/gevent/issues/detail?id=108 """63    # Large file import64    worker.send_task("tests.tasks.general.SubPool", {65        "pool_size": p_size, "inner_params": ["import-large-file"] * p_size66    }, accept_statuses=["success"])67def test_subpool_imap():68    from mrq.context import subpool_imap69    def iterator(n):70        for i in range(0, n):71            if i == 5:72                raise Exception("Iterator exception!")73            yield i74    def inner_func(i):75        time.sleep(1)76        print("inner_func: %s" % i)77        if i == 4:78            raise Exception("Inner exception!")79        return i * 280    with pytest.raises(Exception):81        for res in subpool_imap(10, inner_func, iterator(10)):82            print("Got %s" % res)83    for res in subpool_imap(2, inner_func, iterator(1)):84        print("Got %s" % res)85    with pytest.raises(Exception):86        for res in subpool_imap(2, inner_func, iterator(5)):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
