def test_get_job_results_dir(self):
    """Check every reference form accepted by ``get_job_results_dir()``.

    A job results directory is created under the configured ``logs_dir``
    and then looked up by: absolute directory path (with and without the
    ``id`` file present), bare directory name (outside and inside
    ``logs_dir``), path to the ``id`` file, full job id, partial job id,
    a non-matching id, the ``latest`` symlink, and finally the full id
    with the base logs directory taken from the patched settings.
    """
    # Imported here, as in the original test, rather than at module level.
    from avocado.core import data_dir
    from avocado.core import job_id

    # First let's mock a jobs results directory
    logs_dir = self.mapping.get('logs_dir')
    self.assertIsNotNone(logs_dir)
    unique_id = job_id.create_unique_job_id()
    # Expected job results dir
    expected_jrd = data_dir.create_job_logs_dir(logs_dir, unique_id)

    # Now let's test some cases
    self.assertIsNone(
        data_dir.get_job_results_dir(expected_jrd, logs_dir),
        "If passing a directory reference, it expects the id file")

    # Create the id file.
    id_file_path = os.path.join(expected_jrd, 'id')
    with open(id_file_path, 'w') as id_file:
        id_file.write("%s\n" % unique_id)
        id_file.flush()
        os.fsync(id_file)

    self.assertEqual(expected_jrd,
                     data_dir.get_job_results_dir(expected_jrd, logs_dir),
                     "It should get from the path to the directory")

    results_dirname = os.path.basename(expected_jrd)
    self.assertIsNone(
        data_dir.get_job_results_dir(results_dirname, logs_dir),
        "It should not get from a valid path to the directory")

    # The bare directory name only resolves when the current working
    # directory is logs_dir.  Restore the previous directory even when
    # the assertion fails so later tests are not affected.
    pwd = os.getcwd()
    os.chdir(logs_dir)
    try:
        self.assertEqual(
            expected_jrd,
            data_dir.get_job_results_dir(results_dirname, logs_dir),
            "It should get from relative path to the directory")
    finally:
        os.chdir(pwd)

    self.assertEqual(expected_jrd,
                     data_dir.get_job_results_dir(id_file_path, logs_dir),
                     "It should get from the path to the id file")

    self.assertEqual(expected_jrd,
                     data_dir.get_job_results_dir(unique_id, logs_dir),
                     "It should get from the id")

    another_id = job_id.create_unique_job_id()
    self.assertNotEqual(unique_id, another_id)
    self.assertIsNone(
        data_dir.get_job_results_dir(another_id, logs_dir),
        "It should not get from unexisting job")

    self.assertEqual(expected_jrd,
                     data_dir.get_job_results_dir(unique_id[:7], logs_dir),
                     "It should get from partial id equals to 7 digits")

    self.assertEqual(expected_jrd,
                     data_dir.get_job_results_dir(unique_id[:4], logs_dir),
                     "It should get from partial id less than 7 digits")

    # Same first 7 characters and same length, but a different id overall.
    almost_id = unique_id[:7] + ('a' * (len(unique_id) - 7))
    self.assertNotEqual(unique_id, almost_id)
    self.assertIsNone(
        data_dir.get_job_results_dir(almost_id, logs_dir),
        "It should not get if the id is equal on only the first 7 "
        "characters")

    os.symlink(expected_jrd, os.path.join(logs_dir, 'latest'))
    self.assertEqual(expected_jrd,
                     data_dir.get_job_results_dir('latest', logs_dir),
                     "It should get from the 'latest' id")

    # Build a fresh settings object from the test's config file and patch
    # it into data_dir, so the lookup below runs without an explicit
    # logs_dir argument.
    stg = settings.Settings()
    with unittest.mock.patch('avocado.core.stgs', stg):
        import avocado.core
        avocado.core.register_core_options()
    stg.process_config_path(self.config_file_path)
    stg.merge_with_configs()
    with unittest.mock.patch('avocado.core.data_dir.settings', stg):
        self.assertEqual(expected_jrd,
                         data_dir.get_job_results_dir(unique_id),
                         "It should use the default base logs directory")

...44 You need to provide in your spawner a `stream_output()` method if this45 one is not suitable for your spawner. i.e: if the spawner is trying to46 access a remote output file.47 """48 results_dir = get_job_results_dir(job_id)49 task_id = string_to_safe_path(task_id)50 data_pointer = '{}/test-results/{}/data'.format(results_dir, task_id)51 src = open(data_pointer, 'r').readline().rstrip()52 try:53 for path in Path(src).expanduser().iterdir():54 if path.is_file() and path.stat().st_size != 0:55 for stream in SpawnerMixin.bytes_from_file(str(path)):56 yield (, stream)57 except FileNotFoundError as e:...

