Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py  
...374    def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:375        """Make the given file available to the Lambda process (e.g., by copying into the container) for the376        given invocation context; Returns the path to the file that will be available to the Lambda handler."""377        raise NotImplementedError378    def apply_plugin_patches(self, inv_context: InvocationContext) -> Optional[InvocationResult]:379        """Loop through the list of plugins, and apply their patches to the invocation context (if applicable)"""380        invocation_results = []381        for plugin in LambdaExecutorPlugin.get_plugins():382            if not plugin.should_apply(inv_context):383                continue384            # initialize, if not done yet385            if not hasattr(plugin, "_initialized"):386                LOG.debug("Initializing Lambda executor plugin %s", plugin.__class__)387                plugin.initialize()388                plugin._initialized = True389            # invoke plugin to prepare invocation390            inv_options = plugin.prepare_invocation(inv_context)391            if not inv_options:392                continue393            if isinstance(inv_options, InvocationResult):394                invocation_results.append(inv_options)395                continue396            # copy files397            file_keys_map = {}398            for key, file_path in inv_options.files_to_add.items():399                file_in_container = self.provide_file_to_lambda(file_path, inv_context)400                file_keys_map[key] = file_in_container401            # replace placeholders like "{<fileKey>}" with corresponding file path402            for key, file_path in file_keys_map.items():403                for env_key, env_value in inv_options.env_updates.items():404                    inv_options.env_updates[env_key] = str(env_value).replace(405                        "{%s}" % key, file_path406                    )407                if inv_options.updated_command:408                    inv_options.updated_command = inv_options.updated_command.replace(409                        "{%s}" % key, file_path410                    )411                    inv_context.lambda_command = inv_options.updated_command412            # update environment413            inv_context.environment.update(inv_options.env_updates)414            # update handler415            if inv_options.updated_handler:416                inv_context.handler = inv_options.updated_handler417        if invocation_results:418            # TODO: This is currently indeterministic! If multiple execution plugins attempt to return419            #  an invocation result right away, only the first one is returned. We need a more solid420            #  mechanism for conflict resolution if multiple plugins interfere!421            if len(invocation_results) > 1:422                LOG.warning(423                    "Multiple invocation results returned from "424                    "LambdaExecutorPlugin.prepare_invocation calls - choosing the first one: %s",425                    invocation_results,426                )427            return invocation_results[0]428    def process_result_via_plugins(429        self, inv_context: InvocationContext, invocation_result: InvocationResult430    ) -> InvocationResult:431        """Loop through the list of plugins, and apply their post-processing logic to the Lambda invocation result."""432        for plugin in LambdaExecutorPlugin.get_plugins():433            if not plugin.should_apply(inv_context):434                continue435            invocation_result = plugin.process_result(inv_context, invocation_result)436        return invocation_result437class ContainerInfo:438    """Contains basic information about a docker container."""439    def __init__(self, name, entry_point):440        self.name = name441        self.entry_point = entry_point442class LambdaExecutorContainers(LambdaExecutor):443    """Abstract executor class for executing Lambda functions in Docker containers"""444    def execute_in_container(445        self,446        lambda_function: LambdaFunction,447        inv_context: InvocationContext,448        stdin=None,449        background=False,450    ) -> Tuple[bytes, bytes]:451        raise NotImplementedError452    def run_lambda_executor(self, lambda_function: LambdaFunction, inv_context: InvocationContext):453        env_vars = inv_context.environment454        runtime = lambda_function.runtime or ""455        event = inv_context.event456        stdin_str = None457        event_body = event if event is not None else env_vars.get("AWS_LAMBDA_EVENT_BODY")458        event_body = json.dumps(event_body) if isinstance(event_body, dict) else event_body459        event_body = event_body or ""460        is_large_event = len(event_body) > MAX_ENV_ARGS_LENGTH461        is_provided = runtime.startswith(LAMBDA_RUNTIME_PROVIDED)462        if (463            not is_large_event464            and lambda_function465            and is_provided466            and env_vars.get("DOCKER_LAMBDA_USE_STDIN") == "1"467        ):468            # Note: certain "provided" runtimes (e.g., Rust programs) can block if we pass in469            # the event payload via stdin, hence we rewrite the command to "echo ... | ..." below470            env_updates = {471                "AWS_LAMBDA_EVENT_BODY": to_str(472                    event_body473                ),  # Note: seems to be needed for provided runtimes!474                "DOCKER_LAMBDA_USE_STDIN": "1",475            }476            env_vars.update(env_updates)477            # Note: $AWS_LAMBDA_COGNITO_IDENTITY='{}' causes Rust Lambdas to hang478            env_vars.pop("AWS_LAMBDA_COGNITO_IDENTITY", None)479        if is_large_event:480            # in case of very large event payloads, we need to pass them via stdin481            LOG.debug(482                "Received large Lambda event payload (length %s) - passing via stdin"483                % len(event_body)484            )485            env_vars["DOCKER_LAMBDA_USE_STDIN"] = "1"486        if env_vars.get("DOCKER_LAMBDA_USE_STDIN") == "1":487            stdin_str = event_body488            if not is_provided:489                env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)490        elif "AWS_LAMBDA_EVENT_BODY" not in env_vars:491            env_vars["AWS_LAMBDA_EVENT_BODY"] = to_str(event_body)492        # apply plugin patches493        result = self.apply_plugin_patches(inv_context)494        if isinstance(result, InvocationResult):495            return result496        if config.LAMBDA_DOCKER_FLAGS:497            inv_context.docker_flags = (498                f"{config.LAMBDA_DOCKER_FLAGS} {inv_context.docker_flags or ''}".strip()499            )500        event_stdin_bytes = stdin_str and to_bytes(stdin_str)501        error = None502        try:503            result, log_output = self.execute_in_container(504                lambda_function,505                inv_context,506                stdin=event_stdin_bytes,507            )508        except ContainerException as e:509            result = e.stdout or ""510            log_output = e.stderr or ""511            error = e512        try:513            result = to_str(result).strip()514        except Exception:515            pass516        log_output = to_str(log_output).strip()517        # Note: The user's code may have been logging to stderr, in which case the logs518        # will be part of the "result" variable here. Hence, make sure that we extract519        # only the *last* line of "result" and consider anything above that as log output.520        if isinstance(result, str) and "\n" in result:521            lines = result.split("\n")522            idx = last_index_of(523                lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)524            )525            if idx >= 0:526                result = lines[idx]527                additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])528                log_output += "\n%s" % additional_logs529        log_formatted = log_output.strip().replace("\n", "\n> ")530        func_arn = lambda_function and lambda_function.arn()531        LOG.debug(532            "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)533        )534        # store log output - TODO get live logs from `process` above?535        store_lambda_logs(lambda_function, log_output)536        if error:537            raise InvocationException(538                "Lambda process returned with error. Result: %s. Output:\n%s"539                % (result, log_output),540                log_output,541                result,542            ) from error543        # create result544        invocation_result = InvocationResult(result, log_output=log_output)545        # run plugins post-processing logic546        invocation_result = self.process_result_via_plugins(inv_context, invocation_result)547        return invocation_result548    def prepare_event(self, environment: Dict, event_body: str) -> bytes:549        """Return the event as a stdin string."""550        # amend the environment variables for execution551        environment["AWS_LAMBDA_EVENT_BODY"] = event_body552        return event_body.encode()553    def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):554        runtime = lambda_function.runtime555        handler = lambda_function.handler556        environment = inv_context.environment = self._prepare_environment(lambda_function)557        event = inv_context.event558        context = inv_context.context559        # configure USE_SSL in environment560        if config.USE_SSL:561            environment["USE_SSL"] = "1"562        # prepare event body563        if not event:564            LOG.info(565                'Empty event body specified for invocation of Lambda "%s"' % lambda_function.arn()566            )567            event = {}568        event_body = json.dumps(json_safe(event))569        event_bytes_for_stdin = self.prepare_event(environment, event_body)570        inv_context.event = event_bytes_for_stdin571        Util.inject_endpoints_into_env(environment)572        environment["EDGE_PORT"] = str(config.EDGE_PORT)573        environment[LAMBDA_HANDLER_ENV_VAR_NAME] = handler574        if os.environ.get("HTTP_PROXY"):575            environment["HTTP_PROXY"] = os.environ["HTTP_PROXY"]576        if lambda_function.timeout:577            environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)578        if context:579            environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name580            environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version581            environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn582            environment["AWS_LAMBDA_COGNITO_IDENTITY"] = json.dumps(context.cognito_identity or {})583            if context.client_context is not None:584                environment["AWS_LAMBDA_CLIENT_CONTEXT"] = json.dumps(585                    to_str(base64.b64decode(to_bytes(context.client_context)))586                )587        # pass JVM options to the Lambda environment, if configured588        if config.LAMBDA_JAVA_OPTS and is_java_lambda(runtime):589            if environment.get("JAVA_TOOL_OPTIONS"):590                LOG.info(591                    "Skip setting LAMBDA_JAVA_OPTS as JAVA_TOOL_OPTIONS already defined in Lambda env vars"592                )593            else:594                LOG.debug(595                    "Passing JVM options to container environment: JAVA_TOOL_OPTIONS=%s"596                    % config.LAMBDA_JAVA_OPTS597                )598                environment["JAVA_TOOL_OPTIONS"] = config.LAMBDA_JAVA_OPTS599        # accept any self-signed certificates for outgoing calls from the Lambda600        if is_nodejs_runtime(runtime):601            environment["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"602        # run Lambda executor and fetch invocation result603        LOG.info("Running lambda: %s" % lambda_function.arn())604        result = self.run_lambda_executor(lambda_function=lambda_function, inv_context=inv_context)605        return result606    def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:607        if config.LAMBDA_REMOTE_DOCKER:608            LOG.info("TODO: copy file into container for LAMBDA_REMOTE_DOCKER=1 - %s", local_file)609            return local_file610        mountable_file = Util.get_host_path_for_path_in_docker(local_file)611        _, extension = os.path.splitext(local_file)612        target_file_name = f"{md5(local_file)}{extension}"613        target_path = f"/tmp/{target_file_name}"614        inv_context.docker_flags = inv_context.docker_flags or ""615        inv_context.docker_flags += f"-v {mountable_file}:{target_path}"616        return target_path617class LambdaExecutorReuseContainers(LambdaExecutorContainers):618    """Executor class for executing Lambda functions in re-usable Docker containers"""619    def __init__(self):620        super(LambdaExecutorReuseContainers, self).__init__()621        # locking thread for creation/destruction of docker containers.622        self.docker_container_lock = threading.RLock()623        # On each invocation we try to construct a port unlikely to conflict624        # with a previously invoked lambda function. This is a problem with at625        # least the lambci/lambda:go1.x container, which execs a go program that626        # attempts to bind to the same default port.627        self.next_port = 0628        self.max_port = LAMBDA_SERVER_UNIQUE_PORTS629        self.port_offset = LAMBDA_SERVER_PORT_OFFSET630    def execute_in_container(631        self,632        lambda_function: LambdaFunction,633        inv_context: InvocationContext,634        stdin=None,635        background=False,636    ) -> Tuple[bytes, bytes]:637        func_arn = lambda_function.arn()638        lambda_cwd = lambda_function.cwd639        runtime = lambda_function.runtime640        env_vars = inv_context.environment641        # check whether the Lambda has been invoked before642        has_been_invoked_before = func_arn in self.function_invoke_times643        # Choose a port for this invocation644        with self.docker_container_lock:645            env_vars["_LAMBDA_SERVER_PORT"] = str(self.next_port + self.port_offset)646            self.next_port = (self.next_port + 1) % self.max_port647        # create/verify the docker container is running.648        LOG.debug(649            'Priming docker container with runtime "%s" and arn "%s".',650            runtime,651            func_arn,652        )653        container_info = self.prime_docker_container(654            lambda_function, dict(env_vars), lambda_cwd, inv_context.docker_flags655        )656        if not inv_context.lambda_command and inv_context.handler:657            command = container_info.entry_point.split()658            command.append(inv_context.handler)659            inv_context.lambda_command = command660        # determine files to be copied into the container661        if not has_been_invoked_before and config.LAMBDA_REMOTE_DOCKER:662            # if this is the first invocation: copy the entire folder into the container663            DOCKER_CLIENT.copy_into_container(664                container_info.name, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER665            )666        return DOCKER_CLIENT.exec_in_container(667            container_name_or_id=container_info.name,668            command=inv_context.lambda_command,669            interactive=True,670            env_vars=env_vars,671            stdin=stdin,672        )673    def _execute(self, func_arn, *args, **kwargs):674        if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):675            concurrency_lock = threading.RLock()676            LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock677        with LAMBDA_CONCURRENCY_LOCK[func_arn]:678            return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)679    def startup(self):680        self.cleanup()681        # start a process to remove idle containers682        if config.LAMBDA_REMOVE_CONTAINERS:683            self.start_idle_container_destroyer_interval()684    def cleanup(self, arn=None):685        if arn:686            self.function_invoke_times.pop(arn, None)687            return self.destroy_docker_container(arn)688        self.function_invoke_times = {}689        return self.destroy_existing_docker_containers()690    def prime_docker_container(691        self,692        lambda_function: LambdaFunction,693        env_vars: Dict,694        lambda_cwd: str,695        docker_flags: str = None,696    ):697        """698        Prepares a persistent docker container for a specific function.699        :param lambda_function: The Details of the lambda function.700        :param env_vars: The environment variables for the lambda.701        :param lambda_cwd: The local directory containing the code for the lambda function.702        :return: ContainerInfo class containing the container name and default entry point.703        """704        with self.docker_container_lock:705            # Get the container name and id.706            func_arn = lambda_function.arn()707            container_name = self.get_container_name(func_arn)708            status = self.get_docker_container_status(func_arn)709            LOG.debug('Priming Docker container (status "%s"): %s' % (status, container_name))710            docker_image = Util.docker_image_for_lambda(lambda_function)711            # Container is not running or doesn't exist.712            if status < 1:713                # Make sure the container does not exist in any form/state.714                self.destroy_docker_container(func_arn)715                # get container startup command and run it716                LOG.debug("Creating container: %s" % container_name)717                self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)718                if config.LAMBDA_REMOTE_DOCKER:719                    LOG.debug(720                        'Copying files to container "%s" from "%s".' % (container_name, lambda_cwd)721                    )722                    DOCKER_CLIENT.copy_into_container(723                        container_name, "%s/." % lambda_cwd, DOCKER_TASK_FOLDER724                    )725                LOG.debug("Starting docker-reuse Lambda container: %s", container_name)726                DOCKER_CLIENT.start_container(container_name)727                # give the container some time to start up728                time.sleep(1)729            container_network = self.get_docker_container_network(func_arn)730            entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)731            LOG.debug(732                'Using entrypoint "%s" for container "%s" on network "%s".'733                % (entry_point, container_name, container_network)734            )735            return ContainerInfo(container_name, entry_point)736    def create_container(737        self,738        lambda_function: LambdaFunction,739        env_vars: Dict,740        lambda_cwd: str,741        docker_flags: str = None,742    ):743        docker_image = Util.docker_image_for_lambda(lambda_function)744        container_name = self.get_container_name(lambda_function.arn())745        # make sure we set LOCALSTACK_HOSTNAME746        Util.inject_endpoints_into_env(env_vars)747        # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." above)748        env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)749        network = config.LAMBDA_DOCKER_NETWORK750        additional_flags = docker_flags751        dns = config.LAMBDA_DOCKER_DNS752        mount_volumes = not config.LAMBDA_REMOTE_DOCKER753        lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)754        if ":" in lambda_cwd and "\\" in lambda_cwd:755            lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)756        mount_volumes = [(lambda_cwd_on_host, DOCKER_TASK_FOLDER)] if mount_volumes else None757        if os.environ.get("HOSTNAME"):758            env_vars["HOSTNAME"] = os.environ.get("HOSTNAME")759        env_vars["EDGE_PORT"] = config.EDGE_PORT760        LOG.debug(761            "Creating docker-reuse Lambda container %s from image %s", container_name, docker_image762        )763        return DOCKER_CLIENT.create_container(764            image_name=docker_image,765            remove=True,766            interactive=True,767            name=container_name,768            entrypoint="/bin/bash",769            network=network,770            env_vars=env_vars,771            dns=dns,772            mount_volumes=mount_volumes,773            additional_flags=additional_flags,774        )775    def destroy_docker_container(self, func_arn):776        """777        Stops and/or removes a docker container for a specific lambda function ARN.778        :param func_arn: The ARN of the lambda function.779        :return: None780        """781        with self.docker_container_lock:782            status = self.get_docker_container_status(func_arn)783            # Get the container name and id.784            container_name = self.get_container_name(func_arn)785            if status == 1:786                LOG.debug("Stopping container: %s" % container_name)787                DOCKER_CLIENT.stop_container(container_name)788                status = self.get_docker_container_status(func_arn)789            if status == -1:790                LOG.debug("Removing container: %s" % container_name)791                rm_docker_container(container_name, safe=True)792            # clean up function invoke times, as some init logic depends on this793            self.function_invoke_times.pop(func_arn, None)794    def get_all_container_names(self):795        """796        Returns a list of container names for lambda containers.797        :return: A String[] localstack docker container names for each function.798        """799        with self.docker_container_lock:800            LOG.debug("Getting all lambda containers names.")801            list_result = DOCKER_CLIENT.list_containers(802                filter=f"name={self.get_container_prefix()}*"803            )804            container_names = list(map(lambda container: container["name"], list_result))805            return container_names806    def destroy_existing_docker_containers(self):807        """808        Stops and/or removes all lambda docker containers for localstack.809        :return: None810        """811        with self.docker_container_lock:812            container_names = self.get_all_container_names()813            LOG.debug("Removing %d containers." % len(container_names))814            for container_name in container_names:815                DOCKER_CLIENT.remove_container(container_name)816    def get_docker_container_status(self, func_arn):817        """818        Determine the status of a docker container.819        :param func_arn: The ARN of the lambda function.820        :return: 1 If the container is running,821        -1 if the container exists but is not running822        0 if the container does not exist.823        """824        with self.docker_container_lock:825            # Get the container name and id.826            container_name = self.get_container_name(func_arn)827            container_status = DOCKER_CLIENT.get_container_status(container_name)828            return container_status.value829    def get_docker_container_network(self, func_arn):830        """831        Determine the network of a docker container.832        :param func_arn: The ARN of the lambda function.833        :return: name of the container network834        """835        with self.docker_container_lock:836            status = self.get_docker_container_status(func_arn)837            # container does not exist838            if status == 0:839                return ""840            # Get the container name.841            container_name = self.get_container_name(func_arn)842            container_network = DOCKER_CLIENT.get_network(container_name)843            return container_network844    def idle_container_destroyer(self):845        """846        Iterates though all the lambda containers and destroys any container that has847        been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848        :return: None849        """850        LOG.debug("Checking if there are idle containers ...")851        current_time = int(time.time() * 1000)852        for func_arn, last_run_time in dict(self.function_invoke_times).items():853            duration = current_time - last_run_time854            # not enough idle time has passed855            if duration < MAX_CONTAINER_IDLE_TIME_MS:856                continue857            # container has been idle, destroy it.858            self.destroy_docker_container(func_arn)859    def start_idle_container_destroyer_interval(self):860        """861        Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862        Thus checking for idle containers and destroying them.863        :return: None864        """865        self.idle_container_destroyer()866        threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867    def get_container_prefix(self) -> str:868        """869        Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870        :return: Lambda container name prefix871        """872        return f"{bootstrap.get_main_container_name()}_lambda_"873    def get_container_name(self, func_arn):874        """875        Given a function ARN, returns a valid docker container name.876        :param func_arn: The ARN of the lambda function.877        :return: A docker compatible name for the arn.878        """879        return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class LambdaExecutorSeparateContainers(LambdaExecutorContainers):881    def __init__(self):882        super(LambdaExecutorSeparateContainers, self).__init__()883        self.max_port = LAMBDA_API_UNIQUE_PORTS884        self.port_offset = LAMBDA_API_PORT_OFFSET885    def prepare_event(self, environment: Dict, event_body: str) -> bytes:886        # Tell Lambci to use STDIN for the event887        environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888        return event_body.encode()889    def execute_in_container(890        self,891        lambda_function: LambdaFunction,892        inv_context: InvocationContext,893        stdin=None,894        background=False,895    ) -> Tuple[bytes, bytes]:896        lambda_cwd = lambda_function.cwd897        env_vars = inv_context.environment898        entrypoint = None899        if inv_context.lambda_command:900            entrypoint = ""901        elif inv_context.handler:902            inv_context.lambda_command = inv_context.handler903        # add Docker Lambda env vars904        network = config.LAMBDA_DOCKER_NETWORK or None905        if network == "host":906            port = get_free_tcp_port()907            env_vars["DOCKER_LAMBDA_API_PORT"] = port908            env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port909        additional_flags = inv_context.docker_flags or ""910        dns = config.LAMBDA_DOCKER_DNS911        docker_java_ports = PortMappings()912        if Util.debug_java_port:913            docker_java_ports.add(Util.debug_java_port)914        docker_image = Util.docker_image_for_lambda(lambda_function)915        if config.LAMBDA_REMOTE_DOCKER:916            container_id = DOCKER_CLIENT.create_container(917                image_name=docker_image,918                interactive=True,919                entrypoint=entrypoint,920                remove=True,921                network=network,922                env_vars=env_vars,923                dns=dns,924                additional_flags=additional_flags,925                ports=docker_java_ports,926                command=inv_context.lambda_command,927            )928            DOCKER_CLIENT.copy_into_container(container_id, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER)929            return DOCKER_CLIENT.start_container(930                container_id, interactive=not background, attach=not background, stdin=stdin931            )932        else:933            mount_volumes = None934            if lambda_cwd:935                mount_volumes = [936                    (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)937                ]938            return DOCKER_CLIENT.run_container(939                image_name=docker_image,940                interactive=True,941                detach=background,942                entrypoint=entrypoint,943                remove=True,944                network=network,945                env_vars=env_vars,946                dns=dns,947                additional_flags=additional_flags,948                command=inv_context.lambda_command,949                mount_volumes=mount_volumes,950                stdin=stdin,951            )952class LambdaExecutorLocal(LambdaExecutor):953    def _execute_in_custom_runtime(954        self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None955    ) -> InvocationResult:956        """957        Generic run function for executing lambdas in custom runtimes.958        :param cmd: the command to execute959        :param lambda_function: function details960        :return: the InvocationResult961        """962        env_vars = lambda_function and lambda_function.envvars963        kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}964        process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)965        result, log_output = process.communicate()966        try:967            result = to_str(result).strip()968        except Exception:969            pass970        log_output = to_str(log_output).strip()971        return_code = process.returncode972        # Note: The user's code may have been logging to stderr, in which case the logs973        # will be part of the "result" variable here. Hence, make sure that we extract974        # only the *last* line of "result" and consider anything above that as log output.975        # TODO: not sure if this code is needed/used976        if isinstance(result, str) and "\n" in result:977            lines = result.split("\n")978            idx = last_index_of(979                lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)980            )981            if idx >= 0:982                result = lines[idx]983                additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])984                log_output += "\n%s" % additional_logs985        log_formatted = log_output.strip().replace("\n", "\n> ")986        func_arn = lambda_function and lambda_function.arn()987        LOG.debug(988            "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)989        )990        # store log output - TODO get live logs from `process` above?991        # store_lambda_logs(lambda_function, log_output)992        if return_code != 0:993            raise InvocationException(994                "Lambda process returned error status code: %s. Result: %s. Output:\n%s"995                % (return_code, result, log_output),996                log_output,997                result,998            )999        invocation_result = InvocationResult(result, log_output=log_output)1000        return invocation_result1001    def _execute(1002        self, lambda_function: LambdaFunction, inv_context: InvocationContext1003    ) -> InvocationResult:1004        # apply plugin patches to prepare invocation context1005        result = self.apply_plugin_patches(inv_context)1006        if isinstance(result, InvocationResult):1007            return result1008        lambda_cwd = lambda_function.cwd1009        environment = self._prepare_environment(lambda_function)1010        if lambda_function.timeout:1011            environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1012        context = inv_context.context1013        if context:1014            environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1015            environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1016            environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1017            environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1018        # execute the Lambda function in a forked sub-process, sync result via queue1019        queue = Queue()1020        lambda_function_callable = lambda_function.function(inv_context.function_version)1021        def do_execute():1022            # now we're executing in the child process, safe to change CWD and ENV1023            result = None1024            try:1025                if lambda_cwd:1026                    os.chdir(lambda_cwd)1027                    sys.path.insert(0, "")1028                if environment:1029                    os.environ.update(environment)1030                # set default env variables required for most Lambda handlers1031                self.set_default_env_variables()1032                # run the actual handler function1033                result = lambda_function_callable(inv_context.event, context)1034            except Exception as e:1035                result = str(e)1036                sys.stderr.write("%s %s" % (e, traceback.format_exc()))1037                raise1038            finally:1039                queue.put(result)1040        process = Process(target=do_execute)1041        start_time = now(millis=True)1042        error = None1043        with CaptureOutput() as c:1044            try:1045                process.run()1046            except Exception as e:1047                error = e1048        result = queue.get()1049        end_time = now(millis=True)1050        # Make sure to keep the log line below, to ensure the log stream gets created1051        request_id = long_uid()1052        log_output = 'START %s: Lambda %s started via "local" executor ...' % (1053            request_id,1054            lambda_function.arn(),1055        )1056        # TODO: Interweaving stdout/stderr currently not supported1057        for stream in (c.stdout(), c.stderr()):1058            if stream:1059                log_output += ("\n" if log_output else "") + stream1060        if isinstance(result, InvocationResult) and result.log_output:1061            log_output += "\n" + result.log_output1062        log_output += "\nEND RequestId: %s" % request_id1063        log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1064            request_id,1065            int((end_time - start_time) * 1000),1066        )1067        # store logs to CloudWatch1068        store_lambda_logs(lambda_function, log_output)1069        result = result.result if isinstance(result, InvocationResult) else result1070        if error:1071            LOG.info(1072                'Error executing Lambda "%s": %s %s',1073                lambda_function.arn(),1074                error,1075                "".join(traceback.format_tb(error.__traceback__)),1076            )1077            raise InvocationException(result, log_output)1078        # construct final invocation result1079        invocation_result = InvocationResult(result, log_output=log_output)1080        # run plugins post-processing logic1081        invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1082        return invocation_result1083    def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:1084        # This is a no-op for local executors - simply return the given local file path1085        return local_file1086    def execute_java_lambda(1087        self, event, context, main_file, lambda_function: LambdaFunction = None1088    ) -> InvocationResult:1089        lambda_function.envvars = lambda_function.envvars or {}1090        java_opts = config.LAMBDA_JAVA_OPTS or ""1091        handler = lambda_function.handler1092        lambda_function.envvars[LAMBDA_HANDLER_ENV_VAR_NAME] = handler1093        event_file = EVENT_FILE_PATTERN.replace("*", short_uid())1094        save_file(event_file, json.dumps(json_safe(event)))1095        TMP_FILES.append(event_file)1096        classpath = "%s:%s:%s" % (1097            main_file,1098            Util.get_java_classpath(main_file),1099            LAMBDA_EXECUTOR_JAR,1100        )1101        cmd = "java %s -cp %s %s %s" % (1102            java_opts,1103            classpath,1104            LAMBDA_EXECUTOR_CLASS,1105            event_file,1106        )1107        # apply plugin patches1108        inv_context = InvocationContext(1109            lambda_function, event, environment=lambda_function.envvars, lambda_command=cmd1110        )1111        result = self.apply_plugin_patches(inv_context)1112        if isinstance(result, InvocationResult):1113            return result1114        cmd = inv_context.lambda_command1115        LOG.info(cmd)1116        # execute Lambda and get invocation result1117        invocation_result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1118        return invocation_result1119    def execute_javascript_lambda(1120        self, event, context, main_file, lambda_function: LambdaFunction = None1121    ):1122        handler = lambda_function.handler1123        function = handler.split(".")[-1]1124        event_json_string = "%s" % (json.dumps(json_safe(event)) if event else "{}")1125        context_json_string = "%s" % (json.dumps(context.__dict__) if context else "{}")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
