Best Python code snippet using localstack_python
aws_batch_helpers.py
Source:aws_batch_helpers.py  
...54        jobDefinitionName=register_job_definition_response['jobDefinitionName']55    )56    job_definition = job_definitions_response['jobDefinitions'][0]57    return job_definition58def get_log_stream_name(job_response: DescribeJobsResponseTypeDef) -> str:59    try:60        log_stream_name = job_response['jobs'][0]['container']['logStreamName']61    except Exception as e:62        log_stream_name = None63        cloudwatch_logger.exception("Exception occurred")64        cloudwatch_logger.error(job_response['jobs'])65    return log_stream_name66def get_log_events(client: CloudWatchLogsClient, log_group, log_stream_name, start_time=0, skip=0, start_from_head=True):67    """68    A generator for log items in a single stream. This will yield all the69    items that are available at the current moment.70    Completely stole this from here71    https://airflow.apache.org/docs/apache-airflow/1.10.5/_modules/airflow/contrib/hooks/aws_logs_hook.html72    :param log_group: The name of the log group.73    :type log_group: str74    :param log_stream_name: The name of the specific stream.75    :type log_stream_name: str76    :param start_time: The time stamp value to start reading the logs from (default: 0).77    :type start_time: int78    :param skip: The number of log entries to skip at the start (default: 0).79        This is for when there are multiple entries at the same timestamp.80    :type skip: int81    :param start_from_head: whether to start from the beginning (True) of the log or82        at the end of the log (False).83    :type start_from_head: bool84    :rtype: dict85    :return: | A CloudWatch log event with the following key-value pairs:86             |   'timestamp' (int): The time in milliseconds of the event.87             |   'message' (str): The log event data.88             |   'ingestionTime' (int): The time in milliseconds the event was ingested.89    """90    next_token = None91    event_count = 192    while event_count > 0:93        if next_token is not None:94            token_arg = {'nextToken': next_token}95        else:96            token_arg = {}97        response: GetLogEventsResponseTypeDef = client.get_log_events(logGroupName=log_group,98                                                                      logStreamName=log_stream_name,99                                                                      startTime=start_time,100                                                                      startFromHead=start_from_head,101                                                                      **token_arg)102        events = response['events']103        event_count = len(events)104        if event_count > skip:105            events = events[skip:]106            skip = 0107        else:108            skip = skip - event_count109            events = []110        for ev in events:111            yield ev112        if 'nextForwardToken' in response:113            next_token = response['nextForwardToken']114        else:115            return116def print_logs(client: CloudWatchLogsClient, log_stream_name: str, start_time: int = 0):117    log_events: List[OutputLogEventTypeDef] = get_log_events(client, log_group='/aws/batch/job',118                                                             log_stream_name=log_stream_name, start_time=start_time)119    last_time_stamp =  start_time120    for log_event in log_events:121        last_time_stamp = log_event['timestamp']122        human_timestamp  = get_human_readable_time(last_time_stamp)123        message = log_event['message']124        cloudwatch_logger.info(f'[{human_timestamp}] {message}')125    if last_time_stamp > 0:126        last_time_stamp = last_time_stamp + 1127    return last_time_stamp128def watch_job(batch_client: BatchClient, log_client: CloudWatchLogsClient, job_response: DescribeJobsResponseTypeDef) -> JobStatusType:129    """Watch an AWS Batch job and print out the logs130    Shoutout to aws labs:131    https://github.com/awslabs/aws-batch-helpers/blob/master/gpu-example/submit-job.py132    Args:133        batch_client (BatchClient): boto3.client('batch')134        log_client (CloudWatchLogsClient): boto3.client('logs')135        job_response (DescribeJobsResponseTypeDef): batch_client.describe_jobs(jobs=[jobId])136    Returns:137        JobStatusType: AWS Batch Job Status138    """139    spinner = 0140    running = False141    start_time = 0142    wait = True143    spin = ['-', '/', '|', '\\', '-', '/', '|', '\\']144    job_id = job_response['jobs'][0]['jobId']145    job_name = job_response['jobs'][0]['jobName']146    log_stream_name: Any = None147    line = '=' * 80148    while wait:149        time.sleep(1)150        describe_jobs_response: DescribeJobsResponseTypeDef = batch_client.describe_jobs(jobs=[151                                                                                         job_id])152        status: JobStatusType = describe_jobs_response['jobs'][0]['status']153        if status == 'SUCCEEDED' or status == 'FAILED':154            log_stream_name = get_log_stream_name(155                job_response=describe_jobs_response)156            if not running and log_stream_name:157                running = False158                watch_logger.info(f'Job [{job_name} - {job_id}] is COMPLETE with status: {status}')159                # print('\rJob [%s - %s] is COMPLETE with status: %s.' %160                #       (job_name, job_id, status))161                # print('Output [%s]:\n %s' % (log_stream_name, '=' * 80))162                watch_logger.info(f'Logs for log stream: {log_stream_name}:')163            if log_stream_name:164                start_time = print_logs(client=log_client,165                                        log_stream_name=log_stream_name,166                                        start_time=start_time)167            watch_logger.info(f'{line}\nJob [{job_name} - {job_id}] {status}')168            break169        elif status == 'RUNNING':170            log_stream_name = get_log_stream_name(171                job_response=describe_jobs_response)172            if not running and log_stream_name:173                running = True174                watch_logger.info(f'Job [{job_name} - {job_id}] is RUNNING')175                watch_logger.info(f'Polling cloudwatch logs...')176                watch_logger.info(f'Output for logstream: {log_stream_name}:\n{line}')177            if log_stream_name:178                start_time = print_logs(client=log_client,179                                        log_stream_name=log_stream_name,180                                        start_time=start_time)181        else:182            this_spin = spin[spinner % len(spin)]183            watch_logger.info(f'Job [{job_name} - {job_id}] is: {status}... {this_spin}')184            sys.stdout.flush()...supervisor.py
Source:supervisor.py  
...77            "statusCode": 500,78            "headers": {79                "amz-lambda-request-id": self.lambda_instance.get_request_id(),80                "amz-log-group-name": self.lambda_instance.get_log_group_name(),81                "amz-log-stream-name": self.lambda_instance.get_log_stream_name()82            },83            "body": StrUtils.dict_to_base64str({"exception": exception_msg}),84            "isBase64Encoded": True,85        }86    def create_response(self):87        res = {88            "statusCode": 200,89            "headers": {90                "amz-lambda-request-id": self.lambda_instance.get_request_id(),91                "amz-log-group-name": self.lambda_instance.get_log_group_name(),92                "amz-log-stream-name": self.lambda_instance.get_log_stream_name()93            },94            "body": "",95            "isBase64Encoded": True,96        }97        if "udocker_output" in self.body:98            res["body"] = StrUtils.bytes_to_base64str(self.body["udocker_output"])99        elif "container_output" in self.body:100            res["body"] = StrUtils.bytes_to_base64str(self.body["container_output"])...flow.py
Source:flow.py  
...14def batch_run_name(partner_id: str):15    return f"{partner_id}_{prefect.context.flow_run_name}"16batch_observer = AWSClientWait(client="batch")17@task(max_retries=3, retry_delay=timedelta(seconds=30))18def get_log_stream_name(job_id):19    # let's attempt to get the log stream name20    batch_client = get_boto_client("batch")21    response = batch_client.describe_jobs(jobs=[job_id])22    with open("out_response.json", "w") as j:23        json.dump(response, j)24    logStreamName = response["jobs"][0]["attempts"][0]["container"]["logStreamName"]25    if logStreamName:26        create_link_artifact(27            f"https://console.aws.amazon.com/cloudwatch/home?#logsV2:log-groups/{logStreamName}"28        )29        print(f"Found logs at {logStreamName}")30@task31def print_context():32    print(prefect.context.__dict__)33with Flow("batch-jobs") as flow:34    phrase = Parameter("phrase", default="0")35    batch_job_name = batch_run_name(phrase)36    batch_job_cowsay = batch_submit(37        job_name=batch_job_name,38        job_definition="whalesay",39        job_queue="whalesay-queue",40        batch_kwargs={41            "containerOverrides": {42                "vcpus": 1,43                "memory": 128,44            },45            "parameters": {"phrase": phrase},46        },47        task_args=dict(log_stdout=True),48    )49    complete_job = batch_observer(50        task_args=dict(name="Running Job Waiter"),51        waiter_name="JobComplete",52        waiter_kwargs={53            "jobs": [batch_job_cowsay],54            "WaiterConfig": {"Delay": 10, "MaxAttempts": 10},55        },56    )57    log_artifact_waiter = batch_observer(58        task_args=dict(trigger=prefect.triggers.always_run, name="Running Job Waiter"),59        waiter_name="JobRunning",60        waiter_kwargs={61            "jobs": [batch_job_cowsay],62            "WaiterConfig": {"Delay": 10, "MaxAttempts": 10},63        },64    )65    log_artifact = get_log_stream_name(66        batch_job_cowsay, upstream_tasks=[log_artifact_waiter]67    )68if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
