Best Python code snippet using molotov_python
analyse_utils.py
Source:analyse_utils.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3import collections4import jieba5import re6from copy import deepcopy7from logging import ERROR8jieba.setLogLevel(ERROR)9#æç« ç±»å对ç
§è¡¨10CLASS = {11    0: '娱ä¹',12    1: 'ä½è²',13    2: '彩票',14    3: 'æ¿äº§',15    4: 'æè²',16    5: 'æ¶å°',17    6: 'æ¶æ¿',18    7: 'æåº§',19    8: '游æ',20    9: '社ä¼',21    10: 'ç§æ',22    11: 'è¡ç¥¨',23    12: 'è´¢ç»',24    13: 'å®¶å±
'25}26#ä¼ å
¥post对象27class phCount(object):28    """29    ä¼ å
¥æ°æ®ï¼ åæåºç¨ç¨æ·çææº30    """31    def __init__(self, data_iter):32        """33        åå§å屿§34        """35        self._data_iter = deepcopy(data_iter)36    def _gen_counter(self):37        """38        çæè®¡æ°å¨å¯¹è±¡ï¼ å¯¹äºæ¯ä¸æ¡æ°æ®èè¨39        :return: çæå¨40        """41        _data = (_data['device'] for _data in self._data_iter)42        for _ in _data:43            yield collections.Counter(_)44    def _result_dirty(self):45        """46        对计æ°å¨å¯¹è±¡æ±åï¼åæ¥çæï¼æ°æ®47        :return: ä¸ç¬¦åechartsæ°æ®æ ¼å¼ï¼ éè¿ä¸æ¥æ ¼å¼å48        """49        _result = collections.Counter()50        for _counter in self._gen_counter():51            _result += _counter52        return _result53    #坹夿便¥å£54    def result(self):55        """56        å¯¹æ°æ®æ ¼å¼åï¼ è¿åechartsè¦æ±çæ°æ®æ ¼å¼,è¿éå°ipadç¨æ·ç»ä¸å½äºè¹æç¨æ·57        """58        for _res_tuple in self._result_dirty().items():59            _dict = dict()60            if _res_tuple[0] == 'android':61                _dict['value'] = _res_tuple[1]62                _dict['name'] = 'å®åç¨æ·'63            elif _res_tuple[0] == 'apple':64                _dict['value'] = _res_tuple[1]65                _dict['name'] = 'è¹æç¨æ·'66            else:67                _dict['value'] = _res_tuple[1]68                _dict['name'] = 'å¹³æ¿ç¨æ·'69            yield _dict70#ä¼ å
¥user对象71class sexCount(object):72    """ç»è®¡ç·å¥³æ¯ä¾"""73    def __init__(self, data_iter):74        """75        åå§å屿§76        """77        self._data_iter = deepcopy(data_iter)78    @property79    def _sex_list(self):80        """81        å°è£
æ¯æ¡æ°æ®çæ§å«ä¸ºå表82        """83        _sex_list = (data['sex'] for data in self._data_iter)84        return _sex_list85    def _result_dirty(self):86        """87        çæè®¡æ°å¨å¯¹è±¡88        """89        _result_dirty = collections.Counter(self._sex_list)90        return _result_dirty91    #坹夿¥å£92    def result(self):93        """94        对计æ°å¨å¯¹è±¡ç»æåå¤ç为符åechartsè¦æ±æ ¼å¼95        """96        for _res_tuple in self._result_dirty().items():97            _dict = dict()98            if _res_tuple[0] == 'male':99                _dict['value'] = _res_tuple[1]100                _dict['name'] = 'ç·ç'101            else:102                _dict['value'] = _res_tuple[1]103                _dict['name'] = '女ç'104            yield _dict105#ä¼ å
¥post对象,åç¨è¯è¯å
¸106class contentWordCount(object):107    def __init__(self, data_iter, stopWord):108        """109        åå§å屿§, å è½½èªå®ä¹è¯å
¸ï¼ å è½½åç¨è¯110        """111        self._data_iter = deepcopy(data_iter)112        self._stop_word = deepcopy(stopWord)113    def _fileterStopWord(self, content_dirty):114        """115        å»é¤åç¨è¯è¾
å©å½æ°116        """117        content = filter(lambda _: _ not in self._stop_word and len(_) > 1, content_dirty)118        return content119    @property120    def _content_clean(self):121        """122        ç»è¿åè¯ï¼ å»é¤åç¨è¯çå
容ï¼è¿åä¸ä¸ªçæå¨123        """124        _content_dirty = (jieba.cut(_data['reply_content']) for _data in self._data_iter)125        return map(self._fileterStopWord, _content_dirty)126    def _gen_counter(self):127        """128        计æ°å¨çæå¨å¯¹è±¡129        """130        for _ in self._content_clean:131            yield collections.Counter(_)132    @property133    def _result_dirty(self):134        """135        å¾å°ä¸¤ç¾ä¸ªçè¯136        """137        _result = collections.Counter()138        for _ in self._gen_counter():139            _result += _140        return iter(_result.most_common(100))141    #坹夿¥å£142    def result(self):143        """144        è¿åçæå¨145        """146        for _res_tuple in self._result_dirty:147            _dict = dict()148            _dict['value'] = _res_tuple[1]149            _dict['name'] = _res_tuple[0]150            yield _dict151#ä¼ å
¥user对象152class nickNameWordCount(object):153    def __init__(self, data_iter):154        """155         åå§å屿§156        """157        self._data_iter = deepcopy(data_iter)158    @property159    def _nick_name_list(self):160        """161        æææµç§°ï¼åè¯å:162        """163        _nick_name = (jieba.cut(''.join(re.findall(r'[\u4e00-\u9fa5]', nk_name['name_show']))) for nk_name in self._data_iter)164        return map(lambda _nick: filter(lambda _: _ not in ['ä½ ', 'æ', 'ä»', '她', 'å®', 'ç', 'æ¯', 'å', '大', 'å°', 'äº', 'å§', 'ä¸', 'èª', 'ä¸'], _nick), _nick_name)165    def _gen_counter(self):166        """167        产ç计æ°å¨å¯¹è±¡168        :return: çæå¨169        """170        for _ in self._nick_name_list:171            yield collections.Counter(_)172    @property173    def _result_dirty(self):174        """175        产çéechartsè§èæ°æ®æ ¼å¼çæ°æ®ï¼æ¾ç¤º100æ¡176        """177        _result = collections.Counter()178        for _ in self._gen_counter():179            _result += _180        return _result.most_common(100)181    #坹夿¥å£182    def result(self):183        """184        å¤çæechartsè§èæ°æ®185        """186        for _res_tuple in self._result_dirty:187            _dict = dict()188            _dict['value'] = _res_tuple[1]189            _dict['name'] = _res_tuple[0]190            yield _dict191#ä¼ å
¥user对象192class vipCount(object):193    """194    ç»è®¡ç¨æ·å¼éVIPç¶åµ195    """196    def __init__(self, data_iter):197        self.data_iter = deepcopy(data_iter)198    @property199    def _gen_tb_vip_list(self):200        return (_isVip['tb_vip'] for _isVip in self.data_iter)201    @property202    def _result_dirty(self):203        return collections.Counter(self._gen_tb_vip_list)204    #坹夿¥å£205    def result(self):206        """ç»æåæ°æ®"""207        for _res_tuple in self._result_dirty.items():208            _dict = dict()209            if _res_tuple[0]:210                _dict['value'] = _res_tuple[1]211                _dict['name'] = 'ä¼åç¨æ·'212            else:213                _dict['value'] = _res_tuple[1]214                _dict['name'] = 'æ®éç¨æ·'215            yield _dict216#ä¼ å
¥posts对象217class SentimentCount(object):218    """219    ç»è®¡æ
ææ°é220    """221    def __init__(self, data_iter):222        """å¤å¶è¿ä»£å¨å¯¹è±¡"""223        self._data_iter = deepcopy(data_iter)224    @property225    def _get_sent_list(self):226        for _ in self._data_iter:227            yield _.get('sentiment')228    @property229    def _get_dirty_counter(self):230        return collections.Counter(self._get_sent_list)231    # 坹夿¥å£232    def result(self):233        for name, value in self._get_dirty_counter.items():234            _dict = dict()235            if name == 0:236                _dict['name'] = 'æ¶æ'237                _dict['value'] = value238            elif name == 1:239                _dict['name'] = '䏿§'240                _dict['value'] = value241            else:242                _dict['name'] = '积æ'243                _dict['value'] = value244            yield _dict245# ä¼ å
¥posts对象246class ClassCount(object):247    """248    ç»è®¡æ
ææ°é249    """250    def __init__(self, data_iter):251        """å¤å¶è¿ä»£å¨å¯¹è±¡"""252        self._data_iter = deepcopy(data_iter)253    @property254    def _get_class_list(self):255        for _ in self._data_iter:256            yield _.get('classify')257    @property258    def _get_dirty_counter(self):259        return collections.Counter(self._get_class_list)260    # 坹夿¥å£261    def result(self):262        for name, value in self._get_dirty_counter.items():263            yield {'name': CLASS.get(name), 'value': value}264#å è½½åç¨è¯265def loadStopWord(path):266    """267    å è½½åç¨è¯è¯å
¸268    :return:269    """270    with open(path, 'r', encoding='utf8') as f_obj:...identity.py
Source:identity.py  
1# orm/identity.py2# Copyright (C) 2005-2015 the SQLAlchemy authors and contributors3# <see AUTHORS file>4#5# This module is part of SQLAlchemy and is released under6# the MIT License: http://www.opensource.org/licenses/mit-license.php7import weakref8from . import attributes9from .. import util10class IdentityMap(object):11    def __init__(self):12        self._dict = {}13        self._modified = set()14        self._wr = weakref.ref(self)15    def keys(self):16        return self._dict.keys()17    def replace(self, state):18        raise NotImplementedError()19    def add(self, state):20        raise NotImplementedError()21    def _add_unpresent(self, state, key):22        """optional inlined form of add() which can assume item isn't present23        in the map"""24        self.add(state)25    def update(self, dict):26        raise NotImplementedError("IdentityMap uses add() to insert data")27    def clear(self):28        raise NotImplementedError("IdentityMap uses remove() to remove data")29    def _manage_incoming_state(self, state):30        state._instance_dict = self._wr31        if state.modified:32            self._modified.add(state)33    def _manage_removed_state(self, state):34        del state._instance_dict35        if state.modified:36            self._modified.discard(state)37    def _dirty_states(self):38        return self._modified39    def check_modified(self):40        """return True if any InstanceStates present have been marked41        as 'modified'.42        """43        return bool(self._modified)44    def has_key(self, key):45        return key in self46    def popitem(self):47        raise NotImplementedError("IdentityMap uses remove() to remove data")48    def pop(self, key, *args):49        raise NotImplementedError("IdentityMap uses remove() to remove data")50    def setdefault(self, key, default=None):51        raise NotImplementedError("IdentityMap uses add() to insert data")52    def __len__(self):53        return len(self._dict)54    def copy(self):55        raise NotImplementedError()56    def __setitem__(self, key, value):57        raise NotImplementedError("IdentityMap uses add() to insert data")58    def __delitem__(self, key):59        raise NotImplementedError("IdentityMap uses remove() to remove data")60class WeakInstanceDict(IdentityMap):61    def __getitem__(self, key):62        state = self._dict[key]63        o = state.obj()64        if o is None:65            raise KeyError(key)66        return o67    def __contains__(self, key):68        try:69            if key in self._dict:70                state = self._dict[key]71                o = state.obj()72            else:73                return False74        except KeyError:75            return False76        else:77            return o is not None78    def contains_state(self, state):79        return state.key in self._dict and self._dict[state.key] is state80    def replace(self, state):81        if state.key in self._dict:82            existing = self._dict[state.key]83            if existing is not state:84                self._manage_removed_state(existing)85            else:86                return87        self._dict[state.key] = state88        self._manage_incoming_state(state)89    def add(self, state):90        key = state.key91        # inline of self.__contains__92        if key in self._dict:93            try:94                existing_state = self._dict[key]95                if existing_state is not state:96                    o = existing_state.obj()97                    if o is not None:98                        raise AssertionError(99                            "A conflicting state is already "100                            "present in the identity map for key %r"101                            % (key, ))102                else:103                    return104            except KeyError:105                pass106        self._dict[key] = state107        self._manage_incoming_state(state)108    def _add_unpresent(self, state, key):109        # inlined form of add() called by loading.py110        self._dict[key] = state111        state._instance_dict = self._wr112    def get(self, key, default=None):113        if key not in self._dict:114            return default115        state = self._dict[key]116        o = state.obj()117        if o is None:118            return default119        return o120    def items(self):121        values = self.all_states()122        result = []123        for state in values:124            value = state.obj()125            if value is not None:126                result.append((state.key, value))127        return result128    def values(self):129        values = self.all_states()130        result = []131        for state in values:132            value = state.obj()133            if value is not None:134                result.append(value)135        return result136    def __iter__(self):137        return iter(self.keys())138    if util.py2k:139        def iteritems(self):140            return iter(self.items())141        def itervalues(self):142            return iter(self.values())143    def all_states(self):144        if util.py2k:145            return self._dict.values()146        else:147            return list(self._dict.values())148    def _fast_discard(self, state):149        self._dict.pop(state.key, None)150    def discard(self, state):151        st = self._dict.pop(state.key, None)152        if st:153            assert st is state154            self._manage_removed_state(state)155    def safe_discard(self, state):156        if state.key in self._dict:157            st = self._dict[state.key]158            if st is state:159                self._dict.pop(state.key, None)160                self._manage_removed_state(state)161    def prune(self):162        return 0163class StrongInstanceDict(IdentityMap):164    """A 'strong-referencing' version of the identity map.165    .. deprecated:: this object is present in order to fulfill166       the ``weak_identity_map=False`` option of the Session.167       This option is present to allow compatibility with older applications,168       but it is recommended that strong references to objects169       be maintained by the calling application170       externally to the :class:`.Session` itself, to the degree171       that is needed by the application.172    """173    if util.py2k:174        def itervalues(self):175            return self._dict.itervalues()176        def iteritems(self):177            return self._dict.iteritems()178    def __iter__(self):179        return iter(self.dict_)180    def __getitem__(self, key):181        return self._dict[key]182    def __contains__(self, key):183        return key in self._dict184    def get(self, key, default=None):185        return self._dict.get(key, default)186    def values(self):187        return self._dict.values()188    def items(self):189        return self._dict.items()190    def all_states(self):191        return [attributes.instance_state(o) for o in self.values()]192    def contains_state(self, state):193        return (194            state.key in self and195            attributes.instance_state(self[state.key]) is state)196    def replace(self, state):197        if state.key in self._dict:198            existing = self._dict[state.key]199            existing = attributes.instance_state(existing)200            if existing is not state:201                self._manage_removed_state(existing)202            else:203                return204        self._dict[state.key] = state.obj()205        self._manage_incoming_state(state)206    def add(self, state):207        if state.key in self:208            if attributes.instance_state(self._dict[state.key]) is not state:209                raise AssertionError('A conflicting state is already '210                                     'present in the identity map for key %r'211                                     % (state.key, ))212        else:213            self._dict[state.key] = state.obj()214            self._manage_incoming_state(state)215    def _add_unpresent(self, state, key):216        # inlined form of add() called by loading.py217        self._dict[key] = state.obj()218        state._instance_dict = self._wr219    def _fast_discard(self, state):220        self._dict.pop(state.key, None)221    def discard(self, state):222        obj = self._dict.pop(state.key, None)223        if obj is not None:224            self._manage_removed_state(state)225            st = attributes.instance_state(obj)226            assert st is state227    def safe_discard(self, state):228        if state.key in self._dict:229            obj = self._dict[state.key]230            st = attributes.instance_state(obj)231            if st is state:232                self._dict.pop(state.key, None)233                self._manage_removed_state(state)234    def prune(self):235        """prune unreferenced, non-dirty states."""236        ref_count = len(self)237        dirty = [s.obj() for s in self.all_states() if s.modified]238        # work around http://bugs.python.org/issue6149239        keepers = weakref.WeakValueDictionary()240        keepers.update(self)241        self._dict.clear()242        self._dict.update(keepers)243        self.modified = bool(dirty)...iptable14.py
Source:iptable14.py  
...59            #å¨ä½60            cmd_list.append('-j {0}'.format(_dict.get('action')))61        return ' '.join(cmd_list)62            63    def check_and_convert_cmd_dict(self, _dict):64        _group = _dict.get('group') 65        _operate = _dict.get('operate')66        _ruleid = _dict.get('ruleid')67        _protocol = _dict.get('protocol')68        _srcaddress = _dict.get('srcaddress')69        _srcport = _dict.get('srcport')70        _dstaddress = _dict.get('dstaddress')71        _dstport = _dict.get('dstport')72        _action = _dict.get('action','ACCEPT')73        if _action == 'permit':74            _action = 'ACCEPT'75        elif _action == 'deny':76            _action = 'DROP'77        _dict['action'] = _action78        return True79    80    def bulid_cmd(self, _dict):81        82        self.check_and_convert_cmd_dict(_dict)83        84        cmd_list = ['iptables']85        #æä½ç±»å86        if _dict.get('operate') == 'create':87            cmd_list.append(self.bulid_create_cmd(_dict))88        elif _dict.get('operate') == 'insert':89            cmd_list.append(self.bulid_insert_cmd(_dict))90        elif _dict.get('operate') == 'delete':91            cmd_list.append(self.bulid_delete_cmd(_dict))92        elif _dict.get('operate') == 'update':93            cmd_list.append(self.bulid_update_cmd(_dict))94        95        _dict['executecmd'] = ' '.join(cmd_list)96        ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
