How to use handle_register method in avocado

Best Python code snippet using avocado_python

bot.py

Source:bot.py Github

copy

Full Screen

# -*- coding:utf-8 -*-
from bs4 import BeautifulSoup
import websocket
import threading
import time
import json
import re
import requests
import os
import wesdk.query as query

# Push the websocket library's logger level far below any real level.
websocket._logging._logger.level = -99


def logging(msg):
    """Print ``msg`` to stdout prefixed with the current local timestamp."""
    now = time.strftime("%Y-%m-%d %X")
    print(f'[{now}]:{msg}')


def _decode_content(payload):
    """Decode ``payload['content']`` in place when it is a JSON string.

    Content that is not valid JSON is left untouched: plain-text content is
    expected and is not an error.
    """
    if 'content' in payload and isinstance(payload['content'], str):
        try:
            payload['content'] = json.loads(payload['content'])
        except ValueError:
            # json.JSONDecodeError subclasses ValueError; keep the raw text.
            pass
    return payload


class Bot(threading.Thread):
    """Client that talks to a hook endpoint over both WebSocket and HTTP.

    Incoming WebSocket messages are routed by their ``type`` field to the
    ``wshd_*`` methods. Each of those first looks for a user callback
    registered under a handler name (see ``register``) and falls back to a
    built-in default when none is registered.
    """

    # Class-level defaults; ``__init__`` gives every instance its own copy
    # of the two dicts so that separate bots do not share state.
    handle_register = {}
    contact_list = {}
    wxid = ''
    name = ''
    # "@<nickname>\u2005<rest of message>"
    AT_PATTERN = re.compile(r'@([^\u2005]+)\u2005(.*)')

    def __init__(self, ip='127.0.0.1', port=5555):
        """Create the WebSocket app and preload the contact list over HTTP.

        Args:
            ip: Host of the hook endpoint.
            port: Port of the hook endpoint (used for both ws:// and http://).
        """
        threading.Thread.__init__(self)
        # Copy the (possibly subclass-provided) class-level mappings so that
        # handlers and contacts are per instance rather than shared.
        self.handle_register = dict(self.handle_register)
        self.contact_list = dict(self.contact_list)
        self.IP = ip
        self.PORT = port
        self.SERVER = f'ws://{ip}:{port}'
        self.ws = websocket.WebSocketApp(
            self.SERVER,
            on_open=self.make_on_open(),
            on_message=self.make_on_message(),
            on_error=self.make_on_error(),
            on_close=self.make_on_close(),
        )
        contlist = self.get_contact_list()
        for item in contlist['content']:
            self.contact_list[item['wxid']] = item

    def register(self, handle_name, foo):
        """Register callable ``foo`` as the handler named ``handle_name``."""
        self.handle_register[handle_name] = foo

    def _dispatch(self, handle_name, *args):
        """Call the handler registered as ``handle_name``, if there is one.

        Returns:
            True when a registered handler was called, False otherwise.
        """
        handler = self.handle_register.get(handle_name)
        if handler is None:
            return False
        handler(*args)
        return True

    ########## HTTP API samples ##########
    # Send an HTTP request to the hook side.
    def send_http(self, uri, data):
        """POST ``data`` to ``uri`` on the hook endpoint and return the reply.

        Args:
            uri: Request path appended to ``http://IP:PORT/``.
            data: Mapping of fields, or a JSON ``str``/``bytes`` encoding one.
                The fields override the ``'null'`` placeholders below.

        Returns:
            The decoded JSON response. When its ``content`` is itself a JSON
            string it is decoded too; otherwise it is left as received.
        """
        if isinstance(data, (str, bytes)):
            data = json.loads(data)
        base_data = {
            'id': query.uuid(),
            'type': 'null',
            'roomid': 'null',
            'wxid': 'null',
            'content': 'null',
            'nickname': 'null',
            'ext': 'null',
        }
        base_data.update(data)
        url = f'http://{self.IP}:{self.PORT}/{uri}'
        rsp = requests.post(url, json={'para': base_data}, timeout=5)
        rsp = _decode_content(rsp.json())
        # # Some versions cannot return the group nickname; it has to be
        # # taken from the contact list instead.
        # if rsp.get('type', 0) == query.CHATROOM_MEMBER_NICK \
        #         and not rsp['content'].get('nick', ''):
        #     if rsp['content']['wxid'] not in ['ROOT', 'null']:
        #         rsp['content']['nick'] = self.contact_list.get(rsp['content']['wxid'], {}).get('name', '')
        #     elif rsp['content']['roomid'] not in ['null']:
        #         rsp['content']['nick'] = self.contact_list.get(rsp['content']['roomid'], {}).get('name', '')
        return rsp

    ################## Sending messages ##################
    def send_msg(self, msg, wxid='null', roomid='null', nickname='null', force_type=None):
        """Send text ``msg`` over HTTP; arguments are passed to ``query.send_msg``."""
        uri = '/api/sendtxtmsg'
        return self.send_http(uri, query.send_msg(msg, wxid, roomid, nickname, force_type))

    ################## Personal information ##################
    # get_personal_info: personal info of the logged-in account
    def get_personal_info(self):
        """Request the personal info of the logged-in account."""
        uri = '/api/get_personal_info'
        return self.send_http(uri, query.get_personal_info())

    # get_personal_detail: personal info of the given wxid
    def get_personal_detail(self, wxid):
        """Request the personal details of ``wxid``."""
        uri = '/api/get_personal_detail'
        return self.send_http(uri, query.get_personal_detail(wxid))

    # get_user_nick: nickname of the given wxid
    def get_user_nick(self, wxid):
        """Request the nickname of ``wxid``."""
        uri = 'api/getmembernick'
        return self.send_http(uri, query.get_user_nick(wxid))

    # get_contact_list: contacts (wxid and roomid)
    def get_contact_list(self):
        """Request the contact list."""
        uri = '/api/getcontactlist'
        return self.send_http(uri, query.get_contact_list())

    ################## Group chat information ##################
    # get_chatroom_member: members of the given group
    def get_chatroom_member(self, roomid):
        """Request the member list of group ``roomid``."""
        uri = '/api/get_charroom_member_list'
        return self.send_http(uri, query.get_chatroom_member(roomid))

    # get_chatroom_member_nick: nickname of a group member, or of a friend
    # (when only wxid is given)
    def get_chatroom_member_nick(self, roomid='null', wxid='ROOT'):
        """Request the nickname of ``wxid``, optionally within group ``roomid``."""
        uri = 'api/getmembernick'
        return self.send_http(uri, query.get_chatroom_member_nick(roomid, wxid))

    ########## Message Handle ##########
    # Handling of asynchronously returned messages
    def send_websocket(self, data):
        """Send ``data`` over the WebSocket connection."""
        self.ws.send(data)

    # wshd_noimplement: not implemented / dummy handler
    def wshd_noimplement(self, j):
        """Log message ``j`` as having no implemented handler."""
        logging(f'noimplement:{json.dumps(j, ensure_ascii=False)}')

    # wshd_heart_beat: heartbeat packet
    def wshd_heart_beat(self, j):
        """Forward a heartbeat to the 'heart_beat' handler, else log its content."""
        if not self._dispatch('heart_beat', j):
            logging(j['content'])

    # wshd_personal_detail: account info looked up by id
    def wshd_personal_detail(self, j):
        """Forward to the 'personal_detail' handler, else log as unimplemented."""
        if not self._dispatch('personal_detail', j):
            self.wshd_noimplement(j)

    # wshd_personal_info: info of the logged-in account
    def wshd_personal_info(self, j):
        """Record the bot's own name and wxid, then notify 'personal_info'."""
        self.name = j['content']['wx_name']
        self.wxid = j['content']['wx_id']
        if not self._dispatch('personal_info', j):
            logging("Connect success name: %s wxid: %s"%(self.name, self.wxid))

    # wshd_at_msg: reserved
    def wshd_at_msg(self, j):
        """Forward to the 'at_msg' handler, else log as unimplemented."""
        if not self._dispatch('at_msg', j):
            self.wshd_noimplement(j)

    # wshd_chatroom_member_nick: nickname lookup result
    def wshd_chatroom_member_nick(self, j):
        """Forward to the 'chatroom_member_nick' handler, else log as unimplemented."""
        if not self._dispatch('chatroom_member_nick', j):
            self.wshd_noimplement(j)

    # wshd_chatroom_member: member info of all groups
    def wshd_chatroom_member(self, j):
        """Forward to the 'chatroom_member' handler, else log as unimplemented."""
        if not self._dispatch('chatroom_member', j):
            self.wshd_noimplement(j)

    # wshd_contact_list: contacts
    def wshd_contact_list(self, j):
        """Merge received contacts into ``contact_list`` and notify handlers.

        The 'user_list' handler takes precedence; 'contact_list' is an alias
        that is used only when 'user_list' is not registered.
        """
        # This handler is also routed for GET_USER_LIST_FAIL, so ``content``
        # may not be a list of contact dicts; ignore anything else.
        content = j.get('content')
        if isinstance(content, list):
            for item in content:
                if isinstance(item, dict) and 'wxid' in item:
                    self.contact_list[item['wxid']] = item
        if self._dispatch('user_list', j):
            return
        # alias
        self._dispatch('contact_list', j)

    # wshd_user_list: alias of wshd_contact_list
    def wshd_user_list(self, j):
        """Alias of ``wshd_contact_list``."""
        self.wshd_contact_list(j)

    # wshd_join_room: join-group event
    def wshd_join_room(self, j):
        """Forward to the 'join_room' handler, else log the event."""
        if not self._dispatch('join_room', j):
            logging(f'join_room:{j}')

    # wshd_debug_switch: debug switch message
    def wshd_debug_switch(self, j):
        """Forward to the 'debug_switch' handler, else log the message."""
        if not self._dispatch('debug_switch', j):
            logging(j)

    # wshd_recv_txt_cite_msg: text message that quotes another message
    def wshd_recv_txt_cite_msg(self, msg):
        """Handle a quoting text message.

        Without a 'recv_txt_cite_msg' handler, the XML payload is reduced to
        its ``<title>`` text and the result is processed as a plain text
        message by ``wshd_recv_txt_msg``.
        """
        if self._dispatch('recv_txt_cite_msg', msg):
            return
        # The XML arrives with &, < and > entity-escaped.
        msg_xml = (
            msg['content']['content']
            .replace('&amp;', '&')
            .replace('&lt;', '<')
            .replace('&gt;', '>')
        )
        soup = BeautifulSoup(msg_xml, 'lxml')
        txt_msg = {
            'content': soup.select_one('title').text,
            'id': msg['id'],
            'id1': msg['content']['id2'],
            'id2': 'wxid_dummy',
            'id3': '',
            'srvid': msg['srvid'],
            'time': msg['time'],
            'type': msg['type'],
            'wxid': msg['content']['id1'],
        }
        self.wshd_recv_txt_msg(txt_msg)

    # wshd_recv_txt_msg: text message
    def wshd_recv_txt_msg(self, msg):
        """Enrich a text message and hand it to 'recv_txt_msg' or auto-reply.

        Adds ``roomid``, ``senderid`` and ``nickname`` to ``msg``. When the
        text matches ``AT_PATTERN`` it also adds ``at_nickname`` and
        ``at_bot`` and strips the mention from ``content``. Exceptions raised
        by the registered handler are printed, not propagated.
        """
        if '@chatroom' in msg['wxid']:
            msg['roomid'] = msg['wxid']    # group id
            msg['senderid'] = msg['id1']   # individual id
        else:
            msg['roomid'] = None
            msg['senderid'] = msg['wxid']  # individual id
        at_match = self.AT_PATTERN.match(msg['content'])
        if at_match:
            msg['at_nickname'] = at_match.group(1)
            msg['content'] = at_match.group(2)
            msg['at_bot'] = msg['at_nickname'] == self.name
        nick_rsp = self.get_chatroom_member_nick(msg['roomid'], msg['senderid'])
        msg['nickname'] = nick_rsp['content']['nick']
        handler = self.handle_register.get('recv_txt_msg')
        if handler is not None:
            try:
                handler(msg)
            except Exception:
                # A faulty user handler must not take down message
                # processing; report it and carry on.
                import traceback
                traceback.print_exc()
            return
        logging(f'收到消息:{msg}')
        # Built-in demo replies used when no handler is registered.
        key_replys = {
            "ding": "dong",
            "dong": "ding",
        }
        func_replays = [
            {
                "match": lambda x: x.startswith('/echo '),
                "replay": lambda x: x[len("/echo "):],
            }
        ]
        if msg['content'] in key_replys:
            self.ws.send(query.send_msg(key_replys[msg['content']],wxid=msg['senderid'],roomid=msg['roomid'],nickname=msg['nickname']))
        for f in func_replays:
            if not f["match"](msg['content']):
                continue
            self.ws.send(query.send_msg(f["replay"](msg['content']),wxid=msg['senderid'],roomid=msg['roomid'],nickname=msg['nickname']))
            break

    # wshd_recv_pic_msg: picture message
    def wshd_recv_pic_msg(self, msg):
        """Forward to the 'recv_pic_msg' handler, else log as unimplemented."""
        if not self._dispatch('recv_pic_msg', msg):
            self.wshd_noimplement(msg)

    ########## WebSocket Events ##########
    def make_on_open(self):
        """Build the ``on_open`` callback for the WebSocket app."""
        def on_open(ws):
            # Request own account info and contacts as soon as connected.
            ws.send(query.get_personal_info())
            ws.send(query.get_contact_list())
            self._dispatch('on_open', ws)
        return on_open

    def make_on_error(self):
        """Build the ``on_error`` callback for the WebSocket app."""
        def on_error(ws, error):
            if not self._dispatch('on_error', ws, error):
                logging(f'on_error:{error}')
        return on_error

    def make_on_close(self):
        """Build the ``on_close`` callback for the WebSocket app."""
        # Newer websocket-client releases also pass close_status_code and
        # close_msg; accept and ignore them so both call styles work. The
        # registered handler keeps receiving only ``ws``.
        def on_close(ws, *args):
            if not self._dispatch('on_close', ws):
                logging("closed")
        return on_close

    def make_on_message(self):
        """Build the ``on_message`` callback that routes messages by type."""
        def on_message(ws, message):
            j = _decode_content(json.loads(message))
            resp_type = j['type']
            action = {
                query.PERSONAL_DETAIL: self.wshd_personal_detail,
                query.PERSONAL_INFO: self.wshd_personal_info,
                query.CHATROOM_MEMBER_NICK: self.wshd_chatroom_member_nick,
                query.AT_MSG: self.wshd_at_msg,
                query.DEBUG_SWITCH: self.wshd_debug_switch,
                query.CHATROOM_MEMBER: self.wshd_chatroom_member,
                query.RECV_PIC_MSG: self.wshd_recv_pic_msg,
                query.RECV_TXT_MSG: self.wshd_recv_txt_msg,
                query.RECV_TXT_CITE_MSG: self.wshd_recv_txt_cite_msg,
                query.HEART_BEAT: self.wshd_heart_beat,
                query.USER_LIST: self.wshd_user_list,
                query.GET_USER_LIST_SUCCSESS: self.wshd_user_list,
                query.GET_USER_LIST_FAIL: self.wshd_user_list,
                query.JOIN_ROOM: self.wshd_join_room,
                query.TXT_MSG: self.wshd_noimplement,
                query.PIC_MSG: self.wshd_noimplement,
            }
            # Unknown message types are simply printed.
            action.get(resp_type, print)(j)
        return on_message

    # NOTE: the body of run() is elided ("...") in the source listing.
    def run(self): ...

Full Screen

Full Screen

final_assembler.py

Source:final_assembler.py Github

copy

Full Screen

...9 match self._type:10 case "A":11 # add r1 r2 r512 _, r0, r1, r2 = line.split()13 return f"{self.opcode}00{handle_register(r0)}{handle_register(r1)}{handle_register(r2)}"14 case "B":15 # mov r0 $316 _, r0, imm = line.split()17 return f"{self.opcode}{handle_register(r0)}{handle_immediate(imm[1:])}"18 case "C":19 # mov r4 r520 _, r0, r1 = line.split()21 return f"{self.opcode}00000{handle_register(r0)}{handle_register(r1, flags_allowed=True)}"22 case "D":23 # ld r0 x24 _, r0, addr = line.split()25 return f"{self.opcode}{handle_register(r0)}{handle_address(addr)}"26 case "E":27 # je label28 _, addr = line.split()29 return f"{self.opcode}000{handle_address(addr)}"30 case "F":31 return f"{self.opcode}{'0' * 11}"32 except ValueError:33 throw_error("SyntaxError: Invalid syntax encountered in instruction")34 def assemble(self, line: str):35 if self.secondary_opcode is not None and "$" in line:36 self._type = "B"37 temp = self.opcode38 self.opcode = self.secondary_opcode39 machine_code = self.final_assemble(line)40 self._type = "C"41 self.opcode = temp42 return machine_code43 else:44 return self.final_assemble(line)45def throw_error(message: str):46 print(message)47 quit()48def handle_register(register: str, flags_allowed: Optional[bool]=False):49 if register == "FLAGS":50 if not flags_allowed:51 throw_error("FLAGSError: Illegal use of FLAGS Register")52 else:53 return "111"54 else:55 return bin(int(register.lstrip("r")))[2:].zfill(3)56def handle_immediate(immediate: str):57 if not immediate.isdigit():58 throw_error("SyntaxError: Invalid Immediate value - must be an integer")59 immediate = bin(int(immediate))[2:]60 if len(immediate) > 8:61 throw_error("OverflowError: Immediate value is too large")62 else:...

Full Screen

Full Screen

report_ios.py

Source:report_ios.py Github

copy

Full Screen

...16 datas = ('key', 'value')17 idfa = rparams.get('idfa', '')18 imei = rparams.get('imei', '')19 if idfa or imei:20 self.handle_register(userId, rparams)21 else:22 key = 'report:user:%s' % userId23 TyContext.RedisPayData.execute('HMSET', key, *datas)24 TyContext.RedisPayData.execute('EXPIRE', key, 60)25 def on_user_login(self, userId, rparams):26 TyContext.ftlog.info('ReportIOS->on_user_login', 'userId', userId, 'rparams', rparams)27 key = 'report:user:%s' % userId28 # 是否第一次注册29 if TyContext.RedisPayData.execute('DEL', key):30 TyContext.ftlog.info('ReportIOS->on_user_login', 'userId=%s' % userId, 'rparams', rparams)31 self.handle_register(userId, rparams)32 def handle_register(self, userId, rparams):33 TyContext.ftlog.info('ReportIOS->handle_register', 'userId', userId, 'rparams', rparams)34 for report in self.report_list:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful