How to use transcribe_client method in localstack

Best Python code snippet using localstack_python

demo.py

Source:demo.py Github

copy

Full Screen

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Purpose

Shows how to use the AWS SDK for Python (Boto3) with the Amazon Transcribe API to
transcribe an audio file to a text file. Also shows how to define a custom vocabulary
to improve the accuracy of the transcription.

This example uses a public domain audio file downloaded from Wikipedia and converted
from .ogg to .mp3 format. The file contains a reading of the poem Jabberwocky by
Lewis Carroll. The original audio source file can be found here:
    https://en.wikisource.org/wiki/File:Jabberwocky.ogg
"""

import logging
import sys
import os
import time
import boto3
from botocore.exceptions import ClientError
import requests
from pydub import AudioSegment
import csv
import moviepy.editor as mp
import json
import codecs
import datetime

# Add relative path to include demo_tools in this code example without need for setup.
sys.path.append('../..')
from demo_tools import CustomWaiter, WaitState

logger = logging.getLogger(__name__)


class TranscribeCompleteWaiter(CustomWaiter):
    """
    Waits for the transcription to complete.
    """
    def __init__(self, client):
        super().__init__(
            'TranscribeComplete', 'GetTranscriptionJob',
            'TranscriptionJob.TranscriptionJobStatus',
            {'COMPLETED': WaitState.SUCCESS, 'FAILED': WaitState.FAILURE},
            client)

    def wait(self, job_name):
        self._wait(TranscriptionJobName=job_name)


class VocabularyReadyWaiter(CustomWaiter):
    """
    Waits for the custom vocabulary to be ready for use.
    """
    def __init__(self, client):
        super().__init__(
            'VocabularyReady', 'GetVocabulary', 'VocabularyState',
            {'READY': WaitState.SUCCESS}, client)

    def wait(self, vocabulary_name):
        self._wait(VocabularyName=vocabulary_name)


def start_job(
        job_name, media_uri, media_format, language_code, transcribe_client,
        vocabulary_name=None, more_settings=None):
    """
    Starts a transcription job. This function returns as soon as the job is started.
    To get the current status of the job, call get_transcription_job. The job is
    successfully completed when the job status is 'COMPLETED'.

    :param job_name: The name of the transcription job. This must be unique for
                     your AWS account.
    :param media_uri: The URI where the audio file is stored. This is typically
                      in an Amazon S3 bucket.
    :param media_format: The format of the audio file. For example, mp3 or wav.
    :param language_code: The language code of the audio file.
                          For example, en-US or ja-JP
    :param transcribe_client: The Boto3 Transcribe client.
    :param vocabulary_name: The name of a custom vocabulary to use when transcribing
                            the audio file.
    :param more_settings: Optional dict of additional entries merged into the
                          'Settings' argument of the job. The dict is copied,
                          never modified.
    :return: Data about the job.
    :raises ClientError: If the service rejects the request (logged, then re-raised).
    """
    try:
        job_args = {
            'TranscriptionJobName': job_name,
            'Media': {'MediaFileUri': media_uri},
            'MediaFormat': media_format,
            'LanguageCode': language_code}
        # Build a fresh dict so neither a shared default nor the caller's
        # dict is ever mutated or aliased into the request.
        settings = {}
        if vocabulary_name is not None:
            settings['VocabularyName'] = vocabulary_name
        if more_settings:
            settings.update(more_settings)
        if settings:
            job_args['Settings'] = settings
        response = transcribe_client.start_transcription_job(**job_args)
        job = response['TranscriptionJob']
        logger.info("Started transcription job %s.", job_name)
    except ClientError:
        logger.exception("Couldn't start transcription job %s.", job_name)
        raise
    else:
        return job


def list_jobs(job_filter, transcribe_client):
    """
    Lists summaries of the transcription jobs for the current AWS account.

    :param job_filter: The list of returned jobs must contain this string in their
                       names.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The list of retrieved transcription job summaries.
    """
    try:
        response = transcribe_client.list_transcription_jobs(
            JobNameContains=job_filter)
        jobs = response['TranscriptionJobSummaries']
        next_token = response.get('NextToken')
        # Keep requesting pages until the service stops returning a token.
        while next_token is not None:
            response = transcribe_client.list_transcription_jobs(
                JobNameContains=job_filter, NextToken=next_token)
            jobs += response['TranscriptionJobSummaries']
            next_token = response.get('NextToken')
        logger.info("Got %s jobs with filter %s.", len(jobs), job_filter)
    except ClientError:
        logger.exception("Couldn't get jobs with filter %s.", job_filter)
        raise
    else:
        return jobs


def get_job(job_name, transcribe_client):
    """
    Gets details about a transcription job.

    :param job_name: The name of the job to retrieve.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The retrieved transcription job.
    """
    try:
        response = transcribe_client.get_transcription_job(
            TranscriptionJobName=job_name)
        job = response['TranscriptionJob']
        logger.info("Got job %s.", job['TranscriptionJobName'])
    except ClientError:
        logger.exception("Couldn't get job %s.", job_name)
        raise
    else:
        return job


def delete_job(job_name, transcribe_client):
    """
    Deletes a transcription job. This also deletes the transcript associated with
    the job.

    :param job_name: The name of the job to delete.
    :param transcribe_client: The Boto3 Transcribe client.
    """
    try:
        transcribe_client.delete_transcription_job(
            TranscriptionJobName=job_name)
        logger.info("Deleted job %s.", job_name)
    except ClientError:
        logger.exception("Couldn't delete job %s.", job_name)
        raise


def create_vocabulary(
        vocabulary_name, language_code, transcribe_client,
        phrases=None, table_uri=None):
    """
    Creates a custom vocabulary that can be used to improve the accuracy of
    transcription jobs. This function returns as soon as the vocabulary processing
    is started. Call get_vocabulary to get the current status of the vocabulary.
    The vocabulary is ready to use when its status is 'READY'.

    :param vocabulary_name: The name of the custom vocabulary.
    :param language_code: The language code of the vocabulary.
                          For example, en-US or nl-NL.
    :param transcribe_client: The Boto3 Transcribe client.
    :param phrases: A list of comma-separated phrases to include in the vocabulary.
                    Takes precedence over table_uri when both are given.
    :param table_uri: A table of phrases and pronunciation hints to include in the
                      vocabulary.
    :return: Information about the newly created vocabulary.
    """
    try:
        vocab_args = {'VocabularyName': vocabulary_name, 'LanguageCode': language_code}
        if phrases is not None:
            vocab_args['Phrases'] = phrases
        elif table_uri is not None:
            vocab_args['VocabularyFileUri'] = table_uri
        response = transcribe_client.create_vocabulary(**vocab_args)
        logger.info("Created custom vocabulary %s.", response['VocabularyName'])
    except ClientError:
        logger.exception("Couldn't create custom vocabulary %s.", vocabulary_name)
        raise
    else:
        return response


def list_vocabularies(vocabulary_filter, transcribe_client):
    """
    Lists the custom vocabularies created for this AWS account.

    :param vocabulary_filter: The returned vocabularies must contain this string in
                              their names.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The list of retrieved vocabularies.
    """
    try:
        response = transcribe_client.list_vocabularies(
            NameContains=vocabulary_filter)
        vocabs = response['Vocabularies']
        next_token = response.get('NextToken')
        # Keep requesting pages until the service stops returning a token.
        while next_token is not None:
            response = transcribe_client.list_vocabularies(
                NameContains=vocabulary_filter, NextToken=next_token)
            vocabs += response['Vocabularies']
            next_token = response.get('NextToken')
        logger.info(
            "Got %s vocabularies with filter %s.", len(vocabs), vocabulary_filter)
    except ClientError:
        logger.exception(
            "Couldn't list vocabularies with filter %s.", vocabulary_filter)
        raise
    else:
        return vocabs


def get_vocabulary(vocabulary_name, transcribe_client):
    """
    Gets information about a custom vocabulary.

    :param vocabulary_name: The name of the vocabulary to retrieve.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: Information about the vocabulary.
    """
    try:
        response = transcribe_client.get_vocabulary(VocabularyName=vocabulary_name)
        logger.info("Got vocabulary %s.", response['VocabularyName'])
    except ClientError:
        logger.exception("Couldn't get vocabulary %s.", vocabulary_name)
        raise
    else:
        return response


def update_vocabulary(
        vocabulary_name, language_code, transcribe_client, phrases=None,
        table_uri=None):
    """
    Updates an existing custom vocabulary. The entire vocabulary is replaced with
    the contents of the update.

    :param vocabulary_name: The name of the vocabulary to update.
    :param language_code: The language code of the vocabulary.
    :param transcribe_client: The Boto3 Transcribe client.
    :param phrases: A list of comma-separated phrases to include in the vocabulary.
                    Takes precedence over table_uri when both are given.
    :param table_uri: A table of phrases and pronunciation hints to include in the
                      vocabulary.
    """
    try:
        vocab_args = {'VocabularyName': vocabulary_name, 'LanguageCode': language_code}
        if phrases is not None:
            vocab_args['Phrases'] = phrases
        elif table_uri is not None:
            vocab_args['VocabularyFileUri'] = table_uri
        response = transcribe_client.update_vocabulary(**vocab_args)
        logger.info(
            "Updated custom vocabulary %s.", response['VocabularyName'])
    except ClientError:
        logger.exception("Couldn't update custom vocabulary %s.", vocabulary_name)
        raise


def delete_vocabulary(vocabulary_name, transcribe_client):
    """
    Deletes a custom vocabulary.

    :param vocabulary_name: The name of the vocabulary to delete.
    :param transcribe_client: The Boto3 Transcribe client.
    """
    try:
        transcribe_client.delete_vocabulary(VocabularyName=vocabulary_name)
        logger.info("Deleted vocabulary %s.", vocabulary_name)
    except ClientError:
        logger.exception("Couldn't delete vocabulary %s.", vocabulary_name)
        raise


def write_results_to_end(file_name, results):
    """
    Writes a transcript to a text file, appending if the file already exists.

    A new file receives the text as-is; an existing file receives the text
    surrounded by newlines so successive transcripts stay separated.

    :param file_name: Path of the text file to create or extend.
    :param results: The transcript text to write.
    """
    is_new_file = not os.path.exists(file_name)
    # Append mode creates the file when it is missing. UTF-8 is explicit so
    # non-ASCII transcripts do not depend on the platform default encoding.
    with open(file_name, 'a', encoding='utf-8') as fd_for_file:
        fd_for_file.write(results if is_new_file else '\n' + results + '\n')


def replace_n(file_name):
    """
    Rewrites a text file in place, replacing every '/n' with a newline.

    :param file_name: Path of the text file to rewrite.
    :raises FileNotFoundError: If the file does not exist.
    """
    with open(file_name, 'r', encoding='utf-8') as read_fd_for_file:
        text = read_fd_for_file.read()
    # NOTE(review): this targets the two characters '/' + 'n', not an escaped
    # '\\n' sequence -- confirm that is the intended marker.
    with open(file_name, 'w', encoding='utf-8') as write_fd_for_file:
        write_fd_for_file.write(text.replace('/n', '\n'))


def _load_aws_config(path='rootkey.csv'):
    """
    Reads KEY=VALUE rows from a credentials file into a dict.

    Rows with fewer than two fields (for example blank lines) are skipped.

    :param path: Path of the '='-delimited credentials file.
    :return: A dict that maps each key to its value.
    """
    with open(path, newline='') as csvfile:
        return {
            row[0]: row[1]
            for row in csv.reader(csvfile, delimiter='=')
            if len(row) >= 2}


def aws_transcribe_loop_dir(path_to_dir, language_code):
    """
    Walks a directory tree and processes every file by extension.

    '.wav' files are converted to '.mp3' next to the original and transcribed,
    '.mp3' files are transcribed directly, and '.txt' files are passed to
    replace_n. Credentials are read from 'rootkey.csv' in the working directory.
    A failure on one file is printed and does not stop the walk.

    :param path_to_dir: Root directory to walk.
    :param language_code: Language code stored in the config under
                          'language_code' and used for every transcription.
    """
    config = _load_aws_config()
    config['language_code'] = language_code
    for path, _subdirs, files in os.walk(path_to_dir):
        for name in files:
            try:
                short_name, extension = os.path.splitext(name)
                full_path = os.path.join(path, name)
                if extension == '.wav':
                    mp3_path = os.path.join(path, short_name + '.mp3')
                    AudioSegment.from_wav(full_path).export(mp3_path, format="mp3")
                    do_aws_transcription(file_name=mp3_path, config=config)
                elif extension == '.mp3':
                    do_aws_transcription(full_path, config)
                elif extension == '.txt':
                    replace_n(full_path)
            except Exception as e:
                # Deliberate best-effort: report and move on to the next file.
                print("error: {}".format(e))
                continue


def remove_buckets():
    """
    Deletes the S3 buckets of the account described by 'rootkey.csv'.

    Every bucket except the hard-coded 'zeev-bucket-1641252030744081000' is
    targeted. For each one, the hard-coded object '20211207-C0015.mp3' is
    deleted first and then the bucket itself. Failures are printed and the
    loop continues with the next bucket.
    """
    config = _load_aws_config()
    s3 = boto3.client(
        's3',
        aws_access_key_id=config['AWSAccessKeyId'],
        aws_secret_access_key=config['AWSSecretKey'])
    # Fall back to an empty list so a response without 'Buckets' is a no-op.
    buckets_list = s3.list_buckets().get('Buckets') or []
    s3resource = boto3.resource(
        's3',
        aws_access_key_id=config['AWSAccessKeyId'],
        aws_secret_access_key=config['AWSSecretKey'])
    for item in buckets_list:
        try:
            if item['Name'] != 'zeev-bucket-1641252030744081000':
                delete_res = s3resource.Object(item['Name'], '20211207-C0015.mp3').delete()
                print(f"delete object response: {delete_res}")
                b = s3resource.Bucket(item['Name'])
                res = b.delete()
                print("delete bucket response: {}".format(res))
        except Exception as e:
            print("delete error : {}".format(e))


def convert_to_mp3(path_to_dir):
    """
    Walks a directory tree and extracts the audio track of every video file.

    Files with a '.mov' or '.mp4' extension (any letter case) get an '.mp3'
    written next to them. A failure on one file is printed and does not stop
    the walk.

    :param path_to_dir: Root directory to walk.
    """
    for path, _subdirs, files in os.walk(path_to_dir):
        for name in files:
            try:
                short_name, extension = os.path.splitext(name)
                if extension.lower() in ('.mov', '.mp4'):
                    full_movie_path = os.path.join(path, name)
                    full_mp3_path = os.path.join(path, short_name + '.mp3')
                    clip = mp.VideoFileClip(full_movie_path)
                    try:
                        clip.audio.write_audiofile(full_mp3_path)
                    finally:
                        # Release the reader resources held by the clip even
                        # when the export fails.
                        clip.close()
            except Exception as err:
                print("error: {}".format(err))
                continue


def parse_transcription_results(file_name):
    """
    Converts a transcription results JSON file into a speaker-labelled text file.

    The input must contain 'speaker_labels' -> 'segments' and 'items' (the
    'results' object that do_aws_transcription saves). The output is written
    next to the input as '<name>_parsed_.txt', one '[h:mm:ss] speaker: text'
    entry per speaker turn, ordered by start time.

    :param file_name: Path of the results JSON file.
    :raises KeyError: If the expected keys are missing from the JSON data.
    """
    name = os.path.splitext(file_name)[0]
    # Read the whole input first so a bad input never leaves an empty output.
    with codecs.open(file_name, 'r', 'utf-8') as source_f:
        data = json.loads(source_f.read())

    # Map the start time of every labelled word to its speaker.
    speaker_start_times = {}
    for label in data['speaker_labels']['segments']:
        for item in label['items']:
            speaker_start_times[item['start_time']] = item['speaker_label']

    lines = []
    line = ''
    start_time = 0
    # None means "no speaker seen yet": nothing is emitted for the time
    # before the first word, and leading punctuation cannot fail.
    speaker = None
    current_speaker = None
    for item in data['items']:
        content = item['alternatives'][0]['content']
        if item.get('start_time'):
            current_speaker = speaker_start_times[item['start_time']]
        elif item['type'] == 'punctuation':
            # Punctuation carries no start time; attach it to the current line.
            line = line + content
        if current_speaker != speaker:
            # Speaker changed: flush the finished turn and start a new one.
            if speaker is not None:
                lines.append({'speaker': speaker, 'line': line, 'time': start_time})
            line = content
            speaker = current_speaker
            start_time = item['start_time']
        elif item['type'] != 'punctuation':
            line = line + ' ' + content
    if speaker is not None:
        lines.append({'speaker': speaker, 'line': line, 'time': start_time})

    sorted_lines = sorted(lines, key=lambda k: float(k['time']))
    with codecs.open(name + '_parsed_' + '.txt', 'w', 'utf-8') as dest_file:
        for line_data in sorted_lines:
            timestamp = datetime.timedelta(
                seconds=int(round(float(line_data['time']))))
            dest_file.write(
                '[' + str(timestamp) + '] ' + line_data['speaker'] + ': '
                + line_data['line'] + '\n\n')


def _get_or_create_bucket(s3_resource, bucket_name, region_name):
    """
    Returns the named bucket, creating it in region_name when it is missing.

    :param s3_resource: The Boto3 S3 resource.
    :param bucket_name: The name of the bucket.
    :param region_name: The Region used as the bucket's location constraint.
    :return: The Boto3 Bucket resource.
    :raises ClientError: For any error other than the bucket already being
                         owned by this account.
    """
    try:
        return s3_resource.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region_name})
    except ClientError as err:
        # The bucket name is fixed, so every call after the first hits this
        # error; reuse the existing bucket instead of failing.
        if err.response['Error']['Code'] != 'BucketAlreadyOwnedByYou':
            raise
        return s3_resource.Bucket(bucket_name)


def do_aws_transcription(file_name, config):
    """
    Shows how to use the Amazon Transcribe service.

    Uploads the media file to the 'zeev-bucket' bucket, runs a transcription
    job with speaker labels, saves the results as '<file>.json' and the
    transcript text as '<file>.txt' next to the media file, and finally prints
    the jobs and vocabularies whose names contain 'Jabber'.

    :param file_name: Path of the local mp3 file to transcribe.
    :param config: Dict with the keys 'AWSAccessKeyId', 'AWSSecretKey' and
                   'language_code'.
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

    s3_resource = boto3.resource(
        's3',
        aws_access_key_id=config['AWSAccessKeyId'],
        aws_secret_access_key=config['AWSSecretKey'])
    transcribe_client = boto3.client(
        'transcribe',
        aws_access_key_id=config['AWSAccessKeyId'],
        aws_secret_access_key=config['AWSSecretKey'],
        region_name='eu-west-1')

    bucket_name = 'zeev-bucket'
    print(f"Creating bucket {bucket_name}.")
    bucket = _get_or_create_bucket(
        s3_resource, bucket_name, transcribe_client.meta.region_name)

    media_file_name = file_name
    # The object key is '<parent directory name>-<file name>'.
    parent_dir = os.path.basename(os.path.dirname(file_name))
    media_object_key = f'{parent_dir}-{os.path.basename(file_name)}'
    pre, _ext = os.path.splitext(media_file_name)
    result_file_name = pre + '.txt'
    print("result file name: " + result_file_name)
    print(f"Uploading media file {media_file_name}.")
    bucket.upload_file(media_file_name, media_object_key)
    media_uri = f's3://{bucket.name}/{media_object_key}'

    job_name_simple = f'Jabber-{time.time_ns()}'
    print(f"Starting transcription job {job_name_simple}.")
    start_job(
        job_name_simple, media_uri, 'mp3', config['language_code'],
        transcribe_client,
        more_settings={'ShowSpeakerLabels': True, 'MaxSpeakerLabels': 5})
    transcribe_waiter = TranscribeCompleteWaiter(transcribe_client)
    transcribe_waiter.wait(job_name_simple)
    job_simple = get_job(job_name_simple, transcribe_client)
    # transcription result
    transcript_simple = requests.get(
        job_simple['Transcript']['TranscriptFileUri']).json()
    print(f"Transcript for job {transcript_simple['jobName']}:")
    for item in transcript_simple['results']['transcripts']:
        print("transcript item: {}".format(item))
    print('-'*88)

    results_json_path = pre + '.json'
    with open(results_json_path, "w", encoding='utf-8') as outfile:
        json.dump(transcript_simple.get('results', {"no": "results"}), outfile)
    for item in transcript_simple['results']['transcripts']:
        write_results_to_end(result_file_name, item['transcript'])

    # print("Creating a custom vocabulary that lists the nonsense words to try to "
    #       "improve the transcription.")
    # vocabulary_name = f'Jabber-vocabulary-{time.time_ns()}'
    # create_vocabulary(
    #     vocabulary_name, config['language_code'], transcribe_client,
    #     phrases=[
    #         'יהודים','עברית','שפה', 'דקות','רוסיה','לצאת','עלייה',],
    # )
    # vocabulary_ready_waiter = VocabularyReadyWaiter(transcribe_client)
    # vocabulary_ready_waiter.wait(vocabulary_name)
    #
    # job_name_vocabulary_list = f'Jabber-vocabulary-list-{time.time_ns()}'
    # print(f"Starting transcription job {job_name_vocabulary_list}.")
    # start_job(
    #     job_name_vocabulary_list, media_uri, 'mp3', 'en-US', transcribe_client,
    #     vocabulary_name)
    # transcribe_waiter.wait(job_name_vocabulary_list)
    # job_vocabulary_list = get_job(job_name_vocabulary_list, transcribe_client)
    # transcript_vocabulary_list = requests.get(
    #     job_vocabulary_list['Transcript']['TranscriptFileUri']).json()
    # print(f"Transcript for job {transcript_vocabulary_list['jobName']}:")
    # print(transcript_vocabulary_list['results']['transcripts'][0]['transcript'])
    # print('-'*88)
    # print("Updating the custom vocabulary with table data that provides additional "
    #       "pronunciation hints.")
    # table_vocab_file = 'jabber-vocabulary-table.txt'
    # bucket.upload_file(table_vocab_file, table_vocab_file)
    # update_vocabulary(
    #     vocabulary_name, 'en-US', transcribe_client,
    #     table_uri=f's3://{bucket.name}/{table_vocab_file}')
    # vocabulary_ready_waiter.wait(vocabulary_name)
    # job_name_vocab_table = f'Jabber-vocab-table-{time.time_ns()}'
    # print(f"Starting transcription job {job_name_vocab_table}.")
    # start_job(
    #     job_name_vocab_table, media_uri, 'mp3', 'he-IL', transcribe_client,
    #     vocabulary_name=vocabulary_name)
    # transcribe_waiter.wait(job_name_vocab_table)
    # job_vocab_table = get_job(job_name_vocab_table, transcribe_client)
    # transcript_vocab_table = requests.get(
    #     job_vocab_table['Transcript']['TranscriptFileUri']).json()
    # print(f"Transcript for job {transcript_vocab_table['jobName']}:")
    # print(transcript_vocab_table['results']['transcripts'][0]['transcript'])

    print('-'*88)
    print("Getting data for jobs and vocabularies.")
    jabber_jobs = list_jobs('Jabber', transcribe_client)
    print(f"Found {len(jabber_jobs)} jobs:")
    for job_sum in jabber_jobs:
        job = get_job(job_sum['TranscriptionJobName'], transcribe_client)
        # A listed job may have been started without any settings.
        print(f"\t{job['TranscriptionJobName']}, {job['Media']['MediaFileUri']}, "
              f"{job.get('Settings', {}).get('VocabularyName')}")

    jabber_vocabs = list_vocabularies('Jabber', transcribe_client)
    print(f"Found {len(jabber_vocabs)} vocabularies:")
    for vocab_sum in jabber_vocabs:
        vocab = get_vocabulary(vocab_sum['VocabularyName'], transcribe_client)
        vocab_content = requests.get(vocab['DownloadUri']).text
        print(f"\t{vocab['VocabularyName']} contents:")
        print(vocab_content)
    print('-'*88)

    # for job_name in [job_name_simple]:
    #     delete_job(job_name, transcribe_client)
    # delete_vocabulary(transcribe_client)
    # bucket.objects.delete()
    # bucket.delete()


if __name__ == '__main__':
    import argparse
    try:
        parser = argparse.ArgumentParser(
            description="This script transcribes all mp3 files in given working dir (wd)")
        # parser.add_argument('-key_id', help='aws access key id', required=True)
        # parser.add_argument('-key_value', help='aws access key id value', required=True)
        parser.add_argument('-language_code', help='specify language_code', required=False)
        parser.add_argument('-wd', help='specify root directory from which to run script', required=True)
        args = vars(parser.parse_args())
        # aws_transcribe_loop_dir(args['wd'], args.get('language_code', None))
        # convert_to_mp3(args['wd'])
        # remove_buckets()
        parse_transcription_results(args['wd'])
    except Exception as e:
        # NOTE(review): the handler body is elided ("...") in the visible
        # source; the placeholder is kept rather than guessing at it.
        ...

Full Screen

Full Screen

transcribe_basics.py

Source:transcribe_basics.py Github

copy

Full Screen

from custom_waiter import CustomWaiter, WaitState
import logging
import sys
import time
# NOTE(review): 'result' looks unused (Transcribe shadows it with a local
# variable); the import is kept because only part of the file is visible.
from unittest import result
import boto3
from botocore.exceptions import ClientError
import requests
import os

region = "us-east-1"

# Shared Boto3 session; credentials come from the AWS_ACCESS_KEY and
# AWS_SECRET_KEY environment variables.
session = boto3.Session(
    aws_access_key_id=os.environ.get('AWS_ACCESS_KEY'),
    aws_secret_access_key=os.environ.get('AWS_SECRET_KEY'),
    region_name=region
)

# Name of the bucket used for uploads; None when S3_BUCKET is not set.
S3_BUCKET = os.environ.get('S3_BUCKET')

sys.path.append('../..')

logger = logging.getLogger(__name__)


class TranscribeCompleteWaiter(CustomWaiter):
    """
    Waits for the transcription to complete.
    """
    def __init__(self, client):
        super().__init__(
            'TranscribeComplete', 'GetTranscriptionJob',
            'TranscriptionJob.TranscriptionJobStatus',
            {'COMPLETED': WaitState.SUCCESS, 'FAILED': WaitState.FAILURE},
            client)

    def wait(self, job_name):
        self._wait(TranscriptionJobName=job_name)


class VocabularyReadyWaiter(CustomWaiter):
    """
    Waits for the custom vocabulary to be ready for use.
    """
    def __init__(self, client):
        super().__init__(
            'VocabularyReady', 'GetVocabulary', 'VocabularyState',
            {'READY': WaitState.SUCCESS}, client)

    def wait(self, vocabulary_name):
        self._wait(VocabularyName=vocabulary_name)


def start_job(job_name, media_uri, media_format, language_code, transcribe_client, vocabulary_name=None):
    """
    Starts a transcription job and returns as soon as it is started.

    :param job_name: The name of the transcription job.
    :param media_uri: The URI where the audio file is stored.
    :param media_format: The format of the audio file. For example, mp3 or wav.
    :param language_code: The language code of the audio file.
    :param transcribe_client: The Boto3 Transcribe client.
    :param vocabulary_name: Optional name of a custom vocabulary to use.
    :return: Data about the job.
    :raises ClientError: If the service rejects the request (logged, then re-raised).
    """
    try:
        job_args = {
            'TranscriptionJobName': job_name,
            'Media': {'MediaFileUri': media_uri},
            'MediaFormat': media_format,
            'LanguageCode': language_code}
        if vocabulary_name is not None:
            job_args['Settings'] = {'VocabularyName': vocabulary_name}
        response = transcribe_client.start_transcription_job(**job_args)
        job = response['TranscriptionJob']
        logger.info("Started transcription job %s.", job_name)
    except ClientError:
        logger.exception("Couldn't start transcription job %s.", job_name)
        raise
    else:
        return job


def list_jobs(job_filter, transcribe_client):
    """
    Lists summaries of the transcription jobs whose names contain job_filter.

    :param job_filter: Substring that the returned job names must contain.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The list of retrieved transcription job summaries.
    """
    try:
        response = transcribe_client.list_transcription_jobs(
            JobNameContains=job_filter)
        jobs = response['TranscriptionJobSummaries']
        next_token = response.get('NextToken')
        # Keep requesting pages until the service stops returning a token.
        while next_token is not None:
            response = transcribe_client.list_transcription_jobs(
                JobNameContains=job_filter, NextToken=next_token)
            jobs += response['TranscriptionJobSummaries']
            next_token = response.get('NextToken')
        logger.info("Got %s jobs with filter %s.", len(jobs), job_filter)
    except ClientError:
        logger.exception("Couldn't get jobs with filter %s.", job_filter)
        raise
    else:
        return jobs


def get_job(job_name, transcribe_client):
    """
    Gets details about a transcription job.

    :param job_name: The name of the job to retrieve.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The retrieved transcription job.
    """
    try:
        response = transcribe_client.get_transcription_job(
            TranscriptionJobName=job_name)
        job = response['TranscriptionJob']
        logger.info("Got job %s.", job['TranscriptionJobName'])
    except ClientError:
        logger.exception("Couldn't get job %s.", job_name)
        raise
    else:
        return job


def delete_job(job_name, transcribe_client):
    """
    Deletes a transcription job.

    :param job_name: The name of the job to delete.
    :param transcribe_client: The Boto3 Transcribe client.
    """
    try:
        transcribe_client.delete_transcription_job(
            TranscriptionJobName=job_name)
        logger.info("Deleted job %s.", job_name)
    except ClientError:
        logger.exception("Couldn't delete job %s.", job_name)
        raise


def create_vocabulary(
        vocabulary_name, language_code, transcribe_client,
        phrases=None, table_uri=None):
    """
    Creates a custom vocabulary from a phrase list or a table file.

    :param vocabulary_name: The name of the custom vocabulary.
    :param language_code: The language code of the vocabulary.
    :param transcribe_client: The Boto3 Transcribe client.
    :param phrases: Phrases to include. Takes precedence over table_uri.
    :param table_uri: URI of a vocabulary table file, used when phrases is None.
    :return: The service response for the newly created vocabulary.
    """
    try:
        vocab_args = {'VocabularyName': vocabulary_name,
                      'LanguageCode': language_code}
        if phrases is not None:
            vocab_args['Phrases'] = phrases
        elif table_uri is not None:
            vocab_args['VocabularyFileUri'] = table_uri
        response = transcribe_client.create_vocabulary(**vocab_args)
        logger.info("Created custom vocabulary %s.",
                    response['VocabularyName'])
    except ClientError:
        logger.exception(
            "Couldn't create custom vocabulary %s.", vocabulary_name)
        raise
    else:
        return response


def list_vocabularies(vocabulary_filter, transcribe_client):
    """
    Lists the custom vocabularies whose names contain vocabulary_filter.

    :param vocabulary_filter: Substring that the returned names must contain.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The list of retrieved vocabularies.
    """
    try:
        response = transcribe_client.list_vocabularies(
            NameContains=vocabulary_filter)
        vocabs = response['Vocabularies']
        next_token = response.get('NextToken')
        # Keep requesting pages until the service stops returning a token.
        while next_token is not None:
            response = transcribe_client.list_vocabularies(
                NameContains=vocabulary_filter, NextToken=next_token)
            vocabs += response['Vocabularies']
            next_token = response.get('NextToken')
        logger.info(
            "Got %s vocabularies with filter %s.", len(vocabs), vocabulary_filter)
    except ClientError:
        logger.exception(
            "Couldn't list vocabularies with filter %s.", vocabulary_filter)
        raise
    else:
        return vocabs


def get_vocabulary(vocabulary_name, transcribe_client):
    """
    Gets information about a custom vocabulary.

    :param vocabulary_name: The name of the vocabulary to retrieve.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The service response that describes the vocabulary.
    """
    try:
        response = transcribe_client.get_vocabulary(
            VocabularyName=vocabulary_name)
        logger.info("Got vocabulary %s.", response['VocabularyName'])
    except ClientError:
        logger.exception("Couldn't get vocabulary %s.", vocabulary_name)
        raise
    else:
        return response


def update_vocabulary(
        vocabulary_name, language_code, transcribe_client, phrases=None,
        table_uri=None):
    """
    Updates an existing custom vocabulary from a phrase list or a table file.

    :param vocabulary_name: The name of the vocabulary to update.
    :param language_code: The language code of the vocabulary.
    :param transcribe_client: The Boto3 Transcribe client.
    :param phrases: Phrases to include. Takes precedence over table_uri.
    :param table_uri: URI of a vocabulary table file, used when phrases is None.
    """
    try:
        vocab_args = {'VocabularyName': vocabulary_name,
                      'LanguageCode': language_code}
        if phrases is not None:
            vocab_args['Phrases'] = phrases
        elif table_uri is not None:
            vocab_args['VocabularyFileUri'] = table_uri
        response = transcribe_client.update_vocabulary(**vocab_args)
        logger.info("Updated custom vocabulary %s.",
                    response['VocabularyName'])
    except ClientError:
        logger.exception(
            "Couldn't update custom vocabulary %s.", vocabulary_name)
        raise


def delete_vocabulary(vocabulary_name, transcribe_client):
    """
    Deletes a custom vocabulary.

    :param vocabulary_name: The name of the vocabulary to delete.
    :param transcribe_client: The Boto3 Transcribe client.
    """
    try:
        transcribe_client.delete_vocabulary(VocabularyName=vocabulary_name)
        logger.info("Deleted vocabulary %s.", vocabulary_name)
    except ClientError:
        logger.exception("Couldn't delete vocabulary %s.", vocabulary_name)
        raise


def upload_bucket(bucket_name, local_file_path, obj_key):
    """
    Uploads a local file to an existing S3 bucket.

    :param bucket_name: The name of the target bucket. It is not created here.
    :param local_file_path: Path of the local file to upload.
    :param obj_key: The object key to store the file under.
    :return: The 's3://<bucket>/<key>' URI of the uploaded object.
    """
    s3_resource = session.resource('s3')
    # The previous message said "Creating bucket", but nothing is created.
    print(f"Uploading {local_file_path} to bucket {bucket_name}.")
    s3_resource.meta.client.upload_file(local_file_path, bucket_name, obj_key)
    media_uri = f's3://{bucket_name}/{obj_key}'
    return media_uri


def Transcribe(local_file_path, object_key, media_format='wav', language_code='ar-AE'):
    """
    Transcribes a local audio file and stores the transcript in S3.

    The audio is uploaded to S3_BUCKET and transcribed. The job is then
    deleted, and the transcript is written to '<object_key>.txt' and uploaded
    to S3_BUCKET. Both local files are removed afterwards.

    :param local_file_path: Path of the local audio file. It is deleted at the end.
    :param object_key: Base name of the transcript text file.
    :param media_format: The format of the audio file. Defaults to 'wav'.
    :param language_code: The language code of the audio. Defaults to 'ar-AE'.
    :return: The transcript text.
    """
    transcribe_client = session.client('transcribe')
    # NOTE(review): the audio is uploaded under local_file_path, not
    # object_key -- confirm this is intended.
    media_uri = upload_bucket(S3_BUCKET, local_file_path, local_file_path)
    job_name_simple = f'demo-{time.time_ns()}'
    print(f"Starting transcription job {job_name_simple}")
    start_job(job_name_simple, media_uri, media_format, language_code, transcribe_client)
    transcribe_waiter = TranscribeCompleteWaiter(transcribe_client)
    transcribe_waiter.wait(job_name_simple)
    job_simple = get_job(job_name_simple, transcribe_client)
    transcript_simple = requests.get(
        job_simple['Transcript']['TranscriptFileUri']).json()
    print(f"Transcript for job {transcript_simple['jobName']}:")
    result = transcript_simple['results']['transcripts'][0]['transcript']
    delete_job(job_name_simple, transcribe_client)
    print(result)
    file_url = object_key + ".txt"
    createfile(file_url, result)
    print("reed from text file" + '-'*88)
    url = upload_bucket(S3_BUCKET, file_url, file_url)
    print("uri link " + url)
    os.remove(local_file_path)
    os.remove(file_url)
    return result


def createfile(url, text):
    """
    Writes text to a file as UTF-8, replacing any existing content.

    :param url: Path of the file to write.
    :param text: The text to write.
    """
    # The context manager closes (and flushes) the file before the caller
    # uploads and removes it; the old code never actually called close().
    with open(url, "w+", encoding='utf-8') as f:
        f.write(text)


def read(path_to_file):
    """
    Reads all lines of a text file.

    :param path_to_file: Path of the file to read.
    """
    with open(path_to_file) as f:
        contents = f.readlines()
    # NOTE(review): the rest of this function is elided ("...") in the visible
    # source, so what it does with 'contents' is unknown and not guessed here.

Full Screen

Full Screen

transcribe.py

Source:transcribe.py Github

copy

Full Screen

# Program based on transcribe_basics.py, used as a quick helper.
# https://docs.aws.amazon.com/code-samples/latest/catalog/python-transcribe-transcribe_basics.py.html

import logging
import boto3
from time import time_ns
from botocore.exceptions import ClientError
from lib.tools.custom_waiter import CustomWaiter, WaitState
import requests
import sys

sys.path.append('../')

logger = logging.getLogger(__name__)


class TranscribeCompleteWaiter(CustomWaiter):
    """
    Waits for the transcription to complete.
    """
    def __init__(self, client):
        super().__init__(
            'TranscribeComplete',
            'GetTranscriptionJob',
            'TranscriptionJob.TranscriptionJobStatus',
            {'COMPLETED': WaitState.SUCCESS, 'FAILED': WaitState.FAILURE},
            client)

    def wait(self, job_name):
        self._wait(TranscriptionJobName=job_name)


def start_job(job_name, media_uri, media_format, language_code, transcribe_client, vocabulary_name=None):
    """
    Starts a transcription job. This function returns as soon as the job is started.
    To get the current status of the job, call get_transcription_job. The job is
    successfully completed when the job status is 'COMPLETED'.

    Speaker labels are always requested, with at most 2 speakers.

    :param job_name: The name of the transcription job. This must be unique for your AWS account.
    :param media_uri: The URI where the audio file is stored. This is typically in an Amazon S3 bucket.
    :param media_format: The format of the audio file. For example, mp3 or wav.
    :param language_code: The language code of the audio file. For example, en-US or ja-JP
    :param transcribe_client: The Boto3 Transcribe client.
    :param vocabulary_name: The name of a custom vocabulary to use when transcribing the audio file.
    :return: Data about the job.
    :raises ClientError: If the service rejects the request (logged, then re-raised).
    """
    try:
        job_args = {
            'TranscriptionJobName': job_name,
            'Media': {'MediaFileUri': media_uri},
            'MediaFormat': media_format,
            'LanguageCode': language_code,
            'Settings': {
                'MaxSpeakerLabels': 2,
                'ShowSpeakerLabels': True}
        }
        if vocabulary_name is not None:
            # Add the vocabulary to the existing settings; replacing the whole
            # dict would silently drop the speaker-label options.
            job_args['Settings']['VocabularyName'] = vocabulary_name
        response = transcribe_client.start_transcription_job(**job_args)
        job = response['TranscriptionJob']
        logger.info("Started transcription job %s.", job_name)
    except ClientError:
        logger.exception("Couldn't start transcription job %s.", job_name)
        raise
    else:
        return job


def get_job(job_name, transcribe_client):
    """
    Gets details about a transcription job.

    :param job_name: The name of the job to retrieve.
    :param transcribe_client: The Boto3 Transcribe client.
    :return: The retrieved transcription job.
    """
    try:
        response = transcribe_client.get_transcription_job(
            TranscriptionJobName=job_name)
        job = response['TranscriptionJob']
        logger.info("Got job %s.", job['TranscriptionJobName'])
    except ClientError:
        logger.exception("Couldn't get job %s.", job_name)
        raise
    else:
        return job


def delete_job(job_name, transcribe_client):
    """
    Deletes a transcription job. This also deletes the transcript associated with
    the job.

    :param job_name: The name of the job to delete.
    :param transcribe_client: The Boto3 Transcribe client.
    """
    try:
        transcribe_client.delete_transcription_job(
            TranscriptionJobName=job_name)
        logger.info("Deleted job %s.", job_name)
    except ClientError:
        logger.exception("Couldn't delete job %s.", job_name)
        raise


def transcribe(media_file_name, media_object_key):
    """
    Transcribes a local media file with Amazon Transcribe.

    Creates a temporary, uniquely named bucket, uploads the media file, runs
    an 'es-ES' transcription job, downloads the transcript JSON, and deletes
    the job. The bucket and its contents are always deleted, including when
    an intermediate step fails.

    :param media_file_name: Path of the local media file to upload.
    :param media_object_key: Object key for the upload. The text after its
                             last '.' is passed as the media format.
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

    s3_resource = boto3.resource('s3')
    transcribe_client = boto3.client('transcribe')

    print("Welcome to the Amazon Transcribe!")
    print('-' * 100)

    region_name = transcribe_client.meta.region_name
    bucket_name = f'el-analista-bucket-{time_ns()}'
    print(f"Creating bucket {bucket_name} in {region_name} region.")
    create_args = {'Bucket': bucket_name}
    # us-east-1 is the default location and is rejected as an explicit
    # LocationConstraint, so the configuration is sent for other Regions only.
    if region_name != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {
            'LocationConstraint': region_name}
    bucket = s3_resource.create_bucket(**create_args)

    try:
        print(f"Uploading media file {media_file_name}.")
        bucket.upload_file(media_file_name, media_object_key)
        media_uri = f's3://{bucket.name}/{media_object_key}'

        job_name_simple = f'el-analista-{time_ns()}'
        print(f"Starting transcription job {job_name_simple}.")
        start_job(
            job_name_simple,
            media_uri,
            media_object_key[media_object_key.rfind('.')+1:],
            'es-ES',
            transcribe_client)
        transcribe_waiter = TranscribeCompleteWaiter(transcribe_client)
        transcribe_waiter.wait(job_name_simple)
        job_simple = get_job(job_name_simple, transcribe_client)
        transcript_simple = requests.get(
            job_simple['Transcript']['TranscriptFileUri']).json()

        print('-'*88)
        print("Deleting jobs.")
        for job_name in [job_name_simple]:
            delete_job(job_name, transcribe_client)
    finally:
        # Always remove the temporary bucket so a failed run does not leak it.
        print("Deleting bucket.")
        bucket.objects.delete()
        bucket.delete()

    # print('*' * 20)
    # print(transcript_simple)
    # print('*' * 20)
    # NOTE(review): the rest of this function is elided ("...") in the visible
    # source; whatever it does with 'transcript_simple' is not shown here.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful