Best Python code snippet using selenium-respectful_python
test_respectful_requester.py
Source:test_respectful_requester.py  
...
import requests as r


# Tests
#
# NOTE(review): the request callables below are deliberately kept as
# single-line lambdas. The validation tests (a lambda that is not a requests
# call must be rejected; the accepted module name is configurable) suggest the
# library inspects the lambda itself -- presumably its source text. Confirm
# before refactoring any of them into named functions.
def test_setup():
    rr = RespectfulRequester()
    # Start from a clean slate in case an earlier run left the realm behind.
    rr.unregister_realm("TEST123")
    RespectfulRequester.configure_default()


def test_the_class_should_accept_configuration_values():
    rr = RespectfulRequester()

    assert rr._config()["safety_threshold"] == 10
    assert rr._config()["requests_module_name"] == "requests"

    RespectfulRequester.configure(safety_threshold=20)
    RespectfulRequester.configure(requests_module_name="r")
    RespectfulRequester.configure(redis={"host": "0.0.0.0", "port": 6379, "database": 1})

    assert rr._config()["safety_threshold"] == 20
    assert rr._config()["requests_module_name"] == "r"
    assert rr._config()["redis"]["host"] == "0.0.0.0"
    assert rr._config()["redis"]["database"] == 1

    RespectfulRequester.configure_default()


def test_the_class_should_validate_provided_configuration_values():
    # redis: must be a dict carrying at least host, port and database.
    with pytest.raises(RequestsRespectfulConfigError):
        RespectfulRequester.configure(redis="REDIS")
    with pytest.raises(RequestsRespectfulConfigError):
        RespectfulRequester.configure(redis=dict())
    with pytest.raises(RequestsRespectfulConfigError):
        RespectfulRequester.configure(redis={"host": "localhost", "port": 6379})
    RespectfulRequester.configure(redis={"host": "localhost", "port": 6379, "database": 0})

    # safety_threshold: non-numeric and negative values are rejected.
    with pytest.raises(RequestsRespectfulConfigError):
        RespectfulRequester.configure(safety_threshold="SAFETY")
    with pytest.raises(RequestsRespectfulConfigError):
        RespectfulRequester.configure(safety_threshold=-15)
    RespectfulRequester.configure(safety_threshold=10)

    # requests_module_name: must be a string.
    with pytest.raises(RequestsRespectfulConfigError):
        RespectfulRequester.configure(requests_module_name=100)
    RespectfulRequester.configure(requests_module_name="requests")

    RespectfulRequester.configure_default()


def test_the_class_should_be_able_to_restore_the_default_configuration_values():
    rr = RespectfulRequester()

    RespectfulRequester.configure(safety_threshold=20)
    RespectfulRequester.configure(requests_module_name="r")
    RespectfulRequester.configure(redis={"host": "0.0.0.0", "port": 6379, "database": 1})

    RespectfulRequester.configure_default()

    assert rr._config()["safety_threshold"] == 10
    assert rr._config()["requests_module_name"] == "requests"
    assert rr._config()["redis"]["host"] == "localhost"
    assert rr._config()["redis"]["database"] == 0


def test_the_instance_should_have_a_property_that_holds_a_redis_object():
    rr = RespectfulRequester()
    assert isinstance(rr.redis, redis.StrictRedis)


def test_the_instance_should_have_a_property_that_holds_a_redis_prefix():
    rr = RespectfulRequester()
    assert rr.redis_prefix == "RespectfulRequester"


def test_the_instance_should_be_able_to_access_the_global_config():
    rr = RespectfulRequester()
    assert "redis" in rr._config()
    assert "safety_threshold" in rr._config()
    assert "requests_module_name" in rr._config()


def test_the_instance_should_be_able_to_generate_a_redis_key_when_provided_with_a_realm():
    rr = RespectfulRequester()
    assert rr._realm_redis_key("TEST") == "%s:REALMS:TEST" % rr.redis_prefix
    assert rr._realm_redis_key("TEST2") == "%s:REALMS:TEST2" % rr.redis_prefix
    assert rr._realm_redis_key("TEST SPACED") == "%s:REALMS:TEST SPACED" % rr.redis_prefix
    # NOTE(review): the non-ASCII characters below look mis-decoded in this
    # excerpt; both sides use the same literal, so the assertion is unaffected.
    assert rr._realm_redis_key("TEST ÃÃÃ") == "%s:REALMS:TEST ÃÃÃ" % rr.redis_prefix


def test_the_instance_should_be_able_to_register_a_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)

    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300
    assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_register_multiple_realms():
    rr = RespectfulRequester()

    # TEST123 is registered up front, so register_realms() also runs against
    # a realm that already exists.
    rr.register_realm("TEST123", max_requests=100, timespan=300)

    realm_tuples = [
        ["TEST123", 100, 300],
        ["TEST234", 200, 600],
        ["TEST345", 300, 900],
    ]
    rr.register_realms(realm_tuples)

    for realm, max_requests, timespan in realm_tuples:
        assert rr.realm_max_requests(realm) == max_requests
        assert rr.realm_timespan(realm) == timespan
        assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, realm)

    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST234")
    rr.unregister_realm("TEST345")


def test_the_instance_should_not_overwrite_when_registering_a_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    rr.register_realm("TEST123", max_requests=1000, timespan=3000)

    # The second registration must be a no-op.
    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_update_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    rr.update_realm("TEST123", max_requests=1000, timespan=3000)

    assert rr.realm_max_requests("TEST123") == 1000
    assert rr.realm_timespan("TEST123") == 3000

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_fetch_a_list_of_registered_realms():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert "TEST123" in rr.fetch_registered_realms()

    rr.unregister_realm("TEST123")


def test_the_instance_should_ignore_invalid_values_when_updating_a_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    # Non-integer values and unknown keys must leave the realm untouched.
    rr.update_realm("TEST123", max_requests="FOO", timespan="BAR", fake=True)

    assert rr.realm_max_requests("TEST123") == 100
    assert rr.realm_timespan("TEST123") == 300

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_unregister_a_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)

    request_func = lambda: requests.get("http://google.com")
    rr._perform_request(request_func, realms=["TEST123"])

    rr.unregister_realm("TEST123")

    assert rr.redis.get(rr._realm_redis_key("TEST123")) is None
    assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")
    # Same key pattern that unregister_realm() deletes ("REQUEST", singular);
    # the previous "REQUESTS" pattern did not match it, so the check could
    # not fail.
    assert not rr.redis.keys("%s:REQUEST:%s:*" % (rr.redis_prefix, "TEST123"))


def test_the_instance_should_be_able_to_unregister_multiple_realms():
    rr = RespectfulRequester()

    realm_tuples = [
        ["TEST123", 100, 300],
        ["TEST234", 200, 600],
        ["TEST345", 300, 900],
    ]
    rr.register_realms(realm_tuples)

    request_func = lambda: requests.get("http://google.com")
    rr._perform_request(request_func, realms=["TEST123", "TEST234", "TEST345"])

    rr.unregister_realms(["TEST123", "TEST234", "TEST345"])

    for realm in ("TEST123", "TEST234", "TEST345"):
        assert rr.redis.get(rr._realm_redis_key(realm)) is None
        assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, realm)
        # Same key pattern that unregister_realm() deletes ("REQUEST").
        assert not rr.redis.keys("%s:REQUEST:%s:*" % (rr.redis_prefix, realm))


def test_the_instance_should_ignore_invalid_realms_when_attempting_to_unregister():
    rr = RespectfulRequester()
    # None of these realms is registered; the calls must simply not raise.
    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST")
    rr.unregister_realm("TEST12345")


def test_the_instance_should_be_able_to_fetch_the_information_of_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)

    # The raw realm info is expected with bytes keys and bytes values.
    assert b"max_requests" in rr._fetch_realm_info("TEST123")
    assert rr._fetch_realm_info("TEST123")[b"max_requests"] == b"100"
    assert b"timespan" in rr._fetch_realm_info("TEST123")
    assert rr._fetch_realm_info("TEST123")[b"timespan"] == b"300"

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_return_the_max_requests_value_of_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert rr.realm_max_requests("TEST123") == 100

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_return_the_timespan_value_of_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert rr.realm_timespan("TEST123") == 300

    rr.unregister_realm("TEST123")


def test_the_instance_should_validate_that_the_request_lambda_is_actually_a_requests_call():
    rr = RespectfulRequester()

    with pytest.raises(RequestsRespectfulError):
        rr._validate_request_func(lambda: 1 + 1)

    rr._validate_request_func(lambda: requests.get("http://google.com"))
    rr._validate_request_func(lambda: getattr(requests, "get")("http://google.com"))


def test_the_instance_should_be_able_to_determine_the_amount_of_requests_performed_in_a_timespan_for_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=1000, timespan=5)
    assert rr._requests_in_timespan("TEST123") == 0

    request_func = lambda: requests.get("http://google.com")
    rr._perform_request(request_func, realms=["TEST123"])
    rr._perform_request(request_func, realms=["TEST123"])
    rr._perform_request(request_func, realms=["TEST123"])

    assert rr._requests_in_timespan("TEST123") == 3

    rr.unregister_realm("TEST123")


def test_the_instance_should_be_able_to_determine_if_it_can_perform_a_request_for_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=1000, timespan=5)
    assert rr._can_perform_request("TEST123")

    rr.update_realm("TEST123", max_requests=0)
    assert not rr._can_perform_request("TEST123")

    rr.unregister_realm("TEST123")


def test_the_instance_should_not_allow_a_request_to_be_made_on_an_unregistered_realm():
    rr = RespectfulRequester()

    request_func = lambda: requests.get("http://google.com")
    with pytest.raises(RequestsRespectfulError):
        rr.request(request_func, realms=["TEST123"])


def test_the_instance_should_perform_the_request_if_it_is_allowed_to_on_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=1000, timespan=5)

    request_func = lambda: requests.get("http://google.com")
    assert isinstance(rr._perform_request(request_func, realms=["TEST123"]), requests.Response)

    rr.unregister_realm("TEST123")


def test_the_instance_should_perform_the_request_if_it_is_allowed_to_on_multiple_registered_realms():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=1000, timespan=5)
    rr.register_realm("TEST234", max_requests=1000, timespan=5)

    request_func = lambda: requests.get("http://google.com")
    assert isinstance(
        rr._perform_request(request_func, realms=["TEST123", "TEST234"]),
        requests.Response,
    )

    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST234")


def test_the_instance_should_return_a_rate_limit_exception_if_the_request_is_not_allowed_on_a_registered_realm():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=0, timespan=5)

    request_func = lambda: requests.get("http://google.com")
    with pytest.raises(RequestsRespectfulRateLimitedError):
        rr._perform_request(request_func, realms=["TEST123"])

    rr.unregister_realm("TEST123")


def test_the_instance_should_return_a_rate_limit_exception_if_the_request_is_not_allowed_on_one_or_multiple_registered_realms():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=0, timespan=5)
    rr.register_realm("TEST234", max_requests=0, timespan=5)

    request_func = lambda: requests.get("http://google.com")

    # Both realms exhausted.
    with pytest.raises(RequestsRespectfulRateLimitedError):
        rr._perform_request(request_func, realms=["TEST123", "TEST234"])

    # Only TEST234 exhausted.
    rr.update_realm("TEST123", max_requests=10)
    with pytest.raises(RequestsRespectfulRateLimitedError):
        rr._perform_request(request_func, realms=["TEST123", "TEST234"])

    # Only TEST123 exhausted.
    rr.update_realm("TEST123", max_requests=0)
    rr.update_realm("TEST234", max_requests=10)
    with pytest.raises(RequestsRespectfulRateLimitedError):
        rr._perform_request(request_func, realms=["TEST123", "TEST234"])

    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST234")


def test_the_instance_should_be_able_to_wait_for_a_request_to_be_allowed_on_a_registered_realm():
    rr = RespectfulRequester()
    RespectfulRequester.configure(safety_threshold=0)

    # One request per 2 seconds: with wait=True the later calls are expected
    # to complete without raising a rate-limit error.
    rr.register_realm("TEST123", max_requests=1, timespan=2)

    request_func = lambda: requests.get("http://google.com")
    rr.request(request_func, realms=["TEST123"], wait=True)
    rr.request(request_func, realms=["TEST123"], wait=True)
    rr.request(request_func, realms=["TEST123"], wait=True)

    rr.unregister_realm("TEST123")
    RespectfulRequester.configure_default()


def test_the_instance_should_be_able_to_wait_for_a_request_to_be_allowed_on_multiple_registered_realms():
    rr = RespectfulRequester()
    RespectfulRequester.configure(safety_threshold=0)

    rr.register_realm("TEST123", max_requests=1, timespan=5)
    rr.register_realm("TEST234", max_requests=1, timespan=2)

    request_func = lambda: requests.get("http://google.com")
    rr.request(request_func, realms=["TEST123", "TEST234"], wait=True)
    rr.request(request_func, realms=["TEST123", "TEST234"], wait=True)
    rr.request(request_func, realms=["TEST123", "TEST234"], wait=True)

    rr.unregister_realm("TEST123")
    rr.unregister_realm("TEST234")
    RespectfulRequester.configure_default()


def test_the_instance_should_recognize_the_requests_proxy_methods():
    rr = RespectfulRequester()

    # Attribute lookup alone must succeed for every proxied HTTP verb.
    getattr(rr, "delete")
    getattr(rr, "get")
    getattr(rr, "head")
    getattr(rr, "options")
    getattr(rr, "patch")
    getattr(rr, "post")
    getattr(rr, "put")

    with pytest.raises(AttributeError):
        getattr(rr, "foo")


def test_the_instance_should_get_the_same_results_by_using_the_requests_proxy_as_when_using_the_request_method():
    rr = RespectfulRequester()

    rr.register_realm("TEST123", max_requests=100, timespan=300)
    assert isinstance(rr.get("http://google.com", realm="TEST123"), requests.Response)

    rr.update_realm("TEST123", max_requests=0)
    with pytest.raises(RequestsRespectfulRateLimitedError):
        rr.get("http://google.com", realm="TEST123")

    rr.unregister_realm("TEST123")


def test_the_safety_threshold_configuration_value_should_have_the_expected_effect():
    rr = RespectfulRequester()

    # max_requests=11 with safety_threshold=10: exactly one request is
    # expected to go through before the realm counts as rate limited.
    rr.register_realm("TEST123", max_requests=11, timespan=300)
    RespectfulRequester.configure(safety_threshold=10)

    request_func = lambda: requests.get("http://google.com")
    rr.request(request_func, realms=["TEST123"])
    with pytest.raises(RequestsRespectfulRateLimitedError):
        rr.request(request_func, realms=["TEST123"])

    RespectfulRequester.configure_default()
    rr.unregister_realm("TEST123")


def test_the_requests_module_name_configuration_value_should_have_the_expected_effect():
    rr = RespectfulRequester()

    RespectfulRequester.configure(requests_module_name="r")

    # `r` is the alias created by `import requests as r` at the top of the file.
    request_func = lambda: r.get("http://google.com")
    rr._validate_request_func(request_func)

    RespectfulRequester.configure_default()


def test_teardown():
    ...
respectful_requester.py
Source:respectful_requester.py  
...39        else:40            return self._perform_request(request_func, realms=realms)41    def fetch_registered_realms(self):42        return list(map(lambda k: k.decode("utf-8"), self.redis.smembers("%s:REALMS" % self.redis_prefix)))43    def register_realm(self, realm, max_requests, timespan):44        redis_key = self._realm_redis_key(realm)45        if not self.redis.hexists(redis_key, "max_requests"):46            self.redis.hmset(redis_key, {"max_requests": max_requests, "timespan": timespan})47            self.redis.sadd("%s:REALMS" % self.redis_prefix, realm)48        return True49    def register_realms(self, realm_tuples):50        for realm_tuple in realm_tuples:51            self.register_realm(*realm_tuple)52        return True53    def update_realm(self, realm, **kwargs):54        redis_key = self._realm_redis_key(realm)55        updatable_keys = ["max_requests", "timespan"]56        for updatable_key in updatable_keys:57            if updatable_key in kwargs and type(kwargs[updatable_key]) == int:58                self.redis.hset(redis_key, updatable_key, kwargs[updatable_key])59        return True60    def unregister_realm(self, realm):61        self.redis.delete(self._realm_redis_key(realm))62        self.redis.srem("%s:REALMS" % self.redis_prefix, realm)63        request_keys = self.redis.keys("%s:REQUEST:%s:*" % (self.redis_prefix, realm))64        [self.redis.delete(k) for k in request_keys]65        return True66    def unregister_realms(self, realms):67        for realm in realms:68            self.unregister_realm(realm)69        return True70    def realm_max_requests(self, realm):71        realm_info = self._fetch_realm_info(realm)72        return int(realm_info["max_requests".encode("utf-8")].decode("utf-8"))73    def realm_timespan(self, realm):74        realm_info = self._fetch_realm_info(realm)75        return int(realm_info["timespan".encode("utf-8")].decode("utf-8"))76    @classmethod77    def configure(cls, **kwargs):78        if 
"redis" in kwargs:79            if type(kwargs["redis"]) != dict:80                raise RequestsRespectfulConfigError("'redis' key must be a dict")81            expected_redis_keys = ["host", "port", "password", "database"]82            missing_redis_keys = list()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
