Best Python code snippet using autotest_python
kafka_ocr.py
Source:kafka_ocr.py  
from logging import exception
from typing import final
from .common_functions import KafkaInteractor
from .common_functions import *
from google.auth.exceptions import GoogleAuthError
import json
import copy

FIELDS = ['imagens']
OCR_VERSION = 'Google Cloud Vision [Google OCR]'

# Status codes returned by Ocr.__transcribe_image.
STT = "STT"    # an image file was missing (FileExistsError / FileNotFoundError)
NERR = "NERR"  # no error
ERR = "ERR"    # any other failure; the exception object is returned alongside


class Ocr(KafkaInteractor):
    """Consume Kafka messages, run OCR on the images they reference and
    publish the outcome to the text, discard or error topic.

    Names such as ``OffsetAndMetadata``, ``publish_kafka_message``,
    ``green_string``, ``red_string`` and ``datetime`` are not defined in this
    module; they are presumably provided by the star import from
    ``.common_functions`` -- TODO confirm.
    """

    def __init__(self, configuration_file, volume_name=None):
        """Initialise the interactor.

        Args:
            configuration_file: forwarded unchanged to ``KafkaInteractor``.
            volume_name: path marker used to split image paths; everything
                after its first occurrence is handed to the OCR object.
                Defaults to ``'ufmg/'``.
        """
        super().__init__(configuration_file)
        self.__valid_consumers = self._KafkaInteractor__get_valid_consumers(FIELDS)
        self.__volume_name = volume_name if volume_name is not None else 'ufmg/'

    def transcribe_images_in_kafka_messages(self, topic_name, obj, verbose=False):
        """Poll ``topic_name`` once (twice if the first poll is empty) and
        transcribe every message returned.

        Args:
            topic_name: topic the consumer is connected to.
            obj: object exposing ``transcribe_single_image(path, kafka=True)``.
            verbose: print a confirmation after each successful publish and a
                warning when failure clean-up itself fails.

        Returns:
            ``True`` when no message was processed, ``False`` otherwise.

        Raises:
            GoogleAuthError: when the OCR backend reports an authentication
                failure.
            Exception: for any other failure; the offending message is
                committed and forwarded to the error topic on a best-effort
                basis before the exception is raised.
        """
        consumer = None
        # Pre-bound so the exception handlers can tell "failed before any
        # message was read" apart from "failed while handling a message".
        topic_partition = None
        message = None
        final_message = None

        try:
            # presumably the configured timeout is in seconds and poll()
            # expects milliseconds -- TODO confirm
            timeout_batch = int(self._KafkaInteractor__timeouts['timeout_batch']) * 1000

            consumer = self._KafkaInteractor__connect_kafka_consumer(topic_name, batch_num=3)

            # A first poll is not guaranteed to return messages, so poll a
            # second time when the first one comes back empty.
            message_batch = {}
            for _ in range(2):
                message_batch = consumer.poll(timeout_batch)
                if len(message_batch) != 0:
                    break

            empty = True
            for topic_partition, partition_batch in message_batch.items():
                for message in partition_batch:
                    # Reset first: if decoding fails, the handlers must not
                    # report the previous (already processed) message.
                    final_message = None
                    final_message = (message.key.decode('utf-8'), message.value.decode('utf-8'))

                    status, new_message, empty_message = self.__transcribe_image(final_message[1], obj)

                    if status == ERR:
                        raise Exception(f'{message} Erro durante o processo de transcrição. Parando ferramenta de OCR.')

                    elif status == STT:
                        new_message['motivo_erro'] = "Arquivo não encontrado."
                        if publish_kafka_message(
                            self._KafkaInteractor__producer,
                            self._KafkaInteractor__write_topics['error_topic_name'],
                            json.dumps(new_message),
                            final_message[0],
                            get_key=True,
                        ):
                            if verbose:
                                print(green_string('Transcrição de mensagem feita com sucesso.'))

                    elif status == NERR:
                        if new_message:
                            # Success is reported when the publish succeeds
                            # (the original condition was inverted).
                            if publish_kafka_message(
                                self._KafkaInteractor__producer,
                                self._KafkaInteractor__write_topics['text_topic_name'],
                                json.dumps(new_message),
                                final_message[0],
                                get_key=True,
                            ):
                                if verbose:
                                    print(green_string('Transcrição de mensagem feita com sucesso.'))

                        if empty_message:
                            self.__send_to_trash(json.dumps(empty_message), final_message[0], get_key=True, verbose=True)

                    empty = False

                    # Commit one message at a time so a later failure does not
                    # cause already handled messages to be re-read.
                    consumer.commit({topic_partition: OffsetAndMetadata(message.offset + 1, "no metadata")})

            consumer.close()
            return empty

        except GoogleAuthError as e:
            # NOTE(review): the consumer is not closed on this path (unchanged
            # from the original) -- confirm whether that is intended.
            if final_message is not None:
                # str(e): exception objects are not JSON serializable.
                self.__send_to_error(json.dumps({'erro': str(e)}), final_message[0], get_key=True)
            raise GoogleAuthError(f'{e} Falha na autenticação.') from e

        except Exception as e:
            # Best-effort clean-up; nothing here may hide the original error.
            try:
                if consumer is not None:
                    try:
                        if message is not None:
                            # Skip the failing message so it is not re-read.
                            consumer.commit({topic_partition: OffsetAndMetadata(message.offset + 1, "no metadata")})
                    finally:
                        consumer.close()
            except Exception as cleanup_error:
                if verbose:
                    print(red_string(f'Falha ao confirmar o offset ou fechar o consumidor: {cleanup_error}'))

            try:
                if final_message is not None:
                    destined_message = json.loads(final_message[1])
                    destined_message['erro'] = f"Erro: {e}"
                    self.__send_to_error(json.dumps(destined_message), final_message[0], get_key=True)
            except Exception as report_error:
                if verbose:
                    print(red_string(f'Falha ao enviar a mensagem para o tópico de erro: {report_error}'))

            raise Exception(f'{e} Erro na ferramenta.') from e

    def __transcribe_image(self, message, obj):
        """Transcribe every image referenced by one JSON message.

        Image paths are read from ``message['midias'][i]['caminho']`` or, when
        there is no ``'midias'`` key, from ``message['datalake']``.

        Args:
            message: JSON text of the Kafka message value.
            obj: object exposing ``transcribe_single_image(path, kafka=True)``;
                its result is used as a mapping whose values support ``len``.

        Returns:
            A ``(status, payload, empty_payload)`` tuple:

            * ``(NERR, full_message, empty_message)`` -- each element is a copy
              of the message with a ``'texto'`` list, or ``""`` when its list
              is empty;
            * ``(STT, new_message, None)`` -- an image file was missing;
            * ``(ERR, exception, None)`` -- any other failure.

        Raises:
            GoogleAuthError: re-raised with an authentication hint appended.
        """
        transcribed = []
        without_text = []
        new_message = ""
        full_message = ""
        empty_message = ""

        try:
            new_message = json.loads(message)
            new_message['versao_ferramenta'] = OCR_VERSION

            # Lazy generator: paths are still read one at a time, interleaved
            # with transcription, exactly as a plain loop would do.
            if "midias" in new_message:
                full_paths = (item['caminho'] for item in new_message['midias'])
            elif "datalake" in new_message:
                full_paths = (path for path in [new_message["datalake"]])
            else:
                full_paths = ()

            for full_path in full_paths:
                # Keep only the part of the path after the volume marker.
                image_path = full_path.split(self.__volume_name)[1]
                text = obj.transcribe_single_image(image_path, kafka=True)

                if any(len(value) != 0 for value in text.values()):
                    transcribed.append(text)
                else:
                    without_text.append(text)

            if without_text:
                empty_message = copy.deepcopy(new_message)
                empty_message['motivo_descarte'] = "Imagens transcritas nao possuem texto."
                empty_message['texto'] = without_text

            if transcribed:
                full_message = copy.deepcopy(new_message)
                full_message['texto'] = transcribed

            return NERR, full_message, empty_message

        except (FileExistsError, FileNotFoundError):
            return STT, new_message, None
        except GoogleAuthError as e:
            raise GoogleAuthError(f'{e}. Falha na autenticação.') from e
        except Exception as e:
            return ERR, e, None

    def __send_to_trash(self, message, key, get_key=False, verbose=False):
        """Publish ``message`` to the discard topic.

        Args:
            message: serialized message to publish.
            key: Kafka key passed on to ``publish_kafka_message``.
            get_key: passed on unchanged to ``publish_kafka_message``.
            verbose: print a timestamped warning when publishing fails.

        Raises:
            Exception: when ``publish_kafka_message`` returns a falsy value.
        """
        if not publish_kafka_message(
            self._KafkaInteractor__producer,
            self._KafkaInteractor__write_topics['discard_topic_name'],
            message,
            key,
            get_key,
        ):
            if verbose:
                date = datetime.now()
                print(red_string(f"[{date}] Envio da mensagem para o tópico falhou."))
            raise Exception('Mensagem não pode ser enviada para o tópico de descarte.')

    def __send_to_error(self, message, key, get_key=False, verbose=False):
        """Publish ``message`` to the error topic.

        Args:
            message: serialized message to publish.
            key: Kafka key passed on to ``publish_kafka_message``.
            get_key: passed on unchanged to ``publish_kafka_message``.
            verbose: print a timestamped warning when publishing fails.

        Raises:
            Exception: when ``publish_kafka_message`` returns a falsy value.
        """
        if not publish_kafka_message(
            self._KafkaInteractor__producer,
            self._KafkaInteractor__write_topics['error_topic_name'],
            message,
            key,
            get_key,
        ):
            if verbose:
                date = datetime.now()
                # Wording fixed: this is the error topic, not the trash topic.
                print(red_string(f"[{date}] Envio da mensagem para o tópico de erro falhou."))
            raise Exception('Mensagem não pode ser enviada para o tópico de erro.')


if __name__ == '__main__':
    ...  # NOTE: the body of the entry point is elided ("...") in this listing.

# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
