Best Python code snippet using autotest_python
monitor_db_unittest.py
Source: monitor_db_unittest.py
# ...
# NOTE: this excerpt starts part-way through a definition whose header is
# not included.  The visible tail of that definition is preserved here as a
# comment because it cannot stand alone as code:
#
#         tasks = list(parameter.queue.queue)
#         if len(tasks) != 1:
#             return False
#         return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    """Attach host/queue-entry id bookkeeping attributes to a test double.

    @param agent_or_task: object that receives the attributes.
    @param id_list: ids to assign; defaults to a new empty list.

    Note that host_ids and queue_entry_ids are bound to the same list
    object, and every id maps to the fixed hostname '192.168.1.1'.
    """
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
    agent_or_task.hostnames = dict((host_id, '192.168.1.1')
                                   for host_id in id_list)


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    """Shared fixture: test database, stubbed globals and a Dispatcher."""

    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        """Execute a raw SQL statement against the test database."""
        self._database.execute(sql)

    def _set_monitor_stubs(self):
        """Stub out global config, database handles and the drone manager."""
        self.mock_config = global_config.FakeGlobalConfig()
        self.god.stub_with(global_config, 'global_config', self.mock_config)

        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        # Point every module-level database handle at the test database.
        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)
        self.god.stub_with(monitor_db, '_db', self._database)
        self.god.stub_with(monitor_db.Dispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)

        # Replace drone manager paths and make its entry points no-ops.
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')
        self.god.stub_with(drone_manager.instance(), 'initialize',
                           lambda *args: None)
        self.god.stub_with(drone_manager.instance(), 'execute_actions',
                           lambda *args: None)

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()

    def setUp(self):
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._set_global_config_values()
        self._dispatcher = monitor_db.Dispatcher()

    def tearDown(self):
        self._database.disconnect()
        self._frontend_common_teardown()

    def _set_global_config_values(self):
        """Set global_config values to suit unittest needs."""
        self.mock_config.set_config_value(
                'SCHEDULER', 'inline_host_acquisition', True)

    def _update_hqe(self, set, where=''):
        """Run an UPDATE on afe_host_queue_entries.

        @param set: SQL text placed after SET.  The name shadows the builtin
                but is kept because callers pass it by keyword.
        @param where: optional SQL text placed after WHERE; omitted if empty.
        """
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)

    def _get_pending_hqes(self):
        """Return queued, inactive, incomplete HQEs as a list.

        Installed by _set_monitor_stubs in place of
        Dispatcher._get_pending_queue_entries.
        """
        order_by = ('afe_jobs.priority DESC, '
                    'ifnull(nullif(host_id, NULL), host_id) DESC, '
                    'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                    'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=order_by))


class DispatcherSchedulingTest(BaseSchedulerTest):
    """Tests of which (job, host) pairs the dispatcher schedules."""

    # Class-level default only; setUp rebinds a fresh list on each instance.
    _jobs_scheduled = []

    def _set_monitor_stubs(self):
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id,
                                       queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)

    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling, failing if the same pair was seen before."""
        record = (job_id, host_id)
        self.assertTrue(record not in self._jobs_scheduled,
                        'Job %d scheduled on host %d twice' %
                        (job_id, host_id))
        self._jobs_scheduled.append(record)

    def _assert_job_scheduled_on(self, job_id, host_id):
        """Assert the pair was recorded, then consume the record."""
        record = (job_id, host_id)
        self.assertTrue(record in self._jobs_scheduled,
                        'Job %d not scheduled on host %d as expected\n'
                        'Jobs scheduled: %s' %
                        (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)

    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                # Consume matches so _check_for_extra_schedulings ignores them.
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))

    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling has not been consumed."""
        if self._jobs_scheduled:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))

    def _convert_jobs_to_metahosts(self, *job_ids):
        """Move host_id into meta_host for every HQE of the given jobs."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)

    def _lock_host(self, host_id):
        """Set locked=1 on the given afe_hosts row."""
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))

    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []

    def _run_scheduler(self):
        """Tick the host scheduler once, then schedule new jobs twice."""
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2): # metahost scheduling can take two ticks
            self._dispatcher._schedule_new_jobs()

    def _test_basic_scheduling_helper(self, use_metahosts):
        """Basic scheduling; use_metahosts is passed to _create_job_simple."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()

    def _test_priorities_helper(self, use_metahosts):
        """Test prioritization ordering"""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1,2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1) # higher priority
        self._assert_job_scheduled_on(2, 2) # earlier job over later
        self._check_for_extra_schedulings()

    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled"""
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def _test_obey_ACLs_helper(self, use_metahosts):
        """Nothing is scheduled on host 1 once its ACL group rows are gone."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)

    def test_priorities(self):
        self._test_priorities_helper(False)

    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)

    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)

    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)

    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)

    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)

    def test_metahost_priorities(self):
        self._test_priorities_helper(True)

    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)

    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)

    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)

    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()

    def test_no_execution_subdir_not_found(self):
        """Reproduce bug crosbug.com/334353 and recover from it."""
        self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')
        job = self._create_job(hostless=True)

        # Ensure execution_subdir is set before status
        original_set_status = scheduler_models.HostQueueEntry.set_status

        def fake_set_status(hqe, *args, **kwargs):
            self.assertEqual(hqe.execution_subdir, 'hostless')
            original_set_status(hqe, *args, **kwargs)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
                           fake_set_status)

        self._dispatcher._schedule_new_jobs()

        hqe = job.hostqueueentry_set.all()[0]
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual('hostless', hqe.execution_subdir)

    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            # Remaining capacity = limit minus processes of agents that
            # have started and are not yet done.
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running

        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)

    def _setup_some_agents(self, num_agents):
        """Create num_agents DummyAgents and hand a copy to the dispatcher."""
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)

    def _run_a_few_ticks(self):
        """Call the dispatcher's _handle_agents four times."""
        for i in xrange(4):
            self._dispatcher._handle_agents()

    def _assert_agents_started(self, indexes, is_started=True):
        """Assert each indexed agent's started flag equals is_started."""
        for i in indexes:
            # The failure message describes the unexpected state.
            self.assertTrue(self._agents[i].started == is_started,
                            'Agent %d %sstarted' %
                            (i, 'not ' if is_started else ''))

    def _assert_agents_not_started(self, indexes):
        self._assert_agents_started(indexes, False)

    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])

    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])

    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_ticks()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])

    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
    """Tests of PidfileRunMonitor against a mocked drone manager."""

    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(drone_manager, '_the_instance',
                           self.mock_drone_manager)
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()

    def _mock_get_pidfile_timeout_secs(self):
        return 300

    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        """Expect one get_pidfile_contents call returning the given state.

        @param pid: if not None, the contents carry a Process('myhost', pid).
        @param exit_code: value for contents.exit_status.
        @param tests_failed: value for contents.num_tests_failed.
        @param use_second_read: keyword value the expected call must carry.
        """
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)

    def set_not_yet_run(self):
        self.setup_pidfile()

    def set_empty_pidfile(self):
        self.setup_pidfile()

    def set_running(self, use_second_read=False):
        self.setup_pidfile(self.pid, use_second_read=use_second_read)

    def set_complete(self, error_code, use_second_read=False):
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)

    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        """Compare the monitor's state to expectations; verify mock playback."""
        if expected_pid is None:
            self.assertEqual(self.monitor._state.process, None)
        else:
            self.assertEqual(self.monitor._state.process.pid, expected_pid)
        self.assertEqual(self.monitor._state.exit_status, expected_exit_status)
        self.assertEqual(self.monitor._state.num_tests_failed,
                         expected_num_tests_failed)

        self.god.check_playback()

    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)

    def _get_expected_tests_failed(self, expected_exit_status):
        """Return None for a None exit status, else self.num_tests_failed."""
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed

    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)

    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()

    def setup_is_running(self, is_running):
        """Expect one is_process_running(self.process) call."""
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)

    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)

    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True) # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)

    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)

    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        # Backdate the start time to one second past the pidfile timeout.
        self.monitor._start_time = (time.time() -
                                    pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    """Tests of monitor_db.Agent driven by mock tasks."""

    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')

    def tearDown(self):
        self.god.unstub_all()

    def _create_mock_task(self, name):
        """Return a mock AgentTask with one process and empty id lists."""
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        """Wrap task in an Agent attached to the mock dispatcher."""
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent

    def _finish_agent(self, agent):
        """Tick the agent until it reports done."""
        while not agent.is_done():
            agent.tick()

    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True
        # ... (the excerpt is truncated here; the rest of this test and of
        # the file is not included)


# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks, e.g.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
