How to use the `_get_file_mock` method in Avocado

Best Python code snippets using avocado_python

test_utils_cpu.py

Source:test_utils_cpu.py Github

copy

Full Screen

...7from .. import recent_mock8from avocado.utils import cpu9class Cpu(unittest.TestCase):10 @staticmethod11 def _get_file_mock(content):12 file_mock = mock.Mock()13 file_mock.__enter__ = mock.Mock(return_value=io.BytesIO(content))14 file_mock.__exit__ = mock.Mock()15 return file_mock16 @unittest.skipUnless(recent_mock(),17 "mock library version cannot (easily) patch open()")18 def test_s390x_cpu_online(self):19 s390x = b"""vendor_id : IBM/S39020# processors : 221bogomips per cpu: 2913.0022max thread id : 023features : esan3 zarch stfle msa ldisp eimm dfp edat etf3eh highgprs te sie24facilities : 0 1 2 3 4 6 7 8 9 10 12 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 30 31 32 33 34 35 36 37 40 41 42 43 44 45 46 47 48 49 50 51 52 57 73 74 75 76 7725cache0 : level=1 type=Data scope=Private size=96K line_size=256 associativity=626cache1 : level=1 type=Instruction scope=Private size=64K line_size=256 associativity=427cache2 : level=2 type=Data scope=Private size=1024K line_size=256 associativity=828cache3 : level=2 type=Instruction scope=Private size=1024K line_size=256 associativity=829cache4 : level=3 type=Unified scope=Shared size=49152K line_size=256 associativity=1230cache5 : level=4 type=Unified scope=Shared size=393216K line_size=256 associativity=2431processor 0: version = FF, identification = 32C047, machine = 282732processor 1: version = FF, identification = 32C047, machine = 282733cpu number : 034cpu MHz dynamic : 550435cpu MHz static : 550436cpu number : 137cpu MHz dynamic : 550438cpu MHz static : 550439"""40 s390x_2 = b"""vendor_id : IBM/S39041# processors : 442bogomips per cpu: 2913.0043max thread id : 044features : esan3 zarch stfle msa ldisp eimm dfp edat etf3eh highgprs te sie45facilities : 0 1 2 3 4 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 30 31 32 33 34 35 36 37 40 41 42 43 44 45 46 47 48 49 50 51 52 57 64 65 66 67 68 69 70 71 72 73 75 76 77 78 131 13246cache0 : level=1 type=Data scope=Private size=96K line_size=256 
associativity=647cache1 : level=1 type=Instruction scope=Private size=64K line_size=256 associativity=448cache2 : level=2 type=Data scope=Private size=1024K line_size=256 associativity=849cache3 : level=2 type=Instruction scope=Private size=1024K line_size=256 associativity=850cache4 : level=3 type=Unified scope=Shared size=49152K line_size=256 associativity=1251cache5 : level=4 type=Unified scope=Shared size=393216K line_size=256 associativity=2452processor 0: version = 00, identification = 3FC047, machine = 282753processor 1: version = 00, identification = 3FC047, machine = 282754processor 2: version = 00, identification = 3FC047, machine = 282755processor 3: version = 00, identification = 3FC047, machine = 282756cpu number : 057cpu MHz dynamic : 550458cpu MHz static : 550459cpu number : 160cpu MHz dynamic : 550461cpu MHz static : 550462cpu number : 263cpu MHz dynamic : 550464cpu MHz static : 550465cpu number : 366cpu MHz dynamic : 550467cpu MHz static : 550468"""69 with mock.patch('avocado.utils.cpu.platform.machine', return_value='s390x'):70 with mock.patch('avocado.utils.cpu.open',71 return_value=self._get_file_mock(s390x)):72 self.assertEqual(len(cpu.cpu_online_list()), 2)73 with mock.patch('avocado.utils.cpu.open',74 return_value=self._get_file_mock(s390x_2)):75 self.assertEqual(len(cpu.cpu_online_list()), 4)76 @unittest.skipUnless(recent_mock(),77 "mock library version cannot (easily) patch open()")78 def test_x86_64_cpu_online(self):79 x86_64 = b"""processor : 080vendor_id : GenuineIntel81cpu family : 682model : 6083model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz84stepping : 385microcode : 0x2286cpu MHz : 2494.30187cache size : 6144 KB88physical id : 089siblings : 890core id : 091cpu cores : 492apicid : 093initial apicid : 094fpu : yes95fpu_exception : yes96cpuid level : 1397wp : yes98flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm 
constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts99bugs :100bogomips : 4988.60101clflush size : 64102cache_alignment : 64103address sizes : 39 bits physical, 48 bits virtual104power management:105processor : 1106vendor_id : GenuineIntel107cpu family : 6108model : 60109model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz110stepping : 3111microcode : 0x22112cpu MHz : 2494.301113cache size : 6144 KB114physical id : 0115siblings : 8116core id : 0117cpu cores : 4118apicid : 1119initial apicid : 1120fpu : yes121fpu_exception : yes122cpuid level : 13123wp : yes124flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts125bugs :126bogomips : 4988.60127clflush size : 64128cache_alignment : 64129address sizes : 39 bits physical, 48 bits virtual130power management:131processor : 2132vendor_id : GenuineIntel133cpu family : 6134model : 60135model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz136stepping : 3137microcode : 0x22138cpu MHz : 2494.301139cache size : 6144 KB140physical id : 0141siblings : 8142core id : 1143cpu cores : 4144apicid : 2145initial apicid : 2146fpu : yes147fpu_exception : yes148cpuid level : 13149wp : yes150flags : fpu vme de 
pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts151bugs :152bogomips : 4988.60153clflush size : 64154cache_alignment : 64155address sizes : 39 bits physical, 48 bits virtual156power management:157processor : 3158vendor_id : GenuineIntel159cpu family : 6160model : 60161model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz162stepping : 3163microcode : 0x22164cpu MHz : 2494.301165cache size : 6144 KB166physical id : 0167siblings : 8168core id : 1169cpu cores : 4170apicid : 3171initial apicid : 3172fpu : yes173fpu_exception : yes174cpuid level : 13175wp : yes176flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts177bugs :178bogomips : 4988.60179clflush size : 64180cache_alignment : 64181address sizes : 39 bits physical, 48 bits virtual182power management:183processor : 4184vendor_id : GenuineIntel185cpu family : 6186model : 60187model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz188stepping : 3189microcode : 0x22190cpu MHz : 2494.301191cache size : 6144 KB192physical id : 0193siblings : 8194core id : 
2195cpu cores : 4196apicid : 4197initial apicid : 4198fpu : yes199fpu_exception : yes200cpuid level : 13201wp : yes202flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts203bugs :204bogomips : 4988.60205clflush size : 64206cache_alignment : 64207address sizes : 39 bits physical, 48 bits virtual208power management:209processor : 5210vendor_id : GenuineIntel211cpu family : 6212model : 60213model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz214stepping : 3215microcode : 0x22216cpu MHz : 2494.301217cache size : 6144 KB218physical id : 0219siblings : 8220core id : 2221cpu cores : 4222apicid : 5223initial apicid : 5224fpu : yes225fpu_exception : yes226cpuid level : 13227wp : yes228flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts229bugs :230bogomips : 4988.60231clflush size : 64232cache_alignment : 64233address sizes : 39 bits physical, 48 bits virtual234power management:235processor : 6236vendor_id : GenuineIntel237cpu family : 6238model : 60239model name : Intel(R) Core(TM) i7-4710MQ 
CPU @ 2.50GHz240stepping : 3241microcode : 0x22242cpu MHz : 2494.301243cache size : 6144 KB244physical id : 0245siblings : 8246core id : 3247cpu cores : 4248apicid : 6249initial apicid : 6250fpu : yes251fpu_exception : yes252cpuid level : 13253wp : yes254flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts255bugs :256bogomips : 4988.60257clflush size : 64258cache_alignment : 64259address sizes : 39 bits physical, 48 bits virtual260power management:261processor : 7262vendor_id : GenuineIntel263cpu family : 6264model : 60265model name : Intel(R) Core(TM) i7-4710MQ CPU @ 2.50GHz266stepping : 3267microcode : 0x22268cpu MHz : 2494.301269cache size : 6144 KB270physical id : 0271siblings : 8272core id : 3273cpu cores : 4274apicid : 7275initial apicid : 7276fpu : yes277fpu_exception : yes278cpuid level : 13279wp : yes280flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts281bugs :282bogomips : 4988.60283clflush size : 64284cache_alignment : 64285address sizes : 39 bits physical, 48 bits 
virtual286power management:287"""288 with mock.patch('avocado.utils.cpu.platform.machine', return_value='x86_64'):289 with mock.patch('avocado.utils.cpu.open',290 return_value=self._get_file_mock(x86_64)):291 self.assertEqual(len(cpu.cpu_online_list()), 8)292 @unittest.skipUnless(recent_mock(),293 "mock library version cannot (easily) patch open()")294 def test_cpu_arch_i386(self):295 cpu_output = b"""processor : 0296vendor_id : GenuineIntel297cpu family : 6298model : 13299model name : Intel(R) Pentium(R) M processor 2.00GHz300stepping : 8301microcode : 0x20302cpu MHz : 2000.000303cache size : 2048 KB304physical id : 0305siblings : 1306core id : 0307cpu cores : 1308apicid : 0309initial apicid : 0310fdiv_bug : no311f00f_bug : no312coma_bug : no313fpu : yes314fpu_exception : yes315cpuid level : 2316wp : yes317flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov clflush dts acpi mmx fxsr sse sse2 ss tm pbe nx bts cpuid est tm2318bugs : cpu_meltdown spectre_v1 spectre_v2319bogomips : 3990.09320clflush size : 64321cache_alignment : 64322address sizes : 32 bits physical, 32 bits virtual323power management:324"""325 with mock.patch('avocado.utils.cpu.open',326 return_value=self._get_file_mock(cpu_output)):327 self.assertEqual(cpu.get_cpu_arch(), "i386")328 @unittest.skipUnless(recent_mock(),329 "mock library version cannot (easily) patch open()")330 def test_cpu_arch_x86_64(self):331 cpu_output = b"""processor : 0332vendor_id : GenuineIntel333cpu family : 6334model : 60335model name : Intel(R) Core(TM) i7-4810MQ CPU @ 2.80GHz336stepping : 3337microcode : 0x24338cpu MHz : 1766.058339cache size : 6144 KB340physical id : 0341siblings : 8342core id : 0343cpu cores : 4344apicid : 0345initial apicid : 0346fpu : yes347fpu_exception : yes348cpuid level : 13349wp : yes350flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts 
rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault epb invpcid_single pti tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt ibpb ibrs stibp dtherm ida arat pln pts351bugs : cpu_meltdown spectre_v1 spectre_v2352bogomips : 5586.93353clflush size : 64354cache_alignment : 64355address sizes : 39 bits physical, 48 bits virtual356power management:357"""358 with mock.patch('avocado.utils.cpu.open',359 return_value=self._get_file_mock(cpu_output)):360 self.assertEqual(cpu.get_cpu_arch(), "x86_64")361 @unittest.skipUnless(recent_mock(),362 "mock library version cannot (easily) patch open()")363 def test_cpu_arch_ppc64_power8(self):364 cpu_output = b"""processor : 88365cpu : POWER8E (raw), altivec supported366clock : 3325.000000MHz367revision : 2.1 (pvr 004b 0201)368timebase : 512000000369platform : PowerNV370model : 8247-21L371machine : PowerNV 8247-21L372firmware : OPAL v3373"""374 with mock.patch('avocado.utils.cpu.open',375 return_value=self._get_file_mock(cpu_output)):376 self.assertEqual(cpu.get_cpu_arch(), "power8")377 @unittest.skipUnless(recent_mock(),378 "mock library version cannot (easily) patch open()")379 def test_cpu_arch_ppc64_le_power8(self):380 cpu_output = b"""processor : 88381cpu : POWER8E (raw), altivec supported382clock : 3325.000000MHz383revision : 2.1 (pvr 004b 0201)384timebase : 512000000385platform : PowerNV386model : 8247-21L387machine : PowerNV 8247-21L388firmware : OPAL v3389"""390 with mock.patch('avocado.utils.cpu.open',391 return_value=self._get_file_mock(cpu_output)):392 self.assertEqual(cpu.get_cpu_arch(), "power8")393 @unittest.skipUnless(recent_mock(),394 "mock library version cannot (easily) patch open()")395 def test_cpu_arch_ppc64_le_power9(self):396 cpu_output = b"""processor : 20397cpu : POWER9 
(raw), altivec supported398clock : 2050.000000MHz399revision : 1.0 (pvr 004e 0100)400timebase : 512000000401platform : PowerNV402model : 8375-42A403machine : PowerNV 8375-42A404firmware : OPAL405"""406 with mock.patch('avocado.utils.cpu.open',407 return_value=self._get_file_mock(cpu_output)):408 self.assertEqual(cpu.get_cpu_arch(), "power9")409 @unittest.skipUnless(recent_mock(),410 "mock library version cannot (easily) patch open()")411 def test_cpu_arch_s390(self):412 cpu_output = b"""vendor_id : IBM/S390413# processors : 2414bogomips per cpu: 2913.00415max thread id : 0416features : esan3 zarch stfle msa ldisp eimm dfp edat etf3eh highgprs te sie417facilities : 0 1 2 3 4 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 30 31 32 33 34 35 36 37 40 41 42 43 44 45 46 47 48 49 50 51 52 57 64 65 66 67 68 69 70 71 72 73 75 76 77 78 81 82 131 132418cache0 : level=1 type=Data scope=Private size=96K line_size=256 associativity=6419cache1 : level=1 type=Instruction scope=Private size=64K line_size=256 associativity=4420cache2 : level=2 type=Data scope=Private size=1024K line_size=256 associativity=8421cache3 : level=2 type=Instruction scope=Private size=1024K line_size=256 associativity=8422cache4 : level=3 type=Unified scope=Shared size=49152K line_size=256 associativity=12423cache5 : level=4 type=Unified scope=Shared size=393216K line_size=256 associativity=24424processor 0: version = 00, identification = 3FC047, machine = 2827425processor 1: version = 00, identification = 3FC047, machine = 2827426cpu number : 0427cpu MHz dynamic : 5504428cpu MHz static : 5504429cpu number : 1430cpu MHz dynamic : 5504431cpu MHz static : 5504432"""433 with mock.patch('avocado.utils.cpu.open',434 return_value=self._get_file_mock(cpu_output)):435 self.assertEqual(cpu.get_cpu_arch(), "s390")436 @unittest.skipUnless(recent_mock(),437 "mock library version cannot (easily) patch open()")438 def test_cpu_arch_arm_v7(self):439 cpu_output = b"""Processor : ARMv7 Processor rev 2 
(v7l)440BogoMIPS : 994.65441Features : swp half thumb fastmult vfp edsp thumbee neon vfpv3442CPU implementer : 0x41443CPU architecture: 7444CPU variant : 0x2445CPU part : 0xc08446CPU revision : 2447Hardware : herring448Revision : 0034449Serial : 3534268a5e0700ec450"""451 with mock.patch('avocado.utils.cpu.open',452 return_value=self._get_file_mock(cpu_output)):453 self.assertEqual(cpu.get_cpu_arch(), "arm")454 @unittest.skipUnless(recent_mock(),455 "mock library version cannot (easily) patch open()")456 def test_cpu_arch_arm_v8(self):457 cpu_output = b"""processor : 0458BogoMIPS : 200.00459Features : fp asimd evtstrm aes pmull sha1 sha2 crc32 cpuid460CPU implementer : 0x43461CPU architecture: 8462CPU variant : 0x1463CPU part : 0x0a1464CPU revision : 1465"""466 with mock.patch('avocado.utils.cpu.open',467 return_value=self._get_file_mock(cpu_output)):468 self.assertEqual(cpu.get_cpu_arch(), "aarch64")469 @unittest.skipUnless(recent_mock(),470 "mock library version cannot (easily) patch open()")471 def test_cpu_arch_risc_v(self):472 cpu_output = b"""hart : 1473isa : rv64imafdc474mmu : sv39475uarch : sifive,rocket0476"""477 with mock.patch('avocado.utils.cpu.open',478 return_value=self._get_file_mock(cpu_output)):479 self.assertEqual(cpu.get_cpu_arch(), "riscv")480 @unittest.skipUnless(recent_mock(),481 "mock library version cannot (easily) patch open()")482 def test_get_cpuidle_state_off(self):483 retval = {0: {0: 0}}484 with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0]):485 with mock.patch('glob.glob', return_value=['/sys/devices/system/cpu/cpu0/cpuidle/state1']):486 with mock.patch('avocado.utils.cpu.open', return_value=io.BytesIO(b'0')):487 self.assertEqual(cpu.get_cpuidle_state(), retval)488 @unittest.skipUnless(recent_mock(),489 "mock library version cannot (easily) patch open()")490 def test_get_cpuidle_state_on(self):491 retval = {0: {0: 1}}492 with mock.patch('avocado.utils.cpu.cpu_online_list', return_value=[0]):...

Full Screen

Full Screen

local_test.py

Source:local_test.py Github

copy

Full Screen

1"""Local API tests.2"""3from __future__ import absolute_import4from __future__ import division5from __future__ import print_function6from __future__ import unicode_literals7import functools8import io9import json10import os11import shutil12import tarfile13import tempfile14import unittest15import mock16import six17from six.moves import _thread18from treadmill import appenv19from treadmill import fs20from treadmill.api import local21# pylint: disable=W062222from treadmill.exc import (LocalFileNotFoundError, InvalidInputError)23APPS_DIR = os.path.join('...', 'apps')24ARCHIVES_DIR = os.path.join('...', 'archives')25METRICS_DIR = os.path.join('...', 'metrics')26RUNNING_DIR = os.path.join('...', 'running')27# do not complain about accessing protected member28# pylint: disable=W021229class MetricsAPITest(unittest.TestCase):30 """treadmill.api.local._MetricsAPI tests."""31 def setUp(self):32 tm_env = mock.Mock()33 tm_env.metrics_dir = METRICS_DIR34 tm_env.archives_dir = ARCHIVES_DIR35 tm_env_func = mock.Mock()36 tm_env_func.return_value = tm_env37 self.met = local.mk_metrics_api(tm_env_func)()38 def test_abs_met_path(self):39 """Test the path for application and service rrd metrics"""40 self.assertEqual(41 self.met._abs_met_path(app='proid.app#00123',42 uniq='asdf'),43 os.path.join(METRICS_DIR, 'apps', 'proid.app-00123-asdf.rrd')44 )45 self.assertEqual(46 self.met._abs_met_path(service='test'),47 os.path.join(METRICS_DIR, 'core', 'test.rrd')48 )49 def test_unpack_id(self):50 """Test the unpack_id() method."""51 self.assertEqual(self.met._unpack_id('test'), {'service': 'test'})52 self.assertEqual(53 self.met._unpack_id('proid.app#123/asdf'), {'app': 'proid.app#123',54 'uniq': 'asdf'})55 def test_file_path(self):56 """Test the publicly accessable file_path() method"""57 self.assertEqual(58 self.met.file_path('proid.app#00123/asdf'),59 os.path.join(METRICS_DIR, 'apps', 'proid.app-00123-asdf.rrd')60 )61 @mock.patch('treadmill.rrdutils.get_json_metrics')62 def test_get(self, 
mock_):63 """Test the _MetricsAPI.get() method."""64 self.met._get_rrd_file = mock.Mock(return_value='rrd.file')65 self.met.get('id', 'foo', as_json=True)66 mock_.assert_called_with('rrd.file', 'foo')67class LogAPITest(unittest.TestCase):68 """treadmill.api.local._LogAPI tests."""69 def setUp(self):70 tm_env = mock.Mock()71 tm_env.apps_dir = APPS_DIR72 tm_env.archives_dir = ARCHIVES_DIR73 tm_env.running_dir = RUNNING_DIR74 tm_env_func = mock.Mock()75 tm_env_func.return_value = tm_env76 self.log = local.mk_logapi(tm_env_func)()77 def test_get(self):78 """Test the _LogAPI.get() method."""79 with mock.patch('treadmill.api.local._get_file'):80 with mock.patch('io.open', mock.mock_open()):81 with self.assertRaises(InvalidInputError):82 self.log.get('no/such/log/exists', start=-1)83 with mock.patch('treadmill.api.local._fragment',84 mock.Mock(spec_set=True,85 return_value='invoked')):86 self.assertEqual(87 self.log.get('no/such/log/exists', start=0, limit=3),88 'invoked'89 )90 # make sure that things don't break if the log file contains some91 # binary data with ord num > 128 (eg. \xc5 below) ie. 
not ascii92 # decodeable93 with tempfile.NamedTemporaryFile(mode='wb', delete=False) as temp:94 temp.write(b'\x00\x01\xc5\x0a')95 temp.seek(0)96 with mock.patch('treadmill.api.local._get_file',97 return_value=temp.name):98 self.assertTrue(''.join(self.log.get('no/such/log/exists')))99 @mock.patch('treadmill.api.local._get_file')100 def test_get_logfile_new(self, _get_file_mock):101 """Test _LogAPI._get_logfile_new()."""102 # ARCHIVED103 self.log._get_logfile_new('proid.app#123', 'uniq', 'service', 'foo')104 _get_file_mock.assert_called_once_with(105 os.path.join('...', 'apps', 'proid.app-123-uniq', 'data',106 'services', 'foo', 'data', 'log', 'current'),107 arch=os.path.join('...', 'archives',108 'proid.app-123-uniq.service.tar.gz'),109 arch_extract=True,110 arch_extract_filter=mock.ANY)111 _get_file_mock.reset_mock()112 # RUNNING113 self.log._get_logfile_new('proid.app#123', 'running', 'service', 'foo')114 _get_file_mock.assert_called_once_with(115 os.path.join('...', 'running', 'proid.app#123', 'data',116 'services', 'foo', 'data', 'log', 'current'),117 arch=os.path.join('...', 'archives',118 'proid.app-123-running.service.tar.gz'),119 arch_extract=False,120 arch_extract_filter=mock.ANY)121 @mock.patch('treadmill.api.local._get_file')122 def test_get_logfile_old(self, _get_file_mock):123 """Test _LogAPI._get_logfile_old()."""124 # ARCHIVED125 self.log._get_logfile_old('app#123', 'uniq', 'service', 'foo')126 _get_file_mock.assert_called_once_with(127 os.path.join('...', 'apps', 'app-123-uniq', 'services', 'foo',128 'log', 'current'),129 arch=os.path.join('...', 'archives',130 'app-123-uniq.service.tar.gz'),131 arch_extract=True,132 arch_extract_filter=mock.ANY)133 _get_file_mock.reset_mock()134 # RUNNING135 self.log._get_logfile_old('proid.app#123', 'running', 'service', 'foo')136 _get_file_mock.assert_called_once_with(137 os.path.join('...', 'running', 'proid.app#123', 'services', 'foo',138 'log', 'current'),139 arch=os.path.join('...', 'archives',140 
'proid.app-123-running.service.tar.gz'),141 arch_extract=False,142 arch_extract_filter=mock.ANY)143class HelperFuncTests(unittest.TestCase):144 """treadmill.api.local top level function tests."""145 def setUp(self):146 self.tm_env = mock.Mock()147 self.tm_env.apps_dir = APPS_DIR148 self.tm_env.archives_dir = ARCHIVES_DIR149 self.tm_env.running_dir = RUNNING_DIR150 self.tm_env_func = mock.Mock()151 self.tm_env_func.return_value = self.tm_env152 @mock.patch('_thread.get_ident', mock.Mock(return_value='123'))153 def test_temp_dir(self):154 """Dummy test of _temp_dir()."""155 self.assertEqual(local._temp_dir(),156 os.path.join(tempfile.gettempdir(), 'local-123.temp'))157 shutil.copy(__file__,158 os.path.join(tempfile.gettempdir(), 'local-123.temp'))159 self.assertEqual(len(os.listdir(160 os.path.join(tempfile.gettempdir(), 'local-123.temp'))), 1)161 # subsequent invocations should delete the temp dir content162 self.assertEqual(os.listdir(local._temp_dir()), [])163 def test_extract_archive(self):164 """Test the _extract_archive() func."""165 with self.assertRaises(LocalFileNotFoundError):166 local._extract_archive('no_such_file')167 temp_dir = os.path.join(tempfile.gettempdir(), 'tm-unittest')168 temp_subdir = os.path.join(temp_dir, 'foo')169 shutil.rmtree(temp_dir, True)170 os.mkdir(temp_dir)171 os.mkdir(temp_subdir)172 shutil.copy(__file__, os.path.join(temp_subdir, 'current'))173 shutil.copy(__file__, os.path.join(temp_subdir, '@4000zzzz.s'))174 with tarfile.open(os.path.join(temp_dir, 'f.tar'), mode='w') as tar:175 orig_cwd = os.getcwd()176 os.chdir(temp_dir)177 # this creates 3 entries because the subdir is separate entry...178 tar.add('foo')179 os.chdir(orig_cwd)180 self.assertEqual(181 len(local._extract_archive(os.path.join(temp_dir, 'f.tar'))),182 3)183 self.assertEqual(184 len(local._extract_archive(os.path.join(temp_dir, 'f.tar'),185 extract_filter=functools.partial(186 local._arch_log_filter,187 rel_log_dir='foo'))),188 2)189 shutil.rmtree(temp_dir)190 def 
test_concat_files(self):191 """Test the _concat_files() func."""192 ident = _thread.get_ident()193 file_lst = []194 for i in six.moves.range(3):195 file_lst.append(os.path.join(tempfile.gettempdir(),196 '{}.{}'.format(ident, i)))197 with io.open(file_lst[-1], 'wb') as logs:198 logs.write(bytearray('{}\n'.format(i), 'ascii'))199 result = local._concat_files(file_lst)200 self.assertTrue(isinstance(result, io.TextIOWrapper))201 self.assertEqual(result.read(), u'0\n1\n2\n')202 # check that _concat_files() catches IOError for non existing file203 file_lst.append('no_such_file')204 local._concat_files(file_lst)205 for f in file_lst[:-1]:206 os.remove(f)207 # make sure that things don't break if the log file contains some208 # binary data with ord num > 128 (eg. \xc5 below) ie. not ascii209 # decodeable210 with tempfile.NamedTemporaryFile(mode='wb', delete=False) as temp:211 temp.write(b'\x42\x00\x01\xc5\x45\x0a')212 temp.seek(0)213 self.assertTrue(''.join(local._concat_files([temp.name])))214 def test_rel_log_dir_path(self):215 """Test the rel_log_dir_path() func."""216 self.assertEqual(local._rel_log_dir_path('sys', 'foo'),217 os.path.join('sys', 'foo', 'data', 'log'))218 self.assertEqual(local._rel_log_dir_path('app', 'foo'),219 os.path.join('services', 'foo', 'data', 'log'))220 def test_abs_log_dir_path(self):221 """Test the abs_log_dir_path() func."""222 self.assertEqual(223 local._abs_log_dir_path(self.tm_env_func, 'proid.app-123',224 'running', '...'),225 os.path.join('...', 'running', 'proid.app-123', 'data', '...'))226 self.assertEqual(227 local._abs_log_dir_path(self.tm_env_func, 'proid.app-123',228 'xyz', '...'),229 os.path.join('...', 'apps', 'proid.app-123-xyz', 'data', '...'))230 def test_fragment_file(self):231 """Test the _fragment() func."""232 self.assertEqual(list(local._fragment(iter(six.moves.range(10)),233 limit=-1)),234 list(six.moves.range(10)))235 self.assertEqual(236 list(local._fragment(iter(six.moves.range(10)), limit=2)),237 
list(six.moves.range(2)))238 self.assertEqual(239 list(local._fragment(iter(six.moves.range(10)), start=0, limit=3)),240 list(six.moves.range(3)))241 self.assertEqual(242 list(local._fragment(iter(six.moves.range(10)), start=1, limit=4)),243 list(six.moves.range(1, 5)))244 self.assertEqual(245 list(local._fragment(iter(six.moves.range(10)), start=5,246 limit=-1)),247 list(six.moves.range(5, 10)))248 self.assertEqual(249 list(local._fragment(iter(six.moves.range(10)), start=8, limit=8)),250 [8, 9])251 self.assertEqual(252 list(local._fragment(iter(six.moves.range(10)), start=8,253 limit=40)),254 [8, 9])255 with self.assertRaises(InvalidInputError):256 list(local._fragment(iter(six.moves.range(10)), start=99,257 limit=-1))258 with self.assertRaises(InvalidInputError):259 list(local._fragment(iter(six.moves.range(10)), 99, limit=5))260 def test_fragment_in_reverse(self):261 """Test the _fragment_in_reverse() func."""262 self.assertEqual(263 list(local._fragment_in_reverse(iter(six.moves.range(10)),264 limit=-1)),265 list(reversed(six.moves.range(10))))266 self.assertEqual(267 list(local._fragment_in_reverse(iter(six.moves.range(10)),268 limit=2)),269 [9, 8])270 self.assertEqual(271 list(local._fragment_in_reverse(iter(six.moves.range(10)), 0,272 limit=3)),273 [9, 8, 7])274 self.assertEqual(275 list(local._fragment_in_reverse(iter(six.moves.range(10)), 1, 4)),276 list(six.moves.range(8, 4, -1)))277 self.assertEqual(278 list(local._fragment_in_reverse(iter(six.moves.range(10)),279 start=5, limit=-1)),280 [4, 3, 2, 1, 0])281 self.assertEqual(282 list(local._fragment_in_reverse(iter(six.moves.range(10)), 8,283 limit=8)),284 [1, 0])285 self.assertEqual(286 list(local._fragment_in_reverse(iter(six.moves.range(10)), 8,287 limit=40)),288 [1, 0])289 self.assertEqual(290 list(local._fragment_in_reverse(iter(six.moves.range(10)), 8, 1)),291 [1])292 with self.assertRaises(InvalidInputError):293 list(local._fragment_in_reverse(iter(six.moves.range(10)),294 start=99))295 with 
self.assertRaises(InvalidInputError):296 list(local._fragment_in_reverse(iter(six.moves.range(10)), 99,297 limit=9))298 def test_archive_path(self):299 """Test the _archive_paths() func."""300 self.assertEqual(301 local._archive_path(self.tm_env_func, 'app', 'app#123', 'uniq'),302 os.path.join(ARCHIVES_DIR, 'app-123-uniq.app.tar.gz')303 )304class APITest(unittest.TestCase):305 """Basic API tests.306 """307 def setUp(self):308 self.root = tempfile.mkdtemp()309 os.environ['TREADMILL_APPROOT'] = self.root310 self.tm_env = appenv.AppEnvironment(root=self.root)311 fs.mkdir_safe(self.tm_env.apps_dir)312 fs.mkdir_safe(self.tm_env.archives_dir)313 full_names = (314 ('proid.simplehttp', '0001025686', 'ymweWiRm86C7A'),315 ('proid.myapi.test', '0001027473', 'kJoV4j0DU6dtJ'),316 )317 for app, instance, uniq in full_names:318 link = '#'.join([app, instance])319 fs.mkfile_safe(os.path.join(self.tm_env.running_dir, link))320 target = '-'.join([app, instance, uniq])321 fs.mkdir_safe(os.path.join(self.tm_env.apps_dir, target, 'data'))322 fs.symlink_safe(323 os.path.join(self.tm_env.running_dir, link),324 os.path.join(self.tm_env.apps_dir, target),325 )326 files = (327 # incorrect file328 'proid.app-foo-bar#123.sys.tar.gz',329 'proid.app#123.sys.tar.gz',330 # correct file331 'proid.app-123-uniq.sys.tar.gz',332 'proid.test.sleep-901-uniq.sys.tar.gz',333 )334 for f in files:335 fs.mkfile_safe(os.path.join(self.tm_env.archives_dir, f))336 self.api = local.API()337 def tearDown(self):338 if self.root and os.path.isdir(self.root):339 shutil.rmtree(self.root)340 def test_archive(self):341 """Test ArvhiveApi's get() method.342 """343 with self.assertRaises(LocalFileNotFoundError):344 self.api.archive.get('no/such/archive')345 def test_list_running(self):346 """Test _list_running().347 """348 res = self.api.list(state='running')349 self.assertEqual(len(res), 2)350 self.assertSetEqual(351 set(ins['_id'] for ins in res),352 set([353 'proid.simplehttp#0001025686/ymweWiRm86C7A',354 
'proid.myapi.test#0001027473/kJoV4j0DU6dtJ',355 ])356 )357 res = self.api.list(358 state='running',359 app_name='proid.simplehttp#0001025686',360 )361 self.assertEqual(len(res), 1)362 self.assertEqual(363 res[0]['_id'], 'proid.simplehttp#0001025686/ymweWiRm86C7A'364 )365 def test_list_finished(self):366 """Test _list_finished().367 """368 res = self.api.list(state='finished')369 self.assertEqual(len(res), 2)370 self.assertSetEqual(371 set(ins['_id'] for ins in res),372 set([373 'proid.app#123/uniq',374 'proid.test.sleep#901/uniq',375 ])376 )377 cases = [378 ('proid.app#123', 1),379 ('proid.test.sleep#901', 1),380 ('proid.unknown#1', 0),381 ]382 for app_name, expected in cases:383 res = self.api.list(state='finished', app_name=app_name)384 self.assertEqual(len(res), expected)385 def test_get(self):386 """Test _get(uniqid).387 """388 state = {'foo': 'bar'}389 path_to_state_file = os.path.join(390 self.tm_env.apps_dir,391 'proid.simplehttp-0001025686-ymweWiRm86C7A',392 'data',393 'state.json',394 )395 with io.open(path_to_state_file, 'w') as f:396 json.dump(state, f)397 # check that state.json can be read back successfully398 self.assertEqual(399 self.api.get('proid.simplehttp-0001025686/ymweWiRm86C7A'),400 state,401 )402 # add state.json to an archive403 path_to_archive = os.path.join(404 self.tm_env.archives_dir,405 'proid.app-123-uniq.sys.tar.gz'406 )407 with tarfile.open(path_to_archive, mode='w') as out:408 out.add(path_to_state_file, arcname='state.json')409 # check that state.json can be read back from an archive410 self.assertEqual(self.api.get('proid.app-123/uniq'), state)411if __name__ == '__main__':...

Full Screen

Full Screen

test_service.py

Source:test_service.py Github

copy

Full Screen

...177 "builtins.open",178 mock_open(read_data=base64.b64decode(load_fixture("color_extractor_file.txt"))),179 create=True,180)181def _get_file_mock(file_path):182 """Convert file to BytesIO for testing due to PIL UnidentifiedImageError."""183 _file = None184 with open(file_path) as file_handler:185 _file = io.BytesIO(file_handler.read())186 _file.name = "color_extractor.jpg"187 _file.seek(0)188 return _file189@patch("os.path.isfile", Mock(return_value=True))190@patch("os.access", Mock(return_value=True))191async def test_file(hass):192 """Test that the file only service reads a file and translates to light RGB."""193 service_data = {194 ATTR_PATH: "/opt/image.png",195 ATTR_ENTITY_ID: LIGHT_ENTITY,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful