Best Python code snippet using autotest_python
monitor_db_functional_unittest.py
Source:monitor_db_functional_unittest.py  
...111        assert pidfile_id is not None112        contents = self._pidfiles[pidfile_id]113        contents.exit_status = exit_status114        contents.num_tests_failed = 0115    def was_last_process_killed(self, pidfile_type):116        pidfile_id = self._last_pidfile_id[pidfile_type]117        return pidfile_id in self._killed_pidfiles118    def nonfinished_pidfile_ids(self):119        return [pidfile_id for pidfile_id, pidfile_contents120                in self._pidfiles.iteritems()121                if pidfile_contents.exit_status is None]122    def running_pidfile_ids(self):123        return [pidfile_id for pidfile_id in self.nonfinished_pidfile_ids()124                if self._pidfiles[pidfile_id].process is not None]125    def pidfile_from_path(self, working_directory, pidfile_name):126        return self._pidfile_index[(working_directory, pidfile_name)]127    def attached_files(self, working_directory):128        """129        Return dict mapping path to contents for attached files with specified130        paths.131        """132        return dict((path, contents) for path, contents133                    in self._attached_files.get(working_directory, [])134                    if path is not None)135    # DroneManager emulation APIs for use by monitor_db136    def get_orphaned_autoserv_processes(self):137        return set()138    def total_running_processes(self):139        return sum(pidfile_id._num_processes140                   for pidfile_id in self.nonfinished_pidfile_ids())141    def max_runnable_processes(self, username, drone_hostnames_allowed):142        return self.process_capacity - self.total_running_processes()143    def refresh(self):144        for pidfile_id in self._unregistered_pidfiles:145            # intentionally handle non-registered pidfiles silently146            self._pidfiles.pop(pidfile_id, None)147        self._unregistered_pidfiles = set()148    def execute_actions(self):149        # executing an "execute_command" causes a pidfile to be created150        for pidfile_id in self._future_pidfiles:151            # Process objects are opaque to monitor_db152            process = object()153            self._pidfiles[pidfile_id].process = process154            self._process_index[process] = pidfile_id155        self._future_pidfiles = []156    def attach_file_to_execution(self, result_dir, file_contents,157                                 file_path=None):158        self._attached_files.setdefault(result_dir, set()).add((file_path,159                                                                file_contents))160        return 'attach_path'161    def _initialize_pidfile(self, pidfile_id):162        if pidfile_id not in self._pidfiles:163            assert pidfile_id.key() not in self._pidfile_index164            self._pidfiles[pidfile_id] = drone_manager.PidfileContents()165            self._pidfile_index[pidfile_id.key()] = pidfile_id166    def _set_last_pidfile(self, pidfile_id, working_directory, pidfile_name):167        if working_directory.startswith('hosts/'):168            # such paths look like hosts/host1/1-verify, we'll grab the end169            type_string = working_directory.rsplit('-', 1)[1]170            pidfile_type = _PidfileType.get_value(type_string)171        else:172            pidfile_type = _PIDFILE_TO_PIDFILE_TYPE[pidfile_name]173        self._last_pidfile_id[pidfile_type] = pidfile_id174    def execute_command(self, command, working_directory, pidfile_name,175                        num_processes, log_file=None, paired_with_pidfile=None,176                        username=None, drone_hostnames_allowed=None):177        logging.debug('Executing %s in %s', command, working_directory)178        pidfile_id = self._DummyPidfileId(working_directory, pidfile_name)179        if pidfile_id.key() in self._pidfile_index:180            pidfile_id = self._pidfile_index[pidfile_id.key()]181        pidfile_id._num_processes = num_processes182        pidfile_id._paired_with_pidfile = paired_with_pidfile183        self._future_pidfiles.append(pidfile_id)184        self._initialize_pidfile(pidfile_id)185        self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id186        self._set_last_pidfile(pidfile_id, working_directory, pidfile_name)187        return pidfile_id188    def get_pidfile_contents(self, pidfile_id, use_second_read=False):189        if pidfile_id not in self._pidfiles:190            logging.debug('Request for nonexistent pidfile %s' % pidfile_id)191        return self._pidfiles.get(pidfile_id, drone_manager.PidfileContents())192    def is_process_running(self, process):193        return True194    def register_pidfile(self, pidfile_id):195        self._initialize_pidfile(pidfile_id)196    def unregister_pidfile(self, pidfile_id):197        self._unregistered_pidfiles.add(pidfile_id)198    def declare_process_count(self, pidfile_id, num_processes):199        pidfile_id.num_processes = num_processes200    def absolute_path(self, path):201        return 'absolute/' + path202    def write_lines_to_file(self, file_path, lines, paired_with_process=None):203        # TODO: record this204        pass205    def get_pidfile_id_from(self, execution_tag, pidfile_name):206        default_pidfile = self._DummyPidfileId(execution_tag, pidfile_name,207                                               num_processes=0)208        return self._pidfile_index.get((execution_tag, pidfile_name),209                                       default_pidfile)210    def kill_process(self, process):211        pidfile_id = self._process_index[process]212        self._killed_pidfiles.add(pidfile_id)213        self._set_pidfile_exit_status(pidfile_id, 271)214class MockEmailManager(NullMethodObject):215    _NULL_METHODS = ('send_queued_admin', 'send')216    def enqueue_admin(self, subject, message):217        logging.warn('enqueue_notify_email: %s', subject)218        logging.warn(message)219class SchedulerFunctionalTest(unittest.TestCase,220                              test_utils.FrontendTestMixin):221    # some number of ticks after which the scheduler is presumed to have222    # stabilized, given no external changes223    _A_LOT_OF_TICKS = 10224    def setUp(self):225        self._frontend_common_setup()226        self._set_stubs()227        self._set_settings_values()228        self._create_dispatcher()229        logging.basicConfig(level=logging.DEBUG)230    def _create_dispatcher(self):231        self.dispatcher = monitor_db.Dispatcher()232    def tearDown(self):233        self._database.disconnect()234        self._frontend_common_teardown()235    def _set_stubs(self):236        self.mock_config = MockGlobalConfig()237        self.god.stub_with(settings, 'settings', self.mock_config)238        self.mock_drone_manager = MockDroneManager()239        drone_manager._set_instance(self.mock_drone_manager)240        self.mock_email_manager = MockEmailManager()241        self.god.stub_with(mail, "manager", self.mock_email_manager)242        self._database = (243            database_connection.TranslatingDatabase.get_test_database(244                translators=_DB_TRANSLATORS))245        self._database.connect(db_type='django')246        self.god.stub_with(monitor_db, '_db', self._database)247        self.god.stub_with(scheduler_models, '_db', self._database)248        monitor_db.initialize_globals()249        scheduler_models.initialize_globals()250    def _set_settings_values(self):251        self.mock_config.set_config_value('SCHEDULER', 'pidfile_timeout_mins',252                                          1)253        self.mock_config.set_config_value('SCHEDULER', 'gc_stats_interval_mins',254                                          999999)255    def _initialize_test(self):256        self.dispatcher.initialize()257    def _run_dispatcher(self):258        for _ in xrange(self._A_LOT_OF_TICKS):259            self.dispatcher.tick()260    def test_idle(self):261        self._initialize_test()262        self._run_dispatcher()263    def _assert_process_executed(self, working_directory, pidfile_name):264        process_was_executed = self.mock_drone_manager.was_process_executed(265            'hosts/host1/1-verify', drone_manager.AUTOSERV_PID_FILE)266        self.assert_(process_was_executed,267                     '%s/%s not executed' % (working_directory, pidfile_name))268    def _update_instance(self, model_instance):269        return type(model_instance).objects.get(pk=model_instance.pk)270    def _check_statuses(self, queue_entry, queue_entry_status,271                        host_status=None):272        self._check_entry_status(queue_entry, queue_entry_status)273        if host_status:274            self._check_host_status(queue_entry.host, host_status)275    def _check_entry_status(self, queue_entry, status):276        # update from DB277        queue_entry = self._update_instance(queue_entry)278        self.assertEquals(queue_entry.status, status)279    def _check_host_status(self, host, status):280        # update from DB281        host = self._update_instance(host)282        self.assertEquals(host.status, status)283    def _run_pre_job_verify(self, queue_entry):284        self._run_dispatcher()  # launches verify285        self._check_statuses(queue_entry, HqeStatus.VERIFYING,286                             HostStatus.VERIFYING)287        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)288    def test_simple_job(self):289        self._initialize_test()290        job, queue_entry = self._make_job_and_queue_entry()291        self._run_pre_job_verify(queue_entry)292        self._run_dispatcher()  # launches job293        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)294        self._finish_job(queue_entry)295        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)296        self._assert_nothing_is_running()297    def _setup_for_pre_job_cleanup(self):298        self._initialize_test()299        job, queue_entry = self._make_job_and_queue_entry()300        job.reboot_before = model_attributes.RebootBefore.ALWAYS301        job.save()302        return queue_entry303    def _run_pre_job_cleanup_job(self, queue_entry):304        self._run_dispatcher()  # cleanup305        self._check_statuses(queue_entry, HqeStatus.VERIFYING,306                             HostStatus.CLEANING)307        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)308        self._run_dispatcher()  # verify309        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)310        self._run_dispatcher()  # job311        self._finish_job(queue_entry)312    def test_pre_job_cleanup(self):313        queue_entry = self._setup_for_pre_job_cleanup()314        self._run_pre_job_cleanup_job(queue_entry)315    def _run_pre_job_cleanup_one_failure(self):316        queue_entry = self._setup_for_pre_job_cleanup()317        self._run_dispatcher()  # cleanup318        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,319                                               exit_status=256)320        self._run_dispatcher()  # repair321        self._check_statuses(queue_entry, HqeStatus.QUEUED,322                             HostStatus.REPAIRING)323        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)324        return queue_entry325    def test_pre_job_cleanup_failure(self):326        queue_entry = self._run_pre_job_cleanup_one_failure()327        # from here the job should run as normal328        self._run_pre_job_cleanup_job(queue_entry)329    def test_pre_job_cleanup_double_failure(self):330        # TODO (showard): this test isn't perfect.  in reality, when the second331        # cleanup fails, it copies its results over to the job directory using332        # copy_results_on_drone() and then parses them.  since we don't handle333        # that, there appear to be no results at the job directory.  the334        # scheduler handles this gracefully, parsing gets effectively skipped,335        # and this test passes as is.  but we ought to properly test that336        # behavior.337        queue_entry = self._run_pre_job_cleanup_one_failure()338        self._run_dispatcher()  # second cleanup339        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,340                                               exit_status=256)341        self._run_dispatcher()342        self._check_statuses(queue_entry, HqeStatus.FAILED,343                             HostStatus.REPAIR_FAILED)344        # nothing else should run345        self._assert_nothing_is_running()346    def _assert_nothing_is_running(self):347        self.assertEquals(self.mock_drone_manager.running_pidfile_ids(), [])348    def _setup_for_post_job_cleanup(self):349        self._initialize_test()350        job, queue_entry = self._make_job_and_queue_entry()351        job.reboot_after = model_attributes.RebootAfter.ALWAYS352        job.save()353        return queue_entry354    def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,355                                                   include_verify=True):356        if include_verify:357            self._run_pre_job_verify(queue_entry)358        self._run_dispatcher()  # job359        self.mock_drone_manager.finish_process(_PidfileType.JOB)360        self._run_dispatcher()  # parsing + cleanup361        self.mock_drone_manager.finish_process(_PidfileType.PARSE)362        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,363                                               exit_status=256)364        self._run_dispatcher()  # repair, HQE unaffected365        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)366        self._run_dispatcher()367        return queue_entry368    def test_post_job_cleanup_failure(self):369        queue_entry = self._setup_for_post_job_cleanup()370        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)371        self._check_statuses(queue_entry, HqeStatus.COMPLETED,372                             HostStatus.REPAIRING)373        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)374        self._run_dispatcher()375        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)376    def test_post_job_cleanup_failure_repair_failure(self):377        queue_entry = self._setup_for_post_job_cleanup()378        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)379        self.mock_drone_manager.finish_process(_PidfileType.REPAIR,380                                               exit_status=256)381        self._run_dispatcher()382        self._check_statuses(queue_entry, HqeStatus.COMPLETED,383                             HostStatus.REPAIR_FAILED)384    def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):385        pidfile_name = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]386        queue_entry = self._update_instance(queue_entry)387        pidfile_id = self.mock_drone_manager.pidfile_from_path(388            queue_entry.execution_path(), pidfile_name)389        self.assert_(pidfile_id._paired_with_pidfile)390    def _finish_job(self, queue_entry):391        self.mock_drone_manager.finish_process(_PidfileType.JOB)392        self._run_dispatcher()  # launches parsing + cleanup393        self._check_statuses(queue_entry, HqeStatus.PARSING,394                             HostStatus.CLEANING)395        self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)396        self._finish_parsing_and_cleanup(queue_entry)397    def _finish_parsing_and_cleanup(self, queue_entry):398        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)399        self.mock_drone_manager.finish_process(_PidfileType.PARSE)400        self._run_dispatcher()401        self._check_entry_status(queue_entry, HqeStatus.ARCHIVING)402        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)403        self._run_dispatcher()404    def _create_reverify_request(self):405        host = self.hosts[0]406        models.SpecialTask.schedule_special_task(407            host=host, task=models.SpecialTask.Task.VERIFY)408        return host409    def test_requested_reverify(self):410        host = self._create_reverify_request()411        self._run_dispatcher()412        self._check_host_status(host, HostStatus.VERIFYING)413        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)414        self._run_dispatcher()415        self._check_host_status(host, HostStatus.READY)416    def test_requested_reverify_failure(self):417        host = self._create_reverify_request()418        self._run_dispatcher()419        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,420                                               exit_status=256)421        self._run_dispatcher()  # repair422        self._check_host_status(host, HostStatus.REPAIRING)423        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)424        self._run_dispatcher()425        self._check_host_status(host, HostStatus.READY)426    def _setup_for_do_not_verify(self):427        self._initialize_test()428        job, queue_entry = self._make_job_and_queue_entry()429        queue_entry.host.protection = host_protections.Protection.DO_NOT_VERIFY430        queue_entry.host.save()431        return queue_entry432    def test_do_not_verify_job(self):433        queue_entry = self._setup_for_do_not_verify()434        self._run_dispatcher()  # runs job directly435        self._finish_job(queue_entry)436    def test_do_not_verify_job_with_cleanup(self):437        queue_entry = self._setup_for_do_not_verify()438        queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS439        queue_entry.job.save()440        self._run_dispatcher()  # cleanup441        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)442        self._run_dispatcher()  # job443        self._finish_job(queue_entry)444    def test_do_not_verify_pre_job_cleanup_failure(self):445        queue_entry = self._setup_for_do_not_verify()446        queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS447        queue_entry.job.save()448        self._run_dispatcher()  # cleanup449        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,450                                               exit_status=256)451        self._run_dispatcher()  # failure ignored; job runs452        self._finish_job(queue_entry)453    def test_do_not_verify_post_job_cleanup_failure(self):454        queue_entry = self._setup_for_do_not_verify()455        self._run_post_job_cleanup_failure_up_to_repair(queue_entry,456                                                        include_verify=False)457        # failure ignored, host still set to Ready458        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)459        self._run_dispatcher()  # nothing else runs460        self._assert_nothing_is_running()461    def test_do_not_verify_requested_reverify_failure(self):462        host = self._create_reverify_request()463        host.protection = host_protections.Protection.DO_NOT_VERIFY464        host.save()465        self._run_dispatcher()466        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,467                                               exit_status=256)468        self._run_dispatcher()469        self._check_host_status(host, HostStatus.READY)  # ignore failure470        self._assert_nothing_is_running()471    def test_job_abort_in_verify(self):472        self._initialize_test()473        job = self._create_job(hosts=[1])474        self._run_dispatcher()  # launches verify475        job.hostqueueentry_set.update(aborted=True)476        self._run_dispatcher()  # kills verify, launches cleanup477        self.assert_(self.mock_drone_manager.was_last_process_killed(478            _PidfileType.VERIFY))479        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)480        self._run_dispatcher()481    def test_job_abort(self):482        self._initialize_test()483        job = self._create_job(hosts=[1])484        job.run_verify = False485        job.save()486        self._run_dispatcher()  # launches job487        job.hostqueueentry_set.update(aborted=True)488        self._run_dispatcher()  # kills job, launches gathering489        self.assert_(self.mock_drone_manager.was_last_process_killed(490            _PidfileType.JOB))491        self.mock_drone_manager.finish_process(_PidfileType.GATHER)492        self._run_dispatcher()  # launches parsing + cleanup493        queue_entry = job.hostqueueentry_set.all()[0]494        self._finish_parsing_and_cleanup(queue_entry)495    def test_job_abort_queued_synchronous(self):496        self._initialize_test()497        job = self._create_job(hosts=[1, 2])498        job.synch_count = 2499        job.save()500        job.hostqueueentry_set.update(aborted=True)501        self._run_dispatcher()502        for host_queue_entry in job.hostqueueentry_set.all():503            self.assertEqual(host_queue_entry.status,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
