How to use the authorize_snapshot_access method in LocalStack

Best Python code snippet using localstack_python

client.py

Source:client.py Github

copy

Full Screen

...10 def accept_reserved_node_exchange(self, ReservedNodeId: str, TargetReservedNodeOfferingId: str) -> Dict:11 pass12 def authorize_cluster_security_group_ingress(self, ClusterSecurityGroupName: str, CIDRIP: str = None, EC2SecurityGroupName: str = None, EC2SecurityGroupOwnerId: str = None) -> Dict:13 pass14 def authorize_snapshot_access(self, SnapshotIdentifier: str, AccountWithRestoreAccess: str, SnapshotClusterIdentifier: str = None) -> Dict:15 pass16 def batch_delete_cluster_snapshots(self, Identifiers: List) -> Dict:17 pass18 def batch_modify_cluster_snapshots(self, SnapshotIdentifierList: List, ManualSnapshotRetentionPeriod: int = None, Force: bool = None) -> Dict:19 pass20 def can_paginate(self, operation_name: str = None):21 pass22 def cancel_resize(self, ClusterIdentifier: str) -> Dict:23 pass24 def copy_cluster_snapshot(self, SourceSnapshotIdentifier: str, TargetSnapshotIdentifier: str, SourceSnapshotClusterIdentifier: str = None, ManualSnapshotRetentionPeriod: int = None) -> Dict:25 pass26 def create_cluster(self, ClusterIdentifier: str, NodeType: str, MasterUsername: str, MasterUserPassword: str, DBName: str = None, ClusterType: str = None, ClusterSecurityGroups: List = None, VpcSecurityGroupIds: List = None, ClusterSubnetGroupName: str = None, AvailabilityZone: str = None, PreferredMaintenanceWindow: str = None, ClusterParameterGroupName: str = None, AutomatedSnapshotRetentionPeriod: int = None, ManualSnapshotRetentionPeriod: int = None, Port: int = None, ClusterVersion: str = None, AllowVersionUpgrade: bool = None, NumberOfNodes: int = None, PubliclyAccessible: bool = None, Encrypted: bool = None, HsmClientCertificateIdentifier: str = None, HsmConfigurationIdentifier: str = None, ElasticIp: str = None, Tags: List = None, KmsKeyId: str = None, EnhancedVpcRouting: bool = None, AdditionalInfo: str = None, IamRoles: List = None, MaintenanceTrackName: str = None, SnapshotScheduleIdentifier: str = None) -> Dict:27 pass28 def 
create_cluster_parameter_group(self, ParameterGroupName: str, ParameterGroupFamily: str, Description: str, Tags: List = None) -> Dict:...

Full Screen

Full Screen

redshift_backup.py

Source:redshift_backup.py Github

copy

Full Screen

...206 Share backup with another AWS Account207 """208 redshift_client = AwsHelper.boto3_client('redshift', region_name=backup_region, arn=self.role_arn, external_id=self.role_external_id)209 snapshot_id = backup_id.split(":")[-1].split("/")[1]210 redshift_client.authorize_snapshot_access(211 SnapshotIdentifier=snapshot_id,212 AccountWithRestoreAccess=aws_account_id213 )214 def get_backup_resource(self, backup_region: str, backup_id: str) -> BackupResource:215 """216 Get Backup Resource within region, identified by its backup_id217 """218 redshift_client = AwsHelper.boto3_client('redshift', region_name=backup_region, arn=self.role_arn, external_id=self.role_external_id)219 snapshot_id = backup_id.split(":")[-1].split("/")[1]220 snapshots = redshift_client.describe_cluster_snapshots(SnapshotIdentifier=snapshot_id)221 snapshot = snapshots['Snapshots'][0]222 d_tags = BackupResource.dict_from_boto3_tags(snapshot['Tags'])223 return BackupResource.construct(d_tags['shelvery:tag_name'], backup_id, d_tags)224 def copy_shared_backup(self, source_account: str, source_backup: BackupResource) -> str:...

Full Screen

Full Screen

redshift_create_snapshot_action.py

Source:redshift_create_snapshot_action.py Github

copy

Full Screen

1######################################################################################################################2# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. #3# #4# Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance #5# with the License. A copy of the License is located at #6# #7# http://aws.amazon.com/asl/ #8# #9# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES #10# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions #11# and limitations under the License. #12######################################################################################################################13from datetime import datetime14import services.redshift_service15from actions import *16from boto_retry import get_client_with_retries17from util.tag_filter_set import TagFilterSet18GROUP_TITLE_TAGGING_NAMING = "Tagging and naming options"19PARAM_DESC_COPIED_CLUSTER_TAGS = "Copied tags from instance to snapshot"20PARAM_DESC_SNAPSHOT_TAGS = "Tags to add to snapshot, use list of tagname=tagvalue pairs"21PARAM_DESC_ACCOUNTS_RESTORE_ACCESS = "Comma separated list of accounts that will be granted access to restore the snapshot"22PARAM_LABEL_COPIED_CLUSTER_TAGS = "Copied cluster tags"23PARAM_LABEL_SNAPSHOT_TAGS = "Snapshot tags"24PARAM_LABEL_CCOUNTS_RESTORE_ACCESS = "Grant restore access to accounts"25SNAPSHOT_NAME = "{}-{:0>4d}{:0>2d}{:0>2d}{:0>02d}{:0>02d}"26INFO_CREATE_SNAPSHOT = "Creating snapshot for redshift cluster \"{}\""27INFO_SNAPSHOT_CREATED = "Snapshot is {}"28INFO_SNAPSHOT_NAME = "Name of the snapshot is {}"29INFO_START_SNAPSHOT_ACTION = "Creating snapshot for redshift cluster \"{}\" for task \"{}\""30INFO_GRANT_ACCOUNT_ACCESS = "Granted access to snapshot for account {}"31PARAM_COPIED_CLUSTER_TAGS = "CopiedInstanceTags"32PARAM_SNAPSHOT_TAGS = 
"SnapshotTags"33PARAM_ACCOUNTS_RESTORE_ACCESS = "AccountsWithRestoreAccess"34class RedshiftCreateSnapshotAction:35 properties = {36 ACTION_TITLE: "RedShift Create Snapshot",37 ACTION_VERSION: "1.0",38 ACTION_DESCRIPION: "Creates manual type snapshot for Redshift cluster",39 ACTION_AUTHOR: "AWS",40 ACTION_ID: "6310b757-d8a8-4031-af29-29b9fc5bcf65",41 ACTION_SERVICE: "redshift",42 ACTION_RESOURCES: services.redshift_service.CLUSTERS,43 ACTION_AGGREGATION: ACTION_AGGREGATION_RESOURCE,44 ACTION_MEMORY: 128,45 ACTION_SELECT_EXPRESSION: "Clusters[*].{ClusterIdentifier:ClusterIdentifier,ClusterStatus:ClusterStatus,Tags:Tags}",46 ACTION_PARAMETERS: {47 PARAM_COPIED_CLUSTER_TAGS: {48 PARAM_DESCRIPTION: PARAM_DESC_COPIED_CLUSTER_TAGS,49 PARAM_TYPE: type(""),50 PARAM_REQUIRED: False,51 PARAM_LABEL: PARAM_LABEL_COPIED_CLUSTER_TAGS52 },53 PARAM_SNAPSHOT_TAGS: {54 PARAM_DESCRIPTION: PARAM_DESC_SNAPSHOT_TAGS,55 PARAM_TYPE: type(""),56 PARAM_REQUIRED: False,57 PARAM_LABEL: PARAM_LABEL_SNAPSHOT_TAGS58 },59 PARAM_ACCOUNTS_RESTORE_ACCESS: {60 PARAM_DESCRIPTION: PARAM_DESC_ACCOUNTS_RESTORE_ACCESS,61 PARAM_TYPE: type([]),62 PARAM_REQUIRED: False,63 PARAM_LABEL: PARAM_LABEL_CCOUNTS_RESTORE_ACCESS64 }65 },66 ACTION_PARAMETER_GROUPS: [67 {68 ACTION_PARAMETER_GROUP_TITLE: GROUP_TITLE_TAGGING_NAMING,69 ACTION_PARAMETER_GROUP_LIST: [70 PARAM_COPIED_CLUSTER_TAGS,71 PARAM_SNAPSHOT_TAGS72 ],73 }],74 ACTION_PERMISSIONS: ["redshift:DescribeClusters",75 "redshift:CreateClusterSnapshot",76 "redshift:DescribeTags",77 "redshift:AuthorizeSnapshotAccess"]78 }79 def __init__(self, arguments):80 self._arguments = arguments81 self.logger = self._arguments[ACTION_PARAM_LOGGER]82 self.context = self._arguments[ACTION_PARAM_CONTEXT]83 self.session = self._arguments[ACTION_PARAM_SESSION]84 self.task = self._arguments[ACTION_PARAM_TASK]85 self.cluster = self._arguments[ACTION_PARAM_RESOURCES]86 self.cluster_id = self.cluster["ClusterIdentifier"]87 self.cluster_tags = self.cluster.get("Tags", {})88 
self.cluster_status = self.cluster["ClusterStatus"]89 self.copied_instance_tagfilter = TagFilterSet(self._arguments.get(PARAM_COPIED_CLUSTER_TAGS, ""))90 self.snapshot_tags = {}91 for tag in self._arguments.get(PARAM_SNAPSHOT_TAGS, "").split(","):92 t = tag.partition("=")93 self.snapshot_tags[t[0]] = t[2]94 self.logger.debug("Arguments {}", self._arguments)95 self.granted_accounts = self._arguments.get(PARAM_ACCOUNTS_RESTORE_ACCESS, [])96 self.result = {97 "account": self.cluster["AwsAccount"],98 "region": self.cluster["Region"],99 "cluster-identifier": self.cluster_id,100 "task": self.task101 }102 def execute(self, _):103 self.logger.info("{}, version {}", self.properties[ACTION_TITLE], self.properties[ACTION_VERSION])104 self.logger.info(INFO_START_SNAPSHOT_ACTION, self.cluster_id, self.task)105 if self.cluster_status != "available":106 raise Exception("Status of cluster is \"{}\", can only make snapshot of cluster with status \"available\"".format(107 self.cluster_status))108 tags = self.copied_instance_tagfilter.pairs_matching_any_filter(self.cluster_tags)109 tags.update(self.snapshot_tags)110 snapshot_tags = [{"Key": t, "Value": tags[t]} for t in tags]111 dt = datetime.utcnow()112 snapshot_name = SNAPSHOT_NAME.format(self.cluster_id, dt.year, dt.month, dt.day, dt.hour, dt.minute)113 redshift = get_client_with_retries("redshift", ["create_cluster_snapshot", "authorize_snapshot_access"],114 context=self.context, session=self.session)115 create_snapshot_resp = redshift.create_cluster_snapshot_with_retries(SnapshotIdentifier=snapshot_name,116 Tags=snapshot_tags,117 ClusterIdentifier=self.cluster_id)118 self.result["snapshot-identifier"] = snapshot_name119 self.result["snapshot-create-time"] = create_snapshot_resp["Snapshot"]["SnapshotCreateTime"]120 self.logger.info(INFO_SNAPSHOT_CREATED, snapshot_name)121 if self.granted_accounts is not None and len(self.granted_accounts) > 0:122 for account in self.granted_accounts:123 
redshift.authorize_snapshot_access_with_retries(SnapshotIdentifier=snapshot_name,124 SnapshotClusterIdentifier=self.cluster_id,125 AccountWithRestoreAccess=account)126 self.logger.info(INFO_GRANT_ACCOUNT_ACCESS, account)127 self.result["granted-access-accounts"] = self.granted_accounts...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides take you from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run LocalStack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful