Best Python code snippet using dbt-osmosis_python
impl.py
Source:impl.py  
# Excerpt of adapter methods from impl.py. The methods take ``self`` and call
# ``super()``, so they presumably live inside an adapter class whose header is
# outside this excerpt.
#
# The excerpt opens in the middle of an earlier method; its visible tail is
# kept verbatim below because the start of that method is not shown:
#
#                 type=relation_type
#             )
#             relations.append(relation)
#         return relations


def get_columns_in_relation(self, relation: Relation) -> List[MySQLColumn]:
    """Return the columns of ``relation`` as ``MySQLColumn`` objects.

    The raw rows come from the parent class implementation and are converted
    by ``parse_show_columns``.
    """
    rows: List[agate.Row] = super().get_columns_in_relation(relation)
    return self.parse_show_columns(relation, rows)


def _get_columns_for_catalog(
    self, relation: MySQLRelation
) -> Iterable[Dict[str, Any]]:
    """Yield one catalog dict per column of ``relation``.

    Each dict is ``column.to_dict()`` with the ``'column'`` key renamed to
    ``'column_name'``, ``'dtype'`` renamed to ``'column_type'``, and
    ``'table_database'`` set to ``None``.
    """
    columns = self.get_columns_in_relation(relation)
    for column in columns:
        # convert MySQLColumns into catalog dicts
        as_dict = column.to_dict()
        as_dict['column_name'] = as_dict.pop('column', None)
        as_dict['column_type'] = as_dict.pop('dtype')
        as_dict['table_database'] = None
        yield as_dict


def get_relation(
    self, database: str, schema: str, identifier: str
) -> Optional[BaseRelation]:
    """Look up a relation through the parent class implementation.

    ``database`` is replaced with ``None`` when the relation's include policy
    has its ``database`` flag switched off.
    """
    if not self.Relation.include_policy.database:
        database = None

    return super().get_relation(database, schema, identifier)


def parse_show_columns(
        self,
        relation: Relation,
        raw_rows: List[agate.Row]
) -> List[MySQLColumn]:
    """Build ``MySQLColumn`` objects from raw column rows.

    Each row must expose ``.column`` and ``.dtype``. ``column_index`` is the
    row's zero-based position in ``raw_rows``; schema, name and type are taken
    from ``relation``; database, owner and stats are always ``None``.
    """
    return [MySQLColumn(
        table_database=None,
        table_schema=relation.schema,
        table_name=relation.name,
        table_type=relation.type,
        table_owner=None,
        table_stats=None,
        column=column.column,
        column_index=idx,
        dtype=column.dtype,
    ) for idx, column in enumerate(raw_rows)]


def get_catalog(self, manifest):
    """Collect catalog tables for every schema referenced by ``manifest``.

    One ``_get_one_catalog`` task per schema is submitted to the executor and
    the results are gathered with ``catch_as_completed``.

    Returns the ``(catalogs, exceptions)`` pair produced by
    ``catch_as_completed``. A compiler error is raised when the schema map
    holds more than one entry.
    """
    schema_map = self._get_catalog_schemas(manifest)
    if len(schema_map) > 1:
        dbt.exceptions.raise_compiler_error(
            f'Expected only one database in get_catalog, found '
            f'{list(schema_map)}'
        )

    with executor(self.config) as tpe:
        futures: List[Future[agate.Table]] = []
        for info, schemas in schema_map.items():
            for schema in schemas:
                futures.append(tpe.submit_connected(
                    self, schema,
                    self._get_one_catalog, info, [schema], manifest
                ))
        catalogs, exceptions = catch_as_completed(futures)
    return catalogs, exceptions


def _get_one_catalog(
    self, information_schema, schemas, manifest,
) -> agate.Table:
    """Build the catalog table for a single schema.

    ``schemas`` must contain exactly one schema, otherwise a compiler error
    is raised. The catalog rows of every relation listed for that schema are
    gathered into one ``agate.Table``.
    """
    if len(schemas) != 1:
        dbt.exceptions.raise_compiler_error(
            f'Expected only one schema in mysql _get_one_catalog, found '
            f'{schemas}'
        )

    database = information_schema.database
    schema = list(schemas)[0]

    columns: List[Dict[str, Any]] = []
    for relation in self.list_relations(database, schema):
        logger.debug("Getting table schema for relation {}", relation)
        columns.extend(self._get_columns_for_catalog(relation))
    return agate.Table.from_object(
        columns, column_types=DEFAULT_TYPE_TESTER
    )


def check_schema_exists(self, database, schema):
    """Return ``True`` when ``schema`` is listed for ``database``.

    The schema names are read from the first field of each row returned by
    the ``LIST_SCHEMAS_MACRO_NAME`` macro.
    """
    results = self.execute_macro(
        LIST_SCHEMAS_MACRO_NAME,
        kwargs={'database': database}
    )
    return schema in [row[0] for row in results]


# Methods used in adapter tests
def update_column_sql(
    self,
    dst_name: str,
    dst_column: str,
    clause: str,
    where_clause: Optional[str] = None,
) -> str:
    """Return an ``update`` statement setting ``dst_column`` to ``clause``.

    A ``where`` clause is appended only when ``where_clause`` is not ``None``.
    The arguments are interpolated into the SQL text as given.
    """
    clause = f'update {dst_name} set {dst_column} = {clause}'
    if where_clause is not None:
        clause += f' where {where_clause}'
    return clause


def timestamp_add_sql(
    self, add_to: str, number: int = 1, interval: str = 'hour'
) -> str:
    """Return a ``date_add`` expression adding ``number`` ``interval`` units.

    ``add_to`` is the SQL expression being shifted; it is interpolated as-is.
    """
    # The defaults (1 hour) are kept for backwards compatibility. The
    # date_add(expr, interval n unit) form is emitted here rather than the
    # '+ interval' syntax used in postgres/redshift.
    return f"date_add({add_to}, interval {number} {interval})"


def string_add_sql(
    self, add_to: str, value: str, location='append',
) -> str:
    """Return a ``concat`` expression joining ``value`` onto ``add_to``.

    ``add_to`` is treated as a SQL expression (left unquoted) and ``value``
    as a string literal (wrapped in single quotes). ``location`` selects
    whether the literal goes after (``'append'``) or before (``'prepend'``)
    the expression.

    Raises ``RuntimeException`` for any other ``location``.
    """
    if location == 'append':
        return f"concat({add_to}, '{value}')"
    elif location == 'prepend':
        # The literal is the quoted operand and the expression stays bare,
        # mirroring the 'append' branch with the operands swapped.
        return f"concat('{value}', {add_to})"
    else:
        raise RuntimeException(
            f'Got an unexpected location value of "{location}"'
        )


# The excerpt is cut off inside the next method (in the middle of a string
# literal), so it is kept verbatim as text instead of being completed by
# guesswork:
#
#     def get_rows_different_sql(
#         self,
#         relation_a: MySQLRelation,
#         relation_b: MySQLRelation,
#         column_names: Optional[List[str]] = None,
#     ) -> str:
#         # This method only really exists for test reasons
#         names: List[str]
#         if column_names is None:
#             columns = self.get_columns_in_relation(relation_a)
#             names = sorted((self.quote(c.name) for c in columns))
#         else:
#             names = sorted((self.quote(n) for n in column_names))
#         alias_a = "A"
#         alias_b = "B"
#         columns_csv_a = ', '.join([f"{alias_a}.{name}" for name in names])
#         columns_csv_b = ', '.join([f"{alias_b}.{name}" for name in names])
#         join_condition = ' AND '.join([f"{alias_a}.{name} = {alias_b}.{name}" for name in names])
#         first_column = names[0]
#         # MySQL doesn't have an EXCEPT or MINUS operator, so we need to simulate it
#         COLUMNS_EQUAL_SQL = '''
#         SELECT
#             row_count_diff.difference as row_count_difference,
#             diff_count.num_missing as num_mismatched...
#
# Learn to execute automation testing from scratch with LambdaTest Learning
Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
