How to use mark_name method in Slash

Best Python code snippet using slash

query.py

Source:query.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2# 查询类模型3#4import copy5import json6import re7import time8from collections import OrderedDict9from django.db import models10from analysis.apps import AnalysisConfig11from framework.models import BaseModel12from framework.translation import _13from framework.utils import import_func, ObjectDict, trace_msg14from framework.utils.cache import CacheAttribute15from framework.utils.myenum import Enum16from log_def.models import DictDefine, LogDefine17class SqlMarkConfig(ObjectDict):18 mark_name: str19 name: str20 alias: str21 multiple: bool22 single: bool # 是否单选23 fixed: bool # 是否定值24 template: bool # 模板25 context_func: lambda x: x # 模板上下文26 order_num: int27 params_func: lambda x: x28class SqlMarkManager(object):29 mark_map = OrderedDict({30 # "game_alias" : {"name" : "game_alias", "multiple": False,31 # 'template' : 'sdk_center/widgets/game_alias_select2.html',32 # 'context_func': 'sdk_center.views.widgets.get_game_alias_dict'}, # 游戏代号33 # "game_aliass" : {"name" : "game_alias", "multiple": True,34 # 'template' : 'sdk_center/widgets/game_alias_dialog.html',35 # 'context_func': 'sdk_center.views.widgets.get_game_alias_dict'}, # 多个游戏代号36 #37 "server_id" : {"name" : "server_id", "multiple": False,38 'template' : 'analysis/widgets/query_server_select.html',39 'context_func': 'analysis.views.widgets.get_query_servers'}, # 服务器ID替换40 # 'context_func': 'game_manage.views.widgets.get_group_servers_dict'}, # 服务器ID替换41 # "server_ids" : {"name" : "server_id", "multiple": True,42 # 'template' : 'game_manage/widgets/group_server_dialog.html',43 # 'context_func': 'game_manage.views.widgets.get_group_servers_dict'}, # 多个服务器ID替换44 "server_name" : {"name": "server_name", "multiple": False, "alias": '服务器名'}, # 服务器名45 "master_id" : {"name": "master_id", "multiple": False, "alias": '母服ID'}, # 母服ID46 "master_db" : {"name": "master_db", "multiple": False, "alias": '母服db名'}, # 母服db名47 "sdate" : {"name": "sdate", "alias": '开始时间'}, # 开始时间48 "edate" : {"name": "edate", "alias": '结束时间'}, # 结束时间49 # "channel" : {"name" : "channel", "multiple": False, 'type': 'str',50 # 'template' : 'sdk_center/widgets/agent_channel_alias_select.html',51 # 'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'}, # 渠道标识52 # "channels" : {"name" : "channel", "multiple": True, 'type': 'str',53 # 'template' : 'sdk_center/widgets/agent_channel_alias_dialog.html',54 # 'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'}, # 渠道标识列表55 # "first_channel" : {"name" : "channel", "multiple": True, 'type': 'str',56 # 'template' : 'sdk_center/widgets/agent_alias.html',57 # 'context_func': 'sdk_center.views.widgets.get_agent_dict'}, # 渠道标识列表58 # "plan_channel" : {"name" : "channel", "multiple": True, 'type': 'str',59 # 'template' : 'sdk_center/widgets/plan_agent_channel.html',60 # 'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'}, # 渠道标识61 # "package": {"name": "package", "multiple": True, 'type': 'str', "alias": '渠道标识'}, # 渠道标识62 # "media": {"name": "media", "multiple": True, 'type': 'str', "alias": '渠道标识'}, # 渠道标识63 #64 # "sdk_code": {"name": "sdk_code", "alias": '渠道标识'}, # 渠道代号65 # "channel_id" : {"name" : "channel_id", "multiple": False,66 # 'template' : 'game_manage/widgets/agent_channel_select.html',67 # 'context_func': 'game_manage.views.widgets.get_agent_channels_dict'}, # 渠道id68 # "channel_ids" : {"name" : "channel_id", "multiple": True,69 # 'template' : 'game_manage/widgets/agent_channel_dialog.html',70 # 'context_func': 'game_manage.views.widgets.get_agent_channels_dict'}, # 渠道id71 # "kouliang_rate": {"name": "kouliang_rate"},72 # "min_kouliang_num": {"name": "min_kouliang_num"},73 # "game_server_id" : {"name" : "game_server_id", "multiple": False,74 # 'template': 'sdk_center/widgets/game_server_alias.html'}, # 区服ID替换75 # "game_server_ids" : {"name" : "game_server_id", "multiple": True,76 # 'template': 'sdk_center/widgets/game_server_alias.html'}, # 多个区服ID替换77 "agent_name" : {"name": "agent_name", "alias": '平台名'}, # 平台名78 'platform_id' : {"name": "platform_id", "multiple": False, "alias": '游戏平id'}, # 游戏平台79 'platform_ids': {"name": "platform_id", "multiple": True, "alias": '平台IDS'},80 'user_id' : {"name" : "user_id", "multiple": False, "alias": '管理员id',81 'params_func': lambda request: [request.user.id]}, # 管理员id82 'admin_id' : {"name" : "admin_id", "multiple": False, "alias": '管理员id',83 'params_func': lambda request: [request.user.id]}, # 管理员id84 'is_root' : {"name" : "is_root", "multiple": False, "alias": '是否超级管理员',85 'params_func': lambda request: [1 if request.user.is_root else 0]}, # 是否超级管理员86 'is_manager' : {"name" : "is_manager", "multiple": False, "alias": '是否管理员',87 'params_func': lambda request: [1 if request.user.is_manager else 0]} # 是否管理员88 })89 @classmethod90 def register_mark(cls, sql_mark_config):91 cls.mark_map[sql_mark_config.mark_name] = sql_mark_config92class SqlBuilder(object):93 """sql94 @param TAG_FORMAT: 默认的标签外围95 @param params: 字典 一键一列表 例如reuqest.POST96 """97 TAG_FORMAT = '{{%s}}'98 def __init__(self, source_sql, params):99 """初始化100 @param source_sql 原SQL101 """102 self.sql = source_sql103 self.query_sql = source_sql.strip()104 self.params = params105 self.order_str = ''106 self.limit_str = ''107 self.mark_map = copy.copy(SqlMarkManager.mark_map)108 def query_sql_handle(self, sql=''):109 """查询sql转换110 """111 values_list = []112 for mark_name, config in self.mark_map.items():113 if self.has_mark(mark_name): # sql存在这个标签114 param_name = config['name']115 value = self.get_param_value(param_name, config)116 if value or value == 0 or config.get('single', False) or config.get('fixed',117 False): # 先把没值的标记替换,单选和开关可以为空值118 values_list.append((mark_name, value))119 else:120 self.empty_param_handle(mark_name)121 if values_list:122 for mark_name, value in values_list:123 self.replace_mark_to_value(mark_name, value)124 return self.query_sql125 def convert_datatable_name(self):126 datatable_tags = re.findall(r'${([^\s,]*)}', self.query_sql)127 if datatable_tags: # 包含${模型名}标签128 for data_model_name in set(datatable_tags):129 model_class = __import__(data_model_name)130 self.query_sql = self.query_sql.replace('${%s}' % data_model_name, model_class.get_table_name())131 return self.query_sql132 def get_param_value(self, param_name, config):133 """获取参数值134 """135 values = self.params.get(param_name, []) or ['']136 the_value = str(values[0])137 if the_value != '': # 参数不为空138 if not config.get('multiple', False): # 不是多选139 values = values[0]140 else:141 the_sp = r',|\s'142 if re.search(the_sp, the_value):143 values = re.split(the_sp, the_value)144 dict_key = config.get('dict', '') # 输入值转换,支持中文转成id145 if dict_key:146 if not config.get('fixed', False):147 value_def = DictDefine.get_dict_for_key(dict_key, reverse=True)148 values = self.convert_input_value(values, value_def, config.get('type'))149 else: # 固定值150 values = dict_key151 else:152 values = self.convert_input_value(values, {}, config.get('type'))153 else:154 values = config.get('default_value', '')155 return values156 def convert_input_value(self, values, value_def, value_type):157 """转换输入158 """159 if isinstance(values, (list, tuple, set)):160 _r = []161 for v in values:162 value = str(value_def.get(v, value_def.get(str(v), v)))163 # 防止 sql 注入164 value = self.get_safe_value(value)165 if not value.isdigit() or value_type == 'str':166 value = "'%s'" % value167 _r.append(value)168 return ','.join(_r)169 else:170 return value_def.get(values, values)171 def empty_param_handle(self, mark_name, value=' 0=0 '):172 """传入参数为空时,把 SQL的条件 例如:log_user={{player_id}} 换换成 0=0173 """174 the_mark = self.make_mark(mark_name)175 # pattern = re.compile(r"""[\w\.`(+=*]*[\W`]?(not[\s]+)?(like|in|=|=>|<=|>|<|!=)[\W]*%s[^\s]*[\s$]*""" % the_mark,flags=re.I)176 pattern = re.compile(r"""[\S]*[\W`]?([\s]+not[\s]+)?(like|in|=|=>|<=|>|<|!=)[\W]*%s[^\s]*[\s$]*""" % the_mark,177 flags=re.I)178 _s = int(time.time())179 self.query_sql = re.sub(pattern, value, self.query_sql)180 def set_limit(self, page_size, page_num):181 """设置sql limit182 """183 page_num, page_size = int(page_num), int(page_size)184 self.limit_str = 'limit %s,%s' % ((page_num - 1) * page_size, page_size)185 def set_order(self, sort_key, sort_type='ASC'):186 if sort_key and sort_type.lower() in ('asc', 'desc'):187 if 'order' in self.query_sql.strip()[-30:].lower():188 self.order_str = ' ,%s %s' % (sort_key, sort_type)189 else:190 self.order_str = ' ORDER BY %s %s' % (sort_key, sort_type)191 safe_pattern = re.compile(r"""'|"|;|>|<|%|--""", flags=re.I)192 def get_safe_value(self, value):193 value = re.sub(self.safe_pattern, '', str(value))194 return value195 def replace_mark_to_value(self, mark_name, value):196 self.query_sql = self.query_sql.replace(self.make_mark(mark_name), value)197 def make_mark(self, mark_name):198 return self.TAG_FORMAT % mark_name199 def get_query_sql(self):200 return '%s %s %s' % (Query.filter_sql(self.query_sql), self.order_str, self.limit_str)201 def get_count_sql(self):202 return 'select count(0) from (%s) newTable' % Query.filter_sql(self.query_sql)203 def has_mark(self, mark_name):204 return self.make_mark(mark_name) in self.query_sql205 def generate_mark_conditions(self):206 mark_conditions = {}207 for k, v in self.mark_map.items():208 mark_conditions[k] = v if self.has_mark(k) else False209 return mark_conditions210 def get_context_func(self, mark_name):211 func = self.mark_map.get(mark_name,{}).get('context_func', None)212 if isinstance(func, str):213 return import_func(func)214 return func215 def get_param_func(self, mark_name):216 func = self.mark_map.get(mark_name,{}).get('params_func', None)217 if isinstance(func, str):218 return import_func(func)219 return func220 def set_mark_parmas(self, request):221 for k, item in self.mark_map.items():222 param_func = self.get_param_func(k)223 if param_func:224 self.params[k] = param_func(request)225class QueryCompiler(SqlBuilder):226 def __init__(self, query_model, params):227 self.query = query_model228 self.already_get_query_sql = False229 the_sql = self.query.sql or self.query.get_default_sql()230 super(QueryCompiler, self).__init__(the_sql, params)231 self.mark_map.update(self.query.field_config)232 def get_tfoot_sql(self):233 """获取tfoot的汇总sql234 """235 assert self.already_get_query_sql, '只能先使用get_query_sql 获取查询SQL'236 tmp_sql = self.query_sql237 ftoot_sql = self.query.other_sql238 if ftoot_sql:239 self.query_sql = ftoot_sql240 self.query_sql_handle()241 ftoot_sql = self.query_sql242 self.query_sql = tmp_sql243 return ftoot_sql244 def get_query_sql(self):245 self.already_get_query_sql = True246 if self.query.order and not self.order_str: # 如果没有排序设置默认查询排序247 self.set_order(self.query.order, self.query.order_type_str)248 return super(QueryCompiler, self).get_query_sql()249 def get_statistic(self):250 """获取所需的统计251 """252 statistic_tags = re.findall(r'<<([^\s,]*)>>', self.query.sql)253 statistic_tags.extend(re.findall(r'<<([^\s,]*)>>', self.query.other_sql))254 result = []255 if statistic_tags: # 包含<<统计名>>标签256 statistic_names_d = DictDefine.get_dict_for_key('statistic_name', reverse=True)257 for s_name in set(statistic_tags):258 s_id = statistic_names_d.get(re.sub(r'\s', '', s_name)) # 去除空格等特殊字符259 if s_id:260 result.append((s_id, s_name))261 return result262 def query_sql_handle(self):263 """ 增加获取统计id 使用方法 <<统计名>>264 """265 record = super(QueryCompiler, self).query_sql_handle()266 if isinstance(self.query_sql, bytes):267 self.query_sql = self.query_sql.decode('utf8')268 statistic_tags = re.findall(r'<<([^\s,]*)>>', self.query_sql)269 from .statistic import Statistic270 if statistic_tags: # 包含<<统计名>>标签271 statistic_names_d = dict(Statistic.objects.values_list('name',272 'id')) # ,DictDefine.get_dict_for_key('statistic_name', reverse=True)273 for s_name in set(statistic_tags):274 s_id = statistic_names_d.get(re.sub(r'\s', '', s_name)) # 去除空格等特殊字符275 if s_id:276 self.query_sql = self.query_sql.replace('<<%s>>' % s_name, '"%s"' % s_id)277 # 限制使用的统计278 if self.has_mark("statistic"):279 statistic_list = self.get_statistic()280 self.replace_mark_to_value("statistic", ",".join([s_id for s_id, _ in statistic_list]))281 return self.query_sql282 def has_mark(self, mark_name):283 return super(QueryCompiler, self).has_mark(mark_name)284 def has_conditions(self):285 for k, v in self.query.field_config.items():286 if v.get('search', ''):287 return True288 def get_conn(self, server_id=0):289 from .query_server import QueryServer290 return QueryServer.get_conn(server_id, 'read')291class Query(BaseModel):292 """查询293 """294 log_key = models.CharField('关联表标识', max_length=30, db_index=True, null=False)295 log_type = models.IntegerField(default=0)296 class OrderType(Enum):297 DESC = 1, _('倒序')298 AESC = 0, _('正序')299 key = models.CharField('查询标识', default='', max_length=100, null=False, blank=True, db_index=True)300 name = models.CharField('查询名称', default='', max_length=100, null=False, blank=False, unique=True)301 select = models.CharField(_('查询字段'), max_length=1000, null=False, blank=True)302 where = models.CharField(max_length=500, null=False, blank=True)303 group = models.CharField('用途分组', max_length=50, null=False, blank=True, default='')304 order = models.CharField(max_length=20, null=False, blank=True, default='')305 order_type = models.IntegerField(default=OrderType.AESC, choices=OrderType.member_list(), null=False)306 sql = models.TextField(_('SQL'), default='', null=False, blank=True)307 other_sql = models.TextField(_('其他SQL'), default='', null=False, blank=True)308 cache_validate = models.IntegerField(default=0, null=True)309 is_paging = models.BooleanField(_('是否分页'), default=False, null=False)310 remark = models.CharField('备注', max_length=1000, blank=True)311 _field_config = models.TextField('查询字段定义', default="", blank=True)312 template_name = models.CharField('模版名', max_length=32, blank=True)313 _DEFAULT_FIELD_CONFIG = {314 "标记名": {"name" : "参数名", "dict": "", "sort": False, "order_num": 99, "multiple": False,315 "search": False, "merge_value": True}}316 __cache_config = None317 class Meta:318 app_label = AnalysisConfig.name319 ordering = ('id',)320 @CacheAttribute321 def log_def_list(self):322 return LogDefine.objects.using('read').all()323 def save(self, *args, **kwargs):324 super(Query, self).save(*args, **kwargs)325 def get_default_sql(self):326 """ 默认SQL327 """328 log_def = self.log_def329 the_sql = """SELECT %s FROM %s WHERE log_time BETWEEN '{{sdate}}' AND '{{edate}}' """ % (330 self.select, log_def.table_name)331 if self.where:332 the_sql += ' AND %s' % self.where333 if self.order:334 the_sql += ' ORDER BY %s %s' % (self.order, self.order_type_str)335 for field_name, config in log_def.config.items():336 verbose_name = config['verbose_name']337 if verbose_name:338 the_sql = the_sql.replace(verbose_name, field_name)339 return the_sql340 @property341 def order_type_str(self):342 return 'DESC' if self.order_type == 1 else 'AESC'343 @CacheAttribute344 def log_def(self):345 return LogDefine.objects.filter(key=self.log_key).first()346 @property347 def is_center_query(self):348 """是否中央查询349 """350 try:351 return self.log_def.status == LogDefine.PositionType.CENTER352 except:353 return True354 @CacheAttribute355 def selects(self):356 return [f.strip() for f in self.select.split(',') if f]357 @property358 def field_config(self):359 """字段定义360 """361 try:362 s_d = OrderedDict()363 self.__cache_config = self.__cache_config or OrderedDict(364 sorted(json.loads(self._field_config).items(), key=lambda x: x[1]['order_num']))365 except:366 print(trace_msg())367 self.__cache_config = self._DEFAULT_FIELD_CONFIG368 return self.__cache_config369 @field_config.setter370 def field_config(self, obj_value):371 if isinstance(obj_value, dict):372 obj_value = json.dumps(obj_value, ensure_ascii=False)373 self._field_config = obj_value374 @classmethod375 def filter_sql(cls, sql):376 p = re.compile('(update|delete|modify|lock[\s]+|drop|table)', re.I)377 sql = p.sub('', sql)378 return sql379 @CacheAttribute380 def safe_sql(self):381 return self.__class__.filter_sql(self.sql)382 def __unicode__(self):383 return '%s' % self.name...

Full Screen

Full Screen

views.py

Source:views.py Github

copy

Full Screen

1import random2from io import BytesIO3import logging4from django.contrib.auth import authenticate, login as auth_login, logout as auth_logout5from django.contrib.auth.decorators import login_required6from django.core import signing7from django.http import HttpResponse, Http404, JsonResponse8from django.shortcuts import render, redirect, get_object_or_4049from django.shortcuts import render_to_response10from django.core.urlresolvers import reverse_lazy, reverse11from django.template.response import TemplateResponse12from django.utils import timezone13from django.utils.decorators import method_decorator14from django.views.decorators.cache import never_cache15from django.views.decorators.http import last_modified16from django.views.generic import FormView, TemplateView, DetailView17from goods.models import Goods18from .models import User, RecipientsAddress19from .form import UserModelRegisterForm, UserLoginForm20from PIL import Image, ImageDraw, ImageFont, ImageFilter21def create_captcha_code(request):22 '''23 生成 4个汉字验证码 ,需要用户按顺序点击其中的 3个24 保存到会话中 25 返回验证码图片'''26 # 随机的4个汉字:27 zh_words = ['牛', '国', '王', '有', '天', '好', '吉', '祥', '如', '意']28 zh_words = random.sample(zh_words, 4)29 # 画图30 image = Image.open(r'C:\Users\chaiming\Desktop\IMG_20170502_165107_344.jpg')31 # w, h = image.size32 # image.thumbnail((w // 2, h // 2))33 # 随机颜色34 def rnd_color():35 return (random.choice([0, 255]), 0, random.choice([0, 255]))36 # 创建Font对象: 大小为长宽中小的那个的8/137 width, height = image.size38 font_size = int(min(image.size)) // 839 font = ImageFont.truetype(r'F:\pycharm_projects\djcode\dailyfresh\user\fonts\MSYH.TTF', font_size)40 # 创建Draw对象:41 draw = ImageDraw.Draw(image)42 # 生成随机坐标43 x1 = random.randint(0, width - font_size)44 y1 = random.randint(0 + font_size, height - font_size)45 # 循环3次,把剩下的符合的坐标提取46 points_list = [(x1, y1)]47 for i in range(3):48 is_not_pass = True49 while is_not_pass:50 x2, y2 = random.randint(0, width - font_size), random.randint(0 + font_size, height - font_size)51 for x, y in points_list:52 if (x2 > x + font_size) or (x2 + font_size < x) or (y2 > y + font_size) or (y2 + font_size < y):53 pass54 else:55 is_not_pass = True56 break57 else:58 is_not_pass = False59 points_list.append((x2, y2))60 # 输出文字:61 for t in range(4):62 draw.text(points_list[t], zh_words[t], font=font,63 fill=rnd_color())64 z_p = zip(zh_words, points_list) # 把字和坐标组合65 sample = random.sample([i for i in z_p], 3)66 # 设置session 在后端验证67 captcha_dict = {68 'a1': sample[0][0], 'x1': sample[0][1][0], 'y1': sample[0][1][1],69 'a2': sample[1][0], 'x2': sample[1][1][0], 'y2': sample[1][1][1],70 'a3': sample[2][0], 'x3': sample[2][1][0], 'y3': sample[2][1][1],71 'size': font_size72 }73 # 模糊:74 # image = image.filter(ImageFilter.BLUR)75 # del draw76 # draw = ImageDraw.Draw(image)77 draw.text(78 (0, 0),79 '"{}", "{}", "{}"'.format(captcha_dict['a1'], captcha_dict['a2'], captcha_dict['a3']),80 font=font,81 fill=(0, 0, 0)82 )83 buff = BytesIO()84 image.save(buff, 'jpeg')85 response = HttpResponse(buff.getvalue(), 'image/jpeg')86 request.session['captcha'] = captcha_dict87 return response88class UserCreate(FormView):89 form_class = UserModelRegisterForm90 template_name = 'user/register.html'91 success_url = reverse_lazy('user:user_center_info') # 跳转到用户中心92 @method_decorator(never_cache)93 def dispatch(self, *args, **kwargs):94 return super(UserCreate, self).dispatch(*args, **kwargs)95 def form_valid(self, form):96 # 对验证码进行验证97 code = self.request.session.get('captcha')98 if code:99 ret = form.valid_captcha(code=code)100 else:101 logger = logging.getLogger('captcha')102 logger.warning('IP: {}, 主机: {}, 服务器认证后的用户: {} 客户端篡改Cookies'.format(103 self.request.META.get('REMOTE_ADDR'),104 self.request.META.get('REMOTE_HOST'),105 self.request.META.get('REMOTE_USER'),106 ))107 raise Http404108 if ret is False:109 return render(self.request, self.template_name, {'form': form})110 User.objects.create_user(111 form.cleaned_data.get('username'),112 form.cleaned_data.get('email'),113 form.cleaned_data.get('password'),114 )115 return super().form_valid(form) # 验证成功后调用, 默认返回成功的重定向116def username_is_repeat(request):117 '''ajax验证同名用户功能'''118 if User.objects.filter(username=request.GET.get('username')).exists():119 return JsonResponse({'is_repeat': 1})120 return JsonResponse({'is_repeat': 0})121class UserLogin(FormView):122 form_class = UserLoginForm123 template_name = 'user/login.html'124 success_url = reverse_lazy('user:user_center_info')125 def get_context_data(self, **kwargs):126 context = super().get_context_data(**kwargs)127 context['mark_name'] = self.request.get_signed_cookie('mark_name', '')128 return context129 def form_valid(self, form):130 user = authenticate(username=form.cleaned_data.get('username'), password=form.cleaned_data.get('password'))131 if user is not None:132 if user.is_active:133 # 用户登陆134 auth_login(self.request, user)135 # 重定向136 return super().form_valid(form)137 else:138 # 用户验证不成功139 form.add_error(None, '用户名或密码不正确')140 context = {}141 response = TemplateResponse(self.request, self.template_name, context)142 # print(response.context_data)143 # 如果勾选了记住用户名,则保存到Cookies中, key=mark_name, 如果未勾选,设置mark_name 为空144 username = self.request.POST.get('username')145 if self.request.POST.get('mark_name') == '1':146 response.set_signed_cookie('mark_name', username)147 else:148 response.delete_cookie('mark_name')149 username = ''150 context = {151 'form': form,152 'mark_name': username153 }154 response.context_data = context155 return response156 # 直接调用 self.form_invalid()157 # return self.form_invalid(form)158 def form_invalid(self, form):159 context = {}160 response = TemplateResponse(self.request, self.template_name, context)161 # 如果勾选了记住用户名,则保存到Cookies中, key=mark_name162 username = self.request.POST.get('username')163 if self.request.POST.get('mark_name') == '1':164 response.set_signed_cookie('mark_name', username)165 else:166 response.delete_cookie('mark_name')167 username = ''168 context = {169 'form': form,170 'mark_name': username171 }172 response.context_data = context173 return response174def login_datetime(request):175 return timezone.datetime(2018, 5, 26, 16, 36) # 登陆页面最后修改时间176@last_modified(login_datetime)177@never_cache178def login(request):179 template_name = 'user/login.html'180 success_url = reverse_lazy('user:user_center_info') # 跳转到用户中心181 if request.method == 'GET':182 return render(request, template_name, {'mark_name': request.get_signed_cookie('mark_name', '')})183 else:184 username = request.POST.get('username')185 form = UserLoginForm(request.POST)186 if form.is_valid():187 user = authenticate(username=form.cleaned_data['username'], password=form.cleaned_data['password'])188 if user is not None:189 if user.is_active:190 # 登陆191 auth_login(request, user)192 # 重定向, 获取装饰器login_required 的next参数,实现跳转到登陆之前的路径193 success_url = request.GET.get('next', success_url)194 response = redirect(success_url)195 # 记住用户名功能196 if request.POST.get('mark_name') == '1':197 response.set_signed_cookie('mark_name', username)198 else:199 response.delete_cookie('mark_name')200 return response201 else:202 form.add_error(None, '用户名或密码不正确')203 # 页面记住用户名功能,把用户名存在cookie里204 response = TemplateResponse(request, template_name, {})205 if request.POST.get('mark_name') == '1':206 response.set_signed_cookie('mark_name', username)207 else:208 response.delete_cookie('mark_name')209 username = ''210 response.context_data = {'form': form, 'mark_name': username}211 return response212'''213头部Last-Modified 格式化214Expires: Thu, 10 May 2018 10:17:09 GMT215response['Last-Modified'] = timezone.datetime(2018, 5, 8).strftime('%a, %d %b %Y %H:%M:%S GMT')216'''217def logout_view(request):218 '''登出视图'''219 auth_logout(request)220 # Redirect to a success page.221 next_path = request.GET.get('next')222 if next_path:223 return redirect('%s?next=%s' % (reverse('user:login'), next_path))224 else:225 return redirect(reverse('user:login'))226class UserDetail(TemplateView):227 template_name = 'user/user_center_info.html'228 @method_decorator(login_required)229 def dispatch(self, *args, **kwargs):230 return super().dispatch(*args, **kwargs)231 def get_context_data(self, **kwargs):232 context = super().get_context_data(**kwargs)233 signing_str = self.request.COOKIES.get('gid_list')234 if signing_str is not None:235 try:236 gid_list = signing.loads(signing_str)237 except signing.BadSignature as e:238 logger = logging.getLogger('django.cookies')239 logger.warning('cookies中获取gid错误:{}, USER[{}], REMOTE_ADDR[{}]'.format(240 e,241 self.request.user,242 self.request.META.get('REMOTE_ADDR'),243 ))244 gid_list = []245 else:246 gid_list = []247 recently_viewed = []248 if gid_list is not []:249 for gid in gid_list:250 try:251 goods = Goods.objects.get(pk=gid)252 except Goods.DoesNotExist:253 goods = None254 if goods is not None:255 recently_viewed.append(goods)256 context['recently_viewed'] = recently_viewed257 return context258 # def get(self, request, *args, **kwargs):259 # self.cookies = request.COOKIES260 # return super().get(request, *args, **kwargs)261@login_required()262def edit_recipients_address(request):263 if request.method == 'POST':264 recipients_address = RecipientsAddress(265 user=request.user,266 recipient=request.POST.get('recipient', ''),267 shipping_address=request.POST.get('shipping_address', ''),268 postcode=request.POST.get('postcode', ''),269 mobile_number=request.POST.get('mobile_number', ''),270 )271 recipients_address.save()272 return render(request, 'user/user_center_site.html')273# 删除用户关联地址274@login_required()275def delete_address(request, uid, recid):276 uid = int(uid)277 recid = int(recid)278 address = get_object_or_404(RecipientsAddress, pk=recid)279 if address.user_id != uid:280 logger = logging.getLogger('myproject.debug')281 logger.debug('用户id和捕获参数uid不一致!')282 logger.debug('{}[{}] != {}[{}]'.format(type(address.user_id), address.user_id, type(uid), uid))283 raise Http404284 address.delete()...

Full Screen

Full Screen

marked.py

Source:marked.py Github

copy

Full Screen

1from typing import List, Set, Union, Callable, NamedTuple2from copy import deepcopy3from collections import namedtuple4from beancount.core.data import Transaction, Posting, new_metadata, Entries5import beancount_plugin_utils.metaset as metaset6from beancount_plugin_utils.BeancountError import BeancountError, entry_error_handler7MARK_SEPERATOR = "-"8PluginUtilsMarkedError = namedtuple("PluginUtilsMarkedError", "source message entry")9def normalize_transaction(10 tx: Transaction,11 mark_name: str,12 account_types: Union[Set[str], bool] = False,13):14 """15 Move marks in tags with name `mark_name` into meta, if any, and merge with marks in meta in a way of metaset.16 Then, if `account_types` are provided, move marks into postings with the given account types or error if there's none such posting.17 Note: in beancount `meta` is mandatory on transactions, but optional on postings.18 Example:19 try:20 tx, is_marked = marked.normalize_transaction(config.mark_name, entry, ("Income", "Expenses"))21 except BeancountError as e:22 new_entries.append(entry)23 errors.append(e.to_named_tuple())24 continue25 for posting, orig_posting in zip(tx, orig_tx):26 marks = metaset.get(posting)27 # Do your thing.28 Args:29 txs [Transaction]: transaction instances.30 mark_name [str]: the mark name.31 account_types [Set[str], False]: set of account types that must be considered, defaults to False.32 Returns:33 new Transaction instance with normalized marks.34 boolean of whenever mark was used in this transaction.35 Raises:36 BeancountError.37 """38 copy = deepcopy(tx)39 for tag in copy.tags:40 if tag == mark_name or tag[0 : len(mark_name + MARK_SEPERATOR)] == mark_name + MARK_SEPERATOR:41 copy = copy._replace(42 tags=copy.tags.difference([tag]),43 meta=metaset.add(copy.meta, mark_name, tag[len(mark_name + MARK_SEPERATOR) :] or ""),44 )45 is_used = False46 if metaset.has(copy.meta, mark_name):47 is_used = True48 for posting in copy.postings:49 if posting.meta == None:50 continue51 if not metaset.has(posting.meta, mark_name):52 continue53 is_used = True54 if not account_types:55 raise BeancountError(56 posting.meta,57 'Mark "{}" can be only applied to transactions, not postings: "{}".'.format(mark_name, posting.account),58 tx,59 PluginUtilsMarkedError,60 )61 if not (posting.account.split(":")[0] in account_types):62 raise BeancountError(63 posting.meta,64 'Mark "{}" can be only applied to posting with account types of: {}'.format(mark_name, account_types),65 tx,66 PluginUtilsMarkedError,67 )68 if not account_types:69 return copy, is_used70 if not is_used:71 return copy, False72 is_applied = False73 postings = []74 default_marks = metaset.get(copy.meta, mark_name)75 copy = copy._replace(meta=metaset.clear(copy.meta, mark_name))76 for posting in copy.postings:77 marks = metaset.get(posting.meta, mark_name)78 if len(marks) > 0:79 postings.append(posting)80 is_applied = True81 elif len(default_marks) > 0 and (posting.account.split(":")[0] in account_types):82 postings.append(posting._replace(meta=metaset.set(posting.meta, mark_name, default_marks)))83 is_applied = True84 else:85 postings.append(posting)86 if not is_applied:87 raise BeancountError(88 tx.meta,89 'Mark "{}" on a transaction has no effect because transaction does not have postings with account types of: {}'.format(90 mark_name, account_types91 ),92 tx,93 PluginUtilsMarkedError,94 )95 copy = copy._replace(postings=postings)96 return copy, True97def on_marked_transactions(98 per_marked_transaction: Callable[[Transaction, Transaction, NamedTuple], List[Transaction]],99 entries: Entries,100 config: NamedTuple,101 mark_name: str,102 account_types: Union[Set[str], bool] = False,103 error_named_tuple: NamedTuple = BeancountError,104):105 new_entries: Entries = []106 errors: List[NamedTuple] = []107 for entry in entries:108 with entry_error_handler(entry, new_entries, errors, error_named_tuple):109 if not isinstance(entry, Transaction) or entry.flag == "P": # Ignore txs generated by padding too.110 new_entries.append(entry)111 continue112 tx, is_marked = normalize_transaction(entry, mark_name, account_types)113 if is_marked:114 new_txs = per_marked_transaction(tx, entry, config)115 new_entries.extend(new_txs)116 else:117 new_entries.append(entry)...

Full Screen

Full Screen

common.py

Source:common.py Github

copy

Full Screen

1from typing import List, Set, Union, Tuple2from copy import deepcopy3from beancount.core.data import Transaction, Posting4from beancount.core.inventory import Inventory5import beancount_plugin_utils.metaset as metaset6def read_config(config_string):7 """8 Args:9 config_string: A configuration string in JSON format given in source file.10 Returns:11 A dict of the configuration string.12 """13 if len(config_string) == 0:14 config_obj = {}15 else:16 config_obj = eval(config_string, {}, {})17 if not isinstance(config_obj, dict):18 raise RuntimeError("Invalid plugin configuration: should be a single dict.")19 return config_obj20MARK_SEPERATOR = "-"21def normalize_marked_txn(tx: Transaction, mark_name: str):22 """23 If a transaction is marked, hoist marked tags into transation meta(s).24 Args:25 txs [Transaction]: transaction instances.26 mark_name [str]: the mark.27 Return:28 Transaction with normalized marks.29 """30 copy = deepcopy(tx)31 for tag in copy.tags:32 if (33 tag == mark_name34 or tag[0 : len(mark_name + MARK_SEPERATOR)] == mark_name + MARK_SEPERATOR35 ):36 copy = copy._replace(37 tags=copy.tags.difference([tag]),38 meta=metaset.add(39 copy.meta, mark_name, tag[len(mark_name + MARK_SEPERATOR) :] or ""40 ),41 )42 return copy43DEFAULT_APPLICABLE_ACCOUNT_TYPES = set(44 ["Income", "Expenses", "Assets", "Liabilities", "Equity"]45)46def marked_postings(47 tx: Transaction,48 mark_name: str,49 applicable_account_types: Set[str] = DEFAULT_APPLICABLE_ACCOUNT_TYPES,50 allow_posting_level_mark: bool = True,51):52 """53 Iterates over postings of the transaction, returning most specific mark value for applicable account types.54 Args:55 tx [Transaction]: transaction instance.56 mark_name [str]: the mark.57 applicable_account_types [Set[str]]: set of account types that must be considered, defaults to all five.58 allow_posting_level_mark [bool]: set to False if posting-level marks should raise error instead.59 Yields:60 list of mark values or None.61 posting.62 original posting.63 original transaction.64 """65 copy = deepcopy(tx)66 default_marks = metaset.get(tx.meta, mark_name)67 copy = copy._replace(meta=metaset.clear(tx.meta, mark_name))68 for _posting in copy.postings:69 marks = metaset.get(_posting.meta, mark_name)70 posting = _posting._replace(meta=metaset.clear(_posting.meta, mark_name))71 if len(marks) > 0:72 yield marks, posting, _posting, copy73 elif len(default_marks) > 0:74 if posting.account.split(":")[0] not in applicable_account_types:75 yield None, posting, _posting, copy76 else:77 yield default_marks, posting, _posting, copy78 else:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful