Best Python code snippet using localstack_python
rit_lib.py
Source:rit_lib.py  
...192    self._branch_names = cleared_rit._branch_names193    self._commit_ids = cleared_rit._commit_ids194    self._short_commit_tree = cleared_rit._short_commit_tree195  ''' SET '''196  def add_commit(self, commit: Commit):197    ''' add the commit to the rit dir '''198    if self.prevent_mutations:199      raise RitError("Doing this would mutate the rit directory, and that is disabled for this RitResource")200    self._write_commit(commit)201    self._clear()202  def prune(self):203    leaf_commits = set()204    if self.head.commit_id is not None:205      leaf_commits.add(self.head.commit_id)206    for branch_name in self.get_branch_names():207      leaf_commits.add(self.get_branch(branch_name, ensure=True).commit_id)208    refed_commits = set(leaf_commits)209    for leaf_commit in leaf_commits:210      while True:211        commit = self.get_commit(leaf_commit, ensure=True)212        if commit.parent_commit_id is None or commit.parent_commit_id in refed_commits:213          break214        refed_commits.add(commit.parent_commit_id)215        leaf_commit = commit.parent_commit_id216    removed_commit_ids = []217    for commit_id in self.get_commit_ids():218      if commit_id in refed_commits:219        continue220      self._delete_commit(commit_id)221      removed_commit_ids.append(commit_id)222    return removed_commit_ids223  def set_branch(self, branch: Branch):224    ''' add the branch to the rit dir '''225    if self.prevent_mutations:226      raise RitError("Doing this would mutate the rit directory, and that is disabled for this RitResource")227    self._write_branch(branch)228    self._clear()229  def set_head(self, head: HeadNode):230    ''' set the new head point '''231    if self.prevent_mutations:232      raise RitError("Doing this would mutate the rit directory, and that is disabled for this RitResource")233    self._write_head(head)234    self._clear()235  ''' GET '''236  @property237  def paths(self):238    ''' returns a RitPaths object for this 
rit directory '''239    if self._paths is None:240      self._paths = self._read_paths()241    return self._paths242  @property243  def head(self):244    ''' returns the HeadNode object for this rit directory '''245    if self._head is None:246      self._head = self._read_head()247    return self._head248  def get_head_commit_id(self):249    ''' get the commit id of the head, None if there isn't one '''250    head = self.head251    if head.commit_id is not None:252      return head.commit_id253    else:254      branch = self.get_branch(head.branch_name)255      if branch is not None:256        return branch.commit_id257      else:258        return None259  def get_commit_ids(self):260    ''' get all commit ids '''261    if self._commit_ids is None:262      self._commit_ids = self._read_commit_ids()263    return self._commit_ids264  def get_commit(self, commit_id: str, *, ensure=False):265    '''266    get commit object of a commit id267    return a commit if found268    otherwise return None, if ensure is set to True, the raise instead of return None.269    '''270    if commit_id not in self._commits:271      try:272        self._commits[commit_id] = self._read_commit(commit_id)273      except FileNotFoundError:274        if ensure:275          raise RitError("Unable to load expected commit")276        return None277    return self._commits[commit_id]278  def is_commit(self, commit_id: str):279    ''' return True if commit_id has a commit '''280    return self.get_commit(commit_id) is not None281  def get_branch_names(self):282    ''' return names of all branches '''283    if self._branch_names is None:284      self._branch_names = self._read_branch_names()285    return self._branch_names286  def get_branch(self, name: str, *, ensure=False):287    '''288    query for branch by name289    returns None if not found. 
raises RitError instead if ensure is True290    '''291    if name not in self._branches:292      try:293        self._branches[name] = self._read_branch(name)294      except FileNotFoundError:295        if ensure:296          raise RitError("Unable to load expected branch")297        return None298    return self._branches[name]299  def is_branch(self, name: str):300    ''' return True if the branch name has a branch '''301    return self.get_branch(name) is not None302  def get_branch_name_to_commit_ids(self):303    ''' see _populate_commit_to_branch_map '''304    if self._branch_name_to_commit_ids is None:305      self._populate_commit_to_branch_map()306    return self._branch_name_to_commit_ids307  def get_commit_id_to_branch_names(self):308    ''' see _populate_commit_to_branch_map '''309    if self._commit_id_to_branch_names is None:310      self._populate_commit_to_branch_map()311    return self._commit_id_to_branch_names312  def get_commit_tree(self):313    '''314    returns a map of shortened commit prefixes to a list of all commit ids with315    that prefix316    the tree is used to find full commit ids given partial ones.317    '''318    if self._short_commit_tree is None:319      self._short_commit_tree = defaultdict(list)320      for commit_id in self.get_commit_ids():321        self._short_commit_tree[commit_id[:short_hash_index]].append(commit_id)322    return self._short_commit_tree323  ''' helpers '''324  def _populate_commit_to_branch_map(self):325    '''326    populate the commit <-> branch maps327    branches include the HEAD node328    if a branch doesn't have a commit, the commit and branch will not have329    entries in the maps330    '''331    branch_names = self.get_branch_names()332    self._branch_name_to_commit_ids = {}333    self._commit_id_to_branch_names = defaultdict(list)334    for branch_name in branch_names:335      branch = self.get_branch(branch_name, ensure=True)336      commit_id = branch.commit_id337      
self._branch_name_to_commit_ids[branch_name] = commit_id338      self._commit_id_to_branch_names[commit_id].append(branch_name)339    head_commit_id = self.get_head_commit_id()340    if head_commit_id is not None:341      self._branch_name_to_commit_ids[head_ref_name] = head_commit_id342      self._commit_id_to_branch_names[head_commit_id].append(head_ref_name)343  ''' IO '''344  def _read_paths(self):345    root_rit_dir = os.path.realpath(self.root_rit_dir)346    rit_dir = os.path.join(root_rit_dir, rit_dir_name)347    last_root_rit_dir = None348    while not os.path.isdir(rit_dir):349      last_root_rit_dir = root_rit_dir350      root_rit_dir = os.path.dirname(root_rit_dir)351      if last_root_rit_dir == root_rit_dir:352        raise RitError("Unable to locate rit directory")353      rit_dir = os.path.join(root_rit_dir, rit_dir_name)354    return RitPaths.build_rit_paths(root_rit_dir)355  def _read_head(self):356    try:357      with open(os.path.join(self.paths.rit_dir, head_ref_name)) as fin:358        return HeadNode(**json.load(fin))359    except FileNotFoundError:360      return HeadNode(None, default_branch_name)361  def _read_commit(self, commit_id: str):362    logger.debug("Reading commit: %s", commit_id)363    with open(os.path.join(self.paths.commits, commit_id)) as fin:364      return Commit(**dict(**json.load(fin), commit_id=commit_id))365  def _write_commit(self, commit: Commit):366    logger.debug("Writing commit: %s", commit.commit_id)367    with open(os.path.join(self.paths.commits, commit.commit_id), 'w') as fout:368      data = asdict(commit)369      del data['commit_id']370      json.dump(data, fout)371  def _delete_commit(self, commit_id: str):372    try:373      os.remove(os.path.join(self.paths.commits, commit_id))374    except FileNotFoundError:375      raise RitError("Failed to remove commit since it didn't exist: %s", commit_id)376    try:377      os.remove(get_tar_path(self, commit_id))378    except FileNotFoundError:379      raise 
RitError("Failed to remove commit's tar since it didn't exist: %s", commit_id)380    try:381      os.remove(get_snar_path(self, commit_id))382    except FileNotFoundError:383      raise RitError("Failed to remove commit's snar since it didn't exist: %s", commit_id)384  def _read_branch(self, name: str):385    logger.debug("Reading branch: %s", name)386    with open(os.path.join(self.paths.branches, name)) as fin:387      branch = json.load(fin)388      return Branch(**dict(**branch, name=name))389  def _write_branch(self, branch: Branch):390    logger.debug("Writing branch %s to %s", branch.name, branch.commit_id)391    if self.is_commit(branch.name):392      raise RitError('Not creating a branch with the same name as a commit id: %s', branch.name)393    data = asdict(branch)394    del data['name']395    with open(os.path.join(self.paths.branches, branch.name), 'w') as fout:396      json.dump(data, fout)397  def _read_branch_names(self):398    for _, _, branch_names in os.walk(self.paths.branches):399      return branch_names400    return []401  def _read_commit_ids(self):402    for _, _, commit_ids in os.walk(self.paths.commits):403      return commit_ids404    return []405  def _write_head(self, head: HeadNode):406    with open(os.path.join(self.paths.rit_dir, head_ref_name), 'w') as fout:407      json.dump(asdict(head), fout)408''' UTIL '''409def colorize(color: int, msg: str):410  reset_seq = "\033[0m"411  color_seq = "\033[1;{}m"412  color_section = color_seq + "{}" + reset_seq413  return color_section.format(color, msg)414def mkdir(*args, exists_ok=False, **kwargs):415  try:416    os.mkdir(*args, **kwargs)417  except FileExistsError:418    if not exists_ok:419      raise420def none_t():421  def none_type(obj):422    return obj is None423  return none_type424def exact_t(*types):425  def exact_type(obj):426    return isinstance(obj, types)427  return exact_type428def optional_t(obj_t):429  def optional_type(obj):430    if obj is None:431      return True432    
else:433      return obj_t(obj)434  return optional_type435def list_t(obj_t):436  def list_type(obj):437    if isinstance(obj, list):438      return all(obj_t(val) for val in obj)439    return False440  return list_type441def check_types(**type_defs):442  for name, (obj, type_def) in type_defs.items():443    if not type_def(obj):444      raise TypeError(f"Element had invalid type: {name}: {type(obj)}")445def check_obj_types(obj, type_defs):446  objs = {}447  for key, type_def in type_defs.items():448    objs[key] = (getattr(obj, key), type_def)449def require(statement, msg, *args):450  if not statement:451    raise RitError(msg, *args)452''' RIT DIR HELPERS '''453def get_tar_path(rit: RitResource, commit_id: str):454  ''' get a commit's tar path, where the backup for that commit is '''455  return os.path.join(rit.paths.backups, commit_id + '.tar')456def get_snar_path(rit: RitResource, commit_id: str):457  ''' get a commit's star path, where the backup's metadata for that commit is '''458  return os.path.join(rit.paths.backups, commit_id + '.snar')459''' COMMIT HELPERS '''460def hash_commit(create_time: float, msg: str, snar: str, tar: str):461  ''' create a commit_id from '''462  logger.debug("Calculating the hash of ref")463  ref_hash = hashlib.sha1()464  ref_hash.update(b'create_time')465  ref_hash.update(str(create_time).encode('utf-8'))466  ref_hash.update(b'msg')467  ref_hash.update(msg.encode('utf-8'))468  ref_hash.update(b'snar')469  with open(snar, 'rb') as fin:470    ref_hash.update(fin.read())471  ref_hash.update(b'tar')472  with open(tar, 'rb') as fin:473    ref_hash.update(fin.read())474  return ref_hash.hexdigest()475def check_tar():476  ''' verify tar is the correct version and GNU '''477  logger.debug("Checking tar version")478  process = subprocess.Popen(['tar', '--version'], stdout=subprocess.PIPE)479  contents = process.stdout.read()480  process.wait()481  version = contents.decode('utf-8').split('\n', 1)[0]482  logger.debug("Tar Version: %s", 
version)483  assert 'GNU tar' in version, "You must have a GNU tar installed"484def status_tar(rit: RitResource, verbose: bool):485  ''' returns True if rit directory is dirty '''486  parent_commit_id = rit.get_head_commit_id()487  work_snar = os.path.join(rit.paths.work, 'ref.snar')488  if parent_commit_id is not None:489    head_snar = get_snar_path(rit, parent_commit_id)490    # TODO: move into rit resource491    shutil.copyfile(head_snar, work_snar)492  check_tar()493  tar_cmd = ['tar', '-cvg', work_snar, f'--exclude={rit_dir_name}', '-f', os.devnull, '.']494  logger.debug("Running tar command: %s", tar_cmd)495  # TODO: move into rit resource496  process = subprocess.Popen(tar_cmd, cwd=rit.paths.root, stdout=subprocess.PIPE)497  terminated = False498  dirty = False499  while True:500    line = process.stdout.readline()501    if not line:502      break503    if line == b'./\n':504      continue505    dirty = True506    if verbose:507      output = line.decode('utf-8').strip()508      logger.info("\t- %s", colorize(fg + red, output))509    else:510      terminated = True511      try:512        process.terminate()513      except Exception:514        pass515      break516  # still need to read the stdout to prevent blocking and therefore deadlock517  if terminated:518    while process.stdout.read(DEFAULT_BUFFER_SIZE):519      pass520  exit_code = process.wait()521  if not terminated and exit_code != 0:522    raise RitError("Creating commit's tar failed with exit code: %d", exit_code)523  os.remove(work_snar)524  return dirty525def create_commit(rit: RitResource, create_time: float, msg: str):526  ''' create a commit with the current head as the parent commit (if any) '''527  parent_commit_id = rit.get_head_commit_id()528  logger.debug("Parent ref: %s", parent_commit_id)529  work_tar = os.path.join(rit.paths.work, 'ref.tar')530  logger.debug("Working tar: %s", work_tar)531  work_snar = os.path.join(rit.paths.work, 'ref.snar')532  logger.debug("Working snar: %s", 
work_snar)533  if parent_commit_id is not None:534    head_snar = get_snar_path(rit, parent_commit_id)535    logger.debug("Copying previous snar: %s", head_snar)536    # TODO: move into rit resource537    shutil.copyfile(head_snar, work_snar)538  else:539    logger.debug("Using fresh snar file since no parent commit")540  check_tar()541  opts = '-cz'542  if logger.getEffectiveLevel() <= logging.DEBUG:543    opts += 'v'544  opts += 'g'545  tar_cmd = ['tar', opts, work_snar, f'--exclude={rit_dir_name}', '-f', work_tar, '.']546  logger.debug("Running tar command: %s", tar_cmd)547  # TODO: move into rit resource548  process = subprocess.Popen(tar_cmd, cwd=rit.paths.root)549  # TODO: doesn't forward SIGTERM, only SIGINT550  exit_code = process.wait()551  if exit_code != 0:552    raise RitError("Creating commit's tar failed with exit code: %d", exit_code)553  commit_id = hash_commit(create_time, msg, work_snar, work_tar)554  logger.debug("Moving working snar into backups directory")555  snar = get_snar_path(rit, commit_id)556  os.rename(work_snar, snar)557  logger.debug("Moving working tar into backups directory")558  tar = get_tar_path(rit, commit_id)559  os.rename(work_tar, tar)560  commit = Commit(parent_commit_id, commit_id, create_time, msg)561  rit.add_commit(commit)562  if rit.head.commit_id is not None:563    new_head = HeadNode(commit_id=commit_id, branch_name=None)564    rit.set_head(new_head)565  else:566    rit.set_branch(Branch(rit.head.branch_name, commit_id))567  return commit568''' RESET HELPERS '''569def apply_commit(rit: RitResource, commit: Commit):570  '''571  apply the commit to the rit directory572  If the current rit directory isn't clean and isn't the parent of the commit573  being applied, the results will not be the contents of commit.574  See restore_to_commit.575  '''576  logger.info("Applying commit: %s", commit.commit_id)577  tar_file = get_tar_path(rit, commit.commit_id)578  tar_cmd = ['tar', '-xg', os.devnull, '-f', tar_file]579  # rit 
resource thing?580  process = subprocess.Popen(tar_cmd, cwd=rit.paths.root)581  exit_code = process.wait()582  if exit_code != 0:583    raise RitError("Failed while trying to apply commit: %s", commit.commit_id)584def restore_to_commit(rit: RitResource, commit: Commit):585  '''586  This gets the chain of commits from commit to root and applies them. If there587  are changes in the working directory relative to current head, then those will588  be destroyed.589  '''590  logger.debug('resetting to %s', commit.commit_id)591  commit_chain = [commit]592  while commit.parent_commit_id is not None:593    commit = rit.get_commit(commit.parent_commit_id, ensure=True)594    commit_chain.append(commit)595  commit_chain.reverse()596  for commit in commit_chain:597    apply_commit(rit, commit)598''' BRANCH HELPERS '''599@dataclass600class ResolvedRef:601  '''602  the user provides a ref, which can reference head, a branch or commit. this603  object contains the HeadNode, Branch and Commit that the user's ref is604  referring to. all 3 or None can be defined.605  see resolve_ref606  '''607  commit: Optional[Commit] = None608  '''609  if None:610    if head:611      head points to a branch with no commit612    else:613      ref doesn't refer to a branch or commit614  else:615    ref ultimately refers to this commit616  '''617  branch: Optional[Branch] = None618  '''619  if head is None620    if branch is None:621      ref doesn't refer to a branch622    else:623      ref points to this branch624  else:625    if branch is None:626      head points to a commit.627      head points to a branch with no commit.628    else:629      head points to this branch630  '''631  head: Optional[HeadNode] = None632  '''633  if head is None:634    ref was provided and not the head635  else:636    ref was omitted or explicitly set to the head637  '''638def resolve_commit(rit: RitResource, partial_commit_id: str):639  '''640  resolve a user provided commit id to a commit. 
if no commit is found, then641  return None. if the ref is an ambiguous shortened commit id, then this642  function raises an exception.643  '''644  logger.debug("Resolving commit: %s", partial_commit_id)645  commit = rit.get_commit(partial_commit_id)646  if commit is not None:647    return commit648  if len(partial_commit_id) < short_hash_index:649    return None650  short_commit_id = partial_commit_id[:short_hash_index]651  commit_tree = rit.get_commit_tree()652  if short_commit_id not in commit_tree:653    return None654  commit = None655  for commit_id in commit_tree[short_commit_id]:656    size = len(partial_commit_id)657    if partial_commit_id == commit_id[:size]:658      if commit is not None:659        raise RitError("Reference %s matched commits %s and %s", partial_commit_id, commit.commit_id, commit_id)660      commit = rit.get_commit(commit_id, ensure=True)661  return commit662def resolve_ref(rit: RitResource, ref: Optional[str]):663  '''664  resolve a user provided reference665  if ref is None, then ref refers to the current head. ref can also explicitly666  refer to the current head. ref otherwise refers to a branch. if the branch is667  not found, then the ref refers to a commit. 
if no commit is found, all 3668  fields of ResolvedRef will be None.669  if the ref is an ambiguous shortened commit id, then this function raises an670  exception.671  see the def of ResolvedRef672  '''673  logger.debug("Resolving ref: %s", ref)674  res = ResolvedRef()675  if ref is None or ref == head_ref_name:676    head = rit.head677    res.head = head678    if head.branch_name is not None:679      res.branch = rit.get_branch(head.branch_name)680      if res.branch is not None:681        res.commit = rit.get_commit(res.branch.commit_id)682    else:683      res.commit = rit.get_commit(head.commit_id)684  else:685    res.branch = rit.get_branch(ref)686    if res.branch is not None:687      res.commit = rit.get_commit(res.branch.commit_id)688    else:689      res.commit = resolve_commit(rit, ref)690  return res691def resolve_refs(rit: RitResource, refs: list[str], all: bool):692  '''693  Returns information regarding the provided refs694  '''695  resolved_refs: list[ResolvedRef] = []696  if not refs:697    refs.append(None)698  if all:699    refs.extend(rit.get_branch_names())700  for ref in refs:701    res = resolve_ref(rit, ref)702    resolved_refs.append(res)703  return resolved_refs704branch_name_re = re.compile('^\\w+$')705def validate_branch_name(name: str):706  ''' return whether this string is a valid branch name '''707  if name == head_ref_name:708    raise RitError("Branch can't be named the same as the head ref: %s", name)709  elif branch_name_re.search(name) is None:710    raise RitError("Invalid branch name: %s", name)711def _pprint_dur(dur: int, name: str):712  return f"{dur} {name}{'s' if dur > 1 else ''}"713def pprint_time_duration(start: float, end: float):714  ''' pretty print a time duration '''715  start_dt = datetime.datetime.fromtimestamp(start)716  end_dt = datetime.datetime.fromtimestamp(end)717  dur = end - start718  dur_sec = dur719  dur_min = dur / 60720  dur_hour = dur_min / 60721  dur_day = dur_hour / 24722  dur_month = 12 * 
(end_dt.year - start_dt.year) + (end_dt.month - start_dt.month)723  dur_year = dur_month // 12724  parts = []725  if dur_year >= 5:726    parts.append(_pprint_dur(int(dur_year), 'year'))727  elif dur_year >= 1:728    parts.append(_pprint_dur(int(dur_year), 'year'))729    parts.append(_pprint_dur(int(dur_month) % 12, 'month'))730  elif dur_month >= 1:731    parts.append(_pprint_dur(int(dur_month) % 12, 'month'))732  elif dur_day >= 1:733    parts.append(_pprint_dur(int(dur_day), 'day'))734  elif dur_hour >= 1:735    parts.append(_pprint_dur(int(dur_hour) % 60, 'hour'))736  elif dur_min >= 1:737    parts.append(_pprint_dur(int(dur_min) % 60, 'minute'))738  elif dur_sec >= 20:739    parts.append(_pprint_dur(int(dur_sec) % 60, 'second'))740  else:741    return 'Just now'742  return ', '.join(parts) + ' ago'743''' SUB LOG COMMANDS '''744def log_commits(rit: RitResource, commits: list[Commit]):745  '''746  Returns tuple of the following:747  - commit_graph: a tree containing all provided commits748  - leafs: the set of leaf notes of the tree749  - commit_id_to_commit: a map of each commit to a full Commit object750  - commit_id_to_branch_names: a map of each commit_id to branch names, including the head_ref_name751  '''752  leafs: set[str] = set()753  commit_graph: dict[str, str] = {}754  for commit in commits:755    if commit.commit_id not in commit_graph:756      leafs.add(commit.commit_id)757    while True:758      commit_graph[commit.commit_id] = commit.parent_commit_id759      if commit.parent_commit_id is None:760        break761      parent_commit = rit.get_commit(commit.parent_commit_id, ensure=True)762      if parent_commit.commit_id in leafs:763        leafs.remove(parent_commit.commit_id)764      commit = parent_commit765  now = time.time()766  commit_id_to_branch_names = rit.get_commit_id_to_branch_names()767  commit_id_to_commit: dict[str, Commit] = {}768  for commit_id in leafs:769    logger.info("Log branch from %s", commit_id[:short_hash_index])770    
while commit_id is not None:771      if commit_id not in commit_id_to_commit:772        commit_id_to_commit[commit_id] = rit.get_commit(commit_id, ensure=True)773      commit = commit_id_to_commit[commit_id]774      colored_commit_id = colorize(fg + yellow, commit.commit_id[:short_hash_index])775      if commit.commit_id in commit_id_to_branch_names:776        branch_names = commit_id_to_branch_names[commit.commit_id]777        colored_branch_names = []778        for branch_name in branch_names:779          if branch_name == head_ref_name:780            colored_branch_names.append(colorize(fg + blue, branch_name))781          else:782            colored_branch_names.append(colorize(fg + green, branch_name))783        branch_details = f"({', '.join(colored_branch_names)}) "784      else:785        branch_details = ''786      time_duration = pprint_time_duration(commit.create_time, now)787      date_details = f'({time_duration}) '788      logger.info("* %s %s%s%s", colored_commit_id, date_details, branch_details, commit.msg)789      commit_id = commit_graph[commit_id]790  return commit_graph, leafs, commit_id_to_commit, commit_id_to_branch_names791''' SUB BRANCH COMMANDS '''792def delete_branch(rit: RitResource, name: str):793  ''' removes a branch '''794  try:795    os.remove(os.path.join(rit.paths.branches, name))796  except FileNotFoundError:797    raise RitError("Failed to remove branch since it didn't exist.")798def list_branches(rit: RitResource):799  ''' logs branches to logger '''800  head = rit.head801  head_branch_name = head.branch_name802  branch_names = rit.get_branch_names()803  # TODO: this doesn't handle HEAD well if its commitless804  for branch_name in branch_names:805    this_sym = '*' if branch_name == head_branch_name else ' '806    branch = rit.get_branch(branch_name, ensure=True)807    commit = rit.get_commit(branch.commit_id, ensure=True)808    colored_commit_id = colorize(fg + yellow, branch.commit_id[:short_hash_index])809    
colored_branch_name = colorize(fg + green, branch_name)810    logger.info("%s %s\t%s %s", this_sym, colored_branch_name, colored_commit_id, commit.msg)811  return head, branch_names812def create_branch(rit: RitResource, name: str, ref: Optional[str], force: bool):813  '''814  creates a branch with name name at ref ref. if the branch already exists,815  force will move the branch to the new commit.816  '''817  if rit.is_branch(name) and not force:818    raise RitError('Branch already exists: %s. Use -f to force the overwrite of it.', name)819  res = resolve_ref(rit, ref)820  if res.commit is None:821    if res.head is not None:822      raise RitError("Current head doesn't have a commit")823    else:824      raise RitError("Unable to resolve ref: %s", ref)825  commit_id = res.commit.commit_id826  branch = Branch(name, commit_id)827  rit.set_branch(branch)828  logger.info("Created branch %s at %s", name, commit_id[:short_hash_index])829  return res830def log_refs(rit: RitResource, refs: list[str], all: bool, full: bool):831  '''832  Returns the commit tree containing the given refs. If none are given, assumes833  head. 
If all is true, appends refs.834  '''835  commits = []836  if not refs:837    refs.append(None)838  if all:839    refs.extend(rit.get_branch_names())840  for ref in refs:841    res = resolve_ref(rit, ref)842    if res.commit is None:843      if res.head is not None:844        # TODO: this doesn't handle HEAD well if its commitless845        raise RitError("head branch doesn't have any commits")846      else:847        raise RitError("Unable to locate ref: %s", ref)848    commits.append(res.commit)849  return log_commits(rit, commits)850def show_ref(rit: RitResource, ref: Optional[str]):851  ''' log the contents of a specific reference '''852  res = resolve_ref(rit, ref)853  if res.commit is None:854    if res.head is not None:855      raise RitError("head branch doesn't have any commits to show")856    else:857      raise RitError("Unable to locate ref: %s", ref)858  tar_file = get_tar_path(rit, res.commit.commit_id)859  tar_cmd = ['tar', '-tf', tar_file]860  process = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE)861  changes = []862  while True:863    line = process.stdout.readline()864    if not line:865      break866    elif line == b'./\n':867      continue868    output = line.decode('utf-8').strip()869    changes.append(output)870    logger.info("\t- %s", colorize(fg + cyan, output))871  results = process.wait()872  if results != 0:873    raise RitError("tar command failed with exit code %d", results)874  return res, changes875def status_head(rit: RitResource):876  ''' run status_tar on HEAD with logging '''877  if rit.head.branch_name is not None:878    head_id = rit.head.branch_name879  else:880    head_id = rit.head.commit_id881  logger.info("%s -> %s", head_ref_name, head_id)882  if not status_tar(rit, True):883    logger.info("Clean working directory!")884def prune_commits(rit: RitResource):885  ''' remove any commits not associated to a branch history '''886  return rit.prune()887def checkout_ref(*, root_rit_dir: str, ref: str, force: bool):888  
'''889  Checkout ref (overwriting any changes if force is True).890  It will also check that the current working directory is clean so that we know891  no data is lost. If the working tree is dirty, then force needs to be set to892  True to forcibly delete these changes.893  Returns ref resolved, as a ResolvedRef.894  The end result is that head points to ref, and the working directory is895  restored to the point of ref.896  It is the equivalent of the following:897  - if head is a branch, set head to the underlying commit898  - hard reset to ref899  - set head to ref900  Or if you allow head to be moved to non equivalent commits:901  - set head to ref902  - hard reset to new head903  '''904  logger.debug('checkout_ref')905  logger.debug('  ref: %s', ref)906  logger.debug('  force: %s', force)907  check_types(908    ref = (ref, exact_t(str)),909    force = (force, exact_t(bool)),910  )911  rit = RitResource(root_rit_dir)912  res = resolve_ref(rit, ref)913  if res.head is not None:914    raise RitError("Attempted to checkout head ref")915  elif res.commit is None:916    raise RitError("Unable to resolve ref to commit: %s", ref)917  commit = res.commit918  restored = False919  head_commit_id = rit.get_head_commit_id()920  if head_commit_id is not None and head_commit_id != commit.commit_id:921    if not force and status_tar(rit, False):922      raise RitError("Uncommitted changes! Commit them or use -f to destroy them.")923    restored = True924    restore_to_commit(rit, commit)925  if res.branch is not None:926    new_head = HeadNode(branch_name = res.branch.name)927  else:928    new_head = HeadNode(commit_id = commit.commit_id)929  rit.set_head(new_head)930  if restored:931    logger.info("Successful checkout. Commit this checkout to get a clean rit status.")932  else:933    logger.info("Switched to new ref")934  return res935def checkout_orphan(*,936      root_rit_dir: str,937      name: str):938  '''939  Move head to point at branch with name name. 
Ensures that name doesn't already940  exist. This results in the head not pointing to any commit. Next commit will941  create a new branch.942  '''943  logger.debug('checkout_orphan')944  logger.debug('  name: %s', name)945  check_types(946    name = (name, optional_t(exact_t(str))),947  )948  validate_branch_name(name)949  rit = RitResource(root_rit_dir)950  if rit.is_branch(name):951    raise RitError("Orphan checkout failed because branch already exists.")952  new_head = HeadNode(branch_name=name)953  rit.set_head(new_head)954'''955API956You should be able to do anything you want with these functions.957'''958def query_cmd(*, root_rit_dir: str):959  '''960  Return a read only RitResource used for querying the rit directory.961  CLI users get information via stdout. That's not helpful to for python users,962  so they instead get access to the full data structure.963  For advanced users who want write access to the rit directory, construct a964  RitResource directly with prevent_mutations set to False and read the class'965  docs.966  '''967  return RitResource(root_rit_dir, prevent_mutations=True)968def init_cmd(*, root_rit_dir: str):969  logger.debug("init")970  rit = RitResource(root_rit_dir)971  rit.initialize()972def commit_cmd(*, root_rit_dir: str, msg: str):973  logger.debug('commit')974  logger.debug('  msg: %s', msg)975  check_types(976    msg = (msg, exact_t(str)),977  )978  rit = RitResource(root_rit_dir)979  commit = create_commit(rit, time.time(), msg)980  logger.info("Created commit %s: %s", commit.commit_id[:short_hash_index], commit.msg)981  return commit982def reset_cmd(*, root_rit_dir: str, ref: Optional[str], hard: bool):983  '''984  A reset tries to move the head to ref. If head is a branch, it instead moves985  the branch. If head is a commit, it just moves head to the new commit. 
If it's986  a hard reset, then post moving head, the working changes are removed.987  Here, a non hard reset is effectively the same as git's reset with no988  arguments. In git, a reset with no args is --mixed, and since we don't have989  staged changes, --mixed and --soft are the same in this context.  For that990  reason, here, we just use the terminology reset instead of soft reset or mixed991  reset.992  '''993  logger.debug('reset')994  logger.debug('  ref: %s', ref)995  logger.debug('  hard: %s', hard)996  check_types(997    ref = (ref, optional_t(exact_t(str))),998    hard = (hard, exact_t(bool)),999  )1000  rit = RitResource(root_rit_dir)1001  res = resolve_ref(rit, ref)1002  if res.commit is None:1003    raise RitError("Unable to resolve ref to commit: %s", ref)1004  if rit.head.branch_name is not None:1005    branch = rit.get_branch(rit.head.branch_name, ensure=False)1006    if branch is None:1007      branch = Branch(rit.head.branch_name, res.commit.commit_id)1008    else:1009      branch.commit_id = res.commit.commit_id1010    rit.set_branch(branch)1011  else:1012    new_head = HeadNode(commit_id = res.commit.commit_id)1013    rit.set_head(new_head)1014  restored = False1015  if hard:1016    restored = True1017    restore_to_commit(rit, res.commit)1018  if restored:1019    logger.info("Successful reset. Commit this checkout to get a clean rit status.")1020  else:1021    logger.info("Successful reset")1022  return res1023def checkout_cmd(*, root_rit_dir: str, orphan: bool, ref_or_name: str, force: Optional[bool]):1024  '''1025  Forwards to checkout_ref or checkout_orphan1026  '''1027  logger.debug('checkout')1028  logger.debug('  orphan: %s', orphan)1029  logger.debug('  ref_or_name: %s', ref_or_name)1030  logger.debug('  force: %s', force)1031  check_types(...test_rit.py
Source:test_rit.py  
...238  assert new_commit.parent_commit_id is None239  # get rit res240  rit_res = query_cmd(**base_kwargs)241  # verify commit id matches what was returned242  new_commit_lookup = rit_res.get_commit(new_commit.commit_id, ensure=True)243  assert new_commit_lookup.commit_id is not None244  assert new_commit_lookup.commit_id == new_commit.commit_id245  assert new_commit_lookup.parent_commit_id == new_commit.parent_commit_id246  assert new_commit_lookup.create_time == new_commit.create_time247  assert new_commit_lookup.msg == new_commit.msg248  root_commit_id = new_commit_lookup.commit_id249  # verify that head is still on branch250  assert rit_res.head.branch_name == 'otest'251  assert rit_res.head.commit_id is None252  # verify that branch matches claimed commit253  new_branch = rit_res.get_branch('otest')254  assert new_branch is not None255  assert new_branch.commit_id == new_commit.commit_id256  # add files257  branch_cmd(**base_kwargs, name="otest_root", ref=None, force=False, delete=False)258  otest_a_file = touch(root_rit_dir, 'otest_a')259  commit_cmd(**base_kwargs, msg="add a")260  branch_cmd(**base_kwargs, name="otest_a", ref=None, force=False, delete=False)261  otest_b_file = touch(root_rit_dir, 'otest_b')262  top_commit = commit_cmd(**base_kwargs, msg="add b")263  branch_cmd(**base_kwargs, name="otest_b", ref=None, force=False, delete=False)264  otest_c_file = touch(root_rit_dir, 'otest_c')265  rit_res = query_cmd(**base_kwargs)266  assert rit_res.head.branch_name == 'otest'267  otest_root_branch = rit_res.get_branch('otest_root')268  assert otest_root_branch is not None269  otest_a_branch = rit_res.get_branch('otest_a')270  assert otest_a_branch is not None271  otest_b_branch = rit_res.get_branch('otest_b')272  assert otest_b_branch is not None273  otest_branch = rit_res.get_branch('otest')274  assert otest_branch is not None275  assert otest_branch.commit_id == otest_b_branch.commit_id276  assert top_commit.commit_id == otest_branch.commit_id277  assert 
top_commit.parent_commit_id == otest_a_branch.commit_id278  assert root_commit_id == rit_res.get_commit(otest_a_branch.commit_id, ensure=True).parent_commit_id279  commit_chain = [root_commit_id, top_commit.parent_commit_id, top_commit.commit_id]280  # make orphan and make sure changes are still there281  checkout_cmd(**base_kwargs, orphan=True, ref_or_name='otest_2', force=None)282  assert os.path.exists(otest_a_file)283  assert os.path.exists(otest_b_file)284  assert os.path.exists(otest_c_file)285  checkout_cmd(**base_kwargs, orphan=True, ref_or_name='otest_3', force=None)286  assert os.path.exists(otest_a_file)287  assert os.path.exists(otest_b_file)288  assert os.path.exists(otest_c_file)289  # get rit res290  rit_res = query_cmd(**base_kwargs)291  assert rit_res.get_branch('otest_2') is None292  assert rit_res.get_branch('otest_3') is None293  assert rit_res.head.branch_name == 'otest_3'294  reset_cmd(**base_kwargs, ref=commit_chain[0], hard=False)295  rit_res = query_cmd(**base_kwargs)296  assert rit_res.head.branch_name == 'otest_3'297  assert rit_res.head.commit_id is None298  this_branch = rit_res.get_branch('otest_3', ensure=True)299  assert this_branch is not None300  assert this_branch.commit_id == commit_chain[0]301  assert os.path.exists(otest_a_file)302  assert os.path.exists(otest_b_file)303  assert os.path.exists(otest_c_file)304  reset_cmd(**base_kwargs, ref=None, hard=False)305  rit_res = query_cmd(**base_kwargs)306  assert rit_res.get_branch('otest_2') is None307  assert rit_res.head.branch_name == 'otest_3'308  reset_cmd(**base_kwargs, ref=rit_lib.head_ref_name, hard=False)309  rit_res = query_cmd(**base_kwargs)310  assert rit_res.get_branch('otest_2') is None311  assert rit_res.head.branch_name == 'otest_3'312  reset_cmd(**base_kwargs, ref=commit_chain[1], hard=False)313  rit_res = query_cmd(**base_kwargs)314  assert rit_res.head.branch_name == 'otest_3'315  assert rit_res.head.commit_id is None316  this_branch = rit_res.get_branch('otest_3', 
ensure=True)317  assert this_branch is not None318  assert this_branch.commit_id == commit_chain[1]319  assert os.path.exists(otest_a_file)320  assert os.path.exists(otest_b_file)321  assert os.path.exists(otest_c_file)322  reset_cmd(**base_kwargs, ref=None, hard=False)323  rit_res = query_cmd(**base_kwargs)324  assert rit_res.head.branch_name == 'otest_3'325  reset_cmd(**base_kwargs, ref=rit_lib.head_ref_name, hard=False)326  rit_res = query_cmd(**base_kwargs)327  assert rit_res.head.branch_name == 'otest_3'328  reset_cmd(**base_kwargs, ref=commit_chain[2], hard=False)329  rit_res = query_cmd(**base_kwargs)330  assert rit_res.head.branch_name == 'otest_3'331  assert rit_res.head.commit_id is None332  this_branch = rit_res.get_branch('otest_3', ensure=True)333  assert this_branch is not None334  assert this_branch.commit_id == commit_chain[2]335  assert os.path.exists(otest_a_file)336  assert os.path.exists(otest_b_file)337  assert os.path.exists(otest_c_file)338  checkout_cmd(**base_kwargs, orphan=False, ref_or_name=commit_chain[2], force=False)339  rit_res = query_cmd(**base_kwargs)340  assert rit_res.head.branch_name is None341  assert rit_res.head.commit_id == commit_chain[2]342  reset_cmd(**base_kwargs, ref=commit_chain[0], hard=False)343  rit_res = query_cmd(**base_kwargs)344  assert rit_res.head.branch_name is None345  assert rit_res.head.commit_id == commit_chain[0]346  assert os.path.exists(otest_a_file)347  assert os.path.exists(otest_b_file)348  assert os.path.exists(otest_c_file)349  reset_cmd(**base_kwargs, ref=None, hard=False)350  rit_res = query_cmd(**base_kwargs)351  assert rit_res.head.commit_id == commit_chain[0]352  reset_cmd(**base_kwargs, ref=rit_lib.head_ref_name, hard=False)353  rit_res = query_cmd(**base_kwargs)354  assert rit_res.head.commit_id == commit_chain[0]355  reset_cmd(**base_kwargs, ref=commit_chain[2], hard=False)356  rit_res = query_cmd(**base_kwargs)357  assert rit_res.head.branch_name is None358  assert rit_res.head.commit_id 
== commit_chain[2]359  assert os.path.exists(otest_a_file)360  assert os.path.exists(otest_b_file)361  assert os.path.exists(otest_c_file)362  reset_cmd(**base_kwargs, ref=commit_chain[0], hard=False)363  rit_res = query_cmd(**base_kwargs)364  assert rit_res.head.branch_name is None365  assert rit_res.head.commit_id == commit_chain[0]366  assert os.path.exists(otest_a_file)367  assert os.path.exists(otest_b_file)368  assert os.path.exists(otest_c_file)369  reset_cmd(**base_kwargs, ref=commit_chain[2], hard=True)370  rit_res = query_cmd(**base_kwargs)371  assert rit_res.head.branch_name is None372  assert rit_res.head.commit_id == commit_chain[2]373  assert os.path.exists(otest_a_file)374  assert os.path.exists(otest_b_file)375  assert not os.path.exists(otest_c_file)376  reset_cmd(**base_kwargs, ref=commit_chain[1], hard=True)377  rit_res = query_cmd(**base_kwargs)378  assert rit_res.head.branch_name is None379  assert rit_res.head.commit_id == commit_chain[1]380  assert os.path.exists(otest_a_file)381  assert not os.path.exists(otest_b_file)382  assert not os.path.exists(otest_c_file)383  reset_cmd(**base_kwargs, ref=commit_chain[0], hard=True)384  rit_res = query_cmd(**base_kwargs)385  assert rit_res.head.branch_name is None386  assert rit_res.head.commit_id == commit_chain[0]387  assert not os.path.exists(otest_a_file)388  assert not os.path.exists(otest_b_file)389  assert not os.path.exists(otest_c_file)390  reset_cmd(**base_kwargs, ref=commit_chain[1], hard=True)391  rit_res = query_cmd(**base_kwargs)392  assert rit_res.head.branch_name is None393  assert rit_res.head.commit_id == commit_chain[1]394  assert os.path.exists(otest_a_file)395  assert not os.path.exists(otest_b_file)396  assert not os.path.exists(otest_c_file)397  reset_cmd(**base_kwargs, ref=commit_chain[2], hard=True)398  rit_res = query_cmd(**base_kwargs)399  assert rit_res.head.branch_name is None400  assert rit_res.head.commit_id == commit_chain[2]401  assert os.path.exists(otest_a_file)402  
assert os.path.exists(otest_b_file)403  assert not os.path.exists(otest_c_file)404  checkout_cmd(**base_kwargs, orphan=False, ref_or_name='otest_3', force=True)405  reset_cmd(**base_kwargs, ref=commit_chain[2], hard=True)406  rit_res = query_cmd(**base_kwargs)407  assert rit_res.head.branch_name == 'otest_3'408  assert rit_res.head.commit_id is None409  assert os.path.exists(otest_a_file)410  assert os.path.exists(otest_b_file)411  assert not os.path.exists(otest_c_file)412  reset_cmd(**base_kwargs, ref=commit_chain[1], hard=True)413  rit_res = query_cmd(**base_kwargs)414  assert rit_res.head.branch_name == 'otest_3'415  assert rit_res.head.commit_id is None416  assert os.path.exists(otest_a_file)417  assert not os.path.exists(otest_b_file)418  assert not os.path.exists(otest_c_file)419  reset_cmd(**base_kwargs, ref=commit_chain[0], hard=True)420  rit_res = query_cmd(**base_kwargs)421  assert rit_res.head.branch_name == 'otest_3'422  assert rit_res.head.commit_id is None423  assert not os.path.exists(otest_a_file)424  assert not os.path.exists(otest_b_file)425  assert not os.path.exists(otest_c_file)426  checkout_cmd(**base_kwargs, orphan=False, ref_or_name='deviate', force=True)427  rit_res = query_cmd(**base_kwargs)428  head_branch = rit_res.get_branch(rit_res.head.branch_name)429  head_commit_id = head_branch.commit_id430  head_commit = rit_res.get_commit(head_commit_id, ensure=True)431  reset_cmd(**base_kwargs, ref=head_commit.parent_commit_id, hard=False)432  rit_res = query_cmd(**base_kwargs)433  rit_res.get_commit(head_commit_id, ensure=True)434  prune_res = prune_cmd(**base_kwargs)435  rit_res = query_cmd(**base_kwargs)436  assert rit_res.get_commit(head_commit_id) is None437  assert len(prune_res) == 1438  assert prune_res[0] == head_commit_id439  rit_res = query_cmd(**base_kwargs)440  head_commit = rit_res.get_head_commit_id()441  checkout_cmd(**base_kwargs, orphan=False, ref_or_name=head_commit, force=True)442  reset_cmd(**base_kwargs, ref='deviate', 
hard=False)443  rit_res = query_cmd(**base_kwargs)444  assert rit_res.head.branch_name is None...CA4_Python_Script.py
Source:CA4_Python_Script.py  
# Name: Paul Prew
# Student Number: 10354828
# Programming for Big Data
# CA 4
# This is a python program that reads a sample text data file named 'changes_python.txt'.
# The program scrubs/cleans the data and splits the content into separate commit objects
# (422 for the sample file), which can be represented as rows on a spreadsheet.
# Where possible additional data is derived from the data, e.g. the week number is derived from the date.
# The data is mostly extracted from the title row of each commit.
# Additionally, for each revision the number of files actioned is calculated and broken out
# by action type, i.e. A, D, M, R.
# This provides a count of the total number of file changes broken out by action type, and
# enables analysis of the number of files changed by author.
# The 'Commit' class defines the elements of the commit data extracted from the file;
# every commit in the file becomes one instance of that class.
# Functions operate on these objects to perform analysis on the commit data and return the
# results as dictionaries, because those are easy to export as CSV.
# Visualisation of the analysis was done using charts in R Studio and Microsoft Excel.
# When the program executes, the user is asked to confirm before the data file is read.
# If successful, a message on the console acknowledges this and shows the total number of
# commits extracted and the total number of lines in the file.
# The user is then asked to confirm the next action, the analysis of the commit data.
# The program then runs the analysis and exports the results to CSV files in the working directory.
# Finally the user is asked whether the analysis results should also be printed to the console.
import datetime
import csv
import os
import sys

# Separator line that delimits the commits in the log file.
sep = 72*'-'

# The interactive prompt is called raw_input on Python 2 and input on Python 3.
try:
    _read_input = raw_input
except NameError:
    _read_input = input


def _open_csv_for_write(location):
    '''Open location for csv writing in the mode the running interpreter needs.'''
    # The csv module wants a binary file on Python 2, and a text file opened
    # with newline='' on Python 3.
    if sys.version_info[0] >= 3:
        return open(location, 'w', newline='')
    return open(location, 'wb')


# Below defines the class 'Commit' which is used to define the elements of the 'commit data' extracted from the file.
class Commit:
    'class for commits'

    def __init__(self, revision = None, author = None, fulldatetime = None, fulldate = None, dateinmonth = None, dayinweek = None, weeknumber = None, month = None, year = None,
            comment_line_count = None, number_changes = None, number_a = None, number_d = None, number_m = None, number_r = None, changes = None, comment = None):
        self.revision = revision
        self.author = author
        self.fulldatetime = fulldatetime
        self.fulldate = fulldate
        self.dateinmonth = dateinmonth
        self.dayinweek = dayinweek
        self.weeknumber = weeknumber
        self.month = month
        self.year = year
        self.comment_line_count = comment_line_count
        self.number_changes = number_changes
        self.changes = changes
        self.number_a = number_a
        self.number_d = number_d
        self.number_m = number_m
        self.number_r = number_r
        self.comment = comment


# Below is a generic function that accepts the name of a commit attribute as parameter.
# This allows it to be called multiple times with different parameter values
# e.g. revisions by author, revisions by day.
def get_revisions_by_attribute(data, attribute):
    '''Return a dict mapping each value of attribute to the number of commits having it.'''
    result = {}
    for commit in data:
        value = getattr(commit, attribute)
        result[value] = result.get(value, 0) + 1
    return result


# Below function gets the total number of file changes for all commits,
# broken out by file change action type i.e. A, D, M, R, and by author.
def get_file_changes_by_action(data):
    '''Return one dict per author (in order of first appearance) with summed action counts.'''
    all_authors = []
    # Index of the records by author, so each commit is merged without
    # rescanning the whole result list.
    by_author = {}
    for commit in data:
        record = by_author.get(commit.author)
        if record is None:
            record = {'Author': commit.author, 'Action Type A': 0, 'Action Type D': 0, 'Action Type M': 0,
                'Action Type R': 0, 'Total Files Actioned': 0}
            by_author[commit.author] = record
            all_authors.append(record)
        record['Action Type A'] += commit.number_a
        record['Action Type D'] += commit.number_d
        record['Action Type M'] += commit.number_m
        record['Action Type R'] += commit.number_r
        record['Total Files Actioned'] += commit.number_changes
    return all_authors


def read_file(changes_file):
    '''Return the lines of changes_file with surrounding whitespace stripped.'''
    with open(changes_file, 'r') as handle:
        return [line.strip() for line in handle]


def get_commits(data):
    '''
    Parse the stripped log lines in data into a list of Commit objects.
    Each commit is expected to be a separator line, a '|' delimited title row,
    the changed paths (first line is a heading), a blank line, the comment
    lines and then the next separator.
    '''
    commits = []
    index = 0
    while True:
        try:
            title = data[index + 1]
            if not title:
                # blank padding after the final separator is not another commit
                break
            details = title.split('|')
            stamp = details[2]
            current_commit = Commit()
            current_commit.revision = int(details[0].strip().strip('r'))
            current_commit.author = details[1].strip()
            current_commit.fulldatetime = stamp.strip()
            # the offsets below are positions in the unstripped date field,
            # which starts with one space
            current_commit.dateinmonth = int(stamp[9:11])
            current_commit.month = int(stamp[6:8])
            current_commit.year = int(stamp[0:5])
            current_commit.fulldate = datetime.date(current_commit.year, current_commit.month, current_commit.dateinmonth)
            current_commit.weeknumber = current_commit.fulldate.strftime("%W")
            current_commit.dayinweek = stamp[28:31]
            current_commit.comment_line_count = int(details[3].strip().split(' ')[0])
            current_commit.changes = data[index + 2:data.index('', index + 1)]
            # the first entry of changes is the heading row, not a changed path
            current_commit.number_changes = len(current_commit.changes) - 1
            paths = current_commit.changes[1:]
            current_commit.number_a = sum(1 for path in paths if path.startswith("A /"))
            current_commit.number_d = sum(1 for path in paths if path.startswith("D /"))
            current_commit.number_m = sum(1 for path in paths if path.startswith("M /"))
            current_commit.number_r = sum(1 for path in paths if path.startswith("R /"))
            index = data.index(sep, index + 1)
            current_commit.comment = data[index - current_commit.comment_line_count:index]
            commits.append(current_commit)
        except IndexError:
            # ran off the end of the data: every commit has been read
            break
    return commits


# this function saves the commit list (class instances) to a CSV log file
def save_commit_list_as_csv(location, commits=None):
    '''Write one row per commit to location; commits defaults to the module level results.'''
    if commits is None:
        commits = results
    with _open_csv_for_write(location) as data_export:
        fnameWriter = csv.writer(data_export)
        fnameWriter.writerow(["Revision", "Author", "Full Date/Time", "Full Date", "Date in Month", "Day in Week", "Week Number", "Month", "Year", "Comment Line Count", "Number Changes", "Number A",
            "Number D", "Number M", "Number R", "Comment"])
        for commit in commits:
            fnameWriter.writerow([commit.revision, commit.author, commit.fulldatetime, commit.fulldate, commit.dateinmonth, commit.dayinweek, commit.weeknumber, commit.month,
            commit.year, commit.comment_line_count, commit.number_changes, commit.number_a, commit.number_d, commit.number_m, commit.number_r,
            commit.comment])


# this function saves the analysis results to a CSV file, and is used where the result to save is formatted as a single dictionary
# the function accepts as parameters: data, and location, so can be used many times
def save_dict_as_csv(data, location):
    '''Write each key/value pair of data as one row of the CSV file at location.'''
    with _open_csv_for_write(location) as output_file:
        csv.writer(output_file).writerows(data.items())


# this function saves the analysis results to a CSV file, and is used where the result to save is formatted as a list of dictionaries
def save_dict_list_as_csv(data, location):
    '''Write the dicts in data as rows under a header taken from the keys of the first dict.'''
    keys = list(data[0].keys())
    with _open_csv_for_write(location) as output_file:
        dict_writer = csv.DictWriter(output_file, keys)
        dict_writer.writeheader()
        dict_writer.writerows(data)


# this function prints the analysis results to the console, and is used where the result to print is formatted as a single dictionary
# the function accepts as parameters: data, and title, so can be used many times
def print_dict_to_console(data, title):
    '''Print data as "count name" lines, largest count first, and return the sorted (count, name) list.'''
    print('')
    print(title)
    print("="*len(title))
    ls = [(val, key) for key, val in data.items()]
    ls.sort(reverse = True)
    for count, name in ls:
        print("%s %s" % (count, name))
    return ls


# this function prints the analysis results to the console, and is used where the result to print is formatted as a list of dictionaries
def print_dict_list_to_console(data, title):
    '''Print the title, an underline, and then every row of data on its own line.'''
    print('')
    print(title)
    print("=" * len(title))
    for row in data:
        print(row)


def export_analysis_data_to_CSV(authors_data=None, days_data=None, change_files_data=None):
    '''Export the three analysis results to CSV; omitted arguments fall back to the module level results.'''
    save_dict_as_csv(authors if authors_data is None else authors_data,
        "Python_Export_Counts of Revisions By Author.csv")
    save_dict_as_csv(days if days_data is None else days_data,
        "Python Export_Counts of Revisions By Day in Week.csv")
    save_dict_list_as_csv(change_files if change_files_data is None else change_files_data,
        "Python Export_Counts of Files Changed By Action Type.csv")


# main program
if __name__ == '__main__':

    # clear the console ('cls' only exists on Windows)
    os.system('cls' if os.name == 'nt' else 'clear')

    print('\n***************************************\n')
    print("Analysis for sample text file")
    print("Text file is 'changes_python.txt'")
    print('\n***************************************\n')
    while True:
        s_inp = _read_input("Enter any key to read data file and get commits: ")
        try:
            data = read_file('changes_python.txt')
            results = get_commits(data)
            # following line saves the commit list to a csv which acts as a log file
            save_commit_list_as_csv("Python Export_Commit List.csv")
        except (IOError, OSError, ValueError, IndexError):
            # only file and parse problems are retried, so Ctrl-C and a closed
            # stdin still end the program
            print("\nError while reading file. Please try again.")
            print("Ensure text file is saved as 'changes_python.txt' and is saved in the working directory.")
            continue
        print("\nFile read was successful, and data has been extracted")
        print("{} rows of commits extracted from {} lines".format(len(results), len(data)))
        print("Results saved to csv log file in working directory")
        break

    s_inp = _read_input("\nEnter any key to perform analysis on commit data: ")

    authors = get_revisions_by_attribute(results, "author")
    days = get_revisions_by_attribute(results, "dayinweek")
    change_files = get_file_changes_by_action(results)

    export_analysis_data_to_CSV()
    print("\nAnalysis on commit data is completed")
    print("Results saved as CSV files in working directory")

    print("\nEnter 'Y' to print analysis summary to console")
    print("Enter any other key to exit program")
    choice = _read_input()
    if choice.lower() == 'y':
        print_dict_to_console(authors, "Revisions By Author")
        print_dict_to_console(days, "Revisions By Day in Week")
        print_dict_list_to_console(change_files, "Revisions By File Changes By Author")
    else:
        print("\nExited program")
Source:compare_footprint  
...66    parser.add_argument('-c', '--commit', default=None,67                        help="Commit ID to use compare footprint against base. "68                                    "Default is HEAD or working tree.")69    return parser.parse_args()70def get_git_commit(commit):71    commit_id = None72    proc = subprocess.Popen('git rev-parse %s' % commit, stdout=subprocess.PIPE,73                            cwd=os.environ.get('ZEPHYR_BASE'), shell=True)74    if proc.wait() == 0:75        commit_id = proc.stdout.read().decode("utf-8").strip()76    return commit_id77def sanity_results_filename(commit=None, cwd=os.environ.get('ZEPHYR_BASE')):78    if not commit:79        file_name = "tmp.csv"80    else:81        if commit == RELEASE_DATA:82            file_name = RELEASE_DATA83        else:84            file_name = "%s.csv" % commit85    return os.path.join(cwd,'scripts', 'sanity_chk', file_name)86def git_checkout(commit, cwd=os.environ.get('ZEPHYR_BASE')):87    proc = subprocess.Popen('git diff --quiet', stdout=subprocess.PIPE,88                            stderr=subprocess.STDOUT, cwd=cwd, shell=True)89    if proc.wait() != 0:90        raise Exception("Cannot continue, you have unstaged changes in your working tree")91    proc = subprocess.Popen('git reset %s --hard' % commit,92                            stdout=subprocess.PIPE,93                            stderr=subprocess.STDOUT,94                            cwd=cwd, shell=True)95    if proc.wait() == 0:96        return True97    else:98        logger.error(proc.stdout.read())99    return False100def run_sanity_footprint(commit=None, cwd=os.environ.get('ZEPHYR_BASE'),101                         output_file=None):102    if not output_file:103        output_file = sanity_results_filename(commit)104    cmd = '/bin/bash -c "source ./zephyr-env.sh && twister'105    cmd += ' +scripts/sanity_chk/sanity_compare.args -o %s"' % output_file106    logger.debug('Sanity (%s)   %s' %(commit, cmd))107    proc = 
subprocess.Popen(cmd, stdout=subprocess.PIPE,108                            cwd=cwd, shell=True)109    output,_ = proc.communicate()110    if proc.wait() == 0:111        logger.debug(output)112        return True113    logger.error("Couldn't build footprint apps in commit %s" % commit)114    logger.error(output)115    raise Exception("Couldn't build footprint apps in commit %s" % commit)116def run_footprint_build(commit=None):117    logging.debug("footprint build for %s" % commit)118    if not commit:119        run_sanity_footprint()120    else:121        cmd = "git clone --no-hardlinks %s" % os.environ.get('ZEPHYR_BASE')122        tmp_location = os.path.join(tempfile.gettempdir(),123                                os.path.basename(os.environ.get('ZEPHYR_BASE')))124        if os.path.exists(tmp_location):125            shutil.rmtree(tmp_location)126        logging.debug("clonning into %s" % tmp_location)127        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,128                                stderr=subprocess.STDOUT,129                                cwd=tempfile.gettempdir(), shell=True)130        if proc.wait() == 0:131            if git_checkout(commit, tmp_location):132                run_sanity_footprint(commit, tmp_location)133        else:134            logger.error(proc.stdout.read())135        shutil.rmtree(tmp_location, ignore_errors=True)136    return True137def read_sanity_report(filename):138    data = []139    with open(filename) as fp:140        tmp = csv.DictReader(fp)141        for row in tmp:142            data.append(row)143    return data144def get_footprint_results(commit=None):145    sanity_file = sanity_results_filename(commit)146    if (not os.path.exists(sanity_file) or not commit) and commit != RELEASE_DATA:147        run_footprint_build(commit)148    return read_sanity_report(sanity_file)149def tree_changes():150    proc = subprocess.Popen('git diff --quiet', stdout=subprocess.PIPE,151                            
cwd=os.environ.get('ZEPHYR_BASE'), shell=True)152    if proc.wait() != 0:153        return True154    return False155def get_default_current_commit():156    if tree_changes():157        return None158    else:159        return get_git_commit('HEAD')160def get_default_base_commit(current_commit):161    if not current_commit:162        if tree_changes():163            return get_git_commit('HEAD')164        else:165            return get_git_commit('HEAD~1')166    else:167        return get_git_commit('%s~1'%current_commit)168def build_history(b_commit=None, c_commit=None):169    if not GIT_ENABLED:170        logger.info('Working on current tree, not git enabled.')171        current_commit = None172        base_commit = RELEASE_DATA173    else:174        if not c_commit:175            current_commit = get_default_current_commit()176        else:177            current_commit = get_git_commit(c_commit)178        if not b_commit:179            base_commit = get_default_base_commit(current_commit)180        else:181            base_commit = get_git_commit(b_commit)182    if not base_commit:183        logger.error("Cannot resolve base commit")184        return185    logger.info("Base:    %s" % base_commit)186    logger.info("Current: %s" % (current_commit if current_commit else187                    'working space'))188    current_results = get_footprint_results(current_commit)189    base_results = get_footprint_results(base_commit)190    deltas = compare_results(base_results, current_results)191    print_deltas(deltas)192def compare_results(base_results, current_results):193    interesting_metrics = [("ram_size", int),194                           ("rom_size", int)]195    results = {}...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
