Best Python code snippet using slash
main.py
Source:main.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Name:     main.py4# Author:   xiooli <xioooli[at]yahoo.com.cn>5# Site:     http://joolix.com6# Licence:  GPLv37# Version:  0912168import web, sys, os, hashlib, zlib, re, time, pickle, base64, json9from settings import *10import libmymoldb as lm11from libmymoldb.functions import *12reload(sys)13sys.setdefaultencoding('utf-8')14# =====================================================BEGIN INITIALIZATION==============================================15# init the permission check function16is_permited = lambda user, user_group, actions = ('s',), involved_user = None, actions_rules = None: \17        permission_check(user, user_group, actions, involved_user, ACTIONS_RULES)18# init the web application19app = web.application(URL_MAP, globals())20# init session21session = SESSION(app)22# init class for translation23trans_class = trans(dic_file_path = TRANS_PATH)24# init dbdef object25dbd_objs = {}26for i in DATABASES:27    dbd_objs[i] = lm.dbdef(ENVS[i]['DEF_FILE'])28# results dic template29tpl_results_dic = {30        'links': NAV_LINKS,31        'logged_user': None,32        'result_list': [],33        'len_results': 0,34        'dbs': PUBLIC_DBS,35        'db': '1',36        'mol': '',37        'table_heads': [],38        'max_results_num': 300,39        'pages': 1,40        'page': 1,41        'search_mol': '',42        'trans_class': None,43        'db_selected': is_selected('1'),44        'mode_selected': is_selected('2'),45        'query_id': '',46        'time_consumed': 0,47        'last_query_id': '',48        'last_db': '',49        'adv_search_query': '',50        'results_search_type': '2',51        'query_mols_dic': {},52        'lang': 'zh_CN',53        'html_title': '',54        'info_head': 'info',55        'info': [],56        'min_simi': '0',57        'urls_dic': URLS_DIC,58        'current_page': ''59        }60# make html pages from templates61html_normal_search = lambda results: WEB_RENDER('header', results_dic = results) + \62        WEB_RENDER('normal_editor', results_dic = results) +\63        WEB_RENDER('last_query', results_dic = results) + \64        WEB_RENDER('display_results', results_dic = results) + \65        WEB_RENDER('footer', results_dic = results)66html_advanced_search = lambda results: WEB_RENDER('header', results_dic = results) + \67        WEB_RENDER('advanced_editor', results_dic = results) +\68        WEB_RENDER('last_query', results_dic = results) + \69        WEB_RENDER('display_results', results_dic = results) + \70        WEB_RENDER('footer', results_dic = results)71html_molinfo = lambda results: WEB_RENDER('header', results_dic = results) + \72        WEB_RENDER('molinfo', results_dic = results) +\73        WEB_RENDER('footer', results_dic = results)74html_info = lambda results: WEB_RENDER('header', results_dic = results) + \75        WEB_RENDER('info', results_dic = results) +\76        WEB_RENDER('footer', results_dic = results)77html_index = lambda results: WEB_RENDER('header', results_dic = results) + \78        WEB_RENDER('index', results_dic = results) +\79        WEB_RENDER('footer', results_dic = results)80html_login = lambda results: WEB_RENDER('header', results_dic = results) + \81        WEB_RENDER('login', results_dic = results) +\82        WEB_RENDER('footer', results_dic = results)83html_register = lambda results: WEB_RENDER('header', results_dic = results) + \84        WEB_RENDER('register', results_dic = results) +\85        WEB_RENDER('footer', results_dic = results)86html_edit = lambda results: WEB_RENDER('header', results_dic = results) + \87        WEB_RENDER('edit', results_dic = results) +\88        WEB_RENDER('footer', results_dic = results)89html_ucpanel = lambda results: WEB_RENDER('header', results_dic = results) + \90        WEB_RENDER('ucpanel', results_dic = results) +\91        WEB_RENDER('footer', results_dic = results)92def html_no_permision(results_dic):93    results_dic['html_title'] = 'permission denied'94    results_dic['info'].append('you have no permission to access here')95    return html_info(results_dic)96def html_query_imcomplete(results_dic):97    results_dic['info'].append('query imcomplete')98    results_dic['html_title'] = 'query err'99    return html_info(results_dic)100def html_query_illegal(results_dic):101    results_dic['info'].append('contains illegal words')102    results_dic['html_title'] = 'query err'103    return html_info(results_dic)104def html_wrong_db(results_dic):105    results_dic['info'].append('wrong db name')106    results_dic['html_title'] = 'wrong db name'107    return html_info(results_dic)108#================================================END INITIALIZATION==============================================109#=======================================================MAIN=====================================================110# The main script of the web site mymoldb111# web site classes112class index:113    def __init__(self):114        # set the target language to session.lang115        trans_class.lang = session.lang116        self.results_dic = tpl_results_dic.copy()117        self.results_dic.update( {118            'logged_user': session.nickname,119            'trans_class': trans_class,120            'mode_selected': is_selected(str(session.search_mode)),121            'html_title': 'home',122            'info': [],123            'current_page': 'home',124            'lang': session.lang125            } )126    def GET(self, name = ''):127        results_dic = self.results_dic128        return html_info(results_dic)129class register:130    def __init__(self):131        # set the target language to session.lang132        trans_class.lang = session.lang133        self.results_dic = tpl_results_dic.copy()134        self.results_dic.update( {135            'logged_user': session.nickname,136            'trans_class': trans_class,137            'mode_selected': is_selected(str(session.search_mode)),138            'html_title': 'register',139            'ref': URLS_DIC["home_url"],140            'info': [],141            'lang': session.lang142            } )143    def GET(self, name = ''):144        if not session.authorized:145            return html_register(self.results_dic)146        else:147            web.seeother(self.results_dic['ref'])148    def POST(self, name = ''):149        input = web.input()150        results_dic = self.results_dic151        for i in ('nick', 'passwd', 'cfm_pw', 'username'):152            if not ( input.has_key(i) and input.get(i) ):153                results_dic['info'].append('info not complete')154                return html_register(results_dic)155        nick = query_preprocessing(input.get('nick'))156        if not 2 <= len(nick) <= 20:157            results_dic['info'].append('nick length not fit')158            return html_register(results_dic)159        # username is email160        username = query_preprocessing(input.get('username'))161        if not re.match(r'[._0-9a-zA-Z]+@[._0-9a-zA-Z]+', username):162            results_dic['info'].append('not valid username')163            return html_register(results_dic)164        # passwd is about to hash, so, no need to preprocess165        passwd = input.get('passwd')166        if passwd != input.get('cfm_pw'):167            results_dic['info'].append('pass word not well confirmed')168            return html_register(results_dic)169        # check if the username available170        env_db = ENVS[USERSDB]171        userdb_obj = lm.users_db(env_db)172        if userdb_obj.select(['*'], '%s = "%s"' %(env_db['USER_EMAIL_FIELD'], username)):173            results_dic['info'] += ['user already registered', ': ', username]174            userdb_obj.close()175            return html_register(results_dic)176        userdb_obj.close()177        if APPROVE_ALL_REGISTERS:178            # to add auto register179            userdb_obj = lm.users_db(env_db)180            userdb_obj.insert_into_usersdb('', nick, DEFAULT_USER_GROUP, md5(passwd), username, time.strftime('%Y-%m-%d %H:%M:%S'), 1)181            results_dic['info'] += ['register finished', 'you can login now']182            userdb_obj.close()183        else:184            tmp_info_dic = {'username': username, 'group': DEFAULT_USER_GROUP, 'nick': nick, 'passwd': md5(passwd)}185            id = md5(str(tmp_info_dic))186            info_dic = { id: tmp_info_dic }187            try:188                register_info_file = open(REGISTER_INFO_FILE, 'r')189                info_dic_from_file = pickle.load(register_info_file)190                register_info_file.close()191                if not info_dic_from_file.has_key(id):192                    info_dic_from_file.update(info_dic)193                else:194                    results_dic['info'] += ['you hava already submited your info', ',', 'please waiting for been approved']195                    return html_register(results_dic)196                register_info_file = open(REGISTER_INFO_FILE, 'w')197                pickle.dump(info_dic_from_file, register_info_file)198                register_info_file.close()199            except:200                register_info_file = open(REGISTER_INFO_FILE, 'w')201                pickle.dump(info_dic, register_info_file)202                register_info_file.close()203            results_dic['info'] += ['register finished', ',', 'please waiting for been approved']204        return html_register(results_dic)205class login:206    '''class for user login/logout'''207    def __init__(self):208        # set the target language to session.lang209        trans_class.lang = session.lang210        self.last_login_time = session.last_login_time211        self.login_try_times = session.login_try_times212        self.results_dic = tpl_results_dic.copy()213        self.results_dic.update( {214            'logged_user': session.nickname,215            'trans_class': trans_class,216            'mode_selected': is_selected(str(session.search_mode)),217            'html_title': 'login',218            'ref': URLS_DIC["home_url"],219            'info': [],220            'lang': session.lang221            } )222    def GET(self, name = ''):223        results_dic = self.results_dic224        ref = URLS_DIC['home_url']225        input = web.input()226        if input.has_key('ref') and input.get('ref'):227            if  input.get('ref') != web.ctx.path:228                ref = input.get('ref')229        # logout230        if input.has_key('action') and input.get('action'):231            if input.get('action') == 'logout':232                session.kill()233                return 'logout OK'234        if session.authorized:235            return web.seeother(ref)236        results_dic['ref'] = ref237        if self.login_try_times >= MAX_LOGIN_TRY_TIMES:238            if time.time() - self.last_login_time < LOGIN_INTERVAL:239                results_dic["info"].append('try again after a while')240                return html_info(self.results_dic)241            else:242                session.login_try_times = 0243        else:244            session.login_try_times += 1245        return html_login(results_dic)246    def POST(self, name = ''):247        input = web.input()248        env_db = ENVS[USERSDB]249        userdb_obj = lm.users_db(env_db)250        results_dic = self.results_dic251        ref = URLS_DIC['home_url']252        if input.has_key('ref') and input.get('ref'):253            ref = base64.b64decode(input.get('ref'))254            if  ref != web.ctx.path:255                results_dic['ref'] = ref256        if session.authorized:257            return web.seeother(ref)258        if ( input.has_key("username") and input.get("username") ) \259                and ( input.has_key("password") and input.get("password") ):260            # user password261            pw = ''262            # default user group is 3 (viewer)263            ug = session.usergroup264            # user name265            nm = query_preprocessing(input.get("username"))266            # username should be email267            if not re.match(r'[0-9_.a-zA-Z]+@[0-9_.a-zA-Z]+', nm):268                results_dic['info'].append("not valid user name")269                return html_login(results_dic)270            # try to connect to the user accounts db to verify the login info of the user271            user_info = userdb_obj.select(['*'], '%s = "%s" LIMIT 1' %(env_db['USER_EMAIL_FIELD'], nm))272            if user_info:273                user_info_dic = user_info[0]274                status = user_info_dic.get(env_db['STATUS_FIELD'])275                if status != 1:276                    results_dic['info'].append("user had been deactived")277                    return html_login(results_dic)278                else:279                    pw = user_info_dic.get(env_db['PASSWORD_FIELD'])280                    ug = user_info_dic.get(env_db['USER_GROUP_FIELD'])281                    ui = user_info_dic.get(env_db['USER_ID_FIELD'])282                    nk = user_info_dic.get(env_db['NICK_FIELD'])283                    if pw == md5(input.get("password")):284                        session.authorized = True285                        # if no nick name, then use user name (the email) as nick name286                        if not nk:287                            session.nickname = nm288                        else:289                            session.nickname = nk290                        session.usergroup = ug291                        session.userid = ui292                        session.username = nm293                    else:294                        results_dic['info'].append("pass word incorrect")295                        return html_login(results_dic)296            else:297                results_dic['info'].append("user not valid")298                return html_login(results_dic)299        return web.seeother(ref)300class man_users:301    def __init__(self):302        trans_class.lang = session.lang303        self.results_dic = tpl_results_dic.copy()304        self.results_dic.update( {305            'logged_user': session.nickname,306            'trans_class': trans_class,307            'lang': session.lang,308            'groups': ', '.join([ str(i) for i in USER_GROUPS ]),309            'info': [],310            'current_page': ''311            } )312    def GET(self, name = ''):313        input = web.input()314        results_dic = self.results_dic315        env_db = ENVS[USERSDB]316        if ( not session.authorized ) or session.usergroup != 1:317            results_dic['info'].append('you have no permission to access here')318            return WEB_RENDER('info', results = results_dic)319        userdb_obj = lm.users_db(env_db)320        # select out the users in group 1 ( administrators ), in case no administrator left321        # after removment or deactivation322        admins = []323        for a in userdb_obj.select([env_db['USER_ID_FIELD']], '%s = "%s"' %(env_db['USER_GROUP_FIELD'], '1')):324            if a.has_key(env_db['USER_ID_FIELD']):325                admins.append(a.get(env_db['USER_ID_FIELD']))326        if input.has_key('approve_id') and input.get('approve_id'):327            try:328                register_info_file = open(REGISTER_INFO_FILE, 'r')329                info_dic_from_file = pickle.load(register_info_file)330                register_info_file.close()331                info = info_dic_from_file.get(input.get('approve_id'))332                # send email to notify user333                sendmail(SENDMAIL_SETTINGS_DICT,334                        info['username'],335                        'MyMolDB register successed notify',336                        '''337Dear %s,338Your application on MyMolDB as a contributor and user has been approved, you can login the site with the user name %s.339The URL of our site is http://xxxxxxxxxx.xxx/db/.340Welcome!341Sincerely yours''' %(info['nick'], info['username']) )342                userdb_obj.insert_into_usersdb('', info['nick'], DEFAULT_USER_GROUP, info['passwd'], info['username'], time.strftime('%Y-%m-%d %H:%M:%S'), 1)343                # delete the info in REGISTER_INFO_FILE after the register procedure finished344                info_dic_from_file.pop(input.get('approve_id'))345                register_info_file = open(REGISTER_INFO_FILE, 'w')346                pickle.dump(info_dic_from_file, register_info_file)347                register_info_file.close()348                results_dic['info'] += ['approve user', info['username'], 'successed']349            except:350                results_dic['info'] += ['approve user', info['username'], 'failed']351        if input.has_key('active_id') and input.get('active_id'):352            active_id = input.get('active_id')353            userdb_obj.update_usersdb(354                    {env_db['STATUS_FIELD']: '1'},355                    '%s = "%s"' %(env_db['USER_ID_FIELD'], active_id) )356            results_dic['info'] += ['active', 'user', active_id, '(id)', 'successed']357        if input.has_key('deactive_id') and input.get('deactive_id'):358            deactive_id = input.get('deactive_id')359            if len(admins) <= 1 and int(deactive_id) in admins:360                results_dic['info'] += ['at least one admin is needed', ', ', 'you can not', 'deactive', 'it']361            else:362                userdb_obj.update_usersdb(363                        {env_db['STATUS_FIELD']: '0'},364                        '%s = "%s"' %(env_db['USER_ID_FIELD'], input.get('deactive_id')) )365                results_dic['info'] += ['deactive', 'user', deactive_id, '(id)', 'successed']366        if input.has_key('remove_id') and input.get('remove_id'):367            remove_id = input.get('remove_id')368            if len(admins) <= 1 and int(remove_id) in admins:369                results_dic['info'] += ['at least one admin is needed', ', ', 'you can not', ' ', 'remove', 'it']370            else:371                userdb_obj.delete('%s = "%s"' %(env_db['USER_ID_FIELD'], input.get('remove_id')))372                results_dic['info'] += ['remove', 'user', remove_id, '(id)', 'successed']373        if ( input.has_key('chgroup_id') and input.get('chgroup_id') ) and \374                ( input.has_key('group') and int(input.get('group')) in USER_GROUPS):375            id = input.get('chgroup_id')376            group = input.get('group')377            userdb_obj.update_usersdb(378                    {env_db['USER_GROUP_FIELD']: group},379                    "%s = '%s'" %(env_db['USER_ID_FIELD'], id) )380            results_dic['info'] += [chgroup_id, '(id)', 'change group', 'to', group, 'successed']381        approved_users = []382        unapproved_users = []383        user_info = userdb_obj.select(['*'], '1')384        userdb_obj.close()385        if user_info:386            for info in user_info:387                approved_users.append(388                        (str(info[env_db['USER_ID_FIELD']]),389                        info[env_db['USER_GROUP_FIELD']],390                        info[env_db['USER_EMAIL_FIELD']],391                        info[env_db['NICK_FIELD']],392                        str(info[env_db['STATUS_FIELD']])) )393        try:394            register_info_file = open(REGISTER_INFO_FILE, 'r')395            info_dic_from_file = pickle.load(register_info_file)396            register_info_file.close()397            for id, info in info_dic_from_file.items():398                # '-1' means still not approved, '0' means inactive while '1' is active399                unapproved_users.append( (id, info['group'], info['username'], info['nick'], '-1') )400        except:401            pass402        results_dic['approved_users'] = approved_users403        results_dic['unapproved_users'] = unapproved_users404        #return approved_users, unapproved_users405        return WEB_RENDER('man_users', results_dic = results_dic)406class ucpanel:407    def __init__(self):408        trans_class.lang = session.lang409        self.results_dic = tpl_results_dic.copy()410        self.userid = session.userid411        self.nickname = session.nickname412        self.usergroup = session.usergroup413        self.results_dic.update( {414            'logged_user': self.nickname,415            'trans_class': trans_class,416            'lang': session.lang,417            'html_title': 'user control panel',418            'ucp_urls': [],419            'info': [],420            'current_page': ''421            } )422    def GET(self, name = ''):423        results_dic = self.results_dic424        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s')) ):425            return html_no_permision(results_dic)426        results_dic['ucp_urls'] += UCP_URLS427        # users in group 1 are administrators, so, show them the user management option428        if session.usergroup == 1:429            user_man_url = ('user management', URLS_DIC['manusers_url'], '')430            if not user_man_url in results_dic['ucp_urls']:431                results_dic['ucp_urls'].append(user_man_url)432        return html_ucpanel(results_dic)433class chusersettings:434    def __init__(self):435        trans_class.lang = session.lang436        self.results_dic = tpl_results_dic.copy()437        self.results_dic.update( {438            'logged_user': session.nickname,439            'trans_class': trans_class,440            'lang': session.lang,441            'reload_parent': False,442            'info': [],443            'nick_name': session.nickname,444            'email': session.username445            } )446    def GET(self, name = ''):447        # check basic permission448        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s')) ):449            return html_no_permision(self.results_dic)450        return WEB_RENDER('chusersettings', results_dic = self.results_dic)451    def POST(self, name = ''):452        # check basic permission453        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s')) ):454            return html_no_permision(self.results_dic)455        input = web.input()456        results_dic = self.results_dic457        partial_sql = ''458        sql_string = ''459        new_nick = ''460        new_email = ''461        env_db = ENVS[USERSDB]462        old_pw = ''463        quit = False464        if input.has_key('nick') and input.get('nick'):465            new_nick = input.get('nick')466            if new_nick != session.nickname:467                if 2 <= len(new_nick) <= 20:468                    partial_sql += ' %s = "%s", ' %(env_db['NICK_FIELD'], new_nick)469        if ( input.has_key('new_pw') and input.get('new_pw') ) and ( input.has_key('cfm_pw') and input.get('cfm_pw') ):470            new_pw = input.get('new_pw')471            if new_pw == input.get('cfm_pw'):472                if input.has_key('old_pw') and input.get('old_pw'):473                    old_pw = input.get('old_pw')474                    userdb_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])475                    user_info = userdb_obj.execute('SELECT %s FROM %s WHERE %s = "%s" LIMIT 1;' %(476                        env_db['PASSWORD_FIELD'], env_db['USERS_TABLE'], env_db['NICK_FIELD'], session.nickname))477                    userdb_obj.close()478                    if user_info:479                        user_info = user_info[0]480                        if user_info.has_key(env_db['PASSWORD_FIELD']):481                            if user_info.get(env_db['PASSWORD_FIELD']) == md5(old_pw):482                                partial_sql += ' %s = "%s", ' %(env_db['PASSWORD_FIELD'], md5(new_pw))483                            else:484                                 results_dic['info'].append('wrong old pass word')485                                 quit = True486                        else:487                             results_dic['info'].append('failed to get user info')488                             quit = True489                    else:490                        results_dic['info'] += ['no such user', ': ', session.nickname]491                        quit = True492                else:493                     results_dic['info'].append('old pass word is needed')494                     quit = True495            else:496                 results_dic['info'].append('pass word not well confirmed')497                 quit = True498        if quit:499            return WEB_RENDER('chusersettings', results = results_dic)500        elif partial_sql:501            userdb_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])502            sql_string = 'UPDATE %s SET %s WHERE %s = "%s";' %(503                    env_db['USERS_TABLE'], partial_sql.rstrip(', '), env_db['NICK_FIELD'], session.nickname)504            userdb_obj.execute(sql_string)505            userdb_obj.close()506            if new_nick:507                session.nickname = new_nick508            if new_email:509                session.username = new_email510            results_dic['reload_parent'] = True511            results_dic['info'].append('settings updated successfully')512            return WEB_RENDER('chusersettings', results_dic = results_dic)513        else:514            results_dic['info'].append('nothing to change')515            return WEB_RENDER('chusersettings', results_dic = results_dic)516class search:517    '''for structure search'''518    def __init__(self):519        # set default results to show per page520        self.results_per_page = int(session.results_per_page)521        # set max results number if it's not yet set522        self.max_results_num = int(session.max_results_num)523        # set the target language to session.lang524        trans_class.lang = session.lang525        self.results_dic = tpl_results_dic.copy()526        self.results_dic.update( {527            'logged_user': session.nickname,528            'max_results_num': self.max_results_num,529            'trans_class': trans_class,530            'db_selected': is_selected('1'),531            'mode_selected': is_selected('2'),532            'query_id': '',533            'part_fields': [],534            'pri_and_struc_fields': {'pri_field': '', 'struc_field': ''},535            'results_search_type': '2',536            'lang': session.lang,537            'prequery': '',538            'html_title': 'normal search',539            'info': [],540            'current_page': 'search'541            } )542    def GET(self, name = ''):543        input = web.input()544        # results_mol_ids stores the mol ids found by the previous query545        results_of_get = []546        simi_values = []547        results_mol_ids = []548        results = []549        results_dic = self.results_dic550        search_mol = session.search_mol551        stored_results_of_post = ''552        stored_results_of_get = {}553        results_mol_ids = []554        query_info = {}555        query_mols_dic = {}556        adv_search_query = ''557        search_mol = ''558        md5_query = ''559        available_dbs = PUBLIC_DBS560        if session.authorized:561            available_dbs = DATABASES562            self.results_dic.update({'dbs': available_dbs})563        # check basic permission564        if not is_permited(session.userid, session.usergroup, ('s')):565            return html_no_permision(results_dic)566        try:567            if input.has_key('db') and input.get('db'):568                db_name = input.get('db')569                if db_name not in available_dbs:570                    return html_wrong_db(results_dic)571                session.db = db_name572            else:573                db_name = session.db574            if input.has_key('prequery') and input.get('prequery'):575                results_dic['prequery'] = input.get('prequery')576            env_db = ENVS[db_name]577            dbd_obj = dbd_objs[db_name]578            tables = dbd_obj.tables579            pri_field = env_db['PRI_FIELD']580            struc_field = env_db['2D_STRUC_FIELD']581            sql_fields_dic = env_db['FIELDS_TO_SHOW_DIC']582            fields_to_show_list_all = sql_fields_dic['all']583            fields_to_show_list_part = sql_fields_dic['part']584            part_fields = fields_to_show_list_part['fields']585            part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]586            table_heads = fields_to_show_list_part['comments']587            if input.has_key('query_id') and input.get('query_id'):588                md5_query = str(input.get('query_id'))589            if input.has_key('results_per_page'):590                self.results_per_page = int(input.get('results_per_page'))591                session.results_per_page = int(input.get('results_per_page'))592            if input.has_key('max_results_num'):593                self.max_results_num = int(input.get('max_results_num'))594                session.max_results_num = int(input.get('max_results_num'))595            search_type = session.search_mode596            if input.has_key('mode') and input.get('mode'):597                search_type = input.get('mode')598            else:599                search_type = session.search_mode600            results_dic['mode_selected'] = is_selected(search_type)601            if input.has_key('page'):602                # check if the results_of_get_file already exists603                # results_of_get_file is the results from the GET method of search class604                results_of_get_file = CACHE_PATH + '/' + md5_query + '.get'605                if os.path.exists(results_of_get_file):606                    if time.time() - os.path.getmtime(results_of_get_file) <= RESULTS_FILE_TIME_OUT:607                        f = open(results_of_get_file)608                        stored_results_of_get = pickle.load(f)609                        query_db = stored_results_of_get['query_info']['query_db']610                        if( not query_db in available_dbs ) or query_db != db_name:611                            return html_wrong_db(results_dic)612                        f.close()613                    else:614                        os.remove(results_of_get_file)615                else:616                    # post_results_file is the results from the POST method of search class617                    post_results_file = CACHE_PATH + '/' + md5_query + '.post'618                    if os.path.exists(post_results_file):619                        f = open(post_results_file)620                        stored_results_of_post = pickle.load(f)621                        f.close()622                        results_mol_ids = stored_results_of_post['query_results_ids']623                        simi_values = stored_results_of_post['simi_values']624                        query_info = stored_results_of_post['query_info']625                    else:626                        return web.seeother(URLS_DIC['search_url'])627                    if results_mol_ids:628                        db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])629                        sql_obj = lm.sql(env_db)630                        if not pri_field in part_fields:631                            select_cols = tables[0] + '.' + pri_field + ', ' + ', '.join(fields_to_show_list_part['fields'])632                        else:633                            select_cols = ', '.join(fields_to_show_list_part['fields'])634                        query_string = 'SELECT %s FROM %s WHERE ( %s IN ( %s ) );' % (635                                select_cols,636                                sql_obj.gen_join_part_sql(tables, pri_field),637                                tables[0] + '.' + pri_field, ', '.join(results_mol_ids) )638                        time_before_search = time.time()639                        # this step is always fast, so no lock is set640                        results = db_obj.execute(query_string)641                        time_consumed = time.time() - time_before_search642                        query_info['time_consumed'] += time_consumed643                        db_obj.close()644                        simi_field = env_db['SIMI_FIELD']645                        for r in results:646                            tmpd = {}647                            mol_id = ''648                            if not pri_field in part_fields:649                                tmpd[pri_field] = r[pri_field]650                                mol_id = r[pri_field]651                            for j in fields_to_show_list_part['fields']:652                                j = re.sub(r'^[^\.]+\.', '', j)653                                if j in [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]:654                                    # uncompress compressed entries655                                    if j == dbd_obj.get_field(env_db['2D_STRUC_KEY']):656                                        if session.removeh:657                                            mol = lm.mymol('mol', zlib.decompress(r[j])).removeh()658                                        else:659                                            mol = zlib.decompress(r[j])660                                        tmpd[j] = mol661                                    else:662                                        tmpd[j] = zlib.decompress(r[j])663                                elif j == pri_field:664                                    mol_id = r[j]665                                    tmpd[j] = mol_id666                                else:667                                    tmpd[j] = r[j]668                            if mol_id and simi_values and search_type == "3":669                                tmpd[simi_field] = simi_values[mol_id]670                            # l contains the mol info, each mol in a sublist: [ [...], [...] ]671                            results_of_get.append(tmpd)672                        if search_type == '3':673                            table_heads = fields_to_show_list_part['comments'] + ['simi value']674                            part_fields = fields_to_show_list_part['fields'] + [simi_field]675                            part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]676                            # sort the results on similarity677                            if simi_values:678                                results_of_get.sort( lambda e1, e2: - cmp(e1[simi_field], e2[simi_field]) )679                            for i in results_of_get:680                                i.update({simi_field: str(round(i[simi_field]*100, 2)) + '%'})681                    stored_results_of_get = {682                            'results_of_get': results_of_get,683                            'part_fields': part_fields,684                            'pri_and_struc_fields': {'pri_field': pri_field, 'struc_field': struc_field},685                            'table_heads': table_heads,686                            'query_info': query_info687                            }688                    # store the results689                    f = open(results_of_get_file, 'w')690                    pickle.dump(stored_results_of_get, f)691                    f.close()692                # results about to display693                query_info = stored_results_of_get['query_info']694                db_name = query_info['query_db']695                query_mols_dic = query_info['query_mols_dic']696                page = int(input.get('page'))697                results_of_get = stored_results_of_get['results_of_get']698                len_results = len(results_of_get)699                # calculate the page thing700                pages = ( lambda x, y: x % y and x / y + 1 or x / y ) (len_results, self.results_per_page)701                show_range_left = self.results_per_page * (page - 1)702                if show_range_left > len_results:703                    show_range_left = len_results - ( len_results % self.results_per_page )704                show_range_right = self.results_per_page * page705                # store the results in a dict706                results_dic.update({707                    'result_list': results_of_get[show_range_left:show_range_right],708                    'len_results': len_results,709                    'db': db_name,710                    'table_heads': stored_results_of_get['table_heads'],711                    'part_fields': stored_results_of_get['part_fields'],712                    'pri_and_struc_fields': stored_results_of_get['pri_and_struc_fields'],713                    'max_results_num': query_info['max_results_num'],714                    'pages': pages,715                    'page': page,716                    'min_simi': str(round(query_info['min_simi'], 2)),717                    'search_mol': query_info['query_mol'],718                    'query_mols_dic': query_info['query_mols_dic'],719                    'query_id': md5_query,720                    'time_consumed': round(query_info['time_consumed'], 2),721                    'adv_search_query': query_info['adv_search_query'],722                    'db_selected': is_selected(db_name),723                    'results_search_type': query_info['query_mode'],724                    'last_query_id': query_info['last_query_id'],725                    'last_db': query_info['last_db']726                    })727        except Exception, e:728            results_dic['info'].append('check your query')729            results_dic['info'].append(e)730            results_dic['html_title'] = 'query err'731            return html_info(results_dic)732        # set the title of the html page and render the results733        if search_type == "4":734            results_dic['html_title'] = 'advanced search'735            return html_advanced_search(results_dic)736        else:737            results_dic['html_title'] = 'normal search'738            return html_normal_search(results_dic)739    def POST(self, name = ''):740        # search types: 1: exact search, 2: substructure search,741        # 3: similarity search, 4: advanced search, 5: superstructure search742        input = web.input()743        results_dic = tpl_results_dic.copy()744        # check basic permission745        if not is_permited(session.userid, session.usergroup, ('s')):746            return html_no_permision(results_dic)747        query_smiles = ''748        adv_search_query = ''749        query_smiles_dic = {}750        query_mols_dic = {}751        max_results_num_from_query = 0752        # for similarity search753        min_simi = 0754        simi_values_dic = {}755        regex0 = ''756        regex1 = ''757        regex2 = ''758        last_mol_ids = []759        last_query_id = ''760        last_db = ''761        available_dbs = PUBLIC_DBS762        if session.authorized:763            available_dbs = DATABASES764            self.results_dic.update({'dbs': available_dbs})765        try:766            if input.has_key('results_per_page'):767                self.results_per_page = int(input.get('results_per_page'))768                session.results_per_page = int(input.get('results_per_page'))769            if input.has_key('max_results_num'):770                self.max_results_num = int(input.get('max_results_num'))771                session.max_results_num = int(input.get('max_results_num'))772            if input.has_key('mol'):773                search_mol = str(input.get('mol'))774                session.search_mol = str(input.get('mol'))775            else:776                search_mol = ''777            search_type = session.search_mode778            if input.has_key('mode') and input.get('mode'):779                if input.get('mode') in SEARCH_MODES:780                    search_type = input.get('mode')781                    session.search_mode = input.get('mode')782                else:783                    results_dic['info'] += ['invalid mode', input.get('mode')]784                    return html_info(results_dic)785            # chose which database to use786            if input.has_key('db') and input.get('db'):787                db_name = input.get('db')788                if db_name not in available_dbs:789                    return html_wrong_db(results_dic)790                session.db = input.get('db')791            else:792                db_name = session.db793            env_db = ENVS[db_name]794            sql_obj = lm.sql(env_db)795            dbd_obj = dbd_objs[db_name]796            tables = dbd_obj.tables797            pri_field = env_db['PRI_FIELD']798            smi_field = dbd_obj.get_field(env_db['SMILES_KEY'])799            simi_field = env_db['SIMI_FIELD']800            # in advanced mode, there could be more than one smiles and mols separated with "|".801            if search_type == "4":802                if input.has_key('query') and input.get('query'):803                    adv_search_query = query_preprocessing(str(input.get('query'))) + ' ' # add a space at the end for regex match804                    # recover escaped ' and ", for ' and " are legal in mode "4"805                    adv_search_query = re.sub(r'\\[\'\"]', '"', adv_search_query)806                else:807                    return html_query_imcomplete(results_dic)808                # get the smiless and mols from the input of advanced mode809                query_smiles_dic = {}810                query_mols_dic = {}811                if input.has_key('smiless') and input.get('smiless'):812                    for j in [ i for i in query_preprocessing(str(input.get('smiless'))).split('|') if i and i != '|' ]:813                        tmpl = j.split(':')814                        if len(tmpl) == 2:815                            query_smiles_dic[tmpl[0]] = tmpl[1]816                if input.has_key('mols') and input.get('mols'):817                    for j in [ i for i in query_preprocessing(str(input.get('mols'))).split('|') if i and i != '|' ]:818                        tmpl = j.split(':')819                        if len(tmpl) == 2:820                            query_mols_dic[tmpl[0]] = tmpl[1]821                # store in session822                if query_smiles_dic:823                    session.query_smiles_dic = query_smiles_dic824                if query_mols_dic:825                    session.query_mols_dic = query_mols_dic826                # check if the query legal827                # first check if there are key words what are not in the abbr_dic828                regex0 = re.compile(r'([><=!~]+ *[^ )(]+[ )(]*)|([)(])|([sS][uU][bBpP])|([mM][aA][xX])|([aA][nN][dD])|([oO][rR])|([nN][oR][tT])')829                key_words = []830                key_words = list(set(regex0.sub(' ', adv_search_query).split()))831                for k in key_words:832                    if not k in env_db['ABBR_DIC'].keys():833                        results_dic = self.results_dic834                        results_dic['info'].append('contains illegal words')835                        results_dic['info'].append(': ' + k)836                        results_dic['html_title'] = 'query err'837                        return html_info(results_dic)838                # second check if the mol buffer contains all needed molecules839                regex1 = re.compile(r'[sS][uU][bBpP] *[!=]+ *[^ )(]*(?=[ )(]+)')840                mol_key = ''841                for i in regex1.findall(adv_search_query):842                    mol_key = i.split('=')[-1].strip(' ')843                    if not ( query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key] ):844                        results_dic = self.results_dic845                        results_dic['info'].append('mol buffer imcomplete')846                        results_dic['html_title'] = 'query err'847                        return html_info(results_dic)848                # replace some words (~ to like) and abstract the max (limit) value if it has849                new_adv_search_query = adv_search_query.replace('~', ' LIKE ')850                regex2 = re.compile(r'[mM][aA][xX] *=+ *[^ )(]*(?=[ )(]+)')851                tmpl = regex2.findall(new_adv_search_query)852                if len(tmpl) == 1:853                    try:854                        max_results_num_from_query = int(tmpl[0].split('=')[-1].strip(' '))855                    except:856                        pass857                new_adv_search_query = regex2.sub('', new_adv_search_query)858                try:859                    query_sql = sql_obj.gen_adv_search_sql(new_adv_search_query, query_smiles_dic, env_db['ABBR_DIC'])860                except Exception, e:861                    results_dic['info'] += ['query err', e]862                    return html_info(results_dic)863            elif input.has_key('smiles') and input.get('smiles'):864                query_smiles = query_preprocessing(str(input.get('smiles')))865                if search_type == "3":866                    if input.has_key('min_simi') and input.get("min_simi"):867                        try:868                            min_simi = float(input.get("min_simi"))869                        except:870                            results_dic = self.results_dic871                            results_dic['info'].append('min simi contains illegal char')872                            results_dic['html_title'] = 'query err'873                            return html_info(results_dic)874                    else:875                        return html_query_imcomplete(results_dic)876                    query_sql = sql_obj.gen_simi_search_sql(query_smiles, min_simi)877                else:878                    if search_type == '1':879                        type = '1'880                    elif search_type == '2':881                        type = '2'882                    elif search_type == '5':883                        type = '4'884                    query_sql = sql_obj.gen_search_sql(query_smiles, type)885            else:886                return html_query_imcomplete(results_dic)887            db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])888            results = []889            results_tmp = True890            # search in results891            if input.has_key('search_in_results') and input.get('search_in_results'):892                last_results_of_post_file = CACHE_PATH + '/' + input.get('search_in_results') + '.post'893                if os.path.exists(last_results_of_post_file):894                    f = open(last_results_of_post_file)895                    last_results_of_post = pickle.load(f)896                    f.close()897                    last_mol_ids = last_results_of_post['query_results_ids']898                    last_query_info = last_results_of_post['query_info']899                    last_query_id = last_query_info['query_id']900                    last_db = last_query_info['query_db']901            # generates the sql string for query902            if last_mol_ids:903                query_string = 'SELECT %s, %s %s AND %s IN (%s)' % (904                        tables[0] + '.' + pri_field,905                        smi_field,906                        query_sql,907                        tables[0] + '.' + pri_field,908                        ', '.join(last_mol_ids) )909            else:910                query_string = 'SELECT %s, %s %s' % (911                        tables[0] + '.' + pri_field,912                        smi_field,913                        query_sql)914            # check if there's already a results file915            md5_query = md5(query_string + db_name + search_type)916            session.md5_query = md5_query917            results_of_post_file = CACHE_PATH + '/' + md5_query + '.post'918            lock_file = results_of_post_file + '.lock'919            if os.path.exists(results_of_post_file):920                if time.time() - os.path.getmtime(results_of_post_file) >= RESULTS_FILE_TIME_OUT:921                    os.remove(results_of_post_file)922                else:923                    return web.seeother(URLS_DIC['search_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query))924            # check if the lock file exists, if exists then wait, else continue.925            while os.path.exists(lock_file):926                # check if the life span of lock_file reached927                if time.time() - os.path.getmtime(lock_file) >= LOCK_FILE_LIFE_SPAN:928                    os.remove(lock_file)929                else:930                    time.sleep(5)931            # define filters932            filter = None933            if search_type == "1":934                def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):935                    if results_dict.has_key(smiles_field):936                        return mol_obj.gen_openbabel_can_smiles() == results_dict[smiles_field]937                    return False938            elif search_type == "2":939                def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):940                    if results_dict.has_key(smiles_field):941                        return mol_obj.sub_match('smi', results_dict[smiles_field])942                    return False943            elif search_type == "3":944                # similarity search actually needs no filter.945                pass946            elif search_type == '4':947                sub_smiles = []948                sup_smiles = []949                sub_m_objs = []950                sup_m_objs = []951                re_sub = re.compile(r'[sS][uU][bB] *[!=]+ *[^ )(]*(?=[ )(]+)')952                re_sup = re.compile(r'[sS][uU][pP] *[!=]+ *[^ )(]*(?=[ )(]+)')953                for i in re_sub.findall(adv_search_query):954                    # for querying for a molecule contains no a paticular substructure always955                    # filters out some positive ones, so it's no need to filter here any more,956                    # hence we exclude those '!=' ones here.957                    if re.findall(r'!=', i):958                        continue959                    elif re.findall(r'[^!]=', i):960                        mol_key = i.split('=')[-1].strip(' ')961                        if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:962                            sub_smiles.append(query_smiles_dic[mol_key])963                for i in re_sup.findall(adv_search_query):964                    # for querying for the superstructure of a paticular molecule always965                    # filters out some positive ones, so it's no need to filter here any more,966                    # hence we exclude those '!=' ones here.967                    if re.findall(r'!=', i):968                        continue969                    elif re.findall(r'[^!]=', i):970                        mol_key = i.split('=')[-1].strip(' ')971                        if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:972                            sup_smiles.append(query_smiles_dic[mol_key])973                sub_m_objs = [ lm.mymol('smi', m) for m in sub_smiles ]974                sup_m_objs = [ lm.mymol('smi', m) for m in sup_smiles ]975                # filter is only needed to define when the m_objs list is not empty.976                if sub_m_objs or sup_m_objs:977                    def filter(results_dict,978                            sub_mol_objs = sub_m_objs,979                            sup_mol_objs = sup_m_objs,980                            smiles_field = smi_field):981                        if results_dict.has_key(smiles_field):982                            for i in sub_mol_objs:983                                if not i.sub_match('smi', results_dict[smiles_field]):984                                    return False985                            for i in sup_mol_objs:986                                if not i.sup_match('smi', results_dict[smiles_field]):987                                    return False988                            return True989                        return False990            elif search_type == '5':991                def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):992                    if results_dict.has_key(smiles_field):993                        return mol_obj.sup_match('smi', results_dict[smiles_field])994                    return False995            # limit the results996            if max_results_num_from_query:997                num_per_select = 150998                max_results_num = max_results_num_from_query999            elif search_type == '3':1000                max_results_num = 101001            else:1002                num_per_select = 1501003                max_results_num = self.max_results_num1004            # search in database and filter the reuslts1005            # record time consuming1006            time_before_search = time.time()1007            if search_type in ('1', '3'):1008                if search_type == '1':1009                    limit = '' #' LIMIT 1'1010                elif search_type == '3':1011                    limit = ' LIMIT %s ' %(max_results_num,)1012                # set lock to avoid duplocated search1013                open(lock_file, 'w')1014                results = db_obj.execute(query_string + limit + ';')1015                if os.path.exists(lock_file):1016                    os.remove(lock_file)1017                db_obj.close()1018            elif search_type in ('2', '4', '5'):1019                # set lock to avoid duplocated search1020                open(lock_file, 'w')1021                results = db_obj.query(1022                        query_string,1023                        filter,1024                        tables[0] + '.' + pri_field,1025                        max_results_num,1026                        num_per_select)1027                if os.path.exists(lock_file):1028                    os.remove(lock_file)1029                db_obj.close()1030            time_consumed = time.time() - time_before_search1031            # preprocessing the results to store1032            # cut of the extra results1033            results = results[:max_results_num]1034            results_dic_to_store = {}1035            mol_id_to_store = []1036            query_info = {}1037            for r in results:1038                if r.has_key(pri_field):1039                    id = r[pri_field]1040                    mol_id_to_store.append(str(id))1041                    if r.has_key(simi_field):1042                        simi_values_dic[id] = r[simi_field]1043            query_info = {1044                    'query_mols_dic': query_mols_dic,1045                    'query_smiles_dic': query_smiles_dic,1046                    'query_smiles': query_smiles,1047                    'query_mol': search_mol,1048                    'query_string': query_string,1049                    'query_db': db_name,1050                    'max_results_num': max_results_num,1051                    'adv_search_query': adv_search_query,1052                    'query_mode': search_type,1053                    'min_simi': min_simi,1054                    'time_consumed': time_consumed,1055                    'last_query_id': last_query_id,1056                    'last_db': last_db,1057                    'query_id': md5_query1058                    }1059            results_dic_to_store = {1060                    'query_results_ids': mol_id_to_store,1061                    'simi_values': simi_values_dic,1062                    'query_info': query_info1063                    }1064            # store search results1065            f = open(results_of_post_file, 'w')1066            pickle.dump(results_dic_to_store, f)1067            f.close()1068            return web.seeother(URLS_DIC['search_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query))1069        except Exception, e:1070            results_dic = self.results_dic1071            results_dic['info'].append('check your query')1072            results_dic['info'].append(e)1073            results_dic['html_title'] = 'query err'1074            return html_info(results_dic)1075class webapi:1076    '''web api for structure search'''1077    def __init__(self):1078        # set default results to show per page1079        self.results_per_page = int(session.results_per_page)1080        # set max results number if it's not yet set1081        self.max_results_num = int(session.max_results_num)1082        # set the target language to session.lang1083        self.results_dic = tpl_results_dic.copy()1084        self.results_dic.update( {1085            'logged_user': session.nickname,1086            'max_results_num': self.max_results_num,1087            'query_id': '',1088            'part_fields': [],1089            'pri_and_struc_fields': {'pri_field': '', 'struc_field': ''},1090            'results_search_type': '2',1091            'prequery': '',1092            'info': [],1093            } )1094    def GET(self, name = ''):1095        input = web.input()1096        # results_mol_ids stores the mol ids found by the previous query1097        results_of_get = []1098        simi_values = []1099        results_mol_ids = []1100        results = []1101        results_dic = self.results_dic1102        search_mol = session.search_mol1103        stored_results_of_post = ''1104        stored_results_of_get = {}1105        results_mol_ids = []1106        query_info = {}1107        query_mols_dic = {}1108        adv_search_query = ''1109        search_mol = ''1110        md5_query = ''1111        available_dbs = PUBLIC_DBS1112        if session.authorized:1113            available_dbs = DATABASES1114            self.results_dic.update({'dbs': available_dbs})1115        # check basic permission1116        if not is_permited(session.userid, session.usergroup, ('s')):1117            return json.dumps({'status': 'Permition denied'})1118        try:1119            if input.has_key('db') and input.get('db'):1120                db_name = input.get('db')1121                if db_name not in available_dbs:1122                    return json.dumps({'status': 'DB not accessible!'})1123                session.db = db_name1124            else:1125                db_name = session.db1126            if input.has_key('prequery') and input.get('prequery'):1127                results_dic['prequery'] = input.get('prequery')1128            env_db = ENVS[db_name]1129            dbd_obj = dbd_objs[db_name]1130            tables = dbd_obj.tables1131            pri_field = env_db['PRI_FIELD']1132            struc_field = env_db['2D_STRUC_FIELD']1133            sql_fields_dic = env_db['FIELDS_TO_SHOW_DIC']1134            fields_to_show_list_all = sql_fields_dic['all']1135            fields_to_show_list_part = sql_fields_dic['part']1136            part_fields = fields_to_show_list_part['fields']1137            part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]1138            table_heads = fields_to_show_list_part['comments']1139            if input.has_key('query_id') and input.get('query_id'):1140                md5_query = str(input.get('query_id'))1141            if input.has_key('results_per_page'):1142                self.results_per_page = int(input.get('results_per_page'))1143                session.results_per_page = int(input.get('results_per_page'))1144            if input.has_key('max_results_num'):1145                self.max_results_num = int(input.get('max_results_num'))1146                session.max_results_num = int(input.get('max_results_num'))1147            search_type = session.search_mode1148            if input.has_key('mode') and input.get('mode'):1149                search_type = input.get('mode')1150            else:1151                search_type = session.search_mode1152            if input.has_key('page'):1153                # check if the results_of_get_file already exists1154                # results_of_get_file is the results from the GET method of search class1155                results_of_get_file = CACHE_PATH + '/' + md5_query + '.get'1156                if os.path.exists(results_of_get_file):1157                    if time.time() - os.path.getmtime(results_of_get_file) <= RESULTS_FILE_TIME_OUT:1158                        f = open(results_of_get_file)1159                        stored_results_of_get = pickle.load(f)1160                        query_db = stored_results_of_get['query_info']['query_db']1161                        if( not query_db in available_dbs ) or query_db != db_name:1162                            return json.dumps({'status': 'DB not accessible!'})1163                        f.close()1164                    else:1165                        os.remove(results_of_get_file)1166                else:1167                    # post_results_file is the results from the POST method of search class1168                    post_results_file = CACHE_PATH + '/' + md5_query + '.post'1169                    if os.path.exists(post_results_file):1170                        f = open(post_results_file)1171                        stored_results_of_post = pickle.load(f)1172                        f.close()1173                        results_mol_ids = stored_results_of_post['query_results_ids']1174                        simi_values = stored_results_of_post['simi_values']1175                        query_info = stored_results_of_post['query_info']1176                    else:1177                        return json.dumps({'status': 'Not yet posted!'})1178                    if results_mol_ids:1179                        db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1180                        sql_obj = lm.sql(env_db)1181                        if not pri_field in part_fields:1182                            select_cols = tables[0] + '.' + pri_field + ', ' + ', '.join(fields_to_show_list_part['fields'])1183                        else:1184                            select_cols = ', '.join(fields_to_show_list_part['fields'])1185                        query_string = 'SELECT %s FROM %s WHERE ( %s IN ( %s ) );' % (1186                                select_cols,1187                                sql_obj.gen_join_part_sql(tables, pri_field),1188                                tables[0] + '.' + pri_field, ', '.join(results_mol_ids) )1189                        time_before_search = time.time()1190                        # this step is always fast, so no lock is set1191                        results = db_obj.execute(query_string)1192                        time_consumed = time.time() - time_before_search1193                        query_info['time_consumed'] += time_consumed1194                        db_obj.close()1195                        simi_field = env_db['SIMI_FIELD']1196                        for r in results:1197                            tmpd = {}1198                            mol_id = ''1199                            if not pri_field in part_fields:1200                                tmpd[pri_field] = r[pri_field]1201                                mol_id = r[pri_field]1202                            for j in fields_to_show_list_part['fields']:1203                                j = re.sub(r'^[^\.]+\.', '', j)1204                                if j in [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]:1205                                    # uncompress compressed entries1206                                    if j == dbd_obj.get_field(env_db['2D_STRUC_KEY']):1207                                        if session.removeh:1208                                            mol = lm.mymol('mol', zlib.decompress(r[j])).removeh()1209                                        else:1210                                            mol = zlib.decompress(r[j])1211                                        tmpd[j] = mol1212                                    else:1213                                        tmpd[j] = zlib.decompress(r[j])1214                                elif j == pri_field:1215                                    mol_id = r[j]1216                                    tmpd[j] = mol_id1217                                else:1218                                    tmpd[j] = r[j]1219                            if mol_id and simi_values and search_type == "3":1220                                tmpd[simi_field] = simi_values[mol_id]1221                            # l contains the mol info, each mol in a sublist: [ [...], [...] ]1222                            results_of_get.append(tmpd)1223                        if search_type == '3':1224                            table_heads = fields_to_show_list_part['comments'] + ['simi value']1225                            part_fields = fields_to_show_list_part['fields'] + [simi_field]1226                            part_fields = [ re.sub(r'^[^\.]+\.', '', i) for i in part_fields ]1227                            # sort the results on similarity1228                            if simi_values:1229                                results_of_get.sort( lambda e1, e2: - cmp(e1[simi_field], e2[simi_field]) )1230                            for i in results_of_get:1231                                i.update({simi_field: str(round(i[simi_field]*100, 2)) + '%'})1232                    stored_results_of_get = {1233                            'results_of_get': results_of_get,1234                            'part_fields': part_fields,1235                            'pri_and_struc_fields': {'pri_field': pri_field, 'struc_field': struc_field},1236                            'table_heads': table_heads,1237                            'query_info': query_info1238                            }1239                    # store the results1240                    f = open(results_of_get_file, 'w')1241                    pickle.dump(stored_results_of_get, f)1242                    f.close()1243                # results about to display1244                query_info = stored_results_of_get['query_info']1245                db_name = query_info['query_db']1246                query_mols_dic = query_info['query_mols_dic']1247                page = int(input.get('page'))1248                results_of_get = stored_results_of_get['results_of_get']1249                len_results = len(results_of_get)1250                # calculate the page thing1251                pages = ( lambda x, y: x % y and x / y + 1 or x / y ) (len_results, self.results_per_page)1252                show_range_left = self.results_per_page * (page - 1)1253                if show_range_left > len_results:1254                    show_range_left = len_results - ( len_results % self.results_per_page )1255                show_range_right = self.results_per_page * page1256                # store the results in a dict1257                results_dic.update({1258                    'result_list': results_of_get[show_range_left:show_range_right],1259                    'len_results': len_results,1260                    'db': db_name,1261                    'table_heads': stored_results_of_get['table_heads'],1262                    'part_fields': stored_results_of_get['part_fields'],1263                    'pri_and_struc_fields': stored_results_of_get['pri_and_struc_fields'],1264                    'max_results_num': query_info['max_results_num'],1265                    'pages': pages,1266                    'page': page,1267                    'min_simi': str(round(query_info['min_simi'], 2)),1268                    'search_mol': query_info['query_mol'],1269                    'query_mols_dic': query_info['query_mols_dic'],1270                    'query_id': md5_query,1271                    'time_consumed': round(query_info['time_consumed'], 2),1272                    'adv_search_query': query_info['adv_search_query'],1273                    'results_search_type': query_info['query_mode'],1274                    'last_query_id': query_info['last_query_id'],1275                    'last_db': query_info['last_db']1276                    })1277        except Exception, e:1278            results_dic['info'].append('check your query')1279            results_dic['info'].append(e)1280            return json.dumps(results_dic)1281        for key in ('mode_selected', 'db_selected',1282                'links', 'urls_dic', 'table_heads',1283                'last_db', 'part_fields', 'pri_and_struc_fields',1284                'lang', 'trans_class', 'info_head',1285                'html_title', 'current_page'):1286            self.results_dic.pop(key)1287        return json.dumps(results_dic)1288    def POST(self, name = ''):1289        # search types: 1: exact search, 2: substructure search,1290        # 3: similarity search, 4: advanced search, 5: superstructure search1291        input_raw = web.input()1292        if input_raw.has_key('query_format'):1293            qfmt = input_raw.get('query_format')1294            if qfmt == 'json' and input_raw.has_key('json'):1295                input = json.loads(input_raw.get('json'))1296            else:1297                input = input_raw1298        else:1299            input = input_raw1300        results_dic = tpl_results_dic.copy()1301        # check basic permission1302        if not is_permited(session.userid, session.usergroup, ('s')):1303            return json.dumps({'status': 'Permition denied!'})1304        query_smiles = ''1305        adv_search_query = ''1306        query_smiles_dic = {}1307        query_mols_dic = {}1308        max_results_num_from_query = 01309        # for similarity search1310        min_simi = 01311        simi_values_dic = {}1312        regex0 = ''1313        regex1 = ''1314        regex2 = ''1315        last_mol_ids = []1316        last_query_id = ''1317        last_db = ''1318        available_dbs = PUBLIC_DBS1319        if session.authorized:1320            available_dbs = DATABASES1321            self.results_dic.update({'dbs': available_dbs})1322        try:1323            if input.has_key('results_per_page'):1324                self.results_per_page = int(input.get('results_per_page'))1325                session.results_per_page = int(input.get('results_per_page'))1326            if input.has_key('max_results_num'):1327                self.max_results_num = int(input.get('max_results_num'))1328                session.max_results_num = int(input.get('max_results_num'))1329            if input.has_key('mol'):1330                search_mol = str(input.get('mol'))1331                session.search_mol = str(input.get('mol'))1332            else:1333                search_mol = ''1334            search_type = session.search_mode1335            if input.has_key('mode') and input.get('mode'):1336                if input.get('mode') in SEARCH_MODES:1337                    search_type = input.get('mode')1338                    session.search_mode = input.get('mode')1339                else:1340                    return json.dumps({'status': 'Invalid mode: %s' %input.get('mode')})1341            # chose which database to use1342            if input.has_key('db') and input.get('db'):1343                db_name = input.get('db')1344                if db_name not in available_dbs:1345                    return json.dumps({'status': 'DB not accessible!'})1346                session.db = input.get('db')1347            else:1348                db_name = session.db1349            env_db = ENVS[db_name]1350            sql_obj = lm.sql(env_db)1351            dbd_obj = dbd_objs[db_name]1352            tables = dbd_obj.tables1353            pri_field = env_db['PRI_FIELD']1354            smi_field = dbd_obj.get_field(env_db['SMILES_KEY'])1355            simi_field = env_db['SIMI_FIELD']1356            # in advanced mode, there could be more than one smiles and mols separated with "|".1357            if search_type == "4":1358                if input.has_key('query') and input.get('query'):1359                    adv_search_query = query_preprocessing(str(input.get('query'))) + ' ' # add a space at the end for regex match1360                    # recover escaped ' and ", for ' and " are legal in mode "4"1361                    adv_search_query = re.sub(r'\\[\'\"]', '"', adv_search_query)1362                else:1363                    return json.dumps({'status': 'Query imcomplete!'})1364                # get the smiless and mols from the input of advanced mode1365                query_smiles_dic = {}1366                query_mols_dic = {}1367                if input.has_key('smiless') and input.get('smiless'):1368                    for j in [ i for i in query_preprocessing(str(input.get('smiless'))).split('|') if i and i != '|' ]:1369                        tmpl = j.split(':')1370                        if len(tmpl) == 2:1371                            query_smiles_dic[tmpl[0]] = tmpl[1]1372                elif input.has_key('mols') and input.get('mols'):1373                    for j in [ i for i in query_preprocessing(str(input.get('mols'))).split('|') if i and i != '|' ]:1374                        tmpl = j.split(':')1375                        if len(tmpl) == 2:1376                            query_mols_dic[tmpl[0]] = tmpl[1]1377                # store in session1378                if query_smiles_dic:1379                    session.query_smiles_dic = query_smiles_dic1380                elif query_mols_dic:1381                    for k, v in query_smiles_dic.items():1382                        query_smiles_dic[k] = lm.mymol('mol', v).mol.write('smi')1383                if query_smiles_dic:1384                    session.query_smiles_dic = query_smiles_dic1385                if query_mols_dic:1386                    session.query_mols_dic = query_mols_dic1387                # check if the query legal1388                # first check if there are key words what are not in the abbr_dic1389                regex0 = re.compile(r'([><=!~]+ *[^ )(]+[ )(]*)|([)(])|([sS][uU][bBpP])|([mM][aA][xX])|([aA][nN][dD])|([oO][rR])|([nN][oR][tT])')1390                key_words = []1391                key_words = list(set(regex0.sub(' ', adv_search_query).split()))1392                for k in key_words:1393                    if not k in env_db['ABBR_DIC'].keys():1394                        return json.dumps({'status': 'Contains illegal words!'})1395                # second check if the mol buffer contains all needed molecules1396                regex1 = re.compile(r'[sS][uU][bBpP] *[!=]+ *[^ )(]*(?=[ )(]+)')1397                mol_key = ''1398                for i in regex1.findall(adv_search_query):1399                    mol_key = i.split('=')[-1].strip(' ')1400                    if not ( query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key] ):1401                        return json.dumps({'status': 'Mol buffer imcomplete!'})1402                # replace some words (~ to like) and abstract the max (limit) value if it has1403                new_adv_search_query = adv_search_query.replace('~', ' LIKE ')1404                regex2 = re.compile(r'[mM][aA][xX] *=+ *[^ )(]*(?=[ )(]+)')1405                tmpl = regex2.findall(new_adv_search_query)1406                if len(tmpl) == 1:1407                    try:1408                        max_results_num_from_query = int(tmpl[0].split('=')[-1].strip(' '))1409                    except:1410                        pass1411                new_adv_search_query = regex2.sub('', new_adv_search_query)1412                try:1413                    query_sql = sql_obj.gen_adv_search_sql(new_adv_search_query, query_smiles_dic, env_db['ABBR_DIC'])1414                except Exception, e:1415                    return json.dumps({'status': 'Query error: %s' %str(e)})1416            elif (input.has_key('mol') and input.get('mol')) or (input.has_key('smiles') and input.get('smiles')):1417                if input.has_key('smiles') and input.get('smiles'):1418                    query_smiles = input.get('smiles')1419                else:1420                    query_smiles = query_preprocessing(lm.mymol('mol', str(input.get('mol'))).mol.write('smi'))1421                if search_type == "3":1422                    if input.has_key('min_simi') and input.get("min_simi"):1423                        try:1424                            min_simi = float(input.get("min_simi"))1425                        except:1426                            return json.dumps({'status': 'Min simi contains illegal char!'})1427                    else:1428                        return json.dumps({'status': 'Query imcomplete!'})1429                    query_sql = sql_obj.gen_simi_search_sql(query_smiles, min_simi)1430                else:1431                    if search_type == '1':1432                        type = '1'1433                    elif search_type == '2':1434                        type = '2'1435                    elif search_type == '5':1436                        type = '4'1437                    query_sql = sql_obj.gen_search_sql(query_smiles, type)1438            else:1439                return json.dumps({'status': 'Query imcomplete!'})1440            db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1441            results = []1442            results_tmp = True1443            # search in results1444            if input.has_key('search_in_results') and input.get('search_in_results'):1445                last_results_of_post_file = CACHE_PATH + '/' + input.get('search_in_results') + '.post'1446                if os.path.exists(last_results_of_post_file):1447                    f = open(last_results_of_post_file)1448                    last_results_of_post = pickle.load(f)1449                    f.close()1450                    last_mol_ids = last_results_of_post['query_results_ids']1451                    last_query_info = last_results_of_post['query_info']1452                    last_query_id = last_query_info['query_id']1453                    last_db = last_query_info['query_db']1454            # generates the sql string for query1455            if last_mol_ids:1456                query_string = 'SELECT %s, %s %s AND %s IN (%s)' % (1457                        tables[0] + '.' + pri_field,1458                        smi_field,1459                        query_sql,1460                        tables[0] + '.' + pri_field,1461                        ', '.join(last_mol_ids) )1462            else:1463                query_string = 'SELECT %s, %s %s' % (1464                        tables[0] + '.' + pri_field,1465                        smi_field,1466                        query_sql)1467            # check if there's already a results file1468            md5_query = md5(query_string + db_name + search_type)1469            session.md5_query = md5_query1470            results_of_post_file = CACHE_PATH + '/' + md5_query + '.post'1471            lock_file = results_of_post_file + '.lock'1472            if os.path.exists(results_of_post_file):1473                if time.time() - os.path.getmtime(results_of_post_file) >= RESULTS_FILE_TIME_OUT:1474                    os.remove(results_of_post_file)1475                else:1476                    return json.dumps({'status': 'OK',1477                        'result_url': URLS_DIC['webapi_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query)})1478            # check if the lock file exists, if exists then wait, else continue.1479            while os.path.exists(lock_file):1480                # check if the life span of lock_file reached1481                if time.time() - os.path.getmtime(lock_file) >= LOCK_FILE_LIFE_SPAN:1482                    os.remove(lock_file)1483                else:1484                    time.sleep(5)1485            # define filters1486            filter = None1487            if search_type == "1":1488                def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):1489                    if results_dict.has_key(smiles_field):1490                        return mol_obj.gen_openbabel_can_smiles() == results_dict[smiles_field]1491                    return False1492            elif search_type == "2":1493                def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):1494                    if results_dict.has_key(smiles_field):1495                        return mol_obj.sub_match('smi', results_dict[smiles_field])1496                    return False1497            elif search_type == "3":1498                # similarity search actually needs no filter.1499                pass1500            elif search_type == '4':1501                sub_smiles = []1502                sup_smiles = []1503                sub_m_objs = []1504                sup_m_objs = []1505                re_sub = re.compile(r'[sS][uU][bB] *[!=]+ *[^ )(]*(?=[ )(]+)')1506                re_sup = re.compile(r'[sS][uU][pP] *[!=]+ *[^ )(]*(?=[ )(]+)')1507                for i in re_sub.findall(adv_search_query):1508                    # for querying for a molecule contains no a paticular substructure always1509                    # filters out some positive ones, so it's no need to filter here any more,1510                    # hence we exclude those '!=' ones here.1511                    if re.findall(r'!=', i):1512                        continue1513                    elif re.findall(r'[^!]=', i):1514                        mol_key = i.split('=')[-1].strip(' ')1515                        if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:1516                            sub_smiles.append(query_smiles_dic[mol_key])1517                for i in re_sup.findall(adv_search_query):1518                    # for querying for the superstructure of a paticular molecule always1519                    # filters out some positive ones, so it's no need to filter here any more,1520                    # hence we exclude those '!=' ones here.1521                    if re.findall(r'!=', i):1522                        continue1523                    elif re.findall(r'[^!]=', i):1524                        mol_key = i.split('=')[-1].strip(' ')1525                        if query_smiles_dic.has_key(mol_key) and query_smiles_dic[mol_key]:1526                            sup_smiles.append(query_smiles_dic[mol_key])1527                sub_m_objs = [ lm.mymol('smi', m) for m in sub_smiles ]1528                sup_m_objs = [ lm.mymol('smi', m) for m in sup_smiles ]1529                # filter is only needed to define when the m_objs list is not empty.1530                if sub_m_objs or sup_m_objs:1531                    def filter(results_dict,1532                            sub_mol_objs = sub_m_objs,1533                            sup_mol_objs = sup_m_objs,1534                            smiles_field = smi_field):1535                        if results_dict.has_key(smiles_field):1536                            for i in sub_mol_objs:1537                                if not i.sub_match('smi', results_dict[smiles_field]):1538                                    return False1539                            for i in sup_mol_objs:1540                                if not i.sup_match('smi', results_dict[smiles_field]):1541                                    return False1542                            return True1543                        return False1544            elif search_type == '5':1545                def filter(results_dict, mol_obj = lm.mymol('smi', query_smiles), smiles_field = smi_field):1546                    if results_dict.has_key(smiles_field):1547                        return mol_obj.sup_match('smi', results_dict[smiles_field])1548                    return False1549            # limit the results1550            if max_results_num_from_query:1551                num_per_select = 1501552                max_results_num = max_results_num_from_query1553            elif search_type == '3':1554                max_results_num = 101555            else:1556                num_per_select = 1501557                max_results_num = self.max_results_num1558            # search in database and filter the reuslts1559            # record time consuming1560            time_before_search = time.time()1561            if search_type in ('1', '3'):1562                if search_type == '1':1563                    limit = '' #' LIMIT 1'1564                elif search_type == '3':1565                    limit = ' LIMIT %s ' %(max_results_num,)1566                # set lock to avoid duplocated search1567                open(lock_file, 'w')1568                results = db_obj.execute(query_string + limit + ';')1569                if os.path.exists(lock_file):1570                    os.remove(lock_file)1571                db_obj.close()1572            elif search_type in ('2', '4', '5'):1573                # set lock to avoid duplocated search1574                open(lock_file, 'w')1575                results = db_obj.query(1576                        query_string,1577                        filter,1578                        tables[0] + '.' + pri_field,1579                        max_results_num,1580                        num_per_select)1581                if os.path.exists(lock_file):1582                    os.remove(lock_file)1583                db_obj.close()1584            time_consumed = time.time() - time_before_search1585            # preprocessing the results to store1586            # cut of the extra results1587            results = results[:max_results_num]1588            results_dic_to_store = {}1589            mol_id_to_store = []1590            query_info = {}1591            for r in results:1592                if r.has_key(pri_field):1593                    id = r[pri_field]1594                    mol_id_to_store.append(str(id))1595                    if r.has_key(simi_field):1596                        simi_values_dic[id] = r[simi_field]1597            query_info = {1598                    'query_mols_dic': query_mols_dic,1599                    'query_smiles_dic': query_smiles_dic,1600                    'query_smiles': query_smiles,1601                    'query_mol': search_mol,1602                    'query_string': query_string,1603                    'query_db': db_name,1604                    'max_results_num': max_results_num,1605                    'adv_search_query': adv_search_query,1606                    'query_mode': search_type,1607                    'min_simi': min_simi,1608                    'time_consumed': time_consumed,1609                    'last_query_id': last_query_id,1610                    'last_db': last_db,1611                    'query_id': md5_query1612                    }1613            results_dic_to_store = {1614                    'query_results_ids': mol_id_to_store,1615                    'simi_values': simi_values_dic,1616                    'query_info': query_info1617                    }1618            # store search results1619            f = open(results_of_post_file, 'w')1620            pickle.dump(results_dic_to_store, f)1621            f.close()1622            return json.dumps({'status': 'OK',1623                'result_url': URLS_DIC['webapi_url'] + '?page=1&db=%s&query_id=%s' %(db_name, md5_query)})1624        except Exception, e:1625            return json.dumps({'status': 'Query error: %s' %str(e)})1626class molinfo:1627    def __init__(self):1628        # set the target language to session.lang1629        trans_class.lang = session.lang1630        self.results_dic = tpl_results_dic.copy()1631        self.results_dic.update( {1632            'logged_user': session.nickname,1633            'trans_class': trans_class,1634            'mode_selected': is_selected(str(session.search_mode)),1635            'html_title': 'mol info',1636            'info': [],1637            'show_edit': False,1638            'current_page': 'search',1639            'lang': session.lang1640            } )1641    def GET(self, name = ''):1642        input = web.input()1643        results_dic = self.results_dic1644        available_dbs = PUBLIC_DBS1645        if session.authorized:1646            available_dbs = DATABASES1647            self.results_dic.update({'dbs': available_dbs})1648        # check basic permission1649        if not is_permited(session.userid, session.usergroup, ('s')):1650            return html_no_permision(results_dic)1651        # chose which database to use1652        if input.has_key('db') and input.get('db'):1653            db_name = input.get('db')1654            if db_name not in available_dbs:1655                return html_wrong_db(results_dic)1656            session.db = db_name1657        else:1658            db_name = session.db1659        env_db = ENVS[db_name]1660        sql_obj = lm.sql(env_db)1661        sql_fields_dic = env_db['FIELDS_TO_SHOW_DIC']1662        fields_to_show_list_all = sql_fields_dic['all']1663        fields_to_show_list_part = sql_fields_dic['part']1664        dbd_obj = dbd_objs[db_name]1665        tables = dbd_obj.tables1666        pri_field = env_db['PRI_FIELD']1667        smi_field = dbd_obj.get_field(env_db['SMILES_KEY'])1668        simi_field = env_db['SIMI_FIELD']1669        try:1670            if input.has_key('mol_id'):1671                mol_id = query_preprocessing(str(input.get('mol_id')))1672                if not re.match('^[0-9]+$', mol_id):1673                    return html_query_illegal(results_dic)1674                db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1675                query_string = 'SELECT %s FROM %s WHERE ( %s = %s ) LIMIT 1;' % (1676                        ', '.join(fields_to_show_list_all['fields']),1677                        sql_obj.gen_join_part_sql(tables, pri_field),1678                        tables[0] + '.' + pri_field,1679                        mol_id )1680                results = db_obj.execute(query_string)1681                db_obj.close()1682                if not results:1683                    results_dic['info'].append('no mol to show')1684                    return html_info(results_dic)1685                else:1686                    results = results[0]1687                    # change the decimal submiter id to nick name1688                    submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1689                    if results.has_key(submiter_field):1690                        submiter_id = results.get(submiter_field)1691                        if not submiter_id:1692                            submiter_id = "''"1693                        env_dbuser = ENVS[USERSDB]1694                        dbuser_obj = lm.database(env_dbuser['HOST'], env_dbuser['USER'], env_dbuser['PASSWORD'], env_dbuser['DBNAME'])1695                        results_user = dbuser_obj.execute('SELECT %s FROM %s WHERE (%s = %s) LIMIT 1;' % (1696                            env_dbuser['NICK_FIELD'],1697                            env_dbuser['USERS_TABLE'],1698                            env_dbuser['USER_ID_FIELD'],1699                            submiter_id) )1700                        dbuser_obj.close()1701                        if results_user and results_user[0].has_key(env_dbuser['NICK_FIELD']):1702                            results[submiter_field] = results_user[0][env_dbuser['NICK_FIELD']]1703                    # uncompress those compressed entries and add comments to the results_dic for display1704                    tmpl = []1705                    mol = ''1706                    fields = fields_to_show_list_all['fields']1707                    comments = fields_to_show_list_all['comments']1708                    for j in xrange(len(fields)):1709                        i = fields[j]1710                        i = re.sub(r'^[^\.]+\.', '', i)1711                        if i in [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]:1712                            # uncompress compressed entries1713                            if i == dbd_obj.get_field(env_db['2D_STRUC_KEY']):1714                                if session.removeh:1715                                    mol = lm.mymol('mol', zlib.decompress(results[i])).removeh()1716                                else:1717                                    mol = zlib.decompress(results[i])1718                            else:1719                                tmpl.append([comments[j], zlib.decompress(results[i])])1720                        else:1721                            tmpl.append([comments[j], results[i]])1722                    # show different interface to different groups1723                    if db_name in EDITABLE_DBS and session.authorized and \1724                            is_permited(session.userid, session.usergroup, ('u'), submiter_id):1725                        results_dic['show_edit'] = True1726                    results_dic['result_list'] = tmpl1727                    results_dic['mol'] = mol1728                    results_dic['mol_id'] = mol_id1729                    results_dic['db'] = db_name1730                    return html_molinfo(results_dic)1731            else:1732                return html_query_imcomplete(results_dic)1733        except Exception, e:1734            results_dic['info'].append('check your query')1735            results_dic['info'].append(e)1736            results_dic['html_title'] = 'query err'1737            return html_info(results_dic)1738class edit:1739    def __init__(self):1740        # set the target language to session.lang1741        trans_class.lang = session.lang1742        self.results_dic = tpl_results_dic.copy()1743        self.results_dic.update( {1744            'logged_user': session.nickname,1745            'trans_class': trans_class,1746            'html_title': 'edit',1747            'dbs': EDITABLE_DBS,1748            'mol_id': None,1749            'show_del': False,1750            'current_page': 'edit',1751            'calculable_keys': [],1752            'submiter_nick': '',1753            'needed_keys': [],1754            'info': [],1755            'lang': session.lang1756            } )1757    def GET(self, name = ''):1758        input = web.input()1759        results_dic = self.results_dic1760        # check basic permission1761        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):1762            return html_no_permision(results_dic)1763        # chose which database to use1764        if input.has_key('db') and input.get('db'):1765            db_name = input.get('db')1766            if ( db_name not in EDITABLE_DBS ) or ( not db_name in EDITABLE_DBS ):1767                return html_wrong_db(results_dic)1768            session.db = db_name1769        else:1770            db_name = EDITABLE_DBS[0]1771        submiter_id = session.userid1772        results_dic = self.results_dic1773        needed_keys = []1774        env_db = ENVS[db_name]1775        dbd_obj = dbd_objs[db_name]1776        sql_obj = lm.sql(env_db)1777        pri_field = env_db['PRI_FIELD']1778        tables = dbd_obj.tables1779        # keys of the molecular properties that can be calculated (with out fingerprints keys)1780        calculable_keys = lm.mymol.mol_data_keys() + lm.mymol.mol_stat_keys()1781        for k in dbd_obj.comments_and_keys_list(1782                [env_db['FP_TABLE']],1783                [env_db['2D_STRUC_KEY'],1784                env_db['SUBMITER_KEY']] + env_db['PRI_KEYS']):1785            if not k[1] in calculable_keys:1786                if not k[0]:1787                    k[0] = k[1]1788                needed_keys.append(k)1789        # uniq needed_keys1790        tmpl = []1791        for i in needed_keys:1792            if not i in tmpl:1793                tmpl.append(i)1794        needed_keys = tmpl1795        # edit an exist molecule1796        if input.has_key('mol_id'):1797            mol_id = query_preprocessing(str(input.get('mol_id')))1798            if not re.match('^[0-9]+$', mol_id):1799                return html_query_illegal(results_dic)1800            db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1801            query_string = 'SELECT * FROM %s WHERE ( %s = %s ) LIMIT 1;' % (1802                    sql_obj.gen_join_part_sql(tables, pri_field),1803                    tables[0] + '.' + pri_field,1804                    mol_id )1805            results = db_obj.execute(query_string)1806            db_obj.close()1807            # check if the molecule exists and if the user has permission to edit it.1808            if results:1809                results = results[0]1810                results_dic['mol_id'] = mol_id1811                submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1812                if results.has_key(submiter_field):1813                    submiter_id = results.get(submiter_field)1814                # check the edit molecule permission (mast has 'u' in his permission list)1815                if not is_permited(session.userid, session.usergroup, ('u'), submiter_id):1816                    results_dic['info'] += ['can not edit mol', ': ' + mol_id]1817                    results_dic['mol_id'] = ''1818                else:1819                    # congrats, you can edit the molecule, and we then prepare the mol info for display.1820                    # if you can edit the molecule, then you can also delete it1821                    results_dic['show_del'] = True1822                    # change the decimal submiter id to nick name1823                    env_dbuser = ENVS[USERSDB]1824                    dbuser_obj = lm.database(env_dbuser['HOST'], env_dbuser['USER'], env_dbuser['PASSWORD'], env_dbuser['DBNAME'])1825                    results_user = dbuser_obj.execute('SELECT %s FROM %s WHERE (%s = %s) LIMIT 1;' % (1826                        env_dbuser['NICK_FIELD'],1827                        env_dbuser['USERS_TABLE'],1828                        env_dbuser['USER_ID_FIELD'],1829                        submiter_id) )1830                    dbuser_obj.close()1831                    if results_user and results_user[0].has_key(env_dbuser['NICK_FIELD']):1832                        results_dic['submiter_nick'] = results_user[0][env_dbuser['NICK_FIELD']]1833                    # uncompress compressed entries1834                    compressed_fields = [ dbd_obj.get_field(k) for k in env_db['COMPRESSED_KEYS'] ]1835                    for i in compressed_fields:1836                        if i == dbd_obj.get_field(env_db['2D_STRUC_KEY']):1837                            if session.removeh:1838                                mol = results[i] = lm.mymol('mol', zlib.decompress(results[i])).removeh()1839                            else:1840                                mol = results[i] = zlib.decompress(results[i])1841                            results_dic['mol'] = mol1842                        else:1843                            results[i] = zlib.decompress(results[i])1844                    # store the mol info into needed_keys1845                    for i in needed_keys:1846                        field = dbd_obj.get_field(i[1])1847                        if field:1848                            for k, v in results.items():1849                                if field == k:1850                                    i.append(v)1851                                    continue1852            else:1853                # molecule with the id of the given mol_id dose not exist, change mode to "add mol"1854                results_dic['info'] += ['no mol to show', ': ' + mol_id]1855                results_dic['mol_id'] = ''1856        results_dic['needed_tuple'] = needed_keys1857        results_dic['db'] = db_name1858        results_dic['db_selected'] = is_selected(db_name)1859        return html_edit(results_dic)1860    def POST(self, name = ''):1861        input = web.input()1862        values_dict = input.copy()1863        results_dic = self.results_dic1864        # check basic permission, 'i' permits add new molecule, while 'u' edit an exist molecule.1865        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):1866            return html_no_permision(results_dic)1867        # chose which database to use1868        if input.has_key('db') and input.get('db'):1869            db_name = input.get('db')1870            session.db = db_name1871        else:1872            db_name = EDITABLE_DBS[0]1873        if ( db_name not in EDITABLE_DBS ) or ( not db_name in EDITABLE_DBS ):1874            return html_wrong_db(results_dic)1875        env_db = ENVS[db_name]1876        sql_obj = lm.sql(env_db)1877        dbd_obj = dbd_objs[db_name]1878        db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])1879        pri_field = env_db['PRI_FIELD']1880        tables = dbd_obj.tables1881        mol_id = ''1882        smiles = ''1883        add_mol = False1884        submiter_id = session.userid1885        submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1886        try:1887            # smiles from the web input is needed1888            if not (input.has_key('SMILES') and input.get('SMILES')):1889                results_dic['info'].append('needed entry imcomplete')1890                results_dic['html_title'] = 'needed entry imcomplete'1891                return html_info(results_dic)1892            else:1893                smiles = query_preprocessing(str(input.get('SMILES')))1894                if input.has_key('mol_id') and input.get('mol_id'):1895                    mol_id = query_preprocessing(input.get('mol_id'))1896                    if not re.match('^[0-9]+$', mol_id):1897                        return html_query_illegal(results_dic)1898                else:1899                    try:1900                        # the id of new added mol is the max id plus one.1901                        mol_id = db_obj.execute('SELECT max(%s) AS max_mol_id FROM %s;' %(pri_field, tables[0]))[0]['max_mol_id']1902                        mol_id = str(int(mol_id) + 1)1903                        # add new molecule. those has permission of executing 'i' action can do this.1904                        add_mol = True1905                    except:1906                        pass1907                if not (mol_id and smiles):1908                    results_dic['info'].append('needed entry imcomplete')1909                    results_dic['html_title'] = 'needed entry imcomplete'1910                    return html_info(results_dic)1911                else:1912                    # check if one has the permission to edit the molecule ( user not in group 1 can only1913                    # insert new ones or update those submited by himself)1914                    if not add_mol:1915                        # if you are here, then you are editing an exist molecule, only those1916                        # has the permission of executing 'u' action can do this.1917                        # find the submiter of the molecule with the id of mol_id1918                        query_string = 'SELECT %s FROM %s WHERE ( %s = %s ) LIMIT 1;' % (1919                                submiter_field,1920                                dbd_obj.get_table_name(env_db['SUBMITER_KEY']),1921                                pri_field,1922                                mol_id )1923                        results = db_obj.execute(query_string)1924                        if results:1925                            results = results[0]1926                            if results.has_key(submiter_field):1927                                submiter_id = results.get(submiter_field)1928                        # check the edit exist molecule permission1929                        if not is_permited(session.userid, session.usergroup, ('u'), submiter_id):1930                            results_dic['html_title'] = 'permission denied'1931                            results_dic['info'] += ['<br/>', 'can not edit mol', ': ' + mol_id]1932                            return html_info(results_dic)1933                    for id_key in env_db['PRI_KEYS']:1934                        values_dict[id_key] = mol_id1935                    # the submiter id should keep still1936                    # caculates the caculable properties1937                    tmp_dict = {}1938                    mol = lm.mymol('smi', smiles)1939                    mol.get_mol_data(tmp_dict)1940                    mol.get_fps(tmp_dict)1941                    mol.get_mol_stat(tmp_dict)1942                    values_dict.update(tmp_dict)1943                    # store the new info of the mol1944                    for sql_string in sql_obj.gen_insert_sqls(values_dict, 'REPLACE').values():1945                       db_obj.execute(sql_string + ';')1946                    db_obj.close()1947                    results_dic['html_title'] = 'edit finished'1948                    results_dic['info'] += ['edit mol finished',1949                            '<a href="%s?db=%s&mol_id=%s">%s</a>' %(URLS_DIC['molinfo_url'], db_name, mol_id, mol_id)]1950                    return html_info(results_dic)1951        except Exception, e:1952            results_dic['info'].append('check your query')1953            results_dic['info'].append(e)1954            results_dic['html_title'] = 'query err'1955            return html_info(results_dic)1956class delmol:1957    '''1958    this class deletes the molecule from the database1959    '''1960    def __init__(self):1961        trans_class.lang = session.lang1962        self.results_dic = tpl_results_dic.copy()1963        self.results_dic.update( {1964            'logged_user': session.nickname,1965            'trans_class': trans_class,1966            'html_title': 'del mol',1967            'info': [],1968            'current_page': 'search',1969            'lang': session.lang1970            } )1971    def GET(self, name = ''):1972        input = web.input()1973        results_dic = self.results_dic1974        available_dbs = PUBLIC_DBS1975        if session.authorized:1976            available_dbs = DATABASES1977            self.results_dic.update({'dbs': available_dbs})1978        # check basic permission1979        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):1980            return html_no_permision(results_dic)1981        # chose which database to use1982        if input.has_key('db') and input.get('db'):1983            db_name = input.get('db')1984            if ( db_name not in available_dbs ) or ( not db_name in EDITABLE_DBS ):1985                return html_wrong_db(results_dic)1986            session.db = db_name1987        else:1988            db_name = session.db1989        env_db = ENVS[db_name]1990        sql_obj = lm.sql(env_db)1991        dbd_obj = dbd_objs[db_name]1992        pri_field = env_db['PRI_FIELD']1993        tables = dbd_obj.tables1994        submiter_id = session.userid1995        submiter_field = dbd_obj.get_field(env_db['SUBMITER_KEY'])1996        empty_mol_list = []1997        del_failed_mol_list = []1998        del_finished_mol_list = []1999        if input.has_key('mol_id') and input.get('mol_id'):2000            mol_id_list = [ id for id in query_preprocessing(input.get('mol_id')).split('|') if id ]2001            for id in mol_id_list:2002                if not re.match('^[0-9]+$', id):2003                    return html_query_illegal(results_dic)2004            db_obj = lm.database(env_db['HOST'], env_db['USER'], env_db['PASSWORD'], env_db['DBNAME'])2005            for mol_id in mol_id_list:2006                submiter_id = ''2007                # find the submiter of the molecule with the id of mol_id2008                query_string = 'SELECT %s FROM %s WHERE ( %s = %s ) LIMIT 1;' % (2009                        submiter_field,2010                        dbd_obj.get_table_name(env_db['SUBMITER_KEY']),2011                        pri_field,2012                        mol_id )2013                results = db_obj.execute(query_string)2014                if results:2015                    results = results[0]2016                    if results.has_key(submiter_field):2017                        submiter_id = results.get(submiter_field)2018                else:2019                    empty_mol_list.append(mol_id)2020                    continue2021                # check if you can delete a molecule2022                if not is_permited(session.userid, session.usergroup, ('u'), submiter_id):2023                    del_failed_mol_list.append(mol_id)2024                    continue2025                else:2026                    for t in tables:2027                        db_obj.execute("DELETE FROM %s WHERE (%s = %s);" %(t, pri_field, mol_id))2028                    del_finished_mol_list.append(mol_id)2029            db_obj.close()2030            results_dic['html_title'] = 'del report'2031            if del_failed_mol_list:2032                # report the deleting failed molecules2033                mol_url_list = []2034                for id in del_failed_mol_list:2035                    mol_url_list.append('<br/><a href="%s?db=%s&mol_id=%s">%s</a>' %(URLS_DIC['molinfo_url'], db_name, id, id))2036                results_dic['info'] += ['del mol failed', ':', '\n'.join(mol_url_list)]2037            if empty_mol_list:2038                for id in empty_mol_list:2039                    results_dic['info'] += ['<br/>', 'empty mol entry', '<br/>' + id]2040            if del_finished_mol_list:2041                mol_url_list = []2042                for id in del_finished_mol_list:2043                    results_dic['info'] += ['<br/>', 'del mol', id, 'completed']2044            return html_info(results_dic)2045        else:2046            results_dic['info'].append('mol id not post')2047            results_dic['html_title'] = 'needed entry imcomplete'2048            return html_info(results_dic)2049class mymols:2050    def __init__(self):2051        trans_class.lang = session.lang2052        self.results_dic = tpl_results_dic.copy()2053        self.results_dic.update( {2054            'logged_user': session.nickname,2055            'trans_class': trans_class,2056            'info': [],2057            'lang': session.lang2058            } )2059    def GET(self, name = ''):2060        input = web.input()2061        results_dic = self.results_dic2062        available_dbs = PUBLIC_DBS2063        if session.authorized:2064            available_dbs = DATABASES2065            self.results_dic.update({'dbs': available_dbs})2066        # check basic permission2067        if ( not session.authorized ) or ( not is_permited(session.userid, session.usergroup, ('s', 'i')) ):2068            return html_no_permision(results_dic)2069        # chose which database to use2070        if input.has_key('db') and input.get('db'):2071            db_name = input.get('db')2072            if db_name not in available_dbs:2073                return html_wrong_db(results_dic)2074            session.db = db_name2075        else:2076            db_name = session.db2077        env_db = ENVS[db_name]2078        submiter_abbr = env_db['SUBMITER_ABBR']2079        return web.seeother('%s?mode=%s&db=%s&prequery=%s' %(2080            URLS_DIC['search_url'], '4', db_name, submiter_abbr + '+%3D+' + str(session.userid)))2081class doc:2082    def __init__(self):2083        # set the target language to session.lang2084        trans_class.lang = session.lang2085        # docments2086        self.results_dic = tpl_results_dic.copy()2087        self.results_dic.update( {2088            'logged_user': session.nickname,2089            'trans_class': trans_class,2090            'html_title': 'mol info',2091            'info': [],2092            'current_page': 'help',2093            'lang': session.lang2094            } )2095        def doc_render(lang, tpl_name, **kwargs):2096            return DOC_RENDER(lang, tpl_name, **kwargs)2097        self.doc_render = doc_render2098    def GET(self, name = ''):2099        input = web.input()2100        results_dic = self.results_dic2101        results_dic['html_title'] = 'help'2102        info_head = ''2103        doc_id = ''2104        # check basic permission2105        if not is_permited(session.userid, session.usergroup, ('s')):2106            return html_no_permision(results_dic)2107        if input.has_key('doc_id') and input.get('doc_id'):2108            doc_id = input.get('doc_id')2109            if doc_id == 'about':2110                results_dic['current_page'] = 'about'2111                results_dic['html_title'] = 'about'2112            f = DOCUMENTS_PATH + '/' + session.lang + '/' + '.html'2113            if not os.path.exists(f):2114                f = DOCUMENTS_PATH + '/' + DEFAULT_LANG + '/' + doc_id + '.html'2115            if os.path.exists(f):2116                info_head = doc_id2117                tmp_line = open(f).readline().rstrip('\n')2118                if re.findall(r'^ *##', tmp_line):2119                    info_head = re.sub(r'^ *##', '', tmp_line)2120                results_dic['info_head'] = info_head2121            else:2122                results_dic['info'].append('failed to open document')2123                results_dic['html_title'] = 'query err'2124                return html_info(results_dic)2125            try:2126                return WEB_RENDER('header', results_dic = results_dic) + \2127                        WEB_RENDER('info', results_dic = results_dic) + \2128                        self.doc_render(session.lang, doc_id) + \2129                        WEB_RENDER('footer', results_dic = results_dic)2130            except SyntaxError:2131                results_dic['info'].append('failed to open document')2132                results_dic['html_title'] = 'query err'2133                return html_info(results_dic)2134            except:2135                try:2136                    return WEB_RENDER('header', results_dic = results_dic) + \2137                            WEB_RENDER('info', results_dic = results_dic) + \2138                            self.doc_render(DEFAULT_LANG, doc_id) + \2139                            WEB_RENDER('footer', results_dic = results_dic)2140                except AttributeError:2141                    results_dic['info'].append('check your query')2142                    results_dic['html_title'] = 'query err'2143                    return html_info(results_dic)2144        else:2145                results_dic['info_head'] = 'documents'2146                help_urls = []2147                title = ''2148                for i in os.listdir(DOCUMENTS_PATH + '/' + session.lang):2149                    try:2150                        f = DOCUMENTS_PATH + '/' + session.lang + '/' + i2151                        title = re.sub('.html$', '', i)2152                        tmp_line = open(f).readline().rstrip('\n')2153                        if re.findall(r'^ *##', tmp_line):2154                            title = re.sub(r'^ *##', '', tmp_line)2155                        help_urls.append((title, URLS_DIC['doc_url'] + '?doc_id=' + re.sub('.html$', '', i)))2156                    except:2157                        pass2158                return WEB_RENDER('header', results_dic = results_dic) + \2159                        WEB_RENDER('info', results_dic = results_dic) + \2160                        WEB_RENDER('doc_links', links = help_urls) + \2161                        WEB_RENDER('footer', results_dic = results_dic)2162class chsettings:2163    '''change setting such as language, search mode and database'''2164    def GET(self, name = ''):2165        input = web.input()2166        info = ''2167        if input.has_key('lang') and input.get('lang'):2168            lang = input.get('lang')2169            if lang in LANGS:2170                session.lang = lang2171                info += 'change language OK.\n'2172        if input.has_key('mode') and input.get('mode'):2173            mode = input.get('mode')2174            if mode in SEARCH_MODES:2175                session.search_mode = mode2176                info += 'change mode OK.\n'2177        if input.has_key('db') and input.get('db'):2178            db = input.get('db')2179            if db in available_dbs:2180                session.db = db2181                info += 'change database OK.\n'2182        return info2183# run site2184if __name__ == "__main__":...test_compare_pop_report.py
Source:test_compare_pop_report.py  
...267        results = get_comparing_populations_report(testParam)268        self.assertEqual(len(results['records']), 4)269        self.assertEqual(results['records'][1]['results']['subject1']['total'], 11)270        self.assertEqual(results['records'][1]['results']['subject2']['total'], -1)271    def test_filters_with_no_results(self):272        testParam = {}273        testParam[Constants.STATECODE] = 'NC'274        testParam[Constants.ASMTYEAR] = 2016275        testParam[filters.FILTERS_PROGRAM_504] = ['NS']276        testParam[filters.FILTERS_PROGRAM_IEP] = ['NS']277        results = get_comparing_populations_report(testParam)278        self.assertEqual(len(results['records']), 2)279    def test_district_view_with_grades(self):280        testParam = {}281        testParam[Constants.STATECODE] = 'NC'282        testParam[Constants.DISTRICTGUID] = '229'283        testParam[Constants.ASMTYEAR] = 2016284        testParam[filters.FILTERS_GRADE] = ['03']285        results = get_comparing_populations_report(testParam)286        self.assertEqual(results['records'][0]['results']['subject1']['total'], 0)287        self.assertEqual(len(results['records']), 1)288    def test_view_with_multi_grades(self):289        testParam = {}290        testParam[Constants.STATECODE] = 'NC'291        testParam[Constants.DISTRICTGUID] = '229'292        testParam[Constants.ASMTYEAR] = 2016293        testParam[filters.FILTERS_GRADE] = ['03', '06', '07', '11']294        results = get_comparing_populations_report(testParam)295        self.assertEqual(results['records'][0]['results']['subject1']['total'], 0)296        self.assertEqual(results['records'][1]['results']['subject1']['total'], 7)297        self.assertEqual(len(results['records']), 3)298    def test_view_with_lep_yes(self):299        testParam = {}300        testParam[Constants.STATECODE] = 'NC'301        testParam[Constants.DISTRICTGUID] = '0513ba44-e8ec-4186-9a0e-8481e9c16206'302        testParam[filters.FILTERS_PROGRAM_LEP] = ['Y']303        testParam[Constants.ASMTYEAR] = 2015304        results = get_comparing_populations_report(testParam)305        self.assertEqual(len(results['records']), 4)306        self.assertEqual(results['records'][0]['results']['subject1']['total'], 3)307    def test_view_with_lep_no(self):308        testParam = {}309        testParam[Constants.STATECODE] = 'NC'310        testParam[Constants.DISTRICTGUID] = '0513ba44-e8ec-4186-9a0e-8481e9c16206'311        testParam[filters.FILTERS_PROGRAM_LEP] = ['N']312        testParam[Constants.ASMTYEAR] = 2015313        results = get_comparing_populations_report(testParam)314        self.assertEqual(len(results['records']), 4)315        self.assertEqual(results['records'][1]['results']['subject1']['total'], 53)316    def test_view_with_lep_multi(self):317        testParam = {}318        testParam[Constants.STATECODE] = 'NC'319        testParam[Constants.DISTRICTGUID] = '0513ba44-e8ec-4186-9a0e-8481e9c16206'320        testParam[filters.FILTERS_PROGRAM_LEP] = ['N', 'Y', 'NS']321        testParam[Constants.ASMTYEAR] = 2015322        results = get_comparing_populations_report(testParam)323        self.assertEqual(len(results['records']), 4)324        self.assertEqual(results['records'][1]['results']['subject1']['total'], 55)325    def test_comparing_populations_min_cell_size(self):326        testParam = {}327        testParam[Constants.STATECODE] = 'NC'328        testParam[Constants.DISTRICTGUID] = '229'329        testParam[filters.FILTERS_ETHNICITY] = [filters.FILTERS_ETHNICITY_HISPANIC]330        # TODO: Fix this when metadata has the correct value set331        # We probably don't need to set the default min cell size after we set a value in csv332#        set_default_min_cell_size(5)333#        results = get_comparing_populations_report(testParam)334#        self.assertEqual(len(results['records']), 3)335#        # total must be filtered out336#        self.assertEqual(results['records'][0]['results']['subject1']['total'], -1)337#        self.assertEqual(results['records'][0]['results']['subject1']['intervals'][0]['percentage'], -1)338#        self.assertEqual(results['records'][0]['results']['subject1']['intervals'][1]['percentage'], -1)339#        self.assertEqual(results['records'][0]['results']['subject1']['intervals'][2]['percentage'], -1)340#        self.assertEqual(results['records'][0]['results']['subject1']['intervals'][3]['percentage'], -1)341#        set_default_min_cell_size(0)342    def test_comparing_populations_with_sex(self):343        testParam = {}344        testParam[Constants.STATECODE] = 'NC'345        testParam[Constants.DISTRICTGUID] = '229'346        testParam[Constants.ASMTYEAR] = 2016347        testParam[filters.FILTERS_SEX] = [filters.FILTERS_SEX_MALE]348        results = get_comparing_populations_report(testParam)349        self.assertEqual(len(results['records']), 2)350        self.assertEqual(results['records'][0]['results']['subject1']['total'], -1)351        self.assertEqual(results['records'][0]['results']['subject2']['total'], 5)352        self.assertEqual(results['records'][1]['results']['subject1']['total'], -1)353        self.assertEqual(results['records'][1]['results']['subject2']['total'], 0)354    def test_comparing_populations_with_sex_not_stated(self):355        testParam = {}356        testParam[Constants.STATECODE] = 'NC'357        testParam[Constants.DISTRICTGUID] = '229'358        testParam[filters.FILTERS_SEX] = [filters.FILTERS_SEX_NOT_STATED]359        results = get_comparing_populations_report(testParam)360        self.assertEqual(len(results['records']), 1)361        self.assertEqual(results['records'][0]['results']['subject1']['total'], -1)362        self.assertEqual(results['records'][0]['results']['subject2']['total'], 0)363    def test_comparing_populations_with_not_stated_count(self):364        testParam = {}365        testParam[Constants.STATECODE] = 'NC'366        testParam[Constants.DISTRICTGUID] = '229'367        results = get_comparing_populations_report(testParam)368        self.assertEqual(results['not_stated']['total'], 29)369        self.assertEqual(results['not_stated']['dmgPrg504'], 2)370        self.assertEqual(results['not_stated']['dmgPrgIep'], 2)371        self.assertEqual(results['not_stated']['dmgPrgLep'], 0)372        self.assertEqual(results['not_stated']['dmgStsMig'], 1)373        self.assertEqual(results['not_stated']['ethnicity'], 0)374        self.assertEqual(results['not_stated']['sex'], 1)375    def test_filter_with_unfiltered_results(self):376        testParam = {}377        testParam[Constants.STATECODE] = 'NC'378        testParam[Constants.DISTRICTGUID] = '229'379        testParam[Constants.ASMTYEAR] = 2016380        testParam[filters.FILTERS_SEX] = [filters.FILTERS_SEX_MALE]381        results = get_comparing_populations_report(testParam)382        self.assertEqual(len(results['records']), 2)383        self.assertEqual(results['records'][0]['results']['subject1']['unfilteredTotal'], 9)384        self.assertEqual(results['records'][0]['results']['subject2']['unfilteredTotal'], 9)385        self.assertEqual(results['records'][1]['results']['subject1']['unfilteredTotal'], 3)386        self.assertEqual(results['records'][1]['results']['subject2']['unfilteredTotal'], -1)387        self.assertEqual(results['summary'][0]['results']['subject1']['unfilteredTotal'], 14)388        self.assertEqual(results['summary'][0]['results']['subject2']['unfilteredTotal'], 14)389    def test_get_merged_report_records(self):390        summative = {'records': [{'id': 'a', 'name': 'a', 'type': 'sum',391                                  'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},392                                 {'id': 'b', 'name': 'b', 'type': 'sum',393                                  'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],394                     'subjects': {'a': 'a'}, 'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}395        interim = {'records': [{'id': 'a', 'name': 'a', 'type': 'int',396                                'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},397                               {'id': 'b', 'name': 'b', 'type': 'int',398                                'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],399                   'subjects': {'a': 'a'}}400        results = get_merged_report_records(summative, interim)401        self.assertEqual(len(results), 2)402        self.assertEqual(results[0]['type'], 'sum')403        self.assertEqual(results[0]['name'], 'a')404        self.assertEqual(results[1]['type'], 'sum')405        self.assertEqual(results[1]['name'], 'b')406    def test_get_merged_report_records_with_no_summative(self):407        summative = {'records': [],408                     'subjects': {'a': 'a'},409                     'summary': [{'results': {'a': {'intervals': [{'percentage': 10}]}}}]}410        interim = {'records': [{'id': 'a', 'name': 'a', 'type': 'int',411                                'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},412                               {'id': 'b', 'name': 'b', 'type': 'int',413                                'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],414                   'subjects': {'a': 'a'}}415        results = get_merged_report_records(summative, interim)416        self.assertEqual(len(results), 2)417        self.assertEqual(results[0]['type'], 'int')418        self.assertEqual(results[0]['name'], 'a')419        self.assertEqual(results[1]['type'], 'int')420        self.assertEqual(results[1]['name'], 'b')421        self.assertEqual(results[0]['results']['a']['intervals'][0]['percentage'], -1)422        self.assertEqual(results[0]['results']['a']['intervals'][0]['count'], -1)423    def test_get_merged_report_records_with_mixed_asmt_types(self):424        summative = {'records': [{'id': 'b', 'name': 'b', 'type': 'sum',425                                  'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],426                     'subjects': {'a': 'a'},427                     'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}428        interim = {'records': [{'id': 'a', 'name': 'a', 'type': 'int',429                                'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}},430                               {'id': 'b', 'name': 'b', 'type': 'int',431                                'results': {'a': {'total': 3, 'intervals': [{'percentage': 100}]}}}],432                   'subjects': {'a': 'a'}}433        results = get_merged_report_records(summative, interim)434        self.assertEqual(len(results), 2)435        self.assertEqual(results[0]['type'], 'int')436        self.assertEqual(results[0]['name'], 'a')437        self.assertEqual(results[1]['type'], 'sum')438        self.assertEqual(results[1]['name'], 'b')439    def test_get_merged_report_records_with_interim(self):440        summative = {'records': [{'id': 'b', 'name': 'b', 'type': 'sum',441                                  'results': {'a': {'total': 0, 'intervals': [{'percentage': 0}]}}}],442                     'subjects': {'a': 'a'},443                     'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}444        interim = {'records': [{'id': 'b', 'name': 'b', 'type': 'int',445                                'results': {'a': {'total': -1, 'hasInterim': True, 'intervals': [{'percentage': -1}]}}}],446                   'subjects': {'a': 'a'}}447        results = get_merged_report_records(summative, interim)448        self.assertEqual(len(results), 1)449        self.assertEqual(results[0]['type'], 'sum')450        self.assertEqual(results[0]['name'], 'b')451        self.assertEqual(results[0]['results']['a']['hasInterim'], True)452    def test_get_merged_report_records_with_no_results(self):453        summative = {'records': [{'id': 'b', 'name': 'b', 'type': 'sum',454                                  'results': {'a': {'total': 0, 'intervals': [{'percentage': 0}]}}}],455                     'subjects': {'a': 'a'},456                     'summary': [{'results': {'a': {'intervals': [{'percentage': 100}]}}}]}457        interim = {'records': [{'id': 'b', 'name': 'b', 'type': 'int',458                                'results': {'a': {'total': 3, 'hasInterim': True, 'intervals': [{'percentage': 100}]}}}],459                   'subjects': {'a': 'a'}}460        results = get_merged_report_records(summative, interim)461        self.assertEqual(len(results), 1)462        self.assertEqual(results[0]['type'], 'sum')463        self.assertEqual(results[0]['name'], 'b')464        self.assertEqual(results[0]['results']['a']['hasInterim'], True)465if __name__ == "__main__":466    # import sys;sys.argv = ['', 'Test.testReport']...json_results_generator.py
Source:json_results_generator.py  
1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4#5# Most of this file was ported over from Blink's6# Tools/Scripts/webkitpy/layout_tests/layout_package/json_results_generator.py7# Tools/Scripts/webkitpy/common/net/file_uploader.py8#9import json10import logging11import mimetypes12import os13import time14import urllib215_log = logging.getLogger(__name__)16_JSON_PREFIX = 'ADD_RESULTS('17_JSON_SUFFIX = ');'18def HasJSONWrapper(string):19  return string.startswith(_JSON_PREFIX) and string.endswith(_JSON_SUFFIX)20def StripJSONWrapper(json_content):21  # FIXME: Kill this code once the server returns json instead of jsonp.22  if HasJSONWrapper(json_content):23    return json_content[len(_JSON_PREFIX):len(json_content) - len(_JSON_SUFFIX)]24  return json_content25def WriteJSON(json_object, file_path, callback=None):26  # Specify separators in order to get compact encoding.27  json_string = json.dumps(json_object, separators=(',', ':'))28  if callback:29    json_string = callback + '(' + json_string + ');'30  with open(file_path, 'w') as fp:31    fp.write(json_string)32def ConvertTrieToFlatPaths(trie, prefix=None):33  """Flattens the trie of paths, prepending a prefix to each."""34  result = {}35  for name, data in trie.iteritems():36    if prefix:37      name = prefix + '/' + name38    if len(data) and not 'results' in data:39      result.update(ConvertTrieToFlatPaths(data, name))40    else:41      result[name] = data42  return result43def AddPathToTrie(path, value, trie):44  """Inserts a single path and value into a directory trie structure."""45  if not '/' in path:46    trie[path] = value47    return48  directory, _, rest = path.partition('/')49  if not directory in trie:50    trie[directory] = {}51  AddPathToTrie(rest, value, trie[directory])52def TestTimingsTrie(individual_test_timings):53  """Breaks a test name into dicts by directory54  foo/bar/baz.html: 1ms55  foo/bar/baz1.html: 3ms56  becomes57  foo: {58      bar: {59          baz.html: 1,60          baz1.html: 361      }62  }63  """64  trie = {}65  for test_result in individual_test_timings:66    test = test_result.test_name67    AddPathToTrie(test, int(1000 * test_result.test_run_time), trie)68  return trie69class TestResult(object):70  """A simple class that represents a single test result."""71  # Test modifier constants.72  (NONE, FAILS, FLAKY, DISABLED) = range(4)73  def __init__(self, test, failed=False, elapsed_time=0):74    self.test_name = test75    self.failed = failed76    self.test_run_time = elapsed_time77    test_name = test78    try:79      test_name = test.split('.')[1]80    except IndexError:81      _log.warn('Invalid test name: %s.', test)82    if test_name.startswith('FAILS_'):83      self.modifier = self.FAILS84    elif test_name.startswith('FLAKY_'):85      self.modifier = self.FLAKY86    elif test_name.startswith('DISABLED_'):87      self.modifier = self.DISABLED88    else:89      self.modifier = self.NONE90  def Fixable(self):91    return self.failed or self.modifier == self.DISABLED92class JSONResultsGeneratorBase(object):93  """A JSON results generator for generic tests."""94  MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG = 75095  # Min time (seconds) that will be added to the JSON.96  MIN_TIME = 197  # Note that in non-chromium tests those chars are used to indicate98  # test modifiers (FAILS, FLAKY, etc) but not actual test results.99  PASS_RESULT = 'P'100  SKIP_RESULT = 'X'101  FAIL_RESULT = 'F'102  FLAKY_RESULT = 'L'103  NO_DATA_RESULT = 'N'104  MODIFIER_TO_CHAR = {TestResult.NONE: PASS_RESULT,105                      TestResult.DISABLED: SKIP_RESULT,106                      TestResult.FAILS: FAIL_RESULT,107                      TestResult.FLAKY: FLAKY_RESULT}108  VERSION = 4109  VERSION_KEY = 'version'110  RESULTS = 'results'111  TIMES = 'times'112  BUILD_NUMBERS = 'buildNumbers'113  TIME = 'secondsSinceEpoch'114  TESTS = 'tests'115  FIXABLE_COUNT = 'fixableCount'116  FIXABLE = 'fixableCounts'117  ALL_FIXABLE_COUNT = 'allFixableCount'118  RESULTS_FILENAME = 'results.json'119  TIMES_MS_FILENAME = 'times_ms.json'120  INCREMENTAL_RESULTS_FILENAME = 'incremental_results.json'121  # line too long pylint: disable=line-too-long122  URL_FOR_TEST_LIST_JSON = (123      'http://%s/testfile?builder=%s&name=%s&testlistjson=1&testtype=%s&master=%s')124  # pylint: enable=line-too-long125  def __init__(self, builder_name, build_name, build_number,126               results_file_base_path, builder_base_url,127               test_results_map, svn_repositories=None,128               test_results_server=None,129               test_type='',130               master_name=''):131    """Modifies the results.json file. Grabs it off the archive directory132    if it is not found locally.133    Args134      builder_name: the builder name (e.g. Webkit).135      build_name: the build name (e.g. webkit-rel).136      build_number: the build number.137      results_file_base_path: Absolute path to the directory containing the138          results json file.139      builder_base_url: the URL where we have the archived test results.140          If this is None no archived results will be retrieved.141      test_results_map: A dictionary that maps test_name to TestResult.142      svn_repositories: A (json_field_name, svn_path) pair for SVN143          repositories that tests rely on.  The SVN revision will be144          included in the JSON with the given json_field_name.145      test_results_server: server that hosts test results json.146      test_type: test type string (e.g. 'layout-tests').147      master_name: the name of the buildbot master.148    """149    self._builder_name = builder_name150    self._build_name = build_name151    self._build_number = build_number152    self._builder_base_url = builder_base_url153    self._results_directory = results_file_base_path154    self._test_results_map = test_results_map155    self._test_results = test_results_map.values()156    self._svn_repositories = svn_repositories157    if not self._svn_repositories:158      self._svn_repositories = {}159    self._test_results_server = test_results_server160    self._test_type = test_type161    self._master_name = master_name162    self._archived_results = None163  def GenerateJSONOutput(self):164    json_object = self.GetJSON()165    if json_object:166      file_path = (167          os.path.join(168              self._results_directory,169              self.INCREMENTAL_RESULTS_FILENAME))170      WriteJSON(json_object, file_path)171  def GenerateTimesMSFile(self):172    times = TestTimingsTrie(self._test_results_map.values())173    file_path = os.path.join(self._results_directory, self.TIMES_MS_FILENAME)174    WriteJSON(times, file_path)175  def GetJSON(self):176    """Gets the results for the results.json file."""177    results_json = {}178    if not results_json:179      results_json, error = self._GetArchivedJSONResults()180      if error:181        # If there was an error don't write a results.json182        # file at all as it would lose all the information on the183        # bot.184        _log.error('Archive directory is inaccessible. Not '185                   'modifying or clobbering the results.json '186                   'file: ' + str(error))187        return None188    builder_name = self._builder_name189    if results_json and builder_name not in results_json:190      _log.debug('Builder name (%s) is not in the results.json file.',191                 builder_name)192    self._ConvertJSONToCurrentVersion(results_json)193    if builder_name not in results_json:194      results_json[builder_name] = (195          self._CreateResultsForBuilderJSON())196    results_for_builder = results_json[builder_name]197    if builder_name:198      self._InsertGenericMetaData(results_for_builder)199    self._InsertFailureSummaries(results_for_builder)200    # Update the all failing tests with result type and time.201    tests = results_for_builder[self.TESTS]202    all_failing_tests = self._GetFailedTestNames()203    all_failing_tests.update(ConvertTrieToFlatPaths(tests))204    for test in all_failing_tests:205      self._InsertTestTimeAndResult(test, tests)206    return results_json207  def SetArchivedResults(self, archived_results):208    self._archived_results = archived_results209  def UploadJSONFiles(self, json_files):210    """Uploads the given json_files to the test_results_server (if the211    test_results_server is given)."""212    if not self._test_results_server:213      return214    if not self._master_name:215      _log.error(216          '--test-results-server was set, but --master-name was not.  Not '217          'uploading JSON files.')218      return219    _log.info('Uploading JSON files for builder: %s', self._builder_name)220    attrs = [('builder', self._builder_name),221             ('testtype', self._test_type),222             ('master', self._master_name)]223    files = [(json_file, os.path.join(self._results_directory, json_file))224             for json_file in json_files]225    url = 'http://%s/testfile/upload' % self._test_results_server226    # Set uploading timeout in case appengine server is having problems.227    # 120 seconds are more than enough to upload test results.228    uploader = _FileUploader(url, 120)229    try:230      response = uploader.UploadAsMultipartFormData(files, attrs)231      if response:232        if response.code == 200:233          _log.info('JSON uploaded.')234        else:235          _log.debug(236              "JSON upload failed, %d: '%s'", response.code, response.read())237      else:238        _log.error('JSON upload failed; no response returned')239    except Exception, err: # pylint: disable=broad-except240      _log.error('Upload failed: %s', err)241      return242  def _GetTestTiming(self, test_name):243    """Returns test timing data (elapsed time) in second244    for the given test_name."""245    if test_name in self._test_results_map:246      # Floor for now to get time in seconds.247      return int(self._test_results_map[test_name].test_run_time)248    return 0249  def _GetFailedTestNames(self):250    """Returns a set of failed test names."""251    return set([r.test_name for r in self._test_results if r.failed])252  def _GetModifierChar(self, test_name):253    """Returns a single char (e.g. SKIP_RESULT, FAIL_RESULT,254    PASS_RESULT, NO_DATA_RESULT, etc) that indicates the test modifier255    for the given test_name.256    """257    if test_name not in self._test_results_map:258      return self.__class__.NO_DATA_RESULT259    test_result = self._test_results_map[test_name]260    if test_result.modifier in self.MODIFIER_TO_CHAR.keys():261      return self.MODIFIER_TO_CHAR[test_result.modifier]262    return self.__class__.PASS_RESULT263  def _get_result_char(self, test_name):264    """Returns a single char (e.g. SKIP_RESULT, FAIL_RESULT,265    PASS_RESULT, NO_DATA_RESULT, etc) that indicates the test result266    for the given test_name.267    """268    if test_name not in self._test_results_map:269      return self.__class__.NO_DATA_RESULT270    test_result = self._test_results_map[test_name]271    if test_result.modifier == TestResult.DISABLED:272      return self.__class__.SKIP_RESULT273    if test_result.failed:274      return self.__class__.FAIL_RESULT275    return self.__class__.PASS_RESULT276  def _GetSVNRevision(self, in_directory):277    """Returns the svn revision for the given directory.278    Args:279      in_directory: The directory where svn is to be run.280    """281    # This is overridden in flakiness_dashboard_results_uploader.py.282    raise NotImplementedError()283  def _GetArchivedJSONResults(self):284    """Download JSON file that only contains test285    name list from test-results server. This is for generating incremental286    JSON so the file generated has info for tests that failed before but287    pass or are skipped from current run.288    Returns (archived_results, error) tuple where error is None if results289    were successfully read.290    """291    results_json = {}292    old_results = None293    error = None294    if not self._test_results_server:295      return {}, None296    results_file_url = (self.URL_FOR_TEST_LIST_JSON %297                        (urllib2.quote(self._test_results_server),298                         urllib2.quote(self._builder_name),299                         self.RESULTS_FILENAME,300                         urllib2.quote(self._test_type),301                         urllib2.quote(self._master_name)))302    try:303      # FIXME: We should talk to the network via a Host object.304      results_file = urllib2.urlopen(results_file_url)305      old_results = results_file.read()306    except urllib2.HTTPError, http_error:307      # A non-4xx status code means the bot is hosed for some reason308      # and we can't grab the results.json file off of it.309      if http_error.code < 400 and http_error.code >= 500:310        error = http_error311    except urllib2.URLError, url_error:312      error = url_error313    if old_results:314      # Strip the prefix and suffix so we can get the actual JSON object.315      old_results = StripJSONWrapper(old_results)316      try:317        results_json = json.loads(old_results)318      except Exception: # pylint: disable=broad-except319        _log.debug('results.json was not valid JSON. Clobbering.')320        # The JSON file is not valid JSON. Just clobber the results.321        results_json = {}322    else:323      _log.debug('Old JSON results do not exist. Starting fresh.')324      results_json = {}325    return results_json, error326  def _InsertFailureSummaries(self, results_for_builder):327    """Inserts aggregate pass/failure statistics into the JSON.328    This method reads self._test_results and generates329    FIXABLE, FIXABLE_COUNT and ALL_FIXABLE_COUNT entries.330    Args:331      results_for_builder: Dictionary containing the test results for a332          single builder.333    """334    # Insert the number of tests that failed or skipped.335    fixable_count = len([r for r in self._test_results if r.Fixable()])336    self._InsertItemIntoRawList(results_for_builder,337                                fixable_count, self.FIXABLE_COUNT)338    # Create a test modifiers (FAILS, FLAKY etc) summary dictionary.339    entry = {}340    for test_name in self._test_results_map.iterkeys():341      result_char = self._GetModifierChar(test_name)342      entry[result_char] = entry.get(result_char, 0) + 1343    # Insert the pass/skip/failure summary dictionary.344    self._InsertItemIntoRawList(results_for_builder, entry,345                                self.FIXABLE)346    # Insert the number of all the tests that are supposed to pass.347    all_test_count = len(self._test_results)348    self._InsertItemIntoRawList(results_for_builder,349                                all_test_count, self.ALL_FIXABLE_COUNT)350  def _InsertItemIntoRawList(self, results_for_builder, item, key):351    """Inserts the item into the list with the given key in the results for352    this builder. Creates the list if no such list exists.353    Args:354      results_for_builder: Dictionary containing the test results for a355          single builder.356      item: Number or string to insert into the list.357      key: Key in results_for_builder for the list to insert into.358    """359    if key in results_for_builder:360      raw_list = results_for_builder[key]361    else:362      raw_list = []363    raw_list.insert(0, item)364    raw_list = raw_list[:self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG]365    results_for_builder[key] = raw_list366  def _InsertItemRunLengthEncoded(self, item, encoded_results):367    """Inserts the item into the run-length encoded results.368    Args:369      item: String or number to insert.370      encoded_results: run-length encoded results. An array of arrays, e.g.371          [[3,'A'],[1,'Q']] encodes AAAQ.372    """373    if len(encoded_results) and item == encoded_results[0][1]:374      num_results = encoded_results[0][0]375      if num_results <= self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG:376        encoded_results[0][0] = num_results + 1377    else:378      # Use a list instead of a class for the run-length encoding since379      # we want the serialized form to be concise.380      encoded_results.insert(0, [1, item])381  def _InsertGenericMetaData(self, results_for_builder):382    """ Inserts generic metadata (such as version number, current time etc)383    into the JSON.384    Args:385      results_for_builder: Dictionary containing the test results for386          a single builder.387    """388    self._InsertItemIntoRawList(results_for_builder,389                                self._build_number, self.BUILD_NUMBERS)390    # Include SVN revisions for the given repositories.391    for (name, path) in self._svn_repositories:392      # Note: for JSON file's backward-compatibility we use 'chrome' rather393      # than 'chromium' here.394      lowercase_name = name.lower()395      if lowercase_name == 'chromium':396        lowercase_name = 'chrome'397      self._InsertItemIntoRawList(results_for_builder,398                                  self._GetSVNRevision(path),399                                  lowercase_name + 'Revision')400    self._InsertItemIntoRawList(results_for_builder,401                                int(time.time()),402                                self.TIME)403  def _InsertTestTimeAndResult(self, test_name, tests):404    """ Insert a test item with its results to the given tests dictionary.405    Args:406      tests: Dictionary containing test result entries.407    """408    result = self._get_result_char(test_name)409    test_time = self._GetTestTiming(test_name)410    this_test = tests411    for segment in test_name.split('/'):412      if segment not in this_test:413        this_test[segment] = {}414      this_test = this_test[segment]415    if not len(this_test):416      self._PopulateResultsAndTimesJSON(this_test)417    if self.RESULTS in this_test:418      self._InsertItemRunLengthEncoded(result, this_test[self.RESULTS])419    else:420      this_test[self.RESULTS] = [[1, result]]421    if self.TIMES in this_test:422      self._InsertItemRunLengthEncoded(test_time, this_test[self.TIMES])423    else:424      this_test[self.TIMES] = [[1, test_time]]425  def _ConvertJSONToCurrentVersion(self, results_json):426    """If the JSON does not match the current version, converts it to the427    current version and adds in the new version number.428    """429    if self.VERSION_KEY in results_json:430      archive_version = results_json[self.VERSION_KEY]431      if archive_version == self.VERSION:432        return433    else:434      archive_version = 3435    # version 3->4436    if archive_version == 3:437      for results in results_json.values():438        self._ConvertTestsToTrie(results)439    results_json[self.VERSION_KEY] = self.VERSION440  def _ConvertTestsToTrie(self, results):441    if not self.TESTS in results:442      return443    test_results = results[self.TESTS]444    test_results_trie = {}445    for test in test_results.iterkeys():446      single_test_result = test_results[test]447      AddPathToTrie(test, single_test_result, test_results_trie)448    results[self.TESTS] = test_results_trie449  def _PopulateResultsAndTimesJSON(self, results_and_times):450    results_and_times[self.RESULTS] = []451    results_and_times[self.TIMES] = []452    return results_and_times453  def _CreateResultsForBuilderJSON(self):454    results_for_builder = {}455    results_for_builder[self.TESTS] = {}456    return results_for_builder457  def _RemoveItemsOverMaxNumberOfBuilds(self, encoded_list):458    """Removes items from the run-length encoded list after the final459    item that exceeds the max number of builds to track.460    Args:461      encoded_results: run-length encoded results. An array of arrays, e.g.462          [[3,'A'],[1,'Q']] encodes AAAQ.463    """464    num_builds = 0465    index = 0466    for result in encoded_list:467      num_builds = num_builds + result[0]468      index = index + 1469      if num_builds > self.MAX_NUMBER_OF_BUILD_RESULTS_TO_LOG:470        return encoded_list[:index]471    return encoded_list472  def _NormalizeResultsJSON(self, test, test_name, tests):473    """ Prune tests where all runs pass or tests that no longer exist and474    truncate all results to maxNumberOfBuilds.475    Args:476      test: ResultsAndTimes object for this test.477      test_name: Name of the test.478      tests: The JSON object with all the test results for this builder.479    """480    test[self.RESULTS] = self._RemoveItemsOverMaxNumberOfBuilds(481        test[self.RESULTS])482    test[self.TIMES] = self._RemoveItemsOverMaxNumberOfBuilds(483        test[self.TIMES])484    is_all_pass = self._IsResultsAllOfType(test[self.RESULTS],485                                           self.PASS_RESULT)486    is_all_no_data = self._IsResultsAllOfType(test[self.RESULTS],487                                              self.NO_DATA_RESULT)488    max_time = max([test_time[1] for test_time in test[self.TIMES]])489    # Remove all passes/no-data from the results to reduce noise and490    # filesize. If a test passes every run, but takes > MIN_TIME to run,491    # don't throw away the data.492    if is_all_no_data or (is_all_pass and max_time <= self.MIN_TIME):493      del tests[test_name]494  # method could be a function pylint: disable=R0201495  def _IsResultsAllOfType(self, results, result_type):496    """Returns whether all the results are of the given type497    (e.g. all passes)."""498    return len(results) == 1 and results[0][1] == result_type499class _FileUploader(object):500  def __init__(self, url, timeout_seconds):501    self._url = url502    self._timeout_seconds = timeout_seconds503  def UploadAsMultipartFormData(self, files, attrs):504    file_objs = []505    for filename, path in files:506      with file(path, 'rb') as fp:507        file_objs.append(('file', filename, fp.read()))508    # FIXME: We should use the same variable names for the formal and actual509    # parameters.510    content_type, data = _EncodeMultipartFormData(attrs, file_objs)511    return self._UploadData(content_type, data)512  def _UploadData(self, content_type, data):513    start = time.time()514    end = start + self._timeout_seconds515    while time.time() < end:516      try:517        request = urllib2.Request(self._url, data,518                                  {'Content-Type': content_type})519        return urllib2.urlopen(request)520      except urllib2.HTTPError as e:521        _log.warn("Received HTTP status %s loading \"%s\".  "522                  'Retrying in 10 seconds...', e.code, e.filename)523        time.sleep(10)524def _GetMIMEType(filename):525  return mimetypes.guess_type(filename)[0] or 'application/octet-stream'526# FIXME: Rather than taking tuples, this function should take more527# structured data.528def _EncodeMultipartFormData(fields, files):529  """Encode form fields for multipart/form-data.530  Args:531    fields: A sequence of (name, value) elements for regular form fields.532    files: A sequence of (name, filename, value) elements for data to be533           uploaded as files.534  Returns:535    (content_type, body) ready for httplib.HTTP instance.536  Source:537    http://code.google.com/p/rietveld/source/browse/trunk/upload.py538  """539  BOUNDARY = '-M-A-G-I-C---B-O-U-N-D-A-R-Y-'540  CRLF = '\r\n'541  lines = []542  for key, value in fields:543    lines.append('--' + BOUNDARY)544    lines.append('Content-Disposition: form-data; name="%s"' % key)545    lines.append('')546    if isinstance(value, unicode):547      value = value.encode('utf-8')548    lines.append(value)549  for key, filename, value in files:550    lines.append('--' + BOUNDARY)551    lines.append('Content-Disposition: form-data; name="%s"; '552                 'filename="%s"' % (key, filename))553    lines.append('Content-Type: %s' % _GetMIMEType(filename))554    lines.append('')555    if isinstance(value, unicode):556      value = value.encode('utf-8')557    lines.append(value)558  lines.append('--' + BOUNDARY + '--')559  lines.append('')560  body = CRLF.join(lines)561  content_type = 'multipart/form-data; boundary=%s' % BOUNDARY...page_test_results_unittest.py
Source:page_test_results_unittest.py  
1# Copyright 2014 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4import os5import unittest6from telemetry import benchmark7from telemetry import story8from telemetry.internal.results import base_test_results_unittest9from telemetry.internal.results import chart_json_output_formatter10from telemetry.internal.results import json_output_formatter11from telemetry.internal.results import page_test_results12from telemetry import page as page_module13from telemetry.testing import stream14from telemetry.value import failure15from telemetry.value import histogram16from telemetry.value import improvement_direction17from telemetry.value import scalar18from telemetry.value import skip19from telemetry.value import trace20from tracing.trace_data import trace_data21class PageTestResultsTest(base_test_results_unittest.BaseTestResultsUnittest):22  def setUp(self):23    story_set = story.StorySet(base_dir=os.path.dirname(__file__))24    story_set.AddStory(page_module.Page("http://www.bar.com/", story_set, story_set.base_dir))25    story_set.AddStory(page_module.Page("http://www.baz.com/", story_set, story_set.base_dir))26    story_set.AddStory(page_module.Page("http://www.foo.com/", story_set, story_set.base_dir))27    self.story_set = story_set28  @property29  def pages(self):30    return self.story_set.stories31  def testFailures(self):32    results = page_test_results.PageTestResults()33    results.WillRunPage(self.pages[0])34    results.AddValue(35        failure.FailureValue(self.pages[0], self.CreateException()))36    results.DidRunPage(self.pages[0])37    results.WillRunPage(self.pages[1])38    results.DidRunPage(self.pages[1])39    self.assertEqual(set([self.pages[0]]), results.pages_that_failed)40    self.assertEqual(set([self.pages[1]]), results.pages_that_succeeded)41    self.assertEqual(2, len(results.all_page_runs))42    self.assertTrue(results.all_page_runs[0].failed)43    self.assertTrue(results.all_page_runs[1].ok)44  def testSkips(self):45    results = page_test_results.PageTestResults()46    results.WillRunPage(self.pages[0])47    results.AddValue(skip.SkipValue(self.pages[0], 'testing reason'))48    results.DidRunPage(self.pages[0])49    results.WillRunPage(self.pages[1])50    results.DidRunPage(self.pages[1])51    self.assertTrue(results.all_page_runs[0].skipped)52    self.assertEqual(self.pages[0], results.all_page_runs[0].story)53    self.assertEqual(set([self.pages[0], self.pages[1]]),54                     results.pages_that_succeeded)55    self.assertEqual(2, len(results.all_page_runs))56    self.assertTrue(results.all_page_runs[0].skipped)57    self.assertTrue(results.all_page_runs[1].ok)58  def testBasic(self):59    results = page_test_results.PageTestResults()60    results.WillRunPage(self.pages[0])61    results.AddValue(scalar.ScalarValue(62        self.pages[0], 'a', 'seconds', 3,63        improvement_direction=improvement_direction.UP))64    results.DidRunPage(self.pages[0])65    results.WillRunPage(self.pages[1])66    results.AddValue(scalar.ScalarValue(67        self.pages[1], 'a', 'seconds', 3,68        improvement_direction=improvement_direction.UP))69    results.DidRunPage(self.pages[1])70    results.PrintSummary()71    values = results.FindPageSpecificValuesForPage(self.pages[0], 'a')72    self.assertEquals(1, len(values))73    v = values[0]74    self.assertEquals(v.name, 'a')75    self.assertEquals(v.page, self.pages[0])76    values = results.FindAllPageSpecificValuesNamed('a')77    assert len(values) == 278  def testAddValueWithStoryGroupingKeys(self):79    results = page_test_results.PageTestResults()80    self.pages[0].grouping_keys['foo'] = 'bar'81    self.pages[0].grouping_keys['answer'] = '42'82    results.WillRunPage(self.pages[0])83    results.AddValue(scalar.ScalarValue(84        self.pages[0], 'a', 'seconds', 3,85        improvement_direction=improvement_direction.UP))86    results.DidRunPage(self.pages[0])87    results.PrintSummary()88    values = results.FindPageSpecificValuesForPage(self.pages[0], 'a')89    v = values[0]90    self.assertEquals(v.grouping_keys['foo'], 'bar')91    self.assertEquals(v.grouping_keys['answer'], '42')92    self.assertEquals(v.tir_label, '42_bar')93  def testAddValueWithStoryGroupingKeysAndMatchingTirLabel(self):94    results = page_test_results.PageTestResults()95    self.pages[0].grouping_keys['foo'] = 'bar'96    self.pages[0].grouping_keys['answer'] = '42'97    results.WillRunPage(self.pages[0])98    results.AddValue(scalar.ScalarValue(99        self.pages[0], 'a', 'seconds', 3,100        improvement_direction=improvement_direction.UP,101        tir_label='42_bar'))102    results.DidRunPage(self.pages[0])103    results.PrintSummary()104    values = results.FindPageSpecificValuesForPage(self.pages[0], 'a')105    v = values[0]106    self.assertEquals(v.grouping_keys['foo'], 'bar')107    self.assertEquals(v.grouping_keys['answer'], '42')108    self.assertEquals(v.tir_label, '42_bar')109  def testAddValueWithStoryGroupingKeysAndMismatchingTirLabel(self):110    results = page_test_results.PageTestResults()111    self.pages[0].grouping_keys['foo'] = 'bar'112    self.pages[0].grouping_keys['answer'] = '42'113    results.WillRunPage(self.pages[0])114    with self.assertRaises(AssertionError):115      results.AddValue(scalar.ScalarValue(116          self.pages[0], 'a', 'seconds', 3,117          improvement_direction=improvement_direction.UP,118          tir_label='another_label'))119  def testAddValueWithDuplicateStoryGroupingKeyFails(self):120    results = page_test_results.PageTestResults()121    self.pages[0].grouping_keys['foo'] = 'bar'122    results.WillRunPage(self.pages[0])123    with self.assertRaises(AssertionError):124      results.AddValue(scalar.ScalarValue(125          self.pages[0], 'a', 'seconds', 3,126          improvement_direction=improvement_direction.UP,127          grouping_keys={'foo': 'bar'}))128  def testUrlIsInvalidValue(self):129    results = page_test_results.PageTestResults()130    results.WillRunPage(self.pages[0])131    self.assertRaises(132      AssertionError,133      lambda: results.AddValue(scalar.ScalarValue(134          self.pages[0], 'url', 'string', 'foo',135          improvement_direction=improvement_direction.UP)))136  def testAddSummaryValueWithPageSpecified(self):137    results = page_test_results.PageTestResults()138    results.WillRunPage(self.pages[0])139    self.assertRaises(140      AssertionError,141      lambda: results.AddSummaryValue(scalar.ScalarValue(142          self.pages[0], 'a', 'units', 3,143          improvement_direction=improvement_direction.UP)))144  def testUnitChange(self):145    results = page_test_results.PageTestResults()146    results.WillRunPage(self.pages[0])147    results.AddValue(scalar.ScalarValue(148        self.pages[0], 'a', 'seconds', 3,149        improvement_direction=improvement_direction.UP))150    results.DidRunPage(self.pages[0])151    results.WillRunPage(self.pages[1])152    self.assertRaises(153      AssertionError,154      lambda: results.AddValue(scalar.ScalarValue(155          self.pages[1], 'a', 'foobgrobbers', 3,156          improvement_direction=improvement_direction.UP)))157  def testTypeChange(self):158    results = page_test_results.PageTestResults()159    results.WillRunPage(self.pages[0])160    results.AddValue(scalar.ScalarValue(161        self.pages[0], 'a', 'seconds', 3,162        improvement_direction=improvement_direction.UP))163    results.DidRunPage(self.pages[0])164    results.WillRunPage(self.pages[1])165    self.assertRaises(166      AssertionError,167      lambda: results.AddValue(histogram.HistogramValue(168          self.pages[1], 'a', 'seconds',169          raw_value_json='{"buckets": [{"low": 1, "high": 2, "count": 1}]}',170          improvement_direction=improvement_direction.UP)))171  def testGetPagesThatSucceededAllPagesFail(self):172    results = page_test_results.PageTestResults()173    results.WillRunPage(self.pages[0])174    results.AddValue(scalar.ScalarValue(175        self.pages[0], 'a', 'seconds', 3,176        improvement_direction=improvement_direction.UP))177    results.AddValue(failure.FailureValue.FromMessage(self.pages[0], 'message'))178    results.DidRunPage(self.pages[0])179    results.WillRunPage(self.pages[1])180    results.AddValue(scalar.ScalarValue(181        self.pages[1], 'a', 'seconds', 7,182        improvement_direction=improvement_direction.UP))183    results.AddValue(failure.FailureValue.FromMessage(self.pages[1], 'message'))184    results.DidRunPage(self.pages[1])185    results.PrintSummary()186    self.assertEquals(0, len(results.pages_that_succeeded))187  def testGetSuccessfulPageValuesMergedNoFailures(self):188    results = page_test_results.PageTestResults()189    results.WillRunPage(self.pages[0])190    results.AddValue(scalar.ScalarValue(191        self.pages[0], 'a', 'seconds', 3,192        improvement_direction=improvement_direction.UP))193    self.assertEquals(1, len(results.all_page_specific_values))194    results.DidRunPage(self.pages[0])195  def testGetAllValuesForSuccessfulPages(self):196    results = page_test_results.PageTestResults()197    results.WillRunPage(self.pages[0])198    value1 = scalar.ScalarValue(199        self.pages[0], 'a', 'seconds', 3,200        improvement_direction=improvement_direction.UP)201    results.AddValue(value1)202    results.DidRunPage(self.pages[0])203    results.WillRunPage(self.pages[1])204    value2 = scalar.ScalarValue(205        self.pages[1], 'a', 'seconds', 3,206        improvement_direction=improvement_direction.UP)207    results.AddValue(value2)208    results.DidRunPage(self.pages[1])209    results.WillRunPage(self.pages[2])210    value3 = scalar.ScalarValue(211        self.pages[2], 'a', 'seconds', 3,212        improvement_direction=improvement_direction.UP)213    results.AddValue(value3)214    results.DidRunPage(self.pages[2])215    self.assertEquals(216        [value1, value2, value3], results.all_page_specific_values)217  def testGetAllValuesForSuccessfulPagesOnePageFails(self):218    results = page_test_results.PageTestResults()219    results.WillRunPage(self.pages[0])220    value1 = scalar.ScalarValue(221        self.pages[0], 'a', 'seconds', 3,222        improvement_direction=improvement_direction.UP)223    results.AddValue(value1)224    results.DidRunPage(self.pages[0])225    results.WillRunPage(self.pages[1])226    value2 = failure.FailureValue.FromMessage(self.pages[1], 'Failure')227    results.AddValue(value2)228    results.DidRunPage(self.pages[1])229    results.WillRunPage(self.pages[2])230    value3 = scalar.ScalarValue(231        self.pages[2], 'a', 'seconds', 3,232        improvement_direction=improvement_direction.UP)233    results.AddValue(value3)234    results.DidRunPage(self.pages[2])235    self.assertEquals(236        [value1, value2, value3], results.all_page_specific_values)237  def testFindValues(self):238    results = page_test_results.PageTestResults()239    results.WillRunPage(self.pages[0])240    v0 = scalar.ScalarValue(241        self.pages[0], 'a', 'seconds', 3,242        improvement_direction=improvement_direction.UP)243    results.AddValue(v0)244    v1 = scalar.ScalarValue(245        self.pages[0], 'a', 'seconds', 4,246        improvement_direction=improvement_direction.UP)247    results.AddValue(v1)248    results.DidRunPage(self.pages[1])249    values = results.FindValues(lambda v: v.value == 3)250    self.assertEquals([v0], values)251  def testValueWithTIRLabel(self):252    results = page_test_results.PageTestResults()253    results.WillRunPage(self.pages[0])254    v0 = scalar.ScalarValue(255        self.pages[0], 'a', 'seconds', 3, tir_label='foo',256        improvement_direction=improvement_direction.UP)257    results.AddValue(v0)258    v1 = scalar.ScalarValue(259        self.pages[0], 'a', 'seconds', 3, tir_label='bar',260        improvement_direction=improvement_direction.UP)261    results.AddValue(v1)262    results.DidRunPage(self.pages[0])263    values = results.FindAllPageSpecificValuesFromIRNamed('foo', 'a')264    self.assertEquals([v0], values)265  def testTraceValue(self):266    results = page_test_results.PageTestResults()267    results.WillRunPage(self.pages[0])268    results.AddValue(trace.TraceValue(269        None, trace_data.CreateTraceDataFromRawData([[{'test': 1}]])))270    results.DidRunPage(self.pages[0])271    results.WillRunPage(self.pages[1])272    results.AddValue(trace.TraceValue(273        None, trace_data.CreateTraceDataFromRawData([[{'test': 2}]])))274    results.DidRunPage(self.pages[1])275    results.PrintSummary()276    values = results.FindAllTraceValues()277    self.assertEquals(2, len(values))278  def testCleanUpCleansUpTraceValues(self):279    results = page_test_results.PageTestResults()280    v0 = trace.TraceValue(281        None, trace_data.CreateTraceDataFromRawData([{'test': 1}]))282    v1 = trace.TraceValue(283        None, trace_data.CreateTraceDataFromRawData([{'test': 2}]))284    results.WillRunPage(self.pages[0])285    results.AddValue(v0)286    results.DidRunPage(self.pages[0])287    results.WillRunPage(self.pages[1])288    results.AddValue(v1)289    results.DidRunPage(self.pages[1])290    results.CleanUp()291    self.assertTrue(v0.cleaned_up)292    self.assertTrue(v1.cleaned_up)293  def testNoTracesLeftAfterCleanUp(self):294    results = page_test_results.PageTestResults()295    v0 = trace.TraceValue(None,296                          trace_data.CreateTraceDataFromRawData([{'test': 1}]))297    v1 = trace.TraceValue(None,298                          trace_data.CreateTraceDataFromRawData([{'test': 2}]))299    results.WillRunPage(self.pages[0])300    results.AddValue(v0)301    results.DidRunPage(self.pages[0])302    results.WillRunPage(self.pages[1])303    results.AddValue(v1)304    results.DidRunPage(self.pages[1])305    results.CleanUp()306    self.assertFalse(results.FindAllTraceValues())307  def testPrintSummaryDisabledResults(self):308    output_stream = stream.TestOutputStream()309    output_formatters = []310    benchmark_metadata = benchmark.BenchmarkMetadata(311      'benchmark_name', 'benchmark_description')312    output_formatters.append(313        chart_json_output_formatter.ChartJsonOutputFormatter(314            output_stream, benchmark_metadata))315    output_formatters.append(json_output_formatter.JsonOutputFormatter(316        output_stream, benchmark_metadata))317    results = page_test_results.PageTestResults(318        output_formatters=output_formatters, benchmark_enabled=False)319    results.PrintSummary()320    self.assertEquals(output_stream.output_data,321      "{\n  \"enabled\": false,\n  \"benchmark_name\": \"benchmark_name\"\n}\n")322class PageTestResultsFilterTest(unittest.TestCase):323  def setUp(self):324    story_set = story.StorySet(base_dir=os.path.dirname(__file__))325    story_set.AddStory(326        page_module.Page('http://www.foo.com/', story_set, story_set.base_dir))327    story_set.AddStory(328        page_module.Page('http://www.bar.com/', story_set, story_set.base_dir))329    self.story_set = story_set330  @property331  def pages(self):332    return self.story_set.stories333  def testFilterValue(self):334    def AcceptValueNamed_a(value, _):335      return value.name == 'a'336    results = page_test_results.PageTestResults(337        value_can_be_added_predicate=AcceptValueNamed_a)338    results.WillRunPage(self.pages[0])339    results.AddValue(scalar.ScalarValue(340        self.pages[0], 'a', 'seconds', 3,341        improvement_direction=improvement_direction.UP))342    results.AddValue(scalar.ScalarValue(343        self.pages[0], 'b', 'seconds', 3,344        improvement_direction=improvement_direction.UP))345    results.DidRunPage(self.pages[0])346    results.WillRunPage(self.pages[1])347    results.AddValue(scalar.ScalarValue(348        self.pages[1], 'a', 'seconds', 3,349        improvement_direction=improvement_direction.UP))350    results.AddValue(scalar.ScalarValue(351        self.pages[1], 'd', 'seconds', 3,352        improvement_direction=improvement_direction.UP))353    results.DidRunPage(self.pages[1])354    results.PrintSummary()355    self.assertEquals(356        [('a', 'http://www.foo.com/'), ('a', 'http://www.bar.com/')],357        [(v.name, v.page.url) for v in results.all_page_specific_values])358  def testFilterIsFirstResult(self):359    def AcceptSecondValues(_, is_first_result):360      return not is_first_result361    results = page_test_results.PageTestResults(362        value_can_be_added_predicate=AcceptSecondValues)363    # First results (filtered out)364    results.WillRunPage(self.pages[0])365    results.AddValue(scalar.ScalarValue(366        self.pages[0], 'a', 'seconds', 7,367        improvement_direction=improvement_direction.UP))368    results.AddValue(scalar.ScalarValue(369        self.pages[0], 'b', 'seconds', 8,370        improvement_direction=improvement_direction.UP))371    results.DidRunPage(self.pages[0])372    results.WillRunPage(self.pages[1])373    results.AddValue(scalar.ScalarValue(374        self.pages[1], 'a', 'seconds', 5,375        improvement_direction=improvement_direction.UP))376    results.AddValue(scalar.ScalarValue(377        self.pages[1], 'd', 'seconds', 6,378        improvement_direction=improvement_direction.UP))379    results.DidRunPage(self.pages[1])380    # Second results381    results.WillRunPage(self.pages[0])382    results.AddValue(scalar.ScalarValue(383        self.pages[0], 'a', 'seconds', 3,384        improvement_direction=improvement_direction.UP))385    results.AddValue(scalar.ScalarValue(386        self.pages[0], 'b', 'seconds', 4,387        improvement_direction=improvement_direction.UP))388    results.DidRunPage(self.pages[0])389    results.WillRunPage(self.pages[1])390    results.AddValue(scalar.ScalarValue(391        self.pages[1], 'a', 'seconds', 1,392        improvement_direction=improvement_direction.UP))393    results.AddValue(scalar.ScalarValue(394        self.pages[1], 'd', 'seconds', 2,395        improvement_direction=improvement_direction.UP))396    results.DidRunPage(self.pages[1])397    results.PrintSummary()398    expected_values = [399        ('a', 'http://www.foo.com/', 3),400        ('b', 'http://www.foo.com/', 4),401        ('a', 'http://www.bar.com/', 1),402        ('d', 'http://www.bar.com/', 2)]403    actual_values = [(v.name, v.page.url, v.value)404                     for v in results.all_page_specific_values]405    self.assertEquals(expected_values, actual_values)406  def testFailureValueCannotBeFiltered(self):407    def AcceptValueNamed_a(value, _):408      return value.name == 'a'409    results = page_test_results.PageTestResults(410        value_can_be_added_predicate=AcceptValueNamed_a)411    results.WillRunPage(self.pages[0])412    results.AddValue(scalar.ScalarValue(413        self.pages[0], 'b', 'seconds', 8,414        improvement_direction=improvement_direction.UP))415    failure_value = failure.FailureValue.FromMessage(self.pages[0], 'failure')416    results.AddValue(failure_value)417    results.DidRunPage(self.pages[0])418    results.PrintSummary()419    # Although predicate says only accept values named 'a', the failure value is420    # added anyway.421    self.assertEquals(len(results.all_page_specific_values), 1)422    self.assertIn(failure_value, results.all_page_specific_values)423  def testSkipValueCannotBeFiltered(self):424    def AcceptValueNamed_a(value, _):425      return value.name == 'a'426    results = page_test_results.PageTestResults(427        value_can_be_added_predicate=AcceptValueNamed_a)428    results.WillRunPage(self.pages[0])429    skip_value = skip.SkipValue(self.pages[0], 'skip for testing')430    results.AddValue(scalar.ScalarValue(431        self.pages[0], 'b', 'seconds', 8,432        improvement_direction=improvement_direction.UP))433    results.AddValue(skip_value)434    results.DidRunPage(self.pages[0])435    results.PrintSummary()436    # Although predicate says only accept value with named 'a', skip value is437    # added anyway.438    self.assertEquals(len(results.all_page_specific_values), 1)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
