Best Python code snippet using avocado_python
bot.py
Source:bot.py
1# -*- coding:utf-8 -*-2from bs4 import BeautifulSoup3import websocket4import threading5import time6import json7import re8import requests9import os10import wesdk.query as query11websocket._logging._logger.level = -9912def logging(msg):13 now=time.strftime("%Y-%m-%d %X")14 print(f'[{now}]:{msg}')15class Bot(threading.Thread):16 handle_register = {}17 contact_list = {}18 wxid = ''19 name = ''20 AT_PATTERN = re.compile(r'@([^\u2005]+)\u2005(.*)')21 def __init__(self, ip='127.0.0.1', port=5555):22 threading.Thread.__init__(self)23 self.IP = ip24 self.PORT = port25 self.SERVER=f'ws://{ip}:{port}'26 self.ws = websocket.WebSocketApp(self.SERVER, \27 on_open=self.make_on_open(), \28 on_message=self.make_on_message(), \29 on_error=self.make_on_error(), \30 on_close=self.make_on_close())31 contlist = self.get_contact_list()32 for item in contlist['content']:33 self.contact_list[item['wxid']] = item34 def register(self, handle_name, foo):35 self.handle_register[handle_name] = foo36 ########## HTTP API æ ·ä¾ ##########37 # åéhttp请æ±å°hook端38 def send_http(self, uri, data):39 if isinstance(data, str) or isinstance(data, bytes):40 data = json.loads(data)41 base_data={42 'id':query.uuid(),43 'type':'null',44 'roomid':'null',45 'wxid':'null',46 'content':'null',47 'nickname':'null',48 'ext':'null',49 }50 base_data.update(data)51 url = f'http://{self.IP}:{self.PORT}/{uri}'52 rsp = requests.post(url,json={'para':base_data},timeout=5)53 rsp = rsp.json()54 if 'content' in rsp and isinstance(rsp['content'], str):55 try:56 rsp['content'] = json.loads(rsp['content'])57 except:58 pass59 # # æäºçæ¬è·åä¸å°ç¾¤æµç§°ï¼éè¦ä»è系人éå60 # if rsp.get('type', 0) == query.CHATROOM_MEMBER_NICK \61 # and not rsp['content'].get('nick', ''):62 # if rsp['content']['wxid'] not in ['ROOT', 'null']:63 # rsp['content']['nick'] = self.contact_list.get(rsp['content']['wxid'], {}).get('name', '')64 # elif rsp['content']['roomid'] not in ['null']:65 # rsp['content']['nick'] = self.contact_list.get(rsp['content']['roomid'], {}).get('name', '')66 return rsp67 ################## åéæ¶æ¯ ##################68 def send_msg(self, msg, wxid='null', roomid='null', nickname='null', force_type=None):69 uri = '/api/sendtxtmsg'70 return self.send_http(uri, query.send_msg(msg, wxid, roomid, nickname, force_type))71 ################## ä¸ªäººä¿¡æ¯ ##################72 # get_personal_info è·åç»éè´¦å·ç个人信æ¯73 def get_personal_info(self):74 uri = '/api/get_personal_info'75 return self.send_http(uri, query.get_personal_info())76 # get_personal_detail è·åæå®wxidç个人信æ¯77 def get_personal_detail(self, wxid):78 uri = '/api/get_personal_detail'79 return self.send_http(uri, query.get_personal_detail(wxid))80 # get_user_nick è·åæå®wxidçæµç§°81 def get_user_nick(self, wxid):82 uri='api/getmembernick'83 return self.send_http(uri, query.get_user_nick(wxid))84 # get_contact_list è·åè系人(wxidåroomid)85 def get_contact_list(self):86 uri='/api/getcontactlist'87 return self.send_http(uri, query.get_contact_list())88 ################## 群èä¿¡æ¯ ##################89 # get_chatroom_member è·åæå®ç¾¤çæå90 def get_chatroom_member(self, roomid):91 uri='/api/get_charroom_member_list'92 return self.send_http(uri, query.get_chatroom_member(roomid))93 # get_chatroom_member_nick è·å群èæåæµç§°, æ微信好åçæµç§°(åªå¡«wxidæ¶)94 def get_chatroom_member_nick(self, roomid='null', wxid='ROOT'):95 # è·åæå®ç¾¤çæåçæµç§° æ 微信好åçæµç§°96 uri='api/getmembernick'97 return self.send_http(uri, query.get_chatroom_member_nick(roomid, wxid))98 99 ########## Message Handle ##########100 # å¤çå¼æ¥è¿åçæ¶æ¯101 def send_websocket(self, data):102 self.ws.send(data)103 # wshd_noimplement æªå®ç°/dummy handle104 def wshd_noimplement(self, j):105 logging(f'noimplement:{json.dumps(j, ensure_ascii=False)}')106 # wshd_heart_beat å¿è·³å
107 def wshd_heart_beat(self, j):108 if 'heart_beat' in self.handle_register:109 self.handle_register['heart_beat'](j)110 return111 logging(j['content'])112 # wshd_personal_detail ç¨å¾®ä¿¡idæ¥è¯¢è´¦å·ä¿¡æ¯113 def wshd_personal_detail(self, j):114 if 'personal_detail' in self.handle_register:115 self.handle_register['personal_detail'](j)116 return117 self.wshd_noimplement(j)118 # wshd_personal_info ç»éè´¦å·çä¿¡æ¯119 def wshd_personal_info(self, j):120 self.name = j['content']['wx_name']121 self.wxid = j['content']['wx_id']122 if 'personal_info' in self.handle_register:123 self.handle_register['personal_info'](j)124 return125 logging("Connect success name: %s wxid: %s"%(self.name, self.wxid))126 # wshd_at_msg é¢çå½æ°127 def wshd_at_msg(self, j):128 if 'at_msg' in self.handle_register:129 self.handle_register['at_msg'](j)130 return131 self.wshd_noimplement(j)132 # wshd_chatroom_member_nick è·åæµç§°133 def wshd_chatroom_member_nick(self, j):134 if 'chatroom_member_nick' in self.handle_register:135 self.handle_register['chatroom_member_nick'](j)136 return137 self.wshd_noimplement(j)138 # wshd_chatroom_member ææ群çæåä¿¡æ¯139 def wshd_chatroom_member(self, j):140 if 'chatroom_member' in self.handle_register:141 self.handle_register['chatroom_member'](j)142 return143 self.wshd_noimplement(j)144 # wshd_contact_list è系人145 def wshd_contact_list(self, j):146 for item in j['content']:147 self.contact_list[item['wxid']] = item148 if 'user_list' in self.handle_register:149 self.handle_register['user_list'](j)150 return151 # alias152 if 'contact_list' in self.handle_register:153 self.handle_register['contact_list'](j)154 return155 # wshd_user_list alias of wshd_contact_list156 def wshd_user_list(self, j):157 self.wshd_contact_list(j)158 # wshd_join_room è¿ç¾¤äºä»¶159 def wshd_join_room(self, j):160 if 'join_room' in self.handle_register:161 self.handle_register['join_room'](j)162 return163 logging(f'join_room:{j}')164 # wshd_debug_switch debugå¼å
³æ¶æ¯165 def wshd_debug_switch(self, j):166 if 'debug_switch' in self.handle_register:167 self.handle_register['debug_switch'](j)168 return169 logging(j)170 # wshd_recv_txt_cite_msg å¼ç¨çæåæ¶æ¯171 def wshd_recv_txt_cite_msg(self, msg):172 if 'recv_txt_cite_msg' in self.handle_register:173 self.handle_register['recv_txt_cite_msg'](msg)174 return175 msgXml=msg['content']['content'].replace('&','&').replace('<','<').replace('>','>')176 soup=BeautifulSoup(msgXml,'lxml')177 msg={178 'content':soup.select_one('title').text,179 'id':msg['id'],180 'id1':msg['content']['id2'],181 'id2':'wxid_dummy',182 'id3':'',183 'srvid':msg['srvid'],184 'time':msg['time'],185 'type':msg['type'],186 'wxid':msg['content']['id1']187 }188 189 self.wshd_recv_txt_msg(msg)190 # wshd_recv_txt_msg æåæ¶æ¯191 def wshd_recv_txt_msg(self, msg):192 if '@chatroom' in msg['wxid']:193 msg['roomid'] = msg['wxid'] #群id194 msg['senderid'] = msg['id1'] #个人id195 else:196 msg['roomid'] = None197 msg['senderid'] = msg['wxid'] #个人id198 at_match = self.AT_PATTERN.match(msg['content'])199 if at_match:200 msg['at_nickname'] = at_match.groups()[0]201 msg['content'] = at_match.groups()[1]202 msg['at_bot'] = msg['at_nickname'] == self.name203 msg['nickname'] = self.get_chatroom_member_nick(msg['roomid'], msg['senderid'])['content']['nick']204 if 'recv_txt_msg' in self.handle_register:205 try:206 self.handle_register['recv_txt_msg'](msg)207 except:208 import traceback209 traceback.print_exc()210 return211 logging(f'æ¶å°æ¶æ¯:{msg}')212 key_replys = {213 "ding": "dong",214 "dong": "ding",215 }216 func_replays = [217 {218 "match":lambda x:x.startswith('/echo '),219 "replay":lambda x:x[len("/echo "):],220 }221 ]222 if msg['content'] in key_replys:223 self.ws.send(query.send_msg(key_replys[msg['content']],wxid=msg['senderid'],roomid=msg['roomid'],nickname=msg['nickname']))224 for f in func_replays:225 if not f["match"](msg['content']):226 continue227 self.ws.send(query.send_msg(f["replay"](msg['content']),wxid=msg['senderid'],roomid=msg['roomid'],nickname=msg['nickname']))228 break229 # wshd_recv_pic_msg å¾çæ¶æ¯230 def wshd_recv_pic_msg(self, msg):231 if 'recv_pic_msg' in self.handle_register:232 self.handle_register['recv_pic_msg'](msg)233 return234 self.wshd_noimplement(msg)235 ########## WebSocket Events ##########236 def make_on_open(self):237 def on_open(ws):238 ws.send(query.get_personal_info())239 ws.send(query.get_contact_list())240 if 'on_open' in self.handle_register:241 self.handle_register['on_open'](ws)242 return243 return on_open244 def make_on_error(self):245 def on_error(ws, error):246 if 'on_error' in self.handle_register:247 self.handle_register['on_error'](ws, error)248 return249 logging(f'on_error:{error}')250 return on_error251 def make_on_close(self):252 def on_close(ws):253 if 'on_close' in self.handle_register:254 self.handle_register['on_close'](ws)255 return256 logging("closed")257 return on_close258 def make_on_message(self):259 def on_message(ws, message):260 j=json.loads(message)261 if 'content' in j and isinstance(j['content'], str):262 try:263 j['content'] = json.loads(j['content'])264 except:265 pass266 resp_type=j['type']267 action={268 query.PERSONAL_DETAIL:self.wshd_personal_detail,269 query.PERSONAL_INFO:self.wshd_personal_info,270 query.CHATROOM_MEMBER_NICK:self.wshd_chatroom_member_nick,271 query.AT_MSG:self.wshd_at_msg,272 query.DEBUG_SWITCH:self.wshd_debug_switch,273 query.CHATROOM_MEMBER:self.wshd_chatroom_member,274 query.RECV_PIC_MSG:self.wshd_recv_pic_msg,275 query.RECV_TXT_MSG:self.wshd_recv_txt_msg,276 query.RECV_TXT_CITE_MSG:self.wshd_recv_txt_cite_msg,277 query.HEART_BEAT:self.wshd_heart_beat,278 query.USER_LIST:self.wshd_user_list,279 query.GET_USER_LIST_SUCCSESS:self.wshd_user_list,280 query.GET_USER_LIST_FAIL:self.wshd_user_list,281 query.JOIN_ROOM:self.wshd_join_room,282 query.TXT_MSG:self.wshd_noimplement,283 query.PIC_MSG:self.wshd_noimplement,284 }285 action.get(resp_type, print)(j)286 return on_message287 288 def run(self):...
final_assembler.py
Source:final_assembler.py
...9 match self._type:10 case "A":11 # add r1 r2 r512 _, r0, r1, r2 = line.split()13 return f"{self.opcode}00{handle_register(r0)}{handle_register(r1)}{handle_register(r2)}"14 case "B":15 # mov r0 $316 _, r0, imm = line.split()17 return f"{self.opcode}{handle_register(r0)}{handle_immediate(imm[1:])}"18 case "C":19 # mov r4 r520 _, r0, r1 = line.split()21 return f"{self.opcode}00000{handle_register(r0)}{handle_register(r1, flags_allowed=True)}"22 case "D":23 # ld r0 x24 _, r0, addr = line.split()25 return f"{self.opcode}{handle_register(r0)}{handle_address(addr)}"26 case "E":27 # je label28 _, addr = line.split()29 return f"{self.opcode}000{handle_address(addr)}"30 case "F":31 return f"{self.opcode}{'0' * 11}"32 except ValueError:33 throw_error("SyntaxError: Invalid syntax encountered in instruction")34 def assemble(self, line: str):35 if self.secondary_opcode is not None and "$" in line:36 self._type = "B"37 temp = self.opcode38 self.opcode = self.secondary_opcode39 machine_code = self.final_assemble(line)40 self._type = "C"41 self.opcode = temp42 return machine_code43 else:44 return self.final_assemble(line)45def throw_error(message: str):46 print(message)47 quit()48def handle_register(register: str, flags_allowed: Optional[bool]=False):49 if register == "FLAGS":50 if not flags_allowed:51 throw_error("FLAGSError: Illegal use of FLAGS Register")52 else:53 return "111"54 else:55 return bin(int(register.lstrip("r")))[2:].zfill(3)56def handle_immediate(immediate: str):57 if not immediate.isdigit():58 throw_error("SyntaxError: Invalid Immediate value - must be an integer")59 immediate = bin(int(immediate))[2:]60 if len(immediate) > 8:61 throw_error("OverflowError: Immediate value is too large")62 else:...
report_ios.py
Source:report_ios.py
...16 datas = ('key', 'value')17 idfa = rparams.get('idfa', '')18 imei = rparams.get('imei', '')19 if idfa or imei:20 self.handle_register(userId, rparams)21 else:22 key = 'report:user:%s' % userId23 TyContext.RedisPayData.execute('HMSET', key, *datas)24 TyContext.RedisPayData.execute('EXPIRE', key, 60)25 def on_user_login(self, userId, rparams):26 TyContext.ftlog.info('ReportIOS->on_user_login', 'userId', userId, 'rparams', rparams)27 key = 'report:user:%s' % userId28 # æ¯å¦ç¬¬ä¸æ¬¡æ³¨å29 if TyContext.RedisPayData.execute('DEL', key):30 TyContext.ftlog.info('ReportIOS->on_user_login', 'userId=%s' % userId, 'rparams', rparams)31 self.handle_register(userId, rparams)32 def handle_register(self, userId, rparams):33 TyContext.ftlog.info('ReportIOS->handle_register', 'userId', userId, 'rparams', rparams)34 for report in self.report_list:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!