Best Python code snippet using localstack_python
accessadvisor_automation.py
Source:accessadvisor_automation.py  
...207        # Marker is only accepted if result was truncated.208        while status != "COMPLETED":209            try:210                if marker is None:211                    response = client.get_service_last_accessed_details_with_entities(212                        JobId=jobid,213                        ServiceNamespace=service214                    )215                else:216                    response = client.get_service_last_accessed_details_with_entities(217                        JobId=jobid,218                        ServiceNamespace=service,219                        Marker=marker220                    )221                status = response['JobStatus']222                print('job status: ', status)223                time.sleep(2)224            except botocore.exceptions.ClientError as e:225                status = False226                if e.response['Error']['Code'] == True:227                    response = 'We got an error'228                else:229                    response = "Unexpected error: %s" % e230    return response...client.py
Source:client.py  
...151    def get_server_certificate(self, ServerCertificateName: str) -> Dict:152        pass153    def get_service_last_accessed_details(self, JobId: str, MaxItems: int = None, Marker: str = None) -> Dict:154        pass155    def get_service_last_accessed_details_with_entities(self, JobId: str, ServiceNamespace: str, MaxItems: int = None, Marker: str = None) -> Dict:156        pass157    def get_service_linked_role_deletion_status(self, DeletionTaskId: str) -> Dict:158        pass159    def get_ssh_public_key(self, UserName: str, SSHPublicKeyId: str, Encoding: str) -> Dict:160        pass161    def get_user(self, UserName: str = None) -> Dict:162        pass163    def get_user_policy(self, UserName: str, PolicyName: str) -> Dict:164        pass165    def get_waiter(self, waiter_name: str = None) -> Waiter:166        pass167    def list_access_keys(self, UserName: str = None, Marker: str = None, MaxItems: int = None) -> Dict:168        pass169    def list_account_aliases(self, Marker: str = None, MaxItems: int = None) -> Dict:...iar.py
Source:iar.py  
# -*- coding: utf-8 -*-
"""Identity access report to generate and analyze AWS IAM policies for overprescribed permissions.

This module should be used to generate last access service details for IAM policies and analyze
each service to determine if it is not required in the policy based on a maximum number of days
unused.

Todo:
    * Many policies makes for a long run serially. Integrate threading to speed up the querying.
        * Beware rate limits though!
"""
import logging
from datetime import datetime, timedelta, timezone
from os.path import abspath

import boto3
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel, create_waiter_with_client

from waiters import (LAST_ACCESS_DETAILS_WAITER_NAME,
                     last_access_details_waiter_config)

# Initialize logging
logger = logging.getLogger(__name__)


class PolicyScope:
    """Constants mapped to IAM policy scope.

    This class is used to define a series of AWS IAM policy scopes which are used
    to query for lists of policies in an AWS account. Each constant is defined as follows:
        - LOCAL: Targets only customer managed IAM policies
        - AWS: Targets AWS managed policies
        - ALL: Targets both AWS managed and customer managed policies
    """

    LOCAL = 'Local'
    AWS = 'AWS'
    ALL = 'All'


class IdentityAccessReport:
    """This class is used to generate and report on Service Last Access Details.

    In addition to reporting, it has the capability to determine whether a particular policy
    is overprescribed.

    Example:
        report = IdentityAccessReport()
        report.run()
        pprint.pprint(report.overprescribed_policies())
    """

    def __init__(self,
                 profile_name='default',
                 waiter_delay=1,
                 max_attempts=30,):
        """Initializes logging, the IAM client and waiter.

        Args:
            profile_name (str): defines the profile boto3 should use to authenticate,
                defaults to 'default'
            waiter_delay (int): defines the amount of seconds to wait before checking
                if the report ran
            max_attempts (int): defines the maximum number of times to check if the
                report finished
        """
        # Initialize private class variables
        self.__waiter_delay = waiter_delay
        self.__max_attempts = max_attempts
        self.__jobs = []         # (policy ARN, job ID) tuples
        self.__json = {}         # policy ARN -> list of ServicesLastAccessed entries
        self.__policy_arns = []

        # Initialize IAM client
        self.__initialize__iam_client(profile_name)

        # Setup waiter for last access job
        self.__waiter_model = WaiterModel(
            last_access_details_waiter_config(waiter_delay, max_attempts))
        self.__last_access_job_waiter = create_waiter_with_client(
            LAST_ACCESS_DETAILS_WAITER_NAME, self.__waiter_model,
            self.__iam_client)

    def __initialize__iam_client(self, profile_name):
        """Initializes the AWS session and IAM client.

        Args:
            profile_name (str): the AWS profile used to establish the AWS
                session, defaults to 'default'.
        """
        logger.info('Establishing IAM session with AWS...')
        self.__session = boto3.Session(profile_name=profile_name)
        self.__iam_client = self.__session.client('iam')

    def __query_policies(self, scope, attached):
        """Queries AWS IAM for a list of policies based on user-defined scope and role attachment.

        This method gathers a list of IAM policies associated with the account
        defined in the profile. It is configured to query policies based on
        user-defined scope and role attachment. The policy ARNs are stored in
        and are accessible from the class property ``policy_arns``.

        Args:
            scope (PolicyScope): defines a class constant mapping to AWS string
                literals defining scope (All, AWS, Local).
            attached (bool): defines whether you want to query for policies
                attached to existing roles or not.
        """
        # Marker is only added to the request once a truncated page hands one back.
        request = {'Scope': scope, 'OnlyAttached': attached}
        while True:
            logger.info(
                'Querying for all policies in scope [%s], attached [%s], marker [%s]',
                scope, attached, request.get('Marker'))
            response = self.__iam_client.list_policies(**request)
            logger.debug(response)

            # Tolerate a response without a 'Policies' key instead of iterating None.
            self.__extract_policies(response.get('Policies') or [])

            # Stop on the last page. Also stop if a page claims to be truncated
            # but carries no marker, which would otherwise loop forever.
            marker = response.get('Marker')
            if not response.get('IsTruncated') or not marker:
                break
            request['Marker'] = marker

    def __extract_policies(self, policies):
        """Loops through policies and extracts the AWS IAM policy ARNs.

        Args:
            policies (list): list of IAM policies each defined as a dictionary.
        """
        self.__policy_arns.extend(policy['Arn'] for policy in policies)
        logger.debug(self.__policy_arns)

    def __fetch_services(self, job_id):
        """Downloads every page of the service last accessed details for one job.

        Args:
            job_id (str): the job ID returned when the report was generated.

        Returns:
            list: the combined ``ServicesLastAccessed`` entries of all pages.
        """
        services = []
        request = {'JobId': job_id}
        while True:
            response = self.__iam_client.get_service_last_accessed_details(
                **request)
            page = response.get('ServicesLastAccessed') or []
            logger.debug(page)
            services.extend(page)

            marker = response.get('Marker')
            if not response.get('IsTruncated') or not marker:
                return services
            request['Marker'] = marker

    def __gather_reports(self):
        """Gets service last access details reports for each job ID.

        This function takes the list of job IDs generated when the reports are run
        and downloads/stores the reports in a JSON format. A job that does not
        complete in time is logged and skipped; the remaining jobs are still
        processed.
        """
        logger.info('Downloading last access details for each job:')
        for arn, job_id in self.__jobs:
            try:
                # Use custom waiter to periodically query the job ID.
                self.__last_access_job_waiter.wait(JobId=job_id)
            except WaiterError:
                # Lazy %-args: the previous string concatenation mixed str with
                # an int and a tuple and raised TypeError inside this handler.
                logger.error(
                    '\tAccess details report not completed after %s attempts '
                    '(%s second delay). JobId: %s',
                    self.__max_attempts, self.__waiter_delay, job_id,
                    exc_info=True)
                continue

            logger.info('\tJob completed for arn [%s]', arn)
            self.__json[arn] = self.__fetch_services(job_id)

    def __is_overprescribed(self, days, arn, service):
        """Determines if a service is overprescribed by calculating a time delta.

        Args:
            days (int): defines the maximum number of days a policy service should
                exist unused before being flagged as overprescribed.
            arn (str): the IAM policy ARN.
            service (dict): the policy service definition; must contain a
                ``LastAuthenticated`` datetime.

        Returns:
            bool: True if overprescribed, False otherwise.
        """
        last_authenticated = service.get('LastAuthenticated')
        # Aware and naive datetimes cannot be subtracted; treat naive as UTC.
        if last_authenticated.tzinfo is None:
            last_authenticated = last_authenticated.replace(tzinfo=timezone.utc)

        unused_days = (datetime.now(timezone.utc) - last_authenticated).days
        if unused_days < days:
            return False

        logger.debug(
            '%s - %s is overprescribed by %s days...',
            arn, service.get('ServiceName'), unused_days - days)
        return True

    @property
    def policy_arns(self):  # Readonly
        """list(str): contains AWS IAM policy ARNs"""
        return self.__policy_arns

    @property
    def json(self):  # Readonly
        """dict: contains output of IAM last service access report."""
        return self.__json

    def run(self, scope=PolicyScope.LOCAL, attached=False):
        """Kicks off the creation an AWS IAM report for each IAM policy in an AWS account.

        This method queries IAM policies based on user input, generates a report for each
        policy, gathers the reports and stores the service last accessed data for analysis.

        Args:
            scope (PolicyScope): defines a class constant mapping to AWS string literals
                defining scope (All, AWS, Local). Defaults to ``PolicyScope.LOCAL``.
            attached (bool): defines whether you want to query for policies attached to
                existing roles or not. Defaults to ``False``.
        """
        self.__query_policies(scope, attached)
        logger.info(
            'Generating last access details for all queried policies:')

        # Loop through policies and kick off service last accessed details report.
        for arn in self.__policy_arns:
            response = self.__iam_client.generate_service_last_accessed_details(
                Arn=arn)
            job_id = response.get('JobId')
            self.__jobs.append((arn, job_id))
            logger.info('\t%s --> Job: %s', arn, job_id)

        self.__gather_reports()

    def overprescribed_policies(self, days=30):
        """Determines if queried policies have service permissions that have not been used for X days.

        Each service of every gathered policy is evaluated by comparing the last
        time the service permissions were used against the user-specified maximum
        number of days acceptable. A service that was not used at all within the
        AWS IAM reporting window (365 days) is flagged as well.

        Args:
            days (int): defines the maximum number of days a policy service should
                exist unused before being flagged as overprescribed. Defaults to 30.

        Returns:
            dict: contains each policy ARN as the key and the list of services that
                are considered overprescribed (per the original docstring; the
                return statement lies beyond the visible excerpt).

        Raises:
            ValueError: If `days` > 365
        """
        # Reporting window for AWS IAM is a hard 365 days. Make sure we can't
        # exceed that.
        if days > 365:
            logger.error('days set to %s, cannot be greater than 365', days)
            raise ValueError('days cannot be greater than 365')

        policies = {}
        # Loop through each policy
        for arn, services in self.__json.items():
            overprescribed_services = []
            # Loop through each policy's services to determine if they
            # are overprescribed.
            for service in services:
                if service.get('LastAuthenticated') is not None:
                    # Calculate the delta between the last authenticated field
                    # and the current run time.
                    if self.__is_overprescribed(days, arn, service):
                        overprescribed_services.append(service)
                else:
                    # A service that appears in the report but does not contain the
                    # field 'LastAuthenticated' is defined as an overprescribed service
                    # by the boto3 documentation. See below:
                    #    LastAuthenticated: This field is null if no IAM entities attempted
                    #                       to access the service within the reporting period.
                    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.get_service_last_accessed_details_with_entities
                    logger.debug(
                        '%s - %s missing LastAuthenticated field, not used within '
                        'reporting period, flagging overprescribed',
                        arn, service.get('ServiceName'))
                    overprescribed_services.append(service)
            policies[arn] = overprescribed_services
        # ... (the listing is truncated here; the remainder of this method is
        # not visible in the excerpt and has deliberately not been reconstructed)


# --- Non-code page text that trailed the listing on the same line (kept verbatim) ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
