How to use the wait_for_done method in localstack

Best Python code snippets using localstack_python

test_beam.py

Source: test_beam.py (GitHub)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import copy
import os
import subprocess
import unittest
from unittest import mock
from unittest.mock import MagicMock
from parameterized import parameterized
from airflow.exceptions import AirflowException
from airflow.providers.apache.beam.hooks.beam import BeamCommandRunner, BeamHook, beam_options_to_args

PY_FILE = 'apache_beam.examples.wordcount'
JAR_FILE = 'unitest.jar'
JOB_CLASS = 'com.example.UnitTest'
PY_OPTIONS = ['-m']
TEST_JOB_ID = 'test-job-id'
GO_FILE = '/path/to/file.go'
DEFAULT_RUNNER = "DirectRunner"
BEAM_STRING = 'airflow.providers.apache.beam.hooks.beam.{}'
BEAM_VARIABLES_PY = {'output': 'gs://test/output', 'labels': {'foo': 'bar'}}
BEAM_VARIABLES_JAVA = {
    'output': 'gs://test/output',
    'labels': {'foo': 'bar'},
}
BEAM_VARIABLES_GO = {'output': 'gs://test/output', 'labels': {'foo': 'bar'}}
APACHE_BEAM_V_2_14_0_JAVA_SDK_LOG = f""""\
Dataflow SDK version: 2.14.0
Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow\
/jobsDetail/locations/europe-west3/jobs/{TEST_JOB_ID}?project=XXX
Submitted job: {TEST_JOB_ID}
Jun 15, 2020 2:57:28 PM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To cancel the job using the 'gcloud' tool, run:
> gcloud dataflow jobs --project=XXX cancel --region=europe-west3 {TEST_JOB_ID}
"""


class TestBeamHook(unittest.TestCase):
    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_python_pipeline(self, mock_runner):
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        process_line_callback = MagicMock()
        hook.start_python_pipeline(
            variables=copy.deepcopy(BEAM_VARIABLES_PY),
            py_file=PY_FILE,
            py_options=PY_OPTIONS,
            process_line_callback=process_line_callback,
        )
        expected_cmd = [
            "python3",
            '-m',
            PY_FILE,
            f'--runner={DEFAULT_RUNNER}',
            '--output=gs://test/output',
            '--labels=foo=bar',
        ]
        mock_runner.assert_called_once_with(
            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None
        )
        wait_for_done.assert_called_once_with()

    @parameterized.expand(
        [
            ('default_to_python3', 'python3'),
            ('major_version_2', 'python2'),
            ('major_version_3', 'python3'),
            ('minor_version', 'python3.6'),
        ]
    )
    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, mock_runner):
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        process_line_callback = MagicMock()
        hook.start_python_pipeline(
            variables=copy.deepcopy(BEAM_VARIABLES_PY),
            py_file=PY_FILE,
            py_options=PY_OPTIONS,
            py_interpreter=py_interpreter,
            process_line_callback=process_line_callback,
        )
        expected_cmd = [
            py_interpreter,
            '-m',
            PY_FILE,
            f'--runner={DEFAULT_RUNNER}',
            '--output=gs://test/output',
            '--labels=foo=bar',
        ]
        mock_runner.assert_called_once_with(
            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None
        )
        wait_for_done.assert_called_once_with()

    @parameterized.expand(
        [
            (['foo-bar'], False),
            (['foo-bar'], True),
            ([], True),
        ]
    )
    @mock.patch(BEAM_STRING.format('prepare_virtualenv'))
    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages(
        self, current_py_requirements, current_py_system_site_packages, mock_runner, mock_virtualenv
    ):
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        mock_virtualenv.return_value = '/dummy_dir/bin/python'
        process_line_callback = MagicMock()
        hook.start_python_pipeline(
            variables=copy.deepcopy(BEAM_VARIABLES_PY),
            py_file=PY_FILE,
            py_options=PY_OPTIONS,
            py_requirements=current_py_requirements,
            py_system_site_packages=current_py_system_site_packages,
            process_line_callback=process_line_callback,
        )
        expected_cmd = [
            '/dummy_dir/bin/python',
            '-m',
            PY_FILE,
            f'--runner={DEFAULT_RUNNER}',
            '--output=gs://test/output',
            '--labels=foo=bar',
        ]
        mock_runner.assert_called_once_with(
            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None
        )
        wait_for_done.assert_called_once_with()
        mock_virtualenv.assert_called_once_with(
            venv_directory=mock.ANY,
            python_bin="python3",
            system_site_packages=current_py_system_site_packages,
            requirements=current_py_requirements,
        )

    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(self, mock_runner):
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        process_line_callback = MagicMock()
        with self.assertRaisesRegex(AirflowException, "Invalid method invocation."):
            hook.start_python_pipeline(
                variables=copy.deepcopy(BEAM_VARIABLES_PY),
                py_file=PY_FILE,
                py_options=PY_OPTIONS,
                py_requirements=[],
                process_line_callback=process_line_callback,
            )
        mock_runner.assert_not_called()
        wait_for_done.assert_not_called()

    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_java_pipeline(self, mock_runner):
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        process_line_callback = MagicMock()
        hook.start_java_pipeline(
            jar=JAR_FILE,
            variables=copy.deepcopy(BEAM_VARIABLES_JAVA),
            process_line_callback=process_line_callback,
        )
        expected_cmd = [
            'java',
            '-jar',
            JAR_FILE,
            f'--runner={DEFAULT_RUNNER}',
            '--output=gs://test/output',
            '--labels={"foo":"bar"}',
        ]
        mock_runner.assert_called_once_with(
            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None
        )
        wait_for_done.assert_called_once_with()

    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_java_pipeline_with_job_class(self, mock_runner):
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        process_line_callback = MagicMock()
        hook.start_java_pipeline(
            jar=JAR_FILE,
            variables=copy.deepcopy(BEAM_VARIABLES_JAVA),
            job_class=JOB_CLASS,
            process_line_callback=process_line_callback,
        )
        expected_cmd = [
            'java',
            '-cp',
            JAR_FILE,
            JOB_CLASS,
            f'--runner={DEFAULT_RUNNER}',
            '--output=gs://test/output',
            '--labels={"foo":"bar"}',
        ]
        mock_runner.assert_called_once_with(
            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None
        )
        wait_for_done.assert_called_once_with()

    @mock.patch(BEAM_STRING.format('shutil.which'))
    @mock.patch(BEAM_STRING.format('BeamCommandRunner'))
    def test_start_go_pipeline(self, mock_runner, mock_which):
        mock_which.return_value = "/some_path/to/go"
        hook = BeamHook(runner=DEFAULT_RUNNER)
        wait_for_done = mock_runner.return_value.wait_for_done
        process_line_callback = MagicMock()
        hook.start_go_pipeline(
            go_file=GO_FILE,
            variables=copy.deepcopy(BEAM_VARIABLES_GO),
            process_line_callback=process_line_callback,
        )
        basename = os.path.basename(GO_FILE)
        go_workspace = os.path.dirname(GO_FILE)
        expected_cmd = [
            'go',
            'run',
            basename,
            f'--runner={DEFAULT_RUNNER}',
            '--output=gs://test/output',
            '--labels={"foo":"bar"}',
        ]
        mock_runner.assert_called_once_with(
            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=go_workspace
        )
        wait_for_done.assert_called_once_with()

    @mock.patch(BEAM_STRING.format('shutil.which'))
    def test_start_go_pipeline_without_go_installed_raises(self, mock_which):
        mock_which.return_value = None
        hook = BeamHook(runner=DEFAULT_RUNNER)
        with self.assertRaises(AirflowException) as ex_ctx:
            hook.start_go_pipeline(
                go_file=GO_FILE,
                variables=copy.deepcopy(BEAM_VARIABLES_GO),
            )
        assert (
            "You need to have Go installed to run beam go pipeline. See https://go.dev/doc/install "
            "installation guide. If you are running airflow in Docker see more info at "
            "'https://airflow.apache.org/docs/docker-stack/recipes.html'." == str(ex_ctx.exception)
        )


class TestBeamRunner(unittest.TestCase):
    @mock.patch('airflow.providers.apache.beam.hooks.beam.BeamCommandRunner.log')
    @mock.patch('subprocess.Popen')
    @mock.patch('select.select')
    def test_beam_wait_for_done_logging(self, mock_select, mock_popen, mock_logging):
        cmd = ['test', 'cmd']
        mock_logging.info = MagicMock()
        mock_logging.warning = MagicMock()
        mock_proc = MagicMock()
        mock_proc.stderr = MagicMock()
        mock_proc.stderr.readlines = MagicMock(return_value=['test\n', 'error\n'])
        mock_stderr_fd = MagicMock()
        mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
        mock_proc_poll = MagicMock()
        mock_select.return_value = [[mock_stderr_fd]]
        def poll_resp_error():
            mock_proc.return_code = 1
            return True
        mock_proc_poll.side_effect = [None, poll_resp_error]
        mock_proc.poll = mock_proc_poll
        mock_popen.return_value = mock_proc
        beam = BeamCommandRunner(cmd)
        mock_logging.info.assert_called_once_with('Running command: %s', " ".join(cmd))
        mock_popen.assert_called_once_with(
            cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, cwd=None
        )
        self.assertRaises(Exception, beam.wait_for_done)


class TestBeamOptionsToArgs(unittest.TestCase):
    @parameterized.expand(
        [
            ({"key": "val"}, ["--key=val"]),
            ({"key": None}, ["--key"]),
            ({"key": True}, ["--key"]),
            ({"key": False}, ["--key=False"]),
            ({"key": ["a", "b", "c"]}, ["--key=a", "--key=b", "--key=c"]),
        ]
    )
    def test_beam_options_to_args(self, options, expected_args):
        args = beam_options_to_args(options)
...
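In every test above, wait_for_done never runs for real: the hook's BeamCommandRunner is patched, the test grabs a handle on mock_runner.return_value.wait_for_done, exercises the hook, and then asserts that the runner was built with the expected command and that wait_for_done() was called exactly once (or, on the failure paths, not at all). Below is a minimal, self-contained sketch of that mocking pattern; the PipelineRunner class and start_pipeline helper are hypothetical stand-ins for illustration only, not part of Airflow, Beam, or LocalStack.

import unittest
from unittest import mock


class PipelineRunner:
    """Hypothetical runner: launches a command and blocks until it finishes."""

    def __init__(self, cmd):
        self.cmd = cmd

    def wait_for_done(self):
        # A real runner would poll the subprocess here; the test below never reaches this.
        raise NotImplementedError


def start_pipeline(cmd):
    """Hypothetical hook method: build a runner, then block on wait_for_done()."""
    runner = PipelineRunner(cmd)
    runner.wait_for_done()


class TestStartPipeline(unittest.TestCase):
    @mock.patch(f"{__name__}.PipelineRunner")
    def test_waits_for_done(self, mock_runner):
        # Grab the instance-level method before exercising the code under test.
        wait_for_done = mock_runner.return_value.wait_for_done

        start_pipeline(["echo", "hello"])

        # The runner was constructed with the expected command ...
        mock_runner.assert_called_once_with(["echo", "hello"])
        # ... and the caller blocked on wait_for_done exactly once.
        wait_for_done.assert_called_once_with()


if __name__ == "__main__":
    unittest.main()

The same handle also covers the negative cases: after provoking an error, assert mock_runner.assert_not_called() and wait_for_done.assert_not_called() to prove the pipeline was never started, as the empty-py_requirements test above does.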

round_trip_fault_test.py

Source: round_trip_fault_test.py (GitHub)

...
        self.kafka.stop()
        self.zk.stop()

    def test_round_trip_workload(self):
        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
        workload1.wait_for_done(timeout_sec=600)

    def test_round_trip_workload_with_broker_partition(self):
        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
        time.sleep(2)
        part1 = [self.kafka.nodes[0]]
        part2 = self.kafka.nodes[1:] + [self.workload_service.nodes[0]] + self.zk.nodes
        partition1_spec = NetworkPartitionFaultSpec(0, TaskSpec.MAX_DURATION_MS,
                                                    [part1, part2])
        partition1 = self.trogdor.create_task("partition1", partition1_spec)
        workload1.wait_for_done(timeout_sec=600)
        partition1.stop()
        partition1.wait_for_done()

    def test_produce_consume_with_broker_pause(self):
        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
        time.sleep(2)
        stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]],
                                          self.kafka.java_class_name())
        stop1 = self.trogdor.create_task("stop1", stop1_spec)
        workload1.wait_for_done(timeout_sec=600)
        stop1.stop()
        stop1.wait_for_done()
        self.kafka.stop_node(self.kafka.nodes[0], False)

    def test_produce_consume_with_client_partition(self):
        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
        time.sleep(2)
        part1 = [self.workload_service.nodes[0]]
        part2 = self.kafka.nodes + self.zk.nodes
        partition1_spec = NetworkPartitionFaultSpec(0, 60000, [part1, part2])
        stop1 = self.trogdor.create_task("stop1", partition1_spec)
        workload1.wait_for_done(timeout_sec=600)
        stop1.stop()
        stop1.wait_for_done()

    def test_produce_consume_with_latency(self):
        workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
        time.sleep(2)
        spec = DegradedNetworkFaultSpec(0, 60000)
        for node in self.kafka.nodes + self.zk.nodes:
            spec.add_node_spec(node.name, "eth0", latencyMs=100, rateLimitKbit=3000)
        slow1 = self.trogdor.create_task("slow1", spec)
        workload1.wait_for_done(timeout_sec=600)
        slow1.stop()
...
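In these Kafka Trogdor fault-injection tests, wait_for_done(timeout_sec=600) is the synchronization point: each test starts a round-trip workload, injects a fault (network partition, process stop, added latency), and then blocks until the workload reports completion. Helpers like this are typically a poll-until-terminal-state loop with a deadline. The sketch below shows that general shape under assumed names; the Task class, task_status call, and status strings are illustrative, not the actual Trogdor or ducktape API.

import time


class TaskTimeoutError(Exception):
    """Raised when the task does not reach a terminal state before the deadline."""


class Task:
    """Hypothetical handle on an asynchronous workload (names are assumptions)."""

    def __init__(self, client, task_id):
        self._client = client  # assumed coordinator client exposing task_status()
        self.task_id = task_id

    def wait_for_done(self, timeout_sec=360, poll_interval_sec=1):
        """Poll until the task reports a terminal state, or raise after timeout_sec."""
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            status = self._client.task_status(self.task_id)
            if status in ("DONE", "STOPPED"):
                return status
            if status == "FAILED":
                raise RuntimeError(f"task {self.task_id} failed")
            time.sleep(poll_interval_sec)
        raise TaskTimeoutError(f"task {self.task_id} not done after {timeout_sec}s")

Because such a helper raises on timeout instead of waiting forever, a fault that wedges the workload turns into a clear test failure after the 600-second deadline rather than a hung build.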
