Best Python code snippet using lisa_python
platform_.py
Source:platform_.py  
...
                f"from '{environment_context.resource_group_name}'"
            )
        return nics_map

    @retry(exceptions=LisaException, tries=150, delay=2)
    def load_public_ips_from_resource_group(
        self, resource_group_name: str, log: Logger
    ) -> Dict[str, str]:
        network_client = get_network_client(self)
        log.debug(f"listing public ips in resource group '{resource_group_name}'")
        # get public IP
        public_ip_addresses = network_client.public_ip_addresses.list(
            resource_group_name
        )
        public_ips_map: Dict[str, str] = {}
        for ip_address in public_ip_addresses:
            # nic name is like node-0-nic-2, get vm name part for later pick
            # only find primary nic, which is ended by -nic-0
            node_name_from_public_ip = RESOURCE_ID_PUBLIC_IP_PATTERN.findall(
                ip_address.name
            )
            assert (
                ip_address
            ), f"public IP address cannot be empty, ip_address object: {ip_address}"
            if node_name_from_public_ip:
                name = node_name_from_public_ip[0]
                public_ips_map[name] = ip_address.ip_address
                log.debug(
                    f"  found public IP '{ip_address.name}', and saved for next step."
                )
            else:
                log.debug(
                    f"  found public IP '{ip_address.name}', but dropped "
                    "because it's not primary nic."
                )
        if not public_ips_map:
            raise LisaException(
                f"deployment succeeded, but public ips not found in 5 minutes "
                f"from '{resource_group_name}'"
            )
        return public_ips_map

    def initialize_environment(self, environment: Environment, log: Logger) -> None:
        node_context_map: Dict[str, Node] = {}
        for node in environment.nodes.list():
            node_context = get_node_context(node)
            node_context_map[node_context.vm_name] = node
        vms_map: Dict[str, VirtualMachine] = self._load_vms(environment, log)
        nics_map: Dict[str, NetworkInterface] = self._load_nics(environment, log)
        environment_context = get_environment_context(environment=environment)
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            environment_context.resource_group_name, log
        )
        for vm_name, node in node_context_map.items():
            node_context = get_node_context(node)
            vm = vms_map.get(vm_name, None)
            if not vm:
                raise LisaException(
                    f"cannot find vm: '{vm_name}', make sure deployment is correct."
                )
            nic = nics_map[vm_name]
            public_ip = public_ips_map[vm_name]
            address = nic.ip_configurations[0].private_ip_address
            if not node.name:
                node.name = vm_name
            assert isinstance(node, RemoteNode)
            node.set_connection_info(
                address=address,
                port=22,
                public_address=public_ip,
                public_port=22,
                username=node_context.username,
                password=node_context.password,
                private_key_file=node_context.private_key_file,
            )
        # enable ssh for windows, if it's not Windows, or SSH reachable, it will
        # skip.
        run_in_parallel(
            [
                partial(self._enable_ssh_on_windows, node=x)
                for x in environment.nodes.list()
            ]
        )

    def _resource_sku_to_capability(  # noqa: C901
        self, location: str, resource_sku: ResourceSku
    ) -> schema.NodeSpace:
        # fill in default values, in case no capability meet.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=0,
            memory_mb=0,
            gpu_count=0,
        )
        node_space.name = f"{location}_{resource_sku.name}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        node_space.disk = features.AzureDiskOptionSettings()
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.data_disk_iops = search_space.IntRange(min=0)
        node_space.disk.data_disk_size = search_space.IntRange(min=0)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])
        # fill supported features
        azure_raw_capabilities: Dict[str, str] = {}
        for sku_capability in resource_sku.capabilities:
            # prevent to loop in every feature
            azure_raw_capabilities[sku_capability.name] = sku_capability.value
        # calculate cpu count. Some vm sizes, like Standard_HC44rs, doesn't have
        # vCPUsAvailable, so use vCPUs.
        vcpus_available = int(azure_raw_capabilities.get("vCPUsAvailable", "0"))
        if vcpus_available:
            node_space.core_count = vcpus_available
        else:
            node_space.core_count = int(azure_raw_capabilities.get("vCPUs", "0"))
        memory_value = azure_raw_capabilities.get("MemoryGB", None)
        if memory_value:
            node_space.memory_mb = int(float(memory_value) * 1024)
        max_disk_count = azure_raw_capabilities.get("MaxDataDiskCount", None)
        if max_disk_count:
            node_space.disk.max_data_disk_count = int(max_disk_count)
            node_space.disk.data_disk_count = search_space.IntRange(
                max=node_space.disk.max_data_disk_count
            )
        max_nic_count = azure_raw_capabilities.get("MaxNetworkInterfaces", None)
        if max_nic_count:
            # set a min value for nic_count work around for an azure python sdk bug
            # nic_count is 0 when get capability for some sizes e.g. Standard_D8a_v3
            sku_nic_count = int(max_nic_count)
            if sku_nic_count == 0:
                sku_nic_count = 1
            node_space.network_interface.nic_count = search_space.IntRange(
                min=1, max=sku_nic_count
            )
            node_space.network_interface.max_nic_count = sku_nic_count
        premium_io_supported = azure_raw_capabilities.get("PremiumIO", None)
        if premium_io_supported and eval(premium_io_supported) is True:
            node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        ephemeral_supported = azure_raw_capabilities.get(
            "EphemeralOSDiskSupported", None
        )
        if ephemeral_supported and eval(ephemeral_supported) is True:
            # Check if CachedDiskBytes is greater than 30GB
            # We use diffdisk as cache disk for ephemeral OS disk
            cached_disk_bytes = azure_raw_capabilities.get("CachedDiskBytes", 0)
            cached_disk_bytes_gb = int(cached_disk_bytes) / 1024 / 1024 / 1024
            if cached_disk_bytes_gb >= 30:
                node_space.disk.disk_type.add(schema.DiskType.Ephemeral)
        # set AN
        an_enabled = azure_raw_capabilities.get("AcceleratedNetworkingEnabled", None)
        if an_enabled and eval(an_enabled) is True:
            # refer
            # https://docs.microsoft.com/en-us/azure/virtual-machines/dcv2-series#configuration
            # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv2-series
            # https://docs.microsoft.com/en-us/azure/virtual-machines/ncv3-series
            # https://docs.microsoft.com/en-us/azure/virtual-machines/nd-series
            # below VM size families don't support `Accelerated Networking` but
            # API return `True`, fix this issue temporarily will revert it till
            # bug fixed.
            if resource_sku.family not in [
                "standardDCSv2Family",
                "standardNCSv2Family",
                "standardNCSv3Family",
                "standardNDSFamily",
            ]:
                # update data path types if sriov feature is supported
                node_space.network_interface.data_path.add(schema.NetworkDataPath.Sriov)
        # for some new sizes, there is no MaxNetworkInterfaces capability
        # and we have to set a default value for max_nic_count
        if not node_space.network_interface.max_nic_count:
            node_space.network_interface.max_nic_count = 1
        # some vm size do not have resource disk present
        # https://docs.microsoft.com/en-us/azure/virtual-machines/azure-vms-no-temp-disk
        if resource_sku.family in [
            "standardDv4Family",
            "standardDSv4Family",
            "standardEv4Family",
            "standardESv4Family",
            "standardEASv4Family",
            "standardEASv5Family",
            "standardESv5Family",
            "standardEADSv5Family",
            "standardDASv5Family",
            "standardDSv5Family",
            "standardFSv2Family",
            "standardNCFamily",
            "standardESv3Family",
            "standardDPSv5Family",
            "standardEBSv5Family",
            "standardEv5Family",
        ]:
            node_space.disk.has_resource_disk = False
        else:
            node_space.disk.has_resource_disk = True
        for supported_feature in self.supported_features():
            if supported_feature.name() in [
                features.Disk.name(),
                features.NetworkInterface.name(),
            ]:
                # Skip the disk and network interfaces features. They will be
                # handled by node_space directly.
                continue
            feature_setting = supported_feature.create_setting(
                raw_capabilities=azure_raw_capabilities,
                resource_sku=resource_sku,
                node_space=node_space,
            )
            if feature_setting:
                node_space.features.add(feature_setting)
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        return node_space

    def get_sorted_vm_sizes(
        self, capabilities: List[AzureCapability], log: Logger
    ) -> List[AzureCapability]:
        # sort vm size by predefined pattern
        sorted_capabilities: List[AzureCapability] = []
        found_vm_sizes: Set[str] = set()
        # loop all fall back levels
        for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:
            level_capabilities: List[AzureCapability] = []
            # loop all capabilities
            for capability in capabilities:
                vm_size = capability.vm_size
                if fallback_pattern.match(vm_size) and vm_size not in found_vm_sizes:
                    level_capabilities.append(capability)
                    found_vm_sizes.add(vm_size)
            # sort by rough cost
            level_capabilities.sort(key=lambda x: (x.capability.cost))
            sorted_capabilities.extend(level_capabilities)
        return sorted_capabilities

    def load_public_ip(self, node: Node, log: Logger) -> str:
        node_context = get_node_context(node)
        vm_name = node_context.vm_name
        resource_group_name = node_context.resource_group_name
        public_ips_map: Dict[str, str] = self.load_public_ips_from_resource_group(
            resource_group_name=resource_group_name, log=self._log
        )
        return public_ips_map[vm_name]

    @lru_cache(maxsize=10)  # noqa: B019
    def _resolve_marketplace_image(
        self, location: str, marketplace: AzureVmMarketplaceSchema
    ) -> AzureVmMarketplaceSchema:
        new_marketplace = copy.copy(marketplace)
        # latest doesn't work, it needs a specified version.
        if marketplace.version.lower() == "latest":
            compute_client = get_compute_client(self)
            with global_credential_access_lock:
                versioned_images = compute_client.virtual_machine_images.list(
                    location=location,
...
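Note on the snippet above: RESOURCE_ID_PUBLIC_IP_PATTERN is defined elsewhere in LISA, and per the in-line comments it extracts the VM-name part of the resource name so that only the primary NIC (whose name ends in -nic-0) contributes an entry to the map; the retry decorator's tries=150 and delay=2 also add up to roughly 300 seconds of polling, which is where the "not found in 5 minutes" wording in the exception comes from. Below is a minimal, self-contained sketch of that name-filtering idea; the pattern PRIMARY_NIC_NAME_PATTERN and the helper build_public_ips_map are hypothetical stand-ins, not LISA's actual definitions.

import re
from typing import Dict, List, Tuple

# Hypothetical stand-in for RESOURCE_ID_PUBLIC_IP_PATTERN: capture the VM name
# (e.g. "node-0") from a primary-NIC style name ending in "-nic-0". LISA's real
# pattern may differ.
PRIMARY_NIC_NAME_PATTERN = re.compile(r"^(.+)-nic-0$")

def build_public_ips_map(named_ips: List[Tuple[str, str]]) -> Dict[str, str]:
    # Map VM name -> public IP, keeping only primary-NIC entries.
    public_ips_map: Dict[str, str] = {}
    for resource_name, ip in named_ips:
        matched = PRIMARY_NIC_NAME_PATTERN.findall(resource_name)
        if matched:
            public_ips_map[matched[0]] = ip
    return public_ips_map

# "node-0-nic-1" is dropped because it is not a primary NIC.
print(build_public_ips_map([
    ("node-0-nic-0", "20.1.2.3"),
    ("node-0-nic-1", "20.1.2.4"),
    ("node-1-nic-0", "20.1.2.5"),
]))  # {'node-0': '20.1.2.3', 'node-1': '20.1.2.5'}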
common.py
Source:common.py
...
    environments = load_environments(
        schema.EnvironmentRoot(environments=[environment_runbook])
    )
    environment = next(x for x in environments.values())
    public_ips = platform.load_public_ips_from_resource_group(resource_group_name, log)
    platform_runbook: schema.Platform = platform.runbook
    for node in environment.nodes.list():
        assert isinstance(node, RemoteNode)
        node_context = get_node_context(node)
        node_context.vm_name = node.name
        node_context.resource_group_name = resource_group_name
        node_context.username = platform_runbook.admin_username
        node_context.password = platform_runbook.admin_password
        node_context.private_key_file = platform_runbook.admin_private_key_file
        node.set_connection_info(
            public_address=public_ips[node.name],
            username=node_context.username,
            password=node_context.password,
            private_key_file=node_context.private_key_file,
...
transformers.py
Source:transformers.py
...
    def _get_public_ip_address(
        self, platform: AzurePlatform, virtual_machine: Any
    ) -> str:
        runbook: VhdTransformerSchema = self.runbook
        public_ips = platform.load_public_ips_from_resource_group(
            runbook.resource_group_name, self._log
        )
        public_ip_address: str = public_ips[runbook.vm_name]
        assert (
            public_ip_address
        ), "cannot find public IP address, make sure the VM is in running status."
        return public_ip_address


class DeployTransformer(Transformer):
    """
    deploy a node in transformer phase for further operations
    """

    __resource_group_name = "resource_group_name"

    @classmethod
    def type_name(cls) -> str:
...
