Best Python code snippet using molecule_python
test_hashi_vault_connection_options.py
Source:test_hashi_vault_connection_options.py  
...35    def _cb(retry_action):36        pass37    return _cb38@pytest.fixture39def connection_options(adapter, retry_callback_generator):40    return HashiVaultConnectionOptions(adapter, retry_callback_generator)41class TestHashiVaultConnectionOptions(object):42    def test_connection_options_is_option_group(self, connection_options):43        assert issubclass(type(connection_options), HashiVaultOptionGroupBase)44    # _boolean_or_cacert tests45    # this method is the intersection of the validate_certs and ca_cert parameter46    # along with the VAULT_SKIP_VERIFY environment variable (see the function defintion).47    # The result is either a boolean, or a string, to be passed to the hvac client's48    # verify parameter.49    @pytest.mark.parametrize(50        'optpatch,envpatch,expected',51        [52            ({}, {}, True),53            ({}, {'VAULT_SKIP_VERIFY': 'true'}, False),54            ({}, {'VAULT_SKIP_VERIFY': 'false'}, True),55            ({}, {'VAULT_SKIP_VERIFY': 'invalid'}, True),56            ({'validate_certs': True}, {}, True),57            ({'validate_certs': True}, {'VAULT_SKIP_VERIFY': 'false'}, True),58            ({'validate_certs': True}, {'VAULT_SKIP_VERIFY': 'true'}, True),59            ({'validate_certs': True}, {'VAULT_SKIP_VERIFY': 'invalid'}, True),60            ({'validate_certs': False}, {}, False),61            ({'validate_certs': False}, {'VAULT_SKIP_VERIFY': 'false'}, False),62            ({'validate_certs': False}, {'VAULT_SKIP_VERIFY': 'true'}, False),63            ({'validate_certs': False}, {'VAULT_SKIP_VERIFY': 'invalid'}, False),64            ({'ca_cert': '/tmp/fake'}, {}, '/tmp/fake'),65            ({'ca_cert': '/tmp/fake'}, {'VAULT_SKIP_VERIFY': 'true'}, False),66            ({'ca_cert': '/tmp/fake'}, {'VAULT_SKIP_VERIFY': 'false'}, '/tmp/fake'),67            ({'ca_cert': '/tmp/fake'}, {'VAULT_SKIP_VERIFY': 'invalid'}, '/tmp/fake'),68            ({'ca_cert': '/tmp/fake', 'validate_certs': True}, {}, '/tmp/fake'),69  
          ({'ca_cert': '/tmp/fake', 'validate_certs': True}, {'VAULT_SKIP_VERIFY': 'false'}, '/tmp/fake'),70            ({'ca_cert': '/tmp/fake', 'validate_certs': True}, {'VAULT_SKIP_VERIFY': 'true'}, '/tmp/fake'),71            ({'ca_cert': '/tmp/fake', 'validate_certs': True}, {'VAULT_SKIP_VERIFY': 'invalid'}, '/tmp/fake'),72            ({'ca_cert': '/tmp/fake', 'validate_certs': False}, {}, False),73            ({'ca_cert': '/tmp/fake', 'validate_certs': False}, {'VAULT_SKIP_VERIFY': 'false'}, False),74            ({'ca_cert': '/tmp/fake', 'validate_certs': False}, {'VAULT_SKIP_VERIFY': 'true'}, False),75            ({'ca_cert': '/tmp/fake', 'validate_certs': False}, {'VAULT_SKIP_VERIFY': 'invalid'}, False),76        ]77    )78    def test_boolean_or_cacert(self, connection_options, predefined_options, adapter, optpatch, envpatch, expected):79        adapter.set_options(**optpatch)80        with mock.patch.dict(os.environ, envpatch):81            connection_options._boolean_or_cacert()82        assert predefined_options['ca_cert'] == expected83    # _process_option_proxies84    # proxies can be specified as a dictionary where key is protocol/scheme85    # and value is the proxy address. 
A dictionary can also be supplied as a string86    # representation of a dictionary in JSON format.87    # If a string is supplied that cannot be interpreted as a JSON dictionary, then it88    # is assumed to be a proxy address, and will be used as proxy for both the89    # http and https protocols.90    @pytest.mark.parametrize(91        'optproxies,expected',92        [93            (None, None),94            ('socks://thecat', {'http': 'socks://thecat', 'https': 'socks://thecat'}),95            ('{"http": "gopher://it"}', {'http': 'gopher://it'}),96            ({'https': "smtp://mail.aol.com"}, {'https': "smtp://mail.aol.com"}),97            ({'protoa': 'proxya', 'protob': 'proxyb', 'protoc': 'proxyc'}, {'protoa': 'proxya', 'protob': 'proxyb', 'protoc': 'proxyc'}),98            ('{"protoa": "proxya", "protob": "proxyb", "protoc": "proxyc"}', {'protoa': 'proxya', 'protob': 'proxyb', 'protoc': 'proxyc'}),99            ('{"protoa":"proxya","protob":"proxyb","protoc":"proxyc"}', {'protoa': 'proxya', 'protob': 'proxyb', 'protoc': 'proxyc'}),100        ]101    )102    def test_process_option_proxies(self, connection_options, predefined_options, adapter, optproxies, expected):103        adapter.set_option('proxies', optproxies)104        connection_options._process_option_proxies()105        assert predefined_options['proxies'] == expected106    # _process_option_retries107    # can be specified as a positive int or a dict108    # (or any string that can be interpreted as one of those)109    @pytest.mark.parametrize('opt_retries', ['plz retry', ('1', '1'), [True], -1, 1.0])110    def test_process_option_retries_invalid(self, connection_options, predefined_options, adapter, opt_retries):111        adapter.set_option('retries', opt_retries)112        with pytest.raises((TypeError, ValueError)):113            connection_options._process_option_retries()114    @pytest.mark.parametrize('opt_retries', [None, 0, '0'])115    def test_process_option_retries_none_result(self, 
connection_options, predefined_options, adapter, opt_retries):116        adapter.set_option('retries', opt_retries)117        connection_options._process_option_retries()118        assert predefined_options['retries'] is None119    @pytest.mark.parametrize('opt_retries', [1, '1', 10, '30'])120    def test_process_option_retries_from_number(self, connection_options, predefined_options, adapter, opt_retries):121        expected = connection_options._RETRIES_DEFAULT_PARAMS.copy()122        expected['total'] = int(float(opt_retries))123        adapter.set_option('retries', opt_retries)124        connection_options._process_option_retries()125        assert predefined_options['retries'] == expected126    @pytest.mark.parametrize(127        'opt_retries,expected',128        [129            ({}, {}),130            ('{}', {}),131            ({'total': 5}, {'total': 5}),132            ('{"total": 9}', {'total': 9}),133        ]134    )135    def test_process_option_retries_from_dict(self, connection_options, predefined_options, adapter, opt_retries, expected):136        adapter.set_option('retries', opt_retries)137        connection_options._process_option_retries()138        assert predefined_options['retries'] == expected139    # process_connection_options140    # this is the public function of the class meant to ensure all option processing is complete141    def test_process_connection_options(self, mocker, connection_options, adapter):142        # mock the internal methods we expect to be called143        f_process_late_binding_env_vars = mocker.patch.object(connection_options, 'process_late_binding_env_vars')144        f_boolean_or_cacert = mocker.patch.object(connection_options, '_boolean_or_cacert')145        f_process_option_proxies = mocker.patch.object(connection_options, '_process_option_proxies')146        f_process_option_retries = mocker.patch.object(connection_options, '_process_option_retries')147        # mock the adapter itself, so we can spy on adapter 
interactions148        # since we're mocking out the methods we expect to call, we shouldn't see any149        mock_adapter = mock.create_autospec(adapter)150        connection_options._options = mock_adapter151        connection_options.process_connection_options()152        # assert the expected methods have been called once153        f_process_late_binding_env_vars.assert_called_once()154        f_boolean_or_cacert.assert_called_once()155        f_process_option_proxies.assert_called_once()156        f_process_option_retries.assert_called_once()157        # aseert that the adapter had no interactions (because we mocked out everything we knew about)158        # the intention here is to catch a situation where process_connection_options has been modified159        # to do some new behavior, without modifying this test.160        assert mock_adapter.method_calls == [], 'Unexpected adapter interaction: %r' % mock_adapter.method_calls161    # get_hvac_connection_options162    # gets the dict of params to pass to the hvac Client constructor163    # based on the connection options we have in Ansible164    @pytest.mark.parametrize('opt_ca_cert', [None, '/tmp/fake'])165    @pytest.mark.parametrize('opt_validate_certs', [None, True, False])166    @pytest.mark.parametrize('opt_namespace', [None, 'namepsace1'])167    @pytest.mark.parametrize('opt_timeout', [None, 30])168    @pytest.mark.parametrize('opt_retries', [None, 0, 2, {'total': 3}, '{"total": 3}'])169    @pytest.mark.parametrize('opt_retry_action', ['ignore', 'warn'])170    @pytest.mark.parametrize('opt_proxies', [171        None, 'socks://noshow', '{"https": "https://prox", "http": "http://other"}', {'http': 'socks://one', 'https': 'socks://two'}172    ])173    def test_get_hvac_connection_options(174        self, connection_options, predefined_options, adapter,175        opt_ca_cert, opt_validate_certs, opt_proxies, opt_namespace, opt_timeout, opt_retries, opt_retry_action,176    ):177        option_set = {178     
       'ca_cert': opt_ca_cert,179            'validate_certs': opt_validate_certs,180            'proxies': opt_proxies,181            'namespace': opt_namespace,182            'timeout': opt_timeout,183            'retries': opt_retries,184            'retry_action': opt_retry_action,185        }186        adapter.set_options(**option_set)187        connection_options.process_connection_options()188        opts = connection_options.get_hvac_connection_options()189        # these two will get swallowed up to become 'verify'190        assert 'validate_certs' not in opts191        assert 'ca_cert' not in opts192        # retry_action is used/removed in the configuration of retries (session)193        assert 'retry_action' not in opts194        # retries will become session195        assert 'retries' not in opts196        # these should always be returned197        assert 'url' in opts and opts['url'] == predefined_options['url']198        assert 'verify' in opts and opts['verify'] == predefined_options['ca_cert']199        # these are optional200        assert 'proxies' not in opts or opts['proxies'] == predefined_options['proxies']201        assert 'namespace' not in opts or opts['namespace'] == predefined_options['namespace']202        assert 'timeout' not in opts or opts['timeout'] == predefined_options['timeout']203        assert 'session' not in opts or isinstance(opts['session'], Session)204    @mock.patch('ansible_collections.community.hashi_vault.plugins.module_utils._connection_options.HAS_RETRIES', new=False)205    def test_get_hvac_connection_options_retry_not_available(self, connection_options, adapter):206        adapter.set_option('retries', 2)207        connection_options.process_connection_options()208        with pytest.raises(NotImplementedError):209            connection_options.get_hvac_connection_options()210    def test_url_is_required(self, connection_options, adapter):211        adapter.set_option('url', None)212        with 
pytest.raises(HashiVaultValueError, match=r'Required option url was not set'):...gerrit.py
Source:gerrit.py  
# -*- coding: utf-8 -*-
#
# Copyright (C) Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json
import logging
import threading
import time

import paramiko
from paramiko.client import WarningPolicy

import settings

LOG = logging.getLogger()


def _settings_connection_options():
    """Build the SSH connection options dict from the ``settings`` module."""
    return {
        'port': settings.GERRIT_PORT,
        'username': settings.GERRIT_USERNAME,
        'hostname': settings.GERRIT_HOSTNAME,
        'key_filename': settings.GERRIT_SSH_KEY_FILENAME,
    }


def _open_gerrit_client(connection_options):
    """
    Return a connected paramiko SSH client.

    ``connection_options`` is a dict with the keys ``hostname``,
    ``username``, ``port`` and ``key_filename``. The client is closed
    before the exception is re-raised if the connection cannot be set up.
    """
    client = paramiko.SSHClient()
    try:
        client.load_system_host_keys()
        client.set_missing_host_key_policy(WarningPolicy())
        # NOTE(review): paramiko documents ``timeout`` in seconds, so 5000
        # is roughly 83 minutes -- confirm this value is intended.
        client.connect(hostname=connection_options['hostname'],
                       username=connection_options['username'],
                       port=connection_options['port'],
                       key_filename=connection_options['key_filename'],
                       look_for_keys=True,
                       timeout=5000)
        client.get_transport().set_keepalive(60)
    except Exception:
        client.close()
        raise
    return client


def _run_gerrit_command(command):
    """
    Run one gerrit SSH command and wait for it to finish.

    Errors are logged, never raised, so callers keep the original
    best-effort behaviour.
    """
    client = None
    try:
        client = _open_gerrit_client(_settings_connection_options())
        _, stdout, _ = client.exec_command(command)
        # exec_command() returns as soon as the command is started; wait for
        # its exit status so the connection is not closed underneath it.
        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            LOG.error('gerrit command exited with status %s: %s',
                      exit_status, command)
    except Exception as e:
        LOG.exception('gerrit error: %s', e)
    finally:
        if client is not None:
            client.close()


class GerritEventsStream(threading.Thread):
    """
    This thread class is responsible to receive the Gerrit streams
    events in the background. Every message is pushed to a message queue that
    can be read by the main thread.
    """

    def __init__(self, event_queue, connection_options):
        """
        The event queue used to push the messages.
        """
        super(GerritEventsStream, self).__init__()
        self._event_queue = event_queue
        self._connection_options = connection_options
        self._running = True

    def run(self):
        """
        Read 'comment-added' events until stop() is called.

        Events whose project is 'dci-rhel-agent' are pushed to the event
        queue. On any error, or when the remote command ends, the
        connection is closed and re-opened after a short pause.
        """
        while self._running:
            LOG.debug('%s: running %s', self.name, self._running)
            client = None
            try:
                client = _open_gerrit_client(self._connection_options)
                _, stdout, _ = client.exec_command('gerrit stream-events -s comment-added')  # noqa
                channel = stdout.channel
                while self._running:
                    LOG.debug('%s: checking incoming data', self.name)
                    # check if there is some data in the underlying paramiko
                    # channel, this for the thread to not sleep on IO.
                    if channel.recv_ready():
                        event = stdout.readline()
                        json_event = json.loads(event)
                        if 'project' in json_event and json_event['project'] == 'dci-rhel-agent':  # noqa
                            self._event_queue.put(json_event)
                        # more data may already be waiting: only sleep
                        # when the channel is idle.
                        continue
                    if channel.exit_status_ready():
                        # the remote command is over, no data will ever
                        # arrive on this channel again: reconnect.
                        LOG.warning('%s: event stream closed, reconnecting',
                                    self.name)
                        break
                    if self._running:
                        time.sleep(1)
                LOG.debug('%s: stop running', self.name)
            except Exception as e:
                LOG.exception('gerrit error: %s', e)
            finally:
                if client is not None:
                    client.close()
            if self._running:
                time.sleep(2)
        LOG.info('%s: terminated', self.name)

    def stop(self):
        """
        Stop the running thread.
        """
        self._running = False


def vote_on_review(review_number, patchset_version, vote, job_id):
    """
    Vote on a review given the review number, the patchset version and
    the vote status in (-1, 0, +1). This uses the Verified label.

    The review message links to the job page built from ``job_id``; a vote
    of 1 is reported as a success, any other value as a failure.
    """
    job_url = "https://www.distributed-ci.io/jobs/%s" % job_id
    outcome = 'success' if vote == 1 else 'failure'
    _run_gerrit_command(
        'gerrit review --message "dci-third-party %s ! %s" --verified %s %s,%s'
        % (outcome, job_url, vote, review_number, patchset_version))


def comment(review_number, patchset_version, comment):
    """
    Comment a review.

    NOTE(review): the listing this function was taken from is cut off right
    after its ``finally:`` clause; the cleanup is assumed to close the SSH
    client exactly like vote_on_review does -- confirm against the full file.
    """
    _run_gerrit_command(
        'gerrit review --message "%s" %s,%s'
        % (comment, review_number, patchset_version))
Source:models.py  
import pprint

import MySQLdb
from django.conf import settings
from django.db import models

from utils import models as utils_models, mysql_functions


class Environment(utils_models.TimeStampedModel):
    """Environment"""
    name = models.CharField(max_length=255, default='')

    class Meta:
        db_table = 'environments'

    def __unicode__(self):
        """Returns unicode representation of the object."""
        return self.name


class Server(utils_models.TimeStampedModel):
    """Server"""
    name = models.CharField(
        max_length=255, unique=True, default='')
    hostname = models.CharField(max_length=255, default='')
    environment = models.ForeignKey(
        Environment, null=True, blank=True, default=None,
        on_delete=models.SET_NULL)
    port = models.IntegerField(null=True, blank=True, default=None)

    class Meta:
        db_table = 'servers'

    def __unicode__(self):
        """Returns unicode representation of the object."""
        parts = []
        parts.append(self.name)
        parts.append(' [%s' % self.hostname)
        if self.port is None:
            parts.append(']')
        else:
            parts.append(':%s]' % self.port)
        if self.environment:
            parts.append(', environment=%s' % self.environment)
        return ''.join(parts)

    def _mysql_connection_options(self, connection_options=None):
        """
        Return the keyword arguments used to reach this server.

        When ``connection_options`` is None the credentials come from
        ``settings.MYSQL_USER`` / ``settings.MYSQL_PASSWORD``. The caller's
        dict is copied, never modified; ``host`` is always set from the
        server hostname and ``port`` only when the server has one.
        """
        if connection_options is None:
            connection_options = {
                'user': settings.MYSQL_USER,
                'passwd': settings.MYSQL_PASSWORD
            }
        options = connection_options.copy()
        options['host'] = self.hostname
        if self.port:
            options['port'] = self.port
        return options

    def schema_exists(self, schema_name, connection_options=None):
        """Returns True if schema_name is listed by get_schema_list()."""
        return schema_name in self.get_schema_list(connection_options)

    def get_schema_list(self, connection_options=None):
        """
        Returns the names of the databases on this server, leaving out
        'information_schema' and 'mysql'.
        """
        conn = MySQLdb.connect(
            **self._mysql_connection_options(connection_options))
        # An explicit cursor is used and the connection is always closed:
        # the previous `with conn as cur:` form never called close().
        # NOTE(review): what the connection context manager yields differs
        # between MySQLdb/mysqlclient releases -- verify for the pinned one.
        try:
            cur = conn.cursor()
            try:
                cur.execute('SHOW DATABASES')
                rows = cur.fetchall()
            finally:
                cur.close()
        finally:
            conn.close()
        return [row[0] for row in rows
                if row[0] not in ('information_schema', 'mysql')]

    def dump_schema(self, schema_name, connection_options=None):
        """
        Returns the result of mysql_functions.dump_schema() for schema_name,
        called with this server's connection options.
        """
        return mysql_functions.dump_schema(
            schema_name, **self._mysql_connection_options(connection_options))


class ServerData(utils_models.TimeStampedModel):
    # NOTE(review): the listing ends with '...' after Meta, so this class
    # may have more members than shown here.
    server = models.ForeignKey(Server)
    database_schema = models.ForeignKey('schemaversions.DatabaseSchema')
    schema_exists = models.BooleanField(default=False)
    schema_version = models.ForeignKey(
        'schemaversions.SchemaVersion', null=True, blank=True, default=None,
        on_delete=models.SET_NULL)
    schema_version_diff = models.TextField(blank=True, default='')

    class Meta:
        db_table = 'server_data'

# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
