Best Python code snippet using molotov_python
Serial.py
Source:Serial.py  
# Serial.py
#
# Helpers for exchanging fixed-size frames over a serial port (pyserial).
#
# Frame layout as written by write_frame() and checked by
# byteframe_to_string() -- 16 bytes in total:
#
#   index 0      FRAME.START (2)
#   index 1..2   address as two ASCII hex digits (high digit first)
#   index 3      control byte (CTRL)
#   index 4      first argument (ARG_1)
#   index 5      second argument (ARG_2)
#   index 6..13  eight payload bytes
#   index 14     FRAME.STOP (3)
#   index 15     FRAME.LF (10)

import serial
import time
import usb
import struct
import enum
from typing import Optional

# Not referenced by the code visible in this listing.
# NOTE(review): presumably the USB vendor:product id of the UART bridge -- confirm.
vid_pid = "10c4:ea60"

# Module-wide serial port shared by every function below; configured and
# opened by init_serial().
ser = serial.Serial()

# ANSI escape sequences used to colour the text built by byteframe_to_string().
RED = '\033[91m'
BOLD = '\033[1m'
YELLOW = '\033[93m'
CYAN = '\033[96m'
GREEN = '\033[92m'
END = '\033[0m'
UNDERLINE = '\033[4m'

# Text log: read_frame(False) appends str(<raw line>) plus a newline per read.
buffer = ""

_FRAME_LENGTH = 16         # total bytes of a frame on the wire
_DATA_LENGTH = 8           # payload bytes per frame
_DATA_OFFSET = 6           # index of the first payload byte
_BYTE_GAP_SECONDS = 1e-3   # pause after every write, as in the original pacing
_FIRST_ADDRESS = 40        # first address handed out by give_all_adr()


class FRAME(enum.Enum):
    """Framing bytes: start marker, stop marker and trailing line feed."""
    START = 2
    STOP = 3
    LF = 10


class ADDR(enum.Enum):
    """Addresses that are shown by name in byteframe_to_string()."""
    BROADCAST = 0
    MASTER = 1


class CTRL(enum.Enum):
    """Control byte values (each non-zero value is an ASCII code, e.g. 65 == ord('A'))."""
    NONE = 0
    ADR = 65
    GET_STATE = 83
    GET_FRQ = 70
    GET_L = 76
    IO0 = 48
    IO1 = 49
    CALIBRATE = 67
    PING = 80


class ARG_1(enum.Enum):
    """First argument byte values (non-zero values are ASCII codes)."""
    NONE = 0
    REMOVE = 82
    GIVE = 71
    SET_IN_OUT = 68
    READ = 66
    SET = 73


class ARG_2(enum.Enum):
    """Second argument byte values; only NONE is named."""
    NONE = 0


class ERROR(enum.Enum):
    """Status reported for a received frame by byteframe_to_string()."""
    NONE = 0
    TIMEOUT_ERROR = 1
    STX_ERROR = 2
    STOP_LF_ERROR = 3
    LENGTH_MISMATCH_ERROR = 4


def init_serial(port):
    """Configure the shared ``ser`` port (57600 baud, 8N1, 2 s timeout) and open it.

    A ``port`` equal to 0 means "nothing to open": a message is printed and
    the port is left untouched.
    """
    if port == 0:
        print("UART Bridge not found")
        return
    ser.port = port
    ser.baudrate = 57600
    ser.bytesize = 8
    ser.stopbits = 1
    ser.parity = serial.PARITY_NONE
    ser.timeout = 2
    if not ser.is_open:
        ser.open()
    print("starting")


def _hex_digit_bytes(adress):
    """Return ``adress`` (0..255) as two one-byte ASCII hex digits, else None."""
    # Test for None first: ordering comparisons against None raise TypeError.
    if adress is None or not (-1 < adress < 256):
        return None
    digits = "{:02x}".format(adress)
    return digits[0].encode(), digits[1].encode()


def _pack_byte(value):
    """Pack ``value`` as a single unsigned byte, or return None if out of range."""
    if -1 < value < 256:
        return struct.pack('>B', value)
    return None


def write_frame(adress, controll=CTRL.NONE.value, argument_1=ARG_1.NONE.value,
                argument_2=ARG_2.NONE.value, data=None):
    """Send one frame on the shared serial port.

    Args:
        adress: target address, 0..255; sent as two ASCII hex digits.
        controll: control byte, 0..255.
        argument_1: first argument byte, 0..255.
        argument_2: second argument byte, 0..255.
        data: up to eight payload items. ``None`` items are sent as 0, ints
            are packed as one unsigned byte, anything else is handed to
            ``ser.write`` unchanged. A missing or short payload is padded
            with zeros; items beyond the eighth are ignored.

    Returns:
        "Frame sent" on success, otherwise one of "Port not open",
        "Wrong Adress value", "CTRL wrong size", "ARG_1 wrong size",
        "ARG_2 wrong size".
    """
    if not ser.is_open:
        return "Port not open"

    address_digits = _hex_digit_bytes(adress)
    if address_digits is None:
        return "Wrong Adress value"

    header = []
    for value, complaint in ((controll, "CTRL wrong size"),
                             (argument_1, "ARG_1 wrong size"),
                             (argument_2, "ARG_2 wrong size")):
        packed = _pack_byte(value)
        if packed is None:
            return complaint
        header.append(packed)

    payload = [] if data is None else list(data[:_DATA_LENGTH])
    payload += [0] * (_DATA_LENGTH - len(payload))

    # Build the complete frame before touching the port, so that an invalid
    # payload item fails here instead of leaving half a frame on the wire.
    chunks = [struct.pack('>B', FRAME.START.value), *address_digits, *header]
    for item in payload:
        if item is None:
            chunks.append(struct.pack('>B', 0))
        elif isinstance(item, int):
            chunks.append(struct.pack('>B', item))
        else:
            chunks.append(item)
    chunks.append(struct.pack('>B', FRAME.STOP.value))
    chunks.append(struct.pack('>B', FRAME.LF.value))

    ser.reset_input_buffer()
    ser.reset_output_buffer()
    for chunk in chunks:
        ser.write(chunk)
        time.sleep(_BYTE_GAP_SECONDS)
    return "Frame sent"


def give_all_adr(tries):
    """Clear all addresses, then hand out consecutive addresses starting at 40.

    One GIVE frame is sent per candidate address and the answer is read in
    loopback mode; the loop stops at the first read that times out.

    Args:
        tries: must be at least 1. It is only validated -- the visible code
            performs no retries.

    Returns:
        The list of addresses that were answered, or ``None`` when ``tries``
        is invalid, when an unexpected frame error occurred, or when no
        address was answered at all.
    """
    # Validate before sending anything: the original broadcast the REMOVE
    # frame first, and rejected tries == 1 despite what the message says.
    if tries < 1:
        print("tries must be greater than 0")
        return None

    remove_all_adresses()
    adresses = []
    candidate = _FIRST_ADDRESS
    while True:
        give_adr(candidate, ARG_1.GIVE.value)
        s, _data, _error_sent, error_received = read_frame(True)
        if error_received == ERROR.TIMEOUT_ERROR:
            break
        if error_received != ERROR.NONE:
            print(f"Unexpected Error occured during adress assignment: {error_received.name}")
            print(s)
            return None
        adresses.append(candidate)
        candidate += 1
    return adresses or None


def give_adr(adress: int, remove_give):
    """Send an ADR frame to address 0 carrying ``adress`` as two hex digits.

    Args:
        adress: the address to transmit in the payload, 0..255.
        remove_give: value for the first argument byte (e.g. ARG_1.GIVE.value).

    Returns:
        "Wrong Adress value" if ``adress`` is out of range, otherwise the
        status string returned by write_frame() (the original always
        answered "Frame sent", even when the port was closed).
    """
    digits = _hex_digit_bytes(adress)
    if digits is None:
        return "Wrong Adress value"
    high, low = digits
    return write_frame(adress=ADDR.BROADCAST.value, controll=CTRL.ADR.value,
                       argument_1=remove_give,
                       data=[high, low, 0, 0, 0, 0, 0, 0])


def remove_all_adresses():
    """Send an ADR/REMOVE frame to the broadcast address, then wait one second."""
    write_frame(adress=ADDR.BROADCAST.value, controll=CTRL.ADR.value,
                argument_1=ARG_1.REMOVE.value, data=[0] * _DATA_LENGTH)
    time.sleep(1)


def set_io(device_adr, number, out: bool, on_off: bool) -> Optional[str]:
    """Send an IO0/IO1 frame: SET when ``out`` equals 1, READ otherwise.

    Args:
        device_adr: address used for SET frames.
        number: 0 selects CTRL.IO0, 1 selects CTRL.IO1.
        out: SET frame when equal to 1, READ frame otherwise.
        on_off: sent as second argument byte (1 if equal to True, else 0).

    Returns:
        The status string from write_frame(), or ``None`` when ``number`` is
        neither 0 nor 1.
    """
    if number == 0:
        control = CTRL.IO0.value
    elif number == 1:
        control = CTRL.IO1.value
    else:
        return None

    # Same coercion as the original `int(on_off == True)`: only values equal
    # to True map to 1.
    level = 1 if on_off == True else 0  # noqa: E712

    if out == 1:
        target, action = device_adr, ARG_1.SET.value
    else:
        # NOTE(review): READ frames go to address 0 rather than device_adr,
        # exactly as in the original -- confirm this is intended.
        target, action = 0, ARG_1.READ.value
    return write_frame(adress=target, controll=control, argument_1=action,
                       argument_2=level, data=[0] * _DATA_LENGTH)


def read_frame(loopback: bool):
    """Read one line from the port and render it with byteframe_to_string().

    With ``loopback`` true, two lines are read: the first one is reported as
    the "sent" frame and its text is placed above the text of the second one.

    Returns:
        ``(text, data, error_sent, error_received)`` where both error fields
        are ``ERROR`` members. ``error_sent`` is ``ERROR.NONE`` unless
        ``loopback`` is true.
    """
    global buffer
    text = ''
    # Must be the enum member, not its int value: callers compare against
    # ERROR.NONE, and 0 != ERROR.NONE is always true.
    error_sent = ERROR.NONE
    if loopback:
        text, _first_data, _unused, error_sent = read_frame(False)
        text += '\t' * 6 + '|\n'
        text += '\t' * 6 + 'V\n'
    line = ser.readline()
    if loopback is False:
        buffer += str(line) + "\n"
    # Parse once instead of once per returned field.
    rendered, data, error_received = byteframe_to_string(line)
    return text + rendered, data, error_sent, error_received


def _frame_error_report(b, error, detail=''):
    """Build the (text, 0, error) triple returned for a rejected frame."""
    lines = END + '_' * 100 + '\n'
    report = '\n' + lines
    report += f'Received Byte Array: {b} \t\n' + RED
    return report + f'{error.name}{detail}\n' + lines, 0, error


def _enum_label(enum_cls, value):
    """Return the member name for ``value``, or ``value`` itself if unknown."""
    try:
        return enum_cls(value).name
    except ValueError:
        return value


def byteframe_to_string(b=None):
    """Validate a received frame and render it as coloured text.

    Args:
        b: the received bytes. When omitted, a list of eight zeros is used,
            which is reported as a length mismatch.

    Returns:
        ``(text, data, error)``. On success ``data`` is ``b[6:14]`` and
        ``error`` is ``ERROR.NONE``; on failure ``data`` is 0 and ``error``
        names the first check that failed (empty input, length, start byte,
        stop/LF bytes -- in that order).
    """
    if b is None:
        b = [0] * 8

    if b == b'':
        return _frame_error_report(b, ERROR.TIMEOUT_ERROR)
    if len(b) != _FRAME_LENGTH:
        return _frame_error_report(b, ERROR.LENGTH_MISMATCH_ERROR, f': {len(b)}')
    if b[0] != 2:
        return _frame_error_report(b, ERROR.STX_ERROR, f': b[0]= {b[0]}')
    if b[14] != 3 or b[15] != 10:
        return _frame_error_report(
            b, ERROR.STOP_LF_ERROR, f': b[14] = {b[14]}, b[15] = {b[15]}')

    dashes = '-' * 100 + '\n'
    s = '\n' + dashes
    s += f'\nSTX: {b[0]} \t'

    # The address travels as two ASCII hex digits.
    address = int(b[1:3].decode("ascii"), 16)
    try:
        s += f'Adresse: {YELLOW}{ADDR(address).name}{END} ' + END
    except ValueError:
        s += f'Adresse: {YELLOW}{address}{END} \t'

    # Unknown CTRL bytes are shown as their number, like ARG_1/ARG_2
    # (the original raised ValueError here).
    s += f'CTRL: {CYAN}{_enum_label(CTRL, b[3])}{END} \t'
    s += f'ARG_1: {GREEN}{_enum_label(ARG_1, b[4])}{END} \t'
    s += f'ARG_2: {GREEN}{_enum_label(ARG_2, b[5])}{END} \t'
    s += f'STOP: {b[14]} \t'
    s += f'LF: {b[15]} \t\n'

    data = b[_DATA_OFFSET:_DATA_OFFSET + _DATA_LENGTH]
    s2 = ''.join(f'\t|   {i}' for i in range(_DATA_LENGTH))
    s1 = ''.join(f'\t| {value}' for value in data)
    s3 = ''.join('\t| ' + '0x{:02x}'.format(value) for value in data)
    s4 = ''.join('\t| ' + chr(value) for value in data)

    s += '\nDATA: ' + UNDERLINE + '\nNr.: ' + s2 + '\t|' + END + '\nDEC: ' + s1 + '\t|\nHEX: ' + s3 + '\t|\nASCII: ' + s4 + '\t|\n'
    s += dashes + '\n'
    return s, data, ERROR.NONE


def print_frame_error(loopback: bool):
    """Read a frame and print its rendered text if either error field is set."""
    s, _data, error_sent, error_received = read_frame(loopback)
    if error_sent != ERROR.NONE or error_received != ERROR.NONE:
        print(s)
    # ... (the listing is truncated here; anything that follows is not shown)


# test_resume_unsupported.py
Source:test_resume_unsupported.py  
import asyncio
from typing import Tuple

import pytest

from rsocket.error_codes import ErrorCode
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.frame import SetupFrame, ResumeFrame
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_client import RSocketClient
from rsocket.rsocket_server import RSocketServer
from tests.rsocket.misbehaving_rsocket import MisbehavingRSocket


@pytest.mark.allow_error_log(regex_filter='Protocol error')
async def test_setup_resume_unsupported(pipe_tcp_without_auto_connect: Tuple[RSocketServer, RSocketClient]):
    """Send a SETUP frame with the resume flag set and expect UNSUPPORTED_SETUP.

    The frame is pushed through a MisbehavingRSocket wrapped around the
    connected client's transport; the client's handler records the error
    code it is given.
    """
    _, client = pipe_tcp_without_auto_connect
    error_received = asyncio.Event()
    received_error_code = None

    class Handler(BaseRequestHandler):
        async def on_error(self, error_code: ErrorCode, payload: Payload):
            # Remember the code and wake up the waiting test body.
            nonlocal received_error_code
            received_error_code = error_code
            error_received.set()

    client.set_handler_using_factory(Handler)

    async with client as connected_client:
        transport = await connected_client._current_transport()
        bad_client = MisbehavingRSocket(transport)

        json_mime_name = WellKnownMimeTypes.APPLICATION_JSON.name

        setup_frame = SetupFrame()
        setup_frame.flags_lease = False
        setup_frame.flags_resume = True
        setup_frame.token_length = 1
        setup_frame.resume_identification_token = b'a'
        setup_frame.keep_alive_milliseconds = 123
        setup_frame.max_lifetime_milliseconds = 456
        setup_frame.data_encoding = json_mime_name.encode()
        setup_frame.metadata_encoding = json_mime_name.encode()

        await bad_client.send_frame(setup_frame)
        await error_received.wait()

        assert received_error_code == ErrorCode.UNSUPPORTED_SETUP


@pytest.mark.allow_error_log(regex_filter='Protocol error')
async def test_resume_request_unsupported(pipe_tcp: Tuple[RSocketServer, RSocketClient]):
    """Send a RESUME frame through a MisbehavingRSocket and wait for an error.

    Only the part of this test that is visible in the listing is reproduced.
    """
    server, client = pipe_tcp
    error_received = asyncio.Event()
    received_error_code = None

    class Handler(BaseRequestHandler):
        async def on_error(self, error_code: ErrorCode, payload: Payload):
            # Remember the code and wake up the waiting test body.
            nonlocal received_error_code
            received_error_code = error_code
            error_received.set()

    client.set_handler_using_factory(Handler)

    transport = await client._current_transport()
    bad_client = MisbehavingRSocket(transport)

    resume_frame = ResumeFrame()
    resume_frame.token_length = 1
    resume_frame.resume_identification_token = b'a'
    resume_frame.first_client_position = 123
    resume_frame.last_server_position = 1234

    await bad_client.send_frame(resume_frame)
    await error_received.wait()
    # ... (the listing is truncated here; anything that follows is not shown)


# test_http_functions.py
Source:test_http_functions.py  
from unittest.mock import patch

import pytest

from app_server import http_functions


def test_auth_server_ping_successfully():
    """With requests.get patched to answer 200, the request helper returns that status."""
    with patch('requests.get') as mocked_get:
        mocked_get.return_value.status_code = 200
        reply = http_functions.get_auth_server_request('test')
        assert reply.status_code == 200


def test_auth_server_ping_error_url_empty():
    """get_auth_server_request('') raises ValueError('URL received is empty')."""
    with pytest.raises(ValueError) as excinfo:
        http_functions.get_auth_server_request('')
    assert str(excinfo.value) == 'URL received is empty'


def test_auth_media_ping_error_url_empty():
    """get_media_server_ping('') raises ValueError('URL received is empty')."""
    with pytest.raises(ValueError) as excinfo:
        http_functions.get_media_server_ping('')
    assert str(excinfo.value) == 'URL received is empty'


def test_post_auth_server_error_url_empty():
    """post_auth_server('', None) raises ValueError('URL received is empty')."""
    with pytest.raises(ValueError) as excinfo:
        http_functions.post_auth_server('', None)
    assert str(excinfo.value) == 'URL received is empty'


def test_post_auth_server_error_data_empty():
    """post_auth_server('test', None) raises ValueError('User data is None')."""
    with pytest.raises(ValueError) as excinfo:
        http_functions.post_auth_server('test', None)
    assert str(excinfo.value) == 'User data is None'


def test_put_auth_server_error_url_empty():
    """put_auth_server('', None) raises ValueError (visible part of the test only)."""
    with pytest.raises(ValueError) as excinfo:
        http_functions.put_auth_server('', None)
    # ... (the listing is truncated here; anything that follows is not shown)


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
