Best Python code snippet using localstack_python
provider.py
Source: provider.py
...
    @handler("PutItem", expand=False)
    def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:
        existing_item = None
        table_name = put_item_input["TableName"]
        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
        if event_sources_or_streams_enabled:
            existing_item = ItemFinder.find_existing_item(put_item_input)

        # forward request to backend
        self.fix_return_consumed_capacity(put_item_input)
        result = self.forward_request(context, put_item_input)

        # get stream specification details for the table
        if event_sources_or_streams_enabled:
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            item = put_item_input["Item"]
            # prepare record keys
            keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)
            # create record
            record = self.get_record_template()
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
            record["dynamodb"].update(
                {
                    "Keys": keys,
                    "NewImage": item,
                    "SizeBytes": len(json.dumps(item)),
                }
            )
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            if existing_item:
                record["dynamodb"]["OldImage"] = existing_item
            self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("DeleteItem", expand=False)
    def delete_item(
        self,
        context: RequestContext,
        delete_item_input: DeleteItemInput,
    ) -> DeleteItemOutput:
        existing_item = None
        table_name = delete_item_input["TableName"]
        if has_event_sources_or_streams_enabled(table_name):
            existing_item = ItemFinder.find_existing_item(delete_item_input)

        # forward request to backend
        self.fix_return_consumed_capacity(delete_item_input)
        result = self.forward_request(context, delete_item_input)

        # determine and forward stream record
        if existing_item:
            event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
            if event_sources_or_streams_enabled:
                # create record
                record = self.get_record_template()
                record["eventName"] = "REMOVE"
                record["dynamodb"].update(
                    {
                        "Keys": delete_item_input["Key"],
                        "OldImage": existing_item,
                        "SizeBytes": len(json.dumps(existing_item)),
                    }
                )
                # get stream specification details for the table
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("UpdateItem", expand=False)
    def update_item(
        self,
        context: RequestContext,
        update_item_input: UpdateItemInput,
    ) -> UpdateItemOutput:
        existing_item = None
        table_name = update_item_input["TableName"]
        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
        if event_sources_or_streams_enabled:
            existing_item = ItemFinder.find_existing_item(update_item_input)

        # forward request to backend
        self.fix_return_consumed_capacity(update_item_input)
        result = self.forward_request(context, update_item_input)

        # construct and forward stream record
        if event_sources_or_streams_enabled:
            updated_item = ItemFinder.find_existing_item(update_item_input)
            if updated_item:
                record = self.get_record_template()
                record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                record["dynamodb"].update(
                    {
                        "Keys": update_item_input["Key"],
                        "NewImage": updated_item,
                        "SizeBytes": len(json.dumps(updated_item)),
                    }
                )
                if existing_item:
                    record["dynamodb"]["OldImage"] = existing_item
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("GetItem", expand=False)
    def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:
        result = self.forward_request(context)
        self.fix_consumed_capacity(get_item_input, result)
        return result

    @handler("Query", expand=False)
    def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:
        index_name = query_input.get("IndexName")
        if index_name:
            if not is_index_query_valid(query_input):
                raise ValidationException(
                    "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "
                    "is not supported for global secondary index id-index because its projection "
                    "type is not ALL",
                )
        result = self.forward_request(context)
        self.fix_consumed_capacity(query_input, result)
        return result

    @handler("Scan", expand=False)
    def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:
        return self.forward_request(context)

    @handler("BatchWriteItem", expand=False)
    def batch_write_item(
        self,
        context: RequestContext,
        batch_write_item_input: BatchWriteItemInput,
    ) -> BatchWriteItemOutput:
        existing_items = []
        unprocessed_put_items = []
        unprocessed_delete_items = []
        request_items = batch_write_item_input["RequestItems"]
        for table_name in sorted(request_items.keys()):
            for request in request_items[table_name]:
                for key in ["PutRequest", "DeleteRequest"]:
                    inner_request = request.get(key)
                    if inner_request:
                        if self.should_throttle("BatchWriteItem"):
                            if key == "PutRequest":
                                unprocessed_put_items.append(inner_request)
                            elif key == "DeleteRequest":
                                unprocessed_delete_items.append(inner_request)
                        else:
                            item = ItemFinder.find_existing_item(inner_request, table_name)
                            existing_items.append(item)

        # forward request to backend
        result = self.forward_request(context)

        # determine and forward stream records
        request_items = batch_write_item_input["RequestItems"]
        records, unprocessed_items = self.prepare_batch_write_item_records(
            request_items=request_items,
            unprocessed_put_items=unprocessed_put_items,
            unprocessed_delete_items=unprocessed_delete_items,
            existing_items=existing_items,
        )
        streams_enabled_cache = {}
        event_sources_or_streams_enabled = False
        for record in records:
            event_sources_or_streams_enabled = (
                event_sources_or_streams_enabled
                or has_event_sources_or_streams_enabled(
                    record["eventSourceARN"], streams_enabled_cache
                )
            )
        if event_sources_or_streams_enabled:
            self.forward_stream_records(records)

        # update response
        if any(unprocessed_items):
            table_name = list(request_items.keys())[0]
            unprocessed = result["UnprocessedItems"]
            if table_name not in unprocessed:
                unprocessed[table_name] = []
            for key in ["PutRequest", "DeleteRequest"]:
                if any(unprocessed_items[key]):
                    unprocessed[table_name].append({key: unprocessed_items[key]})
            for key in list(unprocessed.keys()):
                if not unprocessed.get(key):
                    del unprocessed[key]
        return result

    @handler("TransactWriteItems", expand=False)
    def transact_write_items(
        self,
        context: RequestContext,
        transact_write_items_input: TransactWriteItemsInput,
    ) -> TransactWriteItemsOutput:
        existing_items = []
        for item in transact_write_items_input["TransactItems"]:
            for key in ["Put", "Update", "Delete"]:
                inner_item = item.get(key)
                if inner_item:
                    existing_items.append(ItemFinder.find_existing_item(inner_item))

        # forward request to backend
        result = self.forward_request(context)

        # determine and forward stream records
        streams_enabled_cache = {}
        records = self.prepare_transact_write_item_records(
            transact_items=transact_write_items_input["TransactItems"],
            existing_items=existing_items,
        )
        event_sources_or_streams_enabled = False
        for record in records:
            event_sources_or_streams_enabled = (
                event_sources_or_streams_enabled
                or has_event_sources_or_streams_enabled(
                    record["eventSourceARN"], streams_enabled_cache
                )
            )
        if event_sources_or_streams_enabled:
            self.forward_stream_records(records)
        return result

    @handler("ExecuteStatement", expand=False)
    def execute_statement(
        self,
        context: RequestContext,
        execute_statement_input: ExecuteStatementInput,
    ) -> ExecuteStatementOutput:
        statement = execute_statement_input["Statement"]
        table_name = extract_table_name_from_partiql_update(statement)
        existing_items = None
        if table_name and has_event_sources_or_streams_enabled(table_name):
            # Note: fetching the entire list of items is hugely inefficient, especially for larger tables
            # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!
            existing_items = ItemFinder.list_existing_items_for_statement(statement)

        # forward request to backend
        result = self.forward_request(context)

        # construct and forward stream record
        event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(
            table_name
        )
        if event_sources_or_streams_enabled:
            records = get_updated_records(table_name, existing_items)
            self.forward_stream_records(records, table_name=table_name)
        return result

    def tag_resource(
        self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList
    ) -> None:
        table_tags = DynamoDBRegion.TABLE_TAGS
        if resource_arn not in table_tags:
            table_tags[resource_arn] = {}
        table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})

    def untag_resource(
        self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList
    ) -> None:
        for tag_key in tag_keys or []:
            DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)

    def list_tags_of_resource(
        self,
        context: RequestContext,
        resource_arn: ResourceArnString,
        next_token: NextTokenString = None,
    ) -> ListTagsOfResourceOutput:
        result = [
            {"Key": k, "Value": v}
            for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).items()
        ]
        return ListTagsOfResourceOutput(Tags=result)

    def describe_time_to_live(
        self, context: RequestContext, table_name: TableName
    ) -> DescribeTimeToLiveOutput:
        backend = DynamoDBRegion.get()
        ttl_spec = backend.ttl_specifications.get(table_name)
        result = {"TimeToLiveStatus": "DISABLED"}
        if ttl_spec:
            if ttl_spec.get("Enabled"):
                ttl_status = "ENABLED"
            else:
                ttl_status = "DISABLED"
            result = {
                "AttributeName": ttl_spec.get("AttributeName"),
                "TimeToLiveStatus": ttl_status,
            }
        return DescribeTimeToLiveOutput(TimeToLiveDescription=result)

    def update_time_to_live(
        self,
        context: RequestContext,
        table_name: TableName,
        time_to_live_specification: TimeToLiveSpecification,
    ) -> UpdateTimeToLiveOutput:
        # TODO: TTL status is maintained/mocked but no real expiry is happening for items
        backend = DynamoDBRegion.get()
        backend.ttl_specifications[table_name] = time_to_live_specification
        return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)

    def create_global_table(
        self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList
    ) -> CreateGlobalTableOutput:
        if global_table_name in DynamoDBRegion.GLOBAL_TABLES:
            raise GlobalTableAlreadyExistsException("Global table with this name already exists")
        replication_group = [grp.copy() for grp in replication_group or []]
        data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}
        DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data
        for group in replication_group:
            group["ReplicaStatus"] = "ACTIVE"
            group["ReplicaStatusDescription"] = "Replica active"
        return CreateGlobalTableOutput(GlobalTableDescription=data)

    def describe_global_table(
        self, context: RequestContext, global_table_name: TableName
    ) -> DescribeGlobalTableOutput:
        details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)
        if not details:
            raise GlobalTableNotFoundException("Global table with this name does not exist")
        return DescribeGlobalTableOutput(GlobalTableDescription=details)

    def list_global_tables(
        self,
        context: RequestContext,
        exclusive_start_global_table_name: TableName = None,
        limit: PositiveIntegerObject = None,
        region_name: RegionName = None,
    ) -> ListGlobalTablesOutput:
        # TODO: add paging support
        result = [
            select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
            for tab in DynamoDBRegion.GLOBAL_TABLES.values()
        ]
        return ListGlobalTablesOutput(GlobalTables=result)

    def update_global_table(
        self,
        context: RequestContext,
        global_table_name: TableName,
        replica_updates: ReplicaUpdateList,
    ) -> UpdateGlobalTableOutput:
        details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)
        if not details:
            raise GlobalTableNotFoundException("Global table with this name does not exist")
        for update in replica_updates or []:
            repl_group = details["ReplicationGroup"]
            # delete existing
            delete = update.get("Delete")
            if delete:
                details["ReplicationGroup"] = [
                    g for g in repl_group if g["RegionName"] != delete["RegionName"]
                ]
            # create new
            create = update.get("Create")
            if create:
                exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
                if exists:
                    continue
                new_group = {
                    "RegionName": create["RegionName"],
                    "ReplicaStatus": "ACTIVE",
                    "ReplicaStatusDescription": "Replica active",
                }
                details["ReplicationGroup"].append(new_group)
        return UpdateGlobalTableOutput(GlobalTableDescription=details)

    def enable_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName, stream_arn: StreamArn
    ) -> KinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
        if not stream:
            raise ValidationException("User does not have a permission to use kinesis stream")
        table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})
        dest_status = table_def.get("KinesisDataStreamDestinationStatus")
        if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
            raise ValidationException(
                "Table is not in a valid state to enable Kinesis Streaming "
                "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
                "to perform ENABLE operation."
            )
        table_def["KinesisDataStreamDestinations"] = (
            table_def.get("KinesisDataStreamDestinations") or []
        )
        # remove the stream destination if already present
        table_def["KinesisDataStreamDestinations"] = [
            t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
        ]
        # append the active stream destination at the end of the list
        table_def["KinesisDataStreamDestinations"].append(
            {
                "DestinationStatus": "ACTIVE",
                "DestinationStatusDescription": "Stream is active",
                "StreamArn": stream_arn,
            }
        )
        table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"
        return KinesisStreamingDestinationOutput(
            DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name
        )

    def disable_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName, stream_arn: StreamArn
    ) -> KinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
        if not stream:
            raise ValidationException(
                "User does not have a permission to use kinesis stream",
            )
        table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})
        stream_destinations = table_def.get("KinesisDataStreamDestinations")
        if stream_destinations:
            if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":
                for dest in stream_destinations:
                    if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":
                        dest["DestinationStatus"] = "DISABLED"
                        dest["DestinationStatusDescription"] = "Stream is disabled"
                        table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"
                        return KinesisStreamingDestinationOutput(
                            DestinationStatus="DISABLED",
                            StreamArn=stream_arn,
                            TableName=table_name,
                        )
        raise ValidationException(
            "Table is not in a valid state to disable Kinesis Streaming Destination:"
            "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
        )

    def describe_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName
    ) -> DescribeKinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        table_def = DynamoDBRegion.get().table_definitions.get(table_name)
        stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
        return DescribeKinesisStreamingDestinationOutput(
            KinesisDataStreamDestinations=stream_destinations, TableName=table_name
        )

    @staticmethod
    def table_exists(table_name):
        return aws_stack.dynamodb_table_exists(table_name)

    @staticmethod
    def prepare_request_headers(headers):
        def _replace(regex, replace):
            headers["Authorization"] = re.sub(
                regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE
            )

        # Note: We need to ensure that the same access key is used here for all requests,
        # otherwise DynamoDBLocal stores tables/items in separate namespaces
        _replace(r"Credential=[^/]+/", rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")
        # Note: The NoSQL Workbench sends "localhost" or "local" as the region name, which we need to fix here
        _replace(
            r"Credential=([^/]+/[^/]+)/local(host)?/",
            rf"Credential=\1/{aws_stack.get_local_region()}/",
        )

    def fix_return_consumed_capacity(self, request_dict):
        # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is
        # empty, see https://github.com/localstack/localstack/issues/2049
        return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (
            not request_dict.get("ReturnValues")
        )
        if return_values_all and not request_dict.get("ReturnConsumedCapacity"):
            request_dict["ReturnConsumedCapacity"] = "TOTAL"

    def fix_consumed_capacity(self, request: Dict, result: Dict):
        # make sure we append 'ConsumedCapacity', which is properly
        # returned by dynalite, but not by AWS's DynamoDBLocal
        table_name = request.get("TableName")
        return_cap = request.get("ReturnConsumedCapacity")
        if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:
            result["ConsumedCapacity"] = {
                "TableName": table_name,
                "CapacityUnits": 5,  # TODO hardcoded
                "ReadCapacityUnits": 2,
                "WriteCapacityUnits": 3,
            }

    def fix_table_arn(self, table_arn: str) -> str:
        return re.sub(
            "arn:aws:dynamodb:ddblocal:",
            f"arn:aws:dynamodb:{aws_stack.get_region()}:",
            table_arn,
        )

    def prepare_transact_write_item_records(self, transact_items, existing_items):
        records = []
        record = self.get_record_template()
        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,
        # so we will increase the index based on these events
        i = 0
        for request in transact_items:
            put_request = request.get("Put")
            if put_request:
                existing_item = existing_items[i]
                table_name = put_request["TableName"]
                keys = SchemaExtractor.extract_keys(item=put_request["Item"], table_name=table_name)
                # Add stream view type to record if ddb stream is enabled
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["NewImage"] = put_request["Item"]
                if existing_item:
                    new_record["dynamodb"]["OldImage"] = existing_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                records.append(new_record)
                i += 1
            update_request = request.get("Update")
            if update_request:
                table_name = update_request["TableName"]
                keys = update_request["Key"]
                updated_item = ItemFinder.find_existing_item(update_request, table_name)
                if not updated_item:
                    return []
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_items[i]
                new_record["dynamodb"]["NewImage"] = updated_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                records.append(new_record)
                i += 1
            delete_request = request.get("Delete")
            if delete_request:
                table_name = delete_request["TableName"]
                keys = delete_request["Key"]
                existing_item = existing_items[i]
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "REMOVE"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_item
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                records.append(new_record)
                i += 1
        return records

    def prepare_batch_write_item_records(
        self,
        request_items,
        existing_items,
        unprocessed_put_items: List,
        unprocessed_delete_items: List,
    ):
        records = []
        record = self.get_record_template()
        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}
        i = 0
        for table_name in sorted(request_items.keys()):
            # Add stream view type to record if ddb stream is enabled
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            for request in request_items[table_name]:
                put_request = request.get("PutRequest")
                if put_request:
                    if existing_items and len(existing_items) > i:
                        existing_item = existing_items[i]
                        keys = SchemaExtractor.extract_keys(
                            item=put_request["Item"], table_name=table_name
                        )
                        new_record = copy.deepcopy(record)
                        new_record["eventID"] = short_uid()
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["NewImage"] = put_request["Item"]
                        if existing_item:
                            new_record["dynamodb"]["OldImage"] = existing_item
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    if unprocessed_put_items and len(unprocessed_put_items) > i:
                        unprocessed_item = unprocessed_put_items[i]
                        if unprocessed_item:
                            unprocessed_items["PutRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                delete_request = request.get("DeleteRequest")
                if delete_request:
                    if existing_items and len(existing_items) > i:
                        keys = delete_request["Key"]
                        new_record = copy.deepcopy(record)
                        new_record["eventID"] = short_uid()
                        new_record["eventName"] = "REMOVE"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["OldImage"] = existing_items[i]
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:
                        unprocessed_item = unprocessed_delete_items[i]
                        if unprocessed_item:
                            unprocessed_items["DeleteRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                i += 1
        return records, unprocessed_items

    def forward_stream_records(self, records: List[Dict], table_name: str = None):
        if records and "eventName" in records[0]:
            if table_name:
                for record in records:
                    record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
            EventForwarder.forward_to_targets(records, background=True)

    def delete_all_event_source_mappings(self, table_arn):
        if table_arn:
            # fix start dynamodb service without lambda
            if not is_api_enabled("lambda"):
                return
            lambda_client = aws_stack.connect_to_service("lambda")
            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)
            for event in result["EventSourceMappings"]:
                event_source_mapping_id = event["UUID"]
                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)

    def get_record_template(self) -> Dict:
        return {
            "eventID": short_uid(),
            "eventVersion": "1.1",
            "dynamodb": {
                # expects nearest second rounded down
                "ApproximateCreationDateTime": int(time.time()),
                # 'StreamViewType': 'NEW_AND_OLD_IMAGES',
                "SizeBytes": -1,
            },
            "awsRegion": aws_stack.get_region(),
            "eventSource": "aws:dynamodb",
        }

    def action_should_throttle(self, action, actions):
        throttled = [f"{ACTION_PREFIX}{a}" for a in actions]
        return (action in throttled) or (action in actions)

    def should_throttle(self, action):
        rand = random.random()
        if rand < config.DYNAMODB_READ_ERROR_PROBABILITY and self.action_should_throttle(
            action, READ_THROTTLED_ACTIONS
        ):
            return True
        elif rand < config.DYNAMODB_WRITE_ERROR_PROBABILITY and self.action_should_throttle(
            action, WRITE_THROTTLED_ACTIONS
        ):
            return True
        elif rand < config.DYNAMODB_ERROR_PROBABILITY and self.action_should_throttle(
            action, THROTTLED_ACTIONS
        ):
            return True
        else:
            return False


# ---
# Misc. util functions
# ---


def get_global_secondary_index(table_name, index_name):
    schema = SchemaExtractor.get_table_schema(table_name)
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
        if index["IndexName"] == index_name:
            return index
    raise ResourceNotFoundException("Index not found")


def is_index_query_valid(query_data: dict) -> bool:
    table_name = to_str(query_data["TableName"])
    index_name = to_str(query_data["IndexName"])
    index_query_type = query_data.get("Select")
    index = get_global_secondary_index(table_name, index_name)
    index_projection_type = index.get("Projection").get("ProjectionType")
    if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
        return False
    return True


def has_event_sources_or_streams_enabled(table_name: str, cache: Dict = None):
    if cache is None:
        cache = {}
    if not table_name:
        return
    table_arn = aws_stack.dynamodb_table_arn(table_name)
    cached = cache.get(table_arn)
    if isinstance(cached, bool):
        return cached
    sources = lambda_api.get_event_sources(source_arn=table_arn)
    result = False
    if sources:
        result = True
    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):
        result = True
...
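The write handlers above (put_item, update_item, delete_item) only construct and forward stream records when has_event_sources_or_streams_enabled reports a consumer for the table. A minimal sketch of driving those code paths with boto3, assuming LocalStack is running on its default edge port; the table name "notes", the dummy credentials, and the item attributes are made up for illustration:

import boto3

# Assumption: LocalStack is up and serving DynamoDB on http://localhost:4566
ddb = boto3.client(
    "dynamodb",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# StreamEnabled=True is what makes has_event_sources_or_streams_enabled()
# return True, so the handlers above will build and forward stream records
ddb.create_table(
    TableName="notes",  # hypothetical table name
    AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
    StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
)
ddb.get_waiter("table_exists").wait(TableName="notes")

# PutItem on a new key -> eventName "INSERT" (no existing_item is found)
ddb.put_item(TableName="notes", Item={"id": {"S": "1"}, "text": {"S": "hello"}})

# PutItem on the same key -> "MODIFY", with the old item attached as OldImage
ddb.put_item(TableName="notes", Item={"id": {"S": "1"}, "text": {"S": "hello again"}})

# DeleteItem -> "REMOVE", with the pre-deletion item as OldImage
ddb.delete_item(TableName="notes", Key={"id": {"S": "1"}})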
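The records handed off by forward_stream_records can then be read back through the DynamoDB Streams API. A follow-up sketch under the same assumptions as above (local endpoint, hypothetical "notes" table); the eventName, Keys, NewImage, and OldImage fields mirror what get_record_template and the write handlers populate:

import boto3

kwargs = dict(
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
ddb = boto3.client("dynamodb", **kwargs)
streams = boto3.client("dynamodbstreams", **kwargs)

# The stream ARN appears on the table description once streams are enabled
stream_arn = ddb.describe_table(TableName="notes")["Table"]["LatestStreamArn"]

# Walk each shard from the beginning and print the records that were emitted
description = streams.describe_stream(StreamArn=stream_arn)["StreamDescription"]
for shard in description["Shards"]:
    iterator = streams.get_shard_iterator(
        StreamArn=stream_arn,
        ShardId=shard["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    for record in streams.get_records(ShardIterator=iterator)["Records"]:
        # eventName is INSERT/MODIFY/REMOVE as set by the handlers above;
        # NewImage/OldImage presence depends on the table's StreamViewType
        print(record["eventName"], record["dynamodb"].get("Keys"))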
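Finally, should_throttle compares a random draw against config.DYNAMODB_READ_ERROR_PROBABILITY, config.DYNAMODB_WRITE_ERROR_PROBABILITY, and config.DYNAMODB_ERROR_PROBABILITY, which LocalStack reads from environment variables of the same names. In batch_write_item above, throttled sub-requests are diverted into the UnprocessedItems of the response instead of being written. A hedged sketch of observing this from the client side, assuming one of those probabilities (e.g. DYNAMODB_WRITE_ERROR_PROBABILITY=0.5) was set on the LocalStack container before startup:

import boto3

ddb = boto3.client(
    "dynamodb",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

# With an error probability configured, should_throttle("BatchWriteItem")
# fires randomly, and the affected PutRequest/DeleteRequest entries surface
# under UnprocessedItems rather than being applied
resp = ddb.batch_write_item(
    RequestItems={
        "notes": [  # hypothetical table from the sketches above
            {"PutRequest": {"Item": {"id": {"S": "2"}, "text": {"S": "may be throttled"}}}}
        ]
    }
)
print(resp.get("UnprocessedItems", {}))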
