Best Python code snippet using elementium_python
test_query_retries.py
Source:test_query_retries.py  
...231    # Kill one impalad so that a retry is triggered.232    killed_impalad = self.cluster.impalads[1]233    killed_impalad.kill()234    # Wait until the retry is running.235    self.__wait_until_retried(handle)236    # Kill another impalad so that another retry is attempted.237    self.cluster.impalads[2].kill()238    # Wait until the query fails.239    self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)240    # Validate the live exec summary.241    retried_query_id = self.__get_retried_query_id_from_summary(handle)242    assert retried_query_id is not None243    # The runtime profile and client log of the retried query, need to be retrieved244    # before fetching results, since the failed fetch attempt will close the245    # query handle.246    retried_runtime_profile = self.client.get_runtime_profile(handle)247    self.__validate_client_log(handle, retried_query_id)248    # Assert that the query failed, since a query can only be retried once.249    try:250      self.client.fetch(self._shuffle_heavy_query, handle)251      assert False252    except ImpalaBeeswaxException, e:253      assert "Max retry limit was hit. Query was retried 1 time(s)." in str(e)254    # Assert that the killed impalad shows up in the list of blacklisted executors from255    # the runtime profile.256    self.__assert_executors_blacklisted(killed_impalad, retried_runtime_profile)257    # Assert that the query id of the original query is in the runtime profile of the258    # retried query.259    self.__validate_original_id_in_profile(retried_runtime_profile,260            handle.get_handle().id)261    # Validate the state of the web ui. The query must be closed before validating the262    # state since it asserts that no queries are in flight.263    self.__validate_web_ui_state()264  @pytest.mark.execute_serially265  def test_retry_fetched_rows(self):266    """Test that query retries are not triggered if some rows have already been267    fetched. 
Run a query, fetch some rows from it, kill one of the impalads that is268    running the query, and the validate that another fetch request fails."""269    query = "select * from functional.alltypes where bool_col = sleep(500)"270    handle = self.execute_query_async(query,271        query_options={'retry_failed_queries': 'true', 'batch_size': '1'})272    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)273    self.client.fetch(query, handle, max_rows=1)274    self.cluster.impalads[1].kill()275    time.sleep(5)276    # Assert that attempt to fetch from the query handle fails.277    try:278      self.client.fetch(query, handle)279      assert False280    except Exception as e:281      assert "Failed due to unreachable impalad" in str(e)282      assert "Skipping retry of query_id=%s because the client has already " \283             "fetched some rows" % handle.get_handle().id in str(e)284  @pytest.mark.execute_serially285  def test_spooling_all_results_for_retries(self):286    """Test retryable queries with spool_all_results_for_retries=true will spool all287    results when results spooling is enabled."""288    handle = self.execute_query_async(self._union_query, query_options={289        'retry_failed_queries': 'true', 'spool_query_results': 'true',290        'spool_all_results_for_retries': 'true'})291    # Fetch one row first.292    results = self.client.fetch(self._union_query, handle, max_rows=1)293    assert len(results.data) == 1294    assert int(results.data[0]) == 8295    # All results are spooled since we are able to fetch some results.296    # Killing an impalad should not trigger query retry.297    self.__kill_random_impalad()298    time.sleep(5)299    # We are still able to fetch the remaining results.300    results = self.client.fetch(self._union_query, handle)301    assert len(results.data) == 1302    assert int(results.data[0]) == 3650303    # Verify no retry happens304    retried_query_id = 
self.__get_retried_query_id_from_summary(handle)305    assert retried_query_id is None306    runtime_profile = self.client.get_runtime_profile(handle)307    assert self.__get_query_id_from_profile(runtime_profile) == handle.get_handle().id308    self.client.close_query(handle)309  @pytest.mark.execute_serially310  def test_query_retry_in_spooling(self):311    """Test retryable queries with results spooling enabled and312    spool_all_results_for_retries=true can be safely retried for failures that happen when313    it's still spooling the results"""314    handle = self.execute_query_async(self._union_query, query_options={315      'retry_failed_queries': 'true', 'spool_query_results': 'true',316      'spool_all_results_for_retries': 'true'})317    # Wait until the first union operand finishes, so some results are spooled.318    self.wait_for_progress(handle, 0.1, 60)319    self.__kill_random_impalad()320    # Still able to fetch the correct result since the query is retried.321    results = self.client.fetch(self._union_query, handle)322    assert len(results.data) == 2323    assert int(results.data[0]) == 8324    assert int(results.data[1]) == 3650325    # Verify the query has been retried326    retried_query_id = self.__get_retried_query_id_from_summary(handle)327    assert retried_query_id is not None328    self.client.close_query(handle)329  @pytest.mark.execute_serially330  def test_retried_query_not_spooling_all_results(self):331    """Test retried query can return results immediately even when results spooling and332    spool_all_results_for_retries are enabled in the original query."""333    handle = self.execute_query_async(self._union_query, query_options={334      'retry_failed_queries': 'true', 'spool_query_results': 'true',335      'spool_all_results_for_retries': 'true'})336    # Wait until the first union operand finishes and then kill one impalad.337    self.wait_for_progress(handle, 0.1, 60)338    # Kill one impalad so the query will be retried.339 
   self.__kill_random_impalad()340    time.sleep(5)341    # Verify that we are able to fetch results of the first union operand while the query342    # is still executing the second union operand.343    results = self.client.fetch(self._union_query, handle, max_rows=1)344    assert len(results.data) == 1345    assert int(results.data[0]) == 8346    # Assert that the query is still executing the second union operand.347    summary = self.client.get_exec_summary(handle)348    assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges349    self.client.close_query(handle)350  @pytest.mark.execute_serially351  def test_query_retry_reaches_spool_limit(self):352    """Test retryable queries with results spooling enabled and353    spool_all_results_for_retries=true that reach spooling mem limit will return rows and354    skip retry"""355    query = "select * from functional.alltypes where bool_col = sleep(500)"356    # Set lower values for spill-to-disk configs to force the above query to spill357    # spooled results and hit result queue limit.358    handle = self.execute_query_async(query, query_options={359        'batch_size': 1,360        'spool_query_results': True,361        'retry_failed_queries': True,362        'spool_all_results_for_retries': True,363        'min_spillable_buffer_size': 8 * 1024,364        'default_spillable_buffer_size': 8 * 1024,365        'max_result_spooling_mem': 8 * 1024,366        'max_spilled_result_spooling_mem': 8 * 1024})367    # Wait until we can fetch some results368    results = self.client.fetch(query, handle, max_rows=1)369    assert len(results.data) == 1370    # Assert that the query is still executing371    summary = self.client.get_exec_summary(handle)372    assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges373    self.assert_impalad_log_contains('INFO', 'Cannot spool all results in the allocated'374        ' result spooling space. 
Query retry will be skipped if any results have been '375        'returned.', expected_count=1)376    # Kill one impalad and assert that the query is not retried.377    self.__kill_random_impalad()378    try:379      self.client.fetch(query, handle)380      assert False, "fetch should fail"381    except ImpalaBeeswaxException as e:382      assert "Failed due to unreachable impalad" in str(e)383      assert "Skipping retry of query_id=%s because the client has already " \384             "fetched some rows" % handle.get_handle().id in str(e)385  @pytest.mark.execute_serially386  def test_original_query_cancel(self):387    """Test canceling a retryable query with spool_all_results_for_retries=true. Make sure388    Coordinator::Wait() won't block in cancellation."""389    for state in ['RUNNING', 'FINISHED']:390      handle = self.execute_query_async(self._union_query, query_options={391        'retry_failed_queries': 'true', 'spool_query_results': 'true',392        'spool_all_results_for_retries': 'true'})393      self.wait_for_state(handle, self.client.QUERY_STATES[state], 60)394      # Cancel the query.395      self.client.cancel(handle)396      # Assert that attempt to fetch from the query handle fails with a cancellation397      # error398      try:399        self.client.fetch(self._union_query, handle)400        assert False401      except Exception as e:402        assert "Cancelled" in str(e)403  @pytest.mark.execute_serially404  def test_retry_finished_query(self):405    """Test that queries in FINISHED state can still be retried before the client fetch406    any rows. 
Sets batch_size to 1 so results will be available as soon as possible.407    The query state becomes FINISHED when results are available."""408    query = "select * from functional.alltypes where bool_col = sleep(50)"409    handle = self.execute_query_async(query,410        query_options={'retry_failed_queries': 'true', 'batch_size': '1'})411    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)412    self.__kill_random_impalad()413    time.sleep(5)414    self.client.fetch(query, handle)415    # Verifies the query is retried.416    retried_query_id = self.__get_retried_query_id_from_summary(handle)417    assert retried_query_id is not None418    self.client.close_query(handle)419  @pytest.mark.execute_serially420  @CustomClusterTestSuite.with_args(421      statestored_args="-statestore_heartbeat_frequency_ms=60000")422  def test_retry_query_cancel(self):423    """Trigger a query retry, and then cancel the retried query. Validate that the424    cancelled query fails with the correct error message. Set a really high statestore425    heartbeat frequency so that killed impalads are not removed from the cluster426    membership."""427    impalad_service = self.cluster.get_first_impalad().service428    # Kill an impalad, and run a query. 
The query should be retried.429    self.cluster.impalads[1].kill()430    query = "select count(*) from tpch_parquet.lineitem"431    handle = self.execute_query_async(query,432        query_options={'retry_failed_queries': 'true'})433    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)434    # Validate the live exec summary.435    retried_query_id = self.__get_retried_query_id_from_summary(handle)436    assert retried_query_id is not None437    # Validate that the query was retried.438    profile_retried_query_id = \439        self.__validate_runtime_profiles_from_service(impalad_service, handle)440    assert profile_retried_query_id == retried_query_id441    self.__validate_client_log(handle, retried_query_id)442    # Cancel the query.443    self.client.cancel(handle)444    # Assert than attempt to fetch from the query handle fails with a cancellation445    # error446    try:447      self.client.fetch(query, handle)448      assert False449    except Exception, e:450        assert "Cancelled" in str(e)451  @pytest.mark.execute_serially452  @CustomClusterTestSuite.with_args(453      statestored_args="-statestore_heartbeat_frequency_ms=60000")454  def test_retry_query_timeout(self):455    """Trigger a query retry, and then leave the query handle open until the456    'query_timeout_s' causes the handle to be closed. Assert that the runtime profile of457    the original and retried queries are correct, and that the 'num-queries-expired'458    metric is properly incremented. Set a really high statestore heartbeat frequency so459    that killed impalads are not removed from the cluster membership."""460    impalad_service = self.cluster.get_first_impalad().service461    # Kill an impalad, and run a query. 
The query should be retried.462    self.cluster.impalads[1].kill()463    query = "select count(*) from tpch_parquet.lineitem"464    handle = self.execute_query_async(query,465        query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})466    self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)467    # Validate the live exec summary.468    retried_query_id = self.__get_retried_query_id_from_summary(handle)469    assert retried_query_id is not None470    # Wait for the query timeout to expire the query handle.471    time.sleep(5)472    # Validate that the query was retried.473    profile_retried_query_id = \474        self.__validate_runtime_profiles_from_service(impalad_service, handle)475    assert profile_retried_query_id == retried_query_id476    self.__validate_client_log(handle, retried_query_id)477    # Assert than attempt to fetch from the query handle fails with a query expired478    # error.479    try:480      self.client.fetch(query, handle)481      assert False482    except Exception, e:483        assert "expired due to client inactivity" in str(e)484    # Assert that the impalad metrics show one expired query.485    assert impalad_service.get_metric_value('impala-server.num-queries-expired') == 1486  @pytest.mark.execute_serially487  @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=1",488        statestored_args="--statestore_heartbeat_frequency_ms=60000")489  def test_retry_query_session_timeout(self):490    """Similar to 'test_retry_query_timeout' except with an idle session timeout."""491    self.close_impala_clients()492    impalad_service = self.cluster.get_first_impalad().service493    # Kill an impalad, and run a query. 
The query should be retried.494    self.cluster.impalads[1].kill()495    query = "select count(*) from tpch_parquet.lineitem"496    client = self.cluster.get_first_impalad().service.create_beeswax_client()497    client.set_configuration({'retry_failed_queries': 'true'})498    handle = client.execute_async(query)499    self.wait_for_state(handle, client.QUERY_STATES['FINISHED'], 60, client=client)500    # Wait for the idle session timeout to expire the session.501    time.sleep(5)502    # Validate that the query was retried. Skip validating client log since we can't503    # get it using the expired session.504    self.__validate_runtime_profiles_from_service(impalad_service, handle)505    # Assert than attempt to fetch from the query handle fails with a session expired506    # error.507    try:508      client.fetch(query, handle)509    except Exception, e:510      assert "Client session expired" in str(e)511    # Assert that the impalad metrics show one expired session.512    assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 1513  @pytest.mark.execute_serially514  @CustomClusterTestSuite.with_args(515      statestored_args="-statestore_heartbeat_frequency_ms=60000")516  def test_retry_query_hs2(self):517    """Test query retries with the HS2 protocol. 
Enable the results set cache as well and518    test that query retries work with the results cache."""519    self.cluster.impalads[1].kill()520    query = "select count(*) from tpch_parquet.lineitem"521    self.hs2_client.set_configuration({'retry_failed_queries': 'true'})522    self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')523    handle = self.hs2_client.execute_async(query)524    self.wait_for_state(handle, 'FINISHED_STATE', 60, client=self.hs2_client)525    results = self.hs2_client.fetch(query, handle)526    assert results.success527    assert len(results.data) == 1528    assert int(results.data[0]) == 6001215529    # Validate the live exec summary.530    retried_query_id = \531        self.__get_retried_query_id_from_summary(handle, use_hs2_client=True)532    assert retried_query_id is not None533    # Validate the state of the runtime profiles.534    retried_runtime_profile = self.hs2_client.get_runtime_profile(handle,535        TRuntimeProfileFormat.STRING)536    self.__validate_runtime_profiles(537        retried_runtime_profile, self.hs2_client.get_query_id(handle), retried_query_id)538    self.__validate_client_log(handle, retried_query_id, use_hs2_client=True)539    self.impalad_test_service.wait_for_metric_value(540        'impala-server.resultset-cache.total-num-rows', 1, timeout=60)541    self.hs2_client.close_query(handle)542  def __validate_runtime_profiles_from_service(self, impalad_service, handle):543    """Wrapper around '__validate_runtime_profiles' that first fetches the retried profile544    from the web ui."""545    original_profile = impalad_service.read_query_profile_page(handle.get_handle().id)546    retried_query_id = self.__get_retried_query_id_from_profile(original_profile)547    retried_profile = impalad_service.read_query_profile_page(retried_query_id)548    self.__validate_runtime_profiles(549        retried_profile, handle.get_handle().id, retried_query_id)550    return retried_query_id551  def 
__get_retried_query_id_from_profile(self, profile):552    """Returns the entry for 'Retried Query Id' from the given profile, or 'None' if no553    such entry exists."""554    retried_query_id_search = re.search("Retried Query Id: (.*)", profile)555    if not retried_query_id_search: return None556    return retried_query_id_search.group(1)557  def __wait_until_retried(self, handle, timeout=300):558    """Wait until the given query handle has been retried. This is achieved by polling the559    runtime profile of the query and checking the 'Retry Status' field."""560    retried_state = "RETRIED"561    def __get_retry_status():562      profile = self.__get_original_query_profile(handle.get_handle().id)563      retry_status = re.search("Retry Status: (.*)", profile)564      return retry_status.group(1) if retry_status else None565    start_time = time.time()566    retry_status = __get_retry_status()567    while retry_status != retried_state and time.time() - start_time < timeout:568      retry_status = __get_retry_status()569      time.sleep(0.5)570    if retry_status != retried_state:571      raise Timeout("query {0} was not retried within timeout".format...test_job_requeuing.py
Source:test_job_requeuing.py  
1from test.support import EnvironmentVarGuard2from unittest.mock import patch3from django.conf import settings4from django.test import TestCase5from django.utils import timezone6from data_refinery_common.models import DownloaderJob, ProcessorJob, SurveyJob7from data_refinery_foreman.foreman import job_requeuing8from data_refinery_foreman.foreman.test_utils import (9    create_downloader_job,10    create_processor_job,11    create_survey_job,12)13def fake_send_job(job_type, job, is_dispatch=False):14    job.batch_job_queue = settings.AWS_BATCH_QUEUE_WORKERS_NAMES[0]15    job.save()16    return True17class JobRequeuingTestCase(TestCase):18    @patch("data_refinery_foreman.foreman.job_requeuing.send_job")19    def test_requeuing_downloader_job(self, mock_send_job):20        mock_send_job.side_effect = fake_send_job21        job = create_downloader_job()22        job_requeuing.requeue_downloader_job(job)23        self.assertEqual(len(mock_send_job.mock_calls), 1)24        jobs = DownloaderJob.objects.order_by("id")25        original_job = jobs[0]26        self.assertTrue(original_job.retried)27        self.assertEqual(original_job.num_retries, 0)28        self.assertFalse(original_job.success)29        retried_job = jobs[1]30        self.assertEqual(retried_job.num_retries, 1)31        self.assertEqual(retried_job.original_files.count(), 2)32    @patch("data_refinery_foreman.foreman.job_requeuing.send_job")33    def test_requeuing_processor_job(self, mock_send_job):34        mock_send_job.side_effect = fake_send_job35        job = create_processor_job()36        job_requeuing.requeue_processor_job(job)37        self.assertEqual(len(mock_send_job.mock_calls), 1)38        jobs = ProcessorJob.objects.order_by("id")39        original_job = jobs[0]40        self.assertTrue(original_job.retried)41        self.assertEqual(original_job.num_retries, 0)42        self.assertFalse(original_job.success)43        retried_job = jobs[1]44        
self.assertEqual(retried_job.num_retries, 1)45    @patch("data_refinery_foreman.foreman.job_requeuing.send_job")46    def test_requeuing_processor_job_no_batch_job_queue(self, mock_send_job):47        mock_send_job.side_effect = fake_send_job48        job = create_processor_job()49        job.batch_job_queue = None50        job.save()51        self.env = EnvironmentVarGuard()52        self.env.set("RUNING_IN_CLOUD", "True")53        with self.settings(RUNNING_IN_CLOUD=True):54            job_requeuing.requeue_processor_job(job)55        self.assertEqual(len(mock_send_job.mock_calls), 1)56        jobs = ProcessorJob.objects.order_by("id")57        original_job = jobs[0]58        self.assertTrue(original_job.retried)59        self.assertEqual(original_job.num_retries, 0)60        self.assertFalse(original_job.success)61        retried_job = jobs[1]62        self.assertEqual(retried_job.num_retries, 1)63        self.assertIsNotNone(retried_job.batch_job_queue)64    @patch("data_refinery_foreman.foreman.job_requeuing.send_job")65    def test_requeuing_compendia_job_no_batch_job_queue(self, mock_send_job):66        mock_send_job.side_effect = fake_send_job67        job = create_processor_job()68        job.batch_job_queue = None69        job.pipeline_applied = "CREATE_COMPENDIA"70        job.save()71        self.env = EnvironmentVarGuard()72        self.env.set("RUNING_IN_CLOUD", "True")73        with self.settings(RUNNING_IN_CLOUD=True):74            job_requeuing.requeue_processor_job(job)75        self.assertEqual(len(mock_send_job.mock_calls), 1)76        jobs = ProcessorJob.objects.order_by("id")77        original_job = jobs[0]78        self.assertTrue(original_job.retried)79        self.assertEqual(original_job.num_retries, 0)80        self.assertFalse(original_job.success)81        retried_job = jobs[1]82        self.assertEqual(retried_job.num_retries, 1)83        self.assertIsNotNone(retried_job.batch_job_queue)84    
@patch("data_refinery_foreman.foreman.job_requeuing.send_job")85    def test_requeuing_processor_job_w_more_ram(self, mock_send_job):86        mock_send_job.side_effect = fake_send_job87        job = create_processor_job(pipeline="SALMON", ram_amount=16384, start_time=timezone.now())88        job_requeuing.requeue_processor_job(job)89        self.assertEqual(len(mock_send_job.mock_calls), 1)90        jobs = ProcessorJob.objects.order_by("id")91        original_job = jobs[0]92        self.assertTrue(original_job.retried)93        self.assertEqual(original_job.num_retries, 0)94        self.assertFalse(original_job.success)95        retried_job = jobs[1]96        self.assertEqual(retried_job.num_retries, 1)97        self.assertEqual(original_job.ram_amount, 16384)98        self.assertEqual(retried_job.ram_amount, 32768)99    @patch("data_refinery_foreman.foreman.job_requeuing.send_job")100    def test_requeuing_survey_job(self, mock_send_job):101        mock_send_job.side_effect = fake_send_job102        job = create_survey_job()103        job_requeuing.requeue_survey_job(job)104        self.assertEqual(len(mock_send_job.mock_calls), 1)105        jobs = SurveyJob.objects.order_by("id")106        original_job = jobs[0]107        self.assertTrue(original_job.retried)108        self.assertEqual(original_job.num_retries, 0)109        self.assertFalse(original_job.success)110        retried_job = jobs[1]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
