Best Python code snippet using hypothesis
socket_cache_test.py
Source:socket_cache_test.py  
1# Copyright (C) 2011  Internet Systems Consortium, Inc. ("ISC")2#3# Permission to use, copy, modify, and distribute this software for any4# purpose with or without fee is hereby granted, provided that the above5# copyright notice and this permission notice appear in all copies.6#7# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM8# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL9# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL10# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,11# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING12# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,13# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION14# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.15import unittest16import isc.log17import isc.bind10.socket_cache18import isc.bind10.sockcreator19from isc.net.addr import IPAddr20import os21class Test(unittest.TestCase):22    """23    Base for the tests here. It replaces the os.close method.24    """25    def setUp(self):26        self._closes = []27        isc.bind10.socket_cache.os.close = self.__close28    def tearDown(self):29        # This is not very clean solution. But when the test stops30        # to exist, the method must not be used to destroy the31        # object any more. And we can't restore the os.close here32        # as we never work with real sockets here.33        isc.bind10.socket_cache.os.close = lambda fd: None34    def __close(self, fd):35        """36        Just log a close was called.37        """38        self._closes.append(fd)39class SocketTest(Test):40    """41    Test for the Socket class.42    """43    def setUp(self):44        """45        Creates the socket to be tested.46        It also creates other useful test variables.47        """48        Test.setUp(self)49        self.__address = IPAddr("192.0.2.1")50        self.__socket = isc.bind10.socket_cache.Socket('UDP', self.__address,51                                                       1024, 42)52    def test_init(self):53        """54        Checks the intrnals of the cache just after the creation.55        """56        self.assertEqual('UDP', self.__socket.protocol)57        self.assertEqual(self.__address, self.__socket.address)58        self.assertEqual(1024, self.__socket.port)59        self.assertEqual(42, self.__socket.fileno)60        self.assertEqual({}, self.__socket.active_tokens)61        self.assertEqual({}, self.__socket.shares)62        self.assertEqual(set(), self.__socket.waiting_tokens)63    def test_del(self):64        """65        Check it closes the socket when removed.66        """67        # This should make the refcount 0 and call the descructor68        # right away69        self.__socket = None70        self.assertEqual([42], self._closes)71    def test_share_modes(self):72        """73        Test the share mode compatibility check function.74        """75        modes = ['NO', 'SAMEAPP', 'ANY']76        # If there are no shares, it is compatible with everything.77        for mode in modes:78            self.assertTrue(self.__socket.share_compatible(mode, 'anything'))79        # There's an NO already, so it is incompatible with everything.80        self.__socket.shares = {'token': ('NO', 'anything')}81        for mode in modes:82            self.assertFalse(self.__socket.share_compatible(mode, 'anything'))83        # If there's SAMEAPP, it is compatible with ANY and SAMEAPP with the84        # same name.85        self.__socket.shares = {'token': ('SAMEAPP', 'app')}86        self.assertFalse(self.__socket.share_compatible('NO', 'app'))87        self.assertFalse(self.__socket.share_compatible('SAMEAPP',88                                                        'something'))89        self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))90        self.assertTrue(self.__socket.share_compatible('ANY', 'app'))91        self.assertFalse(self.__socket.share_compatible('ANY', 'something'))92        # If there's ANY, then ANY and SAMEAPP with the same name is compatible93        self.__socket.shares = {'token': ('ANY', 'app')}94        self.assertFalse(self.__socket.share_compatible('NO', 'app'))95        self.assertFalse(self.__socket.share_compatible('SAMEAPP',96                                                        'something'))97        self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))98        self.assertTrue(self.__socket.share_compatible('ANY', 'something'))99        # In case there are multiple already inside100        self.__socket.shares = {101            'token': ('ANY', 'app'),102            'another': ('SAMEAPP', 'app')103        }104        self.assertFalse(self.__socket.share_compatible('NO', 'app'))105        self.assertFalse(self.__socket.share_compatible('SAMEAPP',106                                                        'something'))107        self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))108        self.assertFalse(self.__socket.share_compatible('ANY', 'something'))109        self.assertTrue(self.__socket.share_compatible('ANY', 'app'))110        # Invalid inputs are rejected111        self.assertRaises(ValueError, self.__socket.share_compatible, 'bad',112                          'bad')113class SocketCacheTest(Test):114    """115    Some tests for the isc.bind10.socket_cache.Cache.116    This class, as well as being the testcase, pretends to be the117    socket creator so it can hijack all the requests for sockets.118    """119    def setUp(self):120        """121        Creates the cache for tests with us being the socket creator.122        Also creates some more variables for testing.123        """124        Test.setUp(self)125        self.__cache = isc.bind10.socket_cache.Cache(self)126        self.__address = IPAddr("192.0.2.1")127        self.__socket = isc.bind10.socket_cache.Socket('UDP', self.__address,128                                                       1024, 42)129        self.__get_socket_called = False130    def test_init(self):131        """132        Checks the internals of the cache just after the creation.133        """134        self.assertEqual(self, self.__cache._creator)135        self.assertEqual({}, self.__cache._waiting_tokens)136        self.assertEqual({}, self.__cache._active_tokens)137        self.assertEqual({}, self.__cache._active_apps)138        self.assertEqual({}, self.__cache._sockets)139        self.assertEqual(set(), self.__cache._live_tokens)140    def get_socket(self, address, port, socktype):141        """142        Pretend to be a socket creator.143        This expects to be called with the _address, port 1024 and 'UDP'.144        Returns 42 and notes down it was called.145        """146        self.assertEqual(self.__address, address)147        self.assertEqual(1024, port)148        self.assertEqual('UDP', socktype)149        self.__get_socket_called = True150        return 42151    def test_get_token_cached(self):152        """153        Check the behaviour of get_token when the requested socket is already154        cached inside.155        """156        self.__cache._sockets = {157            'UDP': {'192.0.2.1': {1024: self.__socket}}158        }159        token = self.__cache.get_token('UDP', self.__address, 1024, 'ANY',160                                       'test')161        # It didn't call get_socket162        self.assertFalse(self.__get_socket_called)163        # It returned something164        self.assertIsNotNone(token)165        # The token is both in the waiting sockets and the live tokens166        self.assertEqual({token: self.__socket}, self.__cache._waiting_tokens)167        self.assertEqual(set([token]), self.__cache._live_tokens)168        # The token got the new share to block any relevant queries169        self.assertEqual({token: ('ANY', 'test')}, self.__socket.shares)170        # The socket knows the token is waiting in it171        self.assertEqual(set([token]), self.__socket.waiting_tokens)172        # If we request one more, with incompatible share, it is rejected173        self.assertRaises(isc.bind10.socket_cache.ShareError,174                          self.__cache.get_token, 'UDP', self.__address, 1024,175                          'NO', 'test')176        # The internals are not changed, so the same checks177        self.assertEqual({token: self.__socket}, self.__cache._waiting_tokens)178        self.assertEqual(set([token]), self.__cache._live_tokens)179        self.assertEqual({token: ('ANY', 'test')}, self.__socket.shares)180        self.assertEqual(set([token]), self.__socket.waiting_tokens)181    def test_get_token_uncached(self):182        """183        Check a new socket is created when a corresponding one is missing.184        """185        token = self.__cache.get_token('UDP', self.__address, 1024, 'ANY',186                                       'test')187        # The get_socket was called188        self.assertTrue(self.__get_socket_called)189        # It returned something190        self.assertIsNotNone(token)191        # Get the socket and check it looks OK192        socket = self.__cache._waiting_tokens[token]193        self.assertEqual(self.__address, socket.address)194        self.assertEqual(1024, socket.port)195        self.assertEqual(42, socket.fileno)196        self.assertEqual('UDP', socket.protocol)197        # The socket is properly cached198        self.assertEqual({199            'UDP': {'192.0.2.1': {1024: socket}}200        }, self.__cache._sockets)201        # The token is both in the waiting sockets and the live tokens202        self.assertEqual({token: socket}, self.__cache._waiting_tokens)203        self.assertEqual(set([token]), self.__cache._live_tokens)204        # The token got the new share to block any relevant queries205        self.assertEqual({token: ('ANY', 'test')}, socket.shares)206        # The socket knows the token is waiting in it207        self.assertEqual(set([token]), socket.waiting_tokens)208    def test_get_token_excs(self):209        """210        Test that it is handled properly if the socket creator raises211        some exceptions.212        """213        def raiseCreatorError(fatal):214            raise isc.bind10.sockcreator.CreatorError('test error', fatal)215        # First, fatal socket creator errors are passed through216        self.get_socket = lambda addr, port, proto: raiseCreatorError(True)217        self.assertRaises(isc.bind10.sockcreator.CreatorError,218                          self.__cache.get_token, 'UDP', self.__address, 1024,219                          'NO', 'test')220        # And nonfatal are converted to SocketError221        self.get_socket = lambda addr, port, proto: raiseCreatorError(False)222        self.assertRaises(isc.bind10.socket_cache.SocketError,223                          self.__cache.get_token, 'UDP', self.__address, 1024,224                          'NO', 'test')225    def test_get_socket(self):226        """227        Test that we can pickup a socket if we know a token.228        """229        token = "token"230        app = 13231        # No socket prepared there232        self.assertRaises(ValueError, self.__cache.get_socket, token, app)233        # Not changed234        self.assertEqual({}, self.__cache._active_tokens)235        self.assertEqual({}, self.__cache._active_apps)236        self.assertEqual({}, self.__cache._sockets)237        self.assertEqual(set(), self.__cache._live_tokens)238        # Prepare a token there239        self.__socket.waiting_tokens = set([token])240        self.__socket.shares = {token: ('ANY', 'app')}241        self.__cache._waiting_tokens = {token: self.__socket}242        self.__cache._sockets = {'UDP': {'192.0.2.1': {1024: self.__socket}}}243        self.__cache._live_tokens = set([token])244        socket = self.__cache.get_socket(token, app)245        # Received the fileno246        self.assertEqual(42, socket)247        # It moved from waiting to active ones248        self.assertEqual({}, self.__cache._waiting_tokens)249        self.assertEqual({token: self.__socket}, self.__cache._active_tokens)250        self.assertEqual({13: set([token])}, self.__cache._active_apps)251        self.assertEqual(set([token]), self.__cache._live_tokens)252        self.assertEqual(set(), self.__socket.waiting_tokens)253        self.assertEqual({token: 13}, self.__socket.active_tokens)254        # Trying to get it again fails255        self.assertRaises(ValueError, self.__cache.get_socket, token, app)256    def test_drop_application(self):257        """258        Test that a drop_application calls drop_socket on all the sockets259        held by the application.260        """261        sockets = set()262        def drop_socket(token):263            sockets.add(token)264        # Mock the drop_socket so we know it is called265        self.__cache.drop_socket = drop_socket266        self.assertRaises(ValueError, self.__cache.drop_application,267                          13)268        self.assertEqual(set(), sockets)269        # Put the tokens into active_apps. Nothing else should be touched270        # by this call, so leave it alone.271        self.__cache._active_apps = {272            1: set(['t1', 't2']),273            2: set(['t3'])274        }275        self.__cache.drop_application(1)276        # We don't check the _active_apps, as it would be cleaned by277        # drop_socket and we removed it.278        self.assertEqual(set(['t1', 't2']), sockets)279    def test_drop_socket(self):280        """281        Test the drop_socket call. It tests:282        * That a socket that still has something to keep it alive is left alive283          (both waiting and active).284        * If not, it is deleted.285        * All bookkeeping data around are properly removed.286        * Of course the exception.287        """288        self.assertRaises(ValueError, self.__cache.drop_socket, "bad token")289        self.__socket.active_tokens = {'t1': 1}290        self.__socket.waiting_tokens = set(['t2'])291        self.__socket.shares = {'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')}292        self.__cache._waiting_tokens = {'t2': self.__socket}293        self.__cache._active_tokens = {'t1': self.__socket}294        self.__cache._sockets = {'UDP': {'192.0.2.1': {1024: self.__socket}}}295        self.__cache._live_tokens = set(['t1', 't2'])296        self.__cache._active_apps = {1: set(['t1'])}297        # We can't drop what wasn't picket up yet298        self.assertRaises(ValueError, self.__cache.drop_socket, 't2')299        self.assertEqual({'t1': 1}, self.__socket.active_tokens)300        self.assertEqual(set(['t2']), self.__socket.waiting_tokens)301        self.assertEqual({'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')},302                         self.__socket.shares)303        self.assertEqual({'t2': self.__socket}, self.__cache._waiting_tokens)304        self.assertEqual({'t1': self.__socket}, self.__cache._active_tokens)305        self.assertEqual({'UDP': {'192.0.2.1': {1024: self.__socket}}},306                         self.__cache._sockets)307        self.assertEqual(set(['t1', 't2']), self.__cache._live_tokens)308        self.assertEqual({1: set(['t1'])}, self.__cache._active_apps)309        self.assertEqual([], self._closes)310        # If we drop this, it survives because it waits for being picked up311        self.__cache.drop_socket('t1')312        self.assertEqual({}, self.__socket.active_tokens)313        self.assertEqual(set(['t2']), self.__socket.waiting_tokens)314        self.assertEqual({'t2': ('ANY', 'app2')}, self.__socket.shares)315        self.assertEqual({}, self.__cache._active_tokens)316        self.assertEqual({'UDP': {'192.0.2.1': {1024: self.__socket}}},317                         self.__cache._sockets)318        self.assertEqual(set(['t2']), self.__cache._live_tokens)319        self.assertEqual({}, self.__cache._active_apps)320        self.assertEqual([], self._closes)321        # Fill it again, now two applications having the same socket322        self.__socket.active_tokens = {'t1': 1, 't2': 2}323        self.__socket.waiting_tokens = set()324        self.__socket.shares = {'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')}325        self.__cache._waiting_tokens = {}326        self.__cache._active_tokens = {327            't1': self.__socket,328            't2': self.__socket329        }330        self.__cache._live_tokens = set(['t1', 't2', 't3'])331        self.assertEqual([], self._closes)332        # We cheat here little bit, the t3 doesn't exist enywhere else, but333        # we need to check the app isn't removed too soon and it shouldn't334        # matter anywhere else, so we just avoid the tiresome filling in335        self.__cache._active_apps = {1: set(['t1', 't3']), 2: set(['t2'])}336        # Drop it as t1. It should still live.337        self.__cache.drop_socket('t1')338        self.assertEqual({'t2': 2}, self.__socket.active_tokens)339        self.assertEqual(set(), self.__socket.waiting_tokens)340        self.assertEqual({'t2': ('ANY', 'app2')}, self.__socket.shares)341        self.assertEqual({}, self.__cache._waiting_tokens)342        self.assertEqual({'t2': self.__socket}, self.__cache._active_tokens)343        self.assertEqual({'UDP': {'192.0.2.1': {1024: self.__socket}}},344                         self.__cache._sockets)345        self.assertEqual(set(['t3', 't2']), self.__cache._live_tokens)346        self.assertEqual({1: set(['t3']), 2: set(['t2'])},347                         self.__cache._active_apps)348        self.assertEqual([], self._closes)349        # Drop it again, from the other application. It should get removed350        # and closed.351        self.__cache.drop_socket('t2')352        self.assertEqual({}, self.__socket.active_tokens)353        self.assertEqual(set(), self.__socket.waiting_tokens)354        self.assertEqual({}, self.__socket.shares)355        self.assertEqual({}, self.__cache._waiting_tokens)356        self.assertEqual({}, self.__cache._active_tokens)357        self.assertEqual({}, self.__cache._sockets)358        self.assertEqual(set(['t3']), self.__cache._live_tokens)359        self.assertEqual({1: set(['t3'])}, self.__cache._active_apps)360        # The cache doesn't hold the socket. So when we remove it ourself,361        # it should get closed.362        self.__socket = None363        self.assertEqual([42], self._closes)364if __name__ == '__main__':365    isc.log.init("bind10")366    isc.log.resetUnitTestRootLogger()...DossierRequester.py
Source:DossierRequester.py  
1# Python bytecode 2.7 (decompiled from Python 2.7)2# Embedded file name: scripts/client/gui/shared/utils/requesters/DossierRequester.py3import time4from functools import partial5import BigWorld6import constants7import dossiers28import AccountCommands9from adisp import async10from debug_utils import LOG_ERROR11from gui.shared.utils import code2str12from gui.shared.utils.requesters.abstract import AbstractSyncDataRequester13from gui.shared.utils.requesters.common import RequestProcessor14from skeletons.gui.shared.utils.requesters import IDossierRequester15class UserDossier(object):16    __queue = []17    __lastResponseTime = 018    __request = None19    def __init__(self, databaseID):20        self.__cache = {'databaseID': int(databaseID),21         'account': None,22         'vehicles': {},23         'clan': None,24         'hidden': False,25         'available': True,26         'rating': None,27         'rated7x7Seasons': {},28         'ranked': None}29        return30    def __setLastResponseTime(self):31        self.__lastResponseTime = time.time()32    def __nextRequestTime(self):33        t = constants.REQUEST_COOLDOWN.PLAYER_DOSSIER - (time.time() - self.__lastResponseTime)34        return t if t > 0 else 035    def __processQueue(self):36        if self.__request is not None:37            return38        elif self.__queue:39            self.__request = RequestProcessor(self.__nextRequestTime(), self.__queue.pop())40            return41        else:42            return43    def __requestPlayerInfo(self, callback):44        def proxyCallback(value):45            if value is not None and len(value) > 1:46                self.__cache['databaseID'] = value[0]47                self.__cache['account'] = dossiers2.getAccountDossierDescr(value[1])48                self.__cache['clan'] = value[2]49                self.__cache['rating'] = value[3]50                self.__cache['rated7x7Seasons'] = seasons = {}51                self.__cache['ranked'] = value[5]52                self.__cache['dogTag'] = value[6]53                self.__cache['battleRoyaleStats'] = value[7]54                for sID, d in (value[4] or {}).iteritems():55                    seasons[sID] = dossiers2.getRated7x7DossierDescr(d)56            callback(self.__cache['account'])57            return58        self.__queue.append(lambda : BigWorld.player().requestPlayerInfo(self.__cache['databaseID'], partial(lambda c, code, databaseID, dossier, clanID, clanInfo, gRating, eSportSeasons, ranked, dogTag, br: self.__processValueResponse(c, code, (databaseID,59         dossier,60         (clanID, clanInfo),61         gRating,62         eSportSeasons,63         ranked,64         dogTag,65         br)), proxyCallback)))66        self.__processQueue()67    def __requestAccountDossier(self, callback):68        def proxyCallback(dossier):69            self.__cache['account'] = dossiers2.getAccountDossierDescr(dossier)70            callback(self.__cache['account'])71        self.__queue.append(lambda : BigWorld.player().requestAccountDossier(self.__cache['databaseID'], partial(self.__processValueResponse, proxyCallback)))72        self.__processQueue()73    def __requestVehicleDossier(self, vehCompDescr, callback):74        def proxyCallback(dossier):75            self.__cache['vehicles'][vehCompDescr] = dossiers2.getVehicleDossierDescr(dossier)76            callback(self.__cache['vehicles'][vehCompDescr])77        self.__queue.append(lambda : BigWorld.player().requestVehicleDossier(self.__cache['databaseID'], vehCompDescr, partial(self.__processValueResponse, proxyCallback)))78        self.__processQueue()79    def __requestClanInfo(self, callback):80        self.__queue.append(lambda : BigWorld.player().requestPlayerClanInfo(self.__cache['databaseID'], partial(lambda c, code, str, clanDBID, clanInfo: self.__processValueResponse(c, code, (clanDBID, clanInfo)), callback)))81        self.__processQueue()82    def __processValueResponse(self, callback, code, value):83        self.__setLastResponseTime()84        self.__request = None85        if code < 0:86            LOG_ERROR('Error while server request (code=%s): %s' % (code, code2str(code)))87            if code == AccountCommands.RES_HIDDEN_DOSSIER:88                self.__cache['hidden'] = True89            elif code == AccountCommands.RES_CENTER_DISCONNECTED:90                self.__cache['available'] = False91            callback('')92        else:93            callback(value)94        self.__processQueue()95        return96    @async97    def getAccountDossier(self, callback):98        if not self.isValid:99            callback(None)100        if self.__cache.get('account') is None:101            self.__requestPlayerInfo(callback)102            return103        else:104            callback(self.__cache['account'])105            return106    @async107    def getClanInfo(self, callback):108        if not self.isValid:109            callback(None)110        if self.__cache.get('clan') is None:111            self.__requestClanInfo(callback)112            return113        else:114            callback(self.__cache['clan'])115            return116    @async117    def getRated7x7Seasons(self, callback):118        if not self.isValid:119            callback({})120        if self.__cache.get('rated7x7Seasons') is None:121            self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['rated7x7Seasons']))122            return123        else:124            callback(self.__cache['rated7x7Seasons'])125            return126    @async127    def getRankedInfo(self, callback):128        if not self.isValid:129            callback({})130        if self.__cache.get('ranked') is None:131            self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['ranked']))132            return133        else:134            callback(self.__cache['ranked'])135            return136    @async137    def getGlobalRating(self, callback):138        if not self.isValid:139            callback(None)140        if self.__cache.get('rating') is None:141            self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['rating']))142            return143        else:144            callback(self.__cache['rating'])145            return146    @async147    def getVehicleDossier(self, vehCompDescr, callback):148        if not self.isValid:149            callback(None)150        if self.__cache.get('vehicles', {}).get(vehCompDescr, None) is None:151            self.__requestVehicleDossier(vehCompDescr, callback)152            return153        else:154            callback(self.__cache['vehicles'][vehCompDescr])155            return156    @async157    def getDogTag(self, callback):158        if not self.isValid:159            callback(None)160        if self.__cache.get('dogTag') is None:161            self.__requestPlayerInfo(callback)162            return163        else:164            callback(self.__cache['dogTag'])165            return166    @async167    def getBattleRoyaleStats(self, callback):168        if not self.isValid:169            callback({})170        if self.__cache.get('battleRoyaleStats') is None:171            self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['battleRoyaleStats']))172            return173        else:174            callback(self.__cache['battleRoyaleStats'])175            return176    @property177    def isHidden(self):178        return self.__cache.get('hidden', False)179    @property180    def isAvailable(self):181        return self.__cache.get('available', False)182    @property183    def isValid(self):184        return not self.isHidden and self.isAvailable185class DossierRequester(AbstractSyncDataRequester, IDossierRequester):186    def __init__(self):187        super(DossierRequester, self).__init__()188        self.__users = {}189    @async190    def _requestCache(self, callback):191        BigWorld.player().dossierCache.getCache(lambda resID, value: self._response(resID, value, callback))192    def getVehicleDossier(self, vehTypeCompDescr):193        return self.getCacheValue((constants.DOSSIER_TYPE.VEHICLE, vehTypeCompDescr), (0, ''))[1]194    def getVehDossiersIterator(self):195        for (dossierType, vehIntCD), records in self._data.iteritems():196            if dossierType == constants.DOSSIER_TYPE.VEHICLE:197                yield (vehIntCD, records[1])198    def getUserDossierRequester(self, databaseID):199        databaseID = int(databaseID)200        return self.__users.setdefault(databaseID, UserDossier(databaseID))201    def closeUserDossier(self, databaseID):202        if databaseID in self.__users:203            del self.__users[databaseID]204    def onCenterIsLongDisconnected(self, isLongDisconnected):205        if isLongDisconnected:206            return...problem_1_test.py
Source:problem_1_test.py  
1import unittest2from Project_2.problem_1.problem_1 import LRUCache3class LRUCacheTest(unittest.TestCase):4    def setUp(self):5        self.__cache = LRUCache()6        self.__tmp = self.__cache.get_dll()7    def test_set_value_cache(self):8        self.assertTrue(self.__cache.set(1, "Yadira"))9        # Set on top. Sets "Edgar" on top10        self.assertTrue(self.__cache.set(9, "Edgar"))11        self.assertEqual(self.__tmp.to_list(), ["Edgar", "Yadira"])12        # Prints cache["Edgar", "Yadira"]13        print(self.__tmp.to_list())14    def test_set_value_existing_key(self):15        self.__cache.set(2, 2)16        self.__cache.set(1, 1)17        self.__cache.set(3, 3)18        self.__cache.set(1, 10)19        print(self.__cache.get(1))20        # should return 1021        print(self.__cache.get(2))22        # should return 223        self.__cache.set(1, "Carmen")24        print(self.__cache.get(1))25    def test_max_capacity_cache(self):26        self.__cache.set(1, 1)27        self.__cache.set(2, 2)28        self.__cache.set(3, 3)29        self.__cache.set(4, 4)30        print(self.__cache.get(1))31        # returns 132        print(self.__cache.get(2))33        # returns 234        print(self.__cache.get(9))35        # returns -1 because 9 is not present in the cache36        self.__cache.set(5, 5)37        # Max capacity. Remove LRU to add 638        self.__cache.set(6, 6)39        # returns 640        print(self.__cache.get(6))41        # prints capacity 542        print(self.__cache.number_elements)43        # should return -144        print(self.__cache.get(3))45    def test_set_none(self):46        # Values as None, asserts False47        self.assertFalse(self.__cache.set(None, None))48        # return False for None values49        print(self.__cache.set(None, None))50        self.assertFalse(self.__cache.set(0, None))51        # return False, if at least one value is None52        print(self.__cache.set(0, None))53        self.assertFalse(self.__cache.set(None, 0))54        # return False, if at least one value is None55        print(self.__cache.set(None, 0))56    def test_set_zero_capacity(self):57        new_cache = LRUCache(0)58        self.assertEqual(new_cache.set(1, 1), "Can't perform operations on 0 capacity cache")59        print(new_cache.set(1, 1))60    def test_get_existing_value(self):61        self.__cache.set(1, "Yadira")62        self.__cache.set(9, "Edgar")63        self.assertEqual(self.__cache.get(1), "Yadira")64        # prints Yadira65        print(self.__cache.get(1))66        # Setting existing value to top67        self.__cache.get(1)68        self.assertEqual(self.__tmp.to_list(), ["Yadira", "Edgar"])69        # Existing value "Yadira" on top of cache. Prints ["Yadira", "Edgar"]70        print(self.__cache.get_dll().to_list())71    def test_get_non_existing_value(self):72        self.__cache.set(1, 1)73        self.__cache.set(2, 2)74        self.assertEqual(self.__cache.get(10), -1)75        # Prints -1 for a non existing value76        print(self.__cache.get(10))77        print(self.__cache.get(20))78    def test_get_none(self):79        # Key is None80        self.assertFalse(self.__cache.get(None))81        # Print False when key is None82        print(self.__cache.get(None))83    def test_get_value_zero_capacity_cache(self):84        new_cache = LRUCache(0)85        new_cache.set(1, 1)86        # prints -1, no key will be found on 0 capacity cache87        self.assertEqual(new_cache.get(1), -1)88        print(new_cache.get(1))89if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
