How to use the execution_info_from_job method in autotest

Best Python code snippet using autotest_python

resources.py

Source: resources.py on GitHub

copy

Full Screen

...336 'the kernel config file to the client and tells the client of '337 'the new (local) path when compiling the kernel; the tests '338 'must be server side tests'))339 @classmethod340 def execution_info_from_job(cls, job):341 return {'control_file': job.control_file,342 'is_server': 343 job.control_type == control_data.CONTROL_TYPE.SERVER,344 'dependencies': [label.name for label345 in job.dependency_labels.all()],346 'machines_per_execution': job.synch_count,347 'run_verify': bool(job.run_verify),348 'run_reset': bool(job.run_reset),349 'timeout_mins': job.timeout_mins,350 'maximum_runtime_mins': job.max_runtime_mins,351 'cleanup_before_job':352 model_attributes.RebootBefore.get_string(job.reboot_before),353 'cleanup_after_job':354 model_attributes.RebootAfter.get_string(job.reboot_after),355 }356 def _get_execution_info(self, input_dict):357 tests = input_dict.get('tests', '')358 client_control_file = input_dict.get('client_control_file', None)359 if not tests and not client_control_file:360 return self._DEFAULTS361 test_list = tests.split(',')362 if 'profilers' in input_dict:363 profilers_list = input_dict['profilers'].split(',')364 else:365 profilers_list = []366 kernels = input_dict.get('kernels', '') # TODO367 if kernels:368 kernels = [dict(version=kernel) for kernel in kernels.split(',')]369 cf_info, test_objects, profiler_objects, label = (370 rpc_utils.prepare_generate_control_file(371 test_list, kernels, None, profilers_list))372 control_file_contents = control_file.generate_control(373 tests=test_objects, kernels=kernels,374 profilers=profiler_objects, is_server=cf_info['is_server'],375 client_control_file=client_control_file,376 profile_only=input_dict.get('profile_only', None),377 upload_kernel_config=input_dict.get(378 'upload_kernel_config', None))379 return dict(self._DEFAULTS,380 control_file=control_file_contents,381 is_server=cf_info['is_server'],382 dependencies=cf_info['dependencies'],383 machines_per_execution=cf_info['synch_count'])384 def 
handle_request(self):385 result = self.link()386 result['execution_info'] = self._get_execution_info(387 self._request.REQUEST)388 return self._basic_response(result)389class QueueEntriesRequest(resource_lib.Resource):390 _permitted_methods = ('GET',)391 def _query_parameters_accepted(self):392 return (('hosts', 'Comma-separated list of hostnames'),393 ('one_time_hosts',394 'Comma-separated list of hostnames not already in the '395 'Autotest system'),396 ('meta_hosts',397 'Comma-separated list of label names; for each one, an entry '398 'will be created and assigned at runtime to an available host '399 'with that label'),400 ('atomic_group_class', 'TODO'))401 def _read_list(self, list_string):402 if list_string:403 return list_string.split(',')404 return []405 def handle_request(self):406 request_dict = self._request.REQUEST407 hosts = self._read_list(request_dict.get('hosts'))408 one_time_hosts = self._read_list(request_dict.get('one_time_hosts'))409 meta_hosts = self._read_list(request_dict.get('meta_hosts'))410 atomic_group_class = request_dict.get('atomic_group_class')411 # TODO: bring in all the atomic groups magic from create_job()412 entries = []413 for hostname in one_time_hosts:414 models.Host.create_one_time_host(hostname)415 for hostname in hosts:416 entry = Host.from_uri_args(self._request, hostname)417 entries.append({'host': entry.link()})418 for label_name in meta_hosts:419 entry = Label.from_uri_args(self._request, label_name)420 entries.append({'meta_host': entry.link()})421 if atomic_group_class:422 entries.append({'atomic_group_class': atomic_group_class})423 result = self.link()424 result['queue_entries'] = entries425 return self._basic_response(result)426class Job(resource_lib.InstanceEntry):427 _permitted_methods = ('GET',)428 model = models.Job429 class _StatusConstraint(query_lib.Constraint):430 def apply_constraint(self, queryset, value, comparison_type,431 is_inverse):432 if comparison_type != 'equals' or is_inverse:433 raise 
query_lib.ConstraintError('Can only use this selector '434 'with equals')435 non_queued_statuses = [436 status for status, _437 in models.HostQueueEntry.Status.choices()438 if status != models.HostQueueEntry.Status.QUEUED]439 if value == 'queued':440 return queryset.exclude(441 hostqueueentry__status__in=non_queued_statuses)442 elif value == 'active':443 return queryset.filter(444 hostqueueentry__status__in=non_queued_statuses).filter(445 hostqueueentry__complete=False).distinct()446 elif value == 'complete':447 return queryset.exclude(hostqueueentry__complete=False)448 else:449 raise query_lib.ConstraintError('Value must be one of queued, '450 'active or complete')451 @classmethod452 def add_query_selectors(cls, query_processor):453 query_processor.add_field_selector('id')454 query_processor.add_field_selector('name')455 query_processor.add_selector(456 query_lib.Selector('status',457 doc='One of queued, active or complete'),458 Job._StatusConstraint())459 query_processor.add_keyval_selector('has_keyval', models.JobKeyval,460 'key', 'value')461 @classmethod462 def from_uri_args(cls, request, job_id, **kwargs):463 return cls(request, models.Job.objects.get(id=job_id))464 def _uri_args(self):465 return {'job_id': self.instance.id}466 @classmethod467 def _do_prepare_for_full_representation(cls, instances):468 models.Job.objects.populate_relationships(instances, models.JobKeyval,469 'keyvals')470 def short_representation(self):471 rep = super(Job, self).short_representation()472 try:473 string_priority = priorities.Priority.get_string(474 self.instance.priority)475 except ValueError:476 string_priority = str(self.instance.priority)477 rep.update({'id': self.instance.id,478 'owner': self.instance.owner,479 'name': self.instance.name,480 'priority': string_priority,481 'created_on':482 self._format_datetime(self.instance.created_on),483 })484 return rep485 def full_representation(self):486 rep = super(Job, self).full_representation()487 queue_entries = 
QueueEntryCollection(self._request)488 queue_entries.set_query_parameters(job=self.instance.id)489 drone_set = self.instance.drone_set and self.instance.drone_set.name490 rep.update({'email_list': self.instance.email_list,491 'parse_failed_repair':492 bool(self.instance.parse_failed_repair),493 'drone_set': drone_set,494 'execution_info':495 ExecutionInfo.execution_info_from_job(self.instance),496 'queue_entries': queue_entries.link(),497 'keyvals': dict((keyval.key, keyval.value)498 for keyval in self.instance.keyvals)499 })500 return rep501 @classmethod502 def create_instance(cls, input_dict, containing_collection):503 owner = input_dict.get('owner')504 if not owner:505 owner = models.User.current_user().login506 cls._check_for_required_fields(input_dict, ('name', 'execution_info',507 'queue_entries'))508 execution_info = input_dict['execution_info']509 cls._check_for_required_fields(execution_info, ('control_file',...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful