How to use retried method in elementium

Best Python code snippet using elementium_python

test_query_retries.py

Source:test_query_retries.py Github

copy

Full Screen

...231 # Kill one impalad so that a retry is triggered.232 killed_impalad = self.cluster.impalads[1]233 killed_impalad.kill()234 # Wait until the retry is running.235 self.__wait_until_retried(handle)236 # Kill another impalad so that another retry is attempted.237 self.cluster.impalads[2].kill()238 # Wait until the query fails.239 self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)240 # Validate the live exec summary.241 retried_query_id = self.__get_retried_query_id_from_summary(handle)242 assert retried_query_id is not None243 # The runtime profile and client log of the retried query, need to be retrieved244 # before fetching results, since the failed fetch attempt will close the245 # query handle.246 retried_runtime_profile = self.client.get_runtime_profile(handle)247 self.__validate_client_log(handle, retried_query_id)248 # Assert that the query failed, since a query can only be retried once.249 try:250 self.client.fetch(self._shuffle_heavy_query, handle)251 assert False252 except ImpalaBeeswaxException, e:253 assert "Max retry limit was hit. Query was retried 1 time(s)." in str(e)254 # Assert that the killed impalad shows up in the list of blacklisted executors from255 # the runtime profile.256 self.__assert_executors_blacklisted(killed_impalad, retried_runtime_profile)257 # Assert that the query id of the original query is in the runtime profile of the258 # retried query.259 self.__validate_original_id_in_profile(retried_runtime_profile,260 handle.get_handle().id)261 # Validate the state of the web ui. The query must be closed before validating the262 # state since it asserts that no queries are in flight.263 self.__validate_web_ui_state()264 @pytest.mark.execute_serially265 def test_retry_fetched_rows(self):266 """Test that query retries are not triggered if some rows have already been267 fetched. Run a query, fetch some rows from it, kill one of the impalads that is268 running the query, and the validate that another fetch request fails."""269 query = "select * from functional.alltypes where bool_col = sleep(500)"270 handle = self.execute_query_async(query,271 query_options={'retry_failed_queries': 'true', 'batch_size': '1'})272 self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)273 self.client.fetch(query, handle, max_rows=1)274 self.cluster.impalads[1].kill()275 time.sleep(5)276 # Assert that attempt to fetch from the query handle fails.277 try:278 self.client.fetch(query, handle)279 assert False280 except Exception as e:281 assert "Failed due to unreachable impalad" in str(e)282 assert "Skipping retry of query_id=%s because the client has already " \283 "fetched some rows" % handle.get_handle().id in str(e)284 @pytest.mark.execute_serially285 def test_spooling_all_results_for_retries(self):286 """Test retryable queries with spool_all_results_for_retries=true will spool all287 results when results spooling is enabled."""288 handle = self.execute_query_async(self._union_query, query_options={289 'retry_failed_queries': 'true', 'spool_query_results': 'true',290 'spool_all_results_for_retries': 'true'})291 # Fetch one row first.292 results = self.client.fetch(self._union_query, handle, max_rows=1)293 assert len(results.data) == 1294 assert int(results.data[0]) == 8295 # All results are spooled since we are able to fetch some results.296 # Killing an impalad should not trigger query retry.297 self.__kill_random_impalad()298 time.sleep(5)299 # We are still able to fetch the remaining results.300 results = self.client.fetch(self._union_query, handle)301 assert len(results.data) == 1302 assert int(results.data[0]) == 3650303 # Verify no retry happens304 retried_query_id = self.__get_retried_query_id_from_summary(handle)305 assert retried_query_id is None306 runtime_profile = self.client.get_runtime_profile(handle)307 assert self.__get_query_id_from_profile(runtime_profile) == handle.get_handle().id308 self.client.close_query(handle)309 @pytest.mark.execute_serially310 def test_query_retry_in_spooling(self):311 """Test retryable queries with results spooling enabled and312 spool_all_results_for_retries=true can be safely retried for failures that happen when313 it's still spooling the results"""314 handle = self.execute_query_async(self._union_query, query_options={315 'retry_failed_queries': 'true', 'spool_query_results': 'true',316 'spool_all_results_for_retries': 'true'})317 # Wait until the first union operand finishes, so some results are spooled.318 self.wait_for_progress(handle, 0.1, 60)319 self.__kill_random_impalad()320 # Still able to fetch the correct result since the query is retried.321 results = self.client.fetch(self._union_query, handle)322 assert len(results.data) == 2323 assert int(results.data[0]) == 8324 assert int(results.data[1]) == 3650325 # Verify the query has been retried326 retried_query_id = self.__get_retried_query_id_from_summary(handle)327 assert retried_query_id is not None328 self.client.close_query(handle)329 @pytest.mark.execute_serially330 def test_retried_query_not_spooling_all_results(self):331 """Test retried query can return results immediately even when results spooling and332 spool_all_results_for_retries are enabled in the original query."""333 handle = self.execute_query_async(self._union_query, query_options={334 'retry_failed_queries': 'true', 'spool_query_results': 'true',335 'spool_all_results_for_retries': 'true'})336 # Wait until the first union operand finishes and then kill one impalad.337 self.wait_for_progress(handle, 0.1, 60)338 # Kill one impalad so the query will be retried.339 self.__kill_random_impalad()340 time.sleep(5)341 # Verify that we are able to fetch results of the first union operand while the query342 # is still executing the second union operand.343 results = self.client.fetch(self._union_query, handle, max_rows=1)344 assert len(results.data) == 1345 assert int(results.data[0]) == 8346 # Assert that the query is still executing the second union operand.347 summary = self.client.get_exec_summary(handle)348 assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges349 self.client.close_query(handle)350 @pytest.mark.execute_serially351 def test_query_retry_reaches_spool_limit(self):352 """Test retryable queries with results spooling enabled and353 spool_all_results_for_retries=true that reach spooling mem limit will return rows and354 skip retry"""355 query = "select * from functional.alltypes where bool_col = sleep(500)"356 # Set lower values for spill-to-disk configs to force the above query to spill357 # spooled results and hit result queue limit.358 handle = self.execute_query_async(query, query_options={359 'batch_size': 1,360 'spool_query_results': True,361 'retry_failed_queries': True,362 'spool_all_results_for_retries': True,363 'min_spillable_buffer_size': 8 * 1024,364 'default_spillable_buffer_size': 8 * 1024,365 'max_result_spooling_mem': 8 * 1024,366 'max_spilled_result_spooling_mem': 8 * 1024})367 # Wait until we can fetch some results368 results = self.client.fetch(query, handle, max_rows=1)369 assert len(results.data) == 1370 # Assert that the query is still executing371 summary = self.client.get_exec_summary(handle)372 assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges373 self.assert_impalad_log_contains('INFO', 'Cannot spool all results in the allocated'374 ' result spooling space. Query retry will be skipped if any results have been '375 'returned.', expected_count=1)376 # Kill one impalad and assert that the query is not retried.377 self.__kill_random_impalad()378 try:379 self.client.fetch(query, handle)380 assert False, "fetch should fail"381 except ImpalaBeeswaxException as e:382 assert "Failed due to unreachable impalad" in str(e)383 assert "Skipping retry of query_id=%s because the client has already " \384 "fetched some rows" % handle.get_handle().id in str(e)385 @pytest.mark.execute_serially386 def test_original_query_cancel(self):387 """Test canceling a retryable query with spool_all_results_for_retries=true. Make sure388 Coordinator::Wait() won't block in cancellation."""389 for state in ['RUNNING', 'FINISHED']:390 handle = self.execute_query_async(self._union_query, query_options={391 'retry_failed_queries': 'true', 'spool_query_results': 'true',392 'spool_all_results_for_retries': 'true'})393 self.wait_for_state(handle, self.client.QUERY_STATES[state], 60)394 # Cancel the query.395 self.client.cancel(handle)396 # Assert that attempt to fetch from the query handle fails with a cancellation397 # error398 try:399 self.client.fetch(self._union_query, handle)400 assert False401 except Exception as e:402 assert "Cancelled" in str(e)403 @pytest.mark.execute_serially404 def test_retry_finished_query(self):405 """Test that queries in FINISHED state can still be retried before the client fetch406 any rows. Sets batch_size to 1 so results will be available as soon as possible.407 The query state becomes FINISHED when results are available."""408 query = "select * from functional.alltypes where bool_col = sleep(50)"409 handle = self.execute_query_async(query,410 query_options={'retry_failed_queries': 'true', 'batch_size': '1'})411 self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)412 self.__kill_random_impalad()413 time.sleep(5)414 self.client.fetch(query, handle)415 # Verifies the query is retried.416 retried_query_id = self.__get_retried_query_id_from_summary(handle)417 assert retried_query_id is not None418 self.client.close_query(handle)419 @pytest.mark.execute_serially420 @CustomClusterTestSuite.with_args(421 statestored_args="-statestore_heartbeat_frequency_ms=60000")422 def test_retry_query_cancel(self):423 """Trigger a query retry, and then cancel the retried query. Validate that the424 cancelled query fails with the correct error message. Set a really high statestore425 heartbeat frequency so that killed impalads are not removed from the cluster426 membership."""427 impalad_service = self.cluster.get_first_impalad().service428 # Kill an impalad, and run a query. The query should be retried.429 self.cluster.impalads[1].kill()430 query = "select count(*) from tpch_parquet.lineitem"431 handle = self.execute_query_async(query,432 query_options={'retry_failed_queries': 'true'})433 self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)434 # Validate the live exec summary.435 retried_query_id = self.__get_retried_query_id_from_summary(handle)436 assert retried_query_id is not None437 # Validate that the query was retried.438 profile_retried_query_id = \439 self.__validate_runtime_profiles_from_service(impalad_service, handle)440 assert profile_retried_query_id == retried_query_id441 self.__validate_client_log(handle, retried_query_id)442 # Cancel the query.443 self.client.cancel(handle)444 # Assert than attempt to fetch from the query handle fails with a cancellation445 # error446 try:447 self.client.fetch(query, handle)448 assert False449 except Exception, e:450 assert "Cancelled" in str(e)451 @pytest.mark.execute_serially452 @CustomClusterTestSuite.with_args(453 statestored_args="-statestore_heartbeat_frequency_ms=60000")454 def test_retry_query_timeout(self):455 """Trigger a query retry, and then leave the query handle open until the456 'query_timeout_s' causes the handle to be closed. Assert that the runtime profile of457 the original and retried queries are correct, and that the 'num-queries-expired'458 metric is properly incremented. Set a really high statestore heartbeat frequency so459 that killed impalads are not removed from the cluster membership."""460 impalad_service = self.cluster.get_first_impalad().service461 # Kill an impalad, and run a query. The query should be retried.462 self.cluster.impalads[1].kill()463 query = "select count(*) from tpch_parquet.lineitem"464 handle = self.execute_query_async(query,465 query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})466 self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)467 # Validate the live exec summary.468 retried_query_id = self.__get_retried_query_id_from_summary(handle)469 assert retried_query_id is not None470 # Wait for the query timeout to expire the query handle.471 time.sleep(5)472 # Validate that the query was retried.473 profile_retried_query_id = \474 self.__validate_runtime_profiles_from_service(impalad_service, handle)475 assert profile_retried_query_id == retried_query_id476 self.__validate_client_log(handle, retried_query_id)477 # Assert than attempt to fetch from the query handle fails with a query expired478 # error.479 try:480 self.client.fetch(query, handle)481 assert False482 except Exception, e:483 assert "expired due to client inactivity" in str(e)484 # Assert that the impalad metrics show one expired query.485 assert impalad_service.get_metric_value('impala-server.num-queries-expired') == 1486 @pytest.mark.execute_serially487 @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=1",488 statestored_args="--statestore_heartbeat_frequency_ms=60000")489 def test_retry_query_session_timeout(self):490 """Similar to 'test_retry_query_timeout' except with an idle session timeout."""491 self.close_impala_clients()492 impalad_service = self.cluster.get_first_impalad().service493 # Kill an impalad, and run a query. The query should be retried.494 self.cluster.impalads[1].kill()495 query = "select count(*) from tpch_parquet.lineitem"496 client = self.cluster.get_first_impalad().service.create_beeswax_client()497 client.set_configuration({'retry_failed_queries': 'true'})498 handle = client.execute_async(query)499 self.wait_for_state(handle, client.QUERY_STATES['FINISHED'], 60, client=client)500 # Wait for the idle session timeout to expire the session.501 time.sleep(5)502 # Validate that the query was retried. Skip validating client log since we can't503 # get it using the expired session.504 self.__validate_runtime_profiles_from_service(impalad_service, handle)505 # Assert than attempt to fetch from the query handle fails with a session expired506 # error.507 try:508 client.fetch(query, handle)509 except Exception, e:510 assert "Client session expired" in str(e)511 # Assert that the impalad metrics show one expired session.512 assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 1513 @pytest.mark.execute_serially514 @CustomClusterTestSuite.with_args(515 statestored_args="-statestore_heartbeat_frequency_ms=60000")516 def test_retry_query_hs2(self):517 """Test query retries with the HS2 protocol. Enable the results set cache as well and518 test that query retries work with the results cache."""519 self.cluster.impalads[1].kill()520 query = "select count(*) from tpch_parquet.lineitem"521 self.hs2_client.set_configuration({'retry_failed_queries': 'true'})522 self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')523 handle = self.hs2_client.execute_async(query)524 self.wait_for_state(handle, 'FINISHED_STATE', 60, client=self.hs2_client)525 results = self.hs2_client.fetch(query, handle)526 assert results.success527 assert len(results.data) == 1528 assert int(results.data[0]) == 6001215529 # Validate the live exec summary.530 retried_query_id = \531 self.__get_retried_query_id_from_summary(handle, use_hs2_client=True)532 assert retried_query_id is not None533 # Validate the state of the runtime profiles.534 retried_runtime_profile = self.hs2_client.get_runtime_profile(handle,535 TRuntimeProfileFormat.STRING)536 self.__validate_runtime_profiles(537 retried_runtime_profile, self.hs2_client.get_query_id(handle), retried_query_id)538 self.__validate_client_log(handle, retried_query_id, use_hs2_client=True)539 self.impalad_test_service.wait_for_metric_value(540 'impala-server.resultset-cache.total-num-rows', 1, timeout=60)541 self.hs2_client.close_query(handle)542 def __validate_runtime_profiles_from_service(self, impalad_service, handle):543 """Wrapper around '__validate_runtime_profiles' that first fetches the retried profile544 from the web ui."""545 original_profile = impalad_service.read_query_profile_page(handle.get_handle().id)546 retried_query_id = self.__get_retried_query_id_from_profile(original_profile)547 retried_profile = impalad_service.read_query_profile_page(retried_query_id)548 self.__validate_runtime_profiles(549 retried_profile, handle.get_handle().id, retried_query_id)550 return retried_query_id551 def __get_retried_query_id_from_profile(self, profile):552 """Returns the entry for 'Retried Query Id' from the given profile, or 'None' if no553 such entry exists."""554 retried_query_id_search = re.search("Retried Query Id: (.*)", profile)555 if not retried_query_id_search: return None556 return retried_query_id_search.group(1)557 def __wait_until_retried(self, handle, timeout=300):558 """Wait until the given query handle has been retried. This is achieved by polling the559 runtime profile of the query and checking the 'Retry Status' field."""560 retried_state = "RETRIED"561 def __get_retry_status():562 profile = self.__get_original_query_profile(handle.get_handle().id)563 retry_status = re.search("Retry Status: (.*)", profile)564 return retry_status.group(1) if retry_status else None565 start_time = time.time()566 retry_status = __get_retry_status()567 while retry_status != retried_state and time.time() - start_time < timeout:568 retry_status = __get_retry_status()569 time.sleep(0.5)570 if retry_status != retried_state:571 raise Timeout("query {0} was not retried within timeout".format...

Full Screen

Full Screen

test_job_requeuing.py

Source:test_job_requeuing.py Github

copy

Full Screen

1from test.support import EnvironmentVarGuard2from unittest.mock import patch3from django.conf import settings4from django.test import TestCase5from django.utils import timezone6from data_refinery_common.models import DownloaderJob, ProcessorJob, SurveyJob7from data_refinery_foreman.foreman import job_requeuing8from data_refinery_foreman.foreman.test_utils import (9 create_downloader_job,10 create_processor_job,11 create_survey_job,12)13def fake_send_job(job_type, job, is_dispatch=False):14 job.batch_job_queue = settings.AWS_BATCH_QUEUE_WORKERS_NAMES[0]15 job.save()16 return True17class JobRequeuingTestCase(TestCase):18 @patch("data_refinery_foreman.foreman.job_requeuing.send_job")19 def test_requeuing_downloader_job(self, mock_send_job):20 mock_send_job.side_effect = fake_send_job21 job = create_downloader_job()22 job_requeuing.requeue_downloader_job(job)23 self.assertEqual(len(mock_send_job.mock_calls), 1)24 jobs = DownloaderJob.objects.order_by("id")25 original_job = jobs[0]26 self.assertTrue(original_job.retried)27 self.assertEqual(original_job.num_retries, 0)28 self.assertFalse(original_job.success)29 retried_job = jobs[1]30 self.assertEqual(retried_job.num_retries, 1)31 self.assertEqual(retried_job.original_files.count(), 2)32 @patch("data_refinery_foreman.foreman.job_requeuing.send_job")33 def test_requeuing_processor_job(self, mock_send_job):34 mock_send_job.side_effect = fake_send_job35 job = create_processor_job()36 job_requeuing.requeue_processor_job(job)37 self.assertEqual(len(mock_send_job.mock_calls), 1)38 jobs = ProcessorJob.objects.order_by("id")39 original_job = jobs[0]40 self.assertTrue(original_job.retried)41 self.assertEqual(original_job.num_retries, 0)42 self.assertFalse(original_job.success)43 retried_job = jobs[1]44 self.assertEqual(retried_job.num_retries, 1)45 @patch("data_refinery_foreman.foreman.job_requeuing.send_job")46 def test_requeuing_processor_job_no_batch_job_queue(self, mock_send_job):47 mock_send_job.side_effect = fake_send_job48 job = create_processor_job()49 job.batch_job_queue = None50 job.save()51 self.env = EnvironmentVarGuard()52 self.env.set("RUNING_IN_CLOUD", "True")53 with self.settings(RUNNING_IN_CLOUD=True):54 job_requeuing.requeue_processor_job(job)55 self.assertEqual(len(mock_send_job.mock_calls), 1)56 jobs = ProcessorJob.objects.order_by("id")57 original_job = jobs[0]58 self.assertTrue(original_job.retried)59 self.assertEqual(original_job.num_retries, 0)60 self.assertFalse(original_job.success)61 retried_job = jobs[1]62 self.assertEqual(retried_job.num_retries, 1)63 self.assertIsNotNone(retried_job.batch_job_queue)64 @patch("data_refinery_foreman.foreman.job_requeuing.send_job")65 def test_requeuing_compendia_job_no_batch_job_queue(self, mock_send_job):66 mock_send_job.side_effect = fake_send_job67 job = create_processor_job()68 job.batch_job_queue = None69 job.pipeline_applied = "CREATE_COMPENDIA"70 job.save()71 self.env = EnvironmentVarGuard()72 self.env.set("RUNING_IN_CLOUD", "True")73 with self.settings(RUNNING_IN_CLOUD=True):74 job_requeuing.requeue_processor_job(job)75 self.assertEqual(len(mock_send_job.mock_calls), 1)76 jobs = ProcessorJob.objects.order_by("id")77 original_job = jobs[0]78 self.assertTrue(original_job.retried)79 self.assertEqual(original_job.num_retries, 0)80 self.assertFalse(original_job.success)81 retried_job = jobs[1]82 self.assertEqual(retried_job.num_retries, 1)83 self.assertIsNotNone(retried_job.batch_job_queue)84 @patch("data_refinery_foreman.foreman.job_requeuing.send_job")85 def test_requeuing_processor_job_w_more_ram(self, mock_send_job):86 mock_send_job.side_effect = fake_send_job87 job = create_processor_job(pipeline="SALMON", ram_amount=16384, start_time=timezone.now())88 job_requeuing.requeue_processor_job(job)89 self.assertEqual(len(mock_send_job.mock_calls), 1)90 jobs = ProcessorJob.objects.order_by("id")91 original_job = jobs[0]92 self.assertTrue(original_job.retried)93 self.assertEqual(original_job.num_retries, 0)94 self.assertFalse(original_job.success)95 retried_job = jobs[1]96 self.assertEqual(retried_job.num_retries, 1)97 self.assertEqual(original_job.ram_amount, 16384)98 self.assertEqual(retried_job.ram_amount, 32768)99 @patch("data_refinery_foreman.foreman.job_requeuing.send_job")100 def test_requeuing_survey_job(self, mock_send_job):101 mock_send_job.side_effect = fake_send_job102 job = create_survey_job()103 job_requeuing.requeue_survey_job(job)104 self.assertEqual(len(mock_send_job.mock_calls), 1)105 jobs = SurveyJob.objects.order_by("id")106 original_job = jobs[0]107 self.assertTrue(original_job.retried)108 self.assertEqual(original_job.num_retries, 0)109 self.assertFalse(original_job.success)110 retried_job = jobs[1]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run elementium automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful