Best Python code snippet using localstack_python
test_lambda_integration.py
Source:test_lambda_integration.py  
...105            {"id": short_uid(), "data": "data1"},106            {"id": short_uid(), "data": "data2"},107        ]108        table.put_item(Item=items[0])109        events = get_lambda_log_events(function_name)110        # lambda was invoked 1 time111        self.assertEqual(1, len(events[0]["Records"]))112        # disable event source mapping113        lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)114        table.put_item(Item=items[1])115        events = get_lambda_log_events(function_name)116        # lambda no longer invoked, still have 1 event117        self.assertEqual(1, len(events[0]["Records"]))118        # clean up119        dynamodb_client = aws_stack.create_external_boto_client("dynamodb")120        dynamodb_client.delete_table(TableName=ddb_table)121        lambda_client.delete_function(FunctionName=function_name)122    def test_deletion_event_source_mapping_with_dynamodb(self):123        function_name = "lambda_func-{}".format(short_uid())124        ddb_table = "ddb_table-{}".format(short_uid())125        testutil.create_lambda_function(126            handler_file=TEST_LAMBDA_PYTHON_ECHO,127            func_name=function_name,128            runtime=LAMBDA_RUNTIME_PYTHON36,129        )130        table_arn = aws_stack.create_dynamodb_table(ddb_table, partition_key="id")[131            "TableDescription"132        ]["TableArn"]133        lambda_client = aws_stack.create_external_boto_client("lambda")134        lambda_client.create_event_source_mapping(135            FunctionName=function_name, EventSourceArn=table_arn136        )137        dynamodb_client = aws_stack.create_external_boto_client("dynamodb")138        dynamodb_client.delete_table(TableName=ddb_table)139        result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)140        self.assertEqual(0, len(result["EventSourceMappings"]))141        # clean up142        lambda_client.delete_function(FunctionName=function_name)143    def 
test_event_source_mapping_with_sqs(self):144        lambda_client = aws_stack.create_external_boto_client("lambda")145        sqs_client = aws_stack.create_external_boto_client("sqs")146        function_name = "lambda_func-{}".format(short_uid())147        queue_name_1 = "queue-{}-1".format(short_uid())148        testutil.create_lambda_function(149            handler_file=TEST_LAMBDA_PYTHON_ECHO,150            func_name=function_name,151            runtime=LAMBDA_RUNTIME_PYTHON36,152        )153        queue_url_1 = sqs_client.create_queue(QueueName=queue_name_1)["QueueUrl"]154        queue_arn_1 = aws_stack.sqs_queue_arn(queue_name_1)155        lambda_client.create_event_source_mapping(156            EventSourceArn=queue_arn_1, FunctionName=function_name157        )158        sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))159        events = retry(get_lambda_log_events, sleep_before=3, function_name=function_name)160        # lambda was invoked 1 time161        self.assertEqual(1, len(events[0]["Records"]))162        rs = sqs_client.receive_message(QueueUrl=queue_url_1)163        self.assertIsNone(rs.get("Messages"))164        # clean up165        sqs_client.delete_queue(QueueUrl=queue_url_1)166        lambda_client.delete_function(FunctionName=function_name)167    def test_create_kinesis_event_source_mapping(self):168        function_name = f"lambda_func-{short_uid()}"169        stream_name = f"test-foobar-{short_uid()}"170        testutil.create_lambda_function(171            handler_file=TEST_LAMBDA_PYTHON_ECHO,172            func_name=function_name,173            runtime=LAMBDA_RUNTIME_PYTHON36,174        )175        arn = aws_stack.kinesis_stream_arn(stream_name, account_id="000000000000")176        lambda_client = aws_stack.create_external_boto_client("lambda")177        lambda_client.create_event_source_mapping(EventSourceArn=arn, FunctionName=function_name)178        def process_records(record):179            assert 
record180        aws_stack.create_kinesis_stream(stream_name, delete=True)181        kinesis_connector.listen_to_kinesis(182            stream_name=stream_name,183            listener_func=process_records,184            wait_until_started=True,185        )186        kinesis = aws_stack.create_external_boto_client("kinesis")187        stream_summary = kinesis.describe_stream_summary(StreamName=stream_name)188        self.assertEqual(1, stream_summary["StreamDescriptionSummary"]["OpenShardCount"])189        num_events_kinesis = 10190        kinesis.put_records(191            Records=[192                {"Data": "{}", "PartitionKey": "test_%s" % i} for i in range(0, num_events_kinesis)193            ],194            StreamName=stream_name,195        )196        events = get_lambda_log_events(function_name)197        self.assertEqual(10, len(events[0]["Records"]))198        self.assertIn("eventID", events[0]["Records"][0])199        self.assertIn("eventSourceARN", events[0]["Records"][0])200        self.assertIn("eventSource", events[0]["Records"][0])201        self.assertIn("eventVersion", events[0]["Records"][0])202        self.assertIn("eventName", events[0]["Records"][0])203        self.assertIn("invokeIdentityArn", events[0]["Records"][0])204        self.assertIn("awsRegion", events[0]["Records"][0])205        self.assertIn("kinesis", events[0]["Records"][0])206    def test_python_lambda_subscribe_sns_topic(self):207        sns_client = aws_stack.create_external_boto_client("sns")208        function_name = "{}-{}".format(TEST_LAMBDA_FUNCTION_PREFIX, short_uid())209        testutil.create_lambda_function(210            handler_file=TEST_LAMBDA_PYTHON_ECHO,211            func_name=function_name,212            runtime=LAMBDA_RUNTIME_PYTHON36,213        )214        topic = sns_client.create_topic(Name=TEST_SNS_TOPIC_NAME)215        topic_arn = topic["TopicArn"]216        sns_client.subscribe(217            TopicArn=topic_arn,218            Protocol="lambda",219        
    Endpoint=lambda_api.func_arn(function_name),220        )221        subject = "[Subject] Test subject"222        message = "Hello world."223        sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)224        events = retry(225            check_expected_lambda_log_events_length,226            retries=3,227            sleep=1,228            function_name=function_name,229            expected_length=1,230            regex_filter="Records.*Sns",231        )232        notification = events[0]["Records"][0]["Sns"]233        self.assertIn("Subject", notification)234        self.assertEqual(subject, notification["Subject"])235class TestLambdaHttpInvocation(unittest.TestCase):236    def test_http_invocation_with_apigw_proxy(self):237        lambda_name = "test_lambda_%s" % short_uid()238        lambda_resource = "/api/v1/{proxy+}"239        lambda_path = "/api/v1/hello/world"240        lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path241        lambda_request_context_resource_path = lambda_resource242        # create lambda function243        testutil.create_lambda_function(244            handler_file=TEST_LAMBDA_PYTHON,245            libs=TEST_LAMBDA_LIBS,246            func_name=lambda_name,247        )248        # create API Gateway and connect it to the Lambda proxy backend249        lambda_uri = aws_stack.lambda_function_arn(lambda_name)250        invocation_uri = "arn:aws:apigateway:%s:lambda:path/2015-03-31/functions/%s/invocations"251        target_uri = invocation_uri % (aws_stack.get_region(), lambda_uri)252        result = testutil.connect_api_gateway_to_http_with_lambda_proxy(253            "test_gateway2",254            target_uri,255            path=lambda_resource,256            stage_name=TEST_STAGE_NAME,257        )258        api_id = result["id"]259        url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)260        result = safe_requests.post(261            url, data=b"{}", 
headers={"User-Agent": "python-requests/testing"}262        )263        content = json.loads(result.content)264        self.assertEqual(lambda_path, content["path"])265        self.assertEqual(lambda_resource, content["resource"])266        self.assertEqual(lambda_request_context_path, content["requestContext"]["path"])267        self.assertEqual(268            lambda_request_context_resource_path,269            content["requestContext"]["resourcePath"],270        )271        # clean up272        testutil.delete_lambda_function(lambda_name)273class TestKinesisSource:274    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)275    def test_kinesis_lambda_parallelism(self, lambda_client, kinesis_client):276        function_name = f"lambda_func-{short_uid()}"277        stream_name = f"test-foobar-{short_uid()}"278        testutil.create_lambda_function(279            handler_file=TEST_LAMBDA_PARALLEL_FILE,280            func_name=function_name,281            runtime=LAMBDA_RUNTIME_PYTHON36,282        )283        arn = aws_stack.kinesis_stream_arn(stream_name, account_id="000000000000")284        lambda_client.create_event_source_mapping(EventSourceArn=arn, FunctionName=function_name)285        def process_records(record):286            assert record287        aws_stack.create_kinesis_stream(stream_name, delete=True)288        kinesis_connector.listen_to_kinesis(289            stream_name=stream_name,290            listener_func=process_records,291            wait_until_started=True,292        )293        kinesis = aws_stack.create_external_boto_client("kinesis")294        stream_summary = kinesis.describe_stream_summary(StreamName=stream_name)295        assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]296        num_events_kinesis = 10297        # assure async call298        start = time.perf_counter()299        kinesis.put_records(300            Records=[301                {"Data": '{"batch": 0}', "PartitionKey": "test_%s" % i}302        
        for i in range(0, num_events_kinesis)303            ],304            StreamName=stream_name,305        )306        assert (time.perf_counter() - start) < 1  # this should not take more than a second307        kinesis.put_records(308            Records=[309                {"Data": '{"batch": 1}', "PartitionKey": "test_%s" % i}310                for i in range(0, num_events_kinesis)311            ],312            StreamName=stream_name,313        )314        def get_events():315            events = get_lambda_log_events(function_name, regex_filter=r"event.*Records")316            assert len(events) == 2317            return events318        events = retry(get_events, retries=5)319        def assertEvent(event, batch_no):320            assert 10 == len(event["event"]["Records"])321            assert "eventID" in event["event"]["Records"][0]322            assert "eventSourceARN" in event["event"]["Records"][0]323            assert "eventSource" in event["event"]["Records"][0]324            assert "eventVersion" in event["event"]["Records"][0]325            assert "eventName" in event["event"]["Records"][0]326            assert "invokeIdentityArn" in event["event"]["Records"][0]327            assert "awsRegion" in event["event"]["Records"][0]328            assert "kinesis" in event["event"]["Records"][0]329            assert {"batch": batch_no} == json.loads(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
