Best Python code snippet using localstack_python
docker_sdk_client.py
Source:docker_sdk_client.py  
...39        if self.docker_client:40            return self.docker_client41        else:42            raise ContainerException("Docker not available")43    def _read_from_sock(self, sock: socket, tty: bool):44        """Reads multiplexed messages from a socket returned by attach_socket.45        Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach46        """47        stdout = b""48        stderr = b""49        for frame_type, frame_data in frames_iter(sock, tty):50            if frame_type == STDOUT:51                stdout += frame_data52            elif frame_type == STDERR:53                stderr += frame_data54            else:55                raise ContainerException("Invalid frame type when reading from socket")56        return stdout, stderr57    def _container_path_info(self, container: Container, container_path: str):58        """59        Get information about a path in the given container60        :param container: Container to be inspected61        :param container_path: Path in container62        :return: Tuple (path_exists, path_is_directory)63        """64        # Docker CLI copy uses go FileMode to determine if target is a dict or not65        # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L26066        # The isDir Bit is the most significant bit in the 32bit struct:67        # https://golang.org/src/os/types.go?s=2650:268368        stats = {}69        try:70            _, stats = container.get_archive(container_path)71            target_exists = True72        except APIError:73            target_exists = False74        target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)75        return target_exists, target_is_dir76    def get_container_status(self, container_name: str) -> DockerContainerStatus:77        # LOG.debug("Getting container status for container: %s", container_name) #  too verbose78        try:79            container = self.client().containers.get(container_name)80            if container.status == "running":81                return DockerContainerStatus.UP82            elif container.status == "paused":83                return DockerContainerStatus.PAUSED84            else:85                return DockerContainerStatus.DOWN86        except NotFound:87            return DockerContainerStatus.NON_EXISTENT88        except APIError:89            raise ContainerException()90    def stop_container(self, container_name: str, timeout: int = None) -> None:91        if timeout is None:92            timeout = self.STOP_TIMEOUT93        LOG.debug("Stopping container: %s", container_name)94        try:95            container = self.client().containers.get(container_name)96            container.stop(timeout=timeout)97        except NotFound:98            raise NoSuchContainer(container_name)99        except APIError:100            raise ContainerException()101    def pause_container(self, container_name: str) -> None:102        LOG.debug("Pausing container: %s", container_name)103        try:104            container = self.client().containers.get(container_name)105            container.pause()106        except NotFound:107            raise NoSuchContainer(container_name)108        except APIError:109            raise ContainerException()110    def unpause_container(self, container_name: str) -> None:111        LOG.debug("Unpausing container: %s", container_name)112        try:113            container = self.client().containers.get(container_name)114            container.unpause()115        except NotFound:116            raise NoSuchContainer(container_name)117        except APIError:118            raise ContainerException()119    def remove_container(self, container_name: str, force=True, check_existence=False) -> None:120        LOG.debug("Removing container: %s", container_name)121        if check_existence and container_name not in self.get_running_container_names():122            LOG.debug("Aborting removing due to check_existence check")123            return124        try:125            container = self.client().containers.get(container_name)126            container.remove(force=force)127        except NotFound:128            if not force:129                raise NoSuchContainer(container_name)130        except APIError:131            raise ContainerException()132    def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:133        if filter:134            filter = [filter] if isinstance(filter, str) else filter135            filter = dict([f.split("=", 1) for f in filter])136        LOG.debug("Listing containers with filters: %s", filter)137        try:138            container_list = self.client().containers.list(filters=filter, all=all)139            result = []140            for container in container_list:141                try:142                    result.append(143                        {144                            "id": container.id,145                            "image": container.image,146                            "name": container.name,147                            "status": container.status,148                            "labels": container.labels,149                        }150                    )151                except Exception as e:152                    LOG.error(f"Error checking container {container}: {e}")153            return result154        except APIError:155            raise ContainerException()156    def copy_into_container(157        self, container_name: str, local_path: str, container_path: str158    ) -> None:  # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/159        LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)160        try:161            container = self.client().containers.get(container_name)162            target_exists, target_isdir = self._container_path_info(container, container_path)163            target_path = container_path if target_isdir else os.path.dirname(container_path)164            with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:165                container.put_archive(target_path, tar)166        except NotFound:167            raise NoSuchContainer(container_name)168        except APIError:169            raise ContainerException()170    def copy_from_container(171        self,172        container_name: str,173        local_path: str,174        container_path: str,175    ) -> None:176        LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)177        try:178            container = self.client().containers.get(container_name)179            bits, _ = container.get_archive(container_path)180            Util.untar_to_path(bits, local_path)181        except NotFound:182            raise NoSuchContainer(container_name)183        except APIError:184            raise ContainerException()185    def pull_image(self, docker_image: str) -> None:186        LOG.debug("Pulling Docker image: %s", docker_image)187        # some path in the docker image string indicates a custom repository188        try:189            self.client().images.pull(docker_image)190        except ImageNotFound:191            raise NoSuchImage(docker_image)192        except APIError:193            raise ContainerException()194    def push_image(self, docker_image: str) -> None:195        LOG.debug("Pushing Docker image: %s", docker_image)196        try:197            result = self.client().images.push(docker_image)198            # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising199            if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):200                if "image does not exist locally" in to_str(result):201                    raise NoSuchImage(docker_image)202                if "is denied" in to_str(result):203                    raise AccessDenied(docker_image)204                if "connection refused" in to_str(result):205                    raise RegistryConnectionError(result)206                raise ContainerException(result)207        except ImageNotFound:208            raise NoSuchImage(docker_image)209        except APIError as e:210            raise ContainerException() from e211    def build_image(self, dockerfile_path: str, image_name: str, context_path: str = None):212        try:213            dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)214            context_path = context_path or os.path.dirname(dockerfile_path)215            LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)216            self.client().images.build(217                path=context_path,218                dockerfile=dockerfile_path,219                tag=image_name,220                rm=True,221            )222        except APIError as e:223            raise ContainerException("Unable to build Docker image") from e224    def tag_image(self, source_ref: str, target_name: str) -> None:225        try:226            LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)227            image = self.client().images.get(source_ref)228            image.tag(target_name)229        except APIError as e:230            if e.status_code == 404:231                raise NoSuchImage(source_ref)232            raise ContainerException("Unable to tag Docker image") from e233    def get_docker_image_names(self, strip_latest=True, include_tags=True):234        try:235            images = self.client().images.list()236            image_names = [tag for image in images for tag in image.tags if image.tags]237            if not include_tags:238                image_names = list(map(lambda image_name: image_name.split(":")[0], image_names))239            if strip_latest:240                Util.append_without_latest(image_names)241            return image_names242        except APIError:243            raise ContainerException()244    def get_container_logs(self, container_name_or_id: str, safe=False) -> str:245        try:246            container = self.client().containers.get(container_name_or_id)247            return to_str(container.logs())248        except NotFound:249            if safe:250                return ""251            raise NoSuchContainer(container_name_or_id)252        except APIError:253            if safe:254                return ""255            raise ContainerException()256    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:257        try:258            return self.client().containers.get(container_name_or_id).attrs259        except NotFound:260            raise NoSuchContainer(container_name_or_id)261        except APIError:262            raise ContainerException()263    def inspect_image(self, image_name: str, pull: bool = True) -> Dict[str, Union[Dict, str]]:264        try:265            return self.client().images.get(image_name).attrs266        except NotFound:267            if pull:268                self.pull_image(image_name)269                return self.inspect_image(image_name, pull=False)270            raise NoSuchImage(image_name)271        except APIError:272            raise ContainerException()273    def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:274        try:275            return self.client().networks.get(network_name).attrs276        except NotFound:277            raise NoSuchNetwork(network_name)278        except APIError:279            raise ContainerException()280    def connect_container_to_network(281        self, network_name: str, container_name_or_id: str, aliases: Optional[List] = None282    ) -> None:283        LOG.debug(284            "Connecting container '%s' to network '%s' with aliases '%s'",285            container_name_or_id,286            network_name,287            aliases,288        )289        try:290            network = self.client().networks.get(network_name)291        except NotFound:292            raise NoSuchNetwork(network_name)293        try:294            network.connect(container=container_name_or_id, aliases=aliases)295        except NotFound:296            raise NoSuchContainer(container_name_or_id)297        except APIError:298            raise ContainerException()299    def disconnect_container_from_network(300        self, network_name: str, container_name_or_id: str301    ) -> None:302        LOG.debug(303            "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name304        )305        try:306            try:307                network = self.client().networks.get(network_name)308            except NotFound:309                raise NoSuchNetwork(network_name)310            try:311                network.disconnect(container_name_or_id)312            except NotFound:313                raise NoSuchContainer(container_name_or_id)314        except APIError:315            raise ContainerException()316    def get_container_ip(self, container_name_or_id: str) -> str:317        networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]318        network_names = list(networks)319        if len(network_names) > 1:320            LOG.info("Container has more than one assigned network. Picking the first one...")321        return networks[network_names[0]]["IPAddress"]322    def has_docker(self) -> bool:323        try:324            if not self.docker_client:325                return False326            self.client().ping()327            return True328        except APIError:329            return False330    def remove_image(self, image: str, force: bool = True):331        LOG.debug("Removing image %s %s", image, "(forced)" if force else "")332        try:333            self.client().images.remove(image=image, force=force)334        except ImageNotFound:335            if not force:336                raise NoSuchImage(image)337    def commit(338        self,339        container_name_or_id: str,340        image_name: str,341        image_tag: str,342    ):343        LOG.debug(344            "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag345        )346        try:347            container = self.client().containers.get(container_name_or_id)348            container.commit(repository=image_name, tag=image_tag)349        except NotFound:350            raise NoSuchContainer(container_name_or_id)351        except APIError:352            raise ContainerException()353    def start_container(354        self,355        container_name_or_id: str,356        stdin=None,357        interactive: bool = False,358        attach: bool = False,359        flags: Optional[str] = None,360    ) -> Tuple[bytes, bytes]:361        LOG.debug("Starting container %s", container_name_or_id)362        try:363            container = self.client().containers.get(container_name_or_id)364            stdout = to_bytes(container_name_or_id)365            stderr = b""366            if interactive or attach:367                params = {"stdout": 1, "stderr": 1, "stream": 1}368                if interactive:369                    params["stdin"] = 1370                sock = container.attach_socket(params=params)371                sock = sock._sock if hasattr(sock, "_sock") else sock372                result_queue = queue.Queue()373                thread_started = threading.Event()374                start_waiting = threading.Event()375                # Note: We need to be careful about potential race conditions here - .wait() should happen right376                #   after .start(). Hence starting a thread and asynchronously waiting for the container exit code377                def wait_for_result(*_):378                    _exit_code = -1379                    try:380                        thread_started.set()381                        start_waiting.wait()382                        _exit_code = container.wait()["StatusCode"]383                    except APIError as e:384                        _exit_code = 1385                        raise ContainerException(str(e))386                    finally:387                        result_queue.put(_exit_code)388                # start listener thread389                start_worker_thread(wait_for_result)390                thread_started.wait()391                # start container392                container.start()393                # start awaiting container result394                start_waiting.set()395                # handle container input/output396                # under windows, the socket has no __enter__ / cannot be used as context manager397                # therefore try/finally instead of with here398                try:399                    if stdin:400                        sock.sendall(to_bytes(stdin))401                        sock.shutdown(socket.SHUT_WR)402                    stdout, stderr = self._read_from_sock(sock, False)403                except socket.timeout:404                    LOG.debug(405                        f"Socket timeout when talking to the I/O streams of Docker container '{container_name_or_id}'"406                    )407                finally:408                    sock.close()409                # get container exit code410                exit_code = result_queue.get()411                if exit_code:412                    raise ContainerException(413                        "Docker container returned with exit code %s" % exit_code,414                        stdout=stdout,415                        stderr=stderr,416                    )417            else:418                container.start()419            return stdout, stderr420        except NotFound:421            raise NoSuchContainer(container_name_or_id)422        except APIError:423            raise ContainerException()424    def create_container(425        self,426        image_name: str,427        *,428        name: Optional[str] = None,429        entrypoint: Optional[str] = None,430        remove: bool = False,431        interactive: bool = False,432        tty: bool = False,433        detach: bool = False,434        command: Optional[Union[List[str], str]] = None,435        mount_volumes: Optional[List[SimpleVolumeBind]] = None,436        ports: Optional[PortMappings] = None,437        env_vars: Optional[Dict[str, str]] = None,438        user: Optional[str] = None,439        cap_add: Optional[List[str]] = None,440        cap_drop: Optional[List[str]] = None,441        network: Optional[str] = None,442        dns: Optional[str] = None,443        additional_flags: Optional[str] = None,444        workdir: Optional[str] = None,445    ) -> str:446        LOG.debug("Creating container with attributes: %s", locals())447        extra_hosts = None448        if additional_flags:449            env_vars, ports, mount_volumes, extra_hosts, network = Util.parse_additional_flags(450                additional_flags, env_vars, ports, mount_volumes, network451            )452        try:453            kwargs = {}454            if cap_add:455                kwargs["cap_add"] = cap_add456            if cap_drop:457                kwargs["cap_drop"] = cap_drop458            if dns:459                kwargs["dns"] = [dns]460            if ports:461                kwargs["ports"] = ports.to_dict()462            if workdir:463                kwargs["working_dir"] = workdir464            mounts = None465            if mount_volumes:466                mounts = Util.convert_mount_list_to_dict(mount_volumes)467            def create_container():468                return self.client().containers.create(469                    image=image_name,470                    command=command,471                    auto_remove=remove,472                    name=name,473                    stdin_open=interactive,474                    tty=tty,475                    entrypoint=entrypoint,476                    environment=env_vars,477                    detach=detach,478                    user=user,479                    network=network,480                    volumes=mounts,481                    extra_hosts=extra_hosts,482                    **kwargs,483                )484            try:485                container = create_container()486            except ImageNotFound:487                self.pull_image(image_name)488                container = create_container()489            return container.id490        except ImageNotFound:491            raise NoSuchImage(image_name)492        except APIError:493            raise ContainerException()494    def run_container(495        self,496        image_name: str,497        stdin=None,498        *,499        name: Optional[str] = None,500        entrypoint: Optional[str] = None,501        remove: bool = False,502        interactive: bool = False,503        tty: bool = False,504        detach: bool = False,505        command: Optional[Union[List[str], str]] = None,506        mount_volumes: Optional[List[SimpleVolumeBind]] = None,507        ports: Optional[PortMappings] = None,508        env_vars: Optional[Dict[str, str]] = None,509        user: Optional[str] = None,510        cap_add: Optional[List[str]] = None,511        cap_drop: Optional[List[str]] = None,512        network: Optional[str] = None,513        dns: Optional[str] = None,514        additional_flags: Optional[str] = None,515        workdir: Optional[str] = None,516    ) -> Tuple[bytes, bytes]:517        LOG.debug("Running container with image: %s", image_name)518        container = None519        try:520            container = self.create_container(521                image_name,522                name=name,523                entrypoint=entrypoint,524                interactive=interactive,525                tty=tty,526                detach=detach,527                remove=remove and detach,528                command=command,529                mount_volumes=mount_volumes,530                ports=ports,531                env_vars=env_vars,532                user=user,533                cap_add=cap_add,534                cap_drop=cap_drop,535                network=network,536                dns=dns,537                additional_flags=additional_flags,538                workdir=workdir,539            )540            result = self.start_container(541                container_name_or_id=container,542                stdin=stdin,543                interactive=interactive,544                attach=not detach,545            )546        finally:547            if remove and container and not detach:548                self.remove_container(container)549        return result550    def exec_in_container(551        self,552        container_name_or_id: str,553        command: Union[List[str], str],554        interactive=False,555        detach=False,556        env_vars: Optional[Dict[str, Optional[str]]] = None,557        stdin: Optional[bytes] = None,558        user: Optional[str] = None,559        workdir: Optional[str] = None,560    ) -> Tuple[bytes, bytes]:561        LOG.debug("Executing command in container %s: %s", container_name_or_id, command)562        try:563            container: Container = self.client().containers.get(container_name_or_id)564            result = container.exec_run(565                cmd=command,566                environment=env_vars,567                user=user,568                detach=detach,569                stdin=interactive and bool(stdin),570                socket=interactive and bool(stdin),571                stdout=True,572                stderr=True,573                demux=True,574                workdir=workdir,575            )576            tty = False577            if interactive and stdin:  # result is a socket578                sock = result[1]579                sock = sock._sock if hasattr(sock, "_sock") else sock580                with sock:581                    try:582                        sock.sendall(stdin)583                        sock.shutdown(socket.SHUT_WR)584                        stdout, stderr = self._read_from_sock(sock, tty)585                        return stdout, stderr586                    except socket.timeout:587                        pass588            else:589                if detach:590                    return b"", b""591                return_code = result[0]592                if isinstance(result[1], bytes):593                    stdout = result[1]594                    stderr = b""595                else:596                    stdout, stderr = result[1]597                if return_code != 0:598                    raise ContainerException(...Conn.py
Source:Conn.py  
...41        self._sock = sock42        self._buffer = io.BytesIO()43        self.bytes_read = 044        self.bytes_written = 045    def _read_from_sock(self, length=None):46        buf = self._buffer47        buf.seek(self.bytes_written)48        tr = 049        while True:50            data = self._sock.recv(1024)51            if isinstance(data, bytes) and len(data) == 0:52                raise Exception('Connection closed')53            buf.write(data)54            self.bytes_written += len(data)55            tr += len(data)56            if length is not None and length > tr:57                continue58            break59    def _already_read(self):60        return self.bytes_written - self.bytes_read61    def _read_line(self):62        buf = self._buffer63        buf.seek(self.bytes_read)64        data = buf.readline()65        while not data.endswith(self.end_marker):66            self._read_from_sock()67            buf.seek(self.bytes_read)68            data = buf.readline()69        self.bytes_read += len(data)70        if self.bytes_read == self.bytes_written:71            self._purge_buffer()72        return data[:-1*len(self.end_marker)]73    def _read_bulk(self, length):74        length = length +len (self.end_marker)75        if length > self._already_read():76            self._read_from_sock(length - self._already_read())77        self._buffer.seek(self.bytes_read)78        data = self._buffer.read(length)79        self.bytes_read += len(data)80        if self.bytes_read == self.bytes_written:81            self._purge_buffer()82        return data[:-2]83    def _send_request(self, command):84        self.lazy_connect()85        self._sock.sendall(command + self.end_marker)86        return True87    def do_request(self, command, debug=False):88        if debug:89            print('>>>>%s' % command)90        self._send_request(command)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
