Best Python code snippet using selenium-respectful_python
test_respectful_requester.py
Source:test_respectful_requester.py  
# ... (excerpt of test_respectful_requester.py; it begins mid-file. The line
# below is the tail of a test whose `def` lies before this excerpt and is
# preserved verbatim.)
#     assert rr._realm_redis_key("TEST ÃÃÃ") == "%s:REALMS:TEST ÃÃÃ" % rr.redis_prefix


def test_the_instance_should_be_able_to_register_a_realm():
    """Registering a realm stores its limits and adds it to the REALMS set."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300
    assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")
    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_register_multiple_realms():
    """`register_realms` accepts a list of [name, max_requests, timespan] entries."""
    rr = RespectfulRequester()
    # TEST123 is registered up front and again through the batch below.
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    realm_tuples = [
        ["TEST123", 100, 300],
        ["TEST234", 200, 600],
        ["TEST345", 300, 900],
    ]
    rr.register_realms(realm_tuples)
    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300
    assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")
    assert rr.realm_max_requests("TEST234") == 200
    assert rr.realm_timespan("TEST234") == 600
    assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST234")
    assert rr.realm_max_requests("TEST345") == 300
    assert rr.realm_timespan("TEST345") == 900
    assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST345")
    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST234")
    rr.unregister_realm("TEST345")


def test_the_instance_should_not_overwrite_when_registering_a_realm():
    """Registering an existing realm again keeps the original limits."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    rr.register_realm("TEST123", max_requests=1000, timespan=3000)
    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300
    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_update_a_registered_realm():
    """`update_realm` replaces the limits of an existing realm."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    rr.update_realm("TEST123", max_requests=1000, timespan=3000)
    assert rr.realm_max_requests("TEST123") == 1000
    assert rr.realm_timespan("TEST123") == 3000
    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_fetch_a_list_of_registered_realms():
    """`fetch_registered_realms` includes a realm that was just registered."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert "TEST123" in rr.fetch_registered_realms()
    rr.unregister_realm("TEST123")


def test_the_instance_should_ignore_invalid_values_when_updating_a_realm():
    """Non-numeric values and unknown kwargs leave the stored limits untouched."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    rr.update_realm("TEST123", max_requests="FOO", timespan="BAR", fake=True)
    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300
    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_unregister_a_realm():
    """Unregistering removes the realm key, its set entry and its request keys."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    # NOTE(review): kept as a one-line lambda on purpose -- `_validate_request_func`
    # is exercised with lambdas below and presumably inspects the lambda itself;
    # confirm before refactoring this into a `def`.
    request_func = lambda: requests.get("http://google.com")
    rr._perform_request(request_func, realms=["TEST123"])
    rr.unregister_realm("TEST123")
    assert rr.redis.get(rr._realm_redis_key("TEST123")) is None
    assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")
    # Request keys are written as "<prefix>:REQUEST:<realm>:<uuid>" by
    # `_perform_request`; the previous "REQUESTS" pattern could never match one,
    # which made this assertion pass vacuously.
    assert not len(rr.redis.keys("%s:REQUEST:%s:*" % (rr.redis_prefix, "TEST123")))


def test_the_instance_should_be_able_to_unregister_multiple_realms():
    """`unregister_realms` cleans up every realm it is given."""
    rr = RespectfulRequester()
    realm_tuples = [
        ["TEST123", 100, 300],
        ["TEST234", 200, 600],
        ["TEST345", 300, 900],
    ]
    rr.register_realms(realm_tuples)
    # NOTE(review): one-line lambda kept deliberately, see the single-realm test.
    request_func = lambda: requests.get("http://google.com")
    rr._perform_request(request_func, realms=["TEST123", "TEST234", "TEST345"])
    rr.unregister_realms(["TEST123", "TEST234", "TEST345"])
    # The key pattern uses "REQUEST" (singular) to match what `_perform_request`
    # actually writes.
    assert rr.redis.get(rr._realm_redis_key("TEST123")) is None
    assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")
    assert not len(rr.redis.keys("%s:REQUEST:%s:*" % (rr.redis_prefix, "TEST123")))
    assert rr.redis.get(rr._realm_redis_key("TEST234")) is None
    assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST234")
    assert not len(rr.redis.keys("%s:REQUEST:%s:*" % (rr.redis_prefix, "TEST234")))
    assert rr.redis.get(rr._realm_redis_key("TEST345")) is None
    assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST345")
    assert not len(rr.redis.keys("%s:REQUEST:%s:*" % (rr.redis_prefix, "TEST345")))


def test_the_instance_should_ignore_invalid_realms_when_attempting_to_unregister():
    """Unregistering realms that were never registered must not raise."""
    rr = RespectfulRequester()
    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST")
    rr.unregister_realm("TEST12345")


def test_the_instance_should_be_able_to_fetch_the_information_of_a_registered_realm():
    """`_fetch_realm_info` returns the raw hash with bytes keys and values."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert b"max_requests" in rr._fetch_realm_info("TEST123")
    assert rr._fetch_realm_info("TEST123")[b"max_requests"] == b"100"
    assert b"timespan" in rr._fetch_realm_info("TEST123")
    assert rr._fetch_realm_info("TEST123")[b"timespan"] == b"300"
    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_return_the_max_requests_value_of_a_registered_realm():
    """`realm_max_requests` returns the limit as an int."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert rr.realm_max_requests("TEST123") == 100
    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_return_the_timespan_value_of_a_registered_realm():
    """`realm_timespan` returns the timespan as an int."""
    rr = RespectfulRequester()
    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert rr.realm_timespan("TEST123") == 300
    rr.unregister_realm("TEST123")


def test_the_instance_should_validate_that_the_request_lambda_is_actually_a_requests_call():
    """A lambda that is not a `requests` call is rejected; real calls pass."""
    rr = RespectfulRequester()
    with pytest.raises(RequestsRespectfulError):
        rr._validate_request_func(lambda: 1 + 1)
    rr._validate_request_func(lambda: requests.get("http://google.com"))
    rr._validate_request_func(lambda: getattr(requests, "get")("http://google.com"))


def test_the_instance_should_be_able_to_determine_the_amount_of_requests_performed_in_a_timespan_for_a_registered_realm():
    rr = RespectfulRequester()
    # ... (the rest of this test is cut off in this excerpt)

# ...respectful_requester.py
Source:respectful_requester.py  
# ... (excerpt of respectful_requester.py. These are methods of the requester
# class -- the tests call them on `RespectfulRequester` instances. The class
# header and everything defined before `unregister_realms` are outside this
# excerpt.)


def unregister_realms(self, realms):
    """Unregister every realm in *realms* and return ``True``."""
    for realm in realms:
        self.unregister_realm(realm)
    return True


def realm_max_requests(self, realm):
    """Return the ``max_requests`` value stored for *realm* as an ``int``.

    Raises:
        KeyError: if the stored realm info has no ``max_requests`` field.
    """
    realm_info = self._fetch_realm_info(realm)
    # The realm hash is read with bytes keys and bytes values.
    return int(realm_info[b"max_requests"].decode("utf-8"))


def realm_timespan(self, realm):
    """Return the ``timespan`` value stored for *realm* as an ``int``.

    Raises:
        KeyError: if the stored realm info has no ``timespan`` field.
    """
    realm_info = self._fetch_realm_info(realm)
    return int(realm_info[b"timespan"].decode("utf-8"))


@classmethod
def configure(cls, **kwargs):
    """Validate configuration overrides and apply them to the module ``config``.

    Recognised keyword arguments (anything else is ignored):
        redis: dict containing the keys ``host``, ``port``, ``password`` and
            ``database``. When supplied, the module-level ``redis`` client is
            rebuilt from these settings.
        safety_threshold: integer greater than or equal to 0.
        requests_module_name: string.

    Returns:
        The module-level ``config`` dict after the overrides were applied.

    Raises:
        RequestsRespectfulConfigError: if a supplied value fails validation.
            Options are handled in the order listed above, so options that
            precede the failing one have already been applied.
    """
    global redis

    if "redis" in kwargs:
        redis_config = kwargs["redis"]
        if not isinstance(redis_config, dict):
            raise RequestsRespectfulConfigError("'redis' key must be a dict")

        expected_redis_keys = ("host", "port", "password", "database")
        missing_redis_keys = [
            key for key in expected_redis_keys if key not in redis_config
        ]
        if missing_redis_keys:
            raise RequestsRespectfulConfigError(
                "'%s' %s missing from the 'redis' configuration key"
                % (
                    ", ".join(missing_redis_keys),
                    "is" if len(missing_redis_keys) == 1 else "are",
                )
            )

        config["redis"] = redis_config
        redis = StrictRedis(
            host=config["redis"]["host"],
            port=config["redis"]["port"],
            password=config["redis"]["password"],
            db=config["redis"]["database"],
        )

    if "safety_threshold" in kwargs:
        safety_threshold = kwargs["safety_threshold"]
        # bool is a subclass of int; it is rejected explicitly so that
        # True/False keep failing validation as they did with the previous
        # exact-type comparison.
        if (
            isinstance(safety_threshold, bool)
            or not isinstance(safety_threshold, int)
            or safety_threshold < 0
        ):
            raise RequestsRespectfulConfigError(
                "'safety_threshold' key must be a positive integer"
            )
        config["safety_threshold"] = safety_threshold

    if "requests_module_name" in kwargs:
        if not isinstance(kwargs["requests_module_name"], str):
            raise RequestsRespectfulConfigError(
                "'requests_module_name' key must be string"
            )
        config["requests_module_name"] = kwargs["requests_module_name"]

    return config


@classmethod
def configure_default(cls):
    """Reset every key currently in ``config`` to its ``default_config`` value.

    Returns:
        The module-level ``config`` dict.
    """
    for key in config:
        config[key] = default_config[key]
    return config


def _perform_request(self, request_func, realms=None):
    """Run *request_func* if none of *realms* is currently rate-limited.

    One expiring key per realm is written to Redis before the request is
    executed; the key lives for that realm's timespan.

    Args:
        request_func: callable performing the request; checked by
            ``_validate_request_func`` first.
        realms: iterable of realm names the request counts against.

    Returns:
        Whatever ``request_func()`` returns.

    Raises:
        RequestsRespectfulError: if *realms* is ``None``.
        RequestsRespectfulRateLimitedError: if at least one realm has no
            request budget left.
    """
    self._validate_request_func(request_func)

    if realms is None:
        # The default used to fail with an opaque "NoneType is not iterable";
        # report it the same way `_requests_proxy` reports missing realms.
        raise RequestsRespectfulError("'realms' is a required kwarg")

    rate_limited_realms = [
        realm for realm in realms if not self._can_perform_request(realm)
    ]
    if rate_limited_realms:
        raise RequestsRespectfulRateLimitedError(
            "Currently rate-limited on Realm(s): %s"
            % ", ".join(rate_limited_realms)
        )

    for realm in realms:
        request_uuid = str(uuid.uuid4())
        self.redis.setex(
            name="%s:REQUEST:%s:%s" % (self.redis_prefix, realm, request_uuid),
            time=self.realm_timespan(realm),
            value=request_uuid,
        )
    return request_func()


def _realm_redis_key(self, realm):
    """Return the Redis key under which the info of *realm* is stored."""
    return "%s:REALMS:%s" % (self.redis_prefix, realm)


def _fetch_realm_info(self, realm):
    """Return the raw Redis hash stored for *realm* (via ``HGETALL``)."""
    redis_key = self._realm_redis_key(realm)
    return self.redis.hgetall(redis_key)


def _requests_in_timespan(self, realm):
    """Return how many request keys currently exist for *realm*.

    Request keys expire after the realm's timespan (see ``_perform_request``),
    so the keys still present are the requests made within that window.
    """
    pattern = "%s:REQUEST:%s:*" % (self.redis_prefix, realm)
    # SCAN's COUNT is only a hint and a single call is not guaranteed to cover
    # the whole keyspace, so follow the cursor to completion. SCAN may also
    # report a key more than once, hence the set.
    matching_keys = set(
        self.redis.scan_iter(match=pattern, count=self._redis_keys_in_db() + 100)
    )
    return len(matching_keys)


def _redis_keys_in_db(self):
    """Return the number of keys in the configured Redis database.

    Returns 0 when ``INFO`` reports no entry for the database, instead of
    failing on the missing entry.
    """
    keyspace = self.redis.info().get("db%d" % config["redis"]["database"]) or {}
    return keyspace.get("keys", 0)


def _can_perform_request(self, realm):
    """Return ``True`` while *realm* is below its limit minus the safety threshold."""
    return self._requests_in_timespan(realm) < (
        self.realm_max_requests(realm) - config["safety_threshold"]
    )


# Requests proxy
def _requests_proxy(self, method, *args, **kwargs):
    # NOTE(review): only the beginning of this method is visible in this
    # excerpt, so it is reproduced unchanged.
    realm = kwargs.pop("realm", None)
    realms = kwargs.pop("realms", list())
    if realm:
        warnings.warn(
            "'realm' kwarg will be removed in favor of providing a 'realms' list starting in 0.3.0",
            DeprecationWarning,
        )
        realms.append(realm)
    if not len(realms):
        raise RequestsRespectfulError("'realms' is a required kwarg")
    wait = kwargs.pop("wait", False)
    # ... (the rest of this method is cut off in this excerpt)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
