Best Python code snippet using autotest_python
translator_pb2.py
Source:translator_pb2.py  
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: translator.proto
"""Generated protocol buffer code."""
# NOTE(review): this listing was recovered from a page dump in which display
# line numbers were fused into the code and newlines were lost. Only the
# layout was restored; every generated token, byte string and offset below is
# unchanged. Regenerate from translator.proto rather than editing by hand.
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()


# File-level descriptor for translator.proto (package translator_grpc, proto3).
DESCRIPTOR = _descriptor.FileDescriptor(
  name='translator.proto',
  package='translator_grpc',
  syntax='proto3',
  serialized_options=b'\n\032com.xyphuz.translator_grpcB\016TranslatorGRPCP\001\242\002\004CXTG',
  create_key=_descriptor._internal_create_key,
  serialized_pb=b'\n\x10translator.proto\x12\x0ftranslator_grpc\"Q\n\x10TranslateRequest\x12\x15\n\rsourceContent\x18\x01 \x01(\t\x12\x12\n\nsourceLang\x18\x02 \x01(\t\x12\x12\n\ntargetLang\x18\x03 \x01(\t\" \n\x0eTranslateReply\x12\x0e\n\x06result\x18\x01 \x01(\t2_\n\nTranslator\x12Q\n\tTranslate\x12!.translator_grpc.TranslateRequest\x1a\x1f.translator_grpc.TranslateReply\"\x00\x42\x35\n\x1a\x63om.xyphuz.translator_grpcB\x0eTranslatorGRPCP\x01\xa2\x02\x04\x43XTGb\x06proto3'
)




# Message translator_grpc.TranslateRequest with three fields:
# sourceContent (1), sourceLang (2), targetLang (3). All use type=9, which is
# TYPE_STRING in protobuf's FieldDescriptor constants.
_TRANSLATEREQUEST = _descriptor.Descriptor(
  name='TranslateRequest',
  full_name='translator_grpc.TranslateRequest',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  create_key=_descriptor._internal_create_key,
  fields=[
    _descriptor.FieldDescriptor(
      name='sourceContent', full_name='translator_grpc.TranslateRequest.sourceContent', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
    _descriptor.FieldDescriptor(
      name='sourceLang', full_name='translator_grpc.TranslateRequest.sourceLang', index=1,
      number=2, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
    _descriptor.FieldDescriptor(
      name='targetLang', full_name='translator_grpc.TranslateRequest.targetLang', index=2,
      number=3, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  serialized_options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=37,
  serialized_end=118,
)


# Message translator_grpc.TranslateReply with a single field: result (1).
_TRANSLATEREPLY = _descriptor.Descriptor(
  name='TranslateReply',
  full_name='translator_grpc.TranslateReply',
  filename=None,
  file=DESCRIPTOR,
  containing_type=None,
  create_key=_descriptor._internal_create_key,
  fields=[
    _descriptor.FieldDescriptor(
      name='result', full_name='translator_grpc.TranslateReply.result', index=0,
      number=1, type=9, cpp_type=9, label=1,
      has_default_value=False, default_value=b"".decode('utf-8'),
      message_type=None, enum_type=None, containing_type=None,
      is_extension=False, extension_scope=None,
      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
  ],
  extensions=[
  ],
  nested_types=[],
  enum_types=[
  ],
  serialized_options=None,
  is_extendable=False,
  syntax='proto3',
  extension_ranges=[],
  oneofs=[
  ],
  serialized_start=120,
  serialized_end=152,
)

DESCRIPTOR.message_types_by_name['TranslateRequest'] = _TRANSLATEREQUEST
DESCRIPTOR.message_types_by_name['TranslateReply'] = _TRANSLATEREPLY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

# Concrete message classes built from the descriptors above and registered
# with the default symbol database.
TranslateRequest = _reflection.GeneratedProtocolMessageType('TranslateRequest', (_message.Message,), {
  'DESCRIPTOR' : _TRANSLATEREQUEST,
  '__module__' : 'translator_pb2'
  # @@protoc_insertion_point(class_scope:translator_grpc.TranslateRequest)
  })
_sym_db.RegisterMessage(TranslateRequest)

TranslateReply = _reflection.GeneratedProtocolMessageType('TranslateReply', (_message.Message,), {
  'DESCRIPTOR' : _TRANSLATEREPLY,
  '__module__' : 'translator_pb2'
  # @@protoc_insertion_point(class_scope:translator_grpc.TranslateReply)
  })
_sym_db.RegisterMessage(TranslateReply)


DESCRIPTOR._options = None

# Service translator_grpc.Translator with one method, Translate, taking a
# TranslateRequest and producing a TranslateReply.
_TRANSLATOR = _descriptor.ServiceDescriptor(
  name='Translator',
  full_name='translator_grpc.Translator',
  file=DESCRIPTOR,
  index=0,
  serialized_options=None,
  create_key=_descriptor._internal_create_key,
  serialized_start=154,
  serialized_end=249,
  methods=[
  _descriptor.MethodDescriptor(
    name='Translate',
    full_name='translator_grpc.Translator.Translate',
    index=0,
    containing_service=None,
    input_type=_TRANSLATEREQUEST,
    output_type=_TRANSLATEREPLY,
    serialized_options=None,
    create_key=_descriptor._internal_create_key,
  ),
])
_sym_db.RegisterServiceDescriptor(_TRANSLATOR)

DESCRIPTOR.services_by_name['Translator'] = _TRANSLATOR
# ... (listing truncated here in the source page; next listing: app.py)
Source:app.py  
import functools
import os
from typing import Optional

import requests
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from google.cloud import translate
from google.transliteration import transliterate_text
from indictrans import Transliterator
from pydantic import BaseModel


def gct_action(f):
    """Bind a Google Cloud Translation client and default settings to *f*.

    The returned wrapper is called as ``wrapper(text, **kwargs)`` and forwards
    to ``f(client, text, parent=parent, **kwargs)``. When the caller supplies
    no ``model`` keyword (or a falsy one), the general NMT model under the
    default parent is filled in.
    """
    project_id = "akshayk-poc"
    location = "global"
    parent = f"projects/{project_id}/locations/{location}"
    model = f"{parent}/models/general/nmt"
    # The client is created once, when the decorator is applied.
    client = translate.TranslationServiceClient()

    @functools.wraps(f)  # keep the wrapped function's name and docstring
    def do_action(text: str, **kwargs):
        if not kwargs.get('model'):
            kwargs['model'] = model
        return f(client, text, parent=parent, **kwargs)

    return do_action


@gct_action
def translate_text(client, text, parent=None, source="en", target="en", **kwargs):
    """Translate *text* from *source* to *target* and return the first result.

    Args:
        client: Translation service client (injected by ``gct_action``).
        text: The text to translate.
        parent: Resource parent; replaced by the first four path segments of
            ``model`` when a model name is given.
        source: Source language code.
        target: Target language code.
        **kwargs: May carry ``model``, a full model resource name.

    Returns:
        The translated text of the first translation in the response, or
        ``None`` when the response holds no translations.

    Raises:
        ValueError: If ``model`` has fewer than six ``/``-separated segments.
    """
    # print("Target: ", target)
    # Read the model up front so the name is always bound; previously a call
    # without ``model`` hit an UnboundLocalError when building the request.
    model = kwargs.get('model')
    if model:
        parts = model.split('/')
        if len(parts) < 6:
            raise ValueError("Invalid model name")
        # The model's own project/location becomes the request parent.
        parent = '/'.join(parts[:4])

    request = {"parent": parent}
    if model:
        request["model"] = model
    request.update({
        "contents": [text],
        "source_language_code": source,
        "target_language_code": target,
    })
    print(request)
    response = client.translate_text(request=request)
    return next(
        (translation.translated_text for translation in response.translations),
        None,
    )


@gct_action
def detect_lang(client, text, parent=None, **kwargs):
    """Return the language code of the first language detected in *text*.

    Returns ``None`` when the response lists no languages.
    """
    response = client.detect_language(
        request={
            "parent": parent,
            "content": text,
        }
    )
    return next(
        (language.language_code for language in response.languages),
        None,
    )


app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


class TranslateRequest(BaseModel):
    """Request body shared by the language-detection and translation routes."""

    text: str
    target_lang: Optional[str] = "en"


@app.get("/")
async def root():
    """Return a fixed greeting payload."""
    return {"message": "Hello World"}


@app.post("/api/getLang")
async def getLang(req: TranslateRequest):
    """Return the detected language code for ``req.text``."""
    return detect_lang(req.text)


@app.post("/api/translate")
async def translateHandler(req: TranslateRequest):
    """Detect, transliterate, translate and re-transliterate ``req.text``.

    Steps, as visible in this listing: detect the source language;
    transliterate the input when it is not English; translate with a custom
    model when one is mapped for the (source, target) pair, otherwise with the
    default model; finally transliterate a non-English result to 'eng'.
    """
    # Two-letter codes mapped to the three-letter codes passed to Transliterator.
    lang_map = {
        'hi': 'hin',
        'kn': 'kan',
        'en': 'eng',
    }
    # Custom models keyed by source language, then target language.
    model_map = {
        "en": {
            "hi": "projects/891447297173/locations/us-central1/models/TRL4307101506324135936",
        },
    }

    src_lang = detect_lang(req.text)
    print("Transliterating", req.text, "to", src_lang)
    src_text_translit = req.text
    if src_lang != 'en':
        src_text_translit = transliterate_text(req.text, lang_code=src_lang)
    print("Transliterated text:", src_text_translit)

    print("Translating", src_text_translit, "to", req.target_lang)
    model = model_map.get(src_lang, {}).get(req.target_lang)
    if model:
        ret = translate_text(src_text_translit, target=req.target_lang, model=model)
    else:
        ret = translate_text(src_text_translit, source=src_lang, target=req.target_lang)
    print("Translated text:", ret)

    # Debug output only: use .get so an unmapped detected language does not
    # raise KeyError after the translation has already succeeded.
    print(lang_map.get(src_lang))
    if req.target_lang != 'en':
        t = Transliterator(source=lang_map[req.target_lang], target='eng')
        ret = t.transform(ret)
    print(ret)
    # NOTE(review): the source page cuts the listing off after this point
    # ("..."); no return statement is visible, so none is added here. Confirm
    # against the full file.
# ... (listing truncated here in the source page; next listing: consumers.py)
Source:consumers.py  
import json
import logging

from channels.sessions import channel_session
# from translator.models import TranslateRequest
from lingglemt.linggletranslate import spg_translate_with_progress

log = logging.getLogger(__name__)

# def http_consumer(message):
#     # Decode the request from message format to a Request object
#     django_request = AsgiRequest(message)
#     # Run view
#     django_response = view(django_request)
#     # Encode the response into message format
#     for chunk in AsgiHandler.encode_response(django_response):
#         message.reply_channel.send(chunk)


@channel_session
def ws_connect(message):
    """Handle a websocket connect; log and ignore paths not under 'translate'."""
    # First path segment, e.g. '/translate/x/' -> 'translate'.
    prefix = message['path'].strip('/').split('/')[0]
    if prefix != 'translate':
        log.debug('invalid ws path=%s', message['path'])
        return
    # new_request = TranslateRequest.objects.create()
    # message.channel_session['rid'] = new_request.id


@channel_session
def ws_receive(message):
    """Translate the received text and stream each progress update back.

    The incoming frame's 'text' is parsed as JSON; its 'text' entry is passed
    to ``spg_translate_with_progress`` and every ``(status, content)`` pair it
    yields is sent to the reply channel as ``{"type": ..., "content": ...}``.
    """
    def reply_json(status, content):
        # Serialise one progress update and push it to the client.
        msg = {'type': status, 'content': content}
        message.reply_channel.send({'text': json.dumps(msg)})

    # Look up the room from the channel session, bailing if it doesn't exist
    # try:
    #     rid = message.channel_session['rid']
    #     request = TranslateRequest.objects.get(id=rid)
    # except KeyError:
    #     log.debug('no rid in channel_session')
    #     return
    # except TranslateRequest.DoesNotExist:
    #     log.debug('recieved message, but request does not exist rid=%s', rid)
    #     return
    data = json.loads(message['text'])
    # request.source = data['text']
    for status, content in spg_translate_with_progress(data['text']):
        # TODO: if new request comes, it should be interrupted.
        # if request.source is not data['text']:
        #     return
        reply_json(status, content)
    # request.result = content
    # try:
    #     for status, content in spg_translate(request.source):
    #         reply_json(status, content)
    #     request.result = content
    # except Exception as e:
    #     message.reply_channel.send({'text', json.dumps(e)})


@channel_session
def ws_disconnect(message):
    """Handle a websocket disconnect; currently does nothing."""
    pass
    # try:
    #     rid = message.channel_session['rid']
    #     request = TranslateRequest.objects.get(id=rid)
    #     # request.delete()
    # except (KeyError, TranslateRequest.DoesNotExist):
# ... (listing truncated here in the source page)
#
# Page text that followed the listing on the same source line, kept verbatim:
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
