How to use on_drop method in fMBT

Best Python code snippet using fMBT_python

wasp_general_task_scheduler_scheduler_ignore.py

Source:wasp_general_task_scheduler_scheduler_ignore.py Github

copy

Full Screen

...111 assert(registry.maximum_records() is None)112 assert(registry.has_records() is False)113 assert(len(registry) == 0)114 drop_count = []115 def on_drop():116 drop_count.append(None)117 task = TestWSchedulerWatchdog.DummyTask()118 wait_schedule1 = WScheduleRecord(119 task, policy=WScheduleRecord.PostponePolicy.wait, on_drop=on_drop120 )121 wait_schedule2 = WScheduleRecord(122 task, policy=WScheduleRecord.PostponePolicy.wait, on_drop=on_drop123 )124 assert(len(drop_count) == 0)125 assert(len(registry) == 0)126 registry.postpone(wait_schedule1)127 assert(len(drop_count) == 0)128 assert(len(registry) == 1)129 registry.postpone(wait_schedule2)130 assert(len(drop_count) == 0)131 assert(len(registry) == 2)132 tasks = [x for x in registry]133 assert(len(registry) == 0)134 assert(wait_schedule1 in tasks)135 assert(wait_schedule2 in tasks)136 registry = WPostponedRecordRegistry(maximum_records=0)137 assert(len(drop_count) == 0)138 assert(len(registry) == 0)139 registry.postpone(wait_schedule1)140 assert(len(drop_count) == 1)141 assert(len(registry) == 0)142 registry.postpone(wait_schedule2)143 assert(len(drop_count) == 2)144 assert(len(registry) == 0)145 drop_count.clear()146 registry = WPostponedRecordRegistry(maximum_records=1)147 assert(len(drop_count) == 0)148 assert(len(registry) == 0)149 registry.postpone(wait_schedule1)150 assert(len(drop_count) == 0)151 assert(len(registry) == 1)152 registry.postpone(wait_schedule2)153 assert(len(drop_count) == 1)154 assert(len(registry) == 1)155 tasks = [x for x in registry]156 assert(len(registry) == 0)157 assert(wait_schedule1 in tasks)158 assert(wait_schedule2 not in tasks)159 drop_count.clear()160 registry = WPostponedRecordRegistry()161 assert(len(drop_count) == 0)162 assert(len(registry) == 0)163 drop_schedule = WScheduleRecord(164 task, policy=WScheduleRecord.PostponePolicy.drop, on_drop=on_drop165 )166 registry.postpone(drop_schedule)167 assert(len(drop_count) == 1)168 assert(len(registry) == 0)169 drop_count.clear()170 postpone_first_group_1_schedule1 = WScheduleRecord(171 task, policy=WScheduleRecord.PostponePolicy.postpone_first, task_group_id='group1', on_drop=on_drop172 )173 postpone_first_group_1_schedule2 = WScheduleRecord(174 task, policy=WScheduleRecord.PostponePolicy.postpone_first, task_group_id='group1', on_drop=on_drop175 )176 postpone_first_group_2_schedule = WScheduleRecord(177 task, policy=WScheduleRecord.PostponePolicy.postpone_first, task_group_id='group2', on_drop=on_drop178 )179 postpone_first_schedule1 = WScheduleRecord(180 task, policy=WScheduleRecord.PostponePolicy.postpone_first, on_drop=on_drop181 )182 postpone_first_schedule2 = WScheduleRecord(183 task, policy=WScheduleRecord.PostponePolicy.postpone_first, on_drop=on_drop184 )185 registry.postpone(postpone_first_group_1_schedule1)186 assert(len(drop_count) == 0)187 assert(len(registry) == 1)188 registry.postpone(postpone_first_group_1_schedule2)189 assert(len(drop_count) == 1)190 assert(len(registry) == 1)191 registry.postpone(postpone_first_group_2_schedule)192 assert(len(drop_count) == 1)193 assert(len(registry) == 2)194 registry.postpone(postpone_first_schedule1)195 assert(len(drop_count) == 1)196 assert(len(registry) == 3)197 registry.postpone(postpone_first_schedule2)198 assert(len(drop_count) == 1)199 assert(len(registry) == 4)200 tasks = [x for x in registry]201 assert(len(registry) == 0)202 assert(postpone_first_group_1_schedule1 in tasks)203 assert(postpone_first_group_2_schedule in tasks)204 assert(postpone_first_schedule1 in tasks)205 assert(postpone_first_schedule2 in tasks)206 wait_group_1 = WScheduleRecord(207 task, policy=WScheduleRecord.PostponePolicy.wait, task_group_id='group1', on_drop=on_drop208 )209 registry.postpone(wait_group_1)210 pytest.raises(RuntimeError, registry.postpone, postpone_first_group_1_schedule1)211 tasks = [x for x in registry]212 drop_count.clear()213 postpone_last_group_1_schedule1 = WScheduleRecord(214 task, policy=WScheduleRecord.PostponePolicy.postpone_last, task_group_id='group1', on_drop=on_drop215 )216 postpone_last_group_1_schedule2 = WScheduleRecord(217 task, policy=WScheduleRecord.PostponePolicy.postpone_last, task_group_id='group1', on_drop=on_drop218 )219 postpone_last_group_2_schedule = WScheduleRecord(220 task, policy=WScheduleRecord.PostponePolicy.postpone_last, task_group_id='group2', on_drop=on_drop221 )222 postpone_last_schedule1 = WScheduleRecord(223 task, policy=WScheduleRecord.PostponePolicy.postpone_last, on_drop=on_drop224 )225 postpone_last_schedule2 = WScheduleRecord(226 task, policy=WScheduleRecord.PostponePolicy.postpone_last, on_drop=on_drop227 )228 registry.postpone(postpone_last_group_1_schedule1)229 assert(len(drop_count) == 0)230 assert(len(registry) == 1)231 registry.postpone(postpone_last_group_1_schedule2)232 assert(len(drop_count) == 1)233 assert(len(registry) == 1)234 registry.postpone(postpone_last_group_2_schedule)235 assert(len(drop_count) == 1)236 assert(len(registry) == 2)237 registry.postpone(postpone_last_schedule1)238 assert(len(drop_count) == 1)239 assert(len(registry) == 3)240 registry.postpone(postpone_last_schedule2)241 assert(len(drop_count) == 1)242 assert(len(registry) == 4)243 tasks = [x for x in registry]244 assert(len(registry) == 0)245 assert(postpone_last_group_1_schedule2 in tasks)246 assert(postpone_last_group_2_schedule in tasks)247 assert(postpone_last_schedule1 in tasks)248 assert(postpone_last_schedule2 in tasks)249 wait_group_1 = WScheduleRecord(250 task, policy=WScheduleRecord.PostponePolicy.wait, task_group_id='group1', on_drop=on_drop251 )252 registry.postpone(wait_group_1)253 pytest.raises(RuntimeError, registry.postpone, postpone_last_group_1_schedule1)254 wait_group_1.policy = lambda: None255 pytest.raises(RuntimeError, registry.postpone, wait_group_1)256class TestWTaskSourceRegistry:257 class TaskSource(WTaskSourceProto):258 def __init__(self):259 WTaskSourceProto.__init__(self)260 self.tasks = []261 def has_records(self):262 if self.tasks is not None:263 result = tuple(self.tasks)264 self.tasks.clear()265 return result266 def next_start(self):267 if len(self.tasks) > 0:268 return utc_datetime()269 def tasks_planned(self):270 return len(self.tasks)271 def scheduler_service(self):272 return None273 @repeat_fn(__test_repeat_count__)274 def test(self):275 registry = WTaskSourceRegistry()276 assert(registry.check() is None)277 assert(registry.task_sources() == tuple())278 task_source1 = TestWTaskSourceRegistry.TaskSource()279 registry.add_source(task_source1)280 assert(registry.check() is None)281 assert(registry.task_sources() == (task_source1, ))282 task1 = WScheduleRecord(TestWSchedulerWatchdog.DummyTask())283 task_source1.tasks.append(task1)284 registry.update()285 assert(registry.check() == (task1, ))286 assert(registry.check() is None)287 task_source2 = TestWTaskSourceRegistry.TaskSource()288 registry.add_source(task_source2)289 assert(registry.check() is None)290 result = registry.task_sources()291 assert(result == (task_source1, task_source2) or result == (task_source2, task_source1))292 task_source1.tasks.append(task1)293 task2 = WScheduleRecord(TestWSchedulerWatchdog.DummyTask())294 task_source2.tasks.append(task2)295 assert(registry.check() is None)296 registry.update(task_source=task_source2)297 assert(registry.check() == (task2, ))298 assert(registry.check() == (task1, ))299 utc_now = utc_datetime()300 task_source1.tasks.append(task1)301 task_source2.tasks.append(task2)302 task_source2.next_start = lambda: utc_now303 registry.update()304 assert(registry.check() == (task2, ))305 task_source1.next_start = lambda: utc_now306 task_source2.tasks.append(task2)307 registry.update()308 result = registry.check()309 assert(result == (task1, task2) or result == (task2, task1))310 task_source1.next_start = lambda: datetime.now()311 pytest.raises(ValueError, registry.update)312class TestWSchedulerService:313 __wait_task_timeout__ = 0.001314 class HFSchedulerService(WSchedulerService):315 __thread_polling_timeout__ = (316 TestWRunningRecordRegistry.HFRunningTaskRegistry.__thread_polling_timeout__ / 2317 )318 class DummyTask(TestWSchedulerWatchdog.DummyTask):319 __result__ = 0320 __dropped__ = 0321 __waited__ = 0322 def __init__(self, wait_for=None):323 TestWSchedulerWatchdog.DummyTask.__init__(self, wait_for=wait_for)324 self.drop_event = Event()325 self.wait_event = Event()326 def thread_started(self):327 TestWSchedulerWatchdog.DummyTask.thread_started(self)328 TestWSchedulerService.DummyTask.__result__ += 1329 def on_drop(self):330 TestWSchedulerService.DummyTask.__dropped__ += 1331 self.drop_event.set()332 def on_wait(self):333 TestWSchedulerService.DummyTask.__waited__ += 1334 self.wait_event.set()335 def wait_for_events(*events, every=False):336 events = list(events)337 while len(events) > 0:338 for i in range(len(events)):339 event = events[i]340 if event.is_set() is True:341 if every is False:342 return343 events.pop(i)...

Full Screen

Full Screen

test_future_queue_app.py

Source:test_future_queue_app.py Github

copy

Full Screen

...28 except asyncio.CancelledError:29 log.debug("Cancelled")30 finally:31 await self._mgr.close()32 def on_drop(self, message, reason):33 print(f"Dropped: {message} (reason: {reason})")34 async def consumer(self):35 while True:36 msg = await self._queue.receive()37 print(f"Consumed: {msg}")38 await asyncio.sleep(2)39 async def producer(self):40 i = 0 41 while True:42 await self._pub.publish({'type' : 'test', 'i' : i})43 await asyncio.sleep(1)...

Full Screen

Full Screen

test_managed_queue_app.py

Source:test_managed_queue_app.py Github

copy

Full Screen

...28 except asyncio.CancelledError:29 log.debug("Cancelled")30 finally:31 await self._mgr.close()32 def on_drop(self, message, reason):33 print(f"Dropped: {message} (reason: {reason.value})")34 async def consumer(self):35 while True:36 msg = await self._queue.receive()37 print(f"Consumed: {msg}")38 await asyncio.sleep(2)39 async def producer(self):40 i = 0 41 while True:42 await self._pub.publish({'type' : 'test', 'i' : i})43 await asyncio.sleep(1)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful