How to use check_tracking method in lisa

Best Python code snippet using lisa_python

bot.py

Source:bot.py Github

copy

Full Screen

import asyncio
import json
import logging

from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.dispatcher import FSMContext
from aiogram.utils.executor import start_webhook
from mongoengine.errors import NotUniqueError, ValidationError

import bot.constance as co
from bot.config import BOT_TOKEN
from bot.utils import inline_kb, generate_datas, take_message
from datataker import datataker as dt
from models.models import User
from states.states import FormAnswer

# Per-user tracking flag: Telegram user id -> co.start_tracking_tag or
# co.stop_tracking_tag.  Written by the detail/tracking handlers below and
# polled by the tracking loop to know when to stop.
CHECK_TRACKING = {}

# Bot object
bot = Bot(BOT_TOKEN)

# In-memory FSM storage
storage = MemoryStorage()

# Dispatcher for the bot
dp = Dispatcher(bot, storage=storage)

# Logging
logging.basicConfig(level=logging.INFO)
dp.middleware.setup(LoggingMiddleware())


# ---------------------------------------------------------------------------
# Private helpers
def _tag_is(tag):
    """
    Build a callback-query filter that matches payloads whose 'tag' equals *tag*.

    The callback data is expected to be a JSON object.  Payloads that are not
    valid JSON, are not objects, or carry no 'tag' simply do not match instead
    of raising inside the filter.
    """
    def _matches(call):
        try:
            payload = json.loads(call.data)
        except (TypeError, ValueError):
            return False
        return isinstance(payload, dict) and payload.get('tag') == tag

    return _matches


def _sync_tracking_flag(call, call_dict):
    """
    Store the stop tag for the calling user when the payload's 'comm' asks to
    stop tracking; store the start tag otherwise.
    """
    if call_dict.get('comm') == co.stop_tracking_tag:
        flag = co.stop_tracking_tag
    else:
        flag = co.start_tracking_tag
    CHECK_TRACKING[call.from_user.id] = flag


async def _run_tracking(call, keyboard, fetch_text, interval, checks=20):
    """
    Refresh the message behind *call* up to *checks* times.

    Each round sleeps *interval* seconds, awaits ``fetch_text()`` for the new
    text, appends the remaining-checks counter and edits the message in
    place with *keyboard*.  The loop ends early once the user's flag in
    CHECK_TRACKING is the stop tag; the flag is always the stop tag when this
    coroutine returns.
    """
    user_id = call.from_user.id
    for n in range(checks):
        if CHECK_TRACKING[user_id] == co.stop_tracking_tag:
            break
        await asyncio.sleep(interval)

        message_text = await fetch_text() + f'\nNumber of check {(checks - 1) - n}\n'

        # The user may have pressed "stop" while we were sleeping or
        # fetching; editing now would overwrite the stopped view that the
        # detail handler has just rendered, so re-check before editing.
        if CHECK_TRACKING[user_id] == co.stop_tracking_tag:
            break

        await bot.edit_message_text(
            text=message_text,
            chat_id=call.message.chat.id,
            message_id=call.message.message_id,
            reply_markup=keyboard
        )
    CHECK_TRACKING[user_id] = co.stop_tracking_tag


async def _change_user_field(message, field_name):
    """
    Save ``message.text`` into *field_name* of the sender's User document and
    reply with the success or failure text plus a "back to settings" button.

    A mongoengine ValidationError on save is reported to the user with the
    "wrong change" text; any other error propagates.
    """
    user = User.objects.get(telegram_id=message.from_user.id)
    try:
        setattr(user, field_name, message.text)
        user.save()
        message_text = co.CHANGE_SETTINGS_MESSAGES[co.complete_change]
    except ValidationError:
        message_text = co.CHANGE_SETTINGS_MESSAGES[co.wrong_change]
    data = {'tag': co.change_settings_tag}
    kb = inline_kb(data, co.SETTINGS_BUTTONS[co.back_to_settings])
    await bot.send_message(
        message.chat.id,
        text=message_text,
        reply_markup=kb
    )


# ---------------------------------------------------------------------------
@dp.message_handler(commands='start')
async def cmd_start(message: types.Message):
    """
    Register the user (if new), greet them and show the main-menu reply
    keyboard built from co.MAIN_MENU.
    """
    first_name = getattr(message.from_user, 'first_name', None)
    name = f', {first_name}' if first_name else '!'
    try:
        User.objects.create(
            telegram_id=message.chat.id,
            username=getattr(message.from_user, 'username', None),
            first_name=first_name
        )
    except NotUniqueError:
        # The user already exists: greet as a returning user.
        greetings = co.GREETINGS_AGAIN.format(name)
    else:
        greetings = co.GREETINGS_FIRST.format(name)
    kb = types.ReplyKeyboardMarkup(resize_keyboard=True)
    buttons = [types.KeyboardButton(b) for b in co.MAIN_MENU.values()]
    kb.add(*buttons)
    await message.answer(text=greetings, reply_markup=kb)


# ---------------------------------------------------------------------------
# Currency category
@dp.message_handler(lambda m: co.MAIN_MENU[co.currency_tag] == m.text)
async def cmd_currency(message: types.Message):
    """
    Redirect to the currency category: offer the one-to-one and one-to-many
    modes as inline buttons.
    """
    data = [
        {'tag': co.currency_one_to_one_tag},
        {'tag': co.currency_one_to_many_tag}]
    text = [co.ONE_TO_ONE_BUTTON, co.ONE_TO_MANY_BUTTON]
    keyboard = inline_kb(data, text)
    await message.answer('Choice:', reply_markup=keyboard)


# NOTE(review): `commands=` receives the settings menu label here, while
# handle_settings below matches the same label as plain text -- confirm
# that both registrations are intended.
@dp.message_handler(commands=co.MAIN_MENU[co.settings_tag])
async def cmd_settings(message: types.Message):
    """
    Redirect to settings.
    """
    data = {
        'tag': co.settings_tag
    }
    text = co.SETTINGS_BUTTON
    keyboard = inline_kb(data, text)
    await message.answer('Choice:', reply_markup=keyboard)


# ===========================================================================
# Currency sector
# ---------------------------------------------------------------------------
# One to one
@dp.callback_query_handler(_tag_is(co.currency_one_to_one_tag))
async def currency_list_oto(call: types.CallbackQuery):
    """
    Show the list of available currencies for the one-to-one mode.
    """
    data = await dt.DataTaker.main_currency_list()

    datas, texts, message_text = generate_datas(
        data=data,
        message_if_correct=co.MESSAGE_SPEND_CURRENCY,
        tag=co.currency_list_pairs_tag,
        key=co.CURRENCY_ITEM
    )
    keyboard = inline_kb(datas, texts)
    await bot.edit_message_text(
        text=message_text,
        message_id=call.message.message_id,
        chat_id=call.message.chat.id,
        reply_markup=keyboard
    )


@dp.callback_query_handler(_tag_is(co.currency_list_pairs_tag))
async def get_awaliable_pairs(call: types.CallbackQuery):
    """
    Show the pairs available for the currency selected in the previous step.
    """
    val = json.loads(call.data)['val']
    data = await dt.DataTaker.main_active_pair()
    datas = []
    texts = []
    # A missing pairs entry is treated as "no pairs" instead of failing on
    # iteration over None.
    for row in data.get(co.PAIRS_ITEM) or []:
        if row.get(co.CURRENCY_TO_SPEND) == val:
            datas.append({'tag': co.currency_detail_oto_tag, 'val': val, 'pair': row.get(co.PAIR_NAME)})
            texts.append('{0}/{1}: how mach {1} for {0}'.format(row.get(co.CURRENCY_TO_GET), val))
    keyboard = inline_kb(datas, texts)
    message_text = co.MESSAGE_CHOICE_PAIR
    await bot.edit_message_text(
        text=message_text,
        message_id=call.message.message_id,
        chat_id=call.message.chat.id,
        reply_markup=keyboard
    )


@dp.callback_query_handler(_tag_is(co.currency_detail_oto_tag))
async def show_pair_detail(call: types.CallbackQuery):
    """
    Show the current rate for the selected pair with "start tracking" and
    "back to currency list" buttons.  Also reached from the "stop tracking"
    button, in which case the user's tracking flag is set to stop.
    """
    call_dict = json.loads(call.data)
    _sync_tracking_flag(call, call_dict)
    pair = call_dict['pair']

    button_data_list = [
        {
            'tag': co.action_oto_tag,
            'pair': pair,
            'comm': co.start_tracking_tag
        },
        {
            'tag': co.currency_one_to_one_tag
        }]
    button_text_list = [co.START_TRACKING_BUTTON, co.BASE_CURRENCY_LIST_BUTTON]
    kb = inline_kb(button_data_list, button_text_list)
    message_text = await take_message(val=pair, oto=True)
    await bot.edit_message_text(
        text=message_text,
        chat_id=call.message.chat.id,
        message_id=call.message.message_id,
        reply_markup=kb
    )


@dp.callback_query_handler(_tag_is(co.action_oto_tag))
async def val_tracking_oto(call: types.CallbackQuery):
    """
    Track the selected pair: refresh the message every 1.5 seconds, up to
    20 times, until the user presses "stop tracking".
    """
    call_dict = json.loads(call.data)
    CHECK_TRACKING[call.from_user.id] = co.start_tracking_tag
    pair = call_dict['pair']
    button_data_list = [
        {
            'tag': co.currency_detail_oto_tag,
            'pair': pair,
            'comm': co.stop_tracking_tag
        },
        {
            'tag': co.currency_one_to_one_tag
        }]
    button_text_list = [co.STOP_TRACKING_BUTTON, co.BASE_CURRENCY_LIST_BUTTON]

    kb = inline_kb(button_data_list, button_text_list)
    await _run_tracking(
        call,
        kb,
        lambda: take_message(val=pair, oto=True),
        interval=1.5
    )


# ---------------------------------------------------------------------------
# One to many
@dp.callback_query_handler(_tag_is(co.currency_one_to_many_tag))
async def currency_list_otm(call: types.CallbackQuery):
    """
    Show the list of available currencies for the one-to-many mode.
    """
    data = await dt.DataTaker.main_currency_list()

    datas, texts, message_text = generate_datas(
        data=data,
        tag=co.currency_detail_otm_tag,
        message_if_correct=co.MESSAGE_ALL_RATES,
        key=co.CURRENCY_ITEM
    )
    keyboard = inline_kb(callback=datas, text=texts)

    await bot.edit_message_text(
        text=message_text,
        message_id=call.message.message_id,
        chat_id=call.message.chat.id,
        reply_markup=keyboard
    )


@dp.callback_query_handler(_tag_is(co.currency_detail_otm_tag))
async def work_with_uah(call: types.CallbackQuery):
    """
    Show the current rates for the selected currency with "start tracking"
    and "currency list" buttons.  Also reached from the "stop tracking"
    button, in which case the user's tracking flag is set to stop.
    """
    call_dict = json.loads(call.data)
    _sync_tracking_flag(call, call_dict)
    val = call_dict['val']
    button_data_list = [
        {
            'tag': co.action_otm_tag,
            'val': val,
            'comm': co.start_tracking_tag
        },
        {
            'tag': co.currency_one_to_many_tag
        }]
    button_text_list = [co.START_TRACKING_BUTTON, co.CURRENCY_LIST_BUTTON]

    kb = inline_kb(button_data_list, button_text_list)
    message_text = await take_message(val=val)
    await bot.edit_message_text(
        text=message_text,
        chat_id=call.message.chat.id,
        message_id=call.message.message_id,
        reply_markup=kb
    )


@dp.callback_query_handler(_tag_is(co.action_otm_tag))
async def val_tracking_otm(call: types.CallbackQuery):
    """
    Track the selected currency: refresh the message every 2 seconds, up to
    20 times, until the user presses "stop tracking".
    """
    call_dict = json.loads(call.data)
    CHECK_TRACKING[call.from_user.id] = co.start_tracking_tag
    val = call_dict['val']
    button_data_list = [
        {
            'tag': co.currency_detail_otm_tag,
            'val': val,
            'comm': co.stop_tracking_tag
        },
        {
            # NOTE(review): every other "back to list" button in this mode
            # uses co.currency_one_to_many_tag and no handler for
            # co.currency_list_tag is visible here -- confirm this tag.
            'tag': co.currency_list_tag
        }]
    button_text_list = [co.STOP_TRACKING_BUTTON, co.CURRENCY_LIST_BUTTON]

    kb = inline_kb(button_data_list, button_text_list)
    await _run_tracking(
        call,
        kb,
        lambda: take_message(val=val),
        interval=2
    )


# ===========================================================================
# Personal data sector
@dp.message_handler(lambda m: co.MAIN_MENU[co.settings_tag] == m.text)
async def handle_settings(message: types.Message):
    """
    Handler for the settings button: show the user's stored data with a
    "change" button.
    """
    user = User.objects.get(telegram_id=message.chat.id)
    data = user.formatted_data()
    button_data = {'tag': co.change_settings_tag}
    kb = inline_kb(button_data, co.MESSAGE_CHANGE_OWN_INFO)
    await message.answer(
        text=data,
        reply_markup=kb
    )


@dp.callback_query_handler(_tag_is(co.change_settings_tag))
async def handle_change_settings(call):
    """
    Show one button per entry of co.SETTINGS_BUTTONS under the user's
    current data.
    """
    datas = [
        {'tag': co.change_specific_settings_tag, 'command': com}
        for com in co.SETTINGS_BUTTONS
    ]
    texts = list(co.SETTINGS_BUTTONS.values())
    keyboard = inline_kb(datas, texts)
    user = User.objects.get(telegram_id=call.from_user.id)
    user_data = user.formatted_data()
    await bot.edit_message_text(
        text=f'{user_data}\n\n{co.MESSAGE_CHOISE_SPECIFIC_CATEGORY}:',
        chat_id=call.message.chat.id,
        message_id=call.message.message_id,
        reply_markup=keyboard
    )


@dp.callback_query_handler(_tag_is(co.change_specific_settings_tag))
async def handler_change_specific_settings(call):
    """
    Ask the user for the new value of the chosen field and switch the FSM
    to the matching FormAnswer state.  Unknown commands are ignored.
    """
    command = json.loads(call.data)['command']
    form_states = (
        (co.change_name, FormAnswer.change_name),
        (co.change_phone, FormAnswer.change_phone),
        (co.change_email, FormAnswer.change_email),
        (co.change_city, FormAnswer.change_city),
    )
    for known_command, form_state in form_states:
        if command == known_command:
            await bot.send_message(call.message.chat.id, co.CHANGE_SETTINGS_MESSAGES[command])
            await form_state.set()
            break


@dp.message_handler(state=FormAnswer.change_name)
async def handle_change_first_name(message: types.Message, state: FSMContext):
    """
    Store the received text as the user's first name.
    """
    await _change_user_field(message, 'first_name')
    await state.finish()


@dp.message_handler(state=FormAnswer.change_phone)
async def handle_change_phone(message: types.Message, state: FSMContext):
    """
    Store the received text as the user's phone number.
    """
    await _change_user_field(message, 'phone_number')
    await state.finish()


@dp.message_handler(state=FormAnswer.change_email)
async def handle_change_email(message: types.Message, state: FSMContext):
    """
    Store the received text as the user's email.
    """
    await _change_user_field(message, 'email')
    await state.finish()


@dp.message_handler(state=FormAnswer.change_city)
async def handle_change_city(message: types.Message, state: FSMContext):
    """
    Store the received text as the user's city.
    """
    await _change_user_field(message, 'user_city')
    # NOTE(review): the source listing is truncated at this point.  The three
    # sibling handlers finish the FSM state next (`await state.finish()`);
    # confirm against the full file.
# ... (listing truncated in the source)

Full Screen

Full Screen

030-trackingreview.py

Source:030-trackingreview.py Github

copy

Full Screen

...24 work.run(["push", "origin", "HEAD"])25 REVIEWED_SHA1.append(work.run(["rev-parse", "HEAD"]).strip())26with_class = testing.expect.with_class27extract_text = testing.expect.extract_text28def check_tracking(branch_name, disabled=False):29 def check(document):30 class_names = ["tracking"]31 if disabled:32 class_names.append("disabled")33 p_tracking = document.find("p", attrs=with_class(*class_names))34 testing.expect.check("tracking", extract_text(p_tracking))35 if not disabled:36 testing.expect.check("tracking", p_tracking["class"])37 code_branch = document.findAll("code", attrs=with_class("branch"))38 testing.expect.check(2, len(code_branch))39 testing.expect.check(branch_name, extract_text(code_branch[1]))40 code_repository = document.findAll("code", attrs=with_class("repository"))41 testing.expect.check(2, len(code_repository))42 testing.expect.check(repository.url, extract_text(code_repository[1]))43 return check44SETTINGS = { "email.subjectLine.updatedReview.reviewRebased":45 "Rebased Review: %(summary)s" }46with testing.utils.settings("alice", SETTINGS), frontend.signin("alice"):47 result = frontend.operation(48 "fetchremotebranch",49 data={50 "repository_name": "critic",51 "remote": repository.url,52 "branch": BRANCH_NAME[0],53 "upstream": "refs/heads/" + UPSTREAM_NAME[0] },54 expect={55 "head_sha1": REVIEWED_SHA1[0],56 "upstream_sha1": UPSTREAM_SHA1[0] })57 # Run a GC to make sure the objects fetched by /fetchremotebranch are58 # referenced and thus usable by the subsequent /submitreview operation.59 instance.gc("critic.git")60 commit_ids = result["commit_ids"]61 result = frontend.operation(62 "submitreview",63 data={64 "repository": "critic",65 "branch": "r/" + TEST_NAME,66 "summary": SUMMARY,67 "commit_ids": commit_ids,68 "trackedbranch": { "remote": repository.url,69 "name": BRANCH_NAME[0] }})70 review_id = result["review_id"]71 trackedbranch_id = result["trackedbranch_id"]72 mailbox.pop(73 accept=[to("alice"),74 about("New Review: " + SUMMARY)])75 # 
Wait for the immediate fetch of the tracked branch that /submitreview76 # schedules.77 instance.synchronize_service("branchtracker")78 # Emulate a review rebase via /rebasetrackingreview.79 frontend.page(80 "r/%d" % review_id,81 expect={82 "tracking": check_tracking(BRANCH_NAME[0]) })83 frontend.page(84 "rebasetrackingreview",85 params={86 "review": review_id })87 result = frontend.operation(88 "fetchremotebranch",89 data={90 "repository_name": "critic",91 "remote": repository.url,92 "branch": BRANCH_NAME[1],93 "upstream": "refs/heads/" + UPSTREAM_NAME[1] },94 expect={95 "head_sha1": REVIEWED_SHA1[1],96 "upstream_sha1": UPSTREAM_SHA1[1] })97 # Run a GC to make sure the objects fetched by /fetchremotebranch are98 # referenced and thus usable by the subsequent /rebasetrackingreview99 # operation.100 instance.gc("critic.git")101 frontend.page(102 "rebasetrackingreview",103 params={104 "review": review_id,105 "newbranch": BRANCH_NAME[1],106 "upstream": UPSTREAM_NAME[1],107 "newhead": REVIEWED_SHA1[1],108 "newupstream": UPSTREAM_SHA1[1] })109 frontend.operation(110 "checkconflictsstatus",111 data={112 "review_id": review_id,113 "new_head_sha1": REVIEWED_SHA1[1],114 "new_upstream_sha1": UPSTREAM_SHA1[1] },115 expect={116 "has_changes": False,117 "has_conflicts": False })118 frontend.operation(119 "rebasereview",120 data={121 "review_id": review_id,122 "new_head_sha1": REVIEWED_SHA1[1],123 "new_upstream_sha1": UPSTREAM_SHA1[1],124 "new_trackedbranch": BRANCH_NAME[1] })125 frontend.page(126 "r/%d" % review_id,127 expect={128 "tracking": check_tracking(BRANCH_NAME[1]) })129 mailbox.pop(130 accept=[to("alice"),131 about("Rebased Review: " + SUMMARY)])132 # Disable and enable the tracking.133 frontend.operation(134 "disabletrackedbranch",135 data={136 "branch_id": trackedbranch_id })137 frontend.page(138 "r/%d" % review_id,139 expect={140 "tracking": check_tracking(BRANCH_NAME[1], disabled=True) })141 frontend.operation(142 "enabletrackedbranch",143 data={144 "branch_id": 
trackedbranch_id })145 frontend.page(146 "r/%d" % review_id,147 expect={...

Full Screen

Full Screen

stock_move.py

Source:stock_move.py Github

copy

Full Screen

...67 "'%s' in lot '%s'"68 ) % (operation_or_move.product_id.name, qty, lot.name))69 return True70 @api.model71 def check_tracking(self, move, lot_id):72 res = super(StockMove, self).check_tracking(move, lot_id)73 self.check_unicity_move_qty()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful