How to use the dynamodb_get_table_stream_specification method in LocalStack

Best Python code snippet using localstack_python
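
In both implementations below, dynamodb_get_table_stream_specification simply looks up the table schema (via DescribeTable) and returns its StreamSpecification, or None if the table has no stream enabled. A minimal usage sketch — the import path and table name are assumptions for illustration; adapt them to the LocalStack version you are reading:

# Minimal sketch: call the helper and read the stream view type.
# Assumption: the helper lives in LocalStack's DynamoDB provider module,
# and "my-table" is a hypothetical, already-created table.
from localstack.services.dynamodb.provider import dynamodb_get_table_stream_specification

stream_spec = dynamodb_get_table_stream_specification(table_name="my-table")
if stream_spec:
    # e.g. {"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}
    view_type = stream_spec["StreamViewType"]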

provider.py

Source: provider.py (GitHub)


...
        self.fix_return_consumed_capacity(put_item_input)
        result = self.forward_request(context, put_item_input)
        # Get stream specifications details for the table
        if event_sources_or_streams_enabled:
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            item = put_item_input["Item"]
            # prepare record keys
            keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)
            # create record
            record = self.get_record_template()
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
            record["dynamodb"].update(
                {
                    "Keys": keys,
                    "NewImage": item,
                    "SizeBytes": len(json.dumps(item)),
                }
            )
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            if existing_item:
                record["dynamodb"]["OldImage"] = existing_item
            self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("DeleteItem", expand=False)
    def delete_item(
        self,
        context: RequestContext,
        delete_item_input: DeleteItemInput,
    ) -> DeleteItemOutput:
        existing_item = None
        table_name = delete_item_input["TableName"]
        if has_event_sources_or_streams_enabled(table_name):
            existing_item = ItemFinder.find_existing_item(delete_item_input)
        # forward request to backend
        self.fix_return_consumed_capacity(delete_item_input)
        result = self.forward_request(context, delete_item_input)
        # determine and forward stream record
        if existing_item:
            event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
            if event_sources_or_streams_enabled:
                # create record
                record = self.get_record_template()
                record["eventName"] = "REMOVE"
                record["dynamodb"].update(
                    {
                        "Keys": delete_item_input["Key"],
                        "OldImage": existing_item,
                        "SizeBytes": len(json.dumps(existing_item)),
                    }
                )
                # Get stream specifications details for the table
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("UpdateItem", expand=False)
    def update_item(
        self,
        context: RequestContext,
        update_item_input: UpdateItemInput,
    ) -> UpdateItemOutput:
        existing_item = None
        table_name = update_item_input["TableName"]
        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
        if event_sources_or_streams_enabled:
            existing_item = ItemFinder.find_existing_item(update_item_input)
        # forward request to backend
        self.fix_return_consumed_capacity(update_item_input)
        result = self.forward_request(context, update_item_input)
        # construct and forward stream record
        if event_sources_or_streams_enabled:
            updated_item = ItemFinder.find_existing_item(update_item_input)
            if updated_item:
                record = self.get_record_template()
                record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                record["dynamodb"].update(
                    {
                        "Keys": update_item_input["Key"],
                        "NewImage": updated_item,
                        "SizeBytes": len(json.dumps(updated_item)),
                    }
                )
                if existing_item:
                    record["dynamodb"]["OldImage"] = existing_item
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("GetItem", expand=False)
    def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:
        result = self.forward_request(context)
        self.fix_consumed_capacity(get_item_input, result)
        return result

    @handler("Query", expand=False)
    def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:
        index_name = query_input.get("IndexName")
        if index_name:
            if not is_index_query_valid(query_input):
                raise ValidationException(
                    "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "
                    "is not supported for global secondary index id-index because its projection "
                    "type is not ALL",
                )
        result = self.forward_request(context)
        self.fix_consumed_capacity(query_input, result)
        return result

    @handler("Scan", expand=False)
    def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:
        return self.forward_request(context)

    @handler("BatchWriteItem", expand=False)
    def batch_write_item(
        self,
        context: RequestContext,
        batch_write_item_input: BatchWriteItemInput,
    ) -> BatchWriteItemOutput:
        existing_items = []
        unprocessed_put_items = []
        unprocessed_delete_items = []
        request_items = batch_write_item_input["RequestItems"]
        for table_name in sorted(request_items.keys()):
            for request in request_items[table_name]:
                for key in ["PutRequest", "DeleteRequest"]:
                    inner_request = request.get(key)
                    if inner_request:
                        if self.should_throttle("BatchWriteItem"):
                            if key == "PutRequest":
                                unprocessed_put_items.append(inner_request)
                            elif key == "DeleteRequest":
                                unprocessed_delete_items.append(inner_request)
                        else:
                            item = ItemFinder.find_existing_item(inner_request, table_name)
                            existing_items.append(item)
        # forward request to backend
        result = self.forward_request(context)
        # determine and forward stream records
        request_items = batch_write_item_input["RequestItems"]
        records, unprocessed_items = self.prepare_batch_write_item_records(
            request_items=request_items,
            unprocessed_put_items=unprocessed_put_items,
            unprocessed_delete_items=unprocessed_delete_items,
            existing_items=existing_items,
        )
        streams_enabled_cache = {}
        event_sources_or_streams_enabled = False
        for record in records:
            event_sources_or_streams_enabled = (
                event_sources_or_streams_enabled
                or has_event_sources_or_streams_enabled(
                    record["eventSourceARN"], streams_enabled_cache
                )
            )
        if event_sources_or_streams_enabled:
            self.forward_stream_records(records)
        # update response
        if any(unprocessed_items):
            table_name = list(request_items.keys())[0]
            unprocessed = result["UnprocessedItems"]
            if table_name not in unprocessed:
                unprocessed[table_name] = []
            for key in ["PutRequest", "DeleteRequest"]:
                if any(unprocessed_items[key]):
                    unprocessed[table_name].append({key: unprocessed_items[key]})
            for key in list(unprocessed.keys()):
                if not unprocessed.get(key):
                    del unprocessed[key]
        return result

    @handler("TransactWriteItems", expand=False)
    def transact_write_items(
        self,
        context: RequestContext,
        transact_write_items_input: TransactWriteItemsInput,
    ) -> TransactWriteItemsOutput:
        existing_items = []
        for item in transact_write_items_input["TransactItems"]:
            for key in ["Put", "Update", "Delete"]:
                inner_item = item.get(key)
                if inner_item:
                    existing_items.append(ItemFinder.find_existing_item(inner_item))
        # forward request to backend
        result = self.forward_request(context)
        # determine and forward stream records
        streams_enabled_cache = {}
        records = self.prepare_transact_write_item_records(
            transact_items=transact_write_items_input["TransactItems"],
            existing_items=existing_items,
        )
        event_sources_or_streams_enabled = False
        for record in records:
            event_sources_or_streams_enabled = (
                event_sources_or_streams_enabled
                or has_event_sources_or_streams_enabled(
                    record["eventSourceARN"], streams_enabled_cache
                )
            )
        if event_sources_or_streams_enabled:
            self.forward_stream_records(records)
        return result

    @handler("ExecuteStatement", expand=False)
    def execute_statement(
        self,
        context: RequestContext,
        execute_statement_input: ExecuteStatementInput,
    ) -> ExecuteStatementOutput:
        statement = execute_statement_input["Statement"]
        table_name = extract_table_name_from_partiql_update(statement)
        existing_items = None
        if table_name and has_event_sources_or_streams_enabled(table_name):
            # Note: fetching the entire list of items is hugely inefficient, especially for larger tables
            # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!
            existing_items = ItemFinder.list_existing_items_for_statement(statement)
        # forward request to backend
        result = self.forward_request(context)
        # construct and forward stream record
        event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(
            table_name
        )
        if event_sources_or_streams_enabled:
            records = get_updated_records(table_name, existing_items)
            self.forward_stream_records(records, table_name=table_name)
        return result

    def tag_resource(
        self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList
    ) -> None:
        table_tags = DynamoDBRegion.TABLE_TAGS
        if resource_arn not in table_tags:
            table_tags[resource_arn] = {}
        table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})

    def untag_resource(
        self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList
    ) -> None:
        for tag_key in tag_keys or []:
            DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)

    def list_tags_of_resource(
        self,
        context: RequestContext,
        resource_arn: ResourceArnString,
        next_token: NextTokenString = None,
    ) -> ListTagsOfResourceOutput:
        result = [
            {"Key": k, "Value": v}
            for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).items()
        ]
        return ListTagsOfResourceOutput(Tags=result)

    def describe_time_to_live(
        self, context: RequestContext, table_name: TableName
    ) -> DescribeTimeToLiveOutput:
        backend = DynamoDBRegion.get()
        ttl_spec = backend.ttl_specifications.get(table_name)
        result = {"TimeToLiveStatus": "DISABLED"}
        if ttl_spec:
            if ttl_spec.get("Enabled"):
                ttl_status = "ENABLED"
            else:
                ttl_status = "DISABLED"
            result = {
                "AttributeName": ttl_spec.get("AttributeName"),
                "TimeToLiveStatus": ttl_status,
            }
        return DescribeTimeToLiveOutput(TimeToLiveDescription=result)

    def update_time_to_live(
        self,
        context: RequestContext,
        table_name: TableName,
        time_to_live_specification: TimeToLiveSpecification,
    ) -> UpdateTimeToLiveOutput:
        # TODO: TTL status is maintained/mocked but no real expiry is happening for items
        backend = DynamoDBRegion.get()
        backend.ttl_specifications[table_name] = time_to_live_specification
        return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)

    def create_global_table(
        self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList
    ) -> CreateGlobalTableOutput:
        if global_table_name in DynamoDBRegion.GLOBAL_TABLES:
            raise GlobalTableAlreadyExistsException("Global table with this name already exists")
        replication_group = [grp.copy() for grp in replication_group or []]
        data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}
        DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data
        for group in replication_group:
            group["ReplicaStatus"] = "ACTIVE"
            group["ReplicaStatusDescription"] = "Replica active"
        return CreateGlobalTableOutput(GlobalTableDescription=data)

    def describe_global_table(
        self, context: RequestContext, global_table_name: TableName
    ) -> DescribeGlobalTableOutput:
        details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)
        if not details:
            raise GlobalTableNotFoundException("Global table with this name does not exist")
        return DescribeGlobalTableOutput(GlobalTableDescription=details)

    def list_global_tables(
        self,
        context: RequestContext,
        exclusive_start_global_table_name: TableName = None,
        limit: PositiveIntegerObject = None,
        region_name: RegionName = None,
    ) -> ListGlobalTablesOutput:
        # TODO: add paging support
        result = [
            select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
            for tab in DynamoDBRegion.GLOBAL_TABLES.values()
        ]
        return ListGlobalTablesOutput(GlobalTables=result)

    def update_global_table(
        self,
        context: RequestContext,
        global_table_name: TableName,
        replica_updates: ReplicaUpdateList,
    ) -> UpdateGlobalTableOutput:
        details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)
        if not details:
            raise GlobalTableNotFoundException("Global table with this name does not exist")
        for update in replica_updates or []:
            repl_group = details["ReplicationGroup"]
            # delete existing
            delete = update.get("Delete")
            if delete:
                details["ReplicationGroup"] = [
                    g for g in repl_group if g["RegionName"] != delete["RegionName"]
                ]
            # create new
            create = update.get("Create")
            if create:
                exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
                if exists:
                    continue
                new_group = {
                    "RegionName": create["RegionName"],
                    "ReplicaStatus": "ACTIVE",
                    "ReplicaStatusDescription": "Replica active",
                }
                details["ReplicationGroup"].append(new_group)
        return UpdateGlobalTableOutput(GlobalTableDescription=details)

    def enable_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName, stream_arn: StreamArn
    ) -> KinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
        if not stream:
            raise ValidationException("User does not have a permission to use kinesis stream")
        table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})
        dest_status = table_def.get("KinesisDataStreamDestinationStatus")
        if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
            raise ValidationException(
                "Table is not in a valid state to enable Kinesis Streaming "
                "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
                "to perform ENABLE operation."
            )
        table_def["KinesisDataStreamDestinations"] = (
            table_def.get("KinesisDataStreamDestinations") or []
        )
        # remove the stream destination if already present
        table_def["KinesisDataStreamDestinations"] = [
            t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
        ]
        # append the active stream destination at the end of the list
        table_def["KinesisDataStreamDestinations"].append(
            {
                "DestinationStatus": "ACTIVE",
                "DestinationStatusDescription": "Stream is active",
                "StreamArn": stream_arn,
            }
        )
        table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"
        return KinesisStreamingDestinationOutput(
            DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name
        )

    def disable_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName, stream_arn: StreamArn
    ) -> KinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
        if not stream:
            raise ValidationException(
                "User does not have a permission to use kinesis stream",
            )
        table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})
        stream_destinations = table_def.get("KinesisDataStreamDestinations")
        if stream_destinations:
            if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":
                for dest in stream_destinations:
                    if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":
                        dest["DestinationStatus"] = "DISABLED"
                        dest["DestinationStatusDescription"] = "Stream is disabled"
                        table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"
                        return KinesisStreamingDestinationOutput(
                            DestinationStatus="DISABLED",
                            StreamArn=stream_arn,
                            TableName=table_name,
                        )
        raise ValidationException(
            "Table is not in a valid state to disable Kinesis Streaming Destination:"
            "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
        )

    def describe_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName
    ) -> DescribeKinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        table_def = DynamoDBRegion.get().table_definitions.get(table_name)
        stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
        return DescribeKinesisStreamingDestinationOutput(
            KinesisDataStreamDestinations=stream_destinations, TableName=table_name
        )

    @staticmethod
    def table_exists(table_name):
        return aws_stack.dynamodb_table_exists(table_name)

    @staticmethod
    def prepare_request_headers(headers):
        def _replace(regex, replace):
            headers["Authorization"] = re.sub(
                regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE
            )

        # Note: We need to ensure that the same access key is used here for all requests,
        # otherwise DynamoDBLocal stores tables/items in separate namespaces
        _replace(r"Credential=[^/]+/", rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")
        # Note: The NoSQL Workbench sends "localhost" or "local" as the region name, which we need to fix here
        _replace(
            r"Credential=([^/]+/[^/]+)/local(host)?/",
            rf"Credential=\1/{aws_stack.get_local_region()}/",
        )

    def fix_return_consumed_capacity(self, request_dict):
        # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is
        # empty, see https://github.com/localstack/localstack/issues/2049
        return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (
            not request_dict.get("ReturnValues")
        )
        if return_values_all and not request_dict.get("ReturnConsumedCapacity"):
            request_dict["ReturnConsumedCapacity"] = "TOTAL"

    def fix_consumed_capacity(self, request: Dict, result: Dict):
        # make sure we append 'ConsumedCapacity', which is properly
        # returned by dynalite, but not by AWS's DynamoDBLocal
        table_name = request.get("TableName")
        return_cap = request.get("ReturnConsumedCapacity")
        if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:
            result["ConsumedCapacity"] = {
                "TableName": table_name,
                "CapacityUnits": 5,  # TODO hardcoded
                "ReadCapacityUnits": 2,
                "WriteCapacityUnits": 3,
            }

    def fix_table_arn(self, table_arn: str) -> str:
        return re.sub(
            "arn:aws:dynamodb:ddblocal:",
            f"arn:aws:dynamodb:{aws_stack.get_region()}:",
            table_arn,
        )

    def prepare_transact_write_item_records(self, transact_items, existing_items):
        records = []
        record = self.get_record_template()
        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,
        # so we will increase the index based on these events
        i = 0
        for request in transact_items:
            put_request = request.get("Put")
            if put_request:
                existing_item = existing_items[i]
                table_name = put_request["TableName"]
                keys = SchemaExtractor.extract_keys(item=put_request["Item"], table_name=table_name)
                # Add stream view type to record if ddb stream is enabled
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["NewImage"] = put_request["Item"]
                if existing_item:
                    new_record["dynamodb"]["OldImage"] = existing_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                records.append(new_record)
                i += 1
            update_request = request.get("Update")
            if update_request:
                table_name = update_request["TableName"]
                keys = update_request["Key"]
                updated_item = ItemFinder.find_existing_item(update_request, table_name)
                if not updated_item:
                    return []
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_items[i]
                new_record["dynamodb"]["NewImage"] = updated_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                records.append(new_record)
                i += 1
            delete_request = request.get("Delete")
            if delete_request:
                table_name = delete_request["TableName"]
                keys = delete_request["Key"]
                existing_item = existing_items[i]
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "REMOVE"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_item
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                records.append(new_record)
                i += 1
        return records

    def prepare_batch_write_item_records(
        self,
        request_items,
        existing_items,
        unprocessed_put_items: List,
        unprocessed_delete_items: List,
    ):
        records = []
        record = self.get_record_template()
        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}
        i = 0
        for table_name in sorted(request_items.keys()):
            # Add stream view type to record if ddb stream is enabled
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            for request in request_items[table_name]:
                put_request = request.get("PutRequest")
                if put_request:
                    if existing_items and len(existing_items) > i:
                        existing_item = existing_items[i]
                        keys = SchemaExtractor.extract_keys(
                            item=put_request["Item"], table_name=table_name
                        )
                        new_record = copy.deepcopy(record)
                        new_record["eventID"] = short_uid()
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["NewImage"] = put_request["Item"]
                        if existing_item:
                            new_record["dynamodb"]["OldImage"] = existing_item
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    if unprocessed_put_items and len(unprocessed_put_items) > i:
                        unprocessed_item = unprocessed_put_items[i]
                        if unprocessed_item:
                            unprocessed_items["PutRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                delete_request = request.get("DeleteRequest")
                if delete_request:
                    if existing_items and len(existing_items) > i:
                        keys = delete_request["Key"]
                        new_record = copy.deepcopy(record)
                        new_record["eventID"] = short_uid()
                        new_record["eventName"] = "REMOVE"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["OldImage"] = existing_items[i]
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:
                        unprocessed_item = unprocessed_delete_items[i]
                        if unprocessed_item:
                            unprocessed_items["DeleteRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                i += 1
        return records, unprocessed_items

    def forward_stream_records(self, records: List[Dict], table_name: str = None):
        if records and "eventName" in records[0]:
            if table_name:
                for record in records:
                    record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
            EventForwarder.forward_to_targets(records, background=True)

    def delete_all_event_source_mappings(self, table_arn):
        if table_arn:
            # fix start dynamodb service without lambda
            if not is_api_enabled("lambda"):
                return
            lambda_client = aws_stack.connect_to_service("lambda")
            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)
            for event in result["EventSourceMappings"]:
                event_source_mapping_id = event["UUID"]
                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)

    def get_record_template(self) -> Dict:
        return {
            "eventID": short_uid(),
            "eventVersion": "1.1",
            "dynamodb": {
                # expects nearest second rounded down
                "ApproximateCreationDateTime": int(time.time()),
                # 'StreamViewType': 'NEW_AND_OLD_IMAGES',
                "SizeBytes": -1,
            },
            "awsRegion": aws_stack.get_region(),
            "eventSource": "aws:dynamodb",
        }

    def action_should_throttle(self, action, actions):
        throttled = [f"{ACTION_PREFIX}{a}" for a in actions]
        return (action in throttled) or (action in actions)

    def should_throttle(self, action):
        rand = random.random()
        if rand < config.DYNAMODB_READ_ERROR_PROBABILITY and self.action_should_throttle(
            action, READ_THROTTLED_ACTIONS
        ):
            return True
        elif rand < config.DYNAMODB_WRITE_ERROR_PROBABILITY and self.action_should_throttle(
            action, WRITE_THROTTLED_ACTIONS
        ):
            return True
        elif rand < config.DYNAMODB_ERROR_PROBABILITY and self.action_should_throttle(
            action, THROTTLED_ACTIONS
        ):
            return True
        else:
            return False


# ---
# Misc. util functions
# ---
def get_global_secondary_index(table_name, index_name):
    schema = SchemaExtractor.get_table_schema(table_name)
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
        if index["IndexName"] == index_name:
            return index
    raise ResourceNotFoundException("Index not found")


def is_index_query_valid(query_data: dict) -> bool:
    table_name = to_str(query_data["TableName"])
    index_name = to_str(query_data["IndexName"])
    index_query_type = query_data.get("Select")
    index = get_global_secondary_index(table_name, index_name)
    index_projection_type = index.get("Projection").get("ProjectionType")
    if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
        return False
    return True


def has_event_sources_or_streams_enabled(table_name: str, cache: Dict = None):
    if cache is None:
        cache = {}
    if not table_name:
        return
    table_arn = aws_stack.dynamodb_table_arn(table_name)
    cached = cache.get(table_arn)
    if isinstance(cached, bool):
        return cached
    sources = lambda_api.get_event_sources(source_arn=table_arn)
    result = False
    if sources:
        result = True
    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):
        result = True
    # if a kinesis streaming destination is enabled, get the table name from table_arn,
    # since batch_write and transact_write operations pass a table_arn instead of a table_name
    table_name = table_arn.split("/", 1)[-1]
    table_definitions = DynamoDBRegion.get().table_definitions
    if not result and table_definitions.get(table_name):
        if table_definitions[table_name].get("KinesisDataStreamDestinationStatus") == "ACTIVE":
            result = True
    cache[table_arn] = result
    return result


def get_updated_records(table_name: str, existing_items: List) -> List:
    """
    Determine the list of record updates, to be sent to a DDB stream after a PartiQL update operation.
    Note: This is currently a fairly expensive operation, as we need to retrieve the list of all items
    from the table, and compare the items to the previously available ones. This is a limitation as
    we're currently using the DynamoDB Local backend as a blackbox. In future, we should consider hooking
    into the PartiQL query execution inside DynamoDB Local and directly extract the list of updated items.
    """
    result = []
    stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
    key_schema = SchemaExtractor.get_key_schema(table_name)
    before = ItemSet(existing_items, key_schema=key_schema)
    after = ItemSet(ItemFinder.get_all_table_items(table_name), key_schema=key_schema)

    def _add_record(item, comparison_set: ItemSet):
        matching_item = comparison_set.find_item(item)
        if matching_item == item:
            return
        # determine event type
        if comparison_set == after:
            if matching_item:
                return
            event_name = "REMOVE"
        else:
            event_name = "INSERT" if not matching_item else "MODIFY"
        old_image = item if event_name == "REMOVE" else matching_item
        new_image = matching_item if event_name == "REMOVE" else item
        # prepare record
        keys = SchemaExtractor.extract_keys_for_schema(item=item, key_schema=key_schema)
        record = {
            "eventName": event_name,
            "eventID": short_uid(),
            "dynamodb": {
                "Keys": keys,
                "NewImage": new_image,
                "SizeBytes": len(json.dumps(item)),
            },
        }
        if stream_spec:
            record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
        if old_image:
            record["dynamodb"]["OldImage"] = old_image
        result.append(record)

    # loop over items in new item list (find INSERT/MODIFY events)
    for item in after.items_list:
        _add_record(item, before)
    # loop over items in old item list (find REMOVE events)
    for item in before.items_list:
        _add_record(item, after)
    return result


def fix_headers_for_updated_response(response):
    response.headers["Content-Length"] = len(to_bytes(response.content))
    response.headers["x-amz-crc32"] = calculate_crc32(response)


def create_dynamodb_stream(data, latest_stream_label):
    stream = data["StreamSpecification"]
    enabled = stream.get("StreamEnabled")
    if enabled not in [False, "False"]:
        table_name = data["TableName"]
        view_type = stream["StreamViewType"]
        dynamodbstreams_api.add_dynamodb_stream(
            table_name=table_name,
            latest_stream_label=latest_stream_label,
            view_type=view_type,
            enabled=enabled,
        )


def dynamodb_get_table_stream_specification(table_name):
    try:
        table_schema = SchemaExtractor.get_table_schema(table_name)
        return table_schema["Table"].get("StreamSpecification")
    except Exception as e:
        LOG.info(
            "Unable to get stream specification for table %s: %s %s",
            table_name,
            e,
            traceback.format_exc(),
        )
...
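
The helper only returns something useful when the table was created with a StreamSpecification, since it just reads that field back out of the table schema. A minimal boto3 sketch against a locally running LocalStack — the endpoint URL, credentials, and table/key names here are assumptions for illustration:

import boto3

# Assumption: LocalStack's default edge endpoint; any credentials work locally.
ddb = boto3.client(
    "dynamodb",
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
# Create a hypothetical table with a DynamoDB stream enabled.
ddb.create_table(
    TableName="my-table",
    KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
    AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
    BillingMode="PAY_PER_REQUEST",
    StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
)
# DescribeTable now carries the StreamSpecification that
# dynamodb_get_table_stream_specification reads from the table schema.
spec = ddb.describe_table(TableName="my-table")["Table"]["StreamSpecification"]
print(spec)  # {'StreamEnabled': True, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}

Without the StreamSpecification argument, DescribeTable returns no such field, the helper returns None, and the provider skips setting StreamViewType on the stream records it builds.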


dynamodb_listener.py

Source: dynamodb_listener.py (GitHub)


...
                if existing_item:
                    record["dynamodb"]["OldImage"] = existing_item
                record["dynamodb"]["NewImage"] = updated_item
                record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
        elif action == "BatchWriteItem":
            records, unprocessed_items = self.prepare_batch_write_item_records(record, data)
            for record in records:
                event_sources_or_streams_enabled = (
                    event_sources_or_streams_enabled
                    or has_event_sources_or_streams_enabled(
                        record["eventSourceARN"], streams_enabled_cache
                    )
                )
            if response.status_code == 200 and any(unprocessed_items):
                content = json.loads(to_str(response.content))
                table_name = list(data["RequestItems"].keys())[0]
                if table_name not in content["UnprocessedItems"]:
                    content["UnprocessedItems"][table_name] = []
                for key in ["PutRequest", "DeleteRequest"]:
                    if any(unprocessed_items[key]):
                        content["UnprocessedItems"][table_name].append(
                            {key: unprocessed_items[key]}
                        )
                unprocessed = content["UnprocessedItems"]
                for key in list(unprocessed.keys()):
                    if not unprocessed.get(key):
                        del unprocessed[key]
                response._content = json.dumps(content)
                fix_headers_for_updated_response(response)
        elif action == "TransactWriteItems":
            records = self.prepare_transact_write_item_records(record, data)
            for record in records:
                event_sources_or_streams_enabled = (
                    event_sources_or_streams_enabled
                    or has_event_sources_or_streams_enabled(
                        record["eventSourceARN"], streams_enabled_cache
                    )
                )
        elif action == "PutItem":
            if response.status_code == 200:
                keys = dynamodb_extract_keys(item=data["Item"], table_name=table_name)
                if isinstance(keys, Response):
                    return keys
                # fix response
                if response._content == "{}":
                    response._content = update_put_item_response_content(data, response._content)
                    fix_headers_for_updated_response(response)
                if event_sources_or_streams_enabled:
                    existing_item = self._thread_local("existing_item")
                    # Get stream specifications details for the table
                    stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                    record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                    # prepare record keys
                    record["dynamodb"]["Keys"] = keys
                    record["dynamodb"]["NewImage"] = data["Item"]
                    record["dynamodb"]["SizeBytes"] = len(json.dumps(data["Item"]))
                    record["eventID"] = short_uid()
                    if stream_spec:
                        record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                    if existing_item:
                        record["dynamodb"]["OldImage"] = existing_item
        elif action in ("GetItem", "Query"):
            if response.status_code == 200:
                content = json.loads(to_str(response.content))
                # make sure we append 'ConsumedCapacity', which is properly
                # returned by dynalite, but not by AWS's DynamoDBLocal
                if "ConsumedCapacity" not in content and data.get("ReturnConsumedCapacity") in [
                    "TOTAL",
                    "INDEXES",
                ]:
                    content["ConsumedCapacity"] = {
                        "TableName": table_name,
                        "CapacityUnits": 5,  # TODO hardcoded
                        "ReadCapacityUnits": 2,
                        "WriteCapacityUnits": 3,
                    }
                    response._content = json.dumps(content)
                    fix_headers_for_updated_response(response)
        elif action == "DeleteItem":
            if response.status_code == 200 and event_sources_or_streams_enabled:
                old_item = self._thread_local("existing_item")
                record["eventID"] = short_uid()
                record["eventName"] = "REMOVE"
                record["dynamodb"]["Keys"] = data["Key"]
                record["dynamodb"]["OldImage"] = old_item
                record["dynamodb"]["SizeBytes"] = len(json.dumps(old_item))
                # Get stream specifications details for the table
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
        elif action == "CreateTable":
            if response.status_code == 200:
                table_definitions = (
                    DynamoDBRegion.get().table_definitions.get(data["TableName"]) or {}
                )
                if "TableId" not in table_definitions:
                    table_definitions["TableId"] = long_uid()
                if "SSESpecification" in table_definitions:
                    sse_specification = table_definitions.pop("SSESpecification")
                    table_definitions["SSEDescription"] = get_sse_description(sse_specification)
                content = json.loads(to_str(response.content))
                if table_definitions:
                    table_content = content.get("Table", {})
                    table_content.update(table_definitions)
                    content["TableDescription"].update(table_content)
                    update_response_content(response, content)
                if "StreamSpecification" in data:
                    create_dynamodb_stream(
                        data, content["TableDescription"].get("LatestStreamLabel")
                    )
                if data.get("Tags"):
                    table_arn = content["TableDescription"]["TableArn"]
                    DynamoDBRegion.TABLE_TAGS[table_arn] = {
                        tag["Key"]: tag["Value"] for tag in data["Tags"]
                    }
                event_publisher.fire_event(
                    event_publisher.EVENT_DYNAMODB_CREATE_TABLE,
                    payload={"n": event_publisher.get_hash(table_name)},
                )
            return
        elif action == "DeleteTable":
            if response.status_code == 200:
                table_arn = (
                    json.loads(response._content).get("TableDescription", {}).get("TableArn")
                )
                event_publisher.fire_event(
                    event_publisher.EVENT_DYNAMODB_DELETE_TABLE,
                    payload={"n": event_publisher.get_hash(table_name)},
                )
                self.delete_all_event_source_mappings(table_arn)
                dynamodbstreams_api.delete_streams(table_arn)
                DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)
            return
        elif action == "UpdateTable":
            content_str = to_str(response._content or "")
            if response.status_code == 200 and "StreamSpecification" in data:
                content = json.loads(content_str)
                create_dynamodb_stream(data, content["TableDescription"].get("LatestStreamLabel"))
            if (
                response.status_code >= 400
                and data.get("ReplicaUpdates")
                and "Nothing to update" in content_str
            ):
                table_name = data.get("TableName")
                # update local table props (replicas)
                table_properties = DynamoDBRegion.get().table_properties
                table_properties[table_name] = table_props = table_properties.get(table_name) or {}
                table_props["Replicas"] = replicas = table_props.get("Replicas") or []
                for repl_update in data["ReplicaUpdates"]:
                    for key, details in repl_update.items():
                        region = details.get("RegionName")
                        if key == "Create":
                            details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"
                            replicas.append(details)
                        if key == "Update":
                            replica = [r for r in replicas if r.get("RegionName") == region]
                            if replica:
                                replica[0].update(details)
                        if key == "Delete":
                            table_props["Replicas"] = [
                                r for r in replicas if r.get("RegionName") != region
                            ]
                # update response content
                schema = get_table_schema(table_name)
                result = {"TableDescription": schema["Table"]}
                update_response_content(response, json_safe(result), 200)
            return
        elif action == "DescribeTable":
            table_name = data.get("TableName")
            table_props = DynamoDBRegion.get().table_properties.get(table_name)
            if table_props:
                content = json.loads(to_str(response.content))
                content.get("Table", {}).update(table_props)
                update_response_content(response, content)
            # Update only TableId and SSEDescription if present
            table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)
            if table_definitions:
                for key in ["TableId", "SSEDescription"]:
                    if table_definitions.get(key):
                        content = json.loads(to_str(response.content))
                        content.get("Table", {})[key] = table_definitions[key]
                        update_response_content(response, content)
        elif action == "TagResource":
            table_arn = data["ResourceArn"]
            table_tags = DynamoDBRegion.TABLE_TAGS
            if table_arn not in table_tags:
                table_tags[table_arn] = {}
            table_tags[table_arn].update({tag["Key"]: tag["Value"] for tag in data.get("Tags", [])})
            return
        elif action == "UntagResource":
            table_arn = data["ResourceArn"]
            for tag_key in data.get("TagKeys", []):
                DynamoDBRegion.TABLE_TAGS.get(table_arn, {}).pop(tag_key, None)
            return
        else:
            # nothing to do
            return
        if event_sources_or_streams_enabled and records and "eventName" in records[0]:
            if "TableName" in data:
                records[0]["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
            # forward to kinesis stream
            records_to_kinesis = copy.deepcopy(records)
            forward_to_kinesis_stream(records_to_kinesis)
            # forward to lambda and ddb_streams
            forward_to_lambda(records)
            records = self.prepare_records_to_forward_to_ddb_stream(records)
            forward_to_ddb_stream(records)

    # -------------
    # UTIL METHODS
    # -------------
    def prepare_request_headers(self, headers):
        # Note: We need to ensure that the same access key is used here for all requests,
        # otherwise DynamoDBLocal stores tables/items in separate namespaces
        headers["Authorization"] = re.sub(
            r"Credential=[^/]+/",
            r"Credential=%s/" % constants.TEST_AWS_ACCESS_KEY_ID,
            headers.get("Authorization") or "",
        )

    def prepare_batch_write_item_records(self, record, data):
        records = []
        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}
        i = 0
        for table_name in sorted(data["RequestItems"].keys()):
            # Add stream view type to record if ddb stream is enabled
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            for request in data["RequestItems"][table_name]:
                put_request = request.get("PutRequest")
                existing_items = self._thread_local("existing_items")
                if put_request:
                    if existing_items and len(existing_items) > i:
                        existing_item = existing_items[i]
                        keys = dynamodb_extract_keys(
                            item=put_request["Item"], table_name=table_name
                        )
                        if isinstance(keys, Response):
                            return keys
                        new_record = clone(record)
                        new_record["eventID"] = short_uid()
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["NewImage"] = put_request["Item"]
                        if existing_item:
                            new_record["dynamodb"]["OldImage"] = existing_item
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    unprocessed_put_items = self._thread_local("unprocessed_put_items")
                    if unprocessed_put_items and len(unprocessed_put_items) > i:
                        unprocessed_item = unprocessed_put_items[i]
                        if unprocessed_item:
                            unprocessed_items["PutRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                delete_request = request.get("DeleteRequest")
                if delete_request:
                    if existing_items and len(existing_items) > i:
                        keys = delete_request["Key"]
                        if isinstance(keys, Response):
                            return keys
                        new_record = clone(record)
                        new_record["eventID"] = short_uid()
                        new_record["eventName"] = "REMOVE"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["OldImage"] = existing_items[i]
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    unprocessed_delete_items = self._thread_local("unprocessed_delete_items")
                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:
                        unprocessed_item = unprocessed_delete_items[i]
                        if unprocessed_item:
                            unprocessed_items["DeleteRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                i += 1
        return records, unprocessed_items

    def prepare_transact_write_item_records(self, record, data):
        records = []
        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,
        # so we will increase the index based on these events
        i = 0
        for request in data["TransactItems"]:
            put_request = request.get("Put")
            if put_request:
                existing_item = self._thread_local("existing_items")[i]
                table_name = put_request["TableName"]
                keys = dynamodb_extract_keys(item=put_request["Item"], table_name=table_name)
                if isinstance(keys, Response):
                    return keys
                # Add stream view type to record if ddb stream is enabled
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["NewImage"] = put_request["Item"]
                if existing_item:
                    new_record["dynamodb"]["OldImage"] = existing_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                records.append(new_record)
                i += 1
            update_request = request.get("Update")
            if update_request:
                table_name = update_request["TableName"]
                keys = update_request["Key"]
                if isinstance(keys, Response):
                    return keys
                updated_item = find_existing_item(update_request, table_name)
                if not updated_item:
                    return []
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = self._thread_local("existing_items")[i]
                new_record["dynamodb"]["NewImage"] = updated_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                records.append(new_record)
                i += 1
            delete_request = request.get("Delete")
            if delete_request:
                table_name = delete_request["TableName"]
                keys = delete_request["Key"]
                existing_item = self._thread_local("existing_items")[i]
                if isinstance(keys, Response):
                    return keys
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "REMOVE"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_item
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                records.append(new_record)
                i += 1
        return records

    def prepare_records_to_forward_to_ddb_stream(self, records):
        # StreamViewType determines what information is written to the stream for the table
        # when an item in the table is inserted, updated or deleted
        for record in records:
            if record["dynamodb"].get("StreamViewType"):
                # KEYS_ONLY - Only the key attributes of the modified item are written to the stream
                if record["dynamodb"]["StreamViewType"] == "KEYS_ONLY":
                    record["dynamodb"].pop("OldImage", None)
                    record["dynamodb"].pop("NewImage", None)
                # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream
                elif record["dynamodb"]["StreamViewType"] == "NEW_IMAGE":
                    record["dynamodb"].pop("OldImage", None)
                # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream
                elif record["dynamodb"]["StreamViewType"] == "OLD_IMAGE":
                    record["dynamodb"].pop("NewImage", None)
        return records

    def delete_all_event_source_mappings(self, table_arn):
        if table_arn:
            # fix start dynamodb service without lambda
            if not is_api_enabled("lambda"):
                return
            lambda_client = aws_stack.connect_to_service("lambda")
            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)
            for event in result["EventSourceMappings"]:
                event_source_mapping_id = event["UUID"]
                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)

    @staticmethod
    def _thread_local(name, default=None):
        try:
            return getattr(ProxyListenerDynamoDB.thread_local, name)
        except AttributeError:
            return default


def get_sse_kms_managed_key():
    if MANAGED_KMS_KEYS.get(aws_stack.get_region()):
        return MANAGED_KMS_KEYS[aws_stack.get_region()]
    kms_client = aws_stack.connect_to_service("kms")
    key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")
    key_id = key_data["KeyMetadata"]["KeyId"]
    # not really happy with this import here
    from localstack.services.kms import kms_listener

    kms_listener.set_key_managed(key_id)
    MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id
    return key_id


def get_sse_description(data):
    if data.get("Enabled"):
        kms_master_key_id = data.get("KMSMasterKeyId")
        if not kms_master_key_id:
            # this is of course not the actual key for dynamodb, just a better, since existing, mock
            kms_master_key_id = get_sse_kms_managed_key()
        kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)
        return {
            "Status": "ENABLED",
            "SSEType": "KMS",  # no other value is allowed here
            "KMSMasterKeyArn": kms_master_key_id,
        }
    return {}


def handle_special_request(method, path, data, headers):
    if path.startswith("/shell") or method == "GET":
        if path == "/shell":
            headers = {"Refresh": "0; url=%s/shell/" % config.TEST_DYNAMODB_URL}
            return aws_responses.requests_response("", headers=headers)
        return True
    if method == "OPTIONS":
        return 200


def create_global_table(data):
    table_name = data["GlobalTableName"]
    if table_name in DynamoDBRegion.GLOBAL_TABLES:
        return get_error_message(
            "Global Table with this name already exists",
            "GlobalTableAlreadyExistsException",
        )
    DynamoDBRegion.GLOBAL_TABLES[table_name] = data
    for group in data.get("ReplicationGroup", []):
        group["ReplicaStatus"] = "ACTIVE"
        group["ReplicaStatusDescription"] = "Replica active"
    result = {"GlobalTableDescription": data}
    return result


def describe_global_table(data):
    table_name = data["GlobalTableName"]
    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)
    if not details:
        return get_error_message(
            "Global Table with this name does not exist", "GlobalTableNotFoundException"
        )
    result = {"GlobalTableDescription": details}
    return result


def list_global_tables(data):
    result = [
        select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
        for tab in DynamoDBRegion.GLOBAL_TABLES.values()
    ]
    result = {"GlobalTables": result}
    return result


def update_global_table(data):
    table_name = data["GlobalTableName"]
    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)
    if not details:
        return get_error_message(
            "Global Table with this name does not exist", "GlobalTableNotFoundException"
        )
    for update in data.get("ReplicaUpdates", []):
        repl_group = details["ReplicationGroup"]
        # delete existing
        delete = update.get("Delete")
        if delete:
            details["ReplicationGroup"] = [
                g for g in repl_group if g["RegionName"] != delete["RegionName"]
            ]
        # create new
        create = update.get("Create")
        if create:
            exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
            if exists:
                continue
            new_group = {
                "RegionName": create["RegionName"],
                "ReplicaStatus": "ACTIVE",
                "ReplicaStatusDescription": "Replica active",
            }
            details["ReplicationGroup"].append(new_group)
    result = {"GlobalTableDescription": details}
    return result


def is_index_query_valid(table_name, index_query_type):
    schema = get_table_schema(table_name)
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
        index_projection_type = index.get("Projection").get("ProjectionType")
        if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
            return False
    return True


def has_event_sources_or_streams_enabled(table_name, cache={}):
    if not table_name:
        return
    table_arn = aws_stack.dynamodb_table_arn(table_name)
    cached = cache.get(table_arn)
    if isinstance(cached, bool):
        return cached
    sources = lambda_api.get_event_sources(source_arn=table_arn)
    result = False
    if sources:
        result = True
    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):
        result = True
    cache[table_arn] = result
    # if a kinesis streaming destination is enabled, get the table name from table_arn,
    # since batch_write and transact_write operations pass a table_arn instead of a table_name
    table_name = table_arn.split("/", 1)[-1]
    table_definitions = DynamoDBRegion.get().table_definitions
    if not result and table_definitions.get(table_name):
        if table_definitions[table_name].get("KinesisDataStreamDestinationStatus") == "ACTIVE":
            result = True
    return result


def get_table_schema(table_name):
    key = "%s/%s" % (aws_stack.get_region(), table_name)
    schema = SCHEMA_CACHE.get(key)
    if not schema:
        ddb_client = aws_stack.connect_to_service("dynamodb")
        schema = ddb_client.describe_table(TableName=table_name)
        SCHEMA_CACHE[key] = schema
    return schema


def find_existing_item(put_item, table_name=None):
    table_name = table_name or put_item["TableName"]
    ddb_client = aws_stack.connect_to_service("dynamodb")
    search_key = {}
    if "Key" in put_item:
        search_key = put_item["Key"]
    else:
        schema = get_table_schema(table_name)
        schemas = [schema["Table"]["KeySchema"]]
        for index in schema["Table"].get("GlobalSecondaryIndexes", []):
            # TODO
            # schemas.append(index['KeySchema'])
            pass
        for schema in schemas:
            for key in schema:
                key_name = key["AttributeName"]
                search_key[key_name] = put_item["Item"][key_name]
        if not search_key:
            return
    req = {"TableName": table_name, "Key": search_key}
    existing_item = aws_stack.dynamodb_get_item_raw(req)
    if not existing_item:
        return existing_item
    if "Item" not in existing_item:
        if "message" in existing_item:
            table_names = ddb_client.list_tables()["TableNames"]
            msg = "Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s" % (
                table_names,
                existing_item["message"],
            )
            LOGGER.warning(msg)
        return
    return existing_item.get("Item")


def get_error_message(message, error_type):
    response = error_response(message=message, error_type=error_type)
    fix_headers_for_updated_response(response)
    return response


def get_table_not_found_error():
    return get_error_message(
        message="Cannot do operations on a non-existent table",
        error_type="ResourceNotFoundException",
    )


def fix_headers_for_updated_response(response):
    response.headers["Content-Length"] = len(to_bytes(response.content))
    response.headers["x-amz-crc32"] = calculate_crc32(response)


def update_response_content(response, content, status_code=None):
    aws_responses.set_response_content(response, content)
    if status_code:
        response.status_code = status_code
    fix_headers_for_updated_response(response)


def update_put_item_response_content(data, response_content):
    # attribute data should only be returned in the response when the
    # ReturnValues parameter is set; by default no data is returned.
    # https://github.com/localstack/localstack/issues/2121
    if data.get("ReturnValues"):
        response_content = json.dumps({"Attributes": data["Item"]})
    return response_content


def calculate_crc32(response):
    return crc32(to_bytes(response.content)) & 0xFFFFFFFF


def create_dynamodb_stream(data, latest_stream_label):
    stream = data["StreamSpecification"]
    enabled = stream.get("StreamEnabled")
    if enabled not in [False, "False"]:
        table_name = data["TableName"]
        view_type = stream["StreamViewType"]
        dynamodbstreams_api.add_dynamodb_stream(
            table_name=table_name,
            latest_stream_label=latest_stream_label,
            view_type=view_type,
            enabled=enabled,
        )


def forward_to_lambda(records):
    for record in records:
        sources = lambda_api.get_event_sources(source_arn=record["eventSourceARN"])
        event = {"Records": [record]}
        for src in sources:
            if src.get("State") != "Enabled":
                continue
            lambda_api.run_lambda(
                func_arn=src["FunctionArn"],
                event=event,
                context={},
                asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS,
            )


def forward_to_ddb_stream(records):
    dynamodbstreams_api.forward_events(records)


def forward_to_kinesis_stream(records):
    kinesis = aws_stack.connect_to_service("kinesis")
    table_definitions = DynamoDBRegion.get().table_definitions
    for record in records:
        if record.get("eventSourceARN"):
            table_name = record["eventSourceARN"].split("/", 1)[-1]
            table_def = table_definitions.get(table_name) or {}
            if table_def.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
                stream_name = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"].split(
                    "/", 1
                )[-1]
                record["tableName"] = table_name
                record.pop("eventSourceARN", None)
                record["dynamodb"].pop("StreamViewType", None)
                partition_key = list(
                    filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"])
                )[0]["AttributeName"]
                kinesis.put_record(
                    StreamName=stream_name,
                    Data=json.dumps(record),
                    PartitionKey=partition_key,
                )


def dynamodb_extract_keys(item, table_name):
    result = {}
    table_definitions = DynamoDBRegion.get().table_definitions
    if table_name not in table_definitions:
        LOGGER.warning("Unknown table: %s not found in %s" % (table_name, table_definitions))
        return None
    for key in table_definitions[table_name]["KeySchema"]:
        attr_name = key["AttributeName"]
        if attr_name not in item:
            return error_response(
                error_type="ValidationException",
                message="One of the required keys was not given a value",
            )
        result[attr_name] = item[attr_name]
    return result


def dynamodb_get_table_stream_specification(table_name):
    try:
        return get_table_schema(table_name)["Table"].get("StreamSpecification")
    except Exception as e:
        LOGGER.info(
            "Unable to get stream specification for table %s: %s %s"
            % (table_name, e, traceback.format_exc())
        )
        raise e


def is_kinesis_stream_exists(stream_arn):
    # connect to kinesis
    kinesis = aws_stack.connect_to_service("kinesis")
    stream_name_from_arn = stream_arn.split("/", 1)[1]
    # check if the stream exists in kinesis for the user
    filtered = list(...
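
To observe the records that prepare_records_to_forward_to_ddb_stream shapes according to StreamViewType, you can read them back with the dynamodbstreams API. A minimal sketch, assuming the same LocalStack endpoint and the stream-enabled "my-table" from the earlier example:

import boto3

# Assumption: LocalStack's default edge endpoint; "my-table" was created with
# StreamViewType NEW_AND_OLD_IMAGES as in the example above.
kwargs = dict(
    endpoint_url="http://localhost:4566",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
ddb = boto3.client("dynamodb", **kwargs)
streams = boto3.client("dynamodbstreams", **kwargs)

# Trigger an INSERT stream record.
ddb.put_item(TableName="my-table", Item={"id": {"S": "1"}, "name": {"S": "foo"}})

stream_arn = ddb.describe_table(TableName="my-table")["Table"]["LatestStreamArn"]
shard_id = streams.describe_stream(StreamArn=stream_arn)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = streams.get_shard_iterator(
    StreamArn=stream_arn, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)["ShardIterator"]
records = streams.get_records(ShardIterator=iterator)["Records"]

# With NEW_AND_OLD_IMAGES an INSERT record keeps its NewImage; with KEYS_ONLY,
# prepare_records_to_forward_to_ddb_stream would have stripped both images
# before forwarding, leaving only the Keys.
for rec in records:
    print(rec["eventName"], rec["dynamodb"].get("NewImage"))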


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
