How to use disable_key method in localstack

Best Python code snippet using localstack_python

access-key-rotation.py

Source:access-key-rotation.py Github

copy

Full Screen

1import boto32from botocore.exceptions import ClientError3import json4# Definición de clientes5iam_client = boto3.client('iam')6ses_client = boto3.client('ses')7sm_client = boto3.client('secretsmanager')8# Lista de usuarios a excluir en la rotacion automatica, separados por comas9exclude_users=[""]10# Lista de usuarios a incluir en la rotacion automatica, separados por comas11include_users=[""] 12# Metodo que procesa todos los eventos13def lambda_handler(event, context):14 # Guarda en la var message el mensaje sns como json15 message = json.loads(event['Records'][0]['Sns']['Message'])16 # Parseamos el mensaje hasta obtener el resourceId17 resourceId = message["detail"]18 resourceId = resourceId["newEvaluationResult"]19 resourceId = resourceId["evaluationResultIdentifier"]20 resourceId = resourceId["evaluationResultQualifier"]21 resourceId = resourceId["resourceId"]22 23 # Obtiene el username en funcion del resourceId y lo guarda en la var username24 username = getUser(resourceId)25 # Lista las keys del usuarname obtenido con la funcion de boto326 keys = iam_client.list_access_keys(UserName=username)27 # Evalua cuantas keys tiene el usuario28 ## Ai tiene 2 keys, guarda la más antigua para eliminarla y guarda la más reciente para deshabilitarla29 if len(keys['AccessKeyMetadata']) == 2:30 d1=keys['AccessKeyMetadata'][0]['CreateDate']31 d2=keys['AccessKeyMetadata'][1]['CreateDate']32 if d1 < d2:33 delete_key=keys['AccessKeyMetadata'][0]34 disable_key=keys['AccessKeyMetadata'][1]35 else:36 delete_key=keys['AccessKeyMetadata'][1]37 disable_key=keys['AccessKeyMetadata'][0] 38 ## Si tiene una sola key, la guarda para deshabilitarla39 else:40 disable_key=keys['AccessKeyMetadata'][0] 41 delete_key=""42 # Desahibilita la key43 disableKey(disable_key)44 # Borra la key45 deleteKey(delete_key)46 # Crea una nueva key para el usuario47 new_key = createKey(username)48 # Obtiene el mail del usuario49 mail = getUserMail(username)50 # Si es la primera vez que rota sus keys automaticamente crea un secreto para almacenar las nuevas keys51 try:52 createSecret(username, new_key)53 # Si no es la primera vez que rota sus keys automaticamente actualiza el secreto con sus nuevas keys54 except Exception as e: 55 updateSecret(username, new_key)56 # Envia un mail al usuario enseñandoles cuales keys fueron deshabilitadas, cuales eliminadas y que vea en Secret Manager su nueva key57 sendMail(mail, username, disable_key, delete_key)58 return resourceId59 60# En base a la función de boto3 list users listamos todos los usuarios de la cuenta61# Entre todos los usuarios busca el que tenga userId igual al resourceid (valor que se trae desde el config)62# Si preferis exlude users poner not in exclude_users en la condicion63def getUser(resourceId):64 response = iam_client.list_users()65 for user in response['Users']:66 # BUSCAR TAGS SIMIL A LA FUNCION DEL MAIL67 if user['UserId'] == resourceId and user['UserName'] in include_users: 68 return user['UserName']69 70# En funcion de la key obtiene el username y la access key y la inactiva71def disableKey(key):72 ak = key['AccessKeyId']73 un = key['UserName']74 75 iam_client.update_access_key(76 AccessKeyId=ak,77 Status='Inactive',78 UserName=un79 )80# En funcion de la key obtiene el username y la access key y la elimina81def deleteKey(key):82 if (key != ""):83 ak = key['AccessKeyId']84 un = key['UserName']85 86 iam_client.delete_access_key(87 AccessKeyId=ak,88 UserName=un89 )90 91# Usando boto3 se crea una nueva key para dicho usuario92def createKey(username):93 response = iam_client.create_access_key(94 UserName=username95 )96 return response97 98# Busca el mail del usuario en el tag mail99def getUserMail(username):100 101 tags = iam_client.list_user_tags(102 UserName=username,103 )104 105 for tag in tags['Tags']:106 if tag['Key'] == 'mail':107 mail = tag['Value']108 return mail109# Crea un secreto para almacenar la nueva key del usuario110def createSecret(username, new_key):111 AccessKeyId = new_key['AccessKey']['AccessKeyId']112 SecretAccessKey = new_key['AccessKey']['SecretAccessKey']113 secret_name = "/aws/iam/credentials/" + username114 data = {115 "AccessKey": AccessKeyId,116 "SecretAccessKey": SecretAccessKey117 }118 secret=json.dumps(data)119 sm_client.create_secret(120 Name=secret_name,121 Description='New Access Keys',122 SecretString=secret,123 Tags=[124 {125 'Key': 'Owner',126 'Value': username127 },128 ],129 )130# Hace update del secret en caso de que este ya existiera131def updateSecret(username, new_key):132 AccessKeyId = new_key['AccessKey']['AccessKeyId']133 SecretAccessKey = new_key['AccessKey']['SecretAccessKey']134 data = {135 "AccessKey": AccessKeyId,136 "SecretAccessKey": SecretAccessKey137 }138 secret=json.dumps(data)139 filter = [ 140 {141 'Key': 'tag-value',142 'Values': [username],143 } ]144 145 list_secret = sm_client.list_secrets(146 MaxResults=1,147 Filters=filter148 )149 secretID = list_secret['SecretList'][0]['ARN']150 sm_client.update_secret (151 SecretId=secretID,152 Description='Actualizada',153 SecretString=secret154 )155# Envia el correo156def sendMail(mail, username, disable_key, delete_key):157 url = "https://console.aws.amazon.com/secretsmanager/home?region=us-east-1#!/listSecrets"158 disable_key = disable_key['AccessKeyId']159 if (delete_key != ""):160 delete_key = delete_key['AccessKeyId']161 SENDER = "sol.malisani@dinocloudconsulting.com" #TODO: change sender162 CHARSET = "UTF-8"163 SUBJECT = "Sus Access Keys han sido rotadas de forma automática"164 RECIPIENT = mail165 166 167 BODY_HTML = """<html>168 <head></head>169 <body>170 <h3>Username {username}</h3>171 <p><ul>172 <li>Se ha desahibilitado la AccessKeyId: <b>{disable_key}</b></li>173 </ul>174 Visualice su nueva clave en AWS Secret Manager: <a href={url}>{url}</a> através del secreto: <b>/aws/iam/credentials/{username}</b>.175 <br/><br/>176 <i>Este email fue enviado de forma automática a través de Amazon SES</i>.177 </p>178 </body>179 </html>180 """.format(**locals())181 182 183 if (delete_key != ""):184 BODY_HTML = """<html>185 <head></head>186 <body>187 <h3>Username {username}</h3>188 <p><ul>189 <li>Se ha desahibilitado la AccessKeyId: <b>{disable_key}</b></li>190 <li>Se ha eliminado la AccessKeyId: <b>{delete_key}</b></li>191 </ul>192 Visualice su nueva clave en AWS Secret Manager: <a href={url}>{url}</a> através del secreto: <b>/aws/iam/credentials/{username}</b>.193 <br/><br/>194 <i>Este email fue enviado de forma automática a través de Amazon SES</i>.195 </p>196 </body>197 </html>198 """.format(**locals())199 200 ses_client.send_email(201 Destination={202 'ToAddresses': [203 RECIPIENT,204 ],205 },206 Message={207 'Body': {208 'Html': {209 'Charset': CHARSET,210 'Data': BODY_HTML,211 },212 213 },214 'Subject': {215 'Charset': CHARSET,216 'Data': SUBJECT,217 },218 },219 Source=SENDER,...

Full Screen

Full Screen

ssh.py

Source:ssh.py Github

copy

Full Screen

...66 puts(key)67 puts('-' * 40)68list_keys = ListKeys()69class DisableKey(SshManagementTask):70 def disable_key(self, authorized_file, key):71 key_regex = re.escape(key)72 key_regex = key_regex.replace('\/', '/')73 key_regex = '^%s$' % key_regex74 backup = '.%s.bak' % self.conf.current_time75 files.comment(authorized_file, key_regex, use_sudo=True, backup=backup)76 def do(self):77 if 'authorized_file' in self.conf:78 self.disable_key(self.conf.authorized_file, self.conf.key)79 else:80 authorized_files = list_authorized_files.get_authorized_files(81 exclude_users=self.conf.exclude_users)82 for user, authorized_file in authorized_files:83 self.disable_key(authorized_file, self.conf.key)84disable_key = DisableKey()85class EnableKey(SshManagementTask):86 def enable_key(self, authorized_file, key):87 backup = '.%s.bak' % self.conf.current_time88 regex = '%s' % re.escape(key)89 commented_key = '#' + regex90 if files.contains(91 authorized_file, commented_key, exact=True, use_sudo=True):92 files.uncomment(authorized_file, regex, use_sudo=True,93 backup=backup)94 else:95 files.append(authorized_file, key, use_sudo=True)96 def do(self):97 if 'authorized_file' in self.conf:...

Full Screen

Full Screen

lookup_cloudtrail_events.py

Source:lookup_cloudtrail_events.py Github

copy

Full Screen

1import datetime2import collections3import boto34cloudtrail = boto3.client('cloudtrail')5def lambda_handler(event, context):6 account_id = event['account_id']7 time_discovered = event['time_discovered']8 username = event['username']9 disable_key = event['disable_key']10 endtime = datetime.datetime.now() # Create start and end time for CloudTrail lookup11 interval = datetime.timedelta(hours=24)12 starttime = endtime - interval13 print('Retrieving events...')14 events = get_events(username, starttime, endtime)15 print('Summarizing events...')16 event_names, resource_names, resource_types = get_events_summaries(events)17 return {18 "account_id": account_id,19 "time_discovered": time_discovered,20 "username": username,21 "disable_key": disable_key,22 "event_names": event_names,23 "resource_names": resource_names,24 "resource_types": resource_types25 }26def get_events(username, starttime, endtime):27 """ Retrieves detailed list of CloudTrail events that occured between the specified time interval.28 Args:29 username (string): Username to lookup CloudTrail events for.30 starttime(datetime): Start of interval to lookup CloudTrail events between.31 endtime(datetime): End of interval to lookup CloudTrail events between.32 Returns:33 (dict)34 Dictionary containing list of CloudTrail events occuring between the start and end time with detailed information for each event.35 """36 try:37 response = cloudtrail.lookup_events(38 LookupAttributes=[39 {40 'AttributeKey': 'Username',41 'AttributeValue': username42 },43 ],44 StartTime=starttime,45 EndTime=endtime,46 MaxResults=5047 )48 except Exception as e:49 print(e)50 print('Unable to retrieve CloudTrail events for user "{}"'.format(username))51 raise(e)52 return response53def get_events_summaries(events):54 """ Summarizes CloudTrail events list by reducing into counters of occurences for each event, resource name, and resource type in list.55 Args:56 events (dict): Dictionary containing list of CloudTrail events to be summarized.57 Returns:58 (list, list, list)59 Lists containing name:count tuples of most common occurences of events, resource names, and resource types in events list.60 """61 event_name_counter = collections.Counter()62 resource_name_counter = collections.Counter()63 resource_type_counter = collections.Counter()64 for event in events['Events']:65 resources = event.get("Resources")66 event_name_counter.update([event.get('EventName')])67 if resources is not None:68 resource_name_counter.update([resource.get("ResourceName") for resource in resources])69 resource_type_counter.update([resource.get("ResourceType") for resource in resources])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful