How to use _wait_until_ready method in localstack

Best Python code snippet using localstack_python

transceiver.py

Source:transceiver.py Github

copy

Full Screen

# ... (listing starts at line 88 of transceiver.py; the enclosing `class`
# statement lies above this excerpt. Every function below is written against
# `self`, so indent them one level when splicing back into that class.)

#
# Send/Receive Messages
#
def write(self, message):
    '''
    Send a message over the serial port.

    Blocks until the device reports ready, then writes ``message.bytes``.
    '''
    self._wait_until_ready()
    self._serial.write(message.bytes)


def read(self, num_bytes):
    '''
    Read ``num_bytes`` from the serial port and return whatever it yields.
    '''
    return self._serial.read(num_bytes)


def _get_first_byte(self):
    '''
    Return the next single byte from the serial port.
    '''
    # NOTE: an earlier draft of this method (disabled in the original file)
    # tracked a resynchronisation state in self._state (0 = good,
    # 1 = correcting, 2 = bad) and skipped null bytes before returning the
    # first data byte. That draft compared bytes with `is 0x00` and used
    # `self._state == 1` where an assignment was evidently intended, so it
    # must not be revived verbatim.
    return self.read(1)


def get_message(self):
    '''
    Read one framed message from the serial port and return it.

    The frame is consumed in this order: a 1-byte op code, a 1-byte payload
    length, ``length`` payload bytes (unpacked with the format string looked
    up in ``OP_TO_FMT_STR[op_code]``), and a 1-byte checksum.

    Returns:
        The ``Message`` built from the op code and the unpacked payload.
    '''
    op_code = unpack('B', self.read(1))[0]
    print("opcode", op_code)

    length = unpack('B', self.read(1))[0]
    print("length", length)

    data = unpack(OP_TO_FMT_STR[op_code], self.read(length))
    print("data", data)

    checksum = unpack('B', self.read(1))[0]
    print("recv_checksum", checksum)

    # Rebuild the message locally so its checksum can be compared with the
    # one received on the wire.
    message = Message(op_code=op_code, data=data)
    print('built_checksum', message.checksum)

    # XOR of two identical values is 0, so a non-zero result means the
    # received checksum does not match the rebuilt one.
    if message.checksum ^ checksum:
        # NOTE(review): the original comments here say "throw error" and
        # "Set state to bad (2)", but neither is implemented: the mismatch is
        # only printed and the message is still returned to the caller.
        print('Invalid data')

    return message


#
# Change Device Settings
#
def _set_config_mode(self):
    '''
    Switch the device into "config" mode unless it is already there.

    Waits for the device to be ready (plus a 20 ms pause) both before and
    after clearing ``m0``/``m1``.
    '''
    if self.mode == "config":
        return
    print('setting config mode')
    self._wait_until_ready()
    time.sleep(0.02)
    # presumably m0/m1 drive the module's mode-select pins -- TODO confirm
    self.m0 = False
    self.m1 = False
    self.mode = "config"
    self._wait_until_ready()
    time.sleep(0.02)
    print('finished setting mode')


def _set_transmission_mode(self):
    '''
    Switch the device into "transmission" mode unless it is already there.

    Waits for the device to be ready (plus a 20 ms pause) both before and
    after setting ``m0``/``m1``.
    '''
    if self.mode == "transmission":
        return
    print('setting transmit mode:')
    self._wait_until_ready()
    time.sleep(0.02)
    self.m0 = True
    self.m1 = True
    self.mode = "transmission"
    self._wait_until_ready()
    time.sleep(0.02)
    print('finished setting mode')


def _send_config_with_byte4(self, value):
    '''
    Write a 6-byte 0xC2 config request that replaces byte 4 with ``value``.

    Bytes 1, 2, 3 and 5 are copied from ``self._last_config``. The device is
    put into config mode for the write and back into transmission mode after.
    '''
    # Snapshot the config before touching the mode, as the original code did.
    last_config = self._last_config
    self._set_config_mode()
    req = bytearray([
        0xC2,
        last_config[1],
        last_config[2],
        last_config[3],
        value,
        last_config[5],
    ])
    self._serial.write(req)
    self._set_transmission_mode()


def set_frequency(self, freq):
    '''
    Set the frequency, expressed as an offset from ``self.MIN_FREQUENCY``.

    The request is only sent when ``freq - self.MIN_FREQUENCY`` lies in
    0x00..0x1F inclusive; any other value is ignored without an error.
    '''
    adj_freq = freq - self.MIN_FREQUENCY
    if 0x00 <= adj_freq <= 0x1F:
        self._send_config_with_byte4(adj_freq)


def set_air_data_rate(self, data_rate):
    '''
    Set the air data rate.

    The request is only sent when ``data_rate`` lies in 0x00..0x05 inclusive;
    any other value is ignored without an error.
    '''
    # NOTE(review): this writes data_rate into the same byte position
    # (index 4) that set_frequency uses for the frequency offset -- confirm
    # against the module's register layout.
    if 0x00 <= data_rate <= 0x5:
        self._send_config_with_byte4(data_rate)


#
# Utility methods -- NOT WORKING IMPLEMENTATION
#
def _wait_until_ready(self):
    '''
    Blocks execution of code until device is ready.
    '''
    while not self.is_available:
        # Sleep briefly between polls so the wait does not spin a CPU core.
        time.sleep(0.001)


def _fininalize_initialization(self):
    '''
    Finish start-up by forcing a switch into transmission mode.
    '''
    # Default starting mode is transmission mode. Marking the mode as
    # "config" first guarantees _set_transmission_mode() really performs
    # the switch instead of returning early.
    self.mode = "config"
    self._set_transmission_mode()


def _unique_initalization(self):
    '''
    Placeholder for device-specific initialisation; does nothing.
    '''
    pass


# Current pinout on FTDI chip
# CTS := AUX
# RTS := M1
# ... (listing truncated here)

Full Screen

Full Screen

test_recent_performance_samples.py

Source:test_recent_performance_samples.py Github

copy

Full Screen

# (line 1 of the original file is not shown in this listing)
import time

from pytest import fixture, mark

from .utils import assert_valid_response


@fixture(scope="session")
def _wait_until_ready() -> None:
    """Sleep for a minute so that performance samples are available."""
    # Session scope: the one-minute wait happens once per test session,
    # not once per test that requests this fixture.
    time.sleep(60)


@mark.integration
@mark.asyncio
async def test_get_recent_performance_samples_async(test_http_client_async, _wait_until_ready):
    """Test get recent performance samples (async)."""
    resp = await test_http_client_async.get_recent_performance_samples(4)
    assert_valid_response(resp)


@mark.integration
def test_get_recent_performance_samples(test_http_client, _wait_until_ready):
    """Test get recent performance samples (synchronous)."""
    resp = test_http_client.get_recent_performance_samples(4)
    # ... (listing truncated here; the rest of this test is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful