Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py  
...41    def startup(self):42        pass43    def cleanup(self, arn=None):44        pass45    def run_lambda_executor(self, cmd, env_vars={}, asynchronous=False):46        process = run(cmd, asynchronous=True, stderr=subprocess.PIPE, outfile=subprocess.PIPE, env_vars=env_vars)47        if asynchronous:48            result = '{"asynchronous": "%s"}' % asynchronous49            log_output = 'Lambda executed asynchronously'50        else:51            return_code = process.wait()52            result = to_str(process.stdout.read())53            log_output = to_str(process.stderr.read())54            if return_code != 0:55                raise Exception('Lambda process returned error status code: %s. Output:\n%s' %56                    (return_code, log_output))57        return result, log_output58# holds information about an existing container.59class ContainerInfo:60    """61    Contains basic information about a docker container.62    """63    def __init__(self, name, entry_point):64        self.name = name65        self.entry_point = entry_point66class LambdaExecutorContainers(LambdaExecutor):67    """ Abstract executor class for executing Lambda functions in Docker containers """68    def prepare_execution(self, func_arn, env_vars, runtime, command, handler, lambda_cwd):69        raise Exception('Not implemented')70    def execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):71        lambda_cwd = func_details.cwd72        runtime = func_details.runtime73        handler = func_details.handler74        environment = func_details.envvars.copy()75        # configure USE_SSL in environment76        if config.USE_SSL:77            environment['USE_SSL'] = '1'78        # prepare event body79        if not event:80            LOG.warning('Empty event body specified for invocation of Lambda "%s"' % func_arn)81            event = {}82        event_body = json.dumps(event)83        event_body_escaped = event_body.replace("'", "\\'")84 
       docker_host = config.DOCKER_HOST_FROM_CONTAINER85        # amend the environment variables for execution86        environment['AWS_LAMBDA_EVENT_BODY'] = event_body_escaped87        environment['HOSTNAME'] = docker_host88        environment['LOCALSTACK_HOSTNAME'] = docker_host89        if context:90            environment['AWS_LAMBDA_FUNCTION_NAME'] = context.function_name91            environment['AWS_LAMBDA_FUNCTION_VERSION'] = context.function_version92            environment['AWS_LAMBDA_FUNCTION_INVOKED_ARN'] = context.invoked_function_arn93        # custom command to execute in the container94        command = ''95        # if running a Java Lambda, set up classpath arguments96        if runtime == LAMBDA_RUNTIME_JAVA8:97            # copy executor jar into temp directory98            cp_r(LAMBDA_EXECUTOR_JAR, lambda_cwd)99            # TODO cleanup once we have custom Java Docker image100            taskdir = '/var/task'101            save_file(os.path.join(lambda_cwd, LAMBDA_EVENT_FILE), event_body)102            command = ("bash -c 'cd %s; java -cp .:`ls *.jar | tr \"\\n\" \":\"` \"%s\" \"%s\" \"%s\"'" %103                (taskdir, LAMBDA_EXECUTOR_CLASS, handler, LAMBDA_EVENT_FILE))104        # determine the command to be executed (implemented by subclasses)105        cmd = self.prepare_execution(func_arn, environment, runtime, command, handler, lambda_cwd)106        # lambci writes the Lambda result to stdout and logs to stderr, fetch it from there!107        LOG.debug('Running lambda cmd: %s' % cmd)108        result, log_output = self.run_lambda_executor(cmd, environment, asynchronous)109        LOG.debug('Lambda result / log output:\n%s\n>%s' % (result.strip(), log_output.strip().replace('\n', '\n> ')))110        return result, log_output111class LambdaExecutorReuseContainers(LambdaExecutorContainers):112    """ Executor class for executing Lambda functions in re-usable Docker containers """113    def __init__(self):114        
super(LambdaExecutorReuseContainers, self).__init__()115        # keeps track of each function arn and the last time it was invoked116        self.function_invoke_times = {}117        # locking thread for creation/destruction of docker containers.118        self.docker_container_lock = threading.RLock()119    def prepare_execution(self, func_arn, env_vars, runtime, command, handler, lambda_cwd):120        # check whether the Lambda has been invoked before121        has_been_invoked_before = func_arn in self.function_invoke_times122        # set the invocation time123        self.function_invoke_times[func_arn] = time.time()124        # create/verify the docker container is running.125        LOG.debug('Priming docker container with runtime "%s" and arn "%s".', runtime, func_arn)126        container_info = self.prime_docker_container(runtime, func_arn, env_vars.items(), lambda_cwd)127        # Note: currently "docker exec" does not support --env-file, i.e., environment variables can only be128        # passed directly on the command line, using "-e" below. TODO: Update this code once --env-file is129        # available for docker exec, to better support very large Lambda events (very long environment values)130        exec_env_vars = ' '.join(['-e {}="${}"'.format(k, k) for (k, v) in env_vars.items()])131        if not command:132            command = '%s %s' % (container_info.entry_point, handler)133        # determine files to be copied into the container134        copy_command = ''135        event_file = os.path.join(lambda_cwd, LAMBDA_EVENT_FILE)136        if not has_been_invoked_before:137            # if this is the first invocation: copy the entire folder into the container138            copy_command = 'docker cp "%s/." 
"%s:/var/task"; ' % (lambda_cwd, container_info.name)139        elif os.path.exists(event_file):140            # otherwise, copy only the event file if it exists141            copy_command = 'docker cp "%s" "%s:/var/task"; ' % (event_file, container_info.name)142        cmd = (143            '%s'  # copy files command144            'docker exec'145            ' %s'  # env variables146            ' %s'  # container name147            ' %s'  # run cmd148        ) % (copy_command, exec_env_vars, container_info.name, command)149        return cmd150    def startup(self):151        self.cleanup()152        # start a process to remove idle containers153        self.start_idle_container_destroyer_interval()154    def cleanup(self, arn=None):155        if arn:156            self.function_invoke_times.pop(arn, None)157            return self.destroy_docker_container(arn)158        self.function_invoke_times = {}159        return self.destroy_existing_docker_containers()160    def prime_docker_container(self, runtime, func_arn, env_vars, lambda_cwd):161        """162        Prepares a persistent docker container for a specific function.163        :param runtime: Lamda runtime environment. 
python2.7, nodejs6.10, etc.164        :param func_arn: The ARN of the lambda function.165        :param env_vars: The environment variables for the lambda.166        :param lambda_cwd: The local directory containing the code for the lambda function.167        :return: ContainerInfo class containing the container name and default entry point.168        """169        with self.docker_container_lock:170            # Get the container name and id.171            container_name = self.get_container_name(func_arn)172            LOG.debug('Priming docker container: %s' % container_name)173            status = self.get_docker_container_status(func_arn)174            # Container is not running or doesn't exist.175            if status < 1:176                # Make sure the container does not exist in any form/state.177                self.destroy_docker_container(func_arn)178                env_vars_str = ' '.join(['-e {}={}'.format(k, cmd_quote(v)) for (k, v) in env_vars])179                # Create and start the container180                LOG.debug('Creating container: %s' % container_name)181                cmd = (182                    'docker create'183                    ' --name "%s"'184                    ' --entrypoint /bin/bash'  # Load bash when it starts.185                    ' --interactive'  # Keeps the container running bash.186                    ' -e AWS_LAMBDA_EVENT_BODY="$AWS_LAMBDA_EVENT_BODY"'187                    ' -e HOSTNAME="$HOSTNAME"'188                    ' -e LOCALSTACK_HOSTNAME="$LOCALSTACK_HOSTNAME"'189                    '  %s'  # env_vars190                    ' lambci/lambda:%s'191                ) % (container_name, env_vars_str, runtime)192                LOG.debug(cmd)193                run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE)194                LOG.debug('Copying files to container "%s" from "%s".' % (container_name, lambda_cwd))195                cmd = (196                    'docker cp'197                    ' "%s/." 
"%s:/var/task"'198                ) % (lambda_cwd, container_name)199                LOG.debug(cmd)200                run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE)201                LOG.debug('Starting container: %s' % container_name)202                cmd = 'docker start %s' % (container_name)203                LOG.debug(cmd)204                run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE)205                # give the container some time to start up206                time.sleep(1)207            # Get the entry point for the image.208            LOG.debug('Getting the entrypoint for image: lambci/lambda:%s' % runtime)209            cmd = (210                'docker image inspect'211                ' --format="{{ .ContainerConfig.Entrypoint }}"'212                ' lambci/lambda:%s'213            ) % (runtime)214            LOG.debug(cmd)215            run_result = run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)216            entry_point = run_result.strip('[]\n\r ')217            LOG.debug('Using entrypoint "%s" for container "%s".' 
% (entry_point, container_name))218            return ContainerInfo(container_name, entry_point)219    def destroy_docker_container(self, func_arn):220        """221        Stops and/or removes a docker container for a specific lambda function ARN.222        :param func_arn: The ARN of the lambda function.223        :return: None224        """225        with self.docker_container_lock:226            status = self.get_docker_container_status(func_arn)227            # Get the container name and id.228            container_name = self.get_container_name(func_arn)229            if status == 1:230                LOG.debug('Stopping container: %s' % container_name)231                cmd = (232                    'docker stop -t0 %s'233                ) % (container_name)234                LOG.debug(cmd)235                run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)236                status = self.get_docker_container_status(func_arn)237            if status == -1:238                LOG.debug('Removing container: %s' % container_name)239                cmd = (240                    'docker rm %s'241                ) % (container_name)242                LOG.debug(cmd)243                run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)244    def get_all_container_names(self):245        """246        Returns a list of container names for lambda containers.247        :return: A String[] localstack docker container names for each function.248        """249        with self.docker_container_lock:250            LOG.debug('Getting all lambda containers names.')251            cmd = 'docker ps -a --filter="name=localstack_lambda_*" --format "{{.Names}}"'252            LOG.debug(cmd)253            cmd_result = run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE).strip()254            if len(cmd_result) > 0:255                container_names = cmd_result.split('\n')256            else:257            
    container_names = []258            return container_names259    def destroy_existing_docker_containers(self):260        """261        Stops and/or removes all lambda docker containers for localstack.262        :return: None263        """264        with self.docker_container_lock:265            container_names = self.get_all_container_names()266            LOG.debug('Removing %d containers.' % len(container_names))267            for container_name in container_names:268                cmd = 'docker rm -f %s' % container_name269                LOG.debug(cmd)270                run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)271    def get_docker_container_status(self, func_arn):272        """273        Determine the status of a docker container.274        :param func_arn: The ARN of the lambda function.275        :return: 1 If the container is running,276        -1 if the container exists but is not running277        0 if the container does not exist.278        """279        with self.docker_container_lock:280            # Get the container name and id.281            container_name = self.get_container_name(func_arn)282            # Check if the container is already running.283            LOG.debug('Getting container status: %s' % container_name)284            cmd = (285                'docker ps'286                ' -a'287                ' --filter name="%s"'288                ' --format "{{ .Status }}"'289            ) % (container_name)290            LOG.debug(cmd)291            cmd_result = run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)292            # If the container doesn't exist. 
Create and start it.293            container_status = cmd_result.strip()294            if len(container_status) == 0:295                return 0296            if container_status.lower().startswith('up '):297                return 1298            return -1299    def idle_container_destroyer(self):300        """301        Iterates though all the lambda containers and destroys any container that has302        been inactive for longer than MAX_CONTAINER_IDLE_TIME.303        :return: None304        """305        LOG.info('Checking if there are idle containers.')306        current_time = time.time()307        for func_arn, last_run_time in self.function_invoke_times.items():308            duration = current_time - last_run_time309            # not enough idle time has passed310            if duration < MAX_CONTAINER_IDLE_TIME:311                continue312            # container has been idle, destroy it.313            self.destroy_docker_container(func_arn)314    def start_idle_container_destroyer_interval(self):315        """316        Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.317        Thus checking for idle containers and destroying them.318        :return: None319        """320        self.idle_container_destroyer()321        threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()322    def get_container_name(self, func_arn):323        """324        Given a function ARN, returns a valid docker container name.325        :param func_arn: The ARN of the lambda function.326        :return: A docker compatible name for the arn.327        """328        return 'localstack_lambda_' + re.sub(r'[^a-zA-Z0-9_.-]', '_', func_arn)329class LambdaExecutorSeparateContainers(LambdaExecutorContainers):330    def prepare_execution(self, func_arn, env_vars, runtime, command, handler, lambda_cwd):331        entrypoint = ''332        if command:333            entrypoint = ' --entrypoint ""'334        else:335      
      command = '"%s"' % handler336        env_vars_string = ' '.join(['-e {}="${}"'.format(k, k) for (k, v) in env_vars.items()])337        if config.LAMBDA_REMOTE_DOCKER:338            cmd = (339                'CONTAINER_ID="$(docker create'340                ' %s'341                ' %s'342                ' "lambci/lambda:%s" %s'343                ')";'344                'docker cp "%s/." "$CONTAINER_ID:/var/task";'345                'docker start -a "$CONTAINER_ID";'346            ) % (entrypoint, env_vars_string, runtime, command, lambda_cwd)347        else:348            lambda_cwd_on_host = self.get_host_path_for_path_in_docker(lambda_cwd)349            cmd = (350                'docker run'351                '%s -v "%s":/var/task'352                ' %s'353                ' --rm'354                ' "lambci/lambda:%s" %s'355            ) % (entrypoint, lambda_cwd_on_host, env_vars_string, runtime, command)356        return cmd357    def get_host_path_for_path_in_docker(self, path):358        return re.sub(r'^%s/(.*)$' % config.TMP_FOLDER,359                    r'%s/\1' % config.HOST_TMP_FOLDER, path)360class LambdaExecutorLocal(LambdaExecutor):361    def execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):362        lambda_cwd = func_details.cwd363        environment = func_details.envvars.copy()364        # execute the Lambda function in a forked sub-process, sync result via queue365        queue = Queue()366        lambda_function = func_details.function(version)367        def do_execute():368            # now we're executing in the child process, safe to change CWD and ENV369            if lambda_cwd:370                os.chdir(lambda_cwd)371            if environment:372                os.environ.update(environment)373            result = lambda_function(event, context)374            queue.put(result)375        process = Process(target=do_execute)376        process.run()377        result = queue.get()378        
# TODO capture log output during local execution?379        log_output = ''380        return result, log_output381    def execute_java_lambda(self, event, context, handler, main_file):382        event_file = EVENT_FILE_PATTERN.replace('*', short_uid())383        save_file(event_file, json.dumps(event))384        TMP_FILES.append(event_file)385        class_name = handler.split('::')[0]386        classpath = '%s:%s' % (LAMBDA_EXECUTOR_JAR, main_file)387        cmd = 'java -cp %s %s %s %s' % (classpath, LAMBDA_EXECUTOR_CLASS, class_name, event_file)388        asynchronous = False389        # flip asynchronous flag depending on origin390        if 'Records' in event:391            # TODO: add more event supporting asynchronous lambda execution392            if 'Sns' in event['Records'][0]:393                asynchronous = True394            if 'dynamodb' in event['Records'][0]:395                asynchronous = True396        result, log_output = self.run_lambda_executor(cmd, asynchronous=asynchronous)397        LOG.debug('Lambda result / log output:\n%s\n> %s' % (result.strip(), log_output.strip().replace('\n', '\n> ')))398        return result, log_output399# --------------400# GLOBAL STATE401# --------------402EXECUTOR_LOCAL = LambdaExecutorLocal()403EXECUTOR_CONTAINERS_SEPARATE = LambdaExecutorSeparateContainers()404EXECUTOR_CONTAINERS_REUSE = LambdaExecutorReuseContainers()405DEFAULT_EXECUTOR = EXECUTOR_LOCAL406# the keys of AVAILABLE_EXECUTORS map to the LAMBDA_EXECUTOR config variable407AVAILABLE_EXECUTORS = {408    'local': EXECUTOR_LOCAL,409    'docker': EXECUTOR_CONTAINERS_SEPARATE,410    'docker-reuse': EXECUTOR_CONTAINERS_REUSE...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation test minutes FREE!!
