How to use simulate_principal_policy method in localstack

Best Python code snippet using localstack_python

test_mozdef_user.py

Source:test_mozdef_user.py Github

copy

Full Screen

...8 """9 cls.client = boto3.client('iam')10 # http://boto3.readthedocs.io/en/latest/reference/services/iam.html#IAM.Client.simulate_custom_policy11 def test_allowed_list_buckets(self, config):12 response = self.client.simulate_principal_policy(13 PolicySourceArn=config['source_arn'],14 ActionNames=['s3:ListAllMyBuckets']15 )16 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'17 def test_allowed_list_bucket_contents(self, config):18 response = self.client.simulate_principal_policy(19 PolicySourceArn=config['source_arn'],20 ActionNames=['s3:ListBucket'],21 ResourceArns=[22 'arn:aws:s3:::%s' % config['BackupBucketName'],23 'arn:aws:s3:::%s' % config['NewProdBackupBucketName'],24 'arn:aws:s3:::%s' % config['BlocklistBucketName'],25 'arn:aws:s3:::%s' % config['IPSpaceBucketName'],26 ]27 )28 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'29 def test_allowed_write_to_mozdefes2backups(self, config):30 response = self.client.simulate_principal_policy(31 PolicySourceArn=config['source_arn'],32 ActionNames=['s3:PutObject', 's3:DeleteObject'],33 ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['BackupBucketName'],]34 )35 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'36 def test_allowed_write_to_new_prod_backup_bucket(self, config):37 response = self.client.simulate_principal_policy(38 PolicySourceArn=config['source_arn'],39 ActionNames=['s3:PutObject', 's3:DeleteObject'],40 ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['NewProdBackupBucketName'],]41 )42 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'43 def test_denied_write_to_other_environments_backup_bucket(self, config):44 environment_index = config['environment_name_list'].index(45 config['environment_name'])46 other_environment_index = (47 environment_index + 148 if environment_index < (len(config['environment_name_list']) - 1)49 else 050 )51 other_environment_name = config['environment_name_list'][52 other_environment_index]53 other_config = config['all_environments'][other_environment_name]54 response = self.client.simulate_principal_policy(55 PolicySourceArn=config['source_arn'],56 ActionNames=['s3:PutObject', 's3:DeleteObject'],57 ResourceArns=[58 'arn:aws:s3:::%s/example_file.txt' %59 other_config['NewProdBackupBucketName']]60 )61 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'62 def test_allowed_write_to_mozilla_infosec_blocklist(self, config):63 response = self.client.simulate_principal_policy(64 PolicySourceArn=config['source_arn'],65 ActionNames=['s3:PutObject'],66 ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['BlocklistBucketName']]67 )68 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'69 def test_denied_write_to_bucket(self, config):70 response = self.client.simulate_principal_policy(71 PolicySourceArn=config['source_arn'],72 ActionNames=['s3:PutObject'],73 ResourceArns=[74 'arn:aws:s3:::BucketThatIsNotAllowed/example_file.txt']75 )76 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'77 def test_allowed_read_from_mozilla_ipspace(self, config):78 response = self.client.simulate_principal_policy(79 PolicySourceArn=config['source_arn'],80 ActionNames=['s3:GetObject'],81 ResourceArns=['arn:aws:s3:::%s/example_file.txt' % config['IPSpaceBucketName']]82 )83 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'84 def test_denied_read_from_bucket(self, config):85 response = self.client.simulate_principal_policy(86 PolicySourceArn=config['source_arn'],87 ActionNames=['s3:GetObject'],88 ResourceArns=['arn:aws:s3:::BucketThatIsNotAllowed/example_file.txt']89 )90 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'91 def test_allowed_list_cloudtrail_bucket_contents(self, config):92 response = self.client.simulate_principal_policy(93 PolicySourceArn=config['source_arn'],94 ActionNames=['s3:ListBucket'],95 ResourceArns=['arn:aws:s3:::AnyBucketAtAll']96 )97 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'98 def test_allowed_get_cloudtrail_log(self, config):99 response = self.client.simulate_principal_policy(100 PolicySourceArn=config['source_arn'],101 ActionNames=['s3:GetObject'],102 ResourceArns = ['arn:aws:s3:::AnyBucketAtAll/AWSLogs/012345678901/CloudTrail/ap-northeast-1/2017/02/15/012345678901_CloudTrail_ap-northeast-1_20170215T0000Z_UVpGnwCcvkdew1nf.json.gz']103 )104 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'105 def test_allowed_describe_cloudtrails(self, config):106 response = self.client.simulate_principal_policy(107 PolicySourceArn=config['source_arn'],108 ActionNames=['cloudtrail:DescribeTrails']109 )110 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'111 def test_allowed_get_session_token(self, config):112 response = self.client.simulate_principal_policy(113 PolicySourceArn=config['source_arn'],114 ActionNames=['sts:GetSessionToken']115 )116 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'117 def test_allowed_assume_security_audit_role(self, config):118 response = self.client.simulate_principal_policy(119 PolicySourceArn=config['source_arn'],120 ActionNames=['sts:AssumeRole'],121 ResourceArns=['arn:aws:iam::012345678901:role/InfosecClientRoles-InfosecSecurityAuditRole-01245ABCDEFG']122 )123 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'124 def test_denied_assume_security_audit_role(self, config):125 response = self.client.simulate_principal_policy(126 PolicySourceArn=config['source_arn'],127 ActionNames=['sts:AssumeRole'],128 ResourceArns=['arn:aws:iam::012345678901:role/SomeRoleThatIsNotAllowed']129 )130 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'131 def test_allowed_infosec_sqs_actions(self, config):132 response = self.client.simulate_principal_policy(133 PolicySourceArn=config['source_arn'],134 ActionNames=[135 "sqs:GetQueueUrl",136 "sqs:ReceiveMessage",137 "sqs:DeleteMessage"138 ],139 ResourceArns=[config['InfosecQueueArn']]140 )141 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'142 def test_denied_infosec_sqs_actions(self, config):143 response = self.client.simulate_principal_policy(144 PolicySourceArn=config['source_arn'],145 ActionNames=[146 "sqs:GetQueueUrl",147 "sqs:ReceiveMessage",148 "sqs:DeleteMessage"149 ],150 ResourceArns=['arn:aws:sqs:us-west-2:012345678901:SomeOtherSQSQueue']151 )152 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'153 def test_denied_infosec_sqs_send_message(self, config):154 response = self.client.simulate_principal_policy(155 PolicySourceArn=config['source_arn'],156 ActionNames=["sqs:SendMessage"],157 ResourceArns=[config['InfosecQueueArn']]158 )159 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'160 def test_allowed_mig_sqs_actions(self, config):161 response = self.client.simulate_principal_policy(162 PolicySourceArn=config['source_arn'],163 ActionNames=[164 "sqs:GetQueueUrl",165 "sqs:ReceiveMessage",166 "sqs:DeleteMessage"167 ],168 ResourceArns=[config['MIGQueueArn']]169 )170 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'171 def test_denied_mig_sqs_send_message(self, config):172 response = self.client.simulate_principal_policy(173 PolicySourceArn=config['source_arn'],174 ActionNames=["sqs:SendMessage"],175 ResourceArns=[config['MIGQueueArn']]176 )177 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'178 def test_allowed_fxa_sqs_actions(self, config):179 response = self.client.simulate_principal_policy(180 PolicySourceArn=config['source_arn'],181 ActionNames=[182 "sqs:GetQueueUrl",183 "sqs:SendMessage"184 ],185 ResourceArns=[config['FxaQueueArn']]186 )187 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'188 def test_denied_fxa_sqs_actions(self, config):189 response = self.client.simulate_principal_policy(190 PolicySourceArn=config['source_arn'],191 ActionNames=[192 "sqs:GetQueueUrl",193 "sqs:SendMessage"194 ],195 ResourceArns=['arn:aws:sqs:us-west-2:012345678901:SomeOtherSQSQueue']196 )197 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'198 def test_denied_fxa_sqs_delete_message(self, config):199 response = self.client.simulate_principal_policy(200 PolicySourceArn=config['source_arn'],201 ActionNames=["sqs:DeleteMessage"],202 ResourceArns=[config['FxaQueueArn']]203 )204 assert response['EvaluationResults'][0]['EvalDecision'] == 'implicitDeny'205 def test_allowed_assume_fxa_role(self, config):206 response = self.client.simulate_principal_policy(207 PolicySourceArn=config['source_arn'],208 ActionNames=['sts:AssumeRole'],209 ResourceArns=['arn:aws:iam::361527076523:role/ExampleRole']210 )211 assert response['EvaluationResults'][0]['EvalDecision'] == 'allowed'212 def test_allowed_vpc_blackholing(self, config):213 response = self.client.simulate_principal_policy(214 PolicySourceArn=config['source_arn'],215 ActionNames=[216 "ec2:DescribeRouteTables",217 "ec2:DescribeNetworkInterfaces",218 "ec2:CreateRoute"219 ]220 )...

Full Screen

Full Screen

test_permissions.py

Source:test_permissions.py Github

copy

Full Screen

...49 )50 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])51 mock_session.assert_has_calls(52 [53 call.client().simulate_principal_policy(54 PolicySourceArn=ANY,55 ActionNames=ANY,56 ResourceArns=["*"],57 ContextEntries=[{}],58 ),59 ],60 )61def test_ensure_integration_uninstall_permissions():62 mock_session = MagicMock()63 mock_session.client.return_value.simulate_principal_policy.return_value = {64 "EvaluationResults": [65 {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},66 {"EvalActionName": "bar:baz", "EvalDecision": "denied"},67 ]68 }69 with raises(UsageError):70 ensure_integration_uninstall_permissions(71 integration_uninstall(session=mock_session)72 )73 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])74 mock_session.assert_has_calls(75 [76 call.client().simulate_principal_policy(77 PolicySourceArn=ANY,78 ActionNames=ANY,79 ResourceArns=["*"],80 ContextEntries=[{}],81 ),82 ],83 )84def test_ensure_layer_install_permissions():85 mock_session = MagicMock()86 mock_session.client.return_value.simulate_principal_policy.return_value = {87 "EvaluationResults": [88 {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},89 {"EvalActionName": "bar:baz", "EvalDecision": "denied"},90 ]91 }92 with raises(UsageError):93 ensure_layer_install_permissions(layer_install(session=mock_session))94 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])95 mock_session.assert_has_calls(96 [97 call.client().simulate_principal_policy(98 PolicySourceArn=ANY,99 ActionNames=ANY,100 ResourceArns=["*"],101 ContextEntries=[{}],102 ),103 ],104 )105def test_ensure_layer_uninstall_permissions():106 mock_session = MagicMock()107 mock_session.client.return_value.simulate_principal_policy.return_value = {108 "EvaluationResults": [109 {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},110 {"EvalActionName": "bar:baz", "EvalDecision": "denied"},111 ]112 }113 with raises(UsageError):114 ensure_layer_uninstall_permissions(layer_uninstall(session=mock_session))115 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])116 mock_session.assert_has_calls(117 [118 call.client().simulate_principal_policy(119 PolicySourceArn=ANY,120 ActionNames=ANY,121 ResourceArns=["*"],122 ContextEntries=[{}],123 ),124 ],125 )126def test_ensure_function_list_permissions():127 mock_session = MagicMock()128 mock_session.client.return_value.simulate_principal_policy.return_value = {129 "EvaluationResults": [130 {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},131 {"EvalActionName": "bar:baz", "EvalDecision": "denied"},132 ]133 }134 with raises(UsageError):135 ensure_function_list_permissions(mock_session)136 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])137 mock_session.assert_has_calls(138 [139 call.client().simulate_principal_policy(140 PolicySourceArn=ANY,141 ActionNames=ANY,142 ResourceArns=["*"],143 ContextEntries=[{}],144 ),145 ],146 )147def test_ensure_subscription_install_permissions():148 mock_session = MagicMock()149 mock_session.client.return_value.simulate_principal_policy.return_value = {150 "EvaluationResults": [151 {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},152 {"EvalActionName": "bar:baz", "EvalDecision": "denied"},153 ]154 }155 with raises(UsageError):156 ensure_subscription_install_permissions(157 subscription_install(session=mock_session)158 )159 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])160 mock_session.assert_has_calls(161 [162 call.client().simulate_principal_policy(163 PolicySourceArn=ANY,164 ActionNames=ANY,165 ResourceArns=["*"],166 ContextEntries=[{}],167 ),168 ],169 )170def test_ensure_subscription_uninstall_permissions():171 mock_session = MagicMock()172 mock_session.client.return_value.simulate_principal_policy.return_value = {173 "EvaluationResults": [174 {"EvalActionName": "foo:bar", "EvalDecision": "allowed"},175 {"EvalActionName": "bar:baz", "EvalDecision": "denied"},176 ]177 }178 with raises(UsageError):179 ensure_subscription_uninstall_permissions(180 subscription_uninstall(session=mock_session)181 )182 mock_session.assert_has_calls([call.client("iam"), call.client("sts")])183 mock_session.assert_has_calls(184 [185 call.client().simulate_principal_policy(186 PolicySourceArn=ANY,187 ActionNames=ANY,188 ResourceArns=["*"],189 ContextEntries=[{}],190 ),191 ],...

Full Screen

Full Screen

Lambda-iam-policy-validate-powerfulActions.py

Source:Lambda-iam-policy-validate-powerfulActions.py Github

copy

Full Screen

...20 # Create clients to call other services21 iam = boto3.client("iam")22 config = boto3.client("config")23 if resource_type == "AWS::IAM::User" or resource_type == "AWS::IAM::Role":24 compliance_status = simulate_principal_policy(iam, resource_arn)25 record_results(config, compliance_status, result_token, resource_type, resource_id, resource_arn, timestamp)26 elif resource_type == "AWS::IAM::Group":27 simulate_group(resource_arn, resource_id, resource_name, config, iam, result_token, timestamp)28 elif resource_type == "AWS::IAM::Policy":29 # Simulate the policy and record it's result30 compliance_status = simulate_managed_policy(iam, resource_arn)31 record_results(config, compliance_status, result_token, resource_type, resource_id, resource_arn, timestamp)32 # Get all of the attached principals33 attached_entities = iam.list_entities_for_policy(PolicyArn=resource_arn, MaxItems=1000)34 # Simulate all principals35 for role in attached_entities["PolicyRoles"]:36 role_arn = "arn:aws:iam:" + account_id + "::role/" + role["RoleName"]37 compliance_status = simulate_principal_policy(iam, role_arn)38 record_results(config, compliance_status, result_token, "AWS::IAM::Role", role["RoleId"], role_arn, timestamp)39 for user in attached_entities["PolicyUsers"]:40 user_arn = "arn:aws:iam:" + account_id + "::user/" + user["UserName"]41 compliance_status = simulate_principal_policy(iam, user_arn)42 record_results(config, compliance_status, result_token, "AWS::IAM::User", user["UserId"], user_arn, timestamp)43 for group in attached_entities["PolicyGroups"]:44 group_arn = "arn:aws:iam:" + account_id + "::group/" + group["GroupName"]45 simulate_group(group_arn, group["GroupId"], group["GroupName"], config, iam, result_token, timestamp)46def simulate_principal_policy(iam, resource_arn):47 # Call IAM to simulate the policy on restricted actions.48 response = iam.simulate_principal_policy(PolicySourceArn=resource_arn,ActionNames=POWERFUL_ACTIONS,ResourceArns=['*'])49 results = response['EvaluationResults']50 allows_powerful_action = False51 # Determine if any restricted actions are allowed.52 for actions in results:53 eval_decision = actions['EvalDecision']54 if(eval_decision == 'allowed'):55 action_name = actions['EvalActionName']56 print "Restricted action " + action_name + " was granted to resource " + resource_arn57 allows_powerful_action = True58 # If any restricted actions were allowed, consider the resource non-compliant.59 if(allows_powerful_action):60 return "NON_COMPLIANT"61 return "COMPLIANT"62def simulate_managed_policy(iam, policy_arn):63 # Retrieve the policy.64 get_policy_response = iam.get_policy(PolicyArn=policy_arn)65 default_version = get_policy_response["Policy"]["DefaultVersionId"]66 get_policy_version_response = iam.get_policy_version(PolicyArn=policy_arn, VersionId=default_version)67 policy_document = json.dumps(get_policy_version_response["PolicyVersion"]["Document"])68 # Simulate the policy69 simulation_response = iam.simulate_custom_policy(PolicyInputList=[policy_document], ActionNames=POWERFUL_ACTIONS, ResourceArns=["*"])70 results = simulation_response['EvaluationResults']71 allows_powerful_action = False72 # Determine if any restricted actions are allowed.73 for actions in results:74 evalDecision = actions['EvalDecision']75 if(evalDecision == 'allowed'):76 actionName = actions['EvalActionName']77 print "Restricted action " + actionName + " was granted to resource " + policy_arn78 allows_powerful_action = True79 # If any restricted actions were allowed, consider the resource non-compliant.80 if(allows_powerful_action):81 return "NON_COMPLIANT"82 return "COMPLIANT"83def record_results(config, compliance_result, result_token, resource_type, resource_id, resource_arn, timestamp):84 # Call Config to record the results of our evaluation.85 annotation = "Entity: " + resource_arn + " is " + compliance_result + " for validation of powerful actions access."86 config.put_evaluations(87 Evaluations=[88 {89 "ComplianceResourceType": resource_type,90 "ComplianceResourceId": resource_id,91 "ComplianceType": compliance_result,92 "Annotation": annotation,93 "OrderingTimestamp": timestamp94 },95 ],96 ResultToken=result_token97 )98def get_users_for_group(iam, group_name):99 response = iam.get_group(GroupName=group_name, MaxItems=1000)100 user_arns = []101 for user in response["Users"]:102 user_arns.append({'Arn': user["Arn"], 'UserId': user["UserId"]})103 return user_arns104def simulate_group(group_arn, group_id, group_name, config, iam, result_token, timestamp):105 # Simulate group, save status106 compliance_status = simulate_principal_policy(iam, group_arn)107 record_results(config, compliance_status, result_token, "AWS::IAM::Group", group_id, group_arn, timestamp)108 # Retrieve the ARNs of all users in the group109 users = get_users_for_group(iam, group_name)110 # For each user, simulate_principal111 for user in users:112 compliance_status = simulate_principal_policy(iam, user["Arn"])113 record_results(config, compliance_status, result_token, "AWS::IAM::User", user["UserId"], user["Arn"], timestamp)114def lambda_handler(event, context):115 invoking_event = json.loads(event["invokingEvent"])116 configuration_item = invoking_event["configurationItem"]117 result_token = "No token found."118 if "resultToken" in event:119 result_token = event["resultToken"]120 # Evaluate whether the resource is compliant...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful