How to use base_context method in autotest

Best Python code snippets using autotest_python

test_base_runner.py

Source: test_base_runner.py (GitHub)

import os
import re
import pytest
from flexmock import flexmock
from typing import Dict, List, IO

from algorithm_tester.concurrency_runners import Runner, Runners, BaseRunner
import algorithm_tester.concurrency_runners as concurrency_runners
from algorithm_tester_common.tester_dataclasses import Algorithm, AlgTesterContext, Parser, InstancesLogger
from algorithm_tester.helpers import curr_time_millis, create_path
from tests.test_internal.fixtures import create_dummy_context, create_dummy_algorithm, get_base_parsed_data, create_dummy_parser
from algorithm_tester.plugins import Plugins

_runner: BaseRunner = BaseRunner()

def test_notify_communicators_timing():
    base_context = create_dummy_context()
    notification_vars = {
        "last_comm_time": 0,
        "instances_done": 0
    }

    # First call: enough time has passed, so communicators are notified.
    res = _runner.notify_communicators(base_context, [], {}, notification_vars)
    new_last_comm_time = notification_vars["last_comm_time"]
    assert notification_vars["last_comm_time"] != 0
    assert res == True

    # Immediate second call: throttled, timestamp unchanged.
    res = _runner.notify_communicators(base_context, [], {}, notification_vars)
    assert res == False
    assert notification_vars["last_comm_time"] == new_last_comm_time

    # Pretend the minimum interval has elapsed.
    notification_vars["last_comm_time"] -= base_context.min_time_between_communications + 1
    new_last_comm_time = notification_vars["last_comm_time"]
    res = _runner.notify_communicators(base_context, [], {}, notification_vars)
    assert res == True
    assert notification_vars["last_comm_time"] != new_last_comm_time

def _removing_perform(context: AlgTesterContext, parsed_data: Dict[str, object]):
    """
    A function that does not return input data that is further needed in the algorithm.

    Arguments:
        context {AlgTesterContext} -- Used context.
        parsed_data {Dict[str, object]} -- Input data.

    Returns:
        Dict[str, object] -- Modified input data.
    """
    return {"id": parsed_data["id"], "item_count": parsed_data["item_count"]}

@pytest.mark.parametrize('algorithm',
    (create_dummy_algorithm(),
     create_dummy_algorithm(name="DummyRemovingAlgorithm", perform_func=_removing_perform))
)
def test_get_solution_for_instance(algorithm: Algorithm):
    base_context = create_dummy_context(algorithms=[algorithm.get_name()])
    base_data = get_base_parsed_data(base_context, algorithm)
    base_data.update({"id": 0, "item_count": 0})

    res: Dict[str, object] = _runner.get_solution_for_instance(base_context, algorithm, base_data)
    assert "algorithm" in res
    assert "algorithm_name" in res
    assert "output_filename" in res

@pytest.mark.parametrize('algorithm',
    (create_dummy_algorithm(),
     create_dummy_algorithm(name="DummyRemovingAlgorithm", perform_func=_removing_perform))
)
def test_get_parsed_instances_data(algorithm: Algorithm):
    parser = create_dummy_parser()
    base_context = create_dummy_context(algorithms=[algorithm.get_name()], parser=parser.get_name())
    base_data = get_base_parsed_data(base_context, algorithm)

    with open(f'{base_context.input_dir}/4_inst.dat', "r") as input_file:
        res: List[Dict[str, object]] = _runner.get_parsed_instances_data(base_context, input_file, parser, algorithm)

    assert len(res) > 0
    assert "algorithm" in res[0]
    assert "algorithm_name" in res[0]
    assert "output_filename" in res[0]

@pytest.mark.parametrize('algorithms',
    ([create_dummy_algorithm(), create_dummy_algorithm("Alg2")],
     [create_dummy_algorithm(name="DummyRemovingAlgorithm", perform_func=_removing_perform), create_dummy_algorithm("Alg2")])
)
def test_run_tester_for_file(algorithms: List[Algorithm], tmpdir):
    output_dir = tmpdir
    parser = create_dummy_parser()
    base_context: AlgTesterContext = create_dummy_context(algorithms=[alg.get_name() for alg in algorithms], parser=parser.get_name())
    base_context.num_of_instances = 500*len(algorithms)
    base_context.output_dir = output_dir.strpath
    notification_vars = {
        "last_comm_time": 0,
        "instances_done": 0,
        "instances_failed": 0
    }
    instances_logger: InstancesLogger = InstancesLogger(base_context.output_dir, base_context.is_forced)
    create_path(base_context.output_dir)

    flexmock(Plugins)
    Plugins.should_receive("get_parser").and_return(parser)

    for algorithm in algorithms:
        (Plugins.should_receive("get_algorithm")
            .with_args(algorithm.get_name())
            .and_return(algorithm))

    flexmock(BaseRunner)
    BaseRunner.should_receive("notify_communicators").times(base_context.num_of_instances + 1)
    flexmock(parser).should_receive("write_result_to_file").times(base_context.num_of_instances)

    _runner.init(instances_logger)
    _runner.run_tester_for_file(base_context, f'{base_context.input_dir}/4_inst.dat', notification_vars)

    assert notification_vars["instances_done"] == base_context.num_of_instances
    assert not instances_logger._instance_log.closed
    instances_logger.close_log()
    assert instances_logger._instance_log.closed

    instances_logger.load_instances()
    assert instances_logger.get_num_of_done_instances() == base_context.num_of_instances

def _dummy_failing_func(context: AlgTesterContext, parsed_data: Dict[str, object]) -> Dict[str, object]:
    raise Exception("Dummy exception")

def test_run_tester_for_file_exceptions(tmpdir):
    output_dir = tmpdir
    parser = create_dummy_parser()
    algorithms = [create_dummy_algorithm(), create_dummy_algorithm(name="AlgFailure", perform_func=_dummy_failing_func)]
    base_context: AlgTesterContext = create_dummy_context(parser=parser.get_name(), algorithms=[alg.get_name() for alg in algorithms])
    base_context.num_of_instances = 500
    base_context.output_dir = output_dir.strpath
    notification_vars = {
        "last_comm_time": 0,
        "instances_done": 0,
        "instances_failed": 0
    }
    instances_logger: InstancesLogger = InstancesLogger(base_context.output_dir, base_context.is_forced)
    create_path(base_context.output_dir)

    flexmock(Plugins)
    Plugins.should_receive("get_parser").and_return(parser)

    for algorithm in algorithms:
        (Plugins.should_receive("get_algorithm")
            .with_args(algorithm.get_name())
            .and_return(algorithm))

    flexmock(BaseRunner)
    BaseRunner.should_receive("notify_communicators").times(base_context.num_of_instances + 1)
    flexmock(parser).should_receive("write_result_to_file").times(base_context.num_of_instances)

    _runner.init(instances_logger)
    _runner.run_tester_for_file(base_context, f'{base_context.input_dir}/4_inst.dat', notification_vars)

    assert notification_vars["instances_done"] == base_context.num_of_instances
    assert notification_vars["instances_failed"] == base_context.num_of_instances

@pytest.mark.parametrize('is_change_forced', (True, False))
def test_compute_results(is_change_forced: bool):
    base_context: AlgTesterContext = create_dummy_context()
    base_context.max_files_to_check = None
    base_context.is_forced = is_change_forced
    instances_logger: InstancesLogger = InstancesLogger(base_context.output_dir, base_context.is_forced)

    input_files = list()
    for root, _, files in os.walk(base_context.input_dir):
        for filename in files:
            input_files.append(f'{root}/{filename}')

    flexmock(BaseRunner)
    (BaseRunner.should_receive("run_tester_for_file")
        .with_args(base_context, re.compile(f'{base_context.input_dir}/.*'), object)
        .and_return(None)
        .times(len(input_files)))

    _runner.init(instances_logger)
    _runner.compute_results(base_context, input_files)
    ...
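
The first test above pins down the throttling contract of notify_communicators: a notification fires only when min_time_between_communications has elapsed since last_comm_time, and the timestamp is updated only on a successful notification. A minimal sketch of that behaviour (a hypothetical stand-in for illustration, not the actual BaseRunner implementation):

import time

def notify_communicators(context, communicators, data, notification_vars):
    # Milliseconds, matching the curr_time_millis helper imported above.
    now = time.time() * 1000
    if now - notification_vars["last_comm_time"] < context.min_time_between_communications:
        return False  # throttled: too soon since the last notification
    notification_vars["last_comm_time"] = now
    # ...notify each communicator here...
    return True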

tc-decision.py

Source: tc-decision.py (GitHub)

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from glob import glob
from functools import reduce

import json
import jsone
import os
import sys
import requests
import slugid
import yaml
import subprocess

import networkx as nx

TASKS_ROOT = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])))
TASKCLUSTER_API_BASEURL = 'http://taskcluster/queue/v1/task/%(task_id)s'

def string_to_dict(sid, value):
    parts = sid.split('.')

    def pack(parts):
        if len(parts) == 1:
            return {parts[0]: value}
        elif len(parts):
            return {parts[0]: pack(parts[1:])}
        return parts

    return pack(parts)

def merge_dicts(*dicts):
    if not reduce(lambda x, y: isinstance(y, dict) and x, dicts, True):
        raise TypeError("Object in *dicts not of type dict")
    if len(dicts) < 2:
        raise ValueError("Requires 2 or more dict objects")

    def merge(a, b):
        for d in set(a.keys()).union(b.keys()):
            if d in a and d in b:
                if type(a[d]) == type(b[d]):
                    if not isinstance(a[d], dict):
                        ret = list({a[d], b[d]})
                        if len(ret) == 1: ret = ret[0]
                        yield (d, sorted(ret))
                    else:
                        yield (d, dict(merge(a[d], b[d])))
                else:
                    raise TypeError("Conflicting key:value type assignment", type(a[d]), a[d], type(b[d]), b[d])
            elif d in a:
                yield (d, a[d])
            elif d in b:
                yield (d, b[d])
            else:
                raise KeyError

    return reduce(lambda x, y: dict(merge(x, y)), dicts[1:], dicts[0])

def taskcluster_event_context():
    das_context = {}

    # Pre-filtering
    for k in os.environ.keys():
        if k == 'GITHUB_HEAD_USER':
            os.environ['GITHUB_HEAD_USER_LOGIN'] = os.environ[k]
            del os.environ['GITHUB_HEAD_USER']

    for k in os.environ.keys():
        if k == 'TASK_ID':
            parts = string_to_dict('taskcluster.taskGroupId', os.environ[k])
            das_context = merge_dicts(das_context, parts)
        if k.startswith('GITHUB_'):
            parts = string_to_dict(k.lower().replace('_', '.').replace('github', 'event'), os.environ[k])
            das_context = merge_dicts(das_context, parts)

    return das_context

def load_specific_contextFile(file):
    specific_context = {}
    try:
        with open(os.path.join(TASKS_ROOT, file)) as src:
            specific_context = yaml.load(src)
        if specific_context is None:
            specific_context = {}
    except FileNotFoundError:
        specific_context = {}
    return specific_context

def defaultValues_build_context():
    return load_specific_contextFile('.build.yml')

def shared_context():
    return load_specific_contextFile('.shared.yml')

def create_task_payload(build, base_context):
    print('build', build)
    build_type = os.path.splitext(os.path.basename(build))[0]

    build_context = defaultValues_build_context()
    with open(build) as src:
        build_context['build'].update(yaml.load(src)['build'])

    # Be able to use what has been defined in base_context
    # e.g., the {${event.head.branch}}
    build_context = jsone.render(build_context, base_context)
    template_context = {
        'taskcluster': {
            'taskId': as_slugid(build_type)
        },
        'build_type': build_type
    }

    with open(os.path.join(TASKS_ROOT, build_context['build']['template_file'])) as src:
        template = yaml.load(src)

    contextes = merge_dicts({}, base_context, template_context, build_context)
    for one_context in glob(os.path.join(TASKS_ROOT, '*.cyml')):
        with open(one_context) as src:
            contextes = merge_dicts(contextes, yaml.load(src))

    return jsone.render(template, contextes)

def send_task(t):
    url = TASKCLUSTER_API_BASEURL % {'task_id': t['taskId']}
    del t['taskId']

    r = requests.put(url, json=t)
    print(url, r.status_code)
    if r.status_code != requests.codes.ok:
        print(json.dumps(t, indent=2))
        print(r.content)
        print(json.loads(r.content.decode())['message'])

    return r.status_code == requests.codes.ok

slugids = {}
def as_slugid(name):
    if name not in slugids:
        slugids[name] = slugid.nice().decode()
        print('cache miss', name, slugids[name])
    else:
        print('cache hit', name, slugids[name])
    return slugids[name]

def to_int(x):
    return int(x)

def functions_context():
    return {
        'as_slugid': as_slugid,
        'to_int': to_int
    }

def is_dry_run():
    return (len(sys.argv) > 1) and (sys.argv[1] == '--dry')

def should_run():
    # Make a quick clone to fetch the last commit
    try:
        subprocess.check_call([
            'git', 'clone', '--quiet', '-b', os.environ.get('GITHUB_HEAD_BRANCH'),
            '--single-branch', os.environ.get('GITHUB_HEAD_REPO_URL'),
            '--depth=1', '/tmp/ds-clone/'
        ], env={'GIT_LFS_SKIP_SMUDGE': '1'})
    except subprocess.CalledProcessError as e:
        print("Error while git cloning:", e, file=sys.stderr)
        return False

    try:
        git_msg = subprocess.check_output([
            'git', '--git-dir=/tmp/ds-clone/.git/',
            'log', '--format=%b', '-n', '1',
            os.environ.get('GITHUB_HEAD_SHA')
        ]).decode('utf-8').strip().upper()
    except subprocess.CalledProcessError as e:
        print("Error while git show:", e, file=sys.stderr)
        return False

    print('Commit message:', git_msg)
    x_deepspeech = filter(lambda x: 'X-DEEPSPEECH:' in x, git_msg.split('\n'))
    if len(list(filter(lambda x: 'NOBUILD' in x, x_deepspeech))) == 1:
        print('Not running anything according to commit message')
        return False

    return True

if __name__ == '__main__':
    if not is_dry_run():
        # We might want to NOT run in some cases
        if not should_run():
            sys.exit(0)

    base_context = taskcluster_event_context()
    base_context = merge_dicts(base_context, functions_context())
    base_context = merge_dicts(base_context, shared_context())

    root_task = base_context['taskcluster']['taskGroupId']

    tasks_graph = nx.DiGraph()
    tasks = {}

    for build in glob(os.path.join(TASKS_ROOT, '*.yml')):
        t = create_task_payload(build, base_context)

        # We allow template to produce completely empty output
        if not t:
            continue

        if 'dependencies' in t and len(t['dependencies']) > 0:
            for dep in t['dependencies']:
                tasks_graph.add_edge(t['taskId'], dep)
        else:
            tasks_graph.add_edge(t['taskId'], root_task)

        tasks[t['taskId']] = t

    for task in nx.dfs_postorder_nodes(tasks_graph):
        # root_task is the task group and also the task id that is already
        # running, so we don't have to schedule that
        if task == root_task:
            continue

        t = tasks[task]
        if is_dry_run():
            print(json.dumps(t, indent=2))
            continue

        p = send_task(t)
        if not p:
            ...
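
Here base_context is assembled by expanding environment variables such as GITHUB_HEAD_BRANCH into nested dicts (string_to_dict) and folding them together with merge_dicts, so that task templates can reference values like ${event.head.branch} via JSON-e. A standalone sketch of the dot-path expansion (a simplified reimplementation for illustration, not the script's own helper):

def dot_path_to_dict(path, value):
    # 'event.head.branch' -> {'event': {'head': {'branch': value}}}
    parts = path.split('.')
    out = {parts[-1]: value}
    for part in reversed(parts[:-1]):
        out = {part: out}
    return out

# GITHUB_HEAD_BRANCH=main surfaces as event.head.branch in the context:
print(dot_path_to_dict('event.head.branch', 'main'))
# {'event': {'head': {'branch': 'main'}}}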

jsons.py

Source: jsons.py (GitHub)

import decimal
import regex
import ast
import logging
import json
from datetime import datetime
from dotted.collection import DottedDict

def search_json(json_data, key):
    """Search for dot notation in json

    If ALL or ANY are included in the search path, then we will return a list of matching items.
    For example: with the data of:
        {"dogs": [
            {"name": "spot",
             "breed": "dalmatian"},
            {"name": "Daisy",
             "breed": "boomer"},
            {"name": "jack",
             "breed": "jack russel"},
        ]}
    Search terms, and their responses:
        dogs.1.breed = "boomer"
        dogs.2 = {"name": "jack", "breed": "jack russel"}
        dogs.ALL.name = ["spot", "Daisy", "jack"]
        dogs.ANY.name = ["spot", "Daisy", "jack"]

    Parameters:
        json_data (str): The json to be searched in
        key (str): The string to look for

    Returns:
        String: The new string with found values
    """
    if ".ALL." in key or ".ANY." in key:
        key = key.replace(".ANY.", ".ALL.")
        short_list = DottedDict(json_data)[key.split(".ALL.")[0]].to_python()
        collated_values = []
        for item in short_list:
            if type(item[key.split(".ALL.")[1]]) == datetime:
                collated_value = str(item[key.split(".ALL.")[1]])
            else:
                collated_value = item[key.split(".ALL.")[1]]
            collated_values.append(collated_value)
        found_value = json.dumps(collated_values)
    else:
        if type(DottedDict(json_data)[key]) in [str, int, bool, float, decimal.Decimal, datetime]:
            found_value = str(DottedDict(json_data)[key])
        elif callable(DottedDict(json_data)[key]):
            found_value = DottedDict(json_data)[key]()
        elif DottedDict(json_data)[key] is None:
            found_value = "None"
        else:
            found_value = json.dumps(DottedDict(json_data)[key].to_python(), sort_keys=True, default=str)
    return found_value

def merge_context(base_context, addition):
    """Merges one set of json into another

    Parameters:
        base_context (dict): The initial json to merge to
        addition (dict): The json that we want to add

    Returns:
        String (dict): The new json that we'll return
    """
    if len(base_context) > 0:
        for u_item in addition:
            if u_item not in base_context:
                base_context[u_item] = addition[u_item]
            else:
                if type(addition[u_item]) == dict:
                    base_context[u_item] = merge_context(base_context[u_item], addition[u_item])
                elif type(addition[u_item]) == list and type(base_context[u_item]) == list:
                    base_context[u_item] += addition[u_item]
                else:
                    base_context[u_item] = addition[u_item]
    else:
        base_context = addition
    return base_context

def merge_json(base_context, addition):
    """Merges one set of json into another

    Parameters:
        base_context (dict): The initial json to merge to
        addition (dict): The json that we want to add

    Returns:
        String (dict): The new json that we'll return
    """
    if len(base_context) > 0:
        for u_item in addition:
            if u_item not in base_context:
                base_context[u_item] = addition[u_item]
            else:
                if type(addition[u_item]) == dict:
                    base_context[u_item] = merge_json(base_context[u_item], addition[u_item])
                else:
                    base_context[u_item] = addition[u_item]
    else:
        base_context = addition
    return base_context

def is_json(myjson):
    try:
        json_object = json.loads(myjson)
    except ValueError:
        return False
    return True

def get_dotsearch(context, string_to_search):
    # rx = ast.literal_eval(regex.findall(r"(\[.*\])", string_to_search)[0])
    rx = json.loads(regex.findall(r"(\[.*\])", string_to_search)[0])
    search_pattern = string_to_search.replace(regex.findall(r"(\[.*\], )", string_to_search)[0], "")
    search_key = search_pattern[:search_pattern.find(".")]
    # search_value = search_pattern[search_pattern.rfind(".") + 1:]
    # note: str.lstrip strips a set of characters, not a prefix
    search_value = search_pattern.lstrip(search_key)
    search_value = search_value.replace(".", "", 1)
    logging.debug(f"search_key: {search_key}")
    logging.debug(f"search_value: {search_value}")
    found_index = "NOTFOUND"
    for array_index, dict_struct in enumerate(rx):
        if search_key in dict_struct:
            found_value = search_json(dict_struct, search_key)
            if (found_value != "NOTFOUND") and (found_value == search_value):
                found_index = str(array_index)
    logging.debug(f"found_index: {found_index}")
    ...
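
A short usage sketch for merge_context, assuming the jsons.py above is importable as a local module named jsons; note that it mutates base_context in place and also returns it:

from jsons import merge_context

base_context = {"env": {"region": "us-east-1"}, "tags": ["smoke"]}
addition = {"env": {"stage": "prod"}, "tags": ["regression"]}

merged = merge_context(base_context, addition)
# Nested dicts merge recursively, lists are concatenated, scalars are overwritten:
# {'env': {'region': 'us-east-1', 'stage': 'prod'}, 'tags': ['smoke', 'regression']}
print(merged)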

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.
