How to use logs_client method in localstack

Best Python code snippet using localstack_python

cw_log_puller.py

Source:cw_log_puller.py Github

copy

Full Screen

1"""2CloudWatch log event puller implementation3"""4import logging5import time6from datetime import datetime7from typing import Optional, Any, List8from botocore.exceptions import ClientError9from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent10from samcli.lib.observability.observability_info_puller import ObservabilityPuller, ObservabilityEventConsumer11from samcli.lib.utils.time import to_timestamp, to_datetime12LOG = logging.getLogger(__name__)13class CWLogPuller(ObservabilityPuller):14 """15 Puller implementation that can pull events from CloudWatch log group16 """17 def __init__(18 self,19 logs_client: Any,20 consumer: ObservabilityEventConsumer,21 cw_log_group: str,22 resource_name: Optional[str] = None,23 max_retries: int = 1000,24 poll_interval: int = 1,25 ):26 """27 Parameters28 ----------29 logs_client: CloudWatchLogsClient30 boto3 logs client instance31 consumer : ObservabilityEventConsumer32 Consumer instance that will process pulled events33 cw_log_group : str34 CloudWatch log group name35 resource_name : Optional[str]36 Optional parameter to assign a resource name for each event.37 max_retries: int38 Optional parameter to set maximum retries when tailing. Default value is 100039 poll_interval: int40 Optional parameter to define sleep interval between pulling new log events when tailing. 
Default value is 141 """42 self.logs_client = logs_client43 self.consumer = consumer44 self.cw_log_group = cw_log_group45 self.resource_name = resource_name46 self._max_retries = max_retries47 self._poll_interval = poll_interval48 self.latest_event_time = 049 self.had_data = False50 self._invalid_log_group = False51 def tail(self, start_time: Optional[datetime] = None, filter_pattern: Optional[str] = None):52 if start_time:53 self.latest_event_time = to_timestamp(start_time)54 counter = self._max_retries55 while counter > 0 and not self.cancelled:56 LOG.debug("Tailing logs from %s starting at %s", self.cw_log_group, str(self.latest_event_time))57 counter -= 158 try:59 self.load_time_period(to_datetime(self.latest_event_time), filter_pattern=filter_pattern)60 except ClientError as err:61 error_code = err.response.get("Error", {}).get("Code")62 if error_code == "ThrottlingException":63 # if throttled, increase poll interval by 1 second each time64 if self._poll_interval == 1:65 self._poll_interval += 166 else:67 self._poll_interval **= 268 LOG.warning(69 "Throttled by CloudWatch Logs API, consider pulling logs for certain resources. "70 "Increasing the poll interval time for resource %s to %s seconds",71 self.cw_log_group,72 self._poll_interval,73 )74 else:75 # if error is other than throttling, re-raise it76 LOG.error("Failed while fetching new log events", exc_info=err)77 raise err78 # This poll fetched logs. Reset the retry counter and set the timestamp for next poll79 if self.had_data:80 counter = self._max_retries81 self.latest_event_time += 1 # one extra millisecond to fetch next log event82 self.had_data = False83 # We already fetched logs once. 
Sleep for some time before querying again.84 # This also helps us scoot under the TPS limit for CloudWatch API call.85 time.sleep(self._poll_interval)86 def load_time_period(87 self,88 start_time: Optional[datetime] = None,89 end_time: Optional[datetime] = None,90 filter_pattern: Optional[str] = None,91 ):92 kwargs = {"logGroupName": self.cw_log_group, "interleaved": True}93 if start_time:94 kwargs["startTime"] = to_timestamp(start_time)95 if end_time:96 kwargs["endTime"] = to_timestamp(end_time)97 if filter_pattern:98 kwargs["filterPattern"] = filter_pattern99 while True:100 LOG.debug("Fetching logs from CloudWatch with parameters %s", kwargs)101 try:102 result = self.logs_client.filter_log_events(**kwargs)103 self._invalid_log_group = False104 except self.logs_client.exceptions.ResourceNotFoundException:105 if not self._invalid_log_group:106 LOG.debug(107 "The specified log group %s does not exist. "108 "This may be due to your resource have not been invoked yet.",109 self.cw_log_group,110 )111 self._invalid_log_group = True112 break113 # Several events will be returned. Consume one at a time114 for event in result.get("events", []):115 self.had_data = True116 cw_event = CWLogEvent(self.cw_log_group, dict(event), self.resource_name)117 if cw_event.timestamp > self.latest_event_time:118 self.latest_event_time = cw_event.timestamp119 self.consumer.consume(cw_event)120 # Keep iterating until there are no more logs left to query.121 next_token = result.get("nextToken", None)122 kwargs["nextToken"] = next_token123 if not next_token:124 break125 def load_events(self, event_ids: List[Any]):...

Full Screen

Full Screen

lambda_function.py

Source:lambda_function.py Github

copy

Full Screen

1from __future__ import print_function2import sys3import json4import logging5import threading6import traceback7import os8import boto39from urllib.request import Request, urlopen10logger = logging.getLogger()11logger.setLevel(logging.INFO)12logging.basicConfig(13 format='%(levelname)s %(threadName)s [%(filename)s:%(lineno)d] %(message)s',14 datefmt='%Y-%m-%d:%H:%M:%S',15 level=logging.INFO16)17try:18 logger.info("Container initialization completed")19except Exception as e:20 logger.error(e, exc_info=True)21 init_failed = e22############################################################23# SIGNAL HANDLER FUNCTIONS #24############################################################25def create(event, logs_client, logger):26 """ 27 Upon creation of dataset, puts a subscription filter to stream logs to Lambda 28 :param event: Event sent from CFN when the custom resource is deleted 29 :param logs_client: Initialized CloudWatch Logs client 30 """31 resource_properties = event['ResourceProperties']32 log_group_name = resource_properties['LogGroupName']33 destination_arn = resource_properties['DestinationArn']34 filter_pattern = resource_properties['FilterPattern']35 # Get name of function36 destination_lambda_name = destination_arn.split(":")[6]37 filter_name = ""38 try:39 filter_name = logs_client.describe_subscription_filters(40 logGroupName=log_group_name)['subscriptionFilters'][0]['filterName']41 except (KeyError, IndexError) as e:42 filter_name = "LambdaStream_" + destination_lambda_name43 logger.info(44 "Cannot find existing subscription filter: %s. 
Created filter name: %s", e, filter_name)45 logs_client.put_subscription_filter(46 logGroupName=log_group_name,47 filterName=filter_name,48 filterPattern=filter_pattern,49 destinationArn=destination_arn50 )51 return52def delete(event, logs_client):53 """ 54 Deletes the subscription filter that streams to Lambda 55 :param event: Event sent from CFN when the custom resource is deleted 56 :param logs_client: Initialized CloudWatch Logs client 57 """58 resource_properties = event['ResourceProperties']59 log_group_name = resource_properties['LogGroupName']60 filter_name = logs_client.describe_subscription_filters(61 logGroupName=log_group_name)['subscriptionFilters'][0]['filterName']62 logs_client.delete_subscription_filter(63 logGroupName=log_group_name,64 filterName=filter_name65 )66 return67# NOT IN USE68def update(event, context):69 """ 70 """71 return72############################################################73# HELPER FUNCTION #74############################################################75def send_response(e, c, rs, rd):76 """ 77 Packages response and send signals to CloudFormation 78 :param e: The event given to this Lambda function 79 :param c: Context object, as above 80 :param rs: Returned status to be sent back to CFN 81 :param rd: Returned data to be sent back to CFN 82 """83 r = json.dumps({84 "Status": rs,85 "Reason": "CloudWatch Log Stream: " + c.log_stream_name,86 "PhysicalResourceId": e['LogicalResourceId'],87 "StackId": e['StackId'],88 "RequestId": e['RequestId'],89 "LogicalResourceId": e['LogicalResourceId'],90 "Data": rd91 })92 d = str.encode(r)93 h = {94 'content-type': '',95 'content-length': str(len(d))96 }97 req = Request(e['ResponseURL'], data=d, method='PUT', headers=h)98 r = urlopen(req)99 logger.info("Status message: {} {}".format(r.msg, r.getcode()))100############################################################101# LAMBDA FUNCTION HANDLER #102############################################################103# IMPORTANT: The Lambda 
function will be called whenever #104# changes are made to the stack. Thus, ensure that the #105# signals are handled by your Lambda function correctly, #106# or the stack could get stuck in the DELETE_FAILED state #107############################################################108def handler(event, context):109 """ 110 Entrypoint to Lambda, updates the main CloudTrail trail 111 :param event: The event given to this Lambda function 112 :param context: Context object containing Lambda metadata 113 """114 request_type = event['RequestType']115 client = boto3.client('logs')116 try:117 if request_type == 'Create':118 create(event, client, logger)119 send_response(event, context, "SUCCESS", {"Message": "Created"})120 elif request_type == 'Update':121 create(event, client, logger)122 send_response(event, context, "SUCCESS",123 {"Message": "Updated"})124 elif request_type == 'Delete':125 delete(event, client)126 send_response(event, context, "SUCCESS",127 {"Message": "Deleted"})128 else:129 send_response(event, context, "FAILED",130 {"Message": "Unexpected"})131 except Exception as ex:132 logger.error(ex)133 traceback.print_tb(ex.__traceback__)134 send_response(135 event,136 context,137 "FAILED",138 {139 "Message": "Exception"140 }...

Full Screen

Full Screen

cloudwatch_logging.py

Source:cloudwatch_logging.py Github

copy

Full Screen

import time

import boto3
import botocore.exceptions


def log_message(log_group_name: str, log_stream_name: str, message: str):
    """Logs a message to cloudwatch.

    Idempotently creates the log group and stream, resolves the current upload
    sequence token, and retries the put until CloudWatch accepts it.

    :param log_group_name: Target CloudWatch log group name.
    :param log_stream_name: Target log stream name within the group.
    :param message: The message text to log.
    """
    logs_client = boto3.client("logs")

    def get_sequence_token():
        # try to get the upload sequence token
        paginator = logs_client.get_paginator('describe_log_streams')
        for page in paginator.paginate(logGroupName=log_group_name, logStreamNamePrefix=log_stream_name):
            for log_stream in page['logStreams']:
                if log_stream['logStreamName'] == log_stream_name:
                    return log_stream.get('uploadSequenceToken', None)
        return None

    while True:
        # Create group/stream on every attempt; "already exists" is the normal case.
        try:
            logs_client.create_log_group(logGroupName=log_group_name)
        except logs_client.exceptions.ResourceAlreadyExistsException:
            pass
        try:
            logs_client.create_log_stream(
                logGroupName=log_group_name, logStreamName=log_stream_name)
        except logs_client.exceptions.ResourceAlreadyExistsException:
            pass
        sequence_token = get_sequence_token()
        try:
            kwargs = dict(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[dict(
                    timestamp=int(time.time() * 1000),
                    message=message,
                )],
            )
            if sequence_token is not None:
                kwargs['sequenceToken'] = sequence_token
            logs_client.put_log_events(**kwargs)
            break
        except logs_client.exceptions.InvalidSequenceTokenException:
            # NOTE(review): the source under review is truncated here. Looping to
            # refetch the token and retry is what the while/break structure implies —
            # confirm against the original file.
            continue

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful