How to use auto_close_results method in SeleniumBase

Best Python code snippet using SeleniumBase

__init__.py

Source:__init__.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import csv3import io4import logging5import os6import threading7import time8import cxnstr9import greenlet10import pymysql11import pymysql.constants.FIELD_TYPE as FT12import pynvim13import six14import nvim_mysql.autocomplete15import nvim_mysql.util16logger = logging.getLogger(__name__)17logger.setLevel(logging.DEBUG)18NUMERIC_TYPES = [19 FT.DECIMAL,20 FT.TINY,21 FT.SHORT,22 FT.LONG,23 FT.FLOAT,24 FT.DOUBLE,25 FT.LONGLONG,26 FT.INT24,27 FT.NEWDECIMAL,28]29DATE_TYPES = [30 FT.TIMESTAMP,31 FT.DATE,32 FT.TIME,33 FT.DATETIME,34 FT.YEAR,35 FT.NEWDATE,36]37OPTION_DEFAULTS = {38 'aliases': None,39 'auto_close_results': 0,40 'aux_window_pref': 'results',41 'use_spinner': 1,42}43SPINNER_CHARS = u"⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"44class NvimMySQLError(Exception):45 pass46def prepend_type_hints_to_header(header, types):47 for i, t in enumerate(types):48 if t in NUMERIC_TYPES:49 header[i] = '#' + header[i]50 elif t in DATE_TYPES:51 header[i] = '@' + header[i]52def display_value(v):53 """Return the value to display for one particular cell/value."""54 if v is None:55 v = u'NULL'56 elif isinstance(v, bytes):57 try:58 v = v.decode('utf-8')59 v = ' '.join(v.splitlines())60 except UnicodeDecodeError:61 if six.PY3:62 v = '0x' + v.hex()63 else:64 v = '0x' + v.encode('hex')65 else:66 v = six.text_type(v)67 v = ' '.join(v.splitlines())68 return v69def results_to_table(header, rows, types=None):70 """Format query result set as an ASCII table.71 If a list of field types is provided (from cursor.description), type hints72 will be added to the headers.73 Return a list of strings.74 """75 header = header[:]76 if types:77 prepend_type_hints_to_header(header, types)78 col_lengths = [max([len(display_value(r)) for r in col]) for col in zip(header, *rows)]79 # Table elements.80 horizontal_bar = '+' + '+'.join(['-' * (l + 2) for l in col_lengths]) + '+'81 def table_row(row):82 # Return a database row formatted as a table row.83 return '|' + '|'.join(84 [u' {:{}} '.format(display_value(v), l) for v, l in zip(row, col_lengths)]) + '|'85 return [86 horizontal_bar,87 table_row(header),88 horizontal_bar,89 ] + [table_row(r) for r in rows] + [90 horizontal_bar91 ]92def results_to_vertical(header, rows, types=None):93 """Format query result set as a series of field: value lines.94 Each row will span len(row) lines.95 If a list of field types is provided (from cursor.description), type hints96 will be added to the headers.97 Return a list of strings.98 """99 header = header[:]100 if types:101 prepend_type_hints_to_header(header, types)102 header_lengths = [len(h) for h in header]103 max_header_length = max(header_lengths)104 header_strs = ['{{:>{}}}'.format(max_header_length + 1).format(header[i]) for i in range(len(header))]105 output = []106 for i, row in enumerate(rows, 1):107 if len(rows) > 1:108 output.append('***** row {} *****'.format(i))109 for j, v in enumerate(row):110 output.append('{}: {}'.format(header_strs[j], display_value(v)))111 if len(rows) > 1 and i < len(rows):112 output.append('')113 return output114def results_to_csv(header, rows):115 """Format query result set as a CSV file.116 Note that CSV is a text format, so binary data that is not valid utf-8 will117 cause an error.118 """119 # In Python 2, the csv module can't accept unicode, so we have to give it UTF-8.120 # In Python 3, the csv module accepts unicode.121 def output_value(v):122 if six.PY3:123 if isinstance(v, bytes):124 return v.decode('utf-8')125 else:126 if isinstance(v, unicode):127 return v.encode('utf-8')128 return v129 f = six.StringIO()130 csv_out = csv.writer(f)131 csv_out.writerow([output_value(v) for v in header])132 for row in rows:133 csv_out.writerow([output_value(v) for v in row])134 return f.getvalue().splitlines()135def format_results(results, format_='table', metadata=None):136 if metadata is None:137 metadata = {}138 if results['type'] == 'read':139 if format_ == 'table':140 lines = results_to_table(results['header'], results['rows'], results['types'])141 lines.extend(["", "{} row(s) in set, {} col(s)".format(results['count'], len(results['header']))])142 elif format_ == 'csv':143 lines = results_to_csv(results['header'], results['rows'])144 elif format_ == 'raw_column':145 lines = '\n'.join([str(r[0]) for r in results['rows']]).splitlines()146 elif format_ == 'vertical':147 lines = results_to_vertical(results['header'], results['rows'], results['types'])148 else:149 raise ValueError("Invalid results format '{}'".format(format_))150 elif results['type'] == 'write':151 lines = ["", "{} row(s) affected".format(results['count'])]152 elif results['type'] == 'error':153 lines = results['message'].splitlines()154 if format_ == 'table':155 duration = metadata.get('duration')156 if duration is not None and results['type'] in ['read', 'write']:157 lines[-1] += " ({:.2f} sec)".format(duration)158 warnings = results.get('warnings')159 if warnings:160 lines.extend(['', '[warnings]:'])161 for warning in warnings:162 lines.append("({}) {}".format(warning[1], warning[2]))163 query = metadata.get('query')164 if query is not None:165 lines.extend(['', '---', ''] + query.splitlines())166 return lines167class MySQLTab(object):168 """Represents a MySQL-connected tabpage.169 Each tab has one (primary) connection to a single server.170 """171 AUTOID = 1172 def __init__(self, mysql, vim, tabpage):173 self.vim = vim174 self.mysql = mysql175 self.tabpage = tabpage176 self.autoid = MySQLTab.AUTOID; MySQLTab.AUTOID += 1177 self.conn = None178 self.connection_string = None179 self.server_name = None180 self.status = {181 'executing': False,182 'killing': False,183 'results_pending': False,184 }185 self.results = None186 self.query = None187 self.query_start = None188 self.query_end = None189 self.results_buffer = self._initialize_results_buffer()190 self.results_format = None191 self.tree = Tree(self)192 self.tree_buffer = self._initialize_tree_buffer()193 def _initialize_results_buffer(self):194 cur_buf = self.vim.current.buffer195 # Create196 buf_name = "Results{}".format(self.autoid)197 self.vim.command("badd {}".format(buf_name))198 # Set up199 results_buffer = list(self.vim.buffers)[-1]200 self.vim.command("b! {}".format(results_buffer.number))201 self.vim.command("setl buftype=nofile bufhidden=hide nowrap nonu noswapfile nostartofline")202 self.vim.command("nnoremap <buffer> <S-Left> zH")203 self.vim.command("nnoremap <buffer> <S-Right> zL")204 # close window and go to previous205 self.vim.command("nnoremap <buffer> q :let nr = winnr() <Bar> :wincmd p <Bar> :exe nr . \"wincmd c\"<CR>")206 self.vim.command("nnoremap <buffer> <Leader>c :MySQLShowResults csv<CR>")207 self.vim.command("nnoremap <buffer> <Leader>1 :MySQLShowResults raw_column<CR>")208 self.vim.command("nnoremap <buffer> <Leader>t :MySQLShowResults table<CR>")209 self.vim.command("nnoremap <buffer> <Leader>G :MySQLShowResults vertical<CR>")210 self.vim.command("nnoremap <buffer> <Leader>f :MySQLFreezeResultsHeader<CR>")211 # Switch back212 self.vim.command("b! {}".format(cur_buf.number))213 return results_buffer214 def _initialize_tree_buffer(self):215 cur_buf = self.vim.current.buffer216 # Create217 buf_name = "Tree{}".format(self.autoid)218 self.vim.command("badd {}".format(buf_name))219 # Set up220 tree_buffer = list(self.vim.buffers)[-1]221 self.vim.command("b! {}".format(tree_buffer.number))222 self.vim.command("setl buftype=nofile bufhidden=hide nowrap nonu noswapfile")223 self.vim.command("nnoremap <buffer> <Space> :MySQLTreeToggleDatabase<CR>")224 self.vim.command("nnoremap <buffer> q :let nr = winnr() <Bar> :wincmd p <Bar> :exe nr . \"wincmd c\"<CR>")225 self.vim.command("syn match Directory /^[^ ].*/")226 # Switch back227 self.vim.command("b! {}".format(cur_buf.number))228 return tree_buffer229 def set_connection(self, conn, connection_string, server_name):230 """Set this MySQL tab's database connection to conn."""231 if self.conn:232 self.conn.close()233 self.conn = conn234 self.connection_string = connection_string235 self.server_name = server_name236 self.tabpage.vars['MySQLServer'] = server_name237 self.tree = Tree(self)238 self.tree.refresh_data()239 self.tree_buffer[:] = self.tree.render()240 def update_status(self, **kwargs):241 """Set one or more status flags for this tab.242 Use keyword arguments to do this. Example:243 self.update_status(executing=False, results_pending=True)244 """245 for k, v in kwargs.items():246 if k not in self.status:247 raise KeyError248 self.status[k] = v249 # In case multiple flags are set, the first listed below is the one250 # that shows in vim.251 status_flag = ''252 if self.status['killing']:253 status_flag = 'k'254 elif self.status['executing']:255 status_flag = 'e'256 elif self.status['results_pending']:257 status_flag = 'r'258 logger.debug("status flag: {}".format(status_flag))259 self.tabpage.vars['MySQLStatusFlag'] = status_flag260 self.mysql.refresh_tabline()261 def execute_queries(self, queries, combine_results):262 """Sequentially execute the given queries in this tab.263 If there is an error, execution will stop and the error will be264 displayed.265 Assuming all queries succeed, if combine_results is True,266 aggregate counts will be shown after the last query. (Note that267 these counts pertain only to "write" queries.) If268 combine_results is False, the results of the last query are269 shown.270 """271 # Ignore if a query is already running.272 if self.status['executing']:273 return274 gr = greenlet.getcurrent()275 cursor = self.conn.cursor()276 def query_done():277 logger.debug("query_done called")278 gr.parent = greenlet.getcurrent()279 gr.switch()280 def run_query(query, result):281 logger.debug("run_query called")282 try:283 cursor.execute(query)284 result['description'] = cursor.description285 result['rowcount'] = cursor.rowcount286 result['rows'] = cursor.fetchall()287 cursor.execute("show warnings")288 result['warnings'] = cursor.fetchall()289 except Exception as e:290 result['error'] = "Error: " + repr(e)291 else:292 result['error'] = None293 self.vim.async_call(query_done)294 if combine_results:295 self.query = ''296 self.results = {'type': 'write', 'count': 0, 'warnings': []}297 self.update_status(executing=True)298 self.query_start = time.time()299 for query in queries:300 if combine_results:301 if self.query:302 self.query += '\n\n'303 self.query += query304 else:305 self.query = query306 query_result = {}307 logger.debug("executing query: {}".format(query))308 threading.Thread(target=run_query, args=[query, query_result]).start()309 gr.parent.switch()310 # Query is done.311 if query_result['error']:312 self.results = {'type': 'error', 'message': query_result['error']}313 break314 if combine_results:315 # for "write" queries, add to count316 if not cursor.description:317 self.results['count'] += query_result['rowcount']318 self.results['warnings'].extend(query_result['warnings'])319 else:320 if not query_result['description']:321 self.results = {322 'type': 'write',323 'count': query_result['rowcount'],324 'warnings': query_result['warnings'],325 }326 else:327 header = [f[0] for f in query_result['description']]328 types = [f[1] for f in query_result['description']]329 rows = query_result['rows']330 self.results = {331 'type': 'read',332 'header': header,333 'types': types,334 'rows': rows,335 'count': query_result['rowcount'],336 'warnings': query_result['warnings'],337 }338 self.query_end = time.time()339 cursor.close()340 self.update_status(executing=False, killing=False)341 # TODO: Differentiate results pending from error pending?342 self.update_status(results_pending=True)343 self.vim.command('MySQLShowResults table {}'.format(self.autoid))344 def execute_query(self, query):345 """Execute the given query in this tab.346 Results will be displayed if appropriate when the query finishes.347 """348 self.execute_queries([query], False)349 def complete(self, findstart, base):350 create_new_conn = self.status['executing']351 if create_new_conn:352 logger.debug("query is executing, so creating new connection for autocomplete")353 db_params = cxnstr.to_dict(self.connection_string)354 conn = pymysql.connect(use_unicode=True, **db_params)355 else:356 logger.debug("using existing connection for autocomplete")357 conn = self.conn358 result = nvim_mysql.autocomplete.complete(findstart, base, self.vim, conn.cursor())359 if create_new_conn:360 logger.debug("closing autocomplete connection")361 conn.close()362 return result363 def get_aux_window(self, target):364 target_buffer = self.results_buffer if target == 'results' else self.tree_buffer365 for window in self.vim.current.tabpage.windows:366 if window.buffer == target_buffer:367 return window368 return None369 def get_results_window(self):370 return self.get_aux_window('results')371 def get_tree_window(self):372 return self.get_aux_window('tree')373 def open_aux_window(self, target):374 # If target window is already open, jump to it.375 target_window = self.get_aux_window(target)376 if target_window is not None:377 logger.debug("{} window is already open in this tab".format(target))378 self.vim.command('{}wincmd w'.format(target_window.number))379 return380 # If not, open it.381 # First, check to see if we'll need to give the other window precedence.382 other = 'tree' if target == 'results' else 'results'383 other_window = self.get_aux_window(other)384 reopen_other_window = other_window is not None and self.mysql.get_option('aux_window_pref') == other385 if reopen_other_window:386 # If so, close for now (then we'll re-open).387 self.vim.command("{}wincmd c".format(other_window.number))388 # Open target window.389 if target == 'results':390 result_win_height = int(self.vim.current.window.height * 0.35)391 split_command = "botright {} split".format(result_win_height)392 else:393 tree_win_width = int(self.vim.current.window.width * 0.17)394 split_command = "vertical topleft {} split".format(tree_win_width)395 logger.debug("split command: {}".format(split_command))396 self.vim.command(split_command)397 target_buffer = self.results_buffer if target == 'results' else self.tree_buffer398 self.vim.command("b! {}".format(target_buffer.number))399 if reopen_other_window:400 self.open_aux_window(other)401 # switch back to our window402 self.vim.command("{}wincmd w".format(self.get_aux_window(target).number))403 def open_results_window(self):404 self.open_aux_window('results')405 def open_tree_window(self):406 self.open_aux_window('tree')407 def close(self):408 try:409 self.conn.close()410 except:411 pass412 self.vim.command("bd! {}".format(self.results_buffer.number))413 self.vim.command("bd! {}".format(self.tree_buffer.number))414@pynvim.plugin415class MySQL(object):416 """Plugin interface to neovim."""417 def __init__(self, vim):418 self.vim = vim419 self.tabs = {}420 self.initialized = False421 logger.debug("plugin loaded by host")422 def get_option(self, name):423 return self.vim.vars.get('nvim_mysql#{}'.format(name), OPTION_DEFAULTS[name])424 @pynvim.command('MySQLConnect', nargs=1, sync=True)425 def connect(self, args):426 """Use the given connection_string to connect the current tabpage to a MySQL server."""427 target = args[0]428 aliases = self.get_option('aliases')429 if aliases is not None and target in aliases:430 logger.debug("'{}' is an alias for '{}'".format(target, aliases[target]))431 connection_string = aliases[target]432 server_name = target433 else:434 connection_string = target435 server_name = None436 db_params = cxnstr.to_dict(connection_string)437 if server_name is None:438 server_name = db_params['host']439 logger.debug("connecting to {}".format(connection_string))440 conn = pymysql.connect(use_unicode=True, **db_params)441 conn.autocommit(True)442 logger.debug("connection succeeded")443 tabpage = self.vim.current.tabpage444 if tabpage in self.tabs:445 logger.debug("this tab is already MySQL-connected, will replace connection")446 tab = self.tabs[tabpage]447 else:448 logger.debug("this tab is not MySQL-connected, will initialize")449 tab = self.tabs[tabpage] = MySQLTab(self, self.vim, tabpage)450 tab.set_connection(conn, connection_string, server_name)451 if self.vim.current.buffer.name == '' and 'current_syntax' not in self.vim.current.buffer.vars:452 self.vim.command('set ft=mysql')453 if not self.initialized:454 self._initialize()455 self.refresh_tabline()456 @pynvim.command('MySQLExecQueryUnderCursor', sync=False)457 def exec_query_under_cursor(self):458 """Execute the query under the cursor.459 This command assumes that all queries are separated by at least one460 blank line.461 """462 if not self.initialized:463 raise NvimMySQLError("Use MySQLConnect to connect to a database first")464 current_tab = self.tabs.get(self.vim.current.tabpage, None)465 if current_tab is None:466 raise NvimMySQLError("This is not a MySQL-connected tabpage")467 query, _ = nvim_mysql.util.get_query_under_cursor(468 self.vim.current.buffer,469 self.vim.current.window.cursor[0] - 1,470 self.vim.current.window.cursor[1]471 )472 if query is not None:473 current_tab.execute_query(query)474 @pynvim.command('MySQLExecQueriesInRange', range='', sync=False)475 def exec_queries_in_range(self, range):476 """Execute the queries in the visual selection.477 Results of individual queries are not shown.478 This command assumes that all queries are separated by at least one479 blank line.480 """481 if not self.initialized:482 raise NvimMySQLError("Use MySQLConnect to connect to a database first")483 current_tab = self.tabs.get(self.vim.current.tabpage, None)484 if current_tab is None:485 raise NvimMySQLError("This is not a MySQL-connected tabpage")486 queries = nvim_mysql.util.get_queries_in_range(self.vim.current.buffer, range[0] - 1, range[1] - 1)487 current_tab.execute_queries(queries, len(queries) > 1)488 def _run_query_on_table_under_cursor(self, query_fmt):489 """Run a query on the table under the cursor."""490 if not self.initialized:491 raise NvimMySQLError("Use MySQLConnect to connect to a database first")492 current_tab = self.tabs.get(self.vim.current.tabpage, None)493 if current_tab is None:494 raise NvimMySQLError("This is not a MySQL-connected tabpage")495 # Special handling for tables in the tree buffer.496 if self.vim.current.buffer == current_tab.tree_buffer:497 # Ignore if we're on a database row.498 if not self.vim.current.line.startswith(' '):499 return500 table = self.vim.current.line.strip()501 database, _, _ = nvim_mysql.util.get_parent_database_in_tree(502 self.vim.current.buffer,503 self.vim.current.window.cursor[0] - 1504 )505 table = database + '.' + table506 else:507 word = nvim_mysql.util.get_word_under_cursor(508 self.vim.current.buffer,509 self.vim.current.window.cursor[0] - 1,510 self.vim.current.window.cursor[1]511 )512 table = nvim_mysql.util.word_to_table(word)513 if nvim_mysql.util.table_exists(current_tab.conn, table):514 query = query_fmt.format(table)515 current_tab.execute_query(query)516 else:517 raise NvimMySQLError("Table '{}' does not exist".format(table))518 @pynvim.command('MySQLDescribeTableUnderCursor', sync=False)519 def describe_table_under_cursor(self):520 """Describe the table under the cursor."""521 self._run_query_on_table_under_cursor("describe {}")522 @pynvim.command('MySQLShowIndexesFromTableUnderCursor', sync=False)523 def show_indexes_from_table_under_cursor(self):524 """Show indexes from the table under the cursor."""525 self._run_query_on_table_under_cursor("show indexes from {}")526 @pynvim.command('MySQLSampleTableUnderCursor', sync=False)527 def sample_table_under_cursor(self):528 """Select a sampling of rows from the table under the cursor."""529 self._run_query_on_table_under_cursor("select * from {} limit 100")530 @pynvim.command('MySQLSelectAllFromTableUnderCursor', sync=False)531 def select_all_from_table_under_cursor(self):532 """Select all rows from the table under the cursor."""533 self._run_query_on_table_under_cursor("select * from {}")534 @pynvim.command('MySQLCountTableUnderCursor', sync=False)535 def count_table_under_cursor(self):536 """Select count(*) from the table under the cursor."""537 self._run_query_on_table_under_cursor("select count(*) from {}")538 @pynvim.command('MySQLKillQuery', sync=True)539 def kill_query(self):540 """Kill the query currently executing in the current tabpage.541 This command creates an additional connection to the server to542 kill the query.543 """544 if not self.initialized:545 raise NvimMySQLError("Use MySQLConnect to connect to a database first")546 current_tab = self.tabs.get(self.vim.current.tabpage, None)547 if current_tab is None:548 raise NvimMySQLError("This is not a MySQL-connected tabpage")549 # If there's no running query, ignore.550 if not current_tab.status['executing']:551 raise NvimMySQLError("No query is currently running in this tab")552 current_tab.update_status(killing=True)553 query_id = current_tab.conn.thread_id()554 logger.debug("thread id: {}".format(query_id))555 db_params = cxnstr.to_dict(current_tab.connection_string)556 conn = pymysql.connect(use_unicode=True, **db_params)557 try:558 cursor = conn.cursor()559 cursor.execute("kill query {}".format(query_id))560 finally:561 conn.close()562 logger.debug("done killing query")563 @pynvim.command('MySQLShowResults', nargs='*', sync=True)564 def show_results(self, args):565 """Display the results buffer.566 :MySQLShowResults <format> <tab_autoid>567 Both arguments are optional, but format must be specified if tab_autoid568 is specified.569 format can be one of 'table' (the default), 'csv', or 'raw_column'.570 'table' is an ASCII table format, similar to the standard MySQL client.571 'csv' formats the result set as a CSV file.572 'raw_column' is a raw view of a single column (the first column, if the573 result set contains more than one). For a 1x1 result set, this format574 lets you see the raw data of a single data point, which can be helpful575 for long text fields and/or text fields with newlines. It's also useful576 for quickly extracting a list of field names from DESCRIBE output.577 If tab_autoid is specified, only show the results if we are currently578 in the MySQLTab with the given autoid. If tab_autoid is not specified,579 show the results no matter what.580 """581 if not self.initialized:582 raise NvimMySQLError("Use MySQLConnect to connect to a database first")583 logger.debug("show_results args: {}".format(args))584 if len(args) > 1:585 tab_autoid = int(args[1])586 else:587 tab_autoid = None588 if len(args) > 0:589 format_ = args[0]590 if format_ not in ['table', 'csv', 'raw_column', 'vertical']:591 raise NvimMySQLError("Invalid results format '{}'".format(format_))592 else:593 format_ = 'table'594 current_tab = self.tabs.get(self.vim.current.tabpage, None)595 if current_tab is None:596 if tab_autoid is None:597 raise NvimMySQLError("This is not a MySQL-connected tabpage")598 else:599 return600 # If we were called with a specific tab number and we're not in601 # that tab, ignore.602 if tab_autoid is not None and tab_autoid != current_tab.autoid:603 return604 current_tab.open_results_window()605 if current_tab.query and (current_tab.status['results_pending'] or format_ != current_tab.results_format):606 metadata = {607 'query': current_tab.query,608 'duration': current_tab.query_end - current_tab.query_start,609 }610 current_tab.results_buffer[:] = format_results(current_tab.results, format_, metadata)611 current_tab.results_format = format_612 self.vim.command("normal gg0")613 current_tab.update_status(results_pending=False)614 # If this was done automatically, switch back to wherever the user was.615 if tab_autoid is not None:616 self.vim.command('wincmd p')617 @pynvim.command('MySQLFreezeResultsHeader', sync=True)618 def freeze_results_header(self):619 if not self.initialized:620 raise NvimMySQLError("Use MySQLConnect to connect to a database first")621 current_tab = self.tabs.get(self.vim.current.tabpage, None)622 if current_tab is None:623 raise NvimMySQLError("This is not a MySQL-connected tabpage")624 if current_tab.results_buffer != self.vim.current.buffer:625 raise NvimMySQLError("This command can only be run in results buffer")626 self.vim.feedkeys("""gg^:=winheight('%')-4 sp L3jH^:se scb k:se scb :se sbo=hor j""")627 @pynvim.command('MySQLShowTree', sync=True)628 def show_tree(self):629 """Display the tree buffer."""630 if not self.initialized:631 raise NvimMySQLError("Use MySQLConnect to connect to a database first")632 current_tab = self.tabs.get(self.vim.current.tabpage, None)633 if current_tab is None:634 raise NvimMySQLError("This is not a MySQL-connected tabpage")635 current_tab.open_tree_window()636 current_tab.tree.refresh_data()637 current_tab.tree_buffer[:] = current_tab.tree.render()638 @pynvim.command('MySQLTreeToggleDatabase', sync=True)639 def tree_toggle_database(self):640 """Open or close the nearest database in the tree."""641 if not self.initialized:642 raise NvimMySQLError("Use MySQLConnect to connect to a database first")643 current_tab = self.tabs.get(self.vim.current.tabpage, None)644 if current_tab is None:645 raise NvimMySQLError("This is not a MySQL-connected tabpage")646 if current_tab.tree_buffer != self.vim.current.buffer:647 raise NvimMySQLError("This command can only be run in tree buffer")648 database, expanded, row = nvim_mysql.util.get_parent_database_in_tree(649 self.vim.current.buffer,650 self.vim.current.window.cursor[0] - 1651 )652 if expanded:653 current_tab.tree.close(database)654 else:655 current_tab.tree.open(database)656 current_tab.tree.refresh_data()657 current_tab.tree_buffer[:] = current_tab.tree.render()658 self.vim.current.window.cursor = [row + 1, 0]659 @pynvim.function('MySQLComplete', sync=True)660 def complete(self, args):661 findstart, base = args662 if not self.initialized:663 raise NvimMySQLError("Use MySQLConnect to connect to a database first")664 current_tab = self.tabs.get(self.vim.current.tabpage, None)665 # If this isn't a MySQL tab, ignore.666 if current_tab is None:667 return 0 if findstart else []668 return current_tab.complete(findstart, base)669 @pynvim.autocmd('TabClosed', sync=True)670 def cleanup_tabs_on_tabclosed(self):671 if self.initialized:672 self.cleanup_tabs()673 def cleanup_tabs(self):674 logger.debug("number of open tabs: {}".format(len(self.vim.tabpages)))675 for nvim_tab, mysql_tab in list(self.tabs.items()):676 if nvim_tab not in self.vim.tabpages:677 logger.debug("tab w/ handle {} is not longer open. closing.".format(nvim_tab.handle))678 mysql_tab.close()679 del self.tabs[nvim_tab]680 @pynvim.autocmd('WinEnter', sync=True)681 def auto_close_aux_windows_on_winenter(self):682 """Close remaining windows in tab when all are disposable."""683 if self.initialized:684 def closeable(window):685 auto_close_results = bool(self.get_option('auto_close_results'))686 is_results_window = window.buffer == current_tab.results_buffer687 is_tree_window = window.buffer == current_tab.tree_buffer688 return (auto_close_results and is_results_window) or is_tree_window689 tabpage = self.vim.current.tabpage690 current_tab = self.tabs.get(tabpage, None)691 if current_tab is not None:692 if all(closeable(w) for w in tabpage.windows):693 for _ in range(len(tabpage.windows)):694 self.vim.command('q')695 # We have to call this manually because the TabClosed696 # autocommand doesn't appear to be called when using697 # vim.command.698 self.cleanup_tabs()699 def _initialize(self):700 logger.debug("initializing plugin")701 self.initialized = True702 tabline_file = os.path.join(os.path.dirname(__file__), 'tabline.vim')703 self.vim.command('source {}'.format(tabline_file))704 # Set up autocomplete705 self.vim.command('set completefunc=MySQLComplete')706 self.refresh_tabline()707 if self.get_option('use_spinner'):708 self.start_spinner()709 logger.debug("plugin initialized")710 def refresh_tabline(self, spinner_char=None):711 if spinner_char:712 self.vim.vars['nvim_mysql#spinner_char'] = spinner_char713 self.vim.command('set showtabline=2 tabline=%!MySQLTabLine()')714 def start_spinner(self):715 def spin():716 i = 0717 while True:718 i = i % len(SPINNER_CHARS)719 self.vim.async_call(self.refresh_tabline, SPINNER_CHARS[i])720 time.sleep(.1)721 i += 1722 t = threading.Thread(target=spin)723 t.daemon = True724 t.start()725class Tree(object):726 """Internal representation of tree view."""727 def __init__(self, tab):728 self.tab = tab729 self.data = {} # {db: {expanded: bool, objects: [str]}}730 def refresh_data(self):731 cursor = self.tab.conn.cursor()732 cursor.execute("show databases")733 databases = [r[0] for r in cursor.fetchall()]734 # Remove databases that are no longer listed735 for database in self.data:736 if database not in databases:737 del self.data[database]738 # Add new databases739 for database in databases:740 if database not in self.data:741 self.data[database] = {'expanded': False, 'objects': []}742 # Update objects for expanded databases743 for database in self.data:744 if self.data[database]['expanded']:745 cursor.execute("show tables from {}".format(database))746 tables = [r[0] for r in cursor.fetchall()]747 self.data[database]['objects'] = tables748 def open(self, database):749 self.data[database]['expanded'] = True750 def close(self, database):751 self.data[database]['expanded'] = False752 def render(self):753 s = ''754 for database in sorted(self.data):755 s += database756 s += u' ▾' if self.data[database]['expanded'] else u' ▸'757 s += '\n'758 if self.data[database]['expanded']:759 s += ' ' + '\n '.join(self.data[database]['objects']) + '\n'...

Full Screen

Full Screen

master_qa.py

Source:master_qa.py Github

copy

Full Screen

...279 if START_IN_FULL_SCREEN_MODE:280 self.maximize_window()281 def verify(self, *args):282 self.manual_page_check(*args)283 def auto_close_results(self):284 ''' If this method is called, the results page will automatically close285 at the end of the test run, rather than waiting on the user to close286 the results page manually.287 '''288 self.auto_close_results_page = True289 def tearDown(self):290 if sys.exc_info()[1]:291 self.add_failure(sys.exc_info()[1])292 self.process_manual_check_results(self.auto_close_results_page)...

Full Screen

Full Screen

results_page.py

Source:results_page.py Github

copy

Full Screen

2class MasterQATests(MasterQA):3 def test_results_page_only(self):4 ''' Importing MasterQA should take you to the results5 page at the end of the test, even if testing nothing. '''...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run SeleniumBase automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful