How to use get_service_last_accessed_details_with_entities method in localstack

Best Python code snippet using localstack_python

accessadvisor_automation.py


# ... (excerpt: lines 207-230 of accessadvisor_automation.py; the enclosing
#      function and the initial values of status/marker are not shown)

# Marker is only accepted if result was truncated.
# NOTE(review): nothing in the visible lines assigns `marker` from the
# response, so as shown every pass sends the same request.
#
# Polls the IAM "service last accessed details with entities" job until
# JobStatus reads "COMPLETED".
# NOTE(review): IAM can also report JobStatus "FAILED" for these jobs; that
# value never satisfies the exit condition below, so a failed job would be
# polled indefinitely. A terminal-state check and an attempt limit are the
# usual guards.
while status != "COMPLETED":
    try:
        if marker is None:
            response = client.get_service_last_accessed_details_with_entities(
                JobId=jobid,
                ServiceNamespace=service
            )
        else:
            response = client.get_service_last_accessed_details_with_entities(
                JobId=jobid,
                ServiceNamespace=service,
                Marker=marker
            )
        status = response['JobStatus']
        print('job status: ', status)
        # The 2-second pause only runs on the success path; an exception
        # raised by the call above skips it.
        time.sleep(2)
    except botocore.exceptions.ClientError as e:
        # False is still != "COMPLETED", so this assignment does not end
        # the loop by itself.
        status = False
        # botocore error codes are strings (for example 'AccessDenied'), so
        # this comparison with True never matches and the else branch is the
        # one that runs. From here on `response` is a str, not the API dict.
        if e.response['Error']['Code'] == True:
            response = 'We got an error'
        else:
            response = "Unexpected error: %s" % e
# NOTE(review): indentation was lost in this listing. Placing the return after
# the loop is an assumption -- confirm against the original file. If it is
# correct, a persistent ClientError (e.g. missing iam:GetServiceLastAccessed*
# permission) makes the loop retry without any delay.
return response
# ... (excerpt ends here)


client.py


# ... (excerpt: lines 151-169 of client.py. These are methods of a class whose
#      header is not part of this listing. Every body shown is empty, so the
#      listing documents call signatures only.)


def get_server_certificate(self, ServerCertificateName: str) -> Dict:
    """Signature-only stub; no implementation in this listing."""


def get_service_last_accessed_details(self, JobId: str, MaxItems: int = None, Marker: str = None) -> Dict:
    """Signature-only stub.

    ``JobId`` is required; ``MaxItems`` and ``Marker`` are optional and
    default to None.
    """


def get_service_last_accessed_details_with_entities(self, JobId: str, ServiceNamespace: str, MaxItems: int = None, Marker: str = None) -> Dict:
    """Signature-only stub.

    ``JobId`` and ``ServiceNamespace`` are required; ``MaxItems`` and
    ``Marker`` are optional and default to None.
    """


def get_service_linked_role_deletion_status(self, DeletionTaskId: str) -> Dict:
    """Signature-only stub; no implementation in this listing."""


def get_ssh_public_key(self, UserName: str, SSHPublicKeyId: str, Encoding: str) -> Dict:
    """Signature-only stub; all three parameters are required."""


def get_user(self, UserName: str = None) -> Dict:
    """Signature-only stub; ``UserName`` is optional."""


def get_user_policy(self, UserName: str, PolicyName: str) -> Dict:
    """Signature-only stub; no implementation in this listing."""


def get_waiter(self, waiter_name: str = None) -> Waiter:
    """Signature-only stub; annotated as returning a ``Waiter``."""


def list_access_keys(self, UserName: str = None, Marker: str = None, MaxItems: int = None) -> Dict:
    """Signature-only stub; every parameter is optional."""


def list_account_aliases(self, Marker: str = None, MaxItems: int = None) -> Dict: ...  # listing is truncated here; body not shown


iar.py


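# -*- coding: utf-8 -*-
"""Identity access report to generate and analyze AWS IAM policies for overprescribed permissions.

This module should be used to generate last access service details for IAM policies and analyze
each service to determine if it is not required in the policy based on a maximum number of days
unused.

Todo:
    * Many policies makes for a long run serially. Integrate threading to speed up the querying.
    * Beware rate limits though!
"""
import logging
from datetime import datetime, timedelta, timezone
from os.path import abspath

import boto3
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel, create_waiter_with_client

from waiters import (LAST_ACCESS_DETAILS_WAITER_NAME,
                     last_access_details_waiter_config)

# Initialize logging
logger = logging.getLogger(__name__)


class PolicyScope:
    """Constants mapped to IAM policy scope.

    These values are passed as ``Scope`` to ``list_policies`` when querying
    the policies of an AWS account:
        - LOCAL: customer managed policies created in the account
        - AWS: AWS managed policies
        - ALL: both of the above

    Note:
        ``list_policies`` only enumerates managed policies. Inline policies
        embedded in users, groups or roles are not returned, so they are
        outside the coverage of this report.
    """
    LOCAL = 'Local'
    AWS = 'AWS'
    ALL = 'All'


class IdentityAccessReport:
    """This class is used to generate and report on Service Last Access Details.

    In addition to reporting, it has the capability to determine whether a particular policy
    is overprescribed.

    Example:
        report = IdentityAccessReport()
        report.run()
        pprint.pprint(report.overprescribed_policies())
    """

    def __init__(self,
                 profile_name='default',
                 waiter_delay=1,
                 max_attempts=30,):
        """Initializes logging, the IAM client and waiter.

        Args:
            profile_name (str): defines the profile boto3 should use to authenticate,
                defaults to 'default'
            waiter_delay (int): defines the amount of seconds to wait before checking
                if the report ran
            max_attempts (int): defines the maximum number of times to check if the
                report finished
        """
        # Initialize private class variables
        self.__waiter_delay = waiter_delay
        self.__max_attempts = max_attempts
        self.__jobs = []
        self.__json = {}
        self.__policy_arns = []

        # Initialize IAM client
        self.__initialize__iam_client(profile_name)

        # Setup waiter for last access job
        self.__waiter_model = WaiterModel(
            last_access_details_waiter_config(waiter_delay, max_attempts))
        self.__last_access_job_waiter = create_waiter_with_client(
            LAST_ACCESS_DETAILS_WAITER_NAME, self.__waiter_model,
            self.__iam_client)

    def __initialize__iam_client(self, profile_name):
        """Initializes the AWS session and IAM client.

        Args:
            profile_name (str): the AWS profile used to establish the AWS
                session, defaults to 'default'.
        """
        logger.info('Establishing IAM session with AWS...')
        self.__session = boto3.Session(profile_name=profile_name)
        self.__iam_client = self.__session.client('iam')

    def __query_policies(self, scope, attached):
        """Queries AWS IAM for a list of policies based on user-defined scope and attachment.

        Every page of ``list_policies`` is fetched and the policy ARNs are
        appended to the list exposed by the ``policy_arns`` property.

        Args:
            scope (PolicyScope): defines a class constant mapping to AWS string
                literals defining scope (All, AWS, Local).
            attached (bool): passed as ``OnlyAttached``; when True only
                policies that are currently attached are returned.
        """
        marker = None  # Set None for first run
        while True:
            logger.info(
                'Querying for all policies in scope [%s], attached [%s], marker [%s]',
                scope, attached, marker)

            # Marker is only sent when a previous page handed one back.
            request = {'Scope': scope, 'OnlyAttached': attached}
            if marker:
                request['Marker'] = marker
            response = self.__iam_client.list_policies(**request)
            logger.debug(response)

            self.__extract_policies(response.get('Policies') or [])

            # Stop on the last page. A truncated page without a Marker also
            # stops, otherwise the same request would be repeated forever.
            marker = response.get('Marker') if response.get('IsTruncated') else None
            if not marker:
                break

    def __extract_policies(self, policies):
        """Loops through policies and extracts the AWS IAM policy ARNs.

        Args:
            policies (list): list of IAM policies each defined as a dictionary.
        """
        for policy in policies:
            self.__policy_arns.append(policy['Arn'])
        logger.debug(self.__policy_arns)

    def __fetch_services(self, job_id):
        """Returns the ``ServicesLastAccessed`` entries of one job, across all pages.

        Reading only the first page would drop services from the analysis
        whenever the response is truncated, so the Marker is followed until
        the last page.

        Args:
            job_id (str): the job ID returned when the report was generated.

        Returns:
            list: the service entries of the report (empty if none were returned).
        """
        services = []
        marker = None
        while True:
            request = {'JobId': job_id}
            if marker:
                request['Marker'] = marker
            response = self.__iam_client.get_service_last_accessed_details(
                **request)
            logger.debug(response.get('ServicesLastAccessed'))
            services.extend(response.get('ServicesLastAccessed') or [])

            marker = response.get('Marker') if response.get('IsTruncated') else None
            if not marker:
                return services

    def __gather_reports(self):
        """Gets service last access details reports for each job ID.

        This function takes the list of job IDs generated when the reports are run
        and downloads/stores the reports in a JSON format.

        A job the waiter gives up on is logged and skipped; ``WaiterError`` is
        not propagated. The skipped policy gets no entry in ``json`` and is
        therefore missing from ``overprescribed_policies()`` as well. A missing
        policy means "not analyzed", not "nothing to remove", so compare the
        result against ``policy_arns`` before drawing conclusions.
        """
        logger.info('Downloading last access details for each job:')
        for arn, job_id in self.__jobs:
            try:
                # Use custom waiter to periodically query the job ID.
                self.__last_access_job_waiter.wait(JobId=job_id)
            except WaiterError:
                # The original message concatenated str with an int and a
                # tuple, which raised TypeError inside this handler and
                # aborted the remaining jobs. The waiter polls up to
                # max_attempts times, waiter_delay seconds apart.
                logger.error(
                    '\tAccess details report not completed after %s seconds. JobId: %s',
                    self.__waiter_delay * self.__max_attempts, job_id,
                    exc_info=True)
                continue

            logger.info('\tJob completed for arn [%s]', arn)
            self.__json[arn] = self.__fetch_services(job_id)

    def __is_overprescribed(self, days, arn, service):
        """Determines if a service is overprescribed by calculating a time delta.

        Args:
            days (int): defines the maximum number of days a policy service should
                exist unused before being flagged as overprescribed.
            arn (str): the IAM policy ARN.
            service (dict): the policy service definition; its
                ``LastAuthenticated`` value must be a timezone-aware datetime.

        Returns:
            bool: True if overprescribed, False otherwise.
        """
        last_authenticated = service.get('LastAuthenticated')
        time_delta = datetime.now(timezone.utc) - last_authenticated
        if time_delta.days < days:
            return False
        logger.debug(
            '%s - %s is overprescribed by %s days...',
            arn, service.get('ServiceName'), time_delta.days - days)
        return True

    @property
    def policy_arns(self):  # Readonly
        """list(str): contains AWS IAM policy ARNs"""
        return self.__policy_arns

    @property
    def json(self):  # Readonly
        """dict: contains output of IAM last service access report."""
        return self.__json

    def run(self, scope=PolicyScope.LOCAL, attached=False):
        """Kicks off the creation an AWS IAM report for each IAM policy in an AWS account.

        This method queries IAM policies based on user input, generates a report for each
        policy, gathers the reports and stores the service last accessed data for analysis.

        Args:
            scope (PolicyScope): defines a class constant mapping to AWS string literals
                defining scope (All, AWS, Local). Defaults to ``PolicyScope.LOCAL``.
            attached (bool): when True only attached policies are queried.
                Defaults to ``False``.
        """
        self.__query_policies(scope, attached)
        logger.info(
            'Generating last access details for all queried policies:')

        # Loop through policies and kick off service last accessed details report.
        for arn in self.__policy_arns:
            response = self.__iam_client.generate_service_last_accessed_details(
                Arn=arn)
            self.__jobs.append((arn, response.get('JobId')))
            logger.info('\t%s --> Job: %s', arn, response.get('JobId'))
        self.__gather_reports()

    def overprescribed_policies(self, days=30):
        """Determines if queried policies have service permissions that have not been used for X days.

        Each service listed in a policy's report is evaluated by comparing the
        last time its permissions were used with the user-specified maximum
        number of days. A service that was not used at all in the AWS IAM
        reporting window (365 days) is flagged as well.

        Only policies whose report was downloaded by ``run()`` are evaluated;
        a policy whose job did not complete does not appear in the result.

        Args:
            days (int): defines the maximum number of days a policy service should
                exist unused before being flagged as overprescribed. Defaults to 30.

        Returns:
            dict: contains each policy ARN as the key and the list of services that are
                considered overprescribed.

        Raises:
            ValueError: If `days` > 365
        """
        # Reporting window for AWS IAM is a hard 365 days. Make sure we can't
        # exceed that.
        if days > 365:
            logger.error('days set to %s, cannot be greater than 365', days)
            raise ValueError('days cannot be greater than 365')

        policies = {}
        # Loop through each policy
        for arn, services in self.__json.items():
            overprescribed_services = []
            # Loop through each policy's services to determine if they
            # are overprescribed.
            for service in services:
                last_authenticated = service.get('LastAuthenticated')
                if last_authenticated is not None:
                    # Calculate the delta between the last authenticated field
                    # and the current run time.
                    if self.__is_overprescribed(days, arn, service):
                        overprescribed_services.append(service)
                else:
                    # A service that appears in the report but does not contain the
                    # field 'LastAuthenticated' is defined as an overprescribed service
                    # by the boto3 documentation. See below:
                    # LastAuthenticated: This field is null if no IAM entities attempted
                    # to access the service within the reporting period.
                    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.get_service_last_accessed_details_with_entities
                    logger.debug(
                        "%s - %s missing LastAuthenticated field, not used within reporting period, flagging overprescribed",
                        arn, service.get('ServiceName'))
                    overprescribed_services.append(service)
            policies[arn] = overprescribed_services
        # ... (the listing on this page is truncated here; the remainder of
        #      the method is not shown)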
