How to use _do_start_ssl_proxy_with_listener method in localstack

Best Python code snippet using localstack_python

proxy_server.py

Source:proxy_server.py Github

copy

Full Screen

...80 server = _do_start_ssl_proxy_with_client_auth(81 port, target, client_cert_key=client_cert_key82 )83 else:84 server = _do_start_ssl_proxy_with_listener(port, target)85 if not asynchronous:86 server.join()87 return server88 def _run(*args):89 return _do_start_ssl_proxy(port, target, target_ssl=target_ssl)90 if not asynchronous:91 return _run()92 proxy = FuncThread(_run)93 TMP_THREADS.append(proxy)94 proxy.start()95 return proxy96def _do_start_ssl_proxy(port: int, target: PortOrUrl, target_ssl=False):97 import pproxy98 from localstack.services.generic_proxy import GenericProxy99 if ":" not in str(target):100 target = "127.0.0.1:%s" % target101 LOG.debug("Starting SSL proxy server %s -> %s", port, target)102 # create server and remote connection103 server = pproxy.Server("secure+tunnel://0.0.0.0:%s" % port)104 target_proto = "ssl+tunnel" if target_ssl else "tunnel"105 remote = pproxy.Connection("%s://%s" % (target_proto, target))106 args = dict(rserver=[remote], verbose=print)107 # set SSL contexts108 _, cert_file_name, key_file_name = GenericProxy.create_ssl_cert()109 for context in pproxy.server.sslcontexts:110 context.load_cert_chain(cert_file_name, key_file_name)111 loop = ensure_event_loop()112 handler = loop.run_until_complete(server.start_server(args))113 try:114 loop.run_forever()115 except KeyboardInterrupt:116 print("exit!")117 handler.close()118 loop.run_until_complete(handler.wait_closed())119 loop.run_until_complete(loop.shutdown_asyncgens())120 loop.close()121def _do_start_ssl_proxy_with_client_auth(122 port: int, target: PortOrUrl, client_cert_key: Tuple[str, str]123):124 # prepare cert files (TODO: check whether/how we can pass cert strings to requests.request(..) 
directly)125 cert_file = client_cert_key[0]126 if not os.path.exists(cert_file):127 cert_file = new_tmp_file()128 save_file(cert_file, client_cert_key[0])129 key_file = client_cert_key[1]130 if not os.path.exists(key_file):131 key_file = new_tmp_file()132 save_file(key_file, client_cert_key[1])133 cert_params = (cert_file, key_file)134 # start proxy135 requests_kwargs = {"cert": cert_params}136 result = _do_start_ssl_proxy_with_listener(port, target, requests_kwargs=requests_kwargs)137 return result138def _do_start_ssl_proxy_with_listener(139 port: int, target: PortOrUrl, requests_kwargs: Dict[str, Any] = None140):141 target = f"http://localhost:{target}" if isinstance(target, int) else target142 base_url = f"{'https://' if '://' not in target else ''}{target.rstrip('/')}"143 requests_kwargs = requests_kwargs or {}144 # define forwarding listener145 class Listener(ProxyListener):146 def forward_request(self, method, path, data, headers):147 # send request to target148 url = f"{base_url}{path}"149 response = requests.request(150 method=method, url=url, data=data, headers=headers, verify=False, **requests_kwargs151 )152 # fix encoding of response, based on Accept-Encoding header...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful