Best Python code snippet using autotest_python
rpc_interface.py
Source:rpc_interface.py  
...
#
INTERFACE_VERSION = (2013, 05, 23)

# table/spreadsheet view support

def get_test_views(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.TestView.list_objects(filter_data))

def get_num_test_views(**filter_data):
    return models.TestView.query_count(filter_data)

def get_group_counts(group_by, header_groups=None, fixed_headers=None,
                     extra_select_fields=None, **filter_data):
    """
    Queries against TestView, grouping by the specified fields and computing
    counts for each group.
    * group_by should be a list of field names.
    * extra_select_fields can be used to specify additional fields to select
      (usually for aggregate functions).
    * header_groups can be used to get lists of unique combinations of group
      fields.  It should be a list of tuples of fields from group_by.  It's
      primarily for use by the spreadsheet view.
    * fixed_headers can map header fields to lists of values.  The header is
      guaranteed to return exactly those values.  This does not work together
      with header_groups.
    Returns a dictionary with two keys:
    * header_values contains a list of lists, one for each header group in
      header_groups.  Each list contains all the values for the corresponding
      header group as tuples.
    * groups contains a list of dicts, one for each row.  Each dict contains
      keys for each of the group_by fields, plus a 'group_count' key for the
      total count in the group, plus keys for each of the extra_select_fields.
      The keys for the extra_select_fields are determined by the "AS" alias of
      the field.
    """
    query = models.TestView.objects.get_query_set_with_joins(filter_data)
    # don't apply presentation yet, since we have extra selects to apply
    query = models.TestView.query_objects(filter_data, initial_query=query,
                                          apply_presentation=False)
    count_alias, count_sql = models.TestView.objects.get_count_sql(query)
    query = query.extra(select={count_alias: count_sql})
    if extra_select_fields:
        query = query.extra(select=extra_select_fields)
    query = models.TestView.apply_presentation(query, filter_data)
    group_processor = tko_rpc_utils.GroupDataProcessor(query, group_by,
                                                       header_groups or [],
                                                       fixed_headers or {})
    group_processor.process_group_dicts()
    return rpc_utils.prepare_for_serialization(group_processor.get_info_dict())

def get_num_groups(group_by, **filter_data):
    """
    Gets the count of unique groups with the given grouping fields.
    """
    query = models.TestView.objects.get_query_set_with_joins(filter_data)
    query = models.TestView.query_objects(filter_data, initial_query=query)
    return models.TestView.objects.get_num_groups(query, group_by)

def get_status_counts(group_by, header_groups=[], fixed_headers={},
                      **filter_data):
    """
    Like get_group_counts, but also computes counts of passed, complete (and
    valid), and incomplete tests, stored in keys 'pass_count', 'complete_count',
    and 'incomplete_count', respectively.
    """
    return get_group_counts(group_by, header_groups=header_groups,
                            fixed_headers=fixed_headers,
                            extra_select_fields=tko_rpc_utils.STATUS_FIELDS,
                            **filter_data)

def get_latest_tests(group_by, header_groups=[], fixed_headers={},
                     extra_info=[], **filter_data):
    """
    Similar to get_status_counts, but returns only the latest test result per
    group.  It still returns the same information (i.e. with pass count etc.)
    for compatibility.  It includes an additional field "test_idx" with each
    group.
    :param extra_info: a list containing the field names that should be
                       returned with each cell.  The fields are returned in
                       the extra_info field of the return dictionary.
    """
    # find latest test per group
    initial_query = models.TestView.objects.get_query_set_with_joins(
        filter_data)
    query = models.TestView.query_objects(filter_data,
                                          initial_query=initial_query,
                                          apply_presentation=False)
    query = query.exclude(status__in=tko_rpc_utils._INVALID_STATUSES)
    query = query.extra(
        select={'latest_test_idx': 'MAX(%s)' %
                models.TestView.objects.get_key_on_this_table('test_idx')})
    query = models.TestView.apply_presentation(query, filter_data)
    group_processor = tko_rpc_utils.GroupDataProcessor(query, group_by,
                                                       header_groups,
                                                       fixed_headers)
    group_processor.process_group_dicts()
    info = group_processor.get_info_dict()
    # fetch full info for these tests so we can access their statuses
    all_test_ids = [group['latest_test_idx'] for group in info['groups']]
    test_views = initial_query.in_bulk(all_test_ids)
    for group_dict in info['groups']:
        test_idx = group_dict.pop('latest_test_idx')
        group_dict['test_idx'] = test_idx
        test_view = test_views[test_idx]
        tko_rpc_utils.add_status_counts(group_dict, test_view.status)
        group_dict['extra_info'] = []
        for field in extra_info:
            group_dict['extra_info'].append(getattr(test_view, field))
    return rpc_utils.prepare_for_serialization(info)

def get_job_ids(**filter_data):
    """
    Returns AFE job IDs for all tests matching the filters.
    """
    query = models.TestView.query_objects(filter_data)
    job_ids = set()
    for test_view in query.values('job_tag').distinct():
        # extract job ID from tag
        first_tag_component = test_view['job_tag'].split('-')[0]
        try:
            job_id = int(first_tag_component)
            job_ids.add(job_id)
        except ValueError:
            # a nonstandard job tag, i.e. from contributed results
            pass
    return list(job_ids)

# test detail view

def _attributes_to_dict(attribute_list):
    return dict((attribute.attribute, attribute.value)
                for attribute in attribute_list)

def _iteration_attributes_to_dict(attribute_list):
    iter_keyfunc = operator.attrgetter('iteration')
    attribute_list.sort(key=iter_keyfunc)
    iterations = {}
    for key, group in itertools.groupby(attribute_list, iter_keyfunc):
        iterations[key] = _attributes_to_dict(group)
    return iterations

def _format_iteration_keyvals(test):
    iteration_attr = _iteration_attributes_to_dict(test.iteration_attributes)
    iteration_perf = _iteration_attributes_to_dict(test.iteration_results)
    all_iterations = iteration_attr.keys() + iteration_perf.keys()
    max_iterations = max(all_iterations + [0])
    # merge the iterations into a single list of attr & perf dicts
    return [{'attr': iteration_attr.get(index, {}),
             'perf': iteration_perf.get(index, {})}
            for index in xrange(1, max_iterations + 1)]

def _job_keyvals_to_dict(keyvals):
    return dict((keyval.key, keyval.value) for keyval in keyvals)

def get_detailed_test_views(**filter_data):
    test_views = models.TestView.list_objects(filter_data)
    tests_by_id = models.Test.objects.in_bulk([test_view['test_idx']
                                               for test_view in test_views])
    tests = tests_by_id.values()
    models.Test.objects.populate_relationships(tests, models.TestAttribute,
                                               'attributes')
    models.Test.objects.populate_relationships(tests, models.IterationAttribute,
                                               'iteration_attributes')
    models.Test.objects.populate_relationships(tests, models.IterationResult,
                                               'iteration_results')
    models.Test.objects.populate_relationships(tests, models.TestLabel,
                                               'labels')
    jobs_by_id = models.Job.objects.in_bulk([test_view['job_idx']
                                             for test_view in test_views])
    jobs = jobs_by_id.values()
    models.Job.objects.populate_relationships(jobs, models.JobKeyval,
                                              'keyvals')
    for test_view in test_views:
        test = tests_by_id[test_view['test_idx']]
        test_view['attributes'] = _attributes_to_dict(test.attributes)
        test_view['iterations'] = _format_iteration_keyvals(test)
        test_view['labels'] = [label.name for label in test.labels]
        job = jobs_by_id[test_view['job_idx']]
        test_view['job_keyvals'] = _job_keyvals_to_dict(job.keyvals)
    return rpc_utils.prepare_for_serialization(test_views)

# graphing view support

def get_hosts_and_tests():
    """
    Gets every host that has had a benchmark run on it.  Also returns a
    dictionary mapping the host names to the benchmarks.
    """
    host_info = {}
    q = (dbmodels.Q(test_name__startswith='kernbench') |
         dbmodels.Q(test_name__startswith='dbench') |
         dbmodels.Q(test_name__startswith='tbench') |
         dbmodels.Q(test_name__startswith='unixbench') |
         dbmodels.Q(test_name__startswith='iozone'))
    test_query = models.TestView.objects.filter(q).values(
        'test_name', 'hostname', 'machine_idx').distinct()
    for result_dict in test_query:
        hostname = result_dict['hostname']
        test = result_dict['test_name']
        machine_idx = result_dict['machine_idx']
        host_info.setdefault(hostname, {})
        host_info[hostname].setdefault('tests', [])
        host_info[hostname]['tests'].append(test)
        host_info[hostname]['id'] = machine_idx
    return rpc_utils.prepare_for_serialization(host_info)

def create_metrics_plot(queries, plot, invert, drilldown_callback,
                        normalize=None):
    return graphing_utils.create_metrics_plot(
        queries, plot, invert, normalize, drilldown_callback=drilldown_callback)

def create_qual_histogram(query, filter_string, interval, drilldown_callback):
    return graphing_utils.create_qual_histogram(
        query, filter_string, interval, drilldown_callback=drilldown_callback)

# TODO(showard) - this extremely generic RPC is used only by one place in the
# client.  We should come up with a more opaque RPC for that place to call and
# get rid of this.
def execute_query_with_param(query, param):
    cursor = readonly_connection.connection().cursor()
    cursor.execute(query, param)
    return cursor.fetchall()

def get_preconfig(name, type):
    return preconfigs.manager.get_preconfig(name, type)

def get_embedding_id(url_token, graph_type, params):
    try:
        model = models.EmbeddedGraphingQuery.objects.get(url_token=url_token)
    except models.EmbeddedGraphingQuery.DoesNotExist:
        params_str = pickle.dumps(params)
        now = datetime.datetime.now()
        # pylint: disable=E1123
        model = models.EmbeddedGraphingQuery(url_token=url_token,
                                             graph_type=graph_type,
                                             params=params_str,
                                             last_updated=now)
        model.cached_png = graphing_utils.create_embedded_plot(model,
                                                               now.ctime())
        model.save()
    return model.id

def get_embedded_query_url_token(id):
    model = models.EmbeddedGraphingQuery.objects.get(id=id)
    return model.url_token

# test label management

def add_test_label(name, description=None):
    return models.TestLabel.add_object(name=name, description=description).id

def modify_test_label(label_id, **data):
    models.TestLabel.smart_get(label_id).update_object(data)

def delete_test_label(label_id):
    models.TestLabel.smart_get(label_id).delete()

def get_test_labels(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.TestLabel.list_objects(filter_data))

def get_test_labels_for_tests(**test_filter_data):
    label_ids = models.TestView.objects.query_test_label_ids(test_filter_data)
    labels = models.TestLabel.list_objects({'id__in': label_ids})
    return rpc_utils.prepare_for_serialization(labels)

def test_label_add_tests(label_id, **test_filter_data):
    test_ids = models.TestView.objects.query_test_ids(test_filter_data)
    models.TestLabel.smart_get(label_id).tests.add(*test_ids)

def test_label_remove_tests(label_id, **test_filter_data):
    label = models.TestLabel.smart_get(label_id)
    # only include tests that actually have this label
    extra_where = test_filter_data.get('extra_where', '')
    if extra_where:
        extra_where = '(' + extra_where + ') AND '
    extra_where += 'tko_test_labels.id = %s' % label.id
    test_filter_data['extra_where'] = extra_where
    test_ids = models.TestView.objects.query_test_ids(test_filter_data)
    label.tests.remove(*test_ids)

# user-created test attributes

def set_test_attribute(attribute, value, **test_filter_data):
    """
    * attribute - string name of attribute
    * value - string, or None to delete an attribute
    * test_filter_data - filter data to apply to TestView to choose tests to
      act upon
    """
    assert test_filter_data  # disallow accidental actions on all hosts
    test_ids = models.TestView.objects.query_test_ids(test_filter_data)
    tests = models.Test.objects.in_bulk(test_ids)
    for test in tests.itervalues():
        test.set_or_delete_attribute(attribute, value)

# saved queries

def get_saved_queries(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.SavedQuery.list_objects(filter_data))

def add_saved_query(name, url_token):
    name = name.strip()
    owner = afe_models.User.current_user().login
    existing_list = list(models.SavedQuery.objects.filter(owner=owner,
                                                          name=name))
    if existing_list:
        query_object = existing_list[0]
        query_object.url_token = url_token
        query_object.save()
        return query_object.id
    return models.SavedQuery.add_object(owner=owner, name=name,
                                        url_token=url_token).id

def delete_saved_queries(id_list):
    user = afe_models.User.current_user().login
    query = models.SavedQuery.objects.filter(id__in=id_list, owner=user)
    if query.count() == 0:
        raise model_logic.ValidationError('No such queries found for this user')
    query.delete()

# other

def get_motd():
    return rpc_utils.get_motd()

def get_static_data():
    result = {}
    group_fields = []
    for field in models.TestView.group_fields:
        if field in models.TestView.extra_fields:
            name = models.TestView.extra_fields[field]
        else:
            name = models.TestView.get_field_dict()[field].verbose_name
        group_fields.append((name.capitalize(), field))
    model_fields = [(field.verbose_name.capitalize(), field.column)
                    for field in models.TestView._meta.fields]
    extra_fields = [(field_name.capitalize(), field_sql)
                    for field_sql, field_name
                    in models.TestView.extra_fields.iteritems()]
    benchmark_key = {
        'kernbench': 'elapsed',
        'dbench': 'throughput',
        'tbench': 'throughput',
        'unixbench': 'score',
        'iozone': '32768-4096-fwrite'
    }
    tko_perf_view = [
        ['Test Index', 'test_idx'],
        ['Job Index', 'job_idx'],
        ['Test Name', 'test_name'],
        ['Subdirectory', 'subdir'],
        ['Kernel Index', 'kernel_idx'],
        ['Status Index', 'status_idx'],
        ['Reason', 'reason'],
        ['Host Index', 'machine_idx'],
        ['Test Started Time', 'test_started_time'],
        ['Test Finished Time', 'test_finished_time'],
        ['Job Tag', 'job_tag'],
        ['Job Name', 'job_name'],
        ['Owner', 'job_owner'],
        ['Job Queued Time', 'job_queued_time'],
        ['Job Started Time', 'job_started_time'],
        ['Job Finished Time', 'job_finished_time'],
        ['Hostname', 'hostname'],
        ['Platform', 'platform'],
        ['Machine Owner', 'machine_owner'],
        ['Kernel Hash', 'kernel_hash'],
        ['Kernel Base', 'kernel_base'],
        ['Kernel', 'kernel'],
        ['Status', 'status'],
        ['Iteration Number', 'iteration'],
        ['Performance Keyval (Key)', 'iteration_key'],
        ['Performance Keyval (Value)', 'iteration_value'],
    ]
    result['group_fields'] = sorted(group_fields)
    result['all_fields'] = sorted(model_fields + extra_fields)
    result['test_labels'] = get_test_labels(sort_by=['name'])
    result['current_user'] = rpc_utils.prepare_for_serialization(
        afe_models.User.current_user().get_object_dict())
    result['benchmark_key'] = benchmark_key
    result['tko_perf_view'] = tko_perf_view
    result['tko_test_view'] = model_fields
    result['preconfigs'] = preconfigs.manager.all_preconfigs()
    result['motd'] = rpc_utils.get_motd()
    return result

# lower level access to tko models

def get_machines(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Machine.list_objects(filter_data))

def get_kernels(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Kernel.list_objects(filter_data))

def get_patches(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Patch.list_objects(filter_data))

def get_statuses(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Status.list_objects(filter_data))

def get_jobs(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Job.list_objects(filter_data))

def get_job_keyvals(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.JobKeyval.list_objects(filter_data))

def get_tests(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.Test.list_objects(filter_data))

def get_test_attributes(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.TestAttribute.list_objects(filter_data))

def get_iteration_attributes(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.IterationAttribute.list_objects(filter_data))

def get_iteration_results(**filter_data):
    return rpc_utils.prepare_for_serialization(
        models.IterationResult.list_objects(filter_data))

def get_interface_version():
...
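A minimal usage sketch for the grouping RPCs above, based on the get_group_counts and get_status_counts docstrings. The direct call and the filter keywords are illustrative assumptions; in a deployed Autotest server these functions are normally reached through the TKO JSON-RPC proxy, and filter_data keywords are passed straight through as TestView filters.

# hypothetical direct call; 'test_name' and 'hostname' are TestView fields
counts = get_status_counts(group_by=['test_name', 'hostname'],
                           test_name='kernbench')
for group in counts['groups']:
    # each group dict carries the group_by keys plus the aggregate counters
    print '%s on %s: %s passed, %s complete, %s incomplete' % (
        group['test_name'], group['hostname'], group['pass_count'],
        group['complete_count'], group['incomplete_count'])

Per the docstring, counts['header_values'] holds one list per requested header group, each containing the unique value tuples the spreadsheet view uses to build its axes.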
controller.py
Source:controller.py
...
        self.inventory_client = inventory_client
        self.jobs_collection = jobs_collection
        self.tasks_collection = tasks_collection

    @staticmethod
    def prepare_for_serialization(obj):
        """Converts object_id to a string and a datetime object to ctime format
        :param obj: probably a task or a job document
        :return: the object reference
        """
        serialize_object_id(obj)
        if obj.get('ttl_time_completed'):
            obj['ttl_time_completed'] = obj['ttl_time_completed'].ctime()
        return obj

    @async_endpoint('get_job')
    async def get_job(self, job_id, projection=None):
        """Gets a job from the job_collection. Jobs expire quickly.
        :param job_id: The id of the job to get
        :param projection: A mongodb projection. https://goo.gl/kB2g26
        :return: A job object
        """
        job = await self.jobs_collection.find_one({'job_id': job_id},
                                                  projection=projection)
        if not job:
            return
        return self.prepare_for_serialization(job)

    @async_endpoint('get_job_status')
    async def get_job_status(self, job_id):
        """Get the status of job tasks
        :param job_id: the id of a job
        :return: Job object containing task status objects, or None
        """
        error_states = ['ERROR', 'TIMEOUT', 'EXCEPTION']
        job = await self.jobs_collection.find_one({'job_id': job_id})
        if not job:
            return
        tasks = self.tasks_collection.find(
            {'job_id': job_id}, {'task_id': 1, 'status': 1, '_id': 0})
        job['has_failures'] = False
        job['tasks'] = []
        async for task in tasks:
            job['tasks'].append(serialize_object_id(task))
            if task['status'] in error_states:
                job['has_failures'] = True
        return self.prepare_for_serialization(job)

    @async_endpoint('get_job_tasks')
    async def get_job_tasks(self, job_id, projection=None):
        """Get tasks belonging to a job
        :param job_id: The id of a job (UUID)
        :param projection: A mongodb projection. https://goo.gl/kB2g26
        :return: dictionary containing top level keys count and tasks
        """
        c = self.tasks_collection.find({'job_id': job_id},
                                       projection=projection)
        count = await c.count()
        tasks = []
        async for task in c:
            tasks.append(self.prepare_for_serialization(task))
        return {'count': count, 'tasks': tasks}

    @async_endpoint('get_task')
    async def get_task(self, task_id):
        """Get a single task
        :param task_id: The id of the task (UUID)
        :return: The task object (dict)
        """
        task = await self.tasks_collection.find_one({'task_id': task_id})
        if not task:
            return
        return self.prepare_for_serialization(task)

    @async_endpoint('get_active_tasks_by_mercury_id')
    async def get_active_tasks_by_mercury_id(self, mercury_id):
        """Gets all active tasks associated with the specified mercury id
        :param mercury_id:
        :return: dictionary containing tasks list and count
        """
        c = self.tasks_collection.find({'mercury_id': mercury_id,
                                        'time_completed': None})
        count = await c.count()
        tasks = []
        async for task in c:
            tasks.append(self.prepare_for_serialization(task))
        return {'count': count, 'tasks': tasks}

    @async_endpoint('get_jobs')
    async def get_jobs(self, projection=None):
        """Get active jobs. The jobs collection is made ephemeral via a ttl key;
        this collection should not grow very large
        :param projection: A mongodb projection. https://goo.gl/kB2g26
        :return: dictionary containing top level keys count and jobs
        """
        projection = projection or {'instruction': 0}
        c = self.jobs_collection.find({}, projection=projection).sort(
            'time_created', 1)
        count = await c.count()
        jobs = []
        async for job in c:
            jobs.append(self.prepare_for_serialization(job))
        return {'count': count, 'jobs': jobs}

    @async_endpoint('create_job')
    async def create_job(self, query, instruction):
        """Create a job
        :param query: Query representing targets of the instruction
        :param instruction: An instruction or preprocessor directive. See the
        full documentation regarding instruction syntax at
        http://jr0d.github.io/mercury_api
        :raises EndpointException: Raised after catching a MercuryUserError so
        as to conform to dispatch semantics
        :return: The job_id or None
        """
        # Add a constraint to the query that ensures we only match 'active'
        # devices (devices that are accessible through an agent)
...
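The methods above are coroutines over Motor (async MongoDB) collections, so a caller needs an event loop. The sketch below is illustrative only: the BackendController class name, its constructor keywords, and the connection string are assumptions, and it presumes the async_endpoint decorator leaves the methods directly awaitable; only the method names and return shapes come from the snippet above.

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def main():
    db = AsyncIOMotorClient('mongodb://localhost:27017')['mercury']
    # hypothetical controller construction, mirroring the attributes set in __init__
    controller = BackendController(inventory_client=None,
                                   jobs_collection=db['jobs'],
                                   tasks_collection=db['tasks'])
    status = await controller.get_job_status('some-job-uuid')
    if status is None:
        print('job expired or was never created')
    else:
        print(status['has_failures'], len(status['tasks']))

asyncio.run(main())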
mixins.py
Source:mixins.py
...
        bool,
        dict,
        type(None),
    )

    def prepare_for_serialization(self):
        class_name = self.__class__.__name__
        dict_ = {}
        for k, v in self.__dict__.items():
            if type(v) in self.serializable_types:
                dict_[k] = v
            elif isinstance(v, Jsonable):
                dict_[k] = v.prepare_for_serialization()
            else:
                raise ValueError('{} is not Serializable!'.format(v))
        return {'class_name': class_name, 'dict': dict_}

    def to_json(self):
        return json.dumps(self.prepare_for_serialization(), indent=4)

    @classmethod
    def from_json(cls, json_str):
        json_dict = json.loads(json_str)
        string_class_name = json_dict['class_name']
        current_class_name = cls.__name__
        if string_class_name == current_class_name:
            return cls(json_dict)
        else:
            raise ValueError('type {} is different to type {}!'.format(
                string_class_name, current_class_name))

    def to_xml(self):
        return self.prepare_for_serialization()

class Panda(Jsonable):
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return self.__class__.__name__ == other.__class__.__name__

class Person(Jsonable):
    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return self.name == other.name

p = Panda(name='Ivo')
# print(p.to_json())
# json_string = p.to_json()
print(p.to_xml())
...
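The Panda and Person classes above make the recursion in prepare_for_serialization easy to see: any attribute that is itself a Jsonable is serialized into its own {'class_name': ..., 'dict': ...} envelope. The pet attribute in this short sketch is an illustrative addition, not part of the original snippet.

owner = Person(name='Ana')
owner.pet = Panda(name='Ivo')   # nested Jsonable instances serialize recursively
print(owner.to_xml())
# {'class_name': 'Person', 'dict': {'name': 'Ana',
#  'pet': {'class_name': 'Panda', 'dict': {'name': 'Ivo'}}}}

Note that from_json only checks that the stored class_name matches the target class and then passes the whole decoded dictionary to the constructor, so a lossless round trip needs a constructor that accepts that dictionary.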
