Best Python code snippet using yandex-tank
test_worker.py
Source:test_worker.py  
1"""Module testing the kale.worker module."""2from __future__ import absolute_import3import mock4import signal5import unittest6from kale import exceptions7from kale import test_utils8from kale import worker9from six.moves import range10class WorkerTestCase(unittest.TestCase):11    """Test worker logic."""12    def _create_patch(self, name):13        """Helper method for creating scoped mocks."""14        patcher = mock.patch(name)15        patch = patcher.start()16        self.addCleanup(patcher.stop)17        return patch18    def testRun(self):19        """Test an iteration that has tasks."""20        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')21        mock_consumer.return_value = None22        startup_handler = self._create_patch('kale.settings.ON_WORKER_STARTUP')23        worker_inst = worker.Worker()24        self.assertTrue(worker_inst is not None)25        startup_handler.assert_called_once_with()26    def testRunIterationWithTasks(self):27        """Test an iteration that has tasks."""28        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')29        mock_consumer.return_value = None30        fetch_batch = self._create_patch('kale.consumer.Consumer.fetch_batch')31        message = test_utils.new_mock_message()32        fetch_batch.return_value = [message]33        run_batch = self._create_patch('kale.worker.Worker._run_batch')34        run_batch.return_value = (1, 1)35        worker_inst = worker.Worker()36        mock_consumer.assert_called_once_with()37        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()38        self.assertTrue(worker_inst._run_single_iteration())39        self.assertEqual(fetch_batch.called, 1)40        self.assertTrue(worker_inst._dirty)41        run_batch.assert_called_once_with([message])42    def testRunIterationWithoutTasks(self):43        """Test an iteration that does not have tasks."""44        mock_consumer = 
self._create_patch('kale.consumer.Consumer.__init__')45        mock_consumer.return_value = None46        fetch_batch = self._create_patch('kale.consumer.Consumer.fetch_batch')47        fetch_batch.return_value = []48        run_batch = self._create_patch('kale.worker.Worker._run_batch')49        worker_inst = worker.Worker()50        mock_consumer.assert_called_once_with()51        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()52        self.assertFalse(worker_inst._run_single_iteration())53        self.assertFalse(worker_inst._dirty)54        self.assertEqual(fetch_batch.called, 1)55        self.assertFalse(run_batch.called)56    def testCleanupWorkerStop(self):57        """Test cleanup worker."""58        mock_consumer = self._create_patch('kale.consumer.Consumer')59        release_batch = self._create_patch('kale.worker.Worker._release_batch')60        shutdown_handler = self._create_patch(61            'kale.settings.ON_WORKER_SHUTDOWN')62        sys_exit = self._create_patch('sys.exit')63        worker_inst = worker.Worker()64        mock_consumer.assert_called_once_with()65        release_batch.return_value = (0, 0)66        worker_inst._cleanup_worker(signal.SIGABRT, None)67        release_batch.assert_called_once_with()68        sys_exit.assert_called_once_with(0)69        shutdown_handler.assert_called_once_with()70    def testCleanupWorkerSuspend(self):71        """Test cleanup worker."""72        mock_consumer = self._create_patch('kale.consumer.Consumer')73        release_batch = self._create_patch('kale.worker.Worker._release_batch')74        sys_exit = self._create_patch('sys.exit')75        worker_inst = worker.Worker()76        mock_consumer.assert_called_once_with()77        release_batch.return_value = (0, 0)78        worker_inst._cleanup_worker(signal.SIGTSTP, None)79        release_batch.assert_called_once_with()80        assert not sys_exit.called, 'System should not have exited.'81    def 
testReleaseBatchWithTimeToSpare(self):82        """Test releasing a batch where the spare time is over the threshold.83        """84        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')85        mock_consumer.return_value = None86        mock_release = self._create_patch(87            'kale.consumer.Consumer.release_messages')88        mock_delete = self._create_patch(89            'kale.consumer.Consumer.delete_messages')90        mock_publish_dlq = self._create_patch(91            'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')92        get_time = self._create_patch('time.time')93        worker_inst = worker.Worker()94        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()95        mock_consumer.assert_called_once_with()96        worker_inst._incomplete_messages = [97            test_utils.new_mock_message() for i in range(2)]98        worker_inst._successful_messages = [99            test_utils.new_mock_message() for i in range(3)]100        worker_inst._failed_messages = [101            test_utils.new_mock_message() for i in range(4)]102        worker_inst._batch_stop_time = 20103        # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 10 > 1)104        get_time.return_value = 10105        releasable_messages = worker_inst._incomplete_messages106        deletable_messages = (107            worker_inst._successful_messages + worker_inst._failed_messages)108        num_deleted, num_released = worker_inst._release_batch()109        mock_release.assert_called_once_with(110            releasable_messages, worker_inst._batch_queue.name)111        mock_delete.assert_called_once_with(112            deletable_messages, worker_inst._batch_queue.name)113        assert not mock_publish_dlq.called, ('No messages should have been '114                                             'moved to dlq.')115        self.assertEqual(num_deleted, len(deletable_messages))116        self.assertEqual(num_released, 
len(releasable_messages))117        self.assertEqual(0, len(worker_inst._incomplete_messages))118        self.assertEqual(0, len(worker_inst._successful_messages))119        self.assertEqual(0, len(worker_inst._failed_messages))120        self.assertEqual(0, len(worker_inst._permanent_failures))121    def testReleaseBatchWithPermanent(self):122        """Test releasing a batch where the spare time is over the threshold.123        """124        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')125        mock_consumer.return_value = None126        mock_release = self._create_patch(127            'kale.consumer.Consumer.release_messages')128        mock_delete = self._create_patch(129            'kale.consumer.Consumer.delete_messages')130        mock_publish_dlq = self._create_patch(131            'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')132        get_time = self._create_patch('time.time')133        worker_inst = worker.Worker()134        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()135        mock_consumer.assert_called_once_with()136        worker_inst._incomplete_messages = [137            test_utils.new_mock_message() for i in range(2)]138        worker_inst._successful_messages = [139            test_utils.new_mock_message() for i in range(3)]140        worker_inst._failed_messages = [141            test_utils.new_mock_message() for i in range(4)]142        # Permanent failures should be a subset of failures.143        worker_inst._permanent_failures = worker_inst._failed_messages[:2]144        worker_inst._batch_stop_time = 20145        # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 10 > 1)146        get_time.return_value = 10147        releasable_messages = worker_inst._incomplete_messages148        permament_failures = worker_inst._permanent_failures149        deletable_messages = (150            worker_inst._successful_messages + worker_inst._failed_messages)151        
num_deleted, num_released = worker_inst._release_batch()152        mock_release.assert_called_once_with(153            releasable_messages, worker_inst._batch_queue.name)154        mock_delete.assert_called_once_with(155            deletable_messages, worker_inst._batch_queue.name)156        mock_publish_dlq.assert_called_once_with(157            worker_inst._batch_queue.dlq_name, permament_failures)158        self.assertEqual(num_deleted, len(deletable_messages))159        self.assertEqual(num_released, len(releasable_messages))160        self.assertEqual(0, len(worker_inst._incomplete_messages))161        self.assertEqual(0, len(worker_inst._successful_messages))162        self.assertEqual(0, len(worker_inst._failed_messages))163        self.assertEqual(0, len(worker_inst._permanent_failures))164    def testReleaseBatchWithNoSuccessfulAndNoTimeLeft(self):165        """Test releasing a batch where the spare time is over the threshold.166        """167        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')168        mock_consumer.return_value = None169        mock_release = self._create_patch(170            'kale.consumer.Consumer.release_messages')171        mock_delete = self._create_patch(172            'kale.consumer.Consumer.delete_messages')173        mock_publish_dlq = self._create_patch(174            'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')175        get_time = self._create_patch('time.time')176        worker_inst = worker.Worker()177        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()178        mock_consumer.assert_called_once_with()179        worker_inst._successful_messages = []180        worker_inst._incomplete_messages = [181            test_utils.new_mock_message() for i in range(2)]182        worker_inst._failed_messages = [183            test_utils.new_mock_message() for i in range(4)]184        worker_inst._batch_stop_time = 20185        # _batch_stop_time - get_time > 
RESET_TIMEOUT_THRESHOLD (20 - 19.5 < 1)186        get_time.return_value = 19.5187        deletable_messages = worker_inst._failed_messages188        num_deleted, num_released = worker_inst._release_batch()189        assert not mock_release.called, ('No messages should have '190                                         'been released.')191        # Failed messages should have been deleted.192        mock_delete.assert_called_once_with(193            deletable_messages, worker_inst._batch_queue.name)194        assert not mock_publish_dlq.called, ('No messages should have'195                                             'been moved to dlq.')196        self.assertEqual(num_deleted, len(deletable_messages))197        self.assertEqual(num_released, 0)198        self.assertEqual(0, len(worker_inst._incomplete_messages))199        self.assertEqual(0, len(worker_inst._successful_messages))200        self.assertEqual(0, len(worker_inst._failed_messages))201        self.assertEqual(0, len(worker_inst._permanent_failures))202    def testReleaseBatchWithNoDeletableAndNoTimeLeft(self):203        """Test releasing a batch where the spare time is over the threshold.204        """205        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')206        mock_consumer.return_value = None207        mock_release = self._create_patch(208            'kale.consumer.Consumer.release_messages')209        mock_delete = self._create_patch(210            'kale.consumer.Consumer.delete_messages')211        mock_publish_dlq = self._create_patch(212            'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')213        get_time = self._create_patch('time.time')214        worker_inst = worker.Worker()215        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()216        mock_consumer.assert_called_once_with()217        worker_inst._successful_messages = []218        worker_inst._failed_messages = []219        worker_inst._incomplete_messages = [220   
         test_utils.new_mock_message() for i in range(2)]221        worker_inst._batch_stop_time = 20222        # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 19.5 < 1)223        get_time.return_value = 19.5224        num_deleted, num_released = worker_inst._release_batch()225        assert not mock_release.called, ('No messages should have '226                                         'been released.')227        assert not mock_delete.called, 'No messages should have been deleted.'228        assert not mock_publish_dlq.called, ('No messages should have'229                                             ' been moved to dlq.')230        self.assertEqual(num_deleted, 0)231        self.assertEqual(num_released, 0)232        self.assertEqual(0, len(worker_inst._incomplete_messages))233        self.assertEqual(0, len(worker_inst._successful_messages))234        self.assertEqual(0, len(worker_inst._failed_messages))235        self.assertEqual(0, len(worker_inst._permanent_failures))236    def testReleaseBatchWithNoDeletableAndWithTimeLeft(self):237        """Test releasing a batch where the spare time is over the threshold.238        """239        mock_consumer = self._create_patch('kale.consumer.Consumer.__init__')240        mock_consumer.return_value = None241        mock_release = self._create_patch(242            'kale.consumer.Consumer.release_messages')243        mock_delete = self._create_patch(244            'kale.consumer.Consumer.delete_messages')245        mock_publish_dlq = self._create_patch(246            'kale.publisher.Publisher.publish_messages_to_dead_letter_queue')247        get_time = self._create_patch('time.time')248        worker_inst = worker.Worker()249        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()250        mock_consumer.assert_called_once_with()251        worker_inst._successful_messages = []252        worker_inst._failed_messages = []253        worker_inst._incomplete_messages = [254            
test_utils.new_mock_message() for i in range(2)]255        worker_inst._batch_stop_time = 20256        # _batch_stop_time - get_time > RESET_TIMEOUT_THRESHOLD (20 - 19.5 < 1)257        get_time.return_value = 10258        releasable_messages = worker_inst._incomplete_messages259        num_deleted, num_released = worker_inst._release_batch()260        mock_release.assert_called_once_with(261            releasable_messages, worker_inst._batch_queue.name)262        assert not mock_delete.called, 'No messages should have been deleted.'263        assert not mock_publish_dlq.called, ('No messages should have '264                                             'been moved to dlq.')265        self.assertEqual(num_deleted, 0)266        self.assertEqual(num_released, len(releasable_messages))267        self.assertEqual(0, len(worker_inst._incomplete_messages))268        self.assertEqual(0, len(worker_inst._successful_messages))269        self.assertEqual(0, len(worker_inst._failed_messages))270        self.assertEqual(0, len(worker_inst._permanent_failures))271    def testRunBatchSuccessful(self):272        """Test a successful batch."""273        mock_consumer = self._create_patch('kale.consumer.Consumer')274        get_time = self._create_patch('time.time')275        worker_inst = worker.Worker()276        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()277        mock_consumer.assert_called_once_with()278        worker_inst._batch_stop_time = 100279        # _batch_stop_time - (get_time + task.time_limit) > 0280        # (100 - (10 + 60)) > 0)281        get_time.return_value = 10282        message_batch = [test_utils.new_mock_message()]283        num_messages = len(message_batch)284        worker_inst._run_batch(message_batch)285        self.assertEqual(0, len(worker_inst._incomplete_messages))286        self.assertEqual(num_messages, len(worker_inst._successful_messages))287        self.assertEqual(0, len(worker_inst._failed_messages))288        
self.assertEqual(0, len(worker_inst._permanent_failures))289    def testRunBatchNoTimeRemaining(self):290        """Test a batch where there is not enough time remaining."""291        mock_consumer = self._create_patch('kale.consumer.Consumer')292        get_time = self._create_patch('time.time')293        worker_inst = worker.Worker()294        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()295        mock_consumer.assert_called_once_with()296        worker_inst._batch_stop_time = 50297        # _batch_stop_time - (get_time + task.time_limit) > 0298        # (100 - (10 + 60)) < 0)299        get_time.return_value = 10300        message_batch = [test_utils.new_mock_message()]301        num_messages = len(message_batch)302        worker_inst._run_batch(message_batch)303        self.assertEqual(num_messages, len(worker_inst._incomplete_messages))304        self.assertEqual(0, len(worker_inst._successful_messages))305        self.assertEqual(0, len(worker_inst._failed_messages))306        self.assertEqual(0, len(worker_inst._permanent_failures))307    def testRunBatchTaskTimeout(self):308        """Test batch with a task timeout."""309        mock_consumer = self._create_patch('kale.consumer.Consumer')310        get_time = self._create_patch('time.time')311        mock_failure = self._create_patch(312            'kale.test_utils.TimeoutTask.handle_failure')313        mock_failure.return_value = True314        worker_inst = worker.Worker()315        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()316        mock_consumer.assert_called_once_with()317        worker_inst._batch_stop_time = 100318        # _batch_stop_time - (get_time + task.time_limit) > 0319        # (100 - (10 + 60)) > 0)320        get_time.return_value = 10321        message = test_utils.new_mock_message(322            task_class=test_utils.TimeoutTask)323        message_batch = [message]324        num_messages = len(message_batch)325        
worker_inst._run_batch(message_batch)326        fail_msg, fail_exc = mock_failure.call_args[0]327        self.assertEqual(fail_msg, message)328        self.assertTrue(type(fail_exc) == exceptions.TimeoutException)329        self.assertEqual(0, len(worker_inst._incomplete_messages))330        self.assertEqual(0, len(worker_inst._successful_messages))331        self.assertEqual(0, len(worker_inst._permanent_failures))332        self.assertEqual(num_messages, len(worker_inst._failed_messages))333    def testRunBatchTaskException(self):334        """Test batch with a task exception."""335        mock_consumer = self._create_patch('kale.consumer.Consumer')336        get_time = self._create_patch('time.time')337        mock_failure = self._create_patch(338            'kale.test_utils.FailTask.handle_failure')339        mock_failure.return_value = True340        worker_inst = worker.Worker()341        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()342        mock_consumer.assert_called_once_with()343        worker_inst._batch_stop_time = 100344        # _batch_stop_time - (get_time + task.time_limit) > 0345        # (100 - (10 + 60)) > 0)346        get_time.return_value = 10347        message = test_utils.new_mock_message(task_class=test_utils.FailTask)348        message_batch = [message]349        num_messages = len(message_batch)350        worker_inst._run_batch(message_batch)351        fail_msg, fail_exc = mock_failure.call_args[0]352        self.assertEqual(fail_msg, message)353        self.assertTrue(type(fail_exc) == exceptions.TaskException)354        self.assertEqual(0, len(worker_inst._incomplete_messages))355        self.assertEqual(0, len(worker_inst._successful_messages))356        self.assertEqual(0, len(worker_inst._permanent_failures))357        self.assertEqual(num_messages, len(worker_inst._failed_messages))358    def testRunBatchTaskExceptionPermanentFailure(self):359        """Test batch with a task exception."""360        
mock_consumer = self._create_patch('kale.consumer.Consumer')361        get_time = self._create_patch('time.time')362        mock_failure = self._create_patch(363            'kale.test_utils.FailTask.handle_failure')364        mock_failure.return_value = False365        worker_inst = worker.Worker()366        worker_inst._batch_queue = worker_inst._queue_selector.get_queue()367        mock_consumer.assert_called_once_with()368        worker_inst._batch_stop_time = 100369        # _batch_stop_time - (get_time + task.time_limit) > 0370        # (100 - (10 + 60)) > 0)371        get_time.return_value = 10372        message = test_utils.new_mock_message(task_class=test_utils.FailTask)373        message_batch = [message]374        num_messages = len(message_batch)375        worker_inst._run_batch(message_batch)376        fail_msg, fail_exc = mock_failure.call_args[0]377        self.assertEqual(fail_msg, message)378        self.assertTrue(type(fail_exc) == exceptions.TaskException)379        self.assertEqual(0, len(worker_inst._incomplete_messages))380        self.assertEqual(0, len(worker_inst._successful_messages))381        self.assertEqual(1, len(worker_inst._permanent_failures))382        self.assertEqual(num_messages, len(worker_inst._failed_messages))383    def testCheckProcessExceedingMemory(self):384        """Test process resources method."""385        mock_resource = self._create_patch('resource.getrusage')386        sys_exit = self._create_patch('sys.exit')387        self._create_patch('kale.consumer.Consumer')388        worker_inst = worker.Worker()389        mock_resource.return_value = mock.MagicMock(ru_maxrss=1000000000)390        worker_inst._check_process_resources()391        sys_exit.assert_called_once_with(1)392    def testCheckProcessDirty(self):393        """Test process resources method."""394        mock_resource = self._create_patch('resource.getrusage')395        mock_resource.return_value = mock.MagicMock(ru_maxrss=10)396        sys_exit = 
self._create_patch('sys.exit')397        self._create_patch('kale.consumer.Consumer')398        worker_inst = worker.Worker()399        worker_inst._dirty = True400        self.assertTrue(worker_inst._check_process_resources())401        self.assertFalse(sys_exit.called)402    def testCheckProcessNotDirty(self):403        """Test process resources method."""404        mock_logger = self._create_patch('kale.worker.logger.info')405        mock_resource = self._create_patch('resource.getrusage')406        mock_resource.return_value = mock.MagicMock(ru_maxrss=10)407        sys_exit = self._create_patch('sys.exit')408        self._create_patch('kale.consumer.Consumer')409        worker_inst = worker.Worker()410        worker_inst._dirty = False411        self.assertTrue(worker_inst._check_process_resources())412        self.assertFalse(mock_logger.called)413        self.assertFalse(sys_exit.called)414    def testRemoveMessageOrExitSuccess(self):415        """Test remove_message_or_exit method."""416        sys_exit = self._create_patch('sys.exit')417        worker_inst = worker.Worker()418        worker_inst._incomplete_messages = [1, 2]419        worker_inst.remove_message_or_exit(1)420        self.assertEqual(worker_inst._incomplete_messages, [2])421        sys_exit.assert_not_called()422    def testRemoveMessageOrExitFailure(self):423        """Test remove_message_or_exit method."""424        sys_exit = self._create_patch('sys.exit')425        worker_inst = worker.Worker()426        worker_inst._incomplete_messages = [1, 2]427        worker_inst.remove_message_or_exit(3)428        self.assertEqual(worker_inst._incomplete_messages, [1, 2])...store_configuration_test.py
Source:store_configuration_test.py  
"""Tests for saving and retrieving forwarder configuration via Kafka."""
from unittest import mock

import pytest
from confluent_kafka import Consumer
from streaming_data_types.fbschemas.forwarder_config_update_rf5k.UpdateType import (
    UpdateType,
)
from streaming_data_types.forwarder_config_update_rf5k import (
    Protocol,
    StreamInfo,
    serialise_rf5k,
)

from forwarder.configuration_store import ConfigurationStore
from forwarder.parse_config_update import (
    Channel,
    EpicsProtocol,
    config_change_to_command_type,
    parse_config_update,
)
from tests.kafka.fake_producer import FakeProducer

DUMMY_UPDATE_HANDLER = None

# Channels handed to the store; the keys double as the expected results.
CHANNELS_TO_STORE = {
    Channel("channel1", EpicsProtocol.PVA, "topic1", "f142"): DUMMY_UPDATE_HANDLER,
    Channel("channel2", EpicsProtocol.CA, "topic2", "tdct"): DUMMY_UPDATE_HANDLER,
}


def _as_stream_info(channel):
    """Convert a ``Channel`` into the equivalent ``StreamInfo``."""
    if channel.protocol == EpicsProtocol.PVA:
        protocol = Protocol.Protocol.PVA
    else:
        protocol = Protocol.Protocol.CA
    return StreamInfo(channel.name, channel.schema, channel.output_topic, protocol)


# The same channels expressed as the stream entries of a stored message.
STREAMS_TO_RETRIEVE = [_as_stream_info(channel) for channel in CHANNELS_TO_STORE]


class FakeKafkaMessage:
    """Minimal stand-in for a Kafka message: only ``value()`` is provided."""

    def __init__(self, message):
        self._message = message

    def value(self):
        """Return the payload this fake message was created with."""
        return self._message


def _consumer_with_high_watermark(high_offset):
    """Return an autospec'd ``Consumer`` reporting offsets (0, high_offset)."""
    consumer = mock.create_autospec(Consumer)
    consumer.get_watermark_offsets.return_value = (0, high_offset)
    return consumer


def _store_reading_from(consumer):
    """Return a ``ConfigurationStore`` that only has a consumer attached."""
    return ConfigurationStore(producer=None, consumer=consumer, topic="store_topic")


def assert_stored_channel_correct(outputted_channel):
    """Assert that ``outputted_channel`` is one of the stored channels."""
    # Will only be found if key exists and as the key is the channel
    # it will only match if the values are exactly the same.
    assert outputted_channel in CHANNELS_TO_STORE


def test_when_multiple_pvs_dumped_config_contains_all_pv_details():
    """Saving several channels publishes a message containing them all."""
    fake_producer = FakeProducer()
    config_store = ConfigurationStore(
        fake_producer, consumer=None, topic="store_topic"
    )
    config_store.save_configuration(CHANNELS_TO_STORE)

    published = parse_config_update(fake_producer.published_payload)  # type: ignore
    published_channels = published.channels
    assert_stored_channel_correct(published_channels[0])  # type: ignore
    assert_stored_channel_correct(published_channels[1])  # type: ignore


def test_when_no_pvs_stored_message_type_is_remove_all():
    """Saving an empty configuration publishes a REMOVEALL message."""
    fake_producer = FakeProducer()
    config_store = ConfigurationStore(
        fake_producer, consumer=None, topic="store_topic"
    )
    config_store.save_configuration({})

    published = parse_config_update(fake_producer.published_payload)  # type: ignore
    assert published.channels is None
    expected_command = config_change_to_command_type[UpdateType.REMOVEALL]
    assert published.command_type == expected_command


def test_retrieving_stored_info_with_no_pvs_gets_message_without_streams():
    """A stored REMOVEALL message is retrieved with no channels."""
    consumer = _consumer_with_high_watermark(100)
    payload = serialise_rf5k(UpdateType.REMOVEALL, [])
    consumer.consume.return_value = [FakeKafkaMessage(payload)]

    retrieved = parse_config_update(_store_reading_from(consumer).retrieve_configuration())
    assert retrieved.channels is None


def test_retrieving_stored_info_with_multiple_pvs_gets_streams():
    """A stored ADD message is retrieved with its channels intact."""
    consumer = _consumer_with_high_watermark(100)
    payload = serialise_rf5k(UpdateType.ADD, STREAMS_TO_RETRIEVE)
    consumer.consume.return_value = [FakeKafkaMessage(payload)]

    retrieved = parse_config_update(_store_reading_from(consumer).retrieve_configuration())
    retrieved_channels = retrieved.channels
    assert_stored_channel_correct(retrieved_channels[0])  # type: ignore
    assert_stored_channel_correct(retrieved_channels[1])  # type: ignore


def test_retrieve_config_find_valid_message_amongst_junk():
    """The valid message is found even when surrounded by junk."""
    payload = serialise_rf5k(UpdateType.ADD, STREAMS_TO_RETRIEVE)
    # One inner list per successive ``consume`` call.
    messages_in_storage_topic = [
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 1 ::")],
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 2 ::")],
        [FakeKafkaMessage(payload)],
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 3 ::")],
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 4 ::")],
    ]  # type: ignore
    consumer = _consumer_with_high_watermark(len(messages_in_storage_topic))
    consumer.consume.side_effect = messages_in_storage_topic

    retrieved = parse_config_update(_store_reading_from(consumer).retrieve_configuration())
    retrieved_channels = retrieved.channels
    assert_stored_channel_correct(retrieved_channels[0])  # type: ignore
    assert_stored_channel_correct(retrieved_channels[1])  # type: ignore


def test_retrieve_config_with_only_junk_as_message_in_storage_topic():
    """Retrieval raises ``RuntimeError`` when every message is junk."""
    messages_in_storage_topic = [
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 1 ::")],
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 2 ::")],
        [FakeKafkaMessage(":: SOME JUNK MESSAGE 3 ::")],
    ]  # type: ignore
    consumer = _consumer_with_high_watermark(len(messages_in_storage_topic))
    consumer.consume.side_effect = messages_in_storage_topic
    config_store = _store_reading_from(consumer)

    with pytest.raises(RuntimeError):
        config_store.retrieve_configuration()


def test_retrieve_config_with_empty_storage_topic():
    """Set up an empty storage topic and expect a ``RuntimeError``."""
    messages_in_storage_topic = []  # type: ignore
    consumer = _consumer_with_high_watermark(len(messages_in_storage_topic))
    consumer.consume.side_effect = messages_in_storage_topic
    config_store = _store_reading_from(consumer)

    with pytest.raises(RuntimeError):
        ...  # the listing this snippet was taken from is truncated here

TestParser.py
Source:TestParser.py  
1import nose2from pymock import *3import sys4sys.path.insert(0,'..')5from profileparser import ProfileParser6class TestParser(PyMockTestCase):7	def test_should_construct_with_consumer(self):8		mock_consumer = self.mock()9		parser = ProfileParser(mock_consumer)10	def test_should_parse_thread_msg(self):11		mock_consumer = self.mock()12		parser = ProfileParser(mock_consumer)13		self.expectAndReturn(mock_consumer.on_thread(332, "My Thread Name"), None)14		self.replay()15		parser.parse("T 332 My Thread Name")16		self.verify()17	def test_should_parse_thread_msg_2(self):18		mock_consumer = self.mock()19		parser = ProfileParser(mock_consumer)20		self.expectAndReturn(mock_consumer.on_thread(102, "Another Thread"), None)21		self.replay()22		parser.parse("T 102 Another Thread")23		self.verify()24	def test_should_parse_function_msg(self):25		mock_consumer = self.mock()26		parser = ProfileParser(mock_consumer)27		self.expectAndReturn(mock_consumer.on_function(113, 223, "My function name"), None)28		self.replay()29		parser.parse("F 113 223 My function name")30		self.verify()31	def test_should_parse_function_msg_2(self):32		mock_consumer = self.mock()33		parser = ProfileParser(mock_consumer)34		self.expectAndReturn(mock_consumer.on_function(43, 44, "Another function"), None)35		self.replay()36		parser.parse("F 43 44 Another function")37		self.verify()38	def test_should_parse_sample_start_msg(self):39		mock_consumer = self.mock()40		parser = ProfileParser(mock_consumer)41		self.expectAndReturn(mock_consumer.on_sample_start(0, 113, 333), None)42		self.replay()43		parser.parse("S 0 113 333")44		self.verify()45	def test_should_parse_sample_start_msg_2(self):46		mock_consumer = self.mock()47		parser = ProfileParser(mock_consumer)48		self.expectAndReturn(mock_consumer.on_sample_start(2, 1, 444), None)49		self.replay()50		parser.parse("S 2 1 444")51		self.verify()52	def test_should_parse_sample_finish_msg(self):53		mock_consumer = self.mock()54		parser = ProfileParser(mock_consumer)55		
self.expectAndReturn(mock_consumer.on_sample_finish(4, 113, 333), None)56		self.replay()57		parser.parse("E 4 113 333")58		self.verify()59	def test_should_parse_sample_finish_msg_2(self):60		mock_consumer = self.mock()61		parser = ProfileParser(mock_consumer)62		self.expectAndReturn(mock_consumer.on_sample_finish(7, 1, 444), None)63		self.replay()64		parser.parse("E 7 1 444")65		self.verify()66	def test_should_ignore_comments(self):67		mock_consumer = self.mock()68		parser = ProfileParser(mock_consumer)69		self.replay()70		parser.parse("#a_comment_without_spaces")71		parser.parse("# another comment with spaces")72		self.verify()73	74	def test_should_parse_event_msg(self):75		mock_consumer = self.mock()76		parser = ProfileParser(mock_consumer)77		self.expectAndReturn(mock_consumer.on_event(113, 223, "My event name"), None)78		self.replay()79		parser.parse("V 113 223 My event name")80		self.verify()81	def test_should_parse_event_msg_2(self):82		mock_consumer = self.mock()83		parser = ProfileParser(mock_consumer)84		self.expectAndReturn(mock_consumer.on_event(43, 44, "Another event"), None)85		self.replay()86		parser.parse("V 43 44 Another event")87		self.verify()88	89	def test_should_parse_emit_event_msg(self):90		mock_consumer = self.mock()91		parser = ProfileParser(mock_consumer)92		self.expectAndReturn(mock_consumer.on_event_emit(0, 113, 333), None)93		self.replay()94		parser.parse("Y 0 113 333")95		self.verify()96	def test_should_parse_emit_event_msg_2(self):97		mock_consumer = self.mock()98		parser = ProfileParser(mock_consumer)99		self.expectAndReturn(mock_consumer.on_event_emit(2, 1, 444), None)100		self.replay()101		parser.parse("Y 2 1 444")102		self.verify()103	def test_should_parse_counter_registration(self):104		mock_consumer = self.mock()105		parser = ProfileParser(mock_consumer)106		self.expectAndReturn(mock_consumer.on_counter(132, "My counter"), None)107		self.replay()108		parser.parse("C 132 My counter")109		self.verify()110	111	def 
test_should_parse_counter_value(self):112		mock_consumer = self.mock()113		parser = ProfileParser(mock_consumer)114		self.expectAndReturn(mock_consumer.on_counter_value(132, 22, 44), None)115		self.replay()116		parser.parse("D 132 22 44")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
