How to use describe_transit_gateway_vpc_attachments method in localstack

Best Python code snippet using localstack_python

aws.py

Source:aws.py Github

copy

Full Screen

...92 """93 logger = logging.getLogger(__name__)94 # Lookup tgwId and tgwAttachmentId for a vpc95 if tgwId == None:96 response = ec2_client.describe_transit_gateway_vpc_attachments(97 Filters=[{'Name': 'vpc-id', 'Values': [vpc.id]},98 {'Name': 'state', 'Values': [99 'available']}100 ]101 )102 else:103 response = ec2_client.describe_transit_gateway_vpc_attachments(104 Filters=[105 {'Name': 'vpc-id', 'Values': [vpc.id]},106 {'Name': 'state', 'Values': ['available']},107 {'Name': 'transit-gateway-id', 'Values': [tgwId]}108 ]109 )110 if len(response["TransitGatewayVpcAttachments"]) == 0:111 return []112 return response['TransitGatewayVpcAttachments'][0]['SubnetIds']113 @classmethod114 def restoreTgwAttachementStaticRoute(cls, tgwRtbInfo, initialKey, ec2_client, vpc, tgwId):115 logger = logging.getLogger(__name__)116 # Lookup tgwId and tgwAttachmentId for a vpc117 if tgwId == None:118 response = ec2_client.describe_transit_gateway_vpc_attachments(119 Filters=[{'Name': 'vpc-id', 'Values': [vpc.id]},120 {'Name': 'state', 'Values': [121 'available']}122 ]123 )124 else:125 response = ec2_client.describe_transit_gateway_vpc_attachments(126 Filters=[127 {'Name': 'vpc-id', 'Values': [vpc.id]},128 {'Name': 'state', 'Values': ['available']},129 {'Name': 'transit-gateway-id', 'Values': [tgwId]}130 ]131 )132 # if len(response["TransitGatewayVpcAttachments"]) == 0:133 # return134 for attachObj in response['TransitGatewayVpcAttachments']:135 tgwAttachmentId = attachObj['TransitGatewayAttachmentId']136 tgwId = attachObj['TransitGatewayId']137 response = ec2_client.describe_transit_gateway_route_tables(138 Filters=[139 {'Name': 'transit-gateway-id', 'Values': [tgwId]},140 {'Name': 'state', 'Values': ['available']}141 ]142 )143 # iterate all tgw route tables144 for x in response['TransitGatewayRouteTables']:145 tgwRtbId = x['TransitGatewayRouteTableId']146 key = f'{initialKey}-{tgwAttachmentId}-{tgwRtbId}'147 if not key in tgwRtbInfo:148 continue149 logger.info(150 f'- Restore static route(s) for {tgwAttachmentId} into {tgwId}/{tgwRtbId}')151 for cidr in tgwRtbInfo[key]:152 try:153 logger.info(f' add route {cidr}')154 if cls.DryRun:155 continue156 response = ec2_client.create_transit_gateway_route(157 DestinationCidrBlock=cidr,158 TransitGatewayRouteTableId=tgwRtbId,159 TransitGatewayAttachmentId=tgwAttachmentId160 )161 except botocore.exceptions.ClientError as err:162 logger.error(f' **Alert** {err}')163 @classmethod164 def deleteTgwAttachementStaticRoute(cls, tgwRtbInfo, initialKey, ec2_client, vpc, tgwId):165 logger = logging.getLogger(__name__)166 # Lookup tgwId and tgwAttachmentId for a vpc167 if tgwId == None:168 response = ec2_client.describe_transit_gateway_vpc_attachments(169 Filters=[{'Name': 'vpc-id', 'Values': [vpc.id]},170 {'Name': 'state', 'Values': [171 'available']}172 ]173 )174 else:175 response = ec2_client.describe_transit_gateway_vpc_attachments(176 Filters=[177 {'Name': 'vpc-id', 'Values': [vpc.id]},178 {'Name': 'state', 'Values': ['available']},179 {'Name': 'transit-gateway-id', 'Values': [tgwId]}180 ]181 )182 # if len(response["TransitGatewayVpcAttachments"]) == 0:183 # return184 for attachObj in response['TransitGatewayVpcAttachments']:185 tgwAttachmentId = attachObj['TransitGatewayAttachmentId']186 tgwId = attachObj['TransitGatewayId']187 response = ec2_client.describe_transit_gateway_route_tables(188 Filters=[189 {'Name': 'transit-gateway-id', 'Values': [tgwId]},190 {'Name': 'state', 'Values': ['available']}191 ]192 )193 # iterate all tgw route tables194 for x in response['TransitGatewayRouteTables']:195 tgwRtbId = x['TransitGatewayRouteTableId']196 response = ec2_client.search_transit_gateway_routes(197 TransitGatewayRouteTableId=tgwRtbId,198 Filters=[199 {200 'Name': 'attachment.transit-gateway-attachment-id',201 'Values': [202 tgwAttachmentId,203 ]204 },205 {206 'Name': 'type',207 'Values': [208 'static',209 ]210 }211 ])212 if len(response['Routes']) == 0:213 continue214 logger.info(215 f'- Delete static route(s) for {tgwAttachmentId} from {tgwId}/{tgwRtbId}')216 key = f'{initialKey}-{tgwAttachmentId}-{tgwRtbId}'217 if cls.DryRun:218 continue219 cls.storeAndDeleteTgwRoute(220 tgwRtbInfo, key, ec2_client, tgwRtbId, response['Routes'])221 @classmethod222 def mgmtTgwAttachementPropagatedRoute(cls, revert, tgwRtbInfo, initialKey, ec2_client, vpc, tgwId):223 # Lookup tgwId and tgwAttachmentId for a vpc224 if tgwId == None:225 response = ec2_client.describe_transit_gateway_vpc_attachments(226 Filters=[{'Name': 'vpc-id', 'Values': [vpc.id]},227 {'Name': 'state', 'Values': [228 'available']}229 ]230 )231 else:232 response = ec2_client.describe_transit_gateway_vpc_attachments(233 Filters=[234 {'Name': 'vpc-id', 'Values': [vpc.id]},235 {'Name': 'state', 'Values': ['available']},236 {'Name': 'transit-gateway-id', 'Values': [tgwId]}237 ]238 )239 for attachObj in response['TransitGatewayVpcAttachments']:240 logger = logging.getLogger(__name__)241 if revert == False:242 tgw_attach_id = attachObj["TransitGatewayAttachmentId"]243 response = ec2_client.get_transit_gateway_attachment_propagations(244 TransitGatewayAttachmentId=tgw_attach_id)245 key = f'{initialKey}-{vpc.id}'246 if len(response['TransitGatewayAttachmentPropagations']) == 0:...

Full Screen

Full Screen

test_transit_gateway.py

Source:test_transit_gateway.py Github

copy

Full Screen

1# Copyright (c) 2018 Cloudify Platform Ltd. All rights reserved2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# Standard imports15import unittest16# Third party imports17from mock import patch, MagicMock18from cloudify.exceptions import OperationRetry19# Local imports20from cloudify_aws.common._compat import reload_module21from cloudify_aws.ec2.resources import transit_gateway as mod22from cloudify_aws.common.tests.test_base import (23 TestBase,24 mock_decorator25)26class TestEC2TransitGateway(TestBase):27 def setUp(self):28 self.transit_gateway = mod.EC2TransitGateway(29 "ctx_node",30 resource_id='test_name',31 client=True,32 logger=None)33 mock1 = patch('cloudify_aws.common.decorators.aws_resource',34 mock_decorator)35 mock2 = patch('cloudify_aws.common.decorators.wait_for_status',36 mock_decorator)37 mock1.start()38 mock2.start()39 reload_module(mod)40 def test_class_properties(self):41 effect = self.get_client_error_exception(name='EC2 Transit Gateway')42 self.transit_gateway.client = self.make_client_function(43 'describe_transit_gateways', side_effect=effect)44 res = self.transit_gateway.properties45 self.assertEqual(res, {})46 value = {}47 self.transit_gateway.client = self.make_client_function(48 'describe_transit_gateways', return_value=value)49 res = self.transit_gateway.properties50 self.assertEqual(res, value)51 value = {mod.TGS: [{mod.TG_ID: 'test_name'}]}52 self.transit_gateway.client = self.make_client_function(53 'describe_transit_gateways', return_value=value)54 res = self.transit_gateway.properties55 self.assertEqual(res[mod.TG_ID], 'test_name')56 def test_class_status(self):57 value = {58 mod.TGS: [59 {mod.TG_ID: 'test_name', 'State': None}]60 }61 self.transit_gateway.client = self.make_client_function(62 'describe_transit_gateways', return_value=value)63 res = self.transit_gateway.status64 self.assertIsNone(res)65 def test_class_status_positive(self):66 value = {67 mod.TGS: [68 {mod.TG_ID: 'test_name', 'State': 'available'}]69 }70 se = [value, value, value]71 self.transit_gateway.client = self.make_client_function(72 'describe_transit_gateways', side_effect=se)73 res = self.transit_gateway.status74 self.assertEqual(res, 'available')75 def test_class_create(self):76 value = {mod.TG: 'test'}77 self.transit_gateway.client = self.make_client_function(78 'create_transit_gateway', return_value=value)79 res = self.transit_gateway.create(value)80 self.assertEqual(res[mod.TG], value[mod.TG])81 def test_class_delete(self):82 params = {}83 self.transit_gateway.client = self.make_client_function(84 'delete_transit_gateway')85 self.transit_gateway.delete(params)86 self.assertTrue(87 self.transit_gateway.client.delete_transit_gateway.called)88 params = {mod.TG: 'transit gateway'}89 self.transit_gateway.delete(params)90 self.assertEqual(params[mod.TG], 'transit gateway')91 def test_prepare(self):92 ctx = self.get_mock_ctx(mod.TG)93 config = {mod.TG_ID: 'transit gateway'}94 mod.prepare(ctx, mod.EC2TransitGateway, config)95 self.assertEqual(ctx.instance.runtime_properties['resource_config'],96 config)97 def test_create(self):98 ctx = self.get_mock_ctx(mod.TG)99 config = {mod.TG_ID: 'transit gateway'}100 self.transit_gateway.resource_id = config[mod.TG_ID]101 iface = MagicMock()102 iface.create = self.mock_return({mod.TG: config})103 mod.create(ctx=ctx, iface=iface, resource_config=config)104 self.assertEqual(self.transit_gateway.resource_id, 'transit gateway')105 def test_delete(self):106 ctx = self.get_mock_ctx(mod.TG)107 iface = MagicMock()108 mod.delete(ctx=ctx, iface=iface, resource_config={})109 self.assertTrue(iface.delete.called)110class TestEC2TransitGatewayAttachment(TestBase):111 def setUp(self):112 self.transit_gateway_attachment = mod.EC2TransitGatewayAttachment(113 "ctx_node",114 resource_id=True,115 client=True,116 logger=None)117 mock1 = patch('cloudify_aws.common.decorators.aws_resource',118 mock_decorator)119 mock2 = patch('cloudify_aws.common.decorators.wait_for_status',120 mock_decorator)121 mock1.start()122 mock2.start()123 reload_module(mod)124 def test_class_properties(self):125 effect = self.get_client_error_exception(126 name='EC2 Transit Gateway Attachment')127 self.transit_gateway_attachment.client = self.make_client_function(128 'describe_transit_gateway_vpc_attachments', side_effect=effect)129 res = self.transit_gateway_attachment.properties130 self.assertIsNone(res)131 value = {}132 self.transit_gateway_attachment.client = self.make_client_function(133 'describe_transit_gateway_vpc_attachments', return_value=value)134 res = self.transit_gateway_attachment.properties135 self.assertIsNone(res)136 value = {137 mod.TG_ATTACHMENTS: [138 {mod.TG_ATTACHMENT_ID: 'test_name'}]139 }140 self.transit_gateway_attachment.client = self.make_client_function(141 'describe_transit_gateway_vpc_attachments', return_value=value)142 res = self.transit_gateway_attachment.properties143 self.assertEqual(res[mod.TG_ATTACHMENT_ID], 'test_name')144 def test_class_status(self):145 value = {}146 self.transit_gateway_attachment.client = self.make_client_function(147 'describe_transit_gateway_vpc_attachments', return_value=value)148 res = self.transit_gateway_attachment.status149 self.assertIsNone(res)150 value = {151 mod.TG_ATTACHMENTS: [152 {153 mod.TG_ATTACHMENT_ID: 'test_name',154 'State': 'available'155 }156 ]157 }158 self.transit_gateway_attachment.client = self.make_client_function(159 'describe_transit_gateway_vpc_attachments', return_value=value)160 res = self.transit_gateway_attachment.status161 self.assertEqual(res, 'available')162 def test_class_create(self):163 value = {mod.TG_ATTACHMENT: 'test'}164 self.transit_gateway_attachment.client = self.make_client_function(165 'create_transit_gateway_vpc_attachment', return_value=value)166 res = self.transit_gateway_attachment.create(value)167 self.assertEqual(res[mod.TG_ATTACHMENT],168 value[mod.TG_ATTACHMENT])169 def test_class_accept(self):170 value = {mod.TG_ATTACHMENT: 'test'}171 self.transit_gateway_attachment.client = self.make_client_function(172 'accept_transit_gateway_vpc_attachment', return_value=value)173 res = self.transit_gateway_attachment.accept(value)174 self.assertEqual(res[mod.TG_ATTACHMENT],175 value[mod.TG_ATTACHMENT])176 def test_class_delete(self):177 params = {}178 self.transit_gateway_attachment.client = self.make_client_function(179 'delete_transit_gateway_vpc_attachment')180 self.transit_gateway_attachment.delete(params)181 self.assertTrue(self.transit_gateway_attachment.182 client.delete_transit_gateway_vpc_attachment.called)183 params = {mod.TG_ATTACHMENT_ID: 'transit gateway'}184 self.transit_gateway_attachment.delete(params)185 self.assertEqual(params[mod.TG_ATTACHMENT_ID],186 'transit gateway')187 def test_create(self):188 source_ctx = self.get_mock_ctx(mod.TG)189 target_ctx = self.get_mock_ctx(mod.TG)190 ctx = self.get_mock_relationship_ctx(191 mod.TG_ATTACHMENT, test_source=source_ctx,192 test_target=target_ctx)193 config = {mod.TG_ATTACHMENT_ID: 'transit gateway'}194 self.transit_gateway_attachment.resource_id = \195 config[mod.TG_ATTACHMENT_ID]196 iface = MagicMock()197 iface.create = self.mock_return({mod.TG: config})198 mod.request_vpc_attachment(199 ctx=ctx,200 iface=iface,201 transit_gateway_id='transit gateway',202 vpc_id='vpc',203 subnet_ids=['subnet'])204 self.assertIn(mod.TG_ATTACHMENTS,205 source_ctx.instance.runtime_properties)206 def test_delete(self):207 source_ctx = self.get_mock_ctx(208 mod.TG,209 test_runtime_properties={'aws_resource_id': 'transit gateway'})210 ctx = self.get_mock_relationship_ctx(211 mod.TG_ATTACHMENT, test_source=source_ctx)212 iface = MagicMock(client=MagicMock())213 with self.assertRaises(OperationRetry):214 mod.delete_vpc_attachment(215 ctx=ctx,216 iface=iface,217 transit_gateway_attachment_id='transit gateway')218 self.assertTrue(iface.delete.called)219if __name__ == '__main__':...

Full Screen

Full Screen

index.py

Source:index.py Github

copy

Full Screen

...38 }39 )40 logger.info('describe_transit_gateway_vpc_attachments on {}'.format(41 TgwInspectionVpcAttachmentId))42 TgwResponse = ec2.describe_transit_gateway_vpc_attachments(43 TransitGatewayAttachmentIds=[TgwInspectionVpcAttachmentId]44 )45 ApplianceModeStatus = TgwResponse['TransitGatewayVpcAttachments'][0]['Options']['ApplianceModeSupport']46 DnsSupportStatus = TgwResponse['TransitGatewayVpcAttachments'][0]['Options']['DnsSupport']47 response['Data'] = {48 'ApplianceModeStatus': ApplianceModeStatus,49 'DnsSupportStatus': DnsSupportStatus50 }51 except Exception as e:52 logger.info(53 'ec2.modify/describe_transit_gateway_vpc_attachment: {}'.format(e))54 logger.info('Response: {}'.format(json.dumps(event)))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful