How to use events_client method in localstack

Best Python code snippet using localstack_python

test_events.py

Source:test_events.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import base643import json4import os5import uuid6from datetime import datetime7from typing import Dict, List, Tuple8import pytest9from localstack import config10from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON3611from localstack.services.events.provider import _get_events_tmp_dir12from localstack.services.generic_proxy import ProxyListener13from localstack.services.infra import start_proxy14from localstack.utils import testutil15from localstack.utils.aws import aws_stack16from localstack.utils.aws.aws_responses import requests_response17from localstack.utils.common import (18 get_free_tcp_port,19 get_service_protocol,20 load_file,21 retry,22 short_uid,23 to_str,24 wait_for_port_open,25)26from localstack.utils.testutil import check_expected_lambda_log_events_length27from .awslambda.test_lambda import TEST_LAMBDA_PYTHON_ECHO28THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))29TEST_EVENT_BUS_NAME = "command-bus-dev"30EVENT_DETAIL = {"command": "update-account", "payload": {"acc_id": "0a787ecb-4015", "sf_id": "baz"}}31TEST_EVENT_PATTERN = {32 "source": ["core.update-account-command"],33 "detail-type": ["core.update-account-command"],34 "detail": {"command": ["update-account"]},35}36class TestEvents:37 def assert_valid_event(self, event):38 expected_fields = (39 "version",40 "id",41 "detail-type",42 "source",43 "account",44 "time",45 "region",46 "resources",47 "detail",48 )49 for field in expected_fields:50 assert field in event51 def test_put_rule(self, events_client):52 rule_name = "rule-{}".format(short_uid())53 events_client.put_rule(Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN))54 rules = events_client.list_rules(NamePrefix=rule_name)["Rules"]55 assert len(rules) == 156 assert json.loads(rules[0]["EventPattern"]) == TEST_EVENT_PATTERN57 # clean up58 self.cleanup(rule_name=rule_name)59 def test_events_written_to_disk_are_timestamp_prefixed_for_chronological_ordering(60 self, events_client61 ):62 event_type = str(uuid.uuid4())63 event_details_to_publish = list(map(lambda n: f"event {n}", range(10)))64 for detail in event_details_to_publish:65 events_client.put_events(66 Entries=[67 {68 "Source": "unittest",69 "Resources": [],70 "DetailType": event_type,71 "Detail": json.dumps(detail),72 }73 ]74 )75 events_tmp_dir = _get_events_tmp_dir()76 sorted_events_written_to_disk = map(77 lambda filename: json.loads(str(load_file(os.path.join(events_tmp_dir, filename)))),78 sorted(os.listdir(events_tmp_dir)),79 )80 sorted_events = list(81 filter(82 lambda event: event.get("DetailType") == event_type,83 sorted_events_written_to_disk,84 )85 )86 assert (87 list(map(lambda event: json.loads(event["Detail"]), sorted_events))88 == event_details_to_publish89 )90 def test_list_tags_for_resource(self, events_client):91 rule_name = "rule-{}".format(short_uid())92 rule = events_client.put_rule(Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN))93 rule_arn = rule["RuleArn"]94 expected = [95 {"Key": "key1", "Value": "value1"},96 {"Key": "key2", "Value": "value2"},97 ]98 # insert two tags, verify both are visible99 events_client.tag_resource(ResourceARN=rule_arn, Tags=expected)100 actual = events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]101 assert actual == expected102 # remove 'key2', verify only 'key1' remains103 expected = [{"Key": "key1", "Value": "value1"}]104 events_client.untag_resource(ResourceARN=rule_arn, TagKeys=["key2"])105 actual = events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]106 assert actual == expected107 # clean up108 self.cleanup(rule_name=rule_name)109 @pytest.mark.aws_validated110 def test_put_events_with_target_sqs(self, events_client, sqs_client):111 entries = [112 {113 "Source": TEST_EVENT_PATTERN["source"][0],114 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],115 "Detail": json.dumps(EVENT_DETAIL),116 }117 ]118 self._put_events_with_filter_to_sqs(119 events_client, sqs_client, pattern=TEST_EVENT_PATTERN, entries_asserts=[(entries, True)]120 )121 @pytest.mark.aws_validated122 def test_put_events_with_nested_event_pattern(self, events_client, sqs_client):123 pattern = {"detail": {"event": {"data": {"type": ["1"]}}}}124 entries1 = [125 {126 "Source": "test",127 "DetailType": "test",128 "Detail": json.dumps({"event": {"data": {"type": "1"}}}),129 }130 ]131 entries2 = [132 {133 "Source": "test",134 "DetailType": "test",135 "Detail": json.dumps({"event": {"data": {"type": "2"}}}),136 }137 ]138 entries3 = [139 {140 "Source": "test",141 "DetailType": "test",142 "Detail": json.dumps({"hello": "world"}),143 }144 ]145 entries_asserts = [(entries1, True), (entries2, False), (entries3, False)]146 self._put_events_with_filter_to_sqs(147 events_client,148 sqs_client,149 pattern=pattern,150 entries_asserts=entries_asserts,151 input_path="$.detail",152 )153 def test_put_events_with_target_sqs_event_detail_match(self, events_client, sqs_client):154 entries1 = [155 {156 "Source": TEST_EVENT_PATTERN["source"][0],157 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],158 "Detail": json.dumps({"EventType": "1"}),159 }160 ]161 entries2 = [162 {163 "Source": TEST_EVENT_PATTERN["source"][0],164 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],165 "Detail": json.dumps({"EventType": "2"}),166 }167 ]168 entries_asserts = [(entries1, True), (entries2, False)]169 self._put_events_with_filter_to_sqs(170 events_client,171 sqs_client,172 pattern={"detail": {"EventType": ["0", "1"]}},173 entries_asserts=entries_asserts,174 input_path="$.detail",175 )176 def _put_events_with_filter_to_sqs(177 self,178 events_client,179 sqs_client,180 pattern: Dict,181 entries_asserts: List[Tuple[List[Dict], bool]],182 input_path: str = None,183 ):184 queue_name = f"queue-{short_uid()}"185 rule_name = f"rule-{short_uid()}"186 target_id = f"target-{short_uid()}"187 bus_name = f"bus-{short_uid()}"188 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]189 queue_arn = self._get_queue_arn(queue_url, sqs_client)190 policy = {191 "Version": "2012-10-17",192 "Id": f"sqs-eventbridge-{short_uid()}",193 "Statement": [194 {195 "Sid": f"SendMessage-{short_uid()}",196 "Effect": "Allow",197 "Principal": {"Service": "events.amazonaws.com"},198 "Action": "sqs:SendMessage",199 "Resource": queue_arn,200 }201 ],202 }203 sqs_client.set_queue_attributes(204 QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}205 )206 events_client.create_event_bus(Name=bus_name)207 events_client.put_rule(208 Name=rule_name,209 EventBusName=bus_name,210 EventPattern=json.dumps(pattern),211 )212 kwargs = {"InputPath": input_path} if input_path else {}213 rs = events_client.put_targets(214 Rule=rule_name,215 EventBusName=bus_name,216 Targets=[{"Id": target_id, "Arn": queue_arn, **kwargs}],217 )218 assert rs["FailedEntryCount"] == 0219 assert rs["FailedEntries"] == []220 try:221 for entry_asserts in entries_asserts:222 entries = entry_asserts[0]223 for entry in entries:224 entry.setdefault("EventBusName", bus_name)225 self._put_entries_assert_results_sqs(226 events_client,227 sqs_client,228 queue_url,229 entries=entries,230 should_match=entry_asserts[1],231 )232 finally:233 self.cleanup(234 bus_name,235 rule_name,236 target_id,237 queue_url=queue_url,238 events_client=events_client,239 sqs_client=sqs_client,240 )241 def _put_entries_assert_results_sqs(242 self, events_client, sqs_client, queue_url: str, entries: List[Dict], should_match: bool243 ):244 response = events_client.put_events(Entries=entries)245 assert not response.get("FailedEntryCount")246 def get_message(queue_url):247 resp = sqs_client.receive_message(QueueUrl=queue_url)248 messages = resp.get("Messages")249 if should_match:250 assert len(messages) == 1251 return messages252 messages = retry(get_message, retries=5, sleep=1, queue_url=queue_url)253 if should_match:254 actual_event = json.loads(messages[0]["Body"])255 if "detail" in actual_event:256 self.assert_valid_event(actual_event)257 else:258 assert not messages259 return messages260 # TODO: further unify/parameterize the tests for the different target types below261 def test_put_events_with_target_sns(262 self, events_client, sns_client, sqs_client, sns_subscription263 ):264 queue_name = "test-%s" % short_uid()265 rule_name = "rule-{}".format(short_uid())266 target_id = "target-{}".format(short_uid())267 bus_name = "bus-{}".format(short_uid())268 topic_name = "topic-{}".format(short_uid())269 topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]270 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]271 queue_arn = aws_stack.sqs_queue_arn(queue_name)272 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)273 events_client.create_event_bus(Name=bus_name)274 events_client.put_rule(275 Name=rule_name,276 EventBusName=bus_name,277 EventPattern=json.dumps(TEST_EVENT_PATTERN),278 )279 rs = events_client.put_targets(280 Rule=rule_name,281 EventBusName=bus_name,282 Targets=[{"Id": target_id, "Arn": topic_arn}],283 )284 assert "FailedEntryCount" in rs285 assert "FailedEntries" in rs286 assert rs["FailedEntryCount"] == 0287 assert rs["FailedEntries"] == []288 events_client.put_events(289 Entries=[290 {291 "EventBusName": bus_name,292 "Source": TEST_EVENT_PATTERN["source"][0],293 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],294 "Detail": json.dumps(EVENT_DETAIL),295 }296 ]297 )298 def get_message(queue_url):299 resp = sqs_client.receive_message(QueueUrl=queue_url)300 return resp["Messages"]301 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)302 assert len(messages) == 1303 actual_event = json.loads(messages[0]["Body"]).get("Message")304 self.assert_valid_event(actual_event)305 assert json.loads(actual_event).get("detail") == EVENT_DETAIL306 # clean up307 sns_client.delete_topic(TopicArn=topic_arn)308 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)309 def test_put_events_into_event_bus(self, events_client, sqs_client):310 queue_name = "queue-{}".format(short_uid())311 rule_name = "rule-{}".format(short_uid())312 target_id = "target-{}".format(short_uid())313 bus_name_1 = "bus1-{}".format(short_uid())314 bus_name_2 = "bus2-{}".format(short_uid())315 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]316 queue_arn = self._get_queue_arn(queue_url, sqs_client)317 events_client.create_event_bus(Name=bus_name_1)318 resp = events_client.create_event_bus(Name=bus_name_2)319 events_client.put_rule(320 Name=rule_name,321 EventBusName=bus_name_1,322 EventPattern=json.dumps(TEST_EVENT_PATTERN),323 )324 events_client.put_targets(325 Rule=rule_name,326 EventBusName=bus_name_1,327 Targets=[{"Id": target_id, "Arn": resp.get("EventBusArn")}],328 )329 events_client.put_targets(330 Rule=rule_name,331 EventBusName=bus_name_2,332 Targets=[{"Id": target_id, "Arn": queue_arn}],333 )334 events_client.put_events(335 Entries=[336 {337 "EventBusName": bus_name_1,338 "Source": TEST_EVENT_PATTERN["source"][0],339 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],340 "Detail": json.dumps(EVENT_DETAIL),341 }342 ]343 )344 def get_message(queue_url):345 resp = sqs_client.receive_message(QueueUrl=queue_url)346 return resp["Messages"]347 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)348 assert len(messages) == 1349 actual_event = json.loads(messages[0]["Body"])350 self.assert_valid_event(actual_event)351 assert actual_event["detail"] == EVENT_DETAIL352 # clean up353 self.cleanup(bus_name_1, rule_name, target_id)354 self.cleanup(bus_name_2)355 sqs_client.delete_queue(QueueUrl=queue_url)356 def test_put_events_with_target_lambda(self, events_client):357 rule_name = "rule-{}".format(short_uid())358 function_name = "lambda-func-{}".format(short_uid())359 target_id = "target-{}".format(short_uid())360 bus_name = "bus-{}".format(short_uid())361 rs = testutil.create_lambda_function(362 handler_file=TEST_LAMBDA_PYTHON_ECHO,363 func_name=function_name,364 runtime=LAMBDA_RUNTIME_PYTHON36,365 )366 func_arn = rs["CreateFunctionResponse"]["FunctionArn"]367 events_client.create_event_bus(Name=bus_name)368 events_client.put_rule(369 Name=rule_name,370 EventBusName=bus_name,371 EventPattern=json.dumps(TEST_EVENT_PATTERN),372 )373 rs = events_client.put_targets(374 Rule=rule_name,375 EventBusName=bus_name,376 Targets=[{"Id": target_id, "Arn": func_arn}],377 )378 assert "FailedEntryCount" in rs379 assert "FailedEntries" in rs380 assert rs["FailedEntryCount"] == 0381 assert rs["FailedEntries"] == []382 events_client.put_events(383 Entries=[384 {385 "EventBusName": bus_name,386 "Source": TEST_EVENT_PATTERN["source"][0],387 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],388 "Detail": json.dumps(EVENT_DETAIL),389 }390 ]391 )392 # Get lambda's log events393 events = retry(394 check_expected_lambda_log_events_length,395 retries=3,396 sleep=1,397 function_name=function_name,398 expected_length=1,399 )400 actual_event = events[0]401 self.assert_valid_event(actual_event)402 assert actual_event["detail"] == EVENT_DETAIL403 # clean up404 testutil.delete_lambda_function(function_name)405 self.cleanup(bus_name, rule_name, target_id)406 def test_rule_disable(self, events_client):407 rule_name = "rule-{}".format(short_uid())408 events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")409 response = events_client.list_rules()410 assert response["Rules"][0]["State"] == "ENABLED"411 events_client.disable_rule(Name=rule_name)412 response = events_client.list_rules(NamePrefix=rule_name)413 assert response["Rules"][0]["State"] == "DISABLED"414 # clean up415 self.cleanup(rule_name=rule_name)416 def test_scheduled_expression_events(417 self, stepfunctions_client, sns_client, sqs_client, events_client, sns_subscription418 ):419 class HttpEndpointListener(ProxyListener):420 def forward_request(self, method, path, data, headers):421 event = json.loads(to_str(data))422 events.append(event)423 return 200424 local_port = get_free_tcp_port()425 proxy = start_proxy(local_port, update_listener=HttpEndpointListener())426 wait_for_port_open(local_port)427 topic_name = "topic-{}".format(short_uid())428 queue_name = "queue-{}".format(short_uid())429 fifo_queue_name = "queue-{}.fifo".format(short_uid())430 rule_name = "rule-{}".format(short_uid())431 endpoint = "{}://{}:{}".format(432 get_service_protocol(), config.LOCALSTACK_HOSTNAME, local_port433 )434 sm_role_arn = aws_stack.role_arn("sfn_role")435 sm_name = "state-machine-{}".format(short_uid())436 topic_target_id = "target-{}".format(short_uid())437 sm_target_id = "target-{}".format(short_uid())438 queue_target_id = "target-{}".format(short_uid())439 fifo_queue_target_id = "target-{}".format(short_uid())440 events = []441 state_machine_definition = """442 {443 "StartAt": "Hello",444 "States": {445 "Hello": {446 "Type": "Pass",447 "Result": "World",448 "End": true449 }450 }451 }452 """453 state_machine_arn = stepfunctions_client.create_state_machine(454 name=sm_name, definition=state_machine_definition, roleArn=sm_role_arn455 )["stateMachineArn"]456 topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]457 sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint)458 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]459 fifo_queue_url = sqs_client.create_queue(460 QueueName=fifo_queue_name,461 Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},462 )["QueueUrl"]463 queue_arn = aws_stack.sqs_queue_arn(queue_name)464 fifo_queue_arn = aws_stack.sqs_queue_arn(fifo_queue_name)465 event = {"env": "testing"}466 events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")467 events_client.put_targets(468 Rule=rule_name,469 Targets=[470 {"Id": topic_target_id, "Arn": topic_arn, "Input": json.dumps(event)},471 {472 "Id": sm_target_id,473 "Arn": state_machine_arn,474 "Input": json.dumps(event),475 },476 {"Id": queue_target_id, "Arn": queue_arn, "Input": json.dumps(event)},477 {478 "Id": fifo_queue_target_id,479 "Arn": fifo_queue_arn,480 "Input": json.dumps(event),481 "SqsParameters": {"MessageGroupId": "123"},482 },483 ],484 )485 def received(q_urls):486 # state machine got executed487 executions = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)[488 "executions"489 ]490 assert len(executions) >= 1491 # http endpoint got events492 assert len(events) >= 2493 notifications = [494 event["Message"] for event in events if event["Type"] == "Notification"495 ]496 assert len(notifications) >= 1497 # get state machine execution detail498 execution_arn = executions[0]["executionArn"]499 execution_input = stepfunctions_client.describe_execution(executionArn=execution_arn)[500 "input"501 ]502 all_msgs = []503 # get message from queue504 for url in q_urls:505 msgs = sqs_client.receive_message(QueueUrl=url).get("Messages", [])506 assert len(msgs) >= 1507 all_msgs.append(msgs[0])508 return execution_input, notifications[0], all_msgs509 execution_input, notification, msgs_received = retry(510 received, retries=5, sleep=15, q_urls=[queue_url, fifo_queue_url]511 )512 assert json.loads(notification) == event513 assert json.loads(execution_input) == event514 for msg_received in msgs_received:515 assert json.loads(msg_received["Body"]) == event516 # clean up517 proxy.stop()518 target_ids = [topic_target_id, sm_target_id, queue_target_id, fifo_queue_target_id]519 self.cleanup(None, rule_name, target_ids=target_ids, queue_url=queue_url)520 sns_client.delete_topic(TopicArn=topic_arn)521 stepfunctions_client.delete_state_machine(stateMachineArn=state_machine_arn)522 def test_api_destinations(self, events_client):523 token = short_uid()524 bearer = "Bearer %s" % token525 class HttpEndpointListener(ProxyListener):526 def forward_request(self, method, path, data, headers):527 event = json.loads(to_str(data))528 events.append(event)529 paths_list.append(path)530 auth = headers.get("Api") or headers.get("Authorization")531 if auth not in headers_list:532 headers_list.append(auth)533 if headers.get("target_header"):534 headers_list.append(headers.get("target_header"))535 if "client_id" in event:536 oauth_data.update(537 {538 "client_id": event.get("client_id"),539 "client_secret": event.get("client_secret"),540 "header_value": headers.get("oauthheader"),541 "body_value": event.get("oauthbody"),542 "path": path,543 }544 )545 return requests_response(546 {547 "access_token": token,548 "token_type": "Bearer",549 "expires_in": 86400,550 }551 )552 events = []553 paths_list = []554 headers_list = []555 oauth_data = {}556 local_port = get_free_tcp_port()557 proxy = start_proxy(local_port, update_listener=HttpEndpointListener())558 wait_for_port_open(local_port)559 url = f"http://localhost:{local_port}"560 auth_types = [561 {562 "type": "BASIC",563 "key": "BasicAuthParameters",564 "parameters": {"Username": "user", "Password": "pass"},565 },566 {567 "type": "API_KEY",568 "key": "ApiKeyAuthParameters",569 "parameters": {"ApiKeyName": "Api", "ApiKeyValue": "apikey_secret"},570 },571 {572 "type": "OAUTH_CLIENT_CREDENTIALS",573 "key": "OAuthParameters",574 "parameters": {575 "AuthorizationEndpoint": url,576 "ClientParameters": {"ClientID": "id", "ClientSecret": "password"},577 "HttpMethod": "put",578 "OAuthHttpParameters": {579 "BodyParameters": [{"Key": "oauthbody", "Value": "value1"}],580 "HeaderParameters": [{"Key": "oauthheader", "Value": "value2"}],581 "QueryStringParameters": [{"Key": "oauthquery", "Value": "value3"}],582 },583 },584 },585 ]586 for auth in auth_types:587 connection_name = "c-%s" % short_uid()588 connection_arn = events_client.create_connection(589 Name=connection_name,590 AuthorizationType=auth.get("type"),591 AuthParameters={592 auth.get("key"): auth.get("parameters"),593 "InvocationHttpParameters": {594 "BodyParameters": [595 {"Key": "key", "Value": "value", "IsValueSecret": False}596 ],597 "HeaderParameters": [598 {"Key": "key", "Value": "value", "IsValueSecret": False}599 ],600 "QueryStringParameters": [601 {"Key": "key", "Value": "value", "IsValueSecret": False}602 ],603 },604 },605 )["ConnectionArn"]606 # create api destination607 dest_name = "d-%s" % short_uid()608 result = events_client.create_api_destination(609 Name=dest_name,610 ConnectionArn=connection_arn,611 InvocationEndpoint=url,612 HttpMethod="POST",613 )614 # create rule and target615 rule_name = "r-%s" % short_uid()616 target_id = "target-{}".format(short_uid())617 pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})618 events_client.put_rule(Name=rule_name, EventPattern=pattern)619 events_client.put_targets(620 Rule=rule_name,621 Targets=[622 {623 "Id": target_id,624 "Arn": result["ApiDestinationArn"],625 "Input": '{"target_value":"value"}',626 "HttpParameters": {627 "PathParameterValues": ["target_path"],628 "HeaderParameters": {"target_header": "target_header_value"},629 "QueryStringParameters": {"target_query": "t_query"},630 },631 }632 ],633 )634 entries = [635 {636 "Source": "source-123",637 "DetailType": "type-123",638 "Detail": '{"i": %s}' % 0,639 }640 ]641 events_client.put_events(Entries=entries)642 # clean up643 events_client.delete_connection(Name=connection_name)644 events_client.delete_api_destination(Name=dest_name)645 self.cleanup(rule_name=rule_name, target_ids=target_id)646 # assert that all events have been received in the HTTP server listener647 def check():648 assert len(events) >= len(auth_types)649 assert "key" in paths_list[0] and "value" in paths_list[0]650 assert "target_query" in paths_list[0] and "t_query" in paths_list[0]651 assert "target_path" in paths_list[0]652 assert events[0].get("key") == "value"653 assert events[0].get("target_value") == "value"654 assert oauth_data.get("client_id") == "id"655 assert oauth_data.get("client_secret") == "password"656 assert oauth_data.get("header_value") == "value2"657 assert oauth_data.get("body_value") == "value1"658 assert "oauthquery" in oauth_data.get("path")659 assert "value3" in oauth_data.get("path")660 user_pass = to_str(base64.b64encode(b"user:pass"))661 assert f"Basic {user_pass}" in headers_list662 assert "apikey_secret" in headers_list663 assert bearer in headers_list664 assert "target_header_value" in headers_list665 retry(check, sleep=0.5, retries=5)666 # clean up667 proxy.stop()668 def test_put_events_with_target_firehose(self, events_client, s3_client, firehose_client):669 s3_bucket = "s3-{}".format(short_uid())670 s3_prefix = "testeventdata"671 stream_name = "firehose-{}".format(short_uid())672 rule_name = "rule-{}".format(short_uid())673 target_id = "target-{}".format(short_uid())674 bus_name = "bus-{}".format(short_uid())675 # create firehose target bucket676 aws_stack.get_or_create_bucket(s3_bucket)677 # create firehose delivery stream to s3678 stream = firehose_client.create_delivery_stream(679 DeliveryStreamName=stream_name,680 S3DestinationConfiguration={681 "RoleARN": aws_stack.iam_resource_arn("firehose"),682 "BucketARN": aws_stack.s3_bucket_arn(s3_bucket),683 "Prefix": s3_prefix,684 },685 )686 stream_arn = stream["DeliveryStreamARN"]687 events_client.create_event_bus(Name=bus_name)688 events_client.put_rule(689 Name=rule_name,690 EventBusName=bus_name,691 EventPattern=json.dumps(TEST_EVENT_PATTERN),692 )693 rs = events_client.put_targets(694 Rule=rule_name,695 EventBusName=bus_name,696 Targets=[{"Id": target_id, "Arn": stream_arn}],697 )698 assert "FailedEntryCount" in rs699 assert "FailedEntries" in rs700 assert rs["FailedEntryCount"] == 0701 assert rs["FailedEntries"] == []702 events_client.put_events(703 Entries=[704 {705 "EventBusName": bus_name,706 "Source": TEST_EVENT_PATTERN["source"][0],707 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],708 "Detail": json.dumps(EVENT_DETAIL),709 }710 ]711 )712 # run tests713 bucket_contents = s3_client.list_objects(Bucket=s3_bucket)["Contents"]714 assert len(bucket_contents) == 1715 key = bucket_contents[0]["Key"]716 s3_object = s3_client.get_object(Bucket=s3_bucket, Key=key)717 actual_event = json.loads(s3_object["Body"].read().decode())718 self.assert_valid_event(actual_event)719 assert actual_event["detail"] == EVENT_DETAIL720 # clean up721 firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)722 # empty and delete bucket723 s3_client.delete_object(Bucket=s3_bucket, Key=key)724 s3_client.delete_bucket(Bucket=s3_bucket)725 self.cleanup(bus_name, rule_name, target_id)726 def test_put_events_with_target_sqs_new_region(self):727 events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")728 queue_name = "queue-{}".format(short_uid())729 rule_name = "rule-{}".format(short_uid())730 target_id = "target-{}".format(short_uid())731 bus_name = "bus-{}".format(short_uid())732 sqs_client = aws_stack.create_external_boto_client("sqs", region_name="eu-west-1")733 sqs_client.create_queue(QueueName=queue_name)734 queue_arn = aws_stack.sqs_queue_arn(queue_name)735 events_client.create_event_bus(Name=bus_name)736 events_client.put_rule(737 Name=rule_name,738 EventBusName=bus_name,739 EventPattern=json.dumps(TEST_EVENT_PATTERN),740 )741 events_client.put_targets(742 Rule=rule_name,743 EventBusName=bus_name,744 Targets=[{"Id": target_id, "Arn": queue_arn}],745 )746 response = events_client.put_events(747 Entries=[748 {749 "Source": "com.mycompany.myapp",750 "Detail": '{ "key1": "value1", "key": "value2" }',751 "Resources": [],752 "DetailType": "myDetailType",753 }754 ]755 )756 assert "Entries" in response757 assert len(response.get("Entries")) == 1758 assert "EventId" in response.get("Entries")[0]759 def test_put_events_with_target_kinesis(self, events_client, kinesis_client):760 rule_name = "rule-{}".format(short_uid())761 target_id = "target-{}".format(short_uid())762 bus_name = "bus-{}".format(short_uid())763 stream_name = "stream-{}".format(short_uid())764 stream_arn = aws_stack.kinesis_stream_arn(stream_name)765 kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)766 events_client.create_event_bus(Name=bus_name)767 events_client.put_rule(768 Name=rule_name,769 EventBusName=bus_name,770 EventPattern=json.dumps(TEST_EVENT_PATTERN),771 )772 put_response = events_client.put_targets(773 Rule=rule_name,774 EventBusName=bus_name,775 Targets=[776 {777 "Id": target_id,778 "Arn": stream_arn,779 "KinesisParameters": {"PartitionKeyPath": "$.detail-type"},780 }781 ],782 )783 assert "FailedEntryCount" in put_response784 assert "FailedEntries" in put_response785 assert put_response["FailedEntryCount"] == 0786 assert put_response["FailedEntries"] == []787 def check_stream_status():788 _stream = kinesis_client.describe_stream(StreamName=stream_name)789 assert _stream["StreamDescription"]["StreamStatus"] == "ACTIVE"790 # wait until stream becomes available791 retry(check_stream_status, retries=7, sleep=0.8)792 events_client.put_events(793 Entries=[794 {795 "EventBusName": bus_name,796 "Source": TEST_EVENT_PATTERN["source"][0],797 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],798 "Detail": json.dumps(EVENT_DETAIL),799 }800 ]801 )802 stream = kinesis_client.describe_stream(StreamName=stream_name)803 shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]804 shard_iterator = kinesis_client.get_shard_iterator(805 StreamName=stream_name,806 ShardId=shard_id,807 ShardIteratorType="AT_TIMESTAMP",808 Timestamp=datetime(2020, 1, 1),809 )["ShardIterator"]810 record = kinesis_client.get_records(ShardIterator=shard_iterator)["Records"][0]811 partition_key = record["PartitionKey"]812 data = json.loads(record["Data"].decode())813 assert partition_key == TEST_EVENT_PATTERN["detail-type"][0]814 assert data["detail"] == EVENT_DETAIL815 self.assert_valid_event(data)816 def test_put_events_with_input_path(self, events_client, sqs_client):817 queue_name = f"queue-{short_uid()}"818 rule_name = f"rule-{short_uid()}"819 target_id = f"target-{short_uid()}"820 bus_name = f"bus-{short_uid()}"821 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]822 queue_arn = aws_stack.sqs_queue_arn(queue_name)823 events_client.create_event_bus(Name=bus_name)824 events_client.put_rule(825 Name=rule_name,826 EventBusName=bus_name,827 EventPattern=json.dumps(TEST_EVENT_PATTERN),828 )829 events_client.put_targets(830 Rule=rule_name,831 EventBusName=bus_name,832 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],833 )834 events_client.put_events(835 Entries=[836 {837 "EventBusName": bus_name,838 "Source": TEST_EVENT_PATTERN["source"][0],839 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],840 "Detail": json.dumps(EVENT_DETAIL),841 }842 ]843 )844 def get_message(queue_url):845 resp = sqs_client.receive_message(QueueUrl=queue_url)846 return resp.get("Messages")847 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)848 assert len(messages) == 1849 assert json.loads(messages[0].get("Body")) == EVENT_DETAIL850 events_client.put_events(851 Entries=[852 {853 "EventBusName": bus_name,854 "Source": "dummySource",855 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],856 "Detail": json.dumps(EVENT_DETAIL),857 }858 ]859 )860 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)861 assert messages is None862 # clean up863 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)864 def test_put_events_with_input_path_multiple(self, events_client, sqs_client):865 queue_name = "queue-{}".format(short_uid())866 queue_name_1 = "queue-{}".format(short_uid())867 rule_name = "rule-{}".format(short_uid())868 target_id = "target-{}".format(short_uid())869 target_id_1 = "target-{}".format(short_uid())870 bus_name = "bus-{}".format(short_uid())871 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]872 queue_arn = aws_stack.sqs_queue_arn(queue_name)873 queue_url_1 = sqs_client.create_queue(QueueName=queue_name_1)["QueueUrl"]874 queue_arn_1 = aws_stack.sqs_queue_arn(queue_name_1)875 events_client.create_event_bus(Name=bus_name)876 events_client.put_rule(877 Name=rule_name,878 EventBusName=bus_name,879 EventPattern=json.dumps(TEST_EVENT_PATTERN),880 )881 events_client.put_targets(882 Rule=rule_name,883 EventBusName=bus_name,884 Targets=[885 {"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"},886 {887 "Id": target_id_1,888 "Arn": queue_arn_1,889 },890 ],891 )892 events_client.put_events(893 Entries=[894 {895 "EventBusName": bus_name,896 "Source": TEST_EVENT_PATTERN["source"][0],897 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],898 "Detail": json.dumps(EVENT_DETAIL),899 }900 ]901 )902 def get_message(queue_url):903 resp = sqs_client.receive_message(QueueUrl=queue_url)904 return resp.get("Messages")905 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)906 assert len(messages) == 1907 assert json.loads(messages[0].get("Body")) == EVENT_DETAIL908 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url_1)909 assert len(messages) == 1910 assert json.loads(messages[0].get("Body")).get("detail") == EVENT_DETAIL911 events_client.put_events(912 Entries=[913 {914 "EventBusName": bus_name,915 "Source": "dummySource",916 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],917 "Detail": json.dumps(EVENT_DETAIL),918 }919 ]920 )921 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)922 assert messages is None923 # clean up924 self.cleanup(bus_name, rule_name, [target_id, target_id_1], queue_url=queue_url)925 def test_put_event_without_source(self):926 events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")927 response = events_client.put_events(Entries=[{"DetailType": "Test", "Detail": "{}"}])928 assert response.get("Entries")929 def test_put_event_without_detail(self):930 events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")931 response = events_client.put_events(932 Entries=[933 {934 "DetailType": "Test",935 }936 ]937 )938 assert response.get("Entries")939 def test_trigger_event_on_ssm_change(self, events_client, sqs_client, ssm_client):940 rule_name = "rule-{}".format(short_uid())941 target_id = "target-{}".format(short_uid())942 # create queue943 queue_name = "queue-{}".format(short_uid())944 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]945 queue_arn = aws_stack.sqs_queue_arn(queue_name)946 # put rule listening on SSM changes947 ssm_prefix = "/test/local/"948 events_client.put_rule(949 Name=rule_name,950 EventPattern=json.dumps(951 {952 "detail": {953 "name": [{"prefix": ssm_prefix}],954 "operation": ["Create", "Update", "Delete", "LabelParameterVersion"],955 },956 "detail-type": ["Parameter Store Change"],957 "source": ["aws.ssm"],958 }959 ),960 State="ENABLED",961 Description="Trigger on SSM parameter changes",962 )963 # put target964 events_client.put_targets(965 Rule=rule_name,966 EventBusName=TEST_EVENT_BUS_NAME,967 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],968 )969 # change SSM param to trigger event970 ssm_client.put_parameter(Name=f"{ssm_prefix}/test123", Value="value1", Type="String")971 def assert_message():972 resp = sqs_client.receive_message(QueueUrl=queue_url)973 result = resp.get("Messages")974 body = json.loads(result[0]["Body"])975 assert body == {"name": "/test/local/test123", "operation": "Create"}976 # assert that message has been received977 retry(assert_message, retries=7, sleep=0.3)978 # clean up979 self.cleanup(rule_name=rule_name, target_ids=target_id)980 def test_put_event_with_content_base_rule_in_pattern(self, events_client, sqs_client):981 queue_name = f"queue-{short_uid()}"982 rule_name = f"rule-{short_uid()}"983 target_id = f"target-{short_uid()}"984 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]985 queue_arn = aws_stack.sqs_queue_arn(queue_name)986 pattern = {987 "Source": [{"exists": True}],988 "detail-type": [{"prefix": "core.app"}],989 "Detail": {990 "decription": ["this-is-event-details"],991 "amount": [200],992 "salary": [2000, 4000],993 "env": ["dev", "prod"],994 "user": ["user1", "user2", "user3"],995 "admins": ["skyli", {"prefix": "hey"}, {"prefix": "ad"}],996 "test1": [{"anything-but": 200}],997 "test2": [{"anything-but": "test2"}],998 "test3": [{"anything-but": ["test3", "test33"]}],999 "test4": [{"anything-but": {"prefix": "test4"}}],1000 "ip": [{"cidr": "10.102.1.0/24"}],1001 "num-test1": [{"numeric": ["<", 200]}],1002 "num-test2": [{"numeric": ["<=", 200]}],1003 "num-test3": [{"numeric": [">", 200]}],1004 "num-test4": [{"numeric": [">=", 200]}],1005 "num-test5": [{"numeric": [">=", 200, "<=", 500]}],1006 "num-test6": [{"numeric": [">", 200, "<", 500]}],1007 "num-test7": [{"numeric": [">=", 200, "<", 500]}],1008 },1009 }1010 event = {1011 "EventBusName": TEST_EVENT_BUS_NAME,1012 "Source": "core.update-account-command",1013 "DetailType": "core.app.backend",1014 "Detail": json.dumps(1015 {1016 "decription": "this-is-event-details",1017 "amount": 200,1018 "salary": 2000,1019 "env": "prod",1020 "user": "user3",1021 "admins": "admin",1022 "test1": 300,1023 "test2": "test22",1024 "test3": "test333",1025 "test4": "this test4",1026 "ip": "10.102.1.100",1027 "num-test1": 100,1028 "num-test2": 200,1029 "num-test3": 300,1030 "num-test4": 200,1031 "num-test5": 500,1032 "num-test6": 300,1033 "num-test7": 300,1034 }1035 ),1036 }1037 events_client.create_event_bus(Name=TEST_EVENT_BUS_NAME)1038 events_client.put_rule(1039 Name=rule_name,1040 EventBusName=TEST_EVENT_BUS_NAME,1041 EventPattern=json.dumps(pattern),1042 )1043 events_client.put_targets(1044 Rule=rule_name,1045 EventBusName=TEST_EVENT_BUS_NAME,1046 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],1047 )1048 events_client.put_events(Entries=[event])1049 def get_message(queue_url):1050 resp = sqs_client.receive_message(QueueUrl=queue_url)1051 return resp.get("Messages")1052 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)1053 assert len(messages) == 11054 assert json.loads(messages[0].get("Body")) == json.loads(event["Detail"])1055 event_details = json.loads(event["Detail"])1056 event_details["admins"] = "no"1057 event["Detail"] = json.dumps(event_details)1058 events_client.put_events(Entries=[event])1059 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)1060 assert messages is None1061 # clean up1062 self.cleanup(TEST_EVENT_BUS_NAME, rule_name, target_id, queue_url=queue_url)1063 def _get_queue_arn(self, queue_url, sqs_client):1064 queue_attrs = sqs_client.get_queue_attributes(1065 QueueUrl=queue_url, AttributeNames=["QueueArn"]1066 )1067 return queue_attrs["Attributes"]["QueueArn"]1068 def cleanup(1069 self,1070 bus_name=None,1071 rule_name=None,1072 target_ids=None,1073 queue_url=None,1074 events_client=None,1075 sqs_client=None,1076 ):1077 events_client = events_client or aws_stack.create_external_boto_client("events")1078 kwargs = {"EventBusName": bus_name} if bus_name else {}1079 if target_ids:1080 target_ids = target_ids if isinstance(target_ids, list) else [target_ids]1081 events_client.remove_targets(Rule=rule_name, Ids=target_ids, Force=True, **kwargs)1082 if rule_name:1083 events_client.delete_rule(Name=rule_name, Force=True, **kwargs)1084 if bus_name:1085 events_client.delete_event_bus(Name=bus_name)1086 if queue_url:1087 sqs_client = sqs_client or aws_stack.create_external_boto_client("sqs")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful