Best Python code snippet using localstack_python
docker_utils.py
Source:docker_utils.py  
...470        try:471            output = safe_run(cmd)472            image_names = output.splitlines()473            if strip_latest:474                Util.append_without_latest(image_names)475            return image_names476        except Exception as e:477            LOG.info('Unable to list Docker images via "%s": %s' % (cmd, e))478            return []479    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:480        cmd = self._docker_cmd()481        cmd += ["logs", container_name_or_id]482        try:483            return safe_run(cmd)484        except subprocess.CalledProcessError as e:485            if safe:486                return ""487            if "No such container" in to_str(e.stdout):488                raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)489            else:490                raise ContainerException(491                    "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr492                )493    def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[Dict, str]]:494        cmd = self._docker_cmd()495        cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]496        try:497            cmd_result = safe_run(cmd)498        except subprocess.CalledProcessError as e:499            if "No such object" in to_str(e.stdout):500                raise NoSuchObject(object_name_or_id, stdout=e.stdout, stderr=e.stderr)501            else:502                raise ContainerException(503                    "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr504                )505        image_data = json.loads(cmd_result.strip())506        return image_data507    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:508        try:509            return self._inspect_object(container_name_or_id)510        except NoSuchObject as e:511            raise 
NoSuchContainer(container_name_or_id=e.object_id)512    def inspect_image(self, image_name: str) -> Dict[str, Union[Dict, str]]:513        try:514            return self._inspect_object(image_name)515        except NoSuchObject as e:516            raise NoSuchImage(image_name=e.object_id)517    def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:518        try:519            return self._inspect_object(network_name)520        except NoSuchObject as e:521            raise NoSuchNetwork(network_name=e.object_id)522    def get_container_ip(self, container_name_or_id: str) -> str:523        cmd = self._docker_cmd()524        cmd += [525            "inspect",526            "--format",527            "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",528            container_name_or_id,529        ]530        try:531            return safe_run(cmd).strip()532        except subprocess.CalledProcessError as e:533            if "No such object" in to_str(e.stdout):534                raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)535            else:536                raise ContainerException(537                    "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr538                )539    def has_docker(self) -> bool:540        try:541            safe_run(self._docker_cmd() + ["ps"])542            return True543        except subprocess.CalledProcessError:544            return False545    def create_container(self, image_name: str, **kwargs) -> str:546        cmd, env_file = self._build_run_create_cmd("create", image_name, **kwargs)547        LOG.debug("Create container with cmd: %s", cmd)548        try:549            container_id = safe_run(cmd)550            # Note: strip off Docker warning messages like "DNS setting (--dns=127.0.0.1) may fail in containers"551            container_id = container_id.strip().split("\n")[-1]552            return container_id.strip()553        except 
subprocess.CalledProcessError as e:554            if "Unable to find image" in to_str(e.stdout):555                raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr)556            raise ContainerException(557                "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr558            )559        finally:560            Util.rm_env_vars_file(env_file)561    def run_container(self, image_name: str, stdin=None, **kwargs) -> Tuple[bytes, bytes]:562        cmd, env_file = self._build_run_create_cmd("run", image_name, **kwargs)563        LOG.debug("Run container with cmd: %s", cmd)564        result = self._run_async_cmd(cmd, stdin, kwargs.get("name") or "", image_name)565        Util.rm_env_vars_file(env_file)566        return result567    def exec_in_container(568        self,569        container_name_or_id: str,570        command: Union[List[str], str],571        interactive=False,572        detach=False,573        env_vars: Optional[Dict[str, str]] = None,574        stdin: Optional[bytes] = None,575        user: Optional[str] = None,576    ) -> Tuple[bytes, bytes]:577        env_file = None578        cmd = self._docker_cmd()579        cmd.append("exec")580        if interactive:581            cmd.append("--interactive")582        if detach:583            cmd.append("--detach")584        if user:585            cmd += ["--user", user]586        if env_vars:587            env_flag, env_file = Util.create_env_vars_file_flag(env_vars)588            cmd += env_flag589        cmd.append(container_name_or_id)590        cmd += command if isinstance(command, List) else [command]591        LOG.debug("Execute in container cmd: %s", cmd)592        result = self._run_async_cmd(cmd, stdin, container_name_or_id)593        Util.rm_env_vars_file(env_file)594        return result595    def start_container(596        self,597        container_name_or_id: str,598        stdin=None,599        interactive: bool = False,600        attach: bool = 
False,601        flags: Optional[str] = None,602    ) -> Tuple[bytes, bytes]:603        cmd = self._docker_cmd() + ["start"]604        if flags:605            cmd.append(flags)606        if interactive:607            cmd.append("--interactive")608        if attach:609            cmd.append("--attach")610        cmd.append(container_name_or_id)611        LOG.debug("Start container with cmd: %s", cmd)612        return self._run_async_cmd(cmd, stdin, container_name_or_id)613    def _run_async_cmd(614        self, cmd: List[str], stdin: bytes, container_name: str, image_name=None615    ) -> Tuple[bytes, bytes]:616        kwargs = {617            "inherit_env": True,618            "asynchronous": True,619            "stderr": subprocess.PIPE,620            "outfile": subprocess.PIPE,621        }622        if stdin:623            kwargs["stdin"] = True624        try:625            process = safe_run(cmd, **kwargs)626            stdout, stderr = process.communicate(input=stdin)627            if process.returncode != 0:628                raise subprocess.CalledProcessError(629                    process.returncode,630                    cmd,631                    stdout,632                    stderr,633                )634            else:635                return stdout, stderr636        except subprocess.CalledProcessError as e:637            stderr_str = to_str(e.stderr)638            if "Unable to find image" in stderr_str:639                raise NoSuchImage(image_name or "", stdout=e.stdout, stderr=e.stderr)640            if "No such container" in stderr_str:641                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)642            raise ContainerException(643                "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr644            )645    def _build_run_create_cmd(646        self,647        action: str,648        image_name: str,649        *,650        name: Optional[str] = None,651        entrypoint: 
Optional[str] = None,652        remove: bool = False,653        interactive: bool = False,654        tty: bool = False,655        detach: bool = False,656        command: Optional[Union[List[str], str]] = None,657        mount_volumes: Optional[List[Tuple[str, str]]] = None,658        ports: Optional[PortMappings] = None,659        env_vars: Optional[Dict[str, str]] = None,660        user: Optional[str] = None,661        cap_add: Optional[str] = None,662        network: Optional[str] = None,663        dns: Optional[str] = None,664        additional_flags: Optional[str] = None,665        workdir: Optional[str] = None,666    ) -> Tuple[List[str], str]:667        env_file = None668        cmd = self._docker_cmd() + [action]669        if remove:670            cmd.append("--rm")671        if name:672            cmd += ["--name", name]673        if entrypoint is not None:  # empty string entrypoint can be intentional674            cmd += ["--entrypoint", entrypoint]675        if mount_volumes:676            cmd += [677                volume678                for host_path, docker_path in mount_volumes679                for volume in ["-v", f"{host_path}:{docker_path}"]680            ]681        if interactive:682            cmd.append("--interactive")683        if tty:684            cmd.append("--tty")685        if detach:686            cmd.append("--detach")687        if ports:688            cmd += ports.to_list()689        if env_vars:690            env_flags, env_file = Util.create_env_vars_file_flag(env_vars)691            cmd += env_flags692        if user:693            cmd += ["--user", user]694        if cap_add:695            cmd += ["--cap-add", cap_add]696        if network:697            cmd += ["--network", network]698        if dns:699            cmd += ["--dns", dns]700        if workdir:701            cmd += ["--workdir", workdir]702        if additional_flags:703            cmd += shlex.split(additional_flags)704        cmd.append(image_name)705        
if command:706            cmd += command if isinstance(command, List) else [command]707        return cmd, env_file708class Util:709    MAX_ENV_ARGS_LENGTH = 20000710    @classmethod711    def create_env_vars_file_flag(cls, env_vars: Dict) -> Tuple[List[str], Optional[str]]:712        if not env_vars:713            return [], None714        result = []715        env_vars = dict(env_vars)716        env_file = None717        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:718            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...719            env_file = cls.mountable_tmp_file()720            env_content = ""721            for name, value in dict(env_vars).items():722                if len(value) > cls.MAX_ENV_ARGS_LENGTH:723                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")724                    continue725                env_vars.pop(name)726                value = value.replace("\n", "\\")727                env_content += "%s=%s\n" % (name, value)728            save_file(env_file, env_content)729            result += ["--env-file", env_file]730        env_vars_res = [item for k, v in env_vars.items() for item in ["-e", "{}={}".format(k, v)]]731        result += env_vars_res732        return result, env_file733    @staticmethod734    def rm_env_vars_file(env_vars_file) -> None:735        if env_vars_file:736            return rm_rf(env_vars_file)737    @staticmethod738    def mountable_tmp_file():739        f = os.path.join(config.TMP_FOLDER, short_uid())740        TMP_FILES.append(f)741        return f742    @staticmethod743    def append_without_latest(image_names):744        suffix = ":latest"745        for image in list(image_names):746            if image.endswith(suffix):747                image_names.append(image[: -len(suffix)])748    @staticmethod749    def tar_path(path, target_path, is_dir: bool):750        f = 
tempfile.NamedTemporaryFile()751        with tarfile.open(mode="w", fileobj=f) as t:752            abs_path = os.path.abspath(path)753            arcname = (754                os.path.basename(path)755                if is_dir756                else (os.path.basename(target_path) or os.path.basename(path))757            )758            t.add(abs_path, arcname=arcname)759        f.seek(0)760        return f761    @staticmethod762    def untar_to_path(tardata, target_path):763        target_path = Path(target_path)764        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:765            if target_path.is_dir():766                t.extractall(path=target_path)767            else:768                member = t.next()769                if member:770                    member.name = target_path.name771                    t.extract(member, target_path.parent)772                else:773                    LOG.debug("File to copy empty, ignoring...")774    @staticmethod775    def parse_additional_flags(776        additional_flags: str,777        env_vars: Dict[str, str] = None,778        ports: PortMappings = None,779        mounts: List[Tuple[str, str]] = None,780        network: Optional[str] = None,781    ) -> Tuple[782        Dict[str, str], PortMappings, List[Tuple[str, str]], Optional[Dict[str, str]], Optional[str]783    ]:784        """Parses environment, volume and port flags passed as string785        :param additional_flags: String which contains the flag definitions786        :param env_vars: Dict with env vars. Will be modified in place.787        :param ports: PortMapping object. Will be modified in place.788        :param mounts: List of mount tuples (host_path, container_path). Will be modified in place.789        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.790        :return: A tuple containing the env_vars, ports, mount, extra_hosts and network objects. 
Will return new objects791                if respective parameters were None and additional flags contained a flag for that object, the same which792                are passed otherwise.793        """794        cur_state = None795        extra_hosts = None796        # TODO Use argparse to simplify this logic797        for flag in shlex.split(additional_flags):798            if not cur_state:799                if flag in ["-v", "--volume"]:800                    cur_state = "volume"801                elif flag in ["-p", "--publish"]:802                    cur_state = "port"803                elif flag in ["-e", "--env"]:804                    cur_state = "env"805                elif flag == "--add-host":806                    cur_state = "add-host"807                elif flag == "--network":808                    cur_state = "set-network"809                else:810                    raise NotImplementedError(811                        f"Flag {flag} is currently not supported by this Docker client."812                    )813            else:814                if cur_state == "volume":815                    mounts = mounts if mounts is not None else []816                    match = re.match(817                        r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",818                        flag,819                    )820                    if not match:821                        LOG.warning("Unable to parse volume mount Docker flags: %s", flag)822                        continue823                    host_path = match.group("host")824                    container_path = match.group("container")825                    rw_args = match.group("arg")826                    if rw_args:827                        LOG.info("Volume options like :ro or :rw are currently ignored.")828                    mounts.append((host_path, container_path))829                elif cur_state == "port":830                    port_split = flag.split(":")831    
                protocol = "tcp"832                    if len(port_split) == 2:833                        host_port, container_port = port_split834                    elif len(port_split) == 3:835                        LOG.warning(836                            "Host part of port mappings are ignored currently in additional flags"837                        )838                        _, host_port, container_port = port_split839                    else:840                        raise ValueError("Invalid port string provided: %s", flag)841                    if "/" in container_port:842                        container_port, protocol = container_port.split("/")843                    ports = ports if ports is not None else PortMappings()844                    ports.add(int(host_port), int(container_port), protocol)845                elif cur_state == "env":846                    lhs, _, rhs = flag.partition("=")847                    env_vars = env_vars if env_vars is not None else {}848                    env_vars[lhs] = rhs849                elif cur_state == "add-host":850                    extra_hosts = extra_hosts if extra_hosts is not None else {}851                    hosts_split = flag.split(":")852                    extra_hosts[hosts_split[0]] = hosts_split[1]853                elif cur_state == "set-network":854                    if network:855                        LOG.warning(856                            "Overwriting Docker container network '%s' with new value '%s'",857                            network,858                            flag,859                        )860                    network = flag861                cur_state = None862        return env_vars, ports, mounts, extra_hosts, network863    @staticmethod864    def convert_mount_list_to_dict(865        mount_volumes: List[Tuple[str, str]]866    ) -> Dict[str, Dict[str, str]]:867        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for 
docker sdk"""868        return dict(869            map(870                lambda paths: (str(paths[0]), {"bind": paths[1], "mode": "rw"}),871                mount_volumes,872            )873        )874class SdkDockerClient(ContainerClient):875    """Class for managing docker using the python docker sdk"""876    docker_client: Optional[DockerClient]877    def __init__(self):878        try:879            self.docker_client = docker.from_env()880            logging.getLogger("urllib3").setLevel(logging.INFO)881        except DockerException:882            self.docker_client = None883    def client(self):884        if self.docker_client:885            return self.docker_client886        else:887            raise ContainerException("Docker not available")888    def _read_from_sock(self, sock: socket, tty: bool):889        """Reads multiplexed messages from a socket returned by attach_socket.890        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach891        """892        stdout = b""893        stderr = b""894        for frame_type, frame_data in frames_iter(sock, tty):895            if frame_type == STDOUT:896                stdout += frame_data897            elif frame_type == STDERR:898                stderr += frame_data899            else:900                raise ContainerException("Invalid frame type when reading from socket")901        return stdout, stderr902    def _container_path_info(self, container: Container, container_path: str):903        """904        Get information about a path in the given container905        :param container: Container to be inspected906        :param container_path: Path in container907        :return: Tuple (path_exists, path_is_directory)908        """909        # Docker CLI copy uses go FileMode to determine if target is a dict or not910        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260911        # The isDir 
Bit is the most significant bit in the 32bit struct:912        # https://golang.org/src/os/types.go?s=2650:2683913        stats = {}914        try:915            _, stats = container.get_archive(container_path)916            target_exists = True917        except APIError:918            target_exists = False919        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)920        return target_exists, target_is_dir921    def get_container_status(self, container_name: str) -> DockerContainerStatus:922        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose923        try:924            container = self.client().containers.get(container_name)925            if container.status == "running":926                return DockerContainerStatus.UP927            else:928                return DockerContainerStatus.DOWN929        except NotFound:930            return DockerContainerStatus.NON_EXISTENT931        except APIError:932            raise ContainerException()933    def get_network(self, container_name: str) -> str:934        LOG.debug("Getting network type for container: %s", container_name)935        try:936            container = self.client().containers.get(container_name)937            return container.attrs["HostConfig"]["NetworkMode"]938        except NotFound:939            raise NoSuchContainer(container_name)940        except APIError:941            raise ContainerException()942    def stop_container(self, container_name: str) -> None:943        LOG.debug("Stopping container: %s", container_name)944        try:945            container = self.client().containers.get(container_name)946            container.stop(timeout=0)947        except NotFound:948            raise NoSuchContainer(container_name)949        except APIError:950            raise ContainerException()951    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:952        LOG.debug("Removing container: %s", 
container_name)953        if check_existence and container_name not in self.get_running_container_names():954            LOG.debug("Aborting removing due to check_existence check")955            return956        try:957            container = self.client().containers.get(container_name)958            container.remove(force=force)959        except NotFound:960            if not force:961                raise NoSuchContainer(container_name)962        except APIError:963            raise ContainerException()964    def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:965        if filter:966            filter = [filter] if isinstance(filter, str) else filter967            filter = dict([f.split("=", 1) for f in filter])968        LOG.debug("Listing containers with filters: %s", filter)969        try:970            container_list = self.client().containers.list(filters=filter, all=all)971            return list(972                map(973                    lambda container: {974                        "id": container.id,975                        "image": container.image,976                        "name": container.name,977                        "status": container.status,978                        "labels": container.labels,979                    },980                    container_list,981                )982            )983        except APIError:984            raise ContainerException()985    def copy_into_container(986        self, container_name: str, local_path: str, container_path: str987    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/988        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)989        try:990            container = self.client().containers.get(container_name)991            target_exists, target_isdir = self._container_path_info(container, container_path)992            target_path = container_path if target_isdir else 
os.path.dirname(container_path)993            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:994                container.put_archive(target_path, tar)995        except NotFound:996            raise NoSuchContainer(container_name)997        except APIError:998            raise ContainerException()999    def copy_from_container(1000        self,1001        container_name: str,1002        local_path: str,1003        container_path: str,1004    ) -> None:1005        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)1006        try:1007            container = self.client().containers.get(container_name)1008            bits, _ = container.get_archive(container_path)1009            Util.untar_to_path(bits, local_path)1010        except NotFound:1011            raise NoSuchContainer(container_name)1012        except APIError:1013            raise ContainerException()1014    def pull_image(self, docker_image: str) -> None:1015        LOG.debug("Pulling image: %s", docker_image)1016        # some path in the docker image string indicates a custom repository1017        path_split = docker_image.rpartition("/")1018        image_split = path_split[2].partition(":")1019        repository = f"{path_split[0]}{path_split[1]}{image_split[0]}"1020        tag = image_split[2]1021        try:1022            LOG.debug("Repository: %s Tag: %s", repository, tag)1023            self.client().images.pull(repository, tag)1024        except ImageNotFound:1025            raise NoSuchImage(docker_image)1026        except APIError:1027            raise ContainerException()1028    def get_docker_image_names(self, strip_latest=True, include_tags=True):1029        try:1030            images = self.client().images.list()1031            image_names = [tag for image in images for tag in image.tags if image.tags]1032            if not include_tags:1033                image_names = list(map(lambda image_name: image_name.split(":")[0], 
image_names))1034            if strip_latest:1035                Util.append_without_latest(image_names)1036            return image_names1037        except APIError:1038            raise ContainerException()1039    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:1040        try:1041            container = self.client().containers.get(container_name_or_id)1042            return to_str(container.logs())1043        except NotFound:1044            if safe:1045                return ""1046            raise NoSuchContainer(container_name_or_id)1047        except APIError:1048            if safe:1049                return ""...docker.py
Source:docker.py  
...447        try:448            output = safe_run(cmd)449            image_names = output.splitlines()450            if strip_latest:451                Util.append_without_latest(image_names)452            return image_names453        except Exception as e:454            LOG.info('Unable to list Docker images via "%s": %s' % (cmd, e))455            return []456    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:457        cmd = self._docker_cmd()458        cmd += ["logs", container_name_or_id]459        try:460            return safe_run(cmd)461        except subprocess.CalledProcessError as e:462            if safe:463                return ""464            if "No such container" in to_str(e.stdout):465                raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)466            else:467                raise ContainerException(468                    "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr469                )470    def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[Dict, str]]:471        cmd = self._docker_cmd()472        cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]473        try:474            cmd_result = safe_run(cmd)475        except subprocess.CalledProcessError as e:476            if "No such object" in to_str(e.stdout):477                raise NoSuchObject(object_name_or_id, stdout=e.stdout, stderr=e.stderr)478            else:479                raise ContainerException(480                    "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr481                )482        image_data = json.loads(cmd_result.strip())483        return image_data484    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:485        try:486            return self._inspect_object(container_name_or_id)487        except NoSuchObject as e:488            raise 
NoSuchContainer(container_name_or_id=e.object_id)489    def inspect_image(self, image_name: str) -> Dict[str, Union[Dict, str]]:490        try:491            return self._inspect_object(image_name)492        except NoSuchObject as e:493            raise NoSuchImage(image_name=e.object_id)494    def get_container_ip(self, container_name_or_id: str) -> str:495        cmd = self._docker_cmd()496        cmd += [497            "inspect",498            "--format",499            "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",500            container_name_or_id,501        ]502        try:503            return safe_run(cmd).strip()504        except subprocess.CalledProcessError as e:505            if "No such object" in to_str(e.stdout):506                raise NoSuchContainer(container_name_or_id, stdout=e.stdout, stderr=e.stderr)507            else:508                raise ContainerException(509                    "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr510                )511    def has_docker(self) -> bool:512        try:513            safe_run(self._docker_cmd() + ["ps"])514            return True515        except subprocess.CalledProcessError:516            return False517    def create_container(self, image_name: str, **kwargs) -> str:518        cmd, env_file = self._build_run_create_cmd("create", image_name, **kwargs)519        LOG.debug("Create container with cmd: %s", cmd)520        try:521            container_id = safe_run(cmd)522            # Note: strip off Docker warning messages like "DNS setting (--dns=127.0.0.1) may fail in containers"523            container_id = container_id.strip().split("\n")[-1]524            return container_id.strip()525        except subprocess.CalledProcessError as e:526            if "Unable to find image" in to_str(e.stdout):527                raise NoSuchImage(image_name, stdout=e.stdout, stderr=e.stderr)528            raise ContainerException(529                "Docker process 
returned with errorcode %s" % e.returncode, e.stdout, e.stderr530            )531        finally:532            Util.rm_env_vars_file(env_file)533    def run_container(self, image_name: str, stdin=None, **kwargs) -> Tuple[bytes, bytes]:534        cmd, env_file = self._build_run_create_cmd("run", image_name, **kwargs)535        LOG.debug("Run container with cmd: %s", cmd)536        result = self._run_async_cmd(cmd, stdin, kwargs.get("name") or "", image_name)537        Util.rm_env_vars_file(env_file)538        return result539    def exec_in_container(540        self,541        container_name_or_id: str,542        command: Union[List[str], str],543        interactive=False,544        detach=False,545        env_vars: Optional[Dict[str, str]] = None,546        stdin: Optional[bytes] = None,547        user: Optional[str] = None,548    ) -> Tuple[bytes, bytes]:549        env_file = None550        cmd = self._docker_cmd()551        cmd.append("exec")552        if interactive:553            cmd.append("--interactive")554        if detach:555            cmd.append("--detach")556        if user:557            cmd += ["--user", user]558        if env_vars:559            env_flag, env_file = Util.create_env_vars_file_flag(env_vars)560            cmd += env_flag561        cmd.append(container_name_or_id)562        cmd += command if isinstance(command, List) else [command]563        LOG.debug("Execute in container cmd: %s", cmd)564        result = self._run_async_cmd(cmd, stdin, container_name_or_id)565        Util.rm_env_vars_file(env_file)566        return result567    def start_container(568        self,569        container_name_or_id: str,570        stdin=None,571        interactive: bool = False,572        attach: bool = False,573        flags: Optional[str] = None,574    ) -> Tuple[bytes, bytes]:575        cmd = self._docker_cmd() + ["start"]576        if flags:577            cmd.append(flags)578        if interactive:579            cmd.append("--interactive")580        
if attach:581            cmd.append("--attach")582        cmd.append(container_name_or_id)583        LOG.debug("Start container with cmd: %s", cmd)584        return self._run_async_cmd(cmd, stdin, container_name_or_id)585    def _run_async_cmd(586        self, cmd: List[str], stdin: bytes, container_name: str, image_name=None587    ) -> Tuple[bytes, bytes]:588        kwargs = {589            "inherit_env": True,590            "asynchronous": True,591            "stderr": subprocess.PIPE,592            "outfile": subprocess.PIPE,593        }594        if stdin:595            kwargs["stdin"] = True596        try:597            process = safe_run(cmd, **kwargs)598            stdout, stderr = process.communicate(input=stdin)599            if process.returncode != 0:600                raise subprocess.CalledProcessError(601                    process.returncode,602                    cmd,603                    stdout,604                    stderr,605                )606            else:607                return stdout, stderr608        except subprocess.CalledProcessError as e:609            stderr_str = to_str(e.stderr)610            if "Unable to find image" in stderr_str:611                raise NoSuchImage(image_name or "", stdout=e.stdout, stderr=e.stderr)612            if "No such container" in stderr_str:613                raise NoSuchContainer(container_name, stdout=e.stdout, stderr=e.stderr)614            raise ContainerException(615                "Docker process returned with errorcode %s" % e.returncode, e.stdout, e.stderr616            )617    def _build_run_create_cmd(618        self,619        action: str,620        image_name: str,621        *,622        name: Optional[str] = None,623        entrypoint: Optional[str] = None,624        remove: bool = False,625        interactive: bool = False,626        tty: bool = False,627        detach: bool = False,628        command: Optional[Union[List[str], str]] = None,629        mount_volumes: 
Optional[List[Tuple[str, str]]] = None,630        ports: Optional[PortMappings] = None,631        env_vars: Optional[Dict[str, str]] = None,632        user: Optional[str] = None,633        cap_add: Optional[str] = None,634        network: Optional[str] = None,635        dns: Optional[str] = None,636        additional_flags: Optional[str] = None,637    ) -> Tuple[List[str], str]:638        env_file = None639        cmd = self._docker_cmd() + [action]640        if remove:641            cmd.append("--rm")642        if name:643            cmd += ["--name", name]644        if entrypoint is not None:  # empty string entrypoint can be intentional645            cmd += ["--entrypoint", entrypoint]646        if mount_volumes:647            cmd += [648                volume649                for host_path, docker_path in mount_volumes650                for volume in ["-v", f"{host_path}:{docker_path}"]651            ]652        if interactive:653            cmd.append("--interactive")654        if tty:655            cmd.append("--tty")656        if detach:657            cmd.append("--detach")658        if ports:659            cmd += ports.to_list()660        if env_vars:661            env_flags, env_file = Util.create_env_vars_file_flag(env_vars)662            cmd += env_flags663        if user:664            cmd += ["--user", user]665        if cap_add:666            cmd += ["--cap-add", cap_add]667        if network:668            cmd += ["--network", network]669        if dns:670            cmd += ["--dns", dns]671        if additional_flags:672            cmd += shlex.split(additional_flags)673        cmd.append(image_name)674        if command:675            cmd += command if isinstance(command, List) else [command]676        return cmd, env_file677class Util:678    MAX_ENV_ARGS_LENGTH = 20000679    @classmethod680    def create_env_vars_file_flag(cls, env_vars: Dict) -> Tuple[List[str], Optional[str]]:681        if not env_vars:682            return [], None683        
result = []684        env_vars = dict(env_vars)685        env_file = None686        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:687            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...688            env_file = cls.mountable_tmp_file()689            env_content = ""690            for name, value in dict(env_vars).items():691                if len(value) > cls.MAX_ENV_ARGS_LENGTH:692                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")693                    continue694                env_vars.pop(name)695                value = value.replace("\n", "\\")696                env_content += "%s=%s\n" % (name, value)697            save_file(env_file, env_content)698            result += ["--env-file", env_file]699        env_vars_res = [item for k, v in env_vars.items() for item in ["-e", "{}={}".format(k, v)]]700        result += env_vars_res701        return result, env_file702    @staticmethod703    def rm_env_vars_file(env_vars_file) -> None:704        if env_vars_file:705            return rm_rf(env_vars_file)706    @staticmethod707    def mountable_tmp_file():708        f = os.path.join(config.TMP_FOLDER, short_uid())709        TMP_FILES.append(f)710        return f711    @staticmethod712    def append_without_latest(image_names):713        suffix = ":latest"714        for image in list(image_names):715            if image.endswith(suffix):716                image_names.append(image[: -len(suffix)])717    @staticmethod718    def tar_path(path, target_path, is_dir: bool):719        f = tempfile.NamedTemporaryFile()720        with tarfile.open(mode="w", fileobj=f) as t:721            abs_path = os.path.abspath(path)722            arcname = (723                os.path.basename(path)724                if is_dir725                else (os.path.basename(target_path) or os.path.basename(path))726            )727            t.add(abs_path, 
arcname=arcname)728        f.seek(0)729        return f730    @staticmethod731    def untar_to_path(tardata, target_path):732        target_path = Path(target_path)733        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:734            if target_path.is_dir():735                t.extractall(path=target_path)736            else:737                member = t.next()738                if member:739                    member.name = target_path.name740                    t.extract(member, target_path.parent)741                else:742                    LOG.debug("File to copy empty, ignoring...")743    @staticmethod744    def parse_additional_flags(745        additional_flags: str,746        env_vars: Dict[str, str] = None,747        ports: PortMappings = None,748        mounts: List[Tuple[str, str]] = None,749        network: Optional[str] = None,750    ) -> Tuple[751        Dict[str, str], PortMappings, List[Tuple[str, str]], Optional[Dict[str, str]], Optional[str]752    ]:753        """Parses environment, volume and port flags passed as string754        :param additional_flags: String which contains the flag definitions755        :param env_vars: Dict with env vars. Will be modified in place.756        :param ports: PortMapping object. Will be modified in place.757        :param mounts: List of mount tuples (host_path, container_path). Will be modified in place.758        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.759        :return: A tuple containing the env_vars, ports, mount, extra_hosts and network objects. 
Will return new objects760                if respective parameters were None and additional flags contained a flag for that object, the same which761                are passed otherwise.762        """763        cur_state = None764        extra_hosts = None765        # TODO Use argparse to simplify this logic766        for flag in shlex.split(additional_flags):767            if not cur_state:768                if flag in ["-v", "--volume"]:769                    cur_state = "volume"770                elif flag in ["-p", "--publish"]:771                    cur_state = "port"772                elif flag in ["-e", "--env"]:773                    cur_state = "env"774                elif flag == "--add-host":775                    cur_state = "add-host"776                elif flag == "--network":777                    cur_state = "set-network"778                else:779                    raise NotImplementedError(780                        f"Flag {flag} is currently not supported by this Docker client."781                    )782            else:783                if cur_state == "volume":784                    mounts = mounts if mounts is not None else []785                    match = re.match(786                        r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",787                        flag,788                    )789                    if not match:790                        LOG.warning("Unable to parse volume mount Docker flags: %s", flag)791                        continue792                    host_path = match.group("host")793                    container_path = match.group("container")794                    rw_args = match.group("arg")795                    if rw_args:796                        LOG.info("Volume options like :ro or :rw are currently ignored.")797                    mounts.append((host_path, container_path))798                elif cur_state == "port":799                    port_split = flag.split(":")800    
                protocol = "tcp"801                    if len(port_split) == 2:802                        host_port, container_port = port_split803                    elif len(port_split) == 3:804                        LOG.warning(805                            "Host part of port mappings are ignored currently in additional flags"806                        )807                        _, host_port, container_port = port_split808                    else:809                        raise ValueError("Invalid port string provided: %s", flag)810                    if "/" in container_port:811                        container_port, protocol = container_port.split("/")812                    ports = ports if ports is not None else PortMappings()813                    ports.add(int(host_port), int(container_port), protocol)814                elif cur_state == "env":815                    lhs, _, rhs = flag.partition("=")816                    env_vars = env_vars if env_vars is not None else {}817                    env_vars[lhs] = rhs818                elif cur_state == "add-host":819                    extra_hosts = extra_hosts if extra_hosts is not None else {}820                    hosts_split = flag.split(":")821                    extra_hosts[hosts_split[0]] = hosts_split[1]822                elif cur_state == "set-network":823                    if network:824                        LOG.warning(825                            "Overwriting Docker container network '%s' with new value '%s'",826                            network,827                            flag,828                        )829                    network = flag830                cur_state = None831        return env_vars, ports, mounts, extra_hosts, network832    @staticmethod833    def convert_mount_list_to_dict(834        mount_volumes: List[Tuple[str, str]]835    ) -> Dict[str, Dict[str, str]]:836        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for 
docker sdk"""837        return dict(838            map(839                lambda paths: (str(paths[0]), {"bind": paths[1], "mode": "rw"}),840                mount_volumes,841            )842        )843class SdkDockerClient(ContainerClient):844    """Class for managing docker using the python docker sdk"""845    docker_client: Optional[DockerClient]846    def __init__(self):847        try:848            self.docker_client = docker.from_env()849            logging.getLogger("urllib3").setLevel(logging.INFO)850        except DockerException:851            self.docker_client = None852    def client(self):853        if self.docker_client:854            return self.docker_client855        else:856            raise ContainerException("Docker not available")857    def _read_from_sock(self, sock: socket, tty: bool):858        """Reads multiplexed messages from a socket returned by attach_socket.859        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach860        """861        stdout = b""862        stderr = b""863        for frame_type, frame_data in frames_iter(sock, tty):864            if frame_type == STDOUT:865                stdout += frame_data866            elif frame_type == STDERR:867                stderr += frame_data868            else:869                raise ContainerException("Invalid frame type when reading from socket")870        return stdout, stderr871    def _container_path_info(self, container: Container, container_path: str):872        """873        Get information about a path in the given container874        :param container: Container to be inspected875        :param container_path: Path in container876        :return: Tuple (path_exists, path_is_directory)877        """878        # Docker CLI copy uses go FileMode to determine if target is a dict or not879        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L260880        # The isDir 
Bit is the most significant bit in the 32bit struct:881        # https://golang.org/src/os/types.go?s=2650:2683882        stats = {}883        try:884            _, stats = container.get_archive(container_path)885            target_exists = True886        except APIError:887            target_exists = False888        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)889        return target_exists, target_is_dir890    def get_container_status(self, container_name: str) -> DockerContainerStatus:891        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose892        try:893            container = self.client().containers.get(container_name)894            if container.status == "running":895                return DockerContainerStatus.UP896            else:897                return DockerContainerStatus.DOWN898        except NotFound:899            return DockerContainerStatus.NON_EXISTENT900        except APIError:901            raise ContainerException()902    def get_network(self, container_name: str) -> str:903        LOG.debug("Getting network type for container: %s", container_name)904        try:905            container = self.client().containers.get(container_name)906            return container.attrs["HostConfig"]["NetworkMode"]907        except NotFound:908            raise NoSuchContainer(container_name)909        except APIError:910            raise ContainerException()911    def stop_container(self, container_name: str) -> None:912        LOG.debug("Stopping container: %s", container_name)913        try:914            container = self.client().containers.get(container_name)915            container.stop(timeout=0)916        except NotFound:917            raise NoSuchContainer(container_name)918        except APIError:919            raise ContainerException()920    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:921        LOG.debug("Removing container: %s", 
container_name)922        if check_existence and container_name not in self.get_running_container_names():923            LOG.debug("Aborting removing due to check_existence check")924            return925        try:926            container = self.client().containers.get(container_name)927            container.remove(force=force)928        except NotFound:929            if not force:930                raise NoSuchContainer(container_name)931        except APIError:932            raise ContainerException()933    def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:934        if filter:935            filter = [filter] if isinstance(filter, str) else filter936            filter = dict([f.split("=", 1) for f in filter])937        LOG.debug("Listing containers with filters: %s", filter)938        try:939            container_list = self.client().containers.list(filters=filter, all=all)940            return list(941                map(942                    lambda container: {943                        "id": container.id,944                        "image": container.image,945                        "name": container.name,946                        "status": container.status,947                        "labels": container.labels,948                    },949                    container_list,950                )951            )952        except APIError:953            raise ContainerException()954    def copy_into_container(955        self, container_name: str, local_path: str, container_path: str956    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/957        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)958        try:959            container = self.client().containers.get(container_name)960            target_exists, target_isdir = self._container_path_info(container, container_path)961            target_path = container_path if target_isdir else 
os.path.dirname(container_path)962            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:963                container.put_archive(target_path, tar)964        except NotFound:965            raise NoSuchContainer(container_name)966        except APIError:967            raise ContainerException()968    def copy_from_container(969        self,970        container_name: str,971        local_path: str,972        container_path: str,973    ) -> None:974        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)975        try:976            container = self.client().containers.get(container_name)977            bits, _ = container.get_archive(container_path)978            Util.untar_to_path(bits, local_path)979        except NotFound:980            raise NoSuchContainer(container_name)981        except APIError:982            raise ContainerException()983    def pull_image(self, docker_image: str) -> None:984        LOG.debug("Pulling image: %s", docker_image)985        # some path in the docker image string indicates a custom repository986        path_split = docker_image.rpartition("/")987        image_split = path_split[2].partition(":")988        repository = f"{path_split[0]}{path_split[1]}{image_split[0]}"989        tag = image_split[2]990        try:991            LOG.debug("Repository: %s Tag: %s", repository, tag)992            self.client().images.pull(repository, tag)993        except ImageNotFound:994            raise NoSuchImage(docker_image)995        except APIError:996            raise ContainerException()997    def get_docker_image_names(self, strip_latest=True, include_tags=True):998        try:999            images = self.client().images.list()1000            image_names = [tag for image in images for tag in image.tags if image.tags]1001            if not include_tags:1002                image_names = list(map(lambda image_name: image_name.split(":")[0], image_names))1003            if 
strip_latest:1004                Util.append_without_latest(image_names)1005            return image_names1006        except APIError:1007            raise ContainerException()1008    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:1009        try:1010            container = self.client().containers.get(container_name_or_id)1011            return to_str(container.logs())1012        except NotFound:1013            if safe:1014                return ""1015            raise NoSuchContainer(container_name_or_id)1016        except APIError:1017            if safe:1018                return ""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
