Best Python code snippet using playwright-python
globals.py
Source:globals.py  
1# Copyright (C) 2011-2012  Patrick Totzke <patricktotzke@gmail.com>2# Copyright © 2018 Dylan Baker3# This file is released under the GNU GPL, version 3 or a later revision.4# For further details see the COPYING file5import argparse6import code7import email8import email.utils9import glob10import logging11import os12import subprocess13from io import BytesIO14import asyncio15import shlex16import urwid17from . import Command, registerCommand18from . import CommandCanceled, SequenceCanceled19from .utils import update_keys20from .. import commands21from .. import buffers22from .. import helper23from ..helper import split_commandstring24from ..helper import mailto_to_envelope25from ..completion.commandline import CommandLineCompleter26from ..completion.contacts import ContactsCompleter27from ..completion.accounts import AccountCompleter28from ..completion.tags import TagsCompleter29from ..widgets.utils import DialogBox30from ..db.errors import DatabaseLockedError31from ..db.envelope import Envelope32from ..settings.const import settings33from ..settings.errors import ConfigError, NoMatchingAccount34from ..utils import argparse as cargparse35MODE = 'global'36@registerCommand(MODE, 'exit', help="shut down cleanly")37class ExitCommand(Command):38    """Shut down cleanly."""39    def __init__(self, _prompt=True, **kwargs):40        """41        :param _prompt: For internal use only, used to control prompting to42                        close without sending, and is used by the43                        BufferCloseCommand if settings change after yielding to44                        the UI.45        :type _prompt: bool46        """47        super(ExitCommand, self).__init__(**kwargs)48        self.prompt_to_send = _prompt49    async def apply(self, ui):50        if settings.get('bug_on_exit'):51            msg = 'really quit?'52            if (await ui.choice(msg, select='yes', cancel='no',53                                msg_position='left')) == 'no':54                
return55        # check if there are any unsent messages56        if self.prompt_to_send:57            for buffer in ui.buffers:58                if (isinstance(buffer, buffers.EnvelopeBuffer) and59                        not buffer.envelope.sent_time):60                    msg = 'quit without sending message?'61                    if (await ui.choice(msg, cancel='no',62                                        msg_position='left')) == 'no':63                        raise CommandCanceled()64        for b in ui.buffers:65            b.cleanup()66        await ui.apply_command(FlushCommand(callback=ui.exit))67        ui.cleanup()68        if ui.db_was_locked:69            msg = 'Database locked. Exit without saving?'70            response = await ui.choice(msg, msg_position='left', cancel='no')71            if response == 'no':72                return73            ui.exit()74@registerCommand(MODE, 'search', usage='search query', arguments=[75    (['--sort'], {'help': 'sort order', 'choices': [76        'oldest_first', 'newest_first', 'message_id', 'unsorted']}),77    (['query'], {'nargs': argparse.REMAINDER, 'help': 'search string'})])78class SearchCommand(Command):79    """open a new search buffer. Search obeys the notmuch80    :ref:`search.exclude_tags <search.exclude_tags>` setting."""81    repeatable = True82    def __init__(self, query, sort=None, **kwargs):83        """84        :param query: notmuch querystring85        :type query: str86        :param sort: how to order results. 
Must be one of87                     'oldest_first', 'newest_first', 'message_id' or88                     'unsorted'.89        :type sort: str90        """91        self.query = ' '.join(query)92        self.order = sort93        Command.__init__(self, **kwargs)94    def apply(self, ui):95        if self.query:96            open_searches = ui.get_buffers_of_type(buffers.SearchBuffer)97            to_be_focused = None98            for sb in open_searches:99                if sb.querystring == self.query:100                    to_be_focused = sb101            if to_be_focused:102                if ui.current_buffer != to_be_focused:103                    ui.buffer_focus(to_be_focused)104                else:105                    # refresh an already displayed search106                    ui.current_buffer.rebuild()107                    ui.update()108            else:109                ui.buffer_open(buffers.SearchBuffer(ui, self.query,110                                                    sort_order=self.order))111        else:112            ui.notify('empty query string')113@registerCommand(MODE, 'prompt', arguments=[114    (['startwith'], {'nargs': '?', 'default': '', 'help': 'initial content'})])115class PromptCommand(Command):116    """prompts for commandline and interprets it upon select"""117    def __init__(self, startwith='', **kwargs):118        """119        :param startwith: initial content of the prompt widget120        :type startwith: str121        """122        self.startwith = startwith123        Command.__init__(self, **kwargs)124    async def apply(self, ui):125        logging.info('open command shell')126        mode = ui.mode or 'global'127        cmpl = CommandLineCompleter(ui.dbman, mode, ui.current_buffer)128        cmdline = await ui.prompt(129            '',130            text=self.startwith,131            completer=cmpl,132            history=ui.commandprompthistory)133        logging.debug('CMDLINE: %s', cmdline)134        # interpret 
and apply commandline135        if cmdline:136            # save into prompt history137            ui.commandprompthistory.append(cmdline)138            await ui.apply_commandline(cmdline)139        else:140            raise CommandCanceled()141@registerCommand(MODE, 'refresh')142class RefreshCommand(Command):143    """refresh the current buffer"""144    repeatable = True145    def apply(self, ui):146        ui.current_buffer.rebuild()147        ui.update()148@registerCommand(149    MODE, 'shellescape', arguments=[150        (['--spawn'], {'action': cargparse.BooleanAction, 'default': None,151                       'help': 'run in terminal window'}),152        (['--thread'], {'action': cargparse.BooleanAction, 'default': None,153                        'help': 'run in separate thread'}),154        (['--refocus'], {'action': cargparse.BooleanAction,155                         'help': 'refocus current buffer after command '156                                 'has finished'}),157        (['cmd'], {'help': 'command line to execute'})],158    forced={'shell': True},159)160class ExternalCommand(Command):161    """run external command"""162    repeatable = True163    def __init__(self, cmd, stdin=None, shell=False, spawn=False,164                 refocus=True, thread=False, on_success=None, **kwargs):165        """166        :param cmd: the command to call167        :type cmd: list or str168        :param stdin: input to pipe to the process169        :type stdin: file or str170        :param spawn: run command in a new terminal171        :type spawn: bool172        :param shell: let shell interpret command string173        :type shell: bool174        :param thread: run asynchronously, don't block alot175        :type thread: bool176        :param refocus: refocus calling buffer after cmd termination177        :type refocus: bool178        :param on_success: code to execute after command successfully exited179        :type on_success: callable180        """181        
logging.debug({'spawn': spawn})182        # make sure cmd is a list of str183        if isinstance(cmd, str):184            # convert cmdstring to list: in case shell==True,185            # Popen passes only the first item in the list to $SHELL186            cmd = [cmd] if shell else split_commandstring(cmd)187        # determine complete command list to pass188        touchhook = settings.get_hook('touch_external_cmdlist')189        # filter cmd, shell and thread through hook if defined190        if touchhook is not None:191            logging.debug('calling hook: touch_external_cmdlist')192            res = touchhook(cmd, shell=shell, spawn=spawn, thread=thread)193            logging.debug('got: %s', res)194            cmd, shell, self.in_thread = res195        # otherwise if spawn requested and X11 is running196        elif spawn:197            if 'DISPLAY' in os.environ:198                term_cmd = settings.get('terminal_cmd', '')199                logging.info('spawn in terminal: %s', term_cmd)200                termcmdlist = split_commandstring(term_cmd)201                cmd = termcmdlist + cmd202            else:203                thread = False204        self.cmdlist = cmd205        self.stdin = stdin206        self.shell = shell207        self.refocus = refocus208        self.in_thread = thread209        self.on_success = on_success210        Command.__init__(self, **kwargs)211    async def apply(self, ui):212        logging.debug('cmdlist: %s', self.cmdlist)213        callerbuffer = ui.current_buffer214        # set standard input for subcommand215        stdin = None216        if self.stdin is not None:217            # wrap strings in StrinIO so that they behaves like a file218            if isinstance(self.stdin, str):219                # XXX: is utf-8 always safe to use here, or do we need to check220                # the terminal encoding first?221                stdin = BytesIO(self.stdin.encode('utf-8'))222            else:223                stdin 
= self.stdin224        logging.info('calling external command: %s', self.cmdlist)225        ret = ''226        # TODO: these can probably be refactored in terms of helper.call_cmd227        # and helper.call_cmd_async228        if self.in_thread:229            try:230                if self.shell:231                    _cmd = asyncio.create_subprocess_shell232                    # The shell function wants a single string or bytestring,233                    # we could just join it, but lets be extra safe and use234                    # shlex.quote to avoid suprises.235                    cmdlist = [shlex.quote(' '.join(self.cmdlist))]236                else:237                    _cmd = asyncio.create_subprocess_exec238                    cmdlist = self.cmdlist239                proc = await _cmd(240                    *cmdlist,241                    stdout=subprocess.PIPE,242                    stderr=subprocess.PIPE,243                    stdin=subprocess.PIPE if stdin else None)244            except OSError as e:245                ret = str(e)246            else:247                _, err = await proc.communicate(stdin.read() if stdin else None)248                if proc.returncode == 0:249                    ret = 'success'250                elif err:251                    ret = err.decode(urwid.util.detected_encoding)252        else:253            with ui.paused():254                try:255                    proc = subprocess.Popen(256                        self.cmdlist, shell=self.shell,257                        stdin=subprocess.PIPE if stdin else None,258                        stderr=subprocess.PIPE)259                except OSError as e:260                    ret = str(e)261                else:262                    _, err = proc.communicate(stdin.read() if stdin else None)263                if proc.returncode == 0:264                    ret = 'success'265                elif err:266                    ret = err.decode(urwid.util.detected_encoding)267   
     if ret == 'success':268            if self.on_success is not None:269                self.on_success()270        else:271            msg = "editor has exited with error code {} -- {}".format(272                    proc.returncode,273                    ret or "No stderr output")274            ui.notify(msg, priority='error')275        if self.refocus and callerbuffer in ui.buffers:276            logging.info('refocussing')277            ui.buffer_focus(callerbuffer)278class EditCommand(ExternalCommand):279    """edit a file"""280    def __init__(self, path, spawn=None, thread=None, **kwargs):281        """282        :param path: path to the file to be edited283        :type path: str284        :param spawn: force running edtor in a new terminal285        :type spawn: bool286        :param thread: run asynchronously, don't block alot287        :type thread: bool288        """289        self.spawn = spawn290        if spawn is None:291            self.spawn = settings.get('editor_spawn')292        self.thread = thread293        if thread is None:294            self.thread = settings.get('editor_in_thread')295        editor_cmdstring = None296        if os.path.isfile('/usr/bin/editor'):297            editor_cmdstring = '/usr/bin/editor'298        editor_cmdstring = os.environ.get('EDITOR', editor_cmdstring)299        editor_cmdstring = settings.get('editor_cmd') or editor_cmdstring300        logging.debug('using editor_cmd: %s', editor_cmdstring)301        self.cmdlist = None302        if editor_cmdstring:303            if '%s' in editor_cmdstring:304                cmdstring = editor_cmdstring.replace('%s',305                                                     helper.shell_quote(path))306                self.cmdlist = split_commandstring(cmdstring)307            else:308                self.cmdlist = split_commandstring(editor_cmdstring) + [path]309        logging.debug({'spawn: ': self.spawn, 'in_thread': self.thread})310        ExternalCommand.__init__(self, 
self.cmdlist,311                                 spawn=self.spawn, thread=self.thread,312                                 **kwargs)313    async def apply(self, ui):314        if self.cmdlist is None:315            ui.notify('no editor set', priority='error')316        else:317            return await ExternalCommand.apply(self, ui)318@registerCommand(MODE, 'pyshell')319class PythonShellCommand(Command):320    """open an interactive python shell for introspection"""321    repeatable = True322    def apply(self, ui):323        with ui.paused():324            code.interact(local=locals())325@registerCommand(MODE, 'repeat')326class RepeatCommand(Command):327    """repeat the command executed last time"""328    def __init__(self, **kwargs):329        Command.__init__(self, **kwargs)330    async def apply(self, ui):331        if ui.last_commandline is not None:332            await ui.apply_commandline(ui.last_commandline)333        else:334            ui.notify('no last command')335@registerCommand(MODE, 'call', arguments=[336    (['command'], {'help': 'python command string to call'})])337class CallCommand(Command):338    """execute python code"""339    repeatable = True340    def __init__(self, command, **kwargs):341        """342        :param command: python command string to call343        :type command: str344        """345        Command.__init__(self, **kwargs)346        self.command = command347    async def apply(self, ui):348        try:349            hooks = settings.hooks350            if hooks:351                env = {'ui': ui, 'settings': settings}352                for k, v in env.items():353                    if not getattr(hooks, k, None):354                        setattr(hooks, k, v)355            t = eval(self.command)356            if asyncio.iscoroutine(t):357                await t358        except Exception as e:359            logging.exception(e)360            msg = 'an error occurred during execution of "%s":\n%s'361            ui.notify(msg 
% (self.command, e), priority='error')362@registerCommand(MODE, 'bclose', arguments=[363    (['--redraw'],364     {'action': cargparse.BooleanAction,365      'help': 'redraw current buffer after command has finished'}),366    (['--force'],367     {'action': 'store_true', 'help': 'never ask for confirmation'})])368class BufferCloseCommand(Command):369    """close a buffer"""370    repeatable = True371    def __init__(self, buffer=None, force=False, redraw=True, **kwargs):372        """373        :param buffer: the buffer to close or None for current374        :type buffer: `alot.buffers.Buffer`375        :param force: force buffer close376        :type force: bool377        """378        self.buffer = buffer379        self.force = force380        self.redraw = redraw381        Command.__init__(self, **kwargs)382    async def apply(self, ui):383        async def one_buffer(prompt=True):384            """Helper to handle the case on only one buffer being opened.385            prompt is a boolean that is passed to ExitCommand() as the _prompt386            keyword argument.387            """388            # If there is only one buffer and the settings don't allow using389            # closebuffer to exit, then just stop.390            if not settings.get('quit_on_last_bclose'):391                msg = ('not closing last remaining buffer as '392                       'global.quit_on_last_bclose is set to False')393                logging.info(msg)394                ui.notify(msg, priority='error')395            # Otherwise pass directly to exit command, which also prommpts for396            # 'close without sending'397            else:398                logging.info('closing the last buffer, exiting')399                await ui.apply_command(ExitCommand(_prompt=prompt))400        if self.buffer is None:401            self.buffer = ui.current_buffer402        if len(ui.buffers) == 1:403            await one_buffer()404            return405        if 
(isinstance(self.buffer, buffers.EnvelopeBuffer) and406                not self.buffer.envelope.sent_time):407            msg = 'close without sending?'408            if (not self.force and (await ui.choice(msg, cancel='no',409                                                    msg_position='left')) ==410                    'no'):411                raise CommandCanceled()412        # Because we await above it is possible that the settings or the number413        # of buffers chould change, so retest.414        if len(ui.buffers) == 1:415            await one_buffer(prompt=False)416        else:417            ui.buffer_close(self.buffer, self.redraw)418@registerCommand(MODE, 'bprevious', forced={'offset': -1},419                 help='focus previous buffer')420@registerCommand(MODE, 'bnext', forced={'offset': +1},421                 help='focus next buffer')422@registerCommand(423    MODE, 'buffer',424    arguments=[(['index'], {'type': int, 'help': 'buffer index to focus'})],425    help='focus buffer with given index')426class BufferFocusCommand(Command):427    """focus a :class:`~alot.buffers.Buffer`"""428    repeatable = True429    def __init__(self, buffer=None, index=None, offset=0, **kwargs):430        """431        :param buffer: the buffer to focus or None432        :type buffer: `alot.buffers.Buffer`433        :param index: index (in bufferlist) of the buffer to focus.434        :type index: int435        :param offset: position of the buffer to focus relative to the436                       currently focussed one. 
This is used only if `buffer`437                       is set to `None`438        :type offset: int439        """440        self.buffer = buffer441        self.index = index442        self.offset = offset443        Command.__init__(self, **kwargs)444    def apply(self, ui):445        if self.buffer is None:446            if self.index is not None:447                try:448                    self.buffer = ui.buffers[self.index]449                except IndexError:450                    ui.notify('no buffer exists at index %d' % self.index)451                    return452            else:453                self.index = ui.buffers.index(ui.current_buffer)454            num = len(ui.buffers)455            self.buffer = ui.buffers[(self.index + self.offset) % num]456        ui.buffer_focus(self.buffer)457@registerCommand(MODE, 'bufferlist')458class OpenBufferlistCommand(Command):459    """open a list of active buffers"""460    def __init__(self, filtfun=lambda x: x, **kwargs):461        """462        :param filtfun: filter to apply to displayed list463        :type filtfun: callable (str->bool)464        """465        self.filtfun = filtfun466        Command.__init__(self, **kwargs)467    def apply(self, ui):468        blists = ui.get_buffers_of_type(buffers.BufferlistBuffer)469        if blists:470            ui.buffer_focus(blists[0])471        else:472            bl = buffers.BufferlistBuffer(ui, self.filtfun)473            ui.buffer_open(bl)474@registerCommand(MODE, 'taglist', arguments=[475    (['--tags'], {'nargs': '+', 'help': 'tags to display'}),476])477class TagListCommand(Command):478    """opens taglist buffer"""479    def __init__(self, filtfun=lambda x: x, tags=None, **kwargs):480        """481        :param filtfun: filter to apply to displayed list482        :type filtfun: callable (str->bool)483        """484        self.filtfun = filtfun485        self.tags = tags486        Command.__init__(self, **kwargs)487    def apply(self, ui):488        tags = 
self.tags or ui.dbman.get_all_tags()489        blists = ui.get_buffers_of_type(buffers.TagListBuffer)490        if blists:491            buf = blists[0]492            buf.tags = tags493            buf.rebuild()494            ui.buffer_focus(buf)495        else:496            ui.buffer_open(buffers.TagListBuffer(ui, tags, self.filtfun))497@registerCommand(MODE, 'namedqueries')498class NamedQueriesCommand(Command):499    """opens named queries buffer"""500    def __init__(self, filtfun=bool, **kwargs):501        """502        :param filtfun: filter to apply to displayed list503        :type filtfun: callable (str->bool)504        """505        self.filtfun = filtfun506        Command.__init__(self, **kwargs)507    def apply(self, ui):508        ui.buffer_open(buffers.NamedQueriesBuffer(ui, self.filtfun))509@registerCommand(MODE, 'flush')510class FlushCommand(Command):511    """flush write operations or retry until committed"""512    repeatable = True513    def __init__(self, callback=None, silent=False, **kwargs):514        """515        :param callback: function to call after successful writeout516        :type callback: callable517        """518        Command.__init__(self, **kwargs)519        self.callback = callback520        self.silent = silent521    def apply(self, ui):522        try:523            ui.dbman.flush()524            if callable(self.callback):525                self.callback()526            logging.debug('flush complete')527            if ui.db_was_locked:528                if not self.silent:529                    ui.notify('changes flushed')530                ui.db_was_locked = False531            ui.update()532        except DatabaseLockedError:533            timeout = settings.get('flush_retry_timeout')534            if timeout > 0:535                def f(*_):536                    self.apply(ui)537                ui.mainloop.set_alarm_in(timeout, f)538                if not ui.db_was_locked:539                    if not self.silent:540      
                  ui.notify('index locked, will try again in %d secs'541                                  % timeout)542                    ui.db_was_locked = True543            ui.update()544            return545# TODO: choices546@registerCommand(MODE, 'help', arguments=[547    (['commandname'], {'help': 'command or \'bindings\''})])548class HelpCommand(Command):549    """display help for a command (use \'bindings\' to display all keybindings550    interpreted in current mode)"""551    def __init__(self, commandname='', **kwargs):552        """553        :param commandname: command to document554        :type commandname: str555        """556        Command.__init__(self, **kwargs)557        self.commandname = commandname558    def apply(self, ui):559        logging.debug('HELP')560        if self.commandname == 'bindings':561            text_att = settings.get_theming_attribute('help', 'text')562            title_att = settings.get_theming_attribute('help', 'title')563            section_att = settings.get_theming_attribute('help', 'section')564            # get mappings565            globalmaps, modemaps = settings.get_keybindings(ui.mode)566            # build table567            maxkeylength = len(568                max(list(modemaps.keys()) + list(globalmaps.keys()), key=len))569            keycolumnwidth = maxkeylength + 2570            linewidgets = []571            # mode specific maps572            if modemaps:573                txt = (section_att, '\n%s-mode specific maps' % ui.mode)574                linewidgets.append(urwid.Text(txt))575                for (k, v) in modemaps.items():576                    line = urwid.Columns([('fixed', keycolumnwidth,577                                           urwid.Text((text_att, k))),578                                          urwid.Text((text_att, v))])579                    linewidgets.append(line)580            # global maps581            linewidgets.append(urwid.Text((section_att, '\nglobal maps')))582        
    for (k, v) in globalmaps.items():583                if k not in modemaps:584                    line = urwid.Columns(585                        [('fixed', keycolumnwidth, urwid.Text((text_att, k))),586                         urwid.Text((text_att, v))])587                    linewidgets.append(line)588            body = urwid.ListBox(linewidgets)589            titletext = 'Bindings Help (escape cancels)'590            box = DialogBox(body, titletext,591                            bodyattr=text_att,592                            titleattr=title_att)593            # put promptwidget as overlay on main widget594            overlay = urwid.Overlay(box, ui.root_widget, 'center',595                                    ('relative', 70), 'middle',596                                    ('relative', 70))597            ui.show_as_root_until_keypress(overlay, 'esc')598        else:599            logging.debug('HELP %s', self.commandname)600            parser = commands.lookup_parser(self.commandname, ui.mode)601            if parser:602                ui.notify(parser.format_help(), block=True)603            else:604                ui.notify('command not known: %s' % self.commandname,605                          priority='error')606@registerCommand(MODE, 'compose', arguments=[607    (['--sender'], {'nargs': '?', 'help': 'sender'}),608    (['--template'], {'nargs': '?',609                      'help': 'path to a template message file'}),610    (['--tags'], {'nargs': '?',611                  'help': 'comma-separated list of tags to apply to message'}),612    (['--subject'], {'nargs': '?', 'help': 'subject line'}),613    (['--to'], {'nargs': '+', 'help': 'recipients'}),614    (['--cc'], {'nargs': '+', 'help': 'copy to'}),615    (['--bcc'], {'nargs': '+', 'help': 'blind copy to'}),616    (['--attach'], {'nargs': '+', 'help': 'attach files'}),617    (['--omit_signature'], {'action': 'store_true',618                            'help': 'do not add signature'}),619    (['--spawn'], 
{'action': cargparse.BooleanAction, 'default': None,620                   'help': 'spawn editor in new terminal'}),621    (['rest'], {'nargs': '*'}),622])623class ComposeCommand(Command):624    """compose a new email"""625    def __init__(626            self,627            envelope=None, headers=None, template=None, sender='',628            tags=None, subject='', to=None, cc=None, bcc=None, attach=None,629            omit_signature=False, spawn=None, rest=None, encrypt=False,630            **kwargs):631        """632        :param envelope: use existing envelope633        :type envelope: :class:`~alot.db.envelope.Envelope`634        :param headers: forced header values635        :type headers: dict (str->str)636        :param template: name of template to parse into the envelope after637                         creation. This should be the name of a file in your638                         template_dir639        :type template: str640        :param sender: From-header value641        :type sender: str642        :param tags: Comma-separated list of tags to apply to message643        :type tags: list(str)644        :param subject: Subject-header value645        :type subject: str646        :param to: To-header value647        :type to: str648        :param cc: Cc-header value649        :type cc: str650        :param bcc: Bcc-header value651        :type bcc: str652        :param attach: Path to files to be attached (globable)653        :type attach: str654        :param omit_signature: do not attach/append signature655        :type omit_signature: bool656        :param spawn: force spawning of editor in a new terminal657        :type spawn: bool658        :param rest: remaining parameters. 
These can start with659                     'mailto' in which case it is interpreted as mailto string.660                     Otherwise it will be interpreted as recipients (to) header661        :type rest: list(str)662        :param encrypt: if the email should be encrypted663        :type encrypt: bool664        """665        Command.__init__(self, **kwargs)666        self.envelope = envelope667        self.template = template668        self.headers = headers or {}669        self.sender = sender670        self.subject = subject671        self.to = to or []672        self.cc = cc or []673        self.bcc = bcc or []674        self.attach = attach675        self.omit_signature = omit_signature676        self.force_spawn = spawn677        self.rest = ' '.join(rest or [])678        self.encrypt = encrypt679        self.tags = tags680    class ApplyError(Exception):681        pass682    def _get_template(self, ui):683        # get location of tempsdir, containing msg templates684        tempdir = settings.get('template_dir')685        path = os.path.expanduser(self.template)686        if not os.path.dirname(path):  # use tempsdir687            if not os.path.isdir(tempdir):688                ui.notify('no templates directory: %s' % tempdir,689                          priority='error')690                raise self.ApplyError()691            path = os.path.join(tempdir, path)692        if not os.path.isfile(path):693            ui.notify('could not find template: %s' % path,694                      priority='error')695            raise self.ApplyError()696        try:697            with open(path, 'rb') as f:698                template = helper.try_decode(f.read())699            self.envelope.parse_template(template)700        except Exception as e:701            ui.notify(str(e), priority='error')702            raise self.ApplyError()703    async def _get_sender_details(self, ui):704        # find out the right account, if possible yet705        account = 
self.envelope.account706        if account is None:707            accounts = settings.get_accounts()708            if not accounts:709                ui.notify('no accounts set.', priority='error')710                return711            elif len(accounts) == 1:712                account = accounts[0]713        # get missing From header714        if 'From' not in self.envelope.headers:715            if account is not None:716                fromstring = email.utils.formataddr(717                    (account.realname, str(account.address)))718                self.envelope.add('From', fromstring)719            else:720                cmpl = AccountCompleter()721                fromaddress = await ui.prompt('From', completer=cmpl,722                                              tab=1, history=ui.senderhistory)723                if fromaddress is None:724                    raise CommandCanceled()725                ui.senderhistory.append(fromaddress)726                self.envelope.add('From', fromaddress)727        else:728            fromaddress = self.envelope.get("From")729        # try to find the account again730        if account is None:731            try:732                account = settings.account_matching_address(fromaddress)733            except NoMatchingAccount:734                msg = 'Cannot compose mail - ' \735                      'no account found for `%s`' % fromaddress736                logging.error(msg)737                ui.notify(msg, priority='error')738                raise CommandCanceled()739        if self.envelope.account is None:740            self.envelope.account = account741    async def _set_signature(self, ui):742        account = self.envelope.account743        if not self.omit_signature and account.signature:744            logging.debug('has signature')745            sig = os.path.expanduser(account.signature)746            if os.path.isfile(sig):747                logging.debug('is file')748                if 
account.signature_as_attachment:749                    name = account.signature_filename or None750                    self.envelope.attach(sig, filename=name)751                    logging.debug('attached')752                else:753                    with open(sig, 'rb') as f:754                        sigcontent = f.read()755                    mimetype = helper.guess_mimetype(sigcontent)756                    if mimetype.startswith('text'):757                        sigcontent = helper.try_decode(sigcontent)758                        self.envelope.body_txt += '\n' + sigcontent759            else:760                ui.notify('could not locate signature: %s' % sig,761                          priority='error')762                if (await ui.choice('send without signature?', 'yes',763                                    'no')) == 'no':764                    raise self.ApplyError765    async def apply(self, ui):766        try:767            await self.__apply(ui)768        except self.ApplyError:769            return770    def _get_account(self, ui):771        # find out the right account772        sender = self.envelope.get('From')773        _, addr = email.utils.parseaddr(sender)774        try:775            account = settings.get_account_by_address(addr)776        except NoMatchingAccount:777            msg = 'Cannot compose mail - no account found for `%s`' % addr778            logging.error(msg)779            ui.notify(msg, priority='error')780            raise CommandCanceled()781        if account is None:782            accounts = settings.get_accounts()783            if not accounts:784                ui.notify('no accounts set.', priority='error')785                raise self.ApplyError786            account = accounts[0]787        return account788    def _set_envelope(self):789        if self.envelope is None:790            if self.rest:791                if self.rest.startswith('mailto'):792                    self.envelope = 
mailto_to_envelope(self.rest)793                else:794                    self.envelope = Envelope()795                    self.envelope.add('To', self.rest)796            else:797                self.envelope = Envelope()798    def _set_gpg_sign(self, ui):799        account = self.envelope.account800        if account.sign_by_default:801            if account.gpg_key:802                self.envelope.sign = account.sign_by_default803                self.envelope.sign_key = account.gpg_key804            else:805                msg = 'Cannot find gpg key for account {}'806                msg = msg.format(account.address)807                logging.warning(msg)808                ui.notify(msg, priority='error')809    async def _set_to(self, ui):810        account = self.envelope.account811        if 'To' not in self.envelope.headers:812            allbooks = not settings.get('complete_matching_abook_only')813            logging.debug(allbooks)814            abooks = settings.get_addressbooks(order=[account],815                                               append_remaining=allbooks)816            logging.debug(abooks)817            completer = ContactsCompleter(abooks)818            to = await ui.prompt('To', completer=completer,819                                 history=ui.recipienthistory)820            if to is None:821                raise CommandCanceled()822            to = to.strip(' \t\n,')823            ui.recipienthistory.append(to)824            self.envelope.add('To', to)825    async def _set_gpg_encrypt(self, ui):826        account = self.envelope.account827        if self.encrypt or account.encrypt_by_default == "all":828            logging.debug("Trying to encrypt message because encrypt=%s and "829                          "encrypt_by_default=%s", self.encrypt,830                          account.encrypt_by_default)831            await update_keys(ui, self.envelope, block_error=self.encrypt)832        elif account.encrypt_by_default == "trusted":833  
          logging.debug("Trying to encrypt message because "834                          "account.encrypt_by_default=%s",835                          account.encrypt_by_default)836            await update_keys(ui, self.envelope, block_error=self.encrypt,837                              signed_only=True)838        else:839            logging.debug("No encryption by default, encrypt_by_default=%s",840                          account.encrypt_by_default)841    def _set_base_attributes(self):842        # set forced headers843        for key, value in self.headers.items():844            self.envelope.add(key, value)845        # set forced headers for separate parameters846        if self.sender:847            self.envelope.add('From', self.sender)848        if self.subject:849            self.envelope.add('Subject', self.subject)850        if self.to:851            self.envelope.add('To', ','.join(self.to))852        if self.cc:853            self.envelope.add('Cc', ','.join(self.cc))854        if self.bcc:855            self.envelope.add('Bcc', ','.join(self.bcc))856        if self.tags:857            self.envelope.tags = [t for t in self.tags.split(',') if t]858    async def _set_subject(self, ui):859        if settings.get('ask_subject') and \860                'Subject' not in self.envelope.headers:861            subject = await ui.prompt('Subject')862            logging.debug('SUBJECT: "%s"', subject)863            if subject is None:864                raise CommandCanceled()865            self.envelope.add('Subject', subject)866    async def _set_compose_tags(self, ui):867        if settings.get('compose_ask_tags'):868            comp = TagsCompleter(ui.dbman)869            tags = ','.join(self.tags) if self.tags else ''870            tagsstring = await ui.prompt('Tags', text=tags, completer=comp)871            tags = [t for t in tagsstring.split(',') if t]872            if tags is None:873                raise CommandCanceled()874            self.envelope.tags = 
tags875    def _set_attachments(self):876        if self.attach:877            for gpath in self.attach:878                for a in glob.glob(gpath):879                    self.envelope.attach(a)880                    logging.debug('attaching: %s', a)881    async def __apply(self, ui):882        self._set_envelope()883        if self.template is not None:884            self._get_template(ui)885        # Set headers and tags886        self._set_base_attributes()887        # set account and missing From header888        await self._get_sender_details(ui)889        # add signature890        await self._set_signature(ui)891        # Figure out whether we should GPG sign messages by default892        # and look up key if so893        self._set_gpg_sign(ui)894        # get missing To header895        await self._set_to(ui)896        # Set subject897        await self._set_subject(ui)898        # Add additional tags899        await self._set_compose_tags(ui)900        # Set attachments901        self._set_attachments()902        # set encryption if needed903        await self._set_gpg_encrypt(ui)904        cmd = commands.envelope.EditCommand(envelope=self.envelope,905                                            spawn=self.force_spawn,906                                            refocus=False)907        await ui.apply_command(cmd)908@registerCommand(909    MODE, 'move', help='move focus in current buffer',910    arguments=[911        (['movement'],912         {'nargs': argparse.REMAINDER,913          'help': 'up, down, [half]page up, [half]page down, first, last'})])914class MoveCommand(Command):915    """move in widget"""916    def __init__(self, movement=None, **kwargs):917        if movement is None:918            self.movement = ''919        else:920            self.movement = ' '.join(movement)921        Command.__init__(self, **kwargs)922    def apply(self, ui):923        if self.movement in ['up', 'down', 'page up', 'page down']:924            
ui.mainloop.process_input([self.movement])925        elif self.movement in ['halfpage down', 'halfpage up']:926            ui.mainloop.process_input(927                ui.mainloop.screen_size[1] // 2 * [self.movement.split()[-1]])928        elif self.movement == 'first':929            if hasattr(ui.current_buffer, "focus_first"):930                ui.current_buffer.focus_first()931                ui.update()932        elif self.movement == 'last':933            if hasattr(ui.current_buffer, "focus_last"):934                ui.current_buffer.focus_last()935                ui.update()936        else:937            ui.notify('unknown movement: ' + self.movement,938                      priority='error')939@registerCommand(MODE, 'reload', help='reload all configuration files')940class ReloadCommand(Command):941    """Reload configuration."""942    def apply(self, ui):943        try:944            settings.reload()945        except ConfigError as e:946            ui.notify('Error when reloading config files:\n {}'.format(e),947                      priority='error')948@registerCommand(949    MODE, 'savequery',950    arguments=[951        (['--no-flush'], {'action': 'store_false', 'dest': 'flush',952                          'default': 'True',953                          'help': 'postpone a writeout to the index'}),954        (['alias'], {'help': 'alias to use for query string'}),955        (['query'], {'help': 'query string to store',956                     'nargs': '+'})957    ],958    help='store query string as a "named query" in the database')959class SaveQueryCommand(Command):960    """save alias for query string"""961    repeatable = False962    def __init__(self, alias, query=None, flush=True, **kwargs):963        """964        :param alias: name to use for query string965        :type alias: str966        :param query: query string to save967        :type query: str or None968        :param flush: immediately write out to the index969        :type flush: bool970 
       """971        self.alias = alias972        if query is None:973            self.query = ''974        else:975            self.query = ' '.join(query)976        self.flush = flush977        Command.__init__(self, **kwargs)978    def apply(self, ui):979        msg = 'saved alias "%s" for query string "%s"' % (self.alias,980                                                          self.query)981        try:982            ui.dbman.save_named_query(self.alias, self.query)983            logging.debug(msg)984            ui.notify(msg)985        except DatabaseROError:986            ui.notify('index in read-only mode', priority='error')987            return988        # flush index989        if self.flush:990            ui.apply_command(commands.globals.FlushCommand())991@registerCommand(992    MODE, 'removequery',993    arguments=[994        (['--no-flush'], {'action': 'store_false', 'dest': 'flush',995                          'default': 'True',996                          'help': 'postpone a writeout to the index'}),997        (['alias'], {'help': 'alias to remove'}),998    ],999    help='removes a "named query" from the database')1000class RemoveQueryCommand(Command):1001    """remove named query string for given alias"""1002    repeatable = False1003    def __init__(self, alias, flush=True, **kwargs):1004        """1005        :param alias: name to use for query string1006        :type alias: str1007        :param flush: immediately write out to the index1008        :type flush: bool1009        """1010        self.alias = alias1011        self.flush = flush1012        Command.__init__(self, **kwargs)1013    def apply(self, ui):1014        msg = 'removed alias "%s"' % (self.alias)1015        try:1016            ui.dbman.remove_named_query(self.alias)1017            logging.debug(msg)1018            ui.notify(msg)1019        except DatabaseROError:1020            ui.notify('index in read-only mode', priority='error')1021            return1022        # flush 
index1023        if self.flush:1024            ui.apply_command(commands.globals.FlushCommand())1025@registerCommand(1026    MODE, 'confirmsequence',1027    arguments=[1028        (['msg'], {'help': 'Additional message to prompt',1029                   'nargs': '*'})1030    ],1031    help="prompt to confirm a sequence of commands")1032class ConfirmCommand(Command):1033    """Prompt user to confirm a sequence of commands."""1034    def __init__(self, msg=None, **kwargs):1035        """1036        :param msg: Additional message to prompt the user with1037        :type msg: List[str]1038        """1039        super(ConfirmCommand, self).__init__(**kwargs)1040        if not msg:1041            self.msg = "Confirm sequence?"1042        else:1043            self.msg = "Confirm sequence: {}?".format(" ".join(msg))1044    async def apply(self, ui):1045        if (await ui.choice(self.msg, select='yes', cancel='no',1046                            msg_position='left')) == 'no':..._assertions.py
Source:_assertions.py  
...467        timeout: float = None,468    ) -> None:469        __tracebackhide__ = True470        await self._not.to_be_visible(timeout)471    async def to_be_focused(472        self,473        timeout: float = None,474    ) -> None:475        __tracebackhide__ = True476        await self._expect_impl(477            "to.be.focused",478            FrameExpectOptions(timeout=timeout),479            None,480            "Locator expected to be focused",481        )482    async def not_to_be_focused(483        self,484        timeout: float = None,485    ) -> None:486        __tracebackhide__ = True487        await self._not.to_be_focused(timeout)488class APIResponseAssertions:489    def __init__(self, response: APIResponse, is_not: bool = False) -> None:490        self._loop = response._loop491        self._dispatcher_fiber = response._dispatcher_fiber492        self._is_not = is_not493        self._actual = response494    @property495    def _not(self) -> "APIResponseAssertions":496        return APIResponseAssertions(self._actual, not self._is_not)497    async def to_be_ok(498        self,499    ) -> None:500        __tracebackhide__ = True501        if self._is_not is not self._actual.ok:...test_assertions.py
Source:test_assertions.py  
...190    expect(page.locator("input[name=input1]")).not_to_be_empty()191    expect(page.locator("input[name=input2]")).to_be_empty()192    with pytest.raises(AssertionError):193        expect(page.locator("input[name=input1]")).to_be_empty(timeout=100)194def test_assertions_locator_to_be_focused(page: Page, server: Server) -> None:195    page.goto(server.EMPTY_PAGE)196    page.set_content("<input type=checkbox>")197    my_checkbox = page.locator("input")198    with pytest.raises(AssertionError):199        expect(my_checkbox).to_be_focused(timeout=100)200    my_checkbox.focus()201    expect(my_checkbox).to_be_focused()202def test_assertions_locator_to_be_hidden_visible(page: Page, server: Server) -> None:203    page.goto(server.EMPTY_PAGE)204    page.set_content("<div style='width: 50px; height: 50px;'>Something</div>")205    my_checkbox = page.locator("div")206    expect(my_checkbox).to_be_visible()207    with pytest.raises(AssertionError):208        expect(my_checkbox).to_be_hidden(timeout=100)209    my_checkbox.evaluate("e => e.style.display = 'none'")210    expect(my_checkbox).to_be_hidden()211    with pytest.raises(AssertionError):212        expect(my_checkbox).to_be_visible(timeout=100)213def test_assertions_should_serialize_regexp_correctly(214    page: Page, server: Server215) -> None:...test_boolean_type.py
Source:test_boolean_type.py  
...132    dropdown_id = page.locator(f"{first_row_cell_selector} .cell-wrapper").get_attribute("aria-controls")133    dropdown_selector = f".dropdown.single-select-cell-dropdown:has(ul#{dropdown_id})"134    expect(page.locator(dropdown_selector)).not_to_be_visible()135    page.click(first_row_cell_selector)136    expect(page.locator(f"{first_row_cell_selector} .cell-wrapper")).to_be_focused()137    expect(page.locator(dropdown_selector)).to_be_visible()138    expect(page.locator(f"{dropdown_selector} li")).to_contain_text(["true", "false"])139    page.click(f"{dropdown_selector} li:has-text('true')")140    expect(page.locator(dropdown_selector)).not_to_be_visible()141    expect(first_row_cell_locator).to_contain_text("true", use_inner_text=True)142    expect(page.locator(f"{first_row_cell_selector} .cell-wrapper")).to_be_focused()143    page.click(first_row_cell_selector)144    expect(page.locator(dropdown_selector)).to_be_visible()145    page.click(f"{dropdown_selector} li:has-text('false')")146    expect(page.locator(dropdown_selector)).not_to_be_visible()147    expect(first_row_cell_locator).to_contain_text("false", use_inner_text=True)148    expect(page.locator(f"{first_row_cell_selector} .cell-wrapper")).to_be_focused()149    second_row_cell_selector = get_cell_selector(page, table_with_all_types, 2, "boolean_dd")150    expect(page.locator(second_row_cell_selector)).to_contain_text("true", use_inner_text=True)151    third_row_cell_selector = get_cell_selector(page, table_with_all_types, 3, "boolean_dd")152    expect(page.locator(third_row_cell_selector)).to_contain_text("false", use_inner_text=True)153def test_boolean_cell_dropdown_key_behavior(page, table_with_all_types, go_to_all_types_table):154    expect_table_to_open(page)155    cell_selector = get_cell_selector(page, table_with_all_types, 1, "boolean_dd")156    cell_locator = page.locator(cell_selector)157    page.click(cell_selector)158    expect(cell_locator).to_have_class(re.compile("is-active"))159   
 dropdown_id = page.locator(f"{cell_selector} .cell-wrapper").get_attribute("aria-controls")160    dropdown_selector = f".dropdown.single-select-cell-dropdown:has(ul#{dropdown_id})"161    expect(page.locator(dropdown_selector)).not_to_be_visible()162    expect(page.locator(f"{cell_selector} .cell-wrapper")).to_be_focused()163    page.keyboard.press("Enter")164    expect(page.locator(dropdown_selector)).to_be_visible()165    page.keyboard.press("ArrowDown")166    page.keyboard.press("Enter")167    expect(page.locator(dropdown_selector)).not_to_be_visible()168    expect(cell_locator).to_contain_text("true", use_inner_text=True)...LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. It provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
