Best Python code snippet using fMBT_python
server.py
Source:server.py  
...228            _g_local_namespaces[ns] = init_code229            _g_local_namespaces[ns].update(clean_ns) # copy pythonshare defaults230        else:231            raise TypeError("unsupported init_code type")232def _drop_local_namespace(ns):233    daemon_log('drop local namespace "%s"' % (ns,))234    _g_local_namespaces[ns]["pythonshare_ns"].call_on_drop()235    del _g_local_namespaces[ns]236    del _g_local_namespace_locks[ns]237    del _g_async_rvs[ns]238    # send notification to all connections in _g_namespace_exports[ns]?239def _drop_remote_namespace(ns):240    daemon_log('drop remote namespace "%s"' % (ns,))241    try:242        rns = _g_remote_namespaces[ns]243        del _g_remote_namespaces[ns]244        rns.__del__()245    except KeyError:246        pass # already dropped247    # send notification to all connections in _g_namespace_exports[ns]?248def _init_remote_namespace(ns, conn, to_remote, from_remote):249    if ns in _g_remote_namespaces:250        raise ValueError('Remote namespace "%s" already registered' % (251            ns,))252    daemon_log('added remote namespace "%s", origin "%s"' % (253        ns, conn.getpeername()))254    _g_remote_namespaces[ns] = Pythonshare_rns(conn, to_remote, from_remote)255def _register_exported_namespace(ns, conn):256    if not ns in _g_namespace_exports:257        _g_namespace_exports[ns] = []258    _g_namespace_exports[ns].append(conn)259def _local_execute(exec_msg, conn_id=None):260    global _g_executing_pythonshare_conn_id261    ns = exec_msg.namespace262    if not ns in _g_local_namespaces:263        code_exc = expr_exc = "no local namespace %s" % (ns,)264        return messages.Exec_rv(code_exc, expr_exc, None)265    if conn_id:266        if not conn_id in _g_namespace_users:267            _g_namespace_users[conn_id] = set([ns])268        else:269            _g_namespace_users[conn_id].add(ns)270    code_exc, expr_exc, expr_rv = None, None, None271    if not exec_msg.lock or _g_local_namespace_locks[ns].acquire():272        _g_executing_pythonshare_conn_id = conn_id273        try:274            if exec_msg.code not in [None, ""]:275                try:276                    exec exec_msg.code in _g_local_namespaces[ns]277                except Exception, e:278                    code_exc = exception2string(sys.exc_info())279            if exec_msg.expr not in [None, ""]:280                try:281                    expr_rv = eval(exec_msg.expr, _g_local_namespaces[ns])282                except Exception, e:283                    expr_exc = exception2string(sys.exc_info())284        finally:285            _g_executing_pythonshare_conn_id = None286            if exec_msg.lock:287                try:288                    _g_local_namespace_locks[ns].release()289                except thread.error:290                    pass # already unlocked namespace291    else:292        code_exc = expr_exc = 'locking namespace "%s" failed' % (ns,)293    if isinstance(expr_rv, pythonshare.messages.Exec_rv):294        return expr_rv295    else:296        return messages.Exec_rv(code_exc, expr_exc, expr_rv)297def _local_async_execute(async_rv, exec_msg):298    exec_rv = _local_execute(exec_msg)299    _g_async_rvs[exec_msg.namespace][async_rv.rvid] = exec_rv300def _remote_execute(ns, exec_msg):301    rns = _g_remote_namespaces[ns]302    pythonshare._send(exec_msg, rns.to_remote)303    # _recv raises EOFError() if disconnected,304    # let it raise through.305    return pythonshare._recv(rns.from_remote)306def _remote_execute_and_forward(ns, exec_msg, to_client, peername=None):307    """returns (forward_status, info)308    forward_status values:309       True:  everything successfully forwarded,310              info contains pair (forwarded byte count, full length).311       False: not everything forwarded,312              info contains pair (forwarded byte count, full length).313              to_client file/socket is not functional.314       None:  no forwarding,315              info contains Exec_rv that should be sent normally.316    Raises EOFError if connection to remote namespace is not functional.317    The peername parameter is used for logging only.318    """319    client_supports_rv_info = exec_msg.recv_cap_data_info()320    exec_msg.set_recv_cap_data_info(True)321    rns = _g_remote_namespaces[ns]322    from_remote = rns.from_remote323    # Must keep simultaneously two locks:324    # - send lock on to_client325    # - recv lock on from_remote326    pythonshare._acquire_recv_lock(from_remote)327    try:328        pythonshare._send(exec_msg, rns.to_remote)329        response = pythonshare._recv(from_remote, acquire_recv_lock=False)330        if not isinstance(response, messages.Data_info):331            # Got direct response without forward mode332            return (None, response)333        pythonshare._acquire_send_lock(to_client)334        if client_supports_rv_info:335            # send data_info to client336            pythonshare._send(response, to_client, acquire_send_lock=False)337        try:338            if opt_debug and peername:339                daemon_log("%s:%s <= Exec_rv([forwarding %s B])" % (peername + (response.data_length,)))340            forwarded_bytes = pythonshare._forward(341                from_remote, to_client, response.data_length,342                acquire_recv_lock=False,343                acquire_send_lock=False)344            if forwarded_bytes == response.data_length:345                return (True, (forwarded_bytes, response.data_length))346            else:347                return (False, (forwarded_bytes, response.data_length))348        finally:349            pythonshare._release_send_lock(to_client)350    finally:351        exec_msg.set_recv_cap_data_info(client_supports_rv_info)352        pythonshare._release_recv_lock(from_remote)353def _connection_lost(conn_id, *closables):354    if closables:355        pythonshare._close(*closables)356    try:357        for ns in _g_namespace_users[conn_id]:358            try:359                _g_local_namespaces[ns]["pythonshare_ns"].call_on_disconnect(conn_id)360            except KeyError:361                pass362    except KeyError:363        pass364def _serve_connection(conn, conn_opts):365    global _g_async_rv_counter366    global _g_server_shutdown367    if isinstance(conn, client.Connection):368        to_client = conn._to_server369        from_client = conn._from_server370    else: # conn is a connected socket371        to_client = conn.makefile("w")372        from_client = conn.makefile("r")373    try:374        peername = conn.getpeername()375    except socket.error:376        peername = ("unknown", "?")377    if opt_debug:378        daemon_log("connected %s:%s" % peername)379    conn_id = "%s-%s" % (timestamp(), id(conn))380    auth_ok = False381    passwords = [k for k in conn_opts.keys() if k.startswith("password.")]382    kill_server_on_close = conn_opts.get("kill-server-on-close", False)383    if passwords:384        # password authentication is required for this connection385        try:386            received_password = pythonshare._recv(from_client)387        except Exception, e:388            daemon_log('error receiving password: %r' % (e,))389            received_password = None390        for password_type in passwords:391            algorithm = password_type.split(".")[1]392            if type(received_password) == str:393                if (algorithm == "plaintext" and394                    received_password == conn_opts[password_type]):395                    auth_ok = True396                elif (hasattr(hashlib, algorithm) and397                      getattr(hashlib, algorithm)(received_password).hexdigest() ==398                      conn_opts[password_type]):399                    auth_ok = True400        try:401            if auth_ok:402                pythonshare._send(messages.Auth_rv(True), to_client)403                if opt_debug:404                    daemon_log("%s:%s authentication ok" % peername)405            elif not received_password is None:406                pythonshare._send(messages.Auth_rv(False), to_client)407                if opt_debug:408                    daemon_log("%s:%s authentication failed" % peername)409        except socket.error:410            daemon_log("authentication failed due to socket error")411            auth_ok = False412    else:413        auth_ok = True # no password required414    whitelist_local = conn_opts.get("whitelist_local", None)415    while auth_ok:416        try:417            obj = pythonshare._recv(from_client)418            if opt_debug:419                daemon_log("%s:%s => %s" % (peername + (obj,)))420        except (EOFError, pythonshare.socket.error):421            break422        if isinstance(obj, messages.Register_ns):423            try:424                _init_remote_namespace(obj.ns, conn, to_client, from_client)425                pythonshare._send(messages.Ns_rv(True), to_client)426                # from this point on, this connection is reserved for427                # sending remote namespace traffic. The connection will be428                # used by other threads, this thread stops here.429                return430            except Exception, e:431                pythonshare._send(messages.Ns_rv(False, exception2string(sys.exc_info())), to_client)432        elif isinstance(obj, messages.Drop_ns):433            try:434                if obj.ns in _g_local_namespaces:435                    _drop_local_namespace(obj.ns)436                elif obj.ns in _g_remote_namespaces:437                    _drop_remote_namespace(obj.ns)438                else:439                    raise ValueError('Unknown namespace "%s"' % (obj.ns,))440                pythonshare._send(messages.Ns_rv(True), to_client)441            except Exception, e:442                if opt_debug:443                    daemon_log("namespace drop error: %s" % (e,))444                pythonshare._send(messages.Ns_rv(False, exception2string(sys.exc_info())), to_client)445        elif isinstance(obj, messages.Request_ns):446            ns = obj.ns447            if (ns in _g_remote_namespaces or448                ns in _g_local_namespaces):449                _register_exported_namespace(ns, conn)450                pythonshare._send(messages.Ns_rv(True), to_client)451                # from this point on, this connection is reserved for452                # receiving executions on requested namespace. This453                # thread starts serving the connection.454        elif isinstance(obj, messages.Exec):455            ns = obj.namespace456            if ns in _g_remote_namespaces: # execute in remote namespace457                try:458                    _fwd_status, _fwd_info = _remote_execute_and_forward(459                        ns, obj, to_client, peername)460                    if _fwd_status == True:461                        # successfully forwarded462                        if opt_debug:463                            daemon_log("%s:%s forwarded %s B" % (peername + (_fwd_info[0],)))464                        exec_rv = None # return value fully forwarded465                    elif _fwd_status == False:466                        # connection to client is broken467                        if opt_debug:468                            daemon_log("%s:%s error after forwarding %s/%s B" % (peername + _fwd_info))469                        break470                    elif _fwd_status is None:471                        # nothing forwarded, send return value by normal means472                        exec_rv = _fwd_info473                except (EOFError, socket.error):474                    daemon_log('connection lost to "%s"' % (ns,))475                    _drop_remote_namespace(ns)476                    break477            else: # execute in local namespace478                if whitelist_local == None or ns in whitelist_local:479                    _init_local_namespace(ns)480                if obj.async:481                    # asynchronous execution, return handle (Async_rv)482                    _g_async_rv_counter += 1483                    rvid = timestamp() + str(_g_async_rv_counter)484                    exec_rv = messages.Async_rv(ns, rvid)485                    _g_async_rvs[ns][rvid] = pythonshare.InProgress()486                    thread.start_new_thread(_local_async_execute, (exec_rv, obj))487                else:488                    # synchronous execution, return true return value489                    exec_rv = _local_execute(obj, conn_id)490            if not exec_rv is None:491                if opt_debug:492                    daemon_log("%s:%s <= %s" % (peername + (exec_rv,)))493                try:494                    try:495                        if obj.recv_cap_data_info():496                            info = pythonshare._send_opt(exec_rv, to_client, obj.recv_caps)497                            if info:498                                sent_info = " %s B, format:%s" % (499                                    info.data_length, info.data_format)500                            else:501                                sent_info = ""502                        else:503                            pythonshare._send(exec_rv, to_client)504                            sent_info = ""505                        if opt_debug:506                            daemon_log("%s:%s sent%s" % (peername + (sent_info,)))507                    except (EOFError, socket.error):508                        break509                except (TypeError, ValueError, cPickle.PicklingError): # pickling rv fails510                    exec_rv.expr_rv = messages.Unpicklable(exec_rv.expr_rv)511                    try:512                        pythonshare._send(exec_rv, to_client)513                    except (EOFError, socket.error):514                        break515        elif isinstance(obj, messages.Server_ctl):516            if obj.command == "die":517                ns = obj.args[0]518                if ns in _g_remote_namespaces:519                    try:520                        rv = _remote_execute(ns, obj)521                        if opt_debug:522                            daemon_log("%s:%s <= %s" % (peername + (rv,)))523                        pythonshare._send(rv, to_client)524                    except (EOFError, socket.error): # connection lost525                        daemon_log('connection lost to "%s"' % (ns,))526                        _drop_remote_namespace(ns)527                        break528                else:529                    _g_server_shutdown = True530                    server_ctl_rv = messages.Server_ctl_rv(0, "shutting down")531                    pythonshare._send(server_ctl_rv, to_client)532                    if _g_wake_server_function:533                        _g_wake_server_function()534                    break535            elif obj.command == "unlock":536                try:537                    ns = obj.args[0]538                    if ns in _g_remote_namespaces:539                        try:540                            rv = _remote_execute(ns, obj)541                        except (EOFError, socket.error): # connection lost542                            daemon_log('connection lost to "%s"' % (ns,))543                            _drop_remote_namespace(ns)544                            break545                    elif ns in _g_local_namespace_locks:546                        try:547                            _g_local_namespace_locks[ns].release()548                            server_ctl_rv = messages.Server_ctl_rv(549                                0, "%s unlocked" % (repr(ns),))550                        except thread.error, e:551                            server_ctl_rv = messages.Server_ctl_rv(552                                1, "%s already unlocked" %553                                (repr(ns),))554                    elif ns in _g_local_namespaces:555                        server_ctl_rv = messages.Server_ctl_rv(556                            2, "namespace %s is not locked" % (repr(ns),))557                    else:558                        server_ctl_rv = messages.Server_ctl_rv(559                            -1, "unknown namespace %s" % (repr(ns),))560                    if opt_debug:561                        daemon_log("%s:%s <= %s" % (peername + (server_ctl_rv,)))562                    pythonshare._send(server_ctl_rv, to_client)563                except Exception, e:564                    if opt_debug:565                        daemon_log("Exception in handling %s: %s" % (obj, e))566        else:567            daemon_log("unknown message type: %s in %s" % (type(obj), obj))568            pythonshare._send(messages.Auth_rv(False), to_client)569            auth_ok = False570    if opt_debug:571        daemon_log("disconnected %s:%s" % peername)572    _connection_lost(conn_id, to_client, from_client, conn)573    if kill_server_on_close:574        _g_server_shutdown = True575        if _g_wake_server_function:576            _g_wake_server_function()577def start_server(host, port,578                 ns_init_import_export=[],579                 conn_opts={},580                 listen_stdin=True):581    global _g_wake_server_function582    global _g_waker_lock583    daemon_log("pid: %s" % (os.getpid(),))584    # Initialise, import and export namespaces585    for task, ns, arg in ns_init_import_export:586        if task == "init":587            # If arg is a string, it will be executed in ns.588            # If arg is a dict, it will be used as ns.589            _init_local_namespace(ns, arg, force=True)590        elif task == "export":591            # Make sure ns exists before exporting.592            _init_local_namespace(ns, None, force=True)593            daemon_log('exporting "%s" to %s' % (ns, arg))594            try:595                c = pythonshare.connection(arg)596            except Exception, e:597                daemon_log('connecting to %s failed: %s' % (arg, e))598                return599            if c.export_ns(ns):600                _register_exported_namespace(ns, c)601                thread.start_new_thread(602                    _serve_connection, (c, {"kill-server-on-close": True}))603            else:604                raise ValueError('Export namespace "%s" to "%s" failed'605                                 % (ns, arg))606        elif task == "import":607            if (ns in _g_local_namespaces or608                ns in _g_remote_namespaces):609                raise ValueError('Import failed, namespace "%s" already exists'610                                 % (ns,))611            c = pythonshare.connection(arg)612            if c.import_ns(ns):613                _init_remote_namespace(ns, c, c._to_server, c._from_server)614    try:615        addrinfos = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)616        for addrinfo in addrinfos:617            daemon_log("listen: %s:%s" % (addrinfo[4][0], addrinfo[4][1]))618    except socket.error:619        daemon_log("listen: %s:%s" % (host, port))620    if isinstance(port, int):621        def wake_server_function():622            _g_waker_lock.release() # wake up server623        _g_wake_server_function = wake_server_function624        _g_waker_lock = thread.allocate_lock()625        _g_waker_lock.acquire() # unlocked626        # Start listening to the port627        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)628        try:629            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)630        except:631            pass632        s.bind((host, port))633        s.listen(4)634        event_queue = Queue.Queue()635        thread.start_new_thread(_store_return_value, (s.accept, event_queue))636        thread.start_new_thread(_store_return_value, (_g_waker_lock.acquire, event_queue))637        if not sys.stdin.closed and listen_stdin:638            daemon_log("listening to stdin")639            thread.start_new_thread(_read_lines_from_stdin, (event_queue,))640        else:641            daemon_log("not listening stdin")642        while 1:643            event = event_queue.get()644            if isinstance(event, tuple):645                # returned from s.accept646                conn, _ = event647                thread.start_new_thread(_serve_connection, (conn, conn_opts))648            elif event == True:649                # returned from _g_waker_lock.acquire650                daemon_log("shutting down.")651                break652            else:653                # returned from sys.stdin.readline654                pass655    elif port == "stdin":656        opt_debug_limit = 0657        if os.name == "nt":658            import msvcrt659            msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)660            msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)661        conn = client.Connection(sys.stdin, sys.stdout)662        _serve_connection(conn, conn_opts)663    for ns in sorted(_g_remote_namespaces.keys()):664        _drop_remote_namespace(ns)665    for ns in sorted(_g_local_namespaces.keys()):666        _drop_local_namespace(ns)667def start_daemon(host="localhost", port=8089, debug=False,668                 log_fd=None, ns_init_import_export=[], conn_opts={},669                 debug_limit=None):670    global opt_log_fd, opt_debug, opt_debug_limit671    opt_log_fd = log_fd672    opt_debug = debug673    if debug_limit != None:674        opt_debug_limit = debug_limit675    if opt_debug_limit > 0:676        messages.MSG_STRING_FIELD_MAX_LEN = max(opt_debug_limit/3-40, 40)677    else:678        messages.MSG_STRING_FIELD_MAX_LEN = None679    if opt_debug == False and not on_windows and isinstance(port, int):680        listen_stdin = False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
