How to use the get_columns_in_relation method in dbt-osmosis

Best Python code snippet using dbt-osmosis_python

impl.py

Source:impl.py Github

copy

Full Screen

# NOTE(review): this snippet was scraped from a web page with the source
# file's original line numbers (60-195) fused into the text. It is the
# catalog/introspection section of a dbt MySQL adapter implementation,
# reformatted below as valid Python. The partial statement that opened the
# scraped span is incomplete in this view and is preserved verbatim:
#   ...60 type=relation_type61 )62 relations.append(relation)63 return relations

def get_columns_in_relation(self, relation: Relation) -> List[MySQLColumn]:
    """Return the columns of *relation* as MySQLColumn objects.

    Delegates the actual introspection query to the base adapter and
    converts the resulting agate rows via parse_show_columns.
    """
    rows: List[agate.Row] = super().get_columns_in_relation(relation)
    return self.parse_show_columns(relation, rows)

def _get_columns_for_catalog(
    self, relation: MySQLRelation
) -> Iterable[Dict[str, Any]]:
    """Yield one catalog dict per column of *relation*.

    MySQL has no database level above the schema, so table_database is
    always None in the emitted dicts.
    """
    for column in self.get_columns_in_relation(relation):
        # Convert MySQLColumns into catalog dicts, renaming the fields to
        # the names the catalog expects.
        as_dict = column.to_dict()
        as_dict['column_name'] = as_dict.pop('column', None)
        as_dict['column_type'] = as_dict.pop('dtype')
        as_dict['table_database'] = None
        yield as_dict

def get_relation(
    self, database: str, schema: str, identifier: str
) -> Optional[BaseRelation]:
    """Look up a relation, dropping the database component when the
    relation's include policy says MySQL relations have none."""
    if not self.Relation.include_policy.database:
        database = None
    return super().get_relation(database, schema, identifier)

def parse_show_columns(
    self,
    relation: Relation,
    raw_rows: List[agate.Row],
) -> List[MySQLColumn]:
    """Convert SHOW COLUMNS result rows into MySQLColumn objects.

    table_database/table_owner/table_stats are always None — MySQL does not
    expose them at this level (presumably; confirm against MySQLColumn).
    """
    return [
        MySQLColumn(
            table_database=None,
            table_schema=relation.schema,
            table_name=relation.name,
            table_type=relation.type,
            table_owner=None,
            table_stats=None,
            column=column.column,
            column_index=idx,
            dtype=column.dtype,
        )
        for idx, column in enumerate(raw_rows)
    ]

def get_catalog(self, manifest):
    """Build the catalog for *manifest*, cataloging each schema in parallel.

    Raises a compiler error when more than one database appears in the
    schema map, since this adapter only supports a single database.
    Returns (catalogs, exceptions) as produced by catch_as_completed.
    """
    schema_map = self._get_catalog_schemas(manifest)
    if len(schema_map) > 1:
        dbt.exceptions.raise_compiler_error(
            f'Expected only one database in get_catalog, found '
            f'{list(schema_map)}'
        )
    with executor(self.config) as tpe:
        futures: List[Future[agate.Table]] = []
        for info, schemas in schema_map.items():
            for schema in schemas:
                futures.append(tpe.submit_connected(
                    self, schema,
                    self._get_one_catalog, info, [schema], manifest
                ))
        catalogs, exceptions = catch_as_completed(futures)
    return catalogs, exceptions

def _get_one_catalog(
    self, information_schema, schemas, manifest,
) -> agate.Table:
    """Catalog exactly one schema and return it as an agate.Table.

    Raises a compiler error when called with anything but a single schema.
    """
    if len(schemas) != 1:
        dbt.exceptions.raise_compiler_error(
            f'Expected only one schema in mysql _get_one_catalog, found '
            f'{schemas}'
        )
    database = information_schema.database
    schema = list(schemas)[0]
    columns: List[Dict[str, Any]] = []
    for relation in self.list_relations(database, schema):
        logger.debug("Getting table schema for relation {}", relation)
        columns.extend(self._get_columns_for_catalog(relation))
    return agate.Table.from_object(
        columns, column_types=DEFAULT_TYPE_TESTER
    )

def check_schema_exists(self, database, schema):
    """Return True when *schema* exists in *database*."""
    results = self.execute_macro(
        LIST_SCHEMAS_MACRO_NAME,
        kwargs={'database': database}
    )
    # Simplified from `True if ... else False`: the membership test is
    # already a bool.
    return schema in [row[0] for row in results]

# Methods used in adapter tests

def update_column_sql(
    self,
    dst_name: str,
    dst_column: str,
    clause: str,
    where_clause: Optional[str] = None,
) -> str:
    """Build an UPDATE statement setting *dst_column* to *clause*,
    optionally filtered by *where_clause*."""
    sql = f'update {dst_name} set {dst_column} = {clause}'
    if where_clause is not None:
        sql += f' where {where_clause}'
    return sql

def timestamp_add_sql(
    self, add_to: str, number: int = 1, interval: str = 'hour'
) -> str:
    """Return SQL that adds *number* *interval*s to the expression *add_to*.

    For backwards compatibility we're compelled to set some sort of
    default. The '+ interval' syntax used in postgres/redshift is
    relatively common and might even be the SQL standard's intention.
    """
    return f"date_add({add_to}, interval {number} {interval})"

def string_add_sql(
    self, add_to: str, value: str, location='append',
) -> str:
    """Return SQL concatenating the string literal *value* onto the
    expression *add_to*, either appended or prepended.

    Raises RuntimeException for any other *location*.
    """
    if location == 'append':
        return f"concat({add_to}, '{value}')"
    elif location == 'prepend':
        # BUG FIX: the quotes belong around the literal *value*, not the
        # column expression *add_to* — mirroring the 'append' branch and
        # the postgres adapter's "'{value}' || {add_to}".
        return f"concat('{value}', {add_to})"
    else:
        raise RuntimeException(
            f'Got an unexpected location value of "{location}"'
        )

# The scraped span ends mid-definition: get_rows_different_sql is truncated
# (its COLUMNS_EQUAL_SQL template is cut off at "num_mismatched..."), so it
# is preserved verbatim rather than reconstructed:
#   def get_rows_different_sql(
#       self,
#       relation_a: MySQLRelation,
#       relation_b: MySQLRelation,
#       column_names: Optional[List[str]] = None,
#   ) -> str:
#       # This method only really exists for test reasons
#       names: List[str]
#       if column_names is None:
#           columns = self.get_columns_in_relation(relation_a)
#           names = sorted((self.quote(c.name) for c in columns))
#       else:
#           names = sorted((self.quote(n) for n in column_names))
#       alias_a = "A"
#       alias_b = "B"
#       columns_csv_a = ', '.join([f"{alias_a}.{name}" for name in names])
#       columns_csv_b = ', '.join([f"{alias_b}.{name}" for name in names])
#       join_condition = ' AND '.join(
#           [f"{alias_a}.{name} = {alias_b}.{name}" for name in names])
#       first_column = names[0]
#       # MySQL doesn't have an EXCEPT or MINUS operator, so we need to
#       # simulate it
#       COLUMNS_EQUAL_SQL = '''
#       SELECT
#           row_count_diff.difference as row_count_difference,
#           diff_count.num_missing as num_mismatched...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run dbt-osmosis automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful