How to use remove_tags_from_resource method in localstack

Best Python code snippet using localstack_python

rds.py

Source:rds.py Github

copy

Full Screen

1# Copyright: (c) 2018, Ansible Project2# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)3from __future__ import (absolute_import, division, print_function)4__metaclass__ = type5from ansible.module_utils._text import to_text6from ansible.module_utils.aws.waiters import get_waiter7from ansible.module_utils.common.dict_transformations import snake_dict_to_camel_dict8from ansible.module_utils.ec2 import compare_aws_tags, AWSRetry, ansible_dict_to_boto3_tag_list, boto3_tag_list_to_ansible_dict9try:10 from botocore.exceptions import BotoCoreError, ClientError, WaiterError11except ImportError:12 pass13from collections import namedtuple14from time import sleep15Boto3ClientMethod = namedtuple('Boto3ClientMethod', ['name', 'waiter', 'operation_description', 'cluster', 'instance'])16# Whitelist boto3 client methods for cluster and instance resources17cluster_method_names = [18 'create_db_cluster', 'restore_db_cluster_from_db_snapshot', 'restore_db_cluster_from_s3',19 'restore_db_cluster_to_point_in_time', 'modify_db_cluster', 'delete_db_cluster', 'add_tags_to_resource',20 'remove_tags_from_resource', 'list_tags_for_resource', 'promote_read_replica_db_cluster'21]22instance_method_names = [23 'create_db_instance', 'restore_db_instance_to_point_in_time', 'restore_db_instance_from_s3',24 'restore_db_instance_from_db_snapshot', 'create_db_instance_read_replica', 'modify_db_instance',25 'delete_db_instance', 'add_tags_to_resource', 'remove_tags_from_resource', 'list_tags_for_resource',26 'promote_read_replica', 'stop_db_instance', 'start_db_instance', 'reboot_db_instance'27]28def get_rds_method_attribute(method_name, module):29 readable_op = method_name.replace('_', ' ').replace('db', 'DB')30 if method_name in cluster_method_names and 'new_db_cluster_identifier' in module.params:31 cluster = True32 instance = False33 if method_name == 'delete_db_cluster':34 waiter = 'cluster_deleted'35 else:36 waiter = 'cluster_available'37 elif 
method_name in instance_method_names and 'new_db_instance_identifier' in module.params:38 cluster = False39 instance = True40 if method_name == 'delete_db_instance':41 waiter = 'db_instance_deleted'42 elif method_name == 'stop_db_instance':43 waiter = 'db_instance_stopped'44 else:45 waiter = 'db_instance_available'46 else:47 raise NotImplementedError("method {0} hasn't been added to the list of accepted methods to use a waiter in module_utils/aws/rds.py".format(method_name))48 return Boto3ClientMethod(name=method_name, waiter=waiter, operation_description=readable_op, cluster=cluster, instance=instance)49def get_final_identifier(method_name, module):50 apply_immediately = module.params['apply_immediately']51 if get_rds_method_attribute(method_name, module).cluster:52 identifier = module.params['db_cluster_identifier']53 updated_identifier = module.params['new_db_cluster_identifier']54 elif get_rds_method_attribute(method_name, module).instance:55 identifier = module.params['db_instance_identifier']56 updated_identifier = module.params['new_db_instance_identifier']57 else:58 raise NotImplementedError("method {0} hasn't been added to the list of accepted methods in module_utils/aws/rds.py".format(method_name))59 if not module.check_mode and updated_identifier and apply_immediately:60 identifier = updated_identifier61 return identifier62def handle_errors(module, exception, method_name, parameters):63 if not isinstance(exception, ClientError):64 module.fail_json_aws(exception, msg="Unexpected failure for method {0} with parameters {1}".format(method_name, parameters))65 changed = True66 error_code = exception.response['Error']['Code']67 if method_name == 'modify_db_instance' and error_code == 'InvalidParameterCombination':68 if 'No modifications were requested' in to_text(exception):69 changed = False70 elif 'ModifyDbCluster API' in to_text(exception):71 module.fail_json_aws(exception, msg='It appears you are trying to modify attributes that are managed at the cluster 
level. Please see rds_cluster')72 else:73 module.fail_json_aws(exception, msg='Unable to {0}'.format(get_rds_method_attribute(method_name, module).operation_description))74 elif method_name == 'promote_read_replica' and error_code == 'InvalidDBInstanceState':75 if 'DB Instance is not a read replica' in to_text(exception):76 changed = False77 else:78 module.fail_json_aws(exception, msg='Unable to {0}'.format(get_rds_method_attribute(method_name, module).operation_description))79 elif method_name == 'create_db_instance' and exception.response['Error']['Code'] == 'InvalidParameterValue':80 accepted_engines = [81 'aurora', 'aurora-mysql', 'aurora-postgresql', 'mariadb', 'mysql', 'oracle-ee', 'oracle-se',82 'oracle-se1', 'oracle-se2', 'postgres', 'sqlserver-ee', 'sqlserver-ex', 'sqlserver-se', 'sqlserver-web'83 ]84 if parameters.get('Engine') not in accepted_engines:85 module.fail_json_aws(exception, msg='DB engine {0} should be one of {1}'.format(parameters.get('Engine'), accepted_engines))86 else:87 module.fail_json_aws(exception, msg='Unable to {0}'.format(get_rds_method_attribute(method_name, module).operation_description))88 else:89 module.fail_json_aws(exception, msg='Unable to {0}'.format(get_rds_method_attribute(method_name, module).operation_description))90 return changed91def call_method(client, module, method_name, parameters):92 result = {}93 changed = True94 if not module.check_mode:95 wait = module.params['wait']96 # TODO: stabilize by adding get_rds_method_attribute(method_name).extra_retry_codes97 method = getattr(client, method_name)98 try:99 if method_name == 'modify_db_instance':100 # check if instance is in an available state first, if possible101 if wait:102 wait_for_status(client, module, module.params['db_instance_identifier'], method_name)103 result = AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidDBInstanceState'])(method)(**parameters)104 else:105 result = AWSRetry.jittered_backoff()(method)(**parameters)106 except (BotoCoreError, 
ClientError) as e:107 changed = handle_errors(module, e, method_name, parameters)108 if wait and changed:109 identifier = get_final_identifier(method_name, module)110 wait_for_status(client, module, identifier, method_name)111 return result, changed112def wait_for_instance_status(client, module, db_instance_id, waiter_name):113 def wait(client, db_instance_id, waiter_name, extra_retry_codes):114 retry = AWSRetry.jittered_backoff(catch_extra_error_codes=extra_retry_codes)115 try:116 waiter = client.get_waiter(waiter_name)117 except ValueError:118 # using a waiter in ansible.module_utils.aws.waiters119 waiter = get_waiter(client, waiter_name)120 waiter.wait(WaiterConfig={'Delay': 60, 'MaxAttempts': 60}, DBInstanceIdentifier=db_instance_id)121 waiter_expected_status = {122 'db_instance_deleted': 'deleted',123 'db_instance_stopped': 'stopped',124 }125 expected_status = waiter_expected_status.get(waiter_name, 'available')126 if expected_status == 'available':127 extra_retry_codes = ['DBInstanceNotFound']128 else:129 extra_retry_codes = []130 for attempt_to_wait in range(0, 10):131 try:132 wait(client, db_instance_id, waiter_name, extra_retry_codes)133 break134 except WaiterError as e:135 # Instance may be renamed and AWSRetry doesn't handle WaiterError136 if e.last_response.get('Error', {}).get('Code') == 'DBInstanceNotFound':137 sleep(10)138 continue139 module.fail_json_aws(e, msg='Error while waiting for DB instance {0} to be {1}'.format(db_instance_id, expected_status))140 except (BotoCoreError, ClientError) as e:141 module.fail_json_aws(e, msg='Unexpected error while waiting for DB instance {0} to be {1}'.format(142 db_instance_id, expected_status)143 )144def wait_for_cluster_status(client, module, db_cluster_id, waiter_name):145 try:146 waiter = get_waiter(client, waiter_name).wait(DBClusterIdentifier=db_cluster_id)147 except WaiterError as e:148 if waiter_name == 'cluster_deleted':149 msg = "Failed to wait for DB cluster {0} to be deleted".format(db_cluster_id)150 
else:151 msg = "Failed to wait for DB cluster {0} to be available".format(db_cluster_id)152 module.fail_json_aws(e, msg=msg)153 except (BotoCoreError, ClientError) as e:154 module.fail_json_aws(e, msg="Failed with an unexpected error while waiting for the DB cluster {0}".format(db_cluster_id))155def wait_for_status(client, module, identifier, method_name):156 waiter_name = get_rds_method_attribute(method_name, module).waiter157 if get_rds_method_attribute(method_name, module).cluster:158 wait_for_cluster_status(client, module, identifier, waiter_name)159 elif get_rds_method_attribute(method_name, module).instance:160 wait_for_instance_status(client, module, identifier, waiter_name)161 else:162 raise NotImplementedError("method {0} hasn't been added to the whitelist of handled methods".format(method_name))163def get_tags(client, module, cluster_arn):164 try:165 return boto3_tag_list_to_ansible_dict(166 client.list_tags_for_resource(ResourceName=cluster_arn)['TagList']167 )168 except (BotoCoreError, ClientError) as e:169 module.fail_json_aws(e, msg="Unable to describe tags")170def arg_spec_to_rds_params(options_dict):171 tags = options_dict.pop('tags')172 has_processor_features = False173 if 'processor_features' in options_dict:174 has_processor_features = True175 processor_features = options_dict.pop('processor_features')176 camel_options = snake_dict_to_camel_dict(options_dict, capitalize_first=True)177 for key in list(camel_options.keys()):178 for old, new in (('Db', 'DB'), ('Iam', 'IAM'), ('Az', 'AZ')):179 if old in key:180 camel_options[key.replace(old, new)] = camel_options.pop(key)181 camel_options['Tags'] = tags182 if has_processor_features:183 camel_options['ProcessorFeatures'] = processor_features184 return camel_options185def ensure_tags(client, module, resource_arn, existing_tags, tags, purge_tags):186 if tags is None:187 return False188 tags_to_add, tags_to_remove = compare_aws_tags(existing_tags, tags, purge_tags)189 changed = bool(tags_to_add or 
tags_to_remove)190 if tags_to_add:191 call_method(192 client, module, method_name='add_tags_to_resource',193 parameters={'ResourceName': resource_arn, 'Tags': ansible_dict_to_boto3_tag_list(tags_to_add)}194 )195 if tags_to_remove:196 call_method(197 client, module, method_name='remove_tags_from_resource',198 parameters={'ResourceName': resource_arn, 'TagKeys': tags_to_remove}199 )...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

...72 )73 if action == 'untag':74 for snapshot in snapshots.values():75 arn = snapshot['DBSnapshotArn']76 rds.remove_tags_from_resource(77 ResourceName=arn, TagKeys=['snap_usage', ])78def tag_db_cluster_snaps(rds, action):79 snapshots = {}80 for response in rds.get_paginator('describe_db_cluster_snapshots').paginate():81 snapshots.update([(snapshot['DBClusterSnapshotArn'], snapshot)82 for snapshot in response['DBClusterSnapshots']])83 if action == 'tag':84 for snapshot in snapshots.values():85 arn = snapshot['DBClusterSnapshotArn']86 rds.add_tags_to_resource(ResourceName=arn, Tags=[87 {88 'Key': 'snap_usage',89 'Value': 'rds'90 },91 ]92 )93 elif action == 'untag':94 for snapshot in snapshots.values():95 arn = snapshot['DBClusterSnapshotArn']96 rds.remove_tags_from_resource(97 ResourceName=arn, TagKeys=['snap_usage', ])98def tag_untag_resources(regionlist, action):99 for region in regionlist:100 ec2 = connect_service(region, 'ec2')101 tag_ebs_snaps(ec2, action)102 rds = connect_service(region, 'rds')103 tag_rds_snaps(rds, action)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful