How to use write_sync method in localstack

Best Python code snippet using localstack_python

tests.py

Source:tests.py Github

copy

Full Screen

1import pytest2import logging3import time4import yaml5from math import nan6from roboglia.utils import register_class, unregister_class, registered_classes, get_registered_class7from roboglia.utils import check_key, check_options, check_type, check_not_empty8from roboglia.base import BaseRobot, BaseDevice, BaseBus, BaseRegister9from roboglia.base import RegisterWithConversion, RegisterWithThreshold10from roboglia.base import RegisterWithMapping11from roboglia.base import BaseThread12from roboglia.base import PVL, PVLList13from roboglia.base import SharedFileBus14from roboglia.dynamixel import DynamixelBus15from roboglia.i2c import SharedI2CBus16from roboglia.move import Script17# format = '%(asctime)s %(levelname)-7s %(threadName)-18s %(name)-32s %(message)s'18# logging.basicConfig(format=format, 19# # file = 'test.log', 20# level=60) # silent21logger = logging.getLogger(__name__)22class TestManualRobot:23 def test_manual_robot(self):24 _ = BaseRobot()25 def test_manual_bus(self, caplog):26 rob = BaseRobot()27 bus = SharedFileBus(name='busA', port='/tmp/busA.log')28 rob.add_bus(bus)29 assert len(rob.buses) == 130 bus.open()31 # re-add32 caplog.clear()33 rob.add_bus(bus)34 assert len(caplog.records) == 135 assert len(rob.buses) == 136 def test_manual_device(self, caplog):37 rob = BaseRobot()38 bus = SharedFileBus(name='busA', port='/tmp/busA.log')39 rob.add_bus(bus)40 dev = BaseDevice(name='dev1', bus=bus, dev_id=42, model='DUMMY')41 rob.add_device(dev)42 assert len(rob.devices) == 143 bus.open()44 # re-add45 caplog.clear()46 rob.add_device(dev)47 assert len(caplog.records) == 148 assert len(rob.devices) == 1 49class TestMockRobot:50 @pytest.fixture51 def mock_robot(self): 52 robot = BaseRobot.from_yaml('tests/dummy_robot.yml')53 robot.start()54 yield robot55 robot.stop()56 57 @pytest.fixture58 def dummy_device(self):59 bus = BaseBus(robot='robot', port='dev')60 return BaseDevice(name='device', bus=bus, dev_id=42, model='DUMMY',61 robot='robot')62 def test_incomplete_robot(self, caplog):63 # we now allow creating robots with no buses64 caplog.clear()65 _ = BaseRobot.from_yaml('tests/no_buses.yml')66 # we now allow creating robots with no devices67 caplog.clear()68 _ = BaseRobot.from_yaml('tests/no_devices.yml')69 caplog.clear()70 _ = BaseRobot.from_yaml('tests/dummy_robot.yml')71 assert 'Only the first robot will be considered' in caplog.text72 def test_robot_from_yaml(self, mock_robot):73 mock_robot.stop() # to avoid conflicts on the bus74 new_robot = BaseRobot.from_yaml('tests/dummy_robot.yml')75 new_robot.start()76 new_robot.syncs['write'].start()77 time.sleep(2) # for loops to run78 new_robot.stop()79 mock_robot.start() # restart for other cases80 assert mock_robot.name == 'dummy'81 def test_device_str(self, mock_robot):82 rep = str(mock_robot.devices['d01'])83 assert 'd01' in rep84 assert 'busA' in rep85 assert 'enable_device' in rep86 def test_register_bool(self, mock_robot):87 device = mock_robot.devices['d03']88 assert not device.status.clone89 assert device.status_unmasked.clone90 assert device.status_one.clone91 assert device.status_2and3.clone92 assert device.status_2or3.clone93 # values94 assert device.status_unmasked.value95 assert device.status_one.value96 assert not device.status_2and3.value97 assert device.status_2or3.value98 # setting99 device.status_unmasked.value = False100 assert not device.status_unmasked.value101 device.status_2and3.value = True102 assert device.status_2and3.value103 assert device.status_2and3.int_value == 0b00000110104 def test_register_bool_with_mask(self, mock_robot):105 device = mock_robot.devices['d03']106 assert not device.status_masked.value107 device.status_masked.value = True108 assert device.status_masked.value109 assert device.status_masked.int_value == 0b10100101110 def test_register_with_conversion(self, mock_robot, caplog):111 reg = mock_robot.devices['d03'].desired_pos112 assert isinstance(reg, RegisterWithConversion)113 reg.value = 100114 assert abs(reg.value - 100) < 0.1115 exp_int = 100 * reg.factor + reg.offset116 assert (reg.int_value - exp_int) <=1117 assert reg.range == (0,1023)118 exp_min_ext = - 150.0119 exp_max_ext = 150.0120 assert (reg.min_ext - exp_min_ext) < 1121 assert (reg.max_ext - exp_max_ext) < 1122 ext_range = reg.range_ext123 assert (ext_range[0] - exp_min_ext) < 1124 assert (ext_range[1] - exp_max_ext) < 1125 # try changing the internal value directly126 # removed after bug #64127 # 128 # caplog.clear()129 # reg.int_value = 1130 # assert len(caplog.records) >= 1131 # assert 'only BaseSync subclasses can change' in caplog.text132 def test_register_with_threshold(self, mock_robot):133 reg = mock_robot.devices['d03'].writeable_current_load134 assert isinstance(reg, RegisterWithThreshold)135 assert not reg.sync136 reg.value = 50137 assert abs(reg.value - 50) < 0.1138 exp_int = 100 * reg.factor139 assert (reg.int_value - exp_int) <=1140 reg.value = -50141 assert abs(reg.value + 50) < 0.1142 exp_int = 100 * reg.factor + reg.threshold143 assert (reg.int_value - exp_int) <=1144 145 def test_register_read_only(self, mock_robot, caplog):146 reg = mock_robot.devices['d03'].current_pos147 assert isinstance(reg, RegisterWithConversion)148 caplog.clear()149 reg.value = 100150 assert len(caplog.records) == 1151 assert 'Attempted to write in RO register current_pos' in caplog.text152 def test_register_with_mapping(self, dummy_device, caplog):153 reg = RegisterWithMapping(154 name='test',155 device=dummy_device,156 address=42,157 mask=0b00000011,158 sync=True, # avoids calling read / write159 access='RW', # so we can change it!160 mapping={1:100, 2:2000, 3:30000}161 )162 reg.value = 100163 assert reg.value == 100164 assert reg.int_value == 1165 # wrong value > log error166 caplog.clear()167 reg.value = 99168 assert len(caplog.records) == 1169 assert 'when converting to internal for register' in caplog.text170 def test_sync_pause_resume(self, mock_robot):171 write_sync = mock_robot.syncs['write']172 write_sync.start()173 assert write_sync.started174 assert write_sync.running175 assert not write_sync.paused176 assert not write_sync.stopped177 write_sync.pause()178 assert write_sync.started179 assert not write_sync.running180 assert write_sync.paused181 assert not write_sync.stopped182 time.sleep(0.5)183 write_sync.resume()184 assert write_sync.started185 assert write_sync.running186 assert not write_sync.paused187 assert not write_sync.stopped 188 write_sync.stop()189 assert not write_sync.started190 assert not write_sync.running191 assert not write_sync.paused192 assert write_sync.stopped 193 def test_sync_start_restart(self, mock_robot):194 write_sync = mock_robot.syncs['write']195 assert not write_sync.started196 assert not write_sync.running197 assert not write_sync.paused198 assert write_sync.stopped 199 # re-stop200 write_sync.stop()201 assert not write_sync.started202 assert not write_sync.running203 assert not write_sync.paused204 assert write_sync.stopped205 # start206 write_sync.start()207 assert write_sync.started208 assert write_sync.running209 assert not write_sync.paused210 assert not write_sync.stopped211 # re-start212 write_sync.start()213 assert write_sync.started214 assert write_sync.running215 assert not write_sync.paused216 assert not write_sync.stopped217 write_sync.stop()218 def test_loop_warning_review(self, mock_robot):219 read_sync = mock_robot.syncs['read']220 assert read_sync.warning == 0.9 # default221 read_sync.warning = 0.95222 assert read_sync.warning == 0.95223 # percentage set224 read_sync.warning = 90225 assert read_sync.warning == 0.9226 assert read_sync.review == 1.0 # default227 assert read_sync.frequency == 100228 def test_sync_with_closed_bus(self, mock_robot, caplog):229 write_sync = mock_robot.syncs['write']230 assert write_sync.stopped231 bus = write_sync.bus232 caplog.clear()233 if bus.is_open:234 bus.close()235 assert len(caplog.records) >= 1236 assert 'that is used by running syncs' in caplog.text237 assert bus.is_open238 read_sync = mock_robot.syncs['read']239 read_sync.stop()240 if bus.is_open:241 bus.close()242 assert not bus.is_open243 caplog.clear()244 write_sync.start()245 assert len(caplog.records) == 1246 assert 'attempt to start with a bus not open' in caplog.text247 assert write_sync.stopped248 # now set things back249 bus.open()250 assert bus.is_open251 def test_joint_info(self, mock_robot):252 j = mock_robot.joints['pan']253 d = mock_robot.devices['d01']254 assert j.position_read_register == d.current_pos255 assert j.position_write_register == d.desired_pos256 assert j.activate_register == d.enable_device257 assert j.velocity_read_register == d.current_speed258 assert j.velocity_write_register == d.desired_speed259 assert j.load_read_register == d.current_load260 assert j.load_write_register == d.desired_load261 assert j.active262 def test_joint_value(self, mock_robot):263 tilt = mock_robot.joints['tilt']264 d02 = mock_robot.devices['d02']265 # joint is 'inverse'266 assert tilt.position == - d02.current_pos.value + tilt.offset267 assert tilt.velocity == - d02.current_speed.value268 assert tilt.load == - d02.current_load.value269 # setter270 tilt.position = 10271 tilt.velocity = 20272 tilt.load = 50273 assert abs(tilt.desired_position - 10) < 0.2274 assert abs(tilt.desired_velocity - 20) < 0.2275 assert abs(tilt.desired_load - 50) < 0.2276 def test_joint_no_activate(self, mock_robot, caplog):277 joint = mock_robot.joints['no_activate']278 assert joint.active279 caplog.clear()280 joint.active = True281 assert len(caplog.records) >= 1282 assert 'attempted to change activation of joint' in caplog.text283 def test_joint_min_max(self, mock_robot):284 # check min and max285 pan = mock_robot.joints['pan']286 minv, maxv = pan.range287 pan.position = 50288 assert abs(pan.desired_position - maxv) < 0.2289 pan.position = -50290 assert abs(pan.desired_position - minv) < 0.2291 def test_joint_repr(self, mock_robot):292 pan = mock_robot.joints['pan']293 pan_str = str(pan)294 assert pan.name in pan_str295 assert 'p=' in pan_str296 assert 'v=' in pan_str297 assert 'l=' in pan_str298 def test_sensor_info(self, mock_robot):299 s = mock_robot.sensors['bus_voltage']300 d = mock_robot.devices['d01']301 assert s.device == d302 assert s.read_register == d.current_voltage303 assert s.activate_register is None304 assert s.active305 # assert s.bits is None306 assert s.offset == 0307 assert not s.inverse308 assert s.auto_activate309 def test_sensor_value(self, mock_robot):310 s = mock_robot.sensors['bus_voltage']311 d = mock_robot.devices['d01']312 assert s.value == d.current_voltage.value313 314 def test_bus_repr(self, mock_robot):315 dev = mock_robot.devices['d01']316 for register in dev.registers.values():317 register.read()318 str_repr = str(mock_robot.buses['busA'])319 assert f'Device {dev.dev_id}' in str_repr320 for register in dev.registers.values():321 if not register.sync:322 assert str(register.address) in str_repr323 assert str(register.int_value) in str_repr324 def test_bus_acquire(self, mock_robot, caplog):325 dev = mock_robot.devices['d01']326 bus = mock_robot.buses['busA']327 # stop syncs to avoid interference (additional messages)328 for sync in mock_robot.syncs.values():329 sync.stop()330 bus.can_use() # lock the bus331 # read332 caplog.clear()333 bus.read(dev.current_pos)334 assert len(caplog.records) == 1335 assert 'failed to acquire bus busA' in caplog.text336 # write337 caplog.clear()338 bus.write(dev.current_pos, 10)339 assert len(caplog.records) >= 1340 assert 'failed to acquire bus busA' in caplog.text 341 # release bus342 bus.stop_using()343 mock_robot.stop()344 def test_bus_small_branches(self, mock_robot, caplog):345 bus = mock_robot.buses['busA']346 # close bus used by syncs347 caplog.clear()348 bus.close()349 assert len(caplog.records) == 1350 assert 'Attempted to close bus' in caplog.text351 # open bus already open352 caplog.clear()353 bus.open()354 assert len(caplog.records) == 1355 assert 'bus busA already open' in caplog.text356 # read from closed bus357 device = mock_robot.devices['d04']358 caplog.clear()359 device.delay.write()360 assert len(caplog.records) == 1361 assert 'attempt to write to closed bus' in caplog.text362 caplog.clear()363 device.model.read()364 assert len(caplog.records) == 1365 assert 'attempt to read from closed bus' in caplog.text366 # timeout367 assert bus.timeout == 0.5368 def test_thread_crash(self):369 class CrashAtRun(BaseThread):370 def run(self):371 time.sleep(0.25)372 raise OSError373 class CrashAtSetup(BaseThread):374 def setup(self):375 time.sleep(.25)376 raise OSError377 class CrashLongSetup(BaseThread):378 def setup(self):379 time.sleep(1)380 raise OSError381 thread = CrashAtRun(name='my_thread')382 thread.start()383 time.sleep(0.5)384 assert not thread.started385 assert not thread.paused386 thread = CrashAtSetup(name='my_thread', patience=0.3)387 with pytest.raises(RuntimeError):388 thread.start()389 time.sleep(0.5)390 assert not thread.started391 assert not thread.paused 392 thread = CrashLongSetup(name='my_thread', patience=0.3)393 with pytest.raises(RuntimeError):394 thread.start()395 time.sleep(0.5)396 assert not thread.started397 assert not thread.paused 398class TestUtilsFactory:399 def test_register_not_class(self):400 mess = 'You must pass a Class not an instance'401 with pytest.raises(ValueError) as excinfo:402 register_class(10)403 assert mess in str(excinfo.value)404 def test_unregister_missing_class(self):405 mess = 'not registered with the factory'406 with pytest.raises(KeyError) as excinfo:407 unregister_class('dummy')408 assert mess in str(excinfo.value)409 def test_unregister_class(self):410 class Test: pass411 register_class(Test)412 assert 'Test' in registered_classes()413 unregister_class('Test')414 assert 'Test' not in registered_classes()415 def test_get_registered_class_not_available(self):416 mess = 'not registered with the factory'417 with pytest.raises(KeyError) as excinfo:418 get_registered_class('dummy')419 assert mess in str(excinfo.value)420 def test_register_class_existing(self):421 class Test: pass422 register_class(Test)423 # again424 register_class(Test)425 assert 'Test' in registered_classes()426 unregister_class('Test')427 assert 'Test' not in registered_classes()428class TestUtilsChecks:429 def test_check_key(self):430 mess = 'specification missing'431 with pytest.raises(KeyError) as excinfo:432 check_key('key', {}, 'object', 10, logger)433 assert mess in str(excinfo.value)434 with pytest.raises(KeyError) as excinfo:435 check_key('key', {}, 'object', 10, logger, 'custom')436 assert 'custom' in str(excinfo.value)437 def test_check_type(self):438 mess = 'should be of type'439 with pytest.raises(ValueError) as excinfo:440 check_type('10', int, 'object', 10, logger)441 assert mess in str(excinfo.value)442 with pytest.raises(ValueError) as excinfo:443 check_type('10', int, 'object', 10, logger, 'custom')444 assert 'custom' in str(excinfo.value)445 with pytest.raises(ValueError) as excinfo:446 check_type('10', [int, float], 'object', 10, logger) 447 def test_check_options(self):448 mess = 'should be one of'449 with pytest.raises(ValueError) as excinfo:450 check_options(10, ['a', 'b'], 'object', 10, logger)451 assert mess in str(excinfo.value)452 with pytest.raises(ValueError) as excinfo:453 check_options(10, ['a', 'b'], 'object', 10, logger, 'custom')454 assert 'custom' in str(excinfo.value)455 def test_check_not_empty(self):456 mess = 'should not be empty'457 # string458 with pytest.raises(ValueError) as excinfo:459 check_not_empty('', 'string', 'object', 10, logger)460 assert mess in str(excinfo.value)461 # number462 with pytest.raises(ValueError) as excinfo:463 check_not_empty(0, 'integer', 'object', 10, logger)464 assert mess in str(excinfo.value)465 # list466 with pytest.raises(ValueError) as excinfo:467 check_not_empty([], 'list', 'object', 10, logger)468 assert mess in str(excinfo.value)469 # dict470 with pytest.raises(ValueError) as excinfo:471 check_not_empty({}, 'dict', 'object', 10, logger)472 assert mess in str(excinfo.value)473 # custom message474 with pytest.raises(ValueError) as excinfo:475 check_not_empty('', 'string', 'object', 10, logger, 'custom')476 assert 'custom' in str(excinfo.value)477class TestDynamixelRobot:478 @pytest.fixture479 def mock_robot_init(self):480 with open('tests/dynamixel_robot.yml', 'r') as f:481 info_dict = yaml.load(f, Loader=yaml.FullLoader)482 yield info_dict483 @pytest.fixture484 def dummy_device(self):485 bus = BaseBus(robot='robot', port='dev')486 return BaseDevice(name='device', bus=bus, dev_id=42, model='DUMMY',487 robot='robot')488 def test_dynamixel_robot(self, mock_robot_init):489 for name, init_dict in mock_robot_init.items():490 break491 robot = BaseRobot(name=name, **init_dict)492 robot.start()493 robot.stop()494 def test_dynamixel_write(self, mock_robot_init):495 robot = BaseRobot(**mock_robot_init['dynamixel'])496 robot.start()497 dev = robot.devices['d11']498 for _ in range(100): # so that we also hit some comm errors499 dev.temperature_limit.value = 85500 assert dev.temperature_limit.value == 85501 dev.cw_angle_limit_deg.value = 10502 assert (dev.cw_angle_limit_deg.value - 10) < 0.2503 robot.stop()504 def test_dynamixel_register_4Bytes(self, mock_robot_init):505 robot = BaseRobot(**mock_robot_init['dynamixel'])506 robot.start()507 dev = robot.devices['d11']508 register = BaseRegister(name='test', device=dev, address=150, size=4,509 access='RW')510 register.value = 100511 assert register.value == 100512 # removed in ver 0.1.0 after introduction of RegisterWithMapping513 # def test_dynamixel__AXBaudRateRegister(self, dummy_device, caplog):514 # reg = DynamixelAXBaudRateRegister(515 # name='test',516 # device=dummy_device,517 # address=42,518 # minim=33, # for testing branches519 # maxim=66, # for testing branches520 # sync=True, # avoids calling read / write521 # access='RW' # so we can change it!522 # )523 # reg.value = 1000000524 # assert reg.value == 1000000525 # assert reg.int_value == 1526 # # wrong value > log error527 # caplog.clear()528 # reg.value = 99529 # assert len(caplog.records) == 1530 # assert 'attempt to write a non supported for AX baud' in caplog.text531 # removed in ver 0.1.0 after introduction of RegisterWithMapping532 # def test_dynamixel__AXComplianceSlopeRegister(self, dummy_device):533 # reg = DynamixelAXComplianceSlopeRegister(534 # name='test',535 # device=dummy_device,536 # address=42,537 # maxim=66, # for testing branches538 # sync=True, # avoids calling read / write539 # access='RW' # so we can change it!540 # )541 # reg.value = 7542 # assert reg.value == 7543 # assert reg.int_value == 128544 # removed in ver 0.1.0 after introduction of RegisterWithMapping545 # def test_dynamixel__XLBaudRateRegister(self, dummy_device, caplog):546 # reg = DynamixelXLBaudRateRegister(547 # name='test',548 # device=dummy_device,549 # address=42,550 # minim=33, # for testing branches551 # maxim=66, # for testing branches552 # sync=True, # avoids calling read / write553 # access='RW' # so we can change it!554 # )555 # reg.value = 1000000556 # assert reg.value == 1000000557 # assert reg.int_value == 3558 # # wrong value > log error559 # caplog.clear()560 # reg.value = 99561 # assert len(caplog.records) == 1562 # assert 'attempt to write a non supported for XL baud' in caplog.text563 def test_open_device_with_sync_items(self, mock_robot_init):564 robot = BaseRobot(**mock_robot_init['dynamixel'])565 robot.start()566 dev = robot.devices['d11']567 dev.present_position_deg.sync = True568 dev.open()569 robot.stop()570 def test_dynamixel_syncwrite(self, mock_robot_init):571 robot = BaseRobot(**mock_robot_init['dynamixel'])572 robot.start()573 robot.syncs['syncwrite'].start()574 time.sleep(1)575 robot.stop()576 def test_dynamixel_syncread(self, mock_robot_init):577 robot = BaseRobot(**mock_robot_init['dynamixel'])578 robot.start()579 robot.syncs['syncread'].start()580 time.sleep(1)581 robot.stop()582 def test_dynamixel_bulkwrite(self, mock_robot_init):583 robot = BaseRobot(**mock_robot_init['dynamixel'])584 robot.start()585 robot.syncs['bulkwrite'].start()586 time.sleep(1)587 robot.stop()588 def test_dynamixel_bulkread(self, mock_robot_init):589 robot = BaseRobot(**mock_robot_init['dynamixel'])590 robot.start()591 robot.syncs['bulkread'].start()592 time.sleep(1)593 robot.stop()594 def test_dynamixel_rangeread(self, mock_robot_init):595 robot = BaseRobot(**mock_robot_init['dynamixel'])596 robot.start()597 robot.syncs['rangeread'].start()598 time.sleep(1)599 robot.stop()600 def test_protocol1_syncread(self, mock_robot_init):601 mock_robot_init['dynamixel']['buses']['ttys1']['protocol'] = 1.0602 # we remove the bulkwrite so that the error will refer to syncread603 del mock_robot_init['dynamixel']['syncs']['bulkwrite']604 with pytest.raises(ValueError) as excinfo:605 _ = BaseRobot(**mock_robot_init['dynamixel'])606 assert 'SyncRead only supported for Dynamixel Protocol 2.0' \607 in str(excinfo.value)608 def test_protocol1_bulkwrite(self, mock_robot_init):609 mock_robot_init['dynamixel']['buses']['ttys1']['protocol'] = 1.0610 # we remove the bulkwrite so that the error will refer to syncread611 del mock_robot_init['dynamixel']['syncs']['syncread']612 with pytest.raises(ValueError) as excinfo:613 _ = BaseRobot(**mock_robot_init['dynamixel'])614 assert 'BulkWrite only supported for Dynamixel Protocol 2.0' \615 in str(excinfo.value)616 def test_dynamixel_scan(self, mock_robot_init):617 robot = BaseRobot(**mock_robot_init['dynamixel'])618 robot.start()619 ids = robot.buses['ttys1'].scan()620 assert 11 in ids621 assert 12 in ids622 robot.stop()623 def test_dynamixel_ping(self, mock_robot_init):624 robot = BaseRobot(**mock_robot_init['dynamixel'])625 robot.start()626 assert robot.buses['ttys1'].ping(11) == True627 robot.stop()628 # def test_dynamixel_bus_set_port_handler(self, mock_robot_init):629 # robot = BaseRobot(mock_robot_init)630 # mess = 'you can use the setter only with MockBus'631 # with pytest.raises(ValueError) as excinfo:632 # robot.buses['ttys1'].port_handler = 'dummy'633 # assert mess in str(excinfo.value)634 # robot.stop()635 636 # def test_dynamixel_bus_set_packet_handler(self, mock_robot_init):637 # robot = BaseRobot(mock_robot_init)638 # mess = 'you can use the setter only with MockPacketHandler'639 # with pytest.raises(ValueError) as excinfo:640 # robot.buses['ttys1'].packet_handler = 'dummy'641 # assert mess in str(excinfo.value)642 # robot.stop()643 def test_dynamixel_bus_params(self, mock_robot_init):644 robot = BaseRobot(**mock_robot_init['dynamixel'])645 assert robot.buses['ttys1'].baudrate == 19200646 assert not robot.buses['ttys1'].rs485647 robot.stop()648 def test_dynamixel_bus_closed(self, mock_robot_init, caplog):649 robot = BaseRobot(**mock_robot_init['dynamixel'])650 bus = robot.buses['ttys1']651 # ping652 caplog.clear()653 bus.ping(11)654 assert len(caplog.records) == 1655 assert 'Ping invoked with a bus not opened' in caplog.text656 # scan657 caplog.clear()658 bus.scan(range(10))659 assert len(caplog.records) == 1660 assert 'Scan invoked with a bus not opened' in caplog.text661 # read662 dev = robot.devices['d11']663 caplog.clear()664 bus.read(dev.return_delay_time)665 assert len(caplog.records) == 1666 assert 'Attempt to use closed bus "ttys1"' in caplog.text667 # write668 caplog.clear()669 bus.write(dev.return_delay_time, 10)670 assert len(caplog.records) == 1671 assert 'Attempt to use closed bus "ttys1"' in caplog.text672 robot.stop()673 def test_dynamixel_bus_acquire(self, mock_robot_init, caplog):674 robot = BaseRobot(**mock_robot_init['dynamixel'])675 robot.start()676 dev = robot.devices['d11']677 bus = robot.buses['ttys1']678 bus.can_use() # lock the bus679 # read680 caplog.clear()681 bus.read(dev.return_delay_time)682 assert len(caplog.records) >= 1683 assert 'failed to acquire bus ttys1' in caplog.text684 # write685 caplog.clear()686 bus.write(dev.return_delay_time, 10)687 assert len(caplog.records) >= 1688 assert 'failed to acquire bus ttys1' in caplog.text689 # release bus690 bus.stop_using()691 robot.stop()692 def test_dynamixel_bus_acquire_syncwrite(self, mock_robot_init, caplog):693 # syncs694 robot = BaseRobot(**mock_robot_init['dynamixel'])695 robot.start()696 robot.buses['ttys1'].can_use() # lock the bus697 caplog.clear()698 robot.syncs['syncwrite'].start()699 time.sleep(1)700 assert len(caplog.records) >= 1701 assert 'failed to acquire bus ttys1' in caplog.text702 robot.syncs['syncwrite'].stop()703 # release bus704 robot.buses['ttys1'].stop_using()705 robot.stop()706 def test_dynamixel_bus_acquire_syncread(self, mock_robot_init, caplog):707 # syncs708 robot = BaseRobot(**mock_robot_init['dynamixel'])709 robot.start()710 robot.buses['ttys1'].can_use() # lock the bus711 caplog.clear()712 robot.syncs['syncread'].start()713 time.sleep(1)714 assert len(caplog.records) >= 1715 assert 'failed to acquire bus ttys1' in caplog.text716 robot.syncs['syncread'].stop()717 # release bus718 robot.buses['ttys1'].stop_using()719 robot.stop()720 def test_dynamixel_bus_acquire_bulkwrite(self, mock_robot_init, caplog):721 # syncs722 robot = BaseRobot(**mock_robot_init['dynamixel'])723 robot.start()724 robot.buses['ttys1'].can_use() # lock the bus725 caplog.clear()726 robot.syncs['bulkwrite'].start()727 time.sleep(1)728 assert len(caplog.records) >= 1729 assert 'failed to acquire bus ttys1' in caplog.text730 robot.syncs['bulkwrite'].stop()731 # release bus732 robot.buses['ttys1'].stop_using()733 robot.stop()734 def test_dynamixel_bus_acquire_bulkread(self, mock_robot_init, caplog):735 # syncs736 robot = BaseRobot(**mock_robot_init['dynamixel'])737 robot.start()738 robot.buses['ttys1'].can_use() # lock the bus739 caplog.clear()740 robot.syncs['bulkread'].start()741 time.sleep(1)742 assert len(caplog.records) >= 1743 assert 'failed to acquire bus ttys1' in caplog.text744 robot.syncs['bulkread'].stop()745 # release bus746 robot.buses['ttys1'].stop_using()747 robot.stop()748 def test_dynamixel_register_low_endian(self, mock_robot_init, caplog):749 robot = BaseRobot(**mock_robot_init['dynamixel'])750 dev = robot.devices['d11']751 assert dev.register_low_endian(123, 4) == [123, 0, 0, 0]752 num = 12 * 256 + 42753 assert dev.register_low_endian(num, 4) == [42, 12, 0, 0]754 num = 39 * 65536 + 12 * 256 + 42755 assert dev.register_low_endian(num, 4) == [42, 12, 39, 0]756 num = 45 * 16777216 + 39 * 65536 + 12 * 256 + 42757 assert dev.register_low_endian(num, 4) == [42, 12, 39, 45]758 caplog.clear()759 _ = dev.register_low_endian(num, 6)760 assert len(caplog.records) == 1761 assert 'Unexpected register size' in caplog.text762class TestI2CRobot:763 @pytest.fixture764 def mock_robot_init(self):765 with open('tests/i2c_robot.yml', 'r') as f:766 info_dict = yaml.load(f, Loader=yaml.FullLoader)767 yield info_dict768 def test_i2c_robot_bus_error(self, mock_robot_init, caplog):769 mock_robot_init['i2crobot']['buses']['i2c2']['mock'] = False770 mock_robot_init['i2crobot']['buses']['i2c2']['auto'] = False771 mock_robot_init['i2crobot']['buses']['i2c2']['port'] = 42772 robot = BaseRobot(**mock_robot_init['i2crobot'])773 caplog.clear()774 robot.buses['i2c2'].open()775 assert len(caplog.records) >= 2776 assert 'failed to open I2C bus' in caplog.text777 # caplog.clear()778 # robot.buses['i2c2'].close()779 # assert len(caplog.records) == 2780 # assert 'failed to close I2C bus' in caplog.text781 def test_i2c_robot(self, mock_robot_init, caplog):782 robot = BaseRobot(**mock_robot_init['i2crobot'])783 robot.start()784 dev = robot.devices['imu']785 # 1 Byte registers786 for _ in range(5): # to account for possible comm errs787 dev.byte_xl_x.value = 20788 assert dev.byte_xl_x.value == 20789 # word register790 dev.word_xl_x.value = 12345791 assert dev.word_xl_x.value == 12345 792 robot.stop()793 def test_i2c_register_with_sign(self, mock_robot_init):794 robot = BaseRobot(**mock_robot_init['i2crobot'])795 robot.start()796 d = robot.devices['imu']797 # no factor798 d.word_xl_x.value = 10799 assert d.word_xl_x.value == 10800 assert d.word_xl_x.int_value == 10801 d.word_xl_x.value = -10802 assert d.word_xl_x.value == -10803 assert d.word_xl_x.int_value == (65536 - 10)804 # with factor805 d.word_xl_y.value = 100806 assert d.word_xl_y.value == 100807 assert d.word_xl_y.int_value == 1000808 d.word_xl_y.value = -100809 assert d.word_xl_y.value == -100810 assert d.word_xl_y.int_value == (65536 - 1000)811 def test_register_with_dynamic_conversion(self, mock_robot_init):812 mock_robot_init['i2crobot']['buses']['i2c2']['err'] = 0813 robot = BaseRobot(**mock_robot_init['i2crobot'])814 robot.start()815 d = robot.devices['imu']816 # set the control register817 d.range_control.value = 0b110000818 assert d.range_control_conv.value == 3819 assert d.dynamic_register.value == 1024.0 / 10.0 * 3.0820 d.range_control_conv.value = 6821 assert d.range_control.value == 0b1100000822 assert d.dynamic_register.value == 1024.0 / 10.0 * 6.0823 def test_i2c_sensor(self, mock_robot_init, caplog):824 robot = BaseRobot(**mock_robot_init['i2crobot'])825 robot.start()826 s = robot.sensors['temp']827 d = robot.devices['imu']828 assert s.device == d829 assert s.read_register == d.temp830 assert s.activate_register == d.activate_temp831 assert not s.active832 # removed the masking in sensor833 # assert s.bits is None834 assert s.offset == 0835 assert s.inverse836 assert s.auto_activate837 s.active = True838 assert s.active839 assert s.value == -10.0840 # masks841 assert robot.sensors['status0'].value 842 assert not robot.sensors['status1'].value843 # setting active for register w/o active register844 caplog.clear()845 robot.sensors['status0'].active = True846 assert len(caplog.records) == 1847 assert 'attempted to change activation of sensor' in caplog.text848 def test_i2c_sensorXYZ(self, mock_robot_init):849 robot = BaseRobot(**mock_robot_init['i2crobot'])850 robot.start()851 s = robot.sensors['gyro']852 d = robot.devices['imu']853 assert s.device == d854 assert s.x_register == d. word_g_x855 assert s.y_register == d. word_g_y856 assert s.z_register == d. word_g_z857 assert s.activate_register == d.activate_g858 assert not s.active859 assert s.x_offset == 0860 assert s.x_inverse861 assert s.y_offset == 256862 assert not s.y_inverse863 assert s.z_offset == 1024864 assert s.z_inverse 865 assert s.auto_activate866 s.active = True867 assert s.active868 def test_i2c_sensorXYZ_value(self, mock_robot_init, caplog):869 robot = BaseRobot(**mock_robot_init['i2crobot'])870 robot.start()871 s = robot.sensors['gyro']872 d = robot.devices['imu']873 exp_x = - d.word_g_x.value874 exp_y = d.word_g_y.value + 256875 exp_z = - d.word_g_z.value + 1024876 assert s.x == exp_x877 assert s.y == exp_y878 assert s.z == exp_z879 assert s.value == (exp_x, exp_y, exp_z)880 # byte sensor881 s = robot.sensors['accel_byte']882 exp_x = d.byte_xl_x.value + 128883 exp_y = - d.byte_xl_y.value -128884 exp_z = d.byte_xl_z.value + 64885 assert s.x == exp_x886 assert s.y == exp_y887 assert s.z == exp_z888 assert s.value == (exp_x, exp_y, exp_z)889 assert s.active890 # activate w/o register891 caplog.clear()892 s.active = True893 assert len(caplog.records) == 1894 assert 'attempted to change activation of sensor' in caplog.text895 def test_i2c_bus_closed(self, mock_robot_init, caplog):896 robot = BaseRobot(**mock_robot_init['i2crobot'])897 dev = robot.devices['imu']898 # write to closed bus899 caplog.clear()900 dev.byte_xl_x.value = 20901 assert len(caplog.records) >= 1902 assert 'attempted to write to a closed bus' in caplog.text903 # read from closed bus904 caplog.clear()905 _ = dev.byte_xl_x.value906 assert len(caplog.records) >= 1907 assert 'attempted to read from a closed bus' in caplog.text908 def test_i2c_read_loop(self, mock_robot_init):909 robot = BaseRobot(**mock_robot_init['i2crobot'])910 robot.start()911 robot.syncs['read_g'].start()912 time.sleep(1)913 robot.stop()914 def test_i2c_write_loop(self, mock_robot_init):915 robot = BaseRobot(**mock_robot_init['i2crobot'])916 robot.start()917 robot.syncs['write_xl'].start()918 time.sleep(1)919 robot.stop()920 def test_i2c_loop_failed_acquire(self, mock_robot_init, caplog):921 robot = BaseRobot(**mock_robot_init['i2crobot'])922 robot.start()923 # lock the bus924 robot.buses['i2c2'].can_use()925 # read926 caplog.clear()927 robot.syncs['read_g'].start()928 time.sleep(1)929 assert len(caplog.records) >= 1930 assert 'failed to acquire bus' in caplog.text931 robot.syncs['read_g'].stop() 932 # write933 caplog.clear()934 robot.syncs['write_xl'].start()935 time.sleep(1)936 assert len(caplog.records) >= 1937 assert 'failed to acquire bus' in caplog.text938 robot.syncs['read_g'].stop() 939 def test_i2c_sharedbus_closed(self, mock_robot_init, caplog):940 robot = BaseRobot(**mock_robot_init['i2crobot'])941 # we haven't started the bus942 # read943 caplog.clear()944 robot.syncs['read_g'].start()945 assert len(caplog.records) == 1946 assert 'attempt to start with a bus not open' in caplog.text947 robot.syncs['read_g'].stop() 948 # write949 caplog.clear()950 robot.syncs['write_xl'].start()951 assert len(caplog.records) == 1952 assert 'attempt to start with a bus not open' in caplog.text953 robot.syncs['read_g'].stop()954 def test_i2c_write2_with_errors(self, mock_robot_init, caplog):955 robot = BaseRobot(**mock_robot_init['i2crobot'])956 robot.start()957 device = robot.devices['imu']958 for _ in range(20): # so that we also get some errors959 device.word_control_2.value = 0x2121960 assert device.word_control_2.value == 0x2121961 def test_i2c_closed_bus(self, mock_robot_init, caplog):962 robot = BaseRobot(**mock_robot_init['i2crobot'])963 device = robot.devices['imu']964 caplog.clear()965 _ = device.word_control_2.value966 assert len(caplog.records) >= 1967 assert 'attempted to read from a closed bus' in caplog.text968 caplog.clear()969 device.word_control_2.value = 0x4242970 assert len(caplog.records) >= 1971 assert 'attempted to write to a closed bus' in caplog.text972 bus = robot.buses['i2c2']973 caplog.clear()974 _ = bus.read_block(device, 1, 6)975 assert len(caplog.records) >= 1976 assert 'attempted to read from a closed bus' in caplog.text977 caplog.clear()978 bus.write_block(device, 1, [1,2,3,4,5,6])979 assert len(caplog.records) >= 1980 assert 'attempted to write to a closed bus' in caplog.text981class TestMove:982 @pytest.fixture983 def mock_robot(self): 984 robot = BaseRobot.from_yaml('tests/move_robot.yml')985 robot.start()986 yield robot987 robot.stop()988 def test_pvl(self):989 list1 = PVLList(p=[1,2,3], v=[nan], ld=[nan, 10, 10, nan])990 assert len(list1) == 4991 list1.append(p=10, v=20, ld=nan)992 assert len(list1) == 5993 assert list1.positions == [1, 2, 3, nan, 10]994 assert list1.velocities == [nan, nan, nan, nan, 20]995 assert list1.loads == [nan, 10, 10, nan, nan]996 list1.append(p_list=[20, nan], v_list=[nan, nan, nan], l_list=[])997 assert len(list1) == 8998 list2 = PVLList(p=[3,4,5], v=[1,1,1], ld=[5,5,5])999 list1.append(pvl_list=list2)1000 assert len(list1) == 111001 list1.append(pvl=PVL(p=4, v=5, ld=6))1002 assert len(list1) == 121003 assert list1.positions == [1, 2, 3, nan, 10, 20, nan, nan, 3, 4, 5, 4]1004 # func with one item1005 list3 = PVLList()1006 list3.append(pvl=PVL(10,10,10))1007 avg = list3.process()1008 assert avg == PVL(10,10,10)1009 assert avg != 101010 # adition1011 pvl1 = PVL(10, nan, nan)1012 assert (pvl1 + 10) == PVL(20, nan, nan)1013 assert (pvl1 - 10) == PVL(0, nan, nan)1014 assert (pvl1 + PVL(5, 10, nan)) == PVL(15, 10, nan)1015 assert (pvl1 - PVL(5, 10, nan)) == PVL(5, -10, nan)1016 assert (pvl1 + [10, 20, 30]) == PVL(20, 20, 30)1017 assert (pvl1 - [10, 20, 30]) == PVL(0, -20, -30)1018 assert -pvl1 == PVL(-10, nan, nan)1019 assert not pvl1 == PVL(nan, nan, nan)1020 # raise errors1021 with pytest.raises(RuntimeError):1022 _ = pvl1 + [1,2]1023 with pytest.raises(RuntimeError):1024 _ = pvl1 + 'string'1025 with pytest.raises(RuntimeError):1026 _ = pvl1 - [1,2]1027 with pytest.raises(RuntimeError):1028 _ = pvl1 - 'string'1029 def test_move_load_robot(self, mock_robot):1030 manager = mock_robot.manager1031 assert len(manager.joints) == 31032 p = PVL(100)1033 pv = PVL(100, 10)1034 pvl = PVL(100, 10, 50)1035 all_comm = [p, pv, pvl]1036 for joint in manager.joints:1037 for comm in all_comm:1038 joint.value = comm1039 assert mock_robot.joints['j01'].value == PVL(0,nan,nan)1040 assert mock_robot.joints['j02'].value == PVL(0,57.05,nan)1041 assert mock_robot.joints['j03'].value == PVL(0,57.05,-50.0)1042 assert mock_robot.joints['j01'].desired == PVL(100, nan, nan)1043 assert mock_robot.joints['j02'].desired == PVL(100, 10.03, nan)1044 assert mock_robot.joints['j03'].desired == PVL(100, 10.03, 50.0)1045 def test_move_load_script(self, mock_robot, caplog):1046 caplog.set_level(logging.DEBUG, logger='roboglia.move.moves')1047 caplog.clear()1048 script = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_1.yml')1049 assert len(script.joints) == 41050 assert len(script.frames) == 61051 assert len(script.sequences) == 41052 c = list(script.scenes['greet'].play())1053 assert len(c) == 281054 assert len(caplog.records) >= 491055 def test_move_execute_script(self, mock_robot, caplog):1056 script = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_1.yml')1057 script.start()1058 time.sleep(1)1059 script.pause()1060 time.sleep(0.5)1061 script.resume()1062 while script.running:1063 time.sleep(0.5)1064 caplog.set_level(logging.DEBUG)1065 def test_move_execute_script_with_stop(self, mock_robot, caplog):1066 script = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_1.yml')1067 script.start()1068 time.sleep(1)1069 script.stop()1070 while script.running:1071 time.sleep(0.5)1072 def test_move_execute_two_scripts(self, mock_robot):1073 script1 = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_1.yml')1074 script2 = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_2.yml')1075 script1.start()1076 script2.start()1077 while script1.running and script2.running:1078 time.sleep(0.5)1079 time.sleep(0.5)1080 assert True1081 def test_move_execute_two_scripts_stop_robot(self, mock_robot):1082 script1 = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_1.yml')1083 script2 = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_2.yml')1084 script1.start()1085 script2.start()1086 time.sleep(0.5)1087 mock_robot.stop()1088 assert True 1089 def test_lock_joint_manager(self, mock_robot, caplog):1090 script1 = Script.from_yaml(robot=mock_robot, file_name='tests/moves/script_1.yml')1091 script1.start()1092 manager = mock_robot.manager1093 caplog.clear()1094 lock = manager._JointManager__lock1095 lock.acquire()1096 time.sleep(0.5)1097 lock.release() 1098 assert len(caplog.records) >= 11099 assert 'failed to acquire manager for stream' in caplog.text...

Full Screen

Full Screen

datalayerclient.py

Source:datalayerclient.py Github

copy

Full Screen

...203 if vt == VariantType.ARRAY_UINT8:204 return Result.UNSUPPORTED205 if vt == VariantType.BOOL8:206 data.set_bool8(bool(value))207 result, var = client.write_sync(address, data)208 return result209 if vt == VariantType.FLATBUFFERS:210 return Result.UNSUPPORTED211 if vt == VariantType.FLOAT32:212 data.set_float32(float(value))213 result, var = client.write_sync(address, data)214 return result215 if vt == VariantType.FLOAT64:216 data.set_float64(float(value))217 result, var = client.write_sync(address, data)218 return result219 if vt == VariantType.INT16:220 data.set_int16(int(value))221 result, var = client.write_sync(address, data)222 return result223 if vt == VariantType.INT32:224 data.set_int32(int(value))225 result, var = client.write_sync(address, data)226 return result227 if vt == VariantType.INT64:228 data.set_int64(int(value))229 result, var = client.write_sync(address, data)230 return result231 if vt == VariantType.INT8:232 data.set_int8(int(value))233 result, var = client.write_sync(address, data)234 return result235 if vt == VariantType.STRING:236 data.set_string(value)237 result, var = client.write_sync(address, data)238 return result239 if vt == VariantType.UINT16:240 data.set_uint16(int(value))241 result, var = client.write_sync(address, data)242 return result243 if vt == VariantType.UINT32:244 data.set_uint32(int(value))245 result, var = client.write_sync(address, data)246 return result247 if vt == VariantType.UINT64:248 data.set_uint64(int(value))249 result, var = client.write_sync(address, data)250 return result251 if vt == VariantType.UINT8:252 data.set_uint8(int(value))253 result, var = client.write_sync(address, data)254 return result255 print("WARNING Unknow Variant Type:", vt)...

Full Screen

Full Screen

test_json.py

Source:test_json.py Github

copy

Full Screen

...19class JsonReaderDictsTest(Json, ReaderTest, TestCase):20 input_data = '[{"foo": "bar"},\n{"baz": "boz"}]'21 @incontext()22 def test_nofields(self, context):23 context.write_sync(EMPTY)24 context.stop()25 assert context.get_buffer() == [({"foo": "bar"},), ({"baz": "boz"},)]26class JsonReaderListsTest(Json, ReaderTest, TestCase):27 input_data = "[[1,2,3],\n[4,5,6]]"28 @incontext()29 def test_nofields(self, context):30 context.write_sync(EMPTY)31 context.stop()32 assert context.get_buffer() == [([1, 2, 3],), ([4, 5, 6],)]33 @incontext(output_type=tuple)34 def test_output_type(self, context):35 context.write_sync(EMPTY)36 context.stop()37 assert context.get_buffer() == [([1, 2, 3],), ([4, 5, 6],)]38class JsonReaderStringsTest(Json, ReaderTest, TestCase):39 input_data = "[" + ",\n".join(map(json.dumps, ("foo", "bar", "baz"))) + "]"40 @incontext()41 def test_nofields(self, context):42 context.write_sync(EMPTY)43 context.stop()44 assert context.get_buffer() == [("foo",), ("bar",), ("baz",)]45 @incontext(output_type=tuple)46 def test_output_type(self, context):47 context.write_sync(EMPTY)48 context.stop()49 assert context.get_buffer() == [("foo",), ("bar",), ("baz",)]50class JsonWriterTest(Json, WriterTest, TestCase):51 @incontext()52 def test_fields(self, context):53 context.set_input_fields(["foo", "bar"])54 context.write_sync(("a", "b"), ("c", "d"))55 context.stop()56 assert self.readlines() == ('[{"foo": "a", "bar": "b"},', '{"foo": "c", "bar": "d"}]')57 @incontext()58 def test_fields_from_type(self, context):59 context.set_input_type(namedtuple("Point", "x y"))60 context.write_sync((1, 2), (3, 4))61 context.stop()62 assert self.readlines() == ('[{"x": 1, "y": 2},', '{"x": 3, "y": 4}]')63 @incontext()64 def test_nofields_multiple_args(self, context):65 # multiple args are iterated onto and flattened in output66 context.write_sync((FOOBAR, FOOBAR), (OD_ABC, FOOBAR), (FOOBAZ, FOOBAR))67 context.stop()68 assert self.readlines() == (69 '[{"foo": "bar"},',70 '{"foo": "bar"},',71 '{"a": "A", "b": "B", "c": "C"},',72 '{"foo": "bar"},',73 '{"foo": "baz"},',74 '{"foo": "bar"}]',75 )76 @incontext()77 def test_nofields_multiple_args_length_mismatch(self, context):78 # if length of input vary, then we get a TypeError (unrecoverable)79 with pytest.raises(TypeError):80 context.write_sync((FOOBAR, FOOBAR), (OD_ABC))81 @incontext()82 def test_nofields_single_arg(self, context):83 # single args are just dumped, shapes can vary.84 context.write_sync(FOOBAR, OD_ABC, FOOBAZ)85 context.stop()86 assert self.readlines() == ('[{"foo": "bar"},', '{"a": "A", "b": "B", "c": "C"},', '{"foo": "baz"}]')87 @incontext()88 def test_nofields_empty_args(self, context):89 # empty calls are ignored90 context.write_sync(EMPTY, EMPTY, EMPTY)91 context.stop()92 assert self.readlines() == ("[]",)93###94# Line Delimiter JSON Readers / Writers95###96class Ldjson:97 extension = "ldjson"98 ReaderNodeType = LdjsonReader99 WriterNodeType = LdjsonWriter100class LdjsonReaderDictsTest(Ldjson, ReaderTest, TestCase):101 input_data = '{"foo": "bar"}\n{"baz": "boz"}'102 @incontext()103 def test_nofields(self, context):104 context.write_sync(EMPTY)105 context.stop()106 assert context.get_buffer() == [({"foo": "bar"},), ({"baz": "boz"},)]107class LdjsonReaderListsTest(Ldjson, ReaderTest, TestCase):108 input_data = "[1,2,3]\n[4,5,6]"109 @incontext()110 def test_nofields(self, context):111 context.write_sync(EMPTY)112 context.stop()113 assert context.get_buffer() == [([1, 2, 3],), ([4, 5, 6],)]114 @incontext(output_type=tuple)115 def test_output_type(self, context):116 context.write_sync(EMPTY)117 context.stop()118 assert context.get_buffer() == [([1, 2, 3],), ([4, 5, 6],)]119class LdjsonReaderStringsTest(Ldjson, ReaderTest, TestCase):120 input_data = "\n".join(map(json.dumps, ("foo", "bar", "baz")))121 @incontext()122 def test_nofields(self, context):123 context.write_sync(EMPTY)124 context.stop()125 assert context.get_buffer() == [("foo",), ("bar",), ("baz",)]126 @incontext(output_type=tuple)127 def test_output_type(self, context):128 context.write_sync(EMPTY)129 context.stop()130 assert context.get_buffer() == [("foo",), ("bar",), ("baz",)]131class LdjsonWriterTest(Ldjson, WriterTest, TestCase):132 @incontext()133 def test_fields(self, context):134 context.set_input_fields(["foo", "bar"])135 context.write_sync(("a", "b"), ("c", "d"))136 context.stop()137 assert self.readlines() == ('{"foo": "a", "bar": "b"}', '{"foo": "c", "bar": "d"}')138 @incontext()139 def test_fields_from_type(self, context):140 context.set_input_type(namedtuple("Point", "x y"))141 context.write_sync((1, 2), (3, 4))142 context.stop()143 assert self.readlines() == ('{"x": 1, "y": 2}', '{"x": 3, "y": 4}')144 @incontext()145 def test_nofields_multiple_args(self, context):146 # multiple args are iterated onto and flattened in output147 context.write_sync((FOOBAR, FOOBAR), (OD_ABC, FOOBAR), (FOOBAZ, FOOBAR))148 context.stop()149 assert self.readlines() == (150 '{"foo": "bar"}',151 '{"foo": "bar"}',152 '{"a": "A", "b": "B", "c": "C"}',153 '{"foo": "bar"}',154 '{"foo": "baz"}',155 '{"foo": "bar"}',156 )157 @incontext()158 def test_nofields_multiple_args_length_mismatch(self, context):159 # if length of input vary, then we get a TypeError (unrecoverable)160 with pytest.raises(TypeError):161 context.write_sync((FOOBAR, FOOBAR), (OD_ABC))162 @incontext()163 def test_nofields_single_arg(self, context):164 # single args are just dumped, shapes can vary.165 context.write_sync(FOOBAR, OD_ABC, FOOBAZ)166 context.stop()167 assert self.readlines() == ('{"foo": "bar"}', '{"a": "A", "b": "B", "c": "C"}', '{"foo": "baz"}')168 @incontext()169 def test_nofields_empty_args(self, context):170 # empty calls are ignored171 context.write_sync(EMPTY, EMPTY, EMPTY)172 context.stop()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful