How to use the get_inserter method in stestr

The Python snippets below show how get_inserter is used, drawn from stestr's repository test suite and from other open source projects.
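Before the snippets, here is a minimal, hedged sketch of the pattern they all exercise: initialise (or open) a repository, ask it for an inserter, and feed test results into it between startTestRun() and stopTestRun(). It assumes stestr's file-based repository lives in stestr.repository.file, as the test imports below suggest; the temporary directory and the ExampleCase test are illustrative, not part of the original snippets.

import tempfile
import unittest

import testtools
from stestr.repository import file as file_repo

class ExampleCase(unittest.TestCase):
    def test_something(self):
        self.assertTrue(True)

# Create a fresh repository under a scratch directory.
repo = file_repo.RepositoryFactory().initialise(tempfile.mkdtemp())

# get_inserter() returns a StreamResult-like object that records one test
# run in the repository between startTestRun() and stopTestRun().
inserter = repo.get_inserter()

# Older TestResult-style events can be adapted with testtools.
legacy = testtools.ExtendedToStreamDecorator(inserter)
legacy.startTestRun()
ExampleCase('test_something').run(legacy)
legacy.stopTestRun()

print(repo.count())       # 1 - one run is now stored
print(inserter.get_id())  # the id assigned to that run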

test_repository.py

Source: test_repository.py (GitHub)


...
        return analyzer
    def test_can_initialise_with_param(self):
        repo = self.repo_impl.initialise(self.sample_url)
        self.assertIsInstance(repo, repository.AbstractRepository)
    def test_can_get_inserter(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        self.assertNotEqual(None, result)
    def test_insert_stream_smoke(self):
        # We can insert some data into the repository.
        repo = self.repo_impl.initialise(self.sample_url)
        class Case(ResourcedTestCase):
            def method(self):
                pass
        case = Case('method')
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        case.run(legacy_result)
        legacy_result.stopTestRun()
        self.assertEqual(1, repo.count())
        self.assertNotEqual(None, result.get_id())
    def test_open(self):
        self.repo_impl.initialise(self.sample_url)
        self.repo_impl.open(self.sample_url)
    def test_open_non_existent(self):
        url = 'doesntexistatall'
        self.assertThat(lambda: self.repo_impl.open(url),
                        raises(repository.RepositoryNotFound(url)))
    def test_inserting_creates_id(self):
        # When inserting a stream, an id is returned from stopTestRun.
        # Note that this is no longer recommended - but kept for compatibility.
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        self.assertNotEqual(None, result.stopTestRun())
    def test_count(self):
        repo = self.repo_impl.initialise(self.sample_url)
        self.assertEqual(0, repo.count())
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        self.assertEqual(1, repo.count())
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        self.assertEqual(2, repo.count())
    def test_latest_id_empty(self):
        repo = self.repo_impl.initialise(self.sample_url)
        self.assertThat(repo.latest_id,
                        raises(KeyError("No tests in repository")))
    def test_latest_id_nonempty(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        inserted = result.get_id()
        self.assertEqual(inserted, repo.latest_id())
    def test_get_failing_empty(self):
        # repositories can return a TestRun with just latest failures in it.
        repo = self.repo_impl.initialise(self.sample_url)
        analyzed = self.get_failing(repo)
        self.assertEqual(0, analyzed.testsRun)
    def test_get_failing_one_run(self):
        # repositories can return a TestRun with just latest failures in it.
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('passing', True).run(legacy_result)
        make_test('failing', False).run(legacy_result)
        legacy_result.stopTestRun()
        analyzed = self.get_failing(repo)
        self.assertEqual(1, analyzed.testsRun)
        self.assertEqual(1, len(analyzed.errors))
        self.assertEqual('failing', analyzed.errors[0][0].id())
    def test_unexpected_success(self):
        # Unexpected successes get forwarded too. (Test added because of a
        # NameError in memory repo).
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        test = clone_test_with_new_id(Case('unexpected_success'), 'unexpected_success')
        test.run(legacy_result)
        legacy_result.stopTestRun()
        analyzed = self.get_last_run(repo)
        self.assertEqual(1, analyzed.testsRun)
        self.assertEqual(1, len(analyzed.unexpectedSuccesses))
        self.assertEqual('unexpected_success', analyzed.unexpectedSuccesses[0].id())
    def test_get_failing_complete_runs_delete_missing_failures(self):
        # failures from complete runs replace all failures.
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('passing', True).run(legacy_result)
        make_test('failing', False).run(legacy_result)
        make_test('missing', False).run(legacy_result)
        legacy_result.stopTestRun()
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('passing', False).run(legacy_result)
        make_test('failing', True).run(legacy_result)
        legacy_result.stopTestRun()
        analyzed = self.get_failing(repo)
        self.assertEqual(1, analyzed.testsRun)
        self.assertEqual(1, len(analyzed.errors))
        self.assertEqual('passing', analyzed.errors[0][0].id())
    def test_get_failing_partial_runs_preserve_missing_failures(self):
        # failures from two runs add to existing failures, and successes remove
        # from them.
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('passing', True).run(legacy_result)
        make_test('failing', False).run(legacy_result)
        make_test('missing', False).run(legacy_result)
        legacy_result.stopTestRun()
        result = repo.get_inserter(partial=True)
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('passing', False).run(legacy_result)
        make_test('failing', True).run(legacy_result)
        legacy_result.stopTestRun()
        analyzed = self.get_failing(repo)
        self.assertEqual(2, analyzed.testsRun)
        self.assertEqual(2, len(analyzed.errors))
        self.assertEqual(set(['passing', 'missing']),
                         set([test[0].id() for test in analyzed.errors]))
    def test_get_test_run_missing_keyerror(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        inserted = result.get_id()
        self.assertThat(lambda: repo.get_test_run(inserted - 1),
                        raises(KeyError))
    def test_get_test_run(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        inserted = result.stopTestRun()
        run = repo.get_test_run(inserted)
        self.assertNotEqual(None, run)
    def test_get_latest_run(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        inserted = result.stopTestRun()
        run = repo.get_latest_run()
        self.assertEqual(inserted, run.get_id())
    def test_get_latest_run_empty_repo(self):
        repo = self.repo_impl.initialise(self.sample_url)
        self.assertRaises(KeyError, repo.get_latest_run)
    def test_get_test_run_get_id(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        inserted = result.stopTestRun()
        run = repo.get_test_run(inserted)
        self.assertEqual(inserted, run.get_id())
    def test_get_test_run_preserves_time(self):
        self.skip('Fix me before releasing.')
        # The test run outputs the time events that it received.
        now = datetime(2001, 1, 1, 0, 0, 0, tzinfo=iso8601.Utc())
        second = timedelta(seconds=1)
        repo = self.repo_impl.initialise(self.sample_url)
        test_id = self.getUniqueString()
        test = make_test(test_id, True)
        result = repo.get_inserter()
        result.startTestRun()
        result.status(timestamp=now, test_id=test_id, test_status='inprogress')
        result.status(timestamp=(now + 1 * second), test_id=test_id, test_status='success')
        inserted = result.stopTestRun()
        run = repo.get_test_run(inserted)
        result = ExtendedTestResult()
        run.get_test().run(result)
        self.assertEqual(
            [('time', now),
             ('tags', set(), set()),
             ('startTest', Wildcard),
             ('time', now + 1 * second),
             ('addSuccess', Wildcard),
             ('stopTest', Wildcard),
             ('tags', set(), set()),
             ],
            result._events)
    def test_get_failing_get_id(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        run = repo.get_failing()
        self.assertEqual(None, run.get_id())
    def test_get_failing_get_subunit_stream(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('testrepository.tests.test_repository.Case.method', False).run(legacy_result)
        legacy_result.stopTestRun()
        run = repo.get_failing()
        as_subunit = run.get_subunit_stream()
        stream = v2.ByteStreamToStreamResult(as_subunit)
        log = StreamResult()
        log.startTestRun()
        try:
            stream.run(log)
        finally:
            log.stopTestRun()
        self.assertEqual(
            log._events, [
                ('startTestRun',),
                ('status',
                 'testrepository.tests.test_repository.Case.method',
                 'inprogress',
                 None,
                 True,
                 None,
                 None,
                 False,
                 None,
                 None,
                 Wildcard),
                ('status',
                 'testrepository.tests.test_repository.Case.method',
                 None,
                 None,
                 True,
                 'traceback',
                 Wildcard,
                 True,
                 Wildcard,
                 None,
                 Wildcard),
                ('status',
                 'testrepository.tests.test_repository.Case.method',
                 'fail',
                 None,
                 True,
                 None,
                 None,
                 False,
                 None,
                 None,
                 Wildcard),
                ('stopTestRun',)
            ])
    def test_get_subunit_from_test_run(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('testrepository.tests.test_repository.Case.method', True).run(legacy_result)
        legacy_result.stopTestRun()
        inserted = result.get_id()
        run = repo.get_test_run(inserted)
        as_subunit = run.get_subunit_stream()
        stream = v2.ByteStreamToStreamResult(as_subunit)
        log = StreamResult()
        log.startTestRun()
        try:
            stream.run(log)
        finally:
            log.stopTestRun()
        self.assertEqual(
            log._events,
            [
                ('startTestRun',),
                ('status',
                 'testrepository.tests.test_repository.Case.method',
                 'inprogress',
                 None,
                 True,
                 None,
                 None,
                 False,
                 None,
                 None,
                 Wildcard),
                ('status',
                 'testrepository.tests.test_repository.Case.method',
                 'success',
                 None,
                 True,
                 None,
                 None,
                 False,
                 None,
                 None,
                 Wildcard),
                ('stopTestRun',)
            ])
    def test_get_test_from_test_run(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        make_test('testrepository.tests.test_repository.Case.method', True).run(legacy_result)
        legacy_result.stopTestRun()
        inserted = result.get_id()
        run = repo.get_test_run(inserted)
        test = run.get_test()
        result = testtools.StreamSummary()
        result.startTestRun()
        try:
            test.run(result)
        finally:
            result.stopTestRun()
        self.assertEqual(1, result.testsRun)
    def test_get_times_unknown_tests_are_unknown(self):
        repo = self.repo_impl.initialise(self.sample_url)
        test_ids = set(['foo', 'bar'])
        self.assertEqual(test_ids, repo.get_test_times(test_ids)['unknown'])
    def test_inserted_test_times_known(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        test_name = 'testrepository.tests.test_repository.Case.method'
        run_timed(test_name, 0.1, legacy_result)
        legacy_result.stopTestRun()
        self.assertEqual({test_name: 0.1},
                         repo.get_test_times([test_name])['known'])
    def test_inserted_exists_no_impact_on_test_times(self):
        repo = self.repo_impl.initialise(self.sample_url)
        result = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(result)
        legacy_result.startTestRun()
        test_name = 'testrepository.tests.test_repository.Case.method'
        run_timed(test_name, 0.1, legacy_result)
        legacy_result.stopTestRun()
        result = repo.get_inserter()
        result.startTestRun()
        test_name = 'testrepository.tests.test_repository.Case.method'
        run_timed(test_name, 0.2, result, True)
        result.stopTestRun()
        self.assertEqual({test_name: 0.1},
                         repo.get_test_times([test_name])['known'])
    def test_get_test_ids(self):
        repo = self.repo_impl.initialise(self.sample_url)
        inserter = repo.get_inserter()
        legacy_result = testtools.ExtendedToStreamDecorator(inserter)
        legacy_result.startTestRun()
        test_cases = [PlaceHolder(self.getUniqueString()) for r in range(5)]
        for test_case in test_cases:
            test_case.run(legacy_result)
        legacy_result.stopTestRun()
        run_id = inserter.get_id()
        self.assertEqual(run_id, repo.latest_id())
        returned_ids = repo.get_test_ids(run_id)
...
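Two behaviours worth pulling out of the tests above: the inserter is itself a subunit/testtools StreamResult, so events can be reported directly through status(), and a run opened with get_inserter(partial=True) only updates the tests it actually ran, so failures that were not re-executed stay in the failing set. A hedged sketch, reusing repo from the first example (the test id and timestamps are illustrative):

from datetime import datetime, timedelta, timezone

inserter = repo.get_inserter()
inserter.startTestRun()
start = datetime(2001, 1, 1, tzinfo=timezone.utc)
inserter.status(timestamp=start, test_id='pkg.tests.test_a',
                test_status='inprogress')
inserter.status(timestamp=start + timedelta(seconds=1),
                test_id='pkg.tests.test_a', test_status='fail')
run_id = inserter.stopTestRun()  # older API: the new run id is also returned here

# A later partial run that skips pkg.tests.test_a leaves it in the failing set.
partial = repo.get_inserter(partial=True)
partial.startTestRun()
partial.stopTestRun()

failing = repo.get_failing()            # a TestRun holding the current failures
stream = failing.get_subunit_stream()   # e.g. for feeding into another tool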


test_file.py

Source: test_file.py (GitHub)


...
        repo = file.RepositoryFactory().initialise(short_path)
        self.assertTrue(os.path.exists(repo.base))
    def test_inserter_output_path(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        inserter = repo.get_inserter()
        inserter.startTestRun()
        inserter.stopTestRun()
        self.assertTrue(os.path.exists(os.path.join(repo.base, '0')))
    def test_inserting_creates_id(self):
        # When inserting a stream, an id is returned from stopTestRun.
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        self.assertEqual(0, result.get_id())
    # Skip if windows since ~ in a path doesn't work there
    @testtools.skipIf(os.name == 'nt', "Windows doesn't support '~' expand")
    def test_open_expands_user_directory(self):
        short_path = self.useFixture(HomeDirTempDir()).short_path
        repo1 = file.RepositoryFactory().initialise(short_path)
        repo2 = file.RepositoryFactory().open(short_path)
        self.assertEqual(repo1.base, repo2.base)
    def test_next_stream_corruption_error(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        open(os.path.join(repo.base, 'next-stream'), 'wb').close()
        self.assertThat(repo.count, matchers.Raises(
            matchers.MatchesException(
                ValueError("Corrupt next-stream file: ''"))))
    # Skip if windows since chmod doesn't work there
    @testtools.skipIf(os.name == 'nt', "Windows doesn't support chmod")
    def test_get_test_run_unexpected_ioerror_errno(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        inserter = repo.get_inserter()
        inserter.startTestRun()
        inserter.stopTestRun()
        self.assertTrue(os.path.isfile(os.path.join(repo.base, '0')))
        os.chmod(os.path.join(repo.base, '0'), 0000)
        self.assertRaises(IOError, repo.get_test_run, '0')
    def test_get_metadata(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter(metadata='fun')
        result.startTestRun()
        result.stopTestRun()
        run = repo.get_test_run(result.get_id())
        self.assertEqual(b'fun', run.get_metadata())
    def test_find_metadata(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter(metadata='fun')
        result.startTestRun()
        result.stopTestRun()
        result_bad = repo.get_inserter(metadata='not_fun')
        result_bad.startTestRun()
        result_bad.stopTestRun()
        run_ids = repo.find_metadata(b'fun')
        run_ids_int = [int(x) for x in run_ids]
        self.assertIn(result.get_id(), run_ids_int)
        self.assertNotIn(result_bad.get_id(), run_ids_int)
    def test_get_run_ids(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter(metadata='fun')
        result.startTestRun()
        result.stopTestRun()
        result_bad = repo.get_inserter(metadata='not_fun')
        result_bad.startTestRun()
        result_bad.stopTestRun()
        run_ids = repo.get_run_ids()
        self.assertEqual(['0', '1'], run_ids)
    def test_get_run_ids_empty(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        run_ids = repo.get_run_ids()
        self.assertEqual([], run_ids)
    def test_get_run_ids_with_hole(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        repo.remove_run_id('1')
        self.assertEqual(['0', '2'], repo.get_run_ids())
    def test_remove_ids(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
        repo.remove_run_id('0')
        self.assertEqual([], repo.get_run_ids())
    def test_remove_ids_id_not_in_repo(self):
        repo = self.useFixture(FileRepositoryFixture()).repo
        result = repo.get_inserter()
        result.startTestRun()
        result.stopTestRun()
...
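The file-repository tests above also show run metadata and run management. A hedged sketch of those calls, again reusing repo from the first example (the metadata value is arbitrary):

# Tag a run with metadata when the inserter is created.
tagged = repo.get_inserter(metadata='nightly')
tagged.startTestRun()
tagged.stopTestRun()

run = repo.get_test_run(tagged.get_id())
print(run.get_metadata())               # b'nightly' - metadata comes back as bytes

# Runs can be looked up by metadata, listed, and removed by id.
print(repo.find_metadata(b'nightly'))   # ids of runs tagged 'nightly'
print(repo.get_run_ids())               # e.g. ['0', '1', '2']
repo.remove_run_id(repo.get_run_ids()[0])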


genome_reconstruction.py

Source: genome_reconstruction.py (GitHub). Note that this snippet is not from stestr: it appears to come from the MSV structural-variant toolkit, which happens to store its GetCallInserter module in a variable named get_inserter.


from MSV import *
import tempfile
#
# there is a ppt file that describes the example used in this test
#
def insert_calls(db_conn, dataset_name):
    JumpRunTable(db_conn)
    SvCallerRunTable(db_conn)
    get_inserter = GetCallInserter(ParameterSetManager(), db_conn, "simulated_sv",
                                   "the sv's that were simulated", -1)
    pool = PoolContainer(1, dataset_name)
    sv_inserter = get_inserter.execute(pool)
    # deletion
    sv_inserter.insert(SvCall(4, 7, 0, 0, True, True, 1000))  # a
    # inversion
    sv_inserter.insert(SvCall(9, 14, 0, 0, True, False, 1000))  # b
    sv_inserter.insert(SvCall(10, 15, 0, 0, False, True, 1000))  # c
    # insertion
    insertion = SvCall(16, 17, 0, 0, True, True, 1000)
    insertion.inserted_sequence = NucSeq("TGTT")
    sv_inserter.insert(insertion)  # d
    # translocation
    sv_inserter.insert(SvCall(0, 19, 0, 0, True, True, 1000))  # e
    sv_inserter.insert(SvCall(1, 19, 0, 0, False, False, 1000))  # f
    sv_inserter.insert(SvCall(18, 20, 0, 0, True, True, 1000))  # g
    sv_inserter.close(pool)
    return get_inserter.cpp_module.id
def get_reference():
    reference = Pack()
    reference.append("chr1", "chr1-desc", NucSeq("GATCGTATC"))
    reference.append("chr2", "chr2-desc", NucSeq("CTCGTCAACAG"))
    return reference
if __name__ == "__main__":
    db_conn = DbConn({"SCHEMA": {"NAME": "tmp_2", "FLAGS": ["DROP_ON_CLOSURE"]}})
    run_id = insert_calls(db_conn, "tmp_2")
    reference = get_reference()
    expected_sequence = "GGATCGTCCGACGAAATGTTCA"
    reconstr = reconstruct_sequenced_genome(reference, run_id)
    if str(reconstr.extract_forward_strand_n()) != expected_sequence:
        print("original sequence ", reference.extract_forward_strand_n())
        print("expected sequence ", expected_sequence)
        print("reconstructed sequence", reconstr.extract_forward_strand_n())
        for i, l in enumerate(reconstr.contigLengths()):
            print("contig", i, "length =", l)
    assert str(reconstr.extract_forward_strand_n()) == expected_sequence
    if [*reconstr.contigLengths()] != [8, 14]:
        print("contig lengths don't match; expects 8 14, got ", *reconstr.contigLengths())
...


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

