Best Python code snippet using playwright-python
request_handler.py
Source:request_handler.py  
1import hashlib2from server.async_database import *3from asyncio import StreamReader, StreamWriter4from server.async_database import *5import aiomysql6from server.data_buffer import DataBuffer7import random8import datetime9class RequestHandler:10    def __init__(self, data_buffer: DataBuffer):11        self.request = []12        self.data_buffer = data_buffer13    async def Request_Binding(self, request_number):14        result = None15        request_number = int(request_number)16        # 0ë² ìì²ì ì¬ì©íì§ ìëë¤.17        # ìì² ëë²ë 2ì ë°°ìë¡ ì íë¤18        if request_number == 0:19            pass20        if request_number == 2:21            echo = EchoRequest(self.data_buffer)22            result = await echo.main()23        elif request_number == 4:24            select = LoginRequest(self.data_buffer)25            result = await select.main()26        elif request_number == 6:27            create = CheckRegisterIdRequest(self.data_buffer)28            result = await create.main()29        elif request_number == 8:30            insert = CreateNewAccount(self.data_buffer)31            result = await insert.main()32        elif request_number == 10:33            insert = GetCaptchaTestSet(self.data_buffer)34            result = await insert.main()35        elif request_number == 12:36            insert = GetLastDriveDate(self.data_buffer)37            result = await insert.main()38        elif request_number == 14:39            insert = GetCaptchaTestSet2(self.data_buffer)40            result = await insert.main()41        elif request_number == 16:42            insert = GetUserInfo(self.data_buffer)43            result = await insert.main()44        elif request_number == 18:45            insert = GetAllUserInfo(self.data_buffer)46            result = await insert.main()47        elif request_number == 20:48            insert = UpdateUserInfo(self.data_buffer)49            result = await insert.main()50        elif request_number == 22:51            insert = UpdateLastDriveDate(self.data_buffer)52            result = await insert.main()53        elif request_number == 24:54            insert = UpdateAlcoholCount(self.data_buffer)55            result = await insert.main()56            57        elif request_number == 26:58            insert = FindUserId(self.data_buffer)59            result = await insert.main()60        elif request_number == 28:61            insert = FindUserPw(self.data_buffer)62            result = await insert.main()63        return result64'''65íì¬ EchoRequest ë¡ì§ììë§ 66ë°ì´í° ë²í¼ë¥¼ ì´ì©í´ ë°ì´í°ë¥¼ ì²ë¦¬ íëë¡í¨.67ì°¨í ìì±íë ê°ì²´ 모ëì ë¤ìê³¼ ê°ì´ ì ì© ìí¬ê²68'''69# 0ë² ìì²ì¼ë¡ ì¬ì©íì§ ìëë¤70# í
ì´ë¸ ìì±ì ìí ìì²71class CreateTableRequest(RequestHandler):72    def __init__(self, data_buffer: DataBuffer):73        super().__init__(data_buffer)74    async def main(self):75        temp = self.data_buffer.get_data()76        table_create_query = "CREATE TABLE person " \77                             "( _id INT AUTO_INCREMENT, name VARCHAR(32) NOT NULL, " \78                             "belong VARCHAR(12) DEFAULT 'FOO', " \79                             "phone VARCHAR(12), PRIMARY KEY(_id) ) " \80                             "ENGINE=INNODB"81        result = await query_operator(table_create_query)82        return result83# 2ë² ìì²84# ìì½ ìì²ì¼ë¡, ë°ì ë°ì´í°ë¥¼ ê·¸ëë¡ ë³´ë´ì¤ë¤.85class EchoRequest(RequestHandler):86    def __init__(self, data_buffer: DataBuffer):87        super().__init__(data_buffer)88    async def main(self):89        try:90            result = self.data_buffer.get_data()91            if result:92                return result93            else:94                result = "false"95                return result96        except:97            result = "false"98            return result99# 4ë² ìì²100# ë¡ê·¸ì¸ ìì²101class LoginRequest(RequestHandler):102    def __init__(self, data_buffer: DataBuffer):103        super().__init__(data_buffer)104    async def main(self):105        temp = self.data_buffer.get_data()106        table_name = "member"107        login_query = "select * from" + " " + table_name108        try:109            result = await query_operator(login_query)110            id = None111            pw = None112            temp = temp.replace("[", "", 1)113            temp = temp.replace("]", "", 1)114            temp = temp.replace(" ", "")115            temp = temp.replace("'", "")116            temp = temp.split(',')117            for i in temp:118                i = i.split('=')119                if (i[0] == "user_id"):120                    id = i[1]121                if (i[0] == "user_pw"):122                    pw = i[1]123            db_id = None124            db_pw = None125            for i in result:126                db_id = i['id']127                db_pw = i['passwd']128                if (id == db_id):129                    if (pw == db_pw):130                        result = "true"131                        return result132                    else:133                        result = "false"134                else:135                    result = "false"136            return result137        except:138            result = "false"139            return result140# 6ë² ìì²141# íìê°ì
ìì ìì´ë ì¤ë³µ ê²ì¬ë¥¼ ìí í´ëì¤142class CheckRegisterIdRequest(RequestHandler):143    def __init__(self, data_buffer: DataBuffer):144        super().__init__(data_buffer)145    async def main(self):146        temp = self.data_buffer.get_data()147        table_name = "member"148        get_all_id_query = "select id from" + " " + table_name149        try:150            result = await query_operator(get_all_id_query)151            id = None152            temp = temp.replace("[", "", 1)153            temp = temp.replace("]", "", 1)154            temp = temp.replace(" ", "")155            temp = temp.replace("'", "")156            temp = temp.split(',')157            for i in temp:158                i = i.split('=')159                if (i[0] == "user_id"):160                    id = i[1]161            db_id = None162            for i in result:163                db_id = i['id']164                if (id == db_id):165                    result = "true"166                    return result167                else:168                    result = "false"169            return result170        except:171            result = "false"172            return result173# 8ë² ìì²174# ìë¡ì´ ê³ì  ìì± íë ê°ì²´175class CreateNewAccount(RequestHandler):176    def __init__(self, data_buffer: DataBuffer):177        super().__init__(data_buffer)178    async def main(self):179        temp = self.data_buffer.get_data()180        try:181            id = None182            passwd = None183            name = None184            tel = None185            temp = temp.replace("[", "", 1)186            temp = temp.replace("]", "", 1)187            temp = temp.replace(" ", "")188            temp = temp.replace("'", "")189            temp = temp.split(',')190            for i in temp:191                i = i.split('=')192                if (i[0] == "user_id"):193                    id = i[1]194                if (i[0] == "user_pw"):195                    passwd = i[1]196                if (i[0] == "user_name"):197                    name = i[1]198                if (i[0] == "user_tel"):199                    tel = i[1]200            current_time = (datetime.datetime.now()).strftime('%y-%m-%d')201            create_new_account_query = "INSERT INTO member (id, passwd, name, tel, last_drive_date) " \202                                       "VALUES('" + str(id) + "', '" + str(passwd) + "', '" + str(name) + "', '" + str(tel) + "', '" + current_time + "');"203            '''204            name = "asdfasdf"205            pw = "asdfadf"206            data_insert_query = "INSERT INTO person (userid, passwd) VALUES('" + str(name) + "', '" + str(pw) + "');"207            '''208            try:209                result = await test_example_execute(create_new_account_query)210                return result211            except:212                result = "false"213                return result214            result = "true"215            return result216        except:217            result = "false"218            return result219# 10ë² ìì²220# 캡차 문ì ì ì ëµì ê°ì ¸ì¤ë í´ëì¤221class GetCaptchaTestSet(RequestHandler):222    def __init__(self, data_buffer: DataBuffer):223        super().__init__(data_buffer)224    async def main(self):225        temp = self.data_buffer.get_data()226        try:227            # 캡챠 문ì ë ëë¦´ê² íì¬ë 3ë² ë¬¸ì ê¹ì§ ìì228            answer = ""229            test_set_num = random.randrange(1, 100)230            test_set_num %= 9231            # ëë¹ë 1ë² ë¶í° ììíë¯ë¡, +1ì í´ì¤ì¼í¨232            test_set_num += 1233            check_captcha_answer_query = "SELECT captcha_answer FROM captcha where captcha_num=" + "'" + str(test_set_num) + "';"234            try:235                result = await query_operator(check_captcha_answer_query)236                result = result[0]237                result = result.get('captcha_answer')238                return result239            except:240                result = "false"241                return result242            return result243        except:244            result = "false"245            return result246# 12ë² ìì²247# ë§ì§ë§ ì´ì  ë ì§ë¥¼ ê°ì ¸ì¤ë í´ëì¤248class GetLastDriveDate(RequestHandler):249    def __init__(self, data_buffer: DataBuffer):250        super().__init__(data_buffer)251    async def main(self):252        temp = self.data_buffer.get_data()253        user_id=None254        temp = temp.replace("[", "", 1)255        temp = temp.replace("]", "", 1)256        temp = temp.replace(" ", "")257        temp = temp.replace("'", "")258        temp = temp.split(',')259        for i in temp:260            i = i.split('=')261        if (i[0] == "user_id"):262            user_id = i[1]263        try:264            query = "SELECT last_drive_date FROM member where id="+"'"+user_id + "';"265            try:266                result = await query_operator(query)267                result = result[0]268                result = result.get('last_drive_date')269                # result = result.split(" ")270                # temp1 = result[0]271                temp1 = str(result)272                # temp2 = result[1]273                temp1 = temp1.split("-")274                # temp2 = temp2.split(":")275                ''' ë§ì§ë§ ì ì ë ì§ '''276                year = int(temp1[0])277                month = int(temp1[1])278                day = int(temp1[2])279                ''' íì¬ ë ì§ '''280                now = datetime.datetime.today()281                now = str(now).split()282                now = str(now[0]).split("-")283                now_year = int(now[0])284                now_month = int(now[1])285                now_day = int(now[2])286                ''' ë ì§ ì°ì° ìì  '''287                last_access_date = datetime.date(year, month, day)288                now_date = datetime.date(now_year, now_month, now_day)289                delta = now_date - last_access_date290                result = str(delta.days)291                return result292            except:293                result = "false"294                return result295        except:296            result = "false"297            return result298# 14ë² ìì²299# 캡차 문ì ì ì ëµì ê°ì ¸ì¤ë í´ëì¤300class GetCaptchaTestSet2(RequestHandler):301    def __init__(self, data_buffer: DataBuffer):302        super().__init__(data_buffer)303    async def main(self):304        temp = self.data_buffer.get_data()305        try:306            captcha2_number = None307            captcha2_answer = None308            temp = temp.replace("[", "", 1)309            temp = temp.replace("]", "", 1)310            temp = temp.replace(" ", "")311            temp = temp.replace("'", "")312            temp = temp.split(',')313            for i in temp:314                i = i.split('=')315                if (i[0] == "captcha2_number"):316                    captcha2_number = i[1]317                if (i[0] == "captcha2_answer"):318                    captcha2_answer = i[1]319            # check_captcha_answer_query = "SELECT captcha_answer FROM captcha2 where captcha_num=captcha2_number ;"320            check_captcha_answer_query = "SELECT captcha_answer FROM captcha2 where captcha_num="+"'"+captcha2_number +"';"321            try:322                result = await query_operator(check_captcha_answer_query)323                result = result[0]324                result = result.get('captcha_answer')325                # ë문ìë¡ ì
ë ¥íì ê²½ì° ì문ìë¡ ë°ê¿ì¤ë¤.326                captcha2_answer = captcha2_answer.lower()327                if captcha2_answer == result:328                    result = "true"329                    return result330                else:331                    result = "false"332                    return result333            except:334                result = "false"335                return result336        except:337            result = "false"338            return result339# 16ë² ìì²340# ì´ì  ì ë³´ë¥¼ ë겨주ë í´ëì¤341class GetUserInfo(RequestHandler):342    def __init__(self, data_buffer: DataBuffer):343        super().__init__(data_buffer)344    async def main(self):345        temp = self.data_buffer.get_data()346        try:347            userid = None348            temp = temp.replace("[", "", 1)349            temp = temp.replace("]", "", 1)350            temp = temp.replace(" ", "")351            temp = temp.replace("'", "")352            temp = temp.split(',')353            for i in temp:354                i = i.split('=')355                if (i[0] == "userid"):356                    userid = i[1]357            get_userinfo_query="SELECT * FROM MEMBER WHERE id="+"'"+ userid +"';"358            try:359                qeury_result = await query_operator(get_userinfo_query)360                qeury_result = qeury_result[0]361                result = ""362                # ëë¼ì´ë¸ íì, 졸ìì´ì  ê°ì§ íì, ì주ì´ì  ê°ì§ íì, ë§ì§ë§ ì´ì  ë ì§363                result += str(str(qeury_result.get('drive_cnt'))+"-")364                result += str(str(qeury_result.get('sleep_detect_cnt'))+"-")365                result += str(str(qeury_result.get('alcohol_detect_cnt'))+"-")366                last_drive_date = str(str(qeury_result.get('last_drive_date')))367                last_drive_date = last_drive_date.split(" ")368                last_drive_date = last_drive_date[0]369                result += last_drive_date.replace("-", "/")370                return result371            except:372                result = "false"373                return result374        except:375            result = "false"376            return result377# 18ë² ìì²378# ì¬ì©ìì ëª¨ë  ì ë³´ë¥¼ ë겨주ë í´ëì¤379class GetAllUserInfo(RequestHandler):380    def __init__(self, data_buffer: DataBuffer):381        super().__init__(data_buffer)382    async def main(self):383        temp = self.data_buffer.get_data()384        try:385            userid = None386            temp = temp.replace("[", "", 1)387            temp = temp.replace("]", "", 1)388            temp = temp.replace(" ", "")389            temp = temp.replace("'", "")390            temp = temp.split(',')391            for i in temp:392                i = i.split('=')393                if (i[0] == "userid"):394                    userid = i[1]395            get_all_userinfo_query="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"396            try:397                qeury_result = await query_operator(get_all_userinfo_query)398                qeury_result = qeury_result[0]399                result = ""400                # ëë¼ì´ë¸ íì, 졸ìì´ì  ê°ì§ íì, ì주ì´ì  ê°ì§ íì, ë§ì§ë§ ì´ì  ë ì§401                result += str(str(qeury_result.get('id'))+"-")402                result += str(str(qeury_result.get('passwd'))+"-")403                result += str(str(qeury_result.get('name'))+"-")404                result += str(str(qeury_result.get('tel'))+"-")405                return result406            except:407                result = "false"408                return result409        except:410            result = "false"411            return result412# 20ë² ìì²413# ì¬ì©ì ì ë³´ë¥¼ ì
ë°ì´í¸ íë í´ëì¤414class UpdateUserInfo(RequestHandler):415    def __init__(self, data_buffer: DataBuffer):416        super().__init__(data_buffer)417    async def main(self):418        temp = self.data_buffer.get_data()419        try:420            userid = None421            userpassword = None422            username = None423            usertel = None424            temp = temp.replace("[", "", 1)425            temp = temp.replace("]", "", 1)426            temp = temp.replace(" ", "")427            temp = temp.replace("'", "")428            temp = temp.split(',')429            for i in temp:430                i = i.split('=')431                if (i[0] == "userid"):432                    userid = i[1]433                if (i[0] == "userpassword"):434                    userpassword = i[1]435                if (i[0] == "username"):436                    username = i[1]437                if (i[0] == "usertel"):438                    usertel = i[1]439            get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"440            primary_key = None441            try:442                primary_key = await query_operator(get_primary_key)443                primary_key = primary_key[0]444                primary_key = int(primary_key.get('index'))445                update_userinfo_query = "UPDATE member SET `passwd` = "+"'" + str(userpassword) + "'" + " , `name` = "+"'" + str(username) + "'" + " , `tel` = " + "'" + str(usertel) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"446                qeury_result = await update_execute(update_userinfo_query)447                if qeury_result:448                    result = "true"449                    return result450                else:451                    result = "false"452                    return result453            except:454                result = "false"455                return result456        except:457            result = "false"458            return result459# 22ë² ìì²460# ë§ì§ë§ ì´ì  ë ì§ë¥¼ ì
ë°ì´í¸ íë í¨ì461class UpdateLastDriveDate(RequestHandler):462    def __init__(self, data_buffer: DataBuffer):463        super().__init__(data_buffer)464    async def main(self):465        temp = self.data_buffer.get_data()466        try:467            userid = None468            temp = temp.replace("[", "", 1)469            temp = temp.replace("]", "", 1)470            temp = temp.replace(" ", "")471            temp = temp.replace("'", "")472            temp = temp.split(',')473            for i in temp:474                i = i.split('=')475                if (i[0] == "userid"):476                    userid = i[1]477            get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"478            primary_key = None479            try:480                primary_key = await query_operator(get_primary_key)481                temp_result = primary_key482                primary_key = primary_key[0]483                primary_key = int(primary_key.get('index'))484                temp_result = temp_result[0]485                drive_cnt = int(temp_result.get('drive_cnt'))486                drive_cnt += 1487                new_last_drive_date = (datetime.datetime.now()).strftime('%y-%m-%d')488                update_userinfo_query = "UPDATE member SET `drive_cnt` = "+"'" + str(drive_cnt) + "'" + " , `last_drive_date` = "+"'" + str(new_last_drive_date) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"489                qeury_result = await update_execute(update_userinfo_query)490                if qeury_result:491                    result = "true"492                    return result493                else:494                    result = "false"495                    return result496            except:497                result = "false"498                return result499        except:500            result = "false"501            return result502# 24ë² ìì²503# í
ì¤í¸ ì¤í¨ìì ì주ì´ì  íì 1í ì¦ê° ìí´504class UpdateAlcoholCount(RequestHandler):505    def __init__(self, data_buffer: DataBuffer):506        super().__init__(data_buffer)507    async def main(self):508        temp = self.data_buffer.get_data()509        try:510            userid = None511            temp = temp.replace("[", "", 1)512            temp = temp.replace("]", "", 1)513            temp = temp.replace(" ", "")514            temp = temp.replace("'", "")515            temp = temp.split(',')516            for i in temp:517                i = i.split('=')518                if (i[0] == "userid"):519                    userid = i[1]520            get_primary_key ="SELECT * FROM MEMBER WHERE id="+"'" + userid + "';"521            primary_key = None522            try:523                primary_key = await query_operator(get_primary_key)524                temp_result = primary_key525                primary_key = primary_key[0]526                primary_key = int(primary_key.get('index'))527                temp_result = temp_result[0]528                alcohol_detect_count = int(temp_result.get('alcohol_detect_cnt'))529                alcohol_detect_count += 1530                 # update_userinfo_query = "UPDATE member SET `alcohol_detect_cnt` = "+"'" + str(alcohol_detect_count) + "'" + " WHERE (`index` = " + "'" + str(primary_key) + "');"531                update_userinfo_query = "UPDATE `test`.`member` SET `alcohol_detect_cnt` = '" + str(alcohol_detect_count) + "' WHERE(`index` = '" + str(primary_key) + "');"532                qeury_result = await update_execute(update_userinfo_query)533                if qeury_result:534                    result = "true"535                    return result536                else:537                    result = "false"538                    return result539            except:540                result = "false"541                return result542        except:543            result = "false"544            return result545# 26ë² ìì²546# ì¬ì©ìì ìì´ë를 ì°¾ì ë°íí¨547class FindUserId(RequestHandler):548    def __init__(self, data_buffer: DataBuffer):549        super().__init__(data_buffer)550    async def main(self):551        temp = self.data_buffer.get_data()552        try:553            username = None554            usertel = None555            temp = temp.replace("[", "", 1)556            temp = temp.replace("]", "", 1)557            temp = temp.replace(" ", "")558            temp = temp.replace("'", "")559            temp = temp.split(',')560            for i in temp:561                i = i.split('=')562                if (i[0] == "username"):563                    username = i[1]564                if (i[0] == "usertel"):565                    usertel = i[1]566            get_all_userinfo_query = "SELECT * FROM MEMBER WHERE name=" + "'" + username + "'" + " AND tel=" + "'" + usertel + "'" + "; "567            try:568                qeury_result = await query_operator(get_all_userinfo_query)569                qeury_result = qeury_result[0]570                result = ""571                # ë³´ë¸ ì´ë¦ê³¼ ì íë²í¸ê° ì ì¥ë ì´ë¦ê³¼ ì íë²í¸í ì¼ì¹íëì§ ì ê²572                if username == str(qeury_result.get('name')) and usertel == str(qeury_result.get('tel')):573                    result += str(qeury_result.get('id'))574                else:575                    result += "false"576                return result577            except:578                result = "false"579                return result580        except:581            result = "false"582            return result583# 28ë² ìì²584# ì¬ì©ìì ë¹ë°ë²í¸ë¥¼ ì°¾ì ë°íí¨585class FindUserPw(RequestHandler):586    def __init__(self, data_buffer: DataBuffer):587        super().__init__(data_buffer)588    async def main(self):589        temp = self.data_buffer.get_data()590        try:591            userid = None592            username = None593            temp = temp.replace("[", "", 1)594            temp = temp.replace("]", "", 1)595            temp = temp.replace(" ", "")596            temp = temp.replace("'", "")597            temp = temp.split(',')598            for i in temp:599                i = i.split('=')600                if (i[0] == "userid"):601                    userid = i[1]602                if (i[0] == "username"):603                    username = i[1]604            get_all_userinfo_query = "SELECT * FROM MEMBER WHERE name=" + "'" + username + "'" + " AND id=" + "'" + userid + "'" + "; "605            try:606                qeury_result = await query_operator(get_all_userinfo_query)607                qeury_result = qeury_result[0]608                result = ""609                # ë³´ë¸ ì´ë¦ê³¼ ì íë²í¸ê° ì ì¥ë ì´ë¦ê³¼ ì íë²í¸í ì¼ì¹íëì§ ì ê²610                if userid == str(qeury_result.get('id')) and username == str(qeury_result.get('name')):611                    result += str(qeury_result.get('passwd'))612                else:613                    result += "false"614                return result615            except:616                result = "false"617                return result618        except:619            result = "false"...test_client.py
Source:test_client.py  
1#!/usr/bin/python2##3## Copyright 2008, Various4## Adrian Likins <alikins@redhat.com>5##6## This software may be freely redistributed under the terms of the GNU7## general public license.8##9import os10import socket11import unittest12import xmlrpclib13import func.overlord.client as fc14import func.utils15import socket16class BaseTest:17    # assume we are talking to localhost18    th = socket.gethostname()19    nforks=120    async=False21    def __init__(self):22        pass23    def setUp(self):24        self.overlord = fc.Overlord(self.th,25                                    nforks=self.nforks,26                                    async=self.async)27    def test_module_version(self):28        mod = getattr(self.overlord, self.module)29        result = mod.module_version()30        self.assert_on_fault(result)31    def test_module_api_version(self):32        mod = getattr(self.overlord, self.module)33        result = mod.module_api_version()34        self.assert_on_fault(result)35    def test_module_description(self):36        mod = getattr(self.overlord, self.module)37        result = mod.module_description()38        self.assert_on_fault(result)39    def test_module_list_methods(self):40        mod = getattr(self.overlord, self.module)41        result = mod.list_methods()42        self.assert_on_fault(result)43    def test_module_get_method_args(self):44        mod = getattr(self.overlord,self.module)45        arg_result=mod.get_method_args()46        self.assert_on_fault(arg_result)47    def test_module_inventory(self):48        mod = getattr(self.overlord, self.module)49        result = mod.list_methods()50        res = result[self.th]51        # not all modules have an inventory, so check for it52        # FIXME: not real happy about having to call list method for53        #        every module, but it works for now -akl54        if "inventory" in res:55            result = mod.inventory()56        self.assert_on_fault(result)57    # we do this all over the place...58    def assert_on_fault(self, result):59        assert func.utils.is_error(result[self.th]) == False60#        assert type(result[self.th]) != xmlrpclib.Fault61    def assert_on_no_fault(self, results):62        assert func.utils.is_error(results[self.th]) == True63    # attrs set so we can skip these via nosetest64    test_module_version.intro = True65    test_module_api_version.intro = True66    test_module_description.intro = True67    test_module_list_methods.intro = True68    test_module_inventory.intro = True69    test_module_get_method_args.intro = True70class TestTest(BaseTest):71    module = "test"72    def test_add(self):73        result = self.overlord.test.add(1,5)74        self.assert_on_fault(result)75        assert result[self.th] == 676    def test_add_string(self):77        result = self.overlord.test.add("foo", "bar")78        self.assert_on_fault(result)79        assert result[self.th] == "foobar"80    def test_sleep(self):81        result = self.overlord.test.sleep(1)82        self.assert_on_fault(result)83    def test_explode(self):84        results = self.overlord.test.explode()85        print results86        self.assert_on_no_fault(results)87    def test_explode_no_string(self):88        results = self.overlord.test.explode_no_string()89        print results90        self.assert_on_no_fault(results)91    def _echo_test(self, data):92        result = self.overlord.test.echo(data)93        self.assert_on_fault(result)94        assert result[self.th] == data95    # this tests are basically just to test the basic96    # marshalling/demarshaling bits97    def test_echo_int(self):98        self._echo_test(1)99    def test_echo_string(self):100        self._echo_test("caneatcheese")101    def test_echo_array(self):102        self._echo_test([1, 2, "three", "fore", "V",])103    def test_echo_hash(self):104        self._echo_test({"one":1, "too":2, "III":3, "many":8, "lots":12312})105    def test_echo_bool_false(self):106        self._echo_test(False)107    def test_echo_bool_true(self):108        self._echo_test(True)109    def test_echo_float(self):110        self._echo_test(123.456)111    def test_echo_big_float(self):112        self._echo_test(123121232.23)113    def test_echo_bigger_float(self):114        self._echo_test(234234234234234234234.234234234234234)115    def test_echo_little_float(self):116        self._echo_test(0.000000000000000000000000000000037)117    def test_echo_binary(self):118        blob = "348dshke354ts0d9urgk"119        import xmlrpclib120        data = xmlrpclib.Binary(blob)121        self._echo_test(data)122    def test_echo_date(self):123        import datetime124        dt = datetime.datetime(1974, 1, 5, 11, 59 ,0,0, None)125        import xmlrpclib126        data = xmlrpclib.DateTime(dt)127        self._echo_test(data)128    def test_config(self):129        result = self.overlord.test.configfoo()130        config = result[self.th]131        self.assert_on_fault(result)132    def test_config_write(self):133        result = self.overlord.test.config_save()134        config = result[self.th]135        self.assert_on_fault(result)136    def test_config_set(self):137        result = self.overlord.test.config_set("example", 5000)138        config = result[self.th]139        self.assert_on_fault(result)140    def test_config_get_example(self):141        result = self.overlord.test.config_get("example")142        config = result[self.th]143        self.assert_on_fault(result)144    def test_config_get_string(self):145        result = self.overlord.test.config_get("string_option")146        option = result[self.th]147        print option148        assert(type(option) == type("foo"))149        self.assert_on_fault(result)150    def test_config_get_int(self):151        result = self.overlord.test.config_get("int_option")152        option = result[self.th]153        assert(type(option) == type(1))154        self.assert_on_fault(result)155    def test_config_get_bool(self):156        result = self.overlord.test.config_get("bool_option")157        option = result[self.th]158        assert(type(option) == type(True))159        self.assert_on_fault(result)160    def test_config_get_float(self):161        result = self.overlord.test.config_get("float_option")162        option = result[self.th]163        assert(type(option) == type(2.71828183))164        self.assert_on_fault(result)165    def test_config_get_test(self):166        result = self.overlord.test.config_get_test()167        self.assert_on_fault(result)168class TestCommand(BaseTest):169    module = "command"170    def test_echo(self):171        result = self.overlord.command.run("echo -n foo")172        self.assert_on_fault(result)173        assert result[self.th][1] == "foo"174    def test_rpm(self):175        # looksing for some package that should be there, rh specific176        # ish at the moment177        result = self.overlord.command.run("rpm -q filesystem")178        self.assert_on_fault(result)179        assert result[self.th][1].split("-")[0] == "filesystem"180    def test_env(self):181        result = self.overlord.command.run("env",182                                           {'BLIPPYFOO':'awesome'})183        self.assert_on_fault(result)184        assert result[self.th][1].find("BLIPPYFOO=awesome") != -1185#    def test_sleep_long(self):186#        result = self.overlord.command.run("sleep 256")187#        self.assert_on_fault(result)188    def test_shell_globs(self):189        result = self.overlord.command.run("ls /etc/cron*")190        self.assert_on_fault(result)191    def test_shell_pipe(self):192        result = self.overlord.command.run("echo 'foobar' | grep foo")193        self.assert_on_fault(result)194        assert result[self.th][1] == "foobar\n"195    def test_shell_redirect(self):196        result = self.overlord.command.run("mktemp")197        tmpfile = result[self.th][1].strip()198        result = self.overlord.command.run("echo foobar >> %s; cat %s" % (tmpfile, tmpfile))199        self.assert_on_fault(result)200        assert result[self.th][1] == "foobar\n"201    def test_shell_multiple_commands(self):202        result = self.overlord.command.run("cal; date; uptime; ls;")203        self.assert_on_fault(result)204class TestCopyfile(BaseTest):205    fn = "/tmp/func_test_file"206    dest_fn = "/tmp/func_test_file_dest"207    content = "this is a func test file"208    module = "copyfile"209    def create_a_file(self, size=1):210        f = open(self.fn, "w")211        f.write(self.content*size)212        f.close()213#    def test_local_copyfile(self):214#        result = self.overlord.local.copyfile.send(self.fn, self.dest_fn)215#        print result216#        self.assert_on_fault(result)217    def test_copyfile(self, size=1):218        self.create_a_file(size=size)219        fb = open(self.fn,"r").read()220        data = xmlrpclib.Binary(fb)221        result = self.overlord.copyfile.copyfile(self.dest_fn, data)222        self.assert_on_fault(result)223        assert result[self.th]  == 0224 #   def test_copyfile_big(self):225 #       # make a file in the ~70 meg range226 #       self.test_copyfile(size=100)227    def test_checksum(self):228        self.create_a_file()229        fb = open(self.fn,"r").read()230        data = xmlrpclib.Binary(fb)231        result = self.overlord.copyfile.copyfile(self.dest_fn, data)232        result = self.overlord.copyfile.checksum(self.dest_fn)233        self.assert_on_fault(result)234        assert result[self.th] == "b36a8040e44c16605d7784cdf1b3d9ed04ea7f55"235class TestHardware(BaseTest):236    module = "hardware"237    def test_inventory(self):238        result = self.overlord.hardware.inventory()239        self.assert_on_fault(result)240    def test_halinfo(self):241        result = self.overlord.hardware.hal_info()242        self.assert_on_fault(result)243    def test_info(self):244        result = self.overlord.hardware.info()245        self.assert_on_fault(result)246    def test_info_no_devices(self):247        result = self.overlord.hardware.info(False)248        self.assert_on_fault(result)249class TestFileTracker(BaseTest):250    fn = "/etc/hosts"251    fn_glob = "/etc/init.d/*"252    fn_list = ["/etc/hosts", "/etc/func/minion.conf"]253    fn_glob_list = ["/etc/hosts", "/etc/func/*"]254    module = "filetracker"255    def test_track(self):256        result = self.overlord.filetracker.track(self.fn)257        assert result[self.th] == 1258        self.assert_on_fault(result)259    def test_track_glob(self):260        result = self.overlord.filetracker.track(self.fn)261        assert result[self.th] == 1262        self.assert_on_fault(result)263    def test_untrack_glob(self):264        result = self.overlord.filetracker.track(self.fn_glob)265        result = self.overlord.filetracker.untrack(self.fn_glob)266        self.assert_on_fault(result)267    def test_track_fn_list(self):268        result = self.overlord.filetracker.track(self.fn_list)269        assert result[self.th] == 1270        self.assert_on_fault(result)271    def test_untrack_fn_list(self):272        result = self.overlord.filetracker.track(self.fn_list)273        result = self.overlord.filetracker.untrack(self.fn_list)274        self.assert_on_fault(result)275    def test_track_fn_glob_list(self):276        result = self.overlord.filetracker.track(self.fn_glob_list)277        assert result[self.th] == 1278        self.assert_on_fault(result)279    def test_untrack_fn_glob_list(self):280        result = self.overlord.filetracker.track(self.fn_glob_list)281        result = self.overlord.filetracker.untrack(self.fn_glob_list)282        self.assert_on_fault(result)283    def test_inventory(self):284        result = self.overlord.filetracker.track(self.fn)285        result = self.overlord.filetracker.inventory(False)286        self.assert_on_fault(result)287        assert self.fn in result[self.th][0]288#        assert result[self.th][0][3] == 0289    def test_untrack(self):290        result = self.overlord.filetracker.track(self.fn)291        result = self.overlord.filetracker.untrack(self.fn)292        self.assert_on_fault(result)293        result_inv = self.overlord.filetracker.inventory(False)294        tracked_files = result_inv[self.th]295        for i in tracked_files:296            if i[0] == self.fn:297                assert "%s was not properly untracked" % self.fn298class TestMount(BaseTest):299    module = "mount"300    def test_mount_list(self):301        result = self.overlord.mount.list()302        self.assert_on_fault(result)303    # INSERT some clever way to test mount here304class TestNetworkTest(BaseTest):305    module = "networktest"306    def test_ping(self):307        result = self.overlord.networktest.ping(self.th, "-c", "2")308        self.assert_on_fault(result)309    def test_ping_bad_arg(self):310        result = self.overlord.networktest.ping(self.th)311        # this should give us a FuncException312        foo = func.utils.is_error(result[self.th])313    def test_netstat(self):314        result = self.overlord.networktest.netstat("-n")315        self.assert_on_fault(result)316    def test_traceroute(self):317        result = self.overlord.networktest.traceroute(self.th)318        self.assert_on_fault(result)319    def test_dig(self):320        result = self.overlord.networktest.dig("redhat.com")321        self.assert_on_fault(result)322    def test_isportopen_closed_port(self):323        result = self.overlord.networktest.isportopen(self.th, 34251)324        self.assert_on_fault(result)325    def test_isportopen_open_port(self):326        result = self.overlord.networktest.isportopen(self.th, 51234)327        self.assert_on_fault(result)328class TestProcess(BaseTest):329    module = "process"330    def test_info(self):331        result = self.overlord.process.info()332        self.assert_on_fault(result)333    def test_mem(self):334        result = self.overlord.process.mem()335        self.assert_on_fault(result)336    # FIXME: how to test kill/pkill? start a process with337    #        command and then kill it?338class TestService(BaseTest):339    module = "service"340    def test_inventory(self):341        result = self.overlord.service.inventory()342        self.assert_on_fault(result)343    def test_get_enabled(self):344        result = self.overlord.service.get_enabled()345        self.assert_on_fault(result)346    def test_get_running(self):347        result = self.overlord.service.get_running()348        self.assert_on_fault(result)349    def test_get_status(self):350        running_data = self.overlord.service.get_running()[self.th]351        result = self.overlord.service.status(running_data[0][0])352        self.assert_on_fault(result)353        #FIXME: whats a good way to test starting/stoping services without354        #       doing bad things? -akl355class TestRpm(BaseTest):356    module = "rpms"357    def test_inventory(self):358        result = self.overlord.rpms.inventory()359        self.assert_on_fault(result)360    def test_glob(self):361        # if func is running, there should at least be python installed ;->362        result = self.overlord.rpms.glob("python*", False)363        self.assert_on_fault(result)364    def test_glob_flatten(self):365        result = self.overlord.rpms.glob("python*", True)366        self.assert_on_fault(result)367    def test_glob_nomatch(self):368        # shouldn't be any rpms called "-" ;->369        result = self.overlord.rpms.glob("-*")370        self.assert_on_fault(result)371    def test_glob_gpg_pubkey(self):372        # gpg-pubkey packages are weird rpm virtual packages, and tend to do373        # weird things, so try that too374        result = self.overlord.rpms.glob("gpg-pubkey*")375        self.assert_on_fault(result)376    def test_glob_gpg_pubkey_no_flat(self):377        # gpg-pubkey packages are weird rpm virtual packages, and tend to do378        # weird things, so try that too379        result = self.overlord.rpms.glob("gpg-pubkey*", False)380        self.assert_on_fault(result)381    def test_glob_match_all(self):382        result = self.overlord.rpms.glob("*", False)383        self.assert_on_fault(result)384    def test_verify(self):385        result = self.overlord.rpms.verify("bash")386        self.assert_on_fault(result)387class TestSmart(BaseTest):388    module = "smart"389    def test_info(self):390        result = self.overlord.smart.info()391        self.assert_on_fault(result)392class TestSysctl(BaseTest):393    module = "sysctl"394    def test_list(self):395        result = self.overlord.sysctl.list()396        self.assert_on_fault(result)397    def test_get(self):398        result = self.overlord.sysctl.get("kernel.max_lock_depth")399        self.assert_on_fault(result)400class TestYum(BaseTest):401    module = "yumcmd"402    def test_check_update(self):403        result = self.overlord.yumcmd.check_update()404        self.assert_on_fault(result)405    def test_check_update_empty_filter(self):406        results = self.overlord.yumcmd.check_update([])407        self.assert_on_fault(results)408        results_no_filter = self.overlord.yumcmd.check_update()409        assert results == results_no_filter410    def test_check_update_splat_filter(self):411        results = self.overlord.yumcmd.check_update(['*'])412        self.assert_on_fault(results)413        results_no_filter = self.overlord.yumcmd.check_update()414        assert results == results_no_filter415# this fails on fc6, need to test on newer yum to see whats up416#    def test_update_non_existent_package(self):417#        result = self.overlord.yumcmd.update("thisisapackage-_-that_should==never+exist234234234")418#        self.assert_on_fault(result)419#        # hmm, that method always returns True... not much to test there... -akl420class TestIptables(BaseTest):421    module = "iptables"422    def test_dump(self):423        result = self.overlord.iptables.dump()424        # at the moment, we dont set anything up425        # to verify, so this is just a basic426        # "does it crash" test427    def test_policy(self):428        result = self.overlord.iptables.policy()429class TestIptablesPort(BaseTest):430    module = "iptables.port"431    def test_inventory(self):432        results = self.overlord.iptables.port.inventory()433        # doesnt have an inventory, so er... -akl434#class TestHttpd(BaseTest):435#    module = "httpd"436#    def test_server_status(self):437#        result = self.overlord.httpd.server_status()438#        self.assert_on_fault(result)439#440#    def test_graceful(self):441#        result = self.overlord.httpd.graceful()442#        self.assert_on_fault(result)443class TestEchoTest(BaseTest):444    module = "echo"445    def test_run_string(self):446        result=self.overlord.echo.run_string("heyman")447        self.assert_on_fault(result)448    def test_run_int(self):449        result=self.overlord.echo.run_int(12)450        self.assert_on_fault(result)451    def test_run_float(self):452        result=self.overlord.echo.run_float(12.0)453        self.assert_on_fault(result)454    def test_run_options(self):455        result=self.overlord.echo.run_options("hehehh")456        self.assert_on_fault(result)457    def test_run_list(self):458        result=self.overlord.echo.run_list(['one','two','three'])459        self.assert_on_fault(result)460    def test_run_hash(self):461        result=self.overlord.echo.run_hash({'one':1,'two':2})462        self.assert_on_fault(result)463    def test_run_boolean(self):464        result=self.overlord.echo.run_hash(True)465        self.assert_on_fault(result)466class TestFactsModule(BaseTest):467    module = "fact"468    def test_list_fact_modules(self):469        result=self.overlord.fact.list_fact_modules()470        print result471        self.assert_on_fault(result)472    def test_list_fact_methods(self):473        # we want to catch the case if haveconflicts ...474        result=self.overlord.fact.list_fact_methods(True)475        print result476        self.assert_on_fault(result)477        if type(result[self.th])==dict and result[self.th].has_key('__conflict__'):478            raise Exception("You have conflict in tags run fact.list_fact_methods for more info : %s"%result)479    def test_show_fact_module(self):480        modules = self.overlord.fact.list_fact_modules().values()481        print "Modules to run for show fact module are ",modules482        for module in modules[0]:483            result = self.overlord.fact.show_fact_module(module)484            print result485            self.assert_on_fault(result)486    def test_show_fact_method(self):487        methods = self.overlord.fact.list_fact_methods().values()488        print "Methods to run for show fact method are ",methods489        for method in methods[0]:490            result = self.overlord.fact.show_fact_method(method)491            print result492            self.assert_on_fault(result)493    def test_fact_call(self):494        methods = self.overlord.fact.list_fact_methods().values()495        for method in methods[0]:496            result = self.overlord.fact.call_fact(method)497            print result498            self.assert_on_fault(result)499class TestSystem(BaseTest):500    module = "system"501    def test_list_methods(self):502        result = self.overlord.system.list_methods()503        self.assert_on_fault(result)504    def test_listMethods(self):505        result = self.overlord.system.listMethods()506        self.assert_on_fault(result)507    def test_list_modules(self):508        result = self.overlord.system.list_modules()509        self.assert_on_fault(result)510    #FIXME: we really should just implement these for the system stuff511    #       as well512    def test_module_version(self):513        pass514    def test_module_api_version(self):515        pass516    def test_module_description(self):517        pass518    def test_module_get_method_args(self):519        pass520#import time521#class TestAsyncTest(BaseTest):522#    module = "async.test"523#    nforks=1524#    async=True525#    def test_sleep_async(self):526#        job_id = self.overlord.test.sleep(5)527#        print "job_id", job_id528#        time.sleep(5)529#        (return_code, results) = self.overlord.job_status(job_id)530#        print "return_code", return_code531#        print "results", results532#533#    def test_add_async(self):534#        job_id = self.overlord.test.add(1,5)535#        print "job_id", job_id536#        time.sleep(6)537#        (return_code, results) = self.overlord.job_status(job_id)538#        print "return_code", return_code...Tilte_minmax.py
Source:Tilte_minmax.py  
1from vic import *2import requests3import threading4import time5import json67lock = threading.Lock()8Gas_dict={}9Gas_param = {}10Gas_return = {}11121314def mainLoop(info, arr):15	16	ID_string = str(PAGE01.OCR337).replace(' ', '')17	char_list = split(ID_string)18	colon_list = []19	slash_list = []20	E_ID = ''21	U_ID = ''22	print(char_list)23	24	for i in range(len(char_list)):25		if char_list[i] == ':':26			colon_list.append(i)27		if char_list[i] == '/':28			slash_list.append(i)29	30	for i in range(colon_list[0] + 1, slash_list[0]):31		E_ID += char_list[i]3233	print(E_ID)34	35	print(colon_list)36	print(slash_list)37			38			39	pass4041def split(word):42	return[char for char in word]4344def SetGasParam(**kwargs):45	46	global Gas_param, lock47	48	Gas_param_temp = {}49	50	try:51		E_ID = kwargs['E_ID']52		Gas_param_temp['E_ID'] = E_ID53	except:54		return {'error_message':'Cannot find E_ID.', 'is_success':'N'}55	56	try:57		U_ID = kwargs['U_ID']58		Gas_param_temp['U_ID'] = U_ID59	except:60		return {'error_message':'Cannot find U_ID.', 'is_success':'N'}61	62	try:63		O_ID = kwargs['O_ID']64		Gas_param_temp['O_ID'] = O_ID65	except:66		return {'error_message':'Cannot find O_ID.', 'is_success':'N'}67	68	try:69		F_ID = kwargs['F_ID']70		Gas_param_temp['F_ID'] = F_ID71	except:72		return {'error_message':'Cannot find F_ID.', 'is_success':'N'}73	74	try:75		Gas_set = kwargs['Gas_set']76		Gas_param_temp['Gas_set'] = Gas_set77	except:78		return {'error_message':'Cannot find Gas_set.', 'is_success':'N'}79	80	81	lock.acquire()82	83	Gas_param[F_ID] = json.dumps(Gas_param_temp)84	85	lock.release()86	87	#print(Gas_param)88	89	return {'is_success':'Y'}90	9192	93def RecognizeImage(**args):94	95	global Gas_dict, Gas_param, Gas_return, lock9697	try:98		img = args['img']99	except:100		return {'error_message':'Cannot find img.', 'is_success':'N'}101	102	Gas_dict_temp = {}103	Gas_return_temp = {}104	find = 0105	result = {}106	107	E_ID = ''108	U_ID = ''109	F_ID = ''110	ID = ''111	ID_temp = ''112	ID_string = ''113    char_list = []114	colon_list = []115	slash_list = []116	117	'''118	x1 = int(result['PATTERN01'].X)+int(result['PATTERN01'].WIDTH)119	y1 = int(result['PATTERN01'].Y)120	w1 = int(result['PATTERN02'].X)-x1121	h1 = int(result['PATTERN01'].HEIGHT)122	x2 = int(result['PATTERN03'].X)+int(result['PATTERN03'].WIDTH)123	y2 = int(result['PATTERN03'].Y)124	w2 = 100125	h2 = int(result['PATTERN03'].HEIGHT)126	E_ID = TOOL.OCR(0, 'epmm_ID', x1, y1, w1, h1, img, {'segmentation':7, 'preprocess_resize':7, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 0, 'preprocess_threshold_value': 135})127	U_ID = TOOL.OCR(0, 'epmm_ID', x2, y2, w2, h2, img, {'segmentation':7, 'preprocess_resize':7, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 0, 'preprocess_threshold_value': 135})	128	E_ID = str(E_ID).replace(' ', '')129	U_ID = str(U_ID).replace(' ', '')130	'''131	132	'''133	E_ID = args['E_ID']134	U_ID = args['U_ID']135	'''136	137	ID_string = TOOL.OCR(0, 'ID_test', 127,0,1393,22, img, {'segmentation':7, 'preprocess_resize':5, 'preprocess_resize_method': 0, 'preprocess_threshold_method':1, 'preprocess_threshold_algorithm': 1, 'preprocess_threshold_value': 135})138	ID_string = ID_string.replace(' ', '')139	char_list = split(ID_string)140    141	for i in range(len(char_list)):142		if char_list[i] == ':':143			colon_list.append(i)144		if char_list[i] == '/':145			slash_list.append(i)146	147	for i in range(colon_list[0] + 1, slash_list[0]):148		E_ID += char_list[i]149	150	for i in range(colon_list[3] + 1, len(char_list)):151		U_ID += char_list[i]152	153	ID = E_ID + U_ID154	155	for key in Gas_param.keys():156		ID_temp = ''157		ID_temp = json.loads(Gas_param[key])['E_ID'] + json.loads(Gas_param[key])['U_ID']158		if ID_temp == ID:159			find = 1160			F_ID = key161	162	if find == 1:163		164		result = PAGE_PROCESS('01', img)165		166		Gas1_name=''167		Gas1_set=[]168		Gas1_error=[]169		Gas1_dict={}170		Gas1_name=result['OCR01']171		Gas1_set=[result['OCR02'],172				 result['OCR03'],173				 result['OCR04'],174				 result['OCR05'],175				 result['OCR06'],176				 result['OCR07'],177				 result['OCR08'],178				 result['OCR09'],179				 result['OCR10'],180				 result['OCR11']181				 ]182		Gas1_error=[result['OCR12'],183				 result['OCR13'],184				 result['OCR14'],185				 result['OCR15'],186				 result['OCR16'],187				 result['OCR17'],188				 result['OCR18'],189				 result['OCR19'],190				 result['OCR20'],191				 result['OCR21']192				 ]193194		Gas2_name=''195		Gas2_set=[]196		Gas2_error=[]197		Gas2_dict={}198		Gas2_name=result['OCR22']199		Gas2_set=[result['OCR23'],200				 result['OCR24'],201				 result['OCR25'],202				 result['OCR26'],203				 result['OCR27'],204				 result['OCR28'],205				 result['OCR29'],206				 result['OCR30'],207				 result['OCR31'],208				 result['OCR32']209				 ]210		Gas2_error=[result['OCR33'],211				 result['OCR34'],212				 result['OCR35'],213				 result['OCR36'],214				 result['OCR37'],215				 result['OCR38'],216				 result['OCR39'],217				 result['OCR40'],218				 result['OCR41'],219				 result['OCR42']220				 ]221222		Gas3_name=''223		Gas3_set=[]224		Gas3_error=[]225		Gas3_dict={}226		Gas3_name=result['OCR43']227		Gas3_set=[result['OCR44'],228				 result['OCR45'],229				 result['OCR46'],230				 result['OCR47'],231				 result['OCR48'],232				 result['OCR49'],233				 result['OCR50'],234				 result['OCR51'],235				 result['OCR52'],236				 result['OCR53']237				 ]238		Gas3_error=[result['OCR54'],239				 result['OCR55'],240				 result['OCR56'],241				 result['OCR57'],242				 result['OCR58'],243				 result['OCR59'],244				 result['OCR60'],245				 result['OCR61'],246				 result['OCR62'],247				 result['OCR63']248				 ]249250		Gas4_name=''251		Gas4_set=[]252		Gas4_error=[]253		Gas4_dict={}254		Gas4_name=result['OCR64']255		Gas4_set=[result['OCR65'],256				 result['OCR66'],257				 result['OCR67'],258				 result['OCR68'],259				 result['OCR69'],260				 result['OCR70'],261				 result['OCR71'],262				 result['OCR72'],263				 result['OCR73'],264				 result['OCR74']265				 ]266		Gas4_error=[result['OCR75'],267				 result['OCR76'],268				 result['OCR77'],269				 result['OCR78'],270				 result['OCR79'],271				 result['OCR80'],272				 result['OCR81'],273				 result['OCR82'],274				 result['OCR83'],275				 result['OCR84']276				 ]277278		Gas5_name=''279		Gas5_set=[]280		Gas5_error=[]281		Gas5_dict={}282		Gas5_name=result['OCR85']283		Gas5_set=[result['OCR86'],284				 result['OCR87'],285				 result['OCR88'],286				 result['OCR89'],287				 result['OCR90'],288				 result['OCR91'],289				 result['OCR92'],290				 result['OCR93'],291				 result['OCR94'],292				 result['OCR95']293				 ]294		Gas5_error=[result['OCR96'],295				 result['OCR97'],296				 result['OCR98'],297				 result['OCR99'],298				 result['OCR100'],299				 result['OCR101'],300				 result['OCR102'],301				 result['OCR103'],302				 result['OCR104'],303				 result['OCR105']304				 ]305306		Gas6_name=''307		Gas6_set=[]308		Gas6_error=[]309		Gas6_dict={}310		Gas6_name=result['OCR106']311		Gas6_set=[result['OCR107'],312				 result['OCR108'],313				 result['OCR109'],314				 result['OCR110'],315				 result['OCR111'],316				 result['OCR112'],317				 result['OCR113'],318				 result['OCR114'],319				 result['OCR115'],320				 result['OCR116']321				 ]322		Gas6_error=[result['OCR117'],323				 result['OCR118'],324				 result['OCR119'],325				 result['OCR120'],326				 result['OCR121'],327				 result['OCR122'],328				 result['OCR123'],329				 result['OCR124'],330				 result['OCR125'],331				 result['OCR126']332				 ]333334		Gas7_name=''335		Gas7_set=[]336		Gas7_error=[]337		Gas7_dict={}338		Gas7_name=result['OCR127']339		Gas7_set=[result['OCR128'],340				 result['OCR129'],341				 result['OCR130'],342				 result['OCR131'],343				 result['OCR132'],344				 result['OCR133'],345				 result['OCR134'],346				 result['OCR135'],347				 result['OCR136'],348				 result['OCR137']349				 ]350		Gas7_error=[result['OCR138'],351				 result['OCR139'],352				 result['OCR140'],353				 result['OCR141'],354				 result['OCR142'],355				 result['OCR143'],356				 result['OCR144'],357				 result['OCR145'],358				 result['OCR146'],359				 result['OCR147']360				 ]361362		Gas8_name=''363		Gas8_set=[]364		Gas8_error=[]365		Gas8_dict={}366		Gas8_name=result['OCR148']367		Gas8_set=[result['OCR149'],368				 result['OCR150'],369				 result['OCR151'],370				 result['OCR152'],371				 result['OCR153'],372				 result['OCR154'],373				 result['OCR155'],374				 result['OCR156'],375				 result['OCR157'],376				 result['OCR158']377				 ]378		Gas8_error=[result['OCR159'],379				 result['OCR160'],380				 result['OCR161'],381				 result['OCR162'],382				 result['OCR163'],383				 result['OCR164'],384				 result['OCR165'],385				 result['OCR166'],386				 result['OCR167'],387				 result['OCR168']388				 ]389390		Gas9_name=''391		Gas9_set=[]392		Gas9_error=[]393		Gas9_dict={}394		Gas9_name=result['OCR169']395		Gas9_set=[result['OCR170'],396				 result['OCR171'],397				 result['OCR172'],398				 result['OCR173'],399				 result['OCR174'],400				 result['OCR175'],401				 result['OCR176'],402				 result['OCR177'],403				 result['OCR178'],404				 result['OCR179']405				 ]406		Gas9_error=[result['OCR180'],407				 result['OCR181'],408				 result['OCR182'],409				 result['OCR183'],410				 result['OCR184'],411				 result['OCR185'],412				 result['OCR186'],413				 result['OCR187'],414				 result['OCR188'],415				 result['OCR189']416				 ]417418		Gas10_name=''419		Gas10_set=[]420		Gas10_error=[]421		Gas10_dict={}422		Gas10_name=result['OCR190']423		Gas10_set=[result['OCR191'],424				 result['OCR192'],425				 result['OCR193'],426				 result['OCR194'],427				 result['OCR195'],428				 result['OCR196'],429				 result['OCR197'],430				 result['OCR198'],431				 result['OCR199'],432				 result['OCR200']433				 ]434		Gas10_error=[result['OCR201'],435				 result['OCR202'],436				 result['OCR203'],437				 result['OCR204'],438				 result['OCR205'],439				 result['OCR206'],440				 result['OCR207'],441				 result['OCR208'],442				 result['OCR209'],443				 result['OCR210']444				 ]445446		Gas11_name=''447		Gas11_set=[]448		Gas11_error=[]449		Gas11_dict={}450		Gas11_name=result['OCR211']451		Gas11_set=[result['OCR212'],452				 result['OCR213'],453				 result['OCR214'],454				 result['OCR215'],455				 result['OCR216'],456				 result['OCR217'],457				 result['OCR218'],458				 result['OCR219'],459				 result['OCR220'],460				 result['OCR221']461				 ]462		Gas11_error=[result['OCR222'],463				 result['OCR223'],464				 result['OCR224'],465				 result['OCR225'],466				 result['OCR226'],467				 result['OCR227'],468				 result['OCR228'],469				 result['OCR229'],470				 result['OCR230'],471				 result['OCR231']472				 ]473474		Gas12_name=''475		Gas12_set=[]476		Gas12_error=[]477		Gas12_dict={}478		Gas12_name=result['OCR232']479		Gas12_set=[result['OCR233'],480				 result['OCR234'],481				 result['OCR235'],482				 result['OCR236'],483				 result['OCR237'],484				 result['OCR238'],485				 result['OCR239'],486				 result['OCR240'],487				 result['OCR241'],488				 result['OCR242']489				 ]490		Gas12_error=[result['OCR243'],491				 result['OCR244'],492				 result['OCR245'],493				 result['OCR246'],494				 result['OCR247'],495				 result['OCR248'],496				 result['OCR249'],497				 result['OCR250'],498				 result['OCR251'],499				 result['OCR252']500				 ]501502		Gas13_name=''503		Gas13_set=[]504		Gas13_error=[]505		Gas13_dict={}506		Gas13_name=result['OCR253']507		Gas13_set=[result['OCR254'],508				 result['OCR255'],509				 result['OCR256'],510				 result['OCR257'],511				 result['OCR258'],512				 result['OCR259'],513				 result['OCR260'],514				 result['OCR261'],515				 result['OCR262'],516				 result['OCR263']517				 ]518		Gas13_error=[result['OCR264'],519				 result['OCR265'],520				 result['OCR266'],521				 result['OCR267'],522				 result['OCR268'],523				 result['OCR269'],524				 result['OCR270'],525				 result['OCR271'],526				 result['OCR272'],527				 result['OCR273']528				 ]529530		Gas14_name=''531		Gas14_set=[]532		Gas14_error=[]533		Gas14_dict={}534		Gas14_name=result['OCR274']535		Gas14_set=[result['OCR275'],536				 result['OCR276'],537				 result['OCR277'],538				 result['OCR278'],539				 result['OCR279'],540				 result['OCR280'],541				 result['OCR281'],542				 result['OCR282'],543				 result['OCR283'],544				 result['OCR284']545				 ]546		Gas14_error=[result['OCR285'],547				 result['OCR286'],548				 result['OCR287'],549				 result['OCR288'],550				 result['OCR289'],551				 result['OCR290'],552				 result['OCR291'],553				 result['OCR292'],554				 result['OCR293'],555				 result['OCR294']556				 ]557558		Gas15_name=''559		Gas15_set=[]560		Gas15_error=[]561		Gas15_dict={}562		Gas15_name=result['OCR295']563		Gas15_set=[result['OCR296'],564				 result['OCR297'],565				 result['OCR298'],566				 result['OCR299'],567				 result['OCR300'],568				 result['OCR301'],569				 result['OCR302'],570				 result['OCR303'],571				 result['OCR304'],572				 result['OCR305']573				 ]574		Gas15_error=[result['OCR306'],575				 result['OCR307'],576				 result['OCR308'],577				 result['OCR309'],578				 result['OCR310'],579				 result['OCR311'],580				 result['OCR312'],581				 result['OCR313'],582				 result['OCR314'],583				 result['OCR315']584				 ]585586		Gas16_name=''587		Gas16_set=[]588		Gas16_error=[]589		Gas16_dict={}590		Gas16_name=result['OCR316']591		Gas16_set=[result['OCR317'],592				 result['OCR318'],593				 result['OCR319'],594				 result['OCR320'],595				 result['OCR321'],596				 result['OCR322'],597				 result['OCR323'],598				 result['OCR324'],599				 result['OCR325'],600				 result['OCR326']601				 ]602		Gas16_error=[result['OCR327'],603				 result['OCR328'],604				 result['OCR329'],605				 result['OCR330'],606				 result['OCR331'],607				 result['OCR332'],608				 result['OCR333'],609				 result['OCR334'],610				 result['OCR335'],611				 result['OCR336']612				 ]613		'''614		Gas17_name=''615		Gas17_set=[]616		Gas17_error=[]617		Gas17_dict={}618		Gas17_name='Tuning' + result['OCR337']619		Gas17_set=[result['OCR338'],620				 result['OCR339'],621				 result['OCR340'],622				 result['OCR341'],623				 result['OCR342'],624				 result['OCR343'],625				 result['OCR344'],626				 result['OCR345'],627				 result['OCR346'],628				 result['OCR347']629				 ]630		Gas17_error=[result['OCR348'],631				 result['OCR349'],632				 result['OCR350'],633				 result['OCR351'],634				 result['OCR352'],635				 result['OCR353'],636				 result['OCR354'],637				 result['OCR355'],638				 result['OCR356'],639				 result['OCR357']640				 ]641642		Gas18_name=''643		Gas18_set=[]644		Gas18_error=[]645		Gas18_dict={}646		Gas18_name='Tuning' + result['OCR358']647		Gas18_set=[result['OCR359'],648				 result['OCR360'],649				 result['OCR361'],650				 result['OCR362'],651				 result['OCR363'],652				 result['OCR364'],653				 result['OCR365'],654				 result['OCR366'],655				 result['OCR367'],656				 result['OCR368']657				 ]658		Gas18_error=[result['OCR369'],659				 result['OCR370'],660				 result['OCR371'],661				 result['OCR372'],662				 result['OCR373'],663				 result['OCR374'],664				 result['OCR375'],665				 result['OCR376'],666				 result['OCR377'],667				 result['OCR378']668				 ]669		'''670		for i in range(len(Gas1_set)):671			GasError(Gas1_dict, Gas1_set[i], Gas1_error[i])672			GasError(Gas2_dict, Gas2_set[i], Gas2_error[i])673			GasError(Gas3_dict, Gas3_set[i], Gas3_error[i])674			GasError(Gas4_dict, Gas4_set[i], Gas4_error[i])675			GasError(Gas5_dict, Gas5_set[i], Gas5_error[i])676			GasError(Gas6_dict, Gas6_set[i], Gas6_error[i])677			GasError(Gas7_dict, Gas7_set[i], Gas7_error[i])678			GasError(Gas8_dict, Gas8_set[i], Gas8_error[i])679			GasError(Gas9_dict, Gas9_set[i], Gas9_error[i])680			GasError(Gas10_dict, Gas10_set[i], Gas10_error[i])681			GasError(Gas11_dict, Gas11_set[i], Gas11_error[i])682			GasError(Gas12_dict, Gas12_set[i], Gas12_error[i])683			GasError(Gas13_dict, Gas13_set[i], Gas13_error[i])684			GasError(Gas14_dict, Gas14_set[i], Gas14_error[i])685			GasError(Gas15_dict, Gas15_set[i], Gas15_error[i])686			GasError(Gas16_dict, Gas16_set[i], Gas16_error[i])687			#GasError(Gas17_dict, Gas17_set[i], Gas17_error[i])688			#GasError(Gas18_dict, Gas18_set[i], Gas18_error[i])689690		Gas_dict_temp[Gas1_name]=json.dumps(Gas1_dict)691		Gas_dict_temp[Gas2_name]=json.dumps(Gas2_dict)692		Gas_dict_temp[Gas3_name]=json.dumps(Gas3_dict)693		Gas_dict_temp[Gas4_name]=json.dumps(Gas4_dict)694		Gas_dict_temp[Gas5_name]=json.dumps(Gas5_dict)695		Gas_dict_temp[Gas6_name]=json.dumps(Gas6_dict)696		Gas_dict_temp[Gas7_name]=json.dumps(Gas7_dict)697		Gas_dict_temp[Gas8_name]=json.dumps(Gas8_dict)698		Gas_dict_temp[Gas9_name]=json.dumps(Gas9_dict)699		Gas_dict_temp[Gas10_name]=json.dumps(Gas10_dict)700		Gas_dict_temp[Gas11_name]=json.dumps(Gas11_dict)701		Gas_dict_temp[Gas12_name]=json.dumps(Gas12_dict)702		Gas_dict_temp[Gas13_name]=json.dumps(Gas13_dict)703		Gas_dict_temp[Gas14_name]=json.dumps(Gas14_dict)704		Gas_dict_temp[Gas15_name]=json.dumps(Gas15_dict)705		Gas_dict_temp[Gas16_name]=json.dumps(Gas16_dict)706		#Gas_dict_temp[Gas17_name]=json.dumps(Gas17_dict)707		#Gas_dict_temp[Gas18_name]=json.dumps(Gas18_dict)708		#SHOW_IMAGE(img)709710		E_ID=json.loads(Gas_param[F_ID])['E_ID']711		Gas_dict_temp['E_ID'] = E_ID712		Gas_return_temp['E_ID'] = E_ID713		U_ID=json.loads(Gas_param[F_ID])['U_ID']714		Gas_dict_temp['U_ID'] = U_ID715		Gas_return_temp['U_ID'] = U_ID716		O_ID=json.loads(Gas_param[F_ID])['O_ID']717		Gas_dict_temp['O_ID'] = O_ID718		Gas_return_temp['O_ID'] = O_ID719		F_ID=json.loads(Gas_param[F_ID])['F_ID']720		Gas_dict_temp['F_ID'] = F_ID721		Gas_return_temp['F_ID'] = F_ID722		723		Gas_set = json.loads(Gas_param[F_ID])['Gas_set']724		Gas_set = Gas_set.split(',')725		726		for i in range(len(Gas_set)):727			temp=Gas_set[i].split('_')728			try:729				Gas_return_temp[temp[0]+'_'+temp[1]]=json.loads(Gas_dict_temp[temp[0]])[temp[1]]730			except:731				if not temp[0] in Gas_dict_temp.keys():732					Gas_return_temp[temp[0]]='Unknown'733				elif not temp[1] in json.loads(Gas_dict_temp[temp[0]]).keys():734					Gas_return_temp[temp[0]+'_'+temp[1]]='Unknown'735736		lock.acquire()737738		Gas_dict[F_ID] = json.dumps(Gas_dict_temp)739		Gas_return[F_ID] = json.dumps(Gas_return_temp)740741		lock.release()742743		#print(Gas_dict)744		#print(Gas_return)745746		return {'result':'Data for F_ID = ' + F_ID +', E_ID = ' + E_ID + ', and U_ID = ' + U_ID + ' is ready.', 'is_success':'Y'}747	748	else:749		return {'error_message':'Cannot find data matching F_ID = ' + F_ID +', E_ID = ' + E_ID + ', and U_ID = ' + U_ID + '.', 'is_success':'N'}750751752	753	754	755756def GasError(Gas_Temp_Dict, Set, Error):757	758	if Set=='' or Error=='':759		pass760	elif not Set in Gas_Temp_Dict.keys():761		Gas_Temp_Dict[Set]=Error762763'''764def GetGasSet(**kwargs):765	766	global Gas_dict, Gas_return, Gas_param767	768	return_dict = {}769	770	try:771		E_ID=kwargs['E_ID']772	except:773		return {'Error':'Cannot find E_ID.'}774	775	try:776		U_ID=kwargs['U_ID']777	except:778		return {'Error':'Cannot find U_ID.'}779	780	ID = E_ID + U_ID781	782	if (ID in Gas_return.keys() and ID in Gas_dict.keys()) and ID in Gas_param.keys():783		784		try:785			Gas_name = kwargs['Gas_name']786			Gas_name = Gas_name.split(',')787			Gas_dict_temp = json.loads(Gas_dict[ID])788	789			if 	Gas_name[0] == '':790				return {'Error':'There is no input parameter.'}791	792			for i in range(len(Gas_name)):793		794				try:795					return_dict[Gas_name[i]] = Gas_dict_temp[Gas_name[i]]796				except:797					if Gas_name[i] == '':798						return {'Error':'There is error in input parameters. Please check them.'}799					else:800						return_dict[Gas_name[i]] = 'Unknown Gas_name'801						802			return return_dict		803		except:804			return {'Error':'Cannot find Gas_name'}805	else:806		return {'Error':'Cannot find data matching E_ID = ' + E_ID +' and U_ID = ' + U_ID + '.'}807	808	809	return return_dict810'''	811	812	813def GetGasData(**kwargs):814	815	global Gas_dict, Gas_return, Gas_param, lock816	817	return_dict = {}818	819	try:820		F_ID=kwargs['F_ID']821	except:822		return {'error_message':'Cannot find F_ID.', 'is_success':'N'}823	824	825	if (F_ID in Gas_return.keys() and F_ID in Gas_dict.keys()) and F_ID in Gas_param.keys():826827		return_dict['data'] = Gas_return[F_ID]828		return_dict['is_success'] = 'Y'829		830		lock.acquire()831		832		Gas_dict.pop(F_ID, None)833		Gas_return.pop(F_ID, None)834		Gas_param.pop(F_ID, None)835		836		lock.release()837		838		return return_dict 839840	else:841		return {'error_message':'Cannot find data matching F_ID = ' + F_ID + '.', 'is_success':'N'}842		843	
...test_result.py
Source:test_result.py  
1import sys2import textwrap3from StringIO import StringIO4from test import test_support5import traceback6import unittest7class Test_TestResult(unittest.TestCase):8    # Note: there are not separate tests for TestResult.wasSuccessful(),9    # TestResult.errors, TestResult.failures, TestResult.testsRun or10    # TestResult.shouldStop because these only have meaning in terms of11    # other TestResult methods.12    #13    # Accordingly, tests for the aforenamed attributes are incorporated14    # in with the tests for the defining methods.15    ################################################################16    def test_init(self):17        result = unittest.TestResult()18        self.assertTrue(result.wasSuccessful())19        self.assertEqual(len(result.errors), 0)20        self.assertEqual(len(result.failures), 0)21        self.assertEqual(result.testsRun, 0)22        self.assertEqual(result.shouldStop, False)23        self.assertIsNone(result._stdout_buffer)24        self.assertIsNone(result._stderr_buffer)25    # "This method can be called to signal that the set of tests being26    # run should be aborted by setting the TestResult's shouldStop27    # attribute to True."28    def test_stop(self):29        result = unittest.TestResult()30        result.stop()31        self.assertEqual(result.shouldStop, True)32    # "Called when the test case test is about to be run. The default33    # implementation simply increments the instance's testsRun counter."34    def test_startTest(self):35        class Foo(unittest.TestCase):36            def test_1(self):37                pass38        test = Foo('test_1')39        result = unittest.TestResult()40        result.startTest(test)41        self.assertTrue(result.wasSuccessful())42        self.assertEqual(len(result.errors), 0)43        self.assertEqual(len(result.failures), 0)44        self.assertEqual(result.testsRun, 1)45        self.assertEqual(result.shouldStop, False)46        result.stopTest(test)47    # "Called after the test case test has been executed, regardless of48    # the outcome. The default implementation does nothing."49    def test_stopTest(self):50        class Foo(unittest.TestCase):51            def test_1(self):52                pass53        test = Foo('test_1')54        result = unittest.TestResult()55        result.startTest(test)56        self.assertTrue(result.wasSuccessful())57        self.assertEqual(len(result.errors), 0)58        self.assertEqual(len(result.failures), 0)59        self.assertEqual(result.testsRun, 1)60        self.assertEqual(result.shouldStop, False)61        result.stopTest(test)62        # Same tests as above; make sure nothing has changed63        self.assertTrue(result.wasSuccessful())64        self.assertEqual(len(result.errors), 0)65        self.assertEqual(len(result.failures), 0)66        self.assertEqual(result.testsRun, 1)67        self.assertEqual(result.shouldStop, False)68    # "Called before and after tests are run. The default implementation does nothing."69    def test_startTestRun_stopTestRun(self):70        result = unittest.TestResult()71        result.startTestRun()72        result.stopTestRun()73    # "addSuccess(test)"74    # ...75    # "Called when the test case test succeeds"76    # ...77    # "wasSuccessful() - Returns True if all tests run so far have passed,78    # otherwise returns False"79    # ...80    # "testsRun - The total number of tests run so far."81    # ...82    # "errors - A list containing 2-tuples of TestCase instances and83    # formatted tracebacks. Each tuple represents a test which raised an84    # unexpected exception. Contains formatted85    # tracebacks instead of sys.exc_info() results."86    # ...87    # "failures - A list containing 2-tuples of TestCase instances and88    # formatted tracebacks. Each tuple represents a test where a failure was89    # explicitly signalled using the TestCase.fail*() or TestCase.assert*()90    # methods. Contains formatted tracebacks instead91    # of sys.exc_info() results."92    def test_addSuccess(self):93        class Foo(unittest.TestCase):94            def test_1(self):95                pass96        test = Foo('test_1')97        result = unittest.TestResult()98        result.startTest(test)99        result.addSuccess(test)100        result.stopTest(test)101        self.assertTrue(result.wasSuccessful())102        self.assertEqual(len(result.errors), 0)103        self.assertEqual(len(result.failures), 0)104        self.assertEqual(result.testsRun, 1)105        self.assertEqual(result.shouldStop, False)106    # "addFailure(test, err)"107    # ...108    # "Called when the test case test signals a failure. err is a tuple of109    # the form returned by sys.exc_info(): (type, value, traceback)"110    # ...111    # "wasSuccessful() - Returns True if all tests run so far have passed,112    # otherwise returns False"113    # ...114    # "testsRun - The total number of tests run so far."115    # ...116    # "errors - A list containing 2-tuples of TestCase instances and117    # formatted tracebacks. Each tuple represents a test which raised an118    # unexpected exception. Contains formatted119    # tracebacks instead of sys.exc_info() results."120    # ...121    # "failures - A list containing 2-tuples of TestCase instances and122    # formatted tracebacks. Each tuple represents a test where a failure was123    # explicitly signalled using the TestCase.fail*() or TestCase.assert*()124    # methods. Contains formatted tracebacks instead125    # of sys.exc_info() results."126    def test_addFailure(self):127        class Foo(unittest.TestCase):128            def test_1(self):129                pass130        test = Foo('test_1')131        try:132            test.fail("foo")133        except:134            exc_info_tuple = sys.exc_info()135        result = unittest.TestResult()136        result.startTest(test)137        result.addFailure(test, exc_info_tuple)138        result.stopTest(test)139        self.assertFalse(result.wasSuccessful())140        self.assertEqual(len(result.errors), 0)141        self.assertEqual(len(result.failures), 1)142        self.assertEqual(result.testsRun, 1)143        self.assertEqual(result.shouldStop, False)144        test_case, formatted_exc = result.failures[0]145        self.assertIs(test_case, test)146        self.assertIsInstance(formatted_exc, str)147    # "addError(test, err)"148    # ...149    # "Called when the test case test raises an unexpected exception err150    # is a tuple of the form returned by sys.exc_info():151    # (type, value, traceback)"152    # ...153    # "wasSuccessful() - Returns True if all tests run so far have passed,154    # otherwise returns False"155    # ...156    # "testsRun - The total number of tests run so far."157    # ...158    # "errors - A list containing 2-tuples of TestCase instances and159    # formatted tracebacks. Each tuple represents a test which raised an160    # unexpected exception. Contains formatted161    # tracebacks instead of sys.exc_info() results."162    # ...163    # "failures - A list containing 2-tuples of TestCase instances and164    # formatted tracebacks. Each tuple represents a test where a failure was165    # explicitly signalled using the TestCase.fail*() or TestCase.assert*()166    # methods. Contains formatted tracebacks instead167    # of sys.exc_info() results."168    def test_addError(self):169        class Foo(unittest.TestCase):170            def test_1(self):171                pass172        test = Foo('test_1')173        try:174            raise TypeError()175        except:176            exc_info_tuple = sys.exc_info()177        result = unittest.TestResult()178        result.startTest(test)179        result.addError(test, exc_info_tuple)180        result.stopTest(test)181        self.assertFalse(result.wasSuccessful())182        self.assertEqual(len(result.errors), 1)183        self.assertEqual(len(result.failures), 0)184        self.assertEqual(result.testsRun, 1)185        self.assertEqual(result.shouldStop, False)186        test_case, formatted_exc = result.errors[0]187        self.assertIs(test_case, test)188        self.assertIsInstance(formatted_exc, str)189    def testGetDescriptionWithoutDocstring(self):190        result = unittest.TextTestResult(None, True, 1)191        self.assertEqual(192                result.getDescription(self),193                'testGetDescriptionWithoutDocstring (' + __name__ +194                '.Test_TestResult)')195    @unittest.skipIf(sys.flags.optimize >= 2,196                     "Docstrings are omitted with -O2 and above")197    def testGetDescriptionWithOneLineDocstring(self):198        """Tests getDescription() for a method with a docstring."""199        result = unittest.TextTestResult(None, True, 1)200        self.assertEqual(201                result.getDescription(self),202               ('testGetDescriptionWithOneLineDocstring '203                '(' + __name__ + '.Test_TestResult)\n'204                'Tests getDescription() for a method with a docstring.'))205    @unittest.skipIf(sys.flags.optimize >= 2,206                     "Docstrings are omitted with -O2 and above")207    def testGetDescriptionWithMultiLineDocstring(self):208        """Tests getDescription() for a method with a longer docstring.209        The second line of the docstring.210        """211        result = unittest.TextTestResult(None, True, 1)212        self.assertEqual(213                result.getDescription(self),214               ('testGetDescriptionWithMultiLineDocstring '215                '(' + __name__ + '.Test_TestResult)\n'216                'Tests getDescription() for a method with a longer '217                'docstring.'))218    def testStackFrameTrimming(self):219        class Frame(object):220            class tb_frame(object):221                f_globals = {}222        result = unittest.TestResult()223        self.assertFalse(result._is_relevant_tb_level(Frame))224        Frame.tb_frame.f_globals['__unittest'] = True225        self.assertTrue(result._is_relevant_tb_level(Frame))226    def testFailFast(self):227        result = unittest.TestResult()228        result._exc_info_to_string = lambda *_: ''229        result.failfast = True230        result.addError(None, None)231        self.assertTrue(result.shouldStop)232        result = unittest.TestResult()233        result._exc_info_to_string = lambda *_: ''234        result.failfast = True235        result.addFailure(None, None)236        self.assertTrue(result.shouldStop)237        result = unittest.TestResult()238        result._exc_info_to_string = lambda *_: ''239        result.failfast = True240        result.addUnexpectedSuccess(None)241        self.assertTrue(result.shouldStop)242    def testFailFastSetByRunner(self):243        runner = unittest.TextTestRunner(stream=StringIO(), failfast=True)244        def test(result):245            self.assertTrue(result.failfast)246        runner.run(test)247classDict = dict(unittest.TestResult.__dict__)248for m in ('addSkip', 'addExpectedFailure', 'addUnexpectedSuccess',249           '__init__'):250    del classDict[m]251def __init__(self, stream=None, descriptions=None, verbosity=None):252    self.failures = []253    self.errors = []254    self.testsRun = 0255    self.shouldStop = False256    self.buffer = False257classDict['__init__'] = __init__258OldResult = type('OldResult', (object,), classDict)259class Test_OldTestResult(unittest.TestCase):260    def assertOldResultWarning(self, test, failures):261        with test_support.check_warnings(("TestResult has no add.+ method,",262                                          RuntimeWarning)):263            result = OldResult()264            test.run(result)265            self.assertEqual(len(result.failures), failures)266    def testOldTestResult(self):267        class Test(unittest.TestCase):268            def testSkip(self):269                self.skipTest('foobar')270            @unittest.expectedFailure271            def testExpectedFail(self):272                raise TypeError273            @unittest.expectedFailure274            def testUnexpectedSuccess(self):275                pass276        for test_name, should_pass in (('testSkip', True),277                                       ('testExpectedFail', True),278                                       ('testUnexpectedSuccess', False)):279            test = Test(test_name)280            self.assertOldResultWarning(test, int(not should_pass))281    def testOldTestTesultSetup(self):282        class Test(unittest.TestCase):283            def setUp(self):284                self.skipTest('no reason')285            def testFoo(self):286                pass287        self.assertOldResultWarning(Test('testFoo'), 0)288    def testOldTestResultClass(self):289        @unittest.skip('no reason')290        class Test(unittest.TestCase):291            def testFoo(self):292                pass293        self.assertOldResultWarning(Test('testFoo'), 0)294    def testOldResultWithRunner(self):295        class Test(unittest.TestCase):296            def testFoo(self):297                pass298        runner = unittest.TextTestRunner(resultclass=OldResult,299                                          stream=StringIO())300        # This will raise an exception if TextTestRunner can't handle old301        # test result objects302        runner.run(Test('testFoo'))303class MockTraceback(object):304    @staticmethod305    def format_exception(*_):306        return ['A traceback']307def restore_traceback():308    unittest.result.traceback = traceback309class TestOutputBuffering(unittest.TestCase):310    def setUp(self):311        self._real_out = sys.stdout312        self._real_err = sys.stderr313    def tearDown(self):314        sys.stdout = self._real_out315        sys.stderr = self._real_err316    def testBufferOutputOff(self):317        real_out = self._real_out318        real_err = self._real_err319        result = unittest.TestResult()320        self.assertFalse(result.buffer)321        self.assertIs(real_out, sys.stdout)322        self.assertIs(real_err, sys.stderr)323        result.startTest(self)324        self.assertIs(real_out, sys.stdout)325        self.assertIs(real_err, sys.stderr)326    def testBufferOutputStartTestAddSuccess(self):327        real_out = self._real_out328        real_err = self._real_err329        result = unittest.TestResult()330        self.assertFalse(result.buffer)331        result.buffer = True332        self.assertIs(real_out, sys.stdout)333        self.assertIs(real_err, sys.stderr)334        result.startTest(self)335        self.assertIsNot(real_out, sys.stdout)336        self.assertIsNot(real_err, sys.stderr)337        self.assertIsInstance(sys.stdout, StringIO)338        self.assertIsInstance(sys.stderr, StringIO)339        self.assertIsNot(sys.stdout, sys.stderr)340        out_stream = sys.stdout341        err_stream = sys.stderr342        result._original_stdout = StringIO()343        result._original_stderr = StringIO()344        print 'foo'345        print >> sys.stderr, 'bar'346        self.assertEqual(out_stream.getvalue(), 'foo\n')347        self.assertEqual(err_stream.getvalue(), 'bar\n')348        self.assertEqual(result._original_stdout.getvalue(), '')349        self.assertEqual(result._original_stderr.getvalue(), '')350        result.addSuccess(self)351        result.stopTest(self)352        self.assertIs(sys.stdout, result._original_stdout)353        self.assertIs(sys.stderr, result._original_stderr)354        self.assertEqual(result._original_stdout.getvalue(), '')355        self.assertEqual(result._original_stderr.getvalue(), '')356        self.assertEqual(out_stream.getvalue(), '')357        self.assertEqual(err_stream.getvalue(), '')358    def getStartedResult(self):359        result = unittest.TestResult()360        result.buffer = True361        result.startTest(self)362        return result363    def testBufferOutputAddErrorOrFailure(self):364        unittest.result.traceback = MockTraceback365        self.addCleanup(restore_traceback)366        for message_attr, add_attr, include_error in [367            ('errors', 'addError', True),368            ('failures', 'addFailure', False),369            ('errors', 'addError', True),370            ('failures', 'addFailure', False)371        ]:372            result = self.getStartedResult()373            buffered_out = sys.stdout374            buffered_err = sys.stderr375            result._original_stdout = StringIO()376            result._original_stderr = StringIO()377            print >> sys.stdout, 'foo'378            if include_error:379                print >> sys.stderr, 'bar'380            addFunction = getattr(result, add_attr)381            addFunction(self, (None, None, None))382            result.stopTest(self)383            result_list = getattr(result, message_attr)384            self.assertEqual(len(result_list), 1)385            test, message = result_list[0]386            expectedOutMessage = textwrap.dedent("""387                Stdout:388                foo389            """)390            expectedErrMessage = ''391            if include_error:392                expectedErrMessage = textwrap.dedent("""393                Stderr:394                bar395            """)396            expectedFullMessage = 'A traceback%s%s' % (expectedOutMessage, expectedErrMessage)397            self.assertIs(test, self)398            self.assertEqual(result._original_stdout.getvalue(), expectedOutMessage)399            self.assertEqual(result._original_stderr.getvalue(), expectedErrMessage)400            self.assertMultiLineEqual(message, expectedFullMessage)401    def testBufferSetupClass(self):402        result = unittest.TestResult()403        result.buffer = True404        class Foo(unittest.TestCase):405            @classmethod406            def setUpClass(cls):407                1//0408            def test_foo(self):409                pass410        suite = unittest.TestSuite([Foo('test_foo')])411        suite(result)412        self.assertEqual(len(result.errors), 1)413    def testBufferTearDownClass(self):414        result = unittest.TestResult()415        result.buffer = True416        class Foo(unittest.TestCase):417            @classmethod418            def tearDownClass(cls):419                1//0420            def test_foo(self):421                pass422        suite = unittest.TestSuite([Foo('test_foo')])423        suite(result)424        self.assertEqual(len(result.errors), 1)425    def testBufferSetUpModule(self):426        result = unittest.TestResult()427        result.buffer = True428        class Foo(unittest.TestCase):429            def test_foo(self):430                pass431        class Module(object):432            @staticmethod433            def setUpModule():434                1//0435        Foo.__module__ = 'Module'436        sys.modules['Module'] = Module437        self.addCleanup(sys.modules.pop, 'Module')438        suite = unittest.TestSuite([Foo('test_foo')])439        suite(result)440        self.assertEqual(len(result.errors), 1)441    def testBufferTearDownModule(self):442        result = unittest.TestResult()443        result.buffer = True444        class Foo(unittest.TestCase):445            def test_foo(self):446                pass447        class Module(object):448            @staticmethod449            def tearDownModule():450                1//0451        Foo.__module__ = 'Module'452        sys.modules['Module'] = Module453        self.addCleanup(sys.modules.pop, 'Module')454        suite = unittest.TestSuite([Foo('test_foo')])455        suite(result)456        self.assertEqual(len(result.errors), 1)457if __name__ == '__main__':...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
