How to use register_realm method in selenium-respectful

Best Python code snippet using selenium-respectful_python

test_respectful_requester.py

Source:test_respectful_requester.py Github

copy

Full Screen

...7import requests as r8# Tests9def test_setup():10 rr = RespectfulRequester()11 rr.unregister_realm("TEST123")12 RespectfulRequester.configure_default()13def test_the_class_should_accept_configuration_values():14 rr = RespectfulRequester()15 assert rr._config()["safety_threshold"] == 1016 assert rr._config()["requests_module_name"] == "requests"17 RespectfulRequester.configure(safety_threshold=20)18 RespectfulRequester.configure(requests_module_name="r")19 RespectfulRequester.configure(redis={"host": "0.0.0.0", "port": 6379, "database": 1})20 assert rr._config()["safety_threshold"] == 2021 assert rr._config()["requests_module_name"] == "r"22 assert rr._config()["redis"]["host"] == "0.0.0.0"23 assert rr._config()["redis"]["database"] == 124 RespectfulRequester.configure_default()25def test_the_class_should_validate_provided_configuration_values():26 with pytest.raises(RequestsRespectfulConfigError):27 RespectfulRequester.configure(redis="REDIS")28 with pytest.raises(RequestsRespectfulConfigError):29 RespectfulRequester.configure(redis=dict())30 with pytest.raises(RequestsRespectfulConfigError):31 RespectfulRequester.configure(redis={"host": "localhost", "port": 6379})32 RespectfulRequester.configure(redis={"host": "localhost", "port": 6379, "database": 0})33 with pytest.raises(RequestsRespectfulConfigError):34 RespectfulRequester.configure(safety_threshold="SAFETY")35 with pytest.raises(RequestsRespectfulConfigError):36 RespectfulRequester.configure(safety_threshold=-15)37 RespectfulRequester.configure(safety_threshold=10)38 with pytest.raises(RequestsRespectfulConfigError):39 RespectfulRequester.configure(requests_module_name=100)40 RespectfulRequester.configure(requests_module_name="requests")41 RespectfulRequester.configure_default()42def test_the_class_should_be_able_to_restore_the_default_configuration_values():43 rr = RespectfulRequester()44 RespectfulRequester.configure(safety_threshold=20)45 RespectfulRequester.configure(requests_module_name="r")46 RespectfulRequester.configure(redis={"host": "0.0.0.0", "port": 6379, "database": 1})47 RespectfulRequester.configure_default()48 assert rr._config()["safety_threshold"] == 1049 assert rr._config()["requests_module_name"] == "requests"50 assert rr._config()["redis"]["host"] == "localhost"51 assert rr._config()["redis"]["database"] == 052def test_the_instance_should_have_a_property_that_holds_a_redis_object():53 rr = RespectfulRequester()54 assert type(rr.redis) == redis.StrictRedis55def test_the_instance_should_have_a_property_that_holds_a_redis_prefix():56 rr = RespectfulRequester()57 assert rr.redis_prefix == "RespectfulRequester"58def test_the_instance_should_be_able_to_access_the_global_config():59 rr = RespectfulRequester()60 assert "redis" in rr._config()61 assert "safety_threshold" in rr._config()62 assert "requests_module_name" in rr._config()63def test_the_instance_should_be_able_to_generate_a_redis_key_when_provided_with_a_realm():64 rr = RespectfulRequester()65 assert rr._realm_redis_key("TEST") == "%s:REALMS:TEST" % rr.redis_prefix66 assert rr._realm_redis_key("TEST2") == "%s:REALMS:TEST2" % rr.redis_prefix67 assert rr._realm_redis_key("TEST SPACED") == "%s:REALMS:TEST SPACED" % rr.redis_prefix68 assert rr._realm_redis_key("TEST ÉÉÉ") == "%s:REALMS:TEST ÉÉÉ" % rr.redis_prefix69def test_the_instance_should_be_able_to_register_a_realm():70 rr = RespectfulRequester()71 rr.register_realm("TEST123", max_requests=100, timespan=300)72 assert rr.realm_max_requests("TEST123") == 10073 assert rr.realm_timespan("TEST123") == 30074 assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")75 rr.unregister_realm("TEST123")76def test_the_instance_should_be_able_to_register_multiple_realms():77 rr = RespectfulRequester()78 rr.register_realm("TEST123", max_requests=100, timespan=300)79 realm_tuples = [80 ["TEST123", 100, 300],81 ["TEST234", 200, 600],82 ["TEST345", 300, 900],83 ]84 rr.register_realms(realm_tuples)85 assert rr.realm_max_requests("TEST123") == 10086 assert rr.realm_timespan("TEST123") == 30087 assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")88 assert rr.realm_max_requests("TEST234") == 20089 assert rr.realm_timespan("TEST234") == 60090 assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST234")91 assert rr.realm_max_requests("TEST345") == 30092 assert rr.realm_timespan("TEST345") == 90093 assert rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST345")94 rr.unregister_realm("TEST123")95 rr.unregister_realm("TEST234")96 rr.unregister_realm("TEST345")97def test_the_instance_should_not_overwrite_when_registering_a_realm():98 rr = RespectfulRequester()99 rr.register_realm("TEST123", max_requests=100, timespan=300)100 rr.register_realm("TEST123", max_requests=1000, timespan=3000)101 assert rr.realm_max_requests("TEST123") == 100102 assert rr.realm_timespan("TEST123") == 300103 rr.unregister_realm("TEST123")104def test_the_instance_should_be_able_to_update_a_registered_realm():105 rr = RespectfulRequester()106 rr.register_realm("TEST123", max_requests=100, timespan=300)107 rr.update_realm("TEST123", max_requests=1000, timespan=3000)108 assert rr.realm_max_requests("TEST123") == 1000109 assert rr.realm_timespan("TEST123") == 3000110 rr.unregister_realm("TEST123")111def test_the_instance_should_be_able_to_fetch_a_list_of_registered_realms():112 rr = RespectfulRequester()113 rr.register_realm("TEST123", max_requests=100, timespan=300)114 assert "TEST123" in rr.fetch_registered_realms()115 rr.unregister_realm("TEST123")116def test_the_instance_should_ignore_invalid_values_when_updating_a_realm():117 rr = RespectfulRequester()118 rr.register_realm("TEST123", max_requests=100, timespan=300)119 rr.update_realm("TEST123", max_requests="FOO", timespan="BAR", fake=True)120 assert rr.realm_max_requests("TEST123") == 100121 assert rr.realm_timespan("TEST123") == 300122 rr.unregister_realm("TEST123")123def test_the_instance_should_be_able_to_unregister_a_realm():124 rr = RespectfulRequester()125 rr.register_realm("TEST123", max_requests=100, timespan=300)126 request_func = lambda: requests.get("http://google.com")127 rr._perform_request(request_func, realms=["TEST123"])128 rr.unregister_realm("TEST123")129 assert rr.redis.get(rr._realm_redis_key("TEST123")) is None130 assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")131 assert not len(rr.redis.keys("%s:REQUESTS:%s:*" % (rr.redis_prefix, "TEST123")))132def test_the_instance_should_be_able_to_unregister_multiple_realms():133 rr = RespectfulRequester()134 realm_tuples = [135 ["TEST123", 100, 300],136 ["TEST234", 200, 600],137 ["TEST345", 300, 900],138 ]139 rr.register_realms(realm_tuples)140 request_func = lambda: requests.get("http://google.com")141 rr._perform_request(request_func, realms=["TEST123", "TEST234", "TEST345"])142 rr.unregister_realms(["TEST123", "TEST234", "TEST345"])143 assert rr.redis.get(rr._realm_redis_key("TEST123")) is None144 assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST123")145 assert not len(rr.redis.keys("%s:REQUESTS:%s:*" % (rr.redis_prefix, "TEST123")))146 assert rr.redis.get(rr._realm_redis_key("TEST234")) is None147 assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST234")148 assert not len(rr.redis.keys("%s:REQUESTS:%s:*" % (rr.redis_prefix, "TEST234")))149 assert rr.redis.get(rr._realm_redis_key("TEST345")) is None150 assert not rr.redis.sismember("%s:REALMS" % rr.redis_prefix, "TEST345")151 assert not len(rr.redis.keys("%s:REQUESTS:%s:*" % (rr.redis_prefix, "TEST345")))152def test_the_instance_should_ignore_invalid_realms_when_attempting_to_unregister():153 rr = RespectfulRequester()154 rr.unregister_realm("TEST123")155 rr.unregister_realm("TEST")156 rr.unregister_realm("TEST12345")157def test_the_instance_should_be_able_to_fetch_the_information_of_a_registered_realm():158 rr = RespectfulRequester()159 rr.register_realm("TEST123", max_requests=100, timespan=300)160 assert b"max_requests" in rr._fetch_realm_info("TEST123")161 assert rr._fetch_realm_info("TEST123")[b"max_requests"] == b"100"162 assert b"timespan" in rr._fetch_realm_info("TEST123")163 assert rr._fetch_realm_info("TEST123")[b"timespan"] == b"300"164 rr.unregister_realm("TEST123")165def test_the_instance_should_be_able_to_return_the_max_requests_value_of_a_registered_realm():166 rr = RespectfulRequester()167 rr.register_realm("TEST123", max_requests=100, timespan=300)168 assert rr.realm_max_requests("TEST123") == 100169 rr.unregister_realm("TEST123")170def test_the_instance_should_be_able_to_return_the_timespan_value_of_a_registered_realm():171 rr = RespectfulRequester()172 rr.register_realm("TEST123", max_requests=100, timespan=300)173 assert rr.realm_timespan("TEST123") == 300174 rr.unregister_realm("TEST123")175def test_the_instance_should_validate_that_the_request_lambda_is_actually_a_requests_call():176 rr = RespectfulRequester()177 with pytest.raises(RequestsRespectfulError):178 rr._validate_request_func(lambda: 1 + 1)179 rr._validate_request_func(lambda: requests.get("http://google.com"))180 rr._validate_request_func(lambda: getattr(requests, "get")("http://google.com"))181def test_the_instance_should_be_able_to_determine_the_amount_of_requests_performed_in_a_timespan_for_a_registered_realm():182 rr = RespectfulRequester()183 rr.register_realm("TEST123", max_requests=1000, timespan=5)184 assert rr._requests_in_timespan("TEST123") == 0185 request_func = lambda: requests.get("http://google.com")186 rr._perform_request(request_func, realms=["TEST123"])187 rr._perform_request(request_func, realms=["TEST123"])188 rr._perform_request(request_func, realms=["TEST123"])189 assert rr._requests_in_timespan("TEST123") == 3190 rr.unregister_realm("TEST123")191def test_the_instance_should_be_able_to_determine_if_it_can_perform_a_request_for_a_registered_realm():192 rr = RespectfulRequester()193 rr.register_realm("TEST123", max_requests=1000, timespan=5)194 assert rr._can_perform_request("TEST123")195 rr.update_realm("TEST123", max_requests=0)196 assert not rr._can_perform_request("TEST123")197 rr.unregister_realm("TEST123")198def test_the_instance_should_not_allow_a_request_to_be_made_on_an_unregistered_realm():199 rr = RespectfulRequester()200 request_func = lambda: requests.get("http://google.com")201 with pytest.raises(RequestsRespectfulError):202 rr.request(request_func, realms=["TEST123"])203def test_the_instance_should_perform_the_request_if_it_is_allowed_to_on_a_registered_realm():204 rr = RespectfulRequester()205 rr.register_realm("TEST123", max_requests=1000, timespan=5)206 request_func = lambda: requests.get("http://google.com")207 assert type(rr._perform_request(request_func, realms=["TEST123"])) == requests.Response208 rr.unregister_realm("TEST123")209def test_the_instance_should_perform_the_request_if_it_is_allowed_to_on_multiple_registered_realms():210 rr = RespectfulRequester()211 rr.register_realm("TEST123", max_requests=1000, timespan=5)212 rr.register_realm("TEST234", max_requests=1000, timespan=5)213 request_func = lambda: requests.get("http://google.com")214 assert type(rr._perform_request(request_func, realms=["TEST123", "TEST234"])) == requests.Response215 rr.unregister_realm("TEST123")216 rr.unregister_realm("TEST234")217def test_the_instance_should_return_a_rate_limit_exception_if_the_request_is_not_allowed_on_a_registered_realm():218 rr = RespectfulRequester()219 rr.register_realm("TEST123", max_requests=0, timespan=5)220 request_func = lambda: requests.get("http://google.com")221 with pytest.raises(RequestsRespectfulRateLimitedError):222 rr._perform_request(request_func, realms=["TEST123"])223 rr.unregister_realm("TEST123")224def test_the_instance_should_return_a_rate_limit_exception_if_the_request_is_not_allowed_on_one_or_multiple_registered_realms():225 rr = RespectfulRequester()226 rr.register_realm("TEST123", max_requests=0, timespan=5)227 rr.register_realm("TEST234", max_requests=0, timespan=5)228 request_func = lambda: requests.get("http://google.com")229 with pytest.raises(RequestsRespectfulRateLimitedError):230 rr._perform_request(request_func, realms=["TEST123", "TEST234"])231 rr.update_realm("TEST123", max_requests=10)232 with pytest.raises(RequestsRespectfulRateLimitedError):233 rr._perform_request(request_func, realms=["TEST123", "TEST234"])234 rr.update_realm("TEST123", max_requests=0)235 rr.update_realm("TEST234", max_requests=10)236 with pytest.raises(RequestsRespectfulRateLimitedError):237 rr._perform_request(request_func, realms=["TEST123", "TEST234"])238 rr.unregister_realm("TEST123")239 rr.unregister_realm("TEST234")240def test_the_instance_should_be_able_to_wait_for_a_request_to_be_allowed_on_a_registered_realm():241 rr = RespectfulRequester()242 RespectfulRequester.configure(safety_threshold=0)243 rr.register_realm("TEST123", max_requests=1, timespan=2)244 request_func = lambda: requests.get("http://google.com")245 rr.request(request_func, realms=["TEST123"], wait=True)246 rr.request(request_func, realms=["TEST123"], wait=True)247 rr.request(request_func, realms=["TEST123"], wait=True)248 rr.unregister_realm("TEST123")249 RespectfulRequester.configure_default()250def test_the_instance_should_be_able_to_wait_for_a_request_to_be_allowed_on_multiple_registered_realms():251 rr = RespectfulRequester()252 RespectfulRequester.configure(safety_threshold=0)253 rr.register_realm("TEST123", max_requests=1, timespan=5)254 rr.register_realm("TEST234", max_requests=1, timespan=2)255 request_func = lambda: requests.get("http://google.com")256 rr.request(request_func, realms=["TEST123", "TEST234"], wait=True)257 rr.request(request_func, realms=["TEST123", "TEST234"], wait=True)258 rr.request(request_func, realms=["TEST123", "TEST234"], wait=True)259 rr.unregister_realm("TEST123")260 rr.unregister_realm("TEST234")261 RespectfulRequester.configure_default()262def test_the_instance_should_recognize_the_requests_proxy_methods():263 rr = RespectfulRequester()264 getattr(rr, "delete")265 getattr(rr, "get")266 getattr(rr, "head")267 getattr(rr, "options")268 getattr(rr, "patch")269 getattr(rr, "post")270 getattr(rr, "put")271 with pytest.raises(AttributeError):272 getattr(rr, "foo")273def test_the_instance_should_get_the_same_results_by_using_the_requests_proxy_as_when_using_the_request_method():274 rr = RespectfulRequester()275 rr.register_realm("TEST123", max_requests=100, timespan=300)276 assert type(rr.get("http://google.com", realm="TEST123")) == requests.Response277 rr.update_realm("TEST123", max_requests=0)278 with pytest.raises(RequestsRespectfulRateLimitedError):279 rr.get("http://google.com", realm="TEST123")280 rr.unregister_realm("TEST123")281def test_the_safety_threshold_configuration_value_should_have_the_expected_effect():282 rr = RespectfulRequester()283 rr.register_realm("TEST123", max_requests=11, timespan=300)284 RespectfulRequester.configure(safety_threshold=10)285 request_func = lambda: requests.get("http://google.com")286 rr.request(request_func, realms=["TEST123"])287 with pytest.raises(RequestsRespectfulRateLimitedError):288 rr.request(request_func, realms=["TEST123"])289 RespectfulRequester.configure_default()290 rr.unregister_realm("TEST123")291def test_the_requests_module_name_configuration_value_should_have_the_expected_effect():292 rr = RespectfulRequester()293 RespectfulRequester.configure(requests_module_name="r")294 request_func = lambda: r.get("http://google.com")295 rr._validate_request_func(request_func)296 RespectfulRequester.configure_default()297def test_teardown():...

Full Screen

Full Screen

respectful_requester.py

Source:respectful_requester.py Github

copy

Full Screen

...39 else:40 return self._perform_request(request_func, realms=realms)41 def fetch_registered_realms(self):42 return list(map(lambda k: k.decode("utf-8"), self.redis.smembers("%s:REALMS" % self.redis_prefix)))43 def register_realm(self, realm, max_requests, timespan):44 redis_key = self._realm_redis_key(realm)45 if not self.redis.hexists(redis_key, "max_requests"):46 self.redis.hmset(redis_key, {"max_requests": max_requests, "timespan": timespan})47 self.redis.sadd("%s:REALMS" % self.redis_prefix, realm)48 return True49 def register_realms(self, realm_tuples):50 for realm_tuple in realm_tuples:51 self.register_realm(*realm_tuple)52 return True53 def update_realm(self, realm, **kwargs):54 redis_key = self._realm_redis_key(realm)55 updatable_keys = ["max_requests", "timespan"]56 for updatable_key in updatable_keys:57 if updatable_key in kwargs and type(kwargs[updatable_key]) == int:58 self.redis.hset(redis_key, updatable_key, kwargs[updatable_key])59 return True60 def unregister_realm(self, realm):61 self.redis.delete(self._realm_redis_key(realm))62 self.redis.srem("%s:REALMS" % self.redis_prefix, realm)63 request_keys = self.redis.keys("%s:REQUEST:%s:*" % (self.redis_prefix, realm))64 [self.redis.delete(k) for k in request_keys]65 return True66 def unregister_realms(self, realms):67 for realm in realms:68 self.unregister_realm(realm)69 return True70 def realm_max_requests(self, realm):71 realm_info = self._fetch_realm_info(realm)72 return int(realm_info["max_requests".encode("utf-8")].decode("utf-8"))73 def realm_timespan(self, realm):74 realm_info = self._fetch_realm_info(realm)75 return int(realm_info["timespan".encode("utf-8")].decode("utf-8"))76 @classmethod77 def configure(cls, **kwargs):78 if "redis" in kwargs:79 if type(kwargs["redis"]) != dict:80 raise RequestsRespectfulConfigError("'redis' key must be a dict")81 expected_redis_keys = ["host", "port", "password", "database"]82 missing_redis_keys = list()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run selenium-respectful automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful