How to use requeue method in autotest

Best Python code snippet using autotest_python

test_client_interface.py

Source:test_client_interface.py Github

copy

Full Screen

#!/usr/bin/env python3.8
"""Tests for which command ``CommandSender.fetch_next_cmd`` returns."""

import unittest
from typing import List, Optional
from unittest import TestCase

import tibia_terminator.interface.client_interface as sut
from tibia_terminator.interface.client_interface import (
    Command,
    ThrottleBehavior,
    CommandSender,
)


class FakeStatsLogger:
    """Logger stand-in whose ``log_action`` discards every message."""

    def log_action(self, debug_level: int, msg: str) -> None:
        pass


class FakeCommand(Command):
    """``Command`` subclass whose ``_send`` does nothing."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _send(self, tibia_wid: str) -> None:
        pass


class TestClientInterface(TestCase):
    """Drives a ``CommandSender`` with a scripted clock and checks the
    command returned by the final ``fetch_next_cmd`` call."""

    test_cmd_type: str
    default_test_cmd_id: str
    # Scripted clock values, stored reversed so ``pop()`` yields them in order.
    timestamps: List[int]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.test_cmd_type = "_test_cmd_type_"
        self.default_test_cmd_id = "_test_cmd_id_"
        self.timestamps = []

    def setUp(self) -> None:
        """Replace ``sut.timestamp_ms`` with the scripted clock for one test."""

        def next_timestamp() -> int:
            # Once the scripted values run out the clock reports 0.
            return self.timestamps.pop() if self.timestamps else 0

        # The patch targets a module-level name, so put the original back
        # after the test instead of leaking the fake into later tests.
        original_timestamp_ms = sut.timestamp_ms
        sut.timestamp_ms = next_timestamp
        self.addCleanup(setattr, sut, "timestamp_ms", original_timestamp_ms)

    def make_target(self):
        """Build the ``CommandSender`` under test with a no-op logger."""
        return CommandSender("_test_tibia_wid_", FakeStatsLogger(), False)

    def make_cmd(
        self,
        throttle_behavior: ThrottleBehavior,
        throttle_ms: int = 0,
        cmd_id: Optional[str] = None,
    ) -> Command:
        """Build a ``FakeCommand`` of the shared test command type.

        Args:
            throttle_behavior: Throttle behavior assigned to the command.
            throttle_ms: Throttle interval assigned to the command.
            cmd_id: Command id; falls back to ``default_test_cmd_id`` when
                omitted (or falsy).
        """
        return FakeCommand(
            cmd_type=self.test_cmd_type,
            throttle_ms=throttle_ms,
            cmd_id=cmd_id or self.default_test_cmd_id,
            throttle_behavior=throttle_behavior,
        )

    def test_fetch_next_cmd_forced_throttle(self) -> None:
        # given that a single forced command is given
        input_cmd = self.make_cmd(ThrottleBehavior.FORCE)
        # when the next command is fetched
        # then the forced command is issued
        self.check_fetch_next_cmd([input_cmd], input_cmd, [0, 1])

    def test_fetch_next_cmd_forced_throttle_1000ms(self) -> None:
        # given that two forced commands are given
        input_cmd_1 = self.make_cmd(ThrottleBehavior.FORCE)
        input_cmd_2 = self.make_cmd(ThrottleBehavior.FORCE)
        # when the final command is fetched
        # then the second forced command is issued
        self.check_fetch_next_cmd(
            [input_cmd_1, input_cmd_2], input_cmd_2, [0, 1, 2, 3]
        )

    def test_fetch_next_cmd_forced_throttle_1000ms_drop_first(self) -> None:
        drop_cmd = self.make_cmd(ThrottleBehavior.DROP)
        forced_cmd = self.make_cmd(ThrottleBehavior.FORCE)
        self.check_fetch_next_cmd(
            # given that a dropped and forced commands are sent
            [drop_cmd, forced_cmd],
            # when the forced command is fetched
            # then the forced command is issued
            forced_cmd,
            [0, 1, 2],
        )

    def test_fetch_next_cmd_drop_behavior(self) -> None:
        drop_cmd = self.make_cmd(ThrottleBehavior.DROP)
        self.check_fetch_next_cmd([drop_cmd], drop_cmd, [0, 1])

    def test_fetch_next_cmd_drop_behavior_throttled(self) -> None:
        drop_cmd_1 = self.make_cmd(ThrottleBehavior.DROP)
        drop_cmd_2 = self.make_cmd(ThrottleBehavior.DROP, throttle_ms=10)
        self.check_fetch_next_cmd(
            [drop_cmd_1, drop_cmd_2], CommandSender.NOOP_COMMAND, [0, 1, 2]
        )

    def test_fetch_next_cmd_drop_behavior_throttled_with_third_cmd(self) -> None:
        drop_cmd_1 = self.make_cmd(ThrottleBehavior.DROP)
        drop_cmd_2 = self.make_cmd(ThrottleBehavior.DROP, throttle_ms=10)
        drop_cmd_3 = self.make_cmd(ThrottleBehavior.DROP, throttle_ms=1)
        self.check_fetch_next_cmd(
            [drop_cmd_1, drop_cmd_2, drop_cmd_3], drop_cmd_3, [0, 1, 2, 3]
        )

    def test_fetch_next_cmd_requeued_top(self) -> None:
        requeue_top_cmd = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        self.check_fetch_next_cmd([requeue_top_cmd], requeue_top_cmd, [0, 1])

    def test_fetch_next_cmd_requeued_top_throttled_once(self) -> None:
        requeue_top_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        requeue_top_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_TOP, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_top_cmd_1, requeue_top_cmd_2],
            CommandSender.NOOP_COMMAND,
            [0, 1, 2],
        )

    def test_fetch_next_cmd_requeued_top_not_throttled_twice(self) -> None:
        requeue_top_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        requeue_top_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_TOP, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_top_cmd_1, requeue_top_cmd_2], requeue_top_cmd_2, [0, 1, 20]
        )

    def test_fetch_next_cmd_requeued_top_throttled_then_requeued(self) -> None:
        requeue_top_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        requeue_top_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_TOP, throttle_ms=10
        )
        other_cmd_3 = self.make_cmd(
            ThrottleBehavior.DROP, throttle_ms=10, cmd_id="other_cmd"
        )
        self.check_fetch_next_cmd(
            [requeue_top_cmd_1, requeue_top_cmd_2, other_cmd_3],
            requeue_top_cmd_2,
            [1, 2, 3, 13],
        )

    def test_fetch_next_cmd_requeued_back(self) -> None:
        requeue_back_cmd = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        self.check_fetch_next_cmd([requeue_back_cmd], requeue_back_cmd, [0, 1])

    def test_fetch_next_cmd_requeued_back_throttled_once(self) -> None:
        requeue_back_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        requeue_back_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_BACK, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_back_cmd_1, requeue_back_cmd_2],
            CommandSender.NOOP_COMMAND,
            [0, 1, 2],
        )

    def test_fetch_next_cmd_requeued_back_not_throttled_twice(self) -> None:
        requeue_back_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        requeue_back_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_BACK, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_back_cmd_1, requeue_back_cmd_2],
            requeue_back_cmd_2,
            [0, 1, 20],
        )

    def test_fetch_next_cmd_requeued_back_throttled_then_requeued_back(
        self,
    ) -> None:
        requeue_back_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        requeue_back_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_BACK, throttle_ms=10
        )
        other_cmd_3 = self.make_cmd(
            ThrottleBehavior.DROP, throttle_ms=1, cmd_id="other_cmd"
        )
        self.check_fetch_next_cmd(
            # The trailing None is not sent; it only adds one more fetch.
            [requeue_back_cmd_1, requeue_back_cmd_2, other_cmd_3, None],
            requeue_back_cmd_2,
            [1, 2, 3, 13, 24, 34],
        )

    def check_fetch_next_cmd(
        self,
        cmds_to_send: List[Optional[Command]],
        expected_final_cmd: Command = CommandSender.NOOP_COMMAND,
        timestamps: Optional[List[int]] = None,
    ) -> None:
        """Send commands, fetch all but the last, then check the final fetch.

        Args:
            cmds_to_send: Commands passed to ``send`` in order. ``None``
                entries are skipped when sending but still count towards
                the number of fetches performed.
            expected_final_cmd: Command the last ``fetch_next_cmd`` call
                must return.
            timestamps: Values the scripted clock returns, in call order.
                When omitted (or empty) the current script is left as is.
        """
        # given
        if timestamps:
            # Reversed so that list.pop() hands the values out in order.
            self.timestamps = list(reversed(timestamps))
        target = self.make_target()
        # given that commands were sent instantaneously
        for cmd in cmds_to_send:
            if cmd is not None:
                target.send(cmd)
        # given that n - 1 commands are fetched
        for _ in range(len(cmds_to_send) - 1):
            next_cmd = target.fetch_next_cmd()
            if next_cmd is not CommandSender.NOOP_COMMAND:
                # Make it so that the command is issued exactly
                # when it is fetched
                target.issue_cmd(next_cmd)
        # when the final command is fetched
        actual_cmd = target.fetch_next_cmd()
        # then we expect a specific command at the end
        self.assertEqual(
            actual_cmd,
            expected_final_cmd,
            (f"actual_cmd: {actual_cmd}\n, " f"expected_cmd: {expected_final_cmd}"),
        )


if __name__ == "__main__":
    # NOTE(review): the body is elided ("...") in this excerpt; presumably it
    # calls unittest.main() -- confirm against the original file.
    ...

Full Screen

Full Screen

requeue.py

Source:requeue.py Github

copy

Full Screen

#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A thread-safe queue in which removed objects can be put back at the front."""

import logging
import threading
import time

try:
    import Queue
except ImportError:
    # Python 3 renamed the module; keep the old name so the rest of the
    # file (and the ``queue_factory`` default) is unchanged.
    import queue as Queue

logger = logging.getLogger('google.appengine.tools.requeue')


class ReQueue(object):
    """A special thread-safe queue.

    A ReQueue allows unfinished work items to be returned with a call to
    reput(). When an item is reput, task_done() should *not* be called
    in addition; getting an item that has been reput does not increase
    the number of outstanding tasks.

    This class shares an interface with Queue.Queue and provides the
    additional reput method.
    """

    def __init__(self,
                 queue_capacity,
                 requeue_capacity=None,
                 queue_factory=Queue.Queue,
                 get_time=time.time):
        """Initialize a ReQueue instance.

        Args:
            queue_capacity: The number of items that can be put in the
                ReQueue.
            requeue_capacity: The number of items that can be reput in the
                ReQueue. Defaults to queue_capacity.
            queue_factory: Used for dependency injection.
            get_time: Used for dependency injection.
        """
        if requeue_capacity is None:
            requeue_capacity = queue_capacity
        self.get_time = get_time
        self.queue = queue_factory(queue_capacity)
        self.requeue = queue_factory(requeue_capacity)
        # Both conditions share one lock: put()/reput() wait on get_cond
        # and notify put_cond, get() does the reverse.
        self.lock = threading.Lock()
        self.put_cond = threading.Condition(self.lock)
        self.get_cond = threading.Condition(self.lock)

    def _DoWithTimeout(self,
                       action,
                       exc,
                       wait_cond,
                       done_cond,
                       lock,
                       timeout=None,
                       block=True):
        """Performs the given action with a timeout.

        The action must be non-blocking, and raise an instance of exc on a
        recoverable failure. If the action fails with an instance of exc,
        we wait on wait_cond before trying again. Failure after the
        timeout is reached is propagated as an exception. Success is
        signalled by notifying on done_cond and returning the result of
        the action. If action raises any exception besides an instance of
        exc, it is immediately propagated.

        Args:
            action: A callable that performs a non-blocking action.
            exc: An exception type that is thrown by the action to indicate
                a recoverable error.
            wait_cond: A condition variable which should be waited on when
                action throws exc.
            done_cond: A condition variable to signal if the action returns.
            lock: The lock used by wait_cond and done_cond.
            timeout: A non-negative float indicating the maximum time to
                wait.
            block: Whether to block if the action cannot complete
                immediately.

        Returns:
            The result of the action, if it is successful.

        Raises:
            ValueError: If the timeout argument is negative.
            exc: If the action still fails once the timeout has elapsed.
        """
        if timeout is not None and timeout < 0.0:
            raise ValueError('\'timeout\' must not be a negative number')
        if not block:
            # A zero budget makes the first recoverable failure final.
            timeout = 0.0
        result = None
        success = False
        start_time = self.get_time()
        lock.acquire()
        try:
            while not success:
                try:
                    result = action()
                    success = True
                except exc:
                    remaining = None
                    if timeout is not None:
                        # Measure against the original budget every time.
                        # Subtracting the total elapsed time from an
                        # already-reduced timeout would count earlier
                        # waits more than once and expire too early.
                        remaining = timeout - (self.get_time() - start_time)
                        if remaining <= 0.0:
                            # Bare raise keeps the original traceback.
                            raise
                    wait_cond.wait(remaining)
        finally:
            if success:
                done_cond.notify()
            lock.release()
        return result

    def put(self, item, block=True, timeout=None):
        """Put an item into the requeue.

        Args:
            item: An item to add to the requeue.
            block: Whether to block if the requeue is full.
            timeout: Maximum on how long to wait until the queue is
                non-full.

        Raises:
            Queue.Full if the queue is full and the timeout expires.
        """
        def PutAction():
            self.queue.put(item, block=False)
        self._DoWithTimeout(PutAction,
                            Queue.Full,
                            self.get_cond,
                            self.put_cond,
                            self.lock,
                            timeout=timeout,
                            block=block)

    def reput(self, item, block=True, timeout=None):
        """Re-put an item back into the requeue.

        Re-putting an item does not increase the number of outstanding
        tasks, so the reput item should be uniquely associated with an
        item that was previously removed from the requeue and for which
        TaskDone has not been called.

        Args:
            item: An item to add to the requeue.
            block: Whether to block if the requeue is full.
            timeout: Maximum on how long to wait until the queue is
                non-full.

        Raises:
            Queue.Full if the queue is full and the timeout expires.
        """
        def ReputAction():
            self.requeue.put(item, block=False)
        self._DoWithTimeout(ReputAction,
                            Queue.Full,
                            self.get_cond,
                            self.put_cond,
                            self.lock,
                            timeout=timeout,
                            block=block)

    def get(self, block=True, timeout=None):
        """Get an item from the requeue.

        Reput items are returned before items added with put().

        Args:
            block: Whether to block if the requeue is empty.
            timeout: Maximum on how long to wait until the requeue is
                non-empty.

        Returns:
            An item from the requeue.

        Raises:
            Queue.Empty if the queue is empty and the timeout expires.
        """
        def GetAction():
            try:
                result = self.requeue.get(block=False)
                # Outstanding work is tracked on self.queue only, so the
                # inner requeue's own task counter is settled right away.
                self.requeue.task_done()
            except Queue.Empty:
                result = self.queue.get(block=False)
            return result
        return self._DoWithTimeout(GetAction,
                                   Queue.Empty,
                                   self.put_cond,
                                   self.get_cond,
                                   self.lock,
                                   timeout=timeout,
                                   block=block)

    def join(self):
        """Blocks until all of the items in the requeue have been processed."""
        self.queue.join()

    def task_done(self):
        """Indicate that a previously enqueued item has been fully processed."""
        self.queue.task_done()

    def empty(self):
        """Returns true if the requeue is empty."""
        return self.queue.empty() and self.requeue.empty()

    def get_nowait(self):
        """Try to get an item from the queue without blocking."""
        return self.get(block=False)

    def qsize(self):
        # NOTE(review): the body is elided ("...") in this excerpt and is
        # left as found; restore it from the original file.
        ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful