Best Python code snippet using autotest_python
test_caching.py
Source:test_caching.py  
...28        self.da = DA.DA('da','title','conn_id','arg1 arg2','some sql')29        # set the da's caching parameters30        self.da.cache_time_ = 1031        self.da.max_cache_ = 232    def _do_query(self,query,t):33        try:34            self.DA.time = DummyTime(t)35            result = self.da._cached_result(DummyDB(),query,1,'conn_id')36        finally:37            self.DA.time = time38        self.assertEqual(result,'result for '+query)39    def _check_mapping(self,expected,actual):40        missing = []41        extra = []42        different = []43        for key,value in expected.items():44            try:45                ai = actual[key]46            except KeyError:47                missing.append(key)48            else:49                if ai!=value:50                    different.append('%r: %r != %r' % (key,value,ai))51        for key in actual.keys():52            try:53                expected[key]54            except KeyError:55                extra.append(key)56        result = []57        if different:58            different.sort()59            result.append("Mismatching, key: (expected != actual):")60            for r in different:61                result.append(r)62        if missing:63            missing.sort()64            result.append("The following keys were missing from actual:")65            for r in missing:66                result.append(repr(r))67        if extra:68            extra.sort()69            result.append("The following extra keys were found in actual:")70            for r in extra:71                result.append(repr(r))72        return result73        74    def _check_cache(self,cache,tcache):75        if self.echo:76            print "cache:"77            pprint(self.da._v_cache[0])78            print "tcache:"79            pprint(self.da._v_cache[1])80            print81        result = []82        r = self._check_mapping(cache,self.da._v_cache[0])83        if r:84            result.append("cache didn't match 
expected:")85            result.extend(r)86        r = self._check_mapping(tcache,self.da._v_cache[1])87        if r:88            result.append("tcache didn't match expected:")89            result.extend(r)90        if result:91            self.fail('\n\n'+'\n'.join(result))92        93    def test_bad_aquisition(self):94        # checks that something called _v_cache isn't acquired from anywhere95        from ExtensionClass import Base96        class Dummy(Base):97            _v_cache = 'muhahaha'98        obj = Dummy()99        self.da = self.da.__of__(obj)100        del self.da._v_cache101        self._do_query('query',1)102        103    def test_same_query_different_seconds(self):104        # this tests a sequence of requests for the same105        # query, but where the item returned is always in the cache106        self._check_cache({},{})107        for t in range(1,6):108            self._do_query('query',t)109            self._check_cache(110                {('query',1,'conn_id'): (1,'result for query')},111                {1: ('query',1,'conn_id')}112                )113    def test_same_query_same_second(self):114        # this tests a sequence set of requests for the same115        # query, but where the item returned is always in the cache116        # and where the queries all occur in the same second117        self._check_cache({},{})118        for t in range(11,16,1):119            t = float(t)/10120            self._do_query('query',t)121            self._check_cache(122                {('query',1,'conn_id'): (1.1,'result for query')},123                {1.1: ('query',1,'conn_id')}124                )125    def test_different_queries_different_second(self):126        # This tests different queries being fired into the cache127        # in sufficient volume to excercise the purging code128        self._check_cache({},{})129        # one130        self._do_query('query1',1.1)131        self._check_cache(132            {('query1',1,'conn_id'): 
(1.1,'result for query1')},133            {1.1: ('query1',1,'conn_id')}134            )135        # two136        self._do_query( 'query2',3.2)137        self._check_cache(138            {('query1',1,'conn_id'): (1.1,'result for query1'),139             ('query2',1,'conn_id'): (3.2,'result for query2'),},140            {1.1: ('query1',1,'conn_id'),141             3.2: ('query2',1,'conn_id'),}142            )143        # three - now we drop our first cache entry144        self._do_query('query3',4.3)145        self._check_cache(146            {('query2',1,'conn_id'): (3.2,'result for query2'),147             ('query3',1,'conn_id'): (4.3,'result for query3'),},148            {3.2: ('query2',1,'conn_id'),149             4.3: ('query3',1,'conn_id'),}150            )151        # four - now we drop our second cache entry152        self._do_query('query4',8.4)153        self._check_cache(154            {('query3',1,'conn_id'): (4.3,'result for query3'),155             ('query4',1,'conn_id'): (8.4,'result for query4'),},156            {4.3: ('query3',1,'conn_id'),157             8.4: ('query4',1,'conn_id'),}158            )159        160    def test_different_queries_same_second(self):161        # This tests different queries being fired into the cache162        # in the same second in sufficient quantities to exercise163        # the purging code164        self._check_cache({},{})165        # one166        self._do_query('query1',1.0)167        self._check_cache(168            {('query1',1,'conn_id'): (1.0,'result for query1')},169            {1.0: ('query1',1,'conn_id')}170            )171        # two172        self._do_query( 'query2',1.1)173        self._check_cache(174            {('query1',1,'conn_id'): (1.0,'result for query1'),175             ('query2',1,'conn_id'): (1.1,'result for query2'),},176            {1.0: ('query1',1,'conn_id'),177             1.1: ('query2',1,'conn_id'),}178            )179        # three - now we drop our first cache entry180        
self._do_query('query3',1.2)181        self._check_cache(182            {('query2',1,'conn_id'): (1.1,'result for query2'),183             ('query3',1,'conn_id'): (1.2,'result for query3'),},184            {1.1: ('query2',1,'conn_id'),185             1.2: ('query3',1,'conn_id'),}186            )187        # four - now we drop another cache entry188        self._do_query('query4',1.3)189        self._check_cache(190            {('query3',1,'conn_id'): (1.2,'result for query3'),191             ('query4',1,'conn_id'): (1.3,'result for query4'),},192            {1.2: ('query3',1,'conn_id'),193             1.3: ('query4',1,'conn_id'),}194            )195    def test_time_tcache_expires(self):196        # This tests that once the cache purging code is triggered,197        # it will actively hoover out all expired cache entries198        199        # the first query gets cached200        self._do_query('query1',1)201        self._check_cache(202            {('query1',1,'conn_id'): (1,'result for query1')},203            {1: ('query1',1,'conn_id')}204            )205        # the 2nd gets cached, the cache is still smaller than max_cache206        self._do_query('query2',2)207        self._check_cache(208            {('query1',1,'conn_id'): (1,'result for query1'),209             ('query2',1,'conn_id'): (2,'result for query2')},210            {1: ('query1',1,'conn_id'),211             2:('query2',1,'conn_id')}212            )213        # the 3rd trips the max_cache trigger, so both our old queries get214        # dumped because they are past their expiration time215        self._do_query('query',23)216        self._check_cache(217            {('query',1,'conn_id'): (23,'result for query')},218            {23:('query',1,'conn_id')}219            )220    221    def test_time_refreshed_cache(self):222        # This tests that when a cached query is expired when it comes223        # to check for a cached entry for that query, the stale entry is224        # removed and replaced 
with a fresh entry.225        226        # the first query gets cached227        self._do_query('query1',1)228        self._check_cache(229            {('query1',1,'conn_id'): (1,'result for query1')},230            {1: ('query1',1,'conn_id')}231            )232        # do the same query much later, so new one gets cached233        self._do_query('query1',12)234        self._check_cache(235            {('query1',1,'conn_id'): (12,'result for query1')},236            {12: ('query1',1,'conn_id')}237            )238class DummyDA:239    def __call__(self):240        conn = DummyDB()241        conn.result = ((),())242        return conn243    sql_quote__ = "I don't know what this is."244class Hook:245    conn_num = 1246    247    def __call__(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
