How to use fix_table_arn method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...361 tags = table_definitions.pop("Tags", [])362 result["TableDescription"].pop("Tags", None)363 if tags:364 table_arn = result["TableDescription"]["TableArn"]365 table_arn = self.fix_table_arn(table_arn)366 DynamoDBRegion.TABLE_TAGS[table_arn] = {tag["Key"]: tag["Value"] for tag in tags}367 event_publisher.fire_event(368 event_publisher.EVENT_DYNAMODB_CREATE_TABLE,369 payload={"n": event_publisher.get_hash(table_name)},370 )371 return result372 def delete_table(self, context: RequestContext, table_name: TableName) -> DeleteTableOutput:373 # Check if table exists, to avoid error log output from DynamoDBLocal374 if not self.table_exists(table_name):375 raise ResourceNotFoundException("Cannot do operations on a non-existent table")376 # forward request to backend377 result = self.forward_request(context)378 event_publisher.fire_event(379 event_publisher.EVENT_DYNAMODB_DELETE_TABLE,380 payload={"n": event_publisher.get_hash(table_name)},381 )382 table_arn = result.get("TableDescription", {}).get("TableArn")383 table_arn = self.fix_table_arn(table_arn)384 self.delete_all_event_source_mappings(table_arn)385 dynamodbstreams_api.delete_streams(table_arn)386 DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)387 return result388 def describe_table(self, context: RequestContext, table_name: TableName) -> DescribeTableOutput:389 # Check if table exists, to avoid error log output from DynamoDBLocal390 if not self.table_exists(table_name):391 raise ResourceNotFoundException("Cannot do operations on a non-existent table")392 # forward request to backend393 result = self.forward_request(context)394 # update response with additional props395 table_props = DynamoDBRegion.get().table_properties.get(table_name)396 if table_props:397 result.get("Table", {}).update(table_props)398 # update only TableId and SSEDescription if present399 table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)400 if table_definitions:401 for key in ["TableId", "SSEDescription"]:402 if table_definitions.get(key):403 result.get("Table", {})[key] = table_definitions[key]404 return result405 @handler("UpdateTable", expand=False)406 def update_table(407 self, context: RequestContext, update_table_input: UpdateTableInput408 ) -> UpdateTableOutput:409 try:410 # forward request to backend411 result = self.forward_request(context)412 except CommonServiceException as e:413 is_no_update_error = (414 e.code == "ValidationException" and "Nothing to update" in e.message415 )416 if is_no_update_error and update_table_input.get("ReplicaUpdates"):417 table_name = update_table_input.get("TableName")418 # update local table props (replicas)419 table_properties = DynamoDBRegion.get().table_properties420 table_properties[table_name] = table_props = table_properties.get(table_name) or {}421 table_props["Replicas"] = replicas = table_props.get("Replicas") or []422 for repl_update in update_table_input["ReplicaUpdates"]:423 for key, details in repl_update.items():424 region = details.get("RegionName")425 if key == "Create":426 details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"427 replicas.append(details)428 if key == "Update":429 replica = [r for r in replicas if r.get("RegionName") == region]430 if replica:431 replica[0].update(details)432 if key == "Delete":433 table_props["Replicas"] = [434 r for r in replicas if r.get("RegionName") != region435 ]436 # update response content437 schema = SchemaExtractor.get_table_schema(table_name)438 return UpdateTableOutput(TableDescription=schema["Table"])439 raise440 if "StreamSpecification" in update_table_input:441 create_dynamodb_stream(442 update_table_input, result["TableDescription"].get("LatestStreamLabel")443 )444 return result445 def list_tables(446 self,447 context: RequestContext,448 exclusive_start_table_name: TableName = None,449 limit: ListTablesInputLimit = None,450 ) -> ListTablesOutput:451 return self.forward_request(context)452 @handler("PutItem", expand=False)453 def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:454 existing_item = None455 table_name = put_item_input["TableName"]456 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)457 if event_sources_or_streams_enabled:458 existing_item = ItemFinder.find_existing_item(put_item_input)459 # forward request to backend460 self.fix_return_consumed_capacity(put_item_input)461 result = self.forward_request(context, put_item_input)462 # Get stream specifications details for the table463 if event_sources_or_streams_enabled:464 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)465 item = put_item_input["Item"]466 # prepare record keys467 keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)468 # create record469 record = self.get_record_template()470 record["eventName"] = "INSERT" if not existing_item else "MODIFY"471 record["dynamodb"].update(472 {473 "Keys": keys,474 "NewImage": item,475 "SizeBytes": len(json.dumps(item)),476 }477 )478 if stream_spec:479 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]480 if existing_item:481 record["dynamodb"]["OldImage"] = existing_item482 self.forward_stream_records([record], table_name=table_name)483 return result484 @handler("DeleteItem", expand=False)485 def delete_item(486 self,487 context: RequestContext,488 delete_item_input: DeleteItemInput,489 ) -> DeleteItemOutput:490 existing_item = None491 table_name = delete_item_input["TableName"]492 if has_event_sources_or_streams_enabled(table_name):493 existing_item = ItemFinder.find_existing_item(delete_item_input)494 # forward request to backend495 self.fix_return_consumed_capacity(delete_item_input)496 result = self.forward_request(context, delete_item_input)497 # determine and forward stream record498 if existing_item:499 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)500 if event_sources_or_streams_enabled:501 # create record502 record = self.get_record_template()503 record["eventName"] = "REMOVE"504 record["dynamodb"].update(505 {506 "Keys": delete_item_input["Key"],507 "OldImage": existing_item,508 "SizeBytes": len(json.dumps(existing_item)),509 }510 )511 # Get stream specifications details for the table512 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)513 if stream_spec:514 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]515 self.forward_stream_records([record], table_name=table_name)516 return result517 @handler("UpdateItem", expand=False)518 def update_item(519 self,520 context: RequestContext,521 update_item_input: UpdateItemInput,522 ) -> UpdateItemOutput:523 existing_item = None524 table_name = update_item_input["TableName"]525 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)526 if event_sources_or_streams_enabled:527 existing_item = ItemFinder.find_existing_item(update_item_input)528 # forward request to backend529 self.fix_return_consumed_capacity(update_item_input)530 result = self.forward_request(context, update_item_input)531 # construct and forward stream record532 if event_sources_or_streams_enabled:533 updated_item = ItemFinder.find_existing_item(update_item_input)534 if updated_item:535 record = self.get_record_template()536 record["eventName"] = "INSERT" if not existing_item else "MODIFY"537 record["dynamodb"].update(538 {539 "Keys": update_item_input["Key"],540 "NewImage": updated_item,541 "SizeBytes": len(json.dumps(updated_item)),542 }543 )544 if existing_item:545 record["dynamodb"]["OldImage"] = existing_item546 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)547 if stream_spec:548 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]549 self.forward_stream_records([record], table_name=table_name)550 return result551 @handler("GetItem", expand=False)552 def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:553 result = self.forward_request(context)554 self.fix_consumed_capacity(get_item_input, result)555 return result556 @handler("Query", expand=False)557 def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:558 index_name = query_input.get("IndexName")559 if index_name:560 if not is_index_query_valid(query_input):561 raise ValidationException(562 "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "563 "is not supported for global secondary index id-index because its projection "564 "type is not ALL",565 )566 result = self.forward_request(context)567 self.fix_consumed_capacity(query_input, result)568 return result569 @handler("Scan", expand=False)570 def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:571 return self.forward_request(context)572 @handler("BatchWriteItem", expand=False)573 def batch_write_item(574 self,575 context: RequestContext,576 batch_write_item_input: BatchWriteItemInput,577 ) -> BatchWriteItemOutput:578 existing_items = []579 unprocessed_put_items = []580 unprocessed_delete_items = []581 request_items = batch_write_item_input["RequestItems"]582 for table_name in sorted(request_items.keys()):583 for request in request_items[table_name]:584 for key in ["PutRequest", "DeleteRequest"]:585 inner_request = request.get(key)586 if inner_request:587 if self.should_throttle("BatchWriteItem"):588 if key == "PutRequest":589 unprocessed_put_items.append(inner_request)590 elif key == "DeleteRequest":591 unprocessed_delete_items.append(inner_request)592 else:593 item = ItemFinder.find_existing_item(inner_request, table_name)594 existing_items.append(item)595 # forward request to backend596 result = self.forward_request(context)597 # determine and forward stream records598 request_items = batch_write_item_input["RequestItems"]599 records, unprocessed_items = self.prepare_batch_write_item_records(600 request_items=request_items,601 unprocessed_put_items=unprocessed_put_items,602 unprocessed_delete_items=unprocessed_delete_items,603 existing_items=existing_items,604 )605 streams_enabled_cache = {}606 event_sources_or_streams_enabled = False607 for record in records:608 event_sources_or_streams_enabled = (609 event_sources_or_streams_enabled610 or has_event_sources_or_streams_enabled(611 record["eventSourceARN"], streams_enabled_cache612 )613 )614 if event_sources_or_streams_enabled:615 self.forward_stream_records(records)616 # update response617 if any(unprocessed_items):618 table_name = list(request_items.keys())[0]619 unprocessed = result["UnprocessedItems"]620 if table_name not in unprocessed:621 unprocessed[table_name] = []622 for key in ["PutRequest", "DeleteRequest"]:623 if any(unprocessed_items[key]):624 unprocessed_items[table_name].append({key: unprocessed_items[key]})625 for key in list(unprocessed.keys()):626 if not unprocessed.get(key):627 del unprocessed[key]628 return result629 @handler("TransactWriteItems", expand=False)630 def transact_write_items(631 self,632 context: RequestContext,633 transact_write_items_input: TransactWriteItemsInput,634 ) -> TransactWriteItemsOutput:635 existing_items = []636 for item in transact_write_items_input["TransactItems"]:637 for key in ["Put", "Update", "Delete"]:638 inner_item = item.get(key)639 if inner_item:640 existing_items.append(ItemFinder.find_existing_item(inner_item))641 # forward request to backend642 result = self.forward_request(context)643 # determine and forward stream records644 streams_enabled_cache = {}645 records = self.prepare_transact_write_item_records(646 transact_items=transact_write_items_input["TransactItems"],647 existing_items=existing_items,648 )649 event_sources_or_streams_enabled = False650 for record in records:651 event_sources_or_streams_enabled = (652 event_sources_or_streams_enabled653 or has_event_sources_or_streams_enabled(654 record["eventSourceARN"], streams_enabled_cache655 )656 )657 if event_sources_or_streams_enabled:658 self.forward_stream_records(records)659 return result660 @handler("ExecuteStatement", expand=False)661 def execute_statement(662 self,663 context: RequestContext,664 execute_statement_input: ExecuteStatementInput,665 ) -> ExecuteStatementOutput:666 statement = execute_statement_input["Statement"]667 table_name = extract_table_name_from_partiql_update(statement)668 existing_items = None669 if table_name and has_event_sources_or_streams_enabled(table_name):670 # Note: fetching the entire list of items is hugely inefficient, especially for larger tables671 # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!672 existing_items = ItemFinder.list_existing_items_for_statement(statement)673 # forward request to backend674 result = self.forward_request(context)675 # construct and forward stream record676 event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(677 table_name678 )679 if event_sources_or_streams_enabled:680 records = get_updated_records(table_name, existing_items)681 self.forward_stream_records(records, table_name=table_name)682 return result683 def tag_resource(684 self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList685 ) -> None:686 table_tags = DynamoDBRegion.TABLE_TAGS687 if resource_arn not in table_tags:688 table_tags[resource_arn] = {}689 table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})690 def untag_resource(691 self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList692 ) -> None:693 for tag_key in tag_keys or []:694 DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)695 def list_tags_of_resource(696 self,697 context: RequestContext,698 resource_arn: ResourceArnString,699 next_token: NextTokenString = None,700 ) -> ListTagsOfResourceOutput:701 result = [702 {"Key": k, "Value": v}703 for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).items()704 ]705 return ListTagsOfResourceOutput(Tags=result)706 def describe_time_to_live(707 self, context: RequestContext, table_name: TableName708 ) -> DescribeTimeToLiveOutput:709 backend = DynamoDBRegion.get()710 ttl_spec = backend.ttl_specifications.get(table_name)711 result = {"TimeToLiveStatus": "DISABLED"}712 if ttl_spec:713 if ttl_spec.get("Enabled"):714 ttl_status = "ENABLED"715 else:716 ttl_status = "DISABLED"717 result = {718 "AttributeName": ttl_spec.get("AttributeName"),719 "TimeToLiveStatus": ttl_status,720 }721 return DescribeTimeToLiveOutput(TimeToLiveDescription=result)722 def update_time_to_live(723 self,724 context: RequestContext,725 table_name: TableName,726 time_to_live_specification: TimeToLiveSpecification,727 ) -> UpdateTimeToLiveOutput:728 # TODO: TTL status is maintained/mocked but no real expiry is happening for items729 backend = DynamoDBRegion.get()730 backend.ttl_specifications[table_name] = time_to_live_specification731 return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)732 def create_global_table(733 self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList734 ) -> CreateGlobalTableOutput:735 if global_table_name in DynamoDBRegion.GLOBAL_TABLES:736 raise GlobalTableAlreadyExistsException("Global table with this name already exists")737 replication_group = [grp.copy() for grp in replication_group or []]738 data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}739 DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data740 for group in replication_group:741 group["ReplicaStatus"] = "ACTIVE"742 group["ReplicaStatusDescription"] = "Replica active"743 return CreateGlobalTableOutput(GlobalTableDescription=data)744 def describe_global_table(745 self, context: RequestContext, global_table_name: TableName746 ) -> DescribeGlobalTableOutput:747 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)748 if not details:749 raise GlobalTableNotFoundException("Global table with this name does not exist")750 return DescribeGlobalTableOutput(GlobalTableDescription=details)751 def list_global_tables(752 self,753 context: RequestContext,754 exclusive_start_global_table_name: TableName = None,755 limit: PositiveIntegerObject = None,756 region_name: RegionName = None,757 ) -> ListGlobalTablesOutput:758 # TODO: add paging support759 result = [760 select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])761 for tab in DynamoDBRegion.GLOBAL_TABLES.values()762 ]763 return ListGlobalTablesOutput(GlobalTables=result)764 def update_global_table(765 self,766 context: RequestContext,767 global_table_name: TableName,768 replica_updates: ReplicaUpdateList,769 ) -> UpdateGlobalTableOutput:770 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)771 if not details:772 raise GlobalTableNotFoundException("Global table with this name does not exist")773 for update in replica_updates or []:774 repl_group = details["ReplicationGroup"]775 # delete existing776 delete = update.get("Delete")777 if delete:778 details["ReplicationGroup"] = [779 g for g in repl_group if g["RegionName"] != delete["RegionName"]780 ]781 # create new782 create = update.get("Create")783 if create:784 exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]785 if exists:786 continue787 new_group = {788 "RegionName": create["RegionName"],789 "ReplicaStatus": "ACTIVE",790 "ReplicaStatusDescription": "Replica active",791 }792 details["ReplicationGroup"].append(new_group)793 return UpdateGlobalTableOutput(GlobalTableDescription=details)794 def enable_kinesis_streaming_destination(795 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn796 ) -> KinesisStreamingDestinationOutput:797 # Check if table exists, to avoid error log output from DynamoDBLocal798 if not self.table_exists(table_name):799 raise ResourceNotFoundException("Cannot do operations on a non-existent table")800 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)801 if not stream:802 raise ValidationException("User does not have a permission to use kinesis stream")803 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})804 dest_status = table_def.get("KinesisDataStreamDestinationStatus")805 if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:806 raise ValidationException(807 "Table is not in a valid state to enable Kinesis Streaming "808 "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "809 "to perform ENABLE operation."810 )811 table_def["KinesisDataStreamDestinations"] = (812 table_def.get("KinesisDataStreamDestinations") or []813 )814 # remove the stream destination if already present815 table_def["KinesisDataStreamDestinations"] = [816 t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn817 ]818 # append the active stream destination at the end of the list819 table_def["KinesisDataStreamDestinations"].append(820 {821 "DestinationStatus": "ACTIVE",822 "DestinationStatusDescription": "Stream is active",823 "StreamArn": stream_arn,824 }825 )826 table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"827 return KinesisStreamingDestinationOutput(828 DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name829 )830 def disable_kinesis_streaming_destination(831 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn832 ) -> KinesisStreamingDestinationOutput:833 # Check if table exists, to avoid error log output from DynamoDBLocal834 if not self.table_exists(table_name):835 raise ResourceNotFoundException("Cannot do operations on a non-existent table")836 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)837 if not stream:838 raise ValidationException(839 "User does not have a permission to use kinesis stream",840 )841 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})842 stream_destinations = table_def.get("KinesisDataStreamDestinations")843 if stream_destinations:844 if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":845 for dest in stream_destinations:846 if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":847 dest["DestinationStatus"] = "DISABLED"848 dest["DestinationStatusDescription"] = ("Stream is disabled",)849 table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"850 return KinesisStreamingDestinationOutput(851 DestinationStatus="DISABLED",852 StreamArn=stream_arn,853 TableName=table_name,854 )855 raise ValidationException(856 "Table is not in a valid state to disable Kinesis Streaming Destination:"857 "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."858 )859 def describe_kinesis_streaming_destination(860 self, context: RequestContext, table_name: TableName861 ) -> DescribeKinesisStreamingDestinationOutput:862 # Check if table exists, to avoid error log output from DynamoDBLocal863 if not self.table_exists(table_name):864 raise ResourceNotFoundException("Cannot do operations on a non-existent table")865 table_def = DynamoDBRegion.get().table_definitions.get(table_name)866 stream_destinations = table_def.get("KinesisDataStreamDestinations") or []867 return DescribeKinesisStreamingDestinationOutput(868 KinesisDataStreamDestinations=stream_destinations, TableName=table_name869 )870 @staticmethod871 def table_exists(table_name):872 return aws_stack.dynamodb_table_exists(table_name)873 @staticmethod874 def prepare_request_headers(headers):875 def _replace(regex, replace):876 headers["Authorization"] = re.sub(877 regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE878 )879 # Note: We need to ensure that the same access key is used here for all requests,880 # otherwise DynamoDBLocal stores tables/items in separate namespaces881 _replace(r"Credential=[^/]+/", rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")882 # Note: The NoSQL Workbench sends "localhost" or "local" as the region name, which we need to fix here883 _replace(884 r"Credential=([^/]+/[^/]+)/local(host)?/",885 rf"Credential=\1/{aws_stack.get_local_region()}/",886 )887 def fix_return_consumed_capacity(self, request_dict):888 # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is889 # empty, see https://github.com/localstack/localstack/issues/2049890 return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (891 not request_dict.get("ReturnValues")892 )893 if return_values_all and not request_dict.get("ReturnConsumedCapacity"):894 request_dict["ReturnConsumedCapacity"] = "TOTAL"895 def fix_consumed_capacity(self, request: Dict, result: Dict):896 # make sure we append 'ConsumedCapacity', which is properly897 # returned by dynalite, but not by AWS's DynamoDBLocal898 table_name = request.get("TableName")899 return_cap = request.get("ReturnConsumedCapacity")900 if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:901 request["ConsumedCapacity"] = {902 "TableName": table_name,903 "CapacityUnits": 5, # TODO hardcoded904 "ReadCapacityUnits": 2,905 "WriteCapacityUnits": 3,906 }907 def fix_table_arn(self, table_arn: str) -> str:908 return re.sub(909 "arn:aws:dynamodb:ddblocal:",910 f"arn:aws:dynamodb:{aws_stack.get_region()}:",911 table_arn,912 )913 def prepare_transact_write_item_records(self, transact_items, existing_items):914 records = []915 record = self.get_record_template()916 # Fix issue #2745: existing_items only contain the Put/Update/Delete records,917 # so we will increase the index based on these events918 i = 0919 for request in transact_items:920 put_request = request.get("Put")921 if put_request:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful