How to use error_received method in Molotov

Best Python code snippet using molotov_python

Serial.py

Source:Serial.py Github

copy

Full Screen

1import serial2import time3import usb4import struct5import enum6vid_pid ="10c4:ea60"7ser = serial.Serial()8RED = '\033[91m'9BOLD = '\033[1m'10YELLOW = '\033[93m'11CYAN = '\033[96m'12GREEN = '\033[92m'13END = '\033[0m'14UNDERLINE = '\033[4m'15buffer = ""16class FRAME(enum.Enum):17 START = 218 STOP = 319 LF = 1020class ADDR(enum.Enum):21 BROADCAST = 022 MASTER = 123class CTRL(enum.Enum):24 NONE = 025 ADR = 6526 GET_STATE = 8327 GET_FRQ = 7028 GET_L = 7629 IO0 = 4830 IO1 = 4931 CALIBRATE = 6732 PING = 8033class ARG_1(enum.Enum):34 NONE = 035 REMOVE = 8236 GIVE = 7137 SET_IN_OUT = 6838 READ = 6639 SET = 7340class ARG_2(enum.Enum):41 NONE = 042class ERROR(enum.Enum):43 NONE = 044 TIMEOUT_ERROR = 145 STX_ERROR = 246 STOP_LF_ERROR = 347 LENGTH_MISMATCH_ERROR = 448def init_serial(port):49 if port != 0:50 ser.port = port51 ser.baudrate = 5760052 ser.bytesize = 853 ser.stopbits = 154 ser.parity = serial.PARITY_NONE55 ser.timeout = 256 if ser.is_open !=True:57 ser.open()58 print("starting")59 else:60 print("UART Bridge not found")61def write_frame(adress, controll = CTRL.NONE.value, argument_1 = ARG_1.NONE.value, argument_2 = ARG_2.NONE.value, data = [0] * 8): 62 if ser.is_open:63 safetime = 1e-364 if adress < 256 and adress >= 0 and adress != None:65 adress = "0x{:02x}".format(adress)66 #print(adress)67 MSB = adress[2].encode()68 LSB = adress[3].encode() 69 else: 70 return "Wrong Adress value"71 if controll < 256 and controll > -1:72 controll = struct.pack('>B',controll)73 else:74 return "CTRL wrong size"75 if argument_1 < 256 and argument_1 > -1:76 argument_1 = struct.pack('>B',argument_1)77 else:78 return "ARG_1 wrong size"79 if argument_2 < 256 and argument_2 > -1:80 argument_2 = struct.pack('>B',argument_2)81 else:82 return "ARG_2 wrong size"83 ser.reset_input_buffer()84 ser.reset_output_buffer() 85 ser.write(struct.pack('>B', FRAME.START.value))86 time.sleep(safetime)87 ser.write(MSB)88 time.sleep(safetime)89 ser.write(LSB)90 time.sleep(safetime)91 ser.write(controll)92 time.sleep(safetime)93 ser.write(argument_1)94 time.sleep(safetime)95 ser.write(argument_2)96 time.sleep(safetime)97 98 for i in range(8):99 if data[i] is None:100 ser.write(struct.pack('>B',0))101 time.sleep(safetime)102 elif type(data[i]) is not int:103 ser.write(data[i])104 time.sleep(safetime)105 else:106 ser.write(struct.pack('>B',data[i]))107 time.sleep(safetime)108 ser.write(struct.pack('>B', FRAME.STOP.value))109 time.sleep(safetime)110 ser.write(struct.pack('>B', FRAME.LF.value))111 time.sleep(safetime)112 return "Frame sent"113 else:114 return "Port not open"115def give_all_adr(tries):116 retries = tries117 error_sent = ERROR.NONE118 error_received = ERROR.NONE119 s = ''120 i = 40121 adresses = []122 remove_all_adresses()123 if tries <= 1:124 print("tries must be greater than 0")125 return 126 while error_received != ERROR.TIMEOUT_ERROR:127 give_adr(i,ARG_1.GIVE.value)128 (s, data, error_sent, error_received) = read_frame(True)129 adresses.append(i)130 if error_received != ERROR.TIMEOUT_ERROR and error_received != ERROR.NONE:131 print(f"Unexpected Error occured during adress assignment: {error_received.name}")132 print(s)133 return134 i+=1135 adresses.pop(-1)136 if adresses == []:137 return None 138 return adresses139def give_adr(adress: int, remove_give):140 if adress < 256 and adress > -1 and adress != None:141 adress = "0x{:02x}".format(adress)142 MSB = adress[2].encode()143 LSB = adress[3].encode() 144 write_frame(adress = 0, controll = CTRL.ADR.value, argument_1 = remove_give, data = [MSB, LSB, 0, 0, 0, 0, 0, 0])145 return "Frame sent"146 else:147 return "Wrong Adress value"148def remove_all_adresses():149 write_frame(adress = ADDR.BROADCAST.value, controll = CTRL.ADR.value,argument_1 = ARG_1.REMOVE.value ,data = [0, 0, 0, 0, 0, 0, 0, 0])150 time.sleep(1)151def set_io(device_adr, number, out: bool, on_off: bool) -> str:152 on_off = int(on_off == True)153 if number == 0:154 if out == 1:155 return write_frame(adress = device_adr, controll = CTRL.IO0.value, argument_1 = ARG_1.SET.value, argument_2 = on_off, data = [0, 0, 0, 0, 0, 0, 0, 0])156 return write_frame(adress = 0, controll = CTRL.IO0.value, argument_1 = ARG_1.READ.value, argument_2 = on_off, data = [0, 0, 0, 0, 0, 0, 0, 0])157 158 elif number == 1:159 if out == 1:160 return write_frame(adress = device_adr, controll = CTRL.IO1.value, argument_1 = ARG_1.SET.value, argument_2 = on_off, data = [0, 0, 0, 0, 0, 0, 0, 0])161 return write_frame(adress = 0, controll = CTRL.IO1.value, argument_1 = ARG_1.READ.value, argument_2 = on_off, data = [0, 0, 0, 0, 0, 0, 0, 0])162def read_frame(loopback: bool):163 global buffer 164 s = ''165 error_sent = ERROR.NONE.value166 error_received = ERROR.NONE.value167 if loopback:168 (s, data, error_sent, error_received) = read_frame(False)169 error_sent = error_received170 s += '\t' * 6 + '|\n'171 s += '\t' * 6 + 'V\n'172 line = ser.readline()173 if loopback is False:174 buffer += str(line) + "\n"175 s += byteframe_to_string(line)[0]176 data = byteframe_to_string(line)[1]177 error_received = byteframe_to_string(line)[2]178 return s, data, error_sent, error_received179def byteframe_to_string(b = [0] * 8):180 dashes = '-' * 100 + '\n'181 lines = END + '_' * 100 + '\n'182 s = ''183 s += f'\nBytearray: {b}\n' 184 if b == b'':185 s = '\n' + lines186 s += f'Received Byte Array: {b} \t\n' + RED187 return s + f'{ERROR.TIMEOUT_ERROR.name}\n' + lines, 0, ERROR.TIMEOUT_ERROR188 if len(b) != 16:189 s = '\n' + lines190 s += f'Received Byte Array: {b} \t\n' + RED191 return s + f'{ERROR.LENGTH_MISMATCH_ERROR.name}: {len(b)}\n' + lines, 0, ERROR.LENGTH_MISMATCH_ERROR192 if b[0] != 2:193 s = '\n' + lines194 s += f'Received Byte Array: {b} \t\n' + RED195 return s + f'{ERROR.STX_ERROR.name}: b[0]= {b[0]}\n' + lines, 0, ERROR.STX_ERROR196 if b[14] != 3 or b[15] != 10:197 s = '\n' + lines198 s += f'Received Byte Array: {b} \t\n' + RED199 return s + f'{ERROR.STOP_LF_ERROR.name}: b[14] = {b[14]}, b[15] = {b[15]}\n' + lines, 0, ERROR.STOP_LF_ERROR200 s = '\n' + dashes 201 if b != None:202 s += f'\nSTX: {b[0]} \t'203 #print(b)204 #print(b[1:3])205 #print("HMM: " + b[1:3].decode("ascii"))206 address = int(b[1:3].decode("ascii"), 16)207 try:208 s += f'Adresse: {YELLOW}{ADDR(address).name}{END} ' + END209 except:210 s += f'Adresse: {YELLOW}{address}{END} \t'211 212 s += f'CTRL: {CYAN}{CTRL(b[3]).name}{END} \t'213 try:214 s += f'ARG_1: {GREEN}{ARG_1(b[4]).name}{END} \t'215 except:216 s += f'ARG_1: {GREEN}{b[4]}{END} \t'217 try:218 s += f'ARG_2: {GREEN}{ARG_2(b[5]).name}{END} \t'219 except:220 s += f'ARG_2: {GREEN}{b[5]}{END} \t'221 s += f'STOP: {b[14]} \t'222 s += f'LF: {b[15]} \t\n'223 s1 = s2 = s3 = s4 =''224 for i in range(8):225 s1 += f'\t| {b[6 + i]}'226 s2 += f'\t| {i}'227 s3 += '\t| ' + '0x{:02x}'.format(b[6 + i])228 s4 += '\t| ' + chr(b[6 + i])229 230 s += '\nDATA: ' + UNDERLINE + '\nNr.: ' + s2 + '\t|' + END + '\nDEC: ' + s1 + '\t|\nHEX: ' + s3 +'\t|\nASCII: ' + s4 + '\t|\n'231 s += dashes + '\n'232 data = b[6:14]233 return s, data, ERROR.NONE234 235def print_frame_error(loopback: bool):236 (s, data, error_sent, error_received) = read_frame(loopback)237 if error_sent != ERROR.NONE or error_received != ERROR.NONE:238 print(s)239 ...

Full Screen

Full Screen

test_resume_unsupported.py

Source:test_resume_unsupported.py Github

copy

Full Screen

1import asyncio2from typing import Tuple3import pytest4from rsocket.error_codes import ErrorCode5from rsocket.extensions.mimetypes import WellKnownMimeTypes6from rsocket.frame import SetupFrame, ResumeFrame7from rsocket.payload import Payload8from rsocket.request_handler import BaseRequestHandler9from rsocket.rsocket_client import RSocketClient10from rsocket.rsocket_server import RSocketServer11from tests.rsocket.misbehaving_rsocket import MisbehavingRSocket12@pytest.mark.allow_error_log(regex_filter='Protocol error')13async def test_setup_resume_unsupported(pipe_tcp_without_auto_connect: Tuple[RSocketServer, RSocketClient]):14 _, client = pipe_tcp_without_auto_connect15 received_error_code = None16 error_received = asyncio.Event()17 class Handler(BaseRequestHandler):18 async def on_error(self, error_code: ErrorCode, payload: Payload):19 nonlocal received_error_code20 received_error_code = error_code21 error_received.set()22 client.set_handler_using_factory(Handler)23 async with client as connected_client:24 transport = await connected_client._current_transport()25 bad_client = MisbehavingRSocket(transport)26 setup = SetupFrame()27 setup.flags_lease = False28 setup.flags_resume = True29 setup.token_length = 130 setup.resume_identification_token = b'a'31 setup.keep_alive_milliseconds = 12332 setup.max_lifetime_milliseconds = 45633 setup.data_encoding = WellKnownMimeTypes.APPLICATION_JSON.name.encode()34 setup.metadata_encoding = WellKnownMimeTypes.APPLICATION_JSON.name.encode()35 await bad_client.send_frame(setup)36 await error_received.wait()37 assert received_error_code == ErrorCode.UNSUPPORTED_SETUP38@pytest.mark.allow_error_log(regex_filter='Protocol error')39async def test_resume_request_unsupported(pipe_tcp: Tuple[RSocketServer, RSocketClient]):40 server, client = pipe_tcp41 received_error_code = None42 error_received = asyncio.Event()43 class Handler(BaseRequestHandler):44 async def on_error(self, error_code: ErrorCode, payload: Payload):45 nonlocal received_error_code46 received_error_code = error_code47 error_received.set()48 client.set_handler_using_factory(Handler)49 transport = await client._current_transport()50 bad_client = MisbehavingRSocket(transport)51 resume = ResumeFrame()52 resume.token_length = 153 resume.resume_identification_token = b'a'54 resume.first_client_position = 12355 resume.last_server_position = 123456 await bad_client.send_frame(resume)57 await error_received.wait()...

Full Screen

Full Screen

test_http_functions.py

Source:test_http_functions.py Github

copy

Full Screen

1from unittest.mock import patch2import pytest3from app_server import http_functions4def test_auth_server_ping_successfully():5 with patch('requests.get') as mock:6 mock.return_value.status_code = 2007 response = http_functions.get_auth_server_request('test')8 assert response.status_code == 2009def test_auth_server_ping_error_url_empty():10 with pytest.raises(ValueError) as error_received:11 http_functions.get_auth_server_request('')12 assert str(error_received.value) == 'URL received is empty'13def test_auth_media_ping_error_url_empty():14 with pytest.raises(ValueError) as error_received:15 http_functions.get_media_server_ping('')16 assert str(error_received.value) == 'URL received is empty'17def test_post_auth_server_error_url_empty():18 with pytest.raises(ValueError) as error_received:19 http_functions.post_auth_server('', None)20 assert str(error_received.value) == 'URL received is empty'21def test_post_auth_server_error_data_empty():22 with pytest.raises(ValueError) as error_received:23 http_functions.post_auth_server('test', None)24 assert str(error_received.value) == 'User data is None'25def test_put_auth_server_error_url_empty():26 with pytest.raises(ValueError) as error_received:27 http_functions.put_auth_server('', None)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful