Best Python code snippet using localstack_python
test_AWS_KMS_Auditor.py
Source:test_AWS_KMS_Auditor.py  
1#This file is part of ElectricEye.2#SPDX-License-Identifier: Apache-2.03#Licensed to the Apache Software Foundation (ASF) under one4#or more contributor license agreements.  See the NOTICE file5#distributed with this work for additional information6#regarding copyright ownership.  The ASF licenses this file7#to you under the Apache License, Version 2.0 (the8#"License"); you may not use this file except in compliance9#with the License.  You may obtain a copy of the License at10#http://www.apache.org/licenses/LICENSE-2.011#Unless required by applicable law or agreed to in writing,12#software distributed under the License is distributed on an13#"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY14#KIND, either express or implied.  See the License for the15#specific language governing permissions and limitations16#under the License.17import datetime18import json19import os20import pytest21from botocore.stub import Stubber, ANY22from . import context23from auditors.aws.AWS_KMS_Auditor import (24    kms_key_exposed_check,25    kms_key_rotation_check,26    kms,27)28list_aliases_response = {29    "Aliases": [30        {31            "AliasArn": "arn:aws:kms:us-east-1:012345678901:alias/aws/s3",32            "TargetKeyId": "c84a8fab-6c42-4b33-ad64-a8e0b0ec0a15",33        },34    ],35}36get_key_policy_public_response = {37    "Policy": '{"Version": "2012-10-17","Id": "KeyPolicy1568312239560","Statement": [{"Sid": "StmtID1672312238115","Effect": "Allow","Principal": {"AWS": "*"},"Action": "kms:*","Resource": "*"}]}'38}39get_key_policy_not_public_response = {40    "Policy": '{"Version": "2012-10-17","Id": "KeyPolicy1568312239560","Statement": [{"Sid": "StmtID1672312238115","Effect": "Allow","Principal": {"AWS": "012345678901"},"Action": "kms:*","Resource": "*"}]}'41}42get_key_policy_has_condition_response = {43    "Policy": '{"Version": "2012-10-17","Id": "KeyPolicy1568312239560","Statement": [{"Sid": "StmtID1672312238115","Effect": "Allow","Principal": {"AWS": "*"},"Action": "kms:*","Resource": "*","Condition": {"StringEquals": {"kms:CallerAccount": "012345678901","kms:ViaService": "sns.us-east-1.amazonaws.com"}}}]}'44}45get_key_policy_no_AWS_response = {46    "Policy": '{"Version": "2012-10-17","Id": "KeyPolicy1568312239560","Statement": [{"Sid": "StmtID1672312238115","Effect": "Allow","Principal": {"Service": "cloudtrail.amazonaws.com"},"Action": "kms:*","Resource": "*","Condition": {"StringEquals": {"kms:CallerAccount": "012345678901","kms:ViaService": "sns.us-east-1.amazonaws.com"}}}]}'47}48get_key_policy_principal_str_response = {49    "Policy": '{"Version": "2012-10-17","Id": "KeyPolicy1568312239560","Statement": [{"Sid": "StmtID1672312238115","Effect": "Allow","Principal": "*","Action": "kms:*","Resource": "*","Condition": {"StringEquals": {"kms:CallerAccount": "012345678901","kms:ViaService": "sns.us-east-1.amazonaws.com"}}}]}'50}51list_keys_response = {52    "Keys": [53        {54            "KeyId": "273e5d8e-4746-4ba9-be3a-4dce36783814",55            "KeyArn": "arn:aws:kms:us-east-1:012345678901:key/273e5d8e-4746-4ba9-be3a-4dce36783814",56        }57    ]58}59get_key_rotation_status_response = {"KeyRotationEnabled": True}60get_key_rotation_status_response1 = {"KeyRotationEnabled": False}61@pytest.fixture(scope="function")62def kms_stubber():63    kms_stubber = Stubber(kms)64    kms_stubber.activate()65    yield kms_stubber66    kms_stubber.deactivate()67def test_key_rotation_enabled(kms_stubber):68    kms_stubber.add_response("list_keys", list_keys_response)69    kms_stubber.add_response(70        "get_key_rotation_status", get_key_rotation_status_response71    )72    results = kms_key_rotation_check(73        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"74    )75    for result in results:76        assert "273e5d8e-4746-4ba9-be3a-4dce36783814" in result["Id"]77        assert result["RecordState"] == "ARCHIVED"78    kms_stubber.assert_no_pending_responses()79def test_key_rotation_not_enabled(kms_stubber):80    kms_stubber.add_response("list_keys", list_keys_response)81    kms_stubber.add_response(82        "get_key_rotation_status", get_key_rotation_status_response183    )84    results = kms_key_rotation_check(85        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"86    )87    for result in results:88        assert "273e5d8e-4746-4ba9-be3a-4dce36783814" in result["Id"]89        assert result["RecordState"] == "ACTIVE"90    kms_stubber.assert_no_pending_responses()91def test_has_public_key(kms_stubber):92    kms_stubber.add_response("list_aliases", list_aliases_response)93    kms_stubber.add_response("get_key_policy", get_key_policy_public_response)94    results = kms_key_exposed_check(95        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"96    )97    for result in results:98        assert "s3" in result["Id"]99        assert result["RecordState"] == "ACTIVE"100    kms_stubber.assert_no_pending_responses()101def test_no_public_key(kms_stubber):102    kms_stubber.add_response("list_aliases", list_aliases_response)103    kms_stubber.add_response("get_key_policy", get_key_policy_not_public_response)104    results = kms_key_exposed_check(105        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"106    )107    for result in results:108        assert "s3" in result["Id"]109        assert result["RecordState"] == "ARCHIVED"110    kms_stubber.assert_no_pending_responses()111def test_has_condition(kms_stubber):112    kms_stubber.add_response("list_aliases", list_aliases_response)113    kms_stubber.add_response("get_key_policy", get_key_policy_has_condition_response)114    results = kms_key_exposed_check(115        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"116    )117    for result in results:118        assert "s3" in result["Id"]119        assert result["RecordState"] == "ARCHIVED"120    kms_stubber.assert_no_pending_responses()121def test_no_AWS(kms_stubber):122    kms_stubber.add_response("list_aliases", list_aliases_response)123    kms_stubber.add_response("get_key_policy", get_key_policy_no_AWS_response)124    results = kms_key_exposed_check(125        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"126    )127    for result in results:128        assert "s3" in result["Id"]129        assert result["RecordState"] == "ARCHIVED"130    kms_stubber.assert_no_pending_responses()131def test_principal_is_string(kms_stubber):132    kms_stubber.add_response("list_aliases", list_aliases_response)133    kms_stubber.add_response("get_key_policy", get_key_policy_principal_str_response)134    results = kms_key_exposed_check(135        cache={}, awsAccountId="012345678901", awsRegion="us-east-1", awsPartition="aws"136    )137    for result in results:138        assert "s3" in result["Id"]139        assert result["RecordState"] == "ARCHIVED"...aws_utils.py
Source:aws_utils.py  
...12    key_policy['Statement'][0]['Principal']['AWS'] = principal_list13    return key_policy14def get_principals(key_policy):15    return key_policy['Statement'][0]['Principal']16def get_key_policy(key_id):17    return json.loads(18        kms_client.get_key_policy(KeyId=key_id, PolicyName='default')['Policy']19    )20def get_key_id(key_alias):21    return [alias for alias in kms_client.list_aliases()['Aliases'] if alias['AliasName'] == key_alias][0]['TargetKeyId']22def get_key_policy_with_alias(key_alias):23    return get_key_policy(get_key_id(key_alias))24def fetch_principal_arn(role_name):25    return [x for x in iam_client.list_roles()['Roles'] if x['RoleName'] == role_name][0]['Arn']26def generate_kms_policy_string(principals, key_id_string = 'key-default-1'):27    policy_string = json.dumps({28        'Id': key_id_string,29        'Statement': [{30            'Action': 'kms:*',31            'Effect': 'Allow',32            'Principal': {'AWS': principals},33            'Resource': '*',34            'Sid': 'Enable IAM User Permissions'}],35    'Version': '2012-10-17'}36    )37    return policy_string...get_key_policy.py
Source:get_key_policy.py  
...9# logger config10logger = logging.getLogger()11logging.basicConfig(level=logging.INFO,12                    format='%(asctime)s: %(levelname)s: %(message)s')13def get_key_policy(keyId, policyName):14    """15    Provides detailed information about a KMS key.16    """17    try:18        client = boto3.client('kms')19        response = client.get_key_policy(20            KeyId=keyId,21            PolicyName=policyName22        )23        # describe_key(KeyId=keyId, GrantTokens=[])24    except ClientError:25        logger.exception('Could not describe a KMS key.')26        raise27    else:28        return response['Policy']29if __name__ == '__main__':30    # Constants31    KEY_ID = '4bd80467-916b-4a8d-b20b-157253518086'32    logger.info('Getting information about KMS key...')33    kms = get_key_policy(KEY_ID, 'default')34    logger.info(35        f'Key Details: {json.dumps(kms, indent=4, default=json_datetime_serializer)}'...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
