How to use _adjust_partition method in localstack

Best Python code snippet using localstack_python

generic_proxy.py

Source: generic_proxy.py (GitHub)

copy

Full Screen

# ... (the excerpt starts mid-definition: the `def` line of the preceding method
# is not shown, so its visible tail is kept verbatim as a comment)
#
#     ) -> Optional[RoutingRequest]:
#         return RoutingRequest(
#             method=method,
#             path=self._adjust_partition_in_path(path, self.DEFAULT_INBOUND_PARTITION),
#             data=self._adjust_partition(data, self.DEFAULT_INBOUND_PARTITION),
#             headers=self._adjust_partition(headers, self.DEFAULT_INBOUND_PARTITION),
#         )


def return_response(
    self,
    method: str,
    path: str,
    data: MessagePayload,
    headers: Headers,
    response: Response,
) -> Optional[RoutingResponse]:
    """Build a ``RoutingResponse`` whose content and headers went through
    ``_adjust_partition``, or return ``None`` for internal calls.

    ``method``, ``path`` and ``data`` are accepted but not read here;
    ``headers`` is only handed to ``is_internal_call_context``. No static
    partition is passed on, so ``_adjust_match`` selects the partition via
    ``self._partition_lookup(region)``.
    """
    # Only handle responses for calls from external clients
    if is_internal_call_context(headers):
        return None

    status = response.status_code
    rewritten_content = self._adjust_partition(response.content)
    rewritten_headers = self._adjust_partition(response.headers)
    return RoutingResponse(
        status_code=status,
        content=rewritten_content,
        headers=rewritten_headers,
    )


def _adjust_partition_in_path(self, path: str, static_partition: str = None):
    """Adjust the (still URL-encoded) URL path together with its query string.

    The path component and the decoded query parameters are each passed
    through ``_adjust_partition``; the query is then re-encoded and appended
    after a ``?`` when it is non-empty.
    """
    url_parts = urlparse(path)
    # Keep blank values, otherwise query params without a value
    # (e.g. "/?policy") would be dropped by parse_qs.
    query_params = parse_qs(qs=url_parts.query, keep_blank_values=True)
    new_path = self._adjust_partition(url_parts.path, static_partition)
    new_params = self._adjust_partition(query_params, static_partition)

    # Strip the empty equals signs urlencode emits for blank values, both
    # between parameters ("=&") and at the very end of the query string.
    query_string = urlencode(new_params, doseq=True).replace("=&", "&")
    query_string = re.sub(r"=$", "", query_string)

    if not query_string:
        return f"{new_path}"
    return f"{new_path}?{query_string}"


def _adjust_partition(self, source, static_partition: str = None):
    """Recursively apply ``self.arn_regex`` substitution to ``source``.

    * ``dict`` -> new dict with every value adjusted (keys are left untouched)
    * ``list`` -> new list with every element adjusted
    * ``bytes`` -> converted with ``to_str``, URL-unquoted, adjusted as a
      string and converted back with ``to_bytes``; returned unchanged if a
      ``UnicodeDecodeError`` is raised along the way
    * ``str`` -> every match of ``self.arn_regex`` is replaced by the result
      of ``self._adjust_match(match, static_partition)``
    * anything else is returned as-is
    """
    if isinstance(source, dict):
        return {
            key: self._adjust_partition(value, static_partition)
            for key, value in source.items()
        }

    if isinstance(source, list):
        return [self._adjust_partition(item, static_partition) for item in source]

    if isinstance(source, bytes):
        # NOTE(review): the payload is unquoted here and handed to to_bytes
        # without being re-quoted -- confirm callers expect the decoded form.
        try:
            text = unquote(to_str(source))
            return to_bytes(self._adjust_partition(text, static_partition))
        except UnicodeDecodeError:
            # If the body can't be decoded to a string, return the initial source
            return source

    if isinstance(source, str):
        return self.arn_regex.sub(lambda m: self._adjust_match(m, static_partition), source)

    # Ignore any other types
    return source


def _adjust_match(self, match: Match, static_partition: str = None):
    region = match.group("Region")
    partition = self._partition_lookup(region) if static_partition is None else static_partition
    service = match.group("Service")
    account_id = match.group("AccountID")
    resource_path = match.group("ResourcePath")
    ...  # excerpt is truncated here; the remainder of this method is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful