Best Python code snippet using avocado_python
abstract_spawn_test_environment.py
Source:abstract_spawn_test_environment.py  
from pathlib import Path
from typing import Generator, Tuple, Optional

import luigi

from exasol_integration_test_docker_environment.abstract_method_exception import AbstractMethodException
from exasol_integration_test_docker_environment.lib.base.base_task import BaseTask
from exasol_integration_test_docker_environment.lib.base.docker_base_task import DockerBaseTask
from exasol_integration_test_docker_environment.lib.data.container_info import ContainerInfo
from exasol_integration_test_docker_environment.lib.data.database_credentials import DatabaseCredentialsParameter
from exasol_integration_test_docker_environment.lib.data.database_info import DatabaseInfo
from exasol_integration_test_docker_environment.lib.data.docker_network_info import DockerNetworkInfo
from exasol_integration_test_docker_environment.lib.data.docker_volume_info import DockerVolumeInfo
from exasol_integration_test_docker_environment.lib.data.environment_info import EnvironmentInfo
from exasol_integration_test_docker_environment.lib.test_environment.docker_container_copy import DockerContainerCopy
from exasol_integration_test_docker_environment.lib.test_environment.parameter.general_spawn_test_environment_parameter import \
    GeneralSpawnTestEnvironmentParameter
from exasol_integration_test_docker_environment.lib.test_environment.spawn_test_container import SpawnTestContainer

# Keys of the dependency dict handed to ``run_dependencies`` in
# ``_spawn_database_and_test_container``.
DATABASE = "database"
TEST_CONTAINER = "test_container"


class AbstractSpawnTestEnvironment(DockerBaseTask,
                                   GeneralSpawnTestEnvironmentParameter,
                                   DatabaseCredentialsParameter):
    """Abstract task that spawns a database and, optionally, a test container.

    The database start is retried up to ``self.max_start_attempts`` times.
    On success an ``EnvironmentInfo`` is built, written to the host cache
    directory and (when a test container exists) copied into that container,
    and finally published via ``self.return_object``.

    Subclasses provide the concrete pieces through the hooks that raise
    ``AbstractMethodException`` here: ``get_environment_type``,
    ``create_ssl_certificates``, ``create_network_task``,
    ``create_spawn_database_task`` and ``create_wait_for_database_task``.

    ``max_start_attempts``, ``create_certificates`` and
    ``test_container_content`` are read from ``self`` but not declared in
    this class -- presumably they come from the parameter mixins; verify.
    """

    environment_name = luigi.Parameter()  # type: str

    def __init__(self, *args, **kwargs):
        """Derive the test-container and network names from ``environment_name``."""
        super().__init__(*args, **kwargs)
        self.test_container_name = f"""test_container_{self.environment_name}"""
        self.network_name = f"""db_network_{self.environment_name}"""

    def get_environment_type(self):
        """Hook: return the environment type stored in ``EnvironmentInfo``.

        Raises:
            AbstractMethodException: always, unless overridden.
        """
        raise AbstractMethodException()

    def run_task(self):
        """Start the environment and return its ``EnvironmentInfo`` as the task result."""
        test_environment_info = yield from self._attempt_database_start()
        self.return_object(test_environment_info)

    def _attempt_database_start(self):
        """Try to start the database until it is ready or attempts run out.

        Generator; delegates to ``_start_database`` with ``yield from``.

        Returns:
            The ``EnvironmentInfo`` describing the started environment.

        Raises:
            Exception: if the database is still not ready after
                ``self.max_start_attempts`` attempts.
        """
        is_database_ready = False
        attempt = 0
        network_info = None
        database_info = None
        test_container_info = None
        while not is_database_ready and attempt < self.max_start_attempts:
            network_info, database_info, is_database_ready, test_container_info = \
                yield from self._start_database(attempt)
            attempt += 1
        # The loop only exits "not ready" once the attempts are exhausted, so
        # checking readiness alone is sufficient here.
        if not is_database_ready:
            raise Exception(f"Maximum attempts {attempt} to start the database reached.")
        test_environment_info = EnvironmentInfo(
            name=self.environment_name,
            env_type=self.get_environment_type(),
            database_info=database_info,
            test_container_info=test_container_info,
            network_info=network_info)
        self.create_test_environment_info_in_test_container_and_on_host(test_environment_info)
        return test_environment_info

    def create_test_environment_info_in_test_container(self, test_environment_info: EnvironmentInfo,
                                                       environment_variables: str,
                                                       environment_variables_with_export: str,
                                                       json: str):
        """Copy the environment description files into the test container at ``/``.

        Args:
            test_environment_info: Supplies the test container's name; its
                ``test_container_info`` must not be ``None``.
            environment_variables: Content of ``environment_info.conf``.
            environment_variables_with_export: Content of ``environment_info.sh``.
            json: Content of ``environment_info.json``.
        """
        test_container_name = test_environment_info.test_container_info.container_name
        with self._get_docker_client() as docker_client:
            test_container = docker_client.containers.get(test_container_name)
            self.logger.info(f"Create test environment info in test container '{test_container_name}' at '/'")
            container_copy = DockerContainerCopy(test_container)
            container_copy.add_string_to_file("environment_info.json", json)
            container_copy.add_string_to_file("environment_info.conf", environment_variables)
            container_copy.add_string_to_file("environment_info.sh", environment_variables_with_export)
            container_copy.copy("/")

    def create_test_environment_info_in_test_container_and_on_host(
            self, test_environment_info: EnvironmentInfo):
        """Write the environment description to the host cache and the test container.

        Three files are written below
        ``<cache path>/environments/<environment_name>``:
        ``environment_info.json``, ``environment_info.conf`` (``KEY=value``
        lines) and ``environment_info.sh`` (the same lines prefixed with
        ``export``). If a test container exists, the same contents are copied
        into it as well.
        """
        host_directory = Path(self.get_cache_path(),
                              f"environments/{self.environment_name}")
        host_directory.mkdir(exist_ok=True, parents=True)
        self.logger.info(f"Create test environment info on the host at '{host_directory}'")

        environment_info_json = test_environment_info.to_json()
        Path(host_directory, "environment_info.json").write_text(environment_info_json)

        test_container_info = test_environment_info.test_container_info
        test_container_name = \
            test_container_info.container_name if test_container_info is not None else ""
        environment_variables = \
            self.collect_environment_info_variables(test_container_name,
                                                    test_environment_info)
        Path(host_directory, "environment_info.conf").write_text(environment_variables)

        environment_variables_with_export = "".join(
            f"export {line}\n" for line in environment_variables.splitlines())
        Path(host_directory, "environment_info.sh").write_text(environment_variables_with_export)

        if test_container_info is not None:
            self.create_test_environment_info_in_test_container(test_environment_info,
                                                                environment_variables,
                                                                environment_variables_with_export,
                                                                environment_info_json)

    def collect_environment_info_variables(self, test_container_name: str, test_environment_info):
        """Render the environment description as newline-terminated ``KEY=value`` lines.

        Database-container lines are only emitted when the database has a
        ``container_info``; test-container lines only when a test container
        exists.

        Args:
            test_container_name: Value emitted as ``ENVIRONMENT_TEST_CONTAINER_NAME``.
            test_environment_info: The environment to describe.

        Returns:
            All lines concatenated into one string, each ending in ``\\n``.
        """
        database_info = test_environment_info.database_info
        lines = [
            f"ENVIRONMENT_NAME={test_environment_info.name}",
            f"ENVIRONMENT_TYPE={test_environment_info.type}",
            f"ENVIRONMENT_DATABASE_HOST={database_info.host}",
            f"ENVIRONMENT_DATABASE_DB_PORT={database_info.db_port}",
            f"ENVIRONMENT_DATABASE_BUCKETFS_PORT={database_info.bucketfs_port}",
        ]

        db_container_info = database_info.container_info
        if db_container_info is not None:
            db_network_aliases = " ".join(db_container_info.network_aliases)
            lines += [
                f"ENVIRONMENT_DATABASE_CONTAINER_NAME={db_container_info.container_name}",
                f"""ENVIRONMENT_DATABASE_CONTAINER_NETWORK_ALIASES="{db_network_aliases}\"""",
                f"ENVIRONMENT_DATABASE_CONTAINER_IP_ADDRESS={db_container_info.ip_address}",
                # "VOLUMNE" is misspelled, but the key is emitted at runtime and
                # consumers may depend on it, so it is kept unchanged.
                f"ENVIRONMENT_DATABASE_CONTAINER_VOLUMNE_NAME={db_container_info.volume_name}",
            ]
            with self._get_docker_client() as docker_client:
                db_container = docker_client.containers.get(db_container_info.container_name)
                # Refresh the cached attributes before reading network settings.
                db_container.reload()
                # NOTE(review): raises KeyError if the container is not attached
                # to the network named "bridge" -- confirm this always holds.
                default_bridge_ip_address = \
                    db_container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]
            lines.append(
                f"ENVIRONMENT_DATABASE_CONTAINER_DEFAULT_BRIDGE_IP_ADDRESS={default_bridge_ip_address}")

        test_container_info = test_environment_info.test_container_info
        if test_container_info is not None:
            test_network_aliases = " ".join(test_container_info.network_aliases)
            lines += [
                f"ENVIRONMENT_TEST_CONTAINER_NAME={test_container_name}",
                f"""ENVIRONMENT_TEST_CONTAINER_NETWORK_ALIASES="{test_network_aliases}\"""",
                f"ENVIRONMENT_TEST_CONTAINER_IP_ADDRESS={test_container_info.ip_address}",
            ]

        return "".join(f"{line}\n" for line in lines)

    def _start_database(self, attempt) \
            -> Generator[BaseTask, BaseTask, Tuple[DockerNetworkInfo, DatabaseInfo, bool, Optional[ContainerInfo]]]:
        """Run one start attempt: network, optional certificates, containers, readiness wait.

        Returns:
            ``(network_info, database_info, is_database_ready, test_container_info)``.
        """
        network_info = yield from self._create_network(attempt)
        ssl_volume_info = None
        if self.create_certificates:
            ssl_volume_info = yield from self._create_ssl_certificates()
        database_info, test_container_info = \
            yield from self._spawn_database_and_test_container(network_info, ssl_volume_info, attempt)
        is_database_ready = yield from self._wait_for_database(database_info, attempt)
        return network_info, database_info, is_database_ready, test_container_info

    def _create_ssl_certificates(self) -> Generator[BaseTask, BaseTask, DockerVolumeInfo]:
        """Run the task from ``create_ssl_certificates`` and return its result."""
        ssl_info_future = yield from self.run_dependencies(self.create_ssl_certificates())
        ssl_info = self.get_values_from_future(ssl_info_future)
        return ssl_info

    def create_ssl_certificates(self):
        """Hook: create the task that produces the SSL certificate volume.

        Raises:
            AbstractMethodException: always, unless overridden.
        """
        raise AbstractMethodException()

    def _create_network(self, attempt):
        """Run the task from ``create_network_task`` and return its result."""
        network_info_future = yield from self.run_dependencies(self.create_network_task(attempt))
        network_info = self.get_values_from_future(network_info_future)
        return network_info

    def create_network_task(self, attempt: int):
        """Hook: create the task that sets up the Docker network for ``attempt``.

        Raises:
            AbstractMethodException: always, unless overridden.
        """
        raise AbstractMethodException()

    def _spawn_database_and_test_container(self,
                                           network_info: DockerNetworkInfo,
                                           certificate_volume_info: Optional[DockerVolumeInfo],
                                           attempt: int) \
            -> Generator[BaseTask, BaseTask, Tuple[DatabaseInfo, Optional[ContainerInfo]]]:
        """Spawn the database and, if ``test_container_content`` is set, the test container.

        Both tasks are passed to ``run_dependencies`` in a single dict keyed
        by ``DATABASE`` / ``TEST_CONTAINER``.

        Returns:
            ``(database_info, test_container_info)``; the latter is ``None``
            when no test container was requested.
        """
        certificate_volume_name = \
            certificate_volume_info.volume_name if certificate_volume_info is not None else None
        with_test_container = self.test_container_content is not None

        dependencies_tasks = {
            DATABASE: self.create_spawn_database_task(network_info, certificate_volume_info, attempt)
        }
        if with_test_container:
            dependencies_tasks[TEST_CONTAINER] = \
                self.create_spawn_test_container_task(network_info, certificate_volume_name, attempt)

        database_and_test_container_info_future = yield from self.run_dependencies(dependencies_tasks)
        database_and_test_container_info = \
            self.get_values_from_futures(database_and_test_container_info_future)

        test_container_info = None
        if with_test_container:
            test_container_info = database_and_test_container_info[TEST_CONTAINER]
        database_info = database_and_test_container_info[DATABASE]
        return database_info, test_container_info

    def create_spawn_database_task(self,
                                   network_info: DockerNetworkInfo,
                                   certificate_volume_info: Optional[DockerVolumeInfo],
                                   attempt: int):
        """Hook: create the task that spawns the database.

        Raises:
            AbstractMethodException: always, unless overridden.
        """
        raise AbstractMethodException()

    def create_spawn_test_container_task(self, network_info: DockerNetworkInfo,
                                         certificate_volume_name: Optional[str], attempt: int):
        """Create the ``SpawnTestContainer`` child task for this environment.

        Args:
            network_info: Network the container is attached to.
            certificate_volume_name: Name of the certificate volume, or
                ``None`` when no certificates were created.
            attempt: Index of the current start attempt.
        """
        return self.create_child_task_with_common_params(
            SpawnTestContainer,
            test_container_name=self.test_container_name,
            network_info=network_info,
            ip_address_index_in_subnet=1,
            certificate_volume_name=certificate_volume_name,
            attempt=attempt,
            test_container_content=self.test_container_content
        )

    def _wait_for_database(self,
                           database_info: DatabaseInfo,
                           attempt: int):
        """Run the task from ``create_wait_for_database_task`` and return whether the database is ready."""
        database_ready_target_future = \
            yield from self.run_dependencies(self.create_wait_for_database_task(attempt, database_info))
        # NOTE(review): uses the plural ``get_values_from_futures`` on a single
        # task's future, unlike ``_create_network`` -- presumably it accepts
        # both shapes; confirm.
        is_database_ready = self.get_values_from_futures(database_ready_target_future)
        return is_database_ready

    def create_wait_for_database_task(self,
                                      attempt: int,
                                      database_info: DatabaseInfo):
        # NOTE(review): the excerpt is cut off directly after this signature;
        # the "..." below is the truncation marker, not the real body. The
        # sibling ``create_*`` hooks raise AbstractMethodException -- restore
        # the actual body from the upstream file.
        ...


# environment_info.py
Source:environment_info.py  
1from typing import Optional2from exasol_integration_test_docker_environment.lib.base.info import Info3from exasol_integration_test_docker_environment.lib.data.container_info import ContainerInfo4from exasol_integration_test_docker_environment.lib.data.database_info import DatabaseInfo5from exasol_integration_test_docker_environment.lib.data.docker_network_info import DockerNetworkInfo6class EnvironmentInfo(Info):7    def __init__(self,8                 name: str, env_type: str,9                 database_info: DatabaseInfo,10                 test_container_info: Optional[ContainerInfo],11                 network_info: DockerNetworkInfo):12        self.name = name13        self.type = env_type14        self.test_container_info = test_container_info15        self.database_info = database_info...test_docker_models.py
Source:test_docker_models.py  
1import pytest2from docker_addons.models import ContainerInfo3@pytest.mark.django_db4def test_container_info():5    seen = set()6    for _ in range(30):7        instance = ContainerInfo.objects.create()8        assert len(instance.name) == 509        assert instance.name not in seen...
Learn automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
