Best Python code snippet using molotov_python
cassandra_client.py
Source:cassandra_client.py  
...73        self.session = cluster.connect(keyspace)74        self.session.row_factory = datetime_hack_dict_factory75        self.session.default_consistency_level = cassandra.ConsistencyLevel.LOCAL_QUORUM76        self.timer = DummyTimer()77    def _prepare(self, query):78        if query in self.prepared:79            return self.prepared[query]80        prep = self.session.prepare(query)81        self.prepared[query] = prep82        return prep83    def _get_statement(self, table, columns=None, idcolumns=('id',)):84        if columns is None:85            columns = self.default_columns[table]86        where = ' AND '.join(('{} = ?'.format(col) for col in idcolumns))87        stmt = 'SELECT {} FROM {} WHERE {}'.format(','.join(columns), table, where)88        return self._prepare(stmt)89    def _get_compound_pk(self, table, idvs, columns, idcolumns):90        prep = self._get_statement(table, columns, idcolumns)91        res = self.session.execute(prep.bind(idvs))92        try:93            return next(iter(res))94        except StopIteration:95            raise KeyError('{} entry not found'.format(table))96    def _get(self, table, idv, columns=None, idcolumn='id'):97        return self._get_compound_pk(table, [idv], columns, [idcolumn])98    def _get_concurrent(self, statement, ids):99        output = {}100        if ids and not isinstance(ids[0], tuple):101            qids = [(u,) for u in ids]102        else:103            qids = ids104        results = cassandra.concurrent.execute_concurrent_with_args(self.session, statement, qids)105        for id_, (success, result) in zip(ids, results):106            if not success:107                raise result108            if result:109                output[id_] = result[0]110        return output111    @staticmethod112    def val_to_store(client, colname, jsoncols):113        ret = client[colname]114        if colname in jsoncols:115            ret = json.dumps(ret)116        return ret117    def 
insert_generic(self, data, tablename):118        table_columns = self.default_columns[tablename]119        colnames = ','.join(table_columns)120        placeholders = ('?,'*len(table_columns))[:-1]  # '?,?,..,?'121        stmt = 'INSERT INTO {} ({}) VALUES ({})'.format(tablename, colnames, placeholders)122        prep = self._prepare(stmt)123        jsoncols = set(self.json_columns[tablename])124        bindvals = [self.val_to_store(data, colname, jsoncols) for colname in table_columns]125        self.session.execute(prep.bind(bindvals))126    def insert_client(self, client):127        self.insert_generic(client, 'clients')128    def get_client_by_id(self, clientid):129        return self._get('clients', clientid)130    def get_clients_by_id(self, clientids):131        statement = self._get_statement('clients')132        return self._get_concurrent(statement, clientids)133    def get_generic(self, table, selectors, values, maxrows):134        if len(selectors) != len(values):135            raise KeyError('Selectors and values not same length')136        cols = ','.join(self.default_columns[table])137        if not selectors:138            stmt = 'SELECT {} from {} LIMIT {}'.format(cols, table, maxrows)139        else:140            tmpl = 'SELECT {} from {} WHERE {} LIMIT {} ALLOW FILTERING'141            stmt = tmpl.format(cols, table, ' and '.join(selectors), maxrows)142        with self.timer.time('cassandra.get_generic.{}'.format(table)):143            prep = self._prepare(stmt)144            res = self.session.execute(prep.bind(values))145        return res146    def get_clients(self, selectors, values, maxrows):147        return self.get_generic('clients', selectors, values, maxrows)148    def get_clients_by_owner(self, owner):149        prep = self._prepare('SELECT * from clients WHERE owner = ?')150        with self.timer.time('cassandra.get_clients_by_owner'):151            res = self.session.execute(prep.bind([owner]))152        return res153    def 
get_clients_by_scope(self, scope):154        prep = self._prepare('SELECT * from clients WHERE scopes CONTAINS ?')155        res = self.session.execute(prep.bind([scope]))156        return res157    def get_clients_by_scope_requested(self, scope):158        prep = self._prepare('SELECT * from clients WHERE scopes_requested CONTAINS ?')159        res = self.session.execute(prep.bind([scope]))160        return res161    def add_client_counters(self, clients):162        ids = [c['id'] for c in clients]163        print(ids)164        statement = self._get_statement('clients_counters')165        counters = self._get_concurrent(statement, ids)166        for client in clients:167            client.update(counters.get(client['id'], {'count_users': 0, 'count_tokens': 0}))168    def delete_client(self, clientid):169        prep = self._prepare('DELETE FROM clients WHERE id = ?')170        with self.timer.time('cassandra.delete_client'):171            self.session.execute(prep.bind([clientid]))172            authzq = self._prepare('SELECT userid FROM oauth_authorizations WHERE clientid = ?')173            userids = self.session.execute(authzq.bind([clientid]))174            for row in userids:175                userid = row['userid']176                self.delete_authorization(userid, clientid)177    def insert_orgauthorization(self, clientid, realm, scopes):178        prep = self._prepare('UPDATE clients SET orgauthorization[?] = ? WHERE id = ?')179        self.session.execute(prep.bind([realm, scopes, clientid]))180    def delete_orgauthorization(self, clientid, realm):181        prep = self._prepare('DELETE orgauthorization[?] 
FROM clients WHERE id = ?')182        self.session.execute(prep.bind([realm, clientid]))183    def get_token(self, tokenid):184        return self._get('oauth_tokens', tokenid, ['*'], 'access_token')185    def get_tokens_by_scope(self, scope):186        prep = self._prepare('SELECT * FROM oauth_tokens WHERE scope contains ?')187        return self.session.execute(prep.bind([scope]))188    def update_token_scopes(self, access_token, scopes):189        prep = self._prepare('UPDATE oauth_tokens SET scope = ? WHERE access_token = ?')190        self.session.execute(prep.bind([scopes, access_token]))191    def get_user_by_id(self, userid):192        return self._get('users', userid, None, 'userid')193    def get_users(self, userids):194        statement = self._get_statement('users', None, ['userid'])195        return self._get_concurrent(statement, userids)196    def get_user_profilephoto(self, userid):197        userinfo = self._get('users', userid,198                             ['selectedsource', 'profilephoto', 'updated'], 'userid')199        selectedsource = userinfo['selectedsource']200        profilephoto = userinfo['profilephoto']201        updated = userinfo['updated']202        if profilephoto is None:203            return None, updated204        return profilephoto.get(selectedsource, None), updated205    def insert_user(self, userid, email, name, profilephoto,206                    profilephotohash, selectedsource, userid_sec):207        tstamp = now()208        sec_prep = self._prepare('INSERT INTO userid_sec (userid_sec, userid) VALUES (?, ?)')209        for sec in userid_sec:210            self.session.execute(sec_prep.bind([sec, userid]))211        userid_sec_seen = {sec: tstamp for sec in userid_sec}212        prep = self._prepare(213            'INSERT INTO users (userid, created, email, name, profilephoto, profilephotohash, ' +214            'selectedsource, updated, userid_sec, userid_sec_seen) VALUES (?,?,?,?,?,?,?,?,?,?)')215        
self.session.execute(prep.bind([216            userid,217            tstamp,218            email,219            name,220            profilephoto,221            profilephotohash,222            selectedsource,223            tstamp,224            userid_sec,225            userid_sec_seen226        ]))227    def reset_user(self, userid):228        prep = self._prepare(229            'INSERT INTO users (userid, usageterms,email, name, profilephoto, profilephotohash, ' +230            'selectedsource, aboveagelimit) VALUES (?,?,?,?,?,?,?,?)')231        self.session.execute(prep.bind([232            userid, False, {}, {}, {}, {}, None, None233        ]))234    def get_userid_by_userid_sec(self, sec):235        return self._get('userid_sec', sec, ['userid'], 'userid_sec')['userid']236    def get_apigk(self, gkid):237        return parse_apigk(self._get('apigk', gkid))238    def get_apigks(self, selectors, values, maxrows):239        return [parse_apigk(gk) for gk in self.get_generic('apigk', selectors, values, maxrows)]240    def delete_apigk(self, gkid):241        prep = self._prepare('DELETE FROM apigk WHERE id = ?')242        self.session.execute(prep.bind([gkid]))243    def insert_apigk(self, apigk):244        self.insert_generic(apigk, 'apigk')245    def _get_logo(self, table, idvalue):246        res = self._get(table, idvalue, ['logo', 'updated'])247        return res['logo'], res['updated']248    def get_client_logo(self, clientid):249        return self._get_logo('clients', clientid)250    def get_apigk_logo(self, gkid):251        return self._get_logo('apigk', gkid)252    def save_logo(self, table, itemid, data, updated):253        prep = self._prepare('INSERT INTO {} (id, logo, updated) VALUES (?, ?, ?)'.format(table))254        self.session.execute(prep.bind([itemid, data, updated]))255    def get_authorizations(self, userid):256        prep = self._prepare('SELECT * FROM oauth_authorizations WHERE userid = ?')257        return 
self.session.execute(prep.bind([userid]))258    def delete_token(self, token):259        prep = self._prepare('DELETE FROM oauth_tokens WHERE access_token = ?')260        prep.consistency_level = cassandra.ConsistencyLevel.ALL261        self.session.execute(prep.bind([token]))262    def delete_authorization(self, userid, clientid):263        stmt = 'DELETE FROM oauth_authorizations WHERE userid = ? AND clientid = ?'264        prep_del_auth = self._prepare(stmt)265        prep_del_auth.consistency_level = cassandra.ConsistencyLevel.ALL266        self.session.execute(prep_del_auth.bind([userid, clientid]))267        prep_find_tokens = self._prepare(268            'SELECT access_token FROM oauth_tokens WHERE userid = ? AND clientid = ? ' +269            'ALLOW FILTERING')270        for token in self.session.execute(prep_find_tokens.bind([userid, clientid])):271            tokenid = token['access_token']272            self.log.debug('deleting token', token=tokenid)273            self.delete_token(tokenid)274    def delete_all_authorizations(self, clientid):275        prep = self._prepare('SELECT userid FROM oauth_tokens WHERE clientid = ?')276        prep.consistency_level = cassandra.ConsistencyLevel.ALL277        res = self.session.execute(prep.bind([clientid]))278        for userid in {row['userid'] for row in res}:279            self.delete_authorization(userid, clientid)280    def get_oauth_authz_by_scope(self, scope):281        prep = self._prepare('SELECT * FROM oauth_authorizations WHERE scopes CONTAINS ?')282        return self.session.execute(prep.bind([scope]))283    def update_oauth_auth_scopes(self, auth):284        prep = self._prepare(285            'INSERT INTO oauth_authorizations (userid, clientid, scopes) VALUES (?, ?, ?)')286        self.session.execute(prep.bind([auth['userid'], auth['clientid'], auth['scopes']]))287    def get_group(self, groupid):288        return self._get('groups', groupid)289    def delete_group(self, groupid):290        prep 
= self._prepare('DELETE FROM groups WHERE id = ?')291        self.session.execute(prep.bind([groupid]))292    def insert_group(self, group):293        prep = self._prepare(294            'INSERT INTO groups (id, created, descr, name, owner, updated, public, ' +295            'invitation_token) VALUES (?, ?, ?, ?, ?, ?, ?, ?)')296        self.session.execute(prep.bind([297            group['id'],298            group['created'],299            group['descr'],300            group['name'],301            group['owner'],302            group['updated'],303            group['public'],304            group['invitation_token'],305        ]))306    def get_group_logo(self, groupid):307        return self._get_logo('groups', groupid)308    def get_groups(self, selectors, values, maxrows):309        return self.get_generic('groups', selectors, values, maxrows)310    def get_group_members(self, groupid):311        prep = self._prepare('SELECT * FROM group_members WHERE groupid=?')312        return self.session.execute(prep.bind([groupid]))313    def add_group_member(self, groupid, userid, mtype, status, added_by):314        prep = self._prepare(315            'INSERT INTO group_members (groupid, userid, type, status, added_by) ' +316            'values (?,?,?,?,?)')317        self.session.execute(prep.bind([groupid, userid, mtype, status, added_by]))318    def set_group_member_status(self, groupid, userid, status):319        prep = self._prepare('INSERT INTO group_members (groupid, userid, status) values (?,?,?)')320        self.session.execute(prep.bind([groupid, userid, status]))321    def set_group_member_type(self, groupid, userid, mtype):322        prep = self._prepare('INSERT INTO group_members (groupid, userid, type) values (?,?,?)')323        self.session.execute(prep.bind([groupid, userid, mtype]))324    def del_group_member(self, groupid, userid):325        prep = self._prepare('DELETE FROM group_members WHERE groupid = ? 
AND userid = ?')326        self.session.execute(prep.bind([groupid, userid]))327    def get_membership_data(self, groupid, userid):328        return self._get_compound_pk('group_members', [groupid, userid],329                                     ['*'], ['groupid', 'userid'])330    def get_group_memberships(self, userid, mtype, status, maxrows):331        selectors = ['userid = ?']332        values = [userid]333        if mtype is not None:334            selectors.append('type = ?')335            values.append(mtype)336        if status is not None:337            selectors.append('status = ?')338            values.append(status)339        return self.get_generic('group_members', selectors, values, maxrows)340    def insert_grep_code(self, grep):341        prep = self._prepare(342            'INSERT INTO grep_codes (id, code, title, type, last_changed) values (?,?,?,?,?)')343        self.session.execute(prep.bind([grep['id'], grep['code'], grep['title'],344                                        grep['type'], grep['last_changed']]))345    def get_grep_code(self, grepid):346        return self._get('grep_codes', grepid, ['*'])347    def get_grep_code_by_code(self, code, greptype):348        prep = self._prepare('SELECT * from grep_codes WHERE code = ? and type = ? 
ALLOW FILTERING')349        data = list(self.session.execute(prep.bind([code, greptype])))350        if not data:351            raise KeyError('No such grep code')352        return data[0]353    def get_org(self, orgid):354        data = self._get('organizations', orgid)355        if 'name' in data and data['name'] is not None:356            data['name'] = translatable(data['name'])357        return data358    def get_orgs(self, orgids):359        statement = self._get_statement('organizations')360        result = self._get_concurrent(statement, orgids)361        for org in result.values():362            if 'name' in org and org['name'] is not None:363                org['name'] = translatable(org['name'])364        return result365    def get_org_by_realm(self, realm):366        data = self._get('organizations', realm, idcolumn='realm')367        if 'name' in data and data['name'] is not None:368            data['name'] = translatable(data['name'])369        return data370    def list_orgs(self):371        tbl = 'organizations'372        stmt = 'SELECT {} from {}'.format(','.join(self.default_columns[tbl]), tbl)373        prep = self._prepare(stmt)374        data = self.session.execute(prep)375        for item in data:376            if 'name' in item and item['name'] is not None:377                item['name'] = translatable(item['name'])378            yield item379    def insert_org(self, org):380        self.insert_generic(org, 'organizations')381    def delete_org(self, orgid):382        prep = self._prepare('DELETE FROM organizations WHERE id = ?')383        self.session.execute(prep.bind([orgid]))384    def get_org_logo(self, orgid):385        res = self._get('organizations', orgid, ['logo', 'logo_updated'])386        return res['logo'], res['logo_updated']387    def save_org_logo(self, table, itemid, data, updated):388        stmt = 'INSERT INTO {} (id, logo, logo_updated) VALUES (?, ?, ?)'.format(table)389        prep = self._prepare(stmt)390        
self.session.execute(prep.bind([itemid, data, updated]))391    def org_use_fs_groups(self, realm):392        prep = self._prepare('SELECT id FROM organizations WHERE realm = ?'393                             ' and services contains \'fsgroups\' ALLOW FILTERING')394        res = list(self.session.execute(prep.bind([realm])))395        if not res:396            return False397        return True398    def is_org_admin(self, identity, orgid):399        prep = self._prepare('SELECT role from orgroles where identity = ? AND orgid = ?')400        res = list(self.session.execute(prep.bind([identity, orgid])))401        if not res:402            return False403        return 'admin' in res[0]['role']404    def get_mandatory_clients(self, realm):405        prep = self._prepare('SELECT clientid from mandatory_clients where realm = ?')406        res = self.session.execute(prep.bind([realm]))407        return [x['clientid'] for x in res]408    def add_mandatory_client(self, realm, clientid):409        prep = self._prepare('INSERT INTO mandatory_clients (realm, clientid) values (?, ?)')410        self.session.execute(prep.bind([realm, clientid]))411    def del_mandatory_client(self, realm, clientid):412        prep = self._prepare('DELETE FROM mandatory_clients WHERE realm = ? AND clientid = ?')413        return self.session.execute(prep.bind([realm, clientid]))414    def add_services(self, org, services):415        stmt = 'UPDATE organizations set services = services + ? WHERE id = ?'416        prep = self._prepare(stmt)417        self.session.execute(prep.bind([services, org]))418    def del_services(self, org, services):419        stmt = 'UPDATE organizations set services = services - ? 
WHERE id = ?'420        prep = self._prepare(stmt)421        self.session.execute(prep.bind([services, org]))422    def get_roles(self, selectors, values, maxrows):423        return self.get_generic('orgroles', selectors, values, maxrows)424    def insert_role(self, role):425        self.insert_generic(role, 'orgroles')426    def del_role(self, orgid, identity):427        prep = self._prepare('DELETE FROM orgroles WHERE orgid = ? AND identity = ?')428        self.session.execute(prep.bind([orgid, identity]))429    def apigk_allowed_dn(self, dn):430        prep = self._prepare('SELECT dn from remote_apigatekeepers WHERE dn = ?')431        res = list(self.session.execute(prep.bind([dn])))432        return len(res) > 0433    def get_logins_stats(self, clientid, dates, authsource, maxrows):434        cols = 'date, authsource, timeslot, login_count'435        tmpl = 'SELECT {} FROM logins_stats WHERE clientid = ? AND date in ({})'436        dateslots = (','.join(['?']*len(dates)))  # e.g. (?,?,?)437        stmt = tmpl.format(cols, dateslots)438        bindvals = [clientid]439        bindvals += (str(date) for date in dates)440        if maxrows:441            limit = ' LIMIT {}'.format(maxrows)442        else:443            limit = ''444        if authsource:445            stmt += ' AND authsource = ?'446            bindvals.append(authsource)447            limit += ' ALLOW FILTERING'448        stmt += limit449        prep = self._prepare(stmt)450        return self.session.execute(prep.bind(bindvals))451    def get_statistics(self, date, metric):452        statement = 'SELECT * from statistics WHERE date=?'453        if metric:454            statement += " AND metric=?"455        prep = self._prepare(statement)456        params = [date]457        if metric:458            params.append(metric)459        return self.session.execute(prep.bind(params))460    def get_clients_counters(self, maxrows):...unittest_rqlannotation.py
Source:unittest_rqlannotation.py  
...27        self.__class__.repo = repo28        super(SQLGenAnnotatorTC, self).setUp()29    def test_0_1(self):30        with self.admin_access.cnx() as cnx:31            rqlst = self._prepare(cnx, 'Any SEN,RN,OEN WHERE X from_entity SE, '32                                  'SE eid 44, X relation_type R, R eid 139, '33                                  'X to_entity OE, OE eid 42, R name RN, SE name SEN, '34                                  'OE name OEN')35            self.assertEqual(rqlst.defined_vars['SE']._q_invariant, False)36            self.assertEqual(rqlst.defined_vars['OE']._q_invariant, False)37            self.assertEqual(rqlst.defined_vars['R']._q_invariant, False)38            self.assertEqual(rqlst.defined_vars['SE'].stinfo['attrvar'], None)39            self.assertEqual(rqlst.defined_vars['OE'].stinfo['attrvar'], None)40            self.assertEqual(rqlst.defined_vars['R'].stinfo['attrvar'], None)41    def test_0_2(self):42        with self.admin_access.cnx() as cnx:43            rqlst = self._prepare(cnx, 'Any O WHERE NOT S ecrit_par O, S eid 1, '44                                  'S inline1 P, O inline2 P')45            self.assertEqual(rqlst.defined_vars['P']._q_invariant, True)46            self.assertEqual(rqlst.defined_vars['O'].stinfo['attrvar'], None)47    def test_0_4(self):48        with self.admin_access.cnx() as cnx:49            rqlst = self._prepare(cnx, 'Any A,B,C WHERE A eid 12,A comment B, '50                                  'A ?wf_info_for C')51            self.assertEqual(rqlst.defined_vars['A']._q_invariant, False)52            self.assertTrue(rqlst.defined_vars['B'].stinfo['attrvar'])53            self.assertEqual(rqlst.defined_vars['C']._q_invariant, False)54            self.assertEqual(rqlst.solutions, [{'A': 'TrInfo', 'B': 'String', 'C': 'Affaire'},55                                               {'A': 'TrInfo', 'B': 'String', 'C': 'CWUser'},56                                               {'A': 'TrInfo', 'B': 'String', 'C': 
'Note'}])57    def test_0_5(self):58        with self.admin_access.cnx() as cnx:59            rqlst = self._prepare(cnx, 'Any P WHERE N ecrit_par P, N eid 0')60            self.assertEqual(rqlst.defined_vars['N']._q_invariant, False)61            self.assertEqual(rqlst.defined_vars['P']._q_invariant, True)62    def test_0_6(self):63        with self.admin_access.cnx() as cnx:64            rqlst = self._prepare(cnx, 'Any P WHERE NOT N ecrit_par P, N eid 512')65            self.assertEqual(rqlst.defined_vars['P']._q_invariant, False)66    def test_0_7(self):67        with self.admin_access.cnx() as cnx:68            rqlst = self._prepare(cnx, 'Personne X,Y where X nom NX, '69                                  'Y nom NX, X eid XE, not Y eid XE')70            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)71            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)72            self.assertTrue(rqlst.defined_vars['XE'].stinfo['attrvar'])73    def test_0_8(self):74        with self.admin_access.cnx() as cnx:75            rqlst = self._prepare(cnx, 'Any P WHERE X eid 0, NOT X connait P')76            self.assertEqual(rqlst.defined_vars['P']._q_invariant, False)77            self.assertEqual(len(rqlst.solutions), 1, rqlst.solutions)78    def test_0_10(self):79        with self.admin_access.cnx() as cnx:80            rqlst = self._prepare(cnx, 'Any X WHERE X concerne Y, Y is Note')81            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)82            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)83    def test_0_11(self):84        with self.admin_access.cnx() as cnx:85            rqlst = self._prepare(cnx, 'Any X WHERE X todo_by Y, X is Affaire')86            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)87            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)88    def test_0_12(self):89        with self.admin_access.cnx() as cnx:90            rqlst = self._prepare(cnx, 'Personne P 
WHERE P concerne A, '91                                  'A concerne S, S nom "Logilab"')92            self.assertEqual(rqlst.defined_vars['P']._q_invariant, True)93            self.assertEqual(rqlst.defined_vars['A']._q_invariant, True)94            self.assertEqual(rqlst.defined_vars['S']._q_invariant, False)95    def test_1_0(self):96        with self.admin_access.cnx() as cnx:97            rqlst = self._prepare(cnx, 'Any X,Y WHERE X created_by Y, '98                                  'X eid 5, NOT Y eid 6')99            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)100    def test_1_1(self):101        with self.admin_access.cnx() as cnx:102            rqlst = self._prepare(cnx, 'Any X,Y WHERE X created_by Y, X eid 5, '103                                  'NOT Y eid IN (6,7)')104            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)105    def test_2(self):106        with self.admin_access.cnx() as cnx:107            rqlst = self._prepare(cnx, 'Any X WHERE X identity Y, Y eid 1')108            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)109    def test_7(self):110        with self.admin_access.cnx() as cnx:111            rqlst = self._prepare(cnx, 'Personne X,Y where X nom NX, Y nom NX, '112                                  'X eid XE, not Y eid XE')113            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)114            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)115    def test_8(self):116        with self.admin_access.cnx() as cnx:117            # DISTINCT Any P WHERE P require_group %(g)s,118            # NOT %(u)s has_group_permission P, P is CWPermission119            rqlst = self._prepare(cnx, 'DISTINCT Any X WHERE A concerne X, '120                                  'NOT N migrated_from X, '121                                  'X is Note, N eid 1')122            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)123    def test_diff_scope_identity_deamb(self):124    
    with self.admin_access.cnx() as cnx:125            rqlst = self._prepare(cnx, 'Any X WHERE X concerne Y, Y is Note, '126                                  'EXISTS(Y identity Z, Z migrated_from N)')127            self.assertEqual(rqlst.defined_vars['Z']._q_invariant, True)128            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)129    def test_optional_inlined(self):130        with self.admin_access.cnx() as cnx:131            rqlst = self._prepare(cnx, 'Any X,S where X from_state S?')132            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)133            self.assertEqual(rqlst.defined_vars['S']._q_invariant, True)134    def test_optional_inlined_2(self):135        with self.admin_access.cnx() as cnx:136            rqlst = self._prepare(cnx, 'Any N,A WHERE N? inline1 A')137            self.assertEqual(rqlst.defined_vars['N']._q_invariant, False)138            self.assertEqual(rqlst.defined_vars['A']._q_invariant, False)139    def test_optional_1(self):140        with self.admin_access.cnx() as cnx:141            rqlst = self._prepare(cnx, 'Any X,S WHERE X travaille S?')142            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)143            self.assertEqual(rqlst.defined_vars['S']._q_invariant, True)144    def test_greater_eid(self):145        with self.admin_access.cnx() as cnx:146            rqlst = self._prepare(cnx, 'Any X WHERE X eid > 5')147            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)148    def test_greater_eid_typed(self):149        with self.admin_access.cnx() as cnx:150            rqlst = self._prepare(cnx, 'Any X WHERE X eid > 5, X is Note')151            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)152    def test_max_eid(self):153        with self.admin_access.cnx() as cnx:154            rqlst = self._prepare(cnx, 'Any MAX(X)')155            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)156    def test_max_eid_typed(self):157        with 
self.admin_access.cnx() as cnx:158            rqlst = self._prepare(cnx, 'Any MAX(X) WHERE X is Note')159            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)160    def test_all_entities(self):161        with self.admin_access.cnx() as cnx:162            rqlst = self._prepare(cnx, 'Any X')163            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)164    def test_all_typed_entity(self):165        with self.admin_access.cnx() as cnx:166            rqlst = self._prepare(cnx, 'Any X WHERE X is Note')167            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)168    def test_has_text_1(self):169        with self.admin_access.cnx() as cnx:170            rqlst = self._prepare(cnx, 'Any X WHERE X has_text "toto tata"')171            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)172            self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type,173                             'has_text')174    def test_has_text_2(self):175        with self.admin_access.cnx() as cnx:176            rqlst = self._prepare(cnx, 'Any X WHERE X is Personne, '177                                  'X has_text "coucou"')178            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)179            self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type,180                             'has_text')181    def test_not_relation_1(self):182        with self.admin_access.cnx() as cnx:183            # P can't be invariant since deambiguification caused by "NOT X require_permission P"184            # is not considered by generated sql (NOT EXISTS(...))185            rqlst = self._prepare(cnx, 'Any P,G WHERE P require_group G, '186                                  'NOT X require_permission P')187            self.assertEqual(rqlst.defined_vars['P']._q_invariant, False)188            self.assertEqual(rqlst.defined_vars['G']._q_invariant, True)189            
self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)190    def test_not_relation_2(self):191        with self.admin_access.cnx() as cnx:192            rqlst = self._prepare(cnx, 'TrInfo X WHERE X eid 2, '193                                  'NOT X from_state Y, Y is State')194            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)195            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)196    def test_not_relation_3(self):197        with self.admin_access.cnx() as cnx:198            rqlst = self._prepare(cnx, 'Any X, Y WHERE X eid 1, Y eid in (2, 3)')199            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)200    def test_not_relation_4_1(self):201        with self.admin_access.cnx() as cnx:202            rqlst = self._prepare(cnx, 'Note X WHERE NOT Y evaluee X')203            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)204            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)205    def test_not_relation_4_2(self):206        with self.admin_access.cnx() as cnx:207            rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X')208            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)209            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)210    def test_not_relation_4_3(self):211        with self.admin_access.cnx() as cnx:212            rqlst = self._prepare(cnx, 'Any Y WHERE NOT Y evaluee X')213            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)214            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)215    def test_not_relation_4_4(self):216        with self.admin_access.cnx() as cnx:217            rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X, Y is CWUser')218            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)219            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)220    def test_not_relation_4_5(self):221        with 
self.admin_access.cnx() as cnx:222            rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X, '223                                  'Y eid %s, X is Note' % self.ueid)224            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)225            self.assertEqual(rqlst.solutions, [{'X': 'Note'}])226    def test_not_relation_5_1(self):227        with self.admin_access.cnx() as cnx:228            rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", '229                                  'Y eid IN(1, 2, 3), NOT X read_permission Y')230            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)231            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)232    def test_not_relation_5_2(self):233        with self.admin_access.cnx() as cnx:234            rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", '235                                  'Y eid IN(1, 2, 3), NOT X read_permission Y')236            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)237            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)238    def test_not_relation_6(self):239        with self.admin_access.cnx() as cnx:240            rqlst = self._prepare(cnx, 'Personne P where NOT P concerne A')241            self.assertEqual(rqlst.defined_vars['P']._q_invariant, False)242            self.assertEqual(rqlst.defined_vars['A']._q_invariant, True)243    def test_not_relation_7(self):244        with self.admin_access.cnx() as cnx:245            rqlst = self._prepare(cnx, 'Any K,V WHERE P is CWProperty, '246                                  'P pkey K, P value V, NOT P for_user U')247            self.assertEqual(rqlst.defined_vars['P']._q_invariant, False)248            self.assertEqual(rqlst.defined_vars['U']._q_invariant, True)249    def test_exists_1(self):250        with self.admin_access.cnx() as cnx:251            rqlst = self._prepare(cnx, 'Any U WHERE U eid IN (1,2), EXISTS(X owned_by U)')252         
   self.assertEqual(rqlst.defined_vars['U']._q_invariant, False)253            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)254    def test_exists_2(self):255        with self.admin_access.cnx() as cnx:256            rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(U eid IN (1,2), X owned_by U)')257            self.assertEqual(rqlst.defined_vars['U']._q_invariant, False)258            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)259    def test_exists_3(self):260        with self.admin_access.cnx() as cnx:261            rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(X owned_by U, X bookmarked_by U)')262            self.assertEqual(rqlst.defined_vars['U']._q_invariant, False)263            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)264    def test_exists_4(self):265        with self.admin_access.cnx() as cnx:266            rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", '267                                  'Y eid IN(1, 2, 3), EXISTS(X read_permission Y)')268            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)269            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)270    def test_exists_5(self):271        with self.admin_access.cnx() as cnx:272            rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", '273                                  'Y eid IN(1, 2, 3), EXISTS(X read_permission Y)')274            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)275            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)276    def test_not_exists_1(self):277        with self.admin_access.cnx() as cnx:278            rqlst = self._prepare(cnx, 'Any U WHERE NOT EXISTS(X owned_by U, '279                                  'X bookmarked_by U)')280            self.assertEqual(rqlst.defined_vars['U']._q_invariant, False)281            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)282    def test_not_exists_2(self):283        
with self.admin_access.cnx() as cnx:284            rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", '285                                  'Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)')286            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)287    def test_not_exists_distinct_1(self):288        with self.admin_access.cnx() as cnx:289            rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", '290                                  'Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)')291            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False)292    def test_or_1(self):293        with self.admin_access.cnx() as cnx:294            rqlst = self._prepare(cnx, 'Any X WHERE X concerne B OR '295                                  'C concerne X, B eid 12, C eid 13')296            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)297    def test_or_2(self):298        with self.admin_access.cnx() as cnx:299            rqlst = self._prepare(cnx, 'Any X WHERE X created_by U, X concerne B OR '300                                  'C concerne X, B eid 12, C eid 13')301            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)302            self.assertEqual(rqlst.defined_vars['U']._q_invariant, True)303            self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'created_by')304    def test_or_3(self):305        with self.admin_access.cnx() as cnx:306            rqlst = self._prepare(cnx, 'Any N WHERE A evaluee N or EXISTS(N todo_by U)')307            self.assertEqual(rqlst.defined_vars['N']._q_invariant, False)308            self.assertEqual(rqlst.defined_vars['A']._q_invariant, True)309            self.assertEqual(rqlst.defined_vars['U']._q_invariant, True)310    def test_or_exists_1(self):311        with self.admin_access.cnx() as cnx:312            # query generated by security rewriting313            rqlst = self._prepare(cnx, 'DISTINCT Any A,S WHERE A is 
Affaire, S nom "chouette", '314                                  'S is IN(Division, Societe, SubDivision),'315                                  '(EXISTS(A owned_by D)) '316                                  'OR ((((EXISTS(E concerne C?, C owned_by D, A identity E, '317                                  '              C is Note, E is Affaire)) '318                                  'OR (EXISTS(I concerne H?, H owned_by D, H is Societe, '319                                  '           A identity I, I is Affaire))) '320                                  'OR (EXISTS(J concerne G?, G owned_by D, G is SubDivision, '321                                  '           A identity J, J is Affaire))) '322                                  'OR (EXISTS(K concerne F?, F owned_by D, F is Division, '323                                  '           A identity K, K is Affaire)))')324            self.assertEqual(rqlst.defined_vars['A']._q_invariant, False)325            self.assertEqual(rqlst.defined_vars['S']._q_invariant, False)326    def test_or_exists_2(self):327        with self.admin_access.cnx() as cnx:328            rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(U in_group G, G name "managers") OR '329                                  'EXISTS(X owned_by U, X bookmarked_by U)')330            self.assertEqual(rqlst.defined_vars['U']._q_invariant, False)331            self.assertEqual(rqlst.defined_vars['G']._q_invariant, False)332            self.assertEqual(rqlst.defined_vars['X']._q_invariant, True)333    def test_or_exists_3(self):334        with self.admin_access.cnx() as cnx:335            rqlst = self._prepare(cnx, 'Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 '336                                  'WHERE C is Societe, S concerne C, C nom CS, '337                                  '(EXISTS(S owned_by D)) '338                                  'OR (EXISTS(S documented_by N, N title "published"))')339            self.assertEqual(rqlst.defined_vars['S']._q_invariant, True)340    
        rqlst = self._prepare(cnx, 'Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 '341                                  'WHERE S is Affaire, C is Societe, S concerne C, C nom CS, '342                                  '(EXISTS(S owned_by D)) '343                                  'OR (EXISTS(S documented_by N, N title "published"))')344            self.assertEqual(rqlst.defined_vars['S']._q_invariant, True)345    def test_nonregr_ambiguity(self):346        with self.admin_access.cnx() as cnx:347            rqlst = self._prepare(cnx, 'Note N WHERE N attachment F')348            # N may be an image as well, not invariant349            self.assertEqual(rqlst.defined_vars['N']._q_invariant, False)350            self.assertEqual(rqlst.defined_vars['F']._q_invariant, True)351    def test_nonregr_ambiguity_2(self):352        with self.admin_access.cnx() as cnx:353            rqlst = self._prepare(cnx, 'Any S,SN WHERE X has_text "tot", '354                                  'X in_state S, S name SN, X is CWUser')355            # X use has_text but should not be invariant as ambiguous, and has_text356            # may not be its principal357            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)358            self.assertEqual(rqlst.defined_vars['S']._q_invariant, False)359    def test_remove_from_deleted_source_1(self):360        with self.admin_access.cnx() as cnx:361            rqlst = self._prepare(cnx, 'Note X WHERE X eid 999998, NOT X cw_source Y')362            self.assertNotIn('X', rqlst.defined_vars)  # simplified363            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)364    def test_remove_from_deleted_source_2(self):365        with self.admin_access.cnx() as cnx:366            rqlst = self._prepare(cnx, 'Note X WHERE X eid IN (999998, 999999), NOT X cw_source Y')367            self.assertEqual(rqlst.defined_vars['X']._q_invariant, False)368            self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True)369    def 
test_has_text_security_cache_bug(self):370        with self.admin_access.cnx() as cnx:371            rqlst = self._prepare(cnx, 'Any X WHERE X has_text "toto" WITH X BEING '372                                  '(Any C WHERE C is Societe, C nom CS)')373            self.assertTrue(rqlst.parent.has_text_query)374if __name__ == '__main__':375    import unittest...client.py
Source:client.py  
...21        }22        23        # Loads the specified API resource24        if (api_version == 'v2'):25            self.items = self._prepare(v2.Items)26            self.recipes = self._prepare(v2.Recipes)27            self.skins = self._prepare(v2.Skins)28            self.materials = self._prepare(v2.Materials)29            self.search = self._prepare(v2.Search)30            self.maps = self._prepare(v2.Maps)31            self.continents = self._prepare(v2.Continents)32            self.continents.floors = self._prepare(v2.Continents.Floors)33            self.continents.floors = self._prepare(v2.Continents.Floors)34            self.continents.floors.regions = self._prepare(v2.Continents.Floors.Regions)35            self.continents.floors.regions.maps = self._prepare(v2.Continents.Floors.Regions.Maps)36            self.continents.floors.regions.maps.pois = self._prepare(v2.Continents.Floors.Regions.Maps.Subresource, "pois")37            self.continents.floors.regions.maps.sectors = self._prepare(v2.Continents.Floors.Regions.Maps.Subresource, "sectors")38            self.continents.floors.regions.maps.tasks = self._prepare(v2.Continents.Floors.Regions.Maps.Subresource, "tasks")39            self.build = self._prepare(v2.Build)40            self.colors = self._prepare(v2.Colors)41            self.files = self._prepare(v2.Files)42            self.specializations = self._prepare(v2.Specializations)43        elif (api_version =='v1'):44            self.items = self._prepare(v1.Items)45            self.recipes = self._prepare(v1.Recipes)46            self.recipe_details = self._prepare(v1.RecipeDetails)47            self.item_details = self._prepare(v1.ItemDetails)48            self.skins = self._prepare(v1.Skins)49            self.skin_details = self._prepare(v1.Skin_Details)50            self.matches = self._prepare(v1.Matches)51            self.match_details = self._prepare(v1.MatchDetails)52            self.objective_names = self._prepare(v1.ObjectiveNames)53        
    self.event_names = self._prepare(v1.EventNames)54            self.map_names = self._prepare(v1.MapNames)55            self.world_names = self._prepare(v1.WorldNames)56            self.guild_details = self._prepare(v1.GuildDetails)57            self.continents = self._prepare(v1.Continents)58            self.maps = self._prepare(v1.Maps)59            self.map_floor = self._prepare(v1.Map_Floor)60            self.build = self._prepare(v1.Build)61            self.colors = self._prepare(v1.Colors)62            self.files = self._prepare(v1.Files)63            self.event_details = self._prepare(v1.EventDetails)64        else:65            raise ValueError("Only v1 or v2 accepted")66    67    def _prepare(self, resource, subresource_name=None):68        """69        Intializes the resource using the settings supplied70        """71        if subresource_name is not None:72            return resource(self.options, self.session, subresource_name)...Command.js
Source:Command.js  
...6test('arguments:required', t => {7	const command = new Command()8	command.arguments = [new InputArgument('arg', InputArgument.VALUE_REQUIRED)]9	try {10		command._prepare(new Input([]))11		t.fail('Should have thrown an error')12	} catch (err) {13		t.pass()14	}15	command._prepare(new Input(['command', 'value']))16	t.true(command.containsArgument('arg'))17	t.is(command.argument('arg'), 'value')18})19test('arguments:optional', t => {20	const command = new Command()21	command.arguments = [new InputArgument('arg', InputArgument.VALUE_OPTIONAL)]22	command._prepare(new Input(['command']))23	t.false(command.containsArgument('arg'))24	command._prepare(new Input(['command', 'value']))25	t.true(command.containsArgument('arg'))26	t.is(command.argument('arg'), 'value')27})28test('arguments:default', t => {29	const command = new Command()30	command.arguments = [new InputArgument('arg', InputArgument.VALUE_OPTIONAL, null, 'def')]31	command._prepare(new Input(['command']))32	t.is(command.argument('arg'), 'def')33	command._prepare(new Input(['command', 'value']))34	t.is(command.argument('arg'), 'value')35})36test('arguments:fallback', t => {37	const command = new Command()38	command.arguments = [new InputArgument('arg', InputArgument.VALUE_OPTIONAL)]39	command._prepare(new Input(['command']))40	t.is(command.argument('arg'), null)41	t.is(command.argument('arg', 'fall'), 'fall')42	command._prepare(new Input(['command', 'value']))43	t.is(command.argument('arg'), 'value')44	t.is(command.argument('arg', 'fall'), 'value')45})46test('options:optional', t => {47	const command = new Command()48	command.options = [new InputOption('opt', InputOption.VALUE_OPTIONAL)]49	command._prepare(new Input(['command']))50	t.false(command.containsOption('opt'))51	command._prepare(new Input(['command', '--opt=ion']))52	t.true(command.containsOption('opt'))53})54test('options:default', t => {55	const command = new Command()56	command.options = [new InputOption('opt', InputOption.VALUE_OPTIONAL, null, 
5)]57	command._prepare(new Input(['command']))58	t.is(command.option('opt'), 5)59	command._prepare(new Input(['command', '--opt=10']))60	t.is(command.option('opt'), '10')61})62test('options:fallback', t => {63	const command = new Command()64	command.options = [new InputOption('opt', InputOption.VALUE_OPTIONAL)]65	command._prepare(new Input(['command']))66	t.is(command.option('opt'), null)67	t.is(command.option('opt', 5), 5)68	command._prepare(new Input(['command', '--opt=10']))69	t.is(command.option('opt'), '10')70	t.is(command.option('opt', 5), '10')71})72test('options:flag', t => {73	const command = new Command()74	command.options = [new InputOption('opt', InputOption.VALUE_NONE)]75	command._prepare(new Input(['command']))76	t.is(command.containsOption('opt'), false)77	t.is(command.option('opt'), null)78	command._prepare(new Input(['command', '--opt']))79	t.is(command.containsOption('opt'), true)80	t.is(command.option('opt'), true)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
