Best Python code snippet using autotest_python
subcommand_unittest.py
Source:subcommand_unittest.py  
...182    def _get_cmd(self, func, args):183        cmd = _create_subcommand(func, args)184        cmd.result_pickle = self.god.create_mock_class(file, 'file')185        return self.god.create_mock_class(cmd, 'subcommand')186    def _get_tasklist(self):187        return [self._get_cmd(lambda x: x * 2, (3,)),188                self._get_cmd(lambda: None, [])]189    def _setup_common(self):190        tasklist = self._get_tasklist()191        for task in tasklist:192            task.fork_start.expect_call()193        return tasklist194    def test_success(self):195        tasklist = self._setup_common()196        for task in tasklist:197            task.fork_waitfor.expect_call(timeout=None).and_return(0)198            (subcommand.cPickle.load.expect_call(task.result_pickle)199                    .and_return(6))200            task.result_pickle.close.expect_call()201        subcommand.parallel(tasklist)202        self.god.check_playback()203    def test_failure(self):204        tasklist = self._setup_common()...resultparser.py
Source:resultparser.py  
1import MySQLdb2from string import join, strip3db = None4def init(dbname):5    global db6    pwd = strip(open("/home/sfarmer/.xeno-mysql-pwd").readline())7    db = MySQLdb.connect(host="localhost", user="sfarmer",8                         db=dbname, passwd=pwd)9def create_qualified_name(id, tasks):10    task = tasks[id]11    if task[2] > 0:12        return create_qualified_name(task[2], tasks) + "/" + task[0]13    else:14        return task[0]15# The result of one task for one build on one system16class TaskResult:17    def __init__(self, taskid, task, status):18        self.taskid = taskid19        self.task = task20        self.status = status21    def successful(self):22        return self.status == "PASS"23    def get_full_name(self):24        return self.task[3]25class TaskParser:26    # Get the expected list of tasks27    _get_tasklist = "SELECT id, sort_order, parent, name FROM task ORDER BY sort_order"28    def __init__(self):29        self.tasks = {}30        tasklist = db.cursor()31        tasklist.execute(self._get_tasklist)32        for id, sort_order, parent, name in tasklist.fetchall():33            self.tasks[id] = [name, sort_order, parent, name]34        # create list of qualified task names35        for id, task in filter(lambda task: task[1][2] > 0,36                               self.tasks.items()):37            self.tasks[id][3] = create_qualified_name(id, self.tasks)38    def make_taskresult(self, taskid, status):39        return TaskResult(taskid, self.tasks[taskid], status)40    def sort_hierarchy(self, a, b):41        a_parent = self.tasks[a][2]42        b_parent = self.tasks[b][2]43        if a_parent == b_parent:44            return int(self.tasks[a][1] - self.tasks[b][1])45        else:46            if (a_parent == 0 or b_parent == 0):47                # Both can't be zero or above is true; cheat - one is zero48                return int(a_parent - b_parent)49            else:50                # Both have (different) parent51                return int(self.tasks[a_parent][1] - self.tasks[b_parent][1])52    def get_expected_task_ids(self):53        ids = self.tasks.keys()54        ids.sort(self.sort_hierarchy)55        return ids56    def get_task_info(self, id):57        return self.tasks[id]58class ResultSet:59    # Get results on all tasks from one build by one system60    _single_result = "SELECT task, status FROM task_result WHERE system = %i AND build = %i"61    def __init__(self, task_parser, build, system, time):62        self.tasks = {}63        self.success = 164        self.time = time65        self.build = build66        self.system = system67        self.task_parser = task_parser68        taskcur = db.cursor()69        taskcur.execute(self._single_result % (system, build))70        for taskid, status in taskcur.fetchall():71            self.tasks[taskid] = self.task_parser.make_taskresult(taskid,72                                                                  status)73            # If any one task is unsuccessful, the whole result is74            # considered unsatisfactory75            if status != "PASS":76                self.success = 077            # FIXME: make sure we have a TaskResult for every task in78            # the build, even if it was skipped79    def successful(self):         return self.success80    def get_time(self):           return self.time81    def get_build_id(self):       return self.build82    def get_system_id(self):      return self.system83    def get_tasks(self):84        return self.tasks.values()85    def get_task_by_id(self, id):86        return self.tasks[id]87    def get_failed_tasks(self):88        return filter(lambda x: not x.successful(), self.tasks.values())89class SystemList:90    # Get info on one system91    _system_info = "SELECT name, sysname, release, version, machine, testname FROM system WHERE id = %i"92    def __init__(self):93        self.dict = {}94        self.infocur = db.cursor()95    def add_system(self, system):96        if not self.dict.has_key(system):97            self.infocur.execute(self._system_info % system)98            # name, sysname, release, version, machine, testname99            self.dict[system] = self.infocur.fetchone()100    def get_list(self):101        return self.dict.values()102    def get_identity(self, system):103        dasys = self.dict[system]104        name = dasys[0]105        test = ""106        if dasys[5] != "":107            test = "-%s" % dasys[5]108        return "%s%s" % (name, test)109class ResultList:110    def __init__(self, query, parser):111        self.results = []112        self.syslist = SystemList()113        self.buildlist = []114        if parser != None:115            self.parser = parser116        else:117            self.parser = TaskParser()118        self.recent = db.cursor()119        self.recent.execute(query)120        for build, system, time in self.recent.fetchall():121            self.syslist.add_system(system)122            if not [build, time] in self.buildlist:123                self.buildlist.append([build, time])124            self.results.append(ResultSet(self.parser, build, system, time))125    def get_system_list(self):126        return self.syslist127    def get_build_list(self):128        return self.buildlist129    def get_results_by_build(self, daid):130        return filter(lambda res, id=daid: res.get_build_id() == id,131                      self.results)132    def get_successful(self):133        return filter(lambda x: x.successful(), self.results)134    def get_failed(self):135        return filter(lambda x: not x.successful(), self.results)136    def get_task_parser(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
