Best Python code snippet using slash
query.py
Source:query.py  
1# -*- coding: utf-8 -*-2# æ¥è¯¢ç±»æ¨¡å3#4import copy5import json6import re7import time8from collections import OrderedDict9from django.db import models10from analysis.apps import AnalysisConfig11from framework.models import BaseModel12from framework.translation import _13from framework.utils import import_func, ObjectDict, trace_msg14from framework.utils.cache import CacheAttribute15from framework.utils.myenum import Enum16from log_def.models import DictDefine, LogDefine17class SqlMarkConfig(ObjectDict):18    mark_name: str19    name: str20    alias: str21    multiple: bool22    single: bool  # æ¯å¦åé23    fixed: bool  # æ¯å¦å®å¼24    template: bool  # 模æ¿25    context_func: lambda x: x  # 模æ¿ä¸ä¸æ26    order_num: int27    params_func: lambda x: x28class SqlMarkManager(object):29    mark_map = OrderedDict({30            # "game_alias"      : {"name"        : "game_alias", "multiple": False,31            #                      'template'    : 'sdk_center/widgets/game_alias_select2.html',32            #                      'context_func': 'sdk_center.views.widgets.get_game_alias_dict'},  # 游æä»£å·33            # "game_aliass"     : {"name"        : "game_alias", "multiple": True,34            #                      'template'    : 'sdk_center/widgets/game_alias_dialog.html',35            #                      'context_func': 'sdk_center.views.widgets.get_game_alias_dict'},  # å¤ä¸ªæ¸¸æä»£å·36            #37            "server_id"   : {"name"        : "server_id", "multiple": False,38                            'template'    : 'analysis/widgets/query_server_select.html',39                            'context_func': 'analysis.views.widgets.get_query_servers'},  # æå¡å¨IDæ¿æ¢40            # 'context_func': 'game_manage.views.widgets.get_group_servers_dict'},  # æå¡å¨IDæ¿æ¢41            # "server_ids"      : {"name"        : "server_id", "multiple": True,42            #                      'template'    : 'game_manage/widgets/group_server_dialog.html',43            #                      'context_func': 'game_manage.views.widgets.get_group_servers_dict'},  # å¤ä¸ªæå¡å¨IDæ¿æ¢44            "server_name" : {"name": "server_name", "multiple": False, "alias": 'æå¡å¨å'},  # æå¡å¨å45            "master_id"   : {"name": "master_id", "multiple": False, "alias": 'æ¯æID'},  # æ¯æID46            "master_db"   : {"name": "master_db", "multiple": False, "alias": 'æ¯ædbå'},  # æ¯ædbå47            "sdate"       : {"name": "sdate", "alias": 'å¼å§æ¶é´'},  # å¼å§æ¶é´48            "edate"       : {"name": "edate", "alias": 'ç»ææ¶é´'},  # ç»ææ¶é´49            # "channel"         : {"name"        : "channel", "multiple": False, 'type': 'str',50            #                      'template'    : 'sdk_center/widgets/agent_channel_alias_select.html',51            #                      'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'},  # æ¸ éæ è¯52            # "channels"        : {"name"        : "channel", "multiple": True, 'type': 'str',53            #                      'template'    : 'sdk_center/widgets/agent_channel_alias_dialog.html',54            #                      'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'},  # æ¸ éæ è¯å表55            # "first_channel"   : {"name"        : "channel", "multiple": True, 'type': 'str',56            #                      'template'    : 'sdk_center/widgets/agent_alias.html',57            #                      'context_func': 'sdk_center.views.widgets.get_agent_dict'},  # æ¸ éæ è¯å表58            # "plan_channel"    : {"name"        : "channel", "multiple": True, 'type': 'str',59            #                      'template'    : 'sdk_center/widgets/plan_agent_channel.html',60            #                      'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'},  # æ¸ éæ è¯61            # "package": {"name": "package", "multiple": True, 'type': 'str', "alias": 'æ¸ éæ è¯'},  # æ¸ éæ è¯62            # "media": {"name": "media", "multiple": True, 'type': 'str', "alias": 'æ¸ éæ è¯'},  # æ¸ éæ è¯63            #64            # "sdk_code": {"name": "sdk_code", "alias": 'æ¸ éæ è¯'},  # 渠é代å·65            # "channel_id"      : {"name"        : "channel_id", "multiple": False,66            #                      'template'    : 'game_manage/widgets/agent_channel_select.html',67            #                      'context_func': 'game_manage.views.widgets.get_agent_channels_dict'},  # 渠éid68            # "channel_ids"     : {"name"        : "channel_id", "multiple": True,69            #                      'template'    : 'game_manage/widgets/agent_channel_dialog.html',70            #                      'context_func': 'game_manage.views.widgets.get_agent_channels_dict'},  # 渠éid71            # "kouliang_rate": {"name": "kouliang_rate"},72            # "min_kouliang_num": {"name": "min_kouliang_num"},73            # "game_server_id"  : {"name"    : "game_server_id", "multiple": False,74            #                      'template': 'sdk_center/widgets/game_server_alias.html'},  # åºæIDæ¿æ¢75            # "game_server_ids" : {"name"    : "game_server_id", "multiple": True,76            #                      'template': 'sdk_center/widgets/game_server_alias.html'},  # å¤ä¸ªåºæIDæ¿æ¢77            "agent_name"  : {"name": "agent_name", "alias": 'å¹³å°å'},  # å¹³å°å78            'platform_id' : {"name": "platform_id", "multiple": False, "alias": '游æå¹³id'},  # 游æå¹³å°79            'platform_ids': {"name": "platform_id", "multiple": True, "alias": 'å¹³å°IDS'},80            'user_id'     : {"name"       : "user_id", "multiple": False, "alias": '管çåid',81                             'params_func': lambda request: [request.user.id]},  # 管çåid82            'admin_id'    : {"name"       : "admin_id", "multiple": False, "alias": '管çåid',83                             'params_func': lambda request: [request.user.id]},  # 管çåid84            'is_root'     : {"name"       : "is_root", "multiple": False, "alias": 'æ¯å¦è¶
级管çå',85                             'params_func': lambda request: [1 if request.user.is_root else 0]},  # æ¯å¦è¶
级管çå86            'is_manager'  : {"name"       : "is_manager", "multiple": False, "alias": 'æ¯å¦ç®¡çå',87                             'params_func': lambda request: [1 if request.user.is_manager else 0]}  # æ¯å¦ç®¡çå88    })89    @classmethod90    def register_mark(cls, sql_mark_config):91        cls.mark_map[sql_mark_config.mark_name] = sql_mark_config92class SqlBuilder(object):93    """sql94    @param TAG_FORMAT: é»è®¤çæ ç¾å¤å´95    @param params:  åå
¸ ä¸é®ä¸å表 ä¾å¦reuqest.POST96    """97    TAG_FORMAT = '{{%s}}'98    def __init__(self, source_sql, params):99        """åå§å100        @param source_sql åSQL101        """102        self.sql = source_sql103        self.query_sql = source_sql.strip()104        self.params = params105        self.order_str = ''106        self.limit_str = ''107        self.mark_map = copy.copy(SqlMarkManager.mark_map)108    def query_sql_handle(self, sql=''):109        """æ¥è¯¢sql转æ¢110        """111        values_list = []112        for mark_name, config in self.mark_map.items():113            if self.has_mark(mark_name):  # sqlåå¨è¿ä¸ªæ ç¾114                param_name = config['name']115                value = self.get_param_value(param_name, config)116                if value or value == 0 or config.get('single', False) or config.get('fixed',117                                                                                    False):  # å
ææ²¡å¼çæ è®°æ¿æ¢,åéåå¼å
³å¯ä»¥ä¸ºç©ºå¼118                    values_list.append((mark_name, value))119                else:120                    self.empty_param_handle(mark_name)121        if values_list:122            for mark_name, value in values_list:123                self.replace_mark_to_value(mark_name, value)124        return self.query_sql125    def convert_datatable_name(self):126        datatable_tags = re.findall(r'${([^\s,]*)}', self.query_sql)127        if datatable_tags:  # å
å«${模åå}æ ç¾128            for data_model_name in set(datatable_tags):129                model_class = __import__(data_model_name)130                self.query_sql = self.query_sql.replace('${%s}' % data_model_name, model_class.get_table_name())131        return self.query_sql132    def get_param_value(self, param_name, config):133        """è·ååæ°å¼134        """135        values = self.params.get(param_name, []) or ['']136        the_value = str(values[0])137        if the_value != '':  # åæ°ä¸ä¸ºç©º138            if not config.get('multiple', False):  # 䏿¯å¤é139                values = values[0]140            else:141                the_sp = r',|\s'142                if re.search(the_sp, the_value):143                    values = re.split(the_sp, the_value)144            dict_key = config.get('dict', '')  # è¾å
¥å¼è½¬æ¢,æ¯æä¸æè½¬æid145            if dict_key:146                if not config.get('fixed', False):147                    value_def = DictDefine.get_dict_for_key(dict_key, reverse=True)148                    values = self.convert_input_value(values, value_def, config.get('type'))149                else:  # åºå®å¼150                    values = dict_key151            else:152                values = self.convert_input_value(values, {}, config.get('type'))153        else:154            values = config.get('default_value', '')155        return values156    def convert_input_value(self, values, value_def, value_type):157        """转æ¢è¾å
¥158        """159        if isinstance(values, (list, tuple, set)):160            _r = []161            for v in values:162                value = str(value_def.get(v, value_def.get(str(v), v)))163                # 鲿¢ sql 注å
¥164                value = self.get_safe_value(value)165                if not value.isdigit() or value_type == 'str':166                    value = "'%s'" % value167                _r.append(value)168            return ','.join(_r)169        else:170            return value_def.get(values, values)171    def empty_param_handle(self, mark_name, value=' 0=0 '):172        """ä¼ å
¥åæ°ä¸ºç©ºæ¶,æ SQLçæ¡ä»¶ ä¾å¦:log_user={{player_id}} æ¢æ¢æ 0=0173        """174        the_mark = self.make_mark(mark_name)175        # pattern = re.compile(r"""[\w\.`(+=*]*[\W`]?(not[\s]+)?(like|in|=|=>|<=|>|<|!=)[\W]*%s[^\s]*[\s$]*""" % the_mark,flags=re.I)176        pattern = re.compile(r"""[\S]*[\W`]?([\s]+not[\s]+)?(like|in|=|=>|<=|>|<|!=)[\W]*%s[^\s]*[\s$]*""" % the_mark,177                             flags=re.I)178        _s = int(time.time())179        self.query_sql = re.sub(pattern, value, self.query_sql)180    def set_limit(self, page_size, page_num):181        """设置sql limit182        """183        page_num, page_size = int(page_num), int(page_size)184        self.limit_str = 'limit %s,%s' % ((page_num - 1) * page_size, page_size)185    def set_order(self, sort_key, sort_type='ASC'):186        if sort_key and sort_type.lower() in ('asc', 'desc'):187            if 'order' in self.query_sql.strip()[-30:].lower():188                self.order_str = ' ,%s %s' % (sort_key, sort_type)189            else:190                self.order_str = ' ORDER BY %s %s' % (sort_key, sort_type)191    safe_pattern = re.compile(r"""'|"|;|>|<|%|--""", flags=re.I)192    def get_safe_value(self, value):193        value = re.sub(self.safe_pattern, '', str(value))194        return value195    def replace_mark_to_value(self, mark_name, value):196        self.query_sql = self.query_sql.replace(self.make_mark(mark_name), value)197    def make_mark(self, mark_name):198        return self.TAG_FORMAT % mark_name199    def get_query_sql(self):200        return '%s %s %s' % (Query.filter_sql(self.query_sql), self.order_str, self.limit_str)201    def get_count_sql(self):202        return 'select count(0) from (%s) newTable' % Query.filter_sql(self.query_sql)203    def has_mark(self, mark_name):204        return self.make_mark(mark_name) in self.query_sql205    def generate_mark_conditions(self):206        mark_conditions = {}207        for k, v in self.mark_map.items():208            mark_conditions[k] = v if self.has_mark(k) else False209        return mark_conditions210    def get_context_func(self, mark_name):211        func = self.mark_map.get(mark_name,{}).get('context_func', None)212        if isinstance(func, str):213            return import_func(func)214        return func215    def get_param_func(self, mark_name):216        func = self.mark_map.get(mark_name,{}).get('params_func', None)217        if isinstance(func, str):218            return import_func(func)219        return func220    def set_mark_parmas(self, request):221        for k, item in self.mark_map.items():222            param_func = self.get_param_func(k)223            if param_func:224                self.params[k] = param_func(request)225class QueryCompiler(SqlBuilder):226    def __init__(self, query_model, params):227        self.query = query_model228        self.already_get_query_sql = False229        the_sql = self.query.sql or self.query.get_default_sql()230        super(QueryCompiler, self).__init__(the_sql, params)231        self.mark_map.update(self.query.field_config)232    def get_tfoot_sql(self):233        """è·åtfootçæ±æ»sql234        """235        assert self.already_get_query_sql, 'åªè½å
使ç¨get_query_sql è·åæ¥è¯¢SQL'236        tmp_sql = self.query_sql237        ftoot_sql = self.query.other_sql238        if ftoot_sql:239            self.query_sql = ftoot_sql240            self.query_sql_handle()241            ftoot_sql = self.query_sql242            self.query_sql = tmp_sql243        return ftoot_sql244    def get_query_sql(self):245        self.already_get_query_sql = True246        if self.query.order and not self.order_str:  # å¦ææ²¡ææåºè®¾ç½®é»è®¤æ¥è¯¢æåº247            self.set_order(self.query.order, self.query.order_type_str)248        return super(QueryCompiler, self).get_query_sql()249    def get_statistic(self):250        """è·åæéçç»è®¡251        """252        statistic_tags = re.findall(r'<<([^\s,]*)>>', self.query.sql)253        statistic_tags.extend(re.findall(r'<<([^\s,]*)>>', self.query.other_sql))254        result = []255        if statistic_tags:  # å
å«<<ç»è®¡å>>æ ç¾256            statistic_names_d = DictDefine.get_dict_for_key('statistic_name', reverse=True)257            for s_name in set(statistic_tags):258                s_id = statistic_names_d.get(re.sub(r'\s', '', s_name))  # å»é¤ç©ºæ ¼çç¹æ®å符259                if s_id:260                    result.append((s_id, s_name))261        return result262    def query_sql_handle(self):263        """ å¢å è·åç»è®¡id ä½¿ç¨æ¹æ³ <<ç»è®¡å>>264        """265        record = super(QueryCompiler, self).query_sql_handle()266        if isinstance(self.query_sql, bytes):267            self.query_sql = self.query_sql.decode('utf8')268        statistic_tags = re.findall(r'<<([^\s,]*)>>', self.query_sql)269        from .statistic import Statistic270        if statistic_tags:  # å
å«<<ç»è®¡å>>æ ç¾271            statistic_names_d = dict(Statistic.objects.values_list('name',272                                                                   'id'))  # ,DictDefine.get_dict_for_key('statistic_name', reverse=True)273            for s_name in set(statistic_tags):274                s_id = statistic_names_d.get(re.sub(r'\s', '', s_name))  # å»é¤ç©ºæ ¼çç¹æ®å符275                if s_id:276                    self.query_sql = self.query_sql.replace('<<%s>>' % s_name, '"%s"' % s_id)277        # éå¶ä½¿ç¨çç»è®¡278        if self.has_mark("statistic"):279            statistic_list = self.get_statistic()280            self.replace_mark_to_value("statistic", ",".join([s_id for s_id, _ in statistic_list]))281        return self.query_sql282    def has_mark(self, mark_name):283        return super(QueryCompiler, self).has_mark(mark_name)284    def has_conditions(self):285        for k, v in self.query.field_config.items():286            if v.get('search', ''):287                return True288    def get_conn(self, server_id=0):289        from .query_server import QueryServer290        return QueryServer.get_conn(server_id, 'read')291class Query(BaseModel):292    """æ¥è¯¢293    """294    log_key = models.CharField('å
³è表æ è¯', max_length=30, db_index=True, null=False)295    log_type = models.IntegerField(default=0)296    class OrderType(Enum):297        DESC = 1, _('ååº')298        AESC = 0, _('æ£åº')299    key = models.CharField('æ¥è¯¢æ è¯', default='', max_length=100, null=False, blank=True, db_index=True)300    name = models.CharField('æ¥è¯¢åç§°', default='', max_length=100, null=False, blank=False, unique=True)301    select = models.CharField(_('æ¥è¯¢å段'), max_length=1000, null=False, blank=True)302    where = models.CharField(max_length=500, null=False, blank=True)303    group = models.CharField('ç¨éåç»', max_length=50, null=False, blank=True, default='')304    order = models.CharField(max_length=20, null=False, blank=True, default='')305    order_type = models.IntegerField(default=OrderType.AESC, choices=OrderType.member_list(), null=False)306    sql = models.TextField(_('SQL'), default='', null=False, blank=True)307    other_sql = models.TextField(_('å
¶ä»SQL'), default='', null=False, blank=True)308    cache_validate = models.IntegerField(default=0, null=True)309    is_paging = models.BooleanField(_('æ¯å¦å页'), default=False, null=False)310    remark = models.CharField('夿³¨', max_length=1000, blank=True)311    _field_config = models.TextField('æ¥è¯¢å段å®ä¹', default="", blank=True)312    template_name = models.CharField('模çå', max_length=32, blank=True)313    _DEFAULT_FIELD_CONFIG = {314            "æ è®°å": {"name"  : "åæ°å", "dict": "", "sort": False, "order_num": 99, "multiple": False,315                    "search": False, "merge_value": True}}316    __cache_config = None317    class Meta:318        app_label = AnalysisConfig.name319        ordering = ('id',)320    @CacheAttribute321    def log_def_list(self):322        return LogDefine.objects.using('read').all()323    def save(self, *args, **kwargs):324        super(Query, self).save(*args, **kwargs)325    def get_default_sql(self):326        """ é»è®¤SQL327        """328        log_def = self.log_def329        the_sql = """SELECT %s FROM %s WHERE log_time  BETWEEN '{{sdate}}' AND '{{edate}}' """ % (330                self.select, log_def.table_name)331        if self.where:332            the_sql += ' AND %s' % self.where333        if self.order:334            the_sql += ' ORDER BY %s %s' % (self.order, self.order_type_str)335        for field_name, config in log_def.config.items():336            verbose_name = config['verbose_name']337            if verbose_name:338                the_sql = the_sql.replace(verbose_name, field_name)339        return the_sql340    @property341    def order_type_str(self):342        return 'DESC' if self.order_type == 1 else 'AESC'343    @CacheAttribute344    def log_def(self):345        return LogDefine.objects.filter(key=self.log_key).first()346    @property347    def is_center_query(self):348        """æ¯å¦ä¸å¤®æ¥è¯¢349        """350        try:351            return self.log_def.status == LogDefine.PositionType.CENTER352        except:353            return True354    @CacheAttribute355    def selects(self):356        return [f.strip() for f in self.select.split(',') if f]357    @property358    def field_config(self):359        """åæ®µå®ä¹360        """361        try:362            s_d = OrderedDict()363            self.__cache_config = self.__cache_config or OrderedDict(364                    sorted(json.loads(self._field_config).items(), key=lambda x: x[1]['order_num']))365        except:366            print(trace_msg())367            self.__cache_config = self._DEFAULT_FIELD_CONFIG368        return self.__cache_config369    @field_config.setter370    def field_config(self, obj_value):371        if isinstance(obj_value, dict):372            obj_value = json.dumps(obj_value, ensure_ascii=False)373        self._field_config = obj_value374    @classmethod375    def filter_sql(cls, sql):376        p = re.compile('(update|delete|modify|lock[\s]+|drop|table)', re.I)377        sql = p.sub('', sql)378        return sql379    @CacheAttribute380    def safe_sql(self):381        return self.__class__.filter_sql(self.sql)382    def __unicode__(self):383        return '%s' % self.name...views.py
Source:views.py  
1import random2from io import BytesIO3import logging4from django.contrib.auth import authenticate, login as auth_login, logout as auth_logout5from django.contrib.auth.decorators import login_required6from django.core import signing7from django.http import HttpResponse, Http404, JsonResponse8from django.shortcuts import render, redirect, get_object_or_4049from django.shortcuts import render_to_response10from django.core.urlresolvers import reverse_lazy, reverse11from django.template.response import TemplateResponse12from django.utils import timezone13from django.utils.decorators import method_decorator14from django.views.decorators.cache import never_cache15from django.views.decorators.http import last_modified16from django.views.generic import FormView, TemplateView, DetailView17from goods.models import Goods18from .models import User, RecipientsAddress19from .form import UserModelRegisterForm, UserLoginForm20from PIL import Image, ImageDraw, ImageFont, ImageFilter21def create_captcha_code(request):22    '''23    çæ 4个æ±åéªè¯ç  ,éè¦ç¨æ·æé¡ºåºç¹å»å
¶ä¸ç 3个24    ä¿åå°ä¼è¯ä¸ 25    è¿åéªè¯ç å¾ç'''26    # éæºç4个æ±å:27    zh_words = ['ç', 'å½', 'ç', 'æ', '天', '好', 'å', '祥', 'å¦', 'æ']28    zh_words = random.sample(zh_words, 4)29    #  ç»å¾30    image = Image.open(r'C:\Users\chaiming\Desktop\IMG_20170502_165107_344.jpg')31    # w, h = image.size32    # image.thumbnail((w // 2, h // 2))33    # éæºé¢è²34    def rnd_color():35        return (random.choice([0, 255]), 0, random.choice([0, 255]))36    # å建Font对象: 大å°ä¸ºé¿å®½ä¸å°çé£ä¸ªç8/137    width, height = image.size38    font_size = int(min(image.size)) // 839    font = ImageFont.truetype(r'F:\pycharm_projects\djcode\dailyfresh\user\fonts\MSYH.TTF', font_size)40    # å建Draw对象:41    draw = ImageDraw.Draw(image)42    # çæéæºåæ 43    x1 = random.randint(0, width - font_size)44    y1 = random.randint(0 + font_size, height - font_size)45    # 循ç¯3æ¬¡ï¼æå©ä¸ç符åçåæ æå46    points_list = [(x1, y1)]47    for i in range(3):48        is_not_pass = True49        while is_not_pass:50            x2, y2 = random.randint(0, width - font_size), random.randint(0 + font_size, height - font_size)51            for x, y in points_list:52                if (x2 > x + font_size) or (x2 + font_size < x) or (y2 > y + font_size) or (y2 + font_size < y):53                    pass54                else:55                    is_not_pass = True56                    break57            else:58                is_not_pass = False59                points_list.append((x2, y2))60    # è¾åºæå:61    for t in range(4):62        draw.text(points_list[t], zh_words[t], font=font,63                  fill=rnd_color())64    z_p = zip(zh_words, points_list)  # æåååæ ç»å65    sample = random.sample([i for i in z_p], 3)66    #  设置session å¨å端éªè¯67    captcha_dict = {68        'a1': sample[0][0], 'x1': sample[0][1][0], 'y1': sample[0][1][1],69        'a2': sample[1][0], 'x2': sample[1][1][0], 'y2': sample[1][1][1],70        'a3': sample[2][0], 'x3': sample[2][1][0], 'y3': sample[2][1][1],71        'size': font_size72    }73    # 模ç³:74    # image = image.filter(ImageFilter.BLUR)75    # del draw76    # draw = ImageDraw.Draw(image)77    draw.text(78        (0, 0),79        '"{}", "{}", "{}"'.format(captcha_dict['a1'], captcha_dict['a2'], captcha_dict['a3']),80        font=font,81        fill=(0, 0, 0)82    )83    buff = BytesIO()84    image.save(buff, 'jpeg')85    response = HttpResponse(buff.getvalue(), 'image/jpeg')86    request.session['captcha'] = captcha_dict87    return response88class UserCreate(FormView):89    form_class = UserModelRegisterForm90    template_name = 'user/register.html'91    success_url = reverse_lazy('user:user_center_info')  # 跳转å°ç¨æ·ä¸å¿92    @method_decorator(never_cache)93    def dispatch(self, *args, **kwargs):94        return super(UserCreate, self).dispatch(*args, **kwargs)95    def form_valid(self, form):96        #  对éªè¯ç è¿è¡éªè¯97        code = self.request.session.get('captcha')98        if code:99            ret = form.valid_captcha(code=code)100        else:101            logger = logging.getLogger('captcha')102            logger.warning('IP: {}, 主æº: {}, æå¡å¨è®¤è¯åçç¨æ·: {} 客æ·ç«¯ç¯¡æ¹Cookies'.format(103                self.request.META.get('REMOTE_ADDR'),104                self.request.META.get('REMOTE_HOST'),105                self.request.META.get('REMOTE_USER'),106            ))107            raise Http404108        if ret is False:109            return render(self.request, self.template_name, {'form': form})110        User.objects.create_user(111            form.cleaned_data.get('username'),112            form.cleaned_data.get('email'),113            form.cleaned_data.get('password'),114        )115        return super().form_valid(form)  # éªè¯æååè°ç¨, é»è®¤è¿åæåçéå®å116def username_is_repeat(request):117    '''ajaxéªè¯ååç¨æ·åè½'''118    if User.objects.filter(username=request.GET.get('username')).exists():119        return JsonResponse({'is_repeat': 1})120    return JsonResponse({'is_repeat': 0})121class UserLogin(FormView):122    form_class = UserLoginForm123    template_name = 'user/login.html'124    success_url = reverse_lazy('user:user_center_info')125    def get_context_data(self, **kwargs):126        context = super().get_context_data(**kwargs)127        context['mark_name'] = self.request.get_signed_cookie('mark_name', '')128        return context129    def form_valid(self, form):130        user = authenticate(username=form.cleaned_data.get('username'), password=form.cleaned_data.get('password'))131        if user is not None:132            if user.is_active:133                #  ç¨æ·ç»é134                auth_login(self.request, user)135                #  éå®å136                return super().form_valid(form)137        else:138            #  ç¨æ·éªè¯ä¸æå139            form.add_error(None, 'ç¨æ·åæå¯ç ä¸æ£ç¡®')140            context = {}141            response = TemplateResponse(self.request, self.template_name, context)142            # print(response.context_data)143            #  妿å¾éäºè®°ä½ç¨æ·åï¼åä¿åå°Cookiesä¸, key=mark_name, 妿æªå¾éï¼è®¾ç½®mark_name 为空144            username = self.request.POST.get('username')145            if self.request.POST.get('mark_name') == '1':146                response.set_signed_cookie('mark_name', username)147            else:148                response.delete_cookie('mark_name')149                username = ''150            context = {151                'form': form,152                'mark_name': username153            }154            response.context_data = context155            return response156            #  ç´æ¥è°ç¨ self.form_invalid()157            # return self.form_invalid(form)158    def form_invalid(self, form):159        context = {}160        response = TemplateResponse(self.request, self.template_name, context)161        #  妿å¾éäºè®°ä½ç¨æ·åï¼åä¿åå°Cookiesä¸, key=mark_name162        username = self.request.POST.get('username')163        if self.request.POST.get('mark_name') == '1':164            response.set_signed_cookie('mark_name', username)165        else:166            response.delete_cookie('mark_name')167            username = ''168        context = {169            'form': form,170            'mark_name': username171        }172        response.context_data = context173        return response174def login_datetime(request):175    return timezone.datetime(2018, 5, 26, 16, 36)  # ç»é页颿åä¿®æ¹æ¶é´176@last_modified(login_datetime)177@never_cache178def login(request):179    template_name = 'user/login.html'180    success_url = reverse_lazy('user:user_center_info')  # 跳转å°ç¨æ·ä¸å¿181    if request.method == 'GET':182        return render(request, template_name, {'mark_name': request.get_signed_cookie('mark_name', '')})183    else:184        username = request.POST.get('username')185        form = UserLoginForm(request.POST)186        if form.is_valid():187            user = authenticate(username=form.cleaned_data['username'], password=form.cleaned_data['password'])188            if user is not None:189                if user.is_active:190                    #  ç»é191                    auth_login(request, user)192                    #  éå®å, è·åè£
饰å¨login_required çnextåæ°ï¼å®ç°è·³è½¬å°ç»éä¹åçè·¯å¾193                    success_url = request.GET.get('next', success_url)194                    response = redirect(success_url)195                    #  è®°ä½ç¨æ·ååè½196                    if request.POST.get('mark_name') == '1':197                        response.set_signed_cookie('mark_name', username)198                    else:199                        response.delete_cookie('mark_name')200                    return response201            else:202                form.add_error(None, 'ç¨æ·åæå¯ç ä¸æ£ç¡®')203        # 页é¢è®°ä½ç¨æ·ååè½ï¼æç¨æ·ååå¨cookieé204        response = TemplateResponse(request, template_name, {})205        if request.POST.get('mark_name') == '1':206            response.set_signed_cookie('mark_name', username)207        else:208            response.delete_cookie('mark_name')209            username = ''210        response.context_data = {'form': form, 'mark_name': username}211        return response212'''213头é¨Last-Modified æ ¼å¼å214Expires: Thu, 10 May 2018 10:17:09 GMT215response['Last-Modified'] = timezone.datetime(2018, 5, 8).strftime('%a, %d %b %Y %H:%M:%S GMT')216'''217def logout_view(request):218    '''ç»åºè§å¾'''219    auth_logout(request)220    # Redirect to a success page.221    next_path = request.GET.get('next')222    if next_path:223        return redirect('%s?next=%s' % (reverse('user:login'), next_path))224    else:225        return redirect(reverse('user:login'))226class UserDetail(TemplateView):227    template_name = 'user/user_center_info.html'228    @method_decorator(login_required)229    def dispatch(self, *args, **kwargs):230        return super().dispatch(*args, **kwargs)231    def get_context_data(self, **kwargs):232        context = super().get_context_data(**kwargs)233        signing_str = self.request.COOKIES.get('gid_list')234        if signing_str is not None:235            try:236                gid_list = signing.loads(signing_str)237            except signing.BadSignature as e:238                logger = logging.getLogger('django.cookies')239                logger.warning('cookiesä¸è·ågidé误ï¼{}, USER[{}], REMOTE_ADDR[{}]'.format(240                    e,241                    self.request.user,242                    self.request.META.get('REMOTE_ADDR'),243                ))244                gid_list = []245        else:246            gid_list = []247        recently_viewed = []248        if gid_list is not []:249            for gid in gid_list:250                try:251                    goods = Goods.objects.get(pk=gid)252                except Goods.DoesNotExist:253                    goods = None254                if goods is not None:255                    recently_viewed.append(goods)256        context['recently_viewed'] = recently_viewed257        return context258        # def get(self, request, *args, **kwargs):259        #     self.cookies = request.COOKIES260        #     return super().get(request, *args, **kwargs)261@login_required()262def edit_recipients_address(request):263    if request.method == 'POST':264        recipients_address = RecipientsAddress(265            user=request.user,266            recipient=request.POST.get('recipient', ''),267            shipping_address=request.POST.get('shipping_address', ''),268            postcode=request.POST.get('postcode', ''),269            mobile_number=request.POST.get('mobile_number', ''),270        )271        recipients_address.save()272    return render(request, 'user/user_center_site.html')273# å é¤ç¨æ·å
³èå°å274@login_required()275def delete_address(request, uid, recid):276    uid = int(uid)277    recid = int(recid)278    address = get_object_or_404(RecipientsAddress, pk=recid)279    if address.user_id != uid:280        logger = logging.getLogger('myproject.debug')281        logger.debug('ç¨æ·idåæè·åæ°uidä¸ä¸è´ï¼')282        logger.debug('{}[{}] != {}[{}]'.format(type(address.user_id), address.user_id, type(uid), uid))283        raise Http404284    address.delete()...marked.py
Source:marked.py  
1from typing import List, Set, Union, Callable, NamedTuple2from copy import deepcopy3from collections import namedtuple4from beancount.core.data import Transaction, Posting, new_metadata, Entries5import beancount_plugin_utils.metaset as metaset6from beancount_plugin_utils.BeancountError import BeancountError, entry_error_handler7MARK_SEPERATOR = "-"8PluginUtilsMarkedError = namedtuple("PluginUtilsMarkedError", "source message entry")9def normalize_transaction(10    tx: Transaction,11    mark_name: str,12    account_types: Union[Set[str], bool] = False,13):14    """15    Move marks in tags with name `mark_name` into meta, if any, and merge with marks in meta in a way of metaset.16    Then, if `account_types` are provided, move marks into postings with the given account types or error if there's none such posting.17    Note: in beancount `meta` is mandatory on transactions, but optional on postings.18    Example:19        try:20            tx, is_marked = marked.normalize_transaction(config.mark_name, entry, ("Income", "Expenses"))21        except BeancountError as e:22            new_entries.append(entry)23            errors.append(e.to_named_tuple())24            continue25        for posting, orig_posting in zip(tx, orig_tx):26            marks = metaset.get(posting)27            # Do your thing.28    Args:29        txs [Transaction]: transaction instances.30        mark_name [str]: the mark name.31        account_types [Set[str], False]: set of account types that must be considered, defaults to False.32    Returns:33        new Transaction instance with normalized marks.34        boolean of whenever mark was used in this transaction.35    Raises:36        BeancountError.37    """38    copy = deepcopy(tx)39    for tag in copy.tags:40        if tag == mark_name or tag[0 : len(mark_name + MARK_SEPERATOR)] == mark_name + MARK_SEPERATOR:41            copy = copy._replace(42                tags=copy.tags.difference([tag]),43                meta=metaset.add(copy.meta, mark_name, tag[len(mark_name + MARK_SEPERATOR) :] or ""),44            )45    is_used = False46    if metaset.has(copy.meta, mark_name):47        is_used = True48    for posting in copy.postings:49        if posting.meta == None:50            continue51        if not metaset.has(posting.meta, mark_name):52            continue53        is_used = True54        if not account_types:55            raise BeancountError(56                posting.meta,57                'Mark "{}" can be only applied to transactions, not postings: "{}".'.format(mark_name, posting.account),58                tx,59                PluginUtilsMarkedError,60            )61        if not (posting.account.split(":")[0] in account_types):62            raise BeancountError(63                posting.meta,64                'Mark "{}" can be only applied to posting with account types of: {}'.format(mark_name, account_types),65                tx,66                PluginUtilsMarkedError,67            )68    if not account_types:69        return copy, is_used70    if not is_used:71        return copy, False72    is_applied = False73    postings = []74    default_marks = metaset.get(copy.meta, mark_name)75    copy = copy._replace(meta=metaset.clear(copy.meta, mark_name))76    for posting in copy.postings:77        marks = metaset.get(posting.meta, mark_name)78        if len(marks) > 0:79            postings.append(posting)80            is_applied = True81        elif len(default_marks) > 0 and (posting.account.split(":")[0] in account_types):82            postings.append(posting._replace(meta=metaset.set(posting.meta, mark_name, default_marks)))83            is_applied = True84        else:85            postings.append(posting)86    if not is_applied:87        raise BeancountError(88            tx.meta,89            'Mark "{}" on a transaction has no effect because transaction does not have postings with account types of: {}'.format(90                mark_name, account_types91            ),92            tx,93            PluginUtilsMarkedError,94        )95    copy = copy._replace(postings=postings)96    return copy, True97def on_marked_transactions(98    per_marked_transaction: Callable[[Transaction, Transaction, NamedTuple], List[Transaction]],99    entries: Entries,100    config: NamedTuple,101    mark_name: str,102    account_types: Union[Set[str], bool] = False,103    error_named_tuple: NamedTuple = BeancountError,104):105    new_entries: Entries = []106    errors: List[NamedTuple] = []107    for entry in entries:108        with entry_error_handler(entry, new_entries, errors, error_named_tuple):109            if not isinstance(entry, Transaction) or entry.flag == "P":  # Ignore txs generated by padding too.110                new_entries.append(entry)111                continue112            tx, is_marked = normalize_transaction(entry, mark_name, account_types)113            if is_marked:114                new_txs = per_marked_transaction(tx, entry, config)115                new_entries.extend(new_txs)116            else:117                new_entries.append(entry)...common.py
Source:common.py  
1from typing import List, Set, Union, Tuple2from copy import deepcopy3from beancount.core.data import Transaction, Posting4from beancount.core.inventory import Inventory5import beancount_plugin_utils.metaset as metaset6def read_config(config_string):7    """8    Args:9      config_string: A configuration string in JSON format given in source file.10    Returns:11      A dict of the configuration string.12    """13    if len(config_string) == 0:14        config_obj = {}15    else:16        config_obj = eval(config_string, {}, {})17    if not isinstance(config_obj, dict):18        raise RuntimeError("Invalid plugin configuration: should be a single dict.")19    return config_obj20MARK_SEPERATOR = "-"21def normalize_marked_txn(tx: Transaction, mark_name: str):22    """23    If a transaction is marked, hoist marked tags into transation meta(s).24    Args:25        txs [Transaction]: transaction instances.26        mark_name [str]: the mark.27    Return:28        Transaction with normalized marks.29    """30    copy = deepcopy(tx)31    for tag in copy.tags:32        if (33            tag == mark_name34            or tag[0 : len(mark_name + MARK_SEPERATOR)] == mark_name + MARK_SEPERATOR35        ):36            copy = copy._replace(37                tags=copy.tags.difference([tag]),38                meta=metaset.add(39                    copy.meta, mark_name, tag[len(mark_name + MARK_SEPERATOR) :] or ""40                ),41            )42    return copy43DEFAULT_APPLICABLE_ACCOUNT_TYPES = set(44    ["Income", "Expenses", "Assets", "Liabilities", "Equity"]45)46def marked_postings(47    tx: Transaction,48    mark_name: str,49    applicable_account_types: Set[str] = DEFAULT_APPLICABLE_ACCOUNT_TYPES,50    allow_posting_level_mark: bool = True,51):52    """53    Iterates over postings of the transaction, returning most specific mark value for applicable account types.54    Args:55        tx [Transaction]: transaction instance.56        mark_name [str]: the mark.57        applicable_account_types [Set[str]]: set of account types that must be considered, defaults to all five.58        allow_posting_level_mark [bool]: set to False if posting-level marks should raise error instead.59    Yields:60        list of mark values or None.61        posting.62        original posting.63        original transaction.64    """65    copy = deepcopy(tx)66    default_marks = metaset.get(tx.meta, mark_name)67    copy = copy._replace(meta=metaset.clear(tx.meta, mark_name))68    for _posting in copy.postings:69        marks = metaset.get(_posting.meta, mark_name)70        posting = _posting._replace(meta=metaset.clear(_posting.meta, mark_name))71        if len(marks) > 0:72            yield marks, posting, _posting, copy73        elif len(default_marks) > 0:74            if posting.account.split(":")[0] not in applicable_account_types:75                yield None, posting, _posting, copy76            else:77                yield default_marks, posting, _posting, copy78        else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
