How to use redis_prefix method in selenium-respectful

Best Python code snippet using selenium-respectful_python

__init__.py

Source:__init__.py Github

copy

Full Screen

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time

from flask.ext.sqlalchemy import SQLAlchemy

from .. import app, redis
from ..utils import init_wechat_sdk
from ..plugins.state import get_user_last_interact_time

db = SQLAlchemy(app)

# NOTE(review): the model imports come after `db` is created; presumably the
# model modules import `db` from this package -- confirm before reordering.
from .auth import Auth
from .express import Express
from .sign import Sign
from .user import User


def _fetch_wechat_user_info(openid):
    """Fetch a user's profile from the WeChat API.

    Returns the profile mapping, or None when the call fails or the response
    has no 'nickname' field. Failures are logged as warnings, not raised.
    """
    try:
        wechat = init_wechat_sdk()
        user_info = wechat.get_user_info(openid)
        if 'nickname' not in user_info:
            raise KeyError(user_info)
    except Exception as e:
        app.logger.warning(u"获取微信用户信息 API 出错: %s" % e)
        return None
    return user_info


def set_user_info(openid):
    """Save user info to the database and the redis cache.

    When the cache has no 'nickname' for this openid, the user is loaded from
    the database (or created from the WeChat API) and written to the cache.
    When the cache is present and the user's last interaction is more than
    24 hours old, the profile is re-fetched and both stores are refreshed.
    Always returns None.
    """
    redis_prefix = "wechat:user:"
    cache = redis.hexists(redis_prefix + openid, 'nickname')

    if not cache:
        user_info = User.query.filter_by(openid=openid).first()
        if not user_info:
            fetched = _fetch_wechat_user_info(openid)
            if fetched is None:
                user_info = None
            else:
                user = User(openid=fetched['openid'],
                            nickname=fetched['nickname'],
                            sex=fetched['sex'],
                            province=fetched['province'],
                            city=fetched['city'],
                            country=fetched['country'],
                            headimgurl=fetched['headimgurl'])
                user.save()
                # Same type as the query result, so the redis write below
                # can treat both cases alike.
                user_info = user

        if user_info:
            # Write to cache.
            redis.hmset(redis_prefix + user_info.openid, {
                "nickname": user_info.nickname,
                "realname": user_info.realname,
                "classname": user_info.classname,
                "sex": user_info.sex,
                "province": user_info.province,
                "city": user_info.city,
                "country": user_info.country,
                "headimgurl": user_info.headimgurl,
                "regtime": user_info.regtime
            })
    else:
        timeout = int(time.time()) - int(get_user_last_interact_time(openid))
        if timeout > 24 * 60 * 60:
            fetched = _fetch_wechat_user_info(openid)
            if fetched is not None:
                user = User.query.filter_by(openid=openid).first()
                # The cache can exist without a database row; skip the
                # database update then instead of failing on None.
                if user is not None:
                    user.nickname = fetched['nickname']
                    user.sex = fetched['sex']
                    user.province = fetched['province']
                    user.city = fetched['city']
                    user.country = fetched['country']
                    user.headimgurl = fetched['headimgurl']
                    user.update()

                redis.hmset(redis_prefix + openid, {
                    "nickname": fetched['nickname'],
                    "sex": fetched['sex'],
                    "province": fetched['province'],
                    "city": fetched['city'],
                    "country": fetched['country'],
                    "headimgurl": fetched['headimgurl']
                })
    return None


def is_user_exists(openid):
    """Return True if the user is in the redis cache or the database."""
    redis_prefix = "wechat:user:"
    if redis.exists(redis_prefix + openid):
        return True
    return bool(User.query.filter_by(openid=openid).first())


def get_sign_info(openid):
    """Read a user's sign-in info.

    Returns a dict with integer 'lastsigntime', 'keepdays' and 'totaldays';
    all three are 0 when the user has no sign-in record.
    """
    sign_info = Sign.query.filter_by(openid=openid).first()
    if not sign_info:
        return {
            "lastsigntime": 0,
            "keepdays": 0,
            "totaldays": 0
        }
    return {
        "lastsigntime": int(sign_info.lastsigntime),
        "keepdays": int(sign_info.keepdays),
        "totaldays": int(sign_info.totaldays)
    }


def update_sign_info(openid, lastsigntime, totaldays, keepdays):
    """Create or update a user's sign-in record in the database."""
    sign_info = Sign.query.filter_by(openid=openid).first()
    if not sign_info:
        sign_info = Sign(openid, lastsigntime, totaldays, keepdays)
        sign_info.save()
    else:
        sign_info.lastsigntime = lastsigntime
        sign_info.totaldays = totaldays
        sign_info.keepdays = keepdays
        sign_info.update()
    return None


def get_today_sign_ranklist(today_timestamp):
    """Get today's sign-in ranking.

    Returns sign-in rows (joined with the user's nickname) whose last sign-in
    time is at or after `today_timestamp`, ordered by sign-in time.
    """
    data = Sign.query.join(User, Sign.openid == User.openid) \
        .add_columns(User.nickname) \
        .filter(Sign.lastsigntime >= today_timestamp) \
        .order_by(Sign.lastsigntime).all()
    return data


def get_sign_keepdays_ranklist():
    """Get the consecutive sign-in ranking (top 6).

    Ordered by consecutive days descending, then total days descending, then
    last sign-in time.
    """
    data = Sign.query.join(User, Sign.openid == User.openid) \
        .add_columns(User.nickname) \
        .order_by(Sign.keepdays.desc(),
                  Sign.totaldays.desc(),
                  Sign.lastsigntime).limit(6).all()
    return data


def get_express_num(openid, num):
    """Read the express (parcel) record for this user and tracking number."""
    express_info = Express.query.filter_by(openid=openid, num=num).first()
    return express_info


def set_express_num(openid, num, com_code, lastupdate, ischeck):
    """Write express (parcel) info.

    Creates the record when missing; otherwise updates it only when
    `lastupdate` differs from the stored value.
    """
    express_info = get_express_num(openid, num)
    if not express_info:
        express = Express(openid=openid,
                          num=num,
                          comcode=com_code,
                          lastupdate=lastupdate,
                          ischeck=ischeck)
        express.save()
    elif express_info.lastupdate != lastupdate:
        express_info.lastupdate = lastupdate
        express_info.ischeck = ischeck
        express_info.update()


def get_all_uncheck_express():
    """Read all express records that are not yet signed for (ischeck != 3)."""
    express_info = Express.query.filter(Express.ischeck != 3).all()
    return express_info


def get_all_auth_info():
    """Read all authorized account records."""
    auth_info = Auth.query.all()
    return auth_info


def get_user_student_info(openid):
    """Read the bound academic-affairs system account.

    Returns the cached user hash when it already holds 'studentid' and
    'studentpwd'. Otherwise loads them from the database, writes them to the
    cache and returns the hash with both fields added. Returns False when no
    complete credentials are stored.
    """
    redis_prefix = "wechat:user:"
    user_info_cache = redis.hgetall(redis_prefix + openid)

    if 'studentid' in user_info_cache and 'studentpwd' in user_info_cache:
        return user_info_cache

    auth_info = Auth.query.filter_by(openid=openid).first()
    if auth_info and auth_info.studentid and auth_info.studentpwd:
        # Write to cache.
        redis.hmset(redis_prefix + openid, {
            "studentid": auth_info.studentid,
            "studentpwd": auth_info.studentpwd,
        })
        user_info_cache['studentid'] = auth_info.studentid
        user_info_cache['studentpwd'] = auth_info.studentpwd
        return user_info_cache
    return False


def get_user_library_info(openid):
    """Read the bound library account.

    Returns the cached user hash when it already holds 'libraryid' and
    'librarypwd'. Otherwise loads them from the database, writes them to the
    cache and returns the hash with both fields added. Returns False when no
    complete credentials are stored.
    """
    redis_prefix = "wechat:user:"
    user_info_cache = redis.hgetall(redis_prefix + openid)

    if 'libraryid' in user_info_cache and 'librarypwd' in user_info_cache:
        return user_info_cache

    auth_info = Auth.query.filter_by(openid=openid).first()
    if auth_info and auth_info.libraryid and auth_info.librarypwd:
        # Write to cache.
        redis.hmset(redis_prefix + openid, {
            "libraryid": auth_info.libraryid,
            "librarypwd": auth_info.librarypwd,
        })
        user_info_cache['libraryid'] = auth_info.libraryid
        user_info_cache['librarypwd'] = auth_info.librarypwd
        return user_info_cache
    return False


def set_user_student_info(openid, studentid, studentpwd):
    """Write the bound academic-affairs system account to database and cache."""
    redis_prefix = "wechat:user:"
    auth_info = Auth.query.filter_by(openid=openid).first()
    if not auth_info:
        auth = Auth(openid=openid,
                    studentid=studentid,
                    studentpwd=studentpwd)
        auth.save()
    else:
        auth_info.studentid = studentid
        auth_info.studentpwd = studentpwd
        auth_info.update()

    # Write to cache.
    redis.hmset(redis_prefix + openid, {
        "studentid": studentid,
        "studentpwd": studentpwd
    })


def set_user_library_info(openid, libraryid, librarypwd):
    """Write the bound library-card account to database and cache."""
    redis_prefix = "wechat:user:"
    auth_info = Auth.query.filter_by(openid=openid).first()
    if not auth_info:
        auth = Auth(openid=openid,
                    libraryid=libraryid,
                    librarypwd=librarypwd)
        auth.save()
    else:
        auth_info.libraryid = libraryid
        auth_info.librarypwd = librarypwd
        auth_info.update()

    # Write to cache.
    redis.hmset(redis_prefix + openid, {
        "libraryid": libraryid,
        "librarypwd": librarypwd
    })


def set_user_realname_and_classname(openid, realname, classname):
    """Write the user's real name and class.

    NOTE(review): the source listing is truncated inside this function. The
    nesting of the cache write and anything after it are reconstructed from
    the visible tokens only -- confirm against the original file.
    """
    redis_prefix = "wechat:user:"
    cache = redis.hgetall(redis_prefix + openid)
    realname_exists = redis.hexists(redis_prefix + openid, 'realname')

    # The cache stores a missing real name as the string 'None'.
    if not realname_exists or cache['realname'] == 'None':
        user_info = User.query.filter_by(openid=openid).first()
        if user_info and not user_info.realname:
            user_info.realname = realname
            user_info.classname = classname
            user_info.update()

            # Write to cache.
            redis.hmset(redis_prefix + openid, {
                "realname": realname,
                "classname": classname
            })
# (listing truncated here in the source page)

Full Screen

Full Screen

BatchGenerator.py

Source:BatchGenerator.py Github

copy

Full Screen

import numpy as np
import random as rd
import redis


class BatchGenerator():
    """Serve train/validation/test batches from sample sets stored in Redis.

    Samples live in Redis sets under keys prefixed with ``deepmm_``. Each
    dataset has an "available" set that batches are popped from and a "used"
    set that popped samples are moved to until the dataset is reset.
    """

    def __init__(self, batch_size, validation_frac=0.05, test_frac=0.05):
        """Connect to Redis on localhost:6379 (db 0) and split the data.

        batch_size: number of samples per batch (coerced to int).
        validation_frac, test_frac: fractions of the available samples moved
            to the validation and test sets.
        """
        self.redis_prefix = "deepmm_"
        self.batch_size = int(batch_size)
        self.r = redis.Redis(decode_responses=True,
                             host='localhost',
                             port=6379, db=0)
        self.redis_clean()
        self.redis_load_dimensions()
        self.redis_create_dictionary()
        self.redis_split_train_val_test(validation_frac, test_frac)

    def redis_load_dimensions(self):
        """Load the sample count and the T_x/T_y/n_x/n_y dimensions."""
        self.m = int(self.r.scard(self.redis_prefix + "available_data"))
        self.T_x = int(self.r.get(self.redis_prefix + "T_x"))
        self.T_y = int(self.r.get(self.redis_prefix + "T_y"))
        self.n_x = int(self.r.get(self.redis_prefix + "n_x"))
        self.n_y = int(self.r.get(self.redis_prefix + "n_y"))

    def encode_inst(self, inst):
        """Return the one-hot vector for token `inst` (KeyError if unknown)."""
        return self.dict[inst]

    def decode_inst(self, one_hot):
        """Return the token whose index is the argmax of `one_hot`."""
        try:
            return self.reverse_dict[np.argmax(one_hot)]
        except KeyError as e:
            raise KeyError("Key error: " + str(one_hot)) from e

    def redis_create_dictionary(self):
        """Build token <-> one-hot lookups from the "dict" sorted set.

        Also overwrites `n_y` with the vocabulary size.
        """
        vals = self.r.zrangebyscore(
            self.redis_prefix + "dict",
            "-inf", "+inf")
        idm = np.identity(len(vals))
        self.dict = {}
        self.reverse_dict = {}
        for i, val in enumerate(vals):
            self.dict[val] = idm[i]
            self.reverse_dict[i] = val
        self.n_y = len(vals)

    def redis_split_train_val_test(self, validation_frac, test_frac):
        """Move random samples into validation/test sets; the rest is train."""
        available_key = self.redis_prefix + "available_data"
        train_key = self.redis_prefix + "available_train_data"
        validation_key = self.redis_prefix + "available_validation_data"
        test_key = self.redis_prefix + "available_test_data"

        validation = self.r.spop(available_key,
                                 int(self.m * validation_frac))
        # SADD with no members is rejected by Redis, so skip empty splits.
        if validation:
            self.r.sadd(validation_key, *validation)
        test = self.r.spop(available_key, int(self.m * test_frac))
        if test:
            self.r.sadd(test_key, *test)
        # RENAME fails on a missing key, and an emptied set no longer exists.
        if self.r.exists(available_key):
            self.r.rename(available_key, train_key)

        self.m_train = int(self.r.scard(train_key))
        self.m_validation = int(self.r.scard(validation_key))
        self.m_test = int(self.r.scard(test_key))

    def redis_reset_dataset(self, dataset):
        """Return every used sample of `dataset` to its available set."""
        used_key = self.redis_prefix + "used_" + dataset + "_data"
        available_key = self.redis_prefix + "available_" + dataset + "_data"
        self.r.sunionstore(available_key, available_key, used_key)
        self.r.delete(used_key)

    def redis_clean(self):
        """Merge all per-dataset sets back into "available_data"."""
        keys = [self.redis_prefix + "available_data",
                self.redis_prefix + "available_train_data",
                self.redis_prefix + "used_train_data",
                self.redis_prefix + "available_validation_data",
                self.redis_prefix + "used_validation_data",
                self.redis_prefix + "available_test_data",
                self.redis_prefix + "used_test_data"]
        self.r.sunionstore(self.redis_prefix + "available_data", *keys)
        for k in keys[1:]:
            self.r.delete(k)

    def prepare_batch(self, dataset):
        """Pop up to `batch_size` samples of `dataset` and build arrays.

        Each stored sample is split on "-" into an input part and an output
        part, and each part is split on ";" into time steps. Every character
        of an input step becomes one float feature; every output step is
        one-hot encoded via `encode_inst`.

        Returns ([X, X_decoder], Y), where Y is X_decoder shifted one step
        left along the time axis with the "<END>" one-hot in the last step.
        Rows beyond the number of popped samples stay zero in X and X_decoder.

        Raises ValueError when `dataset` is not train, validation or test.
        """
        if dataset not in ["train", "validation", "test"]:
            raise ValueError("`dataset` should be train, validation or test")

        redis_key_available = self.redis_prefix + "available_" + dataset + "_data"
        redis_key_used = self.redis_prefix + "used_" + dataset + "_data"
        vals = self.r.spop(redis_key_available, self.batch_size)
        XY_str = [[vv.split(";") for vv in v.split("-")] for v in vals]

        X = np.zeros((self.batch_size, self.T_x, self.n_x), dtype=np.float32)
        X_decoder = np.zeros((self.batch_size, self.T_y, self.n_y), dtype=np.float32)
        for seq_idx, seq in enumerate(XY_str):
            for x_idx, x_step in enumerate(seq[0]):
                X[seq_idx, x_idx] = np.array(list(x_step), dtype=np.float32)
            for y_idx, y_step in enumerate(seq[1]):
                X_decoder[seq_idx, y_idx] = self.encode_inst(y_step)

        # SADD with no members is rejected by Redis (exhausted dataset).
        if vals:
            self.r.sadd(redis_key_used, *vals)

        Y = np.roll(X_decoder, -1, 1)
        Y[:, -1, :] = self.encode_inst("<END>")
        return [X, X_decoder], Y

    def generator_train(self):
        """Yield training batches forever, resetting the set after each pass.

        Raises ValueError when fewer than one full batch of training samples
        exists; otherwise the loop would spin forever without yielding.
        """
        steps = self.m_train // self.batch_size
        if steps == 0:
            raise ValueError("not enough train data for one batch")
        while True:
            for _ in range(steps):
                yield self.prepare_batch("train")
            self.redis_reset_dataset("train")

    def generator_validation(self):
        """Yield validation batches forever, resetting after each pass.

        Raises ValueError when fewer than one full batch of validation
        samples exists; otherwise the loop would spin forever without
        yielding.
        """
        steps = self.m_validation // self.batch_size
        if steps == 0:
            raise ValueError("not enough validation data for one batch")
        while True:
            for _ in range(steps):
                yield self.prepare_batch("validation")
            self.redis_reset_dataset("validation")

    def generator_test(self):
        """Yield one pass of test batches."""
        for _ in range(self.m_test // self.batch_size):
            yield self.prepare_batch("test")
# (listing truncated here in the source page)

Full Screen

Full Screen

token_bb_redis.py

Source:token_bb_redis.py Github

copy

Full Screen

from typing import List, Optional

from app.db.repositories.base_redis import BaseRedisRepository
from aioredis import Redis
from app.schemas.token_bb import TokenBB
from app.core.config import REDIS_PREFIX


class TokenBBRedisRepository(BaseRedisRepository):
    """Store TokenBB objects as Redis hashes keyed ``{REDIS_PREFIX}:{id}``."""

    def __init__(self, redis: Redis) -> None:
        super().__init__(redis)

    async def set_token(self, *, token: TokenBB, expires_in: int):
        """
        Set token hash fields to multiple values, then set the key's expiry.

        :param token: token to store; its ``dict()`` becomes the hash fields
        :param expires_in: time to live, in seconds
        """
        await self._redis.hmset(f"{REDIS_PREFIX}:{token.id}", token.dict())
        await self._redis.expire(f"{REDIS_PREFIX}:{token.id}", expires_in)  # need seconds

    async def len(self, key: str):
        """
        Get the number of fields in a given hash.

        :param key: key suffix (without the prefix)
        :return: int:
        """
        return await self._redis.hlen(f"{REDIS_PREFIX}:{key}")

    async def get_all(self, key: str):
        """
        Get all the fields and values in a hash.

        :param key: key suffix (without the prefix)
        :return: dict:
        """
        return await self._redis.hgetall(f"{REDIS_PREFIX}:{key}")

    async def get_token_by_id(self, id: str) -> Optional[TokenBB]:
        """
        Load the token stored under ``id``.

        :param id: token id
        :return: the token, or None when the hash is empty or missing
        """
        data = await self._redis.hgetall(f"{REDIS_PREFIX}:{id}")
        if not data:
            return None
        token = TokenBB(**data)
        return token

    async def remove_token_by_key(self, key: str):
        """Delete the hash stored under ``key``."""
        await self._redis.delete(f"{REDIS_PREFIX}:{key}")

    async def expire_token_by_key(self, key: str, seconds: int = 60):  # 1min
        """Set the hash under ``key`` to expire in ``seconds`` seconds."""
        await self._redis.expire(f"{REDIS_PREFIX}:{key}", seconds)

    async def remove_tokens_by_id(self, id: str):
        """Remove every token returned by ``get_tokens_by_id``."""
        # NOTE(review): `get_tokens_by_id` is not defined in the visible part
        # of this listing; presumably it lives in the truncated remainder or
        # in the base class -- confirm.
        tokens: List[TokenBB] = await self.get_tokens_by_id(id=id)
        for token in tokens:
            await self.remove_token_by_key(token.id)

    async def expire_tokens_by_id(self, id: str):
        # NOTE(review): the source listing is truncated inside this loop. The
        # body is left as a placeholder; presumably it mirrors
        # `remove_tokens_by_id` by calling `expire_token_by_key` -- confirm.
        tokens: List[TokenBB] = await self.get_tokens_by_id(id=id)
        for token in tokens:
            ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run selenium-respectful automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful