How to use the get_function_configuration method in localstack

Best Python code snippet using localstack_python

test_lambdafunction.py

Source:test_lambdafunction.py Github

copy

Full Screen

1#! /usr/bin/python2# Copyright (C) GRyCAP - I3M - UPV3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import unittest16import sys17import os18import tempfile19from mock import MagicMock20from mock import patch21sys.path.append("..")22sys.path.append(".")23sys.path.append("../..")24from scar.utils import StrUtils25from scar.providers.aws.lambdafunction import Lambda26class TestLambda(unittest.TestCase):27 def __init__(self, *args):28 os.environ["AWS_DEFAULT_REGION"] = "us-east-1"29 unittest.TestCase.__init__(self, *args)30 @staticmethod31 def _init_mocks(call_list):32 session = MagicMock(['client'])33 client = MagicMock(call_list)34 session.client.return_value = client35 resource_info = {'lambda': {'name': 'fname',36 'runtime': 'python3.7',37 'timeout': 300,38 'memory': 512,39 'layers': [],40 'log_type': 'Tail',41 'tags': {'createdby': 'scar'},42 'handler': 'some.handler',43 'description': 'desc',44 'deployment': {'bucket': 'someb',45 'max_s3_payload_size': 262144000},46 'environment': {'Variables': {'IMAGE_ID': 'some/image'}},47 'container': {'image': 'some/image:tag',48 'image_file': 'some.tgz',49 'environment': {'Variables': {}}},50 'supervisor': {'version': '1.4.2',51 'layer_name': 'layername'}},52 'ecr': {"delete_image": True},53 'api_gateway': {'endpoint': 'https://{api_id}.{api_region}/{stage_name}/l',54 'region': 'us-east-1',55 'stage_name': 'scar'},56 'iam': {'role': 'iamrole'}}57 lam = Lambda(resource_info)58 return session, lam, client59 def test_init(self):60 cli 
= Lambda({'lambda': {'name': 'fname',61 'supervisor': {'version': '1.4.2'}}})62 self.assertEqual(type(cli.client.client).__name__, "Lambda")63 @patch('boto3.Session')64 @patch('scar.providers.aws.launchtemplates.SupervisorUtils.download_supervisor')65 @patch('scar.providers.aws.udocker.Udocker.prepare_udocker_image')66 @patch('scar.providers.aws.controller.FileUtils.load_tmp_config_file')67 def test_create_function(self, load_tmp_config_file, prepare_udocker_image,68 download_supervisor, boto_session):69 session, lam, _ = self._init_mocks(['list_layers', 'publish_layer_version', 'get_bucket_location', 'put_object',70 'create_function', 'list_layer_versions'])71 boto_session.return_value = session72 load_tmp_config_file.return_value = {}73 tests_path = os.path.dirname(os.path.abspath(__file__))74 download_supervisor.return_value = os.path.join(tests_path, "../../files/supervisor.zip")75 lam.client.client.list_layers.return_value = {'Layers': [{'LayerName': 'layername'}]}76 lam.client.client.publish_layer_version.return_value = {'LayerVersionArn': '1'}77 lam.client.client.create_function.return_value = {'FunctionArn': 'farn'}78 lam.client.client.list_layer_versions.return_value = {'LayerVersions': []}79 lam.create_function()80 fdl = {"storage_providers": {},81 "name": "fname",82 "runtime": "python3.7",83 "timeout": 300,84 "memory": 512,85 "layers": ["1"],86 "log_type": "Tail",87 "tags": {"createdby": "scar"},88 "handler": "some.handler",89 "description": "desc",90 "deployment": {"bucket": "someb", "max_s3_payload_size": 262144000},91 "environment": {"Variables": {"IMAGE_ID": "some/image:tag"}},92 "container": {"image": "some/image:tag", "image_file": "some.tgz", "environment": {"Variables": {}}},93 "supervisor": {"version": "1.4.2", "layer_name": "layername"}}94 res = {'FunctionName': 'fname',95 'Role': 'iamrole',96 'Environment': {'Variables': {'IMAGE_ID': 'some/image:tag',97 'FDL': StrUtils.dict_to_base64_string(fdl)}},98 'Description': 'desc',99 'Timeout': 300,100 
'MemorySize': 512,101 'Tags': {'createdby': 'scar'},102 'Code': {'S3Bucket': 'someb', 'S3Key': 'lambda/fname.zip'},103 'Runtime': 'python3.7',104 'Handler': 'some.handler',105 'Architectures': ['x86_64'],106 'Layers': ['1']}107 self.assertEqual(lam.client.client.create_function.call_args_list[0][1], res)108 self.assertEqual(lam.client.client.publish_layer_version.call_args_list[0][1]['LayerName'], "layername")109 self.assertEqual(lam.client.client.publish_layer_version.call_args_list[0][1]['Description'], "1.4.2")110 self.assertEqual(len(lam.client.client.publish_layer_version.call_args_list[0][1]['Content']['ZipFile']), 99662)111 @patch('boto3.Session')112 @patch('scar.providers.aws.launchtemplates.SupervisorUtils.download_supervisor_asset')113 @patch('scar.providers.aws.controller.FileUtils.load_tmp_config_file')114 @patch('scar.providers.aws.lambdafunction.FileUtils.unzip_folder')115 @patch('docker.from_env')116 def test_create_function_image(self, from_env, unzip_folder, load_tmp_config_file,117 download_supervisor_asset, boto_session):118 session, lam, client = self._init_mocks(['create_function', 'create_repository',119 'describe_registry', 'get_authorization_token'])120 boto_session.return_value = session121 load_tmp_config_file.return_value = {}122 tests_path = os.path.dirname(os.path.abspath(__file__))123 download_supervisor_asset.return_value = os.path.join(tests_path, "../../files/supervisor.zip")124 docker = MagicMock(['login', 'images'])125 docker.images = MagicMock(['build', 'push'])126 from_env.return_value = docker127 client.create_repository.return_value = {"repository": {"repositoryUri": "repouri"}}128 client.describe_registry.return_value = {'registryId': 'regid'}129 client.get_authorization_token.return_value = {'authorizationData': [{'authorizationToken': 'QVdTOnRva2Vu'}]}130 lam.resources_info['lambda']['runtime'] = 'image'131 lam.resources_info['lambda']['supervisor']['version'] = lam.supervisor_version = '1.5.0'132 
lam.resources_info['lambda']['vpc'] = {'SubnetIds': ['subnet'],133 'SecurityGroupIds': ['sg']}134 lam.resources_info['lambda']['file_system'] = [{'Arn': 'efsaparn', '': '/mnt'}]135 lam.create_function()136 fdl = {"storage_providers": {},137 "name": "fname",138 "runtime": "image",139 "timeout": 300,140 "memory": 512,141 "layers": [],142 "log_type": "Tail",143 "tags": {"createdby": "scar"},144 "handler": "some.handler",145 "description": "desc",146 "deployment": {"bucket": "someb", "max_s3_payload_size": 262144000},147 "environment": {"Variables": {"IMAGE_ID": "repouri:latest"}},148 "container": {"image": "repouri:latest", "image_file": "some.tgz", "environment": {"Variables": {}}},149 "supervisor": {"version": "1.5.0", "layer_name": "layername"},150 "vpc": {"SubnetIds": ["subnet"], "SecurityGroupIds": ["sg"]},151 "file_system": [{'Arn': 'efsaparn', '': '/mnt'}],152 "ecr": {"delete_image": True}}153 res = {'FunctionName': 'fname',154 'Role': 'iamrole',155 'Environment': {'Variables': {'IMAGE_ID': 'repouri:latest',156 'FDL': StrUtils.dict_to_base64_string(fdl)}},157 'Description': 'desc',158 'Timeout': 300,159 'MemorySize': 512,160 'PackageType': 'Image',161 'Tags': {'createdby': 'scar'},162 'Architectures': ['x86_64'],163 'VpcConfig': {'SubnetIds': ['subnet'],164 'SecurityGroupIds': ['sg']},165 'FileSystemConfigs': [{'Arn': 'efsaparn', '': '/mnt'}],166 'Code': {'ImageUri': 'repouri:latest'}}167 self.assertEqual(lam.client.client.create_function.call_args_list[0][1], res)168 self.assertEqual(docker.images.push.call_args_list[0][0][0], "repouri")169 self.assertEqual(docker.images.build.call_args_list[0][1]['tag'], "repouri")170 @patch('boto3.Session')171 def test_delete_function(self, boto_session):172 session, lam, _ = self._init_mocks(['delete_function', 'get_function'])173 boto_session.return_value = session174 lam.client.client.get_function.return_value = {'Configuration': {'Environment': {'Variables': {'FDL': 'e30='}}}}175 
lam.client.client.delete_function.return_value = {}176 lam.delete_function()177 self.assertEqual(lam.client.client.delete_function.call_args_list[0][1], {'FunctionName': 'fname'})178 @patch('boto3.Session')179 @patch('scar.providers.aws.containerimage.ECR')180 def test_delete_function_image(self, ecr_client, boto_session):181 session, lam, _ = self._init_mocks(['delete_function', 'get_function'])182 boto_session.return_value = session183 ecr = MagicMock(['get_repository_uri', 'delete_repository'])184 ecr.get_repository_uri.return_value = "repouri"185 ecr_client.return_value = ecr186 lam.client.client.get_function.return_value = {'Configuration': {'Environment': {'Variables': {'FDL': 'cnVudGltZTogaW1hZ2U='}}}}187 lam.client.client.delete_function.return_value = {}188 lam.resources_info['lambda']['runtime'] = 'image'189 lam.delete_function()190 self.assertEqual(lam.client.client.delete_function.call_args_list[0][1], {'FunctionName': 'fname'})191 self.assertEqual(ecr.delete_repository.call_args_list[0][0][0], 'fname')192 @patch('boto3.Session')193 def test_preheat_function(self, boto_session):194 session, lam, _ = self._init_mocks(['invoke'])195 boto_session.return_value = session196 lam.preheat_function()197 res = {'FunctionName': 'fname', 'InvocationType': 'Event', 'LogType': 'None', 'Payload': '{}'}198 self.assertEqual(lam.client.client.invoke.call_args_list[0][1], res)199 @patch('boto3.Session')200 def test_find_function(self, boto_session):201 session, lam, _ = self._init_mocks(['get_function_configuration'])202 boto_session.return_value = session203 lam.client.client.get_function_configuration.return_value = {}204 self.assertEqual(lam.find_function('fname'), True)205 res = {'FunctionName': 'fname'}206 self.assertEqual(lam.client.client.get_function_configuration.call_args_list[0][1], res)207 @patch('boto3.Session')208 def test_process_asynchronous_lambda_invocations(self, boto_session):209 session, lam, _ = self._init_mocks(['invoke', 
'get_function_configuration'])210 boto_session.return_value = session211 lam.client.client.get_function_configuration.return_value = {}212 event = {'Records': [{'s3': {'object': {'key': 'okey'}}}]}213 lam.process_asynchronous_lambda_invocations([event])214 res = {'FunctionName': 'fname',215 'InvocationType': 'Event',216 'LogType': 'None',217 'Payload': '{"Records": [{"s3": {"object": {"key": "okey"}}}]}'}218 self.assertEqual(lam.client.client.invoke.call_args_list[0][1], res)219 @patch('boto3.Session')220 @patch('requests.get')221 @patch('requests.post')222 @patch('scar.providers.aws.validators.FileUtils.get_file_size')223 def test_call_http_endpoint(self, get_file_size, post, get, boto_session):224 session, lam, _ = self._init_mocks(['get_function_configuration'])225 boto_session.return_value = session226 lam.client.client.get_function_configuration.return_value = {'Environment': {'Variables': {'API_GATEWAY_ID': 'apiid'}}}227 lam.call_http_endpoint()228 self.assertEqual(get.call_args_list[0][0][0], 'https://apiid.us-east-1/scar/l')229 tmpfile = tempfile.NamedTemporaryFile(delete=False, suffix=".yaml")230 tmpfile.write(b"somedata\n")231 tmpfile.close()232 lam.resources_info['api_gateway']['data_binary'] = tmpfile.name233 lam.resources_info['api_gateway']['parameters'] = {'key': 'value'}234 get_file_size.return_value = 1024235 lam.call_http_endpoint()236 os.unlink(tmpfile.name)237 self.assertEqual(post.call_args_list[0][0][0], 'https://apiid.us-east-1/scar/l')238 self.assertEqual(post.call_args_list[0][1]['data'], b'c29tZWRhdGEK')239 self.assertEqual(post.call_args_list[0][1]['params'], {'key': 'value'})240 @patch('boto3.Session')241 @patch('requests.get')242 @patch('scar.providers.aws.lambdafunction.ZipFile')243 def test_get_fdl_config(self, zipfile, get, boto_session):244 session, lam, _ = self._init_mocks(['get_function'])245 boto_session.return_value = session246 response = MagicMock(['content'])247 response.content = b"aa"248 get.return_value = response249 
lam.client.client.get_function.return_value = {'SupervisorVersion': '1.4.2',250 'Code': {'Location': 'http://loc.es'}}251 zfile = MagicMock(['__enter__', '__exit__'])252 zipfile.return_value = zfile253 filedata = MagicMock(['read'])254 filedata.read.side_effect = ["- item\n- item2\n", ""]255 filecont = MagicMock(['__enter__', '__exit__'])256 filecont.__enter__.return_value = filedata257 thezip = MagicMock(['open'])258 thezip.open.return_value = filecont259 zfile.__enter__.return_value = thezip260 self.assertEqual(lam.get_fdl_config('arn'), ['item', 'item2'])261 self.assertEqual(get.call_args_list[0][0][0], "http://loc.es")262 @patch('boto3.Session')263 def test_get_all_functions(self, boto_session):264 session, lam, _ = self._init_mocks(['get_function_configuration'])265 boto_session.return_value = session266 lam.client.client.get_function_configuration.return_value = {'FunctionName': 'fname',267 'FunctionArn': 'arn1',268 'Timeout': 600,269 'MemorySize': 1024}270 res = lam.get_all_functions(['arn1'])271 self.assertEqual(res[0]['lambda']['memory'], 1024)272 self.assertEqual(res[0]['lambda']['supervisor']['version'], '-')273 @patch('boto3.Session')274 @patch('time.sleep')275 def test_wait_function_active(self, sleep, boto_session):276 session, lam, _ = self._init_mocks(['get_function_configuration'])277 boto_session.return_value = session278 lam.client.client.get_function_configuration.return_value = {'State': 'Active'}279 self.assertEqual(lam.wait_function_active('farn'), True)...

Full Screen

Full Screen

config_access_tests.py

Source:config_access_tests.py Github

copy

Full Screen

1from __future__ import print_function2import os3os.environ["AWS_DEFAULT_REGION"] = "eu-west-1"4import aws_lambda_configurer5import json6import unittest27from mock import patch, MagicMock8class ConfigAccessTests(unittest2.TestCase):9 @patch('aws_lambda_configurer.aws_lambda')10 def test_get_config(self, aws_lambda_mock):11 aws_lambda_mock.get_function_configuration.return_value = {12 'Description': json.dumps({13 'endpoint': 'knuff.eu-west-1.es.amazonaws.com'14 })15 }16 config = aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))17 aws_lambda_mock.get_function_configuration.assert_called_once_with(FunctionName='function-arn-name',18 Qualifier='function-version')19 self.assertEquals(config, {20 'endpoint': 'knuff.eu-west-1.es.amazonaws.com'21 })22 @patch('aws_lambda_configurer.aws_lambda')23 @patch('aws_lambda_configurer.s3_client')24 def test_lookup_s3(self, s3_mock, aws_lambda_mock):25 aws_lambda_mock.get_function_configuration.return_value = {26 'Description': json.dumps({27 "_lookup": {28 "s3": {29 "bucket": "mybucket",30 "key": "my-key"31 }32 },33 "abc": 123,34 "override": 135 })36 }37 self.mock_s3_get(38 s3_mock,39 {40 "foo": "bar",41 "override": 242 })43 config = aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))44 self.assertEquals(config, {45 "abc": 123,46 "foo": "bar",47 "override": 248 })49 s3_mock.get_object.assert_called_once_with(Bucket='mybucket', Key='my-key')50 def mock_s3_get(self, s3_mock, result):51 body_mock = MagicMock52 s3_mock.get_object.return_value = {53 'Body': body_mock54 }55 body_mock.read = MagicMock(return_value=json.dumps(result))56 @patch('aws_lambda_configurer.aws_lambda')57 def test_raise_json_error(self, aws_lambda_mock):58 aws_lambda_mock.get_function_configuration.return_value = {59 'Description': "i am not a json description"60 }61 with self.assertRaises(Exception) as cm:62 
aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))63 self.assertEqual('Description of function must contain JSON, but was "i am not a json description"',64 cm.exception.message)65 @patch('aws_lambda_configurer.aws_lambda')66 def test_missing_description_error(self, aws_lambda_mock):67 aws_lambda_mock.get_function_configuration.return_value = {}68 with self.assertRaises(KeyError) as cm:69 aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))70 self.assertEqual('Description', cm.exception.message)71 def test_raise_missing_args_error(self):72 with self.assertRaises(Exception) as cm:73 aws_lambda_configurer.load_config()74 self.assertEqual("Keyword argument 'Context' missing", cm.exception.message)75 @patch('aws_lambda_configurer.aws_lambda')76 def test_raise_invalid_lookup(self, aws_lambda_mock):77 aws_lambda_mock.get_function_configuration.return_value = {78 'Description': json.dumps({79 '_lookup': {80 'invalid_lookup_value': {}81 }})82 }83 with self.assertRaises(Exception) as cm:84 aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))85 self.assertIn('invalid_lookup_value', cm.exception.message)86 @patch('aws_lambda_configurer.aws_lambda')87 def test_raise_s3_lookup_missing_bucket(self, aws_lambda_mock):88 aws_lambda_mock.get_function_configuration.return_value = {89 'Description': json.dumps({90 '_lookup': {91 's3': {92 'key': 'foo'93 }94 }})95 }96 with self.assertRaises(Exception) as cm:97 aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))98 self.assertIn("config needs field 'bucket'" , cm.exception.message)99 @patch('aws_lambda_configurer.aws_lambda')100 def test_raise_s3_lookup_missing_key(self, aws_lambda_mock):101 aws_lambda_mock.get_function_configuration.return_value = {102 'Description': json.dumps({103 '_lookup': {104 's3': {105 'bucket': 'bar'106 }107 }})108 }109 with 
self.assertRaises(Exception) as cm:110 aws_lambda_configurer.load_config(Context=MockContext('function-arn-name', 'function-version'))111 self.assertIn("config needs field 'key'" , cm.exception.message)112class MockContext:113 def __init__(self, invoked_function_arn, function_version):114 self.invoked_function_arn = invoked_function_arn...

Full Screen

Full Screen

set_lambda_env.py

Source:set_lambda_env.py Github

copy

Full Screen

...41 except:42 print("# no such function_name")43 return None44 return envjson[env]["profile_name"], lmb.split(":")[-1]45def get_function_configuration(env, name, filepath="env.json"):46 profile_name, lambda_name = _get_lambda_name(env, name, filepath)47 session = boto3.Session(profile_name=profile_name)48 response = session.client('lambda').get_function_configuration(FunctionName=lambda_name)49 return response["Environment"]["Variables"]50def set_configuration(env, name, variables, filepath="env.json"):51 profile_name, lambda_name = _get_lambda_name(env, name, filepath)52 session = boto3.Session(profile_name=profile_name)53 response = session.client('lambda').update_function_configuration(54 FunctionName=lambda_name,55 Environment={"Variables": variables}56 )57 return response58if __name__ == '__main__':59 argresult = parser()60 if argresult.env is None or argresult.lambda_function is None:61 print("# -e and -l are mandatory!")62 sys.exit(1)63 env_values = get_function_configuration(argresult.env, argresult.lambda_function, argresult.filepath)64 if argresult.show:65 if env_values is not None:66 import pprint67 pprint.pprint(env_values)68 sys.exit(0)69 if argresult.delete:70 if argresult.delete not in env_values:71 print("# no such environment valuable is registered")72 sys.exit(1)73 del env_values[argresult.delete]74 if argresult.add:75 if argresult.value is None or argresult.value == "":76 print("# -v is required for -a (addition)")77 sys.exit(1)78 env_values[argresult.add] = argresult.value79 resp = set_configuration(argresult.env, argresult.lambda_function, env_values, argresult.filepath)80 print("# result =", resp['LastUpdateStatus'])81 env_values = get_function_configuration(argresult.env, argresult.lambda_function, argresult.filepath)82 import pprint...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful