Best Python code snippet using localstack_python
yosun.py
Source:yosun.py  
...20    def __call__(self, *args, **kwargs):21        try:22            self._callback(*args, **kwargs)23        except Exception as e:24            self._on_exception(e)25    def _default_on_exception(self, exception):26        raise exception27class Subscription(object):28    def __init__(self, connection, exchange, binding_key, key_prefix='', reconnect_timeout=10, on_exception=None):29        self._connection = connection30        self._exchange = exchange31        self._binding_key = binding_key32        self._key_prefix = key_prefix33        self._reconnect_timeout = reconnect_timeout34        if on_exception is not None:35            if not isroutine(on_exception):36                raise ValueError('on_exception must be a function or method')37            self._on_exception = on_exception38        else:39            self._on_exception = self._default_on_exception40        self._handlers = defaultdict(list)41        self._handlers_for_all = []42        self._events = {}43        self._event_any = Event()44        self._running = False45        self._thread = None46        self.start()47    def __del__(self):48        self._running = False49        try:50            self._thread.join()51        except Exception as e:52            pass53    def _default_on_exception(self, exception):54        raise exception55    @property56    def is_alive(self):57        return self._thread is not None and self._thread.is_alive()58    def start(self):59        if not self._running:60            self._running = True61            self._thread = Thread(target=self._consume)62            self._thread.start()63    def stop(self):64        self._running = False65    def on(self, routing_key, callback, on_exception=None):66        # register a callback for the given routing key67        routing_key = '{0}{1}'.format(self._key_prefix, routing_key)68        self._handlers[routing_key].append(Handler(callback, on_exception))69        return self70    def all(self, callback, 
on_exception=None):71        # register a callback for all routing keys72        self._handlers_for_all.append(Handler(callback, on_exception))73        return self74    def wait(self, routing_key, timeout=None):75        # register an event wait76        routing_key = '{0}{1}'.format(self._key_prefix, routing_key)77        if routing_key not in self._events:78            self._events[routing_key] = Event()79        return self._events[routing_key].wait(timeout)80    def wait_any(self, timeout=None):81        return self._event_any.wait(timeout)82    def _on_message(self, body, message):83        if not self._running:84            return85        routing_key = message.delivery_info['routing_key']86        if routing_key in self._events:87            self._events[routing_key].set()88            self._events[routing_key].clear()89        self._event_any.set()90        self._event_any.clear()91        for handler in self._handlers[routing_key]:92            try:93                handler(body, message)94            except Exception as e:95                self._on_exception(e)96        for handler in self._handlers_for_all:97            try:98                handler(body, message)99            except Exception as e:100                self._on_exception(e)101        try:102            message.ack()103        except MessageStateError:104            pass105    def _consume(self):106        routing_key = '{0}{1}'.format(self._key_prefix, self._binding_key)107        while self._running:108            try:109                with connections[self._connection].acquire(block=True) as conn:110                    queue = Queue(exchange=self._exchange, routing_key=routing_key, channel=conn,111                                  durable=False, exclusive=True, auto_delete=True)112                    with Consumer(conn, queue, callbacks=[self._on_message]):113                        try:114                            while self._running:...utils.py
Source:utils.py  
...20    def __call__(self, *args, **kwargs):21        try:22            self._callback(*args, **kwargs)23        except Exception as e:24            self._on_exception(e)25    def _default_on_exception(self, exception):26        raise exception27class Subscription(object):28    def __init__(self, connection, exchange, binding_key, key_prefix='', reconnect_timeout=10, on_exception=None):29        self._connection = connection30        self._exchange = exchange31        self._binding_key = binding_key32        self._key_prefix = key_prefix33        self._reconnect_timeout = reconnect_timeout34        if on_exception is not None:35            if not isroutine(on_exception):36                raise ValueError('on_exception must be a function or method')37            self._on_exception = on_exception38        else:39            self._on_exception = self._default_on_exception40        self._handlers = defaultdict(list)41        self._handlers_for_all = []42        self._events = {}43        self._event_any = Event()44        self._running = False45        self._thread = None46        self.start()47    def __del__(self):48        self._running = False49        try:50            self._thread.join()51        except Exception as e:52            pass53    def _default_on_exception(self, exception):54        raise exception55    @property56    def is_alive(self):57        return self._thread is not None and self._thread.is_alive()58    def start(self):59        if not self._running:60            self._running = True61            self._thread = Thread(target=self._consume)62            self._thread.start()63    def stop(self):64        self._running = False65    def on(self, routing_key, callback, on_exception=None):66        # register a callback for the given routing key67        routing_key = '{0}{1}'.format(self._key_prefix, routing_key)68        self._handlers[routing_key].append(Handler(callback, on_exception))69        return self70    def all(self, callback, 
on_exception=None):71        # register a callback for all routing keys72        self._handlers_for_all.append(Handler(callback, on_exception))73        return self74    def wait(self, routing_key, timeout=None):75        # register an event wait76        routing_key = '{0}{1}'.format(self._key_prefix, routing_key)77        if routing_key not in self._events:78            self._events[routing_key] = Event()79        return self._events[routing_key].wait(timeout)80    def wait_any(self, timeout=None):81        return self._event_any.wait(timeout)82    def _on_message(self, body, message):83        if not self._running:84            return85        routing_key = message.delivery_info['routing_key']86        if routing_key in self._events:87            self._events[routing_key].set()88            self._events[routing_key].clear()89        self._event_any.set()90        self._event_any.clear()91        for handler in self._handlers[routing_key]:92            try:93                handler(body)94            except Exception as e:95                self._on_exception(e)96        for handler in self._handlers_for_all:97            try:98                handler(body)99            except Exception as e:100                self._on_exception(e)101        try:102            message.ack()103        except MessageStateError:104            pass105    def _consume(self):106        routing_key = '{0}{1}'.format(self._key_prefix, self._binding_key)107        while self._running:108            try:109                with connections[self._connection].acquire(block=True) as conn:110                    queue = Queue(exchange=self._exchange, routing_key=routing_key, channel=conn,111                                  durable=False, exclusive=True, auto_delete=True)112                    with Consumer(conn, queue, callbacks=[self._on_message]):113                        try:114                            while self._running:...retry.py
Source:retry.py  
...20            except catch_exceptions_types as ex:21                attempt += 122                if on_exception:23                    if iscoroutinefunction(on_exception):24                        await on_exception(ex, attempt)25                    else:26                        on_exception(ex, attempt)27                if attempt > times:28                    raise29                if delay is not None:30                    await asyncio.sleep(delay, loop=loop)31    return async_wrapper32def retry(times: int = 3,33          delay: Optional[float] = 0.1,34          catch_exceptions_types: CatchException = None,35          on_exception: OnException = None,36          loop=None):37    if catch_exceptions_types is None:38        catch_exceptions_types = Exception39    def retry_decorator(fn):40        if iscoroutinefunction(fn):41            return _get_retry_async_wrapper(fn,42                                            times,43                                            delay,44                                            catch_exceptions_types,45                                            on_exception,46                                            loop)47        @wraps(fn)48        def wrapper(*args, **kwargs):49            attempt = 050            while True:51                try:52                    return fn(*args, **kwargs)53                except catch_exceptions_types as ex:54                    attempt += 155                    if on_exception:56                        on_exception(ex, attempt)57                    if attempt > times:58                        raise59                    if delay is not None:60                        time.sleep(delay)61        return wrapper...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
