How to use the prepare_rows_as_nested_dicts method in autotest

Best Python code snippet using autotest_python

rpc_interface.py

Source:rpc_interface.py Github

copy

Full Screen

# Reconstructed from a scrape-mangled copy of autotest's AFE rpc_interface.py:
# the original statements were fused onto single lines with file line numbers
# embedded in the tokens.  The code below restores valid, conventionally
# indented Python 2 (the file uses `iteritems()`), preserving the original
# behavior and interfaces.  Relies on module-level imports (models, rpc_utils,
# control_file, model_logic, model_attributes) declared earlier in the file.


def get_labels(**filter_data):
    """\
    @returns A sequence of nested dictionaries of label information.
    """
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.Label.query_objects(filter_data),
            ('atomic_group',))


# atomic groups

def add_atomic_group(name, max_number_of_machines=None, description=None):
    """Create an AtomicGroup.  @returns the new group's id."""
    return models.AtomicGroup.add_object(
            name=name, max_number_of_machines=max_number_of_machines,
            description=description).id


def modify_atomic_group(id, **data):
    models.AtomicGroup.smart_get(id).update_object(data)


def delete_atomic_group(id):
    models.AtomicGroup.smart_get(id).delete()


def atomic_group_add_labels(id, labels):
    label_objs = models.Label.smart_get_bulk(labels)
    models.AtomicGroup.smart_get(id).label_set.add(*label_objs)


def atomic_group_remove_labels(id, labels):
    label_objs = models.Label.smart_get_bulk(labels)
    models.AtomicGroup.smart_get(id).label_set.remove(*label_objs)


def get_atomic_groups(**filter_data):
    return rpc_utils.prepare_for_serialization(
            models.AtomicGroup.list_objects(filter_data))


# hosts

def add_host(hostname, status=None, locked=None, protection=None):
    """Create a Host.  @returns the new host's id."""
    return models.Host.add_object(hostname=hostname, status=status,
                                  locked=locked, protection=protection).id


def modify_host(id, **data):
    rpc_utils.check_modify_host(data)
    host = models.Host.smart_get(id)
    # Locking changes get an extra permission/consistency check.
    rpc_utils.check_modify_host_locking(host, data)
    host.update_object(data)


def modify_hosts(host_filter_data, update_data):
    """
    @param host_filter_data: Filters out which hosts to modify.
    @param update_data: A dictionary with the changes to make to the hosts.
    """
    rpc_utils.check_modify_host(update_data)
    hosts = models.Host.query_objects(host_filter_data)
    for host in hosts:
        host.update_object(update_data)


def host_add_labels(id, labels):
    labels = models.Label.smart_get_bulk(labels)
    host = models.Host.smart_get(id)

    # A host may carry at most one platform label.
    platforms = [label.name for label in labels if label.platform]
    if len(platforms) > 1:
        raise model_logic.ValidationError(
                {'labels': 'Adding more than one platform label: %s' %
                           ', '.join(platforms)})
    if len(platforms) == 1:
        models.Host.check_no_platform([host])
    host.labels.add(*labels)


def host_remove_labels(id, labels):
    labels = models.Label.smart_get_bulk(labels)
    models.Host.smart_get(id).labels.remove(*labels)


def set_host_attribute(attribute, value, **host_filter_data):
    """
    @param attribute string name of attribute
    @param value string, or None to delete an attribute
    @param host_filter_data filter data to apply to Hosts to choose hosts to
                            act upon
    """
    assert host_filter_data  # disallow accidental actions on all hosts
    hosts = models.Host.query_objects(host_filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)

    for host in hosts:
        host.set_or_delete_attribute(attribute, value)


def delete_host(id):
    models.Host.smart_get(id).delete()


def get_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
              exclude_atomic_group_hosts=False, valid_only=True,
              **filter_data):
    """
    @param multiple_labels: match hosts in all of the labels given.  Should
            be a list of label names.
    @param exclude_only_if_needed_labels: Exclude hosts with at least one
            "only_if_needed" label applied.
    @param exclude_atomic_group_hosts: Exclude hosts that have one or more
            atomic group labels associated with them.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     exclude_atomic_group_hosts,
                                     valid_only, filter_data)
    hosts = list(hosts)
    # Batch-fetch related rows to avoid a query per host.
    models.Host.objects.populate_relationships(hosts, models.Label,
                                               'label_list')
    models.Host.objects.populate_relationships(hosts, models.AclGroup,
                                               'acl_list')
    models.Host.objects.populate_relationships(hosts, models.HostAttribute,
                                               'attribute_list')
    host_dicts = []
    for host_obj in hosts:
        host_dict = host_obj.get_object_dict()
        host_dict['labels'] = [label.name for label in host_obj.label_list]
        host_dict['platform'], host_dict['atomic_group'] = (rpc_utils.
                find_platform_and_atomic_group(host_obj))
        host_dict['acls'] = [acl.name for acl in host_obj.acl_list]
        host_dict['attributes'] = dict((attribute.attribute, attribute.value)
                                       for attribute
                                       in host_obj.attribute_list)
        host_dicts.append(host_dict)
    return rpc_utils.prepare_for_serialization(host_dicts)


def get_num_hosts(multiple_labels=(), exclude_only_if_needed_labels=False,
                  exclude_atomic_group_hosts=False, valid_only=True,
                  **filter_data):
    """
    Same parameters as get_hosts().

    @returns The number of matching hosts.
    """
    hosts = rpc_utils.get_host_query(multiple_labels,
                                     exclude_only_if_needed_labels,
                                     exclude_atomic_group_hosts,
                                     valid_only, filter_data)
    return hosts.count()


# tests

def add_test(name, test_type, path, author=None, dependencies=None,
             experimental=True, run_verify=None, test_class=None,
             test_time=None, test_category=None, description=None,
             sync_count=1):
    """Create a Test entry.  @returns the new test's id."""
    return models.Test.add_object(name=name, test_type=test_type, path=path,
                                  author=author, dependencies=dependencies,
                                  experimental=experimental,
                                  run_verify=run_verify, test_time=test_time,
                                  test_category=test_category,
                                  sync_count=sync_count,
                                  test_class=test_class,
                                  description=description).id


def modify_test(id, **data):
    models.Test.smart_get(id).update_object(data)


def delete_test(id):
    models.Test.smart_get(id).delete()


def get_tests(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))


# profilers

def add_profiler(name, description=None):
    """Create a Profiler.  @returns the new profiler's id."""
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Profiler.list_objects(filter_data))


# users

def add_user(login, access_level=None):
    """Create a User.  @returns the new user's id."""
    return models.User.add_object(login=login, access_level=access_level).id


def modify_user(id, **data):
    models.User.smart_get(id).update_object(data)


def delete_user(id):
    models.User.smart_get(id).delete()


def get_users(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.User.list_objects(filter_data))


# acl groups

def add_acl_group(name, description=None):
    """Create an AclGroup containing the current user.  @returns its id."""
    group = models.AclGroup.add_object(name=name, description=description)
    group.users.add(models.User.current_user())
    return group.id


def modify_acl_group(id, **data):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    group.update_object(data)
    group.add_current_user_if_empty()


def acl_group_add_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.add(*users)


def acl_group_remove_users(id, users):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    users = models.User.smart_get_bulk(users)
    group.users.remove(*users)
    # A group must never be left empty of users.
    group.add_current_user_if_empty()


def acl_group_add_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.add(*hosts)
    group.on_host_membership_change()


def acl_group_remove_hosts(id, hosts):
    group = models.AclGroup.smart_get(id)
    group.check_for_acl_violation_acl_group()
    hosts = models.Host.smart_get_bulk(hosts)
    group.hosts.remove(*hosts)
    group.on_host_membership_change()


def delete_acl_group(id):
    models.AclGroup.smart_get(id).delete()


def get_acl_groups(**filter_data):
    acl_groups = models.AclGroup.list_objects(filter_data)
    for acl_group in acl_groups:
        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])
        acl_group['users'] = [user.login
                              for user in acl_group_obj.users.all()]
        acl_group['hosts'] = [host.hostname
                              for host in acl_group_obj.hosts.all()]
    return rpc_utils.prepare_for_serialization(acl_groups)


# jobs

def generate_control_file(tests=(), kernel=None, label=None, profilers=(),
                          client_control_file='', use_container=False,
                          profile_only=None, upload_kernel_config=False):
    """
    Generates a client-side control file to load a kernel and run tests.

    @param tests List of tests to run.
    @param kernel A list of kernel info dictionaries configuring which kernels
        to boot for this job and other options for them
    @param label Name of label to grab kernel config from.
    @param profilers List of profilers to activate during the job.
    @param client_control_file The contents of a client-side control file to
        run at the end of all tests.  If this is supplied, all tests must be
        client side.
        TODO: in the future we should support server control files directly
        to wrap with a kernel.  That'll require changing the parameter
        name and adding a boolean to indicate if it is a client or server
        control file.
    @param use_container unused argument today.  TODO: Enable containers
        on the host during a client side test.
    @param profile_only A boolean that indicates what default profile_only
        mode to use in the control file. Passing None will generate a
        control file that does not explicitly set the default mode at all.
    @param upload_kernel_config: if enabled it will generate server control
            file code that uploads the kernel config file to the client and
            tells the client of the new (local) path when compiling the
            kernel; the tests must be server side tests

    @returns a dict with the following keys:
        control_file: str, The control file text.
        is_server: bool, is the control file a server-side control file?
        synch_count: How many machines the job uses per autoserv execution.
            synch_count == 1 means the job is asynchronous.
        dependencies: A list of the names of labels on which the job depends.
    """
    if not tests and not client_control_file:
        # Nothing to generate; return an empty asynchronous client file.
        return dict(control_file='', is_server=False, synch_count=1,
                    dependencies=[])

    cf_info, test_objects, profiler_objects, label = (
        rpc_utils.prepare_generate_control_file(tests, kernel, label,
                                                profilers))
    cf_info['control_file'] = control_file.generate_control(
        tests=test_objects, kernels=kernel, platform=label,
        profilers=profiler_objects, is_server=cf_info['is_server'],
        client_control_file=client_control_file, profile_only=profile_only,
        upload_kernel_config=upload_kernel_config)
    return cf_info


def create_parameterized_job(name, priority, test, parameters, kernel=None,
                             label=None, profilers=(),
                             profiler_parameters=None,
                             use_container=False, profile_only=None,
                             upload_kernel_config=False, hosts=(),
                             meta_hosts=(), one_time_hosts=(),
                             atomic_group_name=None, synch_count=None,
                             is_template=False, timeout=None,
                             max_runtime_hrs=None, run_verify=True,
                             email_list='', dependencies=(),
                             reboot_before=None, reboot_after=None,
                             parse_failed_repair=None, hostless=False,
                             keyvals=None, drone_set=None):
    """
    Creates and enqueues a parameterized job.

    Most parameters a combination of the parameters for
    generate_control_file() and create_job(), with the exception of:

    @param test name or ID of the test to run
    @param parameters a map of parameter name ->
                          tuple of (param value, param type)
    @param profiler_parameters a dictionary of parameters for the profilers:
                                   key: profiler name
                                   value: dict of param name -> tuple of
                                                                (param value,
                                                                 param type)
    """
    # Save the values of the passed arguments here. What we're going to do
    # with them is pass them all to rpc_utils.get_create_job_common_args(),
    # which will extract the subset of these arguments that apply for
    # rpc_utils.create_job_common(), which we then pass in to that function.
    args = locals()

    # Set up the parameterized job configs
    test_obj = models.Test.smart_get(test)
    if test_obj.test_type == model_attributes.TestTypes.SERVER:
        control_type = models.Job.ControlType.SERVER
    else:
        control_type = models.Job.ControlType.CLIENT

    try:
        label = models.Label.smart_get(label)
    except models.Label.DoesNotExist:
        label = None

    kernel_objs = models.Kernel.create_kernels(kernel)
    profiler_objs = [models.Profiler.smart_get(profiler)
                     for profiler in profilers]

    parameterized_job = models.ParameterizedJob.objects.create(
        test=test_obj, label=label, use_container=use_container,
        profile_only=profile_only,
        upload_kernel_config=upload_kernel_config)
    parameterized_job.kernels.add(*kernel_objs)

    # Bug fix: profiler_parameters defaults to None, so guard before .get();
    # previously this raised AttributeError whenever profilers were supplied
    # without profiler_parameters.
    if profiler_parameters is None:
        profiler_parameters = {}
    for profiler in profiler_objs:
        parameterized_profiler = (
            models.ParameterizedJobProfiler.objects.create(
                parameterized_job=parameterized_job,
                profiler=profiler))
        profiler_params = profiler_parameters.get(profiler.name, {})
        for name, (value, param_type) in profiler_params.iteritems():
            models.ParameterizedJobProfilerParameter.objects.create(
                parameterized_job_profiler=parameterized_profiler,
                parameter_name=name,
                parameter_value=value,
                parameter_type=param_type)

    try:
        for parameter in test_obj.testparameter_set.all():
            if parameter.name in parameters:
                param_value, param_type = parameters.pop(parameter.name)
                parameterized_job.parameterizedjobparameter_set.create(
                    test_parameter=parameter, parameter_value=param_value,
                    parameter_type=param_type)

        if parameters:
            raise Exception('Extra parameters remain: %r' % parameters)

        return rpc_utils.create_job_common(
            parameterized_job=parameterized_job.id,
            control_type=control_type,
            **rpc_utils.get_create_job_common_args(args))
    except:
        # Bare except is deliberate: clean up the half-created job on ANY
        # failure (including KeyboardInterrupt), then re-raise.
        parameterized_job.delete()
        raise


def create_job(name, priority, control_file, control_type,
               hosts=(), meta_hosts=(), one_time_hosts=(),
               atomic_group_name=None, synch_count=None, is_template=False,
               timeout=None, max_runtime_hrs=None, run_verify=True,
               email_list='', dependencies=(), reboot_before=None,
               reboot_after=None, parse_failed_repair=None, hostless=False,
               keyvals=None, drone_set=None):
    """\
    Create and enqueue a job.

    @param name name of this job
    @param priority Low, Medium, High, Urgent
    @param control_file String contents of the control file.
    @param control_type Type of control file, Client or Server.
    @param synch_count How many machines the job uses per autoserv execution.
        synch_count == 1 means the job is asynchronous.  If an atomic group
        is given this value is treated as a minimum.
    @param is_template If true then create a template job.
    @param timeout Hours after this call returns until the job times out.
    @param max_runtime_hrs Hours from job starting time until job times out
    @param run_verify Should the host be verified before running the test?
    @param email_list String containing emails to mail when the job is done
    @param dependencies List of label names on which this job depends
    @param reboot_before Never, If dirty, or Always
    @param reboot_after Never, If all tests passed, or Always
    @param parse_failed_repair if true, results of failed repairs launched by
        this job will be parsed as part of the job.
    @param hostless if true, create a hostless job
    @param keyvals dict of keyvals to associate with the job

    @param hosts List of hosts to run job on.
    @param meta_hosts List where each entry is a label name, and for each
        entry one host will be chosen from that label to run the job on.
    @param one_time_hosts List of hosts not in the database to run the job on.
    @param atomic_group_name The name of an atomic group to schedule the
        job on.

    @param drone_set The name of the drone set to run this test on.

    @returns The created Job id number.
    """
    return rpc_utils.create_job_common(
        **rpc_utils.get_create_job_common_args(locals()))


def abort_host_queue_entries(**filter_data):
    """\
    Abort a set of host queue entries.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    # Only incomplete entries can be aborted.
    query = query.filter(complete=False)
    models.AclGroup.check_abort_permissions(query)
    host_queue_entries = list(query.select_related())
    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)

    for queue_entry in host_queue_entries:
        queue_entry.abort()


def reverify_hosts(**filter_data):
    """\
    Schedules a set of hosts for verify.

    @returns A list of hostnames that a verify task was created for.
    """
    hosts = models.Host.query_objects(filter_data)
    models.AclGroup.check_for_acl_violation_hosts(hosts)
    for host in hosts:
        models.SpecialTask.schedule_special_task(
            host, models.SpecialTask.Task.VERIFY)
    return list(sorted(host.hostname for host in hosts))


def get_jobs(not_yet_run=False, running=False, finished=False, **filter_data):
    """\
    Extra filter args for get_jobs:
    -not_yet_run: Include only jobs that have not yet started running.
    -running: Include only jobs that have start running but for which not
    all hosts have completed.
    -finished: Include only jobs for which all hosts have completed (or
    aborted).
    At most one of these three fields should be specified.
    """
    filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,
                                                            running,
                                                            finished)
    job_dicts = []
    jobs = list(models.Job.query_objects(filter_data))
    models.Job.objects.populate_relationships(jobs, models.Label,
                                              'dependencies')
    models.Job.objects.populate_relationships(jobs, models.JobKeyval,
                                              'keyvals')
    for job in jobs:
        job_dict = job.get_object_dict()
        job_dict['dependencies'] = ','.join(label.name
                                            for label in job.dependencies)
        job_dict['keyvals'] = dict((keyval.key, keyval.value)
                                   for keyval in job.keyvals)
        job_dicts.append(job_dict)
    return rpc_utils.prepare_for_serialization(job_dicts)


def get_num_jobs(not_yet_run=False, running=False, finished=False,
                 **filter_data):
    """\
    See get_jobs() for documentation of extra filter parameters.
    """
    filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,
                                                            running,
                                                            finished)
    return models.Job.query_count(filter_data)


def get_jobs_summary(**filter_data):
    """\
    Like get_jobs(), but adds a 'status_counts' field, which is a dictionary
    mapping status strings to the number of hosts currently with that
    status, i.e. {'Queued' : 4, 'Running' : 2}.
    """
    jobs = get_jobs(**filter_data)
    ids = [job['id'] for job in jobs]
    all_status_counts = models.Job.objects.get_status_counts(ids)
    for job in jobs:
        job['status_counts'] = all_status_counts[job['id']]
    return rpc_utils.prepare_for_serialization(jobs)


def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):
    """\
    Retrieves all the information needed to clone a job.
    """
    job = models.Job.objects.get(id=id)
    job_info = rpc_utils.get_job_info(job,
                                      preserve_metahosts,
                                      queue_entry_filter_data)

    host_dicts = []
    for host in job_info['hosts']:
        host_dict = get_hosts(id=host.id)[0]
        other_labels = host_dict['labels']
        if host_dict['platform']:
            other_labels.remove(host_dict['platform'])
        host_dict['other_labels'] = ', '.join(other_labels)
        host_dicts.append(host_dict)

    for host in job_info['one_time_hosts']:
        host_dict = dict(hostname=host.hostname,
                         id=host.id,
                         platform='(one-time host)',
                         locked_text='')
        host_dicts.append(host_dict)

    # convert keys from Label objects to strings (names of labels)
    meta_host_counts = dict((meta_host.name, count) for meta_host, count
                            in job_info['meta_host_counts'].iteritems())

    info = dict(job=job.get_object_dict(),
                meta_host_counts=meta_host_counts,
                hosts=host_dicts)
    info['job']['dependencies'] = job_info['dependencies']
    if job_info['atomic_group']:
        info['atomic_group_name'] = (job_info['atomic_group']).name
    else:
        info['atomic_group_name'] = None
    info['hostless'] = job_info['hostless']
    info['drone_set'] = job.drone_set and job.drone_set.name

    return rpc_utils.prepare_for_serialization(info)


# host queue entries

def get_host_queue_entries(**filter_data):
    """\
    @returns A sequence of nested dictionaries of host and job information.
    """
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.HostQueueEntry.query_objects(filter_data),
            ('host', 'atomic_group', 'job'))


def get_num_host_queue_entries(**filter_data):
    """\
    Get the number of host queue entries associated with this job.
    """
    return models.HostQueueEntry.query_count(filter_data)


def get_hqe_percentage_complete(**filter_data):
    """
    Computes the fraction of host queue entries matching the given filter
    data that are complete.
    """
    query = models.HostQueueEntry.query_objects(filter_data)
    complete_count = query.filter(complete=True).count()
    total_count = query.count()
    if total_count == 0:
        # Vacuously complete when nothing matches the filter.
        return 1
    return float(complete_count) / total_count


# special tasks

def get_special_tasks(**filter_data):
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.SpecialTask.query_objects(filter_data),
            ('host', 'queue_entry'))


# support for host detail view

def get_host_queue_entries_and_special_tasks(hostname, query_start=None,
                                             query_limit=None):
    """
    @returns an interleaved list of HostQueueEntries and SpecialTasks,
            in approximate run order.  each dict contains keys for type,
            host, job, status, started_on, execution_path, and ID.
    """
    total_limit = None
    if query_limit is not None:
        total_limit = query_start + query_limit
    filter_data = {'host__hostname': hostname,
                   'query_limit': total_limit,
                   'sort_by': ['-id']}

    queue_entries = list(models.HostQueueEntry.query_objects(filter_data))
    special_tasks = list(models.SpecialTask.query_objects(filter_data))

    interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                       special_tasks)
    if query_start is not None:
        interleaved_entries = interleaved_entries[query_start:]
    if query_limit is not None:
        interleaved_entries = interleaved_entries[:query_limit]
    return rpc_utils.prepare_for_serialization(interleaved_entries)


def get_num_host_queue_entries_and_special_tasks(hostname):
    filter_data = {'host__hostname': hostname}
    return (models.HostQueueEntry.query_count(filter_data)
            + models.SpecialTask.query_count(filter_data))


# recurring run

def get_recurring(**filter_data):
    return rpc_utils.prepare_rows_as_nested_dicts(
            models.RecurringRun.query_objects(filter_data),
            ('job', 'owner'))


def get_num_recurring(**filter_data):
    return models.RecurringRun.query_count(filter_data)


def delete_recurring_runs(**filter_data):
    to_delete = models.RecurringRun.query_objects(filter_data)
    to_delete.delete()


def create_recurring_run(job_id, start_date, loop_period, loop_count):
    """Schedule a recurring run of an existing job, owned by the caller."""
    owner = models.User.current_user().login
    job = models.Job.objects.get(id=job_id)
    return job.create_recurring_job(start_date=start_date,
                                    loop_period=loop_period,
                                    loop_count=loop_count,
                                    owner=owner)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful