Best Python code snippet using stestr_python
file.py
Source:file.py  
...32    AbstractTestRun,33    RepositoryNotFound,34    )35from testrepository.utils import timedelta_to_seconds36def atomicish_rename(source, target):37    if os.name != "posix" and os.path.exists(target):38        os.remove(target)39    os.rename(source, target)40class RepositoryFactory(AbstractRepositoryFactory):41    def initialise(klass, url):42        """Create a repository at url/path."""43        base = os.path.join(os.path.expanduser(url), '.testrepository')44        os.mkdir(base)45        stream = open(os.path.join(base, 'format'), 'wt')46        try:47            stream.write('1\n')48        finally:49            stream.close()50        result = Repository(base)51        result._write_next_stream(0)52        return result53    def open(self, url):54        path = os.path.expanduser(url)55        base = os.path.join(path, '.testrepository')56        try:57            stream = open(os.path.join(base, 'format'), 'rt')58        except (IOError, OSError) as e:59            if e.errno == errno.ENOENT:60                raise RepositoryNotFound(url)61            raise62        if '1\n' != stream.read():63            raise ValueError(url)64        return Repository(base)65class Repository(AbstractRepository):66    """Disk based storage of test results.67    68    This repository stores each stream it receives as a file in a directory.69    Indices are then built on top of this basic store.70    71    This particular disk layout is subject to change at any time, as its72    primarily a bootstrapping exercise at this point. Any changes made are73    likely to have an automatic upgrade process.74    """75    def __init__(self, base):76        """Create a file-based repository object for the repo at 'base'.77        :param base: The path to the repository.78        """79        self.base = base80    81    def _allocate(self):82        # XXX: lock the file. K?!83        value = self.count()84        self._write_next_stream(value + 1)85        return value86    def _next_stream(self):87        next_content = open(os.path.join(self.base, 'next-stream'), 'rt').read()88        try:89            return int(next_content)90        except ValueError:91            raise ValueError("Corrupt next-stream file: %r" % next_content)92    def count(self):93        return self._next_stream()94    def latest_id(self):95        result = self._next_stream() - 196        if result < 0:97            raise KeyError("No tests in repository")98        return result99 100    def get_failing(self):101        try:102            run_subunit_content = open(103                os.path.join(self.base, "failing"), 'rb').read()104        except IOError:105            err = sys.exc_info()[1]106            if err.errno == errno.ENOENT:107                run_subunit_content = _b('')108            else:109                raise110        return _DiskRun(None, run_subunit_content)111    def get_test_run(self, run_id):112        try:113            run_subunit_content = open(114                os.path.join(self.base, str(run_id)), 'rb').read()115        except IOError as e:116            if e.errno == errno.ENOENT:117                raise KeyError("No such run.")118        return _DiskRun(run_id, run_subunit_content)119    def _get_inserter(self, partial):120        return _Inserter(self, partial)121    def _get_test_times(self, test_ids):122        # May be too slow, but build and iterate.123        # 'c' because an existing repo may be missing a file.124        db = dbm.open(self._path('times.dbm'), 'c')125        try:126            result = {}127            for test_id in test_ids:128                if type(test_id) != str:129                    test_id = test_id.encode('utf8')130                # gdbm does not support get().131                try:132                    duration = db[test_id]133                except KeyError:134                    duration = None135                if duration is not None:136                    result[test_id] = float(duration)137            return result138        finally:139            db.close()140    def _path(self, suffix):141        return os.path.join(self.base, suffix)142    def _write_next_stream(self, value):143        # Note that this is unlocked and not threadsafe : for now, shrug - single144        # user, repo-per-working-tree model makes this acceptable in the short145        # term. Likewise we don't fsync - this data isn't valuable enough to146        # force disk IO.147        prefix = self._path('next-stream')148        stream = open(prefix + '.new', 'wt')149        try:150            stream.write('%d\n' % value)151        finally:152            stream.close()153        atomicish_rename(prefix + '.new', prefix)154class _DiskRun(AbstractTestRun):155    """A test run that was inserted into the repository."""156    def __init__(self, run_id, subunit_content):157        """Create a _DiskRun with the content subunit_content."""158        self._run_id = run_id159        self._content = subunit_content160        assert type(subunit_content) is bytes161    def get_id(self):162        return self._run_id163    def get_subunit_stream(self):164        # Transcode - we want V2.165        v1_stream = BytesIO(self._content)166        v1_case = subunit.ProtocolTestCase(v1_stream)167        output = BytesIO()168        output_stream = subunit.v2.StreamResultToBytes(output)169        output_stream = testtools.ExtendedToStreamDecorator(output_stream)170        output_stream.startTestRun()171        try:172            v1_case.run(output_stream)173        finally:174            output_stream.stopTestRun()175        output.seek(0)176        return output177    def get_test(self):178        #case = subunit.ProtocolTestCase(self.get_subunit_stream())179        case = subunit.ProtocolTestCase(BytesIO(self._content))180        def wrap_result(result):181            # Wrap in a router to mask out startTestRun/stopTestRun from the182            # ExtendedToStreamDecorator.183            result = testtools.StreamResultRouter(result, do_start_stop_run=False)184            # Wrap that in ExtendedToStreamDecorator to convert v1 calls to185            # StreamResult.186            return testtools.ExtendedToStreamDecorator(result)187        return testtools.DecorateTestCaseResult(188            case, wrap_result, methodcaller('startTestRun'),189            methodcaller('stopTestRun'))190class _SafeInserter(object):191    def __init__(self, repository, partial=False):192        # XXX: Perhaps should factor into a decorator and use an unaltered193        # TestProtocolClient.194        self._repository = repository195        fd, name = tempfile.mkstemp(dir=self._repository.base)196        self.fname = name197        stream = os.fdopen(fd, 'wb')198        self.partial = partial199        # The time take by each test, flushed at the end.200        self._times = {}201        self._test_start = None202        self._time = None203        subunit_client = testtools.StreamToExtendedDecorator(204            TestProtocolClient(stream))205        self.hook = testtools.CopyStreamResult([206            subunit_client,207            testtools.StreamToDict(self._handle_test)])208        self._stream = stream209    def _handle_test(self, test_dict):210        start, stop = test_dict['timestamps']211        if test_dict['status'] == 'exists' or None in (start, stop):212            return213        self._times[test_dict['id']] = str(timedelta_to_seconds(stop - start))214    def startTestRun(self):215        self.hook.startTestRun()216        self._run_id = None217    def stopTestRun(self):218        self.hook.stopTestRun()219        self._stream.flush()220        self._stream.close()221        run_id = self._name()222        final_path = os.path.join(self._repository.base, str(run_id))223        atomicish_rename(self.fname, final_path)224        # May be too slow, but build and iterate.225        db = dbm.open(self._repository._path('times.dbm'), 'c')226        try:227            db_times = {}228            for key, value in self._times.items():229                if type(key) != str:230                    key = key.encode('utf8')231                db_times[key] = value232            if getattr(db, 'update', None):233                db.update(db_times)234            else:235                for key, value in db_times.items():236                    db[key] = value237        finally:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
