How to use create_change_set method in localstack

Best Python code snippet using localstack_python

main.py

Source:main.py Github

copy

Full Screen

...92 "An error occurred when updating %s, %s", self.stack_name, error93 )94 raise Exception(error)95 return "stack_update_complete"96 def create_change_set(self):97 try:98 client.create_change_set(**self.parameters)99 except botocore.exceptions.ClientError as error:100 logger.error(101 "An error occurred when creating %s, %s",102 self.parameters["ChangeSetName"],103 error,104 )105 raise Exception(error)106 return "change_set_create_complete"107 def delete_change_set(self):108 client.delete_change_set(**self.parameters)109 def execute_change_set(self):110 try:111 client.execute_change_set(**self.parameters)112 except client.exceptions.ChangeSetNotFoundException as error:113 logger.error(114 "Change set %s was not found", self.parameters["ChangeSetName"]115 )116 raise Exception(error)117 return "stack_update_complete"118 def wait(self, waiter_name):119 logger.info("waiting for operation %s to complete", waiter_name)120 waiter = client.get_waiter(waiter_name)121 try:122 # TODO: Fix this, it should use the paremeters provided by the build_cloudformation_parameters123 parameters = {"StackName": self.stack_name}124 if "change_set" in waiter_name:125 parameters["ChangeSetName"] = f"{self.stack_name}-{self.commit}"126 waiter.wait(**parameters)127 logger.info(f"operation {operation} finished!")128 except botocore.exceptions.WaiterError as error:129 logger.error(130 "An error happened waiting for the operation %s, %s", operation, error131 )132 raise Exception(error)133 def generate_parameters(self):134 logger.info("generating parameters")135 for file in os.listdir(self.stack_parameters_path):136 if self.environment in file:137 parameters_file = os.path.join(self.stack_parameters_path, file)138 if "enc" in file:139 subprocess.run(140 [141 "sops",142 "-d",143 parameters_file,144 ">",145 self.stack_tmp_parameters_file,146 ],147 stdout=subprocess.DEVNULL,148 )149 else:150 logger.debug("copying file")151 shutil.copyfile(parameters_file, self.stack_tmp_parameters_file)152 break153 def clean(self):154 logger.info("Cleaning up the parameters")155 subprocess.run(["rm", self.stack_tmp_parameters_file])156# TODO: Support decrypt of the parameters file157# def generate_parameters(parameters_path):158def usage(info, err):159 help = """160main script helps with the creation of CloudFormation stacks.161Usage:162python main stack-name <command>163Commands to work with Stacks:164create - Creates a new stack based on the environment and the stack name165update - Updates an already created stack166delete - Deletes an already created stack167Commands to work with Change Sets:168create_change_set - Creates a new change set using the stack name and the commit's SHA as change set name169execute_change_set - Executes the current change set170delete_change_set - Delete the current change set171The stack-name is the name of the stack's folder inside stacks/172E.g.173python main network create174 """175 print(info)176 print(help)177 sys.exit(err)178def args_validator():179 if len(sys.argv) != 3:180 usage("Invalid number of arguments", errno.EINVAL)181 if sys.argv[2] not in OPERATIONS:182 usage(f"{sys.argv[2]} is not permitted", errno.EPERM)183 stack_path = os.path.join("stacks", sys.argv[1])184 if not os.path.exists(stack_path):185 usage(f"{stack_path} does not exist", errno.EPERM)186 try:187 os.environ["PROJECT"]188 os.environt["ENVIRONMENT"]189 except:190 logger.warning(191 "PROJECT or ENVIRONMENT are not set, falling back to default values"192 )193# Run validation before creating194args_validator()195operation = sys.argv[2]196stack = Stack(sys.argv[1], sys.argv[2])197stack.validate_cloudformation_template()198try:199 if operation == "create":200 waiter_name = stack.create_stack()201 elif operation == "delete":202 waiter_name = stack.delete_stack()203 elif operation == "update":204 waiter_name = stack.update_stack()205 elif operation == "create_change_set":206 waiter_name = stack.create_change_set()207 elif operation == "delete_change_set":208 waiter_name = stack.delete_change_set()209 sys.exit(0)210 elif operation == "execute_change_set":211 waiter_name = stack.execute_change_set()212 else:213 usage(1)214 stack.wait(waiter_name)215except Exception as error:216 logger.error(error)217finally:...

Full Screen

Full Screen

deploy.py

Source:deploy.py Github

copy

Full Screen

...36 client.mock('describe_stacks', describe_stacks)37 exists = _stack_exists(client, 'foo')38 self.assertEqual(client.called['describe_stacks'], 1)39 self.assertEqual(exists, False)40 def test_make_create_change_set(self):41 '''create a changeset42 '''43 stack_name = 'foo'44 client = cfn.Cloudformation()45 describe_stacks = cfn.make_describe_stacks(client, 3, 'CREATE_COMPLETE', 'bar')46 client.mock('describe_stacks', describe_stacks)47 def create_change_set(StackName, TemplateURL, UsePreviousTemplate, Parameters, Capabilities, ChangeSetName, ChangeSetType):48 self.assertEqual(ChangeSetType, 'CREATE')49 return {50 'Id': 'string',51 'StackId': 'string'52 }53 client.mock('create_change_set', create_change_set)54 _make_change_set(client, stack_name, '', {})55 self.assertEqual(client.called['create_change_set'], 1)56 def test_make_update_change_set(self):57 '''update a stack with changeset58 '''59 stack_name = 'foo'60 client = cfn.Cloudformation()61 describe_stacks = cfn.make_describe_stacks(client, 3, 'CREATE_COMPLETE')62 client.mock('describe_stacks', describe_stacks)63 def create_change_set(StackName, TemplateURL, UsePreviousTemplate, Parameters, Capabilities, ChangeSetName, ChangeSetType):64 self.assertEqual(ChangeSetType, 'UPDATE')65 return {66 'Id': 'string',67 'StackId': 'string'68 }69 client.mock('create_change_set', create_change_set)70 _make_change_set(client, stack_name, '', {})71 self.assertEqual(client.called['create_change_set'], 1)72 def test_wait_for_changeset(self):73 '''pause execution until the changeset creation is complete74 '''75 n_calls = 376 client = cfn.Cloudformation()77 describe_change_set = cfn.make_describe_change_set(client, n_calls, 'CREATE_COMPLETE')...

Full Screen

Full Screen

test_deploy.py

Source:test_deploy.py Github

copy

Full Screen

1from unittest import TestCase2import pytest3import os4from mock import MagicMock5from stacker import deploy6class DeployExecutorTest(TestCase):7 cf_json = os.path.join(os.path.dirname(__file__),'resources/cloudformation.json')8 cf_yaml_functions = os.path.join(os.path.dirname(__file__), 'resources/cf_functions.yaml')9 cf_json_functions = os.path.join(os.path.dirname(__file__), 'resources/cf_functions.json')10 config_json = os.path.join(os.path.dirname(__file__), 'resources/config.json')11 def test_error_thrown_on_incorrect_config_file_format(self):12 with pytest.raises(deploy.DeployException):13 executor = deploy.DeployExecutor()14 executor.load_parameters("config.txt")15 def test_load_json_cloudformation(self):16 executor = deploy.DeployExecutor()17 executor.load_cloudformation(self.cf_json)18 def test_deploy_with_no_config_file(self):19 executor = deploy.DeployExecutor()20 executor.cf_client = MagicMock()21 executor.cf_client.create_change_set = MagicMock()22 executor.cf_client.wait_for_change_set_to_complete = MagicMock()23 executor.execute(stack_name="test-stack",template_name=self.cf_json)24 def test_deploy_with_json_config(self):25 executor = deploy.DeployExecutor()26 executor.cf_client = MagicMock()27 executor.cf_client.create_change_set = MagicMock()28 executor.cf_client.wait_for_change_set_to_complete = MagicMock()29 executor.execute(stack_name="test-stack",template_name=self.cf_json, config_filename=self.config_json, create=True)30 executor.cf_client.create_change_set.assert_called()31 executor.cf_client.wait_for_change_set_to_complete.assert_called()32 # Tests that we can parse yaml containing Cloudformation functions33 def test_deploy_yaml_cf_with_functions(self):34 executor = deploy.DeployExecutor()35 executor.cf_client = MagicMock()36 executor.cf_client.create_change_set = MagicMock()37 executor.cf_client.wait_for_change_set_to_complete = MagicMock()38 executor.execute(stack_name="test-stack", template_name=self.cf_yaml_functions, config_filename=self.config_json, create=True)39 executor.cf_client.create_change_set.assert_called()40 executor.cf_client.wait_for_change_set_to_complete.assert_called()41 # Tests that we can parse json containing Cloudformation functions42 def test_deploy_json_cf_with_functions(self):43 executor = deploy.DeployExecutor()44 executor.cf_client = MagicMock()45 executor.cf_client.create_change_set = MagicMock()46 executor.cf_client.wait_for_change_set_to_complete = MagicMock()47 executor.execute(stack_name="test-stack",template_name=self.cf_json_functions, config_filename=self.config_json, create=True)48 executor.cf_client.create_change_set.assert_called()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful