How to use find_global_table_region method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...249 # special case: AWS NoSQL Workbench sends "localhost" as region - replace with proper region here250 if _region == "localhost":251 _region = aws_stack.get_local_region()252 return dynamodb_stores[_account_id][_region]253def find_global_table_region(table_name: str, target_region: str | None = None) -> str | None:254 """255 Check if the table is a Version 2019.11.21 table.256 :param target_region: the region we are looking for257 :param table_name: the name of the table258 :return: the original region; `None` if this is not a global table259 """260 replicas = get_store().REPLICA_UPDATES.get(table_name)261 if not replicas:262 return None263 original_region = list(replicas.keys())[0]264 return original_region if target_region in replicas[original_region] else None265@contextmanager266def modify_context_region(context: RequestContext, region: str):267 """268 Context manager that modifies the region of a `RequestContext`. At the exit, the context is restored to its269 original state.270 :param context: the context to modify271 :param region: the modified region272 :return: a modified `RequestContext`273 """274 original_region = context.region275 original_authorization = context.request.headers.get("Authorization")276 context.region = region277 context.request.headers["Authorization"] = re.sub(278 r"Credential=([^/]+/[^/]+)/(.*?)/",279 rf"Credential=\1/{region}/",280 original_authorization or "",281 flags=re.IGNORECASE,282 )283 yield context284 # revert the original context285 context.region = original_region286 context.request.headers["Authorization"] = original_authorization287class DynamoDBProvider(DynamodbApi, ServiceLifecycleHook):288 def __init__(self):289 self.request_forwarder = get_request_forwarder_http(self.get_forward_url)290 def on_after_init(self):291 # add response processor specific to ddblocal292 handlers.modify_service_response.append(self.service, self._modify_ddblocal_arns)293 # routes for the shell ui294 ROUTER.add(295 path="/shell",296 endpoint=self.handle_shell_ui_redirect,297 methods=["GET"],298 )299 ROUTER.add(300 path="/shell/<regex('.*'):req_path>",301 endpoint=self.handle_shell_ui_request,302 )303 def _modify_ddblocal_arns(self, chain, context: RequestContext, response: Response):304 """A service response handler that modifies the dynamodb backend response."""305 if response_content := response.get_data(as_text=True):306 # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)307 content_replaced = re.sub(308 r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:([a-z-]+):dynamodb:ddblocal:([^"]+)"',309 rf'\1: "arn:\2:dynamodb:{aws_stack.get_region()}:\3"',310 response_content,311 )312 if content_replaced != response_content:313 response.data = content_replaced314 context.service_response = (315 None # make sure the service response is parsed again later316 )317 # update x-amz-crc32 header required by some clients318 response.headers["x-amz-crc32"] = crc32(response.data) & 0xFFFFFFFF319 def forward_request(320 self, context: RequestContext, service_request: ServiceRequest = None321 ) -> ServiceResponse:322 # check rate limiting for this request and raise an error, if provisioned throughput is exceeded323 self.check_provisioned_throughput(context.operation.name)324 # note: modifying headers in-place here before forwarding the request325 self.prepare_request_headers(context.request.headers)326 return self.request_forwarder(context, service_request)327 def get_forward_url(self) -> str:328 """Return the URL of the backend DynamoDBLocal server to forward requests to"""329 return f"http://{LOCALHOST}:{server.get_server().port}"330 def on_before_start(self):331 start_dynamodb()332 wait_for_dynamodb()333 def handle_shell_ui_redirect(self, request: werkzeug.Request) -> Response:334 headers = {"Refresh": f"0; url={config.service_url('dynamodb')}/shell/index.html"}335 return Response("", headers=headers)336 def handle_shell_ui_request(self, request: werkzeug.Request, req_path: str) -> Response:337 # TODO: "DynamoDB Local Web Shell was deprecated with version 1.16.X and is not available any338 # longer from 1.17.X to latest. There are no immediate plans for a new Web Shell to be introduced."339 # -> keeping this for now, to allow configuring custom installs; should consider removing it in the future340 # https://repost.aws/questions/QUHyIzoEDqQ3iOKlUEp1LPWQ#ANdBm9Nz9TRf6VqR3jZtcA1g341 req_path = f"/{req_path}" if not req_path.startswith("/") else req_path342 url = f"{self.get_forward_url()}/shell{req_path}"343 result = requests.request(344 method=request.method, url=url, headers=request.headers, data=request.data345 )346 return Response(result.content, headers=dict(result.headers), status=result.status_code)347 @handler("CreateTable", expand=False)348 def create_table(349 self,350 context: RequestContext,351 create_table_input: CreateTableInput,352 ) -> CreateTableOutput:353 # Check if table exists, to avoid error log output from DynamoDBLocal354 table_name = create_table_input["TableName"]355 if self.table_exists(table_name):356 raise ResourceInUseException("Cannot create preexisting table")357 billing_mode = create_table_input.get("BillingMode")358 provisioned_throughput = create_table_input.get("ProvisionedThroughput")359 if billing_mode == BillingMode.PAY_PER_REQUEST and provisioned_throughput is not None:360 raise ValidationException(361 "One or more parameter values were invalid: Neither ReadCapacityUnits nor WriteCapacityUnits can be "362 "specified when BillingMode is PAY_PER_REQUEST"363 )364 # forward request to backend365 result = self.forward_request(context)366 table_description = result["TableDescription"]367 backend = get_store(context)368 backend.table_definitions[table_name] = table_definitions = dict(create_table_input)369 if "TableId" not in table_definitions:370 table_definitions["TableId"] = long_uid()371 if "SSESpecification" in table_definitions:372 sse_specification = table_definitions.pop("SSESpecification")373 table_definitions["SSEDescription"] = SSEUtils.get_sse_description(sse_specification)374 if table_definitions:375 table_content = result.get("Table", {})376 table_content.update(table_definitions)377 table_description.update(table_content)378 if "StreamSpecification" in table_definitions:379 create_dynamodb_stream(table_definitions, table_description.get("LatestStreamLabel"))380 if "TableClass" in table_definitions:381 table_class = table_description.pop("TableClass", None) or table_definitions.pop(382 "TableClass"383 )384 table_description["TableClassSummary"] = {"TableClass": table_class}385 tags = table_definitions.pop("Tags", [])386 if tags:387 table_arn = table_description["TableArn"]388 table_arn = self.fix_table_arn(table_arn)389 get_store(context).TABLE_TAGS[table_arn] = {tag["Key"]: tag["Value"] for tag in tags}390 # remove invalid attributes from result391 table_description.pop("Tags", None)392 table_description.pop("BillingMode", None)393 return result394 def delete_table(self, context: RequestContext, table_name: TableName) -> DeleteTableOutput:395 # Check if table exists, to avoid error log output from DynamoDBLocal396 if not self.table_exists(table_name):397 raise ResourceNotFoundException("Cannot do operations on a non-existent table")398 # forward request to backend399 result = self.forward_request(context)400 table_arn = result.get("TableDescription", {}).get("TableArn")401 table_arn = self.fix_table_arn(table_arn)402 self.delete_all_event_source_mappings(table_arn)403 dynamodbstreams_api.delete_streams(table_arn)404 get_store(context).TABLE_TAGS.pop(table_arn, None)405 return result406 def _forward_request(self, context: RequestContext, region: str | None) -> ServiceResponse:407 if region:408 with modify_context_region(context, region):409 return self.forward_request(context)410 return self.forward_request(context)411 def describe_table(self, context: RequestContext, table_name: TableName) -> DescribeTableOutput:412 global_table_region: str | None = find_global_table_region(413 table_name=table_name, target_region=context.region414 )415 # Check if table exists, to avoid error log output from DynamoDBLocal416 if not self.table_exists(table_name) and not global_table_region:417 raise ResourceNotFoundException("Cannot do operations on a non-existent table")418 result = self._forward_request(context=context, region=global_table_region)419 # update response with additional props420 table_props = get_store(context).table_properties.get(table_name)421 if table_props:422 result.get("Table", {}).update(table_props)423 replicas: Dict = get_store().REPLICA_UPDATES.get(table_name)424 if replicas and replicas.get(context.region):425 regions = replicas.get(context.region)426 result.get("Table", {}).update(427 {"Replicas": [{"RegionName": region} for region in regions]}428 )429 # update only TableId and SSEDescription if present430 table_definitions = get_store(context).table_definitions.get(table_name)431 if table_definitions:432 for key in ["TableId", "SSEDescription"]:433 if table_definitions.get(key):434 result.get("Table", {})[key] = table_definitions[key]435 if "TableClass" in table_definitions:436 result.get("Table", {})["TableClassSummary"] = {437 "TableClass": table_definitions["TableClass"]438 }439 return result440 @handler("UpdateTable", expand=False)441 def update_table(442 self, context: RequestContext, update_table_input: UpdateTableInput443 ) -> UpdateTableOutput:444 try:445 # forward request to backend446 result = self.forward_request(context)447 except CommonServiceException as e:448 is_no_update_error = (449 e.code == "ValidationException" and "Nothing to update" in e.message450 )451 if not is_no_update_error or not list(452 {"TableClass", "ReplicaUpdates"} & set(update_table_input.keys())453 ):454 raise455 table_name = update_table_input.get("TableName")456 if update_table_input.get("TableClass"):457 table_definitions = get_store(context).table_definitions.setdefault(table_name, {})458 table_definitions["TableClass"] = update_table_input.get("TableClass")459 if update_table_input.get("ReplicaUpdates"):460 # update local table props (replicas)461 table_properties: Dict = get_store(context).REPLICA_UPDATES462 table_properties[table_name] = table_replicas = (463 table_properties.get(table_name) or {}464 )465 # get dict of replicas for the table => {original_region: {regions}}466 original_region = context.region467 table_replicas[original_region] = region_replicas = (468 table_replicas.get(original_region) or set()469 )470 for repl_update in update_table_input["ReplicaUpdates"]:471 for key, details in repl_update.items():472 # target region473 region = details.get("RegionName")474 match key:475 case "Create":476 table_replicas[original_region].add(region)477 case "Update":478 table_replicas[original_region] = set(479 [r for r in region_replicas if r == region]480 )481 case "Delete":482 table_replicas[original_region] = set(483 [r for r in region_replicas if r != region]484 )485 # update response content486 schema = SchemaExtractor.get_table_schema(table_name)487 return UpdateTableOutput(TableDescription=schema["Table"])488 if "StreamSpecification" in update_table_input:489 create_dynamodb_stream(490 update_table_input, result["TableDescription"].get("LatestStreamLabel")491 )492 return result493 def list_tables(494 self,495 context: RequestContext,496 exclusive_start_table_name: TableName = None,497 limit: ListTablesInputLimit = None,498 ) -> ListTablesOutput:499 return self.forward_request(context)500 @handler("PutItem", expand=False)501 def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:502 global_table_region: str | None = find_global_table_region(503 table_name=put_item_input.get("TableName"), target_region=context.region504 )505 existing_item = None506 table_name = put_item_input["TableName"]507 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)508 if event_sources_or_streams_enabled:509 existing_item = ItemFinder.find_existing_item(put_item_input)510 result = self._forward_request(context=context, region=global_table_region)511 # Get stream specifications details for the table512 if event_sources_or_streams_enabled:513 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)514 item = put_item_input["Item"]515 # prepare record keys516 keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)517 # create record518 record = self.get_record_template()519 record["eventName"] = "INSERT" if not existing_item else "MODIFY"520 record["dynamodb"].update(521 {522 "Keys": keys,523 "NewImage": item,524 "SizeBytes": _get_size_bytes(item),525 }526 )527 if stream_spec:528 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]529 if existing_item:530 record["dynamodb"]["OldImage"] = existing_item531 self.forward_stream_records([record], table_name=table_name)532 return result533 @handler("DeleteItem", expand=False)534 def delete_item(535 self,536 context: RequestContext,537 delete_item_input: DeleteItemInput,538 ) -> DeleteItemOutput:539 existing_item = None540 table_name = delete_item_input["TableName"]541 if has_event_sources_or_streams_enabled(table_name):542 existing_item = ItemFinder.find_existing_item(delete_item_input)543 # forward request to backend544 result = self.forward_request(context)545 # determine and forward stream record546 if existing_item:547 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)548 if event_sources_or_streams_enabled:549 # create record550 record = self.get_record_template()551 record["eventName"] = "REMOVE"552 record["dynamodb"].update(553 {554 "Keys": delete_item_input["Key"],555 "OldImage": existing_item,556 "SizeBytes": _get_size_bytes(existing_item),557 }558 )559 # Get stream specifications details for the table560 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)561 if stream_spec:562 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]563 self.forward_stream_records([record], table_name=table_name)564 return result565 @handler("UpdateItem", expand=False)566 def update_item(567 self,568 context: RequestContext,569 update_item_input: UpdateItemInput,570 ) -> UpdateItemOutput:571 existing_item = None572 table_name = update_item_input["TableName"]573 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)574 if event_sources_or_streams_enabled:575 existing_item = ItemFinder.find_existing_item(update_item_input)576 # forward request to backend577 result = self.forward_request(context)578 # construct and forward stream record579 if event_sources_or_streams_enabled:580 updated_item = ItemFinder.find_existing_item(update_item_input)581 if updated_item:582 record = self.get_record_template()583 record["eventName"] = "INSERT" if not existing_item else "MODIFY"584 record["dynamodb"].update(585 {586 "Keys": update_item_input["Key"],587 "NewImage": updated_item,588 "SizeBytes": _get_size_bytes(updated_item),589 }590 )591 if existing_item:592 record["dynamodb"]["OldImage"] = existing_item593 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)594 if stream_spec:595 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]596 self.forward_stream_records([record], table_name=table_name)597 return result598 @handler("GetItem", expand=False)599 def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:600 global_table_region: str | None = find_global_table_region(601 table_name=get_item_input.get("TableName"), target_region=context.region602 )603 result = self._forward_request(context=context, region=global_table_region)604 self.fix_consumed_capacity(get_item_input, result)605 return result606 @handler("Query", expand=False)607 def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:608 index_name = query_input.get("IndexName")609 if index_name:610 if not is_index_query_valid(query_input):611 raise ValidationException(612 "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "613 "is not supported for global secondary index id-index because its projection "614 "type is not ALL",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful