# Best Python code snippet using autotest_python
# Source: rpc_interface.py
# NOTE(review): truncated scrape residue — the tail of a host-counting RPC
# whose head is not present in this snippet; preserved as a comment so the
# module stays syntactically valid:
#                                      exclude_only_if_needed_labels,
#                                      exclude_atomic_group_hosts,
#                                      valid_only, filter_data)
#     return hosts.count()


def get_install_server_profiles():
    """
    Get install server profiles.

    Queries the configured install server (only 'cobbler' with an XML-RPC
    endpoint is supported) for the names of its 'profile' items.

    :return: Sequence of profile names, or None if no usable install server
             is configured.
    """
    install_server = None
    install_server_info = get_install_server_info()
    install_server_type = install_server_info.get('type', None)
    install_server_url = install_server_info.get('xmlrpc_url', None)
    # Only cobbler servers exposing an XML-RPC URL are supported.
    if install_server_type == 'cobbler' and install_server_url:
        install_server = xmlrpclib.ServerProxy(install_server_url)
    if install_server is None:
        return None
    return install_server.get_item_names('profile')


def get_profiles():
    """
    Get profiles.

    :return: Serialized sequence of profile dicts ({'name': ...}). When
             profiles exist, a synthetic 'Do_not_install' entry is prepended;
             when the install server is missing or has no profiles, a single
             'N/A' entry is returned instead.
    """
    error_encountered = True
    profile_dicts = []
    profiles = get_install_server_profiles()
    if profiles is not None:
        if len(profiles) < 1:
            msg = 'No profiles defined on install server'
            rpc_logger = logging.getLogger('rpc_logger')
            rpc_logger.info(msg)
        else:
            error_encountered = False
            # sort for a stable, user-friendly ordering
            profiles.sort()
            profile_dicts.append(dict(name="Do_not_install"))
            for profile in profiles:
                profile_dicts.append(dict(name=profile))
    if error_encountered:
        profile_dicts.append(dict(name="N/A"))
    return rpc_utils.prepare_for_serialization(profile_dicts)


def get_num_profiles():
    """
    Get the number of profiles. Same parameters as get_profiles().

    :return: The number of entries get_profiles() would return: the defined
             profiles plus the synthetic 'Do_not_install' entry, or 1 (the
             'N/A' placeholder) when there is no install server or it has no
             profiles.
    """
    # FIX(review): removed the dead 'error_encountered' flag — it was set once
    # and never changed before use, and every live path already returns.
    profiles = get_install_server_profiles()
    if profiles is None or len(profiles) < 1:
        # only the 'N/A' placeholder entry
        return 1
    # defined profiles plus the 'Do_not_install' entry
    return len(profiles) + 1


def reserve_hosts(host_filter_data, username=None):
    """
    Reserve some hosts.

    :param host_filter_data: Filters out which hosts to reserve.
    :param username: login of the user reserving hosts
    :type username: str
    :return: None.
    """
    hosts = models.Host.query_objects(host_filter_data)
    reservations.create(hosts_to_reserve=[h.hostname for h in hosts],
                        username=username)


def release_hosts(host_filter_data, username=None):
    """
    Release some hosts.

    :param host_filter_data: Filters out which hosts to release.
    :param username: login of the user releasing hosts
    :type username: str
    :return: None.
    """
    hosts = models.Host.query_objects(host_filter_data)
    reservations.release(hosts_to_release=[h.hostname for h in hosts],
                         username=username)


def force_release_hosts(host_filter_data, username=None):
    """
    Force release some hosts (remove all ACLs).

    :param host_filter_data: Filters out which hosts to release.
    :param username: login of the user releasing hosts, which needs to have
                     elevated privileges
    :type username: str
    :return: None.
    """
    hosts = models.Host.query_objects(host_filter_data)
    reservations.force_release(hosts_to_release=[h.hostname for h in hosts],
                               username=username)


# tests

def add_test(name, test_type, path, author=None, dependencies=None,
             experimental=True, run_verify=None, test_class=None,
             test_time=None, test_category=None, description=None,
             sync_count=1):
    """
    Add (create) test.

    :param name: Test name.
    :param test_type: Test type (Client or Server).
    :param path: Relative path to the test.
    :param author: The author of the test (optional).
    :param dependencies: Dependencies (optional).
    :param experimental: Experimental? (True or False) (optional).
    :param run_verify: Run verify? (True or False) (optional).
    :param test_class: Test class (optional).
    :param test_time: Test time (optional).
    :param test_category: Test category (optional).
    :param description: Description (optional).
    :param sync_count: Sync count (optional).
    :return: ID.
    """
    return models.Test.add_object(name=name, test_type=test_type, path=path,
                                  author=author, dependencies=dependencies,
                                  experimental=experimental,
                                  run_verify=run_verify, test_time=test_time,
                                  test_category=test_category,
                                  sync_count=sync_count,
                                  test_class=test_class,
                                  description=description).id


def modify_test(id, **data):
    """
    Modify (update) test.

    :param id: Test identification.
    :param data: Test data to modify.
    :return: None.
    """
    models.Test.smart_get(id).update_object(data)


def delete_test(id):
    """
    Delete test.

    :param id: Test identification.
    :return: None.
    """
    models.Test.smart_get(id).delete()


def get_tests(**filter_data):
    """
    Get tests.

    :param filter_data: Filters out which tests to get.
    :return: Sequence of tests.
    """
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))


# profilers

def add_profiler(name, description=None):
    """
    Add (create) profiler.

    :param name: The name of the profiler.
    :param description: Description (optional).
    :return: ID.
    """
    return models.Profiler.add_object(name=name, description=description).id


def modify_profiler(id, **data):
    """
    Modify (update) profiler.

    :param id: Profiler identification.
    :param data: Profiler data to modify.
    :return: None.
    """
    models.Profiler.smart_get(id).update_object(data)


def delete_profiler(id):
    """
    Delete profiler.

    :param id: Profiler identification.
    :return: None.
    """
    models.Profiler.smart_get(id).delete()


def get_profilers(**filter_data):
    """
    Get all profilers.

    :param filter_data: Filters out which profilers to get.
    :return: Sequence of profilers.
    """
    return rpc_utils.prepare_for_serialization(
        models.Profiler.list_objects(filter_data))


# users

def add_user(login, access_level=None):
    """
    Add (create) user.

    :param login: The login name.
    :param access_level: Access level (optional).
    :return: ID.
    """
    return models.User.add_object(login=login, access_level=access_level).id


def modify_user(id, **data):
    """
    Modify (update) user.

    :param id: User identification.
    :param data: User data to modify.
    :return: None.
    """
    models.User.smart_get(id).update_object(data)


def delete_user(id):
    """
    Delete user.

    :param id: User identification.
    :return: None.
    """
    models.User.smart_get(id).delete()


def get_users(**filter_data):
    """
    Get users.

    :param filter_data: Filters out which users to get.
    :return: Sequence of users.
    """
    return rpc_utils.prepare_for_serialization(
        models.User.list_objects(filter_data))
groups524def add_acl_group(name, description=None):525    """526    Add (create) ACL group.527    :param name: The name of the ACL group.528    :param description: Description (optional).529    :return: ID.530    """531    group = models.AclGroup.add_object(name=name, description=description)532    group.users.add(models.User.current_user())533    return group.id534def modify_acl_group(id, **data):535    """536    Modify (update) ACL group.537    :param id: ACL group identification.538    :param data: ACL group data to modify.539    :return: None.540    """541    group = models.AclGroup.smart_get(id)542    group.check_for_acl_violation_acl_group()543    group.update_object(data)544    group.add_current_user_if_empty()545def acl_group_add_users(id, users):546    """547    Add users to an ACL group.548    :param id: ACL group identification.549    :param users: Sequence of users.550    :return: None.551    """552    group = models.AclGroup.smart_get(id)553    group.check_for_acl_violation_acl_group()554    users = models.User.smart_get_bulk(users)555    group.users.add(*users)556def acl_group_remove_users(id, users):557    """558    Remove users from an ACL group.559    :param id: ACL group identification.560    :param users: Sequence of users.561    :return: None.562    """563    group = models.AclGroup.smart_get(id)564    group.check_for_acl_violation_acl_group()565    users = models.User.smart_get_bulk(users)566    group.users.remove(*users)567    group.add_current_user_if_empty()568def acl_group_add_hosts(id, hosts):569    """570    Add hosts to an ACL group.571    :param id: ACL group identification.572    :param hosts: Sequence of hosts to add.573    :return: None.574    """575    group = models.AclGroup.smart_get(id)576    group.check_for_acl_violation_acl_group()577    hosts = models.Host.smart_get_bulk(hosts)578    group.hosts.add(*hosts)579    group.on_host_membership_change()580def acl_group_remove_hosts(id, hosts):581    """582    Remove hosts from an ACL 
group.583    :param id: ACL group identification.584    :param hosts: Sequence of hosts to remove.585    :return: None.586    """587    group = models.AclGroup.smart_get(id)588    group.check_for_acl_violation_acl_group()589    hosts = models.Host.smart_get_bulk(hosts)590    group.hosts.remove(*hosts)591    group.on_host_membership_change()592def delete_acl_group(id):593    """594    Delete ACL group.595    :param id: ACL group identification.596    :return: None.597    """598    models.AclGroup.smart_get(id).delete()599def get_acl_groups(**filter_data):600    """601    Get ACL groups.602    :param filter_data: Filters out which ACL groups to get.603    :return: Sequence of ACL groups.604    """605    acl_groups = models.AclGroup.list_objects(filter_data)606    for acl_group in acl_groups:607        acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])608        acl_group['users'] = [user.login609                              for user in acl_group_obj.users.all()]610        acl_group['hosts'] = [host.hostname611                              for host in acl_group_obj.hosts.all()]612    return rpc_utils.prepare_for_serialization(acl_groups)613# jobs614def generate_control_file(tests=(), kernel=None, label=None, profilers=(),615                          client_control_file='', use_container=False,616                          profile_only=None, upload_kernel_config=False):617    """618    Generates a client-side control file to load a kernel and run tests.619    :param tests List of tests to run.620    :param kernel A list of kernel info dictionaries configuring which kernels621        to boot for this job and other options for them622    :param label Name of label to grab kernel config from.623    :param profilers List of profilers to activate during the job.624    :param client_control_file The contents of a client-side control file to625        run at the end of all tests.  
If this is supplied, all tests must be626        client side.627        TODO: in the future we should support server control files directly628        to wrap with a kernel.  That'll require changing the parameter629        name and adding a boolean to indicate if it is a client or server630        control file.631    :param use_container unused argument today.  TODO: Enable containers632        on the host during a client side test.633    :param profile_only A boolean that indicates what default profile_only634        mode to use in the control file. Passing None will generate a635        control file that does not explcitly set the default mode at all.636    :param upload_kernel_config: if enabled it will generate server control637            file code that uploads the kernel config file to the client and638            tells the client of the new (local) path when compiling the kernel;639            the tests must be server side tests640    :return: a dict with the following keys:641        control_file: str, The control file text.642        is_server: bool, is the control file a server-side control file?643        synch_count: How many machines the job uses per autoserv execution.644            synch_count == 1 means the job is asynchronous.645        dependencies: A list of the names of labels on which the job depends.646    """647    if not tests and not client_control_file:648        return dict(control_file='', is_server=False, synch_count=1,649                    dependencies=[])650    cf_info, test_objects, profiler_objects, label = (651        rpc_utils.prepare_generate_control_file(tests, kernel, label,652                                                profilers))653    cf_info['control_file'] = control_file.generate_control(654        tests=test_objects, kernels=kernel, platform=label,655        profilers=profiler_objects, is_server=cf_info['is_server'],656        client_control_file=client_control_file, profile_only=profile_only,657        
upload_kernel_config=upload_kernel_config)658    return cf_info659def create_parameterized_job(name, priority, test, parameters, kernel=None,660                             label=None, profiles=[], profilers=(),661                             profiler_parameters=None,662                             use_container=False, profile_only=None,663                             upload_kernel_config=False, hosts=[],664                             meta_hosts=[], meta_host_profiles=[], one_time_hosts=[],665                             atomic_group_name=None, synch_count=None,666                             is_template=False, timeout=None,667                             max_runtime_hrs=None, run_verify=True,668                             email_list='', dependencies=(), reboot_before=None,669                             reboot_after=None, parse_failed_repair=None,670                             hostless=False, keyvals=None, drone_set=None,671                             reserve_hosts=False):672    """673    Creates and enqueues a parameterized job.674    Most parameters a combination of the parameters for generate_control_file()675    and create_job(), with the exception of:676    :param test name or ID of the test to run677    :param parameters a map of parameter name ->678                          tuple of (param value, param type)679    :param profiler_parameters a dictionary of parameters for the profilers:680                                   key: profiler name681                                   value: dict of param name -> tuple of682                                                                (param value,683                                                                 param type)684    """685    # Save the values of the passed arguments here. 
What we're going to do with686    # them is pass them all to rpc_utils.get_create_job_common_args(), which687    # will extract the subset of these arguments that apply for688    # rpc_utils.create_job_common(), which we then pass in to that function.689    args = locals()690    # Set up the parameterized job configs691    test_obj = models.Test.smart_get(test)692    if test_obj.test_type == model_attributes.TestTypes.SERVER:693        control_type = models.Job.ControlType.SERVER694    else:695        control_type = models.Job.ControlType.CLIENT696    try:697        label = models.Label.smart_get(label)698    except models.Label.DoesNotExist:699        label = None700    kernel_objs = models.Kernel.create_kernels(kernel)701    profiler_objs = [models.Profiler.smart_get(profiler)702                     for profiler in profilers]703    parameterized_job = models.ParameterizedJob.objects.create(704        test=test_obj, label=label, use_container=use_container,705        profile_only=profile_only,706        upload_kernel_config=upload_kernel_config)707    parameterized_job.kernels.add(*kernel_objs)708    for profiler in profiler_objs:709        parameterized_profiler = models.ParameterizedJobProfiler.objects.create(710            parameterized_job=parameterized_job,711            profiler=profiler)712        profiler_params = profiler_parameters.get(profiler.name, {})713        for name, (value, param_type) in profiler_params.iteritems():714            models.ParameterizedJobProfilerParameter.objects.create(715                parameterized_job_profiler=parameterized_profiler,716                parameter_name=name,717                parameter_value=value,718                parameter_type=param_type)719    try:720        for parameter in test_obj.testparameter_set.all():721            if parameter.name in parameters:722                param_value, param_type = parameters.pop(parameter.name)723                parameterized_job.parameterizedjobparameter_set.create(724     
               test_parameter=parameter, parameter_value=param_value,725                    parameter_type=param_type)726        if parameters:727            raise Exception('Extra parameters remain: %r' % parameters)728        return rpc_utils.create_job_common(729            parameterized_job=parameterized_job.id,730            control_type=control_type,731            **rpc_utils.get_create_job_common_args(args))732    except Exception:733        parameterized_job.delete()734        raise735def create_job(name, priority, control_file, control_type,736               hosts=[], profiles=[], meta_hosts=[], meta_host_profiles=[],737               one_time_hosts=[], atomic_group_name=None, synch_count=None,738               is_template=False, timeout=None, max_runtime_hrs=None,739               run_verify=True, email_list='', dependencies=(), reboot_before=None,740               reboot_after=None, parse_failed_repair=None, hostless=False,741               keyvals=None, drone_set=None, reserve_hosts=False):742    """743    Create and enqueue a job.744    :param name: name of this job745    :param priority: Low, Medium, High, Urgent746    :param control_file: String contents of the control file.747    :param control_type: Type of control file, Client or Server.748    :param synch_count: How many machines the job uses per autoserv execution.749                        synch_count == 1 means the job is asynchronous. 
If an750                        atomic group is given this value is treated as a751                        minimum.752    :param is_template: If true then create a template job.753    :param timeout: Hours after this call returns until the job times out.754    :param max_runtime_hrs: Hours from job starting time until job times out755    :param run_verify: Should the host be verified before running the test?756    :param email_list: String containing emails to mail when the job is done757    :param dependencies: List of label names on which this job depends758    :param reboot_before: Never, If dirty, or Always759    :param reboot_after: Never, If all tests passed, or Always760    :param parse_failed_repair: if true, results of failed repairs launched by761                                this job will be parsed as part of the job.762    :param hostless: if true, create a hostless job763    :param keyvals: dict of keyvals to associate with the job764    :param hosts: List of hosts to run job on.765    :param profiles: List of profiles to use, in sync with @hosts list766    :param meta_hosts: List where each entry is a label name, and for each767                       entry one host will be chosen from that label to run768                       the job on.769    :param one_time_hosts: List of hosts not in the database to run the job on.770    :param atomic_group_name: name of an atomic group to schedule the job on.771    :param drone_set: The name of the drone set to run this test on.772    :param reserve_hosts: If set we will reseve the hosts that were allocated773                          for this job774    :returns: The created Job id number.775    :rtype: integer776    """777    return rpc_utils.create_job_common(778        **rpc_utils.get_create_job_common_args(locals()))779def abort_host_queue_entries(**filter_data):780    """781    Abort a set of host queue entries.782    :param filter_data: Filters out which hosts.783    :return: None.784    """785    query = 
models.HostQueueEntry.query_objects(filter_data)786    query = query.filter(complete=False)787    models.AclGroup.check_abort_permissions(query)788    host_queue_entries = list(query.select_related())789    rpc_utils.check_abort_synchronous_jobs(host_queue_entries)790    for queue_entry in host_queue_entries:791        queue_entry.abort()792def reverify_hosts(**filter_data):793    """794    Schedules a set of hosts for verify.795    :param filter_data: Filters out which hosts.796    :return: A list of hostnames that a verify task was created for.797    """798    hosts = models.Host.query_objects(filter_data)799    models.AclGroup.check_for_acl_violation_hosts(hosts)800    for host in hosts:801        models.SpecialTask.schedule_special_task(host,802                                                 models.SpecialTask.Task.VERIFY)803    return list(sorted(host.hostname for host in hosts))804def get_jobs(not_yet_run=False, running=False, finished=False, **filter_data):805    """806    Extra filter args for get_jobs:807    -not_yet_run: Include only jobs that have not yet started running.808    -running: Include only jobs that have start running but for which not809    all hosts have completed.810    -finished: Include only jobs for which all hosts have completed (or811    aborted).812    At most one of these three fields should be specified.813    """814    filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,815                                                            running,816                                                            finished)817    job_dicts = []818    jobs = list(models.Job.query_objects(filter_data))819    models.Job.objects.populate_relationships(jobs, models.Label,820                                              'dependencies')821    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')822    for job in jobs:823        job_dict = job.get_object_dict()824        job_dict['dependencies'] = 
','.join(label.name825                                            for label in job.dependencies)826        job_dict['keyvals'] = dict((keyval.key, keyval.value)827                                   for keyval in job.keyvals)828        job_dicts.append(job_dict)829    return rpc_utils.prepare_for_serialization(job_dicts)830def get_num_jobs(not_yet_run=False, running=False, finished=False,831                 **filter_data):832    """833    See get_jobs() for documentation of extra filter parameters.834    """835    filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,836                                                            running,837                                                            finished)838    return models.Job.query_count(filter_data)839def get_jobs_summary(**filter_data):840    """841    Like get_jobs(), but adds a 'status_counts' field, which is a dictionary842    mapping status strings to the number of hosts currently with that843    status, i.e. {'Queued' : 4, 'Running' : 2}.844    """845    jobs = get_jobs(**filter_data)846    ids = [job['id'] for job in jobs]847    all_status_counts = models.Job.objects.get_status_counts(ids)848    for job in jobs:849        job['status_counts'] = all_status_counts[job['id']]850    return rpc_utils.prepare_for_serialization(jobs)851def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):852    """853    Retrieves all the information needed to clone a job.854    """855    job = models.Job.objects.get(id=id)856    job_info = rpc_utils.get_job_info(job,857                                      preserve_metahosts,858                                      queue_entry_filter_data)859    host_dicts = []860    for host, profile in zip(job_info['hosts'], job_info['profiles']):861        host_dict = get_hosts(id=host.id)[0]862        other_labels = host_dict['labels']863        if host_dict['platform']:864            other_labels.remove(host_dict['platform'])865        
host_dict['other_labels'] = ', '.join(other_labels)866        host_dict['profile'] = profile867        host_dicts.append(host_dict)868    for host in job_info['one_time_hosts']:869        host_dict = dict(hostname=host.hostname,870                         id=host.id,871                         platform='(one-time host)',872                         locked_text='')873        host_dicts.append(host_dict)874    meta_host_dicts = []875    # convert keys from Label objects to strings (names of labels)876    meta_host_counts = dict((meta_host.name, count) for meta_host, count877                            in job_info['meta_host_counts'].iteritems())878    for meta_host, meta_host_profile in zip(job_info['meta_hosts'], job_info['meta_host_profiles']):879        meta_host_dict = dict(name=meta_host.name, count=meta_host_counts[meta_host.name], profile=meta_host_profile)880        meta_host_dicts.append(meta_host_dict)881    info = dict(job=job.get_object_dict(),882                meta_hosts=meta_host_dicts,883                hosts=host_dicts)884    info['job']['dependencies'] = job_info['dependencies']885    if job_info['atomic_group']:886        info['atomic_group_name'] = (job_info['atomic_group']).name887    else:888        info['atomic_group_name'] = None889    info['hostless'] = job_info['hostless']890    info['drone_set'] = job.drone_set and job.drone_set.name891    return rpc_utils.prepare_for_serialization(info)892# host queue entries893def get_host_queue_entries(**filter_data):894    """895    :return: A sequence of nested dictionaries of host and job information.896    """897    return rpc_utils.prepare_rows_as_nested_dicts(898        models.HostQueueEntry.query_objects(filter_data),899        ('host', 'atomic_group', 'job'))900def get_num_host_queue_entries(**filter_data):901    """902    Get the number of host queue entries associated with this job.903    """904    return models.HostQueueEntry.query_count(filter_data)905def 
get_hqe_percentage_complete(**filter_data):906    """907    Computes the fraction of host queue entries matching the given filter data908    that are complete.909    """910    query = models.HostQueueEntry.query_objects(filter_data)911    complete_count = query.filter(complete=True).count()912    total_count = query.count()913    if total_count == 0:914        return 1915    return float(complete_count) / total_count916# special tasks917def get_special_tasks(**filter_data):918    return rpc_utils.prepare_rows_as_nested_dicts(919        models.SpecialTask.query_objects(filter_data),920        ('host', 'queue_entry'))921# support for host detail view922def get_host_queue_entries_and_special_tasks(hostname, query_start=None,923                                             query_limit=None):924    """925    :return: an interleaved list of HostQueueEntries and SpecialTasks,926            in approximate run order.  each dict contains keys for type, host,927            job, status, started_on, execution_path, and ID.928    """929    total_limit = None930    if query_limit is not None:931        total_limit = query_start + query_limit932    filter_data = {'host__hostname': hostname,933                   'query_limit': total_limit,934                   'sort_by': ['-id']}935    queue_entries = list(models.HostQueueEntry.query_objects(filter_data))936    special_tasks = list(models.SpecialTask.query_objects(filter_data))937    interleaved_entries = rpc_utils.interleave_entries(queue_entries,938                                                       special_tasks)939    if query_start is not None:940        interleaved_entries = interleaved_entries[query_start:]941    if query_limit is not None:942        interleaved_entries = interleaved_entries[:query_limit]943    return rpc_utils.prepare_for_serialization(interleaved_entries)944def get_num_host_queue_entries_and_special_tasks(hostname):945    filter_data = {'host__hostname': hostname}946    return 
(models.HostQueueEntry.query_count(filter_data) +
            models.SpecialTask.query_count(filter_data))


# recurring run
def get_recurring(**filter_data):
    """
    Return recurring jobs.

    :param filter_data: Filters out which recurring jobs to get.
    :return: Sequence of recurring jobs.
    """
    return rpc_utils.prepare_rows_as_nested_dicts(
        models.RecurringRun.query_objects(filter_data),
        ('job', 'owner'))


def get_num_recurring(**filter_data):
    """
    Get the number of recurring jobs.

    :param filter_data: Filters out which recurring jobs to get.
    :return: Number of recurring jobs.
    """
    return models.RecurringRun.query_count(filter_data)


def delete_recurring_runs(**filter_data):
    """
    Delete recurring jobs.

    :param filter_data: Filters out which recurring jobs to delete.
    :return: None.
    """
    to_delete = models.RecurringRun.query_objects(filter_data)
    to_delete.delete()


def create_recurring_run(job_id, start_date, loop_period, loop_count):
    """
    Create (add) a recurring job.

    :param job_id: Job identification.
    :param start_date: Start date.
    :param loop_period: Loop period.
    :param loop_count: Loop counter.
    :return: Whatever Job.create_recurring_job returns for the new
             recurring job (passed through to the caller).
    """
    owner = models.User.current_user().login
    job = models.Job.objects.get(id=job_id)
    return job.create_recurring_job(start_date=start_date,
                                    loop_period=loop_period,
                                    loop_count=loop_count,
                                    owner=owner)


# other
def echo(data=""):
    """
    Echo - for doing a basic test to see if RPC calls
    can successfully be made.

    :param data: Object to echo, it must be serializable.
    :return: Object echoed back.
    """
    return data


def get_motd():
    """
    Returns the message of the day (MOTD).

    :return: String with MOTD.
    """
    return rpc_utils.get_motd()


def get_static_data():
    """
    Returns a dictionary containing a bunch of data that shouldn't change
    often and is otherwise inaccessible.  This includes:

    priorities: List of job priority choices.
    default_priority: Default priority value for new jobs.
    users: Sorted list of all users.
    labels: Sorted list of all labels.
    atomic_groups: Sorted list of all atomic groups.
    tests: Sorted list of all tests.
    profilers: Sorted list of all profilers.
    current_user: Logged-in username.
    host_statuses: Sorted list of possible Host statuses.
    job_statuses: Sorted list of possible HostQueueEntry statuses.
    job_timeout_default: The default job timeout length in hours.
    parse_failed_repair_default: Default value for the parse_failed_repair job
    option.
    reboot_before_options: A list of valid RebootBefore string enums.
    reboot_after_options: A list of valid RebootAfter string enums.
    motd: Server's message of the day.
    status_dictionary: A mapping from one word job status names to a more
            informative description.
    """
    job_fields = models.Job.get_field_dict()
    default_drone_set_name = models.DroneSet.default_drone_set_name()
    # Default drone set first, remaining sets sorted by name.
    drone_sets = ([default_drone_set_name] +
                  sorted(drone_set.name for drone_set in
                         models.DroneSet.objects.exclude(
                             name=default_drone_set_name)))
    result = {}
    result['priorities'] = models.Job.Priority.choices()
    default_priority = job_fields['priority'].default
    default_string = models.Job.Priority.get_string(default_priority)
    result['default_priority'] = default_string
    result['users'] = get_users(sort_by=['login'])
    result['labels'] = get_labels(sort_by=['-platform', 'name'])
    result['atomic_groups'] = get_atomic_groups(sort_by=['name'])
    result['tests'] = get_tests(sort_by=['name'])
    result['profilers'] = get_profilers(sort_by=['name'])
    result['current_user'] = rpc_utils.prepare_for_serialization(
        models.User.current_user().get_object_dict())
    result['host_statuses'] = sorted(models.Host.Status.names)
    result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
    result['job_timeout_default'] = models.Job.DEFAULT_TIMEOUT
    result['job_max_runtime_hrs_default'] = models.Job.DEFAULT_MAX_RUNTIME_HRS
    result['parse_failed_repair_default'] = bool(
        models.Job.DEFAULT_PARSE_FAILED_REPAIR)
    result['reboot_before_options'] = model_attributes.RebootBefore.names
    result['reboot_after_options'] = model_attributes.RebootAfter.names
    result['motd'] = rpc_utils.get_motd()
    result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()
    result['drone_sets'] = drone_sets
    result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()
    result['status_dictionary'] = {"Aborted": "Aborted",
                                   "Verifying": "Verifying Host",
                                   "Pending": "Waiting on other hosts",
                                   "Running": "Running autoserv",
                                   "Completed": "Autoserv completed",
                                   "Failed": "Failed to complete",
                                   "Queued": "Queued",
                                   "Starting": "Next in host's queue",
                                   "Stopped": "Other host(s) failed verify",
                                   "Parsing": "Awaiting parse of final results",
                                   "Gathering": "Gathering log files",
                                   "Template": "Template job for recurring run",
                                   "Waiting": "Waiting for scheduler action",
                                   "Archiving": "Archiving results"}
    return result


def get_server_time():
    """
    Return server current time.

    :return: Date string in format YYYY-MM-DD HH:MM
    """
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")


def get_version():
    """
    Return autotest version.

    :return: String with version.
    """
    return version.get_version()


def get_interface_version():
    """
    Return interface version.

    :return: Sequence with year, month number, day.
    """
    return INTERFACE_VERSION


def _get_logs_used_space():
    """
    (Internal) Return disk usage (percentage) for the results directory.

    :return: Usage in percents (integer value).
    """
    logs_dir = settings.get_value('COMMON', 'test_output_dir', default=None)
    autodir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                           '..', '..'))
    if logs_dir is None:
        # Fall back to the default results directory under the autotest tree.
        logs_dir = os.path.join(autodir, 'results')
    usage = psutil.disk_usage(logs_dir)
    return int(usage.percent)


def _process_running(process_name):
    """
    (Internal) Return whether a given process name is running.

    :param process_name: The name of the process.
    :return: True (running) or False (no).
    """
    for p in psutil.process_iter():
        for args in p.cmdline:
            # BUGFIX: is_running is a method; the original referenced the
            # bound method ('p.is_running'), which is always truthy, so the
            # liveness check never actually ran.
            if os.path.basename(args) == process_name and p.is_running():
                # Found a live match; no need to scan remaining processes.
                return True
    return False


def get_server_status():
    """
    Get autotest server system information.

    :return: Dict with keys:
             * 'used_space_logs' Autotest log directory disk usage (percent)
             * 'scheduler_running' Whether the autotest scheduler is running
             * 'scheduler_watcher_running' Whether the scheduler watcher is
                running
             * 'install_server_running' Whether the install server responds
             * 'concerns' Global evaluation of whether there are problems to
                be addressed
    """
    server_status = {}
    concerns = False
    # Settings key spelling ('treshold') is kept as-is: it is the name users
    # have in their config files.
    disk_threshold = int(settings.get_value('SERVER',
                                            'logs_disk_usage_treshold',
                                            default="80"))
    used_space_logs = _get_logs_used_space()
    if used_space_logs > disk_threshold:
        concerns = True
    server_status['used_space_logs'] = used_space_logs
    scheduler_running = _process_running('autotest-scheduler')
    if not scheduler_running:
        concerns = True
    server_status['scheduler_running'] = scheduler_running
    watcher_running = _process_running('autotest-scheduler-watcher')
    if not watcher_running:
        concerns = True
    server_status['scheduler_watcher_running'] = watcher_running
    if settings.get_value('INSTALL_SERVER', 'xmlrpc_url', default=''):
        # An install server is configured; probe it for profiles to see
        # whether it actually answers.
        install_server_running = get_install_server_profiles() is not None
        if not install_server_running:
            concerns = True
    else:
        install_server_running = False
    server_status['install_server_running'] = install_server_running
    server_status['concerns'] = concerns
    # NOTE(review): the scraped source is truncated here; the built status
    # dict must be returned to the RPC caller — confirm against upstream.
    return server_status
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
