Best Python code snippet using tempest_python
auth.py
Source:auth.py  
...52    @abc.abstractmethod53    def _get_auth(self):54        return55    @abc.abstractmethod56    def _fill_credentials(self, auth_data_body):57        return58    def fill_credentials(self):59        """60        Fill credentials object with data from auth61        """62        auth_data = self.get_auth()63        self._fill_credentials(auth_data[1])64        return self.credentials65    @classmethod66    def check_credentials(cls, credentials):67        """68        Verify credentials are valid.69        """70        return isinstance(credentials, Credentials) and credentials.is_valid()71    @property72    def auth_data(self):73        return self.get_auth()74    @auth_data.deleter75    def auth_data(self):76        self.clear_auth()77    def get_auth(self):78        """79        Returns auth from cache if available, else auth first80        """81        if self.cache is None or self.is_expired(self.cache):82            self.set_auth()83        return self.cache84    def set_auth(self):85        """86        Forces setting auth, ignores cache if it exists.87        Refills credentials88        """89        self.cache = self._get_auth()90        self._fill_credentials(self.cache[1])91    def clear_auth(self):92        """93        Can be called to clear the access cache so that next request94        will fetch a new token and base_url.95        """96        self.cache = None97        self.credentials.reset()98    @abc.abstractmethod99    def is_expired(self, auth_data):100        return101    def auth_request(self, method, url, headers=None, body=None, filters=None):102        """103        Obtains auth data and decorates a request with that.104        :param method: HTTP method of the request105        :param url: relative URL of the request (path)106        :param headers: HTTP headers of the request107        :param body: HTTP body in case of POST / PUT108        :param filters: select a base URL out of the catalog109        :returns a Tuple (url, headers, 
body)110        """111        orig_req = dict(url=url, headers=headers, body=body)112        auth_url, auth_headers, auth_body = self._decorate_request(113            filters, method, url, headers, body)114        auth_req = dict(url=auth_url, headers=auth_headers, body=auth_body)115        # Overwrite part if the request if it has been requested116        if self.alt_part is not None:117            if self.alt_auth_data is not None:118                alt_url, alt_headers, alt_body = self._decorate_request(119                    filters, method, url, headers, body,120                    auth_data=self.alt_auth_data)121                alt_auth_req = dict(url=alt_url, headers=alt_headers,122                                    body=alt_body)123                auth_req[self.alt_part] = alt_auth_req[self.alt_part]124            else:125                # If alt auth data is None, skip auth in the requested part126                auth_req[self.alt_part] = orig_req[self.alt_part]127            # Next auth request will be normal, unless otherwise requested128            self.reset_alt_auth_data()129        return auth_req['url'], auth_req['headers'], auth_req['body']130    def reset_alt_auth_data(self):131        """132        Configure auth provider to provide valid authentication data133        """134        self.alt_part = None135        self.alt_auth_data = None136    def set_alt_auth_data(self, request_part, auth_data):137        """138        Configure auth provider to provide alt authentication data139        on a part of the *next* auth_request. 
If credentials are None,140        set invalid data.141        :param request_part: request part to contain invalid auth: url,142                             headers, body143        :param auth_data: alternative auth_data from which to get the144                          invalid data to be injected145        """146        self.alt_part = request_part147        self.alt_auth_data = auth_data148    @abc.abstractmethod149    def base_url(self, filters, auth_data=None):150        """151        Extracts the base_url based on provided filters152        """153        return154class KeystoneAuthProvider(AuthProvider):155    token_expiry_threshold = datetime.timedelta(seconds=60)156    def __init__(self, credentials, auth_url,157                 disable_ssl_certificate_validation=None,158                 ca_certs=None, trace_requests=None):159        super(KeystoneAuthProvider, self).__init__(credentials)160        self.dsvm = disable_ssl_certificate_validation161        self.ca_certs = ca_certs162        self.trace_requests = trace_requests163        self.auth_client = self._auth_client(auth_url)164    def _decorate_request(self, filters, method, url, headers=None, body=None,165                          auth_data=None):166        if auth_data is None:167            auth_data = self.auth_data168        token, _ = auth_data169        base_url = self.base_url(filters=filters, auth_data=auth_data)170        # build authenticated request171        # returns new request, it does not touch the original values172        _headers = copy.deepcopy(headers) if headers is not None else {}173        _headers['X-Auth-Token'] = str(token)174        if url is None or url == "":175            _url = base_url176        else:177            # Join base URL and url, and remove multiple contiguous slashes178            _url = "/".join([base_url, url])179            parts = [x for x in urlparse.urlparse(_url)]180            parts[2] = re.sub("/{2,}", "/", parts[2])181            _url = 
urlparse.urlunparse(parts)182        # no change to method or body183        return str(_url), _headers, body184    @abc.abstractmethod185    def _auth_client(self):186        return187    @abc.abstractmethod188    def _auth_params(self):189        return190    def _get_auth(self):191        # Bypasses the cache192        auth_func = getattr(self.auth_client, 'get_token')193        auth_params = self._auth_params()194        # returns token, auth_data195        token, auth_data = auth_func(**auth_params)196        return token, auth_data197    def get_token(self):198        return self.auth_data[0]199class KeystoneV2AuthProvider(KeystoneAuthProvider):200    EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'201    def _auth_client(self, auth_url):202        return json_v2id.TokenClientJSON(203            auth_url, disable_ssl_certificate_validation=self.dsvm,204            ca_certs=self.ca_certs, trace_requests=self.trace_requests)205    def _auth_params(self):206        return dict(207            user=self.credentials.username,208            password=self.credentials.password,209            tenant=self.credentials.tenant_name,210            auth_data=True)211    def _fill_credentials(self, auth_data_body):212        tenant = auth_data_body['token']['tenant']213        user = auth_data_body['user']214        if self.credentials.tenant_name is None:215            self.credentials.tenant_name = tenant['name']216        if self.credentials.tenant_id is None:217            self.credentials.tenant_id = tenant['id']218        if self.credentials.username is None:219            self.credentials.username = user['name']220        if self.credentials.user_id is None:221            self.credentials.user_id = user['id']222    def base_url(self, filters, auth_data=None):223        """224        Filters can be:225        - service: compute, image, etc226        - region: the service region227        - endpoint_type: adminURL, publicURL, internalURL228        - api_version: replace catalog 
version with this229        - skip_path: take just the base URL230        """231        if auth_data is None:232            auth_data = self.auth_data233        token, _auth_data = auth_data234        service = filters.get('service')235        region = filters.get('region')236        endpoint_type = filters.get('endpoint_type', 'publicURL')237        if service is None:238            raise exceptions.EndpointNotFound("No service provided")239        _base_url = None240        for ep in _auth_data['serviceCatalog']:241            if ep["type"] == service:242                for _ep in ep['endpoints']:243                    if region is not None and _ep['region'] == region:244                        _base_url = _ep.get(endpoint_type)245                if not _base_url:246                    # No region matching, use the first247                    _base_url = ep['endpoints'][0].get(endpoint_type)248                break249        if _base_url is None:250            raise exceptions.EndpointNotFound(service)251        parts = urlparse.urlparse(_base_url)252        if filters.get('api_version', None) is not None:253            path = "/" + filters['api_version']254            noversion_path = "/".join(parts.path.split("/")[2:])255            if noversion_path != "":256                path += "/" + noversion_path257            _base_url = _base_url.replace(parts.path, path)258        if filters.get('skip_path', None) is not None and parts.path != '':259            _base_url = _base_url.replace(parts.path, "/")260        return _base_url261    def is_expired(self, auth_data):262        _, access = auth_data263        expiry = datetime.datetime.strptime(access['token']['expires'],264                                            self.EXPIRY_DATE_FORMAT)265        return expiry - self.token_expiry_threshold <= \266            datetime.datetime.utcnow()267class KeystoneV3AuthProvider(KeystoneAuthProvider):268    EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'269    def 
_auth_client(self, auth_url):270        return json_v3id.V3TokenClientJSON(271            auth_url, disable_ssl_certificate_validation=self.dsvm,272            ca_certs=self.ca_certs, trace_requests=self.trace_requests)273    def _auth_params(self):274        return dict(275            user_id=self.credentials.user_id,276            username=self.credentials.username,277            password=self.credentials.password,278            project_id=self.credentials.project_id,279            project_name=self.credentials.project_name,280            user_domain_id=self.credentials.user_domain_id,281            user_domain_name=self.credentials.user_domain_name,282            project_domain_id=self.credentials.project_domain_id,283            project_domain_name=self.credentials.project_domain_name,284            domain_id=self.credentials.domain_id,285            domain_name=self.credentials.domain_name,286            auth_data=True)287    def _fill_credentials(self, auth_data_body):288        # project or domain, depending on the scope289        project = auth_data_body.get('project', None)290        domain = auth_data_body.get('domain', None)291        # user is always there292        user = auth_data_body['user']293        # Set project fields294        if project is not None:295            if self.credentials.project_name is None:296                self.credentials.project_name = project['name']297            if self.credentials.project_id is None:298                self.credentials.project_id = project['id']299            if self.credentials.project_domain_id is None:300                self.credentials.project_domain_id = project['domain']['id']301            if self.credentials.project_domain_name is None:...web.py
Source:web.py  
...28            logging.info(29                "Page loading stopped directly, " "cause of load time is out-of-date"30            )31        return driver32    def _fill_credentials(self, driver):33        logger.info("Filling up credentials inputs")34        login_input = driver.find_element_by_id("txtLogin")35        password_input = driver.find_element_by_id("txtPassword")36        login_input.send_keys(KASPI_USERNAME)37        password_input.send_keys(KASPI_PASSWORD)38        return driver39    def _confirm_code(self, driver):40        logger.info("Starting confirm code operations")41        login_button = driver.find_element_by_class_name("entrance__loginButton")42        login_button.click()43        time.sleep(20)44        code = get_confirmation_code()45        code_chars = [num for num in code]46        driver.find_element_by_id("txtOtpChar1").send_keys(code_chars[0])47        driver.find_element_by_id("txtOtpChar2").send_keys(code_chars[1])48        driver.find_element_by_id("txtOtpChar3").send_keys(code_chars[2])49        driver.find_element_by_id("txtOtpChar4").send_keys(code_chars[3])50        return driver51    def login(self):52        delay = 1053        logger.info("Opening the chrome")54        driver = webdriver.Chrome()55        driver.set_page_load_timeout(40)56        driver = self._get_login_page(driver=driver, delay=delay)57        driver = self._fill_credentials(driver=driver)58        driver = self._confirm_code(driver=driver)59        return driver60    def logout(self, driver, delay):61        logger.info("Log outing from system")62        driver.find_element_by_id("headerAuth").click()63        WebDriverWait(driver, delay).until(64            ec.presence_of_element_located(65                (By.CLASS_NAME, "headerSettings__item--logOut")66            )67        )68        driver.find_element_by_class_name("headerSettings__item--logOut").click()69class KaspiTransactions:70    def __init__(71        self, auth_service: KaspiAuth, links: 
Optional[Type[KaspiLinks]] = None...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
