Best Python code snippet using lisa_python
watch_test.py
Source:watch_test.py  
1#!/usr/bin/env python2# Copyright 2016 The Kubernetes Authors.3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8#     http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import unittest16from mock import Mock, call17from .watch import Watch18class WatchTests(unittest.TestCase):19  def setUp(self):20    # counter for a test that needs test global state21    self.callcount = 022  def test_watch_with_decode(self):23    fake_resp = Mock()24    fake_resp.close = Mock()25    fake_resp.release_conn = Mock()26    fake_resp.read_chunked = Mock(return_value=[27        '{"type": "ADDED", "object": {"metadata": {"name": "test1",'28        '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',29        '{"type": "ADDED", "object": {"metadata": {"name": "test2",'30        '"resourceVersion": "2"}, "spec": {}, "sta', 'tus": {}}}\n'31        '{"type": "ADDED", "object": {"metadata": {"name": "test3",'32        '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n',33        'should_not_happened\n'34    ])35    fake_api = Mock()36    fake_api.get_namespaces = Mock(return_value=fake_resp)37    fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'38    w = Watch()39    count = 140    for e in w.stream(fake_api.get_namespaces):41      self.assertEqual('ADDED', e['type'])42      # make sure decoder worked and we got a model with the right name43      self.assertEqual('test%d' % count, e['object'].metadata.name)44      # make sure decoder worked and updated Watch.resource_version45      self.assertEqual('%d' % count, e['object'].metadata.resource_version)46      self.assertEqual('%d' % count, w.resource_version)47      count += 148      # make sure we can stop the watch and the last event with won't be49      # returned50      if count == 4:51        w.stop()52    fake_api.get_namespaces.assert_called_once_with(53        _preload_content=False, watch=True)54    fake_resp.read_chunked.assert_called_once_with(decode_content=False)55    fake_resp.close.assert_called_once()56    fake_resp.release_conn.assert_called_once()57  def test_watch_for_follow(self):58    fake_resp = Mock()59    fake_resp.close = Mock()60    fake_resp.release_conn = Mock()61    fake_resp.read_chunked = Mock(return_value=['log_line_1\n', 'log_line_2\n'])62    fake_api = Mock()63    fake_api.read_namespaced_pod_log = Mock(return_value=fake_resp)64    fake_api.read_namespaced_pod_log.__doc__ = (':param bool follow:\n:return: '65                                                'str')66    w = Watch()67    count = 168    for e in w.stream(fake_api.read_namespaced_pod_log):69      self.assertEqual('log_line_1', e)70      count += 171      # make sure we can stop the watch and the last event with won't be72      # returned73      if count == 2:74        w.stop()75    fake_api.read_namespaced_pod_log.assert_called_once_with(76        _preload_content=False, follow=True)77    fake_resp.read_chunked.assert_called_once_with(decode_content=False)78    fake_resp.close.assert_called_once()79    fake_resp.release_conn.assert_called_once()80  def test_watch_resource_version_set(self):81    # https://github.com/kubernetes-client/python/issues/70082    # ensure watching from a resource version does reset to resource83    # version 0 after k8s resets the watch connection84    fake_resp = Mock()85    fake_resp.close = Mock()86    fake_resp.release_conn = Mock()87    values = [88        '{"type": "ADDED", "object": {"metadata": {"name": "test1",'89        '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',90        '{"type": "ADDED", "object": {"metadata": {"name": "test2",'91        '"resourceVersion": "2"}, "spec": {}, "sta', 'tus": {}}}\n'92        '{"type": "ADDED", "object": {"metadata": {"name": "test3",'93        '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'94    ]95    # return nothing on the first call and values on the second96    # this emulates a watch from a rv that returns nothing in the first k8s97    # watch reset and values later98    def get_values(*args, **kwargs):99      self.callcount += 1100      if self.callcount == 1:101        return []102      else:103        return values104    fake_resp.read_chunked = Mock(side_effect=get_values)105    fake_api = Mock()106    fake_api.get_namespaces = Mock(return_value=fake_resp)107    fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'108    w = Watch()109    # ensure we keep our requested resource version or the version latest110    # returned version when the existing versions are older than the111    # requested version112    # needed for the list existing objects, then watch from there use case113    calls = []114    iterations = 2115    # first two calls must use the passed rv, the first call is a116    # "reset" and does not actually return anything117    # the second call must use the same rv but will return values118    # (with a wrong rv but a real cluster would behave correctly)119    # calls following that will use the rv from those returned values120    calls.append(call(_preload_content=False, watch=True, resource_version='5'))121    calls.append(call(_preload_content=False, watch=True, resource_version='5'))122    for i in range(iterations):123      # ideally we want 5 here but as rv must be treated as an124      # opaque value we cannot interpret it and order it so rely125      # on k8s returning the events completely and in order126      calls.append(127          call(_preload_content=False, watch=True, resource_version='3'))128    for c, e in enumerate(129        w.stream(fake_api.get_namespaces, resource_version='5')):130      if c == len(values) * iterations:131        w.stop()132    # check calls are in the list, gives good error output133    fake_api.get_namespaces.assert_has_calls(calls)134    # more strict test with worse error message135    self.assertEqual(fake_api.get_namespaces.mock_calls, calls)136  def test_watch_stream_twice(self):137    w = Watch(float)138    for step in ['first', 'second']:139      fake_resp = Mock()140      fake_resp.close = Mock()141      fake_resp.release_conn = Mock()142      fake_resp.read_chunked = Mock(143          return_value=['{"type": "ADDED", "object": 1}\n'] * 4)144      fake_api = Mock()145      fake_api.get_namespaces = Mock(return_value=fake_resp)146      fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'147      count = 1148      for e in w.stream(fake_api.get_namespaces):149        count += 1150        if count == 3:151          w.stop()152      self.assertEqual(count, 3)153      fake_api.get_namespaces.assert_called_once_with(154          _preload_content=False, watch=True)155      fake_resp.read_chunked.assert_called_once_with(decode_content=False)156      fake_resp.close.assert_called_once()157      fake_resp.release_conn.assert_called_once()158  def test_watch_stream_loop(self):159    w = Watch(float)160    fake_resp = Mock()161    fake_resp.close = Mock()162    fake_resp.release_conn = Mock()163    fake_resp.read_chunked = Mock(164        return_value=['{"type": "ADDED", "object": 1}\n'])165    fake_api = Mock()166    fake_api.get_namespaces = Mock(return_value=fake_resp)167    fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'168    count = 0169    # when timeout_seconds is set, auto-exist when timeout reaches170    for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):171      count = count + 1172    self.assertEqual(count, 1)173    # when no timeout_seconds, only exist when w.stop() is called174    for e in w.stream(fake_api.get_namespaces):175      count = count + 1176      if count == 2:177        w.stop()178    self.assertEqual(count, 2)179    self.assertEqual(fake_api.get_namespaces.call_count, 2)180    self.assertEqual(fake_resp.read_chunked.call_count, 2)181    self.assertEqual(fake_resp.close.call_count, 2)182    self.assertEqual(fake_resp.release_conn.call_count, 2)183  def test_unmarshal_with_float_object(self):184    w = Watch()185    event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float')186    self.assertEqual('ADDED', event['type'])187    self.assertEqual(1.0, event['object'])188    self.assertTrue(isinstance(event['object'], float))189    self.assertEqual(1, event['raw_object'])190  def test_unmarshal_with_no_return_type(self):191    w = Watch()192    event = w.unmarshal_event('{"type": "ADDED", "object": ["test1"]}', None)193    self.assertEqual('ADDED', event['type'])194    self.assertEqual(['test1'], event['object'])195    self.assertEqual(['test1'], event['raw_object'])196  def test_unmarshal_with_custom_object(self):197    w = Watch()198    event = w.unmarshal_event(199        '{"type": "ADDED", "object": {"apiVersion":'200        '"test.com/v1beta1","kind":"foo","metadata":'201        '{"name": "bar", "resourceVersion": "1"}}}', 'object')202    self.assertEqual('ADDED', event['type'])203    # make sure decoder deserialized json into dictionary and updated204    # Watch.resource_version205    self.assertTrue(isinstance(event['object'], dict))206    self.assertEqual('1', event['object']['metadata']['resourceVersion'])207    self.assertEqual('1', w.resource_version)208  def test_watch_with_exception(self):209    fake_resp = Mock()210    fake_resp.close = Mock()211    fake_resp.release_conn = Mock()212    fake_resp.read_chunked = Mock(side_effect=KeyError('expected'))213    fake_api = Mock()214    fake_api.get_thing = Mock(return_value=fake_resp)215    w = Watch()216    try:217      for _ in w.stream(fake_api.get_thing):218        self.fail(self, 'Should fail on exception.')219    except KeyError:220      pass221      # expected222    fake_api.get_thing.assert_called_once_with(223        _preload_content=False, watch=True)224    fake_resp.read_chunked.assert_called_once_with(decode_content=False)225    fake_resp.close.assert_called_once()226    fake_resp.release_conn.assert_called_once()227if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
