Best Python code snippet using lisa_python
device_controller.py
Source:device_controller.py  
# NOTE: excerpt of device_controller.py. The functions below take ``self``
# and are methods of a class whose header lies above this excerpt; they are
# shown dedented here, so indent them one level when splicing them back.
#
# The excerpt opens in the middle of a method whose start is not visible.
# Its visible tail is preserved verbatim:
#
#     '''
#     Cannot write to device registry device status table
#     '''
#     raise SegfaultError('Segmentation fault when trying to write word at {}'.format(utils.word_to_str(pos)))


def _set_device_status(self, ioport_number, status):
    '''
    Store the status of the device on an IOPort, clamped to 0..255.

    Sends the ``device_status_changed`` system interrupt of that IOPort
    when the stored value actually changes.
    '''
    status = max(0, min(255, status))
    changed = status != self._device_status_table[ioport_number]
    self._device_status_table[ioport_number] = status
    if changed:
        self.interrupt_controller.send(self.system_interrupts['device_status_changed'][ioport_number])


def _output_thread_run(self):
    '''
    Forward queued output commands to devices until the stop event is set.

    Presumably the target of the output thread (verify against the thread
    setup, which is outside this excerpt). A failed delivery is logged and
    the loop carries on: raising here would end the loop and silently stop
    output for every IOPort.
    '''
    while True:
        try:
            ioport_number, device_host, device_port, command, data = self.output_queue.get(timeout=0.1)
        except queue.Empty:
            # The stop event is only polled while the queue is idle.
            if self._stop_event.wait(0):
                break
            continue
        logger.debug('Command "%s" from IOPort %s', command, ioport_number)
        if command != 'data':
            logger.error('Unknown command')
            continue
        try:
            response = self._send_request(device_host, device_port, 'data', data)
        except DeviceControllerError as err:
            logger.error('Could not send data: %s', err)
            continue
        if response.status_code != 200:
            logger.error('Could not send data: %s', response.text)
            continue
        try:
            logger.debug('[Device] %s', response.json()['message'])
        except (ValueError, KeyError, TypeError):
            # The reply body is only used for logging; a malformed one must
            # not stop the loop.
            logger.debug('[Device] reply had no JSON "message" field.')


def _ping_thread_run(self):
    '''
    Ping every registered IOPort device once per period until the stop
    event is set.
    '''
    ping_period = 1  # sec
    while True:
        for ioport in self.ioports:
            if ioport.registered:
                self._check_and_update_device_status(ioport)
        # wait() doubles as the sleep between rounds and the stop check.
        if self._stop_event.wait(ping_period):
            break


def _check_and_update_device_status(self, ioport):
    '''
    Ping the device on ``ioport`` and update its status entry.

    The status is reset to 0 when the device answers and incremented by one
    for every missed ping (``_set_device_status`` clamps it at 255).
    '''
    ponged = self._ping_device(ioport)
    logger.debug(
        'Device[%s:%s] @ IOPort[%s] %s.',
        ioport.device_host, ioport.device_port,
        ioport.ioport_number,
        'ponged' if ponged else 'did not pong',
    )
    if ponged:
        new_status = 0
    else:
        new_status = self._device_status_table[ioport.ioport_number] + 1
    self._set_device_status(ioport.ioport_number, new_status)


def _ping_device(self, ioport):
    '''
    Return True if the device on ``ioport`` answers a ping with HTTP 200,
    False if it answers with another status or cannot be reached.
    '''
    try:
        response = self._send_request(ioport.device_host, ioport.device_port, 'ping')
    except DeviceControllerError:
        return False
    return response.status_code == 200


def _send_request(self, device_host, device_port, command, data=None,
                  content_type='application/octet-stream', timeout=5):
    '''
    POST ``data`` to ``http://<device_host>:<device_port>/<command>``.

    ``timeout`` (seconds) is handed to ``requests.post``; without one, a
    device that accepts the connection but never answers would block the
    calling thread indefinitely.

    Returns the ``requests`` response object.
    Raises DeviceControllerConnectionError if the device cannot be reached
    or does not answer within ``timeout``.
    '''
    if data is None:
        data = b''
    url = 'http://{}:{}/{}'.format(device_host, device_port, command)
    try:
        response = requests.post(
            url,
            data=data,
            headers={'Content-Type': content_type},
            timeout=timeout,
        )
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as err:
        # NOTE(review): the message names Aldebaran although the peer here
        # is a device - looks copied from device-side code; confirm.
        raise DeviceControllerConnectionError('Could not connect to Aldebaran.') from err
    logger.debug('Request sent.')
    return response


def _handle_incoming_request(self, path, headers, rfile):
    '''
    Handle incoming request from devices, called by GenericRequestHandler

    ``path`` must look like ``/<ioport>/<command>``. Returns a
    ``(HTTPStatus, body)`` tuple where ``body`` is a dict or None.
    '''
    max_ioport_number = len(self.ioports) - 1
    path = path.lstrip('/')
    if '/' not in path:
        return (
            HTTPStatus.BAD_REQUEST,
            {
                'error': 'Path must be /ioport/command',
            }
        )
    ioport_number, command = path.split('/', 1)
    try:
        ioport_number = int(ioport_number)
        if not 0 <= ioport_number <= max_ioport_number:
            raise ValueError()
    except ValueError:
        return (
            HTTPStatus.BAD_REQUEST,
            {
                'error': 'IOPort number must be an integer between 0 and {}.'.format(max_ioport_number),
            }
        )
    content_length = headers.get('Content-Length')
    if content_length is None:
        return (HTTPStatus.LENGTH_REQUIRED, None)
    try:
        request_body_length = int(content_length)
        # A negative size would be passed straight to rfile.read().
        if request_body_length < 0:
            raise ValueError()
    except (TypeError, ValueError):
        return (
            HTTPStatus.BAD_REQUEST,
            {
                'error': 'Content-Length must be a non-negative integer.',
            }
        )
    data = rfile.read(request_body_length)
    return self._handle_input(ioport_number, command, data)


def _handle_input(self, ioport_number, command, data):
    '''
    Handle command from device

    Dispatches ``register``, ``unregister``, ``ping`` and ``data``; any
    other command yields a BAD_REQUEST response tuple.
    '''
    logger.debug('Incoming command "%s" to IOPort %s', command, ioport_number)
    if command == 'register':
        return self._register_device(ioport_number, data)
    if command == 'unregister':
        return self._unregister_device(ioport_number)
    if command == 'ping':
        return (
            HTTPStatus.OK,
            {
                'message': 'pong',
            }
        )
    if command == 'data':
        return self._send_data_to_ioport(ioport_number, data)
    return (
        HTTPStatus.BAD_REQUEST,
        {
            'error': 'Unknown command: {}'.format(command),
        }
    )


def _send_data_to_ioport(self, ioport_number, data):
    '''
    Queue ``data`` on the IOPort's input queue and send the ``ioport_in``
    interrupt of that IOPort.

    Rejected with FORBIDDEN when no device is registered to the IOPort or
    when ``data`` is longer than the IOPort's input buffer size.
    '''
    ioport = self.ioports[ioport_number]
    if not ioport.registered:
        logger.info('No device is registered to IOPort %s.', ioport_number)
        return (
            HTTPStatus.FORBIDDEN,
            {
                'error': 'No device is registered to this IOPort.',
            }
        )
    if len(data) > ioport.input_buffer_size:
        logger.info('Too much data sent to IOPort %s.', ioport_number)
        return (
            HTTPStatus.FORBIDDEN,
            {
                'error': 'Too much data sent.',
            }
        )
    ioport.input_queue.put(data)
    logger.info('Delivered data to IOPort %s.', ioport_number)
    self.interrupt_controller.send(self.system_interrupts['ioport_in'][ioport_number])
    return (
        HTTPStatus.OK,
        {
            'message': 'Received data: {}'.format(utils.binary_to_str(data)),
        }
    )


def _register_device(self, ioport_number, data):
    '''
    Register a device to an IOPort.

    ``data`` is the raw request body: a JSON object with the fields
    ``type`` (1-byte hex string), ``id`` (3-byte hex string), ``host`` and
    ``port``. On success the device type and ID are written to the device
    registry, the device status is reset to 0 and the
    ``device_registered`` interrupt is sent.

    Returns a ``(HTTPStatus, body)`` tuple: BAD_REQUEST for malformed input,
    FORBIDDEN if the IOPort is taken or its input queue is not empty.
    '''
    try:
        data = json.loads(data)
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes alike.
        data = None
    if not isinstance(data, dict):
        return (
            HTTPStatus.BAD_REQUEST,
            {
                'error': 'Could not parse data.',
            }
        )
    # A tuple keeps the reported missing field deterministic.
    for field_name in ('type', 'id', 'host', 'port'):
        if field_name not in data:
            return (
                HTTPStatus.BAD_REQUEST,
                {
                    'error': 'Field "{}" missing.'.format(field_name),
                }
            )
    try:
        device_type = int(data['type'], 16)
        if device_type < 0x00 or device_type > 0xFF:
            raise ValueError()
    except (TypeError, ValueError):
        # TypeError: the field was sent as a non-string (e.g. JSON number).
        return (
            HTTPStatus.BAD_REQUEST,
            {
                'error': 'Device type must be a 1-byte hex number.',
            }
        )
    try:
        device_id = int(data['id'], 16)
        if device_id < 0x00 or device_id > 0xFFFFFF:
            raise ValueError()
        device_id = list(device_id.to_bytes(3, 'big'))
    except (TypeError, ValueError):
        return (
            HTTPStatus.BAD_REQUEST,
            {
                'error': 'Device ID must be a 3-byte hex number.',
            }
        )
    device_host, device_port = data['host'], data['port']
    ioport = self.ioports[ioport_number]
    if ioport.registered:
        logger.info('A device is already registered to IOPort %s.', ioport_number)
        return (
            HTTPStatus.FORBIDDEN,
            {
                'error': 'A device is already registered to this IOPort.',
            }
        )
    if ioport.input_queue.qsize() > 0:
        logger.info('IOPort %s input queue not empty.', ioport_number)
        return (
            HTTPStatus.FORBIDDEN,
            {
                'error': 'IOPort input queue not empty.',
            }
        )
    logger.info('Registering device to IOPort %s...', ioport_number)
    logger.info(
        'Device type and ID: %s %s',
        utils.byte_to_str(device_type),
        ' '.join(utils.byte_to_str(id_byte) for id_byte in device_id),
    )
    logger.info('Device host and port: %s:%s', device_host, device_port)
    # Registry layout per IOPort: 1 type byte followed by 3 ID bytes.
    self._device_registry[4 * ioport_number] = device_type
    for idx, id_byte in enumerate(device_id):
        self._device_registry[4 * ioport_number + 1 + idx] = id_byte
    self._set_device_status(ioport_number, 0)
    ioport.register_device(device_host, device_port)
    self.interrupt_controller.send(self.system_interrupts['device_registered'])
    logger.info('Device registered to IOPort %s.', ioport_number)
    return (
        HTTPStatus.OK,
        {
            'message': 'Device registered.',
        }
    )


def _unregister_device(self, ioport_number):
    '''
    Unregister the device of an IOPort.

    Clears the IOPort's four registry bytes, resets its status to 0 and
    sends the ``device_unregistered`` interrupt. Returns FORBIDDEN if no
    device is registered to the IOPort.
    '''
    if not self.ioports[ioport_number].registered:
        logger.info('No device is registered to IOPort %s.', ioport_number)
        return (
            HTTPStatus.FORBIDDEN,
            {
                'error': 'No device is registered to this IOPort.',
            }
        )
    logger.info('Unregistering device from IOPort %s...', ioport_number)
    for idx in range(4):
        self._device_registry[4 * ioport_number + idx] = 0
    self._set_device_status(ioport_number, 0)
    self.ioports[ioport_number].unregister_device()
    self.interrupt_controller.send(self.system_interrupts['device_unregistered'])
    logger.info('Device unregistered from IOPort %s.', ioport_number)
    return (
        HTTPStatus.OK,
        {
            'message': 'Device unregistered.',
        }
    )


# pylint: disable=missing-docstring
# Base class of the errors raised by the device controller.
class DeviceControllerError(AldebaranError):
    pass


# Raised by _send_request when a device cannot be reached.
class DeviceControllerConnectionError(DeviceControllerError):
    pass

# ... (end of the device_controller.py excerpt; next excerpt: polling_status.py)
Source:polling_status.py  
# NOTE: excerpt of polling_status.py. The functions below take ``self`` and
# are methods of a class whose header lies above this excerpt; they are
# shown dedented here, so indent them one level when splicing them back.
#
# The excerpt opens in the middle of a method whose start is not visible.
# Its visible tail is preserved verbatim:
#
#             if self._metric_statuses[k] != DEVICE_METRICS_STATES.SUCCESS:
#                 self._metric_statuses[k] = DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE
#         else:
#             self._metric_statuses[k] = DEVICE_METRICS_STATES.SUCCESS
#         self._set_device_status()


def handle_exception(self, k, e):
    """
    Mutate the state of the _device_metrics_status dictionary.

    A metric that is currently SUCCESS or PARTIAL_METRIC_FAILURE becomes
    PARTIAL_METRIC_FAILURE. Any other (or unknown) metric gets the state
    mapped to the exception's class or to its nearest base class listed in
    EXCEPTIONS_KEYS, falling back to INTERNAL_FAILURE. The overall device
    status is recomputed in every case.

    Args:
        k(str): name of the metric to apply the exception to
        e(Exception): exception which has occurred for the given k
    """
    # Logger.warn is a deprecated alias of Logger.warning; passing the
    # values as arguments defers formatting until the record is emitted.
    # NOTE(review): assumes self.logger is a standard logging.Logger - confirm.
    self.logger.warning(
        u'Error while trying to poll "%s" (%s) for "%s": %s - %s',
        str(self._device_name), str(self._device_type), k, repr(e), traceback.format_exc(),
    )
    recoverable_states = (
        DEVICE_METRICS_STATES.SUCCESS,
        DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE,
    )
    if k in self._metric_statuses and self._metric_statuses[k] in recoverable_states:
        self._metric_statuses[k] = DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE
        self._set_device_status()
        return
    # getmro() yields type(e) first, so an exact match wins over a base class.
    for exception_class in inspect.getmro(type(e)):
        if exception_class in EXCEPTIONS_KEYS:
            self._metric_statuses[k] = exceptions_dict[exception_class]
            break
    else:
        self._metric_statuses[k] = DEVICE_METRICS_STATES.INTERNAL_FAILURE
    self._set_device_status()


def _set_device_status(self):
    """
    Call when done with operations on polling status object to set overall
    device status based upon component metric statuses.
    """
    if len(self._metric_statuses) > 0:
        if all(status == DEVICE_METRICS_STATES.SUCCESS for status in list(self._metric_statuses.values())):
            self._device_status = DEVICE_METRICS_STATES.SUCCESS
        elif DEVICE_METRICS_STATES.SUCCESS in list(self._metric_statuses.values()):
            self._device_status = DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE
        else:
            count = Counter(list(self._metric_statuses.values()))
            if len(count.most_common()) > 0:
                # get the most common (1) status and its count as a tuple, and grab just its name
                self._device_status = count.most_common(1)[0][0]
# ... (the excerpt is cut here; any remainder of this method is not shown)
# ... (end of the polling_status.py excerpt; next excerpt: EdgeServer.py)
Source:EdgeServer.py  
# NOTE: excerpt of EdgeServer.py. The functions below take ``self`` and are
# methods of a class whose header lies above this excerpt; they are shown
# dedented here, so indent them one level when splicing them back.


def set_device_values__by_device_id(self, device_id, update_values: Device_Update_Model):
    """Send ``update_values`` to the registered device whose id is ``device_id``.

    Nothing is sent when no registered device has that id; at most one
    update is sent even if several devices share it.
    """
    # device = tuple(filter(lambda dev: dev.device_id == device_id, self.get_registered_device_list()))
    is_registered = any(
        candidate.device_id == device_id
        for candidate in self.get_registered_device_list()
    )
    if is_registered:
        self._set_device_status(device_id, update_values)


def set_device_values__by_device_type(self, device_type: ActiveDeviceTypes, update_values: Device_Update_Model):
    """Send ``update_values`` to every registered device of ``device_type``."""
    for candidate in self.get_registered_device_list():
        if candidate.device_type != device_type.name:
            continue
        self._set_device_status(candidate.device_id, update_values)


def set_device_values__by_room(self, room: ActiveRooms, update_values: Device_Update_Model):
    """Send ``update_values`` to every registered device located in ``room``."""
    for candidate in self.get_registered_device_list():
        if candidate.room_type != room.value:
            continue
        self._set_device_status(candidate.device_id, update_values)


# Controlling and performing the operations on the devices
# based on the request received
def set_status(self, update_values: Device_Update_Model):
    """Send ``update_values`` to every registered device."""
    for candidate in self.get_registered_device_list():
        self._set_device_status(candidate.device_id, update_values)


def _set_device_status(self, device_id, update_values: Device_Update_Model):
    """Publish ``update_values`` as JSON on the update-request topic of ``device_id``."""
    topic = ActiveTopics.DEVICE_UPDATE_REQUEST_TOPIC_NAME.value.format(device_id)
    payload = update_values.to_json()
    self.send_message_to_device(topic, payload, qos=0)

################ SET call(s) to set device status/values :: START ################


# handle different subscribed topics
def handle_topic(self, topic_name, payload):
    """Route ``payload`` to the handler of ``topic_name``.

    Registration requests and update-command results are parsed from JSON
    before being handed over; device status payloads are passed on as
    received. An unsupported topic is only reported on stdout.
    """
    if topic_name == ActiveTopics.DEVICE_REGISTER_REQUEST_TOPIC_NAME.value:
        self._device_registration_handler(Registration_Request.from_json(payload))
        return
    if topic_name == ActiveTopics.SEND_DEVICE_STATUS_TO_EDGE_TOPIC_NAME.value:
        self._process_device_status(payload)
        return
    if topic_name == ActiveTopics.DEVICE_UPDATE_COMMAND_RESULT_TOPIC_NAME.value:
        self._handle_device_update_ack(Update_Command_Result.from_json(payload))
        return
    print(" {} topic is not supported! ".format(topic_name))

# handle device registration request...
# ... (end of the EdgeServer.py excerpt; the page text that followed it on the same line is kept below)
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
