Best Python code snippet using molotov_python
asyncExecutor.py
Source:asyncExecutor.py  
import os
import sys
import shlex
import signal
import threading
import itertools
import subprocess
from .utils import *

if sys.version_info >= (3, 0):
    import queue as Queue
    lfDEVNULL = subprocess.DEVNULL
else:
    import Queue
    lfDEVNULL = open(os.devnull)


class AsyncExecutor(object):
    """
    Run a shell command in a subprocess and expose its standard output
    as a lazily consumed stream of lines.
    """

    def __init__(self):
        self._errQueue = Queue.Queue()
        self._process = None
        self._finished = False
        # Upper bound on the number of yielded lines; values <= 0 disable it.
        self._max_count = int(lfEval("g:Lf_MaxCount"))

    def _readerThread(self, fd, queue):
        """Forward every line read from `fd` to `queue`, then post ``None``.

        The ``None`` sentinel is always posted, even when reading stops
        because of a ``ValueError``, so a consumer waiting on the queue
        is always released.
        """
        try:
            for chunk in iter(fd.readline, b""):
                queue.put(chunk)
        except ValueError:
            pass
        finally:
            queue.put(None)

    def execute(self, cmd, encoding=None, cleanup=None, env=None, raise_except=True, format_line=None):
        """Start `cmd` through the shell and return a `Result` over its output.

        Args:
            cmd: command line handed to ``subprocess.Popen`` with ``shell=True``.
            encoding: on Python 3, passed to ``lfBytes2Str`` for lines that are
                not pure ASCII and for the collected stderr text; on Python 2 a
                truthy value makes lines pass through without any conversion.
            cleanup: optional zero-argument callable invoked once the output
                generator finishes.
            env: environment mapping forwarded to ``subprocess.Popen``.
            raise_except: when true, non-empty stderr output is raised as an
                ``Exception`` after stdout has been consumed.
            format_line: optional callable applied to every line before it is
                yielded.

        Returns:
            AsyncExecutor.Result: iterable wrapper around the line generator.
        """
        popen_options = dict(bufsize=-1,
                             stdin=lfDEVNULL,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=True,
                             env=env,
                             universal_newlines=False)
        if os.name != 'nt':
            # Give the child its own session so killProcess() can signal
            # the whole process group.
            popen_options["preexec_fn"] = os.setsid
        self._process = subprocess.Popen(cmd, **popen_options)

        self._finished = False

        stderr_thread = threading.Thread(target=self._readerThread,
                                         args=(self._process.stderr, self._errQueue))
        stderr_thread.daemon = True
        stderr_thread.start()

        def emit(text):
            # Apply the caller's formatter when one was supplied.
            return format_line(text) if format_line else text

        def finish():
            # Shared tear-down executed when the generator ends for any reason.
            self._finished = True
            try:
                if self._process:
                    self._process.stdout.close()
                    self._process.stderr.close()
                    self._process.poll()
            except IOError:
                pass

            if cleanup:
                cleanup()

        if sys.version_info >= (3, 0):
            def fallback(raw):
                # Decoder for lines that are not pure ASCII.
                if encoding:
                    return lfBytes2Str(raw, encoding)
                return lfBytes2Str(raw)

            def read(source):
                try:
                    count = 0
                    for line in source:
                        try:
                            # Probe only: raises UnicodeDecodeError for non-ASCII input.
                            line.decode("ascii")
                            yield emit(line.rstrip(b"\r\n").decode())
                        except UnicodeDecodeError:
                            yield emit(fallback(line.rstrip(b"\r\n")))

                        if self._max_count > 0:
                            count += 1
                            if count >= self._max_count:
                                self.killProcess()
                                break

                    # Blocks until the stderr reader thread posts its sentinel.
                    err = b"".join(iter(self._errQueue.get, None))
                    if err and raise_except:
                        raise Exception(lfBytes2Str(err, encoding))
                except ValueError:
                    pass
                finally:
                    finish()
        else:
            def read(source):
                try:
                    count = 0
                    for line in source:
                        if encoding:
                            yield emit(line.rstrip(b"\r\n"))
                        else:
                            try:
                                # Probe only: raises UnicodeDecodeError for non-ASCII input.
                                line.decode("ascii")
                                yield emit(line.rstrip(b"\r\n"))
                            except UnicodeDecodeError:
                                yield emit(lfEncode(line.rstrip(b"\r\n")))

                        if self._max_count > 0:
                            count += 1
                            if count >= self._max_count:
                                self.killProcess()
                                break

                    # Blocks until the stderr reader thread posts its sentinel.
                    err = b"".join(iter(self._errQueue.get, None))
                    if err and raise_except:
                        raise Exception(err)
                except ValueError:
                    pass
                finally:
                    finish()

        stdout_lines = iter(self._process.stdout.readline, b"")
        return AsyncExecutor.Result(read(stdout_lines))

    def killProcess(self):
        """Terminate the running command (and its children), if there is one."""
        # `_finished` is consulted instead of Popen.poll() because the original
        # author observed poll() always returning None here.
        if not self._process or self._finished:
            return

        if os.name == 'nt':
            subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=self._process.pid), shell=True)
        else:
            try:
                os.killpg(os.getpgid(self._process.pid), signal.SIGTERM)
            except OSError:
                pass

        self._process = None

    class Result(object):
        """Iterable wrapper that lets further iterables be chained on either side."""

        def __init__(self, iterable):
            self._g = iterable

        def __add__(self, iterable):
            # Note: mutates and returns this object rather than building a new one.
            self._g = itertools.chain(self._g, iterable)
            return self

        def __iadd__(self, iterable):
            self._g = itertools.chain(self._g, iterable)
            return self

        def join_left(self, iterable):
            """Prepend `iterable` so its items are produced before the current ones."""
            self._g = itertools.chain(iterable, self._g)
            return self

        def __iter__(self):
            return self._g


if __name__ == "__main__":
    executor = AsyncExecutor()
    out = executor.execute("ctags -f- -R")
    print("stdout begin: ============================================")
    for i in out:
        print(repr(i))
# ... (the listing is cut off here in the source page; next snippet: shellscript.py)
Source:shellscript.py  
import subprocess
import tempfile
import shutil
import signal
import os
from pathlib import Path
import time
import sys
from typing import Optional, List, Any, Union

PathType = Union[str, Path]


class ShellScript():
    """A shell script held as text that can be written to disk and executed.

    The script text is de-indented relative to its first non-blank line, so
    it can be written as an indented triple-quoted string in calling code.
    """

    def __init__(self, script: str, script_path: Optional[PathType] = None, log_path: Optional[PathType] = None,
                 keep_temp_files: bool = False, verbose: bool = False):
        """Normalise `script` and store the run configuration.

        Args:
            script: the script text; leading blank lines are dropped and the
                indentation of the first remaining line is stripped from
                every non-blank line.
            script_path: where to write the script; a temporary directory is
                used by `start()` when this is None.
            log_path: where `start()` writes the captured output.
            keep_temp_files: when true, `cleanup()` leaves temporary
                directories in place.
            verbose: when true, `start()` echoes each output line.

        Raises:
            Exception: if a non-blank line is indented less than the first one.
        """
        lines = script.splitlines()
        lines = self._remove_initial_blank_lines(lines)
        if len(lines) > 0:
            num_initial_spaces = self._get_num_initial_spaces(lines[0])
            for ii, line in enumerate(lines):
                if len(line.strip()) > 0:
                    n = self._get_num_initial_spaces(line)
                    if n < num_initial_spaces:
                        print(script)
                        raise Exception('Problem in script. First line must not be indented relative to others')
                    lines[ii] = lines[ii][num_initial_spaces:]
        self._script = '\n'.join(lines)
        self._script_path = script_path
        self._log_path = log_path
        self._keep_temp_files = keep_temp_files
        self._process: Optional[subprocess.Popen] = None
        self._files_to_remove: List[str] = []
        self._dirs_to_remove: List[str] = []
        self._start_time: Optional[float] = None
        self._verbose = verbose

    def __del__(self):
        self.cleanup()

    def substitute(self, old: str, new: Any) -> None:
        """Replace every occurrence of `old` in the script with ``'{}'.format(new)``."""
        self._script = self._script.replace(old, '{}'.format(new))

    def write(self, script_path: Optional[PathType] = None) -> None:
        """Write the script to `script_path` (default: the configured path) with mode 0o744.

        Raises:
            Exception: if no path was given here or at construction time.
        """
        if script_path is None:
            script_path = self._script_path
        if script_path is None:
            raise Exception('Cannot write script. No path specified')
        with open(script_path, 'w') as f:
            f.write(self._script)
        os.chmod(script_path, 0o744)

    def start(self) -> None:
        """Write the script, run it, and copy its combined output to the log file.

        This call returns only after the process has closed its output
        stream, because it reads stdout until end-of-file.
        """
        # 'darwin' also contains the substring 'win', hence the extra test.
        is_windows = 'win' in sys.platform and sys.platform != 'darwin'
        default_suffix = '.bat' if is_windows else '.sh'

        if self._script_path is not None:
            script_path = Path(self._script_path)
            if script_path.suffix == '':
                script_path = script_path.parent / (script_path.name + default_suffix)
        else:
            tempdir = Path(tempfile.mkdtemp(prefix='tmp_shellscript'))
            script_path = tempdir / ('script' + default_suffix)
            self._dirs_to_remove.append(str(tempdir))

        if self._log_path is None:
            script_log_path = script_path.parent / 'spikesorters_log.txt'
        else:
            script_log_path = Path(self._log_path)
            # NOTE(review): this tests script_path, which always has a suffix by
            # now, so the branch never runs. It looks like script_log_path.suffix
            # was intended; left unchanged because fixing it would move the log
            # file for callers that pass a suffix-less log_path -- confirm.
            if script_path.suffix == '':
                script_log_path = script_log_path.parent / (script_log_path.name + '.txt')

        self.write(script_path)
        cmd = str(script_path)
        print('RUNNING SHELL SCRIPT: ' + cmd)
        self._start_time = time.time()
        self._process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1,
                                         universal_newlines=True)
        with open(script_log_path, 'w+') as script_log_file:
            for line in self._process.stdout:
                script_log_file.write(line)
                if self._verbose:  # Print onto console depending on the verbose property passed on from the sorter class
                    print(line)

    def wait(self, timeout=None) -> Optional[int]:
        """Wait for the process to exit.

        Returns:
            The return code, or None if `timeout` seconds elapsed first.

        Raises:
            Exception: (via `returnCode`) if the script was never started.
        """
        if not self.isRunning():
            return self.returnCode()
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        try:
            return self._process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Only a timeout means "still running"; KeyboardInterrupt and the
            # like must reach the caller instead of being swallowed.
            return None

    def cleanup(self) -> None:
        """Remove temporary directories created by `start()` unless asked to keep them."""
        # getattr defaults: __del__ also runs on instances whose __init__ raised
        # before these attributes were assigned.
        if getattr(self, '_keep_temp_files', False):
            return
        for dirpath in getattr(self, '_dirs_to_remove', ()):
            _rmdir_with_retries(str(dirpath), num_retries=5)

    def stop(self) -> None:
        """Escalate through SIGINT, SIGTERM and SIGKILL until the process exits."""
        if not self.isRunning():
            return
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        signals = [signal.SIGINT] * 10 + [signal.SIGTERM] * 10 + [signal.SIGKILL] * 10
        for signal0 in signals:
            self._process.send_signal(signal0)
            try:
                self._process.wait(timeout=0.02)
                return
            except subprocess.TimeoutExpired:
                # Still alive after this signal; try the next one.
                pass

    def kill(self) -> None:
        """Send SIGKILL and wait up to one second for the process to exit."""
        if not self.isRunning():
            return
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        self._process.send_signal(signal.SIGKILL)
        try:
            self._process.wait(timeout=1)
        except subprocess.TimeoutExpired:
            print('WARNING: unable to kill shell script.')

    def stopWithSignal(self, sig, timeout) -> bool:
        """Send `sig` and report whether the process exited within `timeout` seconds."""
        if not self.isRunning():
            return True
        assert self._process is not None, "Unexpected self._process is None even though it is running."
        self._process.send_signal(sig)
        try:
            self._process.wait(timeout=timeout)
            return True
        except subprocess.TimeoutExpired:
            return False

    def elapsedTimeSinceStart(self) -> Optional[float]:
        """Return seconds since `start()` launched the process, or None if not started."""
        if self._start_time is None:
            return None
        return time.time() - self._start_time

    def isRunning(self) -> bool:
        """Return True while a started process has not yet exited."""
        if not self._process:
            return False
        return self._process.poll() is None

    def isFinished(self) -> bool:
        """Return True once a started process has exited; False if never started."""
        if not self._process:
            return False
        return not self.isRunning()

    def returnCode(self) -> Optional[int]:
        """Return the exit code of the finished process.

        Raises:
            Exception: if the process has not finished (or was never started).
        """
        if not self.isFinished():
            raise Exception('Cannot get return code before process is finished.')
        assert self._process is not None, "Unexpected self._process is None even though it is finished."
        return self._process.returncode

    def scriptPath(self) -> Optional[str]:
        """Return the script path given at construction time (may be None)."""
        return self._script_path

    def _remove_initial_blank_lines(self, lines: List[str]) -> List[str]:
        """Return `lines` without its leading whitespace-only entries."""
        ii = 0
        while ii < len(lines) and len(lines[ii].strip()) == 0:
            ii = ii + 1
        return lines[ii:]

    def _get_num_initial_spaces(self, line: str) -> int:
        """Count the space characters at the start of `line` (tabs are not counted)."""
        ii = 0
        while ii < len(line) and line[ii] == ' ':
            ii = ii + 1
        return ii


def _rmdir_with_retries(dirname, num_retries, delay_between_tries=1):
    # Try to delete `dirname` up to `num_retries` times, sleeping between tries.
    # This function is cut off in the source listing and is reproduced as shown.
    for retry_num in range(1, num_retries + 1):
        if not Path(dirname).is_dir():
            return
        try:
            shutil.rmtree(dirname)
            break
        except:
            if retry_num < num_retries:
                print('Retrying to remove directory: {}'.format(dirname))
                time.sleep(delay_between_tries)
            else:
                # The body of this final-failure branch is elided ("...") in the
                # source listing; restore it from the upstream file.
                ...
# ... (the listing is cut off here in the source page; next snippet: elf.py)
Source:elf.py  
import logging
logger = logging.getLogger(__name__)

from termcolor import cprint, colored

from ... import types, common

elf_types = {
    0x0: 'NONE',
    0x1: 'REL',
    0x2: 'EXEC',
    0x3: 'DYN',
    0x4: 'CORE',
    0xfe00: 'LOOS',
    0xfeff: 'HIOS',
    0xff00: 'LOPROC',
    0xffff: 'HIPROC',
}


class ELF(object):

    def __init__(self, process, module):
        """Parses the ELF in memory.

        Args:
            process (Process): revenge Process object
            module (Module): Module object to start parsing at
        """
        self._process = process
        self.module = module
        self.bits = None

        magic = self._process.memory[self.module.base:self.module.base+4].bytes
        assert magic == b'\x7fELF', "This does not appear to be an ELF file..."

    def _header_field(self, offset32, offset64):
        """Return the memory accessor for a header field.

        `offset32` is used when this ELF is 32-bit, `offset64` otherwise;
        both are relative to the module base.
        """
        offset = offset32 if self.bits == 32 else offset64
        return self._process.memory[self.module.base + offset]

    @property
    def program_headers(self):
        """Program headers object."""
        return ProgramHeaders(self._process, self)

    @property
    def section_headers(self):
        """Section headers object."""
        return SectionHeaders(self._process, self)

    @property
    def bits(self):
        """int: How many bits is this ELF?"""
        if self.__bits is None:
            # Byte at offset 4: a value of 1 is treated as 32-bit, anything else as 64-bit.
            is_32 = self._process.memory[self.module.base+0x4].int8 == 1
            self.__bits = 32 if is_32 else 64
        return self.__bits

    @bits.setter
    def bits(self, bits):
        self.__bits = bits

    @property
    def entry(self):
        """int: ELF entrypoint. Rebased"""
        address = self._process.memory[self.module.base+0x18].pointer
        if self.type_str != 'DYN':
            return address
        # For DYN binaries the stored value is relative to the load base.
        return types.Pointer(address + self.module.base)

    @property
    def phoff(self):
        """int: Program header offset."""
        raw = self._header_field(0x1C, 0x20).pointer
        return types.Pointer(raw + self.module.base)

    @property
    def phnum(self):
        """int: Number of program headers."""
        return self._header_field(0x2C, 0x38).uint16

    @property
    def phentsize(self):
        """int: Size of program headers."""
        return self._header_field(0x2A, 0x36).uint16

    @property
    def flags(self):
        """int: Flags"""
        return self._header_field(0x24, 0x30).uint32

    @property
    def ehsize(self):
        """int: Size of this header."""
        return self._header_field(0x28, 0x34).uint16

    @property
    def shentsize(self):
        """int: Size of a section header entry"""
        return self._header_field(0x2e, 0x3a).uint16

    @property
    def shnum(self):
        """int: Number of section header entries."""
        return self._header_field(0x30, 0x3c).uint16

    @property
    def shstrndx(self):
        """int: Section header index that contains the section names."""
        return self._header_field(0x32, 0x3e).uint16

    @property
    def shoff(self):
        """int: Section Header offset"""
        raw = self._header_field(0x20, 0x28).pointer
        return types.Pointer(raw + self.module.base)

    @property
    def type(self):
        """int: Type of this binary."""
        try:
            cached = self.__type
        except AttributeError:
            # First access: read the field once and remember it.
            cached = self.__type = self._process.memory[self.module.base+0x10].uint16
        return cached

    @property
    def type_str(self):
        """str: Type of this binary."""
        return elf_types[self.type]


from .program_headers import ProgramHeaders
# ... (the listing is cut off here in the source page; any further trailing
# imports, e.g. the one providing SectionHeaders, are not visible)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
