How to use flush_all_buffers method in autotest

Best Python code snippet using autotest_python

autotest.py

Source:autotest.py Github

copy

Full Screen

...463 @staticmethod464 def is_client_job_rebooting(last_line):465 return bool(re.match(r'^\t*GOOD\t----\treboot\.start.*$', last_line))466 def log_unexpected_abort(self, stderr_redirector):467 stderr_redirector.flush_all_buffers()468 msg = "Autotest client terminated unexpectedly"469 self.host.job.record("END ABORT", None, None, msg)470 def _execute_in_background(self, section, timeout):471 full_cmd = self.get_background_cmd(section)472 devnull = open(os.devnull, "w")473 self.copy_client_config_file(self.get_client_log())474 self.host.job.push_execution_context(self.results_dir)475 try:476 result = self.host.run(full_cmd, ignore_status=True,477 timeout=timeout,478 stdout_tee=devnull,479 stderr_tee=devnull)480 finally:481 self.host.job.pop_execution_context()482 return result483 @staticmethod484 def _strip_stderr_prologue(stderr):485 """Strips the 'standard' prologue that get pre-pended to every486 remote command and returns the text that was actually written to487 stderr by the remote command."""488 stderr_lines = stderr.split("\n")[1:]489 if not stderr_lines:490 return ""491 elif stderr_lines[0].startswith("NOTE: autotestd_monitor"):492 del stderr_lines[0]493 return "\n".join(stderr_lines)494 def _execute_daemon(self, section, timeout, stderr_redirector,495 client_disconnect_timeout):496 monitor_dir = self.host.get_tmp_dir()497 daemon_cmd = self.get_daemon_cmd(section, monitor_dir)498 # grab the location for the server-side client log file499 client_log_prefix = self.get_client_log()500 client_log_path = os.path.join(self.results_dir, 'debug',501 client_log_prefix + '.log')502 client_log = open(client_log_path, 'w', 0)503 self.copy_client_config_file(client_log_prefix)504 stdout_read = stderr_read = 0505 self.host.job.push_execution_context(self.results_dir)506 try:507 self.host.run(daemon_cmd, ignore_status=True, timeout=timeout)508 disconnect_warnings = []509 while True:510 monitor_cmd = self.get_monitor_cmd(monitor_dir, stdout_read,511 stderr_read)512 try:513 
result = self.host.run(monitor_cmd, ignore_status=True,514 timeout=timeout,515 stdout_tee=client_log,516 stderr_tee=stderr_redirector)517 except error.AutoservRunError, e:518 result = e.result_obj519 result.exit_status = None520 disconnect_warnings.append(e.description)521 stderr_redirector.log_warning(522 "Autotest client was disconnected: %s" % e.description,523 "NETWORK")524 except error.AutoservSSHTimeout:525 result = utils.CmdResult(monitor_cmd, "", "", None, 0)526 stderr_redirector.log_warning(527 "Attempt to connect to Autotest client timed out",528 "NETWORK")529 stdout_read += len(result.stdout)530 stderr_read += len(self._strip_stderr_prologue(result.stderr))531 if result.exit_status is not None:532 return result533 elif not self.host.wait_up(client_disconnect_timeout):534 raise error.AutoservSSHTimeout(535 "client was disconnected, reconnect timed out")536 finally:537 client_log.close()538 self.host.job.pop_execution_context()539 def execute_section(self, section, timeout, stderr_redirector,540 client_disconnect_timeout):541 logging.info("Executing %s/bin/autotest %s/control phase %d",542 self.autodir, self.autodir, section)543 if self.background:544 result = self._execute_in_background(section, timeout)545 else:546 result = self._execute_daemon(section, timeout, stderr_redirector,547 client_disconnect_timeout)548 last_line = stderr_redirector.last_line549 # check if we failed hard enough to warrant an exception550 if result.exit_status == 1:551 err = error.AutotestRunError("client job was aborted")552 elif not self.background and not result.stderr:553 err = error.AutotestRunError(554 "execute_section %s failed to return anything\n"555 "stdout:%s\n" % (section, result.stdout))556 else:557 err = None558 # log something if the client failed AND never finished logging559 if err and not self.is_client_job_finished(last_line):560 self.log_unexpected_abort(stderr_redirector)561 if err:562 raise err563 else:564 return stderr_redirector.last_line565 def 
_wait_for_reboot(self, old_boot_id):566 logging.info("Client is rebooting")567 logging.info("Waiting for client to halt")568 if not self.host.wait_down(HALT_TIME, old_boot_id=old_boot_id):569 err = "%s failed to shutdown after %d"570 err %= (self.host.hostname, HALT_TIME)571 raise error.AutotestRunError(err)572 logging.info("Client down, waiting for restart")573 if not self.host.wait_up(BOOT_TIME):574 # since reboot failed575 # hardreset the machine once if possible576 # before failing this control file577 warning = "%s did not come back up, hard resetting"578 warning %= self.host.hostname579 logging.warning(warning)580 try:581 self.host.hardreset(wait=False)582 except (AttributeError, error.AutoservUnsupportedError):583 warning = "Hard reset unsupported on %s"584 warning %= self.host.hostname585 logging.warning(warning)586 raise error.AutotestRunError("%s failed to boot after %ds" %587 (self.host.hostname, BOOT_TIME))588 self.host.reboot_followup()589 def execute_control(self, timeout=None, client_disconnect_timeout=None):590 if not self.background:591 collector = log_collector(self.host, self.tag, self.results_dir)592 hostname = self.host.hostname593 remote_results = collector.client_results_dir594 local_results = collector.server_results_dir595 self.host.job.add_client_log(hostname, remote_results,596 local_results)597 job_record_context = self.host.job.get_record_context()598 section = 0599 start_time = time.time()600 logger = client_logger(self.host, self.tag, self.results_dir)601 try:602 while not timeout or time.time() < start_time + timeout:603 if timeout:604 section_timeout = start_time + timeout - time.time()605 else:606 section_timeout = None607 boot_id = self.host.get_boot_id()608 last = self.execute_section(section, section_timeout,609 logger, client_disconnect_timeout)610 if self.background:611 return612 section += 1613 if self.is_client_job_finished(last):614 logging.info("Client complete")615 return616 elif self.is_client_job_rebooting(last):617 
try:618 self._wait_for_reboot(boot_id)619 except error.AutotestRunError, e:620 self.host.job.record("ABORT", None, "reboot", str(e))621 self.host.job.record("END ABORT", None, None, str(e))622 raise623 continue624 # if we reach here, something unexpected happened625 self.log_unexpected_abort(logger)626 # give the client machine a chance to recover from a crash627 self.host.wait_up(CRASH_RECOVERY_TIME)628 msg = ("Aborting - unexpected final status message from "629 "client on %s: %s\n") % (self.host.hostname, last)630 raise error.AutotestRunError(msg)631 finally:632 logger.close()633 if not self.background:634 collector.collect_client_job_results()635 collector.remove_redundant_client_logs()636 state_file = os.path.basename(self.remote_control_file637 + '.state')638 state_path = os.path.join(self.results_dir, state_file)639 self.host.job.postprocess_client_state(state_path)640 self.host.job.remove_client_log(hostname, remote_results,641 local_results)642 job_record_context.restore()643 # should only get here if we timed out644 assert timeout645 raise error.AutotestTimeoutError()646class log_collector(object):647 def __init__(self, host, client_tag, results_dir):648 self.host = host649 if not client_tag:650 client_tag = "default"651 self.client_results_dir = os.path.join(host.get_autodir(), "results",652 client_tag)653 self.server_results_dir = results_dir654 def collect_client_job_results(self):655 """ A method that collects all the current results of a running656 client job into the results dir. By default does nothing as no657 client job is running, but when running a client job you can override658 this with something that will actually do something. 
"""659 # make an effort to wait for the machine to come up660 try:661 self.host.wait_up(timeout=30)662 except error.AutoservError:663 # don't worry about any errors, we'll try and664 # get the results anyway665 pass666 # Copy all dirs in default to results_dir667 try:668 self.host.get_file(self.client_results_dir + '/',669 self.server_results_dir, preserve_symlinks=True)670 except Exception:671 # well, don't stop running just because we couldn't get logs672 e_msg = "Unexpected error copying test result logs, continuing ..."673 logging.error(e_msg)674 traceback.print_exc(file=sys.stdout)675 def remove_redundant_client_logs(self):676 """Remove client.*.log files in favour of client.*.DEBUG files."""677 debug_dir = os.path.join(self.server_results_dir, 'debug')678 debug_files = [f for f in os.listdir(debug_dir)679 if re.search(r'^client\.\d+\.DEBUG$', f)]680 for debug_file in debug_files:681 log_file = debug_file.replace('DEBUG', 'log')682 log_file = os.path.join(debug_dir, log_file)683 if os.path.exists(log_file):684 os.remove(log_file)685# a file-like object for catching stderr from an autotest client and686# extracting status logs from it687class client_logger(object):688 """Partial file object to write to both stdout and689 the status log file. 
We only implement those methods690 utils.run() actually calls.691 """692 status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")693 test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")694 fetch_package_parser = re.compile(695 r"^AUTOTEST_FETCH_PACKAGE:([^:]*):([^:]*):(.*)$")696 extract_indent = re.compile(r"^(\t*).*$")697 extract_timestamp = re.compile(r".*\ttimestamp=(\d+)\t.*$")698 def __init__(self, host, tag, server_results_dir):699 self.host = host700 self.job = host.job701 self.log_collector = log_collector(host, tag, server_results_dir)702 self.leftover = ""703 self.last_line = ""704 self.logs = {}705 def _process_log_dict(self, log_dict):706 log_list = log_dict.pop("logs", [])707 for key in sorted(log_dict.iterkeys()):708 log_list += self._process_log_dict(log_dict.pop(key))709 return log_list710 def _process_logs(self):711 """Go through the accumulated logs in self.log and print them712 out to stdout and the status log. Note that this processes713 logs in an ordering where:714 1) logs to different tags are never interleaved715 2) logs to x.y come before logs to x.y.z for all z716 3) logs to x.y come before x.z whenever y < z717 Note that this will in general not be the same as the718 chronological ordering of the logs. However, if a chronological719 ordering is desired that one can be reconstructed from the720 status log by looking at timestamp lines."""721 log_list = self._process_log_dict(self.logs)722 for entry in log_list:723 self.job.record_entry(entry, log_in_subdir=False)724 if log_list:725 self.last_line = log_list[-1].render()726 def _process_quoted_line(self, tag, line):727 """Process a line quoted with an AUTOTEST_STATUS flag. If the728 tag is blank then we want to push out all the data we've been729 building up in self.logs, and then the newest line. 
If the730 tag is not blank, then push the line into the logs for handling731 later."""732 entry = base_job.status_log_entry.parse(line)733 if entry is None:734 return # the line contains no status lines735 if tag == "":736 self._process_logs()737 self.job.record_entry(entry, log_in_subdir=False)738 self.last_line = line739 else:740 tag_parts = [int(x) for x in tag.split(".")]741 log_dict = self.logs742 for part in tag_parts:743 log_dict = log_dict.setdefault(part, {})744 log_list = log_dict.setdefault("logs", [])745 log_list.append(entry)746 def _process_info_line(self, line):747 """Check if line is an INFO line, and if it is, interpret any control748 messages (e.g. enabling/disabling warnings) that it may contain."""749 match = re.search(r"^\t*INFO\t----\t----(.*)\t[^\t]*$", line)750 if not match:751 return # not an INFO line752 for field in match.group(1).split('\t'):753 if field.startswith("warnings.enable="):754 func = self.job.warning_manager.enable_warnings755 elif field.startswith("warnings.disable="):756 func = self.job.warning_manager.disable_warnings757 else:758 continue759 warning_type = field.split("=", 1)[1]760 func(warning_type)761 def _process_line(self, line):762 """Write out a line of data to the appropriate stream. 
Status763 lines sent by autotest will be prepended with764 "AUTOTEST_STATUS", and all other lines are ssh error765 messages."""766 status_match = self.status_parser.search(line)767 test_complete_match = self.test_complete_parser.search(line)768 fetch_package_match = self.fetch_package_parser.search(line)769 if status_match:770 tag, line = status_match.groups()771 self._process_info_line(line)772 self._process_quoted_line(tag, line)773 elif test_complete_match:774 self._process_logs()775 fifo_path, = test_complete_match.groups()776 try:777 self.log_collector.collect_client_job_results()778 self.host.run("echo A > %s" % fifo_path)779 except Exception:780 msg = "Post-test log collection failed, continuing anyway"781 logging.exception(msg)782 elif fetch_package_match:783 pkg_name, dest_path, fifo_path = fetch_package_match.groups()784 serve_packages = global_config.global_config.get_config_value(785 "PACKAGES", "serve_packages_from_autoserv", type=bool)786 if serve_packages and pkg_name.endswith(".tar.bz2"):787 try:788 self._send_tarball(pkg_name, dest_path)789 except Exception:790 msg = "Package tarball creation failed, continuing anyway"791 logging.exception(msg)792 try:793 self.host.run("echo B > %s" % fifo_path)794 except Exception:795 msg = "Package tarball installation failed, continuing anyway"796 logging.exception(msg)797 else:798 logging.info(line)799 def _send_tarball(self, pkg_name, remote_dest):800 name, pkg_type = self.job.pkgmgr.parse_tarball_name(pkg_name)801 src_dirs = []802 if pkg_type == 'test':803 for test_dir in ['site_tests', 'tests']:804 src_dir = os.path.join(self.job.clientdir, test_dir, name)805 if os.path.exists(src_dir):806 src_dirs += [src_dir]807 if autoserv_prebuild:808 prebuild.setup(self.job.clientdir, src_dir)809 break810 elif pkg_type == 'profiler':811 src_dirs += [os.path.join(self.job.clientdir, 'profilers', name)]812 if autoserv_prebuild:813 prebuild.setup(self.job.clientdir, src_dir)814 elif pkg_type == 'dep':815 src_dirs += 
[os.path.join(self.job.clientdir, 'deps', name)]816 elif pkg_type == 'client':817 return # you must already have a client to hit this anyway818 else:819 return # no other types are supported820 # iterate over src_dirs until we find one that exists, then tar it821 for src_dir in src_dirs:822 if os.path.exists(src_dir):823 try:824 logging.info('Bundling %s into %s', src_dir, pkg_name)825 temp_dir = autotemp.tempdir(unique_id='autoserv-packager',826 dir=self.job.tmpdir)827 tarball_path = self.job.pkgmgr.tar_package(828 pkg_name, src_dir, temp_dir.name, " .")829 self.host.send_file(tarball_path, remote_dest)830 finally:831 temp_dir.clean()832 return833 def log_warning(self, msg, warning_type):834 """Injects a WARN message into the current status logging stream."""835 timestamp = int(time.time())836 if self.job.warning_manager.is_valid(timestamp, warning_type):837 self.job.record('WARN', None, None, {}, msg)838 def write(self, data):839 # now start processing the existing buffer and the new data840 data = self.leftover + data841 lines = data.split('\n')842 processed_lines = 0843 try:844 # process all the buffered data except the last line845 # ignore the last line since we may not have all of it yet846 for line in lines[:-1]:847 self._process_line(line)848 processed_lines += 1849 finally:850 # save any unprocessed lines for future processing851 self.leftover = '\n'.join(lines[processed_lines:])852 def flush(self):853 sys.stdout.flush()854 def flush_all_buffers(self):855 if self.leftover:856 self._process_line(self.leftover)857 self.leftover = ""858 self._process_logs()859 self.flush()860 def close(self):861 self.flush_all_buffers()862SiteAutotest = client_utils.import_site_class(863 __file__, "autotest_lib.server.site_autotest", "SiteAutotest",864 BaseAutotest)865_SiteRun = client_utils.import_site_class(866 __file__, "autotest_lib.server.site_autotest", "_SiteRun", _BaseRun)867class Autotest(SiteAutotest):868 pass869class _Run(_SiteRun):870 pass871class 
AutotestHostMixin(object):872 """A generic mixin to add a run_test method to classes, which will allow873 you to run an autotest client test on a machine directly."""874 # for testing purposes875 _Autotest = Autotest...

Full Screen

Full Screen

prepare_cursor_gaze5.py

Source:prepare_cursor_gaze5.py Github

copy

Full Screen

...81 ftc.connect("localhost", port) # connect to buffer82 hdr = ftc.getHeader()83 data = ftc.getData([0, hdr.nSamples-1])84 return data85 def flush_all_buffers():86 ports_to_use = np.concatenate([Ports.P1,87 Ports.P2,88 [Ports.target_positions_solo],89 Ports.P1_gaze_solo,90 Ports.P2_gaze_solo,91 Ports.joint,92 [Ports.target_positions_joint],93 Ports.P1_gaze_joint,94 Ports.P2_gaze_joint,95 [Ports.session]])96 for port in ports_to_use:97 flush_buffer(port)98 ## buffer data99 close_buffers()100 ## start buffers101 port_info = [(Ports.P1, common.Number.frames),102 (Ports.P2, common.Number.frames),103 ([Ports.target_positions_solo], 1),104 (Ports.P1_gaze_solo, common.Number.frames),105 (Ports.P2_gaze_solo, common.Number.frames),106 (Ports.joint, common.Number.frames),107 ([Ports.target_positions_joint], 1),108 (Ports.P1_gaze_joint, common.Number.frames),109 (Ports.P2_gaze_joint, common.Number.frames),110 ([Ports.session], 1),111 ([Ports.status], 1)]112 for port in port_info:113 for p in port[0]:114 start_buffer(p, port[1])115 ## run interactive routine116 is_escape = False117 flush_buffer(Ports.status)118 flush_all_buffers()119 sessions_to_use = range(0, common.Number.sessions)120 for SESSION in sessions_to_use:121 common.print_stars()122 print(SESSION)123 if SESSION == 1:124 print('missing eye data...')125 continue126 if not is_use_python:127 # ----- load data...128 print(common.Labels.session2[SESSION])129 mat = scipy.io.loadmat('..\\visual_input\\movie_cursor_eye\\movie_data\\' + common.Labels.session2[SESSION] + '.mat')130 else:131 print('using python!')132 # ----- SESSION133 put_data(port=Ports.session, array=prepare_integer_array(SESSION))134 # ----- solo135 if not is_use_python:136 cursor_xy = mat['CURSOR_XY2'][0, 0]137 else:138 cursor_xy = CD_results[SESSION][0].cursor_xy2139 for CURSOR in [0, 1]:140 if CURSOR == 0:141 ports_to_use = Ports.P1142 elif CURSOR == 1:143 ports_to_use = Ports.P2144 for AXIS in [0, 1]:145 trajectory = np.transpose(cursor_xy[AXIS, :, 
:, CURSOR])146 put_data(port=ports_to_use[AXIS], array=trajectory)147 if not is_use_python:148 target_position = mat['TARGET_POSITION'][0, 0]149 else:150 target_position = CD_results[SESSION][0].target_position151 target_position = target_position[:, np.newaxis]152 put_data(port=Ports.target_positions_solo, array=target_position)153 if not is_use_python:154 gaze = mat['GAZE'][0, 0]155 else:156 gaze = CD_results[SESSION][0].gaze157 gaze = np.transpose(gaze, (1, 0, 2, 3))158 for CURSOR in [0, 1]:159 if CURSOR == 0:160 ports_to_use = Ports.P1_gaze_solo161 elif CURSOR == 1:162 ports_to_use = Ports.P2_gaze_solo163 for AXIS in [0, 1]:164 trajectory = np.transpose(gaze[AXIS, :, :, CURSOR])165 put_data(port=ports_to_use[AXIS], array=trajectory)166 # ----- joint167 if not is_use_python:168 cursor_xy = mat['CURSOR_XY2'][1, 0]169 else:170 cursor_xy = CD_results[SESSION][1].cursor_xy2171 ports_to_use = Ports.joint172 for AXIS in [0, 1]:173 trajectory = np.transpose(cursor_xy[AXIS, :, :, 2])174 put_data(port=ports_to_use[AXIS], array=trajectory)175 if not is_use_python:176 target_position = mat['TARGET_POSITION'][1, 0]177 else:178 target_position = CD_results[SESSION][1].target_position179 target_position = target_position[:, np.newaxis]180 put_data(port=Ports.target_positions_joint, array=target_position)181 if not is_use_python:182 gaze = mat['GAZE'][1, 0]183 else:184 gaze = CD_results[SESSION][1].gaze185 gaze = np.transpose(gaze, (1, 0, 2, 3))186 for CURSOR in [0, 1]:187 if CURSOR == 0:188 ports_to_use = Ports.P1_gaze_joint189 elif CURSOR == 1:190 ports_to_use = Ports.P2_gaze_joint191 for AXIS in [0, 1]:192 trajectory = np.transpose(gaze[AXIS, :, :, CURSOR])193 put_data(port=ports_to_use[AXIS], array=trajectory)194 stop = time.time()195 print(stop-start)196 if SESSION == sessions_to_use[-1]:197 put_data(port=Ports.status, array=prepare_integer_array(99)) # ready198 break199 else:200 put_data(port=Ports.status, array=prepare_integer_array(1)) # ready201 # ------ status202 
put_data(Ports.status, prepare_integer_array(9))203 number_of_samples = get_number_of_samples(Ports.status)204 # wait for unity205 while True:206 if number_of_samples != get_number_of_samples(Ports.status):207 is_wait_for_unity = False208 print('connected...')209 break210 else:211 time.sleep(1)212 print('waiting...')213 if keyboard.is_pressed('Esc'): # if key 'q' is pressed214 print('you pressed escape!')215 is_escape = True216 break217 # clear ports218 flush_all_buffers()219 if is_escape:...

Full Screen

Full Screen

prepare_cursor_gaze4.py

Source:prepare_cursor_gaze4.py Github

copy

Full Screen

...82 ftc.connect("localhost", port) # connect to buffer83 hdr = ftc.getHeader()84 data = ftc.getData([0, hdr.nSamples-1])85 return data86def flush_all_buffers():87 ports_to_use = np.concatenate([Ports.P1,88 Ports.P2,89 [Ports.target_positions_solo],90 Ports.P1_gaze_solo,91 Ports.P2_gaze_solo,92 Ports.joint,93 [Ports.target_positions_joint],94 Ports.P1_gaze_joint,95 Ports.P2_gaze_joint,96 [Ports.session]])97 for port in ports_to_use:98 flush_buffer(port)99## buffer data100close_buffers()101## start buffers102port_info = [(Ports.P1, 360),103 (Ports.P2, 360),104 ([Ports.target_positions_solo], 1),105 (Ports.P1_gaze_solo, 360),106 (Ports.P2_gaze_solo, 360),107 (Ports.joint, 360),108 ([Ports.target_positions_joint], 1),109 (Ports.P1_gaze_joint, 360),110 (Ports.P2_gaze_joint, 360),111 ([Ports.session], 1),112 ([Ports.status], 1)]113for port in port_info:114 for p in port[0]:115 start_buffer(p, port[1])116## run interactive routine117is_escape = False118flush_buffer(Ports.status)119flush_all_buffers()120sessions_to_use = np.array(range(3, 20))121for SESSION in sessions_to_use:122 # ----- load data...123 print(Labels.session[SESSION])124 mat = scipy.io.loadmat('..\\visual_input\\movie_cursor_eye\\movie_data\\' + Labels.session[SESSION] + '.mat')125 # ----- SESSION126 put_data(port=Ports.session, array=prepare_integer_array(SESSION))127 # ----- solo128 cursor_xy = mat['CURSOR_XY2'][0, 0]129 for CURSOR in [0, 1]:130 if CURSOR == 0:131 ports_to_use = Ports.P1132 elif CURSOR == 1:133 ports_to_use = Ports.P2134 for AXIS in [0, 1]:135 trajectory = np.transpose(cursor_xy[AXIS, :, :, CURSOR])136 put_data(port=ports_to_use[AXIS], array=trajectory)137 target_position = mat['TARGET_POSITION'][0, 0]138 put_data(port=Ports.target_positions_solo, array=target_position)139 gaze = mat['GAZE'][0, 0]140 for CURSOR in [0, 1]:141 if CURSOR == 0:142 ports_to_use = Ports.P1_gaze_solo143 elif CURSOR == 1:144 ports_to_use = Ports.P2_gaze_solo145 for AXIS in [0, 1]:146 trajectory = 
np.transpose(gaze[AXIS, :, :, CURSOR])147 put_data(port=ports_to_use[AXIS], array=trajectory)148 # ----- joint149 cursor_xy = mat['CURSOR_XY2'][1, 0]150 ports_to_use = Ports.joint151 for AXIS in [0, 1]:152 trajectory = np.transpose(cursor_xy[AXIS, :, :, 2])153 put_data(port=ports_to_use[AXIS], array=trajectory)154 target_position = mat['TARGET_POSITION'][1, 0]155 put_data(port=Ports.target_positions_joint, array=target_position)156 gaze = mat['GAZE'][1, 0]157 for CURSOR in [0, 1]:158 if CURSOR == 0:159 ports_to_use = Ports.P1_gaze_joint160 elif CURSOR == 1:161 ports_to_use = Ports.P2_gaze_joint162 for AXIS in [0, 1]:163 trajectory = np.transpose(gaze[AXIS, :, :, CURSOR])164 put_data(port=ports_to_use[AXIS], array=trajectory)165 stop = time.time()166 print(stop-start)167 if SESSION == sessions_to_use[-1]:168 put_data(port=Ports.status, array=prepare_integer_array(99)) # ready169 break170 else:171 put_data(port=Ports.status, array=prepare_integer_array(1)) # ready172 # ------ status173 put_data(Ports.status, prepare_integer_array(9))174 number_of_samples = get_number_of_samples(Ports.status)175 # wait for unity176 while True:177 if number_of_samples != get_number_of_samples(Ports.status):178 is_wait_for_unity = False179 print('connected...')180 break181 else:182 time.sleep(1)183 print('waiting...')184 if keyboard.is_pressed('Esc'): # if key 'q' is pressed185 print('you pressed escape!')186 is_escape = True187 break188 # clear ports189 flush_all_buffers()190 if is_escape:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful