How to use today_no_time method in localstack

Best Python code snippet using localstack_python

server.py

Source:server.py Github

copy

Full Screen

# SPDX-FileCopyrightText: 2021 Number6174
# SPDX-License-Identifier: Apache-2.0
import http.server
from urllib.parse import urlparse
from urllib.parse import parse_qs
from urllib.parse import quote
import datetime
import json
import logging
from dateutil.parser import parse
import pynput
import threading
import time


# Maps the tier strings accepted in event queries to their config['event'] key.
_TIER_CONFIG_KEYS = {
    'Tier 1': 't1-sub', '1000': 't1-sub',
    'Tier 2': 't2-sub', '2000': 't2-sub',
    'Tier 3': 't3-sub', '3000': 't3-sub',
}

# Characters that may be sent directly as a key press.
_PLAIN_KEYS = 'abcdefghijklmnopqrstuvwxyz0123456789'


def parse_duration(text):
    """Parse a duration such as "30s" or "1:30:00" into a timedelta.

    dateutil fills every field missing from *text* using ``default``; passing
    today's midnight explicitly and subtracting it again leaves only the parsed
    time-of-day, and avoids disagreeing with dateutil about which day "today"
    is when the call straddles midnight.
    """
    today_no_time = datetime.datetime.combine(datetime.date.today(), datetime.time())
    return parse(text, default=today_no_time) - today_no_time


def _sub_points(tier, kind, query, allow_prime=False):
    """Return the configured points for one sub of *tier*.

    *kind* only labels the log message ('self sub', 'gift sub', ...). Unknown
    tiers are logged and treated as Tier 1. 'Prime' is only recognised when
    *allow_prime* is true.
    """
    if allow_prime and tier == 'Prime':
        key = 'prime-sub'
    else:
        key = _TIER_CONFIG_KEYS.get(tier)
    if key is None:
        # Invalid tier, assuming Tier 1
        key = 't1-sub'
        logger.debug('Have a %s with a invalid tier%s', kind, query)
        logger.info('Invalid tier of %s treating it as Tier 1', tier)
    return int(config['event'][key])


def _apply_timer_change(points, adjust, points_set=False):
    """Apply a points/time change to timer_data.json and recompute 'current-end'.

    *points* is added to (or, when *points_set* is true, replaces)
    'points-funded'; *adjust* is added to 'time-adjust' (seconds). The new end
    time is never later than 'stream-end'.
    """
    with open('timer_data.json') as f:
        timer_data = json.load(f)

    # Add to total
    if points_set:
        timer_data['points-funded'] = points
    else:
        timer_data['points-funded'] += points
    timer_data['time-adjust'] += adjust

    # Calculate stream end
    stream_start = datetime.datetime.fromisoformat(timer_data['stream-start'])
    stream_end = datetime.datetime.fromisoformat(timer_data['stream-end'])
    percent_funded = timer_data['points-funded'] / timer_data['points-to-fully-fund']
    time_adjust = datetime.timedelta(seconds=timer_data['time-adjust'])
    timer_start = datetime.timedelta(seconds=timer_data['timer-start'])
    time_fundable = datetime.timedelta(seconds=timer_data['time-fundable'])
    current_end = stream_start + timer_start + percent_funded * time_fundable + time_adjust

    # Do not exceed stream end
    current_end = min(current_end, stream_end)

    # Convert back to ISO Format and write out
    timer_data['current-end'] = current_end.isoformat(timespec='milliseconds')
    logger.debug('Timer info is now %s', json.dumps(timer_data))
    with open('timer_data.json', 'w') as f:
        json.dump(timer_data, f, indent=4)


class WritingHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
    """Request handler for the control panel, the JSON API and the event hooks.

    GET routes: '/', '/favicon.ico', '/api/...', '/event', '/keypress' and
    '/write'. PUT routes: '/api/configwriter/timer_data' and
    '/api/configwriter/config'.
    """

    def log_error(self, format, *args):
        """Send http.server error messages to the module logger."""
        # Let logging interpolate so a literal '%' with no args cannot raise.
        logger.error(format, *args)

    def log_message(self, format, *args):
        """Suppress the default per-request access log."""
        pass

    def do_GET(self):
        """Dispatch a GET request to the matching handler, or answer 404."""
        url = urlparse(self.path)
        if url.path == '/':
            self.path = 'control_panel.html'
            return super().do_GET()
        if url.path == '/favicon.ico':
            return super().do_GET()

        if url.path.startswith('/api'):
            self.handle_api()
        elif url.path == '/event':
            logger.debug('%s', url)
            self.handle_event()
        elif url.path == '/keypress':
            logger.debug('%s', url)
            self.handle_keypress()
        elif url.path == '/write':
            logger.debug('%s', url)
            self.handle_write()
        else:
            logger.debug("Ignoring request to %s", self.path)
            self.send_error(404)

    def do_PUT(self):
        """Replace timer_data.json or config.json with the JSON request body.

        Answers 411 when Content-Length is missing, 400 when the length or the
        body is malformed or the path is unknown, and 200 "Success" otherwise.
        """
        global config

        # Acquire the PUT data
        length_header = self.headers.get('Content-Length')
        if length_header is None:
            logger.debug("PUT to %s without Content-Length", self.path)
            self.send_error(411)
            return
        try:
            content_length = int(length_header)
        except ValueError:
            content_length = -1
        if content_length < 0:
            logger.debug("PUT to %s with invalid Content-Length %r", self.path, length_header)
            self.send_error(400)
            return
        raw_data = self.rfile.read(content_length)
        try:
            data = json.loads(raw_data, parse_float=float, parse_int=int)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            logger.debug("PUT to %s with a body that is not valid JSON", self.path)
            self.send_error(400)
            return

        url = urlparse(self.path)
        if url.path == '/api/configwriter/timer_data':
            with open('timer_data.json', 'w') as f:
                json.dump(data, f, indent=4)
            logger.debug('Updated timer_data.json')
        elif url.path == '/api/configwriter/config':
            if not isinstance(data, dict):
                logger.debug("Config PUT body is not a JSON object")
                self.send_error(400)
                return
            logger.debug("About to update config.json with %s", data)
            with open('config.json', 'r') as f:
                existing = json.load(f)
            # Copy the two settings we don't allow changing by the control panel
            data['host'] = existing['host']
            data['port'] = existing['port']
            with open('config.json', 'w') as f:
                json.dump(data, f, indent=4)

            # Reload config for rest of program
            config = readConfig('config.json')
        else:
            logger.debug("Ignoring PUT to %s", self.path)
            self.send_error(400)
            return
        self.success_response()

    def _send_body(self, body, content_type, cors=False):
        """Send a 200 response carrying *body* (str) encoded as UTF-8."""
        self.send_response(200)
        self.send_header("Content-type", content_type)
        if cors:
            self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def success_response(self):
        """Send the plain-text 200 "Success" response."""
        self._send_body("Success", "text/plain")

    def handle_api(self):
        """Serve the read-only '/api/...' GET endpoints; unknown paths get 400."""
        url = urlparse(self.path)
        if url.path == '/api/timer':
            with open('timer_data.json') as f:
                data = json.load(f)
            self._send_body(json.dumps(data, indent=4), "application/json", cors=True)
        elif url.path == '/api/resettimer':
            reset_timer()
            logger.info("Manual timer reset via /api/resettimer")
            self.success_response()
        elif url.path == '/api/config':
            with open('config.json') as f:
                data = json.load(f)
            self._send_body(json.dumps(data, indent=4), "application/json")
        elif url.path == '/api/events':
            with open('logs/events.log') as f:
                events = f.read()
            self._send_body(events, "text/plain", cors=True)
        elif url.path == '/api/twitch/id_to_name':
            query = parse_qs(url.query)
            if 'id' not in query:
                self.send_error(400)
                return
            self._send_body(twitch_id_to_name_cached(query['id'][0]), "text/plain", cors=True)
        elif url.path == '/api/version':
            # NOTE(review): the body is the bare version text although the
            # header says application/json; kept as-is for existing clients.
            self._send_body(version_string(), "application/json")
        else:
            logger.error('Unknown API call %s', url.path)
            self.send_error(400)

    def handle_event(self):
        """Handle '/event': log hype-train events or apply a timer change.

        Answers 400 when the event is unrecognised or a required query
        parameter is missing or malformed, 200 "Success" otherwise.
        """
        query = parse_qs(urlparse(self.path).query)
        change = None
        try:
            if 'train' in query:
                recognised = self._log_hype_train(query)
            else:
                change = self._parse_timer_event(query)
                recognised = change is not None
        except (KeyError, ValueError, OverflowError) as err:
            logger.debug('Malformed event request %s: %r', self.path, err)
            self.send_error(400)
            return
        if not recognised:
            self.send_error(400)
            return
        if change is not None:
            _apply_timer_change(*change)
        self.success_response()

    def _log_hype_train(self, query):
        """Log a hype-train event; return False when its type is not recognised."""
        train_type = query['train'][0]
        if train_type == 'start':
            logger.info('Hype Train Start')
        elif train_type in ('end', 'conductor'):
            sub_conductor = twitch_id_to_name_cached(query['sub_conductor'][0])
            bit_conductor = twitch_id_to_name_cached(query['bit_conductor'][0])
            label = 'End' if train_type == 'end' else 'Conductor'
            logger.info("Hype Train %s. Sub conductor %s, Bit conductor %s",
                        label, sub_conductor, bit_conductor)
        elif train_type == 'progress':
            level = query['level'][0]
            progress = query['progress'][0]
            total = query['total'][0]
            logger.info("Hype Train Progress: Level %s Progress %s Total %s", level, progress, total)
        elif train_type == 'cooldownover':
            logger.info('Hype Train Cooldown Over')
        else:
            # Not a recognized type
            return False
        return True

    def _parse_timer_event(self, query):
        """Translate a non-hype-train event into ``(points, adjust, points_set)``.

        Returns None (after logging why) when the event is invalid. A query
        with none of the known event keys yields a no-op change of (0, 0, False).
        """
        adjust = 0
        points = 0
        points_set = False
        # parse_qs drops blank values, so an empty message arrives as a missing key.
        message = query.get('message', [''])[0]

        if 'bits' in query:
            name = query['name'][0]
            amount = query['bits'][0]
            points = int(config['event']['per100bits'] * int(amount) / 100)
            logger.info("'%s' cheered %s bits (%s points): %s", name, amount, points, message)
        elif 'sub' in query:
            sub_type = query['sub'][0]
            if sub_type == 'self':
                name = query['name'][0]
                tier = query['tier'][0]
                points = _sub_points(tier, 'self sub', query, allow_prime=True)
                months = query['months'][0]
                logger.info("'%s' subbed %s for %s months (%s points): %s",
                            name, tier, months, points, message)
            elif sub_type == 'gift':
                gifter = query['gifter'][0]
                recipient = query['recipient'][0]
                tier = query['tier'][0]
                points = _sub_points(tier, 'gift sub', query)
                months = query['months'][0]
                logger.info("'%s' gifted %s to '%s' who now has %s months (%s points)",
                            gifter, tier, recipient, months, points)
            elif sub_type == 'community':
                name = query['name'][0]
                quantity = int(query['quantity'][0])
                tier = query['tier'][0]
                points = _sub_points(tier, 'community gift sub', query) * quantity
                logger.info("'%s' gifted %s to %s people (%s points)", name, tier, quantity, points)
            else:
                logger.debug('Have a sub with invalid type, skipping')
                return None
        elif 'tip' in query:
            name = query['name'][0]
            amount = float(query['tip'][0])
            fees = config['event']
            amount_after_fees = amount * (1.0 - fees['tip-fee-percent'] / 100) - fees['tip-fee-fixed']
            # A tip smaller than the fixed fee must not take points away.
            points = max(0, int(fees['perdollartip'] * amount_after_fees))
            logger.info("'%s' tipped %s (%s points): %s", name, amount, points, message)
        elif 'time' in query:
            direction = {'increase': 1, 'decrease': -1}.get(query['time'][0])
            if direction is None:
                logger.debug('Event time with invalid value')
                return None
            if 'amount' not in query:
                logger.debug('No amount specified')
                return None
            seconds = parse_duration(query['amount'][0]).total_seconds()
            adjust = int(direction * seconds)
            logger.info('Adjust timer by %s seconds', adjust)
        elif 'points' in query:
            action = query['points'][0]
            direction = {'increase': 1, 'decrease': -1, 'set': 1}.get(action)
            if direction is None:
                logger.debug('Event points with invalid value')
                return None
            points_set = action == 'set'
            if 'amount' not in query:
                logger.debug('Points even with no amount')
                return None
            points = direction * int(query['amount'][0])
            if points_set:
                logger.info('Set timer to %s points', points)
            else:
                logger.info('Adjust timer by %s points', points)
        return points, adjust, points_set

    def handle_keypress(self):
        """Handle '/keypress': press ``key`` (with optional ``mod`` modifiers).

        Optional ``repeat`` (count) and ``delay`` (milliseconds between
        repeats) are honoured. The key presses run on a background thread so
        the response is sent immediately. Invalid parameters answer 400.
        """
        from pynput.keyboard import Key
        # Extract query
        query = parse_qs(urlparse(self.path).query)
        modifiers_map = {
            'alt': Key.alt, 'alt_gr': Key.alt_gr, 'alt_l': Key.alt_l, 'alt_r': Key.alt_r, 'ctrl': Key.ctrl,
            'ctrl_l': Key.ctrl_l, 'ctrl_r': Key.ctrl_r, 'shift': Key.shift, 'shift_l': Key.shift_l,
            'shift_r': Key.shift_r, 'super': Key.cmd
        }
        key_map = {
            'f1': Key.f1, 'f2': Key.f2, 'f3': Key.f3, 'f4': Key.f4, 'f5': Key.f5, 'f6': Key.f6, 'f7': Key.f7,
            'f8': Key.f8, 'f9': Key.f9, 'f10': Key.f10, 'f11': Key.f11, 'f12': Key.f12, 'f13': Key.f13,
            'f14': Key.f14, 'f15': Key.f15, 'f16': Key.f16, 'f17': Key.f17, 'f18': Key.f18, 'f19': Key.f19,
            'f20': Key.f20
        }

        # Determine key to press
        try:
            modifiers = [modifiers_map[name] for name in query.get('mod', [])]
        except KeyError as err:
            logger.debug("Unknown modifier: %s", err)
            self.send_error(400)
            return
        if 'key' not in query:
            # log_message is silenced in this class, so report through the logger.
            logger.debug("Requested keypress with no key")
            self.send_error(400)
            return
        keystring = query['key'][0]
        # Require exactly one character: a bare substring test would also
        # accept multi-character runs such as 'abc'.
        if len(keystring) == 1 and keystring in _PLAIN_KEYS:
            key = keystring
        elif keystring in key_map:
            key = key_map[keystring]
        else:
            logger.debug("Unknown key: %s", keystring)
            self.send_error(400)
            return

        try:
            repeat = int(query['repeat'][0]) if 'repeat' in query else 1
            delay = 0
            if 'delay' in query and repeat != 1:
                delay = int(query['delay'][0]) / 1000
        except ValueError:
            logger.debug("Keypress repeat/delay is not an integer")
            self.send_error(400)
            return
        if delay < 0:
            # time.sleep rejects negative values.
            logger.debug("Keypress delay is negative")
            self.send_error(400)
            return

        logger.debug("Pressing key %s with modifiers %s and repeat of %i and delay %s",
                     key, modifiers, repeat, delay)
        worker = threading.Thread(target=keypressWorker, args=(modifiers, key, repeat, delay))
        worker.start()
        self.success_response()

    def handle_write(self):
        """Handle '/write': write ``data`` to ``filename``.

        ``mode`` may be 'w' (default, overwrite) or 'a' (append). When ``log``
        is present the data is prefixed with an ISO timestamp and terminated
        with a newline. Missing or invalid parameters answer 404.
        """
        # Extract query
        query = parse_qs(urlparse(self.path).query)

        # Require both filename and data
        if 'filename' not in query:
            logger.debug("Missing query parameter filename")
            self.send_error(404)
            return
        if 'data' not in query:
            logger.debug("Missing query parameter data")
            self.send_error(404)
            return

        # Optional mode
        mode = query.get('mode', ['w'])[0]
        if mode not in ('w', 'a'):
            logger.debug("Unknown mode parameter '%s'", mode)
            self.send_error(404)
            return

        filename = query['filename'][0]
        data = query['data'][0]

        # Optional timestamp and per line
        if 'log' in query:
            data = datetime.datetime.now().isoformat() + ' - ' + data + '\n'

        # SECURITY NOTE(review): filename comes straight from the request, so
        # any client that can reach this server can overwrite any file the
        # process may write. Only bind to a trusted interface, or restrict the
        # target directory here.
        with open(filename, mode) as w:
            w.write(data)
        if mode == 'w':
            logger.debug("'%s' was overwritten with '%s'", filename, data)
        else:
            logger.debug("'%s' was appended with '%s'", filename, data)
        self.success_response()


def keypressWorker(modifiers, key, repeat, delay):
    """Tap *key* *repeat* times while holding *modifiers*, sleeping *delay* seconds after each tap."""
    keyboard = pynput.keyboard.Controller()
    for _ in range(repeat):
        held = []
        try:
            # Press modifiers
            for m in modifiers:
                keyboard.press(m)
                held.append(m)
            # Press key
            keyboard.tap(key)
        finally:
            # Release modifiers even if the tap failed, so none stays stuck down.
            for m in held:
                keyboard.release(m)
        time.sleep(delay)


def readConfig(filename):
    """Load the JSON config and convert its timer fields.

    'stream-start' becomes a datetime; 'timer-start' and 'time-fundable'
    become timedeltas.
    """
    with open(filename) as f:
        config = json.load(f)

    # Convert inputs into datetime and timedeltas as appropriate
    timer = config['timer']
    timer['stream-start'] = parse(timer['stream-start'])
    timer['timer-start'] = parse_duration(timer['timer-start'])
    timer['time-fundable'] = parse_duration(timer['time-fundable'])
    return config


def ensureTimerSetup(config):
    """Keep timer_data.json if its 'current-end' is still in the future, else reset it."""
    import os.path
    if os.path.exists('timer_data.json'):
        with open('timer_data.json') as f:
            timer_data = json.load(f)
        # Convert into datetimes
        timer_data['stream-start'] = parse(timer_data['stream-start'])
        timer_data['stream-end'] = parse(timer_data['stream-end'])
        timer_data['current-end'] = parse(timer_data['current-end'])
        now = datetime.datetime.now()
        if now < timer_data['current-end']:
            print("Since current time is before end in timer_data.json, assuming script restarted during stream")
            return
    reset_timer()


def reset_timer():
    """Write a fresh timer_data.json derived from the global config's 'timer' section."""
    timer = config['timer']
    initial_end = timer['stream-start'] + timer['timer-start']
    # No existing timer data or expired
    timer_data = {
        'stream-start': timer['stream-start'].isoformat(),
        'stream-end': (initial_end + timer['time-fundable']).isoformat(),
        'current-end': initial_end.isoformat(),
        'points-funded': 0,
        'points-to-fully-fund': timer['points-to-fully-fund'],
        'time-adjust': 0,
        'timer-start': timer['timer-start'].total_seconds(),
        'time-fundable': timer['time-fundable'].total_seconds(),
    }
    with open('timer_data.json', 'w') as f:
        json.dump(timer_data, f, indent=4)


twitch_id_cache = {}


def twitch_id_to_name_cached(id):
    """Return the response text of the toName lookup for Twitch user *id*.

    Successful lookups are remembered in ``twitch_id_cache``; failed ones are
    returned but not cached so that they are retried next time.
    """
    # Check if already cached
    if id in twitch_id_cache:
        return twitch_id_cache[id]

    import requests
    # The server handles one request at a time, so an unbounded wait here
    # would stall everything; hence the timeout.
    r = requests.get(
        'https://customapi.aidenwallis.co.uk/api/v1/twitch/toName/' + quote(id, safe=''),
        timeout=10,
    )
    if r.ok:
        twitch_id_cache[id] = r.text
    logger.debug('Convert id %s to %s', id, r.text)
    return r.text


def version_string():
    """Return the server version."""
    return "0.0.9"


def setup_logging():
    """Create logs/ and return a logger writing logs/debug.log and logs/events.log.

    debug.log receives DEBUG and above and keeps 10 daily backups; events.log
    receives INFO and above and rotates daily.
    """
    import logging.handlers
    # Ensure there is a logs directory
    import os
    os.makedirs('logs', exist_ok=True)

    # Setup a rotating debug log
    debug_handler = logging.handlers.TimedRotatingFileHandler(
        filename='logs/debug.log',
        when='d',
        interval=1,
        backupCount=10
    )
    debug_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    debug_handler.setLevel(logging.DEBUG)

    # Setup rotating event log
    event_handler = logging.handlers.TimedRotatingFileHandler(
        filename='logs/events.log',
        when='d',
        interval=1,
    )
    event_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    event_handler.setLevel(logging.INFO)

    # Attach handlers
    logger = logging.getLogger('root')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(debug_handler)
    logger.addHandler(event_handler)
    return logger


def startServer():
    """Set up logging, config and timer state, then serve requests forever."""
    global logger
    logger = setup_logging()
    global config
    config = readConfig('config.json')
    ensureTimerSetup(config)

    print("WritingHTTPServer " + version_string() + " by Number6174")
    print("Server started on http://" + config['host'] + ":" + str(config['port']))
    print("Use CTRL+C to stop")
    logger.info('Server startup. Version ' + version_string() + ' listening on ' + config['host'] + ':' + str(config['port']))

    # Setup server
    handler = WritingHttpRequestHandler
    server = http.server.HTTPServer((config['host'], config['port']), handler)

    # Start the server
    server.serve_forever()


if __name__ == "__main__":
    ...  # body is cut off in the source listing

Full Screen

Full Screen

time.py

Source:time.py Github

copy

Full Screen

...31def now(millis: bool = False, tz: Optional[tzinfo] = None) -> int:32 return mktime(datetime.now(tz=tz), millis=millis)33def now_utc(millis: bool = False) -> int:34 return now(millis, timezone.utc)35def today_no_time() -> int:36 return mktime(datetime.combine(date.today(), datetime.min.time()))37def mktime(ts: datetime, millis: bool = False) -> int:38 if millis:39 return int(ts.timestamp() * 1000)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful