Best Python code snippet using autotest_python
_end.py
Source:_end.py  
...46  calls on application objects implementing base interfaces and47  translate calls from application objects implementing base interfaces48  into tickets sent to a joined link.49  """50class _Cycle(object):51  """State for a single start-stop End lifecycle."""52  def __init__(self, pool):53    self.pool = pool54    self.grace = False55    self.futures = []56    self.operations = {}57    self.idle_actions = []58def _abort(operations):59  for operation in operations:60    operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN)61def _cancel_futures(futures):62  for future in futures:63    future.cancel()64def _future_shutdown(lock, cycle, event):65  def in_future():66    with lock:67      _abort(cycle.operations.values())68      _cancel_futures(cycle.futures)69  return in_future70class _End(End):71  """An End implementation."""72  def __init__(self, servicer_package):73    """Constructor.74    Args:75      servicer_package: A _ServicerPackage for servicing operations or None if76        this end will not be used to service operations.77    """78    self._lock = threading.Condition()79    self._servicer_package = servicer_package80    self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind}81    self._mate = None82    self._cycle = None83  def _termination_action(self, operation_id):84    """Constructs the termination action for a single operation.85    Args:86      operation_id: The operation ID for the termination action.87    Returns:88      A callable that takes an operation outcome kind as its sole parameter and89        that should be used as the termination action for the operation90        associated with the given operation ID.91    """92    def termination_action(outcome_kind):93      with self._lock:94        self._stats[outcome_kind] += 195        self._cycle.operations.pop(operation_id, None)96        if not self._cycle.operations:97          for action in self._cycle.idle_actions:98            self._cycle.pool.submit(action)99          self._cycle.idle_actions = []100          if self._cycle.grace:101            _cancel_futures(self._cycle.futures)102            self._cycle.pool.shutdown(wait=False)103            self._cycle = None104    return termination_action105  def start(self):106    """See base.End.start for specification."""107    with self._lock:108      if self._cycle is not None:109        raise ValueError('Tried to start a not-stopped End!')110      else:111        self._cycle = _Cycle(logging_pool.pool(1))112  def stop(self, grace):113    """See base.End.stop for specification."""114    with self._lock:115      if self._cycle is None:116        event = threading.Event()117        event.set()118        return event119      elif not self._cycle.operations:120        event = threading.Event()121        self._cycle.pool.submit(event.set)122        self._cycle.pool.shutdown(wait=False)123        self._cycle = None124        return event125      else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
