Best Python code snippet using lisa_python
test_operations.py
Source:test_operations.py  
# ... (lines 1-8 of the original file are not shown in this excerpt)
from .. import relations
from .. import operations
from .. import exceptions
from . import helpers


async def create_timer(owner_id, entity_id, type, speed=4.5, border=450, callback_data=None, resources=0):
    """Create a timer via ``operations.create_timer`` using test-friendly defaults.

    When ``callback_data`` is not given it is derived from the identifying
    triple as ``'data_<owner_id>_<entity_id>_<type>'``. Returns whatever
    ``operations.create_timer`` returns.
    """
    if callback_data is None:
        callback_data = 'data_{}_{}_{}'.format(owner_id, entity_id, type)

    result = await operations.create_timer(owner_id=owner_id,
                                           entity_id=entity_id,
                                           type=type,
                                           speed=speed,
                                           border=border,
                                           resources=resources,
                                           callback_data=callback_data)
    return result


class CreateTimerTests(helpers.BaseTests):
    """Tests for ``operations.create_timer`` (exercised through the local helper)."""

    @test_utils.unittest_run_loop
    async def test_success(self):
        timer = await create_timer(1, 2, 3, resources=0)

        results = await db.sql('SELECT * FROM timers')

        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['owner'], 1)
        self.assertEqual(results[0]['entity'], 2)
        self.assertEqual(results[0]['type'], 3)
        self.assertEqual(results[0]['speed'], 4.5)
        self.assertEqual(results[0]['border'], 450)
        self.assertEqual(results[0]['resources'], 0)
        self.assertEqual(results[0]['finish_at'], results[0]['resources_at'] + datetime.timedelta(seconds=450/4.5))
        self.assertEqual(results[0]['data'], {'callback_data': 'data_1_2_3'})

        self.assertEqual(operations.TIMERS_QUEUE.first(), (timer.id, timer.finish_at))

    @test_utils.unittest_run_loop
    async def test_success__initial_resources(self):
        timer = await create_timer(1, 2, 3, resources=4.5*10)

        results = await db.sql('SELECT * FROM timers')

        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['owner'], 1)
        self.assertEqual(results[0]['entity'], 2)
        self.assertEqual(results[0]['type'], 3)
        self.assertEqual(results[0]['speed'], 4.5)
        self.assertEqual(results[0]['border'], 450)
        self.assertEqual(results[0]['resources'], 45)
        self.assertEqual(results[0]['finish_at'], results[0]['resources_at'] + datetime.timedelta(seconds=(450-45)/4.5))
        self.assertEqual(results[0]['data'], {'callback_data': 'data_1_2_3'})

        self.assertEqual(operations.TIMERS_QUEUE.first(), (timer.id, timer.finish_at))

    @test_utils.unittest_run_loop
    async def test_already_exists(self):
        timer = await create_timer(1, 2, 3)

        with self.assertRaises(exceptions.TimerAlreadyExists):
            await create_timer(1, 2, 3)

        results = await db.sql('SELECT * FROM timers')

        self.assertEqual(len(results), 1)
        self.assertEqual(len(operations.TIMERS_QUEUE), 1)
        self.assertEqual(operations.TIMERS_QUEUE.first(), (timer.id, timer.finish_at))

    @test_utils.unittest_run_loop
    async def test_multiple_timers(self):
        timer_1 = await create_timer(1, 2, 3)
        timer_2 = await create_timer(10, 20, 30)

        results = await db.sql('SELECT * FROM timers')

        self.assertEqual(len(results), 2)
        self.assertEqual(len(operations.TIMERS_QUEUE), 2)
        self.assertEqual(operations.TIMERS_QUEUE.pop(), (timer_1.id, timer_1.finish_at))
        self.assertEqual(operations.TIMERS_QUEUE.pop(), (timer_2.id, timer_2.finish_at))


class ChangeSpeedTests(helpers.BaseTests):
    """Tests for ``operations.change_speed``."""

    @test_utils.unittest_run_loop
    async def test_success(self):
        await create_timer(1, 2, 3, resources=10, speed=10, border=100)

        # Let some wall-clock time pass so resources accumulate at the old speed.
        time.sleep(0.1)

        timer = await operations.change_speed(1, 2, 3, speed=20)

        results = await db.sql('SELECT * FROM timers')
        self.assertEqual(len(results), 1)

        self.assertEqual(timer.owner_id, 1)
        self.assertEqual(timer.entity_id, 2)
        self.assertEqual(timer.type, 3)
        self.assertEqual(timer.speed, 20)
        self.assertEqual(timer.border, 100)
        self.assertEqual(round(timer.resources), 11)

        finish_at_delta = (timer.finish_at - (timer.resources_at + datetime.timedelta(seconds=(100-11)/20))).total_seconds()
        self.assertTrue(abs(finish_at_delta) < 0.01)

        self.assertEqual(operations.TIMERS_QUEUE.first(), (timer.id, timer.finish_at))

        results = await db.sql('SELECT * FROM timers')

        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['owner'], timer.owner_id)
        self.assertEqual(results[0]['entity'], timer.entity_id)
        self.assertEqual(results[0]['type'], timer.type)
        self.assertEqual(results[0]['speed'], timer.speed)
        self.assertEqual(results[0]['border'], timer.border)
        self.assertEqual(results[0]['resources'], timer.resources)
        self.assertEqual(results[0]['resources_at'].replace(tzinfo=None), timer.resources_at)
        self.assertEqual(results[0]['finish_at'].replace(tzinfo=None), timer.finish_at)
        self.assertEqual(results[0]['data'], {'callback_data': 'data_1_2_3'})

    @test_utils.unittest_run_loop
    async def test_no_timer(self):
        await create_timer(1, 2, 3, resources=10, speed=10, border=100)

        with self.assertRaises(exceptions.TimerNotFound):
            await operations.change_speed(1, 2, 4, speed=20)


class LoadAllTimersTests(helpers.BaseTests):
    """Tests for ``operations.load_all_timers``."""

    @test_utils.unittest_run_loop
    async def test_no_timers(self):
        await operations.load_all_timers()
        self.assertTrue(operations.TIMERS_QUEUE.empty())

    @test_utils.unittest_run_loop
    async def test_has_timers(self):
        timer_1 = await create_timer(1, 2, 3, speed=1)
        timer_2 = await create_timer(10, 2, 3, speed=4)
        timer_3 = await create_timer(1, 20, 3, speed=3)
        timer_4 = await create_timer(1, 2, 30, speed=2)

        operations.TIMERS_QUEUE.clean()
        self.assertTrue(operations.TIMERS_QUEUE.empty())

        await operations.load_all_timers()

        self.assertEqual(len(operations.TIMERS_QUEUE), 4)

        # Expected pop order is by descending speed (4, 3, 2, 1).
        self.assertEqual(operations.TIMERS_QUEUE.pop(), (timer_2.id, timer_2.finish_at))
        self.assertEqual(operations.TIMERS_QUEUE.pop(), (timer_3.id, timer_3.finish_at))
        self.assertEqual(operations.TIMERS_QUEUE.pop(), (timer_4.id, timer_4.finish_at))
        self.assertEqual(operations.TIMERS_QUEUE.pop(), (timer_1.id, timer_1.finish_at))


class FakeScheduler(object):
    """Callable test double that records the ``timer_id`` of every call."""

    def __init__(self):
        self.timers = []

    def __call__(self, callback, timer_id, config):
        self.timers.append(timer_id)


class FinishCompletedTimersTests(helpers.BaseTests):
    """Tests for ``operations.finish_completed_timers``."""

    @test_utils.unittest_run_loop
    async def test_no_timers(self):
        scheduler = FakeScheduler()
        operations.finish_completed_timers(scheduler, {})
        self.assertEqual(scheduler.timers, [])

    @test_utils.unittest_run_loop
    async def test_has_timers(self):
        timer_1 = await create_timer(1, 2, 3, speed=1)
        timer_2 = await create_timer(10, 2, 3, speed=100000)
        timer_3 = await create_timer(1, 20, 3, speed=10000000000000)
        timer_4 = await create_timer(1, 2, 30, speed=2)

        time.sleep(0.01)

        scheduler = FakeScheduler()
        operations.finish_completed_timers(scheduler, {})

        self.assertEqual(scheduler.timers, [timer_3.id, timer_2.id])

    @test_utils.unittest_run_loop
    async def test_process_all(self):
        timer_1 = await create_timer(1, 2, 3, speed=10000000000000)
        timer_2 = await create_timer(10, 2, 3, speed=100000)
        timer_3 = await create_timer(1, 20, 3, speed=10000000000000)

        time.sleep(0.01)

        scheduler = FakeScheduler()
        operations.finish_completed_timers(scheduler, {})

        self.assertEqual(scheduler.timers, [timer_1.id, timer_3.id, timer_2.id])


class FakeCallback(object):
    """Awaitable test double for a timer callback.

    Each call records its keyword arguments in ``calls`` and returns the next
    item from ``results`` (consumed front to back). If ``delay`` is positive
    the call sleeps that many seconds first.
    """

    def __init__(self, results, delay=0):
        self.calls = []
        self.results = list(results)
        self.delay = delay

    async def __call__(self, **kwargs):
        if self.delay > 0:
            # The coroutine must be awaited; a bare ``asyncio.sleep(...)`` call
            # only creates the coroutine object and never actually sleeps.
            await asyncio.sleep(self.delay)

        self.calls.append(kwargs)
        return self.results.pop(0)


class FinishTimerTests(helpers.BaseTests):
    """Tests for ``operations.finish_timer``."""

    def setUp(self):
        super().setUp()
        self.config = helpers.get_config()['custom']

    @test_utils.unittest_run_loop
    async def test_no_timer(self):
        callback = FakeCallback(results=[(True, relations.POSTPROCESS_TYPE.RESTART)])

        await operations.finish_timer(timer_id=666, config=self.config, callback=callback)

        self.assertEqual(callback.calls, [])

    @test_utils.unittest_run_loop
    async def test_timer_not_finished(self):
        timer = await create_timer(1, 2, 3, speed=0.01)

        callback = FakeCallback(results=[(True, relations.POSTPROCESS_TYPE.RESTART)])

        await operations.finish_timer(timer_id=timer.id, config=self.config, callback=callback)

        self.assertEqual(callback.calls, [])
        self.assertEqual(len(operations.TIMERS_QUEUE), 1)

    @test_utils.unittest_run_loop
    async def test_callback_successed(self):
        timer = await create_timer(1, 2, 3, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        time.sleep(0.01)

        callback = FakeCallback(results=[(True, relations.POSTPROCESS_TYPE.RESTART)])

        async def postprocess(**kwargs):
            pass

        await operations.finish_timer(timer_id=timer.id,
                                      config=self.config,
                                      callback=callback,
                                      postprocess=postprocess)

        self.assertEqual(callback.calls, [{'data': 'data_1_2_3',
                                           'secret': 'test.secret',
                                           'url': 'http://example.com/3',
                                           'timer': timer}])

        self.assertTrue(operations.TIMERS_QUEUE.empty())

    @test_utils.unittest_run_loop
    async def test_race(self):
        timer = await create_timer(1, 2, 3, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        time.sleep(0.01)

        callback = FakeCallback(results=[(True, relations.POSTPROCESS_TYPE.REMOVE),
                                         (True, relations.POSTPROCESS_TYPE.REMOVE)])

        task_1 = operations.finish_timer(timer_id=timer.id,
                                         config=self.config,
                                         callback=callback)
        task_2 = operations.finish_timer(timer_id=timer.id,
                                         config=self.config,
                                         callback=callback)

        await asyncio.gather(task_1, task_2)

        # first call must remove timer
        self.assertEqual(callback.calls, [{'data': 'data_1_2_3',
                                           'secret': 'test.secret',
                                           'url': 'http://example.com/3',
                                           'timer': timer}])

        self.assertTrue(operations.TIMERS_QUEUE.empty())

    @test_utils.unittest_run_loop
    async def test_semaphore(self):
        delays = [0.02, 0.03, 0.05, 0.07, 0.11, 0.13, 0.17, 0.19, 0.23, 0.29]

        timers = []

        for i in range(len(delays)):
            timer = await create_timer(i, 2, 3, speed=10000000)
            timers.append(timer)

        operations.TIMERS_QUEUE.pop()

        time.sleep(0.01)

        # uids of callbacks currently in flight
        storage = set()
        # number of callbacks still in flight, sampled as each one completes
        history = []

        def create_callback(delay):
            async def callback(*argv, **kwargs):
                uid = uuid.uuid4().hex

                storage.add(uid)

                # No more than 5 callbacks may be running at the same time.
                self.assertTrue(len(storage) <= 5)

                await asyncio.sleep(delay)

                storage.remove(uid)

                history.append(len(storage))

                return True, relations.POSTPROCESS_TYPE.RESTART

            return callback

        async def postprocess(**kwargs):
            pass

        tasks = []

        random.shuffle(delays)

        for delay, timer in zip(delays, timers):
            task = operations.finish_timer(timer_id=timer.id,
                                           config=self.config,
                                           callback=create_callback(delay),
                                           postprocess=postprocess)
            tasks.append(task)

        await asyncio.gather(*tasks)

        self.assertEqual(storage, set())
        self.assertEqual(history, [4, 4, 4, 4, 4, 4, 3, 2, 1, 0])

    @test_utils.unittest_run_loop
    async def test_callback_failed(self):
        timer = await create_timer(1, 2, 1, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        time.sleep(0.01)

        # Two failures followed by a success: the callback is expected to be
        # invoked three times with identical arguments.
        callback = FakeCallback(results=((False, None),
                                         (False, None),
                                         (True, relations.POSTPROCESS_TYPE.RESTART)))

        async def postprocess(**kwargs):
            pass

        await operations.finish_timer(timer_id=timer.id,
                                      config=self.config,
                                      callback=callback,
                                      postprocess=postprocess)

        self.assertEqual(callback.calls, [{'data': 'data_1_2_1',
                                           'secret': 'test.secret',
                                           'url': 'http://example.com/1',
                                           'timer': timer},
                                          {'data': 'data_1_2_1',
                                           'secret': 'test.secret',
                                           'url': 'http://example.com/1',
                                           'timer': timer},
                                          {'data': 'data_1_2_1',
                                           'secret': 'test.secret',
                                           'url': 'http://example.com/1',
                                           'timer': timer}])

        self.assertTrue(operations.TIMERS_QUEUE.empty())

    @test_utils.unittest_run_loop
    async def test_wrong_type(self):
        timer = await create_timer(1, 2, 666, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        time.sleep(0.01)

        callback = FakeCallback(results=[(True, relations.POSTPROCESS_TYPE.RESTART)])

        async def postprocess(**kwargs):
            pass

        with self.assertRaises(exceptions.WrongTimerType):
            await operations.finish_timer(timer_id=timer.id,
                                          config=self.config,
                                          callback=callback,
                                          postprocess=postprocess)

        self.assertEqual(callback.calls, [])

        self.assertTrue(operations.TIMERS_QUEUE.empty())


class DoCallbackTests(helpers.BaseTests):
    """Tests for ``operations.do_callback`` using a fake HTTP caller."""

    @test_utils.unittest_run_loop
    async def test_wrong_format(self):
        timer = await create_timer(1, 2, 666, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        async def fake_http_caller(*argv):
            return True, b'wrong data'

        success, postprocess_type = await operations.do_callback(secret='test.secret',
                                                                 url='http://example.com/1',
                                                                 timer=timer,
                                                                 data='data_x',
                                                                 http_caller=fake_http_caller)

        self.assertFalse(success)
        self.assertEqual(postprocess_type, None)

    @test_utils.unittest_run_loop
    async def test_remove(self):
        timer = await create_timer(1, 2, 666, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        async def fake_http_caller(*argv):
            data = timers_pb2.CallbackAnswer(postprocess_type=timers_pb2.CallbackAnswer.PostprocessType.Value('REMOVE'))
            return True, data.SerializeToString()

        success, postprocess_type = await operations.do_callback(secret='test.secret',
                                                                 url='http://example.com/1',
                                                                 timer=timer,
                                                                 data='data_x',
                                                                 http_caller=fake_http_caller)

        self.assertTrue(success)
        self.assertEqual(postprocess_type, relations.POSTPROCESS_TYPE.REMOVE)

    @test_utils.unittest_run_loop
    async def test_restart(self):
        timer = await create_timer(1, 2, 666, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        async def fake_http_caller(*argv):
            data = timers_pb2.CallbackAnswer(postprocess_type=timers_pb2.CallbackAnswer.PostprocessType.Value('RESTART'))
            return True, data.SerializeToString()

        success, postprocess_type = await operations.do_callback(secret='test.secret',
                                                                 url='http://example.com/1',
                                                                 timer=timer,
                                                                 data='data_x',
                                                                 http_caller=fake_http_caller)

        self.assertTrue(success)
        self.assertEqual(postprocess_type, relations.POSTPROCESS_TYPE.RESTART)

    @test_utils.unittest_run_loop
    async def test_wrong_command(self):
        timer = await create_timer(1, 2, 666, speed=10000000)

        operations.TIMERS_QUEUE.pop()

        async def fake_http_caller(*argv):
            data = timers_pb2.CallbackAnswer(postprocess_type=111)
            return True, data.SerializeToString()

        success, postprocess_type = await operations.do_callback(secret='test.secret',
                                                                 url='http://example.com/1',
                                                                 timer=timer,
                                                                 data='data_x',
                                                                 http_caller=fake_http_caller)

        self.assertFalse(success)
        self.assertEqual(postprocess_type, None)


class PostprocessRestartTests(helpers.BaseTests):
    """Tests for ``operations.postprocess_timer`` with ``POSTPROCESS_TYPE.RESTART``."""

    @test_utils.unittest_run_loop
    async def test_continue(self):
        timer = await create_timer(1, 2, 3, speed=2, border=10)

        await operations.postprocess_timer(timer.id, relations.POSTPROCESS_TYPE.RESTART)

        results = await db.sql('SELECT * FROM timers WHERE id=%(id)s', {'id': timer.id})

        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]['resources'], 0)
        self.assertEqual(results[0]['resources_at'].replace(tzinfo=None), timer.finish_at)
        self.assertEqual(results[0]['finish_at'].replace(tzinfo=None), timer.finish_at+datetime.timedelta(seconds=10/2))


class PostprocessRemoveTests(helpers.BaseTests):
    """Tests for ``operations.postprocess_timer`` with ``POSTPROCESS_TYPE.REMOVE``."""

    @test_utils.unittest_run_loop
    async def test_continue(self):
        timer = await create_timer(1, 2, 3, speed=2, border=10)

        await operations.postprocess_timer(timer.id, relations.POSTPROCESS_TYPE.REMOVE)

        results = await db.sql('SELECT * FROM timers WHERE id=%(id)s', {'id': timer.id})

        self.assertFalse(results)


class GetOwnerTimersTests(helpers.BaseTests):
    """Tests for ``operations.get_owner_timers``."""

    @test_utils.unittest_run_loop
    async def test_no_timer(self):
        await create_timer(1, 2, 3)
        await create_timer(2, 2, 3)

        timers = await operations.get_owner_timers(666)

        self.assertEqual(timers, [])

    @test_utils.unittest_run_loop
    async def test_success(self):
        timer_1 = await create_timer(1, 2, 3, speed=10)
        timer_2 = await create_timer(2, 2, 3)
        timer_3 = await create_timer(1, 3, 4, speed=100)

        timers = await operations.get_owner_timers(1)
        # ... (the excerpt is truncated here; the rest of this test is not shown)

# test_counter.py
Source:test_counter.py  
# ... (lines 1-2 of the original file are not shown in this excerpt)
#
from unittest import mock

from base_python.cdk.utils.event_timing import create_timer


def test_counter_init():
    """The timer yielded by ``create_timer`` exposes the name it was created with."""
    with create_timer("Counter") as timer:
        assert timer.name == "Counter"


def test_counter_start_event():
    """``timer.start_event`` dispatches to ``EventTimer.start_event`` with the event name."""
    with create_timer("Counter") as timer:
        with mock.patch("base_python.cdk.utils.event_timing.EventTimer.start_event") as mock_start_event:
            timer.start_event("test_event")
            mock_start_event.assert_called_with("test_event")


def test_counter_finish_event():
    """``timer.finish_event`` dispatches to ``EventTimer.finish_event`` with the given argument."""
    with create_timer("Counter") as timer:
        with mock.patch("base_python.cdk.utils.event_timing.EventTimer.finish_event") as mock_finish_event:
            timer.finish_event("test_event")
            mock_finish_event.assert_called_with("test_event")


def test_timer_multiple_events():
    """Ten start/finish pairs leave ``timer.count`` at 10."""
    with create_timer("Counter") as timer:
        for _ in range(10):
            timer.start_event("test_event")
            timer.finish_event()
        assert timer.count == 10


def test_report_is_ordered_by_name_by_default():
    """Report lines (after the first) list events in sorted-name order."""
    names = ["j", "b", "g", "d", "e", "f", "c", "h", "i", "a"]

    with create_timer("Source Counter") as timer:
        for name in names:
            timer.start_event(name)
            timer.finish_event()
        report = timer.report().split("\n")[1:]  # ignore the first line
        report_names = [line.split(" ")[0] for line in report]
        assert report_names == sorted(names)


def test_double_finish_is_safely_ignored():
    """Calling ``finish_event`` twice for one started event must not raise."""
    with create_timer("Source Counter") as timer:
        timer.start_event("test_event")
        timer.finish_event()
        timer.finish_event()
    # ... (the excerpt is truncated here)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios.
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
