Best Python code snippet using localstack_python
scheduler.py
Source:scheduler.py  
...33        return self.period is not None34    @property35    def is_cancelled(self) -> bool:36        return self._cancelled37    def set_next_deadline(self):38        """39        Internal method to update the next deadline of this task based on the period and the current time.40        """41        if not self.deadline:42            raise ValueError("Deadline was not initialized")43        if self.fixed_rate:44            self.deadline = self.deadline + self.period45        else:46            self.deadline = time.time() + self.period47    def cancel(self):48        self._cancelled = True49    def run(self):50        """51        Executes the task function. If the function raises and Exception, ``on_error`` is called (if set).52        """53        try:54            self.task(*self.args, **self.kwargs)55        except Exception as e:56            if self.on_error:57                self.on_error(e)58class Scheduler:59    """60    An event-loop based task scheduler that can manage multiple scheduled tasks with different periods,61    can be parallelized with an executor.62    """63    POISON = (-1, "__POISON__")64    def __init__(self, executor: Optional[Executor] = None) -> None:65        """66        Creates a new Scheduler. If an executor is passed, then that executor will be used to run the scheduled tasks67        asynchronously, otherwise they will be executed synchronously inside the event loop. 
Running tasks68        asynchronously in an executor means that they will be effectively executed at a fixed rate (scheduling with69        ``fixed_rate = False``, will have no effect).70        :param executor: an optional executor that tasks will be submitted to.71        """72        super().__init__()73        self.executor = executor74        self._queue = queue.PriorityQueue()75        self._condition = threading.Condition()76    def schedule(77        self,78        func: Callable,79        period: Optional[float] = None,80        fixed_rate: bool = True,81        start: Optional[float] = None,82        on_error: Callable[[Exception], None] = None,83        args: Optional[Union[Tuple, List[Any]]] = None,84        kwargs: Optional[Mapping[str, Any]] = None,85    ) -> ScheduledTask:86        """87        Schedules a given task (function call).88        :param func: the task to schedule89        :param period: the period in which to run the task (in seconds). if not set, task will run once90        :param fixed_rate: whether the to run at a fixed rate (neglecting execution duration of the task)91        :param start: start time92        :param on_error: error callback93        :param args: additional positional arguments to pass to the function94        :param kwargs: additional keyword arguments to pass to the function95        :return: a ScheduledTask instance96        """97        st = ScheduledTask(98            func,99            period=period,100            fixed_rate=fixed_rate,101            start=start,102            on_error=on_error,103            args=args,104            kwargs=kwargs,105        )106        self.schedule_task(st)107        return st108    def schedule_task(self, task: ScheduledTask) -> None:109        """110        Schedules the given task and sets the deadline of the task to either ``task.start`` or the current time.111        :param task: the task to schedule112        """113        task.deadline = max(task.start or 0, 
time.time())114        self.add(task)115    def add(self, task: ScheduledTask) -> None:116        """117        Schedules the given task. Requires that the task has a deadline set. It's better to use ``schedule_task``.118        :param task: the task to schedule.119        """120        if task.deadline is None:121            raise ValueError122        task._cancelled = False123        with self._condition:124            self._queue.put((task.deadline, task))125            self._condition.notify()126    def close(self) -> None:127        """128        Terminates the run loop.129        """130        with self._condition:131            self._queue.put(self.POISON)132            self._condition.notify()133    def run(self):134        q = self._queue135        cond = self._condition136        executor = self.executor137        poison = self.POISON138        task: ScheduledTask139        while True:140            deadline, task = q.get()141            if (deadline, task) == poison:142                break143            if task.is_cancelled:144                continue145            # wait until the task should be executed146            wait = max(0, deadline - time.time())147            if wait > 0:148                with cond:149                    interrupted = cond.wait(timeout=wait)150                    if interrupted:151                        # something with a potentially earlier deadline has arrived while waiting, so we re-queue and152                        # continue. this could be optimized by checking the deadline of the added element(s) first,153                        # but that would be fairly involved. 
the assumption is that `schedule` is not invoked frequently154                        q.put((task.deadline, task))155                        continue156            # run or submit the task157            if not task.is_cancelled:158                if executor:159                    executor.submit(task.run)160                else:161                    task.run()162            if task.is_periodic:163                try:164                    task.set_next_deadline()165                except ValueError:166                    # task deadline couldn't be set because it was cancelled167                    continue...m6_demo_plans.py
Source:m6_demo_plans.py  
...17from bluesky import preprocessors as bpp18import datetime19import time20import tqdm21def set_next_deadline(deadline, interval):22    while deadline <= time.time():  # until time is in the future23        deadline += interval24    return deadline25def push_images(num_images=4, frame_rate=10, run_time=300, md={}):26    _md = dict(27        purpose="push TIFF files to PVaccess PV",28        num_images=num_images,29        frame_rate=frame_rate,30        run_time=run_time,31        datetime=str(datetime.datetime.now()),32    )33    _md.update(md)34    # Problems priming, cannot force a new cam image.  Set Capture directly.35    # if not AD_plugin_primed(adpvadet.tiff1):36    #     print("Priming 'adpvadet.tiff1' plugin.")37    #     AD_prime_plugin2(adpvadet.tiff1)38    if "capture" in adpvadet.tiff1.stage_sigs:39        adpvadet.tiff1.stage_sigs.pop("capture")40    print(f"adpvadet.tiff1.stage_sigs={adpvadet.tiff1.stage_sigs}")41    adpvadet.cam.stage_sigs["num_images"] = 1_000_000  # num_images42    adpvadet.tiff1.stage_sigs["num_capture"] = 1_000_000  # num_images43    frame_interval = 1.0 / frame_rate44    # setup custom file names in TIFF plugin45    yield from bps.mv(46        adpvadet.tiff1.file_name, iconfig["BDP_DATA_FILE_NAME"],47        adpvadet.tiff1.file_path, iconfig["BDP_DATA_DIR"],48        adpvadet.tiff1.file_template, iconfig["BDP_DATA_FILE_TEMPLATE"],49    )50    @bpp.stage_decorator([adpvadet])51    @bpp.run_decorator(md=_md)52    def inner_plan():53        yield from bps.mv(adpvadet.tiff1.capture, 1)54        yield from bps.mv(adpvadet.cam.acquire, 1)55        t0 = time.time()56        frame_deadline = t057        run_deadline = t0 + max(0, run_time)58        def detector_stopped():59            result = adpvadet.cam.acquire.get() not in (1, "Acquire")60            if result:61                logger.info(62                    "Stopping 'acquisition' early:"63                    f" {adpvadet.cam.acquire.pvname} stopped."64                )65    
        return result66        def has_runtime_expired():67            result = time.time() >= run_deadline68            if result:69                logger.info("Run time time complete.")70            return result71        def publish_single_frame(frame):72            yield from bps.null()73            # next call is not a bluesky plan74            img2pva.publish_frame_as_pva(frame)  # runs in thread75        progress_bar = tqdm.tqdm(desc=f"run time: {run_time} seconds.")76        while True:77            "Repeat until run time expires."78            if has_runtime_expired() or detector_stopped():79                break80            for frame in gallery.image_file_list(num_images):81                progress_bar.update()82                if has_runtime_expired() or detector_stopped():83                    break84                yield from img2pva.wait_server(frame_deadline)85                # yield from bps.mv(img2pva, item)86                yield from publish_single_frame(frame)87                yield from img2pva.wait_server()88                yield from bps.create()89                yield from bps.read(adpvadet.cam.array_counter)90                yield from bps.read(adpvadet.cam.array_rate)91                yield from bps.read(adpvadet.pva1.execution_time)92                yield from bps.read(adpvadet.tiff1.execution_time)93                yield from bps.save()94                frame_deadline = set_next_deadline(frame_deadline, frame_interval)95        yield from img2pva.wait_server()96        progress_bar.close()97        yield from bps.mv(adpvadet.tiff1.capture, 0)98        yield from bps.mv(adpvadet.cam.acquire, 0)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
