Best Python code snippet using localstack_python
test_lambda.py
Source:test_lambda.py  
...138    ]139    if use_docker()140    else [LAMBDA_RUNTIME_JAVA11]141)142def is_old_provider():143    return (144        os.environ.get("TEST_TARGET") != "AWS_CLOUD"145        and os.environ.get("PROVIDER_OVERRIDE_LAMBDA") != "asf"146    )147PROVIDED_TEST_RUNTIMES = [148    LAMBDA_RUNTIME_PROVIDED,149    # TODO remove skip once we use correct images150    pytest.param(151        LAMBDA_RUNTIME_PROVIDED_AL2,152        marks=pytest.mark.skipif(153            is_old_provider(), reason="curl missing in provided.al2 lambci image"154        ),155    ),156]157# Snapshot patterns158IGNORE_LOGSTREAM_ID: Pattern[str] = re.compile(159    r"\d{4}/\d{2}/\d{2}/\[((\$LATEST)|\d+)\][0-9a-f]{32}"160)161T = TypeVar("T")162def read_streams(payload: T) -> T:163    new_payload = {}164    for k, v in payload.items():165        if isinstance(v, Dict):166            new_payload[k] = read_streams(v)167        elif isinstance(v, StreamingBody):168            new_payload[k] = to_str(v.read())169        else:170            new_payload[k] = v171    return new_payload172@pytest.fixture173def check_lambda_logs(logs_client):174    def _check_logs(func_name: str, expected_lines: List[str] = None):175        if not expected_lines:176            expected_lines = []177        log_events = get_lambda_logs(func_name, logs_client=logs_client)178        log_messages = [e["message"] for e in log_events]179        for line in expected_lines:180            if ".*" in line:181                found = [re.match(line, m, flags=re.DOTALL) for m in log_messages]182                if any(found):183                    continue184            assert line in log_messages185    return _check_logs186def configure_snapshot_for_context(snapshot, function_name: str):187    """188    Utility function to configure snapshot to ignore a function name and log stream ids in its body.189    Helpful if invoke calls return the context object, in which they are present190    :param snapshot: Snapshot fixture result191    :param function_name: Function name of the current function192    """193    snapshot.register_replacement(IGNORE_LOGSTREAM_ID, "<log_stream_id>")194    snapshot.register_replacement(re.compile(function_name), "<function_name>")195# API only functions (no lambda execution itself)196class TestLambdaAPI:197    @pytest.mark.only_localstack198    def test_create_lambda_function(self, lambda_client):199        """Basic test that creates and deletes a Lambda function"""200        func_name = f"lambda_func-{short_uid()}"201        kms_key_arn = f"arn:{aws_stack.get_partition()}:kms:{aws_stack.get_region()}:{TEST_AWS_ACCOUNT_ID}:key11"202        vpc_config = {203            "SubnetIds": ["subnet-123456789"],204            "SecurityGroupIds": ["sg-123456789"],205        }206        tags = {"env": "testing"}207        kwargs = {208            "FunctionName": func_name,209            "Runtime": LAMBDA_RUNTIME_PYTHON37,210            "Handler": LAMBDA_DEFAULT_HANDLER,211            "Role": LAMBDA_TEST_ROLE,212            "KMSKeyArn": kms_key_arn,213            "Code": {214                "ZipFile": create_lambda_archive(215                    load_file(TEST_LAMBDA_PYTHON_ECHO), get_content=True216                )217            },218            "Timeout": 3,219            "VpcConfig": vpc_config,220            "Tags": tags,221            "Environment": {"Variables": {"foo": "bar"}},222        }223        result = lambda_client.create_function(**kwargs)224        function_arn = result["FunctionArn"]225        assert testutil.response_arn_matches_partition(lambda_client, function_arn)226        partial_function_arn = ":".join(function_arn.split(":")[3:])227        # Get function by Name, ARN and partial ARN228        for func_ref in [func_name, function_arn, partial_function_arn]:229            rs = lambda_client.get_function(FunctionName=func_ref)230            assert rs["Configuration"].get("KMSKeyArn", "") == kms_key_arn231            assert rs["Configuration"].get("VpcConfig", {}) == vpc_config232            assert rs["Tags"] == tags233        # clean up234        lambda_client.delete_function(FunctionName=func_name)235        with pytest.raises(Exception) as exc:236            lambda_client.delete_function(FunctionName=func_name)237        assert "ResourceNotFoundException" in str(exc)238    @pytest.mark.snapshot239    def test_add_lambda_permission_aws(240        self, lambda_client, iam_client, create_lambda_function, snapshot241    ):242        """Testing the add_permission call on lambda, by adding a new resource-based policy to a lambda function"""243        function_name = f"lambda_func-{short_uid()}"244        lambda_create_response = create_lambda_function(245            handler_file=TEST_LAMBDA_PYTHON_ECHO,246            func_name=function_name,247            runtime=LAMBDA_RUNTIME_PYTHON36,248        )249        lambda_arn = lambda_create_response["CreateFunctionResponse"]["FunctionArn"]250        snapshot.match("create_lambda", lambda_create_response)251        # create lambda permission252        action = "lambda:InvokeFunction"253        sid = "s3"254        principal = "s3.amazonaws.com"255        resp = lambda_client.add_permission(256            FunctionName=function_name,257            Action=action,258            StatementId=sid,259            Principal=principal,260            SourceArn=aws_stack.s3_bucket_arn("test-bucket"),261        )262        snapshot.match("add_permission", resp)263        # fetch lambda policy264        get_policy_result = lambda_client.get_policy(FunctionName=function_name)265        snapshot.match("get_policy", get_policy_result)266        assert lambda_arn == json.loads(get_policy_result["Policy"])["Statement"][0]["Resource"]267    # TODO permissions cannot be added to $LATEST268    @pytest.mark.skipif(269        not is_old_provider(), reason="test does not make valid assertions against AWS"270    )271    def test_add_lambda_permission(self, lambda_client, iam_client, create_lambda_function):272        function_name = f"lambda_func-{short_uid()}"273        lambda_create_response = create_lambda_function(274            handler_file=TEST_LAMBDA_PYTHON_ECHO,275            func_name=function_name,276            runtime=LAMBDA_RUNTIME_PYTHON36,277        )278        lambda_arn = lambda_create_response["CreateFunctionResponse"]["FunctionArn"]279        # create lambda permission280        action = "lambda:InvokeFunction"281        sid = "s3"282        principal = "s3.amazonaws.com"283        resp = lambda_client.add_permission(284            FunctionName=function_name,285            Action=action,286            StatementId=sid,287            Principal=principal,288            SourceArn=aws_stack.s3_bucket_arn("test-bucket"),289        )290        # fetch lambda policy291        policy = lambda_client.get_policy(FunctionName=function_name)["Policy"]292        assert isinstance(policy, str)293        policy = json.loads(to_str(policy))294        assert action == policy["Statement"][0]["Action"]295        assert sid == policy["Statement"][0]["Sid"]296        assert lambda_arn == policy["Statement"][0]["Resource"]297        assert principal == policy["Statement"][0]["Principal"]["Service"]298        assert (299            aws_stack.s3_bucket_arn("test-bucket")300            == policy["Statement"][0]["Condition"]["ArnLike"]["AWS:SourceArn"]301        )302        # fetch IAM policy303        # this is not a valid assertion in general (especially against AWS)304        policies = iam_client.list_policies(Scope="Local", MaxItems=500)["Policies"]305        policy_name = get_lambda_policy_name(function_name)306        matching = [p for p in policies if p["PolicyName"] == policy_name]307        assert len(matching) == 1308        assert ":policy/" in matching[0]["Arn"]309        # remove permission that we just added310        resp = lambda_client.remove_permission(311            FunctionName=function_name,312            StatementId=sid,313            Qualifier="qual1",314            RevisionId="r1",315        )316        assert 200 == resp["ResponseMetadata"]["HTTPStatusCode"]317    def test_remove_multi_permissions(self, lambda_client, create_lambda_function, snapshot):318        """Tests creation and subsequent removal of multiple permissions, including the changes in the policy"""319        function_name = f"lambda_func-{short_uid()}"320        create_lambda_function(321            handler_file=TEST_LAMBDA_PYTHON_ECHO,322            func_name=function_name,323            runtime=LAMBDA_RUNTIME_PYTHON36,324        )325        action = "lambda:InvokeFunction"326        sid = "s3"327        principal = "s3.amazonaws.com"328        permission_1_add = lambda_client.add_permission(329            FunctionName=function_name,330            Action=action,331            StatementId=sid,332            Principal=principal,333        )334        snapshot.match("add_permission_1", permission_1_add)335        sid_2 = "sqs"336        principal_2 = "sqs.amazonaws.com"337        permission_2_add = lambda_client.add_permission(338            FunctionName=function_name,339            Action=action,340            StatementId=sid_2,341            Principal=principal_2,342            SourceArn=aws_stack.s3_bucket_arn("test-bucket"),343        )344        snapshot.match("add_permission_2", permission_2_add)345        policy_response = lambda_client.get_policy(346            FunctionName=function_name,347        )348        snapshot.match("policy_after_2_add", policy_response)349        with pytest.raises(ClientError) as e:350            lambda_client.remove_permission(351                FunctionName=function_name,352                StatementId="non-existent",353            )354        assert e.value.response["Error"]["Code"] == "ResourceNotFoundException"355        lambda_client.remove_permission(356            FunctionName=function_name,357            StatementId=sid_2,358        )359        policy = json.loads(360            lambda_client.get_policy(361                FunctionName=function_name,362            )["Policy"]363        )364        snapshot.match("policy_after_removal", policy)365        assert policy["Statement"][0]["Sid"] == sid366        lambda_client.remove_permission(367            FunctionName=function_name,368            StatementId=sid,369        )370        with pytest.raises(ClientError) as ctx:371            lambda_client.get_policy(FunctionName=function_name)372        assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"373    @pytest.mark.skipif(374        not is_old_provider(), reason="test does not make valid assertions against AWS"375    )376    def test_add_lambda_multiple_permission(377        self, iam_client, lambda_client, create_lambda_function378    ):379        """Test adding multiple permissions"""380        function_name = f"lambda_func-{short_uid()}"381        create_lambda_function(382            handler_file=TEST_LAMBDA_PYTHON_ECHO,383            func_name=function_name,384            runtime=LAMBDA_RUNTIME_PYTHON36,385        )386        # create lambda permissions387        action = "lambda:InvokeFunction"388        principal = "s3.amazonaws.com"389        statement_ids = ["s4", "s5"]390        for sid in statement_ids:391            resp = lambda_client.add_permission(392                FunctionName=function_name,393                Action=action,394                StatementId=sid,395                Principal=principal,396                SourceArn=aws_stack.s3_bucket_arn("test-bucket"),397            )398            assert "Statement" in resp399        # fetch IAM policy400        # this is not a valid assertion in general (especially against AWS)401        policies = iam_client.list_policies(Scope="Local", MaxItems=500)["Policies"]402        policy_name = get_lambda_policy_name(function_name)403        matching = [p for p in policies if p["PolicyName"] == policy_name]404        assert 1 == len(matching)405        assert ":policy/" in matching[0]["Arn"]406        # validate both statements407        policy = matching[0]408        versions = iam_client.list_policy_versions(PolicyArn=policy["Arn"])["Versions"]409        assert 1 == len(versions)410        statements = versions[0]["Document"]["Statement"]411        for i in range(len(statement_ids)):412            assert action == statements[i]["Action"]413            assert lambda_api.func_arn(function_name) == statements[i]["Resource"]414            assert principal == statements[i]["Principal"]["Service"]415            assert (416                aws_stack.s3_bucket_arn("test-bucket")417                == statements[i]["Condition"]["ArnLike"]["AWS:SourceArn"]418            )419            # check statement_ids in reverse order420            assert statement_ids[abs(i - 1)] == statements[i]["Sid"]421        # remove permission that we just added422        resp = lambda_client.remove_permission(423            FunctionName=function_name,424            StatementId=sid,425            Qualifier="qual1",426            RevisionId="r1",427        )428        assert 200 == resp["ResponseMetadata"]["HTTPStatusCode"]429    @pytest.mark.snapshot430    def test_lambda_asynchronous_invocations(431        self,432        lambda_client,433        create_lambda_function,434        sqs_queue,435        sqs_queue_arn,436        lambda_su_role,437        snapshot,438    ):439        """Testing API actions of function event config"""440        function_name = f"lambda_func-{short_uid()}"441        create_lambda_function(442            handler_file=TEST_LAMBDA_PYTHON_ECHO,443            func_name=function_name,444            runtime=LAMBDA_RUNTIME_PYTHON36,445            role=lambda_su_role,446        )447        queue_arn = sqs_queue_arn(sqs_queue)448        destination_config = {449            "OnSuccess": {"Destination": queue_arn},450            "OnFailure": {"Destination": queue_arn},451        }452        # adding event invoke config453        response = lambda_client.put_function_event_invoke_config(454            FunctionName=function_name,455            MaximumRetryAttempts=2,456            MaximumEventAgeInSeconds=123,457            DestinationConfig=destination_config,458        )459        snapshot.match("put_function_event_invoke_config", response)460        # over writing event invoke config461        response = lambda_client.put_function_event_invoke_config(462            FunctionName=function_name,463            MaximumRetryAttempts=2,464            DestinationConfig=destination_config,465        )466        snapshot.match("put_function_event_invoke_config_overwritemaxeventage", response)467        # updating event invoke config468        response = lambda_client.update_function_event_invoke_config(469            FunctionName=function_name,470            MaximumRetryAttempts=1,471        )472        snapshot.match("put_function_event_invoke_config_maxattempt1", response)473        # clean up474        lambda_client.delete_function_event_invoke_config(FunctionName=function_name)475    def test_function_concurrency(self, lambda_client, create_lambda_function, snapshot):476        """Testing the api of the put function concurrency action"""477        function_name = f"lambda_func-{short_uid()}"478        create_lambda_function(479            handler_file=TEST_LAMBDA_PYTHON_ECHO,480            func_name=function_name,481            runtime=LAMBDA_RUNTIME_PYTHON36,482        )483        response = lambda_client.put_function_concurrency(484            FunctionName=function_name, ReservedConcurrentExecutions=123485        )486        snapshot.match("put_function_concurrency", response)487        assert "ReservedConcurrentExecutions" in response488        response = lambda_client.get_function_concurrency(FunctionName=function_name)489        snapshot.match("get_function_concurrency", response)490        assert "ReservedConcurrentExecutions" in response491        lambda_client.delete_function_concurrency(FunctionName=function_name)492    def test_function_code_signing_config(self, lambda_client, create_lambda_function, snapshot):493        """Testing the API of code signing config"""494        function_name = f"lambda_func-{short_uid()}"495        create_lambda_function(496            handler_file=TEST_LAMBDA_PYTHON_ECHO,497            func_name=function_name,498            runtime=LAMBDA_RUNTIME_PYTHON36,499        )500        response = lambda_client.create_code_signing_config(501            Description="Testing CodeSigning Config",502            AllowedPublishers={503                "SigningProfileVersionArns": [504                    f"arn:aws:signer:{aws_stack.get_region()}:000000000000:/signing-profiles/test",505                ]506            },507            CodeSigningPolicies={"UntrustedArtifactOnDeployment": "Enforce"},508        )509        snapshot.replace_value(re.compile(r"^csc-[0-9a-f]{17}$"), "<csc-id>")510        snapshot.match("create_code_signing_config", response)511        assert "Description" in response["CodeSigningConfig"]512        assert "SigningProfileVersionArns" in response["CodeSigningConfig"]["AllowedPublishers"]513        assert (514            "UntrustedArtifactOnDeployment" in response["CodeSigningConfig"]["CodeSigningPolicies"]515        )516        code_signing_arn = response["CodeSigningConfig"]["CodeSigningConfigArn"]517        response = lambda_client.update_code_signing_config(518            CodeSigningConfigArn=code_signing_arn,519            CodeSigningPolicies={"UntrustedArtifactOnDeployment": "Warn"},520        )521        snapshot.match("update_code_signing_config", response)522        assert (523            "Warn"524            == response["CodeSigningConfig"]["CodeSigningPolicies"]["UntrustedArtifactOnDeployment"]525        )526        response = lambda_client.get_code_signing_config(CodeSigningConfigArn=code_signing_arn)527        assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]528        snapshot.match("get_code_signing_config", response)529        response = lambda_client.put_function_code_signing_config(530            CodeSigningConfigArn=code_signing_arn, FunctionName=function_name531        )532        assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]533        snapshot.match("put_function_code_signing_config", response)534        response = lambda_client.get_function_code_signing_config(FunctionName=function_name)535        assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]536        snapshot.match("get_function_code_signing_config", response)537        assert code_signing_arn == response["CodeSigningConfigArn"]538        assert function_name == response["FunctionName"]539        response = lambda_client.delete_function_code_signing_config(FunctionName=function_name)540        assert 204 == response["ResponseMetadata"]["HTTPStatusCode"]541        response = lambda_client.delete_code_signing_config(CodeSigningConfigArn=code_signing_arn)542        assert 204 == response["ResponseMetadata"]["HTTPStatusCode"]543    def create_multiple_lambda_permissions(self, lambda_client, create_lambda_function, snapshot):544        """Test creating multiple lambda permissions and checking the policy"""545        function_name = f"test-function-{short_uid()}"546        create_lambda_function(547            funct_name=function_name,548            runtime=LAMBDA_RUNTIME_PYTHON37,549            libs=TEST_LAMBDA_LIBS,550        )551        action = "lambda:InvokeFunction"552        sid = "logs"553        resp = lambda_client.add_permission(554            FunctionName=function_name,555            Action=action,556            StatementId=sid,557            Principal="logs.amazonaws.com",558        )559        snapshot.match("add_permission_response_1", resp)560        assert "Statement" in resp561        sid = "kinesis"562        resp = lambda_client.add_permission(563            FunctionName=function_name,564            Action=action,565            StatementId=sid,566            Principal="kinesis.amazonaws.com",567        )568        snapshot.match("add_permission_response_2", resp)569        assert "Statement" in resp570        policy_response = lambda_client.get_policy(571            FunctionName=function_name,572        )573        snapshot.match("policy_after_2_add", policy_response)574class TestLambdaBaseFeatures:575    def test_dead_letter_queue(576        self,577        lambda_client,578        create_lambda_function,579        sqs_client,580        sqs_create_queue,581        sqs_queue_arn,582        lambda_su_role,583        snapshot,584    ):585        """Creates a lambda with a defined dead letter queue, and check failed lambda invocation leads to a message"""586        # create DLQ and Lambda function587        queue_name = f"test-{short_uid()}"588        lambda_name = f"test-{short_uid()}"589        queue_url = sqs_create_queue(QueueName=queue_name)590        queue_arn = sqs_queue_arn(queue_url)591        create_lambda_response = create_lambda_function(592            handler_file=TEST_LAMBDA_PYTHON,593            func_name=lambda_name,594            libs=TEST_LAMBDA_LIBS,595            runtime=LAMBDA_RUNTIME_PYTHON36,596            DeadLetterConfig={"TargetArn": queue_arn},597            role=lambda_su_role,598        )599        snapshot.match("create_lambda_with_dlq", create_lambda_response)600        snapshot.skip_key(re.compile("ReceiptHandle"), "<receipt-handle>")601        snapshot.skip_key(re.compile("MD5Of.*"), "<md5-hash>")602        # invoke Lambda, triggering an error603        payload = {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}604        lambda_client.invoke(605            FunctionName=lambda_name,606            Payload=json.dumps(payload),607            InvocationType="Event",608        )609        # assert that message has been received on the DLQ610        def receive_dlq():611            result = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])612            assert len(result["Messages"]) > 0613            msg_attrs = result["Messages"][0]["MessageAttributes"]614            assert "RequestID" in msg_attrs615            assert "ErrorCode" in msg_attrs616            assert "ErrorMessage" in msg_attrs617            snapshot.match("sqs_dlq_message", result)618        # on AWS, event retries can be quite delayed, so we have to wait up to 6 minutes here, potential flakes619        retry(receive_dlq, retries=120, sleep=3)620        # update DLQ config621        update_function_config_response = lambda_client.update_function_configuration(622            FunctionName=lambda_name, DeadLetterConfig={}623        )624        snapshot.match("delete_dlq", update_function_config_response)625        # invoke Lambda again, assert that status code is 200 and error details contained in the payload626        result = lambda_client.invoke(627            FunctionName=lambda_name, Payload=json.dumps(payload), LogType="Tail"628        )629        result = read_streams(result)630        payload = json.loads(to_str(result["Payload"]))631        snapshot.match("result_payload", payload)632        assert 200 == result["StatusCode"]633        assert "Unhandled" == result["FunctionError"]634        assert "$LATEST" == result["ExecutedVersion"]635        assert "Test exception" in payload["errorMessage"]636        assert "Exception" in payload["errorType"]637        assert isinstance(payload["stackTrace"], list)638        log_result = result.get("LogResult")639        assert log_result640        logs = to_str(base64.b64decode(to_str(log_result)))641        assert "START" in logs642        assert "Test exception" in logs643        assert "END" in logs644        assert "REPORT" in logs645    @pytest.mark.parametrize(646        "condition,payload",647        [648            ("Success", {}),649            ("RetriesExhausted", {lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1}),650        ],651    )652    def test_assess_lambda_destination_invocation(653        self,654        condition,655        payload,656        lambda_client,657        sqs_client,658        create_lambda_function,659        sqs_create_queue,660        sqs_queue_arn,661        lambda_su_role,662        snapshot,663    ):664        """Testing the destination config API and operation (for the OnSuccess case)"""665        # create DLQ and Lambda function666        queue_name = f"test-{short_uid()}"667        lambda_name = f"test-{short_uid()}"668        queue_url = sqs_create_queue(QueueName=queue_name)669        queue_arn = sqs_queue_arn(queue_url)670        create_lambda_function(671            handler_file=TEST_LAMBDA_PYTHON,672            func_name=lambda_name,673            libs=TEST_LAMBDA_LIBS,674            role=lambda_su_role,675        )676        put_event_invoke_config_response = lambda_client.put_function_event_invoke_config(677            FunctionName=lambda_name,678            DestinationConfig={679                "OnSuccess": {"Destination": queue_arn},680                "OnFailure": {"Destination": queue_arn},681            },682        )683        snapshot.match("put_function_event_invoke_config", put_event_invoke_config_response)684        snapshot.skip_key(re.compile("ReceiptHandle"), "<receipt-handle>")685        snapshot.skip_key(re.compile("MD5Of.*"), "<md5-hash>")686        lambda_client.invoke(687            FunctionName=lambda_name,688            Payload=json.dumps(payload),689            InvocationType="Event",690        )691        configure_snapshot_for_context(snapshot, lambda_name)692        def receive_message():693            rs = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])694            assert len(rs["Messages"]) > 0695            msg = rs["Messages"][0]["Body"]696            msg = json.loads(msg)697            assert condition == msg["requestContext"]["condition"]698            snapshot.match("destination_message", rs)699        retry(receive_message, retries=120, sleep=3)700    def test_large_payloads(self, caplog, lambda_client, create_lambda_function, snapshot):701        """Testing large payloads sent to lambda functions (~5MB)"""702        # Set the loglevel to INFO for this test to avoid breaking a CI environment (due to excessive log outputs)703        caplog.set_level(logging.INFO)704        function_name = f"large_payload-{short_uid()}"705        create_lambda_function(706            handler_file=TEST_LAMBDA_PYTHON_ECHO,707            func_name=function_name,708            runtime=LAMBDA_RUNTIME_PYTHON36,709        )710        payload = {"test": "test123456" * 100 * 1000 * 5}  # 5MB payload711        payload_bytes = to_bytes(json.dumps(payload))712        result = lambda_client.invoke(FunctionName=function_name, Payload=payload_bytes)713        result = read_streams(result)714        snapshot.match("invocation_response", result)715        assert 200 == result["ResponseMetadata"]["HTTPStatusCode"]716        result_data = result["Payload"]717        result_data = json.loads(to_str(result_data))718        assert payload == result_data719parametrize_python_runtimes = pytest.mark.parametrize(720    "runtime",721    PYTHON_TEST_RUNTIMES,722)723class TestPythonRuntimes:724    @pytest.fixture(725        params=PYTHON_TEST_RUNTIMES,726    )727    def python_function_name(self, request, lambda_client, create_lambda_function):728        function_name = f"python-test-function-{short_uid()}"729        create_lambda_function(730            func_name=function_name,731            handler_file=TEST_LAMBDA_PYTHON,732            libs=TEST_LAMBDA_LIBS,733            runtime=request.param,734        )735        return function_name736    def test_invocation_type_not_set(self, lambda_client, python_function_name, snapshot):737        """Test invocation of a lambda with no invocation type set, but LogType="Tail""" ""738        result = lambda_client.invoke(739            FunctionName=python_function_name, Payload=b"{}", LogType="Tail"740        )741        result = read_streams(result)742        snapshot.skip_key(re.compile("LogResult"), "<log_result>")743        configure_snapshot_for_context(snapshot, python_function_name)744        snapshot.match("invoke", result)745        result_data = json.loads(result["Payload"])746        # assert response details747        assert 200 == result["StatusCode"]748        assert {} == result_data["event"]749        # assert that logs are contained in response750        logs = result.get("LogResult", "")751        logs = to_str(base64.b64decode(to_str(logs)))752        snapshot.register_replacement(753            re.compile(r"Duration: \d+(\.\d{2})? ms"), "Duration: <duration> ms"754        )755        snapshot.register_replacement(re.compile(r"Used: \d+ MB"), "Used: <memory> MB")756        snapshot.match("logs", {"logs": logs})757        assert "START" in logs758        assert "Lambda log message" in logs759        assert "END" in logs760        assert "REPORT" in logs761    def test_invocation_type_request_response(self, lambda_client, python_function_name, snapshot):762        """Test invocation with InvocationType RequestResponse explicitely set"""763        result = lambda_client.invoke(764            FunctionName=python_function_name,765            Payload=b"{}",766            InvocationType="RequestResponse",767        )768        result = read_streams(result)769        configure_snapshot_for_context(snapshot, python_function_name)770        snapshot.match("invoke-result", result)771        result_data = result["Payload"]772        result_data = json.loads(result_data)773        assert "application/json" == result["ResponseMetadata"]["HTTPHeaders"]["content-type"]774        assert 200 == result["StatusCode"]775        assert isinstance(result_data, dict)776    def test_invocation_type_event(self, lambda_client, python_function_name, snapshot):777        """Check invocation response for type event"""778        result = lambda_client.invoke(779            FunctionName=python_function_name, Payload=b"{}", InvocationType="Event"780        )781        result = read_streams(result)782        snapshot.match("invoke-result", result)783        assert 202 == result["StatusCode"]784    def test_invocation_type_dry_run(self, lambda_client, python_function_name, snapshot):785        """Check invocation response for type dryrun"""786        result = lambda_client.invoke(787            FunctionName=python_function_name, Payload=b"{}", InvocationType="DryRun"788        )789        result = read_streams(result)790        snapshot.match("invoke-result", result)791        assert 204 == result["StatusCode"]792    @parametrize_python_runtimes793    def test_lambda_environment(self, lambda_client, create_lambda_function, runtime, snapshot):794        """Tests invoking a lambda function with environment variables set on creation"""795        function_name = f"env-test-function-{short_uid()}"796        env_vars = {"Hello": "World"}797        creation_result = create_lambda_function(798            handler_file=TEST_LAMBDA_ENV,799            libs=TEST_LAMBDA_LIBS,800            func_name=function_name,801            envvars=env_vars,802            runtime=runtime,803        )804        snapshot.match("creation-result", creation_result)805        # invoke function and assert result contains env vars806        result = lambda_client.invoke(FunctionName=function_name, Payload=b"{}")807        result = read_streams(result)808        snapshot.match("invocation-result", result)809        result_data = result["Payload"]810        assert 200 == result["StatusCode"]811        assert json.loads(result_data) == env_vars812        # get function config and assert result contains env vars813        result = lambda_client.get_function_configuration(FunctionName=function_name)814        snapshot.match("get-configuration-result", result)815        assert result["Environment"] == {"Variables": env_vars}816    @parametrize_python_runtimes817    def test_invocation_with_qualifier(818        self,819        lambda_client,820        s3_client,821        s3_bucket,822        runtime,823        check_lambda_logs,824        lambda_su_role,825        wait_until_lambda_ready,826        snapshot,827    ):828        """Tests invocation of python lambda with a given qualifier"""829        function_name = f"test_lambda_{short_uid()}"830        bucket_key = "test_lambda.zip"831        # upload zip file to S3832        zip_file = create_lambda_archive(833            load_file(TEST_LAMBDA_PYTHON), get_content=True, libs=TEST_LAMBDA_LIBS, runtime=runtime834        )835        s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)836        # create lambda function837        response = lambda_client.create_function(838            FunctionName=function_name,839            Runtime=runtime,840            Role=lambda_su_role,841            Publish=True,842            Handler="handler.handler",843            Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},844            Timeout=10,845        )846        snapshot.match("creation-response", response)847        configure_snapshot_for_context(snapshot, function_name)848        assert "Version" in response849        qualifier = response["Version"]850        wait_until_lambda_ready(function_name=function_name, qualifier=qualifier)851        # invoke lambda function852        data_before = b'{"foo": "bar with \'quotes\\""}'853        result = lambda_client.invoke(854            FunctionName=function_name, Payload=data_before, Qualifier=qualifier855        )856        result = read_streams(result)857        snapshot.match("invocation-response", result)858        data_after = json.loads(result["Payload"])859        assert json.loads(to_str(data_before)) == data_after["event"]860        context = data_after["context"]861        assert response["Version"] == context["function_version"]862        assert context.get("aws_request_id")863        assert function_name == context["function_name"]864        assert f"/aws/lambda/{function_name}" == context["log_group_name"]865        assert context.get("log_stream_name")866        assert context.get("memory_limit_in_mb")867        # assert that logs are present868        expected = [".*Lambda log message - print function.*"]869        if use_docker():870            # Note that during regular test execution, nosetests captures the output from871            # the logging module - hence we can only expect this when running in Docker872            expected.append(".*Lambda log message - logging module.*")873        def check_logs():874            check_lambda_logs(function_name, expected_lines=expected)875        retry(check_logs, retries=10)876        lambda_client.delete_function(FunctionName=function_name)877    @parametrize_python_runtimes878    def test_upload_lambda_from_s3(879        self,880        lambda_client,881        s3_client,882        s3_bucket,883        runtime,884        lambda_su_role,885        wait_until_lambda_ready,886        snapshot,887    ):888        """Test invocation of a python lambda with its deployment package uploaded to s3"""889        function_name = f"test_lambda_{short_uid()}"890        bucket_key = "test_lambda.zip"891        # upload zip file to S3892        zip_file = testutil.create_lambda_archive(893            load_file(TEST_LAMBDA_PYTHON), get_content=True, libs=TEST_LAMBDA_LIBS, runtime=runtime894        )895        s3_client.upload_fileobj(BytesIO(zip_file), s3_bucket, bucket_key)896        # create lambda function897        create_response = lambda_client.create_function(898            FunctionName=function_name,899            Runtime=runtime,900            Handler="handler.handler",901            Role=lambda_su_role,902            Code={"S3Bucket": s3_bucket, "S3Key": bucket_key},903            Timeout=10,904        )905        snapshot.match("creation-response", create_response)906        configure_snapshot_for_context(snapshot, function_name)907        wait_until_lambda_ready(function_name=function_name)908        # invoke lambda function909        data_before = b'{"foo": "bar with \'quotes\\""}'910        result = lambda_client.invoke(FunctionName=function_name, Payload=data_before)911        result = read_streams(result)912        snapshot.match("invocation-response", result)913        data_after = json.loads(result["Payload"])914        assert json.loads(to_str(data_before)) == data_after["event"]915        context = data_after["context"]916        assert "$LATEST" == context["function_version"]917        assert function_name == context["function_name"]918        # clean up919        lambda_client.delete_function(FunctionName=function_name)920    @parametrize_python_runtimes921    def test_handler_in_submodule(self, lambda_client, create_lambda_function, runtime):922        """Test invocation of a lambda handler which resides in a submodule (= not root module)"""923        function_name = f"test-function-{short_uid()}"924        zip_file = testutil.create_lambda_archive(925            load_file(TEST_LAMBDA_PYTHON),926            get_content=True,927            libs=TEST_LAMBDA_LIBS,928            runtime=runtime,929            file_name="localstack_package/def/main.py",930        )931        create_lambda_function(932            func_name=function_name,933            zip_file=zip_file,934            handler="localstack_package.def.main.handler",935            runtime=runtime,936        )937        # invoke function and assert result938        result = lambda_client.invoke(FunctionName=function_name, Payload=b"{}")939        result_data = json.loads(result["Payload"].read())940        assert 200 == result["StatusCode"]941        assert json.loads("{}") == result_data["event"]942    @parametrize_python_runtimes943    def test_lambda_send_message_to_sqs(944        self,945        lambda_client,946        create_lambda_function,947        sqs_client,948        sqs_create_queue,949        runtime,950        lambda_su_role,951    ):952        """Send sqs message to sqs queue inside python lambda"""953        function_name = f"test-function-{short_uid()}"954        queue_name = f"lambda-queue-{short_uid()}"955        queue_url = sqs_create_queue(QueueName=queue_name)956        create_lambda_function(957            handler_file=TEST_LAMBDA_SEND_MESSAGE_FILE,958            func_name=function_name,959            runtime=runtime,960            role=lambda_su_role,961        )962        event = {963            "message": f"message-from-test-lambda-{short_uid()}",964            "queue_name": queue_name,965            "region_name": sqs_client.meta.region_name,966        }967        lambda_client.invoke(FunctionName=function_name, Payload=json.dumps(event))968        # assert that message has been received on the Queue969        def receive_message():970            rs = sqs_client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])971            assert len(rs["Messages"]) > 0972            return rs["Messages"][0]973        message = retry(receive_message, retries=15, sleep=2)974        assert event["message"] == message["Body"]975    @parametrize_python_runtimes976    def test_lambda_put_item_to_dynamodb(977        self,978        lambda_client,979        create_lambda_function,980        dynamodb_create_table,981        runtime,982        dynamodb_resource,983        lambda_su_role,984        dynamodb_client,985    ):986        """Put item into dynamodb from python lambda"""987        table_name = f"ddb-table-{short_uid()}"988        function_name = f"test-function-{short_uid()}"989        dynamodb_create_table(table_name=table_name, partition_key="id")990        create_lambda_function(991            handler_file=TEST_LAMBDA_PUT_ITEM_FILE,992            func_name=function_name,993            runtime=runtime,994            role=lambda_su_role,995        )996        data = {short_uid(): f"data-{i}" for i in range(3)}997        event = {998            "table_name": table_name,999            "region_name": dynamodb_client.meta.region_name,1000            "items": [{"id": k, "data": v} for k, v in data.items()],1001        }1002        def wait_for_table_created():1003            return (1004                dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"]1005                == "ACTIVE"1006            )1007        assert poll_condition(wait_for_table_created, timeout=30)1008        lambda_client.invoke(FunctionName=function_name, Payload=json.dumps(event))1009        rs = dynamodb_resource.Table(table_name).scan()1010        items = rs["Items"]1011        assert len(items) == len(data.keys())1012        for item in items:1013            assert data[item["id"]] == item["data"]1014    @parametrize_python_runtimes1015    def test_lambda_start_stepfunctions_execution(1016        self, lambda_client, stepfunctions_client, create_lambda_function, runtime, lambda_su_role1017    ):1018        """Start stepfunctions machine execution from lambda"""1019        function_name = f"test-function-{short_uid()}"1020        resource_lambda_name = f"test-resource-{short_uid()}"1021        state_machine_name = f"state-machine-{short_uid()}"1022        create_lambda_function(1023            handler_file=TEST_LAMBDA_START_EXECUTION_FILE,1024            func_name=function_name,1025            runtime=runtime,1026            role=lambda_su_role,1027        )1028        resource_lambda_arn = create_lambda_function(1029            handler_file=TEST_LAMBDA_PYTHON_ECHO,1030            func_name=resource_lambda_name,1031            runtime=runtime,1032            role=lambda_su_role,1033        )["CreateFunctionResponse"]["FunctionArn"]1034        state_machine_def = {1035            "StartAt": "step1",1036            "States": {1037                "step1": {1038                    "Type": "Task",1039                    "Resource": resource_lambda_arn,1040                    "ResultPath": "$.result_value",1041                    "End": True,1042                }1043            },1044        }1045        rs = stepfunctions_client.create_state_machine(1046            name=state_machine_name,1047            definition=json.dumps(state_machine_def),1048            roleArn=lambda_su_role,1049        )1050        sm_arn = rs["stateMachineArn"]1051        try:1052            lambda_client.invoke(1053                FunctionName=function_name,1054                Payload=json.dumps(1055                    {1056                        "state_machine_arn": sm_arn,1057                        "region_name": stepfunctions_client.meta.region_name,1058                        "input": {},1059                    }1060                ),1061            )1062            time.sleep(1)1063            rs = stepfunctions_client.list_executions(stateMachineArn=sm_arn)1064            # assert that state machine get executed 1 time1065            assert 1 == len([ex for ex in rs["executions"] if ex["stateMachineArn"] == sm_arn])1066        finally:1067            # clean up1068            stepfunctions_client.delete_state_machine(stateMachineArn=sm_arn)1069    @pytest.mark.skipif(1070        not use_docker(), reason="Test for docker python runtimes not applicable if run locally"1071    )1072    @parametrize_python_runtimes1073    def test_python_runtime_correct_versions(self, lambda_client, create_lambda_function, runtime):1074        """Test different versions of python runtimes to report back the correct python version"""1075        function_name = f"test_python_executor_{short_uid()}"1076        create_lambda_function(1077            func_name=function_name,1078            handler_file=TEST_LAMBDA_PYTHON_VERSION,1079            runtime=runtime,1080        )1081        result = lambda_client.invoke(1082            FunctionName=function_name,1083            Payload=b"{}",1084        )1085        result = json.loads(to_str(result["Payload"].read()))1086        assert result["version"] == runtime1087    @pytest.mark.skipif(1088        not use_docker(), reason="Test for docker python runtimes not applicable if run locally"1089    )1090    @parametrize_python_runtimes1091    def test_python_runtime_unhandled_errors(1092        self, lambda_client, create_lambda_function, runtime, snapshot1093    ):1094        """Test unhandled errors during python lambda invocation"""1095        function_name = f"test_python_executor_{short_uid()}"1096        creation_response = create_lambda_function(1097            func_name=function_name,1098            handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR,1099            runtime=runtime,1100        )1101        snapshot.match("creation_response", creation_response)1102        result = lambda_client.invoke(1103            FunctionName=function_name,1104            Payload=b"{}",1105        )1106        result = read_streams(result)1107        snapshot.match("invocation_response", result)1108        assert result["StatusCode"] == 2001109        assert result["ExecutedVersion"] == "$LATEST"1110        assert result["FunctionError"] == "Unhandled"1111        payload = json.loads(result["Payload"])1112        assert payload["errorType"] == "CustomException"1113        assert payload["errorMessage"] == "some error occurred"1114        assert "stackTrace" in payload1115        if (1116            runtime == "python3.9" and not is_old_provider()1117        ):  # TODO: remove this after the legacy provider is gone1118            assert "requestId" in payload1119        else:1120            assert "requestId" not in payload1121parametrize_node_runtimes = pytest.mark.parametrize(1122    "runtime",1123    NODE_TEST_RUNTIMES,1124)1125class TestNodeJSRuntimes:1126    @pytest.mark.skipif(1127        not use_docker(), reason="Test for docker nodejs runtimes not applicable if run locally"1128    )1129    @parametrize_node_runtimes1130    def test_nodejs_lambda_with_context(1131        self, lambda_client, create_lambda_function, runtime, check_lambda_logs, snapshot1132    ):1133        """Test context of nodejs lambda invocation"""1134        function_name = f"test-function-{short_uid()}"1135        creation_response = create_lambda_function(1136            func_name=function_name,1137            handler_file=TEST_LAMBDA_INTEGRATION_NODEJS,1138            handler="lambda_integration.handler",1139            runtime=runtime,1140        )1141        snapshot.match("creation", creation_response)1142        ctx = {1143            "custom": {"foo": "bar"},1144            "client": {"snap": ["crackle", "pop"]},1145            "env": {"fizz": "buzz"},1146        }1147        configure_snapshot_for_context(snapshot, function_name)1148        result = lambda_client.invoke(1149            FunctionName=function_name,1150            Payload=b"{}",1151            ClientContext=to_str(base64.b64encode(to_bytes(json.dumps(ctx)))),1152        )1153        result = read_streams(result)1154        snapshot.match("invocation", result)1155        result_data = result["Payload"]1156        assert 200 == result["StatusCode"]1157        client_context = json.loads(result_data)["context"]["clientContext"]1158        # TODO in the old provider, for some reason this is necessary. That is invalid behavior1159        if is_old_provider():1160            client_context = json.loads(client_context)1161        assert "bar" == client_context.get("custom").get("foo")1162        # assert that logs are present1163        expected = [".*Node.js Lambda handler executing."]1164        def check_logs():1165            check_lambda_logs(function_name, expected_lines=expected)1166        retry(check_logs, retries=15)1167    @parametrize_node_runtimes1168    def test_invoke_nodejs_lambda(1169        self, lambda_client, create_lambda_function, runtime, logs_client, snapshot1170    ):1171        """Test simple nodejs lambda invocation"""1172        function_name = f"test-function-{short_uid()}"1173        result = create_lambda_function(...test_lambda_integration.py
Source:test_lambda_integration.py  
...83            partition_key="id",84            stream_view_type="NEW_IMAGE",85        )["TableDescription"]86        # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn87        if not is_old_provider():88            with pytest.raises(ClientError) as e:89                lambda_client.create_event_source_mapping(90                    EventSourceArn=table_description["TableArn"],91                    FunctionName=function_name,92                    StartingPosition="LATEST",93                )94            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)95        # check if event source mapping can be created with latest stream ARN96        rs = lambda_client.create_event_source_mapping(97            EventSourceArn=table_description["LatestStreamArn"],98            FunctionName=function_name,99            StartingPosition="LATEST",100        )101        assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]102    def test_disabled_event_source_mapping_with_dynamodb(103        self,104        create_lambda_function,105        lambda_client,106        dynamodb_resource,107        dynamodb_client,108        dynamodb_create_table,109        logs_client,110        dynamodbstreams_client,111        lambda_su_role,112    ):113        function_name = f"lambda_func-{short_uid()}"114        ddb_table = f"ddb_table-{short_uid()}"115        create_lambda_function(116            func_name=function_name,117            handler_file=TEST_LAMBDA_PYTHON_ECHO,118            runtime=LAMBDA_RUNTIME_PYTHON36,119            role=lambda_su_role,120        )121        latest_stream_arn = dynamodb_create_table(122            table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"123        )["TableDescription"]["LatestStreamArn"]124        rs = lambda_client.create_event_source_mapping(125            FunctionName=function_name,126            EventSourceArn=latest_stream_arn,127            StartingPosition="TRIM_HORIZON",128            MaximumBatchingWindowInSeconds=1,129        )130        uuid = rs["UUID"]131        def wait_for_table_created():132            return (133                dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134                == "ACTIVE"135            )136        assert poll_condition(wait_for_table_created, timeout=30)137        def wait_for_stream_created():138            return (139                dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140                    "StreamDescription"141                ]["StreamStatus"]142                == "ENABLED"143            )144        assert poll_condition(wait_for_stream_created, timeout=30)145        table = dynamodb_resource.Table(ddb_table)146        items = [147            {"id": short_uid(), "data": "data1"},148            {"id": short_uid(), "data": "data2"},149        ]150        table.put_item(Item=items[0])151        def assert_events():152            events = get_lambda_log_events(function_name, logs_client=logs_client)153            # lambda was invoked 1 time154            assert 1 == len(events[0]["Records"])155        # might take some time against AWS156        retry(assert_events, sleep=3, retries=10)157        # disable event source mapping158        lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159        table.put_item(Item=items[1])160        events = get_lambda_log_events(function_name, logs_client=logs_client)161        # lambda no longer invoked, still have 1 event162        assert 1 == len(events[0]["Records"])163    # TODO invalid test against AWS, this behavior just is not correct164    def test_deletion_event_source_mapping_with_dynamodb(165        self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166    ):167        function_name = f"lambda_func-{short_uid()}"168        ddb_table = f"ddb_table-{short_uid()}"169        create_lambda_function(170            func_name=function_name,171            handler_file=TEST_LAMBDA_PYTHON_ECHO,172            runtime=LAMBDA_RUNTIME_PYTHON36,173            role=lambda_su_role,174        )175        latest_stream_arn = aws_stack.create_dynamodb_table(176            table_name=ddb_table,177            partition_key="id",178            client=dynamodb_client,179            stream_view_type="NEW_IMAGE",180        )["TableDescription"]["LatestStreamArn"]181        lambda_client.create_event_source_mapping(182            FunctionName=function_name,183            EventSourceArn=latest_stream_arn,184            StartingPosition="TRIM_HORIZON",185        )186        def wait_for_table_created():187            return (188                dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189                == "ACTIVE"190            )191        assert poll_condition(wait_for_table_created, timeout=30)192        dynamodb_client.delete_table(TableName=ddb_table)193        result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194        assert 1 == len(result["EventSourceMappings"])195    def test_event_source_mapping_with_sqs(196        self,197        create_lambda_function,198        lambda_client,199        sqs_client,200        sqs_create_queue,201        sqs_queue_arn,202        logs_client,203        lambda_su_role,204    ):205        function_name = f"lambda_func-{short_uid()}"206        queue_name_1 = f"queue-{short_uid()}-1"207        create_lambda_function(208            func_name=function_name,209            handler_file=TEST_LAMBDA_PYTHON_ECHO,210            runtime=LAMBDA_RUNTIME_PYTHON36,211            role=lambda_su_role,212        )213        queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214        queue_arn_1 = sqs_queue_arn(queue_url_1)215        lambda_client.create_event_source_mapping(216            EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217        )218        sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219        def assert_lambda_log_events():220            events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221            # lambda was invoked 1 time222            assert 1 == len(events[0]["Records"])223        retry(assert_lambda_log_events, sleep_before=3, retries=30)224        rs = sqs_client.receive_message(QueueUrl=queue_url_1)225        assert rs.get("Messages") is None226    def test_create_kinesis_event_source_mapping(227        self,228        create_lambda_function,229        lambda_client,230        kinesis_client,231        kinesis_create_stream,232        lambda_su_role,233        wait_for_stream_ready,234        logs_client,235    ):236        function_name = f"lambda_func-{short_uid()}"237        stream_name = f"test-foobar-{short_uid()}"238        create_lambda_function(239            func_name=function_name,240            handler_file=TEST_LAMBDA_PYTHON_ECHO,241            runtime=LAMBDA_RUNTIME_PYTHON36,242            role=lambda_su_role,243        )244        kinesis_create_stream(StreamName=stream_name, ShardCount=1)245        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246            "StreamARN"247        ]248        # only valid against AWS / new provider (once implemented)249        if not is_old_provider():250            with pytest.raises(ClientError) as e:251                lambda_client.create_event_source_mapping(252                    EventSourceArn=stream_arn, FunctionName=function_name253                )254            e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255        wait_for_stream_ready(stream_name=stream_name)256        lambda_client.create_event_source_mapping(257            EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258        )259        stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260        assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261        num_events_kinesis = 10262        kinesis_client.put_records(263            Records=[...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
