Best Python code snippet using avocado_python
process.py
Source:process.py  
...207                if ' -s' not in sudo_cmd:208                    sudo_cmd = '%s -s' % sudo_cmd209            cmd = '%s %s' % (sudo_cmd, cmd)210        return cmd211    def _init_subprocess(self):212        if self._popen is None:213            if self.verbose:214                log.info("Running '%s'", self.cmd)215            if self.shell is False:216                cmd = shlex.split(self.cmd)217            else:218                cmd = self.cmd219            try:220                self._popen = subprocess.Popen(cmd,221                                               stdout=subprocess.PIPE,222                                               stderr=subprocess.PIPE,223                                               shell=self.shell,224                                               env=self.env)225            except OSError as details:226                details.strerror += " (%s)" % self.cmd227                raise details228            self.start_time = time.time()229            self.stdout_file = StringIO()230            self.stderr_file = StringIO()231            self.stdout_lock = threading.Lock()232            ignore_bg_processes = self._ignore_bg_processes233            self.stdout_thread = threading.Thread(target=self._fd_drainer,234                                                  name="%s-stdout" % self.cmd,235                                                  args=[self._popen.stdout,236                                                        ignore_bg_processes])237            self.stdout_thread.daemon = True238            self.stderr_lock = threading.Lock()239            self.stderr_thread = threading.Thread(target=self._fd_drainer,240                                                  name="%s-stderr" % self.cmd,241                                                  args=[self._popen.stderr,242                                                        ignore_bg_processes])243            self.stderr_thread.daemon = True244            self.stdout_thread.start()245  
          self.stderr_thread.start()246            def signal_handler(signum, frame):247                self.result.interrupted = "signal/ctrl+c"248                self.wait()249                signal.default_int_handler()250            try:251                signal.signal(signal.SIGINT, signal_handler)252            except ValueError:253                if self.verbose:254                    log.info("Command %s running on a thread", self.cmd)255    def _fd_drainer(self, input_pipe, ignore_bg_processes=False):256        """257        Read from input_pipe, storing and logging output.258        :param input_pipe: File like object to the stream.259        """260        stream_prefix = "%s"261        if input_pipe == self._popen.stdout:262            prefix = '[stdout] %s'263            if self.allow_output_check in ['none', 'stderr']:264                stream_logger = None265            else:266                stream_logger = stdout_log267            output_file = self.stdout_file268            lock = self.stdout_lock269        elif input_pipe == self._popen.stderr:270            prefix = '[stderr] %s'271            if self.allow_output_check in ['none', 'stdout']:272                stream_logger = None273            else:274                stream_logger = stderr_log275            output_file = self.stderr_file276            lock = self.stderr_lock277        fileno = input_pipe.fileno()278        bfr = ''279        while True:280            if ignore_bg_processes:281                # Exit if there are no new data and the main process finished282                if (not select.select([fileno], [], [], 1)[0] and283                        self.result.exit_status is not None):284                    break285                # Don't read unless there are new data available:286                if not select.select([fileno], [], [], 1)[0]:287                    continue288            tmp = os.read(fileno, 8192).decode()289            if tmp == '':290                break291      
      lock.acquire()292            try:293                output_file.write(tmp)294                if self.verbose:295                    bfr += tmp296                    if tmp.endswith('\n'):297                        for line in bfr.splitlines():298                            log.debug(prefix, line)299                            if stream_logger is not None:300                                stream_logger.debug(stream_prefix, line)301                        bfr = ''302            finally:303                lock.release()304        # Write the rest of the bfr unfinished by \n305        if self.verbose and bfr:306            for line in bfr.splitlines():307                log.debug(prefix, line)308                if stream_logger is not None:309                    stream_logger.debug(stream_prefix, line)310    def _fill_results(self, rc):311        self._init_subprocess()312        self.result.exit_status = rc313        if self.result.duration == 0:314            self.result.duration = time.time() - self.start_time315        if self.verbose:316            log.info("Command '%s' finished with %s after %ss", self.cmd, rc,317                     self.result.duration)318        self.result.pid = self._popen.pid319        self._fill_streams()320    def _fill_streams(self):321        """322        Close subprocess stdout and stderr, and put values into result obj.323        """324        # Cleaning up threads325        self.stdout_thread.join()326        self.stderr_thread.join()327        # Clean subprocess pipes and populate stdout/err328        self._popen.stdout.close()329        self._popen.stderr.close()330        self.result.stdout = self.get_stdout()331        self.result.stderr = self.get_stderr()332    def start(self):333        """334        Start running the subprocess.335        This method is particularly useful for background processes, since336        you can start the subprocess and not block your test flow.337        :return: Subprocess PID.338        
:rtype: int339        """340        self._init_subprocess()341        return self._popen.pid342    def get_stdout(self):343        """344        Get the full stdout of the subprocess so far.345        :return: Standard output of the process.346        :rtype: str347        """348        self._init_subprocess()349        self.stdout_lock.acquire()350        stdout = self.stdout_file.getvalue()351        self.stdout_lock.release()352        return stdout353    def get_stderr(self):354        """355        Get the full stderr of the subprocess so far.356        :return: Standard error of the process.357        :rtype: str358        """359        self._init_subprocess()360        self.stderr_lock.acquire()361        stderr = self.stderr_file.getvalue()362        self.stderr_lock.release()363        return stderr364    def terminate(self):365        """366        Send a :attr:`signal.SIGTERM` to the process.367        """368        self._init_subprocess()369        self.send_signal(signal.SIGTERM)370    def kill(self):371        """372        Send a :attr:`signal.SIGKILL` to the process.373        """374        self._init_subprocess()375        self.send_signal(signal.SIGKILL)376    def send_signal(self, sig):377        """378        Send the specified signal to the process.379        :param sig: Signal to send.380        """381        self._init_subprocess()382        self._popen.send_signal(sig)383    def poll(self):384        """385        Call the subprocess poll() method, fill results if rc is not None.386        """387        self._init_subprocess()388        rc = self._popen.poll()389        if rc is not None:390            self._fill_results(rc)391        return rc392    def wait(self):393        """394        Call the subprocess poll() method, fill results if rc is not None.395        """396        self._init_subprocess()397        rc = self._popen.wait()398        if rc is not None:399            self._fill_results(rc)400        return rc401    def 
stop(self):402        """403        Stop background subprocess.404        Call this method to terminate the background subprocess and405        wait for it results.406        """407        self._init_subprocess()408        if self.result.exit_status is None:409            self.terminate()410        return self.wait()411    def get_pid(self):412        """413        Reports PID of this process414        """415        self._init_subprocess()416        return self._popen.pid417    def run(self, timeout=None, sig=signal.SIGTERM):418        """419        Start a process and wait for it to end, returning the result attr.420        If the process was already started using .start(), this will simply421        wait for it to end.422        :param timeout: Time (seconds) we'll wait until the process is423                        finished. If it's not, we'll try to terminate it424                        and get a status.425        :type timeout: float426        :param sig: Signal to send to the process in case it did not end after427                    the specified timeout.428        :type sig: int429        :returns: The command result object.430        :rtype: A :class:`CmdResult` instance.431        """432        def timeout_handler():433            self.send_signal(sig)434            self.result.interrupted = "timeout after %ss" % timeout435        self._init_subprocess()436        if timeout is None:437            self.wait()438        elif timeout > 0.0:439            timer = threading.Timer(timeout, timeout_handler)440            try:441                timer.start()442                self.wait()443            finally:444                timer.cancel()445        if self.result.exit_status is None:446            stop_time = time.time() + 1447            while time.time() < stop_time:448                self.poll()449                if self.result.exit_status is not None:...002.compute_cascades.py
Source:002.compute_cascades.py  
...8from tqdm import tqdm9from s4.cascade.analysis import compute_cascade10from s4.data import open_data11logging.basicConfig(level=logging.WARNING, format='[%(levelname)s]: %(message)s')12def _init_subprocess(o_icsd, use_temperature):13    _init_subprocess.only_icsd = o_icsd14    _init_subprocess.use_temperature = use_temperature15    sys.stdout = open(os.devnull, 'w')16    sys.stderr = open(os.devnull, 'w')17def _compute_cascade_for_one(args):18    k, d = args19    temperatures = _init_subprocess.use_temperature20    results = []21    for j, reaction in enumerate(d.reactions):22        per_reaction = {}23        for i, temp in enumerate([d.exp_t] + temperatures):24            furnace = [temp + 273.15] * 1025            try:26                step_info = compute_cascade(reaction, furnace, only_icsd=_init_subprocess.only_icsd)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
