Best Python code snippet using localstack_python
alarm_scheduler.py
Source:alarm_scheduler.py  
...122            "Stat": alarm_details["Statistic"].capitalize(),123        },124        # TODO other fields might be required in the future125    }126def is_threshold_exceeded(metric_values: List[float], alarm_details: MetricAlarm) -> bool:127    """Evaluates if the threshold is exceeded for the configured alarm and given metric values128    :param metric_values: values to compare against threshold129    :param alarm_details: Alarm Description, as returned from describe_alarms130    :return: True if threshold is exceeded, else False131    """132    threshold = alarm_details["Threshold"]133    comparison_operator = alarm_details["ComparisonOperator"]134    treat_missing_data = alarm_details.get("TreatMissingData", "missing")135    evaluation_periods = alarm_details.get("EvaluationPeriods")136    datapoints_to_alarm = alarm_details.get("DatapointsToAlarm", evaluation_periods)137    evaluated_datapoints = []138    for value in metric_values:139        if value is None:140            if treat_missing_data == "breaching":141                evaluated_datapoints.append(True)142            elif treat_missing_data == "notBreaching":143                evaluated_datapoints.append(False)144            # else we can ignore the data145        else:146            evaluated_datapoints.append(COMPARISON_OPS.get(comparison_operator)(value, threshold))147    sum_breaching = evaluated_datapoints.count(True)148    if sum_breaching >= datapoints_to_alarm:149        return True150    return False151def is_triggering_premature_alarm(metric_values: List[float], alarm_details: MetricAlarm) -> bool:152    """153    Checks if a premature alarm should be triggered.154    https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#CloudWatch-alarms-avoiding-premature-transition:155    [...] 
alarms are designed to always go into ALARM state when the oldest available breaching datapoint during the Evaluation156    Periods number of data points is at least as old as the value of Datapoints to Alarm, and all other more recent data157    points are breaching or missing. In this case, the alarm goes into ALARM state even if the total number of datapoints158    available is lower than M (Datapoints to Alarm).159    This alarm logic applies to M out of N alarms as well.160    """161    treat_missing_data = alarm_details.get("TreatMissingData", "missing")162    if treat_missing_data not in ("missing", "ignore"):163        return False164    datapoints_to_alarm = alarm_details.get("DatapointsToAlarm", 1)165    if datapoints_to_alarm > 1:166        comparison_operator = alarm_details["ComparisonOperator"]167        threshold = alarm_details["Threshold"]168        oldest_datapoints = metric_values[:-datapoints_to_alarm]169        if oldest_datapoints.count(None) == len(oldest_datapoints):170            if metric_values[-datapoints_to_alarm] and COMPARISON_OPS.get(comparison_operator)(171                metric_values[-datapoints_to_alarm], threshold172            ):173                values = list(filter(None, metric_values[len(oldest_datapoints) :]))174                if all(175                    COMPARISON_OPS.get(comparison_operator)(value, threshold) for value in values176                ):177                    return True178    return False179def collect_metric_data(alarm_details: MetricAlarm, client: "CloudWatchClient") -> List[float]:180    """181    Collects the metric data for the evaluation interval.182    :param alarm_details: the alarm details as returned by describe_alarms183    :param client: the cloudwatch client184    :return: list with data points185    """186    metric_values = []187    evaluation_periods = alarm_details["EvaluationPeriods"]188    period = alarm_details["Period"]189    # From the docs: "Whenever an alarm evaluates whether to 
change state, CloudWatch attempts to retrieve a higher number of data190    # points than the number specified as Evaluation Periods."191    # No other indication, try to calculate a reasonable value:192    magic_number = max(math.floor(evaluation_periods / 3), 2)193    collected_periods = evaluation_periods + magic_number194    now = datetime.utcnow().replace(tzinfo=timezone.utc)195    metric_query = generate_metric_query(alarm_details)196    # get_metric_data needs to be run in a loop, so we also collect empty data points on the right position197    for i in range(0, collected_periods):198        start_time = now - timedelta(seconds=period)199        end_time = now200        metric_data = client.get_metric_data(201            MetricDataQueries=[metric_query], StartTime=start_time, EndTime=end_time202        )["MetricDataResults"][0]203        val = metric_data["Values"]204        # oldest datapoint should be at the beginning of the list205        metric_values.insert(0, val[0] if val else None)206        now = start_time207    return metric_values208def update_alarm_state(209    client: "CloudWatchClient",210    alarm_name: str,211    current_state: str,212    desired_state: str,213    reason: str = DEFAULT_REASON,214) -> None:215    """Updates the alarm state, if the current_state is different than the desired_state216    :param client: the cloudwatch client217    :param alarm_name: the name of the alarm218    :param current_state: the state the alarm is currently in219    :param desired_state: the state the alarm should have after updating220    :param reason: reason why the state is set, will be used to for set_alarm_state221    """222    if current_state == desired_state:223        return224    client.set_alarm_state(AlarmName=alarm_name, StateValue=desired_state, StateReason=reason)225def calculate_alarm_state(alarm_arn: str) -> None:226    """227    Calculates and updates the state of the alarm228    :param alarm_arn: the arn of the alarm to be evaluated229 
   """230    alarm_details = get_metric_alarm_details_for_alarm_arn(alarm_arn)231    client = get_cloudwatch_client_for_region_of_alarm(alarm_arn)232    metric_values = collect_metric_data(alarm_details, client)233    alarm_name = alarm_details["AlarmName"]234    alarm_state = alarm_details["StateValue"]235    treat_missing_data = alarm_details.get("TreatMissingData", "missing")236    empty_datapoints = metric_values.count(None)237    if empty_datapoints == len(metric_values):238        if treat_missing_data == "missing":239            update_alarm_state(240                client,241                alarm_name,242                alarm_state,243                STATE_INSUFFICIENT_DATA,244                f"{INSUFFICIENT_DATA}: empty datapoints",245            )246        elif treat_missing_data == "breaching":247            update_alarm_state(248                client,249                alarm_name,250                alarm_state,251                STATE_ALARM,252                f"{THRESHOLD_CROSSED}: empty datapoints - treated as breaching",253            )254        elif treat_missing_data == "notBreaching":255            update_alarm_state(256                client,257                alarm_name,258                alarm_state,259                STATE_OK,260                f"{THRESHOLD_OK}: empty datapoints - treated as notBreaching",261            )262        # 'ignore': keep the same state263        return264    if is_triggering_premature_alarm(metric_values, alarm_details):265        if treat_missing_data == "missing":266            update_alarm_state(267                client,268                alarm_name,269                alarm_state,270                STATE_ALARM,271                f"{THRESHOLD_CROSSED}: premature alarm for missing datapoints",272            )273        # for 'ignore' the state should be retained274        return275    # collect all non-empty datapoints from the evaluation interval276    collected_datapoints = [val for val in 
reversed(metric_values) if val is not None]277    # adding empty data points until amount of data points == "evaluation periods"278    evaluation_periods = alarm_details["EvaluationPeriods"]279    while len(collected_datapoints) < evaluation_periods and treat_missing_data in (280        "breaching",281        "notBreaching",282    ):283        # breaching/non-breaching datapoints will be evaluated284        # ignore/missing are not relevant285        collected_datapoints.append(None)286    if is_threshold_exceeded(collected_datapoints, alarm_details):287        update_alarm_state(client, alarm_name, alarm_state, STATE_ALARM, THRESHOLD_CROSSED)288    else:...search_iterative_regression.py
Source:search_iterative_regression.py  
from __future__ import division
from numbers import Number
from collections import namedtuple
import sys
from options import property_to_boolean, property_to_number


# Record of the best score seen so far: the score (`value`), the data that produced it (`data`),
# the round in which it was found (`count`) and the total number of rounds run (`rounds`).
class _FindMinimum(namedtuple('FindMinimum', ['value', 'data', 'count', 'rounds'])):
    pass


class IterativeRegression():
    """
     Class to perform tuning using an iterative regression to reduce surface illumination standard deviation.
     The approach iteratively updates LED intensities that affect the surface triangle with the illumination score
     furthest from mean, until the normalised standard deviation error threshold is reached; i.e. the residual error
     falls below the desired threshold.
     This is greedy (best-first) limited local optimisation. It uses the following properties file parameters.
         [BrightnessControlTuner]
         tune.regression.threshold=0.005 [Range: >=0] - Threshold score to beat before terminating. (Stopping criteria 1)
         tune.regression.max_improvement_attempts_on_best_score=10 [Range: >=1] - Number of attempts to improve on the current best score before accepting that as the final score. (Stopping criteria 2)
         tune.debug=False [In: True or False] - Print progress of search evaluations to STDOUT.
     """
    def __init__(self, evaluator_func, update_func):
        # evaluator_func is called with the current data and its result is used as the score;
        # update_func is called with the current data and its result becomes the next candidate
        # (see start()).
        self._evaluator_func = evaluator_func
        self._update_func = update_func
        self.__set_properties()

    def __set_properties(self):
        # Read threshold, iteration limit and debug flag from the [BrightnessControlTuner] section
        # and validate the two numeric values.
        self.threshold = property_to_number(section='BrightnessControlTuner', key='tune.regression.threshold', vmin=None, vmax=None, vtype=float)
        self.max_iterations = property_to_number(section='BrightnessControlTuner', key='tune.regression.max_improvement_attempts_on_best_score', vmin=None, vmax=None, vtype=int)
        # dict_properties = getPropertiesFile()
        # self.threshold = float(dict_properties['BrightnessControlTuner']['tune.regression.threshold'])
        # self.max_iterations = int(dict_properties['BrightnessControlTuner']['tune.regression.max_improvement_attempts_on_best_score'])
        self.DEBUG = property_to_boolean(section="BrightnessControlTuner", key="tune.debug")
        # NOTE(review): these checks are asserts, so they are skipped when Python runs with -O.
        assert isinstance(self.threshold, Number) and self.threshold >= 0.0, \
            "Residual error threshold should be positive float: " + str(self.threshold) + ". Type: "+str(type(self.threshold))
        assert isinstance(self.max_iterations, Number) and self.max_iterations >= 1, \
            "Max search iterations should be positive int: " + str(self.max_iterations) + ". Type: " + str(type(self.max_iterations))

    def start(self, start_data):
        # Repeatedly apply update_func to the data, score it with evaluator_func and remember the
        # lowest score seen. The loop stops when a score drops below self.threshold, or when
        # self.max_iterations rounds have passed since the best score was found.
        # NOTE(review): the end of this method is cut off in this view, so its return value is
        # not visible here.
        _starting_score = None
        # sentinel "no score yet" value; any real score is expected to be lower
        default_value = sys.maxsize
        # work on a shallow copy so the caller's sequence is not rebound/modified by slicing
        data = start_data[:]
        current_minimum = _FindMinimum(value=default_value, data=data, count=0, rounds=None)
        rounds = 0
        while True:
            rounds += 1
            data = self._update_func(data)
            x = self._evaluator_func(data)
            # remember the score of the very first round for the final debug output
            _starting_score = x if _starting_score is None else _starting_score
            if self.DEBUG:
                print("Round: "+str(rounds)+") - IterativeRegression - Score (Std/Qty): "+str(x)+" - (Best Round:"+str(current_minimum.count)+", Since Best Round:"+str(rounds-current_minimum.count) + ", Best Score: " + str(current_minimum.value) +")")
            is_best = x < current_minimum.value
            if is_best:
                current_minimum = _FindMinimum(value=x, data=data, count=rounds, rounds=None)
            # True once at least one score has replaced the sentinel
            has_been_evaluated = current_minimum.value != default_value
            # despite the name, this is True when the score fell BELOW the threshold
            is_threshold_exceeded = x < self.threshold                                                          # Stopping criteria 1
            has_exceeded_max_iterations_on_this_value = (current_minimum.count + self.max_iterations) <= rounds # Stopping criteria 2
            if is_threshold_exceeded or (has_been_evaluated and has_exceeded_max_iterations_on_this_value):
                break
        # rebuild the record so it also carries the total number of rounds that were run
        best_value = current_minimum.value
        best_data = current_minimum.data
        best_count = current_minimum.count
        current_minimum = _FindMinimum(value=best_value, data=best_data, count=best_count, rounds=rounds)
        if self.DEBUG:
            print("Start Score: "+str(_starting_score)+" Improved Score from "+str(rounds)+" trials: "+str(best_value))
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
