Best Python code snippet using dbt-osmosis_python
osmosis.py
Source: osmosis.py
...
            raise SanitizationRequired(
                "Found `sources:` block in a models schema file. We require you to separate sources in order to organize your project."
            )
        return schema

    def build_schema_folder_mapping(
        self,
        target_node_type: Optional[Union[NodeType.Model, NodeType.Source]] = None,
    ) -> Dict[str, SchemaFileLocation]:
        """Builds a mapping of models or sources to their existing and target schema file paths"""
        if target_node_type == NodeType.Source:
            # Source folder mapping is reserved for source importing
            target_nodes = self.dbt.sources
        elif target_node_type == NodeType.Model:
            target_nodes = self.dbt.nodes
        else:
            target_nodes = {**self.dbt.nodes, **self.dbt.sources}
        # Container for output
        schema_map = {}
        logger().info("...building project structure mapping in memory")
        # Iterate over models and resolve current path vs declarative target path
        for unique_id, dbt_node in self.filtered_models(target_nodes):
            schema_path = self.get_schema_path(dbt_node)
            osmosis_schema_path = self.get_target_schema_path(dbt_node)
            schema_map[unique_id] = SchemaFileLocation(
                target=osmosis_schema_path, current=schema_path
            )
        return schema_map
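    # --- Illustrative sketch, not part of osmosis.py: the shape of the mapping returned by
    # build_schema_folder_mapping above, for a hypothetical model `model.my_project.stg_orders`
    # whose docs currently live in a shared schema.yml but whose `dbt-osmosis:` config targets
    # a per-model file:
    #
    #   {
    #       "model.my_project.stg_orders": SchemaFileLocation(
    #           current=Path("models/staging/schema.yml"),     # where the docs are today
    #           target=Path("models/staging/stg_orders.yml"),  # where the config says they belong
    #       )
    #   }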
    def draft_project_structure_update_plan(self) -> Dict[Path, SchemaFileMigration]:
        """Build a project structure update plan based on `dbt-osmosis:` configs set across dbt_project.yml and model files.

        The update plan includes injection of undocumented models. Unless this plan is constructed and executed by the
        `commit_project_restructure_to_disk` function, dbt-osmosis will only operate on models it is aware of through
        the existing documentation.

        Returns:
            Dict[Path, SchemaFileMigration]: Update plan keyed by target schema file path; each value holds the
            `models` content to be written to the target file plus a `supersede` mapping of which existing files
            (and model entries) the migration replaces.
        """
        # Container for output
        blueprint: Dict[Path, SchemaFileMigration] = {}
        logger().info(
            ":chart_increasing: Searching project structure for required updates and building action plan"
        )
        with self.adapter.connection_named("dbt-osmosis"):
            for unique_id, schema_file in self.build_schema_folder_mapping(
                target_node_type=NodeType.Model
            ).items():
                if not schema_file.is_valid:
                    blueprint.setdefault(
                        schema_file.target,
                        SchemaFileMigration(output={"version": 2, "models": []}, supersede={}),
                    )
                    node = self.dbt.nodes[unique_id]
                    if schema_file.current is None:
                        # Bootstrapping Undocumented Model
                        blueprint[schema_file.target].output["models"].append(
                            self.get_base_model(node)
                        )
                    else:
                        # Model Is Documented but Must Be Migrated
                        if not schema_file.current.exists():
                            continue
                        # TODO: We avoid sources for complexity reasons, but if we are opinionated, we don't have to
                        schema = self.assert_schema_has_no_sources(
                            self.yaml_handler.load(schema_file.current)
                        )
                        models_in_file: Iterable[Dict[str, Any]] = schema.get("models", [])
                        for documented_model in models_in_file:
                            if documented_model["name"] == node.name:
                                # Bootstrapping Documented Model
                                blueprint[schema_file.target].output["models"].append(
                                    self.bootstrap_existing_model(documented_model, node)
                                )
                                # Target to supersede current
                                blueprint[schema_file.target].supersede.setdefault(
                                    schema_file.current, []
                                ).append(documented_model["name"])
                                break
                        else:
                            ...  # Model not found at patch path -- we should pass on this for now
                else:
                    ...  # Valid schema file found for model -- we will update the columns in the `Document` task
        return blueprint
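    # --- Usage sketch (assumed calling convention, not from the source): inspect the plan
    # without committing it. `runner` stands in for an already-constructed instance of this class.
    #
    #   blueprint = runner.draft_project_structure_update_plan()
    #   for target, migration in blueprint.items():
    #       print(target, "supersedes", [p.name for p in migration.supersede])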
    def commit_project_restructure_to_disk(
        self, blueprint: Optional[Dict[Path, SchemaFileMigration]] = None
    ) -> bool:
        """Given a project restructure plan mapping pathlib Paths to outputs and supersedes (itself a mapping of
        Paths to model names), commit changes to the filesystem to conform the project to the defined structure as
        code, fully or partially superseding existing models as needed.

        Args:
            blueprint (Dict[Path, SchemaFileMigration]): Project restructure plan as typically created by
                `draft_project_structure_update_plan`

        Returns:
            bool: True if the project was restructured, False if no action was required
        """
        # Build blueprint if not user supplied
        if not blueprint:
            blueprint = self.draft_project_structure_update_plan()
        # Verify we have actions in the plan
        if not blueprint:
            logger().info(":1st_place_medal: Project structure approved")
            return False
        # Print plan for user auditability
        self.pretty_print_restructure_plan(blueprint)
        logger().info(
            ":construction_worker: Executing action plan and conforming project schemas to defined structure"
        )
        for target, structure in blueprint.items():
            if not target.exists():
                # Build File
                logger().info(":construction: Building schema file %s", target.name)
                if not self.dry_run:
                    target.parent.mkdir(exist_ok=True, parents=True)
                    target.touch()
                    self.yaml_handler.dump(structure.output, target)
            else:
                # Update File
                logger().info(":toolbox: Updating schema file %s", target.name)
                target_schema: Optional[Dict[str, Any]] = self.yaml_handler.load(target)
                if not target_schema:
                    target_schema = {"version": 2}
                elif "version" not in target_schema:
                    target_schema["version"] = 2
                target_schema.setdefault("models", []).extend(structure.output["models"])
                if not self.dry_run:
                    self.yaml_handler.dump(target_schema, target)
            # Clean superseded schema files
            for dir, models in structure.supersede.items():
                preserved_models = []
                raw_schema: Dict[str, Any] = self.yaml_handler.load(dir)
                models_marked_for_superseding = set(models)
                models_in_schema = set(map(lambda mdl: mdl["name"], raw_schema.get("models", [])))
                non_superseded_models = models_in_schema - models_marked_for_superseding
                if len(non_superseded_models) == 0:
                    logger().info(":rocket: Superseded schema file %s", dir.name)
                    if not self.dry_run:
                        dir.unlink(missing_ok=True)
                else:
                    for model in raw_schema["models"]:
                        if model["name"] in non_superseded_models:
                            preserved_models.append(model)
                    raw_schema["models"] = preserved_models
                    if not self.dry_run:
                        self.yaml_handler.dump(raw_schema, dir)
                    logger().info(
                        ":satellite: Model documentation migrated from %s to %s",
                        dir.name,
                        target.name,
                    )
        return True

    @staticmethod
    def pretty_print_restructure_plan(blueprint: Dict[Path, SchemaFileMigration]) -> None:
        logger().info(
            list(
                map(
                    lambda plan: (blueprint[plan].supersede or "CREATE", "->", plan),
                    blueprint.keys(),
                )
            )
        )
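    # --- Usage sketch (assumed): because every write in commit_project_restructure_to_disk is
    # gated on `self.dry_run`, a safe preview can be followed by a real commit on the same instance.
    #
    #   runner.dry_run = True
    #   runner.commit_project_restructure_to_disk()   # logs the plan, touches nothing on disk
    #   runner.dry_run = False
    #   runner.commit_project_restructure_to_disk()   # returns True if files were conformed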
{"progenitor": ancestor})985                    deserialized_info = info.to_dict()986                    # Handle Info:987                    # 1. tags are additive988                    # 2. descriptions are overriden989                    # 3. meta is merged990                    # 4. tests are ignored until I am convinced those shouldn't be hand curated with love991                    if deserialized_info["description"] in self.placeholders:992                        deserialized_info.pop("description", None)993                    deserialized_info["tags"] = list(994                        set(deserialized_info.pop("tags", []) + knowledge[name].get("tags", []))995                    )996                    if not deserialized_info["tags"]:997                        deserialized_info.pop("tags")  # poppin' tags like Macklemore998                    deserialized_info["meta"] = {999                        **knowledge[name].get("meta", {}),1000                        **deserialized_info["meta"],1001                    }1002                    if not deserialized_info["meta"]:1003                        deserialized_info.pop("meta")1004                    knowledge[name].update(deserialized_info)1005        return knowledge1006    def get_node_columns_with_inherited_knowledge(1007        self,1008        node: ManifestNode,1009    ) -> Dict[str, Dict[str, Any]]:1010        """Build a knowledgebase for the model based on iterating through ancestors"""1011        family_tree = self.build_node_ancestor_tree(node)1012        knowledge = self.inherit_column_level_knowledge(family_tree)1013        return knowledge1014    @staticmethod1015    def get_column_sets(1016        database_columns: Iterable[str],1017        yaml_columns: Iterable[str],1018        documented_columns: Iterable[str],1019    ) -> Tuple[List[str], List[str], List[str]]:1020        """Returns:1021        missing_columns: Columns in database not in dbt -- will be injected into schema file1022        undocumented_columns: Columns missing documentation -- descriptions will be inherited and injected into schema file where prior knowledge exists1023        extra_columns: Columns in schema file not in database -- will be removed from schema file1024        """1025        missing_columns = [1026            x for x in database_columns if x.lower() not in (y.lower() for y in yaml_columns)1027        ]1028        undocumented_columns = [1029            x for x in database_columns if x.lower() not in (y.lower() for y in documented_columns)1030        ]1031        extra_columns = [1032            x for x in yaml_columns if x.lower() not in (y.lower() for y in database_columns)1033        ]1034        return missing_columns, undocumented_columns, extra_columns1035    def propagate_documentation_downstream(self, force_inheritance: bool = False) -> None:1036        schema_map = self.build_schema_folder_mapping()1037        with self.adapter.connection_named("dbt-osmosis"):1038            for unique_id, node in track(list(self.filtered_models())):1039                logger().info("\n:point_right: Processing model: [bold]%s[/bold] \n", unique_id)1040                # Get schema file path, must exist to propagate documentation1041                schema_path: Optional[SchemaFileLocation] = schema_map.get(unique_id)1042                if schema_path is None or schema_path.current is None:1043                    logger().info(1044                        ":bow: No valid schema file found for model %s", unique_id1045                    )  # We can't 
    def propagate_documentation_downstream(self, force_inheritance: bool = False) -> None:
        schema_map = self.build_schema_folder_mapping()
        with self.adapter.connection_named("dbt-osmosis"):
            for unique_id, node in track(list(self.filtered_models())):
                logger().info("\n:point_right: Processing model: [bold]%s[/bold] \n", unique_id)
                # Get schema file path; it must exist to propagate documentation
                schema_path: Optional[SchemaFileLocation] = schema_map.get(unique_id)
                if schema_path is None or schema_path.current is None:
                    logger().info(
                        ":bow: No valid schema file found for model %s", unique_id
                    )  # We can't take action
                    continue
                # Build Sets
                database_columns: Set[str] = set(self.get_columns(node))
                yaml_columns: Set[str] = set(column for column in node.columns)
                if not database_columns:
                    ...
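    # --- End-to-end sketch (assumed ordering, inferred from the methods above): restructure
    # first so schema files land where the `dbt-osmosis:` config expects them, then propagate
    # documentation down the DAG.
    #
    #   runner.commit_project_restructure_to_disk()    # conform yaml file layout
    #   runner.propagate_documentation_downstream()    # inherit column docs from ancestors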
