How to use the task_a method in locust

Best Python code snippet using locust

test_immediate.py

Source:test_immediate.py Github

copy

Full Screen

...8 def get_huey(self):9 return MemoryHuey(immediate=True, utc=False)10 def test_immediate(self):11 @self.huey.task()12 def task_a(n):13 return n + 114 r = task_a(3)15 # Task is not enqueued, but the result *is* stored in the result-store.16 self.assertEqual(len(self.huey), 0)17 self.assertEqual(self.huey.result_count(), 1)18 self.assertEqual(r.get(), 4)19 # After reading, result is removed, as we would expect.20 self.assertEqual(self.huey.result_count(), 0)21 # Cannot add 1 to "None", this produces an error. We get the usual22 # TaskException, which wraps the TypeError.23 r_err = task_a(None)24 self.assertRaises(TaskException, r_err.get)25 def test_immediate_pipeline(self):26 @self.huey.task()27 def add(a, b):28 return a + b29 p = add.s(3, 4).then(add, 5).then(add, 6).then(add, 7)30 result_group = self.huey.enqueue(p)31 self.assertEqual(result_group(), [7, 12, 18, 25])32 def test_immediate_scheduling(self):33 @self.huey.task()34 def task_a(n):35 return n + 136 r = task_a.schedule((3,), delay=10)37 # Task is not enqueued, no result is generated, the task is added to38 # the schedule, however -- even though the scheduler never runs in39 # immediate mode.40 self.assertEqual(len(self.huey), 0)41 self.assertEqual(self.huey.result_count(), 0)42 self.assertEqual(self.huey.scheduled_count(), 1)43 self.assertTrue(r.get() is None)44 def test_immediate_reschedule(self):45 state = []46 @self.huey.task(context=True)47 def task_s(task=None):48 state.append(task.id)49 return 150 r = task_s.schedule(delay=60)51 self.assertEqual(len(self.huey), 0)52 self.assertTrue(r() is None)53 r2 = r.reschedule()54 self.assertTrue(r.id != r2.id)55 self.assertEqual(state, [r2.id])56 self.assertEqual(r2(), 1)57 self.assertEqual(len(self.huey), 0)58 self.assertTrue(r.is_revoked())59 # Because the scheduler never picks up the original task (r), its60 # revocation key sits in the result store and the task is in the61 # schedule still.62 self.assertEqual(self.huey.result_count(), 1)63 
self.assertEqual(self.huey.scheduled_count(), 1)64 def test_immediate_revoke_restore(self):65 @self.huey.task()66 def task_a(n):67 return n + 168 task_a.revoke()69 r = task_a(3)70 self.assertEqual(len(self.huey), 0)71 self.assertTrue(r.get() is None)72 self.assertTrue(task_a.restore())73 r = task_a(4)74 self.assertEqual(r.get(), 5)75 def test_swap_immediate(self):76 @self.huey.task()77 def task_a(n):78 return n + 179 r = task_a(1)80 self.assertEqual(r.get(), 2)81 self.huey.immediate = False82 r = task_a(2)83 self.assertEqual(len(self.huey), 1)84 self.assertEqual(self.huey.result_count(), 0)85 task = self.huey.dequeue()86 self.assertEqual(self.huey.execute(task), 3)87 self.assertEqual(r.get(), 3)88 self.huey.immediate = True89 r = task_a(3)90 self.assertEqual(r.get(), 4)91 self.assertEqual(len(self.huey), 0)92 self.assertEqual(self.huey.result_count(), 0)93 def test_map(self):94 @self.huey.task()95 def task_a(n):96 return n + 197 result_group = task_a.map(range(8))98 self.assertEqual(result_group(), [1, 2, 3, 4, 5, 6, 7, 8])99class NoUseException(Exception): pass100class NoUseStorage(BlackHoleStorage):101 def enqueue(self, data, priority=None): raise NoUseException()102 def dequeue(self): raise NoUseException()103 def add_to_schedule(self, data, ts, utc): raise NoUseException()104 def read_schedule(self, ts): raise NoUseException()105 def put_data(self, key, value): raise NoUseException()106 def peek_data(self, key): raise NoUseException()107 def pop_data(self, key): raise NoUseException()108 def has_data_for_key(self, key): raise NoUseException()109 def put_if_empty(self, key, value): raise NoUseException()110class NoUseHuey(Huey):111 def get_storage(self, **storage_kwargs):112 return NoUseStorage()113class TestImmediateMemoryStorage(BaseTestCase):114 def get_huey(self):115 return NoUseHuey(utc=False)116 def test_immediate_storage(self):117 @self.huey.task()118 def task_a(n):119 return n + 1120 self.huey.immediate = True121 # If any operation happens to touch the 
"real" storage engine, an122 # exception will be raised. These tests validate that immediate mode123 # doesn't accidentally interact with the live storage.124 res = task_a(2)125 self.assertEqual(res(), 3)126 task_a.revoke()127 res = task_a(3)128 self.assertTrue(res() is None)129 self.assertTrue(task_a.restore())130 res = task_a(4)131 self.assertEqual(res(), 5)132 eta = datetime.datetime.now() + datetime.timedelta(seconds=60)133 res = task_a.schedule((5,), eta=eta)134 self.assertTrue(res() is None)135 minus_1 = eta - datetime.timedelta(seconds=1)136 self.assertEqual(self.huey.read_schedule(minus_1), [])137 tasks = self.huey.read_schedule(eta)138 self.assertEqual([t.id for t in tasks], [res.id])139 self.assertTrue(res() is None)140 # Switch back to regular storage / non-immediate mode.141 self.huey.immediate = False142 self.assertRaises(NoUseException, task_a, 1)143 # Switch back to immediate mode.144 self.huey.immediate = True145 res = task_a(10)146 self.assertEqual(res(), 11)147 def test_immediate_real_storage(self):148 self.huey.immediate_use_memory = False149 @self.huey.task()150 def task_a(n):151 return n + 1152 self.huey.immediate = True153 self.assertRaises(NoUseException, task_a, 1)154 self.huey.immediate = False...

Full Screen

Full Screen

test_signals.py

Source:test_signals.py Github

copy

Full Screen

...11 self.assertEqual([s[0] for s in self._state], expected)12 self._state = []13 def test_signals_simple(self):14 @self.huey.task()15 def task_a(n):16 return n + 117 r = task_a(3)18 self.assertSignals([])19 self.assertEqual(self.execute_next(), 4)20 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])21 r = task_a.schedule((2,), delay=60)22 self.assertSignals([])23 self.assertTrue(self.execute_next() is None)24 self.assertSignals([SIGNAL_SCHEDULED])25 r = task_a(None)26 self.assertSignals([])27 self.assertTrue(self.execute_next() is None)28 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR])29 def test_signal_complete_result_ready(self):30 @self.huey.task()31 def task_a(n):32 return n + 133 results = []34 @self.huey.signal(SIGNAL_COMPLETE)35 def on_complete(sig, task, *_):36 results.append(self.huey.result(task.id))37 r = task_a(2)38 self.assertEqual(self.execute_next(), 3)39 self.assertEqual(results, [3])40 def test_signals_on_retry(self):41 @self.huey.task(retries=1)42 def task_a(n):43 return n + 144 r = task_a(None)45 self.assertSignals([])46 self.assertTrue(self.execute_next() is None)47 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR, SIGNAL_RETRYING])48 self.assertTrue(self.execute_next() is None)49 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR])50 @self.huey.task(retries=1, retry_delay=60)51 def task_b(n):52 return n + 153 r = task_b(None)54 self.assertSignals([])55 self.assertTrue(self.execute_next() is None)56 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR, SIGNAL_RETRYING,57 SIGNAL_SCHEDULED])58 def test_signals_revoked(self):59 @self.huey.task()60 def task_a(n):61 return n + 162 task_a.revoke(revoke_once=True)63 r = task_a(2)64 self.assertSignals([])65 self.assertTrue(self.execute_next() is None)66 self.assertSignals([SIGNAL_REVOKED])67 r = task_a(3)68 self.assertEqual(self.execute_next(), 4)69 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])70 def test_signals_locked(self):71 @self.huey.task()72 @self.huey.lock_task('lock-a')73 
def task_a(n):74 return n + 175 r = task_a(1)76 self.assertSignals([])77 self.assertEqual(self.execute_next(), 2)78 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])79 with self.huey.lock_task('lock-a'):80 r = task_a(2)81 self.assertSignals([])82 self.assertTrue(self.execute_next() is None)83 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_LOCKED])84 def test_specific_handler(self):85 extra_state = []86 @self.huey.signal(SIGNAL_EXECUTING)87 def extra_handler(signal, task):88 extra_state.append(task.args[0])89 @self.huey.task()90 def task_a(n):91 return n + 192 r = task_a(3)93 self.assertEqual(extra_state, [])94 self.assertEqual(self.execute_next(), 4)95 self.assertEqual(extra_state, [3])96 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])97 r2 = task_a(1)98 self.assertEqual(self.execute_next(), 2)99 self.assertEqual(extra_state, [3, 1])100 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])101 self.huey.disconnect_signal(extra_handler, SIGNAL_EXECUTING)102 r3 = task_a(2)103 self.assertEqual(self.execute_next(), 3)104 self.assertEqual(extra_state, [3, 1])105 self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])106 def test_multi_handlers(self):107 state1 = []108 state2 = []109 @self.huey.signal(SIGNAL_EXECUTING, SIGNAL_COMPLETE)110 def handler1(signal, task):111 state1.append(signal)112 @self.huey.signal(SIGNAL_EXECUTING, SIGNAL_COMPLETE)113 def handler2(signal, task):114 state2.append(signal)115 @self.huey.task()116 def task_a(n):117 return n + 1118 r = task_a(1)119 self.assertEqual(self.execute_next(), 2)120 self.assertEqual(state1, ['executing', 'complete'])121 self.assertEqual(state2, ['executing', 'complete'])122 self.huey.disconnect_signal(handler1, SIGNAL_COMPLETE)123 self.huey.disconnect_signal(handler2)124 r2 = task_a(2)125 self.assertEqual(self.execute_next(), 3)126 self.assertEqual(state1, ['executing', 'complete', 'executing'])...

Full Screen

Full Screen

test_registry.py

Source:test_registry.py Github

copy

Full Screen

...5 def setUp(self):6 super(TestRegistry, self).setUp()7 self.registry = self.huey._registry8 def test_register_unique(self):9 def task_a(): pass10 def task_b(): pass11 ta = self.huey.task()(task_a)12 self.assertRaises(ValueError, self.huey.task(), task_a)13 self.assertRaises(ValueError, self.huey.task(name='task_a'), task_b)14 # We can register task_b and re-register task_a providing a new name.15 tb = self.huey.task()(task_b)16 ta2 = self.huey.task(name='task_a2')(task_a)17 t1 = ta.s()18 t2 = ta2.s()19 self.assertTrue(t1.name != t2.name)20 def test_register_unregister(self):21 @self.huey.task()22 def task_a():23 pass24 self.assertTrue(task_a.unregister())25 self.assertFalse(task_a.unregister())26 def test_message_wrapping(self):27 @self.huey.task(retries=1)28 def task_a(p1, p2, p3=3, p4=None):29 pass30 task = task_a.s('v1', 'v2', p4='v4')31 message = self.registry.create_message(task)32 self.assertEqual(message.id, task.id)33 self.assertEqual(message.retries, 1)34 self.assertEqual(message.retry_delay, 0)35 self.assertEqual(message.args, ('v1', 'v2'))36 self.assertEqual(message.kwargs, {'p4': 'v4'})37 self.assertTrue(message.on_complete is None)38 self.assertTrue(message.on_error is None)39 task2 = self.registry.create_task(message)40 self.assertEqual(task2.id, task.id)41 self.assertEqual(task2.retries, 1)42 self.assertEqual(task2.retry_delay, 0)43 self.assertEqual(task2.args, ('v1', 'v2'))44 self.assertEqual(task2.kwargs, {'p4': 'v4'})45 self.assertTrue(task2.on_complete is None)46 self.assertTrue(task2.on_error is None)47 def test_missing_task(self):48 @self.huey.task()49 def task_a():50 pass51 # Serialize the task invocation.52 task = task_a.s()53 message = self.registry.create_message(task)54 # Unregister the task, which will raise an error when we try to55 # deserialize the message back into a task instance.56 self.assertTrue(task_a.unregister())57 self.assertRaises(HueyException, self.registry.create_task, message)58 # Similarly, we can no longer serialize 
the task to a message.59 self.assertRaises(HueyException, self.registry.create_message, task)60 def test_periodic_tasks(self):61 def task_fn(): pass62 self.huey.task(name='a')(task_fn)63 p1 = self.huey.periodic_task(lambda _: False, name='p1')(task_fn)64 p2 = self.huey.periodic_task(lambda _: False, name='p2')(task_fn)65 self.huey.task(name='b')(task_fn)66 periodic = sorted(t.name for t in self.registry.periodic_tasks)67 self.assertEqual(periodic, ['p1', 'p2'])68 self.assertTrue(p1.unregister())69 periodic = sorted(t.name for t in self.registry.periodic_tasks)70 self.assertEqual(periodic, ['p2'])71 def test_huey1_compat(self):72 @self.huey.task()73 def task_a(n):74 return n + 175 t = task_a.s(2)76 # Enqueue a message using the old message serialization format.77 tc = task_a.task_class78 old_message = (t.id, '%s.%s' % (tc.__module__, tc.__name__), None, 0,79 0, ((2,), {}), None)80 self.huey.storage.enqueue(pickle.dumps(old_message))81 self.assertEqual(len(self.huey), 1)82 self.assertEqual(self.execute_next(), 3)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful