How to use canonical_resource_type method in localstack

Best Python code snippet using localstack_python

template_deployer.py

Source:template_deployer.py Github

copy

Full Screen

...79def get_deployment_config(res_type):80 result = RESOURCE_TO_FUNCTION.get(res_type)81 if result is not None:82 return result83 canonical_type = canonical_resource_type(res_type)84 resource_class = RESOURCE_MODELS.get(canonical_type)85 if resource_class:86 return resource_class.get_deploy_templates()87def get_resource_type(resource):88 res_type = resource.get("ResourceType") or resource.get("Type") or ""89 parts = res_type.split("::", 1)90 if len(parts) == 1:91 return parts[0]92 return parts[1]93def get_service_name(resource):94 res_type = resource.get("Type", resource.get("ResourceType", ""))95 parts = res_type.split("::")96 if len(parts) == 1:97 return None98 if res_type.endswith("Cognito::UserPool"):99 return "cognito-idp"100 if parts[-2] == "Cognito":101 return "cognito-idp"102 if parts[-2] == "Elasticsearch":103 return "es"104 if parts[-2] == "KinesisFirehose":105 return "firehose"106 if parts[-2] == "ResourceGroups":107 return "resource-groups"108 if parts[-2] == "CertificateManager":109 return "acm"110 return parts[1].lower()111def get_resource_name(resource):112 properties = resource.get("Properties") or {}113 name = properties.get("Name")114 if name:115 return name116 # try to extract name via resource class117 res_type = canonical_resource_type(get_resource_type(resource))118 model_class = RESOURCE_MODELS.get(res_type)119 if model_class:120 instance = model_class(resource)121 name = instance.get_resource_name()122 if not name:123 LOG.debug('Unable to extract name for resource type "%s"' % res_type)124 return name125def get_client(resource, func_config):126 resource_type = get_resource_type(resource)127 service = get_service_name(resource)128 resource_config = get_deployment_config(resource_type)129 if resource_config is None:130 raise Exception(131 "CloudFormation deployment for resource type %s not yet implemented" % resource_type132 )133 try:134 if func_config.get("boto_client") == "resource":135 return aws_stack.connect_to_resource(service)136 return aws_stack.connect_to_service(service)137 except Exception as e:138 LOG.warning('Unable to get client for "%s" API, skipping deployment: %s' % (service, e))139 return None140def describe_stack_resource(stack_name, logical_resource_id):141 client = aws_stack.connect_to_service("cloudformation")142 try:143 result = client.describe_stack_resource(144 StackName=stack_name, LogicalResourceId=logical_resource_id145 )146 return result["StackResourceDetail"]147 except Exception as e:148 LOG.warning(149 'Unable to get details for resource "%s" in CloudFormation stack "%s": %s'150 % (logical_resource_id, stack_name, e)151 )152def retrieve_resource_details(resource_id, resource_status, resources, stack_name):153 resource = resources.get(resource_id)154 resource_id = resource_status.get("PhysicalResourceId") or resource_id155 if not resource:156 resource = {}157 resource_type = get_resource_type(resource)158 resource_props = resource.get("Properties")159 if resource_props is None:160 raise Exception(161 'Unable to find properties for resource "%s": %s %s'162 % (resource_id, resource, resources)163 )164 try:165 # convert resource props to resource entity166 instance = get_resource_model_instance(resource_id, resources)167 if instance:168 state = instance.fetch_and_update_state(stack_name=stack_name, resources=resources)169 return state170 # special case for stack parameters171 if resource_type == "Parameter":172 return resource_props173 # fallback: try accessing stack.moto_resource_statuses174 stack = find_stack(stack_name)175 moto_resource = stack.moto_resource_statuses.get(resource_id)176 if moto_resource:177 return moto_resource178 # if is_deployable_resource(resource):179 LOG.warning(180 "Unexpected resource type %s when resolving references of resource %s: %s"181 % (resource_type, resource_id, resource)182 )183 except DependencyNotYetSatisfied:184 return185 except Exception as e:186 check_not_found_exception(e, resource_type, resource, resource_status)187 return None188def check_not_found_exception(e, resource_type, resource, resource_status=None):189 # we expect this to be a "not found" exception190 markers = [191 "NoSuchBucket",192 "ResourceNotFound",193 "NoSuchEntity",194 "NotFoundException",195 "404",196 "not found",197 "not exist",198 ]199 if not list(filter(lambda marker, e=e: marker in str(e), markers)):200 LOG.warning(201 "Unexpected error retrieving details for resource type %s: Exception: %s - %s - status: %s"202 % (resource_type, e, resource, resource_status)203 )204 return False205 return True206def extract_resource_attribute(207 resource_type,208 resource_state,209 attribute,210 resource_id=None,211 resource=None,212 resources=None,213 stack_name=None,214):215 LOG.debug("Extract resource attribute: %s %s" % (resource_type, attribute))216 is_ref_attribute = attribute in ["PhysicalResourceId", "Ref"]217 is_ref_attr_or_arn = is_ref_attribute or attribute == "Arn"218 resource = resource or {}219 if not resource and resources:220 resource = resources[resource_id]221 if not resource_state:222 resource_state = retrieve_resource_details(resource_id, {}, resources, stack_name)223 if not resource_state:224 raise DependencyNotYetSatisfied(225 resource_ids=resource_id,226 message='Unable to fetch details for resource "%s" (attribute "%s")'227 % (resource_id, attribute),228 )229 if isinstance(resource_state, MotoCloudFormationModel):230 if is_ref_attribute:231 res_phys_id = getattr(resource_state, "physical_resource_id", None)232 get_res_phys_id = getattr(resource_state, "get_physical_resource_id", None)233 if not res_phys_id and get_res_phys_id:234 res_phys_id = get_res_phys_id(attribute)235 if res_phys_id:236 return res_phys_id237 if hasattr(resource_state, "get_cfn_attribute"):238 try:239 return resource_state.get_cfn_attribute(attribute)240 except Exception:241 pass242 raise Exception(243 'Unable to extract attribute "%s" from "%s" model class %s'244 % (attribute, resource_type, type(resource_state))245 )246 # extract resource specific attributes247 # TODO: remove the code below - move into resource model classes!248 resource_props = resource.get("Properties", {})249 if resource_type == "Parameter":250 result = None251 param_value = resource_props.get(252 "Value",253 resource.get("Value", resource_props.get("Properties", {}).get("Value")),254 )255 if is_ref_attr_or_arn:256 result = param_value257 elif isinstance(param_value, dict):258 result = param_value.get(attribute)259 if result is not None:260 return result261 return ""262 elif resource_type == "Lambda::Function":263 func_configs = resource_state.get("Configuration") or {}264 if is_ref_attr_or_arn:265 func_arn = func_configs.get("FunctionArn")266 if func_arn:267 return resolve_refs_recursively(stack_name, func_arn, resources)268 func_name = resolve_refs_recursively(269 stack_name, func_configs.get("FunctionName"), resources270 )271 return aws_stack.lambda_function_arn(func_name)272 else:273 return func_configs.get(attribute)274 elif resource_type == "Lambda::Version":275 if resource_state.get("Version"):276 return "%s:%s" % (277 resource_state.get("FunctionArn"),278 resource_state.get("Version").split(":")[-1],279 )280 elif resource_type == "DynamoDB::Table":281 actual_attribute = "LatestStreamArn" if attribute == "StreamArn" else attribute282 value = resource_state.get("Table", {}).get(actual_attribute)283 if value:284 return value285 elif resource_type == "ApiGateway::RestApi":286 if is_ref_attribute:287 result = resource_state.get("id")288 if result:289 return result290 if attribute == "RootResourceId":291 api_id = resource_state["id"]292 resources = aws_stack.connect_to_service("apigateway").get_resources(restApiId=api_id)[293 "items"294 ]295 for res in resources:296 if res["path"] == "/" and not res.get("parentId"):297 return res["id"]298 elif resource_type == "ApiGateway::Resource":299 if is_ref_attribute:300 return resource_state.get("id")301 elif resource_type == "ApiGateway::Deployment":302 if is_ref_attribute:303 return resource_state.get("id")304 elif resource_type == "S3::Bucket":305 if attribute == "WebsiteURL":306 bucket_name = resource_props.get("BucketName")307 return f"http://{bucket_name}.{S3_STATIC_WEBSITE_HOSTNAME}"308 if is_ref_attr_or_arn:309 bucket_name = resource_props.get("BucketName")310 bucket_name = resolve_refs_recursively(stack_name, bucket_name, resources)311 if attribute == "Arn":312 return aws_stack.s3_bucket_arn(bucket_name)313 return bucket_name314 elif resource_type == "Elasticsearch::Domain":315 if attribute == "DomainEndpoint":316 domain_status = resource_state.get("DomainStatus", {})317 result = domain_status.get("Endpoint")318 if result:319 return result320 if attribute in ["Arn", "DomainArn"]:321 domain_name = resource_props.get("DomainName") or resource_state.get("DomainName")322 return aws_stack.es_domain_arn(domain_name)323 elif resource_type == "StepFunctions::StateMachine":324 if is_ref_attr_or_arn:325 return resource_state["stateMachineArn"]326 elif resource_type == "SNS::Topic":327 if is_ref_attribute and resource_state.get("TopicArn"):328 topic_arn = resource_state.get("TopicArn")329 return resolve_refs_recursively(stack_name, topic_arn, resources)330 elif resource_type == "SQS::Queue":331 if is_ref_attr_or_arn:332 if attribute == "Arn" and resource_state.get("QueueArn"):333 return resolve_refs_recursively(334 stack_name, resource_state.get("QueueArn"), resources335 )336 return aws_stack.get_sqs_queue_url(resource_props.get("QueueName"))337 attribute_lower = common.first_char_to_lower(attribute)338 result = resource_state.get(attribute) or resource_state.get(attribute_lower)339 if result is None and isinstance(resource, dict):340 result = resource_props.get(attribute) or resource_props.get(attribute_lower)341 if result is None:342 result = get_attr_from_model_instance(343 resource,344 attribute,345 resource_type=resource_type,346 resource_id=resource_id,347 )348 if is_ref_attribute:349 for attr in ["Id", "PhysicalResourceId", "Ref"]:350 if result is None:351 for obj in [resource_state, resource]:352 result = result or obj.get(attr)353 return result354def canonical_resource_type(resource_type):355 if "::" in resource_type and not resource_type.startswith("AWS::"):356 resource_type = "AWS::%s" % resource_type357 return resource_type358def get_attr_from_model_instance(resource, attribute, resource_type, resource_id=None):359 resource_type = canonical_resource_type(resource_type)360 model_class = RESOURCE_MODELS.get(resource_type)361 if not model_class:362 if resource_type not in ["AWS::Parameter", "Parameter"]:363 LOG.debug('Unable to find model class for resource type "%s"' % resource_type)364 return365 try:366 inst = model_class(resource_name=resource_id, resource_json=resource)367 return inst.get_cfn_attribute(attribute)368 except Exception:369 pass370def resolve_ref(stack_name, ref, resources, attribute):371 if ref == "AWS::Region":372 return aws_stack.get_region()373 if ref == "AWS::Partition":374 return "aws"375 if ref == "AWS::StackName":376 return stack_name377 if ref == "AWS::StackId":378 # TODO return proper stack id!379 return stack_name380 if ref == "AWS::AccountId":381 return TEST_AWS_ACCOUNT_ID382 if ref == "AWS::NoValue":383 return PLACEHOLDER_AWS_NO_VALUE384 if ref == "AWS::NotificationARNs":385 # TODO!386 return {}387 if ref == "AWS::URLSuffix":388 return AWS_URL_SUFFIX389 is_ref_attribute = attribute in ["Ref", "PhysicalResourceId", "Arn"]390 if is_ref_attribute:391 # extract the Properties here, as we only want to recurse over the resource props...392 resource_props = resources.get(ref, {}).get("Properties")393 resolve_refs_recursively(stack_name, resource_props, resources)394 return determine_resource_physical_id(395 resource_id=ref,396 resources=resources,397 attribute=attribute,398 stack_name=stack_name,399 )400 if resources.get(ref):401 if isinstance(resources[ref].get(attribute), (str, int, float, bool, dict)):402 return resources[ref][attribute]403 # fetch resource details404 resource_new = retrieve_resource_details(ref, {}, resources, stack_name)405 if not resource_new:406 raise DependencyNotYetSatisfied(407 resource_ids=ref,408 message='Unable to fetch details for resource "%s" (resolving attribute "%s")'409 % (ref, attribute),410 )411 resource = resources.get(ref)412 resource_type = get_resource_type(resource)413 result = extract_resource_attribute(414 resource_type,415 resource_new,416 attribute,417 resource_id=ref,418 resource=resource,419 resources=resources,420 stack_name=stack_name,421 )422 if result is None:423 LOG.warning(424 'Unable to extract reference attribute "%s" from resource: %s %s'425 % (attribute, resource_new, resource)426 )427 return result428# Using a @prevent_stack_overflow decorator here to avoid infinite recursion429# in case we load stack exports that have circular dependencies (see issue 3438)430# TODO: Potentially think about a better approach in the future431@prevent_stack_overflow(match_parameters=True)432def resolve_refs_recursively(stack_name, value, resources):433 if isinstance(value, dict):434 keys_list = list(value.keys())435 stripped_fn_lower = keys_list[0].lower().split("::")[-1] if len(keys_list) == 1 else None436 # process special operators437 if keys_list == ["Ref"]:438 ref = resolve_ref(stack_name, value["Ref"], resources, attribute="Ref")439 if ref is None:440 msg = 'Unable to resolve Ref for resource "%s" (yet)' % value["Ref"]441 LOG.debug("%s - %s" % (msg, resources.get(value["Ref"]) or set(resources.keys())))442 raise DependencyNotYetSatisfied(resource_ids=value["Ref"], message=msg)443 ref = resolve_refs_recursively(stack_name, ref, resources)444 return ref445 if stripped_fn_lower == "getatt":446 attr_ref = value[keys_list[0]]447 attr_ref = attr_ref.split(".") if isinstance(attr_ref, str) else attr_ref448 return resolve_ref(stack_name, attr_ref[0], resources, attribute=attr_ref[1])449 if stripped_fn_lower == "join":450 join_values = value[keys_list[0]][1]451 join_values = [resolve_refs_recursively(stack_name, v, resources) for v in join_values]452 none_values = [v for v in join_values if v is None]453 if none_values:454 raise Exception(455 "Cannot resolve CF fn::Join %s due to null values: %s" % (value, join_values)456 )457 return value[keys_list[0]][0].join([str(v) for v in join_values])458 if stripped_fn_lower == "sub":459 item_to_sub = value[keys_list[0]]460 attr_refs = dict([(r, {"Ref": r}) for r in STATIC_REFS])461 if not isinstance(item_to_sub, list):462 item_to_sub = [item_to_sub, {}]463 result = item_to_sub[0]464 item_to_sub[1].update(attr_refs)465 for key, val in item_to_sub[1].items():466 val = resolve_refs_recursively(stack_name, val, resources)467 result = result.replace("${%s}" % key, val)468 # resolve placeholders469 result = resolve_placeholders_in_string(470 result, stack_name=stack_name, resources=resources471 )472 return result473 if stripped_fn_lower == "findinmap":474 attr = resolve_refs_recursively(stack_name, value[keys_list[0]][1], resources)475 result = resolve_ref(stack_name, value[keys_list[0]][0], resources, attribute=attr)476 if not result:477 raise Exception(478 "Cannot resolve fn::FindInMap: %s %s"479 % (value[keys_list[0]], list(resources.keys()))480 )481 key = value[keys_list[0]][2]482 if not isinstance(key, str):483 key = resolve_refs_recursively(stack_name, key, resources)484 return result.get(key)485 if stripped_fn_lower == "importvalue":486 import_value_key = resolve_refs_recursively(stack_name, value[keys_list[0]], resources)487 stack = find_stack(stack_name)488 stack_export = stack.exports_map.get(import_value_key) or {}489 if not stack_export.get("Value"):490 LOG.info(491 'Unable to find export "%s" in stack "%s", existing export names: %s'492 % (import_value_key, stack_name, list(stack.exports_map.keys()))493 )494 return None495 return stack_export["Value"]496 if stripped_fn_lower == "if":497 condition, option1, option2 = value[keys_list[0]]498 condition = evaluate_condition(stack_name, condition, resources)499 return resolve_refs_recursively(500 stack_name, option1 if condition else option2, resources501 )502 if stripped_fn_lower == "condition":503 result = evaluate_condition(stack_name, value[keys_list[0]], resources)504 return result505 if stripped_fn_lower == "not":506 condition = value[keys_list[0]][0]507 condition = resolve_refs_recursively(stack_name, condition, resources)508 return not condition509 if stripped_fn_lower in ["and", "or"]:510 conditions = value[keys_list[0]]511 results = [resolve_refs_recursively(stack_name, cond, resources) for cond in conditions]512 result = all(results) if stripped_fn_lower == "and" else any(results)513 return result514 if stripped_fn_lower == "equals":515 operand1, operand2 = value[keys_list[0]]516 operand1 = resolve_refs_recursively(stack_name, operand1, resources)517 operand2 = resolve_refs_recursively(stack_name, operand2, resources)518 return str(operand1) == str(operand2)519 if stripped_fn_lower == "select":520 index, values = value[keys_list[0]]521 index = resolve_refs_recursively(stack_name, index, resources)522 values = resolve_refs_recursively(stack_name, values, resources)523 return values[index]524 if stripped_fn_lower == "split":525 delimiter, string = value[keys_list[0]]526 delimiter = resolve_refs_recursively(stack_name, delimiter, resources)527 string = resolve_refs_recursively(stack_name, string, resources)528 return string.split(delimiter)529 if stripped_fn_lower == "getazs":530 region = (531 resolve_refs_recursively(stack_name, value["Fn::GetAZs"], resources)532 or aws_stack.get_region()533 )534 azs = []535 for az in ("a", "b", "c", "d"):536 azs.append("%s%s" % (region, az))537 return azs538 if stripped_fn_lower == "base64":539 value_to_encode = value[keys_list[0]]540 value_to_encode = resolve_refs_recursively(stack_name, value_to_encode, resources)541 return to_str(base64.b64encode(to_bytes(value_to_encode)))542 for key, val in dict(value).items():543 value[key] = resolve_refs_recursively(stack_name, val, resources)544 if isinstance(value, list):545 for i in range(len(value)):546 value[i] = resolve_refs_recursively(stack_name, value[i], resources)547 return value548def resolve_placeholders_in_string(result, stack_name=None, resources=None):549 def _replace(match):550 parts = match.group(1).split(".")551 if len(parts) >= 2:552 resource_name, _, attr_name = match.group(1).partition(".")553 resolved = resolve_ref(554 stack_name, resource_name.strip(), resources, attribute=attr_name.strip()555 )556 if resolved is None:557 raise DependencyNotYetSatisfied(558 resource_ids=resource_name,559 message="Unable to resolve attribute ref %s" % match.group(1),560 )561 return resolved562 if len(parts) == 1 and parts[0] in resources:563 resource_json = resources[parts[0]]564 resource_type = get_resource_type(resource_json)565 result = extract_resource_attribute(566 resource_type,567 {},568 "Ref",569 resources=resources,570 resource_id=parts[0],571 stack_name=stack_name,572 )573 if result is None:574 raise DependencyNotYetSatisfied(575 resource_ids=parts[0],576 message="Unable to resolve attribute ref %s" % match.group(1),577 )578 return result579 # TODO raise exception here?580 return match.group(0)581 regex = r"\$\{([^\}]+)\}"582 result = re.sub(regex, _replace, result)583 return result584def evaluate_condition(stack_name, condition, resources):585 condition = resolve_refs_recursively(stack_name, condition, resources)586 condition = resolve_ref(stack_name, condition, resources, attribute="Ref")587 condition = resolve_refs_recursively(stack_name, condition, resources)588 return condition589def evaluate_resource_condition(resource, stack_name, resources):590 condition = resource.get("Condition")591 if condition:592 condition = evaluate_condition(stack_name, condition, resources)593 if condition is False or condition in FALSE_STRINGS or is_none_or_empty_value(condition):594 return False595 return True596def get_stack_parameter(stack_name, parameter):597 try:598 client = aws_stack.connect_to_service("cloudformation")599 stack = client.describe_stacks(StackName=stack_name)["Stacks"]600 except Exception:601 return None602 stack = stack and stack[0]603 if not stack:604 return None605 result = [p["ParameterValue"] for p in stack["Parameters"] if p["ParameterKey"] == parameter]606 return (result or [None])[0]607def update_resource(resource_id, resources, stack_name):608 resource = resources[resource_id]609 resource_type = get_resource_type(resource)610 if resource_type not in UPDATEABLE_RESOURCES:611 LOG.warning('Unable to update resource type "%s", id "%s"' % (resource_type, resource_id))612 return613 LOG.info("Updating resource %s of type %s" % (resource_id, resource_type))614 instance = get_resource_model_instance(resource_id, resources)615 if instance:616 result = instance.update_resource(resource, stack_name=stack_name, resources=resources)617 instance.fetch_and_update_state(stack_name=stack_name, resources=resources)618 return result619def get_resource_model_instance(resource_id: str, resources) -> Optional[GenericBaseModel]:620 """Obtain a typed resource entity instance representing the given stack resource."""621 resource = resources[resource_id]622 resource_type = get_resource_type(resource)623 canonical_type = canonical_resource_type(resource_type)624 resource_class = RESOURCE_MODELS.get(canonical_type)625 if not resource_class:626 return None627 instance = resource_class(resource)628 return instance629def fix_account_id_in_arns(params):630 def fix_ids(o, **kwargs):631 if isinstance(o, dict):632 for k, v in o.items():633 if common.is_string(v, exclude_binary=True):634 o[k] = aws_stack.fix_account_id_in_arns(v)635 elif common.is_string(o, exclude_binary=True):636 o = aws_stack.fix_account_id_in_arns(o)637 return o638 result = common.recurse_object(params, fix_ids)639 return result640def convert_data_types(func_details, params):641 """Convert data types in the "params" object, with the type defs642 specified in the 'types' attribute of "func_details"."""643 types = func_details.get("types") or {}644 attr_names = types.keys() or []645 def cast(_obj, _type):646 if _type == bool:647 return _obj in ["True", "true", True]648 if _type == str:649 if isinstance(_obj, bool):650 return str(_obj).lower()651 return str(_obj)652 if _type == int:653 return int(_obj)654 return _obj655 def fix_types(o, **kwargs):656 if isinstance(o, dict):657 for k, v in o.items():658 if k in attr_names:659 o[k] = cast(v, types[k])660 return o661 result = common.recurse_object(params, fix_types)662 return result663# TODO remove this method664def prepare_template_body(req_data):665 return template_preparer.prepare_template_body(req_data)666def deploy_resource(resource_id, resources, stack_name):667 result = execute_resource_action(resource_id, resources, stack_name, ACTION_CREATE)668 return result669def delete_resource(resource_id, resources, stack_name):670 return execute_resource_action(resource_id, resources, stack_name, ACTION_DELETE)671def execute_resource_action_fallback(672 action_name, resource_id, resources, stack_name, resource, resource_type673):674 # using moto as fallback for now - TODO remove in the future!675 msg = 'Action "%s" for resource type %s not yet implemented' % (676 action_name,677 resource_type,678 )679 long_type = canonical_resource_type(resource_type)680 clazz = parsing.MODEL_MAP.get(long_type)681 if not clazz:682 LOG.warning(msg)683 return684 LOG.info("%s - using fallback mechanism" % msg)685 if action_name == ACTION_CREATE:686 resource_name = get_resource_name(resource) or resource_id687 result = clazz.create_from_cloudformation_json(688 resource_name, resource, aws_stack.get_region()689 )690 return result691def execute_resource_action(resource_id, resources, stack_name, action_name):692 resource = resources[resource_id]693 resource_type = get_resource_type(resource)694 func_details = get_deployment_config(resource_type)695 if not func_details or action_name not in func_details:696 if resource_type in ["Parameter"]:697 return698 return execute_resource_action_fallback(699 action_name, resource_id, resources, stack_name, resource, resource_type700 )701 LOG.debug(702 'Running action "%s" for resource type "%s" id "%s"'703 % (action_name, resource_type, resource_id)704 )705 func_details = func_details[action_name]706 func_details = func_details if isinstance(func_details, list) else [func_details]707 results = []708 for func in func_details:709 if callable(func["function"]):710 result = func["function"](resource_id, resources, resource_type, func, stack_name)711 results.append(result)712 continue713 client = get_client(resource, func)714 if client:715 result = configure_resource_via_sdk(716 resource_id, resources, resource_type, func, stack_name, action_name717 )718 results.append(result)719 return (results or [None])[0]720def configure_resource_via_sdk(721 resource_id, resources, resource_type, func_details, stack_name, action_name722):723 resource = resources[resource_id]724 if resource_type == "EC2::Instance":725 if action_name == "create":726 func_details["boto_client"] = "resource"727 client = get_client(resource, func_details)728 function = getattr(client, func_details["function"])729 params = func_details.get("parameters") or lambda_get_params()730 defaults = func_details.get("defaults", {})731 resource_props = resource["Properties"] = resource.get("Properties", {})732 resource_props = dict(resource_props)733 resource_state = resource.get(KEY_RESOURCE_STATE, {})734 if callable(params):735 params = params(736 resource_props,737 stack_name=stack_name,738 resources=resources,739 resource_id=resource_id,740 )741 else:742 # it could be a list like ['param1', 'param2', {'apiCallParamName': 'cfResourcePropName'}]743 if isinstance(params, list):744 _params = {}745 for param in params:746 if isinstance(param, dict):747 _params.update(param)748 else:749 _params[param] = param750 params = _params751 params = dict(params)752 for param_key, prop_keys in dict(params).items():753 params.pop(param_key, None)754 if not isinstance(prop_keys, list):755 prop_keys = [prop_keys]756 for prop_key in prop_keys:757 if prop_key == PLACEHOLDER_RESOURCE_NAME:758 params[param_key] = PLACEHOLDER_RESOURCE_NAME759 else:760 if callable(prop_key):761 prop_value = prop_key(762 resource_props,763 stack_name=stack_name,764 resources=resources,765 resource_id=resource_id,766 )767 else:768 prop_value = resource_props.get(769 prop_key,770 resource.get(prop_key, resource_state.get(prop_key)),771 )772 if prop_value is not None:773 params[param_key] = prop_value774 break775 # replace PLACEHOLDER_RESOURCE_NAME in params776 resource_name_holder = {}777 def fix_placeholders(o, **kwargs):778 if isinstance(o, dict):779 for k, v in o.items():780 if v == PLACEHOLDER_RESOURCE_NAME:781 if "value" not in resource_name_holder:782 resource_name_holder["value"] = get_resource_name(resource) or resource_id783 o[k] = resource_name_holder["value"]784 return o785 common.recurse_object(params, fix_placeholders)786 # assign default values if empty787 params = common.merge_recursive(defaults, params)788 # this is an indicator that we should skip this resource deployment, and return789 if params is None:790 return791 # convert refs792 for param_key, param_value in dict(params).items():793 if param_value is not None:794 param_value = params[param_key] = resolve_refs_recursively(795 stack_name, param_value, resources796 )797 # convert any moto account IDs (123456789012) in ARNs to our format (000000000000)798 params = fix_account_id_in_arns(params)799 # convert data types (e.g., boolean strings to bool)800 params = convert_data_types(func_details, params)801 # remove None values, as they usually raise boto3 errors802 params = remove_none_values(params)803 # convert boolean strings804 # (TODO: we should find a more reliable mechanism than this opportunistic/probabilistic approach!)805 params_before_conversion = copy.deepcopy(params)806 for param_key, param_value in dict(params).items():807 # Convert to boolean (TODO: do this recursively?)808 if str(param_value).lower() in ["true", "false"]:809 params[param_key] = str(param_value).lower() == "true"810 # invoke function811 try:812 LOG.debug(813 'Request for resource type "%s" in region %s: %s %s'814 % (resource_type, aws_stack.get_region(), func_details["function"], params)815 )816 try:817 result = function(**params)818 except botocore.exceptions.ParamValidationError as e:819 LOG.debug(f"Trying original parameters: {params_before_conversion}")820 if "type: <class 'bool'>" not in str(e):821 raise822 result = function(**params_before_conversion)823 except Exception as e:824 if action_name == "delete" and check_not_found_exception(e, resource_type, resource):825 return826 LOG.warning(827 "Error calling %s with params: %s for resource: %s" % (function, params, resource)828 )829 raise e830 return result831def get_action_name_for_resource_change(res_change):832 return {"Add": "CREATE", "Remove": "DELETE", "Modify": "UPDATE"}.get(res_change)833# TODO: this shouldn't be called for stack parameters834def determine_resource_physical_id(835 resource_id, resources=None, stack=None, attribute=None, stack_name=None836):837 resources = resources or stack.resources838 stack_name = stack_name or stack.stack_name839 resource = resources.get(resource_id, {})840 if not resource:841 return842 resource_type = get_resource_type(resource)843 resource_type = re.sub("^AWS::", "", resource_type)844 resource_props = resource.get("Properties", {})845 # determine result from resource class846 canonical_type = canonical_resource_type(resource_type)847 resource_class = RESOURCE_MODELS.get(canonical_type)848 if resource_class:849 resource_inst = resource_class(resource)850 resource_inst.fetch_state_if_missing(stack_name=stack_name, resources=resources)851 result = resource_inst.get_physical_resource_id(attribute=attribute)852 if result:853 return result854 # TODO: put logic into resource-specific model classes!855 if resource_type == "ApiGateway::RestApi":856 result = resource_props.get("id")857 if result:858 return result859 elif resource_type == "ApiGateway::Stage":860 return resource_props.get("StageName")861 elif resource_type == "AppSync::DataSource":862 return resource_props.get("DataSourceArn")863 elif resource_type == "KinesisFirehose::DeliveryStream":864 return aws_stack.firehose_stream_arn(resource_props.get("DeliveryStreamName"))865 elif resource_type == "StepFunctions::StateMachine":866 return aws_stack.state_machine_arn(867 resource_props.get("StateMachineName")868 ) # returns ARN in AWS869 elif resource_type == "S3::Bucket":870 if attribute == "Arn":871 return aws_stack.s3_bucket_arn(resource_props.get("BucketName"))872 return resource_props.get("BucketName") # Note: "Ref" returns bucket name in AWS873 elif resource_type == "IAM::Role":874 if attribute == "Arn":875 return aws_stack.role_arn(resource_props.get("RoleName"))876 return resource_props.get("RoleName")877 elif resource_type == "SecretsManager::Secret":878 arn = get_secret_arn(resource_props.get("Name")) or ""879 if attribute == "Arn":880 return arn881 return arn.split(":")[-1]882 elif resource_type == "IAM::Policy":883 if attribute == "Arn":884 return aws_stack.policy_arn(resource_props.get("PolicyName"))885 return resource_props.get("PolicyName")886 elif resource_type == "DynamoDB::Table":887 table_name = resource_props.get("TableName")888 if table_name:889 if attribute == "Ref":890 return table_name # Note: "Ref" returns table name in AWS891 return table_name892 elif resource_type == "Logs::LogGroup":893 return resource_props.get("LogGroupName")894 res_id = resource.get("PhysicalResourceId")895 if res_id and attribute in [None, "Ref", "PhysicalResourceId"]:896 return res_id897 result = extract_resource_attribute(898 resource_type,899 {},900 attribute or "PhysicalResourceId",901 stack_name=stack_name,902 resource_id=resource_id,903 resource=resource,904 resources=resources,905 )906 if result is not None:907 # note that value could be an empty string here (in case of Parameter values)908 return result909 LOG.info(910 'Unable to determine PhysicalResourceId for "%s" resource, ID "%s"'911 % (resource_type, resource_id)912 )913def update_resource_details(stack, resource_id, details, action=None):914 resource = stack.resources.get(resource_id, {})915 if not resource or not details:916 return917 # TODO: we need to rethink this method - this should be encapsulated in the resource model classes.918 # Also, instead of actively updating the PhysicalResourceId attributes below, they should be919 # determined and returned by the resource model classes upon request.920 resource_type = resource.get("Type") or ""921 resource_type = re.sub("^AWS::", "", resource_type)922 resource_props = resource.get("Properties", {})923 if resource_type == "ApiGateway::RestApi":924 resource_props["id"] = details["id"]925 if resource_type == "KMS::Key":926 resource["PhysicalResourceId"] = details["KeyMetadata"]["KeyId"]927 if resource_type == "EC2::Instance":928 if details and isinstance(details, list) and hasattr(details[0], "id"):929 resource["PhysicalResourceId"] = details[0].id930 if isinstance(details, dict) and details.get("InstanceId"):931 resource["PhysicalResourceId"] = details["InstanceId"]932 if resource_type == "EC2::SecurityGroup":933 resource["PhysicalResourceId"] = details["GroupId"]934 if resource_type == "IAM::InstanceProfile":935 resource["PhysicalResourceId"] = details["InstanceProfile"]["InstanceProfileName"]936 if resource_type == "StepFunctions::Activity":937 resource["PhysicalResourceId"] = details["activityArn"]938 if resource_type == "ApiGateway::Model":939 resource["PhysicalResourceId"] = details["id"]940 if resource_type == "EC2::VPC":941 resource["PhysicalResourceId"] = details["Vpc"]["VpcId"]942 if resource_type == "EC2::Subnet":943 resource["PhysicalResourceId"] = details["Subnet"]["SubnetId"]944 if resource_type == "EC2::RouteTable":945 resource["PhysicalResourceId"] = details["RouteTable"]["RouteTableId"]946 if resource_type == "EC2::Route":947 resource["PhysicalResourceId"] = generate_route_id(948 resource_props["RouteTableId"],949 resource_props.get("DestinationCidrBlock", ""),950 resource_props.get("DestinationIpv6CidrBlock"),951 )952 # TODO remove!953 if isinstance(details, MotoCloudFormationModel):954 # fallback: keep track of moto resource status955 stack.moto_resource_statuses[resource_id] = details956def add_default_resource_props(957 resource,958 stack_name,959 resource_name=None,960 resource_id=None,961 update=False,962 existing_resources=None,963):964 """Apply some fixes to resource props which otherwise cause deployments to fail"""965 res_type = resource["Type"]966 canonical_type = canonical_resource_type(res_type)967 resource_class = RESOURCE_MODELS.get(canonical_type)968 if resource_class is not None:969 resource_class.add_defaults(resource, stack_name)970# -----------------------971# MAIN TEMPLATE DEPLOYER972# -----------------------973class TemplateDeployer(object):974 def __init__(self, stack):975 self.stack = stack976 @property977 def resources(self):978 return self.stack.resources979 @property980 def stack_name(self):981 return self.stack.stack_name982 # ------------------983 # MAIN ENTRY POINTS984 # ------------------985 def deploy_stack(self):986 self.stack.set_stack_status("CREATE_IN_PROGRESS")987 try:988 self.apply_changes(989 self.stack,990 self.stack,991 stack_name=self.stack.stack_name,992 initialize=True,993 action="CREATE",994 )995 except Exception as e:996 LOG.info("Unable to create stack %s: %s" % (self.stack.stack_name, e))997 self.stack.set_stack_status("CREATE_FAILED")998 raise999 def apply_change_set(self, change_set):1000 action = "CREATE"1001 change_set.stack.set_stack_status("%s_IN_PROGRESS" % action)1002 try:1003 self.apply_changes(1004 change_set.stack,1005 change_set,1006 stack_name=change_set.stack_name,1007 action=action,1008 )1009 except Exception as e:1010 LOG.info(1011 "Unable to apply change set %s: %s" % (change_set.metadata.get("ChangeSetName"), e)1012 )1013 change_set.metadata["Status"] = "%s_FAILED" % action1014 self.stack.set_stack_status("%s_FAILED" % action)1015 raise1016 def update_stack(self, new_stack):1017 self.stack.set_stack_status("UPDATE_IN_PROGRESS")1018 # apply changes1019 self.apply_changes(self.stack, new_stack, stack_name=self.stack.stack_name, action="UPDATE")1020 def delete_stack(self):1021 if not self.stack:1022 return1023 self.stack.set_stack_status("DELETE_IN_PROGRESS")1024 stack_resources = list(self.stack.resources.values())1025 stack_name = self.stack.stack_name1026 resources = dict([(r["LogicalResourceId"], common.clone_safe(r)) for r in stack_resources])1027 for key, resource in resources.items():1028 resource["Properties"] = resource.get("Properties", common.clone_safe(resource))1029 resource["ResourceType"] = resource.get("ResourceType") or resource.get("Type")1030 for resource_id, resource in resources.items():1031 # TODO: cache condition value in resource details on deployment and use cached value here1032 if evaluate_resource_condition(resource, stack_name, resources):1033 delete_resource(resource_id, resources, stack_name)1034 self.stack.set_resource_status(resource_id, "DELETE_COMPLETE")1035 # update status1036 self.stack.set_stack_status("DELETE_COMPLETE")1037 # ----------------------------1038 # DEPENDENCY RESOLUTION UTILS1039 # ----------------------------1040 def is_deployable_resource(self, resource):1041 resource_type = get_resource_type(resource)1042 entry = get_deployment_config(resource_type)1043 if entry is None and resource_type not in ["Parameter", None]:1044 # fall back to moto resource creation (TODO: remove in the future)1045 long_res_type = canonical_resource_type(resource_type)1046 if long_res_type in parsing.MODEL_MAP:1047 return True1048 LOG.warning('Unable to deploy resource type "%s": %s' % (resource_type, resource))1049 return bool(entry and entry.get(ACTION_CREATE))1050 def is_deployed(self, resource):1051 resource_status = {}1052 resource_id = resource["LogicalResourceId"]1053 details = retrieve_resource_details(1054 resource_id, resource_status, self.resources, self.stack_name1055 )1056 return bool(details)1057 def is_updateable(self, resource):1058 """Return whether the given resource can be updated or not."""1059 if not self.is_deployable_resource(resource) or not self.is_deployed(resource):...

Full Screen

Full Screen

replicate.py

Source:replicate.py Github

copy

Full Screen

...70 # update cached resources file71 save_file(res_types_file, json.dumps(all_types))72 # add custom resource types73 for res_type, details in SERVICE_RESOURCES.items():74 res_type = canonical_resource_type(res_type)75 existing = [ts for ts in all_types if ts["TypeName"] == res_type]76 if not existing:77 all_types.append({"TypeName": res_type})78 return all_types79 def get_resources(self, resource_type: str) -> List[Dict]:80 result = []81 result += self.get_resources_cloudcontrol(resource_type)82 result += self.get_resources_custom(resource_type)83 return result84 def get_resources_cloudcontrol(self, resource_type: str) -> List[Dict]:85 cloudcontrol = self.session.client("cloudcontrol")86 try:87 # fetch the list of resource identifiers88 res_list = list_all_resources(89 lambda kwargs: cloudcontrol.list_resources(TypeName=resource_type),90 last_token_attr_name="NextToken",91 list_attr_name="ResourceDescriptions",92 )93 # fetch the detailed resource descriptions94 def handle(resource):95 cloudcontrol = self.session.client("cloudcontrol")96 res_details = cloudcontrol.get_resource(97 TypeName=resource_type, Identifier=resource["Identifier"]98 )99 with lock:100 resources.append(res_details)101 # parallelizing the execution, as CloudControl can be very slow to respond102 lock = threading.RLock()103 resources = []104 parallelize(handle, res_list)105 result = [106 {107 "TypeName": resource_type,108 "Identifier": r["ResourceDescription"]["Identifier"],109 "Properties": json.loads(r["ResourceDescription"]["Properties"]),110 }111 for r in resources112 ]113 return result114 except Exception as e:115 exs = str(e)116 if "UnsupportedActionException" in exs:117 LOG.info("Unsupported operation: %s", e)118 return []119 if "must not be null" in exs or "cannot be empty" in exs:120 LOG.info("Unable to list resources: %s", e)121 return []122 LOG.warning("Unknown error occurred: %s", e)123 return []124 def get_resources_custom(self, resource_type: str) -> List[Dict]:125 resource_type_short = resource_type.removeprefix("AWS::")126 details = SERVICE_RESOURCES.get(resource_type_short)127 if not details:128 return []129 resource_type = canonical_resource_type(resource_type)130 model_class = load_resource_models().get(resource_type)131 if not model_class:132 return []133 service_name = template_deployer.get_service_name({"Type": resource_type})134 from_client = boto3.client(service_name)135 res_list = getattr(from_client, details["list_operation"])()136 res_selector = details.get("results")137 if res_selector:138 res_list = extract_jsonpath(res_list, res_selector)139 res_list = res_list or []140 result = []141 for _resource in res_list:142 props = {}143 props_mapping = details.get("props") or {}144 for prop_key, prop_val in props_mapping.items():145 if "$" in prop_val:146 props[prop_key] = extract_jsonpath(_resource, prop_val)147 fetch_details = details.get("fetch_details")148 if fetch_details:149 params = deepcopy(fetch_details.get("parameters") or {})150 for param, param_value in params.items():151 if "$" in param_value:152 params[param] = extract_jsonpath(_resource, param_value)153 res_details = getattr(from_client, fetch_details["operation"])(**params)154 res_fields = fetch_details.get("results")155 for res_field in res_fields:156 if isinstance(res_field, dict):157 field_name = list(res_field.keys())[0]158 field_value = list(res_field.values())[0]159 if callable(field_value):160 field_value = field_value(res_details)161 else:162 field_name = res_field.split(".")[-1]163 field_value = extract_jsonpath(res_details, res_field)164 if field_value != []:165 props[field_name] = field_value166 res_json = {"Type": resource_type, "Properties": props}167 result.append(res_json)168 return result169class ResourceReplicator:170 """Utility that creates resources from CloudFormation/CloudControl templates."""171 def create(self, resource: Dict):172 model_class = self._get_resource_model(resource)173 if not model_class:174 return175 res_type = self._resource_type(resource)176 res_json = {"Type": res_type, "Properties": resource["Properties"]}177 LOG.debug("Deploying CloudFormation resource: %s", res_json)178 # note: quick hack for now - creating a fake Stack for each individual resource to be deployed179 stack = Stack({"StackName": "s1"})180 stack.template_resources["myres"] = res_json181 resource_status = template_deployer.retrieve_resource_details("myres", {}, stack)182 if not resource_status:183 # deploy resource, if it doesn't exist yet184 template_deployer.deploy_resource(stack, "myres")185 # TODO: need to ensure that the ID of the created resource also matches!186 # add extended state (e.g., actual S3 objects)187 self.add_extended_resource_state(res_json)188 def add_extended_resource_state(self, resource: Dict, state: Dict = None):189 model_class = self._get_resource_model(resource)190 if not issubclass(model_class, ExtendedResourceStateReplicator):191 return192 model_instance = model_class(resource)193 if state is None:194 return model_instance.add_extended_state_external()195 return model_instance.add_extended_state_internal(state)196 def _resource_type(self, resource: Dict) -> str:197 res_type = resource.get("Type") or resource["TypeName"]198 return canonical_resource_type(res_type)199 def _get_resource_model(self, resource: Dict) -> Type:200 res_type = self._resource_type(resource)201 model_class = load_resource_models().get(res_type)202 if not model_class:203 LOG.info("Unable to find CloudFormation model class for resource: %s", res_type)204 return model_class205def replicate_state(206 scraper: AwsAccountScraper, creator: ResourceReplicator, services: List[str] = None207):208 """Replicate the state from a source AWS account into a target account (or LocalStack)"""209 res_types = scraper.get_resource_types()210 LOG.info("Found %s Cloud Control resources types", len(res_types))211 for res_type in res_types:212 type_name = res_type["TypeName"]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful