...40 return json.load(fp)41 def _print_job_details(self, details):42 for key, value in details.items():43"%-12s: %s", key, value)44 def _print_job_tests(self, tests):45 test_matrix = []46 date_fmt = "%Y/%m/%d %H:%M:%S"47 for test in tests:48 status = test.get('status')49 decorator = output.TEST_STATUS_DECORATOR_MAPPING.get(status)50 end = datetime.fromtimestamp(test.get('end'))51 test_matrix.append((test.get('id'),52 end.strftime(date_fmt),53 "%5f" % float(test.get('time')),54 decorator(status, '')))55 header = (output.TERM_SUPPORT.header_str('Test ID'),56 output.TERM_SUPPORT.header_str('End Time'),57 output.TERM_SUPPORT.header_str('Run Time'),58 output.TERM_SUPPORT.header_str('Status'))59 for line in astring.iter_tabular_output(test_matrix,60 header=header,61 strip=True):62 LOG_UI.debug(line)63 def _save_stream_to_file(self, stream, filename):64 """Save stream to a file.65 Directory must exists before calling this function.66 """67 dirname = os.path.dirname(filename)68 os.makedirs(dirname, exist_ok=True)69 with open(filename, 'ab') as output_file:70 output_file.write(stream)71 def configure(self, parser):72 """73 Add the subparser for the assets action.74 :param parser: The Avocado command line application parser75 :type parser: :class:`avocado.core.parser.ArgumentParser`76 """77 parser = super(Jobs, self).configure(parser)78 subcommands = parser.add_subparsers(dest='jobs_subcommand',79 metavar='sub-command')80 subcommands.required = True81 help_msg = 'List all known jobs by Avocado'82 subcommands.add_parser('list', help=help_msg)83 help_msg = ('Show details about a specific job. When passing a Job '84 'ID, you can use any Job Reference (job_id, "latest", '85 'or job results path).')86 show_parser = subcommands.add_parser('show', help=help_msg)87 settings.register_option(section='',88 key='job_id',89 help_msg='JOB id',90 metavar='JOBID',91 default='latest',92 nargs='?',93 positional_arg=True,94 parser=show_parser)95 help_msg = ('Download output files generated by tests on '96 'AVOCADO_TEST_OUTPUT_DIR')97 output_files_parser = subcommands.add_parser('get-output-files',98 help=help_msg)99 settings.register_option(section='jobs.get.output_files',100 key='job_id',101 help_msg='JOB id',102 metavar='JOBID',103 default=None,104 positional_arg=True,105 parser=output_files_parser)106 settings.register_option(section='jobs.get.output_files',107 key='destination',108 help_msg='Destination path',109 metavar='DESTINATION',110 default=None,111 positional_arg=True,112 parser=output_files_parser)113 def handle_list_command(self, jobs_results):114 """Called when 'avocado jobs list' command is executed."""115 for filename in jobs_results.values():116 with open(filename, 'r') as fp:117 job = json.load(fp)118 try:119 started_ts = job['tests'][0]['start']120 started = datetime.fromtimestamp(started_ts)121 except IndexError:122 continue123"%-40s %-26s %3s (%s/%s/%s/%s)",124 job['job_id'],125 str(started),126 job['total'],127 job['pass'],128 job['skip'],129 job['errors'],130 job['failures'])131 return exit_codes.AVOCADO_ALL_OK132 def _download_tests(self, tests, destination, job_id, spawner):133 for test in tests:134 test_id = test.get('id')135"Downloading files for test %s", test_id)136 try:137 files_buffers = spawner().stream_output(job_id, test_id)138 for filename, stream in files_buffers:139 dest = os.path.join(destination,140 test_id.replace('/', '_'),141 filename)142 self._save_stream_to_file(stream, dest)143 except SpawnerException as ex:144 LOG_UI.error("Error: Failed to download: %s. Exiting...", ex)145 return exit_codes.AVOCADO_GENERIC_CRASH146 return exit_codes.AVOCADO_ALL_OK147 def handle_output_files_command(self, config):148 """Called when 'avocado jobs get-output-files' command is executed."""149 job_id = config.get('jobs.get.output_files.job_id')150 destination = config.get('jobs.get.output_files.destination')151 results_dir = get_job_results_dir(job_id)152 results_file = os.path.join(results_dir, 'results.json')153 config_file = os.path.join(results_dir, 'jobdata/args.json')154 try:155 config_data = self._get_data_from_file(config_file)156 results_data = self._get_data_from_file(results_file)157 except FileNotFoundError as ex:158 LOG_UI.error("Could not get job information: %s", ex)159 return exit_codes.AVOCADO_GENERIC_CRASH160 spawners = {'process': ProcessSpawner,161 'podman': PodmanSpawner}162 spawner_name = config_data.get('nrunner.spawner')163 spawner = spawners.get(spawner_name)164 if spawner is None:165 msg = ("Could not find the spawner for job %s. This command is "166 "experimental and only supported when job executed with "167 "the Spawner architecture.")168 LOG_UI.error(msg, job_id)169 return exit_codes.AVOCADO_GENERIC_CRASH170 return self._download_tests(results_data.get('tests'),171 destination,172 job_id,173 spawner)174 def handle_show_command(self, config):175 """Called when 'avocado jobs show' command is executed."""176 job_id = config.get('')177 results_dir = get_job_results_dir(job_id)178 if results_dir is None:179 LOG_UI.error("Error: Job %s not found", job_id)180 return exit_codes.AVOCADO_GENERIC_CRASH181 results_file = os.path.join(results_dir, 'results.json')182 config_file = os.path.join(results_dir, 'jobdata/args.json')183 try:184 results_data = self._get_data_from_file(results_file)185 except FileNotFoundError as ex:186 # Results data are important and should exit if not found187 LOG_UI.error(ex)188 return exit_codes.AVOCADO_GENERIC_CRASH189 try:190 config_data = self._get_data_from_file(config_file)191 except FileNotFoundError:192 pass193 data = {'JOB ID': job_id,194 'JOB LOG': results_data.get('debuglog'),195 'SPAWNER': config_data.get('nrunner.spawner', 'unknown')}196 # We could improve this soon with more data and colors197 self._print_job_details(data)198"")199 self._print_job_tests(results_data.get('tests'))200 results = ('PASS %d | ERROR %d | FAIL %d | SKIP %d |'201 'WARN %d | INTERRUPT %s | CANCEL %s')202 results %= (results_data.get('pass', 0),203 results_data.get('error', 0),204 results_data.get('failures', 0),205 results_data.get('skip', 0),206 results_data.get('warn', 0),207 results_data.get('interrupt', 0),208 results_data.get('cancel', 0))209 self._print_job_details({'RESULTS': results})210 return exit_codes.AVOCADO_ALL_OK211 def run(self, config):212 results = {}213 jobs_dir = get_logs_dir()...

