Best Python code snippet using locust
lst_creator.py
Source:lst_creator.py  
#!/usr/bin/env python
#
## Licensed to the .NET Foundation under one or more agreements.
## The .NET Foundation licenses this file to you under the MIT license.
#
##
# Title               :lst_creator.py
#
# Script to create a working list file from the test overlay directory. This
# will be used by smarty to run tests.
#
################################################################################

import argparse
import datetime
import os
import re
import sys

from collections import defaultdict

################################################################################
# Argument Parser
################################################################################

DESCRIPTION = """Python script to help create/update the arm64 lstFile
              """

PARSER = argparse.ArgumentParser(description=DESCRIPTION)

PARSER.add_argument("--test", dest="testing", action="store_true", default=False)
PARSER.add_argument("-lst_file", dest="old_list_file", nargs='?', default=None)
PARSER.add_argument("-pri0_test_dir", dest="pri0_test_dir", nargs='?', default=None)
PARSER.add_argument("-pri1_test_dir", dest="pri1_test_dir", nargs='?', default=None)
PARSER.add_argument("-commit_hash", dest="commit_hash", nargs='?', default=None)
PARSER.add_argument("-failures_csv", dest="failures_csv", nargs='?', default=None)
PARSER.add_argument("--unset_new", dest="unset_new", action="store_true", default=False)

# Only consume the real command line when executed as a script. On import,
# fall back to the declared defaults so that importing this module never
# parses (or exits on) the host program's arguments.
ARGS = PARSER.parse_args(sys.argv[1:] if __name__ == "__main__" else [])

################################################################################
# Helper Functions
################################################################################

def create_list_file(file_name, metadata, commit_hash=None):
    """ Create a lstFile given a set of metadata input

    Args:
        file_name (str): Location to write the lstFile
        metadata ({ str: ({ str: str }, int) }): Dictionary mapping test name
                                        : to a tuple, the first tuple's value
                                        : is a dictionary of key/value
                                        : attributes, the second is the test
                                        : index (-1 for a test that has no
                                        : index yet).
        commit_hash (str): Commit written into the file header. Defaults to
                         : the commit hash parsed from the command line.
    """

    if commit_hash is None:
        commit_hash = ARGS.commit_hash

    current_time_str = datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S%z")

    entries = sorted(metadata.values(), key=lambda entry: entry[1])

    # Tests without an index yet (-1) are written after the indexed ones.
    new_entries = [entry for entry in entries if entry[1] == -1]
    old_entries = [entry for entry in entries if entry[1] != -1]

    order = ["RelativePath", "WorkingDir", "Expected",
             "MaxAllowedDurationSeconds", "Categories", "HostStyle"]

    with open(file_name, "w") as file_handle:
        file_handle.write("## This list file has been produced automatically. Any changes\n")
        file_handle.write("## are subject to being overwritten when reproducing this file.\n")
        file_handle.write("## \n")
        file_handle.write("## Last Updated: %s\n" % current_time_str)
        file_handle.write("## Commit: %s\n" % commit_hash)
        file_handle.write("## \n")

        def write_entries(data, next_index=None):
            """ Write one [name_index] section per entry.
            """
            for attributes, index in data:
                # Entries that already carry an index keep it.
                if index != -1:
                    next_index = index

                # The section title is the file name of the test plus index.
                test_name = attributes["RelativePath"].split("\\")[-1]

                lines = ["[%s_%d]" % (test_name, next_index)]
                lines.extend("%s=%s" % (key, attributes.get(key)) for key in order)
                file_handle.write("\n".join(lines) + "\n\n")

                next_index += 1

        write_entries(old_entries)

        # 0 when this is a brand new lstFile.
        # NOTE(review): for an existing lstFile this leaves one unused index
        # between the last old test and the first new test. Kept as is so
        # that generated indices do not change; confirm whether intended.
        old_number = old_entries[-1][1] + 1 if old_entries else 0

        write_entries(new_entries, old_number + 1)

def create_metadata(tests, failures_csv=None):
    """ Given a set of tests create the metadata around them

    Args:
        tests ({str : int}): Tests for which to determine metadata
                           : int represents the priority
        failures_csv (str): Optional csv file of "relativepath,category"
                          : lines overriding the categories of a test.
                          : Defaults to the value from the command line.

    Returns:
        test_metadata ({ str: { str: str } }): Dictionary mapping test name to
                                             : a dictionary of key/value
                                             : attributes.

    Raises:
        Exception: a non blank line of the csv has fewer than two fields.
    """

    if failures_csv is None:
        failures_csv = ARGS.failures_csv

    failure_information = defaultdict(lambda: None)

    if failures_csv is not None:
        assert os.path.isfile(failures_csv)

        with open(failures_csv, "r") as file_handle:
            for line in file_handle:
                # Tolerate blank lines (e.g. a trailing empty line).
                if not line.strip():
                    continue

                fields = line.split(",")
                if len(fields) < 2:
                    raise Exception("Error. CSV format expects: relativepath,category")

                relative_path = fields[0].replace("/", "\\")
                failure_information[relative_path] = fields[1].strip()

    test_metadata = defaultdict(lambda: None)

    for test_name in tests:
        priority = tests[test_name]

        # Make sure the tests use the windows \ seperator.
        working_directory = os.path.dirname(test_name).replace("/", "\\")
        relative_path = test_name.replace("/", "\\")

        if priority == 0:
            categories = "EXPECTED_PASS"
        else:
            categories = "EXPECTED_PASS;Pri%d" % priority

        # A category listed in the failures csv replaces the default one.
        if failure_information[relative_path] is not None:
            categories = failure_information[relative_path]

        metadata = defaultdict(lambda: None)
        metadata["RelativePath"] = relative_path
        metadata["WorkingDir"] = working_directory
        metadata["MaxAllowedDurationSeconds"] = "600"
        metadata["HostStyle"] = "0"
        metadata["Expected"] = "0"
        metadata["Categories"] = categories

        test_metadata[relative_path] = metadata

    return test_metadata

def get_all_tests(base_dir):
    """ Find all of the tests in the enlistment

    Args:
        base_dir (str): Directory to start traversing from

    Returns:
        test_list ([str]): List of the tests. Note this is defined to be every
                         : cmd file under the base_dir. Each path is relative
                         : to the deepest directory shared by all tests.

    Note:
        To find the tests correctly you must build the tests correctly and
        pass that directory. This method will NOT check to make sure that
        this has been done correctly.

        This is a recursive method.
    """

    def get_all_tests_helper(working_dir):
        """ Helper function to recursively get all tests.
        """
        assert os.path.isdir(working_dir)

        items = [os.path.join(working_dir, item) for item in os.listdir(working_dir)]

        # Only files whose name ends in .cmd are tests. A substring check on
        # the whole path would also match directories and unrelated files
        # living below a directory with ".cmd" in its name.
        tests = [item for item in items
                 if os.path.isfile(item) and item.endswith(".cmd")]

        for item in items:
            if os.path.isdir(item):
                tests += get_all_tests_helper(item)

        return tests

    # Recursively get all of the tests in the directory.
    tests = get_all_tests_helper(base_dir)

    # Find the correct base directory for the tests. commonprefix compares
    # character by character, so trim the result back to the last separator
    # to avoid cutting a directory (or the only test's file name) in half.
    common_prefix = os.path.commonprefix(tests)
    common_prefix = common_prefix[:common_prefix.rfind(os.sep) + 1]

    # Strip only the leading prefix, not every occurrence of it.
    return [test[len(common_prefix):] for test in tests]

def log(message):
    """ Log a debug message. This is to be used when the --test option is passed
    """

    if ARGS.testing is True:
        print(message)

def parse_lst_file(lst_file):
    """Parse a lstFile given.

    Args:
        lst_file(str): location of the lstFile

    Returns:
        test_metadata (defaultdict(lambda: None)): Key is the RelativePath of
                                                 : the test, value is a tuple
                                                 : (attributes, index).

    Raises:
        ValueError: an attribute line of a section has no "=".
    """

    assert os.path.isfile(lst_file)

    with open(lst_file) as file_handle:
        contents = file_handle.read()

    # Splitting on the capturing group yields: text before the first section,
    # then alternating section title / section body.
    split = re.split(r"\[(.*?)\]", contents)

    section_title = None
    test_metadata = defaultdict(lambda: None)

    for item in split:
        # Skip empty pieces and the comment header.
        if len(item) == 0 or item[0] == "#":
            continue

        if section_title is None:
            section_title = item
            continue

        # The title has the form name_index.
        index = int(section_title.split("_")[-1])

        metadata = defaultdict(lambda: None)

        for attribute in item.split("\n"):
            # Skip the removed new lines.
            if not attribute.strip():
                continue

            # Split on the first "=" only, the value may contain more.
            pair = attribute.split("=", 1)
            if len(pair) != 2:
                raise ValueError("Malformed attribute line in %s: %r"
                                 % (lst_file, attribute))

            metadata[pair[0].strip()] = pair[1].strip()

        # Relative path is unique, while the test name alone is not.
        test_metadata[metadata["RelativePath"]] = (metadata, index)
        section_title = None

    return test_metadata

def _merge_categories(old_categories, new_categories, unset_new=False):
    """ Merge the ; separated categories of an old and a new test entry.

    The old categories come first in their original order, followed by the
    categories that only exist in the new entry. Empty categories and
    duplicates are dropped.
    """

    old_split = (old_categories or "").split(";")
    new_split = (new_categories or "").split(";")

    if unset_new and "NEW" in old_split:
        old_split.remove("NEW")

    # If an old test is marked as a failing test. Make sure that we carry
    # that information along.
    if "EXPECTED_PASS" in new_split and "EXPECTED_FAIL" in old_split:
        new_split.remove("EXPECTED_PASS")

    # If it used to be marked as pass but it is now failing. Make sure we
    # remove the old category.
    elif "EXPECTED_FAIL" in new_split and "EXPECTED_PASS" in old_split:
        old_split.remove("EXPECTED_PASS")

    merged = []
    for category in old_split + new_split:
        if category and category not in merged:
            merged.append(category)

    return ";".join(merged)

def _join_metadata(test_metadata, old_test_metadata, unset_new):
    """ Fold the freshly created metadata into the metadata of an old lstFile.

    old_test_metadata is updated in place. Returns a tuple with the number
    of tests added, updated and removed.
    """

    new_test_count = 0
    update_count = 0

    for test_name in test_metadata:
        new_attributes = test_metadata[test_name][0]
        old_entry = old_test_metadata.get(test_name)

        if old_entry is None:
            new_test_count += 1
            new_attributes["Categories"] += ";NEW"
            old_test_metadata[test_name] = (new_attributes, -1)
            continue

        old_attributes, index = old_entry

        # Make sure we go through all attributes of both sets.
        # If an attribute exists in one set but not the other it will
        # be None. If the new metadata has a new attribute, write this
        # into the old metadata. If the old metadata has an attribute
        # that does not exist in the new set. Do not remove it.
        overwritten = False

        for attribute in set(old_attributes) | set(new_attributes):
            # The duration recorded in the old lstFile always wins.
            if attribute == "MaxAllowedDurationSeconds":
                continue

            if attribute == "Categories":
                merged = _merge_categories(old_attributes.get("Categories"),
                                           new_attributes.get("Categories"),
                                           unset_new)

                if merged != old_attributes.get("Categories"):
                    overwritten = True

                old_attributes["Categories"] = merged
                continue

            # If the old information is not the same as the new
            # information, keep the new information.
            new_value = new_attributes.get(attribute)
            if new_value is not None and new_value != old_attributes.get(attribute):
                overwritten = True
                old_attributes[attribute] = new_value

        old_test_metadata[test_name] = (old_attributes, index)

        # Count every changed test once, not once per attribute.
        if overwritten:
            update_count += 1

    # Remove all old unreferenced tests
    tests_to_remove = [name for name in old_test_metadata
                       if name not in test_metadata]

    for test_name in tests_to_remove:
        old_test_metadata.pop(test_name)

    return new_test_count, update_count, len(tests_to_remove)

################################################################################
# Main
################################################################################

def main(args):
    """ Main method

    Args:
        args (argparse.Namespace): the parsed arguments passed to the program.

    Note:
        Exits with status 1 if the commit hash is missing or either test
        directory is not a valid directory. Overwrites the old lstFile when
        one is given, otherwise writes Tests.lst.
    """

    # Assign all of the passed variables.
    pri0_test_dir = args.pri0_test_dir
    pri1_test_dir = args.pri1_test_dir
    old_list_file = args.old_list_file
    commit_hash = args.commit_hash
    unset_new = args.unset_new
    failures_csv = getattr(args, "failures_csv", None)

    if commit_hash is None:
        print("Error please provide a commit hash.")
        sys.exit(1)

    if pri0_test_dir is None or not os.path.isdir(pri0_test_dir):
        print("Error the Pri0 test directory passed is not a valid directory.")
        sys.exit(1)

    if pri1_test_dir is None or not os.path.isdir(pri1_test_dir):
        print("Error the Pri1 test directory passed is not a valid directory.")
        sys.exit(1)

    pri0_tests = get_all_tests(pri0_test_dir)
    print("Found %d tests in the pri0 test directory." % (len(pri0_tests)))

    pri1_tests = get_all_tests(pri1_test_dir)
    print("Found %d tests in the pri1 test directory." % (len(pri1_tests)))
    print("")

    priority_marked_tests = defaultdict(lambda: None)

    # Pri0 is applied last so a test found in both directories is priority 0.
    for test in pri1_tests:
        priority_marked_tests[test] = 1
    for test in pri0_tests:
        priority_marked_tests[test] = 0

    old_test_metadata = None

    # If we are updating an old lstFile. Get all of the tests from that
    # lstFile and their metadata.
    if old_list_file is not None:
        old_test_metadata = parse_lst_file(old_list_file)

        print("Found %d tests in the old lstFile." % (len(old_test_metadata)))
        print("")

    test_metadata = create_metadata(priority_marked_tests, failures_csv)

    # Make sure the tuples are set up correctly.
    for item in test_metadata:
        test_metadata[item] = (test_metadata[item], -1)

    if old_test_metadata is not None:
        # If the new information has been changed, we will need to update
        # the lstFile.
        new_test_count, update_count, tests_removed = _join_metadata(
            test_metadata, old_test_metadata, unset_new)

        print("Added %d tests." % new_test_count)
        print("Removed %d tests." % tests_removed)
        print("Finished join. %d tests updated." % update_count)

        test_metadata = old_test_metadata

    # Overwrite the old file if provided, else use the generic name Tests.lst
    lst_file = "Tests.lst" if old_list_file is None else old_list_file

    # Write out the new lstFile
    create_list_file(lst_file, test_metadata, commit_hash)

################################################################################
################################################################################

if __name__ == "__main__":
    # NOTE(review): the body of this guard is truncated in the source this
    # was recovered from; calling main with the parsed arguments is the
    # presumed original behavior. Confirm against the upstream file.
    main(ARGS)
The LambdaTest Learning Hub covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. Its step-by-step guides help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
