Best Python code snippet using autotest_python
resource_lib.py
Source:resource_lib.py  
...37            try:38                instance = cls.from_uri_args(request, **kwargs)39            except django.core.exceptions.ObjectDoesNotExist, exc:40                raise http.Http404(exc)41            instance.read_query_parameters(request.GET)42            return instance.handle_request()43        except exceptions.RequestError, exc:44            return exc.response45    def handle_request(self):46        if self._request.method.upper() not in self._permitted_methods:47            return http.HttpResponseNotAllowed(self._permitted_methods)48        handler = getattr(self, self._request.method.lower())49        return handler()50    # the handler methods below only need to be overridden if the resource51    # supports the method52    def get(self):53        """Handle a GET request.54        @returns an HttpResponse55        """56        raise NotImplementedError57    def post(self):58        """Handle a POST request.59        @returns an HttpResponse60        """61        raise NotImplementedError62    def put(self):63        """Handle a PUT request.64        @returns an HttpResponse65        """66        raise NotImplementedError67    def delete(self):68        """Handle a DELETE request.69        @returns an HttpResponse70        """71        raise NotImplementedError72    @classmethod73    def from_uri_args(cls, request, **kwargs):74        """Construct an instance from URI args.75        Default implementation for resources with no URI args.76        """77        return cls(request)78    def _uri_args(self):79        """Return kwargs for a URI reference to this resource.80        Default implementation for resources with no URI args.81        """82        return {}83    def _query_parameters_accepted(self):84        """Return sequence of tuples (name, description) for query parameters.85        Documents the available query parameters for GETting this resource.86        Default implementation for resources with no parameters.87        """88        
return ()89    def read_query_parameters(self, parameters):90        """Read relevant query parameters from a Django MultiValueDict."""91        params_acccepted = set(param_name for param_name, _92                               in self._query_parameters_accepted())93        for name, values in parameters.iterlists():94            base_name = name.split(':', 1)[0]95            if base_name in params_acccepted:96                self._query_params.setlist(name, values)97    def set_query_parameters(self, **parameters):98        """Set query parameters programmatically."""99        self._query_params.update(parameters)100    def href(self, query_params=None):101        """Return URI to this resource."""102        kwargs = self._uri_args()103        path = urlresolvers.reverse(self.dispatch_request, kwargs=kwargs)104        full_query_params = datastructures.MultiValueDict(self._query_params)105        if query_params:106            full_query_params.update(query_params)107        if full_query_params:108            path += '?' 
+ urllib.urlencode(full_query_params.lists(),109                                           doseq=True)110        return self._request.build_absolute_uri(path)111    def resolve_uri(self, uri):112        # check for absolute URIs113        match = re.match(r'(?P<root>https?://[^/]+)(?P<path>/.*)', uri)114        if match:115            # is this URI for a different host?116            my_root = self._request.build_absolute_uri('/')117            request_root = match.group('root') + '/'118            if my_root != request_root:119                # might support this in the future, but not now120                raise exceptions.BadRequest('Unable to resolve remote URI %s'121                                            % uri)122            uri = match.group('path')123        try:124            view_method, args, kwargs = urlresolvers.resolve(uri)125        except http.Http404:126            raise exceptions.BadRequest('Unable to resolve URI %s' % uri)127        resource_class = view_method.im_self # class owning this classmethod128        return resource_class.from_uri_args(self._request, **kwargs)129    def resolve_link(self, link):130        if isinstance(link, dict):131            uri = link['href']132        elif isinstance(link, basestring):133            uri = link134        else:135            raise exceptions.BadRequest('Unable to understand link %s' % link)136        return self.resolve_uri(uri)137    def link(self, query_params=None):138        return {'href': self.href(query_params=query_params)}139    def _query_parameters_response(self):140        return dict((name, description)141                    for name, description in self._query_parameters_accepted())142    def _basic_response(self, content):143        """Construct and return a simple 200 response."""144        assert isinstance(content, dict)145        query_parameters = self._query_parameters_response()146        if query_parameters:147            content['query_parameters'] = query_parameters148  
      encoded_content = json.dumps(content)149        return http.HttpResponse(encoded_content,150                                 content_type=_JSON_CONTENT_TYPE)151    def _decoded_input(self):152        content_type = self._request.META.get('CONTENT_TYPE',153                                              _JSON_CONTENT_TYPE)154        raw_data = self._request.raw_post_data155        if content_type == _JSON_CONTENT_TYPE:156            try:157                raw_dict = json.loads(raw_data)158            except ValueError, exc:159                raise exceptions.BadRequest('Error decoding request body: '160                                            '%s\n%r' % (exc, raw_data))161            if not isinstance(raw_dict, dict):162                raise exceptions.BadRequest('Expected dict input, got %s: %r' %163                                            (type(raw_dict), raw_dict))164        elif content_type == 'application/x-www-form-urlencoded':165            cgi_dict = cgi.parse_qs(raw_data) # django won't do this for PUT166            raw_dict = {}167            for key, values in cgi_dict.items():168                value = values[-1] # take last value if multiple were given169                try:170                    # attempt to parse numbers, booleans and nulls171                    raw_dict[key] = json.loads(value)172                except ValueError:173                    # otherwise, leave it as a string174                    raw_dict[key] = value175        else:176            raise exceptions.RequestError(415, 'Unsupported media type: %s'177                                          % content_type)178        return _InputDict(raw_dict)179    def _format_datetime(self, date_time):180        """Return ISO 8601 string for the given datetime"""181        if date_time is None:182            return None183        timezone_hrs = time.timezone / 60 / 60  # convert seconds to hours184        if timezone_hrs >= 0:185            timezone_join = '+'186        else:187   
         timezone_join = '' # minus sign comes from number itself188        timezone_spec = '%s%s:00' % (timezone_join, timezone_hrs)189        return date_time.strftime('%Y-%m-%dT%H:%M:%S') + timezone_spec190    @classmethod191    def _check_for_required_fields(cls, input_dict, fields):192        assert isinstance(fields, (list, tuple)), fields193        missing_fields = ', '.join(field for field in fields194                                   if field not in input_dict)195        if missing_fields:196            raise exceptions.BadRequest('Missing input: ' + missing_fields)197class Entry(Resource):198    @classmethod199    def add_query_selectors(cls, query_processor):200        """Sbuclasses may override this to support querying."""201        pass202    def short_representation(self):203        return self.link()204    def full_representation(self):205        return self.short_representation()206    def get(self):207        return self._basic_response(self.full_representation())208    def put(self):209        try:210            self.update(self._decoded_input())211        except model_logic.ValidationError, exc:212            raise exceptions.BadRequest('Invalid input: %s' % exc)213        return self._basic_response(self.full_representation())214    def _delete_entry(self):215        raise NotImplementedError216    def delete(self):217        self._delete_entry()218        return http.HttpResponse(status=204) # No content219    def create_instance(self, input_dict, containing_collection):220        raise NotImplementedError221    def update(self, input_dict):222        raise NotImplementedError223class InstanceEntry(Entry):224    class NullEntry(object):225        def link(self):226            return None227        def short_representation(self):228            return None229    _null_entry = NullEntry()230    _permitted_methods = ('GET', 'PUT', 'DELETE')231    model = None # subclasses must override this with a Django model class232    def __init__(self, 
request, instance):233        assert self.model is not None234        super(InstanceEntry, self).__init__(request)235        self.instance = instance236        self._is_prepared_for_full_representation = False237    @classmethod238    def from_optional_instance(cls, request, instance):239        if instance is None:240            return cls._null_entry241        return cls(request, instance)242    def _delete_entry(self):243        self.instance.delete()244    def full_representation(self):245        self.prepare_for_full_representation([self])246        return super(InstanceEntry, self).full_representation()247    @classmethod248    def prepare_for_full_representation(cls, entries):249        """250        Prepare the given list of entries to generate full representations.251        This method delegates to _do_prepare_for_full_representation(), which252        subclasses may override as necessary to do the actual processing.  This253        method also marks the instance as prepared, so it's safe to call this254        multiple times with the same instance(s) without wasting work.255        """256        not_prepared = [entry for entry in entries257                        if not entry._is_prepared_for_full_representation]258        cls._do_prepare_for_full_representation([entry.instance259                                                 for entry in not_prepared])260        for entry in not_prepared:261            entry._is_prepared_for_full_representation = True262    @classmethod263    def _do_prepare_for_full_representation(cls, instances):264        """265        Subclasses may override this to gather data as needed for full266        representations of the given model instances.  
Typically, this involves267        querying over related objects, and this method offers a chance to query268        for many instances at once, which can provide a great performance269        benefit.270        """271        pass272class Collection(Resource):273    _DEFAULT_ITEMS_PER_PAGE = 50274    _permitted_methods=('GET', 'POST')275    # subclasses must override these276    queryset = None # or override _fresh_queryset() directly277    entry_class = None278    def __init__(self, request):279        super(Collection, self).__init__(request)280        assert self.entry_class is not None281        if isinstance(self.entry_class, basestring):282            type(self).entry_class = _resolve_class_path(self.entry_class)283        self._query_processor = query_lib.QueryProcessor()284        self.entry_class.add_query_selectors(self._query_processor)285    def _query_parameters_accepted(self):286        params = [('start_index', 'Index of first member to include'),287                  ('items_per_page', 'Number of members to include'),288                  ('full_representations',289                   'True to include full representations of members')]290        for selector in self._query_processor.selectors():291            params.append((selector.name, selector.doc))292        return params293    def _fresh_queryset(self):294        assert self.queryset is not None295        # always copy the queryset before using it to avoid caching296        return self.queryset.all()297    def _entry_from_instance(self, instance):298        return self.entry_class(self._request, instance)299    def _representation(self, entry_instances):300        entries = [self._entry_from_instance(instance)301                   for instance in entry_instances]302        want_full_representation = self._read_bool_parameter(303                'full_representations')304        if want_full_representation:305            self.entry_class.prepare_for_full_representation(entries)306        members = 
[]307        for entry in entries:308            if want_full_representation:309                rep = entry.full_representation()310            else:311                rep = entry.short_representation()312            members.append(rep)313        rep = self.link()314        rep.update({'members': members})315        return rep316    def _read_bool_parameter(self, name):317        if name not in self._query_params:318            return False319        return (self._query_params[name].lower() == 'true')320    def _read_int_parameter(self, name, default):321        if name not in self._query_params:322            return default323        input_value = self._query_params[name]324        try:325            return int(input_value)326        except ValueError:327            raise exceptions.BadRequest('Invalid non-numeric value for %s: %r'328                                        % (name, input_value))329    def _apply_form_query(self, queryset):330        """Apply any query selectors passed as form variables."""331        for parameter, values in self._query_params.lists():332            if ':' in parameter:333                parameter, comparison_type = parameter.split(':', 1)334            else:335                comparison_type = None336            if not self._query_processor.has_selector(parameter):337                continue338            for value in values: # forms keys can have multiple values339                queryset = self._query_processor.apply_selector(340                        queryset, parameter, value,341                        comparison_type=comparison_type)342        return queryset343    def _filtered_queryset(self):344        return self._apply_form_query(self._fresh_queryset())345    def get(self):346        queryset = self._filtered_queryset()347        items_per_page = self._read_int_parameter('items_per_page',348                                                  self._DEFAULT_ITEMS_PER_PAGE)349        start_index = 
self._read_int_parameter('start_index', 0)350        page = queryset[start_index:(start_index + items_per_page)]351        rep = self._representation(page)352        rep.update({'total_results': len(queryset),353                    'start_index': start_index,354                    'items_per_page': items_per_page})355        return self._basic_response(rep)356    def full_representation(self):357        # careful, this rep can be huge for large collections358        return self._representation(self._fresh_queryset())359    def post(self):360        input_dict = self._decoded_input()361        try:362            instance = self.entry_class.create_instance(input_dict, self)363            entry = self._entry_from_instance(instance)364            entry.update(input_dict)365        except model_logic.ValidationError, exc:366            raise exceptions.BadRequest('Invalid input: %s' % exc)367        # RFC 2616 specifies that we provide the new URI in both the Location368        # header and the body369        response = http.HttpResponse(status=201, # Created370                                     content=entry.href())371        response['Location'] = entry.href()372        return response373class Relationship(Entry):374    _permitted_methods = ('GET', 'DELETE')375    # subclasses must override this with a dict mapping name to entry class376    related_classes = None377    def __init__(self, **kwargs):378        assert len(self.related_classes) == 2379        self.entries = dict((name, kwargs[name])380                            for name in self.related_classes)381        for name in self.related_classes: # sanity check382            assert isinstance(self.entries[name], self.related_classes[name])383        # just grab the request from one of the entries384        some_entry = self.entries.itervalues().next()385        super(Relationship, self).__init__(some_entry._request)386    @classmethod387    def from_uri_args(cls, request, **kwargs):388        # kwargs contains 
URI args for each entry389        entries = {}390        for name, entry_class in cls.related_classes.iteritems():391            entries[name] = entry_class.from_uri_args(request, **kwargs)392        return cls(**entries)393    def _uri_args(self):394        kwargs = {}395        for name, entry in self.entries.iteritems():396            kwargs.update(entry._uri_args())397        return kwargs398    def short_representation(self):399        rep = self.link()400        for name, entry in self.entries.iteritems():401            rep[name] = entry.short_representation()402        return rep403    @classmethod404    def _get_related_manager(cls, instance):405        """Get the related objects manager for the given instance.406        The instance must be one of the related classes.  This method will407        return the related manager from that instance to instances of the other408        related class.409        """410        this_model = type(instance)411        models = [entry_class.model for entry_class412                  in cls.related_classes.values()]413        if isinstance(instance, models[0]):414            this_model, other_model = models415        else:416            other_model, this_model = models417        _, field = this_model.objects.determine_relationship(other_model)418        this_models_fields = (this_model._meta.fields419                              + this_model._meta.many_to_many)420        if field in this_models_fields:421            manager_name = field.attname422        else:423            # related manager is on other_model, get name of reverse related424            # manager on this_model425            manager_name = field.related.get_accessor_name()426        return getattr(instance, manager_name)427    def _delete_entry(self):428        # choose order arbitrarily429        entry, other_entry = self.entries.itervalues()430        related_manager = self._get_related_manager(entry.instance)431        
related_manager.remove(other_entry.instance)432    @classmethod433    def create_instance(cls, input_dict, containing_collection):434        other_name = containing_collection.unfixed_name435        cls._check_for_required_fields(input_dict, (other_name,))436        entry = containing_collection.fixed_entry437        other_entry = containing_collection.resolve_link(input_dict[other_name])438        related_manager = cls._get_related_manager(entry.instance)439        related_manager.add(other_entry.instance)440        return other_entry.instance441    def update(self, input_dict):442        pass443class RelationshipCollection(Collection):444    def __init__(self, request=None, fixed_entry=None):445        if request is None:446            request = fixed_entry._request447        super(RelationshipCollection, self).__init__(request)448        assert issubclass(self.entry_class, Relationship)449        self.related_classes = self.entry_class.related_classes450        self.fixed_name = None451        self.fixed_entry = None452        self.unfixed_name = None453        self.related_manager = None454        if fixed_entry is not None:455            self._set_fixed_entry(fixed_entry)456            entry_uri_arg = self.fixed_entry._uri_args().values()[0]457            self._query_params[self.fixed_name] = entry_uri_arg458    def _set_fixed_entry(self, entry):459        """Set the fixed entry for this collection.460        The entry must be an instance of one of the related entry classes.  This461        method must be called before a relationship is used.  
It gets called462        either from the constructor (when collections are instantiated from463        other resource handling code) or from read_query_parameters() (when a464        request is made directly for the collection.465        """466        names = self.related_classes.keys()467        if isinstance(entry, self.related_classes[names[0]]):468            self.fixed_name, self.unfixed_name = names469        else:470            assert isinstance(entry, self.related_classes[names[1]])471            self.unfixed_name, self.fixed_name = names472        self.fixed_entry = entry473        self.unfixed_class = self.related_classes[self.unfixed_name]474        self.related_manager = self.entry_class._get_related_manager(475                entry.instance)476    def _query_parameters_accepted(self):477        return [(name, 'Show relationships for this %s' % entry_class.__name__)478                for name, entry_class479                in self.related_classes.iteritems()]480    def _resolve_query_param(self, name, uri_arg):481        entry_class = self.related_classes[name]482        return entry_class.from_uri_args(self._request, uri_arg)483    def read_query_parameters(self, query_params):484        super(RelationshipCollection, self).read_query_parameters(query_params)485        if not self._query_params:486            raise exceptions.BadRequest(487                    'You must specify one of the parameters %s and %s'488                    % tuple(self.related_classes.keys()))489        query_items = self._query_params.items()490        fixed_entry = self._resolve_query_param(*query_items[0])491        self._set_fixed_entry(fixed_entry)492        if len(query_items) > 1:493            other_fixed_entry = self._resolve_query_param(*query_items[1])494            self.related_manager = self.related_manager.filter(495                    pk=other_fixed_entry.instance.id)496    def _entry_from_instance(self, instance):497        unfixed_entry = 
self.unfixed_class(self._request, instance)498        entries = {self.fixed_name: self.fixed_entry,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
