How to use the forward_to_targets method in LocalStack

Best Python code snippet using localstack_python

Source: provider.py (GitHub)


...
        self.table_properties = {}
        self.ttl_specifications = {}


class EventForwarder:
    @classmethod
    def forward_to_targets(cls, records: List[Dict], background: bool = True):
        def _forward(*args):
            # forward to kinesis stream
            records_to_kinesis = copy.deepcopy(records)
            cls.forward_to_kinesis_stream(records_to_kinesis)
            # forward to lambda and ddb_streams
            forward_records = cls.prepare_records_to_forward_to_ddb_stream(records)
            records_to_ddb = copy.deepcopy(forward_records)
            cls.forward_to_ddb_stream(records_to_ddb)
            # lambda receives the same records as the ddb streams
            cls.forward_to_lambda(forward_records)

        if background:
            return start_worker_thread(_forward)
        _forward()

    @staticmethod
    def forward_to_lambda(records):
        for record in records:
            event_source_arn = record.get("eventSourceARN")
            if not event_source_arn:
                continue
            sources = lambda_api.get_event_sources(source_arn=event_source_arn)
            event = {"Records": [record]}
            for src in sources:
                if src.get("State") != "Enabled":
                    continue
                lambda_api.run_lambda(
                    func_arn=src["FunctionArn"],
                    event=event,
                    context={},
                    asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS,
                )

    @staticmethod
    def forward_to_ddb_stream(records):
        dynamodbstreams_api.forward_events(records)

    @staticmethod
    def forward_to_kinesis_stream(records):
        kinesis = aws_stack.connect_to_service("kinesis")
        table_definitions = DynamoDBRegion.get().table_definitions
        for record in records:
            event_source_arn = record.get("eventSourceARN")
            if not event_source_arn:
                continue
            table_name = event_source_arn.split("/", 1)[-1]
            table_def = table_definitions.get(table_name) or {}
            if table_def.get("KinesisDataStreamDestinationStatus") != "ACTIVE":
                continue
            stream_arn = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"]
            stream_name = stream_arn.split("/", 1)[-1]
            record["tableName"] = table_name
            record.pop("eventSourceARN", None)
            record["dynamodb"].pop("StreamViewType", None)
            hash_keys = list(filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"]))
            partition_key = hash_keys[0]["AttributeName"]
            kinesis.put_record(
                StreamName=stream_name,
                Data=json.dumps(record),
                PartitionKey=partition_key,
            )

    @classmethod
    def prepare_records_to_forward_to_ddb_stream(cls, records):
        # StreamViewType determines what information is written to the stream for the table
        # when an item in the table is inserted, updated or deleted
        for record in records:
            record.pop("eventID", None)
            ddb_record = record["dynamodb"]
            stream_type = ddb_record.get("StreamViewType")
            if not stream_type:
                continue
            if "SequenceNumber" not in ddb_record:
                ddb_record["SequenceNumber"] = str(
                    dynamodbstreams_api.DynamoDBStreamsBackend.SEQUENCE_NUMBER_COUNTER
                )
                dynamodbstreams_api.DynamoDBStreamsBackend.SEQUENCE_NUMBER_COUNTER += 1
            # KEYS_ONLY - Only the key attributes of the modified item are written to the stream
            if stream_type == "KEYS_ONLY":
                ddb_record.pop("OldImage", None)
                ddb_record.pop("NewImage", None)
            # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream
            elif stream_type == "NEW_IMAGE":
                ddb_record.pop("OldImage", None)
            # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream
            elif stream_type == "OLD_IMAGE":
                ddb_record.pop("NewImage", None)
        return records

    @classmethod
    def is_kinesis_stream_exists(cls, stream_arn):
        kinesis = aws_stack.connect_to_service("kinesis")
        stream_name_from_arn = stream_arn.split("/", 1)[1]
        # check if the stream exists in kinesis for the user
        filtered = list(
            filter(
                lambda stream_name: stream_name == stream_name_from_arn,
                kinesis.list_streams()["StreamNames"],
            )
        )
        return bool(filtered)


class SSEUtils:
    """Utils for server-side encryption (SSE)"""

    @classmethod
    def get_sse_kms_managed_key(cls):
        from localstack.services.kms import provider

        existing_key = MANAGED_KMS_KEYS.get(aws_stack.get_region())
        if existing_key:
            return existing_key
        kms_client = aws_stack.connect_to_service("kms")
        key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")
        key_id = key_data["KeyMetadata"]["KeyId"]
        provider.set_key_managed(key_id)
        MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id
        return key_id

    @classmethod
    def get_sse_description(cls, data):
        if data.get("Enabled"):
            kms_master_key_id = data.get("KMSMasterKeyId")
            if not kms_master_key_id:
                # this is of course not the actual key for dynamodb, just a better, since existing, mock
                kms_master_key_id = cls.get_sse_kms_managed_key()
            kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)
            return {
                "Status": "ENABLED",
                "SSEType": "KMS",  # no other value is allowed here
                "KMSMasterKeyArn": kms_master_key_id,
            }
        return {}


class ValidationException(CommonServiceException):
    def __init__(self, message: str):
        super().__init__(code="ValidationException", status_code=400, message=message)


class DynamoDBApiListener(AwsApiListener):
    def __init__(self, provider=None):
        provider = provider or DynamoDBProvider()
        self.provider = provider
        super().__init__("dynamodb", HttpFallbackDispatcher(provider, provider.get_forward_url))

    def forward_request(self, method, path, data, headers):
        action = headers.get("X-Amz-Target", "")
        action = action.replace(ACTION_PREFIX, "")
        if self.provider.should_throttle(action):
            message = (
                "The level of configured provisioned throughput for the table was exceeded. "
"274 + "Consider increasing your provisioning level with the UpdateTable API"275 )276 raise ProvisionedThroughputExceededException(message)277 return super().forward_request(method, path, data, headers)278 def return_response(self, method, path, data, headers, response):279 if response._content:280 # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)281 content_replaced = re.sub(282 r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:aws:dynamodb:ddblocal:([^"]+)"',283 rf'\1: "arn:aws:dynamodb:{aws_stack.get_region()}:\2"',284 to_str(response._content),285 )286 if content_replaced != response._content:287 response._content = content_replaced288 # set x-amz-crc32 headers required by some client289 fix_headers_for_updated_response(response)290 # update table definitions291 data = json.loads(to_str(data))292 if data and "TableName" in data and "KeySchema" in data:293 table_definitions = DynamoDBRegion.get().table_definitions294 table_definitions[data["TableName"]] = data295class DynamoDBProvider(DynamodbApi, ServiceLifecycleHook):296 def __init__(self):297 self.request_forwarder = get_request_forwarder_http(self.get_forward_url)298 def on_after_init(self):299 ROUTER.add(300 path="/shell",301 endpoint=self.handle_shell_ui_redirect,302 methods=["GET"],303 )304 ROUTER.add(305 path="/shell/<regex('.*'):req_path>",306 endpoint=self.handle_shell_ui_request,307 )308 def forward_request(309 self, context: RequestContext, service_request: ServiceRequest = None310 ) -> ServiceResponse:311 # note: modifying headers in-place here before forwarding the request312 self.prepare_request_headers(context.request.headers)313 return self.request_forwarder(context, service_request)314 def get_forward_url(self) -> str:315 """Return the URL of the backend DynamoDBLocal server to forward requests to"""316 return f"http://{LOCALHOST}:{server.get_server().port}"317 def on_before_start(self):318 start_dynamodb()319 wait_for_dynamodb()320 def handle_shell_ui_redirect(self, request: werkzeug.Request) -> Response:321 headers = {"Refresh": f"0; url={config.service_url('dynamodb')}/shell/index.html"}322 return Response("", headers=headers)323 def handle_shell_ui_request(self, request: werkzeug.Request, req_path: str) -> Response:324 # TODO: "DynamoDB Local Web Shell was deprecated with version 1.16.X and is not available any325 # longer from 1.17.X to latest. 
        #  There are no immediate plans for a new Web Shell to be introduced."
        #  -> keeping this for now, to allow configuring custom installs; should consider removing it in the future
        #  https://repost.aws/questions/QUHyIzoEDqQ3iOKlUEp1LPWQ#ANdBm9Nz9TRf6VqR3jZtcA1g
        req_path = f"/{req_path}" if not req_path.startswith("/") else req_path
        url = f"{self.get_forward_url()}/shell{req_path}"
        result = requests.request(
            method=request.method, url=url, headers=request.headers, data=request.data
        )
        return Response(result.content, headers=dict(result.headers), status=result.status_code)

    @handler("CreateTable", expand=False)
    def create_table(
        self,
        context: RequestContext,
        create_table_input: CreateTableInput,
    ) -> CreateTableOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        table_name = create_table_input["TableName"]
        if self.table_exists(table_name):
            raise ResourceInUseException("Cannot create preexisting table")
        # forward request to backend
        result = self.forward_request(context)

        backend = DynamoDBRegion.get()
        backend.table_definitions[table_name] = table_definitions = dict(create_table_input)
        if "TableId" not in table_definitions:
            table_definitions["TableId"] = long_uid()
        if "SSESpecification" in table_definitions:
            sse_specification = table_definitions.pop("SSESpecification")
            table_definitions["SSEDescription"] = SSEUtils.get_sse_description(sse_specification)
        if table_definitions:
            table_content = result.get("Table", {})
            table_content.update(table_definitions)
            result["TableDescription"].update(table_content)
        if "StreamSpecification" in table_definitions:
            create_dynamodb_stream(
                table_definitions, result["TableDescription"].get("LatestStreamLabel")
            )

        tags = table_definitions.pop("Tags", [])
        result["TableDescription"].pop("Tags", None)
        if tags:
            table_arn = result["TableDescription"]["TableArn"]
            table_arn = self.fix_table_arn(table_arn)
            DynamoDBRegion.TABLE_TAGS[table_arn] = {tag["Key"]: tag["Value"] for tag in tags}

        event_publisher.fire_event(
            event_publisher.EVENT_DYNAMODB_CREATE_TABLE,
            payload={"n": event_publisher.get_hash(table_name)},
        )
        return result

    def delete_table(self, context: RequestContext, table_name: TableName) -> DeleteTableOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        # forward request to backend
        result = self.forward_request(context)

        event_publisher.fire_event(
            event_publisher.EVENT_DYNAMODB_DELETE_TABLE,
            payload={"n": event_publisher.get_hash(table_name)},
        )
        table_arn = result.get("TableDescription", {}).get("TableArn")
        table_arn = self.fix_table_arn(table_arn)
        self.delete_all_event_source_mappings(table_arn)
        dynamodbstreams_api.delete_streams(table_arn)
        DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)
        return result

    def describe_table(self, context: RequestContext, table_name: TableName) -> DescribeTableOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        # forward request to backend
        result = self.forward_request(context)
        # update response with additional props
        table_props = DynamoDBRegion.get().table_properties.get(table_name)
        if table_props:
            result.get("Table", {}).update(table_props)
        # update only TableId and SSEDescription if present
        table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)
        if table_definitions:
            for key in ["TableId", "SSEDescription"]:
                if table_definitions.get(key):
                    result.get("Table", {})[key] = table_definitions[key]
        return result

    @handler("UpdateTable", expand=False)
    def update_table(
        self, context: RequestContext, update_table_input: UpdateTableInput
    ) -> UpdateTableOutput:
        try:
            # forward request to backend
            result = self.forward_request(context)
        except CommonServiceException as e:
            is_no_update_error = (
                e.code == "ValidationException" and "Nothing to update" in e.message
            )
            if is_no_update_error and update_table_input.get("ReplicaUpdates"):
                table_name = update_table_input.get("TableName")
                # update local table props (replicas)
                table_properties = DynamoDBRegion.get().table_properties
                table_properties[table_name] = table_props = table_properties.get(table_name) or {}
                table_props["Replicas"] = replicas = table_props.get("Replicas") or []
                for repl_update in update_table_input["ReplicaUpdates"]:
                    for key, details in repl_update.items():
                        region = details.get("RegionName")
                        if key == "Create":
                            details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"
                            replicas.append(details)
                        if key == "Update":
                            replica = [r for r in replicas if r.get("RegionName") == region]
                            if replica:
                                replica[0].update(details)
                        if key == "Delete":
                            table_props["Replicas"] = [
                                r for r in replicas if r.get("RegionName") != region
                            ]
                # update response content
                schema = SchemaExtractor.get_table_schema(table_name)
                return UpdateTableOutput(TableDescription=schema["Table"])
            raise

        if "StreamSpecification" in update_table_input:
            create_dynamodb_stream(
                update_table_input, result["TableDescription"].get("LatestStreamLabel")
            )
        return result

    def list_tables(
        self,
        context: RequestContext,
        exclusive_start_table_name: TableName = None,
        limit: ListTablesInputLimit = None,
    ) -> ListTablesOutput:
        return self.forward_request(context)

    @handler("PutItem", expand=False)
    def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:
        existing_item = None
        table_name = put_item_input["TableName"]
        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
        if event_sources_or_streams_enabled:
            existing_item = ItemFinder.find_existing_item(put_item_input)
        # forward request to backend
        self.fix_return_consumed_capacity(put_item_input)
        result = self.forward_request(context, put_item_input)
        # Get stream specifications details for the table
        if event_sources_or_streams_enabled:
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            item = put_item_input["Item"]
            # prepare record keys
            keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)
            # create record
            record = self.get_record_template()
            record["eventName"] = "INSERT" if not existing_item else "MODIFY"
            record["dynamodb"].update(
                {
                    "Keys": keys,
                    "NewImage": item,
                    "SizeBytes": len(json.dumps(item)),
                }
            )
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            if existing_item:
                record["dynamodb"]["OldImage"] = existing_item
            self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("DeleteItem", expand=False)
    def delete_item(
        self,
        context: RequestContext,
        delete_item_input: DeleteItemInput,
    ) -> DeleteItemOutput:
        existing_item = None
        table_name = delete_item_input["TableName"]
        if has_event_sources_or_streams_enabled(table_name):
            existing_item = ItemFinder.find_existing_item(delete_item_input)
        # forward request to backend
        self.fix_return_consumed_capacity(delete_item_input)
        result = self.forward_request(context, delete_item_input)
        # determine and forward stream record
        if existing_item:
            event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
            if event_sources_or_streams_enabled:
                # create record
                record = self.get_record_template()
                record["eventName"] = "REMOVE"
                record["dynamodb"].update(
                    {
                        "Keys": delete_item_input["Key"],
                        "OldImage": existing_item,
                        "SizeBytes": len(json.dumps(existing_item)),
                    }
                )
                # Get stream specifications details for the table
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("UpdateItem", expand=False)
    def update_item(
        self,
        context: RequestContext,
        update_item_input: UpdateItemInput,
    ) -> UpdateItemOutput:
        existing_item = None
        table_name = update_item_input["TableName"]
        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)
        if event_sources_or_streams_enabled:
            existing_item = ItemFinder.find_existing_item(update_item_input)
        # forward request to backend
        self.fix_return_consumed_capacity(update_item_input)
        result = self.forward_request(context, update_item_input)
        # construct and forward stream record
        if event_sources_or_streams_enabled:
            updated_item = ItemFinder.find_existing_item(update_item_input)
            if updated_item:
                record = self.get_record_template()
                record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                record["dynamodb"].update(
                    {
                        "Keys": update_item_input["Key"],
                        "NewImage": updated_item,
                        "SizeBytes": len(json.dumps(updated_item)),
                    }
                )
                if existing_item:
                    record["dynamodb"]["OldImage"] = existing_item
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                self.forward_stream_records([record], table_name=table_name)
        return result

    @handler("GetItem", expand=False)
    def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:
        result = self.forward_request(context)
        self.fix_consumed_capacity(get_item_input, result)
        return result

    @handler("Query", expand=False)
    def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:
        index_name = query_input.get("IndexName")
        if index_name:
            if not is_index_query_valid(query_input):
                raise ValidationException(
                    "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "
                    "is not supported for global secondary index id-index because its projection "
                    "type is not ALL",
                )
        result = self.forward_request(context)
        self.fix_consumed_capacity(query_input, result)
        return result

    @handler("Scan", expand=False)
    def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:
        return self.forward_request(context)

    @handler("BatchWriteItem", expand=False)
    def batch_write_item(
        self,
        context: RequestContext,
        batch_write_item_input: BatchWriteItemInput,
    ) -> BatchWriteItemOutput:
        existing_items = []
        unprocessed_put_items = []
        unprocessed_delete_items = []
        request_items = batch_write_item_input["RequestItems"]
        for table_name in sorted(request_items.keys()):
            for request in request_items[table_name]:
                for key in ["PutRequest", "DeleteRequest"]:
                    inner_request = request.get(key)
                    if inner_request:
                        if self.should_throttle("BatchWriteItem"):
                            if key == "PutRequest":
                                unprocessed_put_items.append(inner_request)
                            elif key == "DeleteRequest":
                                unprocessed_delete_items.append(inner_request)
                        else:
                            item = ItemFinder.find_existing_item(inner_request, table_name)
                            existing_items.append(item)
        # forward request to backend
        result = self.forward_request(context)
        # determine and forward stream records
        request_items = batch_write_item_input["RequestItems"]
        records, unprocessed_items = self.prepare_batch_write_item_records(
            request_items=request_items,
            unprocessed_put_items=unprocessed_put_items,
            unprocessed_delete_items=unprocessed_delete_items,
            existing_items=existing_items,
        )
        streams_enabled_cache = {}
        event_sources_or_streams_enabled = False
        for record in records:
            event_sources_or_streams_enabled = (
                event_sources_or_streams_enabled
                or has_event_sources_or_streams_enabled(
                    record["eventSourceARN"], streams_enabled_cache
                )
            )
        if event_sources_or_streams_enabled:
            self.forward_stream_records(records)
        # update response
        if any(unprocessed_items):
            table_name = list(request_items.keys())[0]
            unprocessed = result["UnprocessedItems"]
            if table_name not in unprocessed:
                unprocessed[table_name] = []
            for key in ["PutRequest", "DeleteRequest"]:
                if any(unprocessed_items[key]):
                    unprocessed[table_name].append({key: unprocessed_items[key]})
            for key in list(unprocessed.keys()):
                if not unprocessed.get(key):
                    del unprocessed[key]
        return result

    @handler("TransactWriteItems", expand=False)
    def transact_write_items(
        self,
        context: RequestContext,
        transact_write_items_input: TransactWriteItemsInput,
    ) -> TransactWriteItemsOutput:
        existing_items = []
        for item in transact_write_items_input["TransactItems"]:
            for key in ["Put", "Update", "Delete"]:
                inner_item = item.get(key)
                if inner_item:
                    existing_items.append(ItemFinder.find_existing_item(inner_item))
        # forward request to backend
        result = self.forward_request(context)
        # determine and forward stream records
        streams_enabled_cache = {}
        records = self.prepare_transact_write_item_records(
            transact_items=transact_write_items_input["TransactItems"],
            existing_items=existing_items,
        )
        event_sources_or_streams_enabled = False
        for record in records:
            event_sources_or_streams_enabled = (
                event_sources_or_streams_enabled
                or has_event_sources_or_streams_enabled(
                    record["eventSourceARN"], streams_enabled_cache
                )
            )
        if event_sources_or_streams_enabled:
            self.forward_stream_records(records)
        return result

    @handler("ExecuteStatement", expand=False)
    def execute_statement(
        self,
        context: RequestContext,
        execute_statement_input: ExecuteStatementInput,
    ) -> ExecuteStatementOutput:
        statement = execute_statement_input["Statement"]
        table_name = extract_table_name_from_partiql_update(statement)
        existing_items = None
        if table_name and has_event_sources_or_streams_enabled(table_name):
            # Note: fetching the entire list of items is hugely inefficient, especially for larger tables
            # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!
            existing_items = ItemFinder.list_existing_items_for_statement(statement)
        # forward request to backend
        result = self.forward_request(context)
        # construct and forward stream record
        event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(
            table_name
        )
        if event_sources_or_streams_enabled:
            records = get_updated_records(table_name, existing_items)
            self.forward_stream_records(records, table_name=table_name)
        return result

    def tag_resource(
        self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList
    ) -> None:
        table_tags = DynamoDBRegion.TABLE_TAGS
        if resource_arn not in table_tags:
            table_tags[resource_arn] = {}
        table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})

    def untag_resource(
        self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList
    ) -> None:
        for tag_key in tag_keys or []:
            DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)

    def list_tags_of_resource(
        self,
        context: RequestContext,
        resource_arn: ResourceArnString,
        next_token: NextTokenString = None,
    ) -> ListTagsOfResourceOutput:
        result = [
            {"Key": k, "Value": v}
            for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).items()
        ]
        return ListTagsOfResourceOutput(Tags=result)

    def describe_time_to_live(
        self, context: RequestContext, table_name: TableName
    ) -> DescribeTimeToLiveOutput:
        backend = DynamoDBRegion.get()
        ttl_spec = backend.ttl_specifications.get(table_name)
        result = {"TimeToLiveStatus": "DISABLED"}
        if ttl_spec:
            if ttl_spec.get("Enabled"):
                ttl_status = "ENABLED"
            else:
                ttl_status = "DISABLED"
            result = {
                "AttributeName": ttl_spec.get("AttributeName"),
                "TimeToLiveStatus": ttl_status,
            }
        return DescribeTimeToLiveOutput(TimeToLiveDescription=result)

    def update_time_to_live(
        self,
        context: RequestContext,
        table_name: TableName,
        time_to_live_specification: TimeToLiveSpecification,
    ) -> UpdateTimeToLiveOutput:
        # TODO: TTL status is maintained/mocked but no real expiry is happening for items
        backend = DynamoDBRegion.get()
        backend.ttl_specifications[table_name] = time_to_live_specification
        return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)

    def create_global_table(
        self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList
    ) -> CreateGlobalTableOutput:
        if global_table_name in DynamoDBRegion.GLOBAL_TABLES:
            raise GlobalTableAlreadyExistsException("Global table with this name already exists")
        replication_group = [grp.copy() for grp in replication_group or []]
        data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}
        DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data
        for group in replication_group:
            group["ReplicaStatus"] = "ACTIVE"
            group["ReplicaStatusDescription"] = "Replica active"
        return CreateGlobalTableOutput(GlobalTableDescription=data)

    def describe_global_table(
        self, context: RequestContext, global_table_name: TableName
    ) -> DescribeGlobalTableOutput:
        details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)
        if not details:
            raise GlobalTableNotFoundException("Global table with this name does not exist")
        return DescribeGlobalTableOutput(GlobalTableDescription=details)

    def list_global_tables(
        self,
        context: RequestContext,
        exclusive_start_global_table_name: TableName = None,
        limit: PositiveIntegerObject = None,
        region_name: RegionName = None,
    ) -> ListGlobalTablesOutput:
        # TODO: add paging support
        result = [
            select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
            for tab in DynamoDBRegion.GLOBAL_TABLES.values()
        ]
        return ListGlobalTablesOutput(GlobalTables=result)

    def update_global_table(
        self,
        context: RequestContext,
        global_table_name: TableName,
        replica_updates: ReplicaUpdateList,
    ) -> UpdateGlobalTableOutput:
        details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)
        if not details:
            raise GlobalTableNotFoundException("Global table with this name does not exist")
        for update in replica_updates or []:
            repl_group = details["ReplicationGroup"]
            # delete existing
            delete = update.get("Delete")
            if delete:
                details["ReplicationGroup"] = [
                    g for g in repl_group if g["RegionName"] != delete["RegionName"]
                ]
            # create new
            create = update.get("Create")
            if create:
                exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
                if exists:
                    continue
                new_group = {
                    "RegionName": create["RegionName"],
                    "ReplicaStatus": "ACTIVE",
                    "ReplicaStatusDescription": "Replica active",
                }
                details["ReplicationGroup"].append(new_group)
        return UpdateGlobalTableOutput(GlobalTableDescription=details)

    def enable_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName, stream_arn: StreamArn
    ) -> KinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
        if not stream:
            raise ValidationException("User does not have a permission to use kinesis stream")
        table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})
        dest_status = table_def.get("KinesisDataStreamDestinationStatus")
        if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:
            raise ValidationException(
                "Table is not in a valid state to enable Kinesis Streaming "
                "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "
                "to perform ENABLE operation."
            )
        table_def["KinesisDataStreamDestinations"] = (
            table_def.get("KinesisDataStreamDestinations") or []
        )
        # remove the stream destination if already present
        table_def["KinesisDataStreamDestinations"] = [
            t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn
        ]
        # append the active stream destination at the end of the list
        table_def["KinesisDataStreamDestinations"].append(
            {
                "DestinationStatus": "ACTIVE",
                "DestinationStatusDescription": "Stream is active",
                "StreamArn": stream_arn,
            }
        )
        table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"
        return KinesisStreamingDestinationOutput(
            DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name
        )

    def disable_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName, stream_arn: StreamArn
    ) -> KinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)
        if not stream:
            raise ValidationException(
                "User does not have a permission to use kinesis stream",
            )
        table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})
        stream_destinations = table_def.get("KinesisDataStreamDestinations")
        if stream_destinations:
            if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":
                for dest in stream_destinations:
                    if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":
                        dest["DestinationStatus"] = "DISABLED"
                        dest["DestinationStatusDescription"] = "Stream is disabled"
                        table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"
                        return KinesisStreamingDestinationOutput(
                            DestinationStatus="DISABLED",
                            StreamArn=stream_arn,
                            TableName=table_name,
                        )
        raise ValidationException(
            "Table is not in a valid state to disable Kinesis Streaming Destination:"
            "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."
        )

    def describe_kinesis_streaming_destination(
        self, context: RequestContext, table_name: TableName
    ) -> DescribeKinesisStreamingDestinationOutput:
        # Check if table exists, to avoid error log output from DynamoDBLocal
        if not self.table_exists(table_name):
            raise ResourceNotFoundException("Cannot do operations on a non-existent table")
        table_def = DynamoDBRegion.get().table_definitions.get(table_name)
        stream_destinations = table_def.get("KinesisDataStreamDestinations") or []
        return DescribeKinesisStreamingDestinationOutput(
            KinesisDataStreamDestinations=stream_destinations, TableName=table_name
        )

    @staticmethod
    def table_exists(table_name):
        return aws_stack.dynamodb_table_exists(table_name)

    @staticmethod
    def prepare_request_headers(headers):
        def _replace(regex, replace):
            headers["Authorization"] = re.sub(
                regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE
            )

        # Note: We need to ensure that the same access key is used here for all requests,
        # otherwise DynamoDBLocal stores tables/items in separate namespaces
        _replace(r"Credential=[^/]+/", rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")
        # Note: The NoSQL Workbench sends "localhost" or "local" as the region name, which we need to fix here
        _replace(
            r"Credential=([^/]+/[^/]+)/local(host)?/",
            rf"Credential=\1/{aws_stack.get_local_region()}/",
        )

    def fix_return_consumed_capacity(self, request_dict):
        # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is
        # empty, see https://github.com/localstack/localstack/issues/2049
        return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (
            not request_dict.get("ReturnValues")
        )
        if return_values_all and not request_dict.get("ReturnConsumedCapacity"):
            request_dict["ReturnConsumedCapacity"] = "TOTAL"

    def fix_consumed_capacity(self, request: Dict, result: Dict):
        # make sure we append 'ConsumedCapacity', which is properly
        # returned by dynalite, but not by AWS's DynamoDBLocal
        table_name = request.get("TableName")
        return_cap = request.get("ReturnConsumedCapacity")
        if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:
            result["ConsumedCapacity"] = {
                "TableName": table_name,
                "CapacityUnits": 5,  # TODO hardcoded
                "ReadCapacityUnits": 2,
                "WriteCapacityUnits": 3,
            }
    def fix_table_arn(self, table_arn: str) -> str:
        return re.sub(
            "arn:aws:dynamodb:ddblocal:",
            f"arn:aws:dynamodb:{aws_stack.get_region()}:",
            table_arn,
        )

    def prepare_transact_write_item_records(self, transact_items, existing_items):
        records = []
        record = self.get_record_template()
        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,
        # so we will increase the index based on these events
        i = 0
        for request in transact_items:
            put_request = request.get("Put")
            if put_request:
                existing_item = existing_items[i]
                table_name = put_request["TableName"]
                keys = SchemaExtractor.extract_keys(item=put_request["Item"], table_name=table_name)
                # Add stream view type to record if ddb stream is enabled
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["NewImage"] = put_request["Item"]
                if existing_item:
                    new_record["dynamodb"]["OldImage"] = existing_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                records.append(new_record)
                i += 1
            update_request = request.get("Update")
            if update_request:
                table_name = update_request["TableName"]
                keys = update_request["Key"]
                updated_item = ItemFinder.find_existing_item(update_request, table_name)
                if not updated_item:
                    return []
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_items[i]
                new_record["dynamodb"]["NewImage"] = updated_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                records.append(new_record)
                i += 1
            delete_request = request.get("Delete")
            if delete_request:
                table_name = delete_request["TableName"]
                keys = delete_request["Key"]
                existing_item = existing_items[i]
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = copy.deepcopy(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "REMOVE"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_item
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                records.append(new_record)
                i += 1
        return records

    def prepare_batch_write_item_records(
        self,
        request_items,
        existing_items,
        unprocessed_put_items: List,
        unprocessed_delete_items: List,
    ):
        records = []
        record = self.get_record_template()
        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}
        i = 0
        for table_name in sorted(request_items.keys()):
            # Add stream view type to record if ddb stream is enabled
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            for request in request_items[table_name]:
                put_request = request.get("PutRequest")
                if put_request:
                    if existing_items and len(existing_items) > i:
                        existing_item = existing_items[i]
                        keys = SchemaExtractor.extract_keys(
                            item=put_request["Item"], table_name=table_name
                        )
                        new_record = copy.deepcopy(record)
                        new_record["eventID"] = short_uid()
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["NewImage"] = put_request["Item"]
                        if existing_item:
                            new_record["dynamodb"]["OldImage"] = existing_item
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    if unprocessed_put_items and len(unprocessed_put_items) > i:
                        unprocessed_item = unprocessed_put_items[i]
                        if unprocessed_item:
                            unprocessed_items["PutRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                delete_request = request.get("DeleteRequest")
                if delete_request:
                    if existing_items and len(existing_items) > i:
                        keys = delete_request["Key"]
                        new_record = copy.deepcopy(record)
                        new_record["eventID"] = short_uid()
                        new_record["eventName"] = "REMOVE"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["OldImage"] = existing_items[i]
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:
                        unprocessed_item = unprocessed_delete_items[i]
                        if unprocessed_item:
                            unprocessed_items["DeleteRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                i += 1
        return records, unprocessed_items

    def forward_stream_records(self, records: List[Dict], table_name: str = None):
        if records and "eventName" in records[0]:
            if table_name:
                for record in records:
                    record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
            EventForwarder.forward_to_targets(records, background=True)

    def delete_all_event_source_mappings(self, table_arn):
        if table_arn:
            # fix start dynamodb service without lambda
            if not is_api_enabled("lambda"):
                return
            lambda_client = aws_stack.connect_to_service("lambda")
            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)
            for event in result["EventSourceMappings"]:
                event_source_mapping_id = event["UUID"]
                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)

    def get_record_template(self) -> Dict:
        return {
            "eventID": short_uid(),
            "eventVersion": "1.1",
            ...
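To see how the pieces fit together end to end: forward_to_targets is the single fan-out point that forward_stream_records calls after every mutating operation (put_item, delete_item, update_item, batch and transactional writes). The sketch below invokes it by hand from inside a running LocalStack process (the Kinesis and Lambda legs make real calls against the emulator). It is illustrative only: the import paths match where this provider.py lived at the time but may differ across LocalStack releases, and the table name "my-table" and the item payload are hypothetical placeholders; EventForwarder, the record shape produced by get_record_template, and aws_stack.dynamodb_table_arn all come from the snippet above.

import json

from localstack.services.dynamodb.provider import EventForwarder
from localstack.utils.aws import aws_stack

# Build a record shaped like the ones put_item() constructs internally.
item = {"id": {"S": "item-1"}, "value": {"N": "42"}}
record = {
    "eventID": "1",
    "eventVersion": "1.1",
    "eventName": "INSERT",
    "eventSourceARN": aws_stack.dynamodb_table_arn("my-table"),
    "dynamodb": {
        "Keys": {"id": {"S": "item-1"}},
        "NewImage": item,
        "SizeBytes": len(json.dumps(item)),
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}

# background=True (the default) runs the fan-out on a worker thread and returns
# immediately; background=False blocks until the Kinesis, DynamoDB Streams, and
# Lambda legs have all been invoked.
EventForwarder.forward_to_targets([record], background=False)

Note the ordering inside _forward: the Kinesis leg receives a deep copy first, because forward_to_kinesis_stream mutates its records (it pops eventSourceARN and StreamViewType and adds tableName); only then does prepare_records_to_forward_to_ddb_stream trim the images in place, so DynamoDB Streams and the Lambda event source mappings both see the same trimmed records.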
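The image-trimming behavior of prepare_records_to_forward_to_ddb_stream can also be checked in isolation. A minimal sketch with hypothetical record values; presetting SequenceNumber keeps the function from touching the shared stream counter:

import copy

from localstack.services.dynamodb.provider import EventForwarder

base = {
    "eventName": "MODIFY",
    "dynamodb": {
        "SequenceNumber": "100000000000000000001",  # preset, so the counter is not incremented
        "Keys": {"id": {"S": "item-1"}},
        "OldImage": {"id": {"S": "item-1"}, "value": {"N": "1"}},
        "NewImage": {"id": {"S": "item-1"}, "value": {"N": "2"}},
    },
}

for view_type in ["KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"]:
    record = copy.deepcopy(base)
    record["dynamodb"]["StreamViewType"] = view_type
    prepared = EventForwarder.prepare_records_to_forward_to_ddb_stream([record])[0]
    images = [k for k in ("OldImage", "NewImage") if k in prepared["dynamodb"]]
    print(view_type, "->", images)

# Expected output, per the branches above:
# KEYS_ONLY -> []
# NEW_IMAGE -> ['NewImage']
# OLD_IMAGE -> ['OldImage']
# NEW_AND_OLD_IMAGES -> ['OldImage', 'NewImage']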


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud
