Best Python code snippet using autotest_python
context.py
Source:context.py  
#!/usr/bin/env python
# encoding: utf-8
# Created by zza on 2021/6/24 16:38
# Copyright 2021 LinkSense Technology CO,. Ltd
from __future__ import annotations

import asyncio
import datetime
import traceback
from typing import TYPE_CHECKING, Dict, ItemsView, Optional, Type

from lk_flow.config import Config
from lk_flow.core.event import EVENT, Event, EventBus
from lk_flow.env import logger
from lk_flow.errors import (
    DuplicateModError,
    DuplicateTaskNameError,
    LkFlowBaseError,
    ModNotFoundError,
    TaskNotFoundError,
)
from lk_flow.models import SubProcess, Task

if TYPE_CHECKING:  # pragma: no cover
    # https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports
    from lk_flow.core import ModAbstraction


class Context:
    """Registry of mods, tasks and the sub-processes that run them.

    Every task added with :meth:`add_task` is tracked in ``_PROCESS_ALL`` and,
    in addition, in exactly one of ``_PROCESS_RUNNING`` / ``_PROCESS_STOPPED``.
    The most recently constructed instance is stored on the class and can be
    retrieved with :meth:`get_instance`.
    """

    _env: Optional[Context] = None

    def __init__(self, config: Config):
        """Create the context and register it as the current instance.

        Args:
            config: Configuration object; its ``sleep_time`` is used as the
                delay between heartbeats in :meth:`entry_loop`.
        """
        Context._env = self
        self.loop_enable: bool = True
        self.config: Config = config
        self.event_bus: EventBus = EventBus()
        self.sleep_time: int = config.sleep_time
        self.system_start_time: datetime.datetime = datetime.datetime.now()
        # Process collections
        self._PROCESS_ALL: Dict[str, SubProcess] = {}  # every task
        self._PROCESS_RUNNING: Dict[str, SubProcess] = {}  # currently running
        # not activated yet, or already finished
        self._PROCESS_STOPPED: Dict[str, SubProcess] = {}
        self._mod_map: Dict[str, Type["ModAbstraction"]] = {}
        # add add_listener
        self.event_bus.add_listener(EVENT.EXEC_SYSTEM_CLOSE, self._close_loop)
        self.event_bus.add_listener(EVENT.HEARTBEAT, self.state_check)

    @classmethod
    def get_instance(cls) -> Context:
        """
        Return the Context object that has already been created.

        >>> Context.get_instance()
        Traceback (most recent call last):
            ...
        RuntimeError: Context has not been created. Please Use `Context.get_instance()` after lk_flow init
        """
        if cls._env is None:
            raise RuntimeError(
                "Context has not been created. Please Use `Context.get_instance()` after lk_flow init"
            )
        return cls._env

    # Mod
    def get_mod_map(self) -> ItemsView[str, Type["ModAbstraction"]]:
        """Return a live ``(name, mod)`` view of all registered mods."""
        return self._mod_map.items()

    def add_mod(self, mod_name: str, mod: Type["ModAbstraction"]) -> None:
        """Register ``mod`` under ``mod_name``.

        Raises:
            DuplicateModError: if ``mod_name`` is already registered.
        """
        if mod_name in self._mod_map:
            _message = (
                f"DuplicateModError at name = {mod_name}, {self._mod_map[mod_name]}"
            )
            raise DuplicateModError(_message)
        self._mod_map[mod_name] = mod

    def get_mod(self, mod_name: str) -> Type["ModAbstraction"]:
        """Return the mod registered under ``mod_name``.

        Raises:
            ModNotFoundError: if no mod with that name is registered.
        """
        if mod_name not in self._mod_map:
            raise ModNotFoundError(f"{mod_name} not found")
        return self._mod_map[mod_name]

    # Process
    def get_process(self, task_name: str) -> SubProcess:
        """Return the sub-process wrapper of ``task_name``.

        Raises:
            TaskNotFoundError: if the task was never added (or was deleted).
        """
        if task_name not in self._PROCESS_ALL:
            raise TaskNotFoundError(f"Task {task_name} not found")
        return self._PROCESS_ALL[task_name]

    # Process Set
    def get_all_processes(self) -> ItemsView[str, SubProcess]:
        """Return a live ``(name, process)`` view of every known task."""
        return self._PROCESS_ALL.items()

    def get_stopped_processes(self) -> ItemsView[str, SubProcess]:
        """Return a live ``(name, process)`` view of the stopped tasks."""
        return self._PROCESS_STOPPED.items()

    def get_running_processes(self) -> ItemsView[str, SubProcess]:
        """Return a live ``(name, process)`` view of the running tasks."""
        return self._PROCESS_RUNNING.items()

    # Task
    def start_task(self, task_name: str) -> None:
        """Start a stopped task; do nothing if it is already running.

        Publishes ``TASK_PRE_START`` before the process is started and
        ``TASK_RUNNING`` once it has been started.

        Raises:
            TaskNotFoundError: if ``task_name`` is neither running nor stopped.
        """
        if task_name in self._PROCESS_RUNNING:
            return
        # Same domain error as get_process() instead of a bare KeyError.
        if task_name not in self._PROCESS_STOPPED:
            raise TaskNotFoundError(f"Task {task_name} not found")
        # get subprocess
        process_manager: SubProcess = self._PROCESS_STOPPED.pop(task_name)
        try:
            # pre start
            self.event_bus.publish_event(
                Event(EVENT.TASK_PRE_START, task_name=task_name)
            )
            # start
            asyncio.get_event_loop().run_until_complete(process_manager.start())
        except Exception:
            # Keep the task reachable: without this it would be in neither the
            # running nor the stopped map after a failed start.
            self._PROCESS_STOPPED[task_name] = process_manager
            raise
        self._PROCESS_RUNNING[task_name] = process_manager
        # running
        event = Event(
            EVENT.TASK_RUNNING,
            task_name=task_name,
            task=process_manager.config,
            process=process_manager,
        )
        self.event_bus.publish_event(event)

    def stop_task(self, task_name: str) -> None:
        """Stop a running task; do nothing if it is not running.

        Publishes ``TASK_STOP`` after the process has been stopped.
        """
        if task_name not in self._PROCESS_RUNNING:
            return
        process_manager: SubProcess = self._PROCESS_RUNNING.pop(task_name)
        try:
            # stop
            asyncio.get_event_loop().run_until_complete(process_manager.stop())
        except Exception:
            # Leave it listed as running so state_check() can still move it
            # once it reports an exit code.
            self._PROCESS_RUNNING[task_name] = process_manager
            raise
        self._PROCESS_STOPPED[task_name] = process_manager
        # stopped
        event = Event(
            EVENT.TASK_STOP,
            task_name=task_name,
            task=process_manager.config,
            process=process_manager,
        )
        self.event_bus.publish_event(event)

    def add_task(self, task: Task) -> Optional[SubProcess]:
        """Add a task to the system in the stopped state.

        Publishes ``TASK_ADD`` and returns the new sub-process wrapper.

        Raises:
            DuplicateTaskNameError: if a task with the same name is loaded.
        """
        if task.name in self._PROCESS_ALL:  # already loaded
            message = f"{task.name} is already used. {task}"
            raise DuplicateTaskNameError(message)
        logger.debug(f"add task: {task}")
        process_manager = SubProcess(task)
        self._PROCESS_ALL[task.name] = process_manager
        self._PROCESS_STOPPED[task.name] = process_manager
        event = Event(
            EVENT.TASK_ADD, task_name=task.name, task=task, process=process_manager
        )
        self.event_bus.publish_event(event)
        return process_manager

    def delete_task(self, task_name: str) -> None:
        """Remove a task, stopping it first if it is running.

        Unknown task names are ignored. Publishes ``TASK_DELETE`` afterwards.
        """
        subprocess: Optional[SubProcess] = self._PROCESS_ALL.pop(task_name, None)
        if subprocess is None:
            return
        if self._PROCESS_RUNNING.pop(task_name, None) is not None:
            # Use the same loop start_task()/stop_task() use. asyncio.run()
            # would spin up and close a separate loop and reset the current
            # event loop, which breaks later get_event_loop() calls.
            asyncio.get_event_loop().run_until_complete(subprocess.stop())
        else:
            self._PROCESS_STOPPED.pop(task_name, None)
        event = Event(
            EVENT.TASK_DELETE,
            task_name=task_name,
            task=subprocess.config,
            process=subprocess,
        )
        self.event_bus.publish_event(event)

    # System
    def _close_loop(self, event: Event) -> None:
        """Close the loop: stop every running task and publish SYSTEM_CLOSE."""
        self.loop_enable = False
        # Snapshot the names: stop_task() mutates _PROCESS_RUNNING.
        for task_name in list(self._PROCESS_RUNNING):
            self.stop_task(task_name)
        logger.info(f"Get {event}, close loop.")
        self.event_bus.publish_event(Event(EVENT.SYSTEM_CLOSE))

    def is_running(self, task_name: str) -> bool:
        """check is running"""
        return task_name in self._PROCESS_RUNNING

    def state_check(self, _: Event) -> None:
        """
        Check process state.

        Every running process that reports an ``exit_code`` is moved from
        the running map to the stopped map.

        NOTE(review): the original docstring said a TASK_FINISH event is sent
        on normal exit and a TASK_RUNNING_ERROR event on abnormal exit, but no
        event is published here - confirm whether that is intended.
        """
        # Iterate over a snapshot: the running map is mutated in the loop.
        for name, subprocess in list(self._PROCESS_RUNNING.items()):
            if subprocess.exit_code is None:
                # still running
                continue
            self._PROCESS_RUNNING.pop(name)
            self._PROCESS_STOPPED[name] = subprocess

    async def entry_loop(self) -> None:
        """Publish a HEARTBEAT event every ``sleep_time`` seconds.

        Runs until ``loop_enable`` becomes false. ``LkFlowBaseError`` raised
        while publishing is logged and does not stop the loop.
        """
        while self.loop_enable:
            try:
                self.event_bus.publish_event(
                    Event(EVENT.HEARTBEAT, now=datetime.datetime.now())
                )
            except LkFlowBaseError as err:
                logger.error(err.message)
                logger.error(traceback.format_exc())
            await asyncio.sleep(self.sleep_time)
# ...musicplayer.py
Source:musicplayer.py  
"""
This module contains a reusable MusicPlayer class that is based
on an audio player application named mpg123.
"""
import subprocess
import time
from threading import Thread


#-------------------------------------------------------------------
# AudioEngineUnavailableError class
#-------------------------------------------------------------------
class AudioEngineUnavailableError(Exception):
    """
    When the required player application (mpg123) is not available
    this Exception is raised.
    """


#-------------------------------------------------------------------
# NoPlaybackError class
#-------------------------------------------------------------------
class NoPlaybackError(Exception):
    """
    When an attempt is made to control playback but nothing is
    being played, this Exception is raised.
    """


#-------------------------------------------------------------------
# PlaybackInProgressError class
#-------------------------------------------------------------------
class PlaybackInProgressError(Exception):
    """
    When an attempt is made to begin playback while something
    is already playing, this Exception is raised.
    """


#-------------------------------------------------------------------
# MusicPlayer class
#-------------------------------------------------------------------
class MusicPlayer():
    """
    Start a subprocess running audio player mpg123 to play a specified
    mp3 file. Provide controls to stop/start and quit playback.
    """

    def __init__(self):
        self._audioengine = 'mpg123'  # Only supported player is mpg123
        self._p = None
        self._is_paused = False
        # The following instance variables are affected by the process_monitor
        # thread.
        self._process_running = False
        self._return_code = None

    def play(self, sound_file):
        """
        Start playing sound_file in a new player subprocess.

        Raises PlaybackInProgressError if something is already playing and
        AudioEngineUnavailableError if the player executable cannot be found.
        """
        if self._process_running:
            raise PlaybackInProgressError('You cannot play while something else is already playing.')
        try:
            # Open a subprocess that runs mpg123 to play an mp3 file
            self._p = subprocess.Popen([self._audioengine,
                                        '-C',     # Enable commands to be read from stdin
                                        '-q',     # Be quiet
                                        sound_file],
                                       stdin=subprocess.PIPE,  # Pipe input via bytes
                                       stdout=None,
                                       stderr=None)
        except FileNotFoundError as e:
            raise AudioEngineUnavailableError(f'AudioEngineUnavailableError: {e}') from e
        # Since we are using stdin for commands, we have to send something
        # to keep mpg123 from complaining when we exit. The complaint is
        # not a serious one, but it is annoying. If stdin is not used the
        # terminal is posted with "Can't set terminal attributes" at exit.
        # I send an empty string below to keep mpg123 happy.
        self._send_command(b'')
        # A new track always starts un-paused and without a return code;
        # otherwise state left over from the previous track would leak in.
        self._is_paused = False
        self._return_code = None
        # Mark the player as running *before* the monitor thread is started.
        # If only the thread set this flag, is_playing() could still be False
        # right after play() returned, and quit_playing()/pause() could raise
        # NoPlaybackError for a track that is in fact playing.
        self._process_running = True
        # start a monitor thread that sets instance variables with the
        # status of the subprocess.
        monitor_thread = Thread(target=self.process_monitor, args=())
        monitor_thread.start()

    def quit_playing(self):
        """
        Tell the player to quit and block until the subprocess has ended.

        Raises NoPlaybackError if nothing is playing.
        """
        if not self._process_running:
            raise NoPlaybackError('Cannot quit playback because nothing is playing.')
        self._send_command(b'q')
        # Wait for process to end
        while self._process_running:
            time.sleep(0.1)

    def pause(self):
        """
        Pause playback; does nothing if playback is already paused.

        Raises NoPlaybackError if nothing is playing.
        """
        if not self._process_running:
            raise NoPlaybackError('Cannot pause playback because nothing is playing.')
        if not self._is_paused:
            self._send_command(b's')
            self._is_paused = True

    def resume(self):
        """
        Resume paused playback; does nothing if playback is not paused.

        Raises NoPlaybackError if nothing is playing.
        """
        if not self._process_running:
            raise NoPlaybackError('Cannot resume playback because nothing is playing.')
        if self._is_paused:
            # The same 's' key toggles between paused and playing.
            self._send_command(b's')
            self._is_paused = False

    def is_playing(self):
        """Return True while the player subprocess is running."""
        return self._process_running

    def return_code(self):
        """Return the subprocess return code, or None if it has not exited."""
        return self._return_code

    def process_monitor(self):
        """
        This code runs in its own thread to monitor the state of our
        external subprocess. Instance variables _process_running and
        _return_code are used to show the process status.
        """
        # Indicate that the process is running at the start, it
        # should be
        self._process_running = True
        # When a process exits, p.poll() returns the code it set upon
        # completion
        self._return_code = self._p.poll()
        # See whether the process has already exited. This will cause a
        # value (i.e. not None) to return from p.poll()
        if self._return_code is None:
            # Wait for the process to complete, get its return code directly
            # from the wait() call (i.e. do not use p.poll())
            self._return_code = self._p.wait()
        # When we get here, the process has exited and set a return code
        self._process_running = False

    def _send_command(self, command):
        """Write command (bytes) to the player's stdin and flush it."""
        self._p.stdin.write(command)
        self._p.stdin.flush()


def main():
    """
    A function that instantiates a MusicPlayer and demonstrates its features.
    """
    player = MusicPlayer()
    player.play('./my_mp3_files/fluidity-100-ig-edit-4558.mp3')
    print(f'Player is playing: {player.is_playing()}')
    print('Play for 2 seconds')
    time.sleep(2)
    print('Pause player for 2 seconds')
    player.pause()
    time.sleep(2)
    print('')
    player.resume()
    time.sleep(2)
    player.quit_playing()
    print(f'Player is playing: {player.is_playing()}')
    print(f'Return code: {player.return_code()}')
    player.play('./my_mp3_files/this-minimal-technology-12327.mp3')
    # Play a track until it finishes, ending the player subprocess when it does.
    # Keep the Python process running until the track ends.
    print('The Python program will keep running until it sees subprocess completion.')
    progress_indicator = '*'
    while player.is_playing():
        time.sleep(1)
        print(progress_indicator)
        progress_indicator += '*'


if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        print(e)
# ...utils.py
Source:utils.py  
import asyncio
from .ml import prediction_reload
from trame import state


async def queue_to_state(queue, *tasks):
    """Forward messages from ``queue`` into the shared ``state`` until told to stop.

    The queue is polled: when it is empty the coroutine sleeps for one second
    before checking again. String messages are treated as commands, of which
    only ``"stop"`` is acted on (it ends the polling loop); any other string
    is ignored. Every non-string message is passed to ``state.update``.

    Once the loop has ended, all ``tasks`` are awaited, then
    ``state.prediction_available`` is set from ``prediction_reload()`` and
    ``state.testing_count`` is reset to 0.
    """
    while True:
        if queue.empty():
            await asyncio.sleep(1)
            continue

        message = queue.get_nowait()
        if isinstance(message, str):
            # command
            if message == "stop":
                break
            continue

        # Need to monitor as we are outside of client/server update
        with state.monitor():
            # state update (dict)
            state.update(message)

    await asyncio.gather(*tasks)

    # Make sure we can go to prediction
    state.prediction_available = prediction_reload()
    state.testing_count = 0
# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
