Best Python code snippet using Airtest
test_run.py
Source:test_run.py  
1import asyncio2from datetime import datetime3from time import time4import pytest5from further_link.util.message import create_message, parse_message6from .helpers import receive_data, wait_for_data7@pytest.mark.asyncio8async def test_bad_message(run_ws_client):9    start_cmd = create_message("start", {"runner": "python3", "code": ""})10    await run_ws_client.send_str(start_cmd)11    await wait_for_data(run_ws_client, "error", "message", "Bad message")12@pytest.mark.asyncio13async def test_run_code_script(run_ws_client):14    code = """\15from datetime import datetime16print(datetime.now().strftime("%A"))17"""18    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")19    await run_ws_client.send_str(start_cmd)20    await receive_data(run_ws_client, "started", process="1")21    day = datetime.now().strftime("%A")22    await wait_for_data(run_ws_client, "stdout", "output", day + "\n", 0, "1")23    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")24@pytest.mark.asyncio25async def test_run_shell(run_ws_client):26    code = """\27date +%s # unix time in seconds28"""29    start_cmd = create_message("start", {"runner": "shell"}, "1")30    await run_ws_client.send_str(start_cmd)31    await receive_data(run_ws_client, "started", process="1")32    commands = create_message("stdin", {"input": code}, "1")33    await run_ws_client.send_str(commands)34    seconds = str(int(time()))35    await wait_for_data(run_ws_client, "stdout", "output", seconds + "\n", 0, "1")36    stop_cmd = create_message("stop", None, "1")37    await run_ws_client.send_str(stop_cmd)38    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")39@pytest.mark.asyncio40async def test_run_executable(run_ws_client):41    code = """\42#!/bin/bash43date +%s # unix time in seconds44"""45    start_cmd = create_message("start", {"runner": "exec", "code": code}, "1")46    await run_ws_client.send_str(start_cmd)47    await receive_data(run_ws_client, "started", process="1")48    seconds = str(int(time()))49    await wait_for_data(run_ws_client, "stdout", "output", seconds + "\n", 0, "1")50    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")51@pytest.mark.asyncio52async def test_run_two_scripts(run_ws_client):53    code1 = """\54from time import sleep55sleep(1)56print(1)57"""58    code2 = """\59print(2)60"""61    start_cmd = create_message("start", {"runner": "python3", "code": code1}, "1")62    await run_ws_client.send_str(start_cmd)63    await receive_data(run_ws_client, "started", process="1")64    start_cmd = create_message("start", {"runner": "python3", "code": code2}, "2")65    await run_ws_client.send_str(start_cmd)66    await receive_data(run_ws_client, "started", process="2")67    await wait_for_data(run_ws_client, "stdout", "output", "2\n", 0, "2")68    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "2")69    await wait_for_data(run_ws_client, "stdout", "output", "1\n", 0, "1")70    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")71@pytest.mark.asyncio72async def test_stop_early(run_ws_client):73    code = "while True: pass"74    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")75    await run_ws_client.send_str(start_cmd)76    await receive_data(run_ws_client, "started", process="1")77    stop_cmd = create_message("stop", None, "1")78    await run_ws_client.send_str(stop_cmd)79    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")80@pytest.mark.asyncio81async def test_bad_code(run_ws_client):82    code = "i'm not valid python"83    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")84    await run_ws_client.send_str(start_cmd)85    await receive_data(run_ws_client, "started", process="1")86    await asyncio.sleep(0.1)  # wait for data87    message = await run_ws_client.receive()88    m_type, m_data, m_process = parse_message(message.data)89    assert m_type == "stderr"90    lines = m_data["output"].split("\n")91    assert lines[0].startswith("  File")92    assert lines[1] == "    i'm not valid python"93    assert lines[2][-1] == "^"94    assert lines[3] == "SyntaxError: EOL while scanning string literal"95    await wait_for_data(run_ws_client, "stopped", "exitCode", 1, 0, "1")96@pytest.mark.asyncio97async def test_input(run_ws_client):98    code = """s = input()99while "BYE" != s:100    print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()])101    s = input()"""102    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")103    await run_ws_client.send_str(start_cmd)104    await receive_data(run_ws_client, "started", process="1")105    user_input = create_message("stdin", {"input": "hello\n"}, "1")106    await run_ws_client.send_str(user_input)107    await wait_for_data(108        run_ws_client, "stdout", "output", "HUH?! SPEAK UP, SONNY!\n", 0, "1"109    )110    user_input = create_message("stdin", {"input": "HEY GRANDMA\n"}, "1")111    await run_ws_client.send_str(user_input)112    await wait_for_data(113        run_ws_client, "stdout", "output", "NO, NOT SINCE 1930\n", 0, "1"114    )115    user_input = create_message("stdin", {"input": "BYE\n"}, "1")116    await run_ws_client.send_str(user_input)117    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")118@pytest.mark.asyncio119async def test_two_clients(run_ws_client, run_ws_client2):120    code = "while True: pass"121    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")122    await run_ws_client.send_str(start_cmd)123    await receive_data(run_ws_client, "started", process="1")124    await run_ws_client2.send_str(start_cmd)125    await receive_data(run_ws_client2, "started", process="1")126    stop_cmd = create_message("stop", None, "1")127    await run_ws_client.send_str(stop_cmd)128    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 100, "1")129    stop_cmd = create_message("stop", None, "1")130    await run_ws_client2.send_str(stop_cmd)131    await wait_for_data(run_ws_client2, "stopped", "exitCode", -15, 100, "1")132@pytest.mark.asyncio133async def test_out_of_order_commands(run_ws_client):134    # send input135    user_input = create_message("stdin", {"input": "hello\n"}, "1")136    await run_ws_client.send_str(user_input)137    # bad message138    await receive_data(run_ws_client, "error", "message", "Bad message")139    # send stop140    stop_cmd = create_message("stop", None, "1")141    await run_ws_client.send_str(stop_cmd)142    # bad message143    await receive_data(run_ws_client, "error", "message", "Bad message")144    # send start145    code = "while True: pass"146    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")147    await run_ws_client.send_str(start_cmd)148    # started149    await receive_data(run_ws_client, "started", process="1")150    # send start151    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")152    await run_ws_client.send_str(start_cmd)153    # bad message154    await receive_data(run_ws_client, "error", "message", "Bad message")155    # send stop156    stop_cmd = create_message("stop", None, "1")157    await run_ws_client.send_str(stop_cmd)158    # stopped159    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")160    # send stop161    stop_cmd = create_message("stop", None, "1")162    await run_ws_client.send_str(stop_cmd)163    # bad message164    await receive_data(run_ws_client, "error", "message", "Bad message")165@pytest.mark.asyncio166async def test_discard_old_input(run_ws_client):167    code = 'print("hello world")'168    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")169    await run_ws_client.send_str(start_cmd)170    await receive_data(run_ws_client, "started", process="1")171    unterminated_input = create_message("stdin", {"input": "unterminated input"}, "1")172    await run_ws_client.send_str(unterminated_input)173    await wait_for_data(run_ws_client, "stdout", "output", "hello world\n", 100, "1")174    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")175    code = "print(input())"176    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")177    await run_ws_client.send_str(start_cmd)178    await receive_data(run_ws_client, "started", process="1")179    user_input = create_message("stdin", {"input": "hello\n"}, "1")180    await run_ws_client.send_str(user_input)181    await wait_for_data(run_ws_client, "stdout", "output", "hello\n", 0, "1")...ConmmunicationInterface.py
Source:ConmmunicationInterface.py  
1from serial import Serial2import _thread3import serial4import time5class CommunicationInterface:6    def __init__(self,com):7        self.ser = None8        self.com_name = ''9        self.id_mapping = {10            # æ ¼å¼ä¸º[Screen id, component id]11            "minute":[0x00,0x01],12            'second':[0x00,0x02],13            'Tw':[0x00,0x07],14            'Tc':[0x00,0x15],15            "voltage_max":[0x00,0x26],16            'width1_us_0':[0x01,0x4],17            'width1_us_1':[0x01,0x7],18            'width1_us_2':[0x01,0x3B],19            'width2_us_0':[0x01,0x05],20            'width2_us_1':[0x01,0x08],21            'width2_us_2':[0x01,0x0B],22            'phase_us_0':[0x01,0x06],23            'phase_us_1':[0x01,0x09],24            'phase_us_2':[0x01,0x0C],25            'D1':[0x01,0x16],26            'D2':[0x01,0x17],27            'phase2':[0x01,0x18],28            'M':[0x01,0x1E],29            'index':[0x01,0x0D],30            'pulse_source_index':[0x00,0x12],31            'frequency_0':[0x01,0x1A],32            'frequency_1':[0x01,0x1B],33            'frequency_2':[0x01,0x1C],34            'internal':[0x00,0x7A],35            'external':[0x00,0x7B],36            'preset0':[0x01,0x7C],37            'preset1':[0x01,0x7D],38            'preset2':[0x01,0x7E]39        }40    def set_com(self,com_name):41        self.com_name = com_name42        print(f"[INFO] Change to {com_name}")43    def start_com(self):44        try:45            self.ser.close()46        except:47            pass48        if(self.ser is None):49            self.ser=Serial(self.com_name,115200,timeout=0.5)50        else:51            self.ser.open()52        # self.ser.open()53        print("[INFO] serial connected")54    def com_valid(self):55        return self.ser is not None56    def stop_com(self):57        start_cmd = bytes()58        start_cmd += bytes([0xEE,0xB1,0x11])59        start_cmd += bytes([0x00,0x00])60        start_cmd += bytes([0x00,0x69])61        start_cmd += bytes([0x00,0xFF,0xFC,0xFF,0xFF])62        start_cmd += '\n'.encode()63        self.ser.write(start_cmd)64        time.sleep(1)65        self.ser.close()66        print("[INFO] serial disconnected")67        68    def format_data(self,name,value):69        # è½¬æ¢æ°æ®çæ ¼å¼ï¼çææ°æ®å¸§70        format_result = bytes()71        format_result += bytes([0xEE,0xB1,0x11])72        try:73            format_result += bytes([0x00,self.id_mapping[name][0]])74            format_result += bytes([0x00,self.id_mapping[name][1],0x11])75            format_result += str(value).encode()76        except KeyError:77            print('[ERROR] Variable does not exist, please check it!')78        format_result += bytes([0x00,0xFF,0xFC,0xFF,0xFF])79        format_result += '\n'.encode()80        return format_result81    def start_pwm(self):82        start_cmd = bytes()83        start_cmd += bytes([0xEE,0xB1,0x11])84        start_cmd += bytes([0x00,0x00])85        start_cmd += bytes([0x00,0x08])86        start_cmd += bytes([0x00,0xFF,0xFC,0xFF,0xFF])87        start_cmd += '\n'.encode()88        try:89            self.ser.write(start_cmd)90            print("[INFO] PWM has started")91        except serial.serialutil.PortNotOpenError:92            print("[ERROR] Port has been closed")93            raise AttributeError94    def stop_pwm(self):95        start_cmd = bytes()96        start_cmd += bytes([0xEE,0xB1,0x11])97        start_cmd += bytes([0x00,0x00])98        start_cmd += bytes([0x00,0x09])99        start_cmd += bytes([0x00,0xFF,0xFC,0xFF,0xFF])100        start_cmd += '\n'.encode()101        try:102            self.ser.write(start_cmd)103            print("[INFO] PWM has stopped")104        except serial.serialutil.PortNotOpenError:105            print("[ERROR] Port has been closed")106            raise AttributeError107    def send_data(self,name,value):108        # å°æ°æ®æ ¼å¼è½¬å109        try:110            formatted_data = self.format_data(name, value)111            # # 转æ¢ä¸ºå¯ä»¥äººè½ççåå
è¿å¶çæ°æ®112            # str_hex = ":".join("{:02x}".format(ord(c)) for c in formatted_data)113            # æå°114            # print(f"Send data {str_hex}")115            # è¾åºå°ä¸²å£116            self.ser.write(formatted_data)117            print("[INFO] %s has been changed to %s"%(name.capitalize(),value))118        except serial.serialutil.PortNotOpenError:119            print("[ERROR] Port has been closed")120            # raise AttributeError121    def send_number(self,name,value):122        format_result = bytes()123        format_result += bytes([0xEE,0xB1,0x11])124        try:125            format_result += bytes([0x00,self.id_mapping[name][0]])126            format_result += bytes([0x00,self.id_mapping[name][1],0x11])127            format_result += bytes([value])128        except KeyError:129            print('[ERROR] Variable does not exist, please check it!')130        format_result += bytes([0x00,0xFF,0xFC,0xFF,0xFF])131        format_result += '\n'.encode()132        print(format_result)133        self.ser.write(format_result)134    # change trigger mode index135    def change_pulse_index(self,trigger_mode):136        """ä¿®æ¹PWMçè§¦åæ¨¡å¼137        Args:138            trigger_mode (str): internalæè
external139        """140        if(trigger_mode == 'internal'):141            self.send_number("pulse_source_index",0x00)142            print("[INFO] Index change to internal")143        else:144            self.send_number("pulse_source_index",0x01)145            print("[INFO] Index change to external")146    # change preset index147    def change_index(self,index):148        self.send_number("index",0x00+index)149        print(f"[INFO] Pulse Index: {index}")150    @DeprecationWarning151    def change_port(self,value):152        com_name = value.split(' ')[0]153        print(com_name)154        self.set_com(com_name)155        # _thread.start_new_thread(self.get_data_from_serial, (com,))...test_pihole_scripts.py
Source:test_pihole_scripts.py  
1import pytest2@pytest.fixture3def start_cmd():4    ''' broken by default, required override '''5    return None6@pytest.fixture7def RunningPiHole(DockerPersist, Slow, persist_webserver, persist_tag, start_cmd):8    ''' Override the RunningPiHole to run and check for success of a9        dnsmasq start based `pihole` script command '''10    #print DockerPersist.run('ps -ef').stdout11    Slow(lambda: DockerPersist.run('pgrep dnsmasq').rc == 0)12    Slow(lambda: DockerPersist.run('pgrep {}'.format(persist_webserver)).rc == 0)13    oldpid = DockerPersist.run('pidof dnsmasq')14    for service in [ 'dnsmasq', 'nginx', ]:15        print DockerPersist.run('service {} status'.format(service))16    cmd = DockerPersist.run('pihole {}'.format(start_cmd))17    Slow(lambda: DockerPersist.run('pgrep dnsmasq').rc == 0)18    newpid = DockerPersist.run('pidof dnsmasq')19    for pid in [oldpid, newpid]:20        assert pid != ''21    # ensure a new pid for dnsmasq appeared due to service restart22    assert oldpid != newpid23    assert cmd.rc == 024    # Save out cmd result to check different stdout of start/enable/disable25    DockerPersist.cmd = cmd26    return DockerPersist27@pytest.mark.parametrize('start_cmd', ['start_cmd'])28def test_pihole_start_cmd(RunningPiHole, start_cmd, persist_tag):29    ''' the start_cmd tests are all built into the RunningPiHole fixture in this file '''30    assert RunningPiHole.cmd.stdout == START_DNS_STDOUT[persist_tag]31@pytest.mark.parametrize('start_cmd,hostname,expected_ip', [32    ('enable',  'pi.hole', '127.0.0.1'),33    ('disable', 'pi.hole', '127.0.0.1'),34])35def test_pihole_start_cmd(RunningPiHole, Dig, persist_tag, start_cmd, hostname, expected_ip):36    ''' the start_cmd tests are all built into the RunningPiHole fixture in this file '''37    dig_cmd = "dig +time=1 +noall +answer {} @test_pihole | awk '{{ print $5 }}'".format(hostname)38    lookup = RunningPiHole.dig.run(dig_cmd).stdout.rstrip('\n')39    assert lookup == expected_ip40    stdout = "::: Blocking has been {}d!\n".format(start_cmd)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
