How to use test_event_pattern method in localstack

Best Python code snippet using localstack_python

test_events.py

Source: test_events.py (GitHub)

copy

Full Screen

1# -*- coding: utf-8 -*-2import json3import os4import unittest5import uuid6from datetime import datetime7from localstack import config8from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON369from localstack.services.events.events_listener import EVENTS_TMP_DIR10from localstack.services.generic_proxy import ProxyListener11from localstack.services.infra import start_proxy12from localstack.utils import testutil13from localstack.utils.aws import aws_stack14from localstack.utils.common import (15 get_free_tcp_port,16 get_service_protocol,17 load_file,18 retry,19 short_uid,20 to_str,21 wait_for_port_open,22)23from localstack.utils.testutil import check_expected_lambda_log_events_length24THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))25TEST_EVENT_BUS_NAME = "command-bus-dev"26EVENT_DETAIL = '{"command":"update-account","payload":{"acc_id":"0a787ecb-4015","sf_id":"baz"}}'27TEST_EVENT_PATTERN = {28 "Source": ["core.update-account-command"],29 "detail-type": ["core.update-account-command"],30 "Detail": [EVENT_DETAIL],31}32class EventsTest(unittest.TestCase):33 def setUp(self):34 self.events_client = aws_stack.connect_to_service("events")35 self.iam_client = aws_stack.connect_to_service("iam")36 self.sns_client = aws_stack.connect_to_service("sns")37 self.sfn_client = aws_stack.connect_to_service("stepfunctions")38 self.sqs_client = aws_stack.connect_to_service("sqs")39 def assertIsValidEvent(self, event):40 expected_fields = (41 "version",42 "id",43 "detail-type",44 "source",45 "account",46 "time",47 "region",48 "resources",49 "detail",50 )51 for field in expected_fields:52 self.assertIn(field, event)53 def test_put_rule(self):54 rule_name = "rule-{}".format(short_uid())55 self.events_client.put_rule(Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN))56 rules = self.events_client.list_rules(NamePrefix=rule_name)["Rules"]57 self.assertEqual(1, len(rules))58 self.assertEqual(TEST_EVENT_PATTERN, json.loads(rules[0]["EventPattern"]))59 # 
clean up60 self.events_client.delete_rule(Name=rule_name, Force=True)61 def test_events_written_to_disk_are_timestamp_prefixed_for_chronological_ordering(62 self,63 ):64 event_type = str(uuid.uuid4())65 event_details_to_publish = list(map(lambda n: "event %s" % n, range(10)))66 for detail in event_details_to_publish:67 self.events_client.put_events(68 Entries=[69 {70 "Source": "unittest",71 "Resources": [],72 "DetailType": event_type,73 "Detail": json.dumps(detail),74 }75 ]76 )77 sorted_events_written_to_disk = map(78 lambda filename: json.loads(str(load_file(os.path.join(EVENTS_TMP_DIR, filename)))),79 sorted(os.listdir(EVENTS_TMP_DIR)),80 )81 sorted_events = list(82 filter(83 lambda event: event["DetailType"] == event_type,84 sorted_events_written_to_disk,85 )86 )87 self.assertListEqual(88 event_details_to_publish,89 list(map(lambda event: json.loads(event["Detail"]), sorted_events)),90 )91 def test_list_tags_for_resource(self):92 rule_name = "rule-{}".format(short_uid())93 rule = self.events_client.put_rule(94 Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN)95 )96 rule_arn = rule["RuleArn"]97 expected = [98 {"Key": "key1", "Value": "value1"},99 {"Key": "key2", "Value": "value2"},100 ]101 # insert two tags, verify both are visible102 self.events_client.tag_resource(ResourceARN=rule_arn, Tags=expected)103 actual = self.events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]104 self.assertEqual(expected, actual)105 # remove 'key2', verify only 'key1' remains106 expected = [{"Key": "key1", "Value": "value1"}]107 self.events_client.untag_resource(ResourceARN=rule_arn, TagKeys=["key2"])108 actual = self.events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]109 self.assertEqual(expected, actual)110 # clean up111 self.events_client.delete_rule(Name=rule_name, Force=True)112 def test_put_events_with_target_sqs(self):113 queue_name = "queue-{}".format(short_uid())114 rule_name = "rule-{}".format(short_uid())115 target_id = 
"target-{}".format(short_uid())116 bus_name = "bus-{}".format(short_uid())117 sqs_client = aws_stack.connect_to_service("sqs")118 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]119 queue_arn = aws_stack.sqs_queue_arn(queue_name)120 self.events_client.create_event_bus(Name=bus_name)121 self.events_client.put_rule(122 Name=rule_name,123 EventBusName=bus_name,124 EventPattern=json.dumps(TEST_EVENT_PATTERN),125 )126 rs = self.events_client.put_targets(127 Rule=rule_name,128 EventBusName=bus_name,129 Targets=[{"Id": target_id, "Arn": queue_arn}],130 )131 self.assertIn("FailedEntryCount", rs)132 self.assertIn("FailedEntries", rs)133 self.assertEqual(0, rs["FailedEntryCount"])134 self.assertEqual([], rs["FailedEntries"])135 self.events_client.put_events(136 Entries=[137 {138 "EventBusName": bus_name,139 "Source": TEST_EVENT_PATTERN["Source"][0],140 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],141 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),142 }143 ]144 )145 def get_message(queue_url):146 resp = sqs_client.receive_message(QueueUrl=queue_url)147 return resp["Messages"]148 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)149 self.assertEqual(1, len(messages))150 actual_event = json.loads(messages[0]["Body"])151 self.assertIsValidEvent(actual_event)152 self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], actual_event["detail"])153 # clean up154 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)155 def test_put_events_with_target_sqs_event_detail_match(self):156 queue_name = "queue-{}".format(short_uid())157 rule_name = "rule-{}".format(short_uid())158 target_id = "target-{}".format(short_uid())159 bus_name = "bus-{}".format(short_uid())160 sqs_client = aws_stack.connect_to_service("sqs")161 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]162 queue_arn = aws_stack.sqs_queue_arn(queue_name)163 self.events_client.create_event_bus(Name=bus_name)164 self.events_client.put_rule(165 
Name=rule_name,166 EventBusName=bus_name,167 EventPattern=json.dumps({"detail": {"EventType": ["0", "1"]}}),168 )169 rs = self.events_client.put_targets(170 Rule=rule_name,171 EventBusName=bus_name,172 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],173 )174 self.assertIn("FailedEntryCount", rs)175 self.assertIn("FailedEntries", rs)176 self.assertEqual(0, rs["FailedEntryCount"])177 self.assertEqual([], rs["FailedEntries"])178 self.events_client.put_events(179 Entries=[180 {181 "EventBusName": bus_name,182 "Source": TEST_EVENT_PATTERN["Source"][0],183 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],184 "Detail": json.dumps({"EventType": "1"}),185 }186 ]187 )188 def get_message(queue_url):189 resp = sqs_client.receive_message(QueueUrl=queue_url)190 return resp.get("Messages")191 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)192 self.assertEqual(1, len(messages))193 actual_event = json.loads(messages[0]["Body"])194 self.assertEqual({"EventType": "1"}, actual_event)195 self.events_client.put_events(196 Entries=[197 {198 "EventBusName": bus_name,199 "Source": TEST_EVENT_PATTERN["Source"][0],200 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],201 "Detail": json.dumps({"EventType": "2"}),202 }203 ]204 )205 def get_message(queue_url):206 resp = sqs_client.receive_message(QueueUrl=queue_url)207 return resp.get("Messages", [])208 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)209 self.assertEqual(0, len(messages))210 # clean up211 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)212 def test_put_events_with_target_sns(self):213 queue_name = "test-%s" % short_uid()214 rule_name = "rule-{}".format(short_uid())215 target_id = "target-{}".format(short_uid())216 bus_name = "bus-{}".format(short_uid())217 sns_client = aws_stack.connect_to_service("sns")218 sqs_client = aws_stack.connect_to_service("sqs")219 topic_name = "topic-{}".format(short_uid())220 topic_arn = 
sns_client.create_topic(Name=topic_name)["TopicArn"]221 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]222 queue_arn = aws_stack.sqs_queue_arn(queue_name)223 sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)224 self.events_client.create_event_bus(Name=bus_name)225 self.events_client.put_rule(226 Name=rule_name,227 EventBusName=bus_name,228 EventPattern=json.dumps(TEST_EVENT_PATTERN),229 )230 rs = self.events_client.put_targets(231 Rule=rule_name,232 EventBusName=bus_name,233 Targets=[{"Id": target_id, "Arn": topic_arn}],234 )235 self.assertIn("FailedEntryCount", rs)236 self.assertIn("FailedEntries", rs)237 self.assertEqual(0, rs["FailedEntryCount"])238 self.assertEqual([], rs["FailedEntries"])239 self.events_client.put_events(240 Entries=[241 {242 "EventBusName": bus_name,243 "Source": TEST_EVENT_PATTERN["Source"][0],244 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],245 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),246 }247 ]248 )249 def get_message(queue_url):250 resp = sqs_client.receive_message(QueueUrl=queue_url)251 return resp["Messages"]252 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)253 self.assertEqual(1, len(messages))254 actual_event = json.loads(messages[0]["Body"]).get("Message")255 self.assertIsValidEvent(actual_event)256 self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], json.loads(actual_event).get("detail"))257 # clean up258 sns_client.delete_topic(TopicArn=topic_arn)259 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)260 def test_put_events_into_event_bus(self):261 queue_name = "queue-{}".format(short_uid())262 rule_name = "rule-{}".format(short_uid())263 target_id = "target-{}".format(short_uid())264 bus_name_1 = "bus1-{}".format(short_uid())265 bus_name_2 = "bus2-{}".format(short_uid())266 sqs_client = aws_stack.connect_to_service("sqs")267 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]268 queue_arn = 
aws_stack.sqs_queue_arn(queue_name)269 self.events_client.create_event_bus(Name=bus_name_1)270 resp = self.events_client.create_event_bus(Name=bus_name_2)271 self.events_client.put_rule(272 Name=rule_name,273 EventBusName=bus_name_1,274 EventPattern=json.dumps(TEST_EVENT_PATTERN),275 )276 self.events_client.put_targets(277 Rule=rule_name,278 EventBusName=bus_name_1,279 Targets=[{"Id": target_id, "Arn": resp.get("EventBusArn")}],280 )281 self.events_client.put_targets(282 Rule=rule_name,283 EventBusName=bus_name_2,284 Targets=[{"Id": target_id, "Arn": queue_arn}],285 )286 self.events_client.put_events(287 Entries=[288 {289 "EventBusName": bus_name_1,290 "Source": TEST_EVENT_PATTERN["Source"][0],291 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],292 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),293 }294 ]295 )296 def get_message(queue_url):297 resp = sqs_client.receive_message(QueueUrl=queue_url)298 return resp["Messages"]299 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)300 self.assertEqual(1, len(messages))301 actual_event = json.loads(messages[0]["Body"])302 self.assertIsValidEvent(actual_event)303 self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], actual_event["detail"])304 # clean up305 self.cleanup(bus_name_1, rule_name, target_id)306 self.cleanup(bus_name_2)307 sqs_client.delete_queue(QueueUrl=queue_url)308 def test_put_events_with_target_lambda(self):309 rule_name = "rule-{}".format(short_uid())310 function_name = "lambda-func-{}".format(short_uid())311 target_id = "target-{}".format(short_uid())312 bus_name = "bus-{}".format(short_uid())313 handler_file = os.path.join(THIS_FOLDER, "lambdas", "lambda_echo.py")314 rs = testutil.create_lambda_function(315 handler_file=handler_file,316 func_name=function_name,317 runtime=LAMBDA_RUNTIME_PYTHON36,318 )319 func_arn = rs["CreateFunctionResponse"]["FunctionArn"]320 self.events_client.create_event_bus(Name=bus_name)321 self.events_client.put_rule(322 Name=rule_name,323 
EventBusName=bus_name,324 EventPattern=json.dumps(TEST_EVENT_PATTERN),325 )326 rs = self.events_client.put_targets(327 Rule=rule_name,328 EventBusName=bus_name,329 Targets=[{"Id": target_id, "Arn": func_arn}],330 )331 self.assertIn("FailedEntryCount", rs)332 self.assertIn("FailedEntries", rs)333 self.assertEqual(0, rs["FailedEntryCount"])334 self.assertEqual([], rs["FailedEntries"])335 self.events_client.put_events(336 Entries=[337 {338 "EventBusName": bus_name,339 "Source": TEST_EVENT_PATTERN["Source"][0],340 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],341 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),342 }343 ]344 )345 # Get lambda's log events346 events = retry(347 check_expected_lambda_log_events_length,348 retries=3,349 sleep=1,350 function_name=function_name,351 expected_length=1,352 )353 actual_event = events[0]354 self.assertIsValidEvent(actual_event)355 self.assertDictEqual(356 json.loads(actual_event["detail"]),357 json.loads(TEST_EVENT_PATTERN["Detail"][0]),358 )359 # clean up360 testutil.delete_lambda_function(function_name)361 self.cleanup(bus_name, rule_name, target_id)362 def test_rule_disable(self):363 rule_name = "rule-{}".format(short_uid())364 self.events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")365 response = self.events_client.list_rules()366 self.assertEqual("ENABLED", response["Rules"][0]["State"])367 _ = self.events_client.disable_rule(Name=rule_name)368 response = self.events_client.list_rules(NamePrefix=rule_name)369 self.assertEqual("DISABLED", response["Rules"][0]["State"])370 # clean up371 self.events_client.delete_rule(Name=rule_name, Force=True)372 def test_scheduled_expression_events(self):373 class HttpEndpointListener(ProxyListener):374 def forward_request(self, method, path, data, headers):375 event = json.loads(to_str(data))376 events.append(event)377 return 200378 local_port = get_free_tcp_port()379 proxy = start_proxy(local_port, update_listener=HttpEndpointListener())380 
wait_for_port_open(local_port)381 topic_name = "topic-{}".format(short_uid())382 queue_name = "queue-{}".format(short_uid())383 fifo_queue_name = "queue-{}.fifo".format(short_uid())384 rule_name = "rule-{}".format(short_uid())385 endpoint = "{}://{}:{}".format(386 get_service_protocol(), config.LOCALSTACK_HOSTNAME, local_port387 )388 sm_role_arn = aws_stack.role_arn("sfn_role")389 sm_name = "state-machine-{}".format(short_uid())390 topic_target_id = "target-{}".format(short_uid())391 sm_target_id = "target-{}".format(short_uid())392 queue_target_id = "target-{}".format(short_uid())393 fifo_queue_target_id = "target-{}".format(short_uid())394 events = []395 state_machine_definition = """396 {397 "StartAt": "Hello",398 "States": {399 "Hello": {400 "Type": "Pass",401 "Result": "World",402 "End": true403 }404 }405 }406 """407 state_machine_arn = self.sfn_client.create_state_machine(408 name=sm_name, definition=state_machine_definition, roleArn=sm_role_arn409 )["stateMachineArn"]410 topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]411 self.sns_client.subscribe(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint)412 queue_url = self.sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]413 fifo_queue_url = self.sqs_client.create_queue(414 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}415 )["QueueUrl"]416 queue_arn = aws_stack.sqs_queue_arn(queue_name)417 fifo_queue_arn = aws_stack.sqs_queue_arn(fifo_queue_name)418 event = {"env": "testing"}419 self.events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")420 self.events_client.put_targets(421 Rule=rule_name,422 Targets=[423 {"Id": topic_target_id, "Arn": topic_arn, "Input": json.dumps(event)},424 {425 "Id": sm_target_id,426 "Arn": state_machine_arn,427 "Input": json.dumps(event),428 },429 {"Id": queue_target_id, "Arn": queue_arn, "Input": json.dumps(event)},430 {431 "Id": fifo_queue_target_id,432 "Arn": fifo_queue_arn,433 "Input": json.dumps(event),434 
"SqsParameters": {"MessageGroupId": "123"},435 },436 ],437 )438 def received(q_urls):439 # state machine got executed440 executions = self.sfn_client.list_executions(stateMachineArn=state_machine_arn)[441 "executions"442 ]443 self.assertGreaterEqual(len(executions), 1)444 # http endpoint got events445 self.assertGreaterEqual(len(events), 2)446 notifications = [447 event["Message"] for event in events if event["Type"] == "Notification"448 ]449 self.assertGreaterEqual(len(notifications), 1)450 # get state machine execution detail451 execution_arn = executions[0]["executionArn"]452 execution_input = self.sfn_client.describe_execution(executionArn=execution_arn)[453 "input"454 ]455 all_msgs = []456 # get message from queue457 for url in q_urls:458 msgs = self.sqs_client.receive_message(QueueUrl=url).get("Messages", [])459 self.assertGreaterEqual(len(msgs), 1)460 all_msgs.append(msgs[0])461 return execution_input, notifications[0], all_msgs462 execution_input, notification, msgs_received = retry(463 received, retries=5, sleep=15, q_urls=[queue_url, fifo_queue_url]464 )465 self.assertEqual(event, json.loads(notification))466 self.assertEqual(event, json.loads(execution_input))467 for msg_received in msgs_received:468 self.assertEqual(event, json.loads(msg_received["Body"]))469 # clean up470 proxy.stop()471 self.cleanup(472 None,473 rule_name,474 target_ids=[topic_target_id, sm_target_id],475 queue_url=queue_url,476 )477 self.sns_client.delete_topic(TopicArn=topic_arn)478 self.sfn_client.delete_state_machine(stateMachineArn=state_machine_arn)479 def test_api_destinations(self):480 class HttpEndpointListener(ProxyListener):481 def forward_request(self, method, path, data, headers):482 event = json.loads(to_str(data))483 events.append(event)484 return 200485 events = []486 local_port = get_free_tcp_port()487 proxy = start_proxy(local_port, update_listener=HttpEndpointListener())488 wait_for_port_open(local_port)489 # create api destination490 dest_name = "d-%s" % 
short_uid()491 url = "http://localhost:%s" % local_port492 result = self.events_client.create_api_destination(493 Name=dest_name,494 ConnectionArn="c1",495 InvocationEndpoint=url,496 HttpMethod="POST",497 )498 # create rule and target499 rule_name = "r-%s" % short_uid()500 target_id = "target-{}".format(short_uid())501 pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})502 self.events_client.put_rule(Name=rule_name, EventPattern=pattern)503 self.events_client.put_targets(504 Rule=rule_name,505 Targets=[{"Id": target_id, "Arn": result["ApiDestinationArn"]}],506 )507 # put events, to trigger rules508 num_events = 5509 for i in range(num_events):510 entries = [511 {512 "Source": "source-123",513 "DetailType": "type-123",514 "Detail": '{"i": %s}' % i,515 }516 ]517 self.events_client.put_events(Entries=entries)518 # assert that all events have been received in the HTTP server listener519 def check():520 self.assertEqual(len(events), num_events)521 retry(check, sleep=0.5, retries=5)522 # clean up523 proxy.stop()524 def test_put_events_with_target_firehose(self):525 s3_bucket = "s3-{}".format(short_uid())526 s3_prefix = "testeventdata"527 stream_name = "firehose-{}".format(short_uid())528 rule_name = "rule-{}".format(short_uid())529 target_id = "target-{}".format(short_uid())530 bus_name = "bus-{}".format(short_uid())531 # create firehose target bucket532 s3_client = aws_stack.connect_to_service("s3")533 s3_client.create_bucket(Bucket=s3_bucket)534 # create firehose delivery stream to s3535 firehose_client = aws_stack.connect_to_service("firehose")536 stream = firehose_client.create_delivery_stream(537 DeliveryStreamName=stream_name,538 S3DestinationConfiguration={539 "RoleARN": aws_stack.iam_resource_arn("firehose"),540 "BucketARN": aws_stack.s3_bucket_arn(s3_bucket),541 "Prefix": s3_prefix,542 },543 )544 stream_arn = stream["DeliveryStreamARN"]545 self.events_client.create_event_bus(Name=bus_name)546 self.events_client.put_rule(547 
Name=rule_name,548 EventBusName=bus_name,549 EventPattern=json.dumps(TEST_EVENT_PATTERN),550 )551 rs = self.events_client.put_targets(552 Rule=rule_name,553 EventBusName=bus_name,554 Targets=[{"Id": target_id, "Arn": stream_arn}],555 )556 self.assertIn("FailedEntryCount", rs)557 self.assertIn("FailedEntries", rs)558 self.assertEqual(0, rs["FailedEntryCount"])559 self.assertEqual([], rs["FailedEntries"])560 self.events_client.put_events(561 Entries=[562 {563 "EventBusName": bus_name,564 "Source": TEST_EVENT_PATTERN["Source"][0],565 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],566 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),567 }568 ]569 )570 # run tests571 bucket_contents = s3_client.list_objects(Bucket=s3_bucket)["Contents"]572 self.assertEqual(1, len(bucket_contents))573 key = bucket_contents[0]["Key"]574 s3_object = s3_client.get_object(Bucket=s3_bucket, Key=key)575 actual_event = json.loads(s3_object["Body"].read().decode())576 self.assertIsValidEvent(actual_event)577 self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], actual_event["detail"])578 # clean up579 firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)580 # empty and delete bucket581 s3_client.delete_object(Bucket=s3_bucket, Key=key)582 s3_client.delete_bucket(Bucket=s3_bucket)583 self.cleanup(bus_name, rule_name, target_id)584 def test_put_events_with_target_sqs_new_region(self):585 self.events_client = aws_stack.connect_to_service("events", region_name="eu-west-1")586 queue_name = "queue-{}".format(short_uid())587 rule_name = "rule-{}".format(short_uid())588 target_id = "target-{}".format(short_uid())589 bus_name = "bus-{}".format(short_uid())590 sqs_client = aws_stack.connect_to_service("sqs", region_name="eu-west-1")591 sqs_client.create_queue(QueueName=queue_name)592 queue_arn = aws_stack.sqs_queue_arn(queue_name)593 self.events_client.create_event_bus(Name=bus_name)594 self.events_client.put_rule(595 Name=rule_name,596 EventBusName=bus_name,597 
EventPattern=json.dumps(TEST_EVENT_PATTERN),598 )599 self.events_client.put_targets(600 Rule=rule_name,601 EventBusName=bus_name,602 Targets=[{"Id": target_id, "Arn": queue_arn}],603 )604 response = self.events_client.put_events(605 Entries=[606 {607 "Source": "com.mycompany.myapp",608 "Detail": '{ "key1": "value1", "key": "value2" }',609 "Resources": [],610 "DetailType": "myDetailType",611 }612 ]613 )614 self.assertIn("Entries", response)615 self.assertEqual(1, len(response.get("Entries")))616 self.assertIn("EventId", response.get("Entries")[0])617 def test_put_events_with_target_kinesis(self):618 rule_name = "rule-{}".format(short_uid())619 target_id = "target-{}".format(short_uid())620 bus_name = "bus-{}".format(short_uid())621 stream_name = "stream-{}".format(short_uid())622 stream_arn = aws_stack.kinesis_stream_arn(stream_name)623 kinesis_client = aws_stack.connect_to_service("kinesis")624 kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)625 self.events_client.create_event_bus(Name=bus_name)626 self.events_client.put_rule(627 Name=rule_name,628 EventBusName=bus_name,629 EventPattern=json.dumps(TEST_EVENT_PATTERN),630 )631 put_response = self.events_client.put_targets(632 Rule=rule_name,633 EventBusName=bus_name,634 Targets=[635 {636 "Id": target_id,637 "Arn": stream_arn,638 "KinesisParameters": {"PartitionKeyPath": "$.detail-type"},639 }640 ],641 )642 self.assertIn("FailedEntryCount", put_response)643 self.assertIn("FailedEntries", put_response)644 self.assertEqual(0, put_response["FailedEntryCount"])645 self.assertEqual([], put_response["FailedEntries"])646 def check_stream_status():647 _stream = kinesis_client.describe_stream(StreamName=stream_name)648 assert _stream["StreamDescription"]["StreamStatus"] == "ACTIVE"649 # wait until stream becomes available650 retry(check_stream_status, retries=7, sleep=0.8)651 self.events_client.put_events(652 Entries=[653 {654 "EventBusName": bus_name,655 "Source": TEST_EVENT_PATTERN["Source"][0],656 
"DetailType": TEST_EVENT_PATTERN["detail-type"][0],657 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),658 }659 ]660 )661 stream = kinesis_client.describe_stream(StreamName=stream_name)662 shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]663 shard_iterator = kinesis_client.get_shard_iterator(664 StreamName=stream_name,665 ShardId=shard_id,666 ShardIteratorType="AT_TIMESTAMP",667 Timestamp=datetime(2020, 1, 1),668 )["ShardIterator"]669 record = kinesis_client.get_records(ShardIterator=shard_iterator)["Records"][0]670 partition_key = record["PartitionKey"]671 data = json.loads(record["Data"].decode())672 self.assertEqual(TEST_EVENT_PATTERN["detail-type"][0], partition_key)673 self.assertEqual(EVENT_DETAIL, data["detail"])674 self.assertIsValidEvent(data)675 def test_put_events_with_input_path(self):676 queue_name = "queue-{}".format(short_uid())677 rule_name = "rule-{}".format(short_uid())678 target_id = "target-{}".format(short_uid())679 bus_name = "bus-{}".format(short_uid())680 sqs_client = aws_stack.connect_to_service("sqs")681 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]682 queue_arn = aws_stack.sqs_queue_arn(queue_name)683 self.events_client.create_event_bus(Name=bus_name)684 self.events_client.put_rule(685 Name=rule_name,686 EventBusName=bus_name,687 EventPattern=json.dumps(TEST_EVENT_PATTERN),688 )689 self.events_client.put_targets(690 Rule=rule_name,691 EventBusName=bus_name,692 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],693 )694 self.events_client.put_events(695 Entries=[696 {697 "EventBusName": bus_name,698 "Source": TEST_EVENT_PATTERN["Source"][0],699 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],700 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),701 }702 ]703 )704 def get_message(queue_url):705 resp = sqs_client.receive_message(QueueUrl=queue_url)706 return resp.get("Messages")707 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)708 
self.assertEqual(1, len(messages))709 self.assertEqual(EVENT_DETAIL, json.loads(messages[0].get("Body")))710 self.events_client.put_events(711 Entries=[712 {713 "EventBusName": bus_name,714 "Source": "dummySource",715 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],716 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),717 }718 ]719 )720 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)721 self.assertIsNone(messages)722 # clean up723 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)724 def test_put_events_with_input_path_multiple(self):725 queue_name = "queue-{}".format(short_uid())726 queue_name_1 = "queue-{}".format(short_uid())727 rule_name = "rule-{}".format(short_uid())728 target_id = "target-{}".format(short_uid())729 target_id_1 = "target-{}".format(short_uid())730 bus_name = "bus-{}".format(short_uid())731 sqs_client = aws_stack.connect_to_service("sqs")732 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]733 queue_arn = aws_stack.sqs_queue_arn(queue_name)734 queue_url_1 = sqs_client.create_queue(QueueName=queue_name_1)["QueueUrl"]735 queue_arn_1 = aws_stack.sqs_queue_arn(queue_name_1)736 self.events_client.create_event_bus(Name=bus_name)737 self.events_client.put_rule(738 Name=rule_name,739 EventBusName=bus_name,740 EventPattern=json.dumps(TEST_EVENT_PATTERN),741 )742 self.events_client.put_targets(743 Rule=rule_name,744 EventBusName=bus_name,745 Targets=[746 {"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"},747 {748 "Id": target_id_1,749 "Arn": queue_arn_1,750 },751 ],752 )753 self.events_client.put_events(754 Entries=[755 {756 "EventBusName": bus_name,757 "Source": TEST_EVENT_PATTERN["Source"][0],758 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],759 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),760 }761 ]762 )763 def get_message(queue_url):764 resp = sqs_client.receive_message(QueueUrl=queue_url)765 return resp.get("Messages")766 messages = retry(get_message, 
retries=3, sleep=1, queue_url=queue_url)767 self.assertEqual(1, len(messages))768 self.assertEqual(EVENT_DETAIL, json.loads(messages[0].get("Body")))769 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url_1)770 self.assertEqual(1, len(messages))771 self.assertEqual(EVENT_DETAIL, json.loads(messages[0].get("Body")).get("detail"))772 self.events_client.put_events(773 Entries=[774 {775 "EventBusName": bus_name,776 "Source": "dummySource",777 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],778 "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),779 }780 ]781 )782 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)783 self.assertIsNone(messages)784 # clean up785 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)786 def test_put_event_without_source(self):787 self.events_client = aws_stack.connect_to_service("events", region_name="eu-west-1")788 response = self.events_client.put_events(Entries=[{"DetailType": "Test", "Detail": "{}"}])789 self.assertIn("Entries", response)790 def test_put_event_without_detail(self):791 self.events_client = aws_stack.connect_to_service("events", region_name="eu-west-1")792 response = self.events_client.put_events(793 Entries=[794 {795 "DetailType": "Test",796 }797 ]798 )799 self.assertIn("Entries", response)800 def test_put_event_with_content_base_rule_in_pattern(self):801 queue_name = "queue-{}".format(short_uid())802 rule_name = "rule-{}".format(short_uid())803 target_id = "target-{}".format(short_uid())804 sqs_client = aws_stack.connect_to_service("sqs")805 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]806 queue_arn = aws_stack.sqs_queue_arn(queue_name)807 pattern = {808 "Source": [{"exists": True}],809 "detail-type": [{"prefix": "core.app"}],810 "Detail": {811 "decription": ["this-is-event-details"],812 "amount": [200],813 "salary": [2000, 4000],814 "env": ["dev", "prod"],815 "user": ["user1", "user2", "user3"],816 "admins": ["skyli", {"prefix": "hey"}, 
{"prefix": "ad"}],817 "test1": [{"anything-but": 200}],818 "test2": [{"anything-but": "test2"}],819 "test3": [{"anything-but": ["test3", "test33"]}],820 "test4": [{"anything-but": {"prefix": "test4"}}],821 "ip": [{"cidr": "10.102.1.0/24"}],822 "num-test1": [{"numeric": ["<", 200]}],823 "num-test2": [{"numeric": ["<=", 200]}],824 "num-test3": [{"numeric": [">", 200]}],825 "num-test4": [{"numeric": [">=", 200]}],826 "num-test5": [{"numeric": [">=", 200, "<=", 500]}],827 "num-test6": [{"numeric": [">", 200, "<", 500]}],828 "num-test7": [{"numeric": [">=", 200, "<", 500]}],829 },830 }831 event = {832 "EventBusName": TEST_EVENT_BUS_NAME,833 "Source": "core.update-account-command",834 "DetailType": "core.app.backend",835 "Detail": json.dumps(836 {837 "decription": "this-is-event-details",838 "amount": 200,839 "salary": 2000,840 "env": "prod",841 "user": "user3",842 "admins": "admin",843 "test1": 300,844 "test2": "test22",845 "test3": "test333",846 "test4": "this test4",847 "ip": "10.102.1.100",848 "num-test1": 100,849 "num-test2": 200,850 "num-test3": 300,851 "num-test4": 200,852 "num-test5": 500,853 "num-test6": 300,854 "num-test7": 300,855 }856 ),857 }858 self.events_client.create_event_bus(Name=TEST_EVENT_BUS_NAME)859 self.events_client.put_rule(860 Name=rule_name,861 EventBusName=TEST_EVENT_BUS_NAME,862 EventPattern=json.dumps(pattern),863 )864 self.events_client.put_targets(865 Rule=rule_name,866 EventBusName=TEST_EVENT_BUS_NAME,867 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],868 )869 self.events_client.put_events(Entries=[event])870 def get_message(queue_url):871 resp = sqs_client.receive_message(QueueUrl=queue_url)872 return resp.get("Messages")873 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)874 self.assertEqual(1, len(messages))875 self.assertEqual(json.loads(event["Detail"]), json.loads(messages[0].get("Body")))876 event_details = json.loads(event["Detail"])877 event_details["admins"] = "no"878 event["Detail"] 
= json.dumps(event_details)879 self.events_client.put_events(Entries=[event])880 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)881 self.assertIsNone(messages)882 # clean up883 self.cleanup(TEST_EVENT_BUS_NAME, rule_name, target_id, queue_url=queue_url)884 def cleanup(self, bus_name, rule_name=None, target_ids=None, queue_url=None):885 kwargs = {"EventBusName": bus_name} if bus_name else {}886 if target_ids:887 target_ids = target_ids if isinstance(target_ids, list) else [target_ids]888 self.events_client.remove_targets(Rule=rule_name, Ids=target_ids, Force=True, **kwargs)889 if rule_name:890 self.events_client.delete_rule(Name=rule_name, Force=True, **kwargs)891 if bus_name:892 self.events_client.delete_event_bus(Name=bus_name)893 if queue_url:894 sqs_client = aws_stack.connect_to_service("sqs")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful