Best Python code snippet using tempest_python
unifi.py
Source:unifi.py  
1import re2from typing import Dict, NoReturn, Union, cast3from .monitor import Monitor, register4try:5    from paramiko.client import SSHClient, RejectPolicy6    from paramiko.ssh_exception import SSHException7    ssh2_available = True8except ImportError:9    ssh2_available = False10@register11class MonitorUnifiFailover(Monitor):12    monitor_type = "unifi_failover"13    def __init__(self, name: str, config_options: dict) -> None:14        if "gap" not in config_options:15            config_options["gap"] = 300  # 5 mins16        super().__init__(name, config_options)17        self._router_address = cast(18            str, self.get_config_option("router_address", required=True)19        )20        self._username = cast(21            str, self.get_config_option("router_username", required=True)22        )23        self._password = self.get_config_option("router_password", required=False)24        self._ssh_key = self.get_config_option("ssh_key", required=False)25        if self._ssh_key is None and self._password is None:26            raise ValueError("must specify only one of router_password or ssh_key")27        self._check_interface = cast(28            str, self.get_config_option("check_interface", default="eth2")29        )30    def run_test(self) -> Union[NoReturn, bool]:31        if not ssh2_available:32            return self.record_fail("ssh2_python library is not installed")33        try:34            with SSHClient() as client:35                client.set_missing_host_key_policy(RejectPolicy)36                client.load_system_host_keys()37                client.connect(38                    hostname=self._router_address,39                    username=self._username,40                    password=self._password,41                    key_filename=self._ssh_key,42                )43                _, stdout, _ = client.exec_command(  # nosec44                    "sudo /usr/sbin/ubnt-hal wlbGetStatus"45                )46                data = {}  # type: Dict[str, Dict[str, str]]47                data_block = {}  # type: Dict[str, str]48                interface = ""49                for _line in stdout.readlines():50                    line = _line.strip()51                    matches = re.match(r"interface +: (\w+)", line)52                    if matches:53                        if interface != "" and len(data_block) > 0:54                            data[interface] = data_block55                            data_block = {}56                        interface = matches.group(1)57                        data_block = {}58                        continue59                    matches = re.match(r"carrier +: (\w+)", line)60                    if matches:61                        data_block["carrier"] = matches.group(1)62                        continue63                    matches = re.match(r"status +: (\w+)", line)64                    if matches:65                        data_block["status"] = matches.group(1)66                        continue67                    matches = re.match(r"gateway +: (\w+)", line)68                    if matches:69                        data_block["gateway"] = matches.group(1)70                if interface != "" and len(data_block) > 0:71                    data[interface] = data_block72        except SSHException as error:73            self.monitor_logger.exception("Failed to ssh to USG")74            return self.record_fail("Failed to ssh to USG: {}".format(error))75        if self._check_interface not in data:76            self.monitor_logger.debug("processed data was %s", data)77            return self.record_fail(78                "Could not get status for interface {}".format(self._check_interface)79            )80        if data[self._check_interface]["carrier"] != "up":81            return self.record_fail(82                "Interface {} carrier is in status {} (wanted 'up')".format(83                    self._check_interface, data[self._check_interface]["carrier"]84                )85            )86        if data[self._check_interface]["status"] != "failover":87            return self.record_fail(88                "Interface {} is in status {} (wanted 'failover')".format(89                    self._check_interface, data[self._check_interface]["status"]90                )91            )92        if data[self._check_interface]["gateway"] == "unknown":93            return self.record_fail(94                "Interface {} has gateway {}".format(95                    self._check_interface, data[self._check_interface]["gateway"]96                )97            )98        return self.record_success(99            "Interface {} is {} with status {}".format(100                self._check_interface,101                data[self._check_interface]["carrier"],102                data[self._check_interface]["status"],103            )104        )105    def describe(self) -> str:106        return "Checking USG at {} has interface {} up and not failed over".format(107            self._router_address, self._check_interface108        )109@register110class MonitorUnifiFailoverWatchdog(Monitor):111    monitor_type = "unifi_watchdog"112    def __init__(self, name: str, config_options: dict) -> None:113        if "gap" not in config_options:114            config_options["gap"] = 300  # 5 mins115        super().__init__(name, config_options)116        self._router_address = cast(117            str, self.get_config_option("router_address", required=True)118        )119        self._username = cast(120            str, self.get_config_option("router_username", required=True)121        )122        self._password = self.get_config_option("router_password", required=False)123        self._ssh_key = self.get_config_option("ssh_key", required=False)124        if self._ssh_key is None and self._password is None:125            raise ValueError("must specify only one of router_password or ssh_key")126        self._primary_interface = cast(127            str, self.get_config_option("primary_interface", default="pppoe0")128        )129        self._secondary_interface = cast(130            str, self.get_config_option("secondary_interface", default="eth2")131        )132    def run_test(self) -> Union[NoReturn, bool]:133        if not ssh2_available:134            return self.record_fail("ssh2_python library is not installed")135        try:136            with SSHClient() as client:137                client.set_missing_host_key_policy(RejectPolicy)138                client.load_system_host_keys()139                client.connect(140                    hostname=self._router_address,141                    username=self._username,142                    password=self._password,143                    key_filename=self._ssh_key,144                )145                _, stdout, _ = client.exec_command(  # nosec146                    "/usr/sbin/ubnt-hal wlbGetWdStatus"147                )148                data = {}  # type: Dict[str, Dict[str, str]]149                data_block = {}  # type: Dict[str, str]150                interface = ""151                for _line in stdout.readlines():152                    line = _line.strip()153                    matches = re.match(154                        r"([a-z]+[0-9])", line155                    )  # two spaces and an if name156                    if matches:157                        if interface != "" and len(data_block) > 0:158                            data[interface] = data_block159                            data_block = {}160                        interface = matches.group(1)161                        data_block = {}162                        continue163                    matches = re.match(r"status: (\w+)", line)164                    if matches:165                        data_block["status"] = matches.group(1)166                        continue167                    matches = re.match(r"ping gateway: ([^ ]+) - (\w+)", line)168                    if matches:169                        data_block["gateway"] = matches.group(1)170                        data_block["ping_status"] = matches.group(2)171                        continue172                if interface != "" and len(data_block) > 0:173                    data[interface] = data_block174        except SSHException as error:175            self.monitor_logger.exception("Failed to ssh to USG")176            return self.record_fail("Failed to ssh to USG: {}".format(error))177        for interface in [self._primary_interface, self._secondary_interface]:178            if interface not in data:179                self.monitor_logger.debug("processed data was %s", data)180                return self.record_fail(181                    "Could not get status for interface {}".format(interface)182                )183            if data[interface]["status"] != "Running":184                return self.record_fail(185                    "Interface {} in status {} (wanted 'Running')".format(186                        interface, data[interface]["status"]187                    )188                )189            if data[interface]["ping_status"] != "REACHABLE":190                return self.record_fail(191                    "Interface {} ping ({}) is {} (wanted 'REACHABLE')".format(192                        interface,193                        data[interface]["gateway"],194                        data[interface]["ping_status"],195                    )196                )197        return self.record_success(198            "Interfaces {} and {} both running and pinging".format(199                self._primary_interface, self._secondary_interface200            )201        )202    def describe(self) -> str:203        return "Checking USG at {} has interface {} and {} running and pinging".format(204            self._router_address, self._primary_interface, self._secondary_interface...test_ovsdb_lib.py
Source:test_ovsdb_lib.py  
...45        # Make sure exceptions pass through by calling do_post_commit directly46        mock.patch.object(47            impl_idl.OvsVsctlTransaction, 'post_commit',48            side_effect=impl_idl.OvsVsctlTransaction.do_post_commit).start()49    def _check_interface(self, port, parameter, expected_value):50        def check_value():51            return (self._ovsdb.db_get(52                'Interface', port, parameter).execute() == expected_value)53        self.assertTrue(base.wait_until_true(check_value, timeout=2,54                                             sleep=0.5))55    def _add_port(self, bridge, port, may_exist=True):56        with self._ovsdb.transaction() as txn:57            txn.add(self._ovsdb.add_port(bridge, port, may_exist=may_exist))58            txn.add(self._ovsdb.db_set('Interface', port,59                                       ('type', 'internal')))60        self.assertIn(port, self._list_ports_in_bridge(bridge))61    def _list_ports_in_bridge(self, bridge):62        return self._ovsdb.list_ports(bridge).execute()63    def _check_bridge(self, name):64        return self._ovsdb.br_exists(name).execute()65    def _add_bridge(self, name, may_exist=True, datapath_type=None):66        self._ovsdb.add_br(name, may_exist=may_exist,67                           datapath_type=datapath_type).execute()68        self.assertTrue(self._check_bridge(name))69    def _del_bridge(self, name):70        self._ovsdb.del_br(name).execute()71    def test__set_mtu_request(self):72        port_name = 'port1-' + self.interface73        self._add_bridge(self.brname)74        self.addCleanup(self._del_bridge, self.brname)75        self._add_port(self.brname, port_name)76        if self.ovs._ovs_supports_mtu_requests():77            self.ovs._set_mtu_request(port_name, 1000)78            self._check_interface(port_name, 'mtu', 1000)79            self.ovs._set_mtu_request(port_name, 1500)80            self._check_interface(port_name, 'mtu', 1500)81        else:82            self.skipTest('Current version of Open vSwitch does not support '83                          '"mtu_request" parameter')84    def test_create_ovs_vif_port(self):85        port_name = 'port2-' + self.interface86        iface_id = 'iface_id'87        mac = 'ca:fe:ca:fe:ca:fe'88        instance_id = uuidutils.generate_uuid()89        interface_type = constants.OVS_VHOSTUSER_INTERFACE_TYPE90        vhost_server_path = '/fake/path'91        mtu = 150092        self._add_bridge(self.brname)93        self.addCleanup(self._del_bridge, self.brname)94        self.ovs.create_ovs_vif_port(self.brname, port_name, iface_id, mac,95                                     instance_id, mtu=mtu,96                                     interface_type=interface_type,97                                     vhost_server_path=vhost_server_path)98        expected_external_ids = {'iface-status': 'active',99                                 'iface-id': iface_id,100                                 'attached-mac': mac,101                                 'vm-uuid': instance_id}102        self._check_interface(port_name, 'external_ids', expected_external_ids)103        self._check_interface(port_name, 'type', interface_type)104        expected_vhost_server_path = {'vhost-server-path': vhost_server_path}105        self._check_interface(port_name, 'options', expected_vhost_server_path)106    @mock.patch.object(linux_net, 'delete_net_dev')107    def test_delete_ovs_vif_port(self, *mock):108        port_name = 'port3-' + self.interface109        self._add_bridge(self.brname)110        self.addCleanup(self._del_bridge, self.brname)111        self._add_port(self.brname, port_name)112        self.ovs.delete_ovs_vif_port(self.brname, port_name)113        self.assertNotIn(port_name, self._list_ports_in_bridge(self.brname))114    def test_ensure_ovs_bridge(self):115        bridge_name = 'bridge2-' + self.interface116        self.ovs.ensure_ovs_bridge(bridge_name, constants.OVS_DATAPATH_SYSTEM)117        self.assertTrue(self._check_bridge(bridge_name))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
