# NOTE(review): these are methods from the interior of a spawner class whose
# ``class`` header is outside this excerpt (the excerpt starts mid-class).
# ``Asset``, ``VERSION``, ``LOG``, ``Podman``, ``PodmanException``, ``os`` and
# ``json`` are referenced here but defined/imported outside the visible text.


def _fetch_asset(self, url):
    """Fetch ``url`` through ``Asset`` and return the result of ``fetch()``.

    The cache directories handed to ``Asset`` are read from the
    ``datadir.paths.cache_dirs`` configuration key.
    """
    cachedirs = self.config.get("datadir.paths.cache_dirs")
    return Asset(url, cache_dirs=cachedirs).fetch()


def get_eggs_paths(self, py_major, py_minor):
    """Return the basic eggs needed to bootstrap Avocado.

    :param py_major: major Python version interpolated into the egg names.
    :param py_minor: minor Python version interpolated into the egg names.
    :returns: a list of ``(path, to)`` tuples, where ``path`` is whatever
              ``self._fetch_asset()`` returned for the egg and ``to`` is
              the deployment location, ``/tmp/<basename of path>``.
    """
    # Setuptools
    # For now let's pin to setuptools 59.2.
    # TODO: Automatically get latest setuptools version.
    # NOTE(review): both f-strings below are handed to _fetch_asset() as
    # URLs but carry no scheme/host; the prefix was presumably lost when this
    # excerpt was extracted -- confirm against the upstream file.
    eggs = [f"{py_major}.{py_minor}.egg"]

    # A locally configured Avocado egg takes precedence over the released one.
    local_egg = self.config.get("spawner.podman.avocado_spawner_egg")
    if local_egg:
        eggs.append(local_egg)
    else:
        eggs.append(
            f"{VERSION}/avocado_framework-{VERSION}-py{py_major}.{py_minor}.egg"
        )

    result = []
    for url in eggs:
        path = self._fetch_asset(url)
        result.append((path, os.path.join("/tmp/", os.path.basename(path))))
    return result


@property
async def python_version(self):
    """Awaitable property with the Python version info of the configured image.

    The value is whatever ``self.podman.get_python_version(image)`` returns
    for the ``spawner.podman.image`` configuration value; callers in this
    excerpt unpack it into three items.  Results are kept per image in
    ``self._PYTHON_VERSIONS_CACHE``.

    :returns: the cached/queried value, or ``(None, None, None)`` when the
              image is not cached and ``self.podman`` is not set.
    """
    image = self.config.get("spawner.podman.image")
    if image not in self._PYTHON_VERSIONS_CACHE:
        if not self.podman:
            msg = "Cannot get Python version: self.podman not defined."
            LOG.debug(msg)
            return None, None, None
        self._PYTHON_VERSIONS_CACHE[image] = await self.podman.get_python_version(
            image
        )
    return self._PYTHON_VERSIONS_CACHE[image]


async def deploy_artifacts(self):
    """Do nothing; this spawner has no artifact deployment step here."""


async def deploy_avocado(self, where):
    """Copy every bootstrap egg into the container ``where``, under /tmp/.

    :param where: container reference passed to
                  ``self.podman.copy_to_container()``.
    """
    major, minor, _ = await self.python_version
    for egg, to in self.get_eggs_paths(major, minor):
        await self.podman.copy_to_container(where, egg, to)


async def _create_container_for_task(self, runtime_task, env_args, test_output=None):
    """Run ``podman create`` for ``runtime_task`` and return its output.

    Side effects on ``runtime_task``: when the status server URI contains
    no ``":"`` its URI is rewritten to the in-container socket path; when
    the task is a test whose path exists locally, the runnable URI is
    re-rooted under ``/tmp``; on failure ``runtime_task.status`` is set.

    :param runtime_task: the task the container is created for.
    :param env_args: mapping turned into ``-e=KEY=VALUE`` options.
    :param test_output: optional host directory mounted on the runnable's
                        ``output_dir`` inside the container.
    :returns: the decoded, stripped stdout of the ``create`` command (used
              by ``spawn_task`` as the container id), or ``False`` when a
              ``PodmanException`` is raised.
    """
    mounted_status_server_socket = "/tmp/.status_server.sock"
    status_server_uri = runtime_task.task.status_services[0].uri
    # No ":" in the URI means a unix domain socket is being used.
    mount_status_server_socket = ":" not in status_server_uri
    if mount_status_server_socket:
        # Must happen before get_command_args() below is called on the task.
        runtime_task.task.status_services[0].uri = mounted_status_server_socket

    _, _, python_binary = await self.python_version
    entry_point_args = [python_binary, "-m", "avocado.core.nrunner", "task-run"]

    test_opts = ()
    if runtime_task.task.category == "test":
        runnable_uri = runtime_task.task.runnable.uri
        # Everything before the first ":" (or the whole URI) is the file path.
        test_path = runnable_uri.partition(":")[0]
        if os.path.exists(test_path):
            to = os.path.join("/tmp", test_path)
            runtime_task.task.runnable.uri = os.path.join("/tmp", runnable_uri)
            # Mount the test file read-only inside the container.
            test_opts = ("-v", f"{os.path.abspath(test_path)}:{to}:ro")

    entry_point_args.extend(runtime_task.task.get_command_args())
    entry_point_arg = "--entrypoint=" + json.dumps(entry_point_args)

    if mount_status_server_socket:
        status_server_opts = (
            "--privileged",
            "-v",
            f"{status_server_uri}:{mounted_status_server_socket}",
        )
    else:
        status_server_opts = ("--net=host",)

    output_opts = ()
    if test_output:
        podman_output = runtime_task.task.runnable.output_dir
        output_opts = (
            "-v",
            f"{test_output}:{os.path.expanduser(podman_output)}",
        )

    image, _ = self._get_image_from_cache(runtime_task)
    if not image:
        image = self.config.get("spawner.podman.image")

    envs = [f"-e={k}={v}" for k, v in env_args.items()]
    try:
        # pylint: disable=W0201
        _, stdout, _ = await self.podman.execute(
            "create",
            *status_server_opts,
            *output_opts,
            *test_opts,
            entry_point_arg,
            *envs,
            image,
        )
    except PodmanException as ex:
        runtime_task.status = f"Could not create podman container: {ex}"
        return False
    return stdout.decode().strip()


async def spawn_task(self, runtime_task):
    """Create a container for ``runtime_task``, deploy the eggs and start it.

    Only the beginning of this method is visible in the excerpt; the
    visible part returns ``False`` when ``Podman`` cannot be instantiated.
    """
    self.create_task_output_dir(runtime_task)
    podman_bin = self.config.get("spawner.podman.bin")
    try:
        # pylint: disable=W0201
        self.podman = Podman(podman_bin)
    except PodmanException as ex:
        runtime_task.status = str(ex)
        return False

    major, minor, _ = await self.python_version
    # Only the "to" (in-container) locations go into PYTHONPATH.
    eggs = self.get_eggs_paths(major, minor)
    destination_eggs = ":".join(str(to) for _, to in eggs)
    env_args = {"PYTHONPATH": destination_eggs}
    output_dir_path = self.task_output_dir(runtime_task)
    container_id = await self._create_container_for_task(
        runtime_task, env_args, output_dir_path
    )
    # NOTE(review): _create_container_for_task() returns False on failure,
    # and that value is used below unchecked -- confirm whether an early
    # ``return False`` is wanted here.
    runtime_task.spawner_handle = container_id

    await self.deploy_avocado(container_id)

    try:
        # pylint: disable=W0201
        returncode, _, _ = await self.podman.start(container_id)
    except PodmanException as ex:
        msg = f"Could not start container: {ex}"
        runtime_task.status = msg
        # (the excerpt is truncated here; the rest of this handler and of the
        # method is not visible)

Full Screen

import os  # needed for os.path below; the excerpt's first import line is missing
import socket
import subprocess

from vimpdb import config
from vimpdb import errors


def get_eggs_paths():
    """Return the parent directories of the vimpdb and vim_bridge packages.

    Each entry is ``os.path.dirname`` of the path that
    ``config.get_package_path`` reports for an object of that package.
    """
    import vim_bridge
    vimpdb_path = config.get_package_path(errors.ReturnCodeError())
    vim_bridge_path = config.get_package_path(vim_bridge.bridged)
    return (
        os.path.dirname(vimpdb_path),
        os.path.dirname(vim_bridge_path),
    )


class Communicator(object):
    """Run the configured Vim script with ``--servername`` remote commands."""

    def __init__(self, script, server_name):
        # script: command line, split on whitespace before being executed.
        self.script = script
        # server_name: value passed after '--servername'.
        self.server_name = server_name

    def prepare_subprocess(self, *args):
        """Return the argv list: the split script followed by ``args``."""
        return self.script.split() + list(args)

    def _remote_expr(self, expr):
        """Evaluate ``expr`` with ``--remote-expr``; return stripped stdout.

        Raises ``errors.RemoteUnavailable`` when the subprocess exits with a
        non-zero return code.
        """
        parts = self.prepare_subprocess(
            '--servername', self.server_name, "--remote-expr", expr)
        p = subprocess.Popen(parts, stdout=subprocess.PIPE)
        # communicate() reads stdout while waiting, so a child that writes
        # more than the pipe buffer cannot deadlock against wait(), and the
        # pipe is closed afterwards.
        output, _ = p.communicate()
        if p.returncode:
            raise errors.RemoteUnavailable()
        return output.strip()

    def _send(self, command):
        """Send ``command`` as keys with ``--remote-send``.

        Raises ``errors.RemoteUnavailable`` when the subprocess exits with a
        non-zero return code.
        """
        # add ':<BS>' to hide last keys sent in VIM command-line
        command = ''.join((command, ':<BS>'))
        parts = self.prepare_subprocess(
            '--servername', self.server_name, "--remote-send", command)
        # NOTE(review): the call on this line was missing from the excerpt;
        # subprocess.call is the reconstruction -- confirm against upstream.
        return_code = subprocess.call(parts)
        if return_code:
            raise errors.RemoteUnavailable()


class ProxyToVim(object):
    """
    use subprocess to launch Vim instance that use clientserver mode
    to communicate with Vim instance used for debugging.
    """

    def __init__(self, communicator):
        self.communicator = communicator

    def _send(self, command):
        """Forward ``command`` to the communicator and log it."""
        self.communicator._send(command)
        config.logger.debug("sent: %s" % command)

    def _remote_expr(self, expr):
        """Forward ``expr`` to the communicator and return its result."""
        return self.communicator._remote_expr(expr)

    def setupRemote(self):
        """Source vimpdb.vim and register the egg paths, unless already done."""
        if not self.isRemoteSetup():
            # source vimpdb.vim
            proxy_package_path = config.get_package_path(self)
            filename = os.path.join(proxy_package_path, "vimpdb.vim")
            # Raw string: "\>" is not a valid escape sequence; the runtime
            # value is unchanged.
            command = r"<C-\><C-N>:source %s<CR>" % filename
            self._send(command)
            for egg_path in get_eggs_paths():
                self._send(':call PDB_setup_egg(%s)<CR>' % repr(egg_path))
            # NOTE(review): unlike the commands above this one has no
            # trailing <CR> -- confirm that is intentional.
            self._send(':call PDB_init_controller()')

    def isRemoteSetup(self):
        """Return True when the remote reports PDB_setup_egg as defined."""
        # NOTE(review): self._expr is not defined in the visible part of the
        # class; presumably it is defined further down -- verify.
        status = self._expr("exists('*PDB_setup_egg')")
        return status == '1'

    def showFeedback(self, feedback):
        """Show ``feedback`` (split into lines) in the remote; no-op if empty."""
        if not feedback:
            return
        feedback_list = feedback.splitlines()
        self.setupRemote()
        self._send(':call PDB_show_feedback(%s)<CR>' % repr(feedback_list))

    def displayLocals(self, feedback):
        if not feedback:
            return
        # (the excerpt is truncated here; the rest of this method and of the
        # class is not visible)

