Best Python code snippet using locust
test_retry.py
Source:test_retry.py  
...5from idact.detail.config.client.client_cluster_config import ClusterConfigImpl6from idact.detail.config.client.retry_config_impl import RetryConfigImpl7from idact.detail.helper.retry import retry, retry_with_config, \8    format_retry_failed_message9def failing_task(fail_times: int, failures: List[int]):10    """Fails fail_times, then returns 123.11        :param fail_times: Times to fail in a row before12                           success.13        :param failures: Empty list to append failure indices.14    """15    if fail_times <= len(failures):16        return 12317    if failures:18        failures.append(failures[-1] + 1)19    else:20        failures.append(0)21    raise RuntimeError()22def test_no_retries():23    failures = []24    with pytest.raises(RuntimeError):25        retry(fun=lambda: failing_task(fail_times=1, failures=failures),26              retries=0, seconds_between_retries=0)27    assert failures == [0]28def test_first_try_succeeds_no_retries():29    failures = []30    assert retry(fun=lambda: failing_task(fail_times=0, failures=failures),31                 retries=0, seconds_between_retries=0) == 12332    assert failures == []33def test_first_try_succeeds_one_retry():34    failures = []35    assert retry(fun=lambda: failing_task(fail_times=0, failures=failures),36                 retries=1, seconds_between_retries=0) == 12337    assert failures == []38def test_first_try_succeeds_some_retries():39    failures = []40    assert retry(fun=lambda: failing_task(fail_times=0, failures=failures),41                 retries=3, seconds_between_retries=0) == 12342    assert failures == []43def test_second_try_succeeds_no_retries():44    failures = []45    with pytest.raises(RuntimeError):46        retry(fun=lambda: failing_task(fail_times=1, failures=failures),47              retries=0, seconds_between_retries=0)48    assert failures == [0]49def test_second_try_succeeds_one_retry():50    failures = []51    assert retry(fun=lambda: 
failing_task(fail_times=1, failures=failures),52                 retries=1, seconds_between_retries=0) == 12353    assert failures == [0]54def test_second_try_succeeds_some_retries():55    failures = []56    assert retry(fun=lambda: failing_task(fail_times=1, failures=failures),57                 retries=3, seconds_between_retries=0) == 12358    assert failures == [0]59def test_third_try_succeeds_no_retries():60    failures = []61    with pytest.raises(RuntimeError):62        retry(fun=lambda: failing_task(fail_times=2, failures=failures),63              retries=0, seconds_between_retries=0)64    assert failures == [0]65def test_third_try_succeeds_one_retry():66    failures = []67    with pytest.raises(RuntimeError):68        retry(fun=lambda: failing_task(fail_times=2, failures=failures),69              retries=1, seconds_between_retries=0)70    assert failures == [0, 1]71def test_third_try_succeeds_some_retries():72    failures = []73    assert retry(fun=lambda: failing_task(fail_times=2, failures=failures),74                 retries=3, seconds_between_retries=0) == 12375    assert failures == [0, 1]76def test_third_try_succeeds_one_retry_with_config():77    failures = []78    config = ClusterConfigImpl(79        host='h', port=1, user='u', auth=AuthMethod.ASK,80        retries={Retry.PORT_INFO: RetryConfigImpl(81            count=1,82            seconds_between=0)})83    with pytest.raises(RuntimeError):84        retry_with_config(85            fun=lambda: failing_task(fail_times=2, failures=failures),86            name=Retry.PORT_INFO,87            config=config)88    assert failures == [0, 1]89def test_third_try_succeeds_some_retries_with_config():90    failures = []91    config = ClusterConfigImpl(92        host='h', port=1, user='u', auth=AuthMethod.ASK,93        retries={Retry.PORT_INFO: RetryConfigImpl(94            count=3,95            seconds_between=0)})96    assert retry_with_config(97        fun=lambda: failing_task(fail_times=2, 
failures=failures),98        name=Retry.PORT_INFO,99        config=config) == 123100    assert failures == [0, 1]101def test_retry_failed_message():102    formatted_message = format_retry_failed_message(name=Retry.PORT_INFO,103                                                    count=3,104                                                    seconds_between=5)105    assert formatted_message == (106        "Retried and failed: config.retries[Retry.PORT_INFO].{count=3, "...datadoc_poc.py
Source:datadoc_poc.py  
...4from airflow.operators.dummy import DummyOperator5from airflow.operators.python import PythonOperator6from datetime import datetime, timedelta7from include.datadog import ddclient8def failing_task():9    time.sleep(3)10    sys.exit()11# Default settings applied to all tasks12default_args = {13    "owner": "airflow",14    "retries": 1,15    "retry_delay": timedelta(seconds=5),16    "on_success_callback": ddclient().success_callback,17    "on_failure_callback": ddclient().failure_callback,18    "on_retry_callback": ddclient().retry_callback19}20with DAG(21        dag_id="datadog_poc",22        start_date=datetime(2022, 1, 6),...test_safe_execute.py
Source:test_safe_execute.py  
...19@pytest.fixture20def succeeding_task(succeeding_command, notify):21    return Task("succeeding", [], succeeding_command, notify)22@pytest.fixture23def failing_task(failing_command, notify):24    return Task("failing", [], failing_command, notify)25def test_success_returns_zero(succeeding_task):26    assert succeeding_task.safe_execute() == 027def test_failure_returns_one(failing_task):28    assert failing_task.safe_execute() == 129def test_failure_sends_exactly_one_notification(failing_task, notify):30    failing_task.safe_execute()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
