How to use _generate_data_disks method in lisa

Best Python code snippet using lisa_python

platform_.py

Source:platform_.py Github

copy

Full Screen

...493 # parsed from latest to a specified version.494 node.capability.set_extended_runbook(aws_node_runbook)495 nodes_parameters.append(aws_node_runbook)496 # Set data disk array497 aws_parameters.data_disks = self._generate_data_disks(498 node, aws_node_runbook499 )500 if not aws_parameters.location:501 # take first one's location502 aws_parameters.location = aws_node_runbook.location503 # save vm's information into node504 node_context = get_node_context(node)505 # vm's name, use to find it from aws506 node_context.vm_name = aws_node_runbook.name507 # ssh related information will be filled back once vm is created508 node_context.username = self.runbook.admin_username509 node_context.private_key_file = self.runbook.admin_private_key_file510 log.info(f"vm setting: {aws_node_runbook}")511 aws_parameters.nodes = nodes_parameters512 # In Azure, each VM should have only one nic in one subnet. So calculate513 # the max nic count, and set to subnet count.514 aws_parameters.subnet_count = max(x.nic_count for x in aws_parameters.nodes)515 # composite deployment properties516 parameters = aws_parameters.to_dict() # type:ignore517 parameters = {k: {"value": v} for k, v in parameters.items()}518 log.debug(f"parameters: {parameters}")519 return aws_parameters520 def _create_node_runbook(521 self,522 index: int,523 node_space: schema.NodeSpace,524 log: Logger,525 ) -> AwsNodeSchema:526 aws_node_runbook = node_space.get_extended_runbook(AwsNodeSchema, type_name=AWS)527 if not aws_node_runbook.name:528 aws_node_runbook.name = f"node-{index}"529 if not aws_node_runbook.vm_size:530 raise LisaException("vm_size is not detected before deploy")531 if not aws_node_runbook.location:532 raise LisaException("location is not detected before deploy")533 if not aws_node_runbook.marketplace:534 # set to default marketplace, if nothing specified535 aws_node_runbook.marketplace = AwsVmMarketplaceSchema()536 # Set disk type537 assert node_space.disk, "node space must have disk defined."538 assert isinstance(node_space.disk.disk_type, schema.DiskType)539 aws_node_runbook.disk_type = features.get_aws_disk_type(540 node_space.disk.disk_type541 )542 aws_node_runbook.data_disk_caching_type = node_space.disk.data_disk_caching_type543 assert isinstance(544 node_space.disk.data_disk_iops, int545 ), f"actual: {type(node_space.disk.data_disk_iops)}"546 aws_node_runbook.data_disk_iops = node_space.disk.data_disk_iops547 assert isinstance(548 node_space.disk.data_disk_size, int549 ), f"actual: {type(node_space.disk.data_disk_size)}"550 aws_node_runbook.data_disk_size = node_space.disk.data_disk_size551 assert node_space.network_interface552 assert isinstance(553 node_space.network_interface.nic_count, int554 ), f"actual: {node_space.network_interface.nic_count}"555 aws_node_runbook.nic_count = node_space.network_interface.nic_count556 assert isinstance(557 node_space.network_interface.data_path, schema.NetworkDataPath558 ), f"actual: {type(node_space.network_interface.data_path)}"559 if node_space.network_interface.data_path == schema.NetworkDataPath.Sriov:560 aws_node_runbook.enable_sriov = True561 return aws_node_runbook562 def _deploy(563 self, deployment_parameters: AwsDeployParameter, log: Logger564 ) -> Dict[str, Any]:565 ec2_resource = boto3.resource("ec2")566 instances = {}567 subnets = self._create_subnets(self._vpc.id, deployment_parameters, log)568 block_device_mappings = self._create_block_devices(deployment_parameters, log)569 for node in deployment_parameters.nodes:570 network_interfaces = self._create_network_interfaces(571 deployment_parameters, node, subnets, log572 )573 try:574 instance = ec2_resource.create_instances(575 ImageId=node.get_image_id(),576 InstanceType=cast(InstanceTypeType, node.vm_size),577 NetworkInterfaces=network_interfaces,578 BlockDeviceMappings=block_device_mappings,579 KeyName=deployment_parameters.key_pair_name,580 MinCount=1,581 MaxCount=1,582 )[0]583 instance.wait_until_running()584 instance.load()585 log.info("Created instance %s.", instance.id)586 # Enable ENA support if the test case requires.587 # Don't support the Intel 82599 Virtual Function (VF) interface now.588 # Refer to the document about AWS Enhanced networking on Linux.589 if node.enable_sriov and (not instance.ena_support):590 self._ec2_client.modify_instance_attribute(591 InstanceId=instance.id,592 EnaSupport={593 "Value": True,594 },595 )596 instances[node.name] = instance.instance_id597 except ClientError:598 log.exception(599 "Couldn't create instance with image %s, "600 "instance type %s, and key %s.",601 node.get_image_id(),602 node.vm_size,603 deployment_parameters.key_pair_name,604 )605 raise606 return instances607 def _delete_environment(self, environment: Environment, log: Logger) -> None:608 environment_context = get_environment_context(environment=environment)609 security_group_name = environment_context.security_group_name610 # the resource group name is empty when it is not deployed for some reasons,611 # like capability doesn't meet case requirement.612 if not security_group_name:613 return614 assert self._aws_runbook615 if not environment_context.security_group_is_created:616 log.info(617 f"skipped to delete security resource group: {security_group_name}, "618 f"as it's not created by this run."619 )620 elif self._aws_runbook.dry_run:621 log.info(622 f"skipped to delete security resource group: {security_group_name}, "623 f"as it's a dry run."624 )625 else:626 ec2_resource = boto3.resource("ec2")627 for node in environment.nodes.list():628 node_context = get_node_context(node)629 instance_id = node_context.instance_id630 self.terminate_instance(ec2_resource, instance_id, log)631 self.delete_security_group(632 ec2_resource,633 environment_context.security_group_id,634 environment_context.security_group_name,635 log,636 )637 self.delete_key_pair(ec2_resource, environment_context.key_pair_name, log)638 try:639 log.info(f"deleting vpc: {self._vpc.id}")640 for association in self._route_table.associations:641 association.delete()642 self._route_table.delete()643 self._internet_gateway.detach_from_vpc(VpcId=self._vpc.id)644 self._internet_gateway.delete()645 for subnet in self._vpc.subnets.all():646 subnet.delete()647 self._vpc.delete()648 except ClientError:649 log.exception(650 "Couldn't delete vpc %s.",651 self._vpc.id,652 )653 raise654 def terminate_instance(655 self, ec2_resource: Any, instance_id: str, log: Logger656 ) -> None:657 if not instance_id:658 return659 try:660 instance = ec2_resource.Instance(instance_id)661 instance.terminate()662 instance.wait_until_terminated()663 log.info("Terminating instance %s.", instance_id)664 except ClientError:665 log.exception("Couldn't terminate instance %s.", instance_id)666 def delete_security_group(667 self, ec2_resource: Any, group_id: str, security_group_name: str, log: Logger668 ) -> None:669 try:670 ec2_resource.SecurityGroup(group_id).delete()671 log.info("Deleting security group: %s.", security_group_name)672 except ClientError:673 log.exception(674 "Couldn't delete security group %s.",675 security_group_name,676 )677 def delete_key_pair(self, ec2_resource: Any, key_name: str, log: Logger) -> None:678 try:679 ec2_resource.KeyPair(key_name).delete()680 log.info("Deleted key pair %s.", key_name)681 except ClientError:682 log.exception("Couldn't delete key pair %s.", key_name)683 def _create_subnets(684 self, vpc_id: str, deployment_parameters: AwsDeployParameter, log: Logger685 ) -> Dict[int, Any]:686 subnets: Dict[int, Any] = {}687 try:688 addrs = self._vpc.cidr_block.split(".")689 for i in range(deployment_parameters.subnet_count):690 cidr_block = f"{addrs[0]}.{addrs[1]}.{str(i)}.0/24"691 subnets[i] = self._ec2_client.create_subnet(692 CidrBlock=cidr_block,693 VpcId=vpc_id,694 )695 self._route_table.associate_with_subnet(696 SubnetId=subnets[i]["Subnet"]["SubnetId"]697 )698 except ClientError:699 log.exception("Could not create a custom subnet.")700 raise701 else:702 return subnets703 def _create_network_interfaces(704 self,705 deployment_parameters: AwsDeployParameter,706 node: AwsNodeSchema,707 subnets: Dict[int, Any],708 log: Logger,709 ) -> List[Any]:710 network_interfaces = [711 {712 "Description": f"{node.name}-extra-0",713 "AssociatePublicIpAddress": True,714 "SubnetId": subnets[0]["Subnet"]["SubnetId"],715 "DeviceIndex": 0,716 "Groups": [deployment_parameters.security_group_id],717 }718 ]719 for i in range(1, node.nic_count):720 network_interfaces.append(721 {722 "Description": f"{node.name}-extra-{i}",723 "AssociatePublicIpAddress": False,724 "SubnetId": subnets[i]["Subnet"]["SubnetId"],725 "DeviceIndex": i,726 "Groups": [deployment_parameters.security_group_id],727 }728 )729 return network_interfaces730 def _create_block_devices(731 self,732 deployment_parameters: AwsDeployParameter,733 log: Logger,734 ) -> List[Any]:735 # There are some instance volume limits, please refer to736 # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits737 block_device_mappings = []738 volumes = self._get_available_volumes(deployment_parameters)739 for idx, disk in enumerate(deployment_parameters.data_disks):740 if (741 disk.create_option742 == DataDiskCreateOption.DATADISK_CREATE_OPTION_TYPE_EMPTY743 ):744 if idx >= len(volumes):745 raise LisaException(746 f"No device names available "747 f"for {len(deployment_parameters.data_disks)} disks!",748 )749 block_device_mappings.append(750 {751 "DeviceName": volumes[idx],752 "Ebs": {753 "DeleteOnTermination": True,754 "VolumeSize": disk.size,755 "VolumeType": disk.type,756 "Iops": disk.iops,757 },758 }759 )760 return block_device_mappings761 def _get_available_volumes(762 self, deployment_parameters: AwsDeployParameter763 ) -> List[str]:764 # In current implementation, all nodes use the same image.765 image_id = deployment_parameters.nodes[0].get_image_id()766 virtualization_type = boto3.resource("ec2").Image(image_id).virtualization_type767 volumes: List[str] = []768 # Create the available volume names based on virtualization type.769 # Refer to the following link770 # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html771 if virtualization_type == "hvm":772 for c in range(ord("b"), ord("c") + 1):773 for p in range(ord("a"), ord("z") + 1):774 volumes.append(f"/dev/xvd{chr(c)}{chr(p)}")775 elif virtualization_type == "paravirtual":776 for c in range(ord("f"), ord("p") + 1):777 for p in range(1, 7):778 volumes.append(f"/dev/sd{chr(c)}{p}")779 else:780 raise LisaException(781 f"The virtualization type {virtualization_type} is not supported now."782 )783 return volumes784 def _initialize_nodes(785 self, environment: Environment, instances: Dict[str, Any], log: Logger786 ) -> None:787 ec2_resource = boto3.resource("ec2")788 node_context_map: Dict[str, Node] = {}789 for node in environment.nodes.list():790 node_context = get_node_context(node)791 node_context.instance_id = instances[node_context.vm_name]792 node_context_map[node_context.vm_name] = node793 for vm_name, node in node_context_map.items():794 node_context = get_node_context(node)795 vm = ec2_resource.Instance(node_context.instance_id)796 if not vm:797 raise LisaException(798 f"cannot find vm: '{vm_name}', make sure deployment is correct."799 )800 public_ip = vm.public_ip_address801 assert public_ip, "public IP address cannot be empty!"802 if not node.name:803 node.name = vm_name804 assert isinstance(node, RemoteNode)805 node.set_connection_info(806 address=vm.private_ip_address,807 port=22,808 public_address=public_ip,809 public_port=22,810 username=node_context.username,811 password=node_context.password,812 private_key_file=node_context.private_key_file,813 )814 @retry(tries=10, delay=1, jitter=(0.5, 1))815 def _load_location_info_from_file(816 self, cached_file_name: Path, log: Logger817 ) -> Optional[AwsLocation]:818 loaded_obj: Optional[AwsLocation] = None819 if cached_file_name.exists():820 try:821 with open(cached_file_name, "r") as f:822 loaded_data: Dict[str, Any] = json.load(f)823 loaded_obj = schema.load_by_type(AwsLocation, loaded_data)824 except Exception as identifier:825 # if schema changed, There may be exception, remove cache and retry826 # Note: retry on this method depends on decorator827 log.debug(828 f"error on loading cache, delete cache and retry. {identifier}"829 )830 cached_file_name.unlink()831 raise identifier832 return loaded_obj833 def _get_location_info(self, location: str, log: Logger) -> AwsLocation:834 cached_file_name = constants.CACHE_PATH.joinpath(835 f"aws_locations_{location}.json"836 )837 should_refresh: bool = True838 key = location839 location_data = self._locations_data_cache.get(key, None)840 if not location_data:841 location_data = self._load_location_info_from_file(842 cached_file_name=cached_file_name, log=log843 )844 if location_data:845 delta = datetime.now() - location_data.updated_time846 # refresh cached locations every 1 day.847 if delta.days < 1:848 should_refresh = False849 log.debug(850 f"{key}: cache used: {location_data.updated_time}, "851 f"sku count: {len(location_data.capabilities)}"852 )853 else:854 log.debug(855 f"{key}: cache timeout: {location_data.updated_time},"856 f"sku count: {len(location_data.capabilities)}"857 )858 else:859 log.debug(f"{key}: no cache found")860 if should_refresh:861 ec2_region = boto3.client("ec2", region_name=location)862 log.debug(f"{key}: querying")863 all_skus: List[AwsCapability] = []864 instance_types = ec2_region.describe_instance_types()865 for instance_type in instance_types["InstanceTypes"]:866 capability = self._instance_type_to_capability(location, instance_type)867 # estimate vm cost for priority868 assert isinstance(capability.core_count, int)869 assert isinstance(capability.gpu_count, int)870 estimated_cost = capability.core_count + capability.gpu_count * 100871 aws_capability = AwsCapability(872 location=location,873 vm_size=instance_type["InstanceType"],874 capability=capability,875 resource_sku=instance_type,876 estimated_cost=estimated_cost,877 )878 all_skus.append(aws_capability)879 location_data = AwsLocation(location=location, capabilities=all_skus)880 log.debug(f"{location}: saving to disk")881 with open(cached_file_name, "w") as f:882 json.dump(location_data.to_dict(), f) # type: ignore883 log.debug(f"{key}: new data, " f"sku: {len(location_data.capabilities)}")884 assert location_data885 self._locations_data_cache[key] = location_data886 return location_data887 def _instance_type_to_capability( # noqa: C901888 self, location: str, instance_type: Any889 ) -> schema.NodeSpace:890 # fill in default values, in case no capability meet.891 node_space = schema.NodeSpace(892 node_count=1,893 core_count=0,894 memory_mb=0,895 gpu_count=0,896 )897 instancetype_name: str = instance_type["InstanceType"]898 node_space.name = f"{location}_{instancetype_name}"899 node_space.features = search_space.SetSpace[schema.FeatureSettings](900 is_allow_set=True901 )902 node_space.disk = features.AwsDiskOptionSettings()903 node_space.disk.disk_type = search_space.SetSpace[schema.DiskType](904 is_allow_set=True, items=[]905 )906 node_space.disk.data_disk_iops = search_space.IntRange(min=0)907 node_space.disk.data_disk_size = search_space.IntRange(min=0)908 node_space.network_interface = schema.NetworkInterfaceOptionSettings()909 node_space.network_interface.data_path = search_space.SetSpace[910 schema.NetworkDataPath911 ](is_allow_set=True, items=[])912 for name, value in instance_type.items():913 if name == "VCpuInfo":914 node_space.core_count = int(value["DefaultVCpus"])915 elif name == "MemoryInfo":916 node_space.memory_mb = int(value["SizeInMiB"])917 elif name == "NetworkInfo":918 nic_count = value["MaximumNetworkInterfaces"]919 node_space.network_interface.nic_count = search_space.IntRange(920 min=1, max=nic_count921 )922 node_space.network_interface.max_nic_count = nic_count923 if value["EnaSupport"] == "supported":924 node_space.network_interface.data_path.add(925 schema.NetworkDataPath.Sriov926 )927 elif name == "GpuInfo":928 for gpu in value["Gpus"]:929 node_space.gpu_count += gpu["Count"]930 # update features list if gpu feature is supported931 node_space.features.add(932 schema.FeatureSettings.create(features.Gpu.name())933 )934 # all nodes support following features935 node_space.features.update(936 [937 schema.FeatureSettings.create(features.StartStop.name()),938 schema.FeatureSettings.create(features.SerialConsole.name()),939 ]940 )941 node_space.disk.disk_type.add(schema.DiskType.StandardHDDLRS)942 node_space.disk.disk_type.add(schema.DiskType.StandardSSDLRS)943 node_space.disk.disk_type.add(schema.DiskType.PremiumSSDLRS)944 node_space.network_interface.data_path.add(schema.NetworkDataPath.Synthetic)945 return node_space946 def get_eligible_vm_sizes(self, location: str, log: Logger) -> List[AwsCapability]:947 # load eligible vm sizes948 # 1. vm size supported in current location949 # 2. vm size match predefined pattern950 location_capabilities: List[AwsCapability] = []951 key = self._get_location_key(location)952 if key not in self._eligible_capabilities:953 location_info: AwsLocation = self._get_location_info(location, log)954 # loop all fall back levels955 for fallback_pattern in VM_SIZE_FALLBACK_PATTERNS:956 level_capabilities: List[AwsCapability] = []957 # loop all capabilities958 for aws_capability in location_info.capabilities:959 # exclude one core which may be too slow to work in some distro960 assert isinstance(aws_capability.capability.core_count, int)961 if (962 fallback_pattern.match(aws_capability.vm_size)963 and aws_capability.capability.core_count > 1964 ):965 level_capabilities.append(aws_capability)966 # sort by rough cost967 level_capabilities.sort(key=lambda x: (x.estimated_cost))968 log.debug(969 f"{key}, pattern '{fallback_pattern.pattern}'"970 f" {len(level_capabilities)} candidates: "971 f"{[x.vm_size for x in level_capabilities]}"972 )973 location_capabilities.extend(level_capabilities)974 self._eligible_capabilities[key] = location_capabilities975 return self._eligible_capabilities[key]976 def _get_location_key(self, location: str) -> str:977 return f"lisa_aws_{location}"978 def _generate_min_capability(979 self,980 requirement: schema.NodeSpace,981 aws_capability: AwsCapability,982 location: str,983 ) -> schema.NodeSpace:984 min_cap: schema.NodeSpace = requirement.generate_min_capability(985 aws_capability.capability986 )987 # Apply aws specified values.988 aws_node_runbook = min_cap.get_extended_runbook(AwsNodeSchema, AWS)989 if aws_node_runbook.location:990 assert aws_node_runbook.location == location, (991 f"predefined location [{aws_node_runbook.location}] "992 f"must be same as "993 f"cap location [{location}]"994 )995 # the location may not be set996 aws_node_runbook.location = location997 aws_node_runbook.vm_size = aws_capability.vm_size998 assert min_cap.network_interface999 assert isinstance(1000 min_cap.network_interface.nic_count, int1001 ), f"actual: {min_cap.network_interface.nic_count}"1002 aws_node_runbook.nic_count = min_cap.network_interface.nic_count1003 assert isinstance(1004 min_cap.network_interface.data_path, schema.NetworkDataPath1005 ), f"actual: {type(min_cap.network_interface.data_path)}"1006 if min_cap.network_interface.data_path == schema.NetworkDataPath.Sriov:1007 aws_node_runbook.enable_sriov = True1008 assert min_cap.disk, "disk must exists"1009 assert isinstance(1010 min_cap.disk.data_disk_count, int1011 ), f"actual: {min_cap.disk.data_disk_count}"1012 aws_node_runbook.data_disk_count = min_cap.disk.data_disk_count1013 assert isinstance(1014 min_cap.disk.data_disk_caching_type, str1015 ), f"actual: {min_cap.disk.data_disk_caching_type}"1016 aws_node_runbook.data_disk_caching_type = min_cap.disk.data_disk_caching_type1017 return min_cap1018 def _generate_data_disks(1019 self,1020 node: Node,1021 aws_node_runbook: AwsNodeSchema,1022 ) -> List[DataDiskSchema]:1023 data_disks: List[DataDiskSchema] = []1024 assert node.capability.disk1025 if aws_node_runbook.marketplace:1026 image = boto3.resource("ec2").Image(aws_node_runbook.marketplace.imageid)1027 # AWS images has the root data disks by default1028 for data_disk in image.block_device_mappings:1029 if "Ebs" in data_disk and "VolumeSize" in data_disk["Ebs"]:1030 assert isinstance(node.capability.disk.data_disk_iops, int)1031 data_disks.append(1032 DataDiskSchema(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful