How to use the run_lambda_executor method in localstack

Best Python code snippet using localstack_python

lambda_executors.py

Source:lambda_executors.py Github

copy

Full Screen

# ... (excerpt begins at line 41 of lambda_executors.py, inside the body of the
# LambdaExecutor base class; lines 1-40 -- imports, constants and the first part
# of the class -- are not part of this excerpt)

# NOTE(review): the class statement below is reconstructed so that the methods
# have an enclosing scope; the original header line lies outside the excerpt.
# A new-style base is used because a subclass calls super(Class, self).
class LambdaExecutor(object):

    def startup(self):
        """Hook called when the executor starts; no-op in the base class."""
        pass

    def cleanup(self, arn=None):
        """Hook for releasing executor resources; no-op in the base class.

        :param arn: optional function ARN to restrict the cleanup to.
        """
        pass

    def run_lambda_executor(self, cmd, env_vars=None, asynchronous=False):
        """
        Run the given shell command and collect the Lambda result and log output.

        :param cmd: command line to execute.
        :param env_vars: dict of environment variables passed to the process
            (defaults to an empty dict).
        :param asynchronous: if truthy, do not wait for the process to finish and
            return a placeholder result instead.
        :return: tuple (result, log_output) - stdout and stderr of the process.
        :raises Exception: if the process exits with a non-zero status code.
        """
        # avoid a shared mutable default argument
        if env_vars is None:
            env_vars = {}
        # presumably run(..., asynchronous=True) returns a subprocess.Popen-like
        # object (the original code used .wait(), .stdout and .stderr) -- confirm
        process = run(cmd, asynchronous=True, stderr=subprocess.PIPE, outfile=subprocess.PIPE, env_vars=env_vars)
        if asynchronous:
            result = '{"asynchronous": "%s"}' % asynchronous
            log_output = 'Lambda executed asynchronously'
        else:
            # communicate() drains both pipes while waiting; calling wait() before
            # reading can deadlock once the child fills an OS pipe buffer
            stdout_data, stderr_data = process.communicate()
            return_code = process.returncode
            result = to_str(stdout_data)
            log_output = to_str(stderr_data)
            if return_code != 0:
                raise Exception('Lambda process returned error status code: %s. Output:\n%s' %
                                (return_code, log_output))
        return result, log_output


# holds information about an existing container.
class ContainerInfo:
    """
    Contains basic information about a docker container.
    """

    def __init__(self, name, entry_point):
        self.name = name
        self.entry_point = entry_point


class LambdaExecutorContainers(LambdaExecutor):
    """ Abstract executor class for executing Lambda functions in Docker containers """

    def prepare_execution(self, func_arn, env_vars, runtime, command, handler, lambda_cwd):
        """Build the shell command used to run the Lambda; implemented by subclasses."""
        raise Exception('Not implemented')

    def execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):
        """
        Execute a Lambda function via the command built by prepare_execution().

        :param func_arn: The ARN of the lambda function.
        :param func_details: object providing cwd, runtime, handler and envvars.
        :param event: the event payload (serialized to JSON); falsy values become {}.
        :param context: optional invocation context; when given, its function_name,
            function_version and invoked_function_arn are exported as env variables.
        :param version: not used by this method.
        :param asynchronous: forwarded to run_lambda_executor().
        :return: tuple (result, log_output).
        """
        lambda_cwd = func_details.cwd
        runtime = func_details.runtime
        handler = func_details.handler
        environment = func_details.envvars.copy()

        # configure USE_SSL in environment
        if config.USE_SSL:
            environment['USE_SSL'] = '1'

        # prepare event body
        if not event:
            LOG.warning('Empty event body specified for invocation of Lambda "%s"' % func_arn)
            event = {}
        event_body = json.dumps(event)
        event_body_escaped = event_body.replace("'", "\\'")

        docker_host = config.DOCKER_HOST_FROM_CONTAINER

        # amend the environment variables for execution
        environment['AWS_LAMBDA_EVENT_BODY'] = event_body_escaped
        environment['HOSTNAME'] = docker_host
        environment['LOCALSTACK_HOSTNAME'] = docker_host
        if context:
            environment['AWS_LAMBDA_FUNCTION_NAME'] = context.function_name
            environment['AWS_LAMBDA_FUNCTION_VERSION'] = context.function_version
            environment['AWS_LAMBDA_FUNCTION_INVOKED_ARN'] = context.invoked_function_arn

        # custom command to execute in the container
        command = ''

        # if running a Java Lambda, set up classpath arguments
        if runtime == LAMBDA_RUNTIME_JAVA8:
            # copy executor jar into temp directory
            cp_r(LAMBDA_EXECUTOR_JAR, lambda_cwd)
            # TODO cleanup once we have custom Java Docker image
            taskdir = '/var/task'
            save_file(os.path.join(lambda_cwd, LAMBDA_EVENT_FILE), event_body)
            command = ("bash -c 'cd %s; java -cp .:`ls *.jar | tr \"\\n\" \":\"` \"%s\" \"%s\" \"%s\"'" %
                       (taskdir, LAMBDA_EXECUTOR_CLASS, handler, LAMBDA_EVENT_FILE))

        # determine the command to be executed (implemented by subclasses)
        cmd = self.prepare_execution(func_arn, environment, runtime, command, handler, lambda_cwd)

        # lambci writes the Lambda result to stdout and logs to stderr, fetch it from there!
        LOG.debug('Running lambda cmd: %s' % cmd)
        result, log_output = self.run_lambda_executor(cmd, environment, asynchronous)
        LOG.debug('Lambda result / log output:\n%s\n>%s' %
                  (result.strip(), log_output.strip().replace('\n', '\n> ')))
        return result, log_output


class LambdaExecutorReuseContainers(LambdaExecutorContainers):
    """ Executor class for executing Lambda functions in re-usable Docker containers """

    def __init__(self):
        super(LambdaExecutorReuseContainers, self).__init__()
        # keeps track of each function arn and the last time it was invoked
        self.function_invoke_times = {}
        # locking thread for creation/destruction of docker containers.
        self.docker_container_lock = threading.RLock()

    def prepare_execution(self, func_arn, env_vars, runtime, command, handler, lambda_cwd):
        """
        Prime the function's container and build the "docker exec" command line.

        :param func_arn: The ARN of the lambda function.
        :param env_vars: dict of environment variables for the invocation.
        :param runtime: Lambda runtime name (used as the lambci/lambda image tag).
        :param command: custom command to run; if empty, the image entry point plus
            the handler is used.
        :param handler: the Lambda handler.
        :param lambda_cwd: The local directory containing the code for the lambda function.
        :return: shell command string (optional "docker cp" followed by "docker exec").
        """
        # check whether the Lambda has been invoked before
        has_been_invoked_before = func_arn in self.function_invoke_times

        # set the invocation time
        self.function_invoke_times[func_arn] = time.time()

        # create/verify the docker container is running.
        LOG.debug('Priming docker container with runtime "%s" and arn "%s".', runtime, func_arn)
        container_info = self.prime_docker_container(runtime, func_arn, env_vars.items(), lambda_cwd)

        # Note: currently "docker exec" does not support --env-file, i.e., environment variables can only be
        # passed directly on the command line, using "-e" below. TODO: Update this code once --env-file is
        # available for docker exec, to better support very large Lambda events (very long environment values)
        # Each variable is passed as -e KEY="$KEY", i.e. the value is expanded by the
        # shell from the environment that run_lambda_executor() hands to the process.
        exec_env_vars = ' '.join(['-e {}="${}"'.format(k, k) for k in env_vars])

        if not command:
            command = '%s %s' % (container_info.entry_point, handler)

        # determine files to be copied into the container
        copy_command = ''
        event_file = os.path.join(lambda_cwd, LAMBDA_EVENT_FILE)
        if not has_been_invoked_before:
            # if this is the first invocation: copy the entire folder into the container
            copy_command = 'docker cp "%s/." "%s:/var/task"; ' % (lambda_cwd, container_info.name)
        elif os.path.exists(event_file):
            # otherwise, copy only the event file if it exists
            copy_command = 'docker cp "%s" "%s:/var/task"; ' % (event_file, container_info.name)

        cmd = (
            '%s'  # copy files command
            'docker exec'
            ' %s'  # env variables
            ' %s'  # container name
            ' %s'  # run cmd
        ) % (copy_command, exec_env_vars, container_info.name, command)

        return cmd

    def startup(self):
        """Remove all existing lambda containers and start the idle-container check."""
        self.cleanup()
        # start a process to remove idle containers
        self.start_idle_container_destroyer_interval()

    def cleanup(self, arn=None):
        """
        Destroy the container of one function, or all lambda containers.

        :param arn: if given, only this function's container and bookkeeping entry
            are removed; otherwise everything is reset.
        :return: whatever the underlying destroy method returns.
        """
        if arn:
            self.function_invoke_times.pop(arn, None)
            return self.destroy_docker_container(arn)
        self.function_invoke_times = {}
        return self.destroy_existing_docker_containers()

    def prime_docker_container(self, runtime, func_arn, env_vars, lambda_cwd):
        """
        Prepares a persistent docker container for a specific function.
        :param runtime: Lambda runtime environment. python2.7, nodejs6.10, etc.
        :param func_arn: The ARN of the lambda function.
        :param env_vars: The environment variables for the lambda (iterable of (key, value) pairs).
        :param lambda_cwd: The local directory containing the code for the lambda function.
        :return: ContainerInfo class containing the container name and default entry point.
        """
        with self.docker_container_lock:
            # Get the container name and id.
            container_name = self.get_container_name(func_arn)

            LOG.debug('Priming docker container: %s' % container_name)

            status = self.get_docker_container_status(func_arn)
            # Container is not running or doesn't exist.
            # NOTE(review): the nesting below is inferred -- the excerpt lost its indentation.
            if status < 1:
                # Make sure the container does not exist in any form/state.
                self.destroy_docker_container(func_arn)

                env_vars_str = ' '.join(['-e {}={}'.format(k, cmd_quote(v)) for (k, v) in env_vars])

                # Create and start the container
                LOG.debug('Creating container: %s' % container_name)
                cmd = (
                    'docker create'
                    ' --name "%s"'
                    ' --entrypoint /bin/bash'  # Load bash when it starts.
                    ' --interactive'  # Keeps the container running bash.
                    ' -e AWS_LAMBDA_EVENT_BODY="$AWS_LAMBDA_EVENT_BODY"'
                    ' -e HOSTNAME="$HOSTNAME"'
                    ' -e LOCALSTACK_HOSTNAME="$LOCALSTACK_HOSTNAME"'
                    ' %s'  # env_vars
                    ' lambci/lambda:%s'
                ) % (container_name, env_vars_str, runtime)
                LOG.debug(cmd)
                run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

                LOG.debug('Copying files to container "%s" from "%s".' % (container_name, lambda_cwd))
                cmd = (
                    'docker cp'
                    ' "%s/." "%s:/var/task"'
                ) % (lambda_cwd, container_name)
                LOG.debug(cmd)
                run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

                LOG.debug('Starting container: %s' % container_name)
                cmd = 'docker start %s' % (container_name)
                LOG.debug(cmd)
                run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE)
                # give the container some time to start up
                time.sleep(1)

            # Get the entry point for the image.
            LOG.debug('Getting the entrypoint for image: lambci/lambda:%s' % runtime)
            cmd = (
                'docker image inspect'
                ' --format="{{ .ContainerConfig.Entrypoint }}"'
                ' lambci/lambda:%s'
            ) % (runtime)

            LOG.debug(cmd)
            run_result = run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

            # strip the surrounding list brackets and whitespace from the inspect output
            entry_point = run_result.strip('[]\n\r ')

            LOG.debug('Using entrypoint "%s" for container "%s".' % (entry_point, container_name))
            return ContainerInfo(container_name, entry_point)

    def destroy_docker_container(self, func_arn):
        """
        Stops and/or removes a docker container for a specific lambda function ARN.
        :param func_arn: The ARN of the lambda function.
        :return: None
        """
        with self.docker_container_lock:
            status = self.get_docker_container_status(func_arn)

            # Get the container name and id.
            container_name = self.get_container_name(func_arn)

            if status == 1:
                LOG.debug('Stopping container: %s' % container_name)
                cmd = (
                    'docker stop -t0 %s'
                ) % (container_name)

                LOG.debug(cmd)
                run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

                # re-check: a stopped container is expected to report -1 and be removed below
                status = self.get_docker_container_status(func_arn)

            if status == -1:
                LOG.debug('Removing container: %s' % container_name)
                cmd = (
                    'docker rm %s'
                ) % (container_name)

                LOG.debug(cmd)
                run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

    def get_all_container_names(self):
        """
        Returns a list of container names for lambda containers.
        :return: A String[] localstack docker container names for each function.
        """
        with self.docker_container_lock:
            LOG.debug('Getting all lambda containers names.')
            cmd = 'docker ps -a --filter="name=localstack_lambda_*" --format "{{.Names}}"'
            LOG.debug(cmd)
            cmd_result = run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE).strip()

            # empty output means no containers ("".split() would yield [''])
            return cmd_result.split('\n') if cmd_result else []

    def destroy_existing_docker_containers(self):
        """
        Stops and/or removes all lambda docker containers for localstack.
        :return: None
        """
        with self.docker_container_lock:
            container_names = self.get_all_container_names()

            LOG.debug('Removing %d containers.' % len(container_names))
            for container_name in container_names:
                cmd = 'docker rm -f %s' % container_name
                LOG.debug(cmd)
                run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

    def get_docker_container_status(self, func_arn):
        """
        Determine the status of a docker container.
        :param func_arn: The ARN of the lambda function.
        :return: 1 If the container is running,
                -1 if the container exists but is not running
                 0 if the container does not exist.
        """
        with self.docker_container_lock:
            # Get the container name and id.
            container_name = self.get_container_name(func_arn)

            # Check if the container is already running.
            LOG.debug('Getting container status: %s' % container_name)
            cmd = (
                'docker ps'
                ' -a'
                ' --filter name="%s"'
                ' --format "{{ .Status }}"'
            ) % (container_name)

            LOG.debug(cmd)
            cmd_result = run(cmd, asynchronous=False, stderr=subprocess.PIPE, outfile=subprocess.PIPE)

            # Empty output: no container matched the name filter.
            container_status = cmd_result.strip()

            if not container_status:
                return 0

            if container_status.lower().startswith('up '):
                return 1

            return -1

    def idle_container_destroyer(self):
        """
        Iterates though all the lambda containers and destroys any container that has
        been inactive for longer than MAX_CONTAINER_IDLE_TIME.
        :return: None
        """
        LOG.info('Checking if there are idle containers.')
        current_time = time.time()
        # Iterate over a snapshot: prepare_execution() may insert new ARNs from another
        # thread while this timer callback runs, and mutating a dict during iteration
        # raises "RuntimeError: dictionary changed size during iteration".
        for func_arn, last_run_time in list(self.function_invoke_times.items()):
            duration = current_time - last_run_time

            # not enough idle time has passed
            if duration < MAX_CONTAINER_IDLE_TIME:
                continue

            # container has been idle, destroy it.
            self.destroy_docker_container(func_arn)

    def start_idle_container_destroyer_interval(self):
        """
        Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.
        Thus checking for idle containers and destroying them.
        :return: None
        """
        self.idle_container_destroyer()
        # re-arm: each run schedules the next one 60 seconds later
        threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()

    def get_container_name(self, func_arn):
        """
        Given a function ARN, returns a valid docker container name.
        :param func_arn: The ARN of the lambda function.
        :return: A docker compatible name for the arn.
        """
        # every character outside [a-zA-Z0-9_.-] is replaced by an underscore
        return 'localstack_lambda_' + re.sub(r'[^a-zA-Z0-9_.-]', '_', func_arn)


class LambdaExecutorSeparateContainers(LambdaExecutorContainers):
    """ Executor class that builds a fresh "docker create"/"docker run" command per invocation """

    def prepare_execution(self, func_arn, env_vars, runtime, command, handler, lambda_cwd):
        """
        Build the docker command line for a single invocation.

        :param func_arn: The ARN of the lambda function (not used here).
        :param env_vars: dict of environment variables for the invocation.
        :param runtime: Lambda runtime name (used as the lambci/lambda image tag).
        :param command: custom command; if given, the image entry point is cleared,
            otherwise the quoted handler is passed as the command.
        :param handler: the Lambda handler.
        :param lambda_cwd: The local directory containing the code for the lambda function.
        :return: shell command string.
        """
        entrypoint = ''
        if command:
            entrypoint = ' --entrypoint ""'
        else:
            command = '"%s"' % handler

        # -e KEY="$KEY": values are expanded by the shell from the process environment
        env_vars_string = ' '.join(['-e {}="${}"'.format(k, k) for k in env_vars])

        if config.LAMBDA_REMOTE_DOCKER:
            # create the container, copy the code into it, then start it attached
            cmd = (
                'CONTAINER_ID="$(docker create'
                ' %s'
                ' %s'
                ' "lambci/lambda:%s" %s'
                ')";'
                'docker cp "%s/." "$CONTAINER_ID:/var/task";'
                'docker start -a "$CONTAINER_ID";'
            ) % (entrypoint, env_vars_string, runtime, command, lambda_cwd)
        else:
            # mount the code directory as a volume instead of copying it
            lambda_cwd_on_host = self.get_host_path_for_path_in_docker(lambda_cwd)
            cmd = (
                'docker run'
                '%s -v "%s":/var/task'
                ' %s'
                ' --rm'
                ' "lambci/lambda:%s" %s'
            ) % (entrypoint, lambda_cwd_on_host, env_vars_string, runtime, command)
        return cmd

    def get_host_path_for_path_in_docker(self, path):
        """Rewrite a path under config.TMP_FOLDER to the same path under config.HOST_TMP_FOLDER."""
        return re.sub(r'^%s/(.*)$' % config.TMP_FOLDER,
                      r'%s/\1' % config.HOST_TMP_FOLDER, path)


class LambdaExecutorLocal(LambdaExecutor):
    """ Executor class that calls the Lambda function locally, without Docker """

    def execute(self, func_arn, func_details, event, context=None, version=None, asynchronous=False):
        """
        Invoke the Lambda handler obtained from func_details.function(version).

        :param func_arn: The ARN of the lambda function (not used here).
        :param func_details: object providing cwd, envvars and function(version).
        :param event: the event passed to the handler.
        :param context: the context passed to the handler.
        :param version: forwarded to func_details.function().
        :param asynchronous: not used by this method.
        :return: tuple (result, log_output); log_output is always ''.
        """
        lambda_cwd = func_details.cwd
        environment = func_details.envvars.copy()

        # execute the Lambda function in a forked sub-process, sync result via queue
        queue = Queue()

        lambda_function = func_details.function(version)

        def do_execute():
            # now we're executing in the child process, safe to change CWD and ENV
            if lambda_cwd:
                os.chdir(lambda_cwd)
            if environment:
                os.environ.update(environment)
            result = lambda_function(event, context)
            queue.put(result)

        process = Process(target=do_execute)
        # NOTE(review): Process.run() invokes the target in the *current* process
        # (only start() spawns a child), so the chdir/environ changes above affect the
        # caller, contrary to the comments. Left unchanged -- confirm intent before
        # switching to start()/join().
        process.run()
        result = queue.get()
        # TODO capture log output during local execution?
        log_output = ''
        return result, log_output

    def execute_java_lambda(self, event, context, handler, main_file):
        """
        Run a Java Lambda through the executor JAR in a local "java" process.

        :param event: the event dict; written as JSON to a temporary event file.
        :param context: not used by this method.
        :param handler: handler string; the part before '::' is used as the class name.
        :param main_file: path appended to the classpath after the executor JAR.
        :return: tuple (result, log_output).
        """
        event_file = EVENT_FILE_PATTERN.replace('*', short_uid())
        save_file(event_file, json.dumps(event))
        TMP_FILES.append(event_file)
        class_name = handler.split('::')[0]
        classpath = '%s:%s' % (LAMBDA_EXECUTOR_JAR, main_file)
        cmd = 'java -cp %s %s %s %s' % (classpath, LAMBDA_EXECUTOR_CLASS, class_name, event_file)
        asynchronous = False
        # flip asynchronous flag depending on origin
        if 'Records' in event:
            # TODO: add more event supporting asynchronous lambda execution
            if 'Sns' in event['Records'][0]:
                asynchronous = True
            if 'dynamodb' in event['Records'][0]:
                asynchronous = True
        result, log_output = self.run_lambda_executor(cmd, asynchronous=asynchronous)
        LOG.debug('Lambda result / log output:\n%s\n> %s' %
                  (result.strip(), log_output.strip().replace('\n', '\n> ')))
        return result, log_output


# --------------
# GLOBAL STATE
# --------------

EXECUTOR_LOCAL = LambdaExecutorLocal()
EXECUTOR_CONTAINERS_SEPARATE = LambdaExecutorSeparateContainers()
EXECUTOR_CONTAINERS_REUSE = LambdaExecutorReuseContainers()
DEFAULT_EXECUTOR = EXECUTOR_LOCAL
# the keys of AVAILABLE_EXECUTORS map to the LAMBDA_EXECUTOR config variable
AVAILABLE_EXECUTORS = {
    'local': EXECUTOR_LOCAL,
    'docker': EXECUTOR_CONTAINERS_SEPARATE,
    'docker-reuse': EXECUTOR_CONTAINERS_REUSE,
    # ... (the excerpt is truncated here; any further content is not shown)
}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful