...47 query = models.TestView.apply_presentation(query, filter_data)48 group_processor = tko_rpc_utils.GroupDataProcessor(query, group_by,49 header_groups or [],50 fixed_headers or {})51 group_processor.process_group_dicts()52 return rpc_utils.prepare_for_serialization(group_processor.get_info_dict())53def get_num_groups(group_by, **filter_data):54 """55 Gets the count of unique groups with the given grouping fields.56 """57 query = models.TestView.objects.get_query_set_with_joins(filter_data)58 query = models.TestView.query_objects(filter_data, initial_query=query)59 return models.TestView.objects.get_num_groups(query, group_by)60def get_status_counts(group_by, header_groups=[], fixed_headers={},61 **filter_data):62 """63 Like get_group_counts, but also computes counts of passed, complete (and64 valid), and incomplete tests, stored in keys "pass_count', 'complete_count',65 and 'incomplete_count', respectively.66 """67 return get_group_counts(group_by, header_groups=header_groups,68 fixed_headers=fixed_headers,69 extra_select_fields=tko_rpc_utils.STATUS_FIELDS,70 **filter_data)71def get_latest_tests(group_by, header_groups=[], fixed_headers={},72 extra_info=[], **filter_data):73 """74 Similar to get_status_counts, but return only the latest test result per75 group. It still returns the same information (i.e. with pass count etc.)76 for compatibility. It includes an additional field "test_idx" with each77 group.78 @param extra_info a list containing the field names that should be returned79 with each cell. The fields are returned in the extra_info80 field of the return dictionary.81 """82 # find latest test per group83 initial_query = models.TestView.objects.get_query_set_with_joins(84 filter_data)85 query = models.TestView.query_objects(filter_data,86 initial_query=initial_query,87 apply_presentation=False)88 query = query.exclude(status__in=tko_rpc_utils._INVALID_STATUSES)89 query = query.extra(90 select={'latest_test_idx' : 'MAX(%s)' %91 models.TestView.objects.get_key_on_this_table('test_idx')})92 query = models.TestView.apply_presentation(query, filter_data)93 group_processor = tko_rpc_utils.GroupDataProcessor(query, group_by,94 header_groups,95 fixed_headers)96 group_processor.process_group_dicts()97 info = group_processor.get_info_dict()98 # fetch full info for these tests so we can access their statuses99 all_test_ids = [group['latest_test_idx'] for group in info['groups']]100 test_views = initial_query.in_bulk(all_test_ids)101 for group_dict in info['groups']:102 test_idx = group_dict.pop('latest_test_idx')103 group_dict['test_idx'] = test_idx104 test_view = test_views[test_idx]105 tko_rpc_utils.add_status_counts(group_dict, test_view.status)106 group_dict['extra_info'] = []107 for field in extra_info:108 group_dict['extra_info'].append(getattr(test_view, field))109 return rpc_utils.prepare_for_serialization(info)110def get_job_ids(**filter_data):...

...146 in zip(self._header_index_maps,147 group_dict['header_values'])]148 for field in self._group_by + ['header_values']:149 del group_dict[field]150 def process_group_dicts(self):151 self._fetch_data()152 if len(self._group_dicts) > self._MAX_GROUP_RESULTS:153 raise TooManyRowsError(154 'Query yielded %d rows, exceeding maximum %d' % (155 len(self._group_dicts), self._MAX_GROUP_RESULTS))156 for group_dict in self._group_dicts:157 self._process_group_dict(group_dict)158 self._header_values = self._get_sorted_header_values()159 if self._header_groups:160 for group_dict in self._group_dicts:161 self._replace_headers_with_indices(group_dict)162 def get_info_dict(self):163 return {'groups' : self._group_dicts,164 'header_values' : self._header_values}

