Best Python code snippet using avocado_python
test_processmonitor.py
Source:test_processmonitor.py  
...21    timeout = max(timeout, 1)22    while time.time() - started < timeout:23        all_stopped = True24        for process in processes:25            if ProcessMonitor.is_process_alive(process):26                all_stopped = False27                break28        if all_stopped:29            return30        time.sleep(0.5)31def sleep_tenth_sec():32    time.sleep(0.1)33def run_exit():34    return35class TestProcessMonitor(TestCase):36    def test_monitor(self):37        mp = MockProcess()38        p1 = Process(target=run_exit)39        p2 = Process(target=mp.run)40        p1.start()41        p2.start()42        pm = ProcessMonitor(p1, p2)43        pm.add_callbacks(pm.kill_processes, pm.exit)44        pm.start()45        wait_for_processes(10, p1, p2)46        self.assertFalse(pm.is_process_alive(p1), "process not finished")47        self.assertFalse(pm.is_process_alive(p2), "process not killed")48    def test_monitor_2(self):49        mp1, mp2 = MockProcess(), MockProcess(timeout=0)50        p1 = Process(target=mp1.run)51        p2 = Process(target=mp2.run)52        p1.start()53        p2.start()54        pm = ProcessMonitor(p1, p2)55        pm.add_callbacks(pm.kill_processes, pm.exit)56        pm.start()57        wait_for_processes(10, p1, p2)58        self.assertFalse(pm.is_process_alive(p1), "process not killed")59        self.assertFalse(pm.is_process_alive(p2), "process not finished")60    def test_exit(self):61        import logging62        logger = logging.getLogger("golem.core")63        mp1, mp2 = MockProcess(), MockProcess()64        p1 = Process(target=mp1.run)65        p2 = Process(target=mp2.run)66        p1.start()67        p2.start()68        pm = ProcessMonitor(p1, p2)69        def callback():70            logger.warning("Shutting down...")71        pm.add_callbacks(callback)72        pm.start()73        pm.exit()74        pm.join()75        self.assertFalse(pm.is_process_alive(p1))76        self.assertFalse(pm.is_process_alive(p2))77    def test_add_remove_callbacks(self):78        pm = ProcessMonitor()79        pm.add_callbacks(pm.exit)80        pm.remove_callbacks(pm.exit)81        assert not pm._callbacks82    def test_add_child_process(self):83        mp1, mp2 = MockProcess(), MockProcess(timeout=1)84        p1 = Process(target=mp1.run)85        p2 = Process(target=mp2.run)86        pm = ProcessMonitor(p1)87        pm.add_child_processes(p2)88        assert len(pm._child_processes) == 289    def test_lifecycle_popen(self):90        process = subprocess.Popen(91            ['python', '-c', 'import time; time.sleep(0.1)'])92        assert ProcessMonitor.is_process_alive(process)93        assert ProcessMonitor._pid(process)94        assert ProcessMonitor.is_supported(process)95        process.communicate()96        assert not ProcessMonitor.is_process_alive(process)97        assert ProcessMonitor.exit_code(process) is not None98    def test_lifecycle_multiprocessing(self):99        process = Process(target=sleep_tenth_sec)100        assert not ProcessMonitor.is_process_alive(process)101        assert ProcessMonitor.is_supported(process)102        process.start()103        assert ProcessMonitor.is_process_alive(process)104        process.join()105        assert not ProcessMonitor.is_process_alive(process)106        assert ProcessMonitor.exit_code(process) == 0107    def test_lifecycle_none(self):108        process = None109        assert not ProcessMonitor.is_process_alive(process)110        assert not ProcessMonitor.is_supported(process)111        assert not ProcessMonitor._pid(process)112        assert ProcessMonitor.exit_code(process) is None113    def test_kill_process_popen(self):114        process = subprocess.Popen(['python', '-c', 'import time; time.sleep(1)'])115        assert ProcessMonitor.is_process_alive(process)116        ProcessMonitor.kill_process(process)117        assert not ProcessMonitor.is_process_alive(process)118    def test_kill_process_multiprocessing(self):119        process = Process(target=sleep_tenth_sec)120        process.start()121        assert ProcessMonitor.is_process_alive(process)122        ProcessMonitor.kill_process(process)123        assert not ProcessMonitor.is_process_alive(process)124        process = Process(target=sleep_tenth_sec)125        ProcessMonitor.kill_process(process)126    def test_exit_code(self):127        process_psutil = psutil.Popen.__new__(psutil.Popen, None)128        process_subprocess = subprocess.Popen.__new__(subprocess.Popen, None)129        process_multiprocessing = Process.__new__(Process, None)130        process_psutil.poll = Mock()131        process_subprocess.poll = Mock()132        process_multiprocessing._popen = Mock()133        process_multiprocessing._parent_pid = os.getpid()134        process_multiprocessing._name = "test"135        process_multiprocessing._daemonic = False136        process_psutil.returncode = None137        process_subprocess.returncode = None138        assert ProcessMonitor.is_process_alive(process_psutil)139        assert ProcessMonitor.is_process_alive(process_subprocess)140        with patch('multiprocessing.Process.is_alive', side_effect=lambda: False):141            assert not ProcessMonitor.is_process_alive(process_multiprocessing)142        assert ProcessMonitor.exit_code(None) is None143        assert ProcessMonitor.exit_code(process_psutil) is None144        assert ProcessMonitor.exit_code(process_subprocess) is None145        with patch('multiprocessing.Process.exitcode') as exitcode:146            exitcode.__get__ = Mock(return_value=None)147            assert ProcessMonitor.exit_code(process_multiprocessing) is None148        process_psutil.poll = Mock()149        process_psutil.returncode = 0150        process_subprocess.poll = Mock()151        process_subprocess.returncode = 0152        assert not ProcessMonitor.is_process_alive(None)153        assert not ProcessMonitor.is_process_alive(process_psutil)154        assert not ProcessMonitor.is_process_alive(process_subprocess)155        with patch('multiprocessing.Process.exitcode') as exitcode:156            exitcode.__get__ = Mock(return_value=0)157            assert not ProcessMonitor.is_process_alive(process_multiprocessing)158        assert ProcessMonitor.exit_code(process_psutil) == 0159        assert ProcessMonitor.exit_code(process_subprocess) == 0160        with patch('multiprocessing.Process.exitcode') as exitcode:161            exitcode.__get__ = Mock(return_value=0)...process.py
Source:process.py  
...5  # --- set valid instance credentials --- #6  from recompute.instance import InstanceManager7  from recompute.config import ConfigManager8  return InstanceManager(ConfigManager()).get()9def test_is_process_alive():10  from recompute.process import is_process_alive11  import subprocess12  import os13  proc = subprocess.Popen(['sleep 60', '...'], shell=True, stdout=open(os.devnull))14  assert is_process_alive(proc.pid)15  proc.terminate()16def test_execute():17  from recompute.process import is_process_alive18  from recompute.process import execute19  pid, output = execute('ls -1a')20  assert not is_process_alive(pid)21  output_list = output.split('\n')22  assert '.' in output_list and '..' in output_list23  assert len(output_list) > 224def test_async_execute():25  from recompute.process import is_process_alive26  from recompute.process import async_execute27  import signal28  import os29  pid, output = async_execute('sleep 60')30  assert not output31  assert is_process_alive(pid)32  os.kill(pid, signal.SIGTERM)33def test_remote_execute(instance):34  from recompute.process import remote_execute35  pid, output = remote_execute('ls -1a', instance)36  output_list = output.split('\n')37  assert '.' in output_list and '..' in output_list38  assert len(output_list) > 239def test_remote_async_execute(instance):40  from recompute.process import remote_async_execute41  from recompute.process import is_remote_process_alive42  pid, _ = remote_async_execute('sleep 60', instance)43  assert isinstance(pid, type(42))44  assert is_remote_process_alive(pid, instance)45def test_is_remote_process_alive(instance):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
