How to use the generate_metric_query method in LocalStack

Best Python code snippet using localstack_python

alarm_scheduler.py

Source:alarm_scheduler.py Github

copy

Full Screen

...104 return client.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"][0]105def get_cloudwatch_client_for_region_of_alarm(alarm_arn: str) -> "CloudWatchClient":106 region = aws_stack.extract_region_from_arn(alarm_arn)107 return aws_stack.connect_to_service("cloudwatch", region_name=region)108def generate_metric_query(alarm_details: MetricAlarm) -> MetricDataQuery:109 """Creates the dict with the required data for MetricDataQueries when calling client.get_metric_data"""110 metric = {111 "MetricName": alarm_details["MetricName"],112 }113 if alarm_details.get("Namespace"):114 metric["Namespace"] = alarm_details["Namespace"]115 if alarm_details.get("Dimensions"):116 metric["Dimensions"] = alarm_details["Dimensions"]117 return {118 "Id": alarm_details["AlarmName"],119 "MetricStat": {120 "Metric": metric,121 "Period": alarm_details["Period"],122 "Stat": alarm_details["Statistic"].capitalize(),123 },124 # TODO other fields might be required in the future125 }126def is_threshold_exceeded(metric_values: List[float], alarm_details: MetricAlarm) -> bool:127 """Evaluates if the threshold is exceeded for the configured alarm and given metric values128 :param metric_values: values to compare against threshold129 :param alarm_details: Alarm Description, as returned from describe_alarms130 :return: True if threshold is exceeded, else False131 """132 threshold = alarm_details["Threshold"]133 comparison_operator = alarm_details["ComparisonOperator"]134 treat_missing_data = alarm_details.get("TreatMissingData", "missing")135 evaluation_periods = alarm_details.get("EvaluationPeriods")136 datapoints_to_alarm = alarm_details.get("DatapointsToAlarm", evaluation_periods)137 evaluated_datapoints = []138 for value in metric_values:139 if value is None:140 if treat_missing_data == "breaching":141 evaluated_datapoints.append(True)142 elif treat_missing_data == "notBreaching":143 evaluated_datapoints.append(False)144 # else we can ignore the data145 else:146 
evaluated_datapoints.append(COMPARISON_OPS.get(comparison_operator)(value, threshold))147 sum_breaching = evaluated_datapoints.count(True)148 if sum_breaching >= datapoints_to_alarm:149 return True150 return False151def is_triggering_premature_alarm(metric_values: List[float], alarm_details: MetricAlarm) -> bool:152 """153 Checks if a premature alarm should be triggered.154 https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#CloudWatch-alarms-avoiding-premature-transition:155 [...] alarms are designed to always go into ALARM state when the oldest available breaching datapoint during the Evaluation156 Periods number of data points is at least as old as the value of Datapoints to Alarm, and all other more recent data157 points are breaching or missing. In this case, the alarm goes into ALARM state even if the total number of datapoints158 available is lower than M (Datapoints to Alarm).159 This alarm logic applies to M out of N alarms as well.160 """161 treat_missing_data = alarm_details.get("TreatMissingData", "missing")162 if treat_missing_data not in ("missing", "ignore"):163 return False164 datapoints_to_alarm = alarm_details.get("DatapointsToAlarm", 1)165 if datapoints_to_alarm > 1:166 comparison_operator = alarm_details["ComparisonOperator"]167 threshold = alarm_details["Threshold"]168 oldest_datapoints = metric_values[:-datapoints_to_alarm]169 if oldest_datapoints.count(None) == len(oldest_datapoints):170 if metric_values[-datapoints_to_alarm] and COMPARISON_OPS.get(comparison_operator)(171 metric_values[-datapoints_to_alarm], threshold172 ):173 values = list(filter(None, metric_values[len(oldest_datapoints) :]))174 if all(175 COMPARISON_OPS.get(comparison_operator)(value, threshold) for value in values176 ):177 return True178 return False179def collect_metric_data(alarm_details: MetricAlarm, client: "CloudWatchClient") -> List[float]:180 """181 Collects the metric data for the evaluation interval.182 :param alarm_details: 
the alarm details as returned by describe_alarms183 :param client: the cloudwatch client184 :return: list with data points185 """186 metric_values = []187 evaluation_periods = alarm_details["EvaluationPeriods"]188 period = alarm_details["Period"]189 # From the docs: "Whenever an alarm evaluates whether to change state, CloudWatch attempts to retrieve a higher number of data190 # points than the number specified as Evaluation Periods."191 # No other indication, try to calculate a reasonable value:192 magic_number = max(math.floor(evaluation_periods / 3), 2)193 collected_periods = evaluation_periods + magic_number194 now = datetime.utcnow().replace(tzinfo=timezone.utc)195 metric_query = generate_metric_query(alarm_details)196 # get_metric_data needs to be run in a loop, so we also collect empty data points on the right position197 for i in range(0, collected_periods):198 start_time = now - timedelta(seconds=period)199 end_time = now200 metric_data = client.get_metric_data(201 MetricDataQueries=[metric_query], StartTime=start_time, EndTime=end_time202 )["MetricDataResults"][0]203 val = metric_data["Values"]204 # oldest datapoint should be at the beginning of the list205 metric_values.insert(0, val[0] if val else None)206 now = start_time207 return metric_values208def update_alarm_state(209 client: "CloudWatchClient",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful