Best Python code snippet using autotest_python
resources.py
Source:resources.py  
...47    entry_class = AtomicGroupClass48class Label(EntryWithInvalid):49    model = models.Label50    @classmethod51    def add_query_selectors(cls, query_processor):52        query_processor.add_field_selector('name')53        query_processor.add_field_selector(54                'is_platform', field='platform',55                value_transform=query_processor.read_boolean)56    @classmethod57    def from_uri_args(cls, request, label_name, **kwargs):58        return cls(request, models.Label.objects.get(name=label_name))59    def _uri_args(self):60        return {'label_name': self.instance.name}61    def short_representation(self):62        rep = super(Label, self).short_representation()63        rep.update({'name': self.instance.name,64                    'is_platform': bool(self.instance.platform)})65        return rep66    def full_representation(self):67        rep = super(Label, self).full_representation()68        atomic_group_class = AtomicGroupClass.from_optional_instance(69                self._request, self.instance.atomic_group)70        rep.update({'atomic_group_class':71                        atomic_group_class.short_representation(),72                    'hosts': HostLabelingCollection(fixed_entry=self).link()})73        return rep74    @classmethod75    def create_instance(cls, input_dict, containing_collection):76        cls._check_for_required_fields(input_dict, ('name',))77        return models.Label.add_object(name=input_dict['name'])78    def update(self, input_dict):79        # TODO update atomic group80        if 'is_platform' in input_dict:81            self.instance.platform = input_dict['is_platform']82            self.instance.save()83class LabelCollection(resource_lib.Collection):84    queryset = models.Label.valid_objects.all()85    entry_class = Label86class AtomicLabelTagging(resource_lib.Relationship):87    related_classes = {'label': Label, 'atomic_group_class': AtomicGroupClass}88class 
AtomicLabelTaggingCollection(resource_lib.RelationshipCollection):89    entry_class = AtomicLabelTagging90class User(resource_lib.InstanceEntry):91    model = models.User92    _permitted_methods = ('GET,')93    @classmethod94    def from_uri_args(cls, request, username, **kwargs):95        if username == '@me':96            username = models.User.current_user().login97        return cls(request, models.User.objects.get(login=username))98    def _uri_args(self):99        return {'username': self.instance.login}100    def short_representation(self):101        rep = super(User, self).short_representation()102        rep['username'] = self.instance.login103        return rep104    def full_representation(self):105        rep = super(User, self).full_representation()106        accessible_hosts = HostCollection(self._request)107        accessible_hosts.set_query_parameters(accessible_by=self.instance.login)108        rep.update({'jobs': 'TODO',109                    'recurring_runs': 'TODO',110                    'acls':111                    UserAclMembershipCollection(fixed_entry=self).link(),112                    'accessible_hosts': accessible_hosts.link()})113        return rep114class UserCollection(resource_lib.Collection):115    _permitted_methods = ('GET',)116    queryset = models.User.objects.all()117    entry_class = User118class Acl(resource_lib.InstanceEntry):119    _permitted_methods = ('GET',)120    model = models.AclGroup121    @classmethod122    def from_uri_args(cls, request, acl_name, **kwargs):123        return cls(request, models.AclGroup.objects.get(name=acl_name))124    def _uri_args(self):125        return {'acl_name': self.instance.name}126    def short_representation(self):127        rep = super(Acl, self).short_representation()128        rep['name'] = self.instance.name129        return rep130    def full_representation(self):131        rep = super(Acl, self).full_representation()132        rep.update({'users':133                    
UserAclMembershipCollection(fixed_entry=self).link(),134                    'hosts':135                    HostAclMembershipCollection(fixed_entry=self).link()})136        return rep137    @classmethod138    def create_instance(cls, input_dict, containing_collection):139        cls._check_for_required_fields(input_dict, ('name',))140        return models.AclGroup.add_object(name=input_dict['name'])141    def update(self, input_dict):142        pass143class AclCollection(resource_lib.Collection):144    queryset = models.AclGroup.objects.all()145    entry_class = Acl146class UserAclMembership(resource_lib.Relationship):147    related_classes = {'user': User, 'acl': Acl}148    # TODO: check permissions149    # TODO: check for and add/remove "Everyone"150class UserAclMembershipCollection(resource_lib.RelationshipCollection):151    entry_class = UserAclMembership152class Host(EntryWithInvalid):153    model = models.Host154    @classmethod155    def add_query_selectors(cls, query_processor):156        query_processor.add_field_selector('hostname')157        query_processor.add_field_selector(158                'locked', value_transform=query_processor.read_boolean)159        query_processor.add_field_selector(160                'locked_by', field='locked_by__login',161                doc='Username of user who locked this host, if locked')162        query_processor.add_field_selector('status')163        query_processor.add_field_selector(164                'protection_level', field='protection',165                doc='Verify/repair protection level',166                value_transform=cls._read_protection)167        query_processor.add_field_selector(168                'accessible_by', field='aclgroup__users__login',169                doc='Username of user with access to this host')170        query_processor.add_related_existence_selector(171                'has_label', models.Label, 'name')172    @classmethod173    def _read_protection(cls, protection_input):174        
return host_protections.Protection.get_value(protection_input)175    @classmethod176    def from_uri_args(cls, request, hostname, **kwargs):177        return cls(request, models.Host.objects.get(hostname=hostname))178    def _uri_args(self):179        return {'hostname': self.instance.hostname}180    def short_representation(self):181        rep = super(Host, self).short_representation()182        # TODO calling platform() over and over is inefficient183        platform_rep = (Label.from_optional_instance(self._request,184                                                     self.instance.platform())185                        .short_representation())186        rep.update({'hostname': self.instance.hostname,187                    'locked': bool(self.instance.locked),188                    'status': self.instance.status,189                    'platform': platform_rep})190        return rep191    def full_representation(self):192        rep = super(Host, self).full_representation()193        protection = host_protections.Protection.get_string(194                self.instance.protection)195        locked_by = (User.from_optional_instance(self._request,196                                                 self.instance.locked_by)197                     .short_representation())198        labels = HostLabelingCollection(fixed_entry=self)199        acls = HostAclMembershipCollection(fixed_entry=self)200        queue_entries = QueueEntryCollection(self._request)201        queue_entries.set_query_parameters(host=self.instance.hostname)202        health_tasks = HealthTaskCollection(self._request)203        health_tasks.set_query_parameters(host=self.instance.hostname)204        rep.update({'locked_by': locked_by,205                    'locked_on': self._format_datetime(self.instance.lock_time),206                    'invalid': self.instance.invalid,207                    'protection_level': protection,208                    # TODO make these efficient209                    
'labels': labels.full_representation(),210                    'acls': acls.full_representation(),211                    'queue_entries': queue_entries.link(),212                    'health_tasks': health_tasks.link()})213        return rep214    @classmethod215    def create_instance(cls, input_dict, containing_collection):216        cls._check_for_required_fields(input_dict, ('hostname',))217        # include locked here, rather than waiting for update(), to avoid race218        # conditions219        host = models.Host.add_object(hostname=input_dict['hostname'],220                                      locked=input_dict.get('locked', False))221        return host222    def update(self, input_dict):223        data = {'locked': input_dict.get('locked'),224                'protection': input_dict.get('protection_level')}225        data = input_dict.remove_unspecified_fields(data)226        if 'protection' in data:227            data['protection'] = self._read_protection(data['protection'])228        self.instance.update_object(**data)229        if 'platform' in input_dict:230            label = self.resolve_link(input_dict['platform']) .instance231            if not label.platform:232                raise exceptions.BadRequest('Label %s is not a platform' % label.name)233            for label in self.instance.labels.filter(platform=True):234                self.instance.labels.remove(label)235            self.instance.labels.add(label)236class HostCollection(resource_lib.Collection):237    queryset = models.Host.valid_objects.all()238    entry_class = Host239class HostLabeling(resource_lib.Relationship):240    related_classes = {'host': Host, 'label': Label}241class HostLabelingCollection(resource_lib.RelationshipCollection):242    entry_class = HostLabeling243class HostAclMembership(resource_lib.Relationship):244    related_classes = {'host': Host, 'acl': Acl}245    # TODO: acl.check_for_acl_violation_acl_group()246    # TODO: 
models.AclGroup.on_host_membership_change()247class HostAclMembershipCollection(resource_lib.RelationshipCollection):248    entry_class = HostAclMembership249class Test(resource_lib.InstanceEntry):250    model = models.Test251    @classmethod252    def add_query_selectors(cls, query_processor):253        query_processor.add_field_selector('name')254    @classmethod255    def from_uri_args(cls, request, test_name, **kwargs):256        return cls(request, models.Test.objects.get(name=test_name))257    def _uri_args(self):258        return {'test_name': self.instance.name}259    def short_representation(self):260        rep = super(Test, self).short_representation()261        rep['name'] = self.instance.name262        return rep263    def full_representation(self):264        rep = super(Test, self).full_representation()265        rep.update({'author': self.instance.author,266                    'class': self.instance.test_class,267                    'control_file_type':268                    control_data.CONTROL_TYPE.get_string(269                        self.instance.test_type),270                    'control_file_path': self.instance.path,271                    'sync_count': self.instance.sync_count,272                    'dependencies':273                    TestDependencyCollection(fixed_entry=self).link(),274                    })275        return rep276    @classmethod277    def create_instance(cls, input_dict, containing_collection):278        cls._check_for_required_fields(input_dict,279                                       ('name', 'control_file_type',280                                        'control_file_path'))281        test_type = control_data.CONTROL_TYPE.get_value(282            input['control_file_type'])283        return models.Test.add_object(name=input_dict['name'],284                                      test_type=test_type,285                                      path=input_dict['control_file_path'])286    def update(self, input_dict):287      
  data = {'test_type': input_dict.get('control_file_type'),288                'path': input_dict.get('control_file_path'),289                'class': input_dict.get('class'),290                }291        data = input_dict.remove_unspecified_fields(data)292        self.instance.update_object(**data)293class TestCollection(resource_lib.Collection):294    queryset = models.Test.objects.all()295    entry_class = Test296class TestDependency(resource_lib.Relationship):297    related_classes = {'test': Test, 'label': Label}298class TestDependencyCollection(resource_lib.RelationshipCollection):299    entry_class = TestDependency300# TODO profilers301class ExecutionInfo(resource_lib.Resource):302    _permitted_methods = ('GET','POST')303    _job_fields = models.Job.get_field_dict()304    _DEFAULTS = {305            'control_file': '',306            'is_server': True,307            'dependencies': [],308            'machines_per_execution': 1,309            'run_verify': bool(_job_fields['run_verify'].default),310            'run_reset': bool(_job_fields['run_reset'].default),311            'timeout_mins': _job_fields['timeout_mins'].default,312            'maximum_runtime_mins': _job_fields['max_runtime_mins'].default,313            'cleanup_before_job':314                model_attributes.RebootBefore.get_string(315                    models.DEFAULT_REBOOT_BEFORE),316            'cleanup_after_job':317                model_attributes.RebootAfter.get_string(318                    models.DEFAULT_REBOOT_AFTER),319            }320    def _query_parameters_accepted(self):321        return (('tests', 'Comma-separated list of test names to run'),322                ('kernels', 'TODO'),323                ('client_control_file',324                 'Client control file segment to run after all specified '325                 'tests'),326                ('profilers',327                 'Comma-separated list of profilers to activate during the '328                 'job'),329             
   ('use_container', 'TODO'),330                ('profile_only',331                 'If true, run only profiled iterations; otherwise, always run '332                 'at least one non-profiled iteration in addition to a '333                 'profiled iteration'),334                ('upload_kernel_config',335                 'If true, generate a server control file code that uploads '336                 'the kernel config file to the client and tells the client of '337                 'the new (local) path when compiling the kernel; the tests '338                 'must be server side tests'))339    @classmethod340    def execution_info_from_job(cls, job):341        return {'control_file': job.control_file,342                'is_server': 343                job.control_type == control_data.CONTROL_TYPE.SERVER,344                'dependencies': [label.name for label345                                 in job.dependency_labels.all()],346                'machines_per_execution': job.synch_count,347                'run_verify': bool(job.run_verify),348                'run_reset': bool(job.run_reset),349                'timeout_mins': job.timeout_mins,350                'maximum_runtime_mins': job.max_runtime_mins,351                'cleanup_before_job':352                    model_attributes.RebootBefore.get_string(job.reboot_before),353                'cleanup_after_job':354                    model_attributes.RebootAfter.get_string(job.reboot_after),355                }356    def _get_execution_info(self, input_dict):357        tests = input_dict.get('tests', '')358        client_control_file = input_dict.get('client_control_file', None)359        if not tests and not client_control_file:360            return self._DEFAULTS361        test_list = tests.split(',')362        if 'profilers' in input_dict:363            profilers_list = input_dict['profilers'].split(',')364        else:365            profilers_list = []366        kernels = input_dict.get('kernels', '') # 
TODO367        if kernels:368            kernels = [dict(version=kernel) for kernel in kernels.split(',')]369        cf_info, test_objects, profiler_objects, label = (370                rpc_utils.prepare_generate_control_file(371                        test_list, kernels, None, profilers_list))372        control_file_contents = control_file.generate_control(373                tests=test_objects, kernels=kernels,374                profilers=profiler_objects, is_server=cf_info['is_server'],375                client_control_file=client_control_file,376                profile_only=input_dict.get('profile_only', None),377                upload_kernel_config=input_dict.get(378                    'upload_kernel_config', None))379        return dict(self._DEFAULTS,380                    control_file=control_file_contents,381                    is_server=cf_info['is_server'],382                    dependencies=cf_info['dependencies'],383                    machines_per_execution=cf_info['synch_count'])384    def handle_request(self):385        result = self.link()386        result['execution_info'] = self._get_execution_info(387                self._request.REQUEST)388        return self._basic_response(result)389class QueueEntriesRequest(resource_lib.Resource):390    _permitted_methods = ('GET',)391    def _query_parameters_accepted(self):392        return (('hosts', 'Comma-separated list of hostnames'),393                ('one_time_hosts',394                 'Comma-separated list of hostnames not already in the '395                 'Autotest system'),396                ('meta_hosts',397                 'Comma-separated list of label names; for each one, an entry '398                 'will be created and assigned at runtime to an available host '399                 'with that label'),400                ('atomic_group_class', 'TODO'))401    def _read_list(self, list_string):402        if list_string:403            return list_string.split(',')404        return []405    def 
handle_request(self):406        request_dict = self._request.REQUEST407        hosts = self._read_list(request_dict.get('hosts'))408        one_time_hosts = self._read_list(request_dict.get('one_time_hosts'))409        meta_hosts = self._read_list(request_dict.get('meta_hosts'))410        atomic_group_class = request_dict.get('atomic_group_class')411        # TODO: bring in all the atomic groups magic from create_job()412        entries = []413        for hostname in one_time_hosts:414            models.Host.create_one_time_host(hostname)415        for hostname in hosts:416            entry = Host.from_uri_args(self._request, hostname)417            entries.append({'host': entry.link()})418        for label_name in meta_hosts:419            entry = Label.from_uri_args(self._request, label_name)420            entries.append({'meta_host': entry.link()})421        if atomic_group_class:422            entries.append({'atomic_group_class': atomic_group_class})423        result = self.link()424        result['queue_entries'] = entries425        return self._basic_response(result)426class Job(resource_lib.InstanceEntry):427    _permitted_methods = ('GET',)428    model = models.Job429    class _StatusConstraint(query_lib.Constraint):430        def apply_constraint(self, queryset, value, comparison_type,431                             is_inverse):432            if comparison_type != 'equals' or is_inverse:433                raise query_lib.ConstraintError('Can only use this selector '434                                                'with equals')435            non_queued_statuses = [436                    status for status, _437                    in models.HostQueueEntry.Status.choices()438                    if status != models.HostQueueEntry.Status.QUEUED]439            if value == 'queued':440                return queryset.exclude(441                        hostqueueentry__status__in=non_queued_statuses)442            elif value == 'active':443                return 
queryset.filter(444                        hostqueueentry__status__in=non_queued_statuses).filter(445                        hostqueueentry__complete=False).distinct()446            elif value == 'complete':447                return queryset.exclude(hostqueueentry__complete=False)448            else:449                raise query_lib.ConstraintError('Value must be one of queued, '450                                                'active or complete')451    @classmethod452    def add_query_selectors(cls, query_processor):453        query_processor.add_field_selector('id')454        query_processor.add_field_selector('name')455        query_processor.add_selector(456                query_lib.Selector('status',457                                   doc='One of queued, active or complete'),458                Job._StatusConstraint())459        query_processor.add_keyval_selector('has_keyval', models.JobKeyval,460                                            'key', 'value')461    @classmethod462    def from_uri_args(cls, request, job_id, **kwargs):463        return cls(request, models.Job.objects.get(id=job_id))464    def _uri_args(self):465        return {'job_id': self.instance.id}466    @classmethod467    def _do_prepare_for_full_representation(cls, instances):468        models.Job.objects.populate_relationships(instances, models.JobKeyval,469                                                  'keyvals')470    def short_representation(self):471        rep = super(Job, self).short_representation()472        try:473            string_priority = priorities.Priority.get_string(474                    self.instance.priority)475        except ValueError:476            string_priority = str(self.instance.priority)477        rep.update({'id': self.instance.id,478                    'owner': self.instance.owner,479                    'name': self.instance.name,480                    'priority': string_priority,481                    'created_on':482                        
self._format_datetime(self.instance.created_on),483                    })484        return rep485    def full_representation(self):486        rep = super(Job, self).full_representation()487        queue_entries = QueueEntryCollection(self._request)488        queue_entries.set_query_parameters(job=self.instance.id)489        drone_set = self.instance.drone_set and self.instance.drone_set.name490        rep.update({'email_list': self.instance.email_list,491                    'parse_failed_repair':492                        bool(self.instance.parse_failed_repair),493                    'drone_set': drone_set,494                    'execution_info':495                        ExecutionInfo.execution_info_from_job(self.instance),496                    'queue_entries': queue_entries.link(),497                    'keyvals': dict((keyval.key, keyval.value)498                                    for keyval in self.instance.keyvals)499                    })500        return rep501    @classmethod502    def create_instance(cls, input_dict, containing_collection):503        owner = input_dict.get('owner')504        if not owner:505            owner = models.User.current_user().login506        cls._check_for_required_fields(input_dict, ('name', 'execution_info',507                                                    'queue_entries'))508        execution_info = input_dict['execution_info']509        cls._check_for_required_fields(execution_info, ('control_file',510                                                        'is_server'))511        if execution_info['is_server']:512            control_type = control_data.CONTROL_TYPE.SERVER513        else:514            control_type = control_data.CONTROL_TYPE.CLIENT515        options = dict(516                name=input_dict['name'],517                priority=input_dict.get('priority', None),518                control_file=execution_info['control_file'],519                control_type=control_type,520                
is_template=input_dict.get('is_template', None),521                timeout_mins=execution_info.get('timeout_mins'),522                max_runtime_mins=execution_info.get('maximum_runtime_mins'),523                synch_count=execution_info.get('machines_per_execution'),524                run_verify=execution_info.get('run_verify'),525                run_reset=execution_info.get('run_reset'),526                email_list=input_dict.get('email_list', None),527                dependencies=execution_info.get('dependencies', ()),528                reboot_before=execution_info.get('cleanup_before_job'),529                reboot_after=execution_info.get('cleanup_after_job'),530                parse_failed_repair=input_dict.get('parse_failed_repair', None),531                drone_set=input_dict.get('drone_set', None),532                keyvals=input_dict.get('keyvals', None))533        host_objects, metahost_label_objects, atomic_group = [], [], None534        for queue_entry in input_dict['queue_entries']:535            if 'host' in queue_entry:536                host = queue_entry['host']537                if host: # can be None, indicated a hostless job538                    host_entry = containing_collection.resolve_link(host)539                    host_objects.append(host_entry.instance)540            elif 'meta_host' in queue_entry:541                label_entry = containing_collection.resolve_link(542                        queue_entry['meta_host'])543                metahost_label_objects.append(label_entry.instance)544            if 'atomic_group_class' in queue_entry:545                atomic_group_entry = containing_collection.resolve_link(546                        queue_entry['atomic_group_class'])547                if atomic_group:548                    assert atomic_group_entry.instance.id == atomic_group.id549                else:550                    atomic_group = atomic_group_entry.instance551        job_id = rpc_utils.create_new_job(552                
owner=owner,553                options=options,554                host_objects=host_objects,555                metahost_objects=metahost_label_objects,556                atomic_group=atomic_group)557        return models.Job.objects.get(id=job_id)558    def update(self, input_dict):559        # Required for POST, doesn't actually support PUT560        pass561class JobCollection(resource_lib.Collection):562    queryset = models.Job.objects.order_by('-id')563    entry_class = Job564class QueueEntry(resource_lib.InstanceEntry):565    _permitted_methods = ('GET', 'PUT')566    model = models.HostQueueEntry567    @classmethod568    def add_query_selectors(cls, query_processor):569        query_processor.add_field_selector('host', field='host__hostname')570        query_processor.add_field_selector('job', field='job__id')571    @classmethod572    def from_uri_args(cls, request, queue_entry_id):573        instance = models.HostQueueEntry.objects.get(id=queue_entry_id)574        return cls(request, instance)575    def _uri_args(self):576        return {'queue_entry_id': self.instance.id}577    def short_representation(self):578        rep = super(QueueEntry, self).short_representation()579        if self.instance.host:580            host = (Host(self._request, self.instance.host)581                    .short_representation())582        else:583            host = None584        job = Job(self._request, self.instance.job)585        host = Host.from_optional_instance(self._request, self.instance.host)586        label = Label.from_optional_instance(self._request,587                                             self.instance.meta_host)588        atomic_group_class = AtomicGroupClass.from_optional_instance(589                self._request, self.instance.atomic_group)590        rep.update(591                {'job': job.short_representation(),592                 'host': host.short_representation(),593                 'label': label.short_representation(),594                 
'atomic_group_class':595                     atomic_group_class.short_representation(),596                 'status': self.instance.status,597                 'execution_path': self.instance.execution_subdir,598                 'started_on': self._format_datetime(self.instance.started_on),599                 'aborted': bool(self.instance.aborted)})600        return rep601    def update(self, input_dict):602        if 'aborted' in input_dict:603            if input_dict['aborted'] != True:604                raise exceptions.BadRequest('"aborted" can only be set to true')605            query = models.HostQueueEntry.objects.filter(pk=self.instance.pk)606            models.AclGroup.check_abort_permissions(query)607            rpc_utils.check_abort_synchronous_jobs(query)608            self.instance.abort(thread_local.get_user())609class QueueEntryCollection(resource_lib.Collection):610    queryset = models.HostQueueEntry.objects.order_by('-id')611    entry_class = QueueEntry612class HealthTask(resource_lib.InstanceEntry):613    _permitted_methods = ('GET',)614    model = models.SpecialTask615    @classmethod616    def add_query_selectors(cls, query_processor):617        query_processor.add_field_selector('host', field='host__hostname')618    @classmethod619    def from_uri_args(cls, request, task_id):620        instance = models.SpecialTask.objects.get(id=task_id)621        return cls(request, instance)622    def _uri_args(self):623        return {'task_id': self.instance.id}624    def short_representation(self):625        rep = super(HealthTask, self).short_representation()626        host = Host(self._request, self.instance.host)627        queue_entry = QueueEntry.from_optional_instance(628                self._request, self.instance.queue_entry)629        rep.update(630                {'host': host.short_representation(),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
