How to use the prepare_for_serialization method in autotest

Best Python code snippet using autotest_python

rpc_interface.py

Source:rpc_interface.py Github

copy

Full Screen

...13#14INTERFACE_VERSION = (2013, 05, 23)15# table/spreadsheet view support16def get_test_views(**filter_data):17 return rpc_utils.prepare_for_serialization(18 models.TestView.list_objects(filter_data))19def get_num_test_views(**filter_data):20 return models.TestView.query_count(filter_data)21def get_group_counts(group_by, header_groups=None, fixed_headers=None,22 extra_select_fields=None, **filter_data):23 """24 Queries against TestView grouping by the specified fields and computings25 counts for each group.26 * group_by should be a list of field names.27 * extra_select_fields can be used to specify additional fields to select28 (usually for aggregate functions).29 * header_groups can be used to get lists of unique combinations of group30 fields. It should be a list of tuples of fields from group_by. It's31 primarily for use by the spreadsheet view.32 * fixed_headers can map header fields to lists of values. the header will33 guaranteed to return exactly those value. this does not work together34 with header_groups.35 Returns a dictionary with two keys:36 * header_values contains a list of lists, one for each header group in37 header_groups. Each list contains all the values for the corresponding38 header group as tuples.39 * groups contains a list of dicts, one for each row. 
Each dict contains40 keys for each of the group_by fields, plus a 'group_count' key for the41 total count in the group, plus keys for each of the extra_select_fields.42 The keys for the extra_select_fields are determined by the "AS" alias of43 the field.44 """45 query = models.TestView.objects.get_query_set_with_joins(filter_data)46 # don't apply presentation yet, since we have extra selects to apply47 query = models.TestView.query_objects(filter_data, initial_query=query,48 apply_presentation=False)49 count_alias, count_sql = models.TestView.objects.get_count_sql(query)50 query = query.extra(select={count_alias: count_sql})51 if extra_select_fields:52 query = query.extra(select=extra_select_fields)53 query = models.TestView.apply_presentation(query, filter_data)54 group_processor = tko_rpc_utils.GroupDataProcessor(query, group_by,55 header_groups or [],56 fixed_headers or {})57 group_processor.process_group_dicts()58 return rpc_utils.prepare_for_serialization(group_processor.get_info_dict())59def get_num_groups(group_by, **filter_data):60 """61 Gets the count of unique groups with the given grouping fields.62 """63 query = models.TestView.objects.get_query_set_with_joins(filter_data)64 query = models.TestView.query_objects(filter_data, initial_query=query)65 return models.TestView.objects.get_num_groups(query, group_by)66def get_status_counts(group_by, header_groups=[], fixed_headers={},67 **filter_data):68 """69 Like get_group_counts, but also computes counts of passed, complete (and70 valid), and incomplete tests, stored in keys "pass_count', 'complete_count',71 and 'incomplete_count', respectively.72 """73 return get_group_counts(group_by, header_groups=header_groups,74 fixed_headers=fixed_headers,75 extra_select_fields=tko_rpc_utils.STATUS_FIELDS,76 **filter_data)77def get_latest_tests(group_by, header_groups=[], fixed_headers={},78 extra_info=[], **filter_data):79 """80 Similar to get_status_counts, but return only the latest test result per81 group. 
It still returns the same information (i.e. with pass count etc.)82 for compatibility. It includes an additional field "test_idx" with each83 group.84 :param extra_info a list containing the field names that should be returned85 with each cell. The fields are returned in the extra_info86 field of the return dictionary.87 """88 # find latest test per group89 initial_query = models.TestView.objects.get_query_set_with_joins(90 filter_data)91 query = models.TestView.query_objects(filter_data,92 initial_query=initial_query,93 apply_presentation=False)94 query = query.exclude(status__in=tko_rpc_utils._INVALID_STATUSES)95 query = query.extra(96 select={'latest_test_idx': 'MAX(%s)' %97 models.TestView.objects.get_key_on_this_table('test_idx')})98 query = models.TestView.apply_presentation(query, filter_data)99 group_processor = tko_rpc_utils.GroupDataProcessor(query, group_by,100 header_groups,101 fixed_headers)102 group_processor.process_group_dicts()103 info = group_processor.get_info_dict()104 # fetch full info for these tests so we can access their statuses105 all_test_ids = [group['latest_test_idx'] for group in info['groups']]106 test_views = initial_query.in_bulk(all_test_ids)107 for group_dict in info['groups']:108 test_idx = group_dict.pop('latest_test_idx')109 group_dict['test_idx'] = test_idx110 test_view = test_views[test_idx]111 tko_rpc_utils.add_status_counts(group_dict, test_view.status)112 group_dict['extra_info'] = []113 for field in extra_info:114 group_dict['extra_info'].append(getattr(test_view, field))115 return rpc_utils.prepare_for_serialization(info)116def get_job_ids(**filter_data):117 """118 Returns AFE job IDs for all tests matching the filters.119 """120 query = models.TestView.query_objects(filter_data)121 job_ids = set()122 for test_view in query.values('job_tag').distinct():123 # extract job ID from tag124 first_tag_component = test_view['job_tag'].split('-')[0]125 try:126 job_id = int(first_tag_component)127 job_ids.add(job_id)128 except 
ValueError:129 # a nonstandard job tag, i.e. from contributed results130 pass131 return list(job_ids)132# test detail view133def _attributes_to_dict(attribute_list):134 return dict((attribute.attribute, attribute.value)135 for attribute in attribute_list)136def _iteration_attributes_to_dict(attribute_list):137 iter_keyfunc = operator.attrgetter('iteration')138 attribute_list.sort(key=iter_keyfunc)139 iterations = {}140 for key, group in itertools.groupby(attribute_list, iter_keyfunc):141 iterations[key] = _attributes_to_dict(group)142 return iterations143def _format_iteration_keyvals(test):144 iteration_attr = _iteration_attributes_to_dict(test.iteration_attributes)145 iteration_perf = _iteration_attributes_to_dict(test.iteration_results)146 all_iterations = iteration_attr.keys() + iteration_perf.keys()147 max_iterations = max(all_iterations + [0])148 # merge the iterations into a single list of attr & perf dicts149 return [{'attr': iteration_attr.get(index, {}),150 'perf': iteration_perf.get(index, {})}151 for index in xrange(1, max_iterations + 1)]152def _job_keyvals_to_dict(keyvals):153 return dict((keyval.key, keyval.value) for keyval in keyvals)154def get_detailed_test_views(**filter_data):155 test_views = models.TestView.list_objects(filter_data)156 tests_by_id = models.Test.objects.in_bulk([test_view['test_idx']157 for test_view in test_views])158 tests = tests_by_id.values()159 models.Test.objects.populate_relationships(tests, models.TestAttribute,160 'attributes')161 models.Test.objects.populate_relationships(tests, models.IterationAttribute,162 'iteration_attributes')163 models.Test.objects.populate_relationships(tests, models.IterationResult,164 'iteration_results')165 models.Test.objects.populate_relationships(tests, models.TestLabel,166 'labels')167 jobs_by_id = models.Job.objects.in_bulk([test_view['job_idx']168 for test_view in test_views])169 jobs = jobs_by_id.values()170 models.Job.objects.populate_relationships(jobs, models.JobKeyval,171 
'keyvals')172 for test_view in test_views:173 test = tests_by_id[test_view['test_idx']]174 test_view['attributes'] = _attributes_to_dict(test.attributes)175 test_view['iterations'] = _format_iteration_keyvals(test)176 test_view['labels'] = [label.name for label in test.labels]177 job = jobs_by_id[test_view['job_idx']]178 test_view['job_keyvals'] = _job_keyvals_to_dict(job.keyvals)179 return rpc_utils.prepare_for_serialization(test_views)180# graphing view support181def get_hosts_and_tests():182 """183 Gets every host that has had a benchmark run on it. Additionally, also184 gets a dictionary mapping the host names to the benchmarks.185 """186 host_info = {}187 q = (dbmodels.Q(test_name__startswith='kernbench') |188 dbmodels.Q(test_name__startswith='dbench') |189 dbmodels.Q(test_name__startswith='tbench') |190 dbmodels.Q(test_name__startswith='unixbench') |191 dbmodels.Q(test_name__startswith='iozone'))192 test_query = models.TestView.objects.filter(q).values(193 'test_name', 'hostname', 'machine_idx').distinct()194 for result_dict in test_query:195 hostname = result_dict['hostname']196 test = result_dict['test_name']197 machine_idx = result_dict['machine_idx']198 host_info.setdefault(hostname, {})199 host_info[hostname].setdefault('tests', [])200 host_info[hostname]['tests'].append(test)201 host_info[hostname]['id'] = machine_idx202 return rpc_utils.prepare_for_serialization(host_info)203def create_metrics_plot(queries, plot, invert, drilldown_callback,204 normalize=None):205 return graphing_utils.create_metrics_plot(206 queries, plot, invert, normalize, drilldown_callback=drilldown_callback)207def create_qual_histogram(query, filter_string, interval, drilldown_callback):208 return graphing_utils.create_qual_histogram(209 query, filter_string, interval, drilldown_callback=drilldown_callback)210# TODO(showard) - this extremely generic RPC is used only by one place in the211# client. 
We should come up with a more opaque RPC for that place to call and212# get rid of this.213def execute_query_with_param(query, param):214 cursor = readonly_connection.connection().cursor()215 cursor.execute(query, param)216 return cursor.fetchall()217def get_preconfig(name, type):218 return preconfigs.manager.get_preconfig(name, type)219def get_embedding_id(url_token, graph_type, params):220 try:221 model = models.EmbeddedGraphingQuery.objects.get(url_token=url_token)222 except models.EmbeddedGraphingQuery.DoesNotExist:223 params_str = pickle.dumps(params)224 now = datetime.datetime.now()225 # pylint: disable=E1123226 model = models.EmbeddedGraphingQuery(url_token=url_token,227 graph_type=graph_type,228 params=params_str,229 last_updated=now)230 model.cached_png = graphing_utils.create_embedded_plot(model,231 now.ctime())232 model.save()233 return model.id234def get_embedded_query_url_token(id):235 model = models.EmbeddedGraphingQuery.objects.get(id=id)236 return model.url_token237# test label management238def add_test_label(name, description=None):239 return models.TestLabel.add_object(name=name, description=description).id240def modify_test_label(label_id, **data):241 models.TestLabel.smart_get(label_id).update_object(data)242def delete_test_label(label_id):243 models.TestLabel.smart_get(label_id).delete()244def get_test_labels(**filter_data):245 return rpc_utils.prepare_for_serialization(246 models.TestLabel.list_objects(filter_data))247def get_test_labels_for_tests(**test_filter_data):248 label_ids = models.TestView.objects.query_test_label_ids(test_filter_data)249 labels = models.TestLabel.list_objects({'id__in': label_ids})250 return rpc_utils.prepare_for_serialization(labels)251def test_label_add_tests(label_id, **test_filter_data):252 test_ids = models.TestView.objects.query_test_ids(test_filter_data)253 models.TestLabel.smart_get(label_id).tests.add(*test_ids)254def test_label_remove_tests(label_id, **test_filter_data):255 label = 
models.TestLabel.smart_get(label_id)256 # only include tests that actually have this label257 extra_where = test_filter_data.get('extra_where', '')258 if extra_where:259 extra_where = '(' + extra_where + ') AND '260 extra_where += 'tko_test_labels.id = %s' % label.id261 test_filter_data['extra_where'] = extra_where262 test_ids = models.TestView.objects.query_test_ids(test_filter_data)263 label.tests.remove(*test_ids)264# user-created test attributes265def set_test_attribute(attribute, value, **test_filter_data):266 """267 * attribute - string name of attribute268 * value - string, or None to delete an attribute269 * test_filter_data - filter data to apply to TestView to choose tests to act270 upon271 """272 assert test_filter_data # disallow accidental actions on all hosts273 test_ids = models.TestView.objects.query_test_ids(test_filter_data)274 tests = models.Test.objects.in_bulk(test_ids)275 for test in tests.itervalues():276 test.set_or_delete_attribute(attribute, value)277# saved queries278def get_saved_queries(**filter_data):279 return rpc_utils.prepare_for_serialization(280 models.SavedQuery.list_objects(filter_data))281def add_saved_query(name, url_token):282 name = name.strip()283 owner = afe_models.User.current_user().login284 existing_list = list(models.SavedQuery.objects.filter(owner=owner,285 name=name))286 if existing_list:287 query_object = existing_list[0]288 query_object.url_token = url_token289 query_object.save()290 return query_object.id291 return models.SavedQuery.add_object(owner=owner, name=name,292 url_token=url_token).id293def delete_saved_queries(id_list):294 user = afe_models.User.current_user().login295 query = models.SavedQuery.objects.filter(id__in=id_list, owner=user)296 if query.count() == 0:297 raise model_logic.ValidationError('No such queries found for this user')298 query.delete()299# other300def get_motd():301 return rpc_utils.get_motd()302def get_static_data():303 result = {}304 group_fields = []305 for field in 
models.TestView.group_fields:306 if field in models.TestView.extra_fields:307 name = models.TestView.extra_fields[field]308 else:309 name = models.TestView.get_field_dict()[field].verbose_name310 group_fields.append((name.capitalize(), field))311 model_fields = [(field.verbose_name.capitalize(), field.column)312 for field in models.TestView._meta.fields]313 extra_fields = [(field_name.capitalize(), field_sql)314 for field_sql, field_name315 in models.TestView.extra_fields.iteritems()]316 benchmark_key = {317 'kernbench': 'elapsed',318 'dbench': 'throughput',319 'tbench': 'throughput',320 'unixbench': 'score',321 'iozone': '32768-4096-fwrite'322 }323 tko_perf_view = [324 ['Test Index', 'test_idx'],325 ['Job Index', 'job_idx'],326 ['Test Name', 'test_name'],327 ['Subdirectory', 'subdir'],328 ['Kernel Index', 'kernel_idx'],329 ['Status Index', 'status_idx'],330 ['Reason', 'reason'],331 ['Host Index', 'machine_idx'],332 ['Test Started Time', 'test_started_time'],333 ['Test Finished Time', 'test_finished_time'],334 ['Job Tag', 'job_tag'],335 ['Job Name', 'job_name'],336 ['Owner', 'job_owner'],337 ['Job Queued Time', 'job_queued_time'],338 ['Job Started Time', 'job_started_time'],339 ['Job Finished Time', 'job_finished_time'],340 ['Hostname', 'hostname'],341 ['Platform', 'platform'],342 ['Machine Owner', 'machine_owner'],343 ['Kernel Hash', 'kernel_hash'],344 ['Kernel Base', 'kernel_base'],345 ['Kernel', 'kernel'],346 ['Status', 'status'],347 ['Iteration Number', 'iteration'],348 ['Performance Keyval (Key)', 'iteration_key'],349 ['Performance Keyval (Value)', 'iteration_value'],350 ]351 result['group_fields'] = sorted(group_fields)352 result['all_fields'] = sorted(model_fields + extra_fields)353 result['test_labels'] = get_test_labels(sort_by=['name'])354 result['current_user'] = rpc_utils.prepare_for_serialization(355 afe_models.User.current_user().get_object_dict())356 result['benchmark_key'] = benchmark_key357 result['tko_perf_view'] = tko_perf_view358 
result['tko_test_view'] = model_fields359 result['preconfigs'] = preconfigs.manager.all_preconfigs()360 result['motd'] = rpc_utils.get_motd()361 return result362# lower level access to tko models363def get_machines(**filter_data):364 return rpc_utils.prepare_for_serialization(365 models.Machine.list_objects(filter_data))366def get_kernels(**filter_data):367 return rpc_utils.prepare_for_serialization(368 models.Kernel.list_objects(filter_data))369def get_patches(**filter_data):370 return rpc_utils.prepare_for_serialization(371 models.Patch.list_objects(filter_data))372def get_statuses(**filter_data):373 return rpc_utils.prepare_for_serialization(374 models.Status.list_objects(filter_data))375def get_jobs(**filter_data):376 return rpc_utils.prepare_for_serialization(377 models.Job.list_objects(filter_data))378def get_job_keyvals(**filter_data):379 return rpc_utils.prepare_for_serialization(380 models.JobKeyval.list_objects(filter_data))381def get_tests(**filter_data):382 return rpc_utils.prepare_for_serialization(383 models.Test.list_objects(filter_data))384def get_test_attributes(**filter_data):385 return rpc_utils.prepare_for_serialization(386 models.TestAttribute.list_objects(filter_data))387def get_iteration_attributes(**filter_data):388 return rpc_utils.prepare_for_serialization(389 models.IterationAttribute.list_objects(filter_data))390def get_iteration_results(**filter_data):391 return rpc_utils.prepare_for_serialization(392 models.IterationResult.list_objects(filter_data))393def get_interface_version():...

Full Screen

Full Screen

controller.py

Source:controller.py Github

copy

Full Screen

...12 self.inventory_client = inventory_client13 self.jobs_collection = jobs_collection14 self.tasks_collection = tasks_collection15 @staticmethod16 def prepare_for_serialization(obj):17 """Converts object_id to a string and a datetime object to ctime format18 :param obj: probably a task or a job document19 :return: the object reference20 """21 serialize_object_id(obj)22 if obj.get('ttl_time_completed'):23 obj['ttl_time_completed'] = obj['ttl_time_completed'].ctime()24 return obj25 @async_endpoint('get_job')26 async def get_job(self, job_id, projection=None):27 """Gets a job from the job_collection. Jobs expire quickly.28 :param job_id: The Id of the job to get29 :param projection: A mongodb projection. https://goo.gl/kB2g2630 :return: A job object31 """32 job = await self.jobs_collection.find_one({'job_id': job_id},33 projection=projection)34 if not job:35 return36 return self.prepare_for_serialization(job)37 @async_endpoint('get_job_status')38 async def get_job_status(self, job_id):39 """Get the status of job tasks40 :param job_id: the id of a job41 :return: Job object contain task status objects or None42 """43 error_states = ['ERROR', 'TIMEOUT', 'EXCEPTION']44 job = await self.jobs_collection.find_one({'job_id': job_id})45 if not job:46 return47 tasks = self.tasks_collection.find(48 {'job_id': job_id}, {'task_id': 1, 'status': 1, '_id': 0})49 job['has_failures'] = False50 job['tasks'] = []51 async for task in tasks:52 job['tasks'].append(serialize_object_id(task))53 if task['status'] in error_states:54 job['has_failures'] = True55 return self.prepare_for_serialization(job)56 @async_endpoint('get_job_tasks')57 async def get_job_tasks(self, job_id, projection=None):58 """Get tasks belonging to a job59 :param job_id: The id of a job (UUID)60 :param projection: A mongodb projection. 
https://goo.gl/kB2g2661 :return: dictionary containing top level keys count and jobs62 """63 c = self.tasks_collection.find({'job_id': job_id},64 projection=projection)65 count = await c.count()66 tasks = []67 async for task in c:68 tasks.append(self.prepare_for_serialization(task))69 return {'count': count, 'tasks': tasks}70 @async_endpoint('get_task')71 async def get_task(self, task_id):72 """Get a single task73 :param task_id: The id of the task (UUID)74 :return: The task object (dict)75 """76 task = await self.tasks_collection.find_one({'task_id': task_id})77 if not task:78 return79 return self.prepare_for_serialization(task)80 @async_endpoint('get_active_tasks_by_mercury_id')81 async def get_active_tasks_by_mercury_id(self, mercury_id):82 """Gets all active tasks associated with specified mercury id83 :param mercury_id:84 :return: dictionary containing tasks list and count85 """86 c = self.tasks_collection.find({'mercury_id': mercury_id,87 'time_completed': None})88 count = await c.count()89 tasks = []90 async for task in c:91 tasks.append(self.prepare_for_serialization(task))92 return {'count': count, 'tasks': tasks}93 @async_endpoint('get_jobs')94 async def get_jobs(self, projection=None):95 """Get active jobs. The jobs collection is made ephemeral via a ttl key;96 this collection should not grow very large97 :param projection: A mongodb projection. https://goo.gl/kB2g2698 :return: dictionary containing top level keys count and jobs99 """100 projection = projection or {'instruction': 0}101 c = self.jobs_collection.find({}, projection=projection).sort(102 'time_created', 1)103 count = await c.count()104 jobs = []105 async for job in c:106 jobs.append(self.prepare_for_serialization(job))107 return {'count': count, 'jobs': jobs}108 @async_endpoint('create_job')109 async def create_job(self, query, instruction):110 """Create a job111 :param query: Query representing targets of the instruction112 :param instruction: An instruction or preprocessor directive. 
See the113 full documentation regarding instruction syntax at114 http://jr0d.github.io/mercury_api115 :raises EndpointException: Raised after catching a MercuryUserError as116 to conform to dispatch semantics117 :return: The job_id or None118 """119 # Add a constraint to the query that ensures we only match 'active'120 # devices (devices that are accessible through an agent)...

Full Screen

Full Screen

mixins.py

Source:mixins.py Github

copy

Full Screen

...9 bool,10 dict,11 type(None),12 )13 def prepare_for_serialization(self):14 class_name = self.__class__.__name__15 dict_ = {}16 for k, v in self.__dict__.items():17 if type(v) in self.serializable_types:18 dict_[k] = v19 elif isinstance(v, Jsonable):20 dict_[k] = v.prepare_for_serialization()21 else:22 raise ValueError('{} is not Serializable!'.format(v))23 return {'class_name': class_name, 'dict': dict_}24 def to_json(self):25 return json.dumps(self.prepare_for_serialization(), indent=4)26 @classmethod27 def from_json(cls, json_str):28 json_dict = json.loads(json_str)29 string_class_name = json_dict['class_name']30 current_class_name = cls.__name__31 if string_class_name == current_class_name:32 return cls(json_dict)33 else:34 raise ValueError('type {} is different to type {}!'.format(string_class_name, current_class_name))35 def to_xml(self):36 return self.prepare_for_serialization()37class Panda(Jsonable):38 def __init__(self, name):39 self.name = name40 def __eq__(self, other):41 return self.__class__.__name__ == other.__class__.__name__42class Person(Jsonable):43 def __init__(self, name):44 self.name = name45 def __eq__(self, other):46 return self.name == other.name47p = Panda(name='Ivo')48# print(p.to_json())49# json_string = p.to_json()50print(p.to_xml())...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful