How to use _current_handlers method in autotest

Best Python code snippet using autotest_python

MouseBehavior.py

Source:MouseBehavior.py Github

copy

Full Screen

"""
MouseBehavior.py

$Id$

TODO:

split into other files, one for class Tool*,
one for parse_command (rename to avoid confusion w/ user command package?), etc
"""

from pyglet.event import EVENT_HANDLED

from demoapp.foundation.description_utils import description_maker_for_methods_in_class

DEBUG_TRANSITIONS = False

# ==

class PlaceHolder(object): #refile
    """
    For singleton symbol-like instances which can be found and replaced
    wherever they occur in other values, and are never confused
    with ordinary values.

    The repr of an instance is just its name.
    """
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name

    pass

CMD_RETVAL = PlaceHolder('CMD_RETVAL')
SAME_STATE = PlaceHolder('SAME_STATE')

# ==

NOOP = 'NOOP' # stub
# unlike None, this means we stop at this object, never calling lower handlers

def parse_command(command): ### REFACTOR into this returning name, args, parse_tip, and parse_transition
    """
    Return (name, args) for any command.

    A tuple command is split into its first element (the name) and the
    remaining elements (the args); None yields (None, ()); anything else
    is treated as a command name with no args.
    """
    #### DECIDE: what actually this is retval? event name???? in pane or model? which coords? (guess: model, model coords)
    # TODO: improve to have more return options, be a first class object -- really an expr (description of what to do)
    if type(command) is tuple:
        name, args = command[0], command[1:]
    elif command is None:
        return None, ()
    else:
        # any command with no args
        name, args = command, ()
    assert isinstance(name, type("")) #e improve
    return name, args

# ==

class Transition(object): # use superclass Description??
    """
    Describe a potential state transition, for use by something
    which might actually do it or might just indicate something
    about it in the UI.

    Note: callers are encouraged to supply these as named arguments for clarity,
    and to always supply them in this order, and to supply all of them
    (except for handled) even when they have their default values.

    @param indicators: tooltip and highlighting indicators, to show the user
                       what would happen if this transition was taken.
    @type indicators: sequence of HighlightGraphics_descriptions objects ###doc more

    @param command: what to do to the model when this transition is taken.
                    Can be None for "no operation".
    @type command: ('command name', *parameters )

    @param next_state: next state to go into, when this transition is taken.
                       Can be SAME_STATE to remain in the same state with the
                       same parameters. Can include CMD_RETVAL when the command
                       return value is needed as a parameter of the new state.
    @type next_state: ( state_class, *parameters)

    @param handled: whether a mouse event resulting in this transition being
                    indicated or taken has been fully handled (true) or needs
                    further handling by background event handlers (false)
    @type handled: boolean
    """
    def __init__(self,
                 indicators = (),
                 command = None,
                 next_state = None,
                 handled = True ):
        self.indicators = indicators
        self.command = command
        self.next_state = next_state
        # a true `handled` is stored as EVENT_HANDLED; a false one is stored as-is
        self.handled = handled and EVENT_HANDLED ## technically: or None
        #e todo: also save file and line of caller, if a debug option is set,
        # and print this in tracebacks when processing this transition
        # (perhaps using a feature to store extra info in frames to be printed then?)

    pass

def parse_transition(transition):
    """
    Return (indicators, command, next_state, handled) for a Transition.

    A transition of None is reported as (None, None, SAME_STATE, False).
    """
    t = transition
    if t is None:
        return None, None, SAME_STATE, False # guesses, 080616 night
    return t.indicators, t.command, t.next_state, t.handled

def parse_state( state_desc):
    """
    Return (class, args) for a state description.

    None yields (None, ()); otherwise the first element is the class and
    the rest are the args. If indexing fails, a message naming the bad
    description is printed and the exception is re-raised.
    """
    # maybe: use parse_description_tuple?
    # maybe optim: replacements at same time (as this or as instantiate_state)?
    if state_desc is None:
        return None, ()
    try:
        return state_desc[0], state_desc[1:]
    except:
        # single-argument print(...) behaves the same on Python 2 and 3
        print("following exception was in parse_state(%r):" % (state_desc,))
        raise
    pass

def replace_symbols_in(desc, **replacements): # maybe: grab code from exprs for Symbols, to generalize
    """
    Return desc with PlaceHolder instances replaced by keyword values.

    Tuples are rebuilt recursively; a PlaceHolder whose name is a key in
    replacements is swapped for that value; everything else (including
    PlaceHolders with no replacement) is returned unchanged.
    """
    res = desc
    if type(desc) is type(()):
        res = tuple([replace_symbols_in(x, **replacements) for x in desc])
    elif type(desc) is PlaceHolder:
        if desc.name in replacements:
            res = replacements[desc.name]
        pass
    # todo: dicts, lists
    # maybe: if res == desc: return desc
    return res

# ==

class ToolStateBehavior(object):
    #doc; related to MouseBehavior (is name ok? ToolState by itself seemed ambiguous...)
    """
    Base class for one state of a Tool.

    Tool.instantiate_state requires state classes to subclass this, and
    constructs them with the tool as first argument.
    """
    _cmd_retval = None

    def __init__(self, tool):
        "subclasses typically have more init args"
        self.tool = tool
        self.pane = tool.pane # convenience for subclasses
        self.model = tool.pane.model
        return

    def transition_to(self, next_state):
        """
        Ask self.tool to switch to next_state, unless it is SAME_STATE.

        self._cmd_retval is passed along as the replacement for CMD_RETVAL.
        """
        if next_state is not SAME_STATE:
            if DEBUG_TRANSITIONS:
                print("%r transto %r" % (self, next_state)) ##### DEBUG
            self.tool.transition_to( next_state,
                                     CMD_RETVAL = self._cmd_retval )
            # maybe: might decide tool can grab it from self if needed (passed as an arg)
        pass

# ==

class Tool(object):
    """
    Subclasses are specific tools, not being used.
    Instances of subclases are uses of specific tools in specific panes.
    """
    # per-subclass constants
    _default_state = None
    HighlightGraphics_class = None

    # instance variables
    _current_handlers = None
    _f_HighlightGraphics_instance = None # (could probably be per-class)
    _f_HighlightGraphics_descriptions = None # (could probably be per-class)

    def __init__(self, pane):
        """
        @note: doesn't do self.activate()
        """
        self.pane = pane
        # optim (minor): the following could probably be cached per-class
        if self.HighlightGraphics_class:
            tool = self
            self._f_HighlightGraphics_instance = self.HighlightGraphics_class(tool)
            self._f_HighlightGraphics_descriptions = description_maker_for_methods_in_class( self.HighlightGraphics_class)
        pass

    def activate(self, initial_state = None):
        """
        Enter initial_state, or self._default_state if none is given.

        @note: doesn't deactivate other tools on self.pane
        """
        self.transition_to(initial_state or self._default_state)
        return

    def deactivate(self):
        "Remove this tool's current state handlers from self.pane."
        self.remove_state_handlers()
        return

    def remove_state_handlers(self):
        "Remove the current state handlers (if any) from self.pane and forget them."
        if self._current_handlers:
            self.pane.remove_handlers(self._current_handlers)
            self._current_handlers = None
        return

    def transition_to(self, next_state, **replacements):
        """
        Replace the current state handlers with a new instance of next_state.

        PlaceHolders in next_state are substituted from **replacements
        before the state is instantiated.
        """
        self.remove_state_handlers()
        next_state = replace_symbols_in( next_state, **replacements )
        new_handlers = self.instantiate_state( next_state)
        self.push_state_handlers( new_handlers)
        # review: would this be better if it could call a new
        # replace_handlers method on EventDispatcher
        # (so as to insert them at the same stack level as before)?
        # not sure whether this ever matters in practice...
        # yes, it does matter -- I tried putting some controls on top
        # of the tool area, but that fails from the start, since the
        # first toolstate ends up on top of them.
        return

    def push_state_handlers(self, new_handlers):
        "Push new_handlers onto self.pane and remember them as current."
        self.pane.push_handlers(new_handlers)
        self._current_handlers = new_handlers
        return

    def instantiate_state(self, state):
        """
        Return an instance of the state class described by state.

        state is a (state_class, *args) description; the class is called
        with self followed by the args. On any failure a message naming
        self and state is printed and the exception is re-raised.
        """
        try:
            assert state, "state must not be %r" % (state,)
            state_class, state_args = parse_state(state)
            assert issubclass( state_class, ToolStateBehavior ), \
                   "%r should be a subclass of ToolStateBehavior" % (state_class,)
            res = state_class( self, *state_args)
            return res
        except:
            # single-argument print(...) behaves the same on Python 2 and 3
            print("following exception is in %r.instantiate_state(%r):" % (self, state))
            raise
        pass

    # ... (the listing is truncated at this point in the source page)

Full Screen

Full Screen

core.py

Source:core.py Github

copy

Full Screen

import libyang
import logging
import asyncio
import os
import time

from .server_connector import create_server_connector
from .errors import InvalArgError, InternalError, UnsupportedError
from .util import call

logger = logging.getLogger(__name__)

DEFAULT_REVERT_TIMEOUT = int(os.getenv("GOLDSTONE_DEFAULT_REVERT_TIMEOUT", 6))


class ChangeHandler(object):
    """Base handler for a single change; validate/apply/revert are no-ops here."""

    def __init__(self, server, change):
        self.server = server
        self.change = change
        self.type = self.change.type

    def setup_cache(self, user):
        """Return user["cache"], creating it from user["changes"] when missing or falsy."""
        cache = user.get("cache")
        if not cache:
            cache = self.server.conn.get_config_cache(user["changes"])
            user["cache"] = cache
        return cache

    def validate(self, user):
        pass

    def apply(self, user):
        pass

    def revert(self, user):
        pass


NoOp = ChangeHandler


class ServerBase(object):
    """Dispatches change events to ChangeHandler classes registered in self.handlers."""

    def __init__(self, conn, module, revert_timeout=DEFAULT_REVERT_TIMEOUT):
        self.conn = create_server_connector(conn, module)
        self.handlers = {}
        # (req_id, handlers, user, revert_task) while a 'change' awaits 'done'/'abort'
        self._current_handlers = None
        self._stop_event = asyncio.Event()
        self.revert_timeout = revert_timeout
        self.lock = asyncio.Lock()

    def get_running_data(
        self, xpath, default=None, strip=True, include_implicit_defaults=False
    ):
        """Forward to self.conn.get() with the same arguments."""
        return self.conn.get(
            xpath,
            default=default,
            strip=strip,
            include_implicit_defaults=include_implicit_defaults,
        )

    def get_operational_data(
        self, xpath, default=None, strip=True, include_implicit_defaults=False
    ):
        """Forward to self.conn.get_operational() with the same arguments."""
        return self.conn.get_operational(
            xpath,
            default=default,
            strip=strip,
            include_implicit_defaults=include_implicit_defaults,
        )

    def get_handler(self, xpath):
        """Walk self.handlers along xpath and return the handler class found.

        Returns None as soon as a path element has no entry, the first
        ChangeHandler subclass encountered, or NoOp when the whole path is
        consumed without reaching a handler class.
        """
        xpath = libyang.xpath_split(xpath)
        cursor = self.handlers
        for x in xpath:
            # x[1] is the lookup key (presumably the node name -- confirm
            # against libyang.xpath_split)
            v = cursor.get(x[1])
            if v is None:
                return None
            if isinstance(v, type) and issubclass(v, ChangeHandler):
                return v
            cursor = v
        return NoOp

    async def start(self):
        """Subscribe the callbacks and return a list holding the stop-event wait coroutine."""
        self.conn.subscribe_module_change(self.change_cb)
        self.conn.subscribe_oper_data_request(self._oper_cb)
        return [self._stop_event.wait()]

    def stop(self):
        self._stop_event.set()
        self.conn.stop()

    async def reconcile(self):
        logger.debug("reconcile")

    # In Goldstone Management Layer, one model is subscribed by one Server.
    # When a sysrepo transaction only includes changes for one model, 'abort' event
    # never happens. However, when a sysrepo transaction includes changes for more than one model,
    # change_cb() may get an 'abort' event if error happens in a change_cb() of other servers.
    #
    # We store the handlers created in the 'change' event, and discard them if we get a succeeding 'done' event.
    # If we get 'abort' event, call revert() of the stored handlers then discard them.
    #
    # - 'change' event handling
    #   - 1. iterate through the changes, do basic validation, degenerate changes if possible
    #   - 2. do the actual change handling, if any error happens, revert the changes made in advance and raise error
    #   - 3. store the handlers for 'abort' event
    # - 'done' event handling
    #   - 1. discard the stored handlers
    # - 'abort' event handling
    #   - 1. call revert() of the stored handlers
    #   - 2. discard the stored handlers
    #
    # ## Client side timeout handling
    # When timeout of client expired during 'change' event handling, sysrepo doesn't issue any event.
    # We create a task that waits for "done" or "abort" event after finishing "change" event handling.
    # If no event arrives within certain amount of time, assume it as client timeout happens then call revert()
    # of the stored handlers.
    # If "done" or "abort" event arrives within the time window, cancel this task.
    async def change_cb(self, event, req_id, changes, priv):
        """Handle a 'change', 'done' or 'abort' event while holding self.lock.

        Raises InternalError when 'done'/'abort' arrives with nothing pending
        or for a different request id, or when 'change' arrives while a
        previous request is still pending. Raises UnsupportedError for a
        non-deleted change with no handler.
        """
        logger.debug(f"id: {req_id}, event: {event}, changes: {changes}")
        async with self.lock:
            if event not in ["change", "done", "abort"]:
                logger.warning(f"unsupported event: {event}")
                return

            if event in ["done", "abort"]:
                if self._current_handlers is None:
                    logger.error("current_handlers is null")
                    self._stop_event.set()
                    raise InternalError("fatal error happened")

                pending_id, handlers, user, revert_task = self._current_handlers

                # stop the timeout-driven revert and wait for it to finish
                revert_task.cancel()
                try:
                    await revert_task
                except asyncio.CancelledError:
                    pass

                if pending_id != req_id:
                    logger.error(f"id={pending_id!r} != {req_id=}")
                    self._stop_event.set()
                    raise InternalError("fatal error happened")

                if event == "abort":
                    for done in reversed(handlers):
                        await call(done.revert, user)

                self._current_handlers = None
                return

            # event == "change" from here on
            if self._current_handlers is not None:
                pending_id = self._current_handlers[0]
                raise InternalError(
                    f"waiting 'done' or 'abort' event for id {pending_id}. got '{event}' event for id {req_id}"
                )

            handlers = []
            user = {"changes": changes}

            await call(self.pre, user)

            for change in changes:
                cls = self.get_handler(change.xpath)
                if not cls:
                    if change.type == "deleted":
                        continue
                    raise UnsupportedError(f"{change.xpath} not supported")

                h = cls(self, change)

                # to support async initialization
                init = getattr(h, "_init", None)
                if init:
                    await call(init, user)

                await call(h.validate, user)
                handlers.append(h)

            for i, handler in enumerate(handlers):
                try:
                    await call(handler.apply, user)
                except Exception:
                    # undo the handlers already applied, newest first, then re-raise
                    for done in reversed(handlers[:i]):
                        await call(done.revert, user)
                    raise

            await call(self.post, user)

            async def do_revert():
                await asyncio.sleep(self.revert_timeout)
                logger.warning("client timeout happens? reverting changes we made")
                for done in reversed(handlers):
                    await call(done.revert, user)
                self._current_handlers = None

            revert_task = asyncio.create_task(do_revert())
            self._current_handlers = (req_id, handlers, user, revert_task)

    def pre(self, user):
        pass

    def post(self, user):
        pass

    async def _oper_cb(self, xpath, priv):
        """Call self.oper_cb via call(), log the elapsed time in seconds, return its data."""
        logger.debug(f"xpath: {xpath}")
        time_start = time.perf_counter_ns()
        data = await call(self.oper_cb, xpath, priv)
        time_end = time.perf_counter_ns()
        # perf_counter_ns() is in nanoseconds; 1e9 ns per second
        elapsed = (time_end - time_start) / 1_000_000_000
        logger.debug(f"xpath: {xpath}, elapsed: {elapsed}sec")
        return data

    def oper_cb(self, xpath, priv):
        pass

    # NOTE: the body of send_notification is truncated in this listing.
    def send_notification(self, name, notification): ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful