How to use _read_from_sock method in localstack

Best Python code snippet using localstack_python

docker_sdk_client.py

Source:docker_sdk_client.py Github

copy

Full Screen

...39 if self.docker_client:40 return self.docker_client41 else:42 raise ContainerException("Docker not available")43 def _read_from_sock(self, sock: socket, tty: bool):44 """Reads multiplexed messages from a socket returned by attach_socket.45 Uses the protocol specified here: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach46 """47 stdout = b""48 stderr = b""49 for frame_type, frame_data in frames_iter(sock, tty):50 if frame_type == STDOUT:51 stdout += frame_data52 elif frame_type == STDERR:53 stderr += frame_data54 else:55 raise ContainerException("Invalid frame type when reading from socket")56 return stdout, stderr57 def _container_path_info(self, container: Container, container_path: str):58 """59 Get information about a path in the given container60 :param container: Container to be inspected61 :param container_path: Path in container62 :return: Tuple (path_exists, path_is_directory)63 """64 # Docker CLI copy uses go FileMode to determine if target is a dict or not65 # https://github.com/docker/cli/blob/e3dfc2426e51776a3263cab67fbba753dd3adaa9/cli/command/container/cp.go#L26066 # The isDir Bit is the most significant bit in the 32bit struct:67 # https://golang.org/src/os/types.go?s=2650:268368 stats = {}69 try:70 _, stats = container.get_archive(container_path)71 target_exists = True72 except APIError:73 target_exists = False74 target_is_dir = target_exists and bool(stats["mode"] & SDK_ISDIR)75 return target_exists, target_is_dir76 def get_container_status(self, container_name: str) -> DockerContainerStatus:77 # LOG.debug("Getting container status for container: %s", container_name) # too verbose78 try:79 container = self.client().containers.get(container_name)80 if container.status == "running":81 return DockerContainerStatus.UP82 elif container.status == "paused":83 return DockerContainerStatus.PAUSED84 else:85 return DockerContainerStatus.DOWN86 except NotFound:87 return DockerContainerStatus.NON_EXISTENT88 except APIError:89 raise ContainerException()90 def stop_container(self, container_name: str, timeout: int = None) -> None:91 if timeout is None:92 timeout = self.STOP_TIMEOUT93 LOG.debug("Stopping container: %s", container_name)94 try:95 container = self.client().containers.get(container_name)96 container.stop(timeout=timeout)97 except NotFound:98 raise NoSuchContainer(container_name)99 except APIError:100 raise ContainerException()101 def pause_container(self, container_name: str) -> None:102 LOG.debug("Pausing container: %s", container_name)103 try:104 container = self.client().containers.get(container_name)105 container.pause()106 except NotFound:107 raise NoSuchContainer(container_name)108 except APIError:109 raise ContainerException()110 def unpause_container(self, container_name: str) -> None:111 LOG.debug("Unpausing container: %s", container_name)112 try:113 container = self.client().containers.get(container_name)114 container.unpause()115 except NotFound:116 raise NoSuchContainer(container_name)117 except APIError:118 raise ContainerException()119 def remove_container(self, container_name: str, force=True, check_existence=False) -> None:120 LOG.debug("Removing container: %s", container_name)121 if check_existence and container_name not in self.get_running_container_names():122 LOG.debug("Aborting removing due to check_existence check")123 return124 try:125 container = self.client().containers.get(container_name)126 container.remove(force=force)127 except NotFound:128 if not force:129 raise NoSuchContainer(container_name)130 except APIError:131 raise ContainerException()132 def list_containers(self, filter: Union[List[str], str, None] = None, all=True) -> List[dict]:133 if filter:134 filter = [filter] if isinstance(filter, str) else filter135 filter = dict([f.split("=", 1) for f in filter])136 LOG.debug("Listing containers with filters: %s", filter)137 try:138 container_list = self.client().containers.list(filters=filter, all=all)139 result = []140 for container in container_list:141 try:142 result.append(143 {144 "id": container.id,145 "image": container.image,146 "name": container.name,147 "status": container.status,148 "labels": container.labels,149 }150 )151 except Exception as e:152 LOG.error(f"Error checking container {container}: {e}")153 return result154 except APIError:155 raise ContainerException()156 def copy_into_container(157 self, container_name: str, local_path: str, container_path: str158 ) -> None: # TODO behave like https://docs.docker.com/engine/reference/commandline/cp/159 LOG.debug("Copying file %s into %s:%s", local_path, container_name, container_path)160 try:161 container = self.client().containers.get(container_name)162 target_exists, target_isdir = self._container_path_info(container, container_path)163 target_path = container_path if target_isdir else os.path.dirname(container_path)164 with Util.tar_path(local_path, container_path, is_dir=target_isdir) as tar:165 container.put_archive(target_path, tar)166 except NotFound:167 raise NoSuchContainer(container_name)168 except APIError:169 raise ContainerException()170 def copy_from_container(171 self,172 container_name: str,173 local_path: str,174 container_path: str,175 ) -> None:176 LOG.debug("Copying file from %s:%s to %s", container_name, container_path, local_path)177 try:178 container = self.client().containers.get(container_name)179 bits, _ = container.get_archive(container_path)180 Util.untar_to_path(bits, local_path)181 except NotFound:182 raise NoSuchContainer(container_name)183 except APIError:184 raise ContainerException()185 def pull_image(self, docker_image: str) -> None:186 LOG.debug("Pulling Docker image: %s", docker_image)187 # some path in the docker image string indicates a custom repository188 try:189 self.client().images.pull(docker_image)190 except ImageNotFound:191 raise NoSuchImage(docker_image)192 except APIError:193 raise ContainerException()194 def push_image(self, docker_image: str) -> None:195 LOG.debug("Pushing Docker image: %s", docker_image)196 try:197 result = self.client().images.push(docker_image)198 # some SDK clients (e.g., 5.0.0) seem to return an error string, instead of raising199 if isinstance(result, (str, bytes)) and '"errorDetail"' in to_str(result):200 if "image does not exist locally" in to_str(result):201 raise NoSuchImage(docker_image)202 if "is denied" in to_str(result):203 raise AccessDenied(docker_image)204 if "connection refused" in to_str(result):205 raise RegistryConnectionError(result)206 raise ContainerException(result)207 except ImageNotFound:208 raise NoSuchImage(docker_image)209 except APIError as e:210 raise ContainerException() from e211 def build_image(self, dockerfile_path: str, image_name: str, context_path: str = None):212 try:213 dockerfile_path = Util.resolve_dockerfile_path(dockerfile_path)214 context_path = context_path or os.path.dirname(dockerfile_path)215 LOG.debug("Building Docker image %s from %s", image_name, dockerfile_path)216 self.client().images.build(217 path=context_path,218 dockerfile=dockerfile_path,219 tag=image_name,220 rm=True,221 )222 except APIError as e:223 raise ContainerException("Unable to build Docker image") from e224 def tag_image(self, source_ref: str, target_name: str) -> None:225 try:226 LOG.debug("Tagging Docker image '%s' as '%s'", source_ref, target_name)227 image = self.client().images.get(source_ref)228 image.tag(target_name)229 except APIError as e:230 if e.status_code == 404:231 raise NoSuchImage(source_ref)232 raise ContainerException("Unable to tag Docker image") from e233 def get_docker_image_names(self, strip_latest=True, include_tags=True):234 try:235 images = self.client().images.list()236 image_names = [tag for image in images for tag in image.tags if image.tags]237 if not include_tags:238 image_names = list(map(lambda image_name: image_name.split(":")[0], image_names))239 if strip_latest:240 Util.append_without_latest(image_names)241 return image_names242 except APIError:243 raise ContainerException()244 def get_container_logs(self, container_name_or_id: str, safe=False) -> str:245 try:246 container = self.client().containers.get(container_name_or_id)247 return to_str(container.logs())248 except NotFound:249 if safe:250 return ""251 raise NoSuchContainer(container_name_or_id)252 except APIError:253 if safe:254 return ""255 raise ContainerException()256 def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:257 try:258 return self.client().containers.get(container_name_or_id).attrs259 except NotFound:260 raise NoSuchContainer(container_name_or_id)261 except APIError:262 raise ContainerException()263 def inspect_image(self, image_name: str, pull: bool = True) -> Dict[str, Union[Dict, str]]:264 try:265 return self.client().images.get(image_name).attrs266 except NotFound:267 if pull:268 self.pull_image(image_name)269 return self.inspect_image(image_name, pull=False)270 raise NoSuchImage(image_name)271 except APIError:272 raise ContainerException()273 def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:274 try:275 return self.client().networks.get(network_name).attrs276 except NotFound:277 raise NoSuchNetwork(network_name)278 except APIError:279 raise ContainerException()280 def connect_container_to_network(281 self, network_name: str, container_name_or_id: str, aliases: Optional[List] = None282 ) -> None:283 LOG.debug(284 "Connecting container '%s' to network '%s' with aliases '%s'",285 container_name_or_id,286 network_name,287 aliases,288 )289 try:290 network = self.client().networks.get(network_name)291 except NotFound:292 raise NoSuchNetwork(network_name)293 try:294 network.connect(container=container_name_or_id, aliases=aliases)295 except NotFound:296 raise NoSuchContainer(container_name_or_id)297 except APIError:298 raise ContainerException()299 def disconnect_container_from_network(300 self, network_name: str, container_name_or_id: str301 ) -> None:302 LOG.debug(303 "Disconnecting container '%s' from network '%s'", container_name_or_id, network_name304 )305 try:306 try:307 network = self.client().networks.get(network_name)308 except NotFound:309 raise NoSuchNetwork(network_name)310 try:311 network.disconnect(container_name_or_id)312 except NotFound:313 raise NoSuchContainer(container_name_or_id)314 except APIError:315 raise ContainerException()316 def get_container_ip(self, container_name_or_id: str) -> str:317 networks = self.inspect_container(container_name_or_id)["NetworkSettings"]["Networks"]318 network_names = list(networks)319 if len(network_names) > 1:320 LOG.info("Container has more than one assigned network. Picking the first one...")321 return networks[network_names[0]]["IPAddress"]322 def has_docker(self) -> bool:323 try:324 if not self.docker_client:325 return False326 self.client().ping()327 return True328 except APIError:329 return False330 def remove_image(self, image: str, force: bool = True):331 LOG.debug("Removing image %s %s", image, "(forced)" if force else "")332 try:333 self.client().images.remove(image=image, force=force)334 except ImageNotFound:335 if not force:336 raise NoSuchImage(image)337 def commit(338 self,339 container_name_or_id: str,340 image_name: str,341 image_tag: str,342 ):343 LOG.debug(344 "Creating image from container %s as %s:%s", container_name_or_id, image_name, image_tag345 )346 try:347 container = self.client().containers.get(container_name_or_id)348 container.commit(repository=image_name, tag=image_tag)349 except NotFound:350 raise NoSuchContainer(container_name_or_id)351 except APIError:352 raise ContainerException()353 def start_container(354 self,355 container_name_or_id: str,356 stdin=None,357 interactive: bool = False,358 attach: bool = False,359 flags: Optional[str] = None,360 ) -> Tuple[bytes, bytes]:361 LOG.debug("Starting container %s", container_name_or_id)362 try:363 container = self.client().containers.get(container_name_or_id)364 stdout = to_bytes(container_name_or_id)365 stderr = b""366 if interactive or attach:367 params = {"stdout": 1, "stderr": 1, "stream": 1}368 if interactive:369 params["stdin"] = 1370 sock = container.attach_socket(params=params)371 sock = sock._sock if hasattr(sock, "_sock") else sock372 result_queue = queue.Queue()373 thread_started = threading.Event()374 start_waiting = threading.Event()375 # Note: We need to be careful about potential race conditions here - .wait() should happen right376 # after .start(). Hence starting a thread and asynchronously waiting for the container exit code377 def wait_for_result(*_):378 _exit_code = -1379 try:380 thread_started.set()381 start_waiting.wait()382 _exit_code = container.wait()["StatusCode"]383 except APIError as e:384 _exit_code = 1385 raise ContainerException(str(e))386 finally:387 result_queue.put(_exit_code)388 # start listener thread389 start_worker_thread(wait_for_result)390 thread_started.wait()391 # start container392 container.start()393 # start awaiting container result394 start_waiting.set()395 # handle container input/output396 # under windows, the socket has no __enter__ / cannot be used as context manager397 # therefore try/finally instead of with here398 try:399 if stdin:400 sock.sendall(to_bytes(stdin))401 sock.shutdown(socket.SHUT_WR)402 stdout, stderr = self._read_from_sock(sock, False)403 except socket.timeout:404 LOG.debug(405 f"Socket timeout when talking to the I/O streams of Docker container '{container_name_or_id}'"406 )407 finally:408 sock.close()409 # get container exit code410 exit_code = result_queue.get()411 if exit_code:412 raise ContainerException(413 "Docker container returned with exit code %s" % exit_code,414 stdout=stdout,415 stderr=stderr,416 )417 else:418 container.start()419 return stdout, stderr420 except NotFound:421 raise NoSuchContainer(container_name_or_id)422 except APIError:423 raise ContainerException()424 def create_container(425 self,426 image_name: str,427 *,428 name: Optional[str] = None,429 entrypoint: Optional[str] = None,430 remove: bool = False,431 interactive: bool = False,432 tty: bool = False,433 detach: bool = False,434 command: Optional[Union[List[str], str]] = None,435 mount_volumes: Optional[List[SimpleVolumeBind]] = None,436 ports: Optional[PortMappings] = None,437 env_vars: Optional[Dict[str, str]] = None,438 user: Optional[str] = None,439 cap_add: Optional[List[str]] = None,440 cap_drop: Optional[List[str]] = None,441 network: Optional[str] = None,442 dns: Optional[str] = None,443 additional_flags: Optional[str] = None,444 workdir: Optional[str] = None,445 ) -> str:446 LOG.debug("Creating container with attributes: %s", locals())447 extra_hosts = None448 if additional_flags:449 env_vars, ports, mount_volumes, extra_hosts, network = Util.parse_additional_flags(450 additional_flags, env_vars, ports, mount_volumes, network451 )452 try:453 kwargs = {}454 if cap_add:455 kwargs["cap_add"] = cap_add456 if cap_drop:457 kwargs["cap_drop"] = cap_drop458 if dns:459 kwargs["dns"] = [dns]460 if ports:461 kwargs["ports"] = ports.to_dict()462 if workdir:463 kwargs["working_dir"] = workdir464 mounts = None465 if mount_volumes:466 mounts = Util.convert_mount_list_to_dict(mount_volumes)467 def create_container():468 return self.client().containers.create(469 image=image_name,470 command=command,471 auto_remove=remove,472 name=name,473 stdin_open=interactive,474 tty=tty,475 entrypoint=entrypoint,476 environment=env_vars,477 detach=detach,478 user=user,479 network=network,480 volumes=mounts,481 extra_hosts=extra_hosts,482 **kwargs,483 )484 try:485 container = create_container()486 except ImageNotFound:487 self.pull_image(image_name)488 container = create_container()489 return container.id490 except ImageNotFound:491 raise NoSuchImage(image_name)492 except APIError:493 raise ContainerException()494 def run_container(495 self,496 image_name: str,497 stdin=None,498 *,499 name: Optional[str] = None,500 entrypoint: Optional[str] = None,501 remove: bool = False,502 interactive: bool = False,503 tty: bool = False,504 detach: bool = False,505 command: Optional[Union[List[str], str]] = None,506 mount_volumes: Optional[List[SimpleVolumeBind]] = None,507 ports: Optional[PortMappings] = None,508 env_vars: Optional[Dict[str, str]] = None,509 user: Optional[str] = None,510 cap_add: Optional[List[str]] = None,511 cap_drop: Optional[List[str]] = None,512 network: Optional[str] = None,513 dns: Optional[str] = None,514 additional_flags: Optional[str] = None,515 workdir: Optional[str] = None,516 ) -> Tuple[bytes, bytes]:517 LOG.debug("Running container with image: %s", image_name)518 container = None519 try:520 container = self.create_container(521 image_name,522 name=name,523 entrypoint=entrypoint,524 interactive=interactive,525 tty=tty,526 detach=detach,527 remove=remove and detach,528 command=command,529 mount_volumes=mount_volumes,530 ports=ports,531 env_vars=env_vars,532 user=user,533 cap_add=cap_add,534 cap_drop=cap_drop,535 network=network,536 dns=dns,537 additional_flags=additional_flags,538 workdir=workdir,539 )540 result = self.start_container(541 container_name_or_id=container,542 stdin=stdin,543 interactive=interactive,544 attach=not detach,545 )546 finally:547 if remove and container and not detach:548 self.remove_container(container)549 return result550 def exec_in_container(551 self,552 container_name_or_id: str,553 command: Union[List[str], str],554 interactive=False,555 detach=False,556 env_vars: Optional[Dict[str, Optional[str]]] = None,557 stdin: Optional[bytes] = None,558 user: Optional[str] = None,559 workdir: Optional[str] = None,560 ) -> Tuple[bytes, bytes]:561 LOG.debug("Executing command in container %s: %s", container_name_or_id, command)562 try:563 container: Container = self.client().containers.get(container_name_or_id)564 result = container.exec_run(565 cmd=command,566 environment=env_vars,567 user=user,568 detach=detach,569 stdin=interactive and bool(stdin),570 socket=interactive and bool(stdin),571 stdout=True,572 stderr=True,573 demux=True,574 workdir=workdir,575 )576 tty = False577 if interactive and stdin: # result is a socket578 sock = result[1]579 sock = sock._sock if hasattr(sock, "_sock") else sock580 with sock:581 try:582 sock.sendall(stdin)583 sock.shutdown(socket.SHUT_WR)584 stdout, stderr = self._read_from_sock(sock, tty)585 return stdout, stderr586 except socket.timeout:587 pass588 else:589 if detach:590 return b"", b""591 return_code = result[0]592 if isinstance(result[1], bytes):593 stdout = result[1]594 stderr = b""595 else:596 stdout, stderr = result[1]597 if return_code != 0:598 raise ContainerException(...

Full Screen

Full Screen

Conn.py

Source:Conn.py Github

copy

Full Screen

...41 self._sock = sock42 self._buffer = io.BytesIO()43 self.bytes_read = 044 self.bytes_written = 045 def _read_from_sock(self, length=None):46 buf = self._buffer47 buf.seek(self.bytes_written)48 tr = 049 while True:50 data = self._sock.recv(1024)51 if isinstance(data, bytes) and len(data) == 0:52 raise Exception('Connection closed')53 buf.write(data)54 self.bytes_written += len(data)55 tr += len(data)56 if length is not None and length > tr:57 continue58 break59 def _already_read(self):60 return self.bytes_written - self.bytes_read61 def _read_line(self):62 buf = self._buffer63 buf.seek(self.bytes_read)64 data = buf.readline()65 while not data.endswith(self.end_marker):66 self._read_from_sock()67 buf.seek(self.bytes_read)68 data = buf.readline()69 self.bytes_read += len(data)70 if self.bytes_read == self.bytes_written:71 self._purge_buffer()72 return data[:-1*len(self.end_marker)]73 def _read_bulk(self, length):74 length = length +len (self.end_marker)75 if length > self._already_read():76 self._read_from_sock(length - self._already_read())77 self._buffer.seek(self.bytes_read)78 data = self._buffer.read(length)79 self.bytes_read += len(data)80 if self.bytes_read == self.bytes_written:81 self._purge_buffer()82 return data[:-2]83 def _send_request(self, command):84 self.lazy_connect()85 self._sock.sendall(command + self.end_marker)86 return True87 def do_request(self, command, debug=False):88 if debug:89 print('>>>>%s' % command)90 self._send_request(command)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful