How to use _write_next_stream method in stestr

Best Python code snippet using stestr_python

file.py

Source:file.py Github

copy

Full Screen

...47 stream.write('1\n')48 finally:49 stream.close()50 result = Repository(base)51 result._write_next_stream(0)52 return result53 def open(self, url):54 path = os.path.expanduser(url)55 base = os.path.join(path, '.testrepository')56 try:57 stream = open(os.path.join(base, 'format'), 'rt')58 except (IOError, OSError) as e:59 if e.errno == errno.ENOENT:60 raise RepositoryNotFound(url)61 raise62 if '1\n' != stream.read():63 raise ValueError(url)64 return Repository(base)65class Repository(AbstractRepository):66 """Disk based storage of test results.67 68 This repository stores each stream it receives as a file in a directory.69 Indices are then built on top of this basic store.70 71 This particular disk layout is subject to change at any time, as its72 primarily a bootstrapping exercise at this point. Any changes made are73 likely to have an automatic upgrade process.74 """75 def __init__(self, base):76 """Create a file-based repository object for the repo at 'base'.77 :param base: The path to the repository.78 """79 self.base = base80 81 def _allocate(self):82 # XXX: lock the file. 
K?!83 value = self.count()84 self._write_next_stream(value + 1)85 return value86 def _next_stream(self):87 next_content = open(os.path.join(self.base, 'next-stream'), 'rt').read()88 try:89 return int(next_content)90 except ValueError:91 raise ValueError("Corrupt next-stream file: %r" % next_content)92 def count(self):93 return self._next_stream()94 def latest_id(self):95 result = self._next_stream() - 196 if result < 0:97 raise KeyError("No tests in repository")98 return result99 100 def get_failing(self):101 try:102 run_subunit_content = open(103 os.path.join(self.base, "failing"), 'rb').read()104 except IOError:105 err = sys.exc_info()[1]106 if err.errno == errno.ENOENT:107 run_subunit_content = _b('')108 else:109 raise110 return _DiskRun(None, run_subunit_content)111 def get_test_run(self, run_id):112 try:113 run_subunit_content = open(114 os.path.join(self.base, str(run_id)), 'rb').read()115 except IOError as e:116 if e.errno == errno.ENOENT:117 raise KeyError("No such run.")118 return _DiskRun(run_id, run_subunit_content)119 def _get_inserter(self, partial):120 return _Inserter(self, partial)121 def _get_test_times(self, test_ids):122 # May be too slow, but build and iterate.123 # 'c' because an existing repo may be missing a file.124 db = dbm.open(self._path('times.dbm'), 'c')125 try:126 result = {}127 for test_id in test_ids:128 if type(test_id) != str:129 test_id = test_id.encode('utf8')130 # gdbm does not support get().131 try:132 duration = db[test_id]133 except KeyError:134 duration = None135 if duration is not None:136 result[test_id] = float(duration)137 return result138 finally:139 db.close()140 def _path(self, suffix):141 return os.path.join(self.base, suffix)142 def _write_next_stream(self, value):143 # Note that this is unlocked and not threadsafe : for now, shrug - single144 # user, repo-per-working-tree model makes this acceptable in the short145 # term. 
Likewise we don't fsync - this data isn't valuable enough to146 # force disk IO.147 prefix = self._path('next-stream')148 stream = open(prefix + '.new', 'wt')149 try:150 stream.write('%d\n' % value)151 finally:152 stream.close()153 atomicish_rename(prefix + '.new', prefix)154class _DiskRun(AbstractTestRun):155 """A test run that was inserted into the repository."""156 def __init__(self, run_id, subunit_content):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful