Best Python code snippet using autotest_python
static.py
Source:static.py  
...66        print '<br>in the static call'67        if environ['REQUEST_METHOD'] not in ('GET', 'HEAD'):68            return self.method_not_allowed(environ, start_response)69        path_info = environ.get('PATH_INFO', '')70        full_path = self._full_path(path_info)71        # guard against arbitrary file retrieval72        if not (path.abspath(full_path + '/'))\73               .startswith(path.abspath(self.root + '/')):74            return self.not_found(environ, start_response)75        if path.isdir(full_path):76            if full_path[-1] <> '/' or full_path == self.root:77                location = util.request_uri(environ, include_query=False) + '/'78                if environ.get('QUERY_STRING'):79                    location += '?' + environ.get('QUERY_STRING')80                headers = [('Location', location)]81                return self.moved_permanently(environ, start_response, headers)82            else:83                full_path = self._full_path(path_info + self.index_file)84        content_type = self._guess_type(full_path)85        try:86            etag, last_modified = self._conditions(full_path, environ)87            headers = [('Date', rfc822.formatdate(time.time())),88                       ('Last-Modified', last_modified),89                       ('ETag', etag)]90            if_modified = environ.get('HTTP_IF_MODIFIED_SINCE')91            if if_modified and (rfc822.parsedate(if_modified)92                                >= rfc822.parsedate(last_modified)):93                return self.not_modified(environ, start_response, headers)94            if_none = environ.get('HTTP_IF_NONE_MATCH')95            if if_none and (if_none == '*' or etag in if_none):96                return self.not_modified(environ, start_response, headers)97            file_like = self._file_like(full_path)98            headers.append(('Content-Type', content_type))99            start_response("200 OK", headers)100            if environ['REQUEST_METHOD'] == 
'GET':101                return self._body(full_path, environ, file_like)102            else:103                return ['']104        except (IOError, OSError), e:105            return self.not_found(environ, start_response)106    def _full_path(self, path_info):107        """Return the full path from which to read."""108        return self.root + path_info109    def _guess_type(self, full_path):110        """Guess the mime type using the mimetypes module."""111        return mimetypes.guess_type(full_path)[0] or 'text/plain'112    def _conditions(self, full_path, environ):113        """Return a tuple of etag, last_modified by mtime from stat."""114        mtime = stat(full_path).st_mtime115        return str(mtime), rfc822.formatdate(mtime)116    def _file_like(self, full_path):117        """Return the appropriate file object."""118        return open(full_path, 'rb')119    def _body(self, full_path, environ, file_like):120        """Return an iterator over the body of the response."""121        way_to_send = environ.get('wsgi.file_wrapper', iter_and_close)122        return way_to_send(file_like, self.block_size)123def iter_and_close(file_like, block_size):124    """Yield file contents by block then close the file."""125    while 1:126        try:127            block = file_like.read(block_size)128            if block: yield block129            else: raise StopIteration130        except StopIteration, si:131            file_like.close()132            return133def cling_wrap(package_name, dir_name, **kw):134    """Return a Cling that serves from the given package and dir_name.135    This uses pkg_resources.resource_filename which is not the136    recommended way, since it extracts the files.137    I think this works fine unless you have some _very_ serious138    requirements for static content, in which case you probably139    shouldn't be serving it through a WSGI app, IMHO. 
YMMV.140    """141    resource = Requirement.parse(package_name)142    return Cling(resource_filename(resource, dir_name), **kw)143class Shock(Cling):144    """A stupidly simple way to serve up mixed content.145    Serves static content just like Cling (it's superclass)146    except that it process content with the first matching147    magic from self.magics if any apply.148    See Cling and classes with "Magic" in their names in this module.149    If you are using Shock with the StringMagic class for instance:150    shock = Shock('/data', magics=[StringMagic(food='cheese')])151    Let's say you have a file called /data/foo.txt.stp containing one line:152    "I love to eat $food!"153    When you do a GET on /foo.txt you will see this in your browser:154    "I love to eat cheese!"155    This is really nice if you have a color variable in your css files or156    something trivial like that. It seems silly to create or change a157    handful of objects for a couple of dynamic bits of text.158    """159    magics = ()160    def _match_magic(self, full_path):161        """Return the first magic that matches this path or None."""162        for magic in self.magics:163            if magic.matches(full_path):164                return magic165    def _full_path(self, path_info):166        """Return the full path from which to read."""167        full_path = self.root + path_info168        if path.exists(full_path):169            return full_path170        else:171            for magic in self.magics:172                if path.exists(magic.new_path(full_path)):173                    return magic.new_path(full_path)174            else:175                return full_path176    def _guess_type(self, full_path):177        """Guess the mime type magically or using the mimetypes module."""178        magic = self._match_magic(full_path)179        if magic is not None:...versionfs.py
Source:versionfs.py  
...4243    # Helpers44    # =======4546    def _full_path(self, partial):47        if partial.startswith("/"):48            partial = partial[1:]49        path = os.path.join(self.root, partial)50        return path5152    def _get_versioned_path(self, filename, version):53        return '.'.join([54                filename, 55                #self.version_midfix,56                str(version)]57                )5859    def _get_nonversioned_filename(self, filename):60        match = nonversioned_re.match(filename)61        return match[1] if match else ''6263    # Filesystem methods64    # ==================6566    def access(self, path, mode):67        print ("access:", path, mode)68        full_path = self._full_path(path)69        if not os.access(full_path, mode):70            raise FuseOSError(errno.EACCES)7172    def chmod(self, path, mode):73        print ("chmod:", path, mode)74        full_path = self._full_path(path)75        return os.chmod(full_path, mode)7677    def chown(self, path, uid, gid):78        print ("chown:", path, uid, gid)79        full_path = self._full_path(path)80        return os.chown(full_path, uid, gid)8182    def getattr(self, path, fh=None):83        print ("getattr:", path)84        full_path = self._full_path(path)85        st = os.lstat(full_path)86        return dict((key, getattr(st, key)) for key in ('st_atime', 'st_ctime',87                     'st_gid', 'st_mode', 'st_mtime', 'st_nlink', 'st_size', 'st_uid'))8889    def readdir(self, path, fh):90        print ("readdir:", path)91        full_path = self._full_path(path)9293        dirents = ['.', '..']94        if os.path.isdir(full_path):95            dirs = list(filter(lambda x: is_versioned(x) < 0, os.listdir(full_path)))96            dirents.extend(dirs)  # type: ignore97        for r in dirents:98            yield r99100    def readlink(self, path):101        print ("readlink:", path)102        pathname = os.readlink(self._full_path(path))103        if 
pathname.startswith("/"):104            # Path name is absolute, sanitize it.105            return os.path.relpath(pathname, self.root)106        else:107            return pathname108109    def mknod(self, path, mode, dev):110        print ("mknod:", path, mode, dev)111        return os.mknod(self._full_path(path), mode, dev)112113    def rmdir(self, path):114        print ("rmdir:", path)115        full_path = self._full_path(path)116        return os.rmdir(full_path)117118    def mkdir(self, path, mode):119        print ("mkdir:", path, mode)120        return os.mkdir(self._full_path(path), mode)121122    def statfs(self, path):123        print ("statfs:", path)124        full_path = self._full_path(path)125        stv = os.statvfs(full_path)126        return dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree',127            'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_flag',128            'f_frsize', 'f_namemax'))129130    def unlink(self, path):131        print ("unlink:", path)132        full_path = self._full_path(path)133        if os.path.basename(path)[0] != '.':134            self.remove_old_versions(full_path) # remove history files if the original file's removed.135        return os.unlink(full_path)136137    def symlink(self, name, target):138        print ("symlink:", name, target)139        return os.symlink(target, self._full_path(name))140141    def rename(self, old, new):142        print ("rename:", old, new)143        return os.rename(self._full_path(old), self._full_path(new))144145    def link(self, target, name):146        print ("link:", target, name)147        return os.link(self._full_path(name), self._full_path(target))148149    def utimens(self, path, times=None):150        print ("utimens:", path, times)151        return os.utime(self._full_path(path), times)152153    # File methods154    # ============155156    def open(self, path, flags):157        print ('** open:', path, '**')158        full_path = 
self._full_path(path)159        return os.open(full_path, flags)160161    def create(self, path, mode, fi=None):162        print ('** create:', path, '**')163        full_path = self._full_path(path)164        return os.open(full_path, os.O_WRONLY | os.O_CREAT, mode)165166    def read(self, path, length, offset, fh):167        print ('** read:', path, '**')168        os.lseek(fh, offset, os.SEEK_SET)169        return os.read(fh, length)170171    def write(self, path, buf, offset, fh):172        print ('** write:', path, '**')173        os.lseek(fh, offset, os.SEEK_SET)174175        self.is_move[path] = False176177        if os.path.basename(path)[0] == '.':178            return os.write(fh, buf) # skip version system for hidden files179180        res = os.write(fh, buf)181182        self.versionlog(path) # versioning183184        return res  185186    def truncate(self, path, length, fh=None):187        print ('** truncate:', path, '**')188        full_path = self._full_path(path)189        with open(full_path, 'r+') as f:190            f.truncate(length)191192    def flush(self, path, fh):193        print ('** flush', path, '**')194        if (os.path.basename(path)[0] != '.') and (path not in self.is_move):195            self.is_move[path] = True196        return os.fsync(fh)197198    def release(self, path, fh):199        print ('** release', path, '**')200        if os.path.basename(path)[0] != '.':201            not_wroten = self.is_move[path]202            del self.is_move[path]203            if not_wroten: # file flushed but not written204                self.versionlog(path) # versioning205        return os.close(fh)206207    def fsync(self, path, fdatasync, fh):208        print ('** fsync:', path, '**')209        return self.flush(path, fh)210    211    # Version Methods212    # ===========================213    def versionlog(self, path):214        full_path = self._full_path(path)215        n = self.append_version(path)216        if n > -1:217            
print("created {}-th version for {}".format(n + 1 if n < MAX_VERSIONS else n, path))218            shutil.copy2(full_path, self._full_path(self._get_versioned_path(path, 1)))219        else:220            print('file has no modification.')221        222223    def remove_old_versions(self, path):224        arr = self.get_all_version(path)225        for e in arr:226            os.remove(self._get_versioned_path(path, e))227228    def get_all_version(self, path):229        dirents = []230        if os.path.isdir(self.root):231           dirents.extend(os.listdir(self.root))232233        filename = os.path.basename(path)234235        dirents = list(filter(lambda x: self._get_nonversioned_filename(x) == filename, dirents))236        return list(filter(lambda x: x > -1, map(is_versioned, dirents)))237238    def append_version(self, filename):239        arr = self.get_all_version(filename)240        n = len(arr)241        if n and filecmp.cmp(self._full_path(self._get_versioned_path(filename, 1)), self._full_path(filename)):242            return -1 # same, create no version log243        self.shift_version(filename, arr)244        return n245246    def shift_version(self, filename, versions):247        versions.sort(reverse=True)248        path = self._full_path(filename)249        if len(versions) == MAX_VERSIONS:250            os.remove(self._get_versioned_path(path, MAX_VERSIONS))251            del versions[0]252        for version in versions:253            old_path = self._get_versioned_path(path, version)254            os.rename(old_path, self._get_versioned_path(path, version + 1))255    256def main(mountpoint):257    FUSE(VersionFS(), mountpoint, nothreads=True, foreground=True)258259if __name__ == '__main__':260    # logging.basicConfig(level=logging.DEBUG)261262    if len(sys.argv) > 1:
...render_jsonnet.py
Source:render_jsonnet.py  
import json
import logging
import os
import os.path
import re

import pyjsonnet.template_filters as filters

logger = logging.getLogger(__name__)


class RenderJsonnet(object):
    """Evaluate jsonnet manifests, resolving imports from several locations."""

    def __init__(self, files=None, manifestpath=None, lib_dirs=None):
        """Set up the import search locations.

        Args:
            files: optional mapping of path -> content (or None for "read
                from disk on first use") consulted before the filesystem.
            manifestpath: optional path of the manifest; its directory is
                searched for imports.
            lib_dirs: optional iterable of extra library directories. It is
                copied, so neither the caller's list nor a shared default is
                mutated; the bundled "jsonnet/lib" directory is appended.
        """
        self.manifestdir = os.path.dirname(manifestpath) if manifestpath else None
        self.files = files
        bundled_lib = os.path.join(os.path.dirname(__file__), "jsonnet/lib")
        self.lib_dirs = list(lib_dirs or []) + [bundled_lib]

    def try_path(self, path, rel):
        """Look up the import ``rel`` requested from directory ``path``.

        Lookup order: the ``self.files`` mapping, then ``self.manifestdir``,
        then each entry of ``self.lib_dirs``, and finally the plain
        filesystem path.

        Returns:
            ``(name, content)``. ``name`` is ``rel`` for the first three
            sources and the resolved path for the filesystem fallback;
            ``content`` is None when nothing was found.

        Raises:
            RuntimeError: if ``rel`` is empty or names a directory.
        """
        # Validate before indexing: rel[0] on an empty string would raise
        # IndexError instead of the intended RuntimeError.
        if not rel:
            raise RuntimeError('Got invalid filename (empty string).')
        full_path = rel if rel[0] == '/' else path + rel
        if full_path[-1] == '/':
            raise RuntimeError('Attempted to import a directory')

        if self.files is not None and full_path in self.files:
            # A None entry means "known file, content not loaded yet".
            if self.files[full_path] is None:
                with open(full_path) as handle:
                    self.files[full_path] = handle.read()
            return rel, self.files[full_path]

        # @TODO(ant31) fail if full_path is absolute
        if self.manifestdir:
            manifest_candidate = os.path.join(self.manifestdir, full_path)
            if os.path.isfile(manifest_candidate):
                with open(manifest_candidate) as handle:
                    return rel, handle.read()

        for libdir in self.lib_dirs:
            lib_candidate = os.path.join(libdir, rel)
            if os.path.isfile(lib_candidate):
                with open(lib_candidate) as handle:
                    return rel, handle.read()

        if not os.path.isfile(full_path):
            return full_path, None
        with open(full_path) as handle:
            return full_path, handle.read()

    def import_callback(self, path, rel):
        """Return ``(name, content)`` for an import or raise RuntimeError.

        Only a None content means "not found"; an existing but empty file
        is returned as an empty string.
        """
        full_path, content = self.try_path(path, rel)
        if content is not None:
            return full_path, content
        raise RuntimeError('File not found')

    def render_jsonnet(self, manifeststr, tla_codes=None):
        """Evaluate ``manifeststr`` as a jsonnet snippet.

        On RuntimeError the tla_codes and the numbered manifest lines
        (lines starting with '#' skipped) are printed before re-raising.
        """
        # @TODO(ant31): workaround until jsonnet compile on windows
        import _jsonnet
        try:
            json_str = _jsonnet.evaluate_snippet(  # pylint: disable=no-member
                "snippet",
                manifeststr,
                import_callback=self.import_callback,
                native_callbacks=filters.jsonnet_callbacks(),
                tla_codes=tla_codes)
        except RuntimeError:
            print("tla_codes: %s" % (str(tla_codes)))
            shown_lines = [
                line for line in manifeststr.split("\n")
                if re.match(r"^ *#", line) is None
            ]
            print("\n".join(
                "%s %s" % (index, line)
                for index, line in enumerate(shown_lines)))
            # Bare raise keeps the original traceback intact.
            raise
        # ... (the excerpt is cut off here; the rest of this method is not shown)

# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
