How to use kms_client method in localstack

Best Python code snippet using localstack_python

kms_utils.py

Source:kms_utils.py Github

copy

Full Screen

1# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License"). You4# may not use this file except in compliance with the License. A copy of5# the License is located at6#7# http://aws.amazon.com/apache2.0/8#9# or in the "license" file accompanying this file. This file is10# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF11# ANY KIND, either express or implied. See the License for the specific12# language governing permissions and limitations under the License.13from __future__ import absolute_import14import contextlib15import json16from sagemaker import utils17PRINCIPAL_TEMPLATE = (18 '["{account_id}", "{role_arn}", ' '"arn:{partition}:iam::{account_id}:role/{sagemaker_role}"] '19)20KEY_ALIAS = "SageMakerTestKMSKey"21KMS_S3_ALIAS = "SageMakerTestS3KMSKey"22POLICY_NAME = "default"23KEY_POLICY = """24{{25 "Version": "2012-10-17",26 "Id": "{id}",27 "Statement": [28 {{29 "Sid": "Enable IAM User Permissions",30 "Effect": "Allow",31 "Principal": {{32 "AWS": {principal}33 }},34 "Action": "kms:*",35 "Resource": "*"36 }}37 ]38}}39"""40def _get_kms_key_arn(kms_client, alias):41 try:42 response = kms_client.describe_key(KeyId="alias/" + alias)43 return response["KeyMetadata"]["Arn"]44 except kms_client.exceptions.NotFoundException:45 return None46def _get_kms_key_id(kms_client, alias):47 try:48 response = kms_client.describe_key(KeyId="alias/" + alias)49 return response["KeyMetadata"]["KeyId"]50 except kms_client.exceptions.NotFoundException:51 return None52def _create_kms_key(53 kms_client, account_id, region, role_arn=None, sagemaker_role="SageMakerRole", alias=KEY_ALIAS54):55 if role_arn:56 principal = PRINCIPAL_TEMPLATE.format(57 partition=utils._aws_partition(region),58 account_id=account_id,59 role_arn=role_arn,60 sagemaker_role=sagemaker_role,61 )62 else:63 principal = '"{account_id}"'.format(account_id=account_id)64 response = kms_client.create_key(65 Policy=KEY_POLICY.format(66 id=POLICY_NAME, principal=principal, sagemaker_role=sagemaker_role67 ),68 Description="KMS key for SageMaker Python SDK integ tests",69 )70 key_arn = response["KeyMetadata"]["Arn"]71 if alias:72 kms_client.create_alias(AliasName="alias/" + alias, TargetKeyId=key_arn)73 return key_arn74def _add_role_to_policy(75 kms_client, account_id, role_arn, region, alias=KEY_ALIAS, sagemaker_role="SageMakerRole"76):77 key_id = _get_kms_key_id(kms_client, alias)78 policy = kms_client.get_key_policy(KeyId=key_id, PolicyName=POLICY_NAME)79 policy = json.loads(policy["Policy"])80 principal = policy["Statement"][0]["Principal"]["AWS"]81 if role_arn not in principal or sagemaker_role not in principal:82 principal = PRINCIPAL_TEMPLATE.format(83 partition=utils._aws_partition(region),84 account_id=account_id,85 role_arn=role_arn,86 sagemaker_role=sagemaker_role,87 )88 kms_client.put_key_policy(89 KeyId=key_id,90 PolicyName=POLICY_NAME,91 Policy=KEY_POLICY.format(id=POLICY_NAME, principal=principal),92 )93def get_or_create_kms_key(94 sagemaker_session, role_arn=None, alias=KEY_ALIAS, sagemaker_role="SageMakerRole"95):96 kms_client = sagemaker_session.boto_session.client("kms")97 kms_key_arn = _get_kms_key_arn(kms_client, alias)98 region = sagemaker_session.boto_region_name99 sts_client = sagemaker_session.boto_session.client(100 "sts", region_name=region, endpoint_url=utils.sts_regional_endpoint(region)101 )102 account_id = sts_client.get_caller_identity()["Account"]103 if kms_key_arn is None:104 return _create_kms_key(kms_client, account_id, region, role_arn, sagemaker_role, alias)105 if role_arn:106 _add_role_to_policy(kms_client, account_id, role_arn, region, alias, sagemaker_role)107 return kms_key_arn108KMS_BUCKET_POLICY = """{{109 "Version": "2012-10-17",110 "Id": "PutObjPolicy",111 "Statement": [112 {{113 "Sid": "DenyIncorrectEncryptionHeader",114 "Effect": "Deny",115 "Principal": "*",116 "Action": "s3:PutObject",117 "Resource": "arn:{partition}:s3:::{bucket_name}/*",118 "Condition": {{119 "StringNotEquals": {{120 "s3:x-amz-server-side-encryption": "aws:kms"121 }}122 }}123 }},124 {{125 "Sid": "DenyUnEncryptedObjectUploads",126 "Effect": "Deny",127 "Principal": "*",128 "Action": "s3:PutObject",129 "Resource": "arn:{partition}:s3:::{bucket_name}/*",130 "Condition": {{131 "Null": {{132 "s3:x-amz-server-side-encryption": "true"133 }}134 }}135 }}136 ]137}}"""138@contextlib.contextmanager139def bucket_with_encryption(sagemaker_session, sagemaker_role):140 boto_session = sagemaker_session.boto_session141 region = boto_session.region_name142 sts_client = boto_session.client(143 "sts", region_name=region, endpoint_url=utils.sts_regional_endpoint(region)144 )145 account = sts_client.get_caller_identity()["Account"]146 role_arn = sts_client.get_caller_identity()["Arn"]147 kms_client = boto_session.client("kms", region_name=region)148 kms_key_arn = _create_kms_key(kms_client, account, region, role_arn, sagemaker_role, None)149 region = boto_session.region_name150 bucket_name = "sagemaker-{}-{}-with-kms".format(region, account)151 sagemaker_session._create_s3_bucket_if_it_does_not_exist(bucket_name=bucket_name, region=region)152 s3_client = boto_session.client("s3", region_name=region)153 s3_client.put_bucket_encryption(154 Bucket=bucket_name,155 ServerSideEncryptionConfiguration={156 "Rules": [157 {158 "ApplyServerSideEncryptionByDefault": {159 "SSEAlgorithm": "aws:kms",160 "KMSMasterKeyID": kms_key_arn,161 }162 }163 ]164 },165 )166 s3_client.put_bucket_policy(167 Bucket=bucket_name,168 Policy=KMS_BUCKET_POLICY.format(169 partition=utils._aws_partition(region), bucket_name=bucket_name170 ),171 )172 yield "s3://" + bucket_name, kms_key_arn...

Full Screen

Full Screen

test_kms.py

Source:test_kms.py Github

copy

Full Screen

1import botocore.exceptions2import pytest3from cryptography.exceptions import InvalidSignature4from cryptography.hazmat.primitives import hashes, serialization5from cryptography.hazmat.primitives.asymmetric import ec, padding6from localstack import config7from localstack.constants import TEST_AWS_ACCOUNT_ID8class TestKMS:9 def test_create_key(self, kms_client):10 response = kms_client.list_keys()11 assert response["ResponseMetadata"]["HTTPStatusCode"] == 20012 keys_before = response["Keys"]13 response = kms_client.create_key(14 Policy="policy1", Description="test key 123", KeyUsage="ENCRYPT_DECRYPT"15 )16 assert response["ResponseMetadata"]["HTTPStatusCode"] == 20017 key_id = response["KeyMetadata"]["KeyId"]18 response = kms_client.list_keys()19 assert len(response["Keys"]) == len(keys_before) + 120 response = kms_client.describe_key(KeyId=key_id)["KeyMetadata"]21 assert response["KeyId"] == key_id22 assert ":%s:" % config.DEFAULT_REGION in response["Arn"]23 assert ":%s:" % TEST_AWS_ACCOUNT_ID in response["Arn"]24 def test_create_grant_with_invalid_key(self, kms_client):25 with pytest.raises(botocore.exceptions.ClientError):26 kms_client.create_grant(27 KeyId="invalid",28 GranteePrincipal="arn:aws:iam::000000000000:role/test",29 Operations=["Decrypt", "Encrypt"],30 )31 def test_list_grants_with_invalid_key(self, kms_client):32 with pytest.raises(botocore.exceptions.ClientError):33 kms_client.list_grants(34 KeyId="invalid",35 )36 def test_create_grant_with_valid_key(self, kms_client, kms_key):37 key_id = kms_key["KeyMetadata"]["KeyId"]38 grants_before = kms_client.list_grants(KeyId=key_id)["Grants"]39 grant = kms_client.create_grant(40 KeyId=key_id,41 GranteePrincipal="arn:aws:iam::000000000000:role/test",42 Operations=["Decrypt", "Encrypt"],43 )44 assert "GrantId" in grant45 assert "GrantToken" in grant46 grants_after = kms_client.list_grants(KeyId=key_id)["Grants"]47 assert len(grants_after) == len(grants_before) + 148 def test_revoke_grant(self, kms_client, kms_grant_and_key):49 grant = kms_grant_and_key[0]50 key_id = kms_grant_and_key[1]["KeyMetadata"]["KeyId"]51 grants_before = kms_client.list_grants(KeyId=key_id)["Grants"]52 kms_client.revoke_grant(KeyId=key_id, GrantId=grant["GrantId"])53 grants_after = kms_client.list_grants(KeyId=key_id)["Grants"]54 assert len(grants_after) == len(grants_before) - 155 def test_retire_grant(self, kms_client, kms_grant_and_key):56 grant = kms_grant_and_key[0]57 key_id = kms_grant_and_key[1]["KeyMetadata"]["KeyId"]58 grants_before = kms_client.list_grants(KeyId=key_id)["Grants"]59 kms_client.retire_grant(GrantToken=grant["GrantToken"])60 grants_after = kms_client.list_grants(KeyId=key_id)["Grants"]61 assert len(grants_after) == len(grants_before) - 162 def test_asymmetric_keys(self, kms_client, kms_key):63 key_id = kms_key["KeyMetadata"]["KeyId"]64 # generate key pair without plaintext65 result = kms_client.generate_data_key_pair_without_plaintext(66 KeyId=key_id, KeyPairSpec="RSA_2048"67 )68 assert result.get("PrivateKeyCiphertextBlob")69 assert not result.get("PrivateKeyPlaintext")70 assert result.get("PublicKey")71 # generate key pair72 result = kms_client.generate_data_key_pair(KeyId=key_id, KeyPairSpec="RSA_2048")73 assert result.get("PrivateKeyCiphertextBlob")74 assert result.get("PrivateKeyPlaintext")75 assert result.get("PublicKey")76 # get public key77 result1 = kms_client.get_public_key(KeyId=key_id)78 assert result.get("KeyId") == result1.get("KeyId")79 assert result.get("KeyPairSpec") == result1.get("KeySpec")80 assert result.get("PublicKey") == result1.get("PublicKey")81 # assert correct value of encrypted key82 decrypted = kms_client.decrypt(83 CiphertextBlob=result["PrivateKeyCiphertextBlob"], KeyId=key_id84 )85 assert decrypted["Plaintext"] == result["PrivateKeyPlaintext"]86 @pytest.mark.parametrize("key_type", ["rsa", "ecc"])87 def test_sign(self, kms_client, key_type):88 key_spec = "RSA_2048" if key_type == "rsa" else "ECC_NIST_P256"89 result = kms_client.create_key(KeyUsage="SIGN_VERIFY", KeySpec=key_spec)90 key_id = result["KeyMetadata"]["KeyId"]91 message = b"test message 123 !%$@"92 algo = "RSASSA_PSS_SHA_256" if key_type == "rsa" else "ECDSA_SHA_384"93 result = kms_client.sign(94 KeyId=key_id, Message=message, MessageType="RAW", SigningAlgorithm=algo95 )96 def _verify(signature):97 kwargs = {}98 if key_type == "rsa":99 kwargs["padding"] = padding.PSS(100 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH101 )102 kwargs["algorithm"] = hashes.SHA256()103 else:104 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=hashes.SHA384())105 public_key.verify(signature=signature, data=message, **kwargs)106 public_key_data = kms_client.get_public_key(KeyId=key_id)["PublicKey"]107 public_key = serialization.load_der_public_key(public_key_data)108 _verify(result["Signature"])109 with pytest.raises(InvalidSignature):110 _verify(result["Signature"] + b"foobar")111 def test_get_and_list_sign_key(self, kms_client):112 response = kms_client.create_key(113 Description="test key 123",114 KeyUsage="SIGN_VERIFY",115 CustomerMasterKeySpec="ECC_NIST_P256",116 )117 key_id = response["KeyMetadata"]["KeyId"]118 describe_response = kms_client.describe_key(KeyId=key_id)["KeyMetadata"]119 assert describe_response["KeyId"] == key_id120 list_response = kms_client.list_keys()121 found = False122 for keyData in list_response["Keys"]:123 if keyData["KeyId"] == key_id:124 found = True125 break...

Full Screen

Full Screen

kms.py

Source:kms.py Github

copy

Full Screen

1######################################################################################################################2# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. #3# #4# Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance #5# with the License. A copy of the License is located at #6# #7# http://aws.amazon.com/asl/ #8# #9# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES #10# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions #11# and limitations under the License. #12######################################################################################################################13#!/bin/python14import boto315import inspect16kms_client = boto3.client('kms')17class KMS(object):18 def __init__(self, logger):19 self.logger = logger20 def describe_key(self, alias_name):21 try:22 key_id = 'alias/' + alias_name23 response = kms_client.describe_key(24 KeyId=key_id25 )26 return response27 except Exception as e:28 message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,29 'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}30 self.logger.exception(message)31 raise32 def create_key(self, policy, description="CMK created for AWS Landing Zone Resources"):33 try:34 response = kms_client.create_key(35 Policy=policy,36 Description=description,37 KeyUsage='ENCRYPT_DECRYPT',38 Origin='AWS_KMS',39 BypassPolicyLockoutSafetyCheck=True,40 Tags=[41 {42 'TagKey': 'AWSSolutions',43 'TagValue': 'AWSLandingZone'44 },45 ]46 )47 return response48 except Exception as e:49 message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,50 'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}51 self.logger.exception(message)52 raise53 def create_alias(self, alias_name, key_name):54 try:55 response = kms_client.create_alias(56 AliasName=alias_name,57 TargetKeyId=key_name58 )59 return response60 except Exception as e:61 message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,62 'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}63 self.logger.exception(message)64 raise65 def list_aliases(self, marker=None):66 try:67 if marker:68 response = kms_client.list_aliases(Marker=marker)69 else:70 response = kms_client.list_aliases()71 return response72 except Exception as e:73 message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,74 'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}75 self.logger.exception(message)76 raise77 def put_key_policy(self, key_id, policy):78 try:79 response = kms_client.put_key_policy(80 KeyId=key_id,81 Policy=policy,82 PolicyName = 'default', # Per API docs, the only valid value is default.83 BypassPolicyLockoutSafetyCheck=True84 )85 return response86 except Exception as e:87 message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,88 'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}89 self.logger.exception(message)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful