How to use verify_failure_received method in localstack

Best Python code snippet using localstack_python

test_lambda_integration.py

Source:test_lambda_integration.py Github

copy

Full Screen

...360 )361 event_source_mapping_uuid = result["UUID"]362 _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)363 dynamodb_client.put_item(TableName=table_name, Item=item)364 def verify_failure_received():365 res = sqs_client.receive_message(QueueUrl=destination_queue)366 msg = res["Messages"][0]367 body = json.loads(msg["Body"])368 assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"369 assert body["DDBStreamBatchInfo"]["batchSize"] == 1370 assert body["DDBStreamBatchInfo"]["streamArn"] in stream_arn371 retry(verify_failure_received, retries=5, sleep=5, sleep_before=5)372 finally:373 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)374class TestLambdaHttpInvocation:375 def test_http_invocation_with_apigw_proxy(self, create_lambda_function):376 lambda_name = f"test_lambda_{short_uid()}"377 lambda_resource = "/api/v1/{proxy+}"378 lambda_path = "/api/v1/hello/world"379 lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path380 lambda_request_context_resource_path = lambda_resource381 create_lambda_function(382 func_name=lambda_name,383 handler_file=TEST_LAMBDA_PYTHON,384 libs=TEST_LAMBDA_LIBS,385 )386 # create API Gateway and connect it to the Lambda proxy backend387 lambda_uri = aws_stack.lambda_function_arn(lambda_name)388 target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"389 result = testutil.connect_api_gateway_to_http_with_lambda_proxy(390 "test_gateway2",391 target_uri,392 path=lambda_resource,393 stage_name=TEST_STAGE_NAME,394 )395 api_id = result["id"]396 url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)397 result = safe_requests.post(398 url, data=b"{}", headers={"User-Agent": "python-requests/testing"}399 )400 content = json.loads(result.content)401 assert lambda_path == content["path"]402 assert lambda_resource == content["resource"]403 assert lambda_request_context_path == content["requestContext"]["path"]404 assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]405class TestKinesisSource:406 def test_create_kinesis_event_source_mapping(407 self,408 create_lambda_function,409 lambda_client,410 kinesis_client,411 kinesis_create_stream,412 lambda_su_role,413 wait_for_stream_ready,414 logs_client,415 ):416 function_name = f"lambda_func-{short_uid()}"417 stream_name = f"test-foobar-{short_uid()}"418 record_data = "hello"419 num_events_kinesis = 10420 try:421 create_lambda_function(422 func_name=function_name,423 handler_file=TEST_LAMBDA_PYTHON_ECHO,424 runtime=LAMBDA_RUNTIME_PYTHON36,425 role=lambda_su_role,426 )427 kinesis_create_stream(StreamName=stream_name, ShardCount=1)428 wait_for_stream_ready(stream_name=stream_name)429 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)430 assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1431 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[432 "StreamDescription"433 ]["StreamARN"]434 uuid = lambda_client.create_event_source_mapping(435 EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="LATEST"436 )["UUID"]437 _await_event_source_mapping_enabled(lambda_client, uuid)438 kinesis_client.put_records(439 Records=[440 {"Data": record_data, "PartitionKey": f"test_{i}"}441 for i in range(0, num_events_kinesis)442 ],443 StreamName=stream_name,444 )445 events = _get_lambda_invocation_events(446 logs_client, function_name, expected_num_events=1447 )448 records = events[0]["Records"]449 assert len(records) == num_events_kinesis450 for record in records:451 assert "eventID" in record452 assert "eventSourceARN" in record453 assert "eventSource" in record454 assert "eventVersion" in record455 assert "eventName" in record456 assert "invokeIdentityArn" in record457 assert "awsRegion" in record458 assert "kinesis" in record459 actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")460 assert actual_record_data == record_data461 finally:462 lambda_client.delete_event_source_mapping(UUID=uuid)463 @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)464 def test_kinesis_event_source_mapping_with_async_invocation(465 self,466 lambda_client,467 kinesis_client,468 create_lambda_function,469 kinesis_create_stream,470 wait_for_stream_ready,471 logs_client,472 lambda_su_role,473 ):474 # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!475 # apparently this particular configuration prevents lambda logs from being extracted properly, giving the476 # appearance that the function was never invoked.477 try:478 function_name = f"lambda_func-{short_uid()}"479 stream_name = f"test-foobar-{short_uid()}"480 num_records_per_batch = 10481 num_batches = 2482 create_lambda_function(483 handler_file=TEST_LAMBDA_PARALLEL_FILE,484 func_name=function_name,485 runtime=LAMBDA_RUNTIME_PYTHON36,486 role=lambda_su_role,487 )488 kinesis_create_stream(StreamName=stream_name, ShardCount=1)489 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[490 "StreamDescription"491 ]["StreamARN"]492 wait_for_stream_ready(stream_name=stream_name)493 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)494 assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1495 uuid = lambda_client.create_event_source_mapping(496 EventSourceArn=stream_arn,497 FunctionName=function_name,498 StartingPosition="LATEST",499 BatchSize=num_records_per_batch,500 )["UUID"]501 _await_event_source_mapping_enabled(lambda_client, uuid)502 for i in range(num_batches):503 start = time.perf_counter()504 kinesis_client.put_records(505 Records=[506 {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}507 for j in range(0, num_records_per_batch)508 ],509 StreamName=stream_name,510 )511 assert (time.perf_counter() - start) < 1 # this should not take more than a second512 invocation_events = _get_lambda_invocation_events(513 logs_client, function_name, expected_num_events=num_batches514 )515 for i in range(num_batches):516 event = invocation_events[i]517 assert len(event["event"]["Records"]) == num_records_per_batch518 actual_record_ids = []519 for record in event["event"]["Records"]:520 assert "eventID" in record521 assert "eventSourceARN" in record522 assert "eventSource" in record523 assert "eventVersion" in record524 assert "eventName" in record525 assert "invokeIdentityArn" in record526 assert "awsRegion" in record527 assert "kinesis" in record528 record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")529 actual_record_id = json.loads(record_data)["record_id"]530 actual_record_ids.append(actual_record_id)531 actual_record_ids.sort()532 assert actual_record_ids == [i for i in range(num_records_per_batch)]533 assert (534 invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]535 ) > 5536 finally:537 lambda_client.delete_event_source_mapping(UUID=uuid)538 def test_kinesis_event_source_trim_horizon(539 self,540 lambda_client,541 kinesis_client,542 create_lambda_function,543 kinesis_create_stream,544 wait_for_stream_ready,545 logs_client,546 lambda_su_role,547 ):548 function_name = f"lambda_func-{short_uid()}"549 stream_name = f"test-foobar-{short_uid()}"550 num_records_per_batch = 10551 num_batches = 3552 try:553 create_lambda_function(554 handler_file=TEST_LAMBDA_PARALLEL_FILE,555 func_name=function_name,556 runtime=LAMBDA_RUNTIME_PYTHON36,557 role=lambda_su_role,558 )559 kinesis_create_stream(StreamName=stream_name, ShardCount=1)560 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[561 "StreamDescription"562 ]["StreamARN"]563 wait_for_stream_ready(stream_name=stream_name)564 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)565 assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1566 # insert some records before event source mapping created567 for i in range(num_batches - 1):568 kinesis_client.put_records(569 Records=[570 {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}571 for j in range(0, num_records_per_batch)572 ],573 StreamName=stream_name,574 )575 uuid = lambda_client.create_event_source_mapping(576 EventSourceArn=stream_arn,577 FunctionName=function_name,578 StartingPosition="TRIM_HORIZON",579 BatchSize=num_records_per_batch,580 )["UUID"]581 # insert some more records582 kinesis_client.put_records(583 Records=[584 {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}585 for i in range(0, num_records_per_batch)586 ],587 StreamName=stream_name,588 )589 invocation_events = _get_lambda_invocation_events(590 logs_client, function_name, expected_num_events=num_batches591 )592 for i in range(num_batches):593 event = invocation_events[i]594 assert len(event["event"]["Records"]) == num_records_per_batch595 actual_record_ids = []596 for record in event["event"]["Records"]:597 assert "eventID" in record598 assert "eventSourceARN" in record599 assert "eventSource" in record600 assert "eventVersion" in record601 assert "eventName" in record602 assert "invokeIdentityArn" in record603 assert "awsRegion" in record604 assert "kinesis" in record605 record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")606 actual_record_id = json.loads(record_data)["record_id"]607 actual_record_ids.append(actual_record_id)608 actual_record_ids.sort()609 assert actual_record_ids == [i for i in range(num_records_per_batch)]610 finally:611 lambda_client.delete_event_source_mapping(UUID=uuid)612 def test_disable_kinesis_event_source_mapping(613 self,614 lambda_client,615 kinesis_client,616 create_lambda_function,617 kinesis_create_stream,618 wait_for_stream_ready,619 logs_client,620 lambda_su_role,621 ):622 function_name = f"lambda_func-{short_uid()}"623 stream_name = f"test-foobar-{short_uid()}"624 num_records_per_batch = 10625 try:626 create_lambda_function(627 handler_file=TEST_LAMBDA_PYTHON_ECHO,628 func_name=function_name,629 runtime=LAMBDA_RUNTIME_PYTHON36,630 role=lambda_su_role,631 )632 kinesis_create_stream(StreamName=stream_name, ShardCount=1)633 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[634 "StreamDescription"635 ]["StreamARN"]636 wait_for_stream_ready(stream_name=stream_name)637 event_source_uuid = lambda_client.create_event_source_mapping(638 EventSourceArn=stream_arn,639 FunctionName=function_name,640 StartingPosition="LATEST",641 BatchSize=num_records_per_batch,642 )["UUID"]643 _await_event_source_mapping_enabled(lambda_client, event_source_uuid)644 kinesis_client.put_records(645 Records=[646 {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}647 for i in range(0, num_records_per_batch)648 ],649 StreamName=stream_name,650 )651 events = _get_lambda_invocation_events(652 logs_client, function_name, expected_num_events=1653 )654 assert len(events) == 1655 lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)656 time.sleep(2)657 kinesis_client.put_records(658 Records=[659 {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}660 for i in range(0, num_records_per_batch)661 ],662 StreamName=stream_name,663 )664 time.sleep(7) # wait for records to pass through stream665 # should still only get the first batch from before mapping was disabled666 events = _get_lambda_invocation_events(667 logs_client, function_name, expected_num_events=1668 )669 assert len(events) == 1670 finally:671 lambda_client.delete_event_source_mapping(UUID=event_source_uuid)672 def test_kinesis_event_source_mapping_with_on_failure_destination_config(673 self,674 lambda_client,675 create_lambda_function,676 sqs_client,677 sqs_queue_arn,678 sqs_create_queue,679 create_iam_role_with_policy,680 kinesis_client,681 wait_for_stream_ready,682 ):683 try:684 function_name = f"lambda_func-{short_uid()}"685 role = f"test-lambda-role-{short_uid()}"686 policy_name = f"test-lambda-policy-{short_uid()}"687 kinesis_name = f"test-kinesis-{short_uid()}"688 role_arn = create_iam_role_with_policy(689 RoleName=role,690 PolicyName=policy_name,691 RoleDefinition=lambda_role,692 PolicyDefinition=s3_lambda_permission,693 )694 create_lambda_function(695 handler_file=TEST_LAMBDA_PYTHON,696 func_name=function_name,697 runtime=LAMBDA_RUNTIME_PYTHON37,698 role=role_arn,699 )700 kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)701 result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]702 kinesis_arn = result["StreamARN"]703 wait_for_stream_ready(stream_name=kinesis_name)704 queue_event_source_mapping = sqs_create_queue()705 destination_queue = sqs_queue_arn(queue_event_source_mapping)706 destination_config = {"OnFailure": {"Destination": destination_queue}}707 message = {708 "input": "hello",709 "value": "world",710 lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,711 }712 result = lambda_client.create_event_source_mapping(713 FunctionName=function_name,714 BatchSize=1,715 StartingPosition="LATEST",716 EventSourceArn=kinesis_arn,717 MaximumBatchingWindowInSeconds=1,718 MaximumRetryAttempts=1,719 DestinationConfig=destination_config,720 )721 event_source_mapping_uuid = result["UUID"]722 _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)723 kinesis_client.put_record(724 StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"725 )726 def verify_failure_received():727 result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)728 msg = result["Messages"][0]729 body = json.loads(msg["Body"])730 assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"731 assert body["KinesisBatchInfo"]["batchSize"] == 1732 assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn733 retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)734 finally:735 kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)736 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)737def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):738 def assert_mapping_enabled():739 assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"740 retry(assert_mapping_enabled, sleep_before=2, retries=retries)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful