How to use the create_recurring_run method in autotest

Best Python code snippet using autotest_python

recurring_run.py

Source:recurring_run.py Github

copy

Full Screen

...128 # Ensure we only split on the first equals char so the value can contain129 # equals signs too.130 split_args: List = [arg.split('=', 1) for arg in args or []]131 params: Dict[str, Any] = dict(split_args)132 recurring_run = client_obj.create_recurring_run(133 cron_expression=cron_expression,134 description=description,135 enabled=enabled,136 enable_caching=enable_caching,137 end_time=end_time,138 experiment_id=experiment_id,139 interval_second=interval_second,140 job_name=job_name,141 max_concurrency=max_concurrency,142 no_catchup=not catchup,143 params=params,144 pipeline_package_path=pipeline_package_path,145 pipeline_id=pipeline_id,146 start_time=start_time,...

Full Screen

Full Screen

test_job.py

Source:test_job.py Github

copy

Full Screen

1from click.testing import CliRunner2from kfp.cli.output import OutputFormat3from kfp_ext.job import job, get4from unittest.mock import Mock5import unittest6import textwrap7class TestJob(unittest.TestCase):8 runner = CliRunner()9 experiment_name = 'an_experiment'10 experiment_id = '12345'11 job_name = 'a_job'12 job_id = '67890'13 pipeline_name = 'a_pipeline'14 pipeline_id = 'abcdef'15 version_name = 'a_version'16 version_id = 'ghijkl'17 cron_expression = '* * * * * *'18 @staticmethod19 def experiment_with_id(id):20 experiment = Mock()21 experiment.id = id22 return experiment23 @staticmethod24 def job(id, name):25 job = Mock()26 job.id = id27 job.name = name28 return job29 @staticmethod30 def versions(id, name):31 response = Mock()32 version = Mock()33 version.id = id34 version.name = name35 response.versions = [version]36 return response37 def test_get_table(self):38 client = Mock()39 client.get_recurring_run.return_value = self.job(self.job_id, self.job_name)40 result = self.runner.invoke(job, [41 'get',42 self.job_id43 ], obj={'client': client, 'output': OutputFormat.table.name})44 self.assertEqual(result.exit_code, 0)45 self.assertEqual(result.output, textwrap.dedent(46 f"""\47 Job Details48 -------------49 ID {self.job_id}50 Name {self.job_name}51 """))52 def test_get_json(self):53 client = Mock()54 client.get_recurring_run.return_value = self.job(self.job_id, self.job_name)55 result = self.runner.invoke(job, [56 'get',57 self.job_id58 ], obj={'client': client, 'output': OutputFormat.json.name})59 self.assertEqual(result.exit_code, 0)60 self.assertEqual(result.output, textwrap.dedent(61 f"""\62 {{63 "Job Details": {{64 "ID": "{self.job_id}",65 "Name": "{self.job_name}"66 }}67 }}68 """))69 def test_submit_job_without_cron(self):70 client = Mock()71 result = self.runner.invoke(job, [72 'submit',73 '--job-name', self.job_name,74 '--experiment-name', self.experiment_name,75 '--pipeline-name', self.pipeline_name76 ], obj={'client': client, 'output': None})77 
self.assertEqual(result.exit_code, 2)78 self.assertIn("Error: Missing option '-c' / '--cron-expression'.", result.output)79 def test_submit_job_without_job(self):80 client = Mock()81 result = self.runner.invoke(job, ['submit'], obj={'client': client, 'output': None})82 self.assertEqual(result.exit_code, 2)83 self.assertIn("Error: Missing option '-j' / '--job-name'.", result.output)84 def test_submit_job_without_experiment(self):85 client = Mock()86 result = self.runner.invoke(job, [87 'submit',88 '--job-name', self.job_name89 ], obj={'client': client, 'output': None})90 self.assertEqual(result.exit_code, 2)91 self.assertIn("Error: Missing option '-e' / '--experiment-name'.", result.output)92 def test_submit_job_without_pipeline(self):93 client = Mock()94 result = self.runner.invoke(job, [95 'submit',96 '--job-name', self.job_name,97 '--experiment-name', self.experiment_name,98 '--cron-expression', self.cron_expression99 ], obj={'client': client, 'output': None})100 self.assertEqual(result.exit_code, 1)101 self.assertEqual(result.output, 'You must provide one of [pipeline_name, pipeline_id].\n')102 def test_submit_job_with_pipeline_name(self):103 client = Mock()104 client.create_experiment.return_value = self.experiment_with_id(self.experiment_id)105 client.get_pipeline_id.return_value = self.pipeline_id106 client.create_recurring_run.return_value = self.job(self.job_id, self.job_name)107 result = self.runner.invoke(job, [108 'submit',109 '--job-name', self.job_name,110 '--pipeline-name', self.pipeline_name,111 '--experiment-name', self.experiment_name,112 '--cron-expression', self.cron_expression113 ], obj={'client': client, 'output': OutputFormat.table.name})114 client.create_recurring_run.assert_called_once_with(experiment_id=self.experiment_id, job_name=self.job_name, cron_expression=self.cron_expression, pipeline_id=self.pipeline_id, version_id=None)115 self.assertEqual(result.exit_code, 0)116 # Output format is tested in test_get_*117 
self.assertIn(self.job_id, result.output)118 self.assertIn(self.job_name, result.output)119 def test_submit_job_with_pipeline_id(self):120 client = Mock()121 client.create_experiment.return_value = self.experiment_with_id(self.experiment_id)122 client.create_recurring_run.return_value = self.job(self.job_id, self.job_name)123 result = self.runner.invoke(job, [124 'submit',125 '--job-name', self.job_name,126 '--pipeline-id', self.pipeline_id,127 '--experiment-name', self.experiment_name,128 '--cron-expression', self.cron_expression129 ], obj={'client': client, 'output': OutputFormat.table.name})130 client.create_recurring_run.assert_called_once_with(experiment_id=self.experiment_id, job_name=self.job_name, cron_expression=self.cron_expression, pipeline_id=self.pipeline_id, version_id=None)131 self.assertEqual(result.exit_code, 0)132 # Output format is tested in test_get_*133 self.assertIn(self.job_id, result.output)134 self.assertIn(self.job_name, result.output)135 def test_submit_job_with_version_id(self):136 client = Mock()137 client.create_experiment.return_value = self.experiment_with_id(self.experiment_id)138 client.create_recurring_run.return_value = self.job(self.job_id, self.job_name)139 result = self.runner.invoke(job, [140 'submit',141 '--job-name', self.job_name,142 '--pipeline-id', self.pipeline_id,143 '--version-id', self.version_id,144 '--experiment-name', self.experiment_name,145 '--cron-expression', self.cron_expression146 ], obj={'client': client, 'output': OutputFormat.table.name})147 client.create_recurring_run.assert_called_once_with(experiment_id=self.experiment_id, job_name=self.job_name, cron_expression=self.cron_expression, pipeline_id=self.pipeline_id, version_id=self.version_id)148 self.assertEqual(result.exit_code, 0)149 # Output format is tested in test_get_*150 self.assertIn(self.job_id, result.output)151 self.assertIn(self.job_name, result.output)152 def test_submit_job_with_version_name(self):153 client = Mock()154 
client.create_experiment.return_value = self.experiment_with_id(self.experiment_id)155 client.list_pipeline_versions.return_value = self.versions(self.version_id, self.version_name)156 client.create_recurring_run.return_value = self.job(self.job_id, self.job_name)157 result = self.runner.invoke(job, [158 'submit',159 '--job-name', self.job_name,160 '--pipeline-id', self.pipeline_id,161 '--version-name', self.version_name,162 '--experiment-name', self.experiment_name,163 '--cron-expression', self.cron_expression164 ], obj={'client': client, 'output': OutputFormat.table.name})165 client.create_recurring_run.assert_called_once_with(experiment_id=self.experiment_id, job_name=self.job_name, cron_expression=self.cron_expression, pipeline_id=self.pipeline_id, version_id=self.version_id)166 def test_delete_job_without_id(self):167 client = Mock()168 result = self.runner.invoke(job, [169 'delete',170 ], obj={'client': client, 'output': None})171 self.assertEqual(result.exit_code, 2)172 self.assertIn("Missing argument 'JOB_ID'", result.output)173 def test_delete_job_with_id(self):174 client = Mock()175 result = self.runner.invoke(job, [176 'delete',177 self.job_id,178 ], obj={'client': client, 'output': OutputFormat.table.name})179 client.delete_recurring_run.assert_called_once_with(job_id=self.job_id)180 self.assertEqual(result.exit_code, 0)...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

...38 for j in matched_jobs:39 print(f"Warning: deleting exists job: {j.name}, {j.id}")40 kfp_client._job_api.delete_job(id=j.id)41 # https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.client.html#kfp.Client.create_recurring_run42 return kfp_client.create_recurring_run(43 experiment_id=experiment_id,44 job_name=job_name,45 description=job_name,46 # https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format47 cron_expression=cron_expression,48 max_concurrency=1,49 pipeline_package_path=pipeline_package_path,50 enabled=True,51 enable_caching=enable_caching,52 params=params,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful