Best Python code snippet using dbt-osmosis_python
Serial.py
Source:Serial.py  
...25        """26        # Initialize serial conn27        self.__device_file = device_file28        self.__conn = serial.Serial(self.__device_file, timeout=10)29        self._verify_connection()30        self.__timeout_start = None # type: datetime31    32    def _check_timeout(self, timeout, timeout_start):33        difference = datetime.now() - timeout_start  # type: timedelta34        if difference.total_seconds() >= timeout:35            raise TimeoutException()36    def reset_input_buffer(self):37        """38        Resets the buffer to empty.39        """40        _LOGGER.debug('Serial::reset_input_buffer:: Flushing input buffer.')41        self.__conn.flushInput()42        sleep(.1)43    def read_line(self, timeout=0.5):44        """45        Read line and return char array46        Read line and return char array47        Returns:48            Byte string of chars49        """50        # Initialize timeout51        timeout_start = datetime.now()52        ret_val = b''53        try:54            while ret_val[-1:] != b'\n' or ret_val[-1:] is None:55                self._check_timeout(timeout, timeout_start)56                if self.__conn.inWaiting() > 0:57                    ret_val += self.__conn.read()58        except TimeoutException:59            _LOGGER.debug("Serial::read_line:: Timeout raised.")60        _LOGGER.debug('Serial::read_line:: Read line: {}'.format(str(ret_val)))61        return ret_val62    def send_command(self, command):63        """64        Sends command to the board.65        Args:66            command: byte array to send over serial comm link67        """68        self.__conn.write(command)69        _LOGGER.debug('Serial::send_command:: Wrote: {}'.format(str(command)))70        self.__conn.reset_output_buffer()71        sleep(.5)72    def read_stop(self, command, regex: str, timeout=5):73        """74        Sends command to board. 
Returns content until stop string satisfied or timeout (s).75        Raises ConnectionRefusalException. Returns byte array of ASCII chars response.76        Args:77            command: Byte array to send over serial comm link78            regex: String to test response for to stop and return79            timeout: Time to wait until regex string is matched before exception raised80        Returns:81            Full byte array response up to and including matching regex line82        """83        _LOGGER.debug('Serial::read_stop:: Sending: {}'.format(command))84        _LOGGER.debug('Serial::read_stop:: Expecting regex: {}'.format(regex))85        # compile regex86        _re = re.compile(regex)87        # send the command88        self.send_command(command)89        # return variable90        ret_val = b''91        # Initialize timeout92        timeout_start = datetime.now()93        found = False94        try:95            while not found:96                self._check_timeout(timeout, timeout_start)97                # grab response98                line = self.read_line()99                ret_val += line100                # set found lv101                found = _re.search(str(line))102        except TimeoutException:103            _LOGGER.debug("Serial::read_stop:: Timeout raised.")104            if timeout != 5:105                raise TimeoutException106        finally:107            self.reset_input_buffer()108        return ret_val109    def open(self):110        """111        Opens connection with board if closed.112        """113        _LOGGER.info('Serial::open:: Opening connection...')114        if not self.__conn.is_open:115            self.__conn.open()116            self._verify_connection()117        else:118            _LOGGER.info('Serial::open:: Connection already open.')119    def _verify_connection(self, timeout=7):120        """121        Verifies connection.122        Args:123            timeout: Time allowed before retry or exception raise124  
      """125        if self.__conn_tries > self.__max_tries:126            # reset connection tries127            self.__conn_tries = 1128            raise ConnectionRefusalException('Connection failed after {} attempts'129                                             .format(self.__max_tries))130        # Reading variable131        _LOGGER.info('Serial::_verify_connection:: Verifying connection with {}'132                     .format(self.__device_file))133        line = ''134        # Initialize timeout135        timeout_start = datetime.now()136        # clear input137        self.__conn.reset_input_buffer()138        # init help function139        self.__conn.write(b'?\r\n')140        try:141            # last line of main menu in boot loader142            while line != b'run    - run the flash application\r\n':143                self._check_timeout(timeout, timeout_start)144                line = self.read_line()145            # reset connection tries146            self.__conn_tries = 1147            _LOGGER.info('Serial::_verify_connection:: Connection succeeded.')148        except TimeoutException:149            self.__conn_tries += 1150            if self.__conn_tries <= self.__max_tries:151                _LOGGER.info('Serial::_verify_connection:: Retrying... Attempt {} of {}'152                             .format(self.__conn_tries, self.__max_tries))153            # recurse154            self._verify_connection(timeout)155    def close(self):156        """157        Close connection.158        """159        _LOGGER.info('Serial::close:: Closing connection with {}'.format(self.__device_file))...supervisor.py
Source:supervisor.py  
...
class SupervisorModel(BaseModel):
    """Model exposing helper calls made through a supervisor XML-RPC proxy."""

    cls = None

    def __init__(self, sa=None):
        super(SupervisorModel, self).__init__(sa=sa)

    def _verify_connection(self, connection):
        """Raise Exception unless `connection` is an xmlrpclib.ServerProxy."""
        if not isinstance(connection, xmlrpclib.ServerProxy):
            raise Exception('Invalid connection given, got %s, expected %s'
                            % (type(connection), xmlrpclib.ServerProxy))

    def get_connection(self, supervisor_uri):
        """
        Build a ServerProxy for `supervisor_uri`.

        A falsy URI falls back to 'http://'. Any error raised while creating
        the proxy is logged with its traceback and re-raised.
        """
        uri = supervisor_uri or 'http://'
        try:
            return xmlrpclib.ServerProxy(uri)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def get_master_log(self, connection, offset, length):
        """Return the result of supervisor.readLog(offset, length)."""
        self._verify_connection(connection)
        return connection.supervisor.readLog(offset, length)

    def get_master_state(self, connection):
        """
        Return supervisor.getState() extended with the keys 'pid', 'id'
        and 'ver' taken from the matching supervisor calls.
        """
        self._verify_connection(connection)
        _data = connection.supervisor.getState()
        # dict displays evaluate in order, so the remote calls keep the
        # order getPID, getIdentification, getSupervisorVersion
        _data.update({
            'pid': connection.supervisor.getPID(),
            'id': connection.supervisor.getIdentification(),
            'ver': connection.supervisor.getSupervisorVersion(),
        })
        return _data

    def get_group_processes(self, connection, groupid):
        """Return the process-info entries whose 'group' equals `groupid`."""
        self._verify_connection(connection)
        return [data for data in connection.supervisor.getAllProcessInfo()
                if data['group'] == groupid]

    def get_process_info(self, connection, procid):
        """Return the result of supervisor.getProcessInfo(procid)."""
        self._verify_connection(connection)
        return connection.supervisor.getProcessInfo(procid)

    def read_process_log(self, connection, procid, offset, length):
        self._verify_connection(connection)
        if procid == SUPERVISOR_MASTER:
            log = self.get_master_log(connection, offset, length)
        else:
            log = connection.supervisor.readProcessLog(procid, offset, length)
        # make sure we just return whole lines not to confuse people
...
Database.py
Source:Database.py  
...5        self.db_name = self.__class__.__name__6        self.connection = 07        self.data = []8    9    def _verify_connection(self):10        if self.connection == 0:11            raise Exception(f"{self.db_name} connection is closed. Open to interact with it.")12    @property13    def fetch(self):14        self._verify_connection()15        print(f"Fetching items from {self.db_name}...")16        listing = [f"Item {self.data.index(item)+1}: {str(item)}" for item in self.data]17        resp = f"{self.db_name} has {len(self.data)} item(s) - {listing}"18        print(resp)19    20    def save(self, items):21        self._verify_connection()22        if type(items) == str:23            print(f"Saving {items}")24            progress(prefix="Uploading")25            self.data.append(items)26            return27        print([f"Saving {item}" for item in items])28        progress(prefix="Uploading", items_lengh=len(items))29        [self.data.append(item) for item in items]30    def update(self):31        self._verify_connection()32        return (f"Update with {self.db_name}")33    def delete(self):34        self._verify_connection()35        return (f"Delete with {self.db_name}")36    37    @property38    def open(self):39        print(f"Openning {self.db_name} connection...")40        self.connection = 141    42    @property43    def close(self):44        print(f"Closing {self.db_name} connection...")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
