Best Python code snippet using lisa_python
test_modules.py
Source:test_modules.py  
...69        modules_system.unload_module('foo')70    else:71        with pytest.raises(EnvironError):72            modules_system.load_module('foo')73        assert not modules_system.is_module_loaded('foo')74        assert 'foo' not in modules_system.loaded_modules()75        modules_system.load_module('testmod_foo')76        assert modules_system.is_module_loaded('testmod_foo')77        assert 'testmod_foo' in modules_system.loaded_modules()78        assert 'TESTMOD_FOO' in os.environ79        modules_system.unload_module('testmod_foo')80        assert not modules_system.is_module_loaded('testmod_foo')81        assert 'testmod_foo' not in modules_system.loaded_modules()82        assert 'TESTMOD_FOO' not in os.environ83def test_module_load_with_path(modules_system_nopath):84    if modules_system_nopath.name in ['nomod', 'spack']:85        pytest.skip('module load not supported')86    modules_system_nopath.load_module('testmod_foo',87                                      path=test_util.TEST_MODULES)88    with modules_system_nopath.change_module_path(test_util.TEST_MODULES):89        assert modules_system_nopath.is_module_loaded('testmod_foo')90        assert 'testmod_foo' in modules_system_nopath.loaded_modules()91        assert 'TESTMOD_FOO' in os.environ92def test_module_load_collection(modules_system, module_collection):93    # Load a conflicting module first94    modules_system.load_module('testmod_bar')95    modules_system.load_module(module_collection, collection=True)96    assert modules_system.is_module_loaded('testmod_base')97    assert modules_system.is_module_loaded('testmod_foo')98def test_module_load_force(modules_system):99    if modules_system.name in ['nomod', 'spack']:100        modules_system.load_module('foo', force=True)101    else:102        modules_system.load_module('testmod_foo')103        unloaded = modules_system.load_module('testmod_foo', force=True)104        assert unloaded == [('testmod_foo', [])]105        assert modules_system.is_module_loaded('testmod_foo')106        unloaded = modules_system.load_module('testmod_bar', force=True)107        assert modules_system.is_module_loaded('testmod_bar')108        assert not modules_system.is_module_loaded('testmod_foo')109        assert unloaded == [('testmod_bar', ['testmod_foo'])]110        assert 'TESTMOD_BAR' in os.environ111def test_module_load_force_collection(modules_system, module_collection):112    # Load a conflicting module first113    modules_system.load_module('testmod_bar')114    unloaded = modules_system.load_module(module_collection,115                                          collection=True, force=True)116    assert unloaded == [('test_collection', [])]117    assert modules_system.is_module_loaded('testmod_base')118    assert modules_system.is_module_loaded('testmod_foo')119def test_module_load_collection_searchpath(modules_system, tmpdir,120                                           module_collection):121    p1 = str(tmpdir.mkdir('path1'))122    p2 = str(tmpdir.mkdir('path2'))123    modules_system.searchpath_add(p1)124    modules_system.searchpath_add(p2)125    modules_system.searchpath_remove(p1)126    modules_system.load_module(module_collection, collection=True)127    assert p1 not in modules_system.searchpath128    assert p2 in modules_system.searchpath129def test_module_unload_all(modules_system):130    if modules_system.name == 'nomod':131        modules_system.unload_all()132    else:133        modules_system.load_module('testmod_base')134        modules_system.unload_all()135        assert 0 == len(modules_system.loaded_modules())136def test_module_list(modules_system):137    if modules_system.name == 'nomod':138        assert 0 == len(modules_system.loaded_modules())139    elif modules_system.name == 'spack':140        # If Spack is installed, we can't be sure that the user has not loaded141        # any module and we cannot unload them in here, since we don't have142        # Python bindings. So we only check that we get a list back.143        assert isinstance(modules_system.loaded_modules(), list)144    else:145        modules_system.load_module('testmod_foo')146        assert 'testmod_foo' in modules_system.loaded_modules()147        modules_system.unload_module('testmod_foo')148def test_module_conflict_list(modules_system):149    if modules_system.name in ['nomod', 'spack']:150        assert 0 == len(modules_system.conflicted_modules('foo'))151    else:152        conflict_list = modules_system.conflicted_modules('testmod_bar')153        assert 'testmod_foo' in conflict_list154        assert 'testmod_boo' in conflict_list155def test_module_available_all(modules_system):156    modules = sorted(modules_system.available_modules())157    if modules_system.name == 'nomod':158        assert modules == []159    elif modules_system.name == 'spack':160        # If Spack is installed, we can't fool it with environment variables161        # about its installed packages, like we can do with modules, so we162        # simply check that we get a list back.163        assert isinstance(modules, list)164    else:165        assert (modules == ['testmod_bar', 'testmod_base',166                            'testmod_boo', 'testmod_ext', 'testmod_foo'])167def _test_module_available_substr(modules_system):168    modules = sorted(modules_system.available_modules('testmod_b'))169    if modules_system.name == 'nomod':170        assert modules == []171    else:172        assert (modules == ['testmod_bar', 'testmod_base', 'testmod_boo'])173@test_util.dispatch('modules_system', suffix=lambda ms: ms.name)174def test_emit_load_commands(modules_system):175    modules_system.module_map = {176        'm0': ['m1', 'm2']177    }178    yield179def _emit_load_commands_tmod(modules_system):180    emit_cmds = modules_system.emit_load_commands181    assert emit_cmds('foo') == ['module load foo']182    assert emit_cmds('foo/1.2') == ['module load foo/1.2']183    # Module mappings are not taking into account since v3.3184    assert emit_cmds('m0') == ['module load m0']185def _emit_load_commands_tmod4(modules_system):186    emit_cmds = modules_system.emit_load_commands187    assert emit_cmds('foo') == ['module load foo']188    assert emit_cmds('foo', collection=True) == [189        'module restore foo', f'module use {test_util.TEST_MODULES}'190    ]191    assert emit_cmds('foo/1.2') == ['module load foo/1.2']192    if modules_system.name == 'lmod':193        assert emit_cmds('foo', path='/path') == ['module use /path',194                                                  'module load foo']195    else:196        assert emit_cmds('foo', path='/path') == ['module use /path',197                                                  'module load foo',198                                                  'module unuse /path']199    # Module mappings are not taking into account since v3.3200    assert emit_cmds('m0') == ['module load m0']201    assert emit_cmds('m0', collection=True) == [202        'module restore m0', f'module use {test_util.TEST_MODULES}'203    ]204def _emit_load_commands_lmod(modules_system):205    return _emit_load_commands_tmod4(modules_system)206def _emit_load_commands_spack(modules_system):207    emit_cmds = modules_system.emit_load_commands208    assert emit_cmds('foo') == ['spack load foo']209    assert emit_cmds('foo/1.2') == ['spack load foo/1.2']210    # Module mappings are not taking into account since v3.3211    assert emit_cmds('m0') == ['spack load m0']212def _emit_load_commands_nomod(modules_system):213    emit_cmds = modules_system.emit_load_commands214    assert emit_cmds('foo') == []215    assert emit_cmds('foo/1.2') == []216    assert emit_cmds('m0') == []217@test_util.dispatch('modules_system', suffix=lambda ms: ms.name)218def test_emit_unload_commands(modules_system):219    modules_system.module_map = {220        'm0': ['m1', 'm2']221    }222    yield223def _emit_unload_commands_tmod(modules_system):224    emit_cmds = modules_system.emit_unload_commands225    assert emit_cmds('foo') == ['module unload foo']226    assert emit_cmds('foo/1.2') == ['module unload foo/1.2']227    # Module mappings are not taking into account since v3.3228    assert emit_cmds('m0') == ['module unload m0']229def _emit_unload_commands_tmod4(modules_system):230    emit_cmds = modules_system.emit_unload_commands231    assert emit_cmds('foo') == ['module unload foo']232    assert emit_cmds('foo', collection=True) == []233    assert emit_cmds('foo/1.2') == ['module unload foo/1.2']234    assert emit_cmds('m0', collection=True) == []235    # Module mappings are not taking into account since v3.3236    assert emit_cmds('m0') == ['module unload m0']237def _emit_unload_commands_lmod(modules_system):238    emit_cmds = modules_system.emit_unload_commands239    assert emit_cmds('foo') == ['module unload foo']240    assert emit_cmds('foo/1.2') == ['module unload foo/1.2']241    # Module mappings are not taking into account since v3.3242    assert emit_cmds('m0') == ['module unload m0']243def _emit_unload_commands_spack(modules_system):244    emit_cmds = modules_system.emit_unload_commands245    assert emit_cmds('foo') == ['spack unload foo']246    assert emit_cmds('foo/1.2') == ['spack unload foo/1.2']247    # Module mappings are not taking into account since v3.3248    assert emit_cmds('m0') == ['spack unload m0']249def _emit_unload_commands_nomod(modules_system):250    emit_cmds = modules_system.emit_unload_commands251    assert emit_cmds('foo') == []252    assert emit_cmds('foo/1.2') == []253    assert emit_cmds('m0') == []254def test_module_construction():255    m = modules.Module('foo/1.2')256    assert m.name == 'foo'257    assert m.version == '1.2'258    assert m.collection is False259    with pytest.raises(ValueError):260        modules.Module('')261    with pytest.raises(ValueError):262        modules.Module(' ')263    with pytest.raises(TypeError):264        modules.Module(None)265    with pytest.raises(TypeError):266        modules.Module(23)267    m = modules.Module('foo/1.2', collection=True)268    assert m.collection is True269    m = modules.Module('foo/1.2', path='/foo/modules')270    assert m.path == '/foo/modules'271    assert m.collection is False272def test_module_equal():273    assert modules.Module('foo') == modules.Module('foo')274    assert modules.Module('foo/1.2') == modules.Module('foo/1.2')275    assert modules.Module('foo') == modules.Module('foo/1.2')276    assert hash(modules.Module('foo')) == hash(modules.Module('foo'))277    assert (hash(modules.Module('foo/1.2')) ==278            hash(modules.Module('foo/1.2')))279    assert hash(modules.Module('foo')) == hash(modules.Module('foo/1.2'))280    assert modules.Module('foo/1.2') != modules.Module('foo/1.3')281    assert modules.Module('foo') != modules.Module('bar')282    assert modules.Module('foo') != modules.Module('foobar')283    assert modules.Module('foo') != modules.Module('foo', collection=True)284    assert modules.Module('foo') != modules.Module('foo', path='/foo/modules')285@pytest.fixture286def modules_system_emu():287    class ModulesSystemEmulator(modules.ModulesSystemImpl):288        '''A convenience class that simulates a modules system.'''289        def __init__(self):290            self._loaded_modules = set()291            # The following two variables record the sequence of loads and292            # unloads293            self.load_seq = []294            self.unload_seq = []295        def loaded_modules(self):296            return list(self._loaded_modules)297        def conflicted_modules(self, module):298            return []299        def _execute(self, cmd, *args):300            return ''301        def load_module(self, module):302            self.load_seq.append(module.name)303            self._loaded_modules.add(module.name)304        def unload_module(self, module):305            self.unload_seq.append(module.name)306            try:307                self._loaded_modules.remove(module.name)308            except KeyError:309                pass310        def is_module_loaded(self, module):311            return module.name in self._loaded_modules312        def available_modules(self, substr):313            return []314        def name(self):315            return 'nomod_debug'316        def version(self):317            return '1.0'318        @property319        def modulecmd(self):320            return ''321        def unload_all(self):322            self._loaded_modules.clear()323        def searchpath(self):324            return []325        def searchpath_add(self, *dirs):326            pass327        def searchpath_remove(self, *dirs):328            pass329        def emit_load_instr(self, module):330            return ''331        def emit_unload_instr(self, module):332            return ''333    return modules.ModulesSystem(ModulesSystemEmulator())334def test_mapping_simple(modules_system_emu):335    #336    # m0 -> m1337    #338    modules_system_emu.module_map = {'m0': ['m1']}339    modules_system_emu.load_module('m0')340    assert modules_system_emu.is_module_loaded('m0')341    assert modules_system_emu.is_module_loaded('m1')342    assert ['m1'] == modules_system_emu.backend.load_seq343    # Unload module344    modules_system_emu.unload_module('m1')345    assert not modules_system_emu.is_module_loaded('m0')346    assert not modules_system_emu.is_module_loaded('m1')347def test_mapping_chain(modules_system_emu):348    #349    # m0 -> m1 -> m2350    #351    modules_system_emu.module_map = {352        'm0': ['m1'],353        'm1': ['m2']354    }355    modules_system_emu.load_module('m0')356    assert modules_system_emu.is_module_loaded('m0')357    assert modules_system_emu.is_module_loaded('m1')358    assert modules_system_emu.is_module_loaded('m2')359    assert ['m2'] == modules_system_emu.backend.load_seq360    # Unload module361    modules_system_emu.unload_module('m1')362    assert not modules_system_emu.is_module_loaded('m0')363    assert not modules_system_emu.is_module_loaded('m1')364    assert not modules_system_emu.is_module_loaded('m2')365def test_mapping_n_to_one(modules_system_emu):366    #367    # m0 -> m2 <- m1368    #369    modules_system_emu.module_map = {370        'm0': ['m2'],371        'm1': ['m2']372    }373    modules_system_emu.load_module('m0')374    assert modules_system_emu.is_module_loaded('m0')375    assert modules_system_emu.is_module_loaded('m1')376    assert modules_system_emu.is_module_loaded('m2')377    assert ['m2'] == modules_system_emu.backend.load_seq378    # Unload module379    modules_system_emu.unload_module('m0')380    assert not modules_system_emu.is_module_loaded('m0')381    assert not modules_system_emu.is_module_loaded('m1')382    assert not modules_system_emu.is_module_loaded('m2')383def test_mapping_one_to_n(modules_system_emu):384    #385    # m2 <- m0 -> m1386    #387    modules_system_emu.module_map = {388        'm0': ['m1', 'm2'],389    }390    modules_system_emu.load_module('m0')391    assert modules_system_emu.is_module_loaded('m0')392    assert modules_system_emu.is_module_loaded('m1')393    assert modules_system_emu.is_module_loaded('m2')394    assert ['m1', 'm2'] == modules_system_emu.backend.load_seq395    # m0 is loaded only if m1 and m2 are.396    modules_system_emu.unload_module('m2')397    assert not modules_system_emu.is_module_loaded('m0')398    assert modules_system_emu.is_module_loaded('m1')399def test_mapping_deep_dfs_order(modules_system_emu):400    #401    #    -- > m1 ---- > m3402    #   /       \403    # m0         \404    #   \         \405    #    -- > m2   -- > m4406    #407    modules_system_emu.module_map = {408        'm0': ['m1', 'm2'],409        'm1': ['m3', 'm4']410    }411    modules_system_emu.load_module('m0')412    assert modules_system_emu.is_module_loaded('m0')413    assert modules_system_emu.is_module_loaded('m1')414    assert modules_system_emu.is_module_loaded('m2')415    assert modules_system_emu.is_module_loaded('m3')416    assert modules_system_emu.is_module_loaded('m4')417    assert ['m3', 'm4', 'm2'] == modules_system_emu.backend.load_seq418    # Test unloading419    modules_system_emu.unload_module('m2')420    assert not modules_system_emu.is_module_loaded('m0')421    assert modules_system_emu.is_module_loaded('m1')422    assert not modules_system_emu.is_module_loaded('m2')423    assert modules_system_emu.is_module_loaded('m3')424    assert modules_system_emu.is_module_loaded('m4')425def test_mapping_deep_dfs_unload_order(modules_system_emu):426    #427    #    -- > m1 ---- > m3428    #   /       \429    # m0         \430    #   \         \431    #    -- > m2   -- > m4432    #433    modules_system_emu.module_map = {434        'm0': ['m1', 'm2'],435        'm1': ['m3', 'm4']436    }437    modules_system_emu.load_module('m0')438    modules_system_emu.unload_module('m0')439    assert ['m2', 'm4', 'm3'] == modules_system_emu.backend.unload_seq440def test_mapping_multiple_paths(modules_system_emu):441    #442    #    -- > m1443    #   /     ^444    # m0      |445    #   \     |446    #    -- > m2447    #448    modules_system_emu.module_map = {449        'm0': ['m1', 'm2'],450        'm2': ['m1'],451    }452    modules_system_emu.load_module('m0')453    assert modules_system_emu.is_module_loaded('m0')454    assert modules_system_emu.is_module_loaded('m1')455    assert modules_system_emu.is_module_loaded('m2')456    assert ['m1'] == modules_system_emu.backend.load_seq457    # Test unloading458    modules_system_emu.unload_module('m2')459    assert not modules_system_emu.is_module_loaded('m0')460    assert not modules_system_emu.is_module_loaded('m1')461    assert not modules_system_emu.is_module_loaded('m2')462def test_mapping_deep_multiple_paths(modules_system_emu):463    #464    #    -- > m1 ---- > m3465    #   /     ^ \466    # m0      |  \467    #   \     |   \468    #    -- > m2   -- > m4469    #470    modules_system_emu.module_map = {471        'm0': ['m1', 'm2'],472        'm1': ['m3', 'm4'],473        'm2': ['m1']474    }475    modules_system_emu.load_module('m0')476    assert modules_system_emu.is_module_loaded('m0')477    assert modules_system_emu.is_module_loaded('m1')478    assert modules_system_emu.is_module_loaded('m2')479    assert modules_system_emu.is_module_loaded('m3')480    assert modules_system_emu.is_module_loaded('m4')481    assert ['m3', 'm4'] == modules_system_emu.backend.load_seq482def test_mapping_cycle_simple(modules_system_emu):483    #484    # m0 -> m1 -> m0485    #486    modules_system_emu.module_map = {487        'm0': ['m1'],488        'm1': ['m0'],489    }490    with pytest.raises(EnvironError):491        modules_system_emu.load_module('m0')492    with pytest.raises(EnvironError):493        modules_system_emu.load_module('m1')494def test_mapping_single_module_self_loop(modules_system_emu):495    #496    # m0 -> m0497    #498    modules_system_emu.module_map = {499        'm0': ['m0'],500    }501    modules_system_emu.load_module('m0')502    assert modules_system_emu.is_module_loaded('m0')503    assert ['m0'] == modules_system_emu.backend.load_seq504def test_mapping_deep_cycle(modules_system_emu):505    #506    #    -- > m1 ---- > m3507    #   /     ^         |508    # m0      |         |509    #   \     |         .510    #    -- > m2 < ---- m4511    #512    modules_system_emu.module_map = {513        'm0': ['m1', 'm2'],514        'm1': ['m3'],515        'm2': ['m1'],516        'm3': ['m4'],517        'm4': ['m2']518    }519    with pytest.raises(EnvironError, match='m0->m1->m3->m4->m2->m1'):520        modules_system_emu.load_module('m0')521@pytest.fixture522def mapping_file(tmp_path):523    tmp_mapping = tmp_path / 'mapping'524    with open(tmp_mapping, 'w') as fp:525        yield fp526def test_mapping_from_file_simple(modules_system_emu, mapping_file):527    with mapping_file:528        mapping_file.write('m1:m2  m3   m3\n'529                           'm2 : m4 \n'530                           ' #m5: m6\n'531                           '\n'532                           ' m2: m7 m8\n'533                           'm9: m10 # Inline comment')534    reference_map = {535        'm1': ['m2', 'm3'],536        'm2': ['m7', 'm8'],537        'm9': ['m10']538    }539    modules_system_emu.load_mapping_from_file(mapping_file.name)540    assert reference_map == modules_system_emu.module_map541def test_mapping_from_file_missing_key_separator(modules_system_emu,542                                                 mapping_file):543    with mapping_file:544        mapping_file.write('m1 m2')545    with pytest.raises(ConfigError):546        modules_system_emu.load_mapping_from_file(mapping_file.name)547def test_mapping_from_file_empty_value(modules_system_emu, mapping_file):548    with mapping_file:549        mapping_file.write('m1: # m2')550    with pytest.raises(ConfigError):551        modules_system_emu.load_mapping_from_file(mapping_file.name)552def test_mapping_from_file_multiple_key_separators(modules_system_emu,553                                                   mapping_file):554    with mapping_file:555        mapping_file.write('m1 : m2 : m3')556    with pytest.raises(ConfigError):557        modules_system_emu.load_mapping_from_file(mapping_file.name)558def test_mapping_from_file_empty_key(modules_system_emu, mapping_file):559    with mapping_file:560        mapping_file.write(' :  m2')561    with pytest.raises(ConfigError):562        modules_system_emu.load_mapping_from_file(mapping_file.name)563def test_mapping_from_file_missing_file(modules_system_emu):564    with pytest.raises(OSError):565        modules_system_emu.load_mapping_from_file('foo')566def test_mapping_with_self_loop(modules_system_emu):567    modules_system_emu.module_map = {568        'm0': ['m1', 'm0', 'm2'],569        'm1': ['m4', 'm3']570    }571    modules_system_emu.load_module('m0')572    assert modules_system_emu.is_module_loaded('m0')573    assert modules_system_emu.is_module_loaded('m1')574    assert modules_system_emu.is_module_loaded('m2')575    assert modules_system_emu.is_module_loaded('m3')576    assert modules_system_emu.is_module_loaded('m4')577    assert ['m4', 'm3', 'm0', 'm2'] == modules_system_emu.backend.load_seq578def test_mapping_with_self_loop_and_duplicate_modules(modules_system_emu):579    modules_system_emu.module_map = {580        'm0': ['m0', 'm0', 'm1', 'm1'],581        'm1': ['m2', 'm3']582    }583    modules_system_emu.load_module('m0')584    assert modules_system_emu.is_module_loaded('m0')585    assert modules_system_emu.is_module_loaded('m1')586    assert modules_system_emu.is_module_loaded('m2')587    assert modules_system_emu.is_module_loaded('m3')...timekeeper_functions.py
Source:timekeeper_functions.py  
1#2# File  : timekeeper_functions.py3# 4# Brief : Exposes TimeKeeper API in python5#6# authors : Vignesh Babu7#8import os9from signal import SIGSTOP, SIGCONT10import time11import subprocess12TIMEKEEPER_FILE_NAME = "/proc/dilation/status"13DILATE = 'A'14DILATE_ALL = 'B'15FREEZE_OR_UNFREEZE = 'C'16FREEZE_OR_UNFREEZE_ALL = 'D'17LEAP = 'E'18ADD_TO_EXP_CBE = 'F'19ADD_TO_EXP_CS = 'G'20SYNC_AND_FREEZE = 'H'21START_EXP = 'I'22SET_INTERVAL = 'J'23PROGRESS = 'K'24FIX_TIMELINE = 'L'25RESET = 'M'26STOP_EXP = 'N'27SET_CBE_EXP_TIMESLICE = 'T'28SET_NETDEVICE_OWNER  = 'U'29PROGRESS_EXP_CBE = 'V'30RESUME_CBE = 'W'31def is_root() :32	if os.geteuid() == 0 :33		return 134	print "Needs to be run as root"35	return 036def is_Module_Loaded() :37	if os.path.isfile(TIMEKEEPER_FILE_NAME) == True:38		return 139	else :40		print "Timekeeper module is not loaded"41		return 042# Associates the given pid with a net_device43def set_netdevice_owner(pid,dev_name) :44	if is_root() == 0 or is_Module_Loaded() == 0 :45		print "ERROR in Set Netdevice Owner. TimeKeeper is not loaded!"46		return -147	cmd = SET_NETDEVICE_OWNER + "," + str(pid) + "," + str(dev_name)48	return send_to_timekeeper(cmd)49# Converts the double into a integer, makes computation easier in kernel land50# ie .5 is converted to -2000, 2 to 200051def fixDilation(dilation) :52        53	dil = 054	if dilation < 0 :55		print "Negative dilation does not make sense";56		return -157        58	if dilation < 1.0 and dilation > 0.0 :59		dil = (int)((1/dilation)*1000.0)60		dil = dil*-161        62	elif dilation == 1.0 or dilation == -1.0 :63                dil = 064	else:65		dil = (int)(dilation*1000.0)66        67        return dil;68def getpidfromname(lxcname) :69	my_pid = os.getpid()70	pid = -171	temp_file = "/tmp/" + str(my_pid) + "_timekeeper.txt"72	# For different versions of LXCs73	command = "lxc-info -n %s | grep PID | tr -s ' ' | cut -d ' ' -f 2 > %s" % (lxcname,temp_file)74	os.system(command)75	76	with open(temp_file,"r") as f :77		pidstr = f.read()78		if pidstr != "" :79			pid = int(pidstr)80	if pid == -1 :81		command  = "lxc-info -n %s | grep pid | tr -s ' ' | cut -d ' ' -f 2 > %s" % (lxcname,temp_file)82        	os.system(command)83		with open(temp_file,"r") as f :84	                pidstr = f.read()85        	        if pidstr != "" :86                	        pid = int(pidstr)87	88	89	os.remove(temp_file)90	91	return pid92def send_to_timekeeper(cmd) :93	if is_root()== 0 or is_Module_Loaded() == 0 :94		print "ERROR sending cmd to timekeeper"95		return -196	with open(TIMEKEEPER_FILE_NAME,"w") as f :97		f.write(cmd)98	return 199# timeslice in nanosecs100def set_cbe_experiment_timeslice(timeslice) :101	if is_root() == 0 or is_Module_Loaded() == 0 :102		print "ERROR setting timeslice value"103		return -1104	105	timeslice = int(timeslice)106	cmd = SET_CBE_EXP_TIMESLICE + "," + str(timeslice)107	return send_to_timekeeper(cmd)108#109#Given a pid, add that container to a CBE experiment.110#111def addToExp(pid) :112	if is_root() == 0 or is_Module_Loaded() == 0 :113		return -1 114	cmd = ADD_TO_EXP_CBE + "," + str(pid)115	return send_to_timekeeper(cmd)116#117#Starts a CBE Experiment118#119def startExp():120	if is_root() == 0 or is_Module_Loaded() == 0 :121		print "ERROR starting CBE Experiment"122		return -1123	cmd = START_EXP124	return send_to_timekeeper(cmd)125#126#Given all Pids added to experiment, will set all their virtual times to be the same, then freeze them all (CBE and CS)127#128def synchronizeAndFreeze() :129	if is_root() == 0 or is_Module_Loaded() == 0 :130		print "ERROR in Synchronize and Freeze"131		return -1132	cmd = SYNC_AND_FREEZE133	return send_to_timekeeper(cmd)134#135#Stop a running experiment (CBE or CS) **Do not call stopExp if you are waiting for a s3fProgress to return!!**136#137def stopExp() :138	if is_root() == 0 or is_Module_Loaded() == 0 :139		print "ERROR in Synchronize and Freeze"140		return -1141	cmd = STOP_EXP142	return send_to_timekeeper(cmd)143#144#Sets the TDF of the given pid145#146def dilate(pid,dilation) :147	if is_root() == 0 or is_Module_Loaded() == 0 :148		print "ERROR in setting dilation for pid :", pid149		return -1150	dil = fixDilation(dilation)151	if dil == - 1 :152		return - 1153	elif dil < 0 :154		cmd = DILATE + "," + str(pid) + ",1," + str(-1*dil)155	else :156		cmd = DILATE + "," + str(pid) + "," + str(dil)157	return send_to_timekeeper(cmd)158#159# Will set the TDF of a LXC and all of its children160#161def dilate_all(pid,dilation) :162	if is_root() == 0 or is_Module_Loaded() == 0 :163		print "ERROR in setting dilation for pid :", pid164		return -1165	dil = fixDilation(dilation)166	if dil == - 1 :167		return - 1168	elif dil < 0 :169		cmd = DILATE_ALL + "," + str(pid) + ",1," + str(-1*dil)170	else :171		cmd = DILATE_ALL + "," + str(pid) + "," + str(dil)172	return send_to_timekeeper(cmd)173#174# Takes an integer (pid of the process). This function will essentially 'freeze' the175# time of the process. It does this by sending a sigstop signal to the process.176#177def freeze(pid) :178	if is_root() == 0 or is_Module_Loaded() == 0 :179		return -1180	else :181		cmd = FREEZE_OR_UNFREEZE + "," + str(pid) + "," + SIGSTOP182		return send_to_timekeeper(cmd)183	184#185# Takes an integer (pid of the process). This function will unfreeze the process.186# When the process is unfrozen, it will think that no time has passed, and will187# continue doing whatever it was doing before it was frozen.188#189def unfreeze(pid) :190	if is_root() == 0 or is_Module_Loaded() == 0 :191		return -1192	cmd = FREEZE_OR_UNFREEZE + "," + str(pid) + "," +SIGCONT193	return send_to_timekeeper(cmd)194#195# Same as freeze, except that it will freeze the process as well as all of its children.196#197def freeze_all(pid) :198	if is_root() == 0 or is_Module_Loaded() == 0 :199		return -1200	else :201		cmd = FREEZE_OR_UNFREEZE_ALL + "," + str(pid) + "," + SIGSTOP202		return send_to_timekeeper(cmd)203	204# 205# Same as unfreeze, except that it will unfreeze the process as well as all of its children.206#207def unfreeze(pid) :208	if is_root() == 0 or is_Module_Loaded() == 0 :209		return -1210	cmd = FREEZE_OR_UNFREEZE_ALL + "," + str(pid) + "," +SIGCONT211	return send_to_timekeeper(cmd)212		213## CS Experiment Fuctions214def add_To_CS_Exp(pid,timeline) :215	if is_root() and is_Module_Loaded() and timeline >= 0 :216		cmd = ADD_TO_EXP_CS + "," + str(pid) + "," + str(timeline)217		return send_to_timekeeper(cmd)218	else:219		return -1220def leap(pid,interval) :221	if is_root() and is_Module_Loaded() and interval > 0 :222		cmd = LEAP + "," + str(pid) + "," + str(interval)223		return send_to_timekeeper(cmd)224	else:225		return -1226def set_interval(pid, interval, timeline) :227	if is_root() and is_Module_Loaded() and interval > 0:228		cmd = SET_INTERVAL + "," + str(pid) + "," + str(interval) + "," + str(timeline)229		return send_to_timekeeper(cmd)230	else:231		return -1232def fix_timeline(timeline) :233	if is_root() and is_Module_Loaded() and timeline >= 0:234		cmd = FIX_TIMELINE + "," + str(timeline)235		return send_to_timekeeper(cmd)236	else:237		return -1238def reset_timeline(timeline) :239	if is_root() and is_Module_Loaded() and timeline >= 0:240		cmd = RESET + "," + str(timeline)241		return send_to_timekeeper(cmd)242	else:243		return -1244def progress(timeline,mypid, force) :245	if is_root() and is_Module_Loaded() and timeline >= 0:246		cmd = PROGRESS + "," + str(timeline) + "," + str(mypid) + "," + str(force)247		return send_to_timekeeper(cmd)248	else:249		return -1250def progress_exp_cbe(n_rounds) :251	if is_root() and is_Module_Loaded() and n_rounds > 0 :252		cmd = PROGRESS_EXP_CBE + "," + str(n_rounds)253		return send_to_timekeeper(cmd)254	else:255		return -1256def resume_exp_cbe() :257	if is_root() and is_Module_Loaded() :258		cmd = RESUME_CBE259		return send_to_timekeeper(cmd)260	else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
