How to use the get_log_stream_name method in LocalStack

Best Python code snippets using localstack_python

aws_batch_helpers.py

Source:aws_batch_helpers.py Github

copy

Full Screen

# NOTE(review): this excerpt opens mid-function — original lines 54-57 are the
# tail of a job-definition lookup whose `def` line is outside the excerpt; that
# fragment is omitted here rather than guessed at.


def get_log_stream_name(job_response: "DescribeJobsResponseTypeDef") -> "str | None":
    """Extract the CloudWatch log stream name from a Batch describe-jobs response.

    Args:
        job_response: result of ``batch_client.describe_jobs(jobs=[job_id])``.

    Returns:
        The ``container.logStreamName`` of the first job, or ``None`` when the
        job list is empty or the job has no log stream yet (e.g. not started).
    """
    try:
        return job_response['jobs'][0]['container']['logStreamName']
    except (KeyError, IndexError, TypeError):
        # Narrowed from a bare `except Exception`: only a missing key, an empty
        # jobs list, or a null container is expected here; anything else should
        # surface to the caller.
        cloudwatch_logger.exception("Exception occurred")
        cloudwatch_logger.error(job_response['jobs'])
        return None


def get_log_events(client: "CloudWatchLogsClient", log_group, log_stream_name,
                   start_time=0, skip=0, start_from_head=True):
    """Generate the log events currently available in a single log stream.

    Adapted from the Airflow AWS logs hook:
    https://airflow.apache.org/docs/apache-airflow/1.10.5/_modules/airflow/contrib/hooks/aws_logs_hook.html

    :param client: boto3 CloudWatch Logs client.
    :param log_group: The name of the log group.
    :type log_group: str
    :param log_stream_name: The name of the specific stream.
    :type log_stream_name: str
    :param start_time: The time stamp value to start reading the logs from (default: 0).
    :type start_time: int
    :param skip: The number of log entries to skip at the start (default: 0).
        This is for when there are multiple entries at the same timestamp.
    :type skip: int
    :param start_from_head: whether to start from the beginning (True) of the log or
        at the end of the log (False).
    :type start_from_head: bool
    :rtype: Iterator[dict]
    :return: | CloudWatch log events with the following key-value pairs:
             | 'timestamp' (int): The time in milliseconds of the event.
             | 'message' (str): The log event data.
             | 'ingestionTime' (int): The time in milliseconds the event was ingested.
    """
    next_token = None
    event_count = 1
    # An empty page means we have drained everything currently available.
    while event_count > 0:
        token_arg = {'nextToken': next_token} if next_token is not None else {}
        response: "GetLogEventsResponseTypeDef" = client.get_log_events(
            logGroupName=log_group,
            logStreamName=log_stream_name,
            startTime=start_time,
            startFromHead=start_from_head,
            **token_arg,
        )
        events = response['events']
        event_count = len(events)
        if event_count > skip:
            events = events[skip:]
            skip = 0
        else:
            # Whole page consumed by the skip budget; keep skipping next page.
            skip -= event_count
            events = []
        yield from events
        if 'nextForwardToken' not in response:
            return
        next_token = response['nextForwardToken']


def print_logs(client: "CloudWatchLogsClient", log_stream_name: str, start_time: int = 0) -> int:
    """Log every event currently available in an ``/aws/batch/job`` stream.

    Args:
        client: boto3 CloudWatch Logs client.
        log_stream_name: the Batch job's log stream.
        start_time: epoch-millisecond timestamp to start reading from.

    Returns:
        The next ``start_time`` to poll with: one millisecond past the last
        event seen (so repeated polls do not re-print events), or the original
        ``start_time`` when no events were available.
    """
    # get_log_events returns a generator, not a list (the original annotated
    # this as List[OutputLogEventTypeDef], which was incorrect).
    log_events = get_log_events(client, log_group='/aws/batch/job',
                                log_stream_name=log_stream_name,
                                start_time=start_time)
    last_time_stamp = start_time
    for log_event in log_events:
        last_time_stamp = log_event['timestamp']
        human_timestamp = get_human_readable_time(last_time_stamp)
        message = log_event['message']
        cloudwatch_logger.info(f'[{human_timestamp}] {message}')
    if last_time_stamp > 0:
        last_time_stamp += 1
    return last_time_stamp


def watch_job(batch_client: "BatchClient", log_client: "CloudWatchLogsClient",
              job_response: "DescribeJobsResponseTypeDef") -> "JobStatusType":
    """Watch an AWS Batch job and print out the logs.

    Shoutout to aws labs:
    https://github.com/awslabs/aws-batch-helpers/blob/master/gpu-example/submit-job.py

    Args:
        batch_client (BatchClient): boto3.client('batch')
        log_client (CloudWatchLogsClient): boto3.client('logs')
        job_response (DescribeJobsResponseTypeDef): batch_client.describe_jobs(jobs=[jobId])

    Returns:
        JobStatusType: AWS Batch Job Status
    """
    spinner = 0
    running = False
    start_time = 0
    wait = True
    spin = ['-', '/', '|', '\\', '-', '/', '|', '\\']
    job_id = job_response['jobs'][0]['jobId']
    job_name = job_response['jobs'][0]['jobName']
    log_stream_name = None
    line = '=' * 80
    while wait:
        time.sleep(1)
        describe_jobs_response: "DescribeJobsResponseTypeDef" = batch_client.describe_jobs(
            jobs=[job_id])
        status: "JobStatusType" = describe_jobs_response['jobs'][0]['status']
        if status in ('SUCCEEDED', 'FAILED'):
            log_stream_name = get_log_stream_name(
                job_response=describe_jobs_response)
            if not running and log_stream_name:
                # NOTE(review): the original sets running = False here, which
                # is already its value on this path — kept for fidelity.
                running = False
                watch_logger.info(f'Job [{job_name} - {job_id}] is COMPLETE with status: {status}')
                watch_logger.info(f'Logs for log stream: {log_stream_name}:')
            if log_stream_name:
                start_time = print_logs(client=log_client,
                                        log_stream_name=log_stream_name,
                                        start_time=start_time)
            watch_logger.info(f'{line}\nJob [{job_name} - {job_id}] {status}')
            break
        elif status == 'RUNNING':
            log_stream_name = get_log_stream_name(
                job_response=describe_jobs_response)
            if not running and log_stream_name:
                running = True
                watch_logger.info(f'Job [{job_name} - {job_id}] is RUNNING')
                watch_logger.info('Polling cloudwatch logs...')
                watch_logger.info(f'Output for logstream: {log_stream_name}:\n{line}')
            if log_stream_name:
                start_time = print_logs(client=log_client,
                                        log_stream_name=log_stream_name,
                                        start_time=start_time)
        else:
            this_spin = spin[spinner % len(spin)]
            watch_logger.info(f'Job [{job_name} - {job_id}] is: {status}... {this_spin}')
            sys.stdout.flush()
        # NOTE(review): the original excerpt is truncated at sys.stdout.flush();
        # the awslabs script this is based on advances the spinner each
        # iteration and returns the final status — confirm against full source.
        spinner += 1
    return status

Full Screen

Full Screen

supervisor.py

Source:supervisor.py Github

copy

Full Screen

...77 "statusCode": 500,78 "headers": {79 "amz-lambda-request-id": self.lambda_instance.get_request_id(),80 "amz-log-group-name": self.lambda_instance.get_log_group_name(),81 "amz-log-stream-name": self.lambda_instance.get_log_stream_name()82 },83 "body": StrUtils.dict_to_base64str({"exception": exception_msg}),84 "isBase64Encoded": True,85 }86 def create_response(self):87 res = {88 "statusCode": 200,89 "headers": {90 "amz-lambda-request-id": self.lambda_instance.get_request_id(),91 "amz-log-group-name": self.lambda_instance.get_log_group_name(),92 "amz-log-stream-name": self.lambda_instance.get_log_stream_name()93 },94 "body": "",95 "isBase64Encoded": True,96 }97 if "udocker_output" in self.body:98 res["body"] = StrUtils.bytes_to_base64str(self.body["udocker_output"])99 elif "container_output" in self.body:100 res["body"] = StrUtils.bytes_to_base64str(self.body["container_output"])...

Full Screen

Full Screen

flow.py

Source:flow.py Github

copy

Full Screen

# NOTE(review): this excerpt starts at line 14 of flow.py; the imports above it
# (prefect, json, timedelta, get_boto_client, batch_submit, create_link_artifact,
# AWSClientWait, Parameter, Flow, task) are outside the excerpt.


def batch_run_name(partner_id: str) -> str:
    """Build a unique Batch run name from the partner id and the Prefect flow-run name."""
    return f"{partner_id}_{prefect.context.flow_run_name}"


batch_observer = AWSClientWait(client="batch")


@task(max_retries=3, retry_delay=timedelta(seconds=30))
def get_log_stream_name(job_id):
    """Look up the CloudWatch log stream of a Batch job attempt and publish a
    console link as a Prefect artifact.

    The task-level retries cover the window before the first attempt has a
    container assigned, when the lookup below raises IndexError/KeyError and
    the task fails.
    """
    # let's attempt to get the log stream name
    batch_client = get_boto_client("batch")
    response = batch_client.describe_jobs(jobs=[job_id])
    # Debug aid: snapshot the raw describe_jobs payload to disk.
    with open("out_response.json", "w") as j:
        json.dump(response, j)
    # Renamed from camelCase `logStreamName` to follow PEP 8.
    log_stream_name = response["jobs"][0]["attempts"][0]["container"]["logStreamName"]
    if log_stream_name:
        # NOTE(review): this splices a log *stream* name into a log-groups
        # console path — presumably intended to deep-link the stream; confirm
        # the URL format against the CloudWatch console.
        create_link_artifact(
            f"https://console.aws.amazon.com/cloudwatch/home?#logsV2:log-groups/{log_stream_name}"
        )
        print(f"Found logs at {log_stream_name}")


@task
def print_context():
    """Dump the Prefect context for debugging."""
    print(prefect.context.__dict__)


with Flow("batch-jobs") as flow:
    phrase = Parameter("phrase", default="0")
    batch_job_name = batch_run_name(phrase)
    batch_job_cowsay = batch_submit(
        job_name=batch_job_name,
        job_definition="whalesay",
        job_queue="whalesay-queue",
        batch_kwargs={
            "containerOverrides": {
                "vcpus": 1,
                "memory": 128,
            },
            "parameters": {"phrase": phrase},
        },
        task_args=dict(log_stdout=True),
    )
    # Block until the job reaches a terminal state.
    complete_job = batch_observer(
        task_args=dict(name="Running Job Waiter"),
        waiter_name="JobComplete",
        waiter_kwargs={
            "jobs": [batch_job_cowsay],
            "WaiterConfig": {"Delay": 10, "MaxAttempts": 10},
        },
    )
    # Always-run waiter so the log-link artifact is published even when the
    # job ultimately fails.
    log_artifact_waiter = batch_observer(
        task_args=dict(trigger=prefect.triggers.always_run, name="Running Job Waiter"),
        waiter_name="JobRunning",
        waiter_kwargs={
            "jobs": [batch_job_cowsay],
            "WaiterConfig": {"Delay": 10, "MaxAttempts": 10},
        },
    )
    log_artifact = get_log_stream_name(
        batch_job_cowsay, upstream_tasks=[log_artifact_waiter]
    )


if __name__ == "__main__":
    # NOTE(review): the original excerpt is truncated at this line; running the
    # flow locally via flow.run() is the conventional Prefect 1.x entry point —
    # confirm against the full source.
    flow.run()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful