Best Python code snippet using locust
graphs.py
Source:graphs.py  
1from parsing import *2import matplotlib.pyplot as plt3import os4def make_graph(app, storage, test_name, test_size, dfs, y_axis_label, graph_type, \5    x_axis_label = 'Tempo desde inÃcio do benchmark (HH:MM:SS)', rotate_xlabel = True, close = True):6    fig, ax = plt.subplots(figsize = (16, 10))7    if type(dfs) == list:8        for df in dfs:9            df.plot(ax = ax, legend = len(dfs) > 1)10    elif callable(dfs):11        dfs(ax)12    ax.set_xlabel(x_axis_label, fontdict={'fontsize':12})13    ax.set_ylabel(y_axis_label, fontdict={'fontsize':12})14    if rotate_xlabel:15        ax.tick_params(axis = 'x', labelrotation = 45)16    for tick in ax.xaxis.get_major_ticks():17        tick.label1.set_fontsize(14)18        tick.label119    for tick in ax.yaxis.get_major_ticks():20        tick.label1.set_fontsize(14)21        tick.label122    filepath = f'{app.lower()}/{storage.replace("/", "_").lower()}/{test_name}_{test_size}'23    os.makedirs(filepath, exist_ok = True)24    fig.savefig(f'{filepath}/{test_name}_{test_size}_{graph_type}.png', transparent = True, bbox_inches = 'tight')25    if close:26        plt.close(fig)27    else:28        plt.show()29def make_system_graphs(app, storage, test_name, test_size, close = True):30    df = read_dfs_system(app, storage, test_name, test_size)[['cpu_%', 'mem_%', 'net_recv_mb', 'net_send_mb']]31    cpu_usage = df.reset_index() \32        .pivot(index = 'time', columns = 'machine', values = ['cpu_%'])['cpu_%'] \33        .median(axis = 1) \34        .rolling(2).mean() \35        .dropna()36    37    mem_usage = df.reset_index() \38        .pivot(index = 'time', columns = 'machine', values = ['mem_%'])['mem_%'] \39        .median(axis = 1) \40        .rolling(2).mean() \41        .dropna()42    make_graph(app, storage, test_name, test_size,43        [cpu_usage],44        y_axis_label = 'Utilização do CPU (%)',45        graph_type = 'cpu',46        close = close47    )48    make_graph(app, storage, test_name, test_size,49        [mem_usage],50        y_axis_label = 'Utilização da RAM (%)',51        graph_type = 'mem',52        close = close53    )54    usage_net_recv = df.reset_index() \55        .pivot(index = 'time', columns = 'machine', values = ['net_recv_mb'])['net_recv_mb'] \56        .sum(axis = 1) \57        .rolling(2).mean() \58        .dropna()59    usage_net_recv.name = 'Network RX'60    61    usage_net_send = df.reset_index() \62        .pivot(index = 'time', columns = 'machine', values = ['net_send_mb'])['net_send_mb'] \63        .sum(axis = 1) \64        .rolling(2).mean() \65        .dropna()66    usage_net_send.name = 'Network TX'67    make_graph(app, storage, test_name, test_size,68        [usage_net_recv, usage_net_send],69        y_axis_label = 'Utilização da rede (Mb/s)',70        graph_type = 'net',71        close = close72    )73def make_disk_nfs_graph(app, test_name, test_size, close = True):74    df = read_df_io(app, 'nfs', test_name, test_size, 'cloud108').loc['cloud108']75    read_series = df['read_mb'] \76        .rolling(2).mean() \77        .dropna()78    read_series.name = 'Leitura'79    write_series = df['write_mb'] \80        .rolling(2).mean() \81        .dropna()82    write_series.name = 'Escrita'83    make_graph(app, 'NFS', test_name, test_size,84        [read_series, write_series],85        y_axis_label = 'Utilização do disco (MB/s)',86        graph_type = 'disk',87        close = close88    )89def make_disk_ceph_graph(app, test_name, test_size, close = True):90    df = read_dfs_io(app, 'ceph', test_name, test_size)91    read_series = df.reset_index() \92        .pivot(index = 'time', columns = 'machine', values = ['read_mb'])['read_mb'] \93        .sum(axis = 1) \94        .rolling(2).mean() \95        .dropna()96    read_series.name = 'Leitura'97    write_series = df.reset_index() \98        .pivot(index = 'time', columns = 'machine', values = ['write_mb'])['write_mb'] \99        .sum(axis = 1) \100        .rolling(2).mean() \101        .dropna()102    write_series.name = 'Escrita'103    make_graph(app, 'Ceph', test_name, test_size,104        [read_series, write_series],105        y_axis_label = 'Utilização do disco (MB/s)',106        graph_type = 'disk',107        close = close108    )109def make_disk_comparation_graph(app, test_name, test_size, close = True, type = 'write'):110    df_nfs = read_df_io(app, 'nfs', test_name, test_size, 'cloud108').loc['cloud108']111    df_ceph = read_dfs_io(app, 'ceph', test_name, test_size)112    series_nfs = df_nfs[type + '_mb'] \113        .rolling(2).mean() \114        .dropna()115    series_nfs.name = 'Leitura NFS' if type == 'read' else 'Escrita NFS'116    series_ceph = df_ceph.reset_index() \117        .pivot(index = 'time', columns = 'machine', values = [type + '_mb'])[type + '_mb'] \118        .sum(axis = 1) \119        .rolling(2).mean() \120        .dropna()121    series_ceph.name = 'Leitura Ceph' if type == 'read' else 'Escrita Ceph'122    make_graph(app, 'NFS/Ceph', test_name, test_size,123        [series_nfs, series_ceph],124        y_axis_label = 'Utilização do disco (MB/s)',125        graph_type = 'disk',126        close = close127    )128def make_locust_report_graphs(app, storage, test_name, test_size, close = True):129    df = read_df_reporthtml(app, storage, test_name, test_size)[['current_rps', 'current_fail_per_sec', 'response_time_percentile_50']]130    rps_series = df['current_rps'].rolling(2).mean().dropna()131    rps_series.name = 'Pedidos por segundo'132    133    fps_series = df['current_fail_per_sec'].rolling(2).mean().dropna()134    fps_series.name = 'Falhas por segundo'135    responsetimes_series = df['response_time_percentile_50'].rolling(2).mean().dropna()136    make_graph(app, storage, test_name, test_size,137        [rps_series, fps_series],138        y_axis_label = 'Throughput (nº pedidos/s)',139        graph_type = 'rpsfps',140        close = close141    )142    make_graph(app, storage, test_name, test_size,143        [responsetimes_series],144        y_axis_label = 'Tempo de resposta (ms)',145        graph_type = 'responsetimes',146        close = close147    )148def make_locust_csv_graphs(app, storage, test_name, close = True):149    df = read_df_locustcsv(app, storage, test_name)150    def draw(ax):151        df.plot(ax = ax, marker = 'o')152    153    make_graph(app, storage, test_name, 'all',154        draw,155        x_axis_label = 'Tempo de resposta (s)',156        y_axis_label = 'Throughput (nº pedidos/s)',157        graph_type = 'rps_of_responsetimes',158        close = close,159        rotate_xlabel = False160    )161def make_reqfails_comparation_graph(app, test_name, test_size, close = True):162    df_nfs = read_df_reporthtml(app, 'NFS', test_name, test_size)[['current_rps', 'current_fail_per_sec']]163    df_ceph = read_df_reporthtml(app, 'Ceph', test_name, test_size)[['current_rps', 'current_fail_per_sec']]164    rps_series_nfs = df_nfs['current_rps'] # .rolling(2).mean().dropna()165    rps_series_nfs.name = 'NFS'166    # rps_series_nfs.name = 'Pedidos por segundo (NFS)'167    168    # fps_series_nfs = df_nfs['current_fail_per_sec'] # .rolling(2).mean().dropna()169    # fps_series_nfs.name = 'Falhas por segundo (NFS)'170    rps_series_ceph = df_ceph['current_rps'] # .rolling(2).mean().dropna()171    rps_series_ceph.name = 'Ceph'172    # rps_series_ceph.name = 'Pedidos por segundo (Ceph)'173    174    # fps_series_ceph = df_ceph['current_fail_per_sec'] # .rolling(2).mean().dropna()175    # fps_series_ceph.name = 'Falhas por segundo (Ceph)'176    def draw(ax):177        rps_series_nfs.plot(ax = ax, linestyle = '-', color = 'blue', legend = True)178        # fps_series_nfs.plot(ax = ax, linestyle = '-', color = 'blue', legend = True)179        rps_series_ceph.plot(ax = ax, linestyle = '-', color = 'red', legend = True)180        # fps_series_ceph.plot(ax = ax, linestyle = '-', color = 'red', legend = True)181    make_graph(app, 'NFS/Ceph', test_name, test_size,182        draw,183        y_axis_label = 'Throughput (nº pedidos/s)',184        graph_type = 'rpsfps',185        close = close186    )187def make_responsetimes_comparation_graph(app, test_name, test_size, close = True):188    df_nfs = read_df_reporthtml(app, 'NFS', test_name, test_size)[['response_time_percentile_50']]189    df_ceph = read_df_reporthtml(app, 'Ceph', test_name, test_size)[['response_time_percentile_50']]190    responsetimes_series_nfs = df_nfs['response_time_percentile_50']191    responsetimes_series_nfs.name = 'NFS'192    responsetimes_series_ceph = df_ceph['response_time_percentile_50']193    responsetimes_series_ceph.name = 'Ceph'194    def draw(ax):195        responsetimes_series_nfs.plot(ax = ax, linestyle = '-', color = 'blue', legend = True)196        responsetimes_series_ceph.plot(ax = ax, linestyle = '-', color = 'red', legend = True)197    198    make_graph(app, 'NFS/Ceph', test_name, test_size,199        draw,200        y_axis_label = 'Tempo de resposta (ms)',201        graph_type = 'responsetimes',202        close = close...prometheus_exporter.py
Source:prometheus_exporter.py  
1# coding: utf82import six3from itertools import chain4from flask import request, Response5from locust import stats as locust_stats, runners as locust_runners6from locust import User, task, events7from prometheus_client import Metric, REGISTRY, exposition8# This locustfile adds an external web endpoint to the locust master, and makes it serve as a prometheus exporter.9# Runs it as a normal locustfile, then points prometheus to it.10# locust -f prometheus_exporter.py --master11# Lots of code taken from [mbolek's locust_exporter](https://github.com/mbolek/locust_exporter), thx mbolek!12class LocustCollector(object):13    registry = REGISTRY14    def __init__(self, environment, runner):15        self.environment = environment16        self.runner = runner17    def collect(self):18        # collect metrics only when locust runner is hatching or running.19        runner = self.runner20        if runner and runner.state in (locust_runners.STATE_HATCHING, locust_runners.STATE_RUNNING):21            stats = []22            for s in chain(locust_stats.sort_stats(runner.stats.entries), [runner.stats.total]):23                stats.append({24                    "method": s.method,25                    "name": s.name,26                    "num_requests": s.num_requests,27                    "num_failures": s.num_failures,28                    "avg_response_time": s.avg_response_time,29                    "min_response_time": s.min_response_time or 0,30                    "max_response_time": s.max_response_time,31                    "current_rps": s.current_rps,32                    "median_response_time": s.median_response_time,33                    "ninetieth_response_time": s.get_response_time_percentile(0.9),34                    # only total stats can use current_response_time, so sad.35                    #"current_response_time_percentile_95": s.get_current_response_time_percentile(0.95),36                    "avg_content_length": s.avg_content_length,37                    "current_fail_per_sec": s.current_fail_per_sec38                })39            # perhaps StatsError.parse_error in e.to_dict only works in python slave, take notices!40            errors = [e.to_dict() for e in six.itervalues(runner.stats.errors)]41            metric = Metric('locust_user_count', 'Swarmed users', 'gauge')42            metric.add_sample('locust_user_count', value=runner.user_count, labels={})43            yield metric44            45            metric = Metric('locust_errors', 'Locust requests errors', 'gauge')46            for err in errors:47                metric.add_sample('locust_errors', value=err['occurrences'],48                                  labels={'path': err['name'], 'method': err['method'],49                                          'error': err['error']})50            yield metric51            is_distributed = isinstance(runner, locust_runners.MasterRunner)52            if is_distributed:53                metric = Metric('locust_slave_count', 'Locust number of slaves', 'gauge')54                metric.add_sample('locust_slave_count', value=len(runner.clients.values()), labels={})55                yield metric56            metric = Metric('locust_fail_ratio', 'Locust failure ratio', 'gauge')57            metric.add_sample('locust_fail_ratio', value=runner.stats.total.fail_ratio, labels={})58            yield metric59            metric = Metric('locust_state', 'State of the locust swarm', 'gauge')60            metric.add_sample('locust_state', value=1, labels={'state': runner.state})61            yield metric62            stats_metrics = ['avg_content_length', 'avg_response_time', 'current_rps', 'current_fail_per_sec',63                             'max_response_time', 'ninetieth_response_time', 'median_response_time', 'min_response_time',64                             'num_failures', 'num_requests']65            for mtr in stats_metrics:66                mtype = 'gauge'67                if mtr in ['num_requests', 'num_failures']:68                    mtype = 'counter'69                metric = Metric('locust_stats_' + mtr, 'Locust stats ' + mtr, mtype)70                for stat in stats:71                    # Aggregated stat's method label is None, so name it as Aggregated72                    # locust has changed name Total to Aggregated since 0.12.173                    if 'Aggregated' != stat['name']:74                        metric.add_sample('locust_stats_' + mtr, value=stat[mtr],75                                          labels={'path': stat['name'], 'method': stat['method']})76                    else:77                        metric.add_sample('locust_stats_' + mtr, value=stat[mtr],78                                          labels={'path': stat['name'], 'method': 'Aggregated'})79                yield metric80@events.init.add_listener81def locust_init(environment, runner, **kwargs):82    print("locust init event received")83    if environment.web_ui and runner:84        @environment.web_ui.app.route("/export/prometheus")85        def prometheus_exporter():86            registry = REGISTRY87            encoder, content_type = exposition.choose_encoder(request.headers.get('Accept'))88            if 'name[]' in request.args:89                registry = REGISTRY.restricted_registry(request.args.get('name[]'))90            body = encoder(registry)91            return Response(body, content_type=content_type)92        REGISTRY.register(LocustCollector(environment, runner))93class Dummy(User):94    @task(20)95    def hello(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
