How to use _readfile method in avocado

Best Python code snippet using avocado_python Github


Full Screen

1#2#3# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.4#5# This program is free software; you can redistribute it and/or modify6# it under the terms of the GNU General Public License as published by7# the Free Software Foundation; either version 2 of the License, or8# (at your option) any later version.9#10# This program is distributed in the hope that it will be useful, but11# WITHOUT ANY WARRANTY; without even the implied warranty of12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU13# General Public License for more details.14#15# You should have received a copy of the GNU General Public License16# along with this program; if not, write to the Free Software17# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA18# 02110-1301, USA.19"""Global Configuration data for Ganeti.20This module provides the interface to a special case of cluster21configuration data, which is mostly static and available to all nodes.22"""23import sys24import errno25import logging26from ganeti import compat27from ganeti import errors28from ganeti import constants29from ganeti import utils30from ganeti import netutils31from ganeti import pathutils32SSCONF_LOCK_TIMEOUT = 1033#: Valid ssconf keys34_VALID_KEYS = compat.UniqueFrozenset([35 constants.SS_CLUSTER_NAME,36 constants.SS_CLUSTER_TAGS,37 constants.SS_FILE_STORAGE_DIR,38 constants.SS_SHARED_FILE_STORAGE_DIR,39 constants.SS_GLUSTER_STORAGE_DIR,40 constants.SS_MASTER_CANDIDATES,41 constants.SS_MASTER_CANDIDATES_IPS,42 constants.SS_MASTER_CANDIDATES_CERTS,43 constants.SS_MASTER_IP,44 constants.SS_MASTER_NETDEV,45 constants.SS_MASTER_NETMASK,46 constants.SS_MASTER_NODE,47 constants.SS_NODE_LIST,48 constants.SS_NODE_PRIMARY_IPS,49 constants.SS_NODE_SECONDARY_IPS,50 constants.SS_OFFLINE_NODES,51 constants.SS_ONLINE_NODES,52 constants.SS_PRIMARY_IP_FAMILY,53 constants.SS_INSTANCE_LIST,54 constants.SS_RELEASE_VERSION,55 constants.SS_HYPERVISOR_LIST,56 constants.SS_MAINTAIN_NODE_HEALTH,57 constants.SS_UID_POOL,58 constants.SS_NODEGROUPS,59 constants.SS_NETWORKS,60 constants.SS_HVPARAMS_XEN_PVM,61 constants.SS_HVPARAMS_XEN_FAKE,62 constants.SS_HVPARAMS_XEN_HVM,63 constants.SS_HVPARAMS_XEN_KVM,64 constants.SS_HVPARAMS_XEN_CHROOT,65 constants.SS_HVPARAMS_XEN_LXC,66 ])67#: Maximum size for ssconf files68_MAX_SIZE = 128 * 102469def ReadSsconfFile(filename):70 """Reads an ssconf file and verifies its size.71 @type filename: string72 @param filename: Path to file73 @rtype: string74 @return: File contents without newlines at the end75 @raise RuntimeError: When the file size exceeds L{_MAX_SIZE}76 """77 statcb = utils.FileStatHelper()78 data = utils.ReadFile(filename, size=_MAX_SIZE, preread=statcb)79 if > _MAX_SIZE:80 msg = ("File '%s' has a size of %s bytes (up to %s allowed)" %81 (filename,, _MAX_SIZE))82 raise RuntimeError(msg)83 return data.rstrip("\n")84class SimpleStore(object):85 """Interface to static cluster data.86 This is different that the config.ConfigWriter and87 SimpleConfigReader classes in that it holds data that will always be88 present, even on nodes which don't have all the cluster data.89 Other particularities of the datastore:90 - keys are restricted to predefined values91 """92 def __init__(self, cfg_location=None, _lockfile=pathutils.SSCONF_LOCK_FILE):93 if cfg_location is None:94 self._cfg_dir = pathutils.DATA_DIR95 else:96 self._cfg_dir = cfg_location97 self._lockfile = _lockfile98 def KeyToFilename(self, key):99 """Convert a given key into filename.100 """101 if key not in _VALID_KEYS:102 raise errors.ProgrammerError("Invalid key requested from SSConf: '%s'"103 % str(key))104 filename = self._cfg_dir + "/" + constants.SSCONF_FILEPREFIX + key105 return filename106 def _ReadFile(self, key, default=None):107 """Generic routine to read keys.108 This will read the file which holds the value requested. Errors109 will be changed into ConfigurationErrors.110 """111 filename = self.KeyToFilename(key)112 try:113 return ReadSsconfFile(filename)114 except EnvironmentError, err:115 if err.errno == errno.ENOENT and default is not None:116 return default117 raise errors.ConfigurationError("Can't read ssconf file %s: %s" %118 (filename, str(err)))119 def ReadAll(self):120 """Reads all keys and returns their values.121 @rtype: dict122 @return: Dictionary, ssconf key as key, value as value123 """124 result = []125 for key in _VALID_KEYS:126 try:127 value = self._ReadFile(key)128 except errors.ConfigurationError:129 # Ignore non-existing files130 pass131 else:132 result.append((key, value))133 return dict(result)134 def WriteFiles(self, values, dry_run=False):135 """Writes ssconf files used by external scripts.136 @type values: dict137 @param values: Dictionary of (name, value)138 @type dry_run boolean139 @param dry_run: Whether to perform a dry run140 """141 ssconf_lock = utils.FileLock.Open(self._lockfile)142 # Get lock while writing files143 ssconf_lock.Exclusive(blocking=True, timeout=SSCONF_LOCK_TIMEOUT)144 try:145 for name, value in values.iteritems():146 if isinstance(value, (list, tuple)):147 value = "\n".join(value)148 if value and not value.endswith("\n"):149 value += "\n"150 if len(value) > _MAX_SIZE:151 msg = ("Value '%s' has a length of %s bytes, but only up to %s are"152 " allowed" % (name, len(value), _MAX_SIZE))153 raise errors.ConfigurationError(msg)154 utils.WriteFile(self.KeyToFilename(name), data=value,155 mode=constants.SS_FILE_PERMS,156 dry_run=dry_run)157 finally:158 ssconf_lock.Unlock()159 def GetFileList(self):160 """Return the list of all config files.161 This is used for computing node replication data.162 """163 return [self.KeyToFilename(key) for key in _VALID_KEYS]164 def GetClusterName(self):165 """Get the cluster name.166 """167 return self._ReadFile(constants.SS_CLUSTER_NAME)168 def GetFileStorageDir(self):169 """Get the file storage dir.170 """171 return self._ReadFile(constants.SS_FILE_STORAGE_DIR)172 def GetSharedFileStorageDir(self):173 """Get the shared file storage dir.174 """175 return self._ReadFile(constants.SS_SHARED_FILE_STORAGE_DIR)176 def GetGlusterStorageDir(self):177 """Get the Gluster storage dir.178 """179 return self._ReadFile(constants.SS_GLUSTER_STORAGE_DIR)180 def GetMasterCandidates(self):181 """Return the list of master candidates.182 """183 data = self._ReadFile(constants.SS_MASTER_CANDIDATES)184 nl = data.splitlines(False)185 return nl186 def GetMasterCandidatesIPList(self):187 """Return the list of master candidates' primary IP.188 """189 data = self._ReadFile(constants.SS_MASTER_CANDIDATES_IPS)190 nl = data.splitlines(False)191 return nl192 def GetMasterCandidatesCertMap(self):193 """Returns the map of master candidate UUIDs to ssl cert.194 @rtype: dict of string to string195 @return: dictionary mapping the master candidates' UUIDs196 to their SSL certificate digests197 """198 data = self._ReadFile(constants.SS_MASTER_CANDIDATES_CERTS)199 lines = data.splitlines(False)200 certs = {}201 for line in lines:202 (node_uuid, cert_digest) = line.split("=")203 certs[node_uuid] = cert_digest204 return certs205 def GetMasterIP(self):206 """Get the IP of the master node for this cluster.207 """208 return self._ReadFile(constants.SS_MASTER_IP)209 def GetMasterNetdev(self):210 """Get the netdev to which we'll add the master ip.211 """212 return self._ReadFile(constants.SS_MASTER_NETDEV)213 def GetMasterNetmask(self):214 """Get the master netmask.215 """216 try:217 return self._ReadFile(constants.SS_MASTER_NETMASK)218 except errors.ConfigurationError:219 family = self.GetPrimaryIPFamily()220 ipcls = netutils.IPAddress.GetClassFromIpFamily(family)221 return ipcls.iplen222 def GetMasterNode(self):223 """Get the hostname of the master node for this cluster.224 """225 return self._ReadFile(constants.SS_MASTER_NODE)226 def GetNodeList(self):227 """Return the list of cluster nodes.228 """229 data = self._ReadFile(constants.SS_NODE_LIST)230 nl = data.splitlines(False)231 return nl232 def GetOnlineNodeList(self):233 """Return the list of online cluster nodes.234 """235 data = self._ReadFile(constants.SS_ONLINE_NODES)236 nl = data.splitlines(False)237 return nl238 def GetNodePrimaryIPList(self):239 """Return the list of cluster nodes' primary IP.240 """241 data = self._ReadFile(constants.SS_NODE_PRIMARY_IPS)242 nl = data.splitlines(False)243 return nl244 def GetNodeSecondaryIPList(self):245 """Return the list of cluster nodes' secondary IP.246 """247 data = self._ReadFile(constants.SS_NODE_SECONDARY_IPS)248 nl = data.splitlines(False)249 return nl250 def GetNodegroupList(self):251 """Return the list of nodegroups.252 """253 data = self._ReadFile(constants.SS_NODEGROUPS)254 nl = data.splitlines(False)255 return nl256 def GetNetworkList(self):257 """Return the list of networks.258 """259 data = self._ReadFile(constants.SS_NETWORKS)260 nl = data.splitlines(False)261 return nl262 def GetClusterTags(self):263 """Return the cluster tags.264 """265 data = self._ReadFile(constants.SS_CLUSTER_TAGS)266 nl = data.splitlines(False)267 return nl268 def GetHypervisorList(self):269 """Return the list of enabled hypervisors.270 """271 data = self._ReadFile(constants.SS_HYPERVISOR_LIST)272 nl = data.splitlines(False)273 return nl274 def GetHvparamsForHypervisor(self, hvname):275 """Return the hypervisor parameters of the given hypervisor.276 @type hvname: string277 @param hvname: name of the hypervisor, must be in C{constants.HYPER_TYPES}278 @rtype: dict of strings279 @returns: dictionary with hypervisor parameters280 """281 data = self._ReadFile(constants.SS_HVPARAMS_PREF + hvname)282 lines = data.splitlines(False)283 hvparams = {}284 for line in lines:285 (key, value) = line.split("=")286 hvparams[key] = value287 return hvparams288 def GetHvparams(self):289 """Return the hypervisor parameters of all hypervisors.290 @rtype: dict of dict of strings291 @returns: dictionary mapping hypervisor names to hvparams292 """293 all_hvparams = {}294 for hv in constants.HYPER_TYPES:295 all_hvparams[hv] = self.GetHvparamsForHypervisor(hv)296 return all_hvparams297 def GetMaintainNodeHealth(self):298 """Return the value of the maintain_node_health option.299 """300 data = self._ReadFile(constants.SS_MAINTAIN_NODE_HEALTH)301 # we rely on the bool serialization here302 return data == "True"303 def GetUidPool(self):304 """Return the user-id pool definition string.305 The separator character is a newline.306 The return value can be parsed using uidpool.ParseUidPool()::307 ss = ssconf.SimpleStore()308 uid_pool = uidpool.ParseUidPool(ss.GetUidPool(), separator="\\n")309 """310 data = self._ReadFile(constants.SS_UID_POOL)311 return data312 def GetPrimaryIPFamily(self):313 """Return the cluster-wide primary address family.314 """315 try:316 return int(self._ReadFile(constants.SS_PRIMARY_IP_FAMILY,317 except (ValueError, TypeError), err:319 raise errors.ConfigurationError("Error while trying to parse primary IP"320 " family: %s" % err)321def WriteSsconfFiles(values, dry_run=False):322 """Update all ssconf files.323 Wrapper around L{SimpleStore.WriteFiles}.324 """325 SimpleStore().WriteFiles(values, dry_run=dry_run)326def GetMasterAndMyself(ss=None):327 """Get the master node and my own hostname.328 This can be either used for a 'soft' check (compared to CheckMaster,329 which exits) or just for computing both at the same time.330 The function does not handle any errors, these should be handled in331 the caller (errors.ConfigurationError, errors.ResolverError).332 @param ss: either a sstore.SimpleConfigReader or a333 sstore.SimpleStore instance334 @rtype: tuple335 @return: a tuple (master node name, my own name)336 """337 if ss is None:338 ss = SimpleStore()339 return ss.GetMasterNode(), netutils.Hostname.GetSysName()340def CheckMaster(debug, ss=None):341 """Checks the node setup.342 If this is the master, the function will return. Otherwise it will343 exit with an exit code based on the node status.344 """345 try:346 master_name, myself = GetMasterAndMyself(ss)347 except errors.ConfigurationError, err:348 print "Cluster configuration incomplete: '%s'" % str(err)349 sys.exit(constants.EXIT_NODESETUP_ERROR)350 except errors.ResolverError, err:351 sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])352 sys.exit(constants.EXIT_NODESETUP_ERROR)353 if myself != master_name:354 if debug:355 sys.stderr.write("Not master, exiting.\n")356 sys.exit(constants.EXIT_NOTMASTER)357def VerifyClusterName(name, _cfg_location=None):358 """Verifies cluster name against a local cluster name.359 @type name: string360 @param name: Cluster name361 """362 sstore = SimpleStore(cfg_location=_cfg_location)363 try:364 local_name = sstore.GetClusterName()365 except errors.ConfigurationError, err:366 logging.debug("Can't get local cluster name: %s", err)367 else:368 if name != local_name:369 raise errors.GenericError("Current cluster name is '%s'" % local_name)370def VerifyKeys(keys):371 """Raises an exception if unknown ssconf keys are given.372 @type keys: sequence373 @param keys: Key names to verify374 @raise errors.GenericError: When invalid keys were found375 """376 invalid = frozenset(keys) - _VALID_KEYS377 if invalid:378 raise errors.GenericError("Invalid ssconf keys: %s" %...

Full Screen

Full Screen Github


Full Screen

...5 def __init__(self, pid):6 ProcFile.__init__(self)7 self.filename = '/proc/%s/io' % pid8 def names(self):9 return [line.split(':')[0] for line in self._readfile()]10 def get(self, name, default = None):11 for line in self._readfile():12 info = line.split(':')13 if name == info[0]:14 return int(info[1])15 else:16 None17 def __getattr__(self, name):18 if name in self.names():19 return self.get(name)20 else:21 raise AttributeError 22 23class PId(object):24 Stat = namedtuple( 'Stat', ['pid', 'comm', 'state', 'ppid', 'pgrp', 'session', 'tty_nr','tpgid',25 'flags', 'minflt', 'cminflt', 'majflt', 'cmajflt', 'utime','stime',26 'cutime', 'cstime', 'priority', 'nice', 'num_threads', 'itrealvalue',27 'starttime', 'vsize', 'rss', 'rsslim', 'startcode', 'endcode', 'startstack',28 'kstkesp', 'kstkeip', 'signal', 'blocked', 'sigignore', 'sigcatch',29 'wchan', 'nswap', 'cnswap', 'exit_signal', 'processor', 'rt_priority',30 'policy', 'delayacct_blkio_ticks', 'guest_time', 'cguest_time', 'start_data',31 'end_data', 'start_brk', 'arg_start', 'arg_end', 'env_start', 'env_end'])32 StatM = namedtuple('StatM', ['size', 'resident', 'share',33 'text', 'lib', 'data', 'dt'])34 FileDescriptor = namedtuple('FileDescriptor', ['fd', 'filename'])35 def __init__(self, pid):36 = pid37 #Generic Proc File38 self.gpf = ProcFile()39 @property40 def id_(self):41 return self.pid42 @property43 def oom_score(self):44 self.gpf.filename = '/proc/%s/oom_score' % self.pid45 score = self.gpf._readfile()46 return int(score[0])47 @property48 def oom_adj(self):49 self.gpf.filename = '/proc/%s/oom_adj' % self.pid50 score = self.gpf._readfile()51 return int(score[0])52 @oom_adj.setter53 def oom_adj(self, line):54 self.gpf.filename = '/proc/%s/oom_adj' % self.pid55 self.gpf._writefile(str(line))56 @property57 def oom_score_adj(self):58 self.gpf.filename = '/proc/%s/oom_score_adj' % self.pid59 score = self.gpf._readfile()60 return score[0]61 @oom_score_adj.setter62 def oom_score_adj(self, line):63 self.gpf.filename = '/proc/%s/oom_score_adj' % self.pid64 self.gpf._writefile(str(line))65 @property66 def cpuset(self):67 self.gpf.filename = '/proc/%s/cpuset' % self.pid68 value = self.gpf._readfile()69 return value[0]70 @property71 def sessionid(self):72 self.gpf.filename = '/proc/%s/sessionid' % self.pid73 value = self.gpf._readfile()74 return int(value[0])75 @property76 def personality(self):77 self.gpf.filename = '/proc/%s/personality' % self.pid78 value = self.gpf._readfile()79 return value[0]80 @property81 def coredump_filter(self):82 self.gpf.filename = '/proc/%s/coredump_filter' % self.pid83 value = self.gpf._readfile()84 return value[0]85 @coredump_filter.setter86 def coredump_filter(self, line):87 self.gpf.filename = '/proc/%s/coredump_filter' % self.pid88 self.gpf._writefile(str(line))89 @property90 def cmdline(self):91 self.gpf.filename = '/proc/%s/cmdline' % self.pid92 value = self.gpf._readfile()93 return value[0].split('\0')[:-1] if value else []94 @property95 def environ(self):96 self.gpf.filename = '/proc/%s/environ' % self.pid97 value = self.gpf._readfile()98 return value[0].split('\0')[:-1]99 @property100 def exe(self):101 return os.readlink('/proc/%s/exe' % @property103 def comm(self):104 self.gpf.filename = '/proc/%s/comm' % self.pid105 value = self.gpf._readfile()106 return value[0]107 @property108 def cpuset(self):109 self.gpf.filename = '/proc/%s/cpuset' % self.pid110 value = self.gpf._readfile()111 return value[0]112 @property113 def cwd(self):114 return os.readlink('/proc/%s/cwd' % @property116 def io(self):117 return IO( @property119 def loginuid(self):120 self.gpf.filename = '/proc/%s/loginuid' % self.pid121 value = self.gpf._readfile()122 return int(value[0])123 @loginuid.setter124 def loginuid(self, line):125 self.gpf.filename = '/proc/%s/loginuid' % self.pid126 self.gpf._writefile(str(line))127 @property128 def stat(self):129 self.gpf.filename = '/proc/%s/stat' % self.pid130 values = self.gpf._readfile()[0].split()131 pid = int(values[0])132 comm = values[1]133 state = values[2]134 tmp = [int(value) for value in values[3:51]]135 if len(values) < 51:136 tmp.extend([None]*(51 - len(values)))137 138 stat = self.Stat(pid, comm, state, *tuple(tmp))139 return stat140 @property141 def statm(self):142 self.gpf.filename = '/proc/%s/statm' % self.pid143 values = self.gpf._readfile()[0].split()144 return self.StatM(*tuple(int(value) for value in values))145 @property146 def fd(self):147 fd_dir = '/proc/%s/fd' % self.pid148 listdir = [int(i) for i in os.listdir(fd_dir)]149 files = (os.readlink('%s/%s' % (fd_dir, file_)) for file_ in listdir)150 return dict(list(zip(listdir, files)))151 @property152 def task(self):153 return Task( @property155 def thread(self):156 return Task( Thread(PId):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?