Best Python code snippet using localstack_python
test_events.py
Source:test_events.py  
...185        rule_name = f"rule-{short_uid()}"186        target_id = f"target-{short_uid()}"187        bus_name = f"bus-{short_uid()}"188        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]189        queue_arn = self._get_queue_arn(queue_url, sqs_client)190        policy = {191            "Version": "2012-10-17",192            "Id": f"sqs-eventbridge-{short_uid()}",193            "Statement": [194                {195                    "Sid": f"SendMessage-{short_uid()}",196                    "Effect": "Allow",197                    "Principal": {"Service": "events.amazonaws.com"},198                    "Action": "sqs:SendMessage",199                    "Resource": queue_arn,200                }201            ],202        }203        sqs_client.set_queue_attributes(204            QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}205        )206        events_client.create_event_bus(Name=bus_name)207        events_client.put_rule(208            Name=rule_name,209            EventBusName=bus_name,210            EventPattern=json.dumps(pattern),211        )212        kwargs = {"InputPath": input_path} if input_path else {}213        rs = events_client.put_targets(214            Rule=rule_name,215            EventBusName=bus_name,216            Targets=[{"Id": target_id, "Arn": queue_arn, **kwargs}],217        )218        assert rs["FailedEntryCount"] == 0219        assert rs["FailedEntries"] == []220        try:221            for entry_asserts in entries_asserts:222                entries = entry_asserts[0]223                for entry in entries:224                    entry.setdefault("EventBusName", bus_name)225                self._put_entries_assert_results_sqs(226                    events_client,227                    sqs_client,228                    queue_url,229                    entries=entries,230                    should_match=entry_asserts[1],231                )232        finally:233            self.cleanup(234      
          bus_name,235                rule_name,236                target_id,237                queue_url=queue_url,238                events_client=events_client,239                sqs_client=sqs_client,240            )241    def _put_entries_assert_results_sqs(242        self, events_client, sqs_client, queue_url: str, entries: List[Dict], should_match: bool243    ):244        response = events_client.put_events(Entries=entries)245        assert not response.get("FailedEntryCount")246        def get_message(queue_url):247            resp = sqs_client.receive_message(QueueUrl=queue_url)248            messages = resp.get("Messages")249            if should_match:250                assert len(messages) == 1251            return messages252        messages = retry(get_message, retries=5, sleep=1, queue_url=queue_url)253        if should_match:254            actual_event = json.loads(messages[0]["Body"])255            if "detail" in actual_event:256                self.assert_valid_event(actual_event)257        else:258            assert not messages259        return messages260    # TODO: further unify/parameterize the tests for the different target types below261    def test_put_events_with_target_sns(262        self, events_client, sns_client, sqs_client, sns_subscription263    ):264        queue_name = "test-%s" % short_uid()265        rule_name = "rule-{}".format(short_uid())266        target_id = "target-{}".format(short_uid())267        bus_name = "bus-{}".format(short_uid())268        topic_name = "topic-{}".format(short_uid())269        topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]270        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]271        queue_arn = aws_stack.sqs_queue_arn(queue_name)272        sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)273        events_client.create_event_bus(Name=bus_name)274        events_client.put_rule(275            Name=rule_name,276            
EventBusName=bus_name,277            EventPattern=json.dumps(TEST_EVENT_PATTERN),278        )279        rs = events_client.put_targets(280            Rule=rule_name,281            EventBusName=bus_name,282            Targets=[{"Id": target_id, "Arn": topic_arn}],283        )284        assert "FailedEntryCount" in rs285        assert "FailedEntries" in rs286        assert rs["FailedEntryCount"] == 0287        assert rs["FailedEntries"] == []288        events_client.put_events(289            Entries=[290                {291                    "EventBusName": bus_name,292                    "Source": TEST_EVENT_PATTERN["source"][0],293                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],294                    "Detail": json.dumps(EVENT_DETAIL),295                }296            ]297        )298        def get_message(queue_url):299            resp = sqs_client.receive_message(QueueUrl=queue_url)300            return resp["Messages"]301        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)302        assert len(messages) == 1303        actual_event = json.loads(messages[0]["Body"]).get("Message")304        self.assert_valid_event(actual_event)305        assert json.loads(actual_event).get("detail") == EVENT_DETAIL306        # clean up307        sns_client.delete_topic(TopicArn=topic_arn)308        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)309    def test_put_events_into_event_bus(self, events_client, sqs_client):310        queue_name = "queue-{}".format(short_uid())311        rule_name = "rule-{}".format(short_uid())312        target_id = "target-{}".format(short_uid())313        bus_name_1 = "bus1-{}".format(short_uid())314        bus_name_2 = "bus2-{}".format(short_uid())315        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]316        queue_arn = self._get_queue_arn(queue_url, sqs_client)317        events_client.create_event_bus(Name=bus_name_1)318        resp = 
events_client.create_event_bus(Name=bus_name_2)319        events_client.put_rule(320            Name=rule_name,321            EventBusName=bus_name_1,322            EventPattern=json.dumps(TEST_EVENT_PATTERN),323        )324        events_client.put_targets(325            Rule=rule_name,326            EventBusName=bus_name_1,327            Targets=[{"Id": target_id, "Arn": resp.get("EventBusArn")}],328        )329        events_client.put_targets(330            Rule=rule_name,331            EventBusName=bus_name_2,332            Targets=[{"Id": target_id, "Arn": queue_arn}],333        )334        events_client.put_events(335            Entries=[336                {337                    "EventBusName": bus_name_1,338                    "Source": TEST_EVENT_PATTERN["source"][0],339                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],340                    "Detail": json.dumps(EVENT_DETAIL),341                }342            ]343        )344        def get_message(queue_url):345            resp = sqs_client.receive_message(QueueUrl=queue_url)346            return resp["Messages"]347        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)348        assert len(messages) == 1349        actual_event = json.loads(messages[0]["Body"])350        self.assert_valid_event(actual_event)351        assert actual_event["detail"] == EVENT_DETAIL352        # clean up353        self.cleanup(bus_name_1, rule_name, target_id)354        self.cleanup(bus_name_2)355        sqs_client.delete_queue(QueueUrl=queue_url)356    def test_put_events_with_target_lambda(self, events_client):357        rule_name = "rule-{}".format(short_uid())358        function_name = "lambda-func-{}".format(short_uid())359        target_id = "target-{}".format(short_uid())360        bus_name = "bus-{}".format(short_uid())361        rs = testutil.create_lambda_function(362            handler_file=TEST_LAMBDA_PYTHON_ECHO,363            func_name=function_name,364            
runtime=LAMBDA_RUNTIME_PYTHON36,365        )366        func_arn = rs["CreateFunctionResponse"]["FunctionArn"]367        events_client.create_event_bus(Name=bus_name)368        events_client.put_rule(369            Name=rule_name,370            EventBusName=bus_name,371            EventPattern=json.dumps(TEST_EVENT_PATTERN),372        )373        rs = events_client.put_targets(374            Rule=rule_name,375            EventBusName=bus_name,376            Targets=[{"Id": target_id, "Arn": func_arn}],377        )378        assert "FailedEntryCount" in rs379        assert "FailedEntries" in rs380        assert rs["FailedEntryCount"] == 0381        assert rs["FailedEntries"] == []382        events_client.put_events(383            Entries=[384                {385                    "EventBusName": bus_name,386                    "Source": TEST_EVENT_PATTERN["source"][0],387                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],388                    "Detail": json.dumps(EVENT_DETAIL),389                }390            ]391        )392        # Get lambda's log events393        events = retry(394            check_expected_lambda_log_events_length,395            retries=3,396            sleep=1,397            function_name=function_name,398            expected_length=1,399        )400        actual_event = events[0]401        self.assert_valid_event(actual_event)402        assert actual_event["detail"] == EVENT_DETAIL403        # clean up404        testutil.delete_lambda_function(function_name)405        self.cleanup(bus_name, rule_name, target_id)406    def test_rule_disable(self, events_client):407        rule_name = "rule-{}".format(short_uid())408        events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")409        response = events_client.list_rules()410        assert response["Rules"][0]["State"] == "ENABLED"411        events_client.disable_rule(Name=rule_name)412        response = 
events_client.list_rules(NamePrefix=rule_name)413        assert response["Rules"][0]["State"] == "DISABLED"414        # clean up415        self.cleanup(rule_name=rule_name)416    def test_scheduled_expression_events(417        self, stepfunctions_client, sns_client, sqs_client, events_client, sns_subscription418    ):419        class HttpEndpointListener(ProxyListener):420            def forward_request(self, method, path, data, headers):421                event = json.loads(to_str(data))422                events.append(event)423                return 200424        local_port = get_free_tcp_port()425        proxy = start_proxy(local_port, update_listener=HttpEndpointListener())426        wait_for_port_open(local_port)427        topic_name = "topic-{}".format(short_uid())428        queue_name = "queue-{}".format(short_uid())429        fifo_queue_name = "queue-{}.fifo".format(short_uid())430        rule_name = "rule-{}".format(short_uid())431        endpoint = "{}://{}:{}".format(432            get_service_protocol(), config.LOCALSTACK_HOSTNAME, local_port433        )434        sm_role_arn = aws_stack.role_arn("sfn_role")435        sm_name = "state-machine-{}".format(short_uid())436        topic_target_id = "target-{}".format(short_uid())437        sm_target_id = "target-{}".format(short_uid())438        queue_target_id = "target-{}".format(short_uid())439        fifo_queue_target_id = "target-{}".format(short_uid())440        events = []441        state_machine_definition = """442        {443            "StartAt": "Hello",444            "States": {445                "Hello": {446                    "Type": "Pass",447                    "Result": "World",448                    "End": true449                }450            }451        }452        """453        state_machine_arn = stepfunctions_client.create_state_machine(454            name=sm_name, definition=state_machine_definition, roleArn=sm_role_arn455        )["stateMachineArn"]456        topic_arn = 
sns_client.create_topic(Name=topic_name)["TopicArn"]457        sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint)458        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]459        fifo_queue_url = sqs_client.create_queue(460            QueueName=fifo_queue_name,461            Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},462        )["QueueUrl"]463        queue_arn = aws_stack.sqs_queue_arn(queue_name)464        fifo_queue_arn = aws_stack.sqs_queue_arn(fifo_queue_name)465        event = {"env": "testing"}466        events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")467        events_client.put_targets(468            Rule=rule_name,469            Targets=[470                {"Id": topic_target_id, "Arn": topic_arn, "Input": json.dumps(event)},471                {472                    "Id": sm_target_id,473                    "Arn": state_machine_arn,474                    "Input": json.dumps(event),475                },476                {"Id": queue_target_id, "Arn": queue_arn, "Input": json.dumps(event)},477                {478                    "Id": fifo_queue_target_id,479                    "Arn": fifo_queue_arn,480                    "Input": json.dumps(event),481                    "SqsParameters": {"MessageGroupId": "123"},482                },483            ],484        )485        def received(q_urls):486            # state machine got executed487            executions = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)[488                "executions"489            ]490            assert len(executions) >= 1491            # http endpoint got events492            assert len(events) >= 2493            notifications = [494                event["Message"] for event in events if event["Type"] == "Notification"495            ]496            assert len(notifications) >= 1497            # get state machine execution detail498            
execution_arn = executions[0]["executionArn"]499            execution_input = stepfunctions_client.describe_execution(executionArn=execution_arn)[500                "input"501            ]502            all_msgs = []503            # get message from queue504            for url in q_urls:505                msgs = sqs_client.receive_message(QueueUrl=url).get("Messages", [])506                assert len(msgs) >= 1507                all_msgs.append(msgs[0])508            return execution_input, notifications[0], all_msgs509        execution_input, notification, msgs_received = retry(510            received, retries=5, sleep=15, q_urls=[queue_url, fifo_queue_url]511        )512        assert json.loads(notification) == event513        assert json.loads(execution_input) == event514        for msg_received in msgs_received:515            assert json.loads(msg_received["Body"]) == event516        # clean up517        proxy.stop()518        target_ids = [topic_target_id, sm_target_id, queue_target_id, fifo_queue_target_id]519        self.cleanup(None, rule_name, target_ids=target_ids, queue_url=queue_url)520        sns_client.delete_topic(TopicArn=topic_arn)521        stepfunctions_client.delete_state_machine(stateMachineArn=state_machine_arn)522    def test_api_destinations(self, events_client):523        token = short_uid()524        bearer = "Bearer %s" % token525        class HttpEndpointListener(ProxyListener):526            def forward_request(self, method, path, data, headers):527                event = json.loads(to_str(data))528                events.append(event)529                paths_list.append(path)530                auth = headers.get("Api") or headers.get("Authorization")531                if auth not in headers_list:532                    headers_list.append(auth)533                if headers.get("target_header"):534                    headers_list.append(headers.get("target_header"))535                if "client_id" in event:536                    
oauth_data.update(537                        {538                            "client_id": event.get("client_id"),539                            "client_secret": event.get("client_secret"),540                            "header_value": headers.get("oauthheader"),541                            "body_value": event.get("oauthbody"),542                            "path": path,543                        }544                    )545                return requests_response(546                    {547                        "access_token": token,548                        "token_type": "Bearer",549                        "expires_in": 86400,550                    }551                )552        events = []553        paths_list = []554        headers_list = []555        oauth_data = {}556        local_port = get_free_tcp_port()557        proxy = start_proxy(local_port, update_listener=HttpEndpointListener())558        wait_for_port_open(local_port)559        url = f"http://localhost:{local_port}"560        auth_types = [561            {562                "type": "BASIC",563                "key": "BasicAuthParameters",564                "parameters": {"Username": "user", "Password": "pass"},565            },566            {567                "type": "API_KEY",568                "key": "ApiKeyAuthParameters",569                "parameters": {"ApiKeyName": "Api", "ApiKeyValue": "apikey_secret"},570            },571            {572                "type": "OAUTH_CLIENT_CREDENTIALS",573                "key": "OAuthParameters",574                "parameters": {575                    "AuthorizationEndpoint": url,576                    "ClientParameters": {"ClientID": "id", "ClientSecret": "password"},577                    "HttpMethod": "put",578                    "OAuthHttpParameters": {579                        "BodyParameters": [{"Key": "oauthbody", "Value": "value1"}],580                        "HeaderParameters": [{"Key": "oauthheader", "Value": "value2"}],581                 
       "QueryStringParameters": [{"Key": "oauthquery", "Value": "value3"}],582                    },583                },584            },585        ]586        for auth in auth_types:587            connection_name = "c-%s" % short_uid()588            connection_arn = events_client.create_connection(589                Name=connection_name,590                AuthorizationType=auth.get("type"),591                AuthParameters={592                    auth.get("key"): auth.get("parameters"),593                    "InvocationHttpParameters": {594                        "BodyParameters": [595                            {"Key": "key", "Value": "value", "IsValueSecret": False}596                        ],597                        "HeaderParameters": [598                            {"Key": "key", "Value": "value", "IsValueSecret": False}599                        ],600                        "QueryStringParameters": [601                            {"Key": "key", "Value": "value", "IsValueSecret": False}602                        ],603                    },604                },605            )["ConnectionArn"]606            # create api destination607            dest_name = "d-%s" % short_uid()608            result = events_client.create_api_destination(609                Name=dest_name,610                ConnectionArn=connection_arn,611                InvocationEndpoint=url,612                HttpMethod="POST",613            )614            # create rule and target615            rule_name = "r-%s" % short_uid()616            target_id = "target-{}".format(short_uid())617            pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})618            events_client.put_rule(Name=rule_name, EventPattern=pattern)619            events_client.put_targets(620                Rule=rule_name,621                Targets=[622                    {623                        "Id": target_id,624                        "Arn": result["ApiDestinationArn"],625           
             "Input": '{"target_value":"value"}',626                        "HttpParameters": {627                            "PathParameterValues": ["target_path"],628                            "HeaderParameters": {"target_header": "target_header_value"},629                            "QueryStringParameters": {"target_query": "t_query"},630                        },631                    }632                ],633            )634            entries = [635                {636                    "Source": "source-123",637                    "DetailType": "type-123",638                    "Detail": '{"i": %s}' % 0,639                }640            ]641            events_client.put_events(Entries=entries)642            # clean up643            events_client.delete_connection(Name=connection_name)644            events_client.delete_api_destination(Name=dest_name)645            self.cleanup(rule_name=rule_name, target_ids=target_id)646        # assert that all events have been received in the HTTP server listener647        def check():648            assert len(events) >= len(auth_types)649            assert "key" in paths_list[0] and "value" in paths_list[0]650            assert "target_query" in paths_list[0] and "t_query" in paths_list[0]651            assert "target_path" in paths_list[0]652            assert events[0].get("key") == "value"653            assert events[0].get("target_value") == "value"654            assert oauth_data.get("client_id") == "id"655            assert oauth_data.get("client_secret") == "password"656            assert oauth_data.get("header_value") == "value2"657            assert oauth_data.get("body_value") == "value1"658            assert "oauthquery" in oauth_data.get("path")659            assert "value3" in oauth_data.get("path")660            user_pass = to_str(base64.b64encode(b"user:pass"))661            assert f"Basic {user_pass}" in headers_list662            assert "apikey_secret" in headers_list663            assert bearer in 
headers_list664            assert "target_header_value" in headers_list665        retry(check, sleep=0.5, retries=5)666        # clean up667        proxy.stop()668    def test_put_events_with_target_firehose(self, events_client, s3_client, firehose_client):669        s3_bucket = "s3-{}".format(short_uid())670        s3_prefix = "testeventdata"671        stream_name = "firehose-{}".format(short_uid())672        rule_name = "rule-{}".format(short_uid())673        target_id = "target-{}".format(short_uid())674        bus_name = "bus-{}".format(short_uid())675        # create firehose target bucket676        aws_stack.get_or_create_bucket(s3_bucket)677        # create firehose delivery stream to s3678        stream = firehose_client.create_delivery_stream(679            DeliveryStreamName=stream_name,680            S3DestinationConfiguration={681                "RoleARN": aws_stack.iam_resource_arn("firehose"),682                "BucketARN": aws_stack.s3_bucket_arn(s3_bucket),683                "Prefix": s3_prefix,684            },685        )686        stream_arn = stream["DeliveryStreamARN"]687        events_client.create_event_bus(Name=bus_name)688        events_client.put_rule(689            Name=rule_name,690            EventBusName=bus_name,691            EventPattern=json.dumps(TEST_EVENT_PATTERN),692        )693        rs = events_client.put_targets(694            Rule=rule_name,695            EventBusName=bus_name,696            Targets=[{"Id": target_id, "Arn": stream_arn}],697        )698        assert "FailedEntryCount" in rs699        assert "FailedEntries" in rs700        assert rs["FailedEntryCount"] == 0701        assert rs["FailedEntries"] == []702        events_client.put_events(703            Entries=[704                {705                    "EventBusName": bus_name,706                    "Source": TEST_EVENT_PATTERN["source"][0],707                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],708                    "Detail": 
json.dumps(EVENT_DETAIL),709                }710            ]711        )712        # run tests713        bucket_contents = s3_client.list_objects(Bucket=s3_bucket)["Contents"]714        assert len(bucket_contents) == 1715        key = bucket_contents[0]["Key"]716        s3_object = s3_client.get_object(Bucket=s3_bucket, Key=key)717        actual_event = json.loads(s3_object["Body"].read().decode())718        self.assert_valid_event(actual_event)719        assert actual_event["detail"] == EVENT_DETAIL720        # clean up721        firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)722        # empty and delete bucket723        s3_client.delete_object(Bucket=s3_bucket, Key=key)724        s3_client.delete_bucket(Bucket=s3_bucket)725        self.cleanup(bus_name, rule_name, target_id)726    def test_put_events_with_target_sqs_new_region(self):727        events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")728        queue_name = "queue-{}".format(short_uid())729        rule_name = "rule-{}".format(short_uid())730        target_id = "target-{}".format(short_uid())731        bus_name = "bus-{}".format(short_uid())732        sqs_client = aws_stack.create_external_boto_client("sqs", region_name="eu-west-1")733        sqs_client.create_queue(QueueName=queue_name)734        queue_arn = aws_stack.sqs_queue_arn(queue_name)735        events_client.create_event_bus(Name=bus_name)736        events_client.put_rule(737            Name=rule_name,738            EventBusName=bus_name,739            EventPattern=json.dumps(TEST_EVENT_PATTERN),740        )741        events_client.put_targets(742            Rule=rule_name,743            EventBusName=bus_name,744            Targets=[{"Id": target_id, "Arn": queue_arn}],745        )746        response = events_client.put_events(747            Entries=[748                {749                    "Source": "com.mycompany.myapp",750                    "Detail": '{ "key1": "value1", "key": 
"value2" }',751                    "Resources": [],752                    "DetailType": "myDetailType",753                }754            ]755        )756        assert "Entries" in response757        assert len(response.get("Entries")) == 1758        assert "EventId" in response.get("Entries")[0]759    def test_put_events_with_target_kinesis(self, events_client, kinesis_client):760        rule_name = "rule-{}".format(short_uid())761        target_id = "target-{}".format(short_uid())762        bus_name = "bus-{}".format(short_uid())763        stream_name = "stream-{}".format(short_uid())764        stream_arn = aws_stack.kinesis_stream_arn(stream_name)765        kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)766        events_client.create_event_bus(Name=bus_name)767        events_client.put_rule(768            Name=rule_name,769            EventBusName=bus_name,770            EventPattern=json.dumps(TEST_EVENT_PATTERN),771        )772        put_response = events_client.put_targets(773            Rule=rule_name,774            EventBusName=bus_name,775            Targets=[776                {777                    "Id": target_id,778                    "Arn": stream_arn,779                    "KinesisParameters": {"PartitionKeyPath": "$.detail-type"},780                }781            ],782        )783        assert "FailedEntryCount" in put_response784        assert "FailedEntries" in put_response785        assert put_response["FailedEntryCount"] == 0786        assert put_response["FailedEntries"] == []787        def check_stream_status():788            _stream = kinesis_client.describe_stream(StreamName=stream_name)789            assert _stream["StreamDescription"]["StreamStatus"] == "ACTIVE"790        # wait until stream becomes available791        retry(check_stream_status, retries=7, sleep=0.8)792        events_client.put_events(793            Entries=[794                {795                    "EventBusName": bus_name,796                    
"Source": TEST_EVENT_PATTERN["source"][0],797                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],798                    "Detail": json.dumps(EVENT_DETAIL),799                }800            ]801        )802        stream = kinesis_client.describe_stream(StreamName=stream_name)803        shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]804        shard_iterator = kinesis_client.get_shard_iterator(805            StreamName=stream_name,806            ShardId=shard_id,807            ShardIteratorType="AT_TIMESTAMP",808            Timestamp=datetime(2020, 1, 1),809        )["ShardIterator"]810        record = kinesis_client.get_records(ShardIterator=shard_iterator)["Records"][0]811        partition_key = record["PartitionKey"]812        data = json.loads(record["Data"].decode())813        assert partition_key == TEST_EVENT_PATTERN["detail-type"][0]814        assert data["detail"] == EVENT_DETAIL815        self.assert_valid_event(data)816    def test_put_events_with_input_path(self, events_client, sqs_client):817        queue_name = f"queue-{short_uid()}"818        rule_name = f"rule-{short_uid()}"819        target_id = f"target-{short_uid()}"820        bus_name = f"bus-{short_uid()}"821        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]822        queue_arn = aws_stack.sqs_queue_arn(queue_name)823        events_client.create_event_bus(Name=bus_name)824        events_client.put_rule(825            Name=rule_name,826            EventBusName=bus_name,827            EventPattern=json.dumps(TEST_EVENT_PATTERN),828        )829        events_client.put_targets(830            Rule=rule_name,831            EventBusName=bus_name,832            Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],833        )834        events_client.put_events(835            Entries=[836                {837                    "EventBusName": bus_name,838                    "Source": TEST_EVENT_PATTERN["source"][0],839       
             "DetailType": TEST_EVENT_PATTERN["detail-type"][0],840                    "Detail": json.dumps(EVENT_DETAIL),841                }842            ]843        )844        def get_message(queue_url):845            resp = sqs_client.receive_message(QueueUrl=queue_url)846            return resp.get("Messages")847        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)848        assert len(messages) == 1849        assert json.loads(messages[0].get("Body")) == EVENT_DETAIL850        events_client.put_events(851            Entries=[852                {853                    "EventBusName": bus_name,854                    "Source": "dummySource",855                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],856                    "Detail": json.dumps(EVENT_DETAIL),857                }858            ]859        )860        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)861        assert messages is None862        # clean up863        self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)864    def test_put_events_with_input_path_multiple(self, events_client, sqs_client):865        queue_name = "queue-{}".format(short_uid())866        queue_name_1 = "queue-{}".format(short_uid())867        rule_name = "rule-{}".format(short_uid())868        target_id = "target-{}".format(short_uid())869        target_id_1 = "target-{}".format(short_uid())870        bus_name = "bus-{}".format(short_uid())871        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]872        queue_arn = aws_stack.sqs_queue_arn(queue_name)873        queue_url_1 = sqs_client.create_queue(QueueName=queue_name_1)["QueueUrl"]874        queue_arn_1 = aws_stack.sqs_queue_arn(queue_name_1)875        events_client.create_event_bus(Name=bus_name)876        events_client.put_rule(877            Name=rule_name,878            EventBusName=bus_name,879            EventPattern=json.dumps(TEST_EVENT_PATTERN),880        )881       
 events_client.put_targets(882            Rule=rule_name,883            EventBusName=bus_name,884            Targets=[885                {"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"},886                {887                    "Id": target_id_1,888                    "Arn": queue_arn_1,889                },890            ],891        )892        events_client.put_events(893            Entries=[894                {895                    "EventBusName": bus_name,896                    "Source": TEST_EVENT_PATTERN["source"][0],897                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],898                    "Detail": json.dumps(EVENT_DETAIL),899                }900            ]901        )902        def get_message(queue_url):903            resp = sqs_client.receive_message(QueueUrl=queue_url)904            return resp.get("Messages")905        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)906        assert len(messages) == 1907        assert json.loads(messages[0].get("Body")) == EVENT_DETAIL908        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url_1)909        assert len(messages) == 1910        assert json.loads(messages[0].get("Body")).get("detail") == EVENT_DETAIL911        events_client.put_events(912            Entries=[913                {914                    "EventBusName": bus_name,915                    "Source": "dummySource",916                    "DetailType": TEST_EVENT_PATTERN["detail-type"][0],917                    "Detail": json.dumps(EVENT_DETAIL),918                }919            ]920        )921        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)922        assert messages is None923        # clean up924        self.cleanup(bus_name, rule_name, [target_id, target_id_1], queue_url=queue_url)925    def test_put_event_without_source(self):926        events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")927        
response = events_client.put_events(Entries=[{"DetailType": "Test", "Detail": "{}"}])928        assert response.get("Entries")929    def test_put_event_without_detail(self):930        events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")931        response = events_client.put_events(932            Entries=[933                {934                    "DetailType": "Test",935                }936            ]937        )938        assert response.get("Entries")939    def test_trigger_event_on_ssm_change(self, events_client, sqs_client, ssm_client):940        rule_name = "rule-{}".format(short_uid())941        target_id = "target-{}".format(short_uid())942        # create queue943        queue_name = "queue-{}".format(short_uid())944        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]945        queue_arn = aws_stack.sqs_queue_arn(queue_name)946        # put rule listening on SSM changes947        ssm_prefix = "/test/local/"948        events_client.put_rule(949            Name=rule_name,950            EventPattern=json.dumps(951                {952                    "detail": {953                        "name": [{"prefix": ssm_prefix}],954                        "operation": ["Create", "Update", "Delete", "LabelParameterVersion"],955                    },956                    "detail-type": ["Parameter Store Change"],957                    "source": ["aws.ssm"],958                }959            ),960            State="ENABLED",961            Description="Trigger on SSM parameter changes",962        )963        # put target964        events_client.put_targets(965            Rule=rule_name,966            EventBusName=TEST_EVENT_BUS_NAME,967            Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],968        )969        # change SSM param to trigger event970        ssm_client.put_parameter(Name=f"{ssm_prefix}/test123", Value="value1", Type="String")971        def assert_message():972     
       resp = sqs_client.receive_message(QueueUrl=queue_url)973            result = resp.get("Messages")974            body = json.loads(result[0]["Body"])975            assert body == {"name": "/test/local/test123", "operation": "Create"}976        # assert that message has been received977        retry(assert_message, retries=7, sleep=0.3)978        # clean up979        self.cleanup(rule_name=rule_name, target_ids=target_id)980    def test_put_event_with_content_base_rule_in_pattern(self, events_client, sqs_client):981        queue_name = f"queue-{short_uid()}"982        rule_name = f"rule-{short_uid()}"983        target_id = f"target-{short_uid()}"984        queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]985        queue_arn = aws_stack.sqs_queue_arn(queue_name)986        pattern = {987            "Source": [{"exists": True}],988            "detail-type": [{"prefix": "core.app"}],989            "Detail": {990                "decription": ["this-is-event-details"],991                "amount": [200],992                "salary": [2000, 4000],993                "env": ["dev", "prod"],994                "user": ["user1", "user2", "user3"],995                "admins": ["skyli", {"prefix": "hey"}, {"prefix": "ad"}],996                "test1": [{"anything-but": 200}],997                "test2": [{"anything-but": "test2"}],998                "test3": [{"anything-but": ["test3", "test33"]}],999                "test4": [{"anything-but": {"prefix": "test4"}}],1000                "ip": [{"cidr": "10.102.1.0/24"}],1001                "num-test1": [{"numeric": ["<", 200]}],1002                "num-test2": [{"numeric": ["<=", 200]}],1003                "num-test3": [{"numeric": [">", 200]}],1004                "num-test4": [{"numeric": [">=", 200]}],1005                "num-test5": [{"numeric": [">=", 200, "<=", 500]}],1006                "num-test6": [{"numeric": [">", 200, "<", 500]}],1007                "num-test7": [{"numeric": [">=", 200, "<", 
500]}],1008            },1009        }1010        event = {1011            "EventBusName": TEST_EVENT_BUS_NAME,1012            "Source": "core.update-account-command",1013            "DetailType": "core.app.backend",1014            "Detail": json.dumps(1015                {1016                    "decription": "this-is-event-details",1017                    "amount": 200,1018                    "salary": 2000,1019                    "env": "prod",1020                    "user": "user3",1021                    "admins": "admin",1022                    "test1": 300,1023                    "test2": "test22",1024                    "test3": "test333",1025                    "test4": "this test4",1026                    "ip": "10.102.1.100",1027                    "num-test1": 100,1028                    "num-test2": 200,1029                    "num-test3": 300,1030                    "num-test4": 200,1031                    "num-test5": 500,1032                    "num-test6": 300,1033                    "num-test7": 300,1034                }1035            ),1036        }1037        events_client.create_event_bus(Name=TEST_EVENT_BUS_NAME)1038        events_client.put_rule(1039            Name=rule_name,1040            EventBusName=TEST_EVENT_BUS_NAME,1041            EventPattern=json.dumps(pattern),1042        )1043        events_client.put_targets(1044            Rule=rule_name,1045            EventBusName=TEST_EVENT_BUS_NAME,1046            Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],1047        )1048        events_client.put_events(Entries=[event])1049        def get_message(queue_url):1050            resp = sqs_client.receive_message(QueueUrl=queue_url)1051            return resp.get("Messages")1052        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)1053        assert len(messages) == 11054        assert json.loads(messages[0].get("Body")) == json.loads(event["Detail"])1055        event_details = 
json.loads(event["Detail"])1056        event_details["admins"] = "no"1057        event["Detail"] = json.dumps(event_details)1058        events_client.put_events(Entries=[event])1059        messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)1060        assert messages is None1061        # clean up1062        self.cleanup(TEST_EVENT_BUS_NAME, rule_name, target_id, queue_url=queue_url)1063    def _get_queue_arn(self, queue_url, sqs_client):1064        queue_attrs = sqs_client.get_queue_attributes(1065            QueueUrl=queue_url, AttributeNames=["QueueArn"]1066        )1067        return queue_attrs["Attributes"]["QueueArn"]1068    def cleanup(1069        self,1070        bus_name=None,1071        rule_name=None,1072        target_ids=None,1073        queue_url=None,1074        events_client=None,1075        sqs_client=None,1076    ):1077        events_client = events_client or aws_stack.create_external_boto_client("events")...partial_environment_create.py
Source:partial_environment_create.py  
...173            if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':174                return sqs_client.create_queue(QueueName=queue_name)['QueueUrl']175            raise e176    @staticmethod177    def _get_queue_arn(queue_url):178        sqs_client = boto3.client('sqs')179        response = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])180        return response['Attributes']['QueueArn']181    def _ensure_queue_exists(self, output_config, queue_name):182        queue_url = self._get_queue_url(output_config['sqs']['queue_names'][queue_name])183        queue_arn = self._get_queue_arn(queue_url)184        return queue_arn, queue_url185    @staticmethod186    def _retry_if_service_unavailable_error(exception):187        return isinstance(exception, DartRequestException) and exception.response.status_code == 503188    @staticmethod189    @retry(stop_max_attempt_number=10, wait_fixed=5000, retry_on_exception=_retry_if_service_unavailable_error)190    def _with_retries(function, *args, **kwargs):191        function(*args, **kwargs)192def _parse_args():193    parser = argparse.ArgumentParser()194    parser.add_argument('-n', '--environment-name', action='store', dest='environment_name', required=True)195    parser.add_argument('-m', '--mode', action='store', dest='mode', required=True)196    parser.add_argument('-i', '--input-config-path', action='store', dest='input_config_path', required=True)197    parser.add_argument('-o', '--output-config-s3-path', action='store', dest='output_config_s3_path', required=True)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
