Best Python code snippet using fMBT_python
server.py
Source:server.py  
...48             datetime.datetime(1970, 1, 1)).total_seconds(),)49    else:50        rv = datetime.datetime.now().strftime("%s.%f")51    return rv52def daemon_log(msg):53    if opt_debug_limit >= 0:54        if len(msg) > opt_debug_limit:55            msg = (msg[:opt_debug_limit/2] +56                   ("...[%s B, log CRC %s]..." % (len(msg), messages.crc(msg))) +57                   msg[-opt_debug_limit/2:])58    formatted_msg = "%s %s\n" % (timestamp(), msg)59    if opt_log_fd != None:60        os.write(opt_log_fd, formatted_msg)61        if has_os_fdatasync:62            os.fdatasync(opt_log_fd)63    if opt_debug and opt_debug_limit != 0:64        sys.stdout.write(formatted_msg)65        sys.stdout.flush()66def code2string(code):67    return "\n".join(68        ["%-4s %s" % (li+1, l) for li, l in enumerate(code.splitlines())])69def exception2string(exc_info):70    return ''.join(traceback.format_exception(*exc_info))71def _store_return_value(func, queue):72    while True:73        queue.put(func())74def _read_lines_from_stdin(queue):75    while True:76        line = sys.stdin.readline()77        if not line:78            break79        queue.put(line)80    daemon_log("stdin closed")81class Pythonshare_ns(object):82    """Pythonshare services inside a namespace83    """84    def __init__(self, ns):85        self.ns = ns86        self._on_disconnect = []87        self._on_drop = []88    def ns_type(self, ns):89        """Query the type of a namespace.90        Returns "local" or "remote" if namespace exists, otherwise None.91        """92        if ns in _g_local_namespaces:93            return "local"94        elif ns in _g_remote_namespaces:95            return "remote"96        else:97            return None98    def local_nss(self):99        """List local namespaces100        """101        return _g_local_namespaces.keys()102    def remote_nss(self, ls_opts={}):103        """List remote namespaces104        """105        if "ip" in ls_opts and ls_opts["ip"] == True:106            key_peername = {}107            for k in _g_remote_namespaces.keys():108                try:109                    key_peername[k] = _g_remote_namespaces[k].conn.getpeername()110                except Exception:111                    key_peername[k] = ("?", "?")112            return key_peername113        return _g_remote_namespaces.keys()114    def on_disconnect(self):115        """Return codes that will be executed when a client has disconnected.116        """117        return self._on_disconnect118    def on_drop(self):119        """Return codes that will be executed when the namespace is dropped.120        """121        return self._on_drop122    def exec_on_disconnect(self, code, any_connection=False):123        """Add code that will be executed when client has disconnected.124        """125        if not any_connection:126            conn_id = _g_executing_pythonshare_conn_id127        else:128            conn_id = None129        self._on_disconnect.append((conn_id, code))130    def exec_on_drop(self, code):131        """Add code that will be executed when namespace is dropped.132        """133        self._on_drop.append(code)134    def set_on_disconnect(self, list_of_code):135        """Replace all "on disconnect" codes with new list of codes.136        """137        self._on_disconnect = list_of_code138    def set_on_drop(self, list_of_code):139        """Replace all "on drop" codes with new list of codes."""140        self._on_drop = list_of_code141    def call_on_disconnect(self, conn_id):142        for setter_conn_id, code in self._on_disconnect:143            if not setter_conn_id or setter_conn_id == conn_id:144                exec_msg = messages.Exec(self.ns, code, None)145                if opt_debug:146                    daemon_log("on disconnect %s: %s" % (conn_id, exec_msg,))147                rv = _local_execute(exec_msg)148                if opt_debug:149                    daemon_log("on disconnect rv: %s" % (rv,))150                if setter_conn_id == conn_id:151                    self._on_disconnect.remove((conn_id, code))152    def call_on_drop(self):153        for code in self._on_drop:154            exec_msg = messages.Exec(self.ns, code, None)155            if opt_debug:156                daemon_log("on drop: %s" % (exec_msg,))157            rv = _local_execute(exec_msg)158            if opt_debug:159                daemon_log("on drop rv: %s" % (rv,))160    def read_rv(self, async_rv):161        """Return and remove asynchronous return value.162        """163        if self.ns != async_rv.ns:164            raise ValueError("Namespace mismatch")165        if (async_rv.ns in _g_async_rvs and166            async_rv.rvid in _g_async_rvs[async_rv.ns]):167            rv = _g_async_rvs[async_rv.ns][async_rv.rvid]168            if not isinstance(rv, pythonshare.InProgress):169                del _g_async_rvs[async_rv.ns][async_rv.rvid]170            return rv171        else:172            raise ValueError('Invalid return value id: "%s"'173                             % (async_rv.rvid,))174    def poll_rvs(self):175        """Returns list of Async_rv instances that are ready for reading.176        """177        rv = []178        for rvid, value in _g_async_rvs[self.ns].iteritems():179            if not isinstance(value, pythonshare.InProgress):180                rv.append(messages.Async_rv(self.ns, rvid))181        return rv182class Pythonshare_rns(object):183    """Remote namespace"""184    def __init__(self, conn, to_remote, from_remote):185        self.conn = conn186        self.to_remote = to_remote187        self.from_remote = from_remote188    def __del__(self):189        pythonshare._close(self.conn, self.to_remote, self.from_remote)190_g_local_namespaces = {}191# client-id -> set of namespaces192_g_namespace_users = {}193_g_executing_pythonshare_conn_id = None194# _g_remote_namespaces: namespace -> Connection to origin195_g_remote_namespaces = {}196# _g_namespace_exports: namespace -> list of Connections to which the197# namespace (remote or local) has been exported. If the namespace is198# deleted (or connection to origin is lost), these Connection objects199# are to be notified.200_g_namespace_exports = {}201_g_local_namespace_locks = {}202_g_async_rvs = {}203_g_async_rv_counter = 0204_g_server_shutdown = False205def _init_local_namespace(ns, init_code=None, force=False):206    if not ns in _g_local_namespaces:207        if opt_allow_new_namespaces or force:208            daemon_log('added local namespace "%s"' % (ns,))209            _g_local_namespaces[ns] = {210                "pythonshare_ns": Pythonshare_ns(ns),211                "Async_rv": pythonshare.messages.Async_rv212            }213            _g_local_namespace_locks[ns] = thread.allocate_lock()214            _g_async_rvs[ns] = {}215        else:216            raise ValueError('Unknown namespace "%s"' % (ns,))217    if init_code != None:218        if isinstance(init_code, basestring):219            try:220                exec init_code in _g_local_namespaces[ns]221            except Exception, e:222                daemon_log('namespace "%s" init error in <string>:\n%s\n\n%s' % (223                    ns, code2string(init_code), exception2string(sys.exc_info())))224        elif isinstance(init_code, dict):225            # Directly use the dictionary (locals() or globals(), for226            # instance) as a Pythonshare namespace.227            clean_ns = _g_local_namespaces[ns]228            _g_local_namespaces[ns] = init_code229            _g_local_namespaces[ns].update(clean_ns) # copy pythonshare defaults230        else:231            raise TypeError("unsupported init_code type")232def _drop_local_namespace(ns):233    daemon_log('drop local namespace "%s"' % (ns,))234    _g_local_namespaces[ns]["pythonshare_ns"].call_on_drop()235    del _g_local_namespaces[ns]236    del _g_local_namespace_locks[ns]237    del _g_async_rvs[ns]238    # send notification to all connections in _g_namespace_exports[ns]?239def _drop_remote_namespace(ns):240    daemon_log('drop remote namespace "%s"' % (ns,))241    try:242        rns = _g_remote_namespaces[ns]243        del _g_remote_namespaces[ns]244        rns.__del__()245    except KeyError:246        pass # already dropped247    # send notification to all connections in _g_namespace_exports[ns]?248def _init_remote_namespace(ns, conn, to_remote, from_remote):249    if ns in _g_remote_namespaces:250        raise ValueError('Remote namespace "%s" already registered' % (251            ns,))252    daemon_log('added remote namespace "%s", origin "%s"' % (253        ns, conn.getpeername()))254    _g_remote_namespaces[ns] = Pythonshare_rns(conn, to_remote, from_remote)255def _register_exported_namespace(ns, conn):256    if not ns in _g_namespace_exports:257        _g_namespace_exports[ns] = []258    _g_namespace_exports[ns].append(conn)259def _local_execute(exec_msg, conn_id=None):260    global _g_executing_pythonshare_conn_id261    ns = exec_msg.namespace262    if not ns in _g_local_namespaces:263        code_exc = expr_exc = "no local namespace %s" % (ns,)264        return messages.Exec_rv(code_exc, expr_exc, None)265    if conn_id:266        if not conn_id in _g_namespace_users:267            _g_namespace_users[conn_id] = set([ns])268        else:269            _g_namespace_users[conn_id].add(ns)270    code_exc, expr_exc, expr_rv = None, None, None271    if not exec_msg.lock or _g_local_namespace_locks[ns].acquire():272        _g_executing_pythonshare_conn_id = conn_id273        try:274            if exec_msg.code not in [None, ""]:275                try:276                    exec exec_msg.code in _g_local_namespaces[ns]277                except Exception, e:278                    code_exc = exception2string(sys.exc_info())279            if exec_msg.expr not in [None, ""]:280                try:281                    expr_rv = eval(exec_msg.expr, _g_local_namespaces[ns])282                except Exception, e:283                    expr_exc = exception2string(sys.exc_info())284        finally:285            _g_executing_pythonshare_conn_id = None286            if exec_msg.lock:287                try:288                    _g_local_namespace_locks[ns].release()289                except thread.error:290                    pass # already unlocked namespace291    else:292        code_exc = expr_exc = 'locking namespace "%s" failed' % (ns,)293    if isinstance(expr_rv, pythonshare.messages.Exec_rv):294        return expr_rv295    else:296        return messages.Exec_rv(code_exc, expr_exc, expr_rv)297def _local_async_execute(async_rv, exec_msg):298    exec_rv = _local_execute(exec_msg)299    _g_async_rvs[exec_msg.namespace][async_rv.rvid] = exec_rv300def _remote_execute(ns, exec_msg):301    rns = _g_remote_namespaces[ns]302    pythonshare._send(exec_msg, rns.to_remote)303    # _recv raises EOFError() if disconnected,304    # let it raise through.305    return pythonshare._recv(rns.from_remote)306def _remote_execute_and_forward(ns, exec_msg, to_client, peername=None):307    """returns (forward_status, info)308    forward_status values:309       True:  everything successfully forwarded,310              info contains pair (forwarded byte count, full length).311       False: not everything forwarded,312              info contains pair (forwarded byte count, full length).313              to_client file/socket is not functional.314       None:  no forwarding,315              info contains Exec_rv that should be sent normally.316    Raises EOFError if connection to remote namespace is not functional.317    The peername parameter is used for logging only.318    """319    client_supports_rv_info = exec_msg.recv_cap_data_info()320    exec_msg.set_recv_cap_data_info(True)321    rns = _g_remote_namespaces[ns]322    from_remote = rns.from_remote323    # Must keep simultaneously two locks:324    # - send lock on to_client325    # - recv lock on from_remote326    pythonshare._acquire_recv_lock(from_remote)327    try:328        pythonshare._send(exec_msg, rns.to_remote)329        response = pythonshare._recv(from_remote, acquire_recv_lock=False)330        if not isinstance(response, messages.Data_info):331            # Got direct response without forward mode332            return (None, response)333        pythonshare._acquire_send_lock(to_client)334        if client_supports_rv_info:335            # send data_info to client336            pythonshare._send(response, to_client, acquire_send_lock=False)337        try:338            if opt_debug and peername:339                daemon_log("%s:%s <= Exec_rv([forwarding %s B])" % (peername + (response.data_length,)))340            forwarded_bytes = pythonshare._forward(341                from_remote, to_client, response.data_length,342                acquire_recv_lock=False,343                acquire_send_lock=False)344            if forwarded_bytes == response.data_length:345                return (True, (forwarded_bytes, response.data_length))346            else:347                return (False, (forwarded_bytes, response.data_length))348        finally:349            pythonshare._release_send_lock(to_client)350    finally:351        exec_msg.set_recv_cap_data_info(client_supports_rv_info)352        pythonshare._release_recv_lock(from_remote)353def _connection_lost(conn_id, *closables):354    if closables:355        pythonshare._close(*closables)356    try:357        for ns in _g_namespace_users[conn_id]:358            try:359                _g_local_namespaces[ns]["pythonshare_ns"].call_on_disconnect(conn_id)360            except KeyError:361                pass362    except KeyError:363        pass364def _serve_connection(conn, conn_opts):365    global _g_async_rv_counter366    global _g_server_shutdown367    if isinstance(conn, client.Connection):368        to_client = conn._to_server369        from_client = conn._from_server370    else: # conn is a connected socket371        to_client = conn.makefile("w")372        from_client = conn.makefile("r")373    try:374        peername = conn.getpeername()375    except socket.error:376        peername = ("unknown", "?")377    if opt_debug:378        daemon_log("connected %s:%s" % peername)379    conn_id = "%s-%s" % (timestamp(), id(conn))380    auth_ok = False381    passwords = [k for k in conn_opts.keys() if k.startswith("password.")]382    kill_server_on_close = conn_opts.get("kill-server-on-close", False)383    if passwords:384        # password authentication is required for this connection385        try:386            received_password = pythonshare._recv(from_client)387        except Exception, e:388            daemon_log('error receiving password: %r' % (e,))389            received_password = None390        for password_type in passwords:391            algorithm = password_type.split(".")[1]392            if type(received_password) == str:393                if (algorithm == "plaintext" and394                    received_password == conn_opts[password_type]):395                    auth_ok = True396                elif (hasattr(hashlib, algorithm) and397                      getattr(hashlib, algorithm)(received_password).hexdigest() ==398                      conn_opts[password_type]):399                    auth_ok = True400        try:401            if auth_ok:402                pythonshare._send(messages.Auth_rv(True), to_client)403                if opt_debug:404                    daemon_log("%s:%s authentication ok" % peername)405            elif not received_password is None:406                pythonshare._send(messages.Auth_rv(False), to_client)407                if opt_debug:408                    daemon_log("%s:%s authentication failed" % peername)409        except socket.error:410            daemon_log("authentication failed due to socket error")411            auth_ok = False412    else:413        auth_ok = True # no password required414    whitelist_local = conn_opts.get("whitelist_local", None)415    while auth_ok:416        try:417            obj = pythonshare._recv(from_client)418            if opt_debug:419                daemon_log("%s:%s => %s" % (peername + (obj,)))420        except (EOFError, pythonshare.socket.error):421            break422        if isinstance(obj, messages.Register_ns):423            try:424                _init_remote_namespace(obj.ns, conn, to_client, from_client)425                pythonshare._send(messages.Ns_rv(True), to_client)426                # from this point on, this connection is reserved for427                # sending remote namespace traffic. The connection will be428                # used by other threads, this thread stops here.429                return430            except Exception, e:431                pythonshare._send(messages.Ns_rv(False, exception2string(sys.exc_info())), to_client)432        elif isinstance(obj, messages.Drop_ns):433            try:434                if obj.ns in _g_local_namespaces:435                    _drop_local_namespace(obj.ns)436                elif obj.ns in _g_remote_namespaces:437                    _drop_remote_namespace(obj.ns)438                else:439                    raise ValueError('Unknown namespace "%s"' % (obj.ns,))440                pythonshare._send(messages.Ns_rv(True), to_client)441            except Exception, e:442                if opt_debug:443                    daemon_log("namespace drop error: %s" % (e,))444                pythonshare._send(messages.Ns_rv(False, exception2string(sys.exc_info())), to_client)445        elif isinstance(obj, messages.Request_ns):446            ns = obj.ns447            if (ns in _g_remote_namespaces or448                ns in _g_local_namespaces):449                _register_exported_namespace(ns, conn)450                pythonshare._send(messages.Ns_rv(True), to_client)451                # from this point on, this connection is reserved for452                # receiving executions on requested namespace. This453                # thread starts serving the connection.454        elif isinstance(obj, messages.Exec):455            ns = obj.namespace456            if ns in _g_remote_namespaces: # execute in remote namespace457                try:458                    _fwd_status, _fwd_info = _remote_execute_and_forward(459                        ns, obj, to_client, peername)460                    if _fwd_status == True:461                        # successfully forwarded462                        if opt_debug:463                            daemon_log("%s:%s forwarded %s B" % (peername + (_fwd_info[0],)))464                        exec_rv = None # return value fully forwarded465                    elif _fwd_status == False:466                        # connection to client is broken467                        if opt_debug:468                            daemon_log("%s:%s error after forwarding %s/%s B" % (peername + _fwd_info))469                        break470                    elif _fwd_status is None:471                        # nothing forwarded, send return value by normal means472                        exec_rv = _fwd_info473                except (EOFError, socket.error):474                    daemon_log('connection lost to "%s"' % (ns,))475                    _drop_remote_namespace(ns)476                    break477            else: # execute in local namespace478                if whitelist_local == None or ns in whitelist_local:479                    _init_local_namespace(ns)480                if obj.async:481                    # asynchronous execution, return handle (Async_rv)482                    _g_async_rv_counter += 1483                    rvid = timestamp() + str(_g_async_rv_counter)484                    exec_rv = messages.Async_rv(ns, rvid)485                    _g_async_rvs[ns][rvid] = pythonshare.InProgress()486                    thread.start_new_thread(_local_async_execute, (exec_rv, obj))487                else:488                    # synchronous execution, return true return value489                    exec_rv = _local_execute(obj, conn_id)490            if not exec_rv is None:491                if opt_debug:492                    daemon_log("%s:%s <= %s" % (peername + (exec_rv,)))493                try:494                    try:495                        if obj.recv_cap_data_info():496                            info = pythonshare._send_opt(exec_rv, to_client, obj.recv_caps)497                            if info:498                                sent_info = " %s B, format:%s" % (499                                    info.data_length, info.data_format)500                            else:501                                sent_info = ""502                        else:503                            pythonshare._send(exec_rv, to_client)504                            sent_info = ""505                        if opt_debug:506                            daemon_log("%s:%s sent%s" % (peername + (sent_info,)))507                    except (EOFError, socket.error):508                        break509                except (TypeError, ValueError, cPickle.PicklingError): # pickling rv fails510                    exec_rv.expr_rv = messages.Unpicklable(exec_rv.expr_rv)511                    try:512                        pythonshare._send(exec_rv, to_client)513                    except (EOFError, socket.error):514                        break515        elif isinstance(obj, messages.Server_ctl):516            if obj.command == "die":517                ns = obj.args[0]518                if ns in _g_remote_namespaces:519                    try:520                        rv = _remote_execute(ns, obj)521                        if opt_debug:522                            daemon_log("%s:%s <= %s" % (peername + (rv,)))523                        pythonshare._send(rv, to_client)524                    except (EOFError, socket.error): # connection lost525                        daemon_log('connection lost to "%s"' % (ns,))526                        _drop_remote_namespace(ns)527                        break528                else:529                    _g_server_shutdown = True530                    server_ctl_rv = messages.Server_ctl_rv(0, "shutting down")531                    pythonshare._send(server_ctl_rv, to_client)532                    if _g_wake_server_function:533                        _g_wake_server_function()534                    break535            elif obj.command == "unlock":536                try:537                    ns = obj.args[0]538                    if ns in _g_remote_namespaces:539                        try:540                            rv = _remote_execute(ns, obj)541                        except (EOFError, socket.error): # connection lost542                            daemon_log('connection lost to "%s"' % (ns,))543                            _drop_remote_namespace(ns)544                            break545                    elif ns in _g_local_namespace_locks:546                        try:547                            _g_local_namespace_locks[ns].release()548                            server_ctl_rv = messages.Server_ctl_rv(549                                0, "%s unlocked" % (repr(ns),))550                        except thread.error, e:551                            server_ctl_rv = messages.Server_ctl_rv(552                                1, "%s already unlocked" %553                                (repr(ns),))554                    elif ns in _g_local_namespaces:555                        server_ctl_rv = messages.Server_ctl_rv(556                            2, "namespace %s is not locked" % (repr(ns),))557                    else:558                        server_ctl_rv = messages.Server_ctl_rv(559                            -1, "unknown namespace %s" % (repr(ns),))560                    if opt_debug:561                        daemon_log("%s:%s <= %s" % (peername + (server_ctl_rv,)))562                    pythonshare._send(server_ctl_rv, to_client)563                except Exception, e:564                    if opt_debug:565                        daemon_log("Exception in handling %s: %s" % (obj, e))566        else:567            daemon_log("unknown message type: %s in %s" % (type(obj), obj))568            pythonshare._send(messages.Auth_rv(False), to_client)569            auth_ok = False570    if opt_debug:571        daemon_log("disconnected %s:%s" % peername)572    _connection_lost(conn_id, to_client, from_client, conn)573    if kill_server_on_close:574        _g_server_shutdown = True575        if _g_wake_server_function:576            _g_wake_server_function()577def start_server(host, port,578                 ns_init_import_export=[],579                 conn_opts={},580                 listen_stdin=True):581    global _g_wake_server_function582    global _g_waker_lock583    daemon_log("pid: %s" % (os.getpid(),))584    # Initialise, import and export namespaces585    for task, ns, arg in ns_init_import_export:586        if task == "init":587            # If arg is a string, it will be executed in ns.588            # If arg is a dict, it will be used as ns.589            _init_local_namespace(ns, arg, force=True)590        elif task == "export":591            # Make sure ns exists before exporting.592            _init_local_namespace(ns, None, force=True)593            daemon_log('exporting "%s" to %s' % (ns, arg))594            try:595                c = pythonshare.connection(arg)596            except Exception, e:597                daemon_log('connecting to %s failed: %s' % (arg, e))598                return599            if c.export_ns(ns):600                _register_exported_namespace(ns, c)601                thread.start_new_thread(602                    _serve_connection, (c, {"kill-server-on-close": True}))603            else:604                raise ValueError('Export namespace "%s" to "%s" failed'605                                 % (ns, arg))606        elif task == "import":607            if (ns in _g_local_namespaces or608                ns in _g_remote_namespaces):609                raise ValueError('Import failed, namespace "%s" already exists'610                                 % (ns,))611            c = pythonshare.connection(arg)612            if c.import_ns(ns):613                _init_remote_namespace(ns, c, c._to_server, c._from_server)614    try:615        addrinfos = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)616        for addrinfo in addrinfos:617            daemon_log("listen: %s:%s" % (addrinfo[4][0], addrinfo[4][1]))618    except socket.error:619        daemon_log("listen: %s:%s" % (host, port))620    if isinstance(port, int):621        def wake_server_function():622            _g_waker_lock.release() # wake up server623        _g_wake_server_function = wake_server_function624        _g_waker_lock = thread.allocate_lock()625        _g_waker_lock.acquire() # unlocked626        # Start listening to the port627        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)628        try:629            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)630        except:631            pass632        s.bind((host, port))633        s.listen(4)634        event_queue = Queue.Queue()635        thread.start_new_thread(_store_return_value, (s.accept, event_queue))636        thread.start_new_thread(_store_return_value, (_g_waker_lock.acquire, event_queue))637        if not sys.stdin.closed and listen_stdin:638            daemon_log("listening to stdin")639            thread.start_new_thread(_read_lines_from_stdin, (event_queue,))640        else:641            daemon_log("not listening stdin")642        while 1:643            event = event_queue.get()644            if isinstance(event, tuple):645                # returned from s.accept646                conn, _ = event647                thread.start_new_thread(_serve_connection, (conn, conn_opts))648            elif event == True:649                # returned from _g_waker_lock.acquire650                daemon_log("shutting down.")651                break652            else:653                # returned from sys.stdin.readline654                pass655    elif port == "stdin":656        opt_debug_limit = 0657        if os.name == "nt":658            import msvcrt659            msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)660            msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)661        conn = client.Connection(sys.stdin, sys.stdout)662        _serve_connection(conn, conn_opts)663    for ns in sorted(_g_remote_namespaces.keys()):664        _drop_remote_namespace(ns)...ninja_scons_daemon.py
Source:ninja_scons_daemon.py  
...68    filemode="a",69    format="%(asctime)s %(message)s",70    level=logging.DEBUG,71)72def daemon_log(message):73    logging.debug(message)74def custom_readlines(handle, line_separator="\n", chunk_size=1):75    buf = ""76    while not handle.closed:77        data = handle.read(chunk_size)78        if not data:79            break80        buf += data.decode("utf-8")81        if line_separator in buf:82            chunks = buf.split(line_separator)83            buf = chunks.pop()84            for chunk in chunks:85                yield chunk + line_separator86        if buf.endswith("scons>>>"):87            yield buf88            buf = ""89def custom_readerr(handle, line_separator="\n", chunk_size=1):90    buf = ""91    while not handle.closed:92        data = handle.read(chunk_size)93        if not data:94            break95        buf += data.decode("utf-8")96        if line_separator in buf:97            chunks = buf.split(line_separator)98            buf = chunks.pop()99            for chunk in chunks:100                yield chunk + line_separator101def enqueue_output(out, queue):102    for line in iter(custom_readlines(out)):103        queue.put(line)104    out.close()105def enqueue_error(err, queue):106    for line in iter(custom_readerr(err)):107        queue.put(line)108    err.close()109input_q = queue.Queue()110output_q = queue.Queue()111error_q = queue.Queue()112building_cv = Condition()113error_cv = Condition()114class StateInfo:115    def __init__(self) -> None:116        self.thread_error = False117        self.finished_building = []118        self.error_nodes = []119        self.startup_failed = False120        self.startup_output = ''121        self.daemon_needs_to_shutdown = False122        self.httpd = None123shared_state = StateInfo()124def sigint_func(signum, frame):125    global shared_state126    shared_state.daemon_needs_to_shutdown = True127signal.signal(signal.SIGINT, sigint_func)128def daemon_thread_func():129    global shared_state130    try:131        args_list = args + ["--interactive"]132        daemon_log(f"Starting daemon with args: {' '.join(args_list)}")133        daemon_log(f"cwd: {os.getcwd()}")134        p = Popen(args_list, stdout=PIPE, stderr=PIPE, stdin=PIPE)135        t = threading.Thread(target=enqueue_output, args=(p.stdout, output_q))136        t.daemon = True137        t.start()138        te = threading.Thread(target=enqueue_error, args=(p.stderr, error_q))139        te.daemon = True140        te.start()141        daemon_ready = False142        143        building_node = None144        startup_complete = False145        # While scons interactive process is stil running...146        while p.poll() is None:147            # while there is scons output to process148            while True:149                try:150                    line = output_q.get(block=False, timeout=0.01)151                except queue.Empty:152                    # breaks out of the output processing loop153                    break154                else:155                    daemon_log("output: " + line.strip())156                    if not startup_complete:157                        shared_state.startup_output += line158                    if "scons: building terminated because of errors." in line:159                        error_output = ""160                        while True:161                            try:162                                error_output += error_q.get(block=False, timeout=0.01)163                            except queue.Empty:164                                break165                        shared_state.error_nodes += [{"node": building_node, "error": error_output}]166                        daemon_ready = True167                        building_node = None168                        with building_cv:169                            building_cv.notify()170                    elif line == "scons>>>":171                        shared_state.startup_output = ''172                        startup_complete = True173                        with error_q.mutex:174                            error_q.queue.clear()175                        daemon_ready = True176                        with building_cv:177                            building_cv.notify()178                        building_node = None179            # while there is input to process...180            while daemon_ready and not input_q.empty():181                try:182                    building_node = input_q.get(block=False, timeout=0.01)183                except queue.Empty:184                    break185                if "exit" in building_node:186                    daemon_log("input: " + "exit")187                    p.stdin.write("exit\n".encode("utf-8"))188                    p.stdin.flush()189                    with building_cv:190                        shared_state.finished_building += [building_node]191                    daemon_ready = False192                    shared_state.daemon_needs_to_shutdown = True193                    break194                else:195                    input_command = "build " + building_node + "\n"196                    daemon_log("input: " + input_command.strip())197                    p.stdin.write(input_command.encode("utf-8"))198                    p.stdin.flush()199                    with building_cv:200                        shared_state.finished_building += [building_node]201                    daemon_ready = False202            if shared_state.daemon_needs_to_shutdown:203                break204            time.sleep(0.01)205        # our scons process is done, make sure we are shutting down in this case206        if not shared_state.daemon_needs_to_shutdown:207            if not startup_complete:208                shared_state.startup_failed = True209            shared_state.daemon_needs_to_shutdown = True210    except Exception:211        shared_state.thread_error = True212        daemon_log("SERVER ERROR: " + traceback.format_exc())213        raise214daemon_thread = threading.Thread(target=daemon_thread_func)215daemon_thread.daemon = True216daemon_thread.start()217logging.debug(218    f"Starting request server on port {port}, keep alive: {daemon_keep_alive}"219)220keep_alive_timer = timer()221def server_thread_func():222    global shared_state223    class S(http.server.BaseHTTPRequestHandler):224        def do_GET(self):225            global shared_state226            global keep_alive_timer227            try:228                gets = parse_qs(urlparse(self.path).query)229                230                # process a request from ninja for a node for scons to build. 231                # Currently this is a serial process because scons interactive is serial232                # is it was originally meant for a real human user to be providing input233                # parallel input was never implemented.234                build = gets.get("build")235                if build:236                    keep_alive_timer = timer()237                    daemon_log(f"Got request: {build[0]}")238                    input_q.put(build[0])239                    def pred():240                        return build[0] in shared_state.finished_building241                    with building_cv:242                        building_cv.wait_for(pred)243                    for error_node in shared_state.error_nodes:244                        if error_node["node"] == build[0]:245                            self.send_response(500)246                            self.send_header("Content-type", "text/html")247                            self.end_headers()248                            self.wfile.write(error_node["error"].encode())249                            return250                    self.send_response(200)251                    self.send_header("Content-type", "text/html")252                    self.end_headers()253                    return254                # this message is used in server startup, to make sure the server launched255                # successfully. If SCons interactive got to a input prompt (scons>>>), then256                # the server is ready to start processing commands. Otherwise the server will257                # send an error response back to ninja and shut itself down.258                ready = gets.get("ready")259                if ready:260                    if shared_state.startup_failed:261                        self.send_response(500)262                        self.send_header("Content-type", "text/html")263                        self.end_headers()264                        self.wfile.write(shared_state.startup_output.encode())265                        return266                exitbuild = gets.get("exit")267                if exitbuild:268                    input_q.put("exit")269                self.send_response(200)270                self.send_header("Content-type", "text/html")271                self.end_headers()272            except Exception:273                shared_state.thread_error = True274                daemon_log("SERVER ERROR: " + traceback.format_exc())275                raise276            def log_message(self, format, *args):277                return278    socketserver.TCPServer.allow_reuse_address = True279    shared_state.httpd = socketserver.TCPServer(("127.0.0.1", port), S)280    shared_state.httpd.serve_forever()281server_thread = threading.Thread(target=server_thread_func)282server_thread.daemon = True283server_thread.start()284while (timer() - keep_alive_timer < daemon_keep_alive 285        and not shared_state.thread_error 286        and not shared_state.daemon_needs_to_shutdown):287    time.sleep(1)288if shared_state.thread_error:289    daemon_log(f"Shutting server on port {port} down because thread error.")290elif shared_state.daemon_needs_to_shutdown:291    daemon_log("Server shutting down upon request.")292else:293    daemon_log(294        f"Shutting server on port {port} down because timed out: {daemon_keep_alive}"295    )296shared_state.httpd.shutdown()297if os.path.exists(ninja_builddir / "scons_daemon_dirty"):298    os.unlink(ninja_builddir / "scons_daemon_dirty")299if os.path.exists(daemon_dir / "pidfile"):300    os.unlink(daemon_dir / "pidfile")301# Local Variables:302# tab-width:4303# indent-tabs-mode:nil304# End:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
