Best Python code snippet using stestr_python
pyscript.py
Source:pyscript.py  
1#===============================================================================2# Copyright 2012 Jake Ross3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8#   http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15#===============================================================================16#============= enthought library imports =======================17from traits.api import Str, Any, Bool, DelegatesTo, Dict, Property18from pyface.timer.do_later import do_later19from pyface.confirmation_dialog import confirm# from pyface.wx.dialog import confirmation20#============= standard library imports ========================21import time22import os23import inspect24from threading import Thread, Event25#============= local library imports  ==========================26from src.pyscripts.wait_dialog import WaitDialog27from src.loggable import Loggable28from Queue import Queue, Empty29from globals import globalv30class DummyManager(Loggable):31    def open_valve(self, *args, **kw):32        self.info('open valve')33    def close_valve(self, *args, **kw):34        self.info('close valve')35class GosubError(Exception):36    def __init__(self, path):37        self.path = path38    def __str__(self):39        return 'GosubError: {} does not exist'.format(self.path)40def KlassError(Exceotion):41    def __init__(self, klass):42        self.klass = klass43    def __str__(self):44        return 'KlassError: {} does not exist'.format(self.klass)45class PyscriptError(Exception):46    def __init__(self, name, err):47        self.name = name48        self.err = err49    def __str__(self):50        return 'Pyscript error in {}: {}'.format(self.name, self.err)51class IntervalError(Exception):52    def __str__(self):53        return 'Poorly matched BeginInterval-CompleteInterval'54class MainError(Exception):55    def __str__(self):56        return 'No "main" function defined'57def verbose_skip(func):58    def decorator(obj, *args, **kw):59        if obj._truncate:60            return61        fname = func.__name__62#        print fname, obj.testing_syntax, obj._cancel63        if fname.startswith('_m_'):64            fname = fname[3:]65        args1, _, _, defaults = inspect.getargspec(func)66        nd = sum([1 for di in defaults if di is not None]) if defaults else 067        min_args = len(args1) - 1 - nd68        an = len(args) + len(kw)69        if an < min_args:70            raise PyscriptError('invalid arguments count for {}, args={} kwargs={}'.format(fname,71                                                                                           args, kw))72#        if obj.testing_syntax or obj._cancel:73#            return74        if obj._cancel:75            return76        obj.debug('{} {} {}'.format(fname, args, kw))77        return func(obj, *args, **kw)78    return decorator79def skip(func):80    def decorator(obj, *args, **kw):81        if obj.testing_syntax or obj._cancel or obj._truncate:82            return83        return func(obj, *args, **kw)84    return decorator85def count_verbose_skip(func):86    def decorator(obj, *args, **kw):87        if obj._truncate or obj._cancel:88            return89        fname = func.__name__90#        print fname, obj.testing_syntax, obj._cancel91        if fname.startswith('_m_'):92            fname = fname[3:]93        args1, _, _, defaults = inspect.getargspec(func)94        nd = sum([1 for di in defaults if di is not None]) if defaults else 095        min_args = len(args1) - 1 - nd96        an = len(args) + len(kw)97        if an < min_args:98            raise PyscriptError('invalid arguments count for {}, args={} kwargs={}'.format(fname,99                                                                                           args, kw))100#        if obj._cancel:101        if obj.testing_syntax:102            func(obj, calc_time=True, *args, **kw)103            return104        obj.debug('{} {} {}'.format(fname, args, kw))105        return func(obj, *args, **kw)106    return decorator107def makeRegistry():108    registry = {}109    def registrar(func):110        registry[func.__name__] = func.__name__111        return func  # normally a decorator returns a wrapped function,112                    # but here we return func unmodified, after registering it113    registrar.commands = registry114    return registrar115def makeNamedRegistry(cmd_register):116    def named_register(name):117        def decorator(func):118            cmd_register.commands[name] = func.__name__119            return func120        return decorator121    return named_register122command_register = makeRegistry()123named_register = makeNamedRegistry(command_register)124'''125@todo: cancel script if action fails. eg fatal comm. error126'''127class PyScript(Loggable):128    text = Property129    syntax_checked = Property130    manager = Any131    parent = Any132    parent_script = Any133    root = Str134    filename = Str135    info_color = Str136    testing_syntax = Bool(False)137    cancel_flag = Bool138    hash_key = None139    _ctx = Dict140    _text = Str141    _interval_stack = Queue142    _interval_flag = None143    _cancel = Bool(False)144    _completed = False145    _truncate = False146    _syntax_checked = False147    _syntax_error = None148    _gosub_script = None149    _wait_dialog = None150    _estimated_duration = 0151    _graph_calc = False152    def execute(self, new_thread=False, bootstrap=True, finished_callback=None):153        def _ex_():154            if bootstrap:155                self.bootstrap()156            ok = True157            if not self.syntax_checked:158                self.test()159            if ok:160                self._execute()161                if finished_callback:162                    finished_callback()163            return self._completed164        if new_thread:165            t = Thread(target=_ex_)166            t.start()167        else:168            return _ex_()169    def test(self):170        self.testing_syntax = True171        self._syntax_error = True172        self.info('testing syntax')173        r = self._execute(test=True)174        if r is not None:175            self.info('invalid syntax')176            ee = PyscriptError(self.filename, r)177            raise ee178        elif not self._interval_stack.empty():179            raise IntervalError()180        else:181            self.info('syntax checking passed')182            self._syntax_error = False183        self.syntax_checked = True184        self.testing_syntax = False185    def compile_snippet(self, snippet):186        try:187            code = compile(snippet, '<string>', 'exec')188        except Exception, e:189            import traceback190            traceback.print_exc()191            return e192        else:193            return code194    def execute_snippet(self, snippet):195        safe_dict = self.get_context()196        code_or_err = self._compile_snippet(snippet)197        if not isinstance(code_or_err, Exception):198            try:199                exec code_or_err in safe_dict200                safe_dict['main']()201            except KeyError, e:202                return MainError()203            except Exception, e:204                import traceback205                traceback.print_exc()206# #            self.warning_dialog(str(e))207                return e208        else:209            return code_or_err210#            return  traceback.format_exc()211#        safe_dict = self.get_context()212#        try:213#            code = compile(snippet, '<string>', 'exec')214#            exec code in safe_dict215#            safe_dict['main']()216#        except KeyError, e:217#            print e218#            print '#============'219#220#            for di in safe_dict.keys():221#                print di222#223#            return MainError()224#225#        except Exception, e:226#            import traceback227#            traceback.print_exc()228# #            self.warning_dialog(str(e))229#            return e230#            return  traceback.format_exc()231    def syntax_ok(self):232        return not self._syntax_error233    def check_for_modifications(self):234        old = self.toblob()235        with open(self.filename, 'r') as f:236            new = f.read()237        return old != new238    def toblob(self):239        return self.text240    def get_estimated_duration(self):241        return self._estimated_duration242    def set_default_context(self):243        pass244    def setup_context(self, **kw):245        self._ctx.update(kw)246    def get_context(self):247        ctx = dict()248        for k  in self.get_commands():249            if isinstance(k, tuple):250                ka, kb = k251                name, func = ka, getattr(self, kb)252            else:253                name, func = k, getattr(self, k)254            ctx[name] = func255        for v in self.get_variables():256            ctx[v] = getattr(self, v)257        ctx.update(self._ctx)258        return ctx259    def get_variables(self):260        return []261#    def get_core_commands(self):262#        cmds = [263# #                ('info', '_m_info')264#                ]265#266#        return cmds267#    def get_script_commands(self):268#        return []269    def get_commands(self):270#        return self.get_core_commands() + \271#        return self.get_script_commands() + \272        return self.get_command_register() + \273                        command_register.commands.items()274    def get_command_register(self):275        return []276    def truncate(self, style=None):277        if style is None:278            self._truncate = True279        if self._gosub_script is not None:280            self._gosub_script.truncate(style=style)281    def cancel(self):282        self._cancel = True283        if self._gosub_script is not None:284            if not self._gosub_script._cancel:285                self._gosub_script.cancel()286        if self.parent:287            self.parent._executing = False288        if self.parent_script:289            if not self.parent_script._cancel:290                self.parent_script.cancel()291        if self._wait_dialog:292            self._wait_dialog.stop()293        self._cancel_hook()294    def bootstrap(self, load=True, **kw):295        self._interval_flag = Event()296        self._interval_stack = Queue()297        if self.root and self.name and load:298            with open(self.filename, 'r') as f:299                self.text = f.read()300            return True301#==============================================================================302# commands303#==============================================================================304    @command_register305    def gosub(self, name=None, root=None, klass=None, **kw):306        if not name.endswith('.py'):307            name += '.py'308        if root is None:309            d = None310            if '/' in name:311                d = '/'312            elif ':' in name:313                d = ':'314            if d:315                dirs = name.split(d)316                name = dirs[0]317                for di in dirs[1:]:318                    name = os.path.join(name, di)319            root = self.root320        p = os.path.join(root, name)321        if not os.path.isfile(p):322            raise GosubError(p)323        if klass is None:324            klass = self.__class__325        else:326            klassname = klass327            pkg = 'src.pyscripts.api'328            mod = __import__(pkg, fromlist=[klass])329            klass = getattr(mod, klass)330        if not klass:331            raise KlassError(klassname)332        s = klass(root=root,333#                          path=p,334                          name=name,335                          manager=self.manager,336                          parent_script=self,337                          _syntax_checked=self._syntax_checked,338                          _ctx=self._ctx,339                          **kw340                          )341        if self.testing_syntax:342            s.bootstrap()343            err = s.test()344            if err:345                raise PyscriptError(self.name, err)346        else:347            if not self._cancel:348                self.info('doing GOSUB')349                self._gosub_script = s350                s.execute()351                self._gosub_script = None352                if not self._cancel:353                    self.info('gosub finished')354    @verbose_skip355    @command_register356    def exit(self):357        self.info('doing EXIT')358        self.cancel()359    @command_register360    def complete_interval(self):361        try:362            pi = self._interval_stack.get(timeout=0.01)363            if pi != 'b':364                raise IntervalError()365        except Empty:366            raise IntervalError()367        if self.testing_syntax:368            return369        if self._cancel:370            return371        self.info('COMPLETE INTERVAL waiting for begin interval completion')372        # wait until _interval_flag is set373        while not self._interval_flag.isSet():374            if self._cancel:375                break376            self._sleep(0.5)377        if not self._cancel:378            self._interval_flag.clear()379    @verbose_skip380    @command_register381    def begin_interval(self, duration=0):382        def wait(t):383            self._sleep(t)384            if not self._cancel:385                self.info('interval finished')386                if self._interval_flag:387                    self._interval_flag.set()388        duration = float(duration)389#        if self.testing_syntax or self._cancel:390#            return391        self._interval_stack.put('b')392        self.info('BEGIN INTERVAL waiting for {}'.format(duration))393        t = Thread(target=wait, args=(duration,))394        t.start()395    @command_register396    def sleep(self, duration=0, message=None):397        self._estimated_duration += duration398        if self.parent_script is not None:399            self.parent_script._estimated_duration += self._estimated_duration400        if self._graph_calc:401            va = self._xs[-1] + duration402            self._xs.append(va)403            self._ys.append(self._ys[-1])404            return405        if self.testing_syntax or self._cancel:406            return407        self.info('SLEEP {}'.format(duration))408        if globalv.experiment_debug:409            duration = min(duration, 5)410        self._sleep(duration, message=message)411    @skip412    @named_register('info')413    def _m_info(self, message=None):414        message = str(message)415        self.info(message)416        try:417            if self.manager:418                if self.info_color:419                    self.manager.info(message, color=self.info_color, log=False)420                else:421                    self.manager.info(message, log=False)422        except AttributeError, e:423            self.debug('m_info {}'.format(e))424#===============================================================================425# handlers426#===============================================================================427    def _cancel_flag_changed(self, v):428        if v:429            result = confirmation(None, 'Are you sure you want to cancel {}'.format(self.logger_name))430            if result != 5104:431                self.cancel()432            else:433                self.cancel_flag = False434#===============================================================================435# private436#===============================================================================437    def _execute(self):438        if not self.text:439            return 'No script text'440        self._cancel = False441        self._completed = False442        self._truncate = False443        error = self.execute_snippet(self.text)444        if error:445            return error446        if self.testing_syntax:447            return448        if self._cancel:449            self.info('{} canceled'.format(self.name))450        else:451            self.info('{} completed successfully'.format(self.name))452            self._completed = True453            if self.parent:454                self.parent._executing = False455                try:456                    del self.parent.scripts[self.hash_key]457                except KeyError:458                    pass459    def _manager_action(self, func, name=None, protocol=None, *args, **kw):460        man = self.manager461        if protocol is not None and man is not None:462            app = man.application463            if app is not None:464                args = (protocol,)465                if name is not None:466                    args = (protocol, 'name=="{}"'.format(name))467                man = app.get_service(*args)468        if man is not None:469            if not isinstance(func, list):470                func = [(func, args, kw)]471            return [getattr(man, f)(*a, **k) for f, a, k in func]472        else:473            self.warning('could not find manager {}'.format(name))474    def _cancel_hook(self):475        pass476#==============================================================================477# Sleep/ Wait478#==============================================================================479    def _sleep(self, v, message=None):480        v = float(v)481#        if self.testing_syntax or self._cancel:482#            return483        if v > 1:484            if v >= 5:485                self._block(v, message=message, dialog=True)486            else:487                self._block(v, dialog=False)488        else:489            time.sleep(v)490    def _block(self, timeout, message=None, dialog=False):491        if dialog:492            if message is None:493                message = ''494            st = time.time()495#            c = Condition()496#            c.acquire()497            evt = Event()498            self._wait_dialog = wd = WaitDialog(wtime=timeout,499#                                condition=c,500                                end_evt=evt,501                                parent=self,502                                title='{} - Wait'.format(self.logger_name),503                                message='Waiting for {:0.1f}  {}'.format(timeout, message)504                                )505            do_later(wd.edit_traits)506            evt.wait(timeout=timeout + 0.25)507            do_later(wd.stop)508            if wd._canceled:509                self.cancel()510            elif wd._continued:511                self.info('continuing script after {:0.3f} s'.format(time.time() - st))512        else:513            st = time.time()514            time.sleep(1)515            while time.time() - st < timeout:516                if self._cancel:517                    break518                time.sleep(1)519#===============================================================================520# properties521#===============================================================================522    @property523    def filename(self):524        return os.path.join(self.root, self.name)525    @property526    def state(self):527        # states528        # 0=running529        # 1=canceled530        # 2=completed531        if self._cancel:532            return '1'533        if self._completed:534            return '2'535        return '0'536    def _get_text(self):537        return self._text538    def _set_text(self, t):539        self._text = t540    def _get_syntax_checked(self):541        return self._syntax_checked542    def _set_syntax_checked(self, v):543        self._syntax_checked = v544    def __str__(self):545        return self.name546if __name__ == '__main__':547    from src.helpers.logger_setup import logging_setup548    logging_setup('pscript')549#    execute_script(t)550    from src.paths import paths551    p = PyScript(root=os.path.join(paths.scripts_dir, 'pyscripts'),552                      path='test.py',553                      _manager=DummyManager())554    p.execute()...daemonprocess.py
Source:daemonprocess.py  
1#!/usr/bin/env python22"""3Syncthing-GTK - DaemonProcess4Runs syncthing daemon process as subprocess of application5"""6from __future__ import unicode_literals7from gi.repository import Gio, GLib, GObject8from syncthing_gtk.tools import IS_WINDOWS9from collections import deque10import os, logging11log = logging.getLogger("DaemonProcess")12HAS_SUBPROCESS = hasattr(Gio, "Subprocess")13if IS_WINDOWS:14	# POpen is used on Windows15	from subprocess import Popen, PIPE, STARTUPINFO, STARTF_USESHOWWINDOW16	from syncthing_gtk.windows import WinPopenReader, nice_to_priority_class17elif not HAS_SUBPROCESS:18	# Gio.Subprocess is not available in Gio < 3.1219	from subprocess import Popen, PIPE20class DaemonProcess(GObject.GObject):21	__gsignals__ = {22		# line(text)	- emitted when process outputs full line23		b"line"			: (GObject.SIGNAL_RUN_FIRST, None, (object,)),24		# exit(code)	- emitted when process exits25		b"exit"			: (GObject.SIGNAL_RUN_FIRST, None, (int,)),26		# failed(exception) - emitted if process fails to start27		b"failed"		: (GObject.SIGNAL_RUN_FIRST, None, (object,)),28	}29	SCROLLBACK_SIZE = 500	# Maximum number of output lines stored in memory30	PRIORITY_LOWEST		= 1931	PRIORITY_LOW		= 1032	PRIORITY_NORMAL		= 033	PRIORITY_HIGH		= -1034	PRIORITY_HIGHEST	= -2035	36	def __init__(self, cmdline, priority=PRIORITY_NORMAL, max_cpus=0, env={}):37		""" cmdline should be list of arguments """38		GObject.GObject.__init__(self)39		self.cmdline = cmdline40		self.priority = priority41		self.env = { x:env[x] for x in env }42		self.env["STNORESTART"] = "1"	# see syncthing --help43		self.env["STNOUPGRADE"] = "1"44		if max_cpus > 0:45			self.env["GOMAXPROCS"] = str(max_cpus)46		self._proc = None47	48	def start(self):49		for x in self.env:50			os.environ[x] = self.env[x]51		try:52			self._cancel = Gio.Cancellable()53			if IS_WINDOWS:54				# Windows55				sinfo = STARTUPINFO()56				sinfo.dwFlags = STARTF_USESHOWWINDOW57				sinfo.wShowWindow = 058				cflags = nice_to_priority_class(self.priority)59				self._proc = Popen(self.cmdline,60							stdin=PIPE, stdout=PIPE, stderr=PIPE,61							startupinfo=sinfo, creationflags=cflags)62				self._stdout = WinPopenReader(self._proc.stdout)63				self._check = GLib.timeout_add_seconds(1, self._cb_check_alive)64			elif HAS_SUBPROCESS:65				# New Gio66				flags = Gio.SubprocessFlags.STDOUT_PIPE | Gio.SubprocessFlags.STDERR_MERGE67				if self.priority == 0:68					self._proc = Gio.Subprocess.new(self.cmdline, flags)69				else:70					# I just really do hope that there is no distro w/out nice command71					self._proc = Gio.Subprocess.new([ "nice", "-n", "%s" % self.priority ] + self.cmdline, flags)72				self._proc.wait_check_async(None, self._cb_finished)73				self._stdout = self._proc.get_stdout_pipe()74			else:75				# Gio < 3.12 - Gio.Subprocess is missing :(76				if self.priority == 0:77					self._proc = Popen(self.cmdline, stdout=PIPE)78				else:79					# still hoping80					self._proc = Popen([ "nice", "-n", "%s" % self.priority ], stdout=PIPE)81				self._stdout = Gio.UnixInputStream.new(self._proc.stdout.fileno(), False)82				self._check = GLib.timeout_add_seconds(1, self._cb_check_alive)83		except Exception as e:84			# Startup failed85			self.emit("failed", e)86			return87		self._lines = deque([], DaemonProcess.SCROLLBACK_SIZE)88		self._buffer = ""89		self._stdout.read_bytes_async(256, 0, self._cancel, self._cb_read, ())90	91	def _cb_read(self, pipe, results, *a):92		""" Handler for read_bytes_async """93		try:94			response = pipe.read_bytes_finish(results)95		except Exception as e:96			if not self._cancel.is_cancelled():97				log.exception(e)98				GLib.idle_add(pipe.read_bytes_async, 256, 1, None, self._cb_read)99			return100		response = response.get_data().decode('utf-8')101		self._buffer = "%s%s" % (self._buffer, response)102		while "\n" in self._buffer:103			line, self._buffer = self._buffer.split("\n", 1)104			self._lines.append(line)105			self.emit('line', line)106		if not self._cancel.is_cancelled():107			GLib.idle_add(pipe.read_bytes_async, 256, 1, None, self._cb_read, ())108	109	def _cb_check_alive(self, *a):110		"""111		Repeatedly check if process is still alive.112		Called only on windows113		"""114		if self._proc == None:115			# Never started or killed really fast116			self.emit('exit', 1)117			self._cancel.cancel()118			if IS_WINDOWS: self._stdout.close()119			return False120		self._proc.poll()121		if self._proc.returncode is None:122			# Repeat until finished or canceled123			return (not self._cancel.is_cancelled())124		# Child just died :)125		self.emit('exit', self._proc.returncode)126		self._cancel.cancel()127		if IS_WINDOWS: self._stdout.close()128		return False129	130	def _cb_finished(self, proc, results):131		"""132		Callback for wait_check_async.133		With Gio < 3.12, timer and _cb_check_alive is used.134		"""135		try:136			proc.wait_check_finish(results)137			log.info("Subprocess finished with code %s", proc.get_exit_status())138		except GLib.GError:139			# Exited with exit code140			log.info("Subprocess exited with code %s", proc.get_exit_status())141		if proc.get_exit_status() == 127:142			# Command not found143			self.emit("failed", Exception("Command not found"))144		else:145			self.emit('exit', proc.get_exit_status())146		if IS_WINDOWS: self._stdout.close()147		self._cancel.cancel()148	149	def terminate(self):150		""" Terminates process (sends SIGTERM) """151		if not self._proc is None:152			if IS_WINDOWS:153				# Windows154				self._proc.terminate()155			elif HAS_SUBPROCESS:156				# Gio.Subprocess157				self._proc.send_signal(15)158			else:159				# subprocess.Popen160				self._proc.terminate()161			self._proc = None162			if IS_WINDOWS: self._stdout.close()163			self._cancel.cancel()164	165	def kill(self):166		""" Kills process (sends SIGTERM) """167		if not self._proc is None:168			if IS_WINDOWS:169				# Windows - can't actually kill170				self._proc.terminate()171			elif HAS_SUBPROCESS:172				# Gio.Subprocess173				self._proc.force_exit()174			else:175				# subprocess.Popen176				self._proc.kill()177			self._proc = None178			if IS_WINDOWS: self._stdout.close()179			self._cancel.cancel()180	181	def get_output(self):182		""" Returns process output as iterable list of lines """183		return self._lines184	185	def get_commandline(self):186		""" Returns commandline used to start process """...scheduleperiodic.py
Source:scheduleperiodic.py  
1from rx.disposables import SerialDisposable2class SchedulePeriodic(object):3    """Scheduler with support for running periodic tasks. This type of4    scheduler can be used to run timers more efficiently instead of using5    recursive scheduling."""6    def __init__(self, scheduler, period, action, state=None):7        """8        Keyword arguments:9        state -- Initial state passed to the action upon the first iteration.10        period -- Period for running the work periodically.11        action -- Action to be executed, potentially updating the state."""12        self._scheduler = scheduler13        self._state = state14        self._period = period15        self._action = action16        self._cancel = SerialDisposable()17    def tick(self, scheduler, command):18        self._cancel.disposable = self._scheduler.schedule_relative(self._period, self.tick, 0)19        try:20            new_state = self._action(self._state)21        except Exception:22            self._cancel.dispose()23            raise24        else:25            if new_state is not None:  # Update state if other than None26                self._state = new_state27    def start(self):28        """Returns the disposable object used to cancel the scheduled recurring29        action (best effort).30        """31        self._cancel.disposable = self._scheduler.schedule_relative(self._period, self.tick, 0)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
