Best Python code snippet using localstack_python
test_mozdef_user.py
Source:test_mozdef_user.py  
...8        """9        cls.client = boto3.client('iam')10    # http://boto3.readthedocs.io/en/latest/reference/services/iam.html#IAM.Client.simulate_custom_policy11    def test_allowed_list_buckets(self, config):12        response = self.client.simulate_principal_policy(13            PolicySourceArn=config['source_arn'],14            ActionNames=['s3:ListAllMyBuckets']15        )16        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'17    def test_allowed_list_bucket_contents(self, config):18        response = self.client.simulate_principal_policy(19            PolicySourceArn=config['source_arn'],20            ActionNames=['s3:ListBucket'],21            ResourceArns=[22                'arn:aws:s3:::%s' % config['BackupBucketName'],23                'arn:aws:s3:::%s' % config['NewProdBackupBucketName'],24                'arn:aws:s3:::%s' % config['BlocklistBucketName'],25                'arn:aws:s3:::%s' % config['IPSpaceBucketName'],26            ]27        )28        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'29    def test_allowed_write_to_mozdefes2backups(self, config):30        response = self.client.simulate_principal_policy(31            PolicySourceArn=config['source_arn'],32            ActionNames=['s3:PutObject', 's3:DeleteObject'],33            ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['BackupBucketName'],]34        )35        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'36    def test_allowed_write_to_new_prod_backup_bucket(self, config):37        response = self.client.simulate_principal_policy(38            PolicySourceArn=config['source_arn'],39            ActionNames=['s3:PutObject', 's3:DeleteObject'],40            ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['NewProdBackupBucketName'],]41        )42        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'43    def test_denied_write_to_other_environments_backup_bucket(self, config):44        environment_index = config['environment_name_list'].index(45            config['environment_name'])46        other_environment_index = (47            environment_index + 148            if environment_index < (len(config['environment_name_list']) - 1)49            else 050        )51        other_environment_name = config['environment_name_list'][52            other_environment_index]53        other_config = config['all_environments'][other_environment_name]54        response = self.client.simulate_principal_policy(55            PolicySourceArn=config['source_arn'],56            ActionNames=['s3:PutObject', 's3:DeleteObject'],57            ResourceArns=[58                'arn:aws:s3:::%s/example_file.txt' %59                other_config['NewProdBackupBucketName']]60        )61        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'62    def test_allowed_write_to_mozilla_infosec_blocklist(self, config):63        response = self.client.simulate_principal_policy(64            PolicySourceArn=config['source_arn'],65            ActionNames=['s3:PutObject'],66            ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['BlocklistBucketName']]67        )68        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'69    def test_denied_write_to_bucket(self, config):70        response = self.client.simulate_principal_policy(71            PolicySourceArn=config['source_arn'],72            ActionNames=['s3:PutObject'],73            ResourceArns=[74                'arn:aws:s3:::BucketThatIsNotAllowed/example_file.txt']75        )76        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'77    def test_allowed_read_from_mozilla_ipspace(self, config):78        response = self.client.simulate_principal_policy(79            PolicySourceArn=config['source_arn'],80            ActionNames=['s3:GetObject'],81            ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['IPSpaceBucketName']]82        )83        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'84    def test_denied_read_from_bucket(self, config):85        response = self.client.simulate_principal_policy(86            PolicySourceArn=config['source_arn'],87            ActionNames=['s3:GetObject'],88            ResourceArns=['arn:aws:s3:::BucketThatIsNotAllowed/example_file.txt']89        )90        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'91    def test_allowed_list_cloudtrail_bucket_contents(self, config):92        response = self.client.simulate_principal_policy(93            PolicySourceArn=config['source_arn'],94            ActionNames=['s3:ListBucket'],95            ResourceArns=['arn:aws:s3:::AnyBucketAtAll']96        )97        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'98    def test_allowed_get_cloudtrail_log(self, config):99        response = self.client.simulate_principal_policy(100            PolicySourceArn=config['source_arn'],101            ActionNames=['s3:GetObject'],102            ResourceArns = ['arn:aws:s3:::AnyBucketAtAll/AWSLogs/012345678901/CloudTrail/ap-northeast-1/2017/02/15/012345678901_CloudTrail_ap-northeast-1_20170215T0000Z_UVpGnwCcvkdew1nf.json.gz']103        )104        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'105    def test_allowed_describe_cloudtrails(self, config):106        response = self.client.simulate_principal_policy(107            PolicySourceArn=config['source_arn'],108            ActionNames=['cloudtrail:DescribeTrails']109        )110        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'111    def test_allowed_get_session_token(self, config):112        response = self.client.simulate_principal_policy(113            PolicySourceArn=config['source_arn'],114            ActionNames=['sts:GetSessionToken']115        )116        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'117    def test_allowed_assume_security_audit_role(self, config):118        response = self.client.simulate_principal_policy(119            PolicySourceArn=config['source_arn'],120            ActionNames=['sts:AssumeRole'],121            ResourceArns=['arn:aws:iam::012345678901:role/InfosecClientRoles-InfosecSecurityAuditRole-01245ABCDEFG']122        )123        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'124    def test_denied_assume_security_audit_role(self, config):125        response = self.client.simulate_principal_policy(126            PolicySourceArn=config['source_arn'],127            ActionNames=['sts:AssumeRole'],128            ResourceArns=['arn:aws:iam::012345678901:role/SomeRoleThatIsNotAllowed']129        )130        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'131    def test_allowed_infosec_sqs_actions(self, config):132        response = self.client.simulate_principal_policy(133            PolicySourceArn=config['source_arn'],134            ActionNames=[135                "sqs:GetQueueUrl",136                "sqs:ReceiveMessage",137                "sqs:DeleteMessage"138              ],139            ResourceArns=[config['InfosecQueueArn']]140        )141        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'142    def test_denied_infosec_sqs_actions(self, config):143        response = self.client.simulate_principal_policy(144            PolicySourceArn=config['source_arn'],145            ActionNames=[146                "sqs:GetQueueUrl",147                "sqs:ReceiveMessage",148                "sqs:DeleteMessage"149              ],150            ResourceArns=['arn:aws:sqs:us-west-2:012345678901:SomeOtherSQSQueue']151        )152        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'153    def test_denied_infosec_sqs_send_message(self, config):154        response = self.client.simulate_principal_policy(155            PolicySourceArn=config['source_arn'],156            ActionNames=["sqs:SendMessage"],157            ResourceArns=[config['InfosecQueueArn']]158        )159        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'160    def test_allowed_mig_sqs_actions(self, config):161        response = self.client.simulate_principal_policy(162            PolicySourceArn=config['source_arn'],163            ActionNames=[164                "sqs:GetQueueUrl",165                "sqs:ReceiveMessage",166                "sqs:DeleteMessage"167              ],168            ResourceArns=[config['MIGQueueArn']]169        )170        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'171    def test_denied_mig_sqs_send_message(self, config):172        response = self.client.simulate_principal_policy(173            PolicySourceArn=config['source_arn'],174            ActionNames=["sqs:SendMessage"],175            ResourceArns=[config['MIGQueueArn']]176        )177        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'178    def test_allowed_fxa_sqs_actions(self, config):179        response = self.client.simulate_principal_policy(180            PolicySourceArn=config['source_arn'],181            ActionNames=[182                "sqs:GetQueueUrl",183                "sqs:SendMessage"184              ],185            ResourceArns=[config['FxaQueueArn']]186        )187        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'188    def test_denied_fxa_sqs_actions(self, config):189        response = self.client.simulate_principal_policy(190            PolicySourceArn=config['source_arn'],191            ActionNames=[192                "sqs:GetQueueUrl",193                "sqs:SendMessage"194              ],195            ResourceArns=['arn:aws:sqs:us-west-2:012345678901:SomeOtherSQSQueue']196        )197        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'198    def test_denied_fxa_sqs_delete_message(self, config):199        response = self.client.simulate_principal_policy(200            PolicySourceArn=config['source_arn'],201            ActionNames=["sqs:DeleteMessage"],202            ResourceArns=[config['FxaQueueArn']]203        )204        assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'205    def test_allowed_assume_fxa_role(self, config):206        response = self.client.simulate_principal_policy(207            PolicySourceArn=config['source_arn'],208            ActionNames=['sts:AssumeRole'],209            ResourceArns=['arn:aws:iam::361527076523:role/ExampleRole']210        )211        assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'212    def test_allowed_vpc_blackholing(self, config):213        response = self.client.simulate_principal_policy(214            PolicySourceArn=config['source_arn'],215            ActionNames=[216                "ec2:DescribeRouteTables",217                "ec2:DescribeNetworkInterfaces",218                "ec2:CreateRoute"219              ]220        )...test_permissions.py
Source:test_permissions.py  
...49        )50    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])51    mock_session.assert_has_calls(52        [53            call.client().simulate_principal_policy(54                PolicySourceArn=ANY,55                ActionNames=ANY,56                ResourceArns=["*"],57                ContextEntries=[{}],58            ),59        ],60    )61def test_ensure_integration_uninstall_permissions():62    mock_session = MagicMock()63    mock_session.client.return_value.simulate_principal_policy.return_value = {64        "EvaluationResults": [65            {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},66            {"EvalActionName": "bar:baz", "EvalDecision": "denied"},67        ]68    }69    with raises(UsageError):70        ensure_integration_uninstall_permissions(71            integration_uninstall(session=mock_session)72        )73    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])74    mock_session.assert_has_calls(75        [76            call.client().simulate_principal_policy(77                PolicySourceArn=ANY,78                ActionNames=ANY,79                ResourceArns=["*"],80                ContextEntries=[{}],81            ),82        ],83    )84def test_ensure_layer_install_permissions():85    mock_session = MagicMock()86    mock_session.client.return_value.simulate_principal_policy.return_value = {87        "EvaluationResults": [88            {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},89            {"EvalActionName": "bar:baz", "EvalDecision": "denied"},90        ]91    }92    with raises(UsageError):93        ensure_layer_install_permissions(layer_install(session=mock_session))94    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])95    mock_session.assert_has_calls(96        [97            call.client().simulate_principal_policy(98                PolicySourceArn=ANY,99                ActionNames=ANY,100                ResourceArns=["*"],101                ContextEntries=[{}],102            ),103        ],104    )105def test_ensure_layer_uninstall_permissions():106    mock_session = MagicMock()107    mock_session.client.return_value.simulate_principal_policy.return_value = {108        "EvaluationResults": [109            {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},110            {"EvalActionName": "bar:baz", "EvalDecision": "denied"},111        ]112    }113    with raises(UsageError):114        ensure_layer_uninstall_permissions(layer_uninstall(session=mock_session))115    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])116    mock_session.assert_has_calls(117        [118            call.client().simulate_principal_policy(119                PolicySourceArn=ANY,120                ActionNames=ANY,121                ResourceArns=["*"],122                ContextEntries=[{}],123            ),124        ],125    )126def test_ensure_function_list_permissions():127    mock_session = MagicMock()128    mock_session.client.return_value.simulate_principal_policy.return_value = {129        "EvaluationResults": [130            {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},131            {"EvalActionName": "bar:baz", "EvalDecision": "denied"},132        ]133    }134    with raises(UsageError):135        ensure_function_list_permissions(mock_session)136    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])137    mock_session.assert_has_calls(138        [139            call.client().simulate_principal_policy(140                PolicySourceArn=ANY,141                ActionNames=ANY,142                ResourceArns=["*"],143                ContextEntries=[{}],144            ),145        ],146    )147def test_ensure_subscription_install_permissions():148    mock_session = MagicMock()149    mock_session.client.return_value.simulate_principal_policy.return_value = {150        "EvaluationResults": [151            {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},152            {"EvalActionName": "bar:baz", "EvalDecision": "denied"},153        ]154    }155    with raises(UsageError):156        ensure_subscription_install_permissions(157            subscription_install(session=mock_session)158        )159    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])160    mock_session.assert_has_calls(161        [162            call.client().simulate_principal_policy(163                PolicySourceArn=ANY,164                ActionNames=ANY,165                ResourceArns=["*"],166                ContextEntries=[{}],167            ),168        ],169    )170def test_ensure_subscription_uninstall_permissions():171    mock_session = MagicMock()172    mock_session.client.return_value.simulate_principal_policy.return_value = {173        "EvaluationResults": [174            {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},175            {"EvalActionName": "bar:baz", "EvalDecision": "denied"},176        ]177    }178    with raises(UsageError):179        ensure_subscription_uninstall_permissions(180            subscription_uninstall(session=mock_session)181        )182    mock_session.assert_has_calls([call.client("iam"), call.client("sts")])183    mock_session.assert_has_calls(184        [185            call.client().simulate_principal_policy(186                PolicySourceArn=ANY,187                ActionNames=ANY,188                ResourceArns=["*"],189                ContextEntries=[{}],190            ),191        ],...Lambda-iam-policy-validate-powerfulActions.py
Source:Lambda-iam-policy-validate-powerfulActions.py  
...20    # Create clients to call other services21    iam = boto3.client("iam")22    config = boto3.client("config")23    if resource_type == "AWS::IAM::User" or resource_type == "AWS::IAM::Role":24        compliance_status = simulate_principal_policy(iam, resource_arn)25        record_results(config, compliance_status, result_token, resource_type, resource_id, resource_arn, timestamp)26    elif resource_type == "AWS::IAM::Group":27        simulate_group(resource_arn, resource_id, resource_name, config, iam, result_token, timestamp)28    elif resource_type == "AWS::IAM::Policy":29        # Simulate the policy and record it's result30        compliance_status = simulate_managed_policy(iam, resource_arn)31        record_results(config, compliance_status, result_token, resource_type, resource_id, resource_arn, timestamp)32        # Get all of the attached principals33        attached_entities = iam.list_entities_for_policy(PolicyArn=resource_arn, MaxItems=1000)34        # Simulate all principals35        for role in attached_entities["PolicyRoles"]:36            role_arn = "arn:aws:iam:" + account_id + "::role/" + role["RoleName"]37            compliance_status = simulate_principal_policy(iam, role_arn)38            record_results(config, compliance_status, result_token, "AWS::IAM::Role", role["RoleId"], role_arn, timestamp)39        for user in attached_entities["PolicyUsers"]:40            user_arn = "arn:aws:iam:" + account_id + "::user/" + user["UserName"]41            compliance_status = simulate_principal_policy(iam, user_arn)42            record_results(config, compliance_status, result_token, "AWS::IAM::User", user["UserId"], user_arn, timestamp)43        for group in attached_entities["PolicyGroups"]:44            group_arn = "arn:aws:iam:" + account_id + "::group/" + group["GroupName"]45            simulate_group(group_arn, group["GroupId"], group["GroupName"], config, iam, result_token, timestamp)46def simulate_principal_policy(iam, resource_arn):47    # Call IAM to simulate the policy on restricted actions.48    response = iam.simulate_principal_policy(PolicySourceArn=resource_arn,ActionNames=POWERFUL_ACTIONS,ResourceArns=['*'])49    results = response['EvaluationResults']50    allows_powerful_action = False51    # Determine if any restricted actions are allowed.52    for actions in results:53        eval_decision = actions['EvalDecision']54        if(eval_decision == 'allowed'):55            action_name = actions['EvalActionName']56            print "Restricted action " + action_name + " was granted to resource " + resource_arn57            allows_powerful_action = True58    # If any restricted actions were allowed, consider the resource non-compliant.59    if(allows_powerful_action):60        return "NON_COMPLIANT"61    return "COMPLIANT"62def simulate_managed_policy(iam, policy_arn):63    # Retrieve the policy.64    get_policy_response = iam.get_policy(PolicyArn=policy_arn)65    default_version = get_policy_response["Policy"]["DefaultVersionId"]66    get_policy_version_response = iam.get_policy_version(PolicyArn=policy_arn, VersionId=default_version)67    policy_document = json.dumps(get_policy_version_response["PolicyVersion"]["Document"])68    # Simulate the policy69    simulation_response = iam.simulate_custom_policy(PolicyInputList=[policy_document], ActionNames=POWERFUL_ACTIONS, ResourceArns=["*"])70    results = simulation_response['EvaluationResults']71    allows_powerful_action = False72    # Determine if any restricted actions are allowed.73    for actions in results:74        evalDecision = actions['EvalDecision']75        if(evalDecision == 'allowed'):76            actionName = actions['EvalActionName']77            print "Restricted action " + actionName + " was granted to resource " + policy_arn78            allows_powerful_action = True79    # If any restricted actions were allowed, consider the resource non-compliant.80    if(allows_powerful_action):81        return "NON_COMPLIANT"82    return "COMPLIANT"83def record_results(config, compliance_result, result_token, resource_type, resource_id, resource_arn, timestamp):84    # Call Config to record the results of our evaluation.85    annotation = "Entity: " + resource_arn + " is " + compliance_result + " for validation of powerful actions access."86    config.put_evaluations(87        Evaluations=[88            {89                "ComplianceResourceType": resource_type,90                "ComplianceResourceId": resource_id,91                "ComplianceType": compliance_result,92                "Annotation": annotation,93                "OrderingTimestamp": timestamp94            },95        ],96        ResultToken=result_token97    )98def get_users_for_group(iam, group_name):99    response = iam.get_group(GroupName=group_name, MaxItems=1000)100    user_arns = []101    for user in response["Users"]:102        user_arns.append({'Arn': user["Arn"], 'UserId': user["UserId"]})103    return user_arns104def simulate_group(group_arn, group_id, group_name, config, iam, result_token, timestamp):105   # Simulate group, save status106    compliance_status = simulate_principal_policy(iam, group_arn)107    record_results(config, compliance_status, result_token, "AWS::IAM::Group", group_id, group_arn, timestamp)108    # Retrieve the ARNs of all users in the group109    users = get_users_for_group(iam, group_name)110    # For each user, simulate_principal111    for user in users:112        compliance_status = simulate_principal_policy(iam, user["Arn"])113        record_results(config, compliance_status, result_token, "AWS::IAM::User", user["UserId"], user["Arn"], timestamp)114def lambda_handler(event, context):115    invoking_event = json.loads(event["invokingEvent"])116    configuration_item = invoking_event["configurationItem"]117    result_token = "No token found."118    if "resultToken" in event:119        result_token = event["resultToken"]120    # Evaluate whether the resource is compliant...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
