Best Python code snippet using localstack_python
base.py
Source:base.py  
...158        """Register filter passed from API layer.159160        :param filter_name: Name of the filter passed in the URL161162        Filter names are validated in validate_filters() method which by163        default allows filters based on fields' names.  Extensions can create164        new filter names.  Such names must be registered to particular object165        with this method.166        """167        cls.extra_filter_names.add(filter_name)168169    @classmethod170    def validate_filters(cls, **kwargs):171        bad_filters = {key for key in kwargs172                       if key not in cls.fields or cls.is_synthetic(key)}173        bad_filters.difference_update(cls.extra_filter_names)174        if bad_filters:175            bad_filters = ', '.join(bad_filters)176            msg = _("'%s' is not supported for filtering") % bad_filters177            raise n_exc.InvalidInput(error_message=msg)178179    @classmethod180    @abc.abstractmethod181    def get_objects(cls, context, _pager=None, validate_filters=True,182                    **kwargs):183        raise NotImplementedError()184185    @classmethod186    def update_objects(cls, context, values, validate_filters=True, **kwargs):187        objs = cls.get_objects(188            context, validate_filters=validate_filters, **kwargs)189        for obj in objs:190            for k, v in values.items():191                setattr(obj, k, v)192            obj.update()193        return len(objs)194195    @classmethod196    def delete_objects(cls, context, validate_filters=True, **kwargs):197        objs = cls.get_objects(198            context, validate_filters=validate_filters, **kwargs)199        for obj in objs:200            obj.delete()201        return len(objs)202203    def create(self):204        raise NotImplementedError()205206    def update(self):207        raise NotImplementedError()208209    def delete(self):210        raise NotImplementedError()211212    @classmethod213    def count(cls, context, validate_filters=True, **kwargs):214        '''Count the number of objects matching filtering criteria.'''215        return len(216            cls.get_objects(217                context, validate_filters=validate_filters, **kwargs))218219220def _detach_db_obj(func):221    """Decorator to detach db_obj from the session."""222    @functools.wraps(func)223    def decorator(self, *args, **kwargs):224        synthetic_changed = bool(self._get_changed_synthetic_fields())225        res = func(self, *args, **kwargs)226        # some relationship based fields may be changed since we227        # captured the model, let's refresh it for the latest database228        # state229        if synthetic_changed:230            # TODO(ihrachys) consider refreshing just changed attributes231            self.obj_context.session.refresh(self.db_obj)232        # detach the model so that consequent fetches don't reuse it233        self.obj_context.session.expunge(self.db_obj)234        return res235    return decorator236237238class DeclarativeObject(abc.ABCMeta):239240    def __init__(cls, name, bases, dct):241        super(DeclarativeObject, cls).__init__(name, bases, dct)242        if 'project_id' in cls.fields:243            obj_extra_fields_set = set(cls.obj_extra_fields)244            obj_extra_fields_set.add('tenant_id')245            cls.obj_extra_fields = list(obj_extra_fields_set)246            setattr(cls, 'tenant_id',247                    property(lambda x: x.get('project_id', None)))248249        fields_no_update_set = set(cls.fields_no_update)250        for base in itertools.chain([cls], bases):251            keys_set = set()252            if hasattr(base, 'primary_keys'):253                keys_set.update(base.primary_keys)254            if hasattr(base, 'obj_extra_fields'):255                keys_set.update(base.obj_extra_fields)256            for key in keys_set:257                if key in cls.fields or key in cls.obj_extra_fields:258                    fields_no_update_set.add(key)259        cls.fields_no_update = list(fields_no_update_set)260261        model = getattr(cls, 'db_model', None)262        if model:263            # generate unique_keys from the model264            if not getattr(cls, 'unique_keys', None):265                cls.unique_keys = []266                obj_field_names = set(cls.fields.keys())267                model_to_obj_translation = {268                    v: k for (k, v) in cls.fields_need_translation.items()}269270                keys = db_utils.get_unique_keys(model) or []271                for model_unique_key in keys:272                    obj_unique_key = [model_to_obj_translation.get(key, key)273                                      for key in model_unique_key]274                    if obj_field_names.issuperset(obj_unique_key):275                        cls.unique_keys.append(obj_unique_key)276            # detach db_obj right after object is loaded from the model277            cls.create = _detach_db_obj(cls.create)278            cls.update = _detach_db_obj(cls.update)279280        if (hasattr(cls, 'has_standard_attributes') and281                cls.has_standard_attributes()):282            setattr(cls, 'standard_attr_id',283                    property(lambda x: x.db_obj.standard_attr_id284                             if x.db_obj else None))285            standardattributes.add_standard_attributes(cls)286            standardattributes.add_tag_filter_names(cls)287        # Instantiate extra filters per class288        cls.extra_filter_names = set(cls.extra_filter_names)289        # add tenant_id filter for objects that have project_id290        if 'project_id' in cls.fields and 'tenant_id' not in cls.fields:291            cls.extra_filter_names.add('tenant_id')292293294@six.add_metaclass(DeclarativeObject)295class NeutronDbObject(NeutronObject):296297    # should be overridden for all persistent objects298    db_model = None299300    primary_keys = ['id']301302    # 'unique_keys' is a list of unique keys that can be used with get_object303    # instead of 'primary_keys' (e.g. [['key1'], ['key2a', 'key2b']]).304    # By default 'unique_keys' will be inherited from the 'db_model'305    unique_keys = []306307    # this is a dict to store the association between the foreign key and the308    # corresponding key in the main table for a synthetic field of a specific309    # class, e.g. port extension has 'port_id' as foreign key, that is310    # associated with the key 'id' of the table Port for the synthetic311    # field of class Port. So foreign_keys = {'Port': {'port_id': 'id'}}.312    # The assumption is the association is the same for all object fields.313    # E.g. all the port extension will use 'port_id' as key.314    foreign_keys = {}315316    fields_no_update = []317318    # dict with name mapping: {'field_name_in_object': 'field_name_in_db'}319    # It can be used also as DB relationship mapping to synthetic fields name.320    # It is needed to load synthetic fields with one SQL query using side321    # loaded entities.322    # Examples: {'synthetic_field_name': 'relationship_name_in_model'}323    #           {'field_name_in_object': 'field_name_in_db'}324    fields_need_translation = {}325326    # obj_extra_fields defines properties that are not part of the model327    # but we want to expose them for easier usage of the object.328    # Handling of obj_extra_fields is in oslo.versionedobjects.329    # The extra fields can be accessed as read only property and are exposed330    # in to_dict()331    # obj_extra_fields = []332333    def __init__(self, *args, **kwargs):334        super(NeutronDbObject, self).__init__(*args, **kwargs)335        self._captured_db_model = None336337    @property338    def db_obj(self):339        '''Return a database model that persists object data.'''340        return self._captured_db_model341342    def from_db_object(self, db_obj):343        fields = self.modify_fields_from_db(db_obj)344        for field in self.fields:345            if field in fields and not self.is_synthetic(field):346                setattr(self, field, fields[field])347        self.load_synthetic_db_fields(db_obj)348        self._captured_db_model = db_obj349        self.obj_reset_changes()350351    @classmethod352    def has_standard_attributes(cls):353        return bool(cls.db_model and354                    issubclass(cls.db_model,355                               standard_attr.HasStandardAttributes))356357    @classmethod358    def modify_fields_to_db(cls, fields):359        """360        This method enables to modify the fields and its361        content before data is inserted into DB.362363         It uses the fields_need_translation dict with structure:364        {365            'field_name_in_object': 'field_name_in_db'366        }367368        :param fields: dict of fields from NeutronDbObject369        :return: modified dict of fields370        """371        result = copy.deepcopy(dict(fields))372        for field, field_db in cls.fields_need_translation.items():373            if field in result:374                result[field_db] = result.pop(field)375        return result376377    @classmethod378    def modify_fields_from_db(cls, db_obj):379        """Modify the fields after data were fetched from DB.380381        It uses the fields_need_translation dict with structure:382        {383            'field_name_in_object': 'field_name_in_db'384        }385386        :param db_obj: model fetched from database387        :return: modified dict of DB values388        """389        # db models can have declarative proxies that are not exposed into390        # db.keys() so we must fetch data based on object fields definition391        potential_fields = (list(cls.fields.keys()) +392                            list(cls.fields_need_translation.values()))393        result = {field: db_obj[field] for field in potential_fields394                  if db_obj.get(field) is not None}395        for field, field_db in cls.fields_need_translation.items():396            if field_db in result:397                result[field] = result.pop(field_db)398        return result399400    @classmethod401    def _load_object(cls, context, db_obj):402        obj = cls(context)403        obj.from_db_object(db_obj)404        # detach the model so that consequent fetches don't reuse it405        context.session.expunge(obj.db_obj)406        return obj407408    def obj_load_attr(self, attrname):409        """Set None for nullable fields that has unknown value.410411        In case model attribute is not present in database, value stored under412        ``attrname'' field will be unknown. In such cases if the field413        ``attrname'' is a nullable Field return None414        """415        try:416            is_attr_nullable = self.fields[attrname].nullable417        except KeyError:418            return super(NeutronDbObject, self).obj_load_attr(attrname)419        if is_attr_nullable:420            self[attrname] = None421422    @classmethod423    def get_object(cls, context, **kwargs):424        """425        Return the first result of given context or None if the result doesn't426        contain any row. Next, convert it to a versioned object.427428        :param context:429        :param kwargs: multiple keys defined by key=value pairs430        :return: single object of NeutronDbObject class or None431        """432        lookup_keys = set(kwargs.keys())433        all_keys = itertools.chain([cls.primary_keys], cls.unique_keys)434        if not any(lookup_keys.issuperset(keys) for keys in all_keys):435            missing_keys = set(cls.primary_keys).difference(lookup_keys)436            raise o_exc.NeutronPrimaryKeyMissing(object_class=cls.__name__,437                                                 missing_keys=missing_keys)438439        with context.session.begin(subtransactions=True):440            db_obj = obj_db_api.get_object(441                context, cls.db_model,442                **cls.modify_fields_to_db(kwargs)443            )444            if db_obj:445                return cls._load_object(context, db_obj)446447    @classmethod448    def get_objects(cls, context, _pager=None, validate_filters=True,449                    **kwargs):450        """451        Fetch all results from DB and convert them to versioned objects.452453        :param context:454        :param _pager: a Pager object representing advanced sorting/pagination455                       criteria456        :param validate_filters: Raises an error in case of passing an unknown457                                 filter458        :param kwargs: multiple keys defined by key=value pairs459        :return: list of objects of NeutronDbObject class or empty list460        """461        if validate_filters:462            cls.validate_filters(**kwargs)463        with context.session.begin(subtransactions=True):464            db_objs = obj_db_api.get_objects(465                context, cls.db_model, _pager=_pager,466                **cls.modify_fields_to_db(kwargs)467            )468            return [cls._load_object(context, db_obj) for db_obj in db_objs]469470    @classmethod471    def update_objects(cls, context, values, validate_filters=True, **kwargs):472        """473        Update objects that match filtering criteria from DB.474475        :param context:476        :param values: multiple keys to update in matching objects477        :param validate_filters: Raises an error in case of passing an unknown478                                 filter479        :param kwargs: multiple keys defined by key=value pairs480        :return: Number of entries updated481        """482        if validate_filters:483            cls.validate_filters(**kwargs)484485        # if we have standard attributes, we will need to fetch records to486        # update revision numbers487        if cls.has_standard_attributes():488            return super(NeutronDbObject, cls).update_objects(489                context, values, validate_filters=False, **kwargs)490491        with db_api.autonested_transaction(context.session):492            return obj_db_api.update_objects(493                context, cls.db_model,494                cls.modify_fields_to_db(values),495                **cls.modify_fields_to_db(kwargs))496497    @classmethod498    def delete_objects(cls, context, validate_filters=True, **kwargs):499        """500        Delete objects that match filtering criteria from DB.501502        :param context:503        :param validate_filters: Raises an error in case of passing an unknown504                                 filter505        :param kwargs: multiple keys defined by key=value pairs506        :return: Number of entries deleted507        """508        if validate_filters:509            cls.validate_filters(**kwargs)510        with context.session.begin(subtransactions=True):511            return obj_db_api.delete_objects(512                context, cls.db_model, **cls.modify_fields_to_db(kwargs))513514    @classmethod515    def is_accessible(cls, context, db_obj):516        return (context.is_admin or517                context.tenant_id == db_obj.tenant_id)518519    @staticmethod520    def filter_to_str(value):521        if isinstance(value, list):522            return [str(val) for val in value]523        return str(value)524525    @staticmethod526    def filter_to_json_str(value, default=None):527        def _dict_to_json(v):528            return (529                jsonutils.dumps(530                    collections.OrderedDict(531                        sorted(v.items(), key=lambda t: t[0])532                    )533                ) if v else default534            )535536        if isinstance(value, list):537            return [_dict_to_json(val) for val in value]538        v = _dict_to_json(value)539        return v540541    @staticmethod542    def load_json_from_str(field, default=None):543        value = field or default544        if value:545            value = jsonutils.loads(value)546        return value547548    def _get_changed_persistent_fields(self):549        fields = self.obj_get_changes()550        for field in self.synthetic_fields:551            if field in fields:552                del fields[field]553        return fields554555    def _get_changed_synthetic_fields(self):556        fields = self.obj_get_changes()557        for field in self._get_changed_persistent_fields():558            if field in fields:559                del fields[field]560        return fields561562    def _validate_changed_fields(self, fields):563        fields = fields.copy()564        forbidden_updates = set(self.fields_no_update) & set(fields.keys())565        if forbidden_updates:566            raise o_exc.NeutronObjectUpdateForbidden(fields=forbidden_updates)567568        return fields569570    def load_synthetic_db_fields(self, db_obj=None):571        """572        Load the synthetic fields that are stored in a different table from the573        main object.574575        This method doesn't take care of loading synthetic fields that aren't576        stored in the DB, e.g. 'shared' in RBAC policy.577        """578        clsname = self.__class__.__name__579580        # TODO(rossella_s) Find a way to handle ObjectFields with581        # subclasses=True582        for field in self.synthetic_fields:583            try:584                objclasses = obj_base.VersionedObjectRegistry.obj_classes(585                ).get(self.fields[field].objname)586            except AttributeError:587                # NOTE(rossella_s) this is probably because this field is not588                # an ObjectField589                continue590            if not objclasses:591                # NOTE(rossella_s) some synthetic fields are not handled by592                # this method, for example the ones that have subclasses, see593                # QosRule594                continue595            objclass = objclasses[0]596            foreign_keys = objclass.foreign_keys.get(clsname)597            if not foreign_keys:598                raise o_exc.NeutronSyntheticFieldsForeignKeysNotFound(599                    parent=clsname, child=objclass.__name__)600            if len(foreign_keys.keys()) > 1:601                raise o_exc.NeutronSyntheticFieldMultipleForeignKeys(602                        field=field)603604            synthetic_field_db_name = (605                self.fields_need_translation.get(field, field))606            synth_db_objs = (db_obj.get(synthetic_field_db_name, None)607                             if db_obj else None)608609            # synth_db_objs can be list, empty list or None, that is why610            # we need 'is not None', because [] is valid case for 'True'611            if synth_db_objs is not None:612                if not isinstance(synth_db_objs, list):613                    synth_db_objs = [synth_db_objs]614                synth_objs = [objclass._load_object(self.obj_context, obj)615                              for obj in synth_db_objs]616            else:617                synth_objs = objclass.get_objects(618                    self.obj_context, **{619                        k: getattr(self, v) if v in self else db_obj.get(v)620                        for k, v in foreign_keys.items()})621            if isinstance(self.fields[field], obj_fields.ObjectField):622                setattr(self, field, synth_objs[0] if synth_objs else None)623            else:624                setattr(self, field, synth_objs)625            self.obj_reset_changes([field])626627    def create(self):628        fields = self._get_changed_persistent_fields()629        with db_api.autonested_transaction(self.obj_context.session):630            try:631                db_obj = obj_db_api.create_object(632                    self.obj_context, self.db_model,633                    self.modify_fields_to_db(fields))634            except obj_exc.DBDuplicateEntry as db_exc:635                raise o_exc.NeutronDbObjectDuplicateEntry(636                    object_class=self.__class__, db_exception=db_exc)637638            self.from_db_object(db_obj)639640    def _get_composite_keys(self):641        keys = {}642        for key in self.primary_keys:643            keys[key] = getattr(self, key)644        return keys645646    def update_fields(self, obj_data, reset_changes=False):647        """Updates fields of an object that are not forbidden to be updated.648649        :param obj_data: the full set of object data650        :type obj_data: dict651        :param reset_changes: indicates whether the object's current set of652                              changed fields should be cleared653        :type reset_changes: boolean654655        :returns: None656        """657        if reset_changes:658            self.obj_reset_changes()659        for k, v in obj_data.items():660            if k not in self.fields_no_update:661                setattr(self, k, v)662663    def update(self):664        updates = self._get_changed_persistent_fields()665        updates = self._validate_changed_fields(updates)666667        with db_api.autonested_transaction(self.obj_context.session):668            db_obj = obj_db_api.update_object(669                self.obj_context, self.db_model,670                self.modify_fields_to_db(updates),671                **self.modify_fields_to_db(672                    self._get_composite_keys()))673            self.from_db_object(db_obj)674675    def delete(self):676        obj_db_api.delete_object(self.obj_context, self.db_model,677                                 **self.modify_fields_to_db(678                                     self._get_composite_keys()))679        self._captured_db_model = None680681    @classmethod682    def count(cls, context, validate_filters=True, **kwargs):683        """684        Count the number of objects matching filtering criteria.685686        :param context:687        :param validate_filters: Raises an error in case of passing an unknown688                                 filter689        :param kwargs: multiple keys defined by key=value pairs690        :return: number of matching objects691        """692        if validate_filters:693            cls.validate_filters(**kwargs)694        return obj_db_api.count(695            context, cls.db_model, **cls.modify_fields_to_db(kwargs)696        )697698    @classmethod699    def objects_exist(cls, context, validate_filters=True, **kwargs):700        """701        Check if objects are present in DB.702703        :param context:704        :param validate_filters: Raises an error in case of passing an unknown705                                 filter706        :param kwargs: multiple keys defined by key=value pairs707        :return: boolean. True if object is present.708        """709        if validate_filters:710            cls.validate_filters(**kwargs)711        # Succeed if at least a single object matches; no need to fetch more712        return bool(obj_db_api.get_object(713            context, cls.db_model, **cls.modify_fields_to_db(kwargs))
...utils.py
Source:utils.py  
...195            self.assertEqual(debug_error_message('An error message'), None)196            self.app.config['DEBUG'] = True197            self.assertEqual(debug_error_message('An error message'),198                             'An error message')199    def test_validate_filters(self):200        self.app.config['DOMAIN'][self.known_resource]['allowed_filters'] = []201        with self.app.test_request_context():202            self.assertTrue('key' in validate_filters(203                {'key': 'val'},204                self.known_resource))205            self.assertTrue('key' in validate_filters(206                {'key': ['val1', 'val2']},207                self.known_resource))208            self.assertTrue('key' in validate_filters(209                {'key': {'$in': ['val1', 'val2']}},210                self.known_resource))211            self.assertTrue('key' in validate_filters(212                {'$or': [{'key': 'val1'}, {'key': 'val2'}]},213                self.known_resource))214            self.assertTrue('$or' in validate_filters(215                {'$or': 'val'},216                self.known_resource))217            self.assertTrue('$or' in validate_filters(218                {'$or': {'key': 'val1'}},219                self.known_resource))220            self.assertTrue('$or' in validate_filters(221                {'$or': ['val']},222                self.known_resource))223        self.app.config['DOMAIN'][self.known_resource]['allowed_filters'] = \224            ['key']225        with self.app.test_request_context():226            self.assertTrue(validate_filters(227                {'key': 'val'},228                self.known_resource) is None)229            self.assertTrue(validate_filters(230                {'key': ['val1', 'val2']},231                self.known_resource) is None)232            self.assertTrue(validate_filters(233                {'key': {'$in': ['val1', 'val2']}},234                self.known_resource) is None)235            self.assertTrue(validate_filters(236                {'$or': [{'key': 'val1'}, {'key': 'val2'}]},237                self.known_resource) is None)238class DummyEvent(object):239    """240    Even handler that records the call parameters and asserts a check241    Usage::242        app = Eve()243        app.on_my_event = DummyEvent(element_not_deleted)244    In the test::245        assert app.on_my_event.called[0] == expected_param_0246    """247    def __init__(self, check, deepcopy=False):248        """249        :param check: method checking the state of something during the event....test_qual.py
Source:test_qual.py  
...23    assert not qf.qual_pass_filter("/AAA")24    assert qf.qual_pass_filter("AE]]]/")25    assert 3 == qf.parsed26    assert 2 == qf.passed27def validate_filters(filters):28    assert "qflag" in filters29    assert isinstance(filters["qflag"], qual.QualityFilter)30    assert 30 == filters["qflag"].min_qscore31    assert 0.2 == filters["qflag"].max_perc32    assert "qtest" in filters33    assert isinstance(filters["qtest"], qual.QualityFilter)34    assert 15 == filters["qtest"].min_qscore35    assert 0.1 == filters["qtest"].max_perc36def test_QualityFilter_init_flag_filters():37    filters = qual.QualityFilter.init_flag_filters(38        DEFAULT_FILTER_QUAL_FLAGS, DEFAULT_PHRED_OFFSET39    )40    validate_filters(filters)41def test_dummy_apply_filter_flag():42    assert qual.dummy_apply_filter_flag({}, {})43def test_apply_filter_flag():44    filters = qual.QualityFilter.init_flag_filters(45        DEFAULT_FILTER_QUAL_FLAGS, DEFAULT_PHRED_OFFSET46    )47    assert qual.apply_filter_flag(48        {"qflag": ("AAAA", -1, -1), "qtest": ("AAAAAAAAA", -1, -1)}, filters49    )50    assert not qual.apply_filter_flag(51        {"qflag": ("/AAA", -1, -1), "qtest": ("AAAAAAAAA", -1, -1)}, filters52    )53    assert not qual.apply_filter_flag(54        {"qflag": ("AAAA", -1, -1), "qtest": ("/AAAAAAAA", -1, -1)}, filters55    )56    assert not qual.apply_filter_flag(57        {"qflag": ("/AAA", -1, -1), "qtest": ("/AAAAAAAA", -1, -1)}, filters58    )59    assert qual.apply_filter_flag(60        {"qflag": ("/EEAAAA", -1, -1), "qtest": ("/EEAAAAAAAA", -1, -1)}, filters61    )62def test_setup_qual_filters():63    qff, ff = qual.setup_qual_filters(None, DEFAULT_PHRED_OFFSET)64    assert 0 == len(qff)65    assert qual.dummy_apply_filter_flag == ff66    qff, ff = qual.setup_qual_filters(DEFAULT_FILTER_QUAL_FLAGS, DEFAULT_PHRED_OFFSET)67    assert 2 == len(qff)68    assert qual.apply_filter_flag == ff...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
