Best Python code snippet using lisa_python
schema.py
Source:schema.py  
...1685    def _get_scale(self):1686        return self._attributes['RDB$FIELD_SCALE']1687    def _get_field_type(self):1688        return self._attributes['RDB$FIELD_TYPE']1689    def _get_sub_type(self):1690        return self._attributes['RDB$FIELD_SUB_TYPE']1691    def _get_segment_length(self):1692        return self._attributes['RDB$SEGMENT_LENGTH']1693    def _get_external_length(self):1694        return self._attributes['RDB$EXTERNAL_LENGTH']1695    def _get_external_scale(self):1696        return self._attributes['RDB$EXTERNAL_SCALE']1697    def _get_external_type(self):1698        return self._attributes['RDB$EXTERNAL_TYPE']1699    def _get_dimensions(self):1700        if self._attributes['RDB$DIMENSIONS']:1701            return self.schema._get_field_dimensions(self)1702        else:1703            return []1704    def _get_character_length(self):1705        return self._attributes['RDB$CHARACTER_LENGTH']1706    def _get_collation(self):1707        return self.schema.get_collation_by_id(self._attributes['RDB$CHARACTER_SET_ID'],1708                                               self._attributes['RDB$COLLATION_ID'])1709    def _get_character_set(self):1710        return self.schema.get_character_set_by_id(self._attributes['RDB$CHARACTER_SET_ID'])1711    def _get_precision(self):1712        return self._attributes['RDB$FIELD_PRECISION']1713    def _get_datatype(self):1714        l = []1715        precision_known = False1716        if self.field_type in (FBT_SMALLINT,FBT_INTEGER,FBT_BIGINT):1717            if self.precision != None:1718                if (self.sub_type > 0) and (self.sub_type < MAX_INTSUBTYPES):1719                    l.append('%s(%d, %d)' % \1720                      (INTEGRAL_SUBTYPES[self.sub_type],self.precision,-self.scale))1721                    precision_known = True1722        if not precision_known:1723            if (self.field_type == FBT_SMALLINT) and (self.scale < 0):1724                l.append('NUMERIC(4, %d)' % -self.scale)1725            elif (self.field_type == FBT_INTEGER) and (self.scale < 0):1726                l.append('NUMERIC(9, %d)' % -self.scale)1727            elif (self.field_type == FBT_DOUBLE_PRECISION) and (self.scale < 0):1728                l.append('NUMERIC(15, %d)' % -self.scale)1729            else:1730                l.append(COLUMN_TYPES[self.field_type])1731        if self.field_type in (FBT_CHAR,FBT_VARCHAR):1732            l.append('(%d)' % (self.length if self.character_length == None else self.character_length))1733        if self._attributes['RDB$DIMENSIONS'] != None:1734            l.append('[%s]' % ', '.join('%d' % u if l == 1 1735                                        else '%d:%d' % (l,u) 1736                                        for l,u in self.dimensions))1737        if self.field_type == FBT_BLOB:1738            if self.sub_type >= 0 and self.sub_type <= MAX_BLOBSUBTYPES:1739                l.append(' SUB_TYPE %s' % BLOB_SUBTYPES[self.sub_type])1740            else:1741                l.append(' SUB_TYPE %d' % self.sub_type)1742            l.append(' SEGMENT SIZE %d' % self.segment_length)1743        if self.field_type in (FBT_CHAR,FBT_VARCHAR,FBT_BLOB):1744            if self._attributes['RDB$CHARACTER_SET_ID'] is not None and \1745              (self.character_set.name != self.schema.default_character_set.name) or \1746              self._attributes['RDB$COLLATION_ID']:1747                if (self._attributes['RDB$CHARACTER_SET_ID'] is not None):1748                    l.append(' CHARACTER SET %s' % self.character_set.name)1749                if self._attributes['RDB$COLLATION_ID'] is not None:1750                    cname = self.collation.name1751                    if self.character_set._attributes['RDB$DEFAULT_COLLATE_NAME'] != cname:1752                        l.append(' COLLATE %s' % cname)1753        return ''.join(l)1754    #--- Properties1755    expression = LateBindingProperty(_get_expression,None,None,1756        "Expression that defines the COMPUTED BY column or None.")1757    validation = LateBindingProperty(_get_validation,None,None,1758        "CHECK constraint for the domain or None.")1759    default = LateBindingProperty(_get_default,None,None,1760        "Expression that defines the default value or None.")1761    length = LateBindingProperty(_get_length,None,None,1762        "Length of the column in bytes.")1763    scale = LateBindingProperty(_get_scale,None,None,1764        "Negative number representing the scale of NUMBER and DECIMAL column.")1765    field_type = LateBindingProperty(_get_field_type,None,None,1766        "Number code of the data type defined for the column.")1767    sub_type = LateBindingProperty(_get_sub_type,None,None,"BLOB subtype.")1768    segment_length = LateBindingProperty(_get_segment_length,None,None,1769        "For BLOB columns, a suggested length for BLOB buffers.")1770    external_length = LateBindingProperty(_get_external_length,None,None,1771        "Length of field as it is in an external table. Always 0 for regular tables.")1772    external_scale = LateBindingProperty(_get_external_scale,None,None,1773        "Scale factor of an integer field as it is in an external table.")1774    external_type = LateBindingProperty(_get_external_type,None,None,1775        "Data type of the field as it is in an external table.")1776    dimensions = LateBindingProperty(_get_dimensions,None,None,1777        "List of dimension definition pairs if column is an array type. Always empty for non-array columns.")1778    character_length = LateBindingProperty(_get_character_length,None,None,1779        "Length of CHAR and VARCHAR column, in characters (not bytes).")1780    collation = LateBindingProperty(_get_collation,None,None,1781        "Collation object for a character column or None.")1782    character_set = LateBindingProperty(_get_character_set,None,None,1783        "CharacterSet object for a character or text BLOB column, or None.")1784    precision = LateBindingProperty(_get_precision,None,None,1785        "Indicates the number of digits of precision available to the data type of the column.")1786    datatype = LateBindingProperty(_get_datatype,None,None,1787        "Comlete SQL datatype definition.")1788    #--- Public1789    1790    def accept_visitor(self,visitor):1791        """Visitor Pattern support. Calls `visitDomain(self)` on parameter object.1792        1793        :param visitor: Visitor object of Vistior Pattern.1794        """1795        visitor.visitDomain(self)1796    def issystemobject(self):1797        "Return True if this database object is system object."1798        return (self._attributes['RDB$SYSTEM_FLAG'] == 1) or self.name.startswith('RDB$')1799    def isnullable(self):1800        "Returns True if domain is not defined with NOT NULL."1801        return not self._attributes['RDB$NULL_FLAG']1802    def iscomputed(self):1803        "Returns True if domain is computed."1804        return bool(self._attributes['RDB$COMPUTED_SOURCE'])1805    def isvalidated(self):1806        "Returns True if domain has validation constraint."1807        return bool(self._attributes['RDB$VALIDATION_SOURCE'])1808    def isarray(self):1809        "Returns True if domain defines an array."1810        return bool(self._attributes['RDB$DIMENSIONS'])1811    def has_default(self):1812        "Returns True if domain has default value."1813        return bool(self._attributes['RDB$DEFAULT_SOURCE'])1814class Dependency(BaseSchemaItem):1815    """Maps dependency between database objects.1816    Supported SQL actions: none1817    """1818    def __init__(self,schema,attributes):1819        super(Dependency,self).__init__(schema,attributes)1820        1821        self._strip_attribute('RDB$DEPENDENT_NAME')1822        self._strip_attribute('RDB$DEPENDED_ON_NAME')1823        self._strip_attribute('RDB$FIELD_NAME')1824    #--- Protected1825    1826    def _get_dependent_name(self):1827        return self._attributes['RDB$DEPENDENT_NAME']1828    def _get_dependent_type(self):1829        return self._attributes['RDB$DEPENDENT_TYPE']1830    def _get_field_name(self):1831        return self._attributes['RDB$FIELD_NAME']1832    def _get_depended_on_name(self):1833        return self._attributes['RDB$DEPENDED_ON_NAME']1834    def _get_depended_on_type(self):1835        return self._attributes['RDB$DEPENDED_ON_TYPE']1836    def _get_dependent(self):1837        if self.dependent_type == 0: # TABLE1838            t = self.schema.get_table(self.dependent_name)1839        elif self.dependent_type == 1: # VIEW1840            return self.schema.get_view(self.dependent_name)1841        elif self.dependent_type == 2: # TRIGGER1842            return self.schema.get_trigger(self.dependent_name)1843        elif self.dependent_type == 3: # COMPUTED FIELD (i.e. DOMAIN)1844            return self.schema.get_domain(self.dependent_name)1845        elif self.dependent_type == 4: 1846            ## ToDo: Implement handler for VALIDATION if necessary1847            return None1848        elif self.dependent_type == 5: #PROCEDURE1849            return self.schema.get_procedure(self.dependent_name)1850        elif self.dependent_type == 6: # EXPRESSION INDEX1851            return self.schema.get_index(self.dependent_name)1852        elif self.dependent_type == 7: # EXCEPTION1853            return self.schema.get_exception(self.dependent_name)1854        elif self.dependent_type == 8:1855            ## ToDo: Implement handler for USER if necessary1856            return None1857        elif self.dependent_type == 9: # FIELD (i.e. DOMAIN)1858            return self.schema.get_domain(self.dependent_name)1859        elif self.dependent_type == 10: # INDEX1860            return self.schema.get_index(self.dependent_name)1861        elif self.dependent_type == 11:1862            ## ToDo: Implement handler for DEPENDENT COUNT if necessary1863            return None1864        elif self.dependent_type == 12:1865            ## ToDo: Implement handler for USER GROUP if necessary1866            return None1867        elif self.dependent_type == 13: # ROLE1868            return self.schema.get_role(self.dependent_name)1869        elif self.dependent_type == 14: # GENERATOR1870            return self.schema.get_generator(self.dependent_name)1871        elif self.dependent_type == 15: # UDF1872            return self.schema.get_function(self.dependent_name)1873        elif self.dependent_type == 16:1874            ## ToDo: Implement handler for BLOB_FILTER1875            return None1876        return None1877    def _get_depended_on(self):1878        if self.depended_on_type == 0: # TABLE1879            t = self.schema.get_table(self.depended_on_name)1880            if self.field_name:1881                return t.get_column(self.field_name)1882            else:1883                return t1884        elif self.depended_on_type == 1: # VIEW1885            return self.schema.get_view(self.depended_on_name)1886        elif self.depended_on_type == 2: # TRIGGER1887            return self.schema.get_trigger(self.depended_on_name)1888        elif self.depended_on_type == 3: # COMPUTED FIELD (i.e. DOMAIN)1889            return self.schema.get_domain(self.depended_on_name)1890        elif self.depended_on_type == 4:1891            ## ToDo: Implement handler for VALIDATION if necessary1892            return None1893        elif self.depended_on_type == 5: #PROCEDURE1894            return self.schema.get_procedure(self.depended_on_name)1895        elif self.depended_on_type == 6: # EXPRESSION INDEX1896            return self.schema.get_index(self.depended_on_name)1897        elif self.depended_on_type == 7: # EXCEPTION1898            return self.schema.get_exception(self.depended_on_name)1899        elif self.depended_on_type == 8:1900            ## ToDo: Implement handler for USER if necessary1901            return None1902        elif self.depended_on_type == 9: # FIELD (i.e. DOMAIN)1903            return self.schema.get_domain(self.depended_on_name)1904        elif self.depended_on_type == 10: # INDEX1905            return self.schema.get_index(self.depended_on_name)1906        elif self.depended_on_type == 11:1907            ## ToDo: Implement handler for DEPENDENT COUNT if necessary1908            return None1909        elif self.depended_on_type == 12:1910            ## ToDo: Implement handler for USER GROUP if necessary1911            return None1912        elif self.depended_on_type == 13: # ROLE1913            return self.schema.get_role(self.depended_on_name)1914        elif self.depended_on_type == 14: # GENERATOR1915            return self.schema.get_generator(self.depended_on_name)1916        elif self.depended_on_type == 15: # UDF1917            return self.schema.get_function(self.depended_on_name)1918        elif self.depended_on_type == 16:1919            ## ToDo: Implement handler for BLOB_FILTER1920            return None1921        return None1922    1923    #--- Properties1924    1925    dependent = LateBindingProperty(_get_dependent,None,None,1926                                    "Dependent database object.")1927    dependent_name = LateBindingProperty(_get_dependent_name,None,None,1928                                    "Dependent database object name.")1929    dependent_type = LateBindingProperty(_get_dependent_type,None,None,1930                                    "Dependent database object type.")1931    field_name = LateBindingProperty(_get_field_name,None,None,1932                                    "Name of one column in `depended on` object.")1933    depended_on = LateBindingProperty(_get_depended_on,None,None,1934                                    "Database object on which dependent depends.")1935    depended_on_name = LateBindingProperty(_get_depended_on_name,None,None,1936                                    "Name of db object on which dependent depends.")1937    depended_on_type = LateBindingProperty(_get_depended_on_type,None,None,1938                                    "Type of db object on which dependent depends.")1939    1940    #--- Public1941    1942    def accept_visitor(self,visitor):1943        """Visitor Pattern support. Calls `visitDependency(self)` on parameter object.1944        1945        :param visitor: Visitor object of Vistior Pattern.1946        """1947        visitor.visitDependency(self)1948    def issystemobject(self):1949        "Returns True as dependency entries are considered as system objects." 1950        return True1951    def get_dependents(self):1952        "Returns empty list because Dependency object never has dependents."1953        return []1954    def get_dependencies(self):1955        "Returns empty list because Dependency object never has dependencies."1956        return []1957    1958class Constraint(BaseSchemaItem):1959    """Represents table or column constraint.1960    Supported SQL actions: 1961    1962    - Constraint on user table except NOT NULL constraint: create, drop1963    - Constraint on system table: none1964    """1965    def __init__(self,schema,attributes):1966        super(Constraint,self).__init__(schema,attributes)1967        self._strip_attribute('RDB$CONSTRAINT_NAME')1968        self._strip_attribute('RDB$CONSTRAINT_TYPE')1969        self._strip_attribute('RDB$RELATION_NAME')1970        self._strip_attribute('RDB$DEFERRABLE')1971        self._strip_attribute('RDB$INITIALLY_DEFERRED')1972        self._strip_attribute('RDB$INDEX_NAME')1973        self._strip_attribute('RDB$TRIGGER_NAME')1974        self._strip_attribute('RDB$CONST_NAME_UQ')1975        self._strip_attribute('RDB$MATCH_OPTION')1976        self._strip_attribute('RDB$UPDATE_RULE')1977        self._strip_attribute('RDB$DELETE_RULE')1978        if not (self.issystemobject() or self.isnotnull()):1979            self._actions = ['create','drop']1980    #--- Protected1981    def _get_create_sql(self,**params):1982        self._check_params(params,[])1983        const_def = 'ALTER TABLE %s ADD ' % self.table.get_quoted_name()1984        if not self.name.startswith('INTEG_'):1985            const_def += 'CONSTRAINT %s\n  ' % self.get_quoted_name()1986        if self.ischeck():1987            const_def += self.triggers[0].source1988        elif self.ispkey() or self.isunique():1989            const_def += 'PRIMARY KEY' if self.ispkey() else 'UNIQUE'1990            i = self.index1991            const_def +=  ' (%s)' % ','.join(i.segment_names)1992            if not i.issystemobject():1993                const_def += '\n  USING %s INDEX %s' % (i.index_type,i.get_quoted_name())1994        elif self.isfkey():1995            const_def += 'FOREIGN KEY (%s)\n  ' % ','.join(self.index.segment_names)1996            p = self.partner_constraint1997            const_def += 'REFERENCES %s (%s)' % (p.table.get_quoted_name(),1998                                                 ','.join(p.index.segment_names))1999            if self.delete_rule != 'RESTRICT':2000                const_def += '\n  ON DELETE %s' % self.delete_rule2001            if self.update_rule != 'RESTRICT':2002                const_def += '\n  ON UPDATE %s' % self.update_rule2003            i = self.index2004            if not i.issystemobject():2005                const_def += '\n  USING %s INDEX %s' % (i.index_type,i.get_quoted_name())2006        else:2007            raise fdb_embedded.OperationalError("Unrecognized constraint type '%s'" % self.constraint_type)2008        return const_def2009    def _get_drop_sql(self,**params):2010        self._check_params(params,[])2011        return 'ALTER TABLE %s DROP CONSTRAINT %s' % (self.table.get_quoted_name(),2012                                                      self.get_quoted_name())2013    def _get_name(self):2014        return self._attributes['RDB$CONSTRAINT_NAME']2015    def _get_constraint_type(self):2016        return self._attributes['RDB$CONSTRAINT_TYPE']2017    def _get_table(self):2018        return self.schema.get_table(self._attributes['RDB$RELATION_NAME'])2019    def _get_index(self):2020        return self.schema.get_index(self._attributes['RDB$INDEX_NAME'])2021    def _get_trigger_names(self):2022        if self.ischeck():2023            return self._attributes['RDB$TRIGGER_NAME']2024        else:2025            return []2026    def _get_triggers(self):2027        return [self.schema.get_trigger(tname) for tname in self.trigger_names]2028    def _get_column_name(self):2029        if self.isnotnull():2030            return self._attributes['RDB$TRIGGER_NAME']2031        else:2032            return None2033    def _get_partner_constraint(self):2034        return self.schema.get_constraint(self._attributes['RDB$CONST_NAME_UQ'])2035    def _get_match_option(self):2036        return self._attributes['RDB$MATCH_OPTION']2037    def _get_update_rule(self):2038        return self._attributes['RDB$UPDATE_RULE']2039    def _get_delete_rule(self):2040        return self._attributes['RDB$DELETE_RULE']2041    #--- Properties2042    constraint_type = LateBindingProperty(_get_constraint_type,None,None,2043        "primary key/unique/foreign key/check/not null.")2044    table = LateBindingProperty(_get_table,None,None,2045        ":class:`Table` instance this constraint applies to.")2046    index = LateBindingProperty(_get_index,None,None,2047        ":class:`Index` instance that enforces the constraint.\n`None` if constraint is not primary key/unique or foreign key.")2048    trigger_names = LateBindingProperty(_get_trigger_names,None,None,2049        "For a CHECK constraint contains trigger names that enforce the constraint.")2050    triggers = LateBindingProperty(_get_triggers,None,None,2051        "For a CHECK constraint contains :class:`Trigger` instances that enforce the constraint.")2052    column_name = LateBindingProperty(_get_column_name,None,None,2053        "For a NOT NULL constraint, this is the name of the column to which the constraint applies.")2054    partner_constraint = LateBindingProperty(_get_partner_constraint,None,None,2055        "For a FOREIGN KEY constraint, this is the unique or primary key :class:`Constraint` referred.")2056    match_option = LateBindingProperty(_get_match_option,None,None,2057        "For a FOREIGN KEY constraint only. Current value is FULL in all cases.")2058    update_rule = LateBindingProperty(_get_update_rule,None,None,2059        "For a FOREIGN KEY constraint, this is the action applicable to when primary key is updated.")2060    delete_rule = LateBindingProperty(_get_delete_rule,None,None,2061        "For a FOREIGN KEY constraint, this is the action applicable to when primary key is deleted.")2062    #--- Public2063    2064    def accept_visitor(self,visitor):2065        """Visitor Pattern support. Calls `visitConstraint(self)` on parameter object.2066        2067        :param visitor: Visitor object of Vistior Pattern.2068        """2069        visitor.visitConstraint(self)2070    def issystemobject(self):2071        "Returns True if this database object is system object."2072        return self.schema.get_table(self._attributes['RDB$RELATION_NAME']).issystemobject()2073    def isnotnull(self):2074        "Returns True if it's NOT NULL constraint."2075        return self.constraint_type == 'NOT NULL'2076    def ispkey(self):2077        "Returns True if it's PRIMARY KEY constraint."2078        return self.constraint_type == 'PRIMARY KEY'2079    def isfkey(self):2080        "Returns True if it's FOREIGN KEY constraint."2081        return self.constraint_type == 'FOREIGN KEY'2082    def isunique(self):2083        "Returns True if it's UNIQUE constraint."2084        return self.constraint_type == 'UNIQUE'2085    def ischeck(self):2086        "Returns True if it's CHECK constraint."2087        return self.constraint_type == 'CHECK'2088    def isdeferrable(self):2089        "Returns True if it's DEFERRABLE constraint."2090        return self._attributes['RDB$DEFERRABLE'] != 'NO'2091    def isdeferred(self):2092        "Returns True if it's INITIALLY DEFERRED constraint."2093        return self._attributes['RDB$INITIALLY_DEFERRED'] != 'NO'2094class Table(BaseSchemaItem):2095    """Represents Table in database.2096    Supported SQL actions: 2097    2098    - User table: create, recreate, drop2099    - System table: none2100    """2101    def __init__(self,schema,attributes):2102        super(Table,self).__init__(schema,attributes)2103        self._type_code = [0,]2104        self.__columns = None2105        self._strip_attribute('RDB$RELATION_NAME')2106        self._strip_attribute('RDB$OWNER_NAME')2107        self._strip_attribute('RDB$SECURITY_CLASS')2108        self._strip_attribute('RDB$DEFAULT_CLASS')2109        if not self.issystemobject():2110            self._actions = ['create','recreate','drop']2111            2112    #--- Protected2113    def _get_create_sql(self,**params):2114        self._check_params(params,[])2115        tabdef = 'CREATE %sTABLE %s' % ('GLOBAL TEMPORARY ' if self.isgtt() else '',2116                                        self.get_quoted_name())2117        if self.isexternal():2118            tabdef += "  EXTERNAL FILE '%s'\n" % self.external_file2119        tabdef += '\n('2120        partdefs = []2121        for col in self.columns:2122            coldef = '\n  %s ' % col.get_quoted_name()2123            collate = ''2124            if col.isdomainbased():2125                coldef += '%s' % col.domain.get_quoted_name()2126            elif col.iscomputed():2127                coldef += 'COMPUTED BY %s' % col.get_computedby()2128            else:2129                datatype = col.datatype2130                if datatype.rfind(' COLLATE ') > 0:2131                    datatype, collate = datatype.split(' COLLATE ')2132                coldef += '%s' % datatype2133            if col.has_default():2134                coldef += ' DEFAULT %s' % col.default2135            if not col.isnullable():2136                coldef += ' NOT NULL'2137            if col._attributes['RDB$COLLATION_ID'] is not None:2138                cname = col.collation.name2139                if col.domain.character_set._attributes['RDB$DEFAULT_COLLATE_NAME'] != cname:2140                    collate = cname2141            if collate:2142                coldef += ' COLLATE %s' % collate2143            partdefs.append(coldef)2144        if self.has_pkey():2145            pk = self.primary_key2146            pkdef = '\n  '2147            if not pk.name.startswith('INTEG_'):2148                pkdef += 'CONSTRAINT %s\n  ' % pk.get_quoted_name()2149            i = pk.index2150            pkdef +=  'PRIMARY KEY (%s)' % ','.join(i.segment_names)2151            if not i.issystemobject():2152                pkdef += '\n    USING %s INDEX %s' % (i.index_type,i.get_quoted_name())2153            partdefs.append(pkdef)2154        for uq in self.constraints:2155            if uq.isunique():2156                uqdef = '\n  '2157                if not uq.name.startswith('INTEG_'):2158                    uqdef += 'CONSTRAINT %s\n  ' % uq.get_quoted_name()2159                i = uq.index2160                uqdef +=  'UNIQUE (%s)' % ','.join(i.segment_names)2161                if not i.issystemobject():2162                    uqdef += '\n    USING %s INDEX %s' % (i.index_type,i.get_quoted_name())2163                partdefs.append(uqdef)2164        tabdef += ','.join(partdefs)2165        tabdef += '\n)'2166        return tabdef2167    def _get_drop_sql(self,**params):2168        self._check_params(params,[])2169        return 'DROP TABLE %s' % self.get_quoted_name()2170    def _get_name(self):2171        return self._attributes['RDB$RELATION_NAME']2172    def _get_id(self):2173        return self._attributes['RDB$RELATION_ID']2174    def _get_dbkey_length(self):2175        return self._attributes['RDB$DBKEY_LENGTH']2176    def _get_format(self):2177        return self._attributes['RDB$FORMAT']2178    def _get_table_type(self):2179        return self.schema.enum_relation_types.get(self._attributes.get('RDB$RELATION_TYPE'),2180                                                   'PERSISTENT')2181    def _get_security_class(self):2182        return self._attributes['RDB$SECURITY_CLASS']2183    def _get_external_file(self):2184        return self._attributes['RDB$EXTERNAL_FILE']2185    def _get_owner_name(self):2186        return self._attributes['RDB$OWNER_NAME']2187    def _get_default_class(self):2188        return self._attributes['RDB$DEFAULT_CLASS']2189    def _get_flags(self):2190        return self._attributes['RDB$FLAGS']2191    def _get_indices(self):2192        return [i for i in self.schema._get_all_indices() 2193                if i._attributes['RDB$RELATION_NAME'] == self.name]2194    def _get_triggers(self):2195        return [t for t in self.schema.triggers 2196                if t._attributes['RDB$RELATION_NAME'] == self.name]2197    def _get_constraints(self):2198        return [c for c in self.schema.constraints 2199                if c._attributes['RDB$RELATION_NAME'] == self.name]2200    def _get_columns(self):2201        if self.__columns is None:2202            self.__columns = [TableColumn(self.schema,self,row) for row in 2203                self.schema._select("""select RDB$FIELD_NAME, RDB$RELATION_NAME,2204RDB$FIELD_SOURCE, RDB$FIELD_POSITION, RDB$UPDATE_FLAG, RDB$FIELD_ID, RDB$DESCRIPTION, 2205RDB$SYSTEM_FLAG, RDB$SECURITY_CLASS, RDB$NULL_FLAG, RDB$DEFAULT_SOURCE, RDB$COLLATION_ID 2206from RDB$RELATION_FIELDS where RDB$RELATION_NAME = ? order by RDB$FIELD_POSITION""",(self.name,))]2207        return self.__columns2208    def _get_primary_key(self):2209        for const in self.constraints:2210            if const.ispkey():2211                return const2212        return None2213    def _get_foreign_keys(self):2214        return [c for c in self.constraints if c.isfkey()]2215    def _get_privileges(self):2216        return [p for p in self.schema.privileges2217                if ((p.subject_name == self.name) and 2218                    (p.subject_type in self._type_code))]2219    #--- Properties2220    id = LateBindingProperty(_get_id,None,None,"Internam number ID for the table.")2221    dbkey_length = LateBindingProperty(_get_dbkey_length,None,None,2222            "Length of the RDB$DB_KEY column in bytes.")2223    format = LateBindingProperty(_get_format,None,None,2224            "Internal format ID for the table.")2225    table_type = LateBindingProperty(_get_table_type,None,None,"Table type.")2226    security_class = LateBindingProperty(_get_security_class,None,None,2227            "Security class that define access limits to the table.")2228    external_file = LateBindingProperty(_get_external_file,None,None,2229            "Full path to the external data file, if any.")2230    owner_name = LateBindingProperty(_get_owner_name,None,None,2231            "User name of table's creator.")2232    default_class = LateBindingProperty(_get_default_class,None,None,2233            "Default security class.")2234    flags = LateBindingProperty(_get_flags,None,None,"Internal flags.")2235    primary_key = LateBindingProperty(_get_primary_key,None,None,2236            "PRIMARY KEY :class:`Constraint` for this table or None.")2237    foreign_keys = LateBindingProperty(_get_foreign_keys,None,None,2238            "List of FOREIGN KEY :class:`Constraint` instances for this table.")2239    columns = LateBindingProperty(_get_columns,None,None,2240        "Returns list of columns defined for table.\nItems are :class:`TableColumn` objects.")2241    constraints = LateBindingProperty(_get_constraints,None,None,2242        "Returns list of constraints defined for table.\nItems are :class:`Constraint` objects.")2243    indices = LateBindingProperty(_get_indices,None,None,2244        "Returns list of indices defined for table.\nItems are :class:`Index` objects.")2245    triggers = LateBindingProperty(_get_triggers,None,None,2246        "Returns list of triggers defined for table.\nItems are :class:`Trigger` objects.")2247    privileges = LateBindingProperty(_get_privileges,None,None,2248        "List of :class:`Privilege` objects granted to this object.")2249    #--- Public2250    2251    def accept_visitor(self,visitor):2252        """Visitor Pattern support. Calls `visitTable(self)` on parameter object.2253        2254        :param visitor: Visitor object of Vistior Pattern.2255        """2256        visitor.visitTable(self)2257    def get_column(self,name):2258        "Return :class:`TableColumn` object with specified name."2259        for col in self.columns:2260            if col.name == name:2261                return col2262        return None2263    def isgtt(self):2264        "Returns True if table is GLOBAL TEMPORARY table."2265        return self.table_type.startswith('GLOBAL_TEMPORARY')2266    def ispersistent(self):2267        "Returns True if table is persistent one."2268        return self.table_type in ['PERSISTENT','EXTERNAL']2269    def isexternal(self):2270        "Returns True if table is external table."2271        return bool(self.external_file)2272    def has_pkey(self):2273        "Returns True if table has PRIMARY KEY defined."2274        for const in self.constraints:2275            if const.ispkey():2276                return True2277        return False2278    def has_fkey(self):2279        "Returns True if table has any FOREIGN KEY constraint."2280        for const in self.constraints:2281            if const.isfkey():2282                return True2283        return False2284    2285class View(BaseSchemaItem):2286    """Represents database View.2287    Supported SQL actions: 2288    2289    - User views: create, recreate, alter(columns=string_or_list,query=string,check=bool), 2290      create_or_alter, drop2291    - System views: none2292    """2293    def __init__(self,schema,attributes):2294        super(View,self).__init__(schema,attributes)2295        self._type_code = [1,]2296        self.__columns = None2297        self._strip_attribute('RDB$RELATION_NAME')2298        self._strip_attribute('RDB$VIEW_SOURCE')2299        self._strip_attribute('RDB$OWNER_NAME')2300        self._strip_attribute('RDB$SECURITY_CLASS')2301        self._strip_attribute('RDB$DEFAULT_CLASS')2302        if not self.issystemobject():2303            self._actions = ['create','recreate','alter','create_or_alter','drop']2304            2305    #--- Protected2306    def _get_create_sql(self,**params):2307        self._check_params(params,[])2308        return "CREATE VIEW %s (%s)\n   AS\n     %s" % (self.get_quoted_name(),2309                        ','.join([col.get_quoted_name() for col in self.columns]),self.sql)2310    def _get_alter_sql(self,**params):2311        self._check_params(params,['columns','query','check'])2312        columns = params.get('columns')2313        if isinstance(columns,(list,tuple)):2314            columns = ','.join(columns)2315        query = params.get('query')2316        check = params.get('check',False)2317        if query:2318            return "ALTER VIEW %s %s\n   AS\n     %s" % (self.get_quoted_name(),2319                    '(%s)' % columns if columns else '',2320                    '%s\n     WITH CHECK OPTION' % query if check else query)2321        else:2322            raise fdb_embedded.ProgrammingError("Missing required parameter: 'query'.")2323    def _get_drop_sql(self,**params):2324        self._check_params(params,[])2325        return 'DROP VIEW %s' % self.get_quoted_name()2326    def _get_name(self):2327        return self._attributes['RDB$RELATION_NAME']2328    def _get_sql(self):2329        return self._attributes['RDB$VIEW_SOURCE']2330    def _get_id(self):2331        return self._attributes['RDB$RELATION_ID']2332    def _get_dbkey_length(self):2333        return self._attributes['RDB$DBKEY_LENGTH']2334    def _get_format(self):2335        return self._attributes['RDB$FORMAT']2336    def _get_security_class(self):2337        return self._attributes['RDB$SECURITY_CLASS']2338    def _get_owner_name(self):2339        return self._attributes['RDB$OWNER_NAME']2340    def _get_default_class(self):2341        return self._attributes['RDB$DEFAULT_CLASS']2342    def _get_flags(self):2343        return self._attributes['RDB$FLAGS']2344    def _get_triggers(self):2345        return [t for t in self.schema.triggers 2346                if t._attributes['RDB$RELATION_NAME'] == self.name]2347    def _get_columns(self):2348        if self.__columns is None:2349            self.__columns = [ViewColumn(self.schema,self,row) for row in 2350                self.schema._select("""select r.RDB$FIELD_NAME, r.RDB$RELATION_NAME, 2351r.RDB$FIELD_SOURCE, r.RDB$FIELD_POSITION, r.RDB$UPDATE_FLAG, r.RDB$FIELD_ID, 2352r.RDB$DESCRIPTION, r.RDB$SYSTEM_FLAG, r.RDB$SECURITY_CLASS, r.RDB$NULL_FLAG, 2353r.RDB$DEFAULT_SOURCE, r.RDB$COLLATION_ID, r.RDB$BASE_FIELD, 2354v.RDB$RELATION_NAME as BASE_RELATION 2355    from RDB$RELATION_FIELDS r2356    left join RDB$VIEW_RELATIONS v on r.RDB$VIEW_CONTEXT = v.RDB$VIEW_CONTEXT2357    where r.RDB$RELATION_NAME = ? 2358    order by RDB$FIELD_POSITION""",(self.name,))]2359        return self.__columns2360    def _get_privileges(self):2361        return [p for p in self.schema.privileges2362                if ((p.subject_name == self.name) and 2363                    (p.subject_type == 0))] # Views are logged as Tables in RDB$USER_PRIVILEGES2364    #--- Properties2365    id = LateBindingProperty(_get_id,None,None,"Internal number ID for the view.")2366    sql= LateBindingProperty(_get_sql,None,None,"The query specification.")2367    dbkey_length = LateBindingProperty(_get_dbkey_length,None,None,2368        "Length of the RDB$DB_KEY column in bytes.")2369    format = LateBindingProperty(_get_format,None,None,"Internal format ID for the view.")2370    security_class = LateBindingProperty(_get_security_class,None,None,2371        "Security class that define access limits to the view.")2372    owner_name = LateBindingProperty(_get_owner_name,None,None,"User name of view's creator.")2373    default_class = LateBindingProperty(_get_default_class,None,None,"Default security class.")2374    flags = LateBindingProperty(_get_flags,None,None,"Internal flags.")2375    columns = LateBindingProperty(_get_columns,None,None,2376        "Returns list of columns defined for view.\nItems are :class:`ViewColumn` objects.")2377    triggers = LateBindingProperty(_get_triggers,None,None,2378        "Returns list of triggers defined for view.\nItems are :class:`Trigger` objects.")2379    privileges = LateBindingProperty(_get_privileges,None,None,2380        "List of :class:`Privilege` objects granted to this object.")2381    #--- Public2382    2383    def accept_visitor(self,visitor):2384        """Visitor Pattern support. Calls `visitView(self)` on parameter object.2385        2386        :param visitor: Visitor object of Vistior Pattern.2387        """2388        visitor.visitView(self)2389    def get_column(self,name):2390        "Return :class:`TableColumn` object with specified name."2391        for col in self.columns:2392            if col.name == name:2393                return col2394        return None2395    def get_trigger(self,name):2396        "Return :class:`Trigger` object with specified name."2397        for t in self.triggers:2398            if t.name == name:2399                return t2400        return None2401    def has_checkoption(self):2402        "Returns True if View has WITH CHECK OPTION defined."2403        return "WITH CHECK OPTION" in self.sql.upper()2404    2405class Trigger(BaseSchemaItem):2406    """Represents trigger.2407    Supported SQL actions: 2408    2409    - User trigger: create, recreate, create_or_alter, drop,2410      alter(fire_on=string,active=bool,sequence=int,declare=string_or_list,2411      code=string_or_list)2412    - System trigger: none2413    """2414    def __init__(self,schema,attributes):2415        super(Trigger,self).__init__(schema,attributes)2416        self._type_code = [2,]2417        self._strip_attribute('RDB$TRIGGER_NAME')2418        self._strip_attribute('RDB$RELATION_NAME')2419        if not self.issystemobject():2420            self._actions = ['create','recreate','alter','create_or_alter','drop']2421    #--- Protected2422    def _get_create_sql(self,**params):2423        self._check_params(params,[])2424        result = 'CREATE TRIGGER %s' % self.get_quoted_name()2425        if self._attributes['RDB$RELATION_NAME']:2426            result += ' FOR %s' % self.relation.get_quoted_name()2427        result += ' %s\n%s POSITION %d\n%s' % ('ACTIVE' if self.isactive() else 'INACTIVE',2428                                               self.get_type_as_string(),2429                                               self.sequence,self.source)2430        return result2431    def _get_alter_sql(self,**params):2432        self._check_params(params,['fire_on','active','sequence','declare','code'])2433        action = params.get('fire_on')2434        active = params.get('active')2435        sequence = params.get('sequence')2436        declare = params.get('declare')2437        code = params.get('code')2438        #2439        header = ''2440        if active is not None:2441            header += ' ACTIVE' if active else ' INACTIVE'2442        if action is not None:2443            dbaction = action.upper().startswith('ON ')2444            if ((dbaction and not self.isdbtrigger()) 2445                or (not dbaction and self.isdbtrigger())):2446                raise fdb_embedded.ProgrammingError("Trigger type change is not allowed.")2447            header += '\n  %s' % action2448        if sequence is not None:2449            header += '\n  POSITION %d' % sequence2450        #2451        if code is not None:2452            if declare is None:2453                d = ''2454            elif isinstance(declare,(list,tuple)):2455                d = ''2456                for x in declare:2457                    d += '  %s\n' % x2458            else:2459                d = '%s\n' % declare2460            if isinstance(code,(list,tuple)):2461                c = ''2462                for x in code:2463                    c += '  %s\n' % x2464            else:2465                c = '%s\n' % code2466            body = '\nAS\n%sBEGIN\n%sEND' % (d,c)2467        else:2468            body = ''2469        #2470        if not (header or body):2471            raise fdb_embedded.ProgrammingError("Header or body definition required.")2472        return 'ALTER TRIGGER %s%s%s' % (self.get_quoted_name(),header,body)2473    def _get_drop_sql(self,**params):2474        self._check_params(params,[])2475        return 'DROP TRIGGER %s' % self.get_quoted_name()2476    def _get_action_time(self):2477        return (self.trigger_type + 1) & 12478    def _get_action_type(self,slot):2479        return ((self.trigger_type + 1) >> (slot * 2 - 1)) & 32480    def _get_name(self):2481        return self._attributes['RDB$TRIGGER_NAME']2482    def _get_relation(self):2483        relname = self._attributes['RDB$RELATION_NAME']2484        rel = self.schema.get_table(relname)2485        if not rel:2486            rel = self.schema.get_view(relname)2487        return rel2488    def _get_sequence(self):2489        return self._attributes['RDB$TRIGGER_SEQUENCE']2490    def _get_trigger_type(self):2491        return self._attributes['RDB$TRIGGER_TYPE']2492    def _get_source(self):2493        return self._attributes['RDB$TRIGGER_SOURCE']2494    def _get_flags(self):2495        return self._attributes['RDB$FLAGS']2496    def _istype(self,type_code):2497        atype = self._get_action_type(1)2498        if atype == type_code:2499            return True2500        atype = self._get_action_type(2)2501        if atype and atype == type_code:2502            return True2503        atype = self._get_action_type(3)2504        if atype and atype == type_code:2505            return True2506        return False2507    #--- Properties2508    relation = LateBindingProperty(_get_relation,None,None,2509        ":class:`Table` or :class:`View` that the trigger is for, or None for database triggers")2510    sequence = LateBindingProperty(_get_sequence,None,None,2511        "Sequence (position) of trigger. Zero usually means no sequence defined.")2512    trigger_type = LateBindingProperty(_get_trigger_type,None,None,2513        "Numeric code for trigger type that define what event and when are covered by trigger.")2514    source = LateBindingProperty(_get_source,None,None,"PSQL source code.")2515    flags = LateBindingProperty(_get_flags,None,None,"Internal flags.")2516    #--- Public2517    2518    def accept_visitor(self,visitor):2519        """Visitor Pattern support. Calls `visitTrigger(self)` on parameter object.2520        2521        :param visitor: Visitor object of Vistior Pattern.2522        """2523        visitor.visitTrigger(self)2524    def isactive(self):2525        "Returns True if this trigger is active."2526        return self._attributes['RDB$TRIGGER_INACTIVE'] == 02527    def isbefore(self):2528        "Returns True if this trigger is set for BEFORE action."2529        return self._get_action_time() == 02530    def isafter(self):2531        "Returns True if this trigger is set for AFTER action."2532        return self._get_action_time() == 12533    def isdbtrigger(self):2534        "Returns True if this trigger is database trigger."2535        return (self.trigger_type & TRIGGER_TYPE_MASK) == TRIGGER_TYPE_DB2536    def isinsert(self):2537        "Returns True if this trigger is set for INSERT operation."2538        return self._istype(1)2539    def isupdate(self):2540        "Returns True if this trigger is set for UPDATE operation."2541        return self._istype(2)2542    def isdelete(self):2543        "Returns True if this trigger is set for DELETE operation."2544        return self._istype(3)2545    def get_type_as_string(self):2546        "Return string with action and operation specification."2547        l = []2548        if self.isdbtrigger():2549            l.append('ON '+TRIGGER_DB_TYPES[self.trigger_type & ~TRIGGER_TYPE_DB])2550        else:2551            l.append(TRIGGER_PREFIX_TYPES[self._get_action_time()])2552            l.append(TRIGGER_SUFFIX_TYPES[self._get_action_type(1)])2553            sufix = self._get_action_type(2)2554            if sufix:2555                l.append('OR')2556                l.append(TRIGGER_SUFFIX_TYPES[sufix])2557            sufix = self._get_action_type(3)2558            if sufix:2559                l.append('OR')2560                l.append(TRIGGER_SUFFIX_TYPES[sufix])2561        return ' '.join(l)2562class ProcedureParameter(BaseSchemaItem):2563    """Represents procedure parameter.2564    Supported SQL actions: none.2565    """2566    def __init__(self,schema,proc,attributes):2567        super(ProcedureParameter,self).__init__(schema,attributes)2568        self.__proc = proc2569        self._strip_attribute('RDB$PARAMETER_NAME')2570        self._strip_attribute('RDB$PROCEDURE_NAME')2571        self._strip_attribute('RDB$FIELD_SOURCE')2572        self._strip_attribute('RDB$RELATION_NAME')2573        self._strip_attribute('RDB$FIELD_NAME')2574    #--- Protected2575    def _get_name(self):2576        return self._attributes['RDB$PARAMETER_NAME']2577    def _get_procedure(self):2578        return self.schema.get_procedure(self._attributes['RDB$PROCEDURE_NAME'])2579    def _get_sequence(self):2580        return self._attributes['RDB$PARAMETER_NUMBER']2581    def _get_domain(self):2582        return self.schema.get_domain(self._attributes['RDB$FIELD_SOURCE'])2583    def _get_datatype(self):2584        return self.domain.datatype2585    def _get_type_from(self):2586        m = self.mechanism2587        if m is None:2588            return PROCPAR_DATATYPE2589        elif m == 0:2590            return PROCPAR_DATATYPE if self.domain.issystemobject() else PROCPAR_DOMAIN2591        elif m == 1:2592            if self._attributes.get('RDB$RELATION_NAME') is None:2593                return PROCPAR_TYPE_OF_DOMAIN2594            else:2595                return PROCPAR_TYPE_OF_COLUMN2596        else:2597            raise fdb_embedded.InternalError("Unknown parameter mechanism code: %d" % m)2598    def _get_default(self):2599        result = self._attributes.get('RDB$DEFAULT_SOURCE')2600        if result:2601            if result.upper().startswith('= '):2602                result = result[2:]2603            elif result.upper().startswith('DEFAULT '):2604                result = result[8:]2605        return result2606    def _get_collation(self):2607        cid = self._attributes.get('RDB$COLLATION_ID')2608        return (None if cid is None 2609                else self.schema.get_collation_by_id(self.domain._attributes['RDB$CHARACTER_SET_ID'],cid))2610    def _get_mechanism(self):2611        return self._attributes.get('RDB$PARAMETER_MECHANISM')2612    def _get_column(self):2613        rname = self._attributes.get('RDB$RELATION_NAME')2614        return (None if rname is None 2615                else self.schema.get_table(rname).get_column(self._attributes['RDB$FIELD_NAME']))2616    #--- Properties2617    procedure = LateBindingProperty(_get_procedure,None,None,2618                                "Name of the stored procedure.")2619    sequence = LateBindingProperty(_get_sequence,None,None,2620                                "Sequence (position) of parameter.")2621    domain = LateBindingProperty(_get_domain,None,None,2622                                ":class:`Domain` for this parameter.")2623    datatype = LateBindingProperty(_get_datatype,None,None,2624                                "Comlete SQL datatype definition.")2625    type_from = LateBindingProperty(_get_type_from,None,None,2626                                "Numeric code. See :attr:`Schema.enum_param_type_from`.`")2627    2628    # FB 2.12629    default = LateBindingProperty(_get_default,None,None,"Default value.")2630    collation = LateBindingProperty(_get_collation,None,None,2631                                ":class:`collation` for this parameter.")2632    mechanism = LateBindingProperty(_get_mechanism,None,None,2633                                "Parameter mechanism code.")2634    # FB 2.52635    column = LateBindingProperty(_get_column,None,None,2636                                ":class:`TableColumn` for this parameter.")2637    #--- Public2638    2639    def accept_visitor(self,visitor):2640        """Visitor Pattern support. Calls `visitProcedureParameter(self)` on parameter object.2641        2642        :param visitor: Visitor object of Vistior Pattern.2643        """2644        visitor.visitProcedureParameter(self)2645    def get_sql_definition(self):2646        "Returns SQL definition for parameter."2647        typedef = self.datatype2648        if self.type_from == PROCPAR_DOMAIN:2649            typedef = self.domain.get_quoted_name()2650        elif self.type_from == PROCPAR_TYPE_OF_DOMAIN:2651            typedef = 'TYPE OF %s' % self.domain.get_quoted_name()2652        elif self.type_from == PROCPAR_TYPE_OF_COLUMN:2653            typedef = 'TYPE OF COLUMN %s.%s' % (self.column.table.get_quoted_name(),2654                                                self.column.get_quoted_name())2655        result = '%s %s%s' % (self.get_quoted_name(),typedef,2656                               '' if self.isnullable() else ' NOT NULL')2657        c = self.collation2658        if c is not None:2659            result += ' COLLATE %s' % c.get_quoted_name()2660        if self.isinput() and self.has_default():2661            result += ' = %s' % self.default2662        return result2663    def isinput(self):2664        "Returns True if parameter is INPUT parameter."2665        return self._attributes['RDB$PARAMETER_TYPE'] == 02666    def isnullable(self):2667        "Returns True if parameter allows NULL."2668        return not bool(self._attributes.get('RDB$NULL_FLAG'))2669    def has_default(self):2670        "Returns True if parameter has default value."2671        return bool(self._attributes.get('RDB$DEFAULT_SOURCE'))2672    2673class Procedure(BaseSchemaItem):2674    """Represents stored procedure.2675    Supported SQL actions: 2676    2677    - User procedure: create(no_code=bool), recreate(no_code=bool), 2678      create_or_alter(no_code=bool), drop,2679      alter(input=string_or_list,output=string_or_list,declare=string_or_list,2680      code=string_or_list)2681    - System procedure: none2682    """2683    def __init__(self,schema,attributes):2684        super(Procedure,self).__init__(schema,attributes)2685        self._type_code = [5,]2686        self.__inputParams = self.__outputParams = None2687        2688        self._strip_attribute('RDB$PROCEDURE_NAME')2689        self._strip_attribute('RDB$OWNER_NAME')2690        self._strip_attribute('RDB$SECURITY_CLASS')2691        self.__ods = schema._con.ods2692        if not self.issystemobject():2693            self._actions = ['create','recreate','alter','create_or_alter','drop']2694    #--- Protected2695    def _get_create_sql(self,**params):2696        self._check_params(params,['no_code'])2697        no_code = params.get('no_code')2698        result = 'CREATE PROCEDURE %s' % self.get_quoted_name()2699        if self.has_input():2700            if self._attributes['RDB$PROCEDURE_INPUTS'] == 1:2701                result += ' (%s)\n' % self.input_params[0].get_sql_definition()2702            else:2703                result += ' (\n'2704                for p in self.input_params:2705                    result += '  %s%s\n' % (p.get_sql_definition(),2706                                            '' if p.sequence+1 == self._attributes['RDB$PROCEDURE_INPUTS']2707                                            else ',')2708                result += ')\n'2709        else:2710            result += '\n'2711        if self.has_output():2712            if self._attributes['RDB$PROCEDURE_OUTPUTS'] == 1:2713                result += 'RETURNS (%s)\n' % self.output_params[0].get_sql_definition()2714            else:2715                result += 'RETURNS (\n'2716                for p in self.input_params:2717                    result += '  %s%s\n' % (p.get_sql_definition(),2718                                            '' if p.sequence+1 == self._attributes['RDB$PROCEDURE_OUTPUTS']2719                                            else ',')2720                result += ')\n'2721        return result+'AS\n'+('BEGIN\nEND' if no_code else self.source)2722    def _get_alter_sql(self,**params):2723        self._check_params(params,['input','output','declare','code'])2724        inpars = params.get('input')2725        outpars = params.get('output')2726        declare = params.get('declare')2727        code = params.get('code')2728        if code is None:2729            raise fdb_embedded.ProgrammingError("Missing required parameter: 'code'.")2730        #2731        header = ''2732        if inpars is not None:2733            if isinstance(inpars,(list,tuple)):2734                numpars = len(inpars)2735                if numpars == 1:2736                    header = ' (%s)\n' % inpars2737                else:2738                    header = ' (\n'2739                    i = 12740                    for p in inpars:2741                        header += '  %s%s\n' % (p,'' if i == numpars else ',')2742                        i += 12743                    header += ')\n'2744            else:2745                header = ' (%s)\n' % inpars2746        #2747        if outpars is not None:2748            if not header:2749                header += '\n'2750            if isinstance(outpars,(list,tuple)):2751                numpars = len(outpars)2752                if numpars == 1:2753                    header += 'RETURNS (%s)\n' % outpars2754                else:2755                    header += 'RETURNS (\n'2756                    i = 12757                    for p in outpars:2758                        header += '  %s%s\n' % (p,'' if i == numpars else ',')2759                        i += 12760                    header += ')\n'2761            else:2762                header += 'RETURNS (%s)\n' % outpars2763        #2764        if code:2765            if declare is None:2766                d = ''2767            elif isinstance(declare,(list,tuple)):2768                d = ''2769                for x in declare:2770                    d += '  %s\n' % x2771            else:2772                d = '%s\n' % declare2773            if isinstance(code,(list,tuple)):2774                c = ''2775                for x in code:2776                    c += '  %s\n' % x2777            else:2778                c = '%s\n' % code2779            body = '%sAS\n%sBEGIN\n%sEND' % ('' if header else '\n',d,c)2780        else:2781            body = '%sAS\nBEGIN\nEND' % ('' if header else '\n')2782        #2783        return 'ALTER PROCEDURE %s%s%s' % (self.get_quoted_name(),header,body)2784    def _get_drop_sql(self,**params):2785        self._check_params(params,[])2786        return 'DROP PROCEDURE %s' % self.get_quoted_name()2787    def __param_columns(self):2788        cols = ['RDB$PARAMETER_NAME','RDB$PROCEDURE_NAME','RDB$PARAMETER_NUMBER',2789                'RDB$PARAMETER_TYPE','RDB$FIELD_SOURCE','RDB$DESCRIPTION', 2790                'RDB$SYSTEM_FLAG']2791        if self.__ods >= fdb_embedded.ODS_FB_21:2792            cols.extend(['RDB$DEFAULT_SOURCE','RDB$COLLATION_ID','RDB$NULL_FLAG',2793                         'RDB$PARAMETER_MECHANISM'])2794        if self.__ods >= fdb_embedded.ODS_FB_25:2795            cols.extend(['RDB$FIELD_NAME','RDB$RELATION_NAME'])2796        return ','.join(cols)2797    def _get_name(self):2798        return self._attributes['RDB$PROCEDURE_NAME']2799    def _get_id(self):2800        return self._attributes['RDB$PROCEDURE_ID']2801    def _get_source(self):2802        return self._attributes['RDB$PROCEDURE_SOURCE']2803    def _get_security_class(self):2804        return self._attributes['RDB$SECURITY_CLASS']2805    def _get_owner_name(self):2806        return self._attributes['RDB$OWNER_NAME']2807    def _get_input_params(self):2808        if self.__inputParams is None:2809            if self.has_input():2810                self.__inputParams = [ProcedureParameter(self.schema,self,row) for row in 2811                    self.schema._select("""select %s from rdb$procedure_parameters 2812where rdb$procedure_name = ? 2813and rdb$parameter_type = 0 2814order by rdb$parameter_number""" % self.__param_columns(),(self.name,))]2815            else:2816                self.__inputParams = []2817        return self.__inputParams2818    def _get_output_params(self):2819        if self.__outputParams is None:2820            if self.has_output():2821                self.__outputParams = [ProcedureParameter(self.schema,self,row) for row in 2822                    self.schema._select("""select %s from rdb$procedure_parameters 2823where rdb$procedure_name = ? 2824and rdb$parameter_type = 1 2825order by rdb$parameter_number""" % self.__param_columns(),(self.name,))]2826            else:2827                self.__outputParams = []2828        return self.__outputParams2829    def _get_proc_type(self):2830        return self._attributes.get('RDB$PROCEDURE_TYPE',0)2831    def _get_valid_blr(self):2832        result = self._attributes.get('RDB$VALID_BLR')2833        return bool(result) if result is not None else None2834    def _get_privileges(self):2835        return [p for p in self.schema.privileges2836                if ((p.subject_name == self.name) and 2837                    (p.subject_type in self._type_code))]2838    2839    #--- Properties2840    id = LateBindingProperty(_get_id,None,None,"Internal unique ID number.")2841    source = LateBindingProperty(_get_source,None,None,"PSQL source code.")2842    security_class  = LateBindingProperty(_get_security_class,None,None,2843        "Security class that define access limits to the procedure.")2844    owner_name = LateBindingProperty(_get_owner_name,None,None,2845        "User name of procedure's creator.")2846    input_params = LateBindingProperty(_get_input_params,None,None,2847        "List of input parameters.\nInstances are :class:`ProcedureParameter` instances.")2848    output_params = LateBindingProperty(_get_output_params,None,None,2849        "List of output parameters.\nInstances are :class:`ProcedureParameter` instances.")2850    privileges = LateBindingProperty(_get_privileges,None,None,2851        "List of :class:`Privilege` objects granted to this object.")2852    # FB 2.12853    proc_type = LateBindingProperty(_get_proc_type,None,None,2854        "Procedure type code. See :attr:`fdb_embedded.Connection.enum_procedure_types`.")2855    valid_blr = LateBindingProperty(_get_valid_blr,None,None,2856        "Procedure BLR invalidation flag. Coul be True/False or None.")2857    2858    #--- Public2859    2860    def accept_visitor(self,visitor):2861        """Visitor Pattern support. Calls `visitProcedure(self)` on parameter object.2862        2863        :param visitor: Visitor object of Vistior Pattern.2864        """2865        visitor.visitProcedure(self)2866    def get_param(self,name):2867        "Returns :class:`ProcedureParameter` with specified name or None"2868        for p in self.output_params:2869            if p.name == name:2870                return p2871        for p in self.input_params:2872            if p.name == name:2873                return p2874        return None2875    def has_input(self):2876        "Returns True if procedure has any input parameters."2877        return bool(self._attributes['RDB$PROCEDURE_INPUTS'])2878    def has_output(self):2879        "Returns True if procedure has any output parameters."2880        return bool(self._attributes['RDB$PROCEDURE_OUTPUTS'])2881class Role(BaseSchemaItem):2882    """Represents user role.2883    Supported SQL actions: 2884    2885    - User role: create, drop2886    - System role: none2887    """2888    def __init__(self,schema,attributes):2889        super(Role,self).__init__(schema,attributes)2890        self._type_code = [13,]2891        self._strip_attribute('RDB$ROLE_NAME')2892        self._strip_attribute('RDB$OWNER_NAME')2893        if not self.issystemobject():2894            self._actions = ['create','drop']2895    #--- Protected2896    def _get_create_sql(self,**params):2897        self._check_params(params,[])2898        return 'CREATE ROLE %s' % self.get_quoted_name()2899    def _get_drop_sql(self,**params):2900        self._check_params(params,[])2901        return 'DROP ROLE %s' % self.get_quoted_name()2902    def _get_name(self):2903        return self._attributes['RDB$ROLE_NAME']2904    def _get_owner_name(self):2905        return self._attributes['RDB$OWNER_NAME']2906    def _get_privileges(self):2907        return [p for p in self.schema.privileges2908                if ((p.subject_name == self.name) and 2909                    (p.subject_type in self._type_code))]2910    #--- Properties2911    owner_name = LateBindingProperty(_get_owner_name,None,None,"User name of role owner.")2912    privileges = LateBindingProperty(_get_privileges,None,None,2913        "List of :class:`Privilege` objects granted to this object.")2914    #--- Public2915    2916    def accept_visitor(self,visitor):2917        """Visitor Pattern support. Calls `visitRole(self)` on parameter object.2918        2919        :param visitor: Visitor object of Vistior Pattern.2920        """2921        visitor.visitRole(self)2922class FunctionArgument(BaseSchemaItem):2923    """Represets UDF argument.2924    Supported SQL actions: none.2925    """2926    def __init__(self,schema,function,attributes):2927        super(FunctionArgument,self).__init__(schema,attributes)2928        self._type_code = [15,]2929        self.__function = function2930        self._strip_attribute('RDB$FUNCTION_NAME')2931    #--- Protected2932    def _get_name(self):2933        return self._attributes['RDB$FUNCTION_NAME']+'_'+str(self._get_position())2934    def _get_function(self):2935        return self.__function2936        #return self.schema.get_function(self._attributes['RDB$FUNCTION_NAME'])2937    def _get_position(self):2938        return self._attributes['RDB$ARGUMENT_POSITION']2939    def _get_mechanism(self):2940        return abs(self._attributes['RDB$MECHANISM'])2941    def _get_length(self):2942        return self._attributes['RDB$FIELD_LENGTH']2943    def _get_scale(self):2944        return self._attributes['RDB$FIELD_SCALE']2945    def _get_field_type(self):2946        return self._attributes['RDB$FIELD_TYPE']2947    def _get_sub_type(self):2948        return self._attributes['RDB$FIELD_SUB_TYPE']2949    def _get_character_length(self):2950        return self._attributes['RDB$CHARACTER_LENGTH']2951    def _get_character_set(self):2952        return self.schema.get_character_set_by_id(self._attributes['RDB$CHARACTER_SET_ID'])2953    def _get_precision(self):2954        return self._attributes['RDB$FIELD_PRECISION']2955    def _get_datatype(self):2956        l = []2957        precision_known = False2958        if self.field_type in (FBT_SMALLINT,FBT_INTEGER,FBT_BIGINT):2959            if self.precision != None:2960                if (self.sub_type > 0) and (self.sub_type < MAX_INTSUBTYPES):2961                    l.append('%s(%d, %d)' % \...bilibili.py
Source:bilibili.py  
...198# å ID è§£æå¨çåç±»åºäº https://github.com/mengshouer/nonebot_plugin_analysis_bilibili ç c79dbe9 æäº¤çæ¬199class BaseID(abc.ABC):200    def __init__(self, text: str):201        search_result: Optional[Match[str]] = re.compile(self._get_pattern(), re.I).search(text)202        self.subtype: str = self._get_sub_type()203        self.suburl: Optional[str] = self._get_api_url(search_result) if search_result else None204    @abc.abstractmethod205    def _get_pattern(self) -> str:206        """è¿åä¸ä¸ªæ£å表达å¼ï¼ç¨äºä»ç¨æ·æ¶æ¯ä¸å¹é
åºåªä½id"""207        raise NotImplementedError208    @abc.abstractmethod209    def _get_sub_type(self) -> str:210        """è¿å该idçåªä½ç±»åï¼ï¼"""211        raise NotImplementedError212    @abc.abstractmethod213    def _get_api_url(self, search_result: Match[str]) -> str:214        """æ ¹æ®search_resultè¿åä¸ä¸ªè¯¥åªä½å¯¹åºçapiå°å"""215        raise NotImplementedError216class BVID(BaseID):217    def _get_pattern(self): return r'BV([A-Za-z0-9]{10})+'218    def _get_sub_type(self): return 'video'219    def _get_api_url(self, search_result): return f'https://api.bilibili.com/x/web-interface/view?bvid={search_result[0]}'220class AID(BaseID):221    def _get_pattern(self): return r'av\d+'222    def _get_sub_type(self): return 'video'223    def _get_api_url(self, search_result): return f'https://api.bilibili.com/x/web-interface/view?aid={search_result[0][2:]}'224class EPID(BaseID):225    def _get_pattern(self): return r'ep\d+'226    def _get_sub_type(self): return 'bangumi'227    def _get_api_url(self, search_result): return f'https://bangumi.bilibili.com/view/web_api/season?ep_id={search_result[0][2:]}'228class SeasonID(BaseID):229    def _get_pattern(self): return r'ss\d+'230    def _get_sub_type(self): return 'bangumi'231    def _get_api_url(self, search_result): return f'https://bangumi.bilibili.com/view/web_api/season?season_id={search_result[0][2:]}'232class MediaID(BaseID):233    def _get_pattern(self): return r'md\d+'234    def _get_sub_type(self): return 'bangumi'235    def _get_api_url(self, search_result): return f'https://bangumi.bilibili.com/view/web_api/season?media_id={search_result[0][2:]}'236class RoomID(BaseID):237    def _get_pattern(self): return r'live.bilibili.com/(blanc/|h5/)?(\d+)'238    def _get_sub_type(self): return 'live'239    def _get_api_url(self, search_result): return f'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom?room_id={search_result[2]}'240class CVID(BaseID):241    def _get_pattern(self): return r'(cv|/read/(mobile|native)(/|\?id=))(\d+)'242    def _get_sub_type(self): return 'article'243    def _get_api_url(self, search_result): return f'https://api.bilibili.com/x/article/viewinfo?id={search_result[4]}&mobi_app=pc&from=web'244class Dynamic2ID(BaseID):245    def _get_pattern(self): return r'([tm]).bilibili.com/(\d+)\?(.*?)(&|&)type=2'246    def _get_sub_type(self): return 'dynamic'247    def _get_api_url(self, search_result): return f'https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/get_dynamic_detail?rid={search_result[2]}&type=2'248class DynamicID(BaseID):249    def _get_pattern(self): return r'([tm]).bilibili.com/(\d+)'250    def _get_sub_type(self): return 'dynamic'251    def _get_api_url(self, search_result): return f'https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/get_dynamic_detail?dynamic_id={search_result[2]}'252class VideoResponse(BaseModel):253    class Owner(BaseModel):254        name: str255    class Stat(BaseModel):256        view: int257        danmaku: int258        reply: int259        favorite: int260        coin: int261        share: int262        like: int263        dislike: int264    pubdate: int...subclasses.py
Source:subclasses.py  
...51            f"registered: " f"[{', '.join([name for name in self.keys()])}]"52        )53    def load_typed_runbook(self, raw_runbook: Any) -> T_BASECLASS:54        type_name = raw_runbook[constants.TYPE]55        sub_type = self._get_sub_type(type_name)56        instance: Any = schema.load_by_type(sub_type, raw_runbook)57        if hasattr(instance, "extended_schemas"):58            if instance.extended_schemas:59                raise LisaException(60                    f"found unknown fields: {instance.extended_schemas}"61                )62        return cast(T_BASECLASS, instance)63    def create_by_type_name(self, type_name: str, **kwargs: Any) -> T_BASECLASS:64        sub_type = self._get_sub_type(type_name)65        return cast(T_BASECLASS, sub_type(**kwargs))66    def create_by_runbook(67        self, runbook: schema.TypedSchema, **kwargs: Any68    ) -> T_BASECLASS:69        sub_type = self._get_sub_type(runbook.type)70        sub_type_with_runbook = cast(Type[BaseClassWithRunbookMixin], sub_type)71        sub_object = sub_type_with_runbook.create_with_runbook(72            runbook=runbook, **kwargs73        )74        assert isinstance(75            sub_object, BaseClassWithRunbookMixin76        ), f"actual: {type(sub_object)}"77        return cast(T_BASECLASS, sub_object)78    def _get_subclasses(79        self, type: Type[BaseClassMixin]80    ) -> Iterable[Type[BaseClassMixin]]:81        # recursive loop subclasses of subclasses82        for subclass_type in type.__subclasses__():83            yield subclass_type84            yield from self._get_subclasses(subclass_type)85    def _get_sub_type(self, type_name: str) -> type:86        self.initialize()87        sub_type = self.get(type_name)88        if sub_type is None:89            raise LisaException(90                f"cannot find subclass '{type_name}' of {self._base_type.__name__}. "91                f"Supported types include: {list(self.keys())}. "92                f"Are you missing an import in 'mixin_modules.py' or an extension?"93            )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
