How to use _process method in Molotov

Best Python code snippet using molotov_python

asyncExecutor.py

Source:asyncExecutor.py Github

copy

Full Screen

import os
import sys
import shlex
import signal
import threading
import itertools
import subprocess
from .utils import *

if sys.version_info >= (3, 0):
    import queue as Queue
    lfDEVNULL = subprocess.DEVNULL
else:
    import Queue
    lfDEVNULL = open(os.devnull)


class AsyncExecutor(object):
    """
    A class to implement executing a command in subprocess, then
    read the output asynchronously.

    stdout is consumed lazily through the iterable returned by
    ``execute``; stderr is drained by a daemon thread into an internal
    queue and inspected once stdout has been exhausted.
    """
    def __init__(self):
        self._errQueue = Queue.Queue()
        self._process = None
        self._finished = False
        # lfEval comes from .utils (not shown in this excerpt); presumably it
        # evaluates a Vim expression -- TODO confirm.
        self._max_count = int(lfEval("g:Lf_MaxCount"))

    def _readerThread(self, fd, queue):
        """Copy every line read from ``fd`` into ``queue``.

        A ``None`` sentinel is always appended last so a consumer can stop
        with ``iter(queue.get, None)``. ``ValueError`` (raised when ``fd`` is
        closed while being read) ends the loop quietly.
        """
        try:
            for line in iter(fd.readline, b""):
                queue.put(line)
        except ValueError:
            pass
        finally:
            queue.put(None)

    @staticmethod
    def _lineConverter(encoding, format_line):
        """Return a function turning one raw output line into the yielded value.

        The trailing ``\\r\\n`` characters are stripped first.

        * Python 3: pure-ASCII lines are decoded directly; anything else goes
          through ``lfBytes2Str`` (with ``encoding`` when one was given).
        * Python 2: with ``encoding`` the stripped bytes are passed through
          untouched; otherwise non-ASCII lines go through ``lfEncode``.

        When ``format_line`` is given it is applied to the converted value.
        """
        if sys.version_info >= (3, 0):
            def to_text(line):
                stripped = line.rstrip(b"\r\n")
                try:
                    # Probe only: pure ASCII can take the cheap decode path.
                    line.decode("ascii")
                except UnicodeDecodeError:
                    if encoding:
                        return lfBytes2Str(stripped, encoding)
                    return lfBytes2Str(stripped)
                return stripped.decode()
        else:
            def to_text(line):
                stripped = line.rstrip(b"\r\n")
                if encoding:
                    return stripped
                try:
                    line.decode("ascii")
                except UnicodeDecodeError:
                    return lfEncode(stripped)
                return stripped

        if format_line:
            return lambda line: format_line(to_text(line))
        return to_text

    def execute(self, cmd, encoding=None, cleanup=None, env=None, raise_except=True, format_line=None):
        """Run ``cmd`` through the shell and return its output lazily.

        Args:
            cmd: command line handed to ``subprocess.Popen`` with ``shell=True``.
            encoding: encoding used for non-ASCII output lines (see
                ``_lineConverter``) and, on Python 3, for the stderr text.
            cleanup: optional callable invoked once the output generator
                finishes, whether normally or not.
            env: environment mapping passed to ``subprocess.Popen``.
            raise_except: when true, anything the command wrote to stderr is
                raised as ``Exception`` after stdout has been consumed.
            format_line: optional callable applied to every yielded line.

        Returns:
            An ``AsyncExecutor.Result`` wrapping a generator over the
            command's stdout lines.
        """
        popen_kwargs = dict(bufsize=-1,
                            stdin=lfDEVNULL,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            shell=True,
                            env=env,
                            universal_newlines=False)
        if os.name != 'nt':
            # Give the child its own session/process group so that
            # killProcess() can signal the whole group with os.killpg().
            popen_kwargs["preexec_fn"] = os.setsid
        self._process = subprocess.Popen(cmd, **popen_kwargs)

        self._finished = False

        stderr_thread = threading.Thread(target=self._readerThread,
                                         args=(self._process.stderr, self._errQueue))
        stderr_thread.daemon = True
        stderr_thread.start()

        convert = self._lineConverter(encoding, format_line)

        def read(source):
            try:
                count = 0
                for line in source:
                    yield convert(line)
                    if self._max_count > 0:
                        count += 1
                        if count >= self._max_count:
                            self.killProcess()
                            break

                # Blocks until the stderr reader thread posts its None sentinel.
                err = b"".join(iter(self._errQueue.get, None))
                if err and raise_except:
                    if sys.version_info >= (3, 0):
                        raise Exception(lfBytes2Str(err, encoding))
                    raise Exception(err)
            except ValueError:
                # Also covers UnicodeDecodeError, which subclasses ValueError.
                pass
            finally:
                self._finished = True
                try:
                    # killProcess() may already have reset self._process.
                    if self._process:
                        self._process.stdout.close()
                        self._process.stderr.close()
                        self._process.poll()
                except IOError:
                    pass

                if cleanup:
                    cleanup()

        result = AsyncExecutor.Result(read(iter(self._process.stdout.readline, b"")))

        return result

    def killProcess(self):
        """Terminate the running command (and its children), if any."""
        # Popen.poll always returns None, bug?
        # if self._process and not self._process.poll():
        if self._process and not self._finished:
            if os.name == 'nt':
                subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self._process.pid), shell=True)
            else:
                try:
                    os.killpg(os.getpgid(self._process.pid), signal.SIGTERM)
                except OSError:
                    pass

            # NOTE(review): the excerpt lost its indentation; this reset is
            # assumed to belong inside the `if` -- confirm against upstream.
            self._process = None

    class Result(object):
        """Chainable wrapper around the iterable of output lines."""

        def __init__(self, iterable):
            self._g = iterable

        def __add__(self, iterable):
            # Mutates and returns self, exactly like __iadd__.
            self._g = itertools.chain(self._g, iterable)
            return self

        def __iadd__(self, iterable):
            self._g = itertools.chain(self._g, iterable)
            return self

        def join_left(self, iterable):
            """Prepend ``iterable`` so it is consumed before the current lines."""
            self._g = itertools.chain(iterable, self._g)
            return self

        def __iter__(self):
            return self._g


if __name__ == "__main__":
    executor = AsyncExecutor()
    out = executor.execute("ctags -f- -R")
    print("stdout begin: ============================================")
    for i in out:
        print(repr(i))
    # ... (the excerpt is truncated at this point on the source page)

Full Screen

Full Screen

shellscript.py

Source:shellscript.py Github

copy

Full Screen

import subprocess
import tempfile
import shutil
import signal
import os
from pathlib import Path
import time
import sys
from typing import Optional, List, Any, Union

PathType = Union[str, Path]


class ShellScript():
    """Write a shell script to disk, run it, and manage the child process.

    The script text is de-indented relative to its first non-blank line, so
    it can be given as an indented triple-quoted string.
    """

    def __init__(self, script: str, script_path: Optional[PathType] = None, log_path: Optional[PathType] = None,
                 keep_temp_files: bool = False, verbose: bool = False):
        """
        Args:
            script: text of the script.
            script_path: where to write the script; a temporary directory is
                used by ``start`` when omitted.
            log_path: where ``start`` writes the captured output; defaults to
                ``spikesorters_log.txt`` next to the script.
            keep_temp_files: when true, ``cleanup`` removes nothing.
            verbose: when true, ``start`` also prints each output line.

        Raises:
            Exception: if any non-blank line is indented less than the first.
        """
        # Bookkeeping is initialised before validation so that __del__ ->
        # cleanup() still works when the indentation check below raises.
        self._script_path = script_path
        self._log_path = log_path
        self._keep_temp_files = keep_temp_files
        self._process: Optional[subprocess.Popen] = None
        self._files_to_remove: List[str] = []
        self._dirs_to_remove: List[PathType] = []
        self._start_time: Optional[float] = None
        self._verbose = verbose

        lines = script.splitlines()
        lines = self._remove_initial_blank_lines(lines)
        if len(lines) > 0:
            num_initial_spaces = self._get_num_initial_spaces(lines[0])
            for ii, line in enumerate(lines):
                if len(line.strip()) > 0:
                    n = self._get_num_initial_spaces(line)
                    if n < num_initial_spaces:
                        print(script)
                        raise Exception('Problem in script. First line must not be indented relative to others')
                    lines[ii] = lines[ii][num_initial_spaces:]
        self._script = '\n'.join(lines)

    def __del__(self):
        self.cleanup()

    def substitute(self, old: str, new: Any) -> None:
        """Replace every occurrence of ``old`` in the script with ``str(new)``."""
        self._script = self._script.replace(old, '{}'.format(new))

    def write(self, script_path: Optional[PathType] = None) -> None:
        """Write the script to ``script_path`` (default: the constructor's path).

        The file mode is set to 0o744.

        Raises:
            Exception: if no path is available.
        """
        if script_path is None:
            script_path = self._script_path
        if script_path is None:
            raise Exception('Cannot write script. No path specified')
        with open(script_path, 'w') as f:
            f.write(self._script)
        os.chmod(script_path, 0o744)

    @staticmethod
    def _script_suffix() -> str:
        """Return '.bat' when ``sys.platform`` contains 'win' (but is not 'darwin'), else '.sh'."""
        if 'win' in sys.platform and sys.platform != 'darwin':
            return '.bat'
        return '.sh'

    def start(self) -> None:
        """Write the script, launch it, and copy its output to the log file.

        stdout and stderr are merged. The output loop runs until the child
        closes its output, so this call blocks for that long.
        """
        suffix = self._script_suffix()
        if self._script_path is not None:
            script_path = Path(self._script_path)
            if script_path.suffix == '':
                script_path = script_path.parent / (script_path.name + suffix)
        else:
            tempdir = Path(tempfile.mkdtemp(prefix='tmp_shellscript'))
            script_path = tempdir / ('script' + suffix)
            self._dirs_to_remove.append(tempdir)

        if self._log_path is None:
            script_log_path = script_path.parent / 'spikesorters_log.txt'
        else:
            script_log_path = Path(self._log_path)
            # Test the log path's own suffix: script_path always has one by
            # now, so testing it made this branch unreachable.
            if script_log_path.suffix == '':
                script_log_path = script_log_path.parent / (script_log_path.name + '.txt')

        self.write(script_path)
        cmd = str(script_path)
        print('RUNNING SHELL SCRIPT: ' + cmd)
        self._start_time = time.time()
        self._process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1,
                                         universal_newlines=True)
        with open(script_log_path, 'w+') as script_log_file:
            for line in self._process.stdout:
                script_log_file.write(line)
                if self._verbose:  # Print onto console depending on the verbose property passed on from the sorter class
                    print(line)

    def wait(self, timeout=None) -> Optional[int]:
        """Wait for the process; return its exit code, or None on timeout."""
        if not self.isRunning():
            return self.returnCode()
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        try:
            retcode = self._process.wait(timeout=timeout)
            return retcode
        except subprocess.TimeoutExpired:
            return None

    def cleanup(self) -> None:
        """Remove temporary directories created by ``start`` unless told to keep them."""
        # getattr keeps this safe on a partially constructed instance,
        # which matters because __del__ calls it unconditionally.
        if getattr(self, '_keep_temp_files', False):
            return
        for dirpath in getattr(self, '_dirs_to_remove', ()):
            _rmdir_with_retries(str(dirpath), num_retries=5)

    def stop(self) -> None:
        """Send escalating signals (SIGINT, SIGTERM, SIGKILL) until the process exits."""
        if not self.isRunning():
            return
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        # NOTE(review): signal.SIGKILL is not defined on Windows, although
        # start() has a '.bat' branch -- confirm whether stop() must work there.
        signals = [signal.SIGINT] * 10 + [signal.SIGTERM] * 10 + [signal.SIGKILL] * 10
        for signal0 in signals:
            self._process.send_signal(signal0)
            try:
                self._process.wait(timeout=0.02)
                return
            except subprocess.TimeoutExpired:
                pass

    def kill(self) -> None:
        """Kill the process and wait up to one second for it to exit."""
        if not self.isRunning():
            return
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        # Popen.kill() sends SIGKILL on POSIX and does not require the
        # signal.SIGKILL name to exist on the platform.
        self._process.kill()
        try:
            self._process.wait(timeout=1)
        except subprocess.TimeoutExpired:
            print('WARNING: unable to kill shell script.')

    def stopWithSignal(self, sig, timeout) -> bool:
        """Send ``sig``; return True if the process exits within ``timeout`` seconds."""
        if not self.isRunning():
            return True
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        self._process.send_signal(sig)
        try:
            self._process.wait(timeout=timeout)
            return True
        except subprocess.TimeoutExpired:
            return False

    def elapsedTimeSinceStart(self) -> Optional[float]:
        """Return seconds since ``start`` was called, or None if it never was."""
        if self._start_time is None:
            return None
        return time.time() - self._start_time

    def isRunning(self) -> bool:
        """Return True while a started process has not yet exited."""
        if not self._process:
            return False
        retcode = self._process.poll()
        if retcode is None:
            return True
        return False

    def isFinished(self) -> bool:
        """Return True once a started process has exited; False if never started."""
        if not self._process:
            return False
        return not self.isRunning()

    def returnCode(self) -> Optional[int]:
        """Return the exit code of the finished process.

        Raises:
            Exception: if the process has not finished (or was never started).
        """
        if not self.isFinished():
            raise Exception('Cannot get return code before process is finished.')
        assert self._process is not None, "Unexpected self._process is None even though it is finished."
        return self._process.returncode

    def scriptPath(self) -> Optional[str]:
        """Return the script path exactly as given to the constructor."""
        return self._script_path

    def _remove_initial_blank_lines(self, lines: List[str]) -> List[str]:
        ii = 0
        while ii < len(lines) and len(lines[ii].strip()) == 0:
            ii = ii + 1
        return lines[ii:]

    def _get_num_initial_spaces(self, line: str) -> int:
        ii = 0
        while ii < len(line) and line[ii] == ' ':
            ii = ii + 1
        return ii


def _rmdir_with_retries(dirname, num_retries, delay_between_tries=1):
    """Remove ``dirname`` recursively, retrying up to ``num_retries`` times."""
    for retry_num in range(1, num_retries + 1):
        if not Path(dirname).is_dir():
            return
        try:
            shutil.rmtree(dirname)
            break
        except:
            if retry_num < num_retries:
                print('Retrying to remove directory: {}'.format(dirname))
                time.sleep(delay_between_tries)
            else:
                # NOTE(review): the excerpt is cut off here ("..."); the
                # final-failure handling is not visible -- restore it from
                # the upstream file rather than relying on this placeholder.
                ...

Full Screen

Full Screen

elf.py

Source:elf.py Github

copy

Full Screen

import logging
logger = logging.getLogger(__name__)

from termcolor import cprint, colored
from ... import types, common

# e_type values mapped to their short names.
elf_types = {
    0x0: 'NONE',
    0x1: 'REL',
    0x2: 'EXEC',
    0x3: 'DYN',
    0x4: 'CORE',
    0xfe00: 'LOOS',
    0xfeff: 'HIOS',
    0xff00: 'LOPROC',
    0xffff: 'HIPROC',
}


class ELF(object):

    def __init__(self, process, module):
        """Parses the ELF in memory.

        Args:
            process (Process): revenge Process object
            module (Module): Module object to start parsing at

        Raises:
            AssertionError: if the memory at the module base does not start
                with the ELF magic bytes.
        """
        self._process = process
        self.module = module
        self.bits = None

        # Raised explicitly (instead of via `assert`) so the check still runs
        # under `python -O`; the exception type is unchanged for callers.
        magic = self._process.memory[self.module.base:self.module.base+4].bytes
        if magic != b'\x7fELF':
            raise AssertionError("This does not appear to be an ELF file...")

    def _header_field(self, offset32, offset64):
        """Return the memory accessor for a header field.

        ``offset32`` is used when ``self.bits`` is 32, ``offset64`` otherwise;
        both are relative to the module base.
        """
        offset = offset32 if self.bits == 32 else offset64
        return self._process.memory[self.module.base + offset]

    @property
    def program_headers(self):
        """Program headers object."""
        return ProgramHeaders(self._process, self)

    @property
    def section_headers(self):
        """Section headers object."""
        return SectionHeaders(self._process, self)

    @property
    def bits(self):
        """int: How many bits is this ELF?"""
        if self.__bits is None:
            # Byte at offset 4 of the header: 1 is treated as 32-bit,
            # anything else as 64-bit.
            self.__bits = 32 if self._process.memory[self.module.base+0x4].int8 == 1 else 64
        return self.__bits

    @bits.setter
    def bits(self, bits):
        self.__bits = bits

    @property
    def entry(self):
        """int: ELF entrypoint. Rebased"""
        ret = self._process.memory[self.module.base+0x18].pointer
        # Only 'DYN' binaries get the module base added to the stored value.
        if self.type_str == 'DYN':
            ret = types.Pointer(ret + self.module.base)
        return ret

    @property
    def phoff(self):
        """int: Program header offset, with the module base added."""
        ret = self._header_field(0x1C, 0x20).pointer
        return types.Pointer(ret + self.module.base)

    @property
    def phnum(self):
        """int: Number of program headers."""
        return self._header_field(0x2C, 0x38).uint16

    @property
    def phentsize(self):
        """int: Size of program headers."""
        return self._header_field(0x2A, 0x36).uint16

    @property
    def flags(self):
        """int: Flags"""
        return self._header_field(0x24, 0x30).uint32

    @property
    def ehsize(self):
        """int: Size of this header."""
        return self._header_field(0x28, 0x34).uint16

    @property
    def shentsize(self):
        """int: Size of a section header entry"""
        return self._header_field(0x2e, 0x3a).uint16

    @property
    def shnum(self):
        """int: Number of section header entries."""
        return self._header_field(0x30, 0x3c).uint16

    @property
    def shstrndx(self):
        """int: Section header index that contains the section names."""
        return self._header_field(0x32, 0x3e).uint16

    @property
    def shoff(self):
        """int: Section Header offset, with the module base added."""
        ret = self._header_field(0x20, 0x28).pointer
        return types.Pointer(ret + self.module.base)

    @property
    def type(self):
        """int: Type of this binary."""
        # Read once from memory, then served from the cached attribute.
        try:
            return self.__type
        except AttributeError:
            self.__type = self._process.memory[self.module.base+0x10].uint16
            return self.__type

    @property
    def type_str(self):
        """str: Type of this binary.

        Raises:
            KeyError: if the type value is not a key of ``elf_types``.
        """
        return elf_types[self.type]


# Imported after the class definition, as in the original excerpt.
from .program_headers import ProgramHeaders
# NOTE(review): the excerpt is truncated here ("..."); the import that
# provides SectionHeaders is not visible -- restore it from the upstream file.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful