How to use _process_options method in autotest

Best Python code snippet using autotest_python

RecipeParser.py

Source:RecipeParser.py Github

copy

Full Screen

...120 slave = XmlData(slave_tag)121 slave["id"] = self._get_attribute(slave_tag, "id")122 # slave options123 opts_tag = slave_tag.find("options")124 opts = self._process_options(opts_tag)125 if len(opts) > 0:126 slave["options"] = opts127 iface["slaves"].append(slave)128 # interface options129 opts_tag = iface_tag.find("options")130 opts = self._process_options(opts_tag)131 if len(opts) > 0:132 iface["options"] = opts133 elif iface["type"] in ["vti", "vti6"]:134 # interface options135 opts_tag = iface_tag.find("options")136 opts = self._process_options(opts_tag)137 iface["options"] = opts138 elif iface["type"] in ["vlan"]:139 # real_dev of the VLAN interface140 slaves_tag = iface_tag.find("slaves")141 if slaves_tag is None or len(slaves_tag) != 1:142 msg = "VLAN '%s' need exactly one slave definition."\143 % iface["id"]144 raise RecipeError(msg, iface_tag)145 iface["slaves"] = XmlCollection(slaves_tag)146 slave_tag = slaves_tag[0]147 slave = XmlData(slave_tag)148 slave["id"] = self._get_attribute(slave_tag, "id")149 iface["slaves"].append(slave)150 # interface options151 opts_tag = iface_tag.find("options")152 opts = self._process_options(opts_tag)153 if len(opts) > 0:154 iface["options"] = opts155 elif iface["type"] in ["vxlan", "gre", "ipip"]:156 # real_dev of the interface157 slaves_tag = iface_tag.find("slaves")158 if slaves_tag is not None and len(slaves_tag) > 1:159 msg = "%s '%s' needs one or no slave definition."\160 % (iface["type"], iface["id"])161 raise RecipeError(msg, iface_tag)162 if slaves_tag:163 iface["slaves"] = XmlCollection(slaves_tag)164 slave_tag = slaves_tag[0]165 slave = XmlData(slave_tag)166 slave["id"] = self._get_attribute(slave_tag, "id")167 iface["slaves"].append(slave)168 # interface options169 opts_tag = iface_tag.find("options")170 opts = self._process_options(opts_tag)171 if len(opts) > 0:172 iface["options"] = opts173 elif iface["type"] == "ovs_bridge":174 slaves_tag = iface_tag.find("slaves")175 iface["slaves"] = 
XmlCollection(slaves_tag)176 ovsb_slaves = []177 iface["ovs_conf"] = XmlData(slaves_tag)178 if slaves_tag is not None:179 for slave_tag in slaves_tag:180 slave = XmlData(slave_tag)181 slave["id"] = str(self._get_attribute(slave_tag, "id"))182 ovsb_slaves.append(slave["id"])183 opts_tag = slave_tag.find("options")184 opts = self._process_options(opts_tag)185 slave["options"] = opts186 iface["slaves"].append(slave)187 vlan_elems = iface_tag.findall("vlan")188 vlans = iface["ovs_conf"]["vlans"] = XmlData(slaves_tag)189 for vlan in vlan_elems:190 vlan_tag = str(self._get_attribute(vlan, "tag"))191 if vlan_tag in vlans:192 msg = "VLAN '%s' already defined for "\193 "this ovs_bridge." % vlan_tag194 raise RecipeError(msg, vlan)195 vlans[vlan_tag] = XmlData(vlan)196 vlans[vlan_tag]["slaves"] = XmlCollection(vlan)197 vlan_slaves = vlans[vlan_tag]["slaves"]198 slaves_tag = vlan.find("slaves")199 for slave_tag in slaves_tag:200 slave_id = str(self._get_attribute(slave_tag, "id"))201 if slave_id not in ovsb_slaves:202 msg = "No port with id '%s' defined for "\203 "this ovs_bridge." % slave_id204 raise RecipeError(msg, slave_tag)205 if slave_id in vlan_slaves:206 msg = "Port '%s' already a member of vlan %s"\207 % (slave_id, vlan_tag)208 raise RecipeError(msg, slave_tag)209 else:210 vlan_slaves.append(slave_id)211 bonded_slaves = {}212 bond_elems = iface_tag.findall("bond")213 bonds = iface["ovs_conf"]["bonds"] = XmlData(slaves_tag)214 for bond_tag in bond_elems:215 bond_id = str(self._get_attribute(bond_tag, "id"))216 if bond_id in bonds:217 msg = "Bond with id '%s' already defined for "\218 "this ovs_bridge." 
% bond_id219 raise RecipeError(msg, bond_tag)220 bonds[bond_id] = XmlData(bond_tag)221 bond_slaves = bonds[bond_id]["slaves"] = XmlCollection(bond_tag)222 slaves_tag = bond_tag.find("slaves")223 for slave_tag in slaves_tag:224 slave_id = str(self._get_attribute(slave_tag, "id"))225 if slave_id not in ovsb_slaves:226 msg = "No port with id '%s' defined for "\227 "this ovs_bridge." % slave_id228 raise RecipeError(msg, slave_tag)229 if slave_id in bonded_slaves:230 msg = "Port with id '%s' already in bond with id '%s'"\231 % (slave_id, bonded_slaves[slave_id])232 raise RecipeError(msg, slave_tag)233 else:234 bonded_slaves[slave_id] = bond_id235 bond_slaves.append(slave_id)236 opts_tag = bond_tag.find("options")237 opts = self._process_options(opts_tag)238 if len(opts) > 0:239 bonds[bond_id]["options"] = opts240 unique_ids = []241 tunnels = iface["ovs_conf"]["tunnels"] = XmlCollection(slaves_tag)242 tunnel_elems = iface_tag.findall("tunnel")243 for tunnel_elem in tunnel_elems:244 tunnels.append(XmlData(tunnel_elem))245 tunnel = tunnels[-1]246 tunnel["id"] = str(self._get_attribute(tunnel_elem, "id"))247 if tunnel["id"] in unique_ids:248 msg = "Tunnel with id '%s' already defined for "\249 "this ovs_bridge." 
% tunnel["id"]250 raise RecipeError(msg, tunnel_elem)251 else:252 unique_ids.append(tunnel["id"])253 t = str(self._get_attribute(tunnel_elem, "type"))254 tunnel["type"] = t255 opts_elem = tunnel_elem.find("options")256 opts = self._process_options(opts_elem)257 if len(opts) > 0:258 tunnel["options"] = opts259 # addresses260 addresses_tag = tunnel_elem.find("addresses")261 addrs = self._process_addresses(addresses_tag)262 tunnel["addresses"] = addrs263 iface["ovs_conf"]["internals"] = XmlCollection(slaves_tag)264 internals = iface["ovs_conf"]["internals"]265 internal_elems = iface_tag.findall("internal")266 for internal_elem in internal_elems:267 internals.append(XmlData(internal_elem))268 internal = internals[-1]269 internal["id"] = str(self._get_attribute(internal_elem, "id"))270 if internal["id"] in unique_ids:271 msg = "Internal id '%s' already defined for "\272 "this ovs_bridge." % internal["id"]273 raise RecipeError(msg, internal_elem)274 else:275 unique_ids.append(internal["id"])276 opts_elem = internal_elem.find("options")277 opts = self._process_options(opts_elem)278 if len(opts) > 0:279 internal["options"] = opts280 # addresses281 addresses_tag = internal_elem.find("addresses")282 addrs = self._process_addresses(addresses_tag)283 internal["addresses"] = addrs284 iface["ovs_conf"]["flow_entries"] = XmlCollection(slaves_tag)285 flow_entries = iface["ovs_conf"]["flow_entries"]286 flow_elems = iface_tag.findall("flow_entries")287 if len(flow_elems) == 1:288 entries = flow_elems[0].findall("entry")289 for entry in entries:290 if self._has_attribute(entry, "value"):291 flow_entries.append(self._get_attribute(entry,292 "value"))293 else:294 flow_entries.append(self._get_content(entry))295 return [iface]296 def _process_addresses(self, addresses_tag):297 addresses = XmlCollection(addresses_tag)298 if addresses_tag is not None and len(addresses_tag) > 0:299 for addr_tag in addresses_tag:300 if self._has_attribute(addr_tag, "value"):301 addr = 
self._get_attribute(addr_tag, "value")302 else:303 addr = self._get_content(addr_tag)304 addresses.append(addr)305 return addresses306 def _process_options(self, opts_tag):307 options = XmlCollection(opts_tag)308 if opts_tag is not None:309 for opt_tag in opts_tag:310 opt = XmlData(opt_tag)311 opt["name"] = self._get_attribute(opt_tag, "name")312 if self._has_attribute(opt_tag, "value"):313 opt["value"] = self._get_attribute(opt_tag, "value")314 else:315 opt["value"] = self._get_content(opt_tag)316 options.append(opt)317 return options318 def _validate_netem(self, options, netem_op, netem_tag):319 if netem_op == "delay":320 valid = False321 jitter = False322 correlation = False323 distribution = False324 valid_distributions = ["normal", "uniform", "pareto", "paretonormal"]325 for opt in options:326 if "time" in opt.values():327 valid = True328 elif "distribution" in opt.values():329 if opt["value"] not in valid_distributions:330 raise RecipeError("netem: invalid distribution type", netem_tag)331 else:332 distribution = True333 elif "jitter" in opt.values():334 jitter = True335 elif "correlation" in opt.values():336 correlation = True337 if not jitter:338 if correlation or distribution:339 raise RecipeError("netem: jitter option is mandatory when using <correlation> or <distribution>", netem_tag)340 if not valid:341 raise RecipeError("netem: time option is mandatory for <delay>", netem_tag)342 elif netem_op == "loss":343 for opt in options:344 if "percent" in opt.values():345 return346 raise RecipeError("netem: percent option is mandatory for <loss>", netem_tag)347 elif netem_op == "duplication":348 for opt in options:349 if "percent" in opt.values():350 return351 raise RecipeError("netem: percent option is mandatory for <duplication>", netem_tag)352 elif netem_op == "corrupt":353 for opt in options:354 if "percent" in opt.values():355 return356 raise RecipeError("netem: percent option is mandatory for <corrupt>", netem_tag)357 elif netem_op == "reordering":358 for 
opt in options:359 if "percent" in opt.values():360 return361 raise RecipeError("netem: percent option is mandatory for <reordering>", netem_tag)362 def _process_netem(self, netem_tag):363 interface = XmlData(netem_tag)364 # params365 for netem_op in ["delay", "loss", "duplication", "corrupt", "reordering"]:366 netem_op_tag = netem_tag.find(netem_op)367 if netem_op_tag is not None:368 options_tag = netem_op_tag.find("options")369 options = self._process_options(options_tag)370 if len(options) > 0:371 self._validate_netem(options, netem_op, netem_tag)372 interface[netem_op] = options373 return interface374 def _process_task(self, task_tag):375 task = XmlData(task_tag)376 if self._has_attribute(task_tag, "quit_on_fail"):377 task["quit_on_fail"] = self._get_attribute(task_tag, "quit_on_fail")378 if self._has_attribute(task_tag, "module_dir"):379 base_dir = os.path.dirname(task_tag.attrib["__file"])380 dir_path = str(self._get_attribute(task_tag, "module_dir"))381 exp_path = os.path.expanduser(dir_path)382 abs_path = os.path.join(base_dir, exp_path)383 norm_path = os.path.normpath(abs_path)384 task["module_dir"] = norm_path385 if self._has_attribute(task_tag, "tools_dir"):386 base_dir = os.path.dirname(task_tag.attrib["__file"])387 dir_path = str(self._get_attribute(task_tag, "tools_dir"))388 exp_path = os.path.expanduser(dir_path)389 abs_path = os.path.join(base_dir, exp_path)390 norm_path = os.path.normpath(abs_path)391 task["tools_dir"] = norm_path392 if self._has_attribute(task_tag, "python"):393 task["python"] = self._get_attribute(task_tag, "python")394 return task395 if len(task_tag) > 0:396 task["commands"] = XmlCollection(task_tag)397 for cmd_tag in task_tag:398 if cmd_tag.tag == "run":399 cmd = self._process_run_cmd(cmd_tag)400 elif cmd_tag.tag == "config":401 cmd = self._process_config_cmd(cmd_tag)402 elif cmd_tag.tag == "ctl_wait":403 cmd = self._process_ctl_wait_cmd(cmd_tag)404 elif cmd_tag.tag in ["wait", "intr", "kill"]:405 cmd = 
self._process_signal_cmd(cmd_tag)406 else:407 msg = "Unknown command '%s'." % cmd_tag.tag408 raise RecipeError(msg, cmd_tag)409 task["commands"].append(cmd)410 return task411 def _process_run_cmd(self, cmd_tag):412 cmd = XmlData(cmd_tag)413 cmd["host"] = self._get_attribute(cmd_tag, "host")414 cmd["netns"] = None415 if self._has_attribute(cmd_tag, "netns"):416 cmd["netns"] = self._get_attribute(cmd_tag, "netns")417 has_module = self._has_attribute(cmd_tag, "module")418 has_command = self._has_attribute(cmd_tag, "command")419 has_from = self._has_attribute(cmd_tag, "from")420 if (has_module and has_command) or (has_module and has_from):421 msg = "Invalid combination of attributes."422 raise RecipeError(msg, cmd)423 if has_module:424 cmd["type"] = "test"425 cmd["module"] = self._get_attribute(cmd_tag, "module")426 # options427 opts_tag = cmd_tag.find("options")428 opts = self._process_options(opts_tag)429 if len(opts) > 0:430 cmd["options"] = opts431 elif has_command:432 cmd["type"] = "exec"433 cmd["command"] = self._get_attribute(cmd_tag, "command")434 if self._has_attribute(cmd_tag, "from"):435 cmd["from"] = self._get_attribute(cmd_tag, "from")436 if self._has_attribute(cmd_tag, "bg_id"):437 cmd["bg_id"] = self._get_attribute(cmd_tag, "bg_id")438 if self._has_attribute(cmd_tag, "timeout"):439 cmd["timeout"] = self._get_attribute(cmd_tag, "timeout")440 if self._has_attribute(cmd_tag, "expect"):441 cmd["expect"] = self._get_attribute(cmd_tag, "expect")442 return cmd443 def _process_config_cmd(self, cmd_tag):444 cmd = XmlData(cmd_tag)445 cmd["type"] = "config"446 cmd["host"] = self._get_attribute(cmd_tag, "host")447 cmd["netns"] = None448 if self._has_attribute(cmd_tag, "netns"):449 cmd["netns"] = self._get_attribute(cmd_tag, "netns")450 if self._has_attribute(cmd_tag, "persistent"):451 cmd["persistent"] = self._get_attribute(cmd_tag, "persistent")452 # inline option453 if self._has_attribute(cmd_tag, "option"):454 cmd["options"] = XmlCollection(cmd_tag)455 if 
self._has_attribute(cmd_tag, "value"):456 opt = XmlData(cmd_tag)457 opt["name"] = self._get_attribute(cmd_tag, "option")458 opt["value"] = self._get_attribute(cmd_tag, "value")459 cmd["options"] = XmlCollection(cmd_tag)460 cmd["options"].append(opt)461 else:462 raise RecipeError("Missing option value.", cmd)463 else:464 # options465 opts_tag = cmd_tag.find("options")466 opts = self._process_options(opts_tag)467 if len(opts) > 0:468 cmd["options"] = opts469 return cmd470 def _process_ctl_wait_cmd(self, cmd_tag):471 cmd = XmlData(cmd_tag)472 cmd["type"] = "ctl_wait"473 cmd["seconds"] = self._get_attribute(cmd_tag, "seconds")474 return cmd475 def _process_signal_cmd(self, cmd_tag):476 cmd = XmlData(cmd_tag)477 cmd["type"] = cmd_tag.tag478 cmd["host"] = self._get_attribute(cmd_tag, "host")479 cmd["bg_id"] = self._get_attribute(cmd_tag, "bg_id")480 cmd["netns"] = None...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

...58 if user_input == '':59 return argument.default_value60 elif re.fullmatch(argument.pattern, user_input):61 return argument.cast(user_input)62def _process_options(options: List[Option]) -> None:63 print()64 for option in options:65 print(f'{option.key}) {option.description}')66 user_input = input('> ')67 dictionary = {option.key: option for option in options}68 chosen_option = dictionary.get(user_input)69 if isinstance(chosen_option, ChoiceOption):70 _process_options(chosen_option.suboptions)71 elif isinstance(chosen_option, ActionOption):72 if chosen_option.arguments:73 arguments = []74 for argument in chosen_option.arguments:75 arguments.append(_request_argument(argument))76 chosen_option.action(*arguments)77 else:78 chosen_option.action()79 else:80 _process_options(options)81@constants.add_parameters82def default() -> None:83 print_service.print_subheading('Please choose an option...')84 _process_options(all_options)85@argh.named('backup')86@constants.add_parameters87def backup() -> None:88 backup_service.start_backup()89def main() -> None:90 print_service.print_heading('sensor-am2302-history-client')91 parser = ArgumentParser()92 argh.add_commands(parser, [backup])93 argh.set_default_command(parser, default)94 argh.dispatch(parser)95if __name__ == '__main__':...

Full Screen

Full Screen

list.py

Source:list.py Github

copy

Full Screen

...
                return result.index(options[index])
            except ValueError:
                pass
        raise ValueError

    def _process_options(options_list):
        # Merge each option sequence in `options_list` into `result`.
        # `result` and `_scan` come from the enclosing scope, which is only
        # partly visible in this excerpt.
        # NOTE(review): `_scan` appears to return the position in `result`
        # of a neighbouring option and to raise ValueError when none is
        # found - confirm against the full helper.
        deferred = []
        connected = False
        for options in options_list:
            for i, option in enumerate(options):
                # option already processed
                if option in result:
                    continue
                try:
                    # scan upwards and insert after
                    index = _scan(options, i, -1)
                    result.insert(index + 1, option)
                    connected = True
                except ValueError:
                    try:
                        # scan downwards and insert before
                        index = _scan(options, i, 1)
                        result.insert(index, option)
                        connected = True
                    except ValueError:
                        # not able to connect
                        if result:
                            # retry this whole sequence in a later pass
                            deferred.append(options)
                        else:
                            # nothing placed yet: this sequence seeds result
                            result.extend(options)
                        break
        if deferred:
            if connected:
                # process deferred options
                # (only recurse when this pass placed something, so each
                # recursive pass starts from a changed `result`)
                _process_options(deferred)
            else:
                # done, simply add remaining items to bottom
                for options in deferred:
                    for option in options:
                        if option not in result:
                            result.append(option)

    # collect result
    _process_options(options_list)
...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful