How to use _LookUpStack method in autotest

Best Python code snippet using autotest_python

config.py

Source: config.py on GitHub

copy

Full Screen

import argparse
import os
import random
from fnmatch import fnmatchcase
import sys
import re
import shlex
import string
import pkg_resources
import itertools
import pluggy
from subprocess import list2cmdline

import tox.interpreters
from tox import hookspecs
from tox._verlib import NormalizedVersion
import py
import tox

iswin32 = sys.platform == "win32"

# Maps an environment-name factor to the interpreter executable it implies.
default_factors = {'jython': 'jython', 'pypy': 'pypy', 'pypy3': 'pypy3',
                   'py': sys.executable, 'py2': 'python2', 'py3': 'python3'}
for version in '26,27,32,33,34,35,36,37'.split(','):
    # '27' -> ('2', '7') -> 'python2.7'
    default_factors['py' + version] = 'python%s.%s' % tuple(version)

hookimpl = pluggy.HookimplMarker("tox")

# Sentinel used to tell "no value" apart from a legitimate None.
_dummy = object()


def get_plugin_manager(plugins=()):
    """ Build the pluggy plugin manager used by tox.

    Registers the hook specifications, the built-in tox modules, any
    setuptools entry points in the "tox" group and finally each object in
    ``plugins``.

    :param plugins: extra plugin objects to register.
    :return: the configured ``pluggy.PluginManager``.
    """
    # initialize plugin manager
    import tox.venv
    pm = pluggy.PluginManager("tox")
    pm.add_hookspecs(hookspecs)
    pm.register(tox.config)
    pm.register(tox.interpreters)
    pm.register(tox.venv)
    pm.register(tox.session)
    pm.load_setuptools_entrypoints("tox")
    for plugin in plugins:
        pm.register(plugin)
    pm.check_pending()
    return pm


class Parser:
    """ command line and ini-parser control object. """

    def __init__(self):
        self.argparser = argparse.ArgumentParser(
            description="tox options", add_help=False)
        # testenv ini attributes, in registration order
        self._testenv_attr = []

    def add_argument(self, *args, **kwargs):
        """ add argument to command line parser.  This takes the
        same arguments that ``argparse.ArgumentParser.add_argument``.
        """
        return self.argparser.add_argument(*args, **kwargs)

    def add_testenv_attribute(self, name, type, help, default=None, postprocess=None):
        """ add an ini-file variable for "testenv" section.

        Types are specified as strings like "bool", "line-list", "string", "argv", "path",
        "argvlist".

        The ``postprocess`` function will be called for each testenv
        like ``postprocess(testenv_config=testenv_config, value=value)``
        where ``value`` is the value as read from the ini (or the default value)
        and ``testenv_config`` is a :py:class:`tox.config.TestenvConfig` instance
        which will receive all ini-variables as object attributes.

        Any postprocess function must return a value which will then be set
        as the final value in the testenv section.
        """
        self._testenv_attr.append(VenvAttribute(name, type, default, help, postprocess))

    def add_testenv_attribute_obj(self, obj):
        """ add an ini-file variable as an object.

        This works as the ``add_testenv_attribute`` function but expects
        "name", "type", "help", and "postprocess" attributes on the object.
        """
        assert hasattr(obj, "name")
        assert hasattr(obj, "type")
        assert hasattr(obj, "help")
        assert hasattr(obj, "postprocess")
        self._testenv_attr.append(obj)

    def _parse_args(self, args):
        """ parse ``args`` with the underlying argparse parser. """
        return self.argparser.parse_args(args)

    def _format_help(self):
        """ return the argparse help text. """
        return self.argparser.format_help()


class VenvAttribute:
    """ Plain record describing one testenv ini attribute. """

    def __init__(self, name, type, default, help, postprocess):
        self.name = name
        self.type = type
        self.default = default
        self.help = help
        self.postprocess = postprocess


class DepOption:
    """ The ``deps`` testenv attribute. """
    name = "deps"
    type = "line-list"
    help = "each line specifies a dependency in pip/setuptools format."
    default = ()

    def postprocess(self, testenv_config, value):
        """ Turn each dependency line into a :class:`DepConfig`.

        A line of the form ``:indexname: requirement`` selects the index
        server ``indexname`` from ``config.indexserver``; any other line is
        used as the requirement with no index server.  Forced dependencies
        (``--force-dep``) are applied to every requirement.
        """
        deps = []
        config = testenv_config.config
        for depline in value:
            m = re.match(r":(\w+):\s*(\S+)", depline)
            if m:
                iname, name = m.groups()
                ixserver = config.indexserver[iname]
            else:
                name = depline.strip()
                ixserver = None
            name = self._replace_forced_dep(name, config)
            deps.append(DepConfig(name, ixserver))
        return deps

    def _replace_forced_dep(self, name, config):
        """
        Override the given dependency config name taking --force-dep-version
        option into account.

        :param name: a single dependency requirement, for example "pkg==1.0".
        :param config: Config instance
        :return: the new dependency that should be used for virtual environments
        """
        if not config.option.force_dep:
            return name
        for forced_dep in config.option.force_dep:
            if self._is_same_dep(forced_dep, name):
                return forced_dep
        return name

    @classmethod
    def _is_same_dep(cls, dep1, dep2):
        """
        Returns True if both dependency definitions refer to the
        same package, even if versions differ.
        """
        dep1_name = pkg_resources.Requirement.parse(dep1).project_name
        try:
            dep2_name = pkg_resources.Requirement.parse(dep2).project_name
        except pkg_resources.RequirementParseError:
            # we couldn't parse a version, probably a URL
            return False
        return dep1_name == dep2_name


class PosargsOption:
    """ The ``args_are_paths`` testenv attribute. """
    name = "args_are_paths"
    type = "bool"
    default = True
    help = "treat positional args in commands as paths"

    def postprocess(self, testenv_config, value):
        """ Register the positional args as substitutions on the reader.

        When ``value`` is true, each positional argument that names an
        existing path (relative to the invocation directory) is rewritten
        relative to the testenv's ``changedir``.  Returns ``value`` unchanged.
        """
        config = testenv_config.config
        args = config.option.args
        if args:
            if value:
                args = []
                for arg in config.option.args:
                    if arg:
                        origpath = config.invocationcwd.join(arg, abs=True)
                        if origpath.check():
                            arg = testenv_config.changedir.bestrelpath(origpath)
                    args.append(arg)
            testenv_config._reader.addsubstitutions(args)
        return value


class InstallcmdOption:
    """ The ``install_command`` testenv attribute. """
    name = "install_command"
    type = "argv"
    default = "pip install {opts} {packages}"
    help = "install command for dependencies and package under test."

    def postprocess(self, testenv_config, value):
        """ Reject an install command that lacks the ``{packages}`` item.

        :raise tox.exception.ConfigError: if '{packages}' is not in ``value``.
        """
        if '{packages}' not in value:
            raise tox.exception.ConfigError(
                "'install_command' must contain '{packages}' substitution")
        return value


def parseconfig(args=None, plugins=()):
    """
    Parse the command line and the ini file into a :class:`Config`.

    :param list[str] args: Optional list of arguments; defaults to
        ``sys.argv[1:]``.
    :param plugins: extra plugin objects handed to the plugin manager.
    :rtype: :class:`Config`
    :raise SystemExit: toxinit file is not found
    """
    pm = get_plugin_manager(plugins)

    if args is None:
        args = sys.argv[1:]

    # prepare command line options
    parser = Parser()
    pm.hook.tox_addoption(parser=parser)

    # parse command line options
    option = parser._parse_args(args)
    interpreters = tox.interpreters.Interpreters(hook=pm.hook)
    config = Config(pluginmanager=pm, option=option, interpreters=interpreters)
    config._parser = parser
    config._testenv_attr = parser._testenv_attr

    # parse ini file
    basename = config.option.configfile
    if os.path.isfile(basename):
        inipath = py.path.local(basename)
    elif os.path.isdir(basename):
        # Assume 'tox.ini' filename if directory was passed
        inipath = py.path.local(os.path.join(basename, 'tox.ini'))
    else:
        # walk from the current directory up to the root looking for basename
        for path in py.path.local().parts(reverse=True):
            inipath = path.join(basename)
            if inipath.check():
                break
        else:
            inipath = py.path.local().join('setup.cfg')
            if not inipath.check():
                helpoptions = option.help or option.helpini
                feedback("toxini file %r not found" % (basename),
                         sysexit=not helpoptions)
                if helpoptions:
                    return config

    try:
        parseini(config, inipath)
    except tox.exception.InterpreterNotFound:
        exn = sys.exc_info()[1]
        # Use stdout to match test expectations
        py.builtin.print_("ERROR: " + str(exn))

    # post process config object
    pm.hook.tox_configure(config=config)

    return config


def feedback(msg, sysexit=False):
    """ Print ``msg`` as an error to stderr; exit with status 1 if ``sysexit``. """
    py.builtin.print_("ERROR: " + msg, file=sys.stderr)
    if sysexit:
        raise SystemExit(1)


class VersionAction(argparse.Action):
    """ argparse action that prints the tox version and exits. """

    def __call__(self, argparser, *args, **kwargs):
        version = tox.__version__
        py.builtin.print_("%s imported from %s" % (version, tox.__file__))
        raise SystemExit(0)


class SetenvDict:
    """ Mapping of ``setenv`` definitions whose values are substituted lazily.

    A value is run through ``reader._replace`` the first time it is looked
    up and the result is cached in ``resolved``.  Names that are not defined
    fall back to ``os.environ``.
    """

    def __init__(self, dict, reader):
        self.reader = reader
        self.definitions = dict
        self.resolved = {}
        # names currently being resolved, used to detect self-reference
        self._lookupstack = []

    def __contains__(self, name):
        return name in self.definitions

    def get(self, name, default=None):
        """ Return the resolved value for ``name``.

        Falls back to ``os.environ.get(name, default)`` when ``name`` has no
        definition or is already being resolved further up the call stack.
        """
        try:
            return self.resolved[name]
        except KeyError:
            try:
                if name in self._lookupstack:
                    # recursive reference: treat as undefined
                    raise KeyError(name)
                val = self.definitions[name]
            except KeyError:
                return os.environ.get(name, default)
            self._lookupstack.append(name)
            try:
                self.resolved[name] = res = self.reader._replace(val)
            finally:
                self._lookupstack.pop()
            return res

    def __getitem__(self, name):
        x = self.get(name, _dummy)
        if x is _dummy:
            raise KeyError(name)
        return x

    def keys(self):
        return self.definitions.keys()

    def __setitem__(self, name, value):
        self.definitions[name] = value
        self.resolved[name] = value


@hookimpl
def tox_addoption(parser):
    """ Register tox's built-in command line options and testenv attributes. """
    # formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--version", nargs=0, action=VersionAction,
                        dest="version",
                        help="report version information to stdout.")
    parser.add_argument("-h", "--help", action="store_true", dest="help",
                        help="show help about options")
    parser.add_argument("--help-ini", "--hi", action="store_true", dest="helpini",
                        help="show help about ini-names")
    parser.add_argument("-v", action='count', dest="verbosity", default=0,
                        help="increase verbosity of reporting output. -vv mode turns off "
                        "output redirection for package installation")
    parser.add_argument("--showconfig", action="store_true",
                        help="show configuration information for all environments. ")
    parser.add_argument("-l", "--listenvs", action="store_true",
                        dest="listenvs", help="show list of test environments "
                                              "(with description if verbose)")
    # the first fragment ends with a space so the two help fragments
    # do not run together when concatenated
    parser.add_argument("-a", "--listenvs-all", action="store_true",
                        dest="listenvs_all",
                        help="show list of all defined environments "
                             "(with description if verbose)")
    parser.add_argument("-c", action="store", default="tox.ini",
                        dest="configfile",
                        help="config file name or directory with 'tox.ini' file.")
    parser.add_argument("-e", action="append", dest="env",
                        metavar="envlist",
                        help="work against specified environments (ALL selects all).")
    parser.add_argument("--notest", action="store_true", dest="notest",
                        help="skip invoking test commands.")
    parser.add_argument("--sdistonly", action="store_true", dest="sdistonly",
                        help="only perform the sdist packaging activity.")
    parser.add_argument("--installpkg", action="store", default=None,
                        metavar="PATH",
                        help="use specified package for installation into venv, instead of "
                             "creating an sdist.")
    parser.add_argument("--develop", action="store_true", dest="develop",
                        help="install package in the venv using 'setup.py develop' via "
                             "'pip -e .'")
    parser.add_argument('-i', action="append",
                        dest="indexurl", metavar="URL",
                        help="set indexserver url (if URL is of form name=url set the "
                        "url for the 'name' indexserver, specifically)")
    parser.add_argument("--pre", action="store_true", dest="pre",
                        help="install pre-releases and development versions of dependencies. "
                             "This will pass the --pre option to install_command "
                             "(pip by default).")
    parser.add_argument("-r", "--recreate", action="store_true",
                        dest="recreate",
                        help="force recreation of virtual environments")
    parser.add_argument("--result-json", action="store",
                        dest="resultjson", metavar="PATH",
                        help="write a json file with detailed information "
                        "about all commands and results involved.")

    # We choose 1 to 4294967295 because it is the range of PYTHONHASHSEED.
    parser.add_argument("--hashseed", action="store",
                        metavar="SEED", default=None,
                        help="set PYTHONHASHSEED to SEED before running commands. "
                             "Defaults to a random integer in the range [1, 4294967295] "
                             "([1, 1024] on Windows). "
                             "Passing 'noset' suppresses this behavior.")
    parser.add_argument("--force-dep", action="append",
                        metavar="REQ", default=None,
                        help="Forces a certain version of one of the dependencies "
                             "when configuring the virtual environment. REQ Examples "
                             "'pytest<2.7' or 'django>=1.6'.")
    parser.add_argument("--sitepackages", action="store_true",
                        help="override sitepackages setting to True in all envs")
    parser.add_argument("--alwayscopy", action="store_true",
                        help="override alwayscopy setting to True in all envs")
    parser.add_argument("--skip-missing-interpreters", action="store_true",
                        help="don't fail tests for missing interpreters")
    parser.add_argument("--workdir", action="store",
                        dest="workdir", metavar="PATH", default=None,
                        help="tox working directory")

    parser.add_argument("args", nargs="*",
                        help="additional arguments available to command positional substitution")

    parser.add_testenv_attribute(
        name="envdir", type="path", default="{toxworkdir}/{envname}",
        help="set venv directory -- be very careful when changing this as tox "
             "will remove this directory when recreating an environment")

    # add various core venv interpreter attributes
    def setenv(testenv_config, value):
        setenv = value
        config = testenv_config.config
        if "PYTHONHASHSEED" not in setenv and config.hashseed is not None:
            setenv['PYTHONHASHSEED'] = config.hashseed
        return setenv

    parser.add_testenv_attribute(
        name="setenv", type="dict_setenv", postprocess=setenv,
        help="list of X=Y lines with environment variable settings")

    def basepython_default(testenv_config, value):
        if value is None:
            # derive the interpreter from the first matching factor
            for f in testenv_config.factors:
                if f in default_factors:
                    return default_factors[f]
            return sys.executable
        return str(value)

    parser.add_testenv_attribute(
        name="basepython", type="string", default=None, postprocess=basepython_default,
        help="executable name or path of interpreter used to create a "
             "virtual test environment.")

    def merge_description(testenv_config, value):
        """the reader by default joins generated description with new line,
         replace new line with space"""
        return value.replace('\n', ' ')

    parser.add_testenv_attribute(
        name="description", type="string", default='', postprocess=merge_description,
        help="short description of this environment")

    parser.add_testenv_attribute(
        name="envtmpdir", type="path", default="{envdir}/tmp",
        help="venv temporary directory")

    parser.add_testenv_attribute(
        name="envlogdir", type="path", default="{envdir}/log",
        help="venv log directory")

    parser.add_testenv_attribute(
        name="downloadcache", type="string", default=None,
        help="(ignored) has no effect anymore, pip-8 uses local caching by default")

    parser.add_testenv_attribute(
        name="changedir", type="path", default="{toxinidir}",
        help="directory to change to when running commands")

    parser.add_testenv_attribute_obj(PosargsOption())

    parser.add_testenv_attribute(
        name="skip_install", type="bool", default=False,
        help="Do not install the current package. This can be used when "
             "you need the virtualenv management but do not want to install "
             "the current package")

    parser.add_testenv_attribute(
        name="ignore_errors", type="bool", default=False,
        help="if set to True all commands will be executed irrespective of their "
             "result error status.")

    def recreate(testenv_config, value):
        if testenv_config.config.option.recreate:
            return True
        return value

    parser.add_testenv_attribute(
        name="recreate", type="bool", default=False, postprocess=recreate,
        help="always recreate this test environment.")

    def passenv(testenv_config, value):
        # Flatten the list to deal with space-separated values.
        value = list(
            itertools.chain.from_iterable(
                [x.split(' ') for x in value]))

        passenv = set([
            "PATH", "PIP_INDEX_URL", "LANG", "LANGUAGE", "LD_LIBRARY_PATH"
        ])

        # read in global passenv settings
        p = os.environ.get("TOX_TESTENV_PASSENV", None)
        if p is not None:
            env_values = [x for x in p.split() if x]
            value.extend(env_values)

        # we ensure that tmp directory settings are passed on
        # we could also set it to the per-venv "envtmpdir"
        # but this leads to very long paths when run with jenkins
        # so we just pass it on by default for now.
        if sys.platform == "win32":
            passenv.add("SYSTEMDRIVE")  # needed for pip6
            passenv.add("SYSTEMROOT")  # needed for python's crypto module
            passenv.add("PATHEXT")  # needed for discovering executables
            passenv.add("COMSPEC")  # needed for distutils cygwincompiler
            passenv.add("TEMP")
            passenv.add("TMP")
        else:
            passenv.add("TMPDIR")
        # each spec is a case-insensitive glob over the invocation environment
        for spec in value:
            for name in os.environ:
                if fnmatchcase(name.upper(), spec.upper()):
                    passenv.add(name)
        return passenv

    parser.add_testenv_attribute(
        name="passenv", type="line-list", postprocess=passenv,
        help="environment variables needed during executing test commands "
             "(taken from invocation environment). Note that tox always "
             "passes through some basic environment variables which are "
             "needed for basic functioning of the Python system. "
             "See --showconfig for the eventual passenv setting.")

    parser.add_testenv_attribute(
        name="whitelist_externals", type="line-list",
        help="each lines specifies a path or basename for which tox will not warn "
             "about it coming from outside the test environment.")

    parser.add_testenv_attribute(
        name="platform", type="string", default=".*",
        help="regular expression which must match against ``sys.platform``. "
             "otherwise testenv will be skipped.")

    def sitepackages(testenv_config, value):
        return testenv_config.config.option.sitepackages or value

    def alwayscopy(testenv_config, value):
        return testenv_config.config.option.alwayscopy or value

    parser.add_testenv_attribute(
        name="sitepackages", type="bool", default=False, postprocess=sitepackages,
        help="Set to ``True`` if you want to create virtual environments that also "
             "have access to globally installed packages.")

    parser.add_testenv_attribute(
        name="alwayscopy", type="bool", default=False, postprocess=alwayscopy,
        help="Set to ``True`` if you want virtualenv to always copy files rather "
             "than symlinking.")

    def pip_pre(testenv_config, value):
        return testenv_config.config.option.pre or value

    parser.add_testenv_attribute(
        name="pip_pre", type="bool", default=False, postprocess=pip_pre,
        help="If ``True``, adds ``--pre`` to the ``opts`` passed to "
             "the install command. ")

    def develop(testenv_config, value):
        option = testenv_config.config.option
        # --installpkg always wins over develop mode
        return not option.installpkg and (value or option.develop)

    parser.add_testenv_attribute(
        name="usedevelop", type="bool", postprocess=develop, default=False,
        help="install package in develop/editable mode")

    parser.add_testenv_attribute_obj(InstallcmdOption())

    parser.add_testenv_attribute(
        name="list_dependencies_command",
        type="argv",
        default="pip freeze",
        help="list dependencies for a virtual environment")

    parser.add_testenv_attribute_obj(DepOption())

    parser.add_testenv_attribute(
        name="commands", type="argvlist", default="",
        help="each line specifies a test command and can use substitution.")

    parser.add_testenv_attribute(
        "ignore_outcome", type="bool", default=False,
        help="if set to True a failing result of this testenv will not make "
             "tox fail, only a warning will be produced")

    parser.add_testenv_attribute(
        "extras", type="line-list",
        help="list of extras to install with the source distribution or "
             "develop install")


class Config(object):
    """ Global Tox config object. """

    def __init__(self, pluginmanager, option, interpreters):
        #: dictionary containing envname to envconfig mappings
        self.envconfigs = {}
        self.invocationcwd = py.path.local()
        self.interpreters = interpreters
        self.pluginmanager = pluginmanager
        #: option namespace containing all parsed command line options
        self.option = option

    @property
    def homedir(self):
        """ the user's home directory, or ``toxinidir`` if it is unknown. """
        homedir = get_homedir()
        if homedir is None:
            homedir = self.toxinidir  # XXX good idea?
        return homedir


class TestenvConfig:
    """ Testenv Configuration object.

    In addition to some core attributes/properties this config object holds all
    per-testenv ini attributes as attributes, see "tox --help-ini" for an overview.
    """

    def __init__(self, envname, config, factors, reader):
        #: test environment name
        self.envname = envname
        #: global tox config object
        self.config = config
        #: set of factors
        self.factors = factors
        self._reader = reader

    def get_envbindir(self):
        """ path to directory where scripts/binaries reside. """
        if (sys.platform == "win32"
                and "jython" not in self.basepython
                and "pypy" not in self.basepython):
            return self.envdir.join("Scripts")
        else:
            return self.envdir.join("bin")

    @property
    def envbindir(self):
        return self.get_envbindir()

    @property
    def envpython(self):
        """ path to python executable. """
        return self.get_envpython()

    def get_envpython(self):
        """ path to python/jython executable. """
        if "jython" in str(self.basepython):
            name = "jython"
        else:
            name = "python"
        return self.envbindir.join(name)

    def get_envsitepackagesdir(self):
        """ return sitepackagesdir of the virtualenv environment.
        (only available during execution, not parsing)
        """
        x = self.config.interpreters.get_sitepackagesdir(
            info=self.python_info,
            envdir=self.envdir)
        return x

    @property
    def python_info(self):
        """ return the interpreter info object for this environment. """
        return self.config.interpreters.get_info(envconfig=self)

    def getsupportedinterpreter(self):
        """ Return the interpreter executable, or raise if it is unusable.

        :raise tox.exception.UnsupportedInterpreter: Jython on Windows, or
            an interpreter older than 2.6.
        :raise tox.exception.InterpreterNotFound: no executable was found.
        :raise tox.exception.InvocationError: version info is unavailable.
        """
        if sys.platform == "win32" and self.basepython and \
                "jython" in self.basepython:
            raise tox.exception.UnsupportedInterpreter(
                "Jython/Windows does not support installing scripts")
        info = self.config.interpreters.get_info(envconfig=self)
        if not info.executable:
            raise tox.exception.InterpreterNotFound(self.basepython)
        if not info.version_info:
            raise tox.exception.InvocationError(
                'Failed to get version_info for %s: %s' % (info.name, info.err))
        if info.version_info < (2, 6):
            raise tox.exception.UnsupportedInterpreter(
                "python2.5 is not supported anymore, sorry")
        return info.executable


testenvprefix = "testenv:"


def get_homedir():
    """ Return the home directory as reported by ``py``, or None on error. """
    try:
        return py.path.local._gethomedir()
    except Exception:
        return None


def make_hashseed():
    """ Return a random PYTHONHASHSEED value as a string. """
    max_seed = 4294967295
    if sys.platform == 'win32':
        max_seed = 1024
    return str(random.randint(1, max_seed))


class parseini:
    """ Reads the ini file at ``inipath`` and populates ``config`` from it. """

    def __init__(self, config, inipath):
        config.toxinipath = inipath
        config.toxinidir = config.toxinipath.dirpath()

        self._cfg = py.iniconfig.IniConfig(config.toxinipath)
        config._cfg = self._cfg
        self.config = config

        # in setup.cfg the tox sections are prefixed with "tox:"
        if inipath.basename == 'setup.cfg':
            prefix = 'tox'
        else:
            prefix = None
        ctxname = getcontextname()
        if ctxname == "jenkins":
            reader = SectionReader("tox:jenkins", self._cfg, prefix=prefix,
                                   fallbacksections=['tox'])
            distshare_default = "{toxworkdir}/distshare"
        elif not ctxname:
            reader = SectionReader("tox", self._cfg, prefix=prefix)
            distshare_default = "{homedir}/.tox/distshare"
        else:
            raise ValueError("invalid context")

        if config.option.hashseed is None:
            hashseed = make_hashseed()
        elif config.option.hashseed == 'noset':
            hashseed = None
        else:
            hashseed = config.option.hashseed
        config.hashseed = hashseed

        reader.addsubstitutions(toxinidir=config.toxinidir,
                                homedir=config.homedir)
        # As older versions of tox may have bugs or incompatabilities that
        # prevent parsing of tox.ini this must be the first thing checked.
        config.minversion = reader.getstring("minversion", None)
        if config.minversion:
            minversion = NormalizedVersion(self.config.minversion)
            toxversion = NormalizedVersion(tox.__version__)
            if toxversion < minversion:
                raise tox.exception.MinVersionError(
                    "tox version is %s, required is at least %s" % (
                        toxversion, minversion))
        if config.option.workdir is None:
            config.toxworkdir = reader.getpath("toxworkdir", "{toxinidir}/.tox")
        else:
            config.toxworkdir = config.toxinidir.join(config.option.workdir, abs=True)

        if not config.option.skip_missing_interpreters:
            config.option.skip_missing_interpreters = \
                reader.getbool("skip_missing_interpreters", False)

        # determine indexserver dictionary
        config.indexserver = {'default': IndexServerConfig('default')}
        prefix = "indexserver"
        for line in reader.getlist(prefix):
            name, url = map(lambda x: x.strip(), line.split("=", 1))
            config.indexserver[name] = IndexServerConfig(name, url)

        override = False
        if config.option.indexurl:
            for urldef in config.option.indexurl:
                m = re.match(r"\W*(\w+)=(\S+)", urldef)
                if m is None:
                    url = urldef
                    name = "default"
                else:
                    name, url = m.groups()
                    if not url:
                        url = None
                if name != "ALL":
                    config.indexserver[name].url = url
                else:
                    override = url
        # let ALL override all existing entries
        if override:
            for name in config.indexserver:
                config.indexserver[name] = IndexServerConfig(name, override)

        reader.addsubstitutions(toxworkdir=config.toxworkdir)
        config.distdir = reader.getpath("distdir", "{toxworkdir}/dist")
        reader.addsubstitutions(distdir=config.distdir)
        config.distshare = reader.getpath("distshare", distshare_default)
        reader.addsubstitutions(distshare=config.distshare)
        config.sdistsrc = reader.getpath("sdistsrc", None)
        config.setupdir = reader.getpath("setupdir", "{toxinidir}")
        config.logdir = config.toxworkdir.join("log")

        config.envlist, all_envs = self._getenvdata(reader)

        # factors used in config or predefined
        known_factors = self._list_section_factors("testenv")
        known_factors.update(default_factors)
        known_factors.add("python")

        # factors stated in config envlist
        stated_envlist = reader.getstring("envlist", replace=False)
        if stated_envlist:
            for env in _split_env(stated_envlist):
                known_factors.update(env.split('-'))

        # configure testenvs
        for name in all_envs:
            section = testenvprefix + name
            factors = set(name.split('-'))
            if section in self._cfg or factors <= known_factors:
                config.envconfigs[name] = \
                    self.make_envconfig(name, section, reader._subs, config)

        all_develop = all(name in config.envconfigs
                          and config.envconfigs[name].usedevelop
                          for name in config.envlist)

        config.skipsdist = reader.getbool("skipsdist", all_develop)

    def _list_section_factors(self, section):
        """ Collect the factors used as ``factor: value`` line prefixes in ``section``. """
        factors = set()
        if section in self._cfg:
            for _, value in self._cfg[section].items():
                exprs = re.findall(r'^([\w{}\.,-]+)\:\s+', value, re.M)
                factors.update(*mapcat(_split_factor_expr, exprs))
        return factors

    def make_envconfig(self, name, section, subs, config):
        """ Build the :class:`TestenvConfig` for environment ``name``.

        Every registered testenv attribute is read from ``section`` (falling
        back to ``[testenv]``), post-processed and set on the result.

        :raise ValueError: if an attribute declares an unknown type.
        """
        factors = set(name.split('-'))
        reader = SectionReader(section, self._cfg, fallbacksections=["testenv"],
                               factors=factors)
        vc = TestenvConfig(config=config, envname=name, factors=factors, reader=reader)
        reader.addsubstitutions(**subs)
        reader.addsubstitutions(envname=name)
        # passed as callables so they are evaluated at substitution time
        reader.addsubstitutions(envbindir=vc.get_envbindir,
                                envsitepackagesdir=vc.get_envsitepackagesdir,
                                envpython=vc.get_envpython)

        for env_attr in config._testenv_attr:
            atype = env_attr.type
            if atype in ("bool", "path", "string", "dict", "dict_setenv", "argv", "argvlist"):
                meth = getattr(reader, "get" + atype)
                res = meth(env_attr.name, env_attr.default)
            elif atype == "space-separated-list":
                res = reader.getlist(env_attr.name, sep=" ")
            elif atype == "line-list":
                res = reader.getlist(env_attr.name, sep="\n")
            else:
                raise ValueError("unknown type %r" % (atype,))

            if env_attr.postprocess:
                res = env_attr.postprocess(testenv_config=vc, value=res)
            setattr(vc, env_attr.name, res)

            if atype == "path":
                # path attributes become available to later substitutions
                reader.addsubstitutions(**{env_attr.name: res})

        return vc

    def _getenvdata(self, reader):
        """ Return ``(envlist, all_envs)``.

        The env string comes from ``-e``, then ``TOXENV``, then the ini
        ``envlist``.  ``all_envs`` additionally contains every
        ``[testenv:NAME]`` section.
        """
        envstr = self.config.option.env \
            or os.environ.get("TOXENV") \
            or reader.getstring("envlist", replace=False) \
            or []
        envlist = _split_env(envstr)

        # collect section envs
        all_envs = set(envlist) - set(["ALL"])
        for section in self._cfg:
            if section.name.startswith(testenvprefix):
                all_envs.add(section.name[len(testenvprefix):])
        if not all_envs:
            all_envs.add("python")

        if not envlist or "ALL" in envlist:
            envlist = sorted(all_envs)

        return envlist, all_envs


def _split_env(env):
    """if handed a list, action="append" was used for -e """
    if not isinstance(env, list):
        if '\n' in env:
            env = ','.join(env.split('\n'))
        env = [env]
    return mapcat(_expand_envstr, env)


def _split_factor_expr(expr):
    """ Expand a factor expression into a list of factor sets. """
    partial_envs = _expand_envstr(expr)
    return [set(e.split('-')) for e in partial_envs]


def _expand_envstr(envstr):
    """ Expand an env string such as ``py{26,27}-x,docs`` into env names. """
    # split by commas not in groups
    tokens = re.split(r'((?:\{[^}]+\})+)|,', envstr)
    envlist = [''.join(g).strip()
               for k, g in itertools.groupby(tokens, key=bool) if k]

    def expand(env):
        tokens = re.split(r'\{([^}]+)\}', env)
        parts = [token.split(',') for token in tokens]
        return [''.join(variant) for variant in itertools.product(*parts)]

    return mapcat(expand, envlist)


def mapcat(f, seq):
    """ Apply ``f`` to each item of ``seq`` and concatenate the results. """
    return list(itertools.chain.from_iterable(map(f, seq)))


class DepConfig:
    """ A dependency requirement together with its optional index server. """

    def __init__(self, name, indexserver=None):
        self.name = name
        self.indexserver = indexserver

    def __str__(self):
        if self.indexserver:
            if self.indexserver.name == "default":
                return self.name
            return ":%s:%s" % (self.indexserver.name, self.name)
        return str(self.name)
    __repr__ = __str__


class IndexServerConfig:
    """ Name and (optional) url of an index server. """

    def __init__(self, name, url=None):
        self.name = name
        self.url = url


#: Check value matches substitution form
#: of referencing value from other section. E.g. {[base]commands}
# Raw string: the pattern contains regex escapes (\[, \s, \S) that are not
# valid Python string escapes.
is_section_substitution = re.compile(r"{\[[^{}\s]+\]\S+?}").match


class SectionReader:
    """ Typed access to one ini section, with fallbacks and substitutions. """

    def __init__(self, section_name, cfgparser, fallbacksections=None,
                 factors=(), prefix=None):
        if prefix is None:
            self.section_name = section_name
        else:
            self.section_name = "%s:%s" % (prefix, section_name)
        self._cfg = cfgparser
        self.fallbacksections = fallbacksections or []
        self.factors = factors
        self._subs = {}
        # (section, name) pairs currently being substituted
        self._subststack = []
        self._setenv = None

    def get_environ_value(self, name):
        """ Look up ``name`` in the setenv mapping if set, else ``os.environ``. """
        if self._setenv is None:
            return os.environ.get(name)
        return self._setenv.get(name)

    def addsubstitutions(self, _posargs=None, **kw):
        """ Register substitution values; ``_posargs`` sets ``self.posargs``. """
        self._subs.update(kw)
        if _posargs:
            self.posargs = _posargs

    def getpath(self, name, defaultpath):
        """ Read ``name`` as a path made absolute against ``toxinidir``.

        Returns None when neither a value nor a default is available.
        """
        toxinidir = self._subs['toxinidir']
        path = self.getstring(name, defaultpath)
        if path is not None:
            return toxinidir.join(path, abs=True)

    def getlist(self, name, sep="\n"):
        """ Read ``name`` split on ``sep``; items are stripped, empties dropped. """
        s = self.getstring(name, None)
        if s is None:
            return []
        return [x.strip() for x in s.split(sep) if x.strip()]

    def getdict(self, name, default=None, sep="\n"):
        """ Read ``name`` as a dict of ``key=value`` entries separated by ``sep``. """
        value = self.getstring(name, None)
        return self._getdict(value, default=default, sep=sep)

    def getdict_setenv(self, name, default=None, sep="\n"):
        """ Read ``name`` as a :class:`SetenvDict` and install it on the reader.

        Only cross-section references are substituted up front; the
        remaining substitutions are resolved by the SetenvDict on lookup.
        """
        value = self.getstring(name, None, replace=True, crossonly=True)
        definitions = self._getdict(value, default=default, sep=sep)
        self._setenv = SetenvDict(definitions, reader=self)
        return self._setenv

    def _getdict(self, value, default, sep):
        if value is None:
            return default or {}

        d = {}
        for line in value.split(sep):
            if line.strip():
                name, rest = line.split('=', 1)
                d[name.strip()] = rest.strip()

        return d

    def getbool(self, name, default=None):
        """ Read ``name`` as a boolean.

        :raise KeyError: no value is configured and ``default`` is None.
        :raise tox.exception.ConfigError: the value is neither 'true' nor
            'false' (case-insensitive).
        """
        s = self.getstring(name, default)
        if not s:
            s = default
        if s is None:
            raise KeyError("no config value [%s] %s found" % (
                           self.section_name, name))

        if not isinstance(s, bool):
            if s.lower() == "true":
                s = True
            elif s.lower() == "false":
                s = False
            else:
                # interpolate the offending value so the message names it
                raise tox.exception.ConfigError(
                    "boolean value %r needs to be 'True' or 'False'" % (s,))
        return s

    def getargvlist(self, name, default=""):
        """ Read ``name`` as a list of argv lists, one per command. """
        s = self.getstring(name, default, replace=False)
        return _ArgvlistReader.getargvlist(self, s)

    def getargv(self, name, default=""):
        """ Read ``name`` as a single argv list (the first command). """
        return self.getargvlist(name, default)[0]

    def getstring(self, name, default=None, replace=True, crossonly=False):
        """ Read the raw value of ``name`` from this section or its fallbacks.

        Factor-conditional lines are filtered for values found in the ini;
        substitutions are applied when ``replace`` is true.
        """
        x = None
        for s in [self.section_name] + self.fallbacksections:
            try:
                x = self._cfg[s][name]
                break
            except KeyError:
                continue

        if x is None:
            x = default
        else:
            x = self._apply_factors(x)

        if replace and x and hasattr(x, 'replace'):
            x = self._replace(x, name=name, crossonly=crossonly)
        # print "getstring", self.section_name, name, "returned", repr(x)
        return x

    def _apply_factors(self, s):
        """ Drop ``factors: value`` lines whose factors do not apply. """
        def factor_line(line):
            m = re.search(r'^([\w{}\.,-]+)\:\s+(.+)', line)
            if not m:
                return line

            expr, line = m.groups()
            if any(fs <= self.factors for fs in _split_factor_expr(expr)):
                return line
            # implicit None: the line is filtered out below

        lines = s.strip().splitlines()
        return '\n'.join(filter(None, map(factor_line, lines)))

    def _replace(self, value, name=None, section_name=None, crossonly=False):
        """ Apply ``{...}`` substitutions to ``value``. """
        if '{' not in value:
            return value

        section_name = section_name if section_name else self.section_name
        self._subststack.append((section_name, name))
        try:
            return Replacer(self, crossonly=crossonly).do_replace(value)
        finally:
            # pop outside the assert so the stack is unwound under -O too
            popped = self._subststack.pop()
            assert popped == (section_name, name)


class Replacer:
    """ Performs ``{...}`` substitutions on behalf of a :class:`SectionReader`. """

    RE_ITEM_REF = re.compile(
        r'''
        (?<!\\)[{]
        (?:(?P<sub_type>[^[:{}]+):)?    # optional sub_type for special rules
        (?P<substitution_value>(?:\[[^,{}]*\])?[^:,{}]*)  # substitution key
        (?::(?P<default_value>[^{}]*))?   # default value
        [}]
        ''', re.VERBOSE)

    def __init__(self, reader, crossonly=False):
        self.reader = reader
        self.crossonly = crossonly

    def do_replace(self, x):
        """ Return ``x`` with every substitution item replaced. """
        return self.RE_ITEM_REF.sub(self._replace_match, x)

    def _replace_match(self, match):
        g = match.groupdict()
        sub_value = g['substitution_value']
        if self.crossonly:
            if sub_value.startswith("["):
                return self._substitute_from_other_section(sub_value)
            # in crossonly we return all other hits verbatim
            start, end = match.span()
            return match.string[start:end]

        # special case: all empty values means ":" which is os.pathsep
        if not any(g.values()):
            return os.pathsep

        # special case: opts and packages. Leave {opts} and
        # {packages} intact, they are replaced manually in
        # _venv.VirtualEnv.run_install_command.
        if sub_value in ('opts', 'packages'):
            return '{%s}' % sub_value

        try:
            sub_type = g['sub_type']
        except KeyError:
            raise tox.exception.ConfigError(
                "Malformed substitution; no substitution type provided")

        if sub_type == "env":
            return self._replace_env(match)
        if sub_type is not None:
            raise tox.exception.ConfigError(
                "No support for the %s substitution type" % sub_type)
        return self._replace_substitution(match)

    def _replace_env(self, match):
        """ Resolve ``{env:NAME}`` / ``{env:NAME:DEFAULT}``. """
        envkey = match.group('substitution_value')
        if not envkey:
            raise tox.exception.ConfigError(
                'env: requires an environment variable name')

        default = match.group('default_value')

        envvalue = self.reader.get_environ_value(envkey)
        if envvalue is None:
            if default is None:
                raise tox.exception.ConfigError(
                    "substitution env:%r: unknown environment variable %r "
                    " or recursive definition." %
                    (envkey, envkey))
            return default
        return envvalue

    def _substitute_from_other_section(self, key):
        """ Resolve a ``[section]item`` reference.

        :raise ValueError: the reference is already being substituted.
        :raise tox.exception.ConfigError: the key cannot be found.
        """
        if key.startswith("[") and "]" in key:
            i = key.find("]")
            section, item = key[1:i], key[i + 1:]
            cfg = self.reader._cfg
            if section in cfg and item in cfg[section]:
                if (section, item) in self.reader._subststack:
                    raise ValueError('%s already in %s' % (
                        (section, item), self.reader._subststack))
                x = str(cfg[section][item])
                return self.reader._replace(x, name=item, section_name=section,
                                            crossonly=self.crossonly)

        raise tox.exception.ConfigError(
            "substitution key %r not found" % key)

    def _replace_substitution(self, match):
        sub_key = match.group('substitution_value')
        val = self.reader._subs.get(sub_key, None)
        if val is None:
            val = self._substitute_from_other_section(sub_key)
        if py.builtin.callable(val):
            # lazily evaluated substitution
            val = val()
        return str(val)


class _ArgvlistReader:
    @classmethod
    def getargvlist(cls, reader, value):
        """Parse ``commands`` argvlist multiline string.

        :param reader: the section reader used for substitutions.
        :param str value: Content stored by key.

        :rtype: list[list[str]]
        :raise :class:`tox.exception.ConfigError`:
            line-continuation ends nowhere while resolving for specified section
        """
        commands = []
        current_command = ""
        for line in value.splitlines():
            line = line.rstrip()
            if not line:
                continue
            if line.endswith("\\"):
                # continuation: accumulate and keep reading
                current_command += " " + line[:-1]
                continue
            current_command += line

            if is_section_substitution(current_command):
                replaced = reader._replace(current_command, crossonly=True)
                commands.extend(cls.getargvlist(reader, replaced))
            else:
                commands.append(cls.processcommand(reader, current_command))
            current_command = ""
        else:
            # the loop never breaks, so this runs once the lines are exhausted
            if current_command:
                raise tox.exception.ConfigError(
                    "line-continuation ends nowhere while resolving for [%s] %s" %
                    (reader.section_name, "commands"))
        return commands

    # NOTE(review): the excerpt is cut off at this point, in the middle of the
    # next definition ("@classmethod" followed by "def ..."); that method --
    # presumably ``processcommand``, which getargvlist calls -- is not shown
    # here and has not been reconstructed.
processcommand(cls, reader, command):973 posargs = getattr(reader, "posargs", "")974 posargs_string = list2cmdline([x for x in posargs if x])975 # Iterate through each word of the command substituting as976 # appropriate to construct the new command string. This977 # string is then broken up into exec argv components using978 # shlex.979 newcommand = ""980 for word in CommandParser(command).words():981 if word == "{posargs}" or word == "[]":982 newcommand += posargs_string983 continue984 elif word.startswith("{posargs:") and word.endswith("}"):985 if posargs:986 newcommand += posargs_string987 continue988 else:989 word = word[9:-1]990 new_arg = ""991 new_word = reader._replace(word)992 new_word = reader._replace(new_word)993 new_word = new_word.replace('\\{', '{').replace('\\}', '}')994 new_arg += new_word995 newcommand += new_arg996 # Construct shlex object that will not escape any values,997 # use all values as is in argv.998 shlexer = shlex.shlex(newcommand, posix=True)999 shlexer.whitespace_split = True1000 shlexer.escape = ''1001 return list(shlexer)1002class CommandParser(object):1003 class State(object):1004 def __init__(self):1005 self.word = ''1006 self.depth = 01007 self.yield_words = []1008 def __init__(self, command):1009 self.command = command1010 def words(self):1011 ps = CommandParser.State()1012 def word_has_ended():1013 return ((cur_char in string.whitespace and ps.word and1014 ps.word[-1] not in string.whitespace) or1015 (cur_char == '{' and ps.depth == 0 and not ps.word.endswith('\\')) or1016 (ps.depth == 0 and ps.word and ps.word[-1] == '}') or1017 (cur_char not in string.whitespace and ps.word and1018 ps.word.strip() == ''))1019 def yield_this_word():1020 yieldword = ps.word1021 ps.word = ''1022 if yieldword:1023 ps.yield_words.append(yieldword)1024 def yield_if_word_ended():1025 if word_has_ended():1026 yield_this_word()1027 def accumulate():1028 ps.word += cur_char1029 def push_substitution():1030 ps.depth += 11031 def pop_substitution():1032 
ps.depth -= 11033 for cur_char in self.command:1034 if cur_char in string.whitespace:1035 if ps.depth == 0:1036 yield_if_word_ended()1037 accumulate()1038 elif cur_char == '{':1039 yield_if_word_ended()1040 accumulate()1041 push_substitution()1042 elif cur_char == '}':1043 accumulate()1044 pop_substitution()1045 else:1046 yield_if_word_ended()1047 accumulate()1048 if ps.word.strip():1049 yield_this_word()1050 return ps.yield_words1051def getcontextname():1052 if any(env in os.environ for env in ['JENKINS_URL', 'HUDSON_URL']):1053 return 'jenkins'...

Full Screen

Full Screen

repository.py

Source:repository.py Github

copy

Full Screen

1from __future__ import annotations2from itertools import chain3import typing as t4from .exceptions import CannotResolve, CircularDependency, PartiallyResolved5from .helpers import EMPTY, _LookupStack, _Stack6from .types import DependencyInfo, T7from .resolvers import Resolver8class SimpleRepository:9 def __init__(self, resolvers: t.Iterable[Resolver]):10 self.resolvers: t.List[Resolver] = list(resolvers)11 self.types_by_str: t.Dict[str, t.Type[T]] = {}12 self.types: t.Dict[t.Type, DependencyInfo] = {}13 self.instances: t.Dict[t.Type[T], t.List[T]] = {}14 self.lookup_stack = _LookupStack()15 def get(16 self, key: t.Union[t.Type[T], str], many=False, repo=None, kwargs=None17 ) -> t.Union[t.List[T], T]:18 if isinstance(key, str):19 key = self.types_by_str.get(key)20 if key is None:21 raise CannotResolve(f"Could not resolve for key {key}")22 if many:23 return self.instances.get(key, [])24 if not isinstance(key, type) and callable(key):25 name = getattr(key, "__name__", str(key))26 return self.resolve(factory=key, key=name, repo=repo, kwargs=kwargs)27 if kwargs is not None or key not in self.instances:28 inst = self.create(key, repo=repo, kwargs=kwargs)29 return inst30 instances = self.instances[key]31 return instances[-1]32 def create(self, key: t.Type[T], repo=None, kwargs=None) -> T:33 """Instantiate the object and all its dependencies34 :param key: The class / factory function to instantiate35 :param repo: The current ``Repository`` used for requesting dependencies36 :param kwargs: Any user specified keyword arguments (see :func:`gimme.get`). 
These37 will have preference over any keyword arguments this function supplies and any38 keyword arguments supplied by :func:`gimme.register`39 """40 repo = repo or self41 if not isinstance(key, type):42 raise TypeError(f"Can only create classes, not {key}")43 if key in self.lookup_stack:44 raise CircularDependency(str(self.lookup_stack))45 info = self._ensure_info(key)46 do_store = kwargs is None and info.store47 combined_kwargs = {**(info.kwargs or {}), **(kwargs or {})}48 inst = self.resolve(info.factory, key=key, repo=repo, kwargs=combined_kwargs)49 if do_store:50 self.add(inst)51 return inst52 def resolve(self, factory, key, repo=None, kwargs=None):53 """Resolve a factory function. This resolves all function parameters as dependencies,54 runs the function and returns the result55 """56 inst = EMPTY57 with self.lookup_stack.push(key):58 for plugin in self.resolvers:59 try:60 return plugin.create(factory, repo, kwargs)61 except (CannotResolve, PartiallyResolved):62 continue63 if inst is EMPTY:64 raise CannotResolve(str(self.lookup_stack))65 def _ensure_info(self, cls: t.Type[T]) -> DependencyInfo:66 info = self.types.get(cls)67 if info:68 return info69 self.register(cls)70 return self._ensure_info(cls)71 def add(self, inst, deep=True):72 def append_instance_to(key):73 if key not in self.instances:74 self.instances[key] = []75 self.instances[key].append(inst)76 cls = type(inst)77 self.register(cls)78 append_instance_to(cls)79 if deep:80 for base in cls.__mro__[1:]:81 append_instance_to(base)82 def register(83 self,84 cls: t.Type[T] = None,85 factory: t.Callable = None,86 info: DependencyInfo = None,87 store=True,88 kwargs=None,89 ):90 if not (bool(cls) ^ bool(info)):91 raise ValueError("Supply either cls or info")92 if info is None:93 if not isinstance(cls, type):94 raise TypeError(f"Can only register classes, not {cls}")95 if factory is None:96 factory = cls97 info = DependencyInfo(cls=cls, factory=factory, store=store, kwargs=kwargs)98 for base in 
info.cls.__mro__:99 if base not in self.types:100 key = base.__name__101 self.types_by_str[key] = base102 self.types[base] = info103 def add_resolver(self, resolver: Resolver):104 self.resolvers.insert(0, resolver)105 def __contains__(self, item: t.Type):106 return item in self.instances107class LayeredRepository(_Stack[SimpleRepository]):108 def __init__(self, first_layer: SimpleRepository):109 super().__init__([first_layer])110 @property111 def current(self) -> SimpleRepository:112 return self[-1]113 def get(self, key: t.Union[t.Type[T], str], many=False, kwargs=None) -> t.Union[t.List[T], T]:114 err = None115 if many:116 return list(chain.from_iterable(repo.get(key, many) for repo in self))117 for repo in reversed(self):118 try:119 return repo.get(key, many, repo=self, kwargs=kwargs)120 except CannotResolve as e:121 err = e122 if err:123 raise err124 def create(self, key: t.Type[T]) -> T:125 return self.current.create(key, repo=self)126 def pop(self):127 if len(self) <= 1:128 raise IndexError("Cannot pop the base repository layer")129 return super().pop()130 def __exit__(self, exc_type, exc_val, exc_tb):131 try:132 self.pop()133 except IndexError:134 pass135 def __getattr__(self, item):136 return getattr(self.current, item)137 def __contains__(self, item: t.Type):...

Full Screen

Full Screen

helpers.py

Source:helpers.py Github

copy

Full Screen

import collections
import collections.abc
import typing as t

from .types import T, TypeHintInfo


def is_generic_type_hint(hint):
    """Return True if ``hint`` has an ``__origin__`` attribute."""
    return hasattr(hint, "__origin__")


def parse_type_hint(hint) -> t.Optional[TypeHintInfo]:
    """
    Get the constructor for iterable/sequence type hints:
    returns the concrete type belonging to the type hint, ie `set` for `typing.Set`
    for the abstract type hints typing.Iterable

    :param hint: A Generic Type hint
    :returns: TypeHintInfo, or None if it could not be parsed into a list-like collection of
        `type`. eg for nested generic types (`List[List[int]]`)
    """
    if not is_generic_type_hint(hint):
        return None

    origin = hint.__origin__
    # ``issubclass`` needs a class: origins such as ``typing.Union`` (and the
    # ``None`` origin seen on py36) must yield None instead of a TypeError.
    if not isinstance(origin, type):
        return None
    # hint must be a Type hint of type Iterable, but not a Mapping
    if not issubclass(origin, collections.abc.Iterable) or issubclass(
        origin, collections.abc.Mapping
    ):
        return None

    # Unparametrised hints may lack ``__args__`` or carry an empty tuple.
    args = getattr(hint, "__args__", None) or (None,)

    collection_type = origin
    if issubclass(collection_type, tuple):
        # Must be variable length tuple
        if len(args) != 2 or args[1] is not Ellipsis:
            return None

    # abstract types like Iterable and Sequence are given as list
    if collection_type in vars(collections.abc).values():
        collection_type = list

    inner_type = args[0]
    if isinstance(inner_type, t.ForwardRef):
        inner_type = inner_type.__forward_arg__
    elif not isinstance(inner_type, (str, type)):
        return None
    return TypeHintInfo(collection_type, inner_type)


class _Stack(list, t.List[T]):
    """A list usable as a context manager: leaving the ``with`` block pops the last item."""

    def push(self, item):
        """Append ``item`` and return the stack, so ``with stack.push(x):`` works."""
        self.append(item)
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pop()


class _LookupStack(_Stack):
    def __str__(self):
        # Render as "A -> B -> C", using ``__name__`` where an item has one.
        return " -> ".join(getattr(i, "__name__", str(i)) for i in self)

# ... (the listing is truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful