Best Python code snippet using localstack_python
test_amazon_driver.py
Source:test_amazon_driver.py  
1""" AmazonDriver for Compute2    based on BaseDriver for Compute Resource3"""4import mock5from botocore.exceptions import ClientError6from calplus.tests import base7from calplus.v1.compute.drivers.amazon import AmazonDriver8fake_config_driver = {9    'driver_name': 'AMAZON1',10    'aws_access_key_id': 'fake_id',11    'aws_secret_access_key': 'fake_key',12    'region_name': 'us-east-1',13    'endpoint_url': 'http://localhost:8788',14    'limit': {15    }16}17fake_error_code = {18    'ResponseMetadata': {19        'HTTPStatusCode': 400,20        'RequestId': 'req-a4570d1a-8319-4d6f-8077-7044d72ef449',21        'HTTPHeaders': {22            'date': 'Sat, 24 Sep 2016 03:09:39 GMT',23            'content-length': '250',24            'content-type': 'text/xml'25        }26    },27    'Error': {28        'Message': "The subnet ID 'fake_id' does not exist",29        'Code': 'InvalidSubnetID.NotFound'30    }31}32fake_describe_return = {33    'Reservations': [34        {35            'Groups': [],36            'Instances': [37                {38                    'AmiLaunchIndex': 0,39                    'ImageId': 'ami-2624df94',40                    'InstanceId': 'i-72827591',41                    'InstanceType': 'm1.ec2api-alt',42                    'KernelId': 'aki-512bee3f',43                    'KeyName': '',44                    'LaunchTime': 'fake_time',45                    'NetworkInterfaces': [46                        {47                            'Attachment': {48                                'AttachTime': 'fake_time',49                                'AttachmentId': 'eni-attach-ce1477aa',50                                'DeleteOnTermination': True,51                                'DeviceIndex': 0,52                                'Status': 'attached'53                            },54                            'Description': '',55                            'Groups': [56                                {57                                    'GroupId': 
'sg-9ed2e7ec',58                                    'GroupName': 'default'59                                }60                            ],61                            'MacAddress': 'fa:16:3e:27:af:a2',62                            'NetworkInterfaceId': 'eni-ce1477aa',63                            'OwnerId': '3b91bb4e974a4729b3596f8cebb9b559',64                            'PrivateIpAddress': '10.10.10.76',65                            'PrivateIpAddresses': [66                                {67                                    'Primary': True,68                                    'PrivateIpAddress': '10.10.10.76'69                                }70                            ],71                            'SourceDestCheck': True,72                            'Status': 'in-use',73                            'SubnetId': 'subnet-b3a91954',74                            'VpcId': 'vpc-9ed2e7ec'75                        }76                    ],77                    'Placement': {78                        'AvailabilityZone': ''79                    },80                    'PrivateDnsName': 'r-atxu9l73-0',81                    'PrivateIpAddress': '10.10.10.76',82                    'PublicDnsName': '',83                    'RamdiskId': 'ari-b7e05ed6',84                    'RootDeviceName': '/dev/vda',85                    'RootDeviceType': 'instance-store',86                    'SecurityGroups': [87                        {88                            'GroupId': 'sg-9ed2e7ec',89                            'GroupName': 'default'90                        }91                    ],92                    'SourceDestCheck': True,93                    'State': {94                        'Code': 0,95                        'Name': 'error'96                    },97                    'SubnetId': 'subnet-b3a91954',98                    'VpcId': 'vpc-9ed2e7ec'99                }100            ],101            'OwnerId': '3b91bb4e974a4729b3596f8cebb9b559',102            
'ReservationId': 'r-atxu9l73'103        }104    ],105    'ResponseMetadata': {106        'HTTPHeaders': {107            'content-length': '2971',108            'content-type': 'text/xml',109            'date': 'Wed, 07 Dec 2016 16:17:11 GMT'110        },111        'HTTPStatusCode': 200,112        'RequestId': 'req-646be08c-9104-4855-a0d9-62013ba9566d'113    }114}115class FakeInstance(object):116    """In fact, this class is boto3.resources.factory.ec2.Instance117    """118    def __init__(self):119        super(FakeInstance, self).__init__()120    def terminate(self):121        pass122    def stop(self):123        pass124    def start(self):125        pass126    def reboot(self):127        pass128class AmazonDriverTest(base.TestCase):129    """docstring for AmazonDriverTest"""130    def setUp(self):131        super(AmazonDriverTest, self).setUp()132        self.fake_driver = AmazonDriver(fake_config_driver)133    def test_create_successfully(self):134        self.mock_object(135            self.fake_driver.resource, 'create_instances',136            mock.Mock(return_value=mock.Mock))137        self.fake_driver.create(138            'fake_image_id',139            'fake_flavor_id',140            'fake_net_id',141            'fake_name'142        )143        self.fake_driver.resource.create_instances.\144            assert_called_once_with(145                ImageId='fake_image_id',146                MinCount=1,147                MaxCount=1,148                InstanceType='fake_flavor_id',149                SubnetId='fake_net_id',150                IamInstanceProfile={151                    'Arn': '',152                    'Name': 'fake_name'153                }154            )155    def test_create_without_instance_name(self):156        self.mock_object(157            self.fake_driver.resource, 'create_instances',158            mock.Mock(return_value=mock.Mock))159        self.fake_driver.create(160            'fake_image_id',161            'fake_flavor_id',162       
     'fake_net_id'163        )164        self.fake_driver.resource.create_instances. \165            assert_called_once_with(166                ImageId='fake_image_id',167                MinCount=1,168                MaxCount=1,169                InstanceType='fake_flavor_id',170                SubnetId='fake_net_id',171                IamInstanceProfile={172                    'Arn': '',173                    'Name': mock.ANY174                }175            )176    def test_create_unable_to_create_instance(self):177        self.mock_object(178            self.fake_driver.resource, 'create_instances',179            mock.Mock(side_effect=ClientError(180                fake_error_code,181                'operation_name'182            ))183        )184        self.assertRaises(ClientError, self.fake_driver.create,185                'fake_image_id',186                'fake_flavor_id',187                'fake_net_id',188                'fake_name')189        self.fake_driver.resource.create_instances. \190            assert_called_once_with(191                ImageId='fake_image_id',192                MinCount=1,193                MaxCount=1,194                InstanceType='fake_flavor_id',195                SubnetId='fake_net_id',196                IamInstanceProfile={197                    'Arn': '',198                    'Name': mock.ANY199                }200            )201    def test_show_successfully(self):202        self.mock_object(203            self.fake_driver.client, 'describe_instances',204            mock.Mock(return_value=fake_describe_return))205        self.fake_driver.show('fake_id')206        self.fake_driver.client.describe_instances. 
\207            assert_called_once_with(InstanceIds=['fake_id'])208    def test_show_unable_to_show(self):209        self.mock_object(210            self.fake_driver.client, 'describe_instances',211            mock.Mock(side_effect=ClientError(212                fake_error_code,213                'operation_name'214            ))215        )216        self.assertRaises(ClientError,217                          self.fake_driver.show, 'fake_id')218        self.fake_driver.client.describe_instances. \219            assert_called_once_with(InstanceIds=['fake_id'])220    def test_list_successfully(self):221        self.mock_object(222            self.fake_driver.client, 'describe_instances',223            mock.Mock(return_value=fake_describe_return))224        self.fake_driver.list()225        self.fake_driver.client.describe_instances. \226            assert_called_once_with()227    def test_list_unable_to_list(self):228        self.mock_object(229            self.fake_driver.client, 'describe_instances',230            mock.Mock(side_effect=ClientError(231                fake_error_code,232                'operation_name'233            ))234        )235        self.assertRaises(ClientError,236                          self.fake_driver.list)237        self.fake_driver.client.describe_instances. 
\238            assert_called_once_with()239    def test_delete_successfully(self):240        self.mock_object(241            self.fake_driver.resource, 'Instance',242            mock.Mock(return_value=FakeInstance()))243        self.fake_driver.delete('fake_id')244    def test_delete_unable_to_list(self):245        self.mock_object(246            self.fake_driver.resource, 'Instance',247            mock.Mock(side_effect=ClientError(248                fake_error_code,249                'operation_name'250            ))251        )252        self.assertRaises(ClientError,253                          self.fake_driver.delete, 'fake_id')254    def test_shutdown_successfully(self):255        self.mock_object(256            self.fake_driver.resource, 'Instance',257            mock.Mock(return_value=FakeInstance()))258        self.fake_driver.shutdown('fake_id')259    def test_shutdown_unable_to_list(self):260        self.mock_object(261            self.fake_driver.resource, 'Instance',262            mock.Mock(side_effect=ClientError(263                fake_error_code,264                'operation_name'265            ))266        )267        self.assertRaises(ClientError,268                          self.fake_driver.shutdown, 'fake_id')269    def test_start_successfully(self):270        self.mock_object(271            self.fake_driver.resource, 'Instance',272            mock.Mock(return_value=FakeInstance()))273        self.fake_driver.start('fake_id')274    def test_start_unable_to_list(self):275        self.mock_object(276            self.fake_driver.resource, 'Instance',277            mock.Mock(side_effect=ClientError(278                fake_error_code,279                'operation_name'280            ))281        )282        self.assertRaises(ClientError,283                          self.fake_driver.start, 'fake_id')284    def test_reboot_successfully(self):285        self.mock_object(286            self.fake_driver.resource, 'Instance',287            
mock.Mock(return_value=FakeInstance()))288        self.fake_driver.reboot('fake_id')289    def test_reboot_unable_to_list(self):290        self.mock_object(291            self.fake_driver.resource, 'Instance',292            mock.Mock(side_effect=ClientError(293                fake_error_code,294                'operation_name'295            ))296        )297        self.assertRaises(ClientError,298                          self.fake_driver.reboot, 'fake_id')299    def test_add_nic_successfully(self):300        self.mock_object(301            self.fake_driver.client, 'attach_network_interface',302            mock.Mock(return_value=mock.Mock))303        self.fake_driver.add_nic('fake_id', 'fake_net_id')304        self.fake_driver.client.attach_network_interface. \305            assert_called_once_with('fake_id', 'fake_net_id', 1)306    def test_add_nic_unable_to_add(self):307        self.mock_object(308            self.fake_driver.client, 'attach_network_interface',309            mock.Mock(side_effect=ClientError(310                fake_error_code,311                'operation_name'312            )))313        self.assertRaises(ClientError,314                          self.fake_driver.add_nic, 'fake_id', 'fake_net_id')315        self.fake_driver.client.attach_network_interface. \316            assert_called_once_with('fake_id', 'fake_net_id', 1)317    def test_delete_nic_successfully(self):318        self.mock_object(319            self.fake_driver.client, 'detach_network_interface',320            mock.Mock(return_value=mock.Mock))321        self.fake_driver.delete_nic('fake_id', 'fake_attachment_id')322        self.fake_driver.client.detach_network_interface. 
\323            assert_called_once_with('fake_attachment_id')324    def test_delete_nic_unable_to_delete(self):325        self.mock_object(326            self.fake_driver.client, 'detach_network_interface',327            mock.Mock(side_effect=ClientError(328                fake_error_code,329                'operation_name'330            )))331        self.assertRaises(ClientError,332            self.fake_driver.delete_nic, 'fake_id', 'fake_attachment_id')333        self.fake_driver.client.detach_network_interface. \334            assert_called_once_with('fake_attachment_id')335    def test_list_nic_successfully(self):336        self.mock_object(337            self.fake_driver.client, 'describe_instances',338            mock.Mock(return_value=fake_describe_return))339        self.fake_driver.list_nic('fake_id')340        self.fake_driver.client.describe_instances. \341            assert_called_once_with(InstanceIds=['fake_id'])342    def test_list_nic_unable_to_list(self):343        self.mock_object(344            self.fake_driver.client, 'describe_instances',345            mock.Mock(side_effect=ClientError(346                fake_error_code,347                'operation_name'348            )))349        self.assertRaises(ClientError,350                          self.fake_driver.list_nic, 'fake_id')351        self.fake_driver.client.describe_instances. \352            assert_called_once_with(InstanceIds=['fake_id'])353    def test_list_ip_successfully(self):354        self.mock_object(355            self.fake_driver.client, 'describe_instances',356            mock.Mock(return_value=fake_describe_return))357        self.fake_driver.list_ip('fake_id')358        self.fake_driver.client.describe_instances. 
\359            assert_called_once_with(InstanceIds=['fake_id'])360    def test_list_ip_unable_to_delete(self):361        self.mock_object(362            self.fake_driver.client, 'describe_instances',363            mock.Mock(side_effect=ClientError(364                fake_error_code,365                'operation_name'366            )))367        self.assertRaises(ClientError,368                          self.fake_driver.list_ip, 'fake_id')369        self.fake_driver.client.describe_instances. \...aws.py
Source:aws.py  
...49        status = get_network_interface_state(network_interface)50        return status == 'attached'51    except KeyError:52        return False53def detach_network_interface(network_interface):54    def get_attachment_id(boto_client, interface):55        response = boto_client.describe_network_interfaces(56            NetworkInterfaceIds=[57                interface58            ]59        )60        return response['NetworkInterfaces'][0]['Attachment']['AttachmentId']61    client = boto3.client('ec2')62    client.detach_network_interface(63        AttachmentId=get_attachment_id(client, network_interface)64    )65def ensure_network_interface_is_detached(network_interface):66    try:67        while get_network_interface_state(network_interface) != 'detached':68            time.sleep(1)69    except KeyError:70        pass71def attach_network_interface(network_interface, instance_id):72    client = boto3.client('ec2')73    for _ in xrange(10):74        try:75            client.attach_network_interface(76                NetworkInterfaceId=network_interface,77                InstanceId=instance_id,78                DeviceIndex=DEVICE_INDEX79            )80        except ClientError:81            time.sleep(1)82def configure_local_interface(local_interface, ip, netmask):83    cmd = [84        'ifconfig',85        local_interface,86        'inet',87        ip,88        'netmask',89        netmask90    ]91    for _ in xrange(10):92        os.environ['PATH'] = "%s:/sbin" % os.environ['PATH']93        env = os.environ94        proc = Popen(cmd, env=env)95        proc.communicate()96        if proc.returncode:97            time.sleep(1)98        else:99            return100def aws_notify_master(cfg):101    """The function moves network interface to local instance and brings it up.102    Steps:103    - Detach network interface if attached to anywhere.104    - Attach the network interface to the local instance.105    - Configure IP address on this instance106    :param 
cfg: config object107    """108    try:109        os.environ['AWS_ACCESS_KEY_ID'] = cfg.get('aws', 'aws_access_key_id')110        os.environ['AWS_SECRET_ACCESS_KEY'] = cfg.get('aws',111                                                      'aws_secret_access_key')112        os.environ['AWS_DEFAULT_REGION'] = cfg.get('aws', 'aws_default_region')113    except NoOptionError:114        LOG.error('aws_access_key_id, aws_secret_access_key and '115                  'aws_default_region must be defined in '116                  'aws section of the config file.')117        exit(-1)118    instance_id = get_my_instance_id()119    try:120        ip = cfg.get('proxysql', 'virtual_ip')121        netmask = cfg.get('proxysql', 'virtual_netmask')122        network_interface = get_network_interface(ip)123        if network_interface_attached(network_interface):124            detach_network_interface(network_interface)125        local_interface = "eth%d" % DEVICE_INDEX126        ensure_local_interface_is_gone(local_interface)127        ensure_network_interface_is_detached(network_interface)128        attach_network_interface(network_interface, instance_id)129        configure_local_interface(local_interface, ip, netmask)130    except NoOptionError as err:131        LOG.error('virtual_ip and virtual_netmask must be defined in '132                  'proxysql section of the config file.')...test_aws.py
Source:test_aws.py  
...25def test__network_interface_attached_if_side_effect_is_key_error(mocker):26    mock_get_interface_state = mocker.patch('proxysql_tools.aws.aws.get_network_interface_state')27    mock_get_interface_state.side_effect = KeyError28    assert not network_interface_attached('1.1.1.1')29def test__detach_network_interface(mocker):30    mock_boto3 = mocker.patch('proxysql_tools.aws.aws.boto3')31    mock_ec2 = MagicMock()32    mock_boto3.client.return_value = mock_ec233    detach_network_interface('stub')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
