Best Python code snippet using autotest_python
consumer_test.py
Source:consumer_test.py  
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition
import signal


def partitions_for(topic, num_partitions):
    """Return the set of TopicPartition objects 0..num_partitions-1 for topic."""
    return {TopicPartition(topic=topic, partition=i)
            for i in range(num_partitions)}


class VerifiableConsumerTest(KafkaTest):
    """System tests for consumer-group behavior under consumer/broker bounces."""

    # Single-partition topic used by most tests.
    STOPIC = "simple_topic"
    # Multi-partition topic used for assignment validation.
    TOPIC = "test_topic"
    NUM_PARTITIONS = 3
    PARTITIONS = partitions_for(TOPIC, NUM_PARTITIONS)
    GROUP_ID = "test_group_id"

    def __init__(self, test_context):
        super(VerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
            self.TOPIC: {'partitions': self.NUM_PARTITIONS, 'replication-factor': 1},
            self.STOPIC: {'partitions': 1, 'replication-factor': 2}
        })
        self.num_producers = 1
        self.num_consumers = 2
        # NOTE(review): this value is handed to VerifiableConsumer as
        # ``session_timeout`` and is also used below as ``timeout_sec`` for
        # wait_until. If the consumer expects milliseconds, those waits are
        # far longer than intended -- confirm the units.
        self.session_timeout = 10000

    def min_cluster_size(self):
        """Override this since we're adding services outside of the constructor"""
        return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers

    def _partitions(self, assignment):
        """Flatten an assignment mapping into a single list of partitions.

        ``assignment`` is a mapping whose values are lists of partitions;
        the keys are not used.
        """
        partitions = []
        # .values() works on both Python 2 and 3 (itervalues() is 2-only).
        for parts in assignment.values():
            partitions += parts
        return partitions

    def _valid_assignment(self, assignment):
        """Return True if every partition of TOPIC is assigned exactly once."""
        partitions = self._partitions(assignment)
        # The length check catches duplicates that the set comparison hides.
        return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS

    def _setup_consumer(self, topic, enable_autocommit=False):
        """Create (but do not start) the VerifiableConsumer service for topic."""
        return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
                                  topic, self.GROUP_ID, session_timeout=self.session_timeout,
                                  enable_autocommit=enable_autocommit)

    def _setup_producer(self, topic, max_messages=-1):
        """Create (but do not start) the VerifiableProducer service for topic."""
        return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
                                  max_messages=max_messages, throughput=500)

    def _await_all_members(self, consumer):
        """Block until every consumer node reports that it joined the group."""
        wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers,
                   timeout_sec=self.session_timeout + 5,
                   err_msg="Consumers failed to join in a reasonable amount of time")

    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
        """Restart the consumers one at a time, num_bounces times over.

        After each restart, wait until the whole group has rejoined and has
        consumed more records than before the restart.
        """
        for _ in range(num_bounces):
            for node in consumer.nodes:
                consumer.stop_node(node, clean_shutdown)

                # Exactly one node was stopped, so exactly one should be dead
                # (the original compared against num_consumers - 1, which is
                # only equivalent when num_consumers == 2).
                wait_until(lambda: len(consumer.dead_nodes()) == 1,
                           timeout_sec=self.session_timeout,
                           err_msg="Timed out waiting for the consumers to shutdown")

                total_consumed = consumer.total_consumed()

                consumer.start_node(node)

                wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and
                           consumer.total_consumed() > total_consumed,
                           timeout_sec=self.session_timeout,
                           err_msg="Timed out waiting for the consumers to rejoin and resume consuming")

    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
        """Stop every consumer, then start them all again, num_bounces times."""
        for _ in range(num_bounces):
            for node in consumer.nodes:
                consumer.stop_node(node, clean_shutdown)

            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
                       err_msg="Timed out waiting for the consumers to shutdown")

            total_consumed = consumer.total_consumed()

            for node in consumer.nodes:
                consumer.start_node(node)

            wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and
                       consumer.total_consumed() > total_consumed,
                       timeout_sec=self.session_timeout * 2,
                       err_msg="Timed out waiting for the consumers to rejoin and resume consuming")

    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
        """Restart the brokers one at a time, num_bounces times over.

        After each broker restart, wait until all consumers are joined and
        the group has made progress.
        """
        for _ in range(num_bounces):
            for node in self.kafka.nodes:
                total_consumed = consumer.total_consumed()

                # Honor the caller's clean_shutdown (was hard-coded to True).
                self.kafka.restart_node(node, clean_shutdown=clean_shutdown)

                wait_until(lambda: len(consumer.joined_nodes()) == self.num_consumers and
                           consumer.total_consumed() > total_consumed,
                           timeout_sec=30,
                           err_msg="Timed out waiting for the consumers to resume after the broker restart")

    def bounce_all_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
        """Stop every broker, then start them all again, num_bounces times."""
        # NOTE(review): ``consumer`` and ``clean_shutdown`` are accepted but
        # unused here; whether kafka.stop_node takes a clean_shutdown
        # argument is not visible from this file -- confirm before wiring it.
        for _ in range(num_bounces):
            for node in self.kafka.nodes:
                self.kafka.stop_node(node)

            for node in self.kafka.nodes:
                self.kafka.start_node(node)

    def test_broker_rolling_bounce(self):
        """
        Verify correct consumer behavior when the brokers are consecutively restarted.

        Setup: single Kafka cluster with one producer writing messages to a single topic with one
        partition, and a set of consumers in the same group reading from the same topic.

        - Start a producer which continues producing new messages throughout the test.
        - Start up the consumers and wait until they've joined the group.
        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
          each broker restart.
        - Verify delivery semantics according to the failure type and that the broker bounces
          did not cause unexpected group rebalances.
        """
        partition = TopicPartition(self.STOPIC, 0)

        producer = self._setup_producer(self.STOPIC)
        consumer = self._setup_consumer(self.STOPIC)

        producer.start()
        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
                   err_msg="Producer failed waiting for messages to be written")

        consumer.start()
        self._await_all_members(consumer)

        num_rebalances = consumer.num_rebalances()
        # TODO: make this test work with hard shutdowns, which probably requires
        #       pausing before the node is restarted to ensure that any ephemeral
        #       nodes have time to expire
        self.rolling_bounce_brokers(consumer, clean_shutdown=True)

        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
        assert unexpected_rebalances == 0, \
            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances

        consumer.stop_all()

        assert consumer.current_position(partition) == consumer.total_consumed(), \
            "Total consumed records did not match consumed position"

    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
        """
        Verify correct consumer behavior when the consumers in the group are consecutively restarted.

        Setup: single Kafka cluster with one producer and a set of consumers in one group.

        - Start a producer which continues producing new messages throughout the test.
        - Start up the consumers and wait until they've joined the group.
        - In a loop, restart each consumer, waiting for each one to rejoin the group before
          restarting the rest.
        - Verify delivery semantics according to the failure type.
        """
        partition = TopicPartition(self.STOPIC, 0)

        producer = self._setup_producer(self.STOPIC)
        consumer = self._setup_consumer(self.STOPIC)

        producer.start()
        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
                   err_msg="Producer failed waiting for messages to be written")

        consumer.start()
        self._await_all_members(consumer)

        if bounce_mode == "all":
            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
        else:
            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)

        consumer.stop_all()
        if clean_shutdown:
            # if the total records consumed matches the current position, we haven't seen any duplicates
            # this can only be guaranteed with a clean shutdown
            assert consumer.current_position(partition) == consumer.total_consumed(), \
                "Total consumed records did not match consumed position"
        else:
            # we may have duplicates in a hard failure
            assert consumer.current_position(partition) <= consumer.total_consumed(), \
                "Current position greater than the total number of consumed records"

    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
        """Kill the owner of the partition and verify the survivor takes over."""
        partition = TopicPartition(self.STOPIC, 0)

        consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
        producer = self._setup_producer(self.STOPIC)

        consumer.start()
        self._await_all_members(consumer)

        partition_owner = consumer.owner(partition)
        assert partition_owner is not None

        # startup the producer and ensure that some records have been written
        producer.start()
        wait_until(lambda: producer.num_acked > 1000, timeout_sec=10,
                   err_msg="Producer failed waiting for messages to be written")

        # stop the partition owner and await its shutdown
        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and
                   consumer.owner(partition) is not None,
                   timeout_sec=self.session_timeout + 5,
                   err_msg="Timed out waiting for consumer to close")

        # ensure that the remaining consumer does some work after rebalancing
        current_total_consumed = consumer.total_consumed()
        wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=10,
                   err_msg="Timed out waiting for additional records to be consumed after first consumer failed")

        consumer.stop_all()

        if clean_shutdown:
            # if the total records consumed matches the current position, we haven't seen any duplicates
            # this can only be guaranteed with a clean shutdown
            assert consumer.current_position(partition) == consumer.total_consumed(), \
                "Total consumed records did not match consumed position"
        else:
            # we may have duplicates in a hard failure
            assert consumer.current_position(partition) <= consumer.total_consumed(), \
                "Current position greater than the total number of consumed records"

        # if autocommit is not turned on, we can also verify the last committed offset
        if not enable_autocommit:
            assert consumer.last_commit(partition) == consumer.current_position(partition), \
                "Last committed offset did not match last consumed position"

    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
    def test_broker_failure(self, clean_shutdown, enable_autocommit):
        """Signal one broker to die and verify consumption continues without a rebalance."""
        partition = TopicPartition(self.STOPIC, 0)

        consumer = self._setup_consumer(self.STOPIC, enable_autocommit=enable_autocommit)
        producer = self._setup_producer(self.STOPIC)

        producer.start()
        consumer.start()
        self._await_all_members(consumer)

        num_rebalances = consumer.num_rebalances()

        # shutdown one of the brokers
        # TODO: we need a way to target the coordinator instead of picking arbitrarily
        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)

        # ensure that the consumers do some work after the broker failure
        current_total_consumed = consumer.total_consumed()
        wait_until(lambda: consumer.total_consumed() > current_total_consumed + 1000, timeout_sec=20,
                   err_msg="Timed out waiting for additional records to be consumed after broker failure")

        # verify that there were no rebalances on failover
        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"

        consumer.stop_all()

        # if the total records consumed matches the current position, we haven't seen any duplicates
        assert consumer.current_position(partition) == consumer.total_consumed(), \
            "Total consumed records did not match consumed position"

        # if autocommit is not turned on, we can also verify the last committed offset
        if not enable_autocommit:
            assert consumer.last_commit(partition) == consumer.current_position(partition), \
                "Last committed offset did not match last consumed position"

    def test_simple_consume(self):
        """Produce a fixed number of records and verify they are all consumed and committed."""
        total_records = 1000

        consumer = self._setup_consumer(self.STOPIC)
        producer = self._setup_producer(self.STOPIC, max_messages=total_records)

        partition = TopicPartition(self.STOPIC, 0)

        consumer.start()
        self._await_all_members(consumer)

        producer.start()
        wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,
                   err_msg="Producer failed waiting for messages to be written")

        wait_until(lambda: consumer.last_commit(partition) == total_records, timeout_sec=10,
                   err_msg="Consumer failed to read all expected messages")

        assert consumer.current_position(partition) == total_records

    def test_valid_assignment(self):
        consumer = self._setup_consumer(self.TOPIC)
        consumer.start()
        self._await_all_members(consumer)
        # ... (the remainder of this test is truncated in the source listing)

# scheduler_lib_unittest.py
Source:scheduler_lib_unittest.py  
#!/usr/bin/python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import mock
import unittest

import common
from autotest_lib.database import database_connection
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import readonly_connection
from autotest_lib.server import utils as server_utils
from autotest_lib.scheduler import scheduler_lib
from django.db import utils as django_utils


class ConnectionManagerTests(unittest.TestCase):
    """Connection manager unittests."""

    def setUp(self):
        """Mock the global database-setup hooks and reset the singleton cache."""
        self.connection_manager = None
        # Patch through mock.patch.object so the real functions are restored
        # after each test instead of staying replaced for the rest of the run.
        for module, attr in ((readonly_connection, 'set_globally_disabled'),
                             (setup_django_environment, 'enable_autocommit')):
            patcher = mock.patch.object(module, attr)
            patcher.start()
            self.addCleanup(patcher.stop)
        # Start every test without a cached singleton instance.
        server_utils.Singleton._instances = {}

    def tearDown(self):
        """Clear the call history recorded on the mocked hooks."""
        # Cleanups registered in setUp run after tearDown, so these
        # attributes are still the mocks at this point.
        readonly_connection.set_globally_disabled.reset_mock()
        setup_django_environment.enable_autocommit.reset_mock()

    def testConnectionDisconnect(self):
        """Test connection and disconnecting from the database."""
        # Test that the connection manager only opens a connection once.
        connection_manager = scheduler_lib.ConnectionManager()
        connection_manager.open_connection = mock.MagicMock()
        connection_manager.get_connection()
        connection_manager.open_connection.assert_called_once_with()
        connection_manager.open_connection.reset_mock()
        connection_manager.get_connection()
        self.assertEqual(connection_manager.open_connection.call_count, 0)
        connection_manager.open_connection.reset_mock()

        # Test that del on the connection manager closes the connection
        connection_manager.disconnect = mock.MagicMock()
        connection_manager.__del__()
        connection_manager.disconnect.assert_called_once_with()

    def testConnectionReconnect(self):
        """Test that retries don't destroy the connection."""
        # Patch execute only for the duration of this test; the original
        # assignment left the mock on the class for every later test.
        with mock.patch.object(
                database_connection._DjangoBackend, 'execute',
                side_effect=django_utils.DatabaseError('Database Error')
        ) as execute_mock:
            connection_manager = scheduler_lib.ConnectionManager()
            connection = connection_manager.get_connection()
            self.assertRaises(django_utils.DatabaseError,
                              connection.execute, '', None, True)
            # The test expects the failing call to be attempted twice.
            self.assertEqual(execute_mock.call_count, 2)
            execute_mock.reset_mock()
            self.assertEqual(connection_manager.db_connection,
                             connection_manager.get_connection())

    def testConnectionManagerSingleton(self):
        """Test that the singleton works as expected."""
        # Confirm that instantiating the class applies global db settings.
        connection_manager = scheduler_lib.ConnectionManager()
        readonly_connection.set_globally_disabled.assert_called_once_with(True)
        setup_django_environment.enable_autocommit.assert_called_once_with()

        readonly_connection.set_globally_disabled.reset_mock()
        setup_django_environment.enable_autocommit.reset_mock()

        # Confirm that instantiating another connection manager doesn't change
        # the database settings, and in fact, returns the original manager.
        connection_manager_2 = scheduler_lib.ConnectionManager()
        self.assertEqual(connection_manager, connection_manager_2)
        self.assertEqual(
                readonly_connection.set_globally_disabled.call_count, 0)
        self.assertEqual(
                setup_django_environment.enable_autocommit.call_count, 0)

        # Confirm that we don't open the connection when the class is
        # instantiated.
        self.assertIsNone(connection_manager.db_connection)


if __name__ == '__main__':
    # NOTE(review): the body of this guard is truncated in the source
    # listing; it is left empty here rather than guessed.
    pass

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks, e.g.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
