Best Python code snippet using ATX
index.py
Source:index.py  
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""Sanji bundle serving the cellular modem under /network/cellulars."""

import logging
import os
from threading import Thread
from traceback import format_exc

from sanji.connection.mqtt import Mqtt
from sanji.core import Sanji
from sanji.core import Route
from sanji.model_initiator import ModelInitiator
from voluptuous import All, Any, Length, Match, Range, Required, Schema
from voluptuous import REMOVE_EXTRA, Optional, In

from cellular_utility.cell_mgmt import CellMgmt, CellMgmtError
from cellular_utility.cell_mgmt import CellAllModuleNotSupportError
from cellular_utility.management import Manager
from cellular_utility.vnstat import VnStat, VnStatError
from sh import rm, service

if __name__ == "__main__":
    FORMAT = "%(asctime)s - %(levelname)s - %(lineno)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=FORMAT)

_logger = logging.getLogger("sanji.cellular")


class Index(Sanji):
    """Cellular bundle: keeps the stored configuration, the connection
    manager and the monit keepalive/reboot rule in sync, and serves the
    /network/cellulars resources.

    Only one cellular resource exists; its id is always 1.
    """

    # Schema of one PDP profile ("primary" / "secondary").
    CONF_PROFILE_SCHEMA = Schema(
        {
            Required("apn", default="internet"):
                All(Any(unicode, str), Length(0, 100)),
            Optional("type", default="ipv4v6"):
                In(frozenset(["ipv4", "ipv6", "ipv4v6"])),
            Optional("auth", default={}): {
                Required("protocol", default="none"):
                    In(frozenset(["none", "chap", "pap", "both"])),
                Optional("username"):
                    All(Any(unicode, str), Length(0, 255)),
                Optional("password"):
                    All(Any(unicode, str), Length(0, 255))
            }
        },
        extra=REMOVE_EXTRA)

    # Schema of the whole stored configuration (also used for PUT).
    CONF_SCHEMA = Schema(
        {
            "id": int,
            Required("enable"): bool,
            Required("pdpContext"): {
                Required("static"): bool,
                Required("id"): int,
                Required("retryTimeout", default=120): All(
                    int,
                    Any(0, Range(min=10, max=86400 - 1))
                ),
                Required("primary"): CONF_PROFILE_SCHEMA,
                Required("secondary", default={}): CONF_PROFILE_SCHEMA
            },
            Required("pinCode", default=""): Any(Match(r"[0-9]{4,8}"), ""),
            Required("keepalive"): {
                Required("enable"): bool,
                Required("targetHost"): basestring,
                Required("intervalSec"): All(
                    int,
                    Any(0, Range(min=60, max=86400 - 1))
                ),
                Required("reboot",
                         default={"enable": False, "cycles": 1}): {
                    Required("enable", default=False): bool,
                    Required("cycles", default=1): All(
                        int,
                        Any(0, Range(min=1, max=48))),
                }
            }
        },
        extra=REMOVE_EXTRA)

    def init(self, *args, **kwargs):
        """Load and validate the stored configuration, write the monit
        rule and start the background thread that looks for the modem.
        """
        path_root = os.path.abspath(os.path.dirname(__file__))
        self.model = ModelInitiator("cellular", path_root)
        self.model.db[0] = Index.CONF_SCHEMA(self.model.db[0])

        self._dev_name = None
        self._mgr = None
        self._vnstat = None

        # the interface name is not known yet, so the rule is written
        # without "-I <iface>" and rewritten once the modem is found
        self.__apply_monit_config()

        self._init_thread = Thread(
            name="sanji.cellular.init_thread",
            target=self.__initial_procedure)
        self._init_thread.daemon = True
        self._init_thread.start()

    def __initial_procedure(self):
        """
        Continuously check Cellular modem existence.
        Set self._dev_name, self._mgr, self._vnstat properly.

        m_info() is tried three times; every CellMgmtError is logged and
        followed by a power cycle of the module.  After the third failure
        the procedure returns and leaves _mgr/_vnstat unset.
        CellAllModuleNotSupportError stops the retries and continues with
        no interface name.
        """
        cell_mgmt = CellMgmt()
        wwan_node = None

        for retry in xrange(0, 4):
            if retry == 3:
                return

            try:
                wwan_node = cell_mgmt.m_info().wwan_node
                break
            except CellAllModuleNotSupportError:
                break
            except CellMgmtError:
                _logger.warning("get wwan_node failure: " + format_exc())
                cell_mgmt.power_cycle(timeout_sec=60)

        self._dev_name = wwan_node
        self.__apply_monit_config()
        self.__create_manager()
        self._vnstat = VnStat(self._dev_name)

    def __apply_monit_config(self):
        """Rewrite the monit rule from the current configuration.

        The reboot-on-keepalive-failure rule is active only when the
        bundle, the keepalive and the keepalive reboot are all enabled.
        """
        conf = self.model.db[0]
        keepalive = conf["keepalive"]

        self.__init_monit_config(
            enable=(conf["enable"] and
                    keepalive["enable"] and
                    keepalive["reboot"]["enable"]),
            target_host=keepalive["targetHost"],
            iface=self._dev_name,
            cycles=keepalive["reboot"]["cycles"]
        )

    def __create_manager(self):
        """Build self._mgr from the stored configuration and start it.

        A missing profile or missing profile field falls back to the
        defaults ("internet"/"" APN, "ipv4v6", no authentication).  A
        stored PIN is erased when the manager reports a PIN error.
        """
        conf = self.model.db[0]
        pin = conf["pinCode"]
        pdp_context = conf["pdpContext"]

        primary = pdp_context.get("primary", {})
        primary_auth = primary.get("auth", {})
        secondary = pdp_context.get("secondary", {})
        secondary_auth = secondary.get("auth", {})

        self._mgr = Manager(
            dev_name=self._dev_name,
            enabled=conf["enable"],
            pin=None if pin == "" else pin,
            pdp_context_static=pdp_context["static"],
            pdp_context_id=pdp_context["id"],
            pdp_context_primary_apn=primary.get("apn", "internet"),
            pdp_context_primary_type=primary.get("type", "ipv4v6"),
            pdp_context_primary_auth=primary_auth.get("protocol", "none"),
            pdp_context_primary_username=primary_auth.get("username", ""),
            pdp_context_primary_password=primary_auth.get("password", ""),
            pdp_context_secondary_apn=secondary.get("apn", ""),
            pdp_context_secondary_type=secondary.get("type", "ipv4v6"),
            pdp_context_secondary_auth=secondary_auth.get(
                "protocol", "none"),
            pdp_context_secondary_username=secondary_auth.get(
                "username", ""),
            pdp_context_secondary_password=secondary_auth.get(
                "password", ""),
            pdp_context_retry_timeout=pdp_context["retryTimeout"],
            keepalive_enabled=conf["keepalive"]["enable"],
            keepalive_host=conf["keepalive"]["targetHost"],
            keepalive_period_sec=conf["keepalive"]["intervalSec"],
            log_period_sec=60)

        # clear PIN code if pin error
        if self._mgr.status() == Manager.Status.pin_error and pin != "":
            self.model.db[0]["pinCode"] = ""
            self.model.save_db()

        self._mgr.set_update_network_information_callback(
            self._publish_network_info)
        self._mgr.start()

    def __init_completed(self):
        """Return True once the init thread has finished (non-blocking)."""
        if self._init_thread is None:
            return True

        self._init_thread.join(0)
        if self._init_thread.is_alive():
            return False

        self._init_thread = None
        return True

    def __resource_exists(self, message):
        """Return True when initialization is done and the request
        addresses resource id 1.

        A non-numeric id counts as "not existing" instead of raising.
        """
        if not self.__init_completed():
            return False

        try:
            return int(message.param["id"]) == 1
        except (TypeError, ValueError):
            return False

    def __init_monit_config(
            self, enable=False, target_host="8.8.8.8", iface="", cycles=1):
        """Write or remove /etc/monit/conf.d/keepalive and restart monit.

        Args:
            enable: False removes the rule file.
            target_host: host pinged by the rule.
            iface: interface passed to ping with -I; "" or None omits it.
            cycles: value of the rule's "every N cycles" clause.
        """
        # NOTE(review): target_host comes from the PUT payload and is
        # pasted into the monit command line unescaped -- confirm that it
        # is restricted elsewhere.
        if enable is False:
            rm("-rf", "/etc/monit/conf.d/keepalive")
            service("monit", "restart")
            return

        ifacecmd = "" if iface == "" or iface is None \
                   else "-I {}".format(iface)
        config = """check program ping-test with path "/bin/ping {target_host} {ifacecmd} -c 3 -W 20"
    if status != 0
    then exec "/bin/bash -c '/usr/sbin/cell_mgmt power_off force && /bin/sleep 5 && /usr/local/sbin/reboot -i -f -d'"
    every {cycles} cycles
"""  # noqa
        with open("/etc/monit/conf.d/keepalive", "w") as f:
            f.write(config.format(
                target_host=target_host, ifacecmd=ifacecmd, cycles=cycles))
        service("monit", "restart")

    @Route(methods="get", resource="/network/cellulars")
    def get_list(self, message, response):
        """Return a one-element list with the modem state, or an empty
        list while the modem is not (yet) available.
        """
        if not self.__init_completed():
            return response(code=200, data=[])

        if (self._dev_name is None or
                self._mgr is None or
                self._vnstat is None):
            return response(code=200, data=[])

        return response(code=200, data=[self._get()])

    @Route(methods="get", resource="/network/cellulars/:id")
    def get(self, message, response):
        """Return the modem state for id 1, otherwise code 400."""
        if not self.__resource_exists(message):
            return response(code=400, data={"message": "resource not exist"})

        # the init thread may have finished without creating them
        if self._mgr is None or self._vnstat is None:
            return response(code=400, data={"message": "resource not exist"})

        return response(code=200, data=self._get())

    PUT_SCHEMA = CONF_SCHEMA

    @Route(methods="put", resource="/network/cellulars/:id", schema=PUT_SCHEMA)
    def put(self, message, response):
        """Store a new configuration, recreate the manager and rewrite
        the monit rule.  Responds with the stored configuration.
        """
        if not self.__resource_exists(message):
            return response(code=400, data={"message": "resource not exist"})

        _logger.info(str(message.data))

        data = Index.PUT_SCHEMA(message.data)
        data["id"] = int(message.param["id"])

        _logger.info(str(data))

        # always use the 1st PDP context for static
        if data["pdpContext"]["static"] is True:
            data["pdpContext"]["id"] = 1

        # since all items are required in PUT,
        # its schema is identical to cellular.json
        self.model.db[0] = data
        self.model.save_db()

        if self._mgr is not None:
            self._mgr.stop()
            self._mgr = None

        self.__create_manager()
        self.__apply_monit_config()

        # self._get() may wait until start/stop finished
        return response(code=200, data=self.model.db[0])

    def _get(self):
        """Collect configuration, modem, network and usage information
        into the dict served by the GET handlers.

        Requires self._mgr and self._vnstat to be set.  Values that the
        manager cannot provide are replaced by empty/neutral defaults;
        usage is -1 when vnstat fails.
        """
        name = self._dev_name
        if name is None:
            name = "n/a"

        config = self.model.db[0]
        status = self._mgr.status()
        minfo = self._mgr.module_information()
        sinfo = self._mgr.static_information()
        cinfo = self._mgr.cellular_information()
        ninfo = self._mgr.network_information()

        try:
            pdpc_list = self._mgr.pdp_context_list()
        except CellMgmtError:
            pdpc_list = []

        try:
            self._vnstat.update()
            usage = self._vnstat.get_usage()
        except VnStatError:
            usage = {
                "txkbyte": -1,
                "rxkbyte": -1
            }

        # clear PIN code if pin error
        # NOTE(review): __create_manager compares with Status.pin_error,
        # this one with Status.pin -- confirm which is intended.
        if (config["pinCode"] != "" and
                status == Manager.Status.pin):
            config["pinCode"] = ""
            self.model.db[0] = config
            self.model.save_db()

        # work on a copy so that the runtime "list" entry never ends up
        # in the stored configuration
        pdp_context = dict(config["pdpContext"])
        pdp_context["primary"] = \
            Index.CONF_PROFILE_SCHEMA(config["pdpContext"]["primary"])
        pdp_context["secondary"] = \
            Index.CONF_PROFILE_SCHEMA(config["pdpContext"]["secondary"])
        pdp_context["list"] = pdpc_list

        return {
            "id": config["id"],
            "name": name,
            "mode": "" if cinfo is None else cinfo.mode,
            "signal": {"csq": 0, "rssi": 0, "ecio": 0.0} if cinfo is None else
                    {"csq": cinfo.signal_csq,
                     "rssi": cinfo.signal_rssi_dbm,
                     "ecio": cinfo.signal_ecio_dbm},
            "operatorName": "" if cinfo is None else cinfo.operator,
            "lac": "" if cinfo is None else cinfo.lac,
            "tac": "" if cinfo is None else cinfo.tac,
            "nid": "" if cinfo is None else cinfo.nid,
            "cellId": "" if cinfo is None else cinfo.cell_id,
            "bid": "" if cinfo is None else cinfo.bid,
            "imsi": "" if sinfo is None else sinfo.imsi,
            "iccId": "" if sinfo is None else sinfo.iccid,
            "imei": "" if minfo is None else minfo.imei,
            "esn": "" if minfo is None else minfo.esn,
            "pinRetryRemain": (
                -1 if sinfo is None else sinfo.pin_retry_remain),
            "status": status.name,
            "mac": "00:00:00:00:00:00" if minfo is None else minfo.mac,
            "ip": "" if ninfo is None else ninfo.ip,
            "netmask": "" if ninfo is None else ninfo.netmask,
            "gateway": "" if ninfo is None else ninfo.gateway,
            "dns": [] if ninfo is None else ninfo.dns_list,
            "usage": {
                "txkbyte": usage["txkbyte"],
                "rxkbyte": usage["rxkbyte"]
            },
            "enable": config["enable"],
            "pdpContext": pdp_context,
            "pinCode": config["pinCode"],
            "keepalive": {
                "enable": config["keepalive"]["enable"],
                "targetHost": config["keepalive"]["targetHost"],
                "intervalSec": config["keepalive"]["intervalSec"],
                "reboot": {
                    "enable": config["keepalive"]["reboot"]["enable"],
                    "cycles": config["keepalive"]["reboot"]["cycles"]
                }
            }
        }

    def _publish_network_info(
            self,
            nwk_info):
        """Manager callback: publish the network information as an event
        on /network/interfaces/<name>.  Does nothing but log an error
        while the interface name is unknown.
        """
        name = self._dev_name
        if name is None:
            _logger.error("device name not available")
            return

        data = {
            "name": name,
            "wan": True,
            "type": "cellular",
            "mode": "dhcp",
            "status": nwk_info.status,
            "ip": nwk_info.ip,
            "netmask": nwk_info.netmask,
            "gateway": nwk_info.gateway,
            "dns": nwk_info.dns_list
        }
        _logger.info("publish network info: " + str(data))
        self.publish.event.put("/network/interfaces/{}".format(name),
                               data=data)

    @Route(methods="get", resource="/network/cellulars/:id/firmware")
    def get_fw(self, message, response):
        """Return the firmware information; modules other than MC7354
        are reported as not switchable.
        """
        if not self.__resource_exists(message):
            return response(code=400, data={"message": "resource not exist"})

        # the init thread may have finished without creating the manager
        if self._mgr is None:
            return response(code=400, data={"message": "resource not exist"})

        m_info = self._mgr._cell_mgmt.m_info()
        if m_info.module != "MC7354":
            return response(code=200, data={
                "switchable": False,
                "current": None,
                "preferred": None,
                "avaliable": None
            })

        fw_info = self._mgr._cell_mgmt.get_cellular_fw()
        return response(code=200, data=fw_info)

    @Route(methods="put", resource="/network/cellulars/:id/firmware")
    def put_fw(self, message, response):
        """Switch the firmware to the requested fwver/config/carrier.

        The 200 response is sent before the switch is started.
        """
        if not self.__resource_exists(message):
            return response(code=400, data={"message": "resource not exist"})

        # the init thread may have finished without creating the manager
        if self._mgr is None:
            return response(code=400, data={"message": "resource not exist"})

        response(code=200)

        self._mgr._cell_mgmt.set_cellular_fw(
            fwver=message.data["fwver"],
            config=message.data["config"],
            carrier=message.data["carrier"]
        )


if __name__ == "__main__":
    cellular = Index(connection=Mqtt())
    # ... (the listing is truncated at this point on the source page)
# Next listing on the page: Player.py
Source:Player.py  
1#!/usr/bin/python32# -*- coding: utf-8 -*-3# Copyright (c) 2018 Bhojpur Consulting Private Limited, India. All rights reserved.4#5# Permission is hereby granted, free of charge, to any person obtaining a copy6# of this software and associated documentation files (the "Software"), to deal7# in the Software without restriction, including without limitation the rights8# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell9# copies of the Software, and to permit persons to whom the Software is10# furnished to do so, subject to the following conditions:11#12# The above copyright notice and this permission notice shall be included in13# all copies or substantial portions of the Software.14#15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR16# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,17# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE18# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER19# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN21# THE SOFTWARE.22# Bhojpur Speech: implementation of class Player23#24# The class Player implements the playback device for existing recordings25import os, time, datetime, subprocess, threading, copy, queue26from bhojpur import Base27class Player(Base):28  """ Bhojpur Speech Player controller """29  def __init__(self,app):30    """ initialization """31    self._app     = app32    self.debug    = app.debug33    self._api     = app.api34    self._backend = app.backend35    self._lock        = threading.Lock()36    self._file        = None37    self._dirinfo     = None38    self._dirplay     = None39    self._dirstop     = threading.Event()40    self._init_thread = None41    self.read_config()42    self.register_apis()43  # --- read configuration   --------------------------------------------------44  def 
read_config(self):45    """ read configuration from config-file """46    # section [PLAYER]47    self._wait_dir = int(self.get_value(self._app.parser,"PLAYER",48                                        "player_wait_dir",10))49    self._root_dir = self.get_value(self._app.parser,"PLAYER",50                                    "player_root_dir",51                                    os.path.expanduser("~"))52    self._root_dir = os.path.abspath(self._root_dir)53    self._def_dir = self.get_value(self._app.parser,"PLAYER",54                                    "player_def_dir",55                                    self._root_dir)56    self._def_dir = os.path.abspath(self._def_dir)57    self._dir = self._def_dir58    self.msg("Player: root dir:    %s" % self._root_dir)59    self.msg("Player: default dir: %s" % self._def_dir)60  # --- register APIs   ------------------------------------------------------61  def register_apis(self):62    """ register Bhojpur Speech API functions """63    self._api.player_play_file  = self.player_play_file64    self._api.player_stop       = self.player_stop65    self._api.player_pause      = self.player_pause66    self._api.player_resume     = self.player_resume67    self._api.player_toggle     = self.player_toggle68    self._api.player_select_dir = self.player_select_dir69    self._api.player_play_dir   = self.player_play_dir70    self._api._player_get_cover_file = self._player_get_cover_file71  # --- return persistent state of this class   -------------------------------72  def get_persistent_state(self):73    """ return persistent state (overrides Base.get_pesistent_state()) """74    return {75      'player_dir': self._dir,76      'player_file': self._file77      }78  # --- restore persistent state of this class   ------------------------------79  def set_persistent_state(self,state_map):80    """ restore persistent state (overrides Base.set_pesistent_state()) """81    self.msg("Player: restoring persistent state")82    if 'player_dir' in 
state_map:83      self._dir = state_map['player_dir']84      self.msg("Player: currrent dir (tentative):  %s" % self._dir)85    if 'player_file' in state_map:86      self._file = state_map['player_file']87      self.msg("Player: currrent file (tentative): %s" % self._file)88    self._api.update_state(section="player",key="last_dir",89                           value=self._dir[len(self._root_dir):]+os.path.sep,90                           publish=False)91    self._init_thread = threading.Thread(target=self._init_state)92    self._init_thread.start()93  # --- lazy query of dir-info during initialization   ----------------------94  def _init_state(self):95    """ wait for directory and query dir-info """96    # check directory (wait if necessary)97    self.msg("Player: waiting for %s" % self._dir)98    while not os.path.exists(self._dir) and self._wait_dir:99      time.sleep(1)100      self._wait_dir -= 1101    # check again102    if self._check_dir(self._dir):103      self._get_dirinfo(self._dir)104    else:105      # oops, check failed, now check everything106      if not os.path.exists(self._root_dir):107        self.msg("[WARNING] Player: root-directory %s of player does not exist" %108                 self._root_dir,True)109        self._root_dir = os.path.expanduser("~")110        self.msg("[WARNING] Player: using %s as fallback" % self._root_dir,True)111      if not self._check_dir(self._def_dir):112        self._def_dir = self._root_dir113        self.msg("[WARNING] Player: using %s as fallback" % self._root_dir,True)114      self._dir = self._def_dir115      self._get_dirinfo(self._dir)116    # also check files now117    if self._file and not self._check_file(self._file):118      self._file = None119    self._init_thread = None120    self.msg("Player: currrent dir:  %s" % self._dir)121    self.msg("Player: currrent file: %s" % self._file)122  # --- check directory   ---------------------------------------------------123  def _check_dir(self,path):124    """ 
check if directory is valid """125    path = os.path.abspath(path)126    if not os.path.exists(path):127      self.msg("[WARNING] Player: %s does not exist" % path)128      return False129    if not os.path.commonpath([self._root_dir,path]) == self._root_dir:130      self.msg("[WARNING] Player: %s is not child of root-directory" % path,131               True)132      return False133    return True134  # --- check file   ---------------------------------------------------------135  def _check_file(self,path):136    """ check if file is valid """137    path = os.path.abspath(path)138    if not os.path.exists(path):139      self.msg("[WARNING] Player: %s does not exist" % path)140      return False141    else:142      return self._check_dir(os.path.dirname(path))143  # --- pretty print duration/time   ----------------------------------------144  def _pp_time(self,seconds):145    """ pritty-print time as mm:ss or hh:mm """146    m, s = divmod(seconds,60)147    h, m = divmod(m,60)148    if h > 0:149      return "{0:02d}:{1:02d}:{2:02d}".format(h,m,s)150    else:151      return "{0:02d}:{1:02d}".format(m,s)152  # --- start playing   -------------------------------------------------------153  def player_play_file(self,file=None,last=True):154    """ start playing """155    if self._init_thread:156      self._init_thread.join()157    if file:158      if not os.path.isabs(file):159        file = os.path.join(self._dir,file)160      if not self._check_file(file):161        raise ValueError("invalid filename %s" % file)162      self._file = file163    if not self._file:164      raise ValueError("default file not set")165    if self._dirinfo:166      self._dirinfo['cur_file'] = self._file167    # this will push the information to all clients, even if the file168    # is already playing.169    # We might also need to push the elapsed time?!170    total_secs = int(subprocess.check_output(["mp3info", "-p","%S",self._file]))171    file_info = {'name': 
os.path.basename(self._file),172                 'total': total_secs,173                 'total_pretty': self._pp_time(total_secs),174                 'last': last}175    self._api._push_event({'type': 'file_info', 'value': file_info })176    if self._backend.play(self._file,last):177      self._api.update_state(section="player",key="last_file",178                             value=os.path.basename(self._file),publish=False)179    self._api.update_state(section="player",key="time",180                           value=[0,file_info['total'],file_info['total_pretty']],181                           publish=False)182    return file_info183  # --- stop playing   -------------------------------------------------------184  def player_stop(self):185    """ stop playing (play->stop, pause->stop)"""186    if self._dirplay:187      self._dirstop.set()         # this will also stop the backend188      self._dirplay.join()189    else:190      self._backend.stop()        # backend will publish eof-event191  # --- pause playing   -----------------------------------------------------192  def player_pause(self):193    """ pause playing (play->pause) """194    self._backend.pause()195  # --- resume playing   ----------------------------------------------------196  def player_resume(self):197    """ resume playing (pause->play) """198    self._backend.resume()199  # --- toggle playing   ------------------------------------------------------200  def player_toggle(self):201    """ toggle playing (play->pause, pause->play) """202    self._backend.toggle()203  # --- select directory, return entries   ------------------------------------204  def player_select_dir(self,dir=None):205    """ select directory:206        a directory starting with a / is always interpreted relative207        to root_dir, otherwise relative to the current directory208    """209    if self._init_thread:210      self._init_thread.join()211    self._lock.acquire()212    if not dir:213      # use current directory, 
keep current file214      dir = self._dir215    else:216      if os.path.isabs(dir):217        dir = os.path.normpath(self._root_dir+dir)   # cannot use join here!218        self.msg("Player: dir is absolute, fullpath %s" % dir)219      else:220        dir = os.path.normpath(os.path.join(self._dir,dir))221        self.msg("Player: dir is relative, fullpath %s" % dir)222      if not self._check_dir(dir):223        self._lock.release()224        raise ValueError("invalid directory %s" % dir)225    cache_valid = False226    if dir == self._dir:227      if self._dirinfo:228        cache_valid = True229    else:230      # set new current directory231      self._dir = dir232    # publish event first (return dir relative to root_dir)233    cur_dir = self._dir[len(self._root_dir):]+os.path.sep234    self._api._push_event({'type':  'dir_select', 'value': cur_dir})235    self._api.update_state(section="player",key="last_dir",236                           value=cur_dir,237                           publish=False)238    # then query new directory info239    if not cache_valid:240      self._get_dirinfo(dir)241      self._dirinfo['cur_dir'] = cur_dir242    else:243      self.msg("Player: using cached dir-info for %s" % dir)244    self._lock.release()245    return self._dirinfo246  # --- play all files in directory   -----------------------------------------247  def player_play_dir(self,start=None):248    """ play all files in the current directory starting with249        the given file250    """251    if self._init_thread:252      self._init_thread.join()253    # check existing player-thread, stop it and wait until it is finished254    if self._dirplay:255      self._dirstop.set()256      self._dirplay.join()257    # copy file-list258    if not start:259      files = copy.deepcopy(self._dirinfo['files'])260    else:261      try:262        index = self._dirinfo['files'].index(start)263        self.msg("Player: starting play_dir with file %s (index %i)" %264                 
(start,index))265        files = copy.deepcopy(self._dirinfo['files'][index:])266      except ValueError:267        raise ValueError("file %s does not exist" % start)268    # start player-thread, pass files as argument269    self._dirstop.clear()270    self._dirplay = threading.Thread(target=self._play_dir,args=(files,))271    self._dirplay.start()272  # --- play all files (helper)   --------------------------------------------273  def _play_dir(self,files):274    """ play all given files """275    ev_queue = self._api._add_consumer("_play_dir")276    do_exit = False277    index_last = len(files)-1278    for index,fname in enumerate(files):279      if do_exit:280        break281      self.msg("Player: _play_dir: playing next file %s" % fname)282      self.player_play_file(fname,last=index==index_last)283      while True:284        # a naive implementation would just block on the queue, but285        # then we could stop this thread only after an event occurs286        if self._dirstop.wait(timeout=1.0):287          do_exit = True288          break289        try:290          ev = ev_queue.get(block=False)291          ev_queue.task_done()292          if ev:293            if ev['type'] == 'eof' and ev['value']['name'] == fname:294              self.msg("Player: processing eof for %s" % fname)295              break                              # start next file296          else:297            do_exit = True298            break299        except queue.Empty:300          pass301    # cleanup302    self.msg("Player: stopping _play_dir and cleaning up")303    self._api._del_consumer("_play_dir")304    self._backend.stop()305    self._dirplay = None306  # --- return name of cover file (currently only cover.jpg)   ---------------307  def _player_get_cover_file(self):308    """ return name of cover file """309    cover = os.path.join(self._dir,"cover.jpg")310    if os.path.exists(cover):311      return cover312    else:313      return None314  # --- create directory info for 
given dir   --------------------------------315  def _get_dirinfo(self,dir):316    """ create directory info """317    self._dirinfo =  {'dirs':  [], 'files': [], 'dur': []}318    self.msg("Player: collecting dir-info for %s" % dir)319    # first entry is parent directory320    if self._dir != self._root_dir:321      self._dirinfo['dirs'].append('..')322    for f in os.listdir(dir):323      if os.path.isfile(os.path.join(dir,f)):324        if f.endswith(".mp3"):325          self._dirinfo['files'].append(f)326      else:327        self._dirinfo['dirs'].append(f)328    # ... and sort results329    self._dirinfo['files'].sort()330    self._dirinfo['dirs'].sort()331    # set current file (keep existing)332    if self._file:333      self._dirinfo['cur_file'] = os.path.basename(self._file)334    else:335      if len(self._dirinfo['files']):336        self._file = os.path.join(self._dir,self._dirinfo['files'][0])337        self._dirinfo['cur_file'] = self._dirinfo['files'][0]338      else:339        self._dirinfo['cur_file'] = None340      self._api.update_state(section="player",key="last_file",341                             value= self._dirinfo['cur_file'],publish=False)342    # add add time-info343    for f in self._dirinfo['files']:344      secs = int(subprocess.check_output(["mp3info",345                                          "-p","%S",346                                          os.path.join(dir,f)]))...SRPlayer.py
Source:SRPlayer.py  
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Pi-Webradio: implementation of class Player
#
# The class Player implements the playback device for existing recordings
#
# Author: Bernhard Bablok
# License: GPL3
#
# Website: https://github.com/bablokb/pi-webradio
#
# -----------------------------------------------------------------------------

import os, time, datetime, subprocess, threading, copy, queue

from webradio import Base

class Player(Base):
  """ Player-controller

      Plays single mp3-files or all files of a directory through the
      backend object of the application and publishes state/events
      through the application's api object.
  """

  def __init__(self,app):
    """ initialization: keep references to the application objects,
        create the synchronization primitives, then read the
        configuration and register the API-functions
    """

    self._app     = app
    self.debug    = app.debug
    self._api     = app.api
    self._backend = app.backend

    self._lock        = threading.Lock()    # serializes player_select_dir()
    self._file        = None                # current file (full path)
    self._dirinfo     = None                # cached info of current directory
    self._dirplay     = None                # thread running _play_dir()
    self._dirstop     = threading.Event()   # set to stop _play_dir()
    self._init_thread = None                # thread running _init_state()

    self.read_config()
    self.register_apis()

  # --- read configuration   --------------------------------------------------

  def read_config(self):
    """ read configuration from config-file """

    # section [PLAYER]
    self._wait_dir = int(self.get_value(self._app.parser,"PLAYER",
                                        "player_wait_dir",10))

    self._root_dir = self.get_value(self._app.parser,"PLAYER",
                                    "player_root_dir",
                                    os.path.expanduser("~"))
    self._root_dir = os.path.abspath(self._root_dir)

    # the default directory falls back to the root directory
    self._def_dir = self.get_value(self._app.parser,"PLAYER",
                                    "player_def_dir",
                                    self._root_dir)
    self._def_dir = os.path.abspath(self._def_dir)

    self._dir = self._def_dir
    self.msg("Player: root dir:    %s" % self._root_dir)
    self.msg("Player: default dir: %s" % self._def_dir)

  # --- register APIs   ------------------------------------------------------

  def register_apis(self):
    """ register API-functions (attach the bound methods to the api-object) """

    self._api.player_play_file  = self.player_play_file
    self._api.player_stop       = self.player_stop
    self._api.player_pause      = self.player_pause
    self._api.player_resume     = self.player_resume
    self._api.player_toggle     = self.player_toggle
    self._api.player_select_dir = self.player_select_dir
    self._api.player_play_dir   = self.player_play_dir
    self._api._player_get_cover_file = self._player_get_cover_file

  # --- return persistent state of this class   -------------------------------

  def get_persistent_state(self):
    """ return persistent state (overrides SRBase.get_persistent_state()) """

    return {
      'player_dir': self._dir,
      'player_file': self._file
      }

  # --- restore persistent state of this class   ------------------------------

  def set_persistent_state(self,state_map):
    """ restore persistent state (overrides SRBase.set_persistent_state())

        The restored directory and file are only tentative: they are
        validated asynchronously by _init_state(), which runs in its own
        thread.
    """

    self.msg("Player: restoring persistent state")
    if 'player_dir' in state_map:
      self._dir = state_map['player_dir']
      self.msg("Player: currrent dir (tentative):  %s" % self._dir)
    if 'player_file' in state_map:
      self._file = state_map['player_file']
      self.msg("Player: currrent file (tentative): %s" % self._file)

    # publish the directory relative to the root directory
    self._api.update_state(section="player",key="last_dir",
                           value=self._dir[len(self._root_dir):]+os.path.sep,
                           publish=False)

    self._init_thread = threading.Thread(target=self._init_state)
    self._init_thread.start()

  # --- lazy query of dir-info during initialization   ----------------------

  def _init_state(self):
    """ wait for directory and query dir-info """

    # check directory (wait if necessary): poll once per second and
    # count down the configured number of seconds
    self.msg("Player: waiting for %s" % self._dir)
    while not os.path.exists(self._dir) and self._wait_dir:
      time.sleep(1)
      self._wait_dir -= 1

    # check again
    if self._check_dir(self._dir):
      self._get_dirinfo(self._dir)
    else:
      # oops, check failed, now check everything
      if not os.path.exists(self._root_dir):
        self.msg("[WARNING] Player: root-directory %s of player does not exist" %
                 self._root_dir,True)
        self._root_dir = os.path.expanduser("~")
        self.msg("[WARNING] Player: using %s as fallback" % self._root_dir,True)
      if not self._check_dir(self._def_dir):
        self._def_dir = self._root_dir
        self.msg("[WARNING] Player: using %s as fallback" % self._root_dir,True)
      self._dir = self._def_dir
      self._get_dirinfo(self._dir)

    # also check files now
    if self._file and not self._check_file(self._file):
      self._file = None

    # signal "initialization finished" to _wait_for_init()
    self._init_thread = None
    self.msg("Player: currrent dir:  %s" % self._dir)
    self.msg("Player: currrent file: %s" % self._file)

  # --- wait for the initialization thread   ---------------------------------

  def _wait_for_init(self):
    """ block until the thread running _init_state() (if any) has finished """

    # use a local reference: _init_state() resets self._init_thread to None
    # when it is done, so testing and joining the attribute itself could
    # end up calling join() on None
    init_thread = self._init_thread
    if init_thread:
      init_thread.join()

  # --- check directory   ---------------------------------------------------

  def _check_dir(self,path):
    """ check if directory is valid, i.e. it exists and is located
        within the root-directory. Returns True or False.
    """

    path = os.path.abspath(path)
    if not os.path.exists(path):
      self.msg("[WARNING] Player: %s does not exist" % path)
      return False

    if os.path.commonpath([self._root_dir,path]) != self._root_dir:
      self.msg("[WARNING] Player: %s is not child of root-directory" % path,
               True)
      return False

    return True

  # --- check file   ---------------------------------------------------------

  def _check_file(self,path):
    """ check if file is valid, i.e. it exists and its directory passes
        _check_dir(). Returns True or False.
    """

    path = os.path.abspath(path)
    if not os.path.exists(path):
      self.msg("[WARNING] Player: %s does not exist" % path)
      return False
    return self._check_dir(os.path.dirname(path))

  # --- pretty print duration/time   ----------------------------------------

  def _pp_time(self,seconds):
    """ pretty-print time (given as integer seconds) as mm:ss, or as
        hh:mm:ss if it is one hour or longer
    """

    m, s = divmod(seconds,60)
    h, m = divmod(m,60)
    if h > 0:
      return "{0:02d}:{1:02d}:{2:02d}".format(h,m,s)
    return "{0:02d}:{1:02d}".format(m,s)

  # --- start playing   -------------------------------------------------------

  def player_play_file(self,file=None,last=True):
    """ start playing

        file: file to play; a relative name is resolved against the
              current directory. Without a file the current file is used.
        last: passed on to the backend and to the clients (file_info)

        Returns the file_info dict (keys name, total, total_pretty, last).
        Raises ValueError if the file is invalid or if no file is set.
    """

    self._wait_for_init()

    if file:
      if not os.path.isabs(file):
        file = os.path.join(self._dir,file)
      if not self._check_file(file):
        raise ValueError("invalid filename %s" % file)
      self._file = file

    if not self._file:
      raise ValueError("default file not set")

    if self._dirinfo:
      # store the basename, consistent with _get_dirinfo() which also
      # keeps 'cur_file' as a plain filename (like the 'files' entries)
      self._dirinfo['cur_file'] = os.path.basename(self._file)

    # this will push the information to all clients, even if the file
    # is already playing.
    # We might also need to push the elapsed time?!
    total_secs = int(subprocess.check_output(["mp3info", "-p","%S",self._file]))
    file_info = {'name': os.path.basename(self._file),
                 'total': total_secs,
                 'total_pretty': self._pp_time(total_secs),
                 'last': last}
    self._api._push_event({'type': 'file_info', 'value': file_info })

    if self._backend.play(self._file,last):
      self._api.update_state(section="player",key="last_file",
                             value=os.path.basename(self._file),publish=False)
    self._api.update_state(section="player",key="time",
                           value=[0,file_info['total'],file_info['total_pretty']],
                           publish=False)
    return file_info

  # --- stop playing   -------------------------------------------------------

  def player_stop(self):
    """ stop playing (play->stop, pause->stop)"""

    # use a local reference: _play_dir() resets self._dirplay to None
    # when it terminates
    dirplay = self._dirplay
    if dirplay:
      self._dirstop.set()         # this will also stop the backend
      dirplay.join()
    else:
      self._backend.stop()        # backend will publish eof-event

  # --- pause playing   -----------------------------------------------------

  def player_pause(self):
    """ pause playing (play->pause) """
    self._backend.pause()

  # --- resume playing   ----------------------------------------------------

  def player_resume(self):
    """ resume playing (pause->play) """
    self._backend.resume()

  # --- toggle playing   ------------------------------------------------------

  def player_toggle(self):
    """ toggle playing (play->pause, pause->play) """
    self._backend.toggle()

  # --- select directory, return entries   ------------------------------------

  def player_select_dir(self,dir=None):
    """ select directory:
        a directory starting with a / is always interpreted relative
        to root_dir, otherwise relative to the current directory.
        Without a directory the current directory is used.

        Returns the directory-info dict of the selected directory.
        Raises ValueError if the directory is invalid.
    """

    self._wait_for_init()

    # the context manager guarantees that the lock is released on every
    # exit path, including exceptions raised while collecting the dir-info
    with self._lock:
      if not dir:
        # use current directory, keep current file
        dir = self._dir
      else:
        if os.path.isabs(dir):
          dir = os.path.normpath(self._root_dir+dir)   # cannot use join here!
          self.msg("Player: dir is absolute, fullpath %s" % dir)
        else:
          dir = os.path.normpath(os.path.join(self._dir,dir))
          self.msg("Player: dir is relative, fullpath %s" % dir)
        if not self._check_dir(dir):
          raise ValueError("invalid directory %s" % dir)

      cache_valid = False
      if dir == self._dir:
        if self._dirinfo:
          cache_valid = True
      else:
        # set new current directory
        self._dir = dir

      # publish event first (return dir relative to root_dir)
      cur_dir = self._dir[len(self._root_dir):]+os.path.sep
      self._api._push_event({'type':  'dir_select', 'value': cur_dir})
      self._api.update_state(section="player",key="last_dir",
                             value=cur_dir,
                             publish=False)

      # then query new directory info
      if not cache_valid:
        self._get_dirinfo(dir)
        self._dirinfo['cur_dir'] = cur_dir
      else:
        self.msg("Player: using cached dir-info for %s" % dir)

      return self._dirinfo

  # --- play all files in directory   -----------------------------------------

  def player_play_dir(self,start=None):
    """ play all files in the current directory starting with
        the given file (or with the first file if start is not given).

        Playback runs in a separate thread (see _play_dir()).
        Raises ValueError if start is not part of the current file-list.
    """

    self._wait_for_init()

    # check existing player-thread, stop it and wait until it is finished
    # (local reference: the thread resets self._dirplay when it terminates)
    dirplay = self._dirplay
    if dirplay:
      self._dirstop.set()
      dirplay.join()

    # copy file-list (the entries are strings, a shallow copy is sufficient)
    if not start:
      files = list(self._dirinfo['files'])
    else:
      try:
        index = self._dirinfo['files'].index(start)
      except ValueError:
        raise ValueError("file %s does not exist" % start) from None
      self.msg("Player: starting play_dir with file %s (index %i)" %
               (start,index))
      files = self._dirinfo['files'][index:]

    # start player-thread, pass files as argument
    self._dirstop.clear()
    self._dirplay = threading.Thread(target=self._play_dir,args=(files,))
    self._dirplay.start()

  # --- play all files (helper)   --------------------------------------------

  def _play_dir(self,files):
    """ play all given files (thread-target of player_play_dir())

        Plays one file after the other. The next file is started once
        the eof-event of the current file is received. The loop ends
        when the stop-event is set or when a false-valued event is read
        from the queue.
    """

    ev_queue = self._api._add_consumer("_play_dir")
    try:
      do_exit = False
      index_last = len(files)-1
      for index,fname in enumerate(files):
        if do_exit:
          break
        self.msg("Player: _play_dir: playing next file %s" % fname)
        self.player_play_file(fname,last=index==index_last)
        while True:
          # a naive implementation would just block on the queue, but
          # then we could stop this thread only after an event occurs
          if self._dirstop.wait(timeout=1.0):
            do_exit = True
            break
          try:
            ev = ev_queue.get(block=False)
          except queue.Empty:
            continue
          ev_queue.task_done()
          if not ev:
            do_exit = True
            break
          if ev['type'] == 'eof' and ev['value']['name'] == fname:
            self.msg("Player: processing eof for %s" % fname)
            break                              # start next file
    finally:
      # cleanup, also if starting a file failed: otherwise the consumer
      # would stay registered and self._dirplay would keep pointing to
      # this (dead) thread
      self.msg("Player: stopping _play_dir and cleaning up")
      self._api._del_consumer("_play_dir")
      self._backend.stop()
      self._dirplay = None

  # --- return name of cover file (currently only cover.jpg)   ---------------

  def _player_get_cover_file(self):
    """ return name of cover file (cover.jpg in the current directory)
        or None if it does not exist
    """

    cover = os.path.join(self._dir,"cover.jpg")
    if os.path.exists(cover):
      return cover
    return None

  # --- create directory info for given dir   --------------------------------

  def _get_dirinfo(self,dir):
    """ create directory info (stored in self._dirinfo) """

    self._dirinfo =  {'dirs':  [], 'files': [], 'dur': []}
    self.msg("Player: collecting dir-info for %s" % dir)

    # first entry is parent directory
    if self._dir != self._root_dir:
      self._dirinfo['dirs'].append('..')

    # only mp3-files are listed, every non-file entry counts as directory
    for f in os.listdir(dir):
      if os.path.isfile(os.path.join(dir,f)):
        if f.endswith(".mp3"):
          self._dirinfo['files'].append(f)
      else:
        self._dirinfo['dirs'].append(f)

    # ... and sort results
    self._dirinfo['files'].sort()
    self._dirinfo['dirs'].sort()

    # set current file (keep existing)
    if self._file:
      self._dirinfo['cur_file'] = os.path.basename(self._file)
    else:
      if len(self._dirinfo['files']):
        self._file = os.path.join(self._dir,self._dirinfo['files'][0])
        self._dirinfo['cur_file'] = self._dirinfo['files'][0]
      else:
        self._dirinfo['cur_file'] = None
      self._api.update_state(section="player",key="last_file",
                             value= self._dirinfo['cur_file'],publish=False)

    # add time-info
    for f in self._dirinfo['files']:
      secs = int(subprocess.check_output(["mp3info",
                                          "-p","%S",
                                          os.path.join(dir,f)]))
      # NOTE(review): the listing this code was taken from is truncated
      # here ("..."). The rest of this loop/method is not visible and has
      # to be restored from the original SRPlayer.py - presumably it fills
      # self._dirinfo['dur'], TODO confirm.

# -----------------------------------------------------------------------------
# Text that follows the truncated listing on the source page (not part of
# SRPlayer.py), kept verbatim:
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
# -----------------------------------------------------------------------------
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
