Best Python code snippet using ATX
Anomaly_Detection.py
Source:Anomaly_Detection.py  
1import numpy as np2import pandas as pd3import pickle4import math5from sklearn.decomposition import PCA6from sklearn import preprocessing7import matplotlib.pyplot as plt8import matplotlib.patches as mpatches9from sklearn.preprocessing import StandardScaler10from scipy.spatial.distance import pdist, cdist, squareform11from matplotlib.colors import ListedColormap12from sklearn.model_selection import train_test_split13import os14import SODA15import data_manipulation as dm16import multiprocessing17from sklearn.utils.validation import check_array18import sys19#-------------------------Main Code--------------------------#20def calculate(func, args):21    result = func(*args)22    return result23def calculatestar(args):24    return calculate(*args)25def main():26    #------------------------------------------------------------#27    #-------------------Initiation Part--------------------------#28    ####### Variables set by user #######29    # PCA number of components30    N_PCs = 831    # List of granularities 32    gra_list = [13] 33    # Number of iteration34    iterations = 135    # Number of process to create in the multiprocessing step36    PROCESSES = 437    # Number of Data-set divisions38    total = 10000039    # Number of Data-set divisions40    windows = 10041    # Percentage of background samples on the testing phase42    background_percent = 9943    # Firstly the model loads the background and signal data, then it removes the 44    # attributes first string line, in order to avoid NaN values in the array.45    # Using multiprocess to load the data-sets into the code46    print('         ==== Commencing Initiation ====\n', file=open("log_file.txt", "a+"))47    ### Background    48    b_name='/AtlasDisk/user/pestana/Input/Input_Background_1.csv'49    #b_name='Input_Background_1.csv'50    background = np.genfromtxt(b_name, delimiter=',')51    background = background[1:,:]52    background, _ = dm.divide(background, windows, total)53    print("     .Background Loaded...", file=open("log_file.txt", "a"))54    ### Signal55    s_name='/AtlasDisk/user/pestana/Input/Input_Signal_1.csv'56    #s_name='Input_Signal_1.csv'57    signal = np.genfromtxt(s_name, delimiter=',')58    signal = signal[1:,:]59    print("     .Signal Loaded...", file=open("log_file.txt", "a"))60    print('\n          ==== Initiation Complete ====\n', file=open("log_file.txt", "a"))61    print('=*='*17, file=open("log_file.txt", "a"))62    print('      ==== Commencing Data Processing ====', file=open("log_file.txt", "a"))63    with multiprocessing.Pool(PROCESSES) as pool:64        TASKS = [(model, (n_i,background,background_percent,signal,windows,N_PCs,gra_list, total)) for n_i in range(iterations)]65        print('             .Executing SODA for granularities', gra_list, file=open("log_file.txt", "a"))66        pool.map(calculatestar, TASKS)67def model(n_i,background,background_percent,signal,windows,N_PCs,gra_list, total):68    print('\n     => Iteration Number', (n_i+1), file=open("log_file.txt", "a"))69    # Devide data-set into training and testing sub-sets70    print('         .Dividing training and testing sub-sets', file=open("log_file.txt", "a"))71    test_size = 0.372    test = int(total*test_size)73    b_test = int(test*background_percent/100)74    background_train, background_test = train_test_split(background, test_size=0.30, random_state=42)75    background_test, _ = dm.divide(background_test, windows, b_test)76    # Defining number of events Signal events on online phase.77    signal_online_samples = int(test - b_test)78    # Devide online signal79    print('         .Selecting Signal on the following porpotion:', file=open("log_file.txt", "a"))80    print('             .' + str(background_percent) + '% Background samples', file=open("log_file.txt", "a"))81    print('             .' + str(100-background_percent) + '% Signal samples', file=open("log_file.txt", "a"))82    print('             .{:9d} of Background samples (Offline)'.format(int(total*(1-test_size))), file=open("log_file.txt", "a"))83    print('             .{:9d} of Background samples (Online)'.format(int(b_test)), file=open("log_file.txt", "a"))84    print('             .{:9d} of Signal samples (Online)'.format(int(signal_online_samples)), file=open("log_file.txt", "a"))85    reduced_signal, signal_sample_id = dm.divide(signal, windows, signal_online_samples)86    # Nextly, the Signal data processed is saved in the Analised data directory.87    #np.savetxt('/AtlasDisk/user/pestana/Output/Analysed_Signal/Reduced_iteration_' + str(n_i) + '_' + s_name,reduced_signal,delimiter=',')88    #np.savetxt('/AtlasDisk/user/pestana/Output/Analysed_Signal/Reduced_ID_iteration_' + str(n_i) + '_' + s_name,signal_sample_id,delimiter=',')89    # Concatenating Signal and the Test Background sub-set90    streaming_data = np.concatenate((background_test,reduced_signal), axis=0)91    # Normalize Data92    norm_background_train = preprocessing.normalize(background_train)93    norm_streaming_data = preprocessing.normalize(streaming_data)94    #print('         .Normalizing Data', file=open("log_file.txt", "a"))95    # Calculates Statistical attributes96    print('         .Calculating statistical attributes', file=open("log_file.txt", "a"))97    xyz_streaming_data = dm.statistics_attributes(norm_streaming_data)98    xyz_background_train = dm.statistics_attributes(norm_background_train)99    #xyz_streaming_data = dm.statistics_attributes(streaming_data)100    #xyz_background_train = dm.statistics_attributes(background_train)101    #xyz_signal = dm.statistics_attributes(signal)102    #xyz_background = dm.statistics_attributes(background)103    104    #transformer = preprocessing.Normalizer().fit(np.vstack((xyz_signal,xyz_background)))105    106    #xyz_background = transformer.transform(xyz_background)107    #xyz_signal = transformer.transform(xyz_signal)108    #np.savetxt('xyz_reduced_signal_norm.csv',xyz_signal,delimiter=',')109    #np.savetxt('xyz_background_norm.csv',xyz_background,delimiter=',')110    # Normalize Features111    #print('         .Normalizing Features', file=open("log_file.txt", "a"))112    # Calculates PCA and projects the sub-sets 113    print('         .Calculating PCA:', file=open("log_file.txt", "a"))114    proj_xyz_background_train, proj_xyz_streaming_data, xyz_mantained_variation, xyz_attributes_influence = dm.PCA_Projection(xyz_background_train,xyz_streaming_data,N_PCs)115    #proj_xyz_background_train, proj_xyz_streaming_data, xyz_mantained_variation, xyz_attributes_influence = dm.PCA_Projection(norm_xyz_background_train,norm_xyz_streaming_data,N_PCs)116    # Plots PCA results117    print('         .Ploting PCA results', file=open("log_file.txt", "a"))118    dm.PCA_Analysis(xyz_mantained_variation,xyz_attributes_influence)119    #proj_xyz_background_train = preprocessing.normalize(proj_xyz_background_train)120    #proj_xyz_streaming_data = preprocessing.normalize(proj_xyz_streaming_data)121    122    for gra in gra_list:123        dm.SODA_Granularity_Iteration(proj_xyz_background_train,proj_xyz_streaming_data, gra,len(background_test),n_i)124    125    """126    print('         .Running SODA on base granularity', file=open("log_file.txt", "a"))127    dm.SODA_Granularity_Iteration(proj_xyz_background_train,proj_xyz_streaming_data, 1,len(background_test),n_i)128    print('         .Creating pool with %d processes:' % PROCESSES, file=open("log_file.txt", "a"))129    with multiprocessing.Pool(PROCESSES) as pool:130        TASKS = [(dm.SODA_Granularity_Iteration, (proj_xyz_background_train,proj_xyz_streaming_data, gra,len(background_test),n_i)) for gra in gra_list]131        print('             .Executing SODA for granularities', gra_list, file=open("log_file.txt", "a"))132        pool.map(calculatestar, TASKS)133    """134    135        136    print('\n        ====Data Processing Complete====\n', file=open("log_file.txt", "a"))137    print('=*='*17, file=open("log_file.txt", "a"))138            139if __name__ == '__main__':140    multiprocessing.freeze_support()...realtime_test.py
Source:realtime_test.py  
1from __future__ import print_function2import time3try:  # Python 24    import urlparse5except ModuleNotFoundError:  # Python 36    from urllib import parse as urlparse7import threading8import json9import os.path10import webbrowser11from optparse import OptionParser12from . import webserver13from . import monitor14from . import background_test15from . import project16            17background_test.RunTests.instance = background_test.RunTests()18class RunAllTestsWhenAChangeHappens(object):19   20    21    def __init__(self, server):22        self.must_run = False23        self.server = server24        25    def start(self):26        self.must_run = True;27        self.thread = threading.Thread(target=self.run)28        self.thread.daemon = True;29        self.thread.start()30    31    def stop(self):32        self.must_run = False;33        34    def run(self):35        cwd = os.getcwd()36        paths = [os.path.join(cwd, x) for x in project.DIRECTORIES] 37        monitor_directories = monitor.MonitorDirectories(paths)38        monitor_directories.check()39        monitor_directories.changed = True40        while self.must_run:41            if monitor_directories.changed:42                if not background_test.RunTests.instance.can_start_a_test():43                    monitor_directories.check()44                    time.sleep(0.5)45                    continue46                47                if not self.server.last_report is None:48                    for element in monitor_directories.updated_elements:49                        if not element.is_file():50                            continue51                        for x in self.server.last_report.address_to_report.values():52                            path, module, testcase =  x.address53                            if path == element.path:54                                x.reset_timing()55                                print("will rerun: ", module, testcase)56                print("Changed files:")57                number_of_changed_files = 058                for element in monitor_directories.updated_elements:59                    if not element.is_file():60                        continue61                    print(element.path)62                    number_of_changed_files += 163                    64                if number_of_changed_files > 0 or self.server.last_report is None:65                    last_report = self.server.last_report66                    67                    self.server.set_last_report(None)68                    69                    report = background_test.RunTests.instance.run_tests(last_report)70                71                    self.server.set_last_report(report)72             73                monitor_directories.check()74            else:75                time.sleep(0.5)76                monitor_directories.check()77                78                79    80class HandleRequest(webserver.HandleRequest):81    82    def do_start(self):83        self.server.restart_testrunner()84        string = 'null'85        content_type = 'text/javascript'86        return string, content_type87    88    89    def do_pause(self):90        self.server.stop_testrunner()91        return 'null', 'text/javascript' 92    93    def do_get_last_report(self):94        string = json.dumps(self.server.get_last_report_as_dict())95        content_type = 'text/javascript'96        return string, content_type97    def do_get_last_report_information(self):98        string = json.dumps(self.server.get_last_report_information())99        content_type = 'text/javascript'100        return string, content_type101    102    def do_run_test(self):103        parameters = urlparse.parse_qs(self.parsed_path.query)104        a0 = parameters['a0'][0]105        a1 = parameters['a1'][0]106        a2 = parameters['a2'][0]107        address = (a0, a1, a2)108        result = background_test.RunTests.instance.run_test_with_address(address)109        string = json.dumps(result)110        content_type = 'text/javascript'111        self.server.continue_testrunner()112        return string, content_type113        114    def index_file(self):115        base = os.path.split(__file__)[0]116        filename = os.path.join(base, "realtime_test.html")117        with open(filename, "r") as file:118            contents = file.read()119            return contents, 'text/html'120            121class ContinuosTestWebServer(webserver.WebServer):122    123    def __init__(self, port):124        webserver.WebServer.__init__(self,  port, HandleRequest)125        self.last_report = None126        self.run_all_tests = RunAllTestsWhenAChangeHappens(self)127        self.run_all_tests.start()128        self.report_id = 0129        130        131    def stop(self):132        self.run_all_tests.stop()133        self.shutdown()134        135    def restart_testrunner(self):136        self.run_all_tests.stop()137        self.last_report = None138        self.run_all_tests = RunAllTestsWhenAChangeHappens(self)139        self.run_all_tests.start()140        141    def stop_testrunner(self):142        self.run_all_tests.stop()        143    144    def continue_testrunner(self):145        if not self.run_all_tests.must_run:146            self.restart_testrunner()147        148    def get_last_report_as_dict(self):149        if self.last_report is None:150            return None151        else:152            result = self.last_report.to_dict()153            return result154            155    def get_last_report_information(self):156        if self.last_report is None:157            result =  self.get_live_report_info()158        else:159            result =  self.last_report.to_information_dict()160        result['reports'] = self.get_live_reports()161        return result162    163    def get_live_reports(self):164        return background_test.RunTests.instance.get_reports()165    166    def get_live_report_info(self):167        return background_test.RunTests.instance.report_info168        169    def set_last_report(self, report):170        self.last_report = report171        if not report is None:172            self.report_id += 1173            self.last_report.report_id = self.report_id174        175def start_browser(serverport):176    time.sleep(2.0)177    webbrowser.open("http://localhost:{0}/".format(serverport))178            179if __name__ == '__main__':180    parser = OptionParser() 181    182    183    parser.add_option("-p", "--port", 184      dest="serverport",185      help="start serving on PORT", 186      metavar="PORT", 187      default=9070,188      type="int")189      190    parser.add_option("-e", "--editor", 191      dest="editor",192      help="preferred EDITOR for editing the files", 193      metavar="EDITOR", 194      default="geany",195      type="string")196    parser.add_option("-b", "--browser", 197      dest="startbrowser",198      help="automatically start a browser", 199      metavar="PORT", 200      default="yes",201      type="string")202      203    (options, args) = parser.parse_args()204    print("starting server on port: ", options.serverport)205    print("will use editor: ", options.editor)206    webserver.EDITOR = options.editor207    208    if options.startbrowser == "yes":209        thread = threading.Thread(target = start_browser, args = (options.serverport,))210        thread.start()211    212    server = ContinuosTestWebServer(options.serverport)213    server.start()...pca.py
Source:pca.py  
1#!/usr/bin/env python2"""3============================4Principal Component Analysis5============================6Principal Component Analysis (PCA) applied to this data identifies the7combination of attributes (principal components, or directions in the feature8space) that account for the most variance in the data.9"""10print __doc__11import numpy as np12import pylab as pl13from matplotlib.ticker import NullFormatter14from sklearn.decomposition import PCA15from sklearn import svm16import samples17import features18def perform_pca(channel):19    signals, backgrounds = samples.get_samples(channel, purpose='train')20    if channel == '01jet':21        branches = features.hh_01jet_vars22    else:23        branches = features.hh_2jet_vars24    X_train, X_test,\25    w_train, w_test,\26    y_train, y_test = samples.make_classification(27            *(samples.make_train_test(signals, backgrounds,28                branches=branches,29                train_fraction=.5,30                max_sig_train=2000,31                max_bkg_train=2000,32                max_sig_test=2000,33                max_bkg_test=2000,34                same_size_train=True,35                same_size_test=True,36                norm_sig_to_bkg_train=True,37                norm_sig_to_bkg_test=True)),38            standardize=True)39    print X_train40    print X_test41    print w_train42    print w_test43    print w_train.min(), w_train.max()44    pca = PCA(n_components=2)45    # fit only on background46    pca.fit(X_train[y_train == 0])47    X_train_pca = pca.transform(X_train)48    X_test_pca = pca.transform(X_test)49    xmin = X_test_pca[:, 0].min()50    xmax = X_test_pca[:, 0].max()51    ymin = X_test_pca[:, 1].min()52    ymax = X_test_pca[:, 1].max()53    width = xmax - xmin54    height = ymax - ymin55    xmin -= width*.156    xmax += width*.157    ymin -= height*.158    ymax += height*.159    # fit support vector machine on output of PCA60    clf = svm.SVC(C=100, gamma=.01, probability=True, scale_C=True)61    clf.fit(X_train_pca, y_train, sample_weight=w_train)62    # plot the decision function63    xx, yy = np.meshgrid(np.linspace(xmin, xmax, 500), np.linspace(ymin, ymax, 500))64    Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])65    Z = Z.reshape(xx.shape)66    channel_name = samples.CHANNEL_NAMES[channel]67    target_names = ['%s Signal' % channel_name,68                    '%s Background' % channel_name]69    target_values = [1, 0]70    # Percentage of variance explained for each components71    print 'explained variance ratio (first two components):', \72        pca.explained_variance_ratio_73    # plot PCA and SVM output74    pl.figure()75    # plot support vector machine decision function76    pl.set_cmap(pl.cm.jet)77    pl.contourf(xx, yy, Z, alpha=0.75)78    for c, i, target_name in zip("rb", target_values, target_names):79        pl.scatter(X_test_pca[y_test == i, 0], X_test_pca[y_test == i, 1],80                   c=c, label=target_name,81                   s=w_test[y_test == i]*10,82                   alpha=0.9)83    pl.xlim((xmin, xmax))84    pl.ylim((ymin, ymax))85    pl.legend()86    pl.xlabel('Principal Component [arb. units]')87    pl.ylabel('Secondary Component [arb. units]')88    pl.title('Principal Component Analysis\n'89             'and Support Vector Machine Decision Function')90    pl.savefig('pca_%s.png' % channel)91    # testing:92    signals, backgrounds = samples.get_samples(channel, purpose='test',93            mass=125)94    signal_train, signal_weight_train, \95    signal_test, signal_weight_test, \96    background_train, background_weight_train, \97    background_test, background_weight_test = samples.make_train_test(98                signals, backgrounds,99                branches=branches,100                train_fraction=.5,101                norm_sig_to_bkg_train=False,102                norm_sig_to_bkg_test=False)103    sample_test = np.concatenate((background_test, signal_test))104    sample_test = samples.std(sample_test)105    background_test, signal_test = sample_test[:len(background_test)], \106                                   sample_test[len(background_test):]107    signal_test = pca.transform(signal_test)108    background_test = pca.transform(background_test)109    pl.figure()110    pl.hist(clf.predict_proba(background_test)[:,-1],111            weights=background_weight_test, bins=30, range=(0, 1),112            label='Background', color='b')113    pl.hist(clf.predict_proba(signal_test)[:,-1],114            weights=signal_weight_test*10, bins=30, range=(0, 1),115            label='Signal x 10', color='r')116    pl.legend()117    pl.ylabel('Events')118    pl.xlabel('Support Vector Machine Signal Probability')119    pl.savefig('pca_svm_score_%s.png' % channel)120if __name__ == '__main__':121    perform_pca('2jet')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
