Best Python code snippet using localstack_python
lambda_function.py
Source:lambda_function.py  
# ... (the snippet starts mid-file; the commented lines below are the tail of a
# function whose ``def`` line is outside this excerpt, kept verbatim)
#     policy_document_dict = json.loads(describe_response['VpcEndpoints'][0]['PolicyDocument'])
#     if len(policy_document_dict['Statement']) > 1:
#         custom_raise_exception(event, context, 'VPC Endpoint policy has multiple statements, that is not supported by this function')
#     return policy_document_dict


# Text types a JSON-decoded policy "Resource" value may have: ``str`` on
# Python 3, ``str``/``unicode`` on Python 2. ``type(u'')`` avoids naming the
# ``unicode`` builtin, which does not exist on Python 3.
_TEXT_TYPES = (type(''), type(u''))


def _bucket_arns(event):
    """Return ``[bucket_arn, bucket_arn + '/*']`` built from the event's 'bucket-arn' property."""
    bucket_arn = str(event['ResourceProperties']['bucket-arn'])
    return [bucket_arn, bucket_arn + '/*']


def _add_bucket_to_policy(event, context, policy_document):
    """Add the event's bucket ARNs to the first statement of the policy, in place.

    A list-valued ``Resource`` gets only the ARNs it does not already contain;
    a string-valued ``Resource`` is replaced by the two bucket ARNs. Any other
    type is reported through ``custom_raise_exception``.

    Returns:
        True when the policy document was modified, False otherwise.
    """
    statement = policy_document['Statement'][0]
    resource = statement['Resource']
    bucket_arns = _bucket_arns(event)
    if isinstance(resource, list):
        print("Resource is a list, appending...")
        # Only add what is missing so a retried request does not pile up duplicates.
        missing = [arn for arn in bucket_arns if arn not in resource]
        resource.extend(missing)
        return bool(missing)
    if isinstance(resource, _TEXT_TYPES):
        print("Resource is the default unicode string, replacing")
        statement['Resource'] = bucket_arns
        return True
    custom_raise_exception(event, context, 'Endpoint policy looks invalid, Resource stanza is not a list or unicode.')
    return False


def _send_success(event, context):
    """Send a SUCCESS response via ``send``, unless the event carries the local-testing stack id."""
    response_data = {}
    if event['StackId'] == '012345678910/fake-stack-id':
        print("Skipping sending CloudFormation response due to local testing.")
        return
    send(event, context, 'SUCCESS', response_data, event['StackId'])
    if DEBUG_MODE is True:
        print("Exiting successfully")


def modify_vpc_endpoint(event, context, ec2_client, new_policy_document_dict):
    """Call the modify API to set the endpoint's policy document.

    Args:
        event: Request event; ``event['ResourceProperties']['vpc-endpoint-id']``
            identifies the endpoint to modify.
        context: Invocation context, passed through to ``custom_raise_exception``.
        ec2_client: Client object exposing ``modify_vpc_endpoint``.
        new_policy_document_dict: Policy document as a dict; serialized to JSON
            before being sent.

    Any exception from the API call, or a false ``Return`` value in the
    response, is reported through ``custom_raise_exception``.
    """
    if DEBUG_MODE is True:
        print("Entering function 'modify_vpc_endpoint'")
    try:
        # We need to convert the policy document from dict to string so we use json.dumps
        modify_response = ec2_client.modify_vpc_endpoint(
            PolicyDocument=json.dumps(new_policy_document_dict),
            VpcEndpointId=str(event['ResourceProperties']['vpc-endpoint-id'])
        )
    except Exception as error:  # pylint: disable=W0703
        print("Failed to connect to given region, aborting.")
        custom_raise_exception(event, context, error)
    else:
        # Compare by value. The previous ``is 'False'`` tested object identity
        # against a string literal, so a failed update was never detected.
        if modify_response['Return'] in (False, 'False', 'false'):
            custom_raise_exception(event, context, 'VPC endpoint policy failed to update')


def cloudformation_create(event, context, ec2_client):
    """Add the given bucket to the VPC endpoint policy.

    Fetches the current policy with ``describe_vpc_endpoints``, adds the bucket
    ARN and its ``/*`` object ARN to the first statement's ``Resource``, writes
    the policy back, and then reports success.
    """
    if DEBUG_MODE is True:
        print("Create Option: Attempting to run creation")
    policy_document = describe_vpc_endpoints(event, context, ec2_client)
    _add_bucket_to_policy(event, context, policy_document)
    if DEBUG_MODE is True:
        print("New policy\n%s" % json.dumps(policy_document, indent=2))
    modify_vpc_endpoint(event, context, ec2_client, policy_document)
    _send_success(event, context)


def cloudformation_update(event, context, ec2_client):
    """Make sure the given bucket is present in the VPC endpoint policy on an update.

    The policy is only written back when it actually had to be changed.
    """
    if DEBUG_MODE is True:
        print("Create Option: Attempting to run update")
    # This should almost never be called for updates, only if a template had a bucket added or removed
    policy_document = describe_vpc_endpoints(event, context, ec2_client)
    # The resources live under Statement[0]; indexing the document itself with
    # [0] (as the code previously did) raises KeyError on a dict.
    if _add_bucket_to_policy(event, context, policy_document):
        # Our bucket was not in the policy, it has been added
        modify_vpc_endpoint(event, context, ec2_client, policy_document)
    else:
        # Our bucket is in the policy
        print("Bucket supplied is already in the policy, skipping any actions.")
    _send_success(event, context)


def cloudformation_delete(event, context, ec2_client):
    """Delete the given bucket from the VPC endpoint policy.

    Removes the bucket ARN and its ``/*`` object ARN from the first statement's
    ``Resource`` list. If neither is listed (or ``Resource`` is not a list) the
    policy is left untouched instead of failing the delete.
    """
    if DEBUG_MODE is True:
        print("Create Option: Attempting to run deletion")
    policy_document = describe_vpc_endpoints(event, context, ec2_client)
    statement = policy_document['Statement'][0]
    resource = statement['Resource']
    bucket_arns = _bucket_arns(event)
    policy_changed = False
    if isinstance(resource, list):
        # Filter instead of list.remove(): remove() raises ValueError when the
        # ARN is absent, which would make the delete request fail.
        remaining = [arn for arn in resource if arn not in bucket_arns]
        if len(remaining) != len(resource):
            # NOTE(review): ``remaining`` may be empty if the bucket was the only
            # resource; confirm the endpoint API accepts an empty Resource list.
            statement['Resource'] = remaining
            policy_changed = True
    if policy_changed:
        modify_vpc_endpoint(event, context, ec2_client, policy_document)
    else:
        print("Bucket supplied is not in the policy, skipping any actions.")
    _send_success(event, context)


def lambda_handler(event, context):
    """Main Lambda function."""
    print("event:" + str(event))
    if DEBUG_MODE:
        print("event:" + str(event))
    validate_inputs(event, context)
    # ... (the remainder of lambda_handler is elided in the snippet)


# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
