Best Python code snippet using localstack_python
fixtures.py
Source:fixtures.py  
...596            )597        poll_condition(is_stream_ready)598    return _wait_for_stream_ready599@pytest.fixture600def wait_for_dynamodb_stream_ready(dynamodbstreams_client):601    def _wait_for_stream_ready(stream_arn: str):602        def is_stream_ready():603            describe_stream_response = dynamodbstreams_client.describe_stream(StreamArn=stream_arn)604            return describe_stream_response["StreamDescription"]["StreamStatus"] == "ENABLED"605        poll_condition(is_stream_ready)606    return _wait_for_stream_ready607@pytest.fixture()608def kms_create_key(kms_client):609    key_ids = []610    def _create_key(**kwargs):611        if "Description" not in kwargs:612            kwargs["Description"] = f"test description - {short_uid()}"613        if "KeyUsage" not in kwargs:614            kwargs["KeyUsage"] = "ENCRYPT_DECRYPT"...test_lambda_integration.py
Source:test_lambda_integration.py  
...635            stream_arn = dynamodb_client.update_table(636                TableName=table_name,637                StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},638            )["TableDescription"]["LatestStreamArn"]639            wait_for_dynamodb_stream_ready(stream_arn)640            event_source_mapping_kwargs = {641                "FunctionName": function_name,642                "BatchSize": 1,643                "StartingPosition": "TRIM_HORIZON",644                "EventSourceArn": stream_arn,645                "MaximumBatchingWindowInSeconds": 1,646                "MaximumRetryAttempts": 1,647            }648            event_source_mapping_kwargs.update(649                FilterCriteria={650                    "Filters": [651                        {"Pattern": json.dumps(filter)},652                    ]653                }654            )655            event_source_uuid = lambda_client.create_event_source_mapping(656                **event_source_mapping_kwargs657            )["UUID"]658            _await_event_source_mapping_enabled(lambda_client, event_source_uuid)659            dynamodb_client.put_item(TableName=table_name, Item=item_to_put1)660            def assert_lambda_called():661                events = get_lambda_log_events(function_name, logs_client=logs_client)662                if calls > 0:663                    assert len(events) == 1664                else:665                    # negative test for 'numeric' filter666                    assert len(events) == 0667            retry(assert_lambda_called, retries=max_retries)668            # Following lines are relevant if variables are set via parametrize669            if item_to_put2:670                # putting a new item (item_to_put2) a second time is a 'INSERT' request671                dynamodb_client.put_item(TableName=table_name, Item=item_to_put2)672            else:673                # putting the same item (item_to_put1) a second time is a 
'MODIFY' request (at least in Localstack)674                dynamodb_client.put_item(TableName=table_name, Item=item_to_put1)675            # depending on the parametrize values the filter (and the items to put) the lambda might be called multiple times676            if calls > 1:677                def assert_events_called_multiple():678                    events = get_lambda_log_events(function_name, logs_client=logs_client)679                    assert len(events) == calls680                # lambda was called a second time, so new records should be found681                retry(assert_events_called_multiple, retries=max_retries)682            else:683                # lambda wasn't called a second time, so no new records should be found684                retry(assert_lambda_called, retries=max_retries)685        finally:686            if event_source_uuid:687                lambda_client.delete_event_source_mapping(UUID=event_source_uuid)688    @pytest.mark.aws_validated689    @pytest.mark.parametrize(690        "filter",691        [692            "single-string",693            '[{"eventName": ["INSERT"=123}]',694        ],695    )696    def test_dynamodb_invalid_event_filter(697        self,698        create_lambda_function,699        lambda_client,700        dynamodb_client,701        dynamodb_create_table,702        lambda_su_role,703        wait_for_dynamodb_stream_ready,704        filter,705    ):706        function_name = f"lambda_func-{short_uid()}"707        table_name = f"test-table-{short_uid()}"708        create_lambda_function(709            handler_file=TEST_LAMBDA_PYTHON_ECHO,710            func_name=function_name,711            runtime=LAMBDA_RUNTIME_PYTHON37,712            role=lambda_su_role,713        )714        dynamodb_create_table(table_name=table_name, partition_key="id")715        _await_dynamodb_table_active(dynamodb_client, table_name)716        stream_arn = dynamodb_client.update_table(717            TableName=table_name,718            
StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},719        )["TableDescription"]["LatestStreamArn"]720        wait_for_dynamodb_stream_ready(stream_arn)721        event_source_mapping_kwargs = {722            "FunctionName": function_name,723            "BatchSize": 1,724            "StartingPosition": "TRIM_HORIZON",725            "EventSourceArn": stream_arn,726            "MaximumBatchingWindowInSeconds": 1,727            "MaximumRetryAttempts": 1,728            "FilterCriteria": {729                "Filters": [730                    {"Pattern": filter},731                ]732            },733        }734        with pytest.raises(Exception) as expected:...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
