Best Python code snippet using autotest_python
RecipeParser.py
Source:RecipeParser.py  
...120                    slave = XmlData(slave_tag)121                    slave["id"] = self._get_attribute(slave_tag, "id")122                    # slave options123                    opts_tag = slave_tag.find("options")124                    opts = self._process_options(opts_tag)125                    if len(opts) > 0:126                        slave["options"] = opts127                    iface["slaves"].append(slave)128            # interface options129            opts_tag = iface_tag.find("options")130            opts = self._process_options(opts_tag)131            if len(opts) > 0:132                iface["options"] = opts133        elif iface["type"] in ["vti", "vti6"]:134            # interface options135            opts_tag = iface_tag.find("options")136            opts = self._process_options(opts_tag)137            iface["options"] = opts138        elif iface["type"] in ["vlan"]:139            # real_dev of the VLAN interface140            slaves_tag = iface_tag.find("slaves")141            if slaves_tag is None or len(slaves_tag) != 1:142                msg = "VLAN '%s' need exactly one slave definition."\143                        % iface["id"]144                raise RecipeError(msg, iface_tag)145            iface["slaves"] = XmlCollection(slaves_tag)146            slave_tag = slaves_tag[0]147            slave = XmlData(slave_tag)148            slave["id"] = self._get_attribute(slave_tag, "id")149            iface["slaves"].append(slave)150            # interface options151            opts_tag = iface_tag.find("options")152            opts = self._process_options(opts_tag)153            if len(opts) > 0:154                iface["options"] = opts155        elif iface["type"] in ["vxlan", "gre", "ipip"]:156            # real_dev of the interface157            slaves_tag = iface_tag.find("slaves")158            if slaves_tag is not  None and len(slaves_tag) > 1:159                msg = "%s '%s' needs one or no slave definition."\160                      
  % (iface["type"], iface["id"])161                raise RecipeError(msg, iface_tag)162            if slaves_tag:163                iface["slaves"] = XmlCollection(slaves_tag)164                slave_tag = slaves_tag[0]165                slave = XmlData(slave_tag)166                slave["id"] = self._get_attribute(slave_tag, "id")167                iface["slaves"].append(slave)168            # interface options169            opts_tag = iface_tag.find("options")170            opts = self._process_options(opts_tag)171            if len(opts) > 0:172                iface["options"] = opts173        elif iface["type"] == "ovs_bridge":174            slaves_tag = iface_tag.find("slaves")175            iface["slaves"] = XmlCollection(slaves_tag)176            ovsb_slaves = []177            iface["ovs_conf"] = XmlData(slaves_tag)178            if slaves_tag is not None:179                for slave_tag in slaves_tag:180                    slave = XmlData(slave_tag)181                    slave["id"] = str(self._get_attribute(slave_tag, "id"))182                    ovsb_slaves.append(slave["id"])183                    opts_tag = slave_tag.find("options")184                    opts = self._process_options(opts_tag)185                    slave["options"] = opts186                    iface["slaves"].append(slave)187            vlan_elems = iface_tag.findall("vlan")188            vlans = iface["ovs_conf"]["vlans"] = XmlData(slaves_tag)189            for vlan in vlan_elems:190                vlan_tag = str(self._get_attribute(vlan, "tag"))191                if vlan_tag in vlans:192                    msg = "VLAN '%s' already defined for "\193                          "this ovs_bridge." 
% vlan_tag194                    raise RecipeError(msg, vlan)195                vlans[vlan_tag] = XmlData(vlan)196                vlans[vlan_tag]["slaves"] = XmlCollection(vlan)197                vlan_slaves = vlans[vlan_tag]["slaves"]198                slaves_tag = vlan.find("slaves")199                for slave_tag in slaves_tag:200                    slave_id = str(self._get_attribute(slave_tag, "id"))201                    if slave_id not in ovsb_slaves:202                        msg = "No port with id '%s' defined for "\203                              "this ovs_bridge." % slave_id204                        raise RecipeError(msg, slave_tag)205                    if slave_id in vlan_slaves:206                        msg = "Port '%s' already a member of vlan %s"\207                              % (slave_id, vlan_tag)208                        raise RecipeError(msg, slave_tag)209                    else:210                        vlan_slaves.append(slave_id)211            bonded_slaves = {}212            bond_elems = iface_tag.findall("bond")213            bonds = iface["ovs_conf"]["bonds"] = XmlData(slaves_tag)214            for bond_tag in bond_elems:215                bond_id = str(self._get_attribute(bond_tag, "id"))216                if bond_id in bonds:217                    msg = "Bond with id '%s' already defined for "\218                          "this ovs_bridge." % bond_id219                    raise RecipeError(msg, bond_tag)220                bonds[bond_id] = XmlData(bond_tag)221                bond_slaves = bonds[bond_id]["slaves"] = XmlCollection(bond_tag)222                slaves_tag = bond_tag.find("slaves")223                for slave_tag in slaves_tag:224                    slave_id = str(self._get_attribute(slave_tag, "id"))225                    if slave_id not in ovsb_slaves:226                        msg = "No port with id '%s' defined for "\227                              "this ovs_bridge." 
% slave_id228                        raise RecipeError(msg, slave_tag)229                    if slave_id in bonded_slaves:230                        msg = "Port with id '%s' already in bond with id '%s'"\231                              % (slave_id, bonded_slaves[slave_id])232                        raise RecipeError(msg, slave_tag)233                    else:234                        bonded_slaves[slave_id] = bond_id235                    bond_slaves.append(slave_id)236                opts_tag = bond_tag.find("options")237                opts = self._process_options(opts_tag)238                if len(opts) > 0:239                    bonds[bond_id]["options"] = opts240            unique_ids = []241            tunnels = iface["ovs_conf"]["tunnels"] = XmlCollection(slaves_tag)242            tunnel_elems = iface_tag.findall("tunnel")243            for tunnel_elem in tunnel_elems:244                tunnels.append(XmlData(tunnel_elem))245                tunnel = tunnels[-1]246                tunnel["id"] = str(self._get_attribute(tunnel_elem, "id"))247                if tunnel["id"] in unique_ids:248                    msg = "Tunnel with id '%s' already defined for "\249                          "this ovs_bridge." 
% tunnel["id"]250                    raise RecipeError(msg, tunnel_elem)251                else:252                    unique_ids.append(tunnel["id"])253                t = str(self._get_attribute(tunnel_elem, "type"))254                tunnel["type"] = t255                opts_elem = tunnel_elem.find("options")256                opts = self._process_options(opts_elem)257                if len(opts) > 0:258                    tunnel["options"] = opts259                # addresses260                addresses_tag = tunnel_elem.find("addresses")261                addrs = self._process_addresses(addresses_tag)262                tunnel["addresses"] = addrs263            iface["ovs_conf"]["internals"] = XmlCollection(slaves_tag)264            internals = iface["ovs_conf"]["internals"]265            internal_elems = iface_tag.findall("internal")266            for internal_elem in internal_elems:267                internals.append(XmlData(internal_elem))268                internal = internals[-1]269                internal["id"] = str(self._get_attribute(internal_elem, "id"))270                if internal["id"] in unique_ids:271                    msg = "Internal id '%s' already defined for "\272                          "this ovs_bridge." 
% internal["id"]273                    raise RecipeError(msg, internal_elem)274                else:275                    unique_ids.append(internal["id"])276                opts_elem = internal_elem.find("options")277                opts = self._process_options(opts_elem)278                if len(opts) > 0:279                    internal["options"] = opts280                # addresses281                addresses_tag = internal_elem.find("addresses")282                addrs = self._process_addresses(addresses_tag)283                internal["addresses"] = addrs284            iface["ovs_conf"]["flow_entries"] = XmlCollection(slaves_tag)285            flow_entries = iface["ovs_conf"]["flow_entries"]286            flow_elems = iface_tag.findall("flow_entries")287            if len(flow_elems) == 1:288                entries = flow_elems[0].findall("entry")289                for entry in entries:290                    if self._has_attribute(entry, "value"):291                        flow_entries.append(self._get_attribute(entry,292                                                                "value"))293                    else:294                        flow_entries.append(self._get_content(entry))295        return [iface]296    def _process_addresses(self, addresses_tag):297        addresses = XmlCollection(addresses_tag)298        if addresses_tag is not None and len(addresses_tag) > 0:299            for addr_tag in addresses_tag:300                if self._has_attribute(addr_tag, "value"):301                    addr = self._get_attribute(addr_tag, "value")302                else:303                    addr = self._get_content(addr_tag)304                addresses.append(addr)305        return addresses306    def _process_options(self, opts_tag):307        options = XmlCollection(opts_tag)308        if opts_tag is not None:309            for opt_tag in opts_tag:310                opt = XmlData(opt_tag)311                opt["name"] = self._get_attribute(opt_tag, 
"name")312                if self._has_attribute(opt_tag, "value"):313                    opt["value"] = self._get_attribute(opt_tag, "value")314                else:315                    opt["value"] = self._get_content(opt_tag)316                options.append(opt)317        return options318    def _validate_netem(self, options, netem_op, netem_tag):319        if netem_op == "delay":320            valid = False321            jitter = False322            correlation = False323            distribution = False324            valid_distributions = ["normal", "uniform", "pareto", "paretonormal"]325            for opt in options:326                if "time" in opt.values():327                    valid = True328                elif "distribution" in opt.values():329                    if opt["value"] not in valid_distributions:330                        raise RecipeError("netem: invalid distribution type", netem_tag)331                    else:332                        distribution = True333                elif "jitter" in opt.values():334                    jitter =  True335                elif "correlation" in opt.values():336                    correlation = True337            if not jitter:338                if correlation or distribution:339                    raise RecipeError("netem: jitter option is mandatory when using <correlation> or <distribution>", netem_tag)340            if not valid:341                raise RecipeError("netem: time option is mandatory for <delay>", netem_tag)342        elif netem_op == "loss":343            for opt in options:344                if "percent" in opt.values():345                    return346            raise RecipeError("netem: percent option is mandatory for <loss>", netem_tag)347        elif netem_op == "duplication":348            for opt in options:349                if "percent" in opt.values():350                    return351            raise RecipeError("netem: percent option is mandatory for <duplication>", 
netem_tag)352        elif netem_op == "corrupt":353            for opt in options:354                if "percent" in opt.values():355                    return356            raise RecipeError("netem: percent option is mandatory for <corrupt>", netem_tag)357        elif netem_op == "reordering":358            for opt in options:359                if "percent" in opt.values():360                    return361            raise RecipeError("netem: percent option is mandatory for <reordering>", netem_tag)362    def _process_netem(self, netem_tag):363        interface = XmlData(netem_tag)364        # params365        for netem_op in ["delay", "loss", "duplication", "corrupt", "reordering"]:366            netem_op_tag = netem_tag.find(netem_op)367            if netem_op_tag is not None:368                options_tag = netem_op_tag.find("options")369                options = self._process_options(options_tag)370                if len(options) > 0:371                    self._validate_netem(options, netem_op, netem_tag)372                    interface[netem_op] = options373        return interface374    def _process_task(self, task_tag):375        task = XmlData(task_tag)376        if self._has_attribute(task_tag, "quit_on_fail"):377            task["quit_on_fail"] = self._get_attribute(task_tag, "quit_on_fail")378        if self._has_attribute(task_tag, "module_dir"):379            base_dir = os.path.dirname(task_tag.attrib["__file"])380            dir_path = str(self._get_attribute(task_tag, "module_dir"))381            exp_path = os.path.expanduser(dir_path)382            abs_path = os.path.join(base_dir, exp_path)383            norm_path = os.path.normpath(abs_path)384            task["module_dir"] = norm_path385        if self._has_attribute(task_tag, "tools_dir"):386            base_dir = os.path.dirname(task_tag.attrib["__file"])387            dir_path = str(self._get_attribute(task_tag, "tools_dir"))388            exp_path = os.path.expanduser(dir_path)389            
abs_path = os.path.join(base_dir, exp_path)390            norm_path = os.path.normpath(abs_path)391            task["tools_dir"] = norm_path392        if self._has_attribute(task_tag, "python"):393            task["python"] = self._get_attribute(task_tag, "python")394            return task395        if len(task_tag) > 0:396            task["commands"] = XmlCollection(task_tag)397            for cmd_tag in task_tag:398                if cmd_tag.tag == "run":399                    cmd = self._process_run_cmd(cmd_tag)400                elif cmd_tag.tag == "config":401                    cmd = self._process_config_cmd(cmd_tag)402                elif cmd_tag.tag == "ctl_wait":403                    cmd = self._process_ctl_wait_cmd(cmd_tag)404                elif cmd_tag.tag in ["wait", "intr", "kill"]:405                    cmd = self._process_signal_cmd(cmd_tag)406                else:407                    msg = "Unknown command '%s'." % cmd_tag.tag408                    raise RecipeError(msg, cmd_tag)409                task["commands"].append(cmd)410        return task411    def _process_run_cmd(self, cmd_tag):412        cmd = XmlData(cmd_tag)413        cmd["host"] = self._get_attribute(cmd_tag, "host")414        cmd["netns"] = None415        if self._has_attribute(cmd_tag, "netns"):416            cmd["netns"] = self._get_attribute(cmd_tag, "netns")417        has_module = self._has_attribute(cmd_tag, "module")418        has_command = self._has_attribute(cmd_tag, "command")419        has_from = self._has_attribute(cmd_tag, "from")420        if (has_module and has_command) or (has_module and has_from):421            msg = "Invalid combination of attributes."422            raise RecipeError(msg, cmd)423        if has_module:424            cmd["type"] = "test"425            cmd["module"] = self._get_attribute(cmd_tag, "module")426            # options427            opts_tag = cmd_tag.find("options")428            opts = self._process_options(opts_tag)429            if 
len(opts) > 0:430                cmd["options"] = opts431        elif has_command:432            cmd["type"] = "exec"433            cmd["command"] = self._get_attribute(cmd_tag, "command")434            if self._has_attribute(cmd_tag, "from"):435                cmd["from"] = self._get_attribute(cmd_tag, "from")436        if self._has_attribute(cmd_tag, "bg_id"):437            cmd["bg_id"] = self._get_attribute(cmd_tag, "bg_id")438        if self._has_attribute(cmd_tag, "timeout"):439            cmd["timeout"] = self._get_attribute(cmd_tag, "timeout")440        if self._has_attribute(cmd_tag, "expect"):441            cmd["expect"] = self._get_attribute(cmd_tag, "expect")442        return cmd443    def _process_config_cmd(self, cmd_tag):444        cmd = XmlData(cmd_tag)445        cmd["type"] = "config"446        cmd["host"] = self._get_attribute(cmd_tag, "host")447        cmd["netns"] = None448        if self._has_attribute(cmd_tag, "netns"):449            cmd["netns"] = self._get_attribute(cmd_tag, "netns")450        if self._has_attribute(cmd_tag, "persistent"):451            cmd["persistent"] = self._get_attribute(cmd_tag, "persistent")452        # inline option453        if self._has_attribute(cmd_tag, "option"):454            cmd["options"] = XmlCollection(cmd_tag)455            if self._has_attribute(cmd_tag, "value"):456                opt = XmlData(cmd_tag)457                opt["name"] = self._get_attribute(cmd_tag, "option")458                opt["value"] = self._get_attribute(cmd_tag, "value")459                cmd["options"] = XmlCollection(cmd_tag)460                cmd["options"].append(opt)461            else:462                raise RecipeError("Missing option value.", cmd)463        else:464            # options465            opts_tag = cmd_tag.find("options")466            opts = self._process_options(opts_tag)467            if len(opts) > 0:468                cmd["options"] = opts469        return cmd470    def _process_ctl_wait_cmd(self, 
cmd_tag):471        cmd = XmlData(cmd_tag)472        cmd["type"] = "ctl_wait"473        cmd["seconds"] = self._get_attribute(cmd_tag, "seconds")474        return cmd475    def _process_signal_cmd(self, cmd_tag):476        cmd = XmlData(cmd_tag)477        cmd["type"] = cmd_tag.tag478        cmd["host"] = self._get_attribute(cmd_tag, "host")479        cmd["bg_id"] = self._get_attribute(cmd_tag, "bg_id")480        cmd["netns"] = None...app.py
Source:app.py  
...58        if user_input == '':59            return argument.default_value60        elif re.fullmatch(argument.pattern, user_input):61            return argument.cast(user_input)62def _process_options(options: List[Option]) -> None:63    print()64    for option in options:65        print(f'{option.key}) {option.description}')66    user_input = input('> ')67    dictionary = {option.key: option for option in options}68    chosen_option = dictionary.get(user_input)69    if isinstance(chosen_option, ChoiceOption):70        _process_options(chosen_option.suboptions)71    elif isinstance(chosen_option, ActionOption):72        if chosen_option.arguments:73            arguments = []74            for argument in chosen_option.arguments:75                arguments.append(_request_argument(argument))76            chosen_option.action(*arguments)77        else:78            chosen_option.action()79    else:80        _process_options(options)81@constants.add_parameters82def default() -> None:83    print_service.print_subheading('Please choose an option...')84    _process_options(all_options)85@argh.named('backup')86@constants.add_parameters87def backup() -> None:88    backup_service.start_backup()89def main() -> None:90    print_service.print_heading('sensor-am2302-history-client')91    parser = ArgumentParser()92    argh.add_commands(parser, [backup])93    argh.set_default_command(parser, default)94    argh.dispatch(parser)95if __name__ == '__main__':...list.py
Source:list.py  
...15                return result.index(options[index])16            except ValueError:17                pass18        raise ValueError19    def _process_options(options_list):20        deferred = []21        connected = False22        for options in options_list:23            for i, option in enumerate(options):24                # option already processed25                if option in result:26                    continue27                try:28                    # scan upwards and insert after29                    index = _scan(options, i, -1)30                    result.insert(index + 1, option)31                    connected = True32                except ValueError:33                    try:34                        # scan downwards and insert before35                        index = _scan(options, i, 1)36                        result.insert(index, option)37                        connected = True38                    except ValueError:39                        # not able to connect40                        if result:41                            deferred.append(options)42                        else:43                            result.extend(options)44                        break45        if deferred:46            if connected:47                # process deferred options48                _process_options(deferred)49            else:50                # done, simply add remaining items to bottom51                for options in deferred:52                    for option in options:53                        if option not in result:54                            result.append(option)55    # collect result56    _process_options(options_list)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
