How to use the check_stream_status method in localstack

Best Python code snippet using localstack_python

main.py

Source:main.py Github

copy

Full Screen

...54 self.get_fh4_stream() # Sets current_channel values55 self.update_mixer_channel() # Changes to the current channel56 while True:57 # This will automatically tune to the offical forza stream58 offical_forza_status = self.check_stream_status("Forza")59 if(offical_forza_status):60 if(self.current_channel['token'] != "Forza"):61 self.current_channel['token'] = "Forza"62 self.update_mixer_channel()63 channel_online_status = self.check_stream_status(self.current_channel['token'])64 if(channel_online_status == True):65 pass66 else:67 self.get_fh4_stream()68 self.update_mixer_channel()69 time.sleep(60)70 def init_setup(self):71 # This function will wait for the user to login and then save the session cookies to a local file72 # Saving the cookie prevents the user from having to login everytime73 print('press <Enter> after you\'ve logged in') # Wait for user to login74 login_prompt = input() # Block until user finishes login75 pickle.dump( self.browser.get_cookies() , open("cookies.pkl","wb"))76 self.logger.info("dumped cookies.pkl")77 return78 def load_cookies(self):79 cookies = pickle.load(open("cookies.pkl", "rb"))80 for cookie in cookies:81 if 'expiry' in cookie:82 del cookie['expiry']83 self.browser.add_cookie(cookie)84 def update_mixer_channel(self):85 channel_url = 'https://mixer.com/' + self.current_channel['token']86 self.browser.get(channel_url)87 print("{} | Now watching: {}".format(time.strftime('%H:%M:%S'), self.current_channel['token']))88 self.logger.info("watching: {} {}".format(self.current_channel['token'], channel_url))89 return90 def get_fh4_stream(self):91 # This function will query the mixer API for FH4 streams, and return a channel name of one that is not a 'farm'92 # This route returns the top 50 streams for fh493 api_request = 'https://mixer.com/api/v1/types/559325/channels?order=viewersCurrent:DESC&limit=50'94 r = requests.get(api_request)95 data = json.loads(r.content.decode('utf-8'))96 for channel in data:97 try:98 if not(bool([i for i in 
self.blacklist if(i in channel['name'].lower())])) and self.check_stream_status(channel['token']) == True:99 self.current_channel['token'] = channel['token']100 self.current_channel['userId'] = channel['userId']101 return102 else:103 pass104 except UnicodeEncodeError as e:105 print(e)106 self.logger.warning(e)107 ## TODO: If the script reaches this point, it has not found any channels, need a solution to mitgate this108 print("No online or valid channels!")109 self.logger.critical("no online or valid channels found!")110 return111 def check_stream_status(self, token):112 try:113 api_request = 'https://mixer.com/api/v1/channels/' + token114 r = requests.get(api_request)115 data = json.loads(r.content.decode('utf-8'))116 if(data['online'] == True):117 return True118 else:119 return False120 except Exception as e:121 print(e)122 self.logger.warning(e)123 return False 124try:125 influence_farm_bot().run()...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

# ... (excerpt begins at line 16 of __init__.py; the imports above it are not shown)
class nginxRTMPMonitor(Skill):
    """Opsdroid skill that announces stream start/stop and hourly "still streaming" notices.

    State kept on the instance:
      * ``bot_was_last_message`` -- True while the bot's own notice is the most
        recent message in the notify room.
      * ``bot_thinks_stream_is_up`` -- last known stream state.
      * ``stream_since_when`` -- ``datetime`` used as the stream start
        reference, or ``False`` when no stream is believed to be running.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.bot_was_last_message = False
        self.bot_thinks_stream_is_up = self.check_stream_status()
        if self.bot_thinks_stream_is_up:
            # The real start time is unknown at startup, so it is back-dated by one hour.
            self.stream_since_when = datetime.datetime.today() - datetime.timedelta(hours=1)
        else:
            self.stream_since_when = False

    ##################################################################
    #
    # 1. Avoid spamming
    #    The bot notifies if a stream is ongoing every hour
    #    if no one posts within that hour, it is superfluous;
    #    this functionality prevents that.
    #
    ##################################################################
    async def avoid_spam_send(self, msg):
        """Send ``msg`` to the notify room unless the bot already sent the last message."""
        if self.bot_was_last_message:
            return
        await self.opsdroid.send(
            Message(
                text=msg,
                target=self.config.get('room_notify')
            )
        )
        self.bot_was_last_message = True

    @match_always
    async def who_last_said(self, event):
        """Clear the "bot spoke last" flag when an event targets the notify room."""
        if event.target == self.config.get('room_notify'):
            self.bot_was_last_message = False

    ##################################################################
    #
    # 2. Stream monitoring
    #    Monitors stream
    #
    ##################################################################
    def check_stream_status(self):
        """Return True when the status endpoint's body starts with a non-zero digit.

        Fetches ``stream_status_url`` from the skill config. Network errors, an
        empty body, or a non-digit first character raise, as before.
        """
        # This synchronous call runs inside async handlers; the timeout bounds
        # how long a hung endpoint can block the event loop.
        response = requests.get(self.config.get('stream_status_url'), timeout=5)
        return bool(int(response.text[0]))

    def take_stream_screenshot(self):
        """Placeholder; not implemented."""
        pass

    @match_webhook('stream')
    async def streamwebhookskill(self, event: Request):
        """Handle the ``stream`` webhook: announce start/stop and update state.

        Payloads whose ``stream_state_change`` is missing or is neither
        ``'start'`` nor ``'stop'`` are ignored.
        """
        # Capture the post data
        data = await event.json()
        # .get() so a payload without the key is ignored instead of raising KeyError.
        state_change = data.get('stream_state_change')
        if state_change == 'start':
            await self.opsdroid.send(
                Message(
                    text='<h1>⚡️ STARTED <a href="' + self.config.get('stream_url') + '">STREAMIN\'</a> ⚡️</h1>',
                    target=self.config.get('room_notify')
                )
            )
            self.bot_thinks_stream_is_up = True
            self.stream_since_when = datetime.datetime.today()
        elif state_change == 'stop':
            await self.opsdroid.send(
                Message(
                    text='<h1>⚰️ STREAM OVER ⚰️</h1>',
                    target=self.config.get('room_notify')
                )
            )
            self.bot_thinks_stream_is_up = False
            self.stream_since_when = False

    @match_crontab('* * * * *', timezone="Europe/Zurich")
    async def stream_ongoing(self, event):
        # Runs every minute; the remainder of this method is cut off in the excerpt.
        if self.bot_thinks_stream_is_up:
            stream_up = self.check_stream_status()
            if stream_up:
                if (datetime.datetime.today() - self.stream_since_when) > datetime.timedelta(hours=1):
                    await self.avoid_spam_send(
                        '<h1>⚡️ <a href="' + self.config.get('stream_url') + '">STREAMIN\'</a> ⚡️</h1>'
                    )
# ... (excerpt is truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful