Best Python code snippet using avocado_python
Source: main.py
from __future__ import division, print_function
import datetime
import pandas as pd
import pickle as pkl
import numpy as np
import os, sys
import multiprocessing as mp
import logging
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logger = logging.getLogger()
logger.setLevel('DEBUG')
from plotter import *

CHANGE_TIMEZONE = 0     # for final_dw and final_up convert time during sanitization

INPUTPATH = "/data/users/sarthak/comcast-data/separated/"
OUTPUTPATH = "/data/users/sarthak/comcast-data/plots/"
#OUTPUTPATH = "/data/users/sarthak/comcast-analysis/plots/"
#OUTPUTPATH = "~/public_html/files/comcast/plots/"
PROCESSEDPATH = "/data/users/sarthak/comcast-data/process/"

if not os.path.exists(OUTPUTPATH):
    os.makedirs(OUTPUTPATH)

# TODO: make this a class; easier to manage the constants


def init_setup(folder):
    CURPATH = INPUTPATH + folder + '/'
    PLOTPATH = OUTPUTPATH + folder + '/'
    PROCPATH = PROCESSEDPATH + folder + '/'
    if not os.path.exists(PLOTPATH):
        os.makedirs(PLOTPATH)
    if not os.path.exists(PROCPATH):
        os.makedirs(PROCPATH)
    logger.debug("load test and control sets from " + CURPATH)
    try:
        test_full = pd.read_pickle(CURPATH + "test.pkl")
        control_full = pd.read_pickle(CURPATH + "control.pkl")
    except Exception:
        logger.error(INPUTPATH + folder + " doesn't have the files needed")
        raise
    # change timezone to MST
    if CHANGE_TIMEZONE:
        test_full['datetime'] -= datetime.timedelta(hours=6)
        control_full['datetime'] -= datetime.timedelta(hours=6)
    # add date and time columns
    logger.info("Add the time column for datasets")
    if 'time' not in test_full.columns:
        test_full['time'] = test_full.set_index('datetime').index.time
        #test_full['time'] = test_full.set_index('datetime').resample('H').index.time
    if 'time' not in control_full.columns:
        control_full['time'] = control_full.set_index('datetime').index.time
    if 'date' not in test_full.columns:
        #test_full['date'] = test_full.set_index('datetime').resample('D').index
        test_full['date'] = test_full.set_index('datetime').index.date
    if 'date' not in control_full.columns:
        control_full['date'] = control_full.set_index('datetime').index.date
    logger.info("Done adding time column for datasets")
    return CURPATH, PLOTPATH, PROCPATH, test_full, control_full


def primetime(test_full, control_full, PLOTPATH):
    #mp_plotter('test_dw')
    logger.debug("get average (summed) peak primetime at different times")
    peak_t, nonpeak_t, peak_c, nonpeak_c = get_peak_nonpeak_series(test_full, control_full, PLOTPATH)
    field = 'octets_passed'
    logger.debug("get primetime ratio using field=" + field)
    r_test, r_control = get_primetime_ratio(peak_t, nonpeak_t, peak_c, nonpeak_c)
    del peak_t, nonpeak_t, peak_c, nonpeak_c
    #logger.debug("draw a scatter plot of device vs datetime with colormap for ratio")
    #plot_primetime_ratio_scatter(r_test, r_control, PLOTPATH)
    param = 'all1'
    logger.debug("plot prime time ratio by date, group devices by " + param)
    plot_primetime_ratio_by_date(r_test, r_control, param, PLOTPATH)
    logger.debug("plot primetime ratio per device, group dates by " + param)
    plot_primetime_ratio_per_device(r_test, r_control, param, PLOTPATH)
    param = 'all2'
    logger.debug("plot prime time ratio by date, group devices by " + param)
    plot_primetime_ratio_by_date(r_test, r_control, param, PLOTPATH)
    logger.debug("plot primetime ratio per device, group dates by " + param)
    plot_primetime_ratio_per_device(r_test, r_control, param, PLOTPATH)
    del r_test, r_control
    return


def initial_timeseries(test_full, control_full, PLOTPATH):
    g1 = test_full.groupby("datetime")
    g2 = control_full.groupby("datetime")
    logger.debug("plot initial time series")
    for param in ['sum', 'max', 'perc90', 'mean', 'median']:
        plot_initial_timeseries(g1, g2, param, PLOTPATH)
    del g1, g2
    return


def peak_ratio(test_full, control_full, PROCPATH, PLOTPATH):
    # throughput stats calculation per device per day
    if not os.path.isfile(PROCPATH + 'tps1.pkl'):
        logger.debug("Calculate throughput stats per device for test")
        tps1 = throughput_stats_per_device_per_date(test_full)
        tps1.to_pickle(PROCPATH + 'tps1.pkl')
        logger.debug("Calculate throughput stats per device for control")
        tps2 = throughput_stats_per_device_per_date(control_full)
        tps2.to_pickle(PROCPATH + 'tps2.pkl')
    else:
        logger.debug("Load throughput stats per device for test")
        tps1 = pd.read_pickle(PROCPATH + 'tps1.pkl')
        logger.debug("Load throughput stats per device for control")
        tps2 = pd.read_pickle(PROCPATH + 'tps2.pkl')
    # peak ratio (defined) = [perc90 : median] of throughput (per day per device)
    # returns pandas dataframe [ Device_number | date | peakratio ]
    logger.debug("Calculate peak ratio = [perc90:mean] throughput per date per device")
    peak_ratio1 = get_peak_ratios(tps1, 'perc90', 'mean')
    peak_ratio2 = get_peak_ratios(tps2, 'perc90', 'mean')
    #logger.debug("Calculate peak ratio = [perc90:median] throughput per date per device")
    #peak_ratio1 = get_peak_ratios(tps1, 'perc90', 'median')
    #peak_ratio2 = get_peak_ratios(tps2, 'perc90', 'median')
    del tps1, tps2
    # use peak_ratio['peakratio'] to get all ratios regardless of day/time
    logger.debug("plot peak ratio CDF of all")
    plot_peak_ratio_cdf(peak_ratio1['peakratio'], peak_ratio2['peakratio'], 'all', PLOTPATH)
    for agg_param in ["min", "mean", "median", "perc90", "max"]:
        peak_ratio_per_day1 = ratios_per_date(peak_ratio1, agg_param)
        peak_ratio_per_day2 = ratios_per_date(peak_ratio2, agg_param)
        logger.debug("plot peak ratio timeseries aggregated over dates: filter by " + agg_param)
        plot_peak_ratio_timeseries(peak_ratio_per_day1, peak_ratio_per_day2, agg_param, PLOTPATH)
        peak_ratio_per_dev1 = ratios_per_device(peak_ratio1, agg_param)
        peak_ratio_per_dev2 = ratios_per_device(peak_ratio2, agg_param)
        logger.debug("plot peak ratio CDF aggregated over devices: filter by " + agg_param)
        plot_peak_ratio_cdf(peak_ratio_per_dev1, peak_ratio_per_dev2, agg_param, PLOTPATH)
    del peak_ratio1, peak_ratio2
    del peak_ratio_per_day1, peak_ratio_per_day2
    del peak_ratio_per_dev1, peak_ratio_per_dev2
    return


def throughput_weekday(test_full, control_full, PROCPATH, PLOTPATH):
    # octets stats calculation per datetime aggregate
    if not os.path.isfile(PROCPATH + 'os1.pkl'):
        logger.debug("Calculate octets stats per datetime for test")
        os1 = aggregate_octets_stats_per_datetime(test_full)
        os1.to_pickle(PROCPATH + 'os1.pkl')
        logger.debug("Calculate octets stats per datetime for control")
        os2 = aggregate_octets_stats_per_datetime(control_full)
        os2.to_pickle(PROCPATH + 'os2.pkl')
    else:
        logger.debug("Load octets stats per datetime for test")
        os1 = pd.read_pickle(PROCPATH + 'os1.pkl')
        logger.debug("Load octets stats per datetime for control")
        os2 = pd.read_pickle(PROCPATH + 'os2.pkl')
    # group octets [max, min, median, perc90, len, std] by weekday and time.
    # param_device picks the column to select from the g1 and g2 groups
    # (originally in os1 and os2): selecting 'sum' here would bias the plots
    # towards the set with more devices, so use 'mean' to remove that bias;
    # 'perc90' or 'median' across all devices are alternatives, taking the
    # mean or median again when folding on time
    g1 = os1.groupby(['day', 'time'])
    g2 = os2.groupby(['day', 'time'])
    # parameter to aggregate over devices
    param_device = 'mean'
    # parameter to aggregate over a week
    logger.debug("plot aggregated bytes + throughput medians, perc95 per day")
    param_time = 'all1'
    plot_octets_per_day(g1, g2, param_device, param_time, PLOTPATH)
    plot_throughput_per_day(g1, g2, param_device, param_time, PLOTPATH)
    logger.debug("plot aggregated bytes + throughput max, mean per day")
    param_time = 'all2'
    plot_octets_per_day(g1, g2, param_device, param_time, PLOTPATH)
    plot_throughput_per_day(g1, g2, param_device, param_time, PLOTPATH)
    del g1, g2, os1, os2
    return


def plot_cdf(test_full, control_full, PLOTPATH):
    logger.debug("plot dataset throughput CDFs")
    plot_cdf_all_bytes(test_full, control_full, PLOTPATH)
    # max, perc95
    plot_cdf_per_device(test_full, control_full, PLOTPATH, None, 'max', 'perc95')
    plot_cdf_per_device(test_full, control_full, PLOTPATH, 'date', 'max', 'perc95')
    # perc95, mean
    plot_cdf_per_device(test_full, control_full, PLOTPATH, None, 'perc95', 'mean')
    plot_cdf_per_device(test_full, control_full, PLOTPATH, 'date', 'perc95', 'mean')
    # max, median
    plot_cdf_per_device(test_full, control_full, PLOTPATH, None, 'max', 'median')
    plot_cdf_per_device(test_full, control_full, PLOTPATH, 'date', 'max', 'median')
    return


def prevalence(test_full, control_full, PLOTPATH):
    logger.debug("plot prevalence: total devices by threshold")
    plot_prevalence_total_devices(test_full, control_full, PLOTPATH)
    return


def mp_plotter(folder):
    """
    Parallelized version of plotter()
    """
    CURPATH, PLOTPATH, PROCPATH, test_full, control_full = init_setup(folder)
    jobs = []
    jobs.append(mp.Process(target=initial_timeseries,
                           args=(test_full, control_full, PLOTPATH,)))
    jobs.append(mp.Process(target=peak_ratio,
                           args=(test_full, control_full, PROCPATH, PLOTPATH,)))
    jobs.append(mp.Process(target=primetime,
                           args=(test_full, control_full, PLOTPATH,)))
    jobs.append(mp.Process(target=throughput_weekday,
                           args=(test_full, control_full, PROCPATH, PLOTPATH,)))
    jobs.append(mp.Process(target=plot_cdf,
                           args=(test_full, control_full, PLOTPATH,)))
    jobs.append(mp.Process(target=prevalence,
                           args=(test_full, control_full, PLOTPATH,)))
    logger.debug("Start parallel code for folder " + folder)
    for proc in jobs:
        proc.start()
    # wait for all plotting processes to finish
    for proc in jobs:
        proc.join()
    return


def plotter(folder):
    # INITIALIZE
    CURPATH, PLOTPATH, PROCPATH, test_full, control_full = init_setup(folder)
    # TIME SERIES
    initial_timeseries(test_full, control_full, PLOTPATH)
    # PEAK RATIO
    peak_ratio(test_full, control_full, PROCPATH, PLOTPATH)
    # PRIME TIME
    primetime(test_full, control_full, PLOTPATH)
    # THROUGHPUT PER WEEKDAY
    throughput_weekday(test_full, control_full, PROCPATH, PLOTPATH)
    # PLOT CDF
    plot_cdf(test_full, control_full, PLOTPATH)
    # PREVALENCE
    prevalence(test_full, control_full, PLOTPATH)
    logger.debug("DONE " + folder + " (for now)")
    """
    # prime time ratio = sum octets in peak hour : sum octets in off-peak hour
    # returns pandas dataframe [ Device_number | datetime (date only) | peakratio ]
    # prime time ratio calc per datetime
    #TODO get_prime_time_ratio()
    logger.debug("Calculate peak ratio = [perc90:median] throughput per date per device")
    peak_ratio1 = get_peak_ratios(tps1, 'perc90', 'median')
    peak_ratio2 = get_peak_ratios(tps2, 'perc90', 'median')
    del tps1, tps2
    """
    #TODO WEEKDAYS/HOLIDAYS/WEEKENDS SPLIT
    # GET date AND time (alternative ways):
    #logger.debug("Crude way of getting date and time for datasets")
    #test_full['time'] = test_full.set_index('datetime').index.time
    #control_full['time'] = control_full.set_index('datetime').index.time
    #test_full['time'] = test_full['datetime'].apply(lambda x: x.time())
    #control_full['time'] = control_full['datetime'].apply(lambda x: x.time())
    #test_full['date'] = test_full['datetime'].apply(lambda x: x.date())
    #control_full['date'] = control_full['datetime'].apply(lambda x: x.date())
    #test_full['weekday'] = test_full['datetime'].apply(lambda x: x.weekday())
    #control_full['weekday'] = control_full['datetime'].apply(lambda x: x.weekday())
    return


def mp_plot_all():
    pool = mp.Pool(processes=12)  # use 12 cores only
    # NOTE: Pool workers are daemonic and may not spawn the child processes
    # that mp_plotter() creates; switch the target to plotter() if this fails
    for folder in os.listdir(INPUTPATH):
        pool.apply_async(mp_plotter, args=(folder,))
    pool.close()
    pool.join()
    return


def main(argv):
    #for folder in os.listdir("../separated/"):
    for folder in [argv]:
        mp_plotter(folder)
    return


def test():
    folder = 'control1_dw'
    CURPATH, PLOTPATH, PROCPATH, test_full, control_full = init_setup(folder)
    primetime(test_full, control_full, PLOTPATH)
    return


if __name__ == "__main__":
    print("INPUTPATH ", INPUTPATH)
    print("OUTPUTPATH ", OUTPUTPATH)
    print("PROCESSEDPATH ", PROCESSEDPATH)
    print("folder = ", sys.argv[1])
    #test()
    main(sys.argv[1])
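The helpers called above (throughput_stats_per_device_per_date, get_peak_ratios, and the various plot_* functions) live in the companion plotter module, which is not shown on this page. As the comments in peak_ratio() state, the peak ratio is the 90th-percentile throughput divided by the median (or mean) throughput, per device per day. A minimal self-contained sketch of that calculation on hypothetical sample data, using only pandas and numpy:

import numpy as np
import pandas as pd

# hypothetical sample: per-device, per-timeslot throughput measurements
df = pd.DataFrame({
    'Device_number': [1, 1, 1, 1, 2, 2, 2, 2],
    'date': ['2014-10-01'] * 8,
    'throughput': [1.0, 2.0, 3.0, 100.0, 5.0, 5.0, 5.0, 6.0],
})

# throughput stats per device per date (stands in for
# throughput_stats_per_device_per_date() from the plotter module)
stats = df.groupby(['Device_number', 'date'])['throughput'].agg(
    perc90=lambda x: np.percentile(x, 90),
    median='median',
    mean='mean',
).reset_index()

# peak ratio = perc90 : median (the script also uses perc90 : mean)
stats['peakratio'] = stats['perc90'] / stats['median']
print(stats[['Device_number', 'date', 'peakratio']])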
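Similarly, the commented-out block in plotter() defines the prime-time ratio as the octets transferred during peak hours divided by the octets transferred off-peak, per device per date; get_peak_nonpeak_series and get_primetime_ratio implement this in the plotter module. A rough sketch of the idea, assuming (for illustration only; the real window is defined in the plotter module) that prime time is 19:00 to 23:00:

import pandas as pd

# hypothetical sample: hourly octet counts for one device over two days
df = pd.DataFrame({
    'Device_number': 1,
    'datetime': pd.date_range('2014-10-01', periods=48, freq='H'),
    'octets_passed': range(48),
})
df['date'] = df['datetime'].dt.date

# assumed prime-time window: hours 19-22
is_peak = df['datetime'].dt.hour.isin(range(19, 23))
peak = df[is_peak].groupby(['Device_number', 'date'])['octets_passed'].sum()
nonpeak = df[~is_peak].groupby(['Device_number', 'date'])['octets_passed'].sum()

# prime-time ratio = sum of octets in peak hours : sum in off-peak hours
ratio = (peak / nonpeak).rename('peakratio').reset_index()
print(ratio)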
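The comment block in throughput_weekday() argues for aggregating across devices with 'mean' rather than 'sum': a summed series is biased toward whichever of the test and control sets has more devices. A tiny illustration of the point:

import pandas as pd

# two sets with identical per-device usage but different sizes
test = pd.Series([10.0] * 100)     # 100 devices
control = pd.Series([10.0] * 50)   # 50 devices

print(test.sum(), control.sum())    # 1000.0 vs 500.0: biased by set size
print(test.mean(), control.mean())  # 10.0 vs 10.0: comparable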
Source: test_kernels.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SCORR - Salvus Correlation
:copyright:
    Korbinian Sager (korbinian_sager@brown.edu), 2021
:license:
    MIT License
"""
from scorr.kernel.source_kernel import SourceKernel
from scorr.test.helpers import DIR_TEST_DATA, wavefield_file_exists
from scorr.wavefield.wavefield import Wavefield


def test_source_kernel():
    test1 = SourceKernel()
    assert test1.coordinates is None
    assert test1.connectivity is None
    assert test1.globalElementIds is None
    assert test1.n_elements_global is None
    assert test1.kernel == 0.0
    test2 = SourceKernel.init_with_kernel_file(DIR_TEST_DATA / "kernel_source.h5")
    wavefield = Wavefield(filename=wavefield_file_exists, starttime=0.0, endtime=1.0)
    print(test2.coordinates - wavefield.coordinates)
    assert (test2.coordinates == wavefield.coordinates).all()
    assert (test2.connectivity == wavefield.connectivity).all()
    assert (test2.globalElementIds == wavefield.globalElementIds).all()
    assert test2.n_elements_global == wavefield.n_elements_global


def test_add():
    test_empty = SourceKernel()
    test_full = SourceKernel.init_with_kernel_file(DIR_TEST_DATA / "kernel_source.h5")
    wavefield = Wavefield(filename=wavefield_file_exists, starttime=0.0, endtime=1.0)
    # add two empty source kernels
    test_sum = test_empty + test_empty
    assert test_sum.coordinates is None
    assert test_sum.connectivity is None
    assert test_sum.globalElementIds is None
    assert test_sum.n_elements_global is None
    assert test_sum.kernel == 0.0
    # empty + full
    test_sum = test_empty + test_full
    assert (test_sum.coordinates == test_full.coordinates).all()
    assert (test_sum.connectivity == test_full.connectivity).all()
    assert (test_sum.globalElementIds == test_full.globalElementIds).all()
    assert test_sum.n_elements_global == test_full.n_elements_global
    assert (test_sum.kernel == test_full.kernel).all()
    assert (test_sum.coordinates == wavefield.coordinates).all()
    assert (test_sum.connectivity == wavefield.connectivity).all()
    assert (test_sum.globalElementIds == wavefield.globalElementIds).all()
    assert test_sum.n_elements_global == wavefield.n_elements_global
    # full + empty
    test_sum = test_full + test_empty
    assert (test_sum.coordinates == test_full.coordinates).all()
    assert (test_sum.connectivity == test_full.connectivity).all()
    assert (test_sum.globalElementIds == test_full.globalElementIds).all()
    assert test_sum.n_elements_global == test_full.n_elements_global
    assert (test_sum.kernel == test_full.kernel).all()
    assert (test_sum.coordinates == wavefield.coordinates).all()
    assert (test_sum.connectivity == wavefield.connectivity).all()
    assert (test_sum.globalElementIds == wavefield.globalElementIds).all()
    assert test_sum.n_elements_global == wavefield.n_elements_global
    # full + full
    test_sum = test_full + test_full
    assert (test_sum.coordinates == test_full.coordinates).all()
    assert (test_sum.connectivity == test_full.connectivity).all()
    assert (test_sum.globalElementIds == test_full.globalElementIds).all()
    assert test_sum.n_elements_global == test_full.n_elements_global
    assert (test_sum.kernel == 2 * test_full.kernel).all()
    assert (test_sum.coordinates == wavefield.coordinates).all()
    assert (test_sum.connectivity == wavefield.connectivity).all()
    assert (test_sum.globalElementIds == wavefield.globalElementIds).all()
    assert test_sum.n_elements_global == wavefield.n_elements_global


def test_iadd():
    test_full_check = SourceKernel.init_with_kernel_file(DIR_TEST_DATA / "kernel_source.h5")
    wavefield = Wavefield(filename=wavefield_file_exists, starttime=0.0, endtime=1.0)
    # add two empty source kernels
    test_empty = SourceKernel()
    test_empty += test_empty
    assert test_empty.coordinates is None
    assert test_empty.connectivity is None
    assert test_empty.globalElementIds is None
    assert test_empty.n_elements_global is None
    assert test_empty.kernel == 0.0
    # empty + full
    test_empty = SourceKernel()
    test_full = SourceKernel.init_with_kernel_file(DIR_TEST_DATA / "kernel_source.h5")
    test_empty += test_full
    assert (test_empty.coordinates == test_full.coordinates).all()
    assert (test_empty.connectivity == test_full.connectivity).all()
    assert (test_empty.globalElementIds == test_full.globalElementIds).all()
    assert test_empty.n_elements_global == test_full.n_elements_global
    assert (test_empty.kernel == test_full.kernel).all()
    assert (test_empty.coordinates == wavefield.coordinates).all()
    assert (test_empty.connectivity == wavefield.connectivity).all()
    assert (test_empty.globalElementIds == wavefield.globalElementIds).all()
    assert test_empty.n_elements_global == wavefield.n_elements_global
    # full + empty
    test_empty = SourceKernel()
    test_full = SourceKernel.init_with_kernel_file(DIR_TEST_DATA / "kernel_source.h5")
    test_full += test_empty
    assert (test_full.kernel == test_full_check.kernel).all()
    assert (test_full.coordinates == wavefield.coordinates).all()
    assert (test_full.connectivity == wavefield.connectivity).all()
    assert (test_full.globalElementIds == wavefield.globalElementIds).all()
    assert test_full.n_elements_global == wavefield.n_elements_global
    # full + full
    test_full += test_full
    assert (test_full.kernel == 2 * test_full_check.kernel).all()
    assert (test_full.coordinates == wavefield.coordinates).all()
    assert (test_full.connectivity == wavefield.connectivity).all()
    assert (test_full.globalElementIds == wavefield.globalElementIds).all()
    # ...
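These tests pin down the addition semantics of SourceKernel: an empty kernel is the additive identity, mesh metadata (coordinates, connectivity, element IDs) is taken from whichever operand carries it, and adding a kernel to itself doubles the kernel values. A sketch of an __add__ consistent with those assertions; this illustrates the contract the tests enforce, not SCORR's actual implementation:

import numpy as np

class SourceKernelSketch:
    """Illustrative stand-in for scorr.kernel.source_kernel.SourceKernel."""

    def __init__(self):
        self.coordinates = None
        self.connectivity = None
        self.globalElementIds = None
        self.n_elements_global = None
        self.kernel = 0.0

    def __add__(self, other):
        result = SourceKernelSketch()
        # copy mesh metadata from whichever operand carries it
        donor = self if self.coordinates is not None else other
        result.coordinates = donor.coordinates
        result.connectivity = donor.connectivity
        result.globalElementIds = donor.globalElementIds
        result.n_elements_global = donor.n_elements_global
        # kernels add; 0.0 (the empty kernel) is the identity
        result.kernel = self.kernel + other.kernel
        return result

# empty + empty stays empty; empty + full copies full's metadata;
# full + full doubles the kernel values
empty = SourceKernelSketch()
full = SourceKernelSketch()
full.coordinates = np.zeros((4, 3))
full.kernel = np.ones(4)
assert (empty + empty).coordinates is None
assert ((empty + full).kernel == full.kernel).all()
assert ((full + full).kernel == 2 * full.kernel).all()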
