How to use the _convert_to_aws_node_space method in lisa

Best Python code snippet using lisa_python: the example below comes from the LISA project's AWS orchestrator module, platform_.py.

platform_.py

Source: platform_.py (GitHub)


```python
...
    estimated_cost: int
    resource_sku: InstanceTypeInfoTypeDef

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        # reload features settings with platform specified types.
        _convert_to_aws_node_space(self.capability)


@dataclass_json()
@dataclass
class AwsLocation:
    updated_time: datetime = field(
        default_factory=datetime.now,
        metadata=field_metadata(
            fields.DateTime,
            encoder=datetime.isoformat,
            decoder=datetime.fromisoformat,
            format="iso",
        ),
    )
    location: str = ""
    capabilities: List[AwsCapability] = field(default_factory=list)


@dataclass_json()
@dataclass
class AwsPlatformSchema:
    aws_access_key_id: str = field(default="")
    aws_secret_access_key: str = field(default="")
    aws_session_token: str = field(default="")
    aws_default_region: str = field(default="")
    security_group_name: str = field(default="")
    key_pair_name: str = field(default="")
    locations: Optional[Union[str, List[str]]] = field(default=None)
    log_level: str = field(
        default=logging.getLevelName(logging.WARN),
        metadata=field_metadata(
            validate=validate.OneOf(
                [
                    logging.getLevelName(logging.ERROR),
                    logging.getLevelName(logging.WARN),
                    logging.getLevelName(logging.INFO),
                    logging.getLevelName(logging.DEBUG),
                ]
            ),
        ),
    )
    # do actual deployment, or pass through for troubleshooting
    dry_run: bool = False
    # do actual deployment, or try to retrieve existing vms
    deploy: bool = True
    # wait until the resource is deleted, or not
    wait_delete: bool = False

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        strip_strs(
            self,
            [
                "aws_access_key_id",
                "aws_secret_access_key",
                "aws_session_token",
                "aws_default_region",
                "security_group_name",
                "key_pair_name",
                "locations",
                "log_level",
            ],
        )
        if self.aws_access_key_id:
            add_secret(self.aws_access_key_id)
        if self.aws_secret_access_key:
            add_secret(self.aws_secret_access_key)
        if self.aws_session_token:
            add_secret(self.aws_session_token)
        if not self.locations:
            self.locations = LOCATIONS


class AwsPlatform(Platform):
    _locations_data_cache: Dict[str, AwsLocation] = {}
    _eligible_capabilities: Dict[str, List[AwsCapability]] = {}

    def __init__(self, runbook: schema.Platform) -> None:
        super().__init__(runbook=runbook)

    @classmethod
    def type_name(cls) -> str:
        return AWS

    @classmethod
    def supported_features(cls) -> List[Type[feature.Feature]]:
        return [
            features.Gpu,
            features.SerialConsole,
            features.StartStop,
            features.NetworkInterface,
            features.Disk,
        ]

    def _initialize(self, *args: Any, **kwargs: Any) -> None:
        # set needed environment variables for authentication
        aws_runbook: AwsPlatformSchema = self.runbook.get_extended_runbook(
            AwsPlatformSchema
        )
        assert aws_runbook, "platform runbook cannot be empty"
        self._aws_runbook = aws_runbook

        self._initialize_credential()

        # boto3 client is thread safe
        self._ec2_client = boto3.client("ec2")

    def _initialize_credential(self) -> None:
        aws_runbook = self._aws_runbook
        if aws_runbook.aws_access_key_id:
            os.environ["AWS_ACCESS_KEY_ID"] = aws_runbook.aws_access_key_id
        if aws_runbook.aws_secret_access_key:
            os.environ["AWS_SECRET_ACCESS_KEY"] = aws_runbook.aws_secret_access_key
        if aws_runbook.aws_session_token:
            os.environ["AWS_SESSION_TOKEN"] = aws_runbook.aws_session_token
        if aws_runbook.aws_default_region:
            os.environ["AWS_DEFAULT_REGION"] = aws_runbook.aws_default_region

    def _create_key_pair(self, key_name: str, private_key_file: str) -> Any:
        try:
            ec2_resource = boto3.resource("ec2")
            key_pair = ec2_resource.import_key_pair(
                KeyName=key_name,
                PublicKeyMaterial=get_public_key_data(private_key_file),
            )
            self._log.info("Created key %s.", key_pair.name)
        except ClientError:
            self._log.error("Couldn't create key %s.", key_name)
            raise
        else:
            return key_pair

    def _check_or_create_security_group(  # noqa: C901
        self, security_group_name: str, group_description: str
    ) -> Any:
        try:
            ec2_resource = boto3.resource("ec2")
            # By default, AWS users can create up to 5 VPCs.
            for i in range(50, 55):
                cidr_block = "173." + str(i) + ".0.0/16"
                vpcs = list(
                    ec2_resource.vpcs.filter(
                        Filters=[{"Name": "cidr", "Values": [cidr_block]}]
                    )
                )
                if len(vpcs) == 0:
                    self._vpc = ec2_resource.create_vpc(CidrBlock=cidr_block)
                    self._log.info(
                        f"Created a new VPC: {self._vpc.id} "
                        f"with CIDR block {self._vpc.cidr_block}"
                    )
                    self._internet_gateway = ec2_resource.create_internet_gateway()
                    self._vpc.attach_internet_gateway(
                        InternetGatewayId=self._internet_gateway.id
                    )
                    self._route_table = ec2_resource.create_route_table(
                        VpcId=self._vpc.id
                    )
                    self._route_table.create_route(
                        DestinationCidrBlock="0.0.0.0/0",
                        GatewayId=self._internet_gateway.id,
                    )
                    self._log.info(
                        "Created an internet gateway %s and a route table %s.",
                        self._internet_gateway.id,
                        self._route_table.id,
                    )
                    break

            if self._vpc is None:
                raise LisaException(
                    "Couldn't get/create a VPC as there are 5 existing VPCs. "
                    "Please wait for other tests to finish."
                )
        except ClientError:
            self._log.exception("Couldn't get/create VPCs.")
            raise

        try:
            security_group = self._vpc.create_security_group(
                GroupName=security_group_name, Description=group_description
            )
            self._log.info(
                "Created security group %s in VPC %s.",
                security_group_name,
                self._vpc.id,
            )
        except ClientError:
            self._log.exception(
                "Couldn't create security group %s.", security_group_name
            )
            raise

        try:
            ip_permissions: List[IpPermissionTypeDef] = [
                {
                    # SSH ingress open to anyone
                    "IpProtocol": "tcp",
                    "FromPort": 22,
                    "ToPort": 22,
                    "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                },
                {
                    # Open to ips in the vpc
                    "IpProtocol": "-1",
                    "FromPort": -1,
                    "ToPort": -1,
                    "IpRanges": [{"CidrIp": self._vpc.cidr_block}],
                },
            ]
            security_group.authorize_ingress(IpPermissions=ip_permissions)
            self._log.info("Set inbound rules for %s to allow SSH.", security_group.id)
        except ClientError:
            self._log.exception(
                "Couldn't authorize inbound rules for %s.", security_group_name
            )
            raise
        else:
            return security_group

    def _prepare_environment(  # noqa: C901
        self, environment: Environment, log: Logger
    ) -> bool:
        # TODO: Reduce this function's complexity and remove the disabled warning.
        """
        Main flow
        1. load locations and vm size patterns first.
        2. load available vm sizes for each location.
        3. match vm sizes by pattern.

        For each environment:
        1. If a predefined location exists on the node level, check for
           conflicts and use it.
        2. If a predefined vm size exists on the node level, check that it
           exists and use it.
        3. Check capability for each node in pattern order.
        4. Get the min capability for each match.
        """
        is_success: bool = True
        ec2_resource = boto3.resource("ec2")

        if environment.runbook.nodes_requirement:
            is_success = False
            nodes_requirement = environment.runbook.nodes_requirement
            node_count = len(nodes_requirement)
            # fills predefined locations here.
            predefined_caps: List[Any] = [None] * node_count
            # make sure all vms are in the same location.
            existing_location: str = ""
            predefined_cost: float = 0

            for req in nodes_requirement:
                # convert to aws node space, so the aws extensions can be loaded.
                _convert_to_aws_node_space(req)

                # check locations
                # apply aws specified values
                node_runbook: AwsNodeSchema = req.get_extended_runbook(
                    AwsNodeSchema, AWS
                )
                if node_runbook.location:
                    if existing_location:
                        # if any one has a different location, calculate again
                        if existing_location != node_runbook.location:
                            raise LisaException(
                                f"predefined node must be in same location, "
                                f"previous: {existing_location}, "
                                f"found: {node_runbook.location}"
                            )
                    else:
                        existing_location = node_runbook.location

            if existing_location:
                locations = [existing_location]
            else:
                locations = LOCATIONS

            # check eligible locations
            found_or_skipped = False
            for location_name in locations:
                predefined_cost = 0
                predefined_caps = [None] * node_count

                for req_index, req in enumerate(nodes_requirement):
                    found_or_skipped = False
                    node_runbook = req.get_extended_runbook(AwsNodeSchema, AWS)
                    if not node_runbook.vm_size:
                        # nothing to check, if no vm_size is set
                        found_or_skipped = True
                        continue

                    # find the predefined vm size among all available sizes.
                    location_info: AwsLocation = self._get_location_info(
                        location_name, log
                    )
                    matched_score: float = 0
                    matched_cap: Optional[AwsCapability] = None
                    matcher = SequenceMatcher(None, node_runbook.vm_size.lower(), "")
                    for aws_cap in location_info.capabilities:
                        matcher.set_seq2(aws_cap.vm_size.lower())
                        if (
                            node_runbook.vm_size.lower() in aws_cap.vm_size.lower()
                            and matched_score < matcher.ratio()
                        ):
                            matched_cap = aws_cap
                            matched_score = matcher.ratio()
                    if matched_cap:
                        predefined_cost += matched_cap.estimated_cost

                        min_cap = self._generate_min_capability(
                            req, matched_cap, location_name
                        )
                        if not existing_location:
                            existing_location = location_name
                        predefined_caps[req_index] = min_cap
                        found_or_skipped = True
                    else:
                        # if none found, skip and try the next location
                        break
                if found_or_skipped:
                    # if all are found, skip other locations
                    break

            if found_or_skipped:
                for location_name in locations:
                    # in each location, all nodes must be found.
                    # fill them as None and check after the capability is met.
                    found_capabilities: List[Any] = list(predefined_caps)

                    # skip unmatched locations
                    if existing_location and existing_location != location_name:
                        continue

                    estimated_cost: float = 0
                    location_caps = self.get_eligible_vm_sizes(location_name, log)
                    for req_index, req in enumerate(nodes_requirement):
                        node_runbook = req.get_extended_runbook(AwsNodeSchema, AWS)
                        image = ec2_resource.Image(node_runbook.get_image_id())

                        for aws_cap in location_caps:
                            if found_capabilities[req_index]:
                                # found, so skipped
                                break

                            # Check if the instance type is on the same architecture
                            # as the image.
                            processor_info = aws_cap.resource_sku["ProcessorInfo"]
                            supported_archs = processor_info["SupportedArchitectures"]
                            if image.architecture != supported_archs[0]:
                                continue

                            check_result = req.check(aws_cap.capability)
                            if check_result.result:
                                min_cap = self._generate_min_capability(
                                    req, aws_cap, aws_cap.location
                                )

                                estimated_cost += aws_cap.estimated_cost
                                found_capabilities[req_index] = min_cap
                        if all(x for x in found_capabilities):
                            break

                    if all(x for x in found_capabilities):
                        # all found; replace the current requirement
                        environment.runbook.nodes_requirement = found_capabilities
                        environment.cost = estimated_cost + predefined_cost
                        is_success = True
                        log.debug(
                            f"requirement met, "
                            f"cost: {environment.cost}, "
                            f"cap: {environment.runbook.nodes_requirement}"
                        )
                        break

        return is_success

    def _deploy_environment(self, environment: Environment, log: Logger) -> None:
        assert self._ec2_client
        assert self._aws_runbook

        environment_context = get_environment_context(environment=environment)
        normalized_run_name = constants.NORMALIZE_PATTERN.sub("_", constants.RUN_NAME)
        if self._aws_runbook.security_group_name:
            security_group_name = self._aws_runbook.security_group_name
        else:
            security_group_name = f"{normalized_run_name}__sec_group"
        if self._aws_runbook.key_pair_name:
            key_pair_name = self._aws_runbook.key_pair_name
        else:
            key_pair_name = f"{normalized_run_name}_keypair"
        environment_context.security_group_name = security_group_name
        environment_context.key_pair_name = key_pair_name

        if self._aws_runbook.dry_run:
            log.info(f"dry_run: {self._aws_runbook.dry_run}")
        else:
            try:
                if self._aws_runbook.deploy:
                    log.info(
                        f"creating or updating security group: [{security_group_name}]"
                    )
                    self._security_group = self._check_or_create_security_group(
                        security_group_name=security_group_name,
                        group_description="Lisa security group for testing.",
                    )
                    environment_context.security_group_is_created = True
                    environment_context.security_group_id = self._security_group.id

                    if self.runbook.admin_private_key_file:
                        self._key_pair = self._create_key_pair(
                            key_pair_name, self.runbook.admin_private_key_file
                        )
                else:
                    log.info(
                        f"reusing security group: [{security_group_name}]"
                        f" and key pair: [{key_pair_name}]"
                    )

                deployment_parameters = self._create_deployment_parameters(
                    security_group_name, environment, log
                )

                instances = {}
                if self._aws_runbook.deploy:
                    instances = self._deploy(deployment_parameters, log)

                # Even if deploy was skipped, try our best to initialize the nodes.
                self._initialize_nodes(environment, instances, log)
            except Exception as identifier:
                self._delete_environment(environment, log)
                raise identifier

    def _create_deployment_parameters(
        self, security_group_name: str, environment: Environment, log: Logger
    ) -> AwsDeployParameter:
        assert environment.runbook, "env data cannot be None"
        assert environment.runbook.nodes_requirement, "node requirement cannot be None"

        log.debug("creating deployment")
        # construct parameters
        aws_parameters = AwsDeployParameter()

        environment_context = get_environment_context(environment=environment)
        aws_parameters.key_pair_name = environment_context.key_pair_name
        aws_parameters.security_group_name = environment_context.security_group_name
        aws_parameters.security_group_id = environment_context.security_group_id

        nodes_parameters: List[AwsNodeSchema] = []
        for node_space in environment.runbook.nodes_requirement:
            assert isinstance(
                node_space, schema.NodeSpace
            ), f"actual: {type(node_space)}"

            aws_node_runbook = node_space.get_extended_runbook(
                AwsNodeSchema, type_name=AWS
            )

            # init node
            node = environment.create_node_from_requirement(
                node_space,
            )
            aws_node_runbook = self._create_node_runbook(
                len(nodes_parameters),
                node_space,
                log,
            )
            # save the parsed runbook back; for example, the marketplace image
            # version may be parsed from latest to a specific version.
            node.capability.set_extended_runbook(aws_node_runbook)
            nodes_parameters.append(aws_node_runbook)

            # Set data disk array
            aws_parameters.data_disks = self._generate_data_disks(
                node, aws_node_runbook
            )

            if not aws_parameters.location:
                # take the first one's location
                aws_parameters.location = aws_node_runbook.location

            # save the vm's information into the node
            node_context = get_node_context(node)
            # the vm's name, used to find it in aws
            node_context.vm_name = aws_node_runbook.name
            # ssh related information will be filled back once the vm is created
            node_context.username = self.runbook.admin_username
            node_context.private_key_file = self.runbook.admin_private_key_file

            log.info(f"vm setting: {aws_node_runbook}")

        aws_parameters.nodes = nodes_parameters

        # In Azure, each VM should have only one nic in one subnet. So calculate
        # the max nic count, and set it as the subnet count.
        aws_parameters.subnet_count = max(x.nic_count for x in aws_parameters.nodes)

        # composite deployment properties
        parameters = aws_parameters.to_dict()  # type:ignore
        parameters = {k: {"value": v} for k, v in parameters.items()}
        log.debug(f"parameters: {parameters}")

        return aws_parameters

    def _create_node_runbook(
        self,
        index: int,
        node_space: schema.NodeSpace,
        log: Logger,
    ) -> AwsNodeSchema:
        aws_node_runbook = node_space.get_extended_runbook(AwsNodeSchema, type_name=AWS)

        if not aws_node_runbook.name:
            aws_node_runbook.name = f"node-{index}"
        if not aws_node_runbook.vm_size:
            raise LisaException("vm_size is not detected before deploy")
        if not aws_node_runbook.location:
            raise LisaException("location is not detected before deploy")

        if not aws_node_runbook.marketplace:
            # set to the default marketplace, if nothing is specified
            aws_node_runbook.marketplace = AwsVmMarketplaceSchema()

        # Set disk type
        assert node_space.disk, "node space must have disk defined."
        assert isinstance(node_space.disk.disk_type, schema.DiskType)
        aws_node_runbook.disk_type = features.get_aws_disk_type(
            node_space.disk.disk_type
        )
        aws_node_runbook.data_disk_caching_type = node_space.disk.data_disk_caching_type
        assert isinstance(
            node_space.disk.data_disk_iops, int
        ), f"actual: {type(node_space.disk.data_disk_iops)}"
        aws_node_runbook.data_disk_iops = node_space.disk.data_disk_iops
        assert isinstance(
            node_space.disk.data_disk_size, int
        ), f"actual: {type(node_space.disk.data_disk_size)}"
        aws_node_runbook.data_disk_size = node_space.disk.data_disk_size

        assert node_space.network_interface
        assert isinstance(
            node_space.network_interface.nic_count, int
        ), f"actual: {node_space.network_interface.nic_count}"
        aws_node_runbook.nic_count = node_space.network_interface.nic_count
        assert isinstance(
            node_space.network_interface.data_path, schema.NetworkDataPath
        ), f"actual: {type(node_space.network_interface.data_path)}"
        if node_space.network_interface.data_path == schema.NetworkDataPath.Sriov:
            aws_node_runbook.enable_sriov = True

        return aws_node_runbook

    def _deploy(
        self, deployment_parameters: AwsDeployParameter, log: Logger
    ) -> Dict[str, Any]:
        ec2_resource = boto3.resource("ec2")
        instances = {}

        subnets = self._create_subnets(self._vpc.id, deployment_parameters, log)
        block_device_mappings = self._create_block_devices(deployment_parameters, log)
        for node in deployment_parameters.nodes:
            network_interfaces = self._create_network_interfaces(
                deployment_parameters, node, subnets, log
            )
            try:
                instance = ec2_resource.create_instances(
                    ImageId=node.get_image_id(),
                    InstanceType=cast(InstanceTypeType, node.vm_size),
                    NetworkInterfaces=network_interfaces,
                    BlockDeviceMappings=block_device_mappings,
                    KeyName=deployment_parameters.key_pair_name,
                    MinCount=1,
                    MaxCount=1,
                )[0]
                instance.wait_until_running()
                instance.load()
                log.info("Created instance %s.", instance.id)

                # Enable ENA support if the test case requires it.
                # The Intel 82599 Virtual Function (VF) interface is not
                # supported for now. Refer to the AWS documentation about
                # enhanced networking on Linux.
                if node.enable_sriov and (not instance.ena_support):
                    self._ec2_client.modify_instance_attribute(
                        InstanceId=instance.id,
                        EnaSupport={
                            "Value": True,
                        },
                    )
                instances[node.name] = instance.instance_id
            except ClientError:
                log.exception(
                    "Couldn't create instance with image %s, "
                    "instance type %s, and key %s.",
                    node.get_image_id(),
                    node.vm_size,
                    deployment_parameters.key_pair_name,
                )
                raise

        return instances

    def _delete_environment(self, environment: Environment, log: Logger) -> None:
        environment_context = get_environment_context(environment=environment)
        security_group_name = environment_context.security_group_name
        # the security group name is empty when the environment was not
        # deployed for some reason, e.g. the capability doesn't meet the case
        # requirement.
        if not security_group_name:
            return
        assert self._aws_runbook

        if not environment_context.security_group_is_created:
            log.info(
                f"skipped deleting security group: {security_group_name}, "
                f"as it's not created by this run."
            )
        elif self._aws_runbook.dry_run:
            log.info(
                f"skipped deleting security group: {security_group_name}, "
                f"as it's a dry run."
            )
        else:
            ec2_resource = boto3.resource("ec2")
            for node in environment.nodes.list():
                node_context = get_node_context(node)
                instance_id = node_context.instance_id
                self.terminate_instance(ec2_resource, instance_id, log)

            self.delete_security_group(
                ec2_resource,
                environment_context.security_group_id,
                environment_context.security_group_name,
                log,
            )

            self.delete_key_pair(ec2_resource, environment_context.key_pair_name, log)

            try:
                log.info(f"deleting vpc: {self._vpc.id}")
                for association in self._route_table.associations:
                    association.delete()
                self._route_table.delete()
                self._internet_gateway.detach_from_vpc(VpcId=self._vpc.id)
                self._internet_gateway.delete()
                for subnet in self._vpc.subnets.all():
                    subnet.delete()
                self._vpc.delete()
            except ClientError:
                log.exception(
                    "Couldn't delete vpc %s.",
                    self._vpc.id,
                )
                raise

    def terminate_instance(
        self, ec2_resource: Any, instance_id: str, log: Logger
    ) -> None:
        if not instance_id:
            return

        try:
            instance = ec2_resource.Instance(instance_id)
            instance.terminate()
            instance.wait_until_terminated()
            log.info("Terminated instance %s.", instance_id)
        except ClientError:
            log.exception("Couldn't terminate instance %s.", instance_id)

    def delete_security_group(
        self, ec2_resource: Any, group_id: str, security_group_name: str, log: Logger
    ) -> None:
        try:
            ec2_resource.SecurityGroup(group_id).delete()
            log.info("Deleted security group: %s.", security_group_name)
        except ClientError:
            log.exception(
                "Couldn't delete security group %s.",
                security_group_name,
            )

    def delete_key_pair(self, ec2_resource: Any, key_name: str, log: Logger) -> None:
        try:
            ec2_resource.KeyPair(key_name).delete()
            log.info("Deleted key pair %s.", key_name)
        except ClientError:
            log.exception("Couldn't delete key pair %s.", key_name)

    def _create_subnets(
        self, vpc_id: str, deployment_parameters: AwsDeployParameter, log: Logger
    ) -> Dict[int, Any]:
        subnets: Dict[int, Any] = {}
        try:
            addrs = self._vpc.cidr_block.split(".")
            for i in range(deployment_parameters.subnet_count):
                cidr_block = f"{addrs[0]}.{addrs[1]}.{str(i)}.0/24"
                subnets[i] = self._ec2_client.create_subnet(
                    CidrBlock=cidr_block,
                    VpcId=vpc_id,
                )
                self._route_table.associate_with_subnet(
                    SubnetId=subnets[i]["Subnet"]["SubnetId"]
                )
        except ClientError:
            log.exception("Could not create a custom subnet.")
            raise
        else:
            return subnets

    def _create_network_interfaces(
        self,
        deployment_parameters: AwsDeployParameter,
        node: AwsNodeSchema,
        subnets: Dict[int, Any],
        log: Logger,
    ) -> List[Any]:
        network_interfaces = [
            {
                "Description": f"{node.name}-extra-0",
                "AssociatePublicIpAddress": True,
                "SubnetId": subnets[0]["Subnet"]["SubnetId"],
                "DeviceIndex": 0,
                "Groups": [deployment_parameters.security_group_id],
            }
        ]
        for i in range(1, node.nic_count):
            network_interfaces.append(
                {
                    "Description": f"{node.name}-extra-{i}",
                    "AssociatePublicIpAddress": False,
                    "SubnetId": subnets[i]["Subnet"]["SubnetId"],
                    "DeviceIndex": i,
                    "Groups": [deployment_parameters.security_group_id],
                }
            )
        return network_interfaces

    def _create_block_devices(
        self,
        deployment_parameters: AwsDeployParameter,
        log: Logger,
    ) -> List[Any]:
        # There are some instance volume limits; please refer to
        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits
        block_device_mappings = []
        volumes = self._get_available_volumes(deployment_parameters)
        for idx, disk in enumerate(deployment_parameters.data_disks):
            if (
                disk.create_option
                == DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY
            ):
                if idx >= len(volumes):
                    raise LisaException(
                        f"No device names available "
                        f"for {len(deployment_parameters.data_disks)} disks!",
                    )
                block_device_mappings.append(
                    {
                        "DeviceName": volumes[idx],
                        "Ebs": {
                            "DeleteOnTermination": True,
                            "VolumeSize": disk.size,
                            "VolumeType": disk.type,
                            "Iops": disk.iops,
                        },
                    }
                )
        return block_device_mappings

    def _get_available_volumes(
        self, deployment_parameters: AwsDeployParameter
    ) -> List[str]:
        # In the current implementation, all nodes use the same image.
        image_id = deployment_parameters.nodes[0].get_image_id()
        virtualization_type = boto3.resource("ec2").Image(image_id).virtualization_type
        volumes: List[str] = []

        # Create the available volume names based on the virtualization type.
        # Refer to the following link:
        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html
        if virtualization_type == "hvm":
            for c in range(ord("b"), ord("c") + 1):
                for p in range(ord("a"), ord("z") + 1):
                    volumes.append(f"/dev/xvd{chr(c)}{chr(p)}")
        elif virtualization_type == "paravirtual":
            for c in range(ord("f"), ord("p") + 1):
                for p in range(1, 7):
                    volumes.append(f"/dev/sd{chr(c)}{p}")
        else:
            raise LisaException(
                f"The virtualization type {virtualization_type} is not supported now."
            )

        return volumes

    def _initialize_nodes(
        self, environment: Environment, instances: Dict[str, Any], log: Logger
    ) -> None:
        ec2_resource = boto3.resource("ec2")
        node_context_map: Dict[str, Node] = {}
        for node in environment.nodes.list():
            node_context = get_node_context(node)
            node_context.instance_id = instances[node_context.vm_name]
            node_context_map[node_context.vm_name] = node

        for vm_name, node in node_context_map.items():
            node_context = get_node_context(node)
            vm = ec2_resource.Instance(node_context.instance_id)
            if not vm:
                raise LisaException(
                    f"cannot find vm: '{vm_name}', make sure the deployment is correct."
                )

            public_ip = vm.public_ip_address
            assert public_ip, "public IP address cannot be empty!"

            if not node.name:
                node.name = vm_name

            assert isinstance(node, RemoteNode)
            node.set_connection_info(
                address=vm.private_ip_address,
                port=22,
                public_address=public_ip,
                public_port=22,
                username=node_context.username,
                password=node_context.password,
                private_key_file=node_context.private_key_file,
            )

    @retry(tries=10, delay=1, jitter=(0.5, 1))
    def _load_location_info_from_file(
        self, cached_file_name: Path, log: Logger
    ) -> Optional[AwsLocation]:
        loaded_obj: Optional[AwsLocation] = None
        if cached_file_name.exists():
            try:
                with open(cached_file_name, "r") as f:
                    loaded_data: Dict[str, Any] = json.load(f)
                loaded_obj = schema.load_by_type(AwsLocation, loaded_data)
            except Exception as identifier:
                # if the schema changed, there may be an exception; remove the
                # cache and retry. Note: the retry on this method depends on
                # the decorator.
                log.debug(
                    f"error on loading cache, delete cache and retry. {identifier}"
                )
                cached_file_name.unlink()
                raise identifier
        return loaded_obj

    def _get_location_info(self, location: str, log: Logger) -> AwsLocation:
        cached_file_name = constants.CACHE_PATH.joinpath(
            f"aws_locations_{location}.json"
        )
        should_refresh: bool = True
        key = location
        location_data = self._locations_data_cache.get(key, None)
        if not location_data:
            location_data = self._load_location_info_from_file(
                cached_file_name=cached_file_name, log=log
            )

        if location_data:
            delta = datetime.now() - location_data.updated_time
            # refresh cached locations every day.
            if delta.days < 1:
                should_refresh = False
                log.debug(
                    f"{key}: cache used: {location_data.updated_time}, "
                    f"sku count: {len(location_data.capabilities)}"
                )
            else:
                log.debug(
                    f"{key}: cache timeout: {location_data.updated_time}, "
                    f"sku count: {len(location_data.capabilities)}"
                )
        else:
            log.debug(f"{key}: no cache found")
        if should_refresh:
            ec2_region = boto3.client("ec2", region_name=location)
            log.debug(f"{key}: querying")
            all_skus: List[AwsCapability] = []
            instance_types = ec2_region.describe_instance_types()
            for instance_type in instance_types["InstanceTypes"]:
                capability = self._instance_type_to_capability(location, instance_type)

                # estimate vm cost for priority
                assert isinstance(capability.core_count, int)
                assert isinstance(capability.gpu_count, int)
                estimated_cost = capability.core_count + capability.gpu_count * 100
                aws_capability = AwsCapability(
                    location=location,
                    vm_size=instance_type["InstanceType"],
                    capability=capability,
                    resource_sku=instance_type,
                    estimated_cost=estimated_cost,
                )
                all_skus.append(aws_capability)
            location_data = AwsLocation(location=location, capabilities=all_skus)
            log.debug(f"{location}: saving to disk")
            with open(cached_file_name, "w") as f:
                json.dump(location_data.to_dict(), f)  # type: ignore
            log.debug(f"{key}: new data, sku: {len(location_data.capabilities)}")

        assert location_data
        self._locations_data_cache[key] = location_data
        return location_data

    def _instance_type_to_capability(  # noqa: C901
        self, location: str, instance_type: Any
    ) -> schema.NodeSpace:
        # fill in default values, in case no capability is met.
        node_space = schema.NodeSpace(
            node_count=1,
            core_count=0,
            memory_mb=0,
            gpu_count=0,
        )
        instancetype_name: str = instance_type["InstanceType"]
        node_space.name = f"{location}_{instancetype_name}"
        node_space.features = search_space.SetSpace[schema.FeatureSettings](
            is_allow_set=True
        )
        node_space.disk = features.AwsDiskOptionSettings()
        node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](
            is_allow_set=True, items=[]
        )
        node_space.disk.data_disk_iops = search_space.IntRange(min=0)
        node_space.disk.data_disk_size = search_space.IntRange(min=0)
        node_space.network_interface = schema.NetworkInterfaceOptionSettings()
        node_space.network_interface.data_path = search_space.SetSpace[
            schema.NetworkDataPath
        ](is_allow_set=True, items=[])

        for name, value in instance_type.items():
            if name == "VCpuInfo":
                node_space.core_count = int(value["DefaultVCpus"])
            elif name == "MemoryInfo":
                node_space.memory_mb = int(value["SizeInMiB"])
            elif name == "NetworkInfo":
                nic_count = value["MaximumNetworkInterfaces"]
                node_space.network_interface.nic_count = search_space.IntRange(
                    min=1, max=nic_count
                )
                node_space.network_interface.max_nic_count = nic_count
                if value["EnaSupport"] == "supported":
                    node_space.network_interface.data_path.add(
                        schema.NetworkDataPath.Sriov
                    )
            elif name == "GpuInfo":
                for gpu in value["Gpus"]:
                    node_space.gpu_count += gpu["Count"]
                # update the features list if the gpu feature is supported
                node_space.features.add(
                    schema.FeatureSettings.create(features.Gpu.name())
                )

        # all nodes support the following features
        node_space.features.update(
            [
                schema.FeatureSettings.create(features.StartStop.name()),
                schema.FeatureSettings.create(features.SerialConsole.name()),
            ]
        )
        node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)
        node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)
        node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)
        node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)

        return node_space

    def get_eligible_vm_sizes(self, location: str, log: Logger) -> List[AwsCapability]:
        # load eligible vm sizes:
        # 1. vm sizes supported in the current location
        # 2. vm sizes matching the predefined patterns
        location_capabilities: List[AwsCapability] = []

        key = self._get_location_key(location)
        if key not in self._eligible_capabilities:
            location_info: AwsLocation = self._get_location_info(location, log)
            # loop through all fallback levels
            for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:
                level_capabilities: List[AwsCapability] = []

                # loop through all capabilities
                for aws_capability in location_info.capabilities:
                    # exclude single-core sizes, which may be too slow to work
                    # in some distros
                    assert isinstance(aws_capability.capability.core_count, int)
                    if (
                        fallback_pattern.match(aws_capability.vm_size)
                        and aws_capability.capability.core_count > 1
                    ):
                        level_capabilities.append(aws_capability)

                # sort by rough cost
                level_capabilities.sort(key=lambda x: (x.estimated_cost))
                log.debug(
                    f"{key}, pattern '{fallback_pattern.pattern}'"
                    f" {len(level_capabilities)} candidates: "
                    f"{[x.vm_size for x in level_capabilities]}"
                )
                location_capabilities.extend(level_capabilities)
            self._eligible_capabilities[key] = location_capabilities
        return self._eligible_capabilities[key]

    def _get_location_key(self, location: str) -> str:
        return f"lisa_aws_{location}"

    def _generate_min_capability(
        self,
        requirement: schema.NodeSpace,
        aws_capability: AwsCapability,
        location: str,
    ) -> schema.NodeSpace:
        min_cap: schema.NodeSpace = requirement.generate_min_capability(
            aws_capability.capability
        )
        # Apply aws specified values.
        aws_node_runbook = min_cap.get_extended_runbook(AwsNodeSchema, AWS)
        if aws_node_runbook.location:
            assert aws_node_runbook.location == location, (
                f"predefined location [{aws_node_runbook.location}] "
                f"must be same as "
                f"cap location [{location}]"
            )
        # the location may not be set
        aws_node_runbook.location = location
        aws_node_runbook.vm_size = aws_capability.vm_size

        assert min_cap.network_interface
        assert isinstance(
            min_cap.network_interface.nic_count, int
        ), f"actual: {min_cap.network_interface.nic_count}"
        aws_node_runbook.nic_count = min_cap.network_interface.nic_count
        assert isinstance(
            min_cap.network_interface.data_path, schema.NetworkDataPath
        ), f"actual: {type(min_cap.network_interface.data_path)}"
        if min_cap.network_interface.data_path == schema.NetworkDataPath.Sriov:
            aws_node_runbook.enable_sriov = True

        assert min_cap.disk, "disk must exist"
        assert isinstance(
            min_cap.disk.data_disk_count, int
        ), f"actual: {min_cap.disk.data_disk_count}"
        aws_node_runbook.data_disk_count = min_cap.disk.data_disk_count
        assert isinstance(
            min_cap.disk.data_disk_caching_type, str
        ), f"actual: {min_cap.disk.data_disk_caching_type}"
        aws_node_runbook.data_disk_caching_type = min_cap.disk.data_disk_caching_type

        return min_cap

    def _generate_data_disks(
        self,
        node: Node,
        aws_node_runbook: AwsNodeSchema,
    ) -> List[DataDiskSchema]:
        data_disks: List[DataDiskSchema] = []
        assert node.capability.disk
        if aws_node_runbook.marketplace:
            image = boto3.resource("ec2").Image(aws_node_runbook.marketplace.imageid)
            # AWS images have root data disks by default.
            for data_disk in image.block_device_mappings:
                if "Ebs" in data_disk and "VolumeSize" in data_disk["Ebs"]:
                    assert isinstance(node.capability.disk.data_disk_iops, int)
                    data_disks.append(
                        DataDiskSchema(
                            node.capability.disk.data_disk_caching_type,
                            data_disk["Ebs"]["VolumeSize"],
                            node.capability.disk.data_disk_iops,
                            aws_node_runbook.disk_type,
                            DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_FROM_IMAGE,
                        )
                    )
        assert isinstance(
            node.capability.disk.data_disk_count, int
        ), f"actual: {type(node.capability.disk.data_disk_count)}"
        for _ in range(node.capability.disk.data_disk_count):
            assert isinstance(node.capability.disk.data_disk_size, int)
            assert isinstance(node.capability.disk.data_disk_iops, int)
            data_disks.append(
                DataDiskSchema(
                    node.capability.disk.data_disk_caching_type,
                    node.capability.disk.data_disk_size,
                    node.capability.disk.data_disk_iops,
                    aws_node_runbook.disk_type,
                    DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY,
                )
            )
        return data_disks


def _convert_to_aws_node_space(node_space: schema.NodeSpace) -> None:
    if node_space:
        if node_space.features:
            new_settings = search_space.SetSpace[schema.FeatureSettings](
                is_allow_set=True
            )
            for current_settings in node_space.features:
                # reload to the type-specific settings
                settings_type = feature.get_feature_settings_type_by_name(
                    current_settings.type, AwsPlatform.supported_features()
                )
                new_settings.add(schema.load_by_type(settings_type, current_settings))
            node_space.features = new_settings
        if node_space.disk:
            node_space.disk = schema.load_by_type(...
```
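As the listing shows, `_convert_to_aws_node_space` is a module-private helper that mutates a `schema.NodeSpace` in place, reloading its generic feature and disk settings into the AWS-specific types. LISA calls it in two places: in `AwsCapability.__post_init__` and once per node requirement in `_prepare_environment`, before matching capabilities. Below is a minimal sketch of calling it directly; the constructor arguments and `features.Gpu.name()` mirror the snippet above, but the import paths are assumptions about LISA's module layout, and the leading underscore means this is not a documented public API.

```python
# Minimal sketch, assuming LISA's module layout; _convert_to_aws_node_space
# is private to platform_.py, so importing it directly is an assumption.
from lisa import schema, search_space
from lisa.sut_orchestrator.aws import features
from lisa.sut_orchestrator.aws.platform_ import _convert_to_aws_node_space

# Build a generic node requirement, the same way _instance_type_to_capability
# does in the listing above.
node_space = schema.NodeSpace(node_count=1, core_count=2, memory_mb=4096, gpu_count=0)
node_space.features = search_space.SetSpace[schema.FeatureSettings](is_allow_set=True)
node_space.features.add(schema.FeatureSettings.create(features.Gpu.name()))

# The helper returns None and mutates node_space in place: each generic
# FeatureSettings entry is reloaded as its AWS-specific settings type, so the
# AWS extensions can be read from the node space later.
_convert_to_aws_node_space(node_space)
```

This is the same pattern `_prepare_environment` applies to each entry in `environment.runbook.nodes_requirement` before checking it against AWS capabilities.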


