Best Python code snippet using localstack_python
storage_client.py
Source:storage_client.py  
...135        self._storage_client = storage_client136    def schemes(self) -> Set[str]:137        return {'gs'}138    @staticmethod139    def _get_bucket_name(url: str) -> Tuple[str, str]:140        parsed = urllib.parse.urlparse(url)141        if parsed.scheme != 'gs':142            raise ValueError(f"invalid scheme, expected gs: {parsed.scheme}")143        name = parsed.path144        if name:145            assert name[0] == '/'146            name = name[1:]147        return (parsed.netloc, name)148    async def open(self, url: str) -> ReadableStream:149        bucket, name = self._get_bucket_name(url)150        return await self._storage_client.get_object(bucket, name)151    async def create(self, url: str) -> WritableStream:152        bucket, name = self._get_bucket_name(url)153        return await self._storage_client.insert_object(bucket, name)154    async def mkdir(self, url: str) -> None:155        pass156    async def statfile(self, url: str) -> GetObjectFileStatus:157        try:158            bucket, name = self._get_bucket_name(url)159            return GetObjectFileStatus(await self._storage_client.get_object_metadata(bucket, name))160        except aiohttp.ClientResponseError as e:161            if e.status == 404:162                raise FileNotFoundError(url) from e163            raise164    async def _listfiles_recursive(self, bucket: str, name: str) -> AsyncIterator[FileListEntry]:165        assert name.endswith('/')166        params = {167            'prefix': name168        }169        async for page in await self._storage_client.list_objects(bucket, params=params):170            prefixes = page.get('prefixes')171            assert not prefixes172            items = page.get('items')173            if items is not None:174                for item in page['items']:175                    yield GoogleStorageFileListEntry(f'gs://{bucket}/{item["name"]}', item)176    async def _listfiles_flat(self, bucket: str, name: str) -> AsyncIterator[FileListEntry]:177        assert name.endswith('/')178        params = {179            'prefix': name,180            'delimiter': '/',181            'includeTrailingDelimiter': 'true'182        }183        async for page in await self._storage_client.list_objects(bucket, params=params):184            prefixes = page.get('prefixes')185            if prefixes:186                for prefix in prefixes:187                    assert prefix.endswith('/')188                    url = f'gs://{bucket}/{prefix}'189                    yield GoogleStorageFileListEntry(url, None)190            items = page.get('items')191            if items:192                for item in page['items']:193                    yield GoogleStorageFileListEntry(f'gs://{bucket}/{item["name"]}', item)194    async def listfiles(self, url: str, recursive: bool = False) -> AsyncIterator[FileListEntry]:195        bucket, name = self._get_bucket_name(url)196        if not name.endswith('/'):197            name = f'{name}/'198        if recursive:199            it = self._listfiles_recursive(bucket, name)200        else:201            it = self._listfiles_flat(bucket, name)202        it = it.__aiter__()203        try:204            first_entry = await it.__anext__()205        except StopAsyncIteration:206            raise FileNotFoundError(url)  # pylint: disable=raise-missing-from207        async def cons(first_entry, it):208            yield first_entry209            try:210                while True:211                    yield await it.__anext__()212            except StopAsyncIteration:213                pass214        return cons(first_entry, it)215    async def isfile(self, url: str) -> bool:216        try:217            bucket, name = self._get_bucket_name(url)218            await self._storage_client.get_object_metadata(bucket, name)219            return True220        except aiohttp.ClientResponseError as e:221            if e.status == 404:222                return False223            raise224    async def isdir(self, url: str) -> bool:225        bucket, name = self._get_bucket_name(url)226        assert name.endswith('/')227        params = {228            'prefix': name,229            'delimiter': '/',230            'includeTrailingDelimiter': 'true',231            'maxResults': 1232        }233        async for page in await self._storage_client.list_objects(bucket, params=params):234            prefixes = page.get('prefixes')235            items = page.get('items')236            return prefixes or items237        assert False  # unreachable238    async def remove(self, url: str) -> None:239        bucket, name = self._get_bucket_name(url)240        await self._storage_client.delete_object(bucket, name)241    async def rmtree(self, url: str) -> None:242        try:243            async for entry in await self.listfiles(url, recursive=True):244                await self.remove(await entry.url())245        except FileNotFoundError:246            pass247    async def close(self) -> None:248        await self._storage_client.close()...s3.py
Source:s3.py  
...22        self._s3 = kappa.awsclient.create_client('s3', context.session)23        self._lambda = kappa.awsclient.create_client('lambda', context.session)24    def _make_notification_id(self, function_name):25        return 'Kappa-%s-notification' % function_name26    def _get_bucket_name(self):27        return self.arn.split(':')[-1]28    def _get_notification_spec(self, function):29            notification_spec = {30                'Id': self._make_notification_id(function.name),31                'Events': [e for e in self._config['events']],32                'LambdaFunctionArn': function.arn,33            }34            # Add S3 key filters35            if 'key_filters' in self._config:36                filters_spec = { 'Key' : { 'FilterRules' : [] } }37                for filter in self._config['key_filters']:38                    if 'type' in filter and 'value' in filter and filter['type'] in ('prefix', 'suffix'):39                        rule = { 'Name' : filter['type'].capitalize(), 'Value' : filter['value'] }40                        filters_spec['Key']['FilterRules'].append(rule)41                notification_spec['Filter'] = filters_spec42            return notification_spec43    def add(self, function):44        existingPermission={}45        try:46            response = self._lambda.call('get_policy',47                                     FunctionName=function.name)48            existingPermission = self.arn in str(response['Policy'])49        except Exception:50            LOG.debug('S3 event source permission not available')51        if not existingPermission:52            response = self._lambda.call('add_permission',53                                         FunctionName=function.name,54                                         StatementId=str(uuid.uuid4()),55                                         Action='lambda:InvokeFunction',56                                         Principal='s3.amazonaws.com',57                                         SourceArn=self.arn)58            LOG.debug(response)59        else:60            LOG.debug('S3 event source permission already exists')61        new_notification_spec = self._get_notification_spec(function)62        notification_spec_list = []63        try:64            response = self._s3.call(65                'get_bucket_notification_configuration',66                Bucket=self._get_bucket_name())67            LOG.debug(response)68            notification_spec_list = response['LambdaFunctionConfigurations']69        except Exception as exc:70            LOG.debug('Unable to get existing S3 event source notification configurations')71        if new_notification_spec not in notification_spec_list:72            notification_spec_list.append(new_notification_spec)73        else:       74            notification_spec_list=[]75            LOG.debug("S3 event source already exists")76        if notification_spec_list:77            notification_configuration = {78                'LambdaFunctionConfigurations': notification_spec_list79            }80            try:81                response = self._s3.call(82                    'put_bucket_notification_configuration',83                    Bucket=self._get_bucket_name(),84                    NotificationConfiguration=notification_configuration)85                LOG.debug(response)86            except Exception as exc:87                LOG.debug(exc.response)88                LOG.exception('Unable to add S3 event source')89    enable = add90    def update(self, function):91        self.add(function)92    def remove(self, function):93        notification_spec = self._get_notification_spec(function)94        LOG.debug('removing s3 notification')95        response = self._s3.call(96            'get_bucket_notification_configuration',97            Bucket=self._get_bucket_name())98        LOG.debug(response)99        if 'LambdaFunctionConfigurations' in response:100            notification_spec_list = response['LambdaFunctionConfigurations']101            if notification_spec in notification_spec_list:102                notification_spec_list.remove(notification_spec)103                response['LambdaFunctionConfigurations'] = notification_spec_list104                del response['ResponseMetadata']105                response = self._s3.call(106                    'put_bucket_notification_configuration',107                    Bucket=self._get_bucket_name(),108                    NotificationConfiguration=response)109                LOG.debug(response)110    disable = remove111    def status(self, function):112        LOG.debug('status for s3 notification for %s', function.name)113        notification_spec = self._get_notification_spec(function)114        response = self._s3.call(115            'get_bucket_notification_configuration',116            Bucket=self._get_bucket_name())117        LOG.debug(response)118        if 'LambdaFunctionConfigurations' not in response:119            return None120        121        notification_spec_list = response['LambdaFunctionConfigurations']122        if notification_spec not in notification_spec_list:123            return None124        125        return {126            'EventSourceArn': self.arn,127            'State': 'Enabled'...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
