Best Python code snippet using fMBT_python
utils.py
Source:utils.py  
1"""2filename    : PersonalData_v0.0.4/utils.py3Author      : Joseph Lin4Email       : joseph.lin@aliyun.com5Date        : 2018/Nov/036Last Change : 2018/Nov/037TODO:8 -[o] import error warn9 -[o] éææä½¿ç¨å
¶å®æ¹å¼ä½¿ç¨ getSubTagfrom() function10"""11import sys, os12import re13import time14# from bs4 import BeautifulSoup15'''-[o] import error alert16å ä¸ºå¤§æ¦ç使ç¨âèæç¯å¢âçåå ï¼17æä»¥å¨æ²¡æä½¿ç¨èæç¯å¢çæ
åµä¸ï¼ éè¦æéï¼18ï¼å°±å django ç manage.py 飿 ·ï¼ï¼19Q: getSubTagfrom() å
鍿¯ä½¿ç¨ BeautifulSoup ç tag object,20    没æ import BeautifulSoup å¨å¤é¨è°ç¨è¿ä¸ªfunction è½ç¨ï¼21A: è½ç¨ï¼ -- ç»è®ºæ¯è¿æ ·ï¼å
·ä½ä¸äºçåçï¼æå¾
äºè§£ã22'''23# 使ç¨å级路å¾ä¸ç phantojs/bin/ æä»¶å¤¹ä¸ç phantojs åºç¨ç¨åºä½ä¸ºé»è®¤24DEFAULT_BROWSER_PATH = './phantomjs/bin/'25if sys.platform == 'win32':26    DEFAULT_BROWSER_NAME = 'phantomjs.exe'27elif sys.platform == 'linux':28    DEFAULT_BROWSER_NAME = 'phantomjs'29elif sys.platform == 'cygwin':30    DEFAULT_BROWSER_NAME = 'phantomjs.exe'31def getSubTagfrom(tag, htmlId=None, htmlTag=None, htmlClass=None):32    subTag = None33    if htmlClass is not None:34        list_ = tag.findAll(htmlClass[0], {'class': htmlClass[1]})35        return list_[0]36    for content in tag.contents:37        """ -[o] ç¨ æµç
çpython ä¸çâååæ´¾æ³å½æ°â éææ¬æ®µä»£ç """38        if htmlId is not None:39            key = htmlId40            '''å ä¸º htmlclass æ¯åå¢å çï¼æ¬æ®µä»£ç æç¹å¿éå¿ï¼41            æä»¥æé»è¾åä¹åçä¿æä¸è´ï¼'''42            if str(content).find(key) != -1:43                subTag = content44                break45        elif htmlTag is not None:46            key = htmlTag47            '''å ä¸º htmlclass æ¯åå¢å çï¼æ¬æ®µä»£ç æç¹å¿éå¿ï¼48            æä»¥æé»è¾åä¹åçä¿æä¸è´ï¼'''49            if str(content).find(key) != -1:50                subTag = content51                break52        else:53            break54    return subTag55def cover_WANG_to_integet(_str):56    highOrder = re.sub("\D", "", _str)57    if "ä¸" in _str:58        return int(highOrder) * 10000 + 999959    else:60        return int(highOrder)61from socket import *62import pickle63def negotiateIPCInfo():64    serverHost = 'localhost'65    serverPort = 5000766    msg = ['OK, give me the port number']67    sockObj = socket(AF_INET, SOCK_STREAM)68    sockObj.connect((serverHost, serverPort), )69    # sockObj.send(pickle.dump(msg[0]))70    sockObj.send(msg[0].encode())71    data = sockObj.recv(1024)72    data = pickle.loads(data)73    if __debug__:74        print('[Debug]: negotiateIPCInfo> Client received: ', data, file=sys.stderr)75    sockObj.close()76    return data77from multiprocessing.connection import Client78def getPageSourceOf(whichPage, _IPCData):79    MAX_RETRY_TIMES = 480    for failTimes in range(MAX_RETRY_TIMES):  # å°è¯ä¸æ¬¡é¾æ¥ IPCã81        try:82            c = Client(('localhost', _IPCData['port']), authkey=b'CSDN-Data')83        except ConnectionRefusedError as e:84            print("[Info] ConnectionRefusedError: ", e)85            if failTimes == MAX_RETRY_TIMES - 1:  # 第 MAX_RETRY_TIMES æ¬¡é¾æ¥å¤±è´¥86                import traceback; traceback.print_exc()87            time.sleep(1 * (failTimes + 1))88        else:89            break90    recv_result = False91    if whichPage == 'HomePage':92        counter = 093        while True:94            _recv = c.recv()  # <--- this will block95            if not _recv:     # timeout not work.96                counter += 197                if counter > 60:98                    print("getHomePageHTMLText> connection timeout",99                          file=sys.stderr)100                    c.close()101                    sys.exit(1)102                sleep(0.1)103            else:104                if recv_result is False:105                    if _recv == 'SYNC':106                        c.send('SYNC')107                    elif _recv == 'Req: whichPage':108                        c.send('HomePage')109                    elif _recv == 'Req: req_url':110                        c.send(_IPCData['req_url'])111                    elif _recv == 'Rsp: pagesource':112                        c.send("Ready")113                        recv_result = True114                else:115                    _pagesource = _recv116                    c.close()117                    return _pagesource118    elif whichPage == 'BlogPage':119        counter = 0120        while True:121            _recv = c.recv()  # <--- this will block122            if not _recv:     # timeout not work.123                counter += 1124                if counter > 60:125                    print("getBlogPageHTMLText> connection timeout",126                          file=sys.stderr)127                    c.close()128                    sys.exit(1)129                sleep(0.1)130            else:131                if recv_result is False:132                    if _recv == 'SYNC':133                        c.send('SYNC')134                    elif _recv == 'Req: whichPage':135                        c.send('BlogPage')136                    elif _recv == 'Req: req_url':137                        c.send(_IPCData['req_url'])138                    elif _recv == 'Rsp: pagesource':139                        c.send("Ready")140                        recv_result = True141                else:142                    _pagesource = _recv143                    c.close()144                    return _pagesource145def getPageSourcesOf(whichPages, _IPCData):146    MAX_RETRY_TIMES = 4147    for failTimes in range(MAX_RETRY_TIMES):  # å°è¯ MAX_RETRY_TIMES æ¬¡é¾æ¥ IPCã148        try:149            c = Client(('localhost', _IPCData['port']), authkey=b'CSDN-Data')150        except ConnectionRefusedError as e:151            print("[Info] ConnectionRefusedError: ", e)152            if failTimes == MAX_RETRY_TIMES - 1:  # 第 MAX_RETRY_TIMES æ¬¡é¾æ¥å¤±è´¥153                import traceback; traceback.print_exc()154            time.sleep(1 * (failTimes + 1))155        else:156            break157    recv_result = False158    if whichPages == 'ArticlesPages':159        while True:160            _recv = c.recv()161            if recv_result is False:162                if _recv == 'SYNC':163                    c.send('SYNC')164                elif _recv == 'Req: whichPage':165                    c.send('ArticlesPages')166                elif _recv == 'Req: req_url':167                    c.send(_IPCData['req_url'])168                elif _recv == 'Rsp: pagesource':169                    c.send('Ready')170                    recv_result = True171            else:172                _pagesources = _recv173                c.close()174                return _pagesources175    else:...fake_server.py
Source:fake_server.py  
...35    def __init__(self):36        self.pubenc = elgamal(server_pubkey)37        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)38        self.s.connect((ip, port))39    def _recv(self):40        data = self.s.recv(1024)41        return data.strip()42    def _send(self, msg):43        if isinstance(msg , str):44            msg = msg.encode()45        self.s.send(msg)46    def enc_send(self, msg , enc_key = b''):47        if enc_key == b'':48            y1 , y2 = self.pubenc.encrypt(bytes_to_long(msg))49            self._send(str(y1) + ', ' + str(y2))50        else:51            assert len(enc_key) == 1652            aes = AES.new(enc_key , AES.MODE_ECB)53            self._send(aes.encrypt(pad(msg)))54    55    def dec_recv(self,  enc_key = b''):56        msg = self._recv()57        if enc_key == b'':58            c = [int(i) for i in msg.split(b', ')]59            m = self.pridec.decrypt(c)60            return long_to_bytes(m)61        else:62            assert len(enc_key) == 1663            aes = AES.new(enc_key , AES.MODE_ECB)64            return unpad(aes.decrypt(msg))65    def signup(self , c):66        self._recv()67        self._send('shallow')68        self._recv()69        self._send(str(c[0]) + ', ' + str(c[1]))70        msg = self._recv()71        if msg[:4] == b'your':72            return 173        else:74            self._send('1')75            self._recv()76            return 077    def choose(self , choice):78        self._recv()79        self._send(str(choice))80    def main(self , c):81        Alice_passwd , bitnumber = readdata()82        while 1:83            print(long_to_bytes(Alice_passwd))84            new_c = [c[0], c[1] * 2**bitnumber % server_pubkey[0]]85            if c[1] == 0:86                exit(0)87            self.choose(1)88            if self.signup(new_c):89                Alice_passwd += 2**(88 - bitnumber)90                bitnumber += 191                self.s.close()92                writedata(Alice_passwd , bitnumber)93                return 094            else:95                bitnumber += 196class fake_server(socketserver.BaseRequestHandler):97    def _recv(self):98        data = self.request.recv(1024)99        return data.strip()100    def _send(self, msg, newline=True):101        if isinstance(msg , bytes):102            msg += b'\n'103        else:104            msg += '\n'105            msg = msg.encode()106        self.request.sendall(msg)107    def enc_send(self, msg , usrid , enc_key = b''):108        if enc_key == b'':109            pubenc = self.pubkey[usrid]110            y1 , y2 = pubenc.encrypt(bytes_to_long(msg))111            self._send(str(y1) + ', ' + str(y2))112        else:113            assert len(enc_key) == 16114            aes = AES.new(enc_key , AES.MODE_ECB)115            self._send(aes.encrypt(pad(msg)))116    117    def dec_recv(self,  enc_key = b''):118        msg = self._recv()119        if enc_key == b'':120            c = [int(i) for i in msg.split(b', ')]121            m = self.prikey.decrypt(c)122            return long_to_bytes(m)123        else:124            assert len(enc_key) == 16125            aes = AES.new(enc_key , AES.MODE_ECB)126            return unpad(aes.decrypt(msg))127    def init_key(self):128        self.pubkey = {}129        self.pubkey[b'Alice'] = elgamal(Alice_pubkey)130    def signin(self):131        self._send('please give me your name')132        userid = self._recv()133        r = readdata()[0]134        self._send('please give me your passwd(encrypted and xored by r)')135        self._send(str(r))136        msg = self._recv()137        c = [int(i) for i in msg.split(b', ')]138        return c139    def handle(self):140        self.init_key()141        key = b''142        userid = ''143        self._send(MENU)144        choice = self._recv()145        c = self.signin()146        alice = fake_Alice()147        alice.main(c)148        print('done')149        self.request.close()150        return 0151class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):152    pass153if __name__ == "__main__":154    HOST, PORT = '0.0.0.0', 9001155    server = ForkedServer((HOST, PORT), fake_server)156    server.allow_reuse_address = True157    server.serve_forever()158 Alice.py
Source:Alice.py  
...17        self.pubenc = elgamal(server_pubkey)18        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)19        self.s.connect((ip, port))20    21    def _recv(self):22        data = self.s.recv(1024)23        return data.strip()24    def _send(self, msg):25        if isinstance(msg , str):26            msg = msg.encode()27        self.s.send(msg)28    def enc_send(self, msg , enc_key = b''):29        if enc_key == b'':30            y1 , y2 = self.pubenc.encrypt(bytes_to_long(msg))31            self._send(str(y1) + ', ' + str(y2))32        else:33            assert len(enc_key) == 1634            aes = AES.new(enc_key , AES.MODE_ECB)35            self._send(aes.encrypt(pad(msg)))36    37    def dec_recv(self,  enc_key = b''):38        msg = self._recv()39        if enc_key == b'':40            c = [int(i) for i in msg.split(b', ')]41            m = self.pridec.decrypt(c)42            return long_to_bytes(m)43        else:44            assert len(enc_key) == 1645            aes = AES.new(enc_key , AES.MODE_ECB)46            return unpad(aes.decrypt(msg))47    def main(self):48        firstmsg = self._recv()49        if firstmsg != b'1. signup  2.signin':50            return 051        self._send('2')52        self._recv()53        self._send('Alice')54        self._recv()55        r = int(self._recv())56        userdata = long_to_bytes(bytes_to_long(AlicePasswd) ^ r)57        self.enc_send(userdata)58        self._recv()59        self._recv()60        endkey = self.dec_recv()61        key = userdata + endkey62        self.enc_send(b'I am a ctfer.Please give me flag' , enc_key= key)63        return self.dec_recv(enc_key= key)64class Task(socketserver.BaseRequestHandler):65    def _recv(self):66        data = self.request.recv(1024)67        return data.strip()68    def _send(self, msg, newline=True):69        if isinstance(msg , bytes):70            msg += b'\n'71        else:72            msg += '\n'73            msg = msg.encode()74        self.request.sendall(msg)75    76    def handle(self):77        signal.alarm(60)78        self._send('Hello, I am Alice, can you tell me the address of the server?\nIn return, I will give you the ctf_flag')79        try:80            addr = self._recv()81            ip, port = [x.strip() for x in addr.split(b':')]82            83            port = int(port)84        except:85            ip, port = '0.0.0.0', 1000186        a = Alice(ip , port)87        msg = a.main()88        self._send(b'Thanks, here is your flag')89        self._send(msg)90class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):91    pass92if __name__ == "__main__":93    HOST, PORT = '0.0.0.0', 1000394    server = ForkedServer((HOST, PORT), Task)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
