Best Python code snippet using localstack_python
os_custom_ml.py
Source:os_custom_ml.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3"""4ibm-wos-utils==2.1.15Python 3.76"""7import warnings8warnings.filterwarnings('ignore')9import random10import time11import uuid12import matplotlib.pyplot as plt13import pandas as pd14import requests15from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator16from ibm_watson_openscale import APIClient17from ibm_watson_openscale.base_classes.watson_open_scale_v2 import *18from ibm_watson_openscale.supporting_classes.enums import *19from ibm_watson_openscale.supporting_classes.payload_record import PayloadRecord20from ibm_wos_utils.drift.drift_trainer import DriftTrainer21def get_scoring_payload(df, cols_to_remove, no_of_records_to_score=1):22    for col in cols_to_remove:23        if col in df.columns:24            del df[col]25    fields = df.columns.tolist()26    values = df[fields].values.tolist()27    payload_scoring = {"fields": fields, "values": values[:no_of_records_to_score]}28    return payload_scoring29def custom_ml_scoring(scoring_url, payload_scoring):30    header = {"Content-Type": "application/json", "x": "y"}31    scoring_response = requests.post(scoring_url, json=payload_scoring, headers=header, verify=False)32    jsonify_scoring_response = scoring_response.json()33    return jsonify_scoring_response34def payload_logging(payload_scoring, scoring_response, wos_client, payload_data_set_id):35    scoring_id = str(uuid.uuid4())36    records_list = []37    # manual PL logging for custom ml provider38    pl_record = PayloadRecord(scoring_id=scoring_id, request=payload_scoring, response=scoring_response,39                              response_time=int(460))40    records_list.append(pl_record)41    wos_client.data_sets.store_records(data_set_id=payload_data_set_id, request_body=records_list)42    time.sleep(5)43    pl_records_count = wos_client.data_sets.get_records_count(payload_data_set_id)44    print("Number of records in the payload logging table: {}".format(pl_records_count))45    return scoring_id46def auth_cpd(WOS_CREDENTIALS):47    authenticator = CloudPakForDataAuthenticator(48        url=WOS_CREDENTIALS['url'],49        username=WOS_CREDENTIALS['username'],50        password=WOS_CREDENTIALS['password'],51        disable_ssl_verification=True52    )53    wos_client = APIClient(service_url=WOS_CREDENTIALS['url'], authenticator=authenticator)54    print(wos_client.version)55    print(wos_client.data_marts.show())56    return wos_client57def remove_existing_service_provider(wos_client, SERVICE_PROVIDER_NAME):58    service_providers = wos_client.service_providers.list().result.service_providers59    for service_provider in service_providers:60        service_instance_name = service_provider.entity.name61        if service_instance_name == SERVICE_PROVIDER_NAME:62            service_provider_id = service_provider.metadata.id63            wos_client.service_providers.delete(service_provider_id)64            print("Deleted existing service_provider for WML instance: {}".format(service_provider_id))65def add_service_provider(SERVICE_PROVIDER_NAME, SERVICE_PROVIDER_DESCRIPTION, ):66    request_headers = {"Content-Type": "application/json", "Custom_header_X": "Custom_header_X_value_Y"}67    MLCredentials = {}68    added_service_provider_result = wos_client.service_providers.add(69        name=SERVICE_PROVIDER_NAME,70        description=SERVICE_PROVIDER_DESCRIPTION,71        service_type=ServiceTypes.CUSTOM_MACHINE_LEARNING,72        request_headers=request_headers,73        operational_space_id="production",74        credentials=MLCredentials,75        background_mode=False76    ).result77    service_provider_id = added_service_provider_result.metadata.id78    print(wos_client.service_providers.get(service_provider_id).result)79    print('Service Provider ID : ' + service_provider_id)80    return service_provider_id81def remove_existing_subscription(wos_client, SUBSCRIPTION_NAME):82    subscriptions = wos_client.subscriptions.list().result.subscriptions83    for subscription in subscriptions:84        if subscription.entity.asset.name == "[asset] " + SUBSCRIPTION_NAME:85            sub_model_id = subscription.metadata.id86            wos_client.subscriptions.delete(subscription.metadata.id)87            print('Deleted existing subscription for model', sub_model_id)88def create_monitor(data_mart_id, target, parameters, thresholds, type, wos_client):89    type_dict = {'fairness': wos_client.monitor_definitions.MONITORS.FAIRNESS.ID,90                 'quality': wos_client.monitor_definitions.MONITORS.QUALITY.ID,91                 'drift': wos_client.monitor_definitions.MONITORS.DRIFT.ID}92    monitor_details = wos_client.monitor_instances.create(93        data_mart_id=data_mart_id,94        background_mode=False,95        monitor_definition_id=type_dict[type],96        target=target,97        parameters=parameters,98        thresholds=thresholds).result99    monitor_instance_id = monitor_details.metadata.id100    return monitor_instance_id101def finish_explanation_tasks(wos_client, explanation_task_ids, sample_size):102    finished_explanations = []103    finished_explanation_task_ids = []104    # Check for the explanation task status for finished status.105    # If it is in-progress state, then sleep for some time and check again.106    # Perform the same for couple of times, so that all tasks get into finished state.107    for i in range(0, 5):108        # for each explanation109        print('iteration ' + str(i))110        # check status for all explanation tasks111        for explanation_task_id in explanation_task_ids:112            if explanation_task_id not in finished_explanation_task_ids:113                result = wos_client.monitor_instances.get_explanation_tasks(114                    explanation_task_id=explanation_task_id).result115                print(result)116                print(explanation_task_id + ' : ' + result.entity.status.state)117                if (118                        result.entity.status.state == 'finished' or result.entity.status.state == 'error') and explanation_task_id not in finished_explanation_task_ids:119                    finished_explanation_task_ids.append(explanation_task_id)120                    finished_explanations.append(result)121        # if there is altest one explanation task that is not yet completed, then sleep for sometime,122        # and check for all those tasks, for which explanation is not yet completeed.123        if len(finished_explanation_task_ids) != sample_size:124            print('sleeping for some time..')125            time.sleep(10)126        else:127            break128    return finished_explanations129def construct_explanation_features_map(feature_name, feature_weight, explanation_features_map):130    if feature_name in explanation_features_map:131        explanation_features_map[feature_name].append(feature_weight)132    else:133        explanation_features_map[feature_name] = [feature_weight]134def score(training_data_frame):135    # The data type of the label column and prediction column should be same .136    # User needs to make sure that label column and prediction column array should have the same unique class labels137    prediction_column_name = "prediction"138    probability_column_name = "probability"139    feature_columns = list(training_data_frame.columns)140    training_data_rows = training_data_frame[feature_columns].values.tolist()141    payload_scoring_records = {142        "fields": feature_columns,143        "values": [x for x in training_data_rows]144    }145    header = {"Content-Type": "application/json", "x": "y"}146    scoring_response_raw = requests.post(scoring_url, json=payload_scoring_records, headers=header, verify=False)147    scoring_response = scoring_response_raw.json()148    prob_col_index = list(scoring_response.get('fields')).index(probability_column_name)149    predict_col_index = list(scoring_response.get('fields')).index(prediction_column_name)150    if prob_col_index < 0 or predict_col_index < 0:151        raise Exception("Missing prediction/probability column in the scoring response")152    import numpy as np153    probability_array = np.array([value[prob_col_index] for value in scoring_response.get('values')])154    prediction_vector = np.array([value[predict_col_index] for value in scoring_response.get('values')])155    return probability_array, prediction_vector156def generating_drift_model(training_df, drift_detection_input, scoring_method):157    drift_trainer = DriftTrainer(training_df, drift_detection_input)158    if model_type != "regression":159        drift_trainer.generate_drift_detection_model(scoring_method, batch_size=training_df.shape[0])160    # Note: Two column constraints are not computed beyond two_column_learner_limit(default set to 200)161    drift_trainer.learn_constraints(two_column_learner_limit=200)162    drift_trainer.create_archive()163def remove_drift_monitor_for_subscription(wos_client, subscription_id):164    monitor_instances = wos_client.monitor_instances.list().result.monitor_instances165    for monitor_instance in monitor_instances:166        monitor_def_id = monitor_instance.entity.monitor_definition_id167        if monitor_def_id == "drift" and monitor_instance.entity.target.target_id == subscription_id:168            wos_client.monitor_instances.delete(monitor_instance.metadata.id)169            print('Deleted existing drift monitor instance with id: ', monitor_instance.metadata.id)170def fairness(wos_client, data_mart_id, subscription_id):171    # ===========fairness  min100/hourly172    target = Target(173        target_type=TargetTypes.SUBSCRIPTION,174        target_id=subscription_id175    )176    parameters = {177        "features": [178            {"feature": "Sex",179             "majority": ['male'],180             "minority": ['female']181             },182            {"feature": "Age",183             "majority": [[26, 75]],184             "minority": [[18, 25]]185             }186        ],187        "favourable_class": ["No Risk"],188        "unfavourable_class": ["Risk"],189        "min_records": 100190    }191    thresholds = [{192        "metric_id": "fairness_value",193        "specific_values": [{194            "applies_to": [{195                "key": "feature",196                "type": "tag",197                "value": "Age"198            }],199            "value": 95200        },201            {202                "applies_to": [{203                    "key": "feature",204                    "type": "tag",205                    "value": "Sex"206                }],207                "value": 95208            }209        ],210        "type": "lower_limit",211        "value": 80.0212    }]213    fairness_monitor_instance_id = create_monitor(data_mart_id, target, parameters, thresholds, 'fairness',214                                                  wos_client)215    ### Get Fairness Monitor Instance216    wos_client.monitor_instances.show()217    ### Get run details218    runs = wos_client.monitor_instances.list_runs(fairness_monitor_instance_id, limit=1).result.to_dict()219    fairness_monitoring_run_id = runs["runs"][0]["metadata"]["id"]220    run_status = None221    while (run_status not in ["finished", "error"]):222        run_details = wos_client.monitor_instances.get_run_details(fairness_monitor_instance_id,223                                                                   fairness_monitoring_run_id).result.to_dict()224        run_status = run_details["entity"]["status"]["state"]225        print('run_status: ', run_status)226        if run_status in ["finished", "error"]:227            break228        time.sleep(10)229    ### Fairness run output230    wos_client.monitor_instances.get_run_details(fairness_monitor_instance_id,231                                                 fairness_monitoring_run_id).result.to_dict()232    wos_client.monitor_instances.show_metrics(monitor_instance_id=fairness_monitor_instance_id)233def explainability(wos_client, data_mart_id, subscription_id, sample_size=2):234    # ====================Explainability235    target = Target(236        target_type=TargetTypes.SUBSCRIPTION,237        target_id=subscription_id238    )239    parameters = {240        "enabled": True241    }242    explain_monitor_details = wos_client.monitor_instances.create(243        data_mart_id=data_mart_id,244        background_mode=False,245        monitor_definition_id=wos_client.monitor_definitions.MONITORS.EXPLAINABILITY.ID,246        target=target,247        parameters=parameters248    ).result249    scoring_ids = []250    for i in range(0, sample_size):251        n = random.randint(1, 100)252        scoring_ids.append(scoring_id + '-' + str(n))253    print("Running explanations on scoring IDs: {}".format(scoring_ids))254    explanation_types = ["lime", "contrastive"]255    result = wos_client.monitor_instances.explanation_tasks(scoring_ids=scoring_ids,256                                                            explanation_types=explanation_types).result257    print(result)258    ### Explanation tasks259    explanation_task_ids = result.metadata.explanation_task_ids260    return explanation_task_ids261def explanation_feature_map_plot(finished_explanations):262    explanation_features_map = {}263    for result in finished_explanations:264        print('\n>>>>>>>>>>>>>>>>>>>>>>\n')265        print(266            'explanation task: ' + str(result.metadata.explanation_task_id) + ', perturbed:' + str(267                result.entity.perturbed))268        if result.entity.explanations is not None:269            explanations = result.entity.explanations270            for explanation in explanations:271                if 'predictions' in explanation:272                    predictions = explanation['predictions']273                    for prediction in predictions:274                        predicted_value = prediction['value']275                        probability = prediction['probability']276                        print('prediction : ' + str(predicted_value) + ', probability : ' + str(probability))277                        if 'explanation_features' in prediction:278                            explanation_features = prediction['explanation_features']279                            for explanation_feature in explanation_features:280                                feature_name = explanation_feature['feature_name']281                                feature_weight = explanation_feature['weight']282                                if (feature_weight >= 0):283                                    feature_weight_percent = round(feature_weight * 100, 2)284                                    print(str(feature_name) + ' : ' + str(feature_weight_percent))285                                    task_feature_weight_map = {}286                                    task_feature_weight_map[287                                        result.metadata.explanation_task_id] = feature_weight_percent288                                    construct_explanation_features_map(feature_name, feature_weight_percent,289                                                                       explanation_features_map)290            print('\n>>>>>>>>>>>>>>>>>>>>>>\n')291    for key in explanation_features_map.keys():292        # plot_graph(key, explanation_features_map[key])293        values = explanation_features_map[key]294        plt.title(key)295        plt.ylabel('Weight')296        plt.bar(range(len(values)), values)297        plt.show()298def quality(wos_client, subscription_id, feedback_file_path):299    # =========================Quality monitoring and feedback logging300    target = Target(301        target_type=TargetTypes.SUBSCRIPTION,302        target_id=subscription_id303    )304    parameters = {305        "min_feedback_data_size": 90306    }307    thresholds = [308        {309            "metric_id": "area_under_roc",310            "type": "lower_limit",311            "value": .80312        }313    ]314    quality_monitor_instance_id = create_monitor(data_mart_id, target, parameters, thresholds, 'quality',315                                                 wos_client)316    ## Feedback logging317    ## Get feedback logging dataset ID318    feedback_dataset_id = None319    feedback_dataset = wos_client.data_sets.list(type=DataSetTypes.FEEDBACK,320                                                 target_target_id=subscription_id,321                                                 target_target_type=TargetTypes.SUBSCRIPTION).result322    feedback_dataset_id = feedback_dataset.data_sets[0].metadata.id323    if feedback_dataset_id is None:324        print("Feedback data set not found. Please check quality monitor status.")325    with open(feedback_file_path) as feedback_file:326        additional_feedback_data = json.load(feedback_file)327    wos_client.data_sets.store_records(feedback_dataset_id, request_body=additional_feedback_data,328                                       background_mode=False)329    wos_client.data_sets.get_records_count(data_set_id=feedback_dataset_id)330    run_details = wos_client.monitor_instances.run(monitor_instance_id=quality_monitor_instance_id,331                                                   background_mode=False).result332    wos_client.monitor_instances.show_metrics(monitor_instance_id=quality_monitor_instance_id)333def drift(wos_client, data_mart_id, training_file_path, subscription_id, score_method, drift_detection_input):334    # =======================Drift335    # Drift detection model generation336    df = pd.read_csv(training_file_path)337    ### Define the drift detection input338    ### Generate drift detection model339    filename = 'drift_detection_model.tar.gz'340    generating_drift_model(df, drift_detection_input, score_method)341    ### Upload the drift detection model to OpenScale subscription342    wos_client.monitor_instances.upload_drift_model(343        model_path=filename,344        archive_name=filename,345        data_mart_id=data_mart_id,346        subscription_id=subscription_id,347        enable_data_drift=True,348        enable_model_drift=True349    )350    ### Delete the existing drift monitor instance for the subscription351    remove_drift_monitor_for_subscription(wos_client, subscription_id)352    target = Target(353        target_type=TargetTypes.SUBSCRIPTION,354        target_id=subscription_id355    )356    parameters = {357        "min_samples": 100,358        "drift_threshold": 0.1,359        "train_drift_model": False,360        "enable_model_drift": True,361        "enable_data_drift": True362    }363    drift_monitor_instance_id = create_monitor(data_mart_id, target, parameters, {}, 'drift', wos_client)364    ### Drift run365    drift_run_details = wos_client.monitor_instances.run(monitor_instance_id=drift_monitor_instance_id,366                                                         background_mode=False)367    time.sleep(5)368    wos_client.monitor_instances.show_metrics(monitor_instance_id=drift_monitor_instance_id)369if __name__ == '__main__':370    WOS_CPD_TECHZONE = {371        "url": "https://services-uscentral.skytap.com:8586/",372        "username": "admin",373        "password": "password",374        "version": "3.5"375    }376    WOS_CPD_LUBAN = {377        "url": "https://zen-cpd-zen.apps.bj-prod-2.luban.cdl.ibm.com/",378        "username": "admin",379        "password": "password",380        "version": "3.5"381    }382    DB2_TECHZONE = {'table_name': 'GERMAN_CREDIT',383                    'schema_name': 'aiopenscale00',384                    'hostname': '10.1.1.1',385                    'username': 'admin',386                    'password': 'password',387                    'database_name': 'aiopenscale00'}388    DB2_LUBAN = {'table_name': 'GERMAN_CREDIT',389                 'schema_name': 'OPENSCALE',390                 'hostname': 'worker19.bj-prod-2.luban.cdl.ibm.com:30157',391                 'username': 'admin',392                 'password': 'password',393                 'database_name': 'aiopenscale00'}394    VM_SCORING_URL = 'http://169.62.165.235:8880/predict/'395    GBS_SCORING_URL = 'https://9.112.255.149/edi/service199/api'396    general_header = {397        "Content-Type": "application/json",398        "Custom_header_X": "Custom_header_X_value_Y"399    }400    GBS_request_header = {401        'Content-Type': 'application/json',402        'apikey': 'eyJhbGciOiJSUzI1NiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAAAC2MWwrCMBQF93K_cyFN0oZ0A-Iy8rhKtG1KHqKIezcWvw7DGeYNpTmYYU2BlrQXrOlOG15po2xrTBu6F_rFxrUAg1hKd72lH2QqqWVPp5zafg4wD5xBofyIng40hgE9d5iVNpyLaZQjg9aN4-01W_tOclL_71ZjzwclSWjucXBOorpYhXYUGjtpsloMRnj4fAE1qncluQAAAA.LcoU4zK_a1qq8mxZAc1iBD0wQ9XuHkgWvjOy5As4Ob1Zb2_uZ1MHPu_nYnoPnRi07EMC-p-kqP8ahjlATXvIElxhxkUXHJP11TbkAQGnwVgfYX_iH4VN8Pft8Ma8eIjpnZ-QH5XM9iYige9u9dFDR2RHcD94LPVHHwLs2CNMX_U'403    }404    # TECHZONE405    wos_config = WOS_CPD_TECHZONE406    db2_config = DB2_TECHZONE407    scoring_url = VM_SCORING_URL408    scoring_request_headers = general_header409    # # GBS410    # wos_config = WOS_CPD_LUBAN411    # db2_config = DB2_LUBAN412    # scoring_url = GBS_SCORING_URL413    # scoring_request_headers=GBS_request_header414    label_column = "Risk"415    model_type = "binary"416    training_file_path = "../data/german_credit_data_biased_training.csv"417    feedback_file_path = '../data/additional_feedback_data_v2.json'418    df = pd.read_csv(training_file_path)419    cols_to_remove = [label_column]420    payload_scoring = get_scoring_payload(df, cols_to_remove, 1)421    scoring_id = None422    wos_client = auth_cpd(wos_config)423    data_marts = wos_client.data_marts.list().result.data_marts424    if len(data_marts) == 0:425        raise Exception("Missing data mart.")426    data_mart_id = data_marts[0].metadata.id427    print('Using existing datamart {}'.format(data_mart_id))428    data_mart_details = wos_client.data_marts.list().result.data_marts[0]429    data_mart_details.to_dict()430    print(wos_client.service_providers.show())431    SERVICE_PROVIDER_NAME = "Custom ML Provider Demo - All Monitors"432    SERVICE_PROVIDER_DESCRIPTION = "Added by tutorial WOS notebook to showcase monitoring Fairness, Quality, Drift and Explainability against a Custom ML provider."433    remove_existing_service_provider(wos_client, SERVICE_PROVIDER_NAME)434    service_provider_id = add_service_provider(SERVICE_PROVIDER_NAME, SERVICE_PROVIDER_DESCRIPTION, )435    print('Data Mart ID : ' + data_mart_id)436    wos_client.subscriptions.show()437    SUBSCRIPTION_NAME = "Custom ML Subscription - All Monitors"438    remove_existing_subscription(wos_client, SUBSCRIPTION_NAME)439    feature_columns = ["CheckingStatus", "LoanDuration", "CreditHistory", "LoanPurpose", "LoanAmount",440                       "ExistingSavings",441                       "EmploymentDuration", "InstallmentPercent", "Sex", "OthersOnLoan", "CurrentResidenceDuration",442                       "OwnsProperty", "Age", "InstallmentPlans", "Housing", "ExistingCreditsCount", "Job",443                       "Dependents",444                       "Telephone", "ForeignWorker"]445    cat_features = ["CheckingStatus", "CreditHistory", "LoanPurpose", "ExistingSavings", "EmploymentDuration", "Sex",446                    "OthersOnLoan", "OwnsProperty", "InstallmentPlans", "Housing", "Job", "Telephone", "ForeignWorker"]447    asset_id = str(uuid.uuid4())448    asset_name = '[asset] ' + SUBSCRIPTION_NAME449    url = ''450    asset_deployment_id = str(uuid.uuid4())451    asset_deployment_name = asset_name452    asset_deployment_scoring_url = scoring_url453    scoring_endpoint_url = scoring_url454    subscription_details = wos_client.subscriptions.add(455        data_mart_id=data_mart_id,456        service_provider_id=service_provider_id,457        asset=Asset(458            asset_id=asset_id,459            name=asset_name,460            url=url,461            asset_type=AssetTypes.MODEL,462            input_data_type=InputDataType.STRUCTURED,463            problem_type=ProblemType.BINARY_CLASSIFICATION464        ),465        deployment=AssetDeploymentRequest(466            deployment_id=asset_deployment_id,467            name=asset_deployment_name,468            deployment_type=DeploymentTypes.ONLINE,469            scoring_endpoint=ScoringEndpointRequest(470                url=scoring_endpoint_url,471                request_headers=scoring_request_headers472            )473        ),474        asset_properties=AssetPropertiesRequest(475            label_column=label_column,476            probability_fields=["probability"],477            prediction_field="prediction",478            feature_fields=feature_columns,479            categorical_fields=cat_features,480            training_data_reference=TrainingDataReference(type="db2",481                                                          location=DB2TrainingDataReferenceLocation(482                                                              table_name=db2_config['table_name'],483                                                              schema_name=db2_config['schema_name']),484                                                          connection=DB2TrainingDataReferenceConnection.from_dict({485                                                              'hostname': db2_config['hostname'],486                                                              'username': db2_config['username'],487                                                              'password': db2_config['password'],488                                                              'database_name': db2_config['database_name']}))489        )490    ).result491    subscription_id = subscription_details.metadata.id492    print('Subscription ID: ' + subscription_id)493    time.sleep(5)494    payload_data_set_id = None495    payload_data_set_id = wos_client.data_sets.list(type=DataSetTypes.PAYLOAD_LOGGING,496                                                    target_target_id=subscription_id,497                                                    target_target_type=TargetTypes.SUBSCRIPTION).result.data_sets[498        0].metadata.id499    if payload_data_set_id is None:500        print("Payload data set not found. Please check subscription status.")501    else:502        print("Payload data set id:", payload_data_set_id)503    wos_client.subscriptions.get(subscription_id).result.to_dict()504    # send a request to the model before we configure OpenScale.505    # This allows OpenScale to create a payload log in the datamart with the correct schema, so it can capture data coming into and out of the model.506    payload_scoring = get_scoring_payload(df, cols_to_remove, no_of_records_to_score=100)507    scoring_response = custom_ml_scoring(scoring_url, payload_scoring)508    scoring_id = payload_logging(payload_scoring, scoring_response, wos_client, payload_data_set_id)509    print('scoring_id: ' + str(scoring_id))510    fairness(wos_client, data_mart_id, subscription_id)511    sample_size = 2512    # explanation_task_ids = explainability(wos_client, data_mart_id, subscription_id, sample_size=2)513    # finished_explanations = finish_explanation_tasks(wos_client, explanation_task_ids, sample_size=2)514    # explanation_task_ids = ['8e71821a-cb9c-453e-a08f-ab8e3395aa11', '39bc9469-c956-43b8-be83-1a344be39367']515    # finished_explanations = []516    # while len(finished_explanations) < sample_size:517    #     finished_explanations = finish_explanation_tasks(wos_client, explanation_task_ids, sample_size=2)518    # explanation_feature_map_plot(finished_explanations)519    quality(wos_client, subscription_id, feedback_file_path)520    drift_detection_input = {521        "feature_columns": feature_columns,522        "categorical_columns": cat_features,523        "label_column": label_column,524        "problem_type": model_type525    }...monitor.py
Source:monitor.py  
1import logging2from asyncio import CancelledError3from exceptions import (4    JobAlreadyStarted,5    JobNotStarted,6    InvalidInterval,7    InvalidName,8    InvalidType,9    TooMuchArgument,10)11from mattermost.notify import notify, NOTIFICATION_SUCCESS12from models.monitor import MonitorModel13from monitors.implems import all_monitors14import scheduler15class Monitor:16    monitor_instances = {}17    @classmethod18    async def create(cls, monitor_conf, custom_conf):19        Monitor.validate_monitor_conf(monitor_conf)20        monitor_class = all_monitors[monitor_conf["type"]]21        await monitor_class.validate_custom_conf(custom_conf)22        model = await MonitorModel.create(monitor_conf, custom_conf)23        monitor = monitor_class(model)24        Monitor.monitor_instances[monitor_conf["name"]] = monitor25        return monitor26    @classmethod27    async def load_all(cls):28        for model in await MonitorModel.get_all():29            monitor_class = all_monitors[(await model.monitor_conf())["type"]]30            monitor = monitor_class(model)31            Monitor.monitor_instances[(await model.monitor_conf())["name"]] = monitor32        return Monitor.monitor_instances33    def __init__(self, model):34        self.job = None35        self.model = model36    async def notify(self, data, notification_type=NOTIFICATION_SUCCESS):37        monitor_conf = await self.get_monitor_conf()38        username = monitor_conf["name"]39        channel = monitor_conf["channel"] if "channel" in monitor_conf else None40        await notify(41            data,42            notification_type=notification_type,43            username=username,44            channel=channel,45        )46    async def get_custom_conf(self):47        return await self.model.custom_conf()48    async def set_custom_conf(self, conf):49        await self.validate_custom_conf(conf)50        await self.model.custom_conf(conf)51    async def get_monitor_conf(self):52        return await self.model.monitor_conf()53    async def set_monitor_conf(self, conf):54        return await self.model.monitor_conf(conf)55    async def get_state(self):56        return await self.model.state()57    async def set_state(self, state):58        await self.model.state(state)59    async def set_channel(self, channel=None):60        monitor_conf = await self.get_monitor_conf()61        if channel:62            monitor_conf["channel"] = channel63        elif "channel" in monitor_conf:64            del monitor_conf["channel"]65        await self.set_monitor_conf(monitor_conf)66    @classmethod67    def validate_monitor_conf(cls, conf):68        if (69            "interval" not in conf70            or type(conf["interval"]) is not int71            or conf["interval"] < 072        ):73            raise InvalidInterval74        if (75            "name" not in conf76            or conf["name"] in Monitor.monitor_instances77            or not conf["name"].isalnum()78        ):79            raise InvalidName80        if "type" not in conf or conf["type"] not in all_monitors:81            raise InvalidType82        if len(conf) > 3:83            raise TooMuchArgument84    async def job_start(self):85        if self.job_is_started():86            raise JobAlreadyStarted87        interval = (await self.get_monitor_conf())["interval"]88        self.job = scheduler.scheduler.add_job(89            self.do_job, "interval", seconds=interval90        )91    def job_stop(self):92        if not self.job_is_started():93            raise JobNotStarted94        self.job.remove()95        self.job = None96    def job_is_started(self):97        return self.job is not None98    async def do_job(self):99        # Todo: log it100        # Todo: try catch101        # Todo: Execution time102        name = (await self.get_monitor_conf())["name"]103        logging.warning(f"Executing job {name}")104        try:105            old_state = await self.get_state()106            new_state = await self.refresh()107            await self.compare(old_state, new_state)108            await self.set_state(new_state)109        except CancelledError:  # pragma: no cover110            pass111        except Exception as e:  # pragma: no cover112            if self.job is not None:113                raise e114            print("Exception on job due to killing it")115    async def remove(self):116        try:117            self.job_stop()118        except JobNotStarted:119            pass120        names = []121        for name in Monitor.monitor_instances:122            if Monitor.monitor_instances[name] is self:123                names.append(name)124        for name in names:125            del Monitor.monitor_instances[name]126        await self.model.remove()127    # To be implemented in implems128    async def refresh(self):  # pragma: no cover129        raise NotImplemented130    async def compare(self, old_state, new_state):  # pragma: no cover131        raise NotImplemented132    @classmethod133    async def validate_custom_conf(cls, conf):  # pragma: no cover134        raise NotImplemented135    @classmethod136    async def test_me(cls, test_case):  # pragma: no cover...tromino.py
Source:tromino.py  
1import asyncio2from exceptions import JobAlreadyStarted3from mattermost.notify import notify4from monitors.monitor import Monitor5from scheduler import scheduler6from server import run_server7import monitors8async def main():9    await notify("Tromino: Starting...")10    scheduler.start()11    await notify("Tromino: Scheduler up")12    monitor_types = monitors.load_monitors()13    await notify(f"Tromino: Loading plugins - {', '.join([m for m in monitor_types])}")14    # job = scheduler.add_job(lambda: print(1), "interval", seconds=5)15    monitor_instances = await Monitor.load_all()16    await notify(f"Tromino: Loading monitor instances - {len(monitor_instances)}")17    for m in monitor_instances:18        try:19            await monitor_instances[m].job_start()20            await notify(f"Tromino: Starting {m}")21        except JobAlreadyStarted:22            pass23    await run_server()24    await notify("Tromino: HTTP Server stared")25    await notify("Tromino: Hello ! What could I do for you ?")26asyncio.ensure_future(main())...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
