Best Python code snippet using slash
fixture_store.py
Source:fixture_store.py  
1import collections2import sys3from contextlib import contextmanager4from slash import ctx5import logbook6from ordered_set import OrderedSet7from ...ctx import context as slash_context8from ...exception_handling import handling_exceptions9from ...exceptions import CyclicFixtureDependency, UnresolvedFixtureStore, UnknownFixtures, InvalidFixtureName10from ...utils.python import get_arguments, reraise11from ..variation_factory import VariationFactory12from ..test import is_valid_test_name13from .active_fixture import ActiveFixture14from .fixture import Fixture15from .namespace import Namespace16from .parameters import Parametrization, iter_parametrization_fixtures17from .utils import (get_real_fixture_name_from_argument, get_scope_by_name,18                    nofixtures)19_logger = logbook.Logger(__name__)20class FixtureStore(object):21    def __init__(self):22        super(FixtureStore, self).__init__()23        self._namespaces = [Namespace(self)]24        self._unresolved_fixture_ids = set()25        self._fixtures_by_id = {}26        self._fixtures_by_fixture_info = {}27        self._active_fixtures_by_scope = collections.defaultdict(dict)28        self._active_fixture_dependencies = {} # maps fixture id to the frozenset of (param_id, variation index)29        self._computing = set()30        self._all_needed_parametrization_ids_by_fixture_id = {}31        self._known_fixture_ids = collections.defaultdict(dict) # maps fixture ids to known combinations32    def get_active_fixture(self, fixture):33        return self._active_fixtures_by_scope[fixture.info.scope].get(fixture.info.id)34    def _compute_id(self, variation, p):35        if isinstance(p, Parametrization):36            return variation.param_value_indices[p.info.id]37        combination = frozenset((f.info.id, self._compute_id(variation, f))38                                for f in self.iter_all_needed_fixture_objects(p))39        known = self._known_fixture_ids[p.info.id]40        return known.setdefault(combination, len(known))41    def iter_all_needed_fixture_objects(self, fixtureobj):42        for fid in self.get_all_needed_fixture_ids(fixtureobj):43            yield self.get_fixture_by_id(fid)44    def iter_active_fixtures(self):45        for _, fixtures in self._active_fixtures_by_scope.items():46            for f in fixtures.values():47                yield f48    def call_with_fixtures(self, test_func, namespace, trigger_test_start=False, trigger_test_end=False):49        if not nofixtures.is_marked(test_func):50            fixture_names = self.get_required_fixture_names(test_func)51            kwargs = self.get_fixture_dict(fixture_names, namespace)52            used_fixtures_decorator_names = getattr(test_func, '__extrafixtures__', None)53            if used_fixtures_decorator_names is not None:54                used_fixture_names_only = set(used_fixtures_decorator_names) - set(fixture_names)55                for name, fixture in self._get_fixtures_set(used_fixture_names_only, namespace=namespace):56                    self.get_fixture_value(fixture, name=name)57        else:58            kwargs = {}59        try:60            if trigger_test_start:61                for fixture in self.iter_active_fixtures():62                    fixture.call_test_start()63            return test_func(**kwargs)64        finally:65            if trigger_test_end:66                for fixture in self.iter_active_fixtures():67                    with handling_exceptions(swallow=True):68                        fixture.call_test_end()69    def get_required_fixture_names(self, test_func):70        """Returns a list of fixture names needed by test_func.71        Each element returned is either a string or a tuple of (required_name, real_name)72        """73        skip_names = {name for name, _ in iter_parametrization_fixtures(test_func)}74        returned = []75        for argument in get_arguments(test_func):76            if argument.name in skip_names:77                continue78            real_name = get_real_fixture_name_from_argument(argument)79            if real_name == argument.name:80                returned.append(real_name)81            else:82                returned.append((argument.name, real_name))83        return returned84    def get_required_fixture_objects(self, test_func, namespace):85        names = self.get_required_fixture_names(test_func)86        assert isinstance(names, list)87        return set(self.get_fixture_dict(names, namespace=namespace, get_values=False).values())88    def resolve_name(self, parameter_name, start_point, namespace=None):89        if namespace is None:90            namespace = self.get_current_namespace()91        parts = parameter_name.split('.')[::-1]92        if not parts:93            raise UnknownFixtures(parameter_name)94        while parts:95            current_name = parts.pop()96            param_fixtures = dict(iter_parametrization_fixtures(start_point))97            if current_name in param_fixtures:98                if parts: # we cannot decend further than a parameter99                    raise UnknownFixtures(parameter_name)100                start_point = param_fixtures[current_name]101            else:102                start_point = self.get_fixture_by_name(current_name, namespace=namespace)103                namespace = start_point.namespace104        return start_point105    def __iter__(self):106        return iter(self._fixtures_by_id.values())107    def push_namespace(self):108        self._namespaces.append(Namespace(self, parent=self._namespaces[-1]))109    def pop_namespace(self):110        return self._namespaces.pop(-1)111    @contextmanager112    def new_namespace_context(self):113        self.push_namespace()114        try:115            yield116        finally:117            self.pop_namespace()118    def get_current_namespace(self):119        return self._namespaces[-1]120    def get_all_needed_fixture_ids(self, fixtureobj):121        if self._unresolved_fixture_ids:122            raise UnresolvedFixtureStore()123        if isinstance(fixtureobj, Parametrization):124            return frozenset([fixtureobj.info.id])125        returned = self._all_needed_parametrization_ids_by_fixture_id.get(fixtureobj.info.id)126        if returned is None:127            returned = self._compute_all_needed_parametrization_ids(fixtureobj)128            self._all_needed_parametrization_ids_by_fixture_id[fixtureobj.info.id] = returned129        return returned130    def iter_autouse_fixtures_in_namespace(self, namespace=None):131        if namespace is None:132            namespace = self.get_current_namespace()133        for fixture in namespace.iter_fixtures():134            if fixture.info.autouse:135                yield fixture136    def activate_autouse_fixtures_in_namespace(self, namespace):137        for fixture in self.iter_autouse_fixtures_in_namespace(namespace):138            _ = self.get_fixture_value(fixture)139    def _compute_all_needed_parametrization_ids(self, fixtureobj):140        stack = [(fixtureobj.info.id, [fixtureobj.info.id], set([fixtureobj.info.id]))]141        returned = OrderedSet()142        while stack:143            fixture_id, path, visited = stack.pop()144            if fixture_id in self._all_needed_parametrization_ids_by_fixture_id:145                returned.update(self._all_needed_parametrization_ids_by_fixture_id[fixture_id])146                continue147            fixture = self._fixtures_by_id[fixture_id]148            if fixture.parametrization_ids:149                assert isinstance(fixture.parametrization_ids, OrderedSet)150                returned.update(fixture.parametrization_ids)151            if fixture.keyword_arguments:152                for needed in fixture.keyword_arguments.values():153                    if needed.is_parameter():154                        continue155                    needed_id = needed.info.id156                    if needed_id in visited:157                        self._raise_cyclic_dependency_error(fixtureobj, path, needed_id)158                    stack.append((needed_id, path + [needed_id], visited | set([needed_id])))159        return returned160    def _raise_cyclic_dependency_error(self, fixtureobj, path, new_id):161        raise CyclicFixtureDependency(162            'Cyclic fixture dependency detected in {}: {}'.format(163                fixtureobj.info.func.__code__.co_filename,164                ' -> '.join(self._fixtures_by_id[f_id].info.name165                            for f_id in path + [new_id])))166    def push_scope(self, scope):167        scope = get_scope_by_name(scope)168    def pop_scope(self, scope): # pylint: disable=unused-argument169        if slash_context.result is not None and slash_context.result.is_interrupted():170            return171        scope = get_scope_by_name(scope)172        for s, active_fixtures in self._active_fixtures_by_scope.items():173            if s <= scope:174                for active_fixture in reversed(list(active_fixtures.values())):175                    with handling_exceptions(swallow=True):176                        self._deactivate_fixture(active_fixture.fixture)177                assert not active_fixtures178    def ensure_known_parametrization(self, parametrization):179        if parametrization.info.id not in self._fixtures_by_id:180            self._fixtures_by_id[parametrization.info.id] = parametrization181    def add_fixtures_from_dict(self, d):182        for thing in d.values():183            fixture_info = getattr(thing, '__slash_fixture__', None)184            if fixture_info is None:185                continue186            assert self.get_current_namespace() is self._namespaces[-1]187            fixture_info = self.add_fixture(thing).__slash_fixture__188            self.get_current_namespace().add_name(189                fixture_info.name, fixture_info.id)190    def add_fixture(self, fixture_func):191        fixture_info = fixture_func.__slash_fixture__192        existing_fixture = self._fixtures_by_id.get(fixture_info.id)193        if existing_fixture is not None:194            return existing_fixture.fixture_func195        if is_valid_test_name(fixture_info.name):196            raise InvalidFixtureName('Invalid fixture name: {.name}'.format(fixture_info))197        fixture_object = Fixture(self, fixture_func)198        current_namespace = self._namespaces[-1]199        current_namespace.add_name(fixture_info.name, fixture_info.id)200        self.register_fixture_id(fixture_object)201        return fixture_func202    def register_fixture_id(self, f):203        if f.info.id in self._fixtures_by_id:204            return205        self._fixtures_by_id[f.info.id] = f206        self._unresolved_fixture_ids.add(f.info.id)207    def get_fixture_by_name(self, name, namespace=None):208        if namespace is None:209            namespace = self._namespaces[-1]210        return namespace.get_fixture_by_name(name)211    def get_fixture_by_argument(self, arg):212        return self.get_fixture_by_name(get_real_fixture_name_from_argument(arg))213    def get_fixture_by_id(self, fixture_id):214        return self._fixtures_by_id[fixture_id]215    def get_fixture_dict(self, fixture_names, namespace=None, get_values=True, skip_names=frozenset()):216        returned = {}217        if namespace is None:218            namespace = self.get_current_namespace()219        fixtures_set = self._get_fixtures_set(fixture_names, skip_names=skip_names, namespace=namespace)220        for required_name, fixture in fixtures_set:221            if get_values:222                fixture = self.get_fixture_value(fixture, name=required_name)223            if required_name in fixture_names or [element for element in fixture_names if required_name in element]:224                returned[required_name] = fixture225        return returned226    def _get_fixtures_set(self, fixture_names, namespace=None, skip_names=frozenset(), fixtures_set=None):227        if fixtures_set is None:228            fixtures_set = OrderedSet()229        for element in fixture_names:230            if isinstance(element, tuple):231                required_name, real_name = element232            else:233                required_name = real_name = element234            if required_name in skip_names:235                continue236            if element == 'this':237                continue238            fixture = namespace.get_fixture_by_name(real_name)239            fixtures_set.add((required_name, fixture))240            if hasattr(fixture.fixture_func, "__extrafixtures__"):241                self._get_fixtures_set(fixture.fixture_func.__extrafixtures__, skip_names=skip_names,242                                       namespace=namespace, fixtures_set=fixtures_set)243        return fixtures_set244    def get_fixture_value(self, fixture, name=None):245        if name is None:246            name = fixture.info.name247        value = self._compute_fixture_value(name, fixture)248        return value249    def get_value(self, variation, parameter_or_fixture):250        fixture_id = parameter_or_fixture.info.id251        fixtureobj = self.get_fixture_by_id(parameter_or_fixture.info.id)252        if isinstance(fixtureobj, Parametrization):253            value = parameter_or_fixture.get_value_by_index(variation.param_value_indices[fixture_id])254        else:255            value = self.get_fixture_value(parameter_or_fixture)256        return value257    def iter_parametrization_variations(self, fixture_ids=(), funcs=(), methods=()):258        if self._unresolved_fixture_ids:259            raise UnresolvedFixtureStore()260        variation_factory = VariationFactory(self)261        for fixture_id in fixture_ids:262            variation_factory.add_needed_fixture_id(fixture_id)263        for func in funcs:264            variation_factory.add_needed_fixtures_from_function(func)265        for method in methods:266            variation_factory.add_needed_fixtures_from_method(method)267        return variation_factory.iter_variations()268    def _compute_fixture_value(self, name, fixture, relative_name=None):269        if relative_name is None:270            relative_name = name271        assert not fixture.is_parameter()272        if fixture.info.id in self._computing:273            raise CyclicFixtureDependency(274                'Fixture {!r} is a part of a dependency cycle!'.format(name))275        active_fixture = self.get_active_fixture(fixture)276        if active_fixture is not None:277            if self._is_active_fixture_valid(fixture):278                _logger.trace("Fixture {} did not change", fixture)279                return active_fixture.value280            else:281                _logger.trace("Fixture {} no longer valid. Recomputing", fixture)282                self._deactivate_fixture(active_fixture.fixture)283        self._computing.add(fixture.info.id)284        try:285            fixture_value = self._call_fixture(fixture, relative_name=relative_name)286        except:287            exc_info = sys.exc_info()288            self._deactivate_fixture(fixture)289            reraise(*exc_info)290        finally:291            self._computing.discard(fixture.info.id)292        return fixture_value293    def _is_active_fixture_valid(self, fixture):294        assert fixture.info.id in self._active_fixture_dependencies, "Fixture dependencies not updated"295        new_dependencies = self._compute_fixture_dependencies(fixture)296        return new_dependencies.issubset(self._active_fixture_dependencies[fixture.info.id])297    def _compute_fixture_dependencies(self, fixture):298        param_indices = self._compute_all_needed_parametrization_ids(fixture)299        if not param_indices:300            return frozenset()301        assert ctx.session is not None, "Dependency computation requires an active session"302        variation = ctx.session.variations.get_current_variation()303        assert variation is not None, "Dependency computation requires current variation"304        return frozenset((param_id, variation.param_value_indices[param_id])305                         for param_id in self._compute_all_needed_parametrization_ids(fixture))306    def _call_fixture(self, fixture, relative_name):307        assert relative_name308        active_fixture = ActiveFixture(fixture)309        kwargs = {}310        if fixture.keyword_arguments is None:311            raise UnresolvedFixtureStore('Fixture {} is unresolved!'.format(fixture.info.name))312        for required_name, needed_fixture in fixture.keyword_arguments.items():313            if needed_fixture.is_parameter():314                continue315            kwargs[required_name] = self._compute_fixture_value(316                required_name, needed_fixture,317                relative_name='{} -> {}'.format(relative_name, required_name))318        assert fixture.info.id not in self._active_fixtures_by_scope[fixture.info.scope]319        _logger.trace("Activating fixture {}...", fixture)320        self._active_fixtures_by_scope[fixture.info.scope][fixture.info.id] = active_fixture321        self._active_fixture_dependencies[fixture.info.id] = self._compute_fixture_dependencies(fixture)322        prev_context_fixture = slash_context.fixture323        slash_context.fixture = active_fixture324        try:325            returned = active_fixture.value = fixture.get_value(kwargs, active_fixture)326        finally:327            slash_context.fixture = prev_context_fixture328        _logger.trace(' -- {} = {!r}', relative_name, returned)329        return returned330    def _deactivate_fixture(self, fixture):331        # in most cases it will be the last active fixture in its scope332        active = self._active_fixtures_by_scope[fixture.info.scope].pop(fixture.info.id, None)333        self._active_fixture_dependencies.pop(fixture.info.id, None)334        if active is not None:335            active.do_cleanups()336    def resolve(self):337        while self._unresolved_fixture_ids:338            fixture = self._fixtures_by_id[self._unresolved_fixture_ids.pop()]...test_fixture_mechanism.py
Source:test_fixture_mechanism.py  
...204def test_fixture_store_iter_parametrization_variations_unresolved(store):205    @store.add_fixture206    @slash.fixture207    @slash.parametrize('x', [1, 2, 3])208    def needed_fixture(x):209        pass210    def test_func(needed_fixture):211        pass212    with pytest.raises(UnresolvedFixtureStore):213        list(store.iter_parametrization_variations(funcs=[test_func]))214def test_fixture_dependency(store):215    counter = itertools.count()216    @store.add_fixture217    @slash.fixture218    def fixture1(fixture2):219        assert fixture2 == 'fixture2_value_0'220        return 'fixture1_value_{}'.format(next(counter))221    @store.add_fixture222    @slash.fixture...fixture.py
Source:fixture.py  
1import itertools2from collections import OrderedDict3from ordered_set import OrderedSet4from ...exceptions import UnknownFixtures, InvalidFixtureScope, CyclicFixtureDependency5from contextlib import ExitStack6from ...ctx import context7from .namespace import Namespace8from .parameters import iter_parametrization_fixtures9from .fixture_base import FixtureBase10from .utils import get_real_fixture_name_from_argument11from ..requirements import get_requirements12from ..tagging import get_tags, NO_TAGS, Tags13_fixture_id = itertools.count()14class Fixture(FixtureBase):15    def __init__(self, store, fixture_func):16        super(Fixture, self).__init__()17        self.fixture_func = fixture_func18        self.info = self.fixture_func.__slash_fixture__19        self.scope = self.info.scope20        self.namespace = Namespace(store, store.get_current_namespace())21    def get_tags(self, store):22        current_fixture_tags = get_tags(self.fixture_func)23        returned = current_fixture_tags.copy() if current_fixture_tags is not NO_TAGS else Tags()24        required_fixtures_tags = [fixture.get_tags(store) for fixture in store.get_required_fixture_objects(self.fixture_func, self.namespace)]25        for tags in required_fixtures_tags:26            returned.update(tags)27        return returned28    def is_parameter(self):29        return False30    def is_fixture(self):31        return True32    parametrization_ids = None33    def __repr__(self):34        return '<Function Fixture around {}>'.format(self.fixture_func)35    def is_override(self):36        parent = self.namespace.get_parent()37        while parent is not None:38            f = parent.get_fixture_by_name(self.info.name, default=None)39            if f is None:40                return False41            if f is not self:42                return True43            parent = parent.get_parent()44        return False45    def get_value(self, kwargs, active_fixture):46        if self.info.needs_this:47            assert 'this' not in kwargs48            kwargs['this'] = active_fixture49        with ExitStack() as stack:50            if context.session is not None:51                stack.enter_context(context.session.cleanups.default_scope_override(self.info.scope_name))52            return self.fixture_func(**kwargs)53    def get_requirements(self, store):54        fixture_requirements = get_requirements(self.fixture_func)55        required_fixtures = store.get_required_fixture_objects(self.fixture_func, self.namespace)56        while required_fixtures:57            fixture_requirements.extend(required_fixtures.pop().get_requirements(store))58        return fixture_requirements59    def _resolve(self, store):60        assert self.keyword_arguments is None61        assert self.parametrization_ids is None62        self.parametrization_ids = OrderedSet()63        keyword_arguments = OrderedDict()64        parametrized = set()65        for name, param in iter_parametrization_fixtures(self.fixture_func):66            store.register_fixture_id(param)67            parametrized.add(name)68            self.parametrization_ids.add(param.info.id)69            keyword_arguments[name] = param70        for param_name, arg in self.info.required_args.items():71            if param_name in parametrized:72                continue73            try:74                needed_fixture = self.namespace.get_fixture_by_name(get_real_fixture_name_from_argument(arg))75                if needed_fixture.scope < self.scope: # pylint: disable=no-member76                    raise InvalidFixtureScope('Fixture {} is dependent on {}, which has a smaller scope ({} > {})'.format(77                        self.info.name, param_name, self.scope, needed_fixture.scope)) # pylint: disable=no-member78                if needed_fixture is self:79                    raise CyclicFixtureDependency('Cyclic fixture dependency detected in {}: {} depends on itself'.format(80                        self.info.func.__code__.co_filename,81                        self.info.name))82                keyword_arguments[param_name] = needed_fixture83            except LookupError:84                raise UnknownFixtures(param_name)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
