How to use cpu_online_map method in autotest

Best Python code snippet using autotest_python Github


Full Screen

1import os2import logging3import random4from avocado.utils import cpu5from virttest.libvirt_xml import vm_xml6from virttest import utils_libvirtd, virsh7from virttest.cpu import cpus_parser8from virttest.staging import utils_cgroup9from virttest.virt_vm import VMStartError10def get_emulatorpin_from_cgroup(params, test):11 """12 Get a list of domain-specific per block stats from cgroup blkio controller.13 :params: the parameter dictionary14 """15 vm = params.get("vm")16 cpuset_path = \17 utils_cgroup.resolve_task_cgroup_path(vm.get_pid(), "cpuset")18 cpuset_file = os.path.join(cpuset_path, "cpuset.cpus")19 try:20 with open(cpuset_file, "rU") as f_emulatorpin_params:21 emulatorpin_params_from_cgroup = f_emulatorpin_params.readline()22 return emulatorpin_params_from_cgroup23 except IOError:24 test.error("Failed to get emulatorpin "25 "params from %s" % cpuset_file)26def check_emulatorpin(params, test):27 """28 Check emulator affinity29 :params: the parameter dictionary30 """31 dicts = {}32 vm = params.get("vm")33 vm_name = params.get("main_vm")34 cpu_list = params.get("cpu_list")35 cgconfig = params.get("cgconfig", "on")36 options = params.get("emulatorpin_options")37 result = virsh.emulatorpin(vm_name, debug=True)38 cmd_output = result.stdout.strip().splitlines()39 logging.debug(cmd_output)40 # Parsing command output and putting them into python dictionary.41 for l in cmd_output[2:]:42 k, v = l.split(':')43 dicts[k.strip()] = v.strip()44 logging.debug(dicts)45 emulator_from_cmd = dicts['*']46 emulatorpin_from_xml = ""47 # To change a running guest with 'config' option, which will affect48 # next boot, if don't shutdown the guest, we need to run virsh dumpxml49 # with 'inactive' option to get guest XML changes.50 if options == "config" and vm and not vm.is_alive():51 emulatorpin_from_xml = \52 vm_xml.VMXML().new_from_dumpxml(vm_name, "--inactive").cputune.emulatorpin53 else:54 emulatorpin_from_xml = \55 vm_xml.VMXML().new_from_dumpxml(vm_name).cputune.emulatorpin56 # To get guest corresponding emulator/cpuset.cpus value57 # from cpuset controller of the cgroup.58 if cgconfig == "on" and vm and vm.is_alive():59 emulatorpin_from_cgroup = get_emulatorpin_from_cgroup(params, test)60 logging.debug("The emulatorpin value from "61 "cgroup: %s", emulatorpin_from_cgroup)62 # To check specified cpulist value with virsh command output63 # and/or cpuset.cpus from cpuset controller of the cgroup.64 if cpu_list:65 if vm and vm.is_alive() and options != "config":66 if (cpu_list != cpus_parser(emulator_from_cmd)) or \67 (cpu_list != cpus_parser(emulatorpin_from_cgroup)):68 logging.error("To expect emulatorpin %s: %s",69 cpu_list, emulator_from_cmd)70 return False71 else:72 if cpu_list != cpus_parser(emulatorpin_from_xml):73 logging.error("To expect emulatorpin %s: %s",74 cpu_list, emulatorpin_from_xml)75 return False76 return True77def get_emulatorpin_parameter(params, test):78 """79 Get the emulatorpin parameters80 :params: the parameter dictionary81 """82 vm_name = params.get("main_vm")83 vm = params.get("vm")84 options = params.get("emulatorpin_options")85 start_vm = params.get("start_vm", "yes")86 if start_vm == "no" and vm and vm.is_alive():87 vm.destroy()88 result = virsh.emulatorpin(vm_name, options=options, debug=True)89 status = result.exit_status90 # Check status_error91 status_error = params.get("status_error", "no")92 if status_error == "yes":93 if status or not check_emulatorpin(params, test):94"It's an expected : %s", result.stderr)95 else:96"%d not a expected command "97 "return value" % status)98 elif status_error == "no":99 if status:100 else:102 set_emulatorpin_parameter(params, test):104 """105 Set the emulatorpin parameters106 :params: the parameter dictionary107 """108 vm_name = params.get("main_vm")109 vm = params.get("vm")110 cpulist = params.get("emulatorpin_cpulist")111 options = params.get("emulatorpin_options")112 start_vm = params.get("start_vm", "yes")113 if start_vm == "no" and vm and vm.is_alive():114 vm.destroy()115 result = virsh.emulatorpin(vm_name, cpulist, options, debug=True)116 status = result.exit_status117 # Check status_error118 status_error = params.get("status_error")119 if status_error == "yes":120 if status or not check_emulatorpin(params, test):121"It's an expected : %s", result.stderr)122 else:123"%d not a expected command "124 "return value" % status)125 elif status_error == "no":126 if status:127 else:129 if check_emulatorpin(params, test):130 else:132"The 'cpulist' is inconsistent with"133 " 'cpulist' emulatorpin XML or/and is"134 " different from emulator/cpuset.cpus"135 " value from cgroup cpuset controller")136def run(test, params, env):137 """138 Test emulatorpin tuning139 1) Positive testing140 1.1) get the current emulatorpin parameters for a running/shutoff guest141 1.2) set the current emulatorpin parameters for a running/shutoff guest142 2) Negative testing143 2.1) get emulatorpin parameters for a running/shutoff guest144 2.2) set emulatorpin parameters running/shutoff guest145 """146 # Run test case147 vm_name = params.get("main_vm")148 vm = env.get_vm(vm_name)149 cgconfig = params.get("cgconfig", "on")150 cpulist = params.get("emulatorpin_cpulist")151 status_error = params.get("status_error", "no")152 change_parameters = params.get("change_parameters", "no")153 # Backup original vm154 vmxml = vm_xml.VMXML.new_from_inactive_dumpxml(vm_name)155 vmxml_backup = vmxml.copy()156 emulatorpin_placement = params.get("emulatorpin_placement", "")157 if emulatorpin_placement:158 vm.destroy()159 vmxml.placement = emulatorpin_placement160 vmxml.sync()161 try:162 vm.start()163 except VMStartError as detail:164 # Recover the VM and failout early165 vmxml_backup.sync()166 logging.debug("Used VM XML:\n %s", vmxml)167"VM Fails to start: %s" % detail)168 test_dicts = dict(params)169 test_dicts['vm'] = vm170 host_cpus = cpu.online_cpus_count()171 test_dicts['host_cpus'] = host_cpus172 cpu_max = int(host_cpus) - 1173 cpu_list = None174 # Assemble cpu list for positive test175 if status_error == "no":176 if cpulist is None:177 pass178 elif cpulist == "x":179 cpu_online_map = list(map(str, cpu.cpu_online_list()))180 cpulist = random.choice(cpu_online_map)181 elif cpulist == "x-y":182 # By default, emulator is pined to all cpus, and element183 # 'cputune/emulatorpin' may not exist in VM's XML.184 # And libvirt will do nothing if pin emulator to the same185 # cpus, that means VM's XML still have that element.186 # So for testing, we should avoid that value(0-$cpu_max).187 if cpu_max < 2:188 cpulist = "0-0"189 else:190 cpulist = "0-%s" % (cpu_max - 1)191 elif cpulist == "x,y":192 cpu_online_map = list(map(str, cpu.cpu_online_list()))193 cpulist = ','.join(random.sample(cpu_online_map, 2))194 elif cpulist == "x-y,^z":195 cpulist = "0-%s,^%s" % (cpu_max, cpu_max)196 elif cpulist == "-1":197 cpulist = "-1"198 elif cpulist == "out_of_max":199 cpulist = str(cpu_max + 1)200 else:201 test.cancel("CPU-list=%s is not recognized."202 % cpulist)203 test_dicts['emulatorpin_cpulist'] = cpulist204 if cpulist:205 cpu_list = cpus_parser(cpulist)206 test_dicts['cpu_list'] = cpu_list207 logging.debug("CPU list is %s", cpu_list)208 cg = utils_cgroup.CgconfigService()209 if cgconfig == "off":210 if cg.cgconfig_is_running():211 cg.cgconfig_stop()212 # positive and negative testing #########213 try:214 if status_error == "no":215 if change_parameters == "no":216 get_emulatorpin_parameter(test_dicts, test)217 else:218 set_emulatorpin_parameter(test_dicts, test)219 if status_error == "yes":220 if change_parameters == "no":221 get_emulatorpin_parameter(test_dicts, test)222 else:223 set_emulatorpin_parameter(test_dicts, test)224 finally:225 # Recover cgconfig and libvirtd service226 if not cg.cgconfig_is_running():227 cg.cgconfig_start()228 utils_libvirtd.libvirtd_restart()229 # Recover vm....

Full Screen

Full Screen Github


Full Screen

...16 e_msg = 'Single CPU online detected, test not supported.'17 raise error.TestNAError(e_msg)18 # Have a simple and quick check first, FIX me please.19 utils.system('dmesg -c > /dev/null')20 for cpu in utils.cpu_online_map():21 if os.path.isfile('/sys/devices/system/cpu/cpu%s/online' % cpu):22 utils.system('echo 0 > /sys/devices/system/cpu/cpu%s/online' % cpu, 1)23 utils.system('dmesg -c')24 time.sleep(3)25 utils.system('echo 1 > /sys/devices/system/cpu/cpu%s/online' % cpu, 1)26 utils.system('dmesg -c')27 time.sleep(3)28 def run_once(self):29 # Begin this cpu hotplug test big guru.30 os.chdir(self.srcdir)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?