How to use commit method in localstack

Best Python code snippet using localstack_python

rit_lib.py

Source:rit_lib.py Github

copy

Full Screen

...192 self._branch_names = cleared_rit._branch_names193 self._commit_ids = cleared_rit._commit_ids194 self._short_commit_tree = cleared_rit._short_commit_tree195 ''' SET '''196 def add_commit(self, commit: Commit):197 ''' add the commit to the rit dir '''198 if self.prevent_mutations:199 raise RitError("Doing this would mutate the rit directory, and that is disabled for this RitResource")200 self._write_commit(commit)201 self._clear()202 def prune(self):203 leaf_commits = set()204 if self.head.commit_id is not None:205 leaf_commits.add(self.head.commit_id)206 for branch_name in self.get_branch_names():207 leaf_commits.add(self.get_branch(branch_name, ensure=True).commit_id)208 refed_commits = set(leaf_commits)209 for leaf_commit in leaf_commits:210 while True:211 commit = self.get_commit(leaf_commit, ensure=True)212 if commit.parent_commit_id is None or commit.parent_commit_id in refed_commits:213 break214 refed_commits.add(commit.parent_commit_id)215 leaf_commit = commit.parent_commit_id216 removed_commit_ids = []217 for commit_id in self.get_commit_ids():218 if commit_id in refed_commits:219 continue220 self._delete_commit(commit_id)221 removed_commit_ids.append(commit_id)222 return removed_commit_ids223 def set_branch(self, branch: Branch):224 ''' add the branch to the rit dir '''225 if self.prevent_mutations:226 raise RitError("Doing this would mutate the rit directory, and that is disabled for this RitResource")227 self._write_branch(branch)228 self._clear()229 def set_head(self, head: HeadNode):230 ''' set the new head point '''231 if self.prevent_mutations:232 raise RitError("Doing this would mutate the rit directory, and that is disabled for this RitResource")233 self._write_head(head)234 self._clear()235 ''' GET '''236 @property237 def paths(self):238 ''' returns a RitPaths object for this rit directory '''239 if self._paths is None:240 self._paths = self._read_paths()241 return self._paths242 @property243 def head(self):244 ''' returns the HeadNode object for this rit directory '''245 if self._head is None:246 self._head = self._read_head()247 return self._head248 def get_head_commit_id(self):249 ''' get the commit id of the head, None if there isn't one '''250 head = self.head251 if head.commit_id is not None:252 return head.commit_id253 else:254 branch = self.get_branch(head.branch_name)255 if branch is not None:256 return branch.commit_id257 else:258 return None259 def get_commit_ids(self):260 ''' get all commit ids '''261 if self._commit_ids is None:262 self._commit_ids = self._read_commit_ids()263 return self._commit_ids264 def get_commit(self, commit_id: str, *, ensure=False):265 '''266 get commit object of a commit id267 return a commit if found268 otherwise return None, if ensure is set to True, the raise instead of return None.269 '''270 if commit_id not in self._commits:271 try:272 self._commits[commit_id] = self._read_commit(commit_id)273 except FileNotFoundError:274 if ensure:275 raise RitError("Unable to load expected commit")276 return None277 return self._commits[commit_id]278 def is_commit(self, commit_id: str):279 ''' return True if commit_id has a commit '''280 return self.get_commit(commit_id) is not None281 def get_branch_names(self):282 ''' return names of all branches '''283 if self._branch_names is None:284 self._branch_names = self._read_branch_names()285 return self._branch_names286 def get_branch(self, name: str, *, ensure=False):287 '''288 query for branch by name289 returns None if not found. raises RitError instead if ensure is True290 '''291 if name not in self._branches:292 try:293 self._branches[name] = self._read_branch(name)294 except FileNotFoundError:295 if ensure:296 raise RitError("Unable to load expected branch")297 return None298 return self._branches[name]299 def is_branch(self, name: str):300 ''' return True if the branch name has a branch '''301 return self.get_branch(name) is not None302 def get_branch_name_to_commit_ids(self):303 ''' see _populate_commit_to_branch_map '''304 if self._branch_name_to_commit_ids is None:305 self._populate_commit_to_branch_map()306 return self._branch_name_to_commit_ids307 def get_commit_id_to_branch_names(self):308 ''' see _populate_commit_to_branch_map '''309 if self._commit_id_to_branch_names is None:310 self._populate_commit_to_branch_map()311 return self._commit_id_to_branch_names312 def get_commit_tree(self):313 '''314 returns a map of shortened commit prefixes to a list of all commit ids with315 that prefix316 the tree is used to find full commit ids given partial ones.317 '''318 if self._short_commit_tree is None:319 self._short_commit_tree = defaultdict(list)320 for commit_id in self.get_commit_ids():321 self._short_commit_tree[commit_id[:short_hash_index]].append(commit_id)322 return self._short_commit_tree323 ''' helpers '''324 def _populate_commit_to_branch_map(self):325 '''326 populate the commit <-> branch maps327 branches include the HEAD node328 if a branch doesn't have a commit, the commit and branch will not have329 entries in the maps330 '''331 branch_names = self.get_branch_names()332 self._branch_name_to_commit_ids = {}333 self._commit_id_to_branch_names = defaultdict(list)334 for branch_name in branch_names:335 branch = self.get_branch(branch_name, ensure=True)336 commit_id = branch.commit_id337 self._branch_name_to_commit_ids[branch_name] = commit_id338 self._commit_id_to_branch_names[commit_id].append(branch_name)339 head_commit_id = self.get_head_commit_id()340 if head_commit_id is not None:341 self._branch_name_to_commit_ids[head_ref_name] = head_commit_id342 self._commit_id_to_branch_names[head_commit_id].append(head_ref_name)343 ''' IO '''344 def _read_paths(self):345 root_rit_dir = os.path.realpath(self.root_rit_dir)346 rit_dir = os.path.join(root_rit_dir, rit_dir_name)347 last_root_rit_dir = None348 while not os.path.isdir(rit_dir):349 last_root_rit_dir = root_rit_dir350 root_rit_dir = os.path.dirname(root_rit_dir)351 if last_root_rit_dir == root_rit_dir:352 raise RitError("Unable to locate rit directory")353 rit_dir = os.path.join(root_rit_dir, rit_dir_name)354 return RitPaths.build_rit_paths(root_rit_dir)355 def _read_head(self):356 try:357 with open(os.path.join(self.paths.rit_dir, head_ref_name)) as fin:358 return HeadNode(**json.load(fin))359 except FileNotFoundError:360 return HeadNode(None, default_branch_name)361 def _read_commit(self, commit_id: str):362 logger.debug("Reading commit: %s", commit_id)363 with open(os.path.join(self.paths.commits, commit_id)) as fin:364 return Commit(**dict(**json.load(fin), commit_id=commit_id))365 def _write_commit(self, commit: Commit):366 logger.debug("Writing commit: %s", commit.commit_id)367 with open(os.path.join(self.paths.commits, commit.commit_id), 'w') as fout:368 data = asdict(commit)369 del data['commit_id']370 json.dump(data, fout)371 def _delete_commit(self, commit_id: str):372 try:373 os.remove(os.path.join(self.paths.commits, commit_id))374 except FileNotFoundError:375 raise RitError("Failed to remove commit since it didn't exist: %s", commit_id)376 try:377 os.remove(get_tar_path(self, commit_id))378 except FileNotFoundError:379 raise RitError("Failed to remove commit's tar since it didn't exist: %s", commit_id)380 try:381 os.remove(get_snar_path(self, commit_id))382 except FileNotFoundError:383 raise RitError("Failed to remove commit's snar since it didn't exist: %s", commit_id)384 def _read_branch(self, name: str):385 logger.debug("Reading branch: %s", name)386 with open(os.path.join(self.paths.branches, name)) as fin:387 branch = json.load(fin)388 return Branch(**dict(**branch, name=name))389 def _write_branch(self, branch: Branch):390 logger.debug("Writing branch %s to %s", branch.name, branch.commit_id)391 if self.is_commit(branch.name):392 raise RitError('Not creating a branch with the same name as a commit id: %s', branch.name)393 data = asdict(branch)394 del data['name']395 with open(os.path.join(self.paths.branches, branch.name), 'w') as fout:396 json.dump(data, fout)397 def _read_branch_names(self):398 for _, _, branch_names in os.walk(self.paths.branches):399 return branch_names400 return []401 def _read_commit_ids(self):402 for _, _, commit_ids in os.walk(self.paths.commits):403 return commit_ids404 return []405 def _write_head(self, head: HeadNode):406 with open(os.path.join(self.paths.rit_dir, head_ref_name), 'w') as fout:407 json.dump(asdict(head), fout)408''' UTIL '''409def colorize(color: int, msg: str):410 reset_seq = "\033[0m"411 color_seq = "\033[1;{}m"412 color_section = color_seq + "{}" + reset_seq413 return color_section.format(color, msg)414def mkdir(*args, exists_ok=False, **kwargs):415 try:416 os.mkdir(*args, **kwargs)417 except FileExistsError:418 if not exists_ok:419 raise420def none_t():421 def none_type(obj):422 return obj is None423 return none_type424def exact_t(*types):425 def exact_type(obj):426 return isinstance(obj, types)427 return exact_type428def optional_t(obj_t):429 def optional_type(obj):430 if obj is None:431 return True432 else:433 return obj_t(obj)434 return optional_type435def list_t(obj_t):436 def list_type(obj):437 if isinstance(obj, list):438 return all(obj_t(val) for val in obj)439 return False440 return list_type441def check_types(**type_defs):442 for name, (obj, type_def) in type_defs.items():443 if not type_def(obj):444 raise TypeError(f"Element had invalid type: {name}: {type(obj)}")445def check_obj_types(obj, type_defs):446 objs = {}447 for key, type_def in type_defs.items():448 objs[key] = (getattr(obj, key), type_def)449def require(statement, msg, *args):450 if not statement:451 raise RitError(msg, *args)452''' RIT DIR HELPERS '''453def get_tar_path(rit: RitResource, commit_id: str):454 ''' get a commit's tar path, where the backup for that commit is '''455 return os.path.join(rit.paths.backups, commit_id + '.tar')456def get_snar_path(rit: RitResource, commit_id: str):457 ''' get a commit's star path, where the backup's metadata for that commit is '''458 return os.path.join(rit.paths.backups, commit_id + '.snar')459''' COMMIT HELPERS '''460def hash_commit(create_time: float, msg: str, snar: str, tar: str):461 ''' create a commit_id from '''462 logger.debug("Calculating the hash of ref")463 ref_hash = hashlib.sha1()464 ref_hash.update(b'create_time')465 ref_hash.update(str(create_time).encode('utf-8'))466 ref_hash.update(b'msg')467 ref_hash.update(msg.encode('utf-8'))468 ref_hash.update(b'snar')469 with open(snar, 'rb') as fin:470 ref_hash.update(fin.read())471 ref_hash.update(b'tar')472 with open(tar, 'rb') as fin:473 ref_hash.update(fin.read())474 return ref_hash.hexdigest()475def check_tar():476 ''' verify tar is the correct version and GNU '''477 logger.debug("Checking tar version")478 process = subprocess.Popen(['tar', '--version'], stdout=subprocess.PIPE)479 contents = process.stdout.read()480 process.wait()481 version = contents.decode('utf-8').split('\n', 1)[0]482 logger.debug("Tar Version: %s", version)483 assert 'GNU tar' in version, "You must have a GNU tar installed"484def status_tar(rit: RitResource, verbose: bool):485 ''' returns True if rit directory is dirty '''486 parent_commit_id = rit.get_head_commit_id()487 work_snar = os.path.join(rit.paths.work, 'ref.snar')488 if parent_commit_id is not None:489 head_snar = get_snar_path(rit, parent_commit_id)490 # TODO: move into rit resource491 shutil.copyfile(head_snar, work_snar)492 check_tar()493 tar_cmd = ['tar', '-cvg', work_snar, f'--exclude={rit_dir_name}', '-f', os.devnull, '.']494 logger.debug("Running tar command: %s", tar_cmd)495 # TODO: move into rit resource496 process = subprocess.Popen(tar_cmd, cwd=rit.paths.root, stdout=subprocess.PIPE)497 terminated = False498 dirty = False499 while True:500 line = process.stdout.readline()501 if not line:502 break503 if line == b'./\n':504 continue505 dirty = True506 if verbose:507 output = line.decode('utf-8').strip()508 logger.info("\t- %s", colorize(fg + red, output))509 else:510 terminated = True511 try:512 process.terminate()513 except Exception:514 pass515 break516 # still need to read the stdout to prevent blocking and therefore deadlock517 if terminated:518 while process.stdout.read(DEFAULT_BUFFER_SIZE):519 pass520 exit_code = process.wait()521 if not terminated and exit_code != 0:522 raise RitError("Creating commit's tar failed with exit code: %d", exit_code)523 os.remove(work_snar)524 return dirty525def create_commit(rit: RitResource, create_time: float, msg: str):526 ''' create a commit with the current head as the parent commit (if any) '''527 parent_commit_id = rit.get_head_commit_id()528 logger.debug("Parent ref: %s", parent_commit_id)529 work_tar = os.path.join(rit.paths.work, 'ref.tar')530 logger.debug("Working tar: %s", work_tar)531 work_snar = os.path.join(rit.paths.work, 'ref.snar')532 logger.debug("Working snar: %s", work_snar)533 if parent_commit_id is not None:534 head_snar = get_snar_path(rit, parent_commit_id)535 logger.debug("Copying previous snar: %s", head_snar)536 # TODO: move into rit resource537 shutil.copyfile(head_snar, work_snar)538 else:539 logger.debug("Using fresh snar file since no parent commit")540 check_tar()541 opts = '-cz'542 if logger.getEffectiveLevel() <= logging.DEBUG:543 opts += 'v'544 opts += 'g'545 tar_cmd = ['tar', opts, work_snar, f'--exclude={rit_dir_name}', '-f', work_tar, '.']546 logger.debug("Running tar command: %s", tar_cmd)547 # TODO: move into rit resource548 process = subprocess.Popen(tar_cmd, cwd=rit.paths.root)549 # TODO: doesn't forward SIGTERM, only SIGINT550 exit_code = process.wait()551 if exit_code != 0:552 raise RitError("Creating commit's tar failed with exit code: %d", exit_code)553 commit_id = hash_commit(create_time, msg, work_snar, work_tar)554 logger.debug("Moving working snar into backups directory")555 snar = get_snar_path(rit, commit_id)556 os.rename(work_snar, snar)557 logger.debug("Moving working tar into backups directory")558 tar = get_tar_path(rit, commit_id)559 os.rename(work_tar, tar)560 commit = Commit(parent_commit_id, commit_id, create_time, msg)561 rit.add_commit(commit)562 if rit.head.commit_id is not None:563 new_head = HeadNode(commit_id=commit_id, branch_name=None)564 rit.set_head(new_head)565 else:566 rit.set_branch(Branch(rit.head.branch_name, commit_id))567 return commit568''' RESET HELPERS '''569def apply_commit(rit: RitResource, commit: Commit):570 '''571 apply the commit to the rit directory572 If the current rit directory isn't clean and isn't the parent of the commit573 being applied, the results will not be the contents of commit.574 See restore_to_commit.575 '''576 logger.info("Applying commit: %s", commit.commit_id)577 tar_file = get_tar_path(rit, commit.commit_id)578 tar_cmd = ['tar', '-xg', os.devnull, '-f', tar_file]579 # rit resource thing?580 process = subprocess.Popen(tar_cmd, cwd=rit.paths.root)581 exit_code = process.wait()582 if exit_code != 0:583 raise RitError("Failed while trying to apply commit: %s", commit.commit_id)584def restore_to_commit(rit: RitResource, commit: Commit):585 '''586 This gets the chain of commits from commit to root and applies them. If there587 are changes in the working directory relative to current head, then those will588 be destroyed.589 '''590 logger.debug('resetting to %s', commit.commit_id)591 commit_chain = [commit]592 while commit.parent_commit_id is not None:593 commit = rit.get_commit(commit.parent_commit_id, ensure=True)594 commit_chain.append(commit)595 commit_chain.reverse()596 for commit in commit_chain:597 apply_commit(rit, commit)598''' BRANCH HELPERS '''599@dataclass600class ResolvedRef:601 '''602 the user provides a ref, which can reference head, a branch or commit. this603 object contains the HeadNode, Branch and Commit that the user's ref is604 referring to. all 3 or None can be defined.605 see resolve_ref606 '''607 commit: Optional[Commit] = None608 '''609 if None:610 if head:611 head points to a branch with no commit612 else:613 ref doesn't refer to a branch or commit614 else:615 ref ultimately refers to this commit616 '''617 branch: Optional[Branch] = None618 '''619 if head is None620 if branch is None:621 ref doesn't refer to a branch622 else:623 ref points to this branch624 else:625 if branch is None:626 head points to a commit.627 head points to a branch with no commit.628 else:629 head points to this branch630 '''631 head: Optional[HeadNode] = None632 '''633 if head is None:634 ref was provided and not the head635 else:636 ref was omitted or explicitly set to the head637 '''638def resolve_commit(rit: RitResource, partial_commit_id: str):639 '''640 resolve a user provided commit id to a commit. if no commit is found, then641 return None. if the ref is an ambiguous shortened commit id, then this642 function raises an exception.643 '''644 logger.debug("Resolving commit: %s", partial_commit_id)645 commit = rit.get_commit(partial_commit_id)646 if commit is not None:647 return commit648 if len(partial_commit_id) < short_hash_index:649 return None650 short_commit_id = partial_commit_id[:short_hash_index]651 commit_tree = rit.get_commit_tree()652 if short_commit_id not in commit_tree:653 return None654 commit = None655 for commit_id in commit_tree[short_commit_id]:656 size = len(partial_commit_id)657 if partial_commit_id == commit_id[:size]:658 if commit is not None:659 raise RitError("Reference %s matched commits %s and %s", partial_commit_id, commit.commit_id, commit_id)660 commit = rit.get_commit(commit_id, ensure=True)661 return commit662def resolve_ref(rit: RitResource, ref: Optional[str]):663 '''664 resolve a user provided reference665 if ref is None, then ref refers to the current head. ref can also explicitly666 refer to the current head. ref otherwise refers to a branch. if the branch is667 not found, then the ref refers to a commit. if no commit is found, all 3668 fields of ResolvedRef will be None.669 if the ref is an ambiguous shortened commit id, then this function raises an670 exception.671 see the def of ResolvedRef672 '''673 logger.debug("Resolving ref: %s", ref)674 res = ResolvedRef()675 if ref is None or ref == head_ref_name:676 head = rit.head677 res.head = head678 if head.branch_name is not None:679 res.branch = rit.get_branch(head.branch_name)680 if res.branch is not None:681 res.commit = rit.get_commit(res.branch.commit_id)682 else:683 res.commit = rit.get_commit(head.commit_id)684 else:685 res.branch = rit.get_branch(ref)686 if res.branch is not None:687 res.commit = rit.get_commit(res.branch.commit_id)688 else:689 res.commit = resolve_commit(rit, ref)690 return res691def resolve_refs(rit: RitResource, refs: list[str], all: bool):692 '''693 Returns information regarding the provided refs694 '''695 resolved_refs: list[ResolvedRef] = []696 if not refs:697 refs.append(None)698 if all:699 refs.extend(rit.get_branch_names())700 for ref in refs:701 res = resolve_ref(rit, ref)702 resolved_refs.append(res)703 return resolved_refs704branch_name_re = re.compile('^\\w+$')705def validate_branch_name(name: str):706 ''' return whether this string is a valid branch name '''707 if name == head_ref_name:708 raise RitError("Branch can't be named the same as the head ref: %s", name)709 elif branch_name_re.search(name) is None:710 raise RitError("Invalid branch name: %s", name)711def _pprint_dur(dur: int, name: str):712 return f"{dur} {name}{'s' if dur > 1 else ''}"713def pprint_time_duration(start: float, end: float):714 ''' pretty print a time duration '''715 start_dt = datetime.datetime.fromtimestamp(start)716 end_dt = datetime.datetime.fromtimestamp(end)717 dur = end - start718 dur_sec = dur719 dur_min = dur / 60720 dur_hour = dur_min / 60721 dur_day = dur_hour / 24722 dur_month = 12 * (end_dt.year - start_dt.year) + (end_dt.month - start_dt.month)723 dur_year = dur_month // 12724 parts = []725 if dur_year >= 5:726 parts.append(_pprint_dur(int(dur_year), 'year'))727 elif dur_year >= 1:728 parts.append(_pprint_dur(int(dur_year), 'year'))729 parts.append(_pprint_dur(int(dur_month) % 12, 'month'))730 elif dur_month >= 1:731 parts.append(_pprint_dur(int(dur_month) % 12, 'month'))732 elif dur_day >= 1:733 parts.append(_pprint_dur(int(dur_day), 'day'))734 elif dur_hour >= 1:735 parts.append(_pprint_dur(int(dur_hour) % 60, 'hour'))736 elif dur_min >= 1:737 parts.append(_pprint_dur(int(dur_min) % 60, 'minute'))738 elif dur_sec >= 20:739 parts.append(_pprint_dur(int(dur_sec) % 60, 'second'))740 else:741 return 'Just now'742 return ', '.join(parts) + ' ago'743''' SUB LOG COMMANDS '''744def log_commits(rit: RitResource, commits: list[Commit]):745 '''746 Returns tuple of the following:747 - commit_graph: a tree containing all provided commits748 - leafs: the set of leaf notes of the tree749 - commit_id_to_commit: a map of each commit to a full Commit object750 - commit_id_to_branch_names: a map of each commit_id to branch names, including the head_ref_name751 '''752 leafs: set[str] = set()753 commit_graph: dict[str, str] = {}754 for commit in commits:755 if commit.commit_id not in commit_graph:756 leafs.add(commit.commit_id)757 while True:758 commit_graph[commit.commit_id] = commit.parent_commit_id759 if commit.parent_commit_id is None:760 break761 parent_commit = rit.get_commit(commit.parent_commit_id, ensure=True)762 if parent_commit.commit_id in leafs:763 leafs.remove(parent_commit.commit_id)764 commit = parent_commit765 now = time.time()766 commit_id_to_branch_names = rit.get_commit_id_to_branch_names()767 commit_id_to_commit: dict[str, Commit] = {}768 for commit_id in leafs:769 logger.info("Log branch from %s", commit_id[:short_hash_index])770 while commit_id is not None:771 if commit_id not in commit_id_to_commit:772 commit_id_to_commit[commit_id] = rit.get_commit(commit_id, ensure=True)773 commit = commit_id_to_commit[commit_id]774 colored_commit_id = colorize(fg + yellow, commit.commit_id[:short_hash_index])775 if commit.commit_id in commit_id_to_branch_names:776 branch_names = commit_id_to_branch_names[commit.commit_id]777 colored_branch_names = []778 for branch_name in branch_names:779 if branch_name == head_ref_name:780 colored_branch_names.append(colorize(fg + blue, branch_name))781 else:782 colored_branch_names.append(colorize(fg + green, branch_name))783 branch_details = f"({', '.join(colored_branch_names)}) "784 else:785 branch_details = ''786 time_duration = pprint_time_duration(commit.create_time, now)787 date_details = f'({time_duration}) '788 logger.info("* %s %s%s%s", colored_commit_id, date_details, branch_details, commit.msg)789 commit_id = commit_graph[commit_id]790 return commit_graph, leafs, commit_id_to_commit, commit_id_to_branch_names791''' SUB BRANCH COMMANDS '''792def delete_branch(rit: RitResource, name: str):793 ''' removes a branch '''794 try:795 os.remove(os.path.join(rit.paths.branches, name))796 except FileNotFoundError:797 raise RitError("Failed to remove branch since it didn't exist.")798def list_branches(rit: RitResource):799 ''' logs branches to logger '''800 head = rit.head801 head_branch_name = head.branch_name802 branch_names = rit.get_branch_names()803 # TODO: this doesn't handle HEAD well if its commitless804 for branch_name in branch_names:805 this_sym = '*' if branch_name == head_branch_name else ' '806 branch = rit.get_branch(branch_name, ensure=True)807 commit = rit.get_commit(branch.commit_id, ensure=True)808 colored_commit_id = colorize(fg + yellow, branch.commit_id[:short_hash_index])809 colored_branch_name = colorize(fg + green, branch_name)810 logger.info("%s %s\t%s %s", this_sym, colored_branch_name, colored_commit_id, commit.msg)811 return head, branch_names812def create_branch(rit: RitResource, name: str, ref: Optional[str], force: bool):813 '''814 creates a branch with name name at ref ref. if the branch already exists,815 force will move the branch to the new commit.816 '''817 if rit.is_branch(name) and not force:818 raise RitError('Branch already exists: %s. Use -f to force the overwrite of it.', name)819 res = resolve_ref(rit, ref)820 if res.commit is None:821 if res.head is not None:822 raise RitError("Current head doesn't have a commit")823 else:824 raise RitError("Unable to resolve ref: %s", ref)825 commit_id = res.commit.commit_id826 branch = Branch(name, commit_id)827 rit.set_branch(branch)828 logger.info("Created branch %s at %s", name, commit_id[:short_hash_index])829 return res830def log_refs(rit: RitResource, refs: list[str], all: bool, full: bool):831 '''832 Returns the commit tree containing the given refs. If none are given, assumes833 head. If all is true, appends refs.834 '''835 commits = []836 if not refs:837 refs.append(None)838 if all:839 refs.extend(rit.get_branch_names())840 for ref in refs:841 res = resolve_ref(rit, ref)842 if res.commit is None:843 if res.head is not None:844 # TODO: this doesn't handle HEAD well if its commitless845 raise RitError("head branch doesn't have any commits")846 else:847 raise RitError("Unable to locate ref: %s", ref)848 commits.append(res.commit)849 return log_commits(rit, commits)850def show_ref(rit: RitResource, ref: Optional[str]):851 ''' log the contents of a specific reference '''852 res = resolve_ref(rit, ref)853 if res.commit is None:854 if res.head is not None:855 raise RitError("head branch doesn't have any commits to show")856 else:857 raise RitError("Unable to locate ref: %s", ref)858 tar_file = get_tar_path(rit, res.commit.commit_id)859 tar_cmd = ['tar', '-tf', tar_file]860 process = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE)861 changes = []862 while True:863 line = process.stdout.readline()864 if not line:865 break866 elif line == b'./\n':867 continue868 output = line.decode('utf-8').strip()869 changes.append(output)870 logger.info("\t- %s", colorize(fg + cyan, output))871 results = process.wait()872 if results != 0:873 raise RitError("tar command failed with exit code %d", results)874 return res, changes875def status_head(rit: RitResource):876 ''' run status_tar on HEAD with logging '''877 if rit.head.branch_name is not None:878 head_id = rit.head.branch_name879 else:880 head_id = rit.head.commit_id881 logger.info("%s -> %s", head_ref_name, head_id)882 if not status_tar(rit, True):883 logger.info("Clean working directory!")884def prune_commits(rit: RitResource):885 ''' remove any commits not associated to a branch history '''886 return rit.prune()887def checkout_ref(*, root_rit_dir: str, ref: str, force: bool):888 '''889 Checkout ref (overwriting any changes if force is True).890 It will also check that the current working directory is clean so that we know891 no data is lost. If the working tree is dirty, then force needs to be set to892 True to forcibly delete these changes.893 Returns ref resolved, as a ResolvedRef.894 The end result is that head points to ref, and the working directory is895 restored to the point of ref.896 It is the equivalent of the following:897 - if head is a branch, set head to the underlying commit898 - hard reset to ref899 - set head to ref900 Or if you allow head to be moved to non equivalent commits:901 - set head to ref902 - hard reset to new head903 '''904 logger.debug('checkout_ref')905 logger.debug(' ref: %s', ref)906 logger.debug(' force: %s', force)907 check_types(908 ref = (ref, exact_t(str)),909 force = (force, exact_t(bool)),910 )911 rit = RitResource(root_rit_dir)912 res = resolve_ref(rit, ref)913 if res.head is not None:914 raise RitError("Attempted to checkout head ref")915 elif res.commit is None:916 raise RitError("Unable to resolve ref to commit: %s", ref)917 commit = res.commit918 restored = False919 head_commit_id = rit.get_head_commit_id()920 if head_commit_id is not None and head_commit_id != commit.commit_id:921 if not force and status_tar(rit, False):922 raise RitError("Uncommitted changes! Commit them or use -f to destroy them.")923 restored = True924 restore_to_commit(rit, commit)925 if res.branch is not None:926 new_head = HeadNode(branch_name = res.branch.name)927 else:928 new_head = HeadNode(commit_id = commit.commit_id)929 rit.set_head(new_head)930 if restored:931 logger.info("Successful checkout. Commit this checkout to get a clean rit status.")932 else:933 logger.info("Switched to new ref")934 return res935def checkout_orphan(*,936 root_rit_dir: str,937 name: str):938 '''939 Move head to point at branch with name name. Ensures that name doesn't already940 exist. This results in the head not pointing to any commit. Next commit will941 create a new branch.942 '''943 logger.debug('checkout_orphan')944 logger.debug(' name: %s', name)945 check_types(946 name = (name, optional_t(exact_t(str))),947 )948 validate_branch_name(name)949 rit = RitResource(root_rit_dir)950 if rit.is_branch(name):951 raise RitError("Orphan checkout failed because branch already exists.")952 new_head = HeadNode(branch_name=name)953 rit.set_head(new_head)954'''955API956You should be able to do anything you want with these functions.957'''958def query_cmd(*, root_rit_dir: str):959 '''960 Return a read only RitResource used for querying the rit directory.961 CLI users get information via stdout. That's not helpful to for python users,962 so they instead get access to the full data structure.963 For advanced users who want write access to the rit directory, construct a964 RitResource directly with prevent_mutations set to False and read the class'965 docs.966 '''967 return RitResource(root_rit_dir, prevent_mutations=True)968def init_cmd(*, root_rit_dir: str):969 logger.debug("init")970 rit = RitResource(root_rit_dir)971 rit.initialize()972def commit_cmd(*, root_rit_dir: str, msg: str):973 logger.debug('commit')974 logger.debug(' msg: %s', msg)975 check_types(976 msg = (msg, exact_t(str)),977 )978 rit = RitResource(root_rit_dir)979 commit = create_commit(rit, time.time(), msg)980 logger.info("Created commit %s: %s", commit.commit_id[:short_hash_index], commit.msg)981 return commit982def reset_cmd(*, root_rit_dir: str, ref: Optional[str], hard: bool):983 '''984 A reset tries to move the head to ref. If head is a branch, it instead moves985 the branch. If head is a commit, it just moves head to the new commit. If it's986 a hard reset, then post moving head, the working changes are removed.987 Here, a non hard reset is effectively the same as git's reset with no988 arguments. In git, a reset with no args is --mixed, and since we don't have989 staged changes, --mixed and --soft are the same in this context. For that990 reason, here, we just use the terminology reset instead of soft reset or mixed991 reset.992 '''993 logger.debug('reset')994 logger.debug(' ref: %s', ref)995 logger.debug(' hard: %s', hard)996 check_types(997 ref = (ref, optional_t(exact_t(str))),998 hard = (hard, exact_t(bool)),999 )1000 rit = RitResource(root_rit_dir)1001 res = resolve_ref(rit, ref)1002 if res.commit is None:1003 raise RitError("Unable to resolve ref to commit: %s", ref)1004 if rit.head.branch_name is not None:1005 branch = rit.get_branch(rit.head.branch_name, ensure=False)1006 if branch is None:1007 branch = Branch(rit.head.branch_name, res.commit.commit_id)1008 else:1009 branch.commit_id = res.commit.commit_id1010 rit.set_branch(branch)1011 else:1012 new_head = HeadNode(commit_id = res.commit.commit_id)1013 rit.set_head(new_head)1014 restored = False1015 if hard:1016 restored = True1017 restore_to_commit(rit, res.commit)1018 if restored:1019 logger.info("Successful reset. Commit this checkout to get a clean rit status.")1020 else:1021 logger.info("Successful reset")1022 return res1023def checkout_cmd(*, root_rit_dir: str, orphan: bool, ref_or_name: str, force: Optional[bool]):1024 '''1025 Forwards to checkout_ref or checkout_orphan1026 '''1027 logger.debug('checkout')1028 logger.debug(' orphan: %s', orphan)1029 logger.debug(' ref_or_name: %s', ref_or_name)1030 logger.debug(' force: %s', force)1031 check_types(...

Full Screen

Full Screen

test_rit.py

Source:test_rit.py Github

copy

Full Screen

...238 assert new_commit.parent_commit_id is None239 # get rit res240 rit_res = query_cmd(**base_kwargs)241 # verify commit id matches what was returned242 new_commit_lookup = rit_res.get_commit(new_commit.commit_id, ensure=True)243 assert new_commit_lookup.commit_id is not None244 assert new_commit_lookup.commit_id == new_commit.commit_id245 assert new_commit_lookup.parent_commit_id == new_commit.parent_commit_id246 assert new_commit_lookup.create_time == new_commit.create_time247 assert new_commit_lookup.msg == new_commit.msg248 root_commit_id = new_commit_lookup.commit_id249 # verify that head is still on branch250 assert rit_res.head.branch_name == 'otest'251 assert rit_res.head.commit_id is None252 # verify that branch matches claimed commit253 new_branch = rit_res.get_branch('otest')254 assert new_branch is not None255 assert new_branch.commit_id == new_commit.commit_id256 # add files257 branch_cmd(**base_kwargs, name="otest_root", ref=None, force=False, delete=False)258 otest_a_file = touch(root_rit_dir, 'otest_a')259 commit_cmd(**base_kwargs, msg="add a")260 branch_cmd(**base_kwargs, name="otest_a", ref=None, force=False, delete=False)261 otest_b_file = touch(root_rit_dir, 'otest_b')262 top_commit = commit_cmd(**base_kwargs, msg="add b")263 branch_cmd(**base_kwargs, name="otest_b", ref=None, force=False, delete=False)264 otest_c_file = touch(root_rit_dir, 'otest_c')265 rit_res = query_cmd(**base_kwargs)266 assert rit_res.head.branch_name == 'otest'267 otest_root_branch = rit_res.get_branch('otest_root')268 assert otest_root_branch is not None269 otest_a_branch = rit_res.get_branch('otest_a')270 assert otest_a_branch is not None271 otest_b_branch = rit_res.get_branch('otest_b')272 assert otest_b_branch is not None273 otest_branch = rit_res.get_branch('otest')274 assert otest_branch is not None275 assert otest_branch.commit_id == otest_b_branch.commit_id276 assert top_commit.commit_id == otest_branch.commit_id277 assert top_commit.parent_commit_id == otest_a_branch.commit_id278 assert root_commit_id == rit_res.get_commit(otest_a_branch.commit_id, ensure=True).parent_commit_id279 commit_chain = [root_commit_id, top_commit.parent_commit_id, top_commit.commit_id]280 # make orphan and make sure changes are still there281 checkout_cmd(**base_kwargs, orphan=True, ref_or_name='otest_2', force=None)282 assert os.path.exists(otest_a_file)283 assert os.path.exists(otest_b_file)284 assert os.path.exists(otest_c_file)285 checkout_cmd(**base_kwargs, orphan=True, ref_or_name='otest_3', force=None)286 assert os.path.exists(otest_a_file)287 assert os.path.exists(otest_b_file)288 assert os.path.exists(otest_c_file)289 # get rit res290 rit_res = query_cmd(**base_kwargs)291 assert rit_res.get_branch('otest_2') is None292 assert rit_res.get_branch('otest_3') is None293 assert rit_res.head.branch_name == 'otest_3'294 reset_cmd(**base_kwargs, ref=commit_chain[0], hard=False)295 rit_res = query_cmd(**base_kwargs)296 assert rit_res.head.branch_name == 'otest_3'297 assert rit_res.head.commit_id is None298 this_branch = rit_res.get_branch('otest_3', ensure=True)299 assert this_branch is not None300 assert this_branch.commit_id == commit_chain[0]301 assert os.path.exists(otest_a_file)302 assert os.path.exists(otest_b_file)303 assert os.path.exists(otest_c_file)304 reset_cmd(**base_kwargs, ref=None, hard=False)305 rit_res = query_cmd(**base_kwargs)306 assert rit_res.get_branch('otest_2') is None307 assert rit_res.head.branch_name == 'otest_3'308 reset_cmd(**base_kwargs, ref=rit_lib.head_ref_name, hard=False)309 rit_res = query_cmd(**base_kwargs)310 assert rit_res.get_branch('otest_2') is None311 assert rit_res.head.branch_name == 'otest_3'312 reset_cmd(**base_kwargs, ref=commit_chain[1], hard=False)313 rit_res = query_cmd(**base_kwargs)314 assert rit_res.head.branch_name == 'otest_3'315 assert rit_res.head.commit_id is None316 this_branch = rit_res.get_branch('otest_3', ensure=True)317 assert this_branch is not None318 assert this_branch.commit_id == commit_chain[1]319 assert os.path.exists(otest_a_file)320 assert os.path.exists(otest_b_file)321 assert os.path.exists(otest_c_file)322 reset_cmd(**base_kwargs, ref=None, hard=False)323 rit_res = query_cmd(**base_kwargs)324 assert rit_res.head.branch_name == 'otest_3'325 reset_cmd(**base_kwargs, ref=rit_lib.head_ref_name, hard=False)326 rit_res = query_cmd(**base_kwargs)327 assert rit_res.head.branch_name == 'otest_3'328 reset_cmd(**base_kwargs, ref=commit_chain[2], hard=False)329 rit_res = query_cmd(**base_kwargs)330 assert rit_res.head.branch_name == 'otest_3'331 assert rit_res.head.commit_id is None332 this_branch = rit_res.get_branch('otest_3', ensure=True)333 assert this_branch is not None334 assert this_branch.commit_id == commit_chain[2]335 assert os.path.exists(otest_a_file)336 assert os.path.exists(otest_b_file)337 assert os.path.exists(otest_c_file)338 checkout_cmd(**base_kwargs, orphan=False, ref_or_name=commit_chain[2], force=False)339 rit_res = query_cmd(**base_kwargs)340 assert rit_res.head.branch_name is None341 assert rit_res.head.commit_id == commit_chain[2]342 reset_cmd(**base_kwargs, ref=commit_chain[0], hard=False)343 rit_res = query_cmd(**base_kwargs)344 assert rit_res.head.branch_name is None345 assert rit_res.head.commit_id == commit_chain[0]346 assert os.path.exists(otest_a_file)347 assert os.path.exists(otest_b_file)348 assert os.path.exists(otest_c_file)349 reset_cmd(**base_kwargs, ref=None, hard=False)350 rit_res = query_cmd(**base_kwargs)351 assert rit_res.head.commit_id == commit_chain[0]352 reset_cmd(**base_kwargs, ref=rit_lib.head_ref_name, hard=False)353 rit_res = query_cmd(**base_kwargs)354 assert rit_res.head.commit_id == commit_chain[0]355 reset_cmd(**base_kwargs, ref=commit_chain[2], hard=False)356 rit_res = query_cmd(**base_kwargs)357 assert rit_res.head.branch_name is None358 assert rit_res.head.commit_id == commit_chain[2]359 assert os.path.exists(otest_a_file)360 assert os.path.exists(otest_b_file)361 assert os.path.exists(otest_c_file)362 reset_cmd(**base_kwargs, ref=commit_chain[0], hard=False)363 rit_res = query_cmd(**base_kwargs)364 assert rit_res.head.branch_name is None365 assert rit_res.head.commit_id == commit_chain[0]366 assert os.path.exists(otest_a_file)367 assert os.path.exists(otest_b_file)368 assert os.path.exists(otest_c_file)369 reset_cmd(**base_kwargs, ref=commit_chain[2], hard=True)370 rit_res = query_cmd(**base_kwargs)371 assert rit_res.head.branch_name is None372 assert rit_res.head.commit_id == commit_chain[2]373 assert os.path.exists(otest_a_file)374 assert os.path.exists(otest_b_file)375 assert not os.path.exists(otest_c_file)376 reset_cmd(**base_kwargs, ref=commit_chain[1], hard=True)377 rit_res = query_cmd(**base_kwargs)378 assert rit_res.head.branch_name is None379 assert rit_res.head.commit_id == commit_chain[1]380 assert os.path.exists(otest_a_file)381 assert not os.path.exists(otest_b_file)382 assert not os.path.exists(otest_c_file)383 reset_cmd(**base_kwargs, ref=commit_chain[0], hard=True)384 rit_res = query_cmd(**base_kwargs)385 assert rit_res.head.branch_name is None386 assert rit_res.head.commit_id == commit_chain[0]387 assert not os.path.exists(otest_a_file)388 assert not os.path.exists(otest_b_file)389 assert not os.path.exists(otest_c_file)390 reset_cmd(**base_kwargs, ref=commit_chain[1], hard=True)391 rit_res = query_cmd(**base_kwargs)392 assert rit_res.head.branch_name is None393 assert rit_res.head.commit_id == commit_chain[1]394 assert os.path.exists(otest_a_file)395 assert not os.path.exists(otest_b_file)396 assert not os.path.exists(otest_c_file)397 reset_cmd(**base_kwargs, ref=commit_chain[2], hard=True)398 rit_res = query_cmd(**base_kwargs)399 assert rit_res.head.branch_name is None400 assert rit_res.head.commit_id == commit_chain[2]401 assert os.path.exists(otest_a_file)402 assert os.path.exists(otest_b_file)403 assert not os.path.exists(otest_c_file)404 checkout_cmd(**base_kwargs, orphan=False, ref_or_name='otest_3', force=True)405 reset_cmd(**base_kwargs, ref=commit_chain[2], hard=True)406 rit_res = query_cmd(**base_kwargs)407 assert rit_res.head.branch_name == 'otest_3'408 assert rit_res.head.commit_id is None409 assert os.path.exists(otest_a_file)410 assert os.path.exists(otest_b_file)411 assert not os.path.exists(otest_c_file)412 reset_cmd(**base_kwargs, ref=commit_chain[1], hard=True)413 rit_res = query_cmd(**base_kwargs)414 assert rit_res.head.branch_name == 'otest_3'415 assert rit_res.head.commit_id is None416 assert os.path.exists(otest_a_file)417 assert not os.path.exists(otest_b_file)418 assert not os.path.exists(otest_c_file)419 reset_cmd(**base_kwargs, ref=commit_chain[0], hard=True)420 rit_res = query_cmd(**base_kwargs)421 assert rit_res.head.branch_name == 'otest_3'422 assert rit_res.head.commit_id is None423 assert not os.path.exists(otest_a_file)424 assert not os.path.exists(otest_b_file)425 assert not os.path.exists(otest_c_file)426 checkout_cmd(**base_kwargs, orphan=False, ref_or_name='deviate', force=True)427 rit_res = query_cmd(**base_kwargs)428 head_branch = rit_res.get_branch(rit_res.head.branch_name)429 head_commit_id = head_branch.commit_id430 head_commit = rit_res.get_commit(head_commit_id, ensure=True)431 reset_cmd(**base_kwargs, ref=head_commit.parent_commit_id, hard=False)432 rit_res = query_cmd(**base_kwargs)433 rit_res.get_commit(head_commit_id, ensure=True)434 prune_res = prune_cmd(**base_kwargs)435 rit_res = query_cmd(**base_kwargs)436 assert rit_res.get_commit(head_commit_id) is None437 assert len(prune_res) == 1438 assert prune_res[0] == head_commit_id439 rit_res = query_cmd(**base_kwargs)440 head_commit = rit_res.get_head_commit_id()441 checkout_cmd(**base_kwargs, orphan=False, ref_or_name=head_commit, force=True)442 reset_cmd(**base_kwargs, ref='deviate', hard=False)443 rit_res = query_cmd(**base_kwargs)444 assert rit_res.head.branch_name is None...

Full Screen

Full Screen

CA4_Python_Script.py

Source:CA4_Python_Script.py Github

copy

Full Screen

1# Name: Paul Prew2# Student Number: 103548283# Programming for Big Data4# CA 45# This is a python program that will execute to read in a sample text data file which is named 'changes_python.txt'. 6# This program works to scrub/clean the data, and differentiate the content into 422 separate objects, 7# which can be represented as rows on a spreadsheet.8# Where possible additional data was derived from the data e.g. week number was derived from the date. 9# The data was mostly extracted from the title row. 10# Additionally, for each row the number of files actioned for each revision number was calculated, and broken out by action type e.g. A, D, M, R.11# This extract provides a count of the total number of file changes, broken out by action type. 12# This also enables analysis on the number of file changed by author, and breakdown by action type.13# In the program the 'Commit' class is used to define the elements of the 'commit data' extracted from the file.14# Each of the 422 commits, are represented as instance objects of the 'Commit' class.15# Functions operate on these objects to perform analysis on the commit data, and return the result as dictionary files. 16# Dictionary files are used for the analysys outputs, as this facilitates export as CSV. 17# Visualisation of analysis was done using charts in R Studio, and Microsoft Excel.18# When the program executes, the user is asked to confirm the action, to begin data file read.19# If successful, a message printed to the console will acknowledge this, and also display the total number of commits in the extract 20# and the total number of lines in the file.21# The user is then asked to confirm next action, to perform analysis on the commit data. 22# The program will then execute the analysis and export the results to CSV files in the working directory.23# When above step is completed, a message printed to the console will confirm this.24# The user will now be asked if they wish to print the analysis results to the console. If the user selects this option25# the results are printed on screen.26sep = 72*'-'27import datetime28import csv29import os30# Below defines the class 'Commit' which is used to define the elements of the 'commit data' extracted from the file.31class Commit:32 'class for commits'33 34 def __init__(self, revision = None, author = None, fulldatetime = None, fulldate = None, dateinmonth = None, dayinweek = None, weeknumber = None, month = None, year = None,35 comment_line_count = None, number_changes = None, number_a = None, number_d = None, number_m = None, number_r = None, changes = None, comment = None):36 self.revision = revision37 self.author = author38 self.fulldatetime = fulldatetime39 self.fulldate = fulldate40 self.dateinmonth = dateinmonth41 self.dayinweek = dayinweek42 self.weeknumber = weeknumber43 self.month = month44 self.year = year45 self.comment_line_count = comment_line_count46 self.number_changes = number_changes47 self.changes = changes48 self.number_a = number_a49 self.number_d = number_d50 self.number_m = number_m51 self.number_r = number_r52 self.comment = comment53 54# Below is a generic function that will accept as parameter an attribute from the commit data55# This allows this function to be called multiple times with different parameter values 56# e.g. revisions by author, revisions by day.57def get_revisions_by_attribute(data, attribute):58 result = {}59 for commit in data:60 value = getattr(commit, attribute)61 item = value62 result[item] = result.get(item, 0) + 163 return result64# Below function will get the total number of file changes for all 422 commits,65# and break them out by file change action type i.e. A, D, M, R,66# and also by author 67def get_file_changes_by_action(data):68 all_authors = []69 for commit in data:70 new_author = {}71 author = commit.author72 match_found = False 73 if len(all_authors) > 0:74 for record in all_authors:75 if record['Author'] == author :76 record['Action Type A'] += commit.number_a77 record['Action Type D'] += commit.number_d78 record['Action Type M'] += commit.number_m79 record['Action Type R'] += commit.number_r80 record['Total Files Actioned'] += commit.number_changes81 match_found = True82 break83 84 if len(all_authors) == 0 or match_found == False : 85 new_author = {'Author': author, 'Action Type A': commit.number_a, 'Action Type D': commit.number_d, 'Action Type M': commit.number_m, 86 'Action Type R': commit.number_r, 'Total Files Actioned' : commit.number_changes } 87 all_authors.append(new_author)88 89 return all_authors90 91def read_file(changes_file):92 data = [line.strip() for line in open(changes_file, 'r')]93 return data 94def get_commits(data):95 commits = []96 current_commit = None97 index = 0 98 author = {}99 while True:100 try:101 current_commit = Commit()102 details = data[index + 1].split('|')103 current_commit.revision = int(details[0].strip().strip('r'))104 current_commit.author = details[1].strip()105 current_commit.fulldatetime = details[2].strip() 106 current_commit.dateinmonth = int(details[2][9:11])107 current_commit.month = int(details[2][6:8])108 current_commit.year = int(details[2][0:5])109 current_commit.fulldate = datetime.date(current_commit.year, current_commit.month, current_commit.dateinmonth)110 current_commit.weeknumber = current_commit.fulldate.strftime("%W")111 current_commit.dayinweek = details[2][28:31]112 current_commit.comment_line_count = int(details[3].strip().split(' ')[0])113 current_commit.changes = data[index+2:data.index('',index+1)]114 current_commit.number_changes = len(current_commit.changes)-1 115 current_commit.number_a = 0116 current_commit.number_d = 0117 current_commit.number_m = 0118 current_commit.number_r = 0119 for i in range(current_commit.number_changes) :120 if current_commit.changes[i + 1].startswith("A /") :121 current_commit.number_a = current_commit.number_a + 1122 elif current_commit.changes[i + 1].startswith("D /") :123 current_commit.number_d = current_commit.number_d + 1124 elif current_commit.changes[i + 1].startswith("M /") :125 current_commit.number_m = current_commit.number_m + 1126 elif current_commit.changes[i + 1].startswith("R /") :127 current_commit.number_r = current_commit.number_r + 1128 index = data.index(sep, index + 1)129 current_commit.comment = data[index-current_commit.comment_line_count:index]130 commits.append(current_commit)131 except IndexError:132 break133 return commits134# this function saves the commit list (class instances) to a CSV log file135def save_commit_list_as_csv (location): 136 with open(location,"wb") as data_export:137 fnameWriter = csv.writer(data_export)138 fnameWriter.writerow(["Revision", "Author", "Full Date/Time", "Full Date", "Date in Month", "Day in Week", "Week Number", "Month", "Year", "Comment Line Count", "Number Changes", "Number A", 139 "Number D", "Number M", "Number R", "Comment"])140 for commit in results:141 fnameWriter.writerow([commit.revision, commit.author, commit.fulldatetime, commit.fulldate, commit.dateinmonth, commit.dayinweek, commit.weeknumber, commit.month,142 commit.year, commit.comment_line_count, commit.number_changes, commit.number_a, commit.number_d, commit.number_m, commit.number_r,143 commit.comment])144 data_export.close()145# this function saves the analysis results to the console, and is used where the result to save, is formatted as a single dictionary146# the function accepts as parameters: data, and location, so can be used many times 147def save_dict_as_csv (data,location):148 with open(location,'wb') as output_file:149 w = csv.writer(output_file)150 w.writerows(data.items())151 output_file.close()152# this function saves the analysis results to a CSV log file, and is used where the result to save, is formatted as a list of dictionaries153def save_dict_list_as_csv(data, location) : 154 keys = data[0].keys()155 with open(location, 'wb') as output_file:156 dict_writer = csv.DictWriter(output_file, keys)157 dict_writer.writeheader()158 dict_writer.writerows(data) 159 output_file.close()160# this function prints the analysis results to the console, and is used where the result to print, is formatted as a single dictionary161# the function accepts as parameters: data, and location, so can be used many times 162def print_dict_to_console(data,title):163 print ''164 print title165 print "="*len(title)166 ls = list()167 for key, val in data.items():168 ls.append((val,key))169 ls.sort(reverse = True)170 for key, val in ls:171 print key, val172 return ls173# this function prints the analysis results to the console, and is used where the result to print, is formatted as a list of dictionaries174def print_dict_list_to_console(data, title): 175 print ''176 print title177 print "=" * len(title)178 for row in data:179 print row180def export_analysis_data_to_CSV():181 filetoCSV = authors182 save_dict_as_csv(filetoCSV,"Python_Export_Counts of Revisions By Author.csv") 183 filetoCSV = days184 save_dict_as_csv(filetoCSV,"Python Export_Counts of Revisions By Day in Week.csv") 185 filetoCSV = change_files186 save_dict_list_as_csv(filetoCSV,"Python Export_Counts of Files Changed By Action Type.csv") 187 188# main program 189if __name__ == '__main__': 190 191 os.system('cls')192 193 print '\n***************************************\n'194 print "Analysis for sample text file"195 print "Text file is 'changes_python.txt'"196 print '\n***************************************\n'197 while True:198 try: 199 s_inp = raw_input("Enter any key to read data file and get commits: ")200 data = read_file('changes_python.txt')201 results = get_commits(data)202 # following line saves the commit list with 422 rows to a csv which acts as a log file203 save_commit_list_as_csv("Python Export_Commit List.csv") 204 print "\nFile read was successful, and data has been extracted"205 print "{} rows of commits extracted from {} lines" .format(len(results), len(data))206 print "Results saved to csv log file in working directory"207 break208 209 except:210 print "\nError while reading file. Please try again."211 print "Ensure text file is saved as 'changes_python.txt and is saved in the working directory."212 continue 213 214 s_inp = raw_input("\nEnter any key to perform analysis on commit data: ")215 216 authors = get_revisions_by_attribute(results, "author")217 days = get_revisions_by_attribute(results, "dayinweek")218 change_files = get_file_changes_by_action(results)219 220 export_analysis_data_to_CSV()221 print "\nAnalysis on commit data is completed"222 print "Results saved as CSV files in working directory"223 224 print "\nEnter 'Y' to print analysis summary to console"225 print "Enter any other key to exit program"226 choice = raw_input()227 if choice.lower() == 'y' :228 print_dict_to_console(authors, "Revisions By Author")229 print_dict_to_console(days, "Revisions By Day in Week")230 print_dict_list_to_console(change_files, "Revisions By File Changes By Author")231 else :232 print "\nExited program"233 234 235 236 ...

Full Screen

Full Screen

compare_footprint

Source:compare_footprint Github

copy

Full Screen

...66 parser.add_argument('-c', '--commit', default=None,67 help="Commit ID to use compare footprint against base. "68 "Default is HEAD or working tree.")69 return parser.parse_args()70def get_git_commit(commit):71 commit_id = None72 proc = subprocess.Popen('git rev-parse %s' % commit, stdout=subprocess.PIPE,73 cwd=os.environ.get('ZEPHYR_BASE'), shell=True)74 if proc.wait() == 0:75 commit_id = proc.stdout.read().decode("utf-8").strip()76 return commit_id77def sanity_results_filename(commit=None, cwd=os.environ.get('ZEPHYR_BASE')):78 if not commit:79 file_name = "tmp.csv"80 else:81 if commit == RELEASE_DATA:82 file_name = RELEASE_DATA83 else:84 file_name = "%s.csv" % commit85 return os.path.join(cwd,'scripts', 'sanity_chk', file_name)86def git_checkout(commit, cwd=os.environ.get('ZEPHYR_BASE')):87 proc = subprocess.Popen('git diff --quiet', stdout=subprocess.PIPE,88 stderr=subprocess.STDOUT, cwd=cwd, shell=True)89 if proc.wait() != 0:90 raise Exception("Cannot continue, you have unstaged changes in your working tree")91 proc = subprocess.Popen('git reset %s --hard' % commit,92 stdout=subprocess.PIPE,93 stderr=subprocess.STDOUT,94 cwd=cwd, shell=True)95 if proc.wait() == 0:96 return True97 else:98 logger.error(proc.stdout.read())99 return False100def run_sanity_footprint(commit=None, cwd=os.environ.get('ZEPHYR_BASE'),101 output_file=None):102 if not output_file:103 output_file = sanity_results_filename(commit)104 cmd = '/bin/bash -c "source ./zephyr-env.sh && twister'105 cmd += ' +scripts/sanity_chk/sanity_compare.args -o %s"' % output_file106 logger.debug('Sanity (%s) %s' %(commit, cmd))107 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,108 cwd=cwd, shell=True)109 output,_ = proc.communicate()110 if proc.wait() == 0:111 logger.debug(output)112 return True113 logger.error("Couldn't build footprint apps in commit %s" % commit)114 logger.error(output)115 raise Exception("Couldn't build footprint apps in commit %s" % commit)116def run_footprint_build(commit=None):117 logging.debug("footprint build for %s" % commit)118 if not commit:119 run_sanity_footprint()120 else:121 cmd = "git clone --no-hardlinks %s" % os.environ.get('ZEPHYR_BASE')122 tmp_location = os.path.join(tempfile.gettempdir(),123 os.path.basename(os.environ.get('ZEPHYR_BASE')))124 if os.path.exists(tmp_location):125 shutil.rmtree(tmp_location)126 logging.debug("clonning into %s" % tmp_location)127 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,128 stderr=subprocess.STDOUT,129 cwd=tempfile.gettempdir(), shell=True)130 if proc.wait() == 0:131 if git_checkout(commit, tmp_location):132 run_sanity_footprint(commit, tmp_location)133 else:134 logger.error(proc.stdout.read())135 shutil.rmtree(tmp_location, ignore_errors=True)136 return True137def read_sanity_report(filename):138 data = []139 with open(filename) as fp:140 tmp = csv.DictReader(fp)141 for row in tmp:142 data.append(row)143 return data144def get_footprint_results(commit=None):145 sanity_file = sanity_results_filename(commit)146 if (not os.path.exists(sanity_file) or not commit) and commit != RELEASE_DATA:147 run_footprint_build(commit)148 return read_sanity_report(sanity_file)149def tree_changes():150 proc = subprocess.Popen('git diff --quiet', stdout=subprocess.PIPE,151 cwd=os.environ.get('ZEPHYR_BASE'), shell=True)152 if proc.wait() != 0:153 return True154 return False155def get_default_current_commit():156 if tree_changes():157 return None158 else:159 return get_git_commit('HEAD')160def get_default_base_commit(current_commit):161 if not current_commit:162 if tree_changes():163 return get_git_commit('HEAD')164 else:165 return get_git_commit('HEAD~1')166 else:167 return get_git_commit('%s~1'%current_commit)168def build_history(b_commit=None, c_commit=None):169 if not GIT_ENABLED:170 logger.info('Working on current tree, not git enabled.')171 current_commit = None172 base_commit = RELEASE_DATA173 else:174 if not c_commit:175 current_commit = get_default_current_commit()176 else:177 current_commit = get_git_commit(c_commit)178 if not b_commit:179 base_commit = get_default_base_commit(current_commit)180 else:181 base_commit = get_git_commit(b_commit)182 if not base_commit:183 logger.error("Cannot resolve base commit")184 return185 logger.info("Base: %s" % base_commit)186 logger.info("Current: %s" % (current_commit if current_commit else187 'working space'))188 current_results = get_footprint_results(current_commit)189 base_results = get_footprint_results(base_commit)190 deltas = compare_results(base_results, current_results)191 print_deltas(deltas)192def compare_results(base_results, current_results):193 interesting_metrics = [("ram_size", int),194 ("rom_size", int)]195 results = {}...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful