Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py  
...566        output = OutputLog(result, log_output)567        LOG.debug(568            f"Lambda {func_arn} result / log output:"569            f"\n{output.stdout_formatted()}"570            f"\n>{output.stderr_formatted()}"571        )572        # store log output - TODO get live logs from `process` above?573        store_lambda_logs(lambda_function, log_output)574        if error:575            output.output_file()576            raise InvocationException(577                "Lambda process returned with error. Result: %s. Output:\n%s"578                % (result, log_output),579                log_output,580                result,581            ) from error582        # create result583        invocation_result = InvocationResult(result, log_output=log_output)584        # run plugins post-processing logic585        invocation_result = self.process_result_via_plugins(inv_context, invocation_result)586        return invocation_result587    def prepare_event(self, environment: Dict, event_body: str) -> bytes:588        """Return the event as a stdin string."""589        # amend the environment variables for execution590        environment["AWS_LAMBDA_EVENT_BODY"] = event_body591        return event_body.encode()592    def _execute(self, lambda_function: LambdaFunction, inv_context: InvocationContext):593        runtime = lambda_function.runtime594        handler = lambda_function.handler595        environment = inv_context.environment = self._prepare_environment(lambda_function)596        event = inv_context.event597        context = inv_context.context598        # configure USE_SSL in environment599        if config.USE_SSL:600            environment["USE_SSL"] = "1"601        # prepare event body602        if not event:603            LOG.info(604                'Empty event body specified for invocation of Lambda "%s"', lambda_function.arn()605            )606            event = {}607        event_body = json.dumps(json_safe(event))608        event_bytes_for_stdin = self.prepare_event(environment, event_body)609        inv_context.event = event_bytes_for_stdin610        Util.inject_endpoints_into_env(environment)611        environment["EDGE_PORT"] = str(config.EDGE_PORT)612        environment[LAMBDA_HANDLER_ENV_VAR_NAME] = handler613        if os.environ.get("HTTP_PROXY"):614            environment["HTTP_PROXY"] = os.environ["HTTP_PROXY"]615        if lambda_function.timeout:616            environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)617        if context:618            environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name619            environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version620            environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn621            if context.cognito_identity:622                environment["AWS_LAMBDA_COGNITO_IDENTITY"] = json.dumps(context.cognito_identity)623            if context.client_context is not None:624                environment["AWS_LAMBDA_CLIENT_CONTEXT"] = json.dumps(625                    to_str(base64.b64decode(to_bytes(context.client_context)))626                )627        # pass JVM options to the Lambda environment, if configured628        if config.LAMBDA_JAVA_OPTS and is_java_lambda(runtime):629            if environment.get("JAVA_TOOL_OPTIONS"):630                LOG.info(631                    "Skip setting LAMBDA_JAVA_OPTS as JAVA_TOOL_OPTIONS already defined in Lambda env vars"632                )633            else:634                LOG.debug(635                    "Passing JVM options to container environment: JAVA_TOOL_OPTIONS=%s",636                    config.LAMBDA_JAVA_OPTS,637                )638                environment["JAVA_TOOL_OPTIONS"] = config.LAMBDA_JAVA_OPTS639        # accept any self-signed certificates for outgoing calls from the Lambda640        if is_nodejs_runtime(runtime):641            environment["NODE_TLS_REJECT_UNAUTHORIZED"] = "0"642        # run Lambda executor and fetch invocation result643        LOG.info("Running lambda: %s", lambda_function.arn())644        result = self.run_lambda_executor(lambda_function=lambda_function, inv_context=inv_context)645        return result646    def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:647        if config.LAMBDA_REMOTE_DOCKER:648            LOG.info("TODO: copy file into container for LAMBDA_REMOTE_DOCKER=1 - %s", local_file)649            return local_file650        mountable_file = Util.get_host_path_for_path_in_docker(local_file)651        _, extension = os.path.splitext(local_file)652        target_file_name = f"{md5(local_file)}{extension}"653        target_path = f"/tmp/{target_file_name}"654        inv_context.docker_flags = inv_context.docker_flags or ""655        inv_context.docker_flags += f"-v {mountable_file}:{target_path}"656        return target_path657class LambdaExecutorReuseContainers(LambdaExecutorContainers):658    """Executor class for executing Lambda functions in re-usable Docker containers"""659    def __init__(self):660        super(LambdaExecutorReuseContainers, self).__init__()661        # locking thread for creation/destruction of docker containers.662        self.docker_container_lock = threading.RLock()663        # On each invocation we try to construct a port unlikely to conflict664        # with a previously invoked lambda function. This is a problem with at665        # least the lambci/lambda:go1.x container, which execs a go program that666        # attempts to bind to the same default port.667        self.next_port = 0668        self.max_port = LAMBDA_SERVER_UNIQUE_PORTS669        self.port_offset = LAMBDA_SERVER_PORT_OFFSET670    def execute_in_container(671        self,672        lambda_function: LambdaFunction,673        inv_context: InvocationContext,674        stdin=None,675        background=False,676    ) -> Tuple[bytes, bytes]:677        func_arn = lambda_function.arn()678        lambda_cwd = lambda_function.cwd679        runtime = lambda_function.runtime680        env_vars = inv_context.environment681        # Choose a port for this invocation682        with self.docker_container_lock:683            env_vars["_LAMBDA_SERVER_PORT"] = str(self.next_port + self.port_offset)684            self.next_port = (self.next_port + 1) % self.max_port685        # create/verify the docker container is running.686        LOG.debug(687            'Priming docker container with runtime "%s" and arn "%s".',688            runtime,689            func_arn,690        )691        container_info = self.prime_docker_container(692            lambda_function, dict(env_vars), lambda_cwd, inv_context.docker_flags693        )694        if not inv_context.lambda_command and inv_context.handler:695            command = shlex.split(container_info.entry_point)696            command.append(inv_context.handler)697            inv_context.lambda_command = command698        lambda_docker_ip = DOCKER_CLIENT.get_container_ip(container_info.name)699        if not self._should_use_stay_open_mode(lambda_function, lambda_docker_ip, check_port=True):700            LOG.debug("Using 'docker exec' to run invocation in docker-reuse Lambda container")701            # disable stay open mode for this one, to prevent starting runtime API server702            env_vars["DOCKER_LAMBDA_STAY_OPEN"] = None703            return DOCKER_CLIENT.exec_in_container(704                container_name_or_id=container_info.name,705                command=inv_context.lambda_command,706                interactive=True,707                env_vars=env_vars,708                stdin=stdin,709            )710        inv_result = self.invoke_lambda(lambda_function, inv_context, lambda_docker_ip)711        return (inv_result.result, inv_result.log_output)712    def invoke_lambda(713        self,714        lambda_function: LambdaFunction,715        inv_context: InvocationContext,716        lambda_docker_ip=None,717    ) -> InvocationResult:718        full_url = self._get_lambda_stay_open_url(lambda_docker_ip)719        client = aws_stack.connect_to_service("lambda", endpoint_url=full_url)720        event = inv_context.event or "{}"721        LOG.debug(f"Calling {full_url} to run invocation in docker-reuse Lambda container")722        response = client.invoke(723            FunctionName=lambda_function.name(),724            InvocationType=inv_context.invocation_type,725            Payload=to_bytes(event),726            LogType="Tail",727        )728        log_output = base64.b64decode(response.get("LogResult") or b"").decode("utf-8")729        result = response["Payload"].read().decode("utf-8")730        if "FunctionError" in response:731            raise InvocationException(732                "Lambda process returned with error. Result: %s. Output:\n%s"733                % (result, log_output),734                log_output,735                result,736            )737        return InvocationResult(result, log_output)738    def _should_use_stay_open_mode(739        self,740        lambda_function: LambdaFunction,741        lambda_docker_ip: Optional[str] = None,742        check_port: bool = False,743    ) -> bool:744        """Return whether to use stay-open execution mode - if we're running in Docker, the given IP745        is defined, and if the target API endpoint is available (optionally, if check_port is True)."""746        if not lambda_docker_ip:747            func_arn = lambda_function.arn()748            container_name = self.get_container_name(func_arn)749            lambda_docker_ip = DOCKER_CLIENT.get_container_ip(container_name_or_id=container_name)750        should_use = lambda_docker_ip and config.LAMBDA_STAY_OPEN_MODE751        if not should_use or not check_port:752            return should_use753        full_url = self._get_lambda_stay_open_url(lambda_docker_ip)754        return is_port_open(full_url)755    def _get_lambda_stay_open_url(self, lambda_docker_ip: str) -> str:756        return f"http://{lambda_docker_ip}:{STAY_OPEN_API_PORT}"757    def _execute(self, func_arn: str, *args, **kwargs) -> InvocationResult:758        if not LAMBDA_CONCURRENCY_LOCK.get(func_arn):759            concurrency_lock = threading.RLock()760            LAMBDA_CONCURRENCY_LOCK[func_arn] = concurrency_lock761        with LAMBDA_CONCURRENCY_LOCK[func_arn]:762            return super(LambdaExecutorReuseContainers, self)._execute(func_arn, *args, **kwargs)763    def startup(self):764        self.cleanup()765        # start a process to remove idle containers766        if config.LAMBDA_REMOVE_CONTAINERS:767            self.start_idle_container_destroyer_interval()768    def cleanup(self, arn: str = None):769        if arn:770            self.function_invoke_times.pop(arn, None)771            return self.destroy_docker_container(arn)772        self.function_invoke_times = {}773        return self.destroy_existing_docker_containers()774    def prime_docker_container(775        self,776        lambda_function: LambdaFunction,777        env_vars: Dict,778        lambda_cwd: str,779        docker_flags: str = None,780    ):781        """782        Prepares a persistent docker container for a specific function.783        :param lambda_function: The Details of the lambda function.784        :param env_vars: The environment variables for the lambda.785        :param lambda_cwd: The local directory containing the code for the lambda function.786        :return: ContainerInfo class containing the container name and default entry point.787        """788        with self.docker_container_lock:789            # Get the container name and id.790            func_arn = lambda_function.arn()791            container_name = self.get_container_name(func_arn)792            status = self.get_docker_container_status(func_arn)793            LOG.debug('Priming Docker container (status "%s"): %s', status, container_name)794            docker_image = Util.docker_image_for_lambda(lambda_function)795            # Container is not running or doesn't exist.796            if status < 1:797                # Make sure the container does not exist in any form/state.798                self.destroy_docker_container(func_arn)799                # get container startup command and run it800                LOG.debug("Creating container: %s", container_name)801                self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)802                LOG.debug("Starting docker-reuse Lambda container: %s", container_name)803                DOCKER_CLIENT.start_container(container_name)804                def wait_up():805                    cont_status = DOCKER_CLIENT.get_container_status(container_name)806                    assert cont_status == DockerContainerStatus.UP807                    if not in_docker():808                        return809                    # if we're executing in Docker using stay-open mode, additionally check if the target is available810                    lambda_docker_ip = DOCKER_CLIENT.get_container_ip(container_name)811                    if self._should_use_stay_open_mode(lambda_function, lambda_docker_ip):812                        full_url = self._get_lambda_stay_open_url(lambda_docker_ip)813                        wait_for_port_open(full_url, sleep_time=0.5, retries=8)814                # give the container some time to start up815                retry(wait_up, retries=15, sleep=0.8)816            container_network = self.get_docker_container_network(func_arn)817            entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)818            LOG.debug(819                'Using entrypoint "%s" for container "%s" on network "%s".',820                entry_point,821                container_name,822                container_network,823            )824            return ContainerInfo(container_name, entry_point)825    def create_container(826        self,827        lambda_function: LambdaFunction,828        env_vars: Dict,829        lambda_cwd: str,830        docker_flags: str = None,831    ):832        docker_image = Util.docker_image_for_lambda(lambda_function)833        container_config = LambdaContainerConfiguration(image_name=docker_image)834        container_config.name = self.get_container_name(lambda_function.arn())835        # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." above)836        env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)837        container_config.env_vars = env_vars838        container_config.network = get_container_network_for_lambda()839        container_config.additional_flags = docker_flags840        container_config.dns = config.LAMBDA_DOCKER_DNS841        if lambda_cwd:842            if config.LAMBDA_REMOTE_DOCKER:843                container_config.required_files.append((f"{lambda_cwd}/.", DOCKER_TASK_FOLDER))844            else:845                lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)846                # TODO not necessary after Windows 10. Should be deprecated and removed in the future847                if ":" in lambda_cwd and "\\" in lambda_cwd:848                    lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)849                container_config.required_files.append((lambda_cwd_on_host, DOCKER_TASK_FOLDER))850        container_config.entrypoint = "/bin/bash"851        container_config.interactive = True852        if config.LAMBDA_STAY_OPEN_MODE:853            container_config.env_vars["DOCKER_LAMBDA_STAY_OPEN"] = "1"854            # clear docker lambda use stdin since not relevant with stay open855            container_config.env_vars.pop("DOCKER_LAMBDA_USE_STDIN", None)856            container_config.entrypoint = None857            container_config.command = [lambda_function.handler]858            container_config.interactive = False859        # default settings860        container_config.remove = True861        container_config.detach = True862        on_docker_reuse_container_creation.run(lambda_function, container_config)863        if not config.LAMBDA_REMOTE_DOCKER and container_config.required_files:864            container_config.volumes = container_config.required_files865        LOG.debug(866            "Creating docker-reuse Lambda container %s from image %s",867            container_config.name,868            container_config.image_name,869        )870        container_id = DOCKER_CLIENT.create_container(871            image_name=container_config.image_name,872            remove=container_config.remove,873            interactive=container_config.interactive,874            detach=container_config.detach,875            name=container_config.name,876            entrypoint=container_config.entrypoint,877            command=container_config.command,878            network=container_config.network,879            env_vars=container_config.env_vars,880            dns=container_config.dns,881            mount_volumes=container_config.volumes,882            additional_flags=container_config.additional_flags,883            workdir=container_config.workdir,884            user=container_config.user,885            cap_add=container_config.cap_add,886        )887        if config.LAMBDA_REMOTE_DOCKER and container_config.required_files:888            for source, target in container_config.required_files:889                LOG.debug('Copying "%s" to "%s:%s".', source, container_config.name, target)890                DOCKER_CLIENT.copy_into_container(container_config.name, source, target)891        return container_id892    def destroy_docker_container(self, func_arn):893        """894        Stops and/or removes a docker container for a specific lambda function ARN.895        :param func_arn: The ARN of the lambda function.896        :return: None897        """898        with self.docker_container_lock:899            status = self.get_docker_container_status(func_arn)900            # Get the container name and id.901            container_name = self.get_container_name(func_arn)902            if status == 1:903                LOG.debug("Stopping container: %s", container_name)904                DOCKER_CLIENT.stop_container(container_name)905                status = self.get_docker_container_status(func_arn)906            if status == -1:907                LOG.debug("Removing container: %s", container_name)908                rm_docker_container(container_name, safe=True)909            # clean up function invoke times, as some init logic depends on this910            self.function_invoke_times.pop(func_arn, None)911    def get_all_container_names(self):912        """913        Returns a list of container names for lambda containers.914        :return: A String[] localstack docker container names for each function.915        """916        with self.docker_container_lock:917            LOG.debug("Getting all lambda containers names.")918            list_result = DOCKER_CLIENT.list_containers(919                filter=f"name={self.get_container_prefix()}*"920            )921            container_names = list(map(lambda container: container["name"], list_result))922            return container_names923    def destroy_existing_docker_containers(self):924        """925        Stops and/or removes all lambda docker containers for localstack.926        :return: None927        """928        with self.docker_container_lock:929            container_names = self.get_all_container_names()930            LOG.debug("Removing %d containers.", len(container_names))931            for container_name in container_names:932                DOCKER_CLIENT.remove_container(container_name)933    def get_docker_container_status(self, func_arn):934        """935        Determine the status of a docker container.936        :param func_arn: The ARN of the lambda function.937        :return: 1 If the container is running,938        -1 if the container exists but is not running939        0 if the container does not exist.940        """941        with self.docker_container_lock:942            # Get the container name and id.943            container_name = self.get_container_name(func_arn)944            container_status = DOCKER_CLIENT.get_container_status(container_name)945            return container_status.value946    def get_docker_container_network(self, func_arn):947        """948        Determine the network of a docker container.949        :param func_arn: The ARN of the lambda function.950        :return: name of the container network951        """952        with self.docker_container_lock:953            status = self.get_docker_container_status(func_arn)954            # container does not exist955            if status == 0:956                return ""957            # Get the container name.958            container_name = self.get_container_name(func_arn)959            container_network = DOCKER_CLIENT.get_networks(container_name)[0]960            return container_network961    def idle_container_destroyer(self):962        """963        Iterates though all the lambda containers and destroys any container that has964        been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.965        :return: None966        """967        LOG.debug("Checking if there are idle containers ...")968        current_time = int(time.time() * 1000)969        for func_arn, last_run_time in dict(self.function_invoke_times).items():970            duration = current_time - last_run_time971            # not enough idle time has passed972            if duration < MAX_CONTAINER_IDLE_TIME_MS:973                continue974            # container has been idle, destroy it.975            self.destroy_docker_container(func_arn)976    def start_idle_container_destroyer_interval(self):977        """978        Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.979        Thus checking for idle containers and destroying them.980        :return: None981        """982        self.idle_container_destroyer()983        threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()984    def get_container_prefix(self) -> str:985        """986        Returns the prefix of all docker-reuse lambda containers for this LocalStack instance987        :return: Lambda container name prefix988        """989        return f"{bootstrap.get_main_container_name()}_lambda_"990    def get_container_name(self, func_arn):991        """992        Given a function ARN, returns a valid docker container name.993        :param func_arn: The ARN of the lambda function.994        :return: A docker compatible name for the arn.995        """996        return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)997class LambdaExecutorSeparateContainers(LambdaExecutorContainers):998    def __init__(self):999        super(LambdaExecutorSeparateContainers, self).__init__()1000        self.max_port = LAMBDA_API_UNIQUE_PORTS1001        self.port_offset = LAMBDA_API_PORT_OFFSET1002    def prepare_event(self, environment: Dict, event_body: str) -> bytes:1003        # Tell Lambci to use STDIN for the event1004        environment["DOCKER_LAMBDA_USE_STDIN"] = "1"1005        return event_body.encode()1006    def execute_in_container(1007        self,1008        lambda_function: LambdaFunction,1009        inv_context: InvocationContext,1010        stdin=None,1011        background=False,1012    ) -> Tuple[bytes, bytes]:1013        docker_image = Util.docker_image_for_lambda(lambda_function)1014        container_config = LambdaContainerConfiguration(image_name=docker_image)1015        container_config.env_vars = inv_context.environment1016        if inv_context.lambda_command:1017            container_config.entrypoint = ""1018        elif inv_context.handler:1019            inv_context.lambda_command = inv_context.handler1020        # add Docker Lambda env vars1021        container_config.network = get_container_network_for_lambda()1022        if container_config.network == "host":1023            port = get_free_tcp_port()1024            container_config.env_vars["DOCKER_LAMBDA_API_PORT"] = port1025            container_config.env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port1026        container_config.additional_flags = inv_context.docker_flags or ""1027        container_config.dns = config.LAMBDA_DOCKER_DNS1028        container_config.ports = PortMappings()1029        if Util.debug_java_port:1030            container_config.ports.add(Util.debug_java_port)1031        container_config.command = inv_context.lambda_command1032        container_config.remove = True1033        container_config.interactive = True1034        container_config.detach = background1035        lambda_cwd = lambda_function.cwd1036        if lambda_cwd:1037            if config.LAMBDA_REMOTE_DOCKER:1038                container_config.required_files.append((f"{lambda_cwd}/.", DOCKER_TASK_FOLDER))1039            else:1040                container_config.required_files.append(1041                    (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)1042                )1043        # running hooks to modify execution parameters1044        on_docker_separate_execution.run(lambda_function, container_config)1045        # actual execution1046        # TODO make container client directly accept ContainerConfiguration (?)1047        if not config.LAMBDA_REMOTE_DOCKER and container_config.required_files:1048            container_config.volumes = container_config.volumes or []1049            container_config.volumes += container_config.required_files1050        container_id = DOCKER_CLIENT.create_container(1051            image_name=container_config.image_name,1052            interactive=container_config.interactive,1053            entrypoint=container_config.entrypoint,1054            remove=container_config.remove,1055            network=container_config.network,1056            env_vars=container_config.env_vars,1057            dns=container_config.dns,1058            additional_flags=container_config.additional_flags,1059            ports=container_config.ports,1060            command=container_config.command,1061            mount_volumes=container_config.volumes,1062            workdir=container_config.workdir,1063            user=container_config.user,1064            cap_add=container_config.cap_add,1065        )1066        if config.LAMBDA_REMOTE_DOCKER:1067            for source, target in container_config.required_files:1068                DOCKER_CLIENT.copy_into_container(container_id, source, target)1069        return DOCKER_CLIENT.start_container(1070            container_id,1071            interactive=not container_config.detach,1072            attach=not container_config.detach,1073            stdin=stdin,1074        )1075class LambdaExecutorLocal(LambdaExecutor):1076    # maps functionARN -> functionVersion -> callable used to invoke a Lambda function locally1077    FUNCTION_CALLABLES: Dict[str, Dict[str, Callable]] = {}1078    def _execute_in_custom_runtime(1079        self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None1080    ) -> InvocationResult:1081        """1082        Generic run function for executing lambdas in custom runtimes.1083        :param cmd: the command to execute1084        :param lambda_function: function details1085        :return: the InvocationResult1086        """1087        env_vars = lambda_function and lambda_function.envvars1088        kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}1089        process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)1090        result, log_output = process.communicate()1091        try:1092            result = to_str(result).strip()1093        except Exception:1094            pass1095        log_output = to_str(log_output).strip()1096        return_code = process.returncode1097        # Note: The user's code may have been logging to stderr, in which case the logs1098        # will be part of the "result" variable here. Hence, make sure that we extract1099        # only the *last* line of "result" and consider anything above that as log output.1100        # TODO: not sure if this code is needed/used1101        if isinstance(result, str) and "\n" in result:1102            lines = result.split("\n")1103            idx = last_index_of(1104                lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)1105            )1106            if idx >= 0:1107                result = lines[idx]1108                additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])1109                log_output += "\n%s" % additional_logs1110        func_arn = lambda_function and lambda_function.arn()1111        output = OutputLog(result, log_output)1112        LOG.debug(1113            f"Lambda {func_arn} result / log output:"1114            f"\n{output.stdout_formatted()}"1115            f"\n>{output.stderr_formatted()}"1116        )1117        # store log output - TODO get live logs from `process` above?1118        # store_lambda_logs(lambda_function, log_output)1119        if return_code != 0:1120            output.output_file()1121            raise InvocationException(1122                "Lambda process returned error status code: %s. Result: %s. Output:\n%s"1123                % (return_code, result, log_output),1124                log_output,1125                result,1126            )1127        invocation_result = InvocationResult(result, log_output=log_output)1128        return invocation_result1129    def _execute(1130        self, lambda_function: LambdaFunction, inv_context: InvocationContext1131    ) -> InvocationResult:1132        # apply plugin patches to prepare invocation context1133        result = self.apply_plugin_patches(inv_context)1134        if isinstance(result, InvocationResult):1135            return result1136        lambda_cwd = lambda_function.cwd1137        environment = self._prepare_environment(lambda_function)1138        environment["LOCALSTACK_HOSTNAME"] = config.LOCALSTACK_HOSTNAME1139        environment["EDGE_PORT"] = str(config.EDGE_PORT)1140        if lambda_function.timeout:1141            environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1142        context = inv_context.context1143        if context:1144            environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1145            environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1146            environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1147            environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1148        # execute the Lambda function in a forked sub-process, sync result via queue1149        queue = Queue()1150        lambda_function_callable = self.get_lambda_callable(1151            lambda_function, qualifier=inv_context.function_version1152        )1153        def do_execute():1154            # now we're executing in the child process, safe to change CWD and ENV1155            result = None1156            try:1157                if lambda_cwd:1158                    os.chdir(lambda_cwd)1159                    sys.path.insert(0, "")1160                if environment:1161                    os.environ.update(environment)1162                # set default env variables required for most Lambda handlers1163                self.set_default_env_variables()1164                # run the actual handler function1165                result = lambda_function_callable(inv_context.event, context)1166            except Exception as e:1167                result = str(e)1168                sys.stderr.write("%s %s" % (e, traceback.format_exc()))1169                raise1170            finally:1171                queue.put(result)1172        process = Process(target=do_execute)1173        start_time = now(millis=True)1174        error = None1175        with CaptureOutput() as c:1176            try:1177                process.run()1178            except Exception as e:1179                error = e1180        result = queue.get()1181        end_time = now(millis=True)1182        # Make sure to keep the log line below, to ensure the log stream gets created1183        request_id = long_uid()1184        log_output = 'START %s: Lambda %s started via "local" executor ...' % (1185            request_id,1186            lambda_function.arn(),1187        )1188        # TODO: Interweaving stdout/stderr currently not supported1189        for stream in (c.stdout(), c.stderr()):1190            if stream:1191                log_output += ("\n" if log_output else "") + stream1192        if isinstance(result, InvocationResult) and result.log_output:1193            log_output += "\n" + result.log_output1194        log_output += "\nEND RequestId: %s" % request_id1195        log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1196            request_id,1197            int((end_time - start_time) * 1000),1198        )1199        # store logs to CloudWatch1200        store_lambda_logs(lambda_function, log_output)1201        result = result.result if isinstance(result, InvocationResult) else result1202        if error:1203            LOG.info(1204                'Error executing Lambda "%s": %s %s',1205                lambda_function.arn(),1206                error,1207                "".join(traceback.format_tb(error.__traceback__)),1208            )1209            result = json.dumps(1210                {1211                    "errorType": error.__class__.__name__,1212                    "errorMessage": error.args[0],1213                    "stackTrace": traceback.format_tb(error.__traceback__),1214                }1215            )1216            raise InvocationException(result, log_output=log_output, result=result)1217        # construct final invocation result1218        invocation_result = InvocationResult(result, log_output=log_output)1219        # run plugins post-processing logic1220        invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1221        return invocation_result1222    def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:1223        # This is a no-op for local executors - simply return the given local file path1224        return local_file1225    def execute_java_lambda(1226        self, event, context, main_file, lambda_function: LambdaFunction = None1227    ) -> InvocationResult:1228        lambda_function.envvars = lambda_function.envvars or {}1229        java_opts = config.LAMBDA_JAVA_OPTS or ""1230        handler = lambda_function.handler1231        lambda_function.envvars[LAMBDA_HANDLER_ENV_VAR_NAME] = handler1232        event_file = EVENT_FILE_PATTERN.replace("*", short_uid())1233        save_file(event_file, json.dumps(json_safe(event)))1234        TMP_FILES.append(event_file)1235        classpath = "%s:%s:%s" % (1236            main_file,1237            Util.get_java_classpath(main_file),1238            LAMBDA_EXECUTOR_JAR,1239        )1240        cmd = "java %s -cp %s %s %s" % (1241            java_opts,1242            classpath,1243            LAMBDA_EXECUTOR_CLASS,1244            event_file,1245        )1246        # apply plugin patches1247        inv_context = InvocationContext(1248            lambda_function, event, environment=lambda_function.envvars, lambda_command=cmd1249        )1250        result = self.apply_plugin_patches(inv_context)1251        if isinstance(result, InvocationResult):1252            return result1253        cmd = inv_context.lambda_command1254        LOG.info(cmd)1255        # execute Lambda and get invocation result1256        invocation_result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1257        return invocation_result1258    def execute_javascript_lambda(1259        self, event, context, main_file, lambda_function: LambdaFunction = None1260    ):1261        handler = lambda_function.handler1262        function = handler.split(".")[-1]1263        event_json_string = "%s" % (json.dumps(json_safe(event)) if event else "{}")1264        context_json_string = "%s" % (json.dumps(context.__dict__) if context else "{}")1265        cmd = [1266            "node",1267            "-e",1268            f'const res = require("{main_file}").{function}({event_json_string},{context_json_string}); '1269            f"const log = (rs) => console.log(JSON.stringify(rs)); "1270            "res && res.then ? res.then(r => log(r)) : log(res)",1271        ]1272        LOG.info(cmd)1273        result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1274        return result1275    def execute_go_lambda(self, event, context, main_file, lambda_function: LambdaFunction = None):1276        if lambda_function:1277            lambda_function.envvars["AWS_LAMBDA_FUNCTION_HANDLER"] = main_file1278            lambda_function.envvars["AWS_LAMBDA_EVENT_BODY"] = json.dumps(json_safe(event))1279        else:1280            LOG.warning("Unable to get function details for local execution of Golang Lambda")1281        cmd = GO_LAMBDA_RUNTIME1282        LOG.debug("Running Golang Lambda with runtime: %s", cmd)1283        result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1284        return result1285    @staticmethod1286    def set_default_env_variables():1287        # set default env variables required for most Lambda handlers1288        default_env_vars = {"AWS_DEFAULT_REGION": aws_stack.get_region()}1289        env_vars_before = {var: os.environ.get(var) for var in default_env_vars}1290        os.environ.update({k: v for k, v in default_env_vars.items() if not env_vars_before.get(k)})1291        return env_vars_before1292    @staticmethod1293    def reset_default_env_variables(env_vars_before):1294        for env_name, env_value in env_vars_before.items():1295            env_value_before = env_vars_before.get(env_name)1296            os.environ[env_name] = env_value_before or ""1297            if env_value_before is None:1298                os.environ.pop(env_name, None)1299    @classmethod1300    def get_lambda_callable(cls, function: LambdaFunction, qualifier: str = None) -> Callable:1301        """Returns the function Callable for invoking the given function locally"""1302        qualifier = function.get_qualifier_version(qualifier)1303        func_dict = cls.FUNCTION_CALLABLES.get(function.arn()) or {}1304        # TODO: function versioning and qualifiers should be refactored and designed properly!1305        callable = func_dict.get(qualifier) or func_dict.get(LambdaFunction.QUALIFIER_LATEST)1306        if not callable:1307            raise Exception(1308                f"Unable to find callable for Lambda function {function.arn()} - {qualifier}"1309            )1310        return callable1311    @classmethod1312    def add_function_callable(cls, function: LambdaFunction, lambda_handler: Callable):1313        """Sets the function Callable for invoking the $LATEST version of the Lambda function."""1314        func_dict = cls.FUNCTION_CALLABLES.setdefault(function.arn(), {})1315        qualifier = function.get_qualifier_version(LambdaFunction.QUALIFIER_LATEST)1316        func_dict[qualifier] = lambda_handler1317class Util:1318    debug_java_port = False1319    @classmethod1320    def get_java_opts(cls):1321        opts = config.LAMBDA_JAVA_OPTS or ""1322        # Replace _debug_port_ with a random free port1323        if "_debug_port_" in opts:1324            if not cls.debug_java_port:1325                cls.debug_java_port = get_free_tcp_port()1326            opts = opts.replace("_debug_port_", ("%s" % cls.debug_java_port))1327        else:1328            # Parse the debug port from opts1329            m = re.match(".*address=(.+:)?(\\d+).*", opts)1330            if m is not None:1331                cls.debug_java_port = m.groups()[1]1332        return opts1333    @classmethod1334    def get_host_path_for_path_in_docker(cls, path):1335        return re.sub(r"^%s/(.*)$" % config.dirs.tmp, r"%s/\1" % config.dirs.functions, path)1336    @classmethod1337    def format_windows_path(cls, path):1338        temp = path.replace(":", "").replace("\\", "/")1339        if len(temp) >= 1 and temp[:1] != "/":1340            temp = "/" + temp1341        temp = "%s%s" % (config.WINDOWS_DOCKER_MOUNT_PREFIX, temp)1342        return temp1343    @classmethod1344    def docker_image_for_lambda(cls, lambda_function: LambdaFunction):1345        runtime = lambda_function.runtime or ""1346        if lambda_function.code.get("ImageUri"):1347            LOG.warning(1348                "ImageUri is set: Using Lambda container images is only supported in LocalStack Pro"1349            )1350        docker_tag = runtime1351        docker_image = config.LAMBDA_CONTAINER_REGISTRY1352        if runtime == "nodejs14.x" and docker_image == DEFAULT_LAMBDA_CONTAINER_REGISTRY:1353            # TODO temporary fix until lambci image for nodejs14.x becomes available1354            docker_image = "localstack/lambda-js"1355        elif (1356            runtime in ["python3.9", "dotnet6"]1357            and docker_image == DEFAULT_LAMBDA_CONTAINER_REGISTRY1358        ):1359            # TODO temporary fix until we support AWS images via https://github.com/localstack/localstack/pull/47341360            docker_image = "mlupin/docker-lambda"1361        return "%s:%s" % (docker_image, docker_tag)1362    @classmethod1363    def get_java_classpath(cls, archive):1364        """1365        Return the Java classpath, using the parent folder of the1366        given archive as the base folder.1367        The result contains any *.jar files in the base folder, as1368        well as any JAR files in the "lib/*" subfolder living1369        alongside the supplied java archive (.jar or .zip).1370        :param archive: an absolute path to a .jar or .zip Java archive1371        :return: the Java classpath, relative to the base dir of "archive"1372        """1373        entries = ["."]1374        base_dir = os.path.dirname(archive)1375        for pattern in ["%s/*.jar", "%s/lib/*.jar", "%s/java/lib/*.jar", "%s/*.zip"]:1376            for entry in glob.glob(pattern % base_dir):1377                if os.path.realpath(archive) != os.path.realpath(entry):1378                    entries.append(os.path.relpath(entry, base_dir))1379        # make sure to append the localstack-utils.jar at the end of the classpath1380        # https://github.com/localstack/localstack/issues/11601381        entries.append(os.path.relpath(archive, base_dir))1382        entries.append("*.jar")1383        entries.append("java/lib/*.jar")1384        result = ":".join(entries)1385        return result1386    @staticmethod1387    def mountable_tmp_file():1388        f = os.path.join(config.dirs.tmp, short_uid())1389        TMP_FILES.append(f)1390        return f1391    @staticmethod1392    def inject_endpoints_into_env(env_vars: Dict[str, str]):1393        env_vars = env_vars or {}1394        main_endpoint = get_main_endpoint_from_container()1395        if not env_vars.get("LOCALSTACK_HOSTNAME"):1396            env_vars["LOCALSTACK_HOSTNAME"] = main_endpoint1397        return env_vars1398class OutputLog:1399    __slots__ = ["_stdout", "_stderr"]1400    def __init__(self, stdout, stderr):1401        self._stdout = stdout1402        self._stderr = stderr1403    def stderr_formatted(self, truncated_to: int = LAMBDA_TRUNCATE_STDOUT):1404        return truncate(to_str(self._stderr).strip().replace("\n", "\n> "), truncated_to)1405    def stdout_formatted(self, truncated_to: int = LAMBDA_TRUNCATE_STDOUT):1406        return truncate(to_str(self._stdout).strip(), truncated_to)1407    def output_file(self):1408        try:1409            with tempfile.NamedTemporaryFile(1410                dir=TMP_FOLDER, delete=False, suffix=".log", prefix="lambda_"1411            ) as f:1412                LOG.info(f"writing log to file '{f.name}'")1413                f.write(to_bytes(self._stderr))1414        except Exception as e:1415            LOG.warning("failed to write log to file, error %s", e)1416# --------------1417# GLOBAL STATE...admin.py
Source:admin.py  
1import asyncio2import contextlib3import io4import json5import pprint6import textwrap7import time8import traceback9import discord10from discord.ext import commands11def format_codeblock(content):12    if not str(content):13        return None14    formatted = pprint.pformat(content, width=51, compact=True)15    if len(formatted) > 1014:16        formatted = formatted[:1013] + 'â¦'17    return f'```py\n{formatted}\n```'18def get_files(*contents):19    files = []20    for content in contents:21        if len(pprint.pformat(content, width=51, compact=True)) > 1014:22            files.append(discord.File(io.StringIO(pprint.pformat(content, width=128))))23    return files24class Admin(commands.Cog, command_attrs={"hidden": True}):25    def __init__(self, bot: commands.Bot):26        self.bot = bot27        self._last_eval_value = None28    def cog_check(self, ctx):29        return self.bot.is_owner(ctx.author)30    @commands.command(aliases=['.'])31    async def eval(self, ctx, *, code: str):32        if code.startswith('```'):33            code = '\n'.join(code.splitlines()[1:-1])34        else:35            code = f"return {code.strip('` ')}"36        code = 'async def func():\n' + textwrap.indent(code, '  ')37        code_return = '<empty>'38        code_stdout = io.StringIO()39        env = {'_': self._last_eval_value, 'b': self.bot}40        env.update(globals())41        env.update(locals())42        try:43            exec_time = time.perf_counter()44            exec(code, env)45            with contextlib.redirect_stdout(code_stdout):46                code_return = await env['func']()47        except:  # noqa: E72248            return_formatted = format_codeblock(code_return)49            stdout_formatted = format_codeblock(code_stdout.getvalue())50            traceback_formatted = format_codeblock(traceback.format_exc(-1))51            embed = discord.Embed(52                color=0xfa5050,53                title=":x: error!",54                description=f"{(time.perf_counter()-exec_time)*1000:g}ms :clock2:"55            )56            embed.add_field(name='Traceback', value=traceback_formatted, inline=False)57            embed.add_field(name='Return', value=return_formatted, inline=False)58            if stdout_formatted:59                embed.add_field(name='Stdout', value=stdout_formatted, inline=False)60            await ctx.send(61                embed=embed,62                files=get_files(code_return, code_stdout.getvalue(), traceback.format_exc(-1))63            )64        else:65            return_formatted = format_codeblock(code_return)66            stdout_formatted = format_codeblock(code_stdout.getvalue())67            embed = discord.Embed(68                color=0x50fa50,69                title=":white_check_mark: Code evaluated",70                description=f"{(time.perf_counter()-exec_time)*1000:g}ms :clock2:"71            )72            embed.add_field(73                name='Return', value=return_formatted, inline=False74            )75            if stdout_formatted:76                embed.add_field(77                    name='Stdout', value=stdout_formatted, inline=False78                )79            await ctx.send(80                embed=embed,81                files=get_files(code_return, code_stdout.getvalue())82            )83            if code_return is not None:84                self._last_eval_value = code_return85    @commands.command(aliases=['h'])86    async def shell(self, ctx, *, command):87        command = '\n'.join(command.splitlines()[1:-1]) \88                    if command.startswith('```') \89                    else command.strip('` ')90        exec_time = time.perf_counter()91        proc = await asyncio.subprocess.create_subprocess_shell(92            command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE93        )94        stdout, stderr = await proc.communicate()95        stdout_formatted = format_codeblock(stdout.decode())96        stderr_formatted = format_codeblock(stderr.decode())97        if proc.returncode == 0:98            embed = discord.Embed(99                color=0x50fa50,100                title=':white_check_mark: Subprocess finished',101                description=f"{(time.perf_counter()-exec_time)*1000:g}ms :clock2:"102            )103        else:104            embed = discord.Embed(105                color=0xfa7050,106                title=f':x: Subprocess exited with returncode {proc.returncode}',107                description=f"{(time.perf_counter()-exec_time)*1000:g}ms :clock2:"108            )109        if stdout_formatted is None and stderr_formatted is None:110            embed.description = 'no output · ' + embed.description111        if stdout_formatted is not None:112            embed.add_field(name='Stdout:', value=stdout_formatted, inline=False)113        if stderr_formatted is not None:114            embed.add_field(name='Stderr:', value=stderr_formatted, inline=False)115        await ctx.send(embed=embed, files=get_files(stdout.decode(), stderr.decode()))116    @commands.command()117    async def reloadcfg(self, ctx):118        self.bot.config = json.load(open('config_defaults.json')) | json.load(open('config.json'))119        await ctx.message.add_reaction('\N{white heavy check mark}')120    @commands.command(aliases=['l'])121    async def loadexts(self, ctx, *exts):122        if len(exts) == 0:123            await ctx.send(', '.join(f'`{i}`' for i in self.bot.extensions.keys()))124            return125        errors = []126        for ext in exts:127            ext = f'bot.exts.{ext}' if not ext.startswith('bot.exts.') else f'{ext}'128            try:129                if ext in self.bot.extensions:130                    self.bot.reload_extension(ext)131                else:132                    self.bot.load_extension(ext)133            except Exception as exc:134                exc = repr(exc)135                if len(exc) > 200:136                    exc = exc[:200] + 'â¦'137                errors.append(traceback.format_exc())138        if errors:139            await ctx.send(file=discord.File(io.StringIO('\n'.join(errors)), 'errors.py'))140        await ctx.message.add_reaction('\N{white heavy check mark}')141def setup(bot):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
