Best Python code snippet using Testify_python
test_countifs.py
Source:test_countifs.py  
...210            (9, 2),211        ],212    },213]214def prepare_test_case(215    spark_context: SparkContext, test_case: Dict[str, List[Any]]216) -> Tuple[RDD, RDD, List[Any]]:217    data_rdd = spark_context.parallelize(218        enumerate(219            map(lambda p: p if isinstance(p, tuple) else (p,), test_case["data_points"])220        )221    )222    query_rdd = spark_context.parallelize(223        enumerate(224            map(225                lambda p: p if isinstance(p, tuple) else (p,),226                sorted(test_case["query_points"]),227            )228        )229    )230    return data_rdd, query_rdd, test_case["expected_result"]231@pytest.mark.parametrize("n_partitions", [1, 2, 4])232@pytest.mark.parametrize("test_case", TESTS_1D)233def test_algorithm_execution_1d(spark_context, n_partitions, test_case):234    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)235    countifs = Countifs(spark_context, n_partitions)236    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=1).collect()237    assert len(result) == len(expected_result)238    assert result == expected_result239TESTS_2D = [240    {241        "data_points": [(3, 6), (4, 2)],242        "query_points": [(0, 5), (7, 1)],243        "expected_result": [(0, 1), (1, 0)],244    },245    {246        "query_points": [(0, 5), (7, 1)],247        "data_points": [(3, 6), (4, 2)],248        "expected_result": [(0, 1), (1, 0)],249    },250    {251        "query_points": [(100, 100), (102, 102)],252        "data_points": [(103, 480), (1178, 101)],253        "expected_result": [(0, 2), (1, 1)],254    },255    {256        "query_points": [(100, 100), (102, 102), (104, 104), (106, 106)],257        "data_points": [(1178, 101), (103, 480), (105, 1771), (1243, 107)],258        "expected_result": [(0, 4), (1, 3), (2, 2), (3, 1)],259    },260    {261        "query_points": [(100, 100), (102, 102)],262        "data_points": [(103, 480), (105, 1771), (1178, 101), (1243, 107)],263        "expected_result": [(0, 4), (1, 3)],264    },265]266@pytest.mark.parametrize("n_partitions", [1, 2, 4])267@pytest.mark.parametrize("test_case", TESTS_2D)268def test_algorithm_execution_2d(spark_context, n_partitions, test_case):269    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)270    countifs = Countifs(spark_context, n_partitions)271    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=2).collect()272    assert len(result) == len(expected_result)273    assert result == expected_result274TESTS_3D = [275    {276        "query_points": [(100, 100, 100), (102, 102, 102)],277        "data_points": [278            (2137, 103, 480),279            (105, 2137, 1771),280            (1178, 101, 2137),281            (2137, 1243, 107),282        ],283        "expected_result": [(0, 4), (1, 3)],284    }285]286@pytest.mark.parametrize("n_partitions", [1, 2, 4])287@pytest.mark.parametrize("test_case", TESTS_3D)288def test_algorithm_execution_3d(spark_context, n_partitions, test_case):289    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)290    countifs = Countifs(spark_context, n_partitions)291    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=3).collect()292    assert len(result) == len(expected_result)293    assert result == expected_result294def random_test_case(n_data_points: int, n_query_points: int, n_dim: int):295    min_coord_value = 100296    max_coord_value = 100 + 100 * (n_query_points + n_data_points)297    def make_query_point(i):298        return tuple(min_coord_value + 2 * i for _ in range(n_dim))299    query_points = [make_query_point(i) for i in range(n_query_points)]300    data_points_per_query_point = n_data_points // n_query_points301    data_points_rest = n_data_points - n_query_points * data_points_per_query_point302    def make_data_point(min_val, max_val, global_max_val):303        """304        One of the coords will be between (min_val, max_val),305        rest of the coords will be between (min_val, global_max_val)306        """307        random_coord = random.randint(0, n_dim - 1)308        coords = [random.randint(min_val, global_max_val) for _ in range(n_dim)]309        coords[random_coord] = random.randint(min_val, max_val)310        return tuple(coords)311    # start with random data points which are smaller than all query points312    data_points = [313        make_data_point(0, min_coord_value, max_coord_value)314        for _ in range(data_points_rest)315    ]316    for i in range(n_query_points):317        # add data point in L-shape, with all dimensions > query point dimensions318        data_points_for_query = [319            make_data_point(320                min_coord_value + 2 * i + 1,321                min_coord_value + 2 * i + 1,322                max_coord_value,323            )324            for _ in range(data_points_per_query_point)325        ]326        data_points += data_points_for_query327    random.shuffle(data_points)328    expected_result = [329        (i, data_points_per_query_point * (n_query_points - i))330        for i in range(n_query_points)331    ]332    assert expected_result[-1] == (n_query_points - 1, data_points_per_query_point)333    assert (334        len(data_points) == n_data_points335    ), f"got: {len(data_points)}, expected: {n_data_points}"336    return {337        "data_points": data_points,338        "query_points": query_points,339        "expected_result": expected_result,340    }341LONG_N_PARTITIONS = [1, 2, 3, 4, 8, 16]342RANDOM_TESTS_1D = [343    random_test_case(10, 10, 1),344    random_test_case(1_000, 10, 1),345    random_test_case(1_000, 100, 1),346    random_test_case(1_000, 1_000, 1),347    # random_test_case(100_000, 10, 1),348    # random_test_case(100_000, 100, 1),349    # random_test_case(100_000, 1_000, 1),350    # random_test_case(100_000, 10_000, 1),351]352@pytest.mark.long353@pytest.mark.parametrize("n_partitions", LONG_N_PARTITIONS)354@pytest.mark.parametrize("test_case", RANDOM_TESTS_1D)355def test_algorithm_performance_1d(spark_context, n_partitions, test_case):356    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)357    countifs = Countifs(spark_context, n_partitions)358    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=1).collect()359    assert len(result) == len(expected_result)360    assert result == expected_result361RANDOM_TESTS_2D = [362    random_test_case(10, 10, 2),363    random_test_case(1_000, 10, 2),364    random_test_case(1_000, 100, 2),365    random_test_case(1_000, 1_000, 2),366    # random_test_case(100_000, 10, 2),367    # random_test_case(100_000, 100, 2),368    # random_test_case(100_000, 1_000, 2),369    # random_test_case(100_000, 10_000, 2),370]371@pytest.mark.long372@pytest.mark.parametrize("n_partitions", LONG_N_PARTITIONS)373@pytest.mark.parametrize("test_case", RANDOM_TESTS_2D)374def test_algorithm_performance_2d(spark_context, n_partitions, test_case):375    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)376    countifs = Countifs(spark_context, n_partitions)377    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=2).collect()378    assert len(result) == len(expected_result)379    assert result == expected_result380RANDOM_TESTS_3D = [381    random_test_case(10, 10, 3),382    random_test_case(1_000, 10, 3),383    random_test_case(1_000, 100, 3),384    random_test_case(1_000, 1_000, 3),385    # random_test_case(100_000, 10, 3),386    # random_test_case(100_000, 100, 3),387    # random_test_case(100_000, 1_000, 3),388    # random_test_case(100_000, 10_000, 3),389]390@pytest.mark.long391@pytest.mark.parametrize("n_partitions", LONG_N_PARTITIONS)392@pytest.mark.parametrize("test_case", RANDOM_TESTS_3D)393def test_algorithm_performance_3d(spark_context, n_partitions, test_case):394    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)395    countifs = Countifs(spark_context, n_partitions)396    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=3).collect()397    assert len(result) == len(expected_result)...test_runner_test.py
Source:test_runner_test.py  
...7from testify import test_runner8from .test_runner_subdir.inheriting_class import InheritingClass9prepared = False10running = False11def prepare_test_case(options, test_case):12    global prepared13    prepared = True14def run_test_case(options, test_case, runnable):15    global running16    running = True17    try:18        return runnable()19    finally:20        running = False21def add_testcase_info(test_case, runner):22    test_case.__testattr__ = True23class TestTestRunnerGetTestMethodName(test_case.TestCase):24    def test_method_from_other_module_reports_class_module(self):25        ret = test_runner.TestRunner.get_test_method_name(...add_validation_case.py
Source:add_validation_case.py  
...12# get event data from production SKIP13skip = SKIP('https://skip.eatws.net',14            secret_id='skip-prod-readonly-access')15logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)16def prepare_test_case(evid, inventory=None, waveforms=None):17    event: Optional[Event] = skip.get_event(evid)18    if not event:19        raise ValueError('%s not found in SKIP' % evid)20    eqinfo = dict(21        id=event.id,22        lat=event.latitude,23        lon=event.longitude,24        dep=event.depth_km,25        time=UTCDateTime(event.event_time),26    )27    datadir = abspath(DATA_DIR)28    result = runwphase(29        server='IRIS',30        eqinfo=eqinfo,31        save_waveforms=join(datadir, '%s.mseed' % evid),32        save_inventory=join(datadir, '%s.xml' % evid),33        inventory=inventory,34        waveforms=waveforms,35    )36    MT = result.MomentTensor37    case = dict(38        id=event.id,39        lat=event.latitude,40        lon=event.longitude,41        dep=event.depth_km,42        time=event.event_time,43        _expected_results={k: getattr(MT, k) for k in result_keys},44    )45    if result.QualityParams is not None:46        case["_expected_results"]["azimuthal_gap"] = result.QualityParams.azimuthal_gap47    print(json.dumps(case, indent=4))48    add_case(case)49    print("This test case has been added to validation_cases.json and test-datasets/.")50    print("To create a new release tarball: "51          "tar czvf ga-wphase-test-datasets.tar.gz test-datasets/ validation_cases.json")52if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
