How to use system_encode method in Robotframework

Best Python code snippet using robotframework

request.py

Source:request.py Github

copy

Full Screen

...130 if self.args.debug:131 self.lock.acquire()132 print '*' * self.console_width133 print '[Response headers and response text]\n'134 print res_headers + '\n' + system_encode(html_doc)135 print '\n' + '*' * self.console_width136 self.lock.release()137 if self.args.rtxt and html_doc.find(self.args.rtxt) >= 0:138 raise Exception('Retry for <%s>' % system_encode(self.args.rtxt) )139 if self.args.rntxt and html_doc.find(self.args.rntxt) < 0:140 raise Exception('Retry for no <%s>' % system_encode(self.args.rntxt))141 if self.args.rheader and res_headers.find(self.args.rheader) >= 0:142 raise Exception('Retry for header <%s>' % self.args.rheader)143 if self.args.rnheader and res_headers.find(self.args.rnheader) < 0:144 raise Exception('Retry for no header <%s>' % self.args.rnheader)145 found_err_tag = False146 for tag in self.args.err:147 if html_doc.find(tag) >= 0:148 found_err_tag = True149 found_suc_tag = False150 suc_tag_matched = ''151 for tag in self.args.suc:152 if html_doc.find(tag) >= 0:153 suc_tag_matched += tag + ' '154 found_suc_tag = True155 suc_tag_matched = suc_tag_matched.strip()156 data = urllib.unquote(data)157 cracked_msg = ''158 if (not self.args.no302 and response.status == 302):159 cracked_msg = '[+]%s \t\t{302 redirect}' % data160 if response.status == 200 and self.args.err and not found_err_tag:161 cracked_msg = '[+]%s \t\t{%s not found}' % (data, self.args.err)162 if self.args.suc and found_suc_tag:163 cracked_msg = '[+]%s \t\t[Found %s]' % (data, suc_tag_matched)164 if self.args.herr and res_headers.find(self.args.herr) < 0:165 cracked_msg = '[+]%s \t\t[%s not found in headers]' % (data, self.args.herr)166 if self.args.hsuc and res_headers.find(self.args.hsuc) >=0:167 cracked_msg = '[+]%s \t\t[Found %s in headers]' % (data, self.args.hsuc)168 if self.args.basic and response.status < 400:169 local_headers['Authorization'] = ''170 cracked_msg = '[+][Basic Auth] %s %s' % (params, self.args.u)171 if cracked_msg:172 add_cracked_count(self)173 if self.args.checkproxy:174 self.print_s('[+OK] %s' % cur_proxy, color_red=True)175 with open('001.proxy.servers.txt', 'a') as outFile:176 outFile.write(cur_proxy + '\n')177 else:178 self.print_s(system_encode('[+OK]%s' % data_print), color_red=True)179 with open(self.args.o, 'a') as outFile:180 outFile.write(cracked_msg + '\n')181 if err_count == max_err_count: self.queue.put(origin_params) # put in queue again182 if self.args.sleep: time.sleep( float(self.args.sleep) )183 break184 except Exception, e:185 err_count += 1186 if not self.args.checkproxy: self.print_s('[Exception in do_request] %s' % e)187 try:188 conn.close()189 except:190 pass191 time.sleep(3.0)192 self.queue.task_done()

Full Screen

Full Screen

weiboPicDownloader.py

Source:weiboPicDownloader.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3Created on Fri Sep 15 01:38:07 20174@author: nondanee5"""6import os, sys, locale7import json, re, time8import concurrent.futures9import requests10try:11 requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)12except:13 pass14try:15 reload(sys)16 sys.setdefaultencoding('utf8') 17except:18 pass19try:20 input = raw_input21except NameError:22 pass23IS_PYTHON2 = sys.version[0] == "2"24SYSTEM_ENCODE = sys.stdin.encoding or locale.getpreferredencoding(True)25USER_AGENT = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36"26PIC_AMOUNT = 027SAVE_PATH = ""28def print_fit(string,flush=False):29 if IS_PYTHON2:30 string = string.encode(SYSTEM_ENCODE)31 if flush == True:32 sys.stdout.write("\r"+string)33 sys.stdout.flush()34 else:35 sys.stdout.write(string+"\n")36def raw_input_fit(string=""):37 if IS_PYTHON2:38 return input(string.encode(SYSTEM_ENCODE)).decode(SYSTEM_ENCODE)39 else:40 return input(string)41def requests_retry(url,max_retry=0):42 retry = 043 while True:44 if retry > max_retry: 45 return46 try:47 response = requests.request("GET",url,headers={"User-Agent":USER_AGENT},timeout=5,verify=False)48 return response49 except:50 retry = retry + 151def get_img_urls(containerid):52 page = 153 amount = 054 total = 055 urls = []56 while True:57 url = "https://m.weibo.cn/api/container/getIndex?count={}&page={}&containerid={}".format(25,page,containerid)58 response = requests_retry(url=url,max_retry=3)59 if response == None: continue60 json_data = json.loads(response.text)61 if json_data["ok"] != 1: break62 total = json_data["data"]["cardlistInfo"]["total"]63 cards = json_data["data"]["cards"]64 for card in cards:65 amount = amount + 166 print_fit("分析微博中... {}".format(amount),flush=True)67 if "mblog" in card:68 if "pics" in card["mblog"]:69 for pic in card["mblog"]["pics"]:70 if "large" in pic:71 urls.append(pic["large"]["url"])72 page = page + 173 time.sleep(1)74 75 print_fit("\n分析完毕, 微博总数 {}, 实际获得 {}".format(total,amount))76 return urls77def uid_to_containerid(uid):78 if re.search(r'^\d{10}$',uid):79 return "107603" + uid 80def nickname_to_containerid(nickname):81 url = "https://m.weibo.com/n/{}".format(nickname)82 response = requests_retry(url=url) 83 uid_check = re.search(r'(\d{16})',response.url)84 if uid_check:85 return "107603" + uid_check.group(1)[-10:]86def get_containerid(account_type):87 input_tips = ["请输入用户ID: ","请输入用户昵称: "]88 error_info = ["用户ID无效\n","无法找到该用户\n"]89 functions = [uid_to_containerid,nickname_to_containerid]90 input_string = raw_input_fit(input_tips[account_type]).strip()91 if input_string == "":92 return93 containerid = functions[account_type](input_string)94 if containerid == None:95 print_fit(error_info[account_type])96 return97 else:98 return containerid 99def download_and_save(url,index):100 file_type = url[-3:]101 file_name = str(index + 1).zfill(len(str(PIC_AMOUNT))) + "." + file_type 102 file_path = os.path.join(SAVE_PATH,file_name) 103 response = requests_retry(url=url)104 if response == None:105 return index,0106 else:107 f = open(file_path,"wb")108 f.write(response.content)109 f.close()110 return index,1111 112def get_task(urls,miss):113 task = []114 for index in miss:115 task.append(urls[index])116 return task117def main():118 while True:119 home_path = os.path.realpath(raw_input_fit("请输入图片存储位置: "))120 print_fit("选择的目录为 {}".format(home_path))121 if os.path.exists(home_path) == True:122 break123 confirm = raw_input_fit("该目录不存在, 是否创建?(Y/n): ").strip()124 if confirm == "y" or confirm == "Y" or confirm == "":125 try:126 os.makedirs(home_path)127 except:128 print_fit("目录创建失败, 请尝试手动创建, 或者更改目录\n")129 else:130 print_fit("目录已创建")131 break132 else:133 print_fit("请手动创建, 或者更改目录\n")134 135 while True:136 print_fit("请输入要下载的账号类型:\n[1]用户ID [2]用户昵称")137 choice = raw_input_fit("(1/2): ")138 try:139 choice = int(choice)140 except:141 choice = 2142 if choice not in [1,2]:143 choice = 2144 containerid = get_containerid(choice - 1)145 if containerid != None:146 break 147 global SAVE_PATH148 SAVE_PATH = os.path.join(home_path,containerid[6:]) 149 if os.path.exists(SAVE_PATH) == False:150 os.mkdir(SAVE_PATH)151 urls = get_img_urls(containerid)152 153 global PIC_AMOUNT154 PIC_AMOUNT = len(urls)155 print_fit("图片数量 {}".format(PIC_AMOUNT))156 157 max_workers = raw_input_fit("设置下载线程数(1-20): ")158 try:159 max_workers = int(max_workers)160 except:161 max_workers = 20162 if max_workers > 20:163 max_workers = 20164 elif max_workers < 1:165 max_workers = 1166 miss = range(0,PIC_AMOUNT) 167 while True:168 task = get_task(urls,miss)169 executor = concurrent.futures.ThreadPoolExecutor(max_workers = max_workers)170 results = executor.map(download_and_save,task,miss)171 172 miss = []173 for result in results:174 index = result[0]175 status = result[1]176 if status == 0: miss.append(index)177 print_fit("已处理 {dealt}/{amount}, 下载失败 {failed}/{amount}".format(dealt=index+1,failed=len(miss),amount=PIC_AMOUNT),flush=True)178 179 if len(miss) != 0:180 confirm = raw_input_fit("是否继续尝试?(Y/n): ").strip()181 if confirm == "y" or confirm == "Y" or confirm == "":182 continue183 else:184 break185 else:186 print_fit("全部完成")187 break188 189 for index in miss:190 print_fit("图片 {} 下载失败 {}".format(index + 1,urls[index]))191 192 print_fit("下载结束, 路径是 {}".format(SAVE_PATH))193 sys.stdin.read()194 exit()195 196if __name__ == "__main__":...

Full Screen

Full Screen

weibo.py

Source:weibo.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import os3import sys4import locale5import json6import re7import time8import requests9import threading10import datetime11from localdb import DB12try:13 requests.packages.urllib3.disable_warnings(14 requests.packages.urllib3.exceptions.InsecureRequestWarning)15except:16 pass17try:18 reload(sys)19 sys.setdefaultencoding('utf8')20except:21 pass22try:23 input = raw_input24except NameError:25 pass26IS_PYTHON2 = sys.version[0] == "2"27SYSTEM_ENCODE = sys.stdin.encoding or locale.getpreferredencoding(True)28USER_AGENT = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36"29PIC_AMOUNT = 030total = 031containerid=032SAVE_PATH = ""33pictures = {}34post_db=DB('Post')35picture_db=DB('Picture')36id_db=DB('ID')37def print_fit(string, flush=False):38 if IS_PYTHON2:39 string = string.encode(SYSTEM_ENCODE)40 if flush == True:41 sys.stdout.write("\r" + string)42 sys.stdout.flush()43 else:44 sys.stdout.write(string + "\n")45def requests_retry(url, max_retry=0):46 retry = 047 while True:48 if retry > max_retry:49 return50 try:51 response = requests.request(52 "GET", url, headers={"User-Agent": USER_AGENT}, timeout=5, verify=False)53 return response54 except:55 retry = retry + 156def uid_to_containerid(uid):57 if re.search(r'^\d{10}$', uid):58 return "107603" + uid59def nickname_to_containerid(nickname):60 url = "https://m.weibo.com/n/{}".format(nickname)61 response = requests_retry(url=url)62 uid_check = re.search(r'(\d{16})', response.url)63 if uid_check:64 return "107603" + uid_check.group(1)[-10:]65def get_pic_and_video(card):66 global pictures67 if 'mblog' in card.keys():68 blog = card['mblog']69 try:70 posttime = datetime.datetime.strptime(71 blog['created_at'], '%Y-%m-%d').strftime('%Y%m%d')72 except:73 try:74 year = str(datetime.datetime.now().year)75 posttime = datetime.datetime.strptime(76 year + '-' + blog['created_at'], '%Y-%m-%d').strftime('%Y%m%d')77 except:78 posttime=datetime.datetime.now().strftime('%Y%m%d')79 description = ''.join(re.findall(u'[\u4e00-\u9fa5]',blog['text']))80 pid=blog['id']81 if int(posttime)>=20171001:82 if 'retweeted_status' in blog.keys():83 if "pics" in blog["retweeted_status"]:84 pictures.setdefault(posttime,[])85 for pic in blog["retweeted_status"]["pics"]:86 if "large" in pic:87 picture = pic["large"]["url"]88 pictures.setdefault(posttime,[]).append(picture)89 # elif "page_info" in blog["retweeted_status"]:90 # if "media_info" in blog["retweeted_status"]["page_info"]:91 # video = blog["retweeted_status"]["page_info"]["media_info"]["stream_url"]92 # poster = blog["retweeted_status"]["page_info"]["page_pic"]["url"]93 # videos.append((pid,posttime, description, poster, video))94 else:95 if "pics" in blog:96 pictures.setdefault(posttime,[])97 for pic in blog["pics"]:98 if "large" in pic:99 picture = pic["large"]["url"]100 pictures.setdefault(posttime,[]).append(picture)101 # elif "page_info" in blog:102 # if "media_info" in blog["page_info"]:103 # video = blog["page_info"]["media_info"]["stream_url"]104 # poster = blog["page_info"]["page_pic"]["url"]105 # videos.append((pid,posttime, description, poster, video))106def parse_url(url):107 response = requests_retry(url=url, max_retry=3)108 json_data = response.json()109 try:110 cards = json_data["data"]["cards"]111 for card in cards:112 get_pic_and_video(card)113 except Exception as e:114 print(e)115 print('empty')116def get_img_urls(containerid):117 global pictures118 global total119 id = id_db.select(table='ID',id=containerid)[0]120 page = 1121 amount = 0122 url = "https://m.weibo.cn/api/container/getIndex?count={}&page={}&containerid={}".format(123 25, 1, containerid)124 response = requests_retry(url=url, max_retry=3)125 json_data = response.json()126 total = json_data["data"]["cardlistInfo"]["total"]127 if id is None:128 pages = int(round(total / 25.0, 0))129 elif id.postnum is None:130 pages = int(round(total / 25.0, 0))131 else:132 pages = int(round((total - id.postnum) / 25.0, 0))133 urls = ["https://m.weibo.cn/api/container/getIndex?count={}&page={}&containerid={}".format(134 25, i, containerid) for i in range(1, pages + 1)]135 threads = []136 for url in urls:137 t = threading.Thread(target=parse_url, args=(url,))138 threads.append(t)139 for t in threads:140 t.start()141 for t in threads:142 t.join()143 print_fit("\n分析完毕,图片数 {}".format( len(pictures.items())))144def write(name,containerid):145 global pictures146 for k,picture in pictures.items():147 id=str(containerid)+str(k)148 title=u'微博采集-'+name+'-'+str(k)149 cate=u'微博'150 tags=name151 poster=picture[0]152 post_db.insertnew(id=id,name=title,poster=poster,category=cate,tags=tags,status=False)153 for idx,url in enumerate(picture):154 sys.stdout.write('weibo {} day {} insert {} picture\r'.format(name,k,idx))155 sys.stdout.flush()156 data=picture_db.insertnew(pid=id,subid=idx,url=url)157def main(name):158 global containerid159 global total160 containerid = nickname_to_containerid(name)161 #containerid = name162 get_img_urls(containerid)163 if id_db.select(table='ID',id=containerid)[0]==None:164 id_db.insertnew(id=containerid,postnum=total)165 else:166 id_db.update(id=containerid,postnum=total)167 write(name,containerid)168if __name__ == "__main__":169 #name = sys.argv[1]170 #name = unicode(name.strip())171 #main(name)172 namelist=[]173 with open('wblist.txt','r') as f:174 for name in f.readlines():175 namelist.append(unicode(name.strip()))176 for name in namelist:177 print 'start crawler weibo user:{}'.format(name)...

Full Screen

Full Screen

encoding.py

Source:encoding.py Github

copy

Full Screen

...64# These interpreters handle communication with system APIs using Unicode.65if PY3 or JYTHON or IRONPYTHON:66 def system_decode(string):67 return string if is_unicode(string) else unic(string)68 def system_encode(string, errors='replace'):69 return string if is_unicode(string) else unic(string)70else:71 def system_decode(string):72 """Decodes bytes from system (e.g. cli args or env vars) to Unicode."""73 try:74 return string.decode(SYSTEM_ENCODING)75 except UnicodeError:76 return unic(string)77 def system_encode(string, errors='replace'):78 """Encodes Unicode to system encoding (e.g. cli args and env vars).79 Non-Unicode values are first converted to Unicode.80 """81 if not is_unicode(string):82 string = unic(string)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Robotframework automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful