How to use start_cmd method in Airtest

Best Python code snippet using Airtest

test_run.py

Source:test_run.py Github

copy

Full Screen

"""Websocket tests for the ``run`` endpoint.

Each test sends "start" / "stdin" / "stop" messages built with
``create_message`` through a websocket client fixture and checks the
"started" / "stdout" / "stderr" / "stopped" / "error" messages that come back.

NOTE(review): the trailing positional arguments passed to ``wait_for_data``
(e.g. ``0, "1"`` or ``100, "1"``) are presumably a timeout and a process id;
confirm against ``helpers.wait_for_data``.
"""
import asyncio
from datetime import datetime
from time import time

import pytest

from further_link.util.message import create_message, parse_message

from .helpers import receive_data, wait_for_data


@pytest.mark.asyncio
async def test_bad_message(run_ws_client):
    """A start message created without a process id yields "Bad message"."""
    start_cmd = create_message("start", {"runner": "python3", "code": ""})
    await run_ws_client.send_str(start_cmd)

    await wait_for_data(run_ws_client, "error", "message", "Bad message")


@pytest.mark.asyncio
async def test_run_code_script(run_ws_client):
    """A python3 script's stdout is forwarded and it stops with exit code 0."""
    code = """\
from datetime import datetime
print(datetime.now().strftime("%A"))
"""
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    day = datetime.now().strftime("%A")
    await wait_for_data(run_ws_client, "stdout", "output", day + "\n", 0, "1")

    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")


@pytest.mark.asyncio
async def test_run_shell(run_ws_client):
    """The shell runner executes commands sent on stdin until it is stopped."""
    code = """\
date +%s # unix time in seconds
"""
    start_cmd = create_message("start", {"runner": "shell"}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    commands = create_message("stdin", {"input": code}, "1")
    await run_ws_client.send_str(commands)

    # Compared against the current second; this can be off by one if the
    # clock ticks between the command running and this line.
    seconds = str(int(time()))
    await wait_for_data(run_ws_client, "stdout", "output", seconds + "\n", 0, "1")

    stop_cmd = create_message("stop", None, "1")
    await run_ws_client.send_str(stop_cmd)

    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")


@pytest.mark.asyncio
async def test_run_executable(run_ws_client):
    """The exec runner runs a shebang script and reports exit code 0."""
    code = """\
#!/bin/bash
date +%s # unix time in seconds
"""
    start_cmd = create_message("start", {"runner": "exec", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    seconds = str(int(time()))
    await wait_for_data(run_ws_client, "stdout", "output", seconds + "\n", 0, "1")

    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")


@pytest.mark.asyncio
async def test_run_two_scripts(run_ws_client):
    """Two processes on one client run concurrently; the quick one ends first."""
    code1 = """\
from time import sleep
sleep(1)
print(1)
"""
    code2 = """\
print(2)
"""
    start_cmd = create_message("start", {"runner": "python3", "code": code1}, "1")
    await run_ws_client.send_str(start_cmd)
    await receive_data(run_ws_client, "started", process="1")

    start_cmd = create_message("start", {"runner": "python3", "code": code2}, "2")
    await run_ws_client.send_str(start_cmd)
    await receive_data(run_ws_client, "started", process="2")

    # Process "2" prints immediately, process "1" only after its sleep.
    await wait_for_data(run_ws_client, "stdout", "output", "2\n", 0, "2")
    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "2")

    await wait_for_data(run_ws_client, "stdout", "output", "1\n", 0, "1")
    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")


@pytest.mark.asyncio
async def test_stop_early(run_ws_client):
    """Stopping a busy-looping script reports exit code -15."""
    code = "while True: pass"
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    stop_cmd = create_message("stop", None, "1")
    await run_ws_client.send_str(stop_cmd)

    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")


@pytest.mark.asyncio
async def test_bad_code(run_ws_client):
    """Invalid Python produces a SyntaxError traceback on stderr and exit 1."""
    code = "i'm not valid python"
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    await asyncio.sleep(0.1)  # wait for data

    message = await run_ws_client.receive()
    m_type, m_data, m_process = parse_message(message.data)
    assert m_type == "stderr"

    lines = m_data["output"].split("\n")
    # Leading whitespace is ignored: the interpreter indents these traceback
    # lines, and the exact indent is not what this test is about.
    assert lines[0].lstrip().startswith("File")
    assert lines[1].strip() == "i'm not valid python"
    assert lines[2][-1] == "^"
    # Only the exception type is pinned; the message text differs between
    # Python versions ("EOL while scanning string literal" before 3.10,
    # "unterminated string literal ..." from 3.10 on).
    assert lines[3].startswith("SyntaxError: ")

    await wait_for_data(run_ws_client, "stopped", "exitCode", 1, 0, "1")


@pytest.mark.asyncio
async def test_input(run_ws_client):
    """Lines sent as stdin reach ``input()`` in the running script."""
    code = """s = input()
while "BYE" != s:
    print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()])
    s = input()"""
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    user_input = create_message("stdin", {"input": "hello\n"}, "1")
    await run_ws_client.send_str(user_input)

    await wait_for_data(
        run_ws_client, "stdout", "output", "HUH?! SPEAK UP, SONNY!\n", 0, "1"
    )

    user_input = create_message("stdin", {"input": "HEY GRANDMA\n"}, "1")
    await run_ws_client.send_str(user_input)

    await wait_for_data(
        run_ws_client, "stdout", "output", "NO, NOT SINCE 1930\n", 0, "1"
    )

    user_input = create_message("stdin", {"input": "BYE\n"}, "1")
    await run_ws_client.send_str(user_input)

    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")


@pytest.mark.asyncio
async def test_two_clients(run_ws_client, run_ws_client2):
    """Two clients can each run and stop a process using the same id "1"."""
    code = "while True: pass"
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)
    await receive_data(run_ws_client, "started", process="1")

    await run_ws_client2.send_str(start_cmd)
    await receive_data(run_ws_client2, "started", process="1")

    stop_cmd = create_message("stop", None, "1")
    await run_ws_client.send_str(stop_cmd)
    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 100, "1")

    stop_cmd = create_message("stop", None, "1")
    await run_ws_client2.send_str(stop_cmd)
    await wait_for_data(run_ws_client2, "stopped", "exitCode", -15, 100, "1")


@pytest.mark.asyncio
async def test_out_of_order_commands(run_ws_client):
    """Commands that do not fit the process state yield "Bad message"."""
    # send input
    user_input = create_message("stdin", {"input": "hello\n"}, "1")
    await run_ws_client.send_str(user_input)

    # bad message
    await receive_data(run_ws_client, "error", "message", "Bad message")

    # send stop
    stop_cmd = create_message("stop", None, "1")
    await run_ws_client.send_str(stop_cmd)

    # bad message
    await receive_data(run_ws_client, "error", "message", "Bad message")

    # send start
    code = "while True: pass"
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    # started
    await receive_data(run_ws_client, "started", process="1")

    # send start
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    # bad message
    await receive_data(run_ws_client, "error", "message", "Bad message")

    # send stop
    stop_cmd = create_message("stop", None, "1")
    await run_ws_client.send_str(stop_cmd)

    # stopped
    await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")

    # send stop
    stop_cmd = create_message("stop", None, "1")
    await run_ws_client.send_str(stop_cmd)

    # bad message
    await receive_data(run_ws_client, "error", "message", "Bad message")


@pytest.mark.asyncio
async def test_discard_old_input(run_ws_client):
    """Unterminated stdin from one run must not leak into the next run."""
    code = 'print("hello world")'
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    unterminated_input = create_message("stdin", {"input": "unterminated input"}, "1")
    await run_ws_client.send_str(unterminated_input)

    await wait_for_data(run_ws_client, "stdout", "output", "hello world\n", 100, "1")

    await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")

    code = "print(input())"
    start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")
    await run_ws_client.send_str(start_cmd)

    await receive_data(run_ws_client, "started", process="1")

    user_input = create_message("stdin", {"input": "hello\n"}, "1")
    await run_ws_client.send_str(user_input)

    await wait_for_data(run_ws_client, "stdout", "output", "hello\n", 0, "1")
    # ... (the source listing is truncated here)

Full Screen

Full Screen

ConmmunicationInterface.py

Source:ConmmunicationInterface.py Github

copy

Full Screen

import _thread
import time
import warnings

import serial
from serial import Serial


class CommunicationInterface:
    """Build byte frames addressed by (screen id, component id) and write
    them to a serial port.

    Every frame starts with ``EE B1 11``, is followed by the two-byte screen
    id and two-byte component id, an optional payload, and ends with
    ``00 FF FC FF FF`` plus a newline.
    """

    # Fixed bytes that open and close every frame sent by this class.
    _FRAME_HEAD = bytes([0xEE, 0xB1, 0x11])
    _FRAME_TAIL = bytes([0x00, 0xFF, 0xFC, 0xFF, 0xFF]) + '\n'.encode()

    def __init__(self, com):
        # NOTE(review): ``com`` is accepted but never used; the port name is
        # only ever set through set_com(). Confirm whether it should seed
        # ``com_name``.
        self.ser = None
        self.com_name = ''
        self.id_mapping = {
            # Format is [screen id, component id]
            "minute": [0x00, 0x01],
            'second': [0x00, 0x02],
            'Tw': [0x00, 0x07],
            'Tc': [0x00, 0x15],
            "voltage_max": [0x00, 0x26],
            'width1_us_0': [0x01, 0x4],
            'width1_us_1': [0x01, 0x7],
            # NOTE(review): the sibling entries step by 3 (0x04, 0x07, ...),
            # so 0x3B may be a typo for 0x0A -- verify against the device.
            'width1_us_2': [0x01, 0x3B],
            'width2_us_0': [0x01, 0x05],
            'width2_us_1': [0x01, 0x08],
            'width2_us_2': [0x01, 0x0B],
            'phase_us_0': [0x01, 0x06],
            'phase_us_1': [0x01, 0x09],
            'phase_us_2': [0x01, 0x0C],
            'D1': [0x01, 0x16],
            'D2': [0x01, 0x17],
            'phase2': [0x01, 0x18],
            'M': [0x01, 0x1E],
            'index': [0x01, 0x0D],
            'pulse_source_index': [0x00, 0x12],
            'frequency_0': [0x01, 0x1A],
            'frequency_1': [0x01, 0x1B],
            'frequency_2': [0x01, 0x1C],
            'internal': [0x00, 0x7A],
            'external': [0x00, 0x7B],
            'preset0': [0x01, 0x7C],
            'preset1': [0x01, 0x7D],
            'preset2': [0x01, 0x7E],
        }

    def _command_frame(self, screen_id, control_id):
        """Return a payload-free frame addressed to (screen_id, control_id)."""
        address = bytes([0x00, screen_id, 0x00, control_id])
        return self._FRAME_HEAD + address + self._FRAME_TAIL

    def _address_bytes(self, name):
        """Return the address prefix for ``name``, or None if it is unknown.

        An unknown name is reported on stdout rather than raised, matching
        how format_data() and send_number() have always behaved.
        """
        try:
            screen_id, control_id = self.id_mapping[name]
        except KeyError:
            print('[ERROR] Variable does not exist, please check it!')
            return None
        return bytes([0x00, screen_id, 0x00, control_id, 0x11])

    def set_com(self, com_name):
        """Remember ``com_name`` as the port to use on the next start_com()."""
        self.com_name = com_name
        print(f"[INFO] Change to {com_name}")

    def start_com(self):
        """Open the serial port named by ``com_name`` (115200 baud, 0.5 s timeout)."""
        if self.ser is None:
            self.ser = Serial(self.com_name, 115200, timeout=0.5)
        else:
            # Close first so reopening an already-open port does not raise.
            try:
                self.ser.close()
            except OSError:  # serial.SerialException derives from OSError
                pass
            # Pick up a name changed through set_com() since the last open;
            # otherwise the previous port would silently be reopened.
            self.ser.port = self.com_name
            self.ser.open()
        print("[INFO] serial connected")

    def com_valid(self):
        """Return True once a serial object exists (it may still be closed)."""
        return self.ser is not None

    def stop_com(self):
        """Send the (0x00, 0x69) command, wait one second, then close the port."""
        self.ser.write(self._command_frame(0x00, 0x69))
        time.sleep(1)
        self.ser.close()
        print("[INFO] serial disconnected")

    def format_data(self, name, value):
        """Return the frame that carries ``str(value)`` to component ``name``.

        For an unknown ``name`` an error is printed and the frame contains
        only the fixed head and tail.
        """
        address = self._address_bytes(name)
        body = b'' if address is None else address + str(value).encode()
        return self._FRAME_HEAD + body + self._FRAME_TAIL

    def start_pwm(self):
        """Send the (0x00, 0x08) command.

        Raises:
            AttributeError: if the port is not open (kept for callers that
                already catch it; the serial error is chained as the cause).
        """
        try:
            self.ser.write(self._command_frame(0x00, 0x08))
            print("[INFO] PWM has started")
        except serial.serialutil.PortNotOpenError as err:
            print("[ERROR] Port has been closed")
            raise AttributeError from err

    def stop_pwm(self):
        """Send the (0x00, 0x09) command.

        Raises:
            AttributeError: if the port is not open (kept for callers that
                already catch it; the serial error is chained as the cause).
        """
        try:
            self.ser.write(self._command_frame(0x00, 0x09))
            print("[INFO] PWM has stopped")
        except serial.serialutil.PortNotOpenError as err:
            print("[ERROR] Port has been closed")
            raise AttributeError from err

    def send_data(self, name, value):
        """Format ``value`` for component ``name`` and write it to the port.

        A closed port is reported on stdout instead of raising.
        """
        # NOTE(review): for an unknown ``name`` format_data() prints an error
        # but the address-less frame is still written -- confirm intended.
        try:
            formatted_data = self.format_data(name, value)
            self.ser.write(formatted_data)
            print("[INFO] %s has been changed to %s" % (name.capitalize(), value))
        except serial.serialutil.PortNotOpenError:
            print("[ERROR] Port has been closed")

    def send_number(self, name, value):
        """Write ``value`` to component ``name`` as one raw byte.

        ``bytes([value])`` rejects anything outside 0..255 with ValueError.
        """
        address = self._address_bytes(name)
        body = b'' if address is None else address + bytes([value])
        format_result = self._FRAME_HEAD + body + self._FRAME_TAIL
        print(format_result)
        self.ser.write(format_result)

    # change trigger mode index
    def change_pulse_index(self, trigger_mode):
        """Change the PWM trigger mode.

        Args:
            trigger_mode (str): 'internal' selects index 0x00; any other
                value selects 0x01 and is reported as external.
        """
        if trigger_mode == 'internal':
            self.send_number("pulse_source_index", 0x00)
            print("[INFO] Index change to internal")
        else:
            self.send_number("pulse_source_index", 0x01)
            print("[INFO] Index change to external")

    # change preset index
    def change_index(self, index):
        """Send ``index`` as the value of the 'index' component."""
        self.send_number("index", index)
        print(f"[INFO] Pulse Index: {index}")

    def change_port(self, value):
        """Deprecated: set the port from the first space-separated token of ``value``.

        The original ``@DeprecationWarning`` decorator replaced this method
        with a non-callable exception instance; the deprecation is now
        signalled with a real warning so the method can still be called.
        """
        warnings.warn(
            "change_port() is deprecated",
            DeprecationWarning,
            stacklevel=2,
        )
        com_name = value.split(' ')[0]
        print(com_name)
        self.set_com(com_name)
        # _thread.start_new_thread(self.get_data_from_serial, (com,))
        # ... (the source listing is truncated here)

Full Screen

Full Screen

test_pihole_scripts.py

Source:test_pihole_scripts.py Github

copy

Full Screen

import pytest


@pytest.fixture
def start_cmd():
    ''' broken by default, required override '''
    return None


@pytest.fixture
def RunningPiHole(DockerPersist, Slow, persist_webserver, persist_tag, start_cmd):
    ''' Override the RunningPiHole to run and check for success of a
        dnsmasq start based `pihole` script command '''
    #print DockerPersist.run('ps -ef').stdout
    Slow(lambda: DockerPersist.run('pgrep dnsmasq').rc == 0)
    Slow(lambda: DockerPersist.run('pgrep {}'.format(persist_webserver)).rc == 0)
    oldpid = DockerPersist.run('pidof dnsmasq')
    for service in ['dnsmasq', 'nginx']:
        # Function-call form of print: valid on Python 2 and 3 alike.
        print(DockerPersist.run('service {} status'.format(service)))
    cmd = DockerPersist.run('pihole {}'.format(start_cmd))
    Slow(lambda: DockerPersist.run('pgrep dnsmasq').rc == 0)
    newpid = DockerPersist.run('pidof dnsmasq')
    for pid in [oldpid, newpid]:
        assert pid != ''
    # ensure a new pid for dnsmasq appeared due to service restart
    assert oldpid != newpid
    assert cmd.rc == 0
    # Save out cmd result to check different stdout of start/enable/disable
    DockerPersist.cmd = cmd
    return DockerPersist


@pytest.mark.parametrize('start_cmd', ['start_cmd'])
def test_pihole_start_cmd(RunningPiHole, start_cmd, persist_tag):
    ''' the start_cmd tests are all built into the RunningPiHole fixture in this file '''
    # NOTE(review): START_DNS_STDOUT is not defined in the visible part of
    # this file -- confirm where it comes from.
    assert RunningPiHole.cmd.stdout == START_DNS_STDOUT[persist_tag]


# Renamed from a second `test_pihole_start_cmd`: two functions with the same
# name meant the later definition replaced the earlier one, so the test above
# was never collected.
@pytest.mark.parametrize('start_cmd,hostname,expected_ip', [
    ('enable', 'pi.hole', '127.0.0.1'),
    ('disable', 'pi.hole', '127.0.0.1'),
])
def test_pihole_enable_disable_cmd(RunningPiHole, Dig, persist_tag, start_cmd, hostname, expected_ip):
    ''' the enable/disable command itself is run by the RunningPiHole fixture in this file '''
    dig_cmd = "dig +time=1 +noall +answer {} @test_pihole | awk '{{ print $5 }}'".format(hostname)
    lookup = RunningPiHole.dig.run(dig_cmd).stdout.rstrip('\n')
    assert lookup == expected_ip

    stdout = "::: Blocking has been {}d!\n".format(start_cmd)
    # ... (the source listing is truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful