How to use test_alerts method in SeleniumBase

Best Python code snippets using SeleniumBase. The examples below show how real projects define and use `test_alerts` in practice.

basemodel.py

Source: basemodel.py (GitHub)


```python
# Dependencies
import warnings
warnings.filterwarnings('ignore')
import sys
import argparse
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.preprocessing import StandardScaler
from sklearn.covariance import EllipticEnvelope


class RHRAD_online:

    def __init__(self,
                 hr="AHYIJDV_hr.csv",
                 steps="AHYIJDV_steps.csv",
                 myphd_id="myphd_id",
                 symptom_date="NaN",
                 diagnosis_date="NaN",
                 RANDOM_SEED=1337,
                 outliers_fraction=0.1,
                 baseline_window=480,  # 20 days
                 sliding_window=1,
                 myphd_id_anomalies="myphd_id_anomalies.csv",
                 myphd_id_figure1="myphd_id_anomalies.pdf",
                 myphd_id_alerts="myphd_id_alerts.csv",
                 myphd_id_figure2="myphd_id_alerts.pdf",
                 last_day_only=True):

        # Initialize variables
        self.fitbit_hr = hr
        self.fitbit_steps = steps
        self.myphd_id = hr.split("_")[0]
        self.symptom_date = symptom_date
        self.diagnosis_date = diagnosis_date
        self.RANDOM_SEED = RANDOM_SEED
        self.outliers_fraction = outliers_fraction
        self.baseline_window = baseline_window  # 744
        self.sliding_window = sliding_window

        # self.myphd_id_anomalies = self.myphd_id + "_anomalies.csv"
        self.myphd_id_anomalies = myphd_id_anomalies
        self.myphd_id_figure1 = myphd_id_figure1
        # self.myphd_id_alerts = self.myphd_id + "_alerts.csv"
        self.myphd_id_alerts = myphd_id_alerts
        self.myphd_id_figure2 = myphd_id_figure2
        self.last_day_only = last_day_only

        # Process data
        df1 = self.resting_heart_rate(self.fitbit_hr, self.fitbit_steps)  # RHR df at 1-min resolution
        df2 = self.pre_processing(df1)  # RHR df, smoothed and at 1-hr resolution

        # Apply seasonality correction
        sdHR = df2[['heartrate']]
        data_seasnCorec = self.seasonality_correction(sdHR)
        data_seasnCorec += 0.1

        # Run model
        self.dfs = []
        self.data_train = []
        self.data_test = []
        self.online_anomaly_detection(data_seasnCorec, self.baseline_window, self.sliding_window)

        # Process results
        results = self.merge_test_results(self.data_test)
        self.pos_anomalies = self.positive_anomalies(results)
        # Note: self.test_alerts only exists when positive anomalies were found.
        if len(self.pos_anomalies) > 0:
            self.alerts = self.create_alerts(self.pos_anomalies, results, self.fitbit_hr)
            self.test_alerts = self.merge_alerts(results, self.alerts)

    # Infer resting heart rate ------------------------------------------------------
    def resting_heart_rate(self, heartrate, steps):
        """
        Uses heart rate and steps data to infer resting heart rate (RHR):
        keeps heart-rate samples recorded when the step count was zero for
        the preceding 12 minutes (including the current minute).
        """
        # heart rate data
        df_hr = pd.read_csv(heartrate)
        df_hr = df_hr.set_index('datetime')
        df_hr.index.name = None
        df_hr.index = pd.to_datetime(df_hr.index)

        # steps data
        df_steps = pd.read_csv(steps)
        df_steps = df_steps.set_index('datetime')
        df_steps.index.name = None
        df_steps.index = pd.to_datetime(df_steps.index)

        # merge heart rate and steps, resample to 1-min resolution
        df1 = pd.merge(df_hr, df_steps, left_index=True, right_index=True)
        df1 = df1.resample('1min').mean()
        df1 = df1.dropna()

        # RHR = HR measurements recorded when zero steps were taken during a
        # rolling window of the preceding 12 minutes (including the current minute)
        df1['steps_window_12'] = df1['steps'].rolling(12).sum()
        df1 = df1.loc[df1['steps_window_12'] == 0]
        return df1

    # Pre-processing ------------------------------------------------------
    def pre_processing(self, resting_heart_rate):
        """
        Takes resting heart rate data, applies a moving average to smooth it,
        and downsamples to one hour by taking the average value.
        """
        df1 = resting_heart_rate
        # smooth data
        df_nonas = df1.dropna()
        df1_rom = df_nonas.rolling(400).mean()
        # resample to hourly resolution
        df1_resmp = df1_rom.resample('1H').mean()
        df2 = df1_resmp.drop(['steps', 'steps_window_12'], axis=1)
        df2 = df2.dropna()
        return df2

    # Seasonality correction ------------------------------------------------------
    def seasonality_correction(self, resting_heart_rate):
        """
        Takes the pre-processing output and applies seasonality correction
        (keeps trend + residual, discards the seasonal component).
        """
        sdHR = resting_heart_rate
        # note: `freq=` was renamed to `period=` in statsmodels 0.11+
        sdHR_decomposition = seasonal_decompose(sdHR, model='additive', freq=1)
        sdHR_decomp = pd.DataFrame(sdHR_decomposition.resid + sdHR_decomposition.trend)
        sdHR_decomp.rename(columns={sdHR_decomp.columns[0]: 'heartrate'}, inplace=True)
        return sdHR_decomp

    # Train model and predict anomalies ------------------------------------------------------
    def online_anomaly_detection(self, data_seasnCorec, baseline_window, sliding_window):
        """
        data_seasnCorec comes from the previous step; baseline_window and
        sliding_window are ints (lengths of the respective windows, in hours).

        Splits the data, standardizes it inside a sliding window
        (1-month baseline window, 1-hour sliding window), fits the model
        on the baseline, and predicts the test window.
        """
        # The original code duplicated this loop body in both branches of an
        # if/else; only the starting index differed, so it is folded together here.
        if self.last_day_only:
            start = len(data_seasnCorec) - 15
        else:
            start = baseline_window
        for i in range(start, len(data_seasnCorec)):
            # train data normalization
            data_train_w = data_seasnCorec[i - baseline_window:i]
            data_train_w += 0.1
            standardizer = StandardScaler().fit(data_train_w.values)
            data_train_scaled = standardizer.transform(data_train_w.values)
            data_train_w = pd.DataFrame(data_train_scaled,
                                        index=data_train_w.index,
                                        columns=data_train_w.columns).fillna(0)
            self.data_train.append(data_train_w)

            # test data normalization (reuses the training-window standardizer)
            data_test_w = data_seasnCorec[i:i + sliding_window]
            data_test_w += 0.1
            data_test_scaled = standardizer.transform(data_test_w.values)
            data_test_w = pd.DataFrame(data_test_scaled,
                                       index=data_test_w.index,
                                       columns=data_test_w.columns).fillna(0)
            self.data_test.append(data_test_w)

            # fit the model and predict the test window
            model = EllipticEnvelope(random_state=self.RANDOM_SEED,
                                     contamination=self.outliers_fraction,
                                     support_fraction=0.7).fit(data_train_w)
            preds = model.predict(data_test_w)
            self.dfs.append(preds)

    # Merge predictions ------------------------------------------------------
    def merge_test_results(self, data_test):
        """
        Merges predictions with the test data.
        """
        # concat all test data (from the sliding window) with their datetime index
        data_test = pd.concat(data_test)
        # merge predicted anomalies with their corresponding index and features
        preds = pd.DataFrame(self.dfs)
        preds = preds.rename(lambda x: 'anomaly' if x == 0 else x, axis=1)
        data_test_df = pd.DataFrame(data_test).reset_index()
        data_test_preds = data_test_df.join(preds)
        return data_test_preds

    # Positive anomalies ------------------------------------------------------
    def positive_anomalies(self, data):
        """
        Selects anomalies in the positive direction and saves them to a CSV file.
        """
        a = data.loc[data['anomaly'] == -1, ('index', 'heartrate')]
        positive_anomalies = a[a['heartrate'] > 0]
        positive_anomalies['Anomalies'] = self.myphd_id
        positive_anomalies.columns = ['datetime', 'std.rhr', 'name']
        positive_anomalies.to_csv(self.myphd_id_anomalies, header=True)
        return positive_anomalies

    # Alerts ------------------------------------------------------
    def create_alerts(self, anomalies, data, fitbit_oldProtocol_hr):
        """
        Creates alerts for every 24-hour period (windows anchored at 9 PM).
        """
        # assign alert names based on the hourly alert count
        def alert_types(alert):
            if alert['alerts'] >= 6:
                return 'RED'
            elif alert['alerts'] >= 1:
                return 'YELLOW'
            else:
                return 'GREEN'

        # summarize hourly alerts
        anomalies = anomalies[['datetime']]
        anomalies['datetime'] = pd.to_datetime(anomalies['datetime'], errors='coerce')
        anomalies['alerts'] = 1
        anomalies = anomalies.set_index('datetime')
        anomalies = anomalies[~anomalies.index.duplicated(keep='first')]
        anomalies = anomalies.sort_index()
        # note: `base=` was deprecated in pandas 1.1 (newer versions use `offset=`)
        alerts = anomalies.groupby(pd.Grouper(freq='24H', base=21)).cumsum()
        alerts['alert_type'] = alerts.apply(alert_types, axis=1)
        alerts_reset = alerts.reset_index()

        # summarize hourly alerts into daily alerts
        daily_alerts = alerts_reset.resample('24H', on='datetime', base=21, label='right').count()
        daily_alerts = daily_alerts.drop(['datetime'], axis=1)
        daily_alerts['alert_type'] = daily_alerts.apply(alert_types, axis=1)

        # merge missing 'datetime' with 'alerts' as zero, aka GREEN
        data1 = data[['index']]
        data1['alert_type'] = 0
        data1 = data1.rename(columns={"index": "datetime"})
        data1['datetime'] = pd.to_datetime(data1['datetime'], errors='coerce')
        data1 = data1.resample('24H', on='datetime', base=21, label='right').count()
        data1 = data1.drop(data1.columns[[0, 1]], axis=1)
        data1 = data1.reset_index()
        data1['alert_type'] = 0
        data3 = pd.merge(data1, daily_alerts, on='datetime', how='outer')
        data4 = data3[['datetime', 'alert_type_y']]
        data4 = data4.rename(columns={"alert_type_y": "alert_type"})
        daily_alerts = data4.fillna("GREEN")
        daily_alerts = daily_alerts.set_index('datetime')
        daily_alerts = daily_alerts.sort_index()

        # merge alerts with the main data; use 'NA' for missing days instead of 'GREEN'
        df_hr = pd.read_csv(fitbit_oldProtocol_hr)
        df_hr['datetime'] = pd.to_datetime(df_hr['datetime'], errors='coerce')
        df_hr = df_hr.resample('24H', on='datetime', base=21, label='right').mean()
        df_hr = df_hr.reset_index()
        df_hr = df_hr.set_index('datetime')
        df_hr.index.name = None
        df_hr.index = pd.to_datetime(df_hr.index)
        df3 = pd.merge(df_hr, daily_alerts, how='outer', left_index=True, right_index=True)
        df3 = df3[df3.alert_type.notnull()]
        df3.loc[df3.heartrate.isna(), 'alert_type'] = pd.NA
        daily_alerts = df3.drop('heartrate', axis=1)
        daily_alerts = daily_alerts.reset_index()
        daily_alerts = daily_alerts.rename(columns={"index": "datetime"})
        daily_alerts.to_csv(self.myphd_id_alerts, na_rep='NA', header=True)
        return daily_alerts

    # Merge alerts ------------------------------------------------------
    def merge_alerts(self, data_test, alerts):
        """
        Merges alerts with their corresponding index and other features.
        """
        data_test = data_test.reset_index()
        data_test['index'] = pd.to_datetime(data_test['index'], errors='coerce')
        test_alerts = alerts.rename(columns={"datetime": "index"})
        test_alerts['index'] = pd.to_datetime(test_alerts['index'], errors='coerce')
        test_alerts = pd.merge(data_test, test_alerts, how='outer', on='index')
        test_alerts.fillna(0, inplace=True)
        return test_alerts

    # Visualization and save predictions ------------------------------------------------------
    def visualize(self, results, positive_anomalies, test_alerts, symptom_date, diagnosis_date):
        """
        Visualizes all the data with anomalies and alerts. The original code
        duplicated the whole plotting block in a try/except (the fallback simply
        omits the symptom/diagnosis date markers), so it is folded into a helper.
        """
        def _plot(show_dates):
            with plt.style.context('fivethirtyeight'):
                fig, ax = plt.subplots(1, figsize=(80, 15))
                ax.bar(test_alerts['index'], test_alerts['heartrate'],
                       linestyle='-', color='midnightblue', lw=6, width=0.01)
                colors = {0: '', 'RED': 'red', 'YELLOW': 'yellow', 'GREEN': 'lightgreen'}
                for i in range(len(test_alerts)):
                    v = colors.get(test_alerts['alert_type'][i])
                    ax.vlines(test_alerts['index'][i],
                              test_alerts['heartrate'].min(),
                              test_alerts['heartrate'].max(),
                              linestyle='dotted', lw=4, color=v)
                ax.tick_params(axis='both', which='major', color='blue', labelsize=60)
                ax.tick_params(axis='both', which='minor', color='blue', labelsize=60)
                # fixed: the original referenced bare `myphd_id`/`myphd_id_figure1`,
                # which are undefined here; they must be instance attributes
                ax.set_title(self.myphd_id, fontweight="bold", size=50)
                ax.set_ylabel('Std. RHR\n', fontsize=50)
                if show_dates:
                    ax.axvline(pd.to_datetime(symptom_date), color='grey', zorder=1,
                               linestyle='--', marker="v", markersize=22, lw=6)
                    ax.axvline(pd.to_datetime(diagnosis_date), color='purple', zorder=1,
                               linestyle='--', marker="v", markersize=22, lw=6)
                ax.xaxis.set_major_locator(mdates.DayLocator(interval=7))
                ax.grid(True, zorder=0)
                plt.xticks(fontsize=30, rotation=90)
                plt.yticks(fontsize=50)
                ax.patch.set_facecolor('white')
                fig.patch.set_facecolor('white')
                return fig.savefig(self.myphd_id_figure1, bbox_inches='tight')

        try:
            return _plot(show_dates=True)
        except Exception:
            return _plot(show_dates=False)


class resultsProcesser:
    def __init__(self, anomaliesCSV, alertsCSV):
        self.anomaliesCSV = anomaliesCSV
        self.alertsCSV = alertsCSV
        self.alertLevel = None
        self.anomalies = pd.read_csv(self.anomaliesCSV)
        numAnomalies = len(self.anomalies)
        if numAnomalies == 0:
            self.alertLevel = "low"
        else:
            alerts = pd.read_csv(self.alertsCSV)
            temp = alerts.iloc[0]["alert_type"]
            if temp == "GREEN":
                self.alertLevel = "low"
            elif temp == "YELLOW":
                self.alertLevel = "medium"
            elif temp == "RED":
                self.alertLevel = "high"

    def getAlertLevel(self):
        return self.alertLevel

    def getAnomalousHours(self):
        # extract the HH:MM part of each anomaly timestamp; return at most five
        hours = []
        for i in range(len(self.anomalies)):
            hours.append(self.anomalies.iloc[i]["datetime"].split(" ")[1][:5])
        return hours[:5]

...
```
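In this snippet, `test_alerts` is not a SeleniumBase method but an attribute that `RHRAD_online` sets from `merge_alerts`, and only when positive anomalies were detected. A minimal usage sketch follows; the CSV file names are the snippet's own defaults, and the files are assumed to exist locally with `datetime`, `heartrate`, and `steps` columns:

```python
# Minimal usage sketch for the RHRAD_online pipeline above.
# Assumes "AHYIJDV_hr.csv" and "AHYIJDV_steps.csv" (the class defaults)
# exist locally with 'datetime', 'heartrate', and 'steps' columns.
model = RHRAD_online(
    hr="AHYIJDV_hr.csv",
    steps="AHYIJDV_steps.csv",
    baseline_window=480,  # 20 days of hourly data
    last_day_only=True,
)

# test_alerts only exists when positive anomalies were detected,
# so guard the attribute access.
if hasattr(model, "test_alerts"):
    print(model.test_alerts.head())
else:
    print("No positive anomalies, so no alerts were generated.")
```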


test_pypheus_monitoring.py

Source: test_pypheus_monitoring.py (GitHub)


```python
# -*- coding: utf-8 -*-
"""
Module for testing the functions in pypheus.monitoring.
"""
import os
import json

import urllib3
import vcr
import requests
from unittest import TestCase
from unittest import mock
from nose.plugins.skip import SkipTest

from pypheus.network import Network
from pypheus.storage import Storage
from pypheus.logs import Logs
from pypheus.monitoring import Monitoring

urllib3.disable_warnings()

host = os.environ['HOST']
username = os.environ['USERNAME']
password = os.environ['PASSWORD']
test = Monitoring(host, username, password)
SKIPTEST = True

# TODO: take out hardcoded data later
my_vcr = vcr.VCR(
    serializer='json',
    cassette_library_dir='./test_pyhpecfm/fixtures/cassettes',
    record_mode='new_episodes',
    match_on=['uri', 'method'],
)


# NOTE: this test class shadows the imported pypheus.monitoring.Monitoring;
# the module-level `test` instance created above is what the tests call.
class Monitoring(TestCase):
    """
    Test case for pypheus.monitoring functions.
    """

    @vcr.use_cassette(cassette_library_dir='./test_pypheus/fixtures/cassettes')
    def test_get_all_checks(self):
        """Simple test to return checks."""
        test_checks = test.get_all_checks()
        self.assertIs(type(test_checks['checks']), list)
        self.assertIs(type(test_checks['checks'][0]), dict)

    def test_get_all_incidents(self):
        """Simple test to return incidents."""
        test_incidents = test.get_all_incidents()
        self.assertIs(type(test_incidents['incidents']), list)
        self.assertIs(type(test_incidents['incidents'][0]), dict)

    def test_get_all_alerts(self):
        """Simple test to return alerts."""
        test_alerts = test.get_all_alerts()
        self.assertIs(type(test_alerts['alerts']), list)
        self.assertIs(type(test_alerts['alerts'][0]), dict)

    def test_get_all_contacts(self):
        """Simple test to return contacts."""
        test_contacts = test.get_all_contacts()
        self.assertIs(type(test_contacts['contacts']), list)
...
```
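Note that the module reads `HOST`, `USERNAME`, and `PASSWORD` from `os.environ` at import time, so importing it without those variables set raises a `KeyError`. Below is a hedged sketch of one way to drive it; the placeholder credentials and the programmatic `unittest` runner are assumptions (the `nose` import suggests the original project used a different runner):

```python
# Sketch: populate the environment the module expects *before* importing it,
# then run its TestCase programmatically with the standard unittest runner.
# The HOST/USERNAME/PASSWORD values are placeholders, not real endpoints.
import os
import unittest

os.environ.setdefault("HOST", "morpheus.example.com")
os.environ.setdefault("USERNAME", "admin")
os.environ.setdefault("PASSWORD", "secret")

import test_pypheus_monitoring  # reads os.environ at import time

suite = unittest.defaultTestLoader.loadTestsFromModule(test_pypheus_monitoring)
unittest.TextTestRunner(verbosity=2).run(suite)
```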


__init__.py

Source: __init__.py (GitHub)


```python
from . import donation_alerts
from . import test_alerts

# registry mapping each provider's name to its class
providers = {
    donation_alerts.DonationAlertsProvider.name: donation_alerts.DonationAlertsProvider,
    test_alerts.TestingProvider.name: test_alerts.TestingProvider,
...
```
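For that registry to work, the `test_alerts` module must expose a `TestingProvider` class with at least a class-level `name` attribute. A hypothetical minimal shape is sketched below; only the `TestingProvider` and `name` identifiers come from the snippet, everything else is assumed:

```python
# Hypothetical sketch of a test_alerts module that would satisfy the registry
# above. Only `TestingProvider` and its `name` attribute appear in the source;
# the send_alert method and its behavior are illustrative assumptions.
class TestingProvider:
    name = "test"

    def send_alert(self, message: str) -> None:
        # stand-in behavior for local testing; a real provider would call an API
        print(f"[test alert] {message}")
```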


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with test automation frameworks such as Selenium, Cypress, and TestNG.

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run SeleniumBase automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
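As a sketch, a SeleniumBase test can usually be pointed at a remote grid through pytest's `--server`/`--port` options; the exact flags and the LambdaTest hub address below should be checked against your SeleniumBase version and account, and the `user:key` credentials are placeholders:

```python
# Run locally:   pytest test_lt_demo.py
# Run on a grid: pytest test_lt_demo.py --server=user:key@hub.lambdatest.com --port=80
# (flags follow SeleniumBase's documented remote-grid pattern; verify for your version)
from seleniumbase import BaseCase


class LambdaTestDemo(BaseCase):
    def test_open_page(self):
        self.open("https://example.com")
        self.assert_title("Example Domain")
```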

