Best Python code snippet using localstack_python
awslambda.py
Source:awslambda.py  
1# Copyright 2016-2017 Capital One Services, LLC2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from __future__ import absolute_import, division, print_function, unicode_literals15import functools16import jmespath17import json18import six19from botocore.exceptions import ClientError20from botocore.paginate import Paginator21from concurrent.futures import as_completed22from c7n.actions import BaseAction, RemovePolicyBase23from c7n.filters import CrossAccountAccessFilter, ValueFilter24import c7n.filters.vpc as net_filters25from c7n.manager import resources26from c7n import query27from c7n.resources.iam import CheckPermissions28from c7n.tags import universal_augment29from c7n.utils import local_session, type_schema, generate_arn30ErrAccessDenied = "AccessDeniedException"31@resources.register('lambda')32class AWSLambda(query.QueryResourceManager):33    class resource_type(object):34        service = 'lambda'35        type = 'function'36        enum_spec = ('list_functions', 'Functions', None)37        name = id = 'FunctionName'38        filter_name = None39        date = 'LastModified'40        dimension = 'FunctionName'41        config_type = "AWS::Lambda::Function"42        universal_taggable = object()43    @property44    def generate_arn(self):45        """ Generates generic arn if ID is not already arn format.46        """47        if self._generate_arn is None:48            self._generate_arn = functools.partial(49                generate_arn,50                
self.get_model().service,51                region=self.config.region,52                account_id=self.account_id,53                resource_type=self.get_model().type,54                separator=':')55        return self._generate_arn56    def get_source(self, source_type):57        if source_type == 'describe':58            return DescribeLambda(self)59        elif source_type == 'config':60            return ConfigLambda(self)61        raise ValueError("Unsupported source: %s for %s" % (62            source_type, self.resource_type.config_type))63class DescribeLambda(query.DescribeSource):64    def augment(self, resources):65        return universal_augment(66            self.manager, super(DescribeLambda, self).augment(resources))67class ConfigLambda(query.ConfigSource):68    def load_resource(self, item):69        resource = super(ConfigLambda, self).load_resource(item)70        resource['Tags'] = [71            {u'Key': k, u'Value': v} for k, v in item[72                'supplementaryConfiguration'].get('Tags', {}).items()]73        resource['c7n:Policy'] = item[74            'supplementaryConfiguration'].get('Policy')75        return resource76@AWSLambda.filter_registry.register('security-group')77class SecurityGroupFilter(net_filters.SecurityGroupFilter):78    RelatedIdsExpression = "VpcConfig.SecurityGroupIds[]"79@AWSLambda.filter_registry.register('subnet')80class SubnetFilter(net_filters.SubnetFilter):81    RelatedIdsExpression = "VpcConfig.SubnetIds[]"82@AWSLambda.filter_registry.register('vpc')83class VpcFilter(net_filters.VpcFilter):84    RelatedIdsExpression = "VpcConfig.VpcId"85AWSLambda.filter_registry.register('network-location', net_filters.NetworkLocation)86@AWSLambda.filter_registry.register('check-permissions')87class LambdaPermissions(CheckPermissions):88    def get_iam_arns(self, resources):89        return [r['Role'] for r in resources]90@AWSLambda.filter_registry.register('reserved-concurrency')91class ReservedConcurrency(ValueFilter):92   
 annotation_key = "c7n:FunctionInfo"93    value_key = '"c7n:FunctionInfo".Concurrency.ReservedConcurrentExecutions'94    schema = type_schema('reserved-concurrency', rinherit=ValueFilter.schema)95    permissions = ('lambda:GetFunction',)96    def validate(self):97        self.data['key'] = self.value_key98        return super(ReservedConcurrency, self).validate()99    def process(self, resources, event=None):100        self.data['key'] = self.value_key101        client = local_session(self.manager.session_factory).client('lambda')102        def _augment(r):103            try:104                r[self.annotation_key] = self.manager.retry(105                    client.get_function, FunctionName=r['FunctionArn'])106                r[self.annotation_key].pop('ResponseMetadata')107            except ClientError as e:108                if e.response['Error']['Code'] == ErrAccessDenied:109                    self.log.warning(110                        "Access denied getting lambda:%s",111                        r['FunctionName'])112                raise113            return r114        with self.executor_factory(max_workers=3) as w:115            resources = list(filter(None, w.map(_augment, resources)))116            return super(ReservedConcurrency, self).process(resources, event)117def get_lambda_policies(client, executor_factory, resources, log):118    def _augment(r):119        try:120            r['c7n:Policy'] = client.get_policy(121                FunctionName=r['FunctionName'])['Policy']122        except client.exceptions.ResourceNotFoundException:123            return None124        except ClientError as e:125            if e.response['Error']['Code'] == 'AccessDeniedException':126                log.warning(127                    "Access denied getting policy lambda:%s",128                    r['FunctionName'])129        return r130    results = []131    futures = {}132    with executor_factory(max_workers=3) as w:133        for r in resources:134            if 
'c7n:Policy' in r:135                results.append(r)136                continue137            futures[w.submit(_augment, r)] = r138        for f in as_completed(futures):139            if f.exception():140                log.warning("Error getting policy for:%s err:%s",141                            r['FunctionName'], f.exception())142                r = futures[f]143                continue144            results.append(f.result())145    return filter(None, results)146@AWSLambda.filter_registry.register('event-source')147class LambdaEventSource(ValueFilter):148    # this uses iam policy, it should probably use149    # event source mapping api150    annotation_key = "c7n:EventSources"151    schema = type_schema('event-source', rinherit=ValueFilter.schema)152    permissions = ('lambda:GetPolicy',)153    def process(self, resources, event=None):154        client = local_session(self.manager.session_factory).client('lambda')155        self.log.debug("fetching policy for %d lambdas" % len(resources))156        resources = get_lambda_policies(157            client, self.executor_factory, resources, self.log)158        self.data['key'] = self.annotation_key159        return super(LambdaEventSource, self).process(resources, event)160    def __call__(self, r):161        if 'c7n:Policy' not in r:162            return False163        sources = set()164        data = json.loads(r['c7n:Policy'])165        for s in data.get('Statement', ()):166            if s['Effect'] != 'Allow':167                continue168            if 'Service' in s['Principal']:169                sources.add(s['Principal']['Service'])170            if sources:171                r[self.annotation_key] = list(sources)172        return self.match(r)173@AWSLambda.filter_registry.register('cross-account')174class LambdaCrossAccountAccessFilter(CrossAccountAccessFilter):175    """Filters lambda functions with cross-account permissions176    The whitelist parameter can be used to prevent certain accounts177   
 from being included in the results (essentially stating that these178    accounts permissions are allowed to exist)179    This can be useful when combining this filter with the delete action.180    :example:181    .. code-block:: yaml182            policies:183              - name: lambda-cross-account184                resource: lambda185                filters:186                  - type: cross-account187                    whitelist:188                      - 'IAM-Policy-Cross-Account-Access'189    """190    permissions = ('lambda:GetPolicy',)191    policy_attribute = 'c7n:Policy'192    def process(self, resources, event=None):193        client = local_session(self.manager.session_factory).client('lambda')194        self.log.debug("fetching policy for %d lambdas" % len(resources))195        resources = get_lambda_policies(196            client, self.executor_factory, resources, self.log)197        return super(LambdaCrossAccountAccessFilter, self).process(198            resources, event)199@AWSLambda.action_registry.register('remove-statements')200class RemovePolicyStatement(RemovePolicyBase):201    """Action to remove policy/permission statements from lambda functions.202    :example:203    .. 
code-block:: yaml204            policies:205              - name: lambda-remove-cross-accounts206                resource: lambda207                filters:208                  - type: cross-account209                actions:210                  - type: remove-statements211                    statement_ids: matched212    """213    schema = type_schema(214        'remove-statements',215        required=['statement_ids'],216        statement_ids={'oneOf': [217            {'enum': ['matched']},218            {'type': 'array', 'items': {'type': 'string'}}]})219    permissions = ("lambda:GetPolicy", "lambda:RemovePermission")220    def process(self, resources):221        results = []222        client = local_session(self.manager.session_factory).client('lambda')223        for r in resources:224            try:225                if self.process_resource(client, r):226                    results.append(r)227            except Exception:228                self.log.exception(229                    "Error processing lambda %s", r['FunctionArn'])230        return results231    def process_resource(self, client, resource):232        if 'c7n:Policy' not in resource:233            try:234                resource['c7n:Policy'] = client.get_policy(235                    FunctionName=resource['FunctionName']).get('Policy')236            except ClientError as e:237                if e.response['Error']['Code'] != ErrAccessDenied:238                    raise239                resource['c7n:Policy'] = None240        if not resource['c7n:Policy']:241            return242        p = json.loads(resource['c7n:Policy'])243        statements, found = self.process_policy(244            p, resource, CrossAccountAccessFilter.annotation_key)245        if not found:246            return247        for f in found:248            client.remove_permission(249                FunctionName=resource['FunctionName'],250                
StatementId=f['Sid'])251@AWSLambda.action_registry.register('set-concurrency')252class SetConcurrency(BaseAction):253    """Set lambda function concurrency to the desired level.254    Can be used to set the reserved function concurrency to an exact value,255    to delete reserved concurrency, or to set the value to an attribute of256    the resource.257    """258    schema = type_schema(259        'set-concurrency',260        required=('value',),261        **{'expr': {'type': 'boolean'},262           'value': {'oneOf': [263               {'type': 'string'},264               {'type': 'integer'},265               {'type': 'null'}]}})266    permissions = ('lambda:DeleteFunctionConcurrency',267                   'lambda:PutFunctionConcurrency')268    def validate(self):269        if self.data.get('expr', False) and not isinstance(self.data['value'], six.text_type):270            raise ValueError("invalid value expression %s" % self.data['value'])271        return self272    def process(self, functions):273        client = local_session(self.manager.session_factory).client('lambda')274        is_expr = self.data.get('expr', False)275        value = self.data['value']276        if is_expr:277            value = jmespath.compile(value)278        none_type = type(None)279        for function in functions:280            fvalue = value281            if is_expr:282                fvalue = value.search(function)283                if isinstance(fvalue, float):284                    fvalue = int(fvalue)285                if isinstance(value, int) or isinstance(value, none_type):286                    self.policy.log.warning(287                        "Function: %s Invalid expression value for concurrency: %s",288                        function['FunctionName'], fvalue)289                    continue290            if fvalue is None:291                client.delete_function_concurrency(292                    FunctionName=function['FunctionName'])293            else:294             
   client.put_function_concurrency(295                    FunctionName=function['FunctionName'],296                    ReservedConcurrentExecutions=fvalue)297@AWSLambda.action_registry.register('delete')298class Delete(BaseAction):299    """Delete a lambda function (including aliases and older versions).300    :example:301    .. code-block:: yaml302            policies:303              - name: lambda-delete-dotnet-functions304                resource: lambda305                filters:306                  - Runtime: dotnetcore1.0307                actions:308                  - delete309    """310    schema = type_schema('delete')311    permissions = ("lambda:DeleteFunction",)312    def process(self, functions):313        client = local_session(self.manager.session_factory).client('lambda')314        for function in functions:315            try:316                client.delete_function(FunctionName=function['FunctionName'])317            except ClientError as e:318                if e.response['Error']['Code'] == "ResourceNotFoundException":319                    continue320                raise321        self.log.debug("Deleted %d functions", len(functions))322@resources.register('lambda-layer')323class LambdaLayerVersion(query.QueryResourceManager):324    """Note custodian models the lambda layer version.325    Layers end up being a logical asset, the physical asset for use326    and management is the layer verison.327    To ease that distinction, we support querying just the latest328    layer version or having a policy against all layer versions.329    By default we query all versions, the following is an example330    to query just the latest.331    .. 
code-block:: yaml332        policies:333          - name: lambda-layer334            resource: lambda335            query:336              - version: latest337    """338    class resource_type(object):339        service = 'lambda'340        type = 'function'341        enum_spec = ('list_layers', 'Layers', None)342        name = id = 'LayerName'343        filter_name = None344        date = 'CreatedDate'345        dimension = None346        config_type = None347    def augment(self, resources):348        versions = {}349        for r in resources:350            versions[r['LayerName']] = v = r['LatestMatchingVersion']351            v['LayerName'] = r['LayerName']352        if {'version': 'latest'} in self.data.get('query', []):353            return list(versions.values())354        layer_names = list(versions)355        client = local_session(self.session_factory).client('lambda')356        versions = []357        for layer_name in layer_names:358            pager = get_layer_version_paginator(client)359            for v in pager.paginate(360                    LayerName=layer_name).build_full_result().get('LayerVersions'):361                v['LayerName'] = layer_name362                versions.append(v)363        return versions364def get_layer_version_paginator(client):365    pager = Paginator(366        client.list_layer_versions,367        {'input_token': 'NextToken',368         'output_token': 'NextToken',369         'result_key': 'LayerVersions'},370        client.meta.service_model.operation_model('ListLayerVersions'))371    pager.PAGE_ITERATOR_CLS = query.RetryPageIterator372    return pager373@LambdaLayerVersion.filter_registry.register('cross-account')374class LayerCrossAccount(CrossAccountAccessFilter):375    permissions = ('lambda:GetLayerVersionPolicy',)376    def process(self, resources, event=None):377        client = local_session(self.manager.session_factory).client('lambda')378        for r in resources:379            r['c7n:Policy'] = 
self.manager.retry(380                client.get_layer_version_policy,381                LayerName=r['LayerName'],382                VersionNumber=r['Version']).get('Policy')383        return super(LayerCrossAccount, self).process(resources)384    def get_resource_policy(self, r):385        return r['c7n:Policy']386@LambdaLayerVersion.action_registry.register('remove-statements')387class LayerRemovePermissions(RemovePolicyBase):388    schema = type_schema(389        'remove-statements',390        required=['statement_ids'],391        statement_ids={'oneOf': [392            {'enum': ['matched']},393            {'type': 'array', 'items': {'type': 'string'}}]})394    permissions = (395        "lambda:GetLayerVersionPolicy",396        "lambda:RemoveLayerVersionPermission")397    def process(self, resources):398        client = local_session(self.manager.session_factory).client('lambda')399        for r in resources:400            self.process_resource(client, r)401    def process_resource(self, client, r):402        if 'c7n:Policy' not in r:403            try:404                r['c7n:Policy'] = self.manager.retry(405                    client.get_layer_version_policy,406                    LayerName=r['LayerName'],407                    VersionNumber=r['Version'])408            except client.exceptions.ResourceNotFound:409                return410        p = json.loads(r['c7n:Policy'])411        statements, found = self.process_policy(412            p, r, CrossAccountAccessFilter.annotation_key)413        if not found:414            return415        for f in found:416            self.manager.retry(417                client.remove_layer_version_permission,418                LayerName=r['LayerName'],419                StatementId=f['Sid'],420                VersionNumber=r['Version'])421@LambdaLayerVersion.action_registry.register('delete')422class DeleteLayerVersion(BaseAction):423    schema = type_schema('delete')424    permissions = ('lambda:DeleteLayerVersion',)425  
  def process(self, resources):426        client = local_session(427            self.manager.session_factory).client('lambda')428        for r in resources:429            try:430                self.manager.retry(431                    client.delete_layer_version,432                    LayerName=r['LayerName'],433                    VersionNumber=r['Version'])434            except client.exceptions.ResourceNotFound:...inventory-lambdas.py
Source:inventory-lambdas.py  
...118    try:119        resource_item['supplementaryConfiguration']['LayerVersions'] = []120        response = client.list_layer_versions(LayerName=layer['LayerName'], MaxItems=50)121        for version in response['LayerVersions']:122            version['Policy'] = client.get_layer_version_policy(LayerName=layer['LayerName'], VersionNumber=version['Version'])123            resource_item['supplementaryConfiguration']['LayerVersions'].append(version)124    except ClientError as e:125        message = f"Error getting the Policy for layer {layer['LayerName']} in {region} for {target_account.account_name}: {e}"126        resource_item['errors']['LayerVersions'] = message127        logger.warning(message)...lambdax.py
Source:lambdax.py  
1import json2import logging3from typing import Any, Dict, Generator, Optional, Tuple4from botocore.exceptions import ClientError5from introspector.aws.fetch import ServiceProxy6from introspector.aws.svc import RegionalService, ServiceSpec, resource_gate7_log = logging.getLogger(__name__)8HAS_TAGS = {'list_functions': 'FunctionArn'}9def _get_policy(proxy: ServiceProxy, arn: str) -> Optional[Dict]:10  try:11    policy_resp = proxy.get('get_policy', FunctionName=arn)12    if policy_resp is not None:13      return json.loads(policy_resp['Policy'])14  except ClientError as e:15    if e.response.get('Error', {}).get('Code') != 'ResourceNotFoundException':16      raise17  return None18def _import_function(proxy: ServiceProxy, function: Dict, spec: ServiceSpec):19  name = function['FunctionName']20  arn = function['FunctionArn']21  if resource_gate(spec, 'FunctionVersion'):22    versions_resp = proxy.list('list_versions_by_function', FunctionName=name)23    if versions_resp is not None:24      versions = versions_resp[1]['Versions']25      for version in versions:26        version['ParentFunctionArn'] = arn27        version['Policy'] = _get_policy(proxy, version['FunctionArn'])28        yield 'FunctionVersion', version29  if resource_gate(spec, 'Alias'):30    aliases_resp = proxy.list('list_aliases', FunctionName=name)31    if aliases_resp is not None:32      aliases = aliases_resp[1]['Aliases']33      for alias in aliases:34        alias['FunctionArn'] = arn35        alias['Policy'] = _get_policy(proxy, alias['AliasArn'])36        yield 'Alias', alias37def _import_functions(proxy: ServiceProxy, spec: ServiceSpec):38  functions_resp = proxy.list('list_functions')39  if functions_resp is not None:40    functions = functions_resp[1]['Functions']41    for function in functions:42      arn_for_tags = HAS_TAGS.get('list_functions')43      if arn_for_tags is not None:44        arn = function[arn_for_tags]45        tags_result = proxy.list('list_tags', Resource=arn)46        if 
tags_result is not None:47          function['Tags'] = tags_result[1].get('Tags', [])48      function['Policy'] = _get_policy(proxy, function['FunctionArn'])49      yield 'Function', function50      yield from _import_function(proxy, function, spec)51def _import_layer(proxy: ServiceProxy, layer_arn: str, layer_name: str):52  layer_versions_resp = proxy.list('list_layer_versions', LayerName=layer_arn)53  if layer_versions_resp is not None:54    layer_versions = layer_versions_resp[1]['LayerVersions']55    for layer_version in layer_versions:56      policy = proxy.get('get_layer_version_policy', LayerName=layer_arn, VersionNumber=layer_version['Version'])57      if policy is not None:58        policy_string = policy.get('Policy')59        if policy_string is not None:60          layer_version['Policy'] = json.loads(policy_string)61        else:62          layer_version['Policy'] = None63        layer_version['PolicyRevisionId'] = policy.get('RevisionId')64      layer_version['Name'] = layer_name65      yield 'LayerVersion', layer_version66def _import_layers(proxy: ServiceProxy):67  layers_resp = proxy.list('list_layers')68  if layers_resp is not None:69    layer_versions = layers_resp[1]['Layers']70    for layer_version in layer_versions:71      layer_arn = layer_version['LayerArn']72      yield from _import_layer(proxy, layer_arn, layer_version['LayerName'])73def _import_lambda_region(74    proxy: ServiceProxy, region: str,75    spec: ServiceSpec) -> Generator[Tuple[str, Any], None, None]:76  if resource_gate(spec, 'Function'):77    _log.info(f'Importing functions in {region}')78    yield from _import_functions(proxy, spec)79  if resource_gate(spec, 'LayerVersion'):80    yield from _import_layers(proxy)81  # TODO: event sources...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
