Best Python code snippet using localstack_python
test_cli.py
Source:test_cli.py  
...145        },146    }147    conf_obj = mock_klio_config.setup(config, config_file, config_override)148    result = runner.invoke(cli.main, cli_inputs)149    core_testing.assert_execution_success(result)150    assert "" == result.output151    if image_tag:152        mock_get_git_sha.assert_not_called()153    else:154        mock_get_git_sha.assert_called_once_with(155            mock_klio_config.patch_os_getcwd156        )157    mock_klio_config.assert_calls()158    exp_image_tag = image_tag or mock_get_git_sha.return_value159    mock_build.assert_called_once_with(160        mock_klio_config.patch_os_getcwd,161        conf_obj,162        config_override,163        image_tag=exp_image_tag,164    )165def test_create_job(runner, mock_create, patch_os_getcwd):166    cli_inputs = [167        "job",168        "create",169        "--job-name",170        "test-job",171        "--gcp-project",172        "test-gcp-project",173        "--use-defaults",174    ]175    result = runner.invoke(cli.main, cli_inputs)176    core_testing.assert_execution_success(result)177    assert "" == result.output178    known_kwargs = {179        "job_name": "test-job",180        "gcp_project": "test-gcp-project",181        "use_defaults": True,182    }183    mock_create.assert_called_once_with((), known_kwargs, patch_os_getcwd)184def test_create_job_prompts(runner, mock_create, patch_os_getcwd):185    cli_inputs = ["job", "create"]186    prompt_inputs = ["test-job", "test-gcp-project"]187    inputs = "\n".join(prompt_inputs)188    result = runner.invoke(cli.main, cli_inputs, input=inputs)189    core_testing.assert_execution_success(result)190    exp_output = (191        "Name of your new job: {0}\n"192        "Name of the GCP project the job should be created in: "193        "{1}\n".format(*prompt_inputs)194    )195    assert exp_output == result.output196    known_kwargs = {197        "job_name": "test-job",198        "gcp_project": "test-gcp-project",199        "use_defaults": False,200    }201    mock_create.assert_called_once_with((), known_kwargs, patch_os_getcwd)202def test_create_job_unknown_args(runner, mock_create, patch_os_getcwd):203    cli_inputs = [204        "job",205        "create",206        "--job-name",207        "test-job",208        "--gcp-project",209        "test-gcp-project",210        "--use-defaults",211        "--input-topic",212        "test-input-topic",213    ]214    result = runner.invoke(cli.main, cli_inputs)215    core_testing.assert_execution_success(result)216    assert "" == result.output217    unknown_args = ("--input-topic", "test-input-topic")218    known_kwargs = {219        "job_name": "test-job",220        "gcp_project": "test-gcp-project",221        "use_defaults": True,222    }223    mock_create.assert_called_once_with(224        unknown_args, known_kwargs, patch_os_getcwd225    )226@pytest.mark.parametrize("config_override", (None, "klio-job2.yaml"))227def test_delete_job(228    config_override,229    mocker,230    monkeypatch,231    runner,232    config_file,233    mock_delete,234    patch_os_getcwd,235    mock_klio_config,236):237    config = {238        "job_name": "test-job",239        "version": 1,240        "job_config": {241            "inputs": [242                {243                    "topic": "foo-topic",244                    "subscription": "foo-sub",245                    "data_location": "foo-input-location",246                }247            ],248            "outputs": [249                {250                    "topic": "foo-topic-output",251                    "data_location": "foo-output-location",252                }253            ],254        },255    }256    conf_obj = mock_klio_config.setup(config, config_file, config_override)257    cli_inputs = ["job", "delete"]258    if config_override:259        cli_inputs.extend(["--config-file", config_override])260    result = runner.invoke(cli.main, cli_inputs)261    core_testing.assert_execution_success(result)262    assert "" == result.output263    mock_klio_config.assert_calls()264    mock_delete.assert_called_once_with(conf_obj)265    mock_delete.return_value.delete.assert_called_once_with()266def test_delete_job_gke(267    mocker, monkeypatch, runner, patch_os_getcwd, mock_klio_config,268):269    config = {270        "job_name": "test-job",271        "version": 1,272        "pipeline_options": {273            "project": "test-project",274            "runner": "DirectGKERunner",275        },276        "job_config": {},277    }278    mock_klio_config.setup(config, None, None)279    mock_delete_gke = mocker.patch.object(280        cli.job_commands.gke, "DeletePipelineGKE"281    )282    cli_inputs = ["job", "delete"]283    result = runner.invoke(cli.main, cli_inputs)284    core_testing.assert_execution_success(result)285    assert "" == result.output286    mock_klio_config.assert_calls()287    mock_delete_gke.assert_called_once_with(mock_klio_config.meta.job_dir)288    mock_delete_gke.return_value.delete.assert_called_once_with()289@pytest.mark.parametrize(290    "is_gcp,direct_runner,config_override,image_tag",291    (292        (False, True, None, None),293        (True, False, None, None),294        (True, True, None, None),295        (True, True, None, "foobar"),296        (True, True, "klio-job2.yaml", None),297        (True, True, "klio-job2.yaml", "foobar"),298    ),299)300@pytest.mark.parametrize("is_job_dir_override", (False, True))301def test_run_job(302    runner,303    mocker,304    tmpdir,305    is_gcp,306    direct_runner,307    image_tag,308    config_override,309    config_file,310    mock_get_git_sha,311    mock_klio_config,312    mock_error_stackdriver_logger_metrics,313    is_job_dir_override,314):315    mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")316    mock_run.return_value = 0317    config_data = {318        "job_name": "test-job",319        "pipeline_options": {320            "worker_harness_container_image": "gcr.register.io/squad/feature",321            "project": "test-project",322            "region": "boonies",323            "staging_location": "gs://somewhere/over/the/rainbow",324            "temp_location": "gs://somewhere/over/the/rainbow",325        },326        "job_config": {327            "inputs": [328                {329                    "topic": "foo-topic",330                    "subscription": "foo-sub",331                    "data_location": "foo-input-location",332                }333            ],334            "outputs": [335                {336                    "topic": "foo-topic-output",337                    "data_location": "foo-output-location",338                }339            ],340        },341    }342    job_dir_override = None343    if is_job_dir_override:344        temp_dir = tmpdir.mkdir("testing12345")345        job_dir_override = str(temp_dir)346    mock_klio_config.setup(347        config_data, config_file, config_override, job_dir_override348    )349    cli_inputs = ["job", "run"]350    if image_tag:351        cli_inputs.extend(["--image-tag", image_tag])352    if direct_runner:353        cli_inputs.append("--direct-runner")354    if config_override:355        cli_inputs.extend(["--config-file", config_override])356    if job_dir_override:357        cli_inputs.extend(["--job-dir", job_dir_override])358    exp_image_tag = image_tag or mock_get_git_sha.return_value359    if config_override:360        exp_image_tag = "{}-{}".format(exp_image_tag, config_override)361    result = runner.invoke(cli.main, cli_inputs)362    core_testing.assert_execution_success(result)363    assert "" == result.output364    mock_klio_config.assert_calls()365    mock_run.assert_called_once_with()366    mock_get_git_sha.assert_called_once_with(367        mock_klio_config.meta.job_dir, image_tag368    )369    mock_error_stackdriver_logger_metrics.assert_called_once_with(370        mock_klio_config.klio_config, direct_runner371    )372@pytest.mark.parametrize(373    "direct_runner,config_override,image_tag",374    (375        (True, None, None),376        (False, None, None),377        (True, None, "foobar"),378        (True, "klio-job2.yaml", None),379        (True, "klio-job2.yaml", "foobar"),380    ),381)382def test_run_job_gke(383    runner,384    mocker,385    tmpdir,386    direct_runner,387    image_tag,388    config_override,389    config_file,390    mock_get_git_sha,391    mock_klio_config,392    mock_error_stackdriver_logger_metrics,393):394    mock_run_gke = mocker.patch.object(gke_commands.RunPipelineGKE, "run")395    mock_run_gke.return_value = 0396    mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")397    mock_run.return_value = 0398    config_data = {399        "job_name": "test-job",400        "pipeline_options": {401            "worker_harness_container_image": "gcr.register.io/squad/feature",402            "project": "test-project",403            "region": "boonies",404            "staging_location": "gs://somewhere/over/the/rainbow",405            "temp_location": "gs://somewhere/over/the/rainbow",406            "runner": "DirectGKERunner",407        },408        "job_config": {409            "inputs": [410                {411                    "topic": "foo-topic",412                    "subscription": "foo-sub",413                    "data_location": "foo-input-location",414                }415            ],416            "outputs": [417                {418                    "topic": "foo-topic-output",419                    "data_location": "foo-output-location",420                }421            ],422        },423    }424    mock_klio_config.setup(config_data, config_file, config_override)425    cli_inputs = ["job", "run"]426    if image_tag:427        cli_inputs.extend(["--image-tag", image_tag])428    if direct_runner:429        cli_inputs.append("--direct-runner")430    if config_override:431        cli_inputs.extend(["--config-file", config_override])432    exp_image_tag = image_tag or mock_get_git_sha.return_value433    if config_override:434        exp_image_tag = "{}-{}".format(exp_image_tag, config_override)435    result = runner.invoke(cli.main, cli_inputs)436    core_testing.assert_execution_success(result)437    assert "" == result.output438    mock_klio_config.assert_calls()439    if direct_runner:440        mock_run.assert_called_once_with()441        mock_run_gke.assert_not_called()442    else:443        mock_run_gke.assert_called_once_with()444        mock_run.assert_not_called()445    mock_get_git_sha.assert_called_once_with(446        mock_klio_config.meta.job_dir, image_tag447    )448    mock_error_stackdriver_logger_metrics.assert_called_once_with(449        mock_klio_config.klio_config, direct_runner450    )451def test_run_job_raises(452    runner,453    mocker,454    config_file,455    patch_os_getcwd,456    pipeline_config_dict,457    mock_warn_if_py2_job,458    mock_get_git_sha,459):460    mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")461    mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")462    cli_inputs = ["job", "run", "--image-tag", "foobar"]463    config = {"job_name": "test-job"}464    mock_get_config.return_value = config465    result = runner.invoke(cli.main, cli_inputs)466    assert 1 == result.exit_code467    mock_warn_if_py2_job.asset_called_once_with(patch_os_getcwd)468    mock_get_config.assert_called_once_with(config_file)469    mock_get_git_sha.assert_not_called()470    mock_run.assert_not_called()471@pytest.mark.parametrize("config_override", (None, "klio-job2.yaml"))472def test_stop_job(473    config_override,474    runner,475    mock_stop,476    mocker,477    config_file,478    pipeline_config_dict,479    mock_klio_config,480):481    config_data = {482        "job_name": "test-job",483        "version": 1,484        "pipeline_options": pipeline_config_dict,485        "job_config": {486            "inputs": [487                {488                    "topic": "foo-topic",489                    "subscription": "foo-sub",490                    "data_location": "foo-input-location",491                }492            ],493            "outputs": [494                {495                    "topic": "foo-topic-output",496                    "data_location": "foo-output-location",497                }498            ],499        },500    }501    mock_klio_config.setup(config_data, config_file, config_override)502    cli_inputs = ["job", "stop"]503    if config_override:504        cli_inputs.extend(["--config-file", config_override])505    result = runner.invoke(cli.main, cli_inputs)506    core_testing.assert_execution_success(result)507    assert "" == result.output508    mock_klio_config.assert_calls()509    mock_stop.assert_called_once_with(510        "test-job", "test-project", "us-central1", "cancel"511    )512def test_stop_job_gke(513    runner, mocker, mock_klio_config,514):515    config_data = {516        "job_name": "test-job",517        "version": 1,518        "pipeline_options": {519            "project": "test-project",520            "runner": "DirectGKERunner",521        },522        "job_config": {},523    }524    mock_klio_config.setup(config_data, None, None)525    mock_stop_gke = mocker.patch.object(526        cli.job_commands.gke, "StopPipelineGKE"527    )528    cli_inputs = ["job", "stop"]529    result = runner.invoke(cli.main, cli_inputs)530    core_testing.assert_execution_success(result)531    assert "" == result.output532    mock_klio_config.assert_calls()533    mock_stop_gke.assert_called_once_with(mock_klio_config.meta.job_dir)534@pytest.mark.parametrize(535    "is_gcp,direct_runner,config_override,image_tag",536    (537        (False, True, None, None),538        (True, False, None, None),539        (True, True, None, None),540        (True, True, "klio-job2.yaml", None),541    ),542)543def test_deploy_job(544    runner,545    mocker,546    is_gcp,547    direct_runner,548    config_override,549    image_tag,550    mock_stop,551    config_file,552    patch_os_getcwd,553    pipeline_config_dict,554    mock_get_git_sha,555    mock_klio_config,556    mock_error_stackdriver_logger_metrics,557):558    mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")559    mock_run.return_value = 0560    config_data = {561        "job_name": "test-job",562        "version": 1,563        "pipeline_options": pipeline_config_dict,564        "job_config": {565            "inputs": [566                {567                    "topic": "foo-topic",568                    "subscription": "foo-sub",569                    "data_location": "foo-input-location",570                }571            ],572            "outputs": [573                {574                    "topic": "foo-topic-output",575                    "data_location": "foo-output-location",576                }577            ],578        },579    }580    mock_klio_config.setup(581        copy.deepcopy(config_data), config_file, config_override582    )583    cli_inputs = ["job", "deploy"]584    if image_tag:585        cli_inputs.extend(["--image_tag", image_tag])586    if direct_runner:587        cli_inputs.append("--direct-runner")588    if config_override:589        cli_inputs.extend(["--config-file", config_override])590    exp_image_tag = image_tag or mock_get_git_sha.return_value591    if config_override:592        exp_image_tag = "{}-{}".format(exp_image_tag, config_override)593    result = runner.invoke(cli.main, cli_inputs)594    core_testing.assert_execution_success(result)595    assert "" == result.output596    mock_get_git_sha.assert_called_once_with(patch_os_getcwd, image_tag)597    mock_klio_config.assert_calls()598    mock_stop.assert_called_once_with(599        "test-job", "test-project", "us-central1", "cancel"600    )601    mock_run.assert_called_once_with()602    mock_error_stackdriver_logger_metrics.assert_called_once_with(603        mock_klio_config.klio_config, direct_runner604    )605def test_deploy_job_raises(606    runner,607    mocker,608    mock_stop,609    config_file,610    patch_os_getcwd,611    pipeline_config_dict,612    mock_warn_if_py2_job,613    mock_get_git_sha,614):615    mock_run = mocker.patch.object(cli.job_commands.run.RunPipeline, "run")616    mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")617    cli_inputs = ["job", "deploy", "--image-tag", "foobar"]618    config = {"job_name": "test-job"}619    mock_get_config.return_value = config620    result = runner.invoke(cli.main, cli_inputs)621    assert 1 == result.exit_code622    mock_warn_if_py2_job.assert_called_once_with(patch_os_getcwd)623    mock_get_config.assert_called_once_with(config_file)624    mock_stop.assert_not_called()625    mock_get_git_sha.assert_not_called()626    mock_run.assert_not_called()627@pytest.mark.parametrize(628    "pytest_args,conf_override,image_tag",629    (630        ([], None, None),631        (["--", "-s"], None, None),632        (["--", "test::test test2::test2"], None, None),633        (["--", "-s"], "klio-job2.yaml", None),634    ),635)636def test_test_job(637    runner,638    mocker,639    config_file,640    patch_os_getcwd,641    pytest_args,642    conf_override,643    image_tag,644    mock_get_git_sha,645    mock_warn_if_py2_job,646    mock_get_config_job_dir,647):648    mock_test_pipeline = mocker.patch.object(649        cli.job_commands.test, "TestPipeline"650    )651    mock_test_pipeline.return_value.run.return_value = 0652    mock_get_config_job_dir.return_value = (653        patch_os_getcwd,654        conf_override or config_file,655    )656    cli_inputs = ["job", "test"]657    if image_tag:658        cli_inputs.extend(["--image-tag", image_tag])659    if conf_override:660        cli_inputs.extend(["--config-file", conf_override])661    cli_inputs.extend(pytest_args)662    config_data = {663        "job_name": "test-job",664        "pipeline_options": {665            "worker_harness_container_image": "gcr.register.io/squad/feature",666            "project": "test-project",667            "region": "boonies",668            "staging_location": "gs://somewhere/over/the/rainbow",669            "temp_location": "gs://somewhere/over/the/rainbow",670        },671        "job_config": {672            "inputs": [673                {674                    "topic": "foo-topic",675                    "subscription": "foo-sub",676                    "data_location": "foo-input-location",677                }678            ],679            "outputs": [680                {681                    "topic": "foo-topic-output",682                    "data_location": "foo-output-location",683                }684            ],685        },686    }687    mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")688    # deepcopy since KlioConfig will pop keys689    mock_get_config.return_value = config_data690    conf = kconfig.KlioConfig(copy.deepcopy(config_data))691    mock_klio_config = mocker.patch.object(core_utils.config, "KlioConfig")692    mock_klio_config.return_value = conf693    result = runner.invoke(cli.main, cli_inputs)694    core_testing.assert_execution_success(result)695    assert "" == result.output696    exp_image_tag = image_tag or mock_get_git_sha.return_value697    if conf_override:698        exp_image_tag = "{}-{}".format(exp_image_tag, conf_override)699    mock_get_config_job_dir.assert_called_once_with(None, conf_override)700    mock_warn_if_py2_job.assert_called_once_with(patch_os_getcwd)701    if not image_tag:702        mock_get_git_sha.assert_called_once_with(patch_os_getcwd)703    else:704        mock_get_git_sha.assert_not_called()705    mock_get_config.assert_called_once_with(conf_override or config_file)706    mock_klio_config.assert_called_once_with(707        config_data, raw_overrides=(), raw_templates=()708    )709    exp_docker_runtime_config = cli.DockerRuntimeConfig(710        image_tag=exp_image_tag,711        force_build=False,712        config_file_override=conf_override,713    )714    mock_test_pipeline.assert_called_once_with(715        patch_os_getcwd, conf, exp_docker_runtime_config716    )717    mock_test_pipeline.return_value.run.assert_called_once_with(718        pytest_args=pytest_args719    )720@pytest.mark.parametrize(721    "create_resources,conf_override",722    (723        (False, None),724        (False, "klio-job2.yaml"),725        (True, None),726        (True, "klio-job2.yaml"),727    ),728)729def test_verify(730    runner,731    mocker,732    patch_os_getcwd,733    create_resources,734    conf_override,735    mock_warn_if_py2_job,736    mock_get_config_job_dir,737):738    mock_verify_job = mocker.patch.object(739        cli.job_commands.verify.VerifyJob, "verify_job"740    )741    mock_get_config_job_dir.return_value = (742        patch_os_getcwd,743        conf_override or config_file,744    )745    mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")746    config = {747        "job_name": "klio-job-name",748        "job_config": {749            "inputs": [750                {"topic": "foo", "subscription": "bar", "data_location": "baz"}751            ],752            "outputs": [{"topic": "foo-out", "data_location": "baz-out"}],753        },754        "pipeline_options": {},755        "version": 1,756    }757    mock_get_config.return_value = config758    cli_inputs = ["job", "verify"]759    if create_resources:760        cli_inputs.append("--create-resources")761    if conf_override:762        cli_inputs.extend(["--config-file", conf_override])763    result = runner.invoke(cli.main, cli_inputs)764    core_testing.assert_execution_success(result)765    mock_get_config_job_dir.assert_called_once_with(None, conf_override)766    mock_get_config.assert_called_once_with(conf_override or config_file)767    mock_warn_if_py2_job.assert_called_once_with(patch_os_getcwd)768    mock_verify_job.assert_called_once_with()769@pytest.mark.parametrize(770    "use_job_dir,gcs_location,conf_override",771    (772        (None, None, None),773        (True, None, None),774        (None, "gs://bar", None),775        (None, None, "klio-job2.yaml"),776    ),777)778@pytest.mark.parametrize(779    "input_file,output_file",780    (781        (None, None),782        ("input.stats", None),783        (None, "output.stats"),784        ("input.stats", "output.stats"),785    ),786)787@pytest.mark.parametrize(788    "since,until", ((None, None), ("2 hours ago", "1 hour ago"))789)790def test_collect_profiling_data(791    runner,792    patch_os_getcwd,793    mock_warn_if_py2_job,794    mocker,795    use_job_dir,796    gcs_location,797    conf_override,798    input_file,799    output_file,800    since,801    until,802):803    mock_collector = mocker.patch.object(804        cli.job_commands.profile, "DataflowProfileStatsCollector"805    )806    config = {807        "version": 2,808        "job_name": "test-job",809        "job_config": {"events": {}, "data": {}},810        "pipeline_options": {},811    }812    if not gcs_location:813        config["pipeline_options"] = {"profile_location": "gs://foo"}814    mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")815    mock_get_config.return_value = config816    exp_exit_code = 0817    exp_output_snippet = ""818    if input_file:819        if any([use_job_dir, gcs_location, output_file, conf_override]):820            exp_exit_code = 2821            exp_output_snippet = "mutually exclusive"822        elif any([since, until]):823            exp_output_snippet = "`--since` and `--until`  will be ignored"824    cli_inputs = ["job", "profile", "collect-profiling-data"]825    if input_file:826        cli_inputs.extend(["--input-file", input_file])827    if output_file:828        cli_inputs.extend(["--output-file", output_file])829    if since:830        cli_inputs.extend(["--since", since])831    if until:832        cli_inputs.extend(["--until", until])833    if gcs_location:834        cli_inputs.extend(["--gcs-location", gcs_location])835    if conf_override:836        cli_inputs.extend(["--config-file", conf_override])837    exp_since = since or "1 hour ago"838    exp_until = until or "now"839    with runner.isolated_filesystem() as f:840        if input_file:841            with open(input_file, "w") as in_f:842                in_f.write("foo\n")843        if use_job_dir:844            cli_inputs.extend(["--job-dir", f])845        result = runner.invoke(cli.main, cli_inputs)846        assert exp_exit_code == result.exit_code847        assert exp_output_snippet in result.output848        exp_job_dir = patch_os_getcwd if not use_job_dir else f849        if use_job_dir and exp_exit_code == 0:850            mock_warn_if_py2_job.assert_called_once_with(exp_job_dir)851        if exp_exit_code != 2:852            mock_collector.assert_called_once_with(853                gcs_location=gcs_location or "gs://foo",854                input_file=input_file,855                output_file=output_file,856                since=exp_since,857                until=exp_until,858            )859            mock_collector.return_value.get.assert_called_once_with(860                ("tottime",), ()861            )862def test_collect_profiling_data_raises(863    runner, patch_os_getcwd, mock_warn_if_py2_job, mocker864):865    mock_collector = mocker.patch.object(866        cli.job_commands.profile, "DataflowProfileStatsCollector"867    )868    config = {869        "version": 2,870        "job_name": "test-job",871        "job_config": {"events": {}, "data": {}},872        "pipeline_options": {},873    }874    mock_get_config = mocker.patch.object(core_utils, "get_config_by_path")875    mock_get_config.return_value = config876    cli_inputs = ["job", "profile", "collect-profiling-data"]877    result = runner.invoke(cli.main, cli_inputs)878    # no need to check the whole error msg string879    assert "Error: Please provide a GCS location" in result.output880    assert 2 == result.exit_code881    mock_collector.assert_not_called()882    mock_collector.return_value.get.assert_not_called()883@pytest.mark.parametrize(884    "input_file,entity_ids,exp_raise,exp_msg",885    (886        ("input.txt", (), False, None),887        (None, ("foo", "bar"), False, None),888        ("input.txt", ("foo", "bar"), True, "Illegal usage"),889        (None, (), True, "Must provide"),890    ),891)892def test_require_profile_input_data(893    input_file, entity_ids, exp_raise, exp_msg894):895    if exp_raise:896        with pytest.raises(click.UsageError, match=exp_msg):897            cli._require_profile_input_data(input_file, entity_ids)898    else:899        ret = cli._require_profile_input_data(input_file, entity_ids)900        assert ret is None901@pytest.mark.parametrize(902    "image_tag,config_override", ((None, None), ("foo-1234", "klio-job2.yaml"))903)904def test_profile(905    image_tag,906    config_override,907    config_file,908    patch_os_getcwd,909    pipeline_config_dict,910    mock_get_git_sha,911    mocker,912    mock_klio_config,913):914    mock_req_profile_input = mocker.patch.object(915        cli, "_require_profile_input_data"916    )917    config_data = {918        "job_name": "test-job",919        "pipeline_options": {920            "worker_harness_container_image": "gcr.register.io/squad/feature",921            "project": "test-project",922            "region": "boonies",923            "staging_location": "gs://somewhere/over/the/rainbow",924            "temp_location": "gs://somewhere/over/the/rainbow",925        },926        "job_config": {927            "inputs": [928                {929                    "topic": "foo-topic",930                    "subscription": "foo-sub",931                    "data_location": "foo-input-location",932                }933            ],934            "outputs": [935                {936                    "topic": "foo-topic-output",937                    "data_location": "foo-output-location",938                }939            ],940        },941    }942    conf = mock_klio_config.setup(config_data, config_file, config_override)943    mock_pipeline = mocker.patch.object(944        cli.job_commands.profile, "ProfilePipeline"945    )946    exp_image_tag = image_tag or mock_get_git_sha.return_value947    if config_override:948        exp_image_tag = "{}-{}".format(exp_image_tag, config_override)949    exp_basic_config = cli.DockerRuntimeConfig(950        image_tag=exp_image_tag,951        force_build=False,952        config_file_override=config_override,953    )954    exp_prof_config = cli.ProfileConfig(955        input_file="input.txt",956        output_file="output.txt",957        show_logs=False,958        entity_ids=(),959    )960    kwargs = {961        "input_file": "input.txt",962        "output_file": "output.txt",963        "entity_ids": (),964        "force_build": False,965        "show_logs": False,966        "entity_ids": (),967    }968    cli._profile(969        subcommand="foo",970        klio_config=conf,971        image_tag=image_tag,972        config_meta=mock_klio_config.meta,973        **kwargs974    )975    mock_req_profile_input.assert_called_once_with("input.txt", ())976    exp_call = mocker.call(977        mock_klio_config.meta.job_dir, conf, exp_basic_config, exp_prof_config,978    )979    assert 1 == mock_pipeline.call_count980    assert exp_call == mock_pipeline.call_args981    mock_pipeline.return_value.run.assert_called_once_with(982        what="foo", subcommand_flags={}983    )984def test_profile_memory(runner, mocker, minimal_mock_klio_config):985    mock_profile = mocker.patch.object(cli, "_profile")986    result = runner.invoke(cli.profile_memory, [])987    core_testing.assert_execution_success(result)988    exp_kwargs = {989        "image_tag": None,990        "force_build": False,991        "include_children": False,992        "input_file": None,993        "interval": 0.1,994        "multiprocess": False,995        "output_file": None,996        "plot_graph": False,997        "show_logs": False,998        "entity_ids": (),999    }1000    mock_profile.assert_called_once_with(1001        "memory",1002        minimal_mock_klio_config.klio_config,1003        minimal_mock_klio_config.meta,1004        **exp_kwargs1005    )1006def test_profile_memory_per_line(runner, mocker, minimal_mock_klio_config):1007    mock_profile = mocker.patch.object(cli, "_profile")1008    result = runner.invoke(cli.profile_memory_per_line, [])1009    core_testing.assert_execution_success(result)1010    exp_kwargs = {1011        "get_maximum": False,1012        "per_element": False,1013        "image_tag": None,1014        "force_build": False,1015        "input_file": None,1016        "output_file": None,1017        "show_logs": False,1018        "entity_ids": (),1019    }1020    mock_profile.assert_called_once_with(1021        "memory-per-line",1022        minimal_mock_klio_config.klio_config,1023        minimal_mock_klio_config.meta,1024        **exp_kwargs1025    )1026def test_profile_cpu(runner, mocker, minimal_mock_klio_config):1027    mock_profile = mocker.patch.object(cli, "_profile")1028    result = runner.invoke(cli.profile_cpu, [])1029    core_testing.assert_execution_success(result)1030    exp_kwargs = {1031        "interval": 0.1,1032        "plot_graph": False,1033        "image_tag": None,1034        "force_build": False,1035        "input_file": None,1036        "output_file": None,1037        "show_logs": False,1038        "entity_ids": (),1039    }1040    mock_profile.assert_called_once_with(1041        "cpu",1042        minimal_mock_klio_config.klio_config,1043        minimal_mock_klio_config.meta,1044        **exp_kwargs1045    )1046def test_profile_timeit(1047    runner, mocker, mock_klio_config, minimal_mock_klio_config1048):1049    mock_profile = mocker.patch.object(cli, "_profile")1050    result = runner.invoke(cli.profile_timeit, [])1051    core_testing.assert_execution_success(result)1052    exp_kwargs = {1053        "iterations": 10,1054        "image_tag": None,1055        "force_build": False,1056        "input_file": None,1057        "output_file": None,1058        "show_logs": False,1059        "entity_ids": (),1060    }1061    mock_profile.assert_called_once_with(1062        "timeit",1063        mock_klio_config.klio_config,1064        minimal_mock_klio_config.meta,1065        **exp_kwargs1066    )1067@pytest.mark.parametrize(1068    "force,ping,top_down,bottom_up,non_klio,conf_override",1069    (1070        (False, False, False, False, False, None),1071        (False, False, True, False, False, None),1072        (False, True, False, False, False, None),1073        (True, False, False, False, False, None),1074        (True, True, True, False, False, None),1075        (False, False, False, False, True, None),1076        (False, False, True, False, True, None),1077        (False, False, False, False, False, "klio-job2.yaml"),1078    ),1079)1080def test_publish(1081    force,1082    ping,1083    top_down,1084    bottom_up,1085    non_klio,1086    conf_override,1087    runner,1088    mocker,1089    config_file,1090    patch_os_getcwd,1091    mock_klio_config,1092):1093    mock_publish = mocker.patch.object(1094        cli.message_commands.publish, "publish_messages"1095    )1096    config = {1097        "job_name": "test-job",1098        "pipeline_options": {1099            "worker_harness_container_image": "gcr.register.io/squad/feature"1100        },1101        "job_config": {1102            "inputs": [1103                {1104                    "topic": "foo-topic",1105                    "subscription": "foo-sub",1106                    "data_location": "foo-input-location",1107                }1108            ],1109            "outputs": [1110                {1111                    "topic": "foo-topic-output",1112                    "data_location": "foo-output-location",1113                }1114            ],1115        },1116    }1117    cli_inputs = ["message", "publish", "deadb33f"]1118    if force:1119        cli_inputs.append("--force")1120    if ping:1121        cli_inputs.append("--ping")1122    if top_down:1123        cli_inputs.append("--top-down")1124    if bottom_up:1125        cli_inputs.append("--bottom-up")1126    if non_klio:1127        cli_inputs.append("--non-klio")1128        config["job_config"]["allow_non_klio_messages"] = True1129    if conf_override:1130        cli_inputs.extend(["--config-file", conf_override])1131    conf = mock_klio_config.setup(config, config_file)1132    result = runner.invoke(cli.main, cli_inputs)1133    core_testing.assert_execution_success(result)1134    assert "" == result.output1135    mock_publish.assert_called_once_with(1136        conf, ("deadb33f",), force, ping, top_down, non_klio,1137    )1138def test_publish_raises_execution(1139    runner, mocker, config_file, patch_os_getcwd, mock_klio_config1140):1141    mock_publish = mocker.patch.object(1142        cli.message_commands.publish, "publish_messages"1143    )1144    cli_inputs = [1145        "message",1146        "publish",1147        "deadb33f",..._testing.py
Source:_testing.py  
...13# limitations under the License.14#15from klio_core import config as kconfig16from klio_core import utils as core_utils17def assert_execution_success(result):18    """Helper for testing CLI commands that emits errors if execution failed"""19    if result.exception:20        if result.stdout:21            print("captured stdout: {}".format(result.stdout))22        raise result.exception23    assert 0 == result.exit_code24class MockKlioConfig(object):25    def __init__(self, module, mocker, monkeypatch, patch_os_getcwd):26        self.mock_get_config = mocker.patch.object(27            core_utils, "get_config_by_path"28        )29        self.mock_get_config_job_dir = mocker.Mock()30        monkeypatch.setattr(31            module.core_utils,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
