How to use credentials_provider method in tempest

Best Python code snippet using tempest_python

auth.py

Source:auth.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
import datetime
import hmac
import logging
from hashlib import sha256
from urllib.parse import quote

from .consts import DATE_FORMAT, UNSIGNED_PAYLOAD
from .credential import FederationCredentials, StaticCredentials
from .utils import to_bytes

logger = logging.getLogger(__name__)


def _canonical_query_string_params(params):
    """Return the canonical query string for *params*.

    Keys are emitted in sorted order as ``key=value`` pairs joined by ``&``;
    both parts are percent-encoded, leaving only ``-_.~`` (plus alphanumerics)
    unescaped. Values are converted with ``str()`` first.
    """
    return '&'.join(
        '%s=%s' % (quote(name, safe='-_.~'), quote(str(params[name]), safe='-_.~'))
        for name in sorted(params)
    )


def _signed_headers(headers):
    """Return the lowercased header names, sorted and joined by ``;``."""
    ordered = sorted(headers.items(), key=lambda item: item[0].lower())
    return ';'.join(name.lower() for name, _ in ordered)


def _canonical_headers(headers):
    """Return the canonical header block: one ``name:value\\n`` line per value.

    Headers are ordered by lowercased name. A list-valued header produces one
    line per element, with the elements sorted. Header names are always
    lowercased so that this block agrees with ``_signed_headers``.
    """
    lines = []
    for name, value in sorted(headers.items(), key=lambda item: item[0].lower()):
        lowered = name.lower()
        if isinstance(value, list):
            # Lowercase the name and stringify each element here as well;
            # otherwise list-valued headers would be canonicalized differently
            # from scalar ones.
            lines.extend(lowered + ':' + str(item) + '\n' for item in sorted(value))
        else:
            lines.append(lowered + ':' + str(value) + '\n')
    return ''.join(lines)


def _canonical_request(req):
    """Build the newline-joined canonical request string for *req*.

    The parts are, in order: upper-cased method, percent-encoded path
    (``/`` and ``~`` kept), canonical query string, canonical headers, signed
    header names, and the payload hash taken from the
    ``x-tos-content-sha256`` header, or ``UNSIGNED_PAYLOAD`` when that header
    is missing or empty.
    """
    payload_hash = req.headers.get('x-tos-content-sha256') or UNSIGNED_PAYLOAD
    parts = [
        req.method.upper(),
        quote(req.path, safe='/~'),
        _canonical_query_string_params(req.params),
        _canonical_headers(req.headers),
        _signed_headers(req.headers),
        payload_hash,
    ]
    return '\n'.join(parts)


def _param_to_quoted_query(k, v):
    """Render one query parameter for the final URL.

    Returns ``key=value`` with both parts fully percent-encoded, or just the
    encoded key when the value is ``None`` or an empty string. Falsy but
    meaningful values such as ``0`` keep their ``=value`` part, because the
    canonical query string signs ``str(value)`` for every parameter.
    """
    if v is None or v == '':
        # str() keeps non-string keys from raising inside quote().
        return quote(str(k), '/~')
    return quote(str(k), '') + '=' + quote(str(v), '')


def _sign(key, msg, hex=False):
    """Return the HMAC-SHA256 of *msg* (UTF-8 encoded) under *key*.

    The result is a hex string when *hex* is true, otherwise raw bytes.
    """
    mac = hmac.new(key, msg.encode('utf-8'), sha256)
    return mac.hexdigest() if hex else mac.digest()


class AuthBase():
    """Signs requests with the ``TOS4-HMAC-SHA256`` scheme.

    Credentials are fetched from ``credentials_provider`` on every signing
    call and cached on ``self.credential`` for the helper methods.
    """

    def __init__(self, credentials_provider, region):
        self.credentials_provider = credentials_provider
        self.region = region.strip()
        self.credential = None

    def sign_request(self, req):
        """Sign *req* in place by setting its date and Authorization headers."""
        self.credential = self.credentials_provider.get_credentials()
        if self.credential.get_security_token():
            req.headers["x-tos-security-token"] = self.credential.get_security_token()
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # switching to an aware datetime is only safe once DATE_FORMAT
        # (defined outside this file) is confirmed to contain no %z/%Z.
        date = datetime.datetime.utcnow().strftime(DATE_FORMAT)
        req.headers['Date'] = date
        req.headers['x-tos-date'] = date
        signature = self._make_signature(req, date)
        req.headers['Authorization'] = self._inject_signature_to_request(req, signature, date)

    def sign_url(self, req, expires):
        """Return a pre-signed URL for *req*.

        *expires* defaults to 3600 when ``None``. The signing parameters are
        written into ``req.params`` before the signature is computed, and the
        signature itself is added last.
        """
        if expires is None:
            expires = 60 * 60
        date = datetime.datetime.utcnow().strftime(DATE_FORMAT)
        self.credential = self.credentials_provider.get_credentials()
        req.params['X-Tos-Algorithm'] = 'TOS4-HMAC-SHA256'
        req.params['X-Tos-Credential'] = self._credential(date)
        req.params['X-Tos-Date'] = date
        req.params['X-Tos-Expires'] = expires
        req.params['X-Tos-SignedHeaders'] = _signed_headers(req.headers)
        if self.credential.get_security_token():
            req.params["X-Tos-Security-Token"] = self.credential.get_security_token()
        req.params['X-Tos-Signature'] = self._make_signature(req, date)
        query = '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())
        return req.url + '?' + query

    def _make_signature(self, req, date):
        """Compute the hex signature for *req*, logging each stage at DEBUG."""
        canonical_request = _canonical_request(req)
        logger.debug("pre-request: canonical_request:\n%s", canonical_request)
        string_to_sign = self._string_to_sign(canonical_request, date)
        logger.debug("pre-request: string_to_sign:\n%s", string_to_sign)
        signature = self._signature(string_to_sign, date)
        logger.debug("pre-request: signature:\n%s", signature)
        return signature

    def _inject_signature_to_request(self, req, signature, date):
        """Return the Authorization header value carrying *signature*."""
        results = [
            'TOS4-HMAC-SHA256 Credential=%s' % self._credential(date),
            'SignedHeaders=%s' % _signed_headers(req.headers),
            'Signature=%s' % signature,
        ]
        return ', '.join(results)

    def _string_to_sign(self, canonical_request, date):
        """Return the newline-joined string to sign for *canonical_request*."""
        sts = [
            'TOS4-HMAC-SHA256',
            date,
            self._credential_scope(date),
            sha256(canonical_request.encode('utf-8')).hexdigest(),
        ]
        return '\n'.join(sts)

    def _credential(self, date):
        """Return ``<access key id>/<date[0:8]>/<region>/tos/request``."""
        return "{0}/{1}/{2}/tos/request".format(
            self.credential.get_access_key_id(), date[0:8], self.region)

    def _credential_scope(self, date):
        """Return ``<date[0:8]>/<region>/tos/request``."""
        return "{0}/{1}/tos/request".format(date[0:8], self.region)

    def _signature(self, string_to_sign, date):
        """Derive the signing key and return the hex signature.

        The key is chained through HMAC-SHA256: secret -> date[0:8] ->
        region -> 'tos' -> 'request'.
        """
        k_date = _sign(to_bytes(self.credential.get_access_key_secret()), date[0:8])
        k_region = _sign(k_date, self.region)
        k_service = _sign(k_region, 'tos')
        k_signing = _sign(k_service, 'request')
        return _sign(k_signing, string_to_sign, hex=True)

    def copy(self):
        """Return ``(access_key_id, access_key_secret, security_token, region)``.

        Returns ``None`` when the provider is not a ``StaticCredentials``.
        """
        if not isinstance(self.credentials_provider, StaticCredentials):
            return None
        provider = self.credentials_provider.credentials
        return provider.access_key_id, provider.access_key_secret, provider.security_token, self.region


class Auth(AuthBase):
    """AuthBase backed by a fixed access key, secret and optional STS token."""

    def __init__(self, access_key_id, access_key_secret, region, sts=None):
        super(Auth, self).__init__(StaticCredentials(access_key_id, access_key_secret, sts), region)


class FederationAuth(AuthBase):
    def __init__(self, credentials_provider: FederationCredentials, region: str):
        # The excerpt is cut off here; the body of this method is not shown.
        ...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
#
# mete0r.mailer : mete0r's mailer
# Copyright (C) 2014 mete0r <mete0r@sarangbang.or.kr>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import base64


XOAUTH2_SCOPE = 'https://mail.google.com/'


class XOAuth2(object):
    """Authenticates a connection with the XOAUTH2 mechanism.

    ``credentials_provider`` must expose ``get_credentials()`` returning an
    ``(email, access_token)`` pair.
    """

    def __init__(self, credentials_provider):
        self.credentials_provider = credentials_provider

    @classmethod
    def from_settings(cls, settings, prefix):
        """Build an instance from the provider configured in *settings*."""
        credentials_provider = credentials_from_settings(settings, prefix)
        return cls(credentials_provider=credentials_provider)

    def authenticate(self, connection):
        """Send ``AUTH XOAUTH2 <base64 payload>`` via ``connection.docmd``.

        Returns whatever ``connection.docmd`` returns.
        """
        email, access_token = self.credentials_provider.get_credentials()
        payload = make_xoauth2_string(email, access_token)
        # b64encode needs bytes and returns bytes; encode text first and
        # decode the result so it can be concatenated to the command string
        # (passing a text str raises TypeError on Python 3).
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        encoded = base64.b64encode(payload).decode('ascii')
        return connection.docmd('AUTH', 'XOAUTH2 ' + encoded)


def make_xoauth2_string(username, access_token):
    """Return the unencoded XOAUTH2 string for the user and bearer token."""
    return 'user=%s\x01auth=Bearer %s\x01\x01' % (username, access_token)


def credentials_from_settings(settings, prefix):
    """Return the credentials provider selected by ``settings[prefix + 'credentials']``.

    Provider-specific settings are read under the ``<prefix>credentials.``
    prefix. Provider modules are imported lazily, only for the chosen kind.
    """
    credentials_provider = settings.get(prefix + 'credentials')
    prefix += 'credentials.'
    if credentials_provider == 'offline':
        from .offline import Offline
        return Offline.from_settings(settings, prefix)
    elif credentials_provider == 'goauthc':
        from .goauthc import GOAuthc
        return GOAuthc.from_settings(settings, prefix)
    # The excerpt is cut off here; any further branches are not shown.

Full Screen

Full Screen

test_credentials.py

Source:test_credentials.py Github

copy

Full Screen

# ... (earlier lines of this excerpt are not shown)
@pytest.fixture(name='mock_reader')
def _mock_reader():
    """Provide a ``MagicMock`` named 'reader', exposed as the ``mock_reader`` fixture."""
    return MagicMock(name='reader')


@pytest.fixture(name='credentials_provider')
def _credentials_provider(mock_reader, monkeypatch):
    """Provide a ``CredentialsProvider`` for ``JiraEnvironment.Dev``.

    The LOCAL_JIRA_* variables are set to empty strings and the ENV_JIRA_*
    variables to test values before the provider is constructed.
    """
    monkeypatch.setenv('LOCAL_JIRA_USER', '')
    monkeypatch.setenv('LOCAL_JIRA_PASS', '')
    monkeypatch.setenv('ENV_JIRA_USER', 'test_user')
    monkeypatch.setenv('ENV_JIRA_PASS', 'test_password')
    provider = CredentialsProvider(JiraEnvironment.Dev)
    # _read_file is replaced on the class only after the provider is built.
    monkeypatch.setattr(CredentialsProvider, '_read_file', mock_reader)
    return provider


def test_load_local_dev_none(credentials_provider):
    """With empty LOCAL_JIRA_* variables, ``_load_local_dev()`` returns None."""
    # pylint: disable=protected-access
    assert credentials_provider._load_local_dev() is None


def test_load_local_dev_read(monkeypatch, credentials_provider, mock_reader):
    # Point the LOCAL_JIRA_* variables at paths for this test.
    monkeypatch.setenv('LOCAL_JIRA_USER', '/path/to/user')
    monkeypatch.setenv('LOCAL_JIRA_PASS', '/path/to/pass')
    # pylint: disable=protected-access
    # ... (the excerpt is cut off here; the rest of this test is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful