How to use the _get_lambda_invocation_events method in LocalStack
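_get_lambda_invocation_events is a private helper used in LocalStack's Lambda integration tests. It polls a function's CloudWatch Logs (via LocalStack's test utilities) until the expected number of invocation events has been logged, retrying for a bounded number of attempts, and then returns the parsed events so the test can assert on their contents. A minimal sketch of the idea is shown here; only the helper's signature and body are visible in the first snippet below, so the import paths and retry timing are assumptions:

# Sketch only, not the exact LocalStack implementation.
from localstack.utils.sync import retry                      # assumed import path
from localstack.utils.testutil import get_lambda_log_events  # assumed import path


def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):
    def get_events():
        # Fetch the invocation payloads logged by the function; the assertion makes
        # retry() try again until the expected number of invocations has appeared.
        events = get_lambda_log_events(function_name, logs_client=logs_client)
        assert len(events) == expected_num_events
        return events

    # sleep_before is an assumption; the snippet only shows retries=30 as the default.
    return retry(get_events, retries=retries, sleep_before=2)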

Best Python code snippet using localstack_python

test_lambda_integration.py

Source: test_lambda_integration.py (GitHub)


    ...
                    for i in range(0, num_events_kinesis)
                ],
                StreamName=stream_name,
            )
            events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1
            )
            records = events[0]["Records"]
            assert len(records) == num_events_kinesis
            for record in records:
                assert "eventID" in record
                assert "eventSourceARN" in record
                assert "eventSource" in record
                assert "eventVersion" in record
                assert "eventName" in record
                assert "invokeIdentityArn" in record
                assert "awsRegion" in record
                assert "kinesis" in record
                actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
                assert actual_record_data == record_data
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)
    def test_kinesis_event_source_mapping_with_async_invocation(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
    ):
        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!
        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the
        # appearance that the function was never invoked.
        try:
            function_name = f"lambda_func-{short_uid()}"
            stream_name = f"test-foobar-{short_uid()}"
            num_records_per_batch = 10
            num_batches = 2
            create_lambda_function(
                handler_file=TEST_LAMBDA_PARALLEL_FILE,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            wait_for_stream_ready(stream_name=stream_name)
            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
            uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn,
                FunctionName=function_name,
                StartingPosition="LATEST",
                BatchSize=num_records_per_batch,
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, uuid)
            for i in range(num_batches):
                start = time.perf_counter()
                kinesis_client.put_records(
                    Records=[
                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}
                        for j in range(0, num_records_per_batch)
                    ],
                    StreamName=stream_name,
                )
                assert (time.perf_counter() - start) < 1  # this should not take more than a second
            invocation_events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=num_batches
            )
            for i in range(num_batches):
                event = invocation_events[i]
                assert len(event["event"]["Records"]) == num_records_per_batch
                actual_record_ids = []
                for record in event["event"]["Records"]:
                    assert "eventID" in record
                    assert "eventSourceARN" in record
                    assert "eventSource" in record
                    assert "eventVersion" in record
                    assert "eventName" in record
                    assert "invokeIdentityArn" in record
                    assert "awsRegion" in record
                    assert "kinesis" in record
                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
                    actual_record_id = json.loads(record_data)["record_id"]
                    actual_record_ids.append(actual_record_id)
                actual_record_ids.sort()
                assert actual_record_ids == [i for i in range(num_records_per_batch)]
            assert (
                invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]
            ) > 5
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    def test_kinesis_event_source_trim_horizon(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        num_batches = 3
        try:
            create_lambda_function(
                handler_file=TEST_LAMBDA_PARALLEL_FILE,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            wait_for_stream_ready(stream_name=stream_name)
            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
            # insert some records before event source mapping created
            for i in range(num_batches - 1):
                kinesis_client.put_records(
                    Records=[
                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}
                        for j in range(0, num_records_per_batch)
                    ],
                    StreamName=stream_name,
                )
            uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn,
                FunctionName=function_name,
                StartingPosition="TRIM_HORIZON",
                BatchSize=num_records_per_batch,
            )["UUID"]
            # insert some more records
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            invocation_events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=num_batches
            )
            for i in range(num_batches):
                event = invocation_events[i]
                assert len(event["event"]["Records"]) == num_records_per_batch
                actual_record_ids = []
                for record in event["event"]["Records"]:
                    assert "eventID" in record
                    assert "eventSourceARN" in record
                    assert "eventSource" in record
                    assert "eventVersion" in record
                    assert "eventName" in record
                    assert "invokeIdentityArn" in record
                    assert "awsRegion" in record
                    assert "kinesis" in record
                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
                    actual_record_id = json.loads(record_data)["record_id"]
                    actual_record_ids.append(actual_record_id)
                actual_record_ids.sort()
                assert actual_record_ids == [i for i in range(num_records_per_batch)]
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    def test_disable_kinesis_event_source_mapping(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        try:
            create_lambda_function(
                handler_file=TEST_LAMBDA_PYTHON_ECHO,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            wait_for_stream_ready(stream_name=stream_name)
            event_source_uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn,
                FunctionName=function_name,
                StartingPosition="LATEST",
                BatchSize=num_records_per_batch,
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1
            )
            assert len(events) == 1
            lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)
            time.sleep(2)
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            time.sleep(7)  # wait for records to pass through stream
            # should still only get the first batch from before mapping was disabled
            events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1
            )
            assert len(events) == 1
        finally:
            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)

    def test_kinesis_event_source_mapping_with_on_failure_destination_config(
        self,
        lambda_client,
        create_lambda_function,
        sqs_client,
        sqs_queue_arn,
        sqs_create_queue,
        create_iam_role_with_policy,
        kinesis_client,
        wait_for_stream_ready,
    ):
        try:
            function_name = f"lambda_func-{short_uid()}"
            role = f"test-lambda-role-{short_uid()}"
            policy_name = f"test-lambda-policy-{short_uid()}"
            kinesis_name = f"test-kinesis-{short_uid()}"
            role_arn = create_iam_role_with_policy(
                RoleName=role,
                PolicyName=policy_name,
                RoleDefinition=lambda_role,
                PolicyDefinition=s3_lambda_permission,
            )
            create_lambda_function(
                handler_file=TEST_LAMBDA_PYTHON,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON37,
                role=role_arn,
            )
            kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)
            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]
            kinesis_arn = result["StreamARN"]
            wait_for_stream_ready(stream_name=kinesis_name)
            queue_event_source_mapping = sqs_create_queue()
            destination_queue = sqs_queue_arn(queue_event_source_mapping)
            destination_config = {"OnFailure": {"Destination": destination_queue}}
            message = {
                "input": "hello",
                "value": "world",
                lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,
            }
            result = lambda_client.create_event_source_mapping(
                FunctionName=function_name,
                BatchSize=1,
                StartingPosition="LATEST",
                EventSourceArn=kinesis_arn,
                MaximumBatchingWindowInSeconds=1,
                MaximumRetryAttempts=1,
                DestinationConfig=destination_config,
            )
            event_source_mapping_uuid = result["UUID"]
            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)
            kinesis_client.put_record(
                StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"
            )

            def verify_failure_received():
                result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)
                msg = result["Messages"][0]
                body = json.loads(msg["Body"])
                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"
                assert body["KinesisBatchInfo"]["batchSize"] == 1
                assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn

            retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)
        finally:
            kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)
            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)


def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):
    def assert_mapping_enabled():
        assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"

    retry(assert_mapping_enabled, sleep_before=2, retries=retries)


def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):
    def assert_table_active():
        assert (
            dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"
        )

    retry(assert_table_active, retries=retries, sleep_before=2)


def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):
    def get_events():
        events = get_lambda_log_events(function_name, logs_client=logs_client)
        assert len(events) == expected_num_events
        return events
    ...
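The pattern in these tests is consistent: push records to the event source, then call _get_lambda_invocation_events to block until the expected number of invocations shows up in the function's logs. A condensed, hypothetical illustration of that call pattern follows; stream_name, function_name, and the pytest fixtures kinesis_client and logs_client are assumed to be set up as in the snippet above:

# Hypothetical condensed test body, not taken verbatim from the test suite.
kinesis_client.put_records(
    Records=[
        {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
        for i in range(10)
    ],
    StreamName=stream_name,
)

# Blocks (retrying internally) until exactly one invocation event is found in the
# function's CloudWatch logs, then returns the logged event payloads.
events = _get_lambda_invocation_events(logs_client, function_name, expected_num_events=1)
assert len(events[0]["Records"]) == 10  # the echo handler logs the raw Kinesis batch

Because the helper asserts on the event count inside a retry loop, a missing invocation surfaces as a retry timeout rather than an immediate, flaky failure.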


test_lambda_integration_kinesis.py

Source: test_lambda_integration_kinesis.py (GitHub)


    ...
                    for i in range(0, num_events_kinesis)
                ],
                StreamName=stream_name,
            )
            return _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1, retries=5
            )

        # need to retry here in case the LATEST StartingPosition of the event source mapping does not catch records
        events = retry(_send_and_receive_messages, retries=3)
        records = events[0]
        snapshot.match("kinesis_records", records)

    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)
    @pytest.mark.aws_validated
    def test_kinesis_event_source_mapping_with_async_invocation(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
        cleanups,
        snapshot,
    ):
        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!
        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the
        # appearance that the function was never invoked.
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        num_batches = 2
        create_lambda_function(
            handler_file=TEST_LAMBDA_PARALLEL_FILE,
            func_name=function_name,
            runtime=LAMBDA_RUNTIME_PYTHON39,
            role=lambda_su_role,
        )
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        wait_for_stream_ready(stream_name=stream_name)
        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
        assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
        create_event_source_mapping_response = lambda_client.create_event_source_mapping(
            EventSourceArn=stream_arn,
            FunctionName=function_name,
            StartingPosition="LATEST",
            BatchSize=num_records_per_batch,
        )
        snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)
        uuid = create_event_source_mapping_response["UUID"]
        cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=uuid))
        _await_event_source_mapping_enabled(lambda_client, uuid)

        def _send_and_receive_messages():
            for i in range(num_batches):
                start = time.perf_counter()
                kinesis_client.put_records(
                    Records=[
                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}
                        for j in range(0, num_records_per_batch)
                    ],
                    StreamName=stream_name,
                )
                assert (time.perf_counter() - start) < 1  # this should not take more than a second
            return _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=num_batches, retries=5
            )

        # need to retry here in case the LATEST StartingPosition of the event source mapping does not catch records
        invocation_events = retry(_send_and_receive_messages, retries=3)
        snapshot.match("invocation_events", invocation_events)
        assert (invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]) > 5

    @pytest.mark.aws_validated
    def test_kinesis_event_source_trim_horizon(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
        cleanups,
        snapshot,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        num_batches = 3
        create_lambda_function(
            handler_file=TEST_LAMBDA_PARALLEL_FILE,
            func_name=function_name,
            runtime=LAMBDA_RUNTIME_PYTHON39,
            role=lambda_su_role,
        )
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        wait_for_stream_ready(stream_name=stream_name)
        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
        assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
        # insert some records before event source mapping created
        for i in range(num_batches - 1):
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}
                    for j in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
        create_event_source_mapping_response = lambda_client.create_event_source_mapping(
            EventSourceArn=stream_arn,
            FunctionName=function_name,
            StartingPosition="TRIM_HORIZON",
            BatchSize=num_records_per_batch,
        )
        snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)
        uuid = create_event_source_mapping_response["UUID"]
        cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=uuid))
        # insert some more records
        kinesis_client.put_records(
            Records=[
                {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}
                for i in range(0, num_records_per_batch)
            ],
            StreamName=stream_name,
        )
        invocation_events = _get_lambda_invocation_events(
            logs_client, function_name, expected_num_events=num_batches
        )
        snapshot.match("invocation_events", invocation_events)

    @pytest.mark.aws_validated
    def test_disable_kinesis_event_source_mapping(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
        cleanups,
        snapshot,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        create_lambda_function(
            handler_file=TEST_LAMBDA_PYTHON_ECHO,
            func_name=function_name,
            runtime=LAMBDA_RUNTIME_PYTHON39,
            role=lambda_su_role,
        )
        kinesis_create_stream(StreamName=stream_name, ShardCount=1)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        wait_for_stream_ready(stream_name=stream_name)
        create_event_source_mapping_response = lambda_client.create_event_source_mapping(
            EventSourceArn=stream_arn,
            FunctionName=function_name,
            StartingPosition="LATEST",
            BatchSize=num_records_per_batch,
        )
        snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)
        event_source_uuid = create_event_source_mapping_response["UUID"]
        cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=event_source_uuid))
        _await_event_source_mapping_enabled(lambda_client, event_source_uuid)

        def _send_and_receive_messages():
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            return _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1, retries=10
            )

        invocation_events = retry(_send_and_receive_messages, retries=3)
        snapshot.match("invocation_events", invocation_events)
        lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)
        _await_event_source_mapping_state(lambda_client, event_source_uuid, state="Disabled")
        # we need to wait here, so the event source mapping is for sure disabled, sadly the state is no real indication
        if is_aws_cloud():
            time.sleep(60)
        kinesis_client.put_records(
            Records=[
                {"Data": json.dumps({"record_id_disabled": i}), "PartitionKey": "test"}
                for i in range(0, num_records_per_batch)
            ],
            StreamName=stream_name,
        )
        time.sleep(7)  # wait for records to pass through stream
        # should still only get the first batch from before mapping was disabled
        _get_lambda_invocation_events(logs_client, function_name, expected_num_events=1, retries=10)

    @pytest.mark.skip_snapshot_verify(
        paths=[
            "$..Messages..Body.KinesisBatchInfo.approximateArrivalOfFirstRecord",
            "$..Messages..Body.KinesisBatchInfo.approximateArrivalOfLastRecord",
            "$..Messages..Body.KinesisBatchInfo.shardId",
            "$..Messages..Body.KinesisBatchInfo.streamArn",
            "$..Messages..Body.requestContext.approximateInvokeCount",
            "$..Messages..Body.requestContext.functionArn",
            "$..Messages..Body.responseContext.statusCode",
            # destination config arn missing, which leads to those having wrong resource ids
            "$..EventSourceArn",
            "$..FunctionArn",
        ],
        condition=is_old_provider,
    ...
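Note how the newer tests in test_lambda_integration_kinesis.py wrap the send-and-verify step in LocalStack's retry() helper: with StartingPosition="LATEST" the event source mapping may not pick up records that were written before its first poll, so the whole step is repeated. A sketch of that pattern (variable names mirror the snippet above; the retry counts are taken from it):

# Outer-retry pattern used with StartingPosition="LATEST".
def _send_and_receive_messages():
    # Re-sent on every outer attempt, so records written before the mapping
    # started polling are simply produced again.
    kinesis_client.put_records(
        Records=[
            {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
            for i in range(num_records_per_batch)
        ],
        StreamName=stream_name,
    )
    # Small inner retry budget so a missed batch fails fast and triggers the outer retry.
    return _get_lambda_invocation_events(
        logs_client, function_name, expected_num_events=1, retries=5
    )

invocation_events = retry(_send_and_receive_messages, retries=3)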


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

