Best Python code snippet using localstack_python
test_kms.py
Source:test_kms.py  
...98        assert decrypted["Plaintext"] == result["PrivateKeyPlaintext"]99    @pytest.mark.parametrize("key_type", ["rsa", "ecc"])100    def test_sign(self, kms_client, key_type, kms_create_key):101        key_spec = "RSA_2048" if key_type == "rsa" else "ECC_NIST_P256"102        result = kms_create_key(KeyUsage="SIGN_VERIFY", KeySpec=key_spec)103        key_id = result["KeyId"]104        message = b"test message 123 !%$@"105        algo = "RSASSA_PSS_SHA_256" if key_type == "rsa" else "ECDSA_SHA_384"106        result = kms_client.sign(107            KeyId=key_id, Message=message, MessageType="RAW", SigningAlgorithm=algo108        )109        def _verify(signature):110            kwargs = {}111            if key_type == "rsa":112                kwargs["padding"] = padding.PSS(113                    mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH114                )115                kwargs["algorithm"] = hashes.SHA256()116            else:117                kwargs["signature_algorithm"] = ec.ECDSA(algorithm=hashes.SHA384())118            public_key.verify(signature=signature, data=message, **kwargs)119        public_key_data = kms_client.get_public_key(KeyId=key_id)["PublicKey"]120        public_key = serialization.load_der_public_key(public_key_data)121        _verify(result["Signature"])122        with pytest.raises(InvalidSignature):123            _verify(result["Signature"] + b"foobar")124    @pytest.mark.aws_validated125    def test_get_and_list_sign_key(self, kms_client, kms_create_key):126        response = kms_create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P256")127        key_id = response["KeyId"]128        describe_response = kms_client.describe_key(KeyId=key_id)["KeyMetadata"]129        assert describe_response["KeyId"] == key_id130        list_response = kms_client.list_keys()131        found = False132        for keyData in list_response["Keys"]:133            if keyData["KeyId"] == key_id:134                
found = True135                break136        assert found is True137    def test_import_key(self, kms_client, kms_key):138        key_id = kms_key["KeyId"]139        # get key import params140        params = kms_client.get_parameters_for_import(141            KeyId=key_id, WrappingAlgorithm="RSAES_PKCS1_V1_5", WrappingKeySpec="RSA_2048"142        )143        assert params["KeyId"] == key_id144        assert params["ImportToken"]145        assert params["PublicKey"]146        assert isinstance(params["ParametersValidTo"], datetime)147        # create 256 bit symmetric key (import_key_material(..) works with symmetric keys, as per the docs)148        symmetric_key = bytes(getrandbits(8) for _ in range(32))149        assert len(symmetric_key) == 32150        # import symmetric key (key material) into KMS151        public_key = load_der_public_key(params["PublicKey"])152        encrypted_key = public_key.encrypt(symmetric_key, PKCS1v15())153        kms_client.import_key_material(154            KeyId=key_id,155            ImportToken=params["ImportToken"],156            EncryptedKeyMaterial=encrypted_key,157            ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE",158        )159        # use key to encrypt/decrypt data160        plaintext = b"test content 123 !#"161        encrypt_result = kms_client.encrypt(Plaintext=plaintext, KeyId=key_id)162        encrypted = encrypt(symmetric_key, plaintext)163        assert encrypt_result["CiphertextBlob"] == encrypted164        api_decrypted = kms_client.decrypt(165            CiphertextBlob=encrypt_result["CiphertextBlob"], KeyId=key_id166        )167        assert api_decrypted["Plaintext"] == plaintext168    @pytest.mark.aws_validated169    def test_list_aliases_of_key(self, kms_client, kms_create_key):170        aliased_key = kms_create_key()171        comparison_key = kms_create_key()172        alias_name = f"alias/{short_uid()}"173        kms_client.create_alias(AliasName=alias_name, 
TargetKeyId=aliased_key["KeyId"])174        response = kms_client.list_aliases(KeyId=aliased_key["KeyId"])175        assert len(response["Aliases"]) == 1176        response = kms_client.list_aliases(KeyId=comparison_key["KeyId"])...kms.py
Source:kms.py  
...22def kms_list_keys(client=None, region=None):23    response = client.list_keys()24    return [key['KeyId'] for key in response['Keys']]25@boto_client('kms')26def kms_create_key(description, policy=None, bypass_policy_lockout_safety_check=False,27                   key_usage='ENCRYPT_DECRYPT', origin='AWS_KMS', tags=[],28                   region=None, client=None):29    create_key_params = {30        'Description': description,31        'BypassPolicyLockoutSafetyCheck': bypass_policy_lockout_safety_check,32        'KeyUsage': key_usage,33        'Origin': origin,34        'Tags': tags35    }36    if policy:37        create_key_params['Policy'] = policy38    logger.debug({'create_key_params': create_key_params})39    return client.create_key(**create_key_params).get('KeyMetadata')40@boto_client('kms')41def kms_create_alias(alias_name, key_id, region=None, client=None):42    alias_name = __alias_name(alias_name)43    create_alias_params = {44        'AliasName': alias_name,45        'TargetKeyId': key_id46    }47    logger.debug({'create_alias_params': create_alias_params})48    try:49        client.create_alias(**create_alias_params)50    except Exception as e:51        logger.error(str(e))52        return False53    return True54def kms_ensure_key(alias_name, description=None, policy=None, bypass_policy_lockout_safety_check=False,55                   key_usage='ENCRYPT_DECRYPT', origin='AWS_KMS', tags=[], region=None):56    alias_name = __alias_name(alias_name)57    key_alias = get_alias_attr(alias_name, region=region)58    if not key_alias:59        logger.debug('[kms_ensure_key] key does not exist... 
creating it...')60        if description is None:61            description = alias_name62        key = kms_create_key(63            description=description,64            policy=policy,65            bypass_policy_lockout_safety_check=bypass_policy_lockout_safety_check,66            key_usage=key_usage,67            origin=origin,68            tags=tags,69            region=region70        )71        if kms_create_alias(alias_name, key['KeyId'], region=region):72            # need to get the new alias arn73            key_alias = get_alias_attr(alias_name, region=region)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
