How to use the create_boto_client method in localstack

Best Python code snippet using localstack_python

receive_sqs_message.py

Source:receive_sqs_message.py Github

copy

Full Screen

...59 f'Error: on Account: {account_to_assume} with Role: {cross_account_role_arn}')60 print(f'{cross_account_role_name} might not exists in account?')61 raise e62# Create Boto Client63def create_boto_client(account_number, region, service, cross_account_role):64 # Use boto3 on source account65 if account_number == source_account:66 client = boto3.client(service, region)67 print(f'skipping STS for local account: {account_number}')68 else:69 # Log into Accounts with STS70 assume_creds = assume_sts_role(account_number, cross_account_role)71 client = assume_creds.client(service, region)72 print(f'Logged into Account: {account_number}')73 return client74# Get Lambda Functions75def get_all_lambda(account_number, region, cross_account_role):76 # Init77 var_list = []78 # Change boto client79 client_lambda = create_boto_client(80 account_number, region, 'lambda', cross_account_role)81 # Page all ec282 paginator = client_lambda.get_paginator('list_functions')83 for page in paginator.paginate():84 for i in page['Functions']:85 # clean role name out of arn86 iam_role = str(i['Role']).split(':')[5].split('/')[1]87 lambda_arn = i['FunctionArn']88 lambda_name = i['FunctionName']89 # Try Get Tags90 try:91 lambda_tag = client_lambda.list_tags(Resource=lambda_arn)['Tags']92 except ClientError as e:93 lambda_tag = 'No Tags Exist'94 var_list.append(95 {96 'EntryType': 'lambda',97 'Region': str(region),98 'FunctionName': str(lambda_name),99 'Id': str(lambda_arn),100 'Runtime': str(i['Runtime']),101 'AccountNumber': str(account_number),102 'Timeout': str(i['Timeout']),103 'RoleName': str(iam_role),104 'Handler': str(i['Handler']),105 'CodeSize': int(i['CodeSize']),106 'Link': str(f'https://{region}.console.aws.amazon.com/lambda/home?region={region}#/functions/{lambda_name}'),107 'Version': str(i['Version']),108 'MemorySize': int(i['MemorySize']),109 'LastModified': str(i['LastModified']),110 'Tags': str(lambda_tag)111 })112 return var_list113# Get RDS Function114def 
get_all_rds(account_number, region, cross_account_role):115 # Init116 var_list = []117 # Change boto client118 client_rds = create_boto_client(119 account_number, region, 'rds', cross_account_role)120 # Page all db instances121 paginator = client_rds.get_paginator('describe_db_instances')122 for page in paginator.paginate():123 for i in page['DBInstances']:124 # Get tags125 instance = i['DBInstanceArn']126 #Try Get Tags127 try:128 rds_tag = client_rds.list_tags_for_resource(ResourceName=instance)['TagList']129 except ClientError as e:130 rds_tag = 'No Tags Exist'131 var_list.append(132 {133 'EntryType': 'rds',134 'Region': str(region),135 'AccountNumber': str(account_number),136 'State': str(i['DBInstanceStatus']),137 'Id': str(i['DBInstanceIdentifier']),138 'DBInstanceClass': str(i['DBInstanceClass']),139 'AllocatedStorage': int(i.get('AllocatedStorage', ' ')),140 'PreferredBackupWindow': str(i.get('PreferredBackupWindow', ' ')),141 'BackupRetentionPeriod': str(i.get('BackupRetentionPeriod', ' ')),142 'PreferredMaintenanceWindow': str(i.get('PreferredMaintenanceWindow', ' ')),143 'Link': str(f'https://{region}.console.aws.amazon.com/rds/home?region={region}#databases:'),144 'StorageType': str(i.get('StorageType', ' ')),145 'Engine': str(i['Engine']),146 'MultiAZ': str(i.get('MultiAZ', ' ')),147 'PubliclyAccessible': str(i.get('PubliclyAccessible', ' ')),148 'Tags': str(rds_tag)149 })150 return var_list151# Get EKS Function152def get_all_eks(account_number, region, cross_account_role):153 # Init154 var_list = []155 # Change boto client156 client_eks = create_boto_client(157 account_number, region, 'eks', cross_account_role)158 # Get all eks clusters159 paginator = client_eks.get_paginator('list_clusters')160 for page in paginator.paginate():161 for i in page['clusters']:162 cluster_name = i163 eks_detail = client_eks.describe_cluster(name=cluster_name)['cluster']164 var_list.append({165 'AccountNumber': str(account_number),166 'EntryType': str('eks'),167 'Region': 
str(region),168 'Name': str(eks_detail['name']),169 'Id': str(eks_detail['arn']),170 'Status': str(eks_detail['status']),171 'RoleArn': str(eks_detail.get('roleArn', ' ')),172 'Created': str(eks_detail['createdAt']),173 'VpcId': str(eks_detail['resourcesVpcConfig'].get('vpcId', ' ')),174 'PlatformVersion': str(eks_detail['platformVersion']),175 'Link': (f'https://{region}.console.aws.amazon.com/eks/home?region={region}#/clusters/{cluster_name}'),176 'K8 Version': str(eks_detail['version']),177 'Endpoint': str(eks_detail['endpoint']),178 'Tags': str(eks_detail.get('tags', 'No Tags Exist'))179 })180 return var_list181# Get EC2 Function182def get_all_ec2(account_number, region, cross_account_role):183 # Init184 var_list = []185 # Use boto3 on source account186 client_ec2 = create_boto_client(187 account_number, region, 'ec2', cross_account_role)188 # Page all ec2189 paginator = client_ec2.get_paginator('describe_instances')190 for page in paginator.paginate():191 for i in page['Reservations']:192 # Check for IAM Role193 checkIAMrole = i['Instances'][0].get('IamInstanceProfile', ' ')194 # Convert from string to dict if not empty195 if checkIAMrole != ' ':196 python_dict = literal_eval(f'{checkIAMrole}')197 full_role_name = python_dict['Arn']198 # clean role name out of arn199 iam_role = full_role_name.split(':')[5].split('/')[1]200 else:201 iam_role = ' '202 # Get vCPU count203 vcpu_core = i['Instances'][0]['CpuOptions']['CoreCount']204 vcpu_thread = i['Instances'][0]['CpuOptions']['ThreadsPerCore']205 # Cores x thread = vCPU count206 vCPU = int(vcpu_core) * int(vcpu_thread)207 # Turn Tags into Strings for Table format on front end208 ec2_tags = i['Instances'][0].get('Tags', 'No Tags Exist')209 var_list.append(210 {211 'EntryType': 'ec2',212 'Id': str(i['Instances'][0]['InstanceId']),213 'State': str(i['Instances'][0]['State']['Name']),214 'AccountNumber': str(account_number),215 'Region': str(region),216 'vCPU': int(vCPU),217 'KeyName': 
str(i['Instances'][0].get('KeyName', ' ')),218 'RoleName': str(iam_role),219 'Link': str(f'https://{region}.console.aws.amazon.com/ec2/v2/home?region={region}#Instances:sort=instanceId'),220 'PrivateIpAddress': str(i['Instances'][0].get('PrivateIpAddress', ' ')),221 'PublicIpAddress': str(i['Instances'][0].get('PublicIpAddress', ' ')),222 'InstancePlatform': str(i['Instances'][0].get('Platform', 'Linux/UNIX')),223 'InstanceType': str(i['Instances'][0]['InstanceType']),224 'Tags': str(ec2_tags)225 })226 return var_list227# Get ALB/NLB/ELB Function228def get_all_load_balancers(account_number, region, cross_account_role):229 # Init230 var_list = []231 # Use boto3 on source account232 client_elb = create_boto_client(233 account_number, region, 'elb', cross_account_role)234 # ALB/NLB235 client_elbv2 = create_boto_client(236 account_number, region, 'elbv2', cross_account_role)237 # Page all elb's238 paginator = client_elb.get_paginator('describe_load_balancers')239 for page in paginator.paginate():240 for i in page['LoadBalancerDescriptions']:241 var_list.append(242 {243 'EntryType': 'lb',244 'LoadBalancerName': str(i['LoadBalancerName']),245 'Id': str(i['DNSName']),246 'Scheme': str(i['Scheme']),247 'VPC': str(i['VPCId']),248 'State': 'n/a',249 'AccountNumber': str(account_number),250 'Region': str(region),251 'AvailabilityZones': str(i['AvailabilityZones']),252 'SecurityGroups': str(i['SecurityGroups']),253 'Type': 'classic'254 })255 # Page all ALB/NLB256 paginator2 = client_elbv2.get_paginator('describe_load_balancers')257 for page in paginator2.paginate():258 for i in page['LoadBalancers']:259 var_list.append(260 {261 'EntryType': 'lb',262 'LoadBalancerName': str(i['LoadBalancerName']),263 'Id': str(i['DNSName']),264 'Scheme': str(i['Scheme']),265 'State': str(i['State']['Code']),266 'VPC': str(i['VpcId']),267 'AccountNumber': str(account_number),268 'Region': str(region),269 'AvailabilityZones': str(i['AvailabilityZones']),270 'SecurityGroups': 
str(i.get('SecurityGroups', ' ')),271 'Type': str(i['Type'])272 })273 return var_list274# Get EBS Volumes275def get_all_ebs(account_number, region, cross_account_role):276 277 # Init278 var_list = []279 # Use boto3 on source account280 client_ebs = create_boto_client(281 account_number, region, 'ec2', cross_account_role)282 # Page all elb's283 paginator = client_ebs.get_paginator('describe_volumes')284 for page in paginator.paginate():285 for i in page['Volumes']:286 var_list.append(287 {288 'EntryType': 'ebs',289 'Id': str(i['VolumeId']),290 'State': str(i['State']),291 'Size': str(i['Size']),292 'VolumeType': str(i['VolumeType']),293 'AccountNumber': str(account_number),294 'Region': str(region),295 'Tags': str(i.get('Tags', 'No Tag')),296 'Encrypted': str(i['Encrypted']),297 'SnapshotId': str(i['SnapshotId']),298 'AvailabilityZone': str(i['AvailabilityZone']),299 'CreateTime': str(i['CreateTime'])300 })301 return var_list302# Get IAM Roles Function303def get_all_iam_roles(account_number, cross_account_role):304 # Init305 var_list = []306 # Use boto3 on source account307 client_iam = create_boto_client(308 account_number, 'us-east-1', 'iam', cross_account_role)309 # Page roles310 paginator = client_iam.get_paginator('list_roles')311 for page in paginator.paginate():312 for i in page['Roles']:313 role_name = i['RoleName']314 # Get Tags for Role315 try:316 print(f'Getting Tags for: {role_name}...')317 tags = client_iam.list_role_tags(RoleName=role_name)['Tags']318 except ClientError as e:319 tags = 'No Tags Exist'320 var_list.append(321 {322 'Id': str(i['Arn']),323 'EntryType': 'iam-roles',324 'Region': 'us-east-1',325 'AccountNumber': str(account_number),326 'Link': str(f"https://console.aws.amazon.com/iam/home?region=us-east-1#/roles/{role_name}"),327 'Tags': str(tags),328 'RoleName': str(role_name),329 'CreateDate': str(i['CreateDate'])330 })331 return var_list332# Get IAM Users Function333def get_all_iam_users(account_number, cross_account_role):334 # Init335 
var_list = []336 # Use boto3 on source account337 client_iam = create_boto_client(338 account_number, 'us-east-1', 'iam', cross_account_role)339 # Page users340 paginator = client_iam.get_paginator('list_users')341 for page in paginator.paginate():342 for i in page['Users']:343 username = i['UserName']344 # Get Tags for User345 try:346 print(f'Getting Tags for: {username}...')347 tags = client_iam.list_user_tags(UserName=username)['Tags']348 except ClientError as e:349 tags = 'No Tags Exist'350 var_list.append(351 {352 'Id': str(i['Arn']),353 'EntryType': 'iam-users',354 'AccountNumber': str(account_number),355 'Region': 'us-east-1',356 'Link': str(f"https://console.aws.amazon.com/iam/home?region=us-east-1#/users/{username}"),357 'UserName': str(username),358 'Tags': str(tags),359 'PasswordLastUsed': str(i.get('PasswordLastUsed', ' ')),360 'CreateDate': str(i['CreateDate'])361 })362 return var_list363# Get IAM Users Function364def get_all_iam_attached_policys(account_number, cross_account_role):365 # Init366 var_list = []367 # Use boto3 on source account368 client_iam = create_boto_client(369 account_number, 'us-east-1', 'iam', cross_account_role)370 # Page policys371 paginator = client_iam.get_paginator('list_policies')372 for page in paginator.paginate(OnlyAttached=True):373 for i in page['Policies']:374 var_list.append(375 {376 'Id': str(i['Arn']),377 'EntryType': 'iam-attached-policys',378 'AccountNumber': str(account_number),379 'Region': 'us-east-1',380 'PolicyName': str(i['PolicyName']),381 'AttachmentCount': int(i['AttachmentCount'])382 })383 return var_list384# Get OnDemand Capacity Reservations Function385def get_all_odcr(account_number, region, cross_account_role):386 # Init387 var_list = []388 # Use boto3 on source account389 client_ec2 = create_boto_client(390 account_number, region, 'ec2', cross_account_role)391 # Page all reservations392 paginator = client_ec2.get_paginator('describe_capacity_reservations')393 for page in paginator.paginate():394 for 
i in page['CapacityReservations']:395 if i['State'] == 'active':396 var_list.append(397 {398 'EntryType': 'odcr',399 'AccountNumber': str(account_number),400 'Region': str(region),401 'AvailabilityZone': str(i['AvailabilityZone']),402 'AvailableInstanceCount': int(i['AvailableInstanceCount']),403 'Id': str(i['CapacityReservationId']),404 'Qty Available': str(f"{i['AvailableInstanceCount']} of {i['TotalInstanceCount']}"),405 'CreateDate': str(i['CreateDate']),406 'EbsOptimized': str(i['EbsOptimized']),407 'EndDateType': str(i['EndDateType']),408 'EphemeralStorage': str(i['EphemeralStorage']),409 'InstanceMatchCriteria': str(i['InstanceMatchCriteria']),410 'InstancePlatform': str(i['InstancePlatform']),411 'InstanceType': str(i['InstanceType']),412 'State': str(i['State']),413 'Tags': str(i['Tags']),414 'Tenancy': str(i['Tenancy']),415 'TotalInstanceCount': int(i['TotalInstanceCount'])416 })417 return var_list418# Get Lightsail Instances Function419def get_all_lightsail(account_number, region, cross_account_role):420 # Init421 var_list = []422 # Use boto3 on source account423 client_lightsail = create_boto_client(424 account_number, region, 'lightsail', cross_account_role)425 # Page all reservations426 paginator = client_lightsail.get_paginator('get_instances')427 for page in paginator.paginate():428 for i in page['instances']:429 var_list.append(430 {431 'EntryType': 'lightsail',432 'Id': str(i['arn']),433 'AccountNumber': str(account_number),434 'Region': str(region),435 'AvailabilityZone': str(i['location']['availabilityZone']),436 'Name': str(i['name']),437 'CreateDate': str(i['createdAt']),438 'Blueprint': str(i['blueprintName']),439 'RAM in GB': int(i['hardware']['ramSizeInGb']),440 'vCPU': int(i['hardware']['cpuCount']),441 'SSD in GB': int(i['hardware']['disks'][0]['sizeInGb']),442 'Public IP': str(i['publicIpAddress'])443 })444 return var_list445# Get Organizations Function446def get_organizations(account_number, region, cross_account_role):447 # Init448 
var_list = []449 # Use boto3 on source account450 client_org = create_boto_client(451 account_number, region, 'organizations', cross_account_role)452 # Page all org453 paginator = client_org.get_paginator('list_accounts')454 for page in paginator.paginate():455 for i in page['Accounts']:456 if i['Status'] == 'ACTIVE':457 acc_number = i['Id']458 # Try Get Tags459 try:460 org_tags = client_org.list_tags_for_resource(ResourceId=acc_number)['Tags']461 except ClientError as e:462 org_tags = 'No Tags Exist'463 var_list.append(464 {465 'AccountNumber': str(acc_number),466 'Id': str(i['Arn']),467 'Region': 'us-east-1',468 'EntryType': 'org',469 'Name': str(i['Name']),470 'Email': str(i['Email']),471 'Status': str(i['Status']),472 'JoinedMethod': str(i['JoinedMethod']),473 'Tags': str(org_tags)474 })475 return var_list476# Get VPC Function477def get_all_vpc(account_number, region, cross_account_role):478 # Init479 var_list = []480 # Use boto3 on source account481 client_ec2 = create_boto_client(482 account_number, region, 'ec2', cross_account_role)483 # Page all vpc's484 paginator = client_ec2.get_paginator('describe_vpcs')485 for page in paginator.paginate():486 for i in page['Vpcs']:487 var_list.append(488 {489 'EntryType': 'vpc',490 'AccountNumber': str(account_number),491 'Region': str(region),492 'CidrBlock': str(i['CidrBlock']),493 'Id': str(i['VpcId']),494 'DhcpOptionsId': str(i['DhcpOptionsId']),495 'InstanceTenancy': str(i['InstanceTenancy']),496 'Link': str(f'https://{region}.console.aws.amazon.com/vpc/home?region={region}#vpcs:sort=VpcId'),497 'Tags': str(i.get('Tags', 'No Tags Exist'))498 })499 return var_list500# Get All Network Interfaces Function501def get_all_network_interfaces(account_number, region, cross_account_role):502 # Init503 var_list = []504 # Use boto3 on source account505 client_ec2 = create_boto_client(506 account_number, region, 'ec2', cross_account_role)507 retrieved_subnets = {}508 def get_subnet(subnet_id):509 try:510 if subnet_id and 
subnet_id not in retrieved_subnets:511 retrieved_subnets[subnet_id] = client_ec2.describe_subnets(SubnetIds=[subnet_id])['Subnets'][0]512 return retrieved_subnets[subnet_id]513 except:514 print(f"Unable to call describe_subnets for {subnet_id}")515 return {}516 # Page all vpc's517 paginator = client_ec2.get_paginator('describe_network_interfaces')518 for page in paginator.paginate():519 for i in page['NetworkInterfaces']:520 subnet = get_subnet(i.get('SubnetId'))521 for ip in i.get('PrivateIpAddresses', []):522 var_list.append({523 'EntryType': 'network-interfaces',524 'Id': f"{i.get('NetworkInterfaceId')}-{ip['PrivateIpAddress']}",525 'NetworkInterfaceId': str(i.get('NetworkInterfaceId')),526 'AccountNumber': str(account_number),527 'Region': str(region),528 'PrivateIpAddress': ip.get('PrivateIpAddress', ' '),529 'PublicIp': ip.get('Association', {}).get('PublicIp', ' '),530 'Primary': str(ip.get('Primary', ' ')),531 'Status': str(i.get('Status', ' ')),532 'Link': str(f"https://{region}.console.aws.amazon.com/ec2/v2/home?region={region}#NIC:sort=networkInterfaceId"),533 'AttStatus': str(i.get('Attachment', {}).get('Status', ' ')),534 'InterfaceType': str(i.get('InterfaceType', ' ')),535 'SubnetId': str(i.get('SubnetId', ' ')),536 'VpcId': str(i.get('VpcId', ' ')),537 'CidrBlock': str(subnet.get('CidrBlock', ' ')),538 'Tags': str(i.get('TagSet', 'No Tags Exist')),539 'Description': str(i.get('Description', ' '))540 })541 return var_list542# Get Subnet Function543def get_all_subnets(account_number, region, cross_account_role):544 # Init545 var_list = []546 # Use boto3 on source account547 client_ec2 = create_boto_client(548 account_number, region, 'ec2', cross_account_role)549 # No paginator for subnets550 # paginator = client_ec2.get_paginator('describe_subnets')551 result = client_ec2.describe_subnets()552 for i in result['Subnets']:553 var_list.append(554 {555 'EntryType': 'subnet',556 'AccountNumber': str(account_number),557 'Region': region,558 'CidrBlock': 
str(i['CidrBlock']),559 'AvailabilityZone': str(i['AvailabilityZone']),560 'AvailabilityZoneId': str(i['AvailabilityZoneId']),561 'Id': str(i['SubnetArn']),562 'SubnetId': str(i['SubnetId']),563 'VpcId': str(i['VpcId']),564 'AvailableIpAddressCount': i['AvailableIpAddressCount'],565 'Tags': str(i.get('Tags', 'No Tags Exist'))566 })567 return var_list568# Get Reserved Instances569def get_all_ris(account_number, region, cross_account_role):570 # Init571 var_list = []572 # Use boto3 on source account573 client_ec2 = create_boto_client(574 account_number, region, 'ec2', cross_account_role)575 # No paginator for reservations576 # paginator = client_ec2.get_paginator('')577 result = client_ec2.describe_reserved_instances()578 for i in result['ReservedInstances']:579 # only get active ones580 if i['State'] == 'active':581 var_list.append(582 {583 'EntryType': 'ri',584 'AccountNumber': str(account_number),585 'InstanceCount': int(i['InstanceCount']),586 'InstanceType': str(i['InstanceType']),587 'Scope': str(i['Scope']),588 'ProductDescription': str(i['ProductDescription']),589 'Id': str(i['ReservedInstancesId']),590 'Start': str(i['Start']),591 'End': str(i['End']),592 'InstanceTenancy': str(i['InstanceTenancy']),593 'OfferingClass': str(i['OfferingClass'])594 })595 return var_list596# Get S3 Buckets # REGION FORCED TO US-EAST-1597def get_all_s3_buckets(account_number, cross_account_role):598 # Init599 var_list = []600 # Use boto3 on source account601 client_s3 = create_boto_client(602 account_number, 'us-east-1', 's3', cross_account_role)603 # No paginator for listing buckets604 # paginator = client_ec2.get_paginator('')605 result = client_s3.list_buckets()606 for i in result['Buckets']:607 bucket_name = i['Name']608 bucket_creation_date = i['CreationDate']609 bucket_region = ' '610 bucket_tag = ' '611 # Try Get Region612 try:613 print(f'Getting Region for bucket: {bucket_name}')614 bucket_region = 
client_s3.get_bucket_location(Bucket=bucket_name)['LocationConstraint']615 except ClientError as e:...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

...11 if credentials['twitter']['token'] is None:12 print("Did you forget to add your twitter tokens to the credentials.json file?")13 raise HTTPError14 tweet_iterator = create_tweet_iterator()15 s3_client = create_boto_client()16 collection_client = create_mongo_client_to_database_collection()17 while True:18 timestamp()19 tweets = collect_tweets(tweet_iterator, 100)20 filename = write_to_disk(tweets)21 process_local_file_to_S3(s3_client,filename)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful