How to use fix_consumed_capacity method in localstack

Best Python code snippet using localstack_python

provider.py

Source: provider.py on GitHub

copy

Full Screen

...550 return result551 @handler("GetItem", expand=False)552 def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:553 result = self.forward_request(context)554 self.fix_consumed_capacity(get_item_input, result)555 return result556 @handler("Query", expand=False)557 def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:558 index_name = query_input.get("IndexName")559 if index_name:560 if not is_index_query_valid(query_input):561 raise ValidationException(562 "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "563 "is not supported for global secondary index id-index because its projection "564 "type is not ALL",565 )566 result = self.forward_request(context)567 self.fix_consumed_capacity(query_input, result)568 return result569 @handler("Scan", expand=False)570 def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:571 return self.forward_request(context)572 @handler("BatchWriteItem", expand=False)573 def batch_write_item(574 self,575 context: RequestContext,576 batch_write_item_input: BatchWriteItemInput,577 ) -> BatchWriteItemOutput:578 existing_items = []579 unprocessed_put_items = []580 unprocessed_delete_items = []581 request_items = batch_write_item_input["RequestItems"]582 for table_name in sorted(request_items.keys()):583 for request in request_items[table_name]:584 for key in ["PutRequest", "DeleteRequest"]:585 inner_request = request.get(key)586 if inner_request:587 if self.should_throttle("BatchWriteItem"):588 if key == "PutRequest":589 unprocessed_put_items.append(inner_request)590 elif key == "DeleteRequest":591 unprocessed_delete_items.append(inner_request)592 else:593 item = ItemFinder.find_existing_item(inner_request, table_name)594 existing_items.append(item)595 # forward request to backend596 result = self.forward_request(context)597 # determine and forward stream records598 request_items = batch_write_item_input["RequestItems"]599 records, 
unprocessed_items = self.prepare_batch_write_item_records(600 request_items=request_items,601 unprocessed_put_items=unprocessed_put_items,602 unprocessed_delete_items=unprocessed_delete_items,603 existing_items=existing_items,604 )605 streams_enabled_cache = {}606 event_sources_or_streams_enabled = False607 for record in records:608 event_sources_or_streams_enabled = (609 event_sources_or_streams_enabled610 or has_event_sources_or_streams_enabled(611 record["eventSourceARN"], streams_enabled_cache612 )613 )614 if event_sources_or_streams_enabled:615 self.forward_stream_records(records)616 # update response617 if any(unprocessed_items):618 table_name = list(request_items.keys())[0]619 unprocessed = result["UnprocessedItems"]620 if table_name not in unprocessed:621 unprocessed[table_name] = []622 for key in ["PutRequest", "DeleteRequest"]:623 if any(unprocessed_items[key]):624 unprocessed_items[table_name].append({key: unprocessed_items[key]})625 for key in list(unprocessed.keys()):626 if not unprocessed.get(key):627 del unprocessed[key]628 return result629 @handler("TransactWriteItems", expand=False)630 def transact_write_items(631 self,632 context: RequestContext,633 transact_write_items_input: TransactWriteItemsInput,634 ) -> TransactWriteItemsOutput:635 existing_items = []636 for item in transact_write_items_input["TransactItems"]:637 for key in ["Put", "Update", "Delete"]:638 inner_item = item.get(key)639 if inner_item:640 existing_items.append(ItemFinder.find_existing_item(inner_item))641 # forward request to backend642 result = self.forward_request(context)643 # determine and forward stream records644 streams_enabled_cache = {}645 records = self.prepare_transact_write_item_records(646 transact_items=transact_write_items_input["TransactItems"],647 existing_items=existing_items,648 )649 event_sources_or_streams_enabled = False650 for record in records:651 event_sources_or_streams_enabled = (652 event_sources_or_streams_enabled653 or 
has_event_sources_or_streams_enabled(654 record["eventSourceARN"], streams_enabled_cache655 )656 )657 if event_sources_or_streams_enabled:658 self.forward_stream_records(records)659 return result660 @handler("ExecuteStatement", expand=False)661 def execute_statement(662 self,663 context: RequestContext,664 execute_statement_input: ExecuteStatementInput,665 ) -> ExecuteStatementOutput:666 statement = execute_statement_input["Statement"]667 table_name = extract_table_name_from_partiql_update(statement)668 existing_items = None669 if table_name and has_event_sources_or_streams_enabled(table_name):670 # Note: fetching the entire list of items is hugely inefficient, especially for larger tables671 # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!672 existing_items = ItemFinder.list_existing_items_for_statement(statement)673 # forward request to backend674 result = self.forward_request(context)675 # construct and forward stream record676 event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(677 table_name678 )679 if event_sources_or_streams_enabled:680 records = get_updated_records(table_name, existing_items)681 self.forward_stream_records(records, table_name=table_name)682 return result683 def tag_resource(684 self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList685 ) -> None:686 table_tags = DynamoDBRegion.TABLE_TAGS687 if resource_arn not in table_tags:688 table_tags[resource_arn] = {}689 table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})690 def untag_resource(691 self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList692 ) -> None:693 for tag_key in tag_keys or []:694 DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)695 def list_tags_of_resource(696 self,697 context: RequestContext,698 resource_arn: ResourceArnString,699 next_token: NextTokenString = None,700 ) -> ListTagsOfResourceOutput:701 
result = [702 {"Key": k, "Value": v}703 for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).items()704 ]705 return ListTagsOfResourceOutput(Tags=result)706 def describe_time_to_live(707 self, context: RequestContext, table_name: TableName708 ) -> DescribeTimeToLiveOutput:709 backend = DynamoDBRegion.get()710 ttl_spec = backend.ttl_specifications.get(table_name)711 result = {"TimeToLiveStatus": "DISABLED"}712 if ttl_spec:713 if ttl_spec.get("Enabled"):714 ttl_status = "ENABLED"715 else:716 ttl_status = "DISABLED"717 result = {718 "AttributeName": ttl_spec.get("AttributeName"),719 "TimeToLiveStatus": ttl_status,720 }721 return DescribeTimeToLiveOutput(TimeToLiveDescription=result)722 def update_time_to_live(723 self,724 context: RequestContext,725 table_name: TableName,726 time_to_live_specification: TimeToLiveSpecification,727 ) -> UpdateTimeToLiveOutput:728 # TODO: TTL status is maintained/mocked but no real expiry is happening for items729 backend = DynamoDBRegion.get()730 backend.ttl_specifications[table_name] = time_to_live_specification731 return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)732 def create_global_table(733 self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList734 ) -> CreateGlobalTableOutput:735 if global_table_name in DynamoDBRegion.GLOBAL_TABLES:736 raise GlobalTableAlreadyExistsException("Global table with this name already exists")737 replication_group = [grp.copy() for grp in replication_group or []]738 data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}739 DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data740 for group in replication_group:741 group["ReplicaStatus"] = "ACTIVE"742 group["ReplicaStatusDescription"] = "Replica active"743 return CreateGlobalTableOutput(GlobalTableDescription=data)744 def describe_global_table(745 self, context: RequestContext, global_table_name: TableName746 ) -> DescribeGlobalTableOutput:747 details 
= DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)748 if not details:749 raise GlobalTableNotFoundException("Global table with this name does not exist")750 return DescribeGlobalTableOutput(GlobalTableDescription=details)751 def list_global_tables(752 self,753 context: RequestContext,754 exclusive_start_global_table_name: TableName = None,755 limit: PositiveIntegerObject = None,756 region_name: RegionName = None,757 ) -> ListGlobalTablesOutput:758 # TODO: add paging support759 result = [760 select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])761 for tab in DynamoDBRegion.GLOBAL_TABLES.values()762 ]763 return ListGlobalTablesOutput(GlobalTables=result)764 def update_global_table(765 self,766 context: RequestContext,767 global_table_name: TableName,768 replica_updates: ReplicaUpdateList,769 ) -> UpdateGlobalTableOutput:770 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)771 if not details:772 raise GlobalTableNotFoundException("Global table with this name does not exist")773 for update in replica_updates or []:774 repl_group = details["ReplicationGroup"]775 # delete existing776 delete = update.get("Delete")777 if delete:778 details["ReplicationGroup"] = [779 g for g in repl_group if g["RegionName"] != delete["RegionName"]780 ]781 # create new782 create = update.get("Create")783 if create:784 exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]785 if exists:786 continue787 new_group = {788 "RegionName": create["RegionName"],789 "ReplicaStatus": "ACTIVE",790 "ReplicaStatusDescription": "Replica active",791 }792 details["ReplicationGroup"].append(new_group)793 return UpdateGlobalTableOutput(GlobalTableDescription=details)794 def enable_kinesis_streaming_destination(795 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn796 ) -> KinesisStreamingDestinationOutput:797 # Check if table exists, to avoid error log output from DynamoDBLocal798 if not self.table_exists(table_name):799 raise 
ResourceNotFoundException("Cannot do operations on a non-existent table")800 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)801 if not stream:802 raise ValidationException("User does not have a permission to use kinesis stream")803 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})804 dest_status = table_def.get("KinesisDataStreamDestinationStatus")805 if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:806 raise ValidationException(807 "Table is not in a valid state to enable Kinesis Streaming "808 "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "809 "to perform ENABLE operation."810 )811 table_def["KinesisDataStreamDestinations"] = (812 table_def.get("KinesisDataStreamDestinations") or []813 )814 # remove the stream destination if already present815 table_def["KinesisDataStreamDestinations"] = [816 t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn817 ]818 # append the active stream destination at the end of the list819 table_def["KinesisDataStreamDestinations"].append(820 {821 "DestinationStatus": "ACTIVE",822 "DestinationStatusDescription": "Stream is active",823 "StreamArn": stream_arn,824 }825 )826 table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"827 return KinesisStreamingDestinationOutput(828 DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name829 )830 def disable_kinesis_streaming_destination(831 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn832 ) -> KinesisStreamingDestinationOutput:833 # Check if table exists, to avoid error log output from DynamoDBLocal834 if not self.table_exists(table_name):835 raise ResourceNotFoundException("Cannot do operations on a non-existent table")836 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)837 if not stream:838 raise ValidationException(839 "User does not have a permission to use kinesis stream",840 )841 table_def = 
DynamoDBRegion.get().table_definitions.setdefault(table_name, {})842 stream_destinations = table_def.get("KinesisDataStreamDestinations")843 if stream_destinations:844 if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":845 for dest in stream_destinations:846 if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":847 dest["DestinationStatus"] = "DISABLED"848 dest["DestinationStatusDescription"] = ("Stream is disabled",)849 table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"850 return KinesisStreamingDestinationOutput(851 DestinationStatus="DISABLED",852 StreamArn=stream_arn,853 TableName=table_name,854 )855 raise ValidationException(856 "Table is not in a valid state to disable Kinesis Streaming Destination:"857 "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."858 )859 def describe_kinesis_streaming_destination(860 self, context: RequestContext, table_name: TableName861 ) -> DescribeKinesisStreamingDestinationOutput:862 # Check if table exists, to avoid error log output from DynamoDBLocal863 if not self.table_exists(table_name):864 raise ResourceNotFoundException("Cannot do operations on a non-existent table")865 table_def = DynamoDBRegion.get().table_definitions.get(table_name)866 stream_destinations = table_def.get("KinesisDataStreamDestinations") or []867 return DescribeKinesisStreamingDestinationOutput(868 KinesisDataStreamDestinations=stream_destinations, TableName=table_name869 )870 @staticmethod871 def table_exists(table_name):872 return aws_stack.dynamodb_table_exists(table_name)873 @staticmethod874 def prepare_request_headers(headers):875 def _replace(regex, replace):876 headers["Authorization"] = re.sub(877 regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE878 )879 # Note: We need to ensure that the same access key is used here for all requests,880 # otherwise DynamoDBLocal stores tables/items in separate namespaces881 _replace(r"Credential=[^/]+/", 
rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")882 # Note: The NoSQL Workbench sends "localhost" or "local" as the region name, which we need to fix here883 _replace(884 r"Credential=([^/]+/[^/]+)/local(host)?/",885 rf"Credential=\1/{aws_stack.get_local_region()}/",886 )887 def fix_return_consumed_capacity(self, request_dict):888 # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is889 # empty, see https://github.com/localstack/localstack/issues/2049890 return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (891 not request_dict.get("ReturnValues")892 )893 if return_values_all and not request_dict.get("ReturnConsumedCapacity"):894 request_dict["ReturnConsumedCapacity"] = "TOTAL"895 def fix_consumed_capacity(self, request: Dict, result: Dict):896 # make sure we append 'ConsumedCapacity', which is properly897 # returned by dynalite, but not by AWS's DynamoDBLocal898 table_name = request.get("TableName")899 return_cap = request.get("ReturnConsumedCapacity")900 if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:901 request["ConsumedCapacity"] = {902 "TableName": table_name,903 "CapacityUnits": 5, # TODO hardcoded904 "ReadCapacityUnits": 2,905 "WriteCapacityUnits": 3,906 }907 def fix_table_arn(self, table_arn: str) -> str:908 return re.sub(909 "arn:aws:dynamodb:ddblocal:",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful