How to use the spec_relpath method in Mamba

Best Python code snippets using mamba
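As the snippets below show, `spec_relpath` is not a method of the mamba library itself: it appears as a small path helper defined in mamba's own spec suite (example_collector_spec.py), and, in the unrelated copr-rpmbuild project, as an attribute of `ScmProvider` (scm.py, test_scm.py). In mamba's spec suite it simply joins a fixture name onto the `spec/fixtures/` prefix, producing a path relative to the current working directory. A minimal, self-contained sketch of that helper (the fixture name is only an example):

import os.path

def spec_relpath(name):
    # Join the fixture name onto the spec/fixtures/ prefix;
    # the result is relative to the current working directory.
    return os.path.join('spec', 'fixtures', name)

print(spec_relpath('without_inner_contexts.py'))
# -> spec/fixtures/without_inner_contexts.py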

dicom2spec.py

Source: dicom2spec.py (GitHub)

1"""Derive a study specification snippet describing a DICOM series based on the2DICOM metadata as provided by datalad.3"""4import logging5import os.path as op6from datalad.core.local.save import Save7from datalad.distribution.dataset import EnsureDataset8from datalad.distribution.dataset import datasetmethod9from datalad.distribution.dataset import require_dataset10from datalad.distribution.dataset import resolve_path11from datalad.interface.base import Interface12from datalad.interface.base import build_doc13from datalad.interface.utils import eval_results14from datalad.support import json_py15from datalad.support.constraints import EnsureNone16from datalad.support.constraints import EnsureStr17from datalad.support.exceptions import InsufficientArgumentsError18from datalad.support.param import Parameter19from datalad_hirni.commands.spec4anything import _get_edit_dict20from datalad_hirni.support.spec_helpers import (21 get_specval,22 has_specval23)24# bound dataset method25import datalad_metalad.dump26lgr = logging.getLogger('datalad.hirni.dicom2spec')27class RuleSet(object):28 """Holds and applies the current rule set for deriving BIDS terms from29 DICOM metadata"""30 def __init__(self, dataset=None):31 """Retrieves the configured set of rules32 Rules are defined by classes ... + __datalad_hirni_rules33 datalad.hirni.dicom2spec.rules ... multiple34 Parameters35 ----------36 dataset: Dataset37 Dataset to read possibly customized rules from38 """39 from datalad.utils import assure_list40 from datalad import cfg as dl_cfg41 from datalad_hirni.support.default_rules import DefaultRules42 cfg = dataset.config if dataset else dl_cfg43 self._rule_set = []44 # get a list of paths to build the rule set from45 # Note: assure_list is supposed to return empty list if there's nothing46 self._file_list = \47 assure_list(cfg.get("datalad.hirni.dicom2spec.rules"))48 lgr.debug("loaded list of rule files: %s", self._file_list)49 for file in self._file_list:50 if not op.exists(file) or not op.isfile(file):51 lgr.warning("Ignored invalid path for dicom2spec rules "52 "definition: %s", file)53 continue54 from datalad.utils import import_module_from_file55 from datalad.dochelpers import exc_str56 try:57 mod = import_module_from_file(file)58 except Exception as e:59 # any exception means full stop60 raise ValueError("Rules definition file at {} is broken: {}"61 "".format(file, exc_str(e)))62 # check file's __datalad_hirni_rules for the actual class:63 if not hasattr(mod, "__datalad_hirni_rules"):64 raise ValueError("Rules definition file {} missed attribute "65 "'__datalad_hirni_rules'.".format(file))66 self._rule_set.append(getattr(mod, "__datalad_hirni_rules"))67 if not self._rule_set:68 self._rule_set = [DefaultRules]69 def apply(self, dicommetadata, subject=None,70 anon_subject=None, session=None):71 """Applies rule set to DICOM metadata72 Note, that a particular series can be determined invalid (for73 application) by those rules, but still needs to show up in the74 specification for later review.75 Parameters76 ----------77 dicommetadata: list of dict78 expects datalad's metadata for DICOMs (the list of dicts for the all79 the series)80 Returns81 -------82 list of dict83 derived dict in specification terminology84 """85 # instantiate rules with metadata; note, that some possible rules might86 # need the entirety of it, not just the current series to be treated.87 actual_rules = [r(dicommetadata) for r in self._rule_set]88 # we want one specification dict per image series89 result_dicts = [dict() for i in 
range(len(dicommetadata))]90 for rule in actual_rules:91 # TODO: generic overrides instead (or none at all here and let this92 # be done later on - not sure, what's most useful for the rules93 # themselves. Also: If we know already we can save the effort to94 # deduct => likely keep passing on to the rules)95 dict_list = rule(subject=subject,96 anon_subject=anon_subject,97 session=session)98 # should return exactly one dict per series:99 assert len(dict_list) == len(dicommetadata)100 for idx, t in zip(range(len(dicommetadata)), dict_list):101 value_dict = t[0]102 is_valid = t[1]103 for key in value_dict.keys():104 # TODO: This should get more complex (deriving105 # tags/procedures?) and use some SpecHandler for assignment106 # (more sophisticated than _get_edit_dict)107 result_dicts[idx][key] = {'value': value_dict[key],108 'approved': False}109 if not is_valid:110 if 'tags' in result_dicts[idx] and \111 'hirni-dicom-converter-ignore' not in \112 result_dicts[idx]['tags']:113 result_dicts[idx]['tags'].append('hirni-dicom-converter-ignore')114 else:115 result_dicts[idx]['tags'] = ['hirni-dicom-converter-ignore']116 return result_dicts117def add_to_spec(ds_metadata, spec_list, basepath,118 subject=None, anon_subject=None, session=None, overrides=None, dataset=None):119 # TODO: discover procedures and write default config into spec for more convenient editing!120 # But: Would need toolbox present to create a spec. If not - what version of toolbox to use?121 # Double-check run-procedure --discover122 # Spec needs a dicomseries:all snippet before the actual dicomseries123 # snippets, since the order determines the order of execution of procedures124 # later on.125 # Note, that here we only make sure such a snippet exists. It is to be126 # updated with unique values from the dicomseries snippets later on.127 existing_all_dicoms = [i for s, i in zip(spec_list, range(len(spec_list)))128 if s['type'] == 'dicomseries:all']129 assert len(existing_all_dicoms) <= 1130 if not existing_all_dicoms:131 spec_list.append({'type': 'dicomseries:all'})132 existing_all_dicoms = len(spec_list) - 1133 else:134 existing_all_dicoms = existing_all_dicoms[0]135 # proceed with actual image series:136 lgr.debug("Discovered %s image series.",137 len(ds_metadata['metadata']['dicom']['Series']))138 # generate a list of dicts, with the "rule-proof" entries:139 base_list = []140 for series in ds_metadata['metadata']['dicom']['Series']:141 base_list.append({142 # Note: The first 4 entries aren't a dict and have no143 # "approved flag", since they are automatically managed144 'type': 'dicomseries',145 'location': op.relpath(ds_metadata['path'], basepath),146 'uid': series['SeriesInstanceUID'],147 'dataset-id': ds_metadata['dsid'],148 'dataset-refcommit': ds_metadata['refcommit'],149 'tags': []150 #'tags': ['hirni-dicom-converter-ignore']151 # if not series_is_valid(series) else [],152 })153 rules_new = RuleSet(dataset=dataset) # TODO: Pass on dataset for config access! 
=> RF the entire thing154 derived = rules_new.apply(ds_metadata['metadata']['dicom']['Series'],155 subject=subject,156 anon_subject=anon_subject,157 session=session158 )159 # TODO: Move assertion to a test?160 assert len(derived) == len(base_list)161 for idx in range(len(base_list)):162 base_list[idx].update(derived[idx])163 # merge with existing spec plus overrides:164 for series in base_list:165 series.update(overrides)166 existing = [i for s, i in167 zip(spec_list, range(len(spec_list)))168 if s['type'] == 'dicomseries' and s['uid'] == series['uid']]169 if existing:170 lgr.debug("Updating existing spec for image series %s",171 series['uid'])172 # we already had data of that series in the spec;173 spec_list[existing[0]].update(series)174 else:175 lgr.debug("Creating spec for image series %s", series['uid'])176 spec_list.append(series)177 # spec snippet for addressing an entire dicom acquisition:178 # fill in values of editable fields, that are unique across179 # dicomseries180 uniques = dict()181 for s in spec_list:182 for k in s.keys():183 if isinstance(s[k], dict) and 'value' in s[k]:184 if k not in uniques:185 uniques[k] = set()186 uniques[k].add(s[k]['value'])187 all_dicoms = dict()188 for k in uniques:189 if len(uniques[k]) == 1:190 all_dicoms[k] = _get_edit_dict(value=uniques[k].pop(),191 approved=False)192 all_dicoms.update({193 'type': 'dicomseries:all',194 'location': op.relpath(ds_metadata['path'], basepath),195 'dataset-id': ds_metadata['dsid'],196 'dataset-refcommit': ds_metadata['refcommit'],197 'procedures': [{198 'procedure-name': {'value': 'hirni-dicom-converter',199 'approved': False},200 'procedure-call': {'value': None,201 'approved': False},202 'on-anonymize': {'value': False,203 'approved': False},204 },205 ]206 })207 spec_list[existing_all_dicoms].update(all_dicoms)208 return spec_list209@build_doc210class Dicom2Spec(Interface):211 """Derives a specification snippet from DICOM metadata and stores it in a212 JSON file.213 The derivation is based on a rule system. You can implement your own rules as a python class.214 See the documentation page on customization for details. If you have such rules in dedicated files,215 their use and priority is configured via the datalad.hirni.dicom2spec.rules config variable. It takes216 a path to a python file containung such a rule definition. This configuration can be specified multiple217 times and at different levels (system-wide, user, dataset, local repository). If there are indeed218 several occurences of that configuration, the respective rules will be applied in order. Hence "later"219 appearances will overwrite "earlier" ones. Thereby you can have institution rules for example and still220 apply additional rules tailored to your needs or a particular study.221 """222 _params_ = dict(223 dataset=Parameter(224 args=("-d", "--dataset"),225 doc="""specify a dataset containing the DICOM metadata to be226 used. If no dataset is given, an attempt is made to identify227 the dataset based on the current working directory""",228 constraints=EnsureDataset() | EnsureNone()),229 path=Parameter(230 args=("path",),231 metavar="PATH",232 nargs="+",233 doc="""path to DICOM files""",234 constraints=EnsureStr() | EnsureNone()),235 spec=Parameter(236 args=("-s", "--spec",),237 metavar="SPEC",238 doc="""file to store the specification in""",239 constraints=EnsureStr() | EnsureNone()),240 subject=Parameter(241 args=("--subject",),242 metavar="SUBJECT",243 doc="""subject identifier. 
If not specified, an attempt will be made244 to derive SUBJECT from DICOM headers""",245 constraints=EnsureStr() | EnsureNone()),246 anon_subject=Parameter(247 args=("--anon-subject",),248 metavar="ANON_SUBJECT",249 doc="""TODO""",250 constraints=EnsureStr() | EnsureNone()),251 acquisition=Parameter(252 args=("--acquisition",),253 metavar="ACQUISITION",254 doc="""acquisition identifier. If not specified, an attempt255 will be made to derive an identifier from DICOM headers""",256 constraints=EnsureStr() | EnsureNone()),257 properties=Parameter(258 args=("--properties",),259 metavar="PATH or JSON string",260 doc="""""",261 constraints=EnsureStr() | EnsureNone()),262 )263 @staticmethod264 @datasetmethod(name='hirni_dicom2spec')265 @eval_results266 def __call__(path=None, spec=None, dataset=None, subject=None,267 anon_subject=None, acquisition=None, properties=None):268 # TODO: acquisition can probably be removed (or made an alternative to269 # derive spec and/or dicom location from)270 # Change, so path needs to point directly to dicom ds?271 # Or just use acq and remove path?272 dataset = require_dataset(dataset, check_installed=True,273 purpose="spec from dicoms")274 from datalad.utils import assure_list275 if path is not None:276 path = assure_list(path)277 path = [resolve_path(p, dataset) for p in path]278 path = [str(p) for p in path]279 else:280 raise InsufficientArgumentsError(281 "insufficient arguments for dicom2spec: a path is required")282 # TODO: We should be able to deal with several paths at once283 # ATM we aren't (see also commit + message of actual spec)284 assert len(path) == 1285 if not spec:286 raise InsufficientArgumentsError(287 "insufficient arguments for dicom2spec: a spec file is required")288 # TODO: That's prob. wrong. We can derive default spec from acquisition289 else:290 spec = str(resolve_path(spec, dataset))291 spec_series_list = \292 [r for r in json_py.load_stream(spec)] if op.exists(spec) else list()293 # get dataset level metadata:294 found_some = False295 for meta in dataset.meta_dump(296 path,297 recursive=False, # always False?298 reporton='datasets',299 return_type='generator',300 result_renderer='disabled'):301 if meta.get('status', None) not in ['ok', 'notneeded']:302 yield meta303 continue304 if 'dicom' not in meta['metadata']:305 # TODO: Really "notneeded" or simply not a result at all?306 yield dict(307 status='notneeded',308 message=("found no DICOM metadata for %s",309 meta['path']),310 path=meta['path'],311 type='dataset',312 action='dicom2spec',313 logger=lgr)314 continue315 if 'Series' not in meta['metadata']['dicom'] or \316 not meta['metadata']['dicom']['Series']:317 yield dict(318 status='impossible',319 message=("no image series detected in DICOM metadata of"320 " %s", meta['path']),321 path=meta['path'],322 type='dataset',323 action='dicom2spec',324 logger=lgr)325 continue326 found_some = True327 overrides = dict()328 if properties:329 # load from file or json string330 props = json_py.load(properties) \331 if op.exists(properties) else json_py.loads(properties)332 # turn into editable, pre-approved records333 props = {k: dict(value=v, approved=True) for k, v in props.items()}334 overrides.update(props)335 spec_series_list = add_to_spec(meta,336 spec_series_list,337 op.dirname(spec),338 subject=subject,339 anon_subject=anon_subject,340 # session=session,341 # TODO: parameter "session" was what342 # we now call acquisition. 
This is343 # NOT a good default for bids_session!344 # Particularly wrt to anonymization345 overrides=overrides,346 dataset=dataset347 )348 if not found_some:349 yield dict(status='impossible',350 message="found no DICOM metadata",351 path=path,352 type='file', # TODO: arguable should be 'file' or 'dataset', depending on path353 action='dicom2spec',354 logger=lgr)355 return356 # TODO: RF needed. This rule should go elsewhere:357 # ignore duplicates (prob. reruns of aborted runs)358 # -> convert highest id only359 # Note: This sorting is a q&d hack!360 # TODO: Sorting needs to become more sophisticated + include notion of :all361 spec_series_list = sorted(spec_series_list,362 key=lambda x: get_specval(x, 'id')363 if 'id' in x.keys() else 0)364 for i in range(len(spec_series_list)):365 # Note: Removed the following line from condition below,366 # since it appears to be pointless. Value for 'converter'367 # used to be 'heudiconv' or 'ignore' for a 'dicomseries', so368 # it's not clear ATM what case this could possibly have catched:369 # heuristic.has_specval(spec_series_list[i], "converter") and \370 if spec_series_list[i]["type"] == "dicomseries" and \371 has_specval(spec_series_list[i], "bids-run") and \372 get_specval(spec_series_list[i], "bids-run") in \373 [get_specval(s, "bids-run")374 for s in spec_series_list[i + 1:]375 if get_specval(376 s,377 "description") == get_specval(378 spec_series_list[i], "description") and \379 get_specval(s, "id") > get_specval(380 spec_series_list[i], "id")381 ]:382 lgr.debug("Ignore SeriesNumber %s for conversion" % i)383 spec_series_list[i]["tags"].append(384 'hirni-dicom-converter-ignore')385 lgr.debug("Storing specification (%s)", spec)386 # store as a stream (one record per file) to be able to387 # easily concat files without having to parse them, or388 # process them line by line without having to fully parse them389 from datalad_hirni.support.spec_helpers import sort_spec390 # Note: Sorting paradigm needs to change. See above.391 # spec_series_list = sorted(spec_series_list, key=lambda x: sort_spec(x))392 json_py.dump2stream(spec_series_list, spec)393 # make sure spec is tracked in git:394 spec_attrs = dataset.repo.get_gitattributes(spec)395 spec_relpath = op.relpath(spec, dataset.path)396 if spec_relpath not in spec_attrs.keys() or \397 'annex.largefiles' not in spec_attrs[spec_relpath].keys() or \398 spec_attrs[spec_relpath]['annex.largefiles'] != 'nothing':399 dataset.repo.set_gitattributes([(spec,400 {'annex.largefiles': 'nothing'})],401 '.gitattributes')402 for r in Save.__call__(dataset=dataset,403 path=[spec, '.gitattributes'],404 to_git=True,405 message="[HIRNI] Added study specification "406 "snippet for %s" %407 op.relpath(path[0], dataset.path),408 return_type='generator',409 result_renderer='disabled'):410 if r.get('status', None) not in ['ok', 'notneeded']:411 yield r412 elif r['path'] in [spec, op.join(dataset.path, '.gitattributes')] \413 and r['type'] == 'file':414 r['action'] = 'dicom2spec'415 r['logger'] = lgr416 yield r417 elif r['type'] == 'dataset':418 # 'ok' or 'notneeded' for a dataset is okay, since we commit419 # the spec. But it's not a result to yield420 continue421 else:422 # anything else shouldn't happen423 yield dict(status='error',424 message=("unexpected result from save: %s", r),425 path=spec, # TODO: This actually isn't clear - get it from `r`426 type='file',427 action='dicom2spec',...
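In dicom2spec.py above, `spec_relpath` is a plain local variable: the spec file's path relative to the dataset root, computed with `op.relpath` and used as the lookup key into the `.gitattributes` mapping so the spec file can be forced into git rather than git-annex. A minimal sketch of just that path computation, with hypothetical paths:

import os.path as op

dataset_path = "/data/study"                  # hypothetical dataset root
spec = "/data/study/acq1/studyspec.json"      # hypothetical absolute spec path

spec_relpath = op.relpath(spec, dataset_path)
print(spec_relpath)  # acq1/studyspec.json -- the key checked against the .gitattributes mapping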

test_scm.py

Source: test_scm.py (GitHub)

import tempfile
import os
import stat
import configparser
import shutil
from copr_rpmbuild.providers.scm import ScmProvider
from copr_rpmbuild.helpers import read_config
from . import TestCase

try:
    from unittest import mock
    builtins = 'builtins'
except ImportError:
    # Python 2 version depends on mock
    import mock
    builtins = '__builtin__'

RPKG_CONF_JINJA = """
[rpkg]
lookaside = {{ lookaside_url }}
anongiturl = {{ clone_url }}/%(module)s
"""


class TestScmProvider(TestCase):
    def setUp(self):
        super(TestScmProvider, self).setUp()
        self.source_json = {
            "type": "git",
            "clone_url": "https://example.org/somerepo.git",
            "committish": "f28",
            "subdirectory": "subpkg",
            "spec": "pkg.spec",
            "srpm_build_method": "rpkg",
        }

    @mock.patch('{0}.open'.format(builtins), new_callable=mock.mock_open())
    @mock.patch('copr_rpmbuild.providers.base.os.mkdir')
    def test_init(self, mock_mkdir, mock_open):
        source_json = self.source_json.copy()
        provider = ScmProvider(source_json, self.config)
        self.assertEqual(provider.scm_type, "git")
        self.assertEqual(provider.clone_url, "https://example.org/somerepo.git")
        self.assertEqual(provider.committish, "f28")
        self.assertEqual(provider.repo_subdir, "subpkg")
        self.assertEqual(provider.spec_relpath, "pkg.spec")
        self.assertEqual(provider.srpm_build_method, "rpkg")
        self.assertEqual(provider.repo_dirname, "somerepo")
        self.assertEqual(provider.repo_path, os.path.join(provider.workdir, "somerepo"))
        self.assertEqual(provider.repo_subpath, os.path.join(provider.workdir, "somerepo", "subpkg"))
        self.assertEqual(provider.spec_path, os.path.join(provider.workdir, "somerepo", "subpkg", "pkg.spec"))

        source_json["subdirectory"] = "/SOURCES"
        source_json["spec"] = "/SPECS/pkg.spec"
        provider = ScmProvider(source_json, self.config)
        self.assertEqual(provider.repo_subdir, "/SOURCES")
        self.assertEqual(provider.spec_relpath, "/SPECS/pkg.spec")
        self.assertEqual(provider.repo_path, os.path.join(provider.workdir, "somerepo"))
        self.assertEqual(provider.repo_subpath, os.path.join(provider.workdir, "somerepo", "SOURCES"))
        self.assertEqual(provider.spec_path, os.path.join(provider.workdir, "somerepo", "SPECS", "pkg.spec"))

    def test_generate_rpkg_config(self):
        tmpdir = tempfile.mkdtemp(prefix="copr-rpmbuild-test-")
        self.config.set("main", "workspace", tmpdir)
        rpkg_tmpdir = tempfile.mkdtemp(prefix="copr-rpmbuild-test-", dir=tmpdir)
        rpkg_config = open(os.path.join(rpkg_tmpdir, "rpkg.conf.j2"), "w")
        rpkg_config.write(RPKG_CONF_JINJA)
        rpkg_config.close()

        source_json = self.source_json.copy()
        source_json["clone_url"] = "http://copr-dist-git.fedorainfracloud.org/git/clime/project/pkg.git"
        with mock.patch("copr_rpmbuild.providers.scm.CONF_DIRS", new=[rpkg_tmpdir]):
            provider = ScmProvider(source_json, self.config)
            rpkg_config_path = provider.generate_rpkg_config()

        config = configparser.RawConfigParser()
        config.read(rpkg_config_path)
        self.assertTrue(config.has_section("rpkg"))
        self.assertEqual(config.get("rpkg", "lookaside"), "http://copr-dist-git.fedorainfracloud.org/repo/pkgs")
        self.assertEqual(config.get("rpkg", "anongiturl"), "http://copr-dist-git.fedorainfracloud.org/git/%(module)s")

        source_json["clone_url"] = "http://unknownurl/git/clime/project/pkg.git"
        with mock.patch("copr_rpmbuild.providers.scm.CONF_DIRS", new=[rpkg_tmpdir]):
            provider = ScmProvider(source_json, self.config)
            rpkg_config_path = provider.generate_rpkg_config()

        self.assertEqual(rpkg_config_path, os.path.join(os.environ['HOME'], '.config', 'rpkg.conf'))
        shutil.rmtree(tmpdir)

    @mock.patch('{0}.open'.format(builtins), new_callable=mock.mock_open())
    @mock.patch('copr_rpmbuild.providers.base.os.mkdir')
    def test_get_rpkg_command(self, mock_mkdir, mock_open):
        provider = ScmProvider(self.source_json, self.config)
        provider.generate_rpkg_config = mock.MagicMock(return_value="/etc/rpkg.conf")
        assert_cmd = ["rpkg", "srpm",
                      "--outdir", self.config.get("main", "resultdir"),
                      "--spec", provider.spec_path]
        self.assertEqual(provider.get_rpkg_command(), assert_cmd)

    @mock.patch('{0}.open'.format(builtins), new_callable=mock.mock_open())
    @mock.patch('copr_rpmbuild.providers.base.os.mkdir')
    def test_get_tito_command(self, mock_mkdir, mock_open):
        provider = ScmProvider(self.source_json, self.config)
        assert_cmd = ["tito", "build", "--srpm",
                      "--output", self.config.get("main", "resultdir")]
        self.assertEqual(provider.get_tito_command(), assert_cmd)

    @mock.patch("copr_rpmbuild.helpers.run_cmd")
    @mock.patch('{0}.open'.format(builtins), new_callable=mock.mock_open())
    @mock.patch('copr_rpmbuild.providers.base.os.mkdir')
    def test_get_tito_test_command(self, mock_mkdir, mock_open, run_cmd_mock):
        provider = ScmProvider(self.source_json, self.config)
        assert_cmd = ["tito", "build", "--test", "--srpm",
                      "--output", self.config.get("main", "resultdir")]
        self.assertEqual(provider.get_tito_test_command(), assert_cmd)

    @mock.patch("copr_rpmbuild.providers.scm.get_mock_uniqueext")
    @mock.patch('{0}.open'.format(builtins), new_callable=mock.mock_open())
    def test_get_make_srpm_command(self, mock_open, get_mock_uniqueext_mock):
        tmpdir = tempfile.mkdtemp(prefix="copr-rpmbuild-test-")
        ws = os.path.join(tmpdir, "workspace")
        rd = os.path.join(tmpdir, "resultdir")
        os.makedirs(ws)
        os.makedirs(rd)
        self.config.set("main", "workspace", ws)
        self.config.set("main", "resultdir", rd)
        get_mock_uniqueext_mock.return_value = '2'
        self.source_json['srpm_build_method'] = 'make_srpm'
        provider = ScmProvider(self.source_json, self.config)

        for directory in [provider.resultdir, provider.workdir]:
            assert stat.S_IMODE(os.stat(directory).st_mode) == 0o707

        resultdir = provider.resultdir
        basename = os.path.basename(resultdir)
        workdir_base = os.path.basename(provider.workdir)
        bind_mount_cmd_part = '--plugin-option=bind_mount:dirs=(("{0}", "/mnt/{1}"), ("{2}", "/mnt/{3}"))'\
            .format(provider.workdir, workdir_base,
                    resultdir, basename)
        make_srpm_cmd_part = 'cd /mnt/{wb}/somerepo/subpkg; make -f /mnt/{wb}/somerepo/.copr/Makefile srpm '\
                             'outdir="/mnt/{rb}" spec="/mnt/{wb}/somerepo/subpkg/pkg.spec"'\
            .format(
                wb=workdir_base,
                rb=basename,
            )
        assert_cmd = ['mock', '--uniqueext', '2', '-r', '/etc/copr-rpmbuild/mock-source-build.cfg',
                      bind_mount_cmd_part, '--chroot', make_srpm_cmd_part]
        self.assertEqual(provider.get_make_srpm_command(), assert_cmd)
        ...
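Note what the second half of test_init pins down: when `subdirectory` and `spec` carry leading slashes ("/SOURCES", "/SPECS/pkg.spec"), `spec_relpath` keeps the raw value, yet `repo_subpath` and `spec_path` still resolve inside the repository checkout. This suggests that `helpers.path_join`, used by `init_provider` in scm.py below, normalizes away the leading separator; plain `os.path.join` would instead discard everything before an absolute component.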

scm.py

Source: scm.py (GitHub)

import os
import re
import logging
import re

from copr_rpmbuild import helpers
from jinja2 import Environment, FileSystemLoader
from ..helpers import run_cmd, CONF_DIRS, get_mock_uniqueext
from .base import Provider
from six.moves.urllib.parse import urlparse

log = logging.getLogger("__main__")


class ScmProvider(Provider):
    def init_provider(self):
        source_dict = self.source_dict
        self.scm_type = source_dict.get('type') or 'git'
        self.clone_url = source_dict.get('clone_url')
        self.committish = source_dict.get('committish')
        self.repo_subdir = source_dict.get('subdirectory') or ''
        self.spec_relpath = source_dict.get('spec') or ''
        self.srpm_build_method = source_dict.get('srpm_build_method') or 'rpkg'
        self.repo_dirname = os.path.splitext(os.path.basename(
            self.clone_url.rstrip('/')))[0]
        self.repo_path = helpers.path_join(self.workdir, self.repo_dirname)
        self.repo_subpath = helpers.path_join(self.repo_path, self.repo_subdir)
        self.spec_path = helpers.path_join(
            self.repo_path, os.path.join(self.repo_subdir, self.spec_relpath))
        # make_srpm method can create root-owned files in resultdir
        self.use_safe_resultdir = self.srpm_build_method == "make_srpm"

    def generate_rpkg_config(self):
        parsed_clone_url = urlparse(self.clone_url)
        distgit_config_section = None
        index = 0
        config_section = 'distgit{index}'.format(index=index)
        while self.config.has_section(config_section):
            distgit_hostname_pattern = self.config.get(
                config_section, 'distgit_hostname_pattern')
            if re.match(distgit_hostname_pattern, parsed_clone_url.netloc):
                distgit_config_section = config_section
                break
            index += 1
            config_section = 'distgit{index}'.format(index=index)

        if not distgit_config_section:
            distgit_config_section = 'main'

        distgit_lookaside_url = self.config.get(
            distgit_config_section, 'distgit_lookaside_url', fallback='').strip('/').format(
                scheme=parsed_clone_url.scheme, netloc=parsed_clone_url.netloc)
        distgit_clone_url = self.config.get(
            distgit_config_section, 'distgit_clone_url', fallback='').strip('/').format(
                scheme=parsed_clone_url.scheme, netloc=parsed_clone_url.netloc)

        jinja_env = Environment(loader=FileSystemLoader(CONF_DIRS))
        template = jinja_env.get_template("rpkg.conf.j2")
        config = template.render(lookaside_url=distgit_lookaside_url,
                                 clone_url=distgit_clone_url)
        log.debug('Generated rpkg config:\n' + config + '\n')

        config_dir_path = os.path.join(os.getenv('HOME'), '.config')
        try:
            os.makedirs(config_dir_path)
        except OSError:
            pass

        config_path = os.path.join(config_dir_path, 'rpkg.conf')
        log.debug('Writing config into ' + config_path)

        f = open(config_path, "w+")
        f.write(config)
        f.close()

        return config_path

    def get_rpkg_command(self):
        self.generate_rpkg_config()
        return ['rpkg', 'srpm', '--outdir', self.resultdir, '--spec', self.spec_path]

    def get_tito_command(self):
        return ['tito', 'build', '--srpm', '--output', self.resultdir]

    def get_tito_test_command(self):
        return ['tito', 'build', '--test', '--srpm', '--output', self.resultdir]

    @staticmethod
    def _mock_mountpoint(directory):
        base = os.path.basename(os.path.normpath(directory))
        return os.path.join("/mnt", base)

    def get_make_srpm_command(self):
        mock_workdir = self._mock_mountpoint(self.workdir)
        mock_resultdir = self._mock_mountpoint(self.resultdir)
        mock_repodir = helpers.path_join(mock_workdir, self.repo_dirname)
        mock_cwd = helpers.path_join(mock_repodir, self.repo_subdir)
        mock_spec_path = helpers.path_join(
            mock_repodir, os.path.join(self.repo_subdir, self.spec_relpath))
        mock_bind_mount_cmd_part = \
            '--plugin-option=bind_mount:dirs=(("{0}", "{1}"), ("{2}", "{3}"))'\
            .format(self.workdir, mock_workdir, self.resultdir, mock_resultdir)
        makefile_path = os.path.join(mock_repodir, '.copr', 'Makefile')
        make_srpm_cmd_part = \
            'cd {0}; make -f {1} srpm outdir="{2}" spec="{3}"'\
            .format(mock_cwd, makefile_path, mock_resultdir, mock_spec_path)
        return ['mock', '--uniqueext', get_mock_uniqueext(),
                '-r', '/etc/copr-rpmbuild/mock-source-build.cfg',
                mock_bind_mount_cmd_part, '--chroot', make_srpm_cmd_part]

    def produce_srpm(self):
        helpers.git_clone_and_checkout(
            self.clone_url,
            self.committish,
            self.repo_path,
            self.scm_type)
        cmd = {
            'rpkg': self.get_rpkg_command,
            'tito': self.get_tito_command,
            'tito_test': self.get_tito_test_command,
            'make_srpm': self.get_make_srpm_command,
        }[self.srpm_build_method]()
        if not os.path.exists(self.repo_subpath):
            raise RuntimeError("The user-defined SCM subdirectory `{}' doesn't exist within this repository {}"
                               .format(self.repo_subdir, self.clone_url))
        ...
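In scm.py, `spec_relpath` is simply the `spec` field of the build's source definition: the spec file's location relative to the repository subdirectory, which `init_provider` composes into an absolute `spec_path`. A minimal sketch of that composition with hypothetical values, using plain `os.path.join` in place of `helpers.path_join` (so the leading-slash normalization noted above is not reproduced):

import os

workdir = "/tmp/copr-workdir"                    # hypothetical working directory
clone_url = "https://example.org/somerepo.git"
repo_subdir = "subpkg"
spec_relpath = "pkg.spec"

# "somerepo.git" -> "somerepo"
repo_dirname = os.path.splitext(os.path.basename(clone_url.rstrip('/')))[0]
repo_path = os.path.join(workdir, repo_dirname)
spec_path = os.path.join(repo_path, repo_subdir, spec_relpath)
print(spec_path)  # /tmp/copr-workdir/somerepo/subpkg/pkg.spec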

example_collector_spec.py

Source: example_collector_spec.py (GitHub)

...
import inspect

from mamba import example, example_group, loader
from mamba.example_collector import ExampleCollector

from expects import expect


def spec_relpath(name):
    return os.path.join('spec', 'fixtures', name)


def spec_abspath(name):
    return os.path.join(os.path.dirname(__file__), 'fixtures', name)


IRRELEVANT_PATH = spec_abspath('without_inner_contexts.py')
PENDING_DECORATOR_PATH = spec_abspath('with_pending_decorator.py')
PENDING_DECORATOR_AS_ROOT_PATH = spec_abspath('with_pending_decorator_as_root.py')
WITH_RELATIVE_IMPORT_PATH = spec_abspath('with_relative_import.py')


def _load_module(path):
    example_collector = ExampleCollector([path])
    return list(example_collector.modules())[0]


with description(ExampleCollector) as _:
    with context('when loading from file'):
        with it('loads module from absolute path'):
            module = _load_module(IRRELEVANT_PATH)
            expect(inspect.ismodule(module)).to.be.true

        with it('loads module from relative path'):
            module = _load_module(spec_relpath('without_inner_contexts.py'))
            expect(inspect.ismodule(module)).to.be.true

    # FIXME: Mixed responsibilities in test [collect, load]??
    with context('when loading'):
        with it('orders examples by line number'):
            module = _load_module(spec_abspath('without_inner_contexts.py'))
            examples = loader.Loader().load_examples_from(module)
            expect(examples).to.have.length(1)
            expect([example.name for example in examples[0].examples]).to.be.equal(['it first example', 'it second example', 'it third example'])

        with it('places examples together and groups at the end'):
            module = _load_module(spec_abspath('with_inner_contexts.py'))
            examples = loader.Loader().load_examples_from(module)
            expect(examples).to.have.length(1)
            expect([example.name for example in examples[0].examples]).to.be.equal(['it first example', 'it second example', 'it third example', '#inner_context'])

    with context('when a pending decorator loaded'):
        ...
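The two helpers differ only in their anchor: `spec_relpath` joins onto 'spec/fixtures' and therefore depends on the current working directory, while `spec_abspath` anchors on `os.path.dirname(__file__)` and works from anywhere. In practice this means the "loads module from relative path" example implicitly assumes the spec suite is launched from the directory that contains `spec/`.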

