How to use _fill_credentials method in tempest

Best Python code snippet using tempest_python

auth.py

Source:auth.py Github

copy

Full Screen

...52 @abc.abstractmethod53 def _get_auth(self):54 return55 @abc.abstractmethod56 def _fill_credentials(self, auth_data_body):57 return58 def fill_credentials(self):59 """60 Fill credentials object with data from auth61 """62 auth_data = self.get_auth()63 self._fill_credentials(auth_data[1])64 return self.credentials65 @classmethod66 def check_credentials(cls, credentials):67 """68 Verify credentials are valid.69 """70 return isinstance(credentials, Credentials) and credentials.is_valid()71 @property72 def auth_data(self):73 return self.get_auth()74 @auth_data.deleter75 def auth_data(self):76 self.clear_auth()77 def get_auth(self):78 """79 Returns auth from cache if available, else auth first80 """81 if self.cache is None or self.is_expired(self.cache):82 self.set_auth()83 return self.cache84 def set_auth(self):85 """86 Forces setting auth, ignores cache if it exists.87 Refills credentials88 """89 self.cache = self._get_auth()90 self._fill_credentials(self.cache[1])91 def clear_auth(self):92 """93 Can be called to clear the access cache so that next request94 will fetch a new token and base_url.95 """96 self.cache = None97 self.credentials.reset()98 @abc.abstractmethod99 def is_expired(self, auth_data):100 return101 def auth_request(self, method, url, headers=None, body=None, filters=None):102 """103 Obtains auth data and decorates a request with that.104 :param method: HTTP method of the request105 :param url: relative URL of the request (path)106 :param headers: HTTP headers of the request107 :param body: HTTP body in case of POST / PUT108 :param filters: select a base URL out of the catalog109 :returns a Tuple (url, headers, body)110 """111 orig_req = dict(url=url, headers=headers, body=body)112 auth_url, auth_headers, auth_body = self._decorate_request(113 filters, method, url, headers, body)114 auth_req = dict(url=auth_url, headers=auth_headers, body=auth_body)115 # Overwrite part if the request if it has been requested116 if self.alt_part is not None:117 if self.alt_auth_data is not None:118 alt_url, alt_headers, alt_body = self._decorate_request(119 filters, method, url, headers, body,120 auth_data=self.alt_auth_data)121 alt_auth_req = dict(url=alt_url, headers=alt_headers,122 body=alt_body)123 auth_req[self.alt_part] = alt_auth_req[self.alt_part]124 else:125 # If alt auth data is None, skip auth in the requested part126 auth_req[self.alt_part] = orig_req[self.alt_part]127 # Next auth request will be normal, unless otherwise requested128 self.reset_alt_auth_data()129 return auth_req['url'], auth_req['headers'], auth_req['body']130 def reset_alt_auth_data(self):131 """132 Configure auth provider to provide valid authentication data133 """134 self.alt_part = None135 self.alt_auth_data = None136 def set_alt_auth_data(self, request_part, auth_data):137 """138 Configure auth provider to provide alt authentication data139 on a part of the *next* auth_request. If credentials are None,140 set invalid data.141 :param request_part: request part to contain invalid auth: url,142 headers, body143 :param auth_data: alternative auth_data from which to get the144 invalid data to be injected145 """146 self.alt_part = request_part147 self.alt_auth_data = auth_data148 @abc.abstractmethod149 def base_url(self, filters, auth_data=None):150 """151 Extracts the base_url based on provided filters152 """153 return154class KeystoneAuthProvider(AuthProvider):155 token_expiry_threshold = datetime.timedelta(seconds=60)156 def __init__(self, credentials, auth_url,157 disable_ssl_certificate_validation=None,158 ca_certs=None, trace_requests=None):159 super(KeystoneAuthProvider, self).__init__(credentials)160 self.dsvm = disable_ssl_certificate_validation161 self.ca_certs = ca_certs162 self.trace_requests = trace_requests163 self.auth_client = self._auth_client(auth_url)164 def _decorate_request(self, filters, method, url, headers=None, body=None,165 auth_data=None):166 if auth_data is None:167 auth_data = self.auth_data168 token, _ = auth_data169 base_url = self.base_url(filters=filters, auth_data=auth_data)170 # build authenticated request171 # returns new request, it does not touch the original values172 _headers = copy.deepcopy(headers) if headers is not None else {}173 _headers['X-Auth-Token'] = str(token)174 if url is None or url == "":175 _url = base_url176 else:177 # Join base URL and url, and remove multiple contiguous slashes178 _url = "/".join([base_url, url])179 parts = [x for x in urlparse.urlparse(_url)]180 parts[2] = re.sub("/{2,}", "/", parts[2])181 _url = urlparse.urlunparse(parts)182 # no change to method or body183 return str(_url), _headers, body184 @abc.abstractmethod185 def _auth_client(self):186 return187 @abc.abstractmethod188 def _auth_params(self):189 return190 def _get_auth(self):191 # Bypasses the cache192 auth_func = getattr(self.auth_client, 'get_token')193 auth_params = self._auth_params()194 # returns token, auth_data195 token, auth_data = auth_func(**auth_params)196 return token, auth_data197 def get_token(self):198 return self.auth_data[0]199class KeystoneV2AuthProvider(KeystoneAuthProvider):200 EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'201 def _auth_client(self, auth_url):202 return json_v2id.TokenClientJSON(203 auth_url, disable_ssl_certificate_validation=self.dsvm,204 ca_certs=self.ca_certs, trace_requests=self.trace_requests)205 def _auth_params(self):206 return dict(207 user=self.credentials.username,208 password=self.credentials.password,209 tenant=self.credentials.tenant_name,210 auth_data=True)211 def _fill_credentials(self, auth_data_body):212 tenant = auth_data_body['token']['tenant']213 user = auth_data_body['user']214 if self.credentials.tenant_name is None:215 self.credentials.tenant_name = tenant['name']216 if self.credentials.tenant_id is None:217 self.credentials.tenant_id = tenant['id']218 if self.credentials.username is None:219 self.credentials.username = user['name']220 if self.credentials.user_id is None:221 self.credentials.user_id = user['id']222 def base_url(self, filters, auth_data=None):223 """224 Filters can be:225 - service: compute, image, etc226 - region: the service region227 - endpoint_type: adminURL, publicURL, internalURL228 - api_version: replace catalog version with this229 - skip_path: take just the base URL230 """231 if auth_data is None:232 auth_data = self.auth_data233 token, _auth_data = auth_data234 service = filters.get('service')235 region = filters.get('region')236 endpoint_type = filters.get('endpoint_type', 'publicURL')237 if service is None:238 raise exceptions.EndpointNotFound("No service provided")239 _base_url = None240 for ep in _auth_data['serviceCatalog']:241 if ep["type"] == service:242 for _ep in ep['endpoints']:243 if region is not None and _ep['region'] == region:244 _base_url = _ep.get(endpoint_type)245 if not _base_url:246 # No region matching, use the first247 _base_url = ep['endpoints'][0].get(endpoint_type)248 break249 if _base_url is None:250 raise exceptions.EndpointNotFound(service)251 parts = urlparse.urlparse(_base_url)252 if filters.get('api_version', None) is not None:253 path = "/" + filters['api_version']254 noversion_path = "/".join(parts.path.split("/")[2:])255 if noversion_path != "":256 path += "/" + noversion_path257 _base_url = _base_url.replace(parts.path, path)258 if filters.get('skip_path', None) is not None and parts.path != '':259 _base_url = _base_url.replace(parts.path, "/")260 return _base_url261 def is_expired(self, auth_data):262 _, access = auth_data263 expiry = datetime.datetime.strptime(access['token']['expires'],264 self.EXPIRY_DATE_FORMAT)265 return expiry - self.token_expiry_threshold <= \266 datetime.datetime.utcnow()267class KeystoneV3AuthProvider(KeystoneAuthProvider):268 EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'269 def _auth_client(self, auth_url):270 return json_v3id.V3TokenClientJSON(271 auth_url, disable_ssl_certificate_validation=self.dsvm,272 ca_certs=self.ca_certs, trace_requests=self.trace_requests)273 def _auth_params(self):274 return dict(275 user_id=self.credentials.user_id,276 username=self.credentials.username,277 password=self.credentials.password,278 project_id=self.credentials.project_id,279 project_name=self.credentials.project_name,280 user_domain_id=self.credentials.user_domain_id,281 user_domain_name=self.credentials.user_domain_name,282 project_domain_id=self.credentials.project_domain_id,283 project_domain_name=self.credentials.project_domain_name,284 domain_id=self.credentials.domain_id,285 domain_name=self.credentials.domain_name,286 auth_data=True)287 def _fill_credentials(self, auth_data_body):288 # project or domain, depending on the scope289 project = auth_data_body.get('project', None)290 domain = auth_data_body.get('domain', None)291 # user is always there292 user = auth_data_body['user']293 # Set project fields294 if project is not None:295 if self.credentials.project_name is None:296 self.credentials.project_name = project['name']297 if self.credentials.project_id is None:298 self.credentials.project_id = project['id']299 if self.credentials.project_domain_id is None:300 self.credentials.project_domain_id = project['domain']['id']301 if self.credentials.project_domain_name is None:...

Full Screen

Full Screen

web.py

Source:web.py Github

copy

Full Screen

...28 logging.info(29 "Page loading stopped directly, " "cause of load time is out-of-date"30 )31 return driver32 def _fill_credentials(self, driver):33 logger.info("Filling up credentials inputs")34 login_input = driver.find_element_by_id("txtLogin")35 password_input = driver.find_element_by_id("txtPassword")36 login_input.send_keys(KASPI_USERNAME)37 password_input.send_keys(KASPI_PASSWORD)38 return driver39 def _confirm_code(self, driver):40 logger.info("Starting confirm code operations")41 login_button = driver.find_element_by_class_name("entrance__loginButton")42 login_button.click()43 time.sleep(20)44 code = get_confirmation_code()45 code_chars = [num for num in code]46 driver.find_element_by_id("txtOtpChar1").send_keys(code_chars[0])47 driver.find_element_by_id("txtOtpChar2").send_keys(code_chars[1])48 driver.find_element_by_id("txtOtpChar3").send_keys(code_chars[2])49 driver.find_element_by_id("txtOtpChar4").send_keys(code_chars[3])50 return driver51 def login(self):52 delay = 1053 logger.info("Opening the chrome")54 driver = webdriver.Chrome()55 driver.set_page_load_timeout(40)56 driver = self._get_login_page(driver=driver, delay=delay)57 driver = self._fill_credentials(driver=driver)58 driver = self._confirm_code(driver=driver)59 return driver60 def logout(self, driver, delay):61 logger.info("Log outing from system")62 driver.find_element_by_id("headerAuth").click()63 WebDriverWait(driver, delay).until(64 ec.presence_of_element_located(65 (By.CLASS_NAME, "headerSettings__item--logOut")66 )67 )68 driver.find_element_by_class_name("headerSettings__item--logOut").click()69class KaspiTransactions:70 def __init__(71 self, auth_service: KaspiAuth, links: Optional[Type[KaspiLinks]] = None...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful