How to use test_start_app method in Airtest

# NOTE(review): this excerpt begins mid-definition (original lines 1-28 are
# not part of the snippet). The fragment is preserved as a comment because its
# enclosing function and indentation level cannot be determined from here.
#
#     if not resp.startswith('\n00'):
#         logger.warning('AT command response: {}'.format(resp))
#     try:
#         bt.test_start_app(app)
#     except SmartBasicException as sb:
#         app_startup_error_handle(bt, sb, bl654_hex, app, folder)
#         bt.test_start_app(app)
#
# NOTE(review): `re`, `glob`, `serial`, `partial` and `logger` are used below
# and are presumably imported/defined in the missing header -- confirm.


def strip_extra_characters(str):
    """Clean one line of a directory listing.

    Removes every "<digits><TAB>" prefix, then every carriage return,
    double quote and "0" character.

    NOTE(review): the second pattern also deletes "0" characters that are
    part of the name itself -- confirm that is intended.
    """
    # The parameter keeps its original name for keyword callers; the result
    # is held in a separate local so the builtin is shadowed as little as
    # possible.
    cleaned = re.sub(r'\d+\t', "", str)
    cleaned = re.sub('[\r"0]', "", cleaned)
    return cleaned


def generic_handler(cmd, err):
    """Default error handler for send_single_cmd: ignores both arguments."""
    return None


def str_to_bytes(my_str):
    """Encode *my_str* as ASCII bytes."""
    return my_str.encode("ascii")


# Raw command lines written to the serial port.
READ_DIR = b"at+dir \r\n"
RESET = b'atz \r\n'
FILE_CLOSE = b'at+fcl \r\n'
SB_VERSION = b"ati 13 \r\n"
SB_FIRMWARE = b"ati 3 \r\n"


def try_decode_ret(response):
    """Decode a raw response and return its last tab-separated field.

    Returns None (after logging the error) when *response* cannot be decoded
    as ASCII or is not a bytes-like object.
    """
    logger.debug(response)
    try:
        str_res = response.decode("ascii").strip("\n")
    except (UnicodeDecodeError, AttributeError) as e:
        logger.error("response handling error {} ".format(e))
        return None
    logger.debug(str_res)
    return str_res.split("\t")[-1].strip("\r")


class SmartBasicException(Exception):
    """Raised by BTManager.test_start_app when the module reports an error."""


class BTManager():
    """Context manager wrapping a serial connection driven by AT commands.

    Responses containing ``01<TAB>`` are treated as failures throughout.

    NOTE(review): several ``logger.info(`` call prefixes were lost in the
    excerpt and have been reconstructed from the surviving parentheses; the
    log level is an assumption -- confirm against the original file.
    """

    def __init__(self, port):
        """Open *port* at 115200 baud with RTS/CTS and flush stale input."""
        logger.info(("Running with port: {}".format(port)))
        self.sp = serial.Serial(port,
                                115200,
                                timeout=1,
                                parity=serial.PARITY_NONE,
                                rtscts=1)
        self.sp.send_break(duration=0.020)
        self.sp.reset_input_buffer()
        logger.info("port initialized")

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.sp.close()

    def get_sb_version(self):
        """Return the decoded reply to SB_VERSION, or None if undecodable."""
        res = self.write_bytes(SB_VERSION)
        # Consume the remainder of the reply so it does not leak into the
        # next command's response.
        self.sp.read_until(b"\r")
        version = try_decode_ret(res)
        # try_decode_ret returns None on failure; do not call .strip on it.
        if version is None:
            return None
        return version.strip(" ")

    def get_sb_firmware(self):
        """Return the decoded reply to SB_FIRMWARE, or None if undecodable."""
        res = self.write_bytes(SB_FIRMWARE)
        self.sp.read_until(b"\r")
        return try_decode_ret(res)

    def write_str(self, cmd):
        """ASCII-encode *cmd*, send it, and return the raw reply bytes."""
        return self.write_bytes(str_to_bytes(cmd))

    def write_bytes(self, bcmd):
        """Send *bcmd* and return what is read up to the first CR.

        The read is capped at 100 bytes and at the port timeout.
        """
        self.sp.write(bcmd)
        self.sp.flush()
        return self.sp.read_until(b'\r', 100)

    def send_single_cmd(self, cmd, meta, handler):
        """Send ``cmd meta\\r`` and report the outcome.

        Returns ``(True, "")`` on success, otherwise
        ``(False, handler(cmd, response))``.
        """
        cmd_line = cmd + ' ' + meta + '\r'
        logger.debug(("sending single command {}".format(repr(cmd_line))))
        res = self.write_str(cmd_line)
        if b'01\t' not in res:
            return (True, "")
        err_string = handler(cmd, res)
        return (False, err_string)

    def reset(self):
        """Send RESET and return the raw reply."""
        return self.write_bytes(RESET)

    def read_dir(self):
        """List the files reported by READ_DIR.

        Returns ``(True, [names])`` on success or ``(False, raw_response)``
        when the reply contains an error marker.
        """
        self.sp.write(READ_DIR)
        self.sp.flush()
        # read_until compares against received *bytes*; a str terminator can
        # never match and forces every call to wait for the full timeout.
        # The leading "\n" keeps a name that merely ends in "00" from ending
        # the read early.
        res = self.sp.read_until(b'\n00\r')
        logger.info("res {}".format(repr(res)))
        if b'\n01' in res:
            return (False, res)
        # read again
        self.sp.read_until(b"\r", 10)
        logger.debug(res)
        lines = res.decode('utf-8').split('\n')
        names = [strip_extra_characters(line) for line in lines]
        return (True, [name for name in names if name != ''])

    def del_file(self, name):
        """Delete *name* via ``at+del``; returns send_single_cmd's tuple."""
        return self.send_single_cmd('at+del', '"{}"'.format(name),
                                    generic_handler)

    def load_file(self, name, file):
        """Upload local *file* to the module under *name*.

        Sends RESET, opens the target with ``at+fow``, streams the contents
        as hex in 100-byte chunks with ``at+fwrh``, then sends FILE_CLOSE.

        Returns the raw FILE_CLOSE reply on success, False when the module
        reports an error, and None when the local file cannot be read (the
        error is logged).
        """
        logger.info(("file to load {} with name {}".format(file, name)))
        try:
            with open(file, 'rb') as f:
                self.write_bytes(RESET)
                fowcmd = 'at+fow "' + name + '"\r\n'
                res = self.write_str(fowcmd)
                logger.info(("open file response: {}".format(repr(res))))
                if b'01\t' in res:
                    return False
                # iter(callable, sentinel): read 100 bytes at a time until
                # read() returns b'' at end of file.
                for block in iter(partial(f.read, 100), b''):
                    str_block = block.hex()
                    strblock = 'at+fwrh "' + str_block + '"\r\n'
                    logger.debug(("write chunk {}".format(repr(str_block))))
                    res = self.write_str(strblock)
                    logger.debug(
                        ("write chunk response {}".format(repr(res))))
                    if b'01\t' in res:
                        return False
                res = self.write_bytes(FILE_CLOSE)
                logger.debug(res)
                return res
        except IOError as e:
            logger.error(e)
            return None

    def at_command(self, cmd, timeout=2):
        """Send *cmd* followed by CR and return the reply decoded as UTF-8.

        Note: this changes the port's read timeout to *timeout* and does not
        restore the previous value.
        """
        logger.info(("at cmd:{}".format(cmd)))
        cmd = cmd + "\r"
        self.sp.timeout = timeout
        self.sp.write(str_to_bytes(cmd))
        self.sp.flush()
        res = self.sp.read_until(b'\r')
        logger.debug(("response raw:{}".format(repr(res))))
        return res.decode("utf-8")

    def test_start_app(self, cmd):
        """Run application *cmd* with ``at+run``.

        Returns None when the reply does not start with ``"\\n01"``.

        Raises:
            SmartBasicException: when the reply starts with ``"\\n01"``; the
                message is the field after the first tab, or the whole
                stripped reply if it contains no tab.
        """
        logger.info(("starting app:{}".format(cmd)))
        res = self.at_command('at+run "' + cmd + '"')
        logger.info('App start response: {}'.format(res))
        if not res.startswith("\n01"):
            return
        fields = res.split("\t")
        # An error reply without a tab used to raise IndexError here and
        # hide the real failure.
        err = fields[1].strip("\r") if len(fields) > 1 else res.strip()
        raise SmartBasicException(err)


# NOTE(review): the excerpt is cut off inside find_app_file (it ends with
# "..." right after the `if`). The visible part is preserved as a comment;
# the missing remainder is not guessed at.
#
# def find_app_file(path, app, hex):
#     search = path + app + "*"
#     logger.debug("searching {}".format(search))
#     files = glob.glob(search)
#     logger.debug("found files {}".format(files))
#     for file in files:
#         if hex in file:
#             ...

import os.path
import time
import unittest

import vodka
import vodka.plugins
import vodka.instance


@vodka.plugin.register("test_start_plugin_a")
class TimedPlugin(vodka.plugins.TimedPlugin):
    """Test plugin that records that setup() ran and counts work() calls."""

    def init(self):
        self.counter = 0
        self.setup_done = False

    def setup(self):
        self.setup_done = True

    def work(self):
        self.counter += 1


# Directory holding the application used by this test.
HOME = os.path.join(os.path.dirname(__file__), "resources", "test_start_app")

CONFIG = {
    # applications for this test are loaded from
    # resources/appdir/application.py
    "apps": {
        "test_start_app": {"home": HOME, "enabled": True},
        "test_start_app_inactive": {
            "home": HOME,
            "module": "test_start_app.application",
            "enabled": False,
        },
    },
    "plugins": [
        {
            "type": "test_start_plugin_a",
            "name": "test_start_plugin_a",
            "interval": 0.1,
            "async": "thread",
        },
        {
            "type": "test_start_plugin_a",
            "name": "test_start_plugin_b",
            "interval": 0.1,
            "start_manual": True,
        },
    ],
}


class TestStart(unittest.TestCase):
    """Checks what vodka.init returns and instantiates for CONFIG."""

    def test_init_and_start(self):
        r = vodka.init(CONFIG, CONFIG)

        # make sure plugin workers were assigned accordingly
        self.assertEqual(
            r,
            {
                "asyncio_workers": [],
                "gevent_workers": [],
                "thread_workers": [
                    vodka.plugin.get_instance("test_start_plugin_a")
                ],
            },
        )

        # make sure app was instantiated
        app = vodka.instance.get_instance("test_start_app")
        self.assertEqual(app.setup_done, True)

        # make sure inactive app was not instantiated
        with self.assertRaises(KeyError):
            vodka.instance.get_instance("test_start_app_inactive")

        # make sure skipped app was not instantiated
        with self.assertRaises(KeyError):
            vodka.instance.get_instance("test_start_app_skipped")

        # make sure plugin was setup
        plugin = vodka.plugin.get_instance("test_start_plugin_a")
        self.assertEqual(plugin.setup_done, True)

        vodka.start(**r)

        # give some time for startup to complete
        time.sleep(0.25)

        # make sure plugin is running
        self.assertEqual(plugin.run_level, 1)

        # make sure manually started plugin isn't running
        plugin = vodka.plugin.get_instance("test_start_plugin_b")
        # NOTE(review): the excerpt is truncated here ("..."); the assertion
        # on the manually started plugin is not visible and is not guessed.

