Best Python code snippet using ATX
train.py
Source:train.py  
# ... (excerpt begins inside a preceding function whose `def` line is not
# shown; its visible tail is kept verbatim below for reference)
#       labels.update(arc.ilabel for arc in fst.arcs(state))
#     reader.next()
#   assert not reader.error()
#   return labels


def _mktemp(suffix: str) -> str:
  """Creates temporary file with desired suffix.

  The (empty) file is created on disk and is not removed automatically; the
  caller owns it and is expected to delete it, e.g. with `_rmtemp`.

  Args:
    suffix: the desired suffix for the temporary file; a leading "." is added.

  Returns:
    Path to a temporary file.
  """
  fd, path = tempfile.mkstemp(suffix=f".{suffix}")
  # `mkstemp` returns an open OS-level descriptor along with the path. Only
  # the path is used from here on, so the descriptor is closed right away;
  # otherwise every call would leak one descriptor.
  os.close(fd)
  logging.debug("New temporary file:\t%s", path)
  return path


def _rmtemp(path: str) -> None:
  """Removes temporary file.

  Args:
    path: path to temporary file to be removed.

  Raises:
    OSError: if `os.remove` fails (e.g., the file does not exist).
  """
  logging.debug("Removing temporary file:\t%s", path)
  os.remove(path)


# Matches one per-iteration progress line written to stderr by the trainer:
# group 1 is the iteration number, group 2 the reported likelihood.
_ITERATION_RE = re.compile(r"INFO:\s+Iteration\s+(\d+):\s+(-?\d*(\.\d*)?)")


@dataclasses.dataclass
class RandomStart:
  """Struct representing a random start."""

  # Position of this start among all starts; only used in log messages.
  idx: int
  # Seed passed to `baumwelchrandomize`; also embedded in temp file names.
  seed: int
  # Path to the input FAR.
  ifar_path: str
  # Path to the output FAR.
  ofar_path: str
  # Path to the covering grammar FST that gets randomized.
  cg_path: str
  # Extra command-line flags forwarded verbatim to `baumwelchtrain`.
  train_opts: List[str]

  def train(self) -> Tuple[str, float]:
    """Trains a single random start.

    Returns:
      A tuple containing the aligner FST path and the negative log likelihood.
      The likelihood is the one from the last iteration line the trainer
      printed, or `INF` if it printed none.

    Raises:
      AssertionError: if a line on the trainer's stderr is not an iteration
        report.
      subprocess.CalledProcessError: if `baumwelchtrain` exits with a non-zero
        status.
    """
    start = time.time()
    # Randomizes the channel model weights.
    rfst_path = _mktemp(f"random-{self.seed:05d}.fst")
    _log_check_call([
        "baumwelchrandomize",
        f"--seed={self.seed}",
        self.cg_path,
        rfst_path,
    ])
    # Trains model and reads likelihood.
    afst_path = _mktemp(f"aligner-{self.seed:05d}.fst")
    likelihood = INF
    cmd = [
        "baumwelchtrain",
        *self.train_opts,
        self.ifar_path,
        self.ofar_path,
        rfst_path,
        afst_path,
    ]
    logging.debug("Subprocess call:\t%s", " ".join(cmd))
    try:
      with subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True) as proc:
        for line in proc.stderr:  # type: ignore
          match = _ITERATION_RE.match(line.rstrip())
          assert match, line
          iteration = int(match.group(1))
          likelihood = float(match.group(2))
          logging.debug(
              "start:\t%3d;\titer:\t%3d;\tLL:\t%.4f;\telapsed:\t%3ds",
              self.idx,
              iteration,
              likelihood,
              time.time() - start,
          )
    finally:
      # The randomized FST is only an input to training; drop it whether or
      # not training succeeded.
      _rmtemp(rfst_path)
    # Leaving the `with` block waits for the process, so `returncode` is set.
    # A failed run must not be mistaken for a (possibly `INF`) valid result.
    if proc.returncode:
      raise subprocess.CalledProcessError(proc.returncode, cmd)
    return afst_path, likelihood


# Major stages.


def _compile_fars(tsv: str, input_token_type: str,
                  output_token_type: str) -> Tuple[str, str]:
  """Compiles FAR files and returns their paths.

  The first TSV column is written to one temporary text file and the second
  column to another; each text file is then compiled with
  `farcompilestrings`.

  Args:
    tsv: path to the data TSV file; every row must have exactly two columns.
    input_token_type: input token type (one of: "byte", "utf8", or a symbol
        table).
    output_token_type: output token_type (one of: "byte", "utf8", or a symbol
        table).

  Returns:
    A tuple containing the input FAR path and output FAR path.
  """
  with tempfile.NamedTemporaryFile(suffix=".i.txt", mode="w") as itxt:
    with tempfile.NamedTemporaryFile(suffix=".o.txt", mode="w") as otxt:
      with open(tsv, "r") as source:
        for col1, col2 in csv.reader(source, delimiter="\t"):
          print(col1, file=itxt)
          print(col2, file=otxt)
      # The compilers below reopen these files by name in another process.
      # Without an explicit flush, rows still sitting in Python's write
      # buffers would be invisible to them.
      itxt.flush()
      otxt.flush()
      ifar_path = _mktemp("i.far")
      _log_check_call([
          "farcompilestrings",
          "--fst_type=compact",
          f"--token_type={input_token_type}",
          itxt.name,
          ifar_path,
      ])
      ofar_path = _mktemp("o.far")
      _log_check_call([
          "farcompilestrings",
          "--fst_type=compact",
          f"--token_type={output_token_type}",
          otxt.name,
          ofar_path,
      ])
  # Temporary text files are now deleted.
  return ifar_path, ofar_path


def _compile_cg(ifar_path: str, ofar_path: str, insertions: bool,
                deletions: bool) -> str:
  """Compiles the covering grammar from the input and output FARs.

  The grammar is a single-state FST (start and final) with one self-loop per
  input-label/output-label pair, all weighted with the semiring's one.

  Args:
    ifar_path: path to the input FAR.
    ofar_path: path to the output FAR.
    insertions: should insertions be permitted?
    deletions: should deletions be permitted?

  Returns:
    The path to the CG FST.
  """
  ilabels = _get_far_labels(ifar_path)
  olabels = _get_far_labels(ofar_path)
  cg = pywrapfst.VectorFst()
  state = cg.add_state()
  cg.set_start(state)
  one = pywrapfst.Weight.one(cg.weight_type())
  for ilabel, olabel in itertools.product(ilabels, olabels):
    cg.add_arc(state, pywrapfst.Arc(ilabel, olabel, one, state))
  # Handles epsilons, carefully avoiding adding a useless 0:0 label.
  if insertions:
    for olabel in olabels:
      cg.add_arc(state, pywrapfst.Arc(0, olabel, one, state))
  if deletions:
    for ilabel in ilabels:
      cg.add_arc(state, pywrapfst.Arc(ilabel, 0, one, state))
  cg.set_final(state)
  assert cg.verify(), "Label acceptor is ill-formed"
  cg_path = _mktemp("cg.fst")
  cg.write(cg_path)
  return cg_path


def _train_aligner(
    ifar_path: str,
    ofar_path: str,
    cg_path: str,
    seed: int,
    random_starts: int,
    processes: int,
    batch_size: Optional[int] = None,
    delta: Optional[float] = None,
    alpha: Optional[float] = None,
    max_iters: Optional[int] = None,
) -> str:
  """Trains the aligner.

  Runs `random_starts` independent trainings in a process pool, keeps the
  aligner with the lowest negative log likelihood, and deletes the other
  aligners as well as the covering grammar file.

  NB: many arguments inherit default values from the `baumwelchtrain` tool.
  An optional argument that is `None` (or otherwise falsy, e.g. 0) is not
  forwarded, so the tool's own default applies.

  Args:
    ifar_path: path to the input FAR.
    ofar_path: path to the output FAR.
    cg_path: path to the covering grammar FST; removed before returning.
    seed: integer random seed.
    random_starts: number of random starts.
    processes: maximum number of processes running concurrently.
    batch_size: batch size (default: from `baumwelchtrain`).
    delta: comparison/quantization delta (default: from `baumwelchtrain`).
    alpha: learning rate (default: from `baumwelchtrain`).
    max_iters: maximum number of iterations (default: from `baumwelchtrain`).

  Returns:
    The path to the aligner FST.
  """
  train_opts: List[str] = []
  if batch_size:
    train_opts.append(f"--batch_size={batch_size}")
  if delta:
    train_opts.append(f"--delta={delta}")
  if alpha:
    train_opts.append(f"--alpha={alpha}")
  if max_iters:
    train_opts.append(f"--max_iters={max_iters}")
  random.seed(seed)
  # Each random start is associated with a randomly chosen unique integer in
  # the range [1, RAND_MAX).
  starts = [
      RandomStart(idx, start_seed, ifar_path, ofar_path, cg_path, train_opts)
      for idx, start_seed in enumerate(
          random.sample(range(1, RAND_MAX), random_starts), 1)
  ]
  with multiprocessing.Pool(processes) as pool:
    # Setting chunksize to 1 means that random starts are processed
    # in roughly the order you'd expect.
    pairs = pool.map(RandomStart.train, starts, chunksize=1)
  # Finds best aligner; we `min` because this is in negative log space.
  best_aligner_path, best_likelihood = min(pairs, key=operator.itemgetter(1))
  logging.debug("Best aligner:\t%s", best_aligner_path)
  logging.debug("Best likelihood:\t%.4f", best_likelihood)
  # Deletes suboptimal aligner FSTs.
  for aligner_path, _ in pairs:
    if aligner_path == best_aligner_path:
      continue
    _rmtemp(aligner_path)
  _rmtemp(cg_path)
  return best_aligner_path


def _align(ifar_path: str, ofar_path: str, afst_path: str) -> str:
  """Computes the alignments FAR.

  Both input FARs are deleted once decoding has finished.

  Args:
    ifar_path: path to the input FAR; removed before returning.
    ofar_path: path to the output FAR; removed before returning.
    afst_path: path to the aligner FST.

  Returns:
    The path to the alignments FAR.
  """
  afar_path = _mktemp("a.far")
  _log_check_call(
      ["baumwelchdecode", ifar_path, ofar_path, afst_path, afar_path])
  _rmtemp(ifar_path)
  _rmtemp(ofar_path)
  return afar_path


def _encode(afar_path: str) -> Tuple[str, str]:
  """Encodes the alignments FAR.

  Args:
    afar_path: path to the alignments FAR; removed before returning.

  Returns:
     A (path to the encoded FAR, path to the encoder) tuple.
  """
  efar_path = _mktemp("e.far")
  encoder_path = _mktemp("encoder")
  _log_check_call(
      ["farencode", "--encode_labels", afar_path, encoder_path, efar_path])
  _rmtemp(afar_path)
  return efar_path, encoder_path


def _compile_pair_ngram(
    efar_path: str,
    encoder_path: str,
    ofst_path: str,
    order: Optional[int] = None,
    size: Optional[int] = None,
) -> None:
  """Compiles the pair n-gram model.

  Pipeline: `ngramcount` -> `ngrammake` -> optional `ngramshrink` ->
  `fstencode --decode`. Intermediate files and the encoder are deleted along
  the way; the encoded FAR itself is left in place.

  Args:
    efar_path: path to the encoded FAR.
    encoder_path: path to the encoder; removed before returning.
    ofst_path: path for the pair n-gram FST.
    order: n-gram model order (default: from `ngramcount`).
    size: n-gram model size to prune to (default: no pruning).
  """
  cfst_path = _mktemp("c.fst")
  cmd = ["ngramcount", "--require_symbols=false"]
  if order:
    cmd.append(f"--order={order}")
  cmd.append(efar_path)
  cmd.append(cfst_path)
  _log_check_call(cmd)
  mfst_path = _mktemp("m.fst")
  _log_check_call(["ngrammake", "--method=kneser_ney", cfst_path, mfst_path])
  _rmtemp(cfst_path)
  if size:
    sfst_path = _mktemp("s.fst")
    _log_check_call([
        "ngramshrink",
        "--method=relative_entropy",
        f"--target_number_of_ngrams={size}",
        mfst_path,
        sfst_path,
    ])
    _rmtemp(mfst_path)
  else:
    # No pruning requested: the unpruned model is decoded directly.
    sfst_path = mfst_path
  _log_check_call(["fstencode", "--decode", sfst_path, encoder_path, ofst_path])
  _rmtemp(encoder_path)
  _rmtemp(sfst_path)


# def main(args: argparse.Namespace) -> None: ...  (body not shown in this
# excerpt)
# ... next excerpt: json_store.py
Source:json_store.py  
...46    def __iter__(self):47        return iter(self._data)48    def __len__(self):49        return len(self._data)50    def _mktemp(self):51        prefix = os.path.basename(self.path) + "."52        dirname = os.path.dirname(self.path)53        return NamedTemporaryFile(mode='w', prefix=prefix, dir=dirname, delete=False)54    def sync(self, json_kw=None, force=False):55        """Atomically write the entire store to disk if it's changed.56        If a dict is passed in as `json_kw`, it will be used as keyword57        arguments to the json module.58        If force is set True, a new file will be written even if the store59        hasn't changed since last sync.60        """61        json_kw = json_kw or self.json_kw62        if self._synced_json_kw != json_kw:63            self._needs_sync = True64        if not (self._needs_sync or force):65            return False66        with self._mktemp() as fp:67            json.dump(self._data, fp, **json_kw)68        if self.mode != MODE_600:  # _mktemp uses 0600 by default69            os.chmod(fp.name, self.mode)70        shutil.move(fp.name, self.path)71        self._synced_json_kw = json_kw72        self._needs_sync = False...test_tempfile.py
Source:test_tempfile.py  
...11        super().setUp()12        self.cassette.storage_file = get_datafile_filename(self.id)13        TempFile.set_cassette(self.cassette)14    def testSimple(self):15        output = TempFile._mktemp()16        self.assertIn(17            f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_1",18            output,19        )20    def testChangeFile(self):21        self.cassette.storage_file = str(self.cassette.storage_file) + ".x"22        output = TempFile._mktemp()23        self.assertIn(24            f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_1",25            output,26        )27        output = TempFile._mktemp()28        self.assertIn(29            f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_2",30            output,31        )32        self.cassette.storage_file = str(self.cassette.storage_file) + ".y"33        self.assertEqual(TempFile.counter, 2)34        output = TempFile._mktemp()35        self.assertEqual(TempFile.counter, 1)36        self.assertIn(37            f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_1",38            output,39        )40        output = TempFile._mktemp()41        self.assertIn(42            f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_2",43            output,44        )45@replace(what="tempfile.mktemp", decorate=MkTemp.decorator_plain())46def new_tempfile():47    return tempfile.mktemp()48@replace(what="tempfile.mkdtemp", decorate=MkDTemp.decorator_plain())49def new_tempdir():50    return tempfile.mkdtemp()51class TempFile_New(BaseClass):52    def test_tempfile(self):53        """When regeneration, tempfile will change, so change the expected output"""54        filename = new_tempfile()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
