How to use the test_schedule_task method in locust

Best Python code snippet using locust

god_tests.py

Source:god_tests.py Github

copy

Full Screen

...312 new_task = self.scheduler.db_jobs.find_one({'id': task_id})313 self.assertTrue(new_task is not None)314 self.assertTrue(new_task['status']['primary'] == 'pending')315 return task_id316 def test_schedule_task(self):317 task_id = self.test_task_create()318 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})319 task_list = []320 for p in pending_tasks:321 task_list.append(p)322 queued_tasks = self.scheduler.schedule_tasks(task_list)323 self.assertTrue(len(queued_tasks) == 1)324 return queued_tasks325 def test_run_task(self):326 queued_tasks = self.test_schedule_task()327 self.scheduler.run_tasks(queued_tasks)328 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})329 self.assertTrue(running_tasks.count() == 1)330 return running_tasks331 def test_run_task_watcher_reject(self):332 self.sample_task['meta']['name'] = 'norun'333 queued_tasks = self.test_schedule_task()334 self.scheduler.run_tasks(queued_tasks)335 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})336 self.assertTrue(running_tasks.count() == 0)337 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})338 self.assertTrue(pending_tasks.count() == 1)339 def test_check_redis_restore(self):340 running_tasks = self.test_run_task()341 nb_running_redis = self.scheduler.r.llen(self.watcher.cfg['redis_prefix']+':jobs:running')342 total_redis = int(self.scheduler.r.get(self.watcher.cfg['redis_prefix']+':jobs'))343 self.assertTrue(nb_running_redis > 0)344 self.scheduler.r.flushdb()345 self.assertTrue(self.scheduler.r.llen(self.watcher.cfg['redis_prefix']+':jobs:running') == 0)346 self.scheduler.check_redis()347 nb_running_redis_after_restore = self.scheduler.r.llen(self.watcher.cfg['redis_prefix']+':jobs:running')348 self.assertTrue(nb_running_redis == nb_running_redis_after_restore)349 total_redis_after_restore = int(self.scheduler.r.get(self.watcher.cfg['redis_prefix']+':jobs'))350 self.assertTrue(total_redis == 
total_redis_after_restore)351 @attr('test')352 def test_watch_task_over(self):353 self.test_run_task()354 self.watcher.check_running_jobs()355 over_tasks = self.watcher.db_jobsover.find()356 self.assertTrue(over_tasks.count() == 1)357 for task in over_tasks:358 self.assertTrue(task['container']['meta']['disk_size'] > 0)359 return over_tasks360 def test_archive(self):361 self.test_watch_task_over()362 over_tasks = self.watcher.db_jobsover.find()363 for task in over_tasks:364 self.archiver.archive_task(task)365 over_tasks = self.watcher.db_jobsover.find()366 for task in over_tasks:367 self.assertTrue(task['status']['primary'] == godutils.STATUS_ARCHIVED)368 def test_dependent_tasks(self):369 running = self.test_run_task()370 self.sample_task['requirements']['tasks'] = [running[0]['id']]371 running = self.test_run_task()372 # Parent is running, task should have been kept in pending373 self.assertTrue(running.count() == 1)374 # Finish parent job375 self.watcher.check_running_jobs()376 # Reschedule377 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})378 task_list = []379 for p in pending_tasks:380 task_list.append(p)381 queued_tasks = self.scheduler.schedule_tasks(task_list)382 self.scheduler.run_tasks(queued_tasks)383 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})384 self.assertTrue(running_tasks.count() == 1)385 def test_dependent_tasks_ids_are_str(self):386 running = self.test_run_task()387 self.sample_task['requirements']['tasks'] = [str(running[0]['id'])]388 running = self.test_run_task()389 # Parent is running, task should have been kept in pending390 self.assertTrue(running.count() == 1)391 # Finish parent job392 self.watcher.check_running_jobs()393 # Reschedule394 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})395 task_list = []396 for p in pending_tasks:397 task_list.append(p)398 queued_tasks = self.scheduler.schedule_tasks(task_list)399 self.scheduler.run_tasks(queued_tasks)400 
running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})401 self.assertTrue(running_tasks.count() == 1)402 def test_kill_task_running(self):403 running_tasks = self.test_run_task()404 task_to_kill = running_tasks[0]405 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:kill', dumps(task_to_kill))406 self.watcher.kill_tasks([task_to_kill])407 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})408 self.assertTrue(running_tasks.count() == 0)409 over_tasks = self.scheduler.db_jobsover.find()410 self.assertTrue(over_tasks.count() == 1)411 def test_kill_task_pending(self):412 pending_tasks = self.test_schedule_task()413 task_to_kill = pending_tasks[0]414 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:kill', dumps(task_to_kill))415 self.watcher.kill_tasks([task_to_kill])416 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})417 self.assertTrue(running_tasks.count() == 0)418 over_tasks = self.scheduler.db_jobsover.find()419 self.assertTrue(over_tasks.count() == 1)420 def test_reschedule(self):421 running_tasks = self.test_run_task()422 task_to_reschedule = running_tasks[0]423 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})424 self.assertTrue(pending_tasks.count() == 0)425 self.scheduler.reschedule_tasks([task_to_reschedule])426 self.watcher.kill_tasks([task_to_reschedule])427 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})428 self.assertTrue(pending_tasks.count() == 1)429 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})430 self.assertTrue(running_tasks.count() == 0)431 over_tasks = self.scheduler.db_jobsover.find()432 self.assertTrue(over_tasks.count() == 0)433 def test_suspend_task_running(self):434 running_tasks = self.test_run_task()435 task_to_suspend = running_tasks[0]436 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:suspend', dumps(task_to_suspend))437 
self.watcher.suspend_tasks([task_to_suspend])438 suspended_task = self.scheduler.db_jobs.find_one({'id': task_to_suspend['id']})439 self.assertTrue(suspended_task['status']['secondary'] == 'suspended')440 return suspended_task441 def test_suspend_task_pending(self):442 pending_tasks = self.test_schedule_task()443 task_to_suspend = pending_tasks[0]444 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:suspend', dumps(task_to_suspend))445 self.watcher.suspend_tasks([task_to_suspend])446 suspended_task = self.scheduler.db_jobs.find_one({'id': task_to_suspend['id']})447 self.assertTrue(suspended_task['status']['secondary'] == 'suspend rejected')448 return task_to_suspend449 def test_resume_task_suspended(self):450 suspended_task = self.test_suspend_task_running()451 suspended_task['status']['secondary'] = 'resume requested'452 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:resume', dumps(suspended_task))453 self.watcher.resume_tasks([suspended_task])454 resumed_task = self.scheduler.db_jobs.find_one({'id': suspended_task['id']})455 self.assertTrue(resumed_task['status']['secondary'] == 'resumed')456 return resumed_task457 def test_resume_task_pending(self):458 pending_tasks = self.test_schedule_task()459 task_to_resume = pending_tasks[0]460 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:resume', dumps(task_to_resume))461 self.watcher.resume_tasks([task_to_resume])462 resumed_task = self.scheduler.db_jobs.find_one({'id': task_to_resume['id']})463 self.assertTrue(resumed_task['status']['secondary'] == 'resume rejected')464 def test_resume_task_running(self):465 running_tasks = self.test_run_task()466 task_to_resume = running_tasks[0]467 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:resume', dumps(task_to_resume))468 self.watcher.resume_tasks([task_to_resume])469 resumed_task = self.scheduler.db_jobs.find_one({'id': task_to_resume['id']})470 self.assertTrue(resumed_task['status']['secondary'] == 'resume rejected')471 
def test_rejected_task(self):472 queued_tasks = []473 # Queue 6 tasks474 for i in range(6):475 task_id = self.test_task_create()476 pending_task = self.scheduler.db_jobs.find_one({'id': task_id})477 queued_tasks.append(pending_task)478 # Run tasks479 self.scheduler.run_tasks(queued_tasks)480 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})481 self.assertTrue(running_tasks.count() == 5)482 # 1 task should have been rejected and set back as pending483 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})484 self.assertTrue(pending_tasks.count() == 1)485 def test_reschedule_rejected_task(self):486 self.test_rejected_task()487 # Now, we have 1 left in pending488 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})489 task_list = []490 for p in pending_tasks:491 task_list.append(p)492 queued_tasks = self.scheduler.schedule_tasks(task_list)493 self.scheduler.run_tasks(queued_tasks)494 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})495 self.assertTrue(pending_tasks.count() == 0)496 def test_run_interactive_task(self):497 task = copy.deepcopy(self.sample_task)498 task['command']['interactive'] = True499 task_id = self.scheduler.add_task(task)500 pending_task = self.scheduler.db_jobs.find_one({'id': task_id})501 queued_tasks = [pending_task]502 self.scheduler.run_tasks(queued_tasks)503 ports_allocated = self.scheduler.r.llen(self.scheduler.cfg['redis_prefix']+':ports:fake-laptop')504 self.assertTrue(ports_allocated >= 1)505 nb_ports_before = self.scheduler.r.llen(self.scheduler.cfg['redis_prefix']+':ports:fake-laptop')506 self.watcher.check_running_jobs()507 nb_ports_after = self.scheduler.r.llen(self.scheduler.cfg['redis_prefix']+':ports:fake-laptop')508 # Check port is released509 self.assertTrue(nb_ports_before + 1 == nb_ports_after)510 def test_task_array_create(self):511 task = copy.deepcopy(self.sample_task)512 task['requirements']['array']['values'] = '1:3:1'513 task_id = 
self.scheduler.add_task(task)514 self.assertTrue(task_id > 0)515 new_task = self.scheduler.db_jobs.find_one({'id': task_id})516 self.assertTrue(new_task is not None)517 self.assertTrue(new_task['status']['primary'] == 'pending')518 nb_tasks = self.scheduler.db_jobs.find().count()519 self.assertTrue(nb_tasks == 4)520 return (task_id, new_task['requirements']['array']['tasks'])521 def test_task_array_schedule(self):522 (task_id, subtasks) = self.test_task_array_create()523 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})524 task_list = []525 for p in pending_tasks:526 task_list.append(p)527 queued_tasks = self.scheduler.schedule_tasks(task_list)528 self.assertTrue(len(queued_tasks) == 4)529 return queued_tasks530 def test_run_task_array(self):531 queued_tasks = self.test_task_array_schedule()532 self.scheduler.run_tasks(queued_tasks)533 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})534 self.assertTrue(running_tasks.count() == 4)535 return running_tasks536 def test_watch_task_array_over(self):537 self.test_run_task_array()538 self.watcher.check_running_jobs()539 # May need a second pass, need to get all child tasks over first to get parent task over540 self.watcher.check_running_jobs()541 over_tasks = self.watcher.db_jobsover.find()542 #tasks = self.watcher.db_jobs.find()543 #for task in tasks:544 # print(str(task))545 #print("###"+str(over_tasks.count()))546 self.assertTrue(over_tasks.count() == 4)547 def test_kill_task_running(self):548 running_tasks = self.test_run_task_array()549 task_to_kill = None550 subtasks_to_kill = []551 for running_task in running_tasks:552 if running_task['parent_task_id'] is None:553 task_to_kill = running_task554 else:555 subtasks_to_kill.append(running_task)556 self.watcher.r.rpush(self.watcher.cfg['redis_prefix']+':jobs:kill', dumps(task_to_kill))557 self.watcher.kill_tasks([task_to_kill])558 self.watcher.kill_tasks(subtasks_to_kill)559 self.watcher.kill_tasks([task_to_kill])560 
running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})561 self.assertTrue(running_tasks.count() == 0)562 over_tasks = self.scheduler.db_jobsover.find()563 self.assertTrue(over_tasks.count() == 4)564 def test_plugin_get_users(self):565 user_list = self.scheduler.executors[0].get_users(['osallou'])566 self.assertTrue(user_list.count()==1)567 def test_user_rate_limit(self):568 self.scheduler.cfg['rate_limit'] = 1569 task = copy.deepcopy(self.sample_task)570 task_id = self.scheduler.add_task(task)571 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})572 task_list = []573 for p in pending_tasks:574 task_list.append(p)575 queued_tasks = self.scheduler.schedule_tasks(task_list)576 task = copy.deepcopy(self.sample_task)577 task_id = self.scheduler.add_task(task)578 self.assertTrue(task_id is None)579 def test_global_rate_limit(self):580 self.scheduler.cfg['rate_limit_all'] = 1581 task = copy.deepcopy(self.sample_task)582 task_id = self.scheduler.add_task(task)583 pending_tasks = self.scheduler.db_jobs.find({'status.primary': 'pending'})584 task_list = []585 for p in pending_tasks:586 task_list.append(p)587 queued_tasks = self.scheduler.schedule_tasks(task_list)588 task = copy.deepcopy(self.sample_task)589 task_id = self.scheduler.add_task(task)590 self.assertTrue(task_id is None)591 def test_disk_quota(self):592 queued_tasks = self.test_schedule_task()593 self.scheduler.run_tasks(queued_tasks)594 running_tasks = self.scheduler.db_jobs.find({'status.primary': 'running'})595 self.assertTrue(running_tasks.count() == 1)596 self.watcher.check_running_jobs()597 self.scheduler.cfg['disk_default_quota'] = '10'598 queued_tasks = self.test_schedule_task()599 quota_job_id = queued_tasks[0]['id']600 self.scheduler.run_tasks(queued_tasks)601 over_quota_task = self.scheduler.db_jobsover.find_one({'id': quota_job_id})...

Full Screen

Full Screen

test_prepare.py

Source:test_prepare.py Github

copy

Full Screen

...12 """Test that a reminder task can be prepared"""13 task = selector.prepare(example_tasks["remind"])14 assert len(task) == 115 assert task[0]["verb"] == "remind"16 def test_schedule_task(self, selector, no_automate, example_tasks):17 """Test that a schedule task can be prepared"""18 task = selector.prepare(example_tasks["schedule"])19 assert len(task) == 120 assert task[0]["verb"] == "schedule"21 def test_send_task(self, selector, no_automate, example_tasks):22 """Test that a send task can be prepared"""23 task = selector.prepare(example_tasks["send"])24 assert len(task) == 125 assert task[0]["verb"] == "send"26 def test_remove_task(self, selector, no_automate, example_tasks):27 """Test that a remove scheduled event task can be prepared"""28 task = selector.prepare(example_tasks["remove"])29 assert len(task) == 130 assert task[0]["verb"] == "remove"...

Full Screen

Full Screen

test_scheduler.py

Source:test_scheduler.py Github

copy

Full Screen

...10 def schedule_single_task(self):11 self.scheduler = TaskScheduler(QtScheduler())12 self.task = Task(test_func)13 self.scheduledTask = self.scheduler.schedule_task_in_interval(self.task, 1, Time.SEC)14 def test_schedule_task(self):15 assert self.scheduledTask != None16 def test_cancel_scheduled_task(self):17 self.scheduler.cancel_scheduled_task(self.scheduledTask)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful