How to use _get_s3_filter method in localstack

Best Python code snippet using localstack_python

s3_listener.py

Source:s3_listener.py Github

copy

Full Screen

...45 return False46def filter_rules_match(filters, object_path):47 """ check whether the given object path matches all of the given filters """48 filters = filters or {}49 s3_filter = _get_s3_filter(filters)50 for rule in s3_filter.get('FilterRule', []):51 if rule['Name'] == 'prefix':52 if not prefix_with_slash(object_path).startswith(prefix_with_slash(rule['Value'])):53 return False54 elif rule['Name'] == 'suffix':55 if not object_path.endswith(rule['Value']):56 return False57 else:58 LOGGER.warning('Unknown filter name: "%s"' % rule['Name'])59 return True60def _get_s3_filter(filters):61 return filters.get('S3Key', filters.get('Key', {}))62def prefix_with_slash(s):63 return s if s[0] == '/' else '/%s' % s64def get_event_message(event_name, bucket_name, file_name='testfile.txt', file_size=1024):65 # Based on: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html66 return {67 'Records': [{68 'eventVersion': '2.0',69 'eventSource': 'aws:s3',70 'awsRegion': DEFAULT_REGION,71 'eventTime': timestamp(format=TIMESTAMP_FORMAT_MILLIS),72 'eventName': event_name,73 'userIdentity': {74 'principalId': 'AIDAJDPLRKLG7UEXAMPLE'75 },76 'requestParameters': {77 'sourceIPAddress': '127.0.0.1' # TODO determine real source IP78 },79 'responseElements': {80 'x-amz-request-id': short_uid(),81 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2' # Amazon S3 host that processed the request82 },83 's3': {84 's3SchemaVersion': '1.0',85 'configurationId': 'testConfigRule',86 'bucket': {87 'name': bucket_name,88 'ownerIdentity': {89 'principalId': 'A3NL1KOZZKExample'90 },91 'arn': 'arn:aws:s3:::%s' % bucket_name92 },93 'object': {94 'key': file_name,95 'size': file_size,96 'eTag': 'd41d8cd98f00b204e9800998ecf8427e',97 'versionId': '096fKKXTRTtl3on89fVO.nfljtsv6qko',98 'sequencer': '0055AED6DCD90281E5'99 }100 }101 }]102 }103def queue_url_for_arn(queue_arn):104 sqs_client = aws_stack.connect_to_service('sqs')105 parts = queue_arn.split(':')106 return 
sqs_client.get_queue_url(QueueName=parts[5],107 QueueOwnerAWSAccountId=parts[4])['QueueUrl']108def send_notifications(method, bucket_name, object_path):109 for bucket, b_cfg in iteritems(S3_NOTIFICATIONS):110 if bucket == bucket_name:111 action = {'PUT': 'ObjectCreated', 'POST': 'ObjectCreated', 'DELETE': 'ObjectRemoved'}[method]112 # TODO: support more detailed methods, e.g., DeleteMarkerCreated113 # http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html114 if action == 'ObjectCreated' and method == 'POST':115 api_method = 'CompleteMultipartUpload'116 else:117 api_method = {'PUT': 'Put', 'POST': 'Post', 'DELETE': 'Delete'}[method]118 event_name = '%s:%s' % (action, api_method)119 if (event_type_matches(b_cfg['Event'], action, api_method) and120 filter_rules_match(b_cfg.get('Filter'), object_path)):121 # send notification122 message = get_event_message(123 event_name=event_name, bucket_name=bucket_name,124 file_name=urlparse.urlparse(object_path[1:]).path125 )126 message = json.dumps(message)127 if b_cfg.get('Queue'):128 sqs_client = aws_stack.connect_to_service('sqs')129 try:130 queue_url = queue_url_for_arn(b_cfg['Queue'])131 sqs_client.send_message(QueueUrl=queue_url, MessageBody=message)132 except Exception as e:133 LOGGER.warning('Unable to send notification for S3 bucket "%s" to SQS queue "%s": %s' %134 (bucket_name, b_cfg['Queue'], e))135 if b_cfg.get('Topic'):136 sns_client = aws_stack.connect_to_service('sns')137 try:138 sns_client.publish(TopicArn=b_cfg['Topic'], Message=message, Subject='Amazon S3 Notification')139 except Exception:140 LOGGER.warning('Unable to send notification for S3 bucket "%s" to SNS topic "%s".' 
%141 (bucket_name, b_cfg['Topic']))142 # CloudFunction and LambdaFunction are semantically identical143 lambda_function_config = b_cfg.get('CloudFunction') or b_cfg.get('LambdaFunction')144 if lambda_function_config:145 # make sure we don't run into a socket timeout146 connection_config = botocore.config.Config(read_timeout=300)147 lambda_client = aws_stack.connect_to_service('lambda', config=connection_config)148 try:149 lambda_client.invoke(FunctionName=lambda_function_config,150 InvocationType='Event', Payload=message)151 except Exception:152 LOGGER.warning('Unable to send notification for S3 bucket "%s" to Lambda function "%s".' %153 (bucket_name, lambda_function_config))154 if not filter(lambda x: b_cfg.get(x), NOTIFICATION_DESTINATION_TYPES):155 LOGGER.warning('Neither of %s defined for S3 notification.' %156 '/'.join(NOTIFICATION_DESTINATION_TYPES))157def get_cors(bucket_name):158 response = Response()159 cors = BUCKET_CORS.get(bucket_name)160 if not cors:161 # TODO: check if bucket exists, otherwise return 404-like error162 cors = {163 'CORSConfiguration': []164 }165 body = xmltodict.unparse(cors)166 response._content = body167 response.status_code = 200168 return response169def set_cors(bucket_name, cors):170 # TODO: check if bucket exists, otherwise return 404-like error171 if not isinstance(cors, dict):172 cors = xmltodict.parse(cors)173 BUCKET_CORS[bucket_name] = cors174 response = Response()175 response.status_code = 200176 return response177def delete_cors(bucket_name):178 # TODO: check if bucket exists, otherwise return 404-like error179 BUCKET_CORS.pop(bucket_name, {})180 response = Response()181 response.status_code = 200182 return response183def append_cors_headers(bucket_name, request_method, request_headers, response):184 cors = BUCKET_CORS.get(bucket_name)185 if not cors:186 return187 origin = request_headers.get('Origin', '')188 rules = cors['CORSConfiguration']['CORSRule']189 if not isinstance(rules, list):190 rules = [rules]191 for rule in 
rules:192 # add allow-origin header193 allowed_methods = rule.get('AllowedMethod', [])194 if request_method in allowed_methods:195 allowed_origins = rule.get('AllowedOrigin', [])196 for allowed in allowed_origins:197 if origin in allowed or re.match(allowed.replace('*', '.*'), origin):198 response.headers['Access-Control-Allow-Origin'] = origin199 break200 # add additional headers201 exposed_headers = rule.get('ExposeHeader', [])202 for header in exposed_headers:203 if header.lower() == 'date':204 response.headers[header] = timestamp(format='%a, %d %b %Y %H:%M:%S +0000')205 elif header.lower() == 'etag':206 response.headers[header] = md5(response._content)207 elif header.lower() in ('server', 'x-amz-id-2', 'x-amz-request-id'):208 response.headers[header] = short_uid()209 elif header.lower() == 'x-amz-delete-marker':210 response.headers[header] = 'false'211 elif header.lower() == 'x-amz-version-id':212 # TODO: check whether bucket versioning is enabled and return proper version id213 response.headers[header] = 'null'214def get_lifecycle(bucket_name):215 response = Response()216 lifecycle = BUCKET_LIFECYCLE.get(bucket_name)217 if not lifecycle:218 # TODO: check if bucket exists, otherwise return 404-like error219 lifecycle = {220 'LifecycleConfiguration': {}221 }222 body = xmltodict.unparse(lifecycle)223 response._content = body224 response.status_code = 200225 return response226def set_lifecycle(bucket_name, lifecycle):227 # TODO: check if bucket exists, otherwise return 404-like error228 if isinstance(to_str(lifecycle), six.string_types):229 lifecycle = xmltodict.parse(lifecycle)230 BUCKET_LIFECYCLE[bucket_name] = lifecycle231 response = Response()232 response.status_code = 200233 return response234def strip_chunk_signatures(data):235 # For clients that use streaming v4 authentication, the request contains chunk signatures236 # in the HTTP body (see example below) which we need to strip as moto cannot handle them237 #238 # 
17;chunk-signature=6e162122ec4962bea0b18bc624025e6ae4e9322bdc632762d909e87793ac5921239 # <payload data ...>240 # 0;chunk-signature=927ab45acd82fc90a3c210ca7314d59fedc77ce0c914d79095f8cc9563cf2c70241 data_new = re.sub(b'(\r\n)?[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}(\r\n){,2}', b'',242 data, flags=re.MULTILINE | re.DOTALL)243 if data_new != data:244 # trim \r (13) or \n (10)245 for i in range(0, 2):246 if len(data_new) and data_new[0] in (10, 13):247 data_new = data_new[1:]248 for i in range(0, 6):249 if len(data_new) and data_new[-1] in (10, 13):250 data_new = data_new[:-1]251 return data_new252def check_content_md5(data, headers):253 actual = md5(strip_chunk_signatures(data))254 expected = headers['Content-MD5']255 try:256 expected = to_str(codecs.encode(base64.b64decode(expected), 'hex'))257 except Exception:258 expected = '__invalid__'259 if actual != expected:260 response = Response()261 result = {262 'Error': {263 'Code': 'InvalidDigest',264 'Message': 'The Content-MD5 you specified was invalid'265 }266 }267 response._content = xmltodict.unparse(result)268 response.status_code = 400269 return response270def expand_redirect_url(starting_url, key, bucket):271 """ Add key and bucket parameters to starting URL query string. 
"""272 parsed = urlparse.urlparse(starting_url)273 query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))274 query.update([('key', key), ('bucket', bucket)])275 redirect_url = urlparse.urlunparse((276 parsed.scheme, parsed.netloc, parsed.path,277 parsed.params, urlparse.urlencode(query), None))278 return redirect_url279def get_bucket_name(path, headers):280 parsed = urlparse.urlparse(path)281 # try pick the bucket_name from the path282 bucket_name = parsed.path.split('/')[1]283 host = headers['host']284 # is the hostname not starting a bucket name?285 if host.startswith(HOSTNAME) or host.startswith(HOSTNAME_EXTERNAL):286 return bucket_name287 # matches the common endpoints like288 # - '<bucket_name>.s3.<region>.amazonaws.com'289 # - '<bucket_name>.s3-<region>.amazonaws.com.cn'290 common_pattern = re.compile(r'^(.+)\.s3[.\-][a-z]{2}-[a-z]+-[0-9]{1,}'291 r'\.amazonaws\.com(\.[a-z]+)?$')292 # matches dualstack endpoints like293 # - <bucket_name>.s3.dualstack.<region>.amazonaws.com'294 # - <bucket_name>.s3.dualstack.<region>.amazonaws.com.cn'295 dualstack_pattern = re.compile(r'^(.+)\.s3\.dualstack\.[a-z]{2}-[a-z]+-[0-9]{1,}'296 r'\.amazonaws\.com(\.[a-z]+)?$')297 # matches legacy endpoints like298 # - '<bucket_name>.s3.amazonaws.com'299 # - '<bucket_name>.s3-external-1.amazonaws.com.cn'300 legacy_patterns = re.compile(r'^(.+)\.s3\.?(-external-1)?\.amazonaws\.com(\.[a-z]+)?$')301 # if any of the above patterns match, the first captured group302 # will be returned as the bucket name303 for pattern in [common_pattern, dualstack_pattern, legacy_patterns]:304 match = pattern.match(host)305 if match:306 bucket_name = match.groups()[0]307 break308 # we're either returning the original bucket_name,309 # or a pattern matched the host and we're returning that name instead310 return bucket_name311def handle_notification_request(bucket, method, data):312 response = Response()313 response.status_code = 200314 response._content = ''315 if method == 'GET':316 # TODO check 
if bucket exists317 result = '<NotificationConfiguration xmlns="%s">' % XMLNS_S3318 if bucket in S3_NOTIFICATIONS:319 notif = S3_NOTIFICATIONS[bucket]320 for dest in NOTIFICATION_DESTINATION_TYPES:321 if dest in notif:322 dest_dict = {323 '%sConfiguration' % dest: {324 'Id': uuid.uuid4(),325 dest: notif[dest],326 'Event': notif['Event'],327 'Filter': notif['Filter']328 }329 }330 result += xmltodict.unparse(dest_dict, full_document=False)331 result += '</NotificationConfiguration>'332 response._content = result333 if method == 'PUT':334 parsed = xmltodict.parse(data)335 notif_config = parsed.get('NotificationConfiguration')336 S3_NOTIFICATIONS.pop(bucket, None)337 for dest in NOTIFICATION_DESTINATION_TYPES:338 config = notif_config.get('%sConfiguration' % (dest))339 if config:340 events = config.get('Event')341 if isinstance(events, six.string_types):342 events = [events]343 event_filter = config.get('Filter', {})344 # make sure FilterRule is an array345 s3_filter = _get_s3_filter(event_filter)346 if s3_filter and not isinstance(s3_filter.get('FilterRule', []), list):347 s3_filter['FilterRule'] = [s3_filter['FilterRule']]348 # create final details dict349 notification_details = {350 'Id': config.get('Id'),351 'Event': events,352 dest: config.get(dest),353 'Filter': event_filter354 }355 # TODO: what if we have multiple destinations - would we overwrite the config?356 S3_NOTIFICATIONS[bucket] = clone(notification_details)357 return response358class ProxyListenerS3(ProxyListener):359 def forward_request(self, method, path, data, headers):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful