Best Python code snippet using avocado_python
distributed_cache.py
Source:distributed_cache.py  
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from typing import Optional, Dict, Union, List
from dask.distributed import Client, Variable, Lock, ActorFuture, TimeoutError
from src.kv_cache import KVGraphCache, KVCacheValueType


class DistributedCache:
    """
    Simple distributed key value cache built on Dask.

    Each store is backed by a KVGraphCache actor whose future is published in a
    Dask distributed Variable named 'KVCache_<store_name>'. Access is serialised
    with Dask distributed Locks named 'KVCache:<store_name>' (whole store) or
    'KVCache:<store_name>:<key>' (single key).
    """

    def __init__(self, config: Optional[Dict[str, Union[str, bool]]], dask_client=None):
        """
        Class that implements a simple distributed key value cache that takes care of persisting and restoring to database
        :param config: keys required:
                                db_username: the username to login to db with
                                db_system: the db type - arango-db
                                db_name: the name of the database
                                db_config_file_path: the path with the database configuration
                                db_queries_file_path: the path with the query definitions
                                named_query_collection - postfix for a store's named query collection
                                scheduler_address - the address of the Dask Cluster Scheduler
        :param dask_client: an existing Dask client to reuse. If None a new Client is
                            created from config['scheduler_address']
        """
        self.config = config
        if dask_client is None:
            self.client = Client(config['scheduler_address'])
        else:
            self.client = dask_client

    @staticmethod
    def lock_store(store_name: str) -> Lock:
        """
        method to create a distributed lock on a complete store
        :param store_name: the store to lock
        :return: lock object to use within a with statement
        """
        distributed_store_name = 'KVCache:{}'.format(store_name)
        return Lock(distributed_store_name)

    @staticmethod
    def lock_key(store_name: str, key: str) -> Lock:
        """
        method to create a distributed lock on a key within a store
        :param store_name: the store to lock
        :param key: the key in the store to lock
        :return: lock object to use within a with statement
        """
        distributed_store_name_key = 'KVCache:{}:{}'.format(store_name, key)
        return Lock(distributed_store_name_key)

    def create_distributed_store(
        self,
        store_name: str,
        key: Optional[Union[List[str], str]] = None,
        restore: bool = True,
    ):
        """
        method to initialise a store and restore
        :param store_name: the store_name
        :param key: a key or list of keys to restore
        :param restore: If true then restore from database
        :return: True if restored else false
        """
        # name of the distributed variable holding the kv_cache actor
        #
        distributed_store_name = 'KVCache_{}'.format(store_name)

        # flag indicating if restored from DB
        #
        result = False

        store_lock = self.lock_store(store_name=store_name)
        store_lock.acquire()
        try:
            try:
                # try to get the cache
                #
                Variable(distributed_store_name).get(timeout=0.1)
            except TimeoutError:
                # the distributed variable does not exist so create it
                #
                cache_future = self.client.submit(KVGraphCache, self.config, actor=True)

                # create the distributed variable and store the new cache actor
                #
                Variable(distributed_store_name).set(cache_future)

                # only restore if not in cache and asked to
                #
                if restore:
                    cache = cache_future.result()
                    cache.restore(store_name=store_name, key=key, include_history=False)
                    result = True
        finally:
            # always release the store lock, even if creating or restoring the cache failed,
            # otherwise every other client would block on this store forever
            #
            store_lock.release()
        return result

    def set_kv(
        self,
        store_name: str,
        key: str,
        value: KVCacheValueType,
        set_update_flag: bool = True,
        persist: bool = True,
        keep_cache: bool = True,
        lock_cache: bool = True,
    ) -> ActorFuture:
        """
        method to add a key value pair to a store. If key already exists then will overwrite
        :param store_name: the store name
        :param key: the unique key within the store
        :param value: the data to save
        :param set_update_flag: If true then the update flag will be set
        :param persist: If true the changes will be persisted in database
        :param lock_cache: If True the cache will be locked before accessing. If False it assumes a lock has already been acquired
        :param keep_cache: If False then cache will be deleted after being persisted
        :return: ActorFuture with True if success else False, or None if the store's
                 distributed variable could not be read within the timeout
        """
        distributed_store_name = 'KVCache_{}'.format(store_name)
        try:
            dist_cache_var = Variable(distributed_store_name).get(timeout=0.1)
            cache = dist_cache_var.result()

            cache_lock = self.lock_key(store_name=store_name, key=key) if lock_cache else None
            if cache_lock is not None:
                cache_lock.acquire()
            try:
                result = cache.set_kv(store_name=store_name, key=key, value=value, set_update_flag=set_update_flag)
                if persist:
                    result = cache.persist(store_name=store_name, keep_cache=keep_cache)
            finally:
                # release on every exit path so a failing actor call cannot leak the key lock
                #
                if cache_lock is not None:
                    cache_lock.release()
        except TimeoutError:
            result = None
        return result

    def del_kv(
        self,
        store_name: str,
        key: Optional[str] = None,
        persist=True,
        lock_cache: bool = True,
    ) -> ActorFuture:
        """
        method to delete a key within a store or the whole store. If the store/key has been persisted then the delete will be propagated when persist() is called
        :param store_name: the store name
        :param key: the key to delete
        :param persist: If true the changes will be persisted in database
        :param lock_cache: If True the cache will be locked before accessing. If False it assumes a lock has already been acquired
        :return: ActorFuture with True if success else False, or None if the store's
                 distributed variable could not be read within the timeout
        """
        distributed_store_name = 'KVCache_{}'.format(store_name)
        try:
            dist_cache_var = Variable(distributed_store_name).get(timeout=0.1)
            cache = dist_cache_var.result()

            # NOTE(review): when key is None this locks the name 'KVCache:<store>:None'
            # rather than the store lock - confirm that is the intended behaviour
            #
            cache_lock = self.lock_key(store_name=store_name, key=key) if lock_cache else None
            if cache_lock is not None:
                cache_lock.acquire()
            try:
                result = cache.del_kv(store_name=store_name, key=key, delete_from_db=persist)
                if persist:
                    result = cache.persist(store_name=store_name)
            finally:
                # release on every exit path so a failing actor call cannot leak the key lock
                #
                if cache_lock is not None:
                    cache_lock.release()
        except TimeoutError:
            result = None
        return result

    def get_kv(self, store_name: str, key: Optional[str] = None, lock_cache: bool = True) -> ActorFuture:
        """
        method to retrieve a stored value
        :param store_name: the store name
        :param key: the key within the store
        :param lock_cache: If True the cache will be locked before accessing. If False it assumes a lock has already been acquired
        :return: ActorFuture with either KVCacheValueType or None if it doesnt exist. None is
                 also returned if the store's distributed variable could not be read within the timeout
        """
        distributed_store_name = 'KVCache_{}'.format(store_name)
        try:
            dist_cache_var = Variable(distributed_store_name).get(timeout=0.1)
            cache = dist_cache_var.result()

            cache_lock = self.lock_key(store_name=store_name, key=key) if lock_cache else None
            if cache_lock is not None:
                cache_lock.acquire()
            try:
                result = cache.get_kv(store_name=store_name, key=key)
            finally:
                # release on every exit path so a failing actor call cannot leak the key lock
                #
                if cache_lock is not None:
                    cache_lock.release()
        except TimeoutError:
            result = None
        return result

    def restore(self, store_name, include_history: bool = False, lock_cache: bool = True) -> bool:
        """
        method to restore a store_name and optionally specific key
        :param store_name: the store_name to restore
        :param include_history: If True restore all history
        :param lock_cache: If True the cache will be locked before accessing. If False it assumes a lock has already been acquired
        :return: ActorFuture with True if restored else False, or None if the store's
                 distributed variable could not be read within the timeout
        """
        distributed_store_name = 'KVCache_{}'.format(store_name)
        try:
            dist_cache_var = Variable(distributed_store_name).get(timeout=0.1)
            cache = dist_cache_var.result()

            cache_lock = self.lock_store(store_name=store_name) if lock_cache else None
            if cache_lock is not None:
                cache_lock.acquire()
            try:
                result = cache.restore(store_name=store_name, include_history=include_history)
            finally:
                # release on every exit path so a failing actor call cannot leak the store lock
                #
                if cache_lock is not None:
                    cache_lock.release()
        except TimeoutError:
            result = None
        # NOTE(review): the source excerpt is cut off after the except clause; returning
        # result is assumed by analogy with set_kv / del_kv / get_kv - confirm
        #
        return result
Source:cache.py  
from threading import Lock
import time

# Guards every mutation of encounter_cache.
cache_lock = Lock()
# Maps encounter_id -> encounter data (a mapping that carries an 'encountered_time' entry).
encounter_cache = {}


def get_cached_count():
    """Return the number of encounters currently held in the cache."""
    return len(encounter_cache)


def get_cached_encounter(encounter_id):
    """Return the cached data for *encounter_id*, or False if it is not cached."""
    return encounter_cache.get(encounter_id, False)


def cache_encounter(encounter_id, encounter_data):
    """Store *encounter_data* under *encounter_id*, replacing any existing entry."""
    with cache_lock:
        encounter_cache[encounter_id] = encounter_data


def cleanup_cache(max_age_seconds=60 * 60):
    """Remove every cached encounter older than *max_age_seconds* (default: 1 hour).

    Age is measured as ``time.time() - encounter['encountered_time']``.

    Returns the number of entries removed.
    """
    now = time.time()
    with cache_lock:
        # Collect the expired ids first: deleting from a dict while iterating over
        # it raises "RuntimeError: dictionary changed size during iteration".
        expired_ids = [
            encounter_id
            for encounter_id, encounter in encounter_cache.items()
            if now - encounter['encountered_time'] > max_age_seconds
        ]
        for encounter_id in expired_ids:
            del encounter_cache[encounter_id]
    num_deleted = len(expired_ids)
    # NOTE(review): the source excerpt is cut off after the lock is released; returning
    # the deletion count is assumed to be what num_deleted was computed for - confirm.
    return num_deleted
Source:ledger.py  
from tools.threads.readwritelock import ReadWriteLock


class Ledger:
    """Cache of the data pulled from a ledger source, refreshed via an update trigger.

    Reads and writes of the cached value are guarded by a ReadWriteLock.
    """

    def __init__(self, update_trigger, ledger_source):
        """
        :param update_trigger: object exposing subscribe(callback) and prime(); the
                               ledger subscribes its update() method to it
        :param ledger_source: object exposing pull(), which returns the data to cache
        """
        self.ledger_source = ledger_source
        self.update_trigger = update_trigger
        self.cache = {}

        # Create the lock BEFORE subscribing: update() needs it, and the trigger may
        # invoke the callback as soon as it is subscribed or primed.
        self.cache_lock = ReadWriteLock()

        self.update_trigger.subscribe(self.update)
        self.update_trigger.prime()

    def update(self):
        """Pull fresh data from the ledger source and replace the cached value."""
        # Pull outside the lock so readers are not blocked while the source is queried.
        data = self.ledger_source.pull()

        self.cache_lock.acquire_write()
        try:
            self.cache = data
        finally:
            self.cache_lock.release_write()

    def get(self):
        """Return the currently cached data."""
        self.cache_lock.acquire_read()
        try:
            data = self.cache
        finally:
            self.cache_lock.release_read()
        # NOTE(review): the source excerpt is cut off after the read lock is released;
        # returning the value read under the lock is assumed - confirm.
        return data
...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
