Best Python code snippet using pytest
dbpool.py
Source:dbpool.py  
1# coding: utf-82import datetime3import logging4import os5import random6import threading7import time8import types9debug = True10dbpool = {}11def timeit(func):12    def _(*args, **kwargs):13        start = time.time()14        err = ''15        try:16            retval = func(*args, **kwargs)17            return retval18        except Exception as e:19            err = str(e)20            raise21        finally:22            end = time.time()23            conn = args[0]24            dbcf = conn.param25            logging.info('name=%s|user=%s|addr=%s:%d|db=%s|time=%d|sql=%s|err=%s',26                         conn.name, dbcf.get('user', ''),27                         dbcf.get('host', ''), dbcf.get('port', 0),28                         os.path.basename(dbcf.get('db', '')),29                         int((end - start) * 1000000),30                         repr(args[1])[:100], err)31    return _32class DBPoolBase:33    def acquire(self, name):34        pass35    def release(self, name, conn):36        pass37class DBResult:38    def __init__(self, fields, data):39        self.fields = fields40        self.data = data41    def todict(self):42        ret = []43        for item in self.data:44            ret.append(dict(zip(self.fields, item)))45        return ret46    def __iter__(self):47        for row in self.data:48            yield dict(zip(self.fields, row))49    def row(self, i, isdict=True):50        if isdict:51            return dict(zip(self.fields, self.data[i]))52        return self.data[i]53    def __getitem__(self, i):54        return dict(zip(self.fields, self.data[i]))55class DBFunc:56    def __init__(self, data):57        self.value = data58class DBConnection:59    def __init__(self, param, lasttime, status):60        self.name = None61        self.param = param62        self.conn = None63        self.status = status64        self.lasttime = lasttime65        self.pool = None66    def is_available(self):67        if self.status == 0:68            return True69        return False70    def useit(self):71        self.status = 172        self.lasttime = time.time()73    def releaseit(self):74        self.status = 075    def connect(self):76        pass77    def close(self):78        pass79    def alive(self):80        pass81    def cursor(self):82        return self.conn.cursor()83    84    @timeit85    def execute(self, sql, param=None):86        # logging.debug(sql[:200])87        cur = self.conn.cursor()88        try:89            if param:90                cur.execute(sql, param)91            else:92                cur.execute(sql)93        except Exception, e:94            raise e95            # logging.warning(e)96            self.connect()97            if param:98                cur.execute(sql, param)99            else:100                cur.execute(sql)101        ret = cur.fetchall()102        cur.close()103        return ret104    @timeit105    def executemany(self, sql, param):106        cur = self.conn.cursor()107        try:108            ret = cur.executemany(sql, param)109        except:110            self.connect()111            ret = cur.executemany(sql, param)112        cur.close()113        return ret114    @timeit115    def query(self, sql, param=None, isdict=True):116        '''sqlæ¥è¯¢ï¼è¿åæ¥è¯¢ç»æ'''117        cur = self.conn.cursor()118        try:119            if not param:120                cur.execute(sql)121            else:122                cur.execute(sql, param)123        except:124            self.connect()125            if not param:126                cur.execute(sql)127            else:128                cur.execute(sql, param)129        res = cur.fetchall()130        cur.close()131        # logging.info('desc:', cur.description)132        if res and isdict:133            ret = []134            xkeys = [i[0] for i in cur.description]135            for item in res:136                ret.append(dict(zip(xkeys, item)))137        else:138            ret = res139        return ret140    @timeit141    def get(self, sql, param=None, isdict=True):142        '''sqlæ¥è¯¢ï¼åªè¿å䏿¡'''143        cur = self.conn.cursor()144        try:145            if not param:146                cur.execute(sql)147            else:148                cur.execute(sql, param)149        except:150            self.connect()151            if not param:152                cur.execute(sql)153            else:154                cur.execute(sql, param)155        res = cur.fetchone()156        cur.close()157        if res and isdict:158            xkeys = [i[0] for i in cur.description]159            return dict(zip(xkeys, res))160        else:161            return res162    def value2sql(self, v, charset='utf-8'):163        tv = type(v)164        if tv in [types.StringType, types.UnicodeType]:165            if tv == types.UnicodeType:166                v = v.encode(charset)167            if v.startswith(('now()', 'md5(')):168                return v169            # v = self.escape(v)170            return "'%s'" % v if not v or v[0] != "'" else v171        elif isinstance(v, datetime.datetime):172            return "'%s'" % str(v)173        elif isinstance(v, DBFunc):174            return v.value175        else:176            if v is None:177                return 'NULL'178            return str(v)179    def dict2sql(self, d, sp=','):180        space_char = '"' if isinstance(self, PGConnection) else '`'181        """åå
¸å¯ä»¥æ¯ {name:value} å½¢å¼ï¼ä¹å¯ä»¥æ¯ {name:(operator, value)}"""182        x = []183        for k, v in d.items():184            if isinstance(v, types.TupleType):185                x.append('%s' % self.exp2sql(k, v[0], v[1]))186            else:187                x.append('%s%s%s=%s' % (space_char,188                                        k.strip(' %s'%space_char).replace('.', '%s.%s'%(space_char, space_char)),189                                        space_char, self.value2sql(v)))190        return sp.join(x)191    def exp2sql(self, key, op, value):192        space_char = '"' if isinstance(self, PGConnection) else '`'193        item = '(%s%s%s %s ' % (space_char, key.strip(space_char).replace('.', '%s.%s'%(space_char, space_char)),194                                space_char, op.strip())195        if op in ['in', 'not in']:196            item += '(%s))' % ','.join([self.value2sql(x) for x in value])197        elif op == 'between':198            item += ' %s and %s)' % (self.value2sql(value[0]), self.value2sql(value[1]))199        else:200            item += self.value2sql(value) + ')'201        return item202    def dict2insert(self, d):203        space_char = '"' if isinstance(self, PGConnection) else '`'204        keys = d.keys()205        vals = []206        for k in keys:207            vals.append('%s' % self.value2sql(d[k]))208        new_keys = [space_char + k.strip(space_char) + space_char for k in keys]209        return ','.join(new_keys), ','.join(vals)210    def insert(self, table, values):211        # sql = "insert into %s set %s" % (table, self.dict2sql(values))212        keys, vals = self.dict2insert(values)213        sql = "insert into %s(%s) values (%s)" % (table, keys, vals)214        ret = self.execute(sql)215        if ret:216            ret = self.last_insert_id()217        return ret218    def insert_ignore(self, table, values):219        keys, vals = self.dict2insert(values)220        sql = "insert ignore into %s(%s) values (%s)" % (table, keys, vals)221        ret = self.execute(sql)222        if ret:223            ret = self.last_insert_id()224        return ret225    def update(self, table, values, where=None):226        sql = "update %s set %s" % (table, self.dict2sql(values))227        if where:228            sql += " where %s" % self.dict2sql(where, ' and ')229        return self.execute(sql)230    def delete(self, table, where):231        sql = "delete from %s" % table232        if where:233            sql += " where %s" % self.dict2sql(where, ' and ')234        return self.execute(sql)235    def select(self, table, where=None, fields='*', other=None, isdict=True):236        sql = "select %s from %s" % (fields, table)237        if where:238            sql += " where %s" % self.dict2sql(where, ' and ')239        if other:240            sql += ' ' + other241        return self.query(sql, None, isdict=isdict)242    def select_one(self, table, where=None, fields='*', other=None, isdict=True):243        sql = "select %s from %s" % (fields, table)244        if where:245            sql += " where %s" % self.dict2sql(where, ' and ')246        if other:247            sql += ' ' + other248        return self.get(sql, None, isdict=isdict)249    def get_page_data(self, table, where=None, fields='*', other=None, isdict=True, page_num=1, page_count=20):250        """æ ¹æ®æ¡ä»¶è¿è¡å页æ¥è¯¢251        get_page_data252        return all_count, page_num, data253            all_count: æææ°æ®æ¡ç®æ°254            page_num: å½åè¿åæ°æ®ç页ç 255            data: 页颿°æ®å
容256        """257        page_num = int(page_num)258        page_count = int(page_count)259        count_data = self.select_one(table, where, "count(*) as count", other, isdict=isdict)260        if count_data and count_data['count']:261            all_count = count_data["count"]262        else:263            all_count = 0264        if all_count == 0:265            return 0, page_num, []266       267        offset = page_num * page_count - page_count268        if offset > all_count:269            data = []270        else:271            other += " limit %d offset %d" % (page_count, offset)272            data = self.select(table, where, fields, other, isdict=isdict)273        return all_count, page_num, data274    def last_insert_id(self):275        pass276    def start(self):  # start transaction277        pass278    def commit(self):279        self.conn.commit()280    def rollback(self):281        self.conn.rollback()282    def escape(self, s):283        return s284def with_mysql_reconnect(func):285    def _(self, *args, **argitems):286        try:287            import MySQLdb288            mysqllib = MySQLdb289        except:290            logging.info("MySQLdb load error! Load pymysql...")291            import pymysql292            mysqllib = pymysql293        trycount = 3294        while True:295            try:296                x = func(self, *args, **argitems)297            except mysqllib.OperationalError as e:298                # logging.err('mysql error:', e)299                if e[0] >= 2000:  # client error300                    # logging.err('reconnect ...')301                    self.conn.close()302                    self.connect()303                    trycount -= 1304                    if trycount > 0:305                        continue306                raise307            else:308                return x309    return _310def with_pg_reconnect(func):311    def _(self, *args, **argitems):312        import psycopg2313        trycount = 3314        while True:315            try:316                x = func(self, *args, **argitems)317            except psycopg2.OperationalError as e:318                # logging.err('mysql error:', e)319                if e[0] >= 2000:  # client error320                    # logging.err('reconnect ...')321                    self.conn.close()322                    self.connect()323                    trycount -= 1324                    if trycount > 0:325                        continue326                raise327            else:328                return x329    return _330class PGConnection(DBConnection):331    name = "pg"332    def __init__(self, param, lasttime, status):333        DBConnection.__init__(self, param, lasttime, status)334        self.connect()335    def useit(self):336        self.status = 1337        self.lasttime = time.time()338    def releaseit(self):339        self.status = 0340    def connect(self):341        engine = self.param['engine']342        if engine == 'pg':343            import psycopg2344            self.conn = psycopg2.connect(host=self.param['host'],345                                         port=self.param['port'],346                                         user=self.param['user'],347                                         password=self.param['passwd'],348                                         database=self.param['db']349                                         )350            self.conn.autocommit = 1351        else:352            raise ValueError('engine error:' + engine)353            # logging.note('mysql connected', self.conn)354    def close(self):355        self.conn.close()356        self.conn = None357    @with_pg_reconnect358    def alive(self):359        if self.is_available():360            cur = self.conn.cursor()361            cur.query("show tables;")362            cur.close()363            self.conn.ping()364    @with_pg_reconnect365    def execute(self, sql, param=None):366        return DBConnection.execute(self, sql, param)367    @with_pg_reconnect368    def executemany(self, sql, param):369        return DBConnection.executemany(self, sql, param)370    @with_pg_reconnect371    def query(self, sql, param=None, isdict=True):372        return DBConnection.query(self, sql, param, isdict)373    @with_pg_reconnect374    def get(self, sql, param=None, isdict=True):375        return DBConnection.get(self, sql, param, isdict)376    def escape(self, s, enc='utf-8'):377        if type(s) == types.UnicodeType:378            s = s.encode(enc)379        import psycopg2380        ns = psycopg2._param_escape(s)381        return unicode(ns, enc)382    def last_insert_id(self):383        ret = self.query('select last_insert_id()', isdict=False)384        return ret[0][0]385    def start(self):386        sql = "start transaction"387        return self.execute(sql)388    def insert(self, table, values):389        # sql = "insert into %s set %s" % (table, self.dict2sql(values))390        ret = 0391        try:392            keys, vals = self.dict2insert(values)393            sql = "insert into %s(%s) values (%s) RETURNING id" % (table, keys, vals)394            data = self.query(sql)395            if data:396                ret = data[0].get('id')397        except Exception as e:398            logging.error(e)399        return ret400    def insert_ignore(self, table, values):401        return self.insert(table, values)402class MySQLConnection(DBConnection):403    name = "mysql"404    def __init__(self, param, lasttime, status):405        DBConnection.__init__(self, param, lasttime, status)406        self.connect()407    def useit(self):408        self.status = 1409        self.lasttime = time.time()410    def releaseit(self):411        self.status = 0412    def connect(self):413        engine = self.param['engine']414        if engine == 'mysql':415            try:416                import MySQLdb417                mysqllib = MySQLdb418            except:419                logging.info("MySQLdb load error! Load pymysql...")420                import pymysql421                mysqllib = pymysql422            self.conn = mysqllib.connect(host=self.param['host'],423                                         port=self.param['port'],424                                         user=self.param['user'],425                                         passwd=self.param['passwd'],426                                         db=self.param['db'],427                                         charset=self.param['charset'],428                                         connect_timeout=self.param.get('timeout', 20),429                                         )430            self.conn.autocommit(1)431            # if self.param.get('autocommit',None):432            #    logging.note('set autocommit')433            #    self.conn.autocommit(1)434            # initsqls = self.param.get('init_command')435            # if initsqls:436            #    logging.note('init sqls:', initsqls)437            #    cur = self.conn.cursor()438            #    cur.execute(initsqls)439            #    cur.close()440        else:441            raise ValueError('engine error:' + engine)442            # logging.note('mysql connected', self.conn)443    def close(self):444        self.conn.close()445        self.conn = None446    @with_mysql_reconnect447    def alive(self):448        if self.is_available():449            cur = self.conn.cursor()450            cur.execute("show tables;")451            cur.close()452            self.conn.ping()453    @with_mysql_reconnect454    def execute(self, sql, param=None):455        return DBConnection.execute(self, sql, param)456    @with_mysql_reconnect457    def executemany(self, sql, param):458        return DBConnection.executemany(self, sql, param)459    @with_mysql_reconnect460    def query(self, sql, param=None, isdict=True):461        return DBConnection.query(self, sql, param, isdict)462    @with_mysql_reconnect463    def get(self, sql, param=None, isdict=True):464        return DBConnection.get(self, sql, param, isdict)465    def escape(self, s, enc='utf-8'):466        if type(s) == types.UnicodeType:467            s = s.encode(enc)468        ns = self.conn.escape_string(s)469        return unicode(ns, enc)470    def last_insert_id(self):471        ret = self.query('select last_insert_id()', isdict=False)472        return ret[0][0]473    def start(self):474        sql = "start transaction"475        return self.execute(sql)476class SQLiteConnection(DBConnection):477    name = "sqlite"478    def __init__(self, param, lasttime, status):479        DBConnection.__init__(self, param, lasttime, status)480    def connect(self):481        engine = self.param['engine']482        if engine == 'sqlite':483            import sqlite3484            self.conn = sqlite3.connect(self.param['db'], isolation_level=None)485        else:486            raise ValueError('engine error:' + engine)487    def useit(self):488        DBConnection.useit(self)489        if not self.conn:490            self.connect()491    def releaseit(self):492        DBConnection.releaseit(self)493        self.conn.close()494        self.conn = None495    def escape(self, s, enc='utf-8'):496        s = s.replace("'", "\\'")497        s = s.replace('"', '\\"')498        return s499    def last_insert_id(self):500        ret = self.query('select last_insert_rowid()', isdict=False)501        return ret[0][0]502    def start(self):503        sql = "BEGIN"504        return self.conn.execute(sql)505class DBPool(DBPoolBase):506    def __init__(self, dbcf):507        # one item: [conn, last_get_time, stauts]508        self.dbconn_idle = []509        self.dbconn_using = []510        self.dbcf = dbcf511        self.max_conn = 20512        self.min_conn = 1513        if 'conn' in self.dbcf:514            self.max_conn = self.dbcf['conn']515        self.connection_class = {}516        x = globals()517        for v in x.values():518            try:519                class_type = types.ClassType520            except:521                class_type = type522            if type(v) == class_type and v != DBConnection and issubclass(v, DBConnection):523                self.connection_class[v.name] = v524        self.lock = threading.Lock()525        self.cond = threading.Condition(self.lock)526        self.open(self.min_conn)527    def synchronize(func):528        def _(self, *args, **argitems):529            self.lock.acquire()530            x = None531            try:532                x = func(self, *args, **argitems)533            finally:534                self.lock.release()535            return x536        return _537    def open(self, n=1):538        param = self.dbcf539        newconns = []540        for i in range(0, n):541            try:542                myconn = self.connection_class[param['engine']](param, time.time(), 0)543                newconns.append(myconn)544            except Exception as e:545                logging.info(e)546                logging.error("%s connection error!" % param)547        self.dbconn_idle += newconns548    def clear_timeout(self):549        # logging.info('try clear timeout conn ...')550        now = time.time()551        dels = []552        allconn = len(self.dbconn_idle) + len(self.dbconn_using)553        for c in self.dbconn_idle:554            if allconn == 1:555                break556            if now - c.lasttime > 10:557                dels.append(c)558                allconn -= 1559        logging.warn('close timeout db conn:%d', len(dels))560        for c in dels:561            c.close()562            self.dbconn_idle.remove(c)563    @synchronize564    def acquire(self, timeout=None):565        try_count = 10566        while len(self.dbconn_idle) == 0:567            try_count -= 1568            if not try_count:569                break570            if len(self.dbconn_idle) + len(self.dbconn_using) < self.max_conn:571                self.open()572                continue573            self.cond.wait(timeout)574        if not self.dbconn_idle:575            return None576        conn = self.dbconn_idle.pop(0)577        conn.useit()578        self.dbconn_using.append(conn)579        if random.randint(0, 100) > 80:580            self.clear_timeout()581        return conn582    @synchronize583    def release(self, conn):584        self.dbconn_using.remove(conn)585        conn.releaseit()586        self.dbconn_idle.insert(0, conn)587        self.cond.notify()588    @synchronize589    def alive(self):590        for conn in self.dbconn_idle:591            conn.alive()592    def size(self):593        return len(self.dbconn_idle), len(self.dbconn_using)594class DBConnProxy:595    def __init__(self, master_conn, slave_conn):596        # self.name   = ''597        self.master = master_conn598        self.slave = slave_conn599        self._modify_methods = set(600            ['execute', 'executemany', 'last_insert_id', 'insert', 'update', 'delete', 'insert_list'])601        self._master_methods = {602            'selectw_one': 'select_one',603            'selectw': 'select',604            'queryw': 'query',605            'getw': 'get',606        }607    def __getattr__(self, name):608        if name in self._modify_methods:609            return getattr(self.master, name)610        elif name in self._master_methods:611            return getattr(self.master, self._master_methods[name])612        else:613            return getattr(self.slave, name)614class RWDBPool:615    def __init__(self, dbcf):616        self.dbcf = dbcf617        self.name = ''618        self.policy = dbcf.get('policy', 'round_robin')619        self.master = DBPool(dbcf.get('master', None))620        self.slaves = []621        self._slave_current = -1622        for x in dbcf.get('slave', []):623            self.slaves.append(DBPool(x))624    def get_slave(self):625        if self.policy == 'round_robin':626            size = len(self.slaves)627            self._slave_current = (self._slave_current + 1) % size628            return self.slaves[self._slave_current]629        else:630            raise ValueError('policy not support')631    def get_master(self):632        return self.master633    def acquire(self, timeout=None):634        # logging.debug('rwdbpool acquire')635        master_conn = None636        slave_conn = None637        try:638            master_conn = self.master.acquire(timeout)639            slave_conn = self.get_slave().acquire(timeout)640            return DBConnProxy(master_conn, slave_conn)641        except:642            if master_conn:643                master_conn.pool.release(master_conn)644            if slave_conn:645                slave_conn.pool.release(slave_conn)646            raise647    def release(self, conn):648        # logging.debug('rwdbpool release')649        conn.master.pool.release(conn.master)650        conn.slave.pool.release(conn.slave)651    def size(self):652        ret = {'master': self.master.size(), 'slave': []}653        for x in self.slaves:654            key = '%s@%s:%d' % (x.dbcf['user'], x.dbcf['host'], x.dbcf['port'])655            ret['slave'].append((key, x.size()))656        return ret657def checkalive(name=None):658    global dbpool659    while True:660        if name is None:661            checknames = dbpool.keys()662        else:663            checknames = [name]664        for k in checknames:665            pool = dbpool[k]666            pool.alive()667        time.sleep(300)668def install(cf, force=False):669    global dbpool670    if dbpool and not force:671        return dbpool672    dbpool = {}673    for name, item in cf.items():674        # item = cf[name]675        dbp = None676        if 'master' in item:677            dbp = RWDBPool(item)678        else:679            dbp = DBPool(item)680        dbpool[name] = dbp681    return dbpool682def acquire(name, timeout=None):683    global dbpool684    # logging.info("acquire:", name)685    pool = dbpool.get(name, None)686    x = None687    if pool:688        x = pool.acquire(timeout)689        if x:690            x.name = name691    return x692def release(conn):693    global dbpool694    # logging.info("release:", name)695    if not conn:696        return None697    pool = dbpool[conn.name]698    return pool.release(conn)699def execute(db, sql, param=None):700    return db.execute(sql, param)701def executemany(db, sql, param):702    return db.executemany(sql, param)703def query(db, sql, param=None, isdict=True):704    return db.query(sql, param, isdict)705def with_database(name, errfunc=None, errstr=''):706    def f(func):707        def _(self, *args, **argitems):708            self.db = acquire(name)709            x = None710            try:711                x = func(self, *args, **argitems)712            except:713                if errfunc:714                    return getattr(self, errfunc)(error=errstr)715                else:716                    raise717            finally:718                release(self.db)719                self.db = None720            return x721        return _722    return f723def with_database_class(name):724    def _(cls):725        try:726            cls.db = acquire(name)727        except:728            cls.db = None729        finally:730            release(cls.db)731        return cls..._dict.py
Source:_dict.py  
1from __future__ import annotations2from typing import Any, Callable, Container, Dict, Optional, Union, overload3from ._base import DirtyEquals, DirtyEqualsMeta4NotGiven = object()5class IsDict(DirtyEquals[Dict[Any, Any]]):6    """7    Base class for comparing dictionaries. By default, `IsDict` isn't particularly useful on its own8    (it behaves pretty much like a normal `dict`), but it can be subclassed9    (see [`IsPartialDict`][dirty_equals.IsPartialDict] and [`IsStrictDict`][dirty_equals.IsStrictDict]) or modified10    with `.settings(...)` to powerful things.11    """12    @overload13    def __init__(self, expected: Dict[Any, Any]):14        ...15    @overload16    def __init__(self, **expected: Any):17        ...18    def __init__(self, *expected_args: Dict[Any, Any], **expected_kwargs: Any):19        """20        Can be created from either key word arguments or an existing dictionary (same as `dict()`).21        `IsDict` is not particularly useful on its own, but it can be subclassed or modified with22        [`.settings(...)`][dirty_equals.IsDict.settings] to facilitate powerful comparison of dictionaries.23        ```py title="IsDict"24        from dirty_equals import IsDict25        assert {'a': 1, 'b': 2} == IsDict(a=1, b=2)26        assert {1: 2, 3: 4} == IsDict({1: 2, 3: 4})27        ```28        """29        if expected_kwargs:30            self.expected_values = expected_kwargs31            if expected_args:32                raise TypeError('IsDict requires either a single argument or kwargs, not both')33        elif not expected_args:34            self.expected_values = {}35        elif len(expected_args) == 1:36            self.expected_values = expected_args[0]37        else:38            raise TypeError(f'IsDict expected at most 1 argument, got {len(expected_args)}')39        if not isinstance(self.expected_values, dict):40            raise TypeError(f'expected_values must be a dict, got {type(self.expected_values)}')41        self.strict = False42        self.partial = False43        self.ignore: Union[None, Container[Any], Callable[[Any], bool]] = None44        self._post_init()45        super().__init__()46    def _post_init(self) -> None:47        pass48    def settings(49        self,50        *,51        strict: Optional[bool] = None,52        partial: Optional[bool] = None,53        ignore: Union[None, Container[Any], Callable[[Any], bool]] = NotGiven,  # type: ignore[assignment]54    ) -> IsDict:55        """56        Allows you to customise the behaviour of `IsDict`, technically a new `IsDict` is required to allow chaining.57        Args:58            strict (bool): If `True`, the order of key/value pairs must match.59            partial (bool): If `True`, only keys include in the wrapped dict are checked.60            ignore (Union[None, Container[Any], Callable[[Any], bool]]): Values to omit from comparison.61                Can be either a `Container` (e.g. `set` or `list`) of values to ignore, or a function that takes a62                value and should return `True` if the value should be ignored.63        ```py title="IsDict.settings(...)"64        from dirty_equals import IsDict65        assert {'a': 1, 'b': 2, 'c': None} != IsDict(a=1, b=2)66        assert {'a': 1, 'b': 2, 'c': None} == IsDict(a=1, b=2).settings(partial=True) #(1)!67        assert {'b': 2, 'a': 1} == IsDict(a=1, b=2)68        assert {'b': 2, 'a': 1} != IsDict(a=1, b=2).settings(strict=True) #(2)!69        # combining partial and strict70        assert {'a': 1, 'b': None, 'c': 3} == IsDict(a=1, c=3).settings(strict=True, partial=True)71        assert {'b': None, 'c': 3, 'a': 1} != IsDict(a=1, c=3).settings(strict=True, partial=True)72        ```73        1. This is the same as [`IsPartialDict(a=1, b=2)`][dirty_equals.IsPartialDict]74        2. This is the same as [`IsStrictDict(a=1, b=2)`][dirty_equals.IsStrictDict]75        """76        new_cls = self.__class__(self.expected_values)77        new_cls.__dict__ = self.__dict__.copy()78        if strict is not None:79            new_cls.strict = strict80        if partial is not None:81            new_cls.partial = partial82        if ignore is not NotGiven:83            new_cls.ignore = ignore84        if new_cls.partial and new_cls.ignore:85            raise TypeError('partial and ignore cannot be used together')86        return new_cls87    def equals(self, other: Dict[Any, Any]) -> bool:88        if not isinstance(other, dict):89            return False90        expected = self.expected_values91        if self.partial:92            other = {k: v for k, v in other.items() if k in expected}93        if self.ignore:94            expected = self._filter_dict(self.expected_values)95            other = self._filter_dict(other)96        if other != expected:97            return False98        if self.strict and list(other.keys()) != list(expected.keys()):99            return False100        return True101    def _filter_dict(self, d: Dict[Any, Any]) -> Dict[Any, Any]:102        return {k: v for k, v in d.items() if not self._ignore_value(v)}103    def _ignore_value(self, v: Any) -> bool:104        if isinstance(v, (DirtyEquals, DirtyEqualsMeta)):105            return False106        elif callable(self.ignore):107            return self.ignore(v)108        else:109            try:110                return v in self.ignore  # type: ignore[operator]111            except TypeError:112                # happens for unhashable types113                return False114    def _repr_ne(self) -> str:115        name = self.__class__.__name__116        modifiers = []117        if self.partial != (name == 'IsPartialDict'):118            modifiers += [f'partial={self.partial}']119        if (self.ignore == {None}) != (name == 'IsIgnoreDict') or self.ignore not in (None, {None}):120            r = self.ignore.__name__ if callable(self.ignore) else repr(self.ignore)121            modifiers += [f'ignore={r}']122        if self.strict != (name == 'IsStrictDict'):123            modifiers += [f'strict={self.strict}']124        if modifiers:125            mod = f'[{", ".join(modifiers)}]'126        else:127            mod = ''128        args = [f'{k}={v!r}' for k, v in self.expected_values.items()]129        return f'{name}{mod}({", ".join(args)})'130class IsPartialDict(IsDict):131    """132    Partial dictionary comparison, this is the same as133    [`IsDict(...).settings(partial=True)`][dirty_equals.IsDict.settings].134    ```py title="IsPartialDict"135    from dirty_equals import IsPartialDict136    assert {'a': 1, 'b': 2, 'c': 3} == IsPartialDict(a=1, b=2)137    assert {'a': 1, 'b': 2, 'c': 3} != IsPartialDict(a=1, b=3)138    assert {'a': 1, 'b': 2, 'd': 3} != IsPartialDict(a=1, b=2, c=3)139    # combining partial and strict140    assert {'a': 1, 'b': None, 'c': 3} == IsPartialDict(a=1, c=3).settings(strict=True)141    assert {'b': None, 'c': 3, 'a': 1} != IsPartialDict(a=1, c=3).settings(strict=True)142    ```143    """144    def _post_init(self) -> None:145        self.partial = True146class IsIgnoreDict(IsDict):147    """148    Dictionary comparison with `None` values ignored, this is the same as149    [`IsDict(...).settings(ignore={None})`][dirty_equals.IsDict.settings].150    `.settings(...)` can be used to customise the behaviour of `IsIgnoreDict`, in particular changing which151    values are ignored.152    ```py title="IsIgnoreDict"153    from dirty_equals import IsIgnoreDict154    assert {'a': 1, 'b': 2, 'c': None} == IsIgnoreDict(a=1, b=2)155    assert {'a': 1, 'b': 2, 'c': None, 'c': 'ignore'} == (156        IsIgnoreDict(a=1, b=2).settings(ignore={None, 'ignore'})157    )158    def is_even(v: int) -> bool:159        return v % 2 == 0160    assert {'a': 1, 'b': 2, 'c': 3, 'd': 4} == (161        IsIgnoreDict(a=1, c=3).settings(ignore=is_even)162    )163    # combining partial and strict164    assert {'a': 1, 'b': None, 'c': 3} == IsIgnoreDict(a=1, c=3).settings(strict=True)165    assert {'b': None, 'c': 3, 'a': 1} != IsIgnoreDict(a=1, c=3).settings(strict=True)166    ```167    """168    def _post_init(self) -> None:169        self.ignore = {None}170class IsStrictDict(IsDict):171    """172    Dictionary comparison with order enforced, this is the same as173    [`IsDict(...).settings(strict=True)`][dirty_equals.IsDict.settings].174    ```py title="IsDict.settings(...)"175    from dirty_equals import IsStrictDict176    assert {'a': 1, 'b': 2} == IsStrictDict(a=1, b=2)177    assert {'a': 1, 'b': 2, 'c': 3} != IsStrictDict(a=1, b=2)178    assert {'b': 2, 'a': 1} != IsStrictDict(a=1, b=2)179    # combining partial and strict180    assert {'a': 1, 'b': None, 'c': 3} == IsStrictDict(a=1, c=3).settings(partial=True)181    assert {'b': None, 'c': 3, 'a': 1} != IsStrictDict(a=1, c=3).settings(partial=True)182    ```183    """184    def _post_init(self) -> None:...items.py
Source:items.py  
1# -*- coding: utf-8 -*-2# Define here the models for your scraped items3#4# See documentation in:5# http://doc.scrapy.org/en/latest/topics/items.html6from scrapy.item import Item, Field7class TeamItem(Item):8    _id = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)9    collection = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)10    url = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)11    team_id = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)12    name = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=True, searchWeight=100)13    competitions = Field(existCheck=False, updateable=True, isArray=True, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)14    crest_url = Field(existCheck=False, updateable=True, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)15class TeamSeasonItem(Item):16    _id = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)17    collection = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)18    url = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)19    team_id = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)20    name = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)21    competition = Field(existCheck=True, updateable=False, isArray=False, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)22    players = Field(existCheck=False, updateable=True, isArray=True, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)23class CompetitionItem(Item):24    _id = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)25    collection = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)26    url = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)27    icon = Field(existCheck=False, updateable=True, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)28    league_code = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)29    seasons = Field(existCheck=False, updateable=True, isArray=True, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)30    metadata = Field(existCheck=False, updateable=True, isArray=False, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)31class CompetitionSeasonItem(Item):32    _id = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)33    collection = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)34    url = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)35    league_code = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)36    season = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)37    teams = Field(existCheck=False, updateable=False, isArray=True, isDict=True, arrayReplace=True, isSearchField=False, searchWeight=0)38class PlayerItem(Item):39    _id = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)40    collection = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)41    url = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)42    player_id = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)43    name = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=True, searchWeight=200)44    firstname = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=True, searchWeight=100)45    lastname = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=True, searchWeight=300)46    birthday = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)47    country = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)48    seasons = Field(existCheck=False, updateable=True, isArray=True, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)49class FixtureItem(Item):50    _id = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)51    collection = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)52    date = Field(existCheck=False, updateable=True, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)53    matchday = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)54    homeTeam = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)55    awayTeam = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)56    result = Field(existCheck=False, updateable=True, isArray=False, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)57    season = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)58    league_code = Field(existCheck=True, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)59    url = Field(existCheck=False, updateable=False, isArray=False, isDict=False, arrayReplace=False, isSearchField=False, searchWeight=0)60    referee = Field(existCheck=False, updateable=True, isArray=False, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)61    stadium = Field(existCheck=False, updateable=True, isArray=False, isDict=True, arrayReplace=False, isSearchField=False, searchWeight=0)62    goals = Field(existCheck=False, updateable=True, isArray=True, isDict=True, arrayReplace=True, isSearchField=False, searchWeight=0)63    assists = Field(existCheck=False, updateable=True, isArray=True, isDict=True, arrayReplace=True, isSearchField=False, searchWeight=0)64    cards = Field(existCheck=False, updateable=True, isArray=True, isDict=True, arrayReplace=True, isSearchField=False, searchWeight=0)...views.py
Source:views.py  
1from django.shortcuts import render2#from cloudserver.models import System_Monit3from django.views.decorators.csrf import csrf_exempt4import copy5import time6import psutil7import requests8import json9import numpy as np10import pandas as pd11from requests.adapters import HTTPAdapter12from urllib3.util.retry import Retry13from django.http import HttpResponse14#from cloudserver.dcpower.feadeal import build_cpu_metric, calc_cpu_usage15#from cloudserver.dcpower.testpower import lasso_vm2pc_pred, lasso_vm2pc_train16#from cloudserver.dcpower.dockerpower import build_docker_metrics, calc_docker_power17#from cloudserver.dcpower.predpower import pcpower_pred_train, pcpower_pred18#from cloudserver.dcpower.nnpower import nn_vm2pc_train, nn_vm2pc_pred19from pandas import DataFrame20from pandas import concat21from sklearn.preprocessing import MinMaxScaler22from keras.models import Sequential23from keras.layers import Dense24import numpy25def chart(request):26    # æ¾ç¤ºhtmlæä»¶27    return render(request, "cloudserver/line-time-series/index.htm")28    29    30def chart_pred(request):31    return render(request,"cloudserver/line-time-series/chart_pred.htm")32def getCPUstate(interval=1):33    cpu = psutil.cpu_percent(interval)34    return cpu35def getMemorystate():36    phymem = psutil.virtual_memory()37    cur_mem = phymem.percent38    mem_rate = int(phymem.used / 1024 / 1024)39    mem_all = int(phymem.total / 1024 / 1024)40    line = {41        'cur_mem': cur_mem,42        'mem_rate': mem_rate,43        'mem_all': mem_all,44    }45    return cur_mem46def server_cpu_json(request):47    data = []48    if request.method == "GET":49        i = 050        while(i < 2):51            t = time.time()52            time_stamp = int(round(t * 1000))  # 转æ¢ä¸ºæ¯«ç§çæ¶é´æ³53            cpu = getCPUstate()54            data.append([int(time_stamp), float('%.2f' % cpu)])55            print(data)56            i += 157    isdict = json.dumps(data)  # jsonåºååå表58    return HttpResponse(isdict, content_type="application/json")59    60    61def server_mem_json(request):62    data = []63    if request.method == "GET":64        i = 065        while(i < 2):66            t = time.time()67            time_stamp = int(round(t * 1000))  # 转æ¢ä¸ºæ¯«ç§çæ¶é´æ³68            mem = getMemorystate()69            data.append([int(time_stamp), float('%.2f' % mem)])70            print(data)71            i += 172    isdict = json.dumps(data)  # jsonåºååå表73    return HttpResponse(isdict, content_type="application/json")74def server_cpu_pred_json(request):75    data = []76    i = 077    while (i < 30):78        t = time.time()79        time_stamp = int(round(t * 1000))  # 转æ¢ä¸ºæ¯«ç§çæ¶é´æ³80        cpu = getCPUstate()81        data.append([int(time_stamp), float('%.2f' % cpu)])82        print(data)83        i += 184    ori_data = copy.deepcopy(data)85    for i, cpu in enumerate(ori_data):86        del[cpu[0]]87    # to_csv = pd.DataFrame(columns=['time','cpu_info'], data=data)88    values = np.array(ori_data)89    dataset = values.astype('float32')90    # normalize the dataset91    scaler = MinMaxScaler(feature_range=(0, 1))92    dataset = scaler.fit_transform(dataset)93    # split into train and test sets94    train_size = int(len(dataset) * 0.67)95    test_size = len(dataset) - train_size96    train, test = dataset[0:train_size, :], dataset[train_size:len(dataset), :]97    # reshape dataset98    look_back = 399    trainX, trainY = create_dataset(train, look_back)100    testX, testY = create_dataset(test, look_back)101    # create and fit Multilayer Perceptron model102    model = Sequential()103    model.add(Dense(12, input_dim=look_back, activation='relu'))104    model.add(Dense(8, activation='relu'))105    model.add(Dense(1))106    model.compile(loss='mean_squared_error', optimizer='adam')107    model.fit(trainX, trainY, epochs=100, batch_size=2, verbose=2)108    # model = load_model("/Users/Arithmetic/PycharmProjects/MyBlog/cloudserver/algorithm/airline_passengers/my_model.h5")109    trainScore = model.evaluate(trainX, trainY, verbose=0)110    trainPredict = model.predict(trainX)111    testPredict = model.predict(testX)112    mm = MinMaxScaler()113    trainPredict = mm.fit_transform(trainPredict)114    trainPredict = mm.inverse_transform(trainPredict)115    # print(trainPredict.tolist())116    tem = trainPredict.tolist()117    # print(data)118    # print(tem[0],len(tem),len(data))119    for i, item in enumerate(data[0:len(tem)]):120        item[1] = tem[i][0]121    # print(data)122    isdict = json.dumps(data)  # jsonåºååå表123    print("OJBK")124    return HttpResponse(isdict, content_type="application/json")125    126def chart_json(request):127    # 读å表ææè®°å½128    system_monit = System_Monit.objects.all()129    data = []  # å建ä¸ä¸ªç©ºå表130    dic = {}131    for i in system_monit:  # éåï¼æ¼æ¨ªçºµåæ 132        # æ¨ªåæ ä¸ºæ¶é´æ³,çºµåæ ä¸ºcpu使ç¨çãæ³¨æï¼å¿
须转æ¢ç±»åï¼å¦åæ°æ®ä¸å¯¹ã133        # t = int(i.time_stamp)134        # c = float('%.2f' % i.cpu)135        # dic['%d' % t] = c136        # data.append(dic)137        data.append([int(i.time_stamp), float('%.2f' % i.cpu)])138    # print(data)139    isdict = json.dumps(data)  # jsonåºååå表140    return HttpResponse(isdict, content_type="application/json")  # æ§è¡ç±»å为json...Looking for an in-depth tutorial around pytest? LambdaTest covers the detailed pytest tutorial that has everything related to the pytest, from setting up the pytest framework to automation testing. Delve deeper into pytest testing by exploring advanced use cases like parallel testing, pytest fixtures, parameterization, executing multiple test cases from a single file, and more.
Skim our below pytest tutorial playlist to get started with automation testing using the pytest framework.
https://www.youtube.com/playlist?list=PLZMWkkQEwOPlcGgDmHl8KkXKeLF83XlrP
Get 100 minutes of automation test minutes FREE!!
