How to use strip_chunk_signatures method in localstack

Best Python code snippet using localstack_python

s3_listener.py

Source:s3_listener.py Github

copy

Full Screen

...139 for allowed in allowed_origins:140 if origin in allowed or re.match(allowed.replace('*', '.*'), origin):141 response.headers['Access-Control-Allow-Origin'] = origin142 break143def strip_chunk_signatures(data):144 # For clients that use streaming v4 authentication, the request contains chunk signatures145 # in the HTTP body (see example below) which we need to strip as moto cannot handle them146 #147 # 17;chunk-signature=6e162122ec4962bea0b18bc624025e6ae4e9322bdc632762d909e87793ac5921148 # <payload data ...>149 # 0;chunk-signature=927ab45acd82fc90a3c210ca7314d59fedc77ce0c914d79095f8cc9563cf2c70150 data_new = re.sub(r'^[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}', '', data, flags=re.MULTILINE)151 if data_new != data:152 # trim \r (13) or \n (10)153 for i in range(0, 2):154 if ord(data_new[0]) in (10, 13):155 data_new = data_new[1:]156 for i in range(0, 6):157 if ord(data_new[-1]) in (10, 13):158 data_new = data_new[:-1]159 return data_new160def update_s3(method, path, data, headers, response=None, return_forward_info=False):161 if return_forward_info:162 modified_data = None163 # If this request contains streaming v4 authentication signatures, strip them from the message164 # Related isse: https://github.com/atlassian/localstack/issues/98165 # TODO we should evaluate whether to replace moto s3 with scality/S3:166 # https://github.com/scality/S3/issues/237167 if headers.get('x-amz-content-sha256') == 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD':168 modified_data = strip_chunk_signatures(data)169 # persist this API call to disk170 persistence.record('s3', method, path, data, headers)171 parsed = urlparse.urlparse(path)172 query = parsed.query173 path = parsed.path174 bucket = path.split('/')[1]175 query_map = urlparse.parse_qs(query)176 if method == 'PUT' and (query == 'notification' or 'notification' in query_map):177 tree = ET.fromstring(data)178 for dest in ['Queue', 'Topic', 'CloudFunction']:179 config = tree.find('{%s}%sConfiguration' % (XMLNS_S3, dest))180 if config is not None and len(config):181 S3_NOTIFICATIONS[bucket] = {182 'Id': get_xml_text(config, 'Id'),...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful