How to use update_delay_seconds method in localstack

Best Python code snippet using localstack_python

web_scrapers.py

Source:web_scrapers.py Github

copy

Full Screen

class _print:
    """Console helpers used for this module's diagnostic messages."""

    def __init__(self):
        pass

    @staticmethod
    def _abc(dictionary, indent=''):
        """
        Prints a dictionary one key per line, recursing into nested
        dictionaries with a deeper indent.
        """
        print(indent, type(dictionary), end='')
        for key in dictionary:
            space = indent + ' '
            print('\n', space, key, end='')
            space += ' '
            if isinstance(dictionary[key], dict):
                _print._abc(dictionary=dictionary[key], indent=space)
            else:
                print('\n', space, dictionary[key])

    @staticmethod
    def _indented(*args, indent=''):
        """
        Prints 'web_scrapers' followed by each argument on its own line,
        indenting one extra space per argument. Dictionaries are printed
        with _abc().
        """
        print('web_scrapers', end='')
        space = indent + ' '
        for arg in args:
            space = space + ' '
            if isinstance(arg, dict):
                _print._abc(dictionary=arg, indent=space)
            else:
                print(':\n', space, arg, end='')
        print('\n')


class timer:
    """
    A timer that can be used to determine elapsed time.
    timer(duration_seconds:float)
    Example:
        from modules.web_scrapers import timer
        x = timer(2)
        x.start()
        print('Timer started :', x.started())
        x.wait(1)
        print('Time elapsed: ', x.elapsed_seconds())
        print('Time remaining: ', x.remaining_seconds())
        print('The timer is done (maybe):', x.check_done())
        print('Waiting till done.')
        x.wait_till_done()
        print('Timer done:', x.check_done())

        import modules.web_scrapers as scrapers
        x = scrapers.timer(2)
    """
    from datetime import datetime
    from datetime import timezone
    from time import sleep
    _utc = timezone.utc
    _timer = datetime

    def __init__(self, duration_seconds: float):
        # Same attribute name that start() and start_time() use, so that
        # started() works before the first call to start().
        self.timer_start_time = None
        self.duration_seconds = duration_seconds

    def current_time(self) -> datetime:
        """
        Returns the current UTC time as a timezone-aware datetime object.
        """
        return self._timer.now(self._utc)

    def current_time_posix(self) -> float:
        """
        Returns the current POSIX time in seconds.
        """
        return self.current_time().timestamp()

    def current_time_iso(self) -> str:
        """
        Returns the current UTC time in ISO format, without microseconds.
        2021-01-11 19:30:00+00:00
        """
        return self.current_time().replace(microsecond=0).isoformat(' ')

    def start_time(self) -> datetime:
        """
        Returns the timer start time as a datetime object,
        or None when start() has not been called.
        """
        return self.timer_start_time

    def start_time_posix(self) -> float:
        """
        Returns the timer start time in POSIX time seconds.
        The timer must have been started.
        """
        return self.start_time().timestamp()

    def start_time_iso(self) -> str:
        """
        Returns the timer start time in ISO format, without microseconds.
        2021-01-11 19:30:00+00:00
        The timer must have been started.
        """
        return self.start_time().replace(microsecond=0).isoformat(' ')

    def started(self) -> bool:
        """
        Returns True if start() has been called, otherwise False.
        """
        return self.start_time() is not None

    def start(self, duration_seconds: float = None) -> None:
        """
        Sets the timer to zero.
        When duration_seconds is given it replaces the current duration.
        """
        self.timer_start_time = self.current_time()
        if duration_seconds is not None:
            self.duration_seconds = duration_seconds
        if self.duration_seconds is None:
            _print._indented('class timer', 'start()', 'Warning: A count down duration has not been set.')

    def elapsed_seconds(self) -> float:
        """
        Seconds since last reset.
        """
        return self.current_time_posix() - self.start_time_posix()

    def remaining_seconds(self) -> float:
        """
        Seconds remaining. A negative number is seconds past timer end.
        """
        return self.duration_seconds - self.elapsed_seconds()

    def wait(self, seconds: float) -> None:
        """
        Waits the duration provided.
        """
        self.sleep(seconds)

    def wait_till_done(self) -> None:
        """
        Waits till the count down reaches zero.
        Returns immediately when the count down has already expired.
        """
        # sleep() rejects negative durations, and remaining_seconds() is
        # negative once the timer has run out.
        self.wait(max(0.0, self.remaining_seconds()))

    def check_done(self) -> bool:
        """
        Checks if the count down has reached zero.
        The timer must have been started.
        """
        return self.remaining_seconds() <= 0


class _web:
    import requests

    def __init__(self, random_headers: bool = True, time_out: tuple = (10, 10), url_update_interval: float = 60):
        """
        Time out (x,y):
            x - seconds to wait for a response.
            y - seconds to wait for the content.
        """
        self.use_random_headers = random_headers
        self.time_out = time_out
        self.url_update_interval = url_update_interval
        self.urltimers = {}
        self.timer = timer(url_update_interval)
        self.previous_user_agent_index = 0
        self.user_agents = [
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Safari/605.1.15",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/87.0.4280.77 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/17.17134",
            "Mozilla/5.0 (X11; CrOS x86_64 13310.93.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.133 Safari/537.36",
        ]
        self.header = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "en-ca",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Safari/605.1.15"
        }

    def _get_random_user_agent(self) -> str:
        """
        Returns a user agent picked from the microseconds of the current
        time, avoiding the one returned by the previous call.
        """
        num = self.timer.current_time_posix()
        num = int((num - int(num)) * 10**6)
        idx = num % len(self.user_agents)
        if idx == self.previous_user_agent_index:
            idx = (num + 1) % len(self.user_agents)
        self.previous_user_agent_index = idx
        return self.user_agents[idx]

    def _get_header(self) -> dict:
        """
        Returns the header, with a random user agent when random headers
        are enabled.
        """
        if self.use_random_headers:
            self.header["User-Agent"] = self._get_random_user_agent()
        return self.header

    def _get_website(self, url: str, header: dict = None) -> str:
        '''
        Returns the contents of the website.
        If the url update interval has not been completed 'None' is returned.
        'None' is also returned when the request itself fails.
        '''
        # Don't access website too often
        url_timer = self.urltimers.get(url)
        if url_timer is None:
            self.urltimers[url] = timer(self.url_update_interval)
        elif url_timer.started() and not url_timer.check_done():
            # A timer that was created but never started belongs to a
            # request that failed, so it must not block a retry.
            return None
        # Assign header
        if header is None:
            header = self._get_header()
        call = '_get_website(' + url + ', ' + str(type(header)) + ')'
        try:
            website = self.requests.get(url, timeout=self.time_out, headers=header)
        except self.requests.RequestException:
            _print._indented('class _web', call,
                             'website = self.requests.get(' + url + ', ' + str(self.time_out) + ',' + str(type(header)) + ')')
            return None
        if not website.ok:
            _print._indented('class _web', call, 'not website.ok', website.text[:200])
        self.urltimers[url].start()
        return str(website.text)


class equity:
    from datetime import datetime
    from datetime import timezone
    import html

    class market_watch:
        """
        Stock data scraped from MarketWatch.
        market_watch(random_headers:bool=True, update_delay_seconds:int=60)
        Values:
            name                 : tesla inc.
            symbol               : tsla
            currency             : usd
            price                : 702.00
            price_change         : 7.22
            price_change_percent : 1.04%
            price_time_iso       : 2020-12-31 15:24:00+00:00
            exchange             : nasdaq
            parsely-tags         : pagetype: quotes, us:tsla, stock, us: u.s.: nasdaq, nas, page-overview
            chartingsymbol       : stock/us/xnas/tsla
            instrumenttype       : stock
            exchangecountry      : us
            exchangeiso          : xnas
        Example:
            from modules.web_scrapers import equity
            scraper = equity.market_watch(random_headers=True)
            stock = scraper.get_stock('aapl')
            for key in stock:
                print(key.ljust(20),' : ',stock[key])
            del scraper
            del stock

            import modules.web_scrapers as scrapers
            scraper = scrapers.equity.market_watch(random_headers=True)
            stock = scraper.get_stock('avst', 'uk')
            for key in stock:
                print(key.ljust(20),' : ',stock[key])
        """

        def __init__(self, random_headers: bool = True, update_delay_seconds: int = 60):
            """
            Use Market Watch to get stock information.
            When random_headers is True a header is chosen randomly and used with each website access.
            To reduce load on the website server update_delay_seconds prevents access using the
            same symbol if the specified number of seconds has not elapsed since the last access.
            """
            web = _web(random_headers=random_headers, url_update_interval=update_delay_seconds)
            self.get_website = web._get_website
            self.url = 'https://www.marketwatch.com/investing/stock/'
            self.stock_list = {}

        @staticmethod
        def _clean_price(price: str) -> str:
            """
            Returns the price with only digits and a period: when the text
            is not already a number, non-digit characters are stripped from
            both ends and thousands separators are removed.
            """
            try:
                float(price)
            except ValueError:
                first, last = 0, len(price)
                while first < last and not price[first].isdigit():
                    first += 1
                while last > first and not price[last - 1].isdigit():
                    last -= 1
                price = price[first:last].replace(',', '')
            return price

        def get_stock(self, symbol: str, country: str = None, header: dict = None) -> dict:
            """
            Get the latest information on the stock symbol string.
            The optional country code defaults to US.
                CA, AU, FR, DE, HK, IT, JP, NL, NZ, NO, ZA, ES, SE, CH, UK
            Returns a dictionary with keys:
                parsely-tags, chartingsymbol, instrumenttype, exchangecountry, exchangeiso,
                quotetime, name, symbol, currency, price, price_change, price_change_percent,
                price_time_iso, exchange
            Returns the previous result for the symbol when the update delay has not
            elapsed, and None when there is no previous result or the page could not
            be parsed.
            Optional:
                Uses the supplied browser header.
                If a header is not provided then a random one gets used.
            Example:
                from modules.web_scrapers import equity
                scraper = equity.market_watch(random_headers=True)
                stock = scraper.get_stock('aapl')
                for key in stock:
                    print(key.ljust(20),' : ',stock[key])

                import modules.web_scrapers as scrapers
                scraper = scrapers.equity.market_watch(random_headers=True)
            """
            symbol = symbol.casefold()
            if country is not None:
                country = country.casefold()
            url = self.url + symbol
            if country is not None:
                url = url + '?countrycode=' + country
            # str(country) because country may be None
            call = 'get_stock(' + symbol + ', ' + str(country) + ', ' + str(type(header)) + ')'
            # Website text
            page = self.get_website(url, header=header)
            if page is None:
                cached = self.stock_list.get(url)
                if cached is not None:
                    return cached
                _print._indented('class equity', 'market_watch', call,
                                 'html = self.get_website(' + url + ',' + 'header=' + str(type(header)) + ')',
                                 'if html == None:')
                return None
            page = page.casefold()
            # Start parsing
            data_string = page[page.find('<meta name=\"parsely-tags\"'):]
            data_string = data_string[:data_string.find('<meta name=\"description\"')]
            stock = {}
            for line in data_string.split('<meta name='):
                data = line.split('\"')
                if len(data) < 4:
                    # Not a name="..." content="..." pair
                    continue
                value = data[3].replace('&#x9;', '')  # remove the tab
                stock[data[1]] = equity.html.unescape(value)
            # Test we have the expected keys
            if stock.get('tickersymbol') is None:
                _print._indented('market_watch', call,
                                 'stock.get(\'tickersymbol\') == None')
                return None
            elif stock['tickersymbol'] != symbol:
                _print._indented('market_watch', call,
                                 'elif stock[\'tickersymbol\'] = ' + stock['tickersymbol'] + '!= ' + symbol)
                return None
            # Price
            # Stock price with only digits and a period
            price = self._clean_price(stock.pop('price'))

            # ISO Date Time Format
            # From:
            #   jan 12, 2021 4:35 p.m. gmt
            #   (utc+00:00) dublin, edinburgh, lisbon, london
            # To:
            #   2021-01-12 16:35:00+00:00
            tz = stock.pop('exchangetimezone')
            tz = tz.split('(')[1].split(')')[0][-6:]
            qt = stock['quotetime']
            qt = qt[:qt.find('.m.')] + 'm '
            qt = qt + tz
            time_iso = equity.datetime.strptime(qt, '%b %d, %Y %I:%M %p %z')
            time_iso = str(time_iso.replace(microsecond=0).astimezone(equity.timezone.utc).isoformat(' '))
            # Matching keys with data from other sources
            stock['name'] = stock.pop('name')
            stock['symbol'] = stock.pop('tickersymbol')
            stock['currency'] = stock.pop('pricecurrency')
            stock['price'] = price
            stock['price_change'] = stock.pop('pricechange')
            stock['price_change_percent'] = stock.pop('pricechangepercent')
            stock['exchange'] = stock.pop('exchange')
            stock['price_time_iso'] = time_iso
            stock['quotetime'] = stock.pop('quotetime')
            self.stock_list[url] = stock
            return stock

    class yahoo_finance:
        """
        Stock data scraped from Yahoo Finance
        yahoo_finance(random_headers:bool=True, update_delay_seconds:int=60)
        Values:
            name                          : tesla, inc.
            symbol                        : tsla
            currency                      : usd
            price                         : 701.20
            price_change                  : 6.42
            price_change_percent          : 0.92%
            price_time_iso                : 2020-12-31 15:33:02+00:00
            exchange                      : nasdaqgs
            sourceinterval                : 15
            quotesourcename               : nasdaq real time price
            regularmarketopen             : {'raw': 699.99, 'fmt': '699.99'}
            regularmarkettime             : {'raw': 1609428782, 'fmt': '10:33am est'}
            fiftytwoweekrange             : {'raw': '70.102 - 703.7399', 'fmt': '70.10 - 703.74'}
            sharesoutstanding             : {'raw': 947900992, 'fmt': '947.901m', 'longfmt': '947,900,992'}
            regularmarketdayhigh          : {'raw': 703.7399, 'fmt': '703.74'}
            longname                      : tesla, inc.
            exchangetimezonename          : america\\u002fnew_york
            regularmarketpreviousclose    : {'raw': 694.78, 'fmt': '694.78'}
            fiftytwoweekhighchange        : {'raw': -2.539917, 'fmt': '-2.54'}
            exchangetimezoneshortname     : est
            fiftytwoweeklowchange         : {'raw': 631.098, 'fmt': '631.10'}
            exchangedatadelayedby         : 0
            regularmarketdaylow           : {'raw': 691.13, 'fmt': '691.13'}
            pricehint                     : 2
            regularmarketvolume           : {'raw': 11294432, 'fmt': '11.294m', 'longfmt': '11,294,432'}
            isloading                     : False
            triggerable                   : True
            firsttradedatemilliseconds    : 1277818200000
            region                        : ca
            marketstate                   : regular
            marketcap                     : {'raw': 664668209152, 'fmt': '664.668b', 'longfmt': '664,668,209,152'}
            quotetype                     : equity
            invalid                       : False
            language                      : en-ca
            fiftytwoweeklowchangepercent  : {'raw': 9.002568, 'fmt': '900.26%'}
            regularmarketdayrange         : {'raw': '691.13 - 703.7399', 'fmt': '691.13 - 703.74'}
            messageboardid                : finmb_27444752
            fiftytwoweekhigh              : {'raw': 703.7399, 'fmt': '703.74'}
            fiftytwoweekhighchangepercent : {'raw': -0.00360917, 'fmt': '-0.36%'}
            uuid                          : ec367bc4-f92c-397c-ac81-bf7b43cffaf7
            market                        : us_market
            fiftytwoweeklow               : {'raw': 70.102, 'fmt': '70.10'}
            tradeable                     : False
        Example:
            from modules.web_scrapers import equity
            scraper = equity.yahoo_finance(random_headers=True)
            stock = scraper.get_stock('aapl')
            for key in stock:
                print(key.ljust(20),' : ',stock[key])
            del scraper
            del stock

            import modules.web_scrapers as scrapers
            scraper = scrapers.equity.yahoo_finance(random_headers=True)
        """

        def __init__(self, random_headers: bool = True, update_delay_seconds: int = 60):
            """
            Use Yahoo Finance to get stock information.
            When random_headers is True a header is chosen randomly and used with each website access.
            To reduce load on the website server update_delay_seconds prevents access using the
            same symbol if the specified number of seconds has not elapsed since the last access.
            """
            web = _web(random_headers=random_headers, url_update_interval=update_delay_seconds)
            self.get_website = web._get_website
            self.host = 'ca.finance.yahoo.com'
            self.url1 = 'https://' + self.host + '/quote/'
            self.url2 = '/sustainability?p='
            self.time_out = (2, 4)
            from json import loads
            self.loads = loads
            self.stock_list = {}

        def get_stock(self, symbol: str, header: dict = None) -> dict:
            """
            Get the latest information on the stock symbol string.
            Returns a dictionary with keys:
                name, symbol, currency, exchange,
                price, price_change, price_change_percent, price_time_iso,
                regularmarketpreviousclose : {'raw':, 'fmt':},
                regularmarketopen : {'raw':, 'fmt':},
                regularmarketdaylow : {'raw':, 'fmt':},
                regularmarketdayhigh : {'raw':, 'fmt':},
                regularmarketdayrange : {'raw':, 'fmt':},
                regularmarketvolume : {'raw':, 'fmt':},
                regularmarkettime : {'raw':, 'fmt':},
                fiftytwoweekrange : {'raw':, 'fmt':},
                fiftytwoweeklow : {'raw':, 'fmt':},
                fiftytwoweeklowchange : {'raw':, 'fmt':},
                fiftytwoweeklowchangepercent : {'raw':, 'fmt':},
                fiftytwoweekhigh : {'raw':, 'fmt':},
                fiftytwoweekhighchange : {'raw':, 'fmt':},
                fiftytwoweekhighchangepercent : {'raw':, 'fmt':},
                sharesoutstanding : {'raw':, 'fmt':},
                marketcap : {'raw':, 'fmt':},
                sourceinterval,
                quotesourcename,
                sharesoutstanding,
                longname,
                exchangetimezonename,
                exchangetimezoneshortname,
                exchangedatadelayedby,
                pricehint,
                isloading,
                triggerable,
                firsttradedatemilliseconds,
                region,
                marketstate,
                marketcap,
                quotetype, invalid,
                language,
                messageboardid,
                uuid,
                market,
                tradeable
            Returns the previous result for the symbol when the update delay has not
            elapsed, and None when there is no previous result or the page could not
            be parsed.
            Optional:
                Uses the supplied browser header.
                If a header is not provided then a random one gets used.
            Example:
                import yahoo_finance as yf
                yf.yahoo_finance().get_stock('tsla')
                yf.yahoo_finance().get_stock('avst.l')
            """
            symbol = symbol.casefold()
            url = self.url1 + symbol + self.url2 + symbol
            call = 'get_stock(' + symbol + ', ' + str(type(header)) + ')'
            # Website html
            page = self.get_website(url, header=header)
            if page is None:
                cached = self.stock_list.get(url)
                if cached is not None:
                    return cached
                _print._indented('class equity', 'yahoo_finance', call,
                                 'html = self.get_website(' + url + ',' + 'header=' + str(type(header)) + ').casefold()',
                                 'if html == None:')
                return None
            # Only casefold once we know a page was returned
            page = page.casefold()
            data_string1 = page[page.find('\"quotedata\":{'):]
            data_string2 = data_string1[:data_string1.find(',\"mktmdata\"')]
            dic_str = data_string2[data_string2.find('{'):]
            try:
                stock = self.loads(dic_str)
            except ValueError:
                # json.JSONDecodeError is a ValueError
                _print._indented('yahoo_finance', call,
                                 'try', 'stock = self.loads(dic_str)', 'data_string1', data_string1, 'data_string2', data_string2,
                                 'dic_str', dic_str, 'html[:100]', page[:100])
                return None
            del page
            del dic_str
            # Test we have the expected keys
            if stock.get(symbol) is None:
                _print._indented('yahoo_finance', call,
                                 'stock.get(\'' + symbol + '\') == None', stock)
                return None
            elif stock[symbol]['symbol'] != symbol:
                _print._indented('yahoo_finance', call,
                                 'elif stock[symbol][\'symbol\'] != symbol:', stock[symbol]['symbol'] + ' != ' + symbol)
                return None
            stock = stock[symbol]
            # ISO Format
            #   2020-12-30 21:00:00+00:00
            lt = int(stock['regularmarkettime']['raw'])
            dt_iso = equity.datetime.fromtimestamp(lt, tz=equity.timezone.utc).isoformat(' ')
            # Matching keys with data from other sources
            stock['name'] = stock.pop('shortname')
            stock['symbol'] = stock.pop('symbol')
            stock['currency'] = stock.pop('currency')
            stock['price'] = str(stock.pop('regularmarketprice')['raw'])
            stock['price_change'] = stock.pop('regularmarketchange')['fmt']
            stock['price_change_percent'] = stock.pop('regularmarketchangepercent')['fmt']
            stock.pop('exchange')
            stock['exchange'] = stock.pop('fullexchangename')
            stock['price_time_iso'] = dt_iso
            stock['regularmarkettime'] = stock.pop('regularmarkettime')
            self.stock_list[url] = stock
            return stock


class crypto:
    class business_insider:
        """
        Cryptocurrency scraped from Business Insider.
        business_insider(random_headers:bool=True, update_delay_seconds:int=60)
        Dictionary keys:
            name           : bitcoin
            price          : 38966.5195
            change         : -412.53
            change_percent : -1.05
            market_cap     : 704230000000
            circulating    : 18072712
            volume         : 18880000000
            utc_iso        : 2021-01-08 18:02:20+00:00
        Example:
            from modules.web_scrapers import crypto
            scraper = crypto.business_insider(random_headers=True)
            crypto = scraper.get_crypto('btc-usd')
            for key in crypto:
                print(key.ljust(15),' : ',crypto[key])

            import modules.web_scrapers as scrapers
            scraper = scrapers.crypto.business_insider()
        """

        def __init__(self, random_headers: bool = True, update_delay_seconds: int = 60):
            """
            Use Business Insider to get cryptocurrency information.
            When random_headers is True a header is chosen randomly and used with each website access.
            To reduce load on the website server update_delay_seconds prevents access to the
            website if the specified number of seconds has not elapsed since the last access.
            """
            self.web = _web(random_headers=random_headers, url_update_interval=update_delay_seconds)
            self.get_website = self.web._get_website
            # https://markets.businessinsider.com/cryptocurrencies
            # https://markets.businessinsider.com/currencies/btc-usd
            self.url_crypto_list = 'https://markets.businessinsider.com/cryptocurrencies'
            self.time_out = (4, 4)
            self.crypto_value_sections_list = []
            # Result key -> index into the '">'-separated sections of one
            # table row; -1 marks the scrape time instead of a section.
            self.values_dict = {'symbol': 0, 'name': 1, 'price': 4, 'change': 7,
                                'change_percent': 10, 'market_cap': 11, 'circulating': 12,
                                'volume': 13, 'utc_iso': -1
                                }
            self.most_active_cryptos = {}
            self.most_active_cryptos_limit = 100
            self.timer = timer(update_delay_seconds)
            self.update_delay_seconds = update_delay_seconds

        def _parse_value(self, section_index: int) -> str:
            """
            Returns the value held in section section_index of the row
            currently stored in self.crypto_value_sections_list.
            Index -1 returns the time of the last website access instead.
            A trailing 'b' or 'm' on a numeric value is expanded to a whole
            number of billions or millions.
            """
            if section_index == -1:  # utc_iso key
                return self.timer.start_time_iso()
            value = self.crypto_value_sections_list[section_index].split('<')[0]
            if section_index == self.values_dict['symbol']:
                # The symbol is the third path component of the row's link
                symbol_link = value.split('\"')[1]
                return symbol_link.split('/')[2]
            if section_index > self.values_dict['name']:
                # round() so that float noise cannot turn e.g. 704.23b into
                # ...999; both suffixes yield an integer string.
                if value.endswith('b'):
                    return str(int(round(float(value[:-1]) * 10**9)))
                if value.endswith('m'):
                    return str(int(round(float(value[:-1]) * 10**6)))
            return value

        def get_most_active_cryptos(self, limit: int, header: dict = None, show_warnings: bool = False) -> dict:
            """
            Get the latest information on the most active cryptocurrencies.
            Returns a dictionary keyed by symbol:
                'symbol' : {'name':, 'price':, 'change':, 'change_percent':, 'market_cap':,
                            'circulating':, 'volume':, 'utc_iso':}
                'btc-usd' : {'name':'bitcoin', 'price':'34164.5391', 'change':'2105.41',
                             'change_percent':'6.57', 'market_cap':'617450000000',
                             'circulating':'18072712', 'volume':'18880000000',
                             'utc_iso':'2021-01-08 18:02:20+00:00'}
            The previously collected data is returned when the website was not
            accessed (update delay not elapsed, or the request failed) or when
            the expected table is not in the page.
            Optional:
                Uses the supplied browser header.
            Example:
                from modules.web_scrapers import crypto
                scraper = crypto.business_insider(random_headers=True)
                crypto = scraper.get_crypto('btc-usd')
                for key in crypto:
                    print(key.ljust(15),' : ',crypto[key])

                import modules.web_scrapers as scrapers
                scraper = scrapers.crypto.business_insider()
                crypto = scraper.get_crypto('eth-usd')
            """
            self.most_active_cryptos_limit = limit
            page = self.get_website(self.url_crypto_list, header=header)
            if page is None:
                return self.most_active_cryptos
            self.timer.start()
            page = page.casefold()
            # Get cryptos section
            html_sections_list = page.split('<tbody class="table__tbody">')
            del page
            if show_warnings:
                if len(html_sections_list) != 4:
                    _print._indented('crypto', 'business_insider',
                                     'get_most_active_cryptos(' + str(limit) + ', ' + str(type(header)) + ', ' + str(show_warnings) + ')',
                                     'HTML sections warning', 'Website data not as expected.',
                                     '4 sections expected, have ', len(html_sections_list))
            if len(html_sections_list) < 2:
                # No table body in the page: keep what we already have
                return self.most_active_cryptos
            cryptos_section = html_sections_list[1]
            del html_sections_list
            cryptos_section = cryptos_section.replace(',', '').replace(' %', '')
            # Get crypto sections
            crypto_sections_list = cryptos_section.split('<a')
            del cryptos_section
            number_crypto_sections = len(crypto_sections_list) - 1
            if number_crypto_sections < limit:
                limit = number_crypto_sections
            limit += 1
            # Get crypto sections symbols, values
            for crypto_section in crypto_sections_list[1:limit]:
                self.crypto_value_sections_list = crypto_section.split('">')
                if show_warnings:
                    if len(self.crypto_value_sections_list) != 18:
                        _print._indented('\nValues sections warning:\n\tWebsite data not as expected.')
                        _print._indented('\t18 sections expected, have ', len(self.crypto_value_sections_list))
                # 'symbol' is the first key of values_dict, so it is parsed
                # before the values that are stored under it.
                for key in self.values_dict:
                    if key == 'symbol':
                        symbol = self._parse_value(self.values_dict[key])
                        self.most_active_cryptos[symbol] = {}
                    else:
                        self.most_active_cryptos[symbol][key] = self._parse_value(self.values_dict[key])
            return self.most_active_cryptos

        def find_symbol_for(self, limit: int, name: str) -> list:
            """
            Returns a list of (symbol, name) tuples from the most active
            crypto currency data set whose name matches in part or whole
            the name supplied.
            """
            self.most_active_cryptos_limit = limit
            self.most_active_cryptos = self.get_most_active_cryptos(self.most_active_cryptos_limit)
            matches = []
            for symbol in self.most_active_cryptos:
                crypto_name = self.most_active_cryptos[symbol]['name']
                if crypto_name.find(name) >= 0:
                    matches.append((symbol, crypto_name))
            return matches

        def get_price_for(self, symbol: str) -> str:
            """
            Returns the price as a string.
            """
            self.most_active_cryptos = self.get_most_active_cryptos(self.most_active_cryptos_limit)
            return self.most_active_cryptos[symbol]['price']

        def get_crypto(self, symbol: str) -> dict:
            """
            Returns the dictionary of a crypto symbol.
            """
            self.most_active_cryptos = self.get_most_active_cryptos(self.most_active_cryptos_limit)
            return self.most_active_cryptos[symbol]


# timer and web
if __name__ == '__main__':
    print('\ntimer and web')
    print('=============')
    # input('Press enter to continue.')
    w = _web(random_headers=True, url_update_interval=3)
    x = timer(5)
    print('Getting a website:\n', w._get_website('http://httpbin.org/headers'))
    x.start()
    print('Timer started :', x.started())
    x.wait(1)
    print('Time elapsed: ', x.elapsed_seconds())
    print('\nGetting the website again:', w._get_website('http://httpbin.org/headers'))
    print('\nTime remaining: ', x.remaining_seconds())
    print('The timer is done (maybe):', x.check_done())
    print('Waiting till done.')
    x.wait_till_done()
    print('Timer done:', x.check_done())
    print('Lets try getting the website again:\n', w._get_website('http://httpbin.org/headers'))
    print('Timer started at: ', x.start_time_iso())
    print('Current time: ', x.current_time_iso())
    print('POSIX time: ', x.current_time_posix())

# market_watch
if __name__ == '__main__':
    print('\nmarket_watch')
    print('============')
    # input('Press enter to continue.')

    def example(symbol, country=None, random_headers=False):
        """
        Prints all the available dictionary keys.
        Run from the command line:
            python3 yahoo_finance.py
        """
        if random_headers:
            site = equity.market_watch(random_headers=True)
        else:
            site = equity.market_watch()
        stock = site.get_stock(symbol, country)
        print()
        for key in stock:
            pad = ' ' * (20 - len(key))
            print(key, pad, ' : ', stock[key])
        print()

    print("\nexample('avst', 'uk', random_headers=False)")
    print('------------------------------------------')
    example('avst', 'uk', random_headers=False)
    print("\nexample('aapl',random_headers=True)")
    print('-----------------------------------')
    example('aapl', random_headers=True)

# yahoo_finance
if __name__ == '__main__':
    print('\nyahoo_finance')
    print('=============')
    # input('Press enter to continue.')

    def example(symbol, random_headers=False):
        """
        Prints all the available dictionary keys.
        Run from the command line:
            python3 yahoo_finance.py
        """
        if random_headers:
            site = equity.yahoo_finance(random_headers=True)
        else:
            site = equity.yahoo_finance()
        stock = site.get_stock(symbol)
        print()
        for key in stock:
            pad = ' ' * (20 - len(key))
            print(key, pad, ' : ', stock[key])
        print()

    print("\nexample('avst.l', random_headers=False)")
    print('--------------------------------------')
    example('avst.l', random_headers=False)
    print("\nexample('aapl',random_headers=True)")
    print('-----------------------------------')
    example('aapl', random_headers=True)

# business_insider
if __name__ == '__main__':
    print('\nbusiness_insider')
    print('================')
    # input('Press enter to continue.')

    def example(limit: int, random_headers=False, find: str = None):
        """
        Prints all the available dictionary keys.
        Run from the command line:
            python3 crypto.py
        """
        print()
        print('Example limit: ' + str(limit))
        if random_headers:
            source = crypto.business_insider(random_headers=True)
        else:
            source = crypto.business_insider()
        cryptos = source.get_most_active_cryptos(limit=limit)
        if find is not None:
            found = source.find_symbol_for(limit, find)
        for key in cryptos:
            cryptos[key]['name'] = cryptos[key]['name'].ljust(20)
            cryptos[key]['price'] = cryptos[key]['price'].rjust(10)
            cryptos[key]['change'] = cryptos[key]['change'].rjust(8)
            cryptos[key]['change_percent'] = cryptos[key]['change_percent'].rjust(8)
            cryptos[key]['market_cap'] = cryptos[key]['market_cap'].rjust(12)
            cryptos[key]['circulating'] = cryptos[key]['circulating'].rjust(12)
            cryptos[key]['volume'] = cryptos[key]['volume'].rjust(12)
            print(key.rjust(10) + ' :', cryptos[key])
        if find is not None:
            first_symbol = found[0][0].strip()
            print('\nSearched for: ' + find + '\nFound: ', found)
            print('First symbol found is ' + first_symbol + ' for ' + found[0][1].strip() + '.')
            print('The price is $', source.get_price_for(first_symbol))
        print()
        print('Note: In this example the fields have been padded with spaces.')
        print(' The actual data does not have padding.')
        print()

    print("\nexample(4,random_headers=True)")
    print('------------------------------')
    example(4, random_headers=True)
    print("\nexample(limit=4,find='it',random_headers=True))")
    print('-----------------------------------------------')
    # ... (the listing is truncated here in the source)

Full Screen

Full Screen

file_loader.py

Source:file_loader.py Github

copy

Full Screen

# NOTE(review): `token` is not used by any code visible in this listing, and
# lib2to3 was removed in Python 3.13 - confirm and drop this import if unused.
from lib2to3.pgen2 import token
import os
import pathlib
import requests
from datetime import datetime
import time

PATH_TO_DIR = './data'
SAVED_FILENAME_PATH = 'file_list.txt'
# URL = 'http://192.168.0.1:8001/loader/upload'
URL = 'http://127.0.0.1:8000/loader/upload'
# NOTE(review): credential committed in source; consider loading it from the
# environment or a config file instead.
TOKEN = 'fqGuqZMmWSBAeTIawQXrQvls4od0uDhiZa8bJcGV9GI'
DEVICE_ID = 5
UPDATE_DELAY_SECONDS = 60*10


def create_file_if_not_exists(SAVED_FILENAME_PATH):
    """Create an empty file at SAVED_FILENAME_PATH unless it can already be opened."""
    path = SAVED_FILENAME_PATH
    try:
        with open(path, 'r'):
            pass
        print('%s is exists' % (path))
    except IOError:
        with open(path, 'w'):
            pass
        print('%s is created' % (path))


def read_saved_filenames(path_to_filename):
    """Return the lines of path_to_filename as a list, without trailing newlines."""
    with open(path_to_filename, 'r') as f:
        return [line.rstrip('\n') for line in f]


def save_filenames(files, path_to_filename):
    """Overwrite path_to_filename with one entry of files per line."""
    with open(path_to_filename, 'w') as f:
        f.writelines(str(s) + '\n' for s in files)


def get_files(PATH_TO_DIR, SAVED_FILENAME):
    """
    Return the paths (as strings) of all files below PATH_TO_DIR whose name
    contains a dot, leaving out the bookkeeping file SAVED_FILENAME.
    """
    root = pathlib.Path(PATH_TO_DIR)
    # is_file(): the '**/*.*' pattern also matches directories with a dot
    # in their name, which cannot be uploaded.
    files = [str(path) for path in root.glob('**/*.*') if path.is_file()]
    print(files)
    if SAVED_FILENAME in files:
        print('removing file from list', SAVED_FILENAME)
        files.remove(SAVED_FILENAME)
    # Build the candidate with pathlib so it is normalised the same way as
    # the glob results ('./data/x' is reported by pathlib as 'data/x').
    saved_in_dir = str(root / SAVED_FILENAME)
    if saved_in_dir in files:
        print('removing file from list 2', SAVED_FILENAME)
        files.remove(saved_in_dir)
    return files


def send_data(URL, file, values):
    """POST the file mapping and form values to URL and return the response."""
    r = requests.post(URL, files=file, data=values, cookies={'def': 'defvalue'})
    return r


def main():
    """
    Upload every file below PATH_TO_DIR that is not yet listed in
    SAVED_FILENAME_PATH, recording each file once the server answers 200.
    """
    create_file_if_not_exists(SAVED_FILENAME_PATH)
    files = get_files(PATH_TO_DIR, SAVED_FILENAME_PATH)
    saved_files = read_saved_filenames(SAVED_FILENAME_PATH)
    print(files)
    print('saved', saved_files)
    for filename in files:
        # Skip files that were already uploaded; the remaining files must
        # still be processed, so this is `continue`, not `break`.
        if filename in saved_files:
            continue
        try:
            # Binary mode so non-text files upload unchanged; the handle is
            # closed as soon as the request has finished.
            with open(filename, 'rb') as handle:
                now = datetime.now()
                date_time_str = now.strftime("%Y-%m-%d %H:%M:%S")
                values = {
                    'date': date_time_str,
                    'token': TOKEN,
                    'device': DEVICE_ID,
                }
                print('sending', filename)
                response = send_data(URL, {filename: handle}, values)
            print(response.status_code)
            if response.status_code == 200:
                saved_files.append(filename)
                save_filenames(saved_files, SAVED_FILENAME_PATH)
        except Exception as e:
            # Best effort: report the failure and carry on with the next file.
            print(e)


def loop():
    """Run main() forever, pausing UPDATE_DELAY_SECONDS between runs."""
    while True:
        print('running')
        main()
        time.sleep(UPDATE_DELAY_SECONDS)


if __name__ == "__main__":
    ...  # the listing is truncated here in the source; the entry-point call is not visible

Full Screen

Full Screen

tasks.py

Source:tasks.py Github

copy

Full Screen

import logging

from django.conf import settings
from huey.contrib.djhuey import db_task

from p2kitchen import slack
from p2kitchen.models import Brew

logger = logging.getLogger(__name__)

# Delay, in seconds, passed to update_progress.schedule()
UPDATE_DELAY_SECONDS = 3


@db_task()
def start_brewing(brew: Brew):
    """
    Post the brew's "started" message to the configured Slack channel, store
    the returned channel and timestamp on the brew, and schedule the first
    progress update.
    """
    logger.debug("Started brewing")
    response = slack.chat_post_message(settings.SLACK_CHANNEL, **brew.started_message())
    brew.slack_channel = response["channel"]
    brew.slack_ts = response["ts"]
    brew.save()
    update_progress.schedule(args=(brew.pk,), delay=UPDATE_DELAY_SECONDS)


@db_task()
def update_progress(brew_pk):
    """
    Update the Slack message of the brew with primary key brew_pk.

    A finished brew gets its final message, an invalid brew has its message
    deleted, and any other brew gets a progress update.
    """
    logger.debug("Updating brew progress")
    try:
        brew = Brew.objects.get(pk=brew_pk)
    except Brew.DoesNotExist:
        logger.error(f"Critical error! Brew {brew_pk} doesn't exist.")
        return

    if brew.status == Brew.Status.FINISHED.value:
        slack.chat_update(brew.slack_channel, brew.slack_ts, **brew.finished_message())
        return
    if brew.status == Brew.Status.INVALID.value:
        slack.chat_delete(brew.slack_channel, brew.slack_ts)
        return

    slack.chat_update(brew.slack_channel, brew.slack_ts, **brew.update_message())
    # Keep updating progress...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful