Best Python code snippet using avocado_python
ssconf.py
Source:ssconf.py  
#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Global Configuration data for Ganeti.

This module provides the interface to a special case of cluster
configuration data, which is mostly static and available to all nodes.

"""

import sys
import errno
import logging

from ganeti import compat
from ganeti import errors
from ganeti import constants
from ganeti import utils
from ganeti import netutils
from ganeti import pathutils


SSCONF_LOCK_TIMEOUT = 10

#: Valid ssconf keys
_VALID_KEYS = compat.UniqueFrozenset([
  constants.SS_CLUSTER_NAME,
  constants.SS_CLUSTER_TAGS,
  constants.SS_FILE_STORAGE_DIR,
  constants.SS_SHARED_FILE_STORAGE_DIR,
  constants.SS_GLUSTER_STORAGE_DIR,
  constants.SS_MASTER_CANDIDATES,
  constants.SS_MASTER_CANDIDATES_IPS,
  constants.SS_MASTER_CANDIDATES_CERTS,
  constants.SS_MASTER_IP,
  constants.SS_MASTER_NETDEV,
  constants.SS_MASTER_NETMASK,
  constants.SS_MASTER_NODE,
  constants.SS_NODE_LIST,
  constants.SS_NODE_PRIMARY_IPS,
  constants.SS_NODE_SECONDARY_IPS,
  constants.SS_OFFLINE_NODES,
  constants.SS_ONLINE_NODES,
  constants.SS_PRIMARY_IP_FAMILY,
  constants.SS_INSTANCE_LIST,
  constants.SS_RELEASE_VERSION,
  constants.SS_HYPERVISOR_LIST,
  constants.SS_MAINTAIN_NODE_HEALTH,
  constants.SS_UID_POOL,
  constants.SS_NODEGROUPS,
  constants.SS_NETWORKS,
  constants.SS_HVPARAMS_XEN_PVM,
  constants.SS_HVPARAMS_XEN_FAKE,
  constants.SS_HVPARAMS_XEN_HVM,
  constants.SS_HVPARAMS_XEN_KVM,
  constants.SS_HVPARAMS_XEN_CHROOT,
  constants.SS_HVPARAMS_XEN_LXC,
  ])

#: Maximum size for ssconf files
_MAX_SIZE = 128 * 1024


def ReadSsconfFile(filename):
  """Reads an ssconf file and verifies its size.

  @type filename: string
  @param filename: Path to file
  @rtype: string
  @return: File contents without newlines at the end
  @raise RuntimeError: When the file size exceeds L{_MAX_SIZE}

  """
  statcb = utils.FileStatHelper()

  # The read itself is capped at _MAX_SIZE; the stat result collected by the
  # preread callback is what is compared against the limit below
  data = utils.ReadFile(filename, size=_MAX_SIZE, preread=statcb)

  if statcb.st.st_size > _MAX_SIZE:
    msg = ("File '%s' has a size of %s bytes (up to %s allowed)" %
           (filename, statcb.st.st_size, _MAX_SIZE))
    raise RuntimeError(msg)

  return data.rstrip("\n")


class SimpleStore(object):
  """Interface to static cluster data.

  This is different than the config.ConfigWriter and
  SimpleConfigReader classes in that it holds data that will always be
  present, even on nodes which don't have all the cluster data.

  Other particularities of the datastore:
    - keys are restricted to predefined values

  """
  def __init__(self, cfg_location=None, _lockfile=pathutils.SSCONF_LOCK_FILE):
    """Initializes the store.

    @param cfg_location: Directory holding the ssconf files; when C{None},
        L{pathutils.DATA_DIR} is used
    @param _lockfile: Path of the lock file taken by L{WriteFiles}

    """
    if cfg_location is None:
      self._cfg_dir = pathutils.DATA_DIR
    else:
      self._cfg_dir = cfg_location

    self._lockfile = _lockfile

  def KeyToFilename(self, key):
    """Convert a given key into filename.

    @param key: ssconf key, must be one of L{_VALID_KEYS}
    @return: Path of the file holding the key's value
    @raise errors.ProgrammerError: When the key is not a valid ssconf key

    """
    if key not in _VALID_KEYS:
      raise errors.ProgrammerError("Invalid key requested from SSConf: '%s'"
                                   % str(key))

    filename = self._cfg_dir + "/" + constants.SSCONF_FILEPREFIX + key
    return filename

  def _ReadFile(self, key, default=None):
    """Generic routine to read keys.

    This will read the file which holds the value requested. Errors
    will be changed into ConfigurationErrors.

    @param key: ssconf key to read
    @param default: Value returned when the file does not exist; a default
        of C{None} means a missing file is reported as an error
    @raise errors.ConfigurationError: When the file can't be read

    """
    filename = self.KeyToFilename(key)
    try:
      return ReadSsconfFile(filename)
    except EnvironmentError as err:
      if err.errno == errno.ENOENT and default is not None:
        return default
      raise errors.ConfigurationError("Can't read ssconf file %s: %s" %
                                      (filename, str(err)))

  def _ReadList(self, key):
    """Reads a key and returns its value as a list of lines.

    @rtype: list of strings
    @return: Lines of the file, without line terminators

    """
    return self._ReadFile(key).splitlines(False)

  def _ReadMap(self, key):
    """Reads a key whose value consists of C{name=value} lines.

    Each line is split at the first "=" only, so that values which
    themselves contain "=" are kept intact.

    @rtype: dict of string to string
    @raise ValueError: When a line does not contain "="

    """
    result = {}
    for line in self._ReadList(key):
      (name, value) = line.split("=", 1)
      result[name] = value
    return result

  def ReadAll(self):
    """Reads all keys and returns their values.

    @rtype: dict
    @return: Dictionary, ssconf key as key, value as value

    """
    result = {}

    for key in _VALID_KEYS:
      try:
        result[key] = self._ReadFile(key)
      except errors.ConfigurationError:
        # Ignore non-existing files (any other read error reported as
        # ConfigurationError is skipped as well)
        continue

    return result

  def WriteFiles(self, values, dry_run=False):
    """Writes ssconf files used by external scripts.

    @type values: dict
    @param values: Dictionary of (name, value)
    @type dry_run: boolean
    @param dry_run: Whether to perform a dry run
    @raise errors.ConfigurationError: When a value is longer than
        L{_MAX_SIZE}

    """
    ssconf_lock = utils.FileLock.Open(self._lockfile)

    # Get lock while writing files
    ssconf_lock.Exclusive(blocking=True, timeout=SSCONF_LOCK_TIMEOUT)
    try:
      for name, value in values.items():
        # Sequences are stored one item per line
        if isinstance(value, (list, tuple)):
          value = "\n".join(value)
        if value and not value.endswith("\n"):
          value += "\n"

        if len(value) > _MAX_SIZE:
          msg = ("Value '%s' has a length of %s bytes, but only up to %s are"
                 " allowed" % (name, len(value), _MAX_SIZE))
          raise errors.ConfigurationError(msg)

        utils.WriteFile(self.KeyToFilename(name), data=value,
                        mode=constants.SS_FILE_PERMS,
                        dry_run=dry_run)
    finally:
      ssconf_lock.Unlock()

  def GetFileList(self):
    """Return the list of all config files.

    This is used for computing node replication data.

    """
    return [self.KeyToFilename(key) for key in _VALID_KEYS]

  def GetClusterName(self):
    """Get the cluster name.

    """
    return self._ReadFile(constants.SS_CLUSTER_NAME)

  def GetFileStorageDir(self):
    """Get the file storage dir.

    """
    return self._ReadFile(constants.SS_FILE_STORAGE_DIR)

  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir.

    """
    return self._ReadFile(constants.SS_SHARED_FILE_STORAGE_DIR)

  def GetGlusterStorageDir(self):
    """Get the Gluster storage dir.

    """
    return self._ReadFile(constants.SS_GLUSTER_STORAGE_DIR)

  def GetMasterCandidates(self):
    """Return the list of master candidates.

    """
    return self._ReadList(constants.SS_MASTER_CANDIDATES)

  def GetMasterCandidatesIPList(self):
    """Return the list of master candidates' primary IP.

    """
    return self._ReadList(constants.SS_MASTER_CANDIDATES_IPS)

  def GetMasterCandidatesCertMap(self):
    """Returns the map of master candidate UUIDs to ssl cert.

    @rtype: dict of string to string
    @return: dictionary mapping the master candidates' UUIDs
      to their SSL certificate digests

    """
    return self._ReadMap(constants.SS_MASTER_CANDIDATES_CERTS)

  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    """
    return self._ReadFile(constants.SS_MASTER_IP)

  def GetMasterNetdev(self):
    """Get the netdev to which we'll add the master ip.

    """
    return self._ReadFile(constants.SS_MASTER_NETDEV)

  def GetMasterNetmask(self):
    """Get the master netmask.

    When the netmask file can't be read, the C{iplen} of the address
    class matching the primary IP family is returned instead.

    """
    try:
      return self._ReadFile(constants.SS_MASTER_NETMASK)
    except errors.ConfigurationError:
      family = self.GetPrimaryIPFamily()
      ipcls = netutils.IPAddress.GetClassFromIpFamily(family)
      return ipcls.iplen

  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    """
    return self._ReadFile(constants.SS_MASTER_NODE)

  def GetNodeList(self):
    """Return the list of cluster nodes.

    """
    return self._ReadList(constants.SS_NODE_LIST)

  def GetOnlineNodeList(self):
    """Return the list of online cluster nodes.

    """
    return self._ReadList(constants.SS_ONLINE_NODES)

  def GetNodePrimaryIPList(self):
    """Return the list of cluster nodes' primary IP.

    """
    return self._ReadList(constants.SS_NODE_PRIMARY_IPS)

  def GetNodeSecondaryIPList(self):
    """Return the list of cluster nodes' secondary IP.

    """
    return self._ReadList(constants.SS_NODE_SECONDARY_IPS)

  def GetNodegroupList(self):
    """Return the list of nodegroups.

    """
    return self._ReadList(constants.SS_NODEGROUPS)

  def GetNetworkList(self):
    """Return the list of networks.

    """
    return self._ReadList(constants.SS_NETWORKS)

  def GetClusterTags(self):
    """Return the cluster tags.

    """
    return self._ReadList(constants.SS_CLUSTER_TAGS)

  def GetHypervisorList(self):
    """Return the list of enabled hypervisors.

    """
    return self._ReadList(constants.SS_HYPERVISOR_LIST)

  def GetHvparamsForHypervisor(self, hvname):
    """Return the hypervisor parameters of the given hypervisor.

    @type hvname: string
    @param hvname: name of the hypervisor, must be in C{constants.HYPER_TYPES}
    @rtype: dict of strings
    @returns: dictionary with hypervisor parameters

    """
    return self._ReadMap(constants.SS_HVPARAMS_PREF + hvname)

  def GetHvparams(self):
    """Return the hypervisor parameters of all hypervisors.

    @rtype: dict of dict of strings
    @returns: dictionary mapping hypervisor names to hvparams

    """
    all_hvparams = {}
    for hv in constants.HYPER_TYPES:
      all_hvparams[hv] = self.GetHvparamsForHypervisor(hv)
    return all_hvparams

  def GetMaintainNodeHealth(self):
    """Return the value of the maintain_node_health option.

    """
    data = self._ReadFile(constants.SS_MAINTAIN_NODE_HEALTH)
    # we rely on the bool serialization here
    return data == "True"

  def GetUidPool(self):
    """Return the user-id pool definition string.

    The separator character is a newline.

    The return value can be parsed using uidpool.ParseUidPool()::

      ss = ssconf.SimpleStore()
      uid_pool = uidpool.ParseUidPool(ss.GetUidPool(), separator="\\n")

    """
    data = self._ReadFile(constants.SS_UID_POOL)
    return data

  def GetPrimaryIPFamily(self):
    """Return the cluster-wide primary address family.

    @raise errors.ConfigurationError: When the stored value can't be
        converted to an integer

    """
    try:
      return int(self._ReadFile(constants.SS_PRIMARY_IP_FAMILY,
                                default=netutils.IP4Address.family))
    except (ValueError, TypeError) as err:
      raise errors.ConfigurationError("Error while trying to parse primary IP"
                                      " family: %s" % err)


def WriteSsconfFiles(values, dry_run=False):
  """Update all ssconf files.

  Wrapper around L{SimpleStore.WriteFiles}.

  """
  SimpleStore().WriteFiles(values, dry_run=dry_run)


def GetMasterAndMyself(ss=None):
  """Get the master node and my own hostname.

  This can be either used for a 'soft' check (compared to CheckMaster,
  which exits) or just for computing both at the same time.

  The function does not handle any errors, these should be handled in
  the caller (errors.ConfigurationError, errors.ResolverError).

  @param ss: either a sstore.SimpleConfigReader or a
      sstore.SimpleStore instance
  @rtype: tuple
  @return: a tuple (master node name, my own name)

  """
  if ss is None:
    ss = SimpleStore()
  return ss.GetMasterNode(), netutils.Hostname.GetSysName()


def CheckMaster(debug, ss=None):
  """Checks the node setup.

  If this is the master, the function will return. Otherwise it will
  exit with an exit code based on the node status.

  @param debug: Whether to write a message to stderr before exiting
      because this node is not the master
  @param ss: Passed through to L{GetMasterAndMyself}

  """
  try:
    master_name, myself = GetMasterAndMyself(ss)
  except errors.ConfigurationError as err:
    # Written to stdout, exactly as the former print statement did
    sys.stdout.write("Cluster configuration incomplete: '%s'\n" % str(err))
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.ResolverError as err:
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)

  if myself != master_name:
    if debug:
      sys.stderr.write("Not master, exiting.\n")
    sys.exit(constants.EXIT_NOTMASTER)


def VerifyClusterName(name, _cfg_location=None):
  """Verifies cluster name against a local cluster name.

  If the local cluster name can't be read, this is only logged at debug
  level and no verification takes place.

  @type name: string
  @param name: Cluster name
  @raise errors.GenericError: When the local cluster name differs

  """
  sstore = SimpleStore(cfg_location=_cfg_location)

  try:
    local_name = sstore.GetClusterName()
  except errors.ConfigurationError as err:
    logging.debug("Can't get local cluster name: %s", err)
  else:
    if name != local_name:
      raise errors.GenericError("Current cluster name is '%s'" % local_name)


def VerifyKeys(keys):
  """Raises an exception if unknown ssconf keys are given.

  @type keys: sequence
  @param keys: Key names to verify
  @raise errors.GenericError: When invalid keys were found

  """
  invalid = frozenset(keys) - _VALID_KEYS
  if invalid:
    # NOTE(review): the excerpt is cut off right after the "%" operator; the
    # formatted argument below (sorted, comma-separated key names) is an
    # assumption -- confirm against the complete ssconf.py
    raise errors.GenericError("Invalid ssconf keys: %s" %
                              ", ".join(sorted(invalid)))

process.py
Source:process.py  
...5    def __init__(self, pid):6        ProcFile.__init__(self)7        self.filename = '/proc/%s/io' % pid8    def names(self):9        return [line.split(':')[0] for line in self._readfile()]10    def get(self, name, default = None):11        for line in self._readfile():12            info = line.split(':')13            if name == info[0]:14                return int(info[1])15            else:16                None17    def __getattr__(self, name):18        if name in self.names():19            return self.get(name)20        else:21            raise AttributeError 22        23class PId(object):24    Stat = namedtuple( 'Stat', ['pid', 'comm', 'state', 'ppid', 'pgrp', 'session', 'tty_nr','tpgid',25            'flags', 'minflt', 'cminflt', 'majflt', 'cmajflt', 'utime','stime',26            'cutime', 'cstime', 'priority', 'nice', 'num_threads', 'itrealvalue',27            'starttime', 'vsize', 'rss', 'rsslim', 'startcode', 'endcode', 'startstack',28            'kstkesp', 'kstkeip', 'signal', 'blocked', 'sigignore', 'sigcatch',29            'wchan', 'nswap', 'cnswap', 'exit_signal', 'processor', 'rt_priority',30            'policy', 'delayacct_blkio_ticks', 'guest_time', 'cguest_time', 'start_data',31            'end_data', 'start_brk', 'arg_start', 'arg_end', 'env_start', 'env_end'])32    StatM = namedtuple('StatM', ['size', 'resident', 'share',33                                'text', 'lib', 'data', 'dt'])34    FileDescriptor = namedtuple('FileDescriptor', ['fd', 'filename'])35    def __init__(self, pid):36        self.pid = pid37        #Generic Proc File38        self.gpf = ProcFile()39    @property40    def id_(self):41        return self.pid42    @property43    def oom_score(self):44        self.gpf.filename = '/proc/%s/oom_score' % self.pid45        score = self.gpf._readfile()46        return int(score[0])47    @property48    def oom_adj(self):49        self.gpf.filename = '/proc/%s/oom_adj' % self.pid50        score = self.gpf._readfile()51        return 
int(score[0])52    @oom_adj.setter53    def oom_adj(self, line):54        self.gpf.filename = '/proc/%s/oom_adj' % self.pid55        self.gpf._writefile(str(line))56    @property57    def oom_score_adj(self):58        self.gpf.filename = '/proc/%s/oom_score_adj' % self.pid59        score = self.gpf._readfile()60        return score[0]61    @oom_score_adj.setter62    def oom_score_adj(self, line):63        self.gpf.filename = '/proc/%s/oom_score_adj' % self.pid64        self.gpf._writefile(str(line))65    @property66    def cpuset(self):67        self.gpf.filename = '/proc/%s/cpuset' % self.pid68        value = self.gpf._readfile()69        return value[0]70    @property71    def sessionid(self):72        self.gpf.filename = '/proc/%s/sessionid' % self.pid73        value = self.gpf._readfile()74        return int(value[0])75    @property76    def personality(self):77        self.gpf.filename = '/proc/%s/personality' % self.pid78        value = self.gpf._readfile()79        return value[0]80    @property81    def coredump_filter(self):82        self.gpf.filename = '/proc/%s/coredump_filter' % self.pid83        value = self.gpf._readfile()84        return value[0]85    @coredump_filter.setter86    def coredump_filter(self, line):87        self.gpf.filename = '/proc/%s/coredump_filter' % self.pid88        self.gpf._writefile(str(line))89    @property90    def cmdline(self):91        self.gpf.filename = '/proc/%s/cmdline' % self.pid92        value = self.gpf._readfile()93        return value[0].split('\0')[:-1] if value else []94    @property95    def environ(self):96        self.gpf.filename = '/proc/%s/environ' % self.pid97        value = self.gpf._readfile()98        return value[0].split('\0')[:-1]99    @property100    def exe(self):101        return os.readlink('/proc/%s/exe' % self.pid)102    @property103    def comm(self):104        self.gpf.filename = '/proc/%s/comm' % self.pid105        value = self.gpf._readfile()106        return value[0]107    @property108   
 def cpuset(self):109        self.gpf.filename = '/proc/%s/cpuset' % self.pid110        value = self.gpf._readfile()111        return value[0]112    @property113    def cwd(self):114        return os.readlink('/proc/%s/cwd' % self.pid)115    @property116    def io(self):117        return IO(self.pid)118    @property119    def loginuid(self):120        self.gpf.filename = '/proc/%s/loginuid' % self.pid121        value = self.gpf._readfile()122        return int(value[0])123    @loginuid.setter124    def loginuid(self, line):125        self.gpf.filename = '/proc/%s/loginuid' % self.pid126        self.gpf._writefile(str(line))127    @property128    def stat(self):129        self.gpf.filename = '/proc/%s/stat' % self.pid130        values = self.gpf._readfile()[0].split()131        pid = int(values[0])132        comm = values[1]133        state = values[2]134        tmp = [int(value) for value in values[3:51]]135        if len(values) < 51:136            tmp.extend([None]*(51 - len(values)))137            138        stat = self.Stat(pid, comm, state, *tuple(tmp))139        return stat140    @property141    def statm(self):142        self.gpf.filename = '/proc/%s/statm' % self.pid143        values = self.gpf._readfile()[0].split()144        return self.StatM(*tuple(int(value) for value in values))145    @property146    def fd(self):147        fd_dir = '/proc/%s/fd' % self.pid148        listdir = [int(i) for i in os.listdir(fd_dir)]149        files = (os.readlink('%s/%s' % (fd_dir, file_)) for file_ in listdir)150        return dict(list(zip(listdir, files)))151    @property152    def task(self):153        return Task(self.pid)154    @property155    def thread(self):156        return Task(self.pid)157class Thread(PId):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
