How to use raw_request_data method in autotest

Best Python code snippet using autotest_python

hunter_handler.py

Source:hunter_handler.py Github

copy

Full Screen

#!/ usr/bin/env
# coding=utf-8
#
# Copyright 2019 ztosec & https://sec.zto.com/
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
author: b5mali4
"""
import os
import re
import base64
import json
import sys
import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.web
import tornado.curl_httpclient

try:
    from common import log
except (ModuleNotFoundError, ImportError):
    # Fall back to the parent directory of this file as the import root.
    HUNTER_PATH = "{}/../".format(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, HUNTER_PATH)
finally:
    from common import log
    from hunter_celery import scan_celery
    from networkproxy import CACERT_FILE
    from networkproxy import CERTKEY_FILE
    from networkproxy import CAKEY_FILE
    from networkproxy.socket_wrapper import wrap_socket
    from model.network_proxy import NetWorkProxyConfig, NetWorkProxyConfigService
    from networkproxy.authentication import auth_login
    from api.service.redis_service import RedisService
    from model.default_value import TaskStatus
    from common.path import HUNTER_PATH
    from common.http_util import StatusCode
    from networkproxy.proxy_handler import ProxyHandler
    from networkproxy import get_http_server
    from networkproxy import set_http_server

logger = log.get_default_logger()


class HunterHandler(ProxyHandler):
    """Proxy handler that authenticates users, serves the CA certificate
    pages and hands captured requests to the scan task queue."""

    # Number of bytes read from the CA certificate file per write.
    _CACERT_CHUNK_SIZE = 64 * 1024

    def request_handler(self, request, user_info):
        """
        Send the request to the MQ.

        Does nothing when ``user_info`` is falsy. When the user's current
        task carries a ``hook_rule``, the request URL must either start with
        the rule or match it as a regular expression, otherwise the request
        is dropped.

        :param request: the proxied request
        :param user_info: the authenticated user; ``current_task_id`` and
            ``user_name`` are read from it
        :return: None

        Simple example code:
            print(request.body_arguments)
            print(request.headers)
            print(request.body)
            print(request.cookies)
            print(request.version)
            print(request.protocol)
            print(request.host_name)
            print(request.uri)
            print(request.method)
        """
        if not user_info:
            return
        task_id = user_info.current_task_id
        current_user_name = user_info.user_name
        raw_request_data = self.wrap_request(request, user_info)
        url = raw_request_data["data"]["url"]
        # Check whether the request satisfies the task's hook rule.
        current_task = RedisService.get_task(task_id)
        if current_task and "hook_rule" in current_task:
            # A rule looks like *.xx.com; "*" is widened to the regex ".*".
            hook_rule = str(current_task.hook_rule).replace("*", ".*")
            if not str(url).startswith(hook_rule) and re.match(hook_rule, url, re.S) is None:
                return
        # NOTE(review): the published listing lost its indentation; this
        # step is assumed to run for every request that was not rejected
        # above, including tasks without a hook rule -- confirm.
        if RedisService.create_urlclassifications(task_id, raw_request_data):
            logger.info("满足正则条件,发送流量到MQ中")
            scan_celery.delay(raw_request_data["data"], task_id, current_user_name, TaskStatus.NONE)

    def wrap_request(self, request, user_info):
        """
        Convert a request into the dictionary format passed to the scanner.

        :param request: the proxied request
        :param user_info: unused here
        :return: ``{"data": {...}}`` where the inner dictionary holds the
            decoded body, the absolute URL, the method, the JSON-encoded
            headers and parser metadata
        """
        from parser.base_traffic_parser import BaseTrafficParser
        raw_request_data = dict()
        url = request.uri
        # Rebuild an absolute URL when the request line only carries a path.
        if url is None or not url.startswith("http"):
            url = request.protocol + "://" + request.host_name + request.uri
        method = request.method
        headers = request.headers._dict
        request_wraper = {"data": request.body.decode("utf-8"), "type": "hunter-proxy", "url": url, "method": method,
                          "parser": BaseTrafficParser.DEAFAULT_PARSER,
                          "headers": json.dumps(headers), "requestid": None}
        raw_request_data["data"] = request_wraper
        return raw_request_data

    def _send_auth_challenge(self):
        """Answer with a 401 Basic-auth challenge and finish the response."""
        self.set_status(401)
        self.set_header('WWW-Authenticate', 'Basic realm="hunter"')
        self.finish()

    def retrieve_credentials(self):
        """
        Prompt for basic-auth credentials and, on success, set the proxy
        session cookie.

        A missing or malformed ``Authorization`` header is answered with a
        401 challenge. A failed login writes a 401 response. A successful
        login whose current task is missing or not in the working state is
        answered with 400.

        :return: ``(status, user_info)``; ``(False, None)`` when no usable
            credentials were supplied
        """
        auth_header = self.request.headers.get('Authorization', None)
        proxy_session_id = self.get_cookie('proxy_sessionid', None)
        if auth_header is None:
            self._send_auth_challenge()
            return False, None

        # Expected form: "Basic Zm9vOmJhcg==". The header is client supplied,
        # so a wrong scheme, bad base64, non-UTF-8 bytes or a missing ":"
        # must end in a challenge rather than an uncaught exception.
        try:
            auth_mode, auth_base64 = auth_header.split(' ', 1)
            if auth_mode != 'Basic':
                raise ValueError("unsupported authorization scheme")
            auth_username, auth_password = base64.b64decode(auth_base64).decode("UTF-8").split(':', 1)
        except ValueError:
            self._send_auth_challenge()
            return False, None

        status, user_info = auth_login(auth_username, auth_password, proxy_session_id)
        if not status:
            # Authentication failed.
            self.write("认证失败,请确认账号密码是否正确")
            self.set_status(401)
            self.set_header('WWW-Authenticate', 'Basic realm="hunter"')
        else:
            self.set_cookie("proxy_sessionid", user_info["proxy_sessionid"])
            # The task is closed, or there is no task at all.
            if "current_task_id" not in user_info or (
                    user_info["current_task_id"] != ""
                    and RedisService.get_task(user_info.current_task_id).status != str(TaskStatus.WORKING)):
                self.write("后台无正在运行的任务,你需要重建一个新任务")
                self.set_status(400)
                self.finish()
                status = False
        return status, user_info

    def show_cacert_page(self):
        """
        Respond with the landing page that links to the CA certificate.

        :return: None
        """
        html_content = """
        <html><head><title>Burp Suite Professional</title>
        <style type="text/css">
        body { background: #dedede; font-family: Arial, sans-serif; color: #404042; -webkit-font-smoothing: antialiased; }
        #container { padding: 0 15px; margin: 10px auto; background-color: #ffffff; }
        a { word-wrap: break-word; }
        a:link, a:visited { color: #e06228; text-decoration: none; }
        a:hover, a:active { color: #404042; text-decoration: underline; }
        h1 { font-size: 1.6em; line-height: 1.2em; font-weight: normal; color: #404042; }
        h2 { font-size: 1.3em; line-height: 1.2em; padding: 0; margin: 0.8em 0 0.3em 0; font-weight: normal; color: #404042;}
        .title, .navbar { color: #ffffff; background: #70BAFE; padding: 10px 15px; margin: 0 -15px 10px -15px; overflow: hidden; }
        .title h1 { color: #ffffff; padding: 0; margin: 0; font-size: 1.8em; }
        div.navbar {position: absolute; top: 18px; right: 25px;}div.navbar ul {list-style-type: none; margin: 0; padding: 0;}
        div.navbar li {display: inline; margi-left: 20px;}
        div.navbar a {color: white; padding: 10px}
        div.navbar a:hover, div.navbar a:active {text-decoration: none; background: #404042;}
        </style>
        </head>
        <body>
        <div id="container">
        <div class="title"><h1>Hunter Proxy</h1></div>
        <div class="navbar"><ul>
        <li><a href="/cert">CA Certificate</a></li>
        </ul></div>
        <p>Welcome to Hunter Proxy.</p><p>&nbsp;</p>
        </div>
        </body>
        </html>
        """
        self.set_status(200)
        self.write(html_content)
        self.finish()

    def download_cacert(self):
        """
        Send the CA certificate file as an attachment named ``ca.crt``.

        :return: None
        """
        self.set_header('Content-Type', 'application/octet-stream')
        self.set_header('Content-Disposition', 'attachment; filename=ca.crt')
        # The read mode may need adjusting to the actual situation.
        with open(CACERT_FILE, 'rb') as f:
            # Copy in sizeable chunks instead of one byte per call.
            while True:
                data = f.read(self._CACERT_CHUNK_SIZE)
                if not data:
                    break
                self.write(data)
        self.finish()

    def handle_hunter_cacert_page(self):
        """
        Serve the certificate pages for requests addressed to ``hunterca``.

        ``http://hunterca/cert`` downloads the certificate; any other URI on
        that host shows the landing page. Other hosts are left untouched.

        :return: None
        """
        if self.request.host == "hunterca":
            if self.request.uri == "http://hunterca/cert":
                self.download_cacert()
            else:
                self.show_cacert_page()
        return

    def handle_hunter_authentication_record(self):
        """
        Authenticate the user and pass the request on to the MQ.

        :return: None
        """
        user_info = None
        # Only hosts that are not white-listed go through 401 authentication.
        if not NetWorkProxyConfigService.is_white_hosts(self.request.host):
            status, user_info = self.retrieve_credentials()
            if not status or user_info is None:
                return
        # Hook request
        self.request_handler(self.request, user_info)

    @tornado.web.asynchronous
    def get(self):
        """
        Handle GET: certificate pages first, then authentication/recording.

        :return: None
        """
        self.handle_hunter_cacert_page()
        self.handle_hunter_authentication_record()
        # ... (the published listing is truncated at this point)

Full Screen

Full Screen

event_manager.py

Source:event_manager.py Github

copy

Full Screen

from constants import *
import threading


class EventManager:
    """Queue incoming requests and outgoing responses for a game.

    Requests are parsed from raw network data, queued under a lock and later
    dispatched to the game handler registered for their event type.
    Responses are queued and later sent through the network manager.
    """

    def __init__(self, game, network_manager):
        """Store the collaborators and create empty queues.

        :param game: object providing ``join_game_handler`` and
            ``receive_guest_handler``
        :param network_manager: object providing ``send`` and
            ``get_socket_id``
        """
        self.game = game
        self.network_manager = network_manager
        self.request_queue = []
        self.response_queue = []
        # Event type -> handler called with the request's data.
        self.request_handler = {
            1: self.game.join_game_handler,
            4: self.game.receive_guest_handler,
        }

        self.lock = threading.Lock()

    def push_request(self, raw_request_data):
        """Parse ``raw_request_data`` and queue it; report it when invalid."""
        request = self.extract_raw_request(raw_request_data)
        if request:
            with self.lock:
                self.request_queue.append(request)
        else:
            # The original printed the undefined name ``data`` here.
            print("Invalid event data: ", raw_request_data)

    def push_response(self, response):
        """Queue ``response`` for the next ``process_response_queue`` call."""
        self.response_queue.append(response)

    def process_request_queue(self):
        """Dispatch every queued request to its handler, then empty the queue.

        Requests whose type has no registered handler are ignored. If a
        handler raises, the exception propagates, the lock is released and
        the queue is left as it was.
        """
        if not self.request_queue:
            return
        with self.lock:
            for request in self.request_queue:
                print("Handling request: ", request)
                # The fallback must accept the data argument it is called with.
                handler = self.request_handler.get(request.type, lambda _data: 'Not register handler')
                handler(request.data)
            self.request_queue.clear()

    def process_response_queue(self):
        """Send every queued response and empty the queue."""
        if not self.response_queue:
            return
        for response in self.response_queue:
            self.network_manager.send(response)
        self.response_queue.clear()

    def post_process_response_queue(self):
        """Flush the response queue."""
        self.process_response_queue()

    def extract_raw_request(self, raw_request_data):
        """Build a ``Request`` from raw data, or return None on failure.

        The first line of ``raw_request_data.raw_content`` is the integer
        event type; the remaining lines become the request payload together
        with the socket id of ``raw_request_data.sock``.
        """
        try:
            contents = raw_request_data.raw_content.strip().split("\n")
            event_type = int(contents[0])
            return Request(event_type, RequestData(contents[1:], self.network_manager.get_socket_id(raw_request_data.sock)))
        except Exception as e:
            print("Exception in extract_raw_request: ", e)
            return None
        # ... (the published listing is truncated at this point)

Full Screen

Full Screen

decode_request.py

Source:decode_request.py Github

copy

Full Screen

1import json2def decode_request(req):3 raw_request_data = req.get_data()4 charset = req.mimetype_params.get('charset') or 'UTF-8'5 request_dic = json.loads(raw_request_data.decode(charset, 'replace'))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful