How to use _unregister_agent_for_ids method in autotest

Best Python code snippet using autotest_python Github


Full Screen

# Excerpt (lines 327-386 of the original file). These are methods of a class
# whose header is not part of the excerpt (presumably the scheduler's
# dispatcher -- confirm against the full file), shown here de-indented.
#
# ... tail of the preceding method:
#     gc_stats._log_garbage_collector_stats()


def _register_agent_for_ids(self, agent_dict, object_ids, agent):
    """
    Associate an agent with every ID in object_ids.

    @param agent_dict: Dict mapping an object ID to the set of agents
            registered for that ID. Missing IDs are created on demand.
    @param object_ids: Iterable of IDs to register the agent under.
    @param agent: The agent to register.
    """
    for object_id in object_ids:
        agent_dict.setdefault(object_id, set()).add(agent)


def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
    """
    Remove an agent from every ID in object_ids.

    Each ID must already be present in agent_dict (asserted), and the agent
    must be registered under it (set.remove raises KeyError otherwise).

    @param agent_dict: Dict mapping an object ID to the set of agents
            registered for that ID.
    @param object_ids: Iterable of IDs to unregister the agent from.
    @param agent: The agent to unregister.
    """
    for object_id in object_ids:
        assert object_id in agent_dict
        agent_dict[object_id].remove(agent)
        # If an ID has no more active agent associated, there is no need to
        # keep it in the dictionary. Otherwise, scheduler will keep an
        # unnecessarily big dictionary until being restarted.
        if not agent_dict[object_id]:
            agent_dict.pop(object_id)


def add_agent_task(self, agent_task):
    """
    Creates and adds an agent to the dispatchers list.

    In creating the agent we also pass on all the queue_entry_ids and
    host_ids from the special agent task. For every agent we create, we
    add it to 1. a dict against the queue_entry_ids given to it 2. A dict
    against the host_ids given to it. So theoretically, a host can have any
    number of agents associated with it, and each of them can have any
    special agent task, though in practice we never see > 1 agent/task per
    host at any time.

    @param agent_task: A SpecialTask for the agent to manage.
    """
    agent = Agent(agent_task)
    self._agents.append(agent)
    agent.dispatcher = self
    self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
    self._register_agent_for_ids(self._queue_entry_agents,
                                 agent.queue_entry_ids, agent)


def get_agents_for_entry(self, queue_entry):
    """
    Find agents corresponding to the specified queue_entry.

    @param queue_entry: Object whose id is looked up in the queue-entry dict.
    @return: A new list of the agents registered for that ID (empty if none).
    """
    # NOTE(review): the lookup key was lost when this snippet was extracted
    # (it read `.get(, set())`). `queue_entry.id` matches how the dict is
    # keyed in add_agent_task (by agent.queue_entry_ids) -- confirm upstream.
    return list(self._queue_entry_agents.get(queue_entry.id, set()))


def host_has_agent(self, host):
    """
    Determine if there is currently an Agent present using this host.

    @param host: Object whose id is looked up in the host dict.
    @return: True if at least one agent is registered for the host's ID.
    """
    # NOTE(review): the lookup key was lost in extraction (`.get(, None)`).
    # `host.id` matches how the dict is keyed in add_agent_task (by
    # agent.host_ids) -- confirm upstream.
    return bool(self._host_agents.get(host.id, None))


def remove_agent(self, agent):
    """
    Drop an agent from the agent list and from both ID dictionaries.

    @param agent: The agent to remove; it must be present in self._agents.
    """
    self._agents.remove(agent)
    self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                   agent)
    self._unregister_agent_for_ids(self._queue_entry_agents,
                                   agent.queue_entry_ids, agent)


def _host_has_scheduled_special_task(self, host):
    """
    Check for a SpecialTask on this host that is neither active nor complete.

    @param host: The host to look up.
    @return: True if the filtered SpecialTask query is non-empty.
    """
    # NOTE(review): the first filter argument was lost in extraction
    # (`.filter(,`). `host__id=host.id` is a reconstruction -- confirm
    # against the upstream file.
    return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                  is_active=False,
                                                  is_complete=False))


def _recover_processes(self):
    # Only the beginning of this method is part of the excerpt.
    agent_tasks = self._create_recovery_agent_tasks()
    self._register_pidfiles(agent_tasks)
    _drone_manager.refresh()
    self._recover_tasks(agent_tasks)
    self._recover_pending_entries()
    self._check_for_unrecovered_verifying_entries()
    self._reverify_remaining_hosts()
    # reinitialize drones after killing orphaned processes, since they can...
    # ... (excerpt ends here; the rest of the method is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?