How to use lambda_su_role method in localstack

Best Python code snippet using localstack_python

test_lambda_integration.py

Source:test_lambda_integration.py Github

copy

Full Screen

1import base642import json3import os4import time5from unittest.mock import patch6import pytest7from botocore.exceptions import ClientError8from localstack import config9from localstack.services.apigateway.helpers import path_based_url10from localstack.services.awslambda.lambda_api import (11 BATCH_SIZE_RANGES,12 INVALID_PARAMETER_VALUE_EXCEPTION,13)14from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON3615from localstack.utils import testutil16from localstack.utils.aws import aws_stack17from localstack.utils.common import retry, safe_requests, short_uid18from localstack.utils.sync import poll_condition19from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events20from .test_lambda import (21 TEST_LAMBDA_FUNCTION_PREFIX,22 TEST_LAMBDA_LIBS,23 TEST_LAMBDA_PYTHON,24 TEST_LAMBDA_PYTHON_ECHO,25 is_old_provider,26)27TEST_STAGE_NAME = "testing"28TEST_SNS_TOPIC_NAME = "sns-topic-1"29THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))30TEST_LAMBDA_PARALLEL_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_parallel.py")31class TestLambdaEventSourceMappings:32 def test_event_source_mapping_default_batch_size(33 self,34 create_lambda_function,35 lambda_client,36 sqs_client,37 sqs_create_queue,38 sqs_queue_arn,39 dynamodb_client,40 dynamodb_create_table,41 lambda_su_role,42 ):43 function_name = f"lambda_func-{short_uid()}"44 queue_name_1 = f"queue-{short_uid()}-1"45 queue_name_2 = f"queue-{short_uid()}-2"46 ddb_table = f"ddb_table-{short_uid()}"47 create_lambda_function(48 func_name=function_name,49 handler_file=TEST_LAMBDA_PYTHON_ECHO,50 runtime=LAMBDA_RUNTIME_PYTHON36,51 role=lambda_su_role,52 )53 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)54 queue_arn_1 = sqs_queue_arn(queue_url_1)55 rs = lambda_client.create_event_source_mapping(56 EventSourceArn=queue_arn_1, FunctionName=function_name57 )58 assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]59 uuid = rs["UUID"]60 def wait_for_event_source_mapping():61 return lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"62 assert poll_condition(wait_for_event_source_mapping, timeout=30)63 with pytest.raises(ClientError) as e:64 # Update batch size with invalid value65 lambda_client.update_event_source_mapping(66 UUID=uuid,67 FunctionName=function_name,68 BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,69 )70 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)71 queue_url_2 = sqs_create_queue(QueueName=queue_name_2)72 queue_arn_2 = sqs_queue_arn(queue_url_2)73 with pytest.raises(ClientError) as e:74 # Create event source mapping with invalid batch size value75 lambda_client.create_event_source_mapping(76 EventSourceArn=queue_arn_2,77 FunctionName=function_name,78 BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,79 )80 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)81 table_description = dynamodb_create_table(82 table_name=ddb_table,83 partition_key="id",84 stream_view_type="NEW_IMAGE",85 )["TableDescription"]86 # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn87 if not is_old_provider():88 with pytest.raises(ClientError) as e:89 lambda_client.create_event_source_mapping(90 EventSourceArn=table_description["TableArn"],91 FunctionName=function_name,92 StartingPosition="LATEST",93 )94 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)95 # check if event source mapping can be created with latest stream ARN96 rs = lambda_client.create_event_source_mapping(97 EventSourceArn=table_description["LatestStreamArn"],98 FunctionName=function_name,99 StartingPosition="LATEST",100 )101 assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]102 def test_disabled_event_source_mapping_with_dynamodb(103 self,104 create_lambda_function,105 lambda_client,106 dynamodb_resource,107 dynamodb_client,108 dynamodb_create_table,109 logs_client,110 dynamodbstreams_client,111 lambda_su_role,112 ):113 function_name = f"lambda_func-{short_uid()}"114 ddb_table = f"ddb_table-{short_uid()}"115 create_lambda_function(116 func_name=function_name,117 handler_file=TEST_LAMBDA_PYTHON_ECHO,118 runtime=LAMBDA_RUNTIME_PYTHON36,119 role=lambda_su_role,120 )121 latest_stream_arn = dynamodb_create_table(122 table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"123 )["TableDescription"]["LatestStreamArn"]124 rs = lambda_client.create_event_source_mapping(125 FunctionName=function_name,126 EventSourceArn=latest_stream_arn,127 StartingPosition="TRIM_HORIZON",128 MaximumBatchingWindowInSeconds=1,129 )130 uuid = rs["UUID"]131 def wait_for_table_created():132 return (133 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134 == "ACTIVE"135 )136 assert poll_condition(wait_for_table_created, timeout=30)137 def wait_for_stream_created():138 return (139 dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140 "StreamDescription"141 ]["StreamStatus"]142 == "ENABLED"143 )144 assert poll_condition(wait_for_stream_created, timeout=30)145 table = dynamodb_resource.Table(ddb_table)146 items = [147 {"id": short_uid(), "data": "data1"},148 {"id": short_uid(), "data": "data2"},149 ]150 table.put_item(Item=items[0])151 def assert_events():152 events = get_lambda_log_events(function_name, logs_client=logs_client)153 # lambda was invoked 1 time154 assert 1 == len(events[0]["Records"])155 # might take some time against AWS156 retry(assert_events, sleep=3, retries=10)157 # disable event source mapping158 lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159 table.put_item(Item=items[1])160 events = get_lambda_log_events(function_name, logs_client=logs_client)161 # lambda no longer invoked, still have 1 event162 assert 1 == len(events[0]["Records"])163 # TODO invalid test against AWS, this behavior just is not correct164 def test_deletion_event_source_mapping_with_dynamodb(165 self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166 ):167 function_name = f"lambda_func-{short_uid()}"168 ddb_table = f"ddb_table-{short_uid()}"169 create_lambda_function(170 func_name=function_name,171 handler_file=TEST_LAMBDA_PYTHON_ECHO,172 runtime=LAMBDA_RUNTIME_PYTHON36,173 role=lambda_su_role,174 )175 latest_stream_arn = aws_stack.create_dynamodb_table(176 table_name=ddb_table,177 partition_key="id",178 client=dynamodb_client,179 stream_view_type="NEW_IMAGE",180 )["TableDescription"]["LatestStreamArn"]181 lambda_client.create_event_source_mapping(182 FunctionName=function_name,183 EventSourceArn=latest_stream_arn,184 StartingPosition="TRIM_HORIZON",185 )186 def wait_for_table_created():187 return (188 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189 == "ACTIVE"190 )191 assert poll_condition(wait_for_table_created, timeout=30)192 dynamodb_client.delete_table(TableName=ddb_table)193 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194 assert 1 == len(result["EventSourceMappings"])195 def test_event_source_mapping_with_sqs(196 self,197 create_lambda_function,198 lambda_client,199 sqs_client,200 sqs_create_queue,201 sqs_queue_arn,202 logs_client,203 lambda_su_role,204 ):205 function_name = f"lambda_func-{short_uid()}"206 queue_name_1 = f"queue-{short_uid()}-1"207 create_lambda_function(208 func_name=function_name,209 handler_file=TEST_LAMBDA_PYTHON_ECHO,210 runtime=LAMBDA_RUNTIME_PYTHON36,211 role=lambda_su_role,212 )213 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214 queue_arn_1 = sqs_queue_arn(queue_url_1)215 lambda_client.create_event_source_mapping(216 EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217 )218 sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219 def assert_lambda_log_events():220 events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221 # lambda was invoked 1 time222 assert 1 == len(events[0]["Records"])223 retry(assert_lambda_log_events, sleep_before=3, retries=30)224 rs = sqs_client.receive_message(QueueUrl=queue_url_1)225 assert rs.get("Messages") is None226 def test_create_kinesis_event_source_mapping(227 self,228 create_lambda_function,229 lambda_client,230 kinesis_client,231 kinesis_create_stream,232 lambda_su_role,233 wait_for_stream_ready,234 logs_client,235 ):236 function_name = f"lambda_func-{short_uid()}"237 stream_name = f"test-foobar-{short_uid()}"238 create_lambda_function(239 func_name=function_name,240 handler_file=TEST_LAMBDA_PYTHON_ECHO,241 runtime=LAMBDA_RUNTIME_PYTHON36,242 role=lambda_su_role,243 )244 kinesis_create_stream(StreamName=stream_name, ShardCount=1)245 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246 "StreamARN"247 ]248 # only valid against AWS / new provider (once implemented)249 if not is_old_provider():250 with pytest.raises(ClientError) as e:251 lambda_client.create_event_source_mapping(252 EventSourceArn=stream_arn, FunctionName=function_name253 )254 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255 wait_for_stream_ready(stream_name=stream_name)256 lambda_client.create_event_source_mapping(257 EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258 )259 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261 num_events_kinesis = 10262 kinesis_client.put_records(263 Records=[264 {"Data": "{}", "PartitionKey": f"test_{i}"} for i in range(0, num_events_kinesis)265 ],266 StreamName=stream_name,267 )268 def get_lambda_events():269 events = get_lambda_log_events(function_name, logs_client=logs_client)270 assert events271 return events272 events = retry(get_lambda_events, retries=30)273 assert 10 == len(events[0]["Records"])274 assert "eventID" in events[0]["Records"][0]275 assert "eventSourceARN" in events[0]["Records"][0]276 assert "eventSource" in events[0]["Records"][0]277 assert "eventVersion" in events[0]["Records"][0]278 assert "eventName" in events[0]["Records"][0]279 assert "invokeIdentityArn" in events[0]["Records"][0]280 assert "awsRegion" in events[0]["Records"][0]281 assert "kinesis" in events[0]["Records"][0]282 def test_python_lambda_subscribe_sns_topic(283 self,284 create_lambda_function,285 sns_client,286 lambda_su_role,287 sns_topic,288 logs_client,289 lambda_client,290 ):291 function_name = f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}"292 permission_id = f"test-statement-{short_uid()}"293 lambda_creation_response = create_lambda_function(294 func_name=function_name,295 handler_file=TEST_LAMBDA_PYTHON_ECHO,296 runtime=LAMBDA_RUNTIME_PYTHON36,297 role=lambda_su_role,298 )299 lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"]300 topic_arn = sns_topic["Attributes"]["TopicArn"]301 lambda_client.add_permission(302 FunctionName=function_name,303 StatementId=permission_id,304 Action="lambda:InvokeFunction",305 Principal="sns.amazonaws.com",306 SourceArn=topic_arn,307 )308 sns_client.subscribe(309 TopicArn=topic_arn,310 Protocol="lambda",311 Endpoint=lambda_arn,312 )313 subject = "[Subject] Test subject"314 message = "Hello world."315 sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)316 events = retry(317 check_expected_lambda_log_events_length,318 retries=10,319 sleep=1,320 function_name=function_name,321 expected_length=1,322 regex_filter="Records.*Sns",323 logs_client=logs_client,324 )325 notification = events[0]["Records"][0]["Sns"]326 assert "Subject" in notification327 assert subject == notification["Subject"]328class TestLambdaHttpInvocation:329 def test_http_invocation_with_apigw_proxy(self, create_lambda_function):330 lambda_name = f"test_lambda_{short_uid()}"331 lambda_resource = "/api/v1/{proxy+}"332 lambda_path = "/api/v1/hello/world"333 lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path334 lambda_request_context_resource_path = lambda_resource335 # create lambda function336 create_lambda_function(337 func_name=lambda_name,338 handler_file=TEST_LAMBDA_PYTHON,339 libs=TEST_LAMBDA_LIBS,340 )341 # create API Gateway and connect it to the Lambda proxy backend342 lambda_uri = aws_stack.lambda_function_arn(lambda_name)343 target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"344 result = testutil.connect_api_gateway_to_http_with_lambda_proxy(345 "test_gateway2",346 target_uri,347 path=lambda_resource,348 stage_name=TEST_STAGE_NAME,349 )350 api_id = result["id"]351 url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)352 result = safe_requests.post(353 url, data=b"{}", headers={"User-Agent": "python-requests/testing"}354 )355 content = json.loads(result.content)356 assert lambda_path == content["path"]357 assert lambda_resource == content["resource"]358 assert lambda_request_context_path == content["requestContext"]["path"]359 assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]360class TestKinesisSource:361 @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)362 def test_kinesis_lambda_parallelism(363 self,364 lambda_client,365 kinesis_client,366 create_lambda_function,367 kinesis_create_stream,368 wait_for_stream_ready,369 logs_client,370 lambda_su_role,371 ):372 function_name = f"lambda_func-{short_uid()}"373 stream_name = f"test-foobar-{short_uid()}"374 create_lambda_function(375 handler_file=TEST_LAMBDA_PARALLEL_FILE,376 func_name=function_name,377 runtime=LAMBDA_RUNTIME_PYTHON36,378 role=lambda_su_role,379 )380 kinesis_create_stream(StreamName=stream_name, ShardCount=1)381 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][382 "StreamARN"383 ]384 wait_for_stream_ready(stream_name=stream_name)385 lambda_client.create_event_source_mapping(386 EventSourceArn=stream_arn,387 FunctionName=function_name,388 StartingPosition="TRIM_HORIZON",389 BatchSize=10,390 )391 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)392 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]393 num_events_kinesis = 10394 # assure async call395 start = time.perf_counter()396 kinesis_client.put_records(397 Records=[398 {"Data": '{"batch": 0}', "PartitionKey": f"test_{i}"}399 for i in range(0, num_events_kinesis)400 ],401 StreamName=stream_name,402 )403 assert (time.perf_counter() - start) < 1 # this should not take more than a second404 kinesis_client.put_records(405 Records=[406 {"Data": '{"batch": 1}', "PartitionKey": f"test_{i}"}407 for i in range(0, num_events_kinesis)408 ],409 StreamName=stream_name,410 )411 def get_events():412 events = get_lambda_log_events(413 function_name, regex_filter=r"event.*Records", logs_client=logs_client414 )415 assert len(events) == 2416 return events417 events = retry(get_events, retries=30)418 def assertEvent(event, batch_no):419 assert 10 == len(event["event"]["Records"])420 assert "eventID" in event["event"]["Records"][0]421 assert "eventSourceARN" in event["event"]["Records"][0]422 assert "eventSource" in event["event"]["Records"][0]423 assert "eventVersion" in event["event"]["Records"][0]424 assert "eventName" in event["event"]["Records"][0]425 assert "invokeIdentityArn" in event["event"]["Records"][0]426 assert "awsRegion" in event["event"]["Records"][0]427 assert "kinesis" in event["event"]["Records"][0]428 assert {"batch": batch_no} == json.loads(429 base64.b64decode(event["event"]["Records"][0]["kinesis"]["data"]).decode(430 config.DEFAULT_ENCODING431 )432 )433 assertEvent(events[0], 0)434 assertEvent(events[1], 1)...

Full Screen

Full Screen

test_lambda_size_lims.py

Source:test_lambda_size_lims.py Github

copy

Full Screen

1import os2from io import BytesIO3import pytest4from botocore.exceptions import ClientError5from localstack.services.awslambda.lambda_api import LAMBDA_DEFAULT_HANDLER6from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON377from localstack.utils import testutil8from localstack.utils.common import short_uid9THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))10TEST_LAMBDA_PYTHON_ECHO = os.path.join(THIS_FOLDER, "functions", "lambda_echo.py")11FUNCTION_MAX_UNZIPPED_SIZE = 26214400012def generate_sized_python_str(size):13 """Generate a text file of the specified size."""14 with open(TEST_LAMBDA_PYTHON_ECHO, "r") as f:15 py_str = f.read()16 py_str += "#" * (size - len(py_str))17 return py_str18class TestLambdaSizeLimits:19 @pytest.mark.aws_validated20 def test_oversized_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role):21 function_name = f"test_lambda_{short_uid()}"22 bucket_key = "test_lambda.zip"23 code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE)24 # upload zip file to S325 zip_file = testutil.create_lambda_archive(26 code_str, get_content=True, runtime=LAMBDA_RUNTIME_PYTHON3727 )28 s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)29 # create lambda function30 with pytest.raises(ClientError) as e:31 lambda_client.create_function(32 FunctionName=function_name,33 Runtime=LAMBDA_RUNTIME_PYTHON37,34 Handler=LAMBDA_DEFAULT_HANDLER,35 Role=lambda_su_role,36 Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},37 Timeout=10,38 )39 assert e40 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 40041 e.match(42 r"An error occurred \(InvalidParameterValueException\) when calling the CreateFunction operation\: Unzipped size must be smaller than [0-9]* bytes"43 )44 @pytest.mark.aws_validated45 def test_large_lambda(self, lambda_client, s3_client, s3_bucket, lambda_su_role):46 function_name = f"test_lambda_{short_uid()}"47 bucket_key = "test_lambda.zip"48 code_str = generate_sized_python_str(FUNCTION_MAX_UNZIPPED_SIZE - 1000)49 # upload zip file to S350 zip_file = testutil.create_lambda_archive(51 code_str, get_content=True, runtime=LAMBDA_RUNTIME_PYTHON3752 )53 s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)54 # create lambda function55 try:56 result = lambda_client.create_function(57 FunctionName=function_name,58 Runtime=LAMBDA_RUNTIME_PYTHON37,59 Handler=LAMBDA_DEFAULT_HANDLER,60 Role=lambda_su_role,61 Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},62 Timeout=10,63 )64 function_arn = result["FunctionArn"]65 assert testutil.response_arn_matches_partition(lambda_client, function_arn)66 finally:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful