Best Python code snippet using stestr_python
kubeflow_handler.py
Source:kubeflow_handler.py  
...218    return experiment.id219  def _print_runs(self, pipeline_name, runs):220    """Prints runs in a tabular format with headers mentioned below."""221    headers = ['pipeline_name', 'run_id', 'status', 'created_at', 'link']222    def _get_run_details(run_id):223      """Return the link to the run detail page."""224      return '{prefix}/#/runs/details/{run_id}'.format(225          prefix=self._client._get_url_prefix(), run_id=run_id)  # pylint: disable=protected-access226    data = [227        [  # pylint: disable=g-complex-comprehension228            pipeline_name, run.id, run.status,229            run.created_at.isoformat(),230            _get_run_details(run.id)231        ] for run in runs232    ]...polaris.py
Source:polaris.py  
...35        self._switch_mode_specific_inst_settings(kwargs.get("mode"))36        self._inst_settings.update_attributes(kwargs=kwargs)37        vanadium_d = self._create_vanadium(run_number_string=self._inst_settings.run_in_range,38                                           do_absorb_corrections=self._inst_settings.do_absorb_corrections)39        run_details = self._get_run_details(run_number_string=self._inst_settings.run_in_range)40        polaris_algs.save_unsplined_vanadium(vanadium_ws=vanadium_d,41                                             output_path=run_details.unsplined_vanadium_file_path)42        return vanadium_d43    def create_total_scattering_pdf(self, **kwargs):44        if 'pdf_type' not in kwargs or kwargs['pdf_type'] not in ['G(r)', 'g(r)', 'RDF(r)', 'G_k(r)']:45            kwargs['pdf_type'] = 'G(r)'46            logger.warning('PDF type not specified or is invalid, defaulting to G(r)')47        self._inst_settings.update_attributes(kwargs=kwargs)48        # Generate pdf49        run_details = self._get_run_details(self._inst_settings.run_number)50        focus_file_path = self._generate_out_file_paths(run_details)["nxs_filename"]51        cal_file_name = os.path.join(self._inst_settings.calibration_dir, self._inst_settings.grouping_file_name)52        pdf_output = polaris_algs.generate_ts_pdf(run_number=self._inst_settings.run_number,53                                                  focus_file_path=focus_file_path,54                                                  merge_banks=self._inst_settings.merge_banks,55                                                  q_lims=self._inst_settings.q_lims,56                                                  cal_file_name=cal_file_name,57                                                  sample_details=self._sample_details,58                                                  delta_r=self._inst_settings.delta_r,59                                                  delta_q=self._inst_settings.delta_q,60         
                                         pdf_type=self._inst_settings.pdf_type,61                                                  lorch_filter=self._inst_settings.lorch_filter,62                                                  freq_params=self._inst_settings.freq_params,63                                                  debug=self._inst_settings.debug)64        return pdf_output65    def set_sample_details(self, **kwargs):66        self._switch_mode_specific_inst_settings(kwargs.get("mode"))67        kwarg_name = "sample"68        sample_details_obj = common.dictionary_key_helper(69            dictionary=kwargs, key=kwarg_name,70            exception_msg="The argument containing sample details was not found. Please"71                          " set the following argument: " + kwarg_name)72        self._sample_details = sample_details_obj73    # Overrides74    def _apply_absorb_corrections(self, run_details, ws_to_correct):75        if self._is_vanadium:76            return polaris_algs.calculate_van_absorb_corrections(77                ws_to_correct=ws_to_correct, multiple_scattering=self._inst_settings.multiple_scattering,78                is_vanadium=self._is_vanadium)79        else:80            return absorb_corrections.run_cylinder_absorb_corrections(81                ws_to_correct=ws_to_correct, multiple_scattering=self._inst_settings.multiple_scattering,82                sample_details_obj=self._sample_details, is_vanadium=self._is_vanadium)83    def _crop_banks_to_user_tof(self, focused_banks):84        return common.crop_banks_using_crop_list(focused_banks, self._inst_settings.focused_cropping_values)85    def _crop_raw_to_expected_tof_range(self, ws_to_crop):86        cropped_ws = common.crop_in_tof(ws_to_crop=ws_to_crop, x_min=self._inst_settings.raw_data_crop_values[0],87                                        x_max=self._inst_settings.raw_data_crop_values[1])88        return cropped_ws89    def _crop_van_to_expected_tof_range(self, 
van_ws_to_crop):90        cropped_ws = common.crop_banks_using_crop_list(bank_list=van_ws_to_crop,91                                                       crop_values_list=self._inst_settings.van_crop_values)92        return cropped_ws93    @staticmethod94    def _generate_input_file_name(run_number, file_ext=""):95        polaris_old_name = "POL"96        polaris_new_name = "POLARIS"97        first_run_new_name = 9691298        if isinstance(run_number, list):99            # Lists use recursion to deal with individual entries100            updated_list = []101            for run in run_number:102                updated_list.append(Polaris._generate_input_file_name(run_number=run))103            return updated_list104        else:105            # Select between old and new prefix106            # Test if it can be converted to an int or if we need to ask Mantid to do it for us107            if isinstance(run_number, str) and not run_number.isdigit():108                # Convert using Mantid and take the first element which is most likely to be the lowest digit109                use_new_name = int(common.generate_run_numbers(run_number)[0]) >= first_run_new_name110            else:111                use_new_name = int(run_number) >= first_run_new_name112            prefix = polaris_new_name if use_new_name else polaris_old_name113            return prefix + str(run_number) + file_ext114    def _get_input_batching_mode(self):115        return self._inst_settings.input_mode116    def _get_instrument_bin_widths(self):117        return self._inst_settings.focused_bin_widths118    def _get_run_details(self, run_number_string):119        run_number_string_key = self._generate_run_details_fingerprint(run_number_string,120                                                                       self._inst_settings.file_extension)121        if run_number_string_key in self._run_details_cached_obj:122            return self._run_details_cached_obj[run_number_string_key]123        
self._run_details_cached_obj[run_number_string_key] = polaris_algs.get_run_details(124            run_number_string=run_number_string, inst_settings=self._inst_settings, is_vanadium_run=self._is_vanadium)125        return self._run_details_cached_obj[run_number_string_key]126    def _switch_mode_specific_inst_settings(self, mode):127        if mode is None and hasattr(self._inst_settings, "mode"):128            mode = self._inst_settings.mode129        self._inst_settings.update_attributes(advanced_config=polaris_advanced_config.get_mode_specific_dict(mode),130                                              suppress_warnings=True)131    def _apply_paalmanpings_absorb_and_subtract_empty(self, workspace, summed_empty, sample_details,132                                                      paalman_pings_events_per_point=None):...nfss_upload_results.py
Source:nfss_upload_results.py  
# ... lines 1-13 of nfss_upload_results.py are not shown in this excerpt.
# NOTE(review): presumably they import datetime, json, os, re, subprocess, sys
# and collections.defaultdict, which the code below relies on -- confirm.
from glob import glob

# Run configuration; main() overwrites these from the command line.
source_file_path = ""
upload_script = ""
nfss_config = ""


def _get_run_details(app_name):
    """Return the metadata dict that accompanies an upload for *app_name*.

    The dict holds an empty ``image_arch``, the current UTC time as a naive
    ISO-8601 string, the package version reported for the application and the
    application name itself.
    """
    return dict(
        image_arch='',
        # Kept as a naive UTC timestamp so the uploaded format is unchanged.
        ran_at=datetime.datetime.utcnow().isoformat(),
        package_version=_get_package_version(app_name),
        application_name=app_name,
    )


def _get_package_version(appname):
    """Return the package version for application *appname*.

    The version is read from the ``Version:`` line of
    ``adb shell dpkg -s <appname>``.

    Return "Unknown" if the command fails, cannot be started (for example
    because ``adb`` is not installed) or prints no usable ``Version:`` line.
    """
    try:
        dpkg_output = subprocess.check_output(
            ['adb', 'shell', 'dpkg', '-s', appname],
            universal_newlines=True
        )
    except (subprocess.CalledProcessError, OSError):
        return "Unknown"

    version_lines = [
        line for line in dpkg_output.split('\n')
        if line.startswith('Version:')
    ]
    try:
        return version_lines[0].split()[1]
    except IndexError:
        # Either no "Version:" line at all, or one without a value.
        return "Unknown"


def upload_json_details(run_details, app_details):
    """Upload *app_details* as the "events" of a copy of *run_details*.

    *run_details* itself is not modified; its 'application_name' entry is
    used as the test name of the upload.
    """
    app_run_details = run_details.copy()
    app_run_details["events"] = app_details
    _upload_data(run_details['application_name'], app_run_details)


def _upload_data(test_name, run_json):
    """Serialize *run_json* and pipe it to the upload script's stdin.

    The script is invoked as
    ``<upload_script> <nfss_config> memevent <test_name>``.

    Exits the interpreter with status 3 if *run_json* cannot be serialized.
    Raises OSError if the upload script cannot be started and
    subprocess.CalledProcessError if it exits with a non-zero status.
    """
    try:
        run_json_string = json.dumps(run_json)
    except (TypeError, ValueError) as e:
        # json.dumps raises TypeError for unserializable objects and
        # ValueError for circular references.
        print("Error: Data does not appear to be valid json: %s" % e)
        sys.exit(3)

    print("Uploading data for :memevent:-", test_name)
    command = [upload_script, nfss_config, 'memevent', test_name]
    try:
        upload_process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = upload_process.communicate(
            input=run_json_string.encode()
        )
    except OSError as e:
        # Without a started process there is no return code to inspect, so
        # report the problem and let the real error propagate.
        print("Something went terribly wrong: ", e)
        raise

    print("stdout: {}\n\nstderr: {}".format(stdout, stderr))
    if upload_process.returncode != 0:
        # CalledProcessError requires the return code and the command.
        raise subprocess.CalledProcessError(
            upload_process.returncode, command, output=stdout, stderr=stderr
        )


def _get_files_app_name_and_test(filename):
    """Return a tuple containing (appname, testname) parsed from *filename*.

    Only the final path component is inspected, so directory names cannot
    influence the result.

    Raises ValueError if the file name does not match the expected
    ``memory_usage_<appname>?<testname>.json`` pattern.
    """
    app_name_search = re.search(
        # Same pattern as before, now a raw string so "\." is a valid escape.
        r'memory_usage_([a-zA-Z-]*).(.*)\.json',
        os.path.basename(filename)
    )
    if app_name_search is None:
        raise ValueError(
            "Not a memory usage result file name: %r" % (filename,)
        )
    return (app_name_search.group(1), app_name_search.group(2))


def get_application_readings(json_data):
    """Return one result dict per entry of ``json_data["readings"]``.

    Every result carries the reading's event, start_time and stop_time. The
    first pid of ``json_data["pids"]`` that has an entry in the reading's
    data additionally contributes the "data" and "pid" keys; when no pid
    matches, those two keys are absent.
    """
    app_results = []
    pids = json_data["pids"]
    for reading in json_data["readings"]:
        results = dict(
            event=reading["event"],
            start_time=reading["start_time"],
            stop_time=reading["stop_time"],
        )
        # find the data results for this event (based on pid).
        for pid in pids:
            if str(pid) in reading["data"]:
                results["data"] = reading["data"][str(pid)]
                results["pid"] = pid
                break
        app_results.append(results)
    return app_results


def get_application_results(json_filepath):
    """Load the JSON file at *json_filepath* and return its readings."""
    with open(json_filepath, "r") as f:
        json_data = json.load(f)
    return get_application_readings(json_data)


def get_application_details(application_files):
    """For every file this application has grab out the details and return a
    list of dictionaries containing reading details.
    """
    app_result = []
    for json_file in application_files:
        app_result.extend(get_application_results(json_file))
    return app_result


def map_files_to_applications():
    """For any memory result files that exist, return a dictionary whose keys
    are the application names mapped to the list of their files.

    We can then produce a single json result for each application regardless
    of there being many tests / results for it.
    """
    json_results_filename_pattern = os.path.join(
        source_file_path,
        "memory_usage_*.json"
    )
    file_map = defaultdict(list)
    for json_result_file in glob(json_results_filename_pattern):
        app_name, _ = _get_files_app_name_and_test(json_result_file)
        file_map[app_name].append(json_result_file)
    return file_map


def usage():
    """Print the expected command line."""
    print("{} <source file path> <nfss upload script> <nfss config file>"
          .format(sys.argv[0]))


def main():
    """Collect the memory results of every application and upload them."""
    global source_file_path, upload_script, nfss_config

    if len(sys.argv) != 4:
        usage()
        # sys.exit, not the site-provided exit(), which may be unavailable.
        sys.exit(1)

    source_file_path, upload_script, nfss_config = sys.argv[1:4]

    # Parse every result file first so a broken file aborts the run before
    # anything has been uploaded.
    file_map = map_files_to_applications()
    app_details = {
        app_name: get_application_details(files)
        for app_name, files in file_map.items()
    }
    for app_name, details in app_details.items():
        upload_json_details(_get_run_details(app_name), details)


if __name__ == '__main__':
    # NOTE(review): the excerpt is truncated right after the guard; calling
    # main() is the presumed body -- confirm against the full file.
    main()

# ...
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks, e.g.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
