How to use the get_container_prefix method in localstack

Best Python code snippet using localstack_python

lambda_executors.py

Source:lambda_executors.py Github

copy

Full Screen

...
798          """
799          with self.docker_container_lock:
800              LOG.debug("Getting all lambda containers names.")
801              list_result = DOCKER_CLIENT.list_containers(
802                  filter=f"name={self.get_container_prefix()}*"
803              )
804              container_names = list(map(lambda container: container["name"], list_result))
805              return container_names
806      def destroy_existing_docker_containers(self):
807          """
808          Stops and/or removes all lambda docker containers for localstack.
809          :return: None
810          """
811          with self.docker_container_lock:
812              container_names = self.get_all_container_names()
813              LOG.debug("Removing %d containers." % len(container_names))
814              for container_name in container_names:
815                  DOCKER_CLIENT.remove_container(container_name)
816      def get_docker_container_status(self, func_arn):
817          """
818          Determine the status of a docker container.
819          :param func_arn: The ARN of the lambda function.
820          :return: 1 If the container is running,
821          -1 if the container exists but is not running
822          0 if the container does not exist.
823          """
824          with self.docker_container_lock:
825              # Get the container name and id.
826              container_name = self.get_container_name(func_arn)
827              container_status = DOCKER_CLIENT.get_container_status(container_name)
828              return container_status.value
829      def get_docker_container_network(self, func_arn):
830          """
831          Determine the network of a docker container.
832          :param func_arn: The ARN of the lambda function.
833          :return: name of the container network
834          """
835          with self.docker_container_lock:
836              status = self.get_docker_container_status(func_arn)
837              # container does not exist
838              if status == 0:
839                  return ""
840              # Get the container name.
841              container_name = self.get_container_name(func_arn)
842              container_network = DOCKER_CLIENT.get_network(container_name)
843              return container_network
844      def idle_container_destroyer(self):
845          """
846          Iterates though all the lambda containers and destroys any container that has
847          been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.
848          :return: None
849          """
850          LOG.debug("Checking if there are idle containers ...")
851          current_time = int(time.time() * 1000)
852          for func_arn, last_run_time in dict(self.function_invoke_times).items():
853              duration = current_time - last_run_time
854              # not enough idle time has passed
855              if duration < MAX_CONTAINER_IDLE_TIME_MS:
856                  continue
857              # container has been idle, destroy it.
858              self.destroy_docker_container(func_arn)
859      def start_idle_container_destroyer_interval(self):
860          """
861          Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.
862          Thus checking for idle containers and destroying them.
863          :return: None
864          """
865          self.idle_container_destroyer()
866          threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()
867      def get_container_prefix(self) -> str:
868          """
869          Returns the prefix of all docker-reuse lambda containers for this LocalStack instance
870          :return: Lambda container name prefix
871          """
872          return f"{bootstrap.get_main_container_name()}_lambda_"
873      def get_container_name(self, func_arn):
874          """
875          Given a function ARN, returns a valid docker container name.
876          :param func_arn: The ARN of the lambda function.
877          :return: A docker compatible name for the arn.
878          """
879          return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)
880  class LambdaExecutorSeparateContainers(LambdaExecutorContainers):
881      def __init__(self):
882          super(LambdaExecutorSeparateContainers, self).__init__()
883          self.max_port = LAMBDA_API_UNIQUE_PORTS
884          self.port_offset = LAMBDA_API_PORT_OFFSET
885      def prepare_event(self, environment: Dict, event_body: str) -> bytes:
886          # Tell Lambci to use STDIN for the event
887          environment["DOCKER_LAMBDA_USE_STDIN"] = "1"
888          return event_body.encode()
889      def execute_in_container(
890          self,
891          lambda_function: LambdaFunction,
892          inv_context: InvocationContext,
893          stdin=None,
...

Full Screen

Full Screen

test_dispatcher_method.py

Source:test_dispatcher_method.py Github

copy

Full Screen

...
71          req = Request.blank('/both/v1.0/AUTH_test')
72          self.assertEqual(self.app.app._get_merged_path(req), ('AUTH_test', None, None, None))
73          req = Request.blank('/both')
74          self.assertEqual(self.app.app._get_merged_path(req), (None, None, None, None))
75      def test_get_container_prefix(self):
76          """ get_container_prefix """
77          container = 'hoge:TEST0'
78          self.assertEqual(self.app.app._get_container_prefix(container), 'hoge')
79          container = 'TEST0'
80          self.assertEqual(self.app.app._get_container_prefix(container), None)
81      def test_get_copy_from(self):
82          """ get_copy_from """
83          req = Request.blank('/both/v1.0/AUTH_test/gere:TEST0/copied_test0.txt')
84          req.headers['x-copy-from'] = '/hoge:TEST0/test0.txt'
85          self.assertEqual(self.app.app._get_copy_from(req), ('hoge', 'TEST0', 'test0.txt'))
86      def test_merge_headers(self):
87          """ merge_headers """
88          resp0 = Response()
89          resp0.headers['x-storage-url'] = 'http://192.168.2.0:8080/v1.0/AUTH_test'
90          resp0.headers['x-auth-token'] = 'dummy0'
91          resp0.headers['x-account-bytes-used'] = '1024'
92          resp0.headers['x-account-container-count'] = '10'
93          resp0.headers['x-account-object-count'] = '5'
94          resp1 = Response()
...

Full Screen

Full Screen

structures.py

Source:structures.py Github

copy

Full Screen

...
 85                  base_dir_mount=self.base_dir_mount
 86              )
 87          )
 88      @classmethod
 89      def get_container_prefix(cls):
 90          return 'agent_{name}_{flavor}'.format(name=cls.name, flavor=cls.flavor)
 91      @classmethod
 92      def get_container_name(cls, instance_name=None, location=None, direct=False):
 93          if location and direct:
 94              return '{}_{}'.format(cls.get_container_prefix(), basepath(location).lower())
 95          else:
 96              return '{}_{}'.format(cls.get_container_prefix(), instance_name or DEFAULT_NAME)
 97      def format_compose_file(self, compose_file):
 98          return compose_file.format(
 99              image=self.image,
100              api_key=self.api_key,
101              container_name=self.container_name,
102              conf_mount=self.conf_mount,
103              check_mount=self.check_mount,
104              base_mount=self.base_mount,
105              **self.options
106          )
107  class VagrantCheck(Check):
108      def __init__(self, d, api_key, conf_path, agent_version, check_dirs=None,
109                   instance_name=None, no_instance=False, direct=False, **options):
110          super().__init__(
...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful