Best Python code snippet using locust
stats.py
Source:stats.py  
1# encoding: utf-82"""3The MIT License4Copyright (c) 2009-2010, Carl Byström, Jonatan Heyman5Permission is hereby granted, free of charge, to any person obtaining a copy6of this software and associated documentation files (the "Software"), to deal7in the Software without restriction, including without limitation the rights8to use, copy, modify, merge, publish, distribute, sublicense, and/or sell9copies of the Software, and to permit persons to whom the Software is10furnished to do so, subject to the following conditions:11The above copyright notice and this permission notice shall be included in12all copies or substantial portions of the Software.13THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19THE SOFTWARE.20"""21import time22import datetime23import gevent24import hashlib25import events26STATS_NAME_WIDTH = 6027class RequestStatsAdditionError(Exception):28    pass29class RequestStats(object):30    def __init__(self):31        self.entries = {}32        self.timings = {}33        self.errors = {}34        self.num_requests = 035        self.num_failures = 036        self.max_requests = None37        self.last_request_timestamp = None38        self.start_time = None39    def get_timings(self, name):40        timings = self.timings.get(name)41        if not timings:42            timings = TimingsEntry(name)43            self.timings[name] = timings44        return timings45    def get(self, name, method):46        """47        Retrieve a StatsEntry instance by name and method48        """49        entry = self.entries.get((name, method))50        if not entry:51            entry = StatsEntry(self, name, method)52            self.entries[(name, method)] = entry53        return entry54    def aggregated_stats(self, name="Total", full_request_history=False):55        """56        Returns a StatsEntry which is an aggregate of all stats entries57        within entries.58        """59        total = StatsEntry(self, name, method=None)60        for r in self.entries.itervalues():61            total.extend(r, full_request_history=full_request_history)62        return total63    def reset_all(self):64        """65        Go through all stats entries and reset them to zero66        """67        self.start_time = time.time()68        self.num_requests = 069        self.num_failures = 070        for r in self.entries.itervalues():71            r.reset()72        self.timings = {}73    def clear_all(self):74        """75        Remove all stats entries and errors76        """77        self.num_requests = 078        self.num_failures = 079        self.entries = {}80        self.timings = {}81        self.errors = {}82        self.max_requests = None83        self.last_request_timestamp = None84        self.start_time = None85class TimingsEntry(object):86    name = None87    max_response_times = {}88    current_max = 089    def __init__(self, name):90        self.name = name91    def reset(self):92        self.max_response_times = {}93        self.current_max = 094        return95    def log(self, timestamp, new_max_response_time):96        """97        check if there is an existing max, if so take the max and save it.98        """99        max_record = self.max_response_times.get(timestamp)100        if max_record:101            self.max_response_times[timestamp] = max(max_record, new_max_response_time)102        else:103            self.max_response_times[timestamp] = new_max_response_time104        self.current_max = new_max_response_time105        return106class StatsEntry(object):107    """108    Represents a single stats entry (name and method)109    """110    name = None111    """ Name (URL) of this stats entry """112    method = None113    """ Method (GET, POST, PUT, etc.) """114    num_requests = None115    """ The number of requests made """116    num_failures = None117    """ Number of failed request """118    total_response_time = None119    """ Total sum of the response times """120    min_response_time = None121    """ Minimum response time """122    max_response_time = None123    """ Maximum response time """124    num_reqs_per_sec = None125    """ A {second => request_count} dict that holds the number of requests made per second """126    response_times = None127    """128    A {response_time => count} dict that holds the response time distribution of all129    the requests.130    The keys (the response time in ms) are rounded to store 1, 2, ... 9, 10, 20. .. 90,131    100, 200 .. 900, 1000, 2000 ... 9000, in order to save memory.132    This dict is used to calculate the median and percentile response times.133    """134    total_content_length = None135    """ The sum of the content length of all the requests for this entry """136    start_time = None137    """ Time of the first request for this entry """138    last_request_timestamp = None139    """ Time of the last request for this entry """140    def __init__(self, stats, name, method):141        self.stats = stats142        self.name = name143        self.method = method144        self.reset()145    def reset(self):146        self.start_time = time.time()147        self.num_requests = 0148        self.num_failures = 0149        self.total_response_time = 0150        self.response_times = {}151        self.min_response_time = None152        self.max_response_time = 0153        self.last_request_timestamp = int(time.time())154        self.num_reqs_per_sec = {}155        self.total_content_length = 0156    def log(self, response_time, content_length):157        self.stats.num_requests += 1158        self.num_requests += 1159        self._log_time_of_request()160        self._log_response_time(response_time)161        # increase total content-length162        self.total_content_length += content_length163    def _log_time_of_request(self):164        t = int(time.time())165        self.num_reqs_per_sec[t] = self.num_reqs_per_sec.setdefault(t, 0) + 1166        self.last_request_timestamp = t167        self.stats.last_request_timestamp = t168    def _log_response_time(self, response_time):169        self.total_response_time += response_time170        if self.min_response_time is None:171            self.min_response_time = response_time172        self.min_response_time = min(self.min_response_time, response_time)173        self.max_response_time = max(self.max_response_time, response_time)174        # to avoid to much data that has to be transfered to the master node when175        # running in distributed mode, we save the response time rounded in a dict176        # so that 147 becomes 150, 3432 becomes 3400 and 58760 becomes 59000177        if response_time < 100:178            rounded_response_time = response_time179        elif response_time < 1000:180            rounded_response_time = int(round(response_time, -1))181        elif response_time < 10000:182            rounded_response_time = int(round(response_time, -2))183        else:184            rounded_response_time = int(round(response_time, -3))185        # increase request count for the rounded key in response time dict186        self.response_times.setdefault(rounded_response_time, 0)187        self.response_times[rounded_response_time] += 1188    def log_error(self, error):189        self.num_failures += 1190        self.stats.num_failures += 1191        key = StatsError.create_key(self.method, self.name, error)192        entry = self.stats.errors.get(key)193        if not entry:194            entry = StatsError(self.method, self.name, error)195            self.stats.errors[key] = entry196        entry.occured()197    @property198    def fail_ratio(self):199        try:200            return float(self.num_failures) / (self.num_requests + self.num_failures)201        except ZeroDivisionError:202            if self.num_failures > 0:203                return 1.0204            else:205                return 0.0206    @property207    def avg_response_time(self):208        try:209            return float(self.total_response_time) / self.num_requests210        except ZeroDivisionError:211            return 0212    @property213    def median_response_time(self):214        if not self.response_times:215            return 0216        return median_from_dict(self.num_requests, self.response_times)217    @property218    def current_rps(self):219        if self.stats.last_request_timestamp is None:220            return 0221        slice_start_time = max(self.stats.last_request_timestamp - 12, int(self.stats.start_time or 0))222        reqs = [self.num_reqs_per_sec.get(t, 0) for t in range(slice_start_time, self.stats.last_request_timestamp-2)]223        return avg(reqs)224    @property225    def total_rps(self):226        if not self.stats.last_request_timestamp or not self.stats.start_time:227            return 0.0228        return self.num_requests / max(self.stats.last_request_timestamp - self.stats.start_time, 1)229    @property230    def avg_content_length(self):231        try:232            return self.total_content_length / self.num_requests233        except ZeroDivisionError:234            return 0235    def extend(self, other, full_request_history=False):236        """237        Extend the data fro the current StatsEntry with the stats from another238        StatsEntry instance.239        If full_request_history is False, we'll only care to add the data from240        the last 20 seconds of other's stats. The reason for this argument is that241        extend can be used to generate an aggregate of multiple different StatsEntry242        instances on the fly, in order to get the *total* current RPS, average243        response time, etc.244        """245        self.last_request_timestamp = max(self.last_request_timestamp, other.last_request_timestamp)246        self.start_time = min(self.start_time, other.start_time)247        self.num_requests = self.num_requests + other.num_requests248        self.num_failures = self.num_failures + other.num_failures249        self.total_response_time = self.total_response_time + other.total_response_time250        self.max_response_time = max(self.max_response_time, other.max_response_time)251        self.min_response_time = min(self.min_response_time, other.min_response_time) or other.min_response_time252        self.total_content_length = self.total_content_length + other.total_content_length253        if full_request_history:254            for key in other.response_times:255                self.response_times[key] = self.response_times.get(key, 0) + other.response_times[key]256            for key in other.num_reqs_per_sec:257                self.num_reqs_per_sec[key] = self.num_reqs_per_sec.get(key, 0) +  other.num_reqs_per_sec[key]258        else:259            # still add the number of reqs per seconds the last 20 seconds260            for i in xrange(other.last_request_timestamp-20, other.last_request_timestamp+1):261                if i in other.num_reqs_per_sec:262                    self.num_reqs_per_sec[i] = self.num_reqs_per_sec.get(i, 0) + other.num_reqs_per_sec[i]263    def serialize(self):264        return {265            "name": self.name,266            "method": self.method,267            "last_request_timestamp": self.last_request_timestamp,268            "start_time": self.start_time,269            "num_requests": self.num_requests,270            "num_failures": self.num_failures,271            "total_response_time": self.total_response_time,272            "max_response_time": self.max_response_time,273            "min_response_time": self.min_response_time,274            "total_content_length": self.total_content_length,275            "response_times": self.response_times,276            "num_reqs_per_sec": self.num_reqs_per_sec,277        }278    @classmethod279    def unserialize(cls, data):280        obj = cls(None, data["name"], data["method"])281        for key in [282            "last_request_timestamp",283            "start_time",284            "num_requests",285            "num_failures",286            "total_response_time",287            "max_response_time",288            "min_response_time",289            "total_content_length",290            "response_times",291            "num_reqs_per_sec",292        ]:293            setattr(obj, key, data[key])294        return obj295    def get_stripped_report(self):296        """297        Return the serialized version of this StatsEntry, and then clear the current stats.298        """299        report = self.serialize()300        self.reset()301        return report302    def __str__(self):303        try:304            fail_percent = (self.num_failures/float(self.num_requests + self.num_failures))*100305        except ZeroDivisionError:306            fail_percent = 0307        return (" %-" + str(STATS_NAME_WIDTH) + "s %7d %12s %7d %7d %7d  | %7d %7.2f") % (308            self.method + " " + self.name,309            self.num_requests,310            "%d(%.2f%%)" % (self.num_failures, fail_percent),311            self.avg_response_time,312            self.min_response_time or 0,313            self.max_response_time,314            self.median_response_time or 0,315            self.current_rps or 0316        )317    def get_response_time_percentile(self, percent):318        """319        Get the response time that a certain number of percent of the requests320        finished within.321        Percent specified in range: 0.0 - 1.0322        """323        num_of_request = int((self.num_requests * percent))324        processed_count = 0325        for response_time in sorted(self.response_times.iterkeys(), reverse=True):326            processed_count += self.response_times[response_time]327            if((self.num_requests - processed_count) <= num_of_request):328                return response_time329    def percentile(self, tpl=" %-" + str(STATS_NAME_WIDTH) + "s %8d %6d %6d %6d %6d %6d %6d %6d %6d %6d"):330        if not self.num_requests:331            raise ValueError("Can't calculate percentile on url with no successful requests")332        return tpl % (333            str(self.method) + " " + self.name,334            self.num_requests,335            self.get_response_time_percentile(0.5),336            self.get_response_time_percentile(0.66),337            self.get_response_time_percentile(0.75),338            self.get_response_time_percentile(0.80),339            self.get_response_time_percentile(0.90),340            self.get_response_time_percentile(0.95),341            self.get_response_time_percentile(0.98),342            self.get_response_time_percentile(0.99),343            self.max_response_time344        )345class StatsError(object):346    def __init__(self, method, name, error, occurences=0):347        self.method = method348        self.name = name349        self.error = error350        self.occurences = occurences351    @classmethod352    def create_key(cls, method, name, error):353        key = "%s.%s.%r" % (method, name, error)354        return hashlib.md5(key).hexdigest()355    def occured(self):356        self.occurences += 1357    def to_name(self):358        return "%s %s: %r" % (self.method,359            self.name, repr(self.error))360    def to_dict(self):361        return {362            "method": self.method,363            "name": self.name,364            "error": repr(self.error),365            "occurences": self.occurences366        }367    @classmethod368    def from_dict(cls, data):369        return cls(370            data["method"],371            data["name"],372            data["error"],373            data["occurences"]374        )375def avg(values):376    return sum(values, 0.0) / max(len(values), 1)377def median_from_dict(total, count):378    """379    total is the number of requests made380    count is a dict {response_time: count}381    """382    pos = (total - 1) / 2383    for k in sorted(count.iterkeys()):384        if pos < count[k]:385            return k386        pos -= count[k]387global_stats = RequestStats()388"""389A global instance for holding the statistics. Should be removed eventually.390"""391def on_request_success(request_type, name, response_time, response_length):392    if global_stats.max_requests is not None and (global_stats.num_requests + global_stats.num_failures) >= global_stats.max_requests:393        raise Exception("Maximum number of requests reached")394    global_stats.get(name, request_type).log(response_time, response_length)395def on_request_failure(request_type, name, response_time, exception):396    if global_stats.max_requests is not None and (global_stats.num_requests + global_stats.num_failures) >= global_stats.max_requests:397        raise Exception("Maximum number of requests reached")398    global_stats.get(name, request_type).log_error(exception)399def on_report_to_master(client_id, data):400    data["stats"] = [global_stats.entries[key].get_stripped_report() for key in global_stats.entries.iterkeys() if not (global_stats.entries[key].num_requests == 0 and global_stats.entries[key].num_failures == 0)]401    data["errors"] =  dict([(k, e.to_dict()) for k, e in global_stats.errors.iteritems()])402    # clear out the errors.403    global_stats.errors = {}404def on_manager_report(client_id, data):405    timing_entry = global_stats.get_timings(client_id)406    record_stamp = int(time.time())407    for stats_data in data["stats"]:408        entry = StatsEntry.unserialize(stats_data)409        # for each entry get the max_response_time,410        # store that in a time stamped data object for that client.411        timing_entry.log(record_stamp, entry.max_response_time)412        request_key = (entry.name, entry.method)413        if not request_key in global_stats.entries:414            global_stats.entries[request_key] = StatsEntry(global_stats, entry.name, entry.method)415        global_stats.entries[request_key].extend(entry, full_request_history=True)416        global_stats.last_request_timestamp = max(global_stats.last_request_timestamp, entry.last_request_timestamp)417    for error_key, error in data["errors"].iteritems():418        if error_key not in global_stats.errors:419            global_stats.errors[error_key] = StatsError.from_dict(error)420        else:421            global_stats.errors[error_key].occurences += error["occurences"]422def on_request_slow(name, response_time, response_length):423    print "****** {2} slow request({0}): {1}".format(response_time, name, str(datetime.datetime.now()))424events.request_success += on_request_success425events.request_failure += on_request_failure426events.request_slow += on_request_slow427events.report_to_master += on_report_to_master...main.py
Source:main.py  
1import time2import threading3class RequestTokenBucket(object):4    """5    A class that models a token bucket to be used for rate limiting purposes.6    Attributes7    ----------8    max_tokens : int9        The maximum number of tokens the RequestTokenBucket object is able to hold at any given time.10        Defaults to 10.11    refill_rate : int12        The number of seconds it takes for a single token to be refilled into the RequestTokenBucket object.13        Defaults to 5.14    last_request_timestamp : int15        A timestamp of the last time a request was allowed by the bucket, represented as seconds since epoch.16    current_count : int17        The current number of tokens in the bucket.18    lock : Lock19        A Lock object that allows the bucket to be locked when multiple threads might be trying to make requests.20    Methods21    -------22    calculate_current_tokens():23        Determine the number of tokens a RequestTokenBucket object should have and set the object's current_count attribute to that value.24        Return without setting the current_count value if the RequestTokenBucket object does not have a last_request_timestamp value, as25        we assume this to mean no requests have been made to the bucket yet, and buckets are full of tokens when instantiated.26    27    print_bucket_summary():28        Print a formatted summary of the max token capacity, refill rate, current token count, and last request time of a RequestTokenBucket object.29    """30    def __init__(self, max_tokens=10, refill_rate=5):31        """32        Constructor for RequestTokenBucket objects.33        Initialize the last_request_timestamp attribute as None because we could not have made a request to a bucket that did not exist before.34        Initialize the current_count attribute to the value of max_tokens, as we want the RequestTokenBucket object to be full when instantiated.35        Parameters36        ----------37        max_tokens : int38            The maximum number of tokens the RequestTokenBucket object is able to hold at any given time.39            Defaults to 10.40        refill_rate : int41            The number of seconds it takes for a single token to be refilled into the RequestTokenBucket object.42            Defaults to 5.43        """44        self.max_tokens = max_tokens45        self.refill_rate = refill_rate46        self.last_request_timestamp = None47        self.current_count = max_tokens48        self.lock = threading.Lock()49    def calculate_current_tokens(self):50        """51        Determine the number of tokens a RequestTokenBucket object should have and set the object's current_count attribute to that value.52        Return without setting the current_count value if the RequestTokenBucket object does not have a last_request_timestamp value, as53        we assume this to mean no requests have been made to the bucket yet, and buckets are full of tokens when instantiated.54        Parameters55        ----------56        None57        Returns58        -------59        None60        """61        if self.last_request_timestamp is None:62            return63        tokens_since_last_request = self.__time_since_last_request() // self.refill_rate64        self.current_count = min(self.max_tokens, self.current_count + tokens_since_last_request)65    66    def print_bucket_summary(self):67        """68        Print a formatted summary of the max token capacity, refill rate, current token count, and last request time of a RequestTokenBucket object.69        Parameters70        ----------71        None72        Returns73        -------74        None75        """76        print("Max Token Capacity: {}".format(self.max_tokens))77        print("Refill Rate: {}".format(self.refill_rate))78        print("Current Token Count: {}".format(self.current_count))79        print("Last Request Time: {}".format(self.last_request_timestamp))80    @classmethod81    def __get_current_time_in_seconds(cls):82        """83        Return the current time represented in seconds since epoch.84        Parameters85        ----------86        None87        Returns88        -------89        An integer representing the current time as seconds since epoch.90        """91        return int(round(time.time()))92    93    def __time_since_last_request(self):94        """95        Return an integer representing the number of seconds since the last request made to a RequestTokenBucket object.96        Parameters97        ----------98        None99        Returns100        -------101        An integer representing the number of seconds since the last request made to a RequestTokenBucket object.102        """103        return self.__get_current_time_in_seconds() - self.last_request_timestamp104    105class TokenBucketRateLimiter(object):106    """107    A class that interacts with the RequestTokenBucket class to simulate rate limiting behavior. 108    Attributes109    ----------110    rate_limiter_dict : Dictionary111        A dictionary that maps an account ID (key) to a RequestTokenBucket object (value).112    Methods113    -------114    add_account(account_id, request_token_bucket):115        Add the account_id and request_token_bucket as a key-value pair to the TokenBucketRateLimiter object's rate_limiter_dict.116    117    allow_request_to_service(account_id):118        Determine if a request should be allowed based on the current token count of a bucket.119        Print information about the bucket being requested and if the request will be allowed.120        If allowed, perform the request in a thread-safe manner.121    """122    def __init__(self):123        """124        Constructor for TokenBucketRateLimiter objects.125        Parameters126        ----------127        None128        """129        self.rate_limiter_dict = {}130    def add_account(self, account_id, request_token_bucket):131        """132        Add the account_id and request_token_bucket as a key-value pair to the TokenBucketRateLimiter object's rate_limiter_dict.133        Parameters134        ----------135        account_id : int136            The ID of the account we are working with.137        request_token_bucket : RequestTokenBucket138            The token bucket we want to use to determine if requests from the account associated with account_id should be allowed or not.139        Returns140        -------141        None142        """143        self.rate_limiter_dict[account_id] = request_token_bucket144    def allow_request_to_service(self, account_id):145        """146        Calculate the current number of tokens in the bucket associated with account_id, allow the request if there are enough tokens to do so, reject the request if not.147        Print information regarding which thread is making a request, which bucket they are requesting, and the current token count of the bucket being requested.148        Parameters149        ----------150        account_id : int151            The ID of the account we are working with.152        Returns153        -------154        A boolean denoting if the request was allowed or not.155        """156        token_bucket = self.rate_limiter_dict[account_id]157        # Lock the bucket so that we do not have concurrency issues that result in us bypassing the rate limit.158        with token_bucket.lock:159            token_bucket.calculate_current_tokens()160            print("**** {} is making a request to bucket {}****".format(threading.current_thread().name, account_id))161            print("Current Tokens for Bucket {}: {}".format(account_id, token_bucket.current_count))162            # Allow the request if the token bucket has at least 1 token.163            # If the request is allowed, we will update the bucket's last_request_timestamp and remove a token from the bucket.164            if token_bucket.current_count > 0:165                print("Processing request\n")166                token_bucket.last_request_timestamp = int(round(time.time()))167                token_bucket.current_count -= 1168                return True169            else:170                print("Not enough tokens to process request. Please try again in {} seconds.\n".format(token_bucket.refill_rate))171        return False172        173def simulate_requests(rate_limiter):174    """175    Takes in a TokenBucketRateLimiter object and simulates making requests to buckets within that object for 60 seconds.176    Parameters177    ----------178    rate_limiter : TokenBucketRateLimiter179        A TokenBucketRateLimiter containing buckets we want to simulate the rate limiting capabilities of.180    Returns181    -------182    None183    """184    end_time = time.time() + 60185    while time.time() < end_time:186        rate_limiter.allow_request_to_service(1)187        rate_limiter.allow_request_to_service(2)188        time.sleep(3)189rate_limiter = TokenBucketRateLimiter()190max_tokens_input = int(input("Please specify the maximum number of tokens the first bucket should have: "))191refill_rate_input = int(input("Please specify the rate in seconds at which a single token should be refilled into the first bucket: "))192rate_limiter.add_account(1, RequestTokenBucket(max_tokens_input, refill_rate_input))193print("**** Summary of Specs for Token Bucket One ****")194rate_limiter.rate_limiter_dict[1].print_bucket_summary()195print("\n")196rate_limiter.add_account(2, RequestTokenBucket(5, 10))197print("**** Summary of Specs for Token Bucket Two ****")198rate_limiter.rate_limiter_dict[2].print_bucket_summary()199print("\n")200if __name__=="__main__":201    thread1 = threading.Thread(target=simulate_requests, args=(rate_limiter,))202    thread2 = threading.Thread(target=simulate_requests, args=(rate_limiter,))203    thread1.start()204    thread2.start()205    thread1.join()206    thread2.join()...chs_wrapper_class.py
Source:chs_wrapper_class.py  
1import requests2import json3import datetime4import time56class CompaniesHouseService:7    """A wrapper around the companies house API.8    9    Attributes:10        search_url (str): Base url for Companies House search query.11        company_url (str): Base url for Companies House company query.12        13    """14    search_url = "https://api.companieshouse.gov.uk/search/companies?q={}"15    company_url = "https://api.companieshouse.gov.uk/company/{}"16    17    def __init__(self, key, time_between_requests=0.5):18        """19        Args:20            key (str): The API key issued in the Companies House API 21                applications.22            time_between_requests (float): Time in seconds between requests to 23                the API to prevent spam. Default is 0.5 to prevent calls 24                exceeding the 600 per 5 minutes limit.25            26        """27        self.key = key28        self.time_between_requests = time_between_requests29        30        #: datetime: Timestamp instantiated as NoneType 31        self.last_request_timestamp = None32    33    def _query_ch_api(self, url, query):34        """Sends a request to the Companies House API.35        36        Args:37            url (str): The specific url to be queried depending on the type38                of request (search, profile etc.).39            query (str): The query parameter to be sent alongside the url.40        41        Returns:42            dict: A structured dictionary containing all of the information43                returned by the API.44                45        """46        query = self._remove_problem_characters(query)47        48        self._rate_limiting()4950        resultQuery = requests.get(url.format(query),auth=(self.key,''))51        #200 is the authorised code for RESTful API calls52        if resultQuery.status_code == 200:53            result = json.JSONDecoder().decode(resultQuery.text)54        else:55            print(f"Failed with error code: {resultQuery.status_code} | "\56                  f"Reason: {resultQuery.reason}")57            result = {}58        59        return result60    61    def _rate_limiting(self):62        """Waits up to the defined time between requests.63        64        If more than the defined "time_between_requests" has passed (in 65        seconds) since the last call, this function will not wait any time.66        The last_request_timestamp class variable is reset to the current67        time every time this method is called.68        69        """70        if self.last_request_timestamp is None:71            self.last_request_timestamp = datetime.datetime.now()72            73        else:74            current_time = datetime.datetime.now()75            76            time_since_request = (current_time - 77                                  self.last_request_timestamp78                                  ).total_seconds()79            80            wait_time = max(self.time_between_requests - 81                            time_since_request,82                            0)83            84            time.sleep(wait_time)85            self.last_request_timestamp = datetime.datetime.now()   86            87    def _remove_problem_characters(self, string):88        """Remove invalid query parameters from the url query89        90        Spaces and the "&" sign will cause issues in an HTTP request so are91        replaced.92        93        Args:94            string (str): The query to be "cleaned".95            96        Returns:97            str: An equivalent string in HTTP GET format98        99        """100        string = string.replace(" ","+")101        string = string.replace("&","%26")102        103        return string104    105    def get_first_company_search(self, company_name):106        """Search for a company and return the top result.107        108        If no results are returned from the Companies House API then returns109        NoneType using a try block.110        111        Args:112            companyName (str): The company to search for.113            114        Returns:115            dict: The profile of the first result found from the API search.116        117        """118        search_result = self._query_ch_api(self.search_url, company_name)119        120        try: 121            first_result = search_result["items"][0]122        except IndexError:123            first_result = None124        125        return first_result126    127    def get_company_profile(self, company_number):128        """Return a company profile from the company number.129        130        Args:131            company_number (str): The unique company number as defined on132                Companies House.133                134        Returns:135            dict: The profile of the corresponding company136        137        """138        company_profile = self._query_ch_api(self.company_url, company_number)139        140        return company_profile141    142if __name__ == "__main__":143    key = "d8d5f785-9b19-4873-b1a0-22659560f7b9" 144    ch_api = CompaniesHouseService(key)145    iterations = 10146    147    tic = datetime.datetime.now()148    149    for company in range(iterations): 150        ch_profile = ch_api.get_company_profile("07958752")151        152    toc = datetime.datetime.now()153    154    time_taken = (toc - tic).total_seconds()155    print(f"Average time per iteration: "\
...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
