How to use prepare_invocation method in localstack

Best Python code snippet using localstack_python

lambda_executors.py

Source:lambda_executors.py Github

copy

Full Screen

...171 pass172 def should_apply(self, context: InvocationContext) -> bool:173 """Whether the plugin logic should get applied for the given Lambda invocation context."""174 return False175 def prepare_invocation(176 self, context: InvocationContext177 ) -> Optional[Union[AdditionalInvocationOptions, InvocationResult]]:178 """Return additional invocation options for given Lambda invocation context. Optionally, an179 InvocationResult can be returned, in which case the result is returned to the client right away."""180 return None181 def process_result(182 self, context: InvocationContext, result: InvocationResult183 ) -> InvocationResult:184 """Optionally modify the result returned from the given Lambda invocation."""185 return result186 def init_function_configuration(self, lambda_function: LambdaFunction):187 """Initialize the configuration of the given function upon creation or function update."""188 pass189 def init_function_code(self, lambda_function: LambdaFunction):190 """Initialize the code of the given function upon creation or function update."""191 pass192 @classmethod193 def get_plugins(cls) -> List["LambdaExecutorPlugin"]:194 if not cls.INSTANCES:195 classes = get_all_subclasses(LambdaExecutorPlugin)196 cls.INSTANCES = [clazz() for clazz in classes]197 return cls.INSTANCES198class LambdaInvocationForwarderPlugin(LambdaExecutorPlugin):199 """Plugin that forwards Lambda invocations to external targets defined in LAMBDA_FORWARD_URL"""200 def should_apply(self, context: InvocationContext) -> bool:201 """If LAMBDA_FORWARD_URL is configured, forward the invocation of this Lambda to the target URL."""202 func_forward_url = self._forward_url(context)203 return bool(func_forward_url)204 def prepare_invocation(205 self, context: InvocationContext206 ) -> Optional[Union[AdditionalInvocationOptions, InvocationResult]]:207 forward_url = self._forward_url(context)208 result = self._forward_to_url(209 forward_url,210 context.lambda_function,211 context.event,212 
context.context,213 context.invocation_type,214 )215 return result216 def _forward_to_url(217 self,218 forward_url: str,219 lambda_function: LambdaFunction,220 event: Union[Dict, bytes],221 context: LambdaContext,222 invocation_type: str,223 ) -> InvocationResult:224 func_name = lambda_function.name()225 url = "%s%s/functions/%s/invocations" % (forward_url, API_PATH_ROOT, func_name)226 copied_env_vars = lambda_function.envvars.copy()227 copied_env_vars["LOCALSTACK_HOSTNAME"] = config.HOSTNAME_EXTERNAL228 copied_env_vars["LOCALSTACK_EDGE_PORT"] = str(config.EDGE_PORT)229 headers = aws_stack.mock_aws_request_headers("lambda")230 headers["X-Amz-Region"] = lambda_function.region()231 headers["X-Amz-Request-Id"] = context.aws_request_id232 headers["X-Amz-Handler"] = lambda_function.handler233 headers["X-Amz-Function-ARN"] = context.invoked_function_arn234 headers["X-Amz-Function-Name"] = context.function_name235 headers["X-Amz-Function-Version"] = context.function_version236 headers["X-Amz-Role"] = lambda_function.role237 headers["X-Amz-Runtime"] = lambda_function.runtime238 headers["X-Amz-Timeout"] = str(lambda_function.timeout)239 headers["X-Amz-Memory-Size"] = str(context.memory_limit_in_mb)240 headers["X-Amz-Log-Group-Name"] = context.log_group_name241 headers["X-Amz-Log-Stream-Name"] = context.log_stream_name242 headers["X-Amz-Env-Vars"] = json.dumps(copied_env_vars)243 headers["X-Amz-Last-Modified"] = str(int(lambda_function.last_modified.timestamp() * 1000))244 headers["X-Amz-Invocation-Type"] = invocation_type245 headers["X-Amz-Log-Type"] = "Tail"246 if context.client_context:247 headers["X-Amz-Client-Context"] = context.client_context248 if context.cognito_identity:249 headers["X-Amz-Cognito-Identity"] = context.cognito_identity250 data = run_safe(lambda: to_str(event)) or event251 data = json.dumps(json_safe(data)) if isinstance(data, dict) else str(data)252 LOG.debug(253 "Forwarding Lambda invocation to LAMBDA_FORWARD_URL: %s" % config.LAMBDA_FORWARD_URL254 
)255 result = safe_requests.post(url, data, headers=headers)256 if result.status_code >= 400:257 raise Exception(258 "Received error status code %s from external Lambda invocation" % result.status_code259 )260 content = run_safe(lambda: to_str(result.content)) or result.content261 LOG.debug(262 "Received result from external Lambda endpoint (status %s): %s"263 % (result.status_code, content)264 )265 result = InvocationResult(content)266 return result267 def _forward_url(self, context: InvocationContext) -> str:268 env_vars = context.lambda_function.envvars269 return env_vars.get("LOCALSTACK_LAMBDA_FORWARD_URL") or config.LAMBDA_FORWARD_URL270def handle_error(271 lambda_function: LambdaFunction, event: Dict, error: Exception, asynchronous: bool = False272):273 if asynchronous:274 if get_record_from_event(event, "eventSource") == EVENT_SOURCE_SQS:275 sqs_queue_arn = get_record_from_event(event, "eventSourceARN")276 if sqs_queue_arn:277 # event source is SQS, send event back to dead letter queue278 return sqs_error_to_dead_letter_queue(sqs_queue_arn, event, error)279 else:280 # event source is not SQS, send back to lambda dead letter queue281 lambda_error_to_dead_letter_queue(lambda_function, event, error)282class LambdaAsyncLocks:283 locks: Dict[str, Union[threading.Semaphore, threading.Lock]]284 creation_lock: threading.Lock285 def __init__(self):286 self.locks = {}287 self.creation_lock = threading.Lock()288 def assure_lock_present(289 self, key: str, lock: Union[threading.Semaphore, threading.Lock]290 ) -> Union[threading.Semaphore, threading.Lock]:291 with self.creation_lock:292 return self.locks.setdefault(key, lock)293LAMBDA_ASYNC_LOCKS = LambdaAsyncLocks()294class LambdaExecutor(object):295 """Base class for Lambda executors. 
Subclasses must overwrite the _execute method"""296 def __init__(self):297 # keeps track of each function arn and the last time it was invoked298 self.function_invoke_times = {}299 def _prepare_environment(self, lambda_function: LambdaFunction):300 # setup environment pre-defined variables for docker environment301 result = lambda_function.envvars.copy()302 # injecting aws credentials into docker environment if not provided303 aws_stack.inject_test_credentials_into_env(result)304 # injecting the region into the docker environment305 aws_stack.inject_region_into_env(result, lambda_function.region())306 return result307 def execute(308 self,309 func_arn: str, # TODO remove and get from lambda_function310 lambda_function: LambdaFunction,311 event: Dict,312 context: LambdaContext = None,313 version: str = None,314 asynchronous: bool = False,315 callback: Callable = None,316 lock_discriminator: str = None,317 ):318 # note: leave here to avoid circular import issues319 from localstack.utils.aws.message_forwarding import lambda_result_to_destination320 def do_execute(*args):321 @cloudwatched("lambda")322 def _run(func_arn=None):323 with contextlib.ExitStack() as stack:324 if lock_discriminator:325 stack.enter_context(LAMBDA_ASYNC_LOCKS.locks[lock_discriminator])326 # set the invocation time in milliseconds327 invocation_time = int(time.time() * 1000)328 # start the execution329 raised_error = None330 result = None331 dlq_sent = None332 invocation_type = "Event" if asynchronous else "RequestResponse"333 inv_context = InvocationContext(334 lambda_function,335 event=event,336 function_version=version,337 context=context,338 invocation_type=invocation_type,339 )340 try:341 result = self._execute(lambda_function, inv_context)342 except Exception as e:343 raised_error = e344 dlq_sent = handle_error(lambda_function, event, e, asynchronous)345 raise e346 finally:347 self.function_invoke_times[func_arn] = invocation_time348 callback and callback(349 result, func_arn, event, 
error=raised_error, dlq_sent=dlq_sent350 )351 lambda_result_to_destination(352 lambda_function, event, result, asynchronous, raised_error353 )354 # return final result355 return result356 return _run(func_arn=func_arn)357 # Inform users about asynchronous mode of the lambda execution.358 if asynchronous:359 LOG.debug(360 "Lambda executed in Event (asynchronous) mode, no response will be returned to caller"361 )362 FuncThread(do_execute).start()363 return InvocationResult(None, log_output="Lambda executed asynchronously.")364 return do_execute()365 def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):366 """This method must be overwritten by subclasses."""367 raise NotImplementedError368 def startup(self):369 """Called once during startup - can be used, e.g., to prepare Lambda Docker environment"""370 pass371 def cleanup(self, arn=None):372 """Called once during startup - can be used, e.g., to clean up left-over Docker containers"""373 pass374 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:375 """Make the given file available to the Lambda process (e.g., by copying into the container) for the376 given invocation context; Returns the path to the file that will be available to the Lambda handler."""377 raise NotImplementedError378 def apply_plugin_patches(self, inv_context: InvocationContext) -> Optional[InvocationResult]:379 """Loop through the list of plugins, and apply their patches to the invocation context (if applicable)"""380 invocation_results = []381 for plugin in LambdaExecutorPlugin.get_plugins():382 if not plugin.should_apply(inv_context):383 continue384 # initialize, if not done yet385 if not hasattr(plugin, "_initialized"):386 LOG.debug("Initializing Lambda executor plugin %s", plugin.__class__)387 plugin.initialize()388 plugin._initialized = True389 # invoke plugin to prepare invocation390 inv_options = plugin.prepare_invocation(inv_context)391 if not inv_options:392 continue393 if 
isinstance(inv_options, InvocationResult):394 invocation_results.append(inv_options)395 continue396 # copy files397 file_keys_map = {}398 for key, file_path in inv_options.files_to_add.items():399 file_in_container = self.provide_file_to_lambda(file_path, inv_context)400 file_keys_map[key] = file_in_container401 # replace placeholders like "{<fileKey>}" with corresponding file path402 for key, file_path in file_keys_map.items():403 for env_key, env_value in inv_options.env_updates.items():404 inv_options.env_updates[env_key] = str(env_value).replace(...

Full Screen

Full Screen

thundra.py

Source:thundra.py Github

copy

Full Screen

...87 thundra_apikey = _get_apikey(context.environment)88 if not thundra_apikey:89 return False90 return True91 def prepare_invocation(92 self, context: InvocationContext93 ) -> Optional[AdditionalInvocationOptions]:94 # download and initialize Java agent95 _ensure_java_agent_initialized()96 result = AdditionalInvocationOptions()97 environment = context.environment98 agent_flag = "-javaagent:{agent_path}"99 # Inject Thundra agent path into "JAVA_TOOL_OPTIONS" env var,100 # so it will be automatically loaded on JVM startup101 java_tool_opts = environment.get("JAVA_TOOL_OPTIONS", "")102 if agent_flag not in java_tool_opts:103 java_tool_opts += f" {agent_flag}"104 result.env_updates["JAVA_TOOL_OPTIONS"] = java_tool_opts.strip()105 # Disable CDS (Class Data Sharing),...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful