Best Python code snippet using tempest_python
test_testresult.py
Source:test_testresult.py  
...181        self.assertTrue(result.wasSuccessful())182    def test_startStopTestRun(self):183        # Calling startTestRun completes ok.184        result = self.makeResult()185        result.startTestRun()186        result.stopTestRun()187    def test_failfast(self):188        result = self.makeResult()189        result.failfast = True190        class Failing(TestCase):191            def test_a(self):192                self.fail('a')193            def test_b(self):194                self.fail('b')195        TestSuite([Failing('test_a'), Failing('test_b')]).run(result)196        self.assertEqual(1, result.testsRun)197class TagsContract(Python27Contract):198    """Tests to ensure correct tagging behaviour.199    See the subunit docs for guidelines on how this is supposed to work.200    """201    def test_no_tags_by_default(self):202        # Results initially have no tags.203        result = self.makeResult()204        result.startTestRun()205        self.assertEqual(frozenset(), result.current_tags)206    def test_adding_tags(self):207        # Tags are added using 'tags' and thus become visible in208        # 'current_tags'.209        result = self.makeResult()210        result.startTestRun()211        result.tags(set(['foo']), set())212        self.assertEqual(set(['foo']), result.current_tags)213    def test_removing_tags(self):214        # Tags are removed using 'tags'.215        result = self.makeResult()216        result.startTestRun()217        result.tags(set(['foo']), set())218        result.tags(set(), set(['foo']))219        self.assertEqual(set(), result.current_tags)220    def test_startTestRun_resets_tags(self):221        # startTestRun makes a new test run, and thus clears all the tags.222        result = self.makeResult()223        result.startTestRun()224        result.tags(set(['foo']), set())225        result.startTestRun()226        self.assertEqual(set(), result.current_tags)227    def test_add_tags_within_test(self):228        # Tags can be added after a test has run.229        result = self.makeResult()230        result.startTestRun()231        result.tags(set(['foo']), set())232        result.startTest(self)233        result.tags(set(['bar']), set())234        self.assertEqual(set(['foo', 'bar']), result.current_tags)235    def test_tags_added_in_test_are_reverted(self):236        # Tags added during a test run are then reverted once that test has237        # finished.238        result = self.makeResult()239        result.startTestRun()240        result.tags(set(['foo']), set())241        result.startTest(self)242        result.tags(set(['bar']), set())243        result.addSuccess(self)244        result.stopTest(self)245        self.assertEqual(set(['foo']), result.current_tags)246    def test_tags_removed_in_test(self):247        # Tags can be removed during tests.248        result = self.makeResult()249        result.startTestRun()250        result.tags(set(['foo']), set())251        result.startTest(self)252        result.tags(set(), set(['foo']))253        self.assertEqual(set(), result.current_tags)254    def test_tags_removed_in_test_are_restored(self):255        # Tags removed during tests are restored once that test has finished.256        result = self.makeResult()257        result.startTestRun()258        result.tags(set(['foo']), set())259        result.startTest(self)260        result.tags(set(), set(['foo']))261        result.addSuccess(self)262        result.stopTest(self)263        self.assertEqual(set(['foo']), result.current_tags)264class DetailsContract(TagsContract):265    """Tests for the details API of TestResults."""266    def test_addExpectedFailure_details(self):267        # Calling addExpectedFailure(test, details=xxx) completes ok.268        result = self.makeResult()269        result.startTest(self)270        result.addExpectedFailure(self, details={})271    def test_addError_details(self):272        # Calling addError(test, details=xxx) completes ok.273        result = self.makeResult()274        result.startTest(self)275        result.addError(self, details={})276    def test_addFailure_details(self):277        # Calling addFailure(test, details=xxx) completes ok.278        result = self.makeResult()279        result.startTest(self)280        result.addFailure(self, details={})281    def test_addSkipped_details(self):282        # Calling addSkip(test, reason) completes ok.283        result = self.makeResult()284        result.startTest(self)285        result.addSkip(self, details={})286    def test_addUnexpectedSuccess_details(self):287        # Calling addUnexpectedSuccess(test) completes ok.288        result = self.makeResult()289        result.startTest(self)290        result.addUnexpectedSuccess(self, details={})291    def test_addSuccess_details(self):292        # Calling addSuccess(test) completes ok.293        result = self.makeResult()294        result.startTest(self)295        result.addSuccess(self, details={})296class FallbackContract(DetailsContract):297    """When we fallback we take our policy choice to map calls.298    For instance, we map unexpectedSuccess to an error code, not to success.299    """300    def test_addUnexpectedSuccess_was_successful(self):301        # addUnexpectedSuccess fails test run in testtools.302        result = self.makeResult()303        result.startTest(self)304        result.addUnexpectedSuccess(self)305        result.stopTest(self)306        self.assertFalse(result.wasSuccessful())307class StartTestRunContract(FallbackContract):308    """Defines the contract for testtools policy choices.309    That is things which are not simply extensions to unittest but choices we310    have made differently.311    """312    def test_startTestRun_resets_unexpected_success(self):313        result = self.makeResult()314        result.startTest(self)315        result.addUnexpectedSuccess(self)316        result.stopTest(self)317        result.startTestRun()318        self.assertTrue(result.wasSuccessful())319    def test_startTestRun_resets_failure(self):320        result = self.makeResult()321        result.startTest(self)322        result.addFailure(self, an_exc_info)323        result.stopTest(self)324        result.startTestRun()325        self.assertTrue(result.wasSuccessful())326    def test_startTestRun_resets_errors(self):327        result = self.makeResult()328        result.startTest(self)329        result.addError(self, an_exc_info)330        result.stopTest(self)331        result.startTestRun()332        self.assertTrue(result.wasSuccessful())333class TestTestResultContract(TestCase, StartTestRunContract):334    run_test_with = FullStackRunTest335    def makeResult(self):336        return TestResult()337class TestMultiTestResultContract(TestCase, StartTestRunContract):338    run_test_with = FullStackRunTest339    def makeResult(self):340        return MultiTestResult(TestResult(), TestResult())341class TestTextTestResultContract(TestCase, StartTestRunContract):342    run_test_with = FullStackRunTest343    def makeResult(self):344        return TextTestResult(StringIO())345class TestThreadSafeForwardingResultContract(TestCase, StartTestRunContract):346    run_test_with = FullStackRunTest347    def makeResult(self):348        result_semaphore = threading.Semaphore(1)349        target = TestResult()350        return ThreadsafeForwardingResult(target, result_semaphore)351class TestExtendedTestResultContract(TestCase, StartTestRunContract):352    def makeResult(self):353        return ExtendedTestResult()354class TestPython26TestResultContract(TestCase, Python26Contract):355    def makeResult(self):356        return Python26TestResult()357class TestAdaptedPython26TestResultContract(TestCase, FallbackContract):358    def makeResult(self):359        return ExtendedToOriginalDecorator(Python26TestResult())360class TestPython27TestResultContract(TestCase, Python27Contract):361    def makeResult(self):362        return Python27TestResult()363class TestAdaptedPython27TestResultContract(TestCase, DetailsContract):364    def makeResult(self):365        return ExtendedToOriginalDecorator(Python27TestResult())366class TestAdaptedStreamResult(TestCase, DetailsContract):367    def makeResult(self):368        return ExtendedToStreamDecorator(StreamResult())369class TestTestResultDecoratorContract(TestCase, StartTestRunContract):370    run_test_with = FullStackRunTest371    def makeResult(self):372        return TestResultDecorator(TestResult())373# DetailsContract because ExtendedToStreamDecorator follows Python for374# uxsuccess handling.375class TestStreamToExtendedContract(TestCase, DetailsContract):376    def makeResult(self):377        return ExtendedToStreamDecorator(378            StreamToExtendedDecorator(ExtendedTestResult()))379class TestStreamResultContract(object):380    def _make_result(self):381        raise NotImplementedError(self._make_result)382    def test_startTestRun(self):383        result = self._make_result()384        result.startTestRun()385        result.stopTestRun()386    def test_files(self):387        # Test parameter combinations when files are being emitted.388        result = self._make_result()389        result.startTestRun()390        self.addCleanup(result.stopTestRun)391        now = datetime.datetime.now(utc)392        inputs = list(dict(393            eof=True,394            mime_type="text/plain",395            route_code=_u("1234"),396            test_id=_u("foo"),397            timestamp=now,398            ).items())399        param_dicts = self._power_set(inputs)400        for kwargs in param_dicts:401            result.status(file_name=_u("foo"), file_bytes=_b(""), **kwargs)402            result.status(file_name=_u("foo"), file_bytes=_b("bar"), **kwargs)403    def test_test_status(self):404        # Tests non-file attachment parameter combinations.405        result = self._make_result()406        result.startTestRun()407        self.addCleanup(result.stopTestRun)408        now = datetime.datetime.now(utc)409        args = [[_u("foo"), s] for s in ['exists', 'inprogress', 'xfail',410            'uxsuccess', 'success', 'fail', 'skip']]411        inputs = list(dict(412            runnable=False,413            test_tags=set(['quux']),414            route_code=_u("1234"),415            timestamp=now,416            ).items())417        param_dicts = self._power_set(inputs)418        for kwargs in param_dicts:419            for arg in args:420                result.status(test_id=arg[0], test_status=arg[1], **kwargs)421    def _power_set(self, iterable):422        "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"423        s = list(iterable)424        param_dicts = []425        for ss in chain.from_iterable(combinations(s, r) for r in range(len(s)+1)):426            param_dicts.append(dict(ss))427        return param_dicts428class TestBaseStreamResultContract(TestCase, TestStreamResultContract):429    def _make_result(self):430        return StreamResult()431class TestCopyStreamResultContract(TestCase, TestStreamResultContract):432    def _make_result(self):433        return CopyStreamResult([StreamResult(), StreamResult()])434class TestDoubleStreamResultContract(TestCase, TestStreamResultContract):435    def _make_result(self):436        return LoggingStreamResult()437class TestExtendedToStreamDecoratorContract(TestCase, TestStreamResultContract):438    def _make_result(self):439        return ExtendedToStreamDecorator(StreamResult())440class TestStreamSummaryResultContract(TestCase, TestStreamResultContract):441    def _make_result(self):442        return StreamSummary()443class TestStreamTaggerContract(TestCase, TestStreamResultContract):444    def _make_result(self):445        return StreamTagger([StreamResult()], add=set(), discard=set())446class TestStreamToDictContract(TestCase, TestStreamResultContract):447    def _make_result(self):448        return StreamToDict(lambda x:None)449class TestStreamToExtendedDecoratorContract(TestCase, TestStreamResultContract):450    def _make_result(self):451        return StreamToExtendedDecorator(ExtendedTestResult())452class TestStreamToQueueContract(TestCase, TestStreamResultContract):453    def _make_result(self):454        queue = Queue()455        return StreamToQueue(queue, "foo")456class TestStreamFailFastContract(TestCase, TestStreamResultContract):457    def _make_result(self):458        return StreamFailFast(lambda:None)459class TestStreamResultRouterContract(TestCase, TestStreamResultContract):460    def _make_result(self):461        return StreamResultRouter(StreamResult())462class TestDoubleStreamResultEvents(TestCase):463    def test_startTestRun(self):464        result = LoggingStreamResult()465        result.startTestRun()466        self.assertEqual([('startTestRun',)], result._events)467    def test_stopTestRun(self):468        result = LoggingStreamResult()469        result.startTestRun()470        result.stopTestRun()471        self.assertEqual([('startTestRun',), ('stopTestRun',)], result._events)472    def test_file(self):473        result = LoggingStreamResult()474        result.startTestRun()475        now = datetime.datetime.now(utc)476        result.status(file_name="foo", file_bytes="bar", eof=True, mime_type="text/json",477            test_id="id", route_code='abc', timestamp=now)478        self.assertEqual(479            [('startTestRun',),480             ('status', 'id', None, None, True, 'foo', 'bar', True, 'text/json', 'abc', now)],481            result._events)482    def test_status(self):483        result = LoggingStreamResult()484        result.startTestRun()485        now = datetime.datetime.now(utc)486        result.status("foo", "success", test_tags=set(['tag']),487            runnable=False, route_code='abc', timestamp=now)488        self.assertEqual(489            [('startTestRun',),490             ('status', 'foo', 'success', set(['tag']), False, None, None, False, None, 'abc', now)],491            result._events)492class TestCopyStreamResultCopies(TestCase):493    def setUp(self):494        super(TestCopyStreamResultCopies, self).setUp()495        self.target1 = LoggingStreamResult()496        self.target2 = LoggingStreamResult()497        self.targets = [self.target1._events, self.target2._events]498        self.result = CopyStreamResult([self.target1, self.target2])499    def test_startTestRun(self):500        self.result.startTestRun()501        self.assertThat(self.targets, AllMatch(Equals([('startTestRun',)])))502    def test_stopTestRun(self):503        self.result.startTestRun()504        self.result.stopTestRun()505        self.assertThat(self.targets,506            AllMatch(Equals([('startTestRun',), ('stopTestRun',)])))507    def test_status(self):508        self.result.startTestRun()509        now = datetime.datetime.now(utc)510        self.result.status("foo", "success", test_tags=set(['tag']),511            runnable=False, file_name="foo", file_bytes=b'bar', eof=True,512            mime_type="text/json", route_code='abc', timestamp=now)513        self.assertThat(self.targets,514            AllMatch(Equals([('startTestRun',),515                ('status', 'foo', 'success', set(['tag']), False, "foo",516                 b'bar', True, "text/json", 'abc', now)517                ])))518class TestStreamTagger(TestCase):519    def test_adding(self):520        log = LoggingStreamResult()521        result = StreamTagger([log], add=['foo'])522        result.startTestRun()523        result.status()524        result.status(test_tags=set(['bar']))525        result.status(test_tags=None)526        result.stopTestRun()527        self.assertEqual([528            ('startTestRun',),529            ('status', None, None, set(['foo']), True, None, None, False, None, None, None),530            ('status', None, None, set(['foo', 'bar']), True, None, None, False, None, None, None),531            ('status', None, None, set(['foo']), True, None, None, False, None, None, None),532            ('stopTestRun',),533            ], log._events)534    def test_discarding(self):535        log = LoggingStreamResult()536        result = StreamTagger([log], discard=['foo'])537        result.startTestRun()538        result.status()539        result.status(test_tags=None)540        result.status(test_tags=set(['foo']))541        result.status(test_tags=set(['bar']))542        result.status(test_tags=set(['foo', 'bar']))543        result.stopTestRun()544        self.assertEqual([545            ('startTestRun',),546            ('status', None, None, None, True, None, None, False, None, None, None),547            ('status', None, None, None, True, None, None, False, None, None, None),548            ('status', None, None, None, True, None, None, False, None, None, None),549            ('status', None, None, set(['bar']), True, None, None, False, None, None, None),550            ('status', None, None, set(['bar']), True, None, None, False, None, None, None),551            ('stopTestRun',),552            ], log._events)553class TestStreamToDict(TestCase):554    def test_hung_test(self):555        tests = []556        result = StreamToDict(tests.append)557        result.startTestRun()558        result.status('foo', 'inprogress')559        self.assertEqual([], tests)560        result.stopTestRun()561        self.assertEqual([562            {'id': 'foo', 'tags': set(), 'details': {}, 'status': 'inprogress',563             'timestamps': [None, None]}564            ], tests)565    def test_all_terminal_states_reported(self):566        tests = []567        result = StreamToDict(tests.append)568        result.startTestRun()569        result.status('success', 'success')570        result.status('skip', 'skip')571        result.status('exists', 'exists')572        result.status('fail', 'fail')573        result.status('xfail', 'xfail')574        result.status('uxsuccess', 'uxsuccess')575        self.assertThat(tests, HasLength(6))576        self.assertEqual(577            ['success', 'skip', 'exists', 'fail', 'xfail', 'uxsuccess'],578            [test['id'] for test in tests])579        result.stopTestRun()580        self.assertThat(tests, HasLength(6))581    def test_files_reported(self):582        tests = []583        result = StreamToDict(tests.append)584        result.startTestRun()585        result.status(file_name="some log.txt",586            file_bytes=_b("1234 log message"), eof=True,587            mime_type="text/plain; charset=utf8", test_id="foo.bar")588        result.status(file_name="another file",589            file_bytes=_b("""Traceback..."""), test_id="foo.bar")590        result.stopTestRun()591        self.assertThat(tests, HasLength(1))592        test = tests[0]593        self.assertEqual("foo.bar", test['id'])594        self.assertEqual("unknown", test['status'])595        details = test['details']596        self.assertEqual(597            _u("1234 log message"), details['some log.txt'].as_text())598        self.assertEqual(599            _b("Traceback..."),600            _b('').join(details['another file'].iter_bytes()))601        self.assertEqual(602            "application/octet-stream", repr(details['another file'].content_type))603    def test_bad_mime(self):604        # Testtools was making bad mime types, this tests that the specific605        # corruption is catered for.606        tests = []607        result = StreamToDict(tests.append)608        result.startTestRun()609        result.status(file_name="file", file_bytes=b'a',610            mime_type='text/plain; charset=utf8, language=python',611            test_id='id')612        result.stopTestRun()613        self.assertThat(tests, HasLength(1))614        test = tests[0]615        self.assertEqual("id", test['id'])616        details = test['details']617        self.assertEqual(_u("a"), details['file'].as_text())618        self.assertEqual(619            "text/plain; charset=\"utf8\"",620            repr(details['file'].content_type))621    def test_timestamps(self):622        tests = []623        result = StreamToDict(tests.append)624        result.startTestRun()625        result.status(test_id='foo', test_status='inprogress', timestamp="A")626        result.status(test_id='foo', test_status='success', timestamp="B")627        result.status(test_id='bar', test_status='inprogress', timestamp="C")628        result.stopTestRun()629        self.assertThat(tests, HasLength(2))630        self.assertEqual(["A", "B"], tests[0]['timestamps'])631        self.assertEqual(["C", None], tests[1]['timestamps'])632class TestExtendedToStreamDecorator(TestCase):633    def test_explicit_time(self):634        log = LoggingStreamResult()635        result = ExtendedToStreamDecorator(log)636        result.startTestRun()637        now = datetime.datetime.now(utc)638        result.time(now)639        result.startTest(self)640        result.addSuccess(self)641        result.stopTest(self)642        result.stopTestRun()643        self.assertEqual([644            ('startTestRun',),645            ('status',646             'testtools.tests.test_testresult.TestExtendedToStreamDecorator.test_explicit_time',647             'inprogress',648             None,649             True,650             None,651             None,652             False,653             None,654             None,655             now),656            ('status',657             'testtools.tests.test_testresult.TestExtendedToStreamDecorator.test_explicit_time',658             'success',659              set(),660              True,661              None,662              None,663              False,664              None,665              None,666              now),667             ('stopTestRun',)], log._events)668    def test_wasSuccessful_after_stopTestRun(self):669        log = LoggingStreamResult()670        result = ExtendedToStreamDecorator(log)671        result.startTestRun()672        result.status(test_id='foo', test_status='fail')673        result.stopTestRun()674        self.assertEqual(False, result.wasSuccessful())675        676class TestStreamFailFast(TestCase):677    def test_inprogress(self):678        result = StreamFailFast(self.fail)679        result.status('foo', 'inprogress')680    def test_exists(self):681        result = StreamFailFast(self.fail)682        result.status('foo', 'exists')683    def test_xfail(self):684        result = StreamFailFast(self.fail)685        result.status('foo', 'xfail')686    def test_uxsuccess(self):687        calls = []688        def hook():689            calls.append("called")690        result = StreamFailFast(hook)691        result.status('foo', 'uxsuccess')692        result.status('foo', 'uxsuccess')693        self.assertEqual(['called', 'called'], calls)694    def test_success(self):695        result = StreamFailFast(self.fail)696        result.status('foo', 'success')697    def test_fail(self):698        calls = []699        def hook():700            calls.append("called")701        result = StreamFailFast(hook)702        result.status('foo', 'fail')703        result.status('foo', 'fail')704        self.assertEqual(['called', 'called'], calls)705    def test_skip(self):706        result = StreamFailFast(self.fail)707        result.status('foo', 'skip')708class TestStreamSummary(TestCase):709    def test_attributes(self):710        result = StreamSummary()711        result.startTestRun()712        self.assertEqual([], result.failures)713        self.assertEqual([], result.errors)714        self.assertEqual([], result.skipped)715        self.assertEqual([], result.expectedFailures)716        self.assertEqual([], result.unexpectedSuccesses)717        self.assertEqual(0, result.testsRun)718    def test_startTestRun(self):719        result = StreamSummary()720        result.startTestRun()721        result.failures.append('x')722        result.errors.append('x')723        result.skipped.append('x')724        result.expectedFailures.append('x')725        result.unexpectedSuccesses.append('x')726        result.testsRun = 1727        result.startTestRun()728        self.assertEqual([], result.failures)729        self.assertEqual([], result.errors)730        self.assertEqual([], result.skipped)731        self.assertEqual([], result.expectedFailures)732        self.assertEqual([], result.unexpectedSuccesses)733        self.assertEqual(0, result.testsRun)734    def test_wasSuccessful(self):735        # wasSuccessful returns False if any of736        # failures/errors is non-empty.737        result = StreamSummary()738        result.startTestRun()739        self.assertEqual(True, result.wasSuccessful())740        result.failures.append('x')741        self.assertEqual(False, result.wasSuccessful())742        result.startTestRun()743        result.errors.append('x')744        self.assertEqual(False, result.wasSuccessful())745        result.startTestRun()746        result.skipped.append('x')747        self.assertEqual(True, result.wasSuccessful())748        result.startTestRun()749        result.expectedFailures.append('x')750        self.assertEqual(True, result.wasSuccessful())751        result.startTestRun()752        result.unexpectedSuccesses.append('x')753        self.assertEqual(True, result.wasSuccessful())754    def test_stopTestRun(self):755        result = StreamSummary()756        # terminal successful codes.757        result.startTestRun()758        result.status("foo", "inprogress")759        result.status("foo", "success")760        result.status("bar", "skip")761        result.status("baz", "exists")762        result.stopTestRun()763        self.assertEqual(True, result.wasSuccessful())764        # Existence is terminal but doesn't count as 'running' a test.765        self.assertEqual(2, result.testsRun)766    def test_stopTestRun_inprogress_test_fails(self):767        # Tests inprogress at stopTestRun trigger a failure.768        result = StreamSummary()769        result.startTestRun()770        result.status("foo", "inprogress")771        result.stopTestRun()772        self.assertEqual(False, result.wasSuccessful())773        self.assertThat(result.errors, HasLength(1))774        self.assertEqual("foo", result.errors[0][0].id())775        self.assertEqual("Test did not complete", result.errors[0][1])776        # interim state detection handles route codes - while duplicate ids in777        # one run is undesirable, it may happen (e.g. with repeated tests).778        result.startTestRun()779        result.status("foo", "inprogress")780        result.status("foo", "inprogress", route_code="A")781        result.status("foo", "success", route_code="A")782        result.stopTestRun()783        self.assertEqual(False, result.wasSuccessful())784    def test_status_skip(self):785        # when skip is seen, a synthetic test is reported with reason captured786        # from the 'reason' file attachment if any.787        result = StreamSummary()788        result.startTestRun()789        result.status(file_name="reason",790            file_bytes=_b("Missing dependency"), eof=True,791            mime_type="text/plain; charset=utf8", test_id="foo.bar")792        result.status("foo.bar", "skip")793        self.assertThat(result.skipped, HasLength(1))794        self.assertEqual("foo.bar", result.skipped[0][0].id())795        self.assertEqual(_u("Missing dependency"), result.skipped[0][1])796    def _report_files(self, result):797        result.status(file_name="some log.txt",798            file_bytes=_b("1234 log message"), eof=True,799            mime_type="text/plain; charset=utf8", test_id="foo.bar")800        result.status(file_name="traceback",801            file_bytes=_b("""Traceback (most recent call last):802  File "testtools/tests/test_testresult.py", line 607, in test_stopTestRun803      AllMatch(Equals([('startTestRun',), ('stopTestRun',)])))804testtools.matchers._impl.MismatchError: Differences: [805[('startTestRun',), ('stopTestRun',)] != []806[('startTestRun',), ('stopTestRun',)] != []807]808"""), eof=True, mime_type="text/plain; charset=utf8", test_id="foo.bar")809    files_message = Equals(_u("""some log.txt: {{{1234 log message}}}810Traceback (most recent call last):811  File "testtools/tests/test_testresult.py", line 607, in test_stopTestRun812      AllMatch(Equals([('startTestRun',), ('stopTestRun',)])))813testtools.matchers._impl.MismatchError: Differences: [814[('startTestRun',), ('stopTestRun',)] != []815[('startTestRun',), ('stopTestRun',)] != []816]817"""))818    def test_status_fail(self):819        # when fail is seen, a synthetic test is reported with all files820        # attached shown as the message.821        result = StreamSummary()822        result.startTestRun()823        self._report_files(result)824        result.status("foo.bar", "fail")825        self.assertThat(result.errors, HasLength(1))826        self.assertEqual("foo.bar", result.errors[0][0].id())827        self.assertThat(result.errors[0][1], self.files_message)828    def test_status_xfail(self):829        # when xfail is seen, a synthetic test is reported with all files830        # attached shown as the message.831        result = StreamSummary()832        result.startTestRun()833        self._report_files(result)834        result.status("foo.bar", "xfail")835        self.assertThat(result.expectedFailures, HasLength(1))836        self.assertEqual("foo.bar", result.expectedFailures[0][0].id())837        self.assertThat(result.expectedFailures[0][1], self.files_message)838    def test_status_uxsuccess(self):839        # when uxsuccess is seen, a synthetic test is reported.840        result = StreamSummary()841        result.startTestRun()842        result.status("foo.bar", "uxsuccess")843        self.assertThat(result.unexpectedSuccesses, HasLength(1))844        self.assertEqual("foo.bar", result.unexpectedSuccesses[0].id())845class TestTestControl(TestCase):846    def test_default(self):847        self.assertEqual(False, TestControl().shouldStop)848    def test_stop(self):849        control = TestControl()850        control.stop()851        self.assertEqual(True, control.shouldStop)852class TestTestResult(TestCase):853    """Tests for 'TestResult'."""854    run_tests_with = FullStackRunTest855    def makeResult(self):856        """Make an arbitrary result for testing."""857        return TestResult()858    def test_addSkipped(self):859        # Calling addSkip on a TestResult records the test that was skipped in860        # its skip_reasons dict.861        result = self.makeResult()862        result.addSkip(self, _u("Skipped for some reason"))863        self.assertEqual({_u("Skipped for some reason"):[self]},864            result.skip_reasons)865        result.addSkip(self, _u("Skipped for some reason"))866        self.assertEqual({_u("Skipped for some reason"):[self, self]},867            result.skip_reasons)868        result.addSkip(self, _u("Skipped for another reason"))869        self.assertEqual({_u("Skipped for some reason"):[self, self],870            _u("Skipped for another reason"):[self]},871            result.skip_reasons)872    def test_now_datetime_now(self):873        result = self.makeResult()874        olddatetime = testresult.real.datetime875        def restore():876            testresult.real.datetime = olddatetime877        self.addCleanup(restore)878        class Module:879            pass880        now = datetime.datetime.now(utc)881        stubdatetime = Module()882        stubdatetime.datetime = Module()883        stubdatetime.datetime.now = lambda tz: now884        testresult.real.datetime = stubdatetime885        # Calling _now() looks up the time.886        self.assertEqual(now, result._now())887        then = now + datetime.timedelta(0, 1)888        # Set an explicit datetime, which gets returned from then on.889        result.time(then)890        self.assertNotEqual(now, result._now())891        self.assertEqual(then, result._now())892        # go back to looking it up.893        result.time(None)894        self.assertEqual(now, result._now())895    def test_now_datetime_time(self):896        result = self.makeResult()897        now = datetime.datetime.now(utc)898        result.time(now)899        self.assertEqual(now, result._now())900    def test_traceback_formatting_without_stack_hidden(self):901        # During the testtools test run, we show our levels of the stack,902        # because we want to be able to use our test suite to debug our own903        # code.904        result = self.makeResult()905        test = make_erroring_test()906        test.run(result)907        self.assertThat(908            result.errors[0][1],909            DocTestMatches(910                'Traceback (most recent call last):\n'911                '  File "...testtools...runtest.py", line ..., in _run_user\n'912                '    return fn(*args, **kwargs)\n'913                '  File "...testtools...testcase.py", line ..., in _run_test_method\n'914                '    return self._get_test_method()()\n'915                '  File "...testtools...tests...test_testresult.py", line ..., in error\n'916                '    1/0\n'917                'ZeroDivisionError: ...\n',918                doctest.ELLIPSIS | doctest.REPORT_UDIFF))919    def test_traceback_formatting_with_stack_hidden(self):920        result = self.makeResult()921        test = make_erroring_test()922        run_with_stack_hidden(True, test.run, result)923        self.assertThat(924            result.errors[0][1],925            DocTestMatches(926                'Traceback (most recent call last):\n'927                '  File "...testtools...tests...test_testresult.py", line ..., in error\n'928                '    1/0\n'929                'ZeroDivisionError: ...\n',930                doctest.ELLIPSIS))931    def test_traceback_formatting_with_stack_hidden_mismatch(self):932        result = self.makeResult()933        test = make_mismatching_test()934        run_with_stack_hidden(True, test.run, result)935        self.assertThat(936            result.failures[0][1],937            DocTestMatches(938                'Traceback (most recent call last):\n'939                '  File "...testtools...tests...test_testresult.py", line ..., in mismatch\n'940                '    self.assertEqual(1, 2)\n'941                '...MismatchError: 1 != 2\n',942                doctest.ELLIPSIS))943    def test_exc_info_to_unicode(self):944        # subunit upcalls to TestResult._exc_info_to_unicode, so we need to945        # make sure that it's there.946        #947        # See <https://bugs.launchpad.net/testtools/+bug/929063>.948        test = make_erroring_test()949        exc_info = make_exception_info(RuntimeError, "foo")950        result = self.makeResult()951        text_traceback = result._exc_info_to_unicode(exc_info, test)952        self.assertEqual(953            TracebackContent(exc_info, test).as_text(), text_traceback)954class TestMultiTestResult(TestCase):955    """Tests for 'MultiTestResult'."""956    def setUp(self):957        super(TestMultiTestResult, self).setUp()958        self.result1 = LoggingResult([])959        self.result2 = LoggingResult([])960        self.multiResult = MultiTestResult(self.result1, self.result2)961    def assertResultLogsEqual(self, expectedEvents):962        """Assert that our test results have received the expected events."""963        self.assertEqual(expectedEvents, self.result1._events)964        self.assertEqual(expectedEvents, self.result2._events)965    def test_repr(self):966        self.assertEqual(967            '<MultiTestResult (%r, %r)>' % (968                ExtendedToOriginalDecorator(self.result1),969                ExtendedToOriginalDecorator(self.result2)),970            repr(self.multiResult))971    def test_empty(self):972        # Initializing a `MultiTestResult` doesn't do anything to its973        # `TestResult`s.974        self.assertResultLogsEqual([])975    def test_failfast_get(self):976        # Reading reads from the first one - arbitrary choice.977        self.assertEqual(False, self.multiResult.failfast)978        self.result1.failfast = True979        self.assertEqual(True, self.multiResult.failfast)980    def test_failfast_set(self):981        # Writing writes to all.982        self.multiResult.failfast = True983        self.assertEqual(True, self.result1.failfast)984        self.assertEqual(True, self.result2.failfast)985    def test_shouldStop(self):986        self.assertFalse(self.multiResult.shouldStop)987        self.result2.stop()988        # NB: result1 is not stopped: MultiTestResult has to combine the989        # values.990        self.assertTrue(self.multiResult.shouldStop)991    def test_startTest(self):992        # Calling `startTest` on a `MultiTestResult` calls `startTest` on all993        # its `TestResult`s.994        self.multiResult.startTest(self)995        self.assertResultLogsEqual([('startTest', self)])996    def test_stop(self):997        self.assertFalse(self.multiResult.shouldStop)998        self.multiResult.stop()999        self.assertResultLogsEqual(['stop'])1000    def test_stopTest(self):1001        # Calling `stopTest` on a `MultiTestResult` calls `stopTest` on all1002        # its `TestResult`s.1003        self.multiResult.stopTest(self)1004        self.assertResultLogsEqual([('stopTest', self)])1005    def test_addSkipped(self):1006        # Calling `addSkip` on a `MultiTestResult` calls addSkip on its1007        # results.1008        reason = _u("Skipped for some reason")1009        self.multiResult.addSkip(self, reason)1010        self.assertResultLogsEqual([('addSkip', self, reason)])1011    def test_addSuccess(self):1012        # Calling `addSuccess` on a `MultiTestResult` calls `addSuccess` on1013        # all its `TestResult`s.1014        self.multiResult.addSuccess(self)1015        self.assertResultLogsEqual([('addSuccess', self)])1016    def test_done(self):1017        # Calling `done` on a `MultiTestResult` calls `done` on all its1018        # `TestResult`s.1019        self.multiResult.done()1020        self.assertResultLogsEqual([('done')])1021    def test_addFailure(self):1022        # Calling `addFailure` on a `MultiTestResult` calls `addFailure` on1023        # all its `TestResult`s.1024        exc_info = make_exception_info(AssertionError, 'failure')1025        self.multiResult.addFailure(self, exc_info)1026        self.assertResultLogsEqual([('addFailure', self, exc_info)])1027    def test_addError(self):1028        # Calling `addError` on a `MultiTestResult` calls `addError` on all1029        # its `TestResult`s.1030        exc_info = make_exception_info(RuntimeError, 'error')1031        self.multiResult.addError(self, exc_info)1032        self.assertResultLogsEqual([('addError', self, exc_info)])1033    def test_startTestRun(self):1034        # Calling `startTestRun` on a `MultiTestResult` forwards to all its1035        # `TestResult`s.1036        self.multiResult.startTestRun()1037        self.assertResultLogsEqual([('startTestRun')])1038    def test_stopTestRun(self):1039        # Calling `stopTestRun` on a `MultiTestResult` forwards to all its1040        # `TestResult`s.1041        self.multiResult.stopTestRun()1042        self.assertResultLogsEqual([('stopTestRun')])1043    def test_stopTestRun_returns_results(self):1044        # `MultiTestResult.stopTestRun` returns a tuple of all of the return1045        # values the `stopTestRun`s that it forwards to.1046        class Result(LoggingResult):1047            def stopTestRun(self):1048                super(Result, self).stopTestRun()1049                return 'foo'1050        multi_result = MultiTestResult(Result([]), Result([]))1051        result = multi_result.stopTestRun()1052        self.assertEqual(('foo', 'foo'), result)1053    def test_tags(self):1054        # Calling `tags` on a `MultiTestResult` calls `tags` on all its1055        # `TestResult`s.1056        added_tags = set(['foo', 'bar'])1057        removed_tags = set(['eggs'])1058        self.multiResult.tags(added_tags, removed_tags)1059        self.assertResultLogsEqual([('tags', added_tags, removed_tags)])1060    def test_time(self):1061        # the time call is dispatched, not eaten by the base class1062        self.multiResult.time('foo')1063        self.assertResultLogsEqual([('time', 'foo')])1064class TestTextTestResult(TestCase):1065    """Tests for 'TextTestResult'."""1066    def setUp(self):1067        super(TestTextTestResult, self).setUp()1068        self.result = TextTestResult(StringIO())1069    def getvalue(self):1070        return self.result.stream.getvalue()1071    def test__init_sets_stream(self):1072        result = TextTestResult("fp")1073        self.assertEqual("fp", result.stream)1074    def reset_output(self):1075        self.result.stream = StringIO()1076    def test_startTestRun(self):1077        self.result.startTestRun()1078        self.assertEqual("Tests running...\n", self.getvalue())1079    def test_stopTestRun_count_many(self):1080        test = make_test()1081        self.result.startTestRun()1082        self.result.startTest(test)1083        self.result.stopTest(test)1084        self.result.startTest(test)1085        self.result.stopTest(test)1086        self.result.stream = StringIO()1087        self.result.stopTestRun()1088        self.assertThat(self.getvalue(),1089            DocTestMatches("\nRan 2 tests in ...s\n...", doctest.ELLIPSIS))1090    def test_stopTestRun_count_single(self):1091        test = make_test()1092        self.result.startTestRun()1093        self.result.startTest(test)1094        self.result.stopTest(test)1095        self.reset_output()1096        self.result.stopTestRun()1097        self.assertThat(self.getvalue(),1098            DocTestMatches("\nRan 1 test in ...s\nOK\n", doctest.ELLIPSIS))1099    def test_stopTestRun_count_zero(self):1100        self.result.startTestRun()1101        self.reset_output()1102        self.result.stopTestRun()1103        self.assertThat(self.getvalue(),1104            DocTestMatches("\nRan 0 tests in ...s\nOK\n", doctest.ELLIPSIS))1105    def test_stopTestRun_current_time(self):1106        test = make_test()1107        now = datetime.datetime.now(utc)1108        self.result.time(now)1109        self.result.startTestRun()1110        self.result.startTest(test)1111        now = now + datetime.timedelta(0, 0, 0, 1)1112        self.result.time(now)1113        self.result.stopTest(test)1114        self.reset_output()1115        self.result.stopTestRun()1116        self.assertThat(self.getvalue(),1117            DocTestMatches("... in 0.001s\n...", doctest.ELLIPSIS))1118    def test_stopTestRun_successful(self):1119        self.result.startTestRun()1120        self.result.stopTestRun()1121        self.assertThat(self.getvalue(),1122            DocTestMatches("...\nOK\n", doctest.ELLIPSIS))1123    def test_stopTestRun_not_successful_failure(self):1124        test = make_failing_test()1125        self.result.startTestRun()1126        test.run(self.result)1127        self.result.stopTestRun()1128        self.assertThat(self.getvalue(),1129            DocTestMatches("...\nFAILED (failures=1)\n", doctest.ELLIPSIS))1130    def test_stopTestRun_not_successful_error(self):1131        test = make_erroring_test()1132        self.result.startTestRun()1133        test.run(self.result)1134        self.result.stopTestRun()1135        self.assertThat(self.getvalue(),1136            DocTestMatches("...\nFAILED (failures=1)\n", doctest.ELLIPSIS))1137    def test_stopTestRun_not_successful_unexpected_success(self):1138        test = make_unexpectedly_successful_test()1139        self.result.startTestRun()1140        test.run(self.result)1141        self.result.stopTestRun()1142        self.assertThat(self.getvalue(),1143            DocTestMatches("...\nFAILED (failures=1)\n", doctest.ELLIPSIS))1144    def test_stopTestRun_shows_details(self):1145        self.skip("Disabled per bug 1188420")1146        def run_tests():1147            self.result.startTestRun()1148            make_erroring_test().run(self.result)1149            make_unexpectedly_successful_test().run(self.result)1150            make_failing_test().run(self.result)1151            self.reset_output()1152            self.result.stopTestRun()1153        run_with_stack_hidden(True, run_tests)1154        self.assertThat(self.getvalue(),1155            DocTestMatches("""...======================================================================1156ERROR: testtools.tests.test_testresult.Test.error1157----------------------------------------------------------------------1158Traceback (most recent call last):1159  File "...testtools...tests...test_testresult.py", line ..., in error1160    1/01161ZeroDivisionError:... divi... by zero...1162======================================================================1163FAIL: testtools.tests.test_testresult.Test.failed1164----------------------------------------------------------------------1165Traceback (most recent call last):1166  File "...testtools...tests...test_testresult.py", line ..., in failed1167    self.fail("yo!")1168AssertionError: yo!1169======================================================================1170UNEXPECTED SUCCESS: testtools.tests.test_testresult.Test.succeeded1171----------------------------------------------------------------------1172...""", doctest.ELLIPSIS | doctest.REPORT_NDIFF))1173class TestThreadSafeForwardingResult(TestCase):1174    """Tests for `TestThreadSafeForwardingResult`."""1175    def make_results(self, n):1176        events = []1177        target = LoggingResult(events)1178        semaphore = threading.Semaphore(1)1179        return [1180            ThreadsafeForwardingResult(target, semaphore)1181            for i in range(n)], events1182    def test_nonforwarding_methods(self):1183        # startTest and stopTest are not forwarded because they need to be1184        # batched.1185        [result], events = self.make_results(1)1186        result.startTest(self)1187        result.stopTest(self)1188        self.assertEqual([], events)1189    def test_tags_not_forwarded(self):1190        # Tags need to be batched for each test, so they aren't forwarded1191        # until a test runs.1192        [result], events = self.make_results(1)1193        result.tags(set(['foo']), set(['bar']))1194        self.assertEqual([], events)1195    def test_global_tags_simple(self):1196        # Tags specified outside of a test result are global. When a test's1197        # results are finally forwarded, we send through these global tags1198        # *as* test specific tags, because as a multiplexer there should be no1199        # way for a global tag on an input stream to affect tests from other1200        # streams - we can just always issue test local tags.1201        [result], events = self.make_results(1)1202        result.tags(set(['foo']), set())1203        result.time(1)1204        result.startTest(self)1205        result.time(2)1206        result.addSuccess(self)1207        self.assertEqual(1208            [('time', 1),1209             ('startTest', self),1210             ('time', 2),1211             ('tags', set(['foo']), set()),1212             ('addSuccess', self),1213             ('stopTest', self),1214             ], events)1215    def test_global_tags_complex(self):1216        # Multiple calls to tags() in a global context are buffered until the1217        # next test completes and are issued as part of of the test context,1218        # because they cannot be issued until the output result is locked.1219        # The sample data shows them being merged together, this is, strictly1220        # speaking incidental - they could be issued separately (in-order) and1221        # still be legitimate.1222        [result], events = self.make_results(1)1223        result.tags(set(['foo', 'bar']), set(['baz', 'qux']))1224        result.tags(set(['cat', 'qux']), set(['bar', 'dog']))1225        result.time(1)1226        result.startTest(self)1227        result.time(2)1228        result.addSuccess(self)1229        self.assertEqual(1230            [('time', 1),1231             ('startTest', self),1232             ('time', 2),1233             ('tags', set(['cat', 'foo', 'qux']), set(['dog', 'bar', 'baz'])),1234             ('addSuccess', self),1235             ('stopTest', self),1236             ], events)1237    def test_local_tags(self):1238        # Any tags set within a test context are forwarded in that test1239        # context when the result is finally forwarded.  This means that the1240        # tags for the test are part of the atomic message communicating1241        # everything about that test.1242        [result], events = self.make_results(1)1243        result.time(1)1244        result.startTest(self)1245        result.tags(set(['foo']), set([]))1246        result.tags(set(), set(['bar']))1247        result.time(2)1248        result.addSuccess(self)1249        self.assertEqual(1250            [('time', 1),1251             ('startTest', self),1252             ('time', 2),1253             ('tags', set(['foo']), set(['bar'])),1254             ('addSuccess', self),1255             ('stopTest', self),1256             ], events)1257    def test_local_tags_dont_leak(self):1258        # A tag set during a test is local to that test and is not set during1259        # the tests that follow.1260        [result], events = self.make_results(1)1261        a, b = PlaceHolder('a'), PlaceHolder('b')1262        result.time(1)1263        result.startTest(a)1264        result.tags(set(['foo']), set([]))1265        result.time(2)1266        result.addSuccess(a)1267        result.stopTest(a)1268        result.time(3)1269        result.startTest(b)1270        result.time(4)1271        result.addSuccess(b)1272        result.stopTest(b)1273        self.assertEqual(1274            [('time', 1),1275             ('startTest', a),1276             ('time', 2),1277             ('tags', set(['foo']), set()),1278             ('addSuccess', a),1279             ('stopTest', a),1280             ('time', 3),1281             ('startTest', b),1282             ('time', 4),1283             ('addSuccess', b),1284             ('stopTest', b),1285             ], events)1286    def test_startTestRun(self):1287        # Calls to startTestRun are not batched, because we are only1288        # interested in sending tests atomically, not the whole run.1289        [result1, result2], events = self.make_results(2)1290        result1.startTestRun()1291        result2.startTestRun()1292        self.assertEqual(["startTestRun", "startTestRun"], events)1293    def test_stopTestRun(self):1294        # Calls to stopTestRun are not batched, because we are only1295        # interested in sending tests atomically, not the whole run.1296        [result1, result2], events = self.make_results(2)1297        result1.stopTestRun()1298        result2.stopTestRun()1299        self.assertEqual(["stopTestRun", "stopTestRun"], events)1300    def test_forward_addError(self):1301        # Once we receive an addError event, we forward all of the events for1302        # that test, as we now know that test is complete.1303        [result], events = self.make_results(1)1304        exc_info = make_exception_info(RuntimeError, 'error')1305        start_time = datetime.datetime.utcfromtimestamp(1.489)1306        end_time = datetime.datetime.utcfromtimestamp(51.476)1307        result.time(start_time)1308        result.startTest(self)1309        result.time(end_time)1310        result.addError(self, exc_info)1311        self.assertEqual([1312            ('time', start_time),1313            ('startTest', self),1314            ('time', end_time),1315            ('addError', self, exc_info),1316            ('stopTest', self),1317            ], events)1318    def test_forward_addFailure(self):1319        # Once we receive an addFailure event, we forward all of the events1320        # for that test, as we now know that test is complete.1321        [result], events = self.make_results(1)1322        exc_info = make_exception_info(AssertionError, 'failure')1323        start_time = datetime.datetime.utcfromtimestamp(2.489)1324        end_time = datetime.datetime.utcfromtimestamp(3.476)1325        result.time(start_time)1326        result.startTest(self)1327        result.time(end_time)1328        result.addFailure(self, exc_info)1329        self.assertEqual([1330            ('time', start_time),1331            ('startTest', self),1332            ('time', end_time),1333            ('addFailure', self, exc_info),1334            ('stopTest', self),1335            ], events)1336    def test_forward_addSkip(self):1337        # Once we receive an addSkip event, we forward all of the events for1338        # that test, as we now know that test is complete.1339        [result], events = self.make_results(1)1340        reason = _u("Skipped for some reason")1341        start_time = datetime.datetime.utcfromtimestamp(4.489)1342        end_time = datetime.datetime.utcfromtimestamp(5.476)1343        result.time(start_time)1344        result.startTest(self)1345        result.time(end_time)1346        result.addSkip(self, reason)1347        self.assertEqual([1348            ('time', start_time),1349            ('startTest', self),1350            ('time', end_time),1351            ('addSkip', self, reason),1352            ('stopTest', self),1353            ], events)1354    def test_forward_addSuccess(self):1355        # Once we receive an addSuccess event, we forward all of the events1356        # for that test, as we now know that test is complete.1357        [result], events = self.make_results(1)1358        start_time = datetime.datetime.utcfromtimestamp(6.489)1359        end_time = datetime.datetime.utcfromtimestamp(7.476)1360        result.time(start_time)1361        result.startTest(self)1362        result.time(end_time)1363        result.addSuccess(self)1364        self.assertEqual([1365            ('time', start_time),1366            ('startTest', self),1367            ('time', end_time),1368            ('addSuccess', self),1369            ('stopTest', self),1370            ], events)1371    def test_only_one_test_at_a_time(self):1372        # Even if there are multiple ThreadsafeForwardingResults forwarding to1373        # the same target result, the target result only receives the complete1374        # events for one test at a time.1375        [result1, result2], events = self.make_results(2)1376        test1, test2 = self, make_test()1377        start_time1 = datetime.datetime.utcfromtimestamp(1.489)1378        end_time1 = datetime.datetime.utcfromtimestamp(2.476)1379        start_time2 = datetime.datetime.utcfromtimestamp(3.489)1380        end_time2 = datetime.datetime.utcfromtimestamp(4.489)1381        result1.time(start_time1)1382        result2.time(start_time2)1383        result1.startTest(test1)1384        result2.startTest(test2)1385        result1.time(end_time1)1386        result2.time(end_time2)1387        result2.addSuccess(test2)1388        result1.addSuccess(test1)1389        self.assertEqual([1390            # test2 finishes first, and so is flushed first.1391            ('time', start_time2),1392            ('startTest', test2),1393            ('time', end_time2),1394            ('addSuccess', test2),1395            ('stopTest', test2),1396            # test1 finishes next, and thus follows.1397            ('time', start_time1),1398            ('startTest', test1),1399            ('time', end_time1),1400            ('addSuccess', test1),1401            ('stopTest', test1),1402            ], events)1403class TestMergeTags(TestCase):1404    def test_merge_unseen_gone_tag(self):1405        # If an incoming "gone" tag isn't currently tagged one way or the1406        # other, add it to the "gone" tags.1407        current_tags = set(['present']), set(['missing'])1408        changing_tags = set(), set(['going'])1409        expected = set(['present']), set(['missing', 'going'])1410        self.assertEqual(1411            expected, _merge_tags(current_tags, changing_tags))1412    def test_merge_incoming_gone_tag_with_current_new_tag(self):1413        # If one of the incoming "gone" tags is one of the existing "new"1414        # tags, then it overrides the "new" tag, leaving it marked as "gone".1415        current_tags = set(['present', 'going']), set(['missing'])1416        changing_tags = set(), set(['going'])1417        expected = set(['present']), set(['missing', 'going'])1418        self.assertEqual(1419            expected, _merge_tags(current_tags, changing_tags))1420    def test_merge_unseen_new_tag(self):1421        current_tags = set(['present']), set(['missing'])1422        changing_tags = set(['coming']), set()1423        expected = set(['coming', 'present']), set(['missing'])1424        self.assertEqual(1425            expected, _merge_tags(current_tags, changing_tags))1426    def test_merge_incoming_new_tag_with_current_gone_tag(self):1427        # If one of the incoming "new" tags is currently marked as "gone",1428        # then it overrides the "gone" tag, leaving it marked as "new".1429        current_tags = set(['present']), set(['coming', 'missing'])1430        changing_tags = set(['coming']), set()1431        expected = set(['coming', 'present']), set(['missing'])1432        self.assertEqual(1433            expected, _merge_tags(current_tags, changing_tags))1434class TestStreamResultRouter(TestCase):1435    def test_start_stop_test_run_no_fallback(self):1436        result = StreamResultRouter()1437        result.startTestRun()1438        result.stopTestRun()1439    def test_no_fallback_errors(self):1440        self.assertRaises(Exception, StreamResultRouter().status, test_id='f')1441    def test_fallback_calls(self):1442        fallback = LoggingStreamResult()1443        result = StreamResultRouter(fallback)1444        result.startTestRun()1445        result.status(test_id='foo')1446        result.stopTestRun()1447        self.assertEqual([1448            ('startTestRun',),1449            ('status', 'foo', None, None, True, None, None, False, None, None,1450             None),1451            ('stopTestRun',),1452            ],1453            fallback._events)1454    def test_fallback_no_do_start_stop_run(self):1455        fallback = LoggingStreamResult()1456        result = StreamResultRouter(fallback, do_start_stop_run=False)1457        result.startTestRun()1458        result.status(test_id='foo')1459        result.stopTestRun()1460        self.assertEqual([1461            ('status', 'foo', None, None, True, None, None, False, None, None,1462             None)1463            ],1464            fallback._events)1465    def test_add_rule_bad_policy(self):1466        router = StreamResultRouter()1467        target = LoggingStreamResult()1468        self.assertRaises(ValueError, router.add_rule, target, 'route_code_prefixa',1469            route_prefix='0')1470    def test_add_rule_extra_policy_arg(self):1471        router = StreamResultRouter()1472        target = LoggingStreamResult()1473        self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix',1474            route_prefix='0', foo=1)1475    def test_add_rule_missing_prefix(self):1476        router = StreamResultRouter()1477        target = LoggingStreamResult()1478        self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix')1479    def test_add_rule_slash_in_prefix(self):1480        router = StreamResultRouter()1481        target = LoggingStreamResult()1482        self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix',1483            route_prefix='0/')1484    def test_add_rule_route_code_consume_False(self):1485        fallback = LoggingStreamResult()1486        target = LoggingStreamResult()1487        router = StreamResultRouter(fallback)1488        router.add_rule(target, 'route_code_prefix', route_prefix='0')1489        router.status(test_id='foo', route_code='0')1490        router.status(test_id='foo', route_code='0/1')1491        router.status(test_id='foo')1492        self.assertEqual([1493            ('status', 'foo', None, None, True, None, None, False, None, '0',1494             None),1495            ('status', 'foo', None, None, True, None, None, False, None, '0/1',1496             None),1497            ],1498            target._events)1499        self.assertEqual([1500            ('status', 'foo', None, None, True, None, None, False, None, None,1501             None),1502            ],1503            fallback._events)1504    def test_add_rule_route_code_consume_True(self):1505        fallback = LoggingStreamResult()1506        target = LoggingStreamResult()1507        router = StreamResultRouter(fallback)1508        router.add_rule(1509            target, 'route_code_prefix', route_prefix='0', consume_route=True)1510        router.status(test_id='foo', route_code='0') # -> None1511        router.status(test_id='foo', route_code='0/1') # -> 11512        router.status(test_id='foo', route_code='1') # -> fallback as-is.1513        self.assertEqual([1514            ('status', 'foo', None, None, True, None, None, False, None, None,1515             None),1516            ('status', 'foo', None, None, True, None, None, False, None, '1',1517             None),1518            ],1519            target._events)1520        self.assertEqual([1521            ('status', 'foo', None, None, True, None, None, False, None, '1',1522             None),1523            ],1524            fallback._events)1525    def test_add_rule_test_id(self):1526        nontest = LoggingStreamResult()1527        test = LoggingStreamResult()1528        router = StreamResultRouter(test)1529        router.add_rule(nontest, 'test_id', test_id=None)1530        router.status(test_id='foo', file_name="bar", file_bytes=b'')1531        router.status(file_name="bar", file_bytes=b'')1532        self.assertEqual([1533            ('status', 'foo', None, None, True, 'bar', b'', False, None, None,1534             None),], test._events)1535        self.assertEqual([1536            ('status', None, None, None, True, 'bar', b'', False, None, None,1537             None),], nontest._events)1538    def test_add_rule_do_start_stop_run(self):1539        nontest = LoggingStreamResult()1540        router = StreamResultRouter()1541        router.add_rule(nontest, 'test_id', test_id=None, do_start_stop_run=True)1542        router.startTestRun()1543        router.stopTestRun()1544        self.assertEqual([1545            ('startTestRun',),1546            ('stopTestRun',),1547            ], nontest._events)1548    def test_add_rule_do_start_stop_run_after_startTestRun(self):1549        nontest = LoggingStreamResult()1550        router = StreamResultRouter()1551        router.startTestRun()1552        router.add_rule(nontest, 'test_id', test_id=None, do_start_stop_run=True)1553        router.stopTestRun()1554        self.assertEqual([1555            ('startTestRun',),1556            ('stopTestRun',),1557            ], nontest._events)1558class TestStreamToQueue(TestCase):1559    def make_result(self):1560        queue = Queue()1561        return queue, StreamToQueue(queue, "foo")1562    def test_status(self):1563        def check_event(event_dict, route=None, time=None):1564            self.assertEqual("status", event_dict['event'])1565            self.assertEqual("test", event_dict['test_id'])1566            self.assertEqual("fail", event_dict['test_status'])1567            self.assertEqual(set(["quux"]), event_dict['test_tags'])1568            self.assertEqual(False, event_dict['runnable'])1569            self.assertEqual("file", event_dict['file_name'])1570            self.assertEqual(_b("content"), event_dict['file_bytes'])1571            self.assertEqual(True, event_dict['eof'])1572            self.assertEqual("quux", event_dict['mime_type'])1573            self.assertEqual("test", event_dict['test_id'])1574            self.assertEqual(route, event_dict['route_code'])1575            self.assertEqual(time, event_dict['timestamp'])1576        queue, result = self.make_result()1577        result.status("test", "fail", test_tags=set(["quux"]), runnable=False,1578            file_name="file", file_bytes=_b("content"), eof=True,1579            mime_type="quux", route_code=None, timestamp=None)1580        self.assertEqual(1, queue.qsize())1581        a_time = datetime.datetime.now(utc)1582        result.status("test", "fail", test_tags=set(["quux"]), runnable=False,1583            file_name="file", file_bytes=_b("content"), eof=True,1584            mime_type="quux", route_code="bar", timestamp=a_time)1585        self.assertEqual(2, queue.qsize())1586        check_event(queue.get(False), route="foo", time=None)1587        check_event(queue.get(False), route="foo/bar", time=a_time)1588    def testStartTestRun(self):1589        queue, result = self.make_result()1590        result.startTestRun()1591        self.assertEqual(1592            {'event':'startTestRun', 'result':result}, queue.get(False))1593        self.assertTrue(queue.empty())1594    def testStopTestRun(self):1595        queue, result = self.make_result()1596        result.stopTestRun()1597        self.assertEqual(1598            {'event':'stopTestRun', 'result':result}, queue.get(False))1599        self.assertTrue(queue.empty())1600class TestExtendedToOriginalResultDecoratorBase(TestCase):1601    def make_26_result(self):1602        self.result = Python26TestResult()1603        self.make_converter()1604    def make_27_result(self):1605        self.result = Python27TestResult()1606        self.make_converter()1607    def make_converter(self):1608        self.converter = ExtendedToOriginalDecorator(self.result)1609    def make_extended_result(self):1610        self.result = ExtendedTestResult()1611        self.make_converter()1612    def check_outcome_details(self, outcome):1613        """Call an outcome with a details dict to be passed through."""1614        # This dict is /not/ convertible - thats deliberate, as it should1615        # not hit the conversion code path.1616        details = {'foo': 'bar'}1617        getattr(self.converter, outcome)(self, details=details)1618        self.assertEqual([(outcome, self, details)], self.result._events)1619    def get_details_and_string(self):1620        """Get a details dict and expected string."""1621        text1 = lambda: [_b("1\n2\n")]1622        text2 = lambda: [_b("3\n4\n")]1623        bin1 = lambda: [_b("5\n")]1624        details = {'text 1': Content(ContentType('text', 'plain'), text1),1625            'text 2': Content(ContentType('text', 'strange'), text2),1626            'bin 1': Content(ContentType('application', 'binary'), bin1)}1627        return (details,1628                ("Binary content:\n"1629                 "  bin 1 (application/binary)\n"1630                 "\n"1631                 "text 1: {{{\n"1632                 "1\n"1633                 "2\n"1634                 "}}}\n"1635                 "\n"1636                 "text 2: {{{\n"1637                 "3\n"1638                 "4\n"1639                 "}}}\n"))1640    def check_outcome_details_to_exec_info(self, outcome, expected=None):1641        """Call an outcome with a details dict to be made into exc_info."""1642        # The conversion is a done using RemoteError and the string contents1643        # of the text types in the details dict.1644        if not expected:1645            expected = outcome1646        details, err_str = self.get_details_and_string()1647        getattr(self.converter, outcome)(self, details=details)1648        err = self.converter._details_to_exc_info(details)1649        self.assertEqual([(expected, self, err)], self.result._events)1650    def check_outcome_details_to_nothing(self, outcome, expected=None):1651        """Call an outcome with a details dict to be swallowed."""1652        if not expected:1653            expected = outcome1654        details = {'foo': 'bar'}1655        getattr(self.converter, outcome)(self, details=details)1656        self.assertEqual([(expected, self)], self.result._events)1657    def check_outcome_details_to_string(self, outcome):1658        """Call an outcome with a details dict to be stringified."""1659        details, err_str = self.get_details_and_string()1660        getattr(self.converter, outcome)(self, details=details)1661        self.assertEqual([(outcome, self, err_str)], self.result._events)1662    def check_outcome_details_to_arg(self, outcome, arg, extra_detail=None):1663        """Call an outcome with a details dict to have an arg extracted."""1664        details, _ = self.get_details_and_string()1665        if extra_detail:1666            details.update(extra_detail)1667        getattr(self.converter, outcome)(self, details=details)1668        self.assertEqual([(outcome, self, arg)], self.result._events)1669    def check_outcome_exc_info(self, outcome, expected=None):1670        """Check that calling a legacy outcome still works."""1671        # calling some outcome with the legacy exc_info style api (no keyword1672        # parameters) gets passed through.1673        if not expected:1674            expected = outcome1675        err = sys.exc_info()1676        getattr(self.converter, outcome)(self, err)1677        self.assertEqual([(expected, self, err)], self.result._events)1678    def check_outcome_exc_info_to_nothing(self, outcome, expected=None):1679        """Check that calling a legacy outcome on a fallback works."""1680        # calling some outcome with the legacy exc_info style api (no keyword1681        # parameters) gets passed through.1682        if not expected:1683            expected = outcome1684        err = sys.exc_info()1685        getattr(self.converter, outcome)(self, err)1686        self.assertEqual([(expected, self)], self.result._events)1687    def check_outcome_nothing(self, outcome, expected=None):1688        """Check that calling a legacy outcome still works."""1689        if not expected:1690            expected = outcome1691        getattr(self.converter, outcome)(self)1692        self.assertEqual([(expected, self)], self.result._events)1693    def check_outcome_string_nothing(self, outcome, expected):1694        """Check that calling outcome with a string calls expected."""1695        getattr(self.converter, outcome)(self, "foo")1696        self.assertEqual([(expected, self)], self.result._events)1697    def check_outcome_string(self, outcome):1698        """Check that calling outcome with a string works."""1699        getattr(self.converter, outcome)(self, "foo")1700        self.assertEqual([(outcome, self, "foo")], self.result._events)1701class TestExtendedToOriginalResultDecorator(1702    TestExtendedToOriginalResultDecoratorBase):1703    def test_failfast_py26(self):1704        self.make_26_result()1705        self.assertEqual(False, self.converter.failfast)1706        self.converter.failfast = True1707        self.assertFalse(safe_hasattr(self.converter.decorated, 'failfast'))1708    def test_failfast_py27(self):1709        self.make_27_result()1710        self.assertEqual(False, self.converter.failfast)1711        # setting it should write it to the backing result1712        self.converter.failfast = True1713        self.assertEqual(True, self.converter.decorated.failfast)1714    def test_progress_py26(self):1715        self.make_26_result()1716        self.converter.progress(1, 2)1717    def test_progress_py27(self):1718        self.make_27_result()1719        self.converter.progress(1, 2)1720    def test_progress_pyextended(self):1721        self.make_extended_result()1722        self.converter.progress(1, 2)1723        self.assertEqual([('progress', 1, 2)], self.result._events)1724    def test_shouldStop(self):1725        self.make_26_result()1726        self.assertEqual(False, self.converter.shouldStop)1727        self.converter.decorated.stop()1728        self.assertEqual(True, self.converter.shouldStop)1729    def test_startTest_py26(self):1730        self.make_26_result()1731        self.converter.startTest(self)1732        self.assertEqual([('startTest', self)], self.result._events)1733    def test_startTest_py27(self):1734        self.make_27_result()1735        self.converter.startTest(self)1736        self.assertEqual([('startTest', self)], self.result._events)1737    def test_startTest_pyextended(self):1738        self.make_extended_result()1739        self.converter.startTest(self)1740        self.assertEqual([('startTest', self)], self.result._events)1741    def test_startTestRun_py26(self):1742        self.make_26_result()1743        self.converter.startTestRun()1744        self.assertEqual([], self.result._events)1745    def test_startTestRun_py27(self):1746        self.make_27_result()1747        self.converter.startTestRun()1748        self.assertEqual([('startTestRun',)], self.result._events)1749    def test_startTestRun_pyextended(self):1750        self.make_extended_result()1751        self.converter.startTestRun()1752        self.assertEqual([('startTestRun',)], self.result._events)1753    def test_stopTest_py26(self):1754        self.make_26_result()1755        self.converter.stopTest(self)1756        self.assertEqual([('stopTest', self)], self.result._events)1757    def test_stopTest_py27(self):1758        self.make_27_result()1759        self.converter.stopTest(self)1760        self.assertEqual([('stopTest', self)], self.result._events)1761    def test_stopTest_pyextended(self):1762        self.make_extended_result()1763        self.converter.stopTest(self)1764        self.assertEqual([('stopTest', self)], self.result._events)1765    def test_stopTestRun_py26(self):1766        self.make_26_result()1767        self.converter.stopTestRun()1768        self.assertEqual([], self.result._events)1769    def test_stopTestRun_py27(self):1770        self.make_27_result()1771        self.converter.stopTestRun()1772        self.assertEqual([('stopTestRun',)], self.result._events)1773    def test_stopTestRun_pyextended(self):1774        self.make_extended_result()1775        self.converter.stopTestRun()1776        self.assertEqual([('stopTestRun',)], self.result._events)1777    def test_tags_py26(self):1778        self.make_26_result()1779        self.converter.tags(set([1]), set([2]))1780    def test_tags_py27(self):1781        self.make_27_result()1782        self.converter.tags(set([1]), set([2]))1783    def test_tags_pyextended(self):1784        self.make_extended_result()1785        self.converter.tags(set([1]), set([2]))1786        self.assertEqual([('tags', set([1]), set([2]))], self.result._events)1787    def test_time_py26(self):1788        self.make_26_result()1789        self.converter.time(1)1790    def test_time_py27(self):1791        self.make_27_result()1792        self.converter.time(1)1793    def test_time_pyextended(self):1794        self.make_extended_result()1795        self.converter.time(1)1796        self.assertEqual([('time', 1)], self.result._events)1797class TestExtendedToOriginalAddError(TestExtendedToOriginalResultDecoratorBase):1798    outcome = 'addError'1799    def test_outcome_Original_py26(self):1800        self.make_26_result()1801        self.check_outcome_exc_info(self.outcome)1802    def test_outcome_Original_py27(self):1803        self.make_27_result()1804        self.check_outcome_exc_info(self.outcome)1805    def test_outcome_Original_pyextended(self):1806        self.make_extended_result()1807        self.check_outcome_exc_info(self.outcome)1808    def test_outcome_Extended_py26(self):1809        self.make_26_result()1810        self.check_outcome_details_to_exec_info(self.outcome)1811    def test_outcome_Extended_py27(self):1812        self.make_27_result()1813        self.check_outcome_details_to_exec_info(self.outcome)1814    def test_outcome_Extended_pyextended(self):1815        self.make_extended_result()1816        self.check_outcome_details(self.outcome)1817    def test_outcome__no_details(self):1818        self.make_extended_result()1819        self.assertThat(1820            lambda: getattr(self.converter, self.outcome)(self),1821            Raises(MatchesException(ValueError)))1822class TestExtendedToOriginalAddFailure(1823    TestExtendedToOriginalAddError):1824    outcome = 'addFailure'1825class TestExtendedToOriginalAddExpectedFailure(1826    TestExtendedToOriginalAddError):1827    outcome = 'addExpectedFailure'1828    def test_outcome_Original_py26(self):1829        self.make_26_result()1830        self.check_outcome_exc_info_to_nothing(self.outcome, 'addSuccess')1831    def test_outcome_Extended_py26(self):1832        self.make_26_result()1833        self.check_outcome_details_to_nothing(self.outcome, 'addSuccess')1834class TestExtendedToOriginalAddSkip(1835    TestExtendedToOriginalResultDecoratorBase):1836    outcome = 'addSkip'1837    def test_outcome_Original_py26(self):1838        self.make_26_result()1839        self.check_outcome_string_nothing(self.outcome, 'addSuccess')1840    def test_outcome_Original_py27(self):1841        self.make_27_result()1842        self.check_outcome_string(self.outcome)1843    def test_outcome_Original_pyextended(self):1844        self.make_extended_result()1845        self.check_outcome_string(self.outcome)1846    def test_outcome_Extended_py26(self):1847        self.make_26_result()1848        self.check_outcome_string_nothing(self.outcome, 'addSuccess')1849    def test_outcome_Extended_py27_no_reason(self):1850        self.make_27_result()1851        self.check_outcome_details_to_string(self.outcome)1852    def test_outcome_Extended_py27_reason(self):1853        self.make_27_result()1854        self.check_outcome_details_to_arg(self.outcome, 'foo',1855            {'reason': Content(UTF8_TEXT, lambda:[_b('foo')])})1856    def test_outcome_Extended_pyextended(self):1857        self.make_extended_result()1858        self.check_outcome_details(self.outcome)1859    def test_outcome__no_details(self):1860        self.make_extended_result()1861        self.assertThat(1862            lambda: getattr(self.converter, self.outcome)(self),1863            Raises(MatchesException(ValueError)))1864class TestExtendedToOriginalAddSuccess(1865    TestExtendedToOriginalResultDecoratorBase):1866    outcome = 'addSuccess'1867    expected = 'addSuccess'1868    def test_outcome_Original_py26(self):1869        self.make_26_result()1870        self.check_outcome_nothing(self.outcome, self.expected)1871    def test_outcome_Original_py27(self):1872        self.make_27_result()1873        self.check_outcome_nothing(self.outcome)1874    def test_outcome_Original_pyextended(self):1875        self.make_extended_result()1876        self.check_outcome_nothing(self.outcome)1877    def test_outcome_Extended_py26(self):1878        self.make_26_result()1879        self.check_outcome_details_to_nothing(self.outcome, self.expected)1880    def test_outcome_Extended_py27(self):1881        self.make_27_result()1882        self.check_outcome_details_to_nothing(self.outcome)1883    def test_outcome_Extended_pyextended(self):1884        self.make_extended_result()1885        self.check_outcome_details(self.outcome)1886class TestExtendedToOriginalAddUnexpectedSuccess(1887    TestExtendedToOriginalResultDecoratorBase):1888    outcome = 'addUnexpectedSuccess'1889    expected = 'addFailure'1890    def test_outcome_Original_py26(self):1891        self.make_26_result()1892        getattr(self.converter, self.outcome)(self)1893        [event] = self.result._events1894        self.assertEqual((self.expected, self), event[:2])1895    def test_outcome_Original_py27(self):1896        self.make_27_result()1897        self.check_outcome_nothing(self.outcome)1898    def test_outcome_Original_pyextended(self):1899        self.make_extended_result()1900        self.check_outcome_nothing(self.outcome)1901    def test_outcome_Extended_py26(self):1902        self.make_26_result()1903        getattr(self.converter, self.outcome)(self)1904        [event] = self.result._events1905        self.assertEqual((self.expected, self), event[:2])1906    def test_outcome_Extended_py27(self):1907        self.make_27_result()1908        self.check_outcome_details_to_nothing(self.outcome)1909    def test_outcome_Extended_pyextended(self):1910        self.make_extended_result()1911        self.check_outcome_details(self.outcome)1912class TestExtendedToOriginalResultOtherAttributes(1913    TestExtendedToOriginalResultDecoratorBase):1914    def test_other_attribute(self):1915        class OtherExtendedResult:1916            def foo(self):1917                return 21918            bar = 11919        self.result = OtherExtendedResult()1920        self.make_converter()1921        self.assertEqual(1, self.converter.bar)1922        self.assertEqual(2, self.converter.foo())1923class TestNonAsciiResults(TestCase):1924    """Test all kinds of tracebacks are cleanly interpreted as unicode1925    Currently only uses weak "contains" assertions, would be good to be much1926    stricter about the expected output. This would add a few failures for the1927    current release of IronPython for instance, which gets some traceback1928    lines muddled.1929    """1930    _sample_texts = (1931        _u("pa\u026a\u03b8\u0259n"), # Unicode encodings only1932        _u("\u5357\u7121"), # In ISO 2022 encodings1933        _u("\xa7\xa7\xa7"), # In ISO 8859 encodings1934        )1935    _is_pypy = "__pypy__" in sys.builtin_module_names1936    # Everything but Jython shows syntax errors on the current character1937    _error_on_character = os.name != "java" and not _is_pypy1938    def _run(self, stream, test):1939        """Run the test, the same as in testtools.run but not to stdout"""1940        result = TextTestResult(stream)1941        result.startTestRun()1942        try:1943            return test.run(result)1944        finally:1945            result.stopTestRun()1946    def _write_module(self, name, encoding, contents):1947        """Create Python module on disk with contents in given encoding"""1948        try:1949            # Need to pre-check that the coding is valid or codecs.open drops1950            # the file without closing it which breaks non-refcounted pythons1951            codecs.lookup(encoding)1952        except LookupError:1953            self.skip("Encoding unsupported by implementation: %r" % encoding)1954        f = codecs.open(os.path.join(self.dir, name + ".py"), "w", encoding)1955        try:1956            f.write(contents)1957        finally:1958            f.close()1959    def _test_external_case(self, testline, coding="ascii", modulelevel="",1960            suffix=""):1961        """Create and run a test case in a seperate module"""1962        self._setup_external_case(testline, coding, modulelevel, suffix)1963        return self._run_external_case()1964    def _setup_external_case(self, testline, coding="ascii", modulelevel="",1965            suffix=""):1966        """Create a test case in a seperate module"""1967        _, prefix, self.modname = self.id().rsplit(".", 2)1968        self.dir = tempfile.mkdtemp(prefix=prefix, suffix=suffix)1969        self.addCleanup(shutil.rmtree, self.dir)1970        self._write_module(self.modname, coding,1971            # Older Python 2 versions don't see a coding declaration in a1972            # docstring so it has to be in a comment, but then we can't1973            # workaround bug: <http://ironpython.codeplex.com/workitem/26940>1974            "# coding: %s\n"1975            "import testtools\n"1976            "%s\n"1977            "class Test(testtools.TestCase):\n"1978            "    def runTest(self):\n"1979            "        %s\n" % (coding, modulelevel, testline))1980    def _run_external_case(self):1981        """Run the prepared test case in a seperate module"""1982        sys.path.insert(0, self.dir)1983        self.addCleanup(sys.path.remove, self.dir)1984        module = __import__(self.modname)1985        self.addCleanup(sys.modules.pop, self.modname)1986        stream = StringIO()1987        self._run(stream, module.Test())1988        return stream.getvalue()1989    def _silence_deprecation_warnings(self):1990        """Shut up DeprecationWarning for this test only"""1991        warnings.simplefilter("ignore", DeprecationWarning)1992        self.addCleanup(warnings.filters.remove, warnings.filters[0])1993    def _get_sample_text(self, encoding="unicode_internal"):1994        if encoding is None and str_is_unicode:1995           encoding = "unicode_internal"1996        for u in self._sample_texts:1997            try:1998                b = u.encode(encoding)1999                if u == b.decode(encoding):2000                   if str_is_unicode:2001                       return u, u2002                   return u, b2003            except (LookupError, UnicodeError):2004                pass2005        self.skip("Could not find a sample text for encoding: %r" % encoding)2006    def _as_output(self, text):2007        return text2008    def test_non_ascii_failure_string(self):2009        """Assertion contents can be non-ascii and should get decoded"""2010        text, raw = self._get_sample_text(_get_exception_encoding())2011        textoutput = self._test_external_case("self.fail(%s)" % _r(raw))2012        self.assertIn(self._as_output(text), textoutput)2013    def test_non_ascii_failure_string_via_exec(self):2014        """Assertion via exec can be non-ascii and still gets decoded"""2015        text, raw = self._get_sample_text(_get_exception_encoding())2016        textoutput = self._test_external_case(2017            testline='exec ("self.fail(%s)")' % _r(raw))2018        self.assertIn(self._as_output(text), textoutput)2019    def test_control_characters_in_failure_string(self):2020        """Control characters in assertions should be escaped"""2021        textoutput = self._test_external_case("self.fail('\\a\\a\\a')")2022        self.expectFailure("Defense against the beeping horror unimplemented",2023            self.assertNotIn, self._as_output("\a\a\a"), textoutput)2024        self.assertIn(self._as_output(_u("\uFFFD\uFFFD\uFFFD")), textoutput)2025    def _local_os_error_matcher(self):2026        if sys.version_info > (3, 3):2027            return MatchesAny(Contains("FileExistsError: "),2028                              Contains("PermissionError: "))2029        elif os.name != "nt" or sys.version_info < (2, 5):2030            return Contains(self._as_output("OSError: "))2031        else:2032            return Contains(self._as_output("WindowsError: "))2033    def test_os_error(self):2034        """Locale error messages from the OS shouldn't break anything"""2035        textoutput = self._test_external_case(2036            modulelevel="import os",2037            testline="os.mkdir('/')")2038        self.assertThat(textoutput, self._local_os_error_matcher())2039    def test_assertion_text_shift_jis(self):2040        """A terminal raw backslash in an encoded string is weird but fine"""2041        example_text = _u("\u5341")2042        textoutput = self._test_external_case(2043            coding="shift_jis",2044            testline="self.fail('%s')" % example_text)2045        if str_is_unicode:2046            output_text = example_text2047        else:2048            output_text = example_text.encode("shift_jis").decode(2049                _get_exception_encoding(), "replace")2050        self.assertIn(self._as_output("AssertionError: %s" % output_text),2051            textoutput)2052    def test_file_comment_iso2022_jp(self):2053        """Control character escapes must be preserved if valid encoding"""2054        example_text, _ = self._get_sample_text("iso2022_jp")2055        textoutput = self._test_external_case(2056            coding="iso2022_jp",2057            testline="self.fail('Simple') # %s" % example_text)2058        self.assertIn(self._as_output(example_text), textoutput)2059    def test_unicode_exception(self):2060        """Exceptions that can be formated losslessly as unicode should be"""2061        example_text, _ = self._get_sample_text()2062        exception_class = (2063            "class FancyError(Exception):\n"2064            # A __unicode__ method does nothing on py3k but the default works2065            "    def __unicode__(self):\n"2066            "        return self.args[0]\n")2067        textoutput = self._test_external_case(2068            modulelevel=exception_class,2069            testline="raise FancyError(%s)" % _r(example_text))2070        self.assertIn(self._as_output(example_text), textoutput)2071    def test_unprintable_exception(self):2072        """A totally useless exception instance still prints something"""2073        exception_class = (2074            "class UnprintableError(Exception):\n"2075            "    def __str__(self):\n"2076            "        raise RuntimeError\n"2077            "    def __unicode__(self):\n"2078            "        raise RuntimeError\n"2079            "    def __repr__(self):\n"2080            "        raise RuntimeError\n")2081        textoutput = self._test_external_case(2082            modulelevel=exception_class,2083            testline="raise UnprintableError")2084        self.assertIn(self._as_output(2085            "UnprintableError: <unprintable UnprintableError object>\n"),2086            textoutput)2087    def test_string_exception(self):2088        """Raise a string rather than an exception instance if supported"""2089        if sys.version_info > (2, 6):2090            self.skip("No string exceptions in Python 2.6 or later")2091        elif sys.version_info > (2, 5):2092            self._silence_deprecation_warnings()2093        textoutput = self._test_external_case(testline="raise 'plain str'")2094        self.assertIn(self._as_output("\nplain str\n"), textoutput)2095    def test_non_ascii_dirname(self):2096        """Script paths in the traceback can be non-ascii"""2097        text, raw = self._get_sample_text(sys.getfilesystemencoding())2098        textoutput = self._test_external_case(2099            # Avoid bug in Python 3 by giving a unicode source encoding rather2100            # than just ascii which raises a SyntaxError with no other details2101            coding="utf-8",2102            testline="self.fail('Simple')",2103            suffix=raw)2104        self.assertIn(self._as_output(text), textoutput)2105    def test_syntax_error(self):2106        """Syntax errors should still have fancy special-case formatting"""2107        textoutput = self._test_external_case("exec ('f(a, b c)')")2108        self.assertIn(self._as_output(2109            '  File "<string>", line 1\n'2110            '    f(a, b c)\n'2111            + ' ' * self._error_on_character +2112            '          ^\n'2113            'SyntaxError: '2114            ), textoutput)2115    def test_syntax_error_malformed(self):2116        """Syntax errors with bogus parameters should break anything"""2117        textoutput = self._test_external_case("raise SyntaxError(3, 2, 1)")2118        self.assertIn(self._as_output("\nSyntaxError: "), textoutput)2119    def test_syntax_error_import_binary(self):2120        """Importing a binary file shouldn't break SyntaxError formatting"""2121        if sys.version_info < (2, 5):2122            # Python 2.4 assumes the file is latin-1 and tells you off2123            self._silence_deprecation_warnings()2124        self._setup_external_case("import bad")2125        f = open(os.path.join(self.dir, "bad.py"), "wb")2126        try:2127            f.write(_b("x\x9c\xcb*\xcd\xcb\x06\x00\x04R\x01\xb9"))2128        finally:2129            f.close()2130        textoutput = self._run_external_case()2131        matches_error = MatchesAny(2132            Contains('\nTypeError: '), Contains('\nSyntaxError: '))2133        self.assertThat(textoutput, matches_error)2134    def test_syntax_error_line_iso_8859_1(self):2135        """Syntax error on a latin-1 line shows the line decoded"""2136        text, raw = self._get_sample_text("iso-8859-1")2137        textoutput = self._setup_external_case("import bad")2138        self._write_module("bad", "iso-8859-1",2139            "# coding: iso-8859-1\n! = 0 # %s\n" % text)2140        textoutput = self._run_external_case()2141        self.assertIn(self._as_output(_u(2142            #'bad.py", line 2\n'2143            '    ! = 0 # %s\n'2144            '    ^\n'2145            'SyntaxError: ') %2146            (text,)), textoutput)2147    def test_syntax_error_line_iso_8859_5(self):2148        """Syntax error on a iso-8859-5 line shows the line decoded"""2149        text, raw = self._get_sample_text("iso-8859-5")2150        textoutput = self._setup_external_case("import bad")2151        self._write_module("bad", "iso-8859-5",2152            "# coding: iso-8859-5\n%% = 0 # %s\n" % text)2153        textoutput = self._run_external_case()2154        self.assertIn(self._as_output(_u(2155            #'bad.py", line 2\n'2156            '    %% = 0 # %s\n'2157            + ' ' * self._error_on_character +2158            '   ^\n'2159            'SyntaxError: ') %2160            (text,)), textoutput)2161    def test_syntax_error_line_euc_jp(self):2162        """Syntax error on a euc_jp line shows the line decoded"""2163        text, raw = self._get_sample_text("euc_jp")2164        textoutput = self._setup_external_case("import bad")2165        self._write_module("bad", "euc_jp",2166            "# coding: euc_jp\n$ = 0 # %s\n" % text)2167        textoutput = self._run_external_case()2168        # pypy uses cpython's multibyte codecs so has their behavior here2169        if self._is_pypy:2170            self._error_on_character = True2171        self.assertIn(self._as_output(_u(2172            #'bad.py", line 2\n'2173            '    $ = 0 # %s\n'2174            + ' ' * self._error_on_character +2175            '   ^\n'2176            'SyntaxError: ') %2177            (text,)), textoutput)2178    def test_syntax_error_line_utf_8(self):2179        """Syntax error on a utf-8 line shows the line decoded"""2180        text, raw = self._get_sample_text("utf-8")2181        textoutput = self._setup_external_case("import bad")2182        self._write_module("bad", "utf-8", _u("\ufeff^ = 0 # %s\n") % text)2183        textoutput = self._run_external_case()2184        self.assertIn(self._as_output(_u(2185            'bad.py", line 1\n'2186            '    ^ = 0 # %s\n'2187            + ' ' * self._error_on_character +2188            '   ^\n'2189            'SyntaxError: ') %2190            text), textoutput)2191class TestNonAsciiResultsWithUnittest(TestNonAsciiResults):2192    """Test that running under unittest produces clean ascii strings"""2193    def _run(self, stream, test):2194        from unittest import TextTestRunner as _Runner2195        return _Runner(stream).run(test)2196    def _as_output(self, text):2197        if str_is_unicode:2198            return text2199        return text.encode("utf-8")2200class TestDetailsToStr(TestCase):2201    def test_no_details(self):2202        string = _details_to_str({})2203        self.assertThat(string, Equals(''))2204    def test_binary_content(self):2205        content = content_from_stream(2206            StringIO('foo'), content_type=ContentType('image', 'jpeg'))2207        string = _details_to_str({'attachment': content})2208        self.assertThat(2209            string, Equals("""\2210Binary content:2211  attachment (image/jpeg)2212"""))2213    def test_single_line_content(self):2214        content = text_content('foo')2215        string = _details_to_str({'attachment': content})2216        self.assertThat(string, Equals('attachment: {{{foo}}}\n'))2217    def test_multi_line_text_content(self):2218        content = text_content('foo\nbar\nbaz')2219        string = _details_to_str({'attachment': content})2220        self.assertThat(string, Equals('attachment: {{{\nfoo\nbar\nbaz\n}}}\n'))2221    def test_special_text_content(self):2222        content = text_content('foo')2223        string = _details_to_str({'attachment': content}, special='attachment')2224        self.assertThat(string, Equals('foo\n'))2225    def test_multiple_text_content(self):2226        string = _details_to_str(2227            {'attachment': text_content('foo\nfoo'),2228             'attachment-1': text_content('bar\nbar')})2229        self.assertThat(2230            string, Equals('attachment: {{{\n'2231                           'foo\n'2232                           'foo\n'2233                           '}}}\n'2234                           '\n'2235                           'attachment-1: {{{\n'2236                           'bar\n'2237                           'bar\n'2238                           '}}}\n'))2239    def test_empty_attachment(self):2240        string = _details_to_str({'attachment': text_content('')})2241        self.assertThat(2242            string, Equals("""\2243Empty attachments:2244  attachment2245"""))2246    def test_lots_of_different_attachments(self):2247        jpg = lambda x: content_from_stream(2248            StringIO(x), ContentType('image', 'jpeg'))2249        attachments = {2250            'attachment': text_content('foo'),2251            'attachment-1': text_content('traceback'),2252            'attachment-2': jpg('pic1'),2253            'attachment-3': text_content('bar'),2254            'attachment-4': text_content(''),2255            'attachment-5': jpg('pic2'),2256            }2257        string = _details_to_str(attachments, special='attachment-1')2258        self.assertThat(2259            string, Equals("""\2260Binary content:2261  attachment-2 (image/jpeg)2262  attachment-5 (image/jpeg)2263Empty attachments:2264  attachment-42265attachment: {{{foo}}}2266attachment-3: {{{bar}}}2267traceback2268"""))2269class TestByTestResultTests(TestCase):2270    def setUp(self):2271        super(TestByTestResultTests, self).setUp()2272        self.log = []2273        self.result = TestByTestResult(self.on_test)2274        now = iter(range(5))2275        self.result._now = lambda: advance_iterator(now)2276    def assertCalled(self, **kwargs):2277        defaults = {2278            'test': self,2279            'tags': set(),2280            'details': None,2281            'start_time': 0,2282            'stop_time': 1,2283            }2284        defaults.update(kwargs)2285        self.assertEqual([defaults], self.log)2286    def on_test(self, **kwargs):2287        self.log.append(kwargs)2288    def test_no_tests_nothing_reported(self):2289        self.result.startTestRun()2290        self.result.stopTestRun()2291        self.assertEqual([], self.log)2292    def test_add_success(self):2293        self.result.startTest(self)2294        self.result.addSuccess(self)2295        self.result.stopTest(self)2296        self.assertCalled(status='success')2297    def test_add_success_details(self):2298        self.result.startTest(self)2299        details = {'foo': 'bar'}2300        self.result.addSuccess(self, details=details)2301        self.result.stopTest(self)2302        self.assertCalled(status='success', details=details)2303    def test_global_tags(self):2304        self.result.tags(['foo'], [])2305        self.result.startTest(self)2306        self.result.addSuccess(self)2307        self.result.stopTest(self)2308        self.assertCalled(status='success', tags=set(['foo']))2309    def test_local_tags(self):2310        self.result.tags(['foo'], [])2311        self.result.startTest(self)2312        self.result.tags(['bar'], [])2313        self.result.addSuccess(self)2314        self.result.stopTest(self)2315        self.assertCalled(status='success', tags=set(['foo', 'bar']))2316    def test_add_error(self):2317        self.result.startTest(self)2318        try:2319            1/02320        except ZeroDivisionError:2321            error = sys.exc_info()2322        self.result.addError(self, error)2323        self.result.stopTest(self)2324        self.assertCalled(2325            status='error',2326            details={'traceback': TracebackContent(error, self)})2327    def test_add_error_details(self):2328        self.result.startTest(self)2329        details = {"foo": text_content("bar")}2330        self.result.addError(self, details=details)2331        self.result.stopTest(self)2332        self.assertCalled(status='error', details=details)2333    def test_add_failure(self):2334        self.result.startTest(self)2335        try:2336            self.fail("intentional failure")2337        except self.failureException:2338            failure = sys.exc_info()2339        self.result.addFailure(self, failure)2340        self.result.stopTest(self)2341        self.assertCalled(2342            status='failure',2343            details={'traceback': TracebackContent(failure, self)})2344    def test_add_failure_details(self):2345        self.result.startTest(self)2346        details = {"foo": text_content("bar")}2347        self.result.addFailure(self, details=details)2348        self.result.stopTest(self)2349        self.assertCalled(status='failure', details=details)2350    def test_add_xfail(self):2351        self.result.startTest(self)2352        try:2353            1/02354        except ZeroDivisionError:2355            error = sys.exc_info()2356        self.result.addExpectedFailure(self, error)2357        self.result.stopTest(self)2358        self.assertCalled(2359            status='xfail',2360            details={'traceback': TracebackContent(error, self)})2361    def test_add_xfail_details(self):2362        self.result.startTest(self)2363        details = {"foo": text_content("bar")}2364        self.result.addExpectedFailure(self, details=details)2365        self.result.stopTest(self)2366        self.assertCalled(status='xfail', details=details)2367    def test_add_unexpected_success(self):2368        self.result.startTest(self)2369        details = {'foo': 'bar'}2370        self.result.addUnexpectedSuccess(self, details=details)2371        self.result.stopTest(self)2372        self.assertCalled(status='success', details=details)2373    def test_add_skip_reason(self):2374        self.result.startTest(self)2375        reason = self.getUniqueString()2376        self.result.addSkip(self, reason)2377        self.result.stopTest(self)2378        self.assertCalled(2379            status='skip', details={'reason': text_content(reason)})2380    def test_add_skip_details(self):2381        self.result.startTest(self)2382        details = {'foo': 'bar'}2383        self.result.addSkip(self, details=details)2384        self.result.stopTest(self)2385        self.assertCalled(status='skip', details=details)2386    def test_twice(self):2387        self.result.startTest(self)2388        self.result.addSuccess(self, details={'foo': 'bar'})2389        self.result.stopTest(self)2390        self.result.startTest(self)2391        self.result.addSuccess(self)2392        self.result.stopTest(self)2393        self.assertEqual(2394            [{'test': self,2395              'status': 'success',2396              'start_time': 0,2397              'stop_time': 1,2398              'tags': set(),2399              'details': {'foo': 'bar'}},2400             {'test': self,2401              'status': 'success',2402              'start_time': 2,2403              'stop_time': 3,2404              'tags': set(),2405              'details': None},2406             ],2407            self.log)2408class TestTagger(TestCase):2409    def test_tags_tests(self):2410        result = ExtendedTestResult()2411        tagger = Tagger(result, set(['foo']), set(['bar']))2412        test1, test2 = self, make_test()2413        tagger.startTest(test1)2414        tagger.addSuccess(test1)2415        tagger.stopTest(test1)2416        tagger.startTest(test2)2417        tagger.addSuccess(test2)2418        tagger.stopTest(test2)2419        self.assertEqual(2420            [('startTest', test1),2421             ('tags', set(['foo']), set(['bar'])),2422             ('addSuccess', test1),2423             ('stopTest', test1),2424             ('startTest', test2),2425             ('tags', set(['foo']), set(['bar'])),2426             ('addSuccess', test2),2427             ('stopTest', test2),2428             ], result._events)2429class TestTimestampingStreamResult(TestCase):2430    def test_startTestRun(self):2431        result = TimestampingStreamResult(LoggingStreamResult())2432        result.startTestRun()2433        self.assertEqual([('startTestRun',)], result.targets[0]._events)2434    def test_stopTestRun(self):2435        result = TimestampingStreamResult(LoggingStreamResult())2436        result.stopTestRun()2437        self.assertEqual([('stopTestRun',)], result.targets[0]._events)2438    def test_status_no_timestamp(self):2439        result = TimestampingStreamResult(LoggingStreamResult())2440        result.status(test_id="A", test_status="B", test_tags="C",2441            runnable="D", file_name="E", file_bytes=b"F", eof=True,2442            mime_type="G", route_code="H")2443        events = result.targets[0]._events2444        self.assertThat(events, HasLength(1))2445        self.assertThat(events[0], HasLength(11))2446        self.assertEqual(...test_junitxml.py
Source:test_junitxml.py  
...43        # When used via subunit2junitxml, startTestRun is called before44        # any tz info in the test stream has been seen.45        # So, we use the earliest reported timestamp as the start time,46        # replacing _test_start if needed.47        self.result.startTestRun() # the time is now.48        # Lose an hour (peeks inside, a little naughty but not very).49        self.result.time(self.result._run_start - datetime.timedelta(0, 3600))50        self.result.stopTestRun()51        self.assertEqual("""<testsuite errors="0" failures="0" name="" tests="0" time="0.000">52</testsuite>53""", self.get_output())54    def test_startTestRun_no_output(self):55        # startTestRun doesn't output anything, because JUnit wants an up-front56        # summary.57        self.result.startTestRun()58        self.assertEqual('', self.get_output())59    def test_stopTestRun_outputs(self):60        # When stopTestRun is called, everything is output.61        self.result.startTestRun()62        self.result.stopTestRun()63        self.assertEqual("""<testsuite errors="0" failures="0" name="" tests="0" time="0.000">64</testsuite>65""", self.get_output())66    def test_test_count(self):67        class Passes(unittest.TestCase):68            def test_me(self):69                pass70        self.result.startTestRun()71        Passes("test_me").run(self.result)72        Passes("test_me").run(self.result)73        self.result.stopTestRun()74        # When tests are run, the number of tests is counted.75        output = self.get_output()76        self.assertTrue('tests="2"' in output)77    def test_test_id_with_parameter(self):78        class Passes(unittest.TestCase):79            def id(self):80                return unittest.TestCase.id(self) + '(version_1.6)'81            def test_me(self):82                pass83        self.result.startTestRun()84        Passes("test_me").run(self.result)85        self.result.stopTestRun()86        output = self.get_output()87        self.assertTrue('Passes" name="test_me(version_1.6)"' in output)88    def test_erroring_test(self):89        class Errors(unittest.TestCase):90            def test_me(self):91                1/092        self.result.startTestRun()93        Errors("test_me").run(self.result)94        self.result.stopTestRun()95        self.assertEqual("""<testsuite errors="1" failures="0" name="" tests="1" time="0.000">96<testcase classname="junitxml.tests.test_junitxml.Errors" name="test_me" time="0.000">97<error type="ZeroDivisionError">error</error>98</testcase>99</testsuite>100""", self.get_output())101    def test_failing_test(self):102        class Fails(unittest.TestCase):103            def test_me(self):104                self.fail()105        self.result.startTestRun()106        Fails("test_me").run(self.result)107        self.result.stopTestRun()108        self.assertEqual("""<testsuite errors="0" failures="1" name="" tests="1" time="0.000">109<testcase classname="junitxml.tests.test_junitxml.Fails" name="test_me" time="0.000">110<failure type="AssertionError">failure</failure>111</testcase>112</testsuite>113""", self.get_output())114    def test_successful_test(self):115        class Passes(unittest.TestCase):116            def test_me(self):117                pass118        self.result.startTestRun()119        Passes("test_me").run(self.result)120        self.result.stopTestRun()121        self.assertEqual("""<testsuite errors="0" failures="0" name="" tests="1" time="0.000">122<testcase classname="junitxml.tests.test_junitxml.Passes" name="test_me" time="0.000"/>123</testsuite>124""", self.get_output())125    def test_skip_test(self):126        class Skips(unittest.TestCase):127            def test_me(self):128                self.skipTest("yo")129        self.result.startTestRun()130        test = Skips("test_me")131        self.run_test_or_simulate(test, 'skipTest', self.result.addSkip, 'yo')132        self.result.stopTestRun()133        output = self.get_output()134        expected = """<testsuite errors="0" failures="0" name="" tests="1" time="0.000">135<testcase classname="junitxml.tests.test_junitxml.Skips" name="test_me" time="0.000">136<skip>yo</skip>137</testcase>138</testsuite>139"""140        self.assertEqual(expected, output)141    def test_unexpected_success_test(self):142        class Succeeds(unittest.TestCase):143            def test_me(self):144                pass145            try:146                test_me = unittest.expectedFailure(test_me)147            except AttributeError:148                pass # Older python - just let the test pass149        self.result.startTestRun()150        Succeeds("test_me").run(self.result)151        self.result.stopTestRun()152        output = self.get_output()153        expected = """<testsuite errors="0" failures="1" name="" tests="1" time="0.000">154<testcase classname="junitxml.tests.test_junitxml.Succeeds" name="test_me" time="0.000">155<failure type="unittest.case._UnexpectedSuccess"/>156</testcase>157</testsuite>158"""159        expected_old = """<testsuite errors="0" failures="0" name="" tests="1" time="0.000">160<testcase classname="junitxml.tests.test_junitxml.Succeeds" name="test_me" time="0.000"/>161</testsuite>162"""163        if output != expected_old:164            self.assertEqual(expected, output)165    def test_expected_failure_test(self):166        expected_failure_support = [True]167        class ExpectedFail(unittest.TestCase):168            def test_me(self):169                self.fail("fail")170            try:171                test_me = unittest.expectedFailure(test_me)172            except AttributeError:173                # Older python - just let the test fail174                expected_failure_support[0] = False175        self.result.startTestRun()176        ExpectedFail("test_me").run(self.result)177        self.result.stopTestRun()178        output = self.get_output()179        expected = """<testsuite errors="0" failures="0" name="" tests="1" time="0.000">180<testcase classname="junitxml.tests.test_junitxml.ExpectedFail" name="test_me" time="0.000"/>181</testsuite>182"""183        expected_old = """<testsuite errors="0" failures="1" name="" tests="1" time="0.000">184<testcase classname="junitxml.tests.test_junitxml.ExpectedFail" name="test_me" time="0.000">185<failure type="AssertionError">failure</failure>186</testcase>187</testsuite>188"""189        if expected_failure_support[0]:190            self.assertEqual(expected, output)191        else:192            self.assertEqual(expected_old, output)193class TestWellFormedXml(unittest.TestCase):194    """XML created should always be well formed even with odd test cases"""195    def _run_and_parse_test(self, case):196        output = StringIO()197        result = junitxml.JUnitXmlResult(output)198        result.startTestRun()199        case.run(result)200        result.stopTestRun()201        return xml.dom.minidom.parseString(output.getvalue())202    def test_failure_with_amp(self):203        """Check the failure element content is escaped"""204        class FailWithAmp(unittest.TestCase):205            def runTest(self):206                self.fail("& should be escaped as &")207        doc = self._run_and_parse_test(FailWithAmp())208        self.assertTrue(209            doc.getElementsByTagName("failure")[0].firstChild.nodeValue210                .endswith("AssertionError: & should be escaped as &\n"))211    def test_quotes_in_test_case_id(self):212        """Check that quotes in an attribute are escaped"""...unit.py
Source:unit.py  
...24            info = callback(test, status)25            if info is not None:26                debug_info.update(info)27        return debug_info28    def startTestRun(self):29        self.logger.suite_start(tests=self.test_list)30    def startTest(self, test):31        self.testsRun += 132        self.logger.test_start(test.id())33    def stopTest(self, test):34        pass35    def stopTestRun(self):36        self.logger.suite_end()37    def addError(self, test, err):38        self.errors.append((test, self._exc_info_to_string(err, test)))39        extra = self.call_callbacks(test, "ERROR")40        self.logger.test_end(test.id(),41                             "ERROR",42                             message=self._exc_info_to_string(err, test),43                             expected="PASS",44                             extra=extra)45    def addFailure(self, test, err):46        extra = self.call_callbacks(test, "ERROR")47        self.logger.test_end(test.id(),48                            "FAIL",49                             message=self._exc_info_to_string(err, test),50                             expected="PASS",51                             extra=extra)52    def addSuccess(self, test):53        self.logger.test_end(test.id(), "PASS", expected="PASS")54    def addExpectedFailure(self, test, err):55        extra = self.call_callbacks(test, "ERROR")56        self.logger.test_end(test.id(),57                            "FAIL",58                             message=self._exc_info_to_string(err, test),59                             expected="FAIL",60                             extra=extra)61    def addUnexpectedSuccess(self, test):62        extra = self.call_callbacks(test, "ERROR")63        self.logger.test_end(test.id(),64                             "PASS",65                             expected="FAIL",66                             extra=extra)67    def addSkip(self, test, reason):68        extra = self.call_callbacks(test, "ERROR")69        self.logger.test_end(test.id(),70                             "SKIP",71                             message=reason,72                             expected="PASS",73                             extra=extra)74class StructuredTestRunner(unittest.TextTestRunner):75    resultclass = StructuredTestResult76    def __init__(self, **kwargs):77        """TestRunner subclass designed for structured logging.78        :params logger: A ``StructuredLogger`` to use for logging the test run.79        :params test_list: An optional list of tests that will be passed along80            the `suite_start` message.81        """82        self.logger = kwargs.pop("logger")83        self.test_list = kwargs.pop("test_list", [])84        self.result_callbacks = kwargs.pop("result_callbacks", [])85        unittest.TextTestRunner.__init__(self, **kwargs)86    def _makeResult(self):87        return self.resultclass(self.stream,88                                self.descriptions,89                                self.verbosity,90                                logger=self.logger,91                                test_list=self.test_list)92    def run(self, test):93        """Run the given test case or test suite."""94        result = self._makeResult()95        result.failfast = self.failfast96        result.buffer = self.buffer97        startTime = time.time()98        startTestRun = getattr(result, 'startTestRun', None)99        if startTestRun is not None:100            startTestRun()101        try:102            test(result)103        finally:104            stopTestRun = getattr(result, 'stopTestRun', None)105            if stopTestRun is not None:106                stopTestRun()107        stopTime = time.time()108        if hasattr(result, 'time_taken'):109            result.time_taken = stopTime - startTime110        run = result.testsRun...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
