How to use the assert_schema_has_no_sources method in dbt-osmosis

Best Python code snippet using dbt-osmosis

osmosis.py

Source:osmosis.py Github

copy

Full Screen

...
                # (excerpt begins mid-method; the start of this method is not shown)
                str(error),
            )
        return columns

    @staticmethod
    def assert_schema_has_no_sources(schema: Mapping) -> Mapping:
        """Inline assertion ensuring that a schema does not have a source key.

        Args:
            schema: Parsed schema mapping. Only its ``sources`` key is inspected.

        Returns:
            The same ``schema`` object, unmodified, so the call can wrap a load
            expression inline.

        Raises:
            SanitizationRequired: If ``schema`` has a truthy ``sources`` entry.
        """
        # Truthiness check: a missing key or an empty `sources:` value passes.
        # NOTE(review): if the caller's loader yields None (e.g. for an empty file),
        # `.get` raises AttributeError here -- confirm loader behavior.
        if schema.get("sources"):
            raise SanitizationRequired(
                "Found `sources:` block in a models schema file. We require you separate sources in order to organize your project."
            )
        return schema

    def build_schema_folder_mapping(
        self,
        # NOTE(review): NodeType.Model / NodeType.Source look like enum members rather
        # than types; `Literal[...]` was presumably intended -- confirm.
        target_node_type: Optional[Union[NodeType.Model, NodeType.Source]] = None,
    ) -> Dict[str, SchemaFileLocation]:
        """Builds a mapping of models or sources to their existing and target schema file paths.

        Args:
            target_node_type: ``NodeType.Source`` selects ``self.dbt.sources``,
                ``NodeType.Model`` selects ``self.dbt.nodes``. Any other value,
                including the default ``None``, merges both; on a key collision
                the entry from ``self.dbt.sources`` wins.

        Returns:
            Dict keyed by the ids yielded from ``self.filtered_models``. Each value
            is a ``SchemaFileLocation`` whose ``current`` comes from
            ``self.get_schema_path`` and whose ``target`` comes from
            ``self.get_target_schema_path``.
        """
        if target_node_type == NodeType.Source:
            # Source folder mapping is reserved for source importing
            target_nodes = self.dbt.sources
        elif target_node_type == NodeType.Model:
            target_nodes = self.dbt.nodes
        else:
            # No (or unrecognized) filter: consider nodes and sources together
            target_nodes = {**self.dbt.nodes, **self.dbt.sources}
        # Container for output
        schema_map = {}
        logger().info("...building project structure mapping in memory")
        # Iterate over models and resolve current path vs declarative target path
        for unique_id, dbt_node in self.filtered_models(target_nodes):
            schema_path = self.get_schema_path(dbt_node)
            osmosis_schema_path = self.get_target_schema_path(dbt_node)
            schema_map[unique_id] = SchemaFileLocation(
                target=osmosis_schema_path, current=schema_path
            )
        return schema_map

    def draft_project_structure_update_plan(self) -> Dict[Path, SchemaFileMigration]:
        """Build project structure update plan based on `dbt-osmosis:` configs set across dbt_project.yml and model files.

        The update plan includes injection of undocumented models. Unless this plan is
        constructed and executed by the `commit_project_restructure` function,
        dbt-osmosis will only operate on models it is aware of through the existing
        documentation.

        Returns:
            Dict[Path, SchemaFileMigration]: Update plan where dict keys consist of
            targets and contents consist of outputs which match the contents of the
            `models` to be output in the target file and supersede lists of what
            files are superseded by a migration.

        Raises:
            SanitizationRequired: Propagated from `assert_schema_has_no_sources` when
                a loaded schema file has a truthy `sources` entry.
        """
        # Container for output
        blueprint: Dict[Path, SchemaFileMigration] = {}
        logger().info(
            ":chart_increasing: Searching project stucture for required updates and building action plan"
        )
        # The whole scan runs inside a single named adapter connection
        with self.adapter.connection_named("dbt-osmosis"):
            for unique_id, schema_file in self.build_schema_folder_mapping(
                target_node_type=NodeType.Model
            ).items():
                # Only locations reporting `is_valid` as false enter the plan
                # (presumably meaning current != target -- confirm against SchemaFileLocation)
                if not schema_file.is_valid:
                    # One migration entry per target file, created on first use
                    blueprint.setdefault(
                        schema_file.target,
                        SchemaFileMigration(output={"version": 2, "models": []}, supersede={}),
                    )
                    node = self.dbt.nodes[unique_id]
                    if schema_file.current is None:
                        # Bootstrapping Undocumented Model
                        blueprint[schema_file.target].output["models"].append(
                            self.get_base_model(node)
                        )
                    else:
                        # Model Is Documented but Must be Migrated
                        if not schema_file.current.exists():
                            # Recorded current file is not present; nothing to migrate
                            continue
                        # TODO: We avoid sources for complexity reasons but if we are opinionated, we don't have to
                        schema = self.assert_schema_has_no_sources(
                            self.yaml_handler.load(schema_file.current)
                        )
                        models_in_file: Iterable[Dict[str, Any]] = schema.get("models", [])
                        # Use the first entry whose name matches this node, then stop
                        for documented_model in models_in_file:
                            if documented_model["name"] == node.name:
                                # Bootstrapping Documented Model
                                blueprint[schema_file.target].output["models"].append(
                                    self.bootstrap_existing_model(documented_model, node)
                                )
                                # Target to supersede current
                                blueprint[schema_file.target].supersede.setdefault(
                                    schema_file.current, []
                                ).append(documented_model["name"])
                                break
...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run dbt-osmosis automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful