How to use the _await_event_source_mapping_enabled method in LocalStack

Best Python code snippet using localstack_python

test_lambda_integration.py

Source: test_lambda_integration.py (GitHub)

...
                EventSourceArn=queue_arn_1, FunctionName=function_name
            )
            uuid = rs["UUID"]
            assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]
            _await_event_source_mapping_enabled(lambda_client, uuid)
            with pytest.raises(ClientError) as e:
                # Update batch size with invalid value
                lambda_client.update_event_source_mapping(
                    UUID=uuid,
                    FunctionName=function_name,
                    BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,
                )
            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)
            queue_url_2 = sqs_create_queue(QueueName=queue_name_2)
            queue_arn_2 = sqs_queue_arn(queue_url_2)
            with pytest.raises(ClientError) as e:
                # Create event source mapping with invalid batch size value
                lambda_client.create_event_source_mapping(
                    EventSourceArn=queue_arn_2,
                    FunctionName=function_name,
                    BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,
                )
            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    def test_sqs_event_source_mapping(
        self,
        create_lambda_function,
        lambda_client,
        sqs_client,
        sqs_create_queue,
        sqs_queue_arn,
        logs_client,
        lambda_su_role,
    ):
        function_name = f"lambda_func-{short_uid()}"
        queue_name_1 = f"queue-{short_uid()}-1"
        try:
            create_lambda_function(
                func_name=function_name,
                handler_file=TEST_LAMBDA_PYTHON_ECHO,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            queue_url_1 = sqs_create_queue(QueueName=queue_name_1)
            queue_arn_1 = sqs_queue_arn(queue_url_1)
            mapping_uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=queue_arn_1,
                FunctionName=function_name,
                MaximumBatchingWindowInSeconds=1,
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, mapping_uuid)
            sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))
            retry(
                check_expected_lambda_log_events_length,
                retries=10,
                sleep=1,
                function_name=function_name,
                expected_length=1,
                logs_client=logs_client,
            )
            rs = sqs_client.receive_message(QueueUrl=queue_url_1)
            assert rs.get("Messages") is None
        finally:
            lambda_client.delete_event_source_mapping(UUID=mapping_uuid)


class TestDynamoDBEventSourceMapping:
    def test_dynamodb_event_source_mapping(
        self,
        lambda_client,
        create_lambda_function,
        create_iam_role_with_policy,
        dynamodb_client,
        dynamodb_create_table,
        logs_client,
        check_lambda_logs,
    ):
        def check_logs():
            expected = [
                r'.*"Records":.*',
                r'.*"dynamodb": {(.*)}.*',
                r'.*"eventSource": ("aws:dynamodb").*',
                r'.*"eventName": ("INSERT").*',
                r'.*"Keys": {0}.*'.format(json.dumps(db_item)),
            ]
            check_lambda_logs(function_name, expected_lines=expected)

        function_name = f"lambda_func-{short_uid()}"
        role = f"test-lambda-role-{short_uid()}"
        policy_name = f"test-lambda-policy-{short_uid()}"
        table_name = f"test-table-{short_uid()}"
        partition_key = "my_partition_key"
        db_item = {partition_key: {"S": "hello world"}}
        try:
            role_arn = create_iam_role_with_policy(
                RoleName=role,
                PolicyName=policy_name,
                RoleDefinition=lambda_role,
                PolicyDefinition=s3_lambda_permission,
            )
            create_lambda_function(
                handler_file=TEST_LAMBDA_PYTHON_ECHO,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON37,
                role=role_arn,
            )
            dynamodb_create_table(table_name=table_name, partition_key=partition_key)
            _await_dynamodb_table_active(dynamodb_client, table_name)
            stream_arn = dynamodb_client.update_table(
                TableName=table_name,
                StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},
            )["TableDescription"]["LatestStreamArn"]
            event_source_uuid = lambda_client.create_event_source_mapping(
                FunctionName=function_name,
                BatchSize=1,
                StartingPosition="LATEST",
                EventSourceArn=stream_arn,
                MaximumBatchingWindowInSeconds=1,
                MaximumRetryAttempts=1,
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)
            dynamodb_client.put_item(TableName=table_name, Item=db_item)
            retry(check_logs, retries=50, sleep=2)
        finally:
            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)

    def test_disabled_dynamodb_event_source_mapping(
        self,
        create_lambda_function,
        lambda_client,
        dynamodb_resource,
        dynamodb_client,
        dynamodb_create_table,
        logs_client,
        dynamodbstreams_client,
        lambda_su_role,
    ):
        def is_stream_enabled():
            return (
                dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[
                    "StreamDescription"
                ]["StreamStatus"]
                == "ENABLED"
            )

        function_name = f"lambda_func-{short_uid()}"
        ddb_table = f"ddb_table-{short_uid()}"
        items = [
            {"id": short_uid(), "data": "data1"},
            {"id": short_uid(), "data": "data2"},
        ]
        try:
            create_lambda_function(
                func_name=function_name,
                handler_file=TEST_LAMBDA_PYTHON_ECHO,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            latest_stream_arn = dynamodb_create_table(
                table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"
            )["TableDescription"]["LatestStreamArn"]
            rs = lambda_client.create_event_source_mapping(
                FunctionName=function_name,
                EventSourceArn=latest_stream_arn,
                StartingPosition="TRIM_HORIZON",
                MaximumBatchingWindowInSeconds=1,
            )
            uuid = rs["UUID"]
            _await_event_source_mapping_enabled(lambda_client, uuid)
            assert poll_condition(is_stream_enabled, timeout=30)
            table = dynamodb_resource.Table(ddb_table)
            table.put_item(Item=items[0])
            # Lambda should be invoked 1 time
            retry(
                check_expected_lambda_log_events_length,
                retries=10,
                sleep=3,
                function_name=function_name,
                expected_length=1,
                logs_client=logs_client,
            )
            # disable event source mapping
            lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)
            time.sleep(2)
            table.put_item(Item=items[1])
            # lambda no longer invoked, still have 1 event
            check_expected_lambda_log_events_length(
                expected_length=1, function_name=function_name, logs_client=logs_client
            )
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    # TODO invalid test against AWS, this behavior just is not correct
    def test_deletion_event_source_mapping_with_dynamodb(
        self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role
    ):
        function_name = f"lambda_func-{short_uid()}"
        ddb_table = f"ddb_table-{short_uid()}"
        create_lambda_function(
            func_name=function_name,
            handler_file=TEST_LAMBDA_PYTHON_ECHO,
            runtime=LAMBDA_RUNTIME_PYTHON36,
            role=lambda_su_role,
        )
        latest_stream_arn = aws_stack.create_dynamodb_table(
            table_name=ddb_table,
            partition_key="id",
            client=dynamodb_client,
            stream_view_type="NEW_IMAGE",
        )["TableDescription"]["LatestStreamArn"]
        lambda_client.create_event_source_mapping(
            FunctionName=function_name,
            EventSourceArn=latest_stream_arn,
            StartingPosition="TRIM_HORIZON",
        )
        _await_dynamodb_table_active(dynamodb_client, ddb_table)
        dynamodb_client.delete_table(TableName=ddb_table)
        result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)
        assert 1 == len(result["EventSourceMappings"])

    def test_dynamodb_event_source_mapping_with_on_failure_destination_config(
        self,
        lambda_client,
        create_lambda_function,
        sqs_client,
        sqs_queue_arn,
        sqs_create_queue,
        create_iam_role_with_policy,
        dynamodb_client,
        dynamodb_create_table,
    ):
        function_name = f"lambda_func-{short_uid()}"
        role = f"test-lambda-role-{short_uid()}"
        policy_name = f"test-lambda-policy-{short_uid()}"
        table_name = f"test-table-{short_uid()}"
        partition_key = "my_partition_key"
        item = {partition_key: {"S": "hello world"}}
        try:
            role_arn = create_iam_role_with_policy(
                RoleName=role,
                PolicyName=policy_name,
                RoleDefinition=lambda_role,
                PolicyDefinition=s3_lambda_permission,
            )
            create_lambda_function(
                handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON37,
                role=role_arn,
            )
            dynamodb_create_table(table_name=table_name, partition_key=partition_key)
            _await_dynamodb_table_active(dynamodb_client, table_name)
            result = dynamodb_client.update_table(
                TableName=table_name,
                StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},
            )
            stream_arn = result["TableDescription"]["LatestStreamArn"]
            destination_queue = sqs_create_queue()
            queue_failure_event_source_mapping_arn = sqs_queue_arn(destination_queue)
            destination_config = {
                "OnFailure": {"Destination": queue_failure_event_source_mapping_arn}
            }
            result = lambda_client.create_event_source_mapping(
                FunctionName=function_name,
                BatchSize=1,
                StartingPosition="LATEST",
                EventSourceArn=stream_arn,
                MaximumBatchingWindowInSeconds=1,
                MaximumRetryAttempts=1,
                DestinationConfig=destination_config,
            )
            event_source_mapping_uuid = result["UUID"]
            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)
            dynamodb_client.put_item(TableName=table_name, Item=item)

            def verify_failure_received():
                res = sqs_client.receive_message(QueueUrl=destination_queue)
                msg = res["Messages"][0]
                body = json.loads(msg["Body"])
                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"
                assert body["DDBStreamBatchInfo"]["batchSize"] == 1
                assert body["DDBStreamBatchInfo"]["streamArn"] in stream_arn

            retry(verify_failure_received, retries=5, sleep=5, sleep_before=5)
        finally:
            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)


class TestLambdaHttpInvocation:
    def test_http_invocation_with_apigw_proxy(self, create_lambda_function):
        lambda_name = f"test_lambda_{short_uid()}"
        lambda_resource = "/api/v1/{proxy+}"
        lambda_path = "/api/v1/hello/world"
        lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path
        lambda_request_context_resource_path = lambda_resource
        create_lambda_function(
            func_name=lambda_name,
            handler_file=TEST_LAMBDA_PYTHON,
            libs=TEST_LAMBDA_LIBS,
        )
        # create API Gateway and connect it to the Lambda proxy backend
        lambda_uri = aws_stack.lambda_function_arn(lambda_name)
        target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"
        result = testutil.connect_api_gateway_to_http_with_lambda_proxy(
            "test_gateway2",
            target_uri,
            path=lambda_resource,
            stage_name=TEST_STAGE_NAME,
        )
        api_id = result["id"]
        url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)
        result = safe_requests.post(
            url, data=b"{}", headers={"User-Agent": "python-requests/testing"}
        )
        content = json.loads(result.content)
        assert lambda_path == content["path"]
        assert lambda_resource == content["resource"]
        assert lambda_request_context_path == content["requestContext"]["path"]
        assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]


class TestKinesisSource:
    def test_create_kinesis_event_source_mapping(
        self,
        create_lambda_function,
        lambda_client,
        kinesis_client,
        kinesis_create_stream,
        lambda_su_role,
        wait_for_stream_ready,
        logs_client,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        record_data = "hello"
        num_events_kinesis = 10
        try:
            create_lambda_function(
                func_name=function_name,
                handler_file=TEST_LAMBDA_PYTHON_ECHO,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            wait_for_stream_ready(stream_name=stream_name)
            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="LATEST"
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, uuid)
            kinesis_client.put_records(
                Records=[
                    {"Data": record_data, "PartitionKey": f"test_{i}"}
                    for i in range(0, num_events_kinesis)
                ],
                StreamName=stream_name,
            )
            events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1
            )
            records = events[0]["Records"]
            assert len(records) == num_events_kinesis
            for record in records:
                assert "eventID" in record
                assert "eventSourceARN" in record
                assert "eventSource" in record
                assert "eventVersion" in record
                assert "eventName" in record
                assert "invokeIdentityArn" in record
                assert "awsRegion" in record
                assert "kinesis" in record
                actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
                assert actual_record_data == record_data
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)
    def test_kinesis_event_source_mapping_with_async_invocation(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
    ):
        # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!
        # apparently this particular configuration prevents lambda logs from being extracted properly, giving the
        # appearance that the function was never invoked.
        try:
            function_name = f"lambda_func-{short_uid()}"
            stream_name = f"test-foobar-{short_uid()}"
            num_records_per_batch = 10
            num_batches = 2
            create_lambda_function(
                handler_file=TEST_LAMBDA_PARALLEL_FILE,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            wait_for_stream_ready(stream_name=stream_name)
            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
            uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn,
                FunctionName=function_name,
                StartingPosition="LATEST",
                BatchSize=num_records_per_batch,
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, uuid)
            for i in range(num_batches):
                start = time.perf_counter()
                kinesis_client.put_records(
                    Records=[
                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}
                        for j in range(0, num_records_per_batch)
                    ],
                    StreamName=stream_name,
                )
                assert (time.perf_counter() - start) < 1  # this should not take more than a second
            invocation_events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=num_batches
            )
            for i in range(num_batches):
                event = invocation_events[i]
                assert len(event["event"]["Records"]) == num_records_per_batch
                actual_record_ids = []
                for record in event["event"]["Records"]:
                    assert "eventID" in record
                    assert "eventSourceARN" in record
                    assert "eventSource" in record
                    assert "eventVersion" in record
                    assert "eventName" in record
                    assert "invokeIdentityArn" in record
                    assert "awsRegion" in record
                    assert "kinesis" in record
                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
                    actual_record_id = json.loads(record_data)["record_id"]
                    actual_record_ids.append(actual_record_id)
                actual_record_ids.sort()
                assert actual_record_ids == [i for i in range(num_records_per_batch)]
            assert (
                invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]
            ) > 5
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    def test_kinesis_event_source_trim_horizon(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        num_batches = 3
        try:
            create_lambda_function(
                handler_file=TEST_LAMBDA_PARALLEL_FILE,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            wait_for_stream_ready(stream_name=stream_name)
            stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
            assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1
            # insert some records before event source mapping created
            for i in range(num_batches - 1):
                kinesis_client.put_records(
                    Records=[
                        {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}
                        for j in range(0, num_records_per_batch)
                    ],
                    StreamName=stream_name,
                )
            uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn,
                FunctionName=function_name,
                StartingPosition="TRIM_HORIZON",
                BatchSize=num_records_per_batch,
            )["UUID"]
            # insert some more records
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            invocation_events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=num_batches
            )
            for i in range(num_batches):
                event = invocation_events[i]
                assert len(event["event"]["Records"]) == num_records_per_batch
                actual_record_ids = []
                for record in event["event"]["Records"]:
                    assert "eventID" in record
                    assert "eventSourceARN" in record
                    assert "eventSource" in record
                    assert "eventVersion" in record
                    assert "eventName" in record
                    assert "invokeIdentityArn" in record
                    assert "awsRegion" in record
                    assert "kinesis" in record
                    record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
                    actual_record_id = json.loads(record_data)["record_id"]
                    actual_record_ids.append(actual_record_id)
                actual_record_ids.sort()
                assert actual_record_ids == [i for i in range(num_records_per_batch)]
        finally:
            lambda_client.delete_event_source_mapping(UUID=uuid)

    def test_disable_kinesis_event_source_mapping(
        self,
        lambda_client,
        kinesis_client,
        create_lambda_function,
        kinesis_create_stream,
        wait_for_stream_ready,
        logs_client,
        lambda_su_role,
    ):
        function_name = f"lambda_func-{short_uid()}"
        stream_name = f"test-foobar-{short_uid()}"
        num_records_per_batch = 10
        try:
            create_lambda_function(
                handler_file=TEST_LAMBDA_PYTHON_ECHO,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON36,
                role=lambda_su_role,
            )
            kinesis_create_stream(StreamName=stream_name, ShardCount=1)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            wait_for_stream_ready(stream_name=stream_name)
            event_source_uuid = lambda_client.create_event_source_mapping(
                EventSourceArn=stream_arn,
                FunctionName=function_name,
                StartingPosition="LATEST",
                BatchSize=num_records_per_batch,
            )["UUID"]
            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1
            )
            assert len(events) == 1
            lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)
            time.sleep(2)
            kinesis_client.put_records(
                Records=[
                    {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}
                    for i in range(0, num_records_per_batch)
                ],
                StreamName=stream_name,
            )
            time.sleep(7)  # wait for records to pass through stream
            # should still only get the first batch from before mapping was disabled
            events = _get_lambda_invocation_events(
                logs_client, function_name, expected_num_events=1
            )
            assert len(events) == 1
        finally:
            lambda_client.delete_event_source_mapping(UUID=event_source_uuid)

    def test_kinesis_event_source_mapping_with_on_failure_destination_config(
        self,
        lambda_client,
        create_lambda_function,
        sqs_client,
        sqs_queue_arn,
        sqs_create_queue,
        create_iam_role_with_policy,
        kinesis_client,
        wait_for_stream_ready,
    ):
        try:
            function_name = f"lambda_func-{short_uid()}"
            role = f"test-lambda-role-{short_uid()}"
            policy_name = f"test-lambda-policy-{short_uid()}"
            kinesis_name = f"test-kinesis-{short_uid()}"
            role_arn = create_iam_role_with_policy(
                RoleName=role,
                PolicyName=policy_name,
                RoleDefinition=lambda_role,
                PolicyDefinition=s3_lambda_permission,
            )
            create_lambda_function(
                handler_file=TEST_LAMBDA_PYTHON,
                func_name=function_name,
                runtime=LAMBDA_RUNTIME_PYTHON37,
                role=role_arn,
            )
            kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)
            result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]
            kinesis_arn = result["StreamARN"]
            wait_for_stream_ready(stream_name=kinesis_name)
            queue_event_source_mapping = sqs_create_queue()
            destination_queue = sqs_queue_arn(queue_event_source_mapping)
            destination_config = {"OnFailure": {"Destination": destination_queue}}
            message = {
                "input": "hello",
                "value": "world",
                lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,
            }
            result = lambda_client.create_event_source_mapping(
                FunctionName=function_name,
                BatchSize=1,
                StartingPosition="LATEST",
                EventSourceArn=kinesis_arn,
                MaximumBatchingWindowInSeconds=1,
                MaximumRetryAttempts=1,
                DestinationConfig=destination_config,
            )
            event_source_mapping_uuid = result["UUID"]
            _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)
            kinesis_client.put_record(
                StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"
            )

            def verify_failure_received():
                result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)
                msg = result["Messages"][0]
                body = json.loads(msg["Body"])
                assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"
                assert body["KinesisBatchInfo"]["batchSize"] == 1
                assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn

            retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)
        finally:
            kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)
            lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)


def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):
    def assert_mapping_enabled():
        assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"

    retry(assert_mapping_enabled, sleep_before=2, retries=retries)


def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):
    def assert_table_active():
        assert (
            dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"
        )

    retry(assert_table_active, retries=retries, sleep_before=2)


def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):
    def get_events():
        events = get_lambda_log_events(function_name, logs_client=logs_client)
        assert len(events) == expected_num_events
        return events
...
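The helper the page title refers to is defined near the end of the snippet: _await_event_source_mapping_enabled polls get_event_source_mapping via LocalStack's retry utility until the mapping's State is reported as "Enabled", so the test does not send messages or records before the mapping is actually live. Outside this test suite you can achieve the same effect with plain boto3. The sketch below is a minimal, self-contained version of that idea; the endpoint URL, dummy credentials, polling parameters, and the await_event_source_mapping_enabled name are illustrative assumptions, not part of the snippet above.

# Minimal usage sketch (assumes LocalStack is reachable at http://localhost:4566 and
# that the mapping UUID comes from a prior create_event_source_mapping call).
import time

import boto3

lambda_client = boto3.client(
    "lambda",
    endpoint_url="http://localhost:4566",  # assumed LocalStack edge endpoint
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)


def await_event_source_mapping_enabled(client, uuid, retries=30, sleep=2):
    # Poll until the event source mapping reports State == "Enabled".
    for _ in range(retries):
        if client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled":
            return
        time.sleep(sleep)
    raise AssertionError(f"event source mapping {uuid} did not become enabled")


# Example (queue_arn and the function name below are placeholders):
# mapping = lambda_client.create_event_source_mapping(
#     EventSourceArn=queue_arn, FunctionName="my-function"
# )
# await_event_source_mapping_enabled(lambda_client, mapping["UUID"])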
