Best Python code snippet using lisa_python
Source: platform_.py
...
            # parsed from latest to a specified version.
            node.capability.set_extended_runbook(aws_node_runbook)
            nodes_parameters.append(aws_node_runbook)

            # set data disk array
            aws_parameters.data_disks = self._generate_data_disks(
                node, aws_node_runbook
            )
            if not aws_parameters.location:
                # take the first node's location
                aws_parameters.location = aws_node_runbook.location

            # save the vm's information into the node
            node_context = get_node_context(node)
            # the vm's name is used to find it on aws
            node_context.vm_name = aws_node_runbook.name
            # ssh related information will be filled back once the vm is created
            node_context.username = self.runbook.admin_username
            node_context.private_key_file = self.runbook.admin_private_key_file
            log.info(f"vm setting: {aws_node_runbook}")

        aws_parameters.nodes = nodes_parameters

        # In AWS, each VM should have only one nic per subnet, so calculate
        # the max nic count and use it as the subnet count.
        aws_parameters.subnet_count = max(x.nic_count for x in aws_parameters.nodes)

        # composite deployment properties
        parameters = aws_parameters.to_dict()  # type:ignore
        parameters = {k: {"value": v} for k, v in parameters.items()}
        log.debug(f"parameters: {parameters}")

        return aws_parameters

    def _create_node_runbook(
        self,
        index: int,
        node_space: schema.NodeSpace,
        log: Logger,
    ) -> AwsNodeSchema:
        aws_node_runbook = node_space.get_extended_runbook(AwsNodeSchema, type_name=AWS)

        if not aws_node_runbook.name:
            aws_node_runbook.name = f"node-{index}"
        if not aws_node_runbook.vm_size:
            raise LisaException("vm_size is not detected before deploy")
        if not aws_node_runbook.location:
            raise LisaException("location is not detected before deploy")
        if not aws_node_runbook.marketplace:
            # set to the default marketplace, if nothing is specified
            aws_node_runbook.marketplace = AwsVmMarketplaceSchema()

        # set disk type
        assert node_space.disk, "node space must have disk defined."
        assert isinstance(node_space.disk.disk_type, schema.DiskType)
        aws_node_runbook.disk_type = features.get_aws_disk_type(
            node_space.disk.disk_type
        )
        aws_node_runbook.data_disk_caching_type = node_space.disk.data_disk_caching_type
        assert isinstance(
            node_space.disk.data_disk_iops, int
        ), f"actual: {type(node_space.disk.data_disk_iops)}"
        aws_node_runbook.data_disk_iops = node_space.disk.data_disk_iops
        assert isinstance(
            node_space.disk.data_disk_size, int
        ), f"actual: {type(node_space.disk.data_disk_size)}"
        aws_node_runbook.data_disk_size = node_space.disk.data_disk_size

        assert node_space.network_interface
        assert isinstance(
            node_space.network_interface.nic_count, int
        ), f"actual: {node_space.network_interface.nic_count}"
        aws_node_runbook.nic_count = node_space.network_interface.nic_count
        assert isinstance(
            node_space.network_interface.data_path, schema.NetworkDataPath
        ), f"actual: {type(node_space.network_interface.data_path)}"
        if node_space.network_interface.data_path == schema.NetworkDataPath.Sriov:
            aws_node_runbook.enable_sriov = True

        return aws_node_runbook
    def _deploy(
        self, deployment_parameters: AwsDeployParameter, log: Logger
    ) -> Dict[str, Any]:
        ec2_resource = boto3.resource("ec2")
        instances = {}
        subnets = self._create_subnets(self._vpc.id, deployment_parameters, log)
        block_device_mappings = self._create_block_devices(deployment_parameters, log)
        for node in deployment_parameters.nodes:
            network_interfaces = self._create_network_interfaces(
                deployment_parameters, node, subnets, log
            )
            try:
                instance = ec2_resource.create_instances(
                    ImageId=node.get_image_id(),
                    InstanceType=cast(InstanceTypeType, node.vm_size),
                    NetworkInterfaces=network_interfaces,
                    BlockDeviceMappings=block_device_mappings,
                    KeyName=deployment_parameters.key_pair_name,
                    MinCount=1,
                    MaxCount=1,
                )[0]
                instance.wait_until_running()
                instance.load()
                log.info("Created instance %s.", instance.id)

                # Enable ENA support if the test case requires it.
                # The Intel 82599 Virtual Function (VF) interface is not
                # supported yet; refer to the AWS document about enhanced
                # networking on Linux.
                if node.enable_sriov and (not instance.ena_support):
                    self._ec2_client.modify_instance_attribute(
                        InstanceId=instance.id,
                        EnaSupport={
                            "Value": True,
                        },
                    )
                instances[node.name] = instance.instance_id
            except ClientError:
                log.exception(
                    "Couldn't create instance with image %s, "
                    "instance type %s, and key %s.",
                    node.get_image_id(),
                    node.vm_size,
                    deployment_parameters.key_pair_name,
                )
                raise

        return instances
    def _delete_environment(self, environment: Environment, log: Logger) -> None:
        environment_context = get_environment_context(environment=environment)
        security_group_name = environment_context.security_group_name
        # the security group name is empty when the environment is not
        # deployed for some reason, e.g. the capability doesn't meet the
        # case requirement.
        if not security_group_name:
            return
        assert self._aws_runbook

        if not environment_context.security_group_is_created:
            log.info(
                f"skipped deleting security group: {security_group_name}, "
                f"as it's not created by this run."
            )
        elif self._aws_runbook.dry_run:
            log.info(
                f"skipped deleting security group: {security_group_name}, "
                f"as it's a dry run."
            )
        else:
            ec2_resource = boto3.resource("ec2")
            for node in environment.nodes.list():
                node_context = get_node_context(node)
                instance_id = node_context.instance_id
                self.terminate_instance(ec2_resource, instance_id, log)

            self.delete_security_group(
                ec2_resource,
                environment_context.security_group_id,
                environment_context.security_group_name,
                log,
            )
            self.delete_key_pair(ec2_resource, environment_context.key_pair_name, log)

            try:
                log.info(f"deleting vpc: {self._vpc.id}")
                for association in self._route_table.associations:
                    association.delete()
                self._route_table.delete()
                self._internet_gateway.detach_from_vpc(VpcId=self._vpc.id)
                self._internet_gateway.delete()
                for subnet in self._vpc.subnets.all():
                    subnet.delete()
                self._vpc.delete()
            except ClientError:
                log.exception(
                    "Couldn't delete vpc %s.",
                    self._vpc.id,
                )
                raise

    def terminate_instance(
        self, ec2_resource: Any, instance_id: str, log: Logger
    ) -> None:
        if not instance_id:
            return

        try:
            instance = ec2_resource.Instance(instance_id)
            instance.terminate()
            instance.wait_until_terminated()
            log.info("Terminated instance %s.", instance_id)
        except ClientError:
            log.exception("Couldn't terminate instance %s.", instance_id)

    def delete_security_group(
        self, ec2_resource: Any, group_id: str, security_group_name: str, log: Logger
    ) -> None:
        try:
            ec2_resource.SecurityGroup(group_id).delete()
            log.info("Deleted security group: %s.", security_group_name)
        except ClientError:
            log.exception(
                "Couldn't delete security group %s.",
                security_group_name,
            )

    def delete_key_pair(self, ec2_resource: Any, key_name: str, log: Logger) -> None:
        try:
            ec2_resource.KeyPair(key_name).delete()
            log.info("Deleted key pair %s.", key_name)
        except ClientError:
            log.exception("Couldn't delete key pair %s.", key_name)

    def _create_subnets(
        self, vpc_id: str, deployment_parameters: AwsDeployParameter, log: Logger
    ) -> Dict[int, Any]:
        subnets: Dict[int, Any] = {}
        try:
            addrs = self._vpc.cidr_block.split(".")
            for i in range(deployment_parameters.subnet_count):
                cidr_block = f"{addrs[0]}.{addrs[1]}.{str(i)}.0/24"
                subnets[i] = self._ec2_client.create_subnet(
                    CidrBlock=cidr_block,
                    VpcId=vpc_id,
                )
                self._route_table.associate_with_subnet(
                    SubnetId=subnets[i]["Subnet"]["SubnetId"]
                )
        except ClientError:
            log.exception("Could not create a custom subnet.")
            raise
        else:
            return subnets
f"{node.name}-extra-0",713                "AssociatePublicIpAddress": True,714                "SubnetId": subnets[0]["Subnet"]["SubnetId"],715                "DeviceIndex": 0,716                "Groups": [deployment_parameters.security_group_id],717            }718        ]719        for i in range(1, node.nic_count):720            network_interfaces.append(721                {722                    "Description": f"{node.name}-extra-{i}",723                    "AssociatePublicIpAddress": False,724                    "SubnetId": subnets[i]["Subnet"]["SubnetId"],725                    "DeviceIndex": i,726                    "Groups": [deployment_parameters.security_group_id],727                }728            )729        return network_interfaces730    def _create_block_devices(731        self,732        deployment_parameters: AwsDeployParameter,733        log: Logger,734    ) -> List[Any]:735        # There are some instance volume limits, please refer to736        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits737        block_device_mappings = []738        volumes = self._get_available_volumes(deployment_parameters)739        for idx, disk in enumerate(deployment_parameters.data_disks):740            if (741                disk.create_option742                == DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY743            ):744                if idx >= len(volumes):745                    raise LisaException(746                        f"No device names available "747                        f"for {len(deployment_parameters.data_disks)} disks!",748                    )749                block_device_mappings.append(750                    {751                        "DeviceName": volumes[idx],752                        "Ebs": {753                            "DeleteOnTermination": True,754                            "VolumeSize": disk.size,755                            "VolumeType": disk.type,756                            "Iops": disk.iops,757                        },758                    }759                )760        return block_device_mappings761    def _get_available_volumes(762        self, deployment_parameters: AwsDeployParameter763    ) -> List[str]:764        # In current implementation, all nodes use the same image.765        image_id = deployment_parameters.nodes[0].get_image_id()766        virtualization_type = boto3.resource("ec2").Image(image_id).virtualization_type767        volumes: List[str] = []768        # Create the available volume names based on virtualization type.769        # Refer to the following link770        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html771        if virtualization_type == "hvm":772            for c in range(ord("b"), ord("c") + 1):773                for p in range(ord("a"), ord("z") + 1):774                    volumes.append(f"/dev/xvd{chr(c)}{chr(p)}")775        elif virtualization_type == "paravirtual":776            for c in range(ord("f"), ord("p") + 1):777                for p in range(1, 7):778                    volumes.append(f"/dev/sd{chr(c)}{p}")779        else:780            raise LisaException(781                f"The virtualization type {virtualization_type} is not supported now."782            )783        return volumes784    def _initialize_nodes(785        self, environment: Environment, instances: Dict[str, Any], log: Logger786    ) -> None:787        ec2_resource = boto3.resource("ec2")788        node_context_map: Dict[str, Node] = 
    def _initialize_nodes(
        self, environment: Environment, instances: Dict[str, Any], log: Logger
    ) -> None:
        ec2_resource = boto3.resource("ec2")
        node_context_map: Dict[str, Node] = {}
        for node in environment.nodes.list():
            node_context = get_node_context(node)
            node_context.instance_id = instances[node_context.vm_name]
            node_context_map[node_context.vm_name] = node

        for vm_name, node in node_context_map.items():
            node_context = get_node_context(node)
            vm = ec2_resource.Instance(node_context.instance_id)
            if not vm:
                raise LisaException(
                    f"cannot find vm: '{vm_name}', make sure deployment is correct."
                )
            public_ip = vm.public_ip_address
            assert public_ip, "public IP address cannot be empty!"

            if not node.name:
                node.name = vm_name

            assert isinstance(node, RemoteNode)
            node.set_connection_info(
                address=vm.private_ip_address,
                port=22,
                public_address=public_ip,
                public_port=22,
                username=node_context.username,
                password=node_context.password,
                private_key_file=node_context.private_key_file,
            )

    @retry(tries=10, delay=1, jitter=(0.5, 1))
    def _load_location_info_from_file(
        self, cached_file_name: Path, log: Logger
    ) -> Optional[AwsLocation]:
        loaded_obj: Optional[AwsLocation] = None
        if cached_file_name.exists():
            try:
                with open(cached_file_name, "r") as f:
                    loaded_data: Dict[str, Any] = json.load(f)
                loaded_obj = schema.load_by_type(AwsLocation, loaded_data)
            except Exception as identifier:
                # if the schema changed, there may be an exception; remove
                # the cache and retry. Note: the retry on this method
                # depends on the decorator.
                log.debug(
                    f"error on loading cache, delete cache and retry. {identifier}"
                )
                cached_file_name.unlink()
                raise identifier
        return loaded_obj
{identifier}"829                )830                cached_file_name.unlink()831                raise identifier832        return loaded_obj833    def _get_location_info(self, location: str, log: Logger) -> AwsLocation:834        cached_file_name = constants.CACHE_PATH.joinpath(835            f"aws_locations_{location}.json"836        )837        should_refresh: bool = True838        key = location839        location_data = self._locations_data_cache.get(key, None)840        if not location_data:841            location_data = self._load_location_info_from_file(842                cached_file_name=cached_file_name, log=log843            )844        if location_data:845            delta = datetime.now() - location_data.updated_time846            # refresh cached locations every 1 day.847            if delta.days < 1:848                should_refresh = False849                log.debug(850                    f"{key}: cache used: {location_data.updated_time}, "851                    f"sku count: {len(location_data.capabilities)}"852                )853            else:854                log.debug(855                    f"{key}: cache timeout: {location_data.updated_time},"856                    f"sku count: {len(location_data.capabilities)}"857                )858        else:859            log.debug(f"{key}: no cache found")860        if should_refresh:861            ec2_region = boto3.client("ec2", region_name=location)862            log.debug(f"{key}: querying")863            all_skus: List[AwsCapability] = []864            instance_types = ec2_region.describe_instance_types()865            for instance_type in instance_types["InstanceTypes"]:866                capability = self._instance_type_to_capability(location, instance_type)867                # estimate vm cost for priority868                assert isinstance(capability.core_count, int)869                assert isinstance(capability.gpu_count, int)870                estimated_cost = capability.core_count + capability.gpu_count * 100871                aws_capability = AwsCapability(872                    location=location,873                    vm_size=instance_type["InstanceType"],874                    capability=capability,875                    resource_sku=instance_type,876                    estimated_cost=estimated_cost,877                )878                all_skus.append(aws_capability)879            location_data = AwsLocation(location=location, capabilities=all_skus)880            log.debug(f"{location}: saving to disk")881            with open(cached_file_name, "w") as f:882                json.dump(location_data.to_dict(), f)  # type: ignore883            log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")884        assert location_data885        self._locations_data_cache[key] = location_data886        return location_data887    def _instance_type_to_capability(  # noqa: C901888        self, location: str, instance_type: Any889    ) -> schema.NodeSpace:890        # fill in default values, in case no capability meet.891        node_space = schema.NodeSpace(892            node_count=1,893            core_count=0,894            memory_mb=0,895            gpu_count=0,896        )897        instancetype_name: str = instance_type["InstanceType"]898        node_space.name = f"{location}_{instancetype_name}"899        node_space.features = search_space.SetSpace[schema.FeatureSettings](900            is_allow_set=True901        )902        node_space.disk = features.AwsDiskOptionSettings()903        
    def _instance_type_to_capability(  # noqa: C901
        self, location: str, instance_type: Any
    ) -> schema.NodeSpace:
        # fill in default values, in case no capability is matched.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=0,
            memory_mb=0,
            gpu_count=0,
        )
        instancetype_name: str = instance_type["InstanceType"]
        node_space.name = f"{location}_{instancetype_name}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        node_space.disk = features.AwsDiskOptionSettings()
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.data_disk_iops = search_space.IntRange(min=0)
        node_space.disk.data_disk_size = search_space.IntRange(min=0)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])

        for name, value in instance_type.items():
            if name == "VCpuInfo":
                node_space.core_count = int(value["DefaultVCpus"])
            elif name == "MemoryInfo":
                node_space.memory_mb = int(value["SizeInMiB"])
            elif name == "NetworkInfo":
                nic_count = value["MaximumNetworkInterfaces"]
                node_space.network_interface.nic_count = search_space.IntRange(
                    min=1, max=nic_count
                )
                node_space.network_interface.max_nic_count = nic_count
                if value["EnaSupport"] == "supported":
                    node_space.network_interface.data_path.add(
                        schema.NetworkDataPath.Sriov
                    )
            elif name == "GpuInfo":
                for gpu in value["Gpus"]:
                    node_space.gpu_count += gpu["Count"]
                # update the features list, as the gpu feature is supported
                node_space.features.add(
                    schema.FeatureSettings.create(features.Gpu.name())
                )

        # all nodes support the following features
        node_space.features.update(
            [
                schema.FeatureSettings.create(features.StartStop.name()),
                schema.FeatureSettings.create(features.SerialConsole.name()),
            ]
        )
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)
        return node_space
    def get_eligible_vm_sizes(self, location: str, log: Logger) -> List[AwsCapability]:
        # load eligible vm sizes:
        # 1. the vm size is supported in the current location
        # 2. the vm size matches a predefined pattern
        location_capabilities: List[AwsCapability] = []
        key = self._get_location_key(location)
        if key not in self._eligible_capabilities:
            location_info: AwsLocation = self._get_location_info(location, log)
            # loop all fallback levels
            for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:
                level_capabilities: List[AwsCapability] = []

                # loop all capabilities
                for aws_capability in location_info.capabilities:
                    # exclude single-core sizes, which may be too slow to
                    # work in some distros
                    assert isinstance(aws_capability.capability.core_count, int)
                    if (
                        fallback_pattern.match(aws_capability.vm_size)
                        and aws_capability.capability.core_count > 1
                    ):
                        level_capabilities.append(aws_capability)

                # sort by rough cost
                level_capabilities.sort(key=lambda x: (x.estimated_cost))
                log.debug(
                    f"{key}, pattern '{fallback_pattern.pattern}'"
                    f" {len(level_capabilities)} candidates: "
                    f"{[x.vm_size for x in level_capabilities]}"
                )
                location_capabilities.extend(level_capabilities)
            self._eligible_capabilities[key] = location_capabilities
        return self._eligible_capabilities[key]

    def _get_location_key(self, location: str) -> str:
        return f"lisa_aws_{location}"

    def _generate_min_capability(
        self,
        requirement: schema.NodeSpace,
        aws_capability: AwsCapability,
        location: str,
    ) -> schema.NodeSpace:
        min_cap: schema.NodeSpace = requirement.generate_min_capability(
            aws_capability.capability
        )
        # apply aws specified values
        aws_node_runbook = min_cap.get_extended_runbook(AwsNodeSchema, AWS)
        if aws_node_runbook.location:
            assert aws_node_runbook.location == location, (
                f"predefined location [{aws_node_runbook.location}] "
                f"must be the same as "
                f"cap location [{location}]"
            )
        # the location may not be set
        aws_node_runbook.location = location
        aws_node_runbook.vm_size = aws_capability.vm_size

        assert min_cap.network_interface
        assert isinstance(
            min_cap.network_interface.nic_count, int
        ), f"actual: {min_cap.network_interface.nic_count}"
        aws_node_runbook.nic_count = min_cap.network_interface.nic_count
        assert isinstance(
            min_cap.network_interface.data_path, schema.NetworkDataPath
        ), f"actual: {type(min_cap.network_interface.data_path)}"
        if min_cap.network_interface.data_path == schema.NetworkDataPath.Sriov:
            aws_node_runbook.enable_sriov = True

        assert min_cap.disk, "disk must exist"
        assert isinstance(
            min_cap.disk.data_disk_count, int
        ), f"actual: {min_cap.disk.data_disk_count}"
        aws_node_runbook.data_disk_count = min_cap.disk.data_disk_count
        assert isinstance(
            min_cap.disk.data_disk_caching_type, str
        ), f"actual: {min_cap.disk.data_disk_caching_type}"
        aws_node_runbook.data_disk_caching_type = min_cap.disk.data_disk_caching_type

        return min_cap
    def _generate_data_disks(
        self,
        node: Node,
        aws_node_runbook: AwsNodeSchema,
    ) -> List[DataDiskSchema]:
        data_disks: List[DataDiskSchema] = []
        assert node.capability.disk
        if aws_node_runbook.marketplace:
            image = boto3.resource("ec2").Image(aws_node_runbook.marketplace.imageid)
            # AWS images have root data disks by default
            for data_disk in image.block_device_mappings:
                if "Ebs" in data_disk and "VolumeSize" in data_disk["Ebs"]:
                    assert isinstance(node.capability.disk.data_disk_iops, int)
                    data_disks.append(
                        DataDiskSchema(...
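Below are a few standalone sketches of the AWS building blocks the snippet above wraps. First, a minimal, hedged sketch of the EC2 instance-creation flow behind _deploy; every identifier here (AMI, key pair, subnet, security group) is a placeholder, not a value produced by this module.

    import boto3

    ec2_resource = boto3.resource("ec2")
    ec2_client = boto3.client("ec2")

    # Placeholder IDs; substitute resources that exist in your account.
    instance = ec2_resource.create_instances(
        ImageId="ami-00000000000000000",
        InstanceType="t3.large",
        KeyName="my-key-pair",
        NetworkInterfaces=[
            {
                "Description": "node-0-extra-0",
                "AssociatePublicIpAddress": True,
                "SubnetId": "subnet-00000000",
                "DeviceIndex": 0,
                "Groups": ["sg-00000000"],
            }
        ],
        MinCount=1,
        MaxCount=1,
    )[0]
    instance.wait_until_running()
    instance.load()  # refresh attributes such as ena_support and public_ip_address

    # Enable ENA (the data path this platform maps to SR-IOV) if the AMI
    # didn't; note that AWS only allows changing this attribute while the
    # instance is stopped, which this sketch glosses over.
    if not instance.ena_support:
        ec2_client.modify_instance_attribute(
            InstanceId=instance.id,
            EnaSupport={"Value": True},
        )
    print(instance.id, instance.public_ip_address)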
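Next, the capability query behind _get_location_info and _instance_type_to_capability, reduced to the fields the code above reads. The region name is a placeholder, and unlike the real API this sketch ignores pagination (describe_instance_types returns a NextToken).

    import boto3

    ec2 = boto3.client("ec2", region_name="us-west-2")  # placeholder region
    for item in ec2.describe_instance_types()["InstanceTypes"]:
        cores = item["VCpuInfo"]["DefaultVCpus"]
        memory_mb = item["MemoryInfo"]["SizeInMiB"]
        max_nics = item["NetworkInfo"]["MaximumNetworkInterfaces"]
        ena = item["NetworkInfo"].get("EnaSupport") == "supported"
        gpus = sum(g["Count"] for g in item.get("GpuInfo", {}).get("Gpus", []))
        # the same rough cost heuristic as above: one unit per core,
        # one hundred per GPU
        estimated_cost = cores + gpus * 100
        print(item["InstanceType"], cores, memory_mb, max_nics, ena, estimated_cost)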
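_create_subnets derives one /24 block per NIC by keeping the first two octets of the VPC CIDR and using the subnet index as the third octet. A tiny standalone reproduction, assuming the VPC block is a /16 or larger; subnet_cidrs is a hypothetical helper, not part of the module:

    from typing import List

    def subnet_cidrs(vpc_cidr: str, subnet_count: int) -> List[str]:
        # keep the first two octets, index the third
        addrs = vpc_cidr.split(".")
        return [f"{addrs[0]}.{addrs[1]}.{i}.0/24" for i in range(subnet_count)]

    assert subnet_cidrs("10.0.0.0/16", 3) == [
        "10.0.0.0/24",
        "10.0.1.0/24",
        "10.0.2.0/24",
    ]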

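Finally, the device-name pools that _get_available_volumes builds for _create_block_devices, reproduced standalone per the AWS device-naming guide linked in the code:

    # HVM images draw from /dev/xvdba../dev/xvdcz; paravirtual images
    # draw from /dev/sdf1../dev/sdp6.
    hvm = [
        f"/dev/xvd{chr(c)}{chr(p)}"
        for c in range(ord("b"), ord("c") + 1)
        for p in range(ord("a"), ord("z") + 1)
    ]
    paravirtual = [
        f"/dev/sd{chr(c)}{p}"
        for c in range(ord("f"), ord("p") + 1)
        for p in range(1, 7)
    ]
    assert hvm[0] == "/dev/xvdba" and len(hvm) == 52
    assert paravirtual[0] == "/dev/sdf1" and len(paravirtual) == 66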