How to use the _get_bucket_name method in localstack

Best Python code snippet using localstack_python

storage_client.py

Source: storage_client.py (GitHub)



...
        self._storage_client = storage_client

    def schemes(self) -> Set[str]:
        return {'gs'}

    @staticmethod
    def _get_bucket_name(url: str) -> Tuple[str, str]:
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme != 'gs':
            raise ValueError(f"invalid scheme, expected gs: {parsed.scheme}")
        name = parsed.path
        if name:
            assert name[0] == '/'
            name = name[1:]
        return (parsed.netloc, name)

    async def open(self, url: str) -> ReadableStream:
        bucket, name = self._get_bucket_name(url)
        return await self._storage_client.get_object(bucket, name)

    async def create(self, url: str) -> WritableStream:
        bucket, name = self._get_bucket_name(url)
        return await self._storage_client.insert_object(bucket, name)

    async def mkdir(self, url: str) -> None:
        pass

    async def statfile(self, url: str) -> GetObjectFileStatus:
        try:
            bucket, name = self._get_bucket_name(url)
            return GetObjectFileStatus(await self._storage_client.get_object_metadata(bucket, name))
        except aiohttp.ClientResponseError as e:
            if e.status == 404:
                raise FileNotFoundError(url) from e
            raise

    async def _listfiles_recursive(self, bucket: str, name: str) -> AsyncIterator[FileListEntry]:
        assert name.endswith('/')
        params = {
            'prefix': name
        }
        async for page in await self._storage_client.list_objects(bucket, params=params):
            prefixes = page.get('prefixes')
            assert not prefixes
            items = page.get('items')
            if items is not None:
                for item in page['items']:
                    yield GoogleStorageFileListEntry(f'gs://{bucket}/{item["name"]}', item)

    async def _listfiles_flat(self, bucket: str, name: str) -> AsyncIterator[FileListEntry]:
        assert name.endswith('/')
        params = {
            'prefix': name,
            'delimiter': '/',
            'includeTrailingDelimiter': 'true'
        }
        async for page in await self._storage_client.list_objects(bucket, params=params):
            prefixes = page.get('prefixes')
            if prefixes:
                for prefix in prefixes:
                    assert prefix.endswith('/')
                    url = f'gs://{bucket}/{prefix}'
                    yield GoogleStorageFileListEntry(url, None)
            items = page.get('items')
            if items:
                for item in page['items']:
                    yield GoogleStorageFileListEntry(f'gs://{bucket}/{item["name"]}', item)

    async def listfiles(self, url: str, recursive: bool = False) -> AsyncIterator[FileListEntry]:
        bucket, name = self._get_bucket_name(url)
        if not name.endswith('/'):
            name = f'{name}/'
        if recursive:
            it = self._listfiles_recursive(bucket, name)
        else:
            it = self._listfiles_flat(bucket, name)
        it = it.__aiter__()
        try:
            first_entry = await it.__anext__()
        except StopAsyncIteration:
            raise FileNotFoundError(url)  # pylint: disable=raise-missing-from

        async def cons(first_entry, it):
            yield first_entry
            try:
                while True:
                    yield await it.__anext__()
            except StopAsyncIteration:
                pass

        return cons(first_entry, it)

    async def isfile(self, url: str) -> bool:
        try:
            bucket, name = self._get_bucket_name(url)
            await self._storage_client.get_object_metadata(bucket, name)
            return True
        except aiohttp.ClientResponseError as e:
            if e.status == 404:
                return False
            raise

    async def isdir(self, url: str) -> bool:
        bucket, name = self._get_bucket_name(url)
        assert name.endswith('/')
        params = {
            'prefix': name,
            'delimiter': '/',
            'includeTrailingDelimiter': 'true',
            'maxResults': 1
        }
        async for page in await self._storage_client.list_objects(bucket, params=params):
            prefixes = page.get('prefixes')
            items = page.get('items')
            return prefixes or items
        assert False  # unreachable

    async def remove(self, url: str) -> None:
        bucket, name = self._get_bucket_name(url)
        await self._storage_client.delete_object(bucket, name)

    async def rmtree(self, url: str) -> None:
        try:
            async for entry in await self.listfiles(url, recursive=True):
                await self.remove(await entry.url())
        except FileNotFoundError:
            pass

    async def close(self) -> None:
        await self._storage_client.close()
...
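In this snippet, _get_bucket_name is a pure URL-parsing helper: it never touches the network, so it is easy to exercise on its own. Below is a minimal standalone sketch of the same parsing logic; the function name get_bucket_name and the example URL are illustrative and not part of the original source.

import urllib.parse
from typing import Tuple

def get_bucket_name(url: str) -> Tuple[str, str]:
    # Mirror of the static method above: split a gs:// URL into (bucket, object name).
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme != 'gs':
        raise ValueError(f"invalid scheme, expected gs: {parsed.scheme}")
    name = parsed.path
    if name:
        assert name[0] == '/'
        name = name[1:]  # drop the leading '/'
    return (parsed.netloc, name)

print(get_bucket_name('gs://my-bucket/path/to/object.txt'))
# ('my-bucket', 'path/to/object.txt')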



s3.py

Source: s3.py (GitHub)



...
        self._s3 = kappa.awsclient.create_client('s3', context.session)
        self._lambda = kappa.awsclient.create_client('lambda', context.session)

    def _make_notification_id(self, function_name):
        return 'Kappa-%s-notification' % function_name

    def _get_bucket_name(self):
        return self.arn.split(':')[-1]

    def _get_notification_spec(self, function):
        notification_spec = {
            'Id': self._make_notification_id(function.name),
            'Events': [e for e in self._config['events']],
            'LambdaFunctionArn': function.arn,
        }
        # Add S3 key filters
        if 'key_filters' in self._config:
            filters_spec = {'Key': {'FilterRules': []}}
            for filter in self._config['key_filters']:
                if 'type' in filter and 'value' in filter and filter['type'] in ('prefix', 'suffix'):
                    rule = {'Name': filter['type'].capitalize(), 'Value': filter['value']}
                    filters_spec['Key']['FilterRules'].append(rule)
            notification_spec['Filter'] = filters_spec
        return notification_spec

    def add(self, function):
        existingPermission = {}
        try:
            response = self._lambda.call('get_policy',
                                         FunctionName=function.name)
            existingPermission = self.arn in str(response['Policy'])
        except Exception:
            LOG.debug('S3 event source permission not available')
        if not existingPermission:
            response = self._lambda.call('add_permission',
                                         FunctionName=function.name,
                                         StatementId=str(uuid.uuid4()),
                                         Action='lambda:InvokeFunction',
                                         Principal='s3.amazonaws.com',
                                         SourceArn=self.arn)
            LOG.debug(response)
        else:
            LOG.debug('S3 event source permission already exists')
        new_notification_spec = self._get_notification_spec(function)
        notification_spec_list = []
        try:
            response = self._s3.call(
                'get_bucket_notification_configuration',
                Bucket=self._get_bucket_name())
            LOG.debug(response)
            notification_spec_list = response['LambdaFunctionConfigurations']
        except Exception as exc:
            LOG.debug('Unable to get existing S3 event source notification configurations')
        if new_notification_spec not in notification_spec_list:
            notification_spec_list.append(new_notification_spec)
        else:
            notification_spec_list = []
            LOG.debug("S3 event source already exists")
        if notification_spec_list:
            notification_configuration = {
                'LambdaFunctionConfigurations': notification_spec_list
            }
            try:
                response = self._s3.call(
                    'put_bucket_notification_configuration',
                    Bucket=self._get_bucket_name(),
                    NotificationConfiguration=notification_configuration)
                LOG.debug(response)
            except Exception as exc:
                LOG.debug(exc.response)
                LOG.exception('Unable to add S3 event source')

    enable = add

    def update(self, function):
        self.add(function)

    def remove(self, function):
        notification_spec = self._get_notification_spec(function)
        LOG.debug('removing s3 notification')
        response = self._s3.call(
            'get_bucket_notification_configuration',
            Bucket=self._get_bucket_name())
        LOG.debug(response)
        if 'LambdaFunctionConfigurations' in response:
            notification_spec_list = response['LambdaFunctionConfigurations']
            if notification_spec in notification_spec_list:
                notification_spec_list.remove(notification_spec)
                response['LambdaFunctionConfigurations'] = notification_spec_list
                del response['ResponseMetadata']
                response = self._s3.call(
                    'put_bucket_notification_configuration',
                    Bucket=self._get_bucket_name(),
                    NotificationConfiguration=response)
                LOG.debug(response)

    disable = remove

    def status(self, function):
        LOG.debug('status for s3 notification for %s', function.name)
        notification_spec = self._get_notification_spec(function)
        response = self._s3.call(
            'get_bucket_notification_configuration',
            Bucket=self._get_bucket_name())
        LOG.debug(response)
        if 'LambdaFunctionConfigurations' not in response:
            return None

        notification_spec_list = response['LambdaFunctionConfigurations']
        if notification_spec not in notification_spec_list:
            return None

        return {
            'EventSourceArn': self.arn,
            'State': 'Enabled'
...
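Here _get_bucket_name takes no URL at all: the event source ARN is split on ':' and the last field is used as the bucket name, which works because an S3 bucket ARN has the form arn:aws:s3:::bucket-name. A minimal sketch of that behaviour follows; the standalone function name and example ARN are illustrative only.

def get_bucket_name(arn: str) -> str:
    # Same idea as the method above: the bucket name is the last ':'-separated field of the ARN.
    return arn.split(':')[-1]

print(get_bucket_name('arn:aws:s3:::my-bucket'))
# my-bucket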



Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

