Best Python code snippet using autotest_python
dbt_parser.py
Source:dbt_parser.py  
1#! /usr/bin/python32# dbt_parser.py3#4# Project name: DepecheCode5# Author: Hugo Juhel6#7# description:8"""9    Airflow's DAG parser and builder for DBT10"""11#############################################################################12#                                 Packages                                  #13#############################################################################14import json15from pathlib import Path16from airflow.utils.task_group import TaskGroup17from airflow.utils.decorators import apply_defaults18from depechecode.airflow_plugin.operators.dbt import RunOperator19from depechecode.logger import MixinLogable20#############################################################################21#                                 Packages                                  #22#############################################################################23class DBTAutoDag(MixinLogable):24    """25    An automatic parser for a DBT manifest.26    The class extracts the between models dependencies and convert them to DepecheCode's dbt operators.27    A utility class that parses out a dbt project and creates the respective task groups28    Args:29        dag: The Airflow DAG30        dbt_global_cli_flags: Any global flags for the dbt CLI31        dbt_project_dir: The directory containing the dbt_project.yml32        dbt_profiles_dir: The directory containing the profiles.yml33        dbt_target: The dbt target profile (e.g. dev, prod)34        dbt_run_group_name: Optional override for the task group name.35        dbt_test_group_name: Optional override for the task group name.36    """37    _PATH_TO_TARGET = "depechecode/manifest.json"38    @apply_defaults39    def __init__(40        self,41        dag,42        working_dir: str = None,43        profiles_dir: str = None,44        target: str = None,45        requirements_file_path: str = None,46        *args,47        **kwargs,48    ):49        super().__init__(logger_name="DBTDagParser")50        self._dag = dag51        self._working_dir = working_dir52        self._profiles_dir = profiles_dir53        self._target = target54        self._requirements_file_path = requirements_file_path55        self._args = args56        self._kwargs = kwargs57        # Prepare groups to gather tasks.58        self._run_group: TaskGroup = None  # type: ignore59    def load_dbt_manifest(self):60        """61        Read the DBT manifest.62        Returns: A JSON object containing the dbt manifest content.63        """64        cwd = self._working_dir or "/"65        try:66            path = Path(cwd) / Path(self._PATH_TO_TARGET)67            with open(path) as f:68                file_content = json.load(f)69        except BaseException as error:70            raise IOError(71                f"Failed to read the DBT s project manifest : {path}"  # type: ignore72            ) from error73        return file_content74    def __call__(self):75        """76        Parse out the content of the DBT manifest and build the Task77        Returns: None78        """79        manifest = self.load_dbt_manifest()80        tasks = {}81        with TaskGroup(group_id="run_group") as tg1:82            # Create the tasks for each model83            for node_name in manifest["nodes"].keys():84                if node_name.split(".")[0] == "model":85                    # Make the run nodes86                    tasks[node_name] = RunOperator(87                        task_group=self._run_group,88                        task_id=node_name,89                        model=node_name.split(".")[-1],90                        working_dir=self._working_dir,91                        requirements_file=self._requirements_file_path,92                        profiles_dir=self._profiles_dir,93                        target=self._target,94                        dag=self._dag,95                    )96            # Add upstream and downstream dependencies for each run task97            for node_name in manifest["nodes"].keys():98                if node_name.split(".")[0] == "model":99                    for upstream_node in manifest["nodes"][node_name]["depends_on"][100                        "nodes"101                    ]:102                        upstream_node_type = upstream_node.split(".")[0]103                        if upstream_node_type == "model":104                            tasks[upstream_node] >> tasks[node_name]105                    self.info(f"Created node {node_name}")106            self._run_group = tg1107    @property108    def run_group(self):...job.py
Source:job.py  
...42        return all([43            self._status[group.name] in (Job.GroupStatus.DONE, Job.GroupStatus.CANCELLED)44            for group in self._groups45        ])46    async def _run_group(self, group: Group):47        if self._is_regular_runtime:48            module_dependencies = group.module_dependencies49        else:50            module_dependencies = group.module_dependencies_nonregular51        results = [52            result53            for name, result in self._results.items()54            if name in module_dependencies55        ]56        task = await group.run(self.uuid, self._request, results, self._is_regular_runtime)57        self._tasks[group.name] = task58        self._status[group.name] = Job.GroupStatus.PROCESSING59    async def run(self):60        for group in self._groups:61            dependencies = (62                group.dependencies63                if self._is_regular_runtime64                else group.dependencies_nonregular65            )66            if len(dependencies) == 0:67                await self._run_group(group)68        while not self._finished:69            done, _ = await asyncio.wait(70                fs=self._tasks.values(),71                return_when=asyncio.FIRST_COMPLETED,72            )73            for task in done:74                future: FutureResult = task.result()75                for module_result in future.result.collection:76                    self._results[module_result.name] = module_result77                self._status[future.group] = Job.GroupStatus.DONE78                del self._tasks[future.group]79                # Check for early error stopping80                if future.result.contains_invalid_result():81                    self._status = dict(82                        (k, Job.GroupStatus.CANCELLED)83                        if v in (Job.GroupStatus.IDLE, Job.GroupStatus.PROCESSING)84                        else (k, v)85                        for k, v in self._status.items()86                    )87            for group in self._ready_groups:88                await self._run_group(group)...kubectl-cluster-group
Source:kubectl-cluster-group  
...73        74        targets = self.groups if self.all_groups else {self.group_sel: self.groups[self.group_sel]}75        for k, g in targets.items():76            print('[GROUP]: {name} '.format(name=k)+ '-'*80)77            self._run_group(g)78            print('')79    def _run_group(self, group):80        for cluster in group.clusters:81            print('[CLUSTER]: {name} '.format(name=cluster.name)+ '-'*50)82            args = ['kubectl']83            args.extend(cluster.args)84            args.extend(self.kubectl_args)85            print('[DEBUG]', ' '.join(args))86            subprocess.call(args)87            print('')88            89if __name__ == '__main__':90    tool = ClusterTool()91    tool.parse_args()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
