How to use physical_resource_id method in localstack

Best Python code snippet using localstack_python

device_pool_resource.py

Source:device_pool_resource.py Github

copy

Full Screen

from typing import Optional
import boto3
import traceback
from botocore.client import BaseClient
from . import cloudformation

# Resource properties this handler accepts; anything else is rejected.
KNOWN_PROPERTIES = {'Name', 'Rules', 'ProjectArn', 'Description', 'MaxDevices', 'ServiceToken'}


def lambda_handler(event: dict, context):
    """Handle a CloudFormation custom-resource request for a device pool.

    Reads ``RequestType``, ``PhysicalResourceId`` and ``ResourceProperties``
    from ``event``, performs the matching create/update/delete call on the
    client returned by ``_get_device_farm_client()``, and reports the outcome
    through ``cloudformation.send_response``.

    Args:
        event: The custom-resource request payload.
        context: The Lambda context object, forwarded to ``send_response``.

    Returns:
        The string ``'ok'``. Failures are reported through ``send_response``
        with a FAILED status rather than raised.
    """
    print(event)
    physical_resource_id = event.get('PhysicalResourceId')
    properties = event.get('ResourceProperties', {})
    project_arn = properties.get('ProjectArn', None)
    name = properties.get('Name', None)
    rules = properties.get('Rules', None)
    description = properties.get('Description', None)
    max_devices = properties.get('MaxDevices', None)
    extra_properties = set(properties.keys()).difference(KNOWN_PROPERTIES)

    def send_error(reason):
        # Report the existing id when there is one, matching the ``except``
        # handler below; fall back to the placeholder only if nothing exists.
        cloudformation.send_response(
            event=event,
            context=context,
            status=cloudformation.Status.FAILED,
            reason=reason,
            physical_resource_id=physical_resource_id or cloudformation.RESOURCE_NOT_CREATED
        )

    def send_missing_property_response(property_name: str):
        send_error(f'Required property {property_name} not set')

    try:
        if (event.get('RequestType') == 'Delete'
                and physical_resource_id == cloudformation.RESOURCE_NOT_CREATED):
            # Nothing was ever created, so there is nothing to delete. This
            # must be checked BEFORE property validation: a Delete that
            # carries the same invalid properties that made the Create fail
            # would otherwise be answered FAILED and never reach this branch.
            cloudformation.send_response(
                event=event, context=context,
                status=cloudformation.Status.SUCCESS,
                physical_resource_id=physical_resource_id
            )
        elif not project_arn:
            send_missing_property_response('ProjectArn')
        elif not name:
            send_missing_property_response('Name')
        elif not rules:
            send_missing_property_response('Rules')
        elif extra_properties:
            # Sorted so the error message is deterministic.
            send_error(f'Unknown properties found: {", ".join(sorted(extra_properties))}')
        else:
            if event['RequestType'] == 'Delete':
                client = _get_device_farm_client()
                client.delete_device_pool(arn=physical_resource_id)
            elif event['RequestType'] == 'Create':
                client = _get_device_farm_client()
                params = {
                    'projectArn': project_arn,
                    'name': name,
                    'rules': rules,
                }
                if description is not None:
                    params['description'] = description
                if max_devices is not None:
                    params['maxDevices'] = max_devices
                response = client.create_device_pool(**params)
                physical_resource_id = response['devicePool']['arn']
            elif event['RequestType'] == 'Update':
                client = _get_device_farm_client()
                params = {
                    'arn': physical_resource_id,
                    'name': name,
                    'rules': rules,
                }
                if max_devices is not None:
                    params['maxDevices'] = max_devices
                else:
                    # No MaxDevices in the template: ask for the limit to be cleared.
                    params['clearMaxDevices'] = True
                if description is not None:
                    params['description'] = description
                client.update_device_pool(**params)
            else:
                raise ValueError('Unknown RequestType ' + event['RequestType'])
            cloudformation.send_response(
                event=event, context=context,
                status=cloudformation.Status.SUCCESS,
                physical_resource_id=physical_resource_id,
                data={
                    'Arn': physical_resource_id,
                },
            )
    except Exception as e:
        # Top-level boundary: always answer the request, even on failure.
        print(e)
        traceback.print_exc()
        physical_resource_id = physical_resource_id or cloudformation.RESOURCE_NOT_CREATED
        cloudformation.send_response(
            event=event,
            context=context,
            status=cloudformation.Status.FAILED,
            reason=str(e),
            physical_resource_id=physical_resource_id,
        )
    print('Finished')
    return 'ok'


def get_top_device_pool_arn(client: BaseClient, project_arn: str) -> Optional[str]:
    """Return the ARN of the curated pool named 'Top Devices', or None.

    Pages through ``list_device_pools`` for ``project_arn`` with
    ``type='CURATED'`` and returns the first pool whose name matches.
    """
    paginator = client.get_paginator('list_device_pools')
    for page in paginator.paginate(arn=project_arn, type='CURATED'):
        for device_pool in page['devicePools']:
            if device_pool['name'] == 'Top Devices':
                return device_pool['arn']
    print('Top Devices device pool not found')
    return None


def get_project_id(project_arn: str) -> str:
    """Return the last colon-separated segment of ``project_arn``."""
    arn_parts = project_arn.split(':')
    return arn_parts[-1]


def _get_device_farm_client() -> BaseClient:
    ...  # body elided in the original listing

Full Screen

Full Screen

project_resource.py

Source:project_resource.py Github

copy

Full Screen

from typing import Optional
import boto3
import traceback
from botocore.client import BaseClient
from . import cloudformation

# Resource properties this handler accepts; anything else is rejected.
KNOWN_PROPERTIES = {'ProjectName', 'ServiceToken'}


def lambda_handler(event: dict, context):
    """Handle a CloudFormation custom-resource request for a project.

    Reads ``RequestType``, ``PhysicalResourceId`` and ``ResourceProperties``
    from ``event``, performs the matching create/update/delete call on the
    client returned by ``_get_device_farm_client()``, and reports the outcome
    (plus ``Arn``, ``ProjectId`` and ``TopDevicesDevicePoolArn`` attributes)
    through ``cloudformation.send_response``.

    Args:
        event: The custom-resource request payload.
        context: The Lambda context object, forwarded to ``send_response``.

    Returns:
        The string ``'ok'``. Failures are reported through ``send_response``
        with a FAILED status rather than raised.
    """
    print(event)
    physical_resource_id = event.get('PhysicalResourceId')
    properties = event.get('ResourceProperties', {})
    project_name = properties.get('ProjectName', None)
    extra_properties = set(properties.keys()).difference(KNOWN_PROPERTIES)

    def send_error(reason):
        # Report the existing id when there is one, matching the ``except``
        # handler below; fall back to the placeholder only if nothing exists.
        cloudformation.send_response(
            event=event,
            context=context,
            status=cloudformation.Status.FAILED,
            reason=reason,
            physical_resource_id=physical_resource_id or cloudformation.RESOURCE_NOT_CREATED
        )

    try:
        if (event.get('RequestType') == 'Delete'
                and physical_resource_id == cloudformation.RESOURCE_NOT_CREATED):
            # Nothing was ever created, so there is nothing to delete. This
            # must be checked BEFORE property validation: a Delete that
            # carries the same invalid properties that made the Create fail
            # would otherwise be answered FAILED and never reach this branch.
            cloudformation.send_response(
                event=event, context=context,
                status=cloudformation.Status.SUCCESS,
                physical_resource_id=physical_resource_id
            )
        elif not project_name:
            send_error('ProjectName is not set')
        elif extra_properties:
            # Sorted so the error message is deterministic.
            send_error(f'Unknown properties found: {", ".join(sorted(extra_properties))}')
        else:
            request_type = event['RequestType']
            if request_type == 'Delete':
                client = _get_device_farm_client()
                client.delete_project(arn=physical_resource_id)
            elif request_type == 'Create':
                client = _get_device_farm_client()
                response = client.create_project(name=project_name)
                physical_resource_id = response['project']['arn']
            elif request_type == 'Update':
                client = _get_device_farm_client()
                client.update_project(arn=physical_resource_id, name=project_name)
            else:
                raise ValueError('Unknown RequestType ' + request_type)
            # The project no longer exists after a Delete, so do not query
            # its device pools; the lookup would turn a successful delete
            # into a reported failure.
            if request_type == 'Delete':
                top_devices_device_pool_arn = None
            else:
                top_devices_device_pool_arn = get_top_device_pool_arn(client, physical_resource_id)
            cloudformation.send_response(
                event=event, context=context,
                status=cloudformation.Status.SUCCESS,
                physical_resource_id=physical_resource_id,
                data={
                    'Arn': physical_resource_id,
                    'ProjectId': get_project_id(physical_resource_id),
                    'TopDevicesDevicePoolArn': top_devices_device_pool_arn,
                },
            )
    except Exception as e:
        # Top-level boundary: always answer the request, even on failure.
        print(e)
        traceback.print_exc()
        physical_resource_id = physical_resource_id or cloudformation.RESOURCE_NOT_CREATED
        cloudformation.send_response(
            event=event,
            context=context,
            status=cloudformation.Status.FAILED,
            reason=str(e),
            physical_resource_id=physical_resource_id,
        )
    print('Finished')
    return 'ok'


def get_top_device_pool_arn(client: BaseClient, project_arn: str) -> Optional[str]:
    """Return the ARN of the curated pool named 'Top Devices', or None.

    Pages through ``list_device_pools`` for ``project_arn`` with
    ``type='CURATED'`` and returns the first pool whose name matches.
    """
    paginator = client.get_paginator('list_device_pools')
    for page in paginator.paginate(arn=project_arn, type='CURATED'):
        for device_pool in page['devicePools']:
            if device_pool['name'] == 'Top Devices':
                return device_pool['arn']
    print('Top Devices device pool not found')
    return None


def get_project_id(project_arn: str) -> str:
    """Return the last colon-separated segment of ``project_arn``."""
    arn_parts = project_arn.split(':')
    return arn_parts[-1]


def _get_device_farm_client() -> BaseClient:
    ...  # body elided in the original listing

Full Screen

Full Screen

lambda.py

Source:lambda.py Github

copy

Full Screen

import json
import boto3
import cfnresponse

codebuild_client = boto3.client("codebuild")
s3_client = boto3.client("s3")


def handler(event, context):
    """Handle a custom-resource request or a CodeBuild status event.

    Three kinds of ``event`` are accepted:

    * ``RequestType`` of ``Create``/``Update``: start a build via
      ``start_build`` and return WITHOUT answering the request; the answer
      is sent later, when the build's status event arrives.
    * ``source == "aws.codebuild"``: recover the original request from the
      build's ``CFN_EVENT`` environment variable, check the build status and
      that the target object exists, then answer the original request.
    * ``RequestType`` of ``Delete``: delete the object(s) and answer.

    Any exception is re-raised after a FAILED response has been sent.
    """
    physical_resource_id = None
    codebuild_running = False
    try:
        if event.get("RequestType") in ("Create", "Update"):
            bucket = event["ResourceProperties"]["Bucket"]
            key_target = event["ResourceProperties"]["KeyTarget"]
            physical_resource_id = f"arn:aws:s3:::{bucket}/{key_target}"
            start_build(event)
            # Suppresses the response in ``finally``; the build is in flight.
            codebuild_running = True
        elif event.get("source") == "aws.codebuild":
            codebuild_status = event["detail"]["build-status"]
            # Replace event with original event, will be used by cfnresponse.
            env = event["detail"]["additional-information"]["environment"][
                "environment-variables"
            ]
            for var in env:
                if var["name"] == "CFN_EVENT":
                    event = json.loads(var["value"])
                    break
            else:
                # No CFN_EVENT variable: the original request cannot be recovered.
                raise ValueError(env)
            bucket = event["ResourceProperties"]["Bucket"]
            key_target = event["ResourceProperties"]["KeyTarget"]
            physical_resource_id = f"arn:aws:s3:::{bucket}/{key_target}"
            if codebuild_status != "SUCCEEDED":
                raise ValueError(codebuild_status)
            # Ensure zip was uploaded (must be in buildspec).
            s3_client.head_object(Bucket=bucket, Key=key_target)
            # Delete previous zip after updates.
            if event["RequestType"] == "Update":
                old_physical_resource_id = event["PhysicalResourceId"]
                if old_physical_resource_id != physical_resource_id:
                    delete(old_physical_resource_id)
        elif event.get("RequestType") == "Delete":
            physical_resource_id = event["PhysicalResourceId"]
            delete(physical_resource_id)
            # Also remove the object named by the current properties.
            bucket = event["ResourceProperties"]["Bucket"]
            key_target = event["ResourceProperties"]["KeyTarget"]
            delete(f"arn:aws:s3:::{bucket}/{key_target}")
        else:
            raise ValueError(event)
        status = cfnresponse.SUCCESS
    except Exception:
        status = cfnresponse.FAILED
        print(event)
        raise
    finally:
        if not codebuild_running:
            response_data = {}
            cfnresponse.send(
                event, context, status, response_data, physical_resource_id
            )


def start_build(event):
    """Start the CodeBuild project named in the request's properties.

    The target bucket/key and the whole request (JSON-encoded as
    ``CFN_EVENT``) are passed to the build as environment-variable overrides
    so that ``handler`` can recover the request from the build status event.
    """
    env = {}
    bucket = event["ResourceProperties"]["Bucket"]
    key_target = event["ResourceProperties"]["KeyTarget"]
    env["TARGET_BUCKET"] = bucket
    env["TARGET_KEY"] = key_target
    env["TARGET_URL"] = f"s3://{bucket}/{key_target}"
    env["CFN_EVENT"] = json.dumps(event)
    response = codebuild_client.start_build(
        projectName=event["ResourceProperties"]["CodeBuildProjectName"],
        environmentVariablesOverride=[
            {"name": key, "value": value} for (key, value) in env.items()
        ],
    )
    print(response)


def delete(physical_resource_id):
    """Split an ``arn:aws:s3:::bucket/key`` id into its bucket and key.

    NOTE(review): the listing is truncated after the split; what is then
    done with ``bucket`` and ``key`` is not visible here.
    """
    if physical_resource_id.startswith("arn:aws:s3:::"):
        bucket_and_key = physical_resource_id.split(":")[-1]
        bucket, key = bucket_and_key.split("/", 1)
        ...  # remainder of the function is truncated in the original listing

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful