How to use the docker_image_for_lambda method in LocalStack

Best Python code snippet using localstack_python

lambda_executors.py

Source:lambda_executors.py Github

copy

Full Screen

...706 func_arn = lambda_function.arn()707 container_name = self.get_container_name(func_arn)708 status = self.get_docker_container_status(func_arn)709 LOG.debug('Priming Docker container (status "%s"): %s' % (status, container_name))710 docker_image = Util.docker_image_for_lambda(lambda_function)711 # Container is not running or doesn't exist.712 if status < 1:713 # Make sure the container does not exist in any form/state.714 self.destroy_docker_container(func_arn)715 # get container startup command and run it716 LOG.debug("Creating container: %s" % container_name)717 self.create_container(lambda_function, env_vars, lambda_cwd, docker_flags)718 if config.LAMBDA_REMOTE_DOCKER:719 LOG.debug(720 'Copying files to container "%s" from "%s".' % (container_name, lambda_cwd)721 )722 DOCKER_CLIENT.copy_into_container(723 container_name, "%s/." % lambda_cwd, DOCKER_TASK_FOLDER724 )725 LOG.debug("Starting docker-reuse Lambda container: %s", container_name)726 DOCKER_CLIENT.start_container(container_name)727 # give the container some time to start up728 time.sleep(1)729 container_network = self.get_docker_container_network(func_arn)730 entry_point = DOCKER_CLIENT.get_image_entrypoint(docker_image)731 LOG.debug(732 'Using entrypoint "%s" for container "%s" on network "%s".'733 % (entry_point, container_name, container_network)734 )735 return ContainerInfo(container_name, entry_point)736 def create_container(737 self,738 lambda_function: LambdaFunction,739 env_vars: Dict,740 lambda_cwd: str,741 docker_flags: str = None,742 ):743 docker_image = Util.docker_image_for_lambda(lambda_function)744 container_name = self.get_container_name(lambda_function.arn())745 # make sure we set LOCALSTACK_HOSTNAME746 Util.inject_endpoints_into_env(env_vars)747 # make sure AWS_LAMBDA_EVENT_BODY is not set (otherwise causes issues with "docker exec ..." 
above)748 env_vars.pop("AWS_LAMBDA_EVENT_BODY", None)749 network = config.LAMBDA_DOCKER_NETWORK750 additional_flags = docker_flags751 dns = config.LAMBDA_DOCKER_DNS752 mount_volumes = not config.LAMBDA_REMOTE_DOCKER753 lambda_cwd_on_host = Util.get_host_path_for_path_in_docker(lambda_cwd)754 if ":" in lambda_cwd and "\\" in lambda_cwd:755 lambda_cwd_on_host = Util.format_windows_path(lambda_cwd_on_host)756 mount_volumes = [(lambda_cwd_on_host, DOCKER_TASK_FOLDER)] if mount_volumes else None757 if os.environ.get("HOSTNAME"):758 env_vars["HOSTNAME"] = os.environ.get("HOSTNAME")759 env_vars["EDGE_PORT"] = config.EDGE_PORT760 LOG.debug(761 "Creating docker-reuse Lambda container %s from image %s", container_name, docker_image762 )763 return DOCKER_CLIENT.create_container(764 image_name=docker_image,765 remove=True,766 interactive=True,767 name=container_name,768 entrypoint="/bin/bash",769 network=network,770 env_vars=env_vars,771 dns=dns,772 mount_volumes=mount_volumes,773 additional_flags=additional_flags,774 )775 def destroy_docker_container(self, func_arn):776 """777 Stops and/or removes a docker container for a specific lambda function ARN.778 :param func_arn: The ARN of the lambda function.779 :return: None780 """781 with self.docker_container_lock:782 status = self.get_docker_container_status(func_arn)783 # Get the container name and id.784 container_name = self.get_container_name(func_arn)785 if status == 1:786 LOG.debug("Stopping container: %s" % container_name)787 DOCKER_CLIENT.stop_container(container_name)788 status = self.get_docker_container_status(func_arn)789 if status == -1:790 LOG.debug("Removing container: %s" % container_name)791 rm_docker_container(container_name, safe=True)792 # clean up function invoke times, as some init logic depends on this793 self.function_invoke_times.pop(func_arn, None)794 def get_all_container_names(self):795 """796 Returns a list of container names for lambda containers.797 :return: A String[] localstack docker container 
names for each function.798 """799 with self.docker_container_lock:800 LOG.debug("Getting all lambda containers names.")801 list_result = DOCKER_CLIENT.list_containers(802 filter=f"name={self.get_container_prefix()}*"803 )804 container_names = list(map(lambda container: container["name"], list_result))805 return container_names806 def destroy_existing_docker_containers(self):807 """808 Stops and/or removes all lambda docker containers for localstack.809 :return: None810 """811 with self.docker_container_lock:812 container_names = self.get_all_container_names()813 LOG.debug("Removing %d containers." % len(container_names))814 for container_name in container_names:815 DOCKER_CLIENT.remove_container(container_name)816 def get_docker_container_status(self, func_arn):817 """818 Determine the status of a docker container.819 :param func_arn: The ARN of the lambda function.820 :return: 1 If the container is running,821 -1 if the container exists but is not running822 0 if the container does not exist.823 """824 with self.docker_container_lock:825 # Get the container name and id.826 container_name = self.get_container_name(func_arn)827 container_status = DOCKER_CLIENT.get_container_status(container_name)828 return container_status.value829 def get_docker_container_network(self, func_arn):830 """831 Determine the network of a docker container.832 :param func_arn: The ARN of the lambda function.833 :return: name of the container network834 """835 with self.docker_container_lock:836 status = self.get_docker_container_status(func_arn)837 # container does not exist838 if status == 0:839 return ""840 # Get the container name.841 container_name = self.get_container_name(func_arn)842 container_network = DOCKER_CLIENT.get_network(container_name)843 return container_network844 def idle_container_destroyer(self):845 """846 Iterates though all the lambda containers and destroys any container that has847 been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848 :return: None849 
"""850 LOG.debug("Checking if there are idle containers ...")851 current_time = int(time.time() * 1000)852 for func_arn, last_run_time in dict(self.function_invoke_times).items():853 duration = current_time - last_run_time854 # not enough idle time has passed855 if duration < MAX_CONTAINER_IDLE_TIME_MS:856 continue857 # container has been idle, destroy it.858 self.destroy_docker_container(func_arn)859 def start_idle_container_destroyer_interval(self):860 """861 Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862 Thus checking for idle containers and destroying them.863 :return: None864 """865 self.idle_container_destroyer()866 threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867 def get_container_prefix(self) -> str:868 """869 Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870 :return: Lambda container name prefix871 """872 return f"{bootstrap.get_main_container_name()}_lambda_"873 def get_container_name(self, func_arn):874 """875 Given a function ARN, returns a valid docker container name.876 :param func_arn: The ARN of the lambda function.877 :return: A docker compatible name for the arn.878 """879 return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class LambdaExecutorSeparateContainers(LambdaExecutorContainers):881 def __init__(self):882 super(LambdaExecutorSeparateContainers, self).__init__()883 self.max_port = LAMBDA_API_UNIQUE_PORTS884 self.port_offset = LAMBDA_API_PORT_OFFSET885 def prepare_event(self, environment: Dict, event_body: str) -> bytes:886 # Tell Lambci to use STDIN for the event887 environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888 return event_body.encode()889 def execute_in_container(890 self,891 lambda_function: LambdaFunction,892 inv_context: InvocationContext,893 stdin=None,894 background=False,895 ) -> Tuple[bytes, bytes]:896 lambda_cwd = lambda_function.cwd897 env_vars = inv_context.environment898 
entrypoint = None899 if inv_context.lambda_command:900 entrypoint = ""901 elif inv_context.handler:902 inv_context.lambda_command = inv_context.handler903 # add Docker Lambda env vars904 network = config.LAMBDA_DOCKER_NETWORK or None905 if network == "host":906 port = get_free_tcp_port()907 env_vars["DOCKER_LAMBDA_API_PORT"] = port908 env_vars["DOCKER_LAMBDA_RUNTIME_PORT"] = port909 additional_flags = inv_context.docker_flags or ""910 dns = config.LAMBDA_DOCKER_DNS911 docker_java_ports = PortMappings()912 if Util.debug_java_port:913 docker_java_ports.add(Util.debug_java_port)914 docker_image = Util.docker_image_for_lambda(lambda_function)915 if config.LAMBDA_REMOTE_DOCKER:916 container_id = DOCKER_CLIENT.create_container(917 image_name=docker_image,918 interactive=True,919 entrypoint=entrypoint,920 remove=True,921 network=network,922 env_vars=env_vars,923 dns=dns,924 additional_flags=additional_flags,925 ports=docker_java_ports,926 command=inv_context.lambda_command,927 )928 DOCKER_CLIENT.copy_into_container(container_id, f"{lambda_cwd}/.", DOCKER_TASK_FOLDER)929 return DOCKER_CLIENT.start_container(930 container_id, interactive=not background, attach=not background, stdin=stdin931 )932 else:933 mount_volumes = None934 if lambda_cwd:935 mount_volumes = [936 (Util.get_host_path_for_path_in_docker(lambda_cwd), DOCKER_TASK_FOLDER)937 ]938 return DOCKER_CLIENT.run_container(939 image_name=docker_image,940 interactive=True,941 detach=background,942 entrypoint=entrypoint,943 remove=True,944 network=network,945 env_vars=env_vars,946 dns=dns,947 additional_flags=additional_flags,948 command=inv_context.lambda_command,949 mount_volumes=mount_volumes,950 stdin=stdin,951 )952class LambdaExecutorLocal(LambdaExecutor):953 def _execute_in_custom_runtime(954 self, cmd: Union[str, List[str]], lambda_function: LambdaFunction = None955 ) -> InvocationResult:956 """957 Generic run function for executing lambdas in custom runtimes.958 :param cmd: the command to execute959 :param 
lambda_function: function details960 :return: the InvocationResult961 """962 env_vars = lambda_function and lambda_function.envvars963 kwargs = {"stdin": True, "inherit_env": True, "asynchronous": True, "env_vars": env_vars}964 process = run(cmd, stderr=subprocess.PIPE, outfile=subprocess.PIPE, **kwargs)965 result, log_output = process.communicate()966 try:967 result = to_str(result).strip()968 except Exception:969 pass970 log_output = to_str(log_output).strip()971 return_code = process.returncode972 # Note: The user's code may have been logging to stderr, in which case the logs973 # will be part of the "result" variable here. Hence, make sure that we extract974 # only the *last* line of "result" and consider anything above that as log output.975 # TODO: not sure if this code is needed/used976 if isinstance(result, str) and "\n" in result:977 lines = result.split("\n")978 idx = last_index_of(979 lines, lambda line: line and not line.startswith(INTERNAL_LOG_PREFIX)980 )981 if idx >= 0:982 result = lines[idx]983 additional_logs = "\n".join(lines[:idx] + lines[idx + 1 :])984 log_output += "\n%s" % additional_logs985 log_formatted = log_output.strip().replace("\n", "\n> ")986 func_arn = lambda_function and lambda_function.arn()987 LOG.debug(988 "Lambda %s result / log output:\n%s\n> %s" % (func_arn, result.strip(), log_formatted)989 )990 # store log output - TODO get live logs from `process` above?991 # store_lambda_logs(lambda_function, log_output)992 if return_code != 0:993 raise InvocationException(994 "Lambda process returned error status code: %s. Result: %s. 
Output:\n%s"995 % (return_code, result, log_output),996 log_output,997 result,998 )999 invocation_result = InvocationResult(result, log_output=log_output)1000 return invocation_result1001 def _execute(1002 self, lambda_function: LambdaFunction, inv_context: InvocationContext1003 ) -> InvocationResult:1004 # apply plugin patches to prepare invocation context1005 result = self.apply_plugin_patches(inv_context)1006 if isinstance(result, InvocationResult):1007 return result1008 lambda_cwd = lambda_function.cwd1009 environment = self._prepare_environment(lambda_function)1010 if lambda_function.timeout:1011 environment["AWS_LAMBDA_FUNCTION_TIMEOUT"] = str(lambda_function.timeout)1012 context = inv_context.context1013 if context:1014 environment["AWS_LAMBDA_FUNCTION_NAME"] = context.function_name1015 environment["AWS_LAMBDA_FUNCTION_VERSION"] = context.function_version1016 environment["AWS_LAMBDA_FUNCTION_INVOKED_ARN"] = context.invoked_function_arn1017 environment["AWS_LAMBDA_FUNCTION_MEMORY_SIZE"] = str(context.memory_limit_in_mb)1018 # execute the Lambda function in a forked sub-process, sync result via queue1019 queue = Queue()1020 lambda_function_callable = lambda_function.function(inv_context.function_version)1021 def do_execute():1022 # now we're executing in the child process, safe to change CWD and ENV1023 result = None1024 try:1025 if lambda_cwd:1026 os.chdir(lambda_cwd)1027 sys.path.insert(0, "")1028 if environment:1029 os.environ.update(environment)1030 # set default env variables required for most Lambda handlers1031 self.set_default_env_variables()1032 # run the actual handler function1033 result = lambda_function_callable(inv_context.event, context)1034 except Exception as e:1035 result = str(e)1036 sys.stderr.write("%s %s" % (e, traceback.format_exc()))1037 raise1038 finally:1039 queue.put(result)1040 process = Process(target=do_execute)1041 start_time = now(millis=True)1042 error = None1043 with CaptureOutput() as c:1044 try:1045 process.run()1046 except 
Exception as e:1047 error = e1048 result = queue.get()1049 end_time = now(millis=True)1050 # Make sure to keep the log line below, to ensure the log stream gets created1051 request_id = long_uid()1052 log_output = 'START %s: Lambda %s started via "local" executor ...' % (1053 request_id,1054 lambda_function.arn(),1055 )1056 # TODO: Interweaving stdout/stderr currently not supported1057 for stream in (c.stdout(), c.stderr()):1058 if stream:1059 log_output += ("\n" if log_output else "") + stream1060 if isinstance(result, InvocationResult) and result.log_output:1061 log_output += "\n" + result.log_output1062 log_output += "\nEND RequestId: %s" % request_id1063 log_output += "\nREPORT RequestId: %s Duration: %s ms" % (1064 request_id,1065 int((end_time - start_time) * 1000),1066 )1067 # store logs to CloudWatch1068 store_lambda_logs(lambda_function, log_output)1069 result = result.result if isinstance(result, InvocationResult) else result1070 if error:1071 LOG.info(1072 'Error executing Lambda "%s": %s %s',1073 lambda_function.arn(),1074 error,1075 "".join(traceback.format_tb(error.__traceback__)),1076 )1077 raise InvocationException(result, log_output)1078 # construct final invocation result1079 invocation_result = InvocationResult(result, log_output=log_output)1080 # run plugins post-processing logic1081 invocation_result = self.process_result_via_plugins(inv_context, invocation_result)1082 return invocation_result1083 def provide_file_to_lambda(self, local_file: str, inv_context: InvocationContext) -> str:1084 # This is a no-op for local executors - simply return the given local file path1085 return local_file1086 def execute_java_lambda(1087 self, event, context, main_file, lambda_function: LambdaFunction = None1088 ) -> InvocationResult:1089 lambda_function.envvars = lambda_function.envvars or {}1090 java_opts = config.LAMBDA_JAVA_OPTS or ""1091 handler = lambda_function.handler1092 lambda_function.envvars[LAMBDA_HANDLER_ENV_VAR_NAME] = handler1093 event_file = 
EVENT_FILE_PATTERN.replace("*", short_uid())1094 save_file(event_file, json.dumps(json_safe(event)))1095 TMP_FILES.append(event_file)1096 classpath = "%s:%s:%s" % (1097 main_file,1098 Util.get_java_classpath(main_file),1099 LAMBDA_EXECUTOR_JAR,1100 )1101 cmd = "java %s -cp %s %s %s" % (1102 java_opts,1103 classpath,1104 LAMBDA_EXECUTOR_CLASS,1105 event_file,1106 )1107 # apply plugin patches1108 inv_context = InvocationContext(1109 lambda_function, event, environment=lambda_function.envvars, lambda_command=cmd1110 )1111 result = self.apply_plugin_patches(inv_context)1112 if isinstance(result, InvocationResult):1113 return result1114 cmd = inv_context.lambda_command1115 LOG.info(cmd)1116 # execute Lambda and get invocation result1117 invocation_result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1118 return invocation_result1119 def execute_javascript_lambda(1120 self, event, context, main_file, lambda_function: LambdaFunction = None1121 ):1122 handler = lambda_function.handler1123 function = handler.split(".")[-1]1124 event_json_string = "%s" % (json.dumps(json_safe(event)) if event else "{}")1125 context_json_string = "%s" % (json.dumps(context.__dict__) if context else "{}")1126 cmd = [1127 "node",1128 "-e",1129 'require("%s").%s(%s,%s).then(r => process.stdout.write(JSON.stringify(r)))'1130 % (1131 main_file,1132 function,1133 event_json_string,1134 context_json_string,1135 ),1136 ]1137 LOG.info(cmd)1138 result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1139 return result1140 @staticmethod1141 def set_default_env_variables():1142 # set default env variables required for most Lambda handlers1143 default_env_vars = {"AWS_DEFAULT_REGION": aws_stack.get_region()}1144 env_vars_before = {var: os.environ.get(var) for var in default_env_vars}1145 os.environ.update({k: v for k, v in default_env_vars.items() if not env_vars_before.get(k)})1146 return env_vars_before1147 @staticmethod1148 def 
reset_default_env_variables(env_vars_before):1149 for env_name, env_value in env_vars_before.items():1150 env_value_before = env_vars_before.get(env_name)1151 os.environ[env_name] = env_value_before or ""1152 if env_value_before is None:1153 os.environ.pop(env_name, None)1154 def execute_go_lambda(self, event, context, main_file, lambda_function: LambdaFunction = None):1155 if lambda_function:1156 lambda_function.envvars["AWS_LAMBDA_FUNCTION_HANDLER"] = main_file1157 lambda_function.envvars["AWS_LAMBDA_EVENT_BODY"] = json.dumps(json_safe(event))1158 else:1159 LOG.warning("Unable to get function details for local execution of Golang Lambda")1160 cmd = GO_LAMBDA_RUNTIME1161 LOG.info(cmd)1162 result = self._execute_in_custom_runtime(cmd, lambda_function=lambda_function)1163 return result1164class Util:1165 debug_java_port = False1166 @classmethod1167 def get_java_opts(cls):1168 opts = config.LAMBDA_JAVA_OPTS or ""1169 # Replace _debug_port_ with a random free port1170 if "_debug_port_" in opts:1171 if not cls.debug_java_port:1172 cls.debug_java_port = get_free_tcp_port()1173 opts = opts.replace("_debug_port_", ("%s" % cls.debug_java_port))1174 else:1175 # Parse the debug port from opts1176 m = re.match(".*address=(.+:)?(\\d+).*", opts)1177 if m is not None:1178 cls.debug_java_port = m.groups()[1]1179 return opts1180 @classmethod1181 def get_host_path_for_path_in_docker(cls, path):1182 return re.sub(r"^%s/(.*)$" % config.TMP_FOLDER, r"%s/\1" % config.HOST_TMP_FOLDER, path)1183 @classmethod1184 def format_windows_path(cls, path):1185 temp = path.replace(":", "").replace("\\", "/")1186 if len(temp) >= 1 and temp[:1] != "/":1187 temp = "/" + temp1188 temp = "%s%s" % (config.WINDOWS_DOCKER_MOUNT_PREFIX, temp)1189 return temp1190 @classmethod1191 def docker_image_for_lambda(cls, lambda_function: LambdaFunction):1192 runtime = lambda_function.runtime or ""1193 if lambda_function.code.get("ImageUri"):1194 LOG.warning(1195 "ImageUri is set: Using Lambda container images is 
only supported in LocalStack Pro"1196 )1197 docker_tag = runtime1198 docker_image = config.LAMBDA_CONTAINER_REGISTRY1199 # TODO: remove prefix once execution issues are fixed with dotnetcore/python lambdas1200 # See https://github.com/lambci/docker-lambda/pull/2181201 lambdas_to_add_prefix = [1202 "dotnetcore2.0",1203 "dotnetcore2.1",1204 "python3.6",1205 "python3.7",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful