Best Python code snippet using gabbi_python
tasks.py
Source:tasks.py  
...122    _print_output(COLOR_RED_BOLD, ''.join(('ERROR: ', message, '\n')), *args, **kwargs)123def _error_output_exit(message, *args, **kwargs):124    _error_output(message, *args, **kwargs)125    sys.exit(1)126def _verbose_output(verbose, message, *args, **kwargs):127    if verbose:128        _print_output(COLOR_GRAY_LIGHT, ''.join(('DEBUG: ', message, '\n')), *args, **kwargs)129def _case_sensitive_regular_file_exists(filename):130    if not os.path.isfile(filename):131        # Short circuit132        return False133    directory, filename = os.path.split(filename)134    return filename in os.listdir(directory)135def _get_root_directory():136    root_directory = subprocess.check_output(137        ['git', 'rev-parse', '--show-toplevel'],138        stderr=sys.stderr,139    ).decode('utf8').strip()140    if not root_directory:141        _error_output_exit('Failed to find Git root directory.')142    return root_directory143def _setup_task(no_stash, verbose):144    if not no_stash:145        global __POST_APPLY146        # stash changes before we execute task147        _verbose_output(verbose, 'Stashing changes...')148        result = subprocess.check_output(149            ['git', 'stash'],150            stderr=sys.stderr,151        ).decode('utf8')152        if result.startswith('Saved'):153            __POST_APPLY = True154        _verbose_output(verbose, 'Finished stashing changes.')155def _cleanup_task(verbose):156    if __POST_APPLY:157        _verbose_output(verbose, 'Un-stashing changes...')158        subprocess.check_output(159            ['git', 'stash', 'pop'],160            stderr=sys.stderr,161        )162        _verbose_output(verbose, 'Finished un-stashing changes.')163def _write_to_version_file(release_version, version_info, version_separator, verbose):164    _verbose_output(verbose, 'Writing version to {}...', VERSION_FILENAME)165    if not _case_sensitive_regular_file_exists(VERSION_FILENAME):166        raise ReleaseFailure(167            'Failed to find version file: {}. File names are case sensitive!'.format(VERSION_FILENAME),168        )169    if VERSION_FILE_IS_TXT:170        with codecs.open(VERSION_FILENAME, 'wb', encoding='utf8') as version_write:171            version_write.write(release_version)172    else:173        with codecs.open(VERSION_FILENAME, 'rb', encoding='utf8') as version_read:174            output = []175            version_info_written = False176            # We replace u' with ' in this, because Py2 projects should use unicode_literals in their version file177            version_info = VERSION_INFO_VARIABLE_TEMPLATE.format(tuple(version_info)).replace(", u'", ", '")178            for line in version_read:179                if line.startswith('__version_info__'):180                    output.append(version_info)181                    version_info_written = True182                elif line.startswith('__version__'):183                    if not version_info_written:184                        output.append(version_info)185                    output.append(VERSION_VARIABLE_TEMPLATE.format(version_separator))186                else:187                    output.append(line.rstrip())188        with codecs.open(VERSION_FILENAME, 'wb', encoding='utf8') as version_write:189            for line in output:190                version_write.write(line + '\n')191    _verbose_output(verbose, 'Finished writing to {}.version.', MODULE_NAME)192def _gather_commit_messages(verbose):193    _verbose_output(verbose, 'Gathering commit messages since last release commit.')194    command = [195        'git',196        'log',197        '-1',198        '--format=%H',199        '--grep={}'.format(RELEASE_MESSAGE_TEMPLATE.replace(' {}', '').replace('"', '\\"'))200    ]201    _verbose_output(verbose, 'Running command: "{}"', '" "'.join(command))202    commit_hash = subprocess.check_output(command, stderr=sys.stderr).decode('utf8').strip()203    if not commit_hash:204        _verbose_output(verbose, 'No previous release commit was found. Not gathering messages.')205        return []206    command = [207        'git',208        'log',209        '--format=%s',210        '{}..HEAD'.format(commit_hash)211    ]212    _verbose_output(verbose, 'Running command: "{}"', '" "'.join(command))213    output = subprocess.check_output(command, stderr=sys.stderr).decode('utf8')214    messages = []215    for message in output.splitlines():216        if not message.strip().startswith('Merge pull request #'):217            messages.append('- {}'.format(message))218    _verbose_output(219        verbose,220        'Returning {number} commit messages gathered since last release commit:\n{messages}',221        number=len(messages),222        messages=messages,223    )224    return messages225def _prompt_for_changelog(verbose):226    built_up_changelog = []227    changelog_header = []228    changelog_message = []229    changelog_footer = []230    _verbose_output(verbose, 'Reading changelog file {} looking for built-up changes...', CHANGELOG_FILENAME)231    with codecs.open(CHANGELOG_FILENAME, 'rb', encoding='utf8') as changelog_read:232        previous_line = ''233        passed_header = passed_changelog = False234        for line_number, line in enumerate(changelog_read):235            if not passed_header:236                changelog_header.append(line)237                # .txt and .md changelog files start like this:238                #     Changelog239                #     =========240                # .rst changelog files start like this241                #     =========242                #     Changelog243                #     =========244                if line_number > 0 and RE_CHANGELOG_FILE_HEADER.search(line):245                    passed_header = True246                continue247            if not passed_changelog and RE_CHANGELOG_VERSION_HEADER.search(line):248                changelog_footer.append(previous_line)249                passed_changelog = True250            if passed_changelog:251                changelog_footer.append(line)252            else:253                if previous_line.strip():254                    built_up_changelog.append(previous_line)255                previous_line = line256    if len(built_up_changelog) > 0:257        _verbose_output(verbose, 'Read {} lines of built-up changelog text:', len(built_up_changelog))258        if verbose:259            _verbose_output(verbose, six.text_type(built_up_changelog))260        _standard_output('There are existing changelog details for this release. You can "edit" the changes, '261                         '"accept" them as-is, delete them and create a "new" changelog message, or "delete" '262                         'them and enter no changelog.')263        instruction = _prompt('How would you like to proceed? (EDIT/new/accept/delete/exit):').lower()264        if instruction in (INSTRUCTION_NEW, INSTRUCTION_DELETE):265            built_up_changelog = []266        if instruction == INSTRUCTION_ACCEPT:267            changelog_message = built_up_changelog268        if not instruction or instruction in (INSTRUCTION_EDIT, INSTRUCTION_NEW):269            instruction = INSTRUCTION_YES270    else:271        _verbose_output(verbose, 'No existing lines of built-up changelog text were read.')272        instruction = _prompt(273            'Would you like to enter changelog details for this release? (Y/n/exit):',274        ).lower() or INSTRUCTION_YES275    if instruction == INSTRUCTION_EXIT:276        raise ReleaseExit()277    if instruction == INSTRUCTION_YES:278        gather = _prompt(279            'Would you like to{also} gather commit messages from recent commits and add them to the '280            'changelog? ({y_n}/exit):',281            **({'also': ' also', 'y_n': 'y/N'} if built_up_changelog else {'also': '', 'y_n': 'Y/n'})282        ).lower() or (INSTRUCTION_NO if built_up_changelog else INSTRUCTION_YES)283        commit_messages = []284        if gather == INSTRUCTION_YES:285            commit_messages = _gather_commit_messages(verbose)286        elif gather == INSTRUCTION_EXIT:287            raise ReleaseExit()288        tf_o = tempfile.NamedTemporaryFile(mode='wb')289        codec = codecs.lookup('utf8')290        with codecs.StreamReaderWriter(tf_o, codec.streamreader, codec.streamwriter, 'strict') as tf:291            _verbose_output(verbose, 'Opened temporary file {} for editing changelog.', tf.name)292            if commit_messages:293                tf.write('\n'.join(commit_messages) + '\n')294            if built_up_changelog:295                tf.writelines(built_up_changelog)296            tf.writelines([297                '\n',298                '# Enter your changelog message above this comment, then save and close editor when finished.\n',299                '# Any existing contents were pulled from changes to CHANGELOG.txt since the last release.\n',300                '# Leave it blank (delete all existing contents) to release with no changelog details.\n',301                '# All lines starting with "#" are comments and ignored.\n',302                '# As a best practice, if you are entering multiple items as a list, prefix each item with a "-".'303            ])304            tf.flush()305            _verbose_output(verbose, 'Wrote existing changelog contents and instructions to temporary file.')306            editor = os.environ.get('INVOKE_RELEASE_EDITOR', os.environ.get('EDITOR', 'vim'))307            _verbose_output(verbose, 'Opening editor {} to edit changelog.', editor)308            try:309                subprocess.check_call(310                    shlex.split(editor) + [tf.name],311                    stdout=sys.stdout,312                    stderr=sys.stderr,313                )314            except (subprocess.CalledProcessError, OSError) as e:315                args = {'editor': editor}316                if isinstance(e, OSError):317                    message = 'Failed to open changelog editor `{editor}` due to error: {error} (err {error_code}).'318                    args.update(error=e.strerror, error_code=e.errno)319                else:320                    message = 'Failed to open changelog editor `{editor}` due to return code: {return_code}.'321                    args.update(return_code=e.returncode)322                message += (323                    ' Try setting $INVOKE_RELEASE_EDITOR or $EDITOR in your shell profile to the full path to '324                    'Vim or another editor.'325                )326                raise ReleaseFailure(message.format(**args))327            _verbose_output(verbose, 'User has closed editor')328            with codecs.open(tf.name, 'rb', encoding='utf8') as read:329                first_line = True330                last_line_blank = False331                for line in read:332                    line_blank = not line.strip()333                    if (first_line or last_line_blank) and line_blank:334                        # Suppress leading blank lines and compress multiple blank lines into one335                        continue336                    if line.startswith(CHANGELOG_COMMENT_FIRST_CHAR):337                        # Suppress comments338                        continue339                    changelog_message.append(line)340                    last_line_blank = line_blank341                    first_line = False342                if last_line_blank:343                    # Suppress trailing blank lines344                    changelog_message.pop()345            _verbose_output(verbose, 'Changelog message read from temporary file:\n{}', changelog_message)346    return changelog_header, changelog_message, changelog_footer347def _write_to_changelog_file(release_version, changelog_header, changelog_message, changelog_footer, verbose):348    _verbose_output(verbose, 'Writing changelog contents to {}.', CHANGELOG_FILENAME)349    if not _case_sensitive_regular_file_exists(CHANGELOG_FILENAME):350        raise ReleaseFailure(351            'Failed to find changelog file: {}. File names are case sensitive!'.format(CHANGELOG_FILENAME),352        )353    with codecs.open(CHANGELOG_FILENAME, 'wb', encoding='utf8') as changelog_write:354        header_line = '{version} ({date})'.format(355            version=release_version,356            date=datetime.datetime.now().strftime('%Y-%m-%d'),357        )358        changelog_write.writelines(changelog_header + ['\n'])359        if changelog_message:360            changelog_write.writelines([361                header_line, '\n', '-' * len(header_line), '\n',362            ])363            changelog_write.writelines(changelog_message + ['\n'])364        changelog_write.writelines(changelog_footer)365    _verbose_output(verbose, 'Finished writing to changelog.')366def _tag_branch(release_version, changelog_lines, verbose, overwrite=False):367    _verbose_output(verbose, 'Tagging branch...')368    try:369        gpg = subprocess.check_output(['which', 'gpg']).decode('utf8').strip()370        _verbose_output(verbose, 'Found location of `gpg` to be {}'.format(gpg))371    except subprocess.CalledProcessError:372        gpg = None373    if not gpg:374        try:375            gpg = subprocess.check_output(['which', 'gpg2']).decode('utf8').strip()376            _verbose_output(verbose, 'Found location of `gpg2` to be {}'.format(gpg))377        except subprocess.CalledProcessError:378            gpg = None379    try:380        tty = subprocess.check_output(['tty']).decode('utf8').strip()381        _verbose_output(verbose, 'Found location of `tty` to be {}'.format(tty))382    except subprocess.CalledProcessError:383        _verbose_output(verbose, 'Could not get tty path ... Maybe a problem? Maybe not.')384        tty = ''385    release_message = RELEASE_MESSAGE_TEMPLATE.format(release_version)386    if changelog_lines:387        release_message += '\n\nChangelog Details:'388        for line in changelog_lines:389            release_message += '\n' + line.strip()390    cmd = ['git', 'tag', '-a', release_version, '-m', release_message]391    if overwrite:392        cmd.append('-f')393    signed = False394    if gpg:395        sign_with_key = _prompt(396            'GPG is installed on your system. Would you like to sign the release tag with your GitHub committer email '397            'GPG key? (y/N/[alternative key ID]):',398        ).lower() or INSTRUCTION_NO399        if sign_with_key == INSTRUCTION_YES:400            cmd.append('-s')401        elif sign_with_key != INSTRUCTION_NO:402            cmd.extend(['-u', sign_with_key])403        if sign_with_key != INSTRUCTION_NO:404            signed = True405            try:406                subprocess.check_output(407                    ['git', 'config', '--global', 'gpg.program', gpg],408                )409            except subprocess.CalledProcessError as e:410                raise ReleaseFailure(411                    'Failed to configure Git+GPG. Something is not right. Aborting.\n{code}: {output}'.format(412                        code=e.returncode,413                        output=e.output.decode('utf8'),414                    )415                )416    else:417        _standard_output('GPG is not installed on your system. Will not sign the release tag.')418    try:419        result = subprocess.check_output(420            cmd,421            stderr=subprocess.STDOUT,422            env=dict(os.environ, GPG_TTY=tty),423        ).decode('utf8')424    except subprocess.CalledProcessError as e:425        result = '`git` command exit code {code} - {output}'.format(code=e.returncode, output=e.output.decode('utf8'))426    if result:427        if 'unable to sign the tag' in result:428            raise ReleaseFailure(429                'Failed tagging branch due to error signing the tag. Perhaps you need to create a code-signing key, or '430                'the alternate key ID you specified was incorrect?\n\n'431                'Suggestions:\n'432                ' - Generate a key with `{gpg} --get-key` (GPG v1) or `{gpg} --full-gen-key` (GPG v2) (and use 4096)\n'433                ' - It is not enough for the key email to match your committer email; the full display name must '434                'match, too (e.g. "First Last <email@example.org>")\n'435                ' - If the key display name does not match the committer display name, use the alternate key ID\n'436                'Error output: {output}'.format(gpg=gpg, output=result)437            )438        raise ReleaseFailure('Failed tagging branch: {}'.format(result))439    if signed:440        try:441            subprocess.check_call(442                ['git', 'tag', '-v', release_version],443                stdout=sys.stdout,444                stderr=sys.stderr,445            )446        except subprocess.CalledProcessError:447            raise ReleaseFailure(448                'Successfully created a signed release tag, but failed to verify its signature. Something is not right.'449            )450    _verbose_output(verbose, 'Finished tagging branch.')451def _commit_release_changes(release_version, changelog_lines, verbose):452    _verbose_output(verbose, 'Committing release changes...')453    files_to_commit = [VERSION_FILENAME, CHANGELOG_FILENAME] + _get_extra_files_to_commit()454    _verbose_output(verbose, 'Staging changes for files {}.'.format(files_to_commit))455    try:456        result = subprocess.check_output(457            ['git', 'add'] + files_to_commit,458            stderr=subprocess.STDOUT,459        )460    except subprocess.CalledProcessError as e:461        result = '`git` command exit code {code} - {output}'.format(code=e.returncode, output=e.output.decode('utf8'))462    if result:463        raise ReleaseFailure('Failed staging release files for commit: {}'.format(result))464    release_message = [RELEASE_MESSAGE_TEMPLATE.format(release_version)]465    if changelog_lines:466        release_message.append('\nChangelog Details:')467        for line in changelog_lines:468            release_message.append(line.strip())469    subprocess.check_call(470        ['git', 'commit', '-m', '\n'.join(release_message)],471        stdout=sys.stdout,472        stderr=sys.stderr,473    )474    _verbose_output(verbose, 'Finished releasing changes.')475def _push_release_changes(release_version, branch_name, verbose):476    try:477        if USE_TAG:478            message = 'Push release changes and tag to remote origin (branch "{}")? (y/N/rollback):'479        else:480            message = 'Push release changes to remote origin (branch "{}")? (y/N/rollback):'481        push = _prompt(message, branch_name).lower()482    except KeyboardInterrupt:483        push = INSTRUCTION_ROLLBACK484    if push == INSTRUCTION_YES:485        _verbose_output(verbose, 'Pushing changes to remote origin...')486        subprocess.check_call(487            ['git', 'push', 'origin', '{0}:{0}'.format(branch_name)],488            stdout=sys.stdout,489            stderr=sys.stderr,490        )491        if USE_TAG:492            # push the release tag493            subprocess.check_call(494                ['git', 'push', 'origin', release_version],495                stdout=sys.stdout,496                stderr=sys.stderr,497            )498        if USE_PULL_REQUEST:499            _checkout_branch(verbose, BRANCH_MASTER)500            _delete_branch(verbose, branch_name)501        _verbose_output(verbose, 'Finished pushing changes to remote origin.')502        return PUSH_RESULT_PUSHED503    elif push == INSTRUCTION_ROLLBACK:504        _standard_output('Rolling back local release commit and tag...')505        if USE_PULL_REQUEST:506            _checkout_branch(verbose, BRANCH_MASTER)507            _delete_branch(verbose, branch_name)508        else:509            _delete_last_commit(verbose)510        if USE_TAG:511            _delete_local_tag(release_version, verbose)512        _verbose_output(verbose, 'Finished rolling back local release commit.')513        return PUSH_RESULT_ROLLBACK514    else:515        _standard_output('Not pushing changes to remote origin!')516        if USE_TAG:517            _print_output(518                COLOR_RED_BOLD,519                'Make sure you remember to explicitly push {branch} and the tag '520                '(or revert your local changes if you are trying to cancel)! '521                'You can push with the following commands:\n'522                '    git push origin {branch}:{branch}\n'523                '    git push origin "{tag}"\n',524                branch=branch_name,525                tag=release_version,526            )527        else:528            _print_output(529                COLOR_RED_BOLD,530                'Make sure you remember to explicitly push {branch} (or revert your local changes if you are '531                'trying to cancel)! You can push with the following command:\n'532                '    git push origin {branch}:{branch}\n',533                branch=branch_name,534            )535        return PUSH_RESULT_NO_ACTION536def _get_last_commit_hash(verbose):537    _verbose_output(verbose, 'Getting last commit hash...')538    commit_hash = subprocess.check_output(539        ['git', 'log', '-n', '1', '--pretty=format:%H'],540        stderr=sys.stderr,541    ).decode('utf8').strip()542    _verbose_output(verbose, 'Last commit hash is {}.', commit_hash)543    return commit_hash544def _get_commit_subject(commit_hash, verbose):545    _verbose_output(verbose, 'Getting commit message for hash {}...', commit_hash)546    message = subprocess.check_output(547        ['git', 'log', '-n', '1', '--pretty=format:%s', commit_hash],548        stderr=sys.stderr,549    ).decode('utf8').strip()550    _verbose_output(verbose, 'Commit message for hash {hash} is "{value}".', hash=commit_hash, value=message)551    return message552def _get_branch_name(verbose):553    _verbose_output(verbose, 'Determining current Git branch name.')554    branch_name = subprocess.check_output(555        ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],556        stderr=sys.stderr,557    ).decode('utf8').strip()558    _verbose_output(verbose, 'Current Git branch name is {}.', branch_name)559    return branch_name560def _create_branch(verbose, branch_name):561    _verbose_output(verbose, 'Creating branch {branch}...', branch=branch_name)562    subprocess.check_call(563        ['git', 'checkout', '-b', branch_name],564        stdout=sys.stdout,565        stderr=sys.stderr,566    )567    _verbose_output(verbose, 'Done creating branch {}.', branch_name)568def _create_local_tracking_branch(verbose, branch_name):569    """Create a local tracking branch of origin/<branch_name>.570    Returns True if successful, False otherwise.571    """572    _verbose_output(573        verbose,574        'Creating local branch {branch} set up to track remote branch {branch} from \'origin\'...',575        branch=branch_name576    )577    success = True578    try:579        subprocess.check_call(580            ['git', 'checkout', '--track', 'origin/{}'.format(branch_name)],581            stdout=sys.stdout,582            stderr=sys.stderr,583        )584        _verbose_output(verbose, 'Done creating branch {}.', branch_name)585    except subprocess.CalledProcessError:586        _verbose_output(verbose, 'Creating branch {} failed.', branch_name)587        success = False588    return success589def _checkout_branch(verbose, branch_name):590    _verbose_output(verbose, 'Checking out branch {branch}...', branch=branch_name)591    subprocess.check_call(592        ['git', 'checkout', branch_name],593        stdout=sys.stdout,594        stderr=sys.stderr,595    )596    _verbose_output(verbose, 'Done checking out branch {}.', branch_name)597def _delete_branch(verbose, branch_name):598    _verbose_output(verbose, 'Deleting branch {branch}...', branch=branch_name)599    subprocess.check_call(600        ['git', 'branch', '-D', branch_name],601        stdout=sys.stdout,602        stderr=sys.stderr,603    )604    _verbose_output(verbose, 'Done deleting branch {}.', branch_name)605def _is_branch_on_remote(verbose, branch_name):606    _verbose_output(verbose, 'Checking if branch {} exists on remote...', branch_name)607    result = subprocess.check_output(608        ['git', 'ls-remote', '--heads', 'origin', branch_name],609        stderr=sys.stderr,610    ).decode('utf8').strip()611    on_remote = branch_name in result612    _verbose_output(613        verbose,614        'Result of on-remote check for branch {branch_name} is {result}.',615        branch_name=branch_name,616        result=on_remote,617    )618    return on_remote619def _create_branch_from_tag(verbose, tag_name, branch_name):620    _verbose_output(verbose, 'Creating branch {branch} from tag {tag}...', branch=branch_name, tag=tag_name)621    subprocess.check_call(622        ['git', 'checkout', 'tags/{}'.format(tag_name), '-b', branch_name],623        stdout=sys.stdout,624        stderr=sys.stderr,625    )626    _verbose_output(verbose, 'Done creating branch {}.', branch_name)627def _push_branch(verbose, branch_name):628    _verbose_output(verbose, 'Pushing branch {} to remote.', branch_name)629    subprocess.check_call(630        ['git', 'push', 'origin', '{0}:{0}'.format(branch_name)],631        stdout=sys.stdout,632        stderr=sys.stderr,633    )634    _verbose_output(verbose, 'Done pushing branch {}.', branch_name)635def _fetch_tags(verbose):636    _verbose_output(verbose, 'Fetching all remote tags...')637    subprocess.check_call(638        ['git', 'fetch', '--tags'],639        stdout=sys.stdout,640        stderr=sys.stderr,641    )642    _verbose_output(verbose, 'Done fetching tags.')643def _get_tag_list(verbose):644    _verbose_output(verbose, 'Parsing list of local tags...')645    result = subprocess.check_output(646        ['git', 'tag', '--list'],647        stderr=sys.stderr,648    ).decode('utf8').strip().split()649    _verbose_output(verbose, 'Result of tag list parsing is {}.', result)650    return result651def _does_tag_exist_locally(release_version, verbose):652    _verbose_output(verbose, 'Checking if tag {} exists locally...', release_version)653    result = subprocess.check_output(654        ['git', 'tag', '--list', release_version],655        stderr=sys.stderr,656    ).decode('utf8').strip()657    exists = release_version in result658    _verbose_output(verbose, 'Result of exists check for tag {tag} is {result}.', tag=release_version, result=exists)659    return exists660def _is_tag_on_remote(release_version, verbose):661    _verbose_output(verbose, 'Checking if tag {} was pushed to remote...', release_version)662    result = subprocess.check_output(663        ['git', 'ls-remote', '--tags', 'origin', release_version],664        stderr=sys.stderr,665    ).decode('utf8').strip()666    on_remote = release_version in result667    _verbose_output(668        verbose,669        'Result of on-remote check for tag {tag} is {result}.',670        tag=release_version,671        result=on_remote,672    )673    return on_remote674def _get_remote_branches_with_commit(commit_hash, verbose):675    _verbose_output(verbose, 'Checking if commit {} was pushed to any remote branches...', commit_hash)676    result = subprocess.check_output(677        ['git', 'branch', '-r', '--contains', commit_hash],678        stderr=sys.stderr,679    ).decode('utf8').strip()680    on_remote = []681    for line in result.splitlines():682        line = line.strip()683        if line.startswith('origin/') and not line.startswith('origin/HEAD'):684            on_remote.append(line)685    _verbose_output(686        verbose,687        'Result of on-remote check for commit {hash} is {remote}.',688        hash=commit_hash,689        remote=on_remote,690    )691    return on_remote692def _delete_local_tag(tag_name, verbose):693    _verbose_output(verbose, 'Deleting local tag {}...', tag_name)694    subprocess.check_call(695        ['git', 'tag', '-d', tag_name],696        stdout=sys.stdout,697        stderr=sys.stderr,698    )699    _verbose_output(verbose, 'Finished deleting local tag {}.', tag_name)700def _delete_remote_tag(tag_name, verbose):701    _verbose_output(verbose, 'Deleting remote tag {}...', tag_name)702    subprocess.check_call(703        ['git', 'push', 'origin', ':refs/tags/{}'.format(tag_name)],704        stdout=sys.stdout,705        stderr=sys.stderr,706    )707    _verbose_output(verbose, 'Finished deleting remote tag {}.', tag_name)708def _delete_last_commit(verbose):709    _verbose_output(verbose, 'Deleting last commit, assumed to be for version and changelog files...')710    extra_files = _get_extra_files_to_commit()711    subprocess.check_call(712        ['git', 'reset', '--soft', 'HEAD~1'],713        stdout=sys.stdout,714        stderr=sys.stderr,715    )716    subprocess.check_call(717        ['git', 'reset', 'HEAD', VERSION_FILENAME, CHANGELOG_FILENAME] + extra_files,718        stdout=sys.stdout,719        stderr=sys.stderr,720    )721    subprocess.check_call(722        ['git', 'checkout', '--', VERSION_FILENAME, CHANGELOG_FILENAME] + extra_files,723        stdout=sys.stdout,724        stderr=sys.stderr,725    )726    _verbose_output(verbose, 'Finished deleting last commit.')727def _revert_remote_commit(release_version, commit_hash, branch_name, verbose):728    _verbose_output(verbose, 'Rolling back release commit on remote branch "{}"...', branch_name)729    subprocess.check_call(730        ['git', 'revert', '--no-edit', '--no-commit', commit_hash],731        stdout=sys.stdout,732        stderr=sys.stderr,733    )734    release_message = 'REVERT: {}'.format(RELEASE_MESSAGE_TEMPLATE.format(release_version))735    subprocess.check_call(736        ['git', 'commit', '-m', release_message],737        stdout=sys.stdout,738        stderr=sys.stderr,739    )740    _verbose_output(verbose, 'Pushing changes to remote branch "{}"...', branch_name)741    subprocess.check_call(742        ['git', 'push', 'origin', '{0}:{0}'.format(branch_name)],743        stdout=sys.stdout,744        stderr=sys.stderr,745    )746    _verbose_output(verbose, 'Finished rolling back release commit.')747def _import_version_or_exit():748    if VERSION_FILE_IS_TXT:749        # if there is version.txt, use that750        with codecs.open(VERSION_FILENAME, 'rb', encoding='utf8') as version_txt:751            return version_txt.read()752    try:753        return __import__('{}.version'.format(MODULE_NAME), fromlist=[str('__version__')]).__version__754    except ImportError as e:755        import pprint756        _error_output_exit(757            'Could not import `__version__` from `{module}.version`. Error was "ImportError: {err}." Path is:\n{path}',758            module=MODULE_NAME,759            err=e.args[0],760            path=pprint.pformat(sys.path),...httpclient.py
Source:httpclient.py  
...55        super(VerboseHttp, self).__init__(**kwargs)56    def _request(self, conn, host, absolute_uri, request_uri, method, body,57                 headers, redirections, cachekey):58        """Display request parameters before requesting."""59        self._verbose_output('#### %s ####' % self.caption,60                             color=self.COLORMAP['caption'])61        self._verbose_output('%s %s' % (method, request_uri),62                             prefix=self.REQUEST_PREFIX,63                             color=self.COLORMAP['request'])64        self._print_header("Host", host, prefix=self.REQUEST_PREFIX)65        self._print_headers(headers, prefix=self.REQUEST_PREFIX)66        self._print_body(headers, body)67        (response, content) = httplib2.Http._request(68            self, conn, host, absolute_uri, request_uri, method, body,69            headers, redirections, cachekey70        )71        # Blank line for division72        self._verbose_output('')73        self._verbose_output('%s %s' % (response['status'], response.reason),74                             prefix=self.RESPONSE_PREFIX,75                             color=self.COLORMAP['status'])76        self._print_headers(response, prefix=self.RESPONSE_PREFIX)77        # response body78        self._print_body(response, content)79        self._verbose_output('')80        return (response, content)81    def _print_headers(self, headers, prefix=''):82        """Output request or response headers."""83        if self._show_headers:84            for key in headers:85                if key not in self.HEADER_BLACKLIST:86                    self._print_header(key, headers[key], prefix=prefix)87    def _print_body(self, headers, content):88        """Output body if not binary."""89        if self._show_body and utils.not_binary(90                utils.extract_content_type(headers)[0]):91            self._verbose_output('')92            self._verbose_output(93                utils.decode_response_content(headers, content))94    def _print_header(self, name, value, prefix='', stream=None):95        """Output one single header."""96        header = self.colorize(self.COLORMAP['header'], "%s:" % name)97        self._verbose_output("%s %s" % (header, value), prefix=prefix,98                             stream=stream)99    def _verbose_output(self, message, prefix='', color=None, stream=None):100        """Output a message."""101        stream = stream or self._stream102        if prefix and message:103            print(prefix, end=' ', file=stream)104        if color:105            message = self.colorize(color, message)106        print(message, file=stream)107def get_http(verbose=False, caption=''):108    """Return an Http class for making requests."""109    if verbose:110        body = True111        headers = True112        colorize = True113        stream = sys.stdout...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
