Best Python code snippet using locust
server.py
Source:server.py  
# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

from contextlib import closing
from datetime import datetime
import asyncio
import json
import logging
import math
import os
import signal
import socket
import time
from . import chunkify, DEFAULT_MAX_CHUNK

logger = logging.getLogger('hashserv.server')


class Measurement(object):
    """Times one interval and adds the elapsed seconds to a Sample.

    Usable either manually (start() / end()) or as a context manager.
    """

    def __init__(self, sample):
        self.sample = sample

    def start(self):
        """Record the starting timestamp of the interval."""
        self.start_time = time.perf_counter()

    def end(self):
        """Add the time elapsed since start() to the owning sample."""
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class Sample(object):
    """Accumulates one or more measurements into a single Stats entry.

    The accumulated time is pushed to the Stats object by end(), which is
    also called when the sample is used as a context manager.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a new Measurement that reports into this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        """Accumulate *elapsed* seconds into this sample."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """Flush the accumulated time to the Stats object and reset.

        Nothing is recorded if no measurement was added.
        """
        if self.num_samples:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0


class Stats(object):
    """Running statistics (count, total, max, average, stdev) of timings."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all accumulated statistics."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        # m is the running mean and s the running sum of squared deviations
        # from the mean; both are updated incrementally in add()
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Fold one elapsed time into the running statistics."""
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            last_m = self.m
            self.m = last_m + (elapsed - last_m) / self.num
            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)

        self.total_time += elapsed

        if self.max_time < elapsed:
            self.max_time = elapsed

    def start_sample(self):
        """Return a new Sample that reports into these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean elapsed time, or 0 when nothing has been recorded."""
        if self.num == 0:
            return 0
        return self.total_time / self.num

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two entries."""
        if self.num <= 1:
            return 0
        return math.sqrt(self.s / (self.num - 1))

    def todict(self):
        """Return the statistics as a plain dictionary."""
        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}


class ClientError(Exception):
    """Raised when a client sends something the server cannot handle."""
    pass


class ServerClient(object):
    """Handles the protocol conversation with one connected client."""

    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    ALL_QUERY =  'SELECT *                         FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'

    def __init__(self, reader, writer, db, request_stats):
        self.reader = reader
        self.writer = writer
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = DEFAULT_MAX_CHUNK

        # Maps the key found in an incoming message to its handler
        self.handlers = {
            'get': self.handle_get,
            'report': self.handle_report,
            'report-equiv': self.handle_equivreport,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
            'reset-stats': self.handle_reset_stats,
            'chunk-stream': self.handle_chunk,
        }

    async def process_requests(self):
        """Run the handshake, then serve messages until the client leaves.

        The connection is always closed on exit. ClientError is logged here;
        any other exception propagates to the caller.
        """
        try:
            self.addr = self.writer.get_extra_info('peername')
            logger.debug('Client %r connected' % (self.addr,))

            # Read protocol and version
            protocol = await self.reader.readline()
            # readline() returns an empty bytes object (not None) at EOF
            if not protocol:
                return

            try:
                (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split()
            except ValueError as e:
                # Covers both undecodable bytes and a wrong number of fields
                raise ClientError('Malformed protocol line %r' % protocol) from e

            if proto_name != 'OEHASHEQUIV':
                return

            try:
                proto_version = tuple(int(v) for v in proto_version.split('.'))
            except ValueError as e:
                raise ClientError('Malformed protocol version %r' % proto_version) from e

            if proto_version < (1, 0) or proto_version > (1, 1):
                return

            # Read headers. Currently, no headers are implemented, so look for
            # an empty line to signal the end of the headers
            while True:
                line = await self.reader.readline()
                if not line:
                    # EOF before the end of the headers
                    return

                line = line.decode('utf-8').rstrip()
                if not line:
                    break

            # Handle messages
            while True:
                d = await self.read_message()
                if d is None:
                    break
                await self.dispatch_message(d)
                await self.writer.drain()
        except ClientError as e:
            logger.error(str(e))
        finally:
            self.writer.close()

    async def dispatch_message(self, msg):
        """Invoke the handler for the first known command key in *msg*.

        Raises ClientError if no handler key is present in the message.
        """
        for k in self.handlers:
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    # Stream handlers are not timed here; get-stream does
                    # its own per-request sampling
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise ClientError("Unrecognized command %r" % msg)

    def write_message(self, msg):
        """Serialize *msg* as JSON and write it, split by chunkify()."""
        for c in chunkify(json.dumps(msg), self.max_chunk):
            self.writer.write(c.encode('utf-8'))

    async def read_message(self):
        """Read one newline-terminated JSON message.

        Returns the decoded object, or None at EOF or when the final line
        is not newline-terminated. Decoding errors are logged and re-raised.
        """
        l = await self.reader.readline()
        if not l:
            return None

        try:
            message = l.decode('utf-8')

            if not message.endswith('\n'):
                return None

            return json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Log the raw bytes: the decoded text does not exist if the
            # decode itself is what failed
            logger.error('Bad message from client: %r' % l)
            raise e

    async def handle_chunk(self, request):
        """Reassemble a message sent as several lines and dispatch it.

        Lines are read until an empty one; their concatenation is parsed as
        JSON. Raises ClientError if the result is itself a chunk stream.
        """
        lines = []
        try:
            while True:
                l = await self.reader.readline()
                l = l.rstrip(b"\n").decode("utf-8")
                if not l:
                    break
                lines.append(l)

            msg = json.loads(''.join(lines))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error('Bad message from client: %r' % ''.join(lines))
            raise e

        if 'chunk-stream' in msg:
            raise ClientError("Nested chunks are not allowed")

        await self.dispatch_message(msg)

    async def handle_get(self, request):
        """Reply with the row matching method/taskhash, or None.

        When the request sets 'all', every column is returned; otherwise
        only taskhash, method and unihash.
        """
        method = request['method']
        taskhash = request['taskhash']

        if request.get('all', False):
            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
        else:
            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)

        if row is not None:
            logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
            d = {k: row[k] for k in row.keys()}

            self.write_message(d)
        else:
            self.write_message(None)

    async def handle_get_stream(self, request):
        """Serve '<method> <taskhash>' lookups line by line until 'END'.

        Each request line is answered with the unihash, or an empty line
        when there is no match.
        """
        self.write_message('ok')

        while True:
            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out.
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

    async def handle_report(self, data):
        """Record a taskhash/outhash report and reply with its unihash."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute('''
                -- Find tasks with a matching outhash (that is, tasks that
                -- are equivalent)
                SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash

                -- If there is an exact match on the taskhash, return it.
                -- Otherwise return the oldest matching outhash of any
                -- taskhash
                ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
                    created ASC

                -- Only return one row
                LIMIT 1
                ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

            # If no matching outhash was found, or one *was* found but it
            # wasn't an exact match on the taskhash, a new entry for this
            # taskhash should be added
            if row is None or row['taskhash'] != data['taskhash']:
                # If a row matching the outhash was found, the unihash for
                # the new taskhash should be the same as that one.
                # Otherwise the caller provided unihash is used.
                unihash = data['unihash']
                if row is not None:
                    unihash = row['unihash']

                insert_data = {
                    'method': data['method'],
                    'outhash': data['outhash'],
                    'taskhash': data['taskhash'],
                    'unihash': unihash,
                    'created': datetime.now()
                }

                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                    if k in data:
                        insert_data[k] = data[k]

                # Column names come only from the fixed key lists above;
                # the values themselves are bound as named parameters
                cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
                    ', '.join(sorted(insert_data.keys())),
                    ', '.join(':' + k for k in sorted(insert_data.keys()))),
                    insert_data)

                self.db.commit()

                logger.info('Adding taskhash %s with unihash %s',
                            data['taskhash'], unihash)

                d = {
                    'taskhash': data['taskhash'],
                    'method': data['method'],
                    'unihash': unihash
                }
            else:
                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_equivreport(self, data):
        """Record a taskhash -> unihash mapping with an empty outhash.

        Replies with the mapping that is reported for the taskhash after
        the INSERT OR IGNORE has run.
        """
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'outhash': "",
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    insert_data[k] = data[k]

            cursor.execute('''INSERT OR IGNORE INTO tasks_v2 (%s) VALUES (%s)''' % (
                ', '.join(sorted(insert_data.keys())),
                ', '.join(':' + k for k in sorted(insert_data.keys()))),
                insert_data)

            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                                data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        """Reply with the current request statistics."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        """Reply with the current request statistics, then reset them."""
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    def query_equivalent(self, method, taskhash, query):
        """Run *query* for method/taskhash and return the first row or None.

        The cursor is always closed. Database errors propagate to the
        caller rather than being reported as "no match".
        """
        # This is part of the inner loop and must be as fast as possible
        cursor = self.db.cursor()
        try:
            cursor.execute(query, {'method': method, 'taskhash': taskhash})
            return cursor.fetchone()
        finally:
            cursor.close()


class Server(object):
    """Hash equivalence server listening on a TCP or UNIX domain socket."""

    def __init__(self, db, loop=None):
        self.request_stats = Stats()
        self.db = db

        # Only close the event loop on shutdown if it was created here
        if loop is None:
            self.loop = asyncio.new_event_loop()
            self.close_loop = True
        else:
            self.loop = loop
            self.close_loop = False

        self._cleanup_socket = None

    def start_tcp_server(self, host, port):
        """Start listening on host:port and record the bound address."""
        # The coroutine runs inside self.loop, so no explicit loop argument
        # is needed (the 'loop' parameter was removed in Python 3.10)
        self.server = self.loop.run_until_complete(
            asyncio.start_server(self.handle_client, host, port)
        )

        for s in self.server.sockets:
            logger.info('Listening on %r' % (s.getsockname(),))
            # Newer python does this automatically. Do it manually here for
            # maximum compatibility
            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)

        name = self.server.sockets[0].getsockname()
        if self.server.sockets[0].family == socket.AF_INET6:
            self.address = "[%s]:%d" % (name[0], name[1])
        else:
            self.address = "%s:%d" % (name[0], name[1])

    def start_unix_server(self, path):
        """Start listening on the UNIX domain socket at *path*."""
        def cleanup():
            os.unlink(path)

        cwd = os.getcwd()
        try:
            # Work around path length limits in AF_UNIX
            os.chdir(os.path.dirname(path))
            self.server = self.loop.run_until_complete(
                asyncio.start_unix_server(self.handle_client, os.path.basename(path))
            )
        finally:
            os.chdir(cwd)

        logger.info('Listening on %r' % path)

        self._cleanup_socket = cleanup
        self.address = "unix://%s" % os.path.abspath(path)

    async def handle_client(self, reader, writer):
        """Serve one client connection, logging any error it raises."""
        # writer.transport.set_write_buffer_limits(0)
        try:
            client = ServerClient(reader, writer, self.db, self.request_stats)
            await client.process_requests()
        except Exception as e:
            import traceback
            logger.error('Error from client: %s' % str(e), exc_info=True)
            traceback.print_exc()
            writer.close()
        logger.info('Client disconnected')

    def serve_forever(self):
        """Run the event loop until SIGTERM or Ctrl-C, then shut down."""
        def signal_handler():
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGTERM, signal_handler)

        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            pass

        self.server.close()
        self.loop.run_until_complete(self.server.wait_closed())
        logger.info('Server shutting down')

        if self.close_loop:
            self.loop.close()

        if self._cleanup_socket is not None:
            # NOTE(review): the excerpt is cut off right after this
            # condition; invoking the cleanup hook is the presumed body --
            # confirm against the complete server.py.
            self._cleanup_socket()
Source:stats.py  
1from flask import current_app2from app.server.util.utils import timestamp3# We use a list to calculate requests per second4request_stats = []5def add_request():6    t = timestamp()7    while len(request_stats) > 0 and request_stats[0] < t - current_app.config['REQUEST_STATS_WINDOW']:8        del request_stats[0]9    request_stats.append(t)10def requests_per_second():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
