How to use the _get_run_details method in stestr

Best Python code snippets using stestr_python. The examples below show how _get_run_details helpers are defined and used in several open-source projects.

kubeflow_handler.py

Source: kubeflow_handler.py (GitHub)

...
    return experiment.id

  def _print_runs(self, pipeline_name, runs):
    """Prints runs in a tabular format with headers mentioned below."""
    headers = ['pipeline_name', 'run_id', 'status', 'created_at', 'link']

    def _get_run_details(run_id):
      """Return the link to the run detail page."""
      return '{prefix}/#/runs/details/{run_id}'.format(
          prefix=self._client._get_url_prefix(), run_id=run_id)  # pylint: disable=protected-access

    data = [
        [  # pylint: disable=g-complex-comprehension
            pipeline_name, run.id, run.status,
            run.created_at.isoformat(),
            _get_run_details(run.id)
        ] for run in runs
    ]
...
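In the snippet above, _get_run_details is a nested helper that turns a Kubeflow run ID into a link to the run's detail page by prepending the client's URL prefix. Below is a minimal, standalone sketch of the same idea; the url_prefix value and the sample run IDs are placeholders for illustration, not values taken from the original handler.

# Minimal sketch of the URL-building helper shown in kubeflow_handler.py.
# The prefix and run IDs below are illustrative placeholders.

def get_run_details_link(url_prefix, run_id):
    """Return the link to the run detail page for run_id."""
    return '{prefix}/#/runs/details/{run_id}'.format(prefix=url_prefix, run_id=run_id)


if __name__ == '__main__':
    url_prefix = 'https://example-kubeflow-host/pipeline'  # assumed host, for illustration only
    for run_id in ['run-123', 'run-456']:
        print(get_run_details_link(url_prefix, run_id))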

polaris.py

Source: polaris.py (GitHub)

...
        self._switch_mode_specific_inst_settings(kwargs.get("mode"))
        self._inst_settings.update_attributes(kwargs=kwargs)
        vanadium_d = self._create_vanadium(run_number_string=self._inst_settings.run_in_range,
                                           do_absorb_corrections=self._inst_settings.do_absorb_corrections)
        run_details = self._get_run_details(run_number_string=self._inst_settings.run_in_range)
        polaris_algs.save_unsplined_vanadium(vanadium_ws=vanadium_d,
                                             output_path=run_details.unsplined_vanadium_file_path)
        return vanadium_d

    def create_total_scattering_pdf(self, **kwargs):
        if 'pdf_type' not in kwargs or kwargs['pdf_type'] not in ['G(r)', 'g(r)', 'RDF(r)', 'G_k(r)']:
            kwargs['pdf_type'] = 'G(r)'
            logger.warning('PDF type not specified or is invalid, defaulting to G(r)')
        self._inst_settings.update_attributes(kwargs=kwargs)
        # Generate pdf
        run_details = self._get_run_details(self._inst_settings.run_number)
        focus_file_path = self._generate_out_file_paths(run_details)["nxs_filename"]
        cal_file_name = os.path.join(self._inst_settings.calibration_dir, self._inst_settings.grouping_file_name)
        pdf_output = polaris_algs.generate_ts_pdf(run_number=self._inst_settings.run_number,
                                                  focus_file_path=focus_file_path,
                                                  merge_banks=self._inst_settings.merge_banks,
                                                  q_lims=self._inst_settings.q_lims,
                                                  cal_file_name=cal_file_name,
                                                  sample_details=self._sample_details,
                                                  delta_r=self._inst_settings.delta_r,
                                                  delta_q=self._inst_settings.delta_q,
                                                  pdf_type=self._inst_settings.pdf_type,
                                                  lorch_filter=self._inst_settings.lorch_filter,
                                                  freq_params=self._inst_settings.freq_params,
                                                  debug=self._inst_settings.debug)
        return pdf_output

    def set_sample_details(self, **kwargs):
        self._switch_mode_specific_inst_settings(kwargs.get("mode"))
        kwarg_name = "sample"
        sample_details_obj = common.dictionary_key_helper(
            dictionary=kwargs, key=kwarg_name,
            exception_msg="The argument containing sample details was not found. Please"
                          " set the following argument: " + kwarg_name)
        self._sample_details = sample_details_obj

    # Overrides
    def _apply_absorb_corrections(self, run_details, ws_to_correct):
        if self._is_vanadium:
            return polaris_algs.calculate_van_absorb_corrections(
                ws_to_correct=ws_to_correct, multiple_scattering=self._inst_settings.multiple_scattering,
                is_vanadium=self._is_vanadium)
        else:
            return absorb_corrections.run_cylinder_absorb_corrections(
                ws_to_correct=ws_to_correct, multiple_scattering=self._inst_settings.multiple_scattering,
                sample_details_obj=self._sample_details, is_vanadium=self._is_vanadium)

    def _crop_banks_to_user_tof(self, focused_banks):
        return common.crop_banks_using_crop_list(focused_banks, self._inst_settings.focused_cropping_values)

    def _crop_raw_to_expected_tof_range(self, ws_to_crop):
        cropped_ws = common.crop_in_tof(ws_to_crop=ws_to_crop, x_min=self._inst_settings.raw_data_crop_values[0],
                                        x_max=self._inst_settings.raw_data_crop_values[1])
        return cropped_ws

    def _crop_van_to_expected_tof_range(self, van_ws_to_crop):
        cropped_ws = common.crop_banks_using_crop_list(bank_list=van_ws_to_crop,
                                                       crop_values_list=self._inst_settings.van_crop_values)
        return cropped_ws

    @staticmethod
    def _generate_input_file_name(run_number, file_ext=""):
        polaris_old_name = "POL"
        polaris_new_name = "POLARIS"
        first_run_new_name = 96912
        if isinstance(run_number, list):
            # Lists use recursion to deal with individual entries
            updated_list = []
            for run in run_number:
                updated_list.append(Polaris._generate_input_file_name(run_number=run))
            return updated_list
        else:
            # Select between old and new prefix
            # Test if it can be converted to an int or if we need to ask Mantid to do it for us
            if isinstance(run_number, str) and not run_number.isdigit():
                # Convert using Mantid and take the first element which is most likely to be the lowest digit
                use_new_name = int(common.generate_run_numbers(run_number)[0]) >= first_run_new_name
            else:
                use_new_name = int(run_number) >= first_run_new_name
            prefix = polaris_new_name if use_new_name else polaris_old_name
            return prefix + str(run_number) + file_ext

    def _get_input_batching_mode(self):
        return self._inst_settings.input_mode

    def _get_instrument_bin_widths(self):
        return self._inst_settings.focused_bin_widths

    def _get_run_details(self, run_number_string):
        run_number_string_key = self._generate_run_details_fingerprint(run_number_string,
                                                                       self._inst_settings.file_extension)
        if run_number_string_key in self._run_details_cached_obj:
            return self._run_details_cached_obj[run_number_string_key]
        self._run_details_cached_obj[run_number_string_key] = polaris_algs.get_run_details(
            run_number_string=run_number_string, inst_settings=self._inst_settings, is_vanadium_run=self._is_vanadium)
        return self._run_details_cached_obj[run_number_string_key]

    def _switch_mode_specific_inst_settings(self, mode):
        if mode is None and hasattr(self._inst_settings, "mode"):
            mode = self._inst_settings.mode
        self._inst_settings.update_attributes(advanced_config=polaris_advanced_config.get_mode_specific_dict(mode),
                                              suppress_warnings=True)

    def _apply_paalmanpings_absorb_and_subtract_empty(self, workspace, summed_empty, sample_details,
                                                      paalman_pings_events_per_point=None):
...
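The Polaris _get_run_details override above memoises its result: it builds a fingerprint key from the run number string and the configured file extension, returns the cached run-details object when one exists, and otherwise asks polaris_algs.get_run_details to build one and caches it. The sketch below reproduces just that caching pattern with plain dictionaries; the fingerprint format and the stand-in "details" payload are assumptions for illustration and are not part of the Mantid API.

# Sketch of the fingerprint-keyed caching used by Polaris._get_run_details.
# The fingerprint format and the fake "details" payload are illustrative only.

class RunDetailsCache(object):
    def __init__(self, file_extension=""):
        self._file_extension = file_extension
        self._cache = {}

    def _fingerprint(self, run_number_string):
        # Key on both the run string and the file extension, as the real method does.
        return "{}{}".format(run_number_string, self._file_extension)

    def get_run_details(self, run_number_string):
        key = self._fingerprint(run_number_string)
        if key in self._cache:
            return self._cache[key]
        # Stand-in for polaris_algs.get_run_details(...)
        self._cache[key] = {"run_number_string": run_number_string,
                            "file_extension": self._file_extension}
        return self._cache[key]


if __name__ == '__main__':
    cache = RunDetailsCache(file_extension=".nxs")
    first = cache.get_run_details("98531-98535")   # run range is a made-up example
    second = cache.get_run_details("98531-98535")
    print(first is second)  # True: the second call hits the cache

Caching here matters because building run details can involve reading calibration mappings from disk, so repeated calls with the same run string should not redo that work.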

nfss_upload_results.py

Source: nfss_upload_results.py (GitHub)

...
from glob import glob

source_file_path = ""
upload_script = ""
nfss_config = ""


def _get_run_details(app_name):
    return dict(
        image_arch='',
        ran_at=datetime.datetime.utcnow().isoformat(),
        package_version=_get_package_version(app_name),
        application_name=app_name,
    )


def _get_package_version(appname):
    """Return the package version for application *appname*.
    Return empty string if appname details are not found.
    """
    try:
        dpkg_output = subprocess.check_output(
            ['adb', 'shell', 'dpkg', '-s', appname],
            universal_newlines=True
        )
        version_line = [
            l for l in dpkg_output.split('\n') if l.startswith('Version:')
        ]
        return version_line[0].split()[1]
    except (subprocess.CalledProcessError, IndexError):
        return "Unknown"


def upload_json_details(run_details, app_details):
    app_run_details = run_details.copy()
    app_run_details["events"] = app_details
    _upload_data(run_details['application_name'], app_run_details)


def _upload_data(test_name, run_json):
    try:
        run_json_string = json.dumps(run_json)
    except ValueError as e:
        print("Error: Data does not appear to be valid json: %s" % e)
        sys.exit(3)
    print("Uploading data for :memevent:-", test_name)
    global upload_script
    global nfss_config
    try:
        upload_process = subprocess.Popen(
            [upload_script, nfss_config, 'memevent', test_name],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = upload_process.communicate(
            input=run_json_string.encode()
        )
        print("stdout: {}\n\nstderr: {}".format(stdout, stderr))
    except Exception as e:
        print("Something went terribly wrong: ", e)
    if upload_process.returncode != 0:
        raise subprocess.CalledProcessError('Failed to upload to nfss')


def _get_files_app_name_and_test(filename):
    """return tuple containing (appname, testname)."""
    app_name_search = re.search(
        'memory_usage_([a-zA-Z-]*).(.*)\.json',
        filename
    )
    return (app_name_search.group(1), app_name_search.group(2))


def get_application_readings(json_data):
    app_results = []
    pids = json_data["pids"]
    for reading in json_data["readings"]:
        results = dict(
            event=reading["event"],
            start_time=reading["start_time"],
            stop_time=reading["stop_time"],
        )
        # find the data results for this event (based on pid).
        for pid in pids:
            if str(pid) in reading["data"].keys():
                results["data"] = reading["data"][str(pid)]
                results["pid"] = pid
                break
        app_results.append(results)
    return app_results


def get_application_results(json_filepath):
    with open(json_filepath, "r") as f:
        json_data = json.load(f)
    return get_application_readings(json_data)


def get_application_details(application_files):
    """For every file this application has grab out the details and return a
    list of dictionaries containing reading details.
    """
    app_result = []
    for json_file in application_files:
        app_result.extend(get_application_results(json_file))
    return app_result


def map_files_to_applications():
    """For any memory result files that exist, return a dictionary whose keys
    are the application names mapped to the files.
    We can then produce a single json result for each application regardless of
    there being many tests / results for it.
    """
    global source_file_path
    json_results_filename_pattern = os.path.join(
        source_file_path,
        "memory_usage_*.json"
    )
    file_map = defaultdict(list)
    for json_result_file in glob(json_results_filename_pattern):
        app_name, test_name = _get_files_app_name_and_test(json_result_file)
        file_map[app_name].append(json_result_file)
    return file_map


def usage():
    print("{} <source file path> <nfss upload script> <nfss config file>"
          .format(sys.argv[0]))


def main():
    if len(sys.argv) != 4:
        usage()
        exit(1)
    global source_file_path
    source_file_path = sys.argv[1]
    global upload_script
    upload_script = sys.argv[2]
    global nfss_config
    nfss_config = sys.argv[3]
    app_details = dict()
    file_map = map_files_to_applications()
    for app_name in file_map.keys():
        app_details[app_name] = get_application_details(file_map[app_name])
    for app_name in app_details.keys():
        run_details = _get_run_details(app_name)
        upload_json_details(run_details, app_details[app_name])


if __name__ == '__main__':
...
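Here _get_run_details builds the per-run metadata dictionary (image architecture, UTC timestamp, package version, and application name) that upload_json_details later merges with the per-application readings before handing everything to the upload script. The sketch below shows that flow without the adb/dpkg and upload subprocess calls; the stubbed package version and the sample reading are made up for illustration.

# Sketch of the run-metadata flow from nfss_upload_results.py.
# The package version and the "events" entry below are illustrative stand-ins.
import datetime
import json


def get_run_details(app_name, package_version="0.0.0"):
    """Build the run-level metadata dict, mirroring _get_run_details above.

    package_version is stubbed here instead of being read via adb/dpkg.
    """
    return dict(
        image_arch='',
        ran_at=datetime.datetime.utcnow().isoformat(),
        package_version=package_version,
        application_name=app_name,
    )


if __name__ == '__main__':
    run_details = get_run_details('camera-app', package_version='1.2.3')  # illustrative values
    # upload_json_details() copies this dict and attaches the readings under "events"
    app_run_details = run_details.copy()
    app_run_details["events"] = [{"event": "startup", "pid": 1234}]  # made-up reading
    print(json.dumps(app_run_details, indent=2))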

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
