How to use _dict method in Molotov

Best Python code snippet using molotov_python

analyse_utils.py

Source:analyse_utils.py Github

copy

Full Screen

1#!/usr/bin/env python2# -*- coding: utf-8 -*-3import collections4import jieba5import re6from copy import deepcopy7from logging import ERROR8jieba.setLogLevel(ERROR)9#文章类型对照表10CLASS = {11 0: '娱乐',12 1: '体育',13 2: '彩票',14 3: '房产',15 4: '教育',16 5: '时尚',17 6: '时政',18 7: '星座',19 8: '游戏',20 9: '社会',21 10: '科技',22 11: '股票',23 12: '财经',24 13: '家居'25}26#传入post对象27class phCount(object):28 """29 传入数据, 分析出用用户的手机30 """31 def __init__(self, data_iter):32 """33 初始化属性34 """35 self._data_iter = deepcopy(data_iter)36 def _gen_counter(self):37 """38 生成计数器对象, 对于每一条数据而言39 :return: 生成器40 """41 _data = (_data['device'] for _data in self._data_iter)42 for _ in _data:43 yield collections.Counter(_)44 def _result_dirty(self):45 """46 对计数器对象求和,初步生成,数据47 :return: 不符合echarts数据格式, 需进一步格式化48 """49 _result = collections.Counter()50 for _counter in self._gen_counter():51 _result += _counter52 return _result53 #对外提供接口54 def result(self):55 """56 对数据格式化, 返回echarts要求的数据格式,这里将ipad用户统一归于苹果用户57 """58 for _res_tuple in self._result_dirty().items():59 _dict = dict()60 if _res_tuple[0] == 'android':61 _dict['value'] = _res_tuple[1]62 _dict['name'] = '安卓用户'63 elif _res_tuple[0] == 'apple':64 _dict['value'] = _res_tuple[1]65 _dict['name'] = '苹果用户'66 else:67 _dict['value'] = _res_tuple[1]68 _dict['name'] = '平板用户'69 yield _dict70#传入user对象71class sexCount(object):72 """统计男女比例"""73 def __init__(self, data_iter):74 """75 初始化属性76 """77 self._data_iter = deepcopy(data_iter)78 @property79 def _sex_list(self):80 """81 封装每条数据的性别为列表82 """83 _sex_list = (data['sex'] for data in self._data_iter)84 return _sex_list85 def _result_dirty(self):86 """87 生成计数器对象88 """89 _result_dirty = collections.Counter(self._sex_list)90 return _result_dirty91 #对外接口92 def result(self):93 """94 对计数器对象结构化处理为符合echarts要求格式95 """96 for _res_tuple in self._result_dirty().items():97 _dict = dict()98 if _res_tuple[0] == 'male':99 _dict['value'] = _res_tuple[1]100 _dict['name'] = '男生'101 else:102 _dict['value'] = _res_tuple[1]103 _dict['name'] = '女生'104 yield _dict105#传入post对象,停用词词典106class contentWordCount(object):107 def __init__(self, data_iter, stopWord):108 """109 初始化属性, 加载自定义词典, 加载停用词110 """111 self._data_iter = deepcopy(data_iter)112 self._stop_word = deepcopy(stopWord)113 def _fileterStopWord(self, content_dirty):114 """115 去除停用词辅助函数116 """117 content = filter(lambda _: _ not in self._stop_word and len(_) > 1, content_dirty)118 return content119 @property120 def _content_clean(self):121 """122 经过分词, 去除停用词的内容,返回一个生成器123 """124 _content_dirty = (jieba.cut(_data['reply_content']) for _data in self._data_iter)125 return map(self._fileterStopWord, _content_dirty)126 def _gen_counter(self):127 """128 计数器生成器对象129 """130 for _ in self._content_clean:131 yield collections.Counter(_)132 @property133 def _result_dirty(self):134 """135 得到两百个热词136 """137 _result = collections.Counter()138 for _ in self._gen_counter():139 _result += _140 return iter(_result.most_common(100))141 #对外接口142 def result(self):143 """144 返回生成器145 """146 for _res_tuple in self._result_dirty:147 _dict = dict()148 _dict['value'] = _res_tuple[1]149 _dict['name'] = _res_tuple[0]150 yield _dict151#传入user对象152class nickNameWordCount(object):153 def __init__(self, data_iter):154 """155 初始化属性156 """157 self._data_iter = deepcopy(data_iter)158 @property159 def _nick_name_list(self):160 """161 所有昵称,分词后:162 """163 _nick_name = (jieba.cut(''.join(re.findall(r'[\u4e00-\u9fa5]', nk_name['name_show']))) for nk_name in self._data_iter)164 return map(lambda _nick: filter(lambda _: _ not in ['你', '我', '他', '她', '它', '的', '是', '啊', '大', '小', '了', '吧', '不', '自', '一'], _nick), _nick_name)165 def _gen_counter(self):166 """167 产生计数器对象168 :return: 生成器169 """170 for _ in self._nick_name_list:171 yield collections.Counter(_)172 @property173 def _result_dirty(self):174 """175 产生非echarts规范数据格式的数据,显示100条176 """177 _result = collections.Counter()178 for _ in self._gen_counter():179 _result += _180 return _result.most_common(100)181 #对外接口182 def result(self):183 """184 处理成echarts规范数据185 """186 for _res_tuple in self._result_dirty:187 _dict = dict()188 _dict['value'] = _res_tuple[1]189 _dict['name'] = _res_tuple[0]190 yield _dict191#传入user对象192class vipCount(object):193 """194 统计用户开通VIP状况195 """196 def __init__(self, data_iter):197 self.data_iter = deepcopy(data_iter)198 @property199 def _gen_tb_vip_list(self):200 return (_isVip['tb_vip'] for _isVip in self.data_iter)201 @property202 def _result_dirty(self):203 return collections.Counter(self._gen_tb_vip_list)204 #对外接口205 def result(self):206 """结构化数据"""207 for _res_tuple in self._result_dirty.items():208 _dict = dict()209 if _res_tuple[0]:210 _dict['value'] = _res_tuple[1]211 _dict['name'] = '会员用户'212 else:213 _dict['value'] = _res_tuple[1]214 _dict['name'] = '普通用户'215 yield _dict216#传入posts对象217class SentimentCount(object):218 """219 统计情感数量220 """221 def __init__(self, data_iter):222 """复制迭代器对象"""223 self._data_iter = deepcopy(data_iter)224 @property225 def _get_sent_list(self):226 for _ in self._data_iter:227 yield _.get('sentiment')228 @property229 def _get_dirty_counter(self):230 return collections.Counter(self._get_sent_list)231 # 对外接口232 def result(self):233 for name, value in self._get_dirty_counter.items():234 _dict = dict()235 if name == 0:236 _dict['name'] = '消极'237 _dict['value'] = value238 elif name == 1:239 _dict['name'] = '中性'240 _dict['value'] = value241 else:242 _dict['name'] = '积极'243 _dict['value'] = value244 yield _dict245# 传入posts对象246class ClassCount(object):247 """248 统计情感数量249 """250 def __init__(self, data_iter):251 """复制迭代器对象"""252 self._data_iter = deepcopy(data_iter)253 @property254 def _get_class_list(self):255 for _ in self._data_iter:256 yield _.get('classify')257 @property258 def _get_dirty_counter(self):259 return collections.Counter(self._get_class_list)260 # 对外接口261 def result(self):262 for name, value in self._get_dirty_counter.items():263 yield {'name': CLASS.get(name), 'value': value}264#加载停用词265def loadStopWord(path):266 """267 加载停用词词典268 :return:269 """270 with open(path, 'r', encoding='utf8') as f_obj:...

Full Screen

Full Screen

identity.py

Source:identity.py Github

copy

Full Screen

1# orm/identity.py2# Copyright (C) 2005-2015 the SQLAlchemy authors and contributors3# <see AUTHORS file>4#5# This module is part of SQLAlchemy and is released under6# the MIT License: http://www.opensource.org/licenses/mit-license.php7import weakref8from . import attributes9from .. import util10class IdentityMap(object):11 def __init__(self):12 self._dict = {}13 self._modified = set()14 self._wr = weakref.ref(self)15 def keys(self):16 return self._dict.keys()17 def replace(self, state):18 raise NotImplementedError()19 def add(self, state):20 raise NotImplementedError()21 def _add_unpresent(self, state, key):22 """optional inlined form of add() which can assume item isn't present23 in the map"""24 self.add(state)25 def update(self, dict):26 raise NotImplementedError("IdentityMap uses add() to insert data")27 def clear(self):28 raise NotImplementedError("IdentityMap uses remove() to remove data")29 def _manage_incoming_state(self, state):30 state._instance_dict = self._wr31 if state.modified:32 self._modified.add(state)33 def _manage_removed_state(self, state):34 del state._instance_dict35 if state.modified:36 self._modified.discard(state)37 def _dirty_states(self):38 return self._modified39 def check_modified(self):40 """return True if any InstanceStates present have been marked41 as 'modified'.42 """43 return bool(self._modified)44 def has_key(self, key):45 return key in self46 def popitem(self):47 raise NotImplementedError("IdentityMap uses remove() to remove data")48 def pop(self, key, *args):49 raise NotImplementedError("IdentityMap uses remove() to remove data")50 def setdefault(self, key, default=None):51 raise NotImplementedError("IdentityMap uses add() to insert data")52 def __len__(self):53 return len(self._dict)54 def copy(self):55 raise NotImplementedError()56 def __setitem__(self, key, value):57 raise NotImplementedError("IdentityMap uses add() to insert data")58 def __delitem__(self, key):59 raise NotImplementedError("IdentityMap uses remove() to remove data")60class WeakInstanceDict(IdentityMap):61 def __getitem__(self, key):62 state = self._dict[key]63 o = state.obj()64 if o is None:65 raise KeyError(key)66 return o67 def __contains__(self, key):68 try:69 if key in self._dict:70 state = self._dict[key]71 o = state.obj()72 else:73 return False74 except KeyError:75 return False76 else:77 return o is not None78 def contains_state(self, state):79 return state.key in self._dict and self._dict[state.key] is state80 def replace(self, state):81 if state.key in self._dict:82 existing = self._dict[state.key]83 if existing is not state:84 self._manage_removed_state(existing)85 else:86 return87 self._dict[state.key] = state88 self._manage_incoming_state(state)89 def add(self, state):90 key = state.key91 # inline of self.__contains__92 if key in self._dict:93 try:94 existing_state = self._dict[key]95 if existing_state is not state:96 o = existing_state.obj()97 if o is not None:98 raise AssertionError(99 "A conflicting state is already "100 "present in the identity map for key %r"101 % (key, ))102 else:103 return104 except KeyError:105 pass106 self._dict[key] = state107 self._manage_incoming_state(state)108 def _add_unpresent(self, state, key):109 # inlined form of add() called by loading.py110 self._dict[key] = state111 state._instance_dict = self._wr112 def get(self, key, default=None):113 if key not in self._dict:114 return default115 state = self._dict[key]116 o = state.obj()117 if o is None:118 return default119 return o120 def items(self):121 values = self.all_states()122 result = []123 for state in values:124 value = state.obj()125 if value is not None:126 result.append((state.key, value))127 return result128 def values(self):129 values = self.all_states()130 result = []131 for state in values:132 value = state.obj()133 if value is not None:134 result.append(value)135 return result136 def __iter__(self):137 return iter(self.keys())138 if util.py2k:139 def iteritems(self):140 return iter(self.items())141 def itervalues(self):142 return iter(self.values())143 def all_states(self):144 if util.py2k:145 return self._dict.values()146 else:147 return list(self._dict.values())148 def _fast_discard(self, state):149 self._dict.pop(state.key, None)150 def discard(self, state):151 st = self._dict.pop(state.key, None)152 if st:153 assert st is state154 self._manage_removed_state(state)155 def safe_discard(self, state):156 if state.key in self._dict:157 st = self._dict[state.key]158 if st is state:159 self._dict.pop(state.key, None)160 self._manage_removed_state(state)161 def prune(self):162 return 0163class StrongInstanceDict(IdentityMap):164 """A 'strong-referencing' version of the identity map.165 .. deprecated:: this object is present in order to fulfill166 the ``weak_identity_map=False`` option of the Session.167 This option is present to allow compatibility with older applications,168 but it is recommended that strong references to objects169 be maintained by the calling application170 externally to the :class:`.Session` itself, to the degree171 that is needed by the application.172 """173 if util.py2k:174 def itervalues(self):175 return self._dict.itervalues()176 def iteritems(self):177 return self._dict.iteritems()178 def __iter__(self):179 return iter(self.dict_)180 def __getitem__(self, key):181 return self._dict[key]182 def __contains__(self, key):183 return key in self._dict184 def get(self, key, default=None):185 return self._dict.get(key, default)186 def values(self):187 return self._dict.values()188 def items(self):189 return self._dict.items()190 def all_states(self):191 return [attributes.instance_state(o) for o in self.values()]192 def contains_state(self, state):193 return (194 state.key in self and195 attributes.instance_state(self[state.key]) is state)196 def replace(self, state):197 if state.key in self._dict:198 existing = self._dict[state.key]199 existing = attributes.instance_state(existing)200 if existing is not state:201 self._manage_removed_state(existing)202 else:203 return204 self._dict[state.key] = state.obj()205 self._manage_incoming_state(state)206 def add(self, state):207 if state.key in self:208 if attributes.instance_state(self._dict[state.key]) is not state:209 raise AssertionError('A conflicting state is already '210 'present in the identity map for key %r'211 % (state.key, ))212 else:213 self._dict[state.key] = state.obj()214 self._manage_incoming_state(state)215 def _add_unpresent(self, state, key):216 # inlined form of add() called by loading.py217 self._dict[key] = state.obj()218 state._instance_dict = self._wr219 def _fast_discard(self, state):220 self._dict.pop(state.key, None)221 def discard(self, state):222 obj = self._dict.pop(state.key, None)223 if obj is not None:224 self._manage_removed_state(state)225 st = attributes.instance_state(obj)226 assert st is state227 def safe_discard(self, state):228 if state.key in self._dict:229 obj = self._dict[state.key]230 st = attributes.instance_state(obj)231 if st is state:232 self._dict.pop(state.key, None)233 self._manage_removed_state(state)234 def prune(self):235 """prune unreferenced, non-dirty states."""236 ref_count = len(self)237 dirty = [s.obj() for s in self.all_states() if s.modified]238 # work around http://bugs.python.org/issue6149239 keepers = weakref.WeakValueDictionary()240 keepers.update(self)241 self._dict.clear()242 self._dict.update(keepers)243 self.modified = bool(dirty)...

Full Screen

Full Screen

iptable14.py

Source:iptable14.py Github

copy

Full Screen

...59 #动作60 cmd_list.append('-j {0}'.format(_dict.get('action')))61 return ' '.join(cmd_list)62 63 def check_and_convert_cmd_dict(self, _dict):64 _group = _dict.get('group') 65 _operate = _dict.get('operate')66 _ruleid = _dict.get('ruleid')67 _protocol = _dict.get('protocol')68 _srcaddress = _dict.get('srcaddress')69 _srcport = _dict.get('srcport')70 _dstaddress = _dict.get('dstaddress')71 _dstport = _dict.get('dstport')72 _action = _dict.get('action','ACCEPT')73 if _action == 'permit':74 _action = 'ACCEPT'75 elif _action == 'deny':76 _action = 'DROP'77 _dict['action'] = _action78 return True79 80 def bulid_cmd(self, _dict):81 82 self.check_and_convert_cmd_dict(_dict)83 84 cmd_list = ['iptables']85 #操作类型86 if _dict.get('operate') == 'create':87 cmd_list.append(self.bulid_create_cmd(_dict))88 elif _dict.get('operate') == 'insert':89 cmd_list.append(self.bulid_insert_cmd(_dict))90 elif _dict.get('operate') == 'delete':91 cmd_list.append(self.bulid_delete_cmd(_dict))92 elif _dict.get('operate') == 'update':93 cmd_list.append(self.bulid_update_cmd(_dict))94 95 _dict['executecmd'] = ' '.join(cmd_list)96 ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful