Best Python code snippet using hypothesis
ab_service.py
Source:ab_service.py  
1# -*- coding: utf-8 -*-2from __future__ import division3import copy4from collections import namedtuple5from datetime import datetime6from struct import unpack7from stars.apps.customer.finance.ab import ab_util, utils8from stars.apps.customer.finance.ab.common_const import AgriculturalBankTradeConstData9class Handler(object):10    def handle(self):11        pass12class ErrorHandler(Handler):13    def handle(self):14        pass15class SucHandler(Handler):16    def handle(self):17        pass18class AbService(object):19    head_msg_type = 'R'20    trade_no = ''21    msg_template = ''22    trade_source = ''23    __mch_id = AgriculturalBankTradeConstData.INST_NO24    Header = namedtuple('Header', 'api_type ver msg_size head_msg_type trade_no mch_id encryption_flag mac_flag mac reserve')25    __header_fmt = '!1s3s5s1s5s8s1s1s8s8s'26    __key_fields_set = {'BankPassword', 'TradePassword', 'FundPassword', 'InstOprPwd'}27    must_exist_item = set()28    def __init__(self, suc_handler=None, error_handler=None):29        self.suc_handler = suc_handler30        self.error_handler = error_handler31    # def get_msg_template(self):32    #     return msg_template33    @classmethod34    def create_ab_inst_serial(cls):35        return ab_util.create_ab_inst_serial(ab_util.get_service_name_for_ab(cls.trade_no))36    def get_trade_no(self):37        return self.trade_no38    def send_and_receive(self, data):39        raise NotImplemented40    def send(self, data):41        raise NotImplemented42    def receive(self, size):43        raise NotImplemented44    def get_mch_id(self):45        return self.__class__.__mch_id46    def get_key(self):47        return ab_util.get_key()48    def get_date(self):49        return datetime.today().strftime('%Y%m%d')50    def get_time(self):51        return datetime.today().strftime('%H%M%S')52    @staticmethod53    def get_header_from_text(text):54        a=AbService.Header._make(unpack(AbService.__header_fmt, text))55        return 
a._replace(msg_size=int(a.msg_size))56    def split_header_msg(self, buf):57        if isinstance(buf, list):58            buf = ''.join(buf)59        s = buf[:41]60        header = self.get_header_from_text(s)61        msg_size = header.msg_size62        cr_msg = buf[41: 41+msg_size]63        return header, cr_msg64    def decrypt_recv_msg_body(self, cr_msg, buf_size, pack_key, pin_key, mac_key):65        # with open('b.txt', 'wb') as w:66        #     w.write(cr_msg)67        msg = self.decrypt_msg(cr_msg, pack_key, pin_key)68        # TODO for test69        with open('received_msg.txt', 'wb') as w:70            w.write(msg)71        # b=chardet.detect(msg)72        # print(b)73        data = utils.make_dict_from_xml(msg)74        return data75    def decrypt_msg(self, data, pack_key, pin_key):76        try:77            msg = ab_util.ab_decrypt_msg(data, pack_key)78            # å
³é®å79            key_fields = self.__class__.__key_fields_set.intersection(set(data.keys()))80            if key_fields:81                data = copy.deepcopy(data)82                for ele in key_fields:83                    if data[ele]:84                        data[ele] = ab_util.ab_decrypt_password(data[ele], pin_key)85            return msg86        except Exception as e:87            print(e)88    def check_msg_text(self, data, pack_key, pin_key, mac_key):89        s = self.check_msg_text_for_common(data, pack_key, pin_key, mac_key)90        return s91    def make_data(self, data, pack_key, pin_key, mac_key, msg_template=None):92        return self.__make_data(data, pack_key, pin_key, mac_key, msg_template)93    def get_code(self, s):94        return '17{}'.format(s)95    def check_mac(self, text, mac, mac_key, pack_key, pin_key):96        # 䏿£æ¥97        # if not ab_util.check_mac(text, mac, mac_key):98        #     code = self.get_code('2043')99        #     m = {'InstructionCode': self.__class__.trade_no,  # 交æå·100        #         'TradeSource': 'B',   # 交æåèµ·æ¹ï¼å¸åº101        #         'InstNo': self.get_mch_id(),   # å¸åºç¼å·102        #         'Date': self.get_date(),103        #          'Time': self.get_time(),104        #          'Code': code,105        #     }106        #     # t = self.make_data(m, pack_key, pin_key, mac_key)107        #     t = m108        #     return t109        return None110    def check_msg_text_for_common(self, data, pack_key, pin_key, mac_key):111        # è§£æé误112        m = None113        if not data:114            code = self.get_code('2044')  # éè®¯æ¶æ¯ä½æ ¼å¼é误115            m = {'InstructionCode': self.__class__.trade_no,  # 交æå·116                'TradeSource': self.trade_source,  # 交æåèµ·æ¹117                'InstNo': self.get_mch_id(),  # å¸åºç¼å·118                'Date': self.get_date(),119                 'Time': self.get_time(),120                 'Code': code,121                 'Info': u'éè®¯æ¶æ¯ä½æ 
¼å¼é误:'122                 }123        # 缺å°å¿
须项124        elif self.must_exist_item and not self.must_exist_item.issubset(set(data.keys())):125            code = self.get_code('2044')126            m = {'InstructionCode': self.__class__.trade_no,  # 交æå·127                'TradeSource': self.trade_source,  # 交æåèµ·æ¹128                'InstNo': self.get_mch_id(),  # å¸åºç¼å·129                'Date': self.get_date(),130                 'Time': self.get_time(),131                 'Code': code,132                 'Info': u'缺å°å¿
须项:' + ','.join(self.must_exist_item - set(data.keys()))133                 }134        elif data.get('InstructionCode') != self.trade_no:135            code = '2031'  # 交æç é136            m = {'InstructionCode': self.__class__.trade_no,  # 交æå·137                'TradeSource': self.trade_source,   # 交æåèµ·æ¹138                'InstNo': self.get_mch_id(),   # å¸åºç¼å·139                'Date': self.get_date(),140                 'Time': self.get_time(),141                 'Code': code,142                 'Info': u'交æç é:'143            }144        elif not utils.is_none_or_empty_or_blank(data.get('InstNo')) and data.get('InstNo') != self.get_mch_id():145            code = '2031'  # å¸åºç¼ç é146            m = {'InstructionCode': self.__class__.trade_no,  # 交æå·147                'TradeSource': self.trade_source,   # 交æåèµ·æ¹148                'InstNo': self.get_mch_id(),   # å¸åºç¼å·149                'Date': self.get_date(),150                 'Time': self.get_time(),151                 'Code': code,152                 'Info': u'å¸åºç¼ç é:'153            }154        if m:155            t = self.make_data(m, pack_key, pin_key, mac_key)156            return t157        return None158    def __make_header(self, msg_size, mac):159        return ab_util.make_header(msg_size, mac=mac,160                                   msg_type=self.__class__.head_msg_type,161                                   trade_no=self.__class__.trade_no,162                                   mch_id=self.__class__.__mch_id)163    def __make_enc_msg(self, data, pack_key, pin_key=None, msg_template=None ):164        key_fields = self.__class__.__key_fields_set.intersection(set(data.keys()))165        if key_fields:166            data = copy.deepcopy(data)167            for ele in key_fields:168                if data[ele]:169                    data[ele] = ab_util.ab_encrypt_password(data[ele], pin_key)170        if not msg_template:171            s = 
self.__class__.msg_template.decode('utf8').format(**data)172        else:173            s = msg_template.decode('utf8').format(**data)174        s = s.encode('gbk')175        # s = s.encode('utf8')176        with open('r_msg.txt', 'wb') as w:177            w.write(s)178            # w.write(pack_key)179            # w.write(pin_key)180        # logging.exception('test by lwj 11111111111111111')181        # logging.exception('pa_k :'+pack_key)182        # logging.exception('pi_k :'+pin_key)183        c_msg = ab_util.ab_encrypt_msg(s, pack_key)184        return c_msg185    def __make_data(self, data, pack_key, pin_key, mac_key, msg_template=None ):186        c_msg = self.__make_enc_msg(data, pack_key, pin_key,  msg_template)187        mac = ab_util.make_mac(c_msg, mac_key)188        header = self.__make_header(len(c_msg), mac)189        return header + c_msg190    def __check_header_msg(self, header, recv_msg, mac_key):191        mac = header.mac192        if not ab_util.check_mac(recv_msg, mac, mac_key):193            pass # TODO194    def do_event(self, data, keys=None):...rbpi_controller.py
Source:rbpi_controller.py  
'''Created on 2018/11/17
   last update 2018/12/07
@author: Taylor W Hickem
'''
import sys
import time
import RPi.GPIO as GPIO
import datetime as dt
import logger

LOGGER_STATUS = True    # log results to csv file Y/N?
POLL_INTERVAL_SEC = 0.5  # pause between control-loop passes

# define the settings
light_settings = {}
pump_settings = {}
light_settings['start_hrs'] = 6
light_settings['lights-T'] = {'OFF hrs': 8.00}
light_settings['lights-M'] = None
light_settings['lights-L'] = {'OFF hrs': 8.00}
pump_settings['pumpA'] = {'ON sec': 30, 'OFF sec': 900, 'last_update': None, 'last_value': None}
pump_settings['pumpB'] = {'ON sec': 60, 'OFF sec': 600, 'last_update': None, 'last_value': None}

# define the GPIO pin arrangement
pins = {'pumpA': 17, 'pumpB': 5, 'lights-M': 25, 'lights-L': 23, 'lights-T': 24}

# define the default value settings
# the pumps are configured as NC and the relay is inverted
# so a 1 corresponds to the OFF position
# for the lights, they are configured as NO so a 1 corresponds to ON
ON_values = {'pumpA': 0, 'pumpB': 0, 'lights-M': 1, 'lights-L': 1, 'lights-T': 1}
defaultValues = {'pumpA': False, 'pumpB': False, 'lights-T': True, 'lights-M': False, 'lights-L': True}


def setupPins():
    """Configure every pin in ``pins`` as an output."""
    for pin in pins:
        GPIO.setup(pins[pin], GPIO.OUT)


def pinIsON(pin_key):
    """Return True if the pin currently reads its configured ON level."""
    return GPIO.input(pins[pin_key]) == ON_values[pin_key]


def setPinValue(pin_key, isON):
    """Drive the pin to its ON level if ``isON``, else to the opposite level."""
    pinONValue = ON_values[pin_key]
    outputValue = pinONValue if isON else 1 - pinONValue
    GPIO.output(pins[pin_key], outputValue)


def set_defaultValues():
    """Put every pin into its state from ``defaultValues`` and log it."""
    for pin in pins:
        setPinValue(pin, defaultValues[pin])
        if LOGGER_STATUS:
            logger.writelog_controller_action(
                'set default value',
                int(defaultValues[pin]),
                'set default value for %s' % pin)


def _lightTargetState(start_hrs, off_hrs, now_hrs):
    """Return True if a light should be ON at ``now_hrs`` (hours, 0-24).

    The light is ON for ``24 - off_hrs`` hours starting at ``start_hrs``;
    the ON window may wrap past midnight.
    """
    on_hrs = 24 - off_hrs
    if start_hrs + on_hrs < 24:
        # OFF time is within the same day: ON inside [start, end)
        end_hrs = start_hrs + on_hrs
        return start_hrs <= now_hrs < end_hrs
    # OFF time is the next day: OFF only inside [end, start)
    end_hrs = on_hrs - (24 - start_hrs)
    return now_hrs < end_hrs or now_hrs >= start_hrs


def updateLightStatus(light_key, nowTime):
    """Switch one light to match its schedule; lights with no settings are skipped."""
    settings = light_settings[light_key]
    if settings is None:
        return
    now_hrs = nowTime.hour + float(nowTime.minute) / 60
    target_state = _lightTargetState(
        light_settings['start_hrs'], settings['OFF hrs'], now_hrs)
    if pinIsON(light_key) != target_state:
        setPinValue(light_key, target_state)
        if LOGGER_STATUS:
            logger.writelog_controller_action(
                'change light status',
                int(target_state),
                'changed light status for %s' % light_key)


def updatePumpStatus(pump_key, nowTime):
    """Toggle one pump according to its ON/OFF durations.

    The pump stays in its last commanded state until that state's duration
    has elapsed since ``last_update``, then flips. The pin is only written
    when its reading differs from the wanted state.
    """

    def changePumpStatus(isON):
        setPinValue(pump_key, isON)
        pump_settings[pump_key]['last_update'] = nowTime
        pump_settings[pump_key]['last_value'] = isON
        if LOGGER_STATUS:
            logger.writelog_controller_action(
                'change pump status',
                int(isON),
                'changed pump status for %s' % pump_key)

    settings = pump_settings[pump_key]
    isON = pinIsON(pump_key)
    if settings['last_update'] is None:
        # first pass: adopt the pin's current state as the starting point
        settings['last_update'] = nowTime
        settings['last_value'] = isON
    last_value = settings['last_value']

    # total_seconds() rather than .seconds: .seconds drops whole days
    elapsed_sec = (nowTime - settings['last_update']).total_seconds()
    period_sec = settings['ON sec'] if last_value else settings['OFF sec']
    expired = elapsed_sec > period_sec
    # keep the last state until its period expires, then flip it
    target_state = bool(last_value) != expired
    if isON != target_state:
        changePumpStatus(target_state)


def updateLightsStatus(nowTime):
    """Update every light entry in ``light_settings``."""
    for light_key in light_settings:
        if light_key != 'start_hrs':
            updateLightStatus(light_key, nowTime)


def updatePumpsStatus(nowTime):
    """Update every pump in ``pump_settings``."""
    for pump_key in pump_settings:
        updatePumpStatus(pump_key, nowTime)


def run_process(poll_interval=POLL_INTERVAL_SEC):
    """Run the control loop forever, pausing ``poll_interval`` seconds per pass."""
    while True:
        nowTime = dt.datetime.now()
        updateLightsStatus(nowTime)
        updatePumpsStatus(nowTime)
        # yield the CPU instead of spinning at full speed
        time.sleep(poll_interval)


def main():
    # set the mode
    GPIO.setmode(GPIO.BCM)
    # setup the pins
    setupPins()
    # set pins to default values
    set_defaultValues()
    # run the process
    run_process()


def _shutdown(exc):
    """Report ``exc``, then restore default pin values and release the GPIO."""
    # str(exc) instead of exc.message: the attribute does not exist on
    # Python 3, and failing here would skip the cleanup below.
    message = str(exc)
    print('shutting down for exception %s' % message)
    try:
        if LOGGER_STATUS:
            logger.writelog_exception(
                message,
                value=str(exc.args),
                exception_type=type(exc).__name__)
    finally:
        try:
            # set to default values
            set_defaultValues()
        finally:
            GPIO.cleanup()


if __name__ == "__main__":
    try:
        main()
    except BaseException as e:
        _shutdown(e)

# --- end of the rbpi_controller.py listing; next listing: harness.py ---
Source:harness.py  
from dbmanager import DBManager


def harnessapp(config):
    """Collect a text outline of every cable and its connectors.

    Reads the database through ``DBManager(config)`` and, for each entry
    under ``data['cable']``, appends lines to ``output``: the cable path,
    each connector with its block type, its family (or part number when no
    family is present) and every non-empty pin.

    NOTE(review): the listing is truncated after the loop, so what is done
    with ``output`` is not visible here.
    """
    dbm = DBManager(config)
    dbm.readdb()
    data = dbm.getdata()
    cables = data['cable']
    output = []
    for cable in cables:
        output.append(cable['Parent'] + '/' + cable['Name'])
        # output.append('\tConnectors:')
        for conn in cable['Connectors']:
            output.append('-'*4 + conn['Name'] + ':' + conn['BlockType'])
            try:
                output.append('-'*8 + 'Family: ' + conn['Family'])
            except KeyError:
                # no 'Family' key: fall back to the part number
                output.append('-'*8 +'Part number: ' + conn['PartNumber'])
            for pin in range(1, int(conn['Pins'])+1):
                # pin keys are zero-padded to two digits: Pin01 .. Pin09, Pin10 ..
                if pin < 10:
                    pin_key = 'Pin0' + str(pin)
                else:
                    pin_key = 'Pin' + str(pin)
                try:
                    if conn[pin_key] != '':
                        output.append('-'*8 + pin_key + ': ' + conn[pin_key])
                except KeyError:
                    # connector does not define this pin; skip it
                    pass
        # separator line after each cable
        output.append('.')
# ... (the listing is truncated here)

# --- page text that followed the listing ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
