How to use the read_streams method in LocalStack

Best Python code snippets using localstack_python

ush.py

Source: ush.py (GitHub)


import collections
import contextlib
import errno
import glob
import os
import re
import subprocess
import sys
import types

__all__ = ('Shell', 'Command', 'InvalidPipeline', 'AlreadyRedirected',
           'ProcessError')

STDOUT = subprocess.STDOUT
PIPE = subprocess.PIPE
# Some opts have None as a valid value, so we use EMPTY as a default value when
# reading opts to determine if an option was passed.
EMPTY = object()
# Cross-platform /dev/null specifier alias
NULL = os.devnull
MAX_CHUNK_SIZE = 0xffff
GLOB_PATTERNS = re.compile(r'(?:\*|\?|\[[^\]]+\])')
GLOB_OPTS = {}

# We have python2/3 compatibility, but don't want to rely on the `six` package
# so this script can be used independently.
try:
    xrange
    from Queue import Queue, Empty
    import StringIO
    StringIO = BytesIO = StringIO.StringIO

    def is_string(o):
        return isinstance(o, basestring)

    to_cstr = str
    PY3 = False
except NameError:
    xrange = range
    from queue import Queue, Empty
    import io
    StringIO = io.StringIO
    BytesIO = io.BytesIO

    def is_string(o):
        return isinstance(o, str) or isinstance(o, bytes)

    def to_cstr(obj):
        if isinstance(obj, bytes):
            return obj
        return str(obj).encode('utf-8')

    PY3 = True
    if sys.version_info >= (3, 5):
        GLOB_OPTS = {'recursive': True}

if sys.platform == 'win32':
    import threading

    def set_extra_popen_opts(opts):
        pass

    def concurrent_communicate(proc, read_streams):
        return concurrent_communicate_with_threads(proc, read_streams)
else:
    import select
    from signal import signal, SIGPIPE, SIG_DFL

    _PIPE_BUF = getattr(select, 'PIPE_BUF', 512)

    def set_extra_popen_opts(opts):
        user_preexec_fn = opts.get('preexec_fn', None)

        def preexec_fn():
            if user_preexec_fn:
                user_preexec_fn()
            # Restore SIGPIPE default handler when forked. This is required
            # for handling pipelines correctly.
            signal(SIGPIPE, SIG_DFL)

        opts['preexec_fn'] = preexec_fn

    def concurrent_communicate(proc, read_streams):
        return concurrent_communicate_with_select(proc, read_streams)


class InvalidPipeline(Exception):
    pass


class AlreadyRedirected(Exception):
    pass


class ProcessError(Exception):
    def __init__(self, process_info):
        msg = 'One or more commands failed: {}'.format(process_info)
        super(ProcessError, self).__init__(msg)
        self.process_info = process_info


def expand_filenames(argv, cwd):
    def expand_arg(arg):
        return [os.path.relpath(p, cwd)
                for p in glob.iglob(os.path.join(cwd, arg), **GLOB_OPTS)]
    rv = [argv[0]]
    for arg in argv[1:]:
        if arg and arg[0] != '-' and GLOB_PATTERNS.search(arg):
            rv += expand_arg(arg)
        else:
            rv.append(arg)
    return rv


def update_opts_env(opts, extra_env):
    if extra_env is None:
        del opts['env']
        return
    env = opts.get('env', None)
    if env is None:
        env = {}
    else:
        env = env.copy()
    env.update(extra_env)
    opts['env'] = env


def set_environment(proc_opts):
    env = proc_opts.get('env', None)
    if env is None:
        return
    new_env = {}
    if proc_opts.get('merge_env', True):
        new_env.update(os.environ)
    new_env.update(env)
    # unset environment variables set to `None`
    for k in list(new_env.keys()):
        if new_env[k] is None:
            del new_env[k]
    proc_opts['env'] = new_env


def fileobj_has_fileno(fileobj):
    try:
        fileobj.fileno()
        return True
    except Exception:
        return False


def remove_invalid_opts(opts):
    new_opts = {}
    new_opts.update(opts)
    for opt in ('raise_on_error', 'merge_env', 'glob'):
        if opt in new_opts:
            del new_opts[opt]
    return new_opts


LS = os.linesep
LS_LEN = len(LS)


def iterate_lines(chunk_iterator, trim_trailing_lf=False):
    remaining = {}
    for chunk, stream_id in chunk_iterator:
        chunk = remaining.get(stream_id, '') + chunk.decode('utf-8')
        last_ls_index = -LS_LEN
        while True:
            start = last_ls_index + LS_LEN
            try:
                ls_index = chunk.index(LS, start)
            except ValueError:
                remaining[stream_id] = chunk[last_ls_index + LS_LEN:]
                break
            yield chunk[start:ls_index], stream_id
            remaining[stream_id] = chunk[ls_index + LS_LEN:]
            last_ls_index = ls_index
    for stream_id in remaining:
        line = remaining[stream_id]
        if line or not trim_trailing_lf:
            yield line, stream_id


def validate_pipeline(commands):
    for index, command in enumerate(commands):
        is_first = index == 0
        is_last = index == len(commands) - 1
        if not is_first and command.opts.get('stdin', None) is not None:
            msg = (
                'Command {0} is not the first in the pipeline and has '
                'stdin set to a value different than "None"'
            ).format(command)
            raise InvalidPipeline(msg)
        if not is_last and command.opts.get('stdout', None) is not None:
            msg = (
                'Command {0} is not the last in the pipeline and has '
                'stdout set to a value different than "None"'
            ).format(command)
            raise InvalidPipeline(msg)


def wait(procs, raise_on_error):
    status_codes = []
    result = tuple(iterate_outputs(procs, raise_on_error, status_codes))
    assert result == ()
    return tuple(status_codes)


def iterate_outputs(procs, raise_on_error, status_codes):
    read_streams = [proc.stderr_stream for proc in procs if proc.stderr]
    if procs[-1].stdout:
        read_streams.append(procs[-1].stdout_stream)
    write_stream = procs[0].stdin_stream if procs[0].stdin else None
    co = communicate(procs)
    wchunk = None
    while True:
        try:
            ri = co.send(wchunk)
            if ri:
                rchunk, i = ri
                if read_streams[i]:
                    read_streams[i].write(rchunk)
                else:
                    yield ri
        except StopIteration:
            break
        try:
            wchunk = next(write_stream) if write_stream else None
        except StopIteration:
            wchunk = None
    status_codes += [proc.wait() for proc in procs]
    if raise_on_error and len(list(filter(lambda c: c != 0, status_codes))):
        process_info = [
            (proc.argv, proc.pid, proc.returncode) for proc in procs
        ]
        raise ProcessError(process_info)


def write_chunk(proc, chunk):
    try:
        proc.stdin.write(to_cstr(chunk))
    except IOError as e:
        if e.errno == errno.EPIPE:
            # communicate() should ignore broken pipe error
            pass
        elif (e.errno == errno.EINVAL and proc.poll() is not None):
            # Issue #19612: stdin.write() fails with EINVAL
            # if the process already exited before the write
            pass
        else:
            raise


def communicate(procs):
    # make a list of (readable streams, sinks) tuples
    read_streams = [proc.stderr for proc in procs if proc.stderr]
    if procs[-1].stdout:
        read_streams.append(procs[-1].stdout)
    writer = procs[0]
    if len(read_streams + [w for w in [writer] if w.stdin]) > 1:
        return concurrent_communicate(writer, read_streams)
    if writer.stdin or len(read_streams) == 1:
        return simple_communicate(writer, read_streams)
    else:
        return stub_communicate()


def stub_communicate():
    return
    yield


def simple_communicate(proc, read_streams):
    if proc.stdin:
        while True:
            chunk = yield
            if not chunk:
                break
            write_chunk(proc, chunk)
        proc.stdin.close()
    else:
        read_stream = read_streams[0]
        while True:
            chunk = read_stream.read(MAX_CHUNK_SIZE)
            if not chunk:
                break
            yield (chunk, 0)


def concurrent_communicate_with_select(proc, read_streams):
    reading = [] + read_streams
    writing = [proc.stdin] if proc.stdin else []
    indexes = dict((r.fileno(), i) for i, r in enumerate(read_streams))
    write_queue = collections.deque()
    while reading or writing:
        try:
            rlist, wlist, xlist = select.select(reading, writing, [])
        except select.error as e:
            if e.args[0] == errno.EINTR:
                continue
            raise
        for rstream in rlist:
            rchunk = os.read(rstream.fileno(), MAX_CHUNK_SIZE)
            if not rchunk:
                rstream.close()
                reading.remove(rstream)
                continue
            write_queue.append((yield rchunk, indexes[rstream.fileno()]))
        if not write_queue:
            write_queue.append((yield))
        if not wlist:
            continue
        while write_queue:
            wchunk = write_queue.popleft()
            if wchunk is None:
                assert not write_queue
                writing = []
                proc.stdin.close()
                break
            wchunk = to_cstr(wchunk)
            chunk = wchunk[:_PIPE_BUF]
            if len(wchunk) > _PIPE_BUF:
                write_queue.appendleft(wchunk[_PIPE_BUF:])
            try:
                written = os.write(proc.stdin.fileno(), chunk)
            except OSError as e:
                if e.errno != errno.EPIPE:
                    raise
                writing = []
                proc.stdin.close()
            else:
                if len(chunk) > written:
                    write_queue.appendleft(chunk[written:])
                    # break so we wait for the pipe buffer to be drained
                    break


def concurrent_communicate_with_threads(proc, read_streams):
    def read(queue, read_stream, index):
        while True:
            chunk = read_stream.read(MAX_CHUNK_SIZE)
            if not chunk:
                break
            queue.put((chunk, index))
        queue.put((None, index))

    def write(queue, proc):
        while True:
            chunk = queue.get()
            if not chunk:
                break
            write_chunk(proc, chunk)
        proc.stdin.close()

    rqueue = Queue(maxsize=1)
    wqueue = Queue()
    threads = []
    for i, rs in enumerate(read_streams):
        threads.append(threading.Thread(target=read, args=(rqueue, rs, i)))
        threads[-1].setDaemon(True)
        threads[-1].start()
    if proc.stdin:
        threads.append(threading.Thread(target=write, args=(wqueue, proc)))
        threads[-1].setDaemon(True)
        threads[-1].start()
    writing = True
    reading = len(read_streams)
    while writing or reading or rqueue.qsize():
        if reading or rqueue.qsize():
            try:
                rchunk, index = rqueue.get(block=not writing)
                if rchunk:
                    wchunk = yield rchunk, index
                else:
                    reading -= 1
                    continue
            except Empty:
                wchunk = yield
        else:
            wchunk = yield
        if writing:
            wqueue.put(wchunk)
            writing = wchunk is not None
    for t in threads:
        t.join()


def setup_redirect(proc_opts, key):
    stream = proc_opts.get(key, None)
    if stream in (None, STDOUT, PIPE) or fileobj_has_fileno(stream):
        # Simple case which will be handled automatically by Popen: stream is
        # STDOUT/PIPE or a file object backed by file.
        return None, False
    if is_string(stream):
        # stream is a string representing a filename, we'll open the file with
        # appropriate mode which will be set to proc_opts[key].
        if key == 'stdin':
            proc_opts[key] = open(stream, 'rb')
        else:
            if stream.endswith('+'):
                proc_opts[key] = open(stream[:-1], 'ab')
                # On MS Windows we need to explicitly set the file position to
                # the end or the file contents will be replaced.
                proc_opts[key].seek(0, os.SEEK_END)
            else:
                proc_opts[key] = open(stream, 'wb')
        return None, True
    if key == 'stdin':
        if hasattr(stream, 'read'):
            # replace with an iterator that yields data in up to 64k chunks.
            # This is done to avoid the yield-by-line logic when iterating
            # file-like objects that contain binary data.
            stream = fileobj_to_iterator(stream)
        elif hasattr(stream, '__iter__'):
            stream = iter(stream)
    proc_opts[key] = PIPE
    return stream, False


def fileobj_to_iterator(fobj):
    def iterator():
        while True:
            data = fobj.read(MAX_CHUNK_SIZE)
            if not data:
                break
            yield data
    return iterator()


def echo(s):
    if isinstance(s, str):
        return StringIO(s)
    else:
        assert isinstance(s, bytes)
        return BytesIO(s)


class RunningProcess(object):
    def __init__(self, popen, stdin_stream, stdout_stream, stderr_stream,
                 argv):
        self.popen = popen
        self.stdin_stream = stdin_stream
        self.stdout_stream = stdout_stream
        self.stderr_stream = stderr_stream
        self.argv = argv

    @property
    def returncode(self):
        return self.popen.returncode

    @property
    def stdin(self):
        return self.popen.stdin

    @property
    def stdout(self):
        return self.popen.stdout

    @property
    def stderr(self):
        return self.popen.stderr

    @property
    def pid(self):
        return self.popen.pid

    def wait(self):
        return self.popen.wait()

    def poll(self):
        return self.popen.poll()


class Shell(object):
    def __init__(self, **defaults):
        self.aliases = {}
        self.envstack = []
        self.dirstack = []
        if 'env' in defaults:
            self.envstack.append(defaults['env'])
            del defaults['env']
        if 'cwd' in defaults:
            self.dirstack.append(defaults['cwd'])
            del defaults['cwd']
        self.defaults = defaults
        self.echo = echo

    def __call__(self, *argvs, **opts):
        rv = []
        for argv in argvs:
            if is_string(argv):
                argv = self.aliases.get(argv, argv)
                if is_string(argv):
                    argv = [argv]
            rv.append(Command(argv, shell=self, **opts))
        return rv[0] if len(rv) == 1 else rv

    @contextlib.contextmanager
    def setenv(self, env):
        self.envstack.append(env)
        yield
        e = self.envstack.pop()
        assert e == env

    @contextlib.contextmanager
    def chdir(self, path):
        path = str(path)  # allow pathlib.Path instances
        if path[0] != '/':
            # not absolute path, consider the current stack and join with the
            # last path
            if self.dirstack:
                path = os.path.normpath('{}/{}'.format(self.dirstack[-1],
                                                       path))
        self.dirstack.append(path)
        yield
        p = self.dirstack.pop()
        assert p == path

    def alias(self, **aliases):
        self.aliases.update(aliases)

    def export_as_module(self, module_name, full_name=False):
        if full_name:
            sys.modules[module_name] = ShellModule(self, module_name)
            return
        if module_name in globals():
            raise Exception('Name "{}" is already taken'.format(module_name))
        full_module_name = __name__ + '.' + module_name
        module = ShellModule(self, full_module_name)
        sys.modules[full_module_name] = module
        globals()[module_name] = module


class ShellModule(types.ModuleType):
    def __init__(self, shell, name):
        self.__shell = shell
        self.__file__ = '<frozen>'
        self.__name__ = name
        self.__package__ = __package__
        self.__loader__ = None

    def __repr__(self):
        return repr(self.__shell)

    def __getattr__(self, name):
        if name.startswith('__'):
            return super(ShellModule, self).__getattr__(name)
        attr = getattr(self.__shell, name, None)
        if attr and name != 'export_as_module':
            return attr
        return self.__shell(name)


class PipelineBasePy3(object):
    def __bytes__(self):
        return self._collect_output()

    def __str__(self):
        return bytes(self).decode('utf-8')


class PipelineBasePy2(object):
    def __str__(self):
        return self._collect_output()

    def __unicode__(self):
        return str(self).decode('utf-8')


class Pipeline(PipelineBasePy3 if PY3 else PipelineBasePy2):
    def __init__(self, commands):
        validate_pipeline(commands)
        self.commands = commands

    def __repr__(self):
        return ' | '.join((repr(c) for c in self.commands))

    def __or__(self, other):
        if isinstance(other, Shell):
            return other(self)
        elif hasattr(other, 'write') or is_string(other):
            return Pipeline(self.commands[:-1] +
                            [self.commands[-1]._redirect('stdout', other)])
        assert isinstance(other, Command)
        return Pipeline(self.commands + [other])

    def __ror__(self, other):
        if hasattr(other, '__iter__') or is_string(other):
            return Pipeline([self.commands[0]._redirect('stdin', other)] +
                            self.commands[1:])
        assert False, "Invalid"

    def __call__(self):
        procs, raise_on_error = self._spawn()
        return wait(procs, raise_on_error)

    def __iter__(self):
        return self._iter(False)

    def _iter(self, raw):
        pipeline = Pipeline(self.commands[:-1] +
                            [self.commands[-1]._redirect('stdout', PIPE)])
        procs, raise_on_error = pipeline._spawn()
        pipe_count = sum(1 for proc in procs if proc.stderr)
        if procs[-1].stdout:
            pipe_count += 1
        if not pipe_count:
            wait(procs, raise_on_error)
            # nothing to yield
            return
        iterator = iterate_outputs(procs, raise_on_error, [])
        if not raw:
            iterator = iterate_lines(iterator, trim_trailing_lf=True)
        if pipe_count == 1:
            for line, stream_index in iterator:
                yield line
        else:
            for line, stream_index in iterator:
                yield tuple(line if stream_index == index else None
                            for index in xrange(pipe_count))

    def _collect_output(self):
        sink = BytesIO()
        (self | sink)()
        return sink.getvalue()

    def _spawn(self):
        procs = []
        raise_on_error = False
        for index, command in enumerate(self.commands):
            close_in = False
            close_out = False
            close_err = False
            is_first = index == 0
            is_last = index == len(self.commands) - 1
            stdin_stream = None
            stdout_stream = None
            stderr_stream = None
            # copy argv/opts
            proc_argv = [str(a) for a in command.argv]
            proc_opts = command.copy_opts()
            raise_on_error = raise_on_error or proc_opts.get('raise_on_error',
                                                             False)
            if is_first:
                # first command in the pipeline may redirect stdin
                stdin_stream, close_in = setup_redirect(proc_opts, 'stdin')
            else:
                # only set current process stdin if it is not the first in the
                # pipeline.
                proc_opts['stdin'] = procs[-1].stdout
            if is_last:
                # last command in the pipeline may redirect stdout
                stdout_stream, close_out = setup_redirect(proc_opts, 'stdout')
            else:
                # only set current process stdout if it is not the last in the
                # pipeline.
                proc_opts['stdout'] = PIPE
            # stderr may be set at any point in the pipeline
            stderr_stream, close_err = setup_redirect(proc_opts, 'stderr')
            set_extra_popen_opts(proc_opts)
            if proc_opts.get('glob', False):
                proc_argv = expand_filenames(
                    proc_argv, os.path.realpath(
                        proc_opts.get('cwd', os.curdir)))
            set_environment(proc_opts)
            current_proc = RunningProcess(
                subprocess.Popen(proc_argv, **remove_invalid_opts(proc_opts)),
                stdin_stream, stdout_stream, stderr_stream, proc_argv
            )
            # if files were opened and connected to the process stdio, close
            # our copies of the descriptors
            if close_in:
                proc_opts['stdin'].close()
            if close_out:
                proc_opts['stdout'].close()
            if close_err:
                proc_opts['stderr'].close()
            if not is_first:
                # close our copy of the previous process's stdout, now that it
                # is connected to the current process's stdin
                procs[-1].stdout.close()
            procs.append(current_proc)
        return procs, raise_on_error

    def iter_raw(self):
        return self._iter(True)


class Command(object):
    OPTS = ('stdin', 'stdout', 'stderr', 'env', 'cwd', 'preexec_fn',
            'raise_on_error', 'merge_env', 'glob')

    def __init__(self, argv, shell=None, **opts):
        self.argv = tuple(argv)
        self.shell = shell
        self.opts = {}
        for key in opts:
            if key not in Command.OPTS:
                raise TypeError('Invalid keyword argument "{}"'.format(key))
            value = opts[key]
            if key == 'cwd' and value is not None:
                value = str(value)  # allow pathlib.Path instances
            self.opts[key] = value

    def __call__(self, *argv, **opts):
        if not argv and not opts:
            # invoke the command
            return Pipeline([self])()
        new_opts = self.opts.copy()
        if 'env' in opts:
            update_opts_env(new_opts, opts['env'])
            del opts['env']
        for key in Command.OPTS:
            if key in opts:
                new_opts[key] = opts[key]
        return Command(self.argv + argv, shell=self.shell, **new_opts)

    def __repr__(self):
        argstr = ' '.join(self.argv)
        optstr = ' '.join(
            '{}={}'.format(key, self.get_opt(key))
            for key in self.iter_opts() if self.get_opt(key, None) is not None
        )
        if optstr:
            return '{0} ({1})'.format(argstr, optstr)
        else:
            return argstr

    def __str__(self):
        return str(Pipeline([self]))

    def __bytes__(self):
        return bytes(Pipeline([self]))

    def __unicode__(self):
        return unicode(Pipeline([self]))

    def __iter__(self):
        return iter(Pipeline([self]))

    def __or__(self, other):
        return Pipeline([self]) | other

    def __ror__(self, other):
        return other | Pipeline([self])

    def _redirect(self, key, stream):
        if self.get_opt(key, None) is not None:
            raise AlreadyRedirected('command already redirects ' + key)
        return self(**{key: stream})

    def iter_raw(self):
        return Pipeline([self]).iter_raw()

    def get_env(self):
        if not self.shell.envstack and 'env' not in self.opts:
            return None
        env = {}
        for e in self.shell.envstack:
            env.update(e)
        env.update(self.opts.get('env', {}))
        return env

    def get_cwd(self):
        cwd = self.opts.get('cwd', None)
        if cwd:
            return cwd
        if self.shell.dirstack:
            return self.shell.dirstack[-1]
        return None

    def get_opt(self, opt, default=None):
        if opt == 'env':
            return self.get_env()
        if opt == 'cwd':
            return self.get_cwd()
        rv = self.opts.get(opt, EMPTY)
        if rv is EMPTY:
            rv = self.shell.defaults.get(opt, EMPTY)
        if rv is EMPTY:
            return default
        return rv

    def iter_opts(self):
        return set(list(self.opts.keys()) + list(self.shell.defaults.keys()) +
                   ['cwd', 'env'])

    def copy_opts(self):
        rv = {}
        for opt in self.iter_opts():
            val = self.get_opt(opt)
            if val is not None:
                rv[opt] = val
        return rv


builtin_sh = Shell(raise_on_error=True)
builtin_sh.alias(
    apt_cache='apt-cache',
    apt_get='apt-get',
    apt_key='apt-key',
    dpkg_divert='dpkg-divert',
    grub_install='grub-install',
    grub_mkconfig='grub-mkconfig',
    locale_gen='locale-gen',
    mkfs_ext2='mkfs.ext2',
    mkfs_ext3='mkfs.ext3',
    mkfs_ext4='mkfs.ext4',
    mkfs_vfat='mkfs.vfat',
    qemu_img='qemu-img',
    repo_add='repo-add',
    update_grub='update-grub',
    update_initramfs='update-initramfs',
    update_locale='update-locale',
)
...
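For context, here is a minimal usage sketch of the Shell/Pipeline API defined above. It assumes the module is saved as ush.py and that a sort command exists on PATH. Building a pipeline with | and then collecting or iterating its output is what drives communicate(), which routes each process's stdout/stderr through the read_streams machinery shown earlier.

# Minimal usage sketch (assumes the module above is saved as ush.py and
# that `sort` is available on PATH).
from ush import Shell

sh = Shell(raise_on_error=True)
sort = sh('sort')

# Feed an in-memory stream into the pipeline and collect stdout as a string.
out = str(sh.echo('banana\napple\ncherry\n') | sort)

# Iterating a pipeline yields decoded lines, produced by iterate_lines()
# from the chunks that communicate() reads off read_streams.
for line in (sh.echo('b\na\nc\n') | sort):
    print(line)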


webenv.py

Source: webenv.py (GitHub)


# 1 day in development.
import gc
import sys
import asyncio
import numpy as np

continue_on_errors = False

def js_executor(code):  # Returns a Bash command that will run `code` in Node.js.
    # Note: shlex.quote(code) quotes improperly on Windows, so, we quote manually.
    code = '"' + code.replace('\\', '\\\\').replace('"', '\\"') + '"'
    import shutil
    if shutil.which('nodejs') is not None:
        return 'nodejs -e ' + code
    return 'node -e ' + code

def webenv(agent, *interfaces, int_size=0, webenv_path='webenv', js_executor=js_executor):
    """
    A Python wrapper for creating and connecting to a local Web environment.
    Pass in the agent and all the interfaces. This will loop infinitely.
    (This does not have an OpenAI-Gym-like interface, because that makes asynchronicity less natural to implement, and assumes that sizes are static.)
    Arguments:
    - `agent`: an async function, from observations and the recommended action length (a number), to a tuple of predictions and actions, all NumPy arrays and -1…1|NaN unless specified.
      For throughput, immediately send commands to another device, and return an `await`able Future.
      To stop this web env, `raise` an exception.
    - `interfaces`: a list of either strings (which are put as-is as JS code, where `we` is the webenv module) or structured args.
      Args are a convenience: numbers and bools and strings are put as-is (JS strings must be quoted again), arrays become function calls (with the first string item being the unescaped function to call), dicts become objects.
    - `int_size`: increase throughput at the cost of precision. `0` communicates through float32, `1` through int8, `2` through int16. Do not specify `we.io()` manually.
    - `webenv_path`: what the generated JS should `require`. `'webenv'` by default.
    - `js_executor`: the function from generated JS to the executed system command; escape quotes manually. Uses NodeJS directly by default.
    Example:
    >>> import webenv
    >>> webenv.webenv(lambda x: x, '"https://www.youtube.com/watch?v=dQw4w9WgXcQ"', 'we.defaults', ['we.randomAgent'])
    """
    if not callable(agent):
        raise TypeError('Agent must be a function')
    if int_size != 0 and int_size != 1 and int_size != 2:
        raise TypeError('Int size must be 0 (float32) or 1 (int8) or 2 (int16)')
    code = _js_code_for_interfaces(interfaces, webenv_path)
    cmd = js_executor(code)
    prev_write = [None]  # A lock, to only do one write at a time.
    prev_flush_info = [None]  # A lock on flushes. (Effectively unused.)
    read_streams = {}  # index → asyncio.Queue
    max_index = 0
    async def read(reader):
        # Receive index & observations & action-length packets, and put it into `read_streams`.
        nonlocal max_index
        while True:
            index = await _read_u32(reader)
            obs = _decode(await _read_data(reader, int_size))
            # Bug: if there are too few observations (<4095), this fails to read `obs`'s length correctly.
            # Just provide more observations, why fix it.
            act_len = await _read_u32(reader)
            if index not in read_streams:
                read_streams[index] = asyncio.Queue(64)
                if index > max_index: max_index = index
            await read_streams[index].put((obs, act_len))
            read_streams['any'].put_nowait(None)
    async def step(writer, read_lock):
        # Read from `read_streams`, call `agent`, and write what we did.
        try:
            indices, obs, act_len = [], [], []
            while True:
                await asyncio.sleep(.001)
                while True:
                    # Wait for more data to arrive, distributing computation better.
                    # All these runtime adaptations at each processing step are hacky.
                    # Maybe if we gave feedback on how much each stream is processed to WebEnv, it could optimize throughput better?
                    if all([q.qsize() >= 1 for (i, q) in read_streams.items() if i != 'any']): break
                    if any([q.qsize() >= 2 for (i, q) in read_streams.items() if i != 'any']): break
                    await asyncio.sleep(.01)
                for i in range(max_index + 1):
                    if i not in read_streams: continue
                    queue = read_streams[i]
                    if i == 'any': continue
                    if queue.empty(): continue  # Only process available data.
                    item = queue.get_nowait()
                    read_streams['any'].get_nowait()
                    if item[1] == 0xFFFFFFFF: continue  # Just ignore dealloc events.
                    indices.append([i])
                    obs.append(item[0])
                    act_len.append(item[1])
                if len(obs): break
                # If all streams are empty, wait for the next item.
                await read_streams['any'].get()
                read_streams['any'].put_nowait(None)
            indices = np.array(indices, dtype=np.int64)
            preds, acts = await agent(read_lock, indices, obs, act_len)
            prevW = prev_write[0]
            nextW = prev_write[0] = asyncio.Future()
            _write_all(writer, int_size, indices, preds, acts)
            # await _flush(writer, prev_flush_info) # Apparently, `asyncio`'s `.drain()` cannot be trusted to return. Maybe it's because we turned off buffering.
            if asyncio.isfuture(prevW): await prevW  # Ensure linear ordering of writes.
            nextW.set_result(None)
        except Exception as err:
            read_lock.set_result(None)
            if not continue_on_errors: raise
            print(err)
    async def steps(cmd):
        P = asyncio.subprocess.PIPE
        proc = await asyncio.create_subprocess_shell(cmd, stdin=P, stdout=P)
        # Turn off buffering. (Only 5 writes per message, so a buffer might not help much anyway.)
        proc.stdin.transport.set_write_buffer_limits(0, 0)
        reader, writer = proc.stdout, proc.stdin
        _write_u32(writer, 0x01020304)
        _write_u32(writer, int_size)
        await _flush(writer, prev_flush_info)
        counter = 0
        read_streams['any'] = asyncio.Queue()
        asyncio.create_task(read(reader))
        while True:
            try:
                read_lock = asyncio.Future()
                asyncio.create_task(step(writer, read_lock))
                if counter % 1000 == 0:
                    gc.collect()
                await read_lock
                counter = counter + 1
            except Exception as err:
                if not continue_on_errors: raise
                print(err)
    asyncio.run(steps(cmd))

async def _read_n(stream, n):
    return await stream.readexactly(n)

async def _read_u32(stream):
    bytes = await _read_n(stream, 4)
    return int.from_bytes(bytes, sys.byteorder)

async def _read_data(stream, int_size=0):
    # Length then data.
    len = await _read_u32(stream)
    byteCount = len * 4 if int_size == 0 else len * int_size
    bytes = await _read_n(stream, byteCount)
    dtype = np.float32 if int_size == 0 else np.int8 if int_size == 1 else np.int16
    return np.frombuffer(bytes, dtype)

def _write_u32(stream, x):
    stream.write(x.to_bytes(4, sys.byteorder))

def _write_data(stream, data):
    # Length then data. Don't forget to flush afterwards.
    _write_u32(stream, data.size)
    stream.write(data.tobytes())

def _write_all(stream, int_size, indices, preds, acts):
    # indices/pred/act equal-size lists (or int64 NumPy array, for indices).
    for i in range(len(preds)):
        index, pred, act = indices[i], preds[i], acts[i]
        if pred.dtype != np.float32 or act.dtype != np.float32:
            raise TypeError('Predictions & actions must be float32 arrays')
        _write_u32(stream, int(index.item()))
        _write_data(stream, _encode(pred, int_size))
        _write_data(stream, _encode(act, int_size))

async def _flush(stream, prev_flush):
    # `stream.drain()` can only be called one at a time, so we await the previous flush.
    prev = prev_flush[0]
    fut = prev_flush[0] = asyncio.Future()
    if prev is not None: await prev
    await stream.drain()
    fut.set_result(None)

def _decode(ints):
    if ints.dtype == np.float32:
        return ints
    scale = 127 if ints.dtype == np.int8 else 32767
    nanValue = -scale - 1
    x = ints.astype(np.float32)
    return np.where(x == nanValue, np.nan, x / scale)

def _encode(floats, int_size=0):
    if int_size == 0:
        return floats
    scale = 127 if int_size == 1 else 32767
    nanValue = -scale - 1
    rounded = np.where(np.isnan(floats), nanValue, np.rint(np.clip(floats, -1, 1) * scale))
    dtype = np.int8 if int_size == 1 else np.int16
    return rounded.astype(dtype)

def _js_code_for_interfaces(inters, webenv_path):
    code = "const we = require('" + webenv_path + "');"
    code += "we.init(we.io(),"
    for i in inters:
        if isinstance(i, str):
            code = code + i + ','
        else:
            code = code + _js_code_for_args(i) + ','
    code = code + ")"
    return code

def _js_code_for_args(a):
    if isinstance(a, int) or isinstance(a, float) or isinstance(a, str):
        return str(a)
    if isinstance(a, bool):
        return 'true' if a else 'false'
    if isinstance(a, list):
        if not isinstance(a[0], str):
            raise TypeError('Interfaces can only call JS-string functions')
        return a[0] + "(" + ",".join([_js_code_for_args(x) for x in a[1:]]) + ")"
    if isinstance(a, dict):
        return "{" + ",".join([k + ":" + _js_code_for_args(a[k]) for k in a]) + "}"
...
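Since the wire format above quantizes floats whenever int_size is nonzero, a quick round trip through _encode and _decode (both defined in this file) illustrates the precision trade-off. This is a sketch, not part of the original module; it assumes you run it with those two functions in scope:

# Round-trip sketch for the int8 wire format used above.
import numpy as np

x = np.array([np.nan, -1.0, 0.0, 0.5, 1.0], dtype=np.float32)
ints = _encode(x, int_size=1)   # float32 -> int8; NaN encodes as -128
back = _decode(ints)            # int8 -> float32; -128 decodes back to NaN
# Values survive within 1/127 quantization error; NaN survives exactly.
assert np.allclose(back[1:], x[1:], atol=1/127) and np.isnan(back[0])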


__init__.py

Source: __init__.py (GitHub)


from cffi import FFI
import io
import itertools

ffibuilder = FFI()
ffibuilder.cdef(r"""
extern "Python" size_t _py_fread(char* ptr, size_t size, size_t count, unsigned stream);
uint64_t _xxh3_stream_int(unsigned stream);
uint64_t _xxh3_int(unsigned stream, unsigned size);
""")
ffibuilder.set_source(
    "_xxh3_cffi",
    r'''
#include <assert.h>
#include "xxh3.h"
#define BUFFER_SIZE 1024
#define ASSERT(cond, ...) if (!(cond)) { fprintf(stderr, ##__VA_ARGS__); exit(1); }
static size_t _py_fread(char* ptr, size_t size, size_t count, unsigned stream);
uint64_t _xxh3_int(unsigned stream, unsigned size) {
    char buffer[size];
    size_t read_bytes = _py_fread(buffer, 1, size, stream);
    ASSERT(read_bytes == size, "didn't read enough bytes\n");
    return XXH3_64bits(buffer, size);
}
uint64_t _xxh3_stream_int(unsigned stream) {
    size_t read_bytes;
    XXH3_state_t state;
    char buffer[BUFFER_SIZE];
    ASSERT(XXH3_64bits_reset(&state) != XXH_ERROR, "xxh3 reset failed\n");
    while (1) {
        read_bytes = _py_fread(buffer, 1, BUFFER_SIZE, stream);
        ASSERT(XXH3_64bits_update(&state, buffer, read_bytes) != XXH_ERROR, "xxh3 update failed\n");
        if (BUFFER_SIZE != read_bytes)
            break;
    }
    return XXH3_64bits_digest(&state);
}
    ''',
    sources=['xxh3/xxhash.c'],
    include_dirs=['xxh3'],
    extra_compile_args=['-Wall', '-O3', '-march=native', '-mtune=native'],
)
try:
    from _xxh3_cffi import ffi, lib
except:
    pass
else:
    read_i = itertools.count(0)
    read_streams = {}

    @ffi.def_extern()
    def _py_fread(ptr, size, count, stream):
        size *= count
        val = read_streams[stream].read(size)
        read_size = len(val)
        ffi.memmove(ptr, val, read_size)
        return read_size

    def oneshot_int(val):
        assert len(val) <= 1024, 'to hash large values use the streaming interface'
        in_file = next(read_i)
        read_streams[in_file] = io.BytesIO(val)
        val = lib._xxh3_int(in_file, len(val))
        del read_streams[in_file]
        return val

    def oneshot_hex(val):
        return hex(oneshot_int(val))[2:].rjust(16, '0')

    def stream_int(reader):
        in_file = next(read_i)
        read_streams[in_file] = reader
        val = lib._xxh3_stream_int(in_file)
        del read_streams[in_file]
        return val

    def stream_hex(reader):
        return hex(stream_int(reader))[2:].rjust(16, '0')

if __name__ == '__main__':
    ...
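A short usage sketch for the helpers above, assuming the xxh3 C sources are present and the _xxh3_cffi extension has been built (for example with cffi's ffibuilder.compile()); the file name data.bin is hypothetical. Both helpers register a Python file-like object in the read_streams dict so the C-side _py_fread callback can pull bytes from it:

# Usage sketch (assumes the _xxh3_cffi extension was built, e.g. via
# ffibuilder.compile(); 'data.bin' is a hypothetical file).
import io

print(oneshot_hex(b'hello world'))            # one-shot hash of a small bytes value
print(stream_hex(io.BytesIO(b'x' * 10**6)))   # stream a larger payload in 1 KiB chunks
with open('data.bin', 'rb') as f:
    print(stream_hex(f))                      # any object with .read() works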


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.


YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

