Best Python code snippet using dbt-osmosis_python
osmosis.py
Source:osmosis.py  
...318    @property319    def manifest(self) -> ManifestProxy:320        """dbt manifest dict"""321        return ManifestProxy(self.dbt.flat_graph)322    def safe_parse_project(self, reinit: bool = False) -> None:323        """This is used to reseed the DbtProject safely post-init. This is324        intended for use by the osmosis server"""325        if reinit:326            self.clear_caches()327        _config_pointer = copy(self.config)328        try:329            self.parse_project(init=reinit)330        except Exception as parse_error:331            self.config = _config_pointer332            raise parse_error333        self.write_manifest_artifact()334    def write_manifest_artifact(self) -> None:335        """Write a manifest.json to disk"""336        artifact_path = os.path.join(337            self.config.project_root, self.config.target_path, MANIFEST_ARTIFACT338        )339        self.dbt.write(artifact_path)340    def clear_caches(self) -> None:341        """Clear least recently used caches and reinstantiable container objects"""342        self.get_ref_node.cache_clear()343        self.get_source_node.cache_clear()344        self.get_macro_function.cache_clear()345        self.get_columns.cache_clear()346        self.compile_sql.cache_clear()347    @lru_cache(maxsize=10)348    def get_ref_node(self, target_model_name: str) -> MaybeNonSource:349        """Get a `ManifestNode` from a dbt project model name"""350        return self.dbt.resolve_ref(351            target_model_name=target_model_name,352            target_model_package=None,353            current_project=self.config.project_name,354            node_package=self.config.project_name,355        )356    @lru_cache(maxsize=10)357    def get_source_node(self, target_source_name: str, target_table_name: str) -> MaybeParsedSource:358        """Get a `ManifestNode` from a dbt project source name and table name"""359        return self.dbt.resolve_source(360            
target_source_name=target_source_name,361            target_table_name=target_table_name,362            current_project=self.config.project_name,363            node_package=self.config.project_name,364        )365    def get_server_node(self, sql: str, node_name="name"):366        """Get a node for SQL execution against adapter"""367        self._clear_node(node_name)368        sql_node = self.sql_parser.parse_remote(sql, node_name)369        process_node(self.config, self.dbt, sql_node)370        return sql_node371    @lru_cache(maxsize=10)372    def get_node_by_path(self, path: str):373        """Find an existing node given relative file path."""374        for node in self.dbt.nodes.values():375            if node.original_file_path == path:376                return node377        return None378    @lru_cache(maxsize=100)379    def get_macro_function(self, macro_name: str) -> Callable[[Dict[str, Any]], Any]:380        """Get macro as a function which takes a dict via argument named `kwargs`,381        ie: `kwargs={"relation": ...}`382        make_schema_fn = get_macro_function('make_schema')\n383        make_schema_fn({'name': '__test_schema_1'})\n384        make_schema_fn({'name': '__test_schema_2'})"""385        return partial(self.adapter.execute_macro, macro_name=macro_name, manifest=self.dbt)386    def adapter_execute(387        self, sql: str, auto_begin: bool = False, fetch: bool = False388    ) -> Tuple[AdapterResponse, agate.Table]:389        """Wraps adapter.execute. Execute SQL against database"""390        return self.adapter.execute(sql, auto_begin, fetch)391    def execute_macro(392        self,393        macro: str,394        kwargs: Optional[Dict[str, Any]] = None,395    ) -> Any:396        """Wraps adapter execute_macro. 
Execute a macro like a function."""397        return self.get_macro_function(macro)(kwargs=kwargs)398    def execute_sql(self, raw_sql: str) -> DbtAdapterExecutionResult:399        """Execute dbt SQL statement against database"""400        # if no jinja chars then these are synonymous401        compiled_sql = raw_sql402        if has_jinja(raw_sql):403            # jinja found, compile it404            compiled_sql = self.compile_sql(raw_sql).compiled_sql405        return DbtAdapterExecutionResult(406            *self.adapter_execute(compiled_sql, fetch=True),407            raw_sql,408            compiled_sql,409        )410    def execute_node(self, node: ManifestNode) -> DbtAdapterExecutionResult:411        """Execute dbt SQL statement against database from a ManifestNode"""412        raw_sql: str = getattr(node, RAW_CODE)413        compiled_sql: Optional[str] = getattr(node, COMPILED_CODE, None)414        if compiled_sql:415            # node is compiled, execute the SQL416            return self.execute_sql(compiled_sql)417        # node not compiled418        if has_jinja(raw_sql):419            # node has jinja in its SQL, compile it420            compiled_sql = self.compile_node(node).compiled_sql421        # execute the SQL422        return self.execute_sql(compiled_sql or raw_sql)423    @lru_cache(maxsize=SQL_CACHE_SIZE)424    def compile_sql(self, raw_sql: str, retry: int = 3) -> DbtAdapterCompilationResult:425        """Creates a node with `get_server_node` method. Compile generated node.426        Has a retry built in because even uuidv4 cannot gaurantee uniqueness at the speed427        in which we can call this function concurrently. 
A retry significantly increases the stability"""428        temp_node_id = str(uuid.uuid4())429        try:430            node = self.compile_node(self.get_server_node(raw_sql, temp_node_id))431        except Exception as exc:432            if retry > 0:433                return self.compile_sql(raw_sql, retry - 1)434            raise exc435        else:436            return node437        finally:438            self._clear_node(temp_node_id)439    def compile_node(self, node: ManifestNode) -> DbtAdapterCompilationResult:440        """Compiles existing node."""441        self.sql_compiler.node = node442        # this is essentially a convenient wrapper to adapter.get_compiler443        compiled_node = self.sql_compiler.compile(self.dbt)444        return DbtAdapterCompilationResult(445            getattr(compiled_node, RAW_CODE),446            getattr(compiled_node, COMPILED_CODE),447            compiled_node,448        )449    def _clear_node(self, name="name"):450        """Removes the statically named node created by `execute_sql` and `compile_sql` in `dbt.lib`"""451        self.dbt.nodes.pop(f"{NodeType.SqlOperation}.{self.project_name}.{name}", None)452    def get_relation(self, database: str, schema: str, name: str) -> Optional[BaseRelation]:453        """Wrapper for `adapter.get_relation`"""454        return self.adapter.get_relation(database, schema, name)455    def create_relation(self, database: str, schema: str, name: str) -> BaseRelation:456        """Wrapper for `adapter.Relation.create`"""457        return self.adapter.Relation.create(database, schema, name)458    def create_relation_from_node(self, node: ManifestNode) -> BaseRelation:459        """Wrapper for `adapter.Relation.create_from`"""460        return self.adapter.Relation.create_from(self.config, node)461    def get_columns_in_relation(self, node: ManifestNode) -> List[str]:462        """Wrapper for `adapter.get_columns_in_relation`"""463        return 
self.adapter.get_columns_in_relation(self.create_relation_from_node(node))464    @lru_cache(maxsize=5)465    def get_columns(self, node: ManifestNode) -> List[ColumnInfo]:466        """Get a list of columns from a compiled node"""467        columns = []468        try:469            columns.extend(470                [c.name for c in self.get_columns_in_relation(self.create_relation_from_node(node))]471            )472        except CompilationException:473            original_sql = str(getattr(node, RAW_CODE))474            # TODO: account for `TOP` syntax475            setattr(node, RAW_CODE, f"select * from ({original_sql}) limit 0")476            result = self.execute_node(node)477            setattr(node, RAW_CODE, original_sql)478            delattr(node, COMPILED_CODE)479            columns.extend(result.table.column_names)480        return columns481    def get_or_create_relation(482        self, database: str, schema: str, name: str483    ) -> Tuple[BaseRelation, bool]:484        """Get relation or create if not exists. 
Returns tuple of relation and485        boolean result of whether it existed ie: (relation, did_exist)"""486        ref = self.get_relation(database, schema, name)487        return (ref, True) if ref else (self.create_relation(database, schema, name), False)488    def create_schema(self, node: ManifestNode):489        """Create a schema in the database"""490        return self.execute_macro(491            "create_schema",492            kwargs={"relation": self.create_relation_from_node(node)},493        )494    def materialize(495        self, node: ManifestNode, temporary: bool = True496    ) -> Tuple[AdapterResponse, None]:497        """Materialize a table in the database"""498        return self.adapter_execute(499            # Returns CTAS string so send to adapter.execute500            self.execute_macro(501                "create_table_as",502                kwargs={503                    "sql": getattr(node, COMPILED_CODE),504                    "relation": self.create_relation_from_node(node),505                    "temporary": temporary,506                },507            ),508            auto_begin=True,509        )510class DbtProjectContainer:511    """This class manages multiple DbtProjects which each correspond512    to a single dbt project on disk. 
This is mostly for osmosis server use"""513    def __init__(self):514        self._projects: Dict[str, DbtProject] = OrderedDict()515        self._default_project: Optional[str] = None516    def get_project(self, project_name: str) -> Optional[DbtProject]:517        """Primary interface to get a project and execute code"""518        return self._projects.get(project_name)519    @lru_cache(maxsize=10)520    def get_project_by_root_dir(self, root_dir: str) -> Optional[DbtProject]:521        """Get a project by its root directory."""522        root_dir = os.path.abspath(os.path.normpath(root_dir))523        for project in self._projects.values():524            if os.path.abspath(project.project_root) == root_dir:525                return project526        return None527    def get_default_project(self) -> Optional[DbtProject]:528        """Gets the default project which at any given time is the529        earliest project inserted into the container"""530        return self._projects.get(self._default_project)531    def add_project(532        self,533        target: Optional[str] = None,534        profiles_dir: Optional[str] = None,535        project_dir: Optional[str] = None,536        threads: Optional[int] = 1,537        name_override: Optional[str] = "",538    ) -> DbtProject:539        """Add a DbtProject with arguments"""540        project = DbtProject(target, profiles_dir, project_dir, threads)541        project_name = name_override or project.config.project_name542        if self._default_project is None:543            self._default_project = project_name544        self._projects[project_name] = project545        return project546    def add_parsed_project(self, project: DbtProject) -> DbtProject:547        """Add an already instantiated DbtProject"""548        self._projects.setdefault(project.config.project_name, project)549        return project550    def add_project_from_args(self, args: ConfigInterface) -> DbtProject:551        """Add a DbtProject from a 
ConfigInterface"""552        project = DbtProject.from_args(args)553        self._projects.setdefault(project.config.project_name, project)554        return project555    def drop_project(self, project_name: str) -> None:556        """Drop a DbtProject"""557        project = self.get_project(project_name)558        if project is None:559            return560        project.clear_caches()561        project.adapter.connections.cleanup_all()562        self._projects.pop(project_name)563        if self._default_project == project_name:564            if len(self) > 0:565                self._default_project = self._projects.keys()[0]566            else:567                self._default_project = None568    def drop_all_projects(self) -> None:569        """Drop all DbtProjectContainers"""570        self._default_project = None571        for project in self._projects:572            self.drop_project(project)573    def reparse_all_projects(self) -> None:574        """Reparse all projects"""575        for project in self:576            project.safe_parse_project()577    def registered_projects(self) -> List[str]:578        """Convenience to grab all registered project names"""579        return list(self._projects.keys())580    def __len__(self):581        """Allows len(DbtProjectContainer)"""582        return len(self._projects)583    def __getitem__(self, project: str):584        """Allows DbtProjectContainer['jaffle_shop']"""585        maybe_project = self.get_project(project)586        if maybe_project is None:587            raise KeyError(project)588        return maybe_project589    def __delitem__(self, project: str):590        """Allows del DbtProjectContainer['jaffle_shop']"""...server_v2.py
Source:server_v2.py  
...277    given project at any given time synchronously or asynchronously"""278    target_did_change = old_target != new_target279    try:280        runner.args.target = new_target281        runner.safe_parse_project(reinit=reset or target_did_change)282    except Exception as reparse_err:283        runner.args.target = old_target284        rv = OsmosisErrorContainer(285            error=OsmosisError(286                code=OsmosisErrorCode.ProjectParseFailure,287                message=str(reparse_err),288                data=reparse_err.__dict__,289            )290        )291    else:292        runner._version += 1293        rv = OsmosisResetResult(294            result=(295                f"Profile target changed from {old_target} to {new_target}!"...main.py
Source:main.py  
...104        dry_run=dry_run,105    )106    # Conform project structure & bootstrap undocumented models injecting columns107    if runner.commit_project_restructure_to_disk():108        runner.safe_parse_project()109    runner.propagate_documentation_downstream(force_inheritance=force_inheritance)110@yaml.command(context_settings=CONTEXT)111@shared_opts112@click.option(113    "-f",114    "--fqn",115    type=click.STRING,116    help="Specify models based on FQN. Use dots as separators. Looks like folder.folder.model or folder.folder.source.table. Use list command to see the scope of an FQN filter.",117)118@click.option(119    "-d",120    "--dry-run",121    is_flag=True,122    help="If specified, no changes are committed to disk.",...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
