Best Python code snippet using hypothesis
request_handler.py
Source:request_handler.py  
1import hashlib2from server.async_database import *3from asyncio import StreamReader, StreamWriter4from server.async_database import *5import aiomysql6from server.data_buffer import DataBuffer7import random8import datetime9class RequestHandler:10    def __init__(self, data_buffer: DataBuffer):11        self.request = []12        self.data_buffer = data_buffer13    async def Request_Binding(self, request_number):14        result = None15        request_number = int(request_number)16        # 0ë² ìì²ì ì¬ì©íì§ ìëë¤.17        # ìì² ëë²ë 2ì ë°°ìë¡ ì íë¤18        if request_number == 0:19            pass20        if request_number == 2:21            echo = EchoRequest(self.data_buffer)22            result = await echo.main()23        elif request_number == 4:24            select = LoginRequest(self.data_buffer)25            result = await select.main()26        elif request_number == 6:27            create = CheckRegisterIdRequest(self.data_buffer)28            result = await create.main()29        elif request_number == 8:30            insert = CreateNewAccount(self.data_buffer)31            result = await insert.main()32        elif request_number == 10:33            insert = GetCaptchaTestSet(self.data_buffer)34            result = await insert.main()35        elif request_number == 12:36            insert = GetLastDriveDate(self.data_buffer)37            result = await insert.main()38        elif request_number == 14:39            insert = GetCaptchaTestSet2(self.data_buffer)40            result = await insert.main()41        elif request_number == 16:42            insert = GetUserInfo(self.data_buffer)43            result = await insert.main()44        elif request_number == 18:45            insert = GetAllUserInfo(self.data_buffer)46            result = await insert.main()47        elif request_number == 20:48            insert = UpdateUserInfo(self.data_buffer)49            result = await insert.main()50        elif request_number == 22:51            insert = UpdateLastDriveDate(self.data_buffer)52            result = await insert.main()53        elif request_number == 24:54            insert = UpdateAlcoholCount(self.data_buffer)55            result = await insert.main()56            57        elif request_number == 26:58            insert = FindUserId(self.data_buffer)59            result = await insert.main()60        elif request_number == 28:61            insert = FindUserPw(self.data_buffer)62            result = await insert.main()63        return result64'''65íì¬ EchoRequest ë¡ì§ììë§ 66ë°ì´í° ë²í¼ë¥¼ ì´ì©í´ ë°ì´í°ë¥¼ ì²ë¦¬ íëë¡í¨.67ì°¨í ìì±íë ê°ì²´ 모ëì ë¤ìê³¼ ê°ì´ ì ì© ìí¬ê²68'''69# 0ë² ìì²ì¼ë¡ ì¬ì©íì§ ìëë¤70# í
ì´ë¸ ìì±ì ìí ìì²71class CreateTableRequest(RequestHandler):72    def __init__(self, data_buffer: DataBuffer):73        super().__init__(data_buffer)74    async def main(self):75        temp = self.data_buffer.get_data()76        table_create_query = "CREATE TABLE person " \77                             "( _id INT AUTO_INCREMENT, name VARCHAR(32) NOT NULL, " \78                             "belong VARCHAR(12) DEFAULT 'FOO', " \79                             "phone VARCHAR(12), PRIMARY KEY(_id) ) " \80                             "ENGINE=INNODB"81        result = await query_operator(table_create_query)82        return result83# 2ë² ìì²84# ìì½ ìì²ì¼ë¡, ë°ì ë°ì´í°ë¥¼ ê·¸ëë¡ ë³´ë´ì¤ë¤.85class EchoRequest(RequestHandler):86    def __init__(self, data_buffer: DataBuffer):87        super().__init__(data_buffer)88    async def main(self):89        try:90            result = self.data_buffer.get_data()91            if result:92                return result93            else:94                result = "false"95                return result96        except:97            result = "false"98            return result99# 4ë² ìì²100# ë¡ê·¸ì¸ ìì²101class LoginRequest(RequestHandler):102    def __init__(self, data_buffer: DataBuffer):103        super().__init__(data_buffer)104    async def main(self):105        temp = self.data_buffer.get_data()106        table_name = "member"107        login_query = "select * from" + " " + table_name108        try:109            result = await query_operator(login_query)110            id = None111            pw = None112            temp = temp.replace("[", "", 1)113            temp = temp.replace("]", "", 1)114            temp = temp.replace(" ", "")115            temp = temp.replace("'", "")116            temp = temp.split(',')117            for i in temp:118                i = i.split('=')119                if (i[0] == "user_id"):120                    id = i[1]121                if (i[0] == "user_pw"):122                    pw = i[1]123            db_id = None124            db_pw = None125            for i in result:126                db_id = i['id']127                db_pw = i['passwd']128                if (id == db_id):129                    if (pw == db_pw):130                        result = "true"131                        return result132                    else:133                        result = "false"134                else:135                    result = "false"136            return result137        except:138            result = "false"139            return result140# 6ë² ìì²141# íìê°ì
ìì ìì´ë ì¤ë³µ ê²ì¬ë¥¼ ìí í´ëì¤142class CheckRegisterIdRequest(RequestHandler):143    def __init__(self, data_buffer: DataBuffer):144        super().__init__(data_buffer)145    async def main(self):146        temp = self.data_buffer.get_data()147        table_name = "member"148        get_all_id_query = "select id from" + " " + table_name149        try:150            result = await query_operator(get_all_id_query)151            id = None152            temp = temp.replace("[", "", 1)153            temp = temp.replace("]", "", 1)154            temp = temp.replace(" ", "")155            temp = temp.replace("'", "")156            temp = temp.split(',')157            for i in temp:158                i = i.split('=')159                if (i[0] == "user_id"):160                    id = i[1]161            db_id = None162            for i in result:163                db_id = i['id']164                if (id == db_id):165                    result = "true"166                    return result167                else:168                    result = "false"169            return result170        except:171            result = "false"172            return result173# 8ë² ìì²174# ìë¡ì´ ê³ì  ìì± íë ê°ì²´175class CreateNewAccount(RequestHandler):176    def __init__(self, data_buffer: DataBuffer):177        super().__init__(data_buffer)178    async def main(self):179        temp = self.data_buffer.get_data()180        try:181            id = None182            passwd = None183            name = None184            tel = None185            temp = temp.replace("[", "", 1)186            temp = temp.replace("]", "", 1)187            temp = temp.replace(" ", "")188            temp = temp.replace("'", "")189            temp = temp.split(',')190            for i in temp:191                i = i.split('=')192                if (i[0] == "user_id"):193                    id = i[1]194                if (i[0] == "user_pw"):195                    passwd = i[1]196                if (i[0] == "user_name"):197                    name = i[1]198                if (i[0] == "user_tel"):199                    tel = i[1]200            current_time = (datetime.datetime.now()).strftime('%y-%m-%d')201            create_new_account_query = "INSERT INTO member (id, passwd, name, tel, last_drive_date) " \202                                       "VALUES('" + str(id) + "', '" + str(passwd) + "', '" + str(name) + "', '" + str(tel) + "', '" + current_time + "');"203            '''204            name = "asdfasdf"205            pw = "asdfadf"206            data_insert_query = "INSERT INTO person (userid, passwd) VALUES('" + str(name) + "', '" + str(pw) + "');"207            '''208            try:209                result = await test_example_execute(create_new_account_query)210                return result211            except:212                result = "false"213                return result214            result = "true"215            return result216        except:217            result = "false"218            return result219# 10ë² ìì²220# 캡차 문ì ì ì ëµì ê°ì ¸ì¤ë í´ëì¤221class GetCaptchaTestSet(RequestHandler):222    def __init__(self, data_buffer: DataBuffer):223        super().__init__(data_buffer)224    async def main(self):225        temp = self.data_buffer.get_data()226        try:227            # 캡챠 문ì ë ëë¦´ê² íì¬ë 3ë² ë¬¸ì ê¹ì§ ìì228            answer = ""229            test_set_num = random.randrange(1, 100)230            test_set_num %= 9231            # ëë¹ë 1ë² ë¶í° ììíë¯ë¡, +1ì í´ì¤ì¼í¨232            test_set_num += 1233            check_captcha_answer_query = "SELECT captcha_answer FROM captcha where captcha_num=" + "'" + str(test_set_num) + "';"234            try:235                result = await query_operator(check_captcha_answer_query)236                result = result[0]237                result = result.get('captcha_answer')238                return result239            except:240                result = "false"241                return result242            return result243        except:244            result = "false"245            return result246# 12ë² ìì²247# ë§ì§ë§ ì´ì  ë ì§ë¥¼ ê°ì ¸ì¤ë í´ëì¤248class GetLastDriveDate(RequestHandler):249    def __init__(self, data_buffer: DataBuffer):250        super().__init__(data_buffer)251    async def main(self):252        temp = self.data_buffer.get_data()253        user_id=None254        temp = temp.replace("[", "", 1)255        temp = temp.replace("]", "", 1)256        temp = temp.replace(" ", "")257        temp = temp.replace("'", "")258        temp = temp.split(',')259        for i in temp:260            i = i.split('=')261        if (i[0] == "user_id"):262            user_id = i[1]263        try:264            query = "SELECT last_drive_date FROM member where id="+"'"+user_id + "';"265            try:266                result = await query_operator(query)267                result = result[0]268                result = result.get('last_drive_date')269                # result = result.split(" ")270                # temp1 = result[0]271                temp1 = str(result)272                # temp2 = result[1]273                temp1 = temp1.split("-")274                # temp2 = temp2.split(":")275                ''' ë§ì§ë§ ì ì ë ì§ '''276                year = int(temp1[0])277                month = int(temp1[1])278                day = int(temp1[2])279                ''' íì¬ ë ì§ '''280                now = datetime.datetime.today()281                now = str(now).split()282                now = str(now[0]).split("-")283                now_year = int(now[0])284                now_month = int(now[1])285                now_day = int(now[2])286                ''' ë ì§ ì°ì° ìì  '''287                last_access_date = datetime.date(year, month, day)288                now_date = datetime.date(now_year, now_month, now_day)289                delta = now_date - last_access_date290                result = str(delta.days)291                return result292            except:293                result = "false"294                return result295        except:296            result = "false"297            return result298# 14ë² ìì²299# 캡차 문ì ì ì ëµì ê°ì ¸ì¤ë í´ëì¤300class GetCaptchaTestSet2(RequestHandler):301    def __init__(self, data_buffer: DataBuffer):302        super().__init__(data_buffer)303    async def main(self):304        temp = self.data_buffer.get_data()305        try:306            captcha2_number = None307            captcha2_answer = None308            temp = temp.replace("[", "", 1)309            temp = temp.replace("]", "", 1)310            temp = temp.replace(" ", "")311            temp = temp.replace("'", "")312            temp = temp.split(',')313            for i in temp:314                i = i.split('=')315                if (i[0] == "captcha2_number"):316                    captcha2_number = i[1]317                if (i[0] == "captcha2_answer"):318                    captcha2_answer = i[1]319            # check_captcha_answer_query = "SELECT captcha_answer FROM captcha2 where captcha_num=captcha2_number ;"320            check_captcha_answer_query = "SELECT captcha_answer FROM captcha2 where captcha_num="+"'"+captcha2_number +"';"321            try:322                result = await query_operator(check_captcha_answer_query)323                result = result[0]324                result = result.get('captcha_answer')325                # ë문ìë¡ ì
ë ¥íì ê²½ì° ì문ìë¡ ë°ê¿ì¤ë¤.326                captcha2_answer = captcha2_answer.lower()327                if captcha2_answer == result:328                    result = "true"329                    return result330                else:331                    result = "false"332                    return result333            except:334                result = "false"335                return result336        except:337            result = "false"338            return result339# 16ë² ìì²340# ì´ì  ì ë³´ë¥¼ ë겨주ë í´ëì¤341class GetUserInfo(RequestHandler):342    def __init__(self, data_buffer: DataBuffer):343        super().__init__(data_buffer)344    async def main(self):345        temp = self.data_buffer.get_data()346        try:347            userid = None348            temp = temp.replace("[", "", 1)349            temp = temp.replace("]", "", 1)350            temp = temp.replace(" ", "")351            temp = temp.replace("'", "")352            temp = temp.split(',')353            for i in temp:354                i = i.split('=')355                if (i[0] == "userid"):356                    userid = i[1]357            get_userinfo_query="SELECT * FROM MEMBER WHERE id="+"'"+ userid +"';"358            try:359                qeury_result = await query_operator(get_userinfo_query)360                qeury_result = qeury_result[0]361                result = ""362                # ëë¼ì´ë¸ íì, 졸ìì´ì  ê°ì§ íì, ì주ì´ì  ê°ì§ íì, ë§ì§ë§ ì´ì  ë ì§363                result += str(str(qeury_result.get('drive_cnt'))+"-")364                result += str(str(qeury_result.get('sleep_detect_cnt'))+"-")365                result += str(str(qeury_result.get('alcohol_detect_cnt'))+"-")366                last_drive_date = str(str(qeury_result.get('last_drive_date')))367                last_drive_date = last_drive_date.split(" ")368                last_drive_date = last_drive_date[0]369                result += last_drive_date.replace("-", "/")370                return result371            except:372                result = "false"373                return result374        except:375            result = "false"376            return result377# 18ë² ìì²378# ì¬ì©ìì ëª¨ë  ì ë³´ë¥¼ ë겨주ë í´ëì¤379class GetAllUserInfo(RequestHandler):380    def __init__(self, data_buffer: DataBuffer):381        super().__init__(data_buffer)382    async def main(self):383        temp = self.data_buffer.get_data()384        try:385            userid = None386            temp = temp.replace("[", "", 1)387            temp = temp.replace("]", "", 1)388            temp = temp.replace(" ", "")389            temp = temp.replace("'", "")390            temp = temp.split(',')391            for i in temp:392                i = i.split('=')393                if (i[0] == "userid"):394                    userid = i[1]395            get_all_userinfo_query="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"396            try:397                qeury_result = await query_operator(get_all_userinfo_query)398                qeury_result = qeury_result[0]399                result = ""400                # ëë¼ì´ë¸ íì, 졸ìì´ì  ê°ì§ íì, ì주ì´ì  ê°ì§ íì, ë§ì§ë§ ì´ì  ë ì§401                result += str(str(qeury_result.get('id'))+"-")402                result += str(str(qeury_result.get('passwd'))+"-")403                result += str(str(qeury_result.get('name'))+"-")404                result += str(str(qeury_result.get('tel'))+"-")405                return result406            except:407                result = "false"408                return result409        except:410            result = "false"411            return result412# 20ë² ìì²413# ì¬ì©ì ì ë³´ë¥¼ ì
ë°ì´í¸ íë í´ëì¤414class UpdateUserInfo(RequestHandler):415    def __init__(self, data_buffer: DataBuffer):416        super().__init__(data_buffer)417    async def main(self):418        temp = self.data_buffer.get_data()419        try:420            userid = None421            userpassword = None422            username = None423            usertel = None424            temp = temp.replace("[", "", 1)425            temp = temp.replace("]", "", 1)426            temp = temp.replace(" ", "")427            temp = temp.replace("'", "")428            temp = temp.split(',')429            for i in temp:430                i = i.split('=')431                if (i[0] == "userid"):432                    userid = i[1]433                if (i[0] == "userpassword"):434                    userpassword = i[1]435                if (i[0] == "username"):436                    username = i[1]437                if (i[0] == "usertel"):438                    usertel = i[1]439            get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"440            primary_key = None441            try:442                primary_key = await query_operator(get_primary_key)443                primary_key = primary_key[0]444                primary_key = int(primary_key.get('index'))445                update_userinfo_query = "UPDATE member SET `passwd` = "+"'" + str(userpassword) + "'" + " , `name` = "+"'" + str(username) + "'" + " , `tel` = " + "'" + str(usertel) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"446                qeury_result = await update_execute(update_userinfo_query)447                if qeury_result:448                    result = "true"449                    return result450                else:451                    result = "false"452                    return result453            except:454                result = "false"455                return result456        except:457            result = "false"458            return result459# 22ë² ìì²460# ë§ì§ë§ ì´ì  ë ì§ë¥¼ ì
ë°ì´í¸ íë í¨ì461class UpdateLastDriveDate(RequestHandler):462    def __init__(self, data_buffer: DataBuffer):463        super().__init__(data_buffer)464    async def main(self):465        temp = self.data_buffer.get_data()466        try:467            userid = None468            temp = temp.replace("[", "", 1)469            temp = temp.replace("]", "", 1)470            temp = temp.replace(" ", "")471            temp = temp.replace("'", "")472            temp = temp.split(',')473            for i in temp:474                i = i.split('=')475                if (i[0] == "userid"):476                    userid = i[1]477            get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"478            primary_key = None479            try:480                primary_key = await query_operator(get_primary_key)481                temp_result = primary_key482                primary_key = primary_key[0]483                primary_key = int(primary_key.get('index'))484                temp_result = temp_result[0]485                drive_cnt = int(temp_result.get('drive_cnt'))486                drive_cnt += 1487                new_last_drive_date = (datetime.datetime.now()).strftime('%y-%m-%d')488                update_userinfo_query = "UPDATE member SET `drive_cnt` = "+"'" + str(drive_cnt) + "'" + " , `last_drive_date` = "+"'" + str(new_last_drive_date) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"489                qeury_result = await update_execute(update_userinfo_query)490                if qeury_result:491                    result = "true"492                    return result493                else:494                    result = "false"495                    return result496            except:497                result = "false"498                return result499        except:500            result = "false"501            return result502# 24ë² ìì²503# í
ì¤í¸ ì¤í¨ìì ì주ì´ì  íì 1í ì¦ê° ìí´504class UpdateAlcoholCount(RequestHandler):505    def __init__(self, data_buffer: DataBuffer):506        super().__init__(data_buffer)507    async def main(self):508        temp = self.data_buffer.get_data()509        try:510            userid = None511            temp = temp.replace("[", "", 1)512            temp = temp.replace("]", "", 1)513            temp = temp.replace(" ", "")514            temp = temp.replace("'", "")515            temp = temp.split(',')516            for i in temp:517                i = i.split('=')518                if (i[0] == "userid"):519                    userid = i[1]520            get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"521            primary_key = None522            try:523                primary_key = await query_operator(get_primary_key)524                temp_result = primary_key525                primary_key = primary_key[0]526                primary_key = int(primary_key.get('index'))527                temp_result = temp_result[0]528                alcohol_detect_count = int(temp_result.get('alcohol_detect_cnt'))529                alcohol_detect_count += 1530                 # update_userinfo_query = "UPDATE member SET `alcohol_detect_cnt` = "+"'" + str(alcohol_detect_count) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"531                update_userinfo_query = "UPDATE `test`.`member` SET `alcohol_detect_cnt` = '" + str(alcohol_detect_count) + "' WHERE(`index` = '" + str(primary_key) + "');"532                qeury_result = await update_execute(update_userinfo_query)533                if qeury_result:534                    result = "true"535                    return result536                else:537                    result = "false"538                    return result539            except:540                result = "false"541                return result542        except:543            result = "false"544            return result545# 26ë² ìì²546# ì¬ì©ìì ìì´ë를 ì°¾ì ë°íí¨547class FindUserId(RequestHandler):548    def __init__(self, data_buffer: DataBuffer):549        super().__init__(data_buffer)550    async def main(self):551        temp = self.data_buffer.get_data()552        try:553            username = None554            usertel = None555            temp = temp.replace("[", "", 1)556            temp = temp.replace("]", "", 1)557            temp = temp.replace(" ", "")558            temp = temp.replace("'", "")559            temp = temp.split(',')560            for i in temp:561                i = i.split('=')562                if (i[0] == "username"):563                    username = i[1]564                if (i[0] == "usertel"):565                    usertel = i[1]566            get_all_userinfo_query = "SELECT * FROM MEMBER WHERE name=" + "'" + username + "'" + " AND tel=" + "'" + usertel + "'" + "; "567            try:568                qeury_result = await query_operator(get_all_userinfo_query)569                qeury_result = qeury_result[0]570                result = ""571                # ë³´ë¸ ì´ë¦ê³¼ ì íë²í¸ê° ì ì¥ë ì´ë¦ê³¼ ì íë²í¸í ì¼ì¹íëì§ ì ê²572                if username == str(qeury_result.get('name')) and usertel == str(qeury_result.get('tel')):573                    result += str(qeury_result.get('id'))574                else:575                    result += "false"576                return result577            except:578                result = "false"579                return result580        except:581            result = "false"582            return result583# 28ë² ìì²584# ì¬ì©ìì ë¹ë°ë²í¸ë¥¼ ì°¾ì ë°íí¨585class FindUserPw(RequestHandler):586    def __init__(self, data_buffer: DataBuffer):587        super().__init__(data_buffer)588    async def main(self):589        temp = self.data_buffer.get_data()590        try:591            userid = None592            username = None593            temp = temp.replace("[", "", 1)594            temp = temp.replace("]", "", 1)595            temp = temp.replace(" ", "")596            temp = temp.replace("'", "")597            temp = temp.split(',')598            for i in temp:599                i = i.split('=')600                if (i[0] == "userid"):601                    userid = i[1]602                if (i[0] == "username"):603                    username = i[1]604            get_all_userinfo_query = "SELECT * FROM MEMBER WHERE name=" + "'" + username + "'" + " AND id=" + "'" + userid + "'" + "; "605            try:606                qeury_result = await query_operator(get_all_userinfo_query)607                qeury_result = qeury_result[0]608                result = ""609                # ë³´ë¸ ì´ë¦ê³¼ ì íë²í¸ê° ì ì¥ë ì´ë¦ê³¼ ì íë²í¸í ì¼ì¹íëì§ ì ê²610                if userid == str(qeury_result.get('id')) and username == str(qeury_result.get('name')):611                    result += str(qeury_result.get('passwd'))612                else:613                    result += "false"614                return result615            except:616                result = "false"617                return result618        except:619            result = "false"...test_client.py
Source:test_client.py  
1#!/usr/bin/python2##3## Copyright 2008, Various4## Adrian Likins <alikins@redhat.com>5##6## This software may be freely redistributed under the terms of the GNU7## general public license.8##9import os10import socket11import unittest12import xmlrpclib13import func.overlord.client as fc14import func.utils15import socket16class BaseTest:17    # assume we are talking to localhost18    th = socket.gethostname()19    nforks=120    async=False21    def __init__(self):22        pass23    def setUp(self):24        self.overlord = fc.Overlord(self.th,25                                    nforks=self.nforks,26                                    async=self.async)27    def test_module_version(self):28        mod = getattr(self.overlord, self.module)29        result = mod.module_version()30        self.assert_on_fault(result)31    def test_module_api_version(self):32        mod = getattr(self.overlord, self.module)33        result = mod.module_api_version()34        self.assert_on_fault(result)35    def test_module_description(self):36        mod = getattr(self.overlord, self.module)37        result = mod.module_description()38        self.assert_on_fault(result)39    def test_module_list_methods(self):40        mod = getattr(self.overlord, self.module)41        result = mod.list_methods()42        self.assert_on_fault(result)43    def test_module_get_method_args(self):44        mod = getattr(self.overlord,self.module)45        arg_result=mod.get_method_args()46        self.assert_on_fault(arg_result)47    def test_module_inventory(self):48        mod = getattr(self.overlord, self.module)49        result = mod.list_methods()50        res = result[self.th]51        # not all modules have an inventory, so check for it52        # FIXME: not real happy about having to call list method for53        #        every module, but it works for now -akl54        if "inventory" in res:55            result = mod.inventory()56        self.assert_on_fault(result)57    # we do this all over the place...58    def assert_on_fault(self, result):59        assert func.utils.is_error(result[self.th]) == False60#        assert type(result[self.th]) != xmlrpclib.Fault61    def assert_on_no_fault(self, results):62        assert func.utils.is_error(results[self.th]) == True63    # attrs set so we can skip these via nosetest64    test_module_version.intro = True65    test_module_api_version.intro = True66    test_module_description.intro = True67    test_module_list_methods.intro = True68    test_module_inventory.intro = True69    test_module_get_method_args.intro = True70class TestTest(BaseTest):71    module = "test"72    def test_add(self):73        result = self.overlord.test.add(1,5)74        self.assert_on_fault(result)75        assert result[self.th] == 676    def test_add_string(self):77        result = self.overlord.test.add("foo", "bar")78        self.assert_on_fault(result)79        assert result[self.th] == "foobar"80    def test_sleep(self):81        result = self.overlord.test.sleep(1)82        self.assert_on_fault(result)83    def test_explode(self):84        results = self.overlord.test.explode()85        print results86        self.assert_on_no_fault(results)87    def test_explode_no_string(self):88        results = self.overlord.test.explode_no_string()89        print results90        self.assert_on_no_fault(results)91    def _echo_test(self, data):92        result = self.overlord.test.echo(data)93        self.assert_on_fault(result)94        assert result[self.th] == data95    # this tests are basically just to test the basic96    # marshalling/demarshaling bits97    def test_echo_int(self):98        self._echo_test(1)99    def test_echo_string(self):100        self._echo_test("caneatcheese")101    def test_echo_array(self):102        self._echo_test([1, 2, "three", "fore", "V",])103    def test_echo_hash(self):104        self._echo_test({"one":1, "too":2, "III":3, "many":8, "lots":12312})105    def test_echo_bool_false(self):106        self._echo_test(False)107    def test_echo_bool_true(self):108        self._echo_test(True)109    def test_echo_float(self):110        self._echo_test(123.456)111    def test_echo_big_float(self):112        self._echo_test(123121232.23)113    def test_echo_bigger_float(self):114        self._echo_test(234234234234234234234.234234234234234)115    def test_echo_little_float(self):116        self._echo_test(0.000000000000000000000000000000037)117    def test_echo_binary(self):118        blob = "348dshke354ts0d9urgk"119        import xmlrpclib120        data = xmlrpclib.Binary(blob)121        self._echo_test(data)122    def test_echo_date(self):123        import datetime124        dt = datetime.datetime(1974, 1, 5, 11, 59 ,0,0, None)125        import xmlrpclib126        data = xmlrpclib.DateTime(dt)127        self._echo_test(data)128    def test_config(self):129        result = self.overlord.test.configfoo()130        config = result[self.th]131        self.assert_on_fault(result)132    def test_config_write(self):133        result = self.overlord.test.config_save()134        config = result[self.th]135        self.assert_on_fault(result)136    def test_config_set(self):137        result = self.overlord.test.config_set("example", 5000)138        config = result[self.th]139        self.assert_on_fault(result)140    def test_config_get_example(self):141        result = self.overlord.test.config_get("example")142        config = result[self.th]143        self.assert_on_fault(result)144    def test_config_get_string(self):145        result = self.overlord.test.config_get("string_option")146        option = result[self.th]147        print option148        assert(type(option) == type("foo"))149        self.assert_on_fault(result)150    def test_config_get_int(self):151        result = self.overlord.test.config_get("int_option")152        option = result[self.th]153        assert(type(option) == type(1))154        self.assert_on_fault(result)155    def test_config_get_bool(self):156        result = self.overlord.test.config_get("bool_option")157        option = result[self.th]158        assert(type(option) == type(True))159        self.assert_on_fault(result)160    def test_config_get_float(self):161        result = self.overlord.test.config_get("float_option")162        option = result[self.th]163        assert(type(option) == type(2.71828183))164        self.assert_on_fault(result)165    def test_config_get_test(self):166        result = self.overlord.test.config_get_test()167        self.assert_on_fault(result)168class TestCommand(BaseTest):169    module = "command"170    def test_echo(self):171        result = self.overlord.command.run("echo -n foo")172        self.assert_on_fault(result)173        assert result[self.th][1] == "foo"174    def test_rpm(self):175        # looksing for some package that should be there, rh specific176        # ish at the moment177        result = self.overlord.command.run("rpm -q filesystem")178        self.assert_on_fault(result)179        assert result[self.th][1].split("-")[0] == "filesystem"180    def test_env(self):181        result = self.overlord.command.run("env",182                                           {'BLIPPYFOO':'awesome'})183        self.assert_on_fault(result)184        assert result[self.th][1].find("BLIPPYFOO=awesome") != -1185#    def test_sleep_long(self):186#        result = self.overlord.command.run("sleep 256")187#        self.assert_on_fault(result)188    def test_shell_globs(self):189        result = self.overlord.command.run("ls /etc/cron*")190        self.assert_on_fault(result)191    def test_shell_pipe(self):192        result = self.overlord.command.run("echo 'foobar' | grep foo")193        self.assert_on_fault(result)194        assert result[self.th][1] == "foobar\n"195    def test_shell_redirect(self):196        result = self.overlord.command.run("mktemp")197        tmpfile = result[self.th][1].strip()198        result = self.overlord.command.run("echo foobar >> %s; cat %s" % (tmpfile, tmpfile))199        self.assert_on_fault(result)200        assert result[self.th][1] == "foobar\n"201    def test_shell_multiple_commands(self):202        result = self.overlord.command.run("cal; date; uptime; ls;")203        self.assert_on_fault(result)204class TestCopyfile(BaseTest):205    fn = "/tmp/func_test_file"206    dest_fn = "/tmp/func_test_file_dest"207    content = "this is a func test file"208    module = "copyfile"209    def create_a_file(self, size=1):210        f = open(self.fn, "w")211        f.write(self.content*size)212        f.close()213#    def test_local_copyfile(self):214#        result = self.overlord.local.copyfile.send(self.fn, self.dest_fn)215#        print result216#        self.assert_on_fault(result)217    def test_copyfile(self, size=1):218        self.create_a_file(size=size)219        fb = open(self.fn,"r").read()220        data = xmlrpclib.Binary(fb)221        result = self.overlord.copyfile.copyfile(self.dest_fn, data)222        self.assert_on_fault(result)223        assert result[self.th]  == 0224 #   def test_copyfile_big(self):225 #       # make a file in the ~70 meg range226 #       self.test_copyfile(size=100)227    def test_checksum(self):228        self.create_a_file()229        fb = open(self.fn,"r").read()230        data = xmlrpclib.Binary(fb)231        result = self.overlord.copyfile.copyfile(self.dest_fn, data)232        result = self.overlord.copyfile.checksum(self.dest_fn)233        self.assert_on_fault(result)234        assert result[self.th] == "b36a8040e44c16605d7784cdf1b3d9ed04ea7f55"235class TestHardware(BaseTest):236    module = "hardware"237    def test_inventory(self):238        result = self.overlord.hardware.inventory()239        self.assert_on_fault(result)240    def test_halinfo(self):241        result = self.overlord.hardware.hal_info()242        self.assert_on_fault(result)243    def test_info(self):244        result = self.overlord.hardware.info()245        self.assert_on_fault(result)246    def test_info_no_devices(self):247        result = self.overlord.hardware.info(False)248        self.assert_on_fault(result)249class TestFileTracker(BaseTest):250    fn = "/etc/hosts"251    fn_glob = "/etc/init.d/*"252    fn_list = ["/etc/hosts", "/etc/func/minion.conf"]253    fn_glob_list = ["/etc/hosts", "/etc/func/*"]254    module = "filetracker"255    def test_track(self):256        result = self.overlord.filetracker.track(self.fn)257        assert result[self.th] == 1258        self.assert_on_fault(result)259    def test_track_glob(self):260        result = self.overlord.filetracker.track(self.fn)261        assert result[self.th] == 1262        self.assert_on_fault(result)263    def test_untrack_glob(self):264        result = self.overlord.filetracker.track(self.fn_glob)265        result = self.overlord.filetracker.untrack(self.fn_glob)266        self.assert_on_fault(result)267    def test_track_fn_list(self):268        result = self.overlord.filetracker.track(self.fn_list)269        assert result[self.th] == 1270        self.assert_on_fault(result)271    def test_untrack_fn_list(self):272        result = self.overlord.filetracker.track(self.fn_list)273        result = self.overlord.filetracker.untrack(self.fn_list)274        self.assert_on_fault(result)275    def test_track_fn_glob_list(self):276        result = self.overlord.filetracker.track(self.fn_glob_list)277        assert result[self.th] == 1278        self.assert_on_fault(result)279    def test_untrack_fn_glob_list(self):280        result = self.overlord.filetracker.track(self.fn_glob_list)281        result = self.overlord.filetracker.untrack(self.fn_glob_list)282        self.assert_on_fault(result)283    def test_inventory(self):284        result = self.overlord.filetracker.track(self.fn)285        result = self.overlord.filetracker.inventory(False)286        self.assert_on_fault(result)287        assert self.fn in result[self.th][0]288#        assert result[self.th][0][3] == 0289    def test_untrack(self):290        result = self.overlord.filetracker.track(self.fn)291        result = self.overlord.filetracker.untrack(self.fn)292        self.assert_on_fault(result)293        result_inv = self.overlord.filetracker.inventory(False)294        tracked_files = result_inv[self.th]295        for i in tracked_files:296            if i[0] == self.fn:297                assert "%s was not properly untracked" % self.fn298class TestMount(BaseTest):299    module = "mount"300    def test_mount_list(self):301        result = self.overlord.mount.list()302        self.assert_on_fault(result)303    # INSERT some clever way to test mount here304class TestNetworkTest(BaseTest):305    module = "networktest"306    def test_ping(self):307        result = self.overlord.networktest.ping(self.th, "-c", "2")308        self.assert_on_fault(result)309    def test_ping_bad_arg(self):310        result = self.overlord.networktest.ping(self.th)311        # this should give us a FuncException312        foo = func.utils.is_error(result[self.th])313    def test_netstat(self):314        result = self.overlord.networktest.netstat("-n")315        self.assert_on_fault(result)316    def test_traceroute(self):317        result = self.overlord.networktest.traceroute(self.th)318        self.assert_on_fault(result)319    def test_dig(self):320        result = self.overlord.networktest.dig("redhat.com")321        self.assert_on_fault(result)322    def test_isportopen_closed_port(self):323        result = self.overlord.networktest.isportopen(self.th, 34251)324        self.assert_on_fault(result)325    def test_isportopen_open_port(self):326        result = self.overlord.networktest.isportopen(self.th, 51234)327        self.assert_on_fault(result)328class TestProcess(BaseTest):329    module = "process"330    def test_info(self):331        result = self.overlord.process.info()332        self.assert_on_fault(result)333    def test_mem(self):334        result = self.overlord.process.mem()335        self.assert_on_fault(result)336    # FIXME: how to test kill/pkill? start a process with337    #        command and then kill it?338class TestService(BaseTest):339    module = "service"340    def test_inventory(self):341        result = self.overlord.service.inventory()342        self.assert_on_fault(result)343    def test_get_enabled(self):344        result = self.overlord.service.get_enabled()345        self.assert_on_fault(result)346    def test_get_running(self):347        result = self.overlord.service.get_running()348        self.assert_on_fault(result)349    def test_get_status(self):350        running_data = self.overlord.service.get_running()[self.th]351        result = self.overlord.service.status(running_data[0][0])352        self.assert_on_fault(result)353        #FIXME: whats a good way to test starting/stoping services without354        #       doing bad things? -akl355class TestRpm(BaseTest):356    module = "rpms"357    def test_inventory(self):358        result = self.overlord.rpms.inventory()359        self.assert_on_fault(result)360    def test_glob(self):361        # if func is running, there should at least be python installed ;->362        result = self.overlord.rpms.glob("python*", False)363        self.assert_on_fault(result)364    def test_glob_flatten(self):365        result = self.overlord.rpms.glob("python*", True)366        self.assert_on_fault(result)367    def test_glob_nomatch(self):368        # shouldn't be any rpms called "-" ;->369        result = self.overlord.rpms.glob("-*")370        self.assert_on_fault(result)371    def test_glob_gpg_pubkey(self):372        # gpg-pubkey packages are weird rpm virtual packages, and tend to do373        # weird things, so try that too374        result = self.overlord.rpms.glob("gpg-pubkey*")375        self.assert_on_fault(result)376    def test_glob_gpg_pubkey_no_flat(self):377        # gpg-pubkey packages are weird rpm virtual packages, and tend to do378        # weird things, so try that too379        result = self.overlord.rpms.glob("gpg-pubkey*", False)380        self.assert_on_fault(result)381    def test_glob_match_all(self):382        result = self.overlord.rpms.glob("*", False)383        self.assert_on_fault(result)384    def test_verify(self):385        result = self.overlord.rpms.verify("bash")386        self.assert_on_fault(result)387class TestSmart(BaseTest):388    module = "smart"389    def test_info(self):390        result = self.overlord.smart.info()391        self.assert_on_fault(result)392class TestSysctl(BaseTest):393    module = "sysctl"394    def test_list(self):395        result = self.overlord.sysctl.list()396        self.assert_on_fault(result)397    def test_get(self):398        result = self.overlord.sysctl.get("kernel.max_lock_depth")399        self.assert_on_fault(result)400class TestYum(BaseTest):401    module = "yumcmd"402    def test_check_update(self):403        result = self.overlord.yumcmd.check_update()404        self.assert_on_fault(result)405    def test_check_update_empty_filter(self):406        results = self.overlord.yumcmd.check_update([])407        self.assert_on_fault(results)408        results_no_filter = self.overlord.yumcmd.check_update()409        assert results == results_no_filter410    def test_check_update_splat_filter(self):411        results = self.overlord.yumcmd.check_update(['*'])412        self.assert_on_fault(results)413        results_no_filter = self.overlord.yumcmd.check_update()414        assert results == results_no_filter415# this fails on fc6, need to test on newer yum to see whats up416#    def test_update_non_existent_package(self):417#        result = self.overlord.yumcmd.update("thisisapackage-_-that_should==never+exist234234234")418#        self.assert_on_fault(result)419#        # hmm, that method always returns True... not much to test there... -akl420class TestIptables(BaseTest):421    module = "iptables"422    def test_dump(self):423        result = self.overlord.iptables.dump()424        # at the moment, we dont set anything up425        # to verify, so this is just a basic426        # "does it crash" test427    def test_policy(self):428        result = self.overlord.iptables.policy()429class TestIptablesPort(BaseTest):430    module = "iptables.port"431    def test_inventory(self):432        results = self.overlord.iptables.port.inventory()433        # doesnt have an inventory, so er... -akl434#class TestHttpd(BaseTest):435#    module = "httpd"436#    def test_server_status(self):437#        result = self.overlord.httpd.server_status()438#        self.assert_on_fault(result)439#440#    def test_graceful(self):441#        result = self.overlord.httpd.graceful()442#        self.assert_on_fault(result)443class TestEchoTest(BaseTest):444    module = "echo"445    def test_run_string(self):446        result=self.overlord.echo.run_string("heyman")447        self.assert_on_fault(result)448    def test_run_int(self):449        result=self.overlord.echo.run_int(12)450        self.assert_on_fault(result)451    def test_run_float(self):452        result=self.overlord.echo.run_float(12.0)453        self.assert_on_fault(result)454    def test_run_options(self):455        result=self.overlord.echo.run_options("hehehh")456        self.assert_on_fault(result)457    def test_run_list(self):458        result=self.overlord.echo.run_list(['one','two','three'])459        self.assert_on_fault(result)460    def test_run_hash(self):461        result=self.overlord.echo.run_hash({'one':1,'two':2})462        self.assert_on_fault(result)463    def test_run_boolean(self):464        result=self.overlord.echo.run_hash(True)465        self.assert_on_fault(result)466class TestFactsModule(BaseTest):467    module = "fact"468    def test_list_fact_modules(self):469        result=self.overlord.fact.list_fact_modules()470        print result471        self.assert_on_fault(result)472    def test_list_fact_methods(self):473        # we want to catch the case if haveconflicts ...474        result=self.overlord.fact.list_fact_methods(True)475        print result476        self.assert_on_fault(result)477        if type(result[self.th])==dict and result[self.th].has_key('__conflict__'):478            raise Exception("You have conflict in tags run fact.list_fact_methods for more info : %s"%result)479    def test_show_fact_module(self):480        modules = self.overlord.fact.list_fact_modules().values()481        print "Modules to run for show fact module are ",modules482        for module in modules[0]:483            result = self.overlord.fact.show_fact_module(module)484            print result485            self.assert_on_fault(result)486    def test_show_fact_method(self):487        methods = self.overlord.fact.list_fact_methods().values()488        print "Methods to run for show fact method are ",methods489        for method in methods[0]:490            result = self.overlord.fact.show_fact_method(method)491            print result492            self.assert_on_fault(result)493    def test_fact_call(self):494        methods = self.overlord.fact.list_fact_methods().values()495        for method in methods[0]:496            result = self.overlord.fact.call_fact(method)497            print result498            self.assert_on_fault(result)499class TestSystem(BaseTest):500    module = "system"501    def test_list_methods(self):502        result = self.overlord.system.list_methods()503        self.assert_on_fault(result)504    def test_listMethods(self):505        result = self.overlord.system.listMethods()506        self.assert_on_fault(result)507    def test_list_modules(self):508        result = self.overlord.system.list_modules()509        self.assert_on_fault(result)510    #FIXME: we really should just implement these for the system stuff511    #       as well512    def test_module_version(self):513        pass514    def test_module_api_version(self):515        pass516    def test_module_description(self):517        pass518    def test_module_get_method_args(self):519        pass520#import time521#class TestAsyncTest(BaseTest):522#    module = "async.test"523#    nforks=1524#    async=True525#    def test_sleep_async(self):526#        job_id = self.overlord.test.sleep(5)527#        print "job_id", job_id528#        time.sleep(5)529#        (return_code, results) = self.overlord.job_status(job_id)530#        print "return_code", return_code531#        print "results", results532#533#    def test_add_async(self):534#        job_id = self.overlord.test.add(1,5)535#        print "job_id", job_id536#        time.sleep(6)537#        (return_code, results) = self.overlord.job_status(job_id)538#        print "return_code", return_code...Tilte_minmax.py
Source:Tilte_minmax.py  
1from vic import *2import requests3import threading4import time5import json67lock = threading.Lock()8Gas_dict={}9Gas_param = {}10Gas_return = {}11121314def mainLoop(info, arr):15	16	ID_string = str(PAGE01.OCR337).replace(' ', '')17	char_list = split(ID_string)18	colon_list = []19	slash_list = []20	E_ID = ''21	U_ID = ''22	print(char_list)23	24	for i in range(len(char_list)):25		if char_list[i] == ':':26			colon_list.append(i)27		if char_list[i] == '/':28			slash_list.append(i)29	30	for i in range(colon_list[0] + 1, slash_list[0]):31		E_ID += char_list[i]3233	print(E_ID)34	35	print(colon_list)36	print(slash_list)37			38			39	pass4041def split(word):42	return[char for char in word]4344def SetGasParam(**kwargs):45	46	global Gas_param, lock47	48	Gas_param_temp = {}49	50	try:51		E_ID = kwargs['E_ID']52		Gas_param_temp['E_ID'] = E_ID53	except:54		return {'error_message':'Cannot find E_ID.', 'is_success':'N'}55	56	try:57		U_ID = kwargs['U_ID']58		Gas_param_temp['U_ID'] = U_ID59	except:60		return {'error_message':'Cannot find U_ID.', 'is_success':'N'}61	62	try:63		O_ID = kwargs['O_ID']64		Gas_param_temp['O_ID'] = O_ID65	except:66		return {'error_message':'Cannot find O_ID.', 'is_success':'N'}67	68	try:69		F_ID = kwargs['F_ID']70		Gas_param_temp['F_ID'] = F_ID71	except:72		return {'error_message':'Cannot find F_ID.', 'is_success':'N'}73	74	try:75		Gas_set = kwargs['Gas_set']76		Gas_param_temp['Gas_set'] = Gas_set77	except:78		return {'error_message':'Cannot find Gas_set.', 'is_success':'N'}79	80	81	lock.acquire()82	83	Gas_param[F_ID] = json.dumps(Gas_param_temp)84	85	lock.release()86	87	#print(Gas_param)88	89	return {'is_success':'Y'}90	9192	93def RecognizeImage(**args):94	95	global Gas_dict, Gas_param, Gas_return, lock9697	try:98		img = args['img']99	except:100		return {'error_message':'Cannot find img.', 'is_success':'N'}101	102	Gas_dict_temp = {}103	Gas_return_temp = {}104	find = 0105	result = {}106	107	E_ID = ''108	U_ID = ''109	F_ID = ''110	ID = ''111	ID_temp = ''112	ID_string = ''113    char_list = []114	colon_list = []115	slash_list = []116	117	'''118	x1 = int(result['PATTERN01'].X)+int(result['PATTERN01'].WIDTH)119	y1 = int(result['PATTERN01'].Y)120	w1 = int(result['PATTERN02'].X)-x1121	h1 = int(result['PATTERN01'].HEIGHT)122	x2 = int(result['PATTERN03'].X)+int(result['PATTERN03'].WIDTH)123	y2 = int(result['PATTERN03'].Y)124	w2 = 100125	h2 = int(result['PATTERN03'].HEIGHT)126	E_ID = TOOL.OCR(0, 'epmm_ID', x1, y1, w1, h1, img, {'segmentation':7, 'preprocess_resize':7, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 0, 'preprocess_threshold_value': 135})127	U_ID = TOOL.OCR(0, 'epmm_ID', x2, y2, w2, h2, img, {'segmentation':7, 'preprocess_resize':7, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 0, 'preprocess_threshold_value': 135})	128	E_ID = str(E_ID).replace(' ', '')129	U_ID = str(U_ID).replace(' ', '')130	'''131	132	'''133	E_ID = args['E_ID']134	U_ID = args['U_ID']135	'''136	137	ID_string = TOOL.OCR(0, 'ID_test', 127,0,1393,22, img, {'segmentation':7, 'preprocess_resize':5, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 1, 'preprocess_threshold_value': 135})138	ID_string = ID_string.replace(' ', '')139	char_list = split(ID_string)140    141	for i in range(len(char_list)):142		if char_list[i] == ':':143			colon_list.append(i)144		if char_list[i] == '/':145			slash_list.append(i)146	147	for i in range(colon_list[0] + 1, slash_list[0]):148		E_ID += char_list[i]149	150	for i in range(colon_list[3] + 1, len(char_list)):151		U_ID += char_list[i]152	153	ID = E_ID + U_ID154	155	for key in Gas_param.keys():156		ID_temp = ''157		ID_temp = json.loads(Gas_param[key])['E_ID'] + json.loads(Gas_param[key])['U_ID']158		if ID_temp == ID:159			find = 1160			F_ID = key161	162	if find == 1:163		164		result = PAGE_PROCESS('01', img)165		166		Gas1_name=''167		Gas1_set=[]168		Gas1_error=[]169		Gas1_dict={}170		Gas1_name=result['OCR01']171		Gas1_set=[result['OCR02'],172				 result['OCR03'],173				 result['OCR04'],174				 result['OCR05'],175				 result['OCR06'],176				 result['OCR07'],177				 result['OCR08'],178				 result['OCR09'],179				 result['OCR10'],180				 result['OCR11']181				 ]182		Gas1_error=[result['OCR12'],183				 result['OCR13'],184				 result['OCR14'],185				 result['OCR15'],186				 result['OCR16'],187				 result['OCR17'],188				 result['OCR18'],189				 result['OCR19'],190				 result['OCR20'],191				 result['OCR21']192				 ]193194		Gas2_name=''195		Gas2_set=[]196		Gas2_error=[]197		Gas2_dict={}198		Gas2_name=result['OCR22']199		Gas2_set=[result['OCR23'],200				 result['OCR24'],201				 result['OCR25'],202				 result['OCR26'],203				 result['OCR27'],204				 result['OCR28'],205				 result['OCR29'],206				 result['OCR30'],207				 result['OCR31'],208				 result['OCR32']209				 ]210		Gas2_error=[result['OCR33'],211				 result['OCR34'],212				 result['OCR35'],213				 result['OCR36'],214				 result['OCR37'],215				 result['OCR38'],216				 result['OCR39'],217				 result['OCR40'],218				 result['OCR41'],219				 result['OCR42']220				 ]221222		Gas3_name=''223		Gas3_set=[]224		Gas3_error=[]225		Gas3_dict={}226		Gas3_name=result['OCR43']227		Gas3_set=[result['OCR44'],228				 result['OCR45'],229				 result['OCR46'],230				 result['OCR47'],231				 result['OCR48'],232				 result['OCR49'],233				 result['OCR50'],234				 result['OCR51'],235				 result['OCR52'],236				 result['OCR53']237				 ]238		Gas3_error=[result['OCR54'],239				 result['OCR55'],240				 result['OCR56'],241				 result['OCR57'],242				 result['OCR58'],243				 result['OCR59'],244				 result['OCR60'],245				 result['OCR61'],246				 result['OCR62'],247				 result['OCR63']248				 ]249250		Gas4_name=''251		Gas4_set=[]252		Gas4_error=[]253		Gas4_dict={}254		Gas4_name=result['OCR64']255		Gas4_set=[result['OCR65'],256				 result['OCR66'],257				 result['OCR67'],258				 result['OCR68'],259				 result['OCR69'],260				 result['OCR70'],261				 result['OCR71'],262				 result['OCR72'],263				 result['OCR73'],264				 result['OCR74']265				 ]266		Gas4_error=[result['OCR75'],267				 result['OCR76'],268				 result['OCR77'],269				 result['OCR78'],270				 result['OCR79'],271				 result['OCR80'],272				 result['OCR81'],273				 result['OCR82'],274				 result['OCR83'],275				 result['OCR84']276				 ]277278		Gas5_name=''279		Gas5_set=[]280		Gas5_error=[]281		Gas5_dict={}282		Gas5_name=result['OCR85']283		Gas5_set=[result['OCR86'],284				 result['OCR87'],285				 result['OCR88'],286				 result['OCR89'],287				 result['OCR90'],288				 result['OCR91'],289				 result['OCR92'],290				 result['OCR93'],291				 result['OCR94'],292				 result['OCR95']293				 ]294		Gas5_error=[result['OCR96'],295				 result['OCR97'],296				 result['OCR98'],297				 result['OCR99'],298				 result['OCR100'],299				 result['OCR101'],300				 result['OCR102'],301				 result['OCR103'],302				 result['OCR104'],303				 result['OCR105']304				 ]305306		Gas6_name=''307		Gas6_set=[]308		Gas6_error=[]309		Gas6_dict={}310		Gas6_name=result['OCR106']311		Gas6_set=[result['OCR107'],312				 result['OCR108'],313				 result['OCR109'],314				 result['OCR110'],315				 result['OCR111'],316				 result['OCR112'],317				 result['OCR113'],318				 result['OCR114'],319				 result['OCR115'],320				 result['OCR116']321				 ]322		Gas6_error=[result['OCR117'],323				 result['OCR118'],324				 result['OCR119'],325				 result['OCR120'],326				 result['OCR121'],327				 result['OCR122'],328				 result['OCR123'],329				 result['OCR124'],330				 result['OCR125'],331				 result['OCR126']332				 ]333334		Gas7_name=''335		Gas7_set=[]336		Gas7_error=[]337		Gas7_dict={}338		Gas7_name=result['OCR127']339		Gas7_set=[result['OCR128'],340				 result['OCR129'],341				 result['OCR130'],342				 result['OCR131'],343				 result['OCR132'],344				 result['OCR133'],345				 result['OCR134'],346				 result['OCR135'],347				 result['OCR136'],348				 result['OCR137']349				 ]350		Gas7_error=[result['OCR138'],351				 result['OCR139'],352				 result['OCR140'],353				 result['OCR141'],354				 result['OCR142'],355				 result['OCR143'],356				 result['OCR144'],357				 result['OCR145'],358				 result['OCR146'],359				 result['OCR147']360				 ]361362		Gas8_name=''363		Gas8_set=[]364		Gas8_error=[]365		Gas8_dict={}366		Gas8_name=result['OCR148']367		Gas8_set=[result['OCR149'],368				 result['OCR150'],369				 result['OCR151'],370				 result['OCR152'],371				 result['OCR153'],372				 result['OCR154'],373				 result['OCR155'],374				 result['OCR156'],375				 result['OCR157'],376				 result['OCR158']377				 ]378		Gas8_error=[result['OCR159'],379				 result['OCR160'],380				 result['OCR161'],381				 result['OCR162'],382				 result['OCR163'],383				 result['OCR164'],384				 result['OCR165'],385				 result['OCR166'],386				 result['OCR167'],387				 result['OCR168']388				 ]389390		Gas9_name=''391		Gas9_set=[]392		Gas9_error=[]393		Gas9_dict={}394		Gas9_name=result['OCR169']395		Gas9_set=[result['OCR170'],396				 result['OCR171'],397				 result['OCR172'],398				 result['OCR173'],399				 result['OCR174'],400				 result['OCR175'],401				 result['OCR176'],402				 result['OCR177'],403				 result['OCR178'],404				 result['OCR179']405				 ]406		Gas9_error=[result['OCR180'],407				 result['OCR181'],408				 result['OCR182'],409				 result['OCR183'],410				 result['OCR184'],411				 result['OCR185'],412				 result['OCR186'],413				 result['OCR187'],414				 result['OCR188'],415				 result['OCR189']416				 ]417418		Gas10_name=''419		Gas10_set=[]420		Gas10_error=[]421		Gas10_dict={}422		Gas10_name=result['OCR190']423		Gas10_set=[result['OCR191'],424				 result['OCR192'],425				 result['OCR193'],426				 result['OCR194'],427				 result['OCR195'],428				 result['OCR196'],429				 result['OCR197'],430				 result['OCR198'],431				 result['OCR199'],432				 result['OCR200']433				 ]434		Gas10_error=[result['OCR201'],435				 result['OCR202'],436				 result['OCR203'],437				 result['OCR204'],438				 result['OCR205'],439				 result['OCR206'],440				 result['OCR207'],441				 result['OCR208'],442				 result['OCR209'],443				 result['OCR210']444				 ]445446		Gas11_name=''447		Gas11_set=[]448		Gas11_error=[]449		Gas11_dict={}450		Gas11_name=result['OCR211']451		Gas11_set=[result['OCR212'],452				 result['OCR213'],453				 result['OCR214'],454				 result['OCR215'],455				 result['OCR216'],456				 result['OCR217'],457				 result['OCR218'],458				 result['OCR219'],459				 result['OCR220'],460				 result['OCR221']461				 ]462		Gas11_error=[result['OCR222'],463				 result['OCR223'],464				 result['OCR224'],465				 result['OCR225'],466				 result['OCR226'],467				 result['OCR227'],468				 result['OCR228'],469				 result['OCR229'],470				 result['OCR230'],471				 result['OCR231']472				 ]473474		Gas12_name=''475		Gas12_set=[]476		Gas12_error=[]477		Gas12_dict={}478		Gas12_name=result['OCR232']479		Gas12_set=[result['OCR233'],480				 result['OCR234'],481				 result['OCR235'],482				 result['OCR236'],483				 result['OCR237'],484				 result['OCR238'],485				 result['OCR239'],486				 result['OCR240'],487				 result['OCR241'],488				 result['OCR242']489				 ]490		Gas12_error=[result['OCR243'],491				 result['OCR244'],492				 result['OCR245'],493				 result['OCR246'],494				 result['OCR247'],495				 result['OCR248'],496				 result['OCR249'],497				 result['OCR250'],498				 result['OCR251'],499				 result['OCR252']500				 ]501502		Gas13_name=''503		Gas13_set=[]504		Gas13_error=[]505		Gas13_dict={}506		Gas13_name=result['OCR253']507		Gas13_set=[result['OCR254'],508				 result['OCR255'],509				 result['OCR256'],510				 result['OCR257'],511				 result['OCR258'],512				 result['OCR259'],513				 result['OCR260'],514				 result['OCR261'],515				 result['OCR262'],516				 result['OCR263']517				 ]518		Gas13_error=[result['OCR264'],519				 result['OCR265'],520				 result['OCR266'],521				 result['OCR267'],522				 result['OCR268'],523				 result['OCR269'],524				 result['OCR270'],525				 result['OCR271'],526				 result['OCR272'],527				 result['OCR273']528				 ]529530		Gas14_name=''531		Gas14_set=[]532		Gas14_error=[]533		Gas14_dict={}534		Gas14_name=result['OCR274']535		Gas14_set=[result['OCR275'],536				 result['OCR276'],537				 result['OCR277'],538				 result['OCR278'],539				 result['OCR279'],540				 result['OCR280'],541				 result['OCR281'],542				 result['OCR282'],543				 result['OCR283'],544				 result['OCR284']545				 ]546		Gas14_error=[result['OCR285'],547				 result['OCR286'],548				 result['OCR287'],549				 result['OCR288'],550				 result['OCR289'],551				 result['OCR290'],552				 result['OCR291'],553				 result['OCR292'],554				 result['OCR293'],555				 result['OCR294']556				 ]557558		Gas15_name=''559		Gas15_set=[]560		Gas15_error=[]561		Gas15_dict={}562		Gas15_name=result['OCR295']563		Gas15_set=[result['OCR296'],564				 result['OCR297'],565				 result['OCR298'],566				 result['OCR299'],567				 result['OCR300'],568				 result['OCR301'],569				 result['OCR302'],570				 result['OCR303'],571				 result['OCR304'],572				 result['OCR305']573				 ]574		Gas15_error=[result['OCR306'],575				 result['OCR307'],576				 result['OCR308'],577				 result['OCR309'],578				 result['OCR310'],579				 result['OCR311'],580				 result['OCR312'],581				 result['OCR313'],582				 result['OCR314'],583				 result['OCR315']584				 ]585586		Gas16_name=''587		Gas16_set=[]588		Gas16_error=[]589		Gas16_dict={}590		Gas16_name=result['OCR316']591		Gas16_set=[result['OCR317'],592				 result['OCR318'],593				 result['OCR319'],594				 result['OCR320'],595				 result['OCR321'],596				 result['OCR322'],597				 result['OCR323'],598				 result['OCR324'],599				 result['OCR325'],600				 result['OCR326']601				 ]602		Gas16_error=[result['OCR327'],603				 result['OCR328'],604				 result['OCR329'],605				 result['OCR330'],606				 result['OCR331'],607				 result['OCR332'],608				 result['OCR333'],609				 result['OCR334'],610				 result['OCR335'],611				 result['OCR336']612				 ]613		'''614		Gas17_name=''615		Gas17_set=[]616		Gas17_error=[]617		Gas17_dict={}618		Gas17_name='Tuning' + result['OCR337']619		Gas17_set=[result['OCR338'],620				 result['OCR339'],621				 result['OCR340'],622				 result['OCR341'],623				 result['OCR342'],624				 result['OCR343'],625				 result['OCR344'],626				 result['OCR345'],627				 result['OCR346'],628				 result['OCR347']629				 ]630		Gas17_error=[result['OCR348'],631				 result['OCR349'],632				 result['OCR350'],633				 result['OCR351'],634				 result['OCR352'],635				 result['OCR353'],636				 result['OCR354'],637				 result['OCR355'],638				 result['OCR356'],639				 result['OCR357']640				 ]641642		Gas18_name=''643		Gas18_set=[]644		Gas18_error=[]645		Gas18_dict={}646		Gas18_name='Tuning' + result['OCR358']647		Gas18_set=[result['OCR359'],648				 result['OCR360'],649				 result['OCR361'],650				 result['OCR362'],651				 result['OCR363'],652				 result['OCR364'],653				 result['OCR365'],654				 result['OCR366'],655				 result['OCR367'],656				 result['OCR368']657				 ]658		Gas18_error=[result['OCR369'],659				 result['OCR370'],660				 result['OCR371'],661				 result['OCR372'],662				 result['OCR373'],663				 result['OCR374'],664				 result['OCR375'],665				 result['OCR376'],666				 result['OCR377'],667				 result['OCR378']668				 ]669		'''670		for i in range(len(Gas1_set)):671			GasError(Gas1_dict, Gas1_set[i], Gas1_error[i])672			GasError(Gas2_dict, Gas2_set[i], Gas2_error[i])673			GasError(Gas3_dict, Gas3_set[i], Gas3_error[i])674			GasError(Gas4_dict, Gas4_set[i], Gas4_error[i])675			GasError(Gas5_dict, Gas5_set[i], Gas5_error[i])676			GasError(Gas6_dict, Gas6_set[i], Gas6_error[i])677			GasError(Gas7_dict, Gas7_set[i], Gas7_error[i])678			GasError(Gas8_dict, Gas8_set[i], Gas8_error[i])679			GasError(Gas9_dict, Gas9_set[i], Gas9_error[i])680			GasError(Gas10_dict, Gas10_set[i], Gas10_error[i])681			GasError(Gas11_dict, Gas11_set[i], Gas11_error[i])682			GasError(Gas12_dict, Gas12_set[i], Gas12_error[i])683			GasError(Gas13_dict, Gas13_set[i], Gas13_error[i])684			GasError(Gas14_dict, Gas14_set[i], Gas14_error[i])685			GasError(Gas15_dict, Gas15_set[i], Gas15_error[i])686			GasError(Gas16_dict, Gas16_set[i], Gas16_error[i])687			#GasError(Gas17_dict, Gas17_set[i], Gas17_error[i])688			#GasError(Gas18_dict, Gas18_set[i], Gas18_error[i])689690		Gas_dict_temp[Gas1_name]=json.dumps(Gas1_dict)691		Gas_dict_temp[Gas2_name]=json.dumps(Gas2_dict)692		Gas_dict_temp[Gas3_name]=json.dumps(Gas3_dict)693		Gas_dict_temp[Gas4_name]=json.dumps(Gas4_dict)694		Gas_dict_temp[Gas5_name]=json.dumps(Gas5_dict)695		Gas_dict_temp[Gas6_name]=json.dumps(Gas6_dict)696		Gas_dict_temp[Gas7_name]=json.dumps(Gas7_dict)697		Gas_dict_temp[Gas8_name]=json.dumps(Gas8_dict)698		Gas_dict_temp[Gas9_name]=json.dumps(Gas9_dict)699		Gas_dict_temp[Gas10_name]=json.dumps(Gas10_dict)700		Gas_dict_temp[Gas11_name]=json.dumps(Gas11_dict)701		Gas_dict_temp[Gas12_name]=json.dumps(Gas12_dict)702		Gas_dict_temp[Gas13_name]=json.dumps(Gas13_dict)703		Gas_dict_temp[Gas14_name]=json.dumps(Gas14_dict)704		Gas_dict_temp[Gas15_name]=json.dumps(Gas15_dict)705		Gas_dict_temp[Gas16_name]=json.dumps(Gas16_dict)706		#Gas_dict_temp[Gas17_name]=json.dumps(Gas17_dict)707		#Gas_dict_temp[Gas18_name]=json.dumps(Gas18_dict)708		#SHOW_IMAGE(img)709710		E_ID=json.loads(Gas_param[F_ID])['E_ID']711		Gas_dict_temp['E_ID'] = E_ID712		Gas_return_temp['E_ID'] = E_ID713		U_ID=json.loads(Gas_param[F_ID])['U_ID']714		Gas_dict_temp['U_ID'] = U_ID715		Gas_return_temp['U_ID'] = U_ID716		O_ID=json.loads(Gas_param[F_ID])['O_ID']717		Gas_dict_temp['O_ID'] = O_ID718		Gas_return_temp['O_ID'] = O_ID719		F_ID=json.loads(Gas_param[F_ID])['F_ID']720		Gas_dict_temp['F_ID'] = F_ID721		Gas_return_temp['F_ID'] = F_ID722		723		Gas_set = json.loads(Gas_param[F_ID])['Gas_set']724		Gas_set = Gas_set.split(',')725		726		for i in range(len(Gas_set)):727			temp=Gas_set[i].split('_')728			try:729				Gas_return_temp[temp[0]+'_'+temp[1]]=json.loads(Gas_dict_temp[temp[0]])[temp[1]]730			except:731				if not temp[0] in Gas_dict_temp.keys():732					Gas_return_temp[temp[0]]='Unknown'733				elif not temp[1] in json.loads(Gas_dict_temp[temp[0]]).keys():734					Gas_return_temp[temp[0]+'_'+temp[1]]='Unknown'735736		lock.acquire()737738		Gas_dict[F_ID] = json.dumps(Gas_dict_temp)739		Gas_return[F_ID] = json.dumps(Gas_return_temp)740741		lock.release()742743		#print(Gas_dict)744		#print(Gas_return)745746		return {'result':'Data for F_ID = ' + F_ID +', E_ID = ' + E_ID + ', and U_ID = ' + U_ID + ' is ready.', 'is_success':'Y'}747	748	else:749		return {'error_message':'Cannot find data matching F_ID = ' + F_ID +', E_ID = ' + E_ID + ', and U_ID = ' + U_ID + '.', 'is_success':'N'}750751752	753	754	755756def GasError(Gas_Temp_Dict, Set, Error):757	758	if Set=='' or Error=='':759		pass760	elif not Set in Gas_Temp_Dict.keys():761		Gas_Temp_Dict[Set]=Error762763'''764def GetGasSet(**kwargs):765	766	global Gas_dict, Gas_return, Gas_param767	768	return_dict = {}769	770	try:771		E_ID=kwargs['E_ID']772	except:773		return {'Error':'Cannot find E_ID.'}774	775	try:776		U_ID=kwargs['U_ID']777	except:778		return {'Error':'Cannot find U_ID.'}779	780	ID = E_ID + U_ID781	782	if (ID in Gas_return.keys() and ID in Gas_dict.keys()) and ID in Gas_param.keys():783		784		try:785			Gas_name = kwargs['Gas_name']786			Gas_name = Gas_name.split(',')787			Gas_dict_temp = json.loads(Gas_dict[ID])788	789			if 	Gas_name[0] == '':790				return {'Error':'There is no input parameter.'}791	792			for i in range(len(Gas_name)):793		794				try:795					return_dict[Gas_name[i]] = Gas_dict_temp[Gas_name[i]]796				except:797					if Gas_name[i] == '':798						return {'Error':'There is error in input parameters. Please check them.'}799					else:800						return_dict[Gas_name[i]] = 'Unknown Gas_name'801						802			return return_dict		803		except:804			return {'Error':'Cannot find Gas_name'}805	else:806		return {'Error':'Cannot find data matching E_ID = ' + E_ID +' and U_ID = ' + U_ID + '.'}807	808	809	return return_dict810'''	811	812	813def GetGasData(**kwargs):814	815	global Gas_dict, Gas_return, Gas_param, lock816	817	return_dict = {}818	819	try:820		F_ID=kwargs['F_ID']821	except:822		return {'error_message':'Cannot find F_ID.', 'is_success':'N'}823	824	825	if (F_ID in Gas_return.keys() and F_ID in Gas_dict.keys()) and F_ID in Gas_param.keys():826827		return_dict['data'] = Gas_return[F_ID]828		return_dict['is_success'] = 'Y'829		830		lock.acquire()831		832		Gas_dict.pop(F_ID, None)833		Gas_return.pop(F_ID, None)834		Gas_param.pop(F_ID, None)835		836		lock.release()837		838		return return_dict 839840	else:841		return {'error_message':'Cannot find data matching F_ID = ' + F_ID + '.', 'is_success':'N'}842		843	
...test_result.py
Source:test_result.py  
1import sys2import textwrap3from StringIO import StringIO4from test import test_support5import traceback6import unittest7class Test_TestResult(unittest.TestCase):8    # Note: there are not separate tests for TestResult.wasSuccessful(),9    # TestResult.errors, TestResult.failures, TestResult.testsRun or10    # TestResult.shouldStop because these only have meaning in terms of11    # other TestResult methods.12    #13    # Accordingly, tests for the aforenamed attributes are incorporated14    # in with the tests for the defining methods.15    ################################################################16    def test_init(self):17        result = unittest.TestResult()18        self.assertTrue(result.wasSuccessful())19        self.assertEqual(len(result.errors), 0)20        self.assertEqual(len(result.failures), 0)21        self.assertEqual(result.testsRun, 0)22        self.assertEqual(result.shouldStop, False)23        self.assertIsNone(result._stdout_buffer)24        self.assertIsNone(result._stderr_buffer)25    # "This method can be called to signal that the set of tests being26    # run should be aborted by setting the TestResult's shouldStop27    # attribute to True."28    def test_stop(self):29        result = unittest.TestResult()30        result.stop()31        self.assertEqual(result.shouldStop, True)32    # "Called when the test case test is about to be run. The default33    # implementation simply increments the instance's testsRun counter."34    def test_startTest(self):35        class Foo(unittest.TestCase):36            def test_1(self):37                pass38        test = Foo('test_1')39        result = unittest.TestResult()40        result.startTest(test)41        self.assertTrue(result.wasSuccessful())42        self.assertEqual(len(result.errors), 0)43        self.assertEqual(len(result.failures), 0)44        self.assertEqual(result.testsRun, 1)45        self.assertEqual(result.shouldStop, False)46        result.stopTest(test)47    # "Called after the test case test has been executed, regardless of48    # the outcome. The default implementation does nothing."49    def test_stopTest(self):50        class Foo(unittest.TestCase):51            def test_1(self):52                pass53        test = Foo('test_1')54        result = unittest.TestResult()55        result.startTest(test)56        self.assertTrue(result.wasSuccessful())57        self.assertEqual(len(result.errors), 0)58        self.assertEqual(len(result.failures), 0)59        self.assertEqual(result.testsRun, 1)60        self.assertEqual(result.shouldStop, False)61        result.stopTest(test)62        # Same tests as above; make sure nothing has changed63        self.assertTrue(result.wasSuccessful())64        self.assertEqual(len(result.errors), 0)65        self.assertEqual(len(result.failures), 0)66        self.assertEqual(result.testsRun, 1)67        self.assertEqual(result.shouldStop, False)68    # "Called before and after tests are run. The default implementation does nothing."69    def test_startTestRun_stopTestRun(self):70        result = unittest.TestResult()71        result.startTestRun()72        result.stopTestRun()73    # "addSuccess(test)"74    # ...75    # "Called when the test case test succeeds"76    # ...77    # "wasSuccessful() - Returns True if all tests run so far have passed,78    # otherwise returns False"79    # ...80    # "testsRun - The total number of tests run so far."81    # ...82    # "errors - A list containing 2-tuples of TestCase instances and83    # formatted tracebacks. Each tuple represents a test which raised an84    # unexpected exception. Contains formatted85    # tracebacks instead of sys.exc_info() results."86    # ...87    # "failures - A list containing 2-tuples of TestCase instances and88    # formatted tracebacks. Each tuple represents a test where a failure was89    # explicitly signalled using the TestCase.fail*() or TestCase.assert*()90    # methods. Contains formatted tracebacks instead91    # of sys.exc_info() results."92    def test_addSuccess(self):93        class Foo(unittest.TestCase):94            def test_1(self):95                pass96        test = Foo('test_1')97        result = unittest.TestResult()98        result.startTest(test)99        result.addSuccess(test)100        result.stopTest(test)101        self.assertTrue(result.wasSuccessful())102        self.assertEqual(len(result.errors), 0)103        self.assertEqual(len(result.failures), 0)104        self.assertEqual(result.testsRun, 1)105        self.assertEqual(result.shouldStop, False)106    # "addFailure(test, err)"107    # ...108    # "Called when the test case test signals a failure. err is a tuple of109    # the form returned by sys.exc_info(): (type, value, traceback)"110    # ...111    # "wasSuccessful() - Returns True if all tests run so far have passed,112    # otherwise returns False"113    # ...114    # "testsRun - The total number of tests run so far."115    # ...116    # "errors - A list containing 2-tuples of TestCase instances and117    # formatted tracebacks. Each tuple represents a test which raised an118    # unexpected exception. Contains formatted119    # tracebacks instead of sys.exc_info() results."120    # ...121    # "failures - A list containing 2-tuples of TestCase instances and122    # formatted tracebacks. Each tuple represents a test where a failure was123    # explicitly signalled using the TestCase.fail*() or TestCase.assert*()124    # methods. Contains formatted tracebacks instead125    # of sys.exc_info() results."126    def test_addFailure(self):127        class Foo(unittest.TestCase):128            def test_1(self):129                pass130        test = Foo('test_1')131        try:132            test.fail("foo")133        except:134            exc_info_tuple = sys.exc_info()135        result = unittest.TestResult()136        result.startTest(test)137        result.addFailure(test, exc_info_tuple)138        result.stopTest(test)139        self.assertFalse(result.wasSuccessful())140        self.assertEqual(len(result.errors), 0)141        self.assertEqual(len(result.failures), 1)142        self.assertEqual(result.testsRun, 1)143        self.assertEqual(result.shouldStop, False)144        test_case, formatted_exc = result.failures[0]145        self.assertIs(test_case, test)146        self.assertIsInstance(formatted_exc, str)147    # "addError(test, err)"148    # ...149    # "Called when the test case test raises an unexpected exception err150    # is a tuple of the form returned by sys.exc_info():151    # (type, value, traceback)"152    # ...153    # "wasSuccessful() - Returns True if all tests run so far have passed,154    # otherwise returns False"155    # ...156    # "testsRun - The total number of tests run so far."157    # ...158    # "errors - A list containing 2-tuples of TestCase instances and159    # formatted tracebacks. Each tuple represents a test which raised an160    # unexpected exception. Contains formatted161    # tracebacks instead of sys.exc_info() results."162    # ...163    # "failures - A list containing 2-tuples of TestCase instances and164    # formatted tracebacks. Each tuple represents a test where a failure was165    # explicitly signalled using the TestCase.fail*() or TestCase.assert*()166    # methods. Contains formatted tracebacks instead167    # of sys.exc_info() results."168    def test_addError(self):169        class Foo(unittest.TestCase):170            def test_1(self):171                pass172        test = Foo('test_1')173        try:174            raise TypeError()175        except:176            exc_info_tuple = sys.exc_info()177        result = unittest.TestResult()178        result.startTest(test)179        result.addError(test, exc_info_tuple)180        result.stopTest(test)181        self.assertFalse(result.wasSuccessful())182        self.assertEqual(len(result.errors), 1)183        self.assertEqual(len(result.failures), 0)184        self.assertEqual(result.testsRun, 1)185        self.assertEqual(result.shouldStop, False)186        test_case, formatted_exc = result.errors[0]187        self.assertIs(test_case, test)188        self.assertIsInstance(formatted_exc, str)189    def testGetDescriptionWithoutDocstring(self):190        result = unittest.TextTestResult(None, True, 1)191        self.assertEqual(192                result.getDescription(self),193                'testGetDescriptionWithoutDocstring (' + __name__ +194                '.Test_TestResult)')195    @unittest.skipIf(sys.flags.optimize >= 2,196                     "Docstrings are omitted with -O2 and above")197    def testGetDescriptionWithOneLineDocstring(self):198        """Tests getDescription() for a method with a docstring."""199        result = unittest.TextTestResult(None, True, 1)200        self.assertEqual(201                result.getDescription(self),202               ('testGetDescriptionWithOneLineDocstring '203                '(' + __name__ + '.Test_TestResult)\n'204                'Tests getDescription() for a method with a docstring.'))205    @unittest.skipIf(sys.flags.optimize >= 2,206                     "Docstrings are omitted with -O2 and above")207    def testGetDescriptionWithMultiLineDocstring(self):208        """Tests getDescription() for a method with a longer docstring.209        The second line of the docstring.210        """211        result = unittest.TextTestResult(None, True, 1)212        self.assertEqual(213                result.getDescription(self),214               ('testGetDescriptionWithMultiLineDocstring '215                '(' + __name__ + '.Test_TestResult)\n'216                'Tests getDescription() for a method with a longer '217                'docstring.'))218    def testStackFrameTrimming(self):219        class Frame(object):220            class tb_frame(object):221                f_globals = {}222        result = unittest.TestResult()223        self.assertFalse(result._is_relevant_tb_level(Frame))224        Frame.tb_frame.f_globals['__unittest'] = True225        self.assertTrue(result._is_relevant_tb_level(Frame))226    def testFailFast(self):227        result = unittest.TestResult()228        result._exc_info_to_string = lambda *_: ''229        result.failfast = True230        result.addError(None, None)231        self.assertTrue(result.shouldStop)232        result = unittest.TestResult()233        result._exc_info_to_string = lambda *_: ''234        result.failfast = True235        result.addFailure(None, None)236        self.assertTrue(result.shouldStop)237        result = unittest.TestResult()238        result._exc_info_to_string = lambda *_: ''239        result.failfast = True240        result.addUnexpectedSuccess(None)241        self.assertTrue(result.shouldStop)242    def testFailFastSetByRunner(self):243        runner = unittest.TextTestRunner(stream=StringIO(), failfast=True)244        def test(result):245            self.assertTrue(result.failfast)246        runner.run(test)247classDict = dict(unittest.TestResult.__dict__)248for m in ('addSkip', 'addExpectedFailure', 'addUnexpectedSuccess',249           '__init__'):250    del classDict[m]251def __init__(self, stream=None, descriptions=None, verbosity=None):252    self.failures = []253    self.errors = []254    self.testsRun = 0255    self.shouldStop = False256    self.buffer = False257classDict['__init__'] = __init__258OldResult = type('OldResult', (object,), classDict)259class Test_OldTestResult(unittest.TestCase):260    def assertOldResultWarning(self, test, failures):261        with test_support.check_warnings(("TestResult has no add.+ method,",262                                          RuntimeWarning)):263            result = OldResult()264            test.run(result)265            self.assertEqual(len(result.failures), failures)266    def testOldTestResult(self):267        class Test(unittest.TestCase):268            def testSkip(self):269                self.skipTest('foobar')270            @unittest.expectedFailure271            def testExpectedFail(self):272                raise TypeError273            @unittest.expectedFailure274            def testUnexpectedSuccess(self):275                pass276        for test_name, should_pass in (('testSkip', True),277                                       ('testExpectedFail', True),278                                       ('testUnexpectedSuccess', False)):279            test = Test(test_name)280            self.assertOldResultWarning(test, int(not should_pass))281    def testOldTestTesultSetup(self):282        class Test(unittest.TestCase):283            def setUp(self):284                self.skipTest('no reason')285            def testFoo(self):286                pass287        self.assertOldResultWarning(Test('testFoo'), 0)288    def testOldTestResultClass(self):289        @unittest.skip('no reason')290        class Test(unittest.TestCase):291            def testFoo(self):292                pass293        self.assertOldResultWarning(Test('testFoo'), 0)294    def testOldResultWithRunner(self):295        class Test(unittest.TestCase):296            def testFoo(self):297                pass298        runner = unittest.TextTestRunner(resultclass=OldResult,299                                          stream=StringIO())300        # This will raise an exception if TextTestRunner can't handle old301        # test result objects302        runner.run(Test('testFoo'))303class MockTraceback(object):304    @staticmethod305    def format_exception(*_):306        return ['A traceback']307def restore_traceback():308    unittest.result.traceback = traceback309class TestOutputBuffering(unittest.TestCase):310    def setUp(self):311        self._real_out = sys.stdout312        self._real_err = sys.stderr313    def tearDown(self):314        sys.stdout = self._real_out315        sys.stderr = self._real_err316    def testBufferOutputOff(self):317        real_out = self._real_out318        real_err = self._real_err319        result = unittest.TestResult()320        self.assertFalse(result.buffer)321        self.assertIs(real_out, sys.stdout)322        self.assertIs(real_err, sys.stderr)323        result.startTest(self)324        self.assertIs(real_out, sys.stdout)325        self.assertIs(real_err, sys.stderr)326    def testBufferOutputStartTestAddSuccess(self):327        real_out = self._real_out328        real_err = self._real_err329        result = unittest.TestResult()330        self.assertFalse(result.buffer)331        result.buffer = True332        self.assertIs(real_out, sys.stdout)333        self.assertIs(real_err, sys.stderr)334        result.startTest(self)335        self.assertIsNot(real_out, sys.stdout)336        self.assertIsNot(real_err, sys.stderr)337        self.assertIsInstance(sys.stdout, StringIO)338        self.assertIsInstance(sys.stderr, StringIO)339        self.assertIsNot(sys.stdout, sys.stderr)340        out_stream = sys.stdout341        err_stream = sys.stderr342        result._original_stdout = StringIO()343        result._original_stderr = StringIO()344        print 'foo'345        print >> sys.stderr, 'bar'346        self.assertEqual(out_stream.getvalue(), 'foo\n')347        self.assertEqual(err_stream.getvalue(), 'bar\n')348        self.assertEqual(result._original_stdout.getvalue(), '')349        self.assertEqual(result._original_stderr.getvalue(), '')350        result.addSuccess(self)351        result.stopTest(self)352        self.assertIs(sys.stdout, result._original_stdout)353        self.assertIs(sys.stderr, result._original_stderr)354        self.assertEqual(result._original_stdout.getvalue(), '')355        self.assertEqual(result._original_stderr.getvalue(), '')356        self.assertEqual(out_stream.getvalue(), '')357        self.assertEqual(err_stream.getvalue(), '')358    def getStartedResult(self):359        result = unittest.TestResult()360        result.buffer = True361        result.startTest(self)362        return result363    def testBufferOutputAddErrorOrFailure(self):364        unittest.result.traceback = MockTraceback365        self.addCleanup(restore_traceback)366        for message_attr, add_attr, include_error in [367            ('errors', 'addError', True),368            ('failures', 'addFailure', False),369            ('errors', 'addError', True),370            ('failures', 'addFailure', False)371        ]:372            result = self.getStartedResult()373            buffered_out = sys.stdout374            buffered_err = sys.stderr375            result._original_stdout = StringIO()376            result._original_stderr = StringIO()377            print >> sys.stdout, 'foo'378            if include_error:379                print >> sys.stderr, 'bar'380            addFunction = getattr(result, add_attr)381            addFunction(self, (None, None, None))382            result.stopTest(self)383            result_list = getattr(result, message_attr)384            self.assertEqual(len(result_list), 1)385            test, message = result_list[0]386            expectedOutMessage = textwrap.dedent("""387                Stdout:388                foo389            """)390            expectedErrMessage = ''391            if include_error:392                expectedErrMessage = textwrap.dedent("""393                Stderr:394                bar395            """)396            expectedFullMessage = 'A traceback%s%s' % (expectedOutMessage, expectedErrMessage)397            self.assertIs(test, self)398            self.assertEqual(result._original_stdout.getvalue(), expectedOutMessage)399            self.assertEqual(result._original_stderr.getvalue(), expectedErrMessage)400            self.assertMultiLineEqual(message, expectedFullMessage)401    def testBufferSetupClass(self):402        result = unittest.TestResult()403        result.buffer = True404        class Foo(unittest.TestCase):405            @classmethod406            def setUpClass(cls):407                1//0408            def test_foo(self):409                pass410        suite = unittest.TestSuite([Foo('test_foo')])411        suite(result)412        self.assertEqual(len(result.errors), 1)413    def testBufferTearDownClass(self):414        result = unittest.TestResult()415        result.buffer = True416        class Foo(unittest.TestCase):417            @classmethod418            def tearDownClass(cls):419                1//0420            def test_foo(self):421                pass422        suite = unittest.TestSuite([Foo('test_foo')])423        suite(result)424        self.assertEqual(len(result.errors), 1)425    def testBufferSetUpModule(self):426        result = unittest.TestResult()427        result.buffer = True428        class Foo(unittest.TestCase):429            def test_foo(self):430                pass431        class Module(object):432            @staticmethod433            def setUpModule():434                1//0435        Foo.__module__ = 'Module'436        sys.modules['Module'] = Module437        self.addCleanup(sys.modules.pop, 'Module')438        suite = unittest.TestSuite([Foo('test_foo')])439        suite(result)440        self.assertEqual(len(result.errors), 1)441    def testBufferTearDownModule(self):442        result = unittest.TestResult()443        result.buffer = True444        class Foo(unittest.TestCase):445            def test_foo(self):446                pass447        class Module(object):448            @staticmethod449            def tearDownModule():450                1//0451        Foo.__module__ = 'Module'452        sys.modules['Module'] = Module453        self.addCleanup(sys.modules.pop, 'Module')454        suite = unittest.TestSuite([Foo('test_foo')])455        suite(result)456        self.assertEqual(len(result.errors), 1)457if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
