Best Python code snippet using localstack_python
test_vpc_peering_connection.py
Source:test_vpc_peering_connection.py  
# Copyright (c) 2014 Skytap http://skytap.com/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

from tests.unit import mock, unittest
from tests.unit import AWSMockServiceTestCase

from boto.vpc import VpcPeeringConnection, VPCConnection, Subnet


class TestDescribeVpcPeeringConnections(AWSMockServiceTestCase):
    """Parse a canned DescribeVpcPeeringConnections response with two items."""

    DESCRIBE_VPC_PEERING_CONNECTIONS = b"""<?xml version="1.0" encoding="UTF-8"?>
<DescribeVpcPeeringConnectionsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
   <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <vpcPeeringConnectionSet>
        <item>
           <vpcPeeringConnectionId>pcx-111aaa22</vpcPeeringConnectionId>
            <requesterVpcInfo>
               <ownerId>777788889999</ownerId>
                <vpcId>vpc-1a2b3c4d</vpcId>
              <cidrBlock>172.31.0.0/16</cidrBlock>
           </requesterVpcInfo>
           <accepterVpcInfo>
                <ownerId>111122223333</ownerId>
               <vpcId>vpc-aa22cc33</vpcId>
           </accepterVpcInfo>
            <status>
                <code>pending-acceptance</code>
               <message>Pending Acceptance by 111122223333</message>
            </status>
           <expirationTime>2014-02-17T16:00:50.000Z</expirationTime>
        </item>
        <item>
           <vpcPeeringConnectionId>pcx-444bbb88</vpcPeeringConnectionId>
            <requesterVpcInfo>
               <ownerId>1237897234</ownerId>
                <vpcId>vpc-2398abcd</vpcId>
              <cidrBlock>172.30.0.0/16</cidrBlock>
           </requesterVpcInfo>
           <accepterVpcInfo>
                <ownerId>98654313</ownerId>
               <vpcId>vpc-0983bcda</vpcId>
           </accepterVpcInfo>
            <status>
                <code>pending-acceptance</code>
               <message>Pending Acceptance by 98654313</message>
            </status>
           <expirationTime>2015-02-17T16:00:50.000Z</expirationTime>
        </item>
    </vpcPeeringConnectionSet>
</DescribeVpcPeeringConnectionsResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.DESCRIBE_VPC_PEERING_CONNECTIONS

    def test_get_vpc_peering_connections(self):
        """Both <item> entries are parsed into objects with the expected fields."""
        self.set_http_response(status_code=200)
        api_response = self.service_connection.get_all_vpc_peering_connections(
            ['pcx-111aaa22', 'pcx-444bbb88'],
            filters=[('status-code', ['pending-acceptance'])])

        self.assertEqual(len(api_response), 2)

        for vpc_peering_connection in api_response:
            # Result order is not relied upon: dispatch on the id, and treat
            # anything that is not the first item as the second one.
            if vpc_peering_connection.id == 'pcx-111aaa22':
                self.assertEqual(vpc_peering_connection.id, 'pcx-111aaa22')
                self.assertEqual(vpc_peering_connection.status_code, 'pending-acceptance')
                self.assertEqual(vpc_peering_connection.status_message, 'Pending Acceptance by 111122223333')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '777788889999')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-1a2b3c4d')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '172.31.0.0/16')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '111122223333')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-aa22cc33')
                self.assertEqual(vpc_peering_connection.expiration_time, '2014-02-17T16:00:50.000Z')
            else:
                self.assertEqual(vpc_peering_connection.id, 'pcx-444bbb88')
                self.assertEqual(vpc_peering_connection.status_code, 'pending-acceptance')
                self.assertEqual(vpc_peering_connection.status_message, 'Pending Acceptance by 98654313')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '1237897234')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-2398abcd')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '172.30.0.0/16')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '98654313')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-0983bcda')
                self.assertEqual(vpc_peering_connection.expiration_time, '2015-02-17T16:00:50.000Z')


class TestCreateVpcPeeringConnection(AWSMockServiceTestCase):
    """Parse a canned CreateVpcPeeringConnection response."""

    CREATE_VPC_PEERING_CONNECTION = b"""<CreateVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
  <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
  <vpcPeeringConnection>
        <vpcPeeringConnectionId>pcx-73a5401a</vpcPeeringConnectionId>
        <requesterVpcInfo>
            <ownerId>777788889999</ownerId>
            <vpcId>vpc-1a2b3c4d</vpcId>
            <cidrBlock>10.0.0.0/28</cidrBlock>
        </requesterVpcInfo>
        <accepterVpcInfo>
            <ownerId>123456789012</ownerId>
            <vpcId>vpc-a1b2c3d4</vpcId>
        </accepterVpcInfo>
        <status>
            <code>initiating-request</code>
            <message>Initiating Request to 123456789012</message>
        </status>
        <expirationTime>2014-02-18T14:37:25.000Z</expirationTime>
        <tagSet/>
    </vpcPeeringConnection>
</CreateVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.CREATE_VPC_PEERING_CONNECTION

    def test_create_vpc_peering_connection(self):
        """The returned object carries the ids, status and VPC info from the XML."""
        self.set_http_response(status_code=200)
        vpc_peering_connection = self.service_connection.create_vpc_peering_connection(
            'vpc-1a2b3c4d', 'vpc-a1b2c3d4', '123456789012')

        self.assertEqual(vpc_peering_connection.id, 'pcx-73a5401a')
        self.assertEqual(vpc_peering_connection.status_code, 'initiating-request')
        self.assertEqual(vpc_peering_connection.status_message, 'Initiating Request to 123456789012')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '777788889999')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-1a2b3c4d')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '10.0.0.0/28')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '123456789012')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-a1b2c3d4')
        self.assertEqual(vpc_peering_connection.expiration_time, '2014-02-18T14:37:25.000Z')


class TestDeleteVpcPeeringConnection(AWSMockServiceTestCase):
    """Parse a canned DeleteVpcPeeringConnection response."""

    DELETE_VPC_PEERING_CONNECTION = b"""<DeleteVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
  <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
  <return>true</return>
</DeleteVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.DELETE_VPC_PEERING_CONNECTION

    def test_delete_vpc_peering_connection(self):
        """A <return>true</return> response is surfaced as True."""
        self.set_http_response(status_code=200)
        # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
        self.assertEqual(self.service_connection.delete_vpc_peering_connection('pcx-12345678'), True)


class TestDeleteVpcPeeringConnectionShortForm(unittest.TestCase):
    """Exercise delete() called on a peering-connection object itself."""

    DESCRIBE_VPC_PEERING_CONNECTIONS = b"""<?xml version="1.0" encoding="UTF-8"?>
<DescribeVpcPeeringConnectionsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
   <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
   <vpcPeeringConnectionSet>
      <item>
         <vpcPeeringConnectionId>pcx-111aaa22</vpcPeeringConnectionId>
         <requesterVpcInfo>
            <ownerId>777788889999</ownerId>
            <vpcId>vpc-1a2b3c4d</vpcId>
            <cidrBlock>172.31.0.0/16</cidrBlock>
         </requesterVpcInfo>
         <accepterVpcInfo>
            <ownerId>111122223333</ownerId>
            <vpcId>vpc-aa22cc33</vpcId>
         </accepterVpcInfo>
         <status>
            <code>pending-acceptance</code>
            <message>Pending Acceptance by 111122223333</message>
         </status>
         <expirationTime>2014-02-17T16:00:50.000Z</expirationTime>
      </item>
   </vpcPeeringConnectionSet>
</DescribeVpcPeeringConnectionsResponse>"""

    DELETE_VPC_PEERING_CONNECTION = b"""<DeleteVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
  <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
  <return>true</return>
</DeleteVpcPeeringConnectionResponse>"""

    def test_delete_vpc_peering_connection(self):
        """delete() on a fetched object issues DeleteVpcPeeringConnection."""
        vpc_conn = VPCConnection(aws_access_key_id='aws_access_key_id',
                                 aws_secret_access_key='aws_secret_access_key')

        # First request: list the connections to obtain an object to delete.
        mock_response = mock.Mock()
        mock_response.read.return_value = self.DESCRIBE_VPC_PEERING_CONNECTIONS
        mock_response.status = 200
        vpc_conn.make_request = mock.Mock(return_value=mock_response)
        vpc_peering_connections = vpc_conn.get_all_vpc_peering_connections()
        self.assertEqual(1, len(vpc_peering_connections))
        vpc_peering_connection = vpc_peering_connections[0]

        # Second request: swap in a fresh mock so call_args_list only records
        # the request made by delete().
        mock_response = mock.Mock()
        mock_response.read.return_value = self.DELETE_VPC_PEERING_CONNECTION
        mock_response.status = 200
        vpc_conn.make_request = mock.Mock(return_value=mock_response)
        self.assertEqual(True, vpc_peering_connection.delete())

        # call_args_list[0][0] is the tuple of positional arguments, so these
        # are exact element matches (not substring checks): the action must be
        # 'DeleteVpcPeeringConnection' and must not be 'DeleteVpc'.
        self.assertIn('DeleteVpcPeeringConnection', vpc_conn.make_request.call_args_list[0][0])
        self.assertNotIn('DeleteVpc', vpc_conn.make_request.call_args_list[0][0])


class TestRejectVpcPeeringConnection(AWSMockServiceTestCase):
    """Parse a canned RejectVpcPeeringConnection response."""

    REJECT_VPC_PEERING_CONNECTION = b"""<RejectVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
  <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
  <return>true</return>
</RejectVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.REJECT_VPC_PEERING_CONNECTION

    def test_reject_vpc_peering_connection(self):
        """A <return>true</return> response is surfaced as True."""
        self.set_http_response(status_code=200)
        self.assertEqual(self.service_connection.reject_vpc_peering_connection('pcx-12345678'), True)


class TestAcceptVpcPeeringConnection(AWSMockServiceTestCase):
    """Parse a canned AcceptVpcPeeringConnection response."""

    ACCEPT_VPC_PEERING_CONNECTION = b"""<AcceptVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
  <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
  <vpcPeeringConnection>
        <vpcPeeringConnectionId>pcx-1a2b3c4d</vpcPeeringConnectionId>
       <requesterVpcInfo>
            <ownerId>123456789012</ownerId>
            <vpcId>vpc-1a2b3c4d</vpcId>
            <cidrBlock>10.0.0.0/28</cidrBlock>
        </requesterVpcInfo>
        <accepterVpcInfo>
            <ownerId>777788889999</ownerId>
            <vpcId>vpc-111aaa22</vpcId>
            <cidrBlock>10.0.1.0/28</cidrBlock>
        </accepterVpcInfo>
        <status>
            <code>active</code>
            <message>Active</message>
        </status>
        <tagSet/>
    </vpcPeeringConnection>
</AcceptVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.ACCEPT_VPC_PEERING_CONNECTION

    def test_accept_vpc_peering_connection(self):
        """The returned object carries the ids, status and VPC info from the XML."""
        self.set_http_response(status_code=200)
        vpc_peering_connection = self.service_connection.accept_vpc_peering_connection('pcx-1a2b3c4d')

        self.assertEqual(vpc_peering_connection.id, 'pcx-1a2b3c4d')
        self.assertEqual(vpc_peering_connection.status_code, 'active')
        self.assertEqual(vpc_peering_connection.status_message, 'Active')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '123456789012')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-1a2b3c4d')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '10.0.0.0/28')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '777788889999')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-111aaa22')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.cidr_block, '10.0.1.0/28')


if __name__ == '__main__':
    ...  # body elided in the source listing
Source:ec2_vpc_peer_describe.py  
try:
    ...  # guarded import(s) elided in the source listing
    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False


def _peering_filter(name, value):
    """Build one single-valued entry for the ``Filters`` request parameter."""
    return {'Name': name, 'Values': [value]}


def _describe_peerings(vpc, filters):
    """Run one describe call and return its ``VpcPeeringConnections`` list."""
    result = vpc.describe_vpc_peering_connections(Filters=filters)
    return result['VpcPeeringConnections']


def get_peer_connection(vpc, status, vpc_id, peer_vpc_id):
    """Return the single peering connection between two VPCs, or ``None``.

    The pair is first looked up with ``vpc_id`` as requester and
    ``peer_vpc_id`` as accepter; only if that yields nothing is the reverse
    direction tried.

    :param vpc: client object exposing ``describe_vpc_peering_connections``.
    :param status: value matched against the ``status-code`` filter.
    :param vpc_id: id of one VPC of the pair.
    :param peer_vpc_id: id of the other VPC of the pair.
    :returns: the matching connection dict, or ``None`` when the lookup
        yields zero or more than one connection.
    """
    connections = _describe_peerings(vpc, [
        _peering_filter('requester-vpc-info.vpc-id', vpc_id),
        _peering_filter('accepter-vpc-info.vpc-id', peer_vpc_id),
        _peering_filter('status-code', status),
    ])
    if not connections:
        # Nothing in this direction: the peering may have been requested
        # from the other side.
        connections = _describe_peerings(vpc, [
            _peering_filter('requester-vpc-info.vpc-id', peer_vpc_id),
            _peering_filter('accepter-vpc-info.vpc-id', vpc_id),
            _peering_filter('status-code', status),
        ])
    return connections[0] if len(connections) == 1 else None


def list_peer_connections(vpc, status, vpc_id):
    """Return every peering connection of ``vpc_id`` with the given status.

    Both directions are always queried.  Previously the accepter side was
    consulted only when the requester side came back empty, so a VPC that
    was requester of one peering and accepter of another got a partial list.

    :param vpc: client object exposing ``describe_vpc_peering_connections``.
    :param status: value matched against the ``status-code`` filter.
    :param vpc_id: id of the VPC whose peerings are listed.
    :returns: list of connection dicts, requester-side matches first.
    """
    as_requester = _describe_peerings(vpc, [
        _peering_filter('requester-vpc-info.vpc-id', vpc_id),
        _peering_filter('status-code', status),
    ])
    as_accepter = _describe_peerings(vpc, [
        _peering_filter('accepter-vpc-info.vpc-id', vpc_id),
        _peering_filter('status-code', status),
    ])

    # Guard against the same connection being reported by both queries.
    merged = []
    seen_ids = set()
    for connection in as_requester + as_accepter:
        connection_id = connection.get('VpcPeeringConnectionId')
        if connection_id is not None and connection_id in seen_ids:
            continue
        seen_ids.add(connection_id)
        merged.append(connection)
    return merged


def main():
    """Module entry point (only the beginning is present in the listing)."""
    argument_spec = ec2_argument_spec()
    argument_spec.update(dict(
        vpc_id=dict(),
        peer_vpc_id=dict(),
        status=dict(
            choices=['pending-acceptance', 'failed', 'expired', 'provisioning',
                     'active', 'deleted', 'rejected'],
            default='active',
        ),
    ))
    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
    if not HAS_BOTO3:
        ...  # remainder of main() elided in the source listing
Source:peering.py  
...27    i_am_plural = 'VPC peering connections'28    def __init__(self, accounts=None, debug=False):29        super(Peering, self).__init__(accounts=accounts, debug=debug)30    @record_exception()31    def describe_vpc_peering_connections(self, **kwargs):32        from security_monkey.common.sts_connect import connect33        conn = connect(kwargs['account_name'], 'boto3.ec2.client', region=kwargs['region'],34                       assumed_role=kwargs['assumed_role'])35        peering_info = self.wrap_aws_rate_limited_call(36            conn.describe_vpc_peering_connections)37        return peering_info38    def slurp(self):39        """40        :returns: item_list - list of vpc peerings.41        :returns: exception_map - A dict where the keys are a tuple containing the42            location of the exception and the value is the actual exception43        """44        self.prep_for_slurp()45        @iter_account_region(index=self.index, accounts=self.accounts, service_name='ec2')46        def slurp_items(**kwargs):47            item_list = []48            exception_map = {}49            kwargs['exception_map'] = exception_map50            app.logger.debug("Checking {}/{}/{}".format(self.index,51                                                        kwargs['account_name'], kwargs['region']))52            peering_info = self.describe_vpc_peering_connections(**kwargs)53            if peering_info:54                all_peerings = peering_info.get('VpcPeeringConnections', [])55                app.logger.debug("Found {} {}".format(56                    len(all_peerings), self.i_am_plural))57                for peering in all_peerings:58                    connection_id = peering['VpcPeeringConnectionId']59                    tags = peering.get('Tags')60                    peering_name = None61                    if tags:62                        peering_name = tags[0].get('Value', None)63                    if not (peering_name is None):64                        
peering_name = "{0} ({1})".format(65                            peering_name.encode('utf-8', 'ignore'), connection_id)66                    else:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
