Best Python code snippet using localstack_python
test_lambda.py
Source:test_lambda.py  
1#2# (c) 2017 Michael De La Rue3#4# This file is part of Ansible5#6# Ansible is free software: you can redistribute it and/or modify7# it under the terms of the GNU General Public License as published by8# the Free Software Foundation, either version 3 of the License, or9# (at your option) any later version.10#11# Ansible is distributed in the hope that it will be useful,12# but WITHOUT ANY WARRANTY; without even the implied warranty of13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the14# GNU General Public License for more details.15#16# You should have received a copy of the GNU General Public License17# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.18# Make coding more python3-ish19from __future__ import (absolute_import, division, print_function)20import copy21import json22import pytest23from ansible.compat.tests.mock import MagicMock, Mock, patch24from ansible.module_utils import basic25from ansible.module_utils._text import to_bytes26boto3 = pytest.importorskip("boto3")27# lambda is a keyword so we have to hack this.28_temp = __import__("ansible.modules.cloud.amazon.lambda")29lda = getattr(_temp.modules.cloud.amazon, "lambda")30def set_module_args(args):31    args = json.dumps({'ANSIBLE_MODULE_ARGS': args})32    basic._ANSIBLE_ARGS = to_bytes(args)33base_lambda_config = {34    'FunctionName': 'lambda_name',35    'Role': 'arn:aws:iam::987654321012:role/lambda_basic_execution',36    'Handler': 'lambda_python.my_handler',37    'Description': 'this that the other',38    'Timeout': 3,39    'MemorySize': 128,40    'Runtime': 'python2.7',41    'CodeSha256': 'AqMZ+xptM7aC9VXu+5jyp1sqO+Nj4WFMNzQxtPMP2n8=',42}43one_change_lambda_config = copy.copy(base_lambda_config)44one_change_lambda_config['Timeout'] = 445two_change_lambda_config = copy.copy(one_change_lambda_config)46two_change_lambda_config['Role'] = 'arn:aws:iam::987654321012:role/lambda_advanced_execution'47code_change_lambda_config = 
copy.copy(base_lambda_config)48code_change_lambda_config['CodeSha256'] = 'P+Zy8U4T4RiiHWElhL10VBKj9jw4rSJ5bm/TiW+4Rts='49base_module_args = {50    "region": "us-west-1",51    "name": "lambda_name",52    "state": "present",53    "zip_file": "test/units/modules/cloud/amazon/fixtures/thezip.zip",54    "runtime": 'python2.7',55    "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution',56    "memory_size": 128,57    "timeout": 3,58    "handler": 'lambda_python.my_handler'59}60module_args_with_environment = dict(base_module_args, environment_variables={61    "variable_name": "variable_value"62})63def make_mock_no_connection_connection(config):64    """return a mock of ansible's boto3_conn ready to return a mock AWS API client"""65    lambda_client_double = MagicMock()66    lambda_client_double.get_function.configure_mock(67        return_value=False68    )69    lambda_client_double.update_function_configuration.configure_mock(70        return_value={71            'Version': 172        }73    )74    fake_boto3_conn = Mock(return_value=lambda_client_double)75    return (fake_boto3_conn, lambda_client_double)76def make_mock_connection(config):77    """return a mock of ansible's boto3_conn ready to return a mock AWS API client"""78    lambda_client_double = MagicMock()79    lambda_client_double.get_function.configure_mock(80        return_value={81            'Configuration': config82        }83    )84    lambda_client_double.update_function_configuration.configure_mock(85        return_value={86            'Version': 187        }88    )89    fake_boto3_conn = Mock(return_value=lambda_client_double)90    return (fake_boto3_conn, lambda_client_double)91class AnsibleFailJson(Exception):92    pass93def fail_json_double(*args, **kwargs):94    """works like fail_json but returns module results inside exception instead of stdout"""95    kwargs['failed'] = True96    raise AnsibleFailJson(kwargs)97# TODO: def test_handle_different_types_in_config_params():98def 
test_create_lambda_if_not_exist():99    set_module_args(base_module_args)100    (boto3_conn_double, lambda_client_double) = make_mock_no_connection_connection(code_change_lambda_config)101    with patch.object(lda, 'boto3_conn', boto3_conn_double):102        try:103            lda.main()104        except SystemExit:105            pass106    # guard against calling other than for a lambda connection (e.g. IAM)107    assert(len(boto3_conn_double.mock_calls) == 1), "multiple boto connections used unexpectedly"108    assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \109        "unexpectedly updated lambda configuration when should have only created"110    assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \111        "update lambda function code when function should have been created only"112    assert(len(lambda_client_double.create_function.mock_calls) > 0), \113        "failed to call create_function "114    (create_args, create_kwargs) = lambda_client_double.create_function.call_args115    assert (len(create_kwargs) > 0), "expected create called with keyword args, none found"116    try:117        # For now I assume that we should NOT send an empty environment.  It might118        # be okay / better to explicitly send an empty environment.  However `None'119        # is not acceptable - mikedlr120        create_kwargs["Environment"]121        raise(Exception("Environment sent to boto when none expected"))122    except KeyError:123        pass  # We are happy, no environment is fine124def test_update_lambda_if_code_changed():125    set_module_args(base_module_args)126    (boto3_conn_double, lambda_client_double) = make_mock_connection(code_change_lambda_config)127    with patch.object(lda, 'boto3_conn', boto3_conn_double):128        try:129            lda.main()130        except SystemExit:131            pass132    # guard against calling other than for a lambda connection (e.g. 
IAM)133    assert(len(boto3_conn_double.mock_calls) == 1), "multiple boto connections used unexpectedly"134    assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \135        "unexpectedly updatede lambda configuration when only code changed"136    assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \137        "lambda function update called multiple times when only one time should be needed"138    assert(len(lambda_client_double.update_function_code.mock_calls) > 1), \139        "failed to update lambda function when code changed"140    # 3 because after uploading we call into the return from mock to try to find what function version141    # was returned so the MagicMock actually sees two calls for one update.142    assert(len(lambda_client_double.update_function_code.mock_calls) < 3), \143        "lambda function code update called multiple times when only one time should be needed"144def test_update_lambda_if_config_changed():145    set_module_args(base_module_args)146    (boto3_conn_double, lambda_client_double) = make_mock_connection(two_change_lambda_config)147    with patch.object(lda, 'boto3_conn', boto3_conn_double):148        try:149            lda.main()150        except SystemExit:151            pass152    # guard against calling other than for a lambda connection (e.g. 
IAM)153    assert(len(boto3_conn_double.mock_calls) == 1), "multiple boto connections used unexpectedly"154    assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \155        "failed to update lambda function when configuration changed"156    assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \157        "lambda function update called multiple times when only one time should be needed"158    assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \159        "updated lambda code when no change should have happened"160def test_update_lambda_if_only_one_config_item_changed():161    set_module_args(base_module_args)162    (boto3_conn_double, lambda_client_double) = make_mock_connection(one_change_lambda_config)163    with patch.object(lda, 'boto3_conn', boto3_conn_double):164        try:165            lda.main()166        except SystemExit:167            pass168    # guard against calling other than for a lambda connection (e.g. 
IAM)169    assert(len(boto3_conn_double.mock_calls) == 1), "multiple boto connections used unexpectedly"170    assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \171        "failed to update lambda function when configuration changed"172    assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \173        "lambda function update called multiple times when only one time should be needed"174    assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \175        "updated lambda code when no change should have happened"176def test_update_lambda_if_added_environment_variable():177    set_module_args(module_args_with_environment)178    (boto3_conn_double, lambda_client_double) = make_mock_connection(base_lambda_config)179    with patch.object(lda, 'boto3_conn', boto3_conn_double):180        try:181            lda.main()182        except SystemExit:183            pass184    # guard against calling other than for a lambda connection (e.g. 
IAM)185    assert(len(boto3_conn_double.mock_calls) == 1), "multiple boto connections used unexpectedly"186    assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \187        "failed to update lambda function when configuration changed"188    assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \189        "lambda function update called multiple times when only one time should be needed"190    assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \191        "updated lambda code when no change should have happened"192    (update_args, update_kwargs) = lambda_client_double.update_function_configuration.call_args193    assert (len(update_kwargs) > 0), "expected update configuration called with keyword args, none found"194    assert update_kwargs['Environment']['Variables'] == module_args_with_environment['environment_variables']195def test_dont_update_lambda_if_nothing_changed():196    set_module_args(base_module_args)197    (boto3_conn_double, lambda_client_double) = make_mock_connection(base_lambda_config)198    with patch.object(lda, 'boto3_conn', boto3_conn_double):199        try:200            lda.main()201        except SystemExit:202            pass203    # guard against calling other than for a lambda connection (e.g. 
IAM)204    assert(len(boto3_conn_double.mock_calls) == 1), "multiple boto connections used unexpectedly"205    assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \206        "updated lambda function when no configuration changed"207    assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \208        "updated lambda code when no change should have happened"209def test_warn_region_not_specified():210    set_module_args({211        "name": "lambda_name",212        "state": "present",213        # Module is called without a region causing error214        # "region": "us-east-1",215        "zip_file": "test/units/modules/cloud/amazon/fixtures/thezip.zip",216        "runtime": 'python2.7',217        "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution',218        "handler": 'lambda_python.my_handler'})219    get_aws_connection_info_double = Mock(return_value=(None, None, None))220    with patch.object(lda, 'get_aws_connection_info', get_aws_connection_info_double):221        with patch.object(basic.AnsibleModule, 'fail_json', fail_json_double):222            try:223                lda.main()224            except AnsibleFailJson as e:225                result = e.args[0]...lambda_function.py
Source:lambda_function.py  
import boto3

# Changes to this lambda function must be manually updated, then it can be run to update all the other associated lambda functions.

# Every target function shares the same ARN prefix and S3 location; only the
# function name (also used as the zip file's base name) varies.
_FUNCTION_ARN_PREFIX = 'arn:aws:lambda:us-east-1:849776797214:function:'
_S3_BUCKET = 'us-dev-us-east-1-data'
_S3_KEY_PREFIX = 'step-function-testing/lambda-function-zip-files/'

# Order matches the order in which the functions are updated.
_FUNCTION_NAMES = (
    'check-cluster-status',
    'check-emr-job-status',
    'check-emr-timeout',
    'check-s3-file-exists',
    'Datapipeline-StateMachine-Trigger',
    'datapipeline-update-checkNumber',
    'emr-add-step',
    'emr-cluster-create',
    'emr-pipe-setup',
    'increment-step-number',
    'terminate-emr-cluster',
)


def lambda_handler(event, context):
    """Update the code of every function in ``_FUNCTION_NAMES`` from its zip file in S3.

    ``event`` and ``context`` are accepted as handler arguments but are not
    read.  One ``update_function_code`` call is issued per function, in order;
    the responses are not inspected and nothing is returned.
    """
    lambda_client = boto3.client('lambda')
    for function_name in _FUNCTION_NAMES:
        lambda_client.update_function_code(
            FunctionName=_FUNCTION_ARN_PREFIX + function_name,
            S3Bucket=_S3_BUCKET,
            S3Key=_S3_KEY_PREFIX + function_name + '.zip',
        )

# ... (the excerpt ends here in the source)
#
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
