How to use test_drainer method in autotest

Best Python code snippet using autotest_python

test_asynchronous.py

Source:test_asynchronous.py Github

copy

Full Screen

1import os2import socket3import time4import threading5import pytest6from case import patch, skip, Mock7from vine import promise8from celery.backends.asynchronous import BaseResultConsumer9from celery.backends.base import Backend10from celery.utils import cached_property11@pytest.fixture(autouse=True)12def setup_eventlet():13 # By default eventlet will patch the DNS resolver when imported.14 os.environ.update(EVENTLET_NO_GREENDNS='yes')15class DrainerTests(object):16 """17 Base test class for the Default / Gevent / Eventlet drainers.18 """19 interval = 0.1 # Check every tenth of a second20 MAX_TIMEOUT = 10 # Specify a max timeout so it doesn't run forever21 def get_drainer(self, environment):22 with patch('celery.backends.asynchronous.detect_environment') as d:23 d.return_value = environment24 backend = Backend(self.app)25 consumer = BaseResultConsumer(backend, self.app, backend.accept,26 pending_results={},27 pending_messages={})28 consumer.drain_events = Mock(side_effect=self.result_consumer_drain_events)29 return consumer.drainer30 @pytest.fixture(autouse=True)31 def setup_drainer(self):32 raise NotImplementedError33 @cached_property34 def sleep(self):35 """36 Sleep on the event loop.37 """38 raise NotImplementedError39 def schedule_thread(self, thread):40 """41 Set up a thread that runs on the event loop.42 """43 raise NotImplementedError44 def teardown_thread(self, thread):45 """46 Wait for a thread to stop.47 """48 raise NotImplementedError49 def result_consumer_drain_events(self, timeout=None):50 """51 Subclasses should override this method to define the behavior of52 drainer.result_consumer.drain_events.53 """54 raise NotImplementedError55 def test_drain_checks_on_interval(self):56 p = promise()57 def fulfill_promise_thread():58 self.sleep(self.interval * 2)59 p('done')60 fulfill_thread = self.schedule_thread(fulfill_promise_thread)61 on_interval = Mock()62 for _ in self.drainer.drain_events_until(p,63 on_interval=on_interval,64 interval=self.interval,65 timeout=self.MAX_TIMEOUT):66 pass67 self.teardown_thread(fulfill_thread)68 assert p.ready, 'Should have terminated with promise being ready'69 assert on_interval.call_count < 20, 'Should have limited number of calls to on_interval'70 def test_drain_does_not_block_event_loop(self):71 """72 This test makes sure that other greenlets can still operate while drain_events_until is73 running.74 """75 p = promise()76 liveness_mock = Mock()77 def fulfill_promise_thread():78 self.sleep(self.interval * 2)79 p('done')80 def liveness_thread():81 while 1:82 if p.ready:83 return84 self.sleep(self.interval / 10)85 liveness_mock()86 fulfill_thread = self.schedule_thread(fulfill_promise_thread)87 liveness_thread = self.schedule_thread(liveness_thread)88 on_interval = Mock()89 for _ in self.drainer.drain_events_until(p,90 on_interval=on_interval,91 interval=self.interval,92 timeout=self.MAX_TIMEOUT):93 pass94 self.teardown_thread(fulfill_thread)95 self.teardown_thread(liveness_thread)96 assert p.ready, 'Should have terminated with promise being ready'97 assert on_interval.call_count <= liveness_mock.call_count, \98 'Should have served liveness_mock while waiting for event'99 def test_drain_timeout(self):100 p = promise()101 on_interval = Mock()102 with pytest.raises(socket.timeout):103 for _ in self.drainer.drain_events_until(p,104 on_interval=on_interval,105 interval=self.interval,106 timeout=self.interval * 5):107 pass108 assert not p.ready, 'Promise should remain un-fulfilled'109 assert on_interval.call_count < 20, 'Should have limited number of calls to on_interval'110@skip.unless_module('eventlet')111class test_EventletDrainer(DrainerTests):112 @pytest.fixture(autouse=True)113 def setup_drainer(self):114 self.drainer = self.get_drainer('eventlet')115 @cached_property116 def sleep(self):117 from eventlet import sleep118 return sleep119 def result_consumer_drain_events(self, timeout=None):120 import eventlet121 eventlet.sleep(0)122 def schedule_thread(self, thread):123 import eventlet124 g = eventlet.spawn(thread)125 eventlet.sleep(0)126 return g127 def teardown_thread(self, thread):128 thread.wait()129class test_Drainer(DrainerTests):130 @pytest.fixture(autouse=True)131 def setup_drainer(self):132 self.drainer = self.get_drainer('default')133 @cached_property134 def sleep(self):135 from time import sleep136 return sleep137 def result_consumer_drain_events(self, timeout=None):138 time.sleep(timeout)139 def schedule_thread(self, thread):140 t = threading.Thread(target=thread)141 t.start()142 return t143 def teardown_thread(self, thread):144 thread.join()145@skip.unless_module('gevent')146class test_GeventDrainer(DrainerTests):147 @pytest.fixture(autouse=True)148 def setup_drainer(self):149 self.drainer = self.get_drainer('gevent')150 @cached_property151 def sleep(self):152 from gevent import sleep153 return sleep154 def result_consumer_drain_events(self, timeout=None):155 import gevent156 gevent.sleep(0)157 def schedule_thread(self, thread):158 import gevent159 g = gevent.spawn(thread)160 gevent.sleep(0)161 return g162 def teardown_thread(self, thread):163 import gevent...

Full Screen

Full Screen

test_drainer.py

Source:test_drainer.py Github

copy

Full Screen

1# Global imports2import unittest3import numpy as np4from scipy.sparse import csr_matrix, eye5# Local import6from firing_graph.graph import FiringGraph, create_empty_matrices7from firing_graph.drainer import FiringGraphDrainer8from firing_graph.servers import ArrayServer9from .data import d_graphs, d_signals10class TestDrainer(unittest.TestCase):11 """12 """13 def setUp(self):14 d_matrices = self.build_matrices(d_graphs[9]['Iw'], d_graphs[9]['Im'])15 server = ArrayServer(16 csr_matrix(d_signals[11]['input'], dtype=bool), csr_matrix(d_signals[11]['got'], dtype=bool)17 )18 fg = FiringGraph(19 'test', np.array(d_graphs[9]['levels']), d_matrices, input_partitions=d_graphs[4]['input_partitions']20 )21 self.signal1 = self.build_signals(d_signals[11])22 self.drainer = FiringGraphDrainer(23 fg, server, d_signals[11]['bs'], penalties=d_signals[11]['p'], rewards=d_signals[11]['r']24 )25 @staticmethod26 def build_matrices(l_inputs, l_mask_inputs):27 # init matrices28 n_outputs = len(l_inputs)29 d_matrices = create_empty_matrices(len(l_inputs[0]), len(l_inputs), n_outputs, write_mode=False)30 # Set matrices31 d_matrices['Iw'] = csr_matrix(l_inputs).T.tocsr()32 d_matrices['Im'] = csr_matrix(l_mask_inputs, dtype=bool).T.tocsr()33 d_matrices['O'] = eye(n_outputs, format='csr', dtype=bool)34 return d_matrices35 @staticmethod36 def build_signals(d_signals):37 return {38 'o': csr_matrix(d_signals['out'], dtype=bool),39 'cb': csr_matrix(d_signals['cb'], dtype=np.int32),40 'e': d_signals['expected']41 }42 def test_drain(self):43 """44 python -m unittest tests.unit.test_drainer.TestDrainer.test_drain45 """46 self.drainer.drain()47 self.assertTrue((np.array(self.signal1['e']['Iw']).T == self.drainer.firing_graph.matrices['Iw']).all())48 self.assertTrue((np.array(self.signal1['e']['Ic']).T == self.drainer.firing_graph.backward_firing).all())...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful