Best Python code snippet using autotest_python
scm.py
Source:scm.py  
# ...
def set_git_config(name, email):
    """Write ``name`` and ``email`` as ``user.name`` / ``user.email`` in ~/.gitconfig."""
    config_path = os.path.normpath(os.path.expanduser("~/.gitconfig"))
    cfg = git.GitConfigParser([config_path], read_only=False)
    cfg.set_value('user', 'name', name)
    cfg.set_value('user', 'email', email)


def repo_check(repo, require_remote=False):
    """Validate ``repo`` and return it unchanged.

    Raises:
        TypeError: if ``repo`` is None.
        ValueError: if ``require_remote`` is true and ``repo.remotes`` is empty.
    """
    if repo is None:
        raise TypeError('Not a git repository.')
    # TODO: no remote fail
    if not repo.remotes and require_remote:
        raise ValueError('No git remotes configured. Please add one.')
    # TODO: You're in a merge state.
    return repo


def get_current_branch_name(repo):
    """Returns current branch name (alias of :func:`branch_name`)."""
    return branch_name(repo)


def clone_from(repourl, targetdir):
    """Clone ``repourl`` into ``targetdir`` and return the new repo object.

    Raises:
        CloneFailure: if the underlying git command fails; the original
            GitCommandError is passed as second argument and chained as cause.
    """
    try:
        return _repo.clone_from(repourl, targetdir)
    except GitCommandError as e:
        raise CloneFailure('Cloning {0} failed'.format(repourl), e) from e


def Repo(path):
    """Returns the current Repo, based on path.

    Parent directories are searched as well. Returns None when ``path`` is
    not inside a git repository.
    """
    try:
        return _repo(path, search_parent_directories=True)
    except InvalidGitRepositoryError:
        # Deliberate best effort: callers test the result for truthiness.
        return None


# def get_remote(repo):
#     repo_check(repo, require_remote=True)
#     reader = repo.config_reader()
#     # If there is no remote option in legit section, return default
#     if not reader.has_option('legit', 'remote'):
#         return repo.remotes[0]
#     remote_name = reader.get('legit', 'remote')
#     if not remote_name in [r.name for r in repo.remotes]:
#         raise ValueError('Remote "{0}" does not exist! Please update your git '
#                          'configuration.'.format(remote_name))
#     return repo.remote(remote_name)


def branch_name(repo):
    """Gets current branch name"""
    repo_check(repo)
    return repo.head.ref.name


def branches(repo, local=True, remote=True, excl=forbidden_branches):
    """Returns a sorted list of Branch objects for local and remote branches.

    Args:
        repo: repository object (must not be None).
        local: include the names found in ``repo.heads``.
        remote: include the names found in ``repo.remote().refs`` (only
            consulted when the repo has at least one remote).
        excl: container of branch names to leave out.

    Each Branch is flagged ``is_published`` when its name was found among the
    remote refs and ``is_local`` when it was found among the local heads.
    """
    repo_check(repo)
    remote_names = set()
    if remote and repo.remotes:
        remote_names = {
            n for n in map(del_remote, map(gname, repo.remote().refs))
            if n not in excl
        }
    local_names = set()
    if local:
        local_names = {n for n in map(gname, repo.heads) if n not in excl}
    return [
        Branch(n, is_published=n in remote_names, is_local=n in local_names)
        for n in sorted(remote_names | local_names)
    ]


def branch_names(repo, local=True, remote=True, excl=forbidden_branches):
    """Returns a list of local and remote branch names."""
    return [b.name for b in branches(repo, local=local, remote=remote, excl=excl)]


def get_branch(repo, branch=None, local=True, remote=True, excl=forbidden_branches):
    """Returns the Branch named ``branch`` (default: current branch), or None.

    The lookup is done over the result of :func:`branches` called with the
    same ``local`` / ``remote`` / ``excl`` filters.
    """
    branch = branch_name(repo) if branch is None else branch
    by_name = {b.name: b for b in branches(repo, local=local, remote=remote, excl=excl)}
    return by_name.get(branch)


def get_branch_state(b):
    """Returns 'tracked', 'local' or 'remote' for a Branch.

    'tracked' means both published and local; anything that is not local is
    reported as 'remote'.
    """
    if b.is_local:
        return 'tracked' if b.is_published else 'local'
    return 'remote'


def get_branch_items(branches, branch=None):
    """Get quick panel items from a list of branches

    Each item is ``[label, state]``; the label of the branch whose name
    equals ``branch`` is prefixed with ``'* '``, all others with two spaces.
    """
    items = []
    for b in branches:
        marker = '* ' if branch == b.name else '  '
        items.append([marker + b.name, '  ' + get_branch_state(b)])
    return items


class RepoHelper(object):
    """Mixin giving a command object a lazily discovered ``repo`` property."""

    @property
    def repo(self):
        """discover the repo for a sublime command

        Candidate folders are tried in this order: the view's ``git_repo``
        setting, the directory of the view's file, then the window folders.
        Returns the first repo found, or None.
        """
        if hasattr(self, 'view'):
            window = self.view.window()
            view = self.view
        elif hasattr(self, 'window'):
            window = self.window
            view = self.window.active_view()
        else:
            return
        folders = []
        view_repo = view.settings().get('git_repo')
        if view_repo:
            folders.append(view_repo)
        cur_file = view.file_name()
        if cur_file:
            folders.append(os.path.dirname(cur_file))

        folders.extend(window.folders())
        for folder in folders:
            if os.path.exists(folder):
                repo = Repo(folder)
                if repo:
                    return repo


# def get_branches(repo, local=True, remote_branches=True):
#     """Returns a list of local and remote branches."""
#     repo_check(repo)
#     # print local
#     branches = []
#     if remote_branches:
#         # Remote refs.
#         try:
#             for b in repo.remote('origin').refs:
#                 name = b.name.split('/',1)[-1]
#                 if name not in forbidden_branches:
#                     branches.append(Branch(name, is_published=True))
#         except (IndexError, AssertionError):
#             pass
#     if local:
#         # Local refs.
#         for b in [h.name for h in repo.heads]:
#             if b not in [br.name for br in branches] or not remote_branches:
#                 if b not in forbidden_branches:
#                     branches.append(Branch(b, is_published=False))
#     return sorted(branches, key=attrgetter('name'))
# def get_branch_names(repo, local=True, remote_branches=True):
#     repo_check(repo)
#     branches = get_branches(repo, local=local, remote_branches=remote_branches)
#     return [b.name for b in branches]


def stash_it(repo, sync=False):
    """Stash all changes (untracked files included) with a templated message.

    The message is ``STASH_TEMPLATE`` formatted with 'syncing branch' when
    ``sync`` is true, otherwise with 'switching branches'.
    """
    repo_check(repo)
    msg = 'syncing branch' if sync else 'switching branches'
    return repo.git.stash('save', '--include-untracked', STASH_TEMPLATE.format(msg))


def unstash_index(repo, sync=False, branch=None):
    """Returns an unstash index if one is available.

    Scans ``git stash list`` for an entry made on ``branch`` (default: the
    current branch) whose message equals the templated message for the given
    intent. Returns the index as a string, or None when nothing matches.
    """
    repo_check(repo)
    stash_list = repo.git.stash('list')
    branch = branch_name(repo) if branch is None else branch
    intent = 'syncing branch' if sync else 'switching branches'
    legit_msg = STASH_TEMPLATE.format(intent)
    # stash_name_re = "^stash@{([0-9]+)}: On "+branch+": Legit: stashing before "+intent+"\.$"
    for stash in stash_list.splitlines():
        # NOTE(review): assumes lines look like "stash@{N}: On <branch>: <msg>".
        stash_id, on_branch, msg = stash.split(': ', 2)
        # stash_match = re.match(stash_name_re, stash)
        # [3:] drops a 3-character prefix ("On "); [7:-1] keeps what is
        # between "stash@{" and the closing "}".
        if on_branch[3:] == branch and msg == legit_msg:
            return stash_id[7:-1]
            # return stash_match.groups()[0]


def unstash_it(repo, sync=False, branch=None):
    """Unstashes changes from current branch for branch sync.

    Pops the stash located by :func:`unstash_index`; does nothing (returns
    None) when no matching stash exists.
    """
    repo_check(repo)
    stash_index = unstash_index(repo, sync=sync, branch=branch)
    if stash_index is not None:
        return repo.git.stash('pop', 'stash@{{{0}}}'.format(stash_index))


def fetch(repo):
    """Run ``git fetch origin`` and return its output."""
    repo_check(repo)
    return repo.git.fetch('origin')


def is_empty(repo):
    """Check to see if a repo is empty

    Empty means: HEAD is not valid and the index holds no blobs.
    """
    repo_check(repo)
    return not (repo.head.is_valid() or any(repo.index.iter_blobs()))


def is_upstream_ahead(repo, branch=None):
    """True when ``branch@{u}`` has commits that ``branch`` lacks."""
    repo_check(repo, require_remote=True)
    branch = branch_name(repo) if branch is None else branch
    return any(repo.iter_commits(branch + '..' + branch + '@{u}'))


def merged(repo, branch, on_src=False):
    '''Checks to see if a branch is merged

    Compares the merge base of HEAD and ``branch`` with the "source" side:
    ``branch`` by default, HEAD when ``on_src`` is true.
    '''
    repo_check(repo)
    if on_src:
        src, dst = 'HEAD^{}', branch + '^{}'
    else:
        src, dst = branch + '^{}', 'HEAD^{}'
    return (repo.merge_base(src, dst) or [None])[0] == repo.rev_parse(src)


def smart_merge(repo, branch, allow_rebase=True, force_theirs=None):
    """Bring ``branch`` into the current branch by merge or rebase.

    A rebase is chosen only when ``allow_rebase`` is true and the
    ``git log --merges branch..current`` output does not contain the word
    'commit'; otherwise a merge is done. When rebasing is not allowed the
    merge uses ``no_ff``. For merges, ``force_theirs`` selects the
    ``-X theirs`` (true) or ``-X ours`` (false) strategy option; None leaves
    it unset.

    Raises:
        GitCommandError: re-raised after the operation has been aborted and
            ``abort`` has been called with the failure log.
    """
    repo_check(repo)
    from_branch = branch_name(repo)
    merges = repo.git.log('--merges', '{0}..{1}'.format(branch, from_branch))
    if allow_rebase and not merges.count('commit'):
        verb = 'rebase'
    else:
        verb = 'merge'

    # Build the keyword options once instead of repeating the call per case.
    options = {}
    if not allow_rebase:
        options['no_ff'] = True
    if verb == 'merge' and force_theirs is not None:
        # theirs: overwrite our conflicting differences with theirs
        # ours: discard their conflicting changes being merged in
        options['X'] = 'theirs' if force_theirs else 'ours'

    try:
        return getattr(repo.git, verb)(branch, **options)
    except GitCommandError as why:
        log = getattr(repo.git, verb)('--abort')
        abort('Merge failed. Reverting.', log='{0}\n{1}'.format(why, log), type='merge')
        raise


def smart_pull(repo):
    """Fetch origin, then smart-merge ``origin/<current branch>``.

    (Original note: 'git log --merges origin/master..master'.)
    """
    repo_check(repo)
    branch = branch_name(repo)
    fetch(repo)
    return smart_merge(repo, '{0}/{1}'.format('origin', branch))


def commit(repo, message, *files, all_files=True, untracked_files=True):
    """Stage changes and commit them with ``message``.

    When the work tree is dirty or has untracked files:
      * ``all_files`` true: ``git add --all`` if ``untracked_files`` else
        ``git add --update``;
      * otherwise the given ``files`` are added to the index;
      * with neither, nothing is done and None is returned.

    Returns the result of ``repo.index.commit``, or None when there is
    nothing to commit.
    """
    repo_check(repo)
    if repo.is_dirty() or repo.untracked_files:
        if all_files:
            if untracked_files:
                repo.git.add(all=True)
            else:
                repo.git.add(update=True)
        elif files:
            repo.index.add(files)
        else:
            return
    # Check that something is to be committed
    if repo.head.is_valid():
        if not repo.index.diff("HEAD"):
            return
    elif not list(repo.index.iter_blobs()):
        return
    return repo.index.commit(message)


def undo(repo):
    """Reset HEAD to its first parent (``HEAD^``)."""
    repo_check(repo)
    repo.head.reset('HEAD^')


def is_upstream_behind(repo, branch=None):
    """True when ``branch`` has commits that ``branch@{u}`` lacks."""
    repo_check(repo, require_remote=True)
    branch = branch_name(repo) if branch is None else branch
    return any(repo.iter_commits(branch + '@{u}..' + branch))


def push(repo, branch=None):
    """Push ``branch`` (default: current) to origin and ensure it tracks it.

    Does nothing when ``branch`` is not a local head.

    Raises:
        PushFailure: if the push command fails (original error chained).
    """
    repo_check(repo, require_remote=True)
    branch = branch_name(repo) if branch is None else branch
    if branch in repo.heads:
        try:
            repo.remotes.origin.push('{0}:{0}'.format(branch))
        except GitCommandError as err:
            raise PushFailure("") from err
        if not repo.heads[branch].tracking_branch():
            repo.heads[branch].set_tracking_branch(repo.remotes.origin.refs[branch])


def branch_on_remote(repo, branch):
    """Return the ``git ls-remote --heads origin <branch>`` output.

    Raises:
        RemoteFailure: if the remote cannot be queried (original error chained).
    """
    # Determine if a branch is in a remote repository
    repo_check(repo, require_remote=True)
    try:
        return repo.git.ls_remote('origin', branch, heads=True)
    except GitCommandError as err:
        raise RemoteFailure('Cannot contact remote repository',
                            next(repo.remotes.origin.urls)) from err


def checkout_branch(repo, branch):
    """Checks out given branch."""
    repo_check(repo)
    repo.heads[branch].checkout()


def track_branch(repo, branch):
    "Creates a new branch that tracks a remote branch"
    repo_check(repo, require_remote=True)
    remote = repo.remotes.origin.refs[branch]
    return repo.create_head(branch, remote).set_tracking_branch(remote)


def sprout_branch(repo, branch, off_branch=None):
    """Creates branch from current unless provided."""
    repo_check(repo)
    off_branch = branch_name(repo) if off_branch is None else off_branch
    return repo.create_head(branch, repo.heads[off_branch])


def destroy_branch(repo, branch):
    '''Delete a branch both locally and remotely

    Unknown branches and the currently checked-out branch are left alone.
    A matching 'switching branches' stash for the branch is dropped first.
    '''
    repo_check(repo)
    head = get_branch(repo, branch)
    if head is None:
        return
    if head.name == branch_name(repo):
        return
    index = unstash_index(repo, branch=head.name)
    if index is not None:
        # Same explicit stash reference form that unstash_it uses.
        repo.git.stash('drop', 'stash@{{{0}}}'.format(index))
    if head.is_published:
        repo.git.push('origin', head.name, delete=True)
    if head.is_local:
        repo.git.branch(head.name, D=True)


# def graft_branch(repo, branch):
#     """Merges branch into current branch, and deletes it."""
#     repo_check(repo)
#     log = []
#     try:
#         msg = repo.git.merge('--no-ff', branch)
#         log.append(msg)
#     except GitCommandError as why:
#         log = repo.git.merge('--abort')
#         abort('Merge failed. Reverting.', log='{0}\n{1}'.format(why, log), type='merge')
#     out = repo.git.branch('-D', branch)
#     log.append(out)
#     return '\n'.join(log)
# def unpublish_branch(repo, branch):
#     """Unpublishes given branch."""
#     repo_check(repo)
#     try:
#         return repo.git.push('origin', ':{0}'.format(branch))
#     except GitCommandError:
#         _, _, log = repo.git.fetch('origin', '--prune', with_extended_output=True)
#         abort('Unpublish failed. Fetching.', log=log, type='unpublish')
# def publish_branch(repo, branch):
#     """Publishes given branch."""
#     repo_check(repo)
# ...
# test_git_commit_one_file.py
Source:test_git_commit_one_file.py  
# -*- coding: utf-8 -*-
import os
from vilya.models.project import CodeDoubanProject
from vilya.models import git
from tests.base import TestCase
from tests.utils import mkdtemp
from vilya.libs import gyt
from vilya.libs.permdir import get_repo_root


class TestGit(TestCase):
    """Tests for ``GitRepo.commit_one_file`` and the temp-branch helpers."""

    @property
    def u(self):
        # Each access calls addUser() again.
        return self.addUser()

    def _path(self, name):
        """Path of the git directory used for repo ``name``."""
        return os.path.join(get_repo_root(), '%s.git' % name)

    def _path_work_tree(self, name):
        """Path of the work tree used for repo ``name``."""
        return os.path.join(get_repo_root(), '%s.work_tree' % name)

    def _repo(self, name, bare=True):
        """Create (if needed) and open the repo ``name``.

        With ``bare=False`` a work-tree directory is created next to it.
        """
        git_path = self._path(name)
        if bare:
            work_tree_path = None
        else:
            work_tree_path = self._path_work_tree(name)
            if not os.path.exists(work_tree_path):
                os.mkdir(work_tree_path)
        try:
            CodeDoubanProject.create_git_repo(git_path)
        except Exception:
            # Best effort, as before (presumably the repo may already exist),
            # but no longer swallowing KeyboardInterrupt / SystemExit.
            pass
        repo = git.GitRepo(git_path, work_tree=work_tree_path)
        return repo

    def _commit(self, repo, filename, content='testcontent',
                message='testmessage'):
        """Write ``content`` to ``filename`` in the work tree and commit it.

        Returns the sha reported by ``gyt.repo(repo.path).sha()``.
        """
        # TODO allow commiting more than one file
        assert os.path.exists(repo.work_tree), \
            "repo.work_tree must exist, check if repo has been created with bare=False"  # noqa
        path = os.path.join(repo.work_tree, filename)
        dir_ = os.path.dirname(path)
        if not os.path.exists(dir_):
            os.makedirs(dir_)
        # Context manager closes the file even if the write fails.
        with open(path, 'w') as f:
            f.write(content)
        rep2 = gyt.repo(repo.path, repo.work_tree, bare=False)
        rep2.call(['add', filename])
        rep2.call(['commit', filename, '-m', message], _env=self.env_for_git)
        return gyt.repo(repo.path).sha()

    def _check_commit_in_branch(self, filename):
        """Commit a change to ``filename`` on a temp branch and verify, in a
        fresh clone, that only that branch sees the modified content."""
        repo = self._repo('test', bare=False)
        self._commit(repo, filename, 'content1', 'msg1')
        tmp_branch = repo.temp_branch_name()
        repo.commit_one_file(filename, 'content1 modified', 'change1',
                             self.u, orig_hash=hash('content1'),
                             branch=tmp_branch)
        show = 'show HEAD:%s' % filename
        with mkdtemp() as tmpdir:
            gyt.call(['git', 'clone', repo.path, tmpdir])
            clone = gyt.repo(tmpdir, bare=False)
            src = clone.call(show)
            assert src == u'content1'
            clone.call('checkout master')
            src = clone.call(show)
            assert src == u'content1'
            clone.call('checkout %s' % tmp_branch)
            src = clone.call(show)
            assert src == u'content1 modified'
            clone.call('checkout master')
            src = clone.call(show)
            assert src == u'content1'

    def test_simple_commit(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'testfile1', 'content1', 'msg1')
        src = repo.get_src('testfile1')
        assert src == ('blob', u'content1')
        repo.commit_one_file('testfile1', 'content1 modified',
                             'change1', self.u, orig_hash=hash('content1'))
        src = repo.get_src('testfile1')
        assert src == ('blob', u'content1 modified')

    def test_simple_commit_do_not_delete_other_files(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'testfile1', 'content1', 'msg1')
        self._commit(repo, 'testfile2', 'content2', 'msg2')
        repo.commit_one_file('testfile1', 'content1 modified',
                             'change1', self.u, orig_hash=hash('content1'))
        src = repo.get_src('testfile1')
        assert src == ('blob', u'content1 modified')
        type_, files = repo.get_src('')
        assert any(d['path'] == 'testfile2' for d in files), \
            "testfile2 should exists in root tree"
        src = repo.get_src('testfile2')
        assert src == ('blob', u'content2')

    def test_commit_in_inner_directory(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'test/file1', 'content1', 'msg1')
        src = repo.get_src('test/file1')
        assert src == ('blob', u'content1')
        repo.commit_one_file('test/file1', 'content1 modified',
                             'change1', self.u, orig_hash=hash('content1'))
        src = repo.get_src('test/file1')
        assert src == ('blob', u'content1 modified')

    def test_create_file(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'file1', 'content1', 'msg1')
        repo.commit_one_file(
            'file2', 'content2 created', 'create1', self.u)
        assert repo.cat('HEAD:file1') == 'content1'
        assert repo.cat('HEAD:file2') == 'content2 created'

    def test_create_first_file(self):
        repo = self._repo('test', bare=False)
        repo.commit_one_file(
            'file1', 'content1 created', 'create1', self.u)
        assert repo.cat('HEAD:file1') == 'content1 created'

    def test_create_first_file_and_more(self):
        repo = self._repo('test', bare=False)
        repo.commit_one_file(
            'file1', 'content1 created', 'create1', self.u)
        repo.commit_one_file(
            'file2', 'content2 created', 'create2', self.u)
        repo.commit_one_file(
            'file3', 'content3 created', 'create3', self.u)
        repo.commit_one_file(
            'file4', 'content4 created', 'create4', self.u)
        assert repo.cat('HEAD:file1') == 'content1 created'
        assert repo.cat('HEAD:file2') == 'content2 created'
        assert repo.cat('HEAD:file3') == 'content3 created'
        assert repo.cat('HEAD:file4') == 'content4 created'

    def test_commit_file_on_dirty_index(self):
        repo = self._repo('test', bare=False)
        repo.commit_one_file(
            'file1', 'content1 created', 'create1', self.u)
        repo.commit_one_file(
            'file2', 'content2 created', 'create2', self.u)
        repo.commit_one_file(
            'file1', 'content1 modified', 'modify1', self.u)
        # Now artificially rewind the index tree state
        repo.call('read-tree HEAD^')
        repo.commit_one_file(
            'file2', 'content2 modified', 'modify2', self.u)
        # the latest commit should not have anything related to file1
        assert 'file1' not in repo.call('log -p -n1')

    def test_create_file_in_dir(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'test/file1', 'content1', 'msg1')
        repo.commit_one_file(
            'test/file2', 'content2 created', 'create1', self.u)
        assert repo.cat('HEAD:test/file1') == 'content1'
        assert repo.cat('HEAD:test/file2') == 'content2 created'

    def test_simple_commit_in_branch(self):
        self._check_commit_in_branch('testfile1')

    def test_simple_commit_in_branch_in_subdir(self):
        self._check_commit_in_branch('test/file1')

    def test_simple_commit_in_branch_creates_branch(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'testfile1', 'content1', 'msg1')
        assert repo.get_branches() == ['master']
        tmp_branch = repo.temp_branch_name()
        repo.commit_one_file('testfile1', 'content1 modified', 'change1',
                             self.u, orig_hash=hash('content1'),
                             branch=tmp_branch)
        assert repo.get_branches() == ['master', tmp_branch]

    def test_simple_commit_in_branch_and_delete_branch(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'testfile1', 'content1', 'msg1')
        tmp_branch = repo.temp_branch_name()
        repo.commit_one_file('testfile1', 'content1 modified', 'change1',
                             self.u, orig_hash=hash('content1'),
                             branch=tmp_branch)
        assert tmp_branch in repo.get_branches()
        repo.remove_temp_branch(tmp_branch)
        assert tmp_branch not in repo.get_branches()
        assert repo.get_branches() == ['master']

    def test_simple_commit_in_another_branch(self):
        repo = self._repo('test', bare=False)
        self._commit(repo, 'testfile1', 'content1', 'msg1')
        branch = 'mybranch'
        repo.commit_one_file('testfile1', 'content1 modified', 'change1',
                             self.u, orig_hash=hash('content1'), branch=branch)
        assert branch in repo.get_branches()
# ...
# CollectMapping.py
Source:CollectMapping.py  
from NoteBookMapping import mapping,group
from CheckRepo import RepoCheck
from RepoNotebooks import RepoNotebook
from Data import TEST_REPO,REPOS
from CollectNotebook import name
from Config import CURRENT_FILE,Notebook,CellMapping,SPLIT,SAVE_FOLDER


def mapping_name(x, y, index):
    """Build the cache file name ``<index><SPLIT><x><SPLIT><y>.txt``."""
    return f'{index}{SPLIT}{x}{SPLIT}{y}.txt'


class CollectMapping:
    """Computes cell mappings between consecutive notebook versions and
    writes each one to a text file under ``mapping_cache``."""

    def __init__(self, repo_check: RepoCheck):
        self.repo_check = repo_check

    def saveMapping(self, output, old_path, new_path, index):
        """Write one mapping to ``{CURRENT_FILE}/mapping_cache/``.

        Each row of ``output`` becomes one line ``row[0],<targets>``:
          * a non-list ``row[1]`` is written as is;
          * a one-element list is written as that element followed by ``m``;
          * any other list is written comma-joined.
        The file name is built from the base names (text before the first
        dot, at most 63 characters) of ``old_path`` and ``new_path``.
        """
        def stem(path):
            # Local helper; named differently from the imported ``name``
            # so the import is not shadowed inside this method.
            return path.split("/")[-1].split(".")[0][:63]

        mname = mapping_name(stem(old_path), stem(new_path), index)
        with open(f'{CURRENT_FILE}/mapping_cache/{mname}', 'w') as file:
            for row in output:
                source, target = row[0], row[1]
                if not isinstance(target, list):
                    file.write(f'{source},{target}\n')
                elif len(target) == 1:
                    file.write(f'{source},{target[0]}m\n')
                else:
                    joined = ",".join(str(j) for j in target)
                    file.write(f'{source},{joined}\n')

    def getMapping(self, version):
        """Map every consecutive pair ``version[k]`` -> ``version[k + 1]``.

        Each entry is indexed as ``entry[1], entry[0]`` to build the cached
        notebook file name via ``name``. The pair position ``k`` is used as
        the index in the output file name.
        """
        cache = self.repo_check.notebook_cache_path
        for index in range(len(version) - 1):
            older, newer = version[index], version[index + 1]
            old_path = f'{cache}/{name(older[1],older[0])}'
            new_path = f'{cache}/{name(newer[1],newer[0])}'
            old, new = Notebook(old_path), Notebook(new_path)
            output = CellMapping(old, new)
            self.saveMapping(output, old_path, new_path, index)

    def run(self):
        """Run ``getMapping`` on every group in the third value returned by
        ``group(self.repo_check.data())``."""
        _, _, output = group(self.repo_check.data())
        for versions in output:
            self.getMapping(versions)


if __name__ == "__main__":
    rpn = RepoCheck('frnsys#ai_notes')
    cm = CollectMapping(rpn)
# ...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
The hub covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
