How to use stat_log method in yandex-tank

Best Python code snippet using yandex-tank

GUI.py

Source:GUI.py Github

copy

Full Screen

1from View import AbcView2from View import AbcViewFile3class View(AbcView):4 """Консольный интерфейс.5 """6 def show_start_message(self):7 """ Вывести приветственное сообщение. """8 print('Здравствуйте, это игра BlackJack.\n')9 def show_main_menu(self, lines_menu):10 """ Показать главное меню. """11 print('---------------------------------')12 menu = {1: ' 1. Новая игра.\n',13 2: ' 2. Загрузить игру.\n',14 3: ' 3. Восстановить игру.\n',15 10: ' 0. Выход.\n'}16 mes = ''17 for line in sorted(lines_menu):18 mes += menu[line]19 print(mes)20 def get_start_action(self):21 """ Узнать первоначальное действие игрока. """22 return input("Введите соответствующую цифру: ")23 def get_action(self):24 """ Узнать действие игрока. """25 return input("> ")26 def get_name_gamer(self):27 """Спрашивает у пользователя желаемое имя игрока28 :return: Возвращает введёное имя игрока29 """30 print('---------------------------------')31 print(' Новая игра:')32 name = input('Введите название игры: ')33 return name34 def show_game_menu(self):35 """ Показать игровое меню. """36 print('---------------------------------')37 print(' 1. Играть.\n 0. Назад.')38 def show_start_capital(self, gamer):39 """ Показать начальный баланс. """40 print('Начальный баланс: ' + str(gamer.balance))41 def show_capital(self, gamer):42 """ Показать баланс. """43 print(gamer)44 def show_not_ante(self):45 """ Сообщить что ставка не сделана. """46 print('Сделайте ставку!')47 def show_bye(self):48 """ Показать завершающее сообщение. """49 print('---------------------------------')50 print('До свидания!')51 def get_ante(self, gamer):52 """ Спросить размер ставки. """53 print('Ваш баланс: {}'.format(str(gamer.balance)))54 return input('Размер вашей ставки ?\n> ')55 def show_card_points(self, cards, points):56 """ Показать карты и очки. """57 s = '{0}\n{1}\n{2}\n{3}'.format(' ___ '*len(cards), '| |'*len(cards),58 '| {.name} |'*len(cards), '|___|'*len(cards))59 print(s.format(*cards).replace('10 ', '10'))60 print('Кол-во очков: '+str(points))61 def show_game_dial(self):62 """ Показать игровой диалог. """63 print('---------------------------------')64 print(' 1. Ещё.\n 0. Хватит.')65 def show_part_end(self, player1, player2, is_win):66 """ Вывести результат партии. """67 print('---------------------------------')68 print('Карты ' + player1['name'] + ':')69 self.show_card_points(player1['card'], player1['points'])70 print('Карты ' + player2['name'] + ':')71 self.show_card_points(player2['card'], player2['points'])72 if is_win:73 print('Вы выйграли !')74 else:75 print('Вы проиграли !')76 def show_stat_game(self, win, les):77 """Показать статистику игры78 :param win: Кол-во выйгрышей игрока79 :param les: Кол-во пройгрышей игрока80 :return:81 """82 print('{} : {} (выйгрышей : пройгрышей)'.format(str(win), str(les)))83 def show_list_games(self, games):84 """Вывести список доступных игр.85 :param games: Список доступных игр86 """87 print('---------------------------------')88 print('Выберите игру:')89 i = 090 for game in games:91 i += 192 print(' '+str(i)+'. '+game)93 print('\n 0. Назад')94 def show_restore_fail(self):95 """ Вывести сообщение о неудачном восстановлении. """96 print('---------------------------------')97 print('Восстановить не удалось!')98 def show_name_gamer(self, gamer):99 """ Вывести назване игры (имя игрока). """100 print('---------------------------------')101 print('Название игры: '+gamer.name)102 def show_not_enough_money(self):103 """ Вывести сообщение о низком балансе. """104 print('Ошибка, низкий баланс!')105class ViewFile(AbcViewFile):106 """Интерфейс для записи в файл107 """108 def write_game_log(self, log):109 """Записать лог игры.110 :param log: Сообщение.111 """112 with open('game_log', 'a') as f:113 f.write(log+'\n')114 def write_stat_game(self, game_name, is_win):115 """Записать статистику игры.116 :param game_name: Название игры (имя игрока)117 :param is_win: если подедил - 1, иначе - 0118 """119 import json120 line = [0, 0, 0]121 try:122 # Открыть файл и считываем json объект123 with open('stat_log.json', 'r') as fr:124 stat_log = json.loads(fr.read())125 except IOError:126 # Если файл не существует127 if is_win:128 line[0] += 1129 else:130 line[1] += 1131 # Формируем словарь и записываем в json файл132 stat_log = dict()133 stat_log[game_name] = line134 with open('stat_log.json', 'w') as fw:135 json.dump(stat_log, fw)136 return line137 try:138 # Если файл существует и в нём есть нужная запись139 # Достаем из него значения и увеличиваем одно из них140 line = stat_log[game_name]141 except KeyError:142 # Если файл существует, но в нём нет нужной записи143 line = [0, 0, 0]144 if is_win:145 line[0] += 1146 else:147 line[1] += 1148 stat_log[game_name] = line149 with open('stat_log.json', 'w') as fw:150 json.dump(stat_log, fw)151 return line152 def write_stat_game_all(self, name_game, balance, win=0, les=0):153 """Запись статистики игр.154 :param name_game: Название игры155 :param balance: Баланс156 :param win: Кол-во выйгрышей157 :param les: Кол-во пройгрышей158 :return:159 """160 import json161 line = [win, les, balance]162 try:163 # Открыть файл и считываем json объект164 with open('stat_log.json', 'r') as fr:165 stat_log = json.loads(fr.read())166 except IOError:167 # Если файл не существует168 # Формируем словарь и записываем в json файл169 stat_log = dict()170 stat_log[name_game] = line171 with open('stat_log.json', 'w') as fw:172 json.dump(stat_log, fw)173 return174 try:175 # Если файл существует и в нём есть нужная запись176 # Достаем из него значения177 line = stat_log[name_game]178 line[2] = balance179 except KeyError:180 # Если файл существует, но в нём нет нужной записи181 line = [win, les, balance]182 stat_log[name_game] = line183 with open('stat_log.json', 'w') as fw:184 json.dump(stat_log, fw)185 return186 def read_conf(self):187 """Возвращает содержимое конфиг файла188 :return: Словарь с конфигурациями189 """190 import json191 with open("config.json", "r") as f:192 conf = json.loads(f.read())193 return conf194 def read_part_log(self, min_stack):195 """ Возвращает часть лог файла196 :min_stack: Минимальное кол-во строк которое вернет функция197 :return: Список строк лог файла198 """199 try:200 f = open('game_log', 'r')201 except IOError:202 return 0203 block_log = []204 i = 0205 for line in f.readlines():206 if line.find('Start game') != -1:207 i = 0208 i += 1209 block_log.append(line)210 if (len(block_log) >= min_stack) and (i < min_stack):211 while len(block_log) > min_stack:212 block_log.pop(0)213 return block_log214 def check_exist_game(self):215 """ Проверить существует ли файл статистики игры. """216 import json217 try:218 # Открыть файл и считываем json объект219 with open('stat_log.json', 'r') as fr:220 json.loads(fr.read())221 return 1222 except IOError:223 return 0224 def read_stat(self):225 """ Возвращает словарь со статистикой игры.226 :return:227 """228 import json229 with open('stat_log.json', 'r') as fr:230 stat_log = json.loads(fr.read())...

Full Screen

Full Screen

run_oozie_workflow.py

Source:run_oozie_workflow.py Github

copy

Full Screen

1#!/usr/bin/python2# Purpose: wrapper for oozie workflow3import os,getpass,envvars,sys4import commands5import time6import datetime7from optparse import OptionParser8def main():9 global return_code, group, start_line10 return_code = 011 start_line = "".join('*' for i in range(100))12 print(start_line)13 print("run_oozie_workflow.py -> Started : " + datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))14 sys.stdout.flush()15 abc_parameter,workflow, options_file = arg_handle()16 abc_parameters=abc_parameter.split(',')17 env=abc_parameters[0]18 env_ver=abc_parameters[1]19 app=abc_parameters[2]20 sub_app=abc_parameters[3]21 group=abc_parameters[4]22 parent_script=abc_parameters[5]23 #config path for ABC logging24 # Get envvars from oozie_common_properties file25 envvars.populate(env,env_ver,app, sub_app)26 print ("ABC Parameter"+abc_parameter)27 oozie_wf_cmd = "oozie job -oozie "+envvars.list['oozieNode']+" -config "28 oozie_wf_cmd = oozie_wf_cmd + options_file29 oozie_wf_cmd = oozie_wf_cmd + ' -Doozie.wf.application.path='30 oozie_wf_cmd = oozie_wf_cmd + workflow31 oozie_wf_cmd = oozie_wf_cmd + ' -debug -run'32 print("run_oozie_workflow.py -> Invoked : " + oozie_wf_cmd)33 rc, jobid_str = commands.getstatusoutput(oozie_wf_cmd)34 if rc == 0: 35 jobid_str = jobid_str.split('job: ')36 jobid = jobid_str[1].strip()37 abc_line = "|".join([group,jobid,"oozie","run_oozie_workflow.py","","","STARTED",38 getpass.getuser(),"oozie workflow started",str(datetime.datetime.today())]) 39 print("**ABC_log**->"+abc_line)40 sys.stdout.flush() 41 else:42 print("run_oozie_workflow.py -> Failed : " + jobid_str)43 return_code = 844 sys.exit(return_code)45 46 print(jobid + "-> Started : " + datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))47 sys.stdout.flush()48 #ABC logging49 50 #status = "RUNNING"51 #cnt = 052 53 get_status(jobid,envvars.list['oozieNode'],"Main")54 abc_line = "|".join([group,jobid,"oozie","run_oozie_workflow.py","","","ENDED",55 getpass.getuser(),"oozie workflow Ended",str(datetime.datetime.today())]) 56 print("**ABC_log**->"+abc_line)57 sys.stdout.flush()58 print(jobid + "-> Ended : " + datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))59 60 61 abc_line = "|".join([group,"run_oozie_workflow.py","python",parent_script,"","","ENDED",62 getpass.getuser(),"return-code:"+str(return_code),str(datetime.datetime.today())]) 63 print("**ABC_log**->"+abc_line)64 sys.stdout.flush() 65 print("run_oozie_workflow.py -> Ended : " + datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))66 67def check_subworkflow(stat_log,oozieNode,oozie_st_cmd,parent_step_name):68 try: 69 print stat_log[len(stat_log)-3] 70 sys.stdout.flush()71 sub_jobid = stat_log[len(stat_log)-3].split()[2].strip()72 final_status = stat_log[4].split(':')[1]73 if "-oozie-oozi-W" in sub_jobid:74 sub_jobid = sub_jobid.split("-oozie-oozi-W")[0] + "-oozie-oozi-W"75 print "Sub workflow:" + sub_jobid + "initiated, checking its status"76 get_status(sub_jobid,oozieNode,"Sub")77 print "Sub workflow:" + sub_jobid + "Completed, checking its parent workflow status"78 # After sub workflow status is verified check the status of current workflow79 rc = 180 retry_count = 081 while ( rc!=0 or retry_count <=10):82 rc, stat_log = commands.getstatusoutput(oozie_st_cmd)83 retry_count = retry_count + 184 if (rc!=0):85 print "Error getting Status, return-code: " + str(rc)86 print stat_log87 sys.exit(11)88 #print "After sub workflow" + stat_log89 stat_log = stat_log.split('\n')90 # check if there is second sub workflow.91 return check_subworkflow(stat_log,oozieNode,oozie_st_cmd,parent_step_name)92 else:93 return stat_log94 except ValueError:95 print('Ignoring: malformed line: "{}"'.format(stat_log))96 get_status(jobid,oozieNode,parent_step_name)97 except IndexError:98 print ('Index error occurred.. Return-code :'+str(rc))99 print "\n".join(stat_log)100 print ('Re-trying..')101 get_status(jobid,oozieNode,parent_step_name)102 103def get_status(jobid,oozieNode,parent_step_name):104 oozie_st_cmd = "oozie job -oozie "+oozieNode+" -info " + jobid105 rc = 1106 retry_count = 0107 while ( rc!=0 or retry_count <=10):108 rc, stat_log = commands.getstatusoutput(oozie_st_cmd)109 retry_count = retry_count + 1110 if rc != 0:111 print "Error getting Status, max retry count 10 reached.. Return-code: " + str(rc)112 print stat_log113 #get_status(jobid,oozieNode,parent_step_name)114 sys.exit(11) 115 try: 116 stat_log = stat_log.split('\n')117 stat_log = check_subworkflow(stat_log,oozieNode,oozie_st_cmd,parent_step_name)118 #last_line = stat_log[len(stat_log)-3].split()119 status = stat_log[len(stat_log)-3][78:88].strip()120 ext_status = stat_log[len(stat_log)-3][111:122].strip()121 ext_rc = stat_log[len(stat_log)-3][122:].strip()122 step_name = stat_log[len(stat_log)-3][:78].split("@")[1]123 #print "Status is " + str(stat_log[len(stat_log)-3][78:88].strip()) 124 if status.strip() == 'RUNNING' or status.strip() == 'PREP':125 126 while (parent_step_name == step_name) and (status.strip() == 'RUNNING' or status.strip() == 'PREP'):127 time.sleep(5) 128 rc = 1129 retry_count = 0130 while ( rc!=0 or retry_count <=10):131 rc, stat_log = commands.getstatusoutput(oozie_st_cmd)132 retry_count = retry_count + 1133 if rc == 0:134 stat_log = stat_log.split('\n')135 st_count = 0136 while ( st_count <=10):137 if stat_log[st_count].split(':')[0].lower().strip() == "status":138 break139 st_count = st_count + 1140 final_status = stat_log[st_count].split(':')[1]141 print stat_log[len(stat_log)-3]142 sys.stdout.flush()143 status = stat_log[len(stat_log)-3][78:88].strip() 144 ext_status = stat_log[len(stat_log)-3][111:122].strip() 145 ext_rc = stat_log[len(stat_log)-3][122:].strip() 146 step_name = stat_log[len(stat_log)-3][:78].split("@")[1]147 else:148 print "Error getting Status, return-code: " + str(rc)149 print stat_log150 sys.exit(11)151 if (parent_step_name != step_name):152 abc_line = "|".join([group,step_name,"oozie",jobid,"","",status.strip(),153 getpass.getuser(),"oozie workflow running",str(datetime.datetime.today())]) 154 print("**ABC_log**->"+abc_line)155 sys.stdout.flush()156 get_status(jobid,oozieNode,step_name)157 return158 #return159 rc, stat_log = commands.getstatusoutput(oozie_st_cmd)160 stat_log = stat_log.split('\n')161 print stat_log[len(stat_log)-3] 162 sys.stdout.flush()163 sub_jobid = stat_log[len(stat_log)-3].split()[2].strip()164 st_count = 0165 while ( st_count <=10):166 if stat_log[st_count].split(':')[0].lower().strip() == "status":167 break168 st_count = st_count + 1169 final_status = stat_log[st_count].split(':')[1]170 #It should not reach here with Running status171 if final_status.strip() == 'RUNNING':172 get_status(jobid,oozieNode,parent_step_name)173 return 174 if (ext_status.strip() != 'SUCCEEDED' and ext_rc.strip() != '-' and ext_status.strip() != 'OK') or final_status.strip() != 'SUCCEEDED':175 print "Erroring Since - Ext Status:" + ext_status.strip() + "Ext RC:" + ext_rc.strip()+ "Overall Status:" + final_status.strip()176 print(start_line)177 print "\n".join(stat_log)178 print(start_line)179 abc_line = "|".join([group,step_name,"oozie",jobid,"","",status.strip(),180 getpass.getuser(),"oozie workflow "+ext_status.strip(),str(datetime.datetime.today())]) 181 print("**ABC_log**->"+abc_line)182 sys.stdout.flush()183 print "Return-Code:9"184 sys.exit(9)185 else:186 abc_line = "|".join([group,step_name,"oozie",jobid,"","",ext_status.strip(),187 getpass.getuser(),"oozie workflow running",str(datetime.datetime.today())]) 188 print("**ABC_log**->"+abc_line)189 sys.stdout.flush()190 191 #print(jobid + "-> Status : " + status.strip())192 sys.stdout.flush()193 194 status = stat_log[4].split(':')[1]195 except ValueError:196 print('Ignoring: malformed line: "{}"'.format(stat_log))197 get_status(jobid,oozieNode,parent_step_name)198 except IndexError:199 print ('Index error occurred.. Return-code :'+str(rc))200 print "\n".join(stat_log)201 print ('Re-trying..')202 get_status(jobid,oozieNode,parent_step_name)203 204 return # rc, status205 206 207def arg_handle():208 usage = "usage: run_oozie_workflow.py workflow options_file abc_parameter"209 parser = OptionParser(usage)210 (options, args) = parser.parse_args()211 212 if len(args) != 3:213 abc_line = "|".join(["","run_oozie_workflow.py","python","run_job.py","",str(args),"FAILED",214 getpass.getuser(),"Argument, workflow path and properties file, is required, return-code:10 ",str(datetime.datetime.today())]) 215 print("**ABC_log**->"+abc_line)216 sys.stdout.flush()217 parser.error("Argument, workflow path and properties file, is required.")218 return_code = 10219 sys.exit(return_code)220 abc_line = "|".join([args[2].split(',')[4],"run_oozie_workflow.py","python",args[2].split(',')[5],"",str(args),"STARTED",221 getpass.getuser(),"run_oozie_workflow started",str(datetime.datetime.today())]) 222 print("**ABC_log**->"+abc_line)223 sys.stdout.flush() 224 return args[2], args[0],args[1]225if __name__ == "__main__":...

Full Screen

Full Screen

Api.py

Source:Api.py Github

copy

Full Screen

1# pylint: disable=C01142import logging3import tweepy # for exceptions4from Externals import Network5from Externals.Measure import MeasureTweet6from Externals.message import fromTweet7from .Tweet import User, mocked_source, mocked_tweets8logger = logging.getLogger('bot.test.api')9class Count: # pylint: disable=too-few-public-methods10 def __init__(self):11 self.correct = 012 self.missed = 013 self.bad_content = 014class Result: # pylint: disable=too-few-public-methods15 def __init__(self):16 self.tweet = Count()17 self.follow = Count()18class MockApi(Network): # pylint: disable=too-many-instance-attributes19 def __init__(self, **kwargs):20 self.running_id = 1000121 self.myself = User.theBot22 self.mode = kwargs.get('mode', 'testcases')23 mocked_t = mocked_tweets()24 if self.mode == 'external':25 self.mock = mocked_source()26 elif self.mode == 'testcases':27 self.mock = mocked_t28 elif self.mode == 'id':29 self.mock = [t for t in mocked_t if t.id in kwargs.get('id_list', [])]30 else:31 raise ValueError("Invalid mode in {}: {}".format(__name__, self.mode))32 self.replies = {}33 self.double_replies = []34 self.measure = MeasureTweet()35 self.readonly = True36 self.high_message = 037 self.from_function = fromTweet38 def get_status(self, status_id):39 for t in self.mock:40 if t.id == status_id:41 return t42 raise tweepy.TweepError("Kein solcher Tweet vorhanden")43 def post_single(self, text, **kwargs):44 super().post_single(text, **kwargs)45 orig_tweet = kwargs.get('reply_to_status', None)46 if orig_tweet:47 # don't track thread answers:48 if orig_tweet != self.running_id:49 if orig_tweet.id in self.replies:50 logger.warning("Tweet %d was replied to twice!", orig_tweet)51 self.double_replies.append(orig_tweet.id)52 self.replies[orig_tweet.id] = text.strip()53 self.running_id += 154 return self.running_id55 def defollow(self, _):56 pass57 def follow(self, _):58 pass59 def is_followed(self, _):60 return True61 def mentions(self):62 mention_list = []63 for t in self.mock:64 for um in t.raw['entities']['user_mentions']:65 if um['screen_name'] == self.myself.screen_name:66 mention_list.append(t)67 logger.debug("found %d mentions", len(mention_list))68 return mention_list69 def timeline(self):70 result = [t for t in self.mock if str(t.author) == "followee"]71 logger.debug("found %d status in timeline", len(result))72 return result73 def hashtags(self, mt_list):74 result = [t for t in self.mock if fromTweet(t, self.myself).has_hashtag(mt_list)]75 logger.debug("found %d status in hashtags", len(result))76 return result77 def statistics(self, output='descriptive'):78 stat_log = logging.getLogger('statistics')79 res_count = Result()80 stat_log.debug(" RESULTS")81 for t in self.mock:82 was_replied_to = t.id in self.replies83 if t.expected_answer is None:84 if was_replied_to:85 stat_log.error("Tweet %d falsely answered", t.id)86 res_count.tweet.missed += 187 else:88 res_count.tweet.correct += 189 stat_log.info("Tweet %d correctly unanswered", t.id)90 continue91 # expected answer is not None:92 if not was_replied_to:93 res_count.tweet.missed += 194 stat_log.error("Tweet %d falsely unanswered", t.id)95 continue96 # correctly answered: is it the correct answer?97 if t.expected_answer == self.replies[t.id]:98 res_count.tweet.correct += 199 stat_log.info("Tweet %d correctly answered with correct answer", t.id)100 continue101 res_count.tweet.bad_content += 1102 stat_log.error("Tweet %d correctly answered, but with wrong answer", t.id)103 stat_log.warning(t.expected_answer)104 stat_log.warning("↑↑↑↑EXPECTED↑↑↑↑ ↓↓↓↓GOT THIS↓↓↓↓")105 stat_log.warning(self.replies[t.id])106 self.report_statisctics(stat_log, output, res_count)107 return res_count.tweet.missed + res_count.tweet.bad_content108 def report_statisctics(self, stat_log, output, res_count): # pylint: disable=R0201109 denominator = (res_count.tweet.correct + res_count.tweet.missed +110 res_count.tweet.bad_content)111 if denominator == 0:112 stat_log.log(51, "No testcases found")113 elif output == 'descriptive':114 stat_log.log(51, "ALL GOOD: %2d", res_count.tweet.correct)115 stat_log.log(51, "INCORRECT TEXT: %2d", res_count.tweet.bad_content)116 stat_log.log(51, "WRONG ANSWER/NOT ANSWER:%2d", res_count.tweet.missed)117 elif output == 'summary':118 ratio = (res_count.tweet.correct) / (0.0 + denominator)119 stat_log.log(51, "A %d/%d R %.1f%%",120 res_count.tweet.correct,121 res_count.tweet.bad_content + res_count.tweet.missed,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful