How to use finished_statuses method in avocado

Best Python code snippet using avocado_python

_classes.py

Source:_classes.py Github

copy

Full Screen

import asyncio
from abc import abstractmethod
from datetime import datetime

from app.base.status_updater.models import StatusApplication
from app.db import DBS
from app.db.queries.mysql import QUERY_UPDATE_CARGO_STATUS_BY_IDMONOPOLIA
from app.routes.pecom.utils import batch
from app.settings.consts import TK_ID_DICT
from app.settings.log import logger


class TKStatusApplicationsIterator:
    """Async iterator over applications whose carrier status has changed.

    The applications are split into batches of ``batch_size``; every batch is
    passed to :meth:`get_applications_status`, which subclasses implement.
    At most ``semaphore_size`` batches are processed at the same time.

    Note that ``get_applications_status`` is decorated with
    ``@abstractmethod`` but this class does not derive from ``abc.ABC``, so
    the decorator alone does not prevent instantiating the base class.
    """

    batch_size: int = 1
    semaphore_size: int = 1

    def __init__(self, applications: list[StatusApplication], test=False):
        """Store the applications and pre-compute their batches.

        Args:
            applications: applications to query statuses for.
            test: when true, every application is reported as changed.
        """
        self.applications = applications
        # Copies the class-level value onto the instance.
        self.batch_size = self.batch_size
        self.semaphore = asyncio.Semaphore(self.semaphore_size)
        batcher = batch(self.batch_size)
        self.applications_batched = list(batcher(self.applications))
        self.test = test

    async def __aiter__(self):
        """Yield, one by one, the applications whose status has changed."""
        # gather() returns one list per batch; callers want a flat stream of
        # applications, so each list is unpacked and its items are yielded.
        batch_results = await asyncio.gather(
            *[
                self.get_applications_status_filtered(apl)
                for apl in self.applications_batched
            ]
        )
        for changed in batch_results:
            if changed:
                for application in changed:
                    yield application

    async def get_applications_status_filtered(
        self, applications: list[StatusApplication]
    ):
        """Return only the applications whose status has changed.

        The batch is sorted by ``idmonopolia`` and paired positionally with
        the result of :meth:`get_applications_status`.
        NOTE(review): this presumes the subclass returns statuses in the same
        order as the sorted input - confirm against the implementations.

        In test mode (``self.test``) every application is treated as changed.
        """
        async with self.semaphore:
            sorted_appl = sorted(applications, key=lambda x: x.idmonopolia)
            newer_statuses = await self.get_applications_status(sorted_appl)
            return [
                new
                for old, new in zip(sorted_appl, newer_statuses)
                # production: keep changed statuses only; test: keep all
                if self.test or old.status != new.status
            ]

    @abstractmethod
    async def get_applications_status(
        self, applications: list[StatusApplication]
    ) -> list[StatusApplication]:
        """Fetch the current status for every application in ``applications``.

        The length of ``applications`` equals ``batch_size``. Consequently, if
        a status can only be requested for one application at a time,
        ``applications`` is assumed to contain exactly one element.
        """
        pass


class TKStatusDB:
    """Database access used by the status updater of one transport company."""

    def __init__(self, idtk: int, finished_statuses: list[str]):
        """Bind the DB handles for the transport company ``idtk``.

        Args:
            idtk: transport company id; also a key of ``TK_ID_DICT``.
            finished_statuses: statuses (used as SQL ``LIKE`` patterns) that
                mark a cargo as finished.

        Raises:
            NotImplementedError: if ``finished_statuses`` is empty.
        """
        self.idtk = idtk
        self.finished_statuses = finished_statuses
        if not self.finished_statuses:
            raise NotImplementedError("Не может не быть финальных статусов грузов")
        mng_ep = DBS["mongo_epool_admin"]["client"]
        self.mysql_write = DBS["mysql_write"]
        self.mysql_read = DBS["mysql"]
        self.orders_coll = mng_ep.get_collection("tk_" + TK_ID_DICT[self.idtk])

    async def update_status(self, application: StatusApplication) -> None:
        """Persist the new status of ``application`` to MongoDB and MySQL."""
        mysql_write = DBS["mysql_write"]
        status_info = {"status": application.status}
        # if application.status.id:
        #     status_info["status_id"] = application.status.id
        # NOTE(review): the result of update_one() is not awaited; if the
        # collection comes from an async driver this needs checking.
        self.orders_coll.update_one(
            {"_id": application.idmonopolia}, {"$set": status_info}
        )
        dt = datetime.now()
        await mysql_write.fetch_one(
            QUERY_UPDATE_CARGO_STATUS_BY_IDMONOPOLIA,
            {
                "idmonopolia": application.idmonopolia,
                "tk_status": application.status,
                "status_changed": dt,
            },
        )
        logger.info(
            f"{TK_ID_DICT[self.idtk].upper()} STATUS UPDATER: "
            f"груз с idmonopolia=%s обновлен %s со status=%s",
            application.idmonopolia,
            dt,
            application.status,
        )

    def get_applications_query(self):
        """Build the SQL selecting this company's cargos that are not finished.

        A cargo is selected when its ``tk_status`` is NULL or matches none of
        ``self.finished_statuses``.
        """
        # NOTE(review): the statuses are interpolated into the SQL text
        # unescaped; this is only safe while they come from trusted code.
        finish_cond = " OR ".join(
            f"tk_status LIKE '{finish_st}'" for finish_st in self.finished_statuses
        )
        return (
            "SELECT idmonopolia, tk_num, tk_status AS status "
            "FROM mircomf4_epool.ep_zakaz_mon4tk_acc "
            f"WHERE idtk={self.idtk} AND (tk_status IS NULL OR "
            f"NOT ({finish_cond}));"
        )

    async def get_applications(self) -> list[StatusApplication]:
        """Load the unfinished cargos as ``StatusApplication`` objects."""
        rows = await self.mysql_read.fetch_all(self.get_applications_query())
        applications = [StatusApplication(**dict(row)) for row in rows]
        logger.info(
            f"{TK_ID_DICT[self.idtk].upper()} STATUS UPDATER: "
            f"количество заявок для обработки len(applications)={len(applications)}"
        )
        # NOTE: the source listing is truncated here; the remainder of this
        # method (presumably returning `applications`) is not shown.
        ...

Full Screen

Full Screen

_status_updater.py

Source:_status_updater.py Github

copy

Full Screen

import datetime
from typing import Type

from app.base.sms.sender_api import SenderAPI
from app.base.sms.sms_handler import SMSHandler
from app.base.sms.sms_sender import SMSSender

from ._classes import TKStatusApplicationsIterator, TKStatusDB
from ._consts import STATUS_TIME_FROM, STATUS_TIME_TO
from ...env import SETTINGS
from ...settings.consts import TK_ID_DICT, SMS_SENDER
from ...settings.log import logger


class TKStatusUpdater:
    """Refreshes cargo statuses of one transport company and triggers SMS handling."""

    def __init__(
        self,
        idtk: int,
        finished_statuses: list[str],
        status_iterator_class: Type[TKStatusApplicationsIterator],
        status_db_class: Type[TKStatusDB] = TKStatusDB,
    ):
        """Remember the configuration and build the SMS handler.

        Args:
            idtk: transport company id.
            finished_statuses: statuses that mark a cargo as finished; passed
                on to ``status_db_class``.
            status_iterator_class: iterator class used to fetch new statuses.
            status_db_class: DB access class, ``TKStatusDB`` by default.
        """
        self.idtk = idtk
        self.finished_statuses = finished_statuses
        self.status_iterator_class = status_iterator_class
        self.status_db_class = status_db_class
        self.sms_handler = SMSHandler(
            idtk=idtk, sender=SMSSender(SenderAPI(who=SMS_SENDER))
        )

    async def update_all(self, test=False):
        """Fetch new statuses for all unfinished applications and store them.

        Nothing is done when ``SETTINGS.STATUS_OFF`` is set or when the
        current hour is outside ``[STATUS_TIME_FROM, STATUS_TIME_TO)``.

        Args:
            test: when true only the first five applications are processed
                and statuses are not written to the database.
        """
        if not SETTINGS.STATUS_OFF:
            now_hour = datetime.datetime.now().hour
            # Moscow time (configured in the Docker container).
            # "Polite" sending: only act inside the allowed hour window.
            if STATUS_TIME_FROM <= now_hour < STATUS_TIME_TO:
                status_db = self.status_db_class(self.idtk, self.finished_statuses)
                applications = await status_db.get_applications()
                # In test mode trim the workload a little.
                if test:
                    applications = applications[:5]
                async_gen = self.status_iterator_class(applications, test=test)
                cnt = 0
                async for appl in async_gen:
                    # The iterator yields single applications, so count one
                    # per iteration (adding batch_size over-counts whenever
                    # batch_size > 1).
                    cnt += 1
                    # NOTE(review): the source listing lost its indentation;
                    # the nesting of the next three statements is the most
                    # plausible reading - confirm against the repository.
                    if not test:
                        await status_db.update_status(appl)
                    await self.sms_handler.handle(appl)
                if not test:
                    # NOTE: the source listing is truncated inside this call;
                    # any further arguments or statements are not shown.
                    logger.info(
                        f"Статус обновлен у {cnt} {TK_ID_DICT[self.idtk].upper()} заявок"
                    )

Full Screen

Full Screen

handler.py

Source:handler.py Github

copy

Full Screen

import json

from okdata.aws.logging import log_add, logging_wrapper
from okdata.aws.status import TraceStatus, TraceEventStatus
from okdata.aws.status.sdk import Status
from okdata.aws.status.wrapper import _status_from_lambda_context

# Maps the pipeline statuses that count as "finished" to the trace event
# status that is reported for them.
finished_statuses = {
    "ABORTED": TraceEventStatus.FAILED,
    "FAILED": TraceEventStatus.FAILED,
    "TIMED_OUT": TraceEventStatus.FAILED,
    "SUCCEEDED": TraceEventStatus.OK,
}


@logging_wrapper
def act_on_queue(event, context):
    """Handle an SNS-delivered event and record a finished pipeline status.

    Returns ``False`` when the message carries no ``detail.status`` or when
    that status is not one of ``finished_statuses``; otherwise returns the
    result of ``_set_finished_status``.

    Raises:
        ValueError: if the event has no ``Records`` or the first record's
            ``EventSource`` is not ``aws:sns``.
    """
    records = event.get("Records")
    if not records:
        raise ValueError("Event does not contain Records")

    # We always get 1 event, see Reliability section of https://aws.amazon.com/sns/faqs/
    record = records[0]
    source = record["EventSource"]
    if source != "aws:sns":
        raise ValueError(
            f"Unsupported 'EventSource' {source}. Supported types: 'aws:sns'"
        )

    event = json.loads(record["Sns"]["Message"])
    try:
        event_status = event["detail"]["status"]
    except KeyError:
        return False

    trace_id = event["detail"].get("name")
    log_add(trace_id=trace_id, event_status=event_status)

    # Ignore statuses where pipeline is not finished
    if event_status not in finished_statuses:
        return False

    return _set_finished_status(event, context, trace_id, event_status)


def _set_finished_status(event, context, trace_id, event_status):
    """Report ``event_status`` for ``trace_id`` as a finished trace."""
    status = Status(_status_from_lambda_context(event, context))
    trace_event_status = finished_statuses[event_status]
    log_add(trace_event_status=trace_event_status)
    status.add(
        trace_id=trace_id,
        domain="dataset",
        operation="_set_finished_status",
        trace_event_status=trace_event_status,
        trace_status=TraceStatus.FINISHED,
    )
    status_api_ok = status.done() is not None
    log_add(status_api_ok=status_api_ok)
    # NOTE: the source listing is truncated here; the rest of the function
    # (including whatever it returns) is not shown.
    ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful