Best Python code snippet using localstack_python
test_boto_elasticsearch_domain.py
Source:test_boto_elasticsearch_domain.py  
1# -*- coding: utf-8 -*-2# Import Python libs3from __future__ import absolute_import, print_function, unicode_literals4import copy5import logging6import random7import string8# Import Salt Testing libs9from tests.support.mixins import LoaderModuleMockMixin10from tests.support.unit import skipIf, TestCase11from tests.support.mock import (12    NO_MOCK,13    NO_MOCK_REASON,14    MagicMock,15    patch16)17# Import Salt libs18from salt.ext import six19import salt.loader20from salt.utils.versions import LooseVersion21import salt.modules.boto_elasticsearch_domain as boto_elasticsearch_domain22# Import 3rd-party libs23# pylint: disable=import-error,no-name-in-module24try:25    import boto326    from botocore.exceptions import ClientError27    HAS_BOTO = True28except ImportError:29    HAS_BOTO = False30from salt.ext.six.moves import range31# pylint: enable=import-error,no-name-in-module32# the boto_elasticsearch_domain module relies on the connect_to_region() method33# which was added in boto 2.8.034# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f1235required_boto3_version = '1.2.1'36def _has_required_boto():37    '''38    Returns True/False boolean depending on if Boto is installed and correct39    version.40    '''41    if not HAS_BOTO:42        return False43    elif LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version):44        return False45    else:46        return True47if _has_required_boto():48    region = 'us-east-1'49    access_key = 'GKTADJGHEIQSXMKKRBJ08H'50    secret_key = 'askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs'51    conn_parameters = {'region': region, 'key': access_key, 'keyid': secret_key, 'profile': {}}52    error_message = 'An error occurred (101) when calling the {0} operation: Test-defined error'53    error_content = {54      'Error': {55        'Code': 101,56        'Message': "Test-defined error"57      }58    }59    not_found_error = ClientError({60        'Error': {61            'Code': 'ResourceNotFoundException',62            'Message': "Test-defined error"63        }64    }, 'msg')65    domain_ret = dict(DomainName='testdomain',66                      ElasticsearchClusterConfig={},67                      EBSOptions={},68                      AccessPolicies={},69                      SnapshotOptions={},70                      AdvancedOptions={})71log = logging.getLogger(__name__)72class BotoElasticsearchDomainTestCaseBase(TestCase, LoaderModuleMockMixin):73    conn = None74    def setup_loader_modules(self):75        self.opts = salt.config.DEFAULT_MINION_OPTS76        utils = salt.loader.utils(self.opts, whitelist=['boto3'], context={})77        return {boto_elasticsearch_domain: {'__utils__': utils}}78    def setUp(self):79        super(BotoElasticsearchDomainTestCaseBase, self).setUp()80        boto_elasticsearch_domain.__init__(self.opts)81        del self.opts82        # Set up MagicMock to replace the boto3 session83        # connections keep getting cached from prior tests, can't find the84        # correct context object to clear it. So randomize the cache key, to prevent any85        # cache hits86        conn_parameters['key'] = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(50))87        self.patcher = patch('boto3.session.Session')88        self.addCleanup(self.patcher.stop)89        self.addCleanup(delattr, self, 'patcher')90        mock_session = self.patcher.start()91        session_instance = mock_session.return_value92        self.conn = MagicMock()93        self.addCleanup(delattr, self, 'conn')94        session_instance.client.return_value = self.conn95class BotoElasticsearchDomainTestCaseMixin(object):96    pass97@skipIf(True, 'Skip these tests while investigating failures')98@skipIf(HAS_BOTO is False, 'The boto module must be installed.')99@skipIf(_has_required_boto() is False, 'The boto3 module must be greater than'100                                       ' or equal to version {0}'101        .format(required_boto3_version))102@skipIf(NO_MOCK, NO_MOCK_REASON)103class BotoElasticsearchDomainTestCase(BotoElasticsearchDomainTestCaseBase, BotoElasticsearchDomainTestCaseMixin):104    '''105    TestCase for salt.modules.boto_elasticsearch_domain module106    '''107    def test_that_when_checking_if_a_domain_exists_and_a_domain_exists_the_domain_exists_method_returns_true(self):108        '''109        Tests checking domain existence when the domain already exists110        '''111        result = boto_elasticsearch_domain.exists(DomainName='testdomain', **conn_parameters)112        self.assertTrue(result['exists'])113    def test_that_when_checking_if_a_domain_exists_and_a_domain_does_not_exist_the_domain_exists_method_returns_false(self):114        '''115        Tests checking domain existence when the domain does not exist116        '''117        self.conn.describe_elasticsearch_domain.side_effect = not_found_error118        result = boto_elasticsearch_domain.exists(DomainName='mydomain', **conn_parameters)119        self.assertFalse(result['exists'])120    def test_that_when_checking_if_a_domain_exists_and_boto3_returns_an_error_the_domain_exists_method_returns_error(self):121        '''122        Tests checking domain existence when boto returns an error123        '''124        self.conn.describe_elasticsearch_domain.side_effect = ClientError(error_content, 'list_domains')125        result = boto_elasticsearch_domain.exists(DomainName='mydomain', **conn_parameters)126        self.assertEqual(result.get('error', {}).get('message'), error_message.format('list_domains'))127    def test_that_when_checking_domain_status_and_a_domain_exists_the_domain_status_method_returns_info(self):128        '''129        Tests checking domain existence when the domain already exists130        '''131        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}132        result = boto_elasticsearch_domain.status(DomainName='testdomain', **conn_parameters)133        self.assertTrue(result['domain'])134    def test_that_when_checking_domain_status_and_boto3_returns_an_error_the_domain_status_method_returns_error(self):135        '''136        Tests checking domain existence when boto returns an error137        '''138        self.conn.describe_elasticsearch_domain.side_effect = ClientError(error_content, 'list_domains')139        result = boto_elasticsearch_domain.status(DomainName='mydomain', **conn_parameters)140        self.assertEqual(result.get('error', {}).get('message'), error_message.format('list_domains'))141    def test_that_when_describing_domain_it_returns_the_dict_of_properties_returns_true(self):142        '''143        Tests describing parameters if domain exists144        '''145        domainconfig = {}146        for k, v in six.iteritems(domain_ret):147            if k == 'DomainName':148                continue149            domainconfig[k] = {'Options': v}150        self.conn.describe_elasticsearch_domain_config.return_value = {'DomainConfig': domainconfig}151        result = boto_elasticsearch_domain.describe(DomainName=domain_ret['DomainName'], **conn_parameters)152        log.warning(result)153        desired_ret = copy.copy(domain_ret)154        desired_ret.pop('DomainName')155        self.assertEqual(result, {'domain': desired_ret})156    def test_that_when_describing_domain_on_client_error_it_returns_error(self):157        '''158        Tests describing parameters failure159        '''160        self.conn.describe_elasticsearch_domain_config.side_effect = ClientError(error_content, 'list_domains')161        result = boto_elasticsearch_domain.describe(DomainName='testdomain', **conn_parameters)162        self.assertTrue('error' in result)163    def test_that_when_creating_a_domain_succeeds_the_create_domain_method_returns_true(self):164        '''165        tests True domain created.166        '''167        self.conn.create_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}168        args = copy.copy(domain_ret)169        args.update(conn_parameters)170        result = boto_elasticsearch_domain.create(**args)171        self.assertTrue(result['created'])172    def test_that_when_creating_a_domain_fails_the_create_domain_method_returns_error(self):173        '''174        tests False domain not created.175        '''176        self.conn.create_elasticsearch_domain.side_effect = ClientError(error_content, 'create_domain')177        args = copy.copy(domain_ret)178        args.update(conn_parameters)179        result = boto_elasticsearch_domain.create(**args)180        self.assertEqual(result.get('error', {}).get('message'), error_message.format('create_domain'))181    def test_that_when_deleting_a_domain_succeeds_the_delete_domain_method_returns_true(self):182        '''183        tests True domain deleted.184        '''185        result = boto_elasticsearch_domain.delete(DomainName='testdomain',186                                                    **conn_parameters)187        self.assertTrue(result['deleted'])188    def test_that_when_deleting_a_domain_fails_the_delete_domain_method_returns_false(self):189        '''190        tests False domain not deleted.191        '''192        self.conn.delete_elasticsearch_domain.side_effect = ClientError(error_content, 'delete_domain')193        result = boto_elasticsearch_domain.delete(DomainName='testdomain',194                                                    **conn_parameters)195        self.assertFalse(result['deleted'])196    def test_that_when_updating_a_domain_succeeds_the_update_domain_method_returns_true(self):197        '''198        tests True domain updated.199        '''200        self.conn.update_elasticsearch_domain_config.return_value = {'DomainConfig': domain_ret}201        args = copy.copy(domain_ret)202        args.update(conn_parameters)203        result = boto_elasticsearch_domain.update(**args)204        self.assertTrue(result['updated'])205    def test_that_when_updating_a_domain_fails_the_update_domain_method_returns_error(self):206        '''207        tests False domain not updated.208        '''209        self.conn.update_elasticsearch_domain_config.side_effect = ClientError(error_content, 'update_domain')210        args = copy.copy(domain_ret)211        args.update(conn_parameters)212        result = boto_elasticsearch_domain.update(**args)213        self.assertEqual(result.get('error', {}).get('message'), error_message.format('update_domain'))214    def test_that_when_adding_tags_succeeds_the_add_tags_method_returns_true(self):215        '''216        tests True tags added.217        '''218        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}219        result = boto_elasticsearch_domain.add_tags(DomainName='testdomain', a='b', **conn_parameters)220        self.assertTrue(result['tagged'])221    def test_that_when_adding_tags_fails_the_add_tags_method_returns_false(self):222        '''223        tests False tags not added.224        '''225        self.conn.add_tags.side_effect = ClientError(error_content, 'add_tags')226        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}227        result = boto_elasticsearch_domain.add_tags(DomainName=domain_ret['DomainName'], a='b', **conn_parameters)228        self.assertFalse(result['tagged'])229    def test_that_when_removing_tags_succeeds_the_remove_tags_method_returns_true(self):230        '''231        tests True tags removed.232        '''233        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}234        result = boto_elasticsearch_domain.remove_tags(DomainName=domain_ret['DomainName'], TagKeys=['a'], **conn_parameters)235        self.assertTrue(result['tagged'])236    def test_that_when_removing_tags_fails_the_remove_tags_method_returns_false(self):237        '''238        tests False tags not removed.239        '''240        self.conn.remove_tags.side_effect = ClientError(error_content, 'remove_tags')241        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}242        result = boto_elasticsearch_domain.remove_tags(DomainName=domain_ret['DomainName'], TagKeys=['b'], **conn_parameters)243        self.assertFalse(result['tagged'])244    def test_that_when_listing_tags_succeeds_the_list_tags_method_returns_true(self):245        '''246        tests True tags listed.247        '''248        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}249        result = boto_elasticsearch_domain.list_tags(DomainName=domain_ret['DomainName'], **conn_parameters)250        self.assertEqual(result['tags'], {})251    def test_that_when_listing_tags_fails_the_list_tags_method_returns_false(self):252        '''253        tests False tags not listed.254        '''255        self.conn.list_tags.side_effect = ClientError(error_content, 'list_tags')256        self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}257        result = boto_elasticsearch_domain.list_tags(DomainName=domain_ret['DomainName'], **conn_parameters)...aws_es_info.py
Source:aws_es_info.py  
...94                return client.list_packages_for_domain(95                    DomainName=module.params['name'],96                ), False97        elif module.params['describe_elasticsearch_domain']:98            return client.describe_elasticsearch_domain(99                DomainName=module.params['name'],100            ), False101        elif module.params['describe_elasticsearch_domain_config']:102            return client.describe_elasticsearch_domain_config(103                DomainName=module.params['name'],104            ), False105        else:106            if client.can_paginate('list_domain_names'):107                paginator = client.get_paginator('list_domain_names')108                return paginator.paginate(), True109            else:110                return client.list_domain_names(), False111    except (BotoCoreError, ClientError) as e:112        module.fail_json_aws(e, msg='Failed to fetch Amazon ES details')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
