Best Python code snippet using localstack_python
api.py
Source:api.py  
...97        return send_file(path_to_dir, ts)98    elif not os.path.exists(path_to_ts):99        return do_job(path_to_dir, token, index, ts)100@app.route("/cancel/<token>/<index>/<ts>", methods=["GET"])101def cancel_job(token, index, ts):102    path_to_dir = os.path.join(PATH, token)103    with open(os.path.join(path_to_dir, "job_ids.txt"), "r") as f:104        job_ids = f.read().splitlines()105    string = [el for el in job_ids if f"{token} {index} {ts}" in el][0]106    cancel_job_id = string.split()[0]107    cancel_job = q.fetch_job(cancel_job_id)108    if cancel_job is not None:109        logging.info(f"CANCEL_JOB is not None")110        logging.info(cancel_job)111        logging.info(cancel_job.get_status())112        logging.info(f"CANCEL_JOB enqueued_at: {cancel_job.enqueued_at}")113        cancel_job.kill()114        logging.info("JOB CANCEL")115        logging.info(cancel_job)...pomodoro.py
Source:pomodoro.py  
from talon import Module, imgui, cron, actions
import math
import threading
import time
from typing import Optional

module = Module()

# `lock` guards all of the module-level timer state below. It is a plain
# (non-reentrant) Lock, so helpers called while it is held must not take it.
lock = threading.Lock()
start_time = None  # time.monotonic() value the current pomodoro counts from
current_duration = None  # length of the current pomodoro, in seconds
pomodoro_type = None  # label drawn in the GUI; None means nothing is running
pause_time = None  # time.monotonic() value at which the timer was paused
finished = False  # set once the running pomodoro has expired
cancel_job = None  # handle returned by cron.interval for check_pomodoro


# TODO: Better placement
@imgui.open(y=20, x=5)
def gui(gui: imgui.GUI):
    """Draw the pomodoro label with the minutes left, or a flashing FINISHED."""
    # Snapshot the state under the lock, and test `pomodoro_type` inside it,
    # so a concurrent cancel cannot clear `start_time` between check and use.
    with lock:
        if pomodoro_type is None:
            return
        label = pomodoro_type
        # While paused the clock is frozen at the moment of the pause.
        current_time = pause_time if pause_time is not None else time.monotonic()
        remaining_time = math.ceil(
            (current_duration + start_time - current_time) / 60
        )

    if remaining_time <= 0:
        # Flash effect: alternate between "FINISHED" and an empty suffix.
        flashes_per_second = 1.5
        suffix = (
            "FINISHED" if int(time.monotonic() * flashes_per_second) % 2 else ""
        )
        gui.text(f"{label} -- {suffix}")
    else:
        gui.text(f"{label} {remaining_time:02d}")


def delete_cancel_cron():
    """Stop the periodic expiry check, if one is scheduled.

    Takes no lock itself; callers in this module invoke it while holding
    `lock`.
    """
    global cancel_job
    if cancel_job is not None:
        try:
            cron.cancel(cancel_job)
        except Exception:
            # Best effort: a job that can no longer be cancelled is not an
            # error here. (Narrowed from a bare `except:` so that
            # KeyboardInterrupt/SystemExit are no longer swallowed.)
            pass
    cancel_job = None


def check_pomodoro():
    """Cron callback: ring the bell once the running pomodoro has expired."""
    global finished
    with lock:
        # Nothing running, or the clock is frozen by a pause: nothing to do.
        if start_time is None or pause_time is not None:
            return
        if time.monotonic() <= start_time + current_duration:
            return
        # TODO: Play alarm
        delete_cancel_cron()
        finished = True

    # Ring outside the lock so the GUI and other actions are not blocked
    # while the sounds play.
    # TODO: Cron this?
    for _ in range(4):
        # Implementation at time of writing can't play parallel sounds,
        # so this will repeat the ding.
        actions.user.play_bell_high()


@module.action_class
class Actions:
    def pomodoro_start(type_: Optional[str] = "W", time_: Optional[int] = 25 * 60):
        """Start a pomodoro of `type` of length `time`."""
        global start_time, pomodoro_type, pause_time, finished, cancel_job
        global current_duration
        # The parameters are Optional; treat an explicit None as "use default"
        # instead of storing None and failing later in the GUI arithmetic.
        if type_ is None:
            type_ = "W"
        if time_ is None:
            time_ = 25 * 60
        with lock:
            # Replace any previous timer's expiry check with a fresh one.
            delete_cancel_cron()
            cancel_job = cron.interval("1s", check_pomodoro)
            start_time = time.monotonic()
            pomodoro_type = type_
            current_duration = time_
            pause_time = None
            finished = False
        gui.show()

    def pomodoro_pause():
        """Pause the active pomodoro."""
        global pause_time
        with lock:
            # Only a running, not-yet-paused timer can be paused.
            if start_time is not None and pause_time is None:
                pause_time = time.monotonic()

    def pomodoro_unpause():
        """Unpause the active pomodoro."""
        global pause_time, start_time
        with lock:
            if pause_time is not None:
                if start_time is not None:
                    # Shift the start forward by the time spent paused so the
                    # remaining time is the same as when the pause began.
                    start_time += time.monotonic() - pause_time
                pause_time = None

    def pomodoro_cancel():
        """Cancel the active pomodoro."""
        global start_time, pomodoro_type, pause_time, current_duration, finished
        with lock:
            delete_cancel_cron()
            if not pomodoro_type:
                raise RuntimeError("No pomodoro running.")
            pomodoro_type = None
            start_time = None
            pause_time = None
            current_duration = None
            finished = False
Source:TorqueDriver.py  
...43    return self.commands[self.SEND_JOB].format(job_name, queue, args)44  def _get_job_id(self, stdout):45    job_id = stdout.read()46    return job_id47  def cancel_job(self, job_name):48    '''49    Try to cancel job_name50    '''51    cancel_job = self.get_cancel_job_command(job_name)52    stdin, stdout, stderr = self.transport.exec_command(cancel_job)53  def get_cancel_job_command(self, job_name):54    '''55    Build the string for cancel_job command56    '''57    return self.commands[self.CANCEL_JOB].format(job_name)58  def get_job_status(self, job_name, remotePath = None):59    '''60    Query the system to obtain info about the status of job_name61    '''...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
