How to use the run_cli method in Robot Framework

Best Python code snippets using Robot Framework

test_main.py

Source: test_main.py (GitHub)

copy

Full Screen

# Reconstructed from a scraped snippet of meeshkan's test_main.py (original lines
# 13-287): the scrape fused the file's line numbers into the code and destroyed all
# newlines/indentation. Code tokens and runtime strings are preserved byte-for-byte;
# only formatting and comments are restored.
# NOTE(review): the snippet starts mid-import-block — earlier imports (presumably
# pytest, re, time, uuid, os, requests, mock, meeshkan, CliRunner, Service,
# CloudClient, UnauthorizedRequestException) are outside the visible fragment.
import meeshkan.__main__ as main
from .utils import MockResponse, DummyStore, PicklableMock

# Shared Click test runner and the patch target used by most tests below.
CLI_RUNNER = CliRunner()
BUILD_CLOUD_CLIENT_PATCH_PATH = 'meeshkan.__utils__._build_cloud_client'


def run_cli(args, inputs=None, catch_exceptions=True):
    """Invoke the meeshkan CLI via Click's test runner and return the Result."""
    return CLI_RUNNER.invoke(main.cli, args=args, catch_exceptions=catch_exceptions, input=inputs)


def _build_session(post_return_value=None, request_return_value=None):
    """Build an autospec'd requests.Session with canned post/request results."""
    session = mock.create_autospec(requests.Session)
    if post_return_value is not None:
        session.post.return_value = post_return_value
    if request_return_value is not None:
        session.request.return_value = request_return_value
    return session


def _token_store(build_session=None):
    """Returns a TokenStore for unit testing"""
    _cloud_url = 'favorite-url-yay.com'
    _refresh_token = 'meeshkan-top-secret'
    if build_session is None:
        return DummyStore(cloud_url=_cloud_url, refresh_token=_refresh_token)
    return DummyStore(cloud_url=_cloud_url, refresh_token=_refresh_token, build_session=build_session)


@pytest.fixture
def pre_post_tests():
    """Pre- and post-test method to explicitly start and stop various instances."""
    def stop_service():
        run_cli(args=['stop'])
    yield stop_service()
    stop_service()  # Stuff to run after every test


def test_setup_if_exists(pre_post_tests):  # pylint:disable=unused-argument,redefined-outer-name
    """Tests `meeshkan setup` if the credentials file exists
    Does not test wrt to Git access token; that's tested separately in test_config"""
    # Mock credentials writing (tested in test_config.py)
    temp_token = "abc"

    def to_isi(refresh_token, git_access_token, *args):
        assert refresh_token == temp_token, "Refresh token token used is '{}'!".format(temp_token)
        assert git_access_token == "", "No git access token is given!"

    with mock.patch("meeshkan.config.Credentials.to_isi") as mock_to_isi:
        with mock.patch("os.path.isfile") as mock_isfile:
            mock_isfile.return_value = True
            mock_to_isi.side_effect = to_isi
            # Test with proper interaction
            run_cli(args=['setup'], inputs="y\n{token}\n\n".format(token=temp_token), catch_exceptions=False)
            assert mock_to_isi.call_count == 1, "`to_isi` should only be called once (proper response)"
            # Test with empty response
            run_cli(args=['setup'], inputs="\n{token}\n\n".format(token=temp_token), catch_exceptions=False)
            assert mock_to_isi.call_count == 2, "`to_isi` should be called twice here (default response)"
            # Test with non-positive answer
            config_result = run_cli(args=['setup'], inputs="asdasdas\n{token}\n\n".format(token=temp_token),
                                    catch_exceptions=False)
            assert mock_to_isi.call_count == 2, "`to_isi` should still be called only twice (negative answer)"
            assert config_result.exit_code == 2, "Exit code should be non-zero (2 - cancelled by user)"


def test_setup_if_doesnt_exists(pre_post_tests):  # pylint:disable=unused-argument,redefined-outer-name
    """Tests `meeshkan setup` if the credentials file does not exist
    Does not test wrt to Git access token; that's tested separately in test_config"""
    # Mock credentials writing (tested in test_config.py)
    temp_token = "abc"

    def to_isi(refresh_token, git_access_token, *args):
        assert refresh_token == temp_token, "Refresh token token used is '{}'!".format(temp_token)
        assert git_access_token == "", "No git access token is given!"

    with mock.patch("meeshkan.config.Credentials.to_isi") as mock_to_isi:
        with mock.patch("os.path.isfile") as mock_isfile:
            mock_isfile.return_value = False
            mock_to_isi.side_effect = to_isi
            # Test with proper interaction
            run_cli(args=['setup'], inputs="{token}\n\n".format(token=temp_token), catch_exceptions=False)
            assert mock_to_isi.call_count == 1, "`to_isi` should only be called once (token given)"
            # Test with empty response
            temp_token = ''
            run_cli(args=['setup'], inputs="\n\n", catch_exceptions=False)
            assert mock_to_isi.call_count == 2, "`to_isi` should be called twice here (empty token)"


@pytest.mark.skip("This is hard to test at the moment")
def test_version_mismatch_major(pre_post_tests):  # pylint:disable=unused-argument,redefined-outer-name
    original_version = meeshkan.__version__
    meeshkan.__version__ = '0.0.0'
    with mock.patch("requests.get") as mock_requests_get:  # Mock requests.get specifically for version test...
        mock_requests_get.return_value = MockResponse({"releases": {"20.0.0": {}, "2.0.0": {}}}, 200)
        version_result = run_cli(args=['start'], catch_exceptions=False)
        assert "pip install" in version_result.stdout, "New version available! Client should suggest how to update"
        assert "newer version" in version_result.stdout, "New major version available! Client should notify user"
    meeshkan.__version__ = original_version


@pytest.mark.skip("This is hard to test at the moment")
def test_version_mismatch(pre_post_tests):  # pylint:disable=unused-argument,redefined-outer-name
    original_version = meeshkan.__version__
    meeshkan.__version__ = '0.0.0'
    with mock.patch("requests.get") as mock_requests_get:  # Mock requests.get specifically for version test...
        mock_requests_get.return_value = MockResponse({"releases": {"0.1.0": {}, "0.0.1": {}}}, 200)
        version_result = run_cli(args=['start'], catch_exceptions=False)
        assert "pip install" not in version_result.stdout, "New version minor available! Client should be quieter..."
        assert "newer version" in version_result.stdout, "New major version available! Client should notify user"
    meeshkan.__version__ = original_version


def test_start_stop(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    # Patch CloudClient as it connects to cloud at start-up
    # Lots of reverse-engineering happening here...
    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client
        mock_cloud_client.notify_service_start.return_value = None
        start_result = run_cli('start')
        assert start_result.exit_code == 0
        assert Service.is_running(), "Service should be running after using `meeshkan start`"
        stop_result = run_cli(args=['stop'])
        assert not Service.is_running(), "Service should NOT be running after using `meeshkan stop`"
        assert start_result.exit_code == 0, "`meeshkan start` is expected to run without errors"
        assert stop_result.exit_code == 0, "`meeshkan stop` is expected to run without errors"
        assert mock_cloud_client.notify_service_start.call_count == 1, "`notify_service_start` is expected " \
                                                                       "to be called only once."


def test_double_start(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client
        mock_cloud_client.notify_service_start.return_value = None
        start_result = run_cli('start')
        assert Service.is_running(), "Service should be running after using `meeshkan start`"
        double_start_result = run_cli('start')
        assert double_start_result.stdout == "Service is already running.\n", "Service should already be running"
        assert start_result.exit_code == 0, "`meeshkan start` should succeed by default"
        assert double_start_result.exit_code == 0, "Consecutive calls to `meeshkan start`are allowed"
        assert mock_cloud_client.notify_service_start.call_count == 1, "`notify_service_start` is expected " \
                                                                       "to be called only once"


def test_start_fail(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    def fail_notify_start(*args, **kwargs):  # pylint: disable=unused-argument,redefined-outer-name
        raise RuntimeError("Mocking notify service start failure")

    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client
        mock_cloud_client.notify_service_start.side_effect = fail_notify_start
        start_result = run_cli('start')
        assert "Starting the Meeshkan agent failed" in start_result.stdout, \
            "`meeshkan start` is expected to fail with error message"
        assert start_result.exit_code == 1, "`meeshkan start` exit code should be non-zero upon failure"
        assert not Service.is_running(), "Service should not be running!"


def test_help(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    assert_msg1 = "All the commands in __main__ should be listed under `meeshkan help`"
    help_result = run_cli('help')
    assert help_result.exit_code == 0, "`meeshkan help` should run without errors!"
    help_result = [x.strip() for x in help_result.stdout.split("\n")]
    commands = ['cancel', 'clean', 'clear', 'help', 'list', 'logs', 'notifications', 'report', 'setup', 'sorry',
                'start', 'status', 'stop', 'submit']
    assert all([any([output.startswith(command) for output in help_result]) for command in commands]), assert_msg1


def test_start_with_401_fails(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    # Patch CloudClient as it connects to cloud at start-up
    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client

        # Raise Unauthorized exception when service start notified
        def side_effect(*args, **kwargs):  # pylint: disable=unused-argument
            raise meeshkan.exceptions.UnauthorizedRequestException()

        mock_cloud_client.notify_service_start.side_effect = side_effect
        start_result = run_cli('--silent start')
        assert start_result.exit_code == 1, "`meeshkan start` is expected to fail with UnauthorizedRequestException and " \
                                            "return a non-zero exit code"
        assert start_result.stdout == UnauthorizedRequestException().message + '\n', "stdout when running `meeshkan " \
                                                                                    "start` should match the error " \
                                                                                    "message in " \
                                                                                    "UnauthorizedRequestException"
        assert not Service.is_running(), "Service should not be running after a failed `start`"
        assert mock_cloud_client.notify_service_start.call_count == 1, "`notify_service_start` should be " \
                                                                       "called once (where it fails)"


def test_start_submit(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    # Patch CloudClient as it connects to cloud at start-up
    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client
        # Mock notify service start, enough for start-up
        mock_cloud_client.notify_service_start.return_value = None
        mock_cloud_client.post_payload.return_value = None
        start_result = run_cli(args=['start'])
        assert start_result.exit_code == 0, "`start` should run smoothly"
        assert Service.is_running(), "Service should be running after `start`"
        submit_result = run_cli(args='echo Hello')  # if it works without the `submit`, it will work with it
        assert submit_result.exit_code == 0, "`submit` is expected to succeed"
        stdout_pattern = r"Job\s(\d+)\ssubmitted\ssuccessfully\swith\sID\s([\w-]+)"
        match = re.match(stdout_pattern, submit_result.stdout)
        job_number = int(match.group(1))
        assert job_number == 1, "Submitted job should have a HID of 1 (first job submitted)"
        job_uuid = match.group(2)
        assert uuid.UUID(job_uuid), "Job UUID should be a valid UUID and match the regex pattern"
        assert Service.is_running(), "Service should still be running!"
        list_result = run_cli(args='list')
        # Better testing at some point.
        assert list_result.exit_code == 0, "`list` is expected to succeed"

        def verify_finished(out):
            out = out.split("\n")  # Split per line
            line = [x for x in out if job_uuid in x]  # Find the one relevant job_id
            assert len(line) == 1, "There should be only one line with the given job id"
            return "FINISHED" in line[0]

        list_result = run_cli(args='list')
        while not verify_finished(list_result.stdout):
            time.sleep(0.2)  # Hacky way to give some time for finishing the task
            list_result = run_cli(args='list')
        # Check stdout and stderr exist
        assert meeshkan.config.JOBS_DIR.joinpath(job_uuid, 'stdout').is_file(), "stdout file is expected to exist after " \
                                                                               "job is finished"
        assert meeshkan.config.JOBS_DIR.joinpath(job_uuid, 'stderr').is_file(), "stderr file is expected to exist after " \
                                                                               "job is finished"


def test_sorry_success(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    payload = {"data": {"uploadLink": {"upload": "http://localhost", "uploadMethod": "PUT", "headers": ["x:a"]}}}
    mock_session = _build_session(post_return_value=MockResponse(payload, 200),
                                  request_return_value=MockResponse(status_code=200))
    mock_token_store = _token_store()  # no need to connect for a token in this instance
    cloud_client = CloudClient(cloud_url="http://localhost", token_store=mock_token_store,
                               build_session=lambda: mock_session)

    def mock_cc_builder(*args):  # pylint: disable=unused-argument
        return cloud_client

    with mock.patch('meeshkan.__main__._build_cloud_client', mock_cc_builder):
        sorry_result = run_cli(args=['sorry'])
    assert sorry_result.exit_code == 0, "`sorry` is expected to succeed"
    # NOTE(review): "succesfully" matches the application's own (misspelled) output.
    assert sorry_result.stdout == "Logs uploaded to server succesfully.\n", "`sorry` output message should match"


def test_sorry_upload_fail(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    payload = {"data": {"uploadLink": {"upload": "http://localhost", "uploadMethod": "PUT", "headers": ["x:a"]}}}
    mock_session = _build_session(post_return_value=MockResponse(payload, 200),
                                  request_return_value=MockResponse(status_code=205))
    mock_token_store = _token_store(build_session=lambda: mock_session)
    cloud_client = CloudClient(cloud_url="http://localhost", token_store=mock_token_store,
                               build_session=lambda: mock_session)

    def mock_cc_builder(*args):  # pylint: disable=unused-argument
        return cloud_client

    with mock.patch('meeshkan.__main__._build_cloud_client', mock_cc_builder):
        sorry_result = run_cli(args=['sorry'])
    assert sorry_result.exit_code == 1, "`sorry` is expected to fail"
    assert sorry_result.stdout == "Failed uploading logs to server.\n", "`sorry` output message should match"


def test_sorry_connection_fail(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    payload = {"data": {"uploadLink": {"upload": "http://localhost", "uploadMethod": "PUT", "headers": ["x:a"]}}}
    mock_session = _build_session(post_return_value=MockResponse(payload, 404))
    mock_token_store = _token_store(build_session=lambda: mock_session)
    cloud_client = CloudClient(cloud_url="http://localhost", token_store=mock_token_store,
                               build_session=lambda: mock_session)

    def mock_cc_builder(*args):  # pylint: disable=unused-argument
        return cloud_client

    with mock.patch('meeshkan.__main__._build_cloud_client', mock_cc_builder):
        sorry_result = run_cli(args=['sorry'])
    assert sorry_result.exit_code == 1, "`sorry` is expected to fail"
    assert sorry_result.stdout == "Failed uploading logs to server.\n", "`sorry` output message should match"


def test_empty_list(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client
        # Mock notify service start, enough for start-up
        mock_cloud_client.notify_service_start.return_value = None
        mock_cloud_client.post_payload.return_value = None
        run_cli(args=['start'])
        list_result = run_cli(args=['list'])
        assert Service.is_running(), "Service should be running after running `start`"
        assert list_result.exit_code == 0, "`list` is expected to succeed"
        assert list_result.stdout == "No jobs submitted yet.\n", "`list` output message should match"


def test_easter_egg(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    easter_egg = run_cli('im-bored')  # No mocking as we don't care about get requests here?
    assert easter_egg.exit_code == 0, "easter egg is expected to succeed"
    assert easter_egg.stdout.index(":") > 0, "A colon is used in stdout to separate author and content - where is it?"


def test_clear(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    def do_nothing(*args, **kwargs):
        return

    patch_rmtree = mock.patch('shutil.rmtree', do_nothing)
    patch_rmtree.start()
    clear_result = run_cli(args=['clear'])
    assert clear_result.exit_code == 0, "`clear` is expected to succeed"
    assert "Removing jobs directory" in clear_result.stdout, "`clear` output messages should match"
    assert "Removing logs directory" in clear_result.stdout, "`clear` output messages should match"
    # Sanity tests as we're mocking rmtree -> but even if that fails, the directories should be recreated!
    assert os.path.isdir(meeshkan.config.JOBS_DIR), "Default JOBS directory should exist after `clear`"
    assert os.path.isdir(meeshkan.config.LOGS_DIR), "Default LOGS directory should exist after `clear`"
    patch_rmtree.stop()


def test_status(pre_post_tests):  # pylint: disable=unused-argument,redefined-outer-name
    with mock.patch(BUILD_CLOUD_CLIENT_PATCH_PATH) as mock_build_cloud_client:
        mock_cloud_client = PicklableMock()
        mock_build_cloud_client.return_value = mock_cloud_client
        # Mock notify service start, enough for start-up
        mock_cloud_client.notify_service_start.return_value = None
        mock_cloud_client.post_payload.return_value = None
        not_running_status = run_cli(args=['status'])
        assert not_running_status.exit_code == 0, "`status` is expected to succeed even if Service is not running"
        assert "configured to run" in not_running_status.stdout, "`status` message should match"
        run_cli(args=['start'])
        running_status = run_cli(args=['status'])
        assert running_status.exit_code == 0, "`status` is expected to succeed"
        assert "up and running" in running_status.stdout, "`status` message should match"

Full Screen

Full Screen

test_ro_functional.py

Source: test_ro_functional.py (GitHub)

copy

Full Screen

# Reconstructed from a scraped snippet of python-bugzilla's test_ro_functional.py
# (original lines 95-293): the scrape fused the file's line numbers into the code and
# destroyed all newlines/indentation. Code tokens and runtime strings are preserved
# byte-for-byte; only formatting and comments are restored.
# NOTE(review): imports and the helpers `_open_bz`, `_check`, `REDHAT_URL`, and the
# `run_cli`/`backends` fixtures are defined earlier in the file, outside this fragment.
# redhat testing #
##################


def testInfoProducts(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli("bugzilla info --products", bz)
    _check(out, 123, "Virtualization Tools")


def testInfoComps(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli("bugzilla info --components 'Virtualization Tools'", bz)
    _check(out, 8, "virtinst")


def testInfoVers(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli("bugzilla info --versions Fedora", bz)
    _check(out, 17, "rawhide")


def testInfoCompOwners(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli("bugzilla info "
                  "--component_owners 'Virtualization Tools'", bz)
    _check(out, None, "libvirt: Libvirt Maintainers")


def testQuery(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    args = "--product Fedora --component python-bugzilla --version 14"
    cli = "bugzilla query %s --bug_status CLOSED" % args
    mincount = 4
    expectbug = "621030"
    out = run_cli(cli, bz)
    assert len(out.splitlines()) >= mincount
    assert bool([l1 for l1 in out.splitlines() if
                 l1.startswith("#" + expectbug)])
    # Check --ids output option
    out2 = run_cli(cli + " --ids", bz)
    assert len(out.splitlines()) == len(out2.splitlines())
    assert bool([l2 for l2 in out2.splitlines() if
                 l2 == expectbug])


def testQueryFull(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    bugid = "621601"
    out = run_cli("bugzilla query --full --bug_id %s" % bugid, bz)
    _check(out, 60, "end-of-life (EOL)")


def testQueryRaw(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    bugid = "307471"
    out = run_cli("bugzilla query --raw --bug_id %s" % bugid, bz)
    _check(out, 70, "ATTRIBUTE[whiteboard]: bzcl34nup")


def testQueryOneline(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    bugid = "785016"
    out = run_cli("bugzilla query --oneline --bug_id %s" % bugid, bz)
    assert len(out.splitlines()) == 1
    assert out.splitlines()[0].startswith("#%s" % bugid)
    assert "[---] fedora-review+,fedora-cvs+" in out
    bugid = "720784"
    out = run_cli("bugzilla query --oneline --bug_id %s" % bugid, bz)
    assert len(out.splitlines()) == 1
    assert out.splitlines()[0].startswith("#%s" % bugid)
    assert " CVE-2011-2527" in out


def testQueryExtra(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    bugid = "307471"
    out = run_cli("bugzilla query --extra --bug_id %s" % bugid, bz)
    assert ("#%s" % bugid) in out
    assert " +Status Whiteboard: bzcl34nup" in out


def testQueryFormat(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    args = ("--bug_id 307471 --outputformat=\"id=%{bug_id} "
            "sw=%{whiteboard:status} needinfo=%{flag:needinfo} "
            "sum=%{summary}\"")
    out = run_cli("bugzilla query %s" % args, bz)
    assert "id=307471 sw= bzcl34nup needinfo= " in out
    args = ("--bug_id 785016 --outputformat=\"id=%{bug_id} "
            "sw=%{whiteboard:status} flag=%{flag:fedora-review} "
            "sum=%{summary}\"")
    out = run_cli("bugzilla query %s" % args, bz)
    assert "id=785016 sw= flag=+" in out
    # Unicode in this bug's summary
    args = "--bug_id 522796 --outputformat \"%{summary}\""
    out = run_cli("bugzilla query %s" % args, bz)
    assert "V34 — system" in out


def testQueryURL(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    qurl = ("/buglist.cgi?f1=creation_ts"
            "&list_id=973582&o1=greaterthaneq&classification=Fedora&"
            "o2=lessthaneq&query_format=advanced&f2=creation_ts"
            "&v1=2010-01-01&component=python-bugzilla&v2=2010-06-01"
            "&product=Fedora")
    url = REDHAT_URL
    if "/xmlrpc.cgi" in url:
        url = url.replace("/xmlrpc.cgi", qurl)
    else:
        url += qurl
    out = run_cli("bugzilla query --from-url \"%s\"" % url, bz)
    _check(out, 10, "#553878 CLOSED")


def testQueryFixedIn(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli("bugzilla query --fixed_in anaconda-15.29-1", bz)
    assert len(out.splitlines()) == 4
    assert "#629311 CLOSED" in out


def testQueryExtrafieldPool(run_cli, backends):
    # rhbz has an agile 'pool' extension but doesn't return the field
    # by default. Check that '-extrafield pool' returns it for --json output
    bz = _open_bz(REDHAT_URL, **backends)
    out1 = run_cli("bugzilla query --id 1717616 --json", bz)
    out2 = run_cli("bugzilla query --id 1717616 --json --extrafield pool", bz)
    assert "current_sprint_id" not in out1
    assert "current_sprint_id" in out2


def testComponentsDetails(backends):
    """
    Fresh call to getcomponentsdetails should properly refresh
    """
    bz = _open_bz(REDHAT_URL, **backends)
    assert bool(bz.getcomponentsdetails("Red Hat Developer Toolset"))


def testGetBugAlias(backends):
    """
    getbug() works if passed an alias
    """
    bz = _open_bz(REDHAT_URL, **backends)
    bug = bz.getbug("CVE-2011-2527")
    assert bug.bug_id == 720773


def testQuerySubComponent(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    # Test special error wrappers in bugzilla/_cli.py
    out = run_cli("bugzilla query --product 'Red Hat Enterprise Linux 7' "
                  "--component lvm2 --sub-component 'Thin Provisioning'", bz)
    assert len(out.splitlines()) >= 3
    assert "#1060931 " in out


def testBugFields(backends):
    bz = _open_bz(REDHAT_URL, **backends)
    fields = bz.getbugfields(names=["product"])[:]
    assert fields == ["product"]
    bz.getbugfields(names=["product", "bug_status"], force_refresh=True)
    assert set(bz.bugfields) == set(["product", "bug_status"])


def testProductGetMisc(backends):
    bz = _open_bz(REDHAT_URL, **backends)
    assert bz.product_get(ptype="enterable", include_fields=["id"])
    assert bz.product_get(ptype="selectable", include_fields=["name"])


def testBugAutoRefresh(backends):
    bz = _open_bz(REDHAT_URL, **backends)
    bz.bug_autorefresh = True
    bug = bz.query(bz.build_query(bug_id=720773,
                                  include_fields=["summary"]))[0]
    assert hasattr(bug, "component")
    assert bool(bug.component)
    bz.bug_autorefresh = False
    bug = bz.query(bz.build_query(bug_id=720773,
                                  include_fields=["summary"]))[0]
    assert not hasattr(bug, "component")
    try:
        assert bool(bug.component)
    except Exception as e:
        assert "adjust your include_fields" in str(e)


def testExtraFields(backends):
    bz = _open_bz(REDHAT_URL, **backends)
    # Check default extra_fields will pull in comments
    bug = bz.getbug(720773, exclude_fields=["product"])
    assert "comments" in dir(bug)
    assert "product" not in dir(bug)
    # Ensure that include_fields overrides default extra_fields
    bug = bz.getbug(720773, include_fields=["summary"])
    assert "summary" in dir(bug)
    assert "comments" not in dir(bug)


def testExternalBugsOutput(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli('bugzilla query --bug_id 989253 '
                  '--outputformat="%{external_bugs}"', bz)
    assert "bugzilla.gnome.org/show_bug.cgi?id=703421" in out
    assert "External bug: https://bugs.launchpad.net/bugs/1203576" in out


def testActiveComps(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    out = run_cli("bugzilla info --components 'Virtualization Tools' "
                  "--active-components", bz)
    assert "virtinst" not in out
    out = run_cli("bugzilla info --component_owners 'Virtualization Tools' "
                  "--active-components", bz)
    assert "virtinst" not in out


def testFaults(run_cli, backends):
    bz = _open_bz(REDHAT_URL, **backends)
    # Test special error wrappers in bugzilla/_cli.py
    out = run_cli("bugzilla query --field=IDONTEXIST=FOO", bz,
                  expectfail=True)
    assert "Server error:" in out
    out = run_cli("bugzilla "
                  "--bugzilla https://example.com/xmlrpc.cgi "
                  "query --field=IDONTEXIST=FOO", None, expectfail=True)
    assert "Connection lost/failed" in out
    out = run_cli("bugzilla "
                  "--bugzilla https://expired.badssl.com/ "
                  "query --bug_id 1234", None, expectfail=True)
    assert "trust the remote server" in out
    assert "--nosslverify" in out


def test_login_stubs(backends):
    bz = _open_bz(REDHAT_URL, **backends)
    # Failed login, verifies our backends are calling the correct API
    with pytest.raises(bugzilla.BugzillaError) as e:
        bz.login("foo", "bar")
    assert "Login failed" in str(e)
    # Works fine when not logged in
    bz.logout()


def test_redhat_version(backends):
    bzversion = (5, 0)
    # NOTE(review): the scraped snippet is truncated here; the rest of this
    # function is not visible in the source fragment.

Full Screen

Full Screen

test_cli.py

Source: test_cli.py (GitHub)

copy

Full Screen

# Reconstructed from a scraped snippet of pyinfra's test_cli.py (original lines
# 5-173): the scrape fused the file's line numbers into the code and destroyed all
# newlines/indentation. Code tokens and runtime strings are preserved byte-for-byte;
# only formatting and comments are restored.
# NOTE(review): the snippet starts mid-import-block — earlier imports (presumably
# TestCase, path, shuffle, CliRunner) are outside the visible fragment.
import pyinfra
from pyinfra import pseudo_state
from pyinfra_cli.main import _main, cli

from ..paramiko_util import PatchSSHTestCase


def run_cli(*arguments):
    """Invoke the pyinfra CLI via Click's test runner, toggling the CLI flag."""
    pyinfra.is_cli = True
    runner = CliRunner()
    result = runner.invoke(cli, arguments)
    pyinfra.is_cli = False
    return result


class TestCliEagerFlags(TestCase):
    def test_print_help(self):
        result = run_cli('--version')
        assert result.exit_code == 0
        result = run_cli('--help')
        assert result.exit_code == 0

    def test_print_facts_list(self):
        result = run_cli('--facts')
        assert result.exit_code == 0

    def test_print_operations_list(self):
        result = run_cli('--operations')
        assert result.exit_code == 0


class TestCliDeployRuns(PatchSSHTestCase):
    def setUp(self):
        pseudo_state.reset()

    def test_invalid_deploy(self):
        result = run_cli(
            '@local',
            'not-a-file.py',
        )
        assert result.exit_code == 1
        assert 'No deploy file: not-a-file.py' in result.stdout

    def test_deploy_inventory(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'server.shell',
            '--debug-data',
        )
        assert result.exit_code == 0

    def test_get_facts(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'fact',
            'os',
        )
        assert result.exit_code == 0
        assert '"somehost": null' in result.stdout

    def test_deploy_operation(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'server.shell',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_deploy_operation_with_all(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventory_all.py'),
            'server.shell',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command_with_options(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--sudo',
            '--sudo-user', 'pyinfra',
            '--su-user', 'pyinfrawhat',
            '--port', '1022',
            '--user', 'ubuntu',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command_with_serial(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--serial',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command_with_no_wait(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--no-wait',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command_with_debug_operations(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--debug-operations',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command_with_debug_facts(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--debug-facts',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0

    def test_exec_command_with_debug_data_limit(self):
        result = run_cli(
            path.join('tests', 'deploy', 'inventories', 'inventory.py'),
            'exec',
            '--debug-data',
            '--limit', 'somehost',
            '--',
            'echo hi',
        )
        assert result.exit_code == 0


class TestCliDeployState(PatchSSHTestCase):
    def test_deploy(self):
        # Run 3 iterations of the test - each time shuffling the order of the
        # hosts - ensuring that the ordering has no effect on the operation order.
        for _ in range(3):
            self._do_test_deploy()

    def _do_test_deploy(self):
        pseudo_state.reset()
        correct_op_name_and_host_names = [
            ('First main operation', True),  # true for all hosts
            ('Second main operation', ('somehost',)),
            ('tests/deploy/tasks/a_task.py | First task operation', ('anotherhost',)),
            ('tests/deploy/tasks/a_task.py | Second task operation', ('anotherhost',)),
            ('tests/deploy/tasks/a_task.py | First task operation', True),
            ('tests/deploy/tasks/a_task.py | Second task operation', True),
            ('Loop-0 main operation', True),
            ('Loop-1 main operation', True),
            ('Third main operation', True),
            ('Order loop 1', True),
            ('2nd Order loop 1', True),
            ('Order loop 2', True),
            ('2nd Order loop 2', True),
            ('Final limited operation', ('somehost',)),
        ]
        hosts = ['somehost', 'anotherhost', 'someotherhost']
        shuffle(hosts)
        result = run_cli(
            ','.join(hosts),
            path.join('tests', 'deploy', 'deploy.py'),
        )
        assert result.exit_code == 0
        state = pseudo_state
        op_order = state.get_op_order()
        assert (
            len(correct_op_name_and_host_names) == len(op_order)
        ), 'Incorrect number of operations detected'
        for i, (correct_op_name, correct_host_names) in enumerate(
            correct_op_name_and_host_names,
        ):
            op_hash = op_order[i]
            op_meta = state.op_meta[op_hash]
            # NOTE(review): the scraped snippet is truncated here; the remaining
            # per-operation assertions are not visible in the source fragment.

Full Screen

Full Screen

manage.py

Source: manage.py (GitHub)

copy

Full Screen

"""Prodstats command-line interface.

Builds a click parent group and attaches typer sub-apps (db, test) plus a
plain-click ``run`` group for commands that must forward unknown options to
subprocesses (uvicorn / celery).
"""
import logging
import shutil
import subprocess
import sys
from pathlib import Path
from typing import List, Optional

import click
import typer

import config as conf
import loggers
from prodstats.main import app

loggers.config()
logger = logging.getLogger()

CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"], ignore_unknown_options=True)
CELERY_LOG_LEVEL_NAME: str = loggers.mlevelname(conf.CELERY_LOG_LEVEL)


def hr():
    """Return a horizontal rule sized to the current terminal width."""
    return "-" * shutil.get_terminal_size().columns + "\n"


# dev_cli = typer.Typer(help="Development tools")
db_cli = typer.Typer(help="Database Management")
test_cli = typer.Typer(help="Test Commands")
delete_cli = typer.Typer(help="Delete things")

# ----------------------------- subcommands -------------------------------- #


@test_cli.command(help="Execute a smoke test against a worker instance")
def smoke_test():
    # TODO: implement
    logger.warning("verified")


@test_cli.command(help="Placeholder")
def unit_test():
    """Unittest placeholder."""
    # TODO: implement
    # logger.warning("verified")


@db_cli.command(help="Create a directory to manage database migrations")
def init(dir: Path = Path(conf.MIGRATION_DIR), args: Optional[List[str]] = None):
    """Run ``alembic init`` against the configured migration directory."""
    cmd = ["alembic", "init", str(dir)] + (args or [])
    subprocess.call(cmd)


@db_cli.command(help="Create a new migration revision")
def migrate(args: Optional[List[str]] = None):
    """Autogenerate a new alembic revision from model metadata."""
    cmd = ["alembic", "revision", "--autogenerate", "--head", "head"] + (args or [])
    subprocess.call(cmd)


@db_cli.command(help="Apply pending migrations to the database")
def upgrade(args: Optional[List[str]] = None):
    """Run ``alembic upgrade head``, applying all pending migrations."""
    logger.warning("Applying database migrations")
    cmd = ["alembic", "upgrade", "head"] + (args or [])
    subprocess.call(cmd)


@db_cli.command(help="Downgrade to a previous revision of the database")
def downgrade(revision: str = "-1", args: Optional[List[str]] = None):
    """Run ``alembic downgrade`` to the given revision (default: one back)."""
    cmd = ["alembic", "downgrade", revision] + (args or [])
    subprocess.call(cmd)


@db_cli.command(
    help="Drop the current database and rebuild using the existing migrations"
)
def recreate(args: Optional[List[str]] = None):  # nocover
    """Drop (if present) and recreate the configured database, then migrate.

    Refuses to run unless ``conf.ENV`` indicates a development environment.
    """
    if conf.ENV not in ["dev", "development"]:
        logger.error(
            """Cant recreate database when not in development mode. Set ENV=development as an environment variable to enable this feature."""  # noqa
        )
        # BUG FIX: previously exited with status 0, signalling success on a
        # refused/failed invocation; callers and scripts need a nonzero code.
        sys.exit(1)
    else:
        # Imported lazily so the CLI works without sqlalchemy_utils installed
        # unless this dev-only command is actually invoked.
        from sqlalchemy_utils import create_database, database_exists, drop_database

        url = conf.ALEMBIC_CONFIG.url
        # Strip credentials ("user:pass@") before logging the connection URL.
        short_url = str(url).split("@")[-1]
        if database_exists(url):
            logger.warning(f"Dropping existing database: {short_url}")
            drop_database(url)
            logger.warning(f"Recreating database at: {short_url}")
            create_database(url)
        else:
            logger.warning(f"Creating new database at: {short_url}")
            create_database(url)
        upgrade()
        logger.warning("Database recreation complete")


# --- run -------------------------------------------------------------------- #

# run_cli = typer.Typer(help="Execution procedures")

# NOTE: typer doesn't yet support passing unknown options. The workaround below is
#       creating a click parent group and adding each typer group as a sub-group
#       of the click parent, then creating a click group to handle the commands
#       that need dynamic arguments.

cli = click.Group(
    help="Prodstats: Ingest, process, and analyze production and well data"
)
run_cli = click.Group("run", help="Execution procedures")


@run_cli.command(
    help="Launch a web process to serve the api",
    context_settings={"ignore_unknown_options": True},
)
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
def web(args):
    """Start uvicorn, forwarding any extra CLI args untouched."""
    cmd = ["uvicorn", "prodstats.main:app"] + list(args)
    subprocess.call(cmd)  # nocover


@run_cli.command(
    help="Launch a web process with hot reload enabled",
    context_settings={"ignore_unknown_options": True},
)
@click.argument("args", nargs=-1, type=click.UNPROCESSED)
def dev(args):
    """Start uvicorn with ``--reload`` for local development."""
    cmd = ["uvicorn", "prodstats.main:app", "--reload"] + list(args)
    subprocess.call(cmd)


@run_cli.command(
    help="Launch a Celery worker",
    context_settings={"ignore_unknown_options": True},
)
@click.argument("celery_args", nargs=-1, type=click.UNPROCESSED)
def worker(celery_args):
    """Start a celery worker bound to the ``cq:celery_app`` application."""
    cmd = ["celery", "-A", "cq:celery_app", "worker"] + list(celery_args)
    subprocess.call(cmd)


@run_cli.command(
    help="Launch a Celery Beat scheduler",
    context_settings={"ignore_unknown_options": True},
)
@click.argument("celery_args", nargs=-1, type=click.UNPROCESSED)
def cron(celery_args):
    """Start the celery beat scheduler."""
    cmd = ["celery", "-A", "cq:celery_app", "beat"] + list(celery_args)
    subprocess.call(cmd)


@run_cli.command(
    help="Launch a monitoring process running flower",
    context_settings={"ignore_unknown_options": True},
)
@click.argument("celery_args", nargs=-1, type=click.UNPROCESSED)
def flower(celery_args):
    """Start flower for monitoring the celery cluster."""
    cmd = ["celery", "-A", "cq:celery_app", "flower"] + list(celery_args)
    subprocess.call(cmd)


@run_cli.command(help="Manually send a task to the worker cluster")
@click.argument("task")
def task(task: str):
    """Run a one-off task. Pass the name of the scoped task to run.

    Ex. endpoint_name.task_name
    """
    # from cq.tasks import sync_endpoint
    # BUG FIX: the original wrapped this check in a contrived
    # try/raise ValueError/except block and returned 0 (success) on an
    # invalid task name. Use a plain branch and exit nonzero on bad input.
    if "." in task:
        typer.secho(f"{task=}")
    else:
        typer.secho("Invalid task format. Try specifying ENDPOINT_NAME.TASK_NAME")
        sys.exit(1)


# --- top -------------------------------------------------------------------- #


@cli.command(help="List api routes")
def routes():
    """Print each FastAPI route's name, path, and allowed methods."""
    for r in app.routes:
        typer.echo(f"{r.name:<25} {r.path:<30} {r.methods}")


# --- attach groups ---------------------------------------------------------- #

cli.add_command(run_cli)
cli.add_command(typer.main.get_command(db_cli), "db")
cli.add_command(typer.main.get_command(test_cli), "test")
# cli.add_command(typer.main.get_command(dev_cli), "dev")
# cli.add_command(typer.main.get_command(delete_cli), "delete")


def main(argv: List[str] = sys.argv):
    """CLI entry point.

    Args:
        argv (list): List of arguments. NOTE(review): currently unused —
            click reads ``sys.argv`` itself; kept for interface compatibility.

    Returns:
        int: A return code
    """
    cli()


if __name__ == "__main__":
    main()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Robotframework automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful