Best Python code snippet using molotov_python
request.py
Source:request.py  
# -*- coding: utf-8 -*-
import threading
import time
import traceback
import re
from resources.lib.modules.globals import g
from resources.lib.modules.scrapers import source_utils
from resources.lib.modules.scrapers.common_types import UrlParts
from resources.lib.modules.scrapers.third_party.cloudscraper import cloudscraper
from requests.compat import urlparse, urlunparse

# Module-wide cache of HEAD probe outcomes, keyed by URL. A value is either a
# bool (probe result) or another URL string (redirect target to follow).
_head_checks = {}


class _StubResponse(object):
    """Attribute bag used in place of a real response object.

    ``url`` is only defined when it is supplied, so a stub built without one
    raises AttributeError on ``.url`` access.
    """

    def __init__(self, status_code, url=None):
        self.status_code = status_code
        if url is not None:
            self.url = url


def _get(cfscrape, url, headers, timeout, allow_redirects, update_options_fn):
    """Issue a GET through ``cfscrape.request`` and return its result.

    :param cfscrape: object exposing ``request(**options)``.
    :param url: target URL.
    :param headers: headers passed through as the ``headers`` option.
    :param timeout: value passed through as the ``timeout`` option.
    :param allow_redirects: value passed through as ``allow_redirects``.
    :param update_options_fn: optional callable that receives the options dict
        and may modify it in place before the request is sent.
    """
    request_options = {
        'method': 'GET',
        'url': url,
        'headers': headers,
        'timeout': timeout,
        'allow_redirects': allow_redirects
    }
    if update_options_fn is not None:
        update_options_fn(request_options)
    return cfscrape.request(**request_options)


def _is_cloudflare_iuam_challenge(resp, allow_empty_body=False):
    """Tell whether ``resp`` matches the Cloudflare challenge signature.

    The response must carry a ``Server`` header starting with ``cloudflare``,
    have status 429 or 503 and, unless ``allow_empty_body`` is set, a body
    matching the challenge-form pattern. The last operand is returned as-is,
    so a truthy result may be a regex match object rather than ``True``.
    Objects lacking the expected attributes yield ``False``.
    """
    try:
        return (
            resp.headers.get('Server', '').startswith('cloudflare')
            and resp.status_code in [429, 503]
            and (allow_empty_body or re.search(
                r'action="/.*?__cf_chl_jschl_tk__=\S+".*?name="jschl_vc"\svalue=.*?',
                resp.text,
                re.M | re.DOTALL
            ))
        )
    except AttributeError:
        # Not a response-like object (e.g. a stub without headers/text).
        return False


def _get_domain(url):
    """Return ``scheme://netloc`` for ``url``, defaulting the scheme to https."""
    parsed_url = urlparse(url)
    scheme = parsed_url.scheme if parsed_url.scheme != '' else 'https'
    return "%s://%s" % (scheme, parsed_url.netloc)


def _get_head_check(url):
    """Look up the cached HEAD outcome for ``url``.

    Cached string values are treated as redirect targets and followed until a
    bool or a missing entry is reached.

    :return: ``(url, result)`` where ``result`` is ``True``/``False`` when a
        probe outcome is cached and ``None`` when nothing is cached.
    """
    result = _head_checks.get(url, None)
    if isinstance(result, bool):
        return (url, result)
    elif result is not None:
        return _get_head_check(result)
    return (url, None)


class Request(object):
    """HTTP helper wrapping a plain session (HEAD) and a cloudscraper session."""

    def __init__(self, sequental=False, timeout=None, wait=1):
        """
        :param sequental: default mode used by ``_request_core`` when a call
            does not specify one (spelling kept for API compatibility).
        :param timeout: timeout for GET/POST requests; defaults to 10 when None.
        :param wait: stored on the instance; only the disabled sequential mode
            referenced it.
        """
        self._request = source_utils.randomUserAgentRequests()
        self._cfscrape = cloudscraper.create_scraper(interpreter='native')
        self._sequental = sequental
        self._wait = wait
        self._should_wait = False
        self._lock = threading.Lock()
        self._timeout = 10
        if timeout is not None:
            self._timeout = timeout
        self.exc_msg = ''
        self.skip_head = False
        self.request_time = 99

    def _verify_response(self, response):
        """Raise when the response status is 400 or above.

        ``self.exc_msg`` is filled in before raising so that the handler in
        ``_request_core`` keeps this message instead of classifying a traceback.
        """
        if response.status_code >= 400:
            self.exc_msg = 'response status code %s' % response.status_code
            if response.status_code in [429, 503]:
                self.exc_msg = '%s (probably Cloudflare)' % self.exc_msg
            raise Exception()

    def _request_core(self, request, sequental=None, cf_retries=3):
        """Run ``request`` and translate failures into a response-like object.

        :param request: callable taking one argument (an options-update
            callback or None); callers attach a ``url`` attribute for logging.
        :param sequental: overrides the instance default when not None.
        :param cf_retries: remaining retries for the "new Cloudflare
            challenge" failure.
        :return: the response on success; on failure either the failing
            response (status >= 400) or a stub with ``status_code`` 501, with
            ``self.exc_msg`` describing the problem. Returns None when the
            sequential mode is requested (see below).
        """
        self.exc_msg = ''
        if sequental is None:
            sequental = self._sequental

        if sequental is not False:
            # The sequential (lock + wait) implementation is disabled in this
            # file; as before, nothing is requested and None is returned.
            return None

        response_err = _StubResponse(501)
        self._request_start = time.time()
        try:
            response = request(None)
            self._request_end = time.time()
            self.request_time = round(self._request_end - self._request_start)
            response_err = response
            self._verify_response(response)
            return response
        except Exception:
            self._request_end = time.time()
            self.request_time = round(self._request_end - self._request_start)
            if self.exc_msg == '':
                exc = traceback.format_exc(limit=1)
                # One chain, so the timeout message is no longer overwritten
                # by the generic fallbacks below.
                if 'ConnectTimeout' in exc or 'ReadTimeout' in exc:
                    self.exc_msg = 'request timed out'
                elif ('Detected the new Cloudflare challenge.' in exc
                        and cf_retries > 0 and self.request_time < 2):
                    # Fixed: the original called the undefined name `tools`.
                    g.log('cf_new_challenge_retry: %s' % getattr(request, 'url', None), 'notice')
                    return self._request_core(request, sequental, cf_retries - 1)
                elif 'Cloudflare' in exc or '!!Loop Protection!!' in exc:
                    self.exc_msg = 'failed Cloudflare protection'
                elif 'Max retries exceeded with url' in exc:
                    self.exc_msg = 'Max retries exceeded'
                else:
                    self.exc_msg = 'failed - %s' % exc
            # g.log('%s %s' % (request.url, self.exc_msg), 'notice')
            return response_err

    def _check_redirect(self, src, response):
        """Inspect a 301/302 response for a redirect target.

        NOTE(review): this currently always returns False. The original ends
        the innermost branch with a bare ``dest`` expression, which looks like
        a dropped ``return dest``. It is left disabled here because ``_head``
        recurses on the returned URL and a same-domain redirect could then
        loop forever - confirm the intent before enabling.
        """
        if response.status_code in [301, 302]:
            # .get() avoids a KeyError on a redirect without a Location header.
            redirect_url = response.headers.get('Location', '')
            if not redirect_url:
                return False
            if (not redirect_url.endswith('127.0.0.1')
                    and not redirect_url.endswith('localhost')
                    and response.url != redirect_url):
                dest = redirect_url
                src_clean = re.sub(r'https?://', '', src)
                dest_clean = re.sub(r'https?://', '', _get_domain(dest))
                if src_clean != dest_clean or 'https://' in dest:
                    pass  # see NOTE(review) above: would be `return dest`
        return False

    def _head(self, url):
        """Probe the domain of ``url`` with HEAD, using the module cache.

        :return: ``(url, status_code)``. Cached outcomes map to 200/500. A
            failed probe (exception, status >= 400, Cloudflare challenge) is
            deliberately reported as 200, exactly as before.
        """
        if self.skip_head:
            return (url, 200)

        (url, head_check) = _get_head_check(url)
        if head_check:
            return (url, 200)
        elif head_check is False:
            return (url, 500)

        url = _get_domain(url)
        # g.log('HEAD: %s' % url, 'debug')

        def request(_):
            return self._request.head(url, timeout=2)
        request.url = url

        try:
            response = self._request_core(request, sequental=False)
            if (_is_cloudflare_iuam_challenge(response, allow_empty_body=True)
                    or response.status_code >= 400):
                response = _StubResponse(200, url)
        except Exception:
            response = _StubResponse(200, url)

        try:
            head_check_key = _get_domain(response.url)
        except Exception:
            response.url = url
            head_check_key = _get_domain(url)

        redirect_url = self._check_redirect(head_check_key, response)
        if redirect_url:
            _head_checks[head_check_key] = redirect_url
            return self._head(redirect_url)

        _head_checks[head_check_key] = response.status_code == 200
        return (response.url, response.status_code)

    def head(self, url):
        """Public entry point for the HEAD probe; see ``_head``."""
        return self._head(url)
        # return database.get(self._head, 12, url)

    def find_url(self, urls):
        """Return UrlParts for the first entry whose base URL probes as 200.

        :param urls: iterable of objects with ``base``, ``search`` and
            ``default_search`` attributes.
        :return: a ``UrlParts`` whose base has one trailing slash removed, or
            None when no entry passes.
        """
        for url in urls:
            (response_url, status_code) = self.head(url.base)
            if status_code != 200:
                continue
            if response_url.endswith("/"):
                response_url = response_url[:-1]
            return UrlParts(base=response_url, search=url.search, default_search=url.default_search)
        return None

    def get(self, url, headers=None, allow_redirects=True):
        """GET ``url`` through the cloudscraper session.

        :param url: target URL.
        :param headers: optional headers dict; a fresh dict is used when
            omitted (previously a shared mutable default).
        :param allow_redirects: passed through to the request.
        :return: whatever ``_request_core`` returns.
        """
        if headers is None:
            headers = {}
        parsed_url = urlparse(url)
        # The probe result is not inspected (that check is disabled); the call
        # is kept because it populates the HEAD cache.
        response = self.head(_get_domain(url))
        if response is None:
            return None
        # (url, status_code) = response
        # if status_code != 200:
        #     return None
        # Scheme and netloc used to be taken from a second parse of the same
        # URL, so rebuilding from the single parse is equivalent.
        url = urlunparse(parsed_url)
        # g.log('GET: %s' % url)#, 'debug')

        def request(update_options_fn):
            return _get(self._cfscrape, url, headers, self._timeout, allow_redirects, update_options_fn)
        request.url = url
        return self._request_core(request)

    def post(self, url, data, headers={}):
        # g.log('POST: %s' % url, 'debug')
        request = lambda _: self._cfscrape.post(url, data, headers=headers, timeout=self._timeout)
        request.url = url
        # NOTE(review): the source listing is cut off here ("..."); the rest of
        # this method is not visible and must be restored from the original
        # file - presumably it dispatches `request` via _request_core.

# (listing truncated in the source page; next listing: observer_thread_wrapper.py)
Source:observer_thread_wrapper.py  
# -*- coding: utf-8 -*-
"""Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection)."""

__author__ = 'Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2021, Nokia'
__email__ = 'marcin.usielski@nokia.com'

from threading import Thread
from moler.config.loggers import TRACE
from moler.exceptions import CommandFailure, MolerException
import logging
from moler.util import tracked_thread
import threading

try:
    import queue
except ImportError:
    import Queue as queue  # For python 2


class ObserverThreadWrapper(object):
    """Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection)."""

    # Running counter used only to build the worker thread names.
    _th_nr = 1

    def __init__(self, observer, observer_self, logger):
        """
        Construct wrapper for observer and start its worker thread.

        :param observer: observer to wrap.
        :param observer_self: self for observer if observer is method from object or None if observer is a function.
        :param logger: logger to log.
        """
        self._observer = observer
        self._observer_self = observer_self
        self._queue = queue.Queue()
        self._request_end = threading.Event()
        self._timeout_for_get_from_queue = 1
        self.logger = logger
        self._t = Thread(target=self._loop_for_observer, name="ObserverThreadWrapper-{}-{}".format(
            ObserverThreadWrapper._th_nr, observer_self))
        ObserverThreadWrapper._th_nr += 1
        # Thread.setDaemon() is deprecated; the attribute works on Python 2 and 3.
        self._t.daemon = True
        self._t.start()

    def feed(self, data, recv_time):
        """
        Put data here.

        :param data: data to put.
        :param recv_time: time when data is received/read from connection.
        :return: None
        """
        self._queue.put((data, recv_time))

    def request_stop(self):
        """
        Call if you want to stop feed observer.

        Only signals the worker loop and drops the thread reference; the
        thread is not joined.

        :return: None
        """
        self._request_end.set()
        # self._t.join()  # only for debugging to have less active threads.
        self._t = None

    @tracked_thread.log_exit_exception
    def _loop_for_observer(self):
        """
        Loop to pass data (put by method feed) to observer.

        Runs until the end of the loop is requested, then clears the observer
        references.

        :return: None
        """
        threads_log = logging.getLogger("moler_threads")
        threads_log.debug("ENTER {}".format(self._observer))
        heartbeat = tracked_thread.report_alive()
        while not self._request_end.is_set():
            if next(heartbeat):
                threads_log.debug("ALIVE")
            try:
                data, timestamp = self._queue.get(True, self._timeout_for_get_from_queue)
            except queue.Empty:
                continue  # No incoming data within self._timeout_for_get_from_queue
            try:
                self.logger.log(level=TRACE, msg=r'notifying {}({!r})'.format(self._observer, repr(data)))
            except ReferenceError:
                self._request_end.set()  # self._observer is no more valid.
            try:
                if self._observer_self:
                    self._observer(self._observer_self, data, timestamp)
                else:
                    self._observer(data, timestamp)
            except ReferenceError:
                self._request_end.set()  # self._observer is no more valid.
            except Exception as ex:
                self._handle_unexpected_error_from_observer(exception=ex, data=data, timestamp=timestamp)
        self._observer = None
        self._observer_self = None
        threads_log.debug("EXIT")

    def _handle_unexpected_error_from_observer(self, exception, data, timestamp):
        """
        Log an exception raised by the observer while it processed data.

        :param exception: exception raised by the observer.
        :param data: data that was being passed to the observer.
        :param timestamp: receive time of that data.
        :return: None
        """
        self.logger.exception(msg=r'Exception inside: {}({!r}) at {}'.format(self._observer, repr(data), timestamp))


class ObserverThreadWrapperForConnectionObserver(ObserverThreadWrapper):
    """Variant that builds a Moler exception when the observer fails on data."""

    def _handle_unexpected_error_from_observer(self, exception, data, timestamp):
        """
        Log a warning and build the exception describing the observer failure.

        :param exception: exception raised by the observer.
        :param data: data that was being passed to the observer.
        :param timestamp: receive time of that data.
        """
        self.logger.warning("Unhandled exception from '{} 'caught by ObserverThreadWrapperForConnectionObserver"
                            " (Runner normally). '{}' : '{}'.".format(self._observer_self, exception, repr(exception)))
        ex_msg = "Unexpected exception from {} caught by runner when processing data >>{}<< at '{}':" \
                 " >>>{}<<< -> repr: >>>{}<<<".format(self._observer_self, data, timestamp, exception, repr(exception))
        if self._observer_self.is_command():
            ex = CommandFailure(command=self._observer_self, message=ex_msg)
        else:
            ex = MolerException(ex_msg)
        # NOTE(review): the source listing is cut off here ("..."); what is
        # done with `ex` is not visible - restore the remainder from the
        # original file.

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
