Best Python code snippet using refurb_python
test_tasks.py
Source:test_tasks.py  
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import celery
import pretend
import pytest

from warehouse.malware import tasks
from warehouse.malware.models import MalwareCheck, MalwareCheckState, MalwareVerdict
from warehouse.malware.services import PrinterMalwareCheckService

from ...common import checks as test_checks
from ...common.db.malware import MalwareCheckFactory, MalwareVerdictFactory
from ...common.db.packaging import FileFactory, ProjectFactory, ReleaseFactory


class TestRunCheck:
    """Tests for ``tasks.run_check``."""

    def test_success(self, db_request, monkeypatch):
        """An enabled hooked check run against a file stores exactly one verdict."""
        db_request.route_url = pretend.call_recorder(lambda *a, **kw: "fake_route")
        monkeypatch.setattr(tasks, "checks", test_checks)
        file0 = FileFactory.create()
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        task = pretend.stub()

        tasks.run_check(task, db_request, "ExampleHookedCheck", obj_id=file0.id)

        assert db_request.route_url.calls == [
            pretend.call("packaging.file", path=file0.path)
        ]
        assert db_request.db.query(MalwareVerdict).one()

    @pytest.mark.parametrize(("manually_triggered"), [True, False])
    def test_evaluation_run(self, db_session, monkeypatch, manually_triggered):
        """A check in the evaluation state only produces a verdict when triggered manually."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Evaluation
        )
        ProjectFactory.create()
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )

        tasks.run_check(
            task,
            request,
            "ExampleScheduledCheck",
            manually_triggered=manually_triggered,
        )

        if manually_triggered:
            assert db_session.query(MalwareVerdict).one()
        else:
            assert request.log.info.calls == [
                pretend.call(
                    "ExampleScheduledCheck is in the `evaluation` state and must be \
manually triggered to run."
                )
            ]
            assert db_session.query(MalwareVerdict).all() == []

    def test_disabled_check(self, db_session, monkeypatch):
        """Running a disabled check logs that the check is not active."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Disabled
        )
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )
        file = FileFactory.create()

        tasks.run_check(task, request, "ExampleHookedCheck", obj_id=file.id)

        assert request.log.info.calls == [
            pretend.call("Check ExampleHookedCheck isn't active. Aborting.")
        ]

    def test_missing_check(self, db_request, monkeypatch):
        """A check name that is not defined in the checks module raises AttributeError."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        task = pretend.stub()

        with pytest.raises(AttributeError):
            tasks.run_check(task, db_request, "DoesNotExistCheck")

    def test_missing_obj_id(self, db_session, monkeypatch):
        """Running a hooked check without ``obj_id`` logs a fatal error."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(error=pretend.call_recorder(lambda *args, **kwargs: None)),
        )

        tasks.run_check(task, request, "ExampleHookedCheck")

        assert request.log.error.calls == [
            pretend.call(
                "Fatal exception: ExampleHookedCheck: Missing required kwarg `obj_id`"
            )
        ]

    def test_retry(self, db_session, monkeypatch):
        """An exception raised by ``scan`` is logged and passed to ``task.retry``."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        exc = Exception("Scan failed")

        def scan(self, **kwargs):
            raise exc

        monkeypatch.setattr(tasks.checks.ExampleHookedCheck, "scan", scan)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        task = pretend.stub(
            retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry))
        )
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(error=pretend.call_recorder(lambda *args, **kwargs: None)),
            route_url=pretend.call_recorder(lambda *a, **kw: pretend.stub()),
        )
        file = FileFactory.create()

        with pytest.raises(celery.exceptions.Retry):
            tasks.run_check(task, request, "ExampleHookedCheck", obj_id=file.id)

        assert request.log.error.calls == [
            pretend.call("Error executing check ExampleHookedCheck: Scan failed")
        ]
        assert task.retry.calls == [pretend.call(exc=exc)]


class TestRunScheduledCheck:
    """Tests for ``tasks.run_scheduled_check``."""

    def test_invalid_check_name(self, db_request, monkeypatch):
        """An unknown check name raises AttributeError."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        task = pretend.stub()

        with pytest.raises(AttributeError):
            tasks.run_scheduled_check(task, db_request, "DoesNotExist")

    def test_run_check(self, db_session, capfd, monkeypatch):
        """The scheduled check is handed to the malware check service, which prints it."""
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Enabled
        )
        request = pretend.stub(
            db=db_session,
            find_service_factory=pretend.call_recorder(
                lambda interface: PrinterMalwareCheckService.create_service
            ),
        )
        task = pretend.stub()

        tasks.run_scheduled_check(task, request, "ExampleScheduledCheck")

        assert request.find_service_factory.calls == [
            pretend.call(tasks.IMalwareCheckService)
        ]
        out, _ = capfd.readouterr()
        assert out == "ExampleScheduledCheck {'manually_triggered': False}\n"


class TestBackfill:
    """Tests for ``tasks.backfill``."""

    def test_invalid_check_name(self, db_request, monkeypatch):
        """An unknown check name raises AttributeError."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        task = pretend.stub()

        with pytest.raises(AttributeError):
            tasks.backfill(task, db_request, "DoesNotExist", 1)

    @pytest.mark.parametrize(
        ("num_objects", "num_runs"), [(11, 1), (11, 11), (101, 90)]
    )
    def test_run(self, db_session, capfd, num_objects, num_runs, monkeypatch):
        """Backfill prints one line for exactly ``num_runs`` of the created files."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        ids = [FileFactory.create().id for _ in range(num_objects)]
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
            find_service_factory=pretend.call_recorder(
                lambda interface: PrinterMalwareCheckService.create_service
            ),
        )
        task = pretend.stub()

        tasks.backfill(task, request, "ExampleHookedCheck", num_runs)

        assert request.log.info.calls == [
            pretend.call("Running backfill on %d Files." % num_runs)
        ]
        assert request.find_service_factory.calls == [
            pretend.call(tasks.IMalwareCheckService)
        ]
        out, _ = capfd.readouterr()
        # Count how many of the created files show up in the printed output.
        expected_lines = (
            "ExampleHookedCheck:%s %s\n" % (file_id, {"manually_triggered": True})
            for file_id in ids
        )
        assert sum(line in out for line in expected_lines) == num_runs


class TestSyncChecks:
    """Tests for ``tasks.sync_checks``."""

    def test_no_updates(self, db_session, monkeypatch):
        """Checks whose code version matches the database are reported as unmodified."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        monkeypatch.setattr(tasks.checks.ExampleScheduledCheck, "version", 2)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Disabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Disabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Enabled, version=2
        )
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase."),
            pretend.call("ExampleHookedCheck is unmodified."),
            pretend.call("ExampleScheduledCheck is unmodified."),
        ]

    @pytest.mark.parametrize(
        ("final_state"), [MalwareCheckState.Enabled, MalwareCheckState.Disabled]
    )
    def test_upgrade_check(self, monkeypatch, db_session, final_state):
        """A bumped code version logs an update and leaves two rows for the check."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        monkeypatch.setattr(tasks.checks.ExampleHookedCheck, "version", 2)
        MalwareCheckFactory.create(name="ExampleHookedCheck", state=final_state)
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Disabled
        )
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase."),
            pretend.call("Updating existing ExampleHookedCheck."),
            pretend.call("ExampleScheduledCheck is unmodified."),
        ]
        db_checks = (
            db_session.query(MalwareCheck)
            .filter(MalwareCheck.name == "ExampleHookedCheck")
            .all()
        )
        assert len(db_checks) == 2
        if final_state == MalwareCheckState.Disabled:
            assert (
                db_checks[0].state == db_checks[1].state == MalwareCheckState.Disabled
            )
        else:
            for c in db_checks:
                if c.state == final_state:
                    assert c.version == 2
                else:
                    assert c.version == 1

    def test_one_new_check(self, db_session, monkeypatch):
        """A check present only in code is added to the database in the Disabled state."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Disabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Disabled
        )
        task = pretend.stub()

        class FakeMalwareCheck:
            version = 1
            short_description = "This is a short description."
            long_description = "This is a longer description."
            check_type = "scheduled"
            schedule = {"minute": "0", "hour": "*/8"}

        # Register the fake through monkeypatch so it is removed from the shared
        # checks module even when an assertion below fails; raising=False is
        # needed because the attribute does not exist beforehand.
        monkeypatch.setattr(
            tasks.checks, "FakeMalwareCheck", FakeMalwareCheck, raising=False
        )
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("3 malware checks found in codebase."),
            pretend.call("ExampleHookedCheck is unmodified."),
            pretend.call("ExampleScheduledCheck is unmodified."),
            pretend.call("Adding new FakeMalwareCheck to the database."),
        ]
        assert db_session.query(MalwareCheck).count() == 3
        new_check = (
            db_session.query(MalwareCheck)
            .filter(MalwareCheck.name == "FakeMalwareCheck")
            .one()
        )
        assert new_check.state == MalwareCheckState.Disabled

    def test_too_many_db_checks(self, db_session, monkeypatch):
        """More active checks in the database than in code logs an error and raises."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Enabled
        )
        MalwareCheckFactory.create(
            name="AnotherCheck", state=MalwareCheckState.Evaluation, version=2
        )
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(
                info=pretend.call_recorder(lambda *args, **kwargs: None),
                error=pretend.call_recorder(lambda *args, **kwargs: None),
            ),
        )

        with pytest.raises(Exception):
            tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase.")
        ]
        assert request.log.error.calls == [
            pretend.call(
                "Found 3 active checks in the db, but only 2 checks in code. Please \
manually move superfluous checks to the wiped_out state in the check admin: \
AnotherCheck"
            )
        ]

    def test_only_wiped_out(self, db_session, monkeypatch):
        """Checks that exist only in the wiped_out state are reported as unsyncable."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.WipedOut
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.WipedOut
        )
        task = pretend.stub()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(
                info=pretend.call_recorder(lambda *args, **kwargs: None),
                error=pretend.call_recorder(lambda *args, **kwargs: None),
            ),
        )

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase.")
        ]
        assert request.log.error.calls == [
            pretend.call(
                "ExampleHookedCheck is wiped_out and cannot be synced. Please remove check \
from codebase."
            ),
            pretend.call(
                "ExampleScheduledCheck is wiped_out and cannot be synced. Please remove check \
from codebase."
            ),
        ]


class TestRemoveVerdicts:
    """Tests for ``tasks.remove_verdicts``."""

    def test_no_verdicts(self, db_session):
        """Removing verdicts for a check that has none logs and returns zero."""
        check = MalwareCheckFactory.create()
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )
        task = pretend.stub()

        removed = tasks.remove_verdicts(task, request, check.name)

        assert request.log.info.calls == [
            pretend.call(
                "Removing 0 malware verdicts associated with %s version 1." % check.name
            )
        ]
        assert removed == 0

    @pytest.mark.parametrize(("check_with_verdicts"), [True, False])
    def test_many_verdicts(self, db_session, check_with_verdicts):
        """Only verdicts belonging to the named check are counted for removal."""
        check0 = MalwareCheckFactory.create()
        check1 = MalwareCheckFactory.create()
        project = ProjectFactory.create(name="foo")
        release = ReleaseFactory.create(project=project)
        file0 = FileFactory.create(release=release, filename="foo.bar")
        num_verdicts = 10
        for _ in range(num_verdicts):
            MalwareVerdictFactory.create(check=check1, release_file=file0)
        assert db_session.query(MalwareVerdict).count() == num_verdicts
        request = pretend.stub(
            db=db_session,
            log=pretend.stub(info=pretend.call_recorder(lambda *args, **kwargs: None)),
        )
        task = pretend.stub()
        if check_with_verdicts:
            wiped_out_check = check1
        else:
            wiped_out_check = check0
            num_verdicts = 0

        removed = tasks.remove_verdicts(task, request, wiped_out_check.name)

        assert request.log.info.calls == [
            pretend.call(
                "Removing %d malware verdicts associated with %s version 1."
                % (num_verdicts, wiped_out_check.name)
            )
        ]
        # NOTE(review): the listing is truncated at this point in SOURCE; any
        # further assertions of this test (e.g. on ``removed``) are not visible.
Source:test_doctor.py  
""" Doctor command unit tests """
# standard python imports
import subprocess
import time
import unittest.mock as mock

# 3rd party imports
import pytest

# two1 imports
# importing the class directly to get around renaming doctor
from two1.commands.doctor import Check
from two1.commands.doctor import Doctor


@pytest.mark.parametrize("url, return_value, server_port", [
    ("http://0.0.0.0:8000", True, 8000),
    ("http://0.0.0.0:8001", False, 8000),
    ("http://0.0.0.0", False, 8000),
    ("https://0.0.0.0:8000", True, 8000),
    ("https://0.0.0.0", False, 8000),
    ])
def test_make_http_connection(doctor, url, return_value, server_port):
    """ Fires up an http server to check functionality of make_connection """
    # Launch the server directly (no shell) so that kill() targets the server
    # process itself rather than an intermediate shell, and discard its stdout
    # instead of leaving an unread pipe open.
    proc = subprocess.Popen(
        ["python3", "-m", "http.server", str(server_port)],
        stdout=subprocess.DEVNULL,
    )
    try:
        # give the server a moment to start listening
        time.sleep(.5)
        assert doctor.make_http_connection(url) == return_value
    finally:
        # always stop the server, even when the assertion fails, so the port
        # is free for the next parametrized case
        proc.kill()
        proc.wait()


@pytest.mark.parametrize("checks, expected_length", [
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.PASS),
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.SKIP),
            Check("name", "message", "value", Check.Result.WARN)]}, 4),
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)],
        "type2": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)]}, 4),
    ({"type1": []}, 0)])
def test_get_checks(doctor, checks, expected_length):
    """ Ensures the function get_checks is returning a flat list of checks """
    # sets the checks list manually instead of running tests
    doctor.checks = checks
    returned_checks = doctor.get_checks()
    assert isinstance(returned_checks, list)
    assert len(returned_checks) == expected_length


@pytest.mark.parametrize("checks, expected_length, result_filter", [
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.PASS),
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.SKIP),
            Check("name", "message", "value", Check.Result.WARN)]},
     1,
     Check.Result.PASS),
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)],
        "type2": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)]},
     0,
     Check.Result.WARN),
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)],
        "type2": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)]},
     4,
     Check.Result.FAIL),
    ({"type1": []}, 0, Check.Result.PASS),
    ({"type1": []}, 0, "donkey")])
def test_get_checks_with_result_filter(doctor, checks, expected_length, result_filter):
    """ Checks that get_checks returns a flat list of checks and uses a search filter """
    # sets the checks list manually instead of running tests
    doctor.checks = checks
    if not isinstance(result_filter, Check.Result):
        # a filter that is not a Check.Result must be rejected
        with pytest.raises(ValueError):
            doctor.get_checks(result_filter)
    else:
        returned_checks = doctor.get_checks(result_filter)
        assert isinstance(returned_checks, list)
        assert len(returned_checks) == expected_length


@pytest.mark.parametrize("test_checks", [
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.PASS),
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.SKIP),
            Check("name", "message", "value", Check.Result.WARN)]}),
    ({
        "type1": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)],
        "type2": [
            Check("name", "message", "value", Check.Result.FAIL),
            Check("name", "message", "value", Check.Result.FAIL)]}),
    ({"type1": []})])
def test_to_dict(doctor, test_checks):
    """ Ensures that the Doctor.to_dict is returning the correct dict values
        for all check data members
    """
    # sets the checks list manually instead of running tests
    doctor.checks = test_checks
    doc_dict = doctor.to_dict()
    assert isinstance(doc_dict, dict)
    for check_type, check_objs in test_checks.items():
        check_dicts = doc_dict[check_type]
        # zip() stops at the shorter sequence, so compare lengths first to
        # make sure no check was silently dropped from the dict form
        assert len(check_dicts) == len(check_objs)
        for check_obj, check_dict in zip(check_objs, check_dicts):
            assert check_obj.name == check_dict["name"]
            assert check_obj.message == check_dict["message"]
            assert check_obj.value == check_dict["value"]
            assert check_obj.result.name == check_dict["result"]


@pytest.mark.integration
def test_doctor_integration(doctor):
    """ Runs the full doctor suite of tests and ensures there are no failures"""
    specialties = Doctor.SPECIALTIES

    # Gets a dict of the types of checks and the functions of that check type as a list
    expected_doctor_checks = {check_type: [] for check_type in specialties}
    for attr_name in dir(doctor):
        for check_type in specialties:
            if attr_name.startswith("check_{}".format(check_type)) and callable(getattr(doctor, attr_name)):
                expected_doctor_checks[check_type].append(attr_name)

    # runs each of the different checks
    for check_type in expected_doctor_checks:
        doctor.checkup(check_type)

    # gets the results from all checks in dict form
    appointment_results = doctor.to_dict()

    # iterates over all expected checks ensuring they were actually called
    for check_type, expected_check_functions in expected_doctor_checks.items():
        actual_check_functions = appointment_results[check_type]
        for check_result in actual_check_functions:
            assert check_result['name'] in expected_check_functions
        assert len(actual_check_functions) == len(expected_check_functions)

    # makes sure there are no failures
    assert len(doctor.get_checks(Check.Result.FAIL)) == 0


@pytest.mark.parametrize('system, release_os, check_status', [
    ('Linux', '4.0.0', Check.Result.PASS),
    ('Darwin', '14.5.0', Check.Result.PASS),
])
def test_doctor_operating_system_check(doctor, system, release_os, check_status):
    """ Unit test the ability to check the user's operating system."""
    with mock.patch('platform.system', mock.Mock(return_value=system)):
        with mock.patch('platform.release', mock.Mock(return_value=release_os)):
            # only the status element of the returned 3-tuple is asserted on
            status, _, _ = doctor.check_general_operating_system_release()
    assert status == check_status
Source:verification_program.py  
"""Run a sleep-sort executable on random permutations of 1..100 and report the pass rate.

Usage: ``python verification_program.py <exe_path> ... <n_tests>``
(the executable is the first argument, the number of trials is the last).
"""
import subprocess
import sys

import numpy as np

# A run is reported as 'Not OK' when fewer than this fraction of trials pass.
PASS_THRESHOLD = 0.9


def parse_output(raw):
    """Return the integers in the executable's whitespace-separated output."""
    return [int(token) for token in raw.split()]


def is_sorted_output(values, expected_count):
    """Return True when *values* holds *expected_count* strictly increasing items.

    The count is checked as well as the order: an executable that prints
    nothing (or only part of its input) must not pass vacuously.
    """
    if len(values) != expected_count:
        return False
    return all(a < b for a, b in zip(values, values[1:]))


def run_trial(exe_path):
    """Run *exe_path* once on a random permutation of 1..100; return pass/fail."""
    permutation = np.random.permutation(range(1, 101))
    command = [exe_path] + [f'{n}' for n in permutation]
    raw = subprocess.run(command, stdout=subprocess.PIPE).stdout.decode('utf-8')
    return is_sorted_output(parse_output(raw), len(permutation))


def main(argv=None):
    """Run the requested number of trials and print the summary lines.

    *argv* defaults to ``sys.argv``; ``argv[1]`` is the executable path and
    ``argv[-1]`` the number of trials.
    """
    argv = sys.argv if argv is None else argv
    exe_path = argv[1]
    n_tests = int(argv[-1])
    print(n_tests)

    test_checks = [run_trial(exe_path) for _ in range(n_tests)]
    passed = sum(test_checks)
    print(f'{passed} / {len(test_checks)}')

    # With zero trials there is no evidence of correctness (and no valid
    # ratio), so treat the pass rate as 0 instead of dividing by zero.
    pass_rate = passed / len(test_checks) if test_checks else 0.0
    if pass_rate < PASS_THRESHOLD:
        print('Not OK')
    else:
        # NOTE(review): the SOURCE listing is cut off right after `else:`, so
        # the original success output is not visible; 'OK' is assumed as the
        # counterpart of 'Not OK' -- confirm against the full script.
        print('OK')


if __name__ == '__main__':
    main()

# Page text that followed the listing on the same SOURCE line (not part of the script):
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
