How to use _send_packet method in autotest

Best Python code snippet using autotest_python

bv5.py

Source:bv5.py Github

copy

Full Screen

...294 payload = struct.pack("<4B3I4s2I24s", options['function'], options['target'], options['vid'],295 options['options'], options['length'], options['addr'], options['crc'],296 options['type'], options['timestamp'], options['version'],297 options['name'].encode(encoding='ascii'))298 self._send_packet(self.FILE_INIT, payload)299 rx = self._recv_packet('<H2I')300 rx = dict(zip(['max_packet_size', 'file_size', 'crc'], rx))301 return rx302 def ft_complete(self, run_mode=RUN_NONE):303 payload = struct.pack('<B', run_mode)304 self._send_packet(self.FILE_EXIT, payload)305 return self._recv_packet()306 def ft_write(self, address, data):307 data = self._align(data)308 fmt = f'<I{len(data)}s'309 payload = struct.pack(fmt, address, data)310 self._send_packet(self.FILE_WRITE, payload)311 return self._recv_packet()312 def ft_read(self, address, no_bytes):313 aligned_no = math.ceil(no_bytes / 4) * 4314 payload = struct.pack('<IH', address, aligned_no)315 self._send_packet(self.FILE_READ, payload)316 resp = self._recv_packet(f'<I{no_bytes}s{aligned_no - no_bytes}s', check_ack=False)317 return resp[1]318 def ft_set_link(self, link_name, vid=VID_USER):319 payload = struct.pack('<2I24s', vid, 0, link_name.encode('ascii'))320 self._send_packet(self.FILE_LINK, payload)321 return self._recv_packet()322 def get_dir_count(self, vid=VID_USER):323 payload = struct.pack('<2B', vid, 0)324 self._send_packet(self.FILE_DIR, payload)325 return self._recv_packet('<h')[0]326 def get_file_metadata_by_idx(self, idx):327 payload = struct.pack('<2B', idx, 0)328 self._send_packet(self.FILE_DIR_ENTRY, payload)329 resp = self._recv_packet('<B3L4s2l24s')330 resp = dict(zip(['idx', 'size', 'addr', 'crc', 'type', 'timestamp', 'version', 'linked_filename'], resp))331 resp['type'] = resp['type'].split(b'\0', 1)[0]332 resp['timestamp'] = self.EPOCH + timedelta(seconds=resp['timestamp'])333 resp['linked_filename'] = resp['linked_filename'].split(b'\0', 1)[0].decode('ascii')334 return resp335 def 
execute_program(self, filename, vid=VID_USER, runmode=RUNMODE_START):336 payload = struct.pack('<2B24s', vid, runmode, filename.encode('ascii'))337 self._send_packet(self.FILE_LOAD, payload)338 return self._recv_packet()339 def get_file_metadata_by_name(self, filename, vid):340 payload = struct.pack('<2B24s', vid, 0, filename.encode('ascii'))341 self._send_packet(self.FILE_GET_INFO, payload)342 resp = self._recv_packet('<B3L4sll24s')343 resp = dict(zip(['idx', 'size', 'addr', 'crc', 'type', 'timestamp', 'version', 'linked_filename'], resp))344 resp['type'] = resp['type']345 resp['timestamp'] = self.EPOCH + timedelta(seconds=resp['timestamp'])346 resp['linked_filename'] = resp['linked_filename'].decode('ascii')347 return resp348 # TODO:349 # + FILE_SET_INFO350 def erase_file(self, file_name, erase_all=False, vid=VID_USER):351 options = 0352 options |= (0x80 if erase_all else 0)353 tx_payload = struct.pack('<2B24s', vid, options, file_name.encode(encoding='ascii'))354 self._send_packet(self.FILE_ERASE, tx_payload)355 try:356 self._recv_packet()357 except OSError:358 if not self.silent:359 directory = 'USER' if vid == self.VID_USER else 'SYSTEM'360 print(f'\033[31;01m==> No file {directory}/{file_name} exists\033[0m')361 return False362 else:363 if not self.silent:364 directory = 'USER' if vid == self.VID_USER else 'SYSTEM'365 print(f'\033[31;01m==> Erased file {directory}/{file_name}\033[0m')366 self.ft_complete()367 return True368 def file_user_stat(self, file_name, options=0, vid=VID_USER):369 packet = struct.pack('<2B24s', vid, options, file_name)370 self._send_packet(self.FILE_USER_STAT, packet)371 return self._recv_packet('<2B')[0]372 # TODO:373 # - FILE_VISUALIZE?374 def file_cleanup(self, vid=VID_USER):375 packet = struct.pack('<2B', vid, 0)376 self._send_packet(self.FILE_CLEANUP, packet)377 return self._recv_packet('<H')[0]378 # TODO:379 # + FILE_FORMAT380 def get_system_status(self):381 self._send_packet(self.SYS_STATUS, b'')382 version = 
self._recv_packet(b'<x12B3xBI12x')383 return {384 'system_version': parse_version('{}.{}.{}-{}'.format(*version[0:4])),385 'cpu0_version': parse_version('{}.{}.{}-{}'.format(*version[4:8])),386 'cpu1_version': parse_version('{}.{}.{}-{}'.format(*version[8:12])),387 'touch_version': version[12],388 'system_id': version[13]389 }390 # TODO:391 # + DEV_STATUS392 # - A load of stuff I don't care about393 # - Logs if I can be bothered394 def sys_screen_cap(self):395 self._send_packet(self.SYS_SCREEN_CAP, b'')396 return self._recv_packet()397 # TODO:398 # - More crud I don't care about399 def sys_kv_load(self, variable):400 packet = struct.pack('<{}s'.format(len(variable) + 1), variable + b'\0')401 self._send_packet(self.SYS_KV_LOAD, packet)402 return self._recv_packet()[:-1]403 def sys_kv_save(self, variable, value):404 ln1 = len(variable) + 1405 ln2 = len(value) + 1406 packet = struct.pack('<{}s{}s'.format(ln1, ln2), variable + b'\0', value + b'\0')407 self._send_packet(self.SYS_KV_SAVE, packet)408 return self._recv_packet()409 def sys_c_info_14(self):410 self._send_packet(self.SYS_C_INFO_14, b'')411 packet = self._recv_data()['payload'][2:]412 mask = struct.unpack('<B', packet[0:1])[0]413 packet = packet[1:]414 slots = []415 for i in range(4):416 slot = None417 if mask & (1 << i):418 ic, nl = struct.unpack('<HB', packet[:3])419 name = struct.unpack('<{}s'.format(nl), packet[3:3 + nl])[0]420 packet = packet[3 + nl:]421 slot = (name[:-1], ic)422 slots.append(slot)423 return slots424 def sys_c_info_58(self):425 self._send_packet(self.SYS_C_INFO_58, b'')426 packet = self._recv_data()['payload'][2:]427 mask = struct.unpack('<B', packet[0:1])[0]428 packet = packet[1:]429 slots = []430 for i in range(4, 8):431 slot = None432 if mask & (1 << i):433 ic, nl = struct.unpack('<HB', packet[:3])434 name = struct.unpack('<{}s'.format(nl), packet[3:3 + nl])[0]435 packet = packet[3 + nl:]436 slot = (name[:-1], ic)437 slots.append(slot)438 return slots439 def factory_ebl(self):440 
self._send_packet(self.FACTORY_EBL, self.DFU_SECRET)441 return self._recv_packet()442 def factory_status(self):443 self._send_packet(self.FACTORY_STATUS, b'')444 state, percent = self._recv_packet('<2B')445 return state, percent446 # Just for testing447 def send_test(self, op, data=b''):448 self._send_packet(op, data)449 return self._recv_packet()450 # Nicer functions451 @property452 def _supports_compression(self):453 ver = self.get_system_status()['system_version']454 return ver >= parse_version('1.0.5')455 def firmware_update(self, boot, assets, refresh=.5, callback=None):456 if boot is not None:457 self.factory_ebl()458 self.write_file(boot, 'null.bin', type=b'bin\x00', target=self.TARGET_FIRMWARE, callback=callback)459 while True:460 state, percent = self.factory_status()461 print(f'\r{self.DFU_STATES.get(state)}: {percent}% ', end='', flush=True)462 if callback is not None:463 callback(self.DFU_STATES.get(state, ''), percent)464 if state == self.DFU_STATE_DONE:465 break466 time.sleep(refresh)467 print()468 if assets is not None:469 self.factory_ebl()470 self.write_file(assets, 'null.bin', type=b'bin\x00', target=self.TARGET_ASSETS, callback=callback)471 while True:472 state, percent = self.factory_status()473 print(f'\r{self.DFU_STATES.get(state)}: {percent}% ', end='', flush=True)474 if callback is not None:475 callback(self.DFU_STATES.get(state, ''), percent)476 if state == self.DFU_STATE_DONE:477 break478 time.sleep(refresh)479 print()480 def generate_ini_file(self, remote_name, slot, ini=None, **kwargs):481 project_ini = ConfigParser()482 default_icon = self.ICON483 project_ini['program'] = {484 'version': kwargs.get('version', '0.0.0') or '0.0.0',485 'name': remote_name,486 'slot': slot,487 'icon': kwargs.get('icon', default_icon) or default_icon,488 'description': kwargs.get('description', self.DESCRIPTION),489 'date': datetime.now().isoformat()490 }491 if ini:492 project_ini.update(ini)493 with io.StringIO() as ini_str:494 project_ini.write(ini_str)495 
return ini_str.getvalue()496 def write_file(self, file, remote_file, run_after=RUN_NONE, callback=None, **kwargs):497 file_len = file.seek(0, 2)498 file.seek(0, 0)499 if self._supports_compression and kwargs.get('type') == b'bin':500 buf = io.BytesIO()501 with gzip.GzipFile(fileobj=buf, mode='wb') as f:502 while True:503 data = file.read(16 * 1024)504 if not data:505 break506 f.write(data)507 file = buf508 file_len = file.seek(0, 2)509 file.seek(0, 0)510 crc32 = self.CRC32(file.read(file_len))511 file.seek(-file_len, 2)512 if not self.silent:513 print(f'\033[32;02m==> Transferring {remote_file} ({file_len} bytes) to the V5' +514 (f' from "{file.name}"' if hasattr(file, 'name') else '') + '\033[0m')515 ft_meta = self.ft_init(remote_file, function=self.FUNCTION_UPLOAD, length=file_len, crc=crc32, **kwargs)516 assert ft_meta['file_size'] >= file_len517 if len(remote_file) > 24:518 print(f' :: Truncating {remote_file} to {remote_file[:24]} due to length')519 remote_file = remote_file[:24]520 if hasattr(file, 'name'):521 remote_file = f'{remote_file} ({file.name})'522 addr = kwargs.get('addr', self.USR_ADDR)523 for i in range(0, file_len, ft_meta['max_packet_size']):524 packet_size = ft_meta['max_packet_size']525 if i + ft_meta['max_packet_size'] > file_len:526 packet_size = file_len - i527 percent = round(i / file_len * 100, 1)528 if callback is not None:529 callback(f'Uploading {remote_file}... {percent}%', percent)530 if not self.silent:531 print('\r :: Uploading {}... 
{}%'.format(remote_file, percent), end='',532 flush=True)533 self.ft_write(addr + i, file.read(packet_size))534 if not self.silent:535 print('\r\033[33;01m :: Uploaded {} successfully\033[0m'.format(remote_file))536 self.ft_complete(run_mode=run_after)537 def read_file(self, file, remote_file, vid=VID_USER, target=TARGET_FLASH):538 start_position = file.tell()539 metadata = self.get_file_metadata_by_name(remote_file, vid=vid)540 ft_meta = self.ft_init(remote_file, function=self.FUNCTION_DOWNLOAD, vid=vid, target=target)541 for i in range(0, ft_meta['file_size'], ft_meta['max_packet_size']):542 packet_size = ft_meta['max_packet_size']543 if i + ft_meta['max_packet_size'] > ft_meta['file_size']:544 packet_size = ft_meta['file_size'] - i545 file.write(self.ft_read(metadata['addr'] + i, packet_size))546 file.seek(start_position)547 return self.ft_complete()548 def write_program(self, file, remote_name=None, slot=0, run_after=RUN_NONE, target=TARGET_FLASH, **kwargs):549 remote_base = f'slot_{slot + 1}'550 if target == self.TARGET_DDR:551 self.write_file(file, f'{remote_base}.bin', type=b'bin', target=self.TARGET_DDR, run_after=run_after)552 return553 if not remote_name:554 remote_name = file.name555 if len(remote_name) > 20:556 if not self.silent:557 print(' :: Truncating remote name to {} for length.'.format(remote_name[:20]))558 remote_name = remote_name[:20]559 ini_file = self.generate_ini_file(remote_name, slot, **kwargs)560 # Stop program561 self.execute_program('', runmode=self.RUNMODE_STOP)562 # Wait for it to stop to be safe563 time.sleep(0.2)564 with io.BytesIO(ini_file.encode(encoding='ascii')) as ini_bin:565 # Write ini566 self.write_file(ini_bin, f'{remote_base}.ini', type=b'ini')567 # Write bin568 self.write_file(file, f'{remote_base}.bin', type=b'bin', run_after=run_after)569 def erase_slot(self, slot):570 self.erase_file(f'slot_{slot}.ini')571 self.erase_file(f'slot_{slot}.bin')572 def erase_brain(self):573 user = self.get_dir_count(self.VID_USER)574 fns 
= []575 for idx in range(user):576 fns.append(self.get_file_metadata_by_idx(idx)['linked_filename'])577 for fn in fns:578 self.erase_file(fn)579 def tree(self, prompt=''):580 user = self.get_dir_count(self.VID_DEV1)581 sys = self.get_system_status()582 name = self[self.ROBOT_NAME].decode('ascii') or 'V5'583 name += ' (' + hex(sys['system_id'])[2:] + ')'584 colours = {585 'ini': '\033[34;01m',586 'bin': '\033[35;01m',587 }588 print(prompt + name)589 for n, vid in enumerate(self.VIDS.keys()):590 count = self.get_dir_count(vid)591 if n == len(self.VIDS) - 1:592 print(prompt + '└───' + self.VIDS[vid])593 lp = ' '594 else:595 print(prompt + '├───' + self.VIDS[vid])596 lp = '│'597 if not count:598 print(prompt + lp + ' └───\033[31;01mFolder empty\033[0m')599 else:600 for i in range(count):601 f = self.get_file_metadata_by_idx(i)602 print(f)603 pre = ' └───' if i == count - 1 else ' ├───'604 pre += colours.get(f['linked_filename'].split('.')[-1], '')605 print(prompt + lp + pre + f['linked_filename'] + '\033[0m')606 print()607 def get_slots(self):608 return self.sys_c_info_14() + self.sys_c_info_58()609 ## Kernel variables610 def __setitem__(self, item, val):611 if not isinstance(item, bytes):612 item = item.encode('ascii')613 if not isinstance(val, bytes):614 val = val.encode('ascii')615 self.sys_kv_save(item, val)616 def __getitem__(self, item):617 if not isinstance(item, bytes):618 item = item.encode('ascii')619 return self.sys_kv_load(item)620 # Packet utils621 @staticmethod622 def _align(data, to=4):623 if len(data) % to == 0:624 return data625 return data + b'\0' * (to - len(data) % to)626 def _send_packet(self, command, payload):627 packet = self._ext_packet(command, payload)628 self.conn.read_all()629 self.conn.write(packet)630 self.conn.flush()631 return len(packet)632 def _recv_packet(self, fmt=None, timeout=0.1, check_ack=True):633 packet = self._recv_data(timeout=timeout)634 assert (packet['command'] == self.USER_CDC)635 if not self.CRC16(packet['raw']) == 
0:636 raise ValueError("CRC of message didn't match 0: {}".format(self.CRC16(packet['raw'])), packet)637 msg = packet['payload'][1:-2]638 if check_ack:639 if msg[0] in self.NACKS.keys():640 raise OSError("Device NACK'd with reason: {}".format(self.NACKS[msg[0]]), msg)...

Full Screen

Full Screen

lx16a.py

Source:lx16a.py Github

copy

Full Screen

...119 if LX16A._checksum(packet[:-1]) != packet[-1]:120 LX16A._controller.flushInput()121 raise ServoChecksumError(f"Servo {servo_id}: bad checksum", servo_id)122 @staticmethod123 def _send_packet(packet: list[int]) -> None:124 packet = [0x55, 0x55, *packet]125 packet.append(LX16A._checksum(packet))126 LX16A._controller.write(bytes(packet))127 @staticmethod128 def _read_packet(num_bytes: int, servo_id: int) -> list[int]:129 received = LX16A._controller.read(num_bytes + 6)130 if len(received) != num_bytes + 6:131 raise ServoTimeoutError(132 f"Servo {servo_id}: {len(received)} bytes (expected {num_bytes})",133 servo_id,134 )135 if LX16A._checksum(received[:-1]) != received[-1]:136 raise ServoChecksumError(f"Servo {servo_id}: bad checksum", servo_id)137 return list(received[5:-1])138 @staticmethod139 def _to_servo_range(angle: float) -> int:140 return round(angle * 25 / 6)141 @staticmethod142 def _from_servo_range(angle: int) -> float:143 return angle * 6 / 25144 @staticmethod145 def _check_within_limits(146 value: Union[float, int],147 lower_limit: Union[float, int],148 upper_limit: Union[float, int],149 variable_name: str,150 servo_id: int,151 ) -> None:152 if value < lower_limit or value > upper_limit:153 raise ServoArgumentError(154 f"Servo {servo_id}: {variable_name} must be between {lower_limit} and {upper_limit} (received {value})",155 servo_id,156 )157 ################ Write Commands ################158 def move(159 self, angle: float, time: int = 0, relative: bool = False, wait: bool = False, wait_to_complete: bool = False160 ) -> None:161 if not self._torque_enabled:162 raise ServoLogicalError(163 f"Servo {self._id}: torque must be enabled to move", self._id164 )165 if self._motor_mode:166 raise ServoLogicalError(167 f"Servo {self._id}: motor mode must be disabled to control movement",168 self._id,169 )170 LX16A._check_within_limits(angle, 0, 240, "angle", self._id)171 LX16A._check_within_limits(172 angle,173 LX16A._to_servo_range(self._angle_limits[0]),174 
LX16A._to_servo_range(self._angle_limits[1]),175 "angle",176 self._id,177 )178 degree = angle179 angle = LX16A._to_servo_range(angle)180 if relative:181 angle += self._commanded_angle182 if wait:183 packet = [self._id, 7, 7, *LX16A._to_bytes(angle), *LX16A._to_bytes(time)]184 else:185 packet = [self._id, 7, 1, *LX16A._to_bytes(angle), *LX16A._to_bytes(time)]186 LX16A._send_packet(packet)187 if wait:188 self._waiting_angle = angle189 self._waiting_for_move = True190 else:191 self._commanded_angle = angle192 if wait_to_complete:193 print('Waiting to complete before moving on to other commands')194 completing = True195 init_angle = self.get_physical_angle()196 start = t.time()197 198 while completing:199 if abs(degree-init_angle)/100 < time/1000:200 if t.time()-start > time/1000+0.25:201 print('Completed Command')202 completing = False203 else:204 if t.time()-start > abs(degree-init_angle)/100+.25:205 print('Completed Command')206 completing = False207 def move_bspline(self, x: float, time: int = 0, wait: bool = False) -> None:208 if self._bspline is None:209 raise ServoLogicalError(f"Servo {self._id}: no B-Spline defined", self._id)210 self.move(self._bspline.sample_x(x), time, False, wait)211 def move_start(self) -> None:212 if not self._waiting_for_move:213 raise ServoLogicalError(f"Servo {self._id}: not waiting for move", self._id)214 if not self._torque_enabled:215 raise ServoLogicalError(216 f"Servo {self._id}: torque must be enabled to move", self._id217 )218 if self._motor_mode:219 raise ServoLogicalError(220 f"Servo {self._id}: motor mode must be disabled to control movement",221 self._id,222 )223 packet = [self._id, 3, 11]224 LX16A._send_packet(packet)225 self._commanded_angle = self._waiting_angle226 self._waiting_for_move = False227 def move_stop(self) -> None:228 if self._motor_mode:229 raise ServoLogicalError(230 f"Servo {self._id}: motor mode must be disabled to control movement",231 self._id,232 )233 packet = [self._id, 3, 12]234 
LX16A._send_packet(packet)235 self._commanded_angle = LX16A._to_servo_range(self.get_physical_angle())236 def set_id(self, id_: int) -> None:237 LX16A._check_within_limits(id_, 0, 253, "servo ID", self._id)238 packet = [self._id, 4, 13, id_]239 LX16A._send_packet(packet)240 self._id = id_241 def set_angle_offset(self, offset: int, permanent: bool = False) -> None:242 LX16A._check_within_limits(offset, -30, 30, "angle offset", self._id)243 offset = LX16A._to_servo_range(offset)244 if offset < 0:245 offset = 256 + offset246 packet = [self._id, 4, 17, offset]247 LX16A._send_packet(packet)248 self._angle_offset = offset249 if permanent:250 packet = [self._id, 3, 18]251 LX16A._send_packet(packet)252 def set_angle_limits(self, lower_limit: float, upper_limit: float) -> None:253 LX16A._check_within_limits(lower_limit, 0, 240, "lower limit", self._id)254 LX16A._check_within_limits(upper_limit, 0, 240, "upper limit", self._id)255 if upper_limit < lower_limit:256 raise ServoArgumentError(257 f"Servo {self._id}: lower limit (received {lower_limit} must be less than upper limit (received {upper_limit})",258 self._id,259 )260 lower_limit = LX16A._to_servo_range(lower_limit)261 upper_limit = LX16A._to_servo_range(upper_limit)262 packet = [263 self._id,264 7,265 20,266 *LX16A._to_bytes(lower_limit),267 *LX16A._to_bytes(upper_limit),268 ]269 LX16A._send_packet(packet)270 self._angle_limits = lower_limit, upper_limit271 def set_vin_limits(self, lower_limit: int, upper_limit: int) -> None:272 LX16A._check_within_limits(lower_limit, 4500, 12000, "lower limit", self._id)273 LX16A._check_within_limits(upper_limit, 4500, 12000, "upper limit", self._id)274 if upper_limit < lower_limit:275 raise ServoArgumentError(276 f"Servo {self._id}: lower limit (received {lower_limit} must be less than upper limit (received {upper_limit})",277 self._id,278 )279 packet = [280 self._id,281 7,282 22,283 *LX16A._to_bytes(lower_limit),284 *LX16A._to_bytes(upper_limit),285 ]286 
LX16A._send_packet(packet)287 self._vin_limits = lower_limit, upper_limit288 def set_temp_limit(self, upper_limit: int) -> None:289 LX16A._check_within_limits(upper_limit, 50, 100, "temperature limit", self._id)290 packet = [self._id, 4, 24, upper_limit]291 LX16A._send_packet(packet)292 self._temp_limit = upper_limit293 def motor_mode(self, speed: int) -> None:294 if not self._torque_enabled:295 raise ServoLogicalError(296 f"Servo {self._id}: torque must be enabled to control movement",297 self._id,298 )299 # if self._motor_mode:300 # raise ServoLogicalError(f'Servo {self._id}: servo is already in motor mode')301 LX16A._check_within_limits(speed, -1000, 1000, "motor speed", self._id)302 if speed < 0:303 speed += 65536304 packet = [self._id, 7, 29, 1, 0, *LX16A._to_bytes(speed)]305 LX16A._send_packet(packet)306 self._motor_mode = True307 def servo_mode(self) -> None:308 # if not self._motor_mode:309 # raise ServoLogicalError(f'Servo {self._id}: servo is already in servo mode')310 packet = [self._id, 7, 29, 0, 0, 0, 0]311 LX16A._send_packet(packet)312 self._motor_mode = False313 def enable_torque(self) -> None:314 packet = [self._id, 4, 31, 0]315 LX16A._send_packet(packet)316 self._torque_enabled = True317 def disable_torque(self) -> None:318 packet = [self._id, 4, 31, 1]319 LX16A._send_packet(packet)320 self._torque_enabled = False321 def led_power_off(self) -> None:322 packet = [self._id, 4, 33, 1]323 LX16A._send_packet(packet)324 self._led_powered = False325 def led_power_on(self) -> None:326 packet = [self._id, 4, 33, 0]327 LX16A._send_packet(packet)328 self._led_powered = True329 def set_led_error_triggers(330 self, over_temperature: bool, over_voltage: bool, rotor_locked: bool331 ) -> None:332 combined = 4 * rotor_locked + 2 * over_voltage + over_temperature333 packet = [self._id, 4, 35, combined]334 LX16A._send_packet(packet)335 self._led_error_triggers = over_temperature, over_voltage, rotor_locked336 def set_bspline(337 self,338 knots: list[float],339 
control_points: list[tuple[float, float]],340 degree: int,341 num_samples: int = 100,342 ) -> None:343 if len(knots) != len(control_points) - degree + 1:344 raise ServoArgumentError(345 f"Servo {self._id}: len(knots) != len(control_points) - degree + 1",346 self._id,347 )348 self._bspline = _BSpline(knots, control_points, degree, num_samples)349 ################ Read Commands ################350 def get_last_instant_move_hw(self) -> tuple[float, int]:351 packet = [self._id, 3, 2]352 LX16A._send_packet(packet)353 received = LX16A._read_packet(4, self._id)354 angle = LX16A._from_servo_range(received[0] + received[1] * 256)355 time = received[2] + received[3] * 256356 return angle, time357 def get_last_delayed_move_hw(self) -> tuple[float, int]:358 packet = [self._id, 3, 8]359 LX16A._send_packet(packet)360 received = LX16A._read_packet(4, self._id)361 angle = LX16A._from_servo_range(received[0] + received[1] * 256)362 time = received[2] + received[3] * 256363 return angle, time364 def get_id(self, poll_hardware: bool = False) -> int:365 if not poll_hardware:366 return self._id367 packet = [self._id, 3, 14]368 LX16A._send_packet(packet)369 received = LX16A._read_packet(1, self._id)370 return received[0]371 def get_angle_offset(self, poll_hardware: bool = False) -> int:372 if not poll_hardware:373 return LX16A._from_servo_range(self._angle_offset)374 packet = [self._id, 3, 19]375 LX16A._send_packet(packet)376 received = LX16A._read_packet(1, self._id)377 if received[0] > 125:378 return received[0] - 256379 return LX16A._from_servo_range(received[0])380 def get_angle_limits(self, poll_hardware: bool = False) -> tuple[float, float]:381 if not poll_hardware:382 return LX16A._from_servo_range(383 self._angle_limits[0]384 ), LX16A._from_servo_range(self._angle_limits[1])385 packet = [self._id, 3, 21]386 LX16A._send_packet(packet)387 received = LX16A._read_packet(4, self._id)388 lower_limit = LX16A._from_servo_range(received[0] + received[1] * 256)389 upper_limit = 
LX16A._from_servo_range(received[2] + received[3] * 256)390 return lower_limit, upper_limit391 def get_vin_limits(self, poll_hardware: bool = False) -> tuple[int, int]:392 if not poll_hardware:393 return self._vin_limits394 packet = [self._id, 3, 23]395 LX16A._send_packet(packet)396 received = LX16A._read_packet(4, self._id)397 lower_limit = received[0] + received[1] * 256398 upper_limit = received[2] + received[3] * 256399 return lower_limit, upper_limit400 def get_temp_limit(self, poll_hardware: bool = False) -> int:401 if not poll_hardware:402 return self._temp_limit403 packet = [self._id, 3, 25]404 LX16A._send_packet(packet)405 received = LX16A._read_packet(1, self._id)406 return received[0]407 def is_motor_mode(self, poll_hardware: bool = False) -> bool:408 if not poll_hardware:409 return self._motor_mode410 packet = [self._id, 3, 30]411 LX16A._send_packet(packet)412 received = LX16A._read_packet(4, self._id)413 return received[0] == 1414 def get_motor_speed(self, poll_hardware: bool = False) -> int:415 if not self._motor_mode:416 raise ServoLogicalError(f"Servo {self._id}: not in motor mode", self._id)417 if not poll_hardware:418 return self._motor_speed419 packet = [self._id, 3, 30]420 LX16A._send_packet(packet)421 received = LX16A._read_packet(4, self._id)422 if received[0] == 1:423 speed = received[2] + received[3] * 256424 return speed - 65536 if speed > 32767 else speed425 return None426 def is_torque_enabled(self, poll_hardware: bool = False) -> bool:427 if not poll_hardware:428 return self._torque_enabled429 packet = [self._id, 3, 32]430 LX16A._send_packet(packet)431 received = LX16A._read_packet(1, self._id)432 return received[0] == 1433 def is_led_power_on(self, poll_hardware: bool = False) -> bool:434 if not poll_hardware:435 return self._led_powered436 packet = [self._id, 3, 34]437 LX16A._send_packet(packet)438 received = LX16A._read_packet(1, self._id)439 return received[0] == 0440 def get_led_error_triggers(441 self, poll_hardware: bool = 
False442 ) -> tuple[bool, bool, bool]:443 if not poll_hardware:444 return self._led_error_triggers445 packet = [self._id, 3, 36]446 LX16A._send_packet(packet)447 received = LX16A._read_packet(1, self._id)448 over_temperature = received[0] & 1 != 0449 over_voltage = received[0] & 2 != 0450 rotor_locked = received[0] & 4 != 0451 return over_temperature, over_voltage, rotor_locked452 def get_temp(self) -> int:453 packet = [self._id, 3, 26]454 LX16A._send_packet(packet)455 received = LX16A._read_packet(1, self._id)456 return received[0]457 def get_vin(self) -> int:458 packet = [self._id, 3, 27]459 LX16A._send_packet(packet)460 received = LX16A._read_packet(2, self._id)461 return received[0] + received[1] * 256462 def get_physical_angle(self) -> float:463 packet = [self._id, 3, 28]464 LX16A._send_packet(packet)465 # print('Motor = ', self._id)466 received = LX16A._read_packet(2, self._id)467 angle = received[0] + received[1] * 256468 return LX16A._from_servo_range(angle - 65536 if angle > 32767 else angle)469 def get_commanded_angle(self) -> float:470 return LX16A._from_servo_range(self._commanded_angle)471 def get_waiting_angle(self) -> float:472 if not self._waiting_for_move:473 raise ServoLogicalError(f"Servo {self._id}: not waiting for move", self._id)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful