How to use cli method in synthetixio-synpress

Best JavaScript code snippet using synthetixio-synpress

generate_cli_trees.py

Source:generate_cli_trees.py Github

copy

Full Screen

1# -*- coding: utf-8 -*- #2# Copyright 2017 Google Inc. All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15"""gcloud CLI tree generators for non-gcloud CLIs.16A CLI tree for a supported command is generated by using the root command plus17`help` or `--help` arguments to do a DFS traversal. Each node is generated18from a man-ish style runtime document.19Supported CLI commands have their own runtime help document quirks, so each is20handled by an ad-hoc parser. The parsers rely on consistency within commands21and between command releases.22The CLI tree for an unsupported command is generated from the output of23`man the-command` and contains only the command root node.24"""25from __future__ import absolute_import26from __future__ import division27from __future__ import unicode_literals28import abc29import json30import os31import re32import subprocess33import sys34import tarfile35import textwrap36from googlecloudsdk.calliope import cli_tree37from googlecloudsdk.command_lib.static_completion import generate as generate_static38from googlecloudsdk.command_lib.static_completion import lookup39from googlecloudsdk.core import exceptions40from googlecloudsdk.core import http41from googlecloudsdk.core import log42from googlecloudsdk.core.console import progress_tracker43from googlecloudsdk.core.resource import resource_printer44from googlecloudsdk.core.util import encoding45from googlecloudsdk.core.util import files46from googlecloudsdk.core.util import text as text_utils47import six48from six.moves import range49class Error(exceptions.Error):50 """Exceptions for this module."""51class NoCliTreeForCommand(Error):52 """Command does not have a CLI tree."""53class NoCliTreeGeneratorForCommand(Error):54 """Command does not have a CLI tree generator."""55class NoManPageTextForCommand(Error):56 """Could not get man page text for command."""57# TODO(b/69033748): disable until issues resolved58def _DisableLongRunningCliTreeGeneration(command):59 """Returns True if long running CLI tree generation is disabled."""60 # Only a few command generators are long running.61 if command not in ('bq', 'gsutil', 'kubectl'):62 return False63 # Only generate these CLI trees on the fly for explicit requests.64 # It can take ~1minute, not good, especially at the interactive prompt.65 if 'update-cli-trees' in sys.argv or '--update-cli-trees' in sys.argv:66 return False67 # It's a long running generator, not explicitly requested -- disable.68 return True69def _NormalizeSpace(text):70 """Returns text dedented and multiple non-indent spaces replaced by one."""71 return re.sub('([^ ]) *', r'\1 ', textwrap.dedent(text)).strip('\n')72def _Flag(name, description='', value=None, default=None, type_='string',73 category='', is_global=False, is_required=False, nargs=None):74 """Initializes and returns a flag dict node."""75 return {76 cli_tree.LOOKUP_ATTR: {},77 cli_tree.LOOKUP_CATEGORY: category,78 cli_tree.LOOKUP_DEFAULT: default,79 cli_tree.LOOKUP_DESCRIPTION: _NormalizeSpace(description),80 cli_tree.LOOKUP_GROUP: '',81 cli_tree.LOOKUP_IS_GLOBAL: is_global,82 cli_tree.LOOKUP_IS_HIDDEN: False,83 cli_tree.LOOKUP_IS_REQUIRED: is_required,84 cli_tree.LOOKUP_NAME: name,85 cli_tree.LOOKUP_NARGS: nargs or ('0' if type_ == 'bool' else '1'),86 cli_tree.LOOKUP_VALUE: value,87 cli_tree.LOOKUP_TYPE: type_,88 }89def _Positional(name, description='', default=None, nargs='0'):90 """Initializes and returns a positional dict node."""91 return {92 cli_tree.LOOKUP_DEFAULT: default,93 cli_tree.LOOKUP_DESCRIPTION: _NormalizeSpace(description),94 cli_tree.LOOKUP_NAME: name,95 cli_tree.LOOKUP_NARGS: nargs,96 }97def _Command(path):98 """Initializes and returns a command/group dict node."""99 return {100 cli_tree.LOOKUP_CAPSULE: '',101 cli_tree.LOOKUP_COMMANDS: {},102 cli_tree.LOOKUP_FLAGS: {},103 cli_tree.LOOKUP_GROUPS: {},104 cli_tree.LOOKUP_IS_GROUP: False,105 cli_tree.LOOKUP_IS_HIDDEN: False,106 cli_tree.LOOKUP_PATH: path,107 cli_tree.LOOKUP_POSITIONALS: [],108 cli_tree.LOOKUP_RELEASE: 'GA',109 cli_tree.LOOKUP_SECTIONS: {},110 }111def _GetDirectories(directory=None, warn_on_exceptions=False):112 """Returns the list of directories to search for CLI trees.113 Args:114 directory: The directory containing the CLI tree JSON files. If None115 then the default installation and config directories are used.116 warn_on_exceptions: Emits warning messages in lieu of exceptions.117 """118 # Initialize the list of directories to search for CLI tree files. The default119 # CLI tree is only searched for and generated in directories[0]. Other120 # existing trees are updated in the directory in which they were found. New121 # trees are generated in directories[-1].122 directories = []123 if directory:124 directories.append(directory)125 else:126 try:127 directories.append(cli_tree.CliTreeDir())128 except cli_tree.SdkRootNotFoundError as e:129 if not warn_on_exceptions:130 raise131 log.warning(six.text_type(e))132 directories.append(cli_tree.CliTreeConfigDir())133 return directories134class CliTreeGenerator(six.with_metaclass(abc.ABCMeta, object)):135 """Base CLI tree generator."""136 _FAILURES = None137 @classmethod138 def MemoizeFailures(cls, enable):139 """Memoizes failed attempts and doesn't repeat them if enable is True."""140 cls._FAILURES = set() if enable else None141 @classmethod142 def AlreadyFailed(cls, command):143 """Returns True if man page request for command already failed."""144 return command in cls._FAILURES if cls._FAILURES else False145 @classmethod146 def AddFailure(cls, command):147 """Add command to the set of failed man generations."""148 if cls._FAILURES is not None:149 cls._FAILURES.add(command)150 def __init__(self, command, command_name=None, tarball=None):151 self.command = command152 self.command_name = command_name or command153 self._cli_version = None # For memoizing GetVersion()154 self._tarball = tarball155 def Run(self, cmd):156 """Runs cmd and returns the output as a string."""157 return encoding.Decode(subprocess.check_output([self.command] + cmd[1:]))158 def GetVersion(self):159 """Returns the CLI_VERSION string."""160 if not self._cli_version:161 try:162 self._cli_version = self.Run([self.command, 'version']).split()[-1]163 except: # pylint: disable=bare-except164 self._cli_version = cli_tree.CLI_VERSION_UNKNOWN165 return self._cli_version166 @abc.abstractmethod167 def Generate(self):168 """Generates and returns the CLI tree dict."""169 return None170 def FindTreeFile(self, directories):171 """Returns (path,f) open for read for the first CLI tree in directories."""172 for directory in directories or _GetDirectories(warn_on_exceptions=True):173 path = os.path.join(directory or '.', self.command_name) + '.json'174 try:175 return path, files.FileReader(path)176 except files.Error:177 pass178 return path, None179 def IsUpToDate(self, tree, verbose=False):180 """Returns a bool tuple (readonly, up_to_date)."""181 actual_cli_version = tree.get(cli_tree.LOOKUP_CLI_VERSION)182 readonly = actual_cli_version == cli_tree.CLI_VERSION_READONLY183 # Check the schema version.184 actual_tree_version = tree.get(cli_tree.LOOKUP_VERSION)185 if actual_tree_version != cli_tree.VERSION:186 return readonly, False187 # Check the CLI version.188 expected_cli_version = self.GetVersion()189 if readonly:190 # READONLY trees are always up to date.191 pass192 elif expected_cli_version == cli_tree.CLI_VERSION_UNKNOWN:193 # Don't know how to regenerate but we have one in hand -- accept it.194 pass195 elif actual_cli_version != expected_cli_version:196 return readonly, False197 # Schema and CLI versions are up to date.198 if verbose:199 log.status.Print('[{}] CLI tree version [{}] is up to date.'.format(200 self.command, actual_cli_version))201 return readonly, True202 def LoadOrGenerate(self, directories=None, force=False, generate=True,203 ignore_out_of_date=False, tarball=False, verbose=False,204 warn_on_exceptions=False):205 """Loads the CLI tree or generates it if necessary, and returns the tree."""206 f = None207 try:208 path, f = self.FindTreeFile(directories)209 if not f:210 # TODO(b/69033748): disable until issues resolved211 if _DisableLongRunningCliTreeGeneration(self.command):212 return None213 else:214 up_to_date = False215 try:216 tree = json.load(f)217 except ValueError:218 # Corrupt JSON -- could have been interrupted.219 tree = None220 if tree:221 readonly, up_to_date = self.IsUpToDate(tree, verbose=verbose)222 if readonly:223 return tree224 elif up_to_date:225 if not force:226 return tree227 elif ignore_out_of_date:228 return None229 finally:230 if f:231 f.close()232 def _Generate():233 """Helper that generates a CLI tree and writes it to a JSON file."""234 tree = self.Generate()235 if tree:236 try:237 f = files.FileWriter(path)238 except files.Error as e:239 # CLI data config dir may not be initialized yet.240 directory, _ = os.path.split(path)241 try:242 files.MakeDir(directory)243 f = files.FileWriter(path)244 except files.Error:245 if not warn_on_exceptions:246 raise247 log.warning(six.text_type(e))248 return None249 with f:250 resource_printer.Print(tree, print_format='json', out=f)251 return tree252 # At this point:253 # (1) the tree is not found or is out of date254 # (2) the tree is not readonly255 # (3) we have a generator for the tree256 if not generate:257 raise NoCliTreeForCommand('No CLI tree for [{}].'.format(self.command))258 if not verbose:259 return _Generate()260 with progress_tracker.ProgressTracker(261 '{} the [{}] CLI tree'.format(262 'Updating' if f else 'Generating', self.command)):263 return _Generate()264class _BqCollector(object):265 """bq help document section collector."""266 def __init__(self, text):267 self.text = text.split('\n')268 self.heading = 'DESCRIPTION'269 self.lookahead = None270 self.ignore_trailer = False271 def Collect(self, strip_headings=False):272 """Returns the heading and content lines from text."""273 content = []274 if self.lookahead:275 if not strip_headings:276 content.append(self.lookahead)277 self.lookahead = None278 heading = self.heading279 self.heading = None280 while self.text:281 line = self.text.pop(0)282 if line.startswith(' ') or not strip_headings and not self.ignore_trailer:283 content.append(line.rstrip())284 while content and not content[0]:285 content.pop(0)286 while content and not content[-1]:287 content.pop()288 self.ignore_trailer = True289 return heading, content290class BqCliTreeGenerator(CliTreeGenerator):291 """bq CLI tree generator."""292 def Run(self, cmd):293 """Runs cmd and returns the output as a string."""294 try:295 output = subprocess.check_output([self.command] + cmd[1:])296 except subprocess.CalledProcessError as e:297 # bq exit code is 1 for help and --help. How do you know if help failed?298 if e.returncode != 1:299 raise300 output = e.output301 return encoding.Decode(output).replace('bq.py', 'bq')302 def AddFlags(self, command, content, is_global=False):303 """Adds flags in content lines to command."""304 while content:305 line = content.pop(0)306 name, description = line.strip().split(':', 1)307 paragraph = [description.strip()]308 default = ''309 while content and not content[0].startswith(' --'):310 line = content.pop(0).strip()311 if line.startswith('(default: '):312 default = line[10:-1]313 else:314 paragraph.append(line)315 description = ' '.join(paragraph).strip()316 if name.startswith('--[no]'):317 name = '--' + name[6:]318 type_ = 'bool'319 value = ''320 else:321 value = 'VALUE'322 type_ = 'string'323 command[cli_tree.LOOKUP_FLAGS][name] = _Flag(324 name=name,325 description=description,326 type_=type_,327 value=value,328 default=default,329 is_required=False,330 is_global=is_global,331 )332 def SubTree(self, path):333 """Generates and returns the CLI subtree rooted at path."""334 command = _Command(path)335 command[cli_tree.LOOKUP_IS_GROUP] = True336 text = self.Run([self.command, 'help'] + path[1:])337 # `bq help` lists help for all commands. Command flags are "defined"338 # by example. We don't attempt to suss that out.339 content = text.split('\n')340 while content:341 line = content.pop(0)342 if not line or not line[0].islower():343 continue344 name, text = line.split(' ', 1)345 description = [text.strip()]346 examples = []347 arguments = []348 paragraph = description349 while content and (not content[0] or not content[0][0].islower()):350 line = content.pop(0).strip()351 if line == 'Arguments:':352 paragraph = arguments353 elif line == 'Examples:':354 paragraph = examples355 else:356 paragraph.append(line)357 subcommand = _Command(path + [name])358 command[cli_tree.LOOKUP_COMMANDS][name] = subcommand359 if description:360 subcommand[cli_tree.LOOKUP_SECTIONS]['DESCRIPTION'] = '\n'.join(361 description)362 if examples:363 subcommand[cli_tree.LOOKUP_SECTIONS]['EXAMPLES'] = '\n'.join(364 examples)365 return command366 def Generate(self):367 """Generates and returns the CLI tree rooted at self.command."""368 # Construct the tree minus the global flags.369 tree = self.SubTree([self.command_name])370 # Add the global flags to the root.371 text = self.Run([self.command, '--help'])372 collector = _BqCollector(text)373 _, content = collector.Collect(strip_headings=True)374 self.AddFlags(tree, content, is_global=True)375 # Finally add the version stamps.376 tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()377 tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION378 return tree379class _GsutilCollector(object):380 """gsutil help document section collector."""381 UNKNOWN, ROOT, MAN, TOPIC = list(range(4))382 def __init__(self, text):383 self.text = text.split('\n')384 self.heading = 'CAPSULE'385 self.page_type = self.UNKNOWN386 def Collect(self, strip_headings=False):387 """Returns the heading and content lines from text."""388 content = []389 heading = self.heading390 self.heading = None391 while self.text:392 line = self.text.pop(0)393 if self.page_type == self.UNKNOWN:394 # The first heading distinguishes the document page type.395 if line.startswith('Usage:'):396 self.page_type = self.ROOT397 continue398 elif line == 'NAME':399 self.page_type = self.MAN400 heading = 'CAPSULE'401 continue402 elif not line.startswith(' '):403 continue404 elif self.page_type == self.ROOT:405 # The root help page.406 if line == 'Available commands:':407 heading = 'COMMANDS'408 continue409 elif line == 'Additional help topics:':410 self.heading = 'TOPICS'411 break412 elif not line.startswith(' '):413 continue414 elif self.page_type == self.MAN:415 # A command/subcommand man style page.416 if line == 'OVERVIEW':417 self.page_type = self.TOPIC418 self.heading = 'DESCRIPTION'419 break420 elif line == 'SYNOPSIS':421 self.heading = line422 break423 elif line.endswith('OPTIONS'):424 self.heading = 'FLAGS'425 break426 elif line and line[0].isupper():427 self.heading = line.split(' ', 1)[-1]428 break429 elif self.page_type == self.TOPIC:430 # A topic man style page.431 if line and line[0].isupper():432 self.heading = line433 break434 if line.startswith(' ') or not strip_headings:435 content.append(line.rstrip())436 while content and not content[0]:437 content.pop(0)438 while content and not content[-1]:439 content.pop()440 return heading, content441class GsutilCliTreeGenerator(CliTreeGenerator):442 """gsutil CLI tree generator."""443 def __init__(self, *args, **kwargs):444 super(GsutilCliTreeGenerator, self).__init__(*args, **kwargs)445 self.topics = []446 def Run(self, cmd):447 """Runs the command in cmd and returns the output as a string."""448 try:449 if self._tarball:450 # package time invocation requires explicit #! override451 cmd = [sys.executable, self.command] + cmd[1:]452 output = subprocess.check_output(cmd)453 except subprocess.CalledProcessError as e:454 # gsutil exit code is 1 for --help depending on the context.455 if e.returncode != 1:456 raise457 output = e.output458 return encoding.Decode(output)459 def AddFlags(self, command, content, is_global=False):460 """Adds flags in content lines to command."""461 def _Add(name, description):462 value = ''463 type_ = 'bool'464 default = ''465 command[cli_tree.LOOKUP_FLAGS][name] = _Flag(466 name=name,467 description=description,468 type_=type_,469 value=value,470 default=default,471 is_required=False,472 is_global=is_global,473 )474 parse = re.compile(' *((-[^ ]*,)* *(-[^ ]*) *)(.*)')475 name = None476 description = []477 for line in content:478 if line.startswith(' -'):479 if name:480 _Add(name, '\n'.join(description))481 match = parse.match(line)482 name = match.group(3)483 description = [match.group(4).rstrip()]484 elif len(line) > 16:485 description.append(line[16:].rstrip())486 if name:487 _Add(name, '\n'.join(description))488 def SubTree(self, path):489 """Generates and returns the CLI subtree rooted at path."""490 command = _Command(path)491 is_help_command = len(path) > 1 and path[1] == 'help'492 if is_help_command:493 cmd = path494 else:495 cmd = path + ['--help']496 text = self.Run(cmd)497 collector = _GsutilCollector(text)498 while True:499 heading, content = collector.Collect()500 if not heading:501 break502 elif heading == 'CAPSULE':503 if content:504 command[cli_tree.LOOKUP_CAPSULE] = content[0].split('-', 1)[1].strip()505 elif heading == 'COMMANDS':506 if is_help_command:507 continue508 for line in content:509 try:510 name = line.split()[0]511 except IndexError:512 continue513 if name == 'update':514 continue515 command[cli_tree.LOOKUP_IS_GROUP] = True516 command[cli_tree.LOOKUP_COMMANDS][name] = self.SubTree(path + [name])517 elif heading == 'FLAGS':518 self.AddFlags(command, content)519 elif heading == 'SYNOPSIS':520 commands = []521 for line in content:522 if not line:523 break524 cmd = line.split()525 if len(cmd) <= len(path):526 continue527 if cmd[:len(path)] == path:528 name = cmd[len(path)]529 if name[0].islower() and name not in ('off', 'on', 'false', 'true'):530 commands.append(name)531 if len(commands) > 1:532 command[cli_tree.LOOKUP_IS_GROUP] = True533 for name in commands:534 command[cli_tree.LOOKUP_COMMANDS][name] = self.SubTree(535 path + [name])536 elif heading == 'TOPICS':537 for line in content:538 try:539 self.topics.append(line.split()[0])540 except IndexError:541 continue542 elif heading.isupper():543 if heading.lower() == path[-1]:544 heading = 'DESCRIPTION'545 command[cli_tree.LOOKUP_SECTIONS][heading] = '\n'.join(546 [line[2:] for line in content])547 return command548 def Generate(self):549 """Generates and returns the CLI tree rooted at self.command."""550 tree = self.SubTree([self.command_name])551 # Add the global flags to the root.552 text = self.Run([self.command, 'help', 'options'])553 collector = _GsutilCollector(text)554 while True:555 heading, content = collector.Collect()556 if not heading:557 break558 if heading == 'FLAGS':559 self.AddFlags(tree, content, is_global=True)560 # Add the help topics.561 help_command = tree[cli_tree.LOOKUP_COMMANDS]['help']562 help_command[cli_tree.LOOKUP_IS_GROUP] = True563 for topic in self.topics:564 help_command[cli_tree.LOOKUP_COMMANDS][topic] = self.SubTree(565 help_command[cli_tree.LOOKUP_PATH] + [topic])566 # Finally add the version stamps.567 tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()568 tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION569 return tree570class _KubectlCollector(object):571 """Kubectl help document section collector."""572 def __init__(self, text):573 self.text = text.split('\n')574 self.heading = 'DESCRIPTION'575 self.lookahead = None576 self.ignore_trailer = False577 def Collect(self, strip_headings=False):578 """Returns the heading and content lines from text."""579 content = []580 if self.lookahead:581 if not strip_headings:582 content.append(self.lookahead)583 self.lookahead = None584 heading = self.heading585 self.heading = None586 while self.text:587 line = self.text.pop(0)588 usage = 'Usage:'589 if line.startswith(usage):590 line = line[len(usage):].strip()591 if line:592 self.lookahead = line593 self.heading = 'USAGE'594 break595 if line.endswith(':'):596 if 'Commands' in line:597 self.heading = 'COMMANDS'598 break599 if 'Examples' in line:600 self.heading = 'EXAMPLES'601 break602 if 'Options' in line:603 self.heading = 'FLAGS'604 break605 if line.startswith(' ') or not strip_headings and not self.ignore_trailer:606 content.append(line.rstrip())607 while content and not content[0]:608 content.pop(0)609 while content and not content[-1]:610 content.pop()611 self.ignore_trailer = True612 return heading, content613class KubectlCliTreeGenerator(CliTreeGenerator):614 """kubectl CLI tree generator."""615 def AddFlags(self, command, content, is_global=False):616 """Adds flags in content lines to command."""617 for line in content:618 flags, description = line.strip().split(':', 1)619 flag = flags.split(', ')[-1]620 name, value = flag.split('=')621 if value in ('true', 'false'):622 value = ''623 type_ = 'bool'624 else:625 value = 'VALUE'626 type_ = 'string'627 default = ''628 command[cli_tree.LOOKUP_FLAGS][name] = _Flag(629 name=name,630 description=description,631 type_=type_,632 value=value,633 default=default,634 is_required=False,635 is_global=is_global,636 )637 def SubTree(self, path):638 """Generates and returns the CLI subtree rooted at path."""639 command = _Command(path)640 text = self.Run(path + ['--help'])641 collector = _KubectlCollector(text)642 while True:643 heading, content = collector.Collect()644 if not heading:645 break646 elif heading == 'COMMANDS':647 for line in content:648 try:649 name = line.split()[0]650 except IndexError:651 continue652 command[cli_tree.LOOKUP_IS_GROUP] = True653 command[cli_tree.LOOKUP_COMMANDS][name] = self.SubTree(path + [name])654 elif heading in ('DESCRIPTION', 'EXAMPLES'):655 command[cli_tree.LOOKUP_SECTIONS][heading] = '\n'.join(content)656 elif heading == 'FLAGS':657 self.AddFlags(command, content)658 return command659 def GetVersion(self):660 """Returns the CLI_VERSION string."""661 if not self._cli_version:662 try:663 verbose_version = self.Run([self.command, 'version', '--client'])664 match = re.search('GitVersion:"([^"]*)"', verbose_version)665 self._cli_version = match.group(1)666 except: # pylint: disable=bare-except667 self._cli_version = cli_tree.CLI_VERSION_UNKNOWN668 return self._cli_version669 def Generate(self):670 """Generates and returns the CLI tree rooted at self.command."""671 # Construct the tree minus the global flags.672 tree = self.SubTree([self.command_name])673 # Add the global flags to the root.674 text = self.Run([self.command, 'options'])675 collector = _KubectlCollector(text)676 _, content = collector.Collect(strip_headings=True)677 content.append(' --help=true: List detailed command help.')678 self.AddFlags(tree, content, is_global=True)679 # Finally add the version stamps.680 tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()681 tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION682 return tree683class _ManPageCollector(six.with_metaclass(abc.ABCMeta, object)):684 """man page help document section collector base class.685 Attributes:686 command: The man page command name.687 content_indent: A string of space characters representing the indent of688 the first line of content for any section.689 heading: The heading for the next call to Collect().690 text: The list of man page lines.691 version: The collector CLI_VERSION string.692 """693 def __init__(self, command):694 self.command = command695 self.content_indent = None696 self.heading = None697 self.text = self.GetManPageText().split('\n')698 @classmethod699 @abc.abstractmethod700 def GetVersion(cls):701 """Returns the CLI_VERSION string."""702 return None703 @abc.abstractmethod704 def _GetRawManPageText(self):705 """Returns the raw man page text."""706 return None707 @abc.abstractmethod708 def GetManPageText(self):709 """Returns the preprocessed man page text."""710 return None711 def Collect(self):712 """Returns the heading and content lines from text."""713 content = []714 heading = self.heading715 self.heading = None716 while self.text:717 line = self.text.pop(0)718 if not heading:719 # No NAME no man page.720 if line == 'NAME':721 heading = line722 continue723 elif not line:724 pass725 elif line[0] == ' ':726 if not self.content_indent:727 self.content_indent = re.sub('[^ ].*', '', line)728 if len(line) > len(self.content_indent):729 indented_char = line[len(self.content_indent)]730 if not line.startswith(self.content_indent):731 # Subsection heading or category.732 line = '### ' + line.strip()733 elif heading == 'DESCRIPTION' and indented_char == '-':734 # Some man pages, like GNU ls(1), inline flags in DESCRIPTION.735 self.text.insert(0, line)736 self.heading = 'FLAGS'737 break738 elif heading == 'FLAGS' and indented_char not in (' ', '-'):739 self.text.insert(0, line)740 self.heading = 'DESCRIPTION'741 break742 elif line in ('SYNOPSIS', 'DESCRIPTION', 'EXIT STATUS', 'SEE ALSO'):743 self.heading = line744 break745 elif 'FLAGS' in line or 'OPTIONS' in line:746 self.heading = 'FLAGS'747 break748 elif line and line[0].isupper():749 self.heading = line.split(' ', 1)[-1]750 break751 content.append(line.rstrip())752 while content and not content[0]:753 content.pop(0)754 while content and not content[-1]:755 content.pop()756 return heading, content757class _ManCommandCollector(_ManPageCollector):758 """man command help document section collector."""759 _CLI_VERSION = 'man-0.1'760 @classmethod761 def GetVersion(cls):762 return cls._CLI_VERSION763 def _GetRawManPageText(self):764 """Returns the raw man page text."""765 try:766 with files.FileWriter(os.devnull) as f:767 return encoding.Decode(subprocess.check_output(['man', self.command],768 stderr=f))769 except (OSError, subprocess.CalledProcessError):770 raise NoManPageTextForCommand(771 'Cannot get man(1) command man page text for [{}].'.format(772 self.command))773 def GetManPageText(self):774 """Returns the preprocessed man page text."""775 text = self._GetRawManPageText()776 return re.sub(777 '.\b', '', re.sub(778 '(\u2010|\\u2010)\n *', '', text))779class _ManUrlCollector(_ManPageCollector):780 """man URL help document section collector."""781 _CLI_VERSION = 'man7.org-0.1'782 @classmethod783 def GetVersion(cls):784 return cls._CLI_VERSION785 def _GetRawManPageText(self):786 """Returns the raw man page text."""787 url = 'http://man7.org/linux/man-pages/man1/{}.1.html'.format(self.command)788 response, content = http.HttpClient().request(url)789 if response.status != 200:790 raise NoManPageTextForCommand(791 'Cannot get URL man page text for [{}].'.format(self.command))792 return content.decode('utf-8')793 def GetManPageText(self):794 """Returns the text man page for self.command from a URL."""795 text = self._GetRawManPageText()796 # A little sed(1) to clean up the html header/trailer and anchors.797 # A mispplaced .* or (...) group match could affect timing. Keep track798 # of that if you change any of the pattern,replacement tuples.799 for pattern, replacement in (800 ('<span class="footline">.*', ''),801 ('<h2><a id="([^"]*)"[^\n]*\n', '\\1\n'),802 ('<b>( +)', '\\1*'),803 ('( +)</b>', '*\\1'),804 ('<i>( +)', '\\1_'),805 ('( +)</i>', '_\\1'),806 ('</?b>', '*'),807 ('</?i>', '_'),808 ('</pre>', ''),809 ('<a href="([^"]*)">([^\n]*)</a>', '[\\1](\\2)'),810 ('&amp;', '\\&'),811 ('&gt;', '>'),812 ('&lt;', '<'),813 ('&#39;', "'"),814 ):815 text = re.sub(pattern, replacement, text, flags=re.DOTALL)816 # ... and some non-regular edits to finish up.817 lines = []818 top = 'NAME'819 flags = False820 paragraph = False821 for line in text.split('\n'):822 if top and line == 'NAME':823 # Ignore all lines before the NAME section.824 top = None825 lines = []826 if line.startswith(' *-'): # NOTICE: NOT a regex!827 # Drop font embellishment markdown from flag definition list lines.828 flags = True829 if paragraph:830 # Blank line was really a paragraph.831 paragraph = False832 lines.append('')833 if ' ' in line[7:]:834 head, tail = line[7:].split(' ', 1)835 head = re.sub('\\*', '', head)836 line = ' '.join([' ', head, tail])837 else:838 line = re.sub('\\*', '', line)839 elif flags:840 if not line:841 paragraph = True842 continue843 elif not line.startswith(' '):844 flags = False845 paragraph = False846 elif paragraph:847 if not line[0].lower():848 # A real paragraph849 lines.append('+')850 paragraph = False851 lines.append(line)852 return '\n'.join(lines)853class ManPageCliTreeGenerator(CliTreeGenerator):854 """man page CLI tree generator."""855 @classmethod856 def _GetManPageCollectorType(cls):857 """Returns the man page collector type."""858 if files.FindExecutableOnPath('man'):859 return _ManCommandCollector860 return _ManUrlCollector861 def __init__(self, command):862 super(ManPageCliTreeGenerator, self).__init__(command)863 self.collector_type = self._GetManPageCollectorType()864 def GetVersion(self):865 """Returns the CLI_VERSION string."""866 if not self.collector_type:867 return cli_tree.CLI_VERSION_UNKNOWN868 return self.collector_type.GetVersion()869 def AddFlags(self, command, content, is_global=False):870 """Adds flags in content lines to command."""871 def _NameTypeValueNargs(name, type_=None, value=None, nargs=None):872 """Returns the (name, type, value-metavar, nargs) for flag name."""873 if name.startswith('--'):874 # --foo875 if '=' in name:876 name, value = name.split('=', 1)877 if name[-1] == '[':878 name = name[:-1]879 if value.endswith(']'):880 value = value[:-1]881 nargs = '?'882 else:883 nargs = '1'884 type_ = 'string'885 elif len(name) > 2:886 # -b VALUE887 value = name[2:]888 if value[0].isspace():889 value = value[1:]890 if value.startswith('['):891 value = value[1:]892 if value.endswith(']'):893 value = value[:-1]894 nargs = '?'895 else:896 nargs = '1'897 name = name[:2]898 type_ = 'string'899 if type_ is None or value is None or nargs is None:900 type_ = 'bool'901 value = ''902 nargs = '0'903 return (name, type_, value, nargs)904 def _Add(name, description, category, type_, value, nargs):905 """Adds a flag."""906 name, type_, value, nargs = _NameTypeValueNargs(name, type_, value, nargs)907 default = ''908 command[cli_tree.LOOKUP_FLAGS][name] = _Flag(909 name=name,910 description='\n'.join(description),911 type_=type_,912 value=value,913 nargs=nargs,914 category=category,915 default=default,916 is_required=False,917 is_global=is_global,918 )919 def _AddNames(names, description, category):920 """Add a flag name list."""921 if names:922 _, type_, value, nargs = _NameTypeValueNargs(names[-1])923 for name in names:924 _Add(name, description, category, type_, value, nargs)925 names = []926 description = []927 category = ''928 for line in content:929 if line.lstrip().startswith('-'):930 _AddNames(names, description, category)931 line = line.lstrip()932 names = line.strip().replace(', -', ', --').split(', -')933 if ' ' in names[-1]:934 names[-1], text = names[-1].split(' ', 1)935 description = [text.strip()]936 else:937 description = []938 elif line.startswith('### '):939 category = line[4:]940 else:941 description.append(line)942 _AddNames(names, description, category)943 def _Generate(self, path, collector):944 """Generates and returns the CLI subtree rooted at path."""945 command = _Command(path)946 while True:947 heading, content = collector.Collect()948 if not heading:949 break950 elif heading == 'NAME':951 if content:952 command[cli_tree.LOOKUP_CAPSULE] = re.sub(953 '.* -+ *', '', content[0]).strip()954 elif heading == 'FLAGS':955 self.AddFlags(command, content)956 elif heading not in ('BUGS', 'COLOPHON', 'COMPATIBILITY', 'HISTORY',957 'STANDARDS', 'SYNOPSIS'):958 blocks = []959 begin = 0960 end = 0961 while end < len(content):962 if content[end].startswith('###'):963 if begin < end:964 blocks.append(_NormalizeSpace('\n'.join(content[begin:end])))965 blocks.append(content[end])966 begin = end + 1967 end += 1968 if begin < end:969 blocks.append(_NormalizeSpace('\n'.join(content[begin:end])))970 text = '\n'.join(blocks)971 if heading in command[cli_tree.LOOKUP_SECTIONS]:972 command[cli_tree.LOOKUP_SECTIONS][heading] += '\n\n' + text973 else:974 command[cli_tree.LOOKUP_SECTIONS][heading] = text975 return command976 def Generate(self):977 """Generates and returns the CLI tree rooted at self.command."""978 if not self.collector_type:979 return None980 collector = self.collector_type(self.command)981 if not collector:982 return None983 tree = self._Generate([self.command], collector)984 if not tree:985 return None986 # Add the version stamps.987 tree[cli_tree.LOOKUP_CLI_VERSION] = self.GetVersion()988 tree[cli_tree.LOOKUP_VERSION] = cli_tree.VERSION989 return tree990GENERATORS = {991 'bq': BqCliTreeGenerator,992 'gsutil': GsutilCliTreeGenerator,993 'kubectl': KubectlCliTreeGenerator,994}995def LoadOrGenerate(command, directories=None, tarball=None, force=False,996 generate=True, ignore_out_of_date=False, verbose=False,997 warn_on_exceptions=False, allow_extensions=False):998 """Returns the CLI tree for command, generating it if it does not exist.999 Args:1000 command: The CLI root command name.1001 directories: The list of directories containing the CLI tree JSON files.1002 If None then the default installation directories are used.1003 tarball: For packaging CLI trees. --commands specifies one command that is1004 a relative path in this tarball. The tarball is extracted to a temporary1005 directory and the command path is adjusted to point to the temporary1006 directory.1007 force: Update all exitsing trees by forcing them to be out of date if True.1008 generate: Generate the tree if it is out of date or does not exist.1009 ignore_out_of_date: Ignore out of date trees instead of regenerating.1010 verbose: Display a status line for up to date CLI trees if True.1011 warn_on_exceptions: Emits warning messages in lieu of generator exceptions.1012 Used during installation.1013 allow_extensions: Whether or not to allow extensions in executable names.1014 Returns:1015 The CLI tree for command or None if command not found or there is no1016 generator.1017 """1018 command_dir, command_name = os.path.split(command)1019 # Handle package time command names.1020 if command_name.endswith('_lite'):1021 command_name = command_name[:-5]1022 # Don't repeat failed attempts.1023 if CliTreeGenerator.AlreadyFailed(command_name):1024 if verbose:1025 log.status.Print('No CLI tree generator for [{}].'.format(command))1026 return None1027 def _LoadOrGenerate(command, command_dir, command_name):1028 """Helper."""1029 # The command must exist.1030 if (command_dir and not os.path.exists(command) or1031 not command_dir1032 and not files.FindExecutableOnPath(command_name,1033 allow_extensions=allow_extensions)):1034 if verbose:1035 log.status.Print('Command [{}] not found.'.format(command))1036 return None1037 # Instantiate the appropriate generator.1038 try:1039 generator = GENERATORS[command_name](1040 command, command_name=command_name, tarball=tarball)1041 except KeyError:1042 generator = ManPageCliTreeGenerator(command_name)1043 if not generator:1044 return None1045 # Load or (re)generate the CLI tree if possible.1046 try:1047 return generator.LoadOrGenerate(1048 directories=directories,1049 force=force,1050 generate=generate,1051 ignore_out_of_date=ignore_out_of_date,1052 verbose=verbose,1053 warn_on_exceptions=warn_on_exceptions)1054 except NoManPageTextForCommand:1055 pass1056 return None1057 if command_name == cli_tree.DEFAULT_CLI_NAME:1058 tree = cli_tree.Load()1059 elif tarball:1060 with files.TemporaryDirectory() as tmp:1061 tar = tarfile.open(tarball)1062 tar.extractall(tmp)1063 command = os.path.join(tmp, command)1064 command_dir = os.path.join(tmp, command_dir)1065 tree = _LoadOrGenerate(command, command_dir, command_name)1066 else:1067 tree = _LoadOrGenerate(command, command_dir, command_name)1068 if not tree:1069 CliTreeGenerator.AddFailure(command_name)1070 return tree1071def UpdateCliTrees(cli=None, commands=None, directory=None, tarball=None,1072 force=False, verbose=False, warn_on_exceptions=False):1073 """(re)generates the CLI trees in directory if non-existent or out of date.1074 This function uses the progress tracker because some of the updates can1075 take ~minutes.1076 Args:1077 cli: The default CLI. If not None then the default CLI is also updated.1078 commands: Update only the commands in this list.1079 directory: The directory containing the CLI tree JSON files. If None1080 then the default installation directories are used.1081 tarball: For packaging CLI trees. --commands specifies one command that is1082 a relative path in this tarball. The tarball is extracted to a temporary1083 directory and the command path is adjusted to point to the temporary1084 directory.1085 force: Update all exitsing trees by forcing them to be out of date if True.1086 verbose: Display a status line for up to date CLI trees if True.1087 warn_on_exceptions: Emits warning messages in lieu of exceptions. Used1088 during installation.1089 Raises:1090 NoCliTreeGeneratorForCommand: A command in commands is not supported1091 (doesn't have a generator).1092 """1093 directories = _GetDirectories(1094 directory=directory, warn_on_exceptions=warn_on_exceptions)1095 if not commands:1096 commands = set([cli_tree.DEFAULT_CLI_NAME] + list(GENERATORS.keys()))1097 failed = []1098 for command in sorted(commands):1099 if command != cli_tree.DEFAULT_CLI_NAME:1100 tree = LoadOrGenerate(command,1101 directories=directories,1102 tarball=tarball,1103 force=force,1104 verbose=verbose,1105 warn_on_exceptions=warn_on_exceptions)1106 if not tree:1107 failed.append(command)1108 elif cli:1109 def _Mtime(path):1110 try:1111 return os.path.getmtime(path)1112 except OSError:1113 return 01114 # Update the CLI tree.1115 cli_tree_path = cli_tree.CliTreePath(directory=directories[0])1116 cli_tree.Load(cli=cli, path=cli_tree_path, force=force, verbose=verbose)1117 # Update the static completion CLI tree if older than the CLI tree. To1118 # keep static completion startup lightweight we don't track the release1119 # in the tree data. Using the modify time is a minimal sanity check.1120 completion_tree_path = lookup.CompletionCliTreePath(1121 directory=directories[0])1122 cli_tree_mtime = _Mtime(cli_tree_path)1123 completion_tree_mtime = _Mtime(completion_tree_path)1124 if (force or not completion_tree_mtime or1125 completion_tree_mtime < cli_tree_mtime):1126 files.MakeDir(os.path.dirname(completion_tree_path))1127 with files.FileWriter(completion_tree_path) as f:1128 generate_static.ListCompletionTree(cli, out=f)1129 elif verbose:1130 log.status.Print(1131 '[{}] static completion CLI tree is up to date.'.format(command))1132 if failed:1133 message = 'No CLI tree {} for [{}].'.format(1134 text_utils.Pluralize(len(failed), 'generator'),1135 ', '.join(sorted(failed)))1136 if not warn_on_exceptions:1137 raise NoCliTreeGeneratorForCommand(message)1138 log.warning(message)1139def LoadAll(directory=None, ignore_out_of_date=False, root=None,1140 warn_on_exceptions=True):1141 """Loads all CLI trees in directory and adds them to tree.1142 Args:1143 directory: The config directory containing the CLI tree modules.1144 ignore_out_of_date: Ignore out of date trees instead of regenerating.1145 root: dict, The CLI root to update. A new root is created if None.1146 warn_on_exceptions: Warn on exceptions instead of raising if True.1147 Raises:1148 CliTreeVersionError: loaded tree version mismatch1149 ImportModuleError: import errors1150 Returns:1151 The CLI tree.1152 """1153 # Create the root node if needed.1154 if root is None:1155 root = cli_tree.Node(description='The CLI tree root.')1156 # Load the default CLI if available.1157 if cli_tree.DEFAULT_CLI_NAME not in root[cli_tree.LOOKUP_COMMANDS]:1158 try:1159 root[cli_tree.LOOKUP_COMMANDS][cli_tree.DEFAULT_CLI_NAME] = (1160 cli_tree.Load())1161 except cli_tree.CliTreeLoadError:1162 pass1163 # Load extra CLIs by searching directories in order. .json files are treated1164 # as CLI modules/data, where the file base name is the name of the CLI root1165 # command.1166 directories = _GetDirectories(1167 directory=directory, warn_on_exceptions=warn_on_exceptions)1168 loaded = {cli_tree.DEFAULT_CLI_NAME, '__init__'} # Already loaded this above.1169 for directory in directories:1170 if not directory or not os.path.exists(directory):1171 continue1172 for (dirpath, _, filenames) in os.walk(directory):1173 for filename in sorted(filenames): # For stability across runs.1174 command, extension = os.path.splitext(filename)1175 if extension != '.json':1176 continue1177 if command in loaded:1178 # Already loaded. Earlier directory hits take precedence.1179 continue1180 loaded.add(command)1181 if command == cli_tree.DEFAULT_CLI_NAME:1182 tree = cli_tree.Load(os.path.join(dirpath, filename))1183 else:1184 tree = LoadOrGenerate(command,1185 directories=[dirpath],1186 ignore_out_of_date=ignore_out_of_date,1187 warn_on_exceptions=warn_on_exceptions)1188 if tree:1189 root[cli_tree.LOOKUP_COMMANDS][command] = tree1190 # Don't search subdirectories.1191 break...

Full Screen

Full Screen

client_test.py

Source:client_test.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3unit tests4"""5import json6import requests7import requests.exceptions8import socket9import sys10import requests_mock11import random12from nose.tools import raises13from mock import patch14import warnings15import mock16from influxdb.influxdb08 import InfluxDBClient17from influxdb.influxdb08.client import session18if sys.version < '3':19 import codecs20 def u(x):21 return codecs.unicode_escape_decode(x)[0]22else:23 def u(x):24 return x25if sys.version_info[:2] <= (2, 6):26 import unittest2 as unittest27else:28 import unittest29def _build_response_object(status_code=200, content=""):30 resp = requests.Response()31 resp.status_code = status_code32 resp._content = content.encode("utf8")33 return resp34def _mocked_session(method="GET", status_code=200, content=""):35 method = method.upper()36 def request(*args, **kwargs):37 c = content38 # Check method39 assert method == kwargs.get('method', 'GET')40 if method == 'POST':41 data = kwargs.get('data', None)42 if data is not None:43 # Data must be a string44 assert isinstance(data, str)45 # Data must be a JSON string46 assert c == json.loads(data, strict=True)47 c = data48 # Anyway, Content must be a JSON string (or empty string)49 if not isinstance(c, str):50 c = json.dumps(c)51 return _build_response_object(status_code=status_code, content=c)52 mocked = patch.object(53 session,54 'request',55 side_effect=request56 )57 return mocked58class TestInfluxDBClient(unittest.TestCase):59 def setUp(self):60 # By default, raise exceptions on warnings61 warnings.simplefilter('error', FutureWarning)62 self.dummy_points = [63 {64 "points": [65 ["1", 1, 1.0],66 ["2", 2, 2.0]67 ],68 "name": "foo",69 "columns": ["column_one", "column_two", "column_three"]70 }71 ]72 self.dsn_string = 'influxdb://uSr:pWd@host:1886/db'73 def test_scheme(self):74 cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')75 self.assertEqual(cli._baseurl, 'http://host:8086')76 cli = InfluxDBClient(77 'host', 8086, 'username', 'password', 'database', ssl=True78 )79 self.assertEqual(cli._baseurl, 'https://host:8086')80 def test_dsn(self):81 cli = InfluxDBClient.from_DSN(self.dsn_string)82 self.assertEqual('http://host:1886', cli._baseurl)83 self.assertEqual('uSr', cli._username)84 self.assertEqual('pWd', cli._password)85 self.assertEqual('db', cli._database)86 self.assertFalse(cli.use_udp)87 cli = InfluxDBClient.from_DSN('udp+' + self.dsn_string)88 self.assertTrue(cli.use_udp)89 cli = InfluxDBClient.from_DSN('https+' + self.dsn_string)90 self.assertEqual('https://host:1886', cli._baseurl)91 cli = InfluxDBClient.from_DSN('https+' + self.dsn_string,92 **{'ssl': False})93 self.assertEqual('http://host:1886', cli._baseurl)94 def test_switch_database(self):95 cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')96 cli.switch_database('another_database')97 self.assertEqual(cli._database, 'another_database')98 @raises(FutureWarning)99 def test_switch_db_deprecated(self):100 cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')101 cli.switch_db('another_database')102 self.assertEqual(cli._database, 'another_database')103 def test_switch_user(self):104 cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')105 cli.switch_user('another_username', 'another_password')106 self.assertEqual(cli._username, 'another_username')107 self.assertEqual(cli._password, 'another_password')108 def test_write(self):109 with requests_mock.Mocker() as m:110 m.register_uri(111 requests_mock.POST,112 "http://localhost:8086/write"113 )114 cli = InfluxDBClient(database='db')115 cli.write(116 {"database": "mydb",117 "retentionPolicy": "mypolicy",118 "points": [{"name": "cpu_load_short",119 "tags": {"host": "server01",120 "region": "us-west"},121 "timestamp": "2009-11-10T23:00:00Z",122 "values": {"value": 0.64}}]}123 )124 self.assertEqual(125 json.loads(m.last_request.body),126 {"database": "mydb",127 "retentionPolicy": "mypolicy",128 "points": [{"name": "cpu_load_short",129 "tags": {"host": "server01",130 "region": "us-west"},131 "timestamp": "2009-11-10T23:00:00Z",132 "values": {"value": 0.64}}]}133 )134 def test_write_points(self):135 with requests_mock.Mocker() as m:136 m.register_uri(137 requests_mock.POST,138 "http://localhost:8086/db/db/series"139 )140 cli = InfluxDBClient(database='db')141 cli.write_points(142 self.dummy_points143 )144 self.assertListEqual(145 json.loads(m.last_request.body),146 self.dummy_points147 )148 def test_write_points_string(self):149 with requests_mock.Mocker() as m:150 m.register_uri(151 requests_mock.POST,152 "http://localhost:8086/db/db/series"153 )154 cli = InfluxDBClient(database='db')155 cli.write_points(156 str(json.dumps(self.dummy_points))157 )158 self.assertListEqual(159 json.loads(m.last_request.body),160 self.dummy_points161 )162 def test_write_points_batch(self):163 with requests_mock.Mocker() as m:164 m.register_uri(requests_mock.POST,165 "http://localhost:8086/db/db/series")166 cli = InfluxDBClient('localhost', 8086,167 'username', 'password', 'db')168 cli.write_points(data=self.dummy_points, batch_size=2)169 self.assertEqual(1, m.call_count)170 def test_write_points_batch_invalid_size(self):171 with requests_mock.Mocker() as m:172 m.register_uri(requests_mock.POST,173 "http://localhost:8086/db/db/series")174 cli = InfluxDBClient('localhost', 8086,175 'username', 'password', 'db')176 cli.write_points(data=self.dummy_points, batch_size=-2)177 self.assertEqual(1, m.call_count)178 def test_write_points_batch_multiple_series(self):179 dummy_points = [180 {"points": [["1", 1, 1.0], ["2", 2, 2.0], ["3", 3, 3.0],181 ["4", 4, 4.0], ["5", 5, 5.0]],182 "name": "foo",183 "columns": ["val1", "val2", "val3"]},184 {"points": [["1", 1, 1.0], ["2", 2, 2.0], ["3", 3, 3.0],185 ["4", 4, 4.0], ["5", 5, 5.0], ["6", 6, 6.0],186 ["7", 7, 7.0], ["8", 8, 8.0]],187 "name": "bar",188 "columns": ["val1", "val2", "val3"]},189 ]190 expected_last_body = [{'points': [['7', 7, 7.0], ['8', 8, 8.0]],191 'name': 'bar',192 'columns': ['val1', 'val2', 'val3']}]193 with requests_mock.Mocker() as m:194 m.register_uri(requests_mock.POST,195 "http://localhost:8086/db/db/series")196 cli = InfluxDBClient('localhost', 8086,197 'username', 'password', 'db')198 cli.write_points(data=dummy_points, batch_size=3)199 self.assertEqual(m.call_count, 5)200 self.assertEqual(expected_last_body, m.request_history[4].json())201 def test_write_points_udp(self):202 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)203 port = random.randint(4000, 8000)204 s.bind(('0.0.0.0', port))205 cli = InfluxDBClient(206 'localhost', 8086, 'root', 'root',207 'test', use_udp=True, udp_port=port208 )209 cli.write_points(self.dummy_points)210 received_data, addr = s.recvfrom(1024)211 self.assertEqual(self.dummy_points,212 json.loads(received_data.decode(), strict=True))213 def test_write_bad_precision_udp(self):214 cli = InfluxDBClient(215 'localhost', 8086, 'root', 'root',216 'test', use_udp=True, udp_port=4444217 )218 with self.assertRaisesRegexp(219 Exception,220 "InfluxDB only supports seconds precision for udp writes"221 ):222 cli.write_points(223 self.dummy_points,224 time_precision='ms'225 )226 @raises(Exception)227 def test_write_points_fails(self):228 with _mocked_session('post', 500):229 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')230 cli.write_points([])231 def test_write_points_with_precision(self):232 with _mocked_session('post', 200, self.dummy_points):233 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')234 self.assertTrue(cli.write_points(self.dummy_points))235 def test_write_points_bad_precision(self):236 cli = InfluxDBClient()237 with self.assertRaisesRegexp(238 Exception,239 "Invalid time precision is given. \(use 's', 'm', 'ms' or 'u'\)"240 ):241 cli.write_points(242 self.dummy_points,243 time_precision='g'244 )245 @raises(Exception)246 def test_write_points_with_precision_fails(self):247 with _mocked_session('post', 500):248 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')249 cli.write_points_with_precision([])250 def test_delete_points(self):251 with _mocked_session('delete', 204) as mocked:252 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')253 self.assertTrue(cli.delete_points("foo"))254 self.assertEqual(len(mocked.call_args_list), 1)255 args, kwds = mocked.call_args_list[0]256 self.assertEqual(kwds['params'],257 {'u': 'username', 'p': 'password'})258 self.assertEqual(kwds['url'], 'http://host:8086/db/db/series/foo')259 @raises(Exception)260 def test_delete_points_with_wrong_name(self):261 with _mocked_session('delete', 400):262 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')263 cli.delete_points("nonexist")264 @raises(NotImplementedError)265 def test_create_scheduled_delete(self):266 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')267 cli.create_scheduled_delete([])268 @raises(NotImplementedError)269 def test_get_list_scheduled_delete(self):270 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')271 cli.get_list_scheduled_delete()272 @raises(NotImplementedError)273 def test_remove_scheduled_delete(self):274 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')275 cli.remove_scheduled_delete(1)276 def test_query(self):277 data = [278 {279 "name": "foo",280 "columns": ["time", "sequence_number", "column_one"],281 "points": [282 [1383876043, 16, "2"], [1383876043, 15, "1"],283 [1383876035, 14, "2"], [1383876035, 13, "1"]284 ]285 }286 ]287 with _mocked_session('get', 200, data):288 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')289 result = cli.query('select column_one from foo;')290 self.assertEqual(len(result[0]['points']), 4)291 def test_query_chunked(self):292 cli = InfluxDBClient(database='db')293 example_object = {294 'points': [295 [1415206250119, 40001, 667],296 [1415206244555, 30001, 7],297 [1415206228241, 20001, 788],298 [1415206212980, 10001, 555],299 [1415197271586, 10001, 23]300 ],301 'name': 'foo',302 'columns': [303 'time',304 'sequence_number',305 'val'306 ]307 }308 example_response = \309 json.dumps(example_object) + json.dumps(example_object)310 with requests_mock.Mocker() as m:311 m.register_uri(312 requests_mock.GET,313 "http://localhost:8086/db/db/series",314 text=example_response315 )316 self.assertListEqual(317 cli.query('select * from foo', chunked=True),318 [example_object, example_object]319 )320 def test_query_chunked_unicode(self):321 cli = InfluxDBClient(database='db')322 example_object = {323 'points': [324 [1415206212980, 10001, u('unicode-\xcf\x89')],325 [1415197271586, 10001, u('more-unicode-\xcf\x90')]326 ],327 'name': 'foo',328 'columns': [329 'time',330 'sequence_number',331 'val'332 ]333 }334 example_response = \335 json.dumps(example_object) + json.dumps(example_object)336 with requests_mock.Mocker() as m:337 m.register_uri(338 requests_mock.GET,339 "http://localhost:8086/db/db/series",340 text=example_response341 )342 self.assertListEqual(343 cli.query('select * from foo', chunked=True),344 [example_object, example_object]345 )346 @raises(Exception)347 def test_query_fail(self):348 with _mocked_session('get', 401):349 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')350 cli.query('select column_one from foo;')351 def test_query_bad_precision(self):352 cli = InfluxDBClient()353 with self.assertRaisesRegexp(354 Exception,355 "Invalid time precision is given. \(use 's', 'm', 'ms' or 'u'\)"356 ):357 cli.query('select column_one from foo', time_precision='g')358 def test_create_database(self):359 with _mocked_session('post', 201, {"name": "new_db"}):360 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')361 self.assertTrue(cli.create_database('new_db'))362 @raises(Exception)363 def test_create_database_fails(self):364 with _mocked_session('post', 401):365 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')366 cli.create_database('new_db')367 def test_delete_database(self):368 with _mocked_session('delete', 204):369 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')370 self.assertTrue(cli.delete_database('old_db'))371 @raises(Exception)372 def test_delete_database_fails(self):373 with _mocked_session('delete', 401):374 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')375 cli.delete_database('old_db')376 def test_get_list_database(self):377 data = [378 {"name": "a_db"}379 ]380 with _mocked_session('get', 200, data):381 cli = InfluxDBClient('host', 8086, 'username', 'password')382 self.assertEqual(len(cli.get_list_database()), 1)383 self.assertEqual(cli.get_list_database()[0]['name'], 'a_db')384 @raises(Exception)385 def test_get_list_database_fails(self):386 with _mocked_session('get', 401):387 cli = InfluxDBClient('host', 8086, 'username', 'password')388 cli.get_list_database()389 @raises(FutureWarning)390 def test_get_database_list_deprecated(self):391 data = [392 {"name": "a_db"}393 ]394 with _mocked_session('get', 200, data):395 cli = InfluxDBClient('host', 8086, 'username', 'password')396 self.assertEqual(len(cli.get_database_list()), 1)397 self.assertEqual(cli.get_database_list()[0]['name'], 'a_db')398 def test_delete_series(self):399 with _mocked_session('delete', 204):400 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')401 cli.delete_series('old_series')402 @raises(Exception)403 def test_delete_series_fails(self):404 with _mocked_session('delete', 401):405 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')406 cli.delete_series('old_series')407 def test_get_series_list(self):408 cli = InfluxDBClient(database='db')409 with requests_mock.Mocker() as m:410 example_response = \411 '[{"name":"list_series_result","columns":' \412 '["time","name"],"points":[[0,"foo"],[0,"bar"]]}]'413 m.register_uri(414 requests_mock.GET,415 "http://localhost:8086/db/db/series",416 text=example_response417 )418 self.assertListEqual(419 cli.get_list_series(),420 ['foo', 'bar']421 )422 def test_get_continuous_queries(self):423 cli = InfluxDBClient(database='db')424 with requests_mock.Mocker() as m:425 # Tip: put this in a json linter!426 example_response = '[ { "name": "continuous queries", "columns"' \427 ': [ "time", "id", "query" ], "points": [ [ ' \428 '0, 1, "select foo(bar,95) from \\"foo_bar' \429 's\\" group by time(5m) into response_times.' \430 'percentiles.5m.95" ], [ 0, 2, "select perce' \431 'ntile(value,95) from \\"response_times\\" g' \432 'roup by time(5m) into response_times.percen' \433 'tiles.5m.95" ] ] } ]'434 m.register_uri(435 requests_mock.GET,436 "http://localhost:8086/db/db/series",437 text=example_response438 )439 self.assertListEqual(440 cli.get_list_continuous_queries(),441 [442 'select foo(bar,95) from "foo_bars" group '443 'by time(5m) into response_times.percentiles.5m.95',444 'select percentile(value,95) from "response_times" group '445 'by time(5m) into response_times.percentiles.5m.95'446 ]447 )448 def test_get_list_cluster_admins(self):449 pass450 def test_add_cluster_admin(self):451 with requests_mock.Mocker() as m:452 m.register_uri(453 requests_mock.POST,454 "http://localhost:8086/cluster_admins"455 )456 cli = InfluxDBClient(database='db')457 cli.add_cluster_admin(458 new_username='paul',459 new_password='laup'460 )461 self.assertDictEqual(462 json.loads(m.last_request.body),463 {464 'name': 'paul',465 'password': 'laup'466 }467 )468 def test_update_cluster_admin_password(self):469 with requests_mock.Mocker() as m:470 m.register_uri(471 requests_mock.POST,472 "http://localhost:8086/cluster_admins/paul"473 )474 cli = InfluxDBClient(database='db')475 cli.update_cluster_admin_password(476 username='paul',477 new_password='laup'478 )479 self.assertDictEqual(480 json.loads(m.last_request.body),481 {'password': 'laup'}482 )483 def test_delete_cluster_admin(self):484 with requests_mock.Mocker() as m:485 m.register_uri(486 requests_mock.DELETE,487 "http://localhost:8086/cluster_admins/paul",488 status_code=200,489 )490 cli = InfluxDBClient(database='db')491 cli.delete_cluster_admin(username='paul')492 self.assertIsNone(m.last_request.body)493 def test_set_database_admin(self):494 pass495 def test_unset_database_admin(self):496 pass497 def test_alter_database_admin(self):498 with requests_mock.Mocker() as m:499 m.register_uri(500 requests_mock.POST,501 "http://localhost:8086/db/db/users/paul"502 )503 cli = InfluxDBClient(database='db')504 cli.alter_database_admin(505 username='paul',506 is_admin=False507 )508 self.assertDictEqual(509 json.loads(m.last_request.body),510 {511 'admin': False512 }513 )514 @raises(NotImplementedError)515 def test_get_list_database_admins(self):516 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')517 cli.get_list_database_admins()518 @raises(NotImplementedError)519 def test_add_database_admin(self):520 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')521 cli.add_database_admin('admin', 'admin_secret_password')522 @raises(NotImplementedError)523 def test_update_database_admin_password(self):524 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')525 cli.update_database_admin_password('admin', 'admin_secret_password')526 @raises(NotImplementedError)527 def test_delete_database_admin(self):528 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')529 cli.delete_database_admin('admin')530 def test_get_database_users(self):531 cli = InfluxDBClient('localhost', 8086, 'username', 'password', 'db')532 example_response = \533 '[{"name":"paul","isAdmin":false,"writeTo":".*","readFrom":".*"},'\534 '{"name":"bobby","isAdmin":false,"writeTo":".*","readFrom":".*"}]'535 with requests_mock.Mocker() as m:536 m.register_uri(537 requests_mock.GET,538 "http://localhost:8086/db/db/users",539 text=example_response540 )541 users = cli.get_database_users()542 self.assertEqual(json.loads(example_response), users)543 def test_add_database_user(self):544 with requests_mock.Mocker() as m:545 m.register_uri(546 requests_mock.POST,547 "http://localhost:8086/db/db/users"548 )549 cli = InfluxDBClient(database='db')550 cli.add_database_user(551 new_username='paul',552 new_password='laup',553 permissions=('.*', '.*')554 )555 self.assertDictEqual(556 json.loads(m.last_request.body),557 {558 'writeTo': '.*',559 'password': 'laup',560 'readFrom': '.*',561 'name': 'paul'562 }563 )564 def test_add_database_user_bad_permissions(self):565 cli = InfluxDBClient()566 with self.assertRaisesRegexp(567 Exception,568 "'permissions' must be \(readFrom, writeTo\) tuple"569 ):570 cli.add_database_user(571 new_password='paul',572 new_username='paul',573 permissions=('hello', 'hello', 'hello')574 )575 def test_alter_database_user_password(self):576 with requests_mock.Mocker() as m:577 m.register_uri(578 requests_mock.POST,579 "http://localhost:8086/db/db/users/paul"580 )581 cli = InfluxDBClient(database='db')582 cli.alter_database_user(583 username='paul',584 password='n3wp4ss!'585 )586 self.assertDictEqual(587 json.loads(m.last_request.body),588 {589 'password': 'n3wp4ss!'590 }591 )592 def test_alter_database_user_permissions(self):593 with requests_mock.Mocker() as m:594 m.register_uri(595 requests_mock.POST,596 "http://localhost:8086/db/db/users/paul"597 )598 cli = InfluxDBClient(database='db')599 cli.alter_database_user(600 username='paul',601 permissions=('^$', '.*')602 )603 self.assertDictEqual(604 json.loads(m.last_request.body),605 {606 'readFrom': '^$',607 'writeTo': '.*'608 }609 )610 def test_alter_database_user_password_and_permissions(self):611 with requests_mock.Mocker() as m:612 m.register_uri(613 requests_mock.POST,614 "http://localhost:8086/db/db/users/paul"615 )616 cli = InfluxDBClient(database='db')617 cli.alter_database_user(618 username='paul',619 password='n3wp4ss!',620 permissions=('^$', '.*')621 )622 self.assertDictEqual(623 json.loads(m.last_request.body),624 {625 'password': 'n3wp4ss!',626 'readFrom': '^$',627 'writeTo': '.*'628 }629 )630 def test_update_database_user_password_current_user(self):631 cli = InfluxDBClient(632 username='root',633 password='hello',634 database='database'635 )636 with requests_mock.Mocker() as m:637 m.register_uri(638 requests_mock.POST,639 "http://localhost:8086/db/database/users/root"640 )641 cli.update_database_user_password(642 username='root',643 new_password='bye'644 )645 self.assertEqual(cli._password, 'bye')646 def test_delete_database_user(self):647 with requests_mock.Mocker() as m:648 m.register_uri(649 requests_mock.DELETE,650 "http://localhost:8086/db/db/users/paul"651 )652 cli = InfluxDBClient(database='db')653 cli.delete_database_user(username='paul')654 self.assertIsNone(m.last_request.body)655 @raises(NotImplementedError)656 def test_update_permission(self):657 cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')658 cli.update_permission('admin', [])659 @mock.patch('requests.Session.request')660 def test_request_retry(self, mock_request):661 """Tests that two connection errors will be handled"""662 class CustomMock(object):663 i = 0664 def connection_error(self, *args, **kwargs):665 self.i += 1666 if self.i < 3:667 raise requests.exceptions.ConnectionError668 else:669 r = requests.Response()670 r.status_code = 200671 return r672 mock_request.side_effect = CustomMock().connection_error673 cli = InfluxDBClient(database='db')674 cli.write_points(675 self.dummy_points676 )677 @mock.patch('requests.Session.request')678 def test_request_retry_raises(self, mock_request):679 """Tests that three connection errors will not be handled"""680 class CustomMock(object):681 i = 0682 def connection_error(self, *args, **kwargs):683 self.i += 1684 if self.i < 4:685 raise requests.exceptions.ConnectionError686 else:687 r = requests.Response()688 r.status_code = 200689 return r690 mock_request.side_effect = CustomMock().connection_error691 cli = InfluxDBClient(database='db')692 with self.assertRaises(requests.exceptions.ConnectionError):...

Full Screen

Full Screen

cli.py

Source:cli.py Github

copy

Full Screen

1"""2Filters that accept a `CommandLineInterface` as argument.3"""4from __future__ import unicode_literals5from .base import Filter6from prompt_toolkit.enums import EditingMode7from prompt_toolkit.key_binding.vi_state import InputMode as ViInputMode8from prompt_toolkit.cache import memoized9__all__ = (10 'HasArg',11 'HasCompletions',12 'HasFocus',13 'InFocusStack',14 'HasSearch',15 'HasSelection',16 'HasValidationError',17 'IsAborting',18 'IsDone',19 'IsMultiline',20 'IsReadOnly',21 'IsReturning',22 'RendererHeightIsKnown',23 'InEditingMode',24 # Vi modes.25 'ViMode',26 'ViNavigationMode',27 'ViInsertMode',28 'ViInsertMultipleMode',29 'ViReplaceMode',30 'ViSelectionMode',31 'ViWaitingForTextObjectMode',32 'ViDigraphMode',33 # Emacs modes.34 'EmacsMode',35 'EmacsInsertMode',36 'EmacsSelectionMode',37)38@memoized()39class HasFocus(Filter):40 """41 Enable when this buffer has the focus.42 """43 def __init__(self, buffer_name):44 self._buffer_name = buffer_name45 @property46 def buffer_name(self):47 " The given buffer name. (Read-only) "48 return self._buffer_name49 def __call__(self, cli):50 return cli.current_buffer_name == self.buffer_name51 def __repr__(self):52 return 'HasFocus(%r)' % self.buffer_name53@memoized()54class InFocusStack(Filter):55 """56 Enable when this buffer appears on the focus stack.57 """58 def __init__(self, buffer_name):59 self._buffer_name = buffer_name60 @property61 def buffer_name(self):62 " The given buffer name. (Read-only) "63 return self._buffer_name64 def __call__(self, cli):65 return self.buffer_name in cli.buffers.focus_stack66 def __repr__(self):67 return 'InFocusStack(%r)' % self.buffer_name68@memoized()69class HasSelection(Filter):70 """71 Enable when the current buffer has a selection.72 """73 def __call__(self, cli):74 return bool(cli.current_buffer.selection_state)75 def __repr__(self):76 return 'HasSelection()'77@memoized()78class HasCompletions(Filter):79 """80 Enable when the current buffer has completions.81 """82 def __call__(self, cli):83 return cli.current_buffer.complete_state is not None84 def __repr__(self):85 return 'HasCompletions()'86@memoized()87class IsMultiline(Filter):88 """89 Enable in multiline mode.90 """91 def __call__(self, cli):92 return cli.current_buffer.is_multiline()93 def __repr__(self):94 return 'IsMultiline()'95@memoized()96class IsReadOnly(Filter):97 """98 True when the current buffer is read only.99 """100 def __call__(self, cli):101 return cli.current_buffer.read_only()102 def __repr__(self):103 return 'IsReadOnly()'104@memoized()105class HasValidationError(Filter):106 """107 Current buffer has validation error.108 """109 def __call__(self, cli):110 return cli.current_buffer.validation_error is not None111 def __repr__(self):112 return 'HasValidationError()'113@memoized()114class HasArg(Filter):115 """116 Enable when the input processor has an 'arg'.117 """118 def __call__(self, cli):119 return cli.input_processor.arg is not None120 def __repr__(self):121 return 'HasArg()'122@memoized()123class HasSearch(Filter):124 """125 Incremental search is active.126 """127 def __call__(self, cli):128 return cli.is_searching129 def __repr__(self):130 return 'HasSearch()'131@memoized()132class IsReturning(Filter):133 """134 When a return value has been set.135 """136 def __call__(self, cli):137 return cli.is_returning138 def __repr__(self):139 return 'IsReturning()'140@memoized()141class IsAborting(Filter):142 """143 True when aborting. (E.g. Control-C pressed.)144 """145 def __call__(self, cli):146 return cli.is_aborting147 def __repr__(self):148 return 'IsAborting()'149@memoized()150class IsExiting(Filter):151 """152 True when exiting. (E.g. Control-D pressed.)153 """154 def __call__(self, cli):155 return cli.is_exiting156 def __repr__(self):157 return 'IsExiting()'158@memoized()159class IsDone(Filter):160 """161 True when the CLI is returning, aborting or exiting.162 """163 def __call__(self, cli):164 return cli.is_done165 def __repr__(self):166 return 'IsDone()'167@memoized()168class RendererHeightIsKnown(Filter):169 """170 Only True when the renderer knows it's real height.171 (On VT100 terminals, we have to wait for a CPR response, before we can be172 sure of the available height between the cursor position and the bottom of173 the terminal. And usually it's nicer to wait with drawing bottom toolbars174 until we receive the height, in order to avoid flickering -- first drawing175 somewhere in the middle, and then again at the bottom.)176 """177 def __call__(self, cli):178 return cli.renderer.height_is_known179 def __repr__(self):180 return 'RendererHeightIsKnown()'181@memoized()182class InEditingMode(Filter):183 """184 Check whether a given editing mode is active. (Vi or Emacs.)185 """186 def __init__(self, editing_mode):187 self._editing_mode = editing_mode188 @property189 def editing_mode(self):190 " The given editing mode. (Read-only) "191 return self._editing_mode192 def __call__(self, cli):193 return cli.editing_mode == self.editing_mode194 def __repr__(self):195 return 'InEditingMode(%r)' % (self.editing_mode, )196@memoized()197class ViMode(Filter):198 def __call__(self, cli):199 return cli.editing_mode == EditingMode.VI200 def __repr__(self):201 return 'ViMode()'202@memoized()203class ViNavigationMode(Filter):204 """205 Active when the set for Vi navigation key bindings are active.206 """207 def __call__(self, cli):208 if (cli.editing_mode != EditingMode.VI209 or cli.vi_state.operator_func210 or cli.vi_state.waiting_for_digraph211 or cli.current_buffer.selection_state):212 return False213 return (cli.vi_state.input_mode == ViInputMode.NAVIGATION or214 cli.current_buffer.read_only())215 def __repr__(self):216 return 'ViNavigationMode()'217@memoized()218class ViInsertMode(Filter):219 def __call__(self, cli):220 if (cli.editing_mode != EditingMode.VI221 or cli.vi_state.operator_func222 or cli.vi_state.waiting_for_digraph223 or cli.current_buffer.selection_state224 or cli.current_buffer.read_only()):225 return False226 return cli.vi_state.input_mode == ViInputMode.INSERT227 def __repr__(self):228 return 'ViInputMode()'229@memoized()230class ViInsertMultipleMode(Filter):231 def __call__(self, cli):232 if (cli.editing_mode != EditingMode.VI233 or cli.vi_state.operator_func234 or cli.vi_state.waiting_for_digraph235 or cli.current_buffer.selection_state236 or cli.current_buffer.read_only()):237 return False238 return cli.vi_state.input_mode == ViInputMode.INSERT_MULTIPLE239 def __repr__(self):240 return 'ViInsertMultipleMode()'241@memoized()242class ViReplaceMode(Filter):243 def __call__(self, cli):244 if (cli.editing_mode != EditingMode.VI245 or cli.vi_state.operator_func246 or cli.vi_state.waiting_for_digraph247 or cli.current_buffer.selection_state248 or cli.current_buffer.read_only()):249 return False250 return cli.vi_state.input_mode == ViInputMode.REPLACE251 def __repr__(self):252 return 'ViReplaceMode()'253@memoized()254class ViSelectionMode(Filter):255 def __call__(self, cli):256 if cli.editing_mode != EditingMode.VI:257 return False258 return bool(cli.current_buffer.selection_state)259 def __repr__(self):260 return 'ViSelectionMode()'261@memoized()262class ViWaitingForTextObjectMode(Filter):263 def __call__(self, cli):264 if cli.editing_mode != EditingMode.VI:265 return False266 return cli.vi_state.operator_func is not None267 def __repr__(self):268 return 'ViWaitingForTextObjectMode()'269@memoized()270class ViDigraphMode(Filter):271 def __call__(self, cli):272 if cli.editing_mode != EditingMode.VI:273 return False274 return cli.vi_state.waiting_for_digraph275 def __repr__(self):276 return 'ViDigraphMode()'277@memoized()278class EmacsMode(Filter):279 " When the Emacs bindings are active. "280 def __call__(self, cli):281 return cli.editing_mode == EditingMode.EMACS282 def __repr__(self):283 return 'EmacsMode()'284@memoized()285class EmacsInsertMode(Filter):286 def __call__(self, cli):287 if (cli.editing_mode != EditingMode.EMACS288 or cli.current_buffer.selection_state289 or cli.current_buffer.read_only()):290 return False291 return True292 def __repr__(self):293 return 'EmacsInsertMode()'294@memoized()295class EmacsSelectionMode(Filter):296 def __call__(self, cli):297 return (cli.editing_mode == EditingMode.EMACS298 and cli.current_buffer.selection_state)299 def __repr__(self):...

Full Screen

Full Screen

test_task_1_cli_commands_1.py

Source:test_task_1_cli_commands_1.py Github

copy

Full Screen

1import random2from task_1_chess_board.task_1 import ChessBoard3def test_chess_board_cli_valid_parameters(cli_client):4 '''5 Test task_1_chess_board/task_1.py valid parameters6 '''7 chess_board_height = random.randint(1, 25)8 chess_board_width = random.randint(1, 25)9 #10 command = (11 'python',12 'task_1_chess_board/task_1.py',13 '-ht',14 f'{chess_board_height}',15 '-wt',16 f'{chess_board_width}'17 )18 cli_out, cli_error, exitcode = cli_client(command)19 cli_out = cli_out.decode()20 #21 a_chess_board = ChessBoard(chess_board_height, chess_board_width)22 a_chess_board = f'{str(a_chess_board)}\n'23 #24 assert 0 == exitcode25 assert b'' == cli_error26 assert a_chess_board == cli_out27def test_chess_board_cli_no_parameters(cli_client):28 '''29 Test task_1_chess_board/task_1.py no parameters30 '''31 command = ('python', 'task_1_chess_board/task_1.py')32 cli_out, cli_error, exitcode = cli_client(command)33 HELP_MSG = (34 b"*** Welcome to the ChessBoard generator ***\n"35 b"You can generate a chess board with custom height "36 b"and width parameters \n"37 b"height and width parameters that must be a positive integers \n"38 b"Example: python task_1.py --height 8 --width 8 \n"39 b"Example: python task_1.py -ht 8 -wt 8 \n\n"40 )41 assert 0 == exitcode42 assert b'' == cli_error43 assert HELP_MSG == cli_out44def test_chess_board_cli_only_height_param(cli_client):45 '''46 Test task_1_chess_board/task_1.py only -ht param47 '''48 chess_board_height = random.randint(1, 25)49 command = (50 'python',51 'task_1_chess_board/task_1.py',52 '-ht',53 f'{chess_board_height}',54 )55 cli_out, cli_error, exitcode = cli_client(command)56 cli_out = cli_out57 HELP_MSG = (58 b"*** Welcome to the ChessBoard generator ***\n"59 b"You can generate a chess board with custom height "60 b"and width parameters \n"61 b"height and width parameters that must be a positive integers \n"62 b"Example: python task_1.py --height 8 --width 8 \n"63 b"Example: python task_1.py -ht 8 -wt 8 \n\n"64 )65 assert 0 == exitcode66 assert b'' == cli_error67 assert HELP_MSG == cli_out68def test_chess_board_cli_only_width_param(cli_client):69 '''70 Test task_1_chess_board/task_1.py only -wt param71 '''72 chess_board_width = random.randint(1, 25)73 command = (74 'python',75 'task_1_chess_board/task_1.py',76 '-wt',77 f'{chess_board_width}',78 )79 cli_out, cli_error, exitcode = cli_client(command)80 cli_out = cli_out81 HELP_MSG = (82 b"*** Welcome to the ChessBoard generator ***\n"83 b"You can generate a chess board with custom height "84 b"and width parameters \n"85 b"height and width parameters that must be a positive integers \n"86 b"Example: python task_1.py --height 8 --width 8 \n"87 b"Example: python task_1.py -ht 8 -wt 8 \n\n"88 )89 assert 0 == exitcode90 assert b'' == cli_error91 assert HELP_MSG == cli_out92def test_chess_board_cli_height_param_not_int(cli_client):93 '''94 Test task_1_chess_board/task_1.py -ht A -wt INT95 '''96 chess_board_height = random.randint(1, 25)97 chess_board_width = random.randint(1, 25)98 #99 command = (100 'python',101 'task_1_chess_board/task_1.py',102 '-ht',103 'A',104 '-wt',105 f'{chess_board_width}'106 )107 cli_out, cli_error, exitcode = cli_client(command)108 cli_out = cli_out.decode()109 #110 a_chess_board = ChessBoard(chess_board_height, chess_board_width)111 a_chess_board = f'{str(a_chess_board)}\n'112 #113 ERROR_MSG = (114 b'usage: task_1.py [-h] [-ht HEIGHT] [-wt WIDTH]\n'115 b'task_1.py: error: argument -ht/--height: A is not an integer\n'116 )117 #118 assert 2 == exitcode119 assert ERROR_MSG == cli_error120 assert '' == cli_out121def test_chess_board_cli_width_param_not_int(cli_client):122 '''123 Test task_1_chess_board/task_1.py -ht INT -wt A124 '''125 chess_board_height = random.randint(1, 25)126 chess_board_width = random.randint(1, 25)127 #128 command = (129 'python',130 'task_1_chess_board/task_1.py',131 '-ht',132 f'{chess_board_height}',133 '-wt',134 'B'135 )136 cli_out, cli_error, exitcode = cli_client(command)137 cli_out = cli_out.decode()138 #139 a_chess_board = ChessBoard(chess_board_height, chess_board_width)140 a_chess_board = f'{str(a_chess_board)}\n'141 #142 ERROR_MSG = (143 b'usage: task_1.py [-h] [-ht HEIGHT] [-wt WIDTH]\n'144 b'task_1.py: error: argument -wt/--width: B is not an integer\n'145 )146 #147 assert 2 == exitcode148 assert ERROR_MSG == cli_error149 assert '' == cli_out150def test_chess_board_cli_width_param_negative_int(cli_client):151 '''152 Test task_1_chess_board/task_1.py -ht INT -wt -10153 '''154 chess_board_height = random.randint(1, 25)155 chess_board_width = random.randint(-111, -1)156 #157 command = (158 'python',159 'task_1_chess_board/task_1.py',160 '-ht',161 f'{chess_board_height}',162 '-wt',163 f'{chess_board_width}'164 )165 cli_out, cli_error, exitcode = cli_client(command)166 cli_out = cli_out.decode()167 cli_error = cli_error.decode()168 #169 a_chess_board = ChessBoard(chess_board_height, chess_board_width)170 a_chess_board = f'{str(a_chess_board)}\n'171 #172 ERROR_MSG = (173 'usage: task_1.py [-h] [-ht HEIGHT] [-wt WIDTH]\n'174 'task_1.py: error: argument '175 f'-wt/--width: {chess_board_width} is not a positive integer\n'176 )177 #178 assert 2 == exitcode179 assert ERROR_MSG == cli_error...

Full Screen

Full Screen

test_cli.py

Source:test_cli.py Github

copy

Full Screen

...34 cli.valid_char("")35 with pytest.raises(argparse.ArgumentTypeError):36 cli.valid_char("12")37def test_cli_no_args_shows_usage(capfd):38 cli.cli(w("pricehist"))39 out, err = capfd.readouterr()40 assert "usage: pricehist" in out41 assert "optional arguments:" in out or "options:" in out42 assert "commands:" in out43def test_cli_help_shows_usage_and_exits(capfd):44 with pytest.raises(SystemExit) as e:45 cli.cli(w("pricehist -h"))46 assert e.value.code == 047 out, err = capfd.readouterr()48 assert "usage: pricehist" in out49 assert "optional arguments:" in out or "options:" in out50 assert "commands:" in out51def test_cli_verbose(capfd, mocker):52 cli.cli(w("pricehist --verbose"))53 out, err = capfd.readouterr()54 assert "Ended pricehist run at" in err55def test_cli_version(capfd):56 cli.cli(w("pricehist --version"))57 out, err = capfd.readouterr()58 assert f"pricehist {__version__}\n" == out59def test_cli_sources(capfd):60 cli.cli(w("pricehist sources"))61 out, err = capfd.readouterr()62 for source_id in sources.by_id.keys():63 assert source_id in out64def test_cli_source(capfd):65 expected = sources.by_id["ecb"].format_info() + "\n"66 cli.cli(w("pricehist source ecb"))67 out, err = capfd.readouterr()68 assert out == expected69def test_cli_source_symbols(capfd, mocker):70 sources.by_id["ecb"].symbols = mocker.MagicMock(71 return_value=[("EUR/AUD", "Euro against Australian Dollar")]72 )73 cli.cli(w("pricehist source ecb --symbols"))74 out, err = capfd.readouterr()75 assert out == "EUR/AUD Euro against Australian Dollar\n"76def test_cli_source_search(capfd, mocker):77 sources.by_id["alphavantage"].search = mocker.MagicMock(78 return_value=[("TSLA", "Tesla Inc, Equity, United States, USD")]79 )80 cli.cli(w("pricehist source alphavantage --search TSLA"))81 out, err = capfd.readouterr()82 assert out == "TSLA Tesla Inc, Equity, United States, USD\n"83def test_cli_source_fetch(capfd, mocker):84 formatted_result = "P 2021-01-01 00:00:00 BTC 24139.4648 EUR\n"85 cli.fetch = mocker.MagicMock(return_value=formatted_result)86 argv = w("pricehist fetch coindesk BTC/EUR -s 2021-01-01 -e 2021-01-01 -o ledger")87 cli.cli(argv)88 out, err = capfd.readouterr()89 assert out == formatted_result90def test_cli_source_fetch_invalid_start(capfd, mocker):91 argv = w("pricehist fetch coindesk BTC/EUR -s 2021-01-01 -e 2020-12-01")92 with pytest.raises(SystemExit) as e:93 cli.cli(argv)94 assert e.value.code != 095 out, err = capfd.readouterr()96 assert "end date '2020-12-01' preceeds the start date" in err97def test_cli_source_fetch_invalid_type(capfd, mocker):98 argv = w("pricehist fetch coindesk BTC/EUR -t notype")99 with pytest.raises(SystemExit) as e:100 cli.cli(argv)101 assert e.value.code != 0102 out, err = capfd.readouterr()103 assert "price type 'notype' is not recognized" in err104def test_cli_source_fetch_sets_source_defaults(mocker):105 cli.fetch = mocker.MagicMock(return_value="")106 cli.cli(w("pricehist fetch coindesk BTC/EUR"))107 captured_series = cli.fetch.call_args.args[0]108 assert captured_series.start == sources.by_id["coindesk"].start()109 assert captured_series.type == sources.by_id["coindesk"].types()[0]110def test_cli_source_fetch_normalizes_symbols(mocker):111 cli.fetch = mocker.MagicMock(return_value="")112 cli.cli(w("pricehist fetch coindesk btc/eur"))113 captured_series = cli.fetch.call_args.args[0]114 assert captured_series.base == "BTC"115 assert captured_series.quote == "EUR"116def test_cli_source_fetch_handles_brokenpipeerror(caplog, mocker):117 cli.fetch = mocker.MagicMock(side_effect=BrokenPipeError())118 cli.cli(w("pricehist fetch coindesk BTC/EUR --verbose"))119 assert any(120 [121 "DEBUG" == r.levelname and "output pipe was closed early" in r.message122 for r in caplog.records123 ]...

Full Screen

Full Screen

bitcoin_cli.py

Source:bitcoin_cli.py Github

copy

Full Screen

...20 rpc_response = self.nodes[0].getblockchaininfo()21 assert_equal(cli_response, rpc_response)22 user, password = get_auth_cookie(self.nodes[0].datadir)23 self.log.info("Test -stdinrpcpass option")24 assert_equal(0, self.nodes[0].cli('-rpcuser=%s' % user, '-stdinrpcpass', input=password).getblockcount())25 assert_raises_process_error(1, "incorrect rpcuser or rpcpassword", self.nodes[0].cli('-rpcuser=%s' % user, '-stdinrpcpass', input="foo").echo)26 self.log.info("Test -stdin and -stdinrpcpass")27 assert_equal(["foo", "bar"], self.nodes[0].cli('-rpcuser=%s' % user, '-stdin', '-stdinrpcpass', input=password + "\nfoo\nbar").echo())28 assert_raises_process_error(1, "incorrect rpcuser or rpcpassword", self.nodes[0].cli('-rpcuser=%s' % user, '-stdin', '-stdinrpcpass', input="foo").echo)29 self.log.info("Compare responses from `bitcoin-cli -getinfo` and the RPCs data is retrieved from.")30 cli_get_info = self.nodes[0].cli('-getinfo').help()31 wallet_info = self.nodes[0].getwalletinfo()32 network_info = self.nodes[0].getnetworkinfo()33 blockchain_info = self.nodes[0].getblockchaininfo()34 assert_equal(cli_get_info['version'], network_info['version'])35 assert_equal(cli_get_info['protocolversion'], network_info['protocolversion'])36 assert_equal(cli_get_info['walletversion'], wallet_info['walletversion'])37 assert_equal(cli_get_info['balance'], wallet_info['balance'])38 assert_equal(cli_get_info['blocks'], blockchain_info['blocks'])39 assert_equal(cli_get_info['timeoffset'], network_info['timeoffset'])40 assert_equal(cli_get_info['connections'], network_info['connections'])41 assert_equal(cli_get_info['proxy'], network_info['networks'][0]['proxy'])42 assert_equal(cli_get_info['difficulty'], blockchain_info['difficulty'])43 assert_equal(cli_get_info['testnet'], blockchain_info['chain'] == "test")44 assert_equal(cli_get_info['balance'], wallet_info['balance'])...

Full Screen

Full Screen

c2json.py

Source:c2json.py Github

copy

Full Screen

1"""Generate a keymap.json from a keymap.c file.2"""3import json4from argcomplete.completers import FilesCompleter5from milc import cli6import qmk.keymap7import qmk.path8from qmk.json_encoders import InfoJSONEncoder9from qmk.keyboard import keyboard_completer, keyboard_folder10from qmk.errors import CppError11@cli.argument('--no-cpp', arg_only=True, action='store_false', help='Do not use \'cpp\' on keymap.c')12@cli.argument('-o', '--output', arg_only=True, type=qmk.path.normpath, help='File to write to')13@cli.argument('-q', '--quiet', arg_only=True, action='store_true', help="Quiet mode, only output error messages")14@cli.argument('-kb', '--keyboard', arg_only=True, type=keyboard_folder, completer=keyboard_completer, required=True, help='The keyboard\'s name')15@cli.argument('-km', '--keymap', arg_only=True, required=True, help='The keymap\'s name')16@cli.argument('filename', arg_only=True, completer=FilesCompleter('.c'), help='keymap.c file')17@cli.subcommand('Creates a keymap.json from a keymap.c file.')18def c2json(cli):19 """Generate a keymap.json from a keymap.c file.20 This command uses the `qmk.keymap` module to generate a keymap.json from a keymap.c file. The generated keymap is written to stdout, or to a file if -o is provided.21 """22 if cli.args.filename != '-':23 cli.args.filename = qmk.path.normpath(cli.args.filename)24 # Error checking25 if not cli.args.filename.exists():26 cli.log.error('C file does not exist!')27 cli.print_usage()28 return False29 # Environment processing30 if cli.args.output == ('-'):31 cli.args.output = None32 # Parse the keymap.c33 try:34 keymap_json = qmk.keymap.c2json(cli.args.keyboard, cli.args.keymap, cli.args.filename, use_cpp=cli.args.no_cpp)35 except CppError as e:36 if cli.config.general.verbose:37 cli.log.debug('The C pre-processor ran into a fatal error: %s', e)38 cli.log.error('Something went wrong. Try to use --no-cpp.\nUse the CLI in verbose mode to find out more.')39 return False40 # Generate the keymap.json41 try:42 keymap_json = qmk.keymap.generate_json(keymap_json['keymap'], keymap_json['keyboard'], keymap_json['layout'], keymap_json['layers'])43 except KeyError:44 cli.log.error('Something went wrong. Try to use --no-cpp.')45 return False46 if cli.args.output:47 cli.args.output.parent.mkdir(parents=True, exist_ok=True)48 if cli.args.output.exists():49 cli.args.output.replace(cli.args.output.parent / (cli.args.output.name + '.bak'))50 cli.args.output.write_text(json.dumps(keymap_json, cls=InfoJSONEncoder))51 if not cli.args.quiet:52 cli.log.info('Wrote keymap to %s.', cli.args.output)53 else:...

Full Screen

Full Screen

eth_wallet_cli.py

Source:eth_wallet_cli.py Github

copy

Full Screen

...26from eth_wallet.cli.network import (27 network,28)29@click.group()30def eth_wallet_cli():31 pass32eth_wallet_cli.add_command(new_wallet)33eth_wallet_cli.add_command(get_wallet)34eth_wallet_cli.add_command(reveal_seed)35eth_wallet_cli.add_command(get_balance)36eth_wallet_cli.add_command(send_transaction)37eth_wallet_cli.add_command(restore_wallet)38eth_wallet_cli.add_command(add_token)39eth_wallet_cli.add_command(list_tokens)40eth_wallet_cli.add_command(network)41if __name__ == "__main__":...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1const { cli } = require('synthetixio-synpress');2const { assert } = require('chai');3describe('test2', () => {4 before(async () => {5 cli({6 });7 });8 it('should run test1.js', async () => {9 assert.equal(true, true);10 });11});12const { cli } = require('synthetixio-synpress');13const { assert } = require('chai');14describe('test1', () => {15 before(async () => {16 cli({17 });18 });19 it('should run test1.js', async () => {20 assert.equal(true, true);21 });22});23[MIT](LICENSE)

Full Screen

Using AI Code Generation

copy

Full Screen

1const { cli } = require('@synthetixio/synpress');2const { ethers } = require('hardhat');3const { assert } = require('chai');4const { toBN } = ethers.utils;5const { toBytes32 } = require('../..');6describe('Synthetix', () => {7 let owner, account1, account2;8 before(async () => {9 [owner, account1, account2] = await cli.accountsOf('owner', 'account1', 'account2');10 });11 it('should transfer SNX from the owner to account1', async () => {12 const amountToTransfer = toBN('1000000000000000000');13 await cli.synthetix().transfer(account1.address, amountToTransfer);14 assert.bnEqual(await cli.synthetix().balanceOf(owner.address), toBN('0'));15 assert.bnEqual(await cli.synthetix().balanceOf(account1.address), amountToTransfer);16 });17 it('should transfer SNX from account1 to account2', async () => {18 const amountToTransfer = toBN('1000000000000000000');19 await cli.synthetix().transfer(account2.address, amountToTransfer, { from: account1.address });20 assert.bnEqual(await cli.synthetix().balanceOf(account1.address), toBN('0'));21 assert.bnEqual(await cli.synthetix().balanceOf(account2.address), amountToTransfer);22 });23 it('should issue sUSD on behalf of account1', async () => {24 const amountToIssue = toBN('1000000000000000000');25 await cli.synthetix().issueSynths(amountToIssue, { from: account1.address });26 assert.bnEqual(await cli.synthetix().balanceOf(account1.address), amountToIssue);27 assert.bnEqual(await cli.synthsUSD().balanceOf(account1.address), amountToIssue);28 assert.bnEqual(await cli.synthetix().totalSupply(), amountToIssue);29 });30 it('should exchange sUSD to sETH on behalf of account1', async () => {31 const amountToExchange = toBN('1000000000000000000');32 const sUSDContract = cli.synthsUSD();33 const sETHContract = cli.synths(toBytes32('sETH'));

Full Screen

Using AI Code Generation

copy

Full Screen

1const { deployContract, Synpress } = require('synpress');2const { getContract } = require('./utils/contracts');3const synpress = new Synpress();4const main = async () => {5 const Synthetix = getContract('Synthetix');6 await synpress.deploy(Synthetix);7 const snx = await synpress.deployed(Synthetix);8 console.log('Synthetix address is', snx.address);9 console.log('Synthetix owner is', await snx.owner());10};11main();12const { getContract } = require('synpress');13const main = async () => {14 const Synthetix = getContract('Synthetix');15 const snx = await Synthetix.deployed();16 console.log('Synthetix address is', snx.address);17 console.log('Synthetix owner is', await snx.owner());18};19main();

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run synthetixio-synpress automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful