How to use atomicish_rename method in stestr

Best Python code snippet using stestr_python

file.py

Source:file.py Github

copy

Full Screen

# ... (excerpt starts mid-file; the import list below is cut off at its top)
#     AbstractTestRun,
#     RepositoryNotFound,
#     )
from testrepository.utils import timedelta_to_seconds


def atomicish_rename(source, target):
    """Rename ``source`` to ``target``, replacing ``target`` if it exists.

    On non-POSIX platforms an existing ``target`` is removed first, so the
    replacement is not atomic there (hence "atomic-ish").

    :param source: Path of the file to move.
    :param target: Destination path.
    """
    # NOTE(review): presumably needed because os.rename() refuses to
    # overwrite an existing file on Windows - confirm against os.rename docs.
    if os.name != "posix" and os.path.exists(target):
        os.remove(target)
    os.rename(source, target)


class RepositoryFactory(AbstractRepositoryFactory):
    """Creates and opens file-based repositories under '.testrepository'."""

    def initialise(klass, url):
        """Create a repository at url/path.

        :param url: Directory in which '.testrepository' is created; '~' is
            expanded.
        :return: A Repository whose next-stream counter is set to 0.
        """
        base = os.path.join(os.path.expanduser(url), '.testrepository')
        os.mkdir(base)
        with open(os.path.join(base, 'format'), 'wt') as stream:
            stream.write('1\n')
        result = Repository(base)
        result._write_next_stream(0)
        return result

    def open(self, url):
        """Open the existing repository stored under url/path.

        :param url: Directory containing '.testrepository'; '~' is expanded.
        :return: A Repository for that directory.
        :raises RepositoryNotFound: If the 'format' file does not exist.
        :raises ValueError: If the 'format' file content is not '1\\n'.
        """
        path = os.path.expanduser(url)
        base = os.path.join(path, '.testrepository')
        try:
            # Inside a method body 'open' is the builtin, not this method.
            stream = open(os.path.join(base, 'format'), 'rt')
        except (IOError, OSError) as e:
            if e.errno == errno.ENOENT:
                raise RepositoryNotFound(url)
            raise
        # Close the handle deterministically instead of leaking it.
        with stream:
            format_marker = stream.read()
        if '1\n' != format_marker:
            raise ValueError(url)
        return Repository(base)


class Repository(AbstractRepository):
    """Disk based storage of test results.

    This repository stores each stream it receives as a file in a directory.
    Indices are then built on top of this basic store.

    This particular disk layout is subject to change at any time, as it's
    primarily a bootstrapping exercise at this point. Any changes made are
    likely to have an automatic upgrade process.
    """

    def __init__(self, base):
        """Create a file-based repository object for the repo at 'base'.

        :param base: The path to the repository.
        """
        self.base = base

    def _allocate(self):
        """Reserve the next run id and advance the stored counter.

        :return: The id that was reserved (the counter value before the bump).
        """
        # XXX: lock the file. K?!
        value = self.count()
        self._write_next_stream(value + 1)
        return value

    def _next_stream(self):
        """Return the integer stored in the 'next-stream' file.

        :raises ValueError: If the file content is not an integer.
        """
        with open(os.path.join(self.base, 'next-stream'), 'rt') as stream:
            next_content = stream.read()
        try:
            return int(next_content)
        except ValueError:
            raise ValueError("Corrupt next-stream file: %r" % next_content)

    def count(self):
        """Return the current value of the next-stream counter."""
        return self._next_stream()

    def latest_id(self):
        """Return the id of the most recent run.

        :raises KeyError: If the counter is 0, i.e. no run has been stored.
        """
        result = self._next_stream() - 1
        if result < 0:
            raise KeyError("No tests in repository")
        return result

    def get_failing(self):
        """Return a _DiskRun (with id None) for the 'failing' file.

        A missing 'failing' file is treated as empty content; any other
        IOError propagates.
        """
        try:
            with open(os.path.join(self.base, "failing"), 'rb') as stream:
                run_subunit_content = stream.read()
        except IOError as err:
            if err.errno == errno.ENOENT:
                run_subunit_content = _b('')
            else:
                raise
        return _DiskRun(None, run_subunit_content)

    def get_test_run(self, run_id):
        """Return the _DiskRun stored under ``run_id``.

        :param run_id: Id of the run; its str() is the file name in the repo.
        :raises KeyError: If no file exists for ``run_id``.
        :raises IOError: For any other failure to read the file.
        """
        try:
            with open(os.path.join(self.base, str(run_id)), 'rb') as stream:
                run_subunit_content = stream.read()
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise KeyError("No such run.")
            # Propagate other I/O errors; falling through would hit an
            # unbound 'run_subunit_content' and mask the real failure.
            raise
        return _DiskRun(run_id, run_subunit_content)

    def _get_inserter(self, partial):
        """Return an _Inserter bound to this repository."""
        return _Inserter(self, partial)

    def _get_test_times(self, test_ids):
        """Look up stored durations for ``test_ids`` in 'times.dbm'.

        :param test_ids: Iterable of test ids.
        :return: Dict mapping each id that has a stored duration to a float.
            Ids without an entry are omitted.
        """
        # May be too slow, but build and iterate.
        # 'c' because an existing repo may be missing a file.
        db = dbm.open(self._path('times.dbm'), 'c')
        try:
            result = {}
            for test_id in test_ids:
                if type(test_id) != str:
                    test_id = test_id.encode('utf8')
                # gdbm does not support get().
                try:
                    duration = db[test_id]
                except KeyError:
                    duration = None
                if duration is not None:
                    result[test_id] = float(duration)
            return result
        finally:
            db.close()

    def _path(self, suffix):
        """Return ``suffix`` joined onto the repository base path."""
        return os.path.join(self.base, suffix)

    def _write_next_stream(self, value):
        """Store ``value`` as the next-stream counter.

        The value is written to 'next-stream.new' and then renamed over
        'next-stream' via atomicish_rename.
        """
        # Note that this is unlocked and not threadsafe : for now, shrug - single
        # user, repo-per-working-tree model makes this acceptable in the short
        # term. Likewise we don't fsync - this data isn't valuable enough to
        # force disk IO.
        prefix = self._path('next-stream')
        with open(prefix + '.new', 'wt') as stream:
            stream.write('%d\n' % value)
        atomicish_rename(prefix + '.new', prefix)


class _DiskRun(AbstractTestRun):
    """A test run that was inserted into the repository."""

    def __init__(self, run_id, subunit_content):
        """Create a _DiskRun with the content subunit_content.

        :param run_id: Id of the run (None is used for the failing set).
        :param subunit_content: The stored stream; must be bytes.
        """
        self._run_id = run_id
        self._content = subunit_content
        assert type(subunit_content) is bytes

    def get_id(self):
        """Return the run id given at construction."""
        return self._run_id

    def get_subunit_stream(self):
        """Return a BytesIO, rewound to 0, holding the run as subunit v2."""
        # Transcode - we want V2.
        v1_stream = BytesIO(self._content)
        v1_case = subunit.ProtocolTestCase(v1_stream)
        output = BytesIO()
        output_stream = subunit.v2.StreamResultToBytes(output)
        output_stream = testtools.ExtendedToStreamDecorator(output_stream)
        output_stream.startTestRun()
        try:
            v1_case.run(output_stream)
        finally:
            output_stream.stopTestRun()
        output.seek(0)
        return output

    def get_test(self):
        """Return a test case that replays the stored content into a result."""
        #case = subunit.ProtocolTestCase(self.get_subunit_stream())
        case = subunit.ProtocolTestCase(BytesIO(self._content))

        def wrap_result(result):
            # Wrap in a router to mask out startTestRun/stopTestRun from the
            # ExtendedToStreamDecorator.
            result = testtools.StreamResultRouter(result, do_start_stop_run=False)
            # Wrap that in ExtendedToStreamDecorator to convert v1 calls to
            # StreamResult.
            return testtools.ExtendedToStreamDecorator(result)

        return testtools.DecorateTestCaseResult(
            case, wrap_result, methodcaller('startTestRun'),
            methodcaller('stopTestRun'))


class _SafeInserter(object):
    # NOTE(review): this class runs past the end of the excerpt; only the
    # visible part is reproduced here, unchanged apart from layout.

    def __init__(self, repository, partial=False):
        # XXX: Perhaps should factor into a decorator and use an unaltered
        # TestProtocolClient.
        self._repository = repository
        fd, name = tempfile.mkstemp(dir=self._repository.base)
        self.fname = name
        stream = os.fdopen(fd, 'wb')
        self.partial = partial
        # The time take by each test, flushed at the end.
        self._times = {}
        self._test_start = None
        self._time = None
        subunit_client = testtools.StreamToExtendedDecorator(
            TestProtocolClient(stream))
        self.hook = testtools.CopyStreamResult([
            subunit_client,
            testtools.StreamToDict(self._handle_test)])
        self._stream = stream

    def _handle_test(self, test_dict):
        # Record the duration of each test that has both timestamps.
        start, stop = test_dict['timestamps']
        if test_dict['status'] == 'exists' or None in (start, stop):
            return
        self._times[test_dict['id']] = str(timedelta_to_seconds(stop - start))

    def startTestRun(self):
        self.hook.startTestRun()
        self._run_id = None

    def stopTestRun(self):
        self.hook.stopTestRun()
        self._stream.flush()
        self._stream.close()
        run_id = self._name()
        final_path = os.path.join(self._repository.base, str(run_id))
        atomicish_rename(self.fname, final_path)
        # May be too slow, but build and iterate.
        db = dbm.open(self._repository._path('times.dbm'), 'c')
        try:
            db_times = {}
            for key, value in self._times.items():
                if type(key) != str:
                    key = key.encode('utf8')
                db_times[key] = value
            if getattr(db, 'update', None):
                db.update(db_times)
            else:
                for key, value in db_times.items():
                    db[key] = value
        finally:
            ...  # excerpt is truncated here

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful