How to use to_be_focused method in Playwright Python

Best Python code snippet using playwright-python

globals.py

Source:globals.py Github

copy

Full Screen

1# Copyright (C) 2011-2012 Patrick Totzke <patricktotzke@gmail.com>2# Copyright © 2018 Dylan Baker3# This file is released under the GNU GPL, version 3 or a later revision.4# For further details see the COPYING file5import argparse6import code7import email8import email.utils9import glob10import logging11import os12import subprocess13from io import BytesIO14import asyncio15import shlex16import urwid17from . import Command, registerCommand18from . import CommandCanceled, SequenceCanceled19from .utils import update_keys20from .. import commands21from .. import buffers22from .. import helper23from ..helper import split_commandstring24from ..helper import mailto_to_envelope25from ..completion.commandline import CommandLineCompleter26from ..completion.contacts import ContactsCompleter27from ..completion.accounts import AccountCompleter28from ..completion.tags import TagsCompleter29from ..widgets.utils import DialogBox30from ..db.errors import DatabaseLockedError31from ..db.envelope import Envelope32from ..settings.const import settings33from ..settings.errors import ConfigError, NoMatchingAccount34from ..utils import argparse as cargparse35MODE = 'global'36@registerCommand(MODE, 'exit', help="shut down cleanly")37class ExitCommand(Command):38 """Shut down cleanly."""39 def __init__(self, _prompt=True, **kwargs):40 """41 :param _prompt: For internal use only, used to control prompting to42 close without sending, and is used by the43 BufferCloseCommand if settings change after yielding to44 the UI.45 :type _prompt: bool46 """47 super(ExitCommand, self).__init__(**kwargs)48 self.prompt_to_send = _prompt49 async def apply(self, ui):50 if settings.get('bug_on_exit'):51 msg = 'really quit?'52 if (await ui.choice(msg, select='yes', cancel='no',53 msg_position='left')) == 'no':54 return55 # check if there are any unsent messages56 if self.prompt_to_send:57 for buffer in ui.buffers:58 if (isinstance(buffer, buffers.EnvelopeBuffer) and59 not buffer.envelope.sent_time):60 msg = 'quit without sending message?'61 if (await ui.choice(msg, cancel='no',62 msg_position='left')) == 'no':63 raise CommandCanceled()64 for b in ui.buffers:65 b.cleanup()66 await ui.apply_command(FlushCommand(callback=ui.exit))67 ui.cleanup()68 if ui.db_was_locked:69 msg = 'Database locked. Exit without saving?'70 response = await ui.choice(msg, msg_position='left', cancel='no')71 if response == 'no':72 return73 ui.exit()74@registerCommand(MODE, 'search', usage='search query', arguments=[75 (['--sort'], {'help': 'sort order', 'choices': [76 'oldest_first', 'newest_first', 'message_id', 'unsorted']}),77 (['query'], {'nargs': argparse.REMAINDER, 'help': 'search string'})])78class SearchCommand(Command):79 """open a new search buffer. Search obeys the notmuch80 :ref:`search.exclude_tags <search.exclude_tags>` setting."""81 repeatable = True82 def __init__(self, query, sort=None, **kwargs):83 """84 :param query: notmuch querystring85 :type query: str86 :param sort: how to order results. Must be one of87 'oldest_first', 'newest_first', 'message_id' or88 'unsorted'.89 :type sort: str90 """91 self.query = ' '.join(query)92 self.order = sort93 Command.__init__(self, **kwargs)94 def apply(self, ui):95 if self.query:96 open_searches = ui.get_buffers_of_type(buffers.SearchBuffer)97 to_be_focused = None98 for sb in open_searches:99 if sb.querystring == self.query:100 to_be_focused = sb101 if to_be_focused:102 if ui.current_buffer != to_be_focused:103 ui.buffer_focus(to_be_focused)104 else:105 # refresh an already displayed search106 ui.current_buffer.rebuild()107 ui.update()108 else:109 ui.buffer_open(buffers.SearchBuffer(ui, self.query,110 sort_order=self.order))111 else:112 ui.notify('empty query string')113@registerCommand(MODE, 'prompt', arguments=[114 (['startwith'], {'nargs': '?', 'default': '', 'help': 'initial content'})])115class PromptCommand(Command):116 """prompts for commandline and interprets it upon select"""117 def __init__(self, startwith='', **kwargs):118 """119 :param startwith: initial content of the prompt widget120 :type startwith: str121 """122 self.startwith = startwith123 Command.__init__(self, **kwargs)124 async def apply(self, ui):125 logging.info('open command shell')126 mode = ui.mode or 'global'127 cmpl = CommandLineCompleter(ui.dbman, mode, ui.current_buffer)128 cmdline = await ui.prompt(129 '',130 text=self.startwith,131 completer=cmpl,132 history=ui.commandprompthistory)133 logging.debug('CMDLINE: %s', cmdline)134 # interpret and apply commandline135 if cmdline:136 # save into prompt history137 ui.commandprompthistory.append(cmdline)138 await ui.apply_commandline(cmdline)139 else:140 raise CommandCanceled()141@registerCommand(MODE, 'refresh')142class RefreshCommand(Command):143 """refresh the current buffer"""144 repeatable = True145 def apply(self, ui):146 ui.current_buffer.rebuild()147 ui.update()148@registerCommand(149 MODE, 'shellescape', arguments=[150 (['--spawn'], {'action': cargparse.BooleanAction, 'default': None,151 'help': 'run in terminal window'}),152 (['--thread'], {'action': cargparse.BooleanAction, 'default': None,153 'help': 'run in separate thread'}),154 (['--refocus'], {'action': cargparse.BooleanAction,155 'help': 'refocus current buffer after command '156 'has finished'}),157 (['cmd'], {'help': 'command line to execute'})],158 forced={'shell': True},159)160class ExternalCommand(Command):161 """run external command"""162 repeatable = True163 def __init__(self, cmd, stdin=None, shell=False, spawn=False,164 refocus=True, thread=False, on_success=None, **kwargs):165 """166 :param cmd: the command to call167 :type cmd: list or str168 :param stdin: input to pipe to the process169 :type stdin: file or str170 :param spawn: run command in a new terminal171 :type spawn: bool172 :param shell: let shell interpret command string173 :type shell: bool174 :param thread: run asynchronously, don't block alot175 :type thread: bool176 :param refocus: refocus calling buffer after cmd termination177 :type refocus: bool178 :param on_success: code to execute after command successfully exited179 :type on_success: callable180 """181 logging.debug({'spawn': spawn})182 # make sure cmd is a list of str183 if isinstance(cmd, str):184 # convert cmdstring to list: in case shell==True,185 # Popen passes only the first item in the list to $SHELL186 cmd = [cmd] if shell else split_commandstring(cmd)187 # determine complete command list to pass188 touchhook = settings.get_hook('touch_external_cmdlist')189 # filter cmd, shell and thread through hook if defined190 if touchhook is not None:191 logging.debug('calling hook: touch_external_cmdlist')192 res = touchhook(cmd, shell=shell, spawn=spawn, thread=thread)193 logging.debug('got: %s', res)194 cmd, shell, self.in_thread = res195 # otherwise if spawn requested and X11 is running196 elif spawn:197 if 'DISPLAY' in os.environ:198 term_cmd = settings.get('terminal_cmd', '')199 logging.info('spawn in terminal: %s', term_cmd)200 termcmdlist = split_commandstring(term_cmd)201 cmd = termcmdlist + cmd202 else:203 thread = False204 self.cmdlist = cmd205 self.stdin = stdin206 self.shell = shell207 self.refocus = refocus208 self.in_thread = thread209 self.on_success = on_success210 Command.__init__(self, **kwargs)211 async def apply(self, ui):212 logging.debug('cmdlist: %s', self.cmdlist)213 callerbuffer = ui.current_buffer214 # set standard input for subcommand215 stdin = None216 if self.stdin is not None:217 # wrap strings in StrinIO so that they behaves like a file218 if isinstance(self.stdin, str):219 # XXX: is utf-8 always safe to use here, or do we need to check220 # the terminal encoding first?221 stdin = BytesIO(self.stdin.encode('utf-8'))222 else:223 stdin = self.stdin224 logging.info('calling external command: %s', self.cmdlist)225 ret = ''226 # TODO: these can probably be refactored in terms of helper.call_cmd227 # and helper.call_cmd_async228 if self.in_thread:229 try:230 if self.shell:231 _cmd = asyncio.create_subprocess_shell232 # The shell function wants a single string or bytestring,233 # we could just join it, but lets be extra safe and use234 # shlex.quote to avoid suprises.235 cmdlist = [shlex.quote(' '.join(self.cmdlist))]236 else:237 _cmd = asyncio.create_subprocess_exec238 cmdlist = self.cmdlist239 proc = await _cmd(240 *cmdlist,241 stdout=subprocess.PIPE,242 stderr=subprocess.PIPE,243 stdin=subprocess.PIPE if stdin else None)244 except OSError as e:245 ret = str(e)246 else:247 _, err = await proc.communicate(stdin.read() if stdin else None)248 if proc.returncode == 0:249 ret = 'success'250 elif err:251 ret = err.decode(urwid.util.detected_encoding)252 else:253 with ui.paused():254 try:255 proc = subprocess.Popen(256 self.cmdlist, shell=self.shell,257 stdin=subprocess.PIPE if stdin else None,258 stderr=subprocess.PIPE)259 except OSError as e:260 ret = str(e)261 else:262 _, err = proc.communicate(stdin.read() if stdin else None)263 if proc.returncode == 0:264 ret = 'success'265 elif err:266 ret = err.decode(urwid.util.detected_encoding)267 if ret == 'success':268 if self.on_success is not None:269 self.on_success()270 else:271 msg = "editor has exited with error code {} -- {}".format(272 proc.returncode,273 ret or "No stderr output")274 ui.notify(msg, priority='error')275 if self.refocus and callerbuffer in ui.buffers:276 logging.info('refocussing')277 ui.buffer_focus(callerbuffer)278class EditCommand(ExternalCommand):279 """edit a file"""280 def __init__(self, path, spawn=None, thread=None, **kwargs):281 """282 :param path: path to the file to be edited283 :type path: str284 :param spawn: force running edtor in a new terminal285 :type spawn: bool286 :param thread: run asynchronously, don't block alot287 :type thread: bool288 """289 self.spawn = spawn290 if spawn is None:291 self.spawn = settings.get('editor_spawn')292 self.thread = thread293 if thread is None:294 self.thread = settings.get('editor_in_thread')295 editor_cmdstring = None296 if os.path.isfile('/usr/bin/editor'):297 editor_cmdstring = '/usr/bin/editor'298 editor_cmdstring = os.environ.get('EDITOR', editor_cmdstring)299 editor_cmdstring = settings.get('editor_cmd') or editor_cmdstring300 logging.debug('using editor_cmd: %s', editor_cmdstring)301 self.cmdlist = None302 if editor_cmdstring:303 if '%s' in editor_cmdstring:304 cmdstring = editor_cmdstring.replace('%s',305 helper.shell_quote(path))306 self.cmdlist = split_commandstring(cmdstring)307 else:308 self.cmdlist = split_commandstring(editor_cmdstring) + [path]309 logging.debug({'spawn: ': self.spawn, 'in_thread': self.thread})310 ExternalCommand.__init__(self, self.cmdlist,311 spawn=self.spawn, thread=self.thread,312 **kwargs)313 async def apply(self, ui):314 if self.cmdlist is None:315 ui.notify('no editor set', priority='error')316 else:317 return await ExternalCommand.apply(self, ui)318@registerCommand(MODE, 'pyshell')319class PythonShellCommand(Command):320 """open an interactive python shell for introspection"""321 repeatable = True322 def apply(self, ui):323 with ui.paused():324 code.interact(local=locals())325@registerCommand(MODE, 'repeat')326class RepeatCommand(Command):327 """repeat the command executed last time"""328 def __init__(self, **kwargs):329 Command.__init__(self, **kwargs)330 async def apply(self, ui):331 if ui.last_commandline is not None:332 await ui.apply_commandline(ui.last_commandline)333 else:334 ui.notify('no last command')335@registerCommand(MODE, 'call', arguments=[336 (['command'], {'help': 'python command string to call'})])337class CallCommand(Command):338 """execute python code"""339 repeatable = True340 def __init__(self, command, **kwargs):341 """342 :param command: python command string to call343 :type command: str344 """345 Command.__init__(self, **kwargs)346 self.command = command347 async def apply(self, ui):348 try:349 hooks = settings.hooks350 if hooks:351 env = {'ui': ui, 'settings': settings}352 for k, v in env.items():353 if not getattr(hooks, k, None):354 setattr(hooks, k, v)355 t = eval(self.command)356 if asyncio.iscoroutine(t):357 await t358 except Exception as e:359 logging.exception(e)360 msg = 'an error occurred during execution of "%s":\n%s'361 ui.notify(msg % (self.command, e), priority='error')362@registerCommand(MODE, 'bclose', arguments=[363 (['--redraw'],364 {'action': cargparse.BooleanAction,365 'help': 'redraw current buffer after command has finished'}),366 (['--force'],367 {'action': 'store_true', 'help': 'never ask for confirmation'})])368class BufferCloseCommand(Command):369 """close a buffer"""370 repeatable = True371 def __init__(self, buffer=None, force=False, redraw=True, **kwargs):372 """373 :param buffer: the buffer to close or None for current374 :type buffer: `alot.buffers.Buffer`375 :param force: force buffer close376 :type force: bool377 """378 self.buffer = buffer379 self.force = force380 self.redraw = redraw381 Command.__init__(self, **kwargs)382 async def apply(self, ui):383 async def one_buffer(prompt=True):384 """Helper to handle the case on only one buffer being opened.385 prompt is a boolean that is passed to ExitCommand() as the _prompt386 keyword argument.387 """388 # If there is only one buffer and the settings don't allow using389 # closebuffer to exit, then just stop.390 if not settings.get('quit_on_last_bclose'):391 msg = ('not closing last remaining buffer as '392 'global.quit_on_last_bclose is set to False')393 logging.info(msg)394 ui.notify(msg, priority='error')395 # Otherwise pass directly to exit command, which also prommpts for396 # 'close without sending'397 else:398 logging.info('closing the last buffer, exiting')399 await ui.apply_command(ExitCommand(_prompt=prompt))400 if self.buffer is None:401 self.buffer = ui.current_buffer402 if len(ui.buffers) == 1:403 await one_buffer()404 return405 if (isinstance(self.buffer, buffers.EnvelopeBuffer) and406 not self.buffer.envelope.sent_time):407 msg = 'close without sending?'408 if (not self.force and (await ui.choice(msg, cancel='no',409 msg_position='left')) ==410 'no'):411 raise CommandCanceled()412 # Because we await above it is possible that the settings or the number413 # of buffers chould change, so retest.414 if len(ui.buffers) == 1:415 await one_buffer(prompt=False)416 else:417 ui.buffer_close(self.buffer, self.redraw)418@registerCommand(MODE, 'bprevious', forced={'offset': -1},419 help='focus previous buffer')420@registerCommand(MODE, 'bnext', forced={'offset': +1},421 help='focus next buffer')422@registerCommand(423 MODE, 'buffer',424 arguments=[(['index'], {'type': int, 'help': 'buffer index to focus'})],425 help='focus buffer with given index')426class BufferFocusCommand(Command):427 """focus a :class:`~alot.buffers.Buffer`"""428 repeatable = True429 def __init__(self, buffer=None, index=None, offset=0, **kwargs):430 """431 :param buffer: the buffer to focus or None432 :type buffer: `alot.buffers.Buffer`433 :param index: index (in bufferlist) of the buffer to focus.434 :type index: int435 :param offset: position of the buffer to focus relative to the436 currently focussed one. This is used only if `buffer`437 is set to `None`438 :type offset: int439 """440 self.buffer = buffer441 self.index = index442 self.offset = offset443 Command.__init__(self, **kwargs)444 def apply(self, ui):445 if self.buffer is None:446 if self.index is not None:447 try:448 self.buffer = ui.buffers[self.index]449 except IndexError:450 ui.notify('no buffer exists at index %d' % self.index)451 return452 else:453 self.index = ui.buffers.index(ui.current_buffer)454 num = len(ui.buffers)455 self.buffer = ui.buffers[(self.index + self.offset) % num]456 ui.buffer_focus(self.buffer)457@registerCommand(MODE, 'bufferlist')458class OpenBufferlistCommand(Command):459 """open a list of active buffers"""460 def __init__(self, filtfun=lambda x: x, **kwargs):461 """462 :param filtfun: filter to apply to displayed list463 :type filtfun: callable (str->bool)464 """465 self.filtfun = filtfun466 Command.__init__(self, **kwargs)467 def apply(self, ui):468 blists = ui.get_buffers_of_type(buffers.BufferlistBuffer)469 if blists:470 ui.buffer_focus(blists[0])471 else:472 bl = buffers.BufferlistBuffer(ui, self.filtfun)473 ui.buffer_open(bl)474@registerCommand(MODE, 'taglist', arguments=[475 (['--tags'], {'nargs': '+', 'help': 'tags to display'}),476])477class TagListCommand(Command):478 """opens taglist buffer"""479 def __init__(self, filtfun=lambda x: x, tags=None, **kwargs):480 """481 :param filtfun: filter to apply to displayed list482 :type filtfun: callable (str->bool)483 """484 self.filtfun = filtfun485 self.tags = tags486 Command.__init__(self, **kwargs)487 def apply(self, ui):488 tags = self.tags or ui.dbman.get_all_tags()489 blists = ui.get_buffers_of_type(buffers.TagListBuffer)490 if blists:491 buf = blists[0]492 buf.tags = tags493 buf.rebuild()494 ui.buffer_focus(buf)495 else:496 ui.buffer_open(buffers.TagListBuffer(ui, tags, self.filtfun))497@registerCommand(MODE, 'namedqueries')498class NamedQueriesCommand(Command):499 """opens named queries buffer"""500 def __init__(self, filtfun=bool, **kwargs):501 """502 :param filtfun: filter to apply to displayed list503 :type filtfun: callable (str->bool)504 """505 self.filtfun = filtfun506 Command.__init__(self, **kwargs)507 def apply(self, ui):508 ui.buffer_open(buffers.NamedQueriesBuffer(ui, self.filtfun))509@registerCommand(MODE, 'flush')510class FlushCommand(Command):511 """flush write operations or retry until committed"""512 repeatable = True513 def __init__(self, callback=None, silent=False, **kwargs):514 """515 :param callback: function to call after successful writeout516 :type callback: callable517 """518 Command.__init__(self, **kwargs)519 self.callback = callback520 self.silent = silent521 def apply(self, ui):522 try:523 ui.dbman.flush()524 if callable(self.callback):525 self.callback()526 logging.debug('flush complete')527 if ui.db_was_locked:528 if not self.silent:529 ui.notify('changes flushed')530 ui.db_was_locked = False531 ui.update()532 except DatabaseLockedError:533 timeout = settings.get('flush_retry_timeout')534 if timeout > 0:535 def f(*_):536 self.apply(ui)537 ui.mainloop.set_alarm_in(timeout, f)538 if not ui.db_was_locked:539 if not self.silent:540 ui.notify('index locked, will try again in %d secs'541 % timeout)542 ui.db_was_locked = True543 ui.update()544 return545# TODO: choices546@registerCommand(MODE, 'help', arguments=[547 (['commandname'], {'help': 'command or \'bindings\''})])548class HelpCommand(Command):549 """display help for a command (use \'bindings\' to display all keybindings550 interpreted in current mode)"""551 def __init__(self, commandname='', **kwargs):552 """553 :param commandname: command to document554 :type commandname: str555 """556 Command.__init__(self, **kwargs)557 self.commandname = commandname558 def apply(self, ui):559 logging.debug('HELP')560 if self.commandname == 'bindings':561 text_att = settings.get_theming_attribute('help', 'text')562 title_att = settings.get_theming_attribute('help', 'title')563 section_att = settings.get_theming_attribute('help', 'section')564 # get mappings565 globalmaps, modemaps = settings.get_keybindings(ui.mode)566 # build table567 maxkeylength = len(568 max(list(modemaps.keys()) + list(globalmaps.keys()), key=len))569 keycolumnwidth = maxkeylength + 2570 linewidgets = []571 # mode specific maps572 if modemaps:573 txt = (section_att, '\n%s-mode specific maps' % ui.mode)574 linewidgets.append(urwid.Text(txt))575 for (k, v) in modemaps.items():576 line = urwid.Columns([('fixed', keycolumnwidth,577 urwid.Text((text_att, k))),578 urwid.Text((text_att, v))])579 linewidgets.append(line)580 # global maps581 linewidgets.append(urwid.Text((section_att, '\nglobal maps')))582 for (k, v) in globalmaps.items():583 if k not in modemaps:584 line = urwid.Columns(585 [('fixed', keycolumnwidth, urwid.Text((text_att, k))),586 urwid.Text((text_att, v))])587 linewidgets.append(line)588 body = urwid.ListBox(linewidgets)589 titletext = 'Bindings Help (escape cancels)'590 box = DialogBox(body, titletext,591 bodyattr=text_att,592 titleattr=title_att)593 # put promptwidget as overlay on main widget594 overlay = urwid.Overlay(box, ui.root_widget, 'center',595 ('relative', 70), 'middle',596 ('relative', 70))597 ui.show_as_root_until_keypress(overlay, 'esc')598 else:599 logging.debug('HELP %s', self.commandname)600 parser = commands.lookup_parser(self.commandname, ui.mode)601 if parser:602 ui.notify(parser.format_help(), block=True)603 else:604 ui.notify('command not known: %s' % self.commandname,605 priority='error')606@registerCommand(MODE, 'compose', arguments=[607 (['--sender'], {'nargs': '?', 'help': 'sender'}),608 (['--template'], {'nargs': '?',609 'help': 'path to a template message file'}),610 (['--tags'], {'nargs': '?',611 'help': 'comma-separated list of tags to apply to message'}),612 (['--subject'], {'nargs': '?', 'help': 'subject line'}),613 (['--to'], {'nargs': '+', 'help': 'recipients'}),614 (['--cc'], {'nargs': '+', 'help': 'copy to'}),615 (['--bcc'], {'nargs': '+', 'help': 'blind copy to'}),616 (['--attach'], {'nargs': '+', 'help': 'attach files'}),617 (['--omit_signature'], {'action': 'store_true',618 'help': 'do not add signature'}),619 (['--spawn'], {'action': cargparse.BooleanAction, 'default': None,620 'help': 'spawn editor in new terminal'}),621 (['rest'], {'nargs': '*'}),622])623class ComposeCommand(Command):624 """compose a new email"""625 def __init__(626 self,627 envelope=None, headers=None, template=None, sender='',628 tags=None, subject='', to=None, cc=None, bcc=None, attach=None,629 omit_signature=False, spawn=None, rest=None, encrypt=False,630 **kwargs):631 """632 :param envelope: use existing envelope633 :type envelope: :class:`~alot.db.envelope.Envelope`634 :param headers: forced header values635 :type headers: dict (str->str)636 :param template: name of template to parse into the envelope after637 creation. This should be the name of a file in your638 template_dir639 :type template: str640 :param sender: From-header value641 :type sender: str642 :param tags: Comma-separated list of tags to apply to message643 :type tags: list(str)644 :param subject: Subject-header value645 :type subject: str646 :param to: To-header value647 :type to: str648 :param cc: Cc-header value649 :type cc: str650 :param bcc: Bcc-header value651 :type bcc: str652 :param attach: Path to files to be attached (globable)653 :type attach: str654 :param omit_signature: do not attach/append signature655 :type omit_signature: bool656 :param spawn: force spawning of editor in a new terminal657 :type spawn: bool658 :param rest: remaining parameters. These can start with659 'mailto' in which case it is interpreted as mailto string.660 Otherwise it will be interpreted as recipients (to) header661 :type rest: list(str)662 :param encrypt: if the email should be encrypted663 :type encrypt: bool664 """665 Command.__init__(self, **kwargs)666 self.envelope = envelope667 self.template = template668 self.headers = headers or {}669 self.sender = sender670 self.subject = subject671 self.to = to or []672 self.cc = cc or []673 self.bcc = bcc or []674 self.attach = attach675 self.omit_signature = omit_signature676 self.force_spawn = spawn677 self.rest = ' '.join(rest or [])678 self.encrypt = encrypt679 self.tags = tags680 class ApplyError(Exception):681 pass682 def _get_template(self, ui):683 # get location of tempsdir, containing msg templates684 tempdir = settings.get('template_dir')685 path = os.path.expanduser(self.template)686 if not os.path.dirname(path): # use tempsdir687 if not os.path.isdir(tempdir):688 ui.notify('no templates directory: %s' % tempdir,689 priority='error')690 raise self.ApplyError()691 path = os.path.join(tempdir, path)692 if not os.path.isfile(path):693 ui.notify('could not find template: %s' % path,694 priority='error')695 raise self.ApplyError()696 try:697 with open(path, 'rb') as f:698 template = helper.try_decode(f.read())699 self.envelope.parse_template(template)700 except Exception as e:701 ui.notify(str(e), priority='error')702 raise self.ApplyError()703 async def _get_sender_details(self, ui):704 # find out the right account, if possible yet705 account = self.envelope.account706 if account is None:707 accounts = settings.get_accounts()708 if not accounts:709 ui.notify('no accounts set.', priority='error')710 return711 elif len(accounts) == 1:712 account = accounts[0]713 # get missing From header714 if 'From' not in self.envelope.headers:715 if account is not None:716 fromstring = email.utils.formataddr(717 (account.realname, str(account.address)))718 self.envelope.add('From', fromstring)719 else:720 cmpl = AccountCompleter()721 fromaddress = await ui.prompt('From', completer=cmpl,722 tab=1, history=ui.senderhistory)723 if fromaddress is None:724 raise CommandCanceled()725 ui.senderhistory.append(fromaddress)726 self.envelope.add('From', fromaddress)727 else:728 fromaddress = self.envelope.get("From")729 # try to find the account again730 if account is None:731 try:732 account = settings.account_matching_address(fromaddress)733 except NoMatchingAccount:734 msg = 'Cannot compose mail - ' \735 'no account found for `%s`' % fromaddress736 logging.error(msg)737 ui.notify(msg, priority='error')738 raise CommandCanceled()739 if self.envelope.account is None:740 self.envelope.account = account741 async def _set_signature(self, ui):742 account = self.envelope.account743 if not self.omit_signature and account.signature:744 logging.debug('has signature')745 sig = os.path.expanduser(account.signature)746 if os.path.isfile(sig):747 logging.debug('is file')748 if account.signature_as_attachment:749 name = account.signature_filename or None750 self.envelope.attach(sig, filename=name)751 logging.debug('attached')752 else:753 with open(sig, 'rb') as f:754 sigcontent = f.read()755 mimetype = helper.guess_mimetype(sigcontent)756 if mimetype.startswith('text'):757 sigcontent = helper.try_decode(sigcontent)758 self.envelope.body_txt += '\n' + sigcontent759 else:760 ui.notify('could not locate signature: %s' % sig,761 priority='error')762 if (await ui.choice('send without signature?', 'yes',763 'no')) == 'no':764 raise self.ApplyError765 async def apply(self, ui):766 try:767 await self.__apply(ui)768 except self.ApplyError:769 return770 def _get_account(self, ui):771 # find out the right account772 sender = self.envelope.get('From')773 _, addr = email.utils.parseaddr(sender)774 try:775 account = settings.get_account_by_address(addr)776 except NoMatchingAccount:777 msg = 'Cannot compose mail - no account found for `%s`' % addr778 logging.error(msg)779 ui.notify(msg, priority='error')780 raise CommandCanceled()781 if account is None:782 accounts = settings.get_accounts()783 if not accounts:784 ui.notify('no accounts set.', priority='error')785 raise self.ApplyError786 account = accounts[0]787 return account788 def _set_envelope(self):789 if self.envelope is None:790 if self.rest:791 if self.rest.startswith('mailto'):792 self.envelope = mailto_to_envelope(self.rest)793 else:794 self.envelope = Envelope()795 self.envelope.add('To', self.rest)796 else:797 self.envelope = Envelope()798 def _set_gpg_sign(self, ui):799 account = self.envelope.account800 if account.sign_by_default:801 if account.gpg_key:802 self.envelope.sign = account.sign_by_default803 self.envelope.sign_key = account.gpg_key804 else:805 msg = 'Cannot find gpg key for account {}'806 msg = msg.format(account.address)807 logging.warning(msg)808 ui.notify(msg, priority='error')809 async def _set_to(self, ui):810 account = self.envelope.account811 if 'To' not in self.envelope.headers:812 allbooks = not settings.get('complete_matching_abook_only')813 logging.debug(allbooks)814 abooks = settings.get_addressbooks(order=[account],815 append_remaining=allbooks)816 logging.debug(abooks)817 completer = ContactsCompleter(abooks)818 to = await ui.prompt('To', completer=completer,819 history=ui.recipienthistory)820 if to is None:821 raise CommandCanceled()822 to = to.strip(' \t\n,')823 ui.recipienthistory.append(to)824 self.envelope.add('To', to)825 async def _set_gpg_encrypt(self, ui):826 account = self.envelope.account827 if self.encrypt or account.encrypt_by_default == "all":828 logging.debug("Trying to encrypt message because encrypt=%s and "829 "encrypt_by_default=%s", self.encrypt,830 account.encrypt_by_default)831 await update_keys(ui, self.envelope, block_error=self.encrypt)832 elif account.encrypt_by_default == "trusted":833 logging.debug("Trying to encrypt message because "834 "account.encrypt_by_default=%s",835 account.encrypt_by_default)836 await update_keys(ui, self.envelope, block_error=self.encrypt,837 signed_only=True)838 else:839 logging.debug("No encryption by default, encrypt_by_default=%s",840 account.encrypt_by_default)841 def _set_base_attributes(self):842 # set forced headers843 for key, value in self.headers.items():844 self.envelope.add(key, value)845 # set forced headers for separate parameters846 if self.sender:847 self.envelope.add('From', self.sender)848 if self.subject:849 self.envelope.add('Subject', self.subject)850 if self.to:851 self.envelope.add('To', ','.join(self.to))852 if self.cc:853 self.envelope.add('Cc', ','.join(self.cc))854 if self.bcc:855 self.envelope.add('Bcc', ','.join(self.bcc))856 if self.tags:857 self.envelope.tags = [t for t in self.tags.split(',') if t]858 async def _set_subject(self, ui):859 if settings.get('ask_subject') and \860 'Subject' not in self.envelope.headers:861 subject = await ui.prompt('Subject')862 logging.debug('SUBJECT: "%s"', subject)863 if subject is None:864 raise CommandCanceled()865 self.envelope.add('Subject', subject)866 async def _set_compose_tags(self, ui):867 if settings.get('compose_ask_tags'):868 comp = TagsCompleter(ui.dbman)869 tags = ','.join(self.tags) if self.tags else ''870 tagsstring = await ui.prompt('Tags', text=tags, completer=comp)871 tags = [t for t in tagsstring.split(',') if t]872 if tags is None:873 raise CommandCanceled()874 self.envelope.tags = tags875 def _set_attachments(self):876 if self.attach:877 for gpath in self.attach:878 for a in glob.glob(gpath):879 self.envelope.attach(a)880 logging.debug('attaching: %s', a)881 async def __apply(self, ui):882 self._set_envelope()883 if self.template is not None:884 self._get_template(ui)885 # Set headers and tags886 self._set_base_attributes()887 # set account and missing From header888 await self._get_sender_details(ui)889 # add signature890 await self._set_signature(ui)891 # Figure out whether we should GPG sign messages by default892 # and look up key if so893 self._set_gpg_sign(ui)894 # get missing To header895 await self._set_to(ui)896 # Set subject897 await self._set_subject(ui)898 # Add additional tags899 await self._set_compose_tags(ui)900 # Set attachments901 self._set_attachments()902 # set encryption if needed903 await self._set_gpg_encrypt(ui)904 cmd = commands.envelope.EditCommand(envelope=self.envelope,905 spawn=self.force_spawn,906 refocus=False)907 await ui.apply_command(cmd)908@registerCommand(909 MODE, 'move', help='move focus in current buffer',910 arguments=[911 (['movement'],912 {'nargs': argparse.REMAINDER,913 'help': 'up, down, [half]page up, [half]page down, first, last'})])914class MoveCommand(Command):915 """move in widget"""916 def __init__(self, movement=None, **kwargs):917 if movement is None:918 self.movement = ''919 else:920 self.movement = ' '.join(movement)921 Command.__init__(self, **kwargs)922 def apply(self, ui):923 if self.movement in ['up', 'down', 'page up', 'page down']:924 ui.mainloop.process_input([self.movement])925 elif self.movement in ['halfpage down', 'halfpage up']:926 ui.mainloop.process_input(927 ui.mainloop.screen_size[1] // 2 * [self.movement.split()[-1]])928 elif self.movement == 'first':929 if hasattr(ui.current_buffer, "focus_first"):930 ui.current_buffer.focus_first()931 ui.update()932 elif self.movement == 'last':933 if hasattr(ui.current_buffer, "focus_last"):934 ui.current_buffer.focus_last()935 ui.update()936 else:937 ui.notify('unknown movement: ' + self.movement,938 priority='error')939@registerCommand(MODE, 'reload', help='reload all configuration files')940class ReloadCommand(Command):941 """Reload configuration."""942 def apply(self, ui):943 try:944 settings.reload()945 except ConfigError as e:946 ui.notify('Error when reloading config files:\n {}'.format(e),947 priority='error')948@registerCommand(949 MODE, 'savequery',950 arguments=[951 (['--no-flush'], {'action': 'store_false', 'dest': 'flush',952 'default': 'True',953 'help': 'postpone a writeout to the index'}),954 (['alias'], {'help': 'alias to use for query string'}),955 (['query'], {'help': 'query string to store',956 'nargs': '+'})957 ],958 help='store query string as a "named query" in the database')959class SaveQueryCommand(Command):960 """save alias for query string"""961 repeatable = False962 def __init__(self, alias, query=None, flush=True, **kwargs):963 """964 :param alias: name to use for query string965 :type alias: str966 :param query: query string to save967 :type query: str or None968 :param flush: immediately write out to the index969 :type flush: bool970 """971 self.alias = alias972 if query is None:973 self.query = ''974 else:975 self.query = ' '.join(query)976 self.flush = flush977 Command.__init__(self, **kwargs)978 def apply(self, ui):979 msg = 'saved alias "%s" for query string "%s"' % (self.alias,980 self.query)981 try:982 ui.dbman.save_named_query(self.alias, self.query)983 logging.debug(msg)984 ui.notify(msg)985 except DatabaseROError:986 ui.notify('index in read-only mode', priority='error')987 return988 # flush index989 if self.flush:990 ui.apply_command(commands.globals.FlushCommand())991@registerCommand(992 MODE, 'removequery',993 arguments=[994 (['--no-flush'], {'action': 'store_false', 'dest': 'flush',995 'default': 'True',996 'help': 'postpone a writeout to the index'}),997 (['alias'], {'help': 'alias to remove'}),998 ],999 help='removes a "named query" from the database')1000class RemoveQueryCommand(Command):1001 """remove named query string for given alias"""1002 repeatable = False1003 def __init__(self, alias, flush=True, **kwargs):1004 """1005 :param alias: name to use for query string1006 :type alias: str1007 :param flush: immediately write out to the index1008 :type flush: bool1009 """1010 self.alias = alias1011 self.flush = flush1012 Command.__init__(self, **kwargs)1013 def apply(self, ui):1014 msg = 'removed alias "%s"' % (self.alias)1015 try:1016 ui.dbman.remove_named_query(self.alias)1017 logging.debug(msg)1018 ui.notify(msg)1019 except DatabaseROError:1020 ui.notify('index in read-only mode', priority='error')1021 return1022 # flush index1023 if self.flush:1024 ui.apply_command(commands.globals.FlushCommand())1025@registerCommand(1026 MODE, 'confirmsequence',1027 arguments=[1028 (['msg'], {'help': 'Additional message to prompt',1029 'nargs': '*'})1030 ],1031 help="prompt to confirm a sequence of commands")1032class ConfirmCommand(Command):1033 """Prompt user to confirm a sequence of commands."""1034 def __init__(self, msg=None, **kwargs):1035 """1036 :param msg: Additional message to prompt the user with1037 :type msg: List[str]1038 """1039 super(ConfirmCommand, self).__init__(**kwargs)1040 if not msg:1041 self.msg = "Confirm sequence?"1042 else:1043 self.msg = "Confirm sequence: {}?".format(" ".join(msg))1044 async def apply(self, ui):1045 if (await ui.choice(self.msg, select='yes', cancel='no',1046 msg_position='left')) == 'no':...

Full Screen

Full Screen

_assertions.py

Source:_assertions.py Github

copy

Full Screen

...467 timeout: float = None,468 ) -> None:469 __tracebackhide__ = True470 await self._not.to_be_visible(timeout)471 async def to_be_focused(472 self,473 timeout: float = None,474 ) -> None:475 __tracebackhide__ = True476 await self._expect_impl(477 "to.be.focused",478 FrameExpectOptions(timeout=timeout),479 None,480 "Locator expected to be focused",481 )482 async def not_to_be_focused(483 self,484 timeout: float = None,485 ) -> None:486 __tracebackhide__ = True487 await self._not.to_be_focused(timeout)488class APIResponseAssertions:489 def __init__(self, response: APIResponse, is_not: bool = False) -> None:490 self._loop = response._loop491 self._dispatcher_fiber = response._dispatcher_fiber492 self._is_not = is_not493 self._actual = response494 @property495 def _not(self) -> "APIResponseAssertions":496 return APIResponseAssertions(self._actual, not self._is_not)497 async def to_be_ok(498 self,499 ) -> None:500 __tracebackhide__ = True501 if self._is_not is not self._actual.ok:...

Full Screen

Full Screen

test_assertions.py

Source:test_assertions.py Github

copy

Full Screen

...190 expect(page.locator("input[name=input1]")).not_to_be_empty()191 expect(page.locator("input[name=input2]")).to_be_empty()192 with pytest.raises(AssertionError):193 expect(page.locator("input[name=input1]")).to_be_empty(timeout=100)194def test_assertions_locator_to_be_focused(page: Page, server: Server) -> None:195 page.goto(server.EMPTY_PAGE)196 page.set_content("<input type=checkbox>")197 my_checkbox = page.locator("input")198 with pytest.raises(AssertionError):199 expect(my_checkbox).to_be_focused(timeout=100)200 my_checkbox.focus()201 expect(my_checkbox).to_be_focused()202def test_assertions_locator_to_be_hidden_visible(page: Page, server: Server) -> None:203 page.goto(server.EMPTY_PAGE)204 page.set_content("<div style='width: 50px; height: 50px;'>Something</div>")205 my_checkbox = page.locator("div")206 expect(my_checkbox).to_be_visible()207 with pytest.raises(AssertionError):208 expect(my_checkbox).to_be_hidden(timeout=100)209 my_checkbox.evaluate("e => e.style.display = 'none'")210 expect(my_checkbox).to_be_hidden()211 with pytest.raises(AssertionError):212 expect(my_checkbox).to_be_visible(timeout=100)213def test_assertions_should_serialize_regexp_correctly(214 page: Page, server: Server215) -> None:...

Full Screen

Full Screen

test_boolean_type.py

Source:test_boolean_type.py Github

copy

Full Screen

...132 dropdown_id = page.locator(f"{first_row_cell_selector} .cell-wrapper").get_attribute("aria-controls")133 dropdown_selector = f".dropdown.single-select-cell-dropdown:has(ul#{dropdown_id})"134 expect(page.locator(dropdown_selector)).not_to_be_visible()135 page.click(first_row_cell_selector)136 expect(page.locator(f"{first_row_cell_selector} .cell-wrapper")).to_be_focused()137 expect(page.locator(dropdown_selector)).to_be_visible()138 expect(page.locator(f"{dropdown_selector} li")).to_contain_text(["true", "false"])139 page.click(f"{dropdown_selector} li:has-text('true')")140 expect(page.locator(dropdown_selector)).not_to_be_visible()141 expect(first_row_cell_locator).to_contain_text("true", use_inner_text=True)142 expect(page.locator(f"{first_row_cell_selector} .cell-wrapper")).to_be_focused()143 page.click(first_row_cell_selector)144 expect(page.locator(dropdown_selector)).to_be_visible()145 page.click(f"{dropdown_selector} li:has-text('false')")146 expect(page.locator(dropdown_selector)).not_to_be_visible()147 expect(first_row_cell_locator).to_contain_text("false", use_inner_text=True)148 expect(page.locator(f"{first_row_cell_selector} .cell-wrapper")).to_be_focused()149 second_row_cell_selector = get_cell_selector(page, table_with_all_types, 2, "boolean_dd")150 expect(page.locator(second_row_cell_selector)).to_contain_text("true", use_inner_text=True)151 third_row_cell_selector = get_cell_selector(page, table_with_all_types, 3, "boolean_dd")152 expect(page.locator(third_row_cell_selector)).to_contain_text("false", use_inner_text=True)153def test_boolean_cell_dropdown_key_behavior(page, table_with_all_types, go_to_all_types_table):154 expect_table_to_open(page)155 cell_selector = get_cell_selector(page, table_with_all_types, 1, "boolean_dd")156 cell_locator = page.locator(cell_selector)157 page.click(cell_selector)158 expect(cell_locator).to_have_class(re.compile("is-active"))159 dropdown_id = page.locator(f"{cell_selector} .cell-wrapper").get_attribute("aria-controls")160 dropdown_selector = f".dropdown.single-select-cell-dropdown:has(ul#{dropdown_id})"161 expect(page.locator(dropdown_selector)).not_to_be_visible()162 expect(page.locator(f"{cell_selector} .cell-wrapper")).to_be_focused()163 page.keyboard.press("Enter")164 expect(page.locator(dropdown_selector)).to_be_visible()165 page.keyboard.press("ArrowDown")166 page.keyboard.press("Enter")167 expect(page.locator(dropdown_selector)).not_to_be_visible()168 expect(cell_locator).to_contain_text("true", use_inner_text=True)...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright gained huge popularity quickly because of some obliging features such as Playwright Test Generator and Inspector, Playwright Reporter, Playwright auto-waiting mechanism and etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single selector and multi-selector get your hands-on with handling alerts and dropdown in Playright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in Parallel with the LammbdaTest test cloud. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: Playwright automation framework support all major languages such as Python, JavaScript, TypeScript, .NET and etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful