Best Python code snippet using fMBT_python
app.py
Source:app.py  
...225        assert code != 500 or key is None, \226            'It is currently not possible to register a 500 internal ' \227            'server error on a per-blueprint level.'228        self.error_handler_spec.setdefault(key, {})[code] = f229    def _check_hook(self, hook):230        if not asyncio.iscoroutinefunction(hook):231            raise Exception('before/after hooks must be coroutine functions.')232    def before_start(self, hook):233        self._check_hook(hook)234        self.before_start_hooks.append(hook)235        return hook236    def after_start(self, hook):237        self._check_hook(hook)238        self.after_start_hooks.append(hook)239        return hook240    def before_stop(self, hook):241        self._check_hook(hook)242        self.before_stop_hooks.append(hook)243        return hook244    def after_stop(self, hook):245        self._check_hook(hook)246        self.after_stop_hooks.append(hook)247        return hook248    def before_request(self, f):249        """Registers a function to run before each request."""250        self.before_request_funcs.setdefault(None, []).append(f)251        return f252    def after_request(self, f):253        """Register a function to be run after each request.  Your function254        must take one parameter, a `Response` object and return255        a new response object.256        """257        self.after_request_funcs.setdefault(None, []).insert(0, f)258        return f259    def teardown_request(self, f):...drs_tree_check.py
Source:drs_tree_check.py  
...55        return self.state == self.STATE_FAIL_FIXABLE56    def check(self, pt):57        try:58            self._state_pass()59            self._check_hook(pt)60        except:61            self._state_exception()62            raise63        return self.has_passed()64    def repair(self, pt):65        try:66            self._repair_hook(pt)67        except:68            self._state_exception()69            raise70        # Recheck after fixing71        self.check(pt)72        return self.has_passed()73    def _check_hook(self, pt):74        # Implement in subclasses75        raise NotImplementedError76    def _repair_hook(self, pt):77        # Implement in subclasses78        raise NotImplementedError79    def _fs_versions(self, pt):80        return set(int(x[1:]) for x in os.listdir(pt.pub_dir) if x[0] == 'v')81    def _all_versions(self, pt):82        return self._fs_versions(pt).union(pt.versions.keys())83        84    def _latest_version(self, pt):85        versions = self._all_versions(pt)86        if versions:87            return max(self._all_versions(pt))88        else:89            raise ValueError('No latest version')90    #-------------------------------------------------------------------------91    # State changes92    def _state_fixable(self, stat, reason=None):93        if self.state != self.STATE_FAIL_UNFIXABLE:94            self.state = self.STATE_FAIL_FIXABLE95        self._add_stat_count(stat)96        if reason is None:97            reason = ''98        log.warning('Fixable failure: %s: %s' % (stat, reason))99    def _state_unfixable(self, stat, reason=None):100        self.state = self.STATE_FAIL_UNFIXABLE101        self._add_stat_count(stat)102        if reason is None:103            reason = ''104        log.warning('Unfixable failure: %s: %s' % (stat, reason))105    def _state_pass(self):106        self.state = self.STATE_PASS107    def _state_exception(self):108        self.state = self.STATE_EXCEPTION109    def _add_stat_count(self, 
stat):110        try:111            self._repair_stats[stat] += 1112        except KeyError:113            self._repair_stats[stat] = 1114            115class CheckLatest(TreeChecker):116    def __init__(self):117        super(self.__class__, self).__init__()118        self._fix_to = None119    def _check_hook(self, pt):120        latest_dir = op.join(pt.pub_dir, VERSIONING_LATEST_DIR)121        if not op.exists(latest_dir):122            self._state_fixable('latest directory missing or broken')123            return124        try:125            latest_version = self._latest_version(pt)126        except ValueError:127            return128        # Link could be there but invalid129        link = op.join(pt.pub_dir, os.readlink(latest_dir))130        if not op.exists(link):131            self._state_fixable('Latest directory missing', '%s does not exist' % link)132            self._fix_to = latest_version133            return134        link_v = int(op.basename(op.normpath(link))[1:]) # remove leading "v"135        if link_v != latest_version:136            self._state_fixable('Latest directory wrong', '%s should point to v%d' % (link, latest_version))137            self._fix_to = latest_version138            return139    def _repair_hook(self, pt):140        latest_dir = op.join(pt.pub_dir, VERSIONING_LATEST_DIR)141        if op.islink(latest_dir):142            os.remove(latest_dir)143        if self._fix_to:144            latest_link = op.join(pt.pub_dir, 'v%d' % self._fix_to)145            os.symlink(latest_link, latest_dir)146        else:147            pt._deduce_versions()148            pt._do_latest()149class CheckVersionLinks(TreeChecker):150    """151    Check all links in version directories point to real files.152    """153    def __init__(self):154        super(self.__class__, self).__init__()155        self._fix_versions = set()156    def _check_hook(self, pt):157        fdir = op.join(pt.pub_dir, VERSIONING_FILES_DIR)158        if not op.isdir(fdir):159 
           self._state_unfixable('Files directory %s does not exist' % fdir)160            return161        # find all filesystem versions and versions from the files directory162        # Only check latest version163        if check_latest == True:164            try:165                versions = [self._latest_version(pt)]166            except ValueError:167                return168        else:169            versions = set(self._all_versions(pt))170        171        for version in versions:172            for stat, message in self._scan_version(pt, version):173                self._state_unfixable(stat, message)174                self._fix_versions.add(version)175    def _scan_version(self, pt, version):176        """177        :return: stat, message178        """179        done = []180        for cmd, src, dest in pt._link_commands(version):181            if cmd == pt.CMD_MKDIR:182                if not op.isdir(dest):183                    yield ('Directory missing', '%s does not exist' % dest)184            elif cmd == pt.CMD_LINK:185                if not op.isabs(src):186                    realsrc = op.abspath(op.join(op.dirname(dest), src))187                else:188                    realsrc = src189                if not op.exists(realsrc):190                    self._state_unfixable('File %s source of link %s does not exist' % (realsrc, dest))191                elif not op.exists(dest):192                    yield ('Missing links', 'Link %s does not exist' % dest)193                else:194                    realdest = os.readlink(dest)195                    if not op.isabs(realdest):196                        realdest = op.abspath(op.join(op.dirname(dest), realdest))197                    if realsrc != realdest:198                        yield ('Links to wrong file', 'Link %s does not point to the correct file %s' % (dest, src))199                    drs = pt.drs_tree.drs_fs.filename_to_drs(op.basename(realsrc))200                    
done.append(drs)201        # Now scan filesystem for overlapping files202        version_dir = op.join(pt.pub_dir, 'v%d' % version)203        for dirpath, dirnames, filenames in os.walk(version_dir):204            for filename in filenames:205                try:206                    drs = pt.drs_tree.drs_fs.filename_to_drs(filename)207                except TranslationError:208                    continue209                for done_drs in done:210                    if done_drs.variable == drs.variable and drs_dates_overlap(drs, done_drs):211                        if drs == done_drs:212                            continue213                        log.debug('%s overlaps %s' % (drs, done_drs))214                        yield ('Overlapping files in version', 215                                '%s, %s' % (done_drs, drs))216class CheckFilesLinks(TreeChecker):217    """218    Check to make sure the files directory doesn't contain symbolic links.219    This problem is marked unfixable as to do so would dissrupt old versions.220    """221    def _check_hook(self, pt):222        self._links = []223        latest_version = self._latest_version(pt)224        for filepath, link_dir in pt.iter_real_files():225            if check_latest and version != latest_version:226                continue227            if op.islink(filepath):228                self._links.append((filepath, link_dir))229                self._state_unfixable('Links in files dir', 'Path %s is a symbolic link' % filepath)230def repair_version(pt, version):231    """232    An 'all in one' repair function that removes the version directory233    and reconstructs from scratch.234    This function is guaranteed not to delete anything if real files235    are detected in the version directory....prototypes.py
Source:prototypes.py  
...155    def get_setup_action_defaults(self, action, env):156        if env.config.get('trac', 'repository_type') == 'svn':157            repo_dir = env.config.get('trac', 'repository_dir')158            if repo_dir:159                pre = self._check_hook(repo_dir, 'pre-commit')160                post = self._check_hook(repo_dir, 'post-commit')161                return self.arg_map.get((pre, post))162    163    def execute_setup_action(self, action, args, data, log_cb):164        if data['repo_type'] != 'svn':165            print >>sys.stderr, 'Cannot install hook scripts for repository type %s'%data['repo_type']166            return False167        168        pre, post = dict([(v,k) for k, v in self.arg_map.iteritems()]).get(args, (False, False))169        if pre:170            hookf, trachook = self._install_hook(data['repo_dir'], 'pre-commit')171            svnlook = locate('svnlook')172            if os.name == 'nt':173                hookf.write('%s log -t %%2 %%1 > C:\\temp\\svnlog-%%2\n'%svnlook)174                hookf.write('%s %s %s file:"C:\\temp\\svnlook-%%2\n"'%175                            (sys.executable, trachook, data['env'].path))176                hookf.write('IF ERRORLEVEL 1 SET TRAC_CANCEL=YES\n')177                hookf.write('DEL /F C:\\temp\\svnlog-%2\n')178                hookf.write('IF DEFINED TRAC_CANCEL EXIT 1\n')179            else:180                hookf.write('%s %s %s "${%s log -t "$2" "$1"}"\n'%181                            (sys.executable, trachook, data['env'].path, svnlook))182        if post:183            hookf, trachook = self._install_hook(data['repo_dir'], 'post-commit')184            revarg = os.name == 'nt' and '%2' or '$2'185            hookf.write('%s %s -p "%s" -r "%s"\n'%186                        (sys.executable, trachook, data['env'].path, revarg))187        return True188    189    # Internal methods190    def _check_hook(self, path, hook):191        hook_file = os.path.join(path, 'hooks', hook)192        if 
os.name == 'nt':193            hook_file += '.bat'194        if not os.path.exists(hook_file):195            return False196        for line in open(hook_file):197            line = line.strip()198            if os.name == 'nt':199                if line.startswith('REM') or line.startswith('::'):200                    continue201            else:202                line = line.split('#', 1)[0]203            if 'trac-'+hook+'-hook' in line:204                return True...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
