How to use is_old_provider method in localstack

Best Python code snippet using localstack_python

test_lambda.py

Source:test_lambda.py Github

copy

Full Screen

...138 ]139 if use_docker()140 else [LAMBDA_RUNTIME_JAVA11]141)142def is_old_provider():143 return (144 os.environ.get("TEST_TARGET") != "AWS_CLOUD"145 and os.environ.get("PROVIDER_OVERRIDE_LAMBDA") != "asf"146 )147PROVIDED_TEST_RUNTIMES = [148 LAMBDA_RUNTIME_PROVIDED,149 # TODO remove skip once we use correct images150 pytest.param(151 LAMBDA_RUNTIME_PROVIDED_AL2,152 marks=pytest.mark.skipif(153 is_old_provider(), reason="curl missing in provided.al2 lambci image"154 ),155 ),156]157# Snapshot patterns158IGNORE_LOGSTREAM_ID: Pattern[str] = re.compile(159 r"\d{4}/\d{2}/\d{2}/\[((\$LATEST)|\d+)\][0-9a-f]{32}"160)161T = TypeVar("T")162def read_streams(payload: T) -> T:163 new_payload = {}164 for k, v in payload.items():165 if isinstance(v, Dict):166 new_payload[k] = read_streams(v)167 elif isinstance(v, StreamingBody):168 new_payload[k] = to_str(v.read())169 else:170 new_payload[k] = v171 return new_payload172@pytest.fixture173def check_lambda_logs(logs_client):174 def _check_logs(func_name: str, expected_lines: List[str] = None):175 if not expected_lines:176 expected_lines = []177 log_events = get_lambda_logs(func_name, logs_client=logs_client)178 log_messages = [e["message"] for e in log_events]179 for line in expected_lines:180 if ".*" in line:181 found = [re.match(line, m, flags=re.DOTALL) for m in log_messages]182 if any(found):183 continue184 assert line in log_messages185 return _check_logs186def configure_snapshot_for_context(snapshot, function_name: str):187 """188 Utility function to configure snapshot to ignore a function name and log stream ids in its body.189 Helpful if invoke calls return the context object, in which they are present190 :param snapshot: Snapshot fixture result191 :param function_name: Function name of the current function192 """193 snapshot.register_replacement(IGNORE_LOGSTREAM_ID, "<log_stream_id>")194 snapshot.register_replacement(re.compile(function_name), "<function_name>")195# API only functions (no lambda execution itself)196class TestLambdaAPI:197 @pytest.mark.only_localstack198 def test_create_lambda_function(self, lambda_client):199 """Basic test that creates and deletes a Lambda function"""200 func_name = f"lambda_func-{short_uid()}"201 kms_key_arn = f"arn:{aws_stack.get_partition()}:kms:{aws_stack.get_region()}:{TEST_AWS_ACCOUNT_ID}:key11"202 vpc_config = {203 "SubnetIds": ["subnet-123456789"],204 "SecurityGroupIds": ["sg-123456789"],205 }206 tags = {"env": "testing"}207 kwargs = {208 "FunctionName": func_name,209 "Runtime": LAMBDA_RUNTIME_PYTHON37,210 "Handler": LAMBDA_DEFAULT_HANDLER,211 "Role": LAMBDA_TEST_ROLE,212 "KMSKeyArn": kms_key_arn,213 "Code": {214 "ZipFile": create_lambda_archive(215 load_file(TEST_LAMBDA_PYTHON_ECHO), get_content=True216 )217 },218 "Timeout": 3,219 "VpcConfig": vpc_config,220 "Tags": tags,221 "Environment": {"Variables": {"foo": "bar"}},222 }223 result = lambda_client.create_function(**kwargs)224 function_arn = result["FunctionArn"]225 assert testutil.response_arn_matches_partition(lambda_client, function_arn)226 partial_function_arn = ":".join(function_arn.split(":")[3:])227 # Get function by Name, ARN and partial ARN228 for func_ref in [func_name, function_arn, partial_function_arn]:229 rs = lambda_client.get_function(FunctionName=func_ref)230 assert rs["Configuration"].get("KMSKeyArn", "") == kms_key_arn231 assert rs["Configuration"].get("VpcConfig", {}) == vpc_config232 assert rs["Tags"] == tags233 # clean up234 lambda_client.delete_function(FunctionName=func_name)235 with pytest.raises(Exception) as exc:236 lambda_client.delete_function(FunctionName=func_name)237 assert "ResourceNotFoundException" in str(exc)238 @pytest.mark.snapshot239 def test_add_lambda_permission_aws(240 self, lambda_client, iam_client, create_lambda_function, snapshot241 ):242 """Testing the add_permission call on lambda, by adding a new resource-based policy to a lambda function"""243 function_name = f"lambda_func-{short_uid()}"244 lambda_create_response = create_lambda_function(245 handler_file=TEST_LAMBDA_PYTHON_ECHO,246 func_name=function_name,247 runtime=LAMBDA_RUNTIME_PYTHON36,248 )249 lambda_arn = lambda_create_response["CreateFunctionResponse"]["FunctionArn"]250 snapshot.match("create_lambda", lambda_create_response)251 # create lambda permission252 action = "lambda:InvokeFunction"253 sid = "s3"254 principal = "s3.amazonaws.com"255 resp = lambda_client.add_permission(256 FunctionName=function_name,257 Action=action,258 StatementId=sid,259 Principal=principal,260 SourceArn=aws_stack.s3_bucket_arn("test-bucket"),261 )262 snapshot.match("add_permission", resp)263 # fetch lambda policy264 get_policy_result = lambda_client.get_policy(FunctionName=function_name)265 snapshot.match("get_policy", get_policy_result)266 assert lambda_arn == json.loads(get_policy_result["Policy"])["Statement"][0]["Resource"]267 # TODO permissions cannot be added to $LATEST268 @pytest.mark.skipif(269 not is_old_provider(), reason="test does not make valid assertions against AWS"270 )271 def test_add_lambda_permission(self, lambda_client, iam_client, create_lambda_function):272 function_name = f"lambda_func-{short_uid()}"273 lambda_create_response = create_lambda_function(274 handler_file=TEST_LAMBDA_PYTHON_ECHO,275 func_name=function_name,276 runtime=LAMBDA_RUNTIME_PYTHON36,277 )278 lambda_arn = lambda_create_response["CreateFunctionResponse"]["FunctionArn"]279 # create lambda permission280 action = "lambda:InvokeFunction"281 sid = "s3"282 principal = "s3.amazonaws.com"283 resp = lambda_client.add_permission(284 FunctionName=function_name,285 Action=action,286 StatementId=sid,287 Principal=principal,288 SourceArn=aws_stack.s3_bucket_arn("test-bucket"),289 )290 # fetch lambda policy291 policy = lambda_client.get_policy(FunctionName=function_name)["Policy"]292 assert isinstance(policy, str)293 policy = json.loads(to_str(policy))294 assert action == policy["Statement"][0]["Action"]295 assert sid == policy["Statement"][0]["Sid"]296 assert lambda_arn == policy["Statement"][0]["Resource"]297 assert principal == policy["Statement"][0]["Principal"]["Service"]298 assert (299 aws_stack.s3_bucket_arn("test-bucket")300 == policy["Statement"][0]["Condition"]["ArnLike"]["AWS:SourceArn"]301 )302 # fetch IAM policy303 # this is not a valid assertion in general (especially against AWS)304 policies = iam_client.list_policies(Scope="Local", MaxItems=500)["Policies"]305 policy_name = get_lambda_policy_name(function_name)306 matching = [p for p in policies if p["PolicyName"] == policy_name]307 assert len(matching) == 1308 assert ":policy/" in matching[0]["Arn"]309 # remove permission that we just added310 resp = lambda_client.remove_permission(311 FunctionName=function_name,312 StatementId=sid,313 Qualifier="qual1",314 RevisionId="r1",315 )316 assert 200 == resp["ResponseMetadata"]["HTTPStatusCode"]317 def test_remove_multi_permissions(self, lambda_client, create_lambda_function, snapshot):318 """Tests creation and subsequent removal of multiple permissions, including the changes in the policy"""319 function_name = f"lambda_func-{short_uid()}"320 create_lambda_function(321 handler_file=TEST_LAMBDA_PYTHON_ECHO,322 func_name=function_name,323 runtime=LAMBDA_RUNTIME_PYTHON36,324 )325 action = "lambda:InvokeFunction"326 sid = "s3"327 principal = "s3.amazonaws.com"328 permission_1_add = lambda_client.add_permission(329 FunctionName=function_name,330 Action=action,331 StatementId=sid,332 Principal=principal,333 )334 snapshot.match("add_permission_1", permission_1_add)335 sid_2 = "sqs"336 principal_2 = "sqs.amazonaws.com"337 permission_2_add = lambda_client.add_permission(338 FunctionName=function_name,339 Action=action,340 StatementId=sid_2,341 Principal=principal_2,342 SourceArn=aws_stack.s3_bucket_arn("test-bucket"),343 )344 snapshot.match("add_permission_2", permission_2_add)345 policy_response = lambda_client.get_policy(346 FunctionName=function_name,347 )348 snapshot.match("policy_after_2_add", policy_response)349 with pytest.raises(ClientError) as e:350 lambda_client.remove_permission(351 FunctionName=function_name,352 StatementId="non-existent",353 )354 assert e.value.response["Error"]["Code"] == "ResourceNotFoundException"355 lambda_client.remove_permission(356 FunctionName=function_name,357 StatementId=sid_2,358 )359 policy = json.loads(360 lambda_client.get_policy(361 FunctionName=function_name,362 )["Policy"]363 )364 snapshot.match("policy_after_removal", policy)365 assert policy["Statement"][0]["Sid"] == sid366 lambda_client.remove_permission(367 FunctionName=function_name,368 StatementId=sid,369 )370 with pytest.raises(ClientError) as ctx:371 lambda_client.get_policy(FunctionName=function_name)372 assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"373 @pytest.mark.skipif(374 not is_old_provider(), reason="test does not make valid assertions against AWS"375 )376 def test_add_lambda_multiple_permission(377 self, iam_client, lambda_client, create_lambda_function378 ):379 """Test adding multiple permissions"""380 function_name = f"lambda_func-{short_uid()}"381 create_lambda_function(382 handler_file=TEST_LAMBDA_PYTHON_ECHO,383 func_name=function_name,384 runtime=LAMBDA_RUNTIME_PYTHON36,385 )386 # create lambda permissions387 action = "lambda:InvokeFunction"388 principal = "s3.amazonaws.com"389 statement_ids = ["s4", "s5"]390 for sid in statement_ids:391 resp = lambda_client.add_permission(392 FunctionName=function_name,393 Action=action,394 StatementId=sid,395 Principal=principal,396 SourceArn=aws_stack.s3_bucket_arn("test-bucket"),397 )398 assert "Statement" in resp399 # fetch IAM policy400 # this is not a valid assertion in general (especially against AWS)401 policies = iam_client.list_policies(Scope="Local", MaxItems=500)["Policies"]402 policy_name = get_lambda_policy_name(function_name)403 matching = [p for p in policies if p["PolicyName"] == policy_name]404 assert 1 == len(matching)405 assert ":policy/" in matching[0]["Arn"]406 # validate both statements407 policy = matching[0]408 versions = iam_client.list_policy_versions(PolicyArn=policy["Arn"])["Versions"]409 assert 1 == len(versions)410 statements = versions[0]["Document"]["Statement"]411 for i in range(len(statement_ids)):412 assert action == statements[i]["Action"]413 assert lambda_api.func_arn(function_name) == statements[i]["Resource"]414 assert principal == statements[i]["Principal"]["Service"]415 assert (416 aws_stack.s3_bucket_arn("test-bucket")417 == statements[i]["Condition"]["ArnLike"]["AWS:SourceArn"]418 )419 # check statement_ids in reverse order420 assert statement_ids[abs(i - 1)] == statements[i]["Sid"]421 # remove permission that we just added422 resp = lambda_client.remove_permission(423 FunctionName=function_name,424 StatementId=sid,425 Qualifier="qual1",426 RevisionId="r1",427 )428 assert 200 == resp["ResponseMetadata"]["HTTPStatusCode"]429 @pytest.mark.snapshot430 def test_lambda_asynchronous_invocations(431 self,432 lambda_client,433 create_lambda_function,434 sqs_queue,435 sqs_queue_arn,436 lambda_su_role,437 snapshot,438 ):439 """Testing API actions of function event config"""440 function_name = f"lambda_func-{short_uid()}"441 create_lambda_function(442 handler_file=TEST_LAMBDA_PYTHON_ECHO,443 func_name=function_name,444 runtime=LAMBDA_RUNTIME_PYTHON36,445 role=lambda_su_role,446 )447 queue_arn = sqs_queue_arn(sqs_queue)448 destination_config = {449 "OnSuccess": {"Destination": queue_arn},450 "OnFailure": {"Destination": queue_arn},451 }452 # adding event invoke config453 response = lambda_client.put_function_event_invoke_config(454 FunctionName=function_name,455 MaximumRetryAttempts=2,456 MaximumEventAgeInSeconds=123,457 DestinationConfig=destination_config,458 )459 snapshot.match("put_function_event_invoke_config", response)460 # over writing event invoke config461 response = lambda_client.put_function_event_invoke_config(462 FunctionName=function_name,463 MaximumRetryAttempts=2,464 DestinationConfig=destination_config,465 )466 snapshot.match("put_function_event_invoke_config_overwritemaxeventage", response)467 # updating event invoke config468 response = lambda_client.update_function_event_invoke_config(469 FunctionName=function_name,470 MaximumRetryAttempts=1,471 )472 snapshot.match("put_function_event_invoke_config_maxattempt1", response)473 # clean up474 lambda_client.delete_function_event_invoke_config(FunctionName=function_name)475 def test_function_concurrency(self, lambda_client, create_lambda_function, snapshot):476 """Testing the api of the put function concurrency action"""477 function_name = f"lambda_func-{short_uid()}"478 create_lambda_function(479 handler_file=TEST_LAMBDA_PYTHON_ECHO,480 func_name=function_name,481 runtime=LAMBDA_RUNTIME_PYTHON36,482 )483 response = lambda_client.put_function_concurrency(484 FunctionName=function_name, ReservedConcurrentExecutions=123485 )486 snapshot.match("put_function_concurrency", response)487 assert "ReservedConcurrentExecutions" in response488 response = lambda_client.get_function_concurrency(FunctionName=function_name)489 snapshot.match("get_function_concurrency", response)490 assert "ReservedConcurrentExecutions" in response491 lambda_client.delete_function_concurrency(FunctionName=function_name)492 def test_function_code_signing_config(self, lambda_client, create_lambda_function, snapshot):493 """Testing the API of code signing config"""494 function_name = f"lambda_func-{short_uid()}"495 create_lambda_function(496 handler_file=TEST_LAMBDA_PYTHON_ECHO,497 func_name=function_name,498 runtime=LAMBDA_RUNTIME_PYTHON36,499 )500 response = lambda_client.create_code_signing_config(501 Description="Testing CodeSigning Config",502 AllowedPublishers={503 "SigningProfileVersionArns": [504 f"arn:aws:signer:{aws_stack.get_region()}:000000000000:/signing-profiles/test",505 ]506 },507 CodeSigningPolicies={"UntrustedArtifactOnDeployment": "Enforce"},508 )509 snapshot.replace_value(re.compile(r"^csc-[0-9a-f]{17}$"), "<csc-id>")510 snapshot.match("create_code_signing_config", response)511 assert "Description" in response["CodeSigningConfig"]512 assert "SigningProfileVersionArns" in response["CodeSigningConfig"]["AllowedPublishers"]513 assert (514 "UntrustedArtifactOnDeployment" in response["CodeSigningConfig"]["CodeSigningPolicies"]515 )516 code_signing_arn = response["CodeSigningConfig"]["CodeSigningConfigArn"]517 response = lambda_client.update_code_signing_config(518 CodeSigningConfigArn=code_signing_arn,519 CodeSigningPolicies={"UntrustedArtifactOnDeployment": "Warn"},520 )521 snapshot.match("update_code_signing_config", response)522 assert (523 "Warn"524 == response["CodeSigningConfig"]["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]525 )526 response = lambda_client.get_code_signing_config(CodeSigningConfigArn=code_signing_arn)527 assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]528 snapshot.match("get_code_signing_config", response)529 response = lambda_client.put_function_code_signing_config(530 CodeSigningConfigArn=code_signing_arn, FunctionName=function_name531 )532 assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]533 snapshot.match("put_function_code_signing_config", response)534 response = lambda_client.get_function_code_signing_config(FunctionName=function_name)535 assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]536 snapshot.match("get_function_code_signing_config", response)537 assert code_signing_arn == response["CodeSigningConfigArn"]538 assert function_name == response["FunctionName"]539 response = lambda_client.delete_function_code_signing_config(FunctionName=function_name)540 assert 204 == response["ResponseMetadata"]["HTTPStatusCode"]541 response = lambda_client.delete_code_signing_config(CodeSigningConfigArn=code_signing_arn)542 assert 204 == response["ResponseMetadata"]["HTTPStatusCode"]543 def create_multiple_lambda_permissions(self, lambda_client, create_lambda_function, snapshot):544 """Test creating multiple lambda permissions and checking the policy"""545 function_name = f"test-function-{short_uid()}"546 create_lambda_function(547 funct_name=function_name,548 runtime=LAMBDA_RUNTIME_PYTHON37,549 libs=TEST_LAMBDA_LIBS,550 )551 action = "lambda:InvokeFunction"552 sid = "logs"553 resp = lambda_client.add_permission(554 FunctionName=function_name,555 Action=action,556 StatementId=sid,557 Principal="logs.amazonaws.com",558 )559 snapshot.match("add_permission_response_1", resp)560 assert "Statement" in resp561 sid = "kinesis"562 resp = lambda_client.add_permission(563 FunctionName=function_name,564 Action=action,565 StatementId=sid,566 Principal="kinesis.amazonaws.com",567 )568 snapshot.match("add_permission_response_2", resp)569 assert "Statement" in resp570 policy_response = lambda_client.get_policy(571 FunctionName=function_name,572 )573 snapshot.match("policy_after_2_add", policy_response)574class TestLambdaBaseFeatures:575 def test_dead_letter_queue(576 self,577 lambda_client,578 create_lambda_function,579 sqs_client,580 sqs_create_queue,581 sqs_queue_arn,582 lambda_su_role,583 snapshot,584 ):585 """Creates a lambda with a defined dead letter queue, and check failed lambda invocation leads to a message"""586 # create DLQ and Lambda function587 queue_name = f"test-{short_uid()}"588 lambda_name = f"test-{short_uid()}"589 queue_url = sqs_create_queue(QueueName=queue_name)590 queue_arn = sqs_queue_arn(queue_url)591 create_lambda_response = create_lambda_function(592 handler_file=TEST_LAMBDA_PYTHON,593 func_name=lambda_name,594 libs=TEST_LAMBDA_LIBS,595 runtime=LAMBDA_RUNTIME_PYTHON36,596 DeadLetterConfig={"TargetArn": queue_arn},597 role=lambda_su_role,598 )599 snapshot.match("create_lambda_with_dlq", create_lambda_response)600 snapshot.skip_key(re.compile("ReceiptHandle"), "<receipt-handle>")601 snapshot.skip_key(re.compile("MD5Of.*"), "<md5-hash>")602 # invoke Lambda, triggering an error603 payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}604 lambda_client.invoke(605 FunctionName=lambda_name,606 Payload=json.dumps(payload),607 InvocationType="Event",608 )609 # assert that message has been received on the DLQ610 def receive_dlq():611 result = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])612 assert len(result["Messages"]) > 0613 msg_attrs = result["Messages"][0]["MessageAttributes"]614 assert "RequestID" in msg_attrs615 assert "ErrorCode" in msg_attrs616 assert "ErrorMessage" in msg_attrs617 snapshot.match("sqs_dlq_message", result)618 # on AWS, event retries can be quite delayed, so we have to wait up to 6 minutes here, potential flakes619 retry(receive_dlq, retries=120, sleep=3)620 # update DLQ config621 update_function_config_response = lambda_client.update_function_configuration(622 FunctionName=lambda_name, DeadLetterConfig={}623 )624 snapshot.match("delete_dlq", update_function_config_response)625 # invoke Lambda again, assert that status code is 200 and error details contained in the payload626 result = lambda_client.invoke(627 FunctionName=lambda_name, Payload=json.dumps(payload), LogType="Tail"628 )629 result = read_streams(result)630 payload = json.loads(to_str(result["Payload"]))631 snapshot.match("result_payload", payload)632 assert 200 == result["StatusCode"]633 assert "Unhandled" == result["FunctionError"]634 assert "$LATEST" == result["ExecutedVersion"]635 assert "Test exception" in payload["errorMessage"]636 assert "Exception" in payload["errorType"]637 assert isinstance(payload["stackTrace"], list)638 log_result = result.get("LogResult")639 assert log_result640 logs = to_str(base64.b64decode(to_str(log_result)))641 assert "START" in logs642 assert "Test exception" in logs643 assert "END" in logs644 assert "REPORT" in logs645 @pytest.mark.parametrize(646 "condition,payload",647 [648 ("Success", {}),649 ("RetriesExhausted", {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}),650 ],651 )652 def test_assess_lambda_destination_invocation(653 self,654 condition,655 payload,656 lambda_client,657 sqs_client,658 create_lambda_function,659 sqs_create_queue,660 sqs_queue_arn,661 lambda_su_role,662 snapshot,663 ):664 """Testing the destination config API and operation (for the OnSuccess case)"""665 # create DLQ and Lambda function666 queue_name = f"test-{short_uid()}"667 lambda_name = f"test-{short_uid()}"668 queue_url = sqs_create_queue(QueueName=queue_name)669 queue_arn = sqs_queue_arn(queue_url)670 create_lambda_function(671 handler_file=TEST_LAMBDA_PYTHON,672 func_name=lambda_name,673 libs=TEST_LAMBDA_LIBS,674 role=lambda_su_role,675 )676 put_event_invoke_config_response = lambda_client.put_function_event_invoke_config(677 FunctionName=lambda_name,678 DestinationConfig={679 "OnSuccess": {"Destination": queue_arn},680 "OnFailure": {"Destination": queue_arn},681 },682 )683 snapshot.match("put_function_event_invoke_config", put_event_invoke_config_response)684 snapshot.skip_key(re.compile("ReceiptHandle"), "<receipt-handle>")685 snapshot.skip_key(re.compile("MD5Of.*"), "<md5-hash>")686 lambda_client.invoke(687 FunctionName=lambda_name,688 Payload=json.dumps(payload),689 InvocationType="Event",690 )691 configure_snapshot_for_context(snapshot, lambda_name)692 def receive_message():693 rs = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])694 assert len(rs["Messages"]) > 0695 msg = rs["Messages"][0]["Body"]696 msg = json.loads(msg)697 assert condition == msg["requestContext"]["condition"]698 snapshot.match("destination_message", rs)699 retry(receive_message, retries=120, sleep=3)700 def test_large_payloads(self, caplog, lambda_client, create_lambda_function, snapshot):701 """Testing large payloads sent to lambda functions (~5MB)"""702 # Set the loglevel to INFO for this test to avoid breaking a CI environment (due to excessive log outputs)703 caplog.set_level(logging.INFO)704 function_name = f"large_payload-{short_uid()}"705 create_lambda_function(706 handler_file=TEST_LAMBDA_PYTHON_ECHO,707 func_name=function_name,708 runtime=LAMBDA_RUNTIME_PYTHON36,709 )710 payload = {"test": "test123456" * 100 * 1000 * 5} # 5MB payload711 payload_bytes = to_bytes(json.dumps(payload))712 result = lambda_client.invoke(FunctionName=function_name, Payload=payload_bytes)713 result = read_streams(result)714 snapshot.match("invocation_response", result)715 assert 200 == result["ResponseMetadata"]["HTTPStatusCode"]716 result_data = result["Payload"]717 result_data = json.loads(to_str(result_data))718 assert payload == result_data719parametrize_python_runtimes = pytest.mark.parametrize(720 "runtime",721 PYTHON_TEST_RUNTIMES,722)723class TestPythonRuntimes:724 @pytest.fixture(725 params=PYTHON_TEST_RUNTIMES,726 )727 def python_function_name(self, request, lambda_client, create_lambda_function):728 function_name = f"python-test-function-{short_uid()}"729 create_lambda_function(730 func_name=function_name,731 handler_file=TEST_LAMBDA_PYTHON,732 libs=TEST_LAMBDA_LIBS,733 runtime=request.param,734 )735 return function_name736 def test_invocation_type_not_set(self, lambda_client, python_function_name, snapshot):737 """Test invocation of a lambda with no invocation type set, but LogType="Tail""" ""738 result = lambda_client.invoke(739 FunctionName=python_function_name, Payload=b"{}", LogType="Tail"740 )741 result = read_streams(result)742 snapshot.skip_key(re.compile("LogResult"), "<log_result>")743 configure_snapshot_for_context(snapshot, python_function_name)744 snapshot.match("invoke", result)745 result_data = json.loads(result["Payload"])746 # assert response details747 assert 200 == result["StatusCode"]748 assert {} == result_data["event"]749 # assert that logs are contained in response750 logs = result.get("LogResult", "")751 logs = to_str(base64.b64decode(to_str(logs)))752 snapshot.register_replacement(753 re.compile(r"Duration: \d+(\.\d{2})? ms"), "Duration: <duration> ms"754 )755 snapshot.register_replacement(re.compile(r"Used: \d+ MB"), "Used: <memory> MB")756 snapshot.match("logs", {"logs": logs})757 assert "START" in logs758 assert "Lambda log message" in logs759 assert "END" in logs760 assert "REPORT" in logs761 def test_invocation_type_request_response(self, lambda_client, python_function_name, snapshot):762 """Test invocation with InvocationType RequestResponse explicitely set"""763 result = lambda_client.invoke(764 FunctionName=python_function_name,765 Payload=b"{}",766 InvocationType="RequestResponse",767 )768 result = read_streams(result)769 configure_snapshot_for_context(snapshot, python_function_name)770 snapshot.match("invoke-result", result)771 result_data = result["Payload"]772 result_data = json.loads(result_data)773 assert "application/json" == result["ResponseMetadata"]["HTTPHeaders"]["content-type"]774 assert 200 == result["StatusCode"]775 assert isinstance(result_data, dict)776 def test_invocation_type_event(self, lambda_client, python_function_name, snapshot):777 """Check invocation response for type event"""778 result = lambda_client.invoke(779 FunctionName=python_function_name, Payload=b"{}", InvocationType="Event"780 )781 result = read_streams(result)782 snapshot.match("invoke-result", result)783 assert 202 == result["StatusCode"]784 def test_invocation_type_dry_run(self, lambda_client, python_function_name, snapshot):785 """Check invocation response for type dryrun"""786 result = lambda_client.invoke(787 FunctionName=python_function_name, Payload=b"{}", InvocationType="DryRun"788 )789 result = read_streams(result)790 snapshot.match("invoke-result", result)791 assert 204 == result["StatusCode"]792 @parametrize_python_runtimes793 def test_lambda_environment(self, lambda_client, create_lambda_function, runtime, snapshot):794 """Tests invoking a lambda function with environment variables set on creation"""795 function_name = f"env-test-function-{short_uid()}"796 env_vars = {"Hello": "World"}797 creation_result = create_lambda_function(798 handler_file=TEST_LAMBDA_ENV,799 libs=TEST_LAMBDA_LIBS,800 func_name=function_name,801 envvars=env_vars,802 runtime=runtime,803 )804 snapshot.match("creation-result", creation_result)805 # invoke function and assert result contains env vars806 result = lambda_client.invoke(FunctionName=function_name, Payload=b"{}")807 result = read_streams(result)808 snapshot.match("invocation-result", result)809 result_data = result["Payload"]810 assert 200 == result["StatusCode"]811 assert json.loads(result_data) == env_vars812 # get function config and assert result contains env vars813 result = lambda_client.get_function_configuration(FunctionName=function_name)814 snapshot.match("get-configuration-result", result)815 assert result["Environment"] == {"Variables": env_vars}816 @parametrize_python_runtimes817 def test_invocation_with_qualifier(818 self,819 lambda_client,820 s3_client,821 s3_bucket,822 runtime,823 check_lambda_logs,824 lambda_su_role,825 wait_until_lambda_ready,826 snapshot,827 ):828 """Tests invocation of python lambda with a given qualifier"""829 function_name = f"test_lambda_{short_uid()}"830 bucket_key = "test_lambda.zip"831 # upload zip file to S3832 zip_file = create_lambda_archive(833 load_file(TEST_LAMBDA_PYTHON), get_content=True, libs=TEST_LAMBDA_LIBS, runtime=runtime834 )835 s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)836 # create lambda function837 response = lambda_client.create_function(838 FunctionName=function_name,839 Runtime=runtime,840 Role=lambda_su_role,841 Publish=True,842 Handler="handler.handler",843 Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},844 Timeout=10,845 )846 snapshot.match("creation-response", response)847 configure_snapshot_for_context(snapshot, function_name)848 assert "Version" in response849 qualifier = response["Version"]850 wait_until_lambda_ready(function_name=function_name, qualifier=qualifier)851 # invoke lambda function852 data_before = b'{"foo": "bar with \'quotes\\""}'853 result = lambda_client.invoke(854 FunctionName=function_name, Payload=data_before, Qualifier=qualifier855 )856 result = read_streams(result)857 snapshot.match("invocation-response", result)858 data_after = json.loads(result["Payload"])859 assert json.loads(to_str(data_before)) == data_after["event"]860 context = data_after["context"]861 assert response["Version"] == context["function_version"]862 assert context.get("aws_request_id")863 assert function_name == context["function_name"]864 assert f"/aws/lambda/{function_name}" == context["log_group_name"]865 assert context.get("log_stream_name")866 assert context.get("memory_limit_in_mb")867 # assert that logs are present868 expected = [".*Lambda log message - print function.*"]869 if use_docker():870 # Note that during regular test execution, nosetests captures the output from871 # the logging module - hence we can only expect this when running in Docker872 expected.append(".*Lambda log message - logging module.*")873 def check_logs():874 check_lambda_logs(function_name, expected_lines=expected)875 retry(check_logs, retries=10)876 lambda_client.delete_function(FunctionName=function_name)877 @parametrize_python_runtimes878 def test_upload_lambda_from_s3(879 self,880 lambda_client,881 s3_client,882 s3_bucket,883 runtime,884 lambda_su_role,885 wait_until_lambda_ready,886 snapshot,887 ):888 """Test invocation of a python lambda with its deployment package uploaded to s3"""889 function_name = f"test_lambda_{short_uid()}"890 bucket_key = "test_lambda.zip"891 # upload zip file to S3892 zip_file = testutil.create_lambda_archive(893 load_file(TEST_LAMBDA_PYTHON), get_content=True, libs=TEST_LAMBDA_LIBS, runtime=runtime894 )895 s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)896 # create lambda function897 create_response = lambda_client.create_function(898 FunctionName=function_name,899 Runtime=runtime,900 Handler="handler.handler",901 Role=lambda_su_role,902 Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},903 Timeout=10,904 )905 snapshot.match("creation-response", create_response)906 configure_snapshot_for_context(snapshot, function_name)907 wait_until_lambda_ready(function_name=function_name)908 # invoke lambda function909 data_before = b'{"foo": "bar with \'quotes\\""}'910 result = lambda_client.invoke(FunctionName=function_name, Payload=data_before)911 result = read_streams(result)912 snapshot.match("invocation-response", result)913 data_after = json.loads(result["Payload"])914 assert json.loads(to_str(data_before)) == data_after["event"]915 context = data_after["context"]916 assert "$LATEST" == context["function_version"]917 assert function_name == context["function_name"]918 # clean up919 lambda_client.delete_function(FunctionName=function_name)920 @parametrize_python_runtimes921 def test_handler_in_submodule(self, lambda_client, create_lambda_function, runtime):922 """Test invocation of a lambda handler which resides in a submodule (= not root module)"""923 function_name = f"test-function-{short_uid()}"924 zip_file = testutil.create_lambda_archive(925 load_file(TEST_LAMBDA_PYTHON),926 get_content=True,927 libs=TEST_LAMBDA_LIBS,928 runtime=runtime,929 file_name="localstack_package/def/main.py",930 )931 create_lambda_function(932 func_name=function_name,933 zip_file=zip_file,934 handler="localstack_package.def.main.handler",935 runtime=runtime,936 )937 # invoke function and assert result938 result = lambda_client.invoke(FunctionName=function_name, Payload=b"{}")939 result_data = json.loads(result["Payload"].read())940 assert 200 == result["StatusCode"]941 assert json.loads("{}") == result_data["event"]942 @parametrize_python_runtimes943 def test_lambda_send_message_to_sqs(944 self,945 lambda_client,946 create_lambda_function,947 sqs_client,948 sqs_create_queue,949 runtime,950 lambda_su_role,951 ):952 """Send sqs message to sqs queue inside python lambda"""953 function_name = f"test-function-{short_uid()}"954 queue_name = f"lambda-queue-{short_uid()}"955 queue_url = sqs_create_queue(QueueName=queue_name)956 create_lambda_function(957 handler_file=TEST_LAMBDA_SEND_MESSAGE_FILE,958 func_name=function_name,959 runtime=runtime,960 role=lambda_su_role,961 )962 event = {963 "message": f"message-from-test-lambda-{short_uid()}",964 "queue_name": queue_name,965 "region_name": sqs_client.meta.region_name,966 }967 lambda_client.invoke(FunctionName=function_name, Payload=json.dumps(event))968 # assert that message has been received on the Queue969 def receive_message():970 rs = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])971 assert len(rs["Messages"]) > 0972 return rs["Messages"][0]973 message = retry(receive_message, retries=15, sleep=2)974 assert event["message"] == message["Body"]975 @parametrize_python_runtimes976 def test_lambda_put_item_to_dynamodb(977 self,978 lambda_client,979 create_lambda_function,980 dynamodb_create_table,981 runtime,982 dynamodb_resource,983 lambda_su_role,984 dynamodb_client,985 ):986 """Put item into dynamodb from python lambda"""987 table_name = f"ddb-table-{short_uid()}"988 function_name = f"test-function-{short_uid()}"989 dynamodb_create_table(table_name=table_name, partition_key="id")990 create_lambda_function(991 handler_file=TEST_LAMBDA_PUT_ITEM_FILE,992 func_name=function_name,993 runtime=runtime,994 role=lambda_su_role,995 )996 data = {short_uid(): f"data-{i}" for i in range(3)}997 event = {998 "table_name": table_name,999 "region_name": dynamodb_client.meta.region_name,1000 "items": [{"id": k, "data": v} for k, v in data.items()],1001 }1002 def wait_for_table_created():1003 return (1004 dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"]1005 == "ACTIVE"1006 )1007 assert poll_condition(wait_for_table_created, timeout=30)1008 lambda_client.invoke(FunctionName=function_name, Payload=json.dumps(event))1009 rs = dynamodb_resource.Table(table_name).scan()1010 items = rs["Items"]1011 assert len(items) == len(data.keys())1012 for item in items:1013 assert data[item["id"]] == item["data"]1014 @parametrize_python_runtimes1015 def test_lambda_start_stepfunctions_execution(1016 self, lambda_client, stepfunctions_client, create_lambda_function, runtime, lambda_su_role1017 ):1018 """Start stepfunctions machine execution from lambda"""1019 function_name = f"test-function-{short_uid()}"1020 resource_lambda_name = f"test-resource-{short_uid()}"1021 state_machine_name = f"state-machine-{short_uid()}"1022 create_lambda_function(1023 handler_file=TEST_LAMBDA_START_EXECUTION_FILE,1024 func_name=function_name,1025 runtime=runtime,1026 role=lambda_su_role,1027 )1028 resource_lambda_arn = create_lambda_function(1029 handler_file=TEST_LAMBDA_PYTHON_ECHO,1030 func_name=resource_lambda_name,1031 runtime=runtime,1032 role=lambda_su_role,1033 )["CreateFunctionResponse"]["FunctionArn"]1034 state_machine_def = {1035 "StartAt": "step1",1036 "States": {1037 "step1": {1038 "Type": "Task",1039 "Resource": resource_lambda_arn,1040 "ResultPath": "$.result_value",1041 "End": True,1042 }1043 },1044 }1045 rs = stepfunctions_client.create_state_machine(1046 name=state_machine_name,1047 definition=json.dumps(state_machine_def),1048 roleArn=lambda_su_role,1049 )1050 sm_arn = rs["stateMachineArn"]1051 try:1052 lambda_client.invoke(1053 FunctionName=function_name,1054 Payload=json.dumps(1055 {1056 "state_machine_arn": sm_arn,1057 "region_name": stepfunctions_client.meta.region_name,1058 "input": {},1059 }1060 ),1061 )1062 time.sleep(1)1063 rs = stepfunctions_client.list_executions(stateMachineArn=sm_arn)1064 # assert that state machine get executed 1 time1065 assert 1 == len([ex for ex in rs["executions"] if ex["stateMachineArn"] == sm_arn])1066 finally:1067 # clean up1068 stepfunctions_client.delete_state_machine(stateMachineArn=sm_arn)1069 @pytest.mark.skipif(1070 not use_docker(), reason="Test for docker python runtimes not applicable if run locally"1071 )1072 @parametrize_python_runtimes1073 def test_python_runtime_correct_versions(self, lambda_client, create_lambda_function, runtime):1074 """Test different versions of python runtimes to report back the correct python version"""1075 function_name = f"test_python_executor_{short_uid()}"1076 create_lambda_function(1077 func_name=function_name,1078 handler_file=TEST_LAMBDA_PYTHON_VERSION,1079 runtime=runtime,1080 )1081 result = lambda_client.invoke(1082 FunctionName=function_name,1083 Payload=b"{}",1084 )1085 result = json.loads(to_str(result["Payload"].read()))1086 assert result["version"] == runtime1087 @pytest.mark.skipif(1088 not use_docker(), reason="Test for docker python runtimes not applicable if run locally"1089 )1090 @parametrize_python_runtimes1091 def test_python_runtime_unhandled_errors(1092 self, lambda_client, create_lambda_function, runtime, snapshot1093 ):1094 """Test unhandled errors during python lambda invocation"""1095 function_name = f"test_python_executor_{short_uid()}"1096 creation_response = create_lambda_function(1097 func_name=function_name,1098 handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR,1099 runtime=runtime,1100 )1101 snapshot.match("creation_response", creation_response)1102 result = lambda_client.invoke(1103 FunctionName=function_name,1104 Payload=b"{}",1105 )1106 result = read_streams(result)1107 snapshot.match("invocation_response", result)1108 assert result["StatusCode"] == 2001109 assert result["ExecutedVersion"] == "$LATEST"1110 assert result["FunctionError"] == "Unhandled"1111 payload = json.loads(result["Payload"])1112 assert payload["errorType"] == "CustomException"1113 assert payload["errorMessage"] == "some error occurred"1114 assert "stackTrace" in payload1115 if (1116 runtime == "python3.9" and not is_old_provider()1117 ): # TODO: remove this after the legacy provider is gone1118 assert "requestId" in payload1119 else:1120 assert "requestId" not in payload1121parametrize_node_runtimes = pytest.mark.parametrize(1122 "runtime",1123 NODE_TEST_RUNTIMES,1124)1125class TestNodeJSRuntimes:1126 @pytest.mark.skipif(1127 not use_docker(), reason="Test for docker nodejs runtimes not applicable if run locally"1128 )1129 @parametrize_node_runtimes1130 def test_nodejs_lambda_with_context(1131 self, lambda_client, create_lambda_function, runtime, check_lambda_logs, snapshot1132 ):1133 """Test context of nodejs lambda invocation"""1134 function_name = f"test-function-{short_uid()}"1135 creation_response = create_lambda_function(1136 func_name=function_name,1137 handler_file=TEST_LAMBDA_INTEGRATION_NODEJS,1138 handler="lambda_integration.handler",1139 runtime=runtime,1140 )1141 snapshot.match("creation", creation_response)1142 ctx = {1143 "custom": {"foo": "bar"},1144 "client": {"snap": ["crackle", "pop"]},1145 "env": {"fizz": "buzz"},1146 }1147 configure_snapshot_for_context(snapshot, function_name)1148 result = lambda_client.invoke(1149 FunctionName=function_name,1150 Payload=b"{}",1151 ClientContext=to_str(base64.b64encode(to_bytes(json.dumps(ctx)))),1152 )1153 result = read_streams(result)1154 snapshot.match("invocation", result)1155 result_data = result["Payload"]1156 assert 200 == result["StatusCode"]1157 client_context = json.loads(result_data)["context"]["clientContext"]1158 # TODO in the old provider, for some reason this is necessary. That is invalid behavior1159 if is_old_provider():1160 client_context = json.loads(client_context)1161 assert "bar" == client_context.get("custom").get("foo")1162 # assert that logs are present1163 expected = [".*Node.js Lambda handler executing."]1164 def check_logs():1165 check_lambda_logs(function_name, expected_lines=expected)1166 retry(check_logs, retries=15)1167 @parametrize_node_runtimes1168 def test_invoke_nodejs_lambda(1169 self, lambda_client, create_lambda_function, runtime, logs_client, snapshot1170 ):1171 """Test simple nodejs lambda invocation"""1172 function_name = f"test-function-{short_uid()}"1173 result = create_lambda_function(...

Full Screen

Full Screen

test_lambda_integration.py

Source:test_lambda_integration.py Github

copy

Full Screen

...83 partition_key="id",84 stream_view_type="NEW_IMAGE",85 )["TableDescription"]86 # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn87 if not is_old_provider():88 with pytest.raises(ClientError) as e:89 lambda_client.create_event_source_mapping(90 EventSourceArn=table_description["TableArn"],91 FunctionName=function_name,92 StartingPosition="LATEST",93 )94 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)95 # check if event source mapping can be created with latest stream ARN96 rs = lambda_client.create_event_source_mapping(97 EventSourceArn=table_description["LatestStreamArn"],98 FunctionName=function_name,99 StartingPosition="LATEST",100 )101 assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]102 def test_disabled_event_source_mapping_with_dynamodb(103 self,104 create_lambda_function,105 lambda_client,106 dynamodb_resource,107 dynamodb_client,108 dynamodb_create_table,109 logs_client,110 dynamodbstreams_client,111 lambda_su_role,112 ):113 function_name = f"lambda_func-{short_uid()}"114 ddb_table = f"ddb_table-{short_uid()}"115 create_lambda_function(116 func_name=function_name,117 handler_file=TEST_LAMBDA_PYTHON_ECHO,118 runtime=LAMBDA_RUNTIME_PYTHON36,119 role=lambda_su_role,120 )121 latest_stream_arn = dynamodb_create_table(122 table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"123 )["TableDescription"]["LatestStreamArn"]124 rs = lambda_client.create_event_source_mapping(125 FunctionName=function_name,126 EventSourceArn=latest_stream_arn,127 StartingPosition="TRIM_HORIZON",128 MaximumBatchingWindowInSeconds=1,129 )130 uuid = rs["UUID"]131 def wait_for_table_created():132 return (133 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134 == "ACTIVE"135 )136 assert poll_condition(wait_for_table_created, timeout=30)137 def wait_for_stream_created():138 return (139 dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140 "StreamDescription"141 ]["StreamStatus"]142 == "ENABLED"143 )144 assert poll_condition(wait_for_stream_created, timeout=30)145 table = dynamodb_resource.Table(ddb_table)146 items = [147 {"id": short_uid(), "data": "data1"},148 {"id": short_uid(), "data": "data2"},149 ]150 table.put_item(Item=items[0])151 def assert_events():152 events = get_lambda_log_events(function_name, logs_client=logs_client)153 # lambda was invoked 1 time154 assert 1 == len(events[0]["Records"])155 # might take some time against AWS156 retry(assert_events, sleep=3, retries=10)157 # disable event source mapping158 lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159 table.put_item(Item=items[1])160 events = get_lambda_log_events(function_name, logs_client=logs_client)161 # lambda no longer invoked, still have 1 event162 assert 1 == len(events[0]["Records"])163 # TODO invalid test against AWS, this behavior just is not correct164 def test_deletion_event_source_mapping_with_dynamodb(165 self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166 ):167 function_name = f"lambda_func-{short_uid()}"168 ddb_table = f"ddb_table-{short_uid()}"169 create_lambda_function(170 func_name=function_name,171 handler_file=TEST_LAMBDA_PYTHON_ECHO,172 runtime=LAMBDA_RUNTIME_PYTHON36,173 role=lambda_su_role,174 )175 latest_stream_arn = aws_stack.create_dynamodb_table(176 table_name=ddb_table,177 partition_key="id",178 client=dynamodb_client,179 stream_view_type="NEW_IMAGE",180 )["TableDescription"]["LatestStreamArn"]181 lambda_client.create_event_source_mapping(182 FunctionName=function_name,183 EventSourceArn=latest_stream_arn,184 StartingPosition="TRIM_HORIZON",185 )186 def wait_for_table_created():187 return (188 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189 == "ACTIVE"190 )191 assert poll_condition(wait_for_table_created, timeout=30)192 dynamodb_client.delete_table(TableName=ddb_table)193 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194 assert 1 == len(result["EventSourceMappings"])195 def test_event_source_mapping_with_sqs(196 self,197 create_lambda_function,198 lambda_client,199 sqs_client,200 sqs_create_queue,201 sqs_queue_arn,202 logs_client,203 lambda_su_role,204 ):205 function_name = f"lambda_func-{short_uid()}"206 queue_name_1 = f"queue-{short_uid()}-1"207 create_lambda_function(208 func_name=function_name,209 handler_file=TEST_LAMBDA_PYTHON_ECHO,210 runtime=LAMBDA_RUNTIME_PYTHON36,211 role=lambda_su_role,212 )213 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214 queue_arn_1 = sqs_queue_arn(queue_url_1)215 lambda_client.create_event_source_mapping(216 EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217 )218 sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219 def assert_lambda_log_events():220 events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221 # lambda was invoked 1 time222 assert 1 == len(events[0]["Records"])223 retry(assert_lambda_log_events, sleep_before=3, retries=30)224 rs = sqs_client.receive_message(QueueUrl=queue_url_1)225 assert rs.get("Messages") is None226 def test_create_kinesis_event_source_mapping(227 self,228 create_lambda_function,229 lambda_client,230 kinesis_client,231 kinesis_create_stream,232 lambda_su_role,233 wait_for_stream_ready,234 logs_client,235 ):236 function_name = f"lambda_func-{short_uid()}"237 stream_name = f"test-foobar-{short_uid()}"238 create_lambda_function(239 func_name=function_name,240 handler_file=TEST_LAMBDA_PYTHON_ECHO,241 runtime=LAMBDA_RUNTIME_PYTHON36,242 role=lambda_su_role,243 )244 kinesis_create_stream(StreamName=stream_name, ShardCount=1)245 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246 "StreamARN"247 ]248 # only valid against AWS / new provider (once implemented)249 if not is_old_provider():250 with pytest.raises(ClientError) as e:251 lambda_client.create_event_source_mapping(252 EventSourceArn=stream_arn, FunctionName=function_name253 )254 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255 wait_for_stream_ready(stream_name=stream_name)256 lambda_client.create_event_source_mapping(257 EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258 )259 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261 num_events_kinesis = 10262 kinesis_client.put_records(263 Records=[...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful