Best Python code snippet using avocado_python
beam_search.py
Source:beam_search.py  
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Beam search to find the translated sequence with the highest probability.

Source implementation from Tensor2Tensor:
https://github.com/tensorflow/tensor2tensor/blob/master/tensor2tensor/utils/beam_search.py
"""
import tensorflow as tf
from tensorflow.python.util import nest

# Default value for INF. A large finite number is used instead of float("inf")
# so that masking arithmetic (score + mask * -INF) never produces NaN.
INF = 1. * 1e7


class _StateKeys(object):
  """Keys to dictionary storing the state of the beam search loop."""

  # Variable storing the loop index.
  CUR_INDEX = "CUR_INDEX"

  # Top sequences that are alive for each batch item. Alive sequences are ones
  # that have not generated an EOS token. Sequences that reach EOS are marked as
  # finished and moved to the FINISHED_SEQ tensor.
  # Has shape [batch_size, beam_size, CUR_INDEX + 1]
  ALIVE_SEQ = "ALIVE_SEQ"
  # Log probabilities of each alive sequence. Shape [batch_size, beam_size]
  ALIVE_LOG_PROBS = "ALIVE_LOG_PROBS"
  # Dictionary of cached values for each alive sequence. The cache stores
  # the encoder output, attention bias, and the decoder attention output from
  # the previous iteration.
  ALIVE_CACHE = "ALIVE_CACHE"

  # Top finished sequences for each batch item.
  # Has shape [batch_size, beam_size, CUR_INDEX + 1]. Sequences that are
  # shorter than CUR_INDEX + 1 are padded with 0s.
  FINISHED_SEQ = "FINISHED_SEQ"
  # Scores for each finished sequence. Score = log probability / length norm
  # Shape [batch_size, beam_size]
  FINISHED_SCORES = "FINISHED_SCORES"
  # Flags indicating which sequences in the finished sequences are finished.
  # At the beginning, all of the sequences in FINISHED_SEQ are filler values.
  # True -> finished sequence, False -> filler. Shape [batch_size, beam_size]
  FINISHED_FLAGS = "FINISHED_FLAGS"


class SequenceBeamSearch(object):
  """Implementation of beam search loop."""

  def __init__(self, symbols_to_logits_fn, vocab_size, batch_size,
               beam_size, alpha, max_decode_length, eos_id):
    """Store the search configuration.

    Args:
      symbols_to_logits_fn: callable (ids, index, cache) -> (logits, new cache).
      vocab_size: int size of the vocabulary.
      batch_size: batch size; sequence_beam_search passes a scalar tensor.
      beam_size: int number of beams.
      alpha: float defining the strength of length normalization.
      max_decode_length: maximum number of decoding steps.
      eos_id: int id of the EOS token.
    """
    self.symbols_to_logits_fn = symbols_to_logits_fn
    self.vocab_size = vocab_size
    self.batch_size = batch_size
    self.beam_size = beam_size
    self.alpha = alpha
    self.max_decode_length = max_decode_length
    self.eos_id = eos_id

  def search(self, initial_ids, initial_cache):
    """Beam search for sequences with highest scores.

    Args:
      initial_ids: starting ids for each batch item, shape [batch_size].
      initial_cache: dictionary of values passed to symbols_to_logits_fn.

    Returns:
      Tuple of (sequences, scores). For batch items with no finished
      sequence, the alive sequences and their log probabilities are returned
      instead.
    """
    state, state_shapes = self._create_initial_state(initial_ids, initial_cache)

    finished_state = tf.while_loop(
        self._continue_search, self._search_step, loop_vars=[state],
        shape_invariants=[state_shapes], parallel_iterations=1, back_prop=False)
    finished_state = finished_state[0]

    alive_seq = finished_state[_StateKeys.ALIVE_SEQ]
    alive_log_probs = finished_state[_StateKeys.ALIVE_LOG_PROBS]
    finished_seq = finished_state[_StateKeys.FINISHED_SEQ]
    finished_scores = finished_state[_StateKeys.FINISHED_SCORES]
    finished_flags = finished_state[_StateKeys.FINISHED_FLAGS]

    # Account for corner case where there are no finished sequences for a
    # particular batch item. In that case, return alive sequences for that batch
    # item.
    has_finished = tf.reduce_any(finished_flags, 1)
    finished_seq = tf.where(has_finished, finished_seq, alive_seq)
    finished_scores = tf.where(has_finished, finished_scores, alive_log_probs)
    return finished_seq, finished_scores

  def _create_initial_state(self, initial_ids, initial_cache):
    """Return initial state dictionary and its shape invariants.

    Args:
      initial_ids: initial ids to pass into the symbols_to_logits_fn.
        int tensor with shape [batch_size]
      initial_cache: dictionary storing values to be passed into the
        symbols_to_logits_fn.

    Returns:
        state and shape invariant dictionaries with keys from _StateKeys
    """
    # Current loop index (starts at 0)
    cur_index = tf.constant(0)

    # Create alive sequence with shape [batch_size, beam_size, 1]
    alive_seq = _expand_to_beam_size(initial_ids, self.beam_size)
    alive_seq = tf.expand_dims(alive_seq, axis=2)

    # Create tensor for storing initial log probabilities.
    # Assume initial_ids are prob 1.0. Only the first beam starts at log prob
    # 0; the others start at -inf so that the identical initial beams do not
    # all survive the first top-k.
    initial_log_probs = tf.constant(
        [[0.] + [-float("inf")] * (self.beam_size - 1)])
    alive_log_probs = tf.tile(initial_log_probs, [self.batch_size, 1])

    # Expand all values stored in the dictionary to the beam size, so that each
    # beam has a separate cache.
    alive_cache = nest.map_structure(
        lambda t: _expand_to_beam_size(t, self.beam_size), initial_cache)

    # Initialize tensor storing finished sequences with filler values.
    finished_seq = tf.zeros(tf.shape(alive_seq), tf.int32)

    # Set scores of the initial finished seqs to negative infinity.
    finished_scores = tf.ones([self.batch_size, self.beam_size]) * -INF

    # Initialize finished flags with all False values.
    finished_flags = tf.zeros([self.batch_size, self.beam_size], tf.bool)

    # Create state dictionary
    state = {
        _StateKeys.CUR_INDEX: cur_index,
        _StateKeys.ALIVE_SEQ: alive_seq,
        _StateKeys.ALIVE_LOG_PROBS: alive_log_probs,
        _StateKeys.ALIVE_CACHE: alive_cache,
        _StateKeys.FINISHED_SEQ: finished_seq,
        _StateKeys.FINISHED_SCORES: finished_scores,
        _StateKeys.FINISHED_FLAGS: finished_flags
    }

    # Create state invariants for each value in the state dictionary. Each
    # dimension must be a constant or None. A None dimension means either:
    #   1) the dimension's value is a tensor that remains the same but may
    #      depend on the input sequence to the model (e.g. batch size).
    #   2) the dimension may have different values on different iterations.
    state_shape_invariants = {
        _StateKeys.CUR_INDEX: tf.TensorShape([]),
        _StateKeys.ALIVE_SEQ: tf.TensorShape([None, self.beam_size, None]),
        _StateKeys.ALIVE_LOG_PROBS: tf.TensorShape([None, self.beam_size]),
        _StateKeys.ALIVE_CACHE: nest.map_structure(
            _get_shape_keep_last_dim, alive_cache),
        _StateKeys.FINISHED_SEQ: tf.TensorShape([None, self.beam_size, None]),
        _StateKeys.FINISHED_SCORES: tf.TensorShape([None, self.beam_size]),
        _StateKeys.FINISHED_FLAGS: tf.TensorShape([None, self.beam_size])
    }

    return state, state_shape_invariants

  def _continue_search(self, state):
    """Return whether to continue the search loop.

    The loop should terminate when
      1) the maximum decode length has been reached, or
      2) the worst score in the finished sequences is better than the best
         score in the alive sequences (i.e. the finished sequences are provably
         unchanging)

    Args:
      state: A dictionary with the current loop state.

    Returns:
      Bool tensor with value True if loop should continue, False if loop should
      terminate.
    """
    i = state[_StateKeys.CUR_INDEX]
    alive_log_probs = state[_StateKeys.ALIVE_LOG_PROBS]
    finished_scores = state[_StateKeys.FINISHED_SCORES]
    finished_flags = state[_StateKeys.FINISHED_FLAGS]

    not_at_max_decode_length = tf.less(i, self.max_decode_length)

    # Calculate largest length penalty (the larger penalty, the better score).
    max_length_norm = _length_normalization(self.alpha, self.max_decode_length)
    # Get the best possible scores from alive sequences.
    best_alive_scores = alive_log_probs[:, 0] / max_length_norm

    # Compute worst score in finished sequences for each batch element
    # (filler scores are set to zero first).
    finished_scores *= tf.cast(finished_flags, tf.float32)
    lowest_finished_scores = tf.reduce_min(finished_scores, axis=1)

    # If there are no finished sequences in a batch element, then set the lowest
    # finished score to -INF for that element.
    finished_batches = tf.reduce_any(finished_flags, 1)
    lowest_finished_scores += (
        1. - tf.cast(finished_batches, tf.float32)) * -INF

    worst_finished_score_better_than_best_alive_score = tf.reduce_all(
        tf.greater(lowest_finished_scores, best_alive_scores)
    )

    return tf.logical_and(
        not_at_max_decode_length,
        tf.logical_not(worst_finished_score_better_than_best_alive_score)
    )

  def _search_step(self, state):
    """Beam search loop body.

    Grow alive sequences by a single ID. Sequences that have reached the EOS
    token are marked as finished. The alive and finished sequences with the
    highest log probabilities and scores are returned.

    A sequence's finished score is calculated by dividing the log probability
    by the length normalization factor. Without length normalization, the
    search is more likely to return shorter sequences.

    Args:
      state: A dictionary with the current loop state.

    Returns:
      new state dictionary (wrapped in a single-element list, matching the
      loop_vars structure passed to tf.while_loop).
    """
    # Grow alive sequences by one token.
    new_seq, new_log_probs, new_cache = self._grow_alive_seq(state)
    # Collect top beam_size alive sequences
    alive_state = self._get_new_alive_state(new_seq, new_log_probs, new_cache)

    # Combine newly finished sequences with existing finished sequences, and
    # collect the top k scoring sequences.
    finished_state = self._get_new_finished_state(state, new_seq, new_log_probs)

    # Increment loop index and create new state dictionary
    new_state = {_StateKeys.CUR_INDEX: state[_StateKeys.CUR_INDEX] + 1}
    new_state.update(alive_state)
    new_state.update(finished_state)
    return [new_state]

  def _grow_alive_seq(self, state):
    """Grow alive sequences by one token, and collect top 2*beam_size sequences.

    2*beam_size sequences are collected because some sequences may have reached
    the EOS token. 2*beam_size ensures that at least beam_size sequences are
    still alive.

    Args:
      state: A dictionary with the current loop state.

    Returns:
      Tuple of
      (Top 2*beam_size sequences [batch_size, 2 * beam_size, cur_index + 2],
       Scores of returned sequences [batch_size, 2 * beam_size],
       New alive cache, for each of the 2 * beam_size sequences)
    """
    i = state[_StateKeys.CUR_INDEX]
    alive_seq = state[_StateKeys.ALIVE_SEQ]
    alive_log_probs = state[_StateKeys.ALIVE_LOG_PROBS]
    alive_cache = state[_StateKeys.ALIVE_CACHE]

    beams_to_keep = 2 * self.beam_size

    # Get logits for the next candidate IDs for the alive sequences. Get the new
    # cache values at the same time.
    # flat_ids has shape [batch_size * beam_size, cur_index + 1]
    flat_ids = _flatten_beam_dim(alive_seq)
    flat_cache = nest.map_structure(_flatten_beam_dim, alive_cache)

    flat_logits, flat_cache = self.symbols_to_logits_fn(flat_ids, i, flat_cache)

    # Unflatten logits to shape [batch_size, beam_size, vocab_size]
    logits = _unflatten_beam_dim(flat_logits, self.batch_size, self.beam_size)
    new_cache = nest.map_structure(
        lambda t: _unflatten_beam_dim(t, self.batch_size, self.beam_size),
        flat_cache)

    # Convert logits to normalized log probs
    candidate_log_probs = _log_prob_from_logits(logits)

    # Calculate new log probabilities if each of the alive sequences were
    # extended by the candidate IDs.
    # Shape [batch_size, beam_size, vocab_size]
    log_probs = candidate_log_probs + tf.expand_dims(alive_log_probs, axis=2)

    # Each batch item has beam_size * vocab_size candidate sequences. For each
    # batch item, get the k candidates with the highest log probabilities.
    flat_log_probs = tf.reshape(log_probs,
                                [-1, self.beam_size * self.vocab_size])
    topk_log_probs, topk_indices = tf.nn.top_k(flat_log_probs, k=beams_to_keep)

    # Extract the alive sequences that generate the highest log probabilities
    # after being extended. A flat index encodes (beam, token) as
    # beam * vocab_size + token.
    topk_beam_indices = topk_indices // self.vocab_size
    topk_seq, new_cache = _gather_beams(
        [alive_seq, new_cache], topk_beam_indices, self.batch_size,
        beams_to_keep)

    # Append the most probable IDs to the topk sequences
    topk_ids = topk_indices % self.vocab_size
    topk_ids = tf.expand_dims(topk_ids, axis=2)
    topk_seq = tf.concat([topk_seq, topk_ids], axis=2)
    return topk_seq, topk_log_probs, new_cache

  def _get_new_alive_state(self, new_seq, new_log_probs, new_cache):
    """Gather the top k sequences that are still alive.

    Args:
      new_seq: New sequences generated by growing the current alive sequences
        int32 tensor with shape [batch_size, 2 * beam_size, cur_index + 2]
      new_log_probs: Log probabilities of new sequences
        float32 tensor with shape [batch_size, 2 * beam_size]
      new_cache: Dict of cached values for each sequence.

    Returns:
      Dictionary with alive keys from _StateKeys:
        {Top beam_size sequences that are still alive (don't end with eos_id)
         Log probabilities of top alive sequences
         Dict cache storing decoder states for top alive sequences}
    """
    # To prevent finished sequences from being considered, set log probs to -INF
    new_finished_flags = tf.equal(new_seq[:, :, -1], self.eos_id)
    new_log_probs += tf.cast(new_finished_flags, tf.float32) * -INF

    top_alive_seq, top_alive_log_probs, top_alive_cache = _gather_topk_beams(
        [new_seq, new_log_probs, new_cache], new_log_probs, self.batch_size,
        self.beam_size)

    return {
        _StateKeys.ALIVE_SEQ: top_alive_seq,
        _StateKeys.ALIVE_LOG_PROBS: top_alive_log_probs,
        _StateKeys.ALIVE_CACHE: top_alive_cache
    }

  def _get_new_finished_state(self, state, new_seq, new_log_probs):
    """Combine new and old finished sequences, and gather the top k sequences.

    Args:
      state: A dictionary with the current loop state.
      new_seq: New sequences generated by growing the current alive sequences
        int32 tensor with shape [batch_size, 2 * beam_size, i + 2]
      new_log_probs: Log probabilities of new sequences
        float32 tensor with shape [batch_size, 2 * beam_size]

    Returns:
      Dictionary with finished keys from _StateKeys:
        {Top beam_size finished sequences based on score,
         Scores of finished sequences,
         Finished flags of finished sequences}
    """
    i = state[_StateKeys.CUR_INDEX]
    finished_seq = state[_StateKeys.FINISHED_SEQ]
    finished_scores = state[_StateKeys.FINISHED_SCORES]
    finished_flags = state[_StateKeys.FINISHED_FLAGS]

    # First append a column of 0-ids to finished_seq to increment the length,
    # so it can be concatenated with new_seq along the beam axis.
    # New shape of finished_seq: [batch_size, beam_size, i + 2]
    finished_seq = tf.concat(
        [finished_seq,
         tf.zeros([self.batch_size, self.beam_size, 1], tf.int32)], axis=2)

    # Calculate new seq scores from log probabilities.
    length_norm = _length_normalization(self.alpha, i + 1)
    new_scores = new_log_probs / length_norm

    # Set the scores of the still-alive seq in new_seq to large negative values.
    new_finished_flags = tf.equal(new_seq[:, :, -1], self.eos_id)
    new_scores += (1. - tf.cast(new_finished_flags, tf.float32)) * -INF

    # Combine sequences, scores, and flags.
    finished_seq = tf.concat([finished_seq, new_seq], axis=1)
    finished_scores = tf.concat([finished_scores, new_scores], axis=1)
    finished_flags = tf.concat([finished_flags, new_finished_flags], axis=1)

    # Return the finished sequences with the best scores.
    top_finished_seq, top_finished_scores, top_finished_flags = (
        _gather_topk_beams([finished_seq, finished_scores, finished_flags],
                           finished_scores, self.batch_size, self.beam_size))

    return {
        _StateKeys.FINISHED_SEQ: top_finished_seq,
        _StateKeys.FINISHED_SCORES: top_finished_scores,
        _StateKeys.FINISHED_FLAGS: top_finished_flags
    }


def sequence_beam_search(
    symbols_to_logits_fn, initial_ids, initial_cache, vocab_size, beam_size,
    alpha, max_decode_length, eos_id):
  """Search for sequence of subtoken ids with the largest probability.

  Args:
    symbols_to_logits_fn: A function that takes in ids, index, and cache as
      arguments. The passed in arguments will have shape:
        ids -> [batch_size * beam_size, index]
        index -> [] (scalar)
        cache -> nested dictionary of tensors [batch_size * beam_size, ...]
      The function must return logits and new cache.
        logits -> [batch * beam_size, vocab_size]
        new cache -> same shape/structure as inputted cache
    initial_ids: Starting ids for each batch item.
      int32 tensor with shape [batch_size]
    initial_cache: dict containing starting decoder variables information
    vocab_size: int size of tokens
    beam_size: int number of beams
    alpha: float defining the strength of length normalization
    max_decode_length: maximum length to decoded sequence
    eos_id: int id of eos token, used to determine when a sequence has finished

  Returns:
    Top decoded sequences [batch_size, beam_size, max_decode_length]
    sequence scores [batch_size, beam_size]
  """
  # The batch size is read dynamically, so it is a scalar tensor rather than a
  # Python int.
  batch_size = tf.shape(initial_ids)[0]
  sbs = SequenceBeamSearch(symbols_to_logits_fn, vocab_size, batch_size,
                           beam_size, alpha, max_decode_length, eos_id)
  return sbs.search(initial_ids, initial_cache)


def _log_prob_from_logits(logits):
  """Normalize logits of shape [..., ..., vocab] into log probabilities."""
  return logits - tf.reduce_logsumexp(logits, axis=2, keepdims=True)


def _length_normalization(alpha, length):
  """Return length normalization factor ((5 + length) / 6) ** alpha."""
  return tf.pow(((5. + tf.cast(length, tf.float32)) / 6.), alpha)


def _expand_to_beam_size(tensor, beam_size):
  """Tiles a given tensor by beam_size.

  Args:
    tensor: tensor to tile [batch_size, ...]
    beam_size: How much to tile the tensor by.

  Returns:
    Tiled tensor [batch_size, beam_size, ...]
  """
  tensor = tf.expand_dims(tensor, axis=1)
  tile_dims = [1] * tensor.shape.ndims
  tile_dims[1] = beam_size

  return tf.tile(tensor, tile_dims)


def _shape_list(tensor):
  """Return a list of the tensor's shape, and ensure no None values in list."""
  # Get statically known shape (may contain None's for unknown dimensions)
  shape = tensor.get_shape().as_list()

  # Replace statically unknown dimensions with their dynamic counterparts.
  dynamic_shape = tf.shape(tensor)
  for i, dim in enumerate(shape):
    if dim is None:
      shape[i] = dynamic_shape[i]
  return shape


def _get_shape_keep_last_dim(tensor):
  """Return a TensorShape that is None everywhere except the last dimension."""
  shape_list = _shape_list(tensor)

  # Only the last dimension is kept; all leading dimensions become None.
  for i in range(len(shape_list) - 1):
    shape_list[i] = None

  # A dynamic (tensor-valued) last dimension cannot be a static invariant.
  if isinstance(shape_list[-1], tf.Tensor):
    shape_list[-1] = None
  return tf.TensorShape(shape_list)


def _flatten_beam_dim(tensor):
  """Reshapes first two dimensions in to single dimension.

  Args:
    tensor: Tensor to reshape of shape [A, B, ...]

  Returns:
    Reshaped tensor of shape [A*B, ...]
  """
  shape = _shape_list(tensor)
  shape[0] *= shape[1]
  shape.pop(1)  # Remove beam dim
  return tf.reshape(tensor, shape)


def _unflatten_beam_dim(tensor, batch_size, beam_size):
  """Reshapes first dimension back to [batch_size, beam_size].

  Args:
    tensor: Tensor to reshape of shape [batch_size*beam_size, ...]
    batch_size: Tensor, original batch size.
    beam_size: int, original beam size.

  Returns:
    Reshaped tensor of shape [batch_size, beam_size, ...]
  """
  shape = _shape_list(tensor)
  new_shape = [batch_size, beam_size] + shape[1:]
  return tf.reshape(tensor, new_shape)


def _gather_beams(nested, beam_indices, batch_size, new_beam_size):
  """Gather beams from nested structure of tensors.

  Each tensor in nested represents a batch of beams, where beam refers to a
  single search state (beam search involves searching through multiple states
  in parallel).

  This function is used to gather the top beams, specified by
  beam_indices, from the nested tensors.

  Args:
    nested: Nested structure (tensor, list, tuple or dict) containing tensors
      with shape [batch_size, beam_size, ...].
    beam_indices: int32 tensor with shape [batch_size, new_beam_size]. Each
     value in beam_indices must be between [0, beam_size), and are not
     necessarily unique.
    batch_size: size of batch (a scalar tensor when called from
      SequenceBeamSearch)
    new_beam_size: int number of beams to be pulled from the nested tensors.

  Returns:
    Nested structure containing tensors with shape
      [batch_size, new_beam_size, ...]
  """
  # Computes the i'th coordinate that contains the batch index for gather_nd.
  # Batch pos is a tensor like [[0,0,0,0,],[1,1,1,1],..].
  batch_pos = tf.range(batch_size * new_beam_size) // new_beam_size
  batch_pos = tf.reshape(batch_pos, [batch_size, new_beam_size])

  # Create coordinates to be passed to tf.gather_nd. Stacking creates a tensor
  # with shape [batch_size, new_beam_size, 2], where the last dimension
  # contains the (i, j) gathering coordinates.
  coordinates = tf.stack([batch_pos, beam_indices], axis=2)

  return nest.map_structure(
      lambda state: tf.gather_nd(state, coordinates), nested)


def _gather_topk_beams(nested, score_or_log_prob, batch_size, beam_size):
  """Gather top beams from nested structure."""
  _, topk_indexes = tf.nn.top_k(score_or_log_prob, k=beam_size)
  # NOTE(review): the listing is cut off at this point. Callers unpack the
  # result into one value per element of `nested`, so the missing remainder
  # presumably returns
  # _gather_beams(nested, topk_indexes, batch_size, beam_size) -- confirm
  # against the upstream file before relying on this function.
  ...
sampling_module.py
Source:sampling_module.py  
1# Copyright 2022 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14"""Sampling module for top_k, top_p and greedy decoding."""15import abc16from typing import Any, Callable, Dict, Optional17import numpy as np18import tensorflow as tf19from official.nlp.modeling.ops import decoding_module20def greedy(log_probs):21  """Returns the top ids and scores based on greedy decoding."""22  log_probs, ids = tf.math.top_k(log_probs, k=1)23  return log_probs, ids24def sample_logits_with_temperature(logits, temperature):25  """Applies a sampling temperature.26     Temperature skews the distribution towards high probability27     tokens and lowers the mass in tail distribution.28  Args:29    logits: Input logits for next token.30    temperature: Tensor for specifying the sampling temperature.31  Returns:32    Logits with applied temperature.33  """34  return logits / temperature35def sample_top_k(logits, top_k):36  """Chooses top_k logits and sets the others to negative infinity.37  Args:38    logits: Input logits for next token.39    top_k: Tensor to specify the top_k values.40  Returns:41    Logits with top_k filtering applied.42  """43  top_k_logits = tf.math.top_k(logits, k=top_k)44  indices_to_remove = logits < tf.expand_dims(top_k_logits[0][..., -1], -1)45  top_k_logits = set_tensor_by_indices_to_value(logits, indices_to_remove,46                                                np.NINF)47  return top_k_logits48def sample_top_p(logits, 
top_p):49  """Chooses most probable logits with cumulative probabilities upto top_p.50  Sets the remaining logits to negative infinity.51  Args:52    logits: Input logits for next token.53    top_p: Float tensor with a value >=0 and < 1.054  Returns:55    Logits with top_p filtering applied.56  """57  sorted_indices = tf.argsort(logits, direction="DESCENDING")58  # Flatten logits as tf.gather on TPU needs axis to be compile time constant.59  logits_shape = decoding_module.shape_list(logits)60  range_for_gather = tf.expand_dims(tf.range(0, logits_shape[0]), axis=1)61  range_for_gather = tf.tile(range_for_gather * logits_shape[1],62                             [1, logits_shape[1]]) + sorted_indices63  flattened_logits = tf.reshape(logits, [-1])64  flattened_sorted_indices = tf.reshape(range_for_gather, [-1])65  sorted_logits = tf.reshape(66      tf.gather(flattened_logits, flattened_sorted_indices),67      [logits_shape[0], logits_shape[1]])68  cumulative_probs = tf.cumsum(tf.nn.softmax(sorted_logits, axis=-1), axis=-1)69  # Remove tokens with cumulative probability above the threshold.70  sorted_indices_to_remove = cumulative_probs > top_p71  # Shift the indices to the right to keep the first token above threshold.72  sorted_indices_to_remove = tf.roll(sorted_indices_to_remove, 1, axis=-1)73  sorted_indices_to_remove = tf.concat([74      tf.zeros_like(sorted_indices_to_remove[:, :1]),75      sorted_indices_to_remove[:, 1:]76  ], -1)77  # Scatter sorted indices to original indexes.78  indices_to_remove = scatter_values_on_batch_indices(sorted_indices_to_remove,79                                                      sorted_indices)80  top_p_logits = set_tensor_by_indices_to_value(logits, indices_to_remove,81                                                np.NINF)82  return top_p_logits83def scatter_values_on_batch_indices(values, batch_indices):84  """Scatter `values` into a tensor using `batch_indices`.85  Args:86    values: tensor of shape [batch_size, vocab_size] 
containing the values to87      scatter88    batch_indices: tensor of shape [batch_size, vocab_size] containing the89      indices to insert (should be a permutation in range(0, n))90  Returns:91    Tensor of shape [batch_size, vocab_size] with values inserted at92    batch_indices93  """94  tensor_shape = decoding_module.shape_list(batch_indices)95  broad_casted_batch_dims = tf.reshape(96      tf.broadcast_to(97          tf.expand_dims(tf.range(tensor_shape[0]), axis=-1), tensor_shape),98      [1, -1])99  pair_indices = tf.transpose(100      tf.concat([broad_casted_batch_dims,101                 tf.reshape(batch_indices, [1, -1])], 0))102  return tf.scatter_nd(pair_indices, tf.reshape(values, [-1]), tensor_shape)103def set_tensor_by_indices_to_value(input_tensor, indices, value):104  """Where indices is True, set the value in input_tensor to value.105  Args:106    input_tensor: float (batch_size, dim)107    indices: bool (batch_size, dim)108    value: float scalar109  Returns:110    output_tensor: same shape as input_tensor.111  """112  value_tensor = tf.zeros_like(input_tensor) + value113  output_tensor = tf.where(indices, value_tensor, input_tensor)114  return output_tensor115class SamplingModule(decoding_module.DecodingModule, metaclass=abc.ABCMeta):116  """Implementation for sampling strategies (go/decoding-tf-nlp)."""117  def __init__(self,118               symbols_to_logits_fn,119               vocab_size: int,120               max_decode_length: int,121               eos_id: int,122               padded_decode: bool,123               length_normalization_fn: Optional[Callable[[int, tf.DType],124                                                          float]] = None,125               top_k=0,126               top_p=1.0,127               sample_temperature=0.0,128               enable_greedy: bool = True,129               dtype: tf.DType = tf.float32):130    """Initialize sampling module."""131    self.symbols_to_logits_fn = symbols_to_logits_fn132    
self.length_normalization_fn = length_normalization_fn133    self.eos_id = eos_id134    self.padded_decode = padded_decode135    self.dtype = tf.as_dtype(dtype)136    self.vocab_size = tf.convert_to_tensor(vocab_size, dtype=tf.int32)137    self.max_decode_length = max_decode_length138    self.top_k = tf.convert_to_tensor(top_k, dtype=tf.int32)139    self.top_p = tf.convert_to_tensor(top_p, dtype=tf.float32)140    self.sample_temperature = tf.convert_to_tensor(141        sample_temperature, dtype=tf.float32)142    self.enable_greedy = enable_greedy143    super(SamplingModule, self).__init__(144        length_normalization_fn=length_normalization_fn, dtype=dtype)145  def _grow_alive_seq(self,146                      state: Dict[str, Any],147                      batch_size: int) -> decoding_module.InternalState:148    """Grow alive sequences by one token.149    This function will implement the decoding strategies like top_p, top_k150    and greedy for the choosing the next logit.151    Args:152      state: A dictionary with the current loop state.153      batch_size: The given batch size154    Returns:155      Tuple of156      (Top sequences [batch, curr_index + 1] or [batch, max_decode_length + 1],157       Scores of returned sequences [batch, 1],158       New ids [batch, 1],159       New alive cache)160    """161    i = state[decoding_module.StateKeys.CUR_INDEX]162    alive_seq = state[decoding_module.StateKeys.ALIVE_SEQ]163    alive_log_probs = state[decoding_module.StateKeys.ALIVE_LOG_PROBS]164    alive_cache = state[decoding_module.StateKeys.ALIVE_CACHE]165    if self.padded_decode:166      ids = tf.slice(alive_seq, [0, i], [batch_size, 1])167    else:168      ids = alive_seq169    new_logits, new_cache = self.symbols_to_logits_fn(ids, i, alive_cache)170    candidate_log_probs = decoding_module.log_prob_from_logits(171        new_logits)172    original_log_probs = candidate_log_probs + alive_log_probs173    topk_log_probs, topk_ids = None, None174    if 
self.enable_greedy:175      topk_log_probs, topk_ids = greedy(original_log_probs)176    else:177      temperature_fn = sample_logits_with_temperature178      sampled_logits = tf.cond(179          self.sample_temperature > 0.0,180          lambda: temperature_fn(new_logits, self.sample_temperature),181          lambda: new_logits)182      sampled_logits = tf.cond(183          self.top_k > 0,184          lambda: sample_top_k(sampled_logits, self.top_k),185          lambda: sampled_logits)186      sampled_logits = tf.cond(187          self.top_p < 1,188          lambda: sample_top_p(sampled_logits, self.top_p),189          lambda: sampled_logits)190      topk_ids = tf.random.categorical(191          sampled_logits, dtype=tf.int32, num_samples=1)192      topk_log_probs = tf.gather(193          original_log_probs, topk_ids, axis=1, batch_dims=1)194    if self.padded_decode:195      topk_seq = tf.transpose(alive_seq, perm=[1, 0])196      topk_seq = tf.tensor_scatter_nd_update(197          topk_seq, [[i + 1]], tf.expand_dims(tf.squeeze(topk_ids, -1), 0))198      topk_seq = tf.transpose(topk_seq, perm=[1, 0])199    else:200      topk_seq = tf.concat([alive_seq, topk_ids], axis=-1)201    return topk_seq, topk_log_probs, topk_ids, new_cache202  def _create_initial_state(self,203                            initial_ids: tf.Tensor,204                            initial_cache: Dict[str, tf.Tensor],205                            batch_size: int) -> decoding_module.InitialState:206    """Return initial state dictionary and its shape invariants."""207    for key, value in initial_cache.items():208      for inner_value in tf.nest.flatten(value):209        if inner_value.dtype != self.dtype:210          raise TypeError(211              "initial_cache element for key '%s' has dtype %s that does not "212              "match sampling_module's dtype of %s. 
Value: %s" %213              (key, value.dtype.name, self.dtype.name, inner_value))214    # Current loop index (starts at 0)215    cur_index = tf.constant(0)216    # Alive sequence with shape [batch_size, 1]217    alive_seq = initial_ids218    alive_seq = tf.expand_dims(alive_seq, axis=-1)219    if self.padded_decode:220      alive_seq = tf.tile(alive_seq, [1, self.max_decode_length + 1])221    # Initial log probabilities with shape [batch_size, 1].222    initial_log_probs = tf.constant([[0.]], dtype=self.dtype)223    alive_log_probs = tf.tile(initial_log_probs, [batch_size, 1])224    alive_cache = initial_cache225    # Initialize tensor storing finished sequences [batch_size, 1, 1].226    finished_seq = tf.zeros(tf.shape(alive_seq), tf.int32)227    # Set scores of the initial finished seqs to negative infinity.228    finished_scores = tf.zeros([batch_size, 1], dtype=self.dtype)229    # Initialize finished flags with all False values.230    finished_flags = tf.zeros([batch_size, 1], tf.bool)231    # Create state dictionary and state shapes.232    state = {233        decoding_module.StateKeys.CUR_INDEX: cur_index,234        decoding_module.StateKeys.ALIVE_SEQ: alive_seq,235        decoding_module.StateKeys.ALIVE_LOG_PROBS: alive_log_probs,236        decoding_module.StateKeys.ALIVE_CACHE: alive_cache,237        decoding_module.StateKeys.FINISHED_SEQ: finished_seq,238        decoding_module.StateKeys.FINISHED_SCORES: finished_scores,239        decoding_module.StateKeys.FINISHED_FLAGS: finished_flags240    }241    if self.padded_decode:242      state_shape_invariants = {243          decoding_module.StateKeys.CUR_INDEX:244              tf.TensorShape([]),245          decoding_module.StateKeys.ALIVE_SEQ:246              tf.TensorShape(247                  [batch_size, self.max_decode_length + 1]),248          decoding_module.StateKeys.ALIVE_LOG_PROBS:249              tf.TensorShape([batch_size, 1]),250          decoding_module.StateKeys.ALIVE_CACHE:251              
tf.nest.map_structure(lambda state: state.get_shape(),252                                    alive_cache),253          decoding_module.StateKeys.FINISHED_SEQ:254              tf.TensorShape(255                  [batch_size, self.max_decode_length + 1]),256          decoding_module.StateKeys.FINISHED_SCORES:257              tf.TensorShape([batch_size, 1]),258          decoding_module.StateKeys.FINISHED_FLAGS:259              tf.TensorShape([batch_size, 1])260      }261    else:262      state_shape_invariants = {263          decoding_module.StateKeys.CUR_INDEX:264              tf.TensorShape([]),265          decoding_module.StateKeys.ALIVE_SEQ:266              tf.TensorShape([None, None]),267          decoding_module.StateKeys.ALIVE_LOG_PROBS:268              tf.TensorShape([None, 1]),269          decoding_module.StateKeys.ALIVE_CACHE:270              tf.nest.map_structure(271                  decoding_module.get_shape_keep_last_dim,272                  alive_cache),273          decoding_module.StateKeys.FINISHED_SEQ:274              tf.TensorShape([None, None]),275          decoding_module.StateKeys.FINISHED_SCORES:276              tf.TensorShape([None, 1]),277          decoding_module.StateKeys.FINISHED_FLAGS:278              tf.TensorShape([None, 1])279      }280    return state, state_shape_invariants281  def _get_new_alive_state(self, new_seq: tf.Tensor, new_log_probs: tf.Tensor,282                           new_finished_flags: tf.Tensor,283                           new_cache: Dict[str, tf.Tensor]) -> Dict[str, Any]:284    """Gather the sequences that are still alive.285    This function resets the sequences in the alive_state that are finished.286    Args:287      new_seq: New sequences generated by growing the current alive sequences288        int32 tensor with shape [batch_size, cur_index + 1]289      new_log_probs: Log probabilities of new sequences float32 tensor with290        shape [batch_size, 1]291      new_finished_flags: A boolean Tensor indicates 
which sequences are live292        inside the beam.293      new_cache: Dict of cached values for each sequence.294    Returns:295      Dictionary with alive keys.296    """297    new_seq = tf.multiply(298        new_seq, tf.cast(tf.logical_not(new_finished_flags), new_seq.dtype))299    return {300        decoding_module.StateKeys.ALIVE_SEQ: new_seq,301        decoding_module.StateKeys.ALIVE_LOG_PROBS: new_log_probs,302        decoding_module.StateKeys.ALIVE_CACHE: new_cache303    }304  def _get_new_finished_state(self, state: Dict[str, Any], new_seq: tf.Tensor,305                              new_log_probs: tf.Tensor,306                              new_finished_flags: tf.Tensor,307                              batch_size: int) -> Dict[str, tf.Tensor]:308    """Combine new and old finished sequences.309    Args:310      state: A dictionary with the current loop state.311      new_seq: New sequences generated by growing the current alive sequences312        int32 tensor [batch, curr_index + 1] or [batch, max_decode_length + 1].313      new_log_probs: Log probabilities of new sequences float32 tensor with314        shape [batch, 1].315      new_finished_flags: A boolean Tensor indicates which sequences are live.316      batch_size: The given batch size.317    Returns:318      Dictionary with finished keys from StateKeys.319    """320    i = state[decoding_module.StateKeys.CUR_INDEX]321    finished_seq = state[decoding_module.StateKeys.FINISHED_SEQ]322    finished_scores = state[decoding_module.StateKeys.FINISHED_SCORES]323    finished_flags = state[decoding_module.StateKeys.FINISHED_FLAGS]324    if not self.padded_decode:325      finished_seq = tf.concat(326          [finished_seq, tf.zeros([batch_size, 1], tf.int32)], axis=-1)327    new_scores = new_log_probs328    if self.length_normalization_fn is not None:329      length_norm = self.length_normalization_fn(i + 1, self.dtype)330      new_scores = new_log_probs / length_norm331    new_seq = tf.multiply(332        
new_seq, tf.cast(tf.logical_not(finished_flags), new_seq.dtype))333    new_scores = tf.multiply(334        new_scores, tf.cast(tf.logical_not(finished_flags), new_scores.dtype))335    finished_seq += tf.multiply(new_seq,336                                tf.cast(new_finished_flags, new_seq.dtype))337    finished_scores += tf.multiply(338        new_scores, tf.cast(new_finished_flags, new_scores.dtype))339    new_finished_flags = tf.logical_or(new_finished_flags, finished_flags)340    return {341        decoding_module.StateKeys.FINISHED_SEQ: finished_seq,342        decoding_module.StateKeys.FINISHED_SCORES: finished_scores,343        decoding_module.StateKeys.FINISHED_FLAGS: new_finished_flags344    }345  def _process_finished_state(346      self, finished_state: Dict[str, Any]) -> decoding_module.Output:347    """Process the alive/finished state to return final sequences and scores."""348    alive_seq = finished_state[decoding_module.StateKeys.ALIVE_SEQ]349    alive_log_probs = finished_state[decoding_module.StateKeys.ALIVE_LOG_PROBS]350    finished_seq = finished_state[decoding_module.StateKeys.FINISHED_SEQ]351    finished_scores = finished_state[decoding_module.StateKeys.FINISHED_SCORES]352    finished_flags = finished_state[decoding_module.StateKeys.FINISHED_FLAGS]353    finished_cond = tf.reduce_any(finished_flags, 1, name="finished_cond")354    if self.length_normalization_fn is not None:355      length_norm = self.length_normalization_fn(self.max_decode_length + 1,356                                                 self.dtype)357      alive_log_probs = alive_log_probs / length_norm358    seq_cond = decoding_module.expand_to_same_rank(finished_cond, finished_seq)359    score_cond = decoding_module.expand_to_same_rank(finished_cond,360                                                     finished_scores)361    finished_seq = tf.where(seq_cond, finished_seq, alive_seq)362    finished_scores = tf.where(score_cond, finished_scores, alive_log_probs)363    return 
finished_seq, finished_scores364  def _continue_search(self, state) -> tf.Tensor:365    i = state[decoding_module.StateKeys.CUR_INDEX]366    # Have we reached max decoding length?367    not_at_end = tf.less(i, self.max_decode_length)368    # Have all sampled sequences reached an EOS?369    all_has_eos = tf.reduce_all(370        state[decoding_module.StateKeys.FINISHED_FLAGS],371        axis=None,372        name="search_finish_cond")373    return tf.logical_and(not_at_end, tf.logical_not(all_has_eos))374  def _finished_flags(self, topk_ids, state) -> tf.Tensor:375    new_finished_flags = tf.equal(topk_ids, self.eos_id)376    new_finished_flags = tf.logical_or(377        new_finished_flags, state[decoding_module.StateKeys.FINISHED_FLAGS])...scheduling_algorithms.py
Source:scheduling_algorithms.py  
from components import *
import operator
from abc import ABC, abstractmethod


class Scheduling(ABC):
    """Base class of the scheduling policies defined in this module.

    NOTE(review): `self.processes` and `self.cur_processes` are used by the
    queue helpers below but are not created in `__init__`; presumably the
    caller assigns them before use -- confirm.
    """

    def __init__(self):
        # Accumulated turnaround time of the processes that have finished.
        self.turnaround = 0
        # Starts as True so that SJF/EDF sort the queue on their first call.
        self.finished = True

    def add_process_at_the_end(self, process: Process):
        """Append `process` to the end of the current queue."""
        self.cur_processes.append(process)

    def add_process_at_the_beginning(self, process: Process):
        """Insert `process` at the front of the current queue."""
        self.cur_processes = [process] + self.cur_processes

    def remove_process_at_the_beginning(self):
        """Drop the first process of the current queue."""
        self.cur_processes = self.cur_processes[1:]

    def get_first_process(self):
        """Return the first process of the current queue."""
        return self.cur_processes[0]

    def get_arrived_processes(self, t: int):
        """Move processes whose arrival time is <= `t` to the current queue.

        Only the head of `self.processes` is inspected, so this assumes the
        list is ordered by arrival time -- TODO confirm.
        """
        while len(self.processes) > 0 and self.processes[0].arrival_time <= t:
            self.cur_processes.append(self.processes[0])
            self.processes = self.processes[1:]

    def set_executed(self, process: Process, ram: RAM):
        """Call `ram.set_executed` for every page of `process`."""
        for page in process.pages:
            ram.set_executed(page)

    @abstractmethod
    def execute(self):
        pass


class Premptive(Scheduling):
    """Base for policies that use a time quantum and a switch overhead."""

    def __init__(self, quantum: int, overhead: int):
        self.quantum = quantum
        self.overhead = overhead
        # Time units the current process has used of its quantum.
        self.cnt_quantum = 0
        # Negative while a process may execute; >= 0 while overhead is counted.
        self.cnt_overhead = -1
        super().__init__()


class Non_Premptive(Scheduling):
    """Base for policies without quantum/overhead bookkeeping."""

    def __init__(self):
        super().__init__()


class FIFO(Non_Premptive):
    """Runs the process at the front of the queue for one time unit."""

    def __init__(self):
        super().__init__()

    # returns the process executed and a flag determining whether it has finished
    def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):
        if not waiting_processes:
            # Include the current time, like the other policies do.
            print("t = ", t, "empty")
            return Process(-1, -1, -1, -1, -1, -1), False
        front = waiting_processes[0]
        assert front.exec_time != 0
        self.set_executed(front, ram)
        if front.exec_time == 1:
            # The finished process is not removed from the queue here; the
            # returned flag reports it (presumably the caller removes it).
            self.turnaround = self.turnaround + (t - front.arrival_time + 1)
            self.finished = True
        else:
            front.exec_time = front.exec_time - 1
            self.finished = False
        return front, self.finished


class SJF(Non_Premptive):
    """Runs the process with the smallest `exec_time` for one time unit."""

    def __init__(self):
        super().__init__()

    def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):
        """Return the executed process and whether it has just finished."""
        if not waiting_processes:
            print("t = ", t, "empty")
            return Process(-1, -1, -1, -1, -1, -1), False
        # NOTE(review): `self.finished` is never set back to True in this
        # method, so the queue is re-sorted only if the caller resets the
        # flag -- confirm.
        if self.finished:
            waiting_processes.sort(key=lambda p: p.exec_time)
            self.finished = False
        front = waiting_processes[0]
        assert front.exec_time != 0
        self.set_executed(front, ram)
        if front.exec_time == 1:
            # The finished process stays in the queue; the flag reports it.
            self.turnaround = self.turnaround + (t - front.arrival_time + 1)
            finished = True
        else:
            front.exec_time = front.exec_time - 1
            finished = False
        return front, finished


class Round_Robin(Premptive):
    """Runs the front process; rotates the queue when its quantum expires."""

    def __init__(self, quantum: int, overhead: int):
        assert quantum >= 1 and overhead >= 1
        super().__init__(quantum=quantum, overhead=overhead)

    def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):
        """Return the executed process and whether it has just finished.

        While overhead is being counted, a placeholder process built with all
        fields set to -1 is returned together with False.
        """
        # if cnt_overhead is negative, the process is executing
        if self.cnt_overhead >= 0:
            print("\n\n\n\nOVERHEAD\n\n\n\n")
            self.cnt_overhead += 1
            if self.cnt_overhead == self.overhead:
                self.cnt_overhead = -1
                self.cnt_quantum = 0
            return Process(-1, -1, -1, -1, -1, -1), False
        if not waiting_processes:
            print("t = ", t, "empty")
            return Process(-1, -1, -1, -1, -1, -1), False
        front = waiting_processes[0]
        assert front.exec_time != 0
        self.set_executed(front, ram)
        if front.exec_time == 1:
            # The finished process stays in the queue; the flag reports it.
            self.turnaround = self.turnaround + (t - front.arrival_time + 1)
            finished = True
            self.cnt_overhead = -1
            self.cnt_quantum = 0
        else:
            front.exec_time = front.exec_time - 1
            finished = False
        self.cnt_quantum += 1
        if self.cnt_quantum == self.quantum:
            self.cnt_overhead = 0
            self.cnt_quantum = 0
            # move process to the end of the queue if it will run again.
            # This must mutate the caller's list: rebinding the local name
            # would leave the caller's queue order unchanged.
            if not finished:
                waiting_processes.append(waiting_processes.pop(0))
        # returns a flag to indicates whether the process has finished and this process
        return front, finished


class EDF(Premptive):
    def __init__(self, quantum: int, overhead: int):
        self.quantum = quantum
        self.overhead = overhead
        super().__init__(quantum, overhead)

    def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):
        # NOTE(review): this method is truncated in the source page (its end is
        # not visible), so its logic is reproduced as found.
        # if cnt_overhead is negative, the process is executing
        if self.cnt_overhead >= 0:
            print("\n\n\n\nOVERHEAD\n\n\n\n")
            self.cnt_overhead += 1
            if self.cnt_overhead == self.overhead:
                self.cnt_overhead = -1
                self.cnt_quantum = 0
            return Process(-1, -1, -1, -1, -1, -1), False
        if len(waiting_processes) == 0:
            print ("t = ", t, "empty")
            return Process(-1, -1, -1, -1, -1, -1), False
        if self.finished:
            waiting_processes.sort(key=lambda p: p.deadline)
            self.finished = False
        front = waiting_processes[0]
        assert front.exec_time != 0
        self.set_executed(front, ram)
        finished = False
        if front.exec_time == 1:
            # waiting_processes = waiting_processes[1:]
            self.turnaround = self.turnaround + (t - front.arrival_time + 1)
            finished = True
            self.cnt_overhead = -1
            self.cnt_quantum = 0
        else:
            waiting_processes[0].exec_time = waiting_processes[0].exec_time - 1
            finished = False
        self.cnt_quantum += 1
        if self.cnt_quantum == self.quantum:
            if not finished:
                self.cnt_overhead = 0
            self.cnt_quantum = 0
            # move process to the end of the queue if it will run again
            # NOTE(review): this only rebinds the local name; the caller's
            # list is not reordered -- confirm the intent.
            if not finished:
                waiting_processes = waiting_processes[1:] + [front]
        # returns a flag to indicates whether the process has finished and this process...test_item.py
Source:test_item.py  
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from django.contrib.auth.models import User
from .test_todo import create_todo


class ItemTest(APITestCase):
    """Tests API for items."""

    def prepare(self):
        """Authenticate a fresh user and create two to-do lists.

        Returns:
            The ids of the two created to-do lists.
        """
        user = User.objects.create_user("test_user4", "test@test.com", "test_password")
        self.client.force_authenticate(user=user)
        to_do_id_1 = create_todo(self.client, "ToDoList1").data["id"]
        to_do_id_2 = create_todo(self.client, "ToDoList2").data["id"]
        return to_do_id_1, to_do_id_2

    def get(self, expected_titles, todo_id=None, finished=None):
        """List items and compare them with `expected_titles`.

        Args:
            expected_titles: Expected list of (text, parent) pairs, in order.
            todo_id: When given, sent as the `parent` query parameter.
            finished: When given, sent as the `finished` query parameter and
                every returned item must carry that same `finished` value.
        """
        url = reverse("ToDoItems-list")
        data = {}
        if finished is not None:
            data["finished"] = finished
        if todo_id is not None:
            data["parent"] = todo_id
        response = self.client.get(url, data, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        results = response.data["results"]
        real_titles = [(item["text"], item["parent"]) for item in results]
        self.assertEqual(real_titles, expected_titles)
        if finished is not None:
            # Check each item individually: comparing against `all(...)` would
            # accept a mixed result for finished=False and reject an empty one.
            for item in results:
                self.assertEqual(item["finished"], finished)

    def post(self, item_text, todo_id, finished=None):
        """Create an item and check the response.

        `finished` is only sent when it is not None; the response is then
        expected to report False.

        Returns:
            The id and the finished flag of the created item.
        """
        url = reverse("ToDoItems-list")
        data = {"text": item_text, "parent": todo_id}
        if finished is not None:
            data["finished"] = finished
        response = self.client.post(url, data, format="json")
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        check_finished = False if (finished is None) else finished
        self.assertEqual(response.data["text"], item_text)
        self.assertEqual(response.data["parent"], todo_id)
        self.assertEqual(response.data["finished"], check_finished)
        return response.data["id"], response.data["finished"]

    def get_by_id(self, id, text, finished, parent):
        """Read a single item and check its text, finished flag and parent."""
        url_with_id = reverse("ToDoItems-detail", args=(id,))
        # The query key is the literal name "id", not the id value itself.
        response = self.client.get(url_with_id, {"id": id}, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["text"], text)
        self.assertEqual(response.data["finished"], finished)
        self.assertEqual(response.data["parent"], parent)

    def put(self, id, text, parent, finished=None):
        """Fully update an item and check the echoed fields."""
        url_with_id = reverse("ToDoItems-detail", args=(id,))
        data = {"text": text, "parent": parent}
        if finished is not None:
            data["finished"] = finished
        response = self.client.put(url_with_id, data, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data["text"], text)
        self.assertEqual(response.data["parent"], parent)
        if finished is not None:
            self.assertEqual(response.data["finished"], finished)

    def patch(self, id, text=None, finished=None, parent=None):
        """Partially update an item; only the given fields are sent and checked."""
        url_with_id = reverse("ToDoItems-detail", args=(id,))
        data = {}
        if text is not None:
            data["text"] = text
        if finished is not None:
            data["finished"] = finished
        if parent is not None:
            data["parent"] = parent
        response = self.client.patch(url_with_id, data, format="json")
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        if text is not None:
            self.assertEqual(response.data["text"], text)
        if finished is not None:
            self.assertEqual(response.data["finished"], finished)
        if parent is not None:
            self.assertEqual(response.data["parent"], parent)

    def delete(self, id, title, finished, to_do_id):
        """Check that the item exists with the given fields, then delete it."""
        self.get_by_id(id, title, finished, to_do_id)
        url_with_id = reverse("ToDoItems-detail", args=(id,))
        response = self.client.delete(url_with_id, {}, format="json")
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

    def test_create_delete(self):
        """
        /todo_items/: get, post (create)
        /todo_items/{id}/: get (read), delete
        """
        to_do_id_1, to_do_id_2 = self.prepare()
        self.get([], to_do_id_1)
        item_text_1, item_text_2, item_text_3, item_text_4 = "Item1", "Item2", "Item3", "Item4"
        item_id_1, item_finished_1 = self.post(item_text_1, to_do_id_1)
        self.get([(item_text_1, to_do_id_1)], to_do_id_1)
        item_id_2, item_finished_2 = self.post(item_text_2, to_do_id_1, finished=False)
        self.get([(item_text_1, to_do_id_1), (item_text_2, to_do_id_1)], to_do_id_1)
        item_id_3, item_finished_3 = self.post(item_text_3, to_do_id_1, finished=True)
        self.get(
            [(item_text_1, to_do_id_1), (item_text_2, to_do_id_1), (item_text_3, to_do_id_1)],
            to_do_id_1,
        )
        item_id_4, item_finished_4 = self.post(item_text_4, to_do_id_2, finished=False)
        self.get(
            [
                (item_text_1, to_do_id_1),
                (item_text_2, to_do_id_1),
                (item_text_3, to_do_id_1),
                (item_text_4, to_do_id_2),
            ]
        )
        self.get(
            [(item_text_1, to_do_id_1), (item_text_2, to_do_id_1), (item_text_3, to_do_id_1)],
            to_do_id_1,
        )
        self.get([(item_text_1, to_do_id_1), (item_text_2, to_do_id_1)], to_do_id_1, finished=False)
        self.get([(item_text_3, to_do_id_1)], to_do_id_1, finished=True)
        self.get_by_id(item_id_1, item_text_1, item_finished_1, to_do_id_1)
        self.get_by_id(item_id_2, item_text_2, item_finished_2, to_do_id_1)
        self.get_by_id(item_id_3, item_text_3, item_finished_3, to_do_id_1)
        self.delete(item_id_3, item_text_3, item_finished_3, to_do_id_1)
        self.get([(item_text_1, to_do_id_1), (item_text_2, to_do_id_1)], to_do_id_1)

    def test_update(self):
        """
        /todo_items/{id}/: put (update), patch (partial_update)
        """
        to_do_id_1, to_do_id_2 = self.prepare()
        item_text_1 = "Item1"
        item_id_1, item_finished_1 = self.post(item_text_1, to_do_id_1)
        item_text_1_2 = "Item5"
        self.put(item_id_1, item_text_1_2, to_do_id_2)
        self.put(item_id_1, item_text_1_2, to_do_id_2, finished=False)
        self.put(item_id_1, item_text_1_2, to_do_id_2, finished=True)
        item_text_1_3 = "Item6"
        self.patch(item_id_1, parent=to_do_id_1)
        self.patch(item_id_1, finished=True)
        # NOTE(review): the snippet is truncated at this point in the source page.
# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
