Best Python code snippet using autotest_python
redis_cache.py
Source:redis_cache.py  
# NOTE: this excerpt of redis_cache.py starts in the middle of a function whose
# header is not shown. The visible tail of that function is kept verbatim:
#     global check_deadline
#     logging.warning("Redis server up")
#     server_alive = True
#     check_deadline = -1


def check_server_alive() -> bool:
    """Report whether the Redis cache may be used right now.

    Returns True immediately when the module-level ``server_alive`` flag is
    set. Otherwise the server is pinged, but only once ``check_deadline`` has
    passed (presumably set by ``server_down``, which is defined outside this
    excerpt -- TODO confirm). Always returns a real bool.
    """
    if server_alive:
        return True
    if check_deadline >= time.time():
        # Still inside the back-off window: report "down" explicitly instead
        # of falling off the end of the function and returning None.
        return False
    try:
        conn.ping()
        server_up()
        return True
    except Exception:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit are
        # not swallowed by the health check.
        server_down()
        return False


def gen_hash(input_str):
    """Return the hex SHA-256 digest of *input_str* encoded as UTF-8."""
    return hashlib.sha256(input_str.encode('utf-8')).hexdigest()


def generate_cache_key(input_dict):
    """Build the Redis key: ``key_prefix``, a colon, and the hash of the JSON dump."""
    return key_prefix + ":" + gen_hash(json.dumps(input_dict))


def cache_get(key):
    """Fetch and JSON-decode the cached value stored for *key*.

    Returns the decoded value, or None when nothing is stored or any error
    occurs. A Redis connection error additionally calls ``server_down``.
    """
    try:
        response = conn.get(generate_cache_key(key))
        if response:
            return json.loads(response)
    except redis.ConnectionError:
        server_down()
        logging.error("Redis server down: cache disabled in cache_get!")
    except json.JSONDecodeError:
        logging.error("Redis server corrupted response!")
    except Exception as e:
        logging.error("Redis server error: " + str(e))
    return None


def cache_get_protected(key):
    """Call ``cache_get`` only while ``server_alive`` is set; otherwise return None."""
    if server_alive:
        return cache_get(key)
    return None


def cache_set_protected(key, value):
    """Call ``cache_set`` only while ``server_alive`` is set."""
    if server_alive:
        cache_set(key, value)


def cache_set(key, value):
    """Store *value* (serialised with ``u.to_json``) under *key* with a TTL.

    The TTL is the module-level ``key_expire``. Errors are logged and never
    raised. Connection errors and unexpected errors call ``server_down``;
    a key or value that cannot be encoded does not, because that is not a
    server problem.
    """
    try:
        cache_key = generate_cache_key(key)
        value_json = u.to_json(value)
        # Set value and expiry in one command so a failure between two calls
        # cannot leave a key behind without a TTL.
        conn.set(cache_key, value_json, ex=key_expire)
    except redis.ConnectionError:
        server_down()
        logging.error("Redis server down: cache disabled in cache_set!")
    except (TypeError, ValueError):
        # json.dumps reports unserialisable input with TypeError/ValueError;
        # json.JSONDecodeError (a ValueError subclass) is still covered.
        logging.error("Redis key cannot be Json encoded! \n" + str(key))
    except Exception as e:
        server_down()
        logging.error("Redis cache disabled in cache_set! + Exception: " + str(e))


# decorator
def cache(**decorator_kwargs):
    """Decorator factory that caches a function's result in Redis.

    The cache key combines the function's file and name with the string forms
    of its positional/keyword arguments and of *decorator_kwargs*. When
    ``check_server_alive`` reports False the wrapped function is called
    directly.

    NOTE: the excerpt is cut off inside ``wrapper``; only the visible part is
    reproduced here.
    """
    global server_alive
    global key_prefix

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*func_args, **func_kwargs):
            if not check_server_alive():
                return func(*func_args, **func_kwargs)
            cache_key = {'func': func.__globals__['__file__'] + '::' + func.__name__,
                         'func_args': str(func_args),
                         'func_kwargs': str(func_kwargs),
                         'decorator_kwargs': str(decorator_kwargs)
                         }
            response_from_cache = cache_get(cache_key)
            if response_from_cache:
                logging.debug(f"Redis returned object from cache for key ({cache_key}):" + str(response_from_cache))
                return response_from_cache
            else:
                response = func(*func_args, **func_kwargs)
                logging.debug(f"Redis returned object from wrapped function for key ({cache_key}): " + str(response))
                cache_set(cache_key, response)
                # ... the excerpt is cut off here; the rest of this decorator
                # (including its return statements) is not shown.

# Next snippet: gtornado.py
Source:gtornado.py  
# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

import copy
import os
import sys

try:
    import tornado.web
except ImportError:
    raise RuntimeError("You need tornado installed to use this worker.")
import tornado.httpserver
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.wsgi import WSGIContainer
from gunicorn.workers.base import Worker
from gunicorn import __version__ as gversion


class TornadoWorker(Worker):
    """Gunicorn worker that serves the application with a Tornado HTTP server."""

    @classmethod
    def setup(cls):
        """Patch ``RequestHandler.clear`` to append the gunicorn version to ``Server``."""
        web = sys.modules.pop("tornado.web")
        old_clear = web.RequestHandler.clear

        def clear(self):
            old_clear(self)
            if "Gunicorn" not in self._headers["Server"]:
                self._headers["Server"] += " (Gunicorn/%s)" % gversion
        web.RequestHandler.clear = clear
        sys.modules["tornado.web"] = web

    def handle_exit(self, sig, frame):
        """Forward the exit signal to the base worker only while still alive."""
        if self.alive:
            super(TornadoWorker, self).handle_exit(sig, frame)

    def handle_request(self):
        """Count one handled request and flag the worker once max_requests is reached."""
        self.nr += 1
        if self.alive and self.nr >= self.max_requests:
            self.log.info("Autorestarting worker after current request.")
            self.alive = False

    def watchdog(self):
        """Periodic callback: notify while alive, and stop if the parent pid changed."""
        if self.alive:
            self.notify()
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            self.alive = False

    def heartbeat(self):
        """Periodic callback that shuts down in two phases once ``alive`` is False.

        First the HTTP server is stopped; on a later tick, once the IOLoop has
        no pending callbacks, the loop itself is stopped.
        """
        if not self.alive:
            if self.server_alive:
                if hasattr(self, 'server'):
                    try:
                        self.server.stop()
                    except Exception:
                        # Best effort: shutdown proceeds even if stop() fails.
                        pass
                self.server_alive = False
            else:
                if not self.ioloop._callbacks:
                    self.ioloop.stop()

    def run(self):
        """Build the Tornado server, attach the worker's sockets and start it.

        NOTE: the excerpt is cut off after ``server.start(...)``; anything that
        follows in the original method is not shown.
        """
        self.ioloop = IOLoop.instance()
        self.alive = True
        self.server_alive = False
        PeriodicCallback(self.watchdog, 1000, io_loop=self.ioloop).start()
        PeriodicCallback(self.heartbeat, 1000, io_loop=self.ioloop).start()
        # Assume the app is a WSGI callable if its not an
        # instance of tornado.web.Application or is an
        # instance of tornado.wsgi.WSGIApplication
        app = self.wsgi
        if not isinstance(app, tornado.web.Application) or \
           isinstance(app, tornado.wsgi.WSGIApplication):
            app = WSGIContainer(app)
        # Monkey-patching HTTPConnection.finish to count the
        # number of requests being handled by Tornado. This
        # will help gunicorn shutdown the worker if max_requests
        # is exceeded.
        httpserver = sys.modules["tornado.httpserver"]
        if hasattr(httpserver, 'HTTPConnection'):
            old_connection_finish = httpserver.HTTPConnection.finish

            def finish(other):
                self.handle_request()
                old_connection_finish(other)
            httpserver.HTTPConnection.finish = finish
            sys.modules["tornado.httpserver"] = httpserver
            server_class = tornado.httpserver.HTTPServer
        else:
            class _HTTPServer(tornado.httpserver.HTTPServer):
                def on_close(instance, server_conn):
                    self.handle_request()
                    super(_HTTPServer, instance).on_close(server_conn)
            server_class = _HTTPServer
        if self.cfg.is_ssl:
            _ssl_opt = copy.deepcopy(self.cfg.ssl_options)
            # tornado refuses initialization if ssl_options contains following
            # options; pop() instead of del so a missing key is not an error.
            _ssl_opt.pop("do_handshake_on_connect", None)
            _ssl_opt.pop("suppress_ragged_eofs", None)
            server = server_class(app, io_loop=self.ioloop,
                                  ssl_options=_ssl_opt)
        else:
            server = server_class(app, io_loop=self.ioloop)
        self.server = server
        self.server_alive = True
        for s in self.sockets:
            s.setblocking(0)
            if hasattr(server, "add_socket"):  # tornado > 2.0
                server.add_socket(s)
            elif hasattr(server, "_sockets"):  # tornado 2.0
                server._sockets[s.fileno()] = s
        server.no_keep_alive = self.cfg.keepalive <= 0
        server.start(num_processes=1)
        # ... the excerpt is cut off here; the rest of run() is not shown.

# Next snippet: Broadcaster.py
Source:Broadcaster.py  
import asyncio
import threading

import websockets


class Broadcaster(threading.Thread):
    """Thread that runs a websocket server on localhost:8765 and serves the board."""

    # NOTE(review): these are class attributes, so the list objects and the
    # Event are shared by every Broadcaster instance -- confirm this is intended.
    clients = []
    board = []
    server_alive = True
    server_task = None
    stop_event = threading.Event()

    def run(self) -> None:
        """Thread entry point: run the websocket server until it is closed."""
        asyncio.run(self.start_server())

    async def start_server(self):
        """Serve websocket connections until ``stop_event`` is set."""
        # Wait on the threading.Event in an executor so the event loop is not blocked.
        stop = asyncio.get_event_loop().run_in_executor(None, self.stop_event.wait)
        async with websockets.serve(self.handler, "localhost", 8765):
            await stop

    def setBoard(self, board):
        """Replace the board that is sent in answer to 'retrieveGrid'."""
        self.board = board

    async def handler(self, websocket):
        """Serve one client: answer 'retrieveGrid' with the current board.

        The connection is registered in ``clients`` for the lifetime of the
        handler and removed again when the handler exits for any reason, so
        closed connections do not accumulate in the list.
        """
        self.clients.append(websocket)
        try:
            while self.server_alive:
                message = await websocket.recv()
                if not self.server_alive:
                    raise Exception("Server is now close")
                if message == 'retrieveGrid':
                    await websocket.send("GRID:" + ','.join(str(i) for i in self.board))
        finally:
            # Drop the connection whether the loop ended normally, the peer
            # disconnected (recv raised) or the server is shutting down.
            if websocket in self.clients:
                self.clients.remove(websocket)
        # async for message in websocket:
        #     await websocket.send(message)

    def close(self):
        """Mark the server as stopped and release ``start_server``."""
        self.server_alive = False
        self.stop_event.set()

    async def broadcast(self, state):
        for ws in self.clients:
            ...  # the excerpt is cut off here; the loop body is not shown


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
