Best Python code snippet using autotest_python
models.py
Source:models.py  
...5    _GROUP_COUNT_NAME = 'group_count'6    def _get_key_unless_is_function(self, field):7        if '(' in field:8            return field9        return self.get_key_on_this_table(field)10    def _get_field_names(self, fields, extra_select_fields={}):11        field_names = []12        for field in fields:13            if field in extra_select_fields:14                field_names.append(field)15            else:16                field_names.append(self._get_key_unless_is_function(field))17        return field_names18    def _get_group_query_sql(self, query, group_by, extra_select_fields):19        group_fields = self._get_field_names(group_by, extra_select_fields)20        select_fields = [field for field in group_fields21                         if field not in extra_select_fields]22        for field_name, field_sql in extra_select_fields.iteritems():23            field_sql = self._get_key_unless_is_function(field_sql)24            select_fields.append(field_sql + ' AS ' + field_name)25            # add the extra fields to the query selects, so they'll be sortable26            # and Django won't mess with any of them27            query._select[field_name] = field_sql28        _, where, params = query._get_sql_clause()29        # insert GROUP BY clause into query30        group_by_clause = ' GROUP BY ' + ', '.join(group_fields)31        group_by_position = where.rfind('ORDER BY')32        if group_by_position == -1:33            group_by_position = len(where)34        where = (where[:group_by_position] +35                 group_by_clause + ' ' +36                 where[group_by_position:])37        return ('SELECT ' + ', '.join(select_fields) + where), params38    def _get_column_names(self, cursor):39        """\40        Gets the column names from the cursor description. This method exists41        so that it can be mocked in the unit test for sqlite3 compatibility."42        """43        return [column_info[0] for column_info in cursor.description]44    def execute_group_query(self, query, group_by, extra_select_fields=[]):45        """46        Performs the given query grouped by the fields in group_by with the47        given extra select fields added.  extra_select_fields should be a dict48        mapping field alias to field SQL.  Usually, the extra fields will use49        group aggregation functions.  Returns a list of dicts, where each dict50        corresponds to single row and contains a key for each grouped field as51        well as all of the extra select fields.52        """53        sql, params = self._get_group_query_sql(query, group_by,54                                                extra_select_fields)55        cursor = readonly_connection.connection().cursor()56        cursor.execute(sql, params)57        field_names = self._get_column_names(cursor)58        row_dicts = [dict(zip(field_names, row)) for row in cursor.fetchall()]59        return row_dicts60    def get_count_sql(self, query):61        """62        Get the SQL to properly select a per-group count of unique matches for63        a grouped query.  Returns a tuple (field alias, field SQL)64        """65        if query._distinct:66            pk_field = self.get_key_on_this_table()67            count_sql = 'COUNT(DISTINCT %s)' % pk_field68        else:69            count_sql = 'COUNT(1)'70        return self._GROUP_COUNT_NAME, count_sql71    def _get_num_groups_sql(self, query, group_by):72        group_fields = self._get_field_names(group_by)73        query._order_by = None # this can mess up the query and isn't needed74        _, where, params = query._get_sql_clause()75        return ('SELECT COUNT(DISTINCT %s) %s' % (','.join(group_fields),76                                                  where),77                params)78    def get_num_groups(self, query, group_by):79        """80        Returns the number of distinct groups for the given query grouped by the81        fields in group_by.82        """83        sql, params = self._get_num_groups_sql(query, group_by)84        cursor = readonly_connection.connection().cursor()85        cursor.execute(sql, params)86        return cursor.fetchone()[0]87class Machine(dbmodels.Model):88    machine_idx = dbmodels.AutoField(primary_key=True)89    hostname = dbmodels.CharField(unique=True, maxlength=300)90    machine_group = dbmodels.CharField(blank=True, maxlength=240)91    owner = dbmodels.CharField(blank=True, maxlength=240)92    class Meta:93        db_table = 'machines'94class Kernel(dbmodels.Model):95    kernel_idx = dbmodels.AutoField(primary_key=True)96    kernel_hash = dbmodels.CharField(maxlength=105, editable=False)97    base = dbmodels.CharField(maxlength=90)98    printable = dbmodels.CharField(maxlength=300)99    class Meta:100        db_table = 'kernels'101class Patch(dbmodels.Model):102    kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx')103    name = dbmodels.CharField(blank=True, maxlength=240)104    url = dbmodels.CharField(blank=True, maxlength=900)105    hash_ = dbmodels.CharField(blank=True, maxlength=105, db_column='hash')106    class Meta:107        db_table = 'patches'108class Status(dbmodels.Model):109    status_idx = dbmodels.AutoField(primary_key=True)110    word = dbmodels.CharField(maxlength=30)111    class Meta:112        db_table = 'status'113class Job(dbmodels.Model):114    job_idx = dbmodels.AutoField(primary_key=True)115    tag = dbmodels.CharField(unique=True, maxlength=300)116    label = dbmodels.CharField(maxlength=300)117    username = dbmodels.CharField(maxlength=240)118    machine = dbmodels.ForeignKey(Machine, db_column='machine_idx')119    queued_time = dbmodels.DateTimeField(null=True, blank=True)120    started_time = dbmodels.DateTimeField(null=True, blank=True)121    finished_time = dbmodels.DateTimeField(null=True, blank=True)122    class Meta:123        db_table = 'jobs'124class Test(dbmodels.Model, model_logic.ModelExtensions,125           model_logic.ModelWithAttributes):126    test_idx = dbmodels.AutoField(primary_key=True)127    job = dbmodels.ForeignKey(Job, db_column='job_idx')128    test = dbmodels.CharField(maxlength=90)129    subdir = dbmodels.CharField(blank=True, maxlength=180)130    kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx')131    status = dbmodels.ForeignKey(Status, db_column='status')132    reason = dbmodels.CharField(blank=True, maxlength=3072)133    machine = dbmodels.ForeignKey(Machine, db_column='machine_idx')134    finished_time = dbmodels.DateTimeField(null=True, blank=True)135    started_time = dbmodels.DateTimeField(null=True, blank=True)136    objects = model_logic.ExtendedManager()137    def _get_attribute_model_and_args(self, attribute):138        return TestAttribute, dict(test=self, attribute=attribute,139                                   user_created=True)140    def set_attribute(self, attribute, value):141        # ensure non-user-created attributes remain immutable142        try:143            TestAttribute.objects.get(test=self, attribute=attribute,144                                      user_created=False)145            raise ValueError('Attribute %s already exists for test %s and is '146                             'immutable' % (attribute, self.test_idx))147        except TestAttribute.DoesNotExist:148            super(Test, self).set_attribute(attribute, value)149    class Meta:150        db_table = 'tests'151class TestAttribute(dbmodels.Model, model_logic.ModelExtensions):152    test = dbmodels.ForeignKey(Test, db_column='test_idx')153    attribute = dbmodels.CharField(maxlength=90)154    value = dbmodels.CharField(blank=True, maxlength=300)155    user_created = dbmodels.BooleanField(default=False)156    objects = model_logic.ExtendedManager()157    class Meta:158        db_table = 'test_attributes'159class IterationAttribute(dbmodels.Model, model_logic.ModelExtensions):160    # this isn't really a primary key, but it's necessary to appease Django161    # and is harmless as long as we're careful162    test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True)163    iteration = dbmodels.IntegerField()164    attribute = dbmodels.CharField(maxlength=90)165    value = dbmodels.CharField(blank=True, maxlength=300)166    objects = model_logic.ExtendedManager()167    class Meta:168        db_table = 'iteration_attributes'169class IterationResult(dbmodels.Model, model_logic.ModelExtensions):170    # see comment on IterationAttribute regarding primary_key=True171    test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True)172    iteration = dbmodels.IntegerField()173    attribute = dbmodels.CharField(maxlength=90)174    value = dbmodels.FloatField(null=True, max_digits=12, decimal_places=31,175                              blank=True)176    objects = model_logic.ExtendedManager()177    class Meta:178        db_table = 'iteration_result'179class TestLabel(dbmodels.Model, model_logic.ModelExtensions):180    name = dbmodels.CharField(maxlength=80, unique=True)181    description = dbmodels.TextField(blank=True)182    tests = dbmodels.ManyToManyField(Test, blank=True,183                                     filter_interface=dbmodels.HORIZONTAL)184    name_field = 'name'185    objects = model_logic.ExtendedManager()186    class Meta:187        db_table = 'test_labels'188class SavedQuery(dbmodels.Model, model_logic.ModelExtensions):189    # TODO: change this to foreign key once DBs are merged190    owner = dbmodels.CharField(maxlength=80)191    name = dbmodels.CharField(maxlength=100)192    url_token = dbmodels.TextField()193    class Meta:194        db_table = 'saved_queries'195class EmbeddedGraphingQuery(dbmodels.Model, model_logic.ModelExtensions):196    url_token = dbmodels.TextField(null=False, blank=False)197    graph_type = dbmodels.CharField(maxlength=16, null=False, blank=False)198    params = dbmodels.TextField(null=False, blank=False)199    last_updated = dbmodels.DateTimeField(null=False, blank=False,200                                          editable=False)201    # refresh_time shows the time at which a thread is updating the cached202    # image, or NULL if no one is updating the image. This is used so that only203    # one thread is updating the cached image at a time (see204    # graphing_utils.handle_plot_request)205    refresh_time = dbmodels.DateTimeField(editable=False)206    cached_png = dbmodels.TextField(editable=False)207    class Meta:208        db_table = 'embedded_graphing_queries'209# views210class TestViewManager(TempManager):211    def get_query_set(self):212        query = super(TestViewManager, self).get_query_set()213        # add extra fields to selects, using the SQL itself as the "alias"214        extra_select = dict((sql, sql)215                            for sql in self.model.extra_fields.iterkeys())216        return query.extra(select=extra_select)217    def _get_include_exclude_suffix(self, exclude):218        if exclude:219            suffix = '_exclude'220        else:221            suffix = '_include'222        return suffix223    def _add_label_joins(self, query_set, suffix=''):224        query_set = self.add_join(query_set, 'test_labels_tests',225                                   join_key='test_id', suffix=suffix,226                                   force_left_join=True)227        second_join_alias = 'test_labels' + suffix228        second_join_condition = ('%s.id = %s.testlabel_id' %229                                 (second_join_alias,230                                  'test_labels_tests' + suffix))231        filter_object = self._CustomSqlQ()232        filter_object.add_join('test_labels',233                               second_join_condition,234                               'LEFT JOIN',235                               alias=second_join_alias)236        return query_set.filter(filter_object)237    def _add_attribute_join(self, query_set, join_condition='', suffix=None,238                            exclude=False):239        join_condition = self.escape_user_sql(join_condition)240        if suffix is None:241            suffix = self._get_include_exclude_suffix(exclude)242        return self.add_join(query_set, 'test_attributes',243                              join_key='test_idx',244                              join_condition=join_condition,245                              suffix=suffix, exclude=exclude)246    def _get_label_ids_from_names(self, label_names):247        if not label_names:248            return []249        query = TestLabel.objects.filter(name__in=label_names).values('id')250        return [str(label['id']) for label in query]251    def get_query_set_with_joins(self, filter_data, include_host_labels=False):252        include_labels = filter_data.pop('include_labels', [])253        exclude_labels = filter_data.pop('exclude_labels', [])254        query_set = self.get_query_set()255        joined = False256        # TODO: make this check more thorough if necessary257        extra_where = filter_data.get('extra_where', '')258        if 'test_labels' in extra_where:259            query_set = self._add_label_joins(query_set)260            joined = True261        include_label_ids = self._get_label_ids_from_names(include_labels)262        if include_label_ids:263            # TODO: Factor this out like what's done with attributes264            condition = ('test_labels_tests_include.testlabel_id IN (%s)' %265                         ','.join(include_label_ids))266            query_set = self.add_join(query_set, 'test_labels_tests',267                                       join_key='test_id',268                                       suffix='_include',269                                       join_condition=condition)270            joined = True271        exclude_label_ids = self._get_label_ids_from_names(exclude_labels)272        if exclude_label_ids:273            condition = ('test_labels_tests_exclude.testlabel_id IN (%s)' %274                         ','.join(exclude_label_ids))275            query_set = self.add_join(query_set, 'test_labels_tests',276                                       join_key='test_id',277                                       suffix='_exclude',278                                       join_condition=condition,279                                       exclude=True)280            joined = True281        include_attributes_where = filter_data.pop('include_attributes_where',282                                                   '')283        exclude_attributes_where = filter_data.pop('exclude_attributes_where',284                                                   '')285        if include_attributes_where:286            query_set = self._add_attribute_join(287                query_set, join_condition=include_attributes_where)288            joined = True289        if exclude_attributes_where:290            query_set = self._add_attribute_join(291                query_set, join_condition=exclude_attributes_where,292                exclude=True)293            joined = True294        if not joined:295            filter_data['no_distinct'] = True296        if include_host_labels or 'test_attributes_host_labels' in extra_where:297            query_set = self._add_attribute_join(298                query_set, suffix='_host_labels',299                join_condition='test_attributes_host_labels.attribute = '300                               '"host-labels"')301        return query_set302    def query_test_ids(self, filter_data):303        dicts = self.model.query_objects(filter_data).values('test_idx')304        return [item['test_idx'] for item in dicts]305    def query_test_label_ids(self, filter_data):306        query_set = self.model.query_objects(filter_data)307        query_set = self._add_label_joins(query_set, suffix='_list')308        rows = self._custom_select_query(query_set, ['test_labels_list.id'])309        return [row[0] for row in rows if row[0] is not None]310    def escape_user_sql(self, sql):311        sql = super(TestViewManager, self).escape_user_sql(sql)312        return sql.replace('test_idx', self.get_key_on_this_table('test_idx'))313class TestView(dbmodels.Model, model_logic.ModelExtensions):314    extra_fields = {315            'DATE(job_queued_time)': 'job queued day',316            'DATE(test_finished_time)': 'test finished day',317    }318    group_fields = [319            'test_name',320            'status',321            'kernel',322            'hostname',323            'job_tag',324            'job_name',325            'platform',326            'reason',...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
