Best Python code snippet using autotest_python
test_base_runner.py
Source:test_base_runner.py  
import os
import re
import pytest
from flexmock import flexmock
from typing import Dict, List, IO
from algorithm_tester.concurrency_runners import Runner, Runners, BaseRunner
import algorithm_tester.concurrency_runners as concurrency_runners
from algorithm_tester_common.tester_dataclasses import Algorithm, AlgTesterContext, Parser, InstancesLogger
from algorithm_tester.helpers import curr_time_millis, create_path
from tests.test_internal.fixtures import create_dummy_context, create_dummy_algorithm, get_base_parsed_data, create_dummy_parser
from algorithm_tester.plugins import Plugins

# Runner instance shared by every test in this module.
_runner: BaseRunner = BaseRunner()


def test_notify_communicators_timing():
    """
    Checks when notify_communicators reports and when it stays silent.

    The first call must return True and stamp "last_comm_time", an immediate
    second call must return False and leave the stamp alone, and a call made
    after the stamp is moved back by more than
    min_time_between_communications must return True again.
    """
    base_context = create_dummy_context()
    notification_vars = {
        "last_comm_time": 0,
        "instances_done": 0,
    }

    res = _runner.notify_communicators(base_context, [], {}, notification_vars)
    new_last_comm_time = notification_vars["last_comm_time"]
    assert notification_vars["last_comm_time"] != 0
    assert res == True

    # Called again right away: returns False and the timestamp is untouched.
    res = _runner.notify_communicators(base_context, [], {}, notification_vars)
    assert res == False
    assert notification_vars["last_comm_time"] == new_last_comm_time

    # Move the stored timestamp far enough into the past for the next call.
    notification_vars["last_comm_time"] -= base_context.min_time_between_communications + 1
    new_last_comm_time = notification_vars["last_comm_time"]
    res = _runner.notify_communicators(base_context, [], {}, notification_vars)
    assert res == True
    assert notification_vars["last_comm_time"] != new_last_comm_time


def _removing_perform(context: AlgTesterContext, parsed_data: Dict[str, object]):
    """
    A function that does not return input data that is further needed in the algorithm.

    Only the "id" and "item_count" entries of the input are passed through.

    Arguments:
        context {AlgTesterContext} -- Used context.
        parsed_data {Dict[str, object]} -- Input data.

    Returns:
        Dict[str, object] -- Modified input data.
    """
    return {"id": parsed_data["id"], "item_count": parsed_data["item_count"]}


@pytest.mark.parametrize(
    'algorithm',
    (
        create_dummy_algorithm(),
        create_dummy_algorithm(name="DummyRemovingAlgorithm", perform_func=_removing_perform),
    ),
)
def test_get_solution_for_instance(algorithm: Algorithm):
    """
    The solution of one instance must contain the bookkeeping keys.

    Runs with both the default dummy algorithm and the one whose perform
    function drops most of its input.

    Arguments:
        algorithm {Algorithm} -- Algorithm under test (parametrized).
    """
    base_context = create_dummy_context(algorithms=[algorithm.get_name()])
    base_data = get_base_parsed_data(base_context, algorithm)
    base_data.update({"id": 0, "item_count": 0})

    res: Dict[str, object] = _runner.get_solution_for_instance(base_context, algorithm, base_data)

    assert "algorithm" in res
    assert "algorithm_name" in res
    assert "output_filename" in res


@pytest.mark.parametrize(
    'algorithm',
    (
        create_dummy_algorithm(),
        create_dummy_algorithm(name="DummyRemovingAlgorithm", perform_func=_removing_perform),
    ),
)
def test_get_parsed_instances_data(algorithm: Algorithm):
    """
    Parsing 4_inst.dat must yield at least one instance with the bookkeeping keys.

    Arguments:
        algorithm {Algorithm} -- Algorithm under test (parametrized).
    """
    parser = create_dummy_parser()
    base_context = create_dummy_context(algorithms=[algorithm.get_name()], parser=parser.get_name())
    # The result is not used below; the call is kept in case the fixture
    # helper has side effects -- TODO confirm and drop if it has none.
    base_data = get_base_parsed_data(base_context, algorithm)

    with open(f'{base_context.input_dir}/4_inst.dat', "r") as input_file:
        res: List[Dict[str, object]] = _runner.get_parsed_instances_data(base_context, input_file, parser, algorithm)

    assert len(res) > 0
    assert "algorithm" in res[0]
    assert "algorithm_name" in res[0]
    assert "output_filename" in res[0]


@pytest.mark.parametrize(
    'algorithms',
    (
        [create_dummy_algorithm(), create_dummy_algorithm("Alg2")],
        [create_dummy_algorithm(name="DummyRemovingAlgorithm", perform_func=_removing_perform), create_dummy_algorithm("Alg2")],
    ),
)
def test_run_tester_for_file(algorithms: List[Algorithm], tmpdir):
    """
    Running one input file must process and log every expected instance.

    Arguments:
        algorithms {List[Algorithm]} -- Algorithms to run (parametrized).
        tmpdir -- pytest temporary directory, used as the output directory.
    """
    output_dir = tmpdir
    parser = create_dummy_parser()
    base_context: AlgTesterContext = create_dummy_context(algorithms=[alg.get_name() for alg in algorithms], parser=parser.get_name())
    base_context.num_of_instances = 500*len(algorithms)
    base_context.output_dir = output_dir.strpath
    notification_vars = {
        "last_comm_time": 0,
        "instances_done": 0,
        "instances_failed": 0,
    }
    instances_logger: InstancesLogger = InstancesLogger(base_context.output_dir, base_context.is_forced)
    create_path(base_context.output_dir)

    # Make the plugin registry hand out the dummy parser and algorithms.
    flexmock(Plugins)
    Plugins.should_receive("get_parser").and_return(parser)

    for algorithm in algorithms:
        (Plugins.should_receive("get_algorithm")
            .with_args(algorithm.get_name())
            .and_return(algorithm))

    # Expected call counts: one notification more than there are instances,
    # and exactly one result write per instance.
    flexmock(BaseRunner)
    BaseRunner.should_receive("notify_communicators").times(base_context.num_of_instances + 1)
    flexmock(parser).should_receive("write_result_to_file").times(base_context.num_of_instances)

    _runner.init(instances_logger)
    _runner.run_tester_for_file(base_context, f'{base_context.input_dir}/4_inst.dat', notification_vars)

    assert notification_vars["instances_done"] == base_context.num_of_instances

    # The log stays open until it is closed explicitly, and reloading it
    # must report every finished instance.
    assert not instances_logger._instance_log.closed
    instances_logger.close_log()
    assert instances_logger._instance_log.closed
    instances_logger.load_instances()
    assert instances_logger.get_num_of_done_instances() == base_context.num_of_instances


def _dummy_failing_func(context: AlgTesterContext, parsed_data: Dict[str, object]) -> Dict[str, object]:
    """
    A perform function that always fails.

    Arguments:
        context {AlgTesterContext} -- Used context.
        parsed_data {Dict[str, object]} -- Input data.

    Raises:
        Exception -- Always, with the message "Dummy exception".
    """
    raise Exception("Dummy exception")


def test_run_tester_for_file_exceptions(tmpdir):
    """
    A failing algorithm must be counted as failed without stopping the run.

    One of the two algorithms always raises; the test expects both the done
    and the failed counters to equal num_of_instances afterwards.

    Arguments:
        tmpdir -- pytest temporary directory, used as the output directory.
    """
    output_dir = tmpdir
    parser = create_dummy_parser()
    algorithms = [create_dummy_algorithm(), create_dummy_algorithm(name="AlgFailure", perform_func=_dummy_failing_func)]
    base_context: AlgTesterContext = create_dummy_context(parser=parser.get_name(), algorithms=[alg.get_name() for alg in algorithms])
    base_context.num_of_instances = 500
    base_context.output_dir = output_dir.strpath
    notification_vars = {
        "last_comm_time": 0,
        "instances_done": 0,
        "instances_failed": 0,
    }
    instances_logger: InstancesLogger = InstancesLogger(base_context.output_dir, base_context.is_forced)
    create_path(base_context.output_dir)

    # Make the plugin registry hand out the dummy parser and algorithms.
    flexmock(Plugins)
    Plugins.should_receive("get_parser").and_return(parser)

    for algorithm in algorithms:
        (Plugins.should_receive("get_algorithm")
            .with_args(algorithm.get_name())
            .and_return(algorithm))

    flexmock(BaseRunner)
    BaseRunner.should_receive("notify_communicators").times(base_context.num_of_instances + 1)
    flexmock(parser).should_receive("write_result_to_file").times(base_context.num_of_instances)

    _runner.init(instances_logger)
    _runner.run_tester_for_file(base_context, f'{base_context.input_dir}/4_inst.dat', notification_vars)

    assert notification_vars["instances_done"] == base_context.num_of_instances
    assert notification_vars["instances_failed"] == base_context.num_of_instances


@pytest.mark.parametrize('is_change_forced', (True, False))
def test_compute_results(is_change_forced: bool):
    """
    compute_results must call run_tester_for_file once per input file.

    Arguments:
        is_change_forced {bool} -- Value assigned to the context's is_forced flag.
    """
    base_context: AlgTesterContext = create_dummy_context()
    base_context.max_files_to_check = None
    base_context.is_forced = is_change_forced
    instances_logger: InstancesLogger = InstancesLogger(base_context.output_dir, base_context.is_forced)

    # Every file below the input directory counts as an input file.
    input_files = list()
    for root, _, files in os.walk(base_context.input_dir):
        for filename in files:
            input_files.append(f'{root}/{filename}')

    flexmock(BaseRunner)
    (BaseRunner.should_receive("run_tester_for_file")
        .with_args(base_context, re.compile(f'{base_context.input_dir}/.*'), object)
        .and_return(None)
        .times(len(input_files)))

    _runner.init(instances_logger)
    _runner.compute_results(base_context, input_files)

# ... (the test_base_runner.py excerpt is cut off here in the source)
# tc-decision.py
Source:tc-decision.py  
1# -*- coding: utf-8 -*-2from __future__ import absolute_import, print_function, unicode_literals3from glob import glob4from functools import reduce5import json6import jsone7import os8import sys9import requests10import slugid11import yaml12import subprocess13import networkx as nx14TASKS_ROOT = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])))15TASKCLUSTER_API_BASEURL = 'http://taskcluster/queue/v1/task/%(task_id)s'16def string_to_dict(sid, value):17    parts = sid.split('.')18    def pack(parts):19        if len(parts) == 1:20            return {parts[0]: value}21        elif len(parts):22            return {parts[0]: pack(parts[1:])}23        return parts24    return pack(parts)25def merge_dicts(*dicts):26    if not reduce(lambda x, y: isinstance(y, dict) and x, dicts, True):27        raise TypeError("Object in *dicts not of type dict")28    if len(dicts) < 2:29        raise ValueError("Requires 2 or more dict objects")30    def merge(a, b):31        for d in set(a.keys()).union(b.keys()):32            if d in a and d in b:33                if type(a[d]) == type(b[d]):34                    if not isinstance(a[d], dict):35                        ret = list({a[d], b[d]})36                        if len(ret) == 1: ret = ret[0]37                        yield (d, sorted(ret))38                    else:39                        yield (d, dict(merge(a[d], b[d])))40                else:41                    raise TypeError("Conflicting key:value type assignment", type(a[d]), a[d], type(b[d]), b[d])42            elif d in a:43                yield (d, a[d])44            elif d in b:45                yield (d, b[d])46            else:47                raise KeyError48    return reduce(lambda x, y: dict(merge(x, y)), dicts[1:], dicts[0])49def taskcluster_event_context():50    das_context = {}51    # Pre-filterting52    for k in os.environ.keys():53        if k == 'GITHUB_HEAD_USER':54            os.environ['GITHUB_HEAD_USER_LOGIN'] = os.environ[k]55            del 
os.environ['GITHUB_HEAD_USER']56    for k in os.environ.keys():57        if k == 'TASK_ID':58            parts = string_to_dict('taskcluster.taskGroupId', os.environ[k])59            das_context = merge_dicts(das_context, parts)60        if k.startswith('GITHUB_'):61            parts = string_to_dict(k.lower().replace('_', '.').replace('github', 'event'), os.environ[k])62            das_context = merge_dicts(das_context, parts)63    return das_context64def load_specific_contextFile(file):65    specific_context = {}66    try:67        with open(os.path.join(TASKS_ROOT, file)) as src:68            specific_context = yaml.load(src)69        if specific_context is None:70            specific_context = {}71    except FileNotFoundError:72        specific_context = {}73    return specific_context74def defaultValues_build_context():75    return load_specific_contextFile('.build.yml')76def shared_context():77    return load_specific_contextFile('.shared.yml')78def create_task_payload(build, base_context):79    print('build', build)80    build_type = os.path.splitext(os.path.basename(build))[0]81    build_context = defaultValues_build_context()82    with open(build) as src:83        build_context['build'].update(yaml.load(src)['build'])84    # Be able to use what has been defined in base_context85    # e.g., the {${event.head.branch}}86    build_context = jsone.render(build_context, base_context)87    template_context = {88        'taskcluster': {89            'taskId': as_slugid(build_type)90        },91        'build_type': build_type92    }93    with open(os.path.join(TASKS_ROOT, build_context['build']['template_file'])) as src:94        template = yaml.load(src)95    contextes = merge_dicts({}, base_context, template_context, build_context)96    for one_context in glob(os.path.join(TASKS_ROOT, '*.cyml')):97        with open(one_context) as src:98            contextes = merge_dicts(contextes, yaml.load(src))99    return jsone.render(template, contextes)100def 
send_task(t):101    url = TASKCLUSTER_API_BASEURL % {'task_id': t['taskId']}102    del t['taskId']103    r = requests.put(url, json=t)104    print(url, r.status_code)105    if r.status_code != requests.codes.ok:106        print(json.dumps(t, indent=2))107        print(r.content)108        print(json.loads(r.content.decode())['message'])109    return r.status_code == requests.codes.ok110slugids = {}111def as_slugid(name):112    if name not in slugids:113        slugids[name] = slugid.nice().decode()114        print('cache miss', name, slugids[name])115    else:116        print('cache hit', name, slugids[name])117    return slugids[name]118def to_int(x):119    return int(x)120def functions_context():121    return {122        'as_slugid': as_slugid,123        'to_int': to_int124    }125def is_dry_run():126    return (len(sys.argv) > 1) and (sys.argv[1] == '--dry')127def should_run():128    # Make a quick clone to fetch the last commit129    try:130        subprocess.check_call([131            'git', 'clone', '--quiet', '-b', os.environ.get('GITHUB_HEAD_BRANCH'),132            '--single-branch', os.environ.get('GITHUB_HEAD_REPO_URL'),133            '--depth=1', '/tmp/ds-clone/'134        ], env={'GIT_LFS_SKIP_SMUDGE': '1'})135    except subprocess.CalledProcessError as e:136        print("Error while git cloning:", e, file=sys.stderr)137        return False138    try:139        git_msg = subprocess.check_output([140            'git', '--git-dir=/tmp/ds-clone/.git/',141            'log', '--format=%b', '-n', '1',142            os.environ.get('GITHUB_HEAD_SHA')143        ]).decode('utf-8').strip().upper()144    except subprocess.CalledProcessError as e:145        print("Error while git show:", e, file=sys.stderr)146        return False147    print('Commit message:', git_msg)148    x_deepspeech = filter(lambda x: 'X-DEEPSPEECH:' in x, git_msg.split('\n'))149    if len(list(filter(lambda x: 'NOBUILD' in x, x_deepspeech))) == 1:150        print('Not running anything 
according to commit message')151        return False152    return True153if __name__ == '__main__':154    if not is_dry_run():155        # We might want to NOT run in some cases156        if not should_run():157            sys.exit(0)158    base_context = taskcluster_event_context()159    base_context = merge_dicts(base_context, functions_context())160    base_context = merge_dicts(base_context, shared_context())161    root_task = base_context['taskcluster']['taskGroupId']162    tasks_graph = nx.DiGraph()163    tasks = {}164    for build in glob(os.path.join(TASKS_ROOT, '*.yml')):165        t = create_task_payload(build, base_context)166        # We allow template to produce completely empty output167        if not t:168            continue169        if 'dependencies' in t and len(t['dependencies']) > 0:170            for dep in t['dependencies']:171                tasks_graph.add_edge(t['taskId'], dep)172        else:173            tasks_graph.add_edge(t['taskId'], root_task)174        tasks[t['taskId']] = t175    for task in nx.dfs_postorder_nodes(tasks_graph):176        # root_task is the task group and also the task id that is already177        # running, so we don't have to schedule that178        if task == root_task:179            continue180        t = tasks[task]181        if is_dry_run():182            print(json.dumps(t, indent=2))183            continue184        p = send_task(t)185        if not p:...jsons.py
Source:jsons.py  
import decimal
import regex
import ast
import logging
import json
from datetime import datetime
from dotted.collection import DottedDict


def search_json(json_data, key):
    """Search for dot notation in json

    If ALL or ANY are included in the search path, then we will return a list
    of matching items (ANY is handled exactly like ALL).

    For example: with the data of:
      {"dogs": [
      {"name": "spot",
      "breed": "dalmatian"},
      {"name": "Daisy",
      "breed": "boomer"},
      {"name": "jack",
      "breed": "jack russel"},
      ]
      }

    Search terms, and their responses:
    dogs.1.breed = "boomer"
    dogs.2 = {"name": "jack", "breed": "jack russel"}
    dogs.ALL.name = ["spot", "Daisy", "jack"]
    dogs.ANY.name = ["spot", "Daisy", "jack"]

    Parameters:
        json_data (dict): The json to be searched in (it is wrapped in a DottedDict)
        key (str): The dotted path to look for

    Returns:
        String: The found value. Lists and nested structures are returned as
        a JSON string, a missing value (None) as "None", and a callable
        value is called and its result returned as-is.
    """
    if ".ALL." in key or ".ANY." in key:
        # Only the path before the first ".ALL." and the key right after it
        # are used.
        path_parts = key.replace(".ANY.", ".ALL.").split(".ALL.")
        list_path, item_key = path_parts[0], path_parts[1]
        short_list = DottedDict(json_data)[list_path].to_python()
        collated_values = [
            # datetime is not JSON serializable, so it is stringified first
            str(item[item_key]) if isinstance(item[item_key], datetime) else item[item_key]
            for item in short_list
        ]
        return json.dumps(collated_values)

    # Look the value up once instead of rebuilding the DottedDict per check.
    value = DottedDict(json_data)[key]
    if isinstance(value, (str, int, bool, float, decimal.Decimal, datetime)):
        return str(value)
    if callable(value):
        return value()
    if value is None:
        return "None"
    return json.dumps(value.to_python(), sort_keys=True, default=str)


def merge_context(base_context, addition):
    """Merges one set of json into another

    Nested dicts are merged recursively, two lists under the same key are
    concatenated, and any other clash is resolved in favour of addition.
    base_context is modified in place; when it is empty, addition itself is
    returned.

    Parameters:
        base_context (dict): The initial json to merge to
        addition (dict): The json that we want to add

    Returns:
        String (dict): The new json that we'll return
    """
    if len(base_context) == 0:
        return addition

    for u_item in addition:
        new_value = addition[u_item]
        if u_item not in base_context:
            base_context[u_item] = new_value
            continue

        current_value = base_context[u_item]
        # Recurse only when both sides are dicts; recursing into a scalar or
        # a list used to raise TypeError.
        if isinstance(new_value, dict) and isinstance(current_value, dict):
            base_context[u_item] = merge_context(current_value, new_value)
        elif isinstance(new_value, list) and isinstance(current_value, list):
            base_context[u_item] += new_value
        else:
            base_context[u_item] = new_value
    return base_context


def merge_json(base_context, addition):
    """Merges one set of json into another

    Nested dicts are merged recursively; any other clash (lists included) is
    resolved in favour of addition. base_context is modified in place; when
    it is empty, addition itself is returned.

    Parameters:
        base_context (dict): The initial json to merge to
        addition (dict): The json that we want to add

    Returns:
        String (dict): The new json that we'll return
    """
    if len(base_context) == 0:
        return addition

    for u_item in addition:
        new_value = addition[u_item]
        if u_item not in base_context:
            base_context[u_item] = new_value
            continue

        current_value = base_context[u_item]
        # Recurse only when both sides are dicts; recursing into a scalar or
        # a list used to raise TypeError.
        if isinstance(new_value, dict) and isinstance(current_value, dict):
            base_context[u_item] = merge_json(current_value, new_value)
        else:
            base_context[u_item] = new_value
    return base_context


def is_json(myjson):
    """Tells whether a value can be parsed as JSON

    Parameters:
        myjson (str): The text to check

    Returns:
        bool: True if json.loads accepts the value, False otherwise
        (including for values that are not text at all, such as None)
    """
    try:
        json.loads(myjson)
    except (ValueError, TypeError):
        # ValueError: malformed JSON; TypeError: not str/bytes/bytearray
        return False
    return True


def get_dotsearch(context, string_to_search):
    # NOTE(review): this function is truncated in the source excerpt, so its
    # logic is reproduced unchanged and no return value is visible. The
    # `context` parameter is not used in the visible part.
    # rx = ast.literal_eval(regex.findall(r"(\[.*\])", string_to_search)[0])
    rx = json.loads(regex.findall(r"(\[.*\])", string_to_search)[0])
    search_pattern = string_to_search.replace(regex.findall(r"(\[.*\], )", string_to_search)[0], "")
    search_key = search_pattern[:search_pattern.find(".")]
    # search_value = search_pattern[search_pattern.rfind(".") + 1:]
    # NOTE(review): str.lstrip strips a set of characters, not a prefix. It
    # stops at the first "." here only because search_key contains none --
    # confirm this is the intent.
    search_value = search_pattern.lstrip(search_key)
    search_value = search_value.replace(".", "", 1)
    logging.debug(f"search_key: {search_key}")
    logging.debug(f"search_value: {search_value}")
    found_index = "NOTFOUND"
    for array_index, dict_struct in enumerate(rx):
        if search_key in dict_struct:
            found_value = search_json(dict_struct, search_key)
            if (found_value != "NOTFOUND") and (found_value == search_value):
                found_index = str(array_index)
    logging.debug(f"found_index: {found_index}")
    # ... (the jsons.py excerpt is cut off here in the source)

# Learn to execute automation testing from scratch with LambdaTest Learning
Hub. It takes you from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
