How to use interrupt_str method in avocado

Best Python code snippet using avocado_python

console.py

Source: console.py (GitHub)


# SPDX-License-Identifier: BSD-3-Clause
# Depthcharge: <https://github.com/nccgroup/depthcharge>
"""
Functionality for interacting with a U-Boot console.
"""
import os
import re
import time
import serial
from . import log
from .monitor import Monitor
from .string import str_to_property_keyval


class Console:
    """
    This class encapsulates a serial console interface and provides higher level
    functionality for interacting with a U-Boot console.

    The *device* argument is required to configure the serial console connection.
    A default, but not necessarily correct, value will be used if it is not provided.
    Note that the baudrate can either be specified as part of the *device* string
    or as an additional *baudrate=<value>* keyword argument.

    The *prompt* parameter specifies the U-Boot prompt string that should
    be used to detect when the interactive console is ready to accept input.
    If left as ``None``, Depthcharge will attempt to later determine this using
    the :py:meth:`discover_prompt()` method.

    If you wish to view or capture data sent and received using this console,
    provide a :py:class:`~depthcharge.monitor.Monitor` instance as the *monitor* parameter.

    By default, the underlying serial console is initialized with a 150 ms timeout.
    Lowering this will speed up some operations (e.g. dumping memory via console
    operations), but setting it too low might cause Depthcharge to time out before a
    device has had a chance to finish responding with output from an operation.
    The timeout can be changed by providing a float value (in seconds) as a *timeout* keyword argument
    or by setting a *DEPTHCHARGE_CONSOLE_TIMEOUT* environment variable. The latter takes precedence.

    On some systems, you may find that sending too much data at a U-Boot console
    will cause the target device's UART FIFO to fill, dropping characters. You
    can find an example of this here: <https://twitter.com/sz_jynik/status/1414989128245067780>

    If you wish to introduce intra-character delay to console input, provide an *intrachar* keyword
    argument or DEPTHCHARGE_CONSOLE_INTRACHAR environment variable. (The latter takes precedence.)
    This floating point value, in seconds, is the minimum amount of time that shall be
    inserted between successive bytes. Realistically, the value will be larger because
    this mode of operation will send each byte with a single write() + flush(),
    incurring non-negligible overhead. You may set a value of 0 to incur only this
    implicit overhead, with no additional sleep()-based delay.
    """

    def __init__(self, device='/dev/ttyUSB0:115200', prompt=None, monitor=None, **kwargs):
        if 'timeout' not in kwargs:
            kwargs['timeout'] = 0.150
        timeout_env = os.getenv('DEPTHCHARGE_CONSOLE_TIMEOUT')
        if timeout_env is not None:
            timeout_env = float(timeout_env)
            kwargs['timeout'] = timeout_env
        self._intrachar = None
        if 'intrachar' in kwargs:
            self._intrachar = kwargs.pop('intrachar')
        intrachar_env = os.getenv('DEPTHCHARGE_CONSOLE_INTRACHAR')
        if intrachar_env is not None:
            self._intrachar = float(intrachar_env)
        # Parse device string and merge its entries into the provided kwargs,
        # giving priority to the items in the device string.
        device, device_args = str_to_property_keyval(device)
        for arg in list(device_args.keys()):
            # Special case - baudrate allowed without 'baudrate=' syntax
            # for convenience.
            if isinstance(device_args[arg], bool):
                try:
                    kwargs['baudrate'] = int(arg)
                    device_args.pop(arg)
                except ValueError:
                    pass
            else:
                try:
                    kwargs[arg] = int(device_args[arg])
                except ValueError:
                    kwargs[arg] = device_args[arg]
        # If, when Depthcharge is trying to detect a prompt at the console,
        # we see a match for this regular expression, we will enter a
        # loop in which we attempt to reboot the platform.
        #
        # It's a bit of "undocumented magic" that I use myself, but am not
        # really sure about whether other people will find it useful.
        #
        # TODO: Document this as an opt-in behavior in API docs.
        # (More feedback and mulling over this required.)
        #
        self._reboot_re = kwargs.pop('reboot_re', None)
        self._reboot_cmd = kwargs.pop('reboot_cmd', 'reboot || shutdown -r now')
        if self._reboot_re:
            self._reboot_re = re.compile(self._reboot_re)
            msg = (
                'Using regular expression for reboot match trigger: '
                + self._reboot_re.pattern + '\n'
                + ' Will use this command: ' + self._reboot_cmd
            )
            log.note(msg)
        # We're going to pass this off to the Serial constructor in a moment.
        if 'baudrate' not in kwargs:
            self._baudrate = 115200
            kwargs['baudrate'] = self._baudrate
        else:
            self._baudrate = kwargs.get('baudrate')
        # Store Serial constructor arguments for later reopen()
        self._dev = device
        self._kwargs = kwargs
        self._ser = serial.Serial(port=self._dev, **kwargs)
        self._encoding = 'latin-1'
        self.monitor = monitor if monitor is not None else Monitor()
        self.prompt = prompt
        self.interrupt_ind = '<INTERRUPT>'

    @property
    def device(self):
        """
        Device or interface used to communicate with console on target device.
        """
        return self._dev

    @property
    def baudrate(self):
        """
        Serial console's baud rate configuration.
        """
        return self._baudrate

    def send_command(self, cmd: str, read_response=True) -> str:
        """
        Send the provided command (`cmd`) to the attached U-Boot console.
        If `read_response` is `True`, the response is returned. Otherwise,
        `None` is returned and no attempt to read the response data is made.
        If one does not plan to use the response, keep `read_response`
        set to `True` and simply ignore the return value; this will ensure
        response data is removed from underlying buffers.
        """
        self._ser.flush()
        if not cmd.endswith('\n'):
            cmd += '\n'
        self.write(cmd)
        self._ser.flush()
        if read_response:
            resp = self.read()
            resp = self.strip_echoed_input(cmd, resp)
            # We expect this
            if resp.endswith(self.prompt):
                resp = resp[:-len(self.prompt)]
            return resp
        return None

    def interrupt(self, interrupt_str='\x03', timeout=30.0):
        """
        Attempt to interrupt U-Boot and retrieve a console prompt.
        By default, the character associated traditionally with Ctrl-C is sent to interrupt U-Boot.
        If trying to do this at boot-time (within the autoboot grace period),
        note that a specific "Stop String" may be required.
        Refer to U-Boot's doc/README.autoboot regarding the
        `CONFIG_AUTOBOOT_KEYED` and `CONFIG_AUTOBOOT_STOP_STR` configuration
        options for more information.
        """
        self._ser.flush()
        if self.prompt is None or len(self.prompt) == 0:
            log.note('No user-specified prompt provided. Attempting to determine this.')
            return self.discover_prompt(interrupt_str, timeout)
        ret = ''
        t_start = time.time()
        now = t_start
        while (now - t_start) < timeout:
            self.write(interrupt_str)
            self._ser.flush()
            response = self.read()
            ret += response
            if response.endswith(self.prompt):
                return ret
            now = time.time()
        raise TimeoutError('Timed out while attempting to return to U-Boot console prompt')

    def discover_prompt(self, interrupt_str='\x03', timeout=30.0, count=10):
        """
        Attempt to deduce the U-Boot prompt string by repeatedly attempting to interrupt its
        execution by sending *interrupt_str* until *count* consecutive prompt strings are observed.
        """
        t_start = time.time()
        now = t_start
        ret = ''
        candidate = ''
        candidate_count = 0
        while (now - t_start) < timeout:
            self.write(interrupt_str)
            self._ser.flush()
            response = self.read().replace(self.interrupt_ind, '')
            ret += response
            response = response.lstrip().splitlines()
            # We want to see the same thing repeated <count> times,
            # with no other output emitted in between
            if len(response) != 1:
                candidate = ''
                candidate_count = 0
                continue
            response = response[0]
            # Count this response only if it starts or continues a run of
            # identical lines.
            if candidate in ('', response):
                candidate = response
                candidate_count += 1
                if candidate_count >= count:
                    # Is this prompt indicative of a state we don't want to be in?
                    # (e.g. Linux shell)
                    #
                    # If so, attempt to issue 'reboot' command.
                    response_stripped = response.strip()
                    if self._reboot_re is not None and self._reboot_re.match(response_stripped):
                        candidate = ''
                        candidate_count = 0
                        msg = 'Attempting reboot. Matched reboot regex: ' + response_stripped
                        log.note(msg)
                        self.write(self._reboot_cmd + '\n')
                        self._ser.flush()
                    if candidate_count > 0:
                        log.note('Identified prompt: ' + response)
                        self.prompt = response
                        return ret
            else:
                candidate = ''
                candidate_count = 0
            now = time.time()
        raise TimeoutError('Timed out while attempting to identify U-Boot console prompt')

    def readline(self, update_monitor=True) -> str:
        """
        Read and return one line from the serial console.
        If `update_monitor` is `True`, this data is recorded by any attached
        :py:class:`~depthcharge.monitor.Monitor`.
        """
        data = self._ser.readline()
        if update_monitor:
            self.monitor.read(data)
        return data.decode(self._encoding)

    def read(self, readlen=64, update_monitor=True) -> str:
        """
        Read the specified number of characters (`readlen`) from the serial console.
        If `update_monitor` is `True`, this data is recorded by any attached
        :py:class:`~depthcharge.monitor.Monitor`.
        """
        raw_data = self.read_raw(readlen, update_monitor=update_monitor)
        ret_str = raw_data.decode(self._encoding)
        return ret_str.replace('\r\n', '\n')

    def read_raw(self, readlen=64, update_monitor=True) -> bytes:
        """
        Read and return `readlen` bytes of raw data from the serial console.
        If `update_monitor` is `True`, this data is recorded by any attached
        :py:class:`~depthcharge.monitor.Monitor`.
        """
        ret = b''
        data = None
        # serial.Serial.read_until() enforces the read timeout at the
        # granularity of the entire payload, rather than a read() timeout.
        # This causes us to time out when reading large responses.
        while data != b'':
            data = self._ser.read(readlen)
            ret += data
        if update_monitor:
            self.monitor.read(ret)
        return ret

    def write(self, data: str, update_monitor=True):
        """
        Write the provided string (`data`) to the serial console.
        If `update_monitor` is `True`, this data is recorded by any attached
        :py:class:`~depthcharge.monitor.Monitor`.
        """
        self.write_raw(data.encode(self._encoding), update_monitor=update_monitor)

    def write_raw(self, data: bytes, update_monitor=True):
        """
        Write the provided raw data bytes to the serial console.
        If `update_monitor` is `True`, this data is recorded by any attached
        :py:class:`~depthcharge.monitor.Monitor`.
        """
        if self._intrachar is None:
            self._ser.write(data)
        else:
            for b in data:
                # A value of 0 will induce only the overhead of a per-byte
                # write() + flush()... which is quite substantial.
                if self._intrachar > 0:
                    time.sleep(self._intrachar)
                self._ser.write(b.to_bytes(1, 'little'))
                self._ser.flush()
        if update_monitor:
            self.monitor.write(data)

    def close(self, close_monitor=True):
        """
        Close the serial console connection.
        After this method is called, no further operations on this object
        should be performed, with the exception of :py:meth:`reopen()`.
        If *close_monitor* is ``True`` and a monitor is attached,
        it will be closed as well.
        """
        if self.monitor is not None and close_monitor:
            self.monitor.close()
        self._ser.close()

    def reopen(self):
        """
        Re-open a closed console connection with the same settings it was originally
        created with. After this function returns successfully, the object may
        be used again.
        """
        self._ser = serial.Serial(port=self._dev, **self._kwargs)

    @staticmethod
    def strip_echoed_input(input_str: str, output: str) -> str:
        """
        Remove echoed input (`input_str`) from data read from a serial console
        (`output`) and return the stripped string.
        """
        input_str = input_str.rstrip()
        if output[:len(input_str)] == input_str:
            return output[len(input_str):].lstrip()
...
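The retry loop behind `interrupt()` can be tried without hardware. The following is a minimal sketch, not Depthcharge's implementation: `FakeBootloader` is a hypothetical stand-in for the serial port that ignores the first few interrupts and then answers with a prompt, and the standalone `interrupt` function only mirrors the send, read, and check-for-prompt structure of the method in the listing.

```python
import time


class FakeBootloader:
    """Hypothetical stand-in for a serial port (not part of Depthcharge).

    It ignores the first `ignore` writes, then queues a prompt for each
    later write, roughly like a device that is still busy booting.
    """

    def __init__(self, prompt="=> ", ignore=2):
        self.prompt = prompt
        self.ignore = ignore
        self.received = []
        self._pending = ""

    def write(self, data: str) -> None:
        self.received.append(data)
        if len(self.received) > self.ignore:
            self._pending += "\n" + self.prompt

    def read(self) -> str:
        # Hand back everything queued so far and clear the queue.
        out, self._pending = self._pending, ""
        return out


def interrupt(port, prompt, interrupt_str="\x03", timeout=1.0):
    """Send interrupt_str repeatedly until a response ends with prompt."""
    collected = ""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        port.write(interrupt_str)
        response = port.read()
        collected += response
        if response.endswith(prompt):
            return collected
    raise TimeoutError("no prompt seen before timeout")


port = FakeBootloader()
output = interrupt(port, "=> ")
print(repr(output), len(port.received))
```

With a real board, `interrupt_str` would be whatever the bootloader accepts, which is Ctrl-C (`'\x03'`) by default or the configured autoboot stop string.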


optimize_round_size.py

Source: optimize_round_size.py (GitHub)


#!/usr/bin/env python3
import os
import subprocess as sp
import numpy as np
import scipy.stats as st
from sys import argv


def get_auto_cov(x, mean):
    n = len(x)
    xi_sum = np.sum([(x[i] - mean) * (x[i+1] - mean) for i in range(n-1)])
    return xi_sum/(n-2)


nm = []
if __name__ == "__main__":
    try:
        metric = argv[1]
    except:
        raise ValueError()
    devnull = open(os.devnull, 'w')
    # sp.run('make clean', shell=True, stdout=devnull)
    sp.run('make CPPFLAGS="-D PYTHON_SCRIPT"', shell=True, stdout=devnull)

    warmup = 500000
    rho = 0.7
    interrupt = 0

    def next_round_size():
        k = 50
        for i in range(500):
            yield int(k)
            k *= 1.5

    interrupt_str = "ON" if interrupt else "OFF"
    print('Parameter: ' + metric + f' (warmup={warmup}, rho1={rho}, interruption {interrupt_str})')
    print('{:<10} {:<10} {:<10} {:<14} {:<12} {:<10} {:^5}'.format('Rounds', 'Round size', 'Mean', 'Autocovariance', 'Variance', 'IC', 'Prec'))
    for round_size in next_round_size():
        acc_avg = dict()
        op = sp.run('./simulador {} {} {} {}'.format(warmup, round_size, rho, interrupt), stderr=sp.PIPE, stdout=devnull, shell=True, check=True)
        x = []
        for line in op.stderr.decode('utf8').strip().split('\n'):
            round_info = line.split()
            metrics = [round_info[i].replace("rm.", "") for i in range(0, len(round_info), 2)]
            round_means = [float(round_info[i]) for i in range(1, len(round_info), 2)]
            x.append(dict(zip(metrics, round_means)))
        xs = dict()
        mean = dict()
        autocov = dict()
        var = dict()
        for param in x[0].keys():
            xs[param] = [xi[param] for xi in x]
            mean[param] = np.mean(xs[param])
            autocov[param] = get_auto_cov(xs[param], mean[param])
            var[param] = np.var(xs[param], ddof=1)

        ic = st.t.interval(0.9, len(xs[metric])-1, scale=np.sqrt(var[metric]/len(xs[metric])))
        prec = ic[1]/mean[metric] * 100
        print('{:<10} {:<10} {:<10.6f} {:<14.6f} {:<12.6f} ±{:<10.6f}{:>5.2f}%'.format(len(xs[metric]), round_size, mean[metric], autocov[metric], var[metric], ic[1], prec))
        if any(var[param] / abs(autocov[param]) < 5 for param in xs.keys()):
            continue
        else:
            break
...
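The stopping rule in this script keeps growing the round size until the variance of the round means is at least five times the absolute lag-1 autocovariance. Here is a standard-library sketch of that check on toy data. The names `lag1_autocov` and `rounds_look_independent` are illustrative and not from the script, and treating a zero autocovariance as "independent" is an assumption added here to avoid a division by zero.

```python
from statistics import mean, variance


def lag1_autocov(xs):
    """Lag-1 autocovariance, using the same n - 2 divisor as get_auto_cov."""
    m = mean(xs)
    n = len(xs)
    return sum((xs[i] - m) * (xs[i + 1] - m) for i in range(n - 1)) / (n - 2)


def rounds_look_independent(xs, threshold=5.0):
    """True when variance / |autocovariance| reaches the threshold."""
    cov = lag1_autocov(xs)
    if cov == 0:
        # Assumption for this sketch: no measurable correlation passes.
        return True
    return variance(xs) / abs(cov) >= threshold


# Toy round means: a steady trend is strongly correlated from one round
# to the next, while the second series has little lag-1 correlation.
trending = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
mixed = [1.0, 3.0, 3.0, 1.0, 1.0, 3.0, 3.0, 1.0]

print(lag1_autocov(trending), rounds_look_independent(trending))
print(lag1_autocov(mixed), rounds_look_independent(mixed))
```

In the script's terms, the first series would trigger another iteration with a larger round size, and the second would end the search.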


PomodoroTimer.py

Source: PomodoroTimer.py (GitHub)


"""
Pomodoro Timer
Version 1.0
PomodoroTimer.py
Written by Spencer Michalski
July 13, 2019
A simple productivity tool based on the Pomodoro Technique.
"""
import logging, time, datetime

""" Set up logging """
LOG_FILENAME = 'PomodoroTimer.log'
logging.basicConfig(filename=LOG_FILENAME, level=logging.DEBUG)

""" Return a timestamp string """
def nowStr():
    return str(datetime.datetime.now())

""" Start a 25-minute Pomodoro, returning a completion status """
def startPomodoro(taskName):
    msg = nowStr() + ": Pomodoro started. Task: " + taskName
    print(msg)
    logging.debug(msg)
    # Time loop for 25 minutes
    minute = 0
    while minute < 25:
        try:
            time.sleep(60)
        except KeyboardInterrupt:
            interrupt_str = nowStr() + ": Task interrupted after " + str(minute) + " minutes. "
            print(interrupt_str)
            interrupt_reason = input("What is the reason for your interruption? ")
            log_interruption_str = interrupt_str + "Task: " + taskName + ". Interruption reason: " + interrupt_reason
            logging.debug(log_interruption_str)
            return "n"
        minute += 1
        print(str(minute) + " Minutes")
    completeStr = nowStr() + ": Pomodoro completed. Task: " + taskName
    print(completeStr)
    logging.debug(completeStr)
    accomplished = input("Did you accomplish your task? (y/n): ")
    return accomplished

print("Welcome to PomodoroTimer 1.0\n")
# Cycle through Pomodoros, continually allowing the user to work until they Ctrl-C to exit the program.
taskName = input("What is your task? ")
while True:
    taskAccomplished = startPomodoro(taskName)
    print("Get up, stretch, and take a 5 minute break!")
    sameTask = "n"
    if taskAccomplished == "y":
        accomplishedStr = "Task accomplished: " + taskName
        logging.debug(accomplishedStr)
        print("Good job!")
    else:
        sameTask = input("Would you like to continue working on this task? (y/n): ")
    if sameTask != "y":
        ...
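The core of `startPomodoro` is a minute-by-minute sleep loop that treats Ctrl-C as an interruption instead of a crash. Below is a small sketch of that pattern with the sleep function passed in as a parameter, so it can be exercised without waiting 25 minutes. `run_pomodoro` and `interrupting_sleep` are hypothetical names used for illustration; the original script calls `time.sleep(60)` directly and also prompts for a reason.

```python
import time


def run_pomodoro(minutes=25, sleep=time.sleep):
    """Count whole minutes; return (completed, minutes_elapsed)."""
    elapsed = 0
    while elapsed < minutes:
        try:
            sleep(60)
        except KeyboardInterrupt:
            # Ctrl-C during the sleep ends the session early.
            return False, elapsed
        elapsed += 1
    return True, elapsed


def interrupting_sleep(after):
    """Hypothetical helper: a fake sleep that raises KeyboardInterrupt
    once it has been called more than `after` times."""
    calls = {"n": 0}

    def fake_sleep(_seconds):
        calls["n"] += 1
        if calls["n"] > after:
            raise KeyboardInterrupt

    return fake_sleep


full_session = run_pomodoro(minutes=3, sleep=lambda _seconds: None)
cut_short = run_pomodoro(minutes=25, sleep=interrupting_sleep(after=7))
print(full_session, cut_short)
```

Returning the elapsed minutes alongside the completion flag gives the caller what it needs to build a log message like the script's `interrupt_str`.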
