Best Python code snippet using localstack_python
test_events.py
Source:test_events.py  
1# -*- coding: utf-8 -*-2import json3import os4import unittest5import uuid6from datetime import datetime7from localstack import config8from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON369from localstack.services.events.events_listener import EVENTS_TMP_DIR10from localstack.services.generic_proxy import ProxyListener11from localstack.services.infra import start_proxy12from localstack.utils import testutil13from localstack.utils.aws import aws_stack14from localstack.utils.common import (15    get_free_tcp_port,16    get_service_protocol,17    load_file,18    retry,19    short_uid,20    to_str,21    wait_for_port_open,22)23from localstack.utils.testutil import check_expected_lambda_log_events_length24THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))25TEST_EVENT_BUS_NAME = "command-bus-dev"26EVENT_DETAIL = '{"command":"update-account","payload":{"acc_id":"0a787ecb-4015","sf_id":"baz"}}'27TEST_EVENT_PATTERN = {28    "Source": ["core.update-account-command"],29    "detail-type": ["core.update-account-command"],30    "Detail": [EVENT_DETAIL],31}32class EventsTest(unittest.TestCase):33    def setUp(self):34        self.events_client = aws_stack.connect_to_service("events")35        self.iam_client = aws_stack.connect_to_service("iam")36        self.sns_client = aws_stack.connect_to_service("sns")37        self.sfn_client = aws_stack.connect_to_service("stepfunctions")38        self.sqs_client = aws_stack.connect_to_service("sqs")39    def assertIsValidEvent(self, event):40        expected_fields = (41            "version",42            "id",43            "detail-type",44            "source",45            "account",46            "time",47            "region",48            "resources",49            "detail",50        )51        for field in expected_fields:52            self.assertIn(field, event)53    def test_put_rule(self):54        rule_name = "rule-{}".format(short_uid())55        self.events_client.put_rule(Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN))56        rules = self.events_client.list_rules(NamePrefix=rule_name)["Rules"]57        self.assertEqual(1, len(rules))58        self.assertEqual(TEST_EVENT_PATTERN, json.loads(rules[0]["EventPattern"]))59        # clean up60        self.events_client.delete_rule(Name=rule_name, Force=True)61    def test_events_written_to_disk_are_timestamp_prefixed_for_chronological_ordering(62        self,63    ):64        event_type = str(uuid.uuid4())65        event_details_to_publish = list(map(lambda n: "event %s" % n, range(10)))66        for detail in event_details_to_publish:67            self.events_client.put_events(68                Entries=[69                    {70                        "Source": "unittest",71                        "Resources": [],72                        "DetailType": event_type,73                        "Detail": json.dumps(detail),74                    }75                ]76            )77        sorted_events_written_to_disk = map(78            lambda filename: json.loads(str(load_file(os.path.join(EVENTS_TMP_DIR, filename)))),79            sorted(os.listdir(EVENTS_TMP_DIR)),80        )81        sorted_events = list(82            filter(83                lambda event: event["DetailType"] == event_type,84                sorted_events_written_to_disk,85            )86        )87        self.assertListEqual(88            event_details_to_publish,89            list(map(lambda event: json.loads(event["Detail"]), sorted_events)),90        )91    def test_list_tags_for_resource(self):92        rule_name = "rule-{}".format(short_uid())93        rule = self.events_client.put_rule(94            Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN)95        )96        rule_arn = rule["RuleArn"]97        expected = [98            {"Key": "key1", "Value": "value1"},99            {"Key": "key2", "Value": "value2"},100        ]101        # insert two tags, verify both are visible102        self.events_client.tag_resource(ResourceARN=rule_arn, Tags=expected)103        actual = self.events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]104        self.assertEqual(expected, actual)105        # remove 'key2', verify only 'key1' remains106        expected = [{"Key": "key1", "Value": "value1"}]107        self.events_client.untag_resource(ResourceARN=rule_arn, TagKeys=["key2"])108        actual = self.events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]109        self.assertEqual(expected, actual)110        # clean up111        self.events_client.delete_rule(Name=rule_name, Force=True)112    def test_put_events_with_target_sqs(self):113        queue_name = "queue-{}".format(short_uid())114        rule_name = "rule-{}".format(short_uid())115        target_id = "target-{}".format(short_uid())116        bus_name = "bus-{}".format(short_uid())117        sqs_client = aws_stack.connect_to_service("sqs")118        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]119        queue_arn = aws_stack.sqs_queue_arn(queue_name)120        self.events_client.create_event_bus(Name=bus_name)121        self.events_client.put_rule(122            Name=rule_name,123            EventBusName=bus_name,124            EventPattern=json.dumps(TEST_EVENT_PATTERN),125        )126        rs = self.events_client.put_targets(127            Rule=rule_name,128            EventBusName=bus_name,129            Targets=[{"Id": target_id, "Arn": queue_arn}],130        )131        self.assertIn("FailedEntryCount", rs)132        self.assertIn("FailedEntries", rs)133        self.assertEqual(0, rs["FailedEntryCount"])134        self.assertEqual([], rs["FailedEntries"])135        self.events_client.put_events(136            Entries=[137                {138                    "EventBusName": bus_name,139                    "Source": TEST_EVENT_PATTERN["Source"][0],140                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],141                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),142                }143            ]144        )145        def get_message(queue_url):146            resp = sqs_client.receive_message(QueueUrl=queue_url)147            return resp["Messages"]148        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)149        self.assertEqual(1, len(messages))150        actual_event = json.loads(messages[0]["Body"])151        self.assertIsValidEvent(actual_event)152        self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], actual_event["detail"])153        # clean up154        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)155    def test_put_events_with_target_sqs_event_detail_match(self):156        queue_name = "queue-{}".format(short_uid())157        rule_name = "rule-{}".format(short_uid())158        target_id = "target-{}".format(short_uid())159        bus_name = "bus-{}".format(short_uid())160        sqs_client = aws_stack.connect_to_service("sqs")161        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]162        queue_arn = aws_stack.sqs_queue_arn(queue_name)163        self.events_client.create_event_bus(Name=bus_name)164        self.events_client.put_rule(165            Name=rule_name,166            EventBusName=bus_name,167            EventPattern=json.dumps({"detail": {"EventType": ["0", "1"]}}),168        )169        rs = self.events_client.put_targets(170            Rule=rule_name,171            EventBusName=bus_name,172            Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],173        )174        self.assertIn("FailedEntryCount", rs)175        self.assertIn("FailedEntries", rs)176        self.assertEqual(0, rs["FailedEntryCount"])177        self.assertEqual([], rs["FailedEntries"])178        self.events_client.put_events(179            Entries=[180                {181                    "EventBusName": bus_name,182                    "Source": TEST_EVENT_PATTERN["Source"][0],183                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],184                    "Detail": json.dumps({"EventType": "1"}),185                }186            ]187        )188        def get_message(queue_url):189            resp = sqs_client.receive_message(QueueUrl=queue_url)190            return resp.get("Messages")191        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)192        self.assertEqual(1, len(messages))193        actual_event = json.loads(messages[0]["Body"])194        self.assertEqual({"EventType": "1"}, actual_event)195        self.events_client.put_events(196            Entries=[197                {198                    "EventBusName": bus_name,199                    "Source": TEST_EVENT_PATTERN["Source"][0],200                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],201                    "Detail": json.dumps({"EventType": "2"}),202                }203            ]204        )205        def get_message(queue_url):206            resp = sqs_client.receive_message(QueueUrl=queue_url)207            return resp.get("Messages", [])208        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)209        self.assertEqual(0, len(messages))210        # clean up211        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)212    def test_put_events_with_target_sns(self):213        queue_name = "test-%s" % short_uid()214        rule_name = "rule-{}".format(short_uid())215        target_id = "target-{}".format(short_uid())216        bus_name = "bus-{}".format(short_uid())217        sns_client = aws_stack.connect_to_service("sns")218        sqs_client = aws_stack.connect_to_service("sqs")219        topic_name = "topic-{}".format(short_uid())220        topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]221        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]222        queue_arn = aws_stack.sqs_queue_arn(queue_name)223        sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)224        self.events_client.create_event_bus(Name=bus_name)225        self.events_client.put_rule(226            Name=rule_name,227            EventBusName=bus_name,228            EventPattern=json.dumps(TEST_EVENT_PATTERN),229        )230        rs = self.events_client.put_targets(231            Rule=rule_name,232            EventBusName=bus_name,233            Targets=[{"Id": target_id, "Arn": topic_arn}],234        )235        self.assertIn("FailedEntryCount", rs)236        self.assertIn("FailedEntries", rs)237        self.assertEqual(0, rs["FailedEntryCount"])238        self.assertEqual([], rs["FailedEntries"])239        self.events_client.put_events(240            Entries=[241                {242                    "EventBusName": bus_name,243                    "Source": TEST_EVENT_PATTERN["Source"][0],244                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],245                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),246                }247            ]248        )249        def get_message(queue_url):250            resp = sqs_client.receive_message(QueueUrl=queue_url)251            return resp["Messages"]252        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)253        self.assertEqual(1, len(messages))254        actual_event = json.loads(messages[0]["Body"]).get("Message")255        self.assertIsValidEvent(actual_event)256        self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], json.loads(actual_event).get("detail"))257        # clean up258        sns_client.delete_topic(TopicArn=topic_arn)259        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)260    def test_put_events_into_event_bus(self):261        queue_name = "queue-{}".format(short_uid())262        rule_name = "rule-{}".format(short_uid())263        target_id = "target-{}".format(short_uid())264        bus_name_1 = "bus1-{}".format(short_uid())265        bus_name_2 = "bus2-{}".format(short_uid())266        sqs_client = aws_stack.connect_to_service("sqs")267        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]268        queue_arn = aws_stack.sqs_queue_arn(queue_name)269        self.events_client.create_event_bus(Name=bus_name_1)270        resp = self.events_client.create_event_bus(Name=bus_name_2)271        self.events_client.put_rule(272            Name=rule_name,273            EventBusName=bus_name_1,274            EventPattern=json.dumps(TEST_EVENT_PATTERN),275        )276        self.events_client.put_targets(277            Rule=rule_name,278            EventBusName=bus_name_1,279            Targets=[{"Id": target_id, "Arn": resp.get("EventBusArn")}],280        )281        self.events_client.put_targets(282            Rule=rule_name,283            EventBusName=bus_name_2,284            Targets=[{"Id": target_id, "Arn": queue_arn}],285        )286        self.events_client.put_events(287            Entries=[288                {289                    "EventBusName": bus_name_1,290                    "Source": TEST_EVENT_PATTERN["Source"][0],291                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],292                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),293                }294            ]295        )296        def get_message(queue_url):297            resp = sqs_client.receive_message(QueueUrl=queue_url)298            return resp["Messages"]299        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)300        self.assertEqual(1, len(messages))301        actual_event = json.loads(messages[0]["Body"])302        self.assertIsValidEvent(actual_event)303        self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], actual_event["detail"])304        # clean up305        self.cleanup(bus_name_1, rule_name, target_id)306        self.cleanup(bus_name_2)307        sqs_client.delete_queue(QueueUrl=queue_url)308    def test_put_events_with_target_lambda(self):309        rule_name = "rule-{}".format(short_uid())310        function_name = "lambda-func-{}".format(short_uid())311        target_id = "target-{}".format(short_uid())312        bus_name = "bus-{}".format(short_uid())313        handler_file = os.path.join(THIS_FOLDER, "lambdas", "lambda_echo.py")314        rs = testutil.create_lambda_function(315            handler_file=handler_file,316            func_name=function_name,317            runtime=LAMBDA_RUNTIME_PYTHON36,318        )319        func_arn = rs["CreateFunctionResponse"]["FunctionArn"]320        self.events_client.create_event_bus(Name=bus_name)321        self.events_client.put_rule(322            Name=rule_name,323            EventBusName=bus_name,324            EventPattern=json.dumps(TEST_EVENT_PATTERN),325        )326        rs = self.events_client.put_targets(327            Rule=rule_name,328            EventBusName=bus_name,329            Targets=[{"Id": target_id, "Arn": func_arn}],330        )331        self.assertIn("FailedEntryCount", rs)332        self.assertIn("FailedEntries", rs)333        self.assertEqual(0, rs["FailedEntryCount"])334        self.assertEqual([], rs["FailedEntries"])335        self.events_client.put_events(336            Entries=[337                {338                    "EventBusName": bus_name,339                    "Source": TEST_EVENT_PATTERN["Source"][0],340                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],341                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),342                }343            ]344        )345        # Get lambda's log events346        events = retry(347            check_expected_lambda_log_events_length,348            retries=3,349            sleep=1,350            function_name=function_name,351            expected_length=1,352        )353        actual_event = events[0]354        self.assertIsValidEvent(actual_event)355        self.assertDictEqual(356            json.loads(actual_event["detail"]),357            json.loads(TEST_EVENT_PATTERN["Detail"][0]),358        )359        # clean up360        testutil.delete_lambda_function(function_name)361        self.cleanup(bus_name, rule_name, target_id)362    def test_rule_disable(self):363        rule_name = "rule-{}".format(short_uid())364        self.events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")365        response = self.events_client.list_rules()366        self.assertEqual("ENABLED", response["Rules"][0]["State"])367        _ = self.events_client.disable_rule(Name=rule_name)368        response = self.events_client.list_rules(NamePrefix=rule_name)369        self.assertEqual("DISABLED", response["Rules"][0]["State"])370        # clean up371        self.events_client.delete_rule(Name=rule_name, Force=True)372    def test_scheduled_expression_events(self):373        class HttpEndpointListener(ProxyListener):374            def forward_request(self, method, path, data, headers):375                event = json.loads(to_str(data))376                events.append(event)377                return 200378        local_port = get_free_tcp_port()379        proxy = start_proxy(local_port, update_listener=HttpEndpointListener())380        wait_for_port_open(local_port)381        topic_name = "topic-{}".format(short_uid())382        queue_name = "queue-{}".format(short_uid())383        fifo_queue_name = "queue-{}.fifo".format(short_uid())384        rule_name = "rule-{}".format(short_uid())385        endpoint = "{}://{}:{}".format(386            get_service_protocol(), config.LOCALSTACK_HOSTNAME, local_port387        )388        sm_role_arn = aws_stack.role_arn("sfn_role")389        sm_name = "state-machine-{}".format(short_uid())390        topic_target_id = "target-{}".format(short_uid())391        sm_target_id = "target-{}".format(short_uid())392        queue_target_id = "target-{}".format(short_uid())393        fifo_queue_target_id = "target-{}".format(short_uid())394        events = []395        state_machine_definition = """396        {397            "StartAt": "Hello",398            "States": {399                "Hello": {400                    "Type": "Pass",401                    "Result": "World",402                    "End": true403                }404            }405        }406        """407        state_machine_arn = self.sfn_client.create_state_machine(408            name=sm_name, definition=state_machine_definition, roleArn=sm_role_arn409        )["stateMachineArn"]410        topic_arn = self.sns_client.create_topic(Name=topic_name)["TopicArn"]411        self.sns_client.subscribe(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint)412        queue_url = self.sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]413        fifo_queue_url = self.sqs_client.create_queue(414            QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}415        )["QueueUrl"]416        queue_arn = aws_stack.sqs_queue_arn(queue_name)417        fifo_queue_arn = aws_stack.sqs_queue_arn(fifo_queue_name)418        event = {"env": "testing"}419        self.events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")420        self.events_client.put_targets(421            Rule=rule_name,422            Targets=[423                {"Id": topic_target_id, "Arn": topic_arn, "Input": json.dumps(event)},424                {425                    "Id": sm_target_id,426                    "Arn": state_machine_arn,427                    "Input": json.dumps(event),428                },429                {"Id": queue_target_id, "Arn": queue_arn, "Input": json.dumps(event)},430                {431                    "Id": fifo_queue_target_id,432                    "Arn": fifo_queue_arn,433                    "Input": json.dumps(event),434                    "SqsParameters": {"MessageGroupId": "123"},435                },436            ],437        )438        def received(q_urls):439            # state machine got executed440            executions = self.sfn_client.list_executions(stateMachineArn=state_machine_arn)[441                "executions"442            ]443            self.assertGreaterEqual(len(executions), 1)444            # http endpoint got events445            self.assertGreaterEqual(len(events), 2)446            notifications = [447                event["Message"] for event in events if event["Type"] == "Notification"448            ]449            self.assertGreaterEqual(len(notifications), 1)450            # get state machine execution detail451            execution_arn = executions[0]["executionArn"]452            execution_input = self.sfn_client.describe_execution(executionArn=execution_arn)[453                "input"454            ]455            all_msgs = []456            # get message from queue457            for url in q_urls:458                msgs = self.sqs_client.receive_message(QueueUrl=url).get("Messages", [])459                self.assertGreaterEqual(len(msgs), 1)460                all_msgs.append(msgs[0])461            return execution_input, notifications[0], all_msgs462        execution_input, notification, msgs_received = retry(463            received, retries=5, sleep=15, q_urls=[queue_url, fifo_queue_url]464        )465        self.assertEqual(event, json.loads(notification))466        self.assertEqual(event, json.loads(execution_input))467        for msg_received in msgs_received:468            self.assertEqual(event, json.loads(msg_received["Body"]))469        # clean up470        proxy.stop()471        self.cleanup(472            None,473            rule_name,474            target_ids=[topic_target_id, sm_target_id],475            queue_url=queue_url,476        )477        self.sns_client.delete_topic(TopicArn=topic_arn)478        self.sfn_client.delete_state_machine(stateMachineArn=state_machine_arn)479    def test_api_destinations(self):480        class HttpEndpointListener(ProxyListener):481            def forward_request(self, method, path, data, headers):482                event = json.loads(to_str(data))483                events.append(event)484                return 200485        events = []486        local_port = get_free_tcp_port()487        proxy = start_proxy(local_port, update_listener=HttpEndpointListener())488        wait_for_port_open(local_port)489        # create api destination490        dest_name = "d-%s" % short_uid()491        url = "http://localhost:%s" % local_port492        result = self.events_client.create_api_destination(493            Name=dest_name,494            ConnectionArn="c1",495            InvocationEndpoint=url,496            HttpMethod="POST",497        )498        # create rule and target499        rule_name = "r-%s" % short_uid()500        target_id = "target-{}".format(short_uid())501        pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})502        self.events_client.put_rule(Name=rule_name, EventPattern=pattern)503        self.events_client.put_targets(504            Rule=rule_name,505            Targets=[{"Id": target_id, "Arn": result["ApiDestinationArn"]}],506        )507        # put events, to trigger rules508        num_events = 5509        for i in range(num_events):510            entries = [511                {512                    "Source": "source-123",513                    "DetailType": "type-123",514                    "Detail": '{"i": %s}' % i,515                }516            ]517            self.events_client.put_events(Entries=entries)518        # assert that all events have been received in the HTTP server listener519        def check():520            self.assertEqual(len(events), num_events)521        retry(check, sleep=0.5, retries=5)522        # clean up523        proxy.stop()524    def test_put_events_with_target_firehose(self):525        s3_bucket = "s3-{}".format(short_uid())526        s3_prefix = "testeventdata"527        stream_name = "firehose-{}".format(short_uid())528        rule_name = "rule-{}".format(short_uid())529        target_id = "target-{}".format(short_uid())530        bus_name = "bus-{}".format(short_uid())531        # create firehose target bucket532        s3_client = aws_stack.connect_to_service("s3")533        s3_client.create_bucket(Bucket=s3_bucket)534        # create firehose delivery stream to s3535        firehose_client = aws_stack.connect_to_service("firehose")536        stream = firehose_client.create_delivery_stream(537            DeliveryStreamName=stream_name,538            S3DestinationConfiguration={539                "RoleARN": aws_stack.iam_resource_arn("firehose"),540                "BucketARN": aws_stack.s3_bucket_arn(s3_bucket),541                "Prefix": s3_prefix,542            },543        )544        stream_arn = stream["DeliveryStreamARN"]545        self.events_client.create_event_bus(Name=bus_name)546        self.events_client.put_rule(547            Name=rule_name,548            EventBusName=bus_name,549            EventPattern=json.dumps(TEST_EVENT_PATTERN),550        )551        rs = self.events_client.put_targets(552            Rule=rule_name,553            EventBusName=bus_name,554            Targets=[{"Id": target_id, "Arn": stream_arn}],555        )556        self.assertIn("FailedEntryCount", rs)557        self.assertIn("FailedEntries", rs)558        self.assertEqual(0, rs["FailedEntryCount"])559        self.assertEqual([], rs["FailedEntries"])560        self.events_client.put_events(561            Entries=[562                {563                    "EventBusName": bus_name,564                    "Source": TEST_EVENT_PATTERN["Source"][0],565                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],566                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),567                }568            ]569        )570        # run tests571        bucket_contents = s3_client.list_objects(Bucket=s3_bucket)["Contents"]572        self.assertEqual(1, len(bucket_contents))573        key = bucket_contents[0]["Key"]574        s3_object = s3_client.get_object(Bucket=s3_bucket, Key=key)575        actual_event = json.loads(s3_object["Body"].read().decode())576        self.assertIsValidEvent(actual_event)577        self.assertEqual(TEST_EVENT_PATTERN["Detail"][0], actual_event["detail"])578        # clean up579        firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)580        # empty and delete bucket581        s3_client.delete_object(Bucket=s3_bucket, Key=key)582        s3_client.delete_bucket(Bucket=s3_bucket)583        self.cleanup(bus_name, rule_name, target_id)584    def test_put_events_with_target_sqs_new_region(self):585        self.events_client = aws_stack.connect_to_service("events", region_name="eu-west-1")586        queue_name = "queue-{}".format(short_uid())587        rule_name = "rule-{}".format(short_uid())588        target_id = "target-{}".format(short_uid())589        bus_name = "bus-{}".format(short_uid())590        sqs_client = aws_stack.connect_to_service("sqs", region_name="eu-west-1")591        sqs_client.create_queue(QueueName=queue_name)592        queue_arn = aws_stack.sqs_queue_arn(queue_name)593        self.events_client.create_event_bus(Name=bus_name)594        self.events_client.put_rule(595            Name=rule_name,596            EventBusName=bus_name,597            EventPattern=json.dumps(TEST_EVENT_PATTERN),598        )599        self.events_client.put_targets(600            Rule=rule_name,601            EventBusName=bus_name,602            Targets=[{"Id": target_id, "Arn": queue_arn}],603        )604        response = self.events_client.put_events(605            Entries=[606                {607                    "Source": "com.mycompany.myapp",608                    "Detail": '{ "key1": "value1", "key": "value2" }',609                    "Resources": [],610                    "DetailType": "myDetailType",611                }612            ]613        )614        self.assertIn("Entries", response)615        self.assertEqual(1, len(response.get("Entries")))616        self.assertIn("EventId", response.get("Entries")[0])617    def test_put_events_with_target_kinesis(self):618        rule_name = "rule-{}".format(short_uid())619        target_id = "target-{}".format(short_uid())620        bus_name = "bus-{}".format(short_uid())621        stream_name = "stream-{}".format(short_uid())622        stream_arn = aws_stack.kinesis_stream_arn(stream_name)623        kinesis_client = aws_stack.connect_to_service("kinesis")624        kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)625        self.events_client.create_event_bus(Name=bus_name)626        self.events_client.put_rule(627            Name=rule_name,628            EventBusName=bus_name,629            EventPattern=json.dumps(TEST_EVENT_PATTERN),630        )631        put_response = self.events_client.put_targets(632            Rule=rule_name,633            EventBusName=bus_name,634            Targets=[635                {636                    "Id": target_id,637                    "Arn": stream_arn,638                    "KinesisParameters": {"PartitionKeyPath": "$.detail-type"},639                }640            ],641        )642        self.assertIn("FailedEntryCount", put_response)643        self.assertIn("FailedEntries", put_response)644        self.assertEqual(0, put_response["FailedEntryCount"])645        self.assertEqual([], put_response["FailedEntries"])646        def check_stream_status():647            _stream = kinesis_client.describe_stream(StreamName=stream_name)648            assert _stream["StreamDescription"]["StreamStatus"] == "ACTIVE"649        # wait until stream becomes available650        retry(check_stream_status, retries=7, sleep=0.8)651        self.events_client.put_events(652            Entries=[653                {654                    "EventBusName": bus_name,655                    "Source": TEST_EVENT_PATTERN["Source"][0],656                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],657                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),658                }659            ]660        )661        stream = kinesis_client.describe_stream(StreamName=stream_name)662        shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]663        shard_iterator = kinesis_client.get_shard_iterator(664            StreamName=stream_name,665            ShardId=shard_id,666            ShardIteratorType="AT_TIMESTAMP",667            Timestamp=datetime(2020, 1, 1),668        )["ShardIterator"]669        record = kinesis_client.get_records(ShardIterator=shard_iterator)["Records"][0]670        partition_key = record["PartitionKey"]671        data = json.loads(record["Data"].decode())672        self.assertEqual(TEST_EVENT_PATTERN["detail-type"][0], partition_key)673        self.assertEqual(EVENT_DETAIL, data["detail"])674        self.assertIsValidEvent(data)675    def test_put_events_with_input_path(self):676        queue_name = "queue-{}".format(short_uid())677        rule_name = "rule-{}".format(short_uid())678        target_id = "target-{}".format(short_uid())679        bus_name = "bus-{}".format(short_uid())680        sqs_client = aws_stack.connect_to_service("sqs")681        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]682        queue_arn = aws_stack.sqs_queue_arn(queue_name)683        self.events_client.create_event_bus(Name=bus_name)684        self.events_client.put_rule(685            Name=rule_name,686            EventBusName=bus_name,687            EventPattern=json.dumps(TEST_EVENT_PATTERN),688        )689        self.events_client.put_targets(690            Rule=rule_name,691            EventBusName=bus_name,692            Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],693        )694        self.events_client.put_events(695            Entries=[696                {697                    "EventBusName": bus_name,698                    "Source": TEST_EVENT_PATTERN["Source"][0],699                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],700                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),701                }702            ]703        )704        def get_message(queue_url):705            resp = sqs_client.receive_message(QueueUrl=queue_url)706            return resp.get("Messages")707        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)708        self.assertEqual(1, len(messages))709        self.assertEqual(EVENT_DETAIL, json.loads(messages[0].get("Body")))710        self.events_client.put_events(711            Entries=[712                {713                    "EventBusName": bus_name,714                    "Source": "dummySource",715                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],716                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),717                }718            ]719        )720        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)721        self.assertIsNone(messages)722        # clean up723        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)724    def test_put_events_with_input_path_multiple(self):725        queue_name = "queue-{}".format(short_uid())726        queue_name_1 = "queue-{}".format(short_uid())727        rule_name = "rule-{}".format(short_uid())728        target_id = "target-{}".format(short_uid())729        target_id_1 = "target-{}".format(short_uid())730        bus_name = "bus-{}".format(short_uid())731        sqs_client = aws_stack.connect_to_service("sqs")732        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]733        queue_arn = aws_stack.sqs_queue_arn(queue_name)734        queue_url_1 = sqs_client.create_queue(QueueName=queue_name_1)["QueueUrl"]735        queue_arn_1 = aws_stack.sqs_queue_arn(queue_name_1)736        self.events_client.create_event_bus(Name=bus_name)737        self.events_client.put_rule(738            Name=rule_name,739            EventBusName=bus_name,740            EventPattern=json.dumps(TEST_EVENT_PATTERN),741        )742        self.events_client.put_targets(743            Rule=rule_name,744            EventBusName=bus_name,745            Targets=[746                {"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"},747                {748                    "Id": target_id_1,749                    "Arn": queue_arn_1,750                },751            ],752        )753        self.events_client.put_events(754            Entries=[755                {756                    "EventBusName": bus_name,757                    "Source": TEST_EVENT_PATTERN["Source"][0],758                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],759                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),760                }761            ]762        )763        def get_message(queue_url):764            resp = sqs_client.receive_message(QueueUrl=queue_url)765            return resp.get("Messages")766        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)767        self.assertEqual(1, len(messages))768        self.assertEqual(EVENT_DETAIL, json.loads(messages[0].get("Body")))769        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url_1)770        self.assertEqual(1, len(messages))771        self.assertEqual(EVENT_DETAIL, json.loads(messages[0].get("Body")).get("detail"))772        self.events_client.put_events(773            Entries=[774                {775                    "EventBusName": bus_name,776                    "Source": "dummySource",777                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],778                    "Detail": json.dumps(TEST_EVENT_PATTERN["Detail"][0]),779                }780            ]781        )782        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)783        self.assertIsNone(messages)784        # clean up785        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)786    def test_put_event_without_source(self):787        self.events_client = aws_stack.connect_to_service("events", region_name="eu-west-1")788        response = self.events_client.put_events(Entries=[{"DetailType": "Test", "Detail": "{}"}])789        self.assertIn("Entries", response)790    def test_put_event_without_detail(self):791        self.events_client = aws_stack.connect_to_service("events", region_name="eu-west-1")792        response = self.events_client.put_events(793            Entries=[794                {795                    "DetailType": "Test",796                }797            ]798        )799        self.assertIn("Entries", response)800    def test_put_event_with_content_base_rule_in_pattern(self):801        queue_name = "queue-{}".format(short_uid())802        rule_name = "rule-{}".format(short_uid())803        target_id = "target-{}".format(short_uid())804        sqs_client = aws_stack.connect_to_service("sqs")805        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]806        queue_arn = aws_stack.sqs_queue_arn(queue_name)807        pattern = {808            "Source": [{"exists": True}],809            "detail-type": [{"prefix": "core.app"}],810            "Detail": {811                "decription": ["this-is-event-details"],812                "amount": [200],813                "salary": [2000, 4000],814                "env": ["dev", "prod"],815                "user": ["user1", "user2", "user3"],816                "admins": ["skyli", {"prefix": "hey"}, {"prefix": "ad"}],817                "test1": [{"anything-but": 200}],818                "test2": [{"anything-but": "test2"}],819                "test3": [{"anything-but": ["test3", "test33"]}],820                "test4": [{"anything-but": {"prefix": "test4"}}],821                "ip": [{"cidr": "10.102.1.0/24"}],822                "num-test1": [{"numeric": ["<", 200]}],823                "num-test2": [{"numeric": ["<=", 200]}],824                "num-test3": [{"numeric": [">", 200]}],825                "num-test4": [{"numeric": [">=", 200]}],826                "num-test5": [{"numeric": [">=", 200, "<=", 500]}],827                "num-test6": [{"numeric": [">", 200, "<", 500]}],828                "num-test7": [{"numeric": [">=", 200, "<", 500]}],829            },830        }831        event = {832            "EventBusName": TEST_EVENT_BUS_NAME,833            "Source": "core.update-account-command",834            "DetailType": "core.app.backend",835            "Detail": json.dumps(836                {837                    "decription": "this-is-event-details",838                    "amount": 200,839                    "salary": 2000,840                    "env": "prod",841                    "user": "user3",842                    "admins": "admin",843                    "test1": 300,844                    "test2": "test22",845                    "test3": "test333",846                    "test4": "this test4",847                    "ip": "10.102.1.100",848                    "num-test1": 100,849                    "num-test2": 200,850                    "num-test3": 300,851                    "num-test4": 200,852                    "num-test5": 500,853                    "num-test6": 300,854                    "num-test7": 300,855                }856            ),857        }858        self.events_client.create_event_bus(Name=TEST_EVENT_BUS_NAME)859        self.events_client.put_rule(860            Name=rule_name,861            EventBusName=TEST_EVENT_BUS_NAME,862            EventPattern=json.dumps(pattern),863        )864        self.events_client.put_targets(865            Rule=rule_name,866            EventBusName=TEST_EVENT_BUS_NAME,867            Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],868        )869        self.events_client.put_events(Entries=[event])870        def get_message(queue_url):871            resp = sqs_client.receive_message(QueueUrl=queue_url)872            return resp.get("Messages")873        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)874        self.assertEqual(1, len(messages))875        self.assertEqual(json.loads(event["Detail"]), json.loads(messages[0].get("Body")))876        event_details = json.loads(event["Detail"])877        event_details["admins"] = "no"878        event["Detail"] = json.dumps(event_details)879        self.events_client.put_events(Entries=[event])880        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)881        self.assertIsNone(messages)882        # clean up883        self.cleanup(TEST_EVENT_BUS_NAME, rule_name, target_id, queue_url=queue_url)884    def cleanup(self, bus_name, rule_name=None, target_ids=None, queue_url=None):885        kwargs = {"EventBusName": bus_name} if bus_name else {}886        if target_ids:887            target_ids = target_ids if isinstance(target_ids, list) else [target_ids]888            self.events_client.remove_targets(Rule=rule_name, Ids=target_ids, Force=True, **kwargs)889        if rule_name:890            self.events_client.delete_rule(Name=rule_name, Force=True, **kwargs)891        if bus_name:892            self.events_client.delete_event_bus(Name=bus_name)893        if queue_url:894            sqs_client = aws_stack.connect_to_service("sqs")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
