How to use the filter_log_events method in localstack

Best Python code snippet using localstack_python

test_logs.py

Source:test_logs.py Github

copy

Full Screen

1import copy2import mock3import sys4import unittest5import botocore6from slam import cli7from .test_deploy import config, describe_stacks_response8BUILTIN = '__builtin__'9if sys.version_info >= (3, 0):10 BUILTIN = 'builtins'11class LogsTests(unittest.TestCase):12 @mock.patch(BUILTIN + '.print')13 @mock.patch('slam.cli.time.time', return_value=1000)14 @mock.patch('slam.cli.boto3.client')15 @mock.patch('slam.cli._load_config', return_value=config)16 def test_dev_logs(self, _load_config, client, time, mock_print):17 mock_cfn = mock.MagicMock()18 mock_logs = mock.MagicMock()19 mock_cfn.describe_stacks.return_value = describe_stacks_response20 mock_logs.filter_log_events.side_effect = [21 {22 'events': [23 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,24 'message': 'foo'},25 {'logStreamName': 'abc[1]', 'timestamp': 990000,26 'message': 'foo'},27 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,28 'message': 'baz'},29 ]30 },31 {32 'events': [33 {'timestamp': 990025, 'message': 'bar'}34 ]35 }36 ]37 client.side_effect = [mock_cfn, mock_logs]38 cli.main(['logs'])39 mock_logs.filter_log_events.assert_any_call(40 logGroupName='/aws/lambda/foo', startTime=940000, interleaved=True)41 mock_logs.filter_log_events.assert_any_call(42 logGroupName='API-Gateway-Execution-Logs_123abc/dev',43 startTime=940000, interleaved=True)44 self.assertEqual(mock_print.call_count, 3)45 self.assertIn(' foo', mock_print.call_args_list[0][0][0])46 self.assertIn(' bar', mock_print.call_args_list[1][0][0])47 self.assertIn(' baz', mock_print.call_args_list[2][0][0])48 @mock.patch(BUILTIN + '.print')49 @mock.patch('slam.cli.time.time', return_value=1000)50 @mock.patch('slam.cli.boto3.client')51 @mock.patch('slam.cli._load_config', return_value=config)52 def test_dev_no_api_logs(self, _load_config, client, time, mock_print):53 mock_cfn = mock.MagicMock()54 mock_logs = mock.MagicMock()55 r = copy.deepcopy(describe_stacks_response)56 r['Stacks'][0]['Outputs'] = [57 {'OutputKey': 'FunctionArn', 
'OutputValue': 'arn:lambda:foo'},58 ]59 mock_cfn.describe_stacks.return_value = r60 mock_logs.filter_log_events.side_effect = [61 {62 'events': [63 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,64 'message': 'foo'},65 {'logStreamName': 'abc[1]', 'timestamp': 990000,66 'message': 'foo'},67 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,68 'message': 'baz'},69 ]70 }71 ]72 client.side_effect = [mock_cfn, mock_logs]73 cli.main(['logs'])74 mock_logs.filter_log_events.assert_called_once_with(75 logGroupName='/aws/lambda/foo', startTime=940000, interleaved=True)76 self.assertEqual(mock_print.call_count, 2)77 self.assertIn(' foo', mock_print.call_args_list[0][0][0])78 self.assertIn(' baz', mock_print.call_args_list[1][0][0])79 @mock.patch(BUILTIN + '.print')80 @mock.patch('slam.cli.time.time', return_value=1000)81 @mock.patch('slam.cli.boto3.client')82 @mock.patch('slam.cli._load_config', return_value=config)83 def test_stage_logs(self, _load_config, client, time, mock_print):84 mock_cfn = mock.MagicMock()85 mock_logs = mock.MagicMock()86 mock_cfn.describe_stacks.return_value = describe_stacks_response87 mock_logs.filter_log_events.side_effect = [88 {89 'events': [90 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,91 'message': 'foo'},92 {'logStreamName': 'abc[2]', 'timestamp': 990000,93 'message': 'foo'},94 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,95 'message': 'baz'},96 ]97 },98 {99 'events': [100 {'timestamp': 990025, 'message': 'bar'}101 ]102 }103 ]104 client.side_effect = [mock_cfn, mock_logs]105 cli.main(['logs', '--stage', 'prod'])106 mock_logs.filter_log_events.assert_any_call(107 logGroupName='/aws/lambda/foo', startTime=940000, interleaved=True)108 mock_logs.filter_log_events.assert_any_call(109 logGroupName='API-Gateway-Execution-Logs_123abc/prod',110 startTime=940000, interleaved=True)111 self.assertEqual(mock_print.call_count, 2)112 self.assertIn(' foo', mock_print.call_args_list[0][0][0])113 self.assertIn(' bar', 
mock_print.call_args_list[1][0][0])114 @mock.patch(BUILTIN + '.print')115 @mock.patch('slam.cli.time.time', return_value=1000)116 @mock.patch('slam.cli.boto3.client')117 @mock.patch('slam.cli._load_config', return_value=config)118 def test_no_group(self, _load_config, client, time, mock_print):119 mock_cfn = mock.MagicMock()120 mock_logs = mock.MagicMock()121 mock_cfn.describe_stacks.return_value = describe_stacks_response122 mock_logs.filter_log_events.side_effect = [123 botocore.exceptions.ClientError({'Error': {}}, 'operation'),124 {125 'events': [126 {'timestamp': 990025, 'message': 'bar'}127 ]128 }129 ]130 client.side_effect = [mock_cfn, mock_logs]131 cli.main(['logs'])132 mock_logs.filter_log_events.assert_any_call(133 logGroupName='/aws/lambda/foo', startTime=940000, interleaved=True)134 mock_logs.filter_log_events.assert_any_call(135 logGroupName='API-Gateway-Execution-Logs_123abc/dev',136 startTime=940000, interleaved=True)137 self.assertEqual(mock_print.call_count, 1)138 self.assertIn(' bar', mock_print.call_args_list[0][0][0])139 @mock.patch(BUILTIN + '.print')140 @mock.patch('slam.cli.time.time', return_value=1000)141 @mock.patch('slam.cli.boto3.client')142 @mock.patch('slam.cli._load_config', return_value=config)143 def test_paginated_logs(self, _load_config, client, time, mock_print):144 mock_cfn = mock.MagicMock()145 mock_logs = mock.MagicMock()146 mock_cfn.describe_stacks.return_value = describe_stacks_response147 mock_logs.filter_log_events.side_effect = [148 {149 'events': [150 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,151 'message': 'foo'},152 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,153 'message': 'bar'},154 ],155 'nextToken': 'foo-token'156 },157 {158 'events': [159 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990075,160 'message': 'baz'},161 ]162 },163 {164 'events': []165 }166 ]167 client.side_effect = [mock_cfn, mock_logs]168 cli.main(['logs'])169 mock_logs.filter_log_events.assert_any_call(170 
logGroupName='/aws/lambda/foo', startTime=940000, interleaved=True)171 mock_logs.filter_log_events.assert_any_call(172 logGroupName='/aws/lambda/foo', startTime=990051, interleaved=True,173 nextToken='foo-token')174 self.assertEqual(mock_print.call_count, 3)175 self.assertIn(' foo', mock_print.call_args_list[0][0][0])176 self.assertIn(' bar', mock_print.call_args_list[1][0][0])177 self.assertIn(' baz', mock_print.call_args_list[2][0][0])178 @mock.patch('slam.cli.time.sleep')179 @mock.patch(BUILTIN + '.print')180 @mock.patch('slam.cli.time.time', return_value=1000)181 @mock.patch('slam.cli.boto3.client')182 @mock.patch('slam.cli._load_config', return_value=config)183 def test_tailed_logs(self, _load_config, client, time, mock_print,184 mock_sleep):185 mock_cfn = mock.MagicMock()186 mock_logs = mock.MagicMock()187 mock_cfn.describe_stacks.return_value = describe_stacks_response188 mock_logs.filter_log_events.side_effect = [189 {190 'events': [191 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,192 'message': 'foo'},193 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,194 'message': 'bar'},195 ],196 },197 {198 'events': []199 },200 {201 'events': [202 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990075,203 'message': 'baz'},204 ]205 },206 {207 'events': [208 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990074,209 'message': 'api'},210 ]211 },212 RuntimeError213 ]214 client.side_effect = [mock_cfn, mock_logs]215 try:216 cli.main(['logs', '--tail'])217 except RuntimeError:218 pass219 mock_logs.filter_log_events.assert_any_call(220 logGroupName='/aws/lambda/foo', startTime=940000, interleaved=True)221 mock_logs.filter_log_events.assert_any_call(222 logGroupName='/aws/lambda/foo', startTime=990051, interleaved=True)223 self.assertEqual(mock_sleep.call_count, 2)224 mock_sleep.assert_any_call(5)225 self.assertIn(' foo', mock_print.call_args_list[0][0][0])226 self.assertIn(' bar', mock_print.call_args_list[1][0][0])227 self.assertIn(' api', 
mock_print.call_args_list[2][0][0])228 self.assertIn(' baz', mock_print.call_args_list[3][0][0])229 @mock.patch('slam.cli.time.time', return_value=1000000)230 @mock.patch('slam.cli.boto3.client')231 @mock.patch('slam.cli._load_config', return_value=config)232 def test_week_logs(self, _load_config, client, time):233 mock_cfn = mock.MagicMock()234 mock_logs = mock.MagicMock()235 mock_cfn.describe_stacks.return_value = describe_stacks_response236 mock_logs.filter_log_events.side_effect = [237 {238 'events': [239 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,240 'message': 'foo'},241 {'logStreamName': 'abc[1]', 'timestamp': 990000,242 'message': 'foo'},243 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,244 'message': 'baz'},245 ]246 },247 {248 'events': []249 }250 ]251 client.side_effect = [mock_cfn, mock_logs]252 cli.main(['logs', '--period', '1w'])253 mock_logs.filter_log_events.assert_any_call(254 logGroupName='/aws/lambda/foo', startTime=395200000,255 interleaved=True)256 @mock.patch('slam.cli.time.time', return_value=1000000)257 @mock.patch('slam.cli.boto3.client')258 @mock.patch('slam.cli._load_config', return_value=config)259 def test_day_logs(self, _load_config, client, time):260 mock_cfn = mock.MagicMock()261 mock_logs = mock.MagicMock()262 mock_cfn.describe_stacks.return_value = describe_stacks_response263 mock_logs.filter_log_events.side_effect = [264 {265 'events': [266 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,267 'message': 'foo'},268 {'logStreamName': 'abc[1]', 'timestamp': 990000,269 'message': 'foo'},270 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,271 'message': 'baz'},272 ]273 },274 {275 'events': []276 }277 ]278 client.side_effect = [mock_cfn, mock_logs]279 cli.main(['logs', '--period', '2.5d'])280 mock_logs.filter_log_events.assert_any_call(281 logGroupName='/aws/lambda/foo', startTime=784000000,282 interleaved=True)283 @mock.patch('slam.cli.time.time', return_value=1000000)284 
@mock.patch('slam.cli.boto3.client')285 @mock.patch('slam.cli._load_config', return_value=config)286 def test_hour_logs(self, _load_config, client, time):287 mock_cfn = mock.MagicMock()288 mock_logs = mock.MagicMock()289 mock_cfn.describe_stacks.return_value = describe_stacks_response290 mock_logs.filter_log_events.side_effect = [291 {292 'events': [293 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,294 'message': 'foo'},295 {'logStreamName': 'abc[1]', 'timestamp': 990000,296 'message': 'foo'},297 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,298 'message': 'baz'},299 ]300 },301 {302 'events': []303 }304 ]305 client.side_effect = [mock_cfn, mock_logs]306 cli.main(['logs', '--period', '5h'])307 mock_logs.filter_log_events.assert_any_call(308 logGroupName='/aws/lambda/foo', startTime=982000000,309 interleaved=True)310 @mock.patch('slam.cli.time.time', return_value=1000000)311 @mock.patch('slam.cli.boto3.client')312 @mock.patch('slam.cli._load_config', return_value=config)313 def test_minute_logs(self, _load_config, client, time):314 mock_cfn = mock.MagicMock()315 mock_logs = mock.MagicMock()316 mock_cfn.describe_stacks.return_value = describe_stacks_response317 mock_logs.filter_log_events.side_effect = [318 {319 'events': [320 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,321 'message': 'foo'},322 {'logStreamName': 'abc[1]', 'timestamp': 990000,323 'message': 'foo'},324 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,325 'message': 'baz'},326 ]327 },328 {329 'events': []330 }331 ]332 client.side_effect = [mock_cfn, mock_logs]333 cli.main(['logs', '--period', '10m'])334 mock_logs.filter_log_events.assert_any_call(335 logGroupName='/aws/lambda/foo', startTime=999400000,336 interleaved=True)337 @mock.patch('slam.cli.time.time', return_value=1000000)338 @mock.patch('slam.cli.boto3.client')339 @mock.patch('slam.cli._load_config', return_value=config)340 def test_second_logs(self, _load_config, client, time):341 mock_cfn = mock.MagicMock()342 
mock_logs = mock.MagicMock()343 mock_cfn.describe_stacks.return_value = describe_stacks_response344 mock_logs.filter_log_events.side_effect = [345 {346 'events': [347 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990000,348 'message': 'foo'},349 {'logStreamName': 'abc[1]', 'timestamp': 990000,350 'message': 'foo'},351 {'logStreamName': 'abc[$LATEST]', 'timestamp': 990050,352 'message': 'baz'},353 ]354 },355 {356 'events': []357 }358 ]359 client.side_effect = [mock_cfn, mock_logs]360 cli.main(['logs', '--period', '6s'])361 mock_logs.filter_log_events.assert_any_call(362 logGroupName='/aws/lambda/foo', startTime=999994000,363 interleaved=True)364 @mock.patch('slam.cli.time.time', return_value=1000000)365 @mock.patch('slam.cli.boto3.client')366 @mock.patch('slam.cli._load_config', return_value=config)367 def test_invalid_period_suffix(self, _load_config, client, time):368 mock_cfn = mock.MagicMock()369 mock_logs = mock.MagicMock()370 mock_cfn.describe_stacks.return_value = describe_stacks_response371 mock_logs.filter_log_events.return_value = {372 'events': [373 {'timestamp': 1000, 'message': 'foo'},374 {'timestamp': 1001, 'message': 'bar'},375 ]376 }377 client.side_effect = [mock_cfn, mock_logs]378 self.assertRaises(ValueError, cli.main, ['logs', '--period', '5b'])379 @mock.patch('slam.cli.time.time', return_value=1000000)380 @mock.patch('slam.cli.boto3.client')381 @mock.patch('slam.cli._load_config', return_value=config)382 def test_invalid_period_value(self, _load_config, client, time):383 mock_cfn = mock.MagicMock()384 mock_logs = mock.MagicMock()385 mock_cfn.describe_stacks.return_value = describe_stacks_response386 mock_logs.filter_log_events.return_value = {387 'events': [388 {'timestamp': 1000, 'message': 'foo'},389 {'timestamp': 1001, 'message': 'bar'},390 ]391 }392 client.side_effect = [mock_cfn, mock_logs]393 self.assertRaises(ValueError, cli.main, ['logs', '--period', '5ad'])394 @mock.patch('slam.cli.boto3.client')395 @mock.patch('slam.cli._load_config', 
return_value=config)396 def test_not_deployed(self, _load_config, client):397 mock_cfn = mock.MagicMock()398 mock_logs = mock.MagicMock()399 mock_cfn.describe_stacks.side_effect = \400 botocore.exceptions.ClientError({'Error': {}}, 'operation')401 mock_logs.filter_log_events.return_value = {402 'events': [403 {'timestamp': 1000, 'message': 'foo'},404 {'timestamp': 1001, 'message': 'bar'},405 ]406 }407 client.side_effect = [mock_cfn, mock_logs]408 cli.main(['logs'])...

Full Screen

Full Screen

test_cw_log_puller.py

Source:test_cw_log_puller.py Github

copy

Full Screen

1import copy2from datetime import datetime3from unittest import TestCase4from unittest.mock import Mock, call, patch, ANY5import botocore.session6from botocore.stub import Stubber7from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent8from samcli.lib.observability.cw_logs.cw_log_puller import CWLogPuller9from samcli.lib.utils.time import to_timestamp, to_datetime10class TestCWLogPuller_load_time_period(TestCase):11 def setUp(self):12 self.log_group_name = "name"13 self.stream_name = "stream name"14 self.timestamp = to_timestamp(datetime.utcnow())15 real_client = botocore.session.get_session().create_client("logs", region_name="us-east-1")16 self.client_stubber = Stubber(real_client)17 self.consumer = Mock()18 self.fetcher = CWLogPuller(real_client, self.consumer, self.log_group_name)19 self.mock_api_response = {20 "events": [21 {22 "eventId": "id1",23 "ingestionTime": 0,24 "logStreamName": self.stream_name,25 "message": "message 1",26 "timestamp": self.timestamp,27 },28 {29 "eventId": "id2",30 "ingestionTime": 0,31 "logStreamName": self.stream_name,32 "message": "message 2",33 "timestamp": self.timestamp,34 },35 ]36 }37 self.expected_events = [38 CWLogEvent(39 self.log_group_name,40 {41 "eventId": "id1",42 "ingestionTime": 0,43 "logStreamName": self.stream_name,44 "message": "message 1",45 "timestamp": self.timestamp,46 },47 ),48 CWLogEvent(49 self.log_group_name,50 {51 "eventId": "id2",52 "ingestionTime": 0,53 "logStreamName": self.stream_name,54 "message": "message 2",55 "timestamp": self.timestamp,56 },57 ),58 ]59 def test_must_fetch_logs_for_log_group(self):60 expected_params = {"logGroupName": self.log_group_name, "interleaved": True}61 # Configure the stubber to return the configured response. 
The stubber also verifies62 # that input params were provided as expected63 self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params)64 with self.client_stubber:65 self.fetcher.load_time_period()66 call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list]67 for event in self.expected_events:68 self.assertIn(event, call_args)69 def test_must_fetch_logs_with_all_params(self):70 pattern = "foobar"71 start = datetime.utcnow()72 end = datetime.utcnow()73 expected_params = {74 "logGroupName": self.log_group_name,75 "interleaved": True,76 "startTime": to_timestamp(start),77 "endTime": to_timestamp(end),78 "filterPattern": pattern,79 }80 self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params)81 with self.client_stubber:82 self.fetcher.load_time_period(start_time=start, end_time=end, filter_pattern=pattern)83 call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list]84 for event in self.expected_events:85 self.assertIn(event, call_args)86 @patch("samcli.lib.observability.cw_logs.cw_log_puller.LOG")87 def test_must_print_resource_not_found_only_once(self, patched_log):88 pattern = "foobar"89 start = datetime.utcnow()90 end = datetime.utcnow()91 expected_params = {92 "logGroupName": self.log_group_name,93 "interleaved": True,94 "startTime": to_timestamp(start),95 "endTime": to_timestamp(end),96 "filterPattern": pattern,97 }98 self.client_stubber.add_client_error(99 "filter_log_events", expected_params=expected_params, service_error_code="ResourceNotFoundException"100 )101 self.client_stubber.add_client_error(102 "filter_log_events", expected_params=expected_params, service_error_code="ResourceNotFoundException"103 )104 self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params)105 with self.client_stubber:106 self.assertFalse(self.fetcher._invalid_log_group)107 self.fetcher.load_time_period(start_time=start, end_time=end, 
filter_pattern=pattern)108 self.assertTrue(self.fetcher._invalid_log_group)109 self.fetcher.load_time_period(start_time=start, end_time=end, filter_pattern=pattern)110 self.assertTrue(self.fetcher._invalid_log_group)111 self.fetcher.load_time_period(start_time=start, end_time=end, filter_pattern=pattern)112 self.assertFalse(self.fetcher._invalid_log_group)113 def test_must_paginate_using_next_token(self):114 """Make three API calls, first two returns a nextToken and last does not."""115 token = "token"116 expected_params = {"logGroupName": self.log_group_name, "interleaved": True}117 expected_params_with_token = {"logGroupName": self.log_group_name, "interleaved": True, "nextToken": token}118 mock_response_with_token = copy.deepcopy(self.mock_api_response)119 mock_response_with_token["nextToken"] = token120 # Call 1 returns a token. Also when first call is made, token is **not** passed as API params121 self.client_stubber.add_response("filter_log_events", mock_response_with_token, expected_params)122 # Call 2 returns a token123 self.client_stubber.add_response("filter_log_events", mock_response_with_token, expected_params_with_token)124 # Call 3 DOES NOT return a token. 
This will terminate the loop.125 self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params_with_token)126 # Same data was returned in each API call127 expected_events_result = self.expected_events + self.expected_events + self.expected_events128 with self.client_stubber:129 self.fetcher.load_time_period()130 call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list]131 for event in expected_events_result:132 self.assertIn(event, call_args)133class TestCWLogPuller_tail(TestCase):134 def setUp(self):135 self.log_group_name = "name"136 self.filter_pattern = "pattern"137 self.start_time = to_datetime(10)138 self.max_retries = 3139 self.poll_interval = 1140 real_client = botocore.session.get_session().create_client("logs", region_name="us-east-1")141 self.client_stubber = Stubber(real_client)142 self.consumer = Mock()143 self.fetcher = CWLogPuller(144 real_client,145 self.consumer,146 self.log_group_name,147 max_retries=self.max_retries,148 poll_interval=self.poll_interval,149 )150 self.mock_api_empty_response = {"events": []}151 self.mock_api_response_1 = {152 "events": [153 {154 "timestamp": 11,155 },156 {157 "timestamp": 12,158 },159 ]160 }161 self.mock_api_response_2 = {162 "events": [163 {164 "timestamp": 13,165 },166 {167 "timestamp": 14,168 },169 ]170 }171 self.mock_events1 = [172 CWLogEvent(self.log_group_name, {"timestamp": 11}),173 CWLogEvent(self.log_group_name, {"timestamp": 12}),174 ]175 self.mock_events2 = [176 CWLogEvent(self.log_group_name, {"timestamp": 13}),177 CWLogEvent(self.log_group_name, {"timestamp": 14}),178 ]179 self.mock_events_empty = []180 @patch("samcli.lib.observability.cw_logs.cw_log_puller.time")181 def test_must_tail_logs_with_single_data_fetch(self, time_mock):182 expected_params = {183 "logGroupName": self.log_group_name,184 "interleaved": True,185 "startTime": 10,186 "filterPattern": self.filter_pattern,187 }188 expected_params_second_try = {189 "logGroupName": 
self.log_group_name,190 "interleaved": True,191 "startTime": 13,192 "filterPattern": self.filter_pattern,193 }194 # first successful return195 self.client_stubber.add_response("filter_log_events", self.mock_api_response_1, expected_params)196 # 3 empty returns as the number of max retries197 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try)198 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try)199 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try)200 with patch.object(201 self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period202 ) as patched_load_time_period:203 with self.client_stubber:204 self.fetcher.tail(205 start_time=self.start_time,206 filter_pattern=self.filter_pattern,207 )208 expected_load_time_period_calls = [209 # First fetch returns data210 call(self.start_time, filter_pattern=self.filter_pattern),211 # Three empty fetches212 call(to_datetime(13), filter_pattern=self.filter_pattern),213 call(to_datetime(13), filter_pattern=self.filter_pattern),214 call(to_datetime(13), filter_pattern=self.filter_pattern),215 ]216 # One per poll217 expected_sleep_calls = [call(self.poll_interval) for _ in expected_load_time_period_calls]218 consumer_call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list]219 self.assertEqual(self.mock_events1, consumer_call_args)220 self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list)221 self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list)222 @patch("samcli.lib.observability.cw_logs.cw_log_puller.time")223 def test_must_tail_logs_with_multiple_data_fetches(self, time_mock):224 expected_params = {225 "logGroupName": self.log_group_name,226 "interleaved": True,227 "startTime": 10,228 "filterPattern": self.filter_pattern,229 }230 expected_params_second_try = {231 
"logGroupName": self.log_group_name,232 "interleaved": True,233 "startTime": 13,234 "filterPattern": self.filter_pattern,235 }236 expected_params_third_try = {237 "logGroupName": self.log_group_name,238 "interleaved": True,239 "startTime": 15,240 "filterPattern": self.filter_pattern,241 }242 self.client_stubber.add_response("filter_log_events", self.mock_api_response_1, expected_params)243 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try)244 self.client_stubber.add_response("filter_log_events", self.mock_api_response_2, expected_params_second_try)245 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_third_try)246 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_third_try)247 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_third_try)248 expected_load_time_period_calls = [249 # First fetch returns data250 call(self.start_time, filter_pattern=self.filter_pattern),251 # This fetch was empty252 call(to_datetime(13), filter_pattern=self.filter_pattern),253 # This fetch returned data254 call(to_datetime(13), filter_pattern=self.filter_pattern),255 # Three empty fetches256 call(to_datetime(15), filter_pattern=self.filter_pattern),257 call(to_datetime(15), filter_pattern=self.filter_pattern),258 call(to_datetime(15), filter_pattern=self.filter_pattern),259 ]260 # One per poll261 expected_sleep_calls = [call(self.poll_interval) for _ in expected_load_time_period_calls]262 with patch.object(263 self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period264 ) as patched_load_time_period:265 with self.client_stubber:266 self.fetcher.tail(start_time=self.start_time, filter_pattern=self.filter_pattern)267 expected_consumer_call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list]268 self.assertEqual(self.mock_events1 + self.mock_events2, 
expected_consumer_call_args)269 self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list)270 self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list)271 @patch("samcli.lib.observability.cw_logs.cw_log_puller.time")272 def test_without_start_time(self, time_mock):273 expected_params = {274 "logGroupName": self.log_group_name,275 "interleaved": True,276 "startTime": 0,277 "filterPattern": self.filter_pattern,278 }279 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params)280 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params)281 self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params)282 expected_load_time_period_calls = [283 # Three empty fetches, all with default start time284 call(to_datetime(0), filter_pattern=ANY),285 call(to_datetime(0), filter_pattern=ANY),286 call(to_datetime(0), filter_pattern=ANY),287 ]288 # One per poll289 expected_sleep_calls = [call(self.poll_interval) for _ in expected_load_time_period_calls]290 with patch.object(291 self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period292 ) as patched_load_time_period:293 with self.client_stubber:294 self.fetcher.tail(295 filter_pattern=self.filter_pattern,296 )297 expected_consumer_call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list]298 self.assertEqual([], expected_consumer_call_args)299 self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list)300 self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list)301 @patch("samcli.lib.observability.cw_logs.cw_log_puller.time")302 def test_with_throttling(self, time_mock):303 expected_params = {304 "logGroupName": self.log_group_name,305 "interleaved": True,306 "startTime": 0,307 "filterPattern": self.filter_pattern,308 }309 for _ in range(self.max_retries):310 
self.client_stubber.add_client_error(311 "filter_log_events", expected_params=expected_params, service_error_code="ThrottlingException"312 )313 expected_load_time_period_calls = [call(to_datetime(0), filter_pattern=ANY) for _ in range(self.max_retries)]314 expected_time_calls = [call(2), call(4), call(16)]315 with patch.object(316 self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period317 ) as patched_load_time_period:318 with self.client_stubber:319 self.fetcher.tail(filter_pattern=self.filter_pattern)320 self.consumer.consume.assert_not_called()321 self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list)...

Full Screen

Full Screen

cwlogs_manager.py

Source:cwlogs_manager.py Github

copy

Full Screen

...26 if stream_name:27 params['logStreamNamePrefix'] = stream_name28 res = cwlogs.describe_log_streams(**params)29 return res['logStreams']30def filter_log_events(group_name, filter_pat,start=None, stop=None,region_name=None):31 cwlogs = boto3.client('logs', region_name=region_name)32 params = {33 'logGroupName': group_name,34 'filterPattern': filter_pat,35 }36 if start:37 params['startTime'] = start38 if stop:39 params['endTime'] = stop40 res = cwlogs.filter_log_events(**params)41 return res['events']42start_ts = int(datetime(2022, 7, 20, 0, 25, tzinfo=timezone.utc).timestamp() * 1000)43end_ts = int(datetime(2022, 7, 20, 0, 25, tzinfo=timezone.utc).timestamp() * 1000)44if __name__ == '__main__':45 46 list_groups = list_log_groups(region_name='us-east-2')47 log.info(f'{list_groups}')48 49 list_group_streams = list_log_group_streams('/aws/lambda/logtest_soyoppa', region_name='ap-southeast-1')50 log.info(f'{list_group_streams}')51 52 #filter_events = filter_log_events('/aws/lambda/logtest_soyoppa', 'INFO Function start', region_name='ap-southeast-1')53 #log.info(f'{filter_events}')54 filter_log_events = filter_log_events('/aws/lambda/logtest_soyoppa', 'INFO Function start',start=start_ts, stop=end_ts,region_name='ap-southeast-1')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful