How to use run_tasks method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

test_trigger.py

Source:test_trigger.py Github

copy

Full Screen

...17 def setUpClass(cls):18 activate_module('tests')19 def setUp(self):20 TRIGGER_LOGS.clear()21 def run_tasks(self):22 pool = Pool()23 Queue = pool.get('ir.queue')24 transaction = Transaction()25 while transaction.tasks:26 task = Queue(transaction.tasks.pop())27 task.run()28 @with_transaction()29 def test_constraints(self):30 'Test constraints'31 pool = Pool()32 Model = pool.get('ir.model')33 Trigger = pool.get('ir.trigger')34 transaction = Transaction()35 model, = Model.search([36 ('model', '=', 'test.triggered'),37 ])38 values = {39 'name': 'Test',40 'model': model.id,41 'on_time': True,42 'condition': 'true',43 'action': 'test.trigger_action|trigger',44 }45 self.assertTrue(Trigger.create([values]))46 transaction.rollback()47 # on_exclusive48 for i in range(1, 4):49 for combination in combinations(50 ['create', 'write', 'delete'], i):51 combination_values = values.copy()52 for mode in combination:53 combination_values['on_%s' % mode] = True54 self.assertRaises(SQLConstraintError, Trigger.create,55 [combination_values])56 transaction.rollback()57 # check_condition58 condition_values = values.copy()59 condition_values['condition'] = '='60 self.assertRaises(TriggerConditionError, Trigger.create,61 [condition_values])62 transaction.rollback()63 # Restart the cache on the get_triggers method of ir.trigger64 Trigger._get_triggers_cache.clear()65 @with_transaction()66 def test_on_create(self):67 'Test on_create'68 pool = Pool()69 Model = pool.get('ir.model')70 Trigger = pool.get('ir.trigger')71 Triggered = pool.get('test.triggered')72 model, = Model.search([73 ('model', '=', 'test.triggered'),74 ])75 trigger, = Trigger.create([{76 'name': 'Test',77 'model': model.id,78 'on_create': True,79 'condition': 'true',80 'action': 'test.trigger_action|trigger',81 }])82 triggered, = Triggered.create([{83 'name': 'Test',84 }])85 self.run_tasks()86 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])87 TRIGGER_LOGS.pop()88 # Trigger with condition89 condition = PYSONEncoder().encode(90 Eval('self', {}).get('name') == 'Bar')91 Trigger.write([trigger], {92 'condition': condition,93 })94 # Matching condition95 triggered, = Triggered.create([{96 'name': 'Bar',97 }])98 self.run_tasks()99 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])100 TRIGGER_LOGS.pop()101 # Non matching condition102 triggered, = Triggered.create([{103 'name': 'Foo',104 }])105 self.run_tasks()106 self.assertEqual(TRIGGER_LOGS, [])107 # With limit number108 Trigger.write([trigger], {109 'condition': 'true',110 'limit_number': 1,111 })112 triggered, = Triggered.create([{113 'name': 'Test',114 }])115 self.run_tasks()116 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])117 TRIGGER_LOGS.pop()118 # With minimum delay119 Trigger.write([trigger], {120 'limit_number': 0,121 'minimum_time_delay': datetime.timedelta(hours=1),122 })123 triggered, = Triggered.create([{124 'name': 'Test',125 }])126 self.run_tasks()127 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])128 TRIGGER_LOGS.pop()129 # Restart the cache on the get_triggers method of ir.trigger130 Trigger._get_triggers_cache.clear()131 @with_transaction()132 def test_on_write(self):133 'Test on_write'134 pool = Pool()135 Model = pool.get('ir.model')136 Trigger = pool.get('ir.trigger')137 TriggerLog = pool.get('ir.trigger.log')138 Triggered = pool.get('test.triggered')139 model, = Model.search([140 ('model', '=', 'test.triggered'),141 ])142 trigger, = Trigger.create([{143 'name': 'Test',144 'model': model.id,145 'on_write': True,146 'condition': 'true',147 'action': 'test.trigger_action|trigger',148 }])149 triggered, = Triggered.create([{150 'name': 'Test',151 }])152 Triggered.write([triggered], {153 'name': 'Foo',154 })155 self.run_tasks()156 self.assertEqual(TRIGGER_LOGS, [])157 # Trigger with condition158 condition = PYSONEncoder().encode(159 Eval('self', {}).get('name') == 'Bar')160 Trigger.write([trigger], {161 'condition': condition,162 })163 # Matching condition164 Triggered.write([triggered], {165 'name': 'Bar',166 })167 self.run_tasks()168 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])169 TRIGGER_LOGS.pop()170 # No change in condition171 Triggered.write([triggered], {172 'name': 'Bar',173 })174 self.run_tasks()175 self.assertEqual(TRIGGER_LOGS, [])176 # Different change in condition177 Triggered.write([triggered], {178 'name': 'Foo',179 })180 self.run_tasks()181 self.assertEqual(TRIGGER_LOGS, [])182 # With limit number183 condition = PYSONEncoder().encode(184 Eval('self', {}).get('name') == 'Bar')185 Trigger.write([trigger], {186 'condition': condition,187 'limit_number': 1,188 })189 triggered, = Triggered.create([{190 'name': 'Foo',191 }])192 Triggered.write([triggered], {193 'name': 'Bar',194 })195 Triggered.write([triggered], {196 'name': 'Foo',197 })198 Triggered.write([triggered], {199 'name': 'Bar',200 })201 self.run_tasks()202 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])203 TRIGGER_LOGS.pop()204 # With minimum delay205 Trigger.write([trigger], {206 'limit_number': 0,207 'minimum_time_delay': datetime.timedelta.max,208 })209 triggered, = Triggered.create([{210 'name': 'Foo',211 }])212 for name in ('Bar', 'Foo', 'Bar'):213 Triggered.write([triggered], {214 'name': name,215 })216 self.run_tasks()217 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])218 TRIGGER_LOGS.pop()219 Trigger.write([trigger], {220 'minimum_time_delay': datetime.timedelta(seconds=1),221 })222 triggered, = Triggered.create([{223 'name': 'Foo',224 }])225 for name in ('Bar', 'Foo'):226 Triggered.write([triggered], {227 'name': name,228 })229 self.run_tasks()230 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])231 TRIGGER_LOGS.pop()232 Transaction().trigger_records.clear()233 # Make time pass by moving back in time the log creation234 trigger_log = TriggerLog.__table__()235 cursor = Transaction().connection.cursor()236 cursor.execute(*trigger_log.update(237 [trigger_log.create_date],238 [datetime.datetime.now() - datetime.timedelta(days=1)],239 where=trigger_log.record_id == triggered.id))240 Triggered.write([triggered], {241 'name': 'Bar',242 })243 self.run_tasks()244 self.assertEqual(TRIGGER_LOGS, [([triggered], trigger)])245 TRIGGER_LOGS.pop()246 # Restart the cache on the get_triggers method of ir.trigger247 Trigger._get_triggers_cache.clear()248 @with_transaction()249 def test_on_delete(self):250 'Test on_delete'251 pool = Pool()252 Model = pool.get('ir.model')253 Trigger = pool.get('ir.trigger')254 Triggered = pool.get('test.triggered')255 TriggerLog = pool.get('ir.trigger.log')256 model, = Model.search([257 ('model', '=', 'test.triggered'),...

Full Screen

Full Screen

tests.py

Source:tests.py Github

copy

Full Screen

1from django.test import TestCase2from account.models import User3from django.urls import reverse4from django.contrib.auth.models import Permission5class TasksTestMethods(TestCase):6 def setUp(self):7 # Create user8 user = User.objects.create_user('temp', 'temp@temp.tld', 'temppass')9 user.first_name = 'temp_first'10 user.last_name = 'temp_last'11 user.save()12 # login with user13 self.client.login(username='temp', password='temppass')14 def test_task_list_permission(self):15 "User should only access task list if view permission is set"16 user = User.objects.get(username='temp')17 18 response = self.client.get(reverse('tasks:task_list'))19 self.assertEqual(response.status_code, 403)20 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))21 response = self.client.get(reverse('tasks:task_list'))22 self.assertEqual(response.status_code, 200)23 def test_run_subscription_task_permission(self):24 "User should only access subscription task if view, run and subscription_task permissions are set"25 user = User.objects.get(username='temp')26 27 response = self.client.get(reverse('tasks:apply_subscriptions'))28 self.assertEqual(response.status_code, 403)29 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))30 response = self.client.get(reverse('tasks:apply_subscriptions'))31 self.assertEqual(response.status_code, 403)32 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))33 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))34 response = self.client.get(reverse('tasks:apply_subscriptions'))35 self.assertEqual(response.status_code, 403)36 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))37 user.user_permissions.add(Permission.objects.get(codename='run_subscription_task'))38 response = self.client.get(reverse('tasks:apply_subscriptions'))39 self.assertEqual(response.status_code, 403)40 user.user_permissions.remove(Permission.objects.get(codename='run_subscription_task'))41 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))42 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))43 response = self.client.get(reverse('tasks:apply_subscriptions'))44 self.assertEqual(response.status_code, 403)45 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))46 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))47 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))48 user.user_permissions.add(Permission.objects.get(codename='run_subscription_task'))49 response = self.client.get(reverse('tasks:apply_subscriptions'))50 self.assertEqual(response.status_code, 403)51 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))52 user.user_permissions.remove(Permission.objects.get(codename='run_subscription_task'))53 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))54 user.user_permissions.add(Permission.objects.get(codename='run_subscription_task'))55 response = self.client.get(reverse('tasks:apply_subscriptions'))56 self.assertEqual(response.status_code, 403)57 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))58 user.user_permissions.remove(Permission.objects.get(codename='run_subscription_task'))59 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))60 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))61 user.user_permissions.add(Permission.objects.get(codename='run_subscription_task'))62 response = self.client.get(reverse('tasks:apply_subscriptions'))63 self.assertEqual(response.status_code, 400)64 def test_run_closure_task_permission(self):65 "User should only access closure task if view, run and closure_task permissions are set"66 user = User.objects.get(username='temp')67 68 response = self.client.get(reverse('tasks:apply_annualclosure'))69 self.assertEqual(response.status_code, 403)70 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))71 response = self.client.get(reverse('tasks:apply_annualclosure'))72 self.assertEqual(response.status_code, 403)73 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))74 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))75 response = self.client.get(reverse('tasks:apply_annualclosure'))76 self.assertEqual(response.status_code, 403)77 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))78 user.user_permissions.add(Permission.objects.get(codename='run_closure_task'))79 response = self.client.get(reverse('tasks:apply_annualclosure'))80 self.assertEqual(response.status_code, 403)81 user.user_permissions.remove(Permission.objects.get(codename='run_closure_task'))82 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))83 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))84 response = self.client.get(reverse('tasks:apply_annualclosure'))85 self.assertEqual(response.status_code, 403)86 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))87 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))88 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))89 user.user_permissions.add(Permission.objects.get(codename='run_closure_task'))90 response = self.client.get(reverse('tasks:apply_annualclosure'))91 self.assertEqual(response.status_code, 403)92 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))93 user.user_permissions.remove(Permission.objects.get(codename='run_closure_task'))94 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))95 user.user_permissions.add(Permission.objects.get(codename='run_closure_task'))96 response = self.client.get(reverse('tasks:apply_annualclosure'))97 self.assertEqual(response.status_code, 403)98 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))99 user.user_permissions.remove(Permission.objects.get(codename='run_closure_task'))100 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))101 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))102 user.user_permissions.add(Permission.objects.get(codename='run_closure_task'))103 response = self.client.get(reverse('tasks:apply_annualclosure'))104 self.assertEqual(response.status_code, 400)105 106 def test_run_delete_terminated_members_task_permission(self):107 "User should only access delete terminated members task if view, run and delete_terminated_members_task permissions are set"108 user = User.objects.get(username='temp')109 110 response = self.client.get(reverse('tasks:delete_terminated_members'))111 self.assertEqual(response.status_code, 403)112 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))113 response = self.client.get(reverse('tasks:delete_terminated_members'))114 self.assertEqual(response.status_code, 403)115 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))116 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))117 response = self.client.get(reverse('tasks:delete_terminated_members'))118 self.assertEqual(response.status_code, 403)119 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))120 user.user_permissions.add(Permission.objects.get(codename='run_delete_terminated_members_task'))121 response = self.client.get(reverse('tasks:delete_terminated_members'))122 self.assertEqual(response.status_code, 403)123 user.user_permissions.remove(Permission.objects.get(codename='run_delete_terminated_members_task'))124 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))125 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))126 response = self.client.get(reverse('tasks:delete_terminated_members'))127 self.assertEqual(response.status_code, 403)128 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))129 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))130 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))131 user.user_permissions.add(Permission.objects.get(codename='run_delete_terminated_members_task'))132 response = self.client.get(reverse('tasks:delete_terminated_members'))133 self.assertEqual(response.status_code, 403)134 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))135 user.user_permissions.remove(Permission.objects.get(codename='run_delete_terminated_members_task'))136 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))137 user.user_permissions.add(Permission.objects.get(codename='run_delete_terminated_members_task'))138 response = self.client.get(reverse('tasks:delete_terminated_members'))139 self.assertEqual(response.status_code, 403)140 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))141 user.user_permissions.remove(Permission.objects.get(codename='run_delete_terminated_members_task'))142 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))143 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))144 user.user_permissions.add(Permission.objects.get(codename='run_delete_terminated_members_task'))145 response = self.client.get(reverse('tasks:delete_terminated_members'))146 self.assertEqual(response.status_code, 400)147 def test_run_delete_report_data_task_permission(self):148 "User should only access delete report data task if view, run and delete_report_data_task permissions are set"149 user = User.objects.get(username='temp')150 151 response = self.client.get(reverse('tasks:delete_report_data'))152 self.assertEqual(response.status_code, 403)153 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))154 response = self.client.get(reverse('tasks:delete_report_data'))155 self.assertEqual(response.status_code, 403)156 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))157 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))158 response = self.client.get(reverse('tasks:delete_report_data'))159 self.assertEqual(response.status_code, 403)160 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))161 user.user_permissions.add(Permission.objects.get(codename='run_delete_report_data_task'))162 response = self.client.get(reverse('tasks:delete_report_data'))163 self.assertEqual(response.status_code, 403)164 user.user_permissions.remove(Permission.objects.get(codename='run_delete_report_data_task'))165 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))166 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))167 response = self.client.get(reverse('tasks:delete_report_data'))168 self.assertEqual(response.status_code, 403)169 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))170 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))171 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))172 user.user_permissions.add(Permission.objects.get(codename='run_delete_report_data_task'))173 response = self.client.get(reverse('tasks:delete_report_data'))174 self.assertEqual(response.status_code, 403)175 user.user_permissions.remove(Permission.objects.get(codename='run_tasks'))176 user.user_permissions.remove(Permission.objects.get(codename='run_delete_report_data_task'))177 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))178 user.user_permissions.add(Permission.objects.get(codename='run_delete_report_data_task'))179 response = self.client.get(reverse('tasks:delete_report_data'))180 self.assertEqual(response.status_code, 403)181 user.user_permissions.remove(Permission.objects.get(codename='view_tasks'))182 user.user_permissions.remove(Permission.objects.get(codename='run_delete_report_data_task'))183 user.user_permissions.add(Permission.objects.get(codename='view_tasks'))184 user.user_permissions.add(Permission.objects.get(codename='run_tasks'))185 user.user_permissions.add(Permission.objects.get(codename='run_delete_report_data_task'))186 response = self.client.get(reverse('tasks:delete_report_data'))...

Full Screen

Full Screen

test_functional_component_hooks.py

Source:test_functional_component_hooks.py Github

copy

Full Screen

...15 def foo():16 ...17 with self.tree:18 tc()19 self.tree.run_tasks()20 set_state(True)21 with self.assertRaises(HookSequenceError):22 self.tree.run_tasks()23 def test_when_render_less_hooks(self):24 set_state = None25 @functional_component26 def tc():27 nonlocal set_state28 state, set_state = use_state(True)29 if state:30 @use_effect31 def foo():32 ...33 with self.tree:34 tc()35 self.tree.run_tasks()36 set_state(False)37 with self.assertRaises(HookSequenceError):38 self.tree.run_tasks()39 def test_when_render_different_hooks(self):40 set_state = None41 @functional_component42 def tc():43 nonlocal set_state44 state, set_state = use_state(False)45 if state:46 @use_effect47 def foo():48 ...49 else:50 use_state()51 with self.tree:52 tc()53 self.tree.run_tasks()54 set_state(True)55 with self.assertRaises(HookSequenceError):56 self.tree.run_tasks()57class UseSelfTest(TreeTestCase):58 def test_use_self(self):59 component_self = None60 @functional_component61 def tc():62 nonlocal component_self63 component_self = use_self()64 with self.tree:65 tc()66 self.tree.run_tasks()67 self.assertIs(component_self, self.tree.root)68class UseStateTest(TreeTestCase):69 def test_use_state(self):70 set_state = None71 @functional_component72 def tc():73 nonlocal set_state74 state, set_state = use_state('default state')75 Component(test_prop=state).insert()76 with self.tree:77 tc()78 self.tree.run_tasks()79 self.assertTrue(callable(set_state))80 self.assertEqual(81 list(self.tree.root.mounted_children())[0].props['test_prop'],82 'default state'83 )84 set_state('new state')85 prev_set_state = set_state86 n_run = self.tree.run_tasks()87 self.assertGreater(n_run, 0)88 self.assertEqual(set_state, prev_set_state)89 self.assertEqual(90 list(self.tree.root.mounted_children())[0].props['test_prop'],91 'new state'92 )93class UseToggleTest(TreeTestCase):94 def test_use_toggle(self):95 toggle = None96 @functional_component97 def tc():98 nonlocal toggle99 state, toggle = use_toggle()100 if state:101 fragment()102 with self.tree:103 tc()104 self.tree.run_tasks()105 self.assertTrue(callable(toggle))106 self.assertEqual(107 len(list(self.tree.root.mounted_children())),108 0109 )110 prev_toggle = toggle111 toggle()112 self.tree.run_tasks()113 self.assertEqual(toggle, prev_toggle)114 self.assertEqual(115 len(list(self.tree.root.mounted_children())),116 1117 )118 prev_toggle = toggle119 toggle()120 self.tree.run_tasks()121 self.assertEqual(toggle, prev_toggle)122 self.assertEqual(123 len(list(self.tree.root.mounted_children())),124 0125 )126class UsePreviousTest(TreeTestCase):127 def test_use_previous(self):128 set_state = None129 track_change = Mock()130 @functional_component131 def tc():132 nonlocal set_state133 state, set_state = use_state('initial state')134 prev = use_previous(state, 'initial prev')135 if prev != state:136 track_change(prev, state)137 with self.tree:138 tc()139 self.tree.run_tasks()140 track_change.assert_called_with('initial prev', 'initial state')141 track_change.reset_mock()142 for _ in range(2):143 self.tree.root.enqueue_update()144 self.tree.run_tasks()145 track_change.assert_not_called()146 set_state('intermediate state')147 set_state('new state')148 self.tree.run_tasks()149 track_change.assert_called_with('initial state', 'new state')150 track_change.reset_mock()151 for _ in range(2):152 self.tree.root.enqueue_update()153 self.tree.run_tasks()154 track_change.assert_not_called()155class UseRefTest(TreeTestCase):156 def test_use_ref(self):157 ref1, ref2 = None, None158 @functional_component159 def fc():160 nonlocal ref1, ref2161 ref1 = use_ref()162 ref2 = use_ref()163 with self.tree:164 fc()165 self.tree.run_tasks()166 self.assertIsInstance(ref1, Ref)167 self.assertIsInstance(ref2, Ref)168 self.assertIsNot(ref1, ref2)169 prev1, prev2 = ref1, ref2170 ref1, ref2 = None, None171 self.tree.root.enqueue_update()172 self.tree.run_tasks()173 self.assertIs(ref1, prev1)174 self.assertIs(ref2, prev2)175class UseEffectTest(TreeTestCase):176 def test_apply_effect(self):177 effect = Mock()178 @functional_component179 def tc():180 @use_effect181 def fx():182 effect()183 with self.tree:184 tc()185 effect.assert_not_called()186 self.tree.run_tasks()187 effect.assert_called_once_with()188 effect.reset_mock()189 def test_rollback_on_unmount(self):190 rollback = Mock()191 @functional_component192 def tc():193 @use_effect194 def fx():195 return rollback196 with self.tree:197 tc()198 self.tree.run_tasks()199 rollback.assert_not_called()200 with self.tree:201 fragment()202 self.tree.run_tasks()203 rollback.assert_called_once_with()204 def test_rollback_and_reapply(self):205 effect, rollback = Mock(), Mock()206 set_state = None207 @functional_component208 def tc():209 nonlocal set_state210 state, set_state = use_state('initial state')211 @use_effect(state)212 def fx():213 effect(state)214 return lambda: rollback(state)215 with self.tree:216 tc()217 self.tree.run_tasks()218 effect.assert_called_once_with('initial state')219 rollback.assert_not_called()220 effect.reset_mock()221 self.tree.root.enqueue_update()222 self.tree.run_tasks()223 effect.assert_not_called()224 rollback.assert_not_called()225 set_state('new state')226 self.tree.run_tasks()227 effect.assert_called_once_with('new state')228 rollback.assert_called_once_with('initial state')229class UseCallbackProxyTest(TreeTestCase):230 def test_use_callback_proxy(self):231 proxy, set_state = None, None232 cb = Mock()233 @functional_component234 def tc():235 nonlocal set_state, proxy236 state, set_state = use_state('initial state')237 proxy = use_callback_proxy(lambda a: cb(state, a))238 with self.tree:239 tc()240 self.tree.run_tasks()241 self.assertTrue(callable(proxy))242 first_proxy = proxy243 proxy('arg 1')244 cb.assert_called_once_with('initial state', 'arg 1')245 cb.reset_mock()246 set_state('new state')247 self.tree.run_tasks()248 self.assertIs(proxy, first_proxy)249 proxy('arg 2')250 cb.assert_called_once_with('new state', 'arg 2')251class UseCallbackTest(TreeTestCase):252 def test_use_callback(self):253 set_state1, set_state2 = None, None254 cb = None255 fn = Mock()256 @functional_component257 def tc():258 nonlocal set_state1, set_state2, cb259 state1, set_state1 = use_state('initial state 1')260 state2, set_state2 = use_state('initial state 2')261 @use_callback(state1)262 def foo(arg):263 fn(state1, state2, arg)264 cb = foo265 with self.tree:266 tc()267 self.tree.run_tasks()268 self.assertTrue(callable(cb))269 cb('arg 1')270 fn.assert_called_once_with('initial state 1', 'initial state 2', 'arg 1')271 fn.reset_mock()272 first_cb = cb273 cb = None274 set_state2('new state 2')275 self.tree.run_tasks()276 self.assertIs(cb, first_cb)277 cb('arg 2')278 fn.assert_called_once_with('initial state 1', 'initial state 2', 'arg 2')279 fn.reset_mock()280 set_state1('new state 1')281 self.tree.run_tasks()282 self.assertIsNot(cb, first_cb)283 cb('arg 3')284 fn.assert_called_once_with('new state 1', 'new state 2', 'arg 3')285class UseMemoTest(TreeTestCase):286 def test_use_memo(self):287 value = None288 set_state1, set_state2 = None, None289 @functional_component290 def tc():291 nonlocal value, set_state1, set_state2292 state1, set_state1 = use_state('initial state 1')293 state2, set_state2 = use_state('initial state 2')294 @use_memo(state1)295 def memo():296 return [state1, state2]297 value = memo298 with self.tree:299 tc()300 self.tree.run_tasks()301 self.assertEqual(302 value,303 ['initial state 1', 'initial state 2']304 )305 first_value = value306 value = None307 set_state2('new state 2')308 self.tree.run_tasks()309 self.assertIs(first_value, value)310 set_state1('new state 1')311 self.tree.run_tasks()312 self.assertIsNot(value, first_value)313 self.assertEqual(314 value,315 ['new state 1', 'new state 2']...

Full Screen

Full Screen

test_async_threads.py

Source:test_async_threads.py Github

copy

Full Screen

...6 """7 Solve no tasks (with threads)8 """9 from arbiter.async import run_tasks10 results = run_tasks((), 2)11 assert_equals(results.completed, frozenset())12 assert_equals(results.failed, frozenset())13def test_no_dependencies():14 """15 run dependency-less tasks (with threads)16 """17 from arbiter.async import run_tasks18 executed_tasks = set()19 def make_task(name, dependencies=(), should_succeed=True):20 """21 Make a task22 """23 from arbiter.task import create_task24 function = succeed if should_succeed else fail25 return create_task(26 lambda: executed_tasks.add(name) or function(),27 name=name,28 dependencies=dependencies29 )30 results = run_tasks(31 (32 make_task('foo'),33 make_task('bar'),34 make_task('baz'),35 make_task('fail', should_succeed=False)36 ),37 238 )39 assert_equals(executed_tasks, frozenset(('foo', 'bar', 'baz', 'fail')))40 assert_equals(results.completed, frozenset(('foo', 'bar', 'baz')))41 assert_equals(results.failed, frozenset(('fail',)))42def test_chain():43 """44 run a dependency chain (with threads)45 """46 from arbiter.async import run_tasks47 executed_tasks = set()48 def make_task(name, dependencies=(), should_succeed=True):49 """50 Make a task51 """52 from arbiter.task import create_task53 function = succeed if should_succeed else fail54 return create_task(55 lambda: executed_tasks.add(name) or function(),56 name=name,57 dependencies=dependencies58 )59 results = run_tasks(60 (61 make_task('foo'),62 make_task('bar', ('foo',)),63 make_task('baz', ('bar',), should_succeed=False),64 make_task('qux', ('baz',)),65 ),66 267 )68 assert_equals(executed_tasks, frozenset(('foo', 'bar', 'baz',)))69 assert_equals(results.completed, frozenset(('foo', 'bar',)))70 assert_equals(results.failed, frozenset(('baz', 'qux')))71def test_tree():72 """73 run a dependency tree (with threads)74 """75 from arbiter.async import run_tasks76 executed_tasks = set()77 def make_task(name, dependencies=(), should_succeed=True):78 """79 Make a task80 """81 from arbiter.task import create_task82 function = succeed if should_succeed else fail83 return create_task(84 lambda: executed_tasks.add(name) or function(),85 name=name,86 dependencies=dependencies87 )88 results = run_tasks(89 (90 make_task('foo'),91 make_task('bar', ('foo',)),92 make_task('baz', ('bar',), should_succeed=False),93 make_task('qux', ('baz',)),94 make_task('bell', ('bar',)),95 make_task('alugosi', ('bell',), should_succeed=False),96 make_task('lorem'),97 make_task('ipsum', ('lorem',)),98 make_task('ouroboros', ('ouroboros',)),99 make_task('tick', ('tock',)),100 make_task('tock', ('tick',)),101 make_task('success', ('foo', 'lorem')),102 make_task('failed', ('qux', 'lorem')),103 ),104 2105 )106 assert_equals(107 executed_tasks,108 frozenset(109 (110 'foo', 'bar', 'baz', 'bell', 'alugosi',111 'lorem', 'ipsum', 'success'112 )113 )114 )115 assert_equals(116 results.completed,117 frozenset(118 ('foo', 'bar', 'bell', 'lorem', 'ipsum', 'success')119 )120 )121 assert_equals(122 results.failed,123 frozenset(124 ('baz', 'qux', 'alugosi', 'ouroboros', 'tick', 'tock', 'failed')125 )126 )127def test_with_data():128 """129 Pass data.130 """131 from arbiter.async import run_tasks132 from arbiter.task import create_task133 data = [4, 5, 6]134 def myfunc(val=-1):135 """136 Modify some data which will be passed from another task.137 """138 data.append(val)139 foo = create_task(len, [1, 2], name='foo')140 bar = create_task(myfunc, name='bar', val=foo)141 results = run_tasks((foo, bar), 2)142 assert_equals(results.exceptions, [])143 assert_equals(results.completed, frozenset(('foo', 'bar')))144 assert_equals(data, [4, 5, 6, 2])145def succeed():146 """147 A task that succeeds148 """149 return True150def fail():151 """152 A task that fails153 """...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful