Best Python code snippet using autotest_python
test_client_interface.py
Source:test_client_interface.py  
#!/usr/bin/env python3.8
"""Unit tests for the command throttling behavior of ``CommandSender``."""

import unittest
from typing import List, Optional
from unittest import TestCase

import tibia_terminator.interface.client_interface as sut
from tibia_terminator.interface.client_interface import (
    Command,
    ThrottleBehavior,
    CommandSender,
)


class FakeStatsLogger:
    """Stats logger stand-in that discards every message."""

    def log_action(self, debug_level: int, msg: str) -> None:
        pass


class FakeCommand(Command):
    """Command whose ``_send`` does nothing, so no client is required."""

    def _send(self, tibia_wid: str) -> None:
        pass


class TestClientInterface(TestCase):
    """Checks which command ``CommandSender.fetch_next_cmd`` returns.

    The module-level ``timestamp_ms`` of the module under test is replaced
    with a fake clock for the duration of each test; every call to it
    consumes the next value of ``self.timestamps`` (or returns 0 once the
    list is exhausted).
    """

    test_cmd_type: str
    default_test_cmd_id: str
    timestamps: List[int]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.test_cmd_type = "_test_cmd_type_"
        self.default_test_cmd_id = "_test_cmd_id_"
        # Stored in reverse order so that pop() yields the earliest value.
        self.timestamps = []

    def setUp(self) -> None:
        def next_timestamp() -> int:
            return self.timestamps.pop() if self.timestamps else 0

        # Restore the real clock afterwards so the patch cannot leak into
        # other tests that import the same module.
        self.addCleanup(setattr, sut, "timestamp_ms", sut.timestamp_ms)
        sut.timestamp_ms = next_timestamp

    def make_target(self) -> CommandSender:
        """Build the ``CommandSender`` under test with a no-op logger."""
        return CommandSender("_test_tibia_wid_", FakeStatsLogger(), False)

    def make_cmd(
        self,
        throttle_behavior: ThrottleBehavior,
        throttle_ms: int = 0,
        cmd_id: Optional[str] = None,
    ) -> Command:
        """Build a ``FakeCommand``; a falsy ``cmd_id`` selects the default id."""
        return FakeCommand(
            cmd_type=self.test_cmd_type,
            throttle_ms=throttle_ms,
            cmd_id=cmd_id or self.default_test_cmd_id,
            throttle_behavior=throttle_behavior,
        )

    def test_fetch_next_cmd_forced_throttle(self) -> None:
        # given that a single forced command is given
        input_cmd = self.make_cmd(ThrottleBehavior.FORCE)
        # when the next command is fetched
        # then the forced command is issued
        self.check_fetch_next_cmd([input_cmd], input_cmd, [0, 1])

    def test_fetch_next_cmd_forced_throttle_1000ms(self) -> None:
        # given that two forced commands are given
        input_cmd_1 = self.make_cmd(ThrottleBehavior.FORCE)
        input_cmd_2 = self.make_cmd(ThrottleBehavior.FORCE)
        # when the final command is fetched
        # then the second forced command is issued
        self.check_fetch_next_cmd(
            [input_cmd_1, input_cmd_2], input_cmd_2, [0, 1, 2, 3]
        )

    def test_fetch_next_cmd_forced_throttle_1000ms_drop_first(self) -> None:
        drop_cmd = self.make_cmd(ThrottleBehavior.DROP)
        forced_cmd = self.make_cmd(ThrottleBehavior.FORCE)
        self.check_fetch_next_cmd(
            # given that a dropped and forced commands are sent
            [drop_cmd, forced_cmd],
            # when the forced command is fetched
            # then the forced command is issued
            forced_cmd,
            [0, 1, 2],
        )

    def test_fetch_next_cmd_drop_behavior(self) -> None:
        drop_cmd = self.make_cmd(ThrottleBehavior.DROP)
        self.check_fetch_next_cmd([drop_cmd], drop_cmd, [0, 1])

    def test_fetch_next_cmd_drop_behavior_throttled(self) -> None:
        drop_cmd_1 = self.make_cmd(ThrottleBehavior.DROP)
        drop_cmd_2 = self.make_cmd(ThrottleBehavior.DROP, throttle_ms=10)
        self.check_fetch_next_cmd(
            [drop_cmd_1, drop_cmd_2], CommandSender.NOOP_COMMAND, [0, 1, 2]
        )

    def test_fetch_next_cmd_drop_behavior_throttled_with_third_cmd(self) -> None:
        drop_cmd_1 = self.make_cmd(ThrottleBehavior.DROP)
        drop_cmd_2 = self.make_cmd(ThrottleBehavior.DROP, throttle_ms=10)
        drop_cmd_3 = self.make_cmd(ThrottleBehavior.DROP, throttle_ms=1)
        self.check_fetch_next_cmd(
            [drop_cmd_1, drop_cmd_2, drop_cmd_3], drop_cmd_3, [0, 1, 2, 3]
        )

    def test_fetch_next_cmd_requeued_top(self) -> None:
        requeue_top_cmd = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        self.check_fetch_next_cmd([requeue_top_cmd], requeue_top_cmd, [0, 1])

    def test_fetch_next_cmd_requeued_top_throttled_once(self) -> None:
        requeue_top_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        requeue_top_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_TOP, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_top_cmd_1, requeue_top_cmd_2],
            CommandSender.NOOP_COMMAND,
            [0, 1, 2],
        )

    def test_fetch_next_cmd_requeued_top_not_throttled_twice(self) -> None:
        requeue_top_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        requeue_top_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_TOP, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_top_cmd_1, requeue_top_cmd_2], requeue_top_cmd_2, [0, 1, 20]
        )

    def test_fetch_next_cmd_requeued_top_throttled_then_requeued(self) -> None:
        requeue_top_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_TOP)
        requeue_top_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_TOP, throttle_ms=10
        )
        other_cmd_3 = self.make_cmd(
            ThrottleBehavior.DROP, throttle_ms=10, cmd_id="other_cmd"
        )
        self.check_fetch_next_cmd(
            [requeue_top_cmd_1, requeue_top_cmd_2, other_cmd_3],
            requeue_top_cmd_2,
            [1, 2, 3, 13],
        )

    def test_fetch_next_cmd_requeued_back(self) -> None:
        requeue_back_cmd = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        self.check_fetch_next_cmd([requeue_back_cmd], requeue_back_cmd, [0, 1])

    def test_fetch_next_cmd_requeued_back_throttled_once(self) -> None:
        requeue_back_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        requeue_back_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_BACK, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_back_cmd_1, requeue_back_cmd_2],
            CommandSender.NOOP_COMMAND,
            [0, 1, 2],
        )

    def test_fetch_next_cmd_requeued_back_not_throttled_twice(self) -> None:
        requeue_back_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        requeue_back_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_BACK, throttle_ms=10
        )
        self.check_fetch_next_cmd(
            [requeue_back_cmd_1, requeue_back_cmd_2],
            requeue_back_cmd_2,
            [0, 1, 20],
        )

    def test_fetch_next_cmd_requeued_back_throttled_then_requeued_back(self) -> None:
        requeue_back_cmd_1 = self.make_cmd(ThrottleBehavior.REQUEUE_BACK)
        requeue_back_cmd_2 = self.make_cmd(
            ThrottleBehavior.REQUEUE_BACK, throttle_ms=10
        )
        other_cmd_3 = self.make_cmd(
            ThrottleBehavior.DROP, throttle_ms=1, cmd_id="other_cmd"
        )
        self.check_fetch_next_cmd(
            # The trailing None is never sent; it only adds one more
            # fetch/issue round before the final fetch.
            [requeue_back_cmd_1, requeue_back_cmd_2, other_cmd_3, None],
            requeue_back_cmd_2,
            [1, 2, 3, 13, 24, 34],
        )

    def check_fetch_next_cmd(
        self,
        cmds_to_send: List[Optional[Command]],
        expected_final_cmd: Command = CommandSender.NOOP_COMMAND,
        timestamps: Optional[List[int]] = None,
    ) -> None:
        """Send commands, drain all but one fetch, and assert on the last.

        Args:
            cmds_to_send: Commands handed to ``send`` in order. ``None``
                entries are skipped when sending but still count towards
                the number of fetch rounds.
            expected_final_cmd: Command the final ``fetch_next_cmd`` call
                must return.
            timestamps: Values the fake clock returns, in call order. When
                empty or omitted the current ``self.timestamps`` is kept.
        """
        # given
        if timestamps:
            self.timestamps = list(reversed(timestamps))
        target = self.make_target()
        # given that commands were sent instantaneously
        for cmd in cmds_to_send:
            if cmd is not None:
                target.send(cmd)
        # given that n - 1 commands are fetched
        for _ in range(len(cmds_to_send) - 1):
            next_cmd = target.fetch_next_cmd()
            if next_cmd is not CommandSender.NOOP_COMMAND:
                # Make it so that the command is issued exactly
                # when it is fetched
                target.issue_cmd(next_cmd)
        # when the final command is fetched
        actual_cmd = target.fetch_next_cmd()
        # then we expect a specific command at the end
        self.assertEqual(
            actual_cmd,
            expected_final_cmd,
            (f"actual_cmd: {actual_cmd}\n, " f"expected_cmd: {expected_final_cmd}"),
        )


if __name__ == "__main__":
    # NOTE(review): the body of this guard is elided ("...") in the source
    # listing; it presumably calls unittest.main() -- confirm against the
    # original file.
    ...

# requeue.py
Source:requeue.py  
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A thread-safe queue in which removed objects are put back to the front."""

import logging
import threading
import time

try:
  import Queue
except ImportError:  # Python 3 renamed the module to "queue".
  import queue as Queue

logger = logging.getLogger('google.appengine.tools.requeue')


class ReQueue(object):
  """A special thread-safe queue.

  A ReQueue allows unfinished work items to be returned with a call to
  reput().  When an item is reput, task_done() should *not* be called.
  In addition, getting an item that has been reput does not increase
  the number of outstanding tasks.

  This class shares an interface with Queue.Queue and provides the
  additional reput method.
  """

  def __init__(self,
               queue_capacity,
               requeue_capacity=None,
               queue_factory=Queue.Queue,
               get_time=time.time):
    """Initialize a ReQueue instance.

    Args:
      queue_capacity: The number of items that can be put in the ReQueue.
      requeue_capacity: The number of items that can be reput in the
        ReQueue.  Defaults to queue_capacity.
      queue_factory: Used for dependency injection.
      get_time: Used for dependency injection.
    """
    if requeue_capacity is None:
      requeue_capacity = queue_capacity

    self.get_time = get_time
    self.queue = queue_factory(queue_capacity)
    self.requeue = queue_factory(requeue_capacity)
    # Both condition variables share one lock, so a put/reput and a get
    # never run their actions concurrently.
    self.lock = threading.Lock()
    self.put_cond = threading.Condition(self.lock)
    self.get_cond = threading.Condition(self.lock)

  def _DoWithTimeout(self,
                     action,
                     exc,
                     wait_cond,
                     done_cond,
                     lock,
                     timeout=None,
                     block=True):
    """Performs the given action with a timeout.

    The action must be non-blocking, and raise an instance of exc on a
    recoverable failure.  If the action fails with an instance of exc,
    we wait on wait_cond before trying again.  Failure after the
    timeout is reached is propagated as an exception.  Success is
    signalled by notifying on done_cond and returning the result of
    the action.  If action raises any exception besides an instance of
    exc, it is immediately propagated.

    Args:
      action: A callable that performs a non-blocking action.
      exc: An exception type that is thrown by the action to indicate
        a recoverable error.
      wait_cond: A condition variable which should be waited on when
        action throws exc.
      done_cond: A condition variable to signal if the action returns.
      lock: The lock used by wait_cond and done_cond.
      timeout: A non-negative float indicating the maximum time to wait.
      block: Whether to block if the action cannot complete immediately.

    Returns:
      The result of the action, if it is successful.

    Raises:
      ValueError: If the timeout argument is negative.
      exc: If the action still fails once the timeout has expired.
    """
    if timeout is not None and timeout < 0.0:
      raise ValueError('\'timeout\' must not be a negative  number')
    if not block:
      timeout = 0.0

    # Use a fixed deadline.  Repeatedly subtracting the time elapsed since
    # the start from a shrinking timeout would count earlier waits more
    # than once and expire too early after several wake-ups.
    deadline = None
    if timeout is not None:
      deadline = self.get_time() + timeout

    with lock:
      while True:
        try:
          result = action()
        except exc:
          if deadline is None:
            wait_cond.wait()
            continue
          remaining = deadline - self.get_time()
          if remaining <= 0.0:
            # Bare raise keeps the original traceback.
            raise
          wait_cond.wait(remaining)
        else:
          # notify() requires the lock, which is still held here.
          done_cond.notify()
          return result

  def put(self, item, block=True, timeout=None):
    """Put an item into the requeue.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum on how long to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def PutAction():
      self.queue.put(item, block=False)
    self._DoWithTimeout(PutAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def reput(self, item, block=True, timeout=None):
    """Re-put an item back into the requeue.

    Re-putting an item does not increase the number of outstanding
    tasks, so the reput item should be uniquely associated with an
    item that was previously removed from the requeue and for which
    TaskDone has not been called.

    Args:
      item: An item to add to the requeue.
      block: Whether to block if the requeue is full.
      timeout: Maximum on how long to wait until the queue is non-full.

    Raises:
      Queue.Full if the queue is full and the timeout expires.
    """
    def ReputAction():
      self.requeue.put(item, block=False)
    self._DoWithTimeout(ReputAction,
                        Queue.Full,
                        self.get_cond,
                        self.put_cond,
                        self.lock,
                        timeout=timeout,
                        block=block)

  def get(self, block=True, timeout=None):
    """Get an item from the requeue.

    Reput items are returned before items that were put normally.

    Args:
      block: Whether to block if the requeue is empty.
      timeout: Maximum on how long to wait until the requeue is non-empty.

    Returns:
      An item from the requeue.

    Raises:
      Queue.Empty if the queue is empty and the timeout expires.
    """
    def GetAction():
      try:
        result = self.requeue.get(block=False)
        # Reput items are not tracked as outstanding tasks, so settle the
        # inner queue's accounting immediately.
        self.requeue.task_done()
      except Queue.Empty:
        result = self.queue.get(block=False)
      return result
    return self._DoWithTimeout(GetAction,
                               Queue.Empty,
                               self.put_cond,
                               self.get_cond,
                               self.lock,
                               timeout=timeout,
                               block=block)

  def join(self):
    """Blocks until all of the items in the requeue have been processed."""
    self.queue.join()

  def task_done(self):
    """Indicate that a previously enqueued item has been fully processed."""
    self.queue.task_done()

  def empty(self):
    """Returns true if the requeue is empty."""
    return self.queue.empty() and self.requeue.empty()

  def get_nowait(self):
    """Try to get an item from the queue without blocking."""
    return self.get(block=False)

  def qsize(self):
    # NOTE(review): the body of this method is elided ("...") in the source
    # listing and is reproduced as shown; restore it from the original file.
    ...

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
