How to use get_pci_id method in avocado

...101 NvidiaGPU._ref_count -= 1102 if not NvidiaGPU._ref_count:103 nvmlShutdown()104 def __repr__(self) -> str:105 return f"{} @{self.get_pci_id()}"106 def get_supported_types(self) -> Sequence[GPUPartitionType]:107 return self.supported_vgpu_types108 def get_creatable_types(self) -> Sequence[GPUPartitionType]:109 return self.get_creatable_vgpus()110 def create_partition(self, type: GPUPartitionType) -> GPUPartition:111 assert isinstance(type, VGPUType)112 return self.create_vgpu(type)113 def get_partitions(self) -> Sequence[GPUPartition]:114 return self.get_created_mdevs()115 def create_gpu_instance(self, profile_id: int) -> GPUInstance:116 handle = nvmlDeviceCreateGpuInstance(self._nvml_dev, profile_id)117 instance = GPUInstance.from_nvml_handle_parent(handle, self)118 for profile in instance.get_compute_instance_profiles():119 if profile.slice_count == instance.profile.slice_count:120 instance.create_compute_instance(profile)121 break122 return handle123 def create_gpu_instance_maybe(self, vgpu_type: VGPUType) -> None:124 gpu_instances = self.get_gpu_instances()125 vgpu_types = [mdev.type for mdev in self.get_created_mdevs()] + [vgpu_type]126 for vgpu_type in vgpu_types:127 gpu_instance = next(128 (x for x in gpu_instances if == vgpu_type.gip_id), None129 )130 if gpu_instance:131 gpu_instances.remove(gpu_instance)132 else:133 self.create_gpu_instance(vgpu_type.gip_id)134 def create_vgpu(self, type: VGPUType) -> VGPUMdev:135 required_mig_status = (136 MIGModeStatus.ENABLE if type.is_mig else MIGModeStatus.DISABLE137 )138 if self.get_current_mig_status() != required_mig_status:139 if self.get_created_mdevs():140 raise VGPUCreateException(141 f"vGPU type {type} requires MIG mode change, "142 "but some mdevs are present"143 )144 self.set_mig_mode(required_mig_status)145 if type.is_mig:146 self.create_gpu_instance_maybe(type)147 sysfs_handle = self.get_sysfs_handle()148 if self.get_vgpu_mode() == VGPUMode.SRIOV:149 if not sysfs_handle.get_sriov_active():150 self.set_sriov_enable(True)151 tmp = sysfs_handle.get_sriov_available_vf()152 if tmp is None:153 raise VGPUCreateException(f"GPU {self} has no available VFs")154 sysfs_handle = tmp155 mdev_type = type.get_mdev_type()156 if not sysfs_handle.get_mdev_type_available(mdev_type):157 raise VGPUCreateException(f"GPU {self} can't create any {type}")158 return VGPUMdev.from_sysfs_handle(sysfs_handle.create_mdev(mdev_type))159 def destroy_gpu_instance_maybe(self, profile: GPUInstanceProfile) -> None:160 gpu_instances = [x for x in self.get_gpu_instances() if x.profile == profile]161 if not gpu_instances:162 return163 mdevs = [164 mdev for mdev in self.get_created_mdevs() if mdev.type.gip_id == profile.id165 ]166 for vgpu in self.get_active_vgpus():167 gpu_instances = [168 instance169 for instance in gpu_instances170 if != vgpu.get_gpu_instance_id()171 ]172 mdev = next(mdev for mdev in mdevs if mdev.type == vgpu.type)173 mdevs.remove(mdev)174 if len(gpu_instances) > len(mdevs):175 gpu_instances.pop().destroy()176 def get_active_vgpus(self) -> list[VGPUInstance]:177 return [178 VGPUInstance.from_nvml(dev)179 for dev in nvmlDeviceGetActiveVgpus(self._nvml_dev)180 ]181 def get_creatable_vgpus(self) -> list[VGPUType]:182 if not self.get_created_mdevs():183 return self.supported_vgpu_types184 supported_mig_vgpu_types = [185 type for type in self.supported_vgpu_types if type.is_mig186 ]187 return (188 [189 type190 for type in supported_mig_vgpu_types191 if self.get_gpu_instance_remaining_capacity(192 GPUInstanceProfile.from_id(type.gip_id, self)193 )194 ]195 if self.get_current_mig_status() == MIGModeStatus.ENABLE196 else [197 VGPUType.from_id(id)198 for id in nvmlDeviceGetCreatableVgpus(self._nvml_dev)199 ]200 )201 def get_created_mdevs(self) -> list[VGPUMdev]:202 vgpus: list[VGPUMdev] = []203 sysfs_handle = GPUSysFsHandle.from_gpu(self)204 if sysfs_handle.get_mdev_supported():205 vgpus.extend(206 VGPUMdev.from_sysfs_handle(dev)207 for dev in sysfs_handle.get_mdev_devices()208 )209 if sysfs_handle.get_sriov_active():210 for vf in sysfs_handle.get_sriov_vfs():211 vgpus.extend(212 VGPUMdev.from_sysfs_handle(dev) for dev in vf.get_mdev_devices()213 )214 return vgpus215 def get_current_mig_status(self) -> MIGModeStatus:216 current, _ = nvmlDeviceGetMigMode(self._nvml_dev)217 return MIGModeStatus(current)218 def get_pending_mig_status(self) -> MIGModeStatus:219 _, pending = nvmlDeviceGetMigMode(self._nvml_dev)220 return MIGModeStatus(pending)221 def get_gpu_instance_remaining_capacity(self, profile: GPUInstanceProfile) -> int:222 return nvmlDeviceGetGpuInstanceRemainingCapacity(self._nvml_dev, def get_gpu_instances(self) -> list[GPUInstance]:224 from ctypes import byref, c_uint225 if self.get_current_mig_status() != MIGModeStatus.ENABLE:226 raise NvidiaGPUMIGDisabled227 instances = []228 for profile in range(NVML_GPU_INSTANCE_PROFILE_COUNT):229 try:230 info = nvmlDeviceGetGpuInstanceProfileInfo(self._nvml_dev, profile)231 except NVMLError_NotSupported:232 continue233 InstancesArray = NVMLGpuInstance * info.instanceCount234 c_count = c_uint()235 c_profile_instances = InstancesArray()236 nvmlDeviceGetGpuInstances(237 self._nvml_dev,, c_profile_instances, byref(c_count)238 )239 for i in range(c_count.value):240 instances.append(241 GPUInstance.from_nvml_handle_parent(c_profile_instances[i], self)242 )243 return instances244 def get_pci_id(self) -> str:245 return nvmlDeviceGetPciInfo_v3(self._nvml_dev).busIdLegacy.lower()246 def get_sysfs_handle(self) -> GPUSysFsHandle:247 return GPUSysFsHandle.from_gpu(self)248 def get_vgpu_mode(self) -> VGPUMode:249 mode = nvmlDeviceGetHostVgpuMode(self._nvml_dev)250 return VGPUMode(mode)251 def get_vgpu_max_instances(self, type: VGPUType) -> int:252 return nvmlVgpuTypeGetMaxInstances(self._nvml_dev, def get_vf_available_vgpu(self, vf_num: int, vgpu_type: VGPUType) -> bool:254 path = (255 f"/sys/bus/pci/devices/{self.get_pci_id()}/virtfn{vf_num}"256 f"/mdev_supported_types/nvidia-{}/available_instances"257 )258 with open(path) as f:259 return bool(int( def get_vf_available_vgpus(self, vf_num: int) -> list[VGPUType]:261 return [262 x263 for x in self.get_creatable_vgpus()264 if self.get_vf_available_vgpu(vf_num, x)265 ]266 def set_mig_mode(self, mode: MIGModeStatus) -> None:267 status = nvmlDeviceSetMigMode(self._nvml_dev, mode)268 if status != NVML_SUCCESS:269 raise NVMLError(status)270 def set_sriov_enable(self, enable: bool) -> None:271 from subprocess import run272 p = run(273 [274 "/usr/lib/nvidia/sriov-manage",275 "-e" if enable else "-d",276 self.get_pci_id(),277 ]278 )...

...41 self.domain_id = domain_id42 self.bus_id = bus_id43 self.devfun_id = devfun_id44 self.pci = domain_id + ':' + bus_id + ':' + devfun_id45 self.pci_id = get_pci_id(crb, domain_id, bus_id, devfun_id)46 self.default_driver = settings.get_nic_driver(self.pci_id)47 self.intf_name = 'N/A'48 self.intf2_name = None49 self.get_interface_name()50 def __send_expect(self, cmds, expected, timeout=TIMEOUT, alt_session=True):51 """52 Wrap the crb's session as private session for sending expect.53 """54 return self.crb.send_expect(55 cmds, expected, timeout=timeout, alt_session=alt_session)56 def nic_has_driver(func):57 """58 Check if the NIC has a driver.59 """60 @wraps(func)61 def wrapper(*args, **kwargs):62 nic_instance = args[0]63 nic_instance.current_driver = nic_instance.get_nic_driver()64 if not nic_instance.current_driver:65 return ''66 return func(*args, **kwargs)67 return wrapper68 def get_nic_driver(self):69 """70 Get the NIC driver.71 """72 return self.crb.get_pci_dev_driver(73 self.domain_id, self.bus_id, self.devfun_id)74 @nic_has_driver75 def get_interface_name(self):76 """77 Get interface name of NICs.78 """79 driver = self.current_driver80 driver_alias = driver.replace('-', '_')81 try:82 get_interface_name = getattr(83 self, 'get_interface_name_%s' %84 driver_alias)85 except Exception as e:86 generic_driver = 'generic'87 get_interface_name = getattr(88 self, 'get_interface_name_%s' %89 generic_driver)90 out = get_interface_name(self.domain_id, self.bus_id, self.devfun_id)91 if "No such file or directory" in out:92 self.intf_name = 'N/A'93 else:94 self.intf_name = out95 return self.intf_name96 def get_interface_name_generic(self, domain_id, bus_id, devfun_id):97 """98 Get the interface name by the default way.99 """100 command = 'ls --color=never /sys/bus/pci/devices/%s\:%s\:%s/net' % (101 domain_id, bus_id, devfun_id)102 return self.__send_expect(command, '# ')103 def get_interface2_name(self):104 """105 Get interface name of second port of this pci device.106 """107 return self.intf2_name108def get_pci_id(crb, domain_id, bus_id, devfun_id):109 pass110def add_to_list(host, obj):111 """112 Add NICs object to global structure113 Parameter 'host' is ip address, 'obj' is netdevice object114 """115 nic = {}116 nic['host'] = host117 nic['pci'] = obj.pci118 nic['port'] = obj119 NICS_LIST.append(nic)120def get_from_list(host, domain_id, bus_id, devfun_id):121 """122 Get NICs object from global structure123 Parameter will by host ip, pci domain id, pci bus id, pci function id124 """125 for nic in NICS_LIST:126 if host == nic['host']:127 pci = ':'.join((domain_id, bus_id, devfun_id))128 if pci == nic['pci']:129 return nic['port']130 return None131def GetNicObj(crb, domain_id, bus_id, devfun_id):132 """133 Get NICs object. If NICs has been initialized, just return object.134 """135 obj = get_from_list(crb.crb['My IP'], domain_id, bus_id, devfun_id)136 if obj:137 return obj138 pci_id = get_pci_id(crb, domain_id, bus_id, devfun_id)139 nic = settings.get_nic_name(pci_id)140 obj = NetDevice(crb, domain_id, bus_id, devfun_id)141 add_to_list(crb.crb['My IP'], obj)...

