How to use the is_main_thread method in tox

Best Python code snippet using tox_python
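
The snippets below rely on Python's standard threading module: a thread is the main thread when it is the same object as threading.main_thread(). Here is a minimal, self-contained sketch of the is_main_thread and current_thread_id helpers that the snippets assume; these helper names are not standard-library functions.

import threading

def is_main_thread():
    """Return True when called from the interpreter's main thread."""
    return threading.current_thread() is threading.main_thread()

def current_thread_id():
    """Return the identifier of the calling thread."""
    return threading.current_thread().ident

print(is_main_thread())  # True in the main thread

worker = threading.Thread(target=lambda: print(is_main_thread()))
worker.start()  # prints False from the worker thread
worker.join()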

middlewares.py

Source: middlewares.py (GitHub)


# -*- coding: utf-8 -*-
"""
This file includes custom middleware for scrapy.
The middleware implements a rotating proxy with a random user agent.
"""
import asyncio
import codecs
import json
import logging
import os
import threading
from subprocess import CalledProcessError, check_call

from fake_useragent import UserAgent
from proxybroker import Broker
from rotating_proxies.expire import Proxies, ProxyState
from rotating_proxies.middlewares import RotatingProxyMiddleware
from rotating_proxies.utils import extract_proxy_hostport
from scrapy import signals
from scrapy.exceptions import CloseSpider, NotConfigured
from scrapy.utils.project import get_project_settings

from movie_scrapers.modules.async_looper import RepeatedTimer

__author__ = "Baran Nama"
__copyright__ = "Copyright 2020, Movies-ds project"
__maintainer__ = "Baran Nama"
__email__ = "barann.nama@gmail.com"

logger = logging.getLogger(__name__)


class CustomRotatingProxiesMiddleware(RotatingProxyMiddleware):
    """
    Class implementing a rotating proxy with a random user agent
    """

    def __init__(
        self,
        proxy_list,
        logstats_interval,
        stop_if_no_proxies,
        max_proxies_to_try,
        backoff_base,
        backoff_cap,
        crawler,
    ):
        super(CustomRotatingProxiesMiddleware, self).__init__(
            proxy_list,
            logstats_interval,
            stop_if_no_proxies,
            max_proxies_to_try,
            backoff_base,
            backoff_cap,
            crawler,
        )
        # replace the default proxy class with the custom one
        self.proxies = CustomProxies(
            self.cleanup_proxy_list(proxy_list), backoff=self.proxies.backoff
        )
        # if we need to use a random user agent, set it up
        self.use_random_ua = crawler.settings.get("USE_RANDOM_UA", False)
        if self.use_random_ua:
            fallback = crawler.settings.get("FAKEUSERAGENT_FALLBACK", None)
            self.ua = UserAgent(fallback=fallback)
            self.ua_type = crawler.settings.get("RANDOM_UA_TYPE", "random")
            self.per_proxy = crawler.settings.get("RANDOM_UA_PER_PROXY", False)
            self.proxy2ua = {}

    @classmethod
    def from_crawler(cls, crawler):
        s = crawler.settings
        proxy_list = CustomProxies.get_proxies()
        mw = cls(
            proxy_list=proxy_list,
            logstats_interval=s.getfloat("ROTATING_PROXY_LOGSTATS_INTERVAL", 30),
            stop_if_no_proxies=s.getbool("ROTATING_PROXY_CLOSE_SPIDER", False),
            max_proxies_to_try=s.getint("ROTATING_PROXY_PAGE_RETRY_TIMES", 5),
            backoff_base=s.getfloat("ROTATING_PROXY_BACKOFF_BASE", 300),
            backoff_cap=s.getfloat("ROTATING_PROXY_BACKOFF_CAP", 3600),
            crawler=crawler,
        )
        crawler.signals.connect(mw.engine_started, signal=signals.engine_started)
        crawler.signals.connect(mw.engine_stopped, signal=signals.engine_stopped)
        return mw

    def process_request(self, request, spider):
        if "proxy" in request.meta and not request.meta.get("_rotating_proxy"):
            return
        # first set up the proxy
        proxy = self.proxies.get_random()
        if not proxy:
            if self.stop_if_no_proxies:
                raise CloseSpider("no_proxies")
            else:
                logger.warning("No proxies available, getting new proxies")
                self.proxies.update_proxies(read_from_broker=False)
                proxy = self.proxies.get_random()
                if proxy is None:
                    self.proxies.update_proxies(read_from_file=False)
                    proxy = self.proxies.get_random()
                    if proxy is None:
                        logger.error("Overall, no proxies. Closing the spider")
                        raise CloseSpider("no_proxies_after_reset")
                # after resetting the proxies, reset the proxy-user agent assignments as well
                self.proxy2ua = {}
        request.meta["proxy"] = proxy
        request.meta["download_slot"] = self.get_proxy_slot(proxy)
        request.meta["_rotating_proxy"] = True
        # then set up the user agent
        self.setup_ua(request)

    def reanimate_proxies(self):
        """Prevent dead proxies from being reanimated.
        If reanimation is needed, just remove this override.
        """

    def setup_ua(self, request):
        """Set up a user agent, with or without a proxy, for the given request"""

        def get_ua():
            """Get a random UA based on the type setting (random, firefox…)"""
            return getattr(self.ua, self.ua_type)

        if self.use_random_ua:
            proxy = request.meta.get("proxy", None)
            if proxy is not None and self.per_proxy:
                if proxy not in self.proxy2ua:
                    self.proxy2ua[proxy] = get_ua()
                    logger.debug(
                        "Assign User-Agent %s to Proxy %s"
                        % (self.proxy2ua[proxy], proxy)
                    )
                request.headers.setdefault("User-Agent", self.proxy2ua[proxy])
            else:
                request.headers.setdefault("User-Agent", get_ua())


class CustomProxies(Proxies):
    """
    Helper proxy class for adding, updating and tracking proxies in the system.
    """

    # bool indicating whether we did the initial collection from proxybroker
    is_initial_collection = True
    # lock for managing the proxybroker collection process
    gather_lock = threading.RLock()
    # lock for managing the proxybroker proxy checking process
    check_lock = threading.RLock()

    def __init__(self, proxy_list, backoff=None):
        super().__init__(proxy_list, backoff)
        s = get_project_settings()
        collection_interval = s.getint("PROXY_COLLECTION_INTERVAL", 0)
        if collection_interval > 0:
            worker_loop = asyncio.new_event_loop()
            # create a task for updating proxies if required
            self.task = RepeatedTimer(
                self.update_proxies,
                collection_interval * 60,
                event_loop=worker_loop,
                now=False,
                read_from_file=False,
            )
            self.task.start()
            # logger.info('Initial automated async proxy collection has been scheduled')

    def engine_stopped(self):
        """Stop any running collection task if one exists"""
        if getattr(self, "task", False) and self.task.running:
            logger.info("Async collection task is ending")
            self.task.stop()

    def update_proxies(self, read_from_file=True, read_from_broker=True):
        """Update the proxies with the given ones while excluding already used ones"""
        is_main_thread = threading.current_thread() is threading.main_thread()
        new_proxies = CustomProxies.get_proxies(
            read_from_file=read_from_file, read_from_broker=read_from_broker
        )
        logger.info(
            f'[Thread: {"Main" if is_main_thread else "Not main"}] '
            f"Updating the proxies using recently collected proxies: {len(new_proxies)}"
        )
        for proxy in new_proxies:
            self.add(proxy)

    @staticmethod
    def get_proxies(read_from_file=True, read_from_broker=True):
        """Get proxies from various sources: files, settings and proxybroker.
        Note that it only fetches proxies; it does not check whether they are already used."""
        proxy_list = []
        if read_from_file:
            proxy_list = CustomProxies.get_proxies_from_file()
        # if we have no proxy file and no proxy list in the settings, get proxies from proxybroker
        if read_from_broker and not proxy_list:
            proxy_list = CustomProxies.get_proxies_from_external()
            if not proxy_list:
                proxy_list = CustomProxies.get_proxies_programmatically()
        return proxy_list

    @staticmethod
    def get_proxies_from_file():
        """Get proxies from an external file or from the settings"""
        s = get_project_settings()
        is_main_thread = threading.current_thread() is threading.main_thread()
        proxy_path = s.get("ROTATING_PROXY_LIST_PATH", None)
        logger.info(
            f'[Thread: {"Main" if is_main_thread else "Not main"}] '
            f"Proxies are read from file: {proxy_path}."
        )
        # first check whether we have a proxy list file; if it exists, get the proxies
        if proxy_path is not None and os.path.isfile(proxy_path):
            proxy_list = CustomProxies.check_proxies(
                open(proxy_path, "r", encoding="utf-8")
            )
            logger.info(
                f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                f"Valid proxies found in {proxy_path} after checking: {len(proxy_list)}"
            )
        else:
            # then check whether a proxy list is given in the settings
            proxy_list = s.getlist("ROTATING_PROXY_LIST", [])
        # remove duplicates if any
        proxy_list = list(set(proxy_list))
        return proxy_list

    @classmethod
    def get_proxies_from_external(cls):
        """Get proxies using an external proxybroker script and read the results from file"""

        def scrape_proxies():
            script_path = s.get("PROXY_SCRIPT_PATH", "proxy_scrape.py")
            scrape_settings = json.dumps(
                {
                    "dnsbl": s.get("PROXY_DNSBL"),
                    "types": s.get("PROXY_TYPES"),
                    "countries": s.get("PROXY_COUNTRIES"),
                }
            )
            execution_command = (
                f"python {script_path} -p {proxy_file_path} -l {limit} -s '{scrape_settings}'"
            )
            logger.info(f"Command executed: {execution_command}")
            check_call(execution_command, shell=True, timeout=30 * 60)

        is_main_thread = threading.current_thread() is threading.main_thread()
        logger.info(
            f'[Thread: {"Main" if is_main_thread else "Not main"}] '
            f"Proxy collection using an external script has been requested."
        )
        with cls.gather_lock:
            logger.info(
                f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                f"Proxy collection using an external script has started."
            )
            proxy_list = []
            s = get_project_settings()
            proxy_file_path = s.get("PROXY_FILE_PATH", "proxies.txt")
            limit = s.getint("PROXY_PERIODIC_COUNT", 10)
            if cls.is_initial_collection:
                limit = s.getint("PROXY_INITIAL_COUNT", 0)
            try:
                scrape_proxies()
            except (CalledProcessError, TimeoutError) as e:
                logger.error(
                    f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                    f"{e}. "
                    f"No proxy has been received from file: {proxy_file_path}. Trying again."
                )
            except Exception as e:
                logger.error(
                    f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                    f"{e}. "
                    f"No proxy has been received from file: {proxy_file_path}. Trying again."
                )
            finally:
                if os.path.isfile(proxy_file_path):
                    with codecs.open(proxy_file_path, "r", encoding="utf8") as f:
                        proxy_list = [line.strip() for line in f if line.strip()]
                    # remove the proxy file after use
                    os.remove(proxy_file_path)
            if not proxy_list:
                logger.error(
                    f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                    f"No proxy has been received from file: {proxy_file_path}. Trying again."
                )
                return cls.get_proxies_from_external()
            logger.info(
                f'[Thread: {"Main" if is_main_thread else "Not main"}]: '
                f"Proxy collection from file has ended. "
                f'Type of collection: {"initial" if cls.is_initial_collection else "periodic"}. '
                f"Number of collected proxies: {len(proxy_list)}"
            )
            # the initial proxybroker collection is done, so later collections use smaller batches
            cls.is_initial_collection = False
            return proxy_list

    @classmethod
    def get_proxies_programmatically(cls):
        """Collect free proxies using ProxyBroker at runtime.
        cls.is_initial_collection determines whether the initial or the periodic collection limit is used."""

        async def fetch_proxy(proxies):
            while True:
                proxy = await proxies.get()
                if proxy is None:
                    break
                proto = "https" if "HTTPS" in proxy.types else "http"
                row = f"{proto}://{proxy.host}:{proxy.port}"
                if row not in proxy_list:
                    proxy_list.append(row)
            return proxy_list

        is_main_thread = threading.current_thread() is threading.main_thread()
        logger.info(
            f'[Thread: {"Main" if is_main_thread else "Not main"}] '
            f"Programmatic proxy collection has been requested."
        )
        with cls.gather_lock:
            proxy_list = []
            logger.info(
                f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                f"Programmatic proxy collection has started."
            )
            s = get_project_settings()
            limit = s.getint("PROXY_PERIODIC_COUNT", 10)
            if cls.is_initial_collection:
                limit = s.getint("PROXY_INITIAL_COUNT", 100)
            proxy_q = asyncio.Queue()
            if is_main_thread:
                broker = Broker(proxy_q)
            else:
                # signal handlers can only be installed from the main thread
                broker = Broker(proxy_q, stop_broker_on_sigint=False)
            try:
                tasks = asyncio.gather(
                    broker.find(
                        types=s.get("PROXY_TYPES"),
                        countries=s.get("PROXY_COUNTRIES"),
                        strict=True,
                        dnsbl=s.get("PROXY_DNSBL"),
                        limit=limit,
                    ),
                    fetch_proxy(proxy_q),
                )
                loop = asyncio.get_event_loop()
                _, proxy_list = loop.run_until_complete(tasks)
            except Exception as e:
                logger.error(
                    f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                    f"{e}. "
                    f"Error happened during programmatic proxy collection. Cancelled"
                )
                broker.stop()
            else:
                logger.info(
                    f'[Thread: {"Main" if is_main_thread else "Not main"}]: '
                    f"Programmatic proxy collection has ended. "
                    f'Type of collection: {"initial" if cls.is_initial_collection else "periodic"}. '
                    f"Number of collected proxies: {len(proxy_list)}"
                )
                # the initial proxybroker collection is done, so later collections use smaller batches
                cls.is_initial_collection = False
            return proxy_list

    @classmethod
    def check_proxies(cls, proxy_list):
        """Check the given proxy list using ProxyBroker"""

        async def fetch_proxy(proxies):
            new_proxy_list = []
            while True:
                proxy = await proxies.get()
                if proxy is None:
                    break
                proto = "https" if "HTTPS" in proxy.types else "http"
                row = f"{proto}://{proxy.host}:{proxy.port}"
                if row not in new_proxy_list:
                    new_proxy_list.append(row)
            return new_proxy_list

        is_main_thread = threading.current_thread() is threading.main_thread()
        logger.info(
            f'[Thread: {"Main" if is_main_thread else "Not main"}] '
            f"Proxy checking has been requested."
        )
        with cls.check_lock:
            logger.info(
                f'[Thread: {"Main" if is_main_thread else "Not main"}] '
                f"Proxy checking has started."
            )
            s = get_project_settings()
            proxy_q = asyncio.Queue()
            if threading.current_thread() is threading.main_thread():
                broker = Broker(proxy_q)
            else:
                # signal handlers can only be installed from the main thread
                broker = Broker(proxy_q, stop_broker_on_sigint=False)
            try:
                tasks = asyncio.gather(
                    broker.find(
                        data=proxy_list,
                        types=s.get("PROXY_TYPES"),
                        countries=s.get("PROXY_COUNTRIES"),
                        strict=True,
                        dnsbl=s.get("PROXY_DNSBL"),
                    ),
                    fetch_proxy(proxy_q),
                )
                loop = asyncio.get_event_loop()
                _, proxy_list = loop.run_until_complete(tasks)
            except RuntimeError:
                logger.error("Error happened during proxy checking. Cancelled")
                broker.stop()
            else:
                logger.info(
                    f'[Thread: {"Main" if is_main_thread else "Not main"}]: '
                    f"Proxy checking has ended. "
                    f"Number of collected proxies: {len(proxy_list)}"
                )
        return proxy_list

    def add(self, proxy):
        """Add a proxy to the proxy list"""
        if proxy in self.proxies:
            logger.warning(f"Proxy {proxy} is already in the proxies list")
            return
        hostport = extract_proxy_hostport(proxy)
        self.proxies[proxy] = ProxyState()
        self.proxies_by_hostport[hostport] = proxy
...


base.py

Source: base.py (GitHub)


...

    quick_log = []

    @classmethod
    def add_log(cls, content=''):
        self = cls()
        # print('Adding log - main thread: {}, thread ID: {}'.format(is_main_thread(), current_thread_id()))
        if is_main_thread():
            self.logs.append(content)
        else:
            tmp_log = self.thread_logs.get(current_thread_id(), [])
            tmp_log.append(content)
            self.thread_logs[current_thread_id()] = tmp_log
        return self

    @classmethod
    def flush(cls, sep='\n', end='\n', file=None, exit=False, publish=True):
        from py12306.cluster.cluster import Cluster
        self = cls()
        logs = self.get_logs()
        # write to the log file
        if file is None and Config().OUT_PUT_LOG_TO_FILE_ENABLED and not Const.IS_TEST:  # TODO: show a friendly message when the file cannot be written
            file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a', encoding='utf-8')
        if not file: file = None
        # publish the log to every cluster node
        if publish and self.quick_log and Config().is_cluster_enabled() and Cluster().is_ready:
            f = io.StringIO()
            with redirect_stdout(f):
                print(*logs, sep=sep, end='' if end == '\n' else end)
            out = f.getvalue()
            Cluster().publish_log_message(out)
        else:
            print(*logs, sep=sep, end=end, file=file)
        self.empty_logs(logs)
        if exit: sys.exit()

    def get_logs(self):
        if self.quick_log:
            logs = self.quick_log
        else:
            if is_main_thread():
                logs = self.logs
            else:
                logs = self.thread_logs.get(current_thread_id(), [])
        return logs

    def empty_logs(self, logs=None):
        if self.quick_log:
            self.quick_log = []
        else:
            if is_main_thread():
                self.logs = []
            else:
                if logs and self.thread_logs.get(current_thread_id()): del self.thread_logs[current_thread_id()]

    @classmethod
    def add_quick_log(cls, content=''):
        self = cls()
        self.quick_log.append(content)
        return self

    def notification(self, title, content=''):
        # Not very user-friendly, disabled for now; macOS asking for notification permission was not anticipated
        # if sys.platform == 'darwin':
        #     os.system('osascript -e \'tell app "System Events" to display notification "{content}" with title "{title}"\''.format(
        #         title=title, content=content))
...
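
The core idea in this snippet is per-thread log buffering: the main thread appends to a shared list, while each worker thread accumulates lines in its own bucket keyed by thread ID, so concurrent writers never interleave their output. A self-contained sketch of that pattern built on the same main-thread check (the class and names here are illustrative, not py12306's actual API):

import threading

class ThreadBufferedLog:
    """Buffer log lines per thread; flush prints only the calling thread's lines."""

    def __init__(self):
        self.logs = []         # buffer for the main thread
        self.thread_logs = {}  # worker buffers keyed by thread id

    def add(self, line):
        if threading.current_thread() is threading.main_thread():
            self.logs.append(line)
        else:
            ident = threading.current_thread().ident
            self.thread_logs.setdefault(ident, []).append(line)

    def flush(self):
        if threading.current_thread() is threading.main_thread():
            lines, self.logs = self.logs, []
        else:
            lines = self.thread_logs.pop(threading.current_thread().ident, [])
        print("\n".join(lines))

log = ThreadBufferedLog()
log.add("from main")
log.flush()

worker = threading.Thread(target=lambda: (log.add("from worker"), log.flush()))
worker.start()
worker.join()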


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tox automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

