Best Python code snippet using localstack_python
api_gateway_domain.py
Source:api_gateway_domain.py  
...127    path_mappings = module.params.get('domain_mappings', [])128    domain_name = module.params.get('domain_name')129    result = {'domain': {}, 'path_mappings': []}130    try:131        result['domain'] = create_domain_name(132            module,133            client,134            domain_name,135            module.params.get('certificate_arn'),136            module.params.get('endpoint_type'),137            module.params.get('security_policy')138        )139        for mapping in path_mappings:140            base_path = mapping.get('base_path', '')141            rest_api_id = mapping.get('rest_api_id')142            stage = mapping.get('stage')143            if rest_api_id is None or stage is None:144                module.fail_json('Every domain mapping needs a rest_api_id and stage name')145            result['path_mappings'].append(add_domain_mapping(client, domain_name, base_path, rest_api_id, stage))146    except (ClientError, BotoCoreError, EndpointConnectionError) as e:147        module.fail_json_aws(e, msg="creating API GW domain")148    return camel_dict_to_snake_dict(result)149def update_domain(module, client, existing_domain):150    domain_name = module.params.get('domain_name')151    result = existing_domain152    result['updated'] = False153    domain = existing_domain.get('domain')154    # Compare only relevant set of domain arguments.155    # As get_domain_name gathers all kind of state information that can't be set anyways.156    # Also this module doesn't support custom TLS cert setup params as they are kind of deprecated already and would increase complexity.157    existing_domain_settings = {158        'certificate_arn': domain.get('certificate_arn'),159        'security_policy': domain.get('security_policy'),160        'endpoint_type': domain.get('endpoint_configuration').get('types')[0]161    }162    specified_domain_settings = {163        'certificate_arn': module.params.get('certificate_arn'),164        'security_policy': module.params.get('security_policy'),165        'endpoint_type': module.params.get('endpoint_type')166    }167    if specified_domain_settings != existing_domain_settings:168        try:169            result['domain'] = update_domain_name(client, domain_name, **snake_dict_to_camel_dict(specified_domain_settings))170            result['updated'] = True171        except (ClientError, BotoCoreError, EndpointConnectionError) as e:172            module.fail_json_aws(e, msg="updating API GW domain")173    existing_mappings = copy.deepcopy(existing_domain.get('path_mappings', []))174    # Cleanout `base_path: "(none)"` elements from dicts as those won't match with specified mappings175    for mapping in existing_mappings:176        if mapping.get('base_path', 'missing') == '(none)':177            mapping.pop('base_path')178    specified_mappings = copy.deepcopy(module.params.get('domain_mappings', []))179    # Cleanout `base_path: ""` elements from dicts as those won't match with existing mappings180    for mapping in specified_mappings:181        if mapping.get('base_path', 'missing') == '':182            mapping.pop('base_path')183    if specified_mappings != existing_mappings:184        try:185            # When lists missmatch delete all existing mappings before adding new ones as specified186            for mapping in existing_domain.get('path_mappings', []):187                delete_domain_mapping(client, domain_name, mapping['base_path'])188            for mapping in module.params.get('domain_mappings', []):189                result['path_mappings'] = add_domain_mapping(190                    client, domain_name, mapping.get('base_path', ''), mapping.get('rest_api_id'), mapping.get('stage')191                )192                result['updated'] = True193        except (ClientError, BotoCoreError, EndpointConnectionError) as e:194            module.fail_json_aws(e, msg="updating API GW domain mapping")195    return camel_dict_to_snake_dict(result)196def delete_domain(module, client):197    domain_name = module.params.get('domain_name')198    try:199        result = delete_domain_name(client, domain_name)200    except (ClientError, BotoCoreError, EndpointConnectionError) as e:201        module.fail_json_aws(e, msg="deleting API GW domain")202    return camel_dict_to_snake_dict(result)203retry_params = {"delay": 5, "backoff": 1.2}204@AWSRetry.jittered_backoff(**retry_params)205def get_domain_name(client, domain_name):206    return client.get_domain_name(domainName=domain_name)207@AWSRetry.jittered_backoff(**retry_params)208def get_domain_mappings(client, domain_name):209    return client.get_base_path_mappings(domainName=domain_name, limit=200).get('items', [])210@AWSRetry.jittered_backoff(**retry_params)211def create_domain_name(module, client, domain_name, certificate_arn, endpoint_type, security_policy):212    endpoint_configuration = {'types': [endpoint_type]}213    if endpoint_type == 'EDGE':214        return client.create_domain_name(215            domainName=domain_name,216            certificateArn=certificate_arn,217            endpointConfiguration=endpoint_configuration,218            securityPolicy=security_policy219        )220    else:221        # Use regionalCertificateArn for regional domain deploys222        return client.create_domain_name(223            domainName=domain_name,224            regionalCertificateArn=certificate_arn,225            endpointConfiguration=endpoint_configuration,226            securityPolicy=security_policy227        )228@AWSRetry.jittered_backoff(**retry_params)229def add_domain_mapping(client, domain_name, base_path, rest_api_id, stage):230    return client.create_base_path_mapping(domainName=domain_name, basePath=base_path, restApiId=rest_api_id, stage=stage)231@AWSRetry.jittered_backoff(**retry_params)232def update_domain_name(client, domain_name, **kwargs):233    patch_operations = []234    for key, value in kwargs.items():235        path = "/" + key236        if key == "endpointType":...lambda_function.py
Source:lambda_function.py  
...54                "TruststoreVersion": cdef.get("truststore_version")55            })56            domain_name_arn = gen_apigateway_arn(domain_name, region)57            get_domain_name(prev_state, domain_name, desired_config, desired_tls_config, tags, region)58            create_domain_name(domain_name, desired_config, desired_tls_config, tags, region)59            update_domain_name(domain_name, desired_config, desired_tls_config, region)60            remove_tags(domain_name_arn)61            add_tags(domain_name_arn)62        #We call this one no matter what63        remove_domain_name()64        return eh.finish()65    except Exception as e:66        msg = traceback.format_exc()67        print(msg)68        eh.add_log("Unexpected Error", {"error": str(e)}, is_error=True)69        eh.declare_return(200, 0, error_code=str(e))70        return eh.finish()71def form_domain(subdomain, base_domain):72    if subdomain and base_domain:73        return f"{subdomain}.{base_domain}"74    else:75        return None76@ext(handler=eh, op="get_acm_cert")77def get_acm_cert(domain_name, region):78    cursor = 'none'79    certs = []80    while cursor:81        try:82            payload = remove_none_attributes({83                "CertificateStatuses": ["ISSUED"],84                "NextToken": cursor if cursor != 'none' else None85            })86            cert_response = acm.list_certificates(**payload)87            print(f"cert_response = {cert_response}")88            certs.extend(cert_response.get("CertificateSummaryList", []))89            cursor = cert_response.get("nextToken")90        except ClientError as e:91            handle_common_errors(e, eh, "List Certificates Failed", 0)92    93    # print(certs)94    print(list(filter(lambda x: domain_name.endswith(x["DomainName"].replace("*", "")), certs)))95    sorted_matching_certs = list(filter(lambda x: domain_name.endswith(x["DomainName"].replace("*", "")), certs))96    sorted_matching_certs.sort(key=lambda x:-len(x['DomainName']))97    print(f"sorted_matching_certs = {sorted_matching_certs}")98    if not sorted_matching_certs:99        eh.perm_error("No Matching ACM Certificate Found, Cannot Create API Custom Domain")100        eh.add_log("No Matching ACM Certificates", {"all_certs": certs}, is_error=True)101        return 0102    eh.add_op("get_domain_name")103    certificate_arn = sorted_matching_certs[0]['CertificateArn']104    certificate_domain_name = sorted_matching_certs[0]['DomainName']105    eh.add_props({"certificate_arn": certificate_arn,106        "certificate_domain_name": certificate_domain_name})107    eh.add_links({"ACM Certificate": gen_certificate_link(certificate_arn, region)})108@ext(handler=eh, op="get_domain_name")109def get_domain_name(prev_state, domain_name, desired_config, desired_tls_config, tags, region):110    if prev_state and prev_state.get("props", {}).get("name"):111        old_domain_name = prev_state["props"]["name"]112        if old_domain_name and (old_domain_name != domain_name):113            eh.add_op("remove_old", {"name": old_domain_name, "create_and_remove": True})114    try:115        response = v2.get_domain_name(DomainName=domain_name)116        print(f'Selection Expression = {response.get("ApiMappingSelectionExpression")}')117        print(f"response = {response}")118        eh.add_log("Got Domain Name", response)119        config = response.get("DomainNameConfigurations")[0]120        tls_config = response.get("MutualTlsAuthentication")121        desired_config['CertificateArn'] = eh.props['certificate_arn']122        match = True123        for k, v in desired_config.items():124            if v != config[k]:125                eh.add_op("update_domain_name")126                match = False127        if match:128            for k,v in desired_tls_config.items():129                if v != tls_config[k]:130                    eh.add_op("update_domain_name")131                    match = False132        if match:133            eh.add_props({134                "hosted_zone_id": config.get("HostedZoneId"),135                "api_gateway_domain_name": config.get("ApiGatewayDomainName")136            })137            eh.add_links({"AWS Custom Domain Name": gen_custom_domain_link(domain_name, region),138            "URL": gen_domain_url(domain_name)})139            eh.add_log("Domain Update Unncessary", {"config": config, "desired_config": desired_config})140        current_tags = response.get("Tags") or {}141        if tags != current_tags:142            remove_tags = [k for k in current_tags.keys() if k not in tags]143            add_tags = {k:v for k,v in tags.items() if k not in current_tags.keys()}144            if remove_tags:145                eh.add_op("remove_tags", remove_tags)146            if add_tags:147                eh.add_op("add_tags", add_tags)148    except ClientError as e:149        if e.response['Error']['Code'] == "NotFoundException":150            eh.add_op("create_domain_name")151            eh.add_log("Domain Name Does Not Exist", {"domain_name": domain_name})152        else:153            handle_common_errors(e, eh, "Get Domain Name Failed", 10)154    155@ext(handler=eh, op="create_domain_name")156def create_domain_name(domain_name, desired_config, desired_tls_config, tags, region):157    try:158        params = remove_none_attributes({159            "DomainName": domain_name,160            "DomainNameConfigurations": [desired_config],161            "MutualTlsAuthentication": desired_tls_config or None,162            "Tags": tags or None163        })164        response = v2.create_domain_name(**params)165        print(f"create response {response}")166        eh.add_log("Created Domain Name", response)167        config = response['DomainNameConfigurations'][0]168        eh.add_props({169            "hosted_zone_id": config.get("HostedZoneId"),170            "api_gateway_domain_name": config.get("ApiGatewayDomainName")171        })172        eh.add_links({173            "AWS Custom Domain Name": gen_custom_domain_link(domain_name, region),174            "URL": gen_domain_url(domain_name)175        })176    except ClientError as e:177        handle_common_errors(e, eh, "Create Failure", 35, 178            ["NotFoundException", "BadRequestException", "AccessDeniedException"]...test_service_gateway.py
Source:test_service_gateway.py  
1from datetime import datetime2from botocore.stub import Stubber3from plugin.infrastructure.resource.aws.services.gateway import ApiGatewayService4import boto35api_gateway = boto3.client("apigateway", region_name="us-east-1")6stubber = Stubber(api_gateway)7class TestGatewayServicer(ApiGatewayService):8    __test__ = False9    def __init__(self):10        self.api_gateway = api_gateway11def test_gateway_init():12    service = ApiGatewayService("us-east-1")13    assert str(type(service.api_gateway)) == str(type(api_gateway))14def test_create_custom_domain(capsys):15    stubber.add_response(16            "create_domain_name",17            {18                "domainName": "domain"19                })20    stubber.activate()21    gateway_servicer = TestGatewayServicer()22    gateway_servicer.create_custom_domain("domain", "cert", "_type")23    captured = capsys.readouterr()24    assert captured.out == "domain created\n"25def test_create_custom_domain_error(caplog):26    stubber.add_client_error("create_domain_name", service_error_code="ClientError")27    stubber.activate()28    gateway_servicer = TestGatewayServicer()29    gateway_servicer.create_custom_domain("domain", "cert", "_type")30    assert "ClientError" in caplog.records[0].message31def test_not_exists_api_gateway():32    response = {33            'position': 'string',34            'items': [35                {36                    'id': 'string',37                    'name': 'not-found',38                    'description': 'string',39                    'createdDate': datetime(2015, 1, 1),40                    'version': 'string',41                    'warnings': [42                        'string',43                        ],44                    'binaryMediaTypes': [45                        'string',46                        ],47                    'minimumCompressionSize': 123,48                    'apiKeySource': 'HEADER',49                    'endpointConfiguration': {50                        'types': [ 'REGIONAL' ],51                        'vpcEndpointIds': [ 'string' ]52                        },53                    'policy': 'string',54                    'tags': {55                        'string': 'string'56                        },57                    'disableExecuteApiEndpoint': True58                    },59                ]60            }61    stubber.add_response("get_rest_apis", response)62    stubber.activate()63    gateway_servicer = TestGatewayServicer()64    res = gateway_servicer.not_exists_api_gateway("gw-api")65    assert res is True66def test_not_exists_api_gateway_exists():67    response = {68            'position': 'string',69            'items': [70                {71                    'id': 'string',72                    'name': 'gw-api',73                    'description': 'string',74                    'createdDate': datetime(2015, 1, 1),75                    'version': 'string',76                    'warnings': [77                        'string',78                        ],79                    'binaryMediaTypes': [80                        'string',81                        ],82                    'minimumCompressionSize': 123,83                    'apiKeySource': 'HEADER',84                    'endpointConfiguration': {85                        'types': [ 'REGIONAL' ],86                        'vpcEndpointIds': [ 'string' ]87                        },88                    'policy': 'string',89                    'tags': {90                        'string': 'string'91                        },92                    'disableExecuteApiEndpoint': True93                    },94                ]95            }96    stubber.add_response("get_rest_apis", response)97    stubber.activate()98    gateway_servicer = TestGatewayServicer()99    res = gateway_servicer.not_exists_api_gateway("gw-api")100    assert res is False101def test_not_exists_api_gateway_error(caplog):102    stubber.add_client_error("get_rest_apis", service_error_code="ClientError")103    stubber.activate()104    gateway_servicer = TestGatewayServicer()105    gateway_servicer.not_exists_api_gateway("gw")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
