Best Python code snippet using dbt-osmosis_python
osmosis.py
Source:osmosis.py  
...1011        family_tree = self.build_node_ancestor_tree(node)1012        knowledge = self.inherit_column_level_knowledge(family_tree)1013        return knowledge1014    @staticmethod1015    def get_column_sets(1016        database_columns: Iterable[str],1017        yaml_columns: Iterable[str],1018        documented_columns: Iterable[str],1019    ) -> Tuple[List[str], List[str], List[str]]:1020        """Returns:1021        missing_columns: Columns in database not in dbt -- will be injected into schema file1022        undocumented_columns: Columns missing documentation -- descriptions will be inherited and injected into schema file where prior knowledge exists1023        extra_columns: Columns in schema file not in database -- will be removed from schema file1024        """1025        missing_columns = [1026            x for x in database_columns if x.lower() not in (y.lower() for y in yaml_columns)1027        ]1028        undocumented_columns = [1029            x for x in database_columns if x.lower() not in (y.lower() for y in documented_columns)1030        ]1031        extra_columns = [1032            x for x in yaml_columns if x.lower() not in (y.lower() for y in database_columns)1033        ]1034        return missing_columns, undocumented_columns, extra_columns1035    def propagate_documentation_downstream(self, force_inheritance: bool = False) -> None:1036        schema_map = self.build_schema_folder_mapping()1037        with self.adapter.connection_named("dbt-osmosis"):1038            for unique_id, node in track(list(self.filtered_models())):1039                logger().info("\n:point_right: Processing model: [bold]%s[/bold] \n", unique_id)1040                # Get schema file path, must exist to propagate documentation1041                schema_path: Optional[SchemaFileLocation] = schema_map.get(unique_id)1042                if schema_path is None or schema_path.current is None:1043                    logger().info(1044                        ":bow: No 
valid schema file found for model %s", unique_id1045                    )  # We can't take action1046                    continue1047                # Build Sets1048                database_columns: Set[str] = set(self.get_columns(node))1049                yaml_columns: Set[str] = set(column for column in node.columns)1050                if not database_columns:1051                    logger().info(1052                        ":safety_vest: Unable to resolve columns in database, falling back to using yaml columns as base column set\n"1053                    )1054                    database_columns = yaml_columns1055                # Get documentated columns1056                documented_columns: Set[str] = set(1057                    column1058                    for column, info in node.columns.items()1059                    if info.description and info.description not in self.placeholders1060                )1061                # Queue1062                missing_columns, undocumented_columns, extra_columns = self.get_column_sets(1063                    database_columns, yaml_columns, documented_columns1064                )1065                if force_inheritance:1066                    # Consider all columns "undocumented" so that inheritance is not selective1067                    undocumented_columns = database_columns1068                # Engage1069                n_cols_added = 01070                n_cols_doc_inherited = 01071                n_cols_removed = 01072                if len(missing_columns) > 0 or len(undocumented_columns) or len(extra_columns) > 0:1073                    schema_file = self.yaml_handler.load(schema_path.current)1074                    (1075                        n_cols_added,1076                        n_cols_doc_inherited,...join.py
Source:join.py  
...44    Represent a Node in the join tree data structure.45    Join nodes can be simple data sources, join expressions and queries.46    """47    @abstractmethod48    def get_column_sets(self) -> Mapping[str, ColumnSet]:49        raise NotImplementedError50    @abstractmethod51    def accept(self, visitor: JoinVisitor[TReturn, TSimpleDataSource]) -> TReturn:52        raise NotImplementedError53    @abstractmethod54    def get_alias_node_map(self) -> Mapping[str, IndividualNode[TSimpleDataSource]]:55        raise NotImplementedError56@dataclass(frozen=True)57class IndividualNode(JoinNode[TSimpleDataSource], Generic[TSimpleDataSource]):58    """59    Join node that represent an individual data source: an entity/table60    or a subquery.61    It also assign an alias to a node in a join expression.62    The alias is used in the join condition and in all the expressions63    that rely on the join in the query.64    """65    alias: str66    data_source: Union[TSimpleDataSource, ProcessableQuery[TSimpleDataSource]]67    def get_alias_node_map(self) -> Mapping[str, IndividualNode[TSimpleDataSource]]:68        return {self.alias: self}69    def get_column_sets(self) -> Mapping[str, ColumnSet]:70        return (71            {self.alias: self.data_source.get_columns()}72            if self.alias is not None73            else {}74        )75    def accept(self, visitor: JoinVisitor[TReturn, TSimpleDataSource]) -> TReturn:76        return visitor.visit_individual_node(self)77def entity_from_node(node: IndividualNode[Entity]) -> EntityKey:78    assert isinstance(node.data_source, Entity)79    return node.data_source.key80class JoinConditionExpression(NamedTuple):81    """82    Represent one qualified column [alias.column] in the83    ON clause within the join expression.84    """85    table_alias: str86    column: str87class JoinCondition(NamedTuple):88    """89    Represents a condition in the ON clause in the JOIN expression.90    """91    left: JoinConditionExpression92    
right: JoinConditionExpression93@dataclass(frozen=True)94class JoinClause(DataSource, JoinNode[TSimpleDataSource], Generic[TSimpleDataSource]):95    """96    Joins two JoinNodes.97    For a simple join between two entities/tables or two subqueries, both98    left and right node are IndividualNodes.99    For a more complex joins between multiple nodes, nodes are associative100    on the left:101    `(a INNER JOIN b ON condition) INNER JOIN c ON condition`102    This means the left node can be a join on its own while the right node103    still needs to be an IndividualNode.104    """105    left_node: JoinNode[TSimpleDataSource]106    right_node: IndividualNode[TSimpleDataSource]107    keys: Sequence[JoinCondition]108    join_type: JoinType109    join_modifier: Optional[JoinModifier] = None110    def get_column_sets(self) -> Mapping[str, ColumnSet]:111        return {112            **self.left_node.get_column_sets(),113            **self.right_node.get_column_sets(),114        }115    def get_columns(self) -> ColumnSet:116        return QualifiedColumnSet(self.get_column_sets())117    def get_alias_node_map(self) -> Mapping[str, IndividualNode[TSimpleDataSource]]:118        return {119            **self.left_node.get_alias_node_map(),120            **self.right_node.get_alias_node_map(),121        }122    def __post_init__(self) -> None:123        column_set = self.get_columns()124        for condition in self.keys:125            assert f"{condition.left.table_alias}.{condition.left.column}" in column_set126            assert (127                f"{condition.right.table_alias}.{condition.right.column}" in column_set128            )129    def accept(self, visitor: JoinVisitor[TReturn, TSimpleDataSource]) -> TReturn:130        return visitor.visit_join_clause(self)...test_join.py
Source:test_join.py  
...20GROUPS_ASSIGNEE = ColumnSet([("id", UInt(32)), ("user", String())])21def test_entity_node() -> None:22    e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA)23    node = IndividualNode(alias="err", data_source=e)24    assert node.get_column_sets() == {"err": e.schema}25def test_simple_join() -> None:26    e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA)27    node_err = IndividualNode(alias="err", data_source=e)28    g = Entity(key=EntityKey.GROUPEDMESSAGE, schema=GROUPS_SCHEMA)29    node_group = IndividualNode(alias="groups", data_source=g)30    join = JoinClause(31        left_node=node_err,32        right_node=node_group,33        keys=[34            JoinCondition(35                left=JoinConditionExpression("err", "group_id"),36                right=JoinConditionExpression("groups", "id"),37            )38        ],39        join_type=JoinType.INNER,40        join_modifier=JoinModifier.SEMI,41    )42    assert join.get_column_sets() == {"err": ERRORS_SCHEMA, "groups": GROUPS_SCHEMA}43    joined_cols = join.get_columns()44    assert "err.group_id" in joined_cols45    assert "err.event_id" in joined_cols46    assert "groups.id" in joined_cols47    assert "groups.message" in joined_cols48    with pytest.raises(AssertionError):49        JoinClause(50            left_node=node_err,51            right_node=node_group,52            keys=[53                JoinCondition(54                    left=JoinConditionExpression("err", "missing_col"),55                    right=JoinConditionExpression("groups", "another_missing_col"),56                )57            ],58            join_type=JoinType.INNER,59        )60def test_complex_joins() -> None:61    e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA)62    node_err = IndividualNode(alias="err", data_source=e)63    g = Entity(key=EntityKey.GROUPEDMESSAGE, schema=GROUPS_SCHEMA)64    node_group = IndividualNode(alias="groups", data_source=g)65    a = Entity(key=EntityKey.GROUPASSIGNEE, 
schema=GROUPS_ASSIGNEE)66    query = Query(67        from_clause=a,68        selected_columns=[69            SelectedExpression("id", Column("id", None, "id")),70            SelectedExpression("assigned_user", Column("assigned_user", None, "user")),71        ],72    )73    node_query = IndividualNode(alias="assignee", data_source=query)74    join = JoinClause(75        left_node=JoinClause(76            left_node=node_err,77            right_node=node_group,78            keys=[79                JoinCondition(80                    left=JoinConditionExpression("err", "group_id"),81                    right=JoinConditionExpression("groups", "id"),82                )83            ],84            join_type=JoinType.INNER,85        ),86        right_node=node_query,87        keys=[88            JoinCondition(89                left=JoinConditionExpression("err", "group_id"),90                right=JoinConditionExpression("assignee", "id"),91            )92        ],93        join_type=JoinType.INNER,94    )95    assert join.get_column_sets() == {96        "err": ERRORS_SCHEMA,97        "assignee": ColumnSet([("id", Any()), ("assigned_user", Any())]),98        "groups": GROUPS_SCHEMA,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
