Best Python code snippet using autotest_python
dag_test.py
Source:dag_test.py  
...81                ],  # B -> D82            ]83        )84        self.check_argo_yaml("dag_golden_1.yaml")85    def test_set_dependencies(self):86        couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)87        couler.set_dependencies(lambda: job_b(message="B"), dependencies=["A"])88        couler.set_dependencies(lambda: job_c(message="C"), dependencies=["A"])89        couler.set_dependencies(lambda: job_d(message="D"), dependencies=["B"])90        self.check_argo_yaml("dag_golden_1.yaml")91    def test_set_dependencies_with_conditional(self):92        couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)93        couler.set_dependencies(lambda: job_b(message="B"), dependencies=["A"])94        couler.set_dependencies(95            lambda: couler.when(96                couler.equal(flip_coin(), "heads"), lambda: heads()97            ),98            dependencies=["B"],99        )100        self.check_argo_yaml("dag_golden_2.yaml")101    def test_method_dependencies(self):102        class A:103            def a(self):104                return job_a(message="A")105            @staticmethod106            def b():107                return job_b(message="B")108            @classmethod109            def c(cls):110                return job_c(message="C")111        instance = A()112        couler.set_dependencies(instance.a, dependencies=None)113        couler.set_dependencies(instance.b, dependencies=["A"])114        couler.set_dependencies(A.c, dependencies=["A"])115        couler.set_dependencies(lambda: job_d(message="D"), dependencies=["B"])116        self.check_argo_yaml("dag_golden_1.yaml")117    def test_set_dependencies_with_passing_parameter_artifact_implicitly(self):118        def producer_two(step_name):119            output_one = couler.create_parameter_artifact(path="/mnt/t1.txt")120            output_two = couler.create_parameter_artifact(path="/mnt/t2.txt")121            c1 = "echo -n A > %s" % output_one.path122            c2 = "echo -n B > %s" % output_two.path123            command = "%s && %s" % (c1, c2)124            return couler.run_container(125                image="docker/whalesay:latest",126                args=command,127                output=[output_one, output_two],128                command=["bash", "-c"],129                step_name=step_name,130            )131        def consume_two(step_name):132            couler.run_container(133                image="docker/whalesay:latest",134                command=["echo"],135                args=["--input: x"],136                step_name=step_name,137            )138        couler.set_dependencies(139            lambda: producer_two(step_name="A"), dependencies=None140        )141        couler.set_dependencies(142            lambda: consume_two(step_name="B"), dependencies=["A"]143        )144        self.check_argo_yaml("parameter_passing_golden.yaml")145    def test_set_dependencies_none(self):146        couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)147        couler.set_dependencies(lambda: job_b(message="B"), dependencies=["A"])148        couler.set_dependencies(lambda: job_c(message="C"), dependencies=None)149        pyaml.dump(couler.workflow_yaml())150        wf_tree = couler.workflow_yaml()151        tasks = wf_tree["spec"]["templates"][0]["dag"]["tasks"]152        assert_deps = {"A": None, "B": ["A"], "C": None}153        self.assertEqual(len(tasks), 3)154        for task in tasks:155            assert_dep = assert_deps[task["name"]]156            self.assertEqual(task.get("dependencies"), assert_dep)157    def test_set_depends_none(self):158        couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)159        couler.set_dependencies(160            lambda: job_b(message="B"), dependencies="A.Succeeded"161        )162        couler.set_dependencies(lambda: job_c(message="C"), dependencies=None)163        content = pyaml.dump(couler.workflow_yaml())164        self.assertNotIn("dependencies", content)165        wf_tree = couler.workflow_yaml()166        tasks = wf_tree["spec"]["templates"][0]["dag"]["tasks"]167        assert_deps = {"A": None, "B": "A.Succeeded", "C": None}168        self.assertEqual(len(tasks), 3)169        for task in tasks:170            assert_dep = assert_deps[task["name"]]171            self.assertEqual(task.get("depends"), assert_dep)172    def test_raise(self):173        with self.assertRaises(ValueError):174            def some_raise():175                raise ValueError("test this")176            couler.set_exit_handler(couler.WFStatus.Failed, some_raise)177        couler._cleanup()178        couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)179        content = pyaml.dump(couler.workflow_yaml())180        self.assertNotIn("onExit", content)181    def test_set_dependencies_none_with_exit_handler(self):182        couler.set_dependencies(lambda: job_a(message="A"), dependencies=None)183        couler.set_dependencies(lambda: job_b(message="B"), dependencies=["A"])184        def job_exit():185            return couler.run_container(186                image="docker/whalesay:latest",187                command=["cowsay"],188                step_name="C",189            )190        couler.set_exit_handler(couler.WFStatus.Failed, job_exit)191        content = pyaml.dump(couler.workflow_yaml())192        self.assertIn("{{workflow.status}} == Failed", content)193    # TODO: Provide new test case without `tf.train`.194    # def test_set_dependencies_for_job(self):195    #196    #     def producer_two(step_name):197    #         output_one = couler.create_parameter_artifact(path="/tmp/t1.txt")198    #         output_two = couler.create_parameter_artifact(path="/tmp/t2.txt")199    #         c1 = "echo -n A > %s" % output_one.path200    #         c2 = "echo -n B > %s" % output_two.path201    #         command = "%s && %s" % (c1, c2)202    #         return couler.run_container(203    #             image="docker/whalesay:latest",204    #             args=command,205    #             output=[output_one, output_two],206    #             command=["bash", "-c"],207    #             step_name=step_name,208    #         )209    #210    #     def train_1(step_name):211    #         tf.train(212    #             num_ps=2,213    #             num_workers=3,214    #             image="tensorflow:1.13",215    #             command="python tf.py",216    #             clean_pod_policy="Running",217    #             step_name=step_name,218    #         )219    #220    #     def train_2(step_name):221    #         tf.train(222    #             num_ps=2,223    #             num_workers=3,224    #             image="tensorflow:1.13",225    #             command="python tf.py",226    #             clean_pod_policy="Running",227    #             step_name=step_name,228    #         )229    #230    #     couler.set_dependencies(231    #         lambda: producer_two(step_name="A"), dependencies=None232    #     )233    #     couler.set_dependencies(234    #         lambda: train_1(step_name="B"), dependencies=["A"]235    #     )236    #     couler.set_dependencies(237    #         lambda: train_2(step_name="C"), dependencies=["B"]...dag-diamond-steps.py
Source:dag-diamond-steps.py  
...63    for x in listNum:64        newList.append(message+str(x))65    return newList66def diamond():67    couler.set_dependencies(lambda: job_x(message="X1"), dependencies=None)68    couler.set_dependencies(lambda: job_a(message="A1"), dependencies=["X1"])69    couler.set_dependencies(lambda: job_a(message="A2"), dependencies=["X1"])70    couler.set_dependencies(lambda: job_a(message="A3"), dependencies=["X1"])71    dep1 = listCreation(message="A")72    couler.set_dependencies(lambda: job_x(message="X2"), dependencies=dep1)73    couler.set_dependencies(lambda: job_x(message="X3"), dependencies=["X2"])74    couler.set_dependencies(lambda: job_x(message="X4"), dependencies=["X2"])75    couler.set_dependencies(lambda: job_b(message="B1"), dependencies=["X3"])76    couler.set_dependencies(lambda: job_b(message="B2"), dependencies=["X3"])77    couler.set_dependencies(lambda: job_b(message="B3"), dependencies=["X3"])78    dep2 = listCreation(message="B")79    couler.set_dependencies(lambda: job_x(message="X5"), dependencies=dep2)80    couler.set_dependencies(lambda: job_c(message="C1"), dependencies=["X4"])81    couler.set_dependencies(lambda: job_c(message="C2"), dependencies=["X4"])82    couler.set_dependencies(lambda: job_c(message="C3"), dependencies=["X4"])83    dep3 = listCreation(message="C")84    couler.set_dependencies(lambda: job_b(message="X6"), dependencies=dep3)85    couler.set_dependencies(lambda: job_b(message="X7"),86                            dependencies=["X6", "X5"])87    couler.set_dependencies(lambda: job_d(message="D1"), dependencies=["X7"])88    couler.set_dependencies(lambda: job_d(message="D2"), dependencies=["X7"])89    couler.set_dependencies(lambda: job_d(message="D3"), dependencies=["X7"])90    dep4 = listCreation(message="D")91    couler.set_dependencies(lambda: job_d(message="X8"), dependencies=dep4)92diamond()93submitter = ArgoSubmitter(namespace="argo")...flow.py
Source:flow.py  
...14# CREATE FLOW/ETL PIPELINE.15flow = Flow('ETL Pipeline')16# 1) POPULATE STAGING TABLES.17# AIRBNB LISTINGS CALENDAR DATA.18flow.set_dependencies(19    task=drop_calendar_staging_table)20flow.set_dependencies(21    task=create_calendar_staging_table,22    upstream_tasks=[drop_calendar_staging_table])23flow.set_dependencies(24    task=load_calendar_data_into_staging_table,25    upstream_tasks=[create_calendar_staging_table])26# COVID JAPAN DATA.27flow.set_dependencies(28    task=drop_covid_staging_table)29flow.set_dependencies(30    task=create_covid_staging_table,31    upstream_tasks=[drop_covid_staging_table])32flow.set_dependencies(33    task=load_covid_data_into_staging_table,34    upstream_tasks=[create_covid_staging_table])35# 2) RUN STAGING TABLES QUALITY CHECKS.36flow.set_dependencies(37    task=run_quality_checks_for_staging_tables,38    upstream_tasks=[load_calendar_data_into_staging_table,39                    load_covid_data_into_staging_table])40# 3) CREATE NEW TABLES FOR SOURCE-OF-TRUTH DB.41flow.set_dependencies(42    task=create_table_tokyo_covid_by_prefecture,43    upstream_tasks=[run_quality_checks_for_staging_tables])44flow.set_dependencies(45    task=create_table_tokyo_aggr_listings_availability,46    upstream_tasks=[run_quality_checks_for_staging_tables])47flow.set_dependencies(48    task=create_table_tokyo_listings_availability_and_covid_rates,49    upstream_tasks=[create_table_tokyo_covid_by_prefecture,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
