Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py  
...525            if "TableName" in data:526                records[0]["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)527            # forward to kinesis stream528            records_to_kinesis = copy.deepcopy(records)529            forward_to_kinesis_stream(records_to_kinesis)530            # forward to lambda and ddb_streams531            forward_to_lambda(records)532            records = self.prepare_records_to_forward_to_ddb_stream(records)533            forward_to_ddb_stream(records)534    # -------------535    # UTIL METHODS536    # -------------537    def prepare_request_headers(self, headers):538        # Note: We need to ensure that the same access key is used here for all requests,539        # otherwise DynamoDBLocal stores tables/items in separate namespaces540        headers["Authorization"] = re.sub(541            r"Credential=[^/]+/",542            r"Credential=%s/" % constants.TEST_AWS_ACCESS_KEY_ID,543            headers.get("Authorization") or "",544        )545    def prepare_batch_write_item_records(self, record, data):546        records = []547        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}548        i = 0549        for table_name in sorted(data["RequestItems"].keys()):550            # Add stream view type to record if ddb stream is enabled551            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)552            if stream_spec:553                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]554            for request in data["RequestItems"][table_name]:555                put_request = request.get("PutRequest")556                existing_items = self._thread_local("existing_items")557                if put_request:558                    if existing_items and len(existing_items) > i:559                        existing_item = existing_items[i]560                        keys = dynamodb_extract_keys(561                            item=put_request["Item"], table_name=table_name562                        )563                        if isinstance(keys, Response):564                            return keys565                        new_record = clone(record)566                        new_record["eventID"] = short_uid()567                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))568                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"569                        new_record["dynamodb"]["Keys"] = keys570                        new_record["dynamodb"]["NewImage"] = put_request["Item"]571                        if existing_item:572                            new_record["dynamodb"]["OldImage"] = existing_item573                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)574                        records.append(new_record)575                    unprocessed_put_items = self._thread_local("unprocessed_put_items")576                    if unprocessed_put_items and len(unprocessed_put_items) > i:577                        unprocessed_item = unprocessed_put_items[i]578                        if unprocessed_item:579                            unprocessed_items["PutRequest"].update(580                                json.loads(json.dumps(unprocessed_item))581                            )582                delete_request = request.get("DeleteRequest")583                if delete_request:584                    if existing_items and len(existing_items) > i:585                        keys = delete_request["Key"]586                        if isinstance(keys, Response):587                            return keys588                        new_record = clone(record)589                        new_record["eventID"] = short_uid()590                        new_record["eventName"] = "REMOVE"591                        new_record["dynamodb"]["Keys"] = keys592                        new_record["dynamodb"]["OldImage"] = existing_items[i]593                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))594                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)595                        records.append(new_record)596                    unprocessed_delete_items = self._thread_local("unprocessed_delete_items")597                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:598                        unprocessed_item = unprocessed_delete_items[i]599                        if unprocessed_item:600                            unprocessed_items["DeleteRequest"].update(601                                json.loads(json.dumps(unprocessed_item))602                            )603                i += 1604        return records, unprocessed_items605    def prepare_transact_write_item_records(self, record, data):606        records = []607        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,608        # so we will increase the index based on these events609        i = 0610        for request in data["TransactItems"]:611            put_request = request.get("Put")612            if put_request:613                existing_item = self._thread_local("existing_items")[i]614                table_name = put_request["TableName"]615                keys = dynamodb_extract_keys(item=put_request["Item"], table_name=table_name)616                if isinstance(keys, Response):617                    return keys618                # Add stream view type to record if ddb stream is enabled619                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)620                if stream_spec:621                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]622                new_record = clone(record)623                new_record["eventID"] = short_uid()624                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"625                new_record["dynamodb"]["Keys"] = keys626                new_record["dynamodb"]["NewImage"] = put_request["Item"]627                if existing_item:628                    new_record["dynamodb"]["OldImage"] = existing_item629                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)630                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))631                records.append(new_record)632                i += 1633            update_request = request.get("Update")634            if update_request:635                table_name = update_request["TableName"]636                keys = update_request["Key"]637                if isinstance(keys, Response):638                    return keys639                updated_item = find_existing_item(update_request, table_name)640                if not updated_item:641                    return []642                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)643                if stream_spec:644                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]645                new_record = clone(record)646                new_record["eventID"] = short_uid()647                new_record["eventName"] = "MODIFY"648                new_record["dynamodb"]["Keys"] = keys649                new_record["dynamodb"]["OldImage"] = self._thread_local("existing_items")[i]650                new_record["dynamodb"]["NewImage"] = updated_item651                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)652                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))653                records.append(new_record)654                i += 1655            delete_request = request.get("Delete")656            if delete_request:657                table_name = delete_request["TableName"]658                keys = delete_request["Key"]659                existing_item = self._thread_local("existing_items")[i]660                if isinstance(keys, Response):661                    return keys662                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)663                if stream_spec:664                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]665                new_record = clone(record)666                new_record["eventID"] = short_uid()667                new_record["eventName"] = "REMOVE"668                new_record["dynamodb"]["Keys"] = keys669                new_record["dynamodb"]["OldImage"] = existing_item670                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))671                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)672                records.append(new_record)673                i += 1674        return records675    def prepare_records_to_forward_to_ddb_stream(self, records):676        # StreamViewType determines what information is written to the stream for the table677        # When an item in the table is inserted, updated or deleted678        for record in records:679            if record["dynamodb"].get("StreamViewType"):680                # KEYS_ONLY  - Only the key attributes of the modified item are written to the stream681                if record["dynamodb"]["StreamViewType"] == "KEYS_ONLY":682                    record["dynamodb"].pop("OldImage", None)683                    record["dynamodb"].pop("NewImage", None)684                # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream685                elif record["dynamodb"]["StreamViewType"] == "NEW_IMAGE":686                    record["dynamodb"].pop("OldImage", None)687                # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream688                elif record["dynamodb"]["StreamViewType"] == "OLD_IMAGE":689                    record["dynamodb"].pop("NewImage", None)690        return records691    def delete_all_event_source_mappings(self, table_arn):692        if table_arn:693            # fix start dynamodb service without lambda694            if not is_api_enabled("lambda"):695                return696            lambda_client = aws_stack.connect_to_service("lambda")697            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)698            for event in result["EventSourceMappings"]:699                event_source_mapping_id = event["UUID"]700                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)701    @staticmethod702    def _thread_local(name, default=None):703        try:704            return getattr(ProxyListenerDynamoDB.thread_local, name)705        except AttributeError:706            return default707def get_sse_kms_managed_key():708    if MANAGED_KMS_KEYS.get(aws_stack.get_region()):709        return MANAGED_KMS_KEYS[aws_stack.get_region()]710    kms_client = aws_stack.connect_to_service("kms")711    key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")712    key_id = key_data["KeyMetadata"]["KeyId"]713    # not really happy with this import here714    from localstack.services.kms import kms_listener715    kms_listener.set_key_managed(key_id)716    MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id717    return key_id718def get_sse_description(data):719    if data.get("Enabled"):720        kms_master_key_id = data.get("KMSMasterKeyId")721        if not kms_master_key_id:722            # this is of course not the actual key for dynamodb, just a better, since existing, mock723            kms_master_key_id = get_sse_kms_managed_key()724        kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)725        return {726            "Status": "ENABLED",727            "SSEType": "KMS",  # no other value is allowed here728            "KMSMasterKeyArn": kms_master_key_id,729        }730    return {}731def handle_special_request(method, path, data, headers):732    if path.startswith("/shell") or method == "GET":733        if path == "/shell":734            headers = {"Refresh": "0; url=%s/shell/" % config.TEST_DYNAMODB_URL}735            return aws_responses.requests_response("", headers=headers)736        return True737    if method == "OPTIONS":738        return 200739def create_global_table(data):740    table_name = data["GlobalTableName"]741    if table_name in DynamoDBRegion.GLOBAL_TABLES:742        return get_error_message(743            "Global Table with this name already exists",744            "GlobalTableAlreadyExistsException",745        )746    DynamoDBRegion.GLOBAL_TABLES[table_name] = data747    for group in data.get("ReplicationGroup", []):748        group["ReplicaStatus"] = "ACTIVE"749        group["ReplicaStatusDescription"] = "Replica active"750    result = {"GlobalTableDescription": data}751    return result752def describe_global_table(data):753    table_name = data["GlobalTableName"]754    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)755    if not details:756        return get_error_message(757            "Global Table with this name does not exist", "GlobalTableNotFoundException"758        )759    result = {"GlobalTableDescription": details}760    return result761def list_global_tables(data):762    result = [763        select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])764        for tab in DynamoDBRegion.GLOBAL_TABLES.values()765    ]766    result = {"GlobalTables": result}767    return result768def update_global_table(data):769    table_name = data["GlobalTableName"]770    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)771    if not details:772        return get_error_message(773            "Global Table with this name does not exist", "GlobalTableNotFoundException"774        )775    for update in data.get("ReplicaUpdates", []):776        repl_group = details["ReplicationGroup"]777        # delete existing778        delete = update.get("Delete")779        if delete:780            details["ReplicationGroup"] = [781                g for g in repl_group if g["RegionName"] != delete["RegionName"]782            ]783        # create new784        create = update.get("Create")785        if create:786            exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]787            if exists:788                continue789            new_group = {790                "RegionName": create["RegionName"],791                "ReplicaStatus": "ACTIVE",792                "ReplicaStatusDescription": "Replica active",793            }794            details["ReplicationGroup"].append(new_group)795    result = {"GlobalTableDescription": details}796    return result797def is_index_query_valid(table_name, index_query_type):798    schema = get_table_schema(table_name)799    for index in schema["Table"].get("GlobalSecondaryIndexes", []):800        index_projection_type = index.get("Projection").get("ProjectionType")801        if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":802            return False803    return True804def has_event_sources_or_streams_enabled(table_name, cache={}):805    if not table_name:806        return807    table_arn = aws_stack.dynamodb_table_arn(table_name)808    cached = cache.get(table_arn)809    if isinstance(cached, bool):810        return cached811    sources = lambda_api.get_event_sources(source_arn=table_arn)812    result = False813    if sources:814        result = True815    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):816        result = True817    cache[table_arn] = result818    # if kinesis streaming destination is enabled819    # get table name from table_arn820    # since batch_wrtie and transact write operations passing table_arn instead of table_name821    table_name = table_arn.split("/", 1)[-1]822    table_definitions = DynamoDBRegion.get().table_definitions823    if not result and table_definitions.get(table_name):824        if table_definitions[table_name].get("KinesisDataStreamDestinationStatus") == "ACTIVE":825            result = True826    return result827def get_table_schema(table_name):828    key = "%s/%s" % (aws_stack.get_region(), table_name)829    schema = SCHEMA_CACHE.get(key)830    if not schema:831        ddb_client = aws_stack.connect_to_service("dynamodb")832        schema = ddb_client.describe_table(TableName=table_name)833        SCHEMA_CACHE[key] = schema834    return schema835def find_existing_item(put_item, table_name=None):836    table_name = table_name or put_item["TableName"]837    ddb_client = aws_stack.connect_to_service("dynamodb")838    search_key = {}839    if "Key" in put_item:840        search_key = put_item["Key"]841    else:842        schema = get_table_schema(table_name)843        schemas = [schema["Table"]["KeySchema"]]844        for index in schema["Table"].get("GlobalSecondaryIndexes", []):845            # TODO846            # schemas.append(index['KeySchema'])847            pass848        for schema in schemas:849            for key in schema:850                key_name = key["AttributeName"]851                search_key[key_name] = put_item["Item"][key_name]852        if not search_key:853            return854    req = {"TableName": table_name, "Key": search_key}855    existing_item = aws_stack.dynamodb_get_item_raw(req)856    if not existing_item:857        return existing_item858    if "Item" not in existing_item:859        if "message" in existing_item:860            table_names = ddb_client.list_tables()["TableNames"]861            msg = "Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s" % (862                table_names,863                existing_item["message"],864            )865            LOGGER.warning(msg)866        return867    return existing_item.get("Item")868def get_error_message(message, error_type):869    response = error_response(message=message, error_type=error_type)870    fix_headers_for_updated_response(response)871    return response872def get_table_not_found_error():873    return get_error_message(874        message="Cannot do operations on a non-existent table",875        error_type="ResourceNotFoundException",876    )877def fix_headers_for_updated_response(response):878    response.headers["Content-Length"] = len(to_bytes(response.content))879    response.headers["x-amz-crc32"] = calculate_crc32(response)880def update_response_content(response, content, status_code=None):881    aws_responses.set_response_content(response, content)882    if status_code:883        response.status_code = status_code884    fix_headers_for_updated_response(response)885def update_put_item_response_content(data, response_content):886    # when return-values variable is set only then attribute data should be returned887    # in the response otherwise by default is should not return any data.888    # https://github.com/localstack/localstack/issues/2121889    if data.get("ReturnValues"):890        response_content = json.dumps({"Attributes": data["Item"]})891    return response_content892def calculate_crc32(response):893    return crc32(to_bytes(response.content)) & 0xFFFFFFFF894def create_dynamodb_stream(data, latest_stream_label):895    stream = data["StreamSpecification"]896    enabled = stream.get("StreamEnabled")897    if enabled not in [False, "False"]:898        table_name = data["TableName"]899        view_type = stream["StreamViewType"]900        dynamodbstreams_api.add_dynamodb_stream(901            table_name=table_name,902            latest_stream_label=latest_stream_label,903            view_type=view_type,904            enabled=enabled,905        )906def forward_to_lambda(records):907    for record in records:908        sources = lambda_api.get_event_sources(source_arn=record["eventSourceARN"])909        event = {"Records": [record]}910        for src in sources:911            if src.get("State") != "Enabled":912                continue913            lambda_api.run_lambda(914                func_arn=src["FunctionArn"],915                event=event,916                context={},917                asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS,918            )919def forward_to_ddb_stream(records):920    dynamodbstreams_api.forward_events(records)921def forward_to_kinesis_stream(records):922    kinesis = aws_stack.connect_to_service("kinesis")923    table_definitions = DynamoDBRegion.get().table_definitions924    for record in records:925        if record.get("eventSourceARN"):926            table_name = record["eventSourceARN"].split("/", 1)[-1]927            table_def = table_definitions.get(table_name) or {}928            if table_def.get("KinesisDataStreamDestinationStatus") == "ACTIVE":929                stream_name = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"].split(930                    "/", 1931                )[-1]932                record["tableName"] = table_name933                record.pop("eventSourceARN", None)934                record["dynamodb"].pop("StreamViewType", None)935                partition_key = list(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
