Best Python code snippet using localstack_python
sqm.py
Source:sqm.py  
1#2# Copyright 2018-2019 Universidad Complutense de Madrid3#4# This file is part of tessreader5#6# SPDX-License-Identifier: GPL-3.0+7# License-Filename: LICENSE.txt8#9import logging10import time11import re12import datetime13import numpy14from .device import Device, PhotometerConf15MEASURE_RE = re.compile(br"""16                \s* # Skip whitespace17                (?P<cmd>r),18                (?P<magnitude>[\-\s]\d{2}\.\d{2})m,19                (?P<freq>\d{10})Hz,20                (?P<period_counts>\d{10})c,21                (?P<period>\d{7}\.\d{3})s,22                (?P<temp_ambient>[\-\s]\d{3}\.\d)C23                """, re.VERBOSE)24CALIB_RE = re.compile(br"""25                (?P<cmd>c),26                (?P<light_cal>\d{8}\.\d{2})m,27                (?P<dark_period>\d{7}\.\d{3})s,28                (?P<light_cal_temp>[\-\s]\d{3}\.\d)C,29                (?P<off_ref>\d{8}\.\d{2})m,30                (?P<dark_cal_temp>[\-\s]\d{3}\.\d)C31                """, re.VERBOSE)32META_RE = re.compile(br"""33                (?P<cmd>i),34                (?P<protocol_number>\d{8}),35                (?P<model_number>\d{8}),36                (?P<feature_number>\d{8}),37                (?P<serial_number>\d{8})38                """, re.VERBOSE)39_logger = logging.getLogger(__name__)40class SQMConf(PhotometerConf):41    pass42class SQM(Device):43    def __init__(self, name="unknown", model='unknown'):44        super().__init__(name, model)45        # Get Photometer identification codes46        self.protocol_number = 047        self.model_number = 048        self.feature_number = 049        self.serial_number = 050        self.calibration = 051        self.ix_readout = ""52        self.rx_readout = ""53        self.cx_readout = ""54        self.cmd_wait = 155    def static_conf(self):56        conf = SQMConf()57        conf.name = self.name58        conf.model = self.model59        conf.serial_number = self.serial_number60        conf.firmware = self.feature_number61        conf.zero_point = self.calibration62        conf.ix_readout = self.ix_readout63        conf.rx_readout = self.rx_readout64        conf.cx_readout = self.cx_readout65        conf.zero_point = self.calibration66        return conf67    def process_metadata(self, match):68        if match:69            self.ix_readout = match.group()70            self.protocol_number = int(match.group('protocol_number'))71            self.model_number = int(match.group('model_number'))72            self.feature_number = int(match.group('feature_number'))73            self.serial_number = int(match.group('serial_number'))74            return {'cmd': 'i', 'protocol_number': self.protocol_number,75                    'model_number': self.model_number,76                    'feature_number': self.feature_number,77                    'serial_number': self.serial_number}78        else:79            raise ValueError('process_metadata')80    def process_msg(self, match) -> dict:81        """Convert the message from the photometer to unified format"""82        self.rx_readout = match.group()83        result = dict()84        result['name'] = self.name85        result['model'] = 'SQM'86        result['cmd'] = match.group('cmd').decode('utf-8')87        result['magnitude'] = float(match.group('magnitude'))88        #result['freq'] = int(match.group('freq'))89        result['freq_sensor'] = int(match.group('freq'))90        result['period'] = float(match.group('period'))91        result['temp_ambient'] = float(match.group('temp_ambient'))92        result['zero_point'] = self.calibration93        # Add time information94        # Complete the payload with tstamp95        now = datetime.datetime.utcnow()96        result['tstamp'] = now97        return result98    def process_calibration(self, match):99        if match:100            self.cx_readout = match.group()101            self.calibration = float(match.group('light_cal'))102            return {'cmd': 'c', 'calibration': self.calibration}103        else:104            raise ValueError('process_calibration')105    def start_connection(self):106        pass107    def close_connection(self):108        pass109    def reset_device(self):110        """Restart connection"""111        _logger.debug('reset device')112        self.close_connection()113        self.start_connection()114    def read_metadata(self, tries=1):115        """Read the serial number, firmware version."""116        logger = logging.getLogger(__name__)117        cmd = b'ix'118        logger.debug("passing command %s", cmd)119        self.pass_command(cmd)120        time.sleep(self.cmd_wait)121        this_try = 0122        while this_try < tries:123            msg = self.read_msg()124            logger.debug("msg is %s", msg)125            meta_match = META_RE.match(msg)126            if meta_match:127                logger.debug('process metadata')128                pmsg = self.process_metadata(meta_match)129                logger.debug('metadata is %s', pmsg)130                return pmsg131            else:132                logger.warning('malformed metadata, try again')133                this_try += 1134                time.sleep(self.cmd_wait)135                self.reset_device()136                time.sleep(self.cmd_wait)137        logger.error('reading metadata after %d tries', tries)138        raise ValueError139    def read_calibration(self, tries=1):140        """Read the calibration parameters"""141        logger = logging.getLogger(__name__)142        cmd = b'cx'143        logger.debug("passing command %s", cmd)144        self.pass_command(cmd)145        time.sleep(self.cmd_wait)146        this_try = 0147        while this_try < tries:148            msg = self.read_msg()149            logger.debug("msg is %s", msg)150            match = CALIB_RE.match(msg)151            if match:152                logger.debug('process calibration')153                pmsg = self.process_calibration(match)154                logger.debug('calibration is %s', pmsg)155                return pmsg156            else:157                logger.warning('malformed calibration, try again')158                this_try += 1159                time.sleep(self.cmd_wait)160                self.reset_device()161                time.sleep(self.cmd_wait)162        logger.error('reading calibration after %d tries', tries)163        raise ValueError164    def read_data(self, tries=1):165        """Read the calibration parameters"""166        logger = logging.getLogger(__name__)167        cmd = b'rx'168        logger.debug("passing command %s", cmd)169        self.pass_command(cmd)170        time.sleep(self.cmd_wait)171        this_try = 0172        while this_try < tries:173            msg = self.read_msg()174            logger.debug("msg is %s", msg)175            match = MEASURE_RE.match(msg)176            if match:177                pmsg = self.process_msg(match)178                logger.debug('data is %s', pmsg)179                if pmsg['magnitude'] < 0:180                    logger.warning('negative measured magnitude, try again')181                    this_try += 1182                    time.sleep(self.cmd_wait)183                    continue184                return pmsg185            else:186                logger.warning('malformed data, try again')187                logger.debug('data is %s', msg)188                this_try += 1189                time.sleep(self.cmd_wait)190                self.reset_device()191                time.sleep(self.cmd_wait)192        logger.error('reading data after %d tries', tries)193        return None194    def pass_command(self, cmd):195        pass196    def read_msg(self):197        return b''198class SQMLU(SQM):199    def __init__(self, conn, name="", sleep_time=1, tries=10):200        super().__init__(name=name, model='SQM-LU')201        self.serial = conn202        # Clearing buffer203        self.read_msg()204    def start_connection(self):205        """Start photometer connection"""206        _logger.debug('start connection')207        time.sleep(self.cmd_wait)208        self.read_metadata(tries=10)209        time.sleep(self.cmd_wait)210        self.read_calibration(tries=10)211        time.sleep(self.cmd_wait)212        self.read_data(tries=10)213    def close_connection(self):214        """End photometer connection"""215        # Check until there is no answer from device216        _logger.debug('close connection')217        answer = True218        while answer:219            answer = self.read_msg()220        self.serial.close()221    def read_msg(self):222        """Read the data"""223        msg = self.serial.readline()224        return msg225    def pass_command(self, cmd):226        self.serial.write(cmd)227class SQMTest(SQM):228    def __init__(self):229        super().__init__(name='sqmtest', model='SQM-TEST')230        self.ix = b'i,00000004,00000003,00000023,00002142\r\n'231        self.cx = b'c,00000019.84m,0000151.517s, 022.2C,00000008.71m, 023.2C\r\n'232        self.rx = b'r, 19.29m,0000000002Hz,0000277871c,0000000.603s, 029.9C\r\n'233    def start_connection(self):234        """Start photometer connection"""235        self.read_metadata()236        self.read_calibration()237        self.read_data()238    def read_metadata(self, tries=1):239        """Read the serial number, firmware version."""240        msg = self.ix241        match = META_RE.match(msg)242        return self.process_metadata(match)243    def read_calibration(self, tries=1):244        """Read the calibration parameters"""245        msg = self.cx246        match = CALIB_RE.match(msg)247        return self.process_calibration(match)248    def read_data(self, tries=1):249        msg = self.rx250        match = MEASURE_RE.match(msg)...prNew
Source:prNew  
...59        print('###')60        name_no_underscores = name.replace('_', '-')61    else:62        name_no_underscores = name63    cmd_wait(['sed', '-i', 's/<name>/%s/' % name, 'Makefile'])64    cmd_wait(['sed', '-i', 's/<name>/%s/' % name, 'name.cabal'])65    cmd_wait(['sed', '-i',66        's/<name_no_underscores>/%s/' % name_no_underscores, 'name.cabal'])67    cmd_wait(['mv', 'name.cabal', name + '.cabal'])68    cmd_wait(['ln', '-s', 'dist/build/%s/%s' % (name, name), name])69elif template == 'py':70    cmd_wait(['sed', '-i', 's/<name>/%s/' % name, 'setup.py'])71elif template == 'sh':72    cmd_wait(['mv', 'src/name', 'src/' + name])73elif template == 'pl':74    cmd_wait(['mv', 'src/name.pl', 'src/%s.pl' % name])75cmd_wait(['git', 'init'])76cmd_wait(['git', 'add', '.'])...chAnal
Source:chAnal  
...7(opts, args) = opt_p.parse_args()8if args:9    error_out('usage')10def anal(f, dest):11    cmd_wait("echo 'annotateh '%s$' wb 1-999 0.5 5\\nquit' | crafty" %12        arg_esc(f))13    cmd_wait(['ff', f + '.html'])14    cmd_wait('rsync *.pgn* ' + arg_esc(dest))15home = os.getenv('HOME')16if opts.jzhuo:17    os.chdir(os.path.join(home, 'g', 'ch', 'jzhuo'))18    anal(opts.jzhuo, 'dan:/var/www/g/ch/jzhuo')19else:20    os.chdir(os.path.join(home, 'g', 'ch', 'chia'))21    f = cmd_output('mydate')[0] + '.pgn'22    cmd_wait(['mv', 'game.pgn', f])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
