Best Python code snippet using green
TestRunner.py
Source:TestRunner.py  
import importlib
import logging
import os
import pprint
import time
import traceback

from athenataf.config import fwork
from athenataf.config import devices
from athenataf.lib.util.ConfigReader import ConfigReader
from athenataf.lib.util.ExcelReader import ExcelReader
from athenataf.lib.test.TestReporter import TestReporter
#from athenataf.lib.functionality.common.deviceverifier import DeviceVerifier

logger = logging.getLogger('athenataf')


def _split_upper(csv_value):
    '''
        Split a comma separated option string into a list of
        stripped, upper-cased items.
    '''
    return [item.strip().upper() for item in csv_value.split(",")]


class TestRunner:
    '''
        Base runner Class for all Test Cases.
        Adds tests to run from Excel data files, thus providing
        Data Driven Support to the framework.

        Typical call order, as far as this class shows it:
        discover() -> setUp() -> run() -> tearDown().
    '''

    def __init__(self, config):
        '''
            Prepare the results directory, the reporter, the filter
            settings and (unless options.ignore_device is set) the
            device objects.

            config: object exposing an "options" attribute; this method
            reads options.switch, options.input_files,
            options.testlink_ids, options.test_classes,
            options.test_methods and options.ignore_device, and stores
            global_vars, config_vars and the parsed filter lists back
            onto config.
        '''
        logger.info("Initializing TestRunner")
        # The keys are test link ids and the values are import information.
        self.__test_map = {}
        if not os.path.isdir(fwork.RESULTS_DIR):
            os.makedirs(fwork.RESULTS_DIR)
        self.reporter = TestReporter(config)
        self.config = config
        options = self.config.options
        # Only choose a default device when the caller has not set one.
        if 'device' not in os.environ:
            os.environ['device'] = "Switch_1" if options.switch else "IAP_1"
        self.config.global_vars = ConfigReader("global_vars")
        self.config.config_vars = ConfigReader("config_vars")
        self._test_discover_count = 0
        self.filtered_test_id_list = []

        # Each option is either the literal "ALL" (no filtering) or a
        # comma separated list that is compared case-insensitively.
        self.filter_input_files = options.input_files != "ALL"
        if self.filter_input_files:
            self.config.input_files = _split_upper(options.input_files)
        self.filter_testlink_ids = options.testlink_ids != "ALL"
        if self.filter_testlink_ids:
            self.config.testlink_ids = _split_upper(options.testlink_ids)
        self.filter_test_classes = options.test_classes != "ALL"
        if self.filter_test_classes:
            self.config.test_classes = _split_upper(options.test_classes)
        self.filter_test_methods = options.test_methods != "ALL"
        if self.filter_test_methods:
            self.config.test_methods = _split_upper(options.test_methods)

        self.aClassRef = {}
        self.aObjectRef = {}
        if not options.ignore_device:
            self._build_device_objects()
        # Short code for the most recent fixture failure ('EE', 'EP', 'UKN').
        self.error_type = None

    def _build_device_objects(self):
        '''
            Create one device class and one device instance for every
            class declared in the "devices" config module, then copy the
            declared attributes onto the generated class.
        '''
        # Imported lazily so that runs with ignore_device set do not
        # need Device_Module at all.
        from inspect import isclass
        from Device_Module import IAPDevice
        # from Device_Module import ClientDevice
        from Device_Module import SwitchDevice

        base_by_type = {
            "IAP": IAPDevice.IAPDevice,
            # "CLIENT": ClientDevice.ClientDevice,
            "SWITCH": SwitchDevice.SwitchDevice,
        }
        class_names = [x for x in dir(devices) if isclass(getattr(devices, x))]
        logger.debug("Device classes found: %s", class_names)
        for clas in class_names:
            declared = getattr(devices, clas)
            base = base_by_type.get(getattr(declared, "type", None))
            if base is None:
                # Previously this ended in a KeyError a few lines later.
                logger.warning("Skipping device class %s: unsupported device type", clas)
                continue
            device_class = type(clas, (base,), {})
            self.aClassRef[clas] = device_class
            # The instance is created before the attributes are copied,
            # as in the original code; they are class attributes, so the
            # instance still sees them afterwards.
            self.aObjectRef[clas] = device_class()
            logger.debug("Created device %s: %s", clas, device_class)
            for att in dir(declared):
                if att not in ("__doc__", "__module__"):
                    setattr(device_class, att, getattr(declared, att))

    def _include_input_file(self, dd_file):
        '''
            Return True when dd_file is an ".xls" file that passes the
            input file filter (if one is active).
        '''
        logger.debug("Executing Input File Filter.")
        if not dd_file.endswith(".xls"):
            return False
        if not self.filter_input_files:
            return True
        logger.debug("Check %s in %s", dd_file, self.config.input_files)
        # Everything before the first dot is treated as the file name.
        file_name = dd_file.split(".")[0]
        return file_name.upper() in self.config.input_files

    def _include_test_id_based_on_column_filter(self, test_info):
        '''
            Decide inclusion from the record's "FILTER?" column.
            An empty or missing value always includes the test. Otherwise
            "Y" includes it, unless options.exclude_filter is set, in
            which case "Y" excludes it and any other value includes it.
        '''
        flag = test_info.record.get("FILTER?", None)
        if (flag is None) or (flag == u''):
            return True
        marked = flag.upper().strip() == "Y"
        if self.config.options.exclude_filter:
            return not marked
        return marked

    def _include_test_id(self, test_info):
        '''
            Apply the testlink id filter (if active) and then the
            "FILTER?" column filter.
        '''
        if self.filter_testlink_ids:
            logger.debug("Check filter: %s in %s", test_info.id.upper(), self.config.testlink_ids)
            if test_info.id.upper() not in self.config.testlink_ids:
                return False
        return self._include_test_id_based_on_column_filter(test_info)

    def _include_test_class(self, class_name):
        '''Return True when class_name passes the test class filter.'''
        if not self.filter_test_classes:
            return True
        return class_name.upper() in self.config.test_classes

    def _include_test_method(self, method_name):
        '''Return True when method_name passes the test method filter.'''
        if not self.filter_test_methods:
            return True
        return method_name.upper() in self.config.test_methods

    def _add_test(self, test_module_meta):
        '''
            Import one test class and register its test methods.

            test_module_meta: [import_path, class_name]. The class is
            instantiated with self.config, and every attribute defined
            directly on the class whose name starts with "test" is
            registered under the id built from the 2nd and 3rd
            "_"-separated parts of its name, upper-cased and joined
            with "-".
        '''
        module = importlib.import_module(test_module_meta[0])
        test = getattr(module, test_module_meta[1])(self.config)
        logger.debug("Disovering Test Methods for Test Class: %s", test.__class__.__name__)
        # Only names defined on the class itself, not inherited ones.
        for m in list(vars(test.__class__)):
            if not m.startswith("test"):
                continue
            test_id = "-".join([part.upper() for part in m.split("_")[1:3]])
            if not self._include_test_method(m):
                self.filtered_test_id_list.append(test_id)
                continue
            self._test_discover_count += 1
            logger.debug(m)
            self.__test_map[test_id] = {
                "PARENT": test,
                "PARENT_IMPORT_PATH": test_module_meta[0],
                "METHOD_NAME": m,
            }

    def discover(self, tests_filter=None):
        '''
            Walk options.tests_dir, import every test module that passes
            the filters and register its test methods.

            tests_filter: optional collection of module names; when
            given, only those modules are imported.
        '''
        tests_dir = self.config.options.tests_dir
        test_queue = []
        for root, _dirs, files in os.walk(tests_dir):
            relative = os.path.relpath(root, tests_dir)
            # Modules directly inside tests_dir contribute no package
            # part; the previous string slicing produced
            # "athenataf.tests..<module>" for them.
            if relative == os.curdir:
                packages = []
            else:
                packages = relative.replace("\\", "/").split("/")
            import_path_prefix = ".".join(["athenataf", "tests"] + packages) + "."
            for test_module in files:
                if not test_module.endswith(".py") or test_module.startswith("_"):
                    continue
                test_module_name = test_module.split(".")[0]
                if not self._include_test_class(test_module_name):
                    continue
                if tests_filter is not None and test_module_name not in tests_filter:
                    continue
                test_queue.append([import_path_prefix + test_module_name, test_module_name])
        for test_module_meta in test_queue:
            self._add_test(test_module_meta)

        logger.info("TOTAL TESTS DISCOVERED IN CODE: %d", self._test_discover_count)
        logger.debug("Test ID Keys:")
        for key in sorted(self.__test_map):
            logger.debug(key)

    def prepare(self):
        '''Hook that currently does nothing.'''
        pass

    def setUp(self):
        '''Start the reporter and connect every device object.'''
        self.reporter.setUp()
        if not self.config.options.ignore_device:
            logger.debug("Connecting devices: %s", list(self.aObjectRef.values()))
            for dev in self.aObjectRef.values():
                dev.connect()
            # NOTE: a device status check (up to 5 get_device_status()
            # retries on the device named by os.environ['device'], then
            # disconnect all devices and sys.exit(1)) used to follow
            # here and is disabled.

    def _get_test_result_map(self, test_info):
        '''
            Build the initial result dictionary for one test; RESULT
            starts as "NOT_SET" and is filled in during execution.
        '''
        test_idea = test_info.record.get("TEST_IDEA", None)
        if (test_idea == u"") or (test_idea is None):
            test_idea = "Not Provided"
        return {
            "TESTLINK_ID": test_info.record["TESTLINK_ID"],
            "TEST_IDEA": test_idea,
            "IMPORT_PATH": test_info.ipath,
            "TEST_CLASS": test_info.klass,
            "TEST_METHOD": test_info.method,
            "TEST_DATA": str(test_info.record),
            "RESULT": "NOT_SET",
            "DURATION": "",
            "EXCEPTION": "",
            "TRACE": "",
        }

    def _execute_fixture(self, test_obj, method_name, test_result):
        '''
            Call test_obj.<method_name>() unless options.fake_run is set.

            Returns "SETUP_SUCCESS" / "TEARDOWN_SUCCESS" /
            "TESTMETHOD_SUCCESS" on success. On failure returns None
            after filling RESULT, EXCEPTION and TRACE in test_result,
            setting self.error_type and calling test_obj.on_fail()
            (AssertionError) or test_obj.on_error() (anything else).
        '''
        try:
            if not method_name.startswith("test"):
                # NOTE: the device status retry check that used to run
                # here is disabled, so this message is logged without
                # probing the device.
                logger.info("**** Device is UP *****")
                logger.info("Executing test.%s", method_name)
            if not self.config.options.fake_run:
                getattr(test_obj, method_name)()
        except AssertionError as e:
            reason = str(e)
            # Map known assertion messages onto short failure codes.
            if 'actual and expected running config' in reason:
                self.error_type = 'EE'
            elif 'Post clean-up the device did not' in reason:
                self.error_type = 'EP'
            else:
                self.error_type = 'UKN'
            test_result["RESULT"] = "FAIL"
            test_result["EXCEPTION"] = "AssertionError in Test %s: %s" % (method_name, reason)
            # format_exc() takes a depth limit, not the exception object.
            test_result["TRACE"] = traceback.format_exc()
            test_obj.on_fail()
            return None
        except Exception as e:
            self.error_type = 'UKN'
            test_result["RESULT"] = "ERROR"
            test_result["EXCEPTION"] = "Exception in Test %s: %s" % (method_name, str(e))
            test_result["TRACE"] = traceback.format_exc()
            test_obj.on_error()
            return None
        if method_name in ("setUp", "tearDown"):
            return "%s_SUCCESS" % method_name.upper()
        return "TESTMETHOD_SUCCESS"

    def _append_result_line(self, file_name, line):
        '''Append one line to file_name inside config.results_dir.'''
        path = os.path.join(self.config.results_dir, file_name)
        with open(path, "a") as handle:
            handle.write(line + '\n')

    def _execute_test(self, test_info, test_result):
        '''
            Run setUp, the test method and tearDown for one test and
            record the outcome in test_result and in
            passed_test_cases.txt / failed_test_cases.txt.

            The test method is skipped when setUp fails; tearDown always
            runs. Any exception raised by the runner itself is reported
            as "ERROR_IN_TESTRUNNER" instead of propagating.
        '''
        try:
            parent = test_info.parent
            setup_result = self._execute_fixture(parent, "setUp", test_result)
            run_result = None
            if setup_result is not None:
                parent.record = test_info.record
                run_result = self._execute_fixture(parent, test_info.method, test_result)
            teardown_result = self._execute_fixture(parent, "tearDown", test_result)

            passed = None not in (setup_result, run_result, teardown_result)
            if passed:
                test_result["RESULT"] = "PASS"
            method_parts = test_info.method.split('_')
            test_method_id = method_parts[1] + '-' + method_parts[2]
            if passed:
                self._append_result_line("passed_test_cases.txt", test_method_id)
                logger.info("TEST CASE RESULT : PASS")
            else:
                self._append_result_line(
                    "failed_test_cases.txt",
                    "%s - %s" % (test_method_id, self.error_type))
                logger.info("TEST CASE RESULT : FAIL")
                self.error_type = None
        except Exception as e:
            msg = "Exception in TestRunner while executing test: %s::%s" % (test_info.klass, test_info.method)
            logger.error(msg)
            test_result["RESULT"] = "ERROR_IN_TESTRUNNER"
            test_result["EXCEPTION"] = msg + str(e)
            test_result["TRACE"] = traceback.format_exc()

    def _execute_teardown_testclass(self):
        '''
            Call tearDownTestClass() on the current test object.
            Errors are logged, never raised; on error the test object's
            on_error() and stop_browser() are called.
        '''
        current = ExecutionContext.current_test_obj
        try:
            # Execute the Class Tear Down
            if ExecutionContext.current_class is not None:
                logger.info("%s:Tear Down Test Class", ExecutionContext.current_class.split(".")[-1])
            else:
                logger.debug("[None]:Tear Down Test Class")
            if not self.config.options.fake_run:
                if current is not None:
                    current.tearDownTestClass()
                else:
                    # Nothing was set up (e.g. no test ran), so there is
                    # nothing to tear down.
                    logger.debug("Current Test Object is None.")
            logger.info("*" * 30)
        except Exception:
            logger.error("Error in tearDownTestClass for %s", ExecutionContext.current_class)
            logger.debug(traceback.format_exc())
            if current is not None:
                current.on_error()
                current.stop_browser()
            else:
                logger.debug("Current Test Object is None.")

    def _execute_setup_testclass(self, test_info, test_result):
        '''
            Make test_info's class the current one and call its
            setUpTestClass() unless options.fake_run is set.

            Returns "SUCCESS", or None after recording
            "ERROR_IN_SETUP_TESTCLASS" in test_result.
        '''
        # Execute the Class Set Up
        logger.info("*" * 30)
        logger.info("%s:Set Up Test Class", test_info.klass)
        ExecutionContext.current_class = test_info.ipath
        ExecutionContext.current_test_obj = test_info.parent
        try:
            if not self.config.options.fake_run:
                test_info.parent.setUpTestClass()
        except Exception as e:
            msg = "Error in setUpTestClass for %s" % test_info.klass
            logger.error(msg)
            test_result["RESULT"] = "ERROR_IN_SETUP_TESTCLASS"
            test_result["EXCEPTION"] = msg + str(e)
            test_result["TRACE"] = traceback.format_exc()
            ExecutionContext.last_setup_class_failure = True
            test_info.parent.on_error()
            return None
        ExecutionContext.last_setup_class_failure = False
        return "SUCCESS"

    def _log_testid_info(self, test_info):
        '''Log a short banner identifying the test about to run.'''
        logger.info("-" * 30)
        logger.info("%s%s", "TESTLINK ID:".ljust(15), test_info.id)
        logger.info("%s%s", "IMPORT PATH:".ljust(15), test_info.ipath)
        logger.info("%s%s", "TEST METHOD:".ljust(15), test_info.method)
        logger.info("-" * 30)

    def _get_test_info(self, tests_reader, counter):
        '''
            Read the next record and resolve it to a TestInfo.

            Returns None when the record has no TESTLINK_ID or no
            registered test matches it. Raises Exception when reading
            fails or when there are no more records; the caller uses
            that to end its loop.
        '''
        try:
            record = tests_reader.get_next_record()
            logger.debug(str(record))
        except Exception as e:
            msg = "Error in reading records from Excel file: %s" % tests_reader.file_path
            logger.error(msg)
            logger.error(str(e))
            # print_exc() returns None; format_exc() returns the text.
            logger.error(traceback.format_exc())
            raise Exception(msg)
        if record is None:
            raise Exception("Records finished")

        test_id = record.get("TESTLINK_ID", None)
        if (test_id is None) or (test_id == u''):
            logger.error("TESTLINK_ID not mentioned in %s, Record# %d", tests_reader.file_path, counter)
            return None
        test_method_meta = self.__test_map.get(test_id, None)
        if test_method_meta is None:
            if test_id not in self.filtered_test_id_list:
                logger.error("No automated test found for the TESTLINK_ID=%s", test_id)
            else:
                logger.error("Was added to excluded list post filtering: TESTLINK_ID=%s", test_id)
            return None
        test_info = TestInfo()
        test_info.id = test_id
        test_info.record = record
        test_info.parent = test_method_meta["PARENT"]
        test_info.ipath = test_method_meta["PARENT_IMPORT_PATH"]
        test_info.klass = test_info.parent.__class__.__name__
        test_info.method = test_method_meta["METHOD_NAME"]
        return test_info

    def _get_tests_reader(self, file_name, dir_name):
        '''
            Open <options.input_dir>/<dir_name>/<file_name> with
            ExcelReader. Returns the reader, or None after logging the
            error when it cannot be created.
        '''
        tests_path = os.path.join(self.config.options.input_dir, dir_name, file_name)
        try:
            return ExcelReader(tests_path)
        except Exception as e:
            logger.error("Error in initialization of Excel file: %s", tests_path)
            logger.error(str(e))
            logger.error(traceback.format_exc())
            return None

    def _report_skipped_after_setup_failure(self, test_info, test_result):
        '''Tear the class down again and report a test skipped by a failed class set up.'''
        logger.debug("Setup test class returned None. Skipping the current test method.")
        logger.debug("Executing teardown test class.")
        self._execute_teardown_testclass()
        test_result["SCREENSHOT"] = test_info.parent.current_screen_shots.get(test_info.method, [])
        self.reporter.report(test_result)

    def _execute_excel_tests(self, file_name, dir_name):
        '''
            Execute every included test listed in one Excel file and
            report each result.

            Class level set up runs whenever the test class changes or
            the previous class set up failed; class level tear down runs
            before switching classes and once more after the last record.
            Raises Exception when the file cannot be opened.
        '''
        tests_reader = self._get_tests_reader(file_name, dir_name)
        if tests_reader is None:
            raise Exception("Not able to read from Excel File.")
        counter = 0
        while True:
            begin_timestamp = time.time()
            counter += 1
            try:
                test_info = self._get_test_info(tests_reader, counter)
            except Exception:
                # Raised both for read errors and for end of records.
                break
            if test_info is None:
                continue
            if not self._include_test_id(test_info):
                logger.debug("Excluding the Test Id: %s", test_info.id)
                continue
            logger.debug("Including the Test Id: %s", test_info.id)
            test_result = self._get_test_result_map(test_info)
            logger.debug("Compare Current test class and test method's class")
            logger.debug("Current Test Class: %s", ExecutionContext.current_class)
            logger.debug("Test Method Class: %s", test_info.klass)
            test_info.parent.current_test_method = test_info.method
            test_info.parent.current_test_id = test_info.id

            if ExecutionContext.current_class != test_info.ipath:
                logger.debug("Not same.")
                if ExecutionContext.current_class is not None:
                    logger.debug("Current class not None. So calling teardown.")
                    self._execute_teardown_testclass()
                logger.debug("Calling Setup test class.")
                needs_setup = True
            elif ExecutionContext.last_setup_class_failure:
                logger.debug("Last setup class had failed.")
                needs_setup = True
            else:
                needs_setup = False
            if needs_setup and self._execute_setup_testclass(test_info, test_result) is None:
                self._report_skipped_after_setup_failure(test_info, test_result)
                continue

            self._log_testid_info(test_info)
            self._execute_test(test_info, test_result)
            ExecutionContext.current_test_obj.record = None
            end_timestamp = time.time()
            test_result["DURATION"] = "%.2f" % (end_timestamp - begin_timestamp)
            test_result["SCREENSHOT"] = test_info.parent.current_screen_shots.get(test_info.method, [])
            self.reporter.report(test_result)
        # Tear down the last test class in the queue
        self._execute_teardown_testclass()

    def run(self):
        '''
            Run the tests of every included Excel file found in the
            "ui" / "api" sub directories of options.input_dir (or only
            the one named by options.test_types). A failure in one file
            is printed and does not stop the remaining files.
        '''
        requested = self.config.options.test_types
        if requested.upper() == "ALL":
            allowed_test_types = ["ui", "api"]
        else:
            allowed_test_types = [requested.lower()]
        input_dir = self.config.options.input_dir
        for dd_dir in os.listdir(input_dir):
            if dd_dir not in allowed_test_types:
                continue
            for dd_file in os.listdir(os.path.join(input_dir, dd_dir)):
                try:
                    if self._include_input_file(dd_file):
                        logger.debug("Including the Input File: %s", dd_file)
                        self._execute_excel_tests(dd_file, dd_dir)
                    else:
                        logger.debug("Excluding the Input File: %s", dd_file)
                except Exception:
                    print("Exception occured in TestRunner")
                    traceback.print_exc()

    def tearDown(self):
        '''
            Tear Down for Test Runner: disconnect every device and close
            the reporter. The reporter is closed even when a disconnect
            raises.
        '''
        try:
            for dev in self.aObjectRef.values():
                logger.info(dev)
                dev.disconnect()
        finally:
            self.reporter.tearDown()


class TestInfo:
    '''Plain record describing one test to execute.'''

    def __init__(self):
        # These were previously assigned to local names and so never
        # existed on the instance until a caller set them.
        self.id = None       # TESTLINK_ID taken from the Excel record
        self.record = None   # the Excel record itself
        self.parent = None   # test class instance that owns the method
        self.ipath = None    # import path of the test module
        self.klass = None    # class name of parent
        self.method = None   # name of the test method to call


class ExecutionContext:
    '''
        Class level state shared across test executions.
        TestRunner also sets "last_setup_class_failure" on this class
        at run time.
    '''
    current_class = None      # import path of the class currently set up
    current_test_obj = None   # test object whose class set up last ran

# ... (the source listing elides the rest of this excerpt here)
# utility.py
Source:utility.py  
...89    else:90        logger.error('Enter the test case id to check the status')91def load_test_from_test_case(testCaseClass):92    # sortTestMethodsUsing = Callable[[str, str], bool]93    def filter_test_methods(attrname):94        testMethodPrefix = 'test'95        return attrname.startswith(testMethodPrefix) \96               and callable(getattr(testCaseClass, attrname))97    test_case_names = list(filter(filter_test_methods, dir(testCaseClass)))98    logger.info("Test case names: {}".format(test_case_names))99    return test_case_names100def run_test_by_one(id):101    cmd = "nosetests" + " -v --with-id " + id102    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')103    if process.stdout:104        log = process.communicate()[1]105        # log = log.decode("utf-8")106        endlist = log.partition('\n')[-1]107#...utility_load_tests.py
Source:utility_load_tests.py  
...3from test_files.test_python_scripts import SimpleTest4from collections import OrderedDict5def loadTestsFromTestCase(testCaseClass):6    # sortTestMethodsUsing = Callable[[str, str], bool]7    def filter_test_methods(attrname):8        testMethodPrefix = 'test'9        return attrname.startswith(testMethodPrefix) \10               and callable(getattr(testCaseClass, attrname))11    test_case_names = list(filter(filter_test_methods, dir(testCaseClass)))12    print("Test case names: {}".format(test_case_names))13    return test_case_names14def run_test_by_one(id):15    cmd = "nosetests" + " -v --with-id " + id16    print(cmd)17    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')18    if process.stdout:19        log = process.communicate()[1]20        # log = log.decode("utf-8")21        print('loging', log)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
