Best Python code snippet using lisa_python
platform_.py
Source:platform_.py  
...160            features.Disk,161        ]162    def _initialize(self, *args: Any, **kwargs: Any) -> None:163        # set needed environment variables for authentication164        aws_runbook: AwsPlatformSchema = self.runbook.get_extended_runbook(165            AwsPlatformSchema166        )167        assert aws_runbook, "platform runbook cannot be empty"168        self._aws_runbook = aws_runbook169        self._initialize_credential()170        # boto3 client is thread safe171        self._ec2_client = boto3.client("ec2")172    def _initialize_credential(self) -> None:173        aws_runbook = self._aws_runbook174        if aws_runbook.aws_access_key_id:175            os.environ["AWS_ACCESS_KEY_ID"] = aws_runbook.aws_access_key_id176        if aws_runbook.aws_secret_access_key:177            os.environ["AWS_SECRET_ACCESS_KEY"] = aws_runbook.aws_secret_access_key178        if aws_runbook.aws_session_token:179            os.environ["AWS_SESSION_TOKEN"] = aws_runbook.aws_session_token180        if aws_runbook.aws_default_region:181            os.environ["AWS_DEFAULT_REGION"] = aws_runbook.aws_default_region182    def _create_key_pair(self, key_name: str, private_key_file: str) -> Any:183        try:184            ec2_resource = boto3.resource("ec2")185            key_pair = ec2_resource.import_key_pair(186                KeyName=key_name,187                PublicKeyMaterial=get_public_key_data(private_key_file),188            )189            self._log.info("Created key %s.", key_pair.name)190        except ClientError:191            self._log.error("Couldn't create key %s.", key_name)192            raise193        else:194            return key_pair195    def _check_or_create_security_group(  # noqa: C901196        self, security_group_name: str, group_description: str197    ) -> Any:198        try:199            ec2_resource = boto3.resource("ec2")200            # By default, AWS users can create up to 5 VPCs201            for i in range(50, 55):202                cidr_block = "173." + str(i) + ".0.0/16"203                vpcs = list(204                    ec2_resource.vpcs.filter(205                        Filters=[{"Name": "cidr", "Values": [cidr_block]}]206                    )207                )208                if len(vpcs) == 0:209                    self._vpc = ec2_resource.create_vpc(CidrBlock=cidr_block)210                    self._log.info(211                        f"Create a new VPC: {self._vpc.id}"212                        f"with CIDR block {self._vpc.cidr_block}"213                    )214                    self._internet_gateway = ec2_resource.create_internet_gateway()215                    self._vpc.attach_internet_gateway(216                        InternetGatewayId=self._internet_gateway.id217                    )218                    self._route_table = ec2_resource.create_route_table(219                        VpcId=self._vpc.id220                    )221                    self._route_table.create_route(222                        DestinationCidrBlock="0.0.0.0/0",223                        GatewayId=self._internet_gateway.id,224                    )225                    self._log.info(226                        "Create an internet gateway: %s and a route table %s",227                        self._internet_gateway.id,228                        self._route_table.id,229                    )230                    break231            if self._vpc is None:232                raise LisaException(233                    "Couldn't get/create VPCs as there are 5 exiting VPCs."234                    "Please wait for others finishing test."235                )236        except ClientError:237            self._log.exception("Couldn't get/create VPCs.")238            raise239        try:240            security_group = self._vpc.create_security_group(241                GroupName=security_group_name, Description=group_description242            )243            self._log.info(244                "Created security group %s in VPC %s.",245                security_group_name,246                self._vpc.id,247            )248        except ClientError:249            self._log.exception(250                "Couldn't create security group %s.", security_group_name251            )252            raise253        try:254            ip_permissions: List[IpPermissionTypeDef] = [255                {256                    # SSH ingress open to anyone257                    "IpProtocol": "tcp",258                    "FromPort": 22,259                    "ToPort": 22,260                    "IpRanges": [{"CidrIp": "0.0.0.0/0"}],261                },262                {263                    # Open to ips in the vpc264                    "IpProtocol": "-1",265                    "FromPort": -1,266                    "ToPort": -1,267                    "IpRanges": [{"CidrIp": self._vpc.cidr_block}],268                },269            ]270            security_group.authorize_ingress(IpPermissions=ip_permissions)271            self._log.info("Set inbound rules for %s to allow SSH.", security_group.id)272        except ClientError:273            self._log.exception(274                "couldn't authorize inbound rules for %s.", security_group_name275            )276            raise277        else:278            return security_group279    def _prepare_environment(  # noqa: C901280        self, environment: Environment, log: Logger281    ) -> bool:282        # TODO: Reduce this function's complexity and remove the disabled warning.283        """284        Main flow285        1. load location, vm size patterns firstly.286        2. load available vm sizes for each location.287        3. match vm sizes by pattern.288        for each environment289        1. If predefined location exists on node level, check conflict and use it.290        2. If predefined vm size exists on node level, check exists and use it.291        3. check capability for each node by order of pattern.292        4. get min capability for each match293        """294        is_success: bool = True295        ec2_resource = boto3.resource("ec2")296        if environment.runbook.nodes_requirement:297            is_success = False298            nodes_requirement = environment.runbook.nodes_requirement299            node_count = len(nodes_requirement)300            # fills predefined locations here.301            predefined_caps: List[Any] = [None] * node_count302            # make sure all vms are in same location.303            existing_location: str = ""304            predefined_cost: float = 0305            for req in nodes_requirement:306                # covert to aws node space, so the aws extensions can be loaded.307                _convert_to_aws_node_space(req)308                # check locations309                # apply aws specified values310                node_runbook: AwsNodeSchema = req.get_extended_runbook(311                    AwsNodeSchema, AWS312                )313                if node_runbook.location:314                    if existing_location:315                        # if any one has different location, calculate again316                        if existing_location != node_runbook.location:317                            raise LisaException(318                                f"predefined node must be in same location, "319                                f"previous: {existing_location}, "320                                f"found: {node_runbook.location}"321                            )322                    else:323                        existing_location = node_runbook.location324            if existing_location:325                locations = [existing_location]326            else:327                locations = LOCATIONS328            # check eligible locations329            found_or_skipped = False330            for location_name in locations:331                predefined_cost = 0332                predefined_caps = [None] * node_count333                for req_index, req in enumerate(nodes_requirement):334                    found_or_skipped = False335                    node_runbook = req.get_extended_runbook(AwsNodeSchema, AWS)336                    if not node_runbook.vm_size:337                        # not to check, if no vm_size set338                        found_or_skipped = True339                        continue340                    # find predefined vm size on all available's.341                    location_info: AwsLocation = self._get_location_info(342                        location_name, log343                    )344                    matched_score: float = 0345                    matched_cap: Optional[AwsCapability] = None346                    matcher = SequenceMatcher(None, node_runbook.vm_size.lower(), "")347                    for aws_cap in location_info.capabilities:348                        matcher.set_seq2(aws_cap.vm_size.lower())349                        if (350                            node_runbook.vm_size.lower() in aws_cap.vm_size.lower()351                            and matched_score < matcher.ratio()352                        ):353                            matched_cap = aws_cap354                            matched_score = matcher.ratio()355                    if matched_cap:356                        predefined_cost += matched_cap.estimated_cost357                        min_cap = self._generate_min_capability(358                            req, matched_cap, location_name359                        )360                        if not existing_location:361                            existing_location = location_name362                        predefined_caps[req_index] = min_cap363                        found_or_skipped = True364                    else:365                        # if not found any, skip and try next location366                        break367                if found_or_skipped:368                    # if found all, skip other locations369                    break370            if found_or_skipped:371                for location_name in locations:372                    # in each location, all node must be found373                    # fill them as None and check after met capability374                    found_capabilities: List[Any] = list(predefined_caps)375                    # skip unmatched location376                    if existing_location and existing_location != location_name:377                        continue378                    estimated_cost: float = 0379                    location_caps = self.get_eligible_vm_sizes(location_name, log)380                    for req_index, req in enumerate(nodes_requirement):381                        node_runbook = req.get_extended_runbook(AwsNodeSchema, AWS)382                        image = ec2_resource.Image(node_runbook.get_image_id())383                        for aws_cap in location_caps:384                            if found_capabilities[req_index]:385                                # found, so skipped386                                break387                            # Check if the instance type is on the same architecture388                            # as the image.389                            processor_info = aws_cap.resource_sku["ProcessorInfo"]390                            supported_archs = processor_info["SupportedArchitectures"]391                            if image.architecture != supported_archs[0]:392                                continue393                            check_result = req.check(aws_cap.capability)394                            if check_result.result:395                                min_cap = self._generate_min_capability(396                                    req, aws_cap, aws_cap.location397                                )398                                estimated_cost += aws_cap.estimated_cost399                                found_capabilities[req_index] = min_cap400                        if all(x for x in found_capabilities):401                            break402                    if all(x for x in found_capabilities):403                        # all found and replace current requirement404                        environment.runbook.nodes_requirement = found_capabilities405                        environment.cost = estimated_cost + predefined_cost406                        is_success = True407                        log.debug(408                            f"requirement meet, "409                            f"cost: {environment.cost}, "410                            f"cap: {environment.runbook.nodes_requirement}"411                        )412                        break413        return is_success414    def _deploy_environment(self, environment: Environment, log: Logger) -> None:415        assert self._ec2_client416        assert self._aws_runbook417        environment_context = get_environment_context(environment=environment)418        normalized_run_name = constants.NORMALIZE_PATTERN.sub("_", constants.RUN_NAME)419        if self._aws_runbook.security_group_name:420            security_group_name = self._aws_runbook.security_group_name421        else:422            security_group_name = f"{normalized_run_name}__sec_group"423        if self._aws_runbook.key_pair_name:424            key_pair_name = self._aws_runbook.key_pair_name425        else:426            key_pair_name = f"{normalized_run_name}_keypair"427        environment_context.security_group_name = security_group_name428        environment_context.key_pair_name = key_pair_name429        if self._aws_runbook.dry_run:430            log.info(f"dry_run: {self._aws_runbook.dry_run}")431        else:432            try:433                if self._aws_runbook.deploy:434                    log.info(435                        f"creating or updating security group: [{security_group_name}]"436                    )437                    self._security_group = self._check_or_create_security_group(438                        security_group_name=security_group_name,439                        group_description="Lisa security group for testing.",440                    )441                    environment_context.security_group_is_created = True442                    environment_context.security_group_id = self._security_group.id443                    if self.runbook.admin_private_key_file:444                        self._key_pair = self._create_key_pair(445                            key_pair_name, self.runbook.admin_private_key_file446                        )447                else:448                    log.info(449                        f"reusing security group: [{security_group_name}]"450                        f" and key pair: [{key_pair_name}]"451                    )452                deployment_parameters = self._create_deployment_parameters(453                    security_group_name, environment, log454                )455                instances = {}456                if self._aws_runbook.deploy:457                    instances = self._deploy(deployment_parameters, log)458                # Even skipped deploy, try best to initialize nodes459                self._initialize_nodes(environment, instances, log)460            except Exception as identifier:461                self._delete_environment(environment, log)462                raise identifier463    def _create_deployment_parameters(464        self, security_group_name: str, environment: Environment, log: Logger465    ) -> AwsDeployParameter:466        assert environment.runbook, "env data cannot be None"467        assert environment.runbook.nodes_requirement, "node requirement cannot be None"468        log.debug("creating deployment")469        # construct parameters470        aws_parameters = AwsDeployParameter()471        environment_context = get_environment_context(environment=environment)472        aws_parameters.key_pair_name = environment_context.key_pair_name473        aws_parameters.security_group_name = environment_context.security_group_name474        aws_parameters.security_group_id = environment_context.security_group_id475        nodes_parameters: List[AwsNodeSchema] = []476        for node_space in environment.runbook.nodes_requirement:477            assert isinstance(478                node_space, schema.NodeSpace479            ), f"actual: {type(node_space)}"480            aws_node_runbook = node_space.get_extended_runbook(481                AwsNodeSchema, type_name=AWS482            )483            # init node484            node = environment.create_node_from_requirement(485                node_space,486            )487            aws_node_runbook = self._create_node_runbook(488                len(nodes_parameters),489                node_space,490                log,491            )492            # save parsed runbook back, for example, the version of marketplace may be493            # parsed from latest to a specified version.494            node.capability.set_extended_runbook(aws_node_runbook)495            nodes_parameters.append(aws_node_runbook)496            # Set data disk array497            aws_parameters.data_disks = self._generate_data_disks(498                node, aws_node_runbook499            )500            if not aws_parameters.location:501                # take first one's location502                aws_parameters.location = aws_node_runbook.location503            # save vm's information into node504            node_context = get_node_context(node)505            # vm's name, use to find it from aws506            node_context.vm_name = aws_node_runbook.name507            # ssh related information will be filled back once vm is created508            node_context.username = self.runbook.admin_username509            node_context.private_key_file = self.runbook.admin_private_key_file510            log.info(f"vm setting: {aws_node_runbook}")511        aws_parameters.nodes = nodes_parameters512        # In Azure, each VM should have only one nic in one subnet. So calculate513        # the max nic count, and set to subnet count.514        aws_parameters.subnet_count = max(x.nic_count for x in aws_parameters.nodes)515        # composite deployment properties516        parameters = aws_parameters.to_dict()  # type:ignore517        parameters = {k: {"value": v} for k, v in parameters.items()}518        log.debug(f"parameters: {parameters}")519        return aws_parameters520    def _create_node_runbook(521        self,522        index: int,523        node_space: schema.NodeSpace,524        log: Logger,525    ) -> AwsNodeSchema:526        aws_node_runbook = node_space.get_extended_runbook(AwsNodeSchema, type_name=AWS)527        if not aws_node_runbook.name:528            aws_node_runbook.name = f"node-{index}"529        if not aws_node_runbook.vm_size:530            raise LisaException("vm_size is not detected before deploy")531        if not aws_node_runbook.location:532            raise LisaException("location is not detected before deploy")533        if not aws_node_runbook.marketplace:534            # set to default marketplace, if nothing specified535            aws_node_runbook.marketplace = AwsVmMarketplaceSchema()536        # Set disk type537        assert node_space.disk, "node space must have disk defined."538        assert isinstance(node_space.disk.disk_type, schema.DiskType)539        aws_node_runbook.disk_type = features.get_aws_disk_type(540            node_space.disk.disk_type541        )542        aws_node_runbook.data_disk_caching_type = node_space.disk.data_disk_caching_type543        assert isinstance(544            node_space.disk.data_disk_iops, int545        ), f"actual: {type(node_space.disk.data_disk_iops)}"546        aws_node_runbook.data_disk_iops = node_space.disk.data_disk_iops547        assert isinstance(548            node_space.disk.data_disk_size, int549        ), f"actual: {type(node_space.disk.data_disk_size)}"550        aws_node_runbook.data_disk_size = node_space.disk.data_disk_size551        assert node_space.network_interface552        assert isinstance(553            node_space.network_interface.nic_count, int554        ), f"actual: {node_space.network_interface.nic_count}"555        aws_node_runbook.nic_count = node_space.network_interface.nic_count556        assert isinstance(557            node_space.network_interface.data_path, schema.NetworkDataPath558        ), f"actual: {type(node_space.network_interface.data_path)}"559        if node_space.network_interface.data_path == schema.NetworkDataPath.Sriov:560            aws_node_runbook.enable_sriov = True561        return aws_node_runbook562    def _deploy(563        self, deployment_parameters: AwsDeployParameter, log: Logger564    ) -> Dict[str, Any]:565        ec2_resource = boto3.resource("ec2")566        instances = {}567        subnets = self._create_subnets(self._vpc.id, deployment_parameters, log)568        block_device_mappings = self._create_block_devices(deployment_parameters, log)569        for node in deployment_parameters.nodes:570            network_interfaces = self._create_network_interfaces(571                deployment_parameters, node, subnets, log572            )573            try:574                instance = ec2_resource.create_instances(575                    ImageId=node.get_image_id(),576                    InstanceType=cast(InstanceTypeType, node.vm_size),577                    NetworkInterfaces=network_interfaces,578                    BlockDeviceMappings=block_device_mappings,579                    KeyName=deployment_parameters.key_pair_name,580                    MinCount=1,581                    MaxCount=1,582                )[0]583                instance.wait_until_running()584                instance.load()585                log.info("Created instance %s.", instance.id)586                # Enable ENA support if the test case requires.587                # Don't support the Intel 82599 Virtual Function (VF) interface now.588                # Refer to the document about AWS Enhanced networking on Linux.589                if node.enable_sriov and (not instance.ena_support):590                    self._ec2_client.modify_instance_attribute(591                        InstanceId=instance.id,592                        EnaSupport={593                            "Value": True,594                        },595                    )596                instances[node.name] = instance.instance_id597            except ClientError:598                log.exception(599                    "Couldn't create instance with image %s, "600                    "instance type %s, and key %s.",601                    node.get_image_id(),602                    node.vm_size,603                    deployment_parameters.key_pair_name,604                )605                raise606        return instances607    def _delete_environment(self, environment: Environment, log: Logger) -> None:608        environment_context = get_environment_context(environment=environment)609        security_group_name = environment_context.security_group_name610        # the resource group name is empty when it is not deployed for some reasons,611        # like capability doesn't meet case requirement.612        if not security_group_name:613            return614        assert self._aws_runbook615        if not environment_context.security_group_is_created:616            log.info(617                f"skipped to delete security resource group: {security_group_name}, "618                f"as it's not created by this run."619            )620        elif self._aws_runbook.dry_run:621            log.info(622                f"skipped to delete security resource group: {security_group_name}, "623                f"as it's a dry run."624            )625        else:626            ec2_resource = boto3.resource("ec2")627            for node in environment.nodes.list():628                node_context = get_node_context(node)629                instance_id = node_context.instance_id630                self.terminate_instance(ec2_resource, instance_id, log)631            self.delete_security_group(632                ec2_resource,633                environment_context.security_group_id,634                environment_context.security_group_name,635                log,636            )637            self.delete_key_pair(ec2_resource, environment_context.key_pair_name, log)638            try:639                log.info(f"deleting vpc: {self._vpc.id}")640                for association in self._route_table.associations:641                    association.delete()642                self._route_table.delete()643                self._internet_gateway.detach_from_vpc(VpcId=self._vpc.id)644                self._internet_gateway.delete()645                for subnet in self._vpc.subnets.all():646                    subnet.delete()647                self._vpc.delete()648            except ClientError:649                log.exception(650                    "Couldn't delete vpc %s.",651                    self._vpc.id,652                )653                raise654    def terminate_instance(655        self, ec2_resource: Any, instance_id: str, log: Logger656    ) -> None:657        if not instance_id:658            return659        try:660            instance = ec2_resource.Instance(instance_id)661            instance.terminate()662            instance.wait_until_terminated()663            log.info("Terminating instance %s.", instance_id)664        except ClientError:665            log.exception("Couldn't terminate instance %s.", instance_id)666    def delete_security_group(667        self, ec2_resource: Any, group_id: str, security_group_name: str, log: Logger668    ) -> None:669        try:670            ec2_resource.SecurityGroup(group_id).delete()671            log.info("Deleting security group: %s.", security_group_name)672        except ClientError:673            log.exception(674                "Couldn't delete security group %s.",675                security_group_name,676            )677    def delete_key_pair(self, ec2_resource: Any, key_name: str, log: Logger) -> None:678        try:679            ec2_resource.KeyPair(key_name).delete()680            log.info("Deleted key pair %s.", key_name)681        except ClientError:682            log.exception("Couldn't delete key pair %s.", key_name)683    def _create_subnets(684        self, vpc_id: str, deployment_parameters: AwsDeployParameter, log: Logger685    ) -> Dict[int, Any]:686        subnets: Dict[int, Any] = {}687        try:688            addrs = self._vpc.cidr_block.split(".")689            for i in range(deployment_parameters.subnet_count):690                cidr_block = f"{addrs[0]}.{addrs[1]}.{str(i)}.0/24"691                subnets[i] = self._ec2_client.create_subnet(692                    CidrBlock=cidr_block,693                    VpcId=vpc_id,694                )695                self._route_table.associate_with_subnet(696                    SubnetId=subnets[i]["Subnet"]["SubnetId"]697                )698        except ClientError:699            log.exception("Could not create a custom subnet.")700            raise701        else:702            return subnets703    def _create_network_interfaces(704        self,705        deployment_parameters: AwsDeployParameter,706        node: AwsNodeSchema,707        subnets: Dict[int, Any],708        log: Logger,709    ) -> List[Any]:710        network_interfaces = [711            {712                "Description": f"{node.name}-extra-0",713                "AssociatePublicIpAddress": True,714                "SubnetId": subnets[0]["Subnet"]["SubnetId"],715                "DeviceIndex": 0,716                "Groups": [deployment_parameters.security_group_id],717            }718        ]719        for i in range(1, node.nic_count):720            network_interfaces.append(721                {722                    "Description": f"{node.name}-extra-{i}",723                    "AssociatePublicIpAddress": False,724                    "SubnetId": subnets[i]["Subnet"]["SubnetId"],725                    "DeviceIndex": i,726                    "Groups": [deployment_parameters.security_group_id],727                }728            )729        return network_interfaces730    def _create_block_devices(731        self,732        deployment_parameters: AwsDeployParameter,733        log: Logger,734    ) -> List[Any]:735        # There are some instance volume limits, please refer to736        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits737        block_device_mappings = []738        volumes = self._get_available_volumes(deployment_parameters)739        for idx, disk in enumerate(deployment_parameters.data_disks):740            if (741                disk.create_option742                == DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY743            ):744                if idx >= len(volumes):745                    raise LisaException(746                        f"No device names available "747                        f"for {len(deployment_parameters.data_disks)} disks!",748                    )749                block_device_mappings.append(750                    {751                        "DeviceName": volumes[idx],752                        "Ebs": {753                            "DeleteOnTermination": True,754                            "VolumeSize": disk.size,755                            "VolumeType": disk.type,756                            "Iops": disk.iops,757                        },758                    }759                )760        return block_device_mappings761    def _get_available_volumes(762        self, deployment_parameters: AwsDeployParameter763    ) -> List[str]:764        # In current implementation, all nodes use the same image.765        image_id = deployment_parameters.nodes[0].get_image_id()766        virtualization_type = boto3.resource("ec2").Image(image_id).virtualization_type767        volumes: List[str] = []768        # Create the available volume names based on virtualization type.769        # Refer to the following link770        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html771        if virtualization_type == "hvm":772            for c in range(ord("b"), ord("c") + 1):773                for p in range(ord("a"), ord("z") + 1):774                    volumes.append(f"/dev/xvd{chr(c)}{chr(p)}")775        elif virtualization_type == "paravirtual":776            for c in range(ord("f"), ord("p") + 1):777                for p in range(1, 7):778                    volumes.append(f"/dev/sd{chr(c)}{p}")779        else:780            raise LisaException(781                f"The virtualization type {virtualization_type} is not supported now."782            )783        return volumes784    def _initialize_nodes(785        self, environment: Environment, instances: Dict[str, Any], log: Logger786    ) -> None:787        ec2_resource = boto3.resource("ec2")788        node_context_map: Dict[str, Node] = {}789        for node in environment.nodes.list():790            node_context = get_node_context(node)791            node_context.instance_id = instances[node_context.vm_name]792            node_context_map[node_context.vm_name] = node793        for vm_name, node in node_context_map.items():794            node_context = get_node_context(node)795            vm = ec2_resource.Instance(node_context.instance_id)796            if not vm:797                raise LisaException(798                    f"cannot find vm: '{vm_name}', make sure deployment is correct."799                )800            public_ip = vm.public_ip_address801            assert public_ip, "public IP address cannot be empty!"802            if not node.name:803                node.name = vm_name804            assert isinstance(node, RemoteNode)805            node.set_connection_info(806                address=vm.private_ip_address,807                port=22,808                public_address=public_ip,809                public_port=22,810                username=node_context.username,811                password=node_context.password,812                private_key_file=node_context.private_key_file,813            )814    @retry(tries=10, delay=1, jitter=(0.5, 1))815    def _load_location_info_from_file(816        self, cached_file_name: Path, log: Logger817    ) -> Optional[AwsLocation]:818        loaded_obj: Optional[AwsLocation] = None819        if cached_file_name.exists():820            try:821                with open(cached_file_name, "r") as f:822                    loaded_data: Dict[str, Any] = json.load(f)823                loaded_obj = schema.load_by_type(AwsLocation, loaded_data)824            except Exception as identifier:825                # if schema changed, There may be exception, remove cache and retry826                # Note: retry on this method depends on decorator827                log.debug(828                    f"error on loading cache, delete cache and retry. {identifier}"829                )830                cached_file_name.unlink()831                raise identifier832        return loaded_obj833    def _get_location_info(self, location: str, log: Logger) -> AwsLocation:834        cached_file_name = constants.CACHE_PATH.joinpath(835            f"aws_locations_{location}.json"836        )837        should_refresh: bool = True838        key = location839        location_data = self._locations_data_cache.get(key, None)840        if not location_data:841            location_data = self._load_location_info_from_file(842                cached_file_name=cached_file_name, log=log843            )844        if location_data:845            delta = datetime.now() - location_data.updated_time846            # refresh cached locations every 1 day.847            if delta.days < 1:848                should_refresh = False849                log.debug(850                    f"{key}: cache used: {location_data.updated_time}, "851                    f"sku count: {len(location_data.capabilities)}"852                )853            else:854                log.debug(855                    f"{key}: cache timeout: {location_data.updated_time},"856                    f"sku count: {len(location_data.capabilities)}"857                )858        else:859            log.debug(f"{key}: no cache found")860        if should_refresh:861            ec2_region = boto3.client("ec2", region_name=location)862            log.debug(f"{key}: querying")863            all_skus: List[AwsCapability] = []864            instance_types = ec2_region.describe_instance_types()865            for instance_type in instance_types["InstanceTypes"]:866                capability = self._instance_type_to_capability(location, instance_type)867                # estimate vm cost for priority868                assert isinstance(capability.core_count, int)869                assert isinstance(capability.gpu_count, int)870                estimated_cost = capability.core_count + capability.gpu_count * 100871                aws_capability = AwsCapability(872                    location=location,873                    vm_size=instance_type["InstanceType"],874                    capability=capability,875                    resource_sku=instance_type,876                    estimated_cost=estimated_cost,877                )878                all_skus.append(aws_capability)879            location_data = AwsLocation(location=location, capabilities=all_skus)880            log.debug(f"{location}: saving to disk")881            with open(cached_file_name, "w") as f:882                json.dump(location_data.to_dict(), f)  # type: ignore883            log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")884        assert location_data885        self._locations_data_cache[key] = location_data886        return location_data887    def _instance_type_to_capability(  # noqa: C901888        self, location: str, instance_type: Any889    ) -> schema.NodeSpace:890        # fill in default values, in case no capability meet.891        node_space = schema.NodeSpace(892            node_count=1,893            core_count=0,894            memory_mb=0,895            gpu_count=0,896        )897        instancetype_name: str = instance_type["InstanceType"]898        node_space.name = f"{location}_{instancetype_name}"899        node_space.features = search_space.SetSpace[schema.FeatureSettings](900            is_allow_set=True901        )902        node_space.disk = features.AwsDiskOptionSettings()903        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](904            is_allow_set=True, items=[]905        )906        node_space.disk.data_disk_iops = search_space.IntRange(min=0)907        node_space.disk.data_disk_size = search_space.IntRange(min=0)908        node_space.network_interface = schema.NetworkInterfaceOptionSettings()909        node_space.network_interface.data_path = search_space.SetSpace[910            schema.NetworkDataPath911        ](is_allow_set=True, items=[])912        for name, value in instance_type.items():913            if name == "VCpuInfo":914                node_space.core_count = int(value["DefaultVCpus"])915            elif name == "MemoryInfo":916                node_space.memory_mb = int(value["SizeInMiB"])917            elif name == "NetworkInfo":918                nic_count = value["MaximumNetworkInterfaces"]919                node_space.network_interface.nic_count = search_space.IntRange(920                    min=1, max=nic_count921                )922                node_space.network_interface.max_nic_count = nic_count923                if value["EnaSupport"] == "supported":924                    node_space.network_interface.data_path.add(925                        schema.NetworkDataPath.Sriov926                    )927            elif name == "GpuInfo":928                for gpu in value["Gpus"]:929                    node_space.gpu_count += gpu["Count"]930                # update features list if gpu feature is supported931                node_space.features.add(932                    schema.FeatureSettings.create(features.Gpu.name())933                )934        # all nodes support following features935        node_space.features.update(936            [937                schema.FeatureSettings.create(features.StartStop.name()),938                schema.FeatureSettings.create(features.SerialConsole.name()),939            ]940        )941        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)942        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)943        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)944        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)945        return node_space946    def get_eligible_vm_sizes(self, location: str, log: Logger) -> List[AwsCapability]:947        # load eligible vm sizes948        # 1. vm size supported in current location949        # 2. vm size match predefined pattern950        location_capabilities: List[AwsCapability] = []951        key = self._get_location_key(location)952        if key not in self._eligible_capabilities:953            location_info: AwsLocation = self._get_location_info(location, log)954            # loop all fall back levels955            for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:956                level_capabilities: List[AwsCapability] = []957                # loop all capabilities958                for aws_capability in location_info.capabilities:959                    # exclude one core which may be too slow to work in some distro960                    assert isinstance(aws_capability.capability.core_count, int)961                    if (962                        fallback_pattern.match(aws_capability.vm_size)963                        and aws_capability.capability.core_count > 1964                    ):965                        level_capabilities.append(aws_capability)966                # sort by rough cost967                level_capabilities.sort(key=lambda x: (x.estimated_cost))968                log.debug(969                    f"{key}, pattern '{fallback_pattern.pattern}'"970                    f" {len(level_capabilities)} candidates: "971                    f"{[x.vm_size for x in level_capabilities]}"972                )973                location_capabilities.extend(level_capabilities)974            self._eligible_capabilities[key] = location_capabilities975        return self._eligible_capabilities[key]976    def _get_location_key(self, location: str) -> str:977        return f"lisa_aws_{location}"978    def _generate_min_capability(979        self,980        requirement: schema.NodeSpace,981        aws_capability: AwsCapability,982        location: str,983    ) -> schema.NodeSpace:984        min_cap: schema.NodeSpace = requirement.generate_min_capability(985            aws_capability.capability986        )987        # Apply aws specified values.988        aws_node_runbook = min_cap.get_extended_runbook(AwsNodeSchema, AWS)989        if aws_node_runbook.location:990            assert aws_node_runbook.location == location, (991                f"predefined location [{aws_node_runbook.location}] "992                f"must be same as "993                f"cap location [{location}]"994            )995        # the location may not be set996        aws_node_runbook.location = location997        aws_node_runbook.vm_size = aws_capability.vm_size998        assert min_cap.network_interface999        assert isinstance(1000            min_cap.network_interface.nic_count, int1001        ), f"actual: {min_cap.network_interface.nic_count}"1002        aws_node_runbook.nic_count = min_cap.network_interface.nic_count...test_prepare.py
Source:test_prepare.py  
...340        if node_req_count > 0:341            runbook._original_nodes_requirement = []342            for _ in range(node_req_count):343                node_req = schema.NodeSpace()344                _ = node_req.get_extended_runbook(common.AzureNodeSchema, AZURE)345                runbook._original_nodes_requirement.append(node_req)346        environment = Environment(347            is_predefined=True, warn_as_error=False, id_=0, runbook=runbook348        )349        return environment350    def set_node_runbook(351        self,352        environment: Environment,353        index: int,354        location: str = "",355        vm_size: str = "",356        max_capability: bool = False,357    ) -> None:358        assert environment.runbook._original_nodes_requirement359        node_runbook = environment.runbook._original_nodes_requirement[360            index361        ].get_extended_runbook(common.AzureNodeSchema, AZURE)362        node_runbook.location = location363        node_runbook.vm_size = vm_size364        node_runbook.maximize_capability = max_capability365    def verify_prepared_nodes(366        self,367        expected_result: bool,368        expected_locations: List[str],369        expected_vm_sizes: List[str],370        expected_cost: int,371        environment: Environment,372    ) -> None:373        actual_result = self._platform._prepare_environment(environment, self._log)374        self.assertEqual(expected_result, actual_result)375        if expected_locations:376            assert environment.runbook.nodes_requirement377            # get node runbook for validating378            nodes_runbook = [379                x.get_extended_runbook(common.AzureNodeSchema, AZURE)380                for x in environment.runbook.nodes_requirement381            ]382            self.assertListEqual(383                expected_locations,384                [x.location for x in nodes_runbook],385            )386            self.assertListEqual(387                expected_vm_sizes,388                [x.vm_size for x in nodes_runbook],389            )390            # all cap values must be covered to specified int value, not space391            for node_cap in environment.runbook.nodes_requirement:392                assert node_cap393                assert node_cap.disk...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
