How to use test_output_error method in avocado

Best Python code snippet using avocado_python

rule_dry_runner.py

Source:rule_dry_runner.py Github

copy

Full Screen

1#!/usr/bin/python32"""This module runs the pipeline for specified events and shows how processing changed them."""3import json4import shutil5import tempfile6from copy import deepcopy7from difflib import ndiff8from pathlib import Path9from colorama import Fore, Back10from ruamel.yaml import YAML11from logprep.runner import Runner12from logprep.util.helper import (13 color_print_line,14 recursive_compare,15 remove_file_if_exists,16 color_print_title,17)18from logprep.util.json_handling import dump_config_as_file, parse_jsonl, parse_json19yaml = YAML(typ="safe", pure=True)20def get_runner_outputs(patched_runner):21 # pylint: disable=protected-access22 """23 Extracts the outputs of a patched logprep runner.24 Parameters25 ----------26 patched_runner : Runner27 The patched logprep runner28 Returns29 -------30 parsed_outputs : list31 A list of logprep outputs containing events, extra outputs like pre-detections or pseudonyms32 and errors33 """34 parsed_outputs = [None, None, None]35 output_config = list(patched_runner._configuration.get("output").values())[0]36 output_paths = [37 output_path for key, output_path in output_config.items() if "output_file" in key38 ]39 for output_path in output_paths:40 remove_file_if_exists(output_path)41 patched_runner.start()42 for index, output_path in enumerate(output_paths):43 parsed_outputs[index] = parse_jsonl(output_path)44 remove_file_if_exists(output_path)45 return parsed_outputs46def get_patched_runner(config_path, logger):47 """48 Creates a patched runner that bypasses check to obtain non singleton instance and the runner49 won't continue iterating on an empty pipeline.50 Parameters51 ----------52 config_path : str53 The logprep configuration that should be used for the patched runner54 logger : Logger55 The application logger the runner should use56 Returns57 -------58 runner : Runner59 The patched logprep runner60 """61 runner = Runner(bypass_check_to_obtain_non_singleton_instance=True)62 runner.set_logger(logger)63 runner.load_configuration(config_path)64 # patch runner to stop on empty pipeline65 runner._keep_iterating = lambda: False # pylint: disable=protected-access66 return runner67class DryRunner:68 """Used to run pipeline with given events and show changes made by processing."""69 def __init__(self, dry_run: str, config_path: str, full_output: bool, use_json: bool, logger):70 with open(config_path, "r", encoding="utf8") as yaml_file:71 self._config_yml = yaml.load(yaml_file)72 self._full_output = full_output73 self._use_json = use_json74 self._config_yml["input"] = {75 "json_input": {76 "type": "json_input" if use_json else "jsonl_input",77 "documents_path": dry_run,78 }79 }80 self._config_yml["output"] = {81 "jsonl_output": {82 "type": "jsonl_output",83 "output_file": dry_run,84 }85 }86 self._config_yml["process_count"] = 187 self._logger = logger88 def run(self):89 """Run the dry runner."""90 tmp_path = tempfile.mkdtemp()91 config_path = self._patch_config(tmp_path)92 patched_runner = get_patched_runner(config_path, self._logger)93 test_output, test_output_custom, test_output_error = get_runner_outputs(patched_runner)94 input_path = self._config_yml["input"]["json_input"]["documents_path"]95 input_data = parse_json(input_path) if self._use_json else parse_jsonl(input_path)96 self._print_output_results(input_data, test_output, test_output_custom, test_output_error)97 shutil.rmtree(tmp_path)98 def _patch_config(self, tmp_path):99 """Generate a config file on disk which contains the output jsonl files in a tmp dir."""100 tmp_path = Path(tmp_path)101 output_file = tmp_path / "output.jsonl"102 output_file_custom = tmp_path / "output_custom.jsonl"103 output_file_error = tmp_path / "output_errors.jsonl"104 output_config = self._config_yml["output"]["jsonl_output"]105 output_config["output_file"] = str(output_file)106 output_config["output_file_custom"] = str(output_file_custom)107 output_config["output_file_error"] = str(output_file_error)108 config_path = str(tmp_path / "generated_config.yml")109 dump_config_as_file(config_path, self._config_yml)110 return config_path111 def _print_output_results(self, input_data, test_output, test_output_custom, test_output_error):112 """Call the print methods that correspond to the output type"""113 if not test_output_error:114 self._print_transformed_events(input_data, test_output, test_output_custom)115 if self._full_output and test_output_custom and not test_output_error:116 self._print_pseudonyms(test_output_custom)117 self._print_predetections(test_output_custom)118 if test_output_error:119 self._print_errors(test_output_error)120 def _print_transformed_events(self, input_data, test_output, test_output_custom):121 """122 Print the differences between input and output event as well as corresponding pre-detections123 """124 transformed_cnt = 0125 for idx, test_item in enumerate(test_output):126 test_copy = deepcopy(test_item)127 input_copy = deepcopy(input_data[idx])128 difference = recursive_compare(test_copy, input_copy)129 if difference:130 test_json = json.dumps(test_item, sort_keys=True, indent=4)131 input_path_json = json.dumps(input_data[idx], sort_keys=True, indent=4)132 diff = ndiff(input_path_json.splitlines(), test_json.splitlines())133 color_print_title(Back.CYAN, "PROCESSED EVENT")134 self._print_ndiff_items(diff)135 transformed_cnt += 1136 for test_item_custom in test_output_custom:137 detector_id = test_item_custom.get("pre_detection_id")138 if detector_id and detector_id == test_item.get("pre_detection_id"):139 color_print_title(Back.YELLOW, "PRE-DETECTION FOR PRECEDING EVENT")140 test_json_custom = json.dumps(test_item_custom, sort_keys=True, indent=4)141 color_print_line(Back.BLACK, Fore.YELLOW, test_json_custom)142 color_print_title(Back.WHITE, f"TRANSFORMED EVENTS: {transformed_cnt}/{len(test_output)}")143 def _print_ndiff_items(self, diff):144 """145 Print the results from the ndiff library with colored lines, depending on the diff type146 """147 for item in diff:148 if item.startswith("- "):149 color_print_line(Back.BLACK, Fore.RED, item)150 elif item.startswith("+ "):151 color_print_line(Back.BLACK, Fore.GREEN, item)152 elif item.startswith("? "):153 color_print_line(Back.BLACK, Fore.WHITE, item)154 else:155 color_print_line(Back.BLACK, Fore.CYAN, item)156 def _print_pseudonyms(self, test_output_custom):157 """Print only the pseudonyms from all custom outputs"""158 color_print_title(Back.MAGENTA, "ALL PSEUDONYMS")159 for test_item in test_output_custom:160 if "pseudonym" in test_item.keys() and "origin" in test_item.keys():161 test_json = json.dumps(test_item, sort_keys=True, indent=4)162 color_print_line(Back.BLACK, Fore.MAGENTA, test_json)163 def _print_predetections(self, test_output_custom):164 """Print only the pre-detections from all custom outputs"""165 color_print_title(Back.YELLOW, "ALL PRE-DETECTIONS")166 for test_item in test_output_custom:167 if "pre_detection_id" in test_item.keys():168 test_json = json.dumps(test_item, sort_keys=True, indent=4)169 color_print_line(Back.BLACK, Fore.YELLOW, test_json)170 def _print_errors(self, test_output_error):171 """Print all errors"""172 for test_items in test_output_error:173 color_print_title(Back.RED, "ERROR")174 json_message = test_items.get("error_message")175 color_print_line(Back.BLACK, Fore.RED, json_message)176 json_original = test_items.get("document_received")177 json_processed = test_items.get("document_processed")178 diff = ndiff(str(json_original), str(json_processed))179 color_print_title(Back.YELLOW, "PARTIALLY PROCESSED EVENT")180 self._print_ndiff_items(diff)181 log_message = "^^^ RESULTS CAN NOT BE SHOWN UNTIL ALL ERRORS HAVE BEEN FIXED ^^^"...

Full Screen

Full Screen

local_target_test.py

Source:local_target_test.py Github

copy

Full Screen

1from .local_target import *2from .misc import makedirs_p3from test_support import *4@require_rsync5@pytest.fixture(scope="function")6def input_local_data(isolated_dir):7 '''Generate some input test data so we can check that it works.'''8 fname = os.path.abspath("./test_input/temp.txt")9 makedirs_p(os.path.dirname(fname))10 with open(fname, "w") as fh:11 print("hi there", file=fh)12 yield fname13 14@require_rsync15@require_python316def test_input_local_target(input_local_data):17 '''Test a local input file target'''18 ldir = os.path.abspath("./local_directory")19 assert os.path.exists(input_local_data)20 assert not os.path.exists(ldir + input_local_data)21 ft = InLocalTarget(input_local_data, ldir)22 assert ft.exists()23 assert ft.filename() == input_local_data24 assert ft.local_filename() == ldir + input_local_data25 assert os.path.exists(input_local_data)26 assert not os.path.exists(ldir + input_local_data)27 ft.prepare()28 assert os.path.exists(ldir + input_local_data)29 ft.finish()30 assert ft.exists()31 assert os.path.exists(input_local_data)32 assert os.path.exists(ldir + input_local_data)33 ft.remove()34 assert os.path.exists(input_local_data)35 assert not os.path.exists(ldir + input_local_data)36@require_rsync37@require_python338def test_input_local_target_error(input_local_data):39 '''Test a local input file target when an error occurs'''40 ldir = os.path.abspath("./local_directory")41 assert os.path.exists(input_local_data)42 assert not os.path.exists(ldir + input_local_data)43 ft = InLocalTarget(input_local_data, ldir)44 assert ft.exists()45 assert ft.filename() == input_local_data46 assert ft.local_filename() == ldir + input_local_data47 assert os.path.exists(input_local_data)48 assert not os.path.exists(ldir + input_local_data)49 ft.prepare()50 assert os.path.exists(ldir + input_local_data)51 ft.finish_error()52 assert ft.exists()53 assert os.path.exists(input_local_data)54 assert os.path.exists(ldir + input_local_data)55@require_rsync56@require_python357def test_output_local_target(isolated_dir):58 '''Test local output file target'''59 fname = os.path.abspath("./test_output/temp.txt")60 assert not os.path.exists("./test_output")61 ft = OutLocalTarget(fname, "./local_directory")62 assert not ft.exists()63 with open(ft.local_filename(), "w") as fh:64 print("hi there", file=fh)65 assert os.path.exists("./local_directory" + fname + ".generating")66 assert not os.path.exists(fname)67 assert not ft.exists()68 ft.finish()69 assert os.path.exists("./local_directory" + fname)70 assert os.path.exists(fname)71 assert ft.exists()72 ft.remove()73 assert not os.path.exists("./local_directory" + fname)74 assert not os.path.exists(fname)75 76@require_rsync77@require_python378def test_output_local_target_error(isolated_dir):79 '''Test a local output file target when an error occurs'''80 fname = os.path.abspath("./test_output/temp.txt")81 assert not os.path.exists("./test_output")82 ft = OutLocalTarget(fname, "./local_directory")83 assert not ft.exists()84 with open(ft.local_filename(), "w") as fh:85 print("hi there", file=fh)86 assert os.path.exists("./local_directory" + fname + ".generating")87 assert not os.path.exists(fname)88 assert not ft.exists()89 ft.finish_error()90 assert os.path.exists("./local_directory" + fname + ".error")91 assert os.path.exists(fname + ".error")92 assert not ft.exists()93@require_rsync94@require_python395def test_output_temp_local_target(isolated_dir):96 '''Test local output file target'''97 fname = os.path.abspath("./test_output/temp.txt")98 assert not os.path.exists("./test_output")99 ft = OutTempLocalTarget(fname, "./local_directory")100 assert not ft.exists()101 with open(ft.local_filename(), "w") as fh:102 print("hi there", file=fh)103 assert os.path.exists("./local_directory" + fname + ".generating")104 assert not os.path.exists(fname)105 assert not ft.exists()106 ft.finish()107 assert os.path.exists("./local_directory" + fname)108 # Check that real file never is created109 assert not os.path.exists(fname)110 assert ft.exists()111 ft.remove_temporary()112 assert not os.path.exists("./local_directory" + fname)113 assert not ft.exists()114 with open(ft.local_filename(), "w") as fh:115 print("hi there", file=fh)116 assert os.path.exists("./local_directory" + fname + ".generating")117 ft.remove()118 assert not os.path.exists("./local_directory" + fname + ".generating")119 120@require_rsync121@require_python3122def test_output_temp_dir_local_target(isolated_dir):123 '''Test local output file target'''124 fname = os.path.abspath("./test_output/dir")125 bname = "./local_directory/" + os.path.dirname(os.path.dirname(fname))126 assert not os.path.exists(bname + "/test_output/dir/test.txt")127 assert not os.path.exists(bname + "/test_output_generating/dir/test.txt")128 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")129 ft = OutTempDirLocalTarget("test_output/dir", "./local_directory")130 assert not ft.exists()131 makedirs_p(ft.local_filename())132 with open(ft.local_filename() + "/test.txt", "w") as fh:133 print("hi there", file=fh)134 assert not os.path.exists(bname + "/test_output/dir/test.txt")135 assert os.path.exists(bname + "/test_output_generating/dir/test.txt")136 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")137 assert not ft.exists()138 ft.finish()139 assert os.path.exists(bname + "/test_output/dir/test.txt")140 assert not os.path.exists(bname + "/test_output_generating/dir/test.txt")141 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")142 assert ft.exists()143 ft.remove_temporary()144 assert not os.path.exists(bname + "/test_output/dir/test.txt")145 assert not os.path.exists(bname + "/test_output_generating/dir/test.txt")146 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")147 assert not ft.exists()148@require_rsync149@require_python3150def test_output_temp_dir_local_target_error(isolated_dir):151 '''Test local output file target'''152 fname = os.path.abspath("./test_output/dir")153 bname = "./local_directory/" + os.path.dirname(os.path.dirname(fname))154 assert not os.path.exists(bname + "/test_output/dir/test.txt")155 assert not os.path.exists(bname + "/test_output_generating/dir/test.txt")156 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")157 ft = OutTempDirLocalTarget("test_output/dir", "./local_directory")158 assert not ft.exists()159 makedirs_p(ft.local_filename())160 with open(ft.local_filename() + "/test.txt", "w") as fh:161 print("hi there", file=fh)162 assert not os.path.exists(bname + "/test_output/dir/test.txt")163 assert os.path.exists(bname + "/test_output_generating/dir/test.txt")164 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")165 assert not ft.exists()166 ft.finish_error()167 assert not os.path.exists(bname + "/test_output/dir/test.txt")168 assert not os.path.exists(bname + "/test_output_generating/dir/test.txt")169 assert os.path.exists(bname + "/test_output_error/dir/test.txt")170 assert not ft.exists()171 ft.remove_temporary()172 assert not os.path.exists(bname + "/test_output/dir/test.txt")173 assert not os.path.exists(bname + "/test_output_generating/dir/test.txt")174 assert not os.path.exists(bname + "/test_output_error/dir/test.txt")175 assert not ft.exists()176# This was copied over from MSPI. We can't actually do this test here since177# we don't have all the support stuff. I'll leave this here, but we really178# need some other test here.179#from .l1b2_igc_task import L1b2IgcTask180#from geocal import read_shelve181#def test_xml_local_target(task_with_l1b1_test,182# test_version_info, test_target_info, cache_result):183 # We have a number of sequences and view here. Just pick the184 # first one to run.185# views = test_target_info.views(test_target_info.sequence_numbers()[0])186# tsk = L1b2IgcTask(views, test_version_info, cache_result=cache_result)187# tsk.run_pipeline(skip_cleanup_on_error=True)188# igc_fname = tsk.igc.filename()189# ft = XmlLocalTarget(igc_fname, "./local_directory")190# igc = read_shelve(ft.filename())191# ft.prepare()192# igc = read_shelve(ft.local_filename())193 ...

Full Screen

Full Screen

test_output.py

Source:test_output.py Github

copy

Full Screen

...5def test_output_success(click):6 output_success("test")7 click.secho.assert_called_once_with("test", fg="green", bold=True)8@mock.patch("ali.helpers.output.click")9def test_output_error(click):10 output_error("test")11 click.secho.assert_called_once_with("Error: test", fg="red", bold=True)12@mock.patch("ali.helpers.output.formatters")13@mock.patch("ali.helpers.output.lexers")14@mock.patch("ali.helpers.output.highlight")15@mock.patch("ali.helpers.output.click")16def test_output_json(click, highlight, lexers, formatters):17 lexer_mock = mock.MagicMock()18 formatter_mock = mock.MagicMock()19 lexers.get_lexer_by_name.return_value = lexer_mock20 formatters.get_formatter_by_name.return_value = formatter_mock21 value = json.dumps({"test": {"items": [1, 2, 3]}})22 highlight.return_value = "highlighted"23 output_json(value)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful