Best Python code snippet using localstack_python
client.pyi
Source:client.pyi  
...242        Provides information about the specified Call Analytics category.243        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/transcribe.html#TranscribeService.Client.get_call_analytics_category)244        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_transcribe/client.html#get_call_analytics_category)245        """246    def get_call_analytics_job(247        self, *, CallAnalyticsJobName: str248    ) -> GetCallAnalyticsJobResponseTypeDef:249        """250        Provides information about the specified Call Analytics job.251        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/transcribe.html#TranscribeService.Client.get_call_analytics_job)252        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_transcribe/client.html#get_call_analytics_job)253        """254    def get_medical_transcription_job(255        self, *, MedicalTranscriptionJobName: str256    ) -> GetMedicalTranscriptionJobResponseTypeDef:257        """258        Provides information about the specified medical transcription job.259        [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/transcribe.html#TranscribeService.Client.get_medical_transcription_job)260        [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_transcribe/client.html#get_medical_transcription_job)...pca-transcribe-eventbridge.py
Source:pca-transcribe-eventbridge.py  
"""
This python function is part of the main processing workflow.  It is called by Event Bridge once a Transcribe job
has completed.  It will look up that job record in DynamoDB, including the Step Functions task token associated with
that job, extract the job-status from the relevant Transcribe API and then resume the Step Function execution

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
"""
import json
import logging
import os
import time

import boto3

import pcaconfiguration as cf

logger = logging.getLogger(__name__)

# Total number of retry attempts to make
RETRY_LIMIT = 2


def lambda_handler(event, context):
    """Resume the Step Functions task that is waiting on a finished Transcribe job.

    The handler reads the job status from the Transcribe API that matches the
    event's "detail-type", fetches (and deletes) the tracking item for that job
    from the DynamoDB table named by the "TableName" environment variable, and
    calls send_task_success with the stored task state plus a
    "transcribeStatus" field.  A FAILED job whose failure reason starts with
    "Internal" is reported as "RETRY" until RETRY_LIMIT retries have been used.

    Args:
        event: The EventBridge event; "detail-type" selects the Transcribe API
            and "detail" carries the job name.
        context: Lambda context object (unused).

    Returns:
        A dict with "statusCode" 200 and a JSON "body" of 'Success.'.  This is
        returned even when the event type is unsupported or no tracking item
        is found.

    Raises:
        KeyError: If the "TableName" environment variable is not set.
    """
    # Our tracking table name is an environment variable
    tracking_table = os.environ["TableName"]

    # Mapping of event type to Transcribe API type, which defines the
    # Transcribe call method and tags to use when looking up the jobs status
    transcribe = boto3.client("transcribe")
    transcribe_api_map = {
        "Transcribe Job State Change": {
            "mode": cf.API_STANDARD,
            "eb_job_name": "TranscriptionJobName",
            "get_job_method": transcribe.get_transcription_job,
            "api_key": "TranscriptionJobName",
            "job_tag": "TranscriptionJob",
            "status_tag": "TranscriptionJobStatus",
        },
        "Call Analytics Job State Change": {
            "mode": cf.API_ANALYTICS,
            "eb_job_name": "JobName",
            "get_job_method": transcribe.get_call_analytics_job,
            "api_key": "CallAnalyticsJobName",
            "job_tag": "CallAnalyticsJob",
            "status_tag": "CallAnalyticsJobStatus",
        },
    }

    # Work out what Transcribe API mode this is - if it's an event type
    # that we don't support (or the type is missing) then quietly exit
    api_map = transcribe_api_map.get(event.get("detail-type"))
    if api_map is not None:
        api_mode = api_map["mode"]

        # Lookup our job metadata and results, then find our DDB matching entry
        try:
            # Lookup the job status
            job_name = event["detail"][api_map["eb_job_name"]]
            job_lookup = api_map["get_job_method"](**{api_map["api_key"]: job_name})
            response = job_lookup[api_map["job_tag"]]
            job_status = response[api_map["status_tag"]]

            # Read tracking entry between Transcribe job and its Step Function
            ddb_client = boto3.client("dynamodb")
            tracking_key = {'PKJobId': {'S': job_name}, 'SKApiMode': {'S': api_mode}}
            tracking = ddb_client.get_item(Key=tracking_key, TableName=tracking_table)

            # It's unlikely, but if we didn't get a value due to some race condition
            # meaning that the job finishes before the token was written then wait
            # for 5 seconds and try again.  Just once.  This may never happen
            if "Item" not in tracking:
                time.sleep(5)
                tracking = ddb_client.get_item(Key=tracking_key, TableName=tracking_table)
        except Exception:
            # If the job status lookup failed (unlikely) or DDB threw an exception,
            # then just say we didn't find a matching record and carry on - but
            # leave a trace in the logs rather than hiding the failure completely
            logger.warning("Job status or tracking lookup failed for event detail %s",
                           event.get("detail"), exc_info=True)
            tracking = {}

        # Did we have a result?  (If so, every name bound in the try block is set)
        if "Item" in tracking:
            # Delete entry in DDB table - there's no way we'll be processing this again
            ddb_client.delete_item(Key=tracking_key, TableName=tracking_table)

            # Extract the Step Functions task and previous event status
            task_token = tracking["Item"]["taskToken"]['S']
            event_status = json.loads(tracking["Item"]["taskState"]['S'])

            # If the job has FAILED then we need to check if it's a service failure,
            # as this can happen, then we want to re-try the job another time
            final_response = job_status
            if job_status == "FAILED":
                # The tracking entry is already deleted, so a missing failure
                # reason must not raise here or the task token would be lost
                error_mesg = response.get("FailureReason", "")
                if error_mesg.startswith("Internal"):
                    # Internal failure - retry until RETRY_LIMIT attempts are used up
                    retry_count = event_status.pop("retryCount", 0)

                    # Not retried enough yet - let's try another time
                    if retry_count < RETRY_LIMIT:
                        event_status["retryCount"] = retry_count + 1
                        final_response = "RETRY"

            # All complete - continue our workflow with this status/retry count
            event_status["transcribeStatus"] = final_response
            sfn_client = boto3.client("stepfunctions")
            sfn_client.send_task_success(taskToken=task_token,
                                         output=json.dumps(event_status))

    # We're always positive... we'll get Transcribe events outside of PCA,
    # so if we didn't find our tracking entry then it shouldn't be an issue
    return {
        'statusCode': 200,
        'body': json.dumps('Success.')
    }


# Main entrypoint for testing
# Note, Status could be COMPLETED or FAILED
if __name__ == "__main__":
    event = {
        'version': '0',
        'id': '0029c6b1-7c8e-1f61-fed5-7ef256b3660b',
        'detail-type': 'Transcribe Job State Change',
        'source': 'aws.transcribe',
        'account': '710514874879',
        'time': '2020-08-05T13:32:03Z',
        'region': 'us-east-1',
        'resources': [],
        'detail': {
            'TranscriptionJobName': 'stereo.mp3',
            'TranscriptionJobStatus': 'COMPLETED'
        }
    }
    os.environ['TableName'] = 'cci-PCAServer-MK00H3MPFXK9-DDB-1DUUJKPYBH0LP-Table-1AOTYJNH0R9RF'
    # ... (the remainder of this snippet is truncated in the source)
Source:CallAnalyticsJob.py  
...33         ]34 )35    36while True:37    status = transcribe.get_call_analytics_job(CallAnalyticsJobName=job_name)38    if status['CallAnalyticsJob']['CallAnalyticsJobStatus'] in ['COMPLETED', 'FAILED']:39        break40    print("Not ready yet...")41    time.sleep(5)...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
