Best Python code snippet using autotest_python
monitor_db.py
Source:monitor_db.py  
...562            # Create a new HQE for every additional assigned_host.563            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)564            new_hqe.save()565            new_hqe.set_host(assigned_host)566            self._run_queue_entry(new_hqe)567        # The first assigned host uses the original HostQueueEntry568        queue_entry.set_host(group_hosts[0])569        self._run_queue_entry(queue_entry)570    def _schedule_hostless_job(self, queue_entry):571        self.add_agent_task(HostlessQueueTask(queue_entry))572        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)573    def _schedule_new_jobs(self):574        queue_entries = self._refresh_pending_queue_entries()575        if not queue_entries:576            return577        for queue_entry in queue_entries:578            is_unassigned_atomic_group = (579                queue_entry.atomic_group_id is not None and580                queue_entry.host_id is None)581            if queue_entry.is_hostless():582                self._schedule_hostless_job(queue_entry)583            elif is_unassigned_atomic_group:584                self._schedule_atomic_group(queue_entry)585            else:586                assigned_host = self._host_scheduler.schedule_entry(queue_entry)587                if assigned_host and not self.host_has_agent(assigned_host):588                    assert assigned_host.id == queue_entry.host_id589                    self._run_queue_entry(queue_entry)590    def _schedule_running_host_queue_entries(self):591        for agent_task in self._get_queue_entry_agent_tasks():592            self.add_agent_task(agent_task)593    def _schedule_delay_tasks(self):594        for entry in scheduler_models.HostQueueEntry.fetch(595                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):596            task = entry.job.schedule_delayed_callback_task(entry)597            if task:598                self.add_agent_task(task)599    def _run_queue_entry(self, queue_entry):600        queue_entry.schedule_pre_job_tasks()601    def _find_aborting(self):602        jobs_to_stop = set()603        for entry in scheduler_models.HostQueueEntry.fetch(604                where='aborted and not complete'):605            logging.info('Aborting %s', entry)606            for agent in self.get_agents_for_entry(entry):607                agent.abort()608            entry.abort(self)609            jobs_to_stop.add(entry.job)610        for job in jobs_to_stop:611            job.stop_if_necessary()612    def _can_start_agent(self, agent, num_started_this_cycle,613                         have_reached_limit):...task_manager.py
Source:task_manager.py  
...70        self.task_count += 171        self.tasks[task.id()] = task72        self.server_queue.put(task)73        return task.id()74    def _run_queue_entry(self, i, parent_thread):75        album_logging.configure_logging(76            "worker" + str(i), parent_thread_id=parent_thread77        )78        while True:79            task = self.server_queue.get()80            self._handle_task(task)81            self.server_queue.task_done()82    def _handle_task(self, task: ITask):83        module_logger().info(f"TaskManager: starting task {task.id()}...")84        logger = album_logging.configure_logging("task" + str(task.id()))85        handler = LogHandler()86        task.set_log_handler(handler)87        logger.addHandler(handler)88        task.set_status(ITask.Status.RUNNING)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
