Best Python code snippet using autotest_python
monitor_db_unittest.py
Source:monitor_db_unittest.py  
...177        for _ in xrange(2): # metahost scheduling can take two ticks178            self._dispatcher._schedule_new_jobs()179    def _test_basic_scheduling_helper(self, use_metahosts):180        'Basic nonmetahost scheduling'181        self._create_job_simple([1], use_metahosts)182        self._create_job_simple([2], use_metahosts)183        self._run_scheduler()184        self._assert_job_scheduled_on(1, 1)185        self._assert_job_scheduled_on(2, 2)186        self._check_for_extra_schedulings()187    def _test_priorities_helper(self, use_metahosts):188        'Test prioritization ordering'189        self._create_job_simple([1], use_metahosts)190        self._create_job_simple([2], use_metahosts)191        self._create_job_simple([1,2], use_metahosts)192        self._create_job_simple([1], use_metahosts, priority=1)193        self._run_scheduler()194        self._assert_job_scheduled_on(4, 1) # higher priority195        self._assert_job_scheduled_on(2, 2) # earlier job over later196        self._check_for_extra_schedulings()197    def _test_hosts_ready_helper(self, use_metahosts):198        """199        Only hosts that are status=Ready, unlocked and not invalid get200        scheduled.201        """202        self._create_job_simple([1], use_metahosts)203        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')204        self._run_scheduler()205        self._check_for_extra_schedulings()206        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '207                       'WHERE id=1')208        self._run_scheduler()209        self._check_for_extra_schedulings()210        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '211                       'WHERE id=1')212        self._run_scheduler()213        if not use_metahosts:214            self._assert_job_scheduled_on(1, 1)215        self._check_for_extra_schedulings()216    def _test_hosts_idle_helper(self, use_metahosts):217        'Only idle hosts get scheduled'218     
   self._create_job(hosts=[1], active=True)219        self._create_job_simple([1], use_metahosts)220        self._run_scheduler()221        self._check_for_extra_schedulings()222    def _test_obey_ACLs_helper(self, use_metahosts):223        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')224        self._create_job_simple([1], use_metahosts)225        self._run_scheduler()226        self._check_for_extra_schedulings()227    def test_basic_scheduling(self):228        self._test_basic_scheduling_helper(False)229    def test_priorities(self):230        self._test_priorities_helper(False)231    def test_hosts_ready(self):232        self._test_hosts_ready_helper(False)233    def test_hosts_idle(self):234        self._test_hosts_idle_helper(False)235    def test_obey_ACLs(self):236        self._test_obey_ACLs_helper(False)237    def test_one_time_hosts_ignore_ACLs(self):238        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')239        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')240        self._create_job_simple([1])241        self._run_scheduler()242        self._assert_job_scheduled_on(1, 1)243        self._check_for_extra_schedulings()244    def test_non_metahost_on_invalid_host(self):245        """246        Non-metahost entries can get scheduled on invalid hosts (this is how247        one-time hosts work).248        """249        self._do_query('UPDATE afe_hosts SET invalid=1')250        self._test_basic_scheduling_helper(False)251    def test_metahost_scheduling(self):252        """253        Basic metahost scheduling254        """255        self._test_basic_scheduling_helper(True)256    def test_metahost_priorities(self):257        self._test_priorities_helper(True)258    def test_metahost_hosts_ready(self):259        self._test_hosts_ready_helper(True)260    def test_metahost_hosts_idle(self):261        self._test_hosts_idle_helper(True)262    def test_metahost_obey_ACLs(self):263        
self._test_obey_ACLs_helper(True)264    def test_nonmetahost_over_metahost(self):265        """266        Non-metahost entries should take priority over metahost entries267        for the same host268        """269        self._create_job(metahosts=[1])270        self._create_job(hosts=[1])271        self._run_scheduler()272        self._assert_job_scheduled_on(2, 1)273        self._check_for_extra_schedulings()274    def test_no_execution_subdir_not_found(self):275        """Reproduce bug crosbug.com/334353 and recover from it."""276        self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')277        job = self._create_job(hostless=True)278        # Ensure execution_subdir is set before status279        original_set_status = scheduler_models.HostQueueEntry.set_status280        def fake_set_status(hqe, *args, **kwargs):281            self.assertEqual(hqe.execution_subdir, 'hostless')282            original_set_status(hqe, *args, **kwargs)283        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',284                           fake_set_status)285        self._dispatcher._schedule_new_jobs()286        hqe = job.hostqueueentry_set.all()[0]287        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)288        self.assertEqual('hostless', hqe.execution_subdir)289    def test_only_schedule_queued_entries(self):290        self._create_job(metahosts=[1])291        self._update_hqe(set='active=1, host_id=2')292        self._run_scheduler()293        self._check_for_extra_schedulings()294    def test_no_ready_hosts(self):295        self._create_job(hosts=[1])296        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')297        self._run_scheduler()298        self._check_for_extra_schedulings()299    def test_garbage_collection(self):300        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',301                           999999)302        self.god.stub_function(gc, 'collect')303        
self.god.stub_function(gc_stats, '_log_garbage_collector_stats')304        gc.collect.expect_call().and_return(0)305        gc_stats._log_garbage_collector_stats.expect_call()306        # Force a garbage collection run307        self._dispatcher._last_garbage_stats_time = 0308        self._dispatcher._garbage_collection()309        # The previous call should have reset the time, it won't do anything310        # the second time.  If it does, we'll get an unexpected call.311        self._dispatcher._garbage_collection()312class DispatcherThrottlingTest(BaseSchedulerTest):313    """314    Test that the dispatcher throttles:315     * total number of running processes316     * number of processes started per cycle317    """318    _MAX_RUNNING = 3319    _MAX_STARTED = 2320    def setUp(self):321        super(DispatcherThrottlingTest, self).setUp()322        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING323        def fake_max_runnable_processes(fake_self, username,324                                        drone_hostnames_allowed):325            running = sum(agent.task.num_processes326                          for agent in self._agents327                          if agent.started and not agent.is_done())328            return self._MAX_RUNNING - running329        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',330                           fake_max_runnable_processes)331    def _setup_some_agents(self, num_agents):332        self._agents = [DummyAgent() for i in xrange(num_agents)]333        self._dispatcher._agents = list(self._agents)334    def _run_a_few_ticks(self):335        for i in xrange(4):336            self._dispatcher._handle_agents()337    def _assert_agents_started(self, indexes, is_started=True):338        for i in indexes:339            self.assert_(self._agents[i].started == is_started,340                         'Agent %d %sstarted' %341                         (i, is_started and 'not ' or ''))342    def 
_assert_agents_not_started(self, indexes):343        self._assert_agents_started(indexes, False)344    def test_throttle_total(self):345        self._setup_some_agents(4)346        self._run_a_few_ticks()347        self._assert_agents_started([0, 1, 2])348        self._assert_agents_not_started([3])349    def test_throttle_with_synchronous(self):350        self._setup_some_agents(2)351        self._agents[0].task.num_processes = 3352        self._run_a_few_ticks()353        self._assert_agents_started([0])354        self._assert_agents_not_started([1])355    def test_large_agent_starvation(self):356        """357        Ensure large agents don't get starved by lower-priority agents.358        """359        self._setup_some_agents(3)360        self._agents[1].task.num_processes = 3361        self._run_a_few_ticks()362        self._assert_agents_started([0])363        self._assert_agents_not_started([1, 2])364        self._agents[0].set_done(True)365        self._run_a_few_ticks()366        self._assert_agents_started([1])367        self._assert_agents_not_started([2])368    def test_zero_process_agent(self):369        self._setup_some_agents(5)370        self._agents[4].task.num_processes = 0371        self._run_a_few_ticks()372        self._assert_agents_started([0, 1, 2, 4])373        self._assert_agents_not_started([3])374class PidfileRunMonitorTest(unittest.TestCase):375    execution_tag = 'test_tag'376    pid = 12345377    process = drone_manager.Process('myhost', pid)378    num_tests_failed = 1379    def setUp(self):380        self.god = mock.mock_god()381        self.mock_drone_manager = self.god.create_mock_class(382            drone_manager.DroneManager, 'drone_manager')383        self.god.stub_with(drone_manager, '_the_instance',384                           self.mock_drone_manager)385        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',386                           self._mock_get_pidfile_timeout_secs)387        self.pidfile_id = 
object()388        (self.mock_drone_manager.get_pidfile_id_from389             .expect_call(self.execution_tag,390                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)391             .and_return(self.pidfile_id))392        self.monitor = pidfile_monitor.PidfileRunMonitor()393        self.monitor.attach_to_existing_process(self.execution_tag)394    def tearDown(self):395        self.god.unstub_all()396    def _mock_get_pidfile_timeout_secs(self):397        return 300398    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,399                      use_second_read=False):400        contents = drone_manager.PidfileContents()401        if pid is not None:402            contents.process = drone_manager.Process('myhost', pid)403        contents.exit_status = exit_code404        contents.num_tests_failed = tests_failed405        self.mock_drone_manager.get_pidfile_contents.expect_call(406            self.pidfile_id, use_second_read=use_second_read).and_return(407            contents)408    def set_not_yet_run(self):409        self.setup_pidfile()410    def set_empty_pidfile(self):411        self.setup_pidfile()412    def set_running(self, use_second_read=False):413        self.setup_pidfile(self.pid, use_second_read=use_second_read)414    def set_complete(self, error_code, use_second_read=False):415        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,416                           use_second_read=use_second_read)417    def _check_monitor(self, expected_pid, expected_exit_status,418                       expected_num_tests_failed):419        if expected_pid is None:420            self.assertEquals(self.monitor._state.process, None)421        else:422            self.assertEquals(self.monitor._state.process.pid, expected_pid)423        self.assertEquals(self.monitor._state.exit_status, expected_exit_status)424        self.assertEquals(self.monitor._state.num_tests_failed,425                          
expected_num_tests_failed)426        self.god.check_playback()427    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,428                                  expected_num_tests_failed):429        self.monitor._read_pidfile()430        self._check_monitor(expected_pid, expected_exit_status,431                            expected_num_tests_failed)432    def _get_expected_tests_failed(self, expected_exit_status):433        if expected_exit_status is None:434            expected_tests_failed = None435        else:436            expected_tests_failed = self.num_tests_failed437        return expected_tests_failed438    def test_read_pidfile(self):439        self.set_not_yet_run()440        self._test_read_pidfile_helper(None, None, None)441        self.set_empty_pidfile()442        self._test_read_pidfile_helper(None, None, None)443        self.set_running()444        self._test_read_pidfile_helper(self.pid, None, None)445        self.set_complete(123)446        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)447    def test_read_pidfile_error(self):448        self.mock_drone_manager.get_pidfile_contents.expect_call(449            self.pidfile_id, use_second_read=False).and_return(450            drone_manager.InvalidPidfile('error'))451        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,452                          self.monitor._read_pidfile)453        self.god.check_playback()454    def setup_is_running(self, is_running):455        self.mock_drone_manager.is_process_running.expect_call(456            self.process).and_return(is_running)457    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,458                                      expected_num_tests_failed):459        self.monitor._get_pidfile_info()460        self._check_monitor(expected_pid, expected_exit_status,461                            expected_num_tests_failed)462    def test_get_pidfile_info(self):463        """464      
  normal cases for get_pidfile_info465        """466        # running467        self.set_running()468        self.setup_is_running(True)469        self._test_get_pidfile_info_helper(self.pid, None, None)470        # exited during check471        self.set_running()472        self.setup_is_running(False)473        self.set_complete(123, use_second_read=True) # pidfile gets read again474        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)475        # completed476        self.set_complete(123)477        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)478    def test_get_pidfile_info_running_no_proc(self):479        """480        pidfile shows process running, but no proc exists481        """482        # running but no proc483        self.set_running()484        self.setup_is_running(False)485        self.set_running(use_second_read=True)486        self._test_get_pidfile_info_helper(self.pid, 1, 0)487        self.assertTrue(self.monitor.lost_process)488    def test_get_pidfile_info_not_yet_run(self):489        """490        pidfile hasn't been written yet491        """492        self.set_not_yet_run()493        self._test_get_pidfile_info_helper(None, None, None)494    def test_process_failed_to_write_pidfile(self):495        self.set_not_yet_run()496        self.monitor._start_time = (time.time() -497                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)498        self._test_get_pidfile_info_helper(None, 1, 0)499        self.assertTrue(self.monitor.lost_process)500class AgentTest(unittest.TestCase):501    def setUp(self):502        self.god = mock.mock_god()503        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,504                                                      'dispatcher')505    def tearDown(self):506        self.god.unstub_all()507    def _create_mock_task(self, name):508        task = self.god.create_mock_class(agent_task.AgentTask, name)509        
task.num_processes = 1510        _set_host_and_qe_ids(task)511        return task512    def _create_agent(self, task):513        agent = monitor_db.Agent(task)514        agent.dispatcher = self._dispatcher515        return agent516    def _finish_agent(self, agent):517        while not agent.is_done():518            agent.tick()519    def test_agent_abort(self):520        task = self._create_mock_task('task')521        task.poll.expect_call()522        task.is_done.expect_call().and_return(False)523        task.abort.expect_call()524        task.aborted = True525        agent = self._create_agent(task)526        agent.tick()527        agent.abort()528        self._finish_agent(agent)529        self.god.check_playback()530    def _test_agent_abort_before_started_helper(self, ignore_abort=False):531        task = self._create_mock_task('task')532        task.abort.expect_call()533        if ignore_abort:534            task.aborted = False535            task.poll.expect_call()536            task.is_done.expect_call().and_return(True)537            task.success = True538        else:539            task.aborted = True540        agent = self._create_agent(task)541        agent.abort()542        self._finish_agent(agent)543        self.god.check_playback()544    def test_agent_abort_before_started(self):545        self._test_agent_abort_before_started_helper()546        self._test_agent_abort_before_started_helper(True)547class JobSchedulingTest(BaseSchedulerTest):548    def _test_run_helper(self, expect_agent=True, expect_starting=False,549                         expect_pending=False):550        if expect_starting:551            expected_status = models.HostQueueEntry.Status.STARTING552        elif expect_pending:553            expected_status = models.HostQueueEntry.Status.PENDING554        else:555            expected_status = models.HostQueueEntry.Status.VERIFYING556        job = scheduler_models.Job.fetch('id = 1')[0]557        queue_entry = 
scheduler_models.HostQueueEntry.fetch('id = 1')[0]558        assert queue_entry.job is job559        job.run_if_ready(queue_entry)560        self.god.check_playback()561        self._dispatcher._schedule_running_host_queue_entries()562        agent = self._dispatcher._agents[0]563        actual_status = models.HostQueueEntry.smart_get(1).status564        self.assertEquals(expected_status, actual_status)565        if not expect_agent:566            self.assertEquals(agent, None)567            return568        self.assert_(isinstance(agent, monitor_db.Agent))569        self.assert_(agent.task)570        return agent.task571    def test_run_synchronous_ready(self):572        self._create_job(hosts=[1, 2], synchronous=True)573        self._update_hqe("status='Pending', execution_subdir=''")574        queue_task = self._test_run_helper(expect_starting=True)575        self.assert_(isinstance(queue_task, monitor_db.QueueTask))576        self.assertEquals(queue_task.job.id, 1)577        hqe_ids = [hqe.id for hqe in queue_task.queue_entries]578        self.assertEquals(hqe_ids, [1, 2])579    def test_schedule_running_host_queue_entries_fail(self):580        self._create_job(hosts=[2])581        self._update_hqe("status='%s', execution_subdir=''" %582                         models.HostQueueEntry.Status.PENDING)583        job = scheduler_models.Job.fetch('id = 1')[0]584        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]585        assert queue_entry.job is job586        job.run_if_ready(queue_entry)587        self.assertEqual(queue_entry.status,588                         models.HostQueueEntry.Status.STARTING)589        self.assert_(queue_entry.execution_subdir)590        self.god.check_playback()591        class dummy_test_agent(object):592            task = 'dummy_test_agent'593        self._dispatcher._register_agent_for_ids(594                self._dispatcher._host_agents, [queue_entry.host.id],595                dummy_test_agent)596        # 
Attempted to schedule on a host that already has an agent.597        # Verify that it doesn't raise any error.598        self._dispatcher._schedule_running_host_queue_entries()599    def test_schedule_hostless_job(self):600        job = self._create_job(hostless=True)601        self.assertEqual(1, job.hostqueueentry_set.count())602        hqe_query = scheduler_models.HostQueueEntry.fetch(603                'id = %s' % job.hostqueueentry_set.all()[0].id)604        self.assertEqual(1, len(hqe_query))605        hqe = hqe_query[0]606        self.assertEqual(models.HostQueueEntry.Status.QUEUED, hqe.status)607        self.assertEqual(0, len(self._dispatcher._agents))608        self._dispatcher._schedule_new_jobs()609        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)610        self.assertEqual(1, len(self._dispatcher._agents))611        self._dispatcher._schedule_new_jobs()612        # No change to previously schedule hostless job, and no additional agent613        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)614        self.assertEqual(1, len(self._dispatcher._agents))615class TopLevelFunctionsTest(unittest.TestCase):616    def setUp(self):617        self.god = mock.mock_god()618    def tearDown(self):619        self.god.unstub_all()620    def test_autoserv_command_line(self):621        machines = 'abcd12,efgh34'622        extra_args = ['-Z', 'hello']623        expected_command_line_base = set((monitor_db._autoserv_path, '-p',624                                          '-m', machines, '-r',625                                          '--lab', 'True',626                                          drone_manager.WORKING_DIRECTORY))627        expected_command_line = expected_command_line_base.union(628                ['--verbose']).union(extra_args)629        command_line = set(630                monitor_db._autoserv_command_line(machines, extra_args))631        self.assertEqual(expected_command_line, command_line)632        
class FakeJob(object):633            owner = 'Bob'634            name = 'fake job name'635            test_retry = 0636            id = 1337637        class FakeHQE(object):638            job = FakeJob639        expected_command_line = expected_command_line_base.union(640                ['-u', FakeJob.owner, '-l', FakeJob.name])641        command_line = set(monitor_db._autoserv_command_line(642                machines, extra_args=[], queue_entry=FakeHQE, verbose=False))643        self.assertEqual(expected_command_line, command_line)644class AgentTaskTest(unittest.TestCase,645                    frontend_test_utils.FrontendTestMixin):646    def setUp(self):647        self._frontend_common_setup()648    def tearDown(self):649        self._frontend_common_teardown()650    def _setup_drones(self):651        self.god.stub_function(models.DroneSet, 'drone_sets_enabled')652        models.DroneSet.drone_sets_enabled.expect_call().and_return(True)653        drones = []654        for x in xrange(4):655            drones.append(models.Drone.objects.create(hostname=str(x)))656        drone_set_1 = models.DroneSet.objects.create(name='1')657        drone_set_1.drones.add(*drones[0:2])658        drone_set_2 = models.DroneSet.objects.create(name='2')659        drone_set_2.drones.add(*drones[2:4])660        drone_set_3 = models.DroneSet.objects.create(name='3')661        job_1 = self._create_job_simple([self.hosts[0].id],662                                        drone_set=drone_set_1)663        job_2 = self._create_job_simple([self.hosts[0].id],664                                        drone_set=drone_set_2)665        job_3 = self._create_job_simple([self.hosts[0].id],666                                        drone_set=drone_set_3)667        job_4 = self._create_job_simple([self.hosts[0].id])668        job_4.drone_set = None669        job_4.save()670        hqe_1 = job_1.hostqueueentry_set.all()[0]671        hqe_2 = job_2.hostqueueentry_set.all()[0]672        hqe_3 = 
job_3.hostqueueentry_set.all()[0]673        hqe_4 = job_4.hostqueueentry_set.all()[0]674        return (hqe_1, hqe_2, hqe_3, hqe_4), agent_task.AgentTask()675    def test_get_drone_hostnames_allowed_no_drones_in_set(self):676        hqes, task = self._setup_drones()677        task.queue_entry_ids = (hqes[2].id,)678        self.assertEqual(set(), task.get_drone_hostnames_allowed())679        self.god.check_playback()680    def test_get_drone_hostnames_allowed_no_drone_set(self):681        hqes, task = self._setup_drones()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
