Best Python code snippet using autotest_python
monitor_db_functional_unittest.py
Source:monitor_db_functional_unittest.py  
...251        self.mock_config.set_config_value('SCHEDULER', 'pidfile_timeout_mins',252                                          1)253        self.mock_config.set_config_value('SCHEDULER', 'gc_stats_interval_mins',254                                          999999)255    def _initialize_test(self):256        self.dispatcher.initialize()257    def _run_dispatcher(self):258        for _ in xrange(self._A_LOT_OF_TICKS):259            self.dispatcher.tick()260    def test_idle(self):261        self._initialize_test()262        self._run_dispatcher()263    def _assert_process_executed(self, working_directory, pidfile_name):264        process_was_executed = self.mock_drone_manager.was_process_executed(265            'hosts/host1/1-verify', drone_manager.AUTOSERV_PID_FILE)266        self.assert_(process_was_executed,267                     '%s/%s not executed' % (working_directory, pidfile_name))268    def _update_instance(self, model_instance):269        return type(model_instance).objects.get(pk=model_instance.pk)270    def _check_statuses(self, queue_entry, queue_entry_status,271                        host_status=None):272        self._check_entry_status(queue_entry, queue_entry_status)273        if host_status:274            self._check_host_status(queue_entry.host, host_status)275    def _check_entry_status(self, queue_entry, status):276        # update from DB277        queue_entry = self._update_instance(queue_entry)278        self.assertEquals(queue_entry.status, status)279    def _check_host_status(self, host, status):280        # update from DB281        host = self._update_instance(host)282        self.assertEquals(host.status, status)283    def _run_pre_job_verify(self, queue_entry):284        self._run_dispatcher()  # launches verify285        self._check_statuses(queue_entry, HqeStatus.VERIFYING,286                             HostStatus.VERIFYING)287        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)288    def test_simple_job(self):289     
   self._initialize_test()290        job, queue_entry = self._make_job_and_queue_entry()291        self._run_pre_job_verify(queue_entry)292        self._run_dispatcher()  # launches job293        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)294        self._finish_job(queue_entry)295        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)296        self._assert_nothing_is_running()297    def _setup_for_pre_job_cleanup(self):298        self._initialize_test()299        job, queue_entry = self._make_job_and_queue_entry()300        job.reboot_before = model_attributes.RebootBefore.ALWAYS301        job.save()302        return queue_entry303    def _run_pre_job_cleanup_job(self, queue_entry):304        self._run_dispatcher()  # cleanup305        self._check_statuses(queue_entry, HqeStatus.VERIFYING,306                             HostStatus.CLEANING)307        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)308        self._run_dispatcher()  # verify309        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)310        self._run_dispatcher()  # job311        self._finish_job(queue_entry)312    def test_pre_job_cleanup(self):313        queue_entry = self._setup_for_pre_job_cleanup()314        self._run_pre_job_cleanup_job(queue_entry)315    def _run_pre_job_cleanup_one_failure(self):316        queue_entry = self._setup_for_pre_job_cleanup()317        self._run_dispatcher()  # cleanup318        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,319                                               exit_status=256)320        self._run_dispatcher()  # repair321        self._check_statuses(queue_entry, HqeStatus.QUEUED,322                             HostStatus.REPAIRING)323        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)324        return queue_entry325    def test_pre_job_cleanup_failure(self):326        queue_entry = self._run_pre_job_cleanup_one_failure()327        # from here 
the job should run as normal328        self._run_pre_job_cleanup_job(queue_entry)329    def test_pre_job_cleanup_double_failure(self):330        # TODO (showard): this test isn't perfect.  in reality, when the second331        # cleanup fails, it copies its results over to the job directory using332        # copy_results_on_drone() and then parses them.  since we don't handle333        # that, there appear to be no results at the job directory.  the334        # scheduler handles this gracefully, parsing gets effectively skipped,335        # and this test passes as is.  but we ought to properly test that336        # behavior.337        queue_entry = self._run_pre_job_cleanup_one_failure()338        self._run_dispatcher()  # second cleanup339        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,340                                               exit_status=256)341        self._run_dispatcher()342        self._check_statuses(queue_entry, HqeStatus.FAILED,343                             HostStatus.REPAIR_FAILED)344        # nothing else should run345        self._assert_nothing_is_running()346    def _assert_nothing_is_running(self):347        self.assertEquals(self.mock_drone_manager.running_pidfile_ids(), [])348    def _setup_for_post_job_cleanup(self):349        self._initialize_test()350        job, queue_entry = self._make_job_and_queue_entry()351        job.reboot_after = model_attributes.RebootAfter.ALWAYS352        job.save()353        return queue_entry354    def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,355                                                   include_verify=True):356        if include_verify:357            self._run_pre_job_verify(queue_entry)358        self._run_dispatcher()  # job359        self.mock_drone_manager.finish_process(_PidfileType.JOB)360        self._run_dispatcher()  # parsing + cleanup361        self.mock_drone_manager.finish_process(_PidfileType.PARSE)362        
self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,363                                               exit_status=256)364        self._run_dispatcher()  # repair, HQE unaffected365        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)366        self._run_dispatcher()367        return queue_entry368    def test_post_job_cleanup_failure(self):369        queue_entry = self._setup_for_post_job_cleanup()370        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)371        self._check_statuses(queue_entry, HqeStatus.COMPLETED,372                             HostStatus.REPAIRING)373        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)374        self._run_dispatcher()375        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)376    def test_post_job_cleanup_failure_repair_failure(self):377        queue_entry = self._setup_for_post_job_cleanup()378        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)379        self.mock_drone_manager.finish_process(_PidfileType.REPAIR,380                                               exit_status=256)381        self._run_dispatcher()382        self._check_statuses(queue_entry, HqeStatus.COMPLETED,383                             HostStatus.REPAIR_FAILED)384    def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):385        pidfile_name = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]386        queue_entry = self._update_instance(queue_entry)387        pidfile_id = self.mock_drone_manager.pidfile_from_path(388            queue_entry.execution_path(), pidfile_name)389        self.assert_(pidfile_id._paired_with_pidfile)390    def _finish_job(self, queue_entry):391        self.mock_drone_manager.finish_process(_PidfileType.JOB)392        self._run_dispatcher()  # launches parsing + cleanup393        self._check_statuses(queue_entry, HqeStatus.PARSING,394                             HostStatus.CLEANING)395        
self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)396        self._finish_parsing_and_cleanup(queue_entry)397    def _finish_parsing_and_cleanup(self, queue_entry):398        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)399        self.mock_drone_manager.finish_process(_PidfileType.PARSE)400        self._run_dispatcher()401        self._check_entry_status(queue_entry, HqeStatus.ARCHIVING)402        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)403        self._run_dispatcher()404    def _create_reverify_request(self):405        host = self.hosts[0]406        models.SpecialTask.schedule_special_task(407            host=host, task=models.SpecialTask.Task.VERIFY)408        return host409    def test_requested_reverify(self):410        host = self._create_reverify_request()411        self._run_dispatcher()412        self._check_host_status(host, HostStatus.VERIFYING)413        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)414        self._run_dispatcher()415        self._check_host_status(host, HostStatus.READY)416    def test_requested_reverify_failure(self):417        host = self._create_reverify_request()418        self._run_dispatcher()419        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,420                                               exit_status=256)421        self._run_dispatcher()  # repair422        self._check_host_status(host, HostStatus.REPAIRING)423        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)424        self._run_dispatcher()425        self._check_host_status(host, HostStatus.READY)426    def _setup_for_do_not_verify(self):427        self._initialize_test()428        job, queue_entry = self._make_job_and_queue_entry()429        queue_entry.host.protection = host_protections.Protection.DO_NOT_VERIFY430        queue_entry.host.save()431        return queue_entry432    def test_do_not_verify_job(self):433        queue_entry = 
self._setup_for_do_not_verify()434        self._run_dispatcher()  # runs job directly435        self._finish_job(queue_entry)436    def test_do_not_verify_job_with_cleanup(self):437        queue_entry = self._setup_for_do_not_verify()438        queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS439        queue_entry.job.save()440        self._run_dispatcher()  # cleanup441        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)442        self._run_dispatcher()  # job443        self._finish_job(queue_entry)444    def test_do_not_verify_pre_job_cleanup_failure(self):445        queue_entry = self._setup_for_do_not_verify()446        queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS447        queue_entry.job.save()448        self._run_dispatcher()  # cleanup449        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,450                                               exit_status=256)451        self._run_dispatcher()  # failure ignored; job runs452        self._finish_job(queue_entry)453    def test_do_not_verify_post_job_cleanup_failure(self):454        queue_entry = self._setup_for_do_not_verify()455        self._run_post_job_cleanup_failure_up_to_repair(queue_entry,456                                                        include_verify=False)457        # failure ignored, host still set to Ready458        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)459        self._run_dispatcher()  # nothing else runs460        self._assert_nothing_is_running()461    def test_do_not_verify_requested_reverify_failure(self):462        host = self._create_reverify_request()463        host.protection = host_protections.Protection.DO_NOT_VERIFY464        host.save()465        self._run_dispatcher()466        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,467                                               exit_status=256)468        self._run_dispatcher()469        
self._check_host_status(host, HostStatus.READY)  # ignore failure470        self._assert_nothing_is_running()471    def test_job_abort_in_verify(self):472        self._initialize_test()473        job = self._create_job(hosts=[1])474        self._run_dispatcher()  # launches verify475        job.hostqueueentry_set.update(aborted=True)476        self._run_dispatcher()  # kills verify, launches cleanup477        self.assert_(self.mock_drone_manager.was_last_process_killed(478            _PidfileType.VERIFY))479        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)480        self._run_dispatcher()481    def test_job_abort(self):482        self._initialize_test()483        job = self._create_job(hosts=[1])484        job.run_verify = False485        job.save()486        self._run_dispatcher()  # launches job487        job.hostqueueentry_set.update(aborted=True)488        self._run_dispatcher()  # kills job, launches gathering489        self.assert_(self.mock_drone_manager.was_last_process_killed(490            _PidfileType.JOB))491        self.mock_drone_manager.finish_process(_PidfileType.GATHER)492        self._run_dispatcher()  # launches parsing + cleanup493        queue_entry = job.hostqueueentry_set.all()[0]494        self._finish_parsing_and_cleanup(queue_entry)495    def test_job_abort_queued_synchronous(self):496        self._initialize_test()497        job = self._create_job(hosts=[1, 2])498        job.synch_count = 2499        job.save()500        job.hostqueueentry_set.update(aborted=True)501        self._run_dispatcher()502        for host_queue_entry in job.hostqueueentry_set.all():503            self.assertEqual(host_queue_entry.status,504                             HqeStatus.ABORTED)505    def test_no_pidfile_leaking(self):506        self._initialize_test()507        self.test_simple_job()508        self.assertEquals(self.mock_drone_manager._pidfiles, {})509        self.test_job_abort_in_verify()510        
self.assertEquals(self.mock_drone_manager._pidfiles, {})511        self.test_job_abort()512        self.assertEquals(self.mock_drone_manager._pidfiles, {})513    def _make_job_and_queue_entry(self):514        job = self._create_job(hosts=[1])515        queue_entry = job.hostqueueentry_set.all()[0]516        return job, queue_entry517    def test_recover_running_no_process(self):518        # recovery should re-execute a Running HQE if no process is found519        _, queue_entry = self._make_job_and_queue_entry()520        queue_entry.status = HqeStatus.RUNNING521        queue_entry.execution_subdir = '1-myuser/host1'522        queue_entry.save()523        queue_entry.host.status = HostStatus.RUNNING524        queue_entry.host.save()525        self._initialize_test()526        self._run_dispatcher()527        self._finish_job(queue_entry)528    def test_recover_verifying_hqe_no_special_task(self):529        # recovery should fail on a Verifing HQE with no corresponding530        # Verify or Cleanup SpecialTask531        _, queue_entry = self._make_job_and_queue_entry()532        queue_entry.status = HqeStatus.VERIFYING533        queue_entry.save()534        # make some dummy SpecialTasks that shouldn't count535        models.SpecialTask.objects.create(536            host=queue_entry.host,537            task=models.SpecialTask.Task.VERIFY,538            requested_by=models.User.current_user())539        models.SpecialTask.objects.create(540            host=queue_entry.host,541            task=models.SpecialTask.Task.CLEANUP,542            queue_entry=queue_entry,543            is_complete=True,544            requested_by=models.User.current_user())545        self.assertRaises(host_scheduler.SchedulerError, self._initialize_test)546    def _test_recover_verifying_hqe_helper(self, task, pidfile_type):547        _, queue_entry = self._make_job_and_queue_entry()548        queue_entry.status = HqeStatus.VERIFYING549        queue_entry.save()550        special_task = 
models.SpecialTask.objects.create(551            host=queue_entry.host, task=task, queue_entry=queue_entry)552        self._initialize_test()553        self._run_dispatcher()554        self.mock_drone_manager.finish_process(pidfile_type)555        self._run_dispatcher()556        # don't bother checking the rest of the job execution, as long as the557        # SpecialTask ran558    def test_recover_verifying_hqe_with_cleanup(self):559        # recover an HQE that was in pre-job cleanup560        self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.CLEANUP,561                                                _PidfileType.CLEANUP)562    def test_recover_verifying_hqe_with_verify(self):563        # recover an HQE that was in pre-job verify564        self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.VERIFY,565                                                _PidfileType.VERIFY)566    def test_recover_pending_hqes_with_group(self):567        # recover a group of HQEs that are in Pending, in the same group (e.g.,568        # in a job with atomic hosts)569        job = self._create_job(hosts=[1, 2], atomic_group=1)570        job.save()571        job.hostqueueentry_set.all().update(status=HqeStatus.PENDING)572        self._initialize_test()573        for queue_entry in job.hostqueueentry_set.all():574            self.assertEquals(queue_entry.status, HqeStatus.STARTING)575    def test_recover_parsing(self):576        self._initialize_test()577        job, queue_entry = self._make_job_and_queue_entry()578        job.run_verify = False579        job.reboot_after = model_attributes.RebootAfter.NEVER580        job.save()581        self._run_dispatcher()  # launches job582        self.mock_drone_manager.finish_process(_PidfileType.JOB)583        self._run_dispatcher()  # launches parsing584        # now "restart" the scheduler585        self._create_dispatcher()586        self._initialize_test()587        self._run_dispatcher()588        
self.mock_drone_manager.finish_process(_PidfileType.PARSE)589        self._run_dispatcher()590    def test_recover_parsing__no_process_already_aborted(self):591        _, queue_entry = self._make_job_and_queue_entry()592        queue_entry.execution_subdir = 'host1'593        queue_entry.status = HqeStatus.PARSING594        queue_entry.aborted = True595        queue_entry.save()596        self._initialize_test()597        self._run_dispatcher()598    def test_job_scheduled_just_after_abort(self):599        # test a pretty obscure corner case where a job is aborted while queued,600        # another job is ready to run, and throttling is active. the post-abort601        # cleanup must not be pre-empted by the second job.602        job1, queue_entry1 = self._make_job_and_queue_entry()603        job2, queue_entry2 = self._make_job_and_queue_entry()604        self.mock_drone_manager.process_capacity = 0605        self._run_dispatcher()  # schedule job1, but won't start verify606        job1.hostqueueentry_set.update(aborted=True)607        self.mock_drone_manager.process_capacity = 100608        self._run_dispatcher()  # cleanup must run here, not verify for job2609        self._check_statuses(queue_entry1, HqeStatus.ABORTED,610                             HostStatus.CLEANING)611        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)612        self._run_dispatcher()  # now verify starts for job2613        self._check_statuses(queue_entry2, HqeStatus.VERIFYING,614                             HostStatus.VERIFYING)615    def test_reverify_interrupting_pre_job(self):616        # ensure things behave sanely if a reverify is scheduled in the middle617        # of pre-job actions618        _, queue_entry = self._make_job_and_queue_entry()619        self._run_dispatcher()  # pre-job verify620        self._create_reverify_request()621        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,622                                               
exit_status=256)623        self._run_dispatcher()  # repair624        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)625        self._run_dispatcher()  # reverify runs now626        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)627        self._run_dispatcher()  # pre-job verify628        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)629        self._run_dispatcher()  # and job runs...630        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)631        self._finish_job(queue_entry)  # reverify has been deleted632        self._check_statuses(queue_entry, HqeStatus.COMPLETED,633                             HostStatus.READY)634        self._assert_nothing_is_running()635    def test_reverify_while_job_running(self):636        # once a job is running, a reverify must not be allowed to preempt637        # Gathering638        _, queue_entry = self._make_job_and_queue_entry()639        self._run_pre_job_verify(queue_entry)640        self._run_dispatcher()  # job runs641        self._create_reverify_request()642        # make job end with a signal, so gathering will run643        self.mock_drone_manager.finish_process(_PidfileType.JOB,644                                               exit_status=271)645        self._run_dispatcher()  # gathering must start646        self.mock_drone_manager.finish_process(_PidfileType.GATHER)647        self._run_dispatcher()  # parsing and cleanup648        self._finish_parsing_and_cleanup(queue_entry)649        self._run_dispatcher()  # now reverify runs650        self._check_statuses(queue_entry, HqeStatus.FAILED,651                             HostStatus.VERIFYING)652        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)653        self._run_dispatcher()654        self._check_host_status(queue_entry.host, HostStatus.READY)655    def test_reverify_while_host_pending(self):656        # ensure that if a reverify is scheduled while a host is in Pending, it657        # 
won't run until the host is actually free658        job = self._create_job(hosts=[1, 2])659        queue_entry = job.hostqueueentry_set.get(host__hostname='host1')660        job.synch_count = 2661        job.save()662        host2 = self.hosts[1]663        host2.locked = True664        host2.save()665        self._run_dispatcher()  # verify host1666        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)667        self._run_dispatcher()  # host1 Pending668        self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)669        self._create_reverify_request()670        self._run_dispatcher()  # nothing should happen here671        self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)672        # now let the job run673        host2.locked = False674        host2.save()675        self._run_dispatcher()  # verify host2676        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)677        self._run_dispatcher()  # run job678        self._finish_job(queue_entry)679        # need to explicitly finish host1's post-job cleanup680        self.mock_drone_manager.finish_specific_process(681            'hosts/host1/4-cleanup', drone_manager.AUTOSERV_PID_FILE)682        self._run_dispatcher()683        # the reverify should now be running684        self._check_statuses(queue_entry, HqeStatus.COMPLETED,685                             HostStatus.VERIFYING)686        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)687        self._run_dispatcher()688        self._check_host_status(queue_entry.host, HostStatus.READY)689    def test_throttling(self):690        job = self._create_job(hosts=[1, 2, 3])691        job.synch_count = 3692        job.save()693        queue_entries = list(job.hostqueueentry_set.all())694        def _check_hqe_statuses(*statuses):695            for queue_entry, status in zip(queue_entries, statuses):696                self._check_statuses(queue_entry, status)697        
self.mock_drone_manager.process_capacity = 2698        self._run_dispatcher()  # verify runs on 1 and 2699        _check_hqe_statuses(HqeStatus.VERIFYING, HqeStatus.VERIFYING,700                            HqeStatus.VERIFYING)701        self.assertEquals(len(self.mock_drone_manager.running_pidfile_ids()), 2)702        self.mock_drone_manager.finish_specific_process(703            'hosts/host1/1-verify', drone_manager.AUTOSERV_PID_FILE)704        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)705        self._run_dispatcher()  # verify runs on 3706        _check_hqe_statuses(HqeStatus.PENDING, HqeStatus.PENDING,707                            HqeStatus.VERIFYING)708        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)709        self._run_dispatcher()  # job won't run due to throttling710        _check_hqe_statuses(HqeStatus.STARTING, HqeStatus.STARTING,711                            HqeStatus.STARTING)712        self._assert_nothing_is_running()713        self.mock_drone_manager.process_capacity = 3714        self._run_dispatcher()  # now job runs715        _check_hqe_statuses(HqeStatus.RUNNING, HqeStatus.RUNNING,716                            HqeStatus.RUNNING)717        self.mock_drone_manager.process_capacity = 2718        self.mock_drone_manager.finish_process(_PidfileType.JOB,719                                               exit_status=271)720        self._run_dispatcher()  # gathering won't run due to throttling721        _check_hqe_statuses(HqeStatus.GATHERING, HqeStatus.GATHERING,722                            HqeStatus.GATHERING)723        self._assert_nothing_is_running()724        self.mock_drone_manager.process_capacity = 3725        self._run_dispatcher()  # now gathering runs726        self.mock_drone_manager.process_capacity = 0727        self.mock_drone_manager.finish_process(_PidfileType.GATHER)728        self._run_dispatcher()  # parsing runs despite throttling729        _check_hqe_statuses(HqeStatus.PARSING, 
HqeStatus.PARSING,730                            HqeStatus.PARSING)731    def test_abort_starting_while_throttling(self):732        self._initialize_test()733        job = self._create_job(hosts=[1, 2], synchronous=True)734        queue_entry = job.hostqueueentry_set.all()[0]735        job.run_verify = False736        job.reboot_after = model_attributes.RebootAfter.NEVER737        job.save()738        self.mock_drone_manager.process_capacity = 0739        self._run_dispatcher()  # go to starting, but don't start job740        self._check_statuses(queue_entry, HqeStatus.STARTING,741                             HostStatus.PENDING)742        job.hostqueueentry_set.update(aborted=True)743        self._run_dispatcher()744        self._check_statuses(queue_entry, HqeStatus.GATHERING,745                             HostStatus.RUNNING)746        self.mock_drone_manager.process_capacity = 5...test_get_contracts.py
Source:test_get_contracts.py  
...5from layer_linter.dependencies import ImportPath6from layer_linter.module import Module7class TestGetContracts:8    def test_happy_path(self):9        self._initialize_test()10        contracts = get_contracts(self.filename_and_path, package_name='singlecontractfile')11        assert len(contracts) == 212        expected_contracts = [13            {14                'name': 'Contract A',15                'packages': ['singlecontractfile.foo', 'singlecontractfile.bar'],16                'layers': ['one', 'two'],17            },18            {19                'name': 'Contract B',20                'packages': ['singlecontractfile'],21                'layers': ['one', 'two', 'three'],22                'whitelisted_paths': [23                    ('baz.utils', 'baz.three.green'),24                    ('baz.three.blue', 'baz.two'),25                ],26            },27        ]28        sorted_contracts = sorted(contracts, key=lambda i: i.name)29        for contract_index, contract in enumerate(sorted_contracts):30            expected_data = expected_contracts[contract_index]31            assert contract.name == expected_data['name']32            for package_index, package in enumerate(contract.containers):33                expected_package_name = expected_data['packages'][package_index]34                assert package == Module(expected_package_name)35            for layer_index, layer in enumerate(contract.layers):36                expected_layer_data = expected_data['layers'][layer_index]37                assert isinstance(layer, Layer)38                assert layer.name == expected_layer_data39            for whitelisted_index, whitelisted_path in enumerate(contract.whitelisted_paths):40                expected_importer, expected_imported = expected_data['whitelisted_paths'][41                    whitelisted_index]42                assert isinstance(whitelisted_path, ImportPath)43                assert whitelisted_path.importer == Module(expected_importer)44     
           assert whitelisted_path.imported == Module(expected_imported)45    def test_container_does_not_exist(self):46        self._initialize_test('layers_with_missing_container.yml')47        with pytest.raises(ValueError) as e:48            get_contracts(self.filename_and_path, package_name='singlecontractfile')49        assert str(e.value) == "Invalid container 'singlecontractfile.missing': no such package."50    def _initialize_test(self, config_filename='layers.yml'):51        # Append the package directory to the path.52        dirname = os.path.dirname(__file__)53        package_dirname = os.path.join(dirname, '..', 'assets', 'singlecontractfile')54        sys.path.append(package_dirname)55        # Set the full config filename and path as an instance attribute....Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
