Best Python code snippet using localstack_python
web_scrapers.py
Source:web_scrapers.py  
1class _print:2	def __init__(self):3		pass4	def _abc(dictionary,indent=''):5			print(indent,type(dictionary), end='')6			for key in dictionary:7				space = indent + ' '8				print('\n',space, key, end='')9				space += ' '10				if type(dictionary[key]) == dict:11					_print._abc(dictionary=dictionary[key],indent=space)12				else:13					print('\n', space, dictionary[key])14	15	def _indented(*args,indent=''):16		print('web_scrapers', end='')17		space = indent + ' '18		for arg in args:19			space = space + ' '20			if type(arg) == dict:21				_print._abc(dictionary=arg,indent=space)22			else:23				print(':\n',space, arg, end='')24		print('\n')25			# if type(message) == dict:26			# 	print()27			# 	self._dictionary(message, indent=space)28			# else:29			# 	print(':\n',indent, message, end='')30class timer:31	"""32	A timer that can be used to determin elapsed time.33		timer(duration_seconds:float)34		Example:35			from modules.web_scrapers import timer36			x = timer(2)37			x.start()38			print('Timer started :', x.started())39			x.wait(1)40			print('Time elapsed: ', x.elapsed())41			print('Time remaining: ', x.remaining())42			print('The timer is done (maybe):', x.done())43			print('Waiting till done.')44			x.wait_till_done()45			print('Timer done:', x.done())46			import modules.web_scrapers as scrapers47			x = scrapers.timer(2)48	"""49	from datetime import datetime50	from datetime import timezone51	from time import sleep52	_utc = timezone.utc53	_timer = datetime54	55	def __init__(self, duration_seconds:float):56		self._timer_start_time = None57		self.duration_seconds = duration_seconds58	def current_time(self) -> datetime:59		"""60		Returns the current time as a datetime object.61		"""62		return self._timer.now(self._utc)63	def current_time_posix(self) -> float:64		"""65		Returns the current POSIX time seconds.66		"""67		return self.current_time().timestamp()68	def current_time_iso(self) -> str:69		"""70		Returns the current time is ISO format.71		2021-00-11-14:30-05:0072		
"""73		return self.current_time().replace(microsecond=0).isoformat(' ')74	def start_time(self) -> datetime:75		"""76		Returns the timer start time as a datetime object.77		"""78		return self.timer_start_time79	def start_time_posix(self) -> float:80		"""81		Returns the timer start time in POSIX time seconds.82		"""83		return self.start_time().timestamp()84	def start_time_iso(self) -> str:85		"""86		Returns the timer start time in an ISO format.87		2021-00-11-14:30-05:0088		"""89		return self.start_time().replace(microsecond=0).isoformat(' ')90	def started(self) -> bool:91		"""92		Returns True or False. True is start has been called.93		"""94		if self.start_time() == None:95			return False96		return True97	def start(self, duration_seconds:float=None) -> None:98		"""99		Sets the timer to zero.100		"""101		self.timer_start_time = self.current_time()102		if duration_seconds != None:103			self.duration_seconds = duration_seconds104		if self.duration_seconds == None:105			_print._indented( 'class timer', 'start()','Warning: A count down duration has not been set.')106	def elapsed_seconds(self) -> float:107		"""108		Seconds since last reset.109		"""110		return self.current_time_posix() -  self.start_time_posix()111	def remaining_seconds(self) -> float:112		"""113		Seconds remaining. 
A negative number is seconds past timer end.114		"""115		return self.duration_seconds - self.elapsed_seconds()116	def wait(self, seconds:float) -> None:117		"""118		Waits the duration provided.119		"""120		self.sleep(seconds)121	def wait_till_done(self) -> None:122		"""123		Waits till the count down reaches zero.124		"""125		self.wait(self.remaining_seconds())126	def check_done(self) -> bool:127		"""128		Checks if the count down has reached zero.129		"""130		if self.remaining_seconds() > 0:131			return False132		else:133			self.started_utc_now = None134			return True135class _web:136	import requests137	def __init__(self, random_headers:bool=True, time_out:tuple=(10,10), url_update_interval:float=60):138		"""139		Time out (x,y):140			x - seconds to wait for a response.141			y - seconds to wait for the content.142		"""143		self.use_random_headers = random_headers144		self.time_out = time_out145		self.url_update_interval = url_update_interval146		self.urltimers = {}147		self.timer = timer(url_update_interval)148		self.previous_user_agent_index = 0149		self.user_agents = [150			"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Safari/605.1.15",151			"Mozilla/5.0 (iPhone; CPU iPhone OS 14_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Mobile/15E148 Safari/604.1",152			"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36",153			"Mozilla/5.0 (iPhone; CPU iPhone OS 14_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/87.0.4280.77 Mobile/15E148 Safari/604.1",154			"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/17.17134",155			"Mozilla/5.0 (X11; CrOS x86_64 13310.93.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.133 Safari/537.36",156			]157		self.header = {158			"Accept": 
"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",159			"Accept-Encoding": "gzip, deflate",160			"Accept-Language": "en-ca",161			"Upgrade-Insecure-Requests": "1",162			"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Safari/605.1.15"163			}164	def _get_random_user_agent(self) -> str:165		"""166		Returns a random user agent.167		"""168		num = self.timer.current_time_posix()169		num = int( ( num - int(num) ) * 10**6 )170		idx = num % len(self.user_agents)171		if idx == self.previous_user_agent_index :172			idx = (num + 1) % len(self.user_agents)173			self.previous_user_agent_index = idx174		return self.user_agents[idx]175	def _get_header(self) -> dict:176		"""177		Returns a header with a random user agent.178		"""179		if self.use_random_headers:180			self.header["User-Agent"] = self._get_random_user_agent()181		return self.header182	def _get_website(self, url:str, header:dict=None) -> str:183		'''184		Returns the contents website.185		If the url update interval has not been completed 'None' is returned.186		'''187		# Don't access website too often188		if self.urltimers.get(url) == None:189			self.urltimers[url] = timer(self.url_update_interval)190		else:191			if not self.urltimers[url].check_done():192				# _print._indented('class _web', '_get_website('+url+', '+str(type(header))+')', 193				# 	'if not self.urltimers[url].check_done()')194				return None195		# Asign header196		if header == None:197			header =self._get_header()198	199		try:200			website = self.requests.get( url, timeout=self.time_out, headers=header)201		except:202			_print._indented('class _web', '_get_website('+url+', '+type(header)+')', 203				'website = self.requests.get('+url+', '+str(self.time_out)+','+type(header)+')')204			return None205		if not website.ok:206			_print._indented('class _web', '_get_website('+url+', '+ str(type(header))+')','not website.ok',website.text[:200])207		
self.urltimers[url].start()208		return str(website.text)209class equity:210	from datetime import datetime211	from datetime import timezone212	import html213	class market_watch:214		"""215		Stock data scrapped from MarketWatch.216			market_watch(random_headers:bool=True, update_delay_seconds:int=60)217			Values:218				name                   :  tesla inc.219				symbol                 :  tsla220				currency               :  usd221				price                  :  702.00222				price_change           :  7.22223				price_change_percent   :  1.04%224				price_time_iso         :  2020-12-31-10:24-05:00225				exchange               :  nasdaq226				parsely-tags           :  pagetype: quotes, us:tsla, stock, us: u.s.: nasdaq, nas, page-overview227				chartingsymbol         :  stock/us/xnas/tsla228				instrumenttype         :  stock229				exchangecountry        :  us230				exchangeiso            :  xnas231			Example:232				from  modules.web_scrapers import equity233				scraper = equity.market_watch(random_headers=True)234				stock = scraper.get_stock('aapl')235				for key in stock:236					print(key.ljust(20),' : ',stock[key])237				del scraper238				del stock239				import modules.web_scrapers as scrapers240				scraper = scrapers.equity.market_watch(random_headers=True)241				stock = scraper.get_stock('avst', 'uk')242				for key in stock:243					print(key.ljust(20),' : ',stock[key])244		"""245		def __init__(self, random_headers:bool=True, update_delay_seconds:int=60):246			"""247			Use Market Watch to get stock information.248				When random_headers is True a header is chosen randomly and used with each website access.249				To reduce load on the website server update_delay_seconds prevents access using the250				same  symbol if the specified number of seconds has not elapsed since the last access.251			"""252			web = _web(random_headers=random_headers,  url_update_interval=update_delay_seconds)253			self.get_website = web._get_website254			self.url = 
'https://www.marketwatch.com/investing/stock/'255			self.stock_list = {}256		def get_stock(self, symbol:str, country:str=None, header:dict=None) -> dict:257			"""258			Get the latest information on the stock symbol string.259				The optional country code defaults to US.260					CA, AU, FR, DE, HK, IT, JP, NL, NZ, NO, ZA, ES, SE, CH, UK261				Returns a dictionatry with keys:262					parsely-tags, chartingsymbol, instrumenttype, exchangecountry, exchangeiso,263					quotetime, name, symbol, currency, price, price_change, price_change_percent,264					price_time_iso, exchange265					Optional:266						Uses the supplied browser header.267						If a header is not provided then a random one gets used.268				Example:269					from  modules.web_scrapers import equity270					scraper = equity.market_watch(random_headers=True)271					stock = scraper.get_stock('aapl')272					for key in stock:273						print(key.ljust(20),' : ',stock[key])274					import modules.web_scrapers as scrapers275					scraper = scrapers.equity.market_watch(random_headers=True)276			"""277			symbol = symbol.casefold()278			if country != None:279				country = country.casefold()280			ticker_symbol = symbol281			url = self.url + symbol282			if country != None:283				url= url + '?countrycode=' + country284			# Website text285			html = self.get_website(url, header=header)286			if html == None:287				if self.stock_list[url] != None:288					return self.stock_list[url]289				else:290					_print._indented('class equity', 'market_watch', 'get_stock(' + symbol + ', ' + country + ', ' + str(type(header)+')',291					'html = self.get_website(' + url + ',' + 'header=' + str(type(header)) + ').casefold()',292					'if html == None:') )293				return None294			html = html.casefold()295			# Start parsing296			data_string = html[ html.find('<meta name=\"parsely-tags\"') : ]297			data_string = data_string[ : data_string.find('<meta name=\"description\"') ]298			lines = data_string.split('<meta name=')299			stock = {}300			for line in 
lines:301				if line != '':302					data = line.split('\"')303					key = data[1]304					value =  data[3].replace('	', '') # remove the tab305					value = equity.html.unescape(value)306					stock[key] = value307			# Test we have the expected keys308			if stock.get('tickersymbol') == None:309				_print._indented('market_watch', str('get_stock(' + symbol + ', ' + country + ', ' + str(type(header)) + ')'), 310				'stock.get(\'tickersymbol\') == None' )311				return None312			elif stock['tickersymbol'] != symbol:313				_print._indented('market_watch', str('get_stock(' + symbol + ', ' + country + ', ' + str(type(header)) + ')'), 314					'elif stock[\'tickersymbol\'] = ' + stock['tickersymbol'] + '!= '+symbol)315				return None316			# Price317			# Stock price with only digits and a period318			price =stock.pop('price')319			try:320				x = float(price)321			except:322				for c in range(0,len(price)):323					if not price[0].isdigit():324						price = price[1:]325					if not price[-1].isdigit():326						price = price[:-1]327				price = price.replace(',','')328			329			# ISO Date Time Format330			# From:331			# 	jan 12, 2021 4:35 p.m. 
gmt332			# 	(utc+00:00) dublin, edinburgh, lisbon, london333			# To:334			# 	2021-01-12-16:35-05:00335			tz = stock.pop('exchangetimezone')336			tz = tz.split('(')[1].split(')')[0][-6:]337			qt = stock['quotetime']338			qt = qt[:qt.find('.m.')] + 'm '339			qt = qt + tz340			time_iso = equity.datetime.strptime(qt, '%b %d, %Y %I:%M %p %z')341			time_iso = str(time_iso.replace(microsecond=0).astimezone(equity.timezone.utc).isoformat(' '))342			# Matching keys with data from other sources343			stock['name']=stock.pop('name')344			stock['symbol']=stock.pop('tickersymbol')345			stock['currency']=stock.pop('pricecurrency')346			stock['price'] = price347			stock['price_change']=stock.pop('pricechange')348			stock['price_change_percent']=stock.pop('pricechangepercent')349			stock['exchange'] = stock.pop('exchange')350			stock['price_time_iso'] = time_iso351			stock['quotetime'] = stock.pop('quotetime')352			self.stock_list[url] = stock353			return stock354	class yahoo_finance:355		"""356		Stock data scrapped from Yahoo Finance357			yahoo_finance(random_headers:bool=True, update_delay_seconds:int=60)358			Values:359				name                             :  tesla, inc.360				symbol                           :  tsla361				currency                         :  usd362				price                            :  701.20363				price_change                     :  6.42364				price_change_percent             :  0.92%365				price_time_iso                   :  2020-12-31-10:33-05:00366				exchange                         :  nasdaqgs367				sourceinterval                   :  15368				quotesourcename                  :  nasdaq real time price369				regularmarketopen                :  {'raw': 699.99, 'fmt': '699.99'}370				regularmarkettime                :  {'raw': 1609428782, 'fmt': '10:33am est'}371				fiftytwoweekrange                :  {'raw': '70.102 - 703.7399', 'fmt': '70.10 - 703.74'}372				sharesoutstanding                :  {'raw': 947900992, 'fmt': '947.901m', 'longfmt': 
'947,900,992'}373				regularmarketdayhigh             :  {'raw': 703.7399, 'fmt': '703.74'}374				longname                         :  tesla, inc.375				exchangetimezonename             :  america\u002fnew_york376				regularmarketpreviousclose       :  {'raw': 694.78, 'fmt': '694.78'}377				fiftytwoweekhighchange           :  {'raw': -2.539917, 'fmt': '-2.54'}378				exchangetimezoneshortname        :  est379				fiftytwoweeklowchange            :  {'raw': 631.098, 'fmt': '631.10'}380				exchangedatadelayedby            :  0381				regularmarketdaylow              :  {'raw': 691.13, 'fmt': '691.13'}382				pricehint                        :  2383				regularmarketvolume              :  {'raw': 11294432, 'fmt': '11.294m', 'longfmt': '11,294,432'}384				isloading                        :  False385				triggerable                      :  True386				firsttradedatemilliseconds       :  1277818200000387				region                           :  ca388				marketstate                      :  regular389				marketcap                        :  {'raw': 664668209152, 'fmt': '664.668b', 'longfmt': '664,668,209,152'}390				quotetype                        :  equity391				invalid                          :  False392				language                         :  en-ca393				fiftytwoweeklowchangepercent     :  {'raw': 9.002568, 'fmt': '900.26%'}394				regularmarketdayrange            :  {'raw': '691.13 - 703.7399', 'fmt': '691.13 - 703.74'}395				messageboardid                   :  finmb_27444752396				fiftytwoweekhigh                 :  {'raw': 703.7399, 'fmt': '703.74'}397				fiftytwoweekhighchangepercent    :  {'raw': -0.00360917, 'fmt': '-0.36%'}398				uuid                             :  ec367bc4-f92c-397c-ac81-bf7b43cffaf7399				market                           :  us_market400				fiftytwoweeklow                  :  {'raw': 70.102, 'fmt': '70.10'}401				tradeable                        :  False402			Example:403				from  modules.web_scrapers import equity404				scraper = 
equity.yahoo_finance(random_headers=True)405				stock = scraper.get_stock('aapl')406				for key in stock:407					print(key.ljust(20),' : ',stock[key])408				del scraper409				del stock410				import modules.web_scrapers as scrapers411				scraper = scrapers.equity.yahoo_finance(random_headers=True)412		"""413		def __init__(self,random_headers:bool=True, update_delay_seconds:int=60):414			"""415			Use Yahoo Finance to get stock information.416				When random_headers is True a header is chosen randomly and used with each website access.417				To reduce load on the website server update_delay_seconds prevents access using the418				same  symbol if the specified number of seconds has not elapsed since the last access.419			"""420			web = _web(random_headers=random_headers, url_update_interval=update_delay_seconds)421			self.get_website = web._get_website422			self.host = 'ca.finance.yahoo.com'423			self.url1 = 'https://' + self.host + '/quote/'424			self.url2 = '/sustainability?p='425			self.time_out = (2,4)426			from json import loads427			self.loads = loads428			self.stock_list = {}429			430		def get_stock(self, symbol:str, header:dict=None) -> dict :431			"""432			Get the latest information on the stock symbol string.433				Returns a dictionatry with keys:434					name,		symbol, 	currency, exchange,435					price, 	price_change,		price_change_percent, price_time_iso,436					regularmarketpreviousclose : {'raw':, 'fmt':},437					regularmarketopen : {'raw':, 'fmt':},438					regularmarketdaylow : {'raw':, 'fmt':},439					regularmarketdayhigh : {'raw':, 'fmt':},440					regularmarketdayrange : {'raw':, 'fmt':},441					regularmarketvolume : {'raw':, 'fmt':},442					regularmarkettime : {'raw':, 'fmt':},443					fiftytwoweekrange : {'raw':, 'fmt':},444					fiftytwoweeklow : {'raw':, 'fmt':},445					fiftytwoweeklowchange : {'raw':, 'fmt':},446					fiftytwoweeklowchangepercent : {'raw':, 'fmt':},447					fiftytwoweekhigh : {'raw':, 'fmt':},448					fiftytwoweekhighchange : {'raw':, 
'fmt':},449					fiftytwoweekhighchangepercent : {'raw':, 'fmt':},450					sharesoutstanding : {'raw':, 'fmt':},451					marketcap : {'raw':, 'fmt':},452					sourceinterval,453					quotesourcename,454					sharesoutstanding,455					longname,456					exchangetimezonename,457					exchangetimezoneshortname,458					exchangedatadelayedby,459					pricehint,460					isloading,461					triggerable,462					firsttradedatemilliseconds,463					region,464					marketstate,465					marketcap,466					quotetype,	invalid,467					language,468					messageboardid,469					uuid,470					market,471					tradeable472				Optional:473					Uses the supplied browser header.474					If a header is not provided then a random one gets used.475					Example:476						import yahoo_finance as yf477						yf.yahoo_finance().get_stock('tsla')478						yf.yahoo_finance().get_stock('avst.l')479			"""480			symbol = symbol.casefold()481			ticker_symbol = symbol482			url = self.url1 + ticker_symbol + self.url2 + ticker_symbol483			# Website html484			html = self.get_website(url, header=header).casefold()485			486			if html == None:487				if self.stock_list[url] != None:488					return self.stock_list[url]489				else:490					_print._indented('class equity', 'yahoo_finance', 'get_stock(' + symbol + ', ' + str(type(header) + ')',491					'html = self.get_website(' + url + ',' + 'header=' + str(type(header)) + ').casefold()',492					'if html == None:') )493				return None494			data_string1 = html[ html.find('\"quotedata\":{'): ]495			data_string2 = data_string1[:data_string1.find(',\"mktmdata\"')]496			dic_str = data_string2[data_string2.find('{'):]497			try:498				stock = self.loads(dic_str)499			except:500				_print._indented('yahoo_finance', 'get_stock(' + symbol + ', ' +str(type(header)) + ')', 501					'try','stock = self.loads(dic_str)', 'data_string1', data_string1, 'data_string2', data_string2, 502					'dic_str', dic_str, 'html[:100]', html[:100])503				return None504			del html505			del dic_str506			# Test we have the 
expected keys507			if stock.get(symbol) == None:508				_print._indented('yahoo_finance', 'get_stock('+symbol+', '+str(type(header))+')', 509					'stock.get(\''+symbol+'\') == None', stock)510				return None511			elif stock[symbol]['symbol'] != symbol:512				_print._indented('yahoo_finance', 'get_stock('+symbol+', '+str(type(header))+')', 513					'elif stock[symbol][\'symbol\'] != symbol:', stock[symbol]['symbol']+' != '+symbol)514				return None515			stock = stock[symbol]516			# ISO Format517			# 2020-12-30-16:00-05:00518			lt = int( stock['regularmarkettime']['raw'] )519			dt_iso = equity.datetime.fromtimestamp(lt,tz=equity.timezone.utc).isoformat(' ')520			# Matching keys with data from other sources521			stock['name']=stock.pop('shortname')522			stock['symbol']=stock.pop('symbol')523			stock['currency']=stock.pop('currency')524			stock['price']=str(stock.pop('regularmarketprice')['raw'])525			stock['price_change']=stock.pop('regularmarketchange')['fmt']526			stock['price_change_percent']=stock.pop('regularmarketchangepercent')['fmt']527			stock.pop('exchange')528			stock['exchange'] = stock.pop('fullexchangename')529			stock['price_time_iso'] = dt_iso530			stock['regularmarkettime'] = stock.pop('regularmarkettime')531			self.stock_list[url] = stock532			return stock533class crypto:534	class business_insider:535		"""536		Cryptocurrency scraped from Business Insider.537			business_insider(random_headers:bool=True, update_delay_seconds:int=60)538			Dictionary keys:539				name             :  bitcoin540				price            :  38966.5195541				change           :  -412.53542				change_percent   :  -1.05543				market_cap       :  704230000000544				circulating      :  18072712545				volume           :  18880000000546				utc_iso          :  2021-01-08-18:02:20.067122+00:00547			Example:548				from  modules.web_scrapers import crypto549				scraper = crypto.business_insider(random_headers=True)550				crypto = scraper.get_crypto('btc-usd')551				for key in crypto:552					
print(key.ljust(15),' : ',crypto[key])553				import modules.web_scrapers as scrapers554				scraper = scrapers.crypto.business_insider()555		"""556		def __init__(self,random_headers:bool=True, update_delay_seconds:int=60):557			"""558			Use Market Watch to get stock information.559			When random_headers is True a header is chosen randomly and used with each website access.560			To reduce load on the website server update_delay_seconds prevents access using the561			same  symbol if the specified number of seconds has not elapsed since the last access.562			"""563			self.web = _web(random_headers=random_headers, url_update_interval=update_delay_seconds)564			self.get_website = self.web._get_website565			# https://markets.businessinsider.com/cryptocurrencies566			# https://markets.businessinsider.com/currencies/btc-usd567			self.url_crypto_list = 'https://markets.businessinsider.com/cryptocurrencies'568			self.time_out = (4,4)569			self.crypto_value_sections_list = []570			self.values_dict = { 'symbol':0, 'name':1 , 'price':4, 'change':7,571													'change_percent':10, 'market_cap':11, 'circulating':12, 572													'volume':13, 'utc_iso':-1 573												}574			self.most_active_cryptos = {}575			self.most_active_cryptos_limit = 100576			self.timer = timer(update_delay_seconds)577			self.update_delay_seconds = update_delay_seconds578		def _parse_value(self,section_index:int) -> str:579			if section_index == -1: # utc_iso key580				return self.timer.start_time_iso()581			value_section = self.crypto_value_sections_list[section_index]582			value_list = value_section.split('<')583			value = value_list[0]584			del value_list585			if section_index == self.values_dict['symbol']:586				symbol_section_list = value.split('\"')587				symbol_link = symbol_section_list[1]588				del symbol_section_list589				symbol_link_sections = symbol_link.split('/')590				return symbol_link_sections[2]591			if section_index > self.values_dict['name']:592				if value[-1] == 'b':593					
return str( int(float(value[:-1]) * 10**9) )594				if value[-1] == 'm':595					return str( float(value[:-1]) * 10**6 )596			return value597		def get_most_active_cryptos(self, limit:int, header:dict=None, show_warnings:bool=False) -> dict:598			"""599			Get the latest information on the most active cryptocuttencies.600				Returns a dictionatry with keys:601					'name' : {'symbol': , 'price':, 'change':, 'change_percent':, 'market_cap':,602											'circulating':, 'volume':}603					'bitcoin' : {'symbol':'tbc-usd', price':'34164.5391', 'change':'2105.41',604											'change_percent':'6.57', 'market_cap':'617450000000',605											'circulating':'18072712', 'volume':'18880000000'}606					Optional:607						Uses the supplied browser header.608				Example:609					from  modules.web_scrapers import crypto610					scraper = crypto.business_insider(random_headers=True)611					crypto = scraper.get_crypto('btc-usd')612					for key in crypto:613						print(key.ljust(15),' : ',crypto[key])614					import modules.web_scrapers as scrapers615					scraper = scrapers.crypto.business_insider()616					crypto = scraper.get_crypto('eth-usd')617			"""618			self.most_active_cryptos_limit = limit619			html = self.get_website(self.url_crypto_list, header=header)620			if html == None:621				return self.most_active_cryptos622			self.timer.start()623			html = html.casefold()624			# Get cryptos section625			html_sections_list = html.split('<tbody class="table__tbody">')626			del html627			if show_warnings:628				if len(html_sections_list) != 4:629					_print._indented('crypto', 'business_insider', 'get_most_active_cryptos('+str(limit)+', '+type(header)+', '+ str(show_warnings)+')',630						'HTML sections warning', 'Website data not as expected.',631						'4 sections expected, have ',len(html_sections_list))632			cryptos_section = html_sections_list[1]633			del html_sections_list634			cryptos_section = cryptos_section.replace(',','').replace(' %','')635			# Get crypto sections636			
crypto_sections_list = cryptos_section.split('<a')637			del cryptos_section638			number_crypto_sections = len(crypto_sections_list) - 1639			if number_crypto_sections < limit:640				limit = number_crypto_sections641			limit +=1642			# Get crypto sections symbols, values643			for crypto_section in crypto_sections_list[1:limit]:644				self.crypto_value_sections_list = crypto_section.split('">')645				if show_warnings :646					if len(self.crypto_value_sections_list) != 18:647						_print._indented('\nValues sections warning:\n\tWebsite data not as expected.')648						_print._indented('\t18 sections expected, have ',len(self.crypto_value_sections_list))649				for key in self.values_dict:650					if key == 'symbol':651						symbol = self._parse_value(self.values_dict[key] )652						self.most_active_cryptos[symbol] = {}653					else:654						self.most_active_cryptos[symbol][key] = self._parse_value(self.values_dict[key] )655			return self.most_active_cryptos656		def find_symbol_for(self,limit:int, name:str) -> list:657			"""658			Returns a list of symbols from the last most active crypto currency data set659			that match in part or whole the name supplied.660			"""661			self.most_active_cryptos_limit = limit662			self.most_active_cryptos = self.get_most_active_cryptos(self.most_active_cryptos_limit)663			list = []664			for symbol in self.most_active_cryptos :665				crypto_name = self.most_active_cryptos[symbol]['name']666				if crypto_name.find(name) >= 0:667					list.append((symbol, crypto_name))668			return list669		def get_price_for(self,symbol:str) -> str:670			"""671			Returns the price as a string.672			"""673			self.most_active_cryptos = self.get_most_active_cryptos(self.most_active_cryptos_limit)674			return self.most_active_cryptos[symbol]['price']675		def get_crypto(self,symbol:str) -> dict:676			"""677			Returns the dictionary of a crypto symbol.678			"""679			self.most_active_cryptos = self.get_most_active_cryptos(self.most_active_cryptos_limit)680			return 
self.most_active_cryptos[symbol]
# NOTE: the line above is the tail of a definition that starts before this
# excerpt; it is kept verbatim.

# timer and web
if __name__ == '__main__':
	# Demo: fetch a page through `_web` while exercising the `timer` helpers.
	print('\ntimer and web')
	print('=============')
	# input('Press enter to continue.')
	w = _web(random_headers=True, url_update_interval=3)
	x = timer(5)
	print('Getting a website:\n', w._get_website('http://httpbin.org/headers'))
	x.start()
	print('Timer started :', x.started())
	x.wait(1)
	print('Time elapsed: ', x.elapsed_seconds())
	print('\nGetting the website again:', w._get_website('http://httpbin.org/headers'))
	print('\nTime remaining: ', x.remaining_seconds())
	print('The timer is done (maybe):', x.check_done())
	print('Waiting till done.')
	x.wait_till_done()
	print('Timer done:', x.check_done())
	print('Lets try getting the website again:\n', w._get_website('http://httpbin.org/headers'))
	print('Timer started at: ', x.start_time_iso())
	print('Current time:     ', x.current_time_iso())
	print('POSIX time:       ', x.current_time_posix())

# market_watch
if __name__ == '__main__':
	print('\nmarket_watch')
	print('============')
	# input('Press enter to continue.')

	def example(symbol, country=None, random_headers=False):
		"""
		Print every key/value pair returned by equity.market_watch().get_stock().

		symbol          ticker passed to get_stock()
		country         optional second argument passed to get_stock()
		random_headers  when true, the scraper is built with random_headers=True
		"""
		if random_headers:
			site = equity.market_watch(random_headers=True)
		else:
			site = equity.market_watch()
		stock = site.get_stock(symbol, country)
		print()
		for key in stock:
			# Pad so the ' : ' separators line up; a non-positive count
			# yields an empty pad, exactly like the loop it replaces.
			pad = ' ' * (20 - len(key))
			print(key, pad, ' : ', stock[key])
		print()

	print("\nexample('avst', 'uk', random_headers=False)")
	print('------------------------------------------')
	example('avst', 'uk', random_headers=False)
	print("\nexample('aapl',random_headers=True)")
	print('-----------------------------------')
	example('aapl', random_headers=True)

# yahoo_finance
if __name__ == '__main__':
	print('\nyahoo_finance')
	print('=============')
	# input('Press enter to continue.')

	def example(symbol, random_headers=False):
		"""
		Print every key/value pair returned by equity.yahoo_finance().get_stock().

		symbol          ticker passed to get_stock()
		random_headers  when true, the scraper is built with random_headers=True
		"""
		if random_headers:
			site = equity.yahoo_finance(random_headers=True)
		else:
			site = equity.yahoo_finance()
		stock = site.get_stock(symbol)
		print()
		for key in stock:
			# Pad so the ' : ' separators line up.
			pad = ' ' * (20 - len(key))
			print(key, pad, ' : ', stock[key])
		print()

	print("\nexample('avst.l', random_headers=False)")
	print('--------------------------------------')
	example('avst.l', random_headers=False)
	print("\nexample('aapl',random_headers=True)")
	print('-----------------------------------')
	example('aapl', random_headers=True)

# business_insider
if __name__ == '__main__':
	print('\nbusiness_insider')
	print('================')
	# input('Press enter to continue.')

	def example(limit:int, random_headers=False, find:str=None):
		"""
		Print the rows returned by crypto.business_insider().get_most_active_cryptos().

		limit           passed to get_most_active_cryptos() and find_symbol_for()
		random_headers  when true, the scraper is built with random_headers=True
		find            optional search text; when given, find_symbol_for() is
		                called and the price of the first match is printed
		"""
		# (field, width) pairs that are right-aligned for display.
		right_aligned = (
			('price', 10),
			('change', 8),
			('change_percent', 8),
			('market_cap', 12),
			('circulating', 12),
			('volume', 12),
		)
		print()
		print('Example limit: '+ str(limit))
		if random_headers:
			source = crypto.business_insider(random_headers=True)
		else:
			source = crypto.business_insider()
		cryptos = source.get_most_active_cryptos(limit=limit)
		found = None
		if find is not None:
			found = source.find_symbol_for(limit, find)
		for key in cryptos:
			# Pad a copy for display only, so the rows handed back by the
			# scraper are not modified (they may be shared with its own
			# state - NOTE(review): confirm against the scraper class).
			row = dict(cryptos[key])
			row['name'] = row['name'].ljust(20)
			for field, width in right_aligned:
				row[field] = row[field].rjust(width)
			print(key.rjust(10)+' :', row)
		if find is not None:
			first_symbol = found[0][0].strip()
			print('\nSearched for: '+find+'\nFound: ', found)
			print('First symbol found is ' + first_symbol + ' for ' + found[0][1].strip() +'.')
			print('The price is $', source.get_price_for(first_symbol))
		print()
		print('Note: In this example the fields have been padded with spaces.')
		print('      The actual data does not have padding.')
		print()

	print("\nexample(4,random_headers=True)")
	print('------------------------------')
	example(4, random_headers=True)
	print("\nexample(limit=4,find='it',random_headers=True))")
	print('-----------------------------------------------')
	# ... (the web_scrapers.py listing is truncated at this point)

# file_loader.py
Source:file_loader.py  
# NOTE(review): `token` is not used anywhere in the visible code, and lib2to3
# was removed from the standard library in Python 3.13 - confirm and drop.
from lib2to3.pgen2 import token
import os
import pathlib
import requests
from datetime import datetime
import time

PATH_TO_DIR = './data'
SAVED_FILENAME_PATH = 'file_list.txt'
# URL = 'http://192.168.0.1:8001/loader/upload'
URL = 'http://127.0.0.1:8000/loader/upload'
# NOTE(review): credential committed in source; consider loading it from the
# environment or a config file instead.
TOKEN = 'fqGuqZMmWSBAeTIawQXrQvls4od0uDhiZa8bJcGV9GI'
DEVICE_ID = 5
UPDATE_DELAY_SECONDS = 60*10


def create_file_if_not_exists(SAVED_FILENAME_PATH):
    """Make sure the bookkeeping file exists, creating an empty one if needed.

    Prints whether the file was already there or has just been created.
    """
    path = SAVED_FILENAME_PATH
    try:
        with open(path, 'r'):
            pass
        print('%s is exists' % (path))
    except IOError:
        with open(path, 'w'):
            pass
        print('%s is created' % (path))


def read_saved_filenames(path_to_filename):
    """Return the lines of *path_to_filename* as a list, newlines stripped."""
    with open(path_to_filename, 'r') as f:
        return [line.rstrip('\n') for line in f]


def save_filenames(files, path_to_filename):
    """Overwrite *path_to_filename* with one entry of *files* per line."""
    with open(path_to_filename, 'w') as f:
        f.writelines(str(s) + '\n' for s in files)


def get_files(PATH_TO_DIR, SAVED_FILENAME):
    """Return the paths (as strings) of all files under PATH_TO_DIR matching '**/*.*'.

    The bookkeeping file itself is excluded, whether it shows up under its
    bare name or as a file directly inside PATH_TO_DIR.
    """
    base = pathlib.Path(PATH_TO_DIR)
    # Directories whose name contains a dot also match '*.*'; they cannot be
    # uploaded, so keep regular files only.
    files = [str(p) for p in base.glob('**/*.*') if p.is_file()]
    print(files)
    if SAVED_FILENAME in files:
        print('removing file from list', SAVED_FILENAME)
        files.remove(SAVED_FILENAME)
    # Build the in-directory path with pathlib so it is normalised the same
    # way as the glob results ('./data' + '/' + name never equals 'data/name').
    saved_in_dir = str(base / SAVED_FILENAME)
    if saved_in_dir in files:
        print('removing file from list 2', SAVED_FILENAME)
        files.remove(saved_in_dir)
    return files


def send_data(URL, file, values):
    """POST *file* (a requests ``files`` mapping) and form *values* to URL.

    Returns the ``requests`` response object.
    """
    r = requests.post(URL, files=file, data=values, cookies={'def': 'defvalue'})
    return r


def main():
    """Upload every not-yet-uploaded file and record each success.

    A file is recorded in SAVED_FILENAME_PATH only after the server answers
    with HTTP 200, so failed uploads are retried on the next run.
    """
    create_file_if_not_exists(SAVED_FILENAME_PATH)
    files = get_files(PATH_TO_DIR, SAVED_FILENAME_PATH)
    saved_files = read_saved_filenames(SAVED_FILENAME_PATH)
    print(files)
    print('saved', saved_files)
    for filename in files:
        # Skip files that were already uploaded, but keep going: stopping at
        # the first known file would leave later new files unsent.
        if filename in saved_files:
            continue
        try:
            date_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            values = {
                'date': date_time_str,
                'token': TOKEN,
                'device': DEVICE_ID,
            }
            print('sending', filename)
            # Binary mode so non-text files upload unchanged; the context
            # manager closes the handle once the request has completed.
            with open(filename, 'rb') as fh:
                response = send_data(URL, {filename: fh}, values)
            print(response.status_code)
            if response.status_code == 200:
                saved_files.append(filename)
                save_filenames(saved_files, SAVED_FILENAME_PATH)
        except Exception as e:
            # Best effort: report the failure and move on to the next file.
            print(e)


def loop():
    """Run main() forever, sleeping UPDATE_DELAY_SECONDS between passes."""
    while True:
        print('running')
        main()
        time.sleep(UPDATE_DELAY_SECONDS)


if __name__ == "__main__":
    ...  # the entry-point body is truncated in this listing

# tasks.py
Source:tasks.py  
import logging

from django.conf import settings
from huey.contrib.djhuey import db_task

from p2kitchen import slack
from p2kitchen.models import Brew

logger = logging.getLogger(__name__)

# Delay, in seconds, passed to update_progress.schedule().
UPDATE_DELAY_SECONDS = 3


@db_task()
def start_brewing(brew: Brew):
    """Post the brew's "started" message to Slack and schedule a progress update.

    The channel and timestamp from the Slack response are stored on the brew
    and saved before update_progress is scheduled.
    """
    logger.debug("Started brewing")
    started = brew.started_message()
    reply = slack.chat_post_message(settings.SLACK_CHANNEL, **started)
    brew.slack_channel = reply["channel"]
    brew.slack_ts = reply["ts"]
    brew.save()
    update_progress.schedule(args=(brew.pk,), delay=UPDATE_DELAY_SECONDS)


@db_task()
def update_progress(brew_pk):
    """Refresh the Slack message for the brew with primary key *brew_pk*.

    Only the part of this function visible in the listing is reproduced; the
    listing is cut off after the final chat_update call.
    """
    logger.debug("Updating brew progress")
    try:
        brew = Brew.objects.get(pk=brew_pk)
    except Brew.DoesNotExist:
        logger.error(f"Critical error! Brew {brew_pk} doesn't exist.")
        return

    # Finished brew: replace the message with the final one and stop.
    if brew.status == Brew.Status.FINISHED.value:
        finished = brew.finished_message()
        slack.chat_update(brew.slack_channel, brew.slack_ts, **finished)
        return

    # Invalid brew: remove the Slack message and stop.
    if brew.status == Brew.Status.INVALID.value:
        slack.chat_delete(brew.slack_channel, brew.slack_ts)
        return

    progress = brew.update_message()
    slack.chat_update(brew.slack_channel, brew.slack_ts, **progress)
    # Keep updating progress
    # ... (the tasks.py listing is truncated at this point)

# --- Page text that follows the listing (not part of tasks.py) ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
