Best Python code snippet using localstack_python
ush.py
Source:ush.py  
"""Small shell-like wrapper around ``subprocess`` for building pipelines.

Commands are created through a :class:`Shell`, combined with ``|`` into a
:class:`Pipeline`, and run by calling them, iterating over them or converting
them to ``str``/``bytes``.
"""
import codecs
import collections
import contextlib
import errno
import glob
import os
import re
import subprocess
import sys
import types


__all__ = ('Shell', 'Command', 'InvalidPipeline', 'AlreadyRedirected',
           'ProcessError')


STDOUT = subprocess.STDOUT
PIPE = subprocess.PIPE
# Some opts have None as a valid value, so we use EMPTY as a default value when
# reading opts to determine if an option was passed.
EMPTY = object()
# Cross/platform /dev/null specifier alias
NULL = os.devnull
MAX_CHUNK_SIZE = 0xffff
GLOB_PATTERNS = re.compile(r'(?:\*|\?|\[[^\]]+\])')
GLOB_OPTS = {}


# We have python2/3 compatibility, but don't want to rely on `six` package so
# this script can be used independently.
try:
    xrange
    from Queue import Queue, Empty
    import StringIO
    StringIO = BytesIO = StringIO.StringIO

    def is_string(o):
        """Return True if `o` is a (byte or unicode) string."""
        return isinstance(o, basestring)

    to_cstr = str
    PY3 = False
except NameError:
    xrange = range
    from queue import Queue, Empty
    import io
    StringIO = io.StringIO
    BytesIO = io.BytesIO

    def is_string(o):
        """Return True if `o` is a `str` or `bytes` instance."""
        return isinstance(o, (str, bytes))

    def to_cstr(obj):
        """Return `obj` as bytes, utf-8 encoding its `str()` if needed."""
        if isinstance(obj, bytes):
            return obj
        return str(obj).encode('utf-8')

    PY3 = True
    if sys.version_info >= (3, 5):
        GLOB_OPTS = {'recursive': True}


if sys.platform == 'win32':
    import threading

    def set_extra_popen_opts(opts):
        """No extra Popen options are set on win32."""
        pass

    def concurrent_communicate(proc, read_streams):
        """Multiplex the process streams using helper threads."""
        return concurrent_communicate_with_threads(proc, read_streams)
else:
    import select
    from signal import signal, SIGPIPE, SIG_DFL
    _PIPE_BUF = getattr(select, 'PIPE_BUF', 512)

    def set_extra_popen_opts(opts):
        """Wrap any user `preexec_fn` so the child also resets SIGPIPE."""
        user_preexec_fn = opts.get('preexec_fn', None)

        def preexec_fn():
            if user_preexec_fn:
                user_preexec_fn()
            # Restore SIGPIPE default handler when forked. This is required for
            # handling pipelines correctly.
            signal(SIGPIPE, SIG_DFL)

        opts['preexec_fn'] = preexec_fn

    def concurrent_communicate(proc, read_streams):
        """Multiplex the process streams using `select`."""
        return concurrent_communicate_with_select(proc, read_streams)


class InvalidPipeline(Exception):
    """Raised by `validate_pipeline` for misplaced stdin/stdout options."""
    pass


class AlreadyRedirected(Exception):
    """Raised when redirecting a stream that a command already redirects."""
    pass


class ProcessError(Exception):
    """Raised when `raise_on_error` is set and a command exits non-zero.

    `process_info` is the list of `(argv, pid, returncode)` tuples of every
    process in the pipeline.
    """

    def __init__(self, process_info):
        msg = 'One or more commands failed: {}'.format(process_info)
        super(ProcessError, self).__init__(msg)
        self.process_info = process_info


def expand_filenames(argv, cwd):
    """Return `argv` with glob patterns expanded relative to `cwd`.

    The first item (the program) is never expanded, and neither are empty
    arguments or arguments starting with '-'. A pattern with no match expands
    to nothing.
    """
    def expand_arg(arg):
        return [os.path.relpath(p, cwd)
                for p in glob.iglob(os.path.join(cwd, arg), **GLOB_OPTS)]

    rv = [argv[0]]
    for arg in argv[1:]:
        if arg and arg[0] != '-' and GLOB_PATTERNS.search(arg):
            rv += expand_arg(arg)
        else:
            rv.append(arg)
    return rv


def update_opts_env(opts, extra_env):
    """Merge `extra_env` into a copy of `opts['env']` (modifies `opts`).

    Passing `None` as `extra_env` removes the 'env' entry instead.
    """
    if extra_env is None:
        # pop() rather than del: there may be no env to remove
        opts.pop('env', None)
        return
    env = opts.get('env', None)
    env = {} if env is None else env.copy()
    env.update(extra_env)
    opts['env'] = env


def set_environment(proc_opts):
    """Replace `proc_opts['env']` with the final child environment.

    Does nothing when no env was given. Unless `merge_env` is false, the
    environment starts from `os.environ`. Variables whose value is `None`
    are removed.
    """
    env = proc_opts.get('env', None)
    if env is None:
        return
    new_env = {}
    if proc_opts.get('merge_env', True):
        new_env.update(os.environ)
    new_env.update(env)
    # unset environment variables set to `None`
    for k in list(new_env.keys()):
        if new_env[k] is None:
            del new_env[k]
    proc_opts['env'] = new_env


def fileobj_has_fileno(fileobj):
    """Return True if calling `fileobj.fileno()` succeeds."""
    try:
        fileobj.fileno()
        return True
    except Exception:
        return False


def remove_invalid_opts(opts):
    """Return a copy of `opts` without the keys Popen does not accept."""
    new_opts = dict(opts)
    for opt in ('raise_on_error', 'merge_env', 'glob'):
        new_opts.pop(opt, None)
    return new_opts


LS = os.linesep
LS_LEN = len(LS)


def iterate_lines(chunk_iterator, trim_trailing_lf=False):
    """Turn `(bytes_chunk, stream_id)` pairs into `(line, stream_id)` pairs.

    Chunks are decoded as utf-8 and split on `os.linesep`. Text after the
    last separator of each stream is yielded once the input is exhausted; if
    `trim_trailing_lf` is true an empty final line is skipped.
    """
    remaining = {}
    decoders = {}
    for chunk, stream_id in chunk_iterator:
        decoder = decoders.get(stream_id)
        if decoder is None:
            # One incremental decoder per stream, so a multi-byte character
            # split across two chunks is decoded correctly instead of raising.
            decoder = codecs.getincrementaldecoder('utf-8')()
            decoders[stream_id] = decoder
        text = remaining.get(stream_id, '') + decoder.decode(chunk)
        pieces = text.split(LS)
        # whatever follows the last separator is an incomplete line
        remaining[stream_id] = pieces.pop()
        for line in pieces:
            yield line, stream_id
    for stream_id in remaining:
        # final=True raises if the stream ended in the middle of a character
        tail = decoders[stream_id].decode(b'', final=True)
        line = remaining[stream_id] + tail
        if line or not trim_trailing_lf:
            yield line, stream_id


def validate_pipeline(commands):
    """Raise `InvalidPipeline` if a command sets a stream the pipeline owns.

    Only the first command may set stdin and only the last may set stdout.
    """
    for index, command in enumerate(commands):
        is_first = index == 0
        is_last = index == len(commands) - 1
        if not is_first and command.opts.get('stdin', None) is not None:
            msg = (
                'Command {0} is not the first in the pipeline and has '
                'stdin set to a value different than "None"'
            ).format(command)
            raise InvalidPipeline(msg)
        if not is_last and command.opts.get('stdout', None) is not None:
            msg = (
                'Command {0} is not the last in the pipeline and has '
                'stdout set to a value different than "None"'
            ).format(command)
            raise InvalidPipeline(msg)


def wait(procs, raise_on_error):
    """Run the pipeline to completion and return a tuple of exit codes."""
    status_codes = []
    result = tuple(iterate_outputs(procs, raise_on_error, status_codes))
    assert result == ()
    return tuple(status_codes)


def iterate_outputs(procs, raise_on_error, status_codes):
    """Pump data through `procs`, yielding chunks that have no sink.

    Chunks read from a pipe with an associated sink stream are written to
    that sink; the others are yielded as `(chunk, index)`. Data for the first
    process stdin is pulled from its `stdin_stream` iterator. When done, the
    exit codes are appended to `status_codes` and, if `raise_on_error` is
    true and any code is non-zero, `ProcessError` is raised.
    """
    read_streams = [proc.stderr_stream for proc in procs if proc.stderr]
    if procs[-1].stdout:
        read_streams.append(procs[-1].stdout_stream)
    write_stream = procs[0].stdin_stream if procs[0].stdin else None
    co = communicate(procs)
    wchunk = None
    while True:
        try:
            ri = co.send(wchunk)
            if ri:
                rchunk, i = ri
                if read_streams[i]:
                    read_streams[i].write(rchunk)
                else:
                    yield ri
        except StopIteration:
            break
        try:
            wchunk = next(write_stream) if write_stream else None
        except StopIteration:
            wchunk = None
    status_codes += [proc.wait() for proc in procs]
    if raise_on_error and any(code != 0 for code in status_codes):
        process_info = [
            (proc.argv, proc.pid, proc.returncode) for proc in procs
        ]
        raise ProcessError(process_info)


def write_chunk(proc, chunk):
    """Write `chunk` to `proc.stdin`, ignoring errors of an exited reader."""
    try:
        proc.stdin.write(to_cstr(chunk))
    except IOError as e:
        if e.errno == errno.EPIPE:
            # communicate() should ignore broken pipe error
            pass
        elif (e.errno == errno.EINVAL and proc.poll() is not None):
            # Issue #19612: stdin.write() fails with EINVAL
            # if the process already exited before the write
            pass
        else:
            raise


def communicate(procs):
    """Return the generator that moves data in and out of the pipeline.

    The concurrent implementation is used when more than one pipe has to be
    serviced; otherwise a simple blocking one (or an empty stub) is enough.
    """
    # make a list of (readable streams, sinks) tuples
    read_streams = [proc.stderr for proc in procs if proc.stderr]
    if procs[-1].stdout:
        read_streams.append(procs[-1].stdout)
    writer = procs[0]
    if len(read_streams + [w for w in [writer] if w.stdin]) > 1:
        return concurrent_communicate(writer, read_streams)
    if writer.stdin or len(read_streams) == 1:
        return simple_communicate(writer, read_streams)
    else:
        return stub_communicate()


def stub_communicate():
    """Generator that finishes immediately (there are no pipes to service)."""
    return
    yield


def simple_communicate(proc, read_streams):
    """Service a single pipe: either feed stdin or read the only stream.

    When writing, chunks are received through `send()` and a falsy chunk
    closes stdin. When reading, `(chunk, 0)` tuples are yielded until EOF.
    """
    if proc.stdin:
        while True:
            chunk = yield
            if not chunk:
                break
            write_chunk(proc, chunk)
        proc.stdin.close()
    else:
        read_stream = read_streams[0]
        while True:
            chunk = read_stream.read(MAX_CHUNK_SIZE)
            if not chunk:
                break
            yield (chunk, 0)


def concurrent_communicate_with_select(proc, read_streams):
    """Service stdin and the read streams concurrently using `select`.

    Yields `(chunk, index)` for data read from `read_streams[index]` (or a
    bare `yield` when there is nothing to report) and receives the next chunk
    to write through `send()`; receiving `None` closes stdin.
    """
    reading = [] + read_streams
    writing = [proc.stdin] if proc.stdin else []
    indexes = dict((r.fileno(), i) for i, r in enumerate(read_streams))
    write_queue = collections.deque()
    while reading or writing:
        try:
            rlist, wlist, xlist = select.select(reading, writing, [])
        except select.error as e:
            if e.args[0] == errno.EINTR:
                continue
            raise
        for rstream in rlist:
            rchunk = os.read(rstream.fileno(), MAX_CHUNK_SIZE)
            if not rchunk:
                rstream.close()
                reading.remove(rstream)
                continue
            write_queue.append((yield rchunk, indexes[rstream.fileno()]))
        if not write_queue:
            write_queue.append((yield))
        if not wlist:
            continue
        while write_queue:
            wchunk = write_queue.popleft()
            if wchunk is None:
                assert not write_queue
                writing = []
                proc.stdin.close()
                break
            wchunk = to_cstr(wchunk)
            chunk = wchunk[:_PIPE_BUF]
            if len(wchunk) > _PIPE_BUF:
                write_queue.appendleft(wchunk[_PIPE_BUF:])
            try:
                written = os.write(proc.stdin.fileno(), chunk)
            except OSError as e:
                if e.errno != errno.EPIPE:
                    raise
                writing = []
                proc.stdin.close()
                # stdin is closed now: stop draining the queue, otherwise the
                # next os.write() would fail on the closed file object
                break
            else:
                if len(chunk) > written:
                    write_queue.appendleft(chunk[written:])
                    # break so we wait for the pipe buffer to be drained
                    break


def concurrent_communicate_with_threads(proc, read_streams):
    """Service stdin and the read streams concurrently using threads.

    Follows the same generator protocol as
    `concurrent_communicate_with_select`: one thread per read stream feeds a
    queue, and one thread writes the chunks received through `send()`.
    """
    def read(queue, read_stream, index):
        while True:
            chunk = read_stream.read(MAX_CHUNK_SIZE)
            if not chunk:
                break
            queue.put((chunk, index))
        # (None, index) marks EOF on this stream
        queue.put((None, index))

    def write(queue, proc):
        while True:
            chunk = queue.get()
            if not chunk:
                break
            write_chunk(proc, chunk)
        proc.stdin.close()

    def start_thread(target, args):
        thread = threading.Thread(target=target, args=args)
        thread.daemon = True
        thread.start()
        return thread

    rqueue = Queue(maxsize=1)
    wqueue = Queue()
    threads = []
    for i, rs in enumerate(read_streams):
        threads.append(start_thread(read, (rqueue, rs, i)))
    if proc.stdin:
        threads.append(start_thread(write, (wqueue, proc)))
    writing = True
    reading = len(read_streams)
    while writing or reading or rqueue.qsize():
        if reading or rqueue.qsize():
            try:
                rchunk, index = rqueue.get(block=not writing)
                if rchunk:
                    wchunk = yield rchunk, index
                else:
                    reading -= 1
                    continue
            except Empty:
                wchunk = yield
        else:
            wchunk = yield
        if writing:
            wqueue.put(wchunk)
            writing = wchunk is not None
    for t in threads:
        t.join()


def setup_redirect(proc_opts, key):
    """Prepare `proc_opts[key]` for Popen and return `(stream, opened)`.

    `stream` is the python-side source/sink that has to be serviced through a
    pipe (or None), and `opened` tells whether a file was opened here and
    should be closed by the caller after the process is spawned.
    """
    stream = proc_opts.get(key, None)
    if stream in (None, STDOUT, PIPE) or fileobj_has_fileno(stream):
        # Simple case which will be handled automatically by Popen: stream is
        # STDOUT/PIPE or a file object backed by file.
        return None, False
    if is_string(stream):
        # stream is a string representing a filename, we'll open the file with
        # appropriate mode which will be set to proc_opts[key].
        if key == 'stdin':
            proc_opts[key] = open(stream, 'rb')
        else:
            if stream.endswith('+'):
                proc_opts[key] = open(stream[:-1], 'ab')
                # On MS Windows we need to explicitly set the file position to
                # the end or the file contents will be replaced.
                proc_opts[key].seek(0, os.SEEK_END)
            else:
                proc_opts[key] = open(stream, 'wb')
        return None, True
    if key == 'stdin':
        if hasattr(stream, 'read'):
            # replace with an iterator that yields data in up to 64k chunks.
            # This is done to avoid the yield-by-line logic when iterating
            # file-like objects that contain binary data.
            stream = fileobj_to_iterator(stream)
        elif hasattr(stream, '__iter__'):
            stream = iter(stream)
    proc_opts[key] = PIPE
    return stream, False


def fileobj_to_iterator(fobj):
    """Return an iterator over `fobj` in chunks of up to MAX_CHUNK_SIZE."""
    def iterator():
        while True:
            data = fobj.read(MAX_CHUNK_SIZE)
            if not data:
                break
            yield data
    return iterator()


def echo(s):
    """Wrap the string `s` in an in-memory file object."""
    if isinstance(s, str):
        return StringIO(s)
    else:
        assert isinstance(s, bytes)
        return BytesIO(s)


class RunningProcess(object):
    """A spawned `Popen` plus the python-side streams connected to it."""

    def __init__(self, popen, stdin_stream, stdout_stream, stderr_stream,
                 argv):
        self.popen = popen
        self.stdin_stream = stdin_stream
        self.stdout_stream = stdout_stream
        self.stderr_stream = stderr_stream
        self.argv = argv

    @property
    def returncode(self):
        return self.popen.returncode

    @property
    def stdin(self):
        return self.popen.stdin

    @property
    def stdout(self):
        return self.popen.stdout

    @property
    def stderr(self):
        return self.popen.stderr

    @property
    def pid(self):
        return self.popen.pid

    def wait(self):
        return self.popen.wait()

    def poll(self):
        return self.popen.poll()


class Shell(object):
    """Factory of `Command` objects sharing defaults, env and cwd stacks."""

    def __init__(self, **defaults):
        self.aliases = {}
        self.envstack = []
        self.dirstack = []
        if 'env' in defaults:
            self.envstack.append(defaults['env'])
            del defaults['env']
        if 'cwd' in defaults:
            self.dirstack.append(defaults['cwd'])
            del defaults['cwd']
        self.defaults = defaults
        self.echo = echo

    def __call__(self, *argvs, **opts):
        """Create one `Command` per item of `argvs`.

        String items are looked up in the aliases first. A single `Command`
        is returned for one argument, a list of them otherwise.
        """
        rv = []
        for argv in argvs:
            if is_string(argv):
                argv = self.aliases.get(argv, argv)
                if is_string(argv):
                    argv = [argv]
            rv.append(Command(argv, shell=self, **opts))
        return rv[0] if len(rv) == 1 else rv

    @contextlib.contextmanager
    def setenv(self, env):
        """Context manager that pushes `env` on the environment stack."""
        self.envstack.append(env)
        try:
            yield
        finally:
            # always unwind, even if the body of the `with` block raised
            e = self.envstack.pop()
        assert e == env

    @contextlib.contextmanager
    def chdir(self, path):
        """Context manager that pushes `path` on the directory stack.

        A relative `path` is joined with the directory currently on top of
        the stack, if there is one.
        """
        path = str(path)  # allow pathlib.Path instances
        if not os.path.isabs(path) and self.dirstack:
            # not absolute path, consider the current stack and join with the
            # last path
            path = os.path.normpath(os.path.join(self.dirstack[-1], path))
        self.dirstack.append(path)
        try:
            yield
        finally:
            # always unwind, even if the body of the `with` block raised
            p = self.dirstack.pop()
        assert p == path

    def alias(self, **aliases):
        """Register aliases: keyword name -> program name (or argv)."""
        self.aliases.update(aliases)

    def export_as_module(self, module_name, full_name=False):
        """Register a `ShellModule` for this shell in `sys.modules`.

        With `full_name` the module is registered exactly as `module_name`.
        Otherwise it is registered as a submodule of this module and also
        stored in this module globals under `module_name`.
        """
        if full_name:
            sys.modules[module_name] = ShellModule(self, module_name)
            return
        if module_name in globals():
            raise Exception('Name "{}" is already taken'.format(module_name))
        full_module_name = __name__ + '.' + module_name
        module = ShellModule(self, full_module_name)
        sys.modules[full_module_name] = module
        globals()[module_name] = module


class ShellModule(types.ModuleType):
    """Module object whose attributes resolve to commands of a `Shell`."""

    def __init__(self, shell, name):
        self.__shell = shell
        self.__file__ = '<frozen>'
        self.__name__ = name
        self.__package__ = __package__
        self.__loader__ = None

    def __repr__(self):
        return repr(self.__shell)

    def __getattr__(self, name):
        if name.startswith('__'):
            # Missing dunder attributes must not become commands. The base
            # class defines no __getattr__, so raise explicitly.
            raise AttributeError(name)
        attr = getattr(self.__shell, name, None)
        if attr and name != 'export_as_module':
            return attr
        return self.__shell(name)


class PipelineBasePy3(object):
    def __bytes__(self):
        return self._collect_output()

    def __str__(self):
        return bytes(self).decode('utf-8')


class PipelineBasePy2(object):
    def __str__(self):
        return self._collect_output()

    def __unicode__(self):
        return str(self).decode('utf-8')


class Pipeline(PipelineBasePy3 if PY3 else PipelineBasePy2):
    """A sequence of commands connected stdout-to-stdin."""

    def __init__(self, commands):
        validate_pipeline(commands)
        self.commands = commands

    def __repr__(self):
        return ' | '.join((repr(c) for c in self.commands))

    def __or__(self, other):
        """Append a `Command`, or redirect stdout to a file object/filename."""
        if isinstance(other, Shell):
            return other(self)
        elif hasattr(other, 'write') or is_string(other):
            return Pipeline(self.commands[:-1] +
                            [self.commands[-1]._redirect('stdout', other)])
        assert isinstance(other, Command)
        return Pipeline(self.commands + [other])

    def __ror__(self, other):
        """Redirect stdin of the first command from an iterable or string."""
        if hasattr(other, '__iter__') or is_string(other):
            return Pipeline([self.commands[0]._redirect('stdin', other)] +
                            self.commands[1:])
        assert False, "Invalid"

    def __call__(self):
        """Run the pipeline and return the tuple of exit codes."""
        procs, raise_on_error = self._spawn()
        return wait(procs, raise_on_error)

    def __iter__(self):
        return self._iter(False)

    def _iter(self, raw):
        """Run the pipeline yielding its piped output.

        Lines are yielded unless `raw` is true, in which case the chunks are
        yielded as read. With more than one pipe, tuples with one slot per
        pipe are yielded, all `None` except the slot of the originating pipe.
        """
        pipeline = Pipeline(self.commands[:-1] +
                            [self.commands[-1]._redirect('stdout', PIPE)])
        procs, raise_on_error = pipeline._spawn()
        pipe_count = sum(1 for proc in procs if proc.stderr)
        if procs[-1].stdout:
            pipe_count += 1
        if not pipe_count:
            wait(procs, raise_on_error)
            # nothing to yield
            return
        iterator = iterate_outputs(procs, raise_on_error, [])
        if not raw:
            iterator = iterate_lines(iterator, trim_trailing_lf=True)
        if pipe_count == 1:
            for line, stream_index in iterator:
                yield line
        else:
            for line, stream_index in iterator:
                yield tuple(line if stream_index == index else None
                            for index in xrange(pipe_count))

    def _collect_output(self):
        """Run the pipeline and return everything written to stdout."""
        sink = BytesIO()
        (self | sink)()
        return sink.getvalue()

    def _spawn(self):
        """Start every process; return `(procs, raise_on_error)`.

        `raise_on_error` is true if any command requested it.
        """
        procs = []
        raise_on_error = False
        for index, command in enumerate(self.commands):
            close_in = False
            close_out = False
            close_err = False
            is_first = index == 0
            is_last = index == len(self.commands) - 1
            stdin_stream = None
            stdout_stream = None
            stderr_stream = None
            # copy argv/opts
            proc_argv = [str(a) for a in command.argv]
            proc_opts = command.copy_opts()
            raise_on_error = raise_on_error or proc_opts.get('raise_on_error',
                                                             False)
            if is_first:
                # first command in the pipeline may redirect stdin
                stdin_stream, close_in = setup_redirect(proc_opts, 'stdin')
            else:
                # only set current process stdin if it is not the first in the
                # pipeline.
                proc_opts['stdin'] = procs[-1].stdout
            if is_last:
                # last command in the pipeline may redirect stdout
                stdout_stream, close_out = setup_redirect(proc_opts, 'stdout')
            else:
                # only set current process stdout if it is not the last in the
                # pipeline.
                proc_opts['stdout'] = PIPE
            # stderr may be set at any point in the pipeline
            stderr_stream, close_err = setup_redirect(proc_opts, 'stderr')
            set_extra_popen_opts(proc_opts)
            if proc_opts.get('glob', False):
                proc_argv = expand_filenames(
                    proc_argv, os.path.realpath(
                        proc_opts.get('cwd', os.curdir)))
            set_environment(proc_opts)
            current_proc = RunningProcess(
                subprocess.Popen(proc_argv, **remove_invalid_opts(proc_opts)),
                stdin_stream, stdout_stream, stderr_stream, proc_argv
                )
            # if files were opened and connected to the process stdio, close
            # our copies of the descriptors
            if close_in:
                proc_opts['stdin'].close()
            if close_out:
                proc_opts['stdout'].close()
            if close_err:
                proc_opts['stderr'].close()
            if not is_first:
                # close our copy of the previous process's stdout, now that it
                # is connected to the current process's stdin
                procs[-1].stdout.close()
            procs.append(current_proc)
        return procs, raise_on_error

    def iter_raw(self):
        return self._iter(True)


class Command(object):
    """An immutable argv plus options, bound to the `Shell` that made it."""

    OPTS = ('stdin', 'stdout', 'stderr', 'env', 'cwd', 'preexec_fn',
            'raise_on_error', 'merge_env', 'glob')

    def __init__(self, argv, shell=None, **opts):
        self.argv = tuple(argv)
        self.shell = shell
        self.opts = {}
        for key in opts:
            if key not in Command.OPTS:
                raise TypeError('Invalid keyword argument "{}"'.format(key))
            value = opts[key]
            if key == 'cwd' and value is not None:
                value = str(value)  # allow pathlib.Path instances
            self.opts[key] = value

    def __call__(self, *argv, **opts):
        """Run the command, or derive a new one.

        Without arguments the command is executed and its exit codes are
        returned. Otherwise a new `Command` is returned with `argv` appended
        and `opts` applied; 'env' is merged into the current env (`None`
        drops it) and keys not in `OPTS` are ignored.
        """
        if not argv and not opts:
            # invoke the command
            return Pipeline([self])()
        new_opts = self.opts.copy()
        if 'env' in opts:
            update_opts_env(new_opts, opts['env'])
            del opts['env']
        for key in Command.OPTS:
            if key in opts:
                new_opts[key] = opts[key]
        return Command(self.argv + argv, shell=self.shell, **new_opts)

    def __repr__(self):
        # argv items may be non-strings (they are only converted when the
        # process is spawned), so convert them before joining
        argstr = ' '.join(str(a) for a in self.argv)
        optstr = ' '.join(
            '{}={}'.format(key, self.get_opt(key))
            for key in self.iter_opts() if self.get_opt(key, None) is not None
            )
        if optstr:
            return '{0} ({1})'.format(argstr, optstr)
        else:
            return argstr

    def __str__(self):
        return str(Pipeline([self]))

    def __bytes__(self):
        return bytes(Pipeline([self]))

    def __unicode__(self):
        return unicode(Pipeline([self]))

    def __iter__(self):
        return iter(Pipeline([self]))

    def __or__(self, other):
        return Pipeline([self]) | other

    def __ror__(self, other):
        return other | Pipeline([self])

    def _redirect(self, key, stream):
        """Return a copy with `key` redirected; it must not be set yet."""
        if self.get_opt(key, None) is not None:
            raise AlreadyRedirected('command already redirects ' + key)
        return self(**{key: stream})

    def iter_raw(self):
        return Pipeline([self]).iter_raw()

    def get_env(self):
        """Return the shell env stack merged with this command env.

        Returns `None` when neither provides any environment.
        """
        own_env = self.opts.get('env', None)
        if not self.shell.envstack and own_env is None:
            return None
        env = {}
        for e in self.shell.envstack:
            env.update(e)
        if own_env is not None:
            env.update(own_env)
        return env

    def get_cwd(self):
        """Return this command cwd, else the top of the shell dir stack."""
        cwd = self.opts.get('cwd', None)
        if cwd:
            return cwd
        if self.shell.dirstack:
            return self.shell.dirstack[-1]
        return None

    def get_opt(self, opt, default=None):
        """Return option `opt`, falling back to the shell defaults."""
        if opt == 'env':
            return self.get_env()
        if opt == 'cwd':
            return self.get_cwd()
        rv = self.opts.get(opt, EMPTY)
        if rv is EMPTY:
            rv = self.shell.defaults.get(opt, EMPTY)
        if rv is EMPTY:
            return default
        return rv

    def iter_opts(self):
        """Return the set of option names that may apply to this command."""
        return set(list(self.opts.keys()) + list(self.shell.defaults.keys()) +
                   ['cwd', 'env'])

    def copy_opts(self):
        """Return a dict with the effective value of every non-None option."""
        rv = {}
        for opt in self.iter_opts():
            val = self.get_opt(opt)
            if val is not None:
                rv[opt] = val
        return rv


builtin_sh = Shell(raise_on_error=True)
builtin_sh.alias(
    apt_cache='apt-cache',
    apt_get='apt-get',
    apt_key='apt-key',
    dpkg_divert='dpkg-divert',
    grub_install='grub-install',
    grub_mkconfig='grub-mkconfig',
    locale_gen='locale-gen',
    mkfs_ext2='mkfs.ext2',
    mkfs_ext3='mkfs.ext3',
    mkfs_ext4='mkfs.ext4',
    mkfs_vfat='mkfs.vfat',
    qemu_img='qemu-img',
    repo_add='repo-add',
    update_grub='update-grub',
    update_initramfs='update-initramfs',
    update_locale='update-locale',
    )
# NOTE(review): the listing ends with "..." at this point, so the module
# presumably continues beyond what is shown here.
Source:webenv.py  
# 1 day in development.
import gc
import sys
import asyncio

import numpy as np


continue_on_errors = False


def js_executor(code): # Returns a Bash command that will run `code` in Node.js.
    # Note: shlex.quote(code) quotes improperly on Windows, so, we quote manually.
    code = '"' + code.replace('\\', '\\\\').replace('"', '\\"') + '"'
    import shutil
    if shutil.which('nodejs') is not None:
        return 'nodejs -e ' + code
    return 'node -e ' + code


def webenv(agent, *interfaces, int_size=0, webenv_path='webenv', js_executor=js_executor):
    """
    A Python wrapper for creating and connecting to a local Web environment.
    Pass in the agent and all the interfaces. This will loop infinitely.

    (This does not have an OpenAI-Gym-like interface, because that makes asynchronicity less natural to implement, and assumes that sizes are static.)

    Arguments:
    - `agent`: an async function, from observations and the recommended action length (a number), to a tuple of predictions and actions, all NumPy arrays and -1…1|NaN unless specified.
    For throughput, immediately send commands to another device, and return an `await`able Future.
    To stop this web env, `raise` an exception.
    NOTE(review): the call site below is `agent(read_lock, indices, obs, act_len)`, and this function only resolves `read_lock` itself when a step fails, so presumably the agent is expected to resolve `read_lock` once it has consumed the observations. Confirm against the agents in use.
    - `interfaces`: a list of either strings (which are put as-is as JS code, where `we` is the webenv module) or structured args.
    Args are a convenience: numbers and bools and strings are put as-is (JS strings must be quoted again), arrays become function calls (with the first string item being the unescaped function to call), dicts become objects.
    - `int_size`: increase throughput at the cost of precision. `0` communicates through float32, `1` through int8, `2` through int16. Do not specify `we.io()` manually.
    - `webenv_path`: what the generated JS should `require`. `'webenv'` by default.
    - `js_executor`: the function from generated JS to the executed system command; escape quotes manually. Uses NodeJS directly by default.

    Example:

    >>> import webenv
    >>> webenv.webenv(lambda x: x, '"https://www.youtube.com/watch?v=dQw4w9WgXcQ"', 'we.defaults', ['we.randomAgent'])
    """
    if not callable(agent):
        raise TypeError('Agent must be a function')
    if int_size != 0 and int_size != 1 and int_size != 2:
        raise TypeError('Int size must be 0 (float32) or 1 (int8) or 2 (int16)')
    code = _js_code_for_interfaces(interfaces, webenv_path)
    cmd = js_executor(code)
    prev_write = [None] # A lock, to only do one write at a time.
    prev_flush_info = [None] # A lock on flushes. (Effectively unused.)
    read_streams = {} # index → asyncio.Queue
    max_index = 0
    # Strong references to running tasks: the event loop only keeps weak
    #   references, so an unreferenced task may be garbage-collected mid-run.
    tasks = set()

    def spawn(coro):
        task = asyncio.create_task(coro)
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    async def read(reader):
        # Receive index & observations & action-length packets, and put it into `read_streams`.
        nonlocal max_index
        while True:
            index = await _read_u32(reader)
            obs = _decode(await _read_data(reader, int_size))
            # Bug: if there are too few observations (<4095), this fails to read `obs`'s length correctly.
            #   Just provide more observations, why fix it.
            act_len = await _read_u32(reader)
            if index not in read_streams:
                read_streams[index] = asyncio.Queue(64)
            if index > max_index: max_index = index
            await read_streams[index].put((obs, act_len))
            # One token in the 'any' queue per item queued in any stream.
            read_streams['any'].put_nowait(None)

    async def step(writer, read_lock):
        # Read from `read_streams`, call `agent`, and write what we did.
        try:
            indices, obs, act_len = [], [], []
            while True:
                await asyncio.sleep(.001)
                while True:
                    # Wait for more data to arrive, distributing computation better.
                    #   All these runtime adaptations at each processing step are hacky.
                    #   Maybe if we gave feedback on how much each stream is processed to WebEnv, it could optimize throughput better?
                    if all(q.qsize() >= 1 for (i,q) in read_streams.items() if i != 'any'): break
                    if any(q.qsize() >= 2 for (i,q) in read_streams.items() if i != 'any'): break
                    await asyncio.sleep(.01)
                for i in range(max_index+1):
                    if i not in read_streams: continue
                    queue = read_streams[i]
                    if queue.empty(): continue # Only process available data.
                    item = queue.get_nowait()
                    read_streams['any'].get_nowait()
                    if item[1] == 0xFFFFFFFF: continue # Just ignore dealloc events.
                    indices.append([i])
                    obs.append(item[0])
                    act_len.append(item[1])
                if len(obs): break
                # If all streams are empty, wait for the next item.
                await read_streams['any'].get()
                read_streams['any'].put_nowait(None)
            indices = np.array(indices, dtype=np.int64)
            preds, acts = await agent(read_lock, indices, obs, act_len)
            prevW = prev_write[0]
            nextW = prev_write[0] = asyncio.Future()
            try:
                _write_all(writer, int_size, indices, preds, acts)
                # await _flush(writer, prev_flush_info) # Apparently, `asyncio`'s `.drain()` cannot be trusted to return. Maybe it's because we turned off buffering.
                if asyncio.isfuture(prevW): await prevW # Ensure linear ordering of writes.
            finally:
                # Always release the write lock, else a failed write would make
                #   every later step wait on this future forever.
                nextW.set_result(None)
        except Exception as err:
            # The lock may already be resolved (e.g. by the agent); resolving it
            #   twice would raise InvalidStateError and hide the real error.
            if not read_lock.done():
                read_lock.set_result(None)
            if not continue_on_errors: raise
            print(err)

    async def steps(cmd):
        P = asyncio.subprocess.PIPE
        proc = await asyncio.create_subprocess_shell(cmd, stdin=P, stdout=P)
        # Turn off buffering. (Only 5 writes per message, so a buffer might not help much anyway.)
        proc.stdin.transport.set_write_buffer_limits(0, 0)
        reader, writer = proc.stdout, proc.stdin
        _write_u32(writer, 0x01020304)
        _write_u32(writer, int_size)
        await _flush(writer, prev_flush_info)
        counter = 0
        read_streams['any'] = asyncio.Queue()
        spawn(read(reader))
        while True:
            try:
                read_lock = asyncio.Future()
                spawn(step(writer, read_lock))
                if counter % 1000 == 0:
                    gc.collect()
                await read_lock
                counter = counter + 1
            except Exception as err:
                if not continue_on_errors: raise
                print(err)

    asyncio.run(steps(cmd))


async def _read_n(stream, n):
    return await stream.readexactly(n)


async def _read_u32(stream):
    raw = await _read_n(stream, 4)
    return int.from_bytes(raw, sys.byteorder)


async def _read_data(stream, int_size = 0):
    # Length then data.
    count = await _read_u32(stream)
    byte_count = count*4 if int_size == 0 else count*int_size
    raw = await _read_n(stream, byte_count)
    dtype = np.float32 if int_size == 0 else np.int8 if int_size == 1 else np.int16
    return np.frombuffer(raw, dtype)


def _write_u32(stream, x):
    stream.write(x.to_bytes(4, sys.byteorder))


def _write_data(stream, data):
    # Length then data. Don't forget to flush afterwards.
    _write_u32(stream, data.size)
    stream.write(data.tobytes())


def _write_all(stream, int_size, indices, preds, acts):
    # indices/pred/act equal-size lists (or int64 NumPy array, for indices).
    for i, pred in enumerate(preds):
        index, act = indices[i], acts[i]
        if pred.dtype != np.float32 or act.dtype != np.float32:
            raise TypeError('Predictions & actions must be float32 arrays')
        _write_u32(stream, int(index.item()))
        _write_data(stream, _encode(pred, int_size))
        _write_data(stream, _encode(act, int_size))


async def _flush(stream, prev_flush):
    # `stream.drain()` can only be called one at a time, so we await the previous flush.
    prev = prev_flush[0]
    fut = prev_flush[0] = asyncio.Future()
    try:
        if prev is not None: await prev
        await stream.drain()
    finally:
        # Resolve even on failure, so that the next flush is not blocked forever.
        fut.set_result(None)


def _decode(ints):
    # float32 passes through; int8/int16 are scaled back to -1…1, with the lowest value meaning NaN.
    if ints.dtype == np.float32:
        return ints
    scale = 127 if ints.dtype == np.int8 else 32767
    nanValue = -scale-1
    x = ints.astype(np.float32)
    return np.where(x == nanValue, np.nan, x / scale)


def _encode(floats, int_size = 0):
    # The inverse of `_decode`: clip to -1…1, scale, round; NaN becomes the lowest value.
    if int_size == 0:
        return floats
    scale = 127 if int_size == 1 else 32767
    nanValue = -scale-1
    rounded = np.where(np.isnan(floats), nanValue, np.rint(np.clip(floats, -1, 1) * scale))
    dtype = np.int8 if int_size == 1 else np.int16
    return rounded.astype(dtype)


def _js_code_for_interfaces(inters, webenv_path):
    # Each interface is followed by a comma, including the last one.
    parts = ["const we = require('" + webenv_path + "');", "we.init(we.io(),"]
    for i in inters:
        parts.append((i if isinstance(i, str) else _js_code_for_args(i)) + ',')
    parts.append(")")
    return ''.join(parts)


def _js_code_for_args(a):
    if isinstance(a, int) or isinstance(a, float) or isinstance(a, str):
        return str(a)
if isinstance(a, bool):183        return 'true' if a else 'false'184    if isinstance(a, list):185        if not isinstance(a[0], str):186            raise TypeError('Interfaces can only call JS-string functions')187        return a[0] + "(" + ",".join([_js_code_for_args(x) for x in a[1:]]) + ")"188    if isinstance(a, dict):189        return "{" + ",".join([k + ":" + _js_code_for_args(a[k]) for k in a]) + "}"...__init__.py
Source:__init__.py  
1from cffi import FFI2import io3import itertools4ffibuilder = FFI()5ffibuilder.cdef(r"""6extern "Python" size_t _py_fread(char* ptr, size_t size, size_t count, unsigned stream);7uint64_t _xxh3_stream_int(unsigned stream);8uint64_t _xxh3_int(unsigned stream, unsigned size);9""")10ffibuilder.set_source(11    "_xxh3_cffi",12    r'''13#include <assert.h>14#include "xxh3.h"15#define BUFFER_SIZE 102416#define ASSERT(cond, ...) if (!(cond)) { fprintf(stderr, ##__VA_ARGS__); exit(1); }17static size_t _py_fread(char* ptr, size_t size, size_t count, unsigned stream);18uint64_t _xxh3_int(unsigned stream, unsigned size) {19    char buffer[size];20    size_t read_bytes = _py_fread(buffer, 1, size, stream);21    ASSERT(read_bytes == size, "didnt read enought bytes\n");22    return XXH3_64bits(buffer, size);23}24uint64_t _xxh3_stream_int(unsigned stream) {25    size_t read_bytes;26    XXH3_state_t state;27    char buffer[BUFFER_SIZE];28    ASSERT(XXH3_64bits_reset(&state) != XXH_ERROR, "xxh3 reset failed\n");29    while (1) {30        read_bytes = _py_fread(buffer, 1, BUFFER_SIZE, stream);31        ASSERT(XXH3_64bits_update(&state, buffer, read_bytes) != XXH_ERROR, "xxh3 update failed\n");32        if (BUFFER_SIZE != read_bytes)33            break;34    }35    return XXH3_64bits_digest(&state);36}37    ''',38    sources=['xxh3/xxhash.c'],39    include_dirs=['xxh3'],40    extra_compile_args=['-Wall', '-O3',  '-march=native', '-mtune=native'],41)42try:43    from _xxh3_cffi import ffi, lib44except:45    pass46else:47    read_i = itertools.count(0)48    read_streams = {}49    @ffi.def_extern()50    def _py_fread(ptr, size, count, stream):51        size *= count52        val = read_streams[stream].read(size)53        read_size = len(val)54        ffi.memmove(ptr, val, read_size)55        return read_size56    def oneshot_int(val):57        assert len(val) <= 1024, 'to hash large values use the streaming interface'58        in_file = next(read_i)59        read_streams[in_file] = 
io.BytesIO(val)60        val = lib._xxh3_int(in_file, len(val))61        del read_streams[in_file]62        return val63        return lib._xxh3_int(val, len(val))64    def oneshot_hex(val):65        return hex(oneshot_int(val))[2:].rjust(16, '0')66    def stream_int(reader):67        in_file = next(read_i)68        read_streams[in_file] = reader69        val = lib._xxh3_stream_int(in_file)70        del read_streams[in_file]71        return val72    def stream_hex(reader):73        return hex(stream_int(reader))[2:].rjust(16, '0')74if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
