How to use isdict method in Pytest

Best Python code snippet using pytest

dbpool.py

Source:dbpool.py Github

copy

Full Screen

# coding: utf-8
"""Small connection pool for MySQL, PostgreSQL and SQLite.

Pools are registered by name with :func:`install`, connections are borrowed
with :func:`acquire` and handed back with :func:`release`.  The connection
classes also offer a few helpers that build SQL text from dictionaries.
"""
import datetime
import functools
import logging
import os
import random
import threading
import time
import types

debug = True
dbpool = {}

# Text-type shim so the module imports on both Python 2 and Python 3.
_PY2 = str is bytes
try:
    _text_type = unicode  # noqa: F821 - only defined on Python 2
except NameError:
    _text_type = str


def _close_quietly(obj):
    """Close *obj*, logging instead of raising so an earlier error is kept."""
    try:
        obj.close()
    except Exception:
        logging.debug('close failed', exc_info=True)


def timeit(func):
    """Decorate a connection method so every call is logged with its duration.

    The wrapped callable must receive the connection as its first positional
    argument; the SQL text is taken from the second positional argument or
    from the ``sql`` keyword.
    """
    @functools.wraps(func)
    def _(*args, **kwargs):
        start = time.time()
        err = ''
        try:
            return func(*args, **kwargs)
        except Exception as e:
            err = str(e)
            raise
        finally:
            end = time.time()
            conn = args[0]
            dbcf = conn.param
            # The statement may arrive positionally or by keyword; indexing
            # args[1] blindly raised IndexError for keyword callers.
            sql = args[1] if len(args) > 1 else kwargs.get('sql')
            logging.info('name=%s|user=%s|addr=%s:%d|db=%s|time=%d|sql=%s|err=%s',
                         conn.name, dbcf.get('user', ''),
                         dbcf.get('host', ''), dbcf.get('port', 0),
                         os.path.basename(dbcf.get('db', '')),
                         int((end - start) * 1000000),
                         repr(sql)[:100], err)
    return _


class DBPoolBase:
    """Interface placeholder for pool implementations."""

    def acquire(self, name):
        pass

    def release(self, name, conn):
        pass


class DBResult:
    """Pairs a list of column names with a list of row tuples."""

    def __init__(self, fields, data):
        self.fields = fields
        self.data = data

    def todict(self):
        """Return every row as a ``{field: value}`` dict."""
        return [dict(zip(self.fields, item)) for item in self.data]

    def __iter__(self):
        for row in self.data:
            yield dict(zip(self.fields, row))

    def row(self, i, isdict=True):
        """Return row *i* as a dict (default) or as the raw row."""
        if isdict:
            return dict(zip(self.fields, self.data[i]))
        return self.data[i]

    def __getitem__(self, i):
        return dict(zip(self.fields, self.data[i]))


class DBFunc:
    """Wraps a raw SQL expression that `value2sql` must emit unquoted."""

    def __init__(self, data):
        self.value = data


class DBConnection:
    """Engine-independent base class for pooled connections.

    ``status`` is 0 while the connection is idle and 1 while it is in use.
    """

    def __init__(self, param, lasttime, status):
        self.name = None
        self.param = param
        self.conn = None
        self.status = status
        self.lasttime = lasttime
        self.pool = None
        # Row count reported by the cursor of the most recent execute().
        self.rowcount = -1

    def is_available(self):
        return self.status == 0

    def useit(self):
        self.status = 1
        self.lasttime = time.time()

    def releaseit(self):
        self.status = 0

    def connect(self):
        pass

    def close(self):
        pass

    def alive(self):
        pass

    def cursor(self):
        return self.conn.cursor()

    def _run(self, sql, param, fetch):
        """Execute *sql* on a fresh cursor and return ``fetch(cursor)``.

        ``fetch`` runs before the cursor is closed so it can still read
        ``description``.  Errors propagate unchanged: the old blind
        "reconnect and retry" re-used a cursor bound to the previous
        connection, so it never helped, and for an in-memory SQLite database
        the reconnect discarded the data.  Engine subclasses already retry
        through their ``with_*_reconnect`` decorators.
        """
        cur = self.conn.cursor()
        try:
            if param:
                cur.execute(sql, param)
            else:
                cur.execute(sql)
            result = fetch(cur)
        except Exception:
            _close_quietly(cur)
            raise
        cur.close()
        return result

    @timeit
    def execute(self, sql, param=None):
        """Run a statement and return its rows (empty list when it has none)."""
        def _fetch(cur):
            self.rowcount = cur.rowcount
            # Statements without a result set have no description; calling
            # fetchall() on them raises in some drivers (e.g. psycopg2).
            if cur.description is None:
                return []
            return cur.fetchall()
        return self._run(sql, param, _fetch)

    @timeit
    def executemany(self, sql, param):
        """Run *sql* once per parameter set; return the driver's result."""
        cur = self.conn.cursor()
        try:
            ret = cur.executemany(sql, param)
        except Exception:
            _close_quietly(cur)
            raise
        cur.close()
        return ret

    @timeit
    def query(self, sql, param=None, isdict=True):
        """Run a SQL query and return all result rows.

        Rows are dicts keyed by column name when *isdict* is true, otherwise
        whatever the driver returns.
        """
        def _fetch(cur):
            if cur.description is None:
                return []
            res = cur.fetchall()
            if res and isdict:
                xkeys = [i[0] for i in cur.description]
                return [dict(zip(xkeys, item)) for item in res]
            return res
        return self._run(sql, param, _fetch)

    @timeit
    def get(self, sql, param=None, isdict=True):
        """Run a SQL query and return only the first row (or None)."""
        def _fetch(cur):
            if cur.description is None:
                return None
            res = cur.fetchone()
            if res and isdict:
                xkeys = [i[0] for i in cur.description]
                return dict(zip(xkeys, res))
            return res
        return self._run(sql, param, _fetch)

    def _quote_char(self):
        """Return the identifier quote: '"' for PostgreSQL, '`' otherwise."""
        return '"' if isinstance(self, PGConnection) else '`'

    def value2sql(self, v, charset='utf-8'):
        """Render a Python value as a SQL literal.

        SECURITY NOTE: string values are NOT escaped (only wrapped in single
        quotes, and passed through untouched when they already start with a
        quote or with ``now()`` / ``md5(``).  Do not feed untrusted input
        through the dict-based helpers; use ``query``/``execute`` with
        ``param`` instead.
        """
        if isinstance(v, (bytes, _text_type)):
            if _PY2 and isinstance(v, _text_type):
                v = v.encode(charset)
            elif not _PY2 and isinstance(v, bytes):
                v = v.decode(charset)
            if v.startswith(('now()', 'md5(')):
                return v
            # v = self.escape(v)
            return "'%s'" % v if not v or v[0] != "'" else v
        if isinstance(v, datetime.datetime):
            return "'%s'" % str(v)
        if isinstance(v, DBFunc):
            return v.value
        if v is None:
            return 'NULL'
        return str(v)

    def dict2sql(self, d, sp=','):
        """Render a dict as ``name=value`` pairs joined by *sp*.

        Values may be plain (``{name: value}``) or an operator tuple
        (``{name: (operator, value)}``).
        """
        q = self._quote_char()
        parts = []
        for k, v in d.items():
            if isinstance(v, tuple):
                parts.append(self.exp2sql(k, v[0], v[1]))
            else:
                name = k.strip(' %s' % q).replace('.', '%s.%s' % (q, q))
                parts.append('%s%s%s=%s' % (q, name, q, self.value2sql(v)))
        return sp.join(parts)

    def exp2sql(self, key, op, value):
        """Render ``key op value`` as a parenthesised SQL condition."""
        q = self._quote_char()
        op = op.strip()
        item = '(%s%s%s %s ' % (q, key.strip(q).replace('.', '%s.%s' % (q, q)), q, op)
        # Compare the normalised operator so ' IN ' is handled like 'in'.
        kind = op.lower()
        if kind in ('in', 'not in'):
            item += '(%s))' % ','.join([self.value2sql(x) for x in value])
        elif kind == 'between':
            item += ' %s and %s)' % (self.value2sql(value[0]), self.value2sql(value[1]))
        else:
            item += self.value2sql(value) + ')'
        return item

    def dict2insert(self, d):
        """Return ``(column_list, value_list)`` SQL fragments for an INSERT."""
        q = self._quote_char()
        keys = list(d.keys())
        vals = [self.value2sql(d[k]) for k in keys]
        new_keys = [q + k.strip(q) + q for k in keys]
        return ','.join(new_keys), ','.join(vals)

    def _insert_sql(self, sql):
        """Run an INSERT and return the new row id when a row was written.

        ``execute`` returns an empty sequence for INSERT, so the cursor's row
        count is what tells whether the insert happened.
        """
        self.rowcount = -1
        ret = self.execute(sql)
        if self.rowcount > 0:
            return self.last_insert_id()
        return ret

    def insert(self, table, values):
        keys, vals = self.dict2insert(values)
        sql = "insert into %s(%s) values (%s)" % (table, keys, vals)
        return self._insert_sql(sql)

    def insert_ignore(self, table, values):
        keys, vals = self.dict2insert(values)
        sql = "insert ignore into %s(%s) values (%s)" % (table, keys, vals)
        return self._insert_sql(sql)

    def update(self, table, values, where=None):
        sql = "update %s set %s" % (table, self.dict2sql(values))
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        return self.execute(sql)

    def delete(self, table, where):
        sql = "delete from %s" % table
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        return self.execute(sql)

    def _select_sql(self, table, where, fields, other):
        """Build the SELECT text shared by `select` and `select_one`."""
        sql = "select %s from %s" % (fields, table)
        if where:
            sql += " where %s" % self.dict2sql(where, ' and ')
        if other:
            sql += ' ' + other
        return sql

    def select(self, table, where=None, fields='*', other=None, isdict=True):
        return self.query(self._select_sql(table, where, fields, other), None, isdict=isdict)

    def select_one(self, table, where=None, fields='*', other=None, isdict=True):
        return self.get(self._select_sql(table, where, fields, other), None, isdict=isdict)

    def get_page_data(self, table, where=None, fields='*', other=None, isdict=True,
                      page_num=1, page_count=20):
        """Run a paged query.

        Returns ``(all_count, page_num, data)``:
        all_count: total number of matching rows
        page_num: page number of the returned data
        data: the rows of that page
        """
        page_num = int(page_num)
        page_count = int(page_count)
        # The count row is read by key, so it is always fetched as a dict
        # regardless of the caller's isdict choice.
        count_data = self.select_one(table, where, "count(*) as count", other, isdict=True)
        if count_data and count_data['count']:
            all_count = count_data["count"]
        else:
            all_count = 0
        if all_count == 0:
            return 0, page_num, []

        offset = page_num * page_count - page_count
        if offset > all_count:
            data = []
        else:
            # `other` defaults to None; concatenating onto it used to raise.
            paged = (other or '') + " limit %d offset %d" % (page_count, offset)
            data = self.select(table, where, fields, paged, isdict=isdict)
        return all_count, page_num, data

    def last_insert_id(self):
        pass

    def start(self):  # start transaction
        pass

    def commit(self):
        self.conn.commit()

    def rollback(self):
        self.conn.rollback()

    def escape(self, s):
        return s


def _load_mysql_lib():
    """Return the MySQLdb module, falling back to pymysql."""
    try:
        import MySQLdb
        return MySQLdb
    except Exception:
        logging.info("MySQLdb load error! Load pymysql...")
        import pymysql
        return pymysql


def _reconnect(dbconn):
    """Drop the current driver connection (best effort) and connect again."""
    _close_quietly(dbconn.conn)
    dbconn.connect()


def _call_with_reconnect(dbconn, func, args, kwargs, operational_error):
    """Call ``func(dbconn, ...)``, reconnecting on client-side driver errors.

    An error is retried when its first argument is not an integer code or is
    a code >= 2000 (client error).  At most three reconnects are made; the
    last error is re-raised afterwards.
    """
    trycount = 3
    while True:
        try:
            return func(dbconn, *args, **kwargs)
        except operational_error as e:
            # e[0] only worked on Python 2; read the code from e.args.
            code = e.args[0] if e.args else None
            if not isinstance(code, int) or code >= 2000:  # client error
                _reconnect(dbconn)
                trycount -= 1
                if trycount > 0:
                    continue
            raise


def with_mysql_reconnect(func):
    """Retry *func* after reconnecting when MySQL reports a client error."""
    @functools.wraps(func)
    def _(self, *args, **argitems):
        mysqllib = _load_mysql_lib()
        return _call_with_reconnect(self, func, args, argitems, mysqllib.OperationalError)
    return _


def with_pg_reconnect(func):
    """Retry *func* after reconnecting on a psycopg2 OperationalError."""
    @functools.wraps(func)
    def _(self, *args, **argitems):
        import psycopg2
        return _call_with_reconnect(self, func, args, argitems, psycopg2.OperationalError)
    return _


class PGConnection(DBConnection):
    """PostgreSQL connection (psycopg2); connects on construction."""
    name = "pg"

    def __init__(self, param, lasttime, status):
        DBConnection.__init__(self, param, lasttime, status)
        self.connect()

    def connect(self):
        engine = self.param['engine']
        if engine == 'pg':
            import psycopg2
            self.conn = psycopg2.connect(host=self.param['host'],
                                         port=self.param['port'],
                                         user=self.param['user'],
                                         password=self.param['passwd'],
                                         database=self.param['db']
                                         )
            self.conn.autocommit = 1
        else:
            raise ValueError('engine error:' + engine)

    def close(self):
        self.conn.close()
        self.conn = None

    @with_pg_reconnect
    def alive(self):
        """Probe an idle connection with a trivial query."""
        if self.is_available():
            cur = self.conn.cursor()
            # Cursors expose execute(), not query(); "show tables" and
            # conn.ping() are MySQL-only, so a portable probe is used.
            cur.execute("select 1")
            cur.close()

    @with_pg_reconnect
    def execute(self, sql, param=None):
        return DBConnection.execute(self, sql, param)

    @with_pg_reconnect
    def executemany(self, sql, param):
        return DBConnection.executemany(self, sql, param)

    @with_pg_reconnect
    def query(self, sql, param=None, isdict=True):
        return DBConnection.query(self, sql, param, isdict)

    @with_pg_reconnect
    def get(self, sql, param=None, isdict=True):
        return DBConnection.get(self, sql, param, isdict)

    def escape(self, s, enc='utf-8'):
        # NOTE(review): relies on the private psycopg2._param_escape helper;
        # confirm it exists in the psycopg2 version in use.
        import psycopg2
        if _PY2 and isinstance(s, _text_type):
            s = s.encode(enc)
        ns = psycopg2._param_escape(s)
        return ns.decode(enc) if isinstance(ns, bytes) else ns

    def last_insert_id(self):
        # last_insert_id() is a MySQL function; PostgreSQL exposes the most
        # recently generated sequence value through lastval().
        ret = self.query('select lastval()', isdict=False)
        return ret[0][0]

    def start(self):
        sql = "start transaction"
        return self.execute(sql)

    def insert(self, table, values):
        """Insert a row and return its ``id`` column, or 0 on any failure.

        Failures are logged, not raised (best-effort by design).
        """
        ret = 0
        try:
            keys, vals = self.dict2insert(values)
            sql = "insert into %s(%s) values (%s) RETURNING id" % (table, keys, vals)
            data = self.query(sql)
            if data:
                ret = data[0].get('id')
        except Exception as e:
            logging.error(e)
        return ret

    def insert_ignore(self, table, values):
        return self.insert(table, values)


class MySQLConnection(DBConnection):
    """MySQL connection (MySQLdb or pymysql); connects on construction."""
    name = "mysql"

    def __init__(self, param, lasttime, status):
        DBConnection.__init__(self, param, lasttime, status)
        self.connect()

    def connect(self):
        engine = self.param['engine']
        if engine == 'mysql':
            mysqllib = _load_mysql_lib()
            self.conn = mysqllib.connect(host=self.param['host'],
                                         port=self.param['port'],
                                         user=self.param['user'],
                                         passwd=self.param['passwd'],
                                         db=self.param['db'],
                                         charset=self.param['charset'],
                                         connect_timeout=self.param.get('timeout', 20),
                                         )
            self.conn.autocommit(1)
        else:
            raise ValueError('engine error:' + engine)

    def close(self):
        self.conn.close()
        self.conn = None

    @with_mysql_reconnect
    def alive(self):
        """Probe an idle connection with a query and a ping."""
        if self.is_available():
            cur = self.conn.cursor()
            cur.execute("show tables;")
            cur.close()
            self.conn.ping()

    @with_mysql_reconnect
    def execute(self, sql, param=None):
        return DBConnection.execute(self, sql, param)

    @with_mysql_reconnect
    def executemany(self, sql, param):
        return DBConnection.executemany(self, sql, param)

    @with_mysql_reconnect
    def query(self, sql, param=None, isdict=True):
        return DBConnection.query(self, sql, param, isdict)

    @with_mysql_reconnect
    def get(self, sql, param=None, isdict=True):
        return DBConnection.get(self, sql, param, isdict)

    def escape(self, s, enc='utf-8'):
        if _PY2 and isinstance(s, _text_type):
            s = s.encode(enc)
        ns = self.conn.escape_string(s)
        return ns.decode(enc) if isinstance(ns, bytes) else ns

    def last_insert_id(self):
        ret = self.query('select last_insert_id()', isdict=False)
        return ret[0][0]

    def start(self):
        sql = "start transaction"
        return self.execute(sql)


class SQLiteConnection(DBConnection):
    """SQLite connection; opened on `useit` and closed on `releaseit`."""
    name = "sqlite"

    def __init__(self, param, lasttime, status):
        DBConnection.__init__(self, param, lasttime, status)

    def connect(self):
        engine = self.param['engine']
        if engine == 'sqlite':
            import sqlite3
            self.conn = sqlite3.connect(self.param['db'], isolation_level=None)
        else:
            raise ValueError('engine error:' + engine)

    def useit(self):
        DBConnection.useit(self)
        if not self.conn:
            self.connect()

    def releaseit(self):
        DBConnection.releaseit(self)
        self.conn.close()
        self.conn = None

    def escape(self, s, enc='utf-8'):
        # NOTE(review): backslash escapes are kept as in the original;
        # verify this is what SQLite callers need before relying on it.
        s = s.replace("'", "\\'")
        s = s.replace('"', '\\"')
        return s

    def last_insert_id(self):
        ret = self.query('select last_insert_rowid()', isdict=False)
        return ret[0][0]

    def start(self):
        sql = "BEGIN"
        return self.conn.execute(sql)


class DBPool(DBPoolBase):
    """Pool of connections to one database, guarded by a single lock."""

    def __init__(self, dbcf):
        self.dbconn_idle = []
        self.dbconn_using = []
        self.dbcf = dbcf
        self.max_conn = 20
        self.min_conn = 1
        if 'conn' in self.dbcf:
            self.max_conn = self.dbcf['conn']
        # Map engine name -> connection class for every DBConnection subclass
        # defined in this module (old-style classes included on Python 2).
        self.connection_class = {}
        class_types = (type, getattr(types, 'ClassType', type))
        for v in list(globals().values()):
            if isinstance(v, class_types) and v is not DBConnection and issubclass(v, DBConnection):
                self.connection_class[v.name] = v
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.open(self.min_conn)

    def synchronize(func):
        """Run the decorated method while holding ``self.lock``."""
        @functools.wraps(func)
        def _(self, *args, **argitems):
            with self.lock:
                return func(self, *args, **argitems)
        return _

    def open(self, n=1):
        """Create up to *n* new idle connections; failures are logged."""
        param = self.dbcf
        newconns = []
        for _i in range(n):
            try:
                myconn = self.connection_class[param['engine']](param, time.time(), 0)
                # Let the connection find its pool; RWDBPool releases
                # through conn.pool.
                myconn.pool = self
                newconns.append(myconn)
            except Exception as e:
                logging.info(e)
                # Never write the password to the log.
                safe = dict(param)
                if 'passwd' in safe:
                    safe['passwd'] = '***'
                logging.error("%s connection error!" % safe)
        self.dbconn_idle += newconns

    def clear_timeout(self):
        """Close idle connections unused for more than 10 seconds, keeping one."""
        now = time.time()
        dels = []
        allconn = len(self.dbconn_idle) + len(self.dbconn_using)
        for c in self.dbconn_idle:
            if allconn == 1:
                break
            if now - c.lasttime > 10:
                dels.append(c)
                allconn -= 1
        # logging.warn was removed in Python 3.13.
        logging.warning('close timeout db conn:%d', len(dels))
        for c in dels:
            c.close()
            self.dbconn_idle.remove(c)

    @synchronize
    def acquire(self, timeout=None):
        """Borrow a connection, or return None when none becomes available."""
        try_count = 10
        while len(self.dbconn_idle) == 0:
            try_count -= 1
            if not try_count:
                break
            if len(self.dbconn_idle) + len(self.dbconn_using) < self.max_conn:
                self.open()
                continue
            self.cond.wait(timeout)
        if not self.dbconn_idle:
            return None
        conn = self.dbconn_idle.pop(0)
        conn.useit()
        self.dbconn_using.append(conn)
        # Occasionally prune stale idle connections.
        if random.randint(0, 100) > 80:
            self.clear_timeout()
        return conn

    @synchronize
    def release(self, conn):
        """Return *conn* to the idle list and wake one waiter."""
        self.dbconn_using.remove(conn)
        conn.releaseit()
        self.dbconn_idle.insert(0, conn)
        self.cond.notify()

    @synchronize
    def alive(self):
        for conn in self.dbconn_idle:
            conn.alive()

    def size(self):
        """Return ``(idle_count, in_use_count)``."""
        return len(self.dbconn_idle), len(self.dbconn_using)


class DBConnProxy:
    """Routes writes (and the ``*w`` reads) to the master, the rest to a slave."""

    def __init__(self, master_conn, slave_conn):
        self.master = master_conn
        self.slave = slave_conn
        self._modify_methods = set(
            ['execute', 'executemany', 'last_insert_id', 'insert', 'update', 'delete', 'insert_list'])
        self._master_methods = {
            'selectw_one': 'select_one',
            'selectw': 'select',
            'queryw': 'query',
            'getw': 'get',
        }

    def __getattr__(self, name):
        if name in self._modify_methods:
            return getattr(self.master, name)
        elif name in self._master_methods:
            return getattr(self.master, self._master_methods[name])
        else:
            return getattr(self.slave, name)


class RWDBPool:
    """One master pool plus slave pools selected by ``policy``."""

    def __init__(self, dbcf):
        self.dbcf = dbcf
        self.name = ''
        self.policy = dbcf.get('policy', 'round_robin')
        self.master = DBPool(dbcf.get('master', None))
        self.slaves = []
        self._slave_current = -1
        for x in dbcf.get('slave', []):
            self.slaves.append(DBPool(x))

    def get_slave(self):
        if self.policy == 'round_robin':
            size = len(self.slaves)
            if not size:
                # Previously an accidental ZeroDivisionError.
                raise ValueError('no slave pool configured')
            self._slave_current = (self._slave_current + 1) % size
            return self.slaves[self._slave_current]
        else:
            raise ValueError('policy not support')

    def get_master(self):
        return self.master

    def acquire(self, timeout=None):
        """Borrow one master and one slave connection wrapped in a proxy."""
        master_conn = None
        slave_conn = None
        try:
            master_conn = self.master.acquire(timeout)
            slave_conn = self.get_slave().acquire(timeout)
            return DBConnProxy(master_conn, slave_conn)
        except Exception:
            if master_conn:
                master_conn.pool.release(master_conn)
            if slave_conn:
                slave_conn.pool.release(slave_conn)
            raise

    def release(self, conn):
        conn.master.pool.release(conn.master)
        conn.slave.pool.release(conn.slave)

    def alive(self):
        """Probe the master and every slave pool (used by `checkalive`)."""
        self.master.alive()
        for pool in self.slaves:
            pool.alive()

    def size(self):
        ret = {'master': self.master.size(), 'slave': []}
        for x in self.slaves:
            key = '%s@%s:%d' % (x.dbcf['user'], x.dbcf['host'], x.dbcf['port'])
            ret['slave'].append((key, x.size()))
        return ret


def checkalive(name=None):
    """Probe the named pool (or all pools) every 300 seconds, forever."""
    global dbpool
    while True:
        if name is None:
            checknames = list(dbpool.keys())
        else:
            checknames = [name]
        for k in checknames:
            pool = dbpool[k]
            pool.alive()
        time.sleep(300)


def install(cf, force=False):
    """Build the global pool registry from ``{name: config}``.

    An existing registry is returned untouched unless *force* is true.  A
    config containing a ``master`` key becomes a read/write pool.
    """
    global dbpool
    if dbpool and not force:
        return dbpool
    dbpool = {}
    for name, item in cf.items():
        if 'master' in item:
            dbp = RWDBPool(item)
        else:
            dbp = DBPool(item)
        dbpool[name] = dbp
    return dbpool


def acquire(name, timeout=None):
    """Borrow a connection from pool *name*; None if unknown or exhausted."""
    global dbpool
    pool = dbpool.get(name, None)
    x = None
    if pool:
        x = pool.acquire(timeout)
        if x:
            # Remembered so release() can find the pool again.
            x.name = name
    return x


def release(conn):
    """Return *conn* to the pool it was acquired from (no-op for a falsy conn)."""
    global dbpool
    if not conn:
        return None
    pool = dbpool[conn.name]
    return pool.release(conn)


def execute(db, sql, param=None):
    return db.execute(sql, param)


def executemany(db, sql, param):
    return db.executemany(sql, param)


def query(db, sql, param=None, isdict=True):
    return db.query(sql, param, isdict)


def with_database(name, errfunc=None, errstr=''):
    """Decorate a method so ``self.db`` holds a connection during the call.

    When the method raises and *errfunc* is given, the method of that name
    on ``self`` is called with ``error=errstr`` and its result returned.
    """
    def f(func):
        @functools.wraps(func)
        def _(self, *args, **argitems):
            self.db = acquire(name)
            x = None
            try:
                x = func(self, *args, **argitems)
            except Exception:
                if errfunc:
                    return getattr(self, errfunc)(error=errstr)
                else:
                    raise
            finally:
                release(self.db)
                self.db = None
            return x
        return _
    return f


def with_database_class(name):
    def _(cls):
        try:
            cls.db = acquire(name)
        except Exception:
            cls.db = None
        finally:
            release(cls.db)
        return cls
    # ... (the listing is truncated at this point on the source page; the
    # remainder of this function is not shown and is not reconstructed here)

Full Screen

Full Screen

_dict.py

Source:_dict.py Github

copy

Full Screen

from __future__ import annotations

from typing import Any, Callable, Container, Dict, Optional, Union, overload

from ._base import DirtyEquals, DirtyEqualsMeta

NotGiven = object()


class IsDict(DirtyEquals[Dict[Any, Any]]):
    """
    Base class for dictionary comparison. On its own `IsDict` behaves much like a plain `dict`;
    it becomes useful when subclassed
    (see [`IsPartialDict`][dirty_equals.IsPartialDict] and [`IsStrictDict`][dirty_equals.IsStrictDict])
    or adjusted through `.settings(...)`.
    """

    @overload
    def __init__(self, expected: Dict[Any, Any]):
        ...

    @overload
    def __init__(self, **expected: Any):
        ...

    def __init__(self, *expected_args: Dict[Any, Any], **expected_kwargs: Any):
        """
        Build from keyword arguments or from one existing dictionary (like `dict()`), never both.

        `IsDict` is mostly a building block: subclass it or call
        [`.settings(...)`][dirty_equals.IsDict.settings] to get richer dictionary comparisons.

        ```py title="IsDict"
        from dirty_equals import IsDict

        assert {'a': 1, 'b': 2} == IsDict(a=1, b=2)
        assert {1: 2, 3: 4} == IsDict({1: 2, 3: 4})
        ```
        """
        positional_count = len(expected_args)
        if expected_kwargs:
            self.expected_values = expected_kwargs
            if positional_count:
                raise TypeError('IsDict requires either a single argument or kwargs, not both')
        elif positional_count == 0:
            self.expected_values = {}
        elif positional_count == 1:
            (self.expected_values,) = expected_args
        else:
            raise TypeError(f'IsDict expected at most 1 argument, got {len(expected_args)}')

        if not isinstance(self.expected_values, dict):
            raise TypeError(f'expected_values must be a dict, got {type(self.expected_values)}')

        # Defaults first; subclasses override them in _post_init().
        self.strict = False
        self.partial = False
        self.ignore: Union[None, Container[Any], Callable[[Any], bool]] = None
        self._post_init()
        super().__init__()

    def _post_init(self) -> None:
        pass

    def settings(
        self,
        *,
        strict: Optional[bool] = None,
        partial: Optional[bool] = None,
        ignore: Union[None, Container[Any], Callable[[Any], bool]] = NotGiven,  # type: ignore[assignment]
    ) -> IsDict:
        """
        Customise how `IsDict` compares. A new `IsDict` is returned so that calls can be chained.

        Args:
            strict (bool): If `True`, the order of key/value pairs must match.
            partial (bool): If `True`, only keys included in the wrapped dict are checked.
            ignore (Union[None, Container[Any], Callable[[Any], bool]]): Values to omit from comparison.
                Either a `Container` (e.g. `set` or `list`) of values to ignore, or a function that takes a
                value and returns `True` when that value should be ignored.

        ```py title="IsDict.settings(...)"
        from dirty_equals import IsDict

        assert {'a': 1, 'b': 2, 'c': None} != IsDict(a=1, b=2)
        assert {'a': 1, 'b': 2, 'c': None} == IsDict(a=1, b=2).settings(partial=True) #(1)!

        assert {'b': 2, 'a': 1} == IsDict(a=1, b=2)
        assert {'b': 2, 'a': 1} != IsDict(a=1, b=2).settings(strict=True) #(2)!

        # combining partial and strict
        assert {'a': 1, 'b': None, 'c': 3} == IsDict(a=1, c=3).settings(strict=True, partial=True)
        assert {'b': None, 'c': 3, 'a': 1} != IsDict(a=1, c=3).settings(strict=True, partial=True)
        ```

        1. This is the same as [`IsPartialDict(a=1, b=2)`][dirty_equals.IsPartialDict]
        2. This is the same as [`IsStrictDict(a=1, b=2)`][dirty_equals.IsStrictDict]
        """
        clone = self.__class__(self.expected_values)
        clone.__dict__ = self.__dict__.copy()

        if strict is not None:
            clone.strict = strict
        if partial is not None:
            clone.partial = partial
        if ignore is not NotGiven:
            clone.ignore = ignore

        if clone.partial and clone.ignore:
            raise TypeError('partial and ignore cannot be used together')
        return clone

    def equals(self, other: Dict[Any, Any]) -> bool:
        if not isinstance(other, dict):
            return False

        wanted = self.expected_values
        actual = other
        if self.partial:
            # Drop every key the expectation does not mention.
            actual = {k: v for k, v in actual.items() if k in wanted}
        if self.ignore:
            wanted = self._filter_dict(self.expected_values)
            actual = self._filter_dict(actual)

        if actual != wanted:
            return False
        # In strict mode the key order has to agree as well.
        return not (self.strict and list(actual.keys()) != list(wanted.keys()))

    def _filter_dict(self, d: Dict[Any, Any]) -> Dict[Any, Any]:
        kept = {}
        for key, value in d.items():
            if not self._ignore_value(value):
                kept[key] = value
        return kept

    def _ignore_value(self, v: Any) -> bool:
        # Dirty-equals matchers are never dropped.
        if isinstance(v, (DirtyEquals, DirtyEqualsMeta)):
            return False
        if callable(self.ignore):
            return self.ignore(v)
        try:
            return v in self.ignore  # type: ignore[operator]
        except TypeError:
            # happens for unhashable types
            return False

    def _repr_ne(self) -> str:
        name = self.__class__.__name__
        modifiers = []

        if self.partial != (name == 'IsPartialDict'):
            modifiers.append(f'partial={self.partial}')

        if (self.ignore == {None}) != (name == 'IsIgnoreDict') or self.ignore not in (None, {None}):
            if callable(self.ignore):
                shown = self.ignore.__name__
            else:
                shown = repr(self.ignore)
            modifiers.append(f'ignore={shown}')

        if self.strict != (name == 'IsStrictDict'):
            modifiers.append(f'strict={self.strict}')

        mod = f'[{", ".join(modifiers)}]' if modifiers else ''
        args = ', '.join(f'{k}={v!r}' for k, v in self.expected_values.items())
        return f'{name}{mod}({args})'


class IsPartialDict(IsDict):
    """
    Partial dictionary comparison, equivalent to
    [`IsDict(...).settings(partial=True)`][dirty_equals.IsDict.settings].

    ```py title="IsPartialDict"
    from dirty_equals import IsPartialDict

    assert {'a': 1, 'b': 2, 'c': 3} == IsPartialDict(a=1, b=2)

    assert {'a': 1, 'b': 2, 'c': 3} != IsPartialDict(a=1, b=3)
    assert {'a': 1, 'b': 2, 'd': 3} != IsPartialDict(a=1, b=2, c=3)

    # combining partial and strict
    assert {'a': 1, 'b': None, 'c': 3} == IsPartialDict(a=1, c=3).settings(strict=True)
    assert {'b': None, 'c': 3, 'a': 1} != IsPartialDict(a=1, c=3).settings(strict=True)
    ```
    """

    def _post_init(self) -> None:
        self.partial = True


class IsIgnoreDict(IsDict):
    """
    Dictionary comparison that skips `None` values, equivalent to
    [`IsDict(...).settings(ignore={None})`][dirty_equals.IsDict.settings].

    `.settings(...)` can still be used to customise `IsIgnoreDict`, in particular to change which
    values are ignored.

    ```py title="IsIgnoreDict"
    from dirty_equals import IsIgnoreDict

    assert {'a': 1, 'b': 2, 'c': None} == IsIgnoreDict(a=1, b=2)
    assert {'a': 1, 'b': 2, 'c': None, 'c': 'ignore'} == (
        IsIgnoreDict(a=1, b=2).settings(ignore={None, 'ignore'})
    )

    def is_even(v: int) -> bool:
        return v % 2 == 0

    assert {'a': 1, 'b': 2, 'c': 3, 'd': 4} == (
        IsIgnoreDict(a=1, c=3).settings(ignore=is_even)
    )

    # combining partial and strict
    assert {'a': 1, 'b': None, 'c': 3} == IsIgnoreDict(a=1, c=3).settings(strict=True)
    assert {'b': None, 'c': 3, 'a': 1} != IsIgnoreDict(a=1, c=3).settings(strict=True)
    ```
    """

    def _post_init(self) -> None:
        self.ignore = {None}


class IsStrictDict(IsDict):
    """
    Dictionary comparison with key order enforced, equivalent to
    [`IsDict(...).settings(strict=True)`][dirty_equals.IsDict.settings].

    ```py title="IsDict.settings(...)"
    from dirty_equals import IsStrictDict

    assert {'a': 1, 'b': 2} == IsStrictDict(a=1, b=2)
    assert {'a': 1, 'b': 2, 'c': 3} != IsStrictDict(a=1, b=2)
    assert {'b': 2, 'a': 1} != IsStrictDict(a=1, b=2)

    # combining partial and strict
    assert {'a': 1, 'b': None, 'c': 3} == IsStrictDict(a=1, c=3).settings(partial=True)
    assert {'b': None, 'c': 3, 'a': 1} != IsStrictDict(a=1, c=3).settings(partial=True)
    ```
    """

    def _post_init(self) -> None:
        # The listing is truncated here on the source page; the body is
        # reproduced exactly as shown ("...") rather than reconstructed.
        ...

Full Screen

Full Screen

items.py

Source:items.py Github

copy

Full Screen

# -*- coding: utf-8 -*-

# Define here the models for your scraped items
#
# See documentation in:
# http://doc.scrapy.org/en/latest/topics/items.html

from scrapy.item import Item, Field


def _field(**overrides):
    """Return a new Field carrying the standard metadata keys.

    Every key defaults to its "off" value (False / 0); pass only the keys
    that differ for a given field.
    """
    meta = dict(
        existCheck=False,
        updateable=False,
        isArray=False,
        isDict=False,
        arrayReplace=False,
        isSearchField=False,
        searchWeight=0,
    )
    meta.update(overrides)
    return Field(**meta)


class TeamItem(Item):
    _id = _field()
    collection = _field()
    url = _field()
    team_id = _field(existCheck=True)
    name = _field(isSearchField=True, searchWeight=100)
    competitions = _field(updateable=True, isArray=True, isDict=True)
    crest_url = _field(updateable=True)


class TeamSeasonItem(Item):
    _id = _field()
    collection = _field()
    url = _field()
    team_id = _field(existCheck=True)
    name = _field()
    competition = _field(existCheck=True, isDict=True)
    players = _field(updateable=True, isArray=True, isDict=True)


class CompetitionItem(Item):
    _id = _field()
    collection = _field()
    url = _field()
    icon = _field(updateable=True)
    league_code = _field(existCheck=True)
    seasons = _field(updateable=True, isArray=True)
    metadata = _field(updateable=True, isDict=True)


class CompetitionSeasonItem(Item):
    _id = _field()
    collection = _field()
    url = _field()
    league_code = _field(existCheck=True)
    season = _field(existCheck=True)
    teams = _field(isArray=True, isDict=True, arrayReplace=True)


class PlayerItem(Item):
    _id = _field()
    collection = _field()
    url = _field()
    player_id = _field(existCheck=True)
    name = _field(isSearchField=True, searchWeight=200)
    firstname = _field(isSearchField=True, searchWeight=100)
    lastname = _field(isSearchField=True, searchWeight=300)
    birthday = _field()
    country = _field()
    seasons = _field(updateable=True, isArray=True, isDict=True)


class FixtureItem(Item):
    _id = _field()
    collection = _field()
    date = _field(updateable=True)
    matchday = _field(existCheck=True)
    homeTeam = _field(existCheck=True)
    awayTeam = _field(existCheck=True)
    result = _field(updateable=True, isDict=True)
    season = _field(existCheck=True)
    league_code = _field(existCheck=True)
    url = _field()
    referee = _field(updateable=True, isDict=True)
    stadium = _field(updateable=True, isDict=True)
    goals = _field(updateable=True, isArray=True, isDict=True, arrayReplace=True)
    assists = _field(updateable=True, isArray=True, isDict=True, arrayReplace=True)
    cards = _field(updateable=True, isArray=True, isDict=True, arrayReplace=True)
    # ... (the listing is truncated at this point on the source page)

Full Screen

Full Screen

views.py

Source:views.py Github

copy

Full Screen

"""Django views that serve chart pages and JSON time series of host metrics.

Every JSON endpoint returns a list of ``[timestamp_ms, value]`` pairs.
"""
from django.shortcuts import render
#from cloudserver.models import System_Monit
from django.views.decorators.csrf import csrf_exempt
import copy
import time
import psutil
import requests
import json
import numpy as np
import pandas as pd
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from django.http import HttpResponse
#from cloudserver.dcpower.feadeal import build_cpu_metric, calc_cpu_usage
#from cloudserver.dcpower.testpower import lasso_vm2pc_pred, lasso_vm2pc_train
#from cloudserver.dcpower.dockerpower import build_docker_metrics, calc_docker_power
#from cloudserver.dcpower.predpower import pcpower_pred_train, pcpower_pred
#from cloudserver.dcpower.nnpower import nn_vm2pc_train, nn_vm2pc_pred
from pandas import DataFrame
from pandas import concat
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense
import numpy


def chart(request):
    """Render the HTML page that shows the live time-series chart."""
    return render(request, "cloudserver/line-time-series/index.htm")


def chart_pred(request):
    """Render the HTML page that shows the predicted time-series chart."""
    return render(request, "cloudserver/line-time-series/chart_pred.htm")


def getCPUstate(interval=1):
    """Return the CPU utilisation reported by ``psutil.cpu_percent(interval)``."""
    return psutil.cpu_percent(interval)


def getMemorystate():
    """Return the ``percent`` field of ``psutil.virtual_memory()``."""
    return psutil.virtual_memory().percent


def _sample_series(read_value, count):
    """Collect ``count`` samples as ``[timestamp_ms, value]`` pairs.

    ``read_value`` is a zero-argument callable returning a number. The
    timestamp is taken immediately before each reading and the value is
    rounded to two decimals.
    """
    samples = []
    for _ in range(count):
        stamp_ms = int(round(time.time() * 1000))  # millisecond timestamp
        samples.append([stamp_ms, float('%.2f' % read_value())])
    return samples


def _json_response(payload):
    """Serialise ``payload`` to JSON and wrap it in an ``HttpResponse``."""
    return HttpResponse(json.dumps(payload), content_type="application/json")


def server_cpu_json(request):
    """Return two fresh CPU samples as JSON; an empty list for non-GET requests."""
    data = _sample_series(getCPUstate, 2) if request.method == "GET" else []
    return _json_response(data)


def server_mem_json(request):
    """Return two fresh memory samples as JSON; an empty list for non-GET requests."""
    data = _sample_series(getMemorystate, 2) if request.method == "GET" else []
    return _json_response(data)


def server_cpu_pred_json(request):
    """Sample the CPU 30 times, fit a small MLP and return its fitted values.

    The response is the sampled ``[timestamp_ms, cpu]`` series in which the
    first ``len(predictions)`` CPU values are overwritten by the model's
    predictions on the training windows, expressed in the same unit as the
    samples.
    """
    data = _sample_series(getCPUstate, 30)

    # Keep only the CPU column, as a (n_samples, 1) float32 array.
    dataset = np.array([[cpu] for _, cpu in data]).astype('float32')

    # Normalise to [0, 1]; this scaler is reused below to undo the scaling.
    scaler = MinMaxScaler(feature_range=(0, 1))
    dataset = scaler.fit_transform(dataset)

    # The model is fitted on the first 67% of the series only.
    train_size = int(len(dataset) * 0.67)
    train = dataset[0:train_size, :]

    look_back = 3
    # NOTE(review): create_dataset is not defined in the visible part of this
    # module; presumably it builds (window, next value) pairs - confirm.
    trainX, trainY = create_dataset(train, look_back)

    # Create and fit the multilayer perceptron.
    model = Sequential()
    model.add(Dense(12, input_dim=look_back, activation='relu'))
    model.add(Dense(8, activation='relu'))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(trainX, trainY, epochs=100, batch_size=2, verbose=2)

    # Map predictions back to CPU percent with the scaler fitted on the input
    # data. Fitting a new scaler on the predictions and inverting it straight
    # away would be a no-op and leave the values in the normalised range.
    predictions = scaler.inverse_transform(model.predict(trainX)).tolist()

    for item, predicted in zip(data, predictions):
        item[1] = predicted[0]
    return _json_response(data)


def chart_json(request):
    """Return every stored monitoring record as ``[time_stamp, cpu]`` JSON pairs."""
    # NOTE(review): the import of System_Monit is commented out at the top of
    # this module, so this name is unresolved unless it is provided elsewhere.
    # The x value is the timestamp and the y value the CPU usage; both are
    # converted explicitly so the chart receives numeric data.
    data = [
        [int(record.time_stamp), float('%.2f' % record.cpu)]
        for record in System_Monit.objects.all()
    ]
    return _json_response(data)

Full Screen

Full Screen

Pytest Tutorial

Looking for an in-depth pytest tutorial? LambdaTest offers a detailed pytest tutorial that covers everything from setting up the pytest framework to automation testing. Delve deeper into pytest testing by exploring advanced use cases like parallel testing, pytest fixtures, parameterization, executing multiple test cases from a single file, and more.

Chapters

  1. What is pytest
  2. Pytest installation: Want to start pytest from scratch? See how to install and configure pytest for Python automation testing.
  3. Run first test with pytest framework: Follow this step-by-step tutorial to write and run your first pytest script.
  4. Parallel testing with pytest: A hands-on guide to parallel testing with pytest to improve the scalability of your test automation.
  5. Generate pytest reports: Reports make it easier to understand the results of pytest-based test runs. Learn how to generate pytest reports.
  6. Pytest Parameterized tests: Create and run your pytest scripts while avoiding code duplication and increasing test coverage with parameterization.
  7. Pytest Fixtures: Check out how to implement pytest fixtures for your end-to-end testing needs.
  8. Execute Multiple Test Cases: Explore different scenarios for running multiple test cases in pytest from a single file.
  9. Stop Test Suite after N Test Failures: See how to stop your test suite after n test failures in pytest using the @pytest.mark.incremental decorator and maxfail command-line option.

YouTube

Skim our pytest tutorial playlist below to get started with automation testing using the pytest framework.

https://www.youtube.com/playlist?list=PLZMWkkQEwOPlcGgDmHl8KkXKeLF83XlrP

Run Pytest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful