Best Python code snippet using localstack_python
lambda_executors.py
Source:lambda_executors.py  
...798        """799        with self.docker_container_lock:800            LOG.debug("Getting all lambda containers names.")801            list_result = DOCKER_CLIENT.list_containers(802                filter=f"name={self.get_container_prefix()}*"803            )804            container_names = list(map(lambda container: container["name"], list_result))805            return container_names806    def destroy_existing_docker_containers(self):807        """808        Stops and/or removes all lambda docker containers for localstack.809        :return: None810        """811        with self.docker_container_lock:812            container_names = self.get_all_container_names()813            LOG.debug("Removing %d containers." % len(container_names))814            for container_name in container_names:815                DOCKER_CLIENT.remove_container(container_name)816    def get_docker_container_status(self, func_arn):817        """818        Determine the status of a docker container.819        :param func_arn: The ARN of the lambda function.820        :return: 1 If the container is running,821        -1 if the container exists but is not running822        0 if the container does not exist.823        """824        with self.docker_container_lock:825            # Get the container name and id.826            container_name = self.get_container_name(func_arn)827            container_status = DOCKER_CLIENT.get_container_status(container_name)828            return container_status.value829    def get_docker_container_network(self, func_arn):830        """831        Determine the network of a docker container.832        :param func_arn: The ARN of the lambda function.833        :return: name of the container network834        """835        with self.docker_container_lock:836            status = self.get_docker_container_status(func_arn)837            # container does not exist838            if status == 0:839                return ""840            # Get the container name.841  
          container_name = self.get_container_name(func_arn)842            container_network = DOCKER_CLIENT.get_network(container_name)843            return container_network844    def idle_container_destroyer(self):845        """846        Iterates though all the lambda containers and destroys any container that has847        been inactive for longer than MAX_CONTAINER_IDLE_TIME_MS.848        :return: None849        """850        LOG.debug("Checking if there are idle containers ...")851        current_time = int(time.time() * 1000)852        for func_arn, last_run_time in dict(self.function_invoke_times).items():853            duration = current_time - last_run_time854            # not enough idle time has passed855            if duration < MAX_CONTAINER_IDLE_TIME_MS:856                continue857            # container has been idle, destroy it.858            self.destroy_docker_container(func_arn)859    def start_idle_container_destroyer_interval(self):860        """861        Starts a repeating timer that triggers start_idle_container_destroyer_interval every 60 seconds.862        Thus checking for idle containers and destroying them.863        :return: None864        """865        self.idle_container_destroyer()866        threading.Timer(60.0, self.start_idle_container_destroyer_interval).start()867    def get_container_prefix(self) -> str:868        """869        Returns the prefix of all docker-reuse lambda containers for this LocalStack instance870        :return: Lambda container name prefix871        """872        return f"{bootstrap.get_main_container_name()}_lambda_"873    def get_container_name(self, func_arn):874        """875        Given a function ARN, returns a valid docker container name.876        :param func_arn: The ARN of the lambda function.877        :return: A docker compatible name for the arn.878        """879        return self.get_container_prefix() + re.sub(r"[^a-zA-Z0-9_.-]", "_", func_arn)880class 
LambdaExecutorSeparateContainers(LambdaExecutorContainers):881    def __init__(self):882        super(LambdaExecutorSeparateContainers, self).__init__()883        self.max_port = LAMBDA_API_UNIQUE_PORTS884        self.port_offset = LAMBDA_API_PORT_OFFSET885    def prepare_event(self, environment: Dict, event_body: str) -> bytes:886        # Tell Lambci to use STDIN for the event887        environment["DOCKER_LAMBDA_USE_STDIN"] = "1"888        return event_body.encode()889    def execute_in_container(890        self,891        lambda_function: LambdaFunction,892        inv_context: InvocationContext,893        stdin=None,...test_dispatcher_method.py
Source:test_dispatcher_method.py  
...71        req = Request.blank('/both/v1.0/AUTH_test')72        self.assertEqual(self.app.app._get_merged_path(req), ('AUTH_test', None, None, None))73        req = Request.blank('/both')74        self.assertEqual(self.app.app._get_merged_path(req), (None, None, None, None))75    def test_get_container_prefix(self):76        """ get_container_prefix """77        container = 'hoge:TEST0'78        self.assertEqual(self.app.app._get_container_prefix(container), 'hoge')79        container = 'TEST0'80        self.assertEqual(self.app.app._get_container_prefix(container), None)81    def test_get_copy_from(self):82        """ get_copy_from """83        req = Request.blank('/both/v1.0/AUTH_test/gere:TEST0/copied_test0.txt')84        req.headers['x-copy-from'] = '/hoge:TEST0/test0.txt'85        self.assertEqual(self.app.app._get_copy_from(req), ('hoge', 'TEST0', 'test0.txt'))86    def test_merge_headers(self):87        """ merge_headers """88        resp0 = Response()89        resp0.headers['x-storage-url'] = 'http://192.168.2.0:8080/v1.0/AUTH_test'90        resp0.headers['x-auth-token'] = 'dummy0'91        resp0.headers['x-account-bytes-used'] = '1024'92        resp0.headers['x-account-container-count'] = '10'93        resp0.headers['x-account-object-count'] = '5'94        resp1 = Response()...structures.py
Source:structures.py  
...85                base_dir_mount=self.base_dir_mount86            )87        )88    @classmethod89    def get_container_prefix(cls):90        return 'agent_{name}_{flavor}'.format(name=cls.name, flavor=cls.flavor)91    @classmethod92    def get_container_name(cls, instance_name=None, location=None, direct=False):93        if location and direct:94            return '{}_{}'.format(cls.get_container_prefix(), basepath(location).lower())95        else:96            return '{}_{}'.format(cls.get_container_prefix(), instance_name or DEFAULT_NAME)97    def format_compose_file(self, compose_file):98        return compose_file.format(99            image=self.image,100            api_key=self.api_key,101            container_name=self.container_name,102            conf_mount=self.conf_mount,103            check_mount=self.check_mount,104            base_mount=self.base_mount,105            **self.options106        )107class VagrantCheck(Check):108    def __init__(self, d, api_key, conf_path, agent_version, check_dirs=None,109                 instance_name=None, no_instance=False, direct=False, **options):110        super().__init__(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
