Best Python code snippet using localstack_python
api_generator.py
Source:api_generator.py  
1import logging2from collections import namedtuple3from samtranslator.metrics.method_decorator import cw_timer4from samtranslator.model.intrinsics import ref, fnGetAtt, make_or_condition5from samtranslator.model.apigateway import (6    ApiGatewayDeployment,7    ApiGatewayRestApi,8    ApiGatewayStage,9    ApiGatewayAuthorizer,10    ApiGatewayResponse,11    ApiGatewayDomainName,12    ApiGatewayBasePathMapping,13    ApiGatewayUsagePlan,14    ApiGatewayUsagePlanKey,15    ApiGatewayApiKey,16)17from samtranslator.model.route53 import Route53RecordSetGroup18from samtranslator.model.exceptions import InvalidResourceException, InvalidTemplateException, InvalidDocumentException19from samtranslator.model.s3_utils.uri_parser import parse_s3_uri20from samtranslator.region_configuration import RegionConfiguration21from samtranslator.swagger.swagger import SwaggerEditor22from samtranslator.model.intrinsics import is_intrinsic, fnSub23from samtranslator.model.lambda_ import LambdaPermission24from samtranslator.translator.logical_id_generator import LogicalIdGenerator25from samtranslator.translator.arn_generator import ArnGenerator26from samtranslator.model.tags.resource_tagging import get_tag_list27from samtranslator.utils.py27hash_fix import Py27Dict, Py27UniStr28LOG = logging.getLogger(__name__)29_CORS_WILDCARD = "'*'"30CorsProperties = namedtuple(31    "CorsProperties", ["AllowMethods", "AllowHeaders", "AllowOrigin", "MaxAge", "AllowCredentials"]32)33# Default the Cors Properties to '*' wildcard and False AllowCredentials. 
Other properties are actually Optional34CorsProperties.__new__.__defaults__ = (None, None, _CORS_WILDCARD, None, False)35AuthProperties = namedtuple(36    "AuthProperties",37    [38        "Authorizers",39        "DefaultAuthorizer",40        "InvokeRole",41        "AddDefaultAuthorizerToCorsPreflight",42        "ApiKeyRequired",43        "ResourcePolicy",44        "UsagePlan",45    ],46)47AuthProperties.__new__.__defaults__ = (None, None, None, True, None, None, None)48UsagePlanProperties = namedtuple(49    "UsagePlanProperties", ["CreateUsagePlan", "Description", "Quota", "Tags", "Throttle", "UsagePlanName"]50)51UsagePlanProperties.__new__.__defaults__ = (None, None, None, None, None, None)52GatewayResponseProperties = ["ResponseParameters", "ResponseTemplates", "StatusCode"]53class SharedApiUsagePlan(object):54    """55    Collects API information from different API resources in the same template,56    so that these information can be used in the shared usage plan57    """58    SHARED_USAGE_PLAN_CONDITION_NAME = "SharedUsagePlanCondition"59    def __init__(self):60        self.usage_plan_shared = False61        self.stage_keys_shared = list()62        self.api_stages_shared = list()63        self.depends_on_shared = list()64        # shared resource level attributes65        self.conditions = set()66        self.any_api_without_condition = False67        self.deletion_policy = None68        self.update_replace_policy = None69    def get_combined_resource_attributes(self, resource_attributes, conditions):70        """71        This method returns a dictionary which combines 'DeletionPolicy', 'UpdateReplacePolicy' and 'Condition'72        values of API definitions that could be used in Shared Usage Plan resources.73        Parameters74        ----------75        resource_attributes: Dict[str]76            A dictionary of resource level attributes of the API resource77        conditions: Dict[str]78            Conditions section of the template79        """80       
 self._set_deletion_policy(resource_attributes.get("DeletionPolicy"))81        self._set_update_replace_policy(resource_attributes.get("UpdateReplacePolicy"))82        self._set_condition(resource_attributes.get("Condition"), conditions)83        combined_resource_attributes = dict()84        if self.deletion_policy:85            combined_resource_attributes["DeletionPolicy"] = self.deletion_policy86        if self.update_replace_policy:87            combined_resource_attributes["UpdateReplacePolicy"] = self.update_replace_policy88        # do not set Condition if any of the API resource does not have Condition in it89        if self.conditions and not self.any_api_without_condition:90            combined_resource_attributes["Condition"] = SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME91        return combined_resource_attributes92    def _set_deletion_policy(self, deletion_policy):93        if deletion_policy:94            if self.deletion_policy:95                # update only if new deletion policy is Retain96                if deletion_policy == "Retain":97                    self.deletion_policy = deletion_policy98            else:99                self.deletion_policy = deletion_policy100    def _set_update_replace_policy(self, update_replace_policy):101        if update_replace_policy:102            if self.update_replace_policy:103                # if new value is Retain or104                # new value is retain and current value is Delete then update its value105                if (update_replace_policy == "Retain") or (106                    update_replace_policy == "Snapshot" and self.update_replace_policy == "Delete"107                ):108                    self.update_replace_policy = update_replace_policy109            else:110                self.update_replace_policy = update_replace_policy111    def _set_condition(self, condition, template_conditions):112        # if there are any API without condition, then skip113        if 
self.any_api_without_condition:114            return115        if condition and condition not in self.conditions:116            if template_conditions is None:117                raise InvalidTemplateException(118                    "Can't have condition without having 'Conditions' section in the template"119                )120            if self.conditions:121                self.conditions.add(condition)122                or_condition = make_or_condition(self.conditions)123                template_conditions[SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME] = or_condition124            else:125                self.conditions.add(condition)126                template_conditions[SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME] = condition127        elif condition is None:128            self.any_api_without_condition = True129            if template_conditions and SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME in template_conditions:130                del template_conditions[SharedApiUsagePlan.SHARED_USAGE_PLAN_CONDITION_NAME]131class ApiGenerator(object):132    def __init__(133        self,134        logical_id,135        cache_cluster_enabled,136        cache_cluster_size,137        variables,138        depends_on,139        definition_body,140        definition_uri,141        name,142        stage_name,143        shared_api_usage_plan,144        template_conditions,145        tags=None,146        endpoint_configuration=None,147        method_settings=None,148        binary_media=None,149        minimum_compression_size=None,150        disable_execute_api_endpoint=None,151        cors=None,152        auth=None,153        gateway_responses=None,154        access_log_setting=None,155        canary_setting=None,156        tracing_enabled=None,157        resource_attributes=None,158        passthrough_resource_attributes=None,159        open_api_version=None,160        models=None,161        domain=None,162        fail_on_warnings=None,163        
description=None,164        mode=None,165        api_key_source_type=None,166    ):167        """Constructs an API Generator class that generates API Gateway resources168        :param logical_id: Logical id of the SAM API Resource169        :param cache_cluster_enabled: Whether cache cluster is enabled170        :param cache_cluster_size: Size of the cache cluster171        :param variables: API Gateway Variables172        :param depends_on: Any resources that need to be depended on173        :param definition_body: API definition174        :param definition_uri: URI to API definition175        :param name: Name of the API Gateway resource176        :param stage_name: Name of the Stage177        :param tags: Stage Tags178        :param access_log_setting: Whether to send access logs and where for Stage179        :param canary_setting: Canary Setting for Stage180        :param tracing_enabled: Whether active tracing with X-ray is enabled181        :param resource_attributes: Resource attributes to add to API resources182        :param passthrough_resource_attributes: Attributes such as `Condition` that are added to derived resources183        :param models: Model definitions to be used by API methods184        :param description: Description of the API Gateway resource185        """186        self.logical_id = logical_id187        self.cache_cluster_enabled = cache_cluster_enabled188        self.cache_cluster_size = cache_cluster_size189        self.variables = variables190        self.depends_on = depends_on191        self.definition_body = definition_body192        self.definition_uri = definition_uri193        self.name = name194        self.stage_name = stage_name195        self.tags = tags196        self.endpoint_configuration = endpoint_configuration197        self.method_settings = method_settings198        self.binary_media = binary_media199        self.minimum_compression_size = minimum_compression_size200        self.disable_execute_api_endpoint = 
disable_execute_api_endpoint201        self.cors = cors202        self.auth = auth203        self.gateway_responses = gateway_responses204        self.access_log_setting = access_log_setting205        self.canary_setting = canary_setting206        self.tracing_enabled = tracing_enabled207        self.resource_attributes = resource_attributes208        self.passthrough_resource_attributes = passthrough_resource_attributes209        self.open_api_version = open_api_version210        self.remove_extra_stage = open_api_version211        self.models = models212        self.domain = domain213        self.fail_on_warnings = fail_on_warnings214        self.description = description215        self.shared_api_usage_plan = shared_api_usage_plan216        self.template_conditions = template_conditions217        self.mode = mode218        self.api_key_source_type = api_key_source_type219    def _construct_rest_api(self):220        """Constructs and returns the ApiGateway RestApi.221        :returns: the RestApi to which this SAM Api corresponds222        :rtype: model.apigateway.ApiGatewayRestApi223        """224        rest_api = ApiGatewayRestApi(self.logical_id, depends_on=self.depends_on, attributes=self.resource_attributes)225        # NOTE: For backwards compatibility we need to retain BinaryMediaTypes on the CloudFormation Property226        # Removing this and only setting x-amazon-apigateway-binary-media-types results in other issues.227        rest_api.BinaryMediaTypes = self.binary_media228        rest_api.MinimumCompressionSize = self.minimum_compression_size229        if self.endpoint_configuration:230            self._set_endpoint_configuration(rest_api, self.endpoint_configuration)231        elif not RegionConfiguration.is_apigw_edge_configuration_supported():232            # Since this region does not support EDGE configuration, we explicitly set the endpoint type233            # to Regional which is the only supported config.234            
self._set_endpoint_configuration(rest_api, "REGIONAL")235        if self.definition_uri and self.definition_body:236            raise InvalidResourceException(237                self.logical_id, "Specify either 'DefinitionUri' or 'DefinitionBody' property and not both."238            )239        if self.open_api_version:240            if not SwaggerEditor.safe_compare_regex_with_string(241                SwaggerEditor.get_openapi_versions_supported_regex(), self.open_api_version242            ):243                raise InvalidResourceException(244                    self.logical_id, "The OpenApiVersion value must be of the format '3.0.0'."245                )246        self._add_cors()247        self._add_auth()248        self._add_gateway_responses()249        self._add_binary_media_types()250        self._add_models()251        if self.fail_on_warnings:252            rest_api.FailOnWarnings = self.fail_on_warnings253        if self.disable_execute_api_endpoint is not None:254            self._add_endpoint_extension()255        if self.definition_uri:256            rest_api.BodyS3Location = self._construct_body_s3_dict()257        elif self.definition_body:258            # # Post Process OpenApi Auth Settings259            self.definition_body = self._openapi_postprocess(self.definition_body)260            rest_api.Body = self.definition_body261        if self.name:262            rest_api.Name = self.name263        if self.description:264            rest_api.Description = self.description265        if self.mode:266            rest_api.Mode = self.mode267        if self.api_key_source_type:268            rest_api.ApiKeySourceType = self.api_key_source_type269        return rest_api270    def _add_endpoint_extension(self):271        """Add disableExecuteApiEndpoint if it is set in SAM272        Note:273        If neither DefinitionUri nor DefinitionBody are specified,274        SAM will generate a openapi definition body based on template configuration.275        
https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-resource-api.html#sam-api-definitionbody276        For this reason, we always put DisableExecuteApiEndpoint into openapi object irrespective of origin of DefinitionBody.277        """278        if self.disable_execute_api_endpoint is not None and not self.definition_body:279            raise InvalidResourceException(280                self.logical_id, "DisableExecuteApiEndpoint works only within 'DefinitionBody' property."281            )282        editor = SwaggerEditor(self.definition_body)283        editor.add_disable_execute_api_endpoint_extension(self.disable_execute_api_endpoint)284        self.definition_body = editor.swagger285    def _construct_body_s3_dict(self):286        """Constructs the RestApi's `BodyS3Location property`_, from the SAM Api's DefinitionUri property.287        :returns: a BodyS3Location dict, containing the S3 Bucket, Key, and Version of the Swagger definition288        :rtype: dict289        """290        if isinstance(self.definition_uri, dict):291            if not self.definition_uri.get("Bucket", None) or not self.definition_uri.get("Key", None):292                # DefinitionUri is a dictionary but does not contain Bucket or Key property293                raise InvalidResourceException(294                    self.logical_id, "'DefinitionUri' requires Bucket and Key properties to be specified."295                )296            s3_pointer = self.definition_uri297        else:298            # DefinitionUri is a string299            s3_pointer = parse_s3_uri(self.definition_uri)300            if s3_pointer is None:301                raise InvalidResourceException(302                    self.logical_id,303                    "'DefinitionUri' is not a valid S3 Uri of the form "304                    "'s3://bucket/key' with optional versionId query parameter.",305                )306            if isinstance(self.definition_uri, Py27UniStr):307           
     # self.defintion_uri is a Py27UniStr instance if it is defined in the template308                # we need to preserve the Py27UniStr type309                s3_pointer["Bucket"] = Py27UniStr(s3_pointer["Bucket"])310                s3_pointer["Key"] = Py27UniStr(s3_pointer["Key"])311                if "Version" in s3_pointer:312                    s3_pointer["Version"] = Py27UniStr(s3_pointer["Version"])313        # Construct body_s3 as py27 dict314        body_s3 = Py27Dict()315        body_s3["Bucket"] = s3_pointer["Bucket"]316        body_s3["Key"] = s3_pointer["Key"]317        if "Version" in s3_pointer:318            body_s3["Version"] = s3_pointer["Version"]319        return body_s3320    def _construct_deployment(self, rest_api):321        """Constructs and returns the ApiGateway Deployment.322        :param model.apigateway.ApiGatewayRestApi rest_api: the RestApi for this Deployment323        :returns: the Deployment to which this SAM Api corresponds324        :rtype: model.apigateway.ApiGatewayDeployment325        """326        deployment = ApiGatewayDeployment(327            self.logical_id + "Deployment", attributes=self.passthrough_resource_attributes328        )329        deployment.RestApiId = rest_api.get_runtime_attr("rest_api_id")330        if not self.remove_extra_stage:331            deployment.StageName = "Stage"332        return deployment333    def _construct_stage(self, deployment, swagger, redeploy_restapi_parameters):334        """Constructs and returns the ApiGateway Stage.335        :param model.apigateway.ApiGatewayDeployment deployment: the Deployment for this Stage336        :returns: the Stage to which this SAM Api corresponds337        :rtype: model.apigateway.ApiGatewayStage338        """339        # If StageName is some intrinsic function, then don't prefix the Stage's logical ID340        # This will NOT create duplicates because we allow only ONE stage per API resource341        stage_name_prefix = self.stage_name if 
isinstance(self.stage_name, str) else ""342        if stage_name_prefix.isalnum():343            stage_logical_id = self.logical_id + stage_name_prefix + "Stage"344        else:345            generator = LogicalIdGenerator(self.logical_id + "Stage", stage_name_prefix)346            stage_logical_id = generator.gen()347        stage = ApiGatewayStage(stage_logical_id, attributes=self.passthrough_resource_attributes)348        stage.RestApiId = ref(self.logical_id)349        stage.update_deployment_ref(deployment.logical_id)350        stage.StageName = self.stage_name351        stage.CacheClusterEnabled = self.cache_cluster_enabled352        stage.CacheClusterSize = self.cache_cluster_size353        stage.Variables = self.variables354        stage.MethodSettings = self.method_settings355        stage.AccessLogSetting = self.access_log_setting356        stage.CanarySetting = self.canary_setting357        stage.TracingEnabled = self.tracing_enabled358        if swagger is not None:359            deployment.make_auto_deployable(360                stage, self.remove_extra_stage, swagger, self.domain, redeploy_restapi_parameters361            )362        if self.tags is not None:363            stage.Tags = get_tag_list(self.tags)364        return stage365    def _construct_api_domain(self, rest_api, route53_record_set_groups):366        """367        Constructs and returns the ApiGateway Domain and BasepathMapping368        """369        if self.domain is None:370            return None, None, None371        if self.domain.get("DomainName") is None or self.domain.get("CertificateArn") is None:372            raise InvalidResourceException(373                self.logical_id, "Custom Domains only works if both DomainName and CertificateArn" " are provided."374            )375        self.domain["ApiDomainName"] = "{}{}".format(376            "ApiGatewayDomainName", LogicalIdGenerator("", self.domain.get("DomainName")).gen()377        )378        domain = 
ApiGatewayDomainName(self.domain.get("ApiDomainName"), attributes=self.passthrough_resource_attributes)379        domain.DomainName = self.domain.get("DomainName")380        endpoint = self.domain.get("EndpointConfiguration")381        if endpoint is None:382            endpoint = "REGIONAL"383            self.domain["EndpointConfiguration"] = "REGIONAL"384        elif endpoint not in ["EDGE", "REGIONAL", "PRIVATE"]:385            raise InvalidResourceException(386                self.logical_id,387                "EndpointConfiguration for Custom Domains must be"388                " one of {}.".format(["EDGE", "REGIONAL", "PRIVATE"]),389            )390        if endpoint == "REGIONAL":391            domain.RegionalCertificateArn = self.domain.get("CertificateArn")392        else:393            domain.CertificateArn = self.domain.get("CertificateArn")394        domain.EndpointConfiguration = {"Types": [endpoint]}395        mutual_tls_auth = self.domain.get("MutualTlsAuthentication", None)396        if mutual_tls_auth:397            if isinstance(mutual_tls_auth, dict):398                if not set(mutual_tls_auth.keys()).issubset({"TruststoreUri", "TruststoreVersion"}):399                    invalid_keys = list()400                    for key in mutual_tls_auth.keys():401                        if not key in {"TruststoreUri", "TruststoreVersion"}:402                            invalid_keys.append(key)403                    invalid_keys.sort()404                    raise InvalidResourceException(405                        ",".join(invalid_keys),406                        "Available MutualTlsAuthentication fields are {}.".format(407                            ["TruststoreUri", "TruststoreVersion"]408                        ),409                    )410                domain.MutualTlsAuthentication = {}411                if mutual_tls_auth.get("TruststoreUri", None):412                    domain.MutualTlsAuthentication["TruststoreUri"] = 
mutual_tls_auth["TruststoreUri"]413                if mutual_tls_auth.get("TruststoreVersion", None):414                    domain.MutualTlsAuthentication["TruststoreVersion"] = mutual_tls_auth["TruststoreVersion"]415            else:416                raise InvalidResourceException(417                    mutual_tls_auth,418                    "MutualTlsAuthentication must be a map with at least one of the following fields {}.".format(419                        ["TruststoreUri", "TruststoreVersion"]420                    ),421                )422        if self.domain.get("SecurityPolicy", None):423            domain.SecurityPolicy = self.domain["SecurityPolicy"]424        if self.domain.get("OwnershipVerificationCertificateArn", None):425            domain.OwnershipVerificationCertificateArn = self.domain["OwnershipVerificationCertificateArn"]426        # Create BasepathMappings427        if self.domain.get("BasePath") and isinstance(self.domain.get("BasePath"), str):428            basepaths = [self.domain.get("BasePath")]429        elif self.domain.get("BasePath") and isinstance(self.domain.get("BasePath"), list):430            basepaths = self.domain.get("BasePath")431        else:432            basepaths = None433        basepath_resource_list = []434        if basepaths is None:435            basepath_mapping = ApiGatewayBasePathMapping(436                self.logical_id + "BasePathMapping", attributes=self.passthrough_resource_attributes437            )438            basepath_mapping.DomainName = ref(self.domain.get("ApiDomainName"))439            basepath_mapping.RestApiId = ref(rest_api.logical_id)440            basepath_mapping.Stage = ref(rest_api.logical_id + ".Stage")441            basepath_resource_list.extend([basepath_mapping])442        else:443            for path in basepaths:444                path = "".join(e for e in path if e.isalnum())445                logical_id = "{}{}{}".format(self.logical_id, path, "BasePathMapping")446                
basepath_mapping = ApiGatewayBasePathMapping(447                    logical_id, attributes=self.passthrough_resource_attributes448                )449                basepath_mapping.DomainName = ref(self.domain.get("ApiDomainName"))450                basepath_mapping.RestApiId = ref(rest_api.logical_id)451                basepath_mapping.Stage = ref(rest_api.logical_id + ".Stage")452                basepath_mapping.BasePath = path453                basepath_resource_list.extend([basepath_mapping])454        # Create the Route53 RecordSetGroup resource455        record_set_group = None456        if self.domain.get("Route53") is not None:457            route53 = self.domain.get("Route53")458            if not isinstance(route53, dict):459                raise InvalidResourceException(460                    self.logical_id,461                    "Invalid property type '{}' for Route53. "462                    "Expected a map defines an Amazon Route 53 configuration'.".format(type(route53).__name__),463                )464            if route53.get("HostedZoneId") is None and route53.get("HostedZoneName") is None:465                raise InvalidResourceException(466                    self.logical_id,467                    "HostedZoneId or HostedZoneName is required to enable Route53 support on Custom Domains.",468                )469            logical_id_suffix = LogicalIdGenerator(470                "", route53.get("HostedZoneId") or route53.get("HostedZoneName")471            ).gen()472            logical_id = "RecordSetGroup" + logical_id_suffix473            record_set_group = route53_record_set_groups.get(logical_id)474            if not record_set_group:475                record_set_group = Route53RecordSetGroup(logical_id, attributes=self.passthrough_resource_attributes)476                if "HostedZoneId" in route53:477                    record_set_group.HostedZoneId = route53.get("HostedZoneId")478                if "HostedZoneName" in route53:479          
          record_set_group.HostedZoneName = route53.get("HostedZoneName")480                record_set_group.RecordSets = []481                route53_record_set_groups[logical_id] = record_set_group482            record_set_group.RecordSets += self._construct_record_sets_for_domain(self.domain)483        return domain, basepath_resource_list, record_set_group484    def _construct_record_sets_for_domain(self, domain):485        recordset_list = []486        recordset = {}487        route53 = domain.get("Route53")488        recordset["Name"] = domain.get("DomainName")489        recordset["Type"] = "A"490        recordset["AliasTarget"] = self._construct_alias_target(self.domain)491        recordset_list.extend([recordset])492        recordset_ipv6 = {}493        if route53.get("IpV6") is not None and route53.get("IpV6") is True:494            recordset_ipv6["Name"] = domain.get("DomainName")495            recordset_ipv6["Type"] = "AAAA"496            recordset_ipv6["AliasTarget"] = self._construct_alias_target(self.domain)497            recordset_list.extend([recordset_ipv6])498        return recordset_list499    def _construct_alias_target(self, domain):500        alias_target = {}501        route53 = domain.get("Route53")502        target_health = route53.get("EvaluateTargetHealth")503        if target_health is not None:504            alias_target["EvaluateTargetHealth"] = target_health505        if domain.get("EndpointConfiguration") == "REGIONAL":506            alias_target["HostedZoneId"] = fnGetAtt(self.domain.get("ApiDomainName"), "RegionalHostedZoneId")507            alias_target["DNSName"] = fnGetAtt(self.domain.get("ApiDomainName"), "RegionalDomainName")508        else:509            if route53.get("DistributionDomainName") is None:510                route53["DistributionDomainName"] = fnGetAtt(self.domain.get("ApiDomainName"), "DistributionDomainName")511            alias_target["HostedZoneId"] = "Z2FDTNDATAQYW2"512            alias_target["DNSName"] = 
route53.get("DistributionDomainName")513        return alias_target514    @cw_timer(prefix="Generator", name="Api")515    def to_cloudformation(self, redeploy_restapi_parameters, route53_record_set_groups):516        """Generates CloudFormation resources from a SAM API resource517        :returns: a tuple containing the RestApi, Deployment, and Stage for an empty Api.518        :rtype: tuple519        """520        rest_api = self._construct_rest_api()521        domain, basepath_mapping, route53 = self._construct_api_domain(rest_api, route53_record_set_groups)522        deployment = self._construct_deployment(rest_api)523        swagger = None524        if rest_api.Body is not None:525            swagger = rest_api.Body526        elif rest_api.BodyS3Location is not None:527            swagger = rest_api.BodyS3Location528        stage = self._construct_stage(deployment, swagger, redeploy_restapi_parameters)529        permissions = self._construct_authorizer_lambda_permission()530        usage_plan = self._construct_usage_plan(rest_api_stage=stage)531        return rest_api, deployment, stage, permissions, domain, basepath_mapping, route53, usage_plan532    def _add_cors(self):533        """534        Add CORS configuration to the Swagger file, if necessary535        """536        INVALID_ERROR = "Invalid value for 'Cors' property"537        if not self.cors:538            return539        if self.cors and not self.definition_body:540            raise InvalidResourceException(541                self.logical_id, "Cors works only with inline Swagger specified in 'DefinitionBody' property."542            )543        if isinstance(self.cors, str) or is_intrinsic(self.cors):544            # Just set Origin property. 
Others will be defaults545            properties = CorsProperties(AllowOrigin=self.cors)546        elif isinstance(self.cors, dict):547            # Make sure keys in the dict are recognized548            if not all(key in CorsProperties._fields for key in self.cors.keys()):549                raise InvalidResourceException(self.logical_id, INVALID_ERROR)550            properties = CorsProperties(**self.cors)551        else:552            raise InvalidResourceException(self.logical_id, INVALID_ERROR)553        if not SwaggerEditor.is_valid(self.definition_body):554            raise InvalidResourceException(555                self.logical_id,556                "Unable to add Cors configuration because "557                "'DefinitionBody' does not contain a valid Swagger definition.",558            )559        if properties.AllowCredentials is True and properties.AllowOrigin == _CORS_WILDCARD:560            raise InvalidResourceException(561                self.logical_id,562                "Unable to add Cors configuration because "563                "'AllowCredentials' can not be true when "564                "'AllowOrigin' is \"'*'\" or not set",565            )566        editor = SwaggerEditor(self.definition_body)567        for path in editor.iter_on_path():568            try:569                editor.add_cors(570                    path,571                    properties.AllowOrigin,572                    properties.AllowHeaders,573                    properties.AllowMethods,574                    max_age=properties.MaxAge,575                    allow_credentials=properties.AllowCredentials,576                )577            except InvalidTemplateException as ex:578                raise InvalidResourceException(self.logical_id, ex.message)579        # Assign the Swagger back to template580        self.definition_body = editor.swagger581    def _add_binary_media_types(self):582        """583        Add binary media types to Swagger584        """585        if not 
self.binary_media:586            return587        # We don't raise an error here like we do for similar cases because that would be backwards incompatible588        if self.binary_media and not self.definition_body:589            return590        editor = SwaggerEditor(self.definition_body)591        editor.add_binary_media_types(self.binary_media)592        # Assign the Swagger back to template593        self.definition_body = editor.swagger594    def _add_auth(self):595        """596        Add Auth configuration to the Swagger file, if necessary597        """598        if not self.auth:599            return600        if self.auth and not self.definition_body:601            raise InvalidResourceException(602                self.logical_id, "Auth works only with inline Swagger specified in " "'DefinitionBody' property."603            )604        # Make sure keys in the dict are recognized605        if not all(key in AuthProperties._fields for key in self.auth.keys()):606            raise InvalidResourceException(self.logical_id, "Invalid value for 'Auth' property")607        if not SwaggerEditor.is_valid(self.definition_body):608            raise InvalidResourceException(609                self.logical_id,610                "Unable to add Auth configuration because "611                "'DefinitionBody' does not contain a valid Swagger definition.",612            )613        swagger_editor = SwaggerEditor(self.definition_body)614        auth_properties = AuthProperties(**self.auth)615        authorizers = self._get_authorizers(auth_properties.Authorizers, auth_properties.DefaultAuthorizer)616        if authorizers:617            swagger_editor.add_authorizers_security_definitions(authorizers)618            self._set_default_authorizer(619                swagger_editor,620                authorizers,621                auth_properties.DefaultAuthorizer,622                auth_properties.AddDefaultAuthorizerToCorsPreflight,623                
auth_properties.Authorizers,624            )625        if auth_properties.ApiKeyRequired:626            swagger_editor.add_apikey_security_definition()627            self._set_default_apikey_required(swagger_editor)628        if auth_properties.ResourcePolicy:629            SwaggerEditor.validate_is_dict(630                auth_properties.ResourcePolicy, "ResourcePolicy must be a map (ResourcePolicyStatement)."631            )632            for path in swagger_editor.iter_on_path():633                swagger_editor.add_resource_policy(auth_properties.ResourcePolicy, path, self.stage_name)634            if auth_properties.ResourcePolicy.get("CustomStatements"):635                swagger_editor.add_custom_statements(auth_properties.ResourcePolicy.get("CustomStatements"))636        self.definition_body = self._openapi_postprocess(swagger_editor.swagger)637    def _construct_usage_plan(self, rest_api_stage=None):638        """Constructs and returns the ApiGateway UsagePlan, ApiGateway UsagePlanKey, ApiGateway ApiKey for Auth.639        :param model.apigateway.ApiGatewayStage stage: the stage of rest api640        :returns: UsagePlan, UsagePlanKey, ApiKey for this rest Api641        :rtype: model.apigateway.ApiGatewayUsagePlan, model.apigateway.ApiGatewayUsagePlanKey,642                model.apigateway.ApiGatewayApiKey643        """644        create_usage_plans_accepted_values = ["SHARED", "PER_API", "NONE"]645        if not self.auth:646            return []647        auth_properties = AuthProperties(**self.auth)648        if auth_properties.UsagePlan is None:649            return []650        usage_plan_properties = auth_properties.UsagePlan651        # throws error if UsagePlan is not a dict652        if not isinstance(usage_plan_properties, dict):653            raise InvalidResourceException(self.logical_id, "'UsagePlan' must be a dictionary")654        # throws error if the property invalid/ unsupported for UsagePlan655        if not all(key in 
UsagePlanProperties._fields for key in usage_plan_properties.keys()):656            raise InvalidResourceException(self.logical_id, "Invalid property for 'UsagePlan'")657        create_usage_plan = usage_plan_properties.get("CreateUsagePlan")658        usage_plan = None659        api_key = None660        usage_plan_key = None661        if create_usage_plan is None:662            raise InvalidResourceException(self.logical_id, "'CreateUsagePlan' is a required field for UsagePlan.")663        if create_usage_plan not in create_usage_plans_accepted_values:664            raise InvalidResourceException(665                self.logical_id, "'CreateUsagePlan' accepts one of {}.".format(create_usage_plans_accepted_values)666            )667        if create_usage_plan == "NONE":668            return []669        # create usage plan for this api only670        elif usage_plan_properties.get("CreateUsagePlan") == "PER_API":671            usage_plan_logical_id = self.logical_id + "UsagePlan"672            usage_plan = ApiGatewayUsagePlan(673                logical_id=usage_plan_logical_id,674                depends_on=[self.logical_id],675                attributes=self.passthrough_resource_attributes,676            )677            api_stages = list()678            api_stage = dict()679            api_stage["ApiId"] = ref(self.logical_id)680            api_stage["Stage"] = ref(rest_api_stage.logical_id)681            api_stages.append(api_stage)682            usage_plan.ApiStages = api_stages683            api_key = self._construct_api_key(usage_plan_logical_id, create_usage_plan, rest_api_stage)684            usage_plan_key = self._construct_usage_plan_key(usage_plan_logical_id, create_usage_plan, api_key)685        # create a usage plan for all the Apis686        elif create_usage_plan == "SHARED":687            LOG.info("Creating SHARED usage plan for all the Apis")688            usage_plan_logical_id = "ServerlessUsagePlan"689            if self.logical_id not in 
self.shared_api_usage_plan.depends_on_shared:690                self.shared_api_usage_plan.depends_on_shared.append(self.logical_id)691            usage_plan = ApiGatewayUsagePlan(692                logical_id=usage_plan_logical_id,693                depends_on=self.shared_api_usage_plan.depends_on_shared,694                attributes=self.shared_api_usage_plan.get_combined_resource_attributes(695                    self.passthrough_resource_attributes, self.template_conditions696                ),697            )698            api_stage = dict()699            api_stage["ApiId"] = ref(self.logical_id)700            api_stage["Stage"] = ref(rest_api_stage.logical_id)701            if api_stage not in self.shared_api_usage_plan.api_stages_shared:702                self.shared_api_usage_plan.api_stages_shared.append(api_stage)703            usage_plan.ApiStages = self.shared_api_usage_plan.api_stages_shared704            api_key = self._construct_api_key(usage_plan_logical_id, create_usage_plan, rest_api_stage)705            usage_plan_key = self._construct_usage_plan_key(usage_plan_logical_id, create_usage_plan, api_key)706        if usage_plan_properties.get("UsagePlanName"):707            usage_plan.UsagePlanName = usage_plan_properties.get("UsagePlanName")708        if usage_plan_properties.get("Description"):709            usage_plan.Description = usage_plan_properties.get("Description")710        if usage_plan_properties.get("Quota"):711            usage_plan.Quota = usage_plan_properties.get("Quota")712        if usage_plan_properties.get("Tags"):713            usage_plan.Tags = usage_plan_properties.get("Tags")714        if usage_plan_properties.get("Throttle"):715            usage_plan.Throttle = usage_plan_properties.get("Throttle")716        return usage_plan, api_key, usage_plan_key717    def _construct_api_key(self, usage_plan_logical_id, create_usage_plan, rest_api_stage):718        """719        :param usage_plan_logical_id: String720        :param 
create_usage_plan: String721        :param rest_api_stage: model.apigateway.ApiGatewayStage stage: the stage of rest api722        :return: api_key model.apigateway.ApiGatewayApiKey resource which is created for the given usage plan723        """724        if create_usage_plan == "SHARED":725            # create an api key resource for all the apis726            LOG.info("Creating api key resource for all the Apis from SHARED usage plan")727            api_key_logical_id = "ServerlessApiKey"728            api_key = ApiGatewayApiKey(729                logical_id=api_key_logical_id,730                depends_on=[usage_plan_logical_id],731                attributes=self.shared_api_usage_plan.get_combined_resource_attributes(732                    self.passthrough_resource_attributes, self.template_conditions733                ),734            )735            api_key.Enabled = True736            stage_key = dict()737            stage_key["RestApiId"] = ref(self.logical_id)738            stage_key["StageName"] = ref(rest_api_stage.logical_id)739            if stage_key not in self.shared_api_usage_plan.stage_keys_shared:740                self.shared_api_usage_plan.stage_keys_shared.append(stage_key)741            api_key.StageKeys = self.shared_api_usage_plan.stage_keys_shared742        # for create_usage_plan = "PER_API"743        else:744            # create an api key resource for this api745            api_key_logical_id = self.logical_id + "ApiKey"746            api_key = ApiGatewayApiKey(747                logical_id=api_key_logical_id,748                depends_on=[usage_plan_logical_id],749                attributes=self.passthrough_resource_attributes,750            )751            api_key.Enabled = True752            stage_keys = list()753            stage_key = dict()754            stage_key["RestApiId"] = ref(self.logical_id)755            stage_key["StageName"] = ref(rest_api_stage.logical_id)756            stage_keys.append(stage_key)757            
api_key.StageKeys = stage_keys758        return api_key759    def _construct_usage_plan_key(self, usage_plan_logical_id, create_usage_plan, api_key):760        """761        :param usage_plan_logical_id: String762        :param create_usage_plan: String763        :param api_key: model.apigateway.ApiGatewayApiKey resource764        :return: model.apigateway.ApiGatewayUsagePlanKey resource that contains the mapping between usage plan and api key765        """766        if create_usage_plan == "SHARED":767            # create a mapping between api key and the usage plan768            usage_plan_key_logical_id = "ServerlessUsagePlanKey"769            resource_attributes = self.shared_api_usage_plan.get_combined_resource_attributes(770                self.passthrough_resource_attributes, self.template_conditions771            )772        # for create_usage_plan = "PER_API"773        else:774            # create a mapping between api key and the usage plan775            usage_plan_key_logical_id = self.logical_id + "UsagePlanKey"776            resource_attributes = self.passthrough_resource_attributes777        usage_plan_key = ApiGatewayUsagePlanKey(778            logical_id=usage_plan_key_logical_id,779            depends_on=[api_key.logical_id],780            attributes=resource_attributes,781        )782        usage_plan_key.KeyId = ref(api_key.logical_id)783        usage_plan_key.KeyType = "API_KEY"784        usage_plan_key.UsagePlanId = ref(usage_plan_logical_id)785        return usage_plan_key786    def _add_gateway_responses(self):787        """788        Add Gateway Response configuration to the Swagger file, if necessary789        """790        if not self.gateway_responses:791            return792        if self.gateway_responses and not self.definition_body:793            raise InvalidResourceException(794                self.logical_id,795                "GatewayResponses works only with inline Swagger specified in " "'DefinitionBody' property.",796         
   )797        # Make sure keys in the dict are recognized798        for responses_key, responses_value in self.gateway_responses.items():799            if is_intrinsic(responses_value):800                # TODO: Add intrinsic support for this field.801                raise InvalidResourceException(802                    self.logical_id,803                    "Unable to set GatewayResponses attribute because "804                    "intrinsic functions are not supported for this field.",805                )806            elif not isinstance(responses_value, dict):807                raise InvalidResourceException(808                    self.logical_id,809                    "Invalid property type '{}' for GatewayResponses. "810                    "Expected an object of type 'GatewayResponse'.".format(type(responses_value).__name__),811                )812            for response_key in responses_value.keys():813                if response_key not in GatewayResponseProperties:814                    raise InvalidResourceException(815                        self.logical_id,816                        "Invalid property '{}' in 'GatewayResponses' property '{}'.".format(817                            response_key, responses_key818                        ),819                    )820        if not SwaggerEditor.is_valid(self.definition_body):821            raise InvalidResourceException(822                self.logical_id,823                "Unable to add Auth configuration because "824                "'DefinitionBody' does not contain a valid Swagger definition.",825            )826        swagger_editor = SwaggerEditor(self.definition_body)827        # The dicts below will eventually become part of swagger/openapi definition, thus requires using Py27Dict()828        gateway_responses = Py27Dict()829        for response_type, response in self.gateway_responses.items():830            gateway_responses[response_type] = ApiGatewayResponse(831                
api_logical_id=self.logical_id,832                response_parameters=response.get("ResponseParameters", Py27Dict()),833                response_templates=response.get("ResponseTemplates", Py27Dict()),834                status_code=response.get("StatusCode", None),835            )836        if gateway_responses:837            swagger_editor.add_gateway_responses(gateway_responses)838        # Assign the Swagger back to template839        self.definition_body = swagger_editor.swagger840    def _add_models(self):841        """842        Add Model definitions to the Swagger file, if necessary843        :return:844        """845        if not self.models:846            return847        if self.models and not self.definition_body:848            raise InvalidResourceException(849                self.logical_id, "Models works only with inline Swagger specified in " "'DefinitionBody' property."850            )851        if not SwaggerEditor.is_valid(self.definition_body):852            raise InvalidResourceException(853                self.logical_id,854                "Unable to add Models definitions because "855                "'DefinitionBody' does not contain a valid Swagger definition.",856            )857        if not all(isinstance(model, dict) for model in self.models.values()):858            raise InvalidResourceException(self.logical_id, "Invalid value for 'Models' property")859        swagger_editor = SwaggerEditor(self.definition_body)860        swagger_editor.add_models(self.models)861        # Assign the Swagger back to template862        self.definition_body = self._openapi_postprocess(swagger_editor.swagger)863    def _openapi_postprocess(self, definition_body):864        """865        Convert definitions to openapi 3 in definition body if OpenApiVersion flag is specified.866        If the is swagger defined in the definition body, we treat it as a swagger spec and do not867        make any openapi 3 changes to it868        """869        if 
definition_body.get("swagger") is not None:870            return definition_body871        if definition_body.get("openapi") is not None and self.open_api_version is None:872            self.open_api_version = definition_body.get("openapi")873        if self.open_api_version and SwaggerEditor.safe_compare_regex_with_string(874            SwaggerEditor.get_openapi_version_3_regex(), self.open_api_version875        ):876            if definition_body.get("securityDefinitions"):877                components = definition_body.get("components", Py27Dict())878                # In the previous line, the default value `Py27Dict()` will be only returned only if `components`879                # property is not in definition_body dict, but if it exist, and its value is None, so None will be880                # returned and not the default value. That is why the below line is required.881                components = components if components else Py27Dict()882                components["securitySchemes"] = definition_body["securityDefinitions"]883                definition_body["components"] = components884                del definition_body["securityDefinitions"]885            if definition_body.get("definitions"):886                components = definition_body.get("components", Py27Dict())887                components["schemas"] = definition_body["definitions"]888                definition_body["components"] = components889                del definition_body["definitions"]890            # removes `consumes` and `produces` options for CORS in openapi3 and891            # adds `schema` for the headers in responses for openapi3892            paths = definition_body.get("paths")893            if paths:894                for path, path_item in paths.items():895                    SwaggerEditor.validate_path_item_is_dict(path_item, path)896                    if path_item.get("options"):897                        options = path_item.get("options").copy()898                        
for field, field_val in options.items():899                            # remove unsupported produces and consumes in options for openapi3900                            if field in ["produces", "consumes"]:901                                del definition_body["paths"][path]["options"][field]902                            # add schema for the headers in options section for openapi3903                            if field in ["responses"]:904                                SwaggerEditor.validate_is_dict(905                                    field_val,906                                    "Value of responses in options method for path {} must be a "907                                    "dictionary according to Swagger spec.".format(path),908                                )909                                if field_val.get("200") and field_val.get("200").get("headers"):910                                    headers = field_val["200"]["headers"]911                                    for header, header_val in headers.items():912                                        new_header_val_with_schema = Py27Dict()913                                        new_header_val_with_schema["schema"] = header_val914                                        definition_body["paths"][path]["options"][field]["200"]["headers"][915                                            header916                                        ] = new_header_val_with_schema917        return definition_body918    def _get_authorizers(self, authorizers_config, default_authorizer=None):919        # The dict below will eventually become part of swagger/openapi definition, thus requires using Py27Dict()920        authorizers = Py27Dict()921        if default_authorizer == "AWS_IAM":922            authorizers[default_authorizer] = ApiGatewayAuthorizer(923                api_logical_id=self.logical_id, name=default_authorizer, is_aws_iam_authorizer=True924            )925        if not authorizers_config:926            
if "AWS_IAM" in authorizers:927                return authorizers928            return None929        if not isinstance(authorizers_config, dict):930            raise InvalidResourceException(self.logical_id, "Authorizers must be a dictionary.")931        for authorizer_name, authorizer in authorizers_config.items():932            if not isinstance(authorizer, dict):933                raise InvalidResourceException(934                    self.logical_id, "Authorizer %s must be a dictionary." % (authorizer_name)935                )936            authorizers[authorizer_name] = ApiGatewayAuthorizer(937                api_logical_id=self.logical_id,938                name=authorizer_name,939                user_pool_arn=authorizer.get("UserPoolArn"),940                function_arn=authorizer.get("FunctionArn"),941                identity=authorizer.get("Identity"),942                function_payload_type=authorizer.get("FunctionPayloadType"),943                function_invoke_role=authorizer.get("FunctionInvokeRole"),944                authorization_scopes=authorizer.get("AuthorizationScopes"),945            )946        return authorizers947    def _get_permission(self, authorizer_name, authorizer_lambda_function_arn):948        """Constructs and returns the Lambda Permission resource allowing the Authorizer to invoke the function.949        :returns: the permission resource950        :rtype: model.lambda_.LambdaPermission951        """952        rest_api = ApiGatewayRestApi(self.logical_id, depends_on=self.depends_on, attributes=self.resource_attributes)953        api_id = rest_api.get_runtime_attr("rest_api_id")954        partition = ArnGenerator.get_partition_name()955        resource = "${__ApiId__}/authorizers/*"956        source_arn = fnSub(957            ArnGenerator.generate_arn(partition=partition, service="execute-api", resource=resource),958            {"__ApiId__": api_id},959        )960        lambda_permission = LambdaPermission(961            
self.logical_id + authorizer_name + "AuthorizerPermission", attributes=self.passthrough_resource_attributes962        )963        lambda_permission.Action = "lambda:InvokeFunction"964        lambda_permission.FunctionName = authorizer_lambda_function_arn965        lambda_permission.Principal = "apigateway.amazonaws.com"966        lambda_permission.SourceArn = source_arn967        return lambda_permission968    def _construct_authorizer_lambda_permission(self):969        if not self.auth:970            return []971        auth_properties = AuthProperties(**self.auth)972        authorizers = self._get_authorizers(auth_properties.Authorizers)973        if not authorizers:974            return []975        permissions = []976        for authorizer_name, authorizer in authorizers.items():977            # Construct permissions for Lambda Authorizers only978            if not authorizer.function_arn:979                continue980            permission = self._get_permission(authorizer_name, authorizer.function_arn)981            permissions.append(permission)982        return permissions983    def _set_default_authorizer(984        self, swagger_editor, authorizers, default_authorizer, add_default_auth_to_preflight=True, api_authorizers=None985    ):986        if not default_authorizer:987            return988        if not isinstance(default_authorizer, str):989            raise InvalidResourceException(990                self.logical_id,991                "DefaultAuthorizer is not a string.",992            )993        if not authorizers.get(default_authorizer) and default_authorizer != "AWS_IAM":994            raise InvalidResourceException(995                self.logical_id,996                "Unable to set DefaultAuthorizer because '"997                + default_authorizer998                + "' was not defined in 'Authorizers'.",999            )1000        for path in swagger_editor.iter_on_path():1001            swagger_editor.set_path_default_authorizer(1002        
        path,1003                default_authorizer,1004                authorizers=authorizers,1005                add_default_auth_to_preflight=add_default_auth_to_preflight,1006                api_authorizers=api_authorizers,1007            )1008    def _set_default_apikey_required(self, swagger_editor):1009        for path in swagger_editor.iter_on_path():1010            swagger_editor.set_path_default_apikey_required(path)1011    def _set_endpoint_configuration(self, rest_api, value):1012        """1013        Sets endpoint configuration property of AWS::ApiGateway::RestApi resource1014        :param rest_api: RestApi resource1015        :param string/dict value: Value to be set1016        """1017        if isinstance(value, dict) and value.get("Type"):1018            rest_api.Parameters = {"endpointConfigurationTypes": value.get("Type")}1019            rest_api.EndpointConfiguration = {"Types": [value.get("Type")]}1020            if "VPCEndpointIds" in value.keys():1021                rest_api.EndpointConfiguration["VpcEndpointIds"] = value.get("VPCEndpointIds")1022        else:1023            rest_api.EndpointConfiguration = {"Types": [value]}...apigateway.py
Source:apigateway.py  
2import jsbeautifier3def create_api_key(client, name):4    create_key = client.create_api_key(name=name)5    return create_key6def create_usage_plan(client, name):7    create_usage_plan = client.create_usage_plan(name=name)8    return create_usage_plan9def create_api(client, name):10    create_api = client.create_rest_api(name=name)11    return create_api12    13def list_api_keys(client):14    keys = [x['id'] for x in client.get_api_keys()['items']]15    return keys16def list_apis(client):17    list_apis = client.get_rest_apis()18    return list_apis19def delete_usage_plans(client):20    responses = []21    for plan in client.get_usage_plans()['items']:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
