Best Python code snippet using fMBT_python
wasp_general_task_scheduler_scheduler_ignore.py
Source:wasp_general_task_scheduler_scheduler_ignore.py  
...111		assert(registry.maximum_records() is None)112		assert(registry.has_records() is False)113		assert(len(registry) == 0)114		drop_count = []115		def on_drop():116			drop_count.append(None)117		task = TestWSchedulerWatchdog.DummyTask()118		wait_schedule1 = WScheduleRecord(119			task, policy=WScheduleRecord.PostponePolicy.wait, on_drop=on_drop120		)121		wait_schedule2 = WScheduleRecord(122			task, policy=WScheduleRecord.PostponePolicy.wait, on_drop=on_drop123		)124		assert(len(drop_count) == 0)125		assert(len(registry) == 0)126		registry.postpone(wait_schedule1)127		assert(len(drop_count) == 0)128		assert(len(registry) == 1)129		registry.postpone(wait_schedule2)130		assert(len(drop_count) == 0)131		assert(len(registry) == 2)132		tasks = [x for x in registry]133		assert(len(registry) == 0)134		assert(wait_schedule1 in tasks)135		assert(wait_schedule2 in tasks)136		registry = WPostponedRecordRegistry(maximum_records=0)137		assert(len(drop_count) == 0)138		assert(len(registry) == 0)139		registry.postpone(wait_schedule1)140		assert(len(drop_count) == 1)141		assert(len(registry) == 0)142		registry.postpone(wait_schedule2)143		assert(len(drop_count) == 2)144		assert(len(registry) == 0)145		drop_count.clear()146		registry = WPostponedRecordRegistry(maximum_records=1)147		assert(len(drop_count) == 0)148		assert(len(registry) == 0)149		registry.postpone(wait_schedule1)150		assert(len(drop_count) == 0)151		assert(len(registry) == 1)152		registry.postpone(wait_schedule2)153		assert(len(drop_count) == 1)154		assert(len(registry) == 1)155		tasks = [x for x in registry]156		assert(len(registry) == 0)157		assert(wait_schedule1 in tasks)158		assert(wait_schedule2 not in tasks)159		drop_count.clear()160		registry = WPostponedRecordRegistry()161		assert(len(drop_count) == 0)162		assert(len(registry) == 0)163		drop_schedule = WScheduleRecord(164			task, policy=WScheduleRecord.PostponePolicy.drop, on_drop=on_drop165		)166		registry.postpone(drop_schedule)167		assert(len(drop_count) == 1)168		assert(len(registry) == 0)169		drop_count.clear()170		postpone_first_group_1_schedule1 = WScheduleRecord(171			task, policy=WScheduleRecord.PostponePolicy.postpone_first, task_group_id='group1', on_drop=on_drop172		)173		postpone_first_group_1_schedule2 = WScheduleRecord(174			task, policy=WScheduleRecord.PostponePolicy.postpone_first, task_group_id='group1', on_drop=on_drop175		)176		postpone_first_group_2_schedule = WScheduleRecord(177			task, policy=WScheduleRecord.PostponePolicy.postpone_first, task_group_id='group2', on_drop=on_drop178		)179		postpone_first_schedule1 = WScheduleRecord(180			task, policy=WScheduleRecord.PostponePolicy.postpone_first, on_drop=on_drop181		)182		postpone_first_schedule2 = WScheduleRecord(183			task, policy=WScheduleRecord.PostponePolicy.postpone_first, on_drop=on_drop184		)185		registry.postpone(postpone_first_group_1_schedule1)186		assert(len(drop_count) == 0)187		assert(len(registry) == 1)188		registry.postpone(postpone_first_group_1_schedule2)189		assert(len(drop_count) == 1)190		assert(len(registry) == 1)191		registry.postpone(postpone_first_group_2_schedule)192		assert(len(drop_count) == 1)193		assert(len(registry) == 2)194		registry.postpone(postpone_first_schedule1)195		assert(len(drop_count) == 1)196		assert(len(registry) == 3)197		registry.postpone(postpone_first_schedule2)198		assert(len(drop_count) == 1)199		assert(len(registry) == 4)200		tasks = [x for x in registry]201		assert(len(registry) == 0)202		assert(postpone_first_group_1_schedule1 in tasks)203		assert(postpone_first_group_2_schedule in tasks)204		assert(postpone_first_schedule1 in tasks)205		assert(postpone_first_schedule2 in tasks)206		wait_group_1 = WScheduleRecord(207			task, policy=WScheduleRecord.PostponePolicy.wait, task_group_id='group1', on_drop=on_drop208		)209		registry.postpone(wait_group_1)210		pytest.raises(RuntimeError, registry.postpone, postpone_first_group_1_schedule1)211		tasks = [x for x in registry]212		drop_count.clear()213		postpone_last_group_1_schedule1 = WScheduleRecord(214			task, policy=WScheduleRecord.PostponePolicy.postpone_last, task_group_id='group1', on_drop=on_drop215		)216		postpone_last_group_1_schedule2 = WScheduleRecord(217			task, policy=WScheduleRecord.PostponePolicy.postpone_last, task_group_id='group1', on_drop=on_drop218		)219		postpone_last_group_2_schedule = WScheduleRecord(220			task, policy=WScheduleRecord.PostponePolicy.postpone_last, task_group_id='group2', on_drop=on_drop221		)222		postpone_last_schedule1 = WScheduleRecord(223			task, policy=WScheduleRecord.PostponePolicy.postpone_last, on_drop=on_drop224		)225		postpone_last_schedule2 = WScheduleRecord(226			task, policy=WScheduleRecord.PostponePolicy.postpone_last, on_drop=on_drop227		)228		registry.postpone(postpone_last_group_1_schedule1)229		assert(len(drop_count) == 0)230		assert(len(registry) == 1)231		registry.postpone(postpone_last_group_1_schedule2)232		assert(len(drop_count) == 1)233		assert(len(registry) == 1)234		registry.postpone(postpone_last_group_2_schedule)235		assert(len(drop_count) == 1)236		assert(len(registry) == 2)237		registry.postpone(postpone_last_schedule1)238		assert(len(drop_count) == 1)239		assert(len(registry) == 3)240		registry.postpone(postpone_last_schedule2)241		assert(len(drop_count) == 1)242		assert(len(registry) == 4)243		tasks = [x for x in registry]244		assert(len(registry) == 0)245		assert(postpone_last_group_1_schedule2 in tasks)246		assert(postpone_last_group_2_schedule in tasks)247		assert(postpone_last_schedule1 in tasks)248		assert(postpone_last_schedule2 in tasks)249		wait_group_1 = WScheduleRecord(250			task, policy=WScheduleRecord.PostponePolicy.wait, task_group_id='group1', on_drop=on_drop251		)252		registry.postpone(wait_group_1)253		pytest.raises(RuntimeError, registry.postpone, postpone_last_group_1_schedule1)254		wait_group_1.policy = lambda: None255		pytest.raises(RuntimeError, registry.postpone, wait_group_1)256class TestWTaskSourceRegistry:257	class TaskSource(WTaskSourceProto):258		def __init__(self):259			WTaskSourceProto.__init__(self)260			self.tasks = []261		def has_records(self):262			if self.tasks is not None:263				result = tuple(self.tasks)264				self.tasks.clear()265				return result266		def next_start(self):267			if len(self.tasks) > 0:268				return utc_datetime()269		def tasks_planned(self):270			return len(self.tasks)271		def scheduler_service(self):272			return None273	@repeat_fn(__test_repeat_count__)274	def test(self):275		registry = WTaskSourceRegistry()276		assert(registry.check() is None)277		assert(registry.task_sources() == tuple())278		task_source1 = TestWTaskSourceRegistry.TaskSource()279		registry.add_source(task_source1)280		assert(registry.check() is None)281		assert(registry.task_sources() == (task_source1, ))282		task1 = WScheduleRecord(TestWSchedulerWatchdog.DummyTask())283		task_source1.tasks.append(task1)284		registry.update()285		assert(registry.check() == (task1, ))286		assert(registry.check() is None)287		task_source2 = TestWTaskSourceRegistry.TaskSource()288		registry.add_source(task_source2)289		assert(registry.check() is None)290		result = registry.task_sources()291		assert(result == (task_source1, task_source2) or result == (task_source2, task_source1))292		task_source1.tasks.append(task1)293		task2 = WScheduleRecord(TestWSchedulerWatchdog.DummyTask())294		task_source2.tasks.append(task2)295		assert(registry.check() is None)296		registry.update(task_source=task_source2)297		assert(registry.check() == (task2, ))298		assert(registry.check() == (task1, ))299		utc_now = utc_datetime()300		task_source1.tasks.append(task1)301		task_source2.tasks.append(task2)302		task_source2.next_start = lambda: utc_now303		registry.update()304		assert(registry.check() == (task2, ))305		task_source1.next_start = lambda: utc_now306		task_source2.tasks.append(task2)307		registry.update()308		result = registry.check()309		assert(result == (task1, task2) or result == (task2, task1))310		task_source1.next_start = lambda: datetime.now()311		pytest.raises(ValueError, registry.update)312class TestWSchedulerService:313	__wait_task_timeout__ = 0.001314	class HFSchedulerService(WSchedulerService):315		__thread_polling_timeout__ = (316			TestWRunningRecordRegistry.HFRunningTaskRegistry.__thread_polling_timeout__ / 2317		)318	class DummyTask(TestWSchedulerWatchdog.DummyTask):319		__result__ = 0320		__dropped__ = 0321		__waited__ = 0322		def __init__(self, wait_for=None):323			TestWSchedulerWatchdog.DummyTask.__init__(self, wait_for=wait_for)324			self.drop_event = Event()325			self.wait_event = Event()326		def thread_started(self):327			TestWSchedulerWatchdog.DummyTask.thread_started(self)328			TestWSchedulerService.DummyTask.__result__ += 1329		def on_drop(self):330			TestWSchedulerService.DummyTask.__dropped__ += 1331			self.drop_event.set()332		def on_wait(self):333			TestWSchedulerService.DummyTask.__waited__ += 1334			self.wait_event.set()335	def wait_for_events(*events, every=False):336		events = list(events)337		while len(events) > 0:338			for i in range(len(events)):339				event = events[i]340				if event.is_set() is True:341					if every is False:342						return343					events.pop(i)...test_future_queue_app.py
Source:test_future_queue_app.py  
...28    except asyncio.CancelledError:29      log.debug("Cancelled")30    finally:31      await self._mgr.close()32  def on_drop(self, message, reason):33    print(f"Dropped: {message} (reason: {reason})")34  async def consumer(self):35    while True:36      msg = await self._queue.receive()37      print(f"Consumed: {msg}")38      await asyncio.sleep(2)39  async def producer(self):40    i = 0 41    while True:42      await self._pub.publish({'type' : 'test', 'i' : i})43      await asyncio.sleep(1)...test_managed_queue_app.py
Source:test_managed_queue_app.py  
...28    except asyncio.CancelledError:29      log.debug("Cancelled")30    finally:31      await self._mgr.close()32  def on_drop(self, message, reason):33    print(f"Dropped: {message} (reason: {reason.value})")34  async def consumer(self):35    while True:36      msg = await self._queue.receive()37      print(f"Consumed: {msg}")38      await asyncio.sleep(2)39  async def producer(self):40    i = 0 41    while True:42      await self._pub.publish({'type' : 'test', 'i' : i})43      await asyncio.sleep(1)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
