How to use the _Cycle class in autotest

Best Python code snippet using autotest_python Github


Full Screen

# NOTE(review): this listing is an excerpt. It opens partway through the
# docstring of a definition that is not shown; the closing lines of that
# docstring are preserved here as comments so the remainder reads as code.
#
#   ... calls on application objects implementing base interfaces and
#   translate calls from application objects implementing base interfaces
#   into tickets sent to a joined link.
#   """


class _Cycle(object):
    """State for a single start-stop End lifecycle."""

    def __init__(self, pool):
        """Initializes the bookkeeping for one lifecycle.

        Args:
          pool: The pool stored on this cycle. In this listing _End submits
            callables to it and calls shutdown(wait=False) on it.
        """
        self.pool = pool
        # Read by _End's termination action to decide whether to shut the
        # cycle down once no operations remain. Where it becomes True is not
        # visible in this excerpt.
        self.grace = False
        # Objects exposing cancel(); see _cancel_futures.
        self.futures = []
        # Mapping keyed by operation ID; values expose abort(); see _abort.
        self.operations = {}
        # Callables submitted to the pool once no operations remain.
        self.idle_actions = []


def _abort(operations):
    """Aborts every given operation with the LOCAL_SHUTDOWN outcome kind.

    Args:
      operations: An iterable of objects exposing abort(outcome_kind).
    """
    for operation in operations:
        operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN)


def _cancel_futures(futures):
    """Calls cancel() on every given future.

    Args:
      futures: An iterable of objects exposing cancel().
    """
    for future in futures:
        future.cancel()


def _future_shutdown(lock, cycle, event):
    """Builds a callable that aborts a cycle's operations and cancels its futures.

    Args:
      lock: The lock held while the returned callable does its work.
      cycle: The _Cycle whose operations are aborted and futures cancelled.
      event: Accepted but not used anywhere in the body shown here.

    Returns:
      A zero-argument callable performing the shutdown work described above.
    """
    def in_future():
        with lock:
            _abort(cycle.operations.values())
            _cancel_futures(cycle.futures)
    return in_future


class _End(End):
    """An End implementation."""

    def __init__(self, servicer_package):
        """Constructor.

        Args:
          servicer_package: A _ServicerPackage for servicing operations or None if
            this end will not be used to service operations.
        """
        self._lock = threading.Condition()
        self._servicer_package = servicer_package
        # One counter per outcome kind, all starting at zero.
        self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
        self._mate = None
        # None while not started; start() installs a _Cycle here.
        self._cycle = None

    def _termination_action(self, operation_id):
        """Constructs the termination action for a single operation.

        Args:
          operation_id: The operation ID for the termination action.

        Returns:
          A callable that takes an operation outcome kind as its sole parameter and
          that should be used as the termination action for the operation
          associated with the given operation ID.
        """
        def termination_action(outcome_kind):
            with self._lock:
                self._stats[outcome_kind] += 1
                # The None default makes an absent operation ID a no-op.
                self._cycle.operations.pop(operation_id, None)
                if not self._cycle.operations:
                    # No operations remain: hand every idle action to the pool,
                    # then clear the list.
                    for action in self._cycle.idle_actions:
                        self._cycle.pool.submit(action)
                    self._cycle.idle_actions = []
                    if self._cycle.grace:
                        # grace is set, so end the cycle: cancel its futures,
                        # shut the pool down with wait=False and drop the cycle.
                        _cancel_futures(self._cycle.futures)
                        self._cycle.pool.shutdown(wait=False)
                        self._cycle = None
        return termination_action

    def start(self):
        """See base.End.start for specification.

        Raises:
          ValueError: If a cycle is already installed, i.e. this End was started
            and has not been stopped.
        """
        with self._lock:
            if self._cycle is not None:
                raise ValueError('Tried to start a not-stopped End!')
            else:
                # NOTE(review): the argument 1 is presumably the pool size;
                # confirm against logging_pool.pool.
                self._cycle = _Cycle(logging_pool.pool(1))

    def stop(self, grace):
        """See base.End.stop for specification.

        NOTE(review): the listing is cut off inside the final branch, so the use
        of `grace` and the behaviour when operations are still active are not
        visible here.

        Returns:
          In the two branches shown, a threading.Event: already set if there is
          no cycle, otherwise one whose set() has been submitted to the pool.
        """
        with self._lock:
            if self._cycle is None:
                # No cycle installed: return an event that is already set.
                event = threading.Event()
                event.set()
                return event
            elif not self._cycle.operations:
                # A cycle exists but has no operations: submit event.set to the
                # pool, shut the pool down with wait=False and drop the cycle.
                event = threading.Event()
                self._cycle.pool.submit(event.set)
                self._cycle.pool.shutdown(wait=False)
                self._cycle = None
                return event
            else:
                ...  # the listing is truncated here; the rest of this branch is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?