Best Python code snippet using playwright-python
video-rename
Source:video-rename  
1#!/usr/bin/python32# -*- coding: utf-8 -*-3# kate: space-indent on; indent-width 4; mixedindent off; indent-mode python; syntax Python;4import sys5import argparse6import json7import re8import string9import urllib.request10import datetime11from arsoft.utils import *12from arsoft.inifile import IniFile13re_words = re.compile('[ \\.\\-&_,]+')14def isxdigit(value):15    for c in value:16        if c not in string.hexdigits:17            return False18    return True19def iscamelcase(s):20    ret = False21    got_upper = False22    for c in s:23        if c in string.ascii_uppercase:24            if got_upper:25                # already upper, so at least two uppercase letters26                return False27            else:28                got_upper = True29        else:30            if not c in string.ascii_lowercase:31                return False32            got_upper = False33    return True34def split_camelcase(s):35    ret = []36    got_upper = False37    start = 038    i = 039    for i in range(len(s)):40        if s[i] in string.ascii_uppercase:41            if i > start:42                ret.append(s[start:i])43            got_upper = True44            start = i45    if i >= start:46        ret.append(s[start:])47    return ret48def convert_date(words):49    ret = []50    int_values = []51    month_names = ['january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december']52    for w in words:53        w_int = None54        w_int_month = False55        w_int_year = False56        try:57            w_int = month_names.index(w.lower()) + 158            w_int_month = True59        except ValueError:60            pass61        if w_int is None:62            try:63                w_int = int(w)64                if w_int > 2000:65                    w_int_year = True66            except ValueError:67                pass68        ret.append(w)69        #print(w, w_int)70        if w_int is None:71            int_values = []72        else:73            int_values.append( (w_int, w_int_month, w_int_year) )74            if len(int_values) == 3:75                year = 076                month = 077                day = 078                #print(int_values)79                for (w_int, w_int_month, w_int_year) in int_values:80                    if w_int_month:81                        month = w_int82                    elif w_int_year:83                        year = w_int84                for (w_int, w_int_month, w_int_year) in int_values:85                    if w_int_month or w_int_year:86                        continue87                    else:88                        if year == 0:89                            year = w_int + 2000 if w_int < 100 else w_int90                        elif month == 0 and w_int >= 1 and w_int <= 12:91                            month = w_int92                        elif day == 0 and w_int >= 1 and w_int <= 31:93                            day = w_int94                #print(year, month , day)95                if year and month and day:96                    # remove string values97                    ret = ret[0:-3]98                    ret.append(datetime.date(year=year, month=month, day=day))99                #print(int_values)100    return ret101def replace_in_list(list, needle, replacement):102    start = -1103    end = 0104    #print(list)105    for n in needle:106        try:107            i = list.index(n, end)108            if start < 0:109                start = i110            end = i111            #print('find %s -> %i' % (n, i))112        except ValueError:113            return list114    #print('build %i, %s -> %s -> %s' % (start, list[0:start], replacement, list[start+len(needle):]))115    list = list[0:start] + replacement + list[start+len(needle):]116    return list117def remove_ignored(words, ignored):118    if not words:119        return []120    ret = []121    index = 0122    words_len = len(words)123    #print('remove_ignored %s' % words)124    while index < words_len:125        found = False126        for iw in ignored:127            if isinstance(iw, list):128                offset = 0129                found = True130                #print('ff %s' % iw)131                for iwe in iw:132                    #print('ff qq %s<>%s' % (iwe, words[index+offset]))133                    if index+offset >= words_len or iwe != words[index+offset]:134                        found = False135                        break136                    offset = offset + 1137                #print('ff match %s' % found)138                if found:139                    index = index + len(iw)140            elif iw == words[index]:141                #print('ff match single %s' % iw)142                found = True143                index = index + 1144                break145        if not found:146            ret.append(words[index])147            index = index + 1148    #print('remove_ignored ret=%s' % ret)149    return ret150def split_into_words(s, ignored=None, combined=None):151    #print(s)152    ret = []153    ex = []154    in_brackets = 0155    brackets_list = []156    for w in re_words.split(s):157        if not w:158            continue159        if (w[0] == '(' and w[-1] == ')') or (w[0] == '[' and w[-1] == ']') or (w[0] == '{' and w[-1] == '}'):160            w = w[1:-1]161        elif w[0] == '(' or w[0] == '[':162            in_brackets+=1163            brackets_list.append(w[1:])164            continue165        elif w[-1] == ')' or w[0] == ']':166            in_brackets-=1167            brackets_list.append(w[0:-1])168            if in_brackets == 0:169                w = ' '.join(brackets_list)170            else:171                continue172        elif in_brackets > 0:173            brackets_list.append(w)174            continue175        if iscamelcase(w):176            for wx in split_camelcase(w):177                if wx[0].isalpha() and wx[-1].isdigit():178                    ex.append(wx[0:-1])179                elif wx[0].isalpha() and wx[-1] == '+':180                    ex.append(wx[0:-1])181                elif wx[0] == '#':182                    ex.append(wx[1:].lower())183                else:184                    ex.append(wx.lower())185        else:186            if w[0].isalpha() and w[-1].isdigit():187                ex.append(w[0:-1])188            elif w[0].isalpha() and w[-1] == '+':189                ex.append(w[0:-1])190            elif w[0] == '#':191                ex.append(w[1:].lower())192            else:193                ex.append(w.lower())194    if ignored is not None:195        ret = remove_ignored(ex, ignored)196    else:197        ret = ex198    if combined:199        for (needle, replacement) in combined:200            ret = replace_in_list(ret, needle, replacement)201            #print('%s -> %s, %s' % (needle, replacement, ret))202    ret = convert_date(ret)203    return ret204def expanduser_dirs(*args):205    dirs = []206    for a in args:207        if a is None:208            continue209        if isinstance(a, list):210            for e in a:211                e = os.path.expanduser(e)212                dirs.append(os.path.abspath(e))213        else:214            a = os.path.expanduser(a)215            dirs.append(os.path.abspath(a))216    return dirs217def extract_html_title(data):218    from html.parser import HTMLParser219    class MyHTMLParser(HTMLParser):220        _path = []221        _title = None222        def handle_starttag(self, tag, attrs):223            self._path.append(tag)224        def handle_endtag(self, tag):225            self._path.pop()226        def handle_data(self, data):227            if len(self._path) > 1 and self._path[-1] == 'title':228                self._title = data229    parser = MyHTMLParser()230    parser.feed(data)231    return parser._title232def copyfile(src, dst, *, follow_symlinks=True, callback=None, use_sendfile=False):233    """Copy data from src to dst.234    If follow_symlinks is not set and src is a symbolic link, a new235    symlink will be created instead of copying the file it points to.236    """237    if shutil._samefile(src, dst):238        raise shutil.SameFileError("{!r} and {!r} are the same file".format(src, dst))239    for fn in [src, dst]:240        try:241            st = os.stat(fn)242        except OSError:243            # File most likely does not exist244            pass245        else:246            # XXX What about other special files? (sockets, devices...)247            if shutil.stat.S_ISFIFO(st.st_mode):248                raise shutil.SpecialFileError("`%s` is a named pipe" % fn)249    if not follow_symlinks and os.path.islink(src):250        os.symlink(os.readlink(src), dst)251    else:252        size = os.stat(src).st_size253        with open(src, 'rb') as fsrc:254            with open(dst, 'wb') as fdst:255                copyfileobj(fsrc, fdst, callback=callback, total=size, use_sendfile=use_sendfile)256    return dst257def copyfileobj(fsrc, fdst, callback, total, length=4096*1024, flush_blocks=64, use_sendfile=False):258    copied = 0259    num_blocks = 0260    if use_sendfile:261        dest_fd = fdst.fileno()262        src_fd = fsrc.fileno()263        block_size = length * flush_blocks264        while True:265            bytesSent = os.sendfile(dest_fd, src_fd, copied, block_size)266            if bytesSent == 0:267                break268            copied += bytesSent269            if callback is not None:270                callback(copied, total=total)271    else:272        while True:273            buf = fsrc.read(length)274            if not buf:275                break276            fdst.write(buf)277            copied += len(buf)278            num_blocks += 1279            if num_blocks >= flush_blocks:280                fdst.flush()281                os.fsync(fdst.fileno())282                num_blocks = 0283            if callback is not None:284                callback(copied, total=total)285def copy_with_progress(src, dst, *, follow_symlinks=True, callback=None, use_sendfile=False):286    if os.path.isdir(dst):287        dst = os.path.join(dst, os.path.basename(src))288    copyfile(src, dst, follow_symlinks=follow_symlinks, callback=callback, use_sendfile=use_sendfile)289    shutil.copymode(src, dst, follow_symlinks=follow_symlinks)290    shutil.copystat(src, dst, follow_symlinks=follow_symlinks)291    return dst292def is_within_directory(f, d):293    relpath = os.path.relpath(f, d)294    if os.path.isabs(relpath):295        # if we got an abspath then f cannot be within d, otherwise we296        # would have gotten a relpath297        return False, None298    else:299        # if relpath starts with .. the file is outside the directory300        if relpath[0] == '.' and relpath[1] == '.':301            return False, None302        else:303            return True, relpath304class video_rule(object):305    def __init__(self, re_pattern, url_template, options={}):306        self.re_pattern = re_pattern307        self.re = re.compile(re_pattern)308        self.url_template = url_template309        self.options = options310    def match(self, basename):311        return self.re.match(basename)312    def __repr__(self):313        return '\"%s\" -> %s' % (self.re_pattern, self.url_template)314class hash_directory(object):315    def __init__(self, path=None, opts=None):316        self.path = path317        self.readonly = False318        if opts is not None:319            for o in opts.split(','):320                if o == 'read-only' or o == 'ro':321                    self.readonly = True322    def __str__(self):323        if self.readonly:324            return 'read-only:%s' % self.path325        else:326            return self.path327    def __repr__(self):328        return self.__str__()329class hash_entry(object):330    def __init__(self, base=None, opts=None, words=None):331        self._aliases = []332        if words is None:333            self._words = split_into_words(base)334        else:335            self._words = words336        self.directory = base337        self.debug = False338        if opts is not None:339            for o in opts.split(','):340                if ':' in o:341                    k,v = o.split(':')342                    if k == 'alias':343                        self._aliases.append(split_into_words(v))344                elif o == 'debug':345                    self.debug = True346    def equal(self, words):347        if len(words) != len(self._words):348            return False349        for i, w in enumerate(self._words):350            if w != words[i]:351                return False352        return True353    @property354    def aliases(self):355        return self._aliases356    @staticmethod357    def _match_words(lhs_words, rhs_words, full_match=False, partial_match=False, debug=False):358        ret = -1359        first = True360        i = 0361        i_max = len(lhs_words)362        j_max = len(rhs_words)363        num_matches = 0364        while i < i_max:365            rhs_words_start = 0366            while rhs_words_start < j_max and i < i_max:367                try:368                    #print('rhs_words.index(%s i=%i, %i)' % (lhs_words, i, rhs_words_start))369                    j = rhs_words.index(lhs_words[i], rhs_words_start)370                    if first:371                        ret = 1372                        num_matches += 1373                        first = False374                    else:375                        ret += 1376                    rhs_words_start = j + 1377                    #print(j)378                    k = 1379                    while i + k < i_max and j + k < j_max and lhs_words[i + k] == rhs_words[j + k]:380                        if debug:381                            print('xx k=%i, %s, %s' % (k, lhs_words[i + k], rhs_words[j + k]))382                        ret += 10383                        num_matches += 1384                        k += 1385                        if debug:386                            print('xx ret=%i' % (ret))387                    # add extra point for full match for single words388                    if i_max == 1:389                        ret += 1390                        if debug:391                            print('full i=%i/%i, ret=%i' % (i,i_max, ret))392                    i = i + k393                except ValueError:394                    break395            i += 1396        if full_match:397            if j_max == num_matches:398                if debug:399                    print('%s<>%s ret=%i (full match)' % (lhs_words,rhs_words,ret))400                return ret401            else:402                if debug:403                    print('%s<>%s ret=-1 (no full match %i!=%i)' % (lhs_words,rhs_words,j_max,num_matches))404                return -1405        elif partial_match:406            if num_matches >= i_max:407                if debug:408                    print('%s<>%s ret=%i (partial match)' % (lhs_words,rhs_words,ret))409                return ret410            else:411                if debug:412                    print('%s<>%s ret=-1 (no partial match %i!=%i)' % (lhs_words,rhs_words,i_max,num_matches))413                return -1414        else:415            if debug:416                print('%s<>%s %i/%i ret=%i' % (lhs_words,rhs_words,j_max,num_matches, ret))417            return ret418    def match(self, words, full_match=False, partial_match=False, debug=False):419        if self.debug:420            debug = True421        ret = self._match_words(self._words, words, full_match=full_match, partial_match=partial_match, debug=debug)422        for alias in self._aliases:423            alias_score = self._match_words(alias, words, full_match=full_match, partial_match=partial_match, debug=debug)424            if alias_score > ret:425                ret = alias_score426        return ret427    def __eq__(self, rhs):428        return self._words == rhs._words429    def __hash__(self):430        ret = 0431        for w in self._words:432            ret += hash(w)433        return ret434    def __iter__(self):435        return iter(self._words)436    @staticmethod437    def directory_name(words, delim='.'):438        s = []439        for w in words:440            if isinstance(w, datetime.date):441                s.append(str(w))442            else:443                s.append(w)444        return delim.join(s)445    def __str__(self):446        return self.directory_name(self._words)447    def __repr__(self):448        return self.__str__()449class input_file(object):450    def __init__(self, filename, hash_dir=None, entry=None, unknown=False, channels=False):451        self.hash_dir = hash_dir452        self.entry = entry453        self.filename = filename454        self.unknown = unknown455        self.channels = channels456        if not isinstance(self.filename, str):457            raise RuntimeError("self.filename is not a string: %s" % type(self.filename))458    def __str__(self):459        return self.filename460    def __repr__(self):461        return self.__str__()462class video_rename_app:463    def __init__(self):464        self._verbose = False465        self._files = []466        self._video_file_exts = []467        self._rules = []468        self._hash_threshold = 2469        self._hash_unknown_dir = '_unknown'470        self._hashes = []471        self._input_dirs = []472        self._hash_dirs = []473        self._ignored_words = []474        self._combined_words = []475        self._channels = []476    def _load_rules(self, section=None):477        self._rules = []478        for (k,v) in section.get_all():479            if k is None or not '.' in k:480                continue481            k_name, k_ext = k.split('.', 1)482            if k_ext == 're':483                re_pattern = v484                url = section.get(k_name + '.url')485                webpage = section.getAsBoolean(k_name + '.webpage', False)486                enable = section.getAsBoolean(k_name + '.enable', True)487                if url and enable:488                    options = { 'webpage': webpage }489                    r = video_rule(re_pattern, url, options)490                    self._rules.append(r)491    def _load_hashes(self, section):492        self._hashes = []493        for (k,v) in section.get_all():494            if k is None:495                continue496            self._hashes.append(hash_entry(k, v))497        self._hash_unknown_dir = section.get('unknown', '_unknown')498        self._hash_channels_dir = section.get('channels', '_channels')499    def _load_config(self, filename):500        filename = os.path.expanduser(filename)501        f = IniFile(filename, keyValueSeperator='=')502        if f.has_section('rules'):503            self._load_rules(f.section('rules'))504        if f.has_section('hashes'):505            self._load_hashes(f.section('hashes'))506        if self._input_dirs is None or not self._input_dirs:507            self._input_dirs = f.getAsArray(None, 'input')508        if self._hash_dirs is None or not self._hash_dirs:509            self._hash_dirs = []510            for hd in f.getAsArray(None, 'hash_dir'):511                if ',' in hd:512                    path,opts = hd.split(',',1)513                else:514                    path = hd515                    opts = None516                self._hash_dirs.append(hash_directory(path, opts))517        self._video_file_exts = f.getAsArray(None, 'video_file_exts', ['.mp4', '.avi', '.wmv', '.mkv', '.mov', '.vid'])518        self._ignored_words = []519        ignored_words_multi = []520        for w in f.getAsArray(None, 'ignored_words', ['360p', '480p', '720p', '1080p', 'and', 'the', 'a', 'is', 'in', 'of']):521            ww = split_into_words(w)522            if len(ww) > 1:523                #print('ignore %s' % (ww))524                ignored_words_multi.append(ww)525            else:526                self._ignored_words.append(w)527        # put all multi words ignores to the beginning of the list528        for ww in ignored_words_multi:529            self._ignored_words.insert(0, ww)530        self._channels = f.getAsArray(None, 'channels', [])531        for s in self._channels:532            channel_names = s.split(',')533            main_channel_name = channel_names[0]534            for ch in channel_names:535                chw = split_into_words(ch)536                self._combined_words.append( (chw, [main_channel_name] ) )537    def _add_file(self, f):538        fabs = os.path.abspath(f.filename)539        (name, ext) = os.path.splitext(fabs)540        if ext in self._video_file_exts:541            self._files.append(input_file(fabs, hash_dir=f.hash_dir, unknown=f.unknown, channels=f.channels, entry=f.entry))542        elif self._verbose:543            print('Skip non-video file %s' % fabs)544    def _add(self, f):545        if not isinstance(f, input_file):546            raise RuntimeError('require input_file not %s' % type(f))547        if self._verbose:548            print('Process %s' % f)549        if os.path.isfile(f.filename):550            self._add_file(f)551        elif os.path.isdir(f.filename):552            for e in os.listdir(path=f.filename):553                full = os.path.join(f.filename, e)554                if os.path.isdir(full):555                    self._add( input_file(full, hash_dir=f.hash_dir, unknown=f.unknown, channels=f.channels, entry=f.entry) )556                elif os.path.isfile(full):557                    self._add_file(input_file(full, hash_dir=f.hash_dir, unknown=f.unknown, channels=f.channels, entry=f.entry))558    def _mkdir(self, d):559        ret = True560        if not os.path.isdir(d):561            try:562                os.makedirs(d, exist_ok=True)563            except OSError:564                ret = False565        return ret566    def _rename_file(self, src, dst, overwrite=False):567        ret = False568        dst_dir = os.path.dirname(dst)569        self._mkdir(dst_dir)570        need_copy = False571        try:572            do_rename = True573            if os.path.exists(dst):574                if os.path.islink(dst):575                    target = os.readlink(dst)576                    if not os.path.isabs(target):577                        # convert into absolute path578                        target = os.path.join(os.path.dirname(dst), target)579                    if os.path.samefile(src,target):580                        os.remove(dst)581                        #print('Found link from src to dst -> %s' % dst)582                elif os.path.isdir(dst):583                    print('Destination %s exists as directory' % (dst), file=sys.stderr)584                    do_rename = False585                elif os.path.isfile(dst):586                    if overwrite:587                        print('Destination file %s already exists. Overwrite' % (dst), file=sys.stderr)588                        os.remove(dst)589                    else:590                        do_rename = False591                        print('Destination file %s already exists' % (dst), file=sys.stderr)592                else:593                    print('Destination %s already exists' % (dst), file=sys.stderr)594                    do_rename = False595            if do_rename:596                os.rename(src, dst)597            ret = True598        except OSError as e:599            # Invalid cross-device link600            if e.errno == 18:601                need_copy = True602            else:603                print('Failed to rename %s to %s: %s' % (src, dst, e), file=sys.stderr)604        if need_copy:605            def _copy_progress(copied, total):606                precent = 100*copied/total607                print('\r  Copying %i %% ...' % (precent), end='')608            try:609                print("  Copying", end='')610                copy_with_progress(src, dst, callback=_copy_progress, use_sendfile=False if self._disable_sendfile else True)611                #shutil.copy2(src, dst)612                os.unlink(src)613                ret = True614                print("\r  Copy complete")615            except OSError as e:616                print('Failed to copy %s to %s: %s' % (src, dst, e), file=sys.stderr)617        return ret618    def _mklink(self, src, dst):619        ret = False620        dst_dir = os.path.dirname(dst)621        self._mkdir(dst_dir)622        r = os.path.relpath(src, dst_dir)623        try:624            if os.path.islink(dst):625                # remove old link first626                os.remove(dst)627            os.symlink(r, dst, target_is_directory=False)628            ret = True629        except OSError as e:630            print('Failed to create symlink %s to %s: %s' % (src, dst, e), file=sys.stderr)631        return ret632    def _get_html_title(self, url):633        hdr = {'User-Agent':'Mozilla/5.0', 'Accept': '*/*'}634        req = urllib.request.Request(url, headers=hdr)635        try:636            response = urllib.request.urlopen(req)637            if response.status == 200:638                data = response.read()      # a `bytes` object639                text = data.decode('utf-8') # a `str`; this step can't be used if data is binary640                #print(text)641                return extract_html_title(text)642            #elif response.status == 302:643                #newurl = response.geturl()644                #print('new url %s' % newurl)645        except urllib.error.HTTPError as e:646            if self._verbose:647                print('HTTP Error %s: %s' % (url, e))648            pass649        return None650    def _clean_title(self, s):651        hash_idx = s.rfind('#')652        if hash_idx > 0:653            s = s[0:hash_idx - 1]654        s = self._clean_filename(s)655        words = split_into_words(s, ignored=self._ignored_words, combined=self._combined_words)656        if self._verbose:657            print('clean filename %s: %s' % (s, words))658        if not words:659            return None660        while len(s) > 200:661            elems = s.split()662            elems.pop()663            s = ' '.join(elems)664        return s665    def _clean_filename(self, s):666        s = s.replace('/', '_')667        s = s.replace('\r', '_')668        s = s.replace('\n', '_')669        s = s.replace('__', '_')670        s = s.strip()671        return s672    def _auto_rename_file(self, f, force=False):673        if not isinstance(f, input_file):674            raise RuntimeError('require input_file not %s' % type(f))675        path, name = os.path.split(f.filename)676        (basename, ext) = os.path.splitext(name)677        found_rule = False678        found_url = None679        for rule in self._rules:680            m = rule.match(basename)681            if m is not None:682                found_rule = True683                found_url = m.expand(rule.url_template)684                if self._verbose:685                    print('  %s: Match rule %s/%s -> %s' % (basename, rule.re_pattern, rule.url_template, found_url))686                break687        got_title = False688        suggested_filename = None689        if found_rule:690            try:691                (sts, stdoutdata, stderrdata) = runcmdAndGetData(args=['ffprobe', '-hide_banner', '-v', 'error', '-of', 'json', '-show_format', f.filename])692            except FileNotFoundError as ex:693                print('Cannot execute ffprobe.', file=sys.stderr)694                sts = -1695            if sts == 0:696                file_format = json.loads(stdoutdata.decode('utf8'))697                #print(file_format['format'])698                if 'tags' in file_format['format']:699                    tags = file_format['format']['tags']700                    if 'title' in tags:701                        title = self._clean_title(tags['title'])702                        if title:703                            got_title = True704                            suggested_filename = path + '/' + title + ext705            if not got_title and found_url:706                title = self._get_html_title(found_url)707                if title:708                    title = self._clean_title(title)709                    if title:710                        suggested_filename = path + '/' + title + ('.%s' % basename) + ext711        else:712            new_basename = self._clean_filename(basename)713            if new_basename != basename:714                suggested_filename = path + '/' + new_basename + ext715        if suggested_filename is not None:716            if self._noop:717                print('  Rename to %s (noop)' % (suggested_filename))718            else:719                print('  Rename to %s' % (suggested_filename))720                self._rename_file(f.filename, suggested_filename)721        return suggested_filename722    def _find_hash_entry(self, basename, existing_entry=None, full_match=False, debug=False):723        words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)724        ret = []725        for e in self._hashes:726            score = e.match(words, full_match=full_match, debug=debug)727            if score >= 0:728                ret.append((score, e))729        ret = sorted(ret, key=lambda e: e[0], reverse=True)730        if ret and existing_entry is not None:731            (top_score, top_entry) = ret[0]732            ret.insert(0, (top_score, existing_entry) )733        return ret734    def _auto_hash_file(self, f, force=False):735        if not isinstance(f, input_file):736            raise RuntimeError('require input_file not %s' % type(f))737        path, name = os.path.split(f.filename)738        (basename, ext) = os.path.splitext(name)739        suggested_filename = None740        symlink_filenames = []741        equal_score_entries = []742        entries = self._find_hash_entry(basename, existing_entry=f.entry, debug=False)743        if entries:744            entries = filter(lambda x: x[0] >= self._hash_threshold, entries)745        if entries:746            is_unknown = False747            equal_score = None748            equal_score_entries = []749            for (score, e) in entries:750                if equal_score is None:751                    equal_score = score752                    equal_score_entries.append(e)753                elif equal_score == score:754                    equal_score_entries.append(e)755        if equal_score_entries:756            dest_filenames = []757            for e in equal_score_entries:758                if f.hash_dir is None:759                    for hd in self._hash_dirs:760                        if hd.readonly:761                            continue762                        suggested_dir = os.path.join(hd.path, e.directory)763                        new_filename = os.path.join(suggested_dir, name)764                        dest_filenames.append( (new_filename, hd) )765                else:766                    suggested_dir = os.path.join(f.hash_dir.path, e.directory)767                    new_filename = os.path.join(suggested_dir, name)768                    dest_filenames.append( (new_filename, f.hash_dir) )769            if dest_filenames:770                if self._verbose:771                    print('  dest filenames: %s' % (dest_filenames))772                for (new_filename, hd) in dest_filenames:773                    is_same = False774                    try:775                        is_same = os.path.samefile(new_filename, f.filename)776                    except OSError:777                        pass778                    if is_same:779                        suggested_filename = new_filename780                    else:781                        symlink_filenames.append(new_filename)782                if suggested_filename is None:783                    suggested_filename = symlink_filenames[0]784                    if len(symlink_filenames) > 1:785                        symlink_filenames = symlink_filenames[1:]786                    else:787                        symlink_filenames = []788        if suggested_filename is None:789            # DO not move files if the file is within a hash_dir790            is_within_hash_dir = False791            if f.hash_dir is None:792                for hd in self._hash_dirs:793                    is_within_hash_dir, relpath = is_within_directory(f.filename, hd.path)794                    if is_within_hash_dir:795                        break796            else:797                is_within_hash_dir = True798            if not is_within_hash_dir:799                target_hd = None800                for hd in self._hash_dirs:801                    if hd.readonly:802                        continue803                    target_hd = hd804                    break805                if target_hd is not None:806                    suggested_filename = os.path.join(target_hd.path, self._hash_unknown_dir, name)807        if suggested_filename is not None:808            if f.filename != suggested_filename:809                if self._noop:810                    print('  Hash move to %s (noop)' % (suggested_filename))811                else:812                    print('  Hash move to %s' % (suggested_filename))813                    self._rename_file(f.filename, suggested_filename)814            for f in symlink_filenames:815                if self._noop:816                    print('  Hash symlink to %s (noop)' % (f))817                else:818                    print('  Hash symlink to %s' % (f))819                    self._mklink(suggested_filename, f)820    def _is_combined_word(self,w):821        ret = False822        for (chw, main_channel_name) in self._combined_words:823            if w in chw:824                ret = True825                break826        return ret827    def main(self):828        #=============================================================================================829        # process command line830        #=============================================================================================831        parser = argparse.ArgumentParser(description='renames video files by extract the video title from the meta data')832        parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='enable verbose output of this script.')833        parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='enable debug output of this script.')834        parser.add_argument('-f', '--force', dest='force', action='store_true', help='force processing of given files.')835        parser.add_argument('-n', '--noop', dest='noop', action='store_true', help='only show what would be done.')836        parser.add_argument('--no-hash', dest='no_hash', action='store_true', help='do not perform hashing')837        parser.add_argument('--no-rename', dest='no_rename', action='store_true', help='do not rename files.')838        parser.add_argument('-H', '--hash', dest='hash_dirs', nargs='*', help='hash video files into given directory')839        parser.add_argument('-q', '--query', dest='queries', nargs='*', help='query for specific files')840        parser.add_argument('-r', '--rehash', dest='rehashs', nargs='*', help='rehash specific words')841        parser.add_argument('--unknown', dest='process_unknown', action='store_true', help='re-process files in unknown directory')842        parser.add_argument('--channels', dest='process_channels', action='store_true', help='re-process files in channels directory')843        parser.add_argument('-c', '--config', dest='config_file', default='~/.config/video-rename.conf', help='load configuration file')844        parser.add_argument('--test-words', dest='test_words', nargs='*', help='testing word processing')845        parser.add_argument('--test-match', dest='test_match', nargs='*', help='test match processing')846        parser.add_argument('--show-hashes', dest='show_hashes', action='store_true', help='show all configured hashes')847        parser.add_argument('--disable-sendfile', dest='disable_sendfile', action='store_true', help='disable usage of sendfile on Unix machines')848        parser.add_argument('files', metavar='FILE', type=str, nargs='*', help='video files or directories')849        args = parser.parse_args()850        self._verbose = args.verbose851        self._debug = args.debug852        self._noop = args.noop853        self._no_hash = args.no_hash854        self._no_rename = args.no_rename855        #self._noop = True856        self._input_dirs = args.files857        self._hash_dirs = []858        for h in expanduser_dirs(args.hash_dirs):859            self._hash_dirs.append(hash_directory(h))860        self._queries = args.queries861        self._rehashs = args.rehashs862        self._process_unknown = args.process_unknown863        self._process_channels = args.process_channels864        self._disable_sendfile = args.disable_sendfile865        self._load_config(filename=args.config_file)866        if self._debug:867            print('Rules:')868            for r in self._rules:869                print('  %s' % r)870        if args.show_hashes:871            print('Hashes:')872            for h in self._hashes:873                print('  %s' % h)874                if h.aliases:875                    for a in h.aliases:876                        print('    %s' % hash_entry.directory_name(a))877            print('Hash unknown dir: %s' % self._hash_unknown_dir)878            print('Hash channels dir: %s' % self._hash_channels_dir)879            return 0880        if args.test_words:881            if self._debug:882                print('Ignored words:')883                for w in self._ignored_words:884                    print('  %s' %w)885                print('Combined words:')886                for (chw, main_channel_name) in self._combined_words:887                    print('  %s -> %s' % ('.'.join(chw), '.'.join(main_channel_name)) )888            print('Results:')889            for w in args.test_words:890                if w.startswith('file://'):891                    w = w[7:]892                if os.path.exists(w):893                    (name, ext) = os.path.splitext(w)894                    w = os.path.basename(name)895                words = split_into_words(w, ignored=self._ignored_words, combined=self._combined_words)896                print('  %s' % words)897            return 0898        if args.test_match:899            for w in args.test_match:900                if w.startswith('file://'):901                    w = w[7:]902                if os.path.exists(w):903                    (name, ext) = os.path.splitext(w)904                    w = os.path.basename(name)905                entries = self._find_hash_entry(w, full_match=False, debug=self._debug)906                if entries:907                    entries = filter(lambda x: x[0] >= self._hash_threshold, entries)908                if entries:909                    for (score, e) in entries:910                        print('%03i %s' % (score, e))911            return 0912        input_dirs = []913        for f in expanduser_dirs(self._input_dirs):914            f_hd = None915            f_entry = None916            for hd in self._hash_dirs:917                within, relpath = is_within_directory(f, hd.path)918                if within:919                    if os.path.isfile(f):920                        f_dir = os.path.dirname(relpath)921                    else:922                        f_dir = relpath923                    #print('%i, %s' % (within, f_dir))924                    f_hd = hd925                    entries = self._find_hash_entry(f_dir, full_match=True)926                    if entries:927                        f_score, f_entry = entries[0]928                    break929            #print(f, f_hd, f_entry)930            input_dirs.append( input_file(f, hash_dir=f_hd, entry=f_entry) )931        if self._verbose:932            if input_dirs:933                print('Input:')934                for i in input_dirs:935                    print('  %s' % i)936            if not self._no_hash:937                print('Hash dirs:')938                for hd in self._hash_dirs:939                    print('  %s' % hd)940        if self._process_unknown or self._process_channels:941            input_dirs = []942        elif self._queries:943            # query is a read-only operation944            self._no_hash = True945            self._no_rename = True946        elif self._rehashs:947            input_dirs = []948            # rehash, so required to do some hashing, but no rename949            self._no_hash = False950            self._no_rename = True951        if self._process_unknown:952            for h in self._hash_dirs:953                input_dirs.append( input_file(os.path.join(h.path, self._hash_unknown_dir), hash_dir=h, unknown=True) )954        if self._process_channels:955            for h in self._hash_dirs:956                input_dirs.append( input_file(os.path.join(h.path, self._hash_channels_dir), hash_dir=h, channels=True) )957        for f in input_dirs:958            self._add(f)959        if self._verbose:960            print('Files to process:')961            for i, f in enumerate(self._files):962                print('  [%03i]: %s' % (i,f))963        possible_words = []964        show_possible_words = False965        if self._process_unknown or self._process_channels:966            show_possible_words = True967            word_dict = {}968            for f in self._files:969                (name, ext) = os.path.splitext(f.filename)970                basename = os.path.basename(name)971                words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)972                num_words = len(words)973                for i in range(num_words):974                    if i + 1 < num_words:975                        if self._is_combined_word(words[i]) or self._is_combined_word(words[i+1]):976                            continue977                        word_combo = [ words[i], words[i+1] ]978                        h = hash_entry( words=word_combo )979                        if h in word_dict:980                            word_dict[h].add(f)981                        else:982                            word_dict[h] = set( [ f ] )983            existing_hashes = []984            for (h, files) in word_dict.items():985                found_existing_hash = True if h in self._hashes else False986                if found_existing_hash:987                    existing_hashes.append( (h, files) )988                elif len(files) > 1: #self._hash_threshold:989                    possible_words.append( (h, files ) )990            if 0:991                # reset the list of files992                self._files = []993                if existing_hashes:994                    if self._verbose:995                        print('Find files for existing hashes:')996                    for (h, files) in existing_hashes:997                        if self._verbose:998                            print('  %s: %i matches' % (h, len(files)))999                        for f in files:1000                            if self._verbose:1001                                print('    %s' % f)1002                            self._files.append(f)1003                if self._verbose:1004                    if self._files:1005                        print('Files for rehashing:')1006                        for f in files:1007                            print('  %s' % f)1008        elif self._queries:1009            for q in self._queries:1010                entries = self._find_hash_entry(q, full_match=True, debug=self._debug)1011                if entries:1012                    for (score, e) in entries:1013                        print('Found hash entry \'%s\':' % (e))1014                        results = []1015                        for hd in self._hash_dirs:1016                            d = os.path.join(hd.path, str(e))1017                            if os.path.isdir(d):1018                                for f in os.listdir(d):1019                                    results.append( (f, os.path.join(d, f), hd ) )1020                        for f in self._files:1021                            (name, ext) = os.path.splitext(f.filename)1022                            basename = os.path.basename(name)1023                            words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)1024                            score = e.match(words, full_match=False)1025                            if score >= 0:1026                                results.append( (basename, f.filename, None ) )1027                        for (basename, full, hd) in sorted(results, key=lambda x: x[0]):1028                            if hd is None:1029                                print('  %s in %s' % (basename, os.path.dirname(full)))1030                            else:1031                                print('  %s' % basename)1032                if 1:1033                    tmp_hash_entry = hash_entry(q)1034                    results = []1035                    for hd in self._hash_dirs:1036                        d = os.path.join(hd.path, self._hash_unknown_dir)1037                        if os.path.isdir(d):1038                            for f in os.listdir(d):1039                                (basename, ext) = os.path.splitext(f)1040                                words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)1041                                score = tmp_hash_entry.match(words, partial_match=True)1042                                if score >= 0:1043                                    results.append( (f, os.path.join(d, f), hd ) )1044                    for (basename, full, hd) in sorted(results, key=lambda x: x[0]):1045                        print('  %s in %s' % (basename, os.path.dirname(full)))1046        elif self._rehashs:1047            for q in self._rehashs:1048                entries = self._find_hash_entry(q, full_match=True)1049                if entries:1050                    for (score, e) in entries:1051                        print('Found hash entry: %s:' % (e))1052                        for hd in self._hash_dirs:1053                            d = os.path.join(hd.path, str(e))1054                            if os.path.isdir(d):1055                                self._add(input_file(d, hash_dir=hd, entry=e))1056        total = len(self._files)1057        total_progress = 01058        current_progress = 01059        if not self._no_rename:1060            total_progress += total1061        if not self._no_hash:1062            total_progress += total1063        if not self._no_rename:1064            for i in range(total):1065                print('[%03i/%03i] Rename %s' % (current_progress + 1, total_progress, self._files[i]))1066                current_progress += 11067                f_new = self._auto_rename_file(self._files[i], force=args.force)1068                if f_new is not None:1069                    self._files[i] = input_file(f_new, hash_dir=f.hash_dir)1070        if not self._no_hash:1071            for i in range(total):1072                f = self._files[i]1073                print('[%03i/%03i] Hash %s' % (current_progress + 1, total_progress, self._files[i]))1074                current_progress += 11075                self._auto_hash_file(self._files[i], force=args.force)1076        if possible_words and show_possible_words:1077            possible_words = sorted(possible_words, key=lambda e: len(e[1]), reverse=False)1078            print('Possible words:')1079            for (h, files) in possible_words:1080                print('  %s: %i matches' % (h, len(files)))1081                if self._debug:1082                    for f in files:1083                        print('    %s' % f)1084            for (h, files) in possible_words:1085                if len(files) > 2:1086                    print('%s=' % (h))1087        ret = 01088        return ret1089if __name__ == "__main__":1090    app =  video_rename_app()...x509helper.py
Source:x509helper.py  
1#!/usr/bin/env python22# -*- coding: utf-8 -*-3"""4Copyright (C) 2011-20185    Adam Greene <copyright@mzpqnxow.com>6Please see LICENSE or LICENSE.md for terms7"""8from Crypto.Util import asn19import base6410import OpenSSL11import os12import textwrap13import logging14import sys15from logger import LoggingMixin16class X509Helper(LoggingMixin):17    """18    base class for X509HelperCertificate and X509HelperKey19    functions for formatting common fieldsas errtc.20    """21    # when producing suggested_filename, how many bytes of the modules to use22    # in naming23    FILENAME_OCTETS = 624    def __init__(25            self,26            logger=None,27            log_level=logging.CRITICAL,28            blacklist_file=''):29        self.modulus_blacklist = []30        self.modulus_blacklist_config_path = blacklist_file31        if logger is None:32            LoggingMixin.__init__(self, log_level=log_level)33        else:34            self.logger = logger35        self.load_modulus_blacklist()36    def load_modulus_blacklist(self):37        if not self.modulus_blacklist_config_path:38            return39        try:40            f = open(self.modulus_blacklist_config_path, 'rb')41            for modulus_line in f.readlines():42                eline = modulus_line43                eline = eline.strip('\n')44                eline = eline.upper()45                self.modulus_blacklist.append(eline)46            f.close()47            self.logger.debug('Added {} items to modulus blacklist...'.format(48                len(self.modulus_blacklist)))49        except Exception as err:50            self.logger.error(51                'Fatal exception occurred while building blacklist...')52            self.logger.error(err)53            sys.exit(10)54    def is_blacklisted(self, modulus):55        return modulus.upper() in self.modulus_blacklist56    def printable_modulus(57            self,58            der_decimal,59            columns=16,60            prefix='\t',61            use_colons=True):62        modulus = hex(der_decimal).rstrip('L').lstrip('0x')or '0'63        printable_modulus = ''64        for i in xrange(0, len(modulus), 2):65            if i:66                if not (i % columns):67                    printable_modulus += '\n' + prefix68                else:69                    if use_colons:70                        printable_modulus += ':'71            else:72                printable_modulus += prefix73            printable_modulus += modulus[i:i + 2]74        return printable_modulus75    def modulus_long_to_string(self, der_decimal):76        modulus = hex(der_decimal).rstrip('L').lstrip('0x')or '0'77        printable_modulus = ''78        for i in xrange(0, len(modulus), 2):79            printable_modulus += modulus[i:i + 2]80        return printable_modulus81class X509HelperKey(X509Helper):82    def __init__(83            self,84            key_file,85            blacklist_file=None,86            logger=None,87            password='password'):88        X509Helper.__init__(89            self, logger=logger, blacklist_file=blacklist_file)90        self.key_pem_buffer = None91        self.rsa_key = None92        self.key_private_asn1 = None93        self.key_private_der = None94        self.key_modulus = None95        self.key_public_exponent = None96        self.key_private_exponent = None97        self.key_private_prime1 = None98        self.key_private_prime2 = None99        self.key_private_exponent1 = None100        self.key_private_exponent2 = None101        self.key_private_coefficient = None102        self.key_file = key_file103        self.password = password104        if blacklist_file:105            self.modulus_blacklist_config_path = blacklist_file106        self.subjects = ''107        self.parsed_key = None108        self.parse_key_file_universal()109    def __eq__(self, obj):110        if isinstance(obj, X509HelperCertificate):111            return ((self.key_modulus == obj.certificate_public_modulus) and (112                obj.certificate_public_exponent == self.key_public_exponent))113        else:114            return False115    def passwd_cb(self):116        self.logger.info('Returning password "{}"'.format(self.password))117        return self.password118    def der_key_to_pem_key(self, der_buffer):119        PEM_HEADER = '-----BEGIN RSA PRIVATE KEY-----'120        PEM_FOOTER = '-----END RSA PRIVATE KEY-----'121        f = str(base64.standard_b64encode(der_buffer))122        return (PEM_HEADER + '\n' + textwrap.fill(f, 64) + '\n' + PEM_FOOTER)123    def write_to_file(self, dirname=''):124        """ also sets self.subject """125        if dirname and not dirname.endswith('/'):126            dirname += '/'127        try:128            os.stat(os.path.join(dirname, self.suggested_filename))129            return ''  # dup, already processed this key130        except (IOError, OSError):131            pass  # file doesn't exist, process this entry132        with open(os.path.join(dirname, self.suggested_filename), 'wb') as filefd:133            # dos2unix and add a trailing newline134            filefd.write(self.key_pem_buffer.replace('\r\n', '\n') + '\n')135        self.subjects = self.suggested_filename.ljust(136            25) + ' - %d bit RSA Private Key (PEM format)'.format(self.key_bitsize)137        return self.subjects138    def key(self):139        return self.parsed_key140    def parse_key_file_universal(self):141        try:142            self.logger.info('Parsing key {}'.format(self.key_file))143            self.key_buffer = open(self.key_file, 'rb').read()144            self.crypto = OpenSSL.crypto145            try:146                self.logger.warning(147                    'Trying to load {} data as PEM...'.format(148                        self.key_file))149                self.rsa_key = self.crypto.load_privatekey(150                    self.crypto.FILETYPE_PEM, self.key_buffer, 'password')151                self.key_pem_buffer = self.key_buffer152            except Exception as err:153                pass154            if not self.rsa_key or not self.key_pem_buffer:155                try:156                    self.logger.warning(157                        'Trying to load {} data as DER...'.format(158                            self.key_file))159                    self.rsa_key = self.crypto.load_privatekey(160                        self.crypto.FILETYPE_ASN1, self.key_buffer, 'password')161                    self.key_pem_buffer = self.der_key_to_pem_key(162                        self.key_buffer)163                except Exception as err:164                    self.logger.warning(165                        'Failure to parse {} as DER/PEM format key, skipping...'.format(self.key_file))166                    raise(err)167            self.key_bitsize = self.rsa_key.bits()168            self.key_private_asn1 = self.crypto.dump_privatekey(169                self.crypto.FILETYPE_ASN1, self.rsa_key)170            self.key_private_der = asn1.DerSequence()171            self.key_private_der.decode(self.key_private_asn1)172            self.key_modulus = self.key_private_der[1]173            self.key_printable_private_modulus = self.printable_modulus(174                self.key_modulus)175            d = self.modulus_long_to_string(self.key_modulus)176            if self.is_blacklisted(d):177                self.logger.info('found blacklisted key...')178                self.parsed_key = None179                return180            self.key_public_exponent = self.key_private_der[2]181            self.key_private_exponent = self.key_private_der[3]182            self.key_private_prime1 = self.key_private_der[4]183            self.key_private_prime2 = self.key_private_der[5]184            self.key_private_exponent1 = self.key_private_der[6]185            self.key_private_exponent2 = self.key_private_der[7]186            self.key_private_coefficient = self.key_private_der[8]187            self.suggested_filename = self.key_printable_private_modulus.replace(188                '\t', '')189            self.suggested_filename = self.suggested_filename[len(190                self.suggested_filename) - (3 * self.FILENAME_OCTETS) + 1:]191            self.suggested_filename += '.key'192            self.suggested_filename = self.suggested_filename.replace(':', '_')193            self.suggested_filename = self.suggested_filename.replace('\r', '')194            self.suggested_filename = self.suggested_filename.replace('\n', '')195            self.parsed_key = {}196            self.parsed_key['public_exponent'] = self.key_public_exponent197            self.parsed_key['private_exponent'] = self.key_private_exponent198            self.parsed_key['private_prime1'] = self.key_private_prime1199            self.parsed_key['private_prime2'] = self.key_private_prime2200            self.parsed_key['private_exponent1'] = self.key_private_exponent1201            self.parsed_key['private_exponent2'] = self.key_private_exponent2202            self.parsed_key['private_coefficient'] = self.key_private_coefficient203            self.parsed_key['key_bitsize'] = self.key_bitsize204            self.parsed_key['suggested_filenmame'] = self.suggested_filename205            self.logger.critical(206                'Success - %d bit RSA Private Key (PEM format)'.format(self.key_bitsize))207            return208        except Exception as err:209            self.parsed_key = None210            self.logger.debug(err)211            self.logger.debug(212                'Exception with {} as an RSA key file, skipping...'.format(213                    self.key_file))214            return215class X509HelperCertificate(X509Helper):216    def __init__(self, certificate_file, blacklist_file=None, logger=None):217        X509Helper.__init__(self, logger=logger, blacklist_file=blacklist_file)218        self.certificate_pem_buffer = None219        self.certificate_pubkey_PKey = None220        self.certificate_pubkey_asn1 = None221        self.certificate_pubkey_der = None222        self.certificate_public_modulus = None223        self.certificate_public_exponent = None224        self.text_subject = ''225        self.subject_dict = {}226        self.certificate_file = certificate_file227        self.parsed_certificate = None228        self.parse_certificate_file_universal()229    def __eq__(self, obj):230        if isinstance(obj, X509HelperKey):231            return (obj.key_modulus == self.certificate_public_modulus) and (232                obj.key_public_exponent == self.certificate_public_exponent)233        else:234            return False235    def OID_tuple_to_string(self, oid_tuple):236        i = 0237        s = ''238        for v in oid_tuple:239            if i:240                s = s + '.'241            i += 1242            s = s + str(v)243        return s244    def certificate(self):245        return self.parsed_certificate246    def lookup_OID(self, oid_tuple):247        """Map an OID in tuple form to a symbolic name248        We know a few for unknown OIDs, translate the OID to a string and use that249        a la OpenSSL250        Fill these in yourself if you want, the documentation is listed in the comments251        """252        lookup = {253            # Organization - http://www.oid-info.com/get/2.5.4.10254            # http://www.oid-info.com/get/0.9.2342.19200300.100.1.1255            (2, ): 'ISO/ITU-T',256            (2, 5): 'X.500 Directory Services',257            (2, 5, 4): 'X.500 Attribute Types',258            (2, 5, 4, 0): 'id-at-objectClass',259            (2, 5, 4, 1): 'id-at-aliasedEntryName',260            (2, 5, 4, 2): 'id-at-knowldgeinformation',261            (2, 5, 4, 3): 'id-at-commonName',262            (2, 5, 4, 4): 'id-at-surname',263            (2, 5, 4, 5): 'id-at-serialNumber',264            (2, 5, 4, 6): 'id-at-countryName',265            (2, 5, 4, 7): 'id-at-localityName',266            (2, 5, 4, 8): 'id-at-stateOrProvinceName',267            (2, 5, 4, 9): 'id-at-streetAddress',268            (2, 5, 4, 10): 'id-at-organizationName',269            (2, 5, 4, 11): 'id-at-organizationalUnitName',270            (2, 5, 4, 12): 'id-at-title',271            (2, 5, 4, 13): 'id-at-description',272            (2, 5, 4, 14): 'id-at-searchGuide',273            (2, 5, 4, 15): 'id-at-businessCategory',274            (2, 5, 4, 16): 'id-at-postalAddress',275            (2, 5, 4, 17): 'id-at-postalCode',276            (2, 5, 4, 18): 'id-at-postOfficeBox',277            (2, 5, 4, 19): 'id-at-physicalDeliveryOfficeName',278            (2, 5, 4, 20): 'id-at-telephoneNumber',279            (2, 5, 4, 21): 'id-at-telexNumber',280            (2, 5, 4, 22): 'id-at-teletexTerminalIdentifier',281            (2, 5, 4, 23): 'id-at-facsimileTelephoneNumber',282            (2, 5, 4, 24): 'id-at-x121Address',283            (2, 5, 4, 25): 'id-at-internationalISDNNumber',284            (2, 5, 4, 26): 'id-at-registeredAddress',285            (2, 5, 4, 27): 'id-at-destinationIndicator',286            (2, 5, 4, 28): 'id-at-preferredDeliveryMethod',287            (2, 5, 4, 29): 'id-at-presentationAddress',288            (2, 5, 4, 30): 'id-at-supportedApplicationContext',289            (2, 5, 4, 31): 'id-at-member',290            (2, 5, 4, 32): 'id-at-owner',291            (2, 5, 4, 33): 'id-at-roleOccupant',292            (2, 5, 4, 34): 'id-at-seeAlso',293            (2, 5, 4, 35): 'id-at-userPassword',294            (2, 5, 4, 36): 'id-at-userCertificate',295            (2, 5, 4, 37): 'id-at-cACertificate',296            (2, 5, 4, 38): 'id-at-authorityRevocationList',297            (2, 5, 4, 39): 'id-at-certificateRevocationList',298            (2, 5, 4, 40): 'id-at-crossCertificatePair',299            (2, 5, 4, 41): 'id-at-name',300            (2, 5, 4, 42): 'id-at-givenName',301            (2, 5, 4, 43): 'id-at-initials',302            (2, 5, 4, 44): 'id-at-generationQualifier',303            (2, 5, 4, 45): 'id-at-uniqueIdentifier',304            (2, 5, 4, 46): 'id-at-dnQualifier',305            (2, 5, 4, 47): 'id-at-enhancedSearchGuide',306            (2, 5, 4, 48): 'id-at-protocolInformation',307            (2, 5, 4, 49): 'id-at-distinguishedName',308            (2, 5, 4, 50): 'id-at-uniqueMember',309            (2, 5, 4, 51): 'id-at-houseIdentifier',310            (2, 5, 4, 52): 'id-at-supportedAlgorithms',311            (2, 5, 4, 53): 'id-at-deltaRevocationList',312            (2, 5, 4, 58): 'Attribute Certificate attribute (id-at-attributeCertificate)',313            (2, 5, 4, 65): 'id-at-pseudonym'}314        return lookup.get(oid_tuple, self.OID_tuple_to_string(oid_tuple))315    def handle_custom_oids(self):316        """317        process self.subject (asn1/der) in order to produce a subject string for humans to read318        this is called for non-standard Sybject stringsas errspecially the custom OIDs319        OpenSSL native can parse this fine, but python bindings can't, they assign the subject a320        field named 'UNDEF' it is OK if this function fails, it is just best effort to improve321        the 'UNDEF' description...322        """323        certType = Name()324        derData = self.subject.der()325        cert, rest = asn1.codec.der.decoder(derData, asn1spec=certType)326        try:327            subject = ''328            extensions = cert.getComponentByPosition(0)329            i = 0330            while True:331                pair = extensions.getComponentByPosition(i)332                pair = pair.getComponentByPosition(0)333                name = pair.getComponentByPosition(0).asTuple()334                value = pair.getComponentByPosition(1).getComponent()335                name = self.lookup_OID(name)336                if i != 0:337                    subject += '/'338                subject += '{}={}'.format(name, value)339                i += 1340        except Exception as err:341            self.logger.debug('expected exception, ignoring...')342            self.logger.debug(err)343        return subject344    def write_to_file(self, dirname=''):345        """ also sets self.summary """346        if dirname and not dirname.endswith('/'):347            dirname += '/'348        try:349            os.stat(os.path.join(dirname, self.suggested_filename))350            return ''  # dup, already processed this key351        except (IOError, OSError):352            pass  # file doesn't exist, process this entry353        with open(os.path.join(dirname, self.suggested_filename), 'wb') as filefd:354            # dos2unix and add a trailing newline355            filefd.write(356                self.certificate_pem_buffer.replace(357                    '\r\n', '\n') + '\n')358        self.summary = self.suggested_filename.ljust(359            25) + ' - ' + self.text_subject360        return self.summary361    def parse_subject_components(self):362        s = {}363        for c in self.subject_components:364            s[c[0]] = c[1]365        self.subject_components = s366        if 'UNDEF' in self.subject_components:367            try:368                self.text_subject += self.handle_custom_oids()369            except Exception:370                self.logger.error(371                    'unexpected exception in handle_custom_oids!')372                self.text_subject += 'UNDEF=0'373        else:374            for key in self.subject_components:375                self.text_subject += key + '=' + \376                    self.subject_components[key] + '/'377        return378    def get_subject_field(self, field):379        if field in self.subject_components:380            return self.subject_components[field]381        else:382            return None383    def der_cert_to_pem_cert(self, der_buffer):384        """385        Takes a certificate in binary DER format and returns the386        PEM version of it as a string.387        """388        PEM_HEADER = '-----BEGIN CERTIFICATE-----'389        PEM_FOOTER = '-----END CERTIFICATE-----'390        b64 = str(base64.standard_b64encode(der_buffer))391        return (PEM_HEADER + '\n' + textwrap.fill(b64, 64) + '\n' + PEM_FOOTER)392    def parse_certificate_file_universal(self):393        try:394            self.logger.warning(395                'OK, trying to process certificate file %s...'.format(396                    self.certificate_file))397            self.c = OpenSSL.crypto398            self.certificate_buffer = open(399                self.certificate_file,400                'rb').read()401            try:  # assume it is PEM first402                self.x509_certificate = self.c.load_certificate(403                    self.c.FILETYPE_PEM,404                    self.certificate_buffer)405                self.certificate_pem_buffer = self.certificate_buffer406            except Exception as err:  # not PEM, try to treat it as DER407                self.x509_certificate = None408                self.certificate_der_buffer = self.certificate_buffer409                self.certificate_pem_buffer = self.der_cert_to_pem_cert(410                    self.certificate_der_buffer)411            if not self.x509_certificate:412                self.x509_certificate = self.c.load_certificate(413                    self.c.FILETYPE_PEM,414                    self.certificate_pem_buffer)415            self.certificate_pubkey_PKey = self.x509_certificate.get_pubkey()416            self.subject = self.x509_certificate.get_subject()417            self.subject_components = self.subject.get_components()418            self.certificate_pubkey_asn1 = self.c.dump_privatekey(419                self.c.FILETYPE_ASN1,420                self.certificate_pubkey_PKey)421            self.certificate_pubkey_der = asn1.DerSequence()422            self.certificate_pubkey_der.decode(self.certificate_pubkey_asn1)423            self.certificate_public_modulus = self.certificate_pubkey_der[1]424            self.certificate_key_bitsize = self.certificate_public_modulus.bit_length()425            d = self.modulus_long_to_string(self.certificate_public_modulus)426            if self.is_blacklisted(d):427                self.logger.info('found blacklisted certificate...')428                return429            self.certificate_printable_public_modulus = self.printable_modulus(430                self.certificate_pubkey_der[1])431            self.certificate_public_exponent = self.certificate_pubkey_der[2]432            self.suggested_filename = self.certificate_printable_public_modulus.replace(433                '\t', '')434            self.suggested_filename = self.suggested_filename[len(435                self.suggested_filename) - (3 * self.FILENAME_OCTETS) + 1:]436            self.suggested_filename += '.cert'437            self.suggested_filename = self.suggested_filename.replace(':', '_')438            self.suggested_filename = self.suggested_filename.replace('\r', '')439            self.suggested_filename = self.suggested_filename.replace('\n', '')440            self.parse_subject_components()441            self.parsed_certificate = {}442            self.parsed_certificate['key_bitsize'] = self.certificate_key_bitsize443            self.parsed_certificate['public_modulus'] = self.certificate_public_modulus444            self.parsed_certificate['text_subject'] = self.text_subject445            self.parsed_certificate['suggested_filename'] = self.suggested_filename446            self.logger.critical('Success - %s', self.text_subject)447            return448        except Exception as err:449            self.logger.warning(err)450            self.logger.warning(451                'Failure to parse {} as DER/PEM format certificate, skipping...'.format(452                    self.certificate_file))...api.py
Source:api.py  
1import subprocess2import os3import unicodedata4import json5import falcon6from datetime import date, datetime7from butterknife.pool import Subvol8class MyEncoder(json.JSONEncoder):9    def default(self, obj):10        if isinstance(obj, datetime):11            return obj.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + "Z"12        if isinstance(obj, date):13            return obj.strftime('%Y-%m-%d')14        if isinstance(obj, map):15            return tuple(obj)16        if isinstance(obj, Subvol):17            return obj.version18        return json.JSONEncoder.default(self, obj)19def parse_subvol(func):20    def wrapped(instance, req, resp, subvol, *args, **kwargs):21        return func(instance, req, resp, Subvol("@" + subvol), *args, **kwargs)22    return wrapped23def serialize(func):24    """25    Falcon response serialization26    """27    def wrapped(instance, req, resp, **kwargs):28        assert not req.get_param("unicode") or req.get_param("unicode") == u"â", "Unicode sanity check failed"29        resp.set_header("Cache-Control", "no-cache, no-store, must-revalidate");30        resp.set_header("Pragma", "no-cache");31        resp.set_header("Expires", "0");32        r = func(instance, req, resp, **kwargs)33        if not resp.body:34            if not req.client_accepts_json:35                raise falcon.HTTPUnsupportedMediaType(36                    'This API only supports the JSON media type.',37                    href='http://docs.examples.com/api/json')38            resp.set_header('Content-Type', 'application/json')39            resp.body = json.dumps(r, cls=MyEncoder)40        return r41    return wrapped42from jinja2 import Environment, PackageLoader, FileSystemLoader43env = Environment(loader=PackageLoader('butterknife', 'templates'))44def templatize(path):45    template = env.get_template(path)46    def wrapper(func):47        def wrapped(instance, req, resp, **kwargs):48            assert not req.get_param("unicode") or req.get_param("unicode") == u"â", "Unicode sanity check failed"49            r = func(instance, req, resp, **kwargs)50            r.pop("self", None)51            if not resp.body:52                if  req.get_header("Accept") == "application/json":53                    resp.set_header("Cache-Control", "no-cache, no-store, must-revalidate");54                    resp.set_header("Pragma", "no-cache");55                    resp.set_header("Expires", "0");56                    resp.set_header('Content-Type', 'application/json')57                    resp.body = json.dumps(r, cls=MyEncoder)58                    return r59                else:60                    resp.set_header('Content-Type', 'text/html')61                    resp.body = template.render(request=req, **r)62                    return r63        return wrapped64    return wrapper65class PoolResource(object):66    def __init__(self, pool, subvol_filter):67        self.pool = pool68        self.subvol_filter = subvol_filter69class SubvolResource(PoolResource):70    @templatize("index.html")71    def on_get(self, req, resp):72        def subvol_generator():73            for subvol in sorted(self.pool.subvol_list(), reverse=True):74                if req.get_param("architecture"):75                    if req.get_param("architecture") != subvol.architecture:76                        continue77                yield subvol78        return { "subvolumes": tuple(subvol_generator()) }79class TemplateResource(PoolResource):80    @serialize81    def on_get(self, req, resp):82        return {"templates": map(83            lambda j:{"namespace": j[0], "identifier":j[1], "architectures":j[2]},84            self.pool.template_list(self.subvol_filter))}85class VersionResource(PoolResource):86    @serialize87    def on_get(self, req, resp, name, arch):88        namespace, identifier = name.rsplit(".", 1)89        subset_filter = self.subvol_filter.subset(namespace=namespace,90            identifier=identifier, architecture=arch)91        return { "versions": map(92            lambda v:{"identifier":v, "signed":v.signed},93            sorted(subset_filter.apply(self.pool.subvol_list()), reverse=True, key=lambda j:j.numeric_version)) }94class LegacyStreamingResource(PoolResource):95    def on_get(self, req, resp, name, arch, version):96        parent_version = req.get_param("parent")97        subvol = "@template:%(name)s:%(arch)s:%(version)s" % locals()98        if not self.subvol_filter.match(Subvol(subvol)):99            raise Exception("Not going to happen")100        suggested_filename = "%(name)s:%(arch)s:%(version)s" % locals()101        if parent_version:102            parent_subvol = "@template:%(name)s:%(arch)s:%(parent_version)s" % locals()103            if not self.subvol_filter.match(Subvol(parent_subvol)): raise104            suggested_filename += ":" + parent_version105        else:106            parent_subvol = None107        suggested_filename += ".far"108        resp.set_header("Content-Disposition", "attachment; filename=\"%s\"" % suggested_filename)109        resp.set_header('Content-Type', 'application/btrfs-stream')110        streamer = self.pool.send(subvol, parent_subvol)111        resp.stream = streamer.stdout112        accepted_encodings = req.get_header("Accept-Encoding") or ""113        accepted_encodings = [j.strip() for j in accepted_encodings.lower().split(",")]114        if "gzip" in accepted_encodings:115            for cmd in "/usr/bin/pigz", "/bin/gzip":116                if os.path.exists(cmd):117                    resp.set_header('Content-Encoding', 'gzip')118                    print("Compressing with %s" % cmd)119                    compressor = subprocess.Popen((cmd,"--fast"), bufsize=-1, stdin=streamer.stdout, stdout=subprocess.PIPE)120                    resp.stream = compressor.stdout121                    break122            else:123                print("No gzip compressors found, falling back to no compression")124        else:125            print("Client did not ask for compression")126class StreamResource(PoolResource):127    @parse_subvol128    def on_get(self, req, resp, subvol):129        if not self.subvol_filter.match(subvol):130            resp.body = "Subvolume does not match filter"131            resp.status = falcon.HTTP_403132            return133        format = req.get_param("format") or "btrfs-stream"134        if format == "btrfs-stream":135            parent_slug = req.get_param("parent")136            suggested_filename = "%s.%s-%s-%s" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)137            if parent_slug:138                parent_subvol = Subvol(parent_slug) if parent_slug else None139                if not self.subvol_filter.match(parent_subvol):140                    resp.body = "Subvolume does not match filter"141                    resp.status = falcon.HTTP_403142                    return143                suggested_filename += "-" + parent_subvol.version144            else:145                parent_subvol = None146            suggested_filename += ".far"147            resp.set_header('Content-Type', 'application/btrfs-stream')148            try:149                streamer = self.pool.send(subvol, parent_subvol)150            except SubvolNotFound as e:151                resp.body = "Could not find subvolume %s\n" % str(e)152                resp.status = falcon.HTTP_403153                return154        elif format == "tar":155            suggested_filename = "%s.%s-%s-%s.tar" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)156            try:157                streamer = self.pool.tar(subvol)158            except SubvolNotFound as e:159                resp.body = "Could not find subvolume %s\n" % str(e)160                resp.status = falcon.HTTP_403161                return162        else:163            resp.body = "Requested unknown format"164            resp.status = falcon.HTTP_403165            return166        resp.stream = streamer.stdout167        resp.set_header("Content-Disposition", "attachment; filename=\"%s\"" % suggested_filename)168        accepted_encodings = req.get_header("Accept-Encoding") or ""169        accepted_encodings = [j.strip() for j in accepted_encodings.lower().split(",")]170        if "gzip" in accepted_encodings:171            for cmd in "/usr/bin/pigz", "/bin/gzip":172                if os.path.exists(cmd):173                    resp.set_header('Content-Encoding', 'gzip')174                    print("Compressing with %s" % cmd)175                    compressor = subprocess.Popen((cmd,"--fast"), bufsize=-1, stdin=streamer.stdout, stdout=subprocess.PIPE)176                    resp.stream = compressor.stdout177                    return178            else:179                print("No gzip compressors found, falling back to no compression")180        else:181            print("Client did not ask for compression")182class ManifestResource(PoolResource):183    """184    Generate manifest for a subvolume185    """186    @parse_subvol187    def on_get(self, req, resp, subvol):188        if not self.subvol_filter.match(subvol):189            resp.body = "Subvolume does not match filter"190            resp.status = falcon.HTTP_403191            return192        suggested_filename = "%s.%s-%s-%s.csv" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)193        resp.set_header('Content-Type', 'text/plain')194        resp.stream = self.pool.manifest(subvol)195class KeyringResource(object):196    def __init__(self, filename):197        self.filename = filename198    def on_get(self, req, resp):199        resp.set_header("Content-Type", "application/x-gnupg-keyring")200        resp.set_header("Content-Disposition", "attachment; filename=\"%s.gpg\"" % req.env["SERVER_NAME"].replace(".", "_")) # HTTP_HOST instead? Underscore *should* not be allowed in hostname201        resp.stream = open(self.filename, "rb")202class SignatureResource(PoolResource):203    @parse_subvol204    def on_get(self, req, resp, subvol):205        if not self.subvol_filter.match(subvol):206            resp.body = "Subvolume does not match filter"207            resp.status = falcon.HTTP_403208            return209        try:210            resp.stream = self.pool.signature(subvol)211            suggested_filename = "%s.%s-%s-%s.asc" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)212            resp.set_header('Content-Type', 'text/plain')213            resp.set_header("Cache-Control", "public")214        except FileNotFoundError:215            resp.body = "Signature for %s not found" % subvol216            resp.status = falcon.HTTP_404217class PackageDiff(PoolResource):218    @templatize("packages.html")219    @parse_subvol220    def on_get(self, req, resp, subvol):221        print(subvol.domain)222        if not self.subvol_filter.match(subvol):223            resp.body = "Subvolume does not match filter"224            resp.status = falcon.HTTP_403225            return226        parent_subvol = req.get_param("parent")227        # TODO: Add heuristics to determine package management system,228        #       at least don't die with RPM systems229        def dpkg_list(root):230            """231            Return dict of package names and versions corresponding to a232            Debian/Ubuntu etc root filesystem233            """234            package_name = None235            package_version = None236            versions = {}237            for line in open(os.path.join(root, "var/lib/dpkg/status")):238                line = line[:-1]239                if not line:240                    assert package_name, "No package name specified!"241                    assert package_version, "No package version specified!"242                    versions[package_name] = package_version243                    package_name = None244                    package_version = None245                    continue246                if ": " not in line:247                    continue248                key, value = line.split(": ", 1)249                if key == "Package":250                    package_name = value251                    continue252                if key == "Version":253                    package_version = value254                    continue255            return versions256        new = dpkg_list("/var/lib/butterknife/pool/%s" % subvol)257        if not parent_subvol:258            packages_diff = False259            packages_intact = sorted(new.items())260        else:261            packages_diff = True262            if not self.subvol_filter.match(Subvol(parent_subvol)):263                resp.body = "Parent subvolume does not match filter"264                resp.status = falcon.HTTP_403265                return266            old = dpkg_list("/var/lib/butterknife/pool/%s" % parent_subvol)267            packages_added = []268            packages_removed = []269            packages_updated = []270            packages_intact = []271            for key in sorted(set(new) & set(old)):272                old_version = old[key]273                new_version = new[key]274                if old_version != new_version:275                    packages_updated.append((key, old_version, new_version))276                else:277                    packages_intact.append((key, old_version))278            for key in sorted(set(new) - set(old)):279                packages_added.append((key, new[key]))280            for key in sorted(set(old) - set(new)):281                packages_removed.append((key, old[key]))...importer.py
Source:importer.py  
1"""Import logic for blueprint."""2from __future__ import annotations3from dataclasses import dataclass4import html5import re6import voluptuous as vol7import yarl8from homeassistant.core import HomeAssistant9from homeassistant.exceptions import HomeAssistantError10from homeassistant.helpers import aiohttp_client, config_validation as cv11from homeassistant.util import yaml12from .models import Blueprint13from .schemas import is_blueprint_config14COMMUNITY_TOPIC_PATTERN = re.compile(15    r"^https://community.home-assistant.io/t/[a-z0-9-]+/(?P<topic>\d+)(?:/(?P<post>\d+)|)$"16)17COMMUNITY_CODE_BLOCK = re.compile(18    r'<code class="lang-(?P<syntax>[a-z]+)">(?P<content>(?:.|\n)*)</code>', re.MULTILINE19)20GITHUB_FILE_PATTERN = re.compile(21    r"^https://github.com/(?P<repository>.+)/blob/(?P<path>.+)$"22)23COMMUNITY_TOPIC_SCHEMA = vol.Schema(24    {25        "slug": str,26        "title": str,27        "post_stream": {"posts": [{"updated_at": cv.datetime, "cooked": str}]},28    },29    extra=vol.ALLOW_EXTRA,30)31class UnsupportedUrl(HomeAssistantError):32    """When the function doesn't support the url."""33@dataclass(frozen=True)34class ImportedBlueprint:35    """Imported blueprint."""36    suggested_filename: str37    raw_data: str38    blueprint: Blueprint39def _get_github_import_url(url: str) -> str:40    """Convert a GitHub url to the raw content.41    Async friendly.42    """43    if url.startswith("https://raw.githubusercontent.com/"):44        return url45    match = GITHUB_FILE_PATTERN.match(url)46    if match is None:47        raise UnsupportedUrl("Not a GitHub file url")48    repo, path = match.groups()49    return f"https://raw.githubusercontent.com/{repo}/{path}"50def _get_community_post_import_url(url: str) -> str:51    """Convert a forum post url to an import url.52    Async friendly.53    """54    match = COMMUNITY_TOPIC_PATTERN.match(url)55    if match is None:56        raise UnsupportedUrl("Not a topic url")57    _topic, post = match.groups()58    json_url = url59    if post is not None:60        # Chop off post part, ie /261        json_url = json_url[: -len(post) - 1]62    json_url += ".json"63    return json_url64def _extract_blueprint_from_community_topic(65    url: str,66    topic: dict,67) -> ImportedBlueprint | None:68    """Extract a blueprint from a community post JSON.69    Async friendly.70    """71    block_content = None72    blueprint = None73    post = topic["post_stream"]["posts"][0]74    for match in COMMUNITY_CODE_BLOCK.finditer(post["cooked"]):75        block_syntax, block_content = match.groups()76        if block_syntax not in ("auto", "yaml"):77            continue78        block_content = html.unescape(block_content.strip())79        try:80            data = yaml.parse_yaml(block_content)81        except HomeAssistantError:82            if block_syntax == "yaml":83                raise84            continue85        if not is_blueprint_config(data):86            continue87        blueprint = Blueprint(data)88        break89    if blueprint is None:90        raise HomeAssistantError(91            "No valid blueprint found in the topic. Blueprint syntax blocks need to be marked as YAML or no syntax."92        )93    return ImportedBlueprint(94        f'{post["username"]}/{topic["slug"]}', block_content, blueprint95    )96async def fetch_blueprint_from_community_post(97    hass: HomeAssistant, url: str98) -> ImportedBlueprint | None:99    """Get blueprints from a community post url.100    Method can raise aiohttp client exceptions, vol.Invalid.101    Caller needs to implement own timeout.102    """103    import_url = _get_community_post_import_url(url)104    session = aiohttp_client.async_get_clientsession(hass)105    resp = await session.get(import_url, raise_for_status=True)106    json_resp = await resp.json()107    json_resp = COMMUNITY_TOPIC_SCHEMA(json_resp)108    return _extract_blueprint_from_community_topic(url, json_resp)109async def fetch_blueprint_from_github_url(110    hass: HomeAssistant, url: str111) -> ImportedBlueprint:112    """Get a blueprint from a github url."""113    import_url = _get_github_import_url(url)114    session = aiohttp_client.async_get_clientsession(hass)115    resp = await session.get(import_url, raise_for_status=True)116    raw_yaml = await resp.text()117    data = yaml.parse_yaml(raw_yaml)118    blueprint = Blueprint(data)119    parsed_import_url = yarl.URL(import_url)120    suggested_filename = f"{parsed_import_url.parts[1]}/{parsed_import_url.parts[-1]}"121    if suggested_filename.endswith(".yaml"):122        suggested_filename = suggested_filename[:-5]123    return ImportedBlueprint(suggested_filename, raw_yaml, blueprint)124async def fetch_blueprint_from_github_gist_url(125    hass: HomeAssistant, url: str126) -> ImportedBlueprint:127    """Get a blueprint from a Github Gist."""128    if not url.startswith("https://gist.github.com/"):129        raise UnsupportedUrl("Not a GitHub gist url")130    parsed_url = yarl.URL(url)131    session = aiohttp_client.async_get_clientsession(hass)132    resp = await session.get(133        f"https://api.github.com/gists/{parsed_url.parts[2]}",134        headers={"Accept": "application/vnd.github.v3+json"},135        raise_for_status=True,136    )137    gist = await resp.json()138    blueprint = None139    filename = None140    content = None141    for filename, info in gist["files"].items():142        if not filename.endswith(".yaml"):143            continue144        content = info["content"]145        data = yaml.parse_yaml(content)146        if not is_blueprint_config(data):147            continue148        blueprint = Blueprint(data)149        break150    if blueprint is None:151        raise HomeAssistantError(152            "No valid blueprint found in the gist. The blueprint file needs to end with '.yaml'"153        )154    return ImportedBlueprint(155        f"{gist['owner']['login']}/{filename[:-5]}", content, blueprint156    )157async def fetch_blueprint_from_url(hass: HomeAssistant, url: str) -> ImportedBlueprint:158    """Get a blueprint from a url."""159    for func in (160        fetch_blueprint_from_community_post,161        fetch_blueprint_from_github_url,162        fetch_blueprint_from_github_gist_url,163    ):164        try:165            imported_bp = await func(hass, url)166            imported_bp.blueprint.update_metadata(source_url=url)167            return imported_bp168        except UnsupportedUrl:169            pass...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
