How to use cancel_job method in localstack

Best Python code snippet using localstack_python

api.py

Source:api.py Github

copy

Full Screen

...97 return send_file(path_to_dir, ts)98 elif not os.path.exists(path_to_ts):99 return do_job(path_to_dir, token, index, ts)100@app.route("/cancel/<token>/<index>/<ts>", methods=["GET"])101def cancel_job(token, index, ts):102 path_to_dir = os.path.join(PATH, token)103 with open(os.path.join(path_to_dir, "job_ids.txt"), "r") as f:104 job_ids = f.read().splitlines()105 string = [el for el in job_ids if f"{token} {index} {ts}" in el][0]106 cancel_job_id = string.split()[0]107 cancel_job = q.fetch_job(cancel_job_id)108 if cancel_job is not None:109 logging.info(f"CANCEL_JOB is not None")110 logging.info(cancel_job)111 logging.info(cancel_job.get_status())112 logging.info(f"CANCEL_JOB enqueued_at: {cancel_job.enqueued_at}")113 cancel_job.kill()114 logging.info("JOB CANCEL")115 logging.info(cancel_job)...

Full Screen

Full Screen

pomodoro.py

Source:pomodoro.py Github

copy

Full Screen

from talon import Module, imgui, cron, actions
import math
import threading
import time
from typing import Optional

module = Module()

# Guards the module-level timer state below, which is touched by the GUI
# callback, the cron callback and the voice actions.
lock = threading.Lock()
start_time = None  # time.monotonic() reading the countdown is measured from
current_duration = None  # length of the pomodoro in seconds
pomodoro_type = None  # label drawn in the GUI; None means nothing is running
pause_time = None  # time.monotonic() reading taken when paused, else None
finished = False  # set once the running pomodoro has expired
cancel_job = None  # handle of the periodic check_pomodoro cron job


# TODO: Better placement
@imgui.open(y=20, x=5)
def gui(gui: imgui.GUI):
    """Draw the pomodoro label with the minutes left, or a flashing FINISHED."""
    # Snapshot everything under the lock: pomodoro_cancel() clears these
    # together, and reading them piecemeal could mix set and cleared values.
    with lock:
        label = pomodoro_type
        began = start_time
        duration = current_duration
        paused_at = pause_time
    if label is None or began is None or duration is None:
        return

    # While paused the clock is frozen at the moment of pausing.
    now = paused_at if paused_at is not None else time.monotonic()
    remaining_time = math.ceil((duration + began - now) / 60)
    if remaining_time <= 0:
        # Flash effect
        flashes_per_second = 1.5
        suffix = (
            "FINISHED" if int(time.monotonic() * flashes_per_second) % 2 else ""
        )
        gui.text(f"{label} -- {suffix}")
    else:
        gui.text(f"{label} {remaining_time:02d}")


def delete_cancel_cron():
    """Cancel the periodic check job, if one is registered, and drop its handle.

    Callers are expected to hold ``lock``; this function does not take it.
    """
    global cancel_job
    if cancel_job is not None:
        try:
            cron.cancel(cancel_job)
        except Exception:
            # Deliberately best-effort: a job that can no longer be cancelled
            # must not prevent the timer state from being reset.
            pass
    cancel_job = None


def check_pomodoro():
    """Cron callback: mark the pomodoro finished and ring once it has expired."""
    global finished
    with lock:
        # Nothing to do when no pomodoro is running, or while it is paused
        # (the wall clock keeps advancing during a pause, the timer must not).
        if start_time is None or current_duration is None:
            return
        if pause_time is not None:
            return
        if time.monotonic() <= start_time + current_duration:
            return
        # TODO: Play alarm
        delete_cancel_cron()
        finished = True

    # Ring outside the lock so the GUI and the actions are not blocked while
    # the sounds play.
    # TODO: Cron this?
    for _ in range(4):
        # Implementation at time of writing can't play parallel sounds,
        # so this will repeat the ding.
        actions.user.play_bell_high()


@module.action_class
class Actions:
    def pomodoro_start(type_: Optional[str] = "W", time_: Optional[int] = 25 * 60):
        """Start a pomodoro of `type` of length `time`."""
        global start_time, pomodoro_type, pause_time
        global finished, cancel_job, current_duration
        with lock:
            # Replace any previous check job so only one is ever scheduled.
            delete_cancel_cron()
            cancel_job = cron.interval("1s", check_pomodoro)
            start_time = time.monotonic()
            pomodoro_type = type_
            current_duration = time_
            pause_time = None
            finished = False
        gui.show()

    def pomodoro_pause():
        """Pause the active pomodoro."""
        global pause_time
        with lock:
            if pause_time is None:
                pause_time = time.monotonic()

    def pomodoro_unpause():
        """Unpause the active pomodoro."""
        global pause_time, start_time
        with lock:
            if pause_time is not None:
                if start_time is not None:
                    # Push the start forward by the time spent paused so the
                    # remaining time resumes exactly where it was frozen.
                    start_time += time.monotonic() - pause_time
                pause_time = None

    def pomodoro_cancel():
        """Cancel the active pomodoro."""
        global start_time, pomodoro_type, pause_time, current_duration
        with lock:
            delete_cancel_cron()
            if not pomodoro_type:
                raise RuntimeError("No pomodoro running.")
            pomodoro_type = None
            start_time = None
            pause_time = None
            current_duration = None


# ... (the rest of pomodoro.py is not shown in this listing)

Full Screen

Full Screen

TorqueDriver.py

Source:TorqueDriver.py Github

copy

Full Screen

...43 return self.commands[self.SEND_JOB].format(job_name, queue, args)44 def _get_job_id(self, stdout):45 job_id = stdout.read()46 return job_id47 def cancel_job(self, job_name):48 '''49 Try to cancel job_name50 '''51 cancel_job = self.get_cancel_job_command(job_name)52 stdin, stdout, stderr = self.transport.exec_command(cancel_job)53 def get_cancel_job_command(self, job_name):54 '''55 Build the string for cancel_job command56 '''57 return self.commands[self.CANCEL_JOB].format(job_name)58 def get_job_status(self, job_name, remotePath = None):59 '''60 Query the system to obtain info about the status of job_name61 '''...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful