How to use the cmd_wait method in localstack

Best Python code snippet using localstack_python

sqm.py

Source:sqm.py Github

copy

Full Screen

#
# Copyright 2018-2019 Universidad Complutense de Madrid
#
# This file is part of tessreader
#
# SPDX-License-Identifier: GPL-3.0+
# License-Filename: LICENSE.txt
#

import logging
import time
import re
import datetime

import numpy

from .device import Device, PhotometerConf


# Reply to the b'rx' command: a reading line starting with "r,".
MEASURE_RE = re.compile(br"""
    \s* # Skip whitespace
    (?P<cmd>r),
    (?P<magnitude>[\-\s]\d{2}\.\d{2})m,
    (?P<freq>\d{10})Hz,
    (?P<period_counts>\d{10})c,
    (?P<period>\d{7}\.\d{3})s,
    (?P<temp_ambient>[\-\s]\d{3}\.\d)C
    """, re.VERBOSE)

# Reply to the b'cx' command: a calibration line starting with "c,".
CALIB_RE = re.compile(br"""
    (?P<cmd>c),
    (?P<light_cal>\d{8}\.\d{2})m,
    (?P<dark_period>\d{7}\.\d{3})s,
    (?P<light_cal_temp>[\-\s]\d{3}\.\d)C,
    (?P<off_ref>\d{8}\.\d{2})m,
    (?P<dark_cal_temp>[\-\s]\d{3}\.\d)C
    """, re.VERBOSE)

# Reply to the b'ix' command: an identification line starting with "i,".
META_RE = re.compile(br"""
    (?P<cmd>i),
    (?P<protocol_number>\d{8}),
    (?P<model_number>\d{8}),
    (?P<feature_number>\d{8}),
    (?P<serial_number>\d{8})
    """, re.VERBOSE)

_logger = logging.getLogger(__name__)


class SQMConf(PhotometerConf):
    """Configuration record filled in by :meth:`SQM.static_conf`."""
    pass


class SQM(Device):
    """Base photometer driver that speaks the ``ix`` / ``cx`` / ``rx`` protocol.

    Transport is left to subclasses: :meth:`pass_command`, :meth:`read_msg`,
    :meth:`start_connection` and :meth:`close_connection` are no-op stubs
    here. This class sends the commands, parses the replies with the
    module-level regular expressions and retries on malformed answers.
    """

    def __init__(self, name="unknown", model='unknown'):
        super().__init__(name, model)
        # Get Photometer identification codes
        self.protocol_number = 0
        self.model_number = 0
        self.feature_number = 0
        self.serial_number = 0
        self.calibration = 0
        # Raw (unparsed) text of the last matched ix / rx / cx replies.
        self.ix_readout = ""
        self.rx_readout = ""
        self.cx_readout = ""
        # Pause, in seconds (passed to time.sleep), between protocol steps.
        self.cmd_wait = 1

    def static_conf(self):
        """Return an :class:`SQMConf` snapshot of the current device state."""
        conf = SQMConf()
        conf.name = self.name
        conf.model = self.model
        conf.serial_number = self.serial_number
        conf.firmware = self.feature_number
        conf.zero_point = self.calibration
        conf.ix_readout = self.ix_readout
        conf.rx_readout = self.rx_readout
        conf.cx_readout = self.cx_readout
        return conf

    def process_metadata(self, match):
        """Store the identification fields of an ``ix`` reply.

        :param match: result of ``META_RE.match`` (may be ``None``).
        :returns: dict with ``cmd`` and the four identification numbers.
        :raises ValueError: if *match* is falsy.
        """
        if not match:
            raise ValueError('process_metadata')

        self.ix_readout = match.group()
        self.protocol_number = int(match.group('protocol_number'))
        self.model_number = int(match.group('model_number'))
        self.feature_number = int(match.group('feature_number'))
        self.serial_number = int(match.group('serial_number'))
        return {
            'cmd': 'i',
            'protocol_number': self.protocol_number,
            'model_number': self.model_number,
            'feature_number': self.feature_number,
            'serial_number': self.serial_number,
        }

    def process_msg(self, match) -> dict:
        """Convert the message from the photometer to unified format.

        :param match: a successful ``MEASURE_RE`` match; callers are
            expected to have checked it is not ``None``.
        :returns: dict with the parsed reading plus a ``tstamp`` entry.
        """
        self.rx_readout = match.group()
        result = dict()
        result['name'] = self.name
        result['model'] = 'SQM'
        result['cmd'] = match.group('cmd').decode('utf-8')
        result['magnitude'] = float(match.group('magnitude'))
        result['freq_sensor'] = int(match.group('freq'))
        result['period'] = float(match.group('period'))
        result['temp_ambient'] = float(match.group('temp_ambient'))
        result['zero_point'] = self.calibration
        # Complete the payload with a timestamp (naive datetime in UTC).
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # switching to an aware datetime would change what consumers
        # receive, so it is left as is - confirm before migrating.
        result['tstamp'] = datetime.datetime.utcnow()
        return result

    def process_calibration(self, match):
        """Store the ``light_cal`` value of a ``cx`` reply as the calibration.

        :param match: result of ``CALIB_RE.match`` (may be ``None``).
        :returns: dict with ``cmd`` and ``calibration``.
        :raises ValueError: if *match* is falsy.
        """
        if not match:
            raise ValueError('process_calibration')

        self.cx_readout = match.group()
        self.calibration = float(match.group('light_cal'))
        return {'cmd': 'c', 'calibration': self.calibration}

    def start_connection(self):
        """Open the connection (no-op here; overridden by subclasses)."""
        pass

    def close_connection(self):
        """Close the connection (no-op here; overridden by subclasses)."""
        pass

    def reset_device(self):
        """Restart connection"""
        _logger.debug('reset device')
        self.close_connection()
        self.start_connection()

    def read_metadata(self, tries=1):
        """Read the serial number, firmware version.

        Sends ``ix`` once, then reads up to *tries* replies, resetting the
        device after each malformed one.

        :param tries: maximum number of replies to read.
        :returns: the dict produced by :meth:`process_metadata`.
        :raises ValueError: if no reply matched after *tries* attempts.
        """
        cmd = b'ix'
        _logger.debug("passing command %s", cmd)
        self.pass_command(cmd)
        time.sleep(self.cmd_wait)

        for _ in range(tries):
            msg = self.read_msg()
            _logger.debug("msg is %s", msg)
            meta_match = META_RE.match(msg)
            if meta_match:
                _logger.debug('process metadata')
                pmsg = self.process_metadata(meta_match)
                _logger.debug('metadata is %s', pmsg)
                return pmsg

            _logger.warning('malformed metadata, try again')
            time.sleep(self.cmd_wait)
            self.reset_device()
            time.sleep(self.cmd_wait)

        _logger.error('reading metadata after %d tries', tries)
        raise ValueError('could not read metadata after %d tries' % tries)

    def read_calibration(self, tries=1):
        """Read the calibration parameters.

        Sends ``cx`` once, then reads up to *tries* replies, resetting the
        device after each malformed one.

        :param tries: maximum number of replies to read.
        :returns: the dict produced by :meth:`process_calibration`.
        :raises ValueError: if no reply matched after *tries* attempts.
        """
        cmd = b'cx'
        _logger.debug("passing command %s", cmd)
        self.pass_command(cmd)
        time.sleep(self.cmd_wait)

        for _ in range(tries):
            msg = self.read_msg()
            _logger.debug("msg is %s", msg)
            match = CALIB_RE.match(msg)
            if match:
                _logger.debug('process calibration')
                pmsg = self.process_calibration(match)
                _logger.debug('calibration is %s', pmsg)
                return pmsg

            _logger.warning('malformed calibration, try again')
            time.sleep(self.cmd_wait)
            self.reset_device()
            time.sleep(self.cmd_wait)

        _logger.error('reading calibration after %d tries', tries)
        raise ValueError('could not read calibration after %d tries' % tries)

    def read_data(self, tries=1):
        """Read one measurement.

        Sends ``rx`` once, then reads up to *tries* replies. A reply with a
        negative magnitude is discarded and the next one is read without
        resetting; a malformed reply triggers a device reset.

        :param tries: maximum number of replies to read.
        :returns: the dict produced by :meth:`process_msg`, or ``None`` if
            no usable reply arrived within *tries* attempts (unlike the
            metadata/calibration readers, this method does not raise).
        """
        cmd = b'rx'
        _logger.debug("passing command %s", cmd)
        self.pass_command(cmd)
        time.sleep(self.cmd_wait)

        for _ in range(tries):
            msg = self.read_msg()
            _logger.debug("msg is %s", msg)
            match = MEASURE_RE.match(msg)
            if match:
                pmsg = self.process_msg(match)
                _logger.debug('data is %s', pmsg)
                if pmsg['magnitude'] < 0:
                    _logger.warning('negative measured magnitude, try again')
                    time.sleep(self.cmd_wait)
                    continue
                return pmsg

            _logger.warning('malformed data, try again')
            _logger.debug('data is %s', msg)
            time.sleep(self.cmd_wait)
            self.reset_device()
            time.sleep(self.cmd_wait)

        _logger.error('reading data after %d tries', tries)
        return None

    def pass_command(self, cmd):
        """Send *cmd* to the device (no-op here; overridden by subclasses)."""
        pass

    def read_msg(self):
        """Return one raw reply; this base implementation returns ``b''``."""
        return b''


class SQMLU(SQM):
    """:class:`SQM` driven through a serial-like connection object.

    *conn* must provide ``readline()``, ``write()`` and ``close()``.
    """

    def __init__(self, conn, name="", sleep_time=1, tries=10):
        # NOTE(review): sleep_time and tries are accepted but never used in
        # this method; confirm whether sleep_time was meant to set
        # self.cmd_wait before wiring it in.
        super().__init__(name=name, model='SQM-LU')
        self.serial = conn
        # Clearing buffer
        self.read_msg()

    def start_connection(self):
        """Start photometer connection"""
        _logger.debug('start connection')
        time.sleep(self.cmd_wait)
        self.read_metadata(tries=10)
        time.sleep(self.cmd_wait)
        self.read_calibration(tries=10)
        time.sleep(self.cmd_wait)
        self.read_data(tries=10)

    def close_connection(self):
        """End photometer connection"""
        # Check until there is no answer from device
        _logger.debug('close connection')
        answer = True
        while answer:
            answer = self.read_msg()
        # NOTE(review): reset_device() calls start_connection() right after
        # this, and nothing visible here reopens self.serial - confirm the
        # connection object tolerates use after close().
        self.serial.close()

    def read_msg(self):
        """Read the data"""
        msg = self.serial.readline()
        return msg

    def pass_command(self, cmd):
        """Write *cmd* to the serial connection."""
        self.serial.write(cmd)


class SQMTest(SQM):
    """:class:`SQM` stand-in that answers from canned replies, no hardware."""

    def __init__(self):
        super().__init__(name='sqmtest', model='SQM-TEST')
        self.ix = b'i,00000004,00000003,00000023,00002142\r\n'
        self.cx = b'c,00000019.84m,0000151.517s, 022.2C,00000008.71m, 023.2C\r\n'
        self.rx = b'r, 19.29m,0000000002Hz,0000277871c,0000000.603s, 029.9C\r\n'

    def start_connection(self):
        """Start photometer connection"""
        self.read_metadata()
        self.read_calibration()
        self.read_data()

    def read_metadata(self, tries=1):
        """Read the serial number, firmware version."""
        msg = self.ix
        match = META_RE.match(msg)
        return self.process_metadata(match)

    def read_calibration(self, tries=1):
        """Read the calibration parameters"""
        msg = self.cx
        match = CALIB_RE.match(msg)
        return self.process_calibration(match)

    def read_data(self, tries=1):
        msg = self.rx
        match = MEASURE_RE.match(msg)
        # NOTE(review): the source listing is truncated at this point ("...");
        # the remainder of this method is not visible and is not reproduced.

Full Screen

Full Screen

prNew

Source:prNew Github

copy

Full Screen

...59 print('###')60 name_no_underscores = name.replace('_', '-')61 else:62 name_no_underscores = name63 cmd_wait(['sed', '-i', 's/<name>/%s/' % name, 'Makefile'])64 cmd_wait(['sed', '-i', 's/<name>/%s/' % name, 'name.cabal'])65 cmd_wait(['sed', '-i',66 's/<name_no_underscores>/%s/' % name_no_underscores, 'name.cabal'])67 cmd_wait(['mv', 'name.cabal', name + '.cabal'])68 cmd_wait(['ln', '-s', 'dist/build/%s/%s' % (name, name), name])69elif template == 'py':70 cmd_wait(['sed', '-i', 's/<name>/%s/' % name, 'setup.py'])71elif template == 'sh':72 cmd_wait(['mv', 'src/name', 'src/' + name])73elif template == 'pl':74 cmd_wait(['mv', 'src/name.pl', 'src/%s.pl' % name])75cmd_wait(['git', 'init'])76cmd_wait(['git', 'add', '.'])...

Full Screen

Full Screen

chAnal

Source:chAnal Github

copy

Full Screen

...7(opts, args) = opt_p.parse_args()8if args:9 error_out('usage')10def anal(f, dest):11 cmd_wait("echo 'annotateh '%s$' wb 1-999 0.5 5\\nquit' | crafty" %12 arg_esc(f))13 cmd_wait(['ff', f + '.html'])14 cmd_wait('rsync *.pgn* ' + arg_esc(dest))15home = os.getenv('HOME')16if opts.jzhuo:17 os.chdir(os.path.join(home, 'g', 'ch', 'jzhuo'))18 anal(opts.jzhuo, 'dan:/var/www/g/ch/jzhuo')19else:20 os.chdir(os.path.join(home, 'g', 'ch', 'chia'))21 f = cmd_output('mydate')[0] + '.pgn'22 cmd_wait(['mv', 'game.pgn', f])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful