Best Python code snippet using pandera_python
combiners.py
Source:combiners.py  
1#2# Licensed to the Apache Software Foundation (ASF) under one or more3# contributor license agreements.  See the NOTICE file distributed with4# this work for additional information regarding copyright ownership.5# The ASF licenses this file to You under the Apache License, Version 2.06# (the "License"); you may not use this file except in compliance with7# the License.  You may obtain a copy of the License at8#9#    http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16#17"""A library of basic combiner PTransform subclasses."""18# pytype: skip-file19from __future__ import absolute_import20from __future__ import division21import heapq22import operator23import random24import sys25import warnings26from builtins import object27from builtins import zip28from typing import Any29from typing import Dict30from typing import Iterable31from typing import List32from typing import Set33from typing import Tuple34from typing import TypeVar35from typing import Union36from past.builtins import long37from apache_beam import typehints38from apache_beam.transforms import core39from apache_beam.transforms import cy_combiners40from apache_beam.transforms import ptransform41from apache_beam.transforms import window42from apache_beam.transforms.display import DisplayDataItem43from apache_beam.typehints import with_input_types44from apache_beam.typehints import with_output_types45from apache_beam.utils.timestamp import Duration46from apache_beam.utils.timestamp import Timestamp47__all__ = [48    'Count', 'Mean', 'Sample', 'Top', 'ToDict', 'ToList', 'ToSet', 'Latest'49]50# Type variables51T = TypeVar('T')52K = TypeVar('K')53V = TypeVar('V')54TimestampType = Union[int, float, 
Timestamp, Duration]55class Mean(object):56  """Combiners for computing arithmetic means of elements."""57  class Globally(ptransform.PTransform):58    """combiners.Mean.Globally computes the arithmetic mean of the elements."""59    def expand(self, pcoll):60      return pcoll | core.CombineGlobally(MeanCombineFn())61  class PerKey(ptransform.PTransform):62    """combiners.Mean.PerKey finds the means of the values for each key."""63    def expand(self, pcoll):64      return pcoll | core.CombinePerKey(MeanCombineFn())65# TODO(laolu): This type signature is overly restrictive. This should be66# more general.67@with_input_types(Union[float, int, long])68@with_output_types(float)69class MeanCombineFn(core.CombineFn):70  """CombineFn for computing an arithmetic mean."""71  def create_accumulator(self):72    return (0, 0)73  def add_input(self, sum_count, element):74    (sum_, count) = sum_count75    return sum_ + element, count + 176  def merge_accumulators(self, accumulators):77    sums, counts = zip(*accumulators)78    return sum(sums), sum(counts)79  def extract_output(self, sum_count):80    (sum_, count) = sum_count81    if count == 0:82      return float('NaN')83    return sum_ / float(count)84  def for_input_type(self, input_type):85    if input_type is int:86      return cy_combiners.MeanInt64Fn()87    elif input_type is float:88      return cy_combiners.MeanFloatFn()89    return self90class Count(object):91  """Combiners for counting elements."""92  class Globally(ptransform.PTransform):93    """combiners.Count.Globally counts the total number of elements."""94    def expand(self, pcoll):95      return pcoll | core.CombineGlobally(CountCombineFn())96  class PerKey(ptransform.PTransform):97    """combiners.Count.PerKey counts how many elements each unique key has."""98    def expand(self, pcoll):99      return pcoll | core.CombinePerKey(CountCombineFn())100  class PerElement(ptransform.PTransform):101    """combiners.Count.PerElement counts how many times each 
element occurs."""102    def expand(self, pcoll):103      paired_with_void_type = typehints.Tuple[pcoll.element_type, Any]104      output_type = typehints.KV[pcoll.element_type, int]105      return (106          pcoll107          | (108              '%s:PairWithVoid' % self.label >> core.Map(109                  lambda x: (x, None)).with_output_types(paired_with_void_type))110          | core.CombinePerKey(CountCombineFn()).with_output_types(output_type))111@with_input_types(Any)112@with_output_types(int)113class CountCombineFn(core.CombineFn):114  """CombineFn for computing PCollection size."""115  def create_accumulator(self):116    return 0117  def add_input(self, accumulator, element):118    return accumulator + 1119  def add_inputs(self, accumulator, elements):120    return accumulator + len(list(elements))121  def merge_accumulators(self, accumulators):122    return sum(accumulators)123  def extract_output(self, accumulator):124    return accumulator125class Top(object):126  """Combiners for obtaining extremal elements."""127  # pylint: disable=no-self-argument128  class Of(ptransform.PTransform):129    """Obtain a list of the compare-most N elements in a PCollection.130    This transform will retrieve the n greatest elements in the PCollection131    to which it is applied, where "greatest" is determined by the comparator132    function supplied as the compare argument.133    """134    def _py2__init__(self, n, compare=None, *args, **kwargs):135      """Initializer.136      compare should be an implementation of "a < b" taking at least two137      arguments (a and b). Additional arguments and side inputs specified in138      the apply call become additional arguments to the comparator. 
Defaults to139      the natural ordering of the elements.140      The arguments 'key' and 'reverse' may instead be passed as keyword141      arguments, and have the same meaning as for Python's sort functions.142      Args:143        pcoll: PCollection to process.144        n: number of elements to extract from pcoll.145        compare: as described above.146        *args: as described above.147        **kwargs: as described above.148      """149      if compare:150        warnings.warn(151            'Compare not available in Python 3, use key instead.',152            DeprecationWarning)153      self._n = n154      self._compare = compare155      self._key = kwargs.pop('key', None)156      self._reverse = kwargs.pop('reverse', False)157      self._args = args158      self._kwargs = kwargs159    def _py3__init__(self, n, **kwargs):160      """Creates a global Top operation.161      The arguments 'key' and 'reverse' may be passed as keyword arguments,162      and have the same meaning as for Python's sort functions.163      Args:164        pcoll: PCollection to process.165        n: number of elements to extract from pcoll.166        **kwargs: may contain 'key' and/or 'reverse'167      """168      unknown_kwargs = set(kwargs.keys()) - set(['key', 'reverse'])169      if unknown_kwargs:170        raise ValueError(171            'Unknown keyword arguments: ' + ', '.join(unknown_kwargs))172      self._py2__init__(n, None, **kwargs)173    # Python 3 sort does not accept a comparison operator, and nor do we.174    # FIXME: mypy would handle this better if we placed the _py*__init__ funcs175    #  inside the if/else block below:176    if sys.version_info[0] < 3:177      __init__ = _py2__init__178    else:179      __init__ = _py3__init__  # type: ignore180    def default_label(self):181      return 'Top(%d)' % self._n182    def expand(self, pcoll):183      compare = self._compare184      if (not self._args and not self._kwargs and pcoll.windowing.is_default()):185        if 
self._reverse:186          if compare is None or compare is operator.lt:187            compare = operator.gt188          else:189            original_compare = compare190            compare = lambda a, b: original_compare(b, a)191        # This is a more efficient global algorithm.192        top_per_bundle = pcoll | core.ParDo(193            _TopPerBundle(self._n, compare, self._key))194        # If pcoll is empty, we can't guerentee that top_per_bundle195        # won't be empty, so inject at least one empty accumulator196        # so that downstream is guerenteed to produce non-empty output.197        empty_bundle = pcoll.pipeline | core.Create([(None, [])])198        return ((top_per_bundle, empty_bundle) | core.Flatten()199                | core.GroupByKey()200                | core.ParDo(_MergeTopPerBundle(self._n, compare, self._key)))201      else:202        return pcoll | core.CombineGlobally(203            TopCombineFn(self._n, compare, self._key, self._reverse),204            *self._args,205            **self._kwargs)206  class PerKey(ptransform.PTransform):207    """Identifies the compare-most N elements associated with each key.208    This transform will produce a PCollection mapping unique keys in the input209    PCollection to the n greatest elements with which they are associated, where210    "greatest" is determined by the comparator function supplied as the compare211    argument in the initializer.212    """213    def _py2__init__(self, n, compare=None, *args, **kwargs):214      """Initializer.215      compare should be an implementation of "a < b" taking at least two216      arguments (a and b). Additional arguments and side inputs specified in217      the apply call become additional arguments to the comparator.  
Defaults to218      the natural ordering of the elements.219      The arguments 'key' and 'reverse' may instead be passed as keyword220      arguments, and have the same meaning as for Python's sort functions.221      Args:222        n: number of elements to extract from input.223        compare: as described above.224        *args: as described above.225        **kwargs: as described above.226      """227      if compare:228        warnings.warn(229            'Compare not available in Python 3, use key instead.',230            DeprecationWarning)231      self._n = n232      self._compare = compare233      self._key = kwargs.pop('key', None)234      self._reverse = kwargs.pop('reverse', False)235      self._args = args236      self._kwargs = kwargs237    def _py3__init__(self, n, **kwargs):238      """Creates a per-key Top operation.239      The arguments 'key' and 'reverse' may be passed as keyword arguments,240      and have the same meaning as for Python's sort functions.241      Args:242        pcoll: PCollection to process.243        n: number of elements to extract from pcoll.244        **kwargs: may contain 'key' and/or 'reverse'245      """246      unknown_kwargs = set(kwargs.keys()) - set(['key', 'reverse'])247      if unknown_kwargs:248        raise ValueError(249            'Unknown keyword arguments: ' + ', '.join(unknown_kwargs))250      self._py2__init__(n, None, **kwargs)251    # Python 3 sort does not accept a comparison operator, and nor do we.252    if sys.version_info[0] < 3:253      __init__ = _py2__init__254    else:255      __init__ = _py3__init__  # type: ignore256    def default_label(self):257      return 'TopPerKey(%d)' % self._n258    def expand(self, pcoll):259      """Expands the transform.260      Raises TypeCheckError: If the output type of the input PCollection is not261      compatible with Tuple[A, B].262      Args:263        pcoll: PCollection to process264      Returns:265        the PCollection containing the result.266      
"""267      return pcoll | core.CombinePerKey(268          TopCombineFn(self._n, self._compare, self._key, self._reverse),269          *self._args,270          **self._kwargs)271  @staticmethod272  @ptransform.ptransform_fn273  def Largest(pcoll, n):274    """Obtain a list of the greatest N elements in a PCollection."""275    return pcoll | Top.Of(n)276  @staticmethod277  @ptransform.ptransform_fn278  def Smallest(pcoll, n):279    """Obtain a list of the least N elements in a PCollection."""280    return pcoll | Top.Of(n, reverse=True)281  @staticmethod282  @ptransform.ptransform_fn283  def LargestPerKey(pcoll, n):284    """Identifies the N greatest elements associated with each key."""285    return pcoll | Top.PerKey(n)286  @staticmethod287  @ptransform.ptransform_fn288  def SmallestPerKey(pcoll, n, reverse=True):289    """Identifies the N least elements associated with each key."""290    return pcoll | Top.PerKey(n, reverse=True)291@with_input_types(T)292@with_output_types(Tuple[None, List[T]])293class _TopPerBundle(core.DoFn):294  def __init__(self, n, less_than, key):295    self._n = n296    self._less_than = None if less_than is operator.le else less_than297    self._key = key298  def start_bundle(self):299    self._heap = []300  def process(self, element):301    if self._less_than or self._key:302      element = cy_combiners.ComparableValue(303          element, self._less_than, self._key)304    if len(self._heap) < self._n:305      heapq.heappush(self._heap, element)306    else:307      heapq.heappushpop(self._heap, element)308  def finish_bundle(self):309    # Though sorting here results in more total work, this allows us to310    # skip most elements in the reducer.311    # Essentially, given s map bundles, we are trading about O(sn) compares in312    # the (single) reducer for O(sn log n) compares across all mappers.313    self._heap.sort()314    # Unwrap to avoid serialization via pickle.315    if self._less_than or self._key:316      yield 
window.GlobalWindows.windowed_value(317          (None, [wrapper.value for wrapper in self._heap]))318    else:319      yield window.GlobalWindows.windowed_value((None, self._heap))320@with_input_types(Tuple[None, Iterable[List[T]]])321@with_output_types(List[T])322class _MergeTopPerBundle(core.DoFn):323  def __init__(self, n, less_than, key):324    self._n = n325    self._less_than = None if less_than is operator.lt else less_than326    self._key = key327  def process(self, key_and_bundles):328    _, bundles = key_and_bundles329    def push(hp, e):330      if len(hp) < self._n:331        heapq.heappush(hp, e)332        return False333      elif e < hp[0]:334        # Because _TopPerBundle returns sorted lists, all other elements335        # will also be smaller.336        return True337      else:338        heapq.heappushpop(hp, e)339        return False340    if self._less_than or self._key:341      heapc = []  # type: List[cy_combiners.ComparableValue]342      for bundle in bundles:343        if not heapc:344          heapc = [345              cy_combiners.ComparableValue(element, self._less_than, self._key)346              for element in bundle347          ]348          continue349        for element in reversed(bundle):350          if push(heapc,351                  cy_combiners.ComparableValue(element,352                                               self._less_than,353                                               self._key)):354            break355      heapc.sort()356      yield [wrapper.value for wrapper in reversed(heapc)]357    else:358      heap = []359      for bundle in bundles:360        if not heap:361          heap = bundle362          continue363        for element in reversed(bundle):364          if push(heap, element):365            break366      heap.sort()367      yield heap[::-1]368@with_input_types(T)369@with_output_types(List[T])370class TopCombineFn(core.CombineFn):371  """CombineFn doing the combining for all of the Top transforms.372  
This CombineFn uses a key or comparison operator to rank the elements.373  Args:374    compare: (optional) an implementation of "a < b" taking at least two375        arguments (a and b). Additional arguments and side inputs specified376        in the apply call become additional arguments to the comparator.377    key: (optional) a mapping of elements to a comparable key, similar to378        the key argument of Python's sorting methods.379    reverse: (optional) whether to order things smallest to largest, rather380        than largest to smallest381  """382  # TODO(robertwb): For Python 3, remove compare and only keep key.383  def __init__(self, n, compare=None, key=None, reverse=False):384    self._n = n385    if compare is operator.lt:386      compare = None387    elif compare is operator.gt:388      compare = None389      reverse = not reverse390    if compare:391      self._compare = ((392          lambda a, b, *args, **kwargs: not compare(a, b, *args, **kwargs))393                       if reverse else compare)394    else:395      self._compare = operator.gt if reverse else operator.lt396    self._less_than = None397    self._key = key398  def _hydrated_heap(self, heap):399    if heap:400      first = heap[0]401      if isinstance(first, cy_combiners.ComparableValue):402        if first.requires_hydration:403          assert self._less_than is not None404          for comparable in heap:405            assert comparable.requires_hydration406            comparable.hydrate(self._less_than, self._key)407            assert not comparable.requires_hydration408          return heap409        else:410          return heap411      else:412        assert self._less_than is not None413        return [414            cy_combiners.ComparableValue(element, self._less_than, self._key)415            for element in heap416        ]417    else:418      return heap419  def display_data(self):420    return {421        'n': self._n,422        'compare': DisplayDataItem(423         
   self._compare.__name__ if hasattr(self._compare, '__name__') else424            self._compare.__class__.__name__).drop_if_none()425    }426  # The accumulator type is a tuple427  # (bool, Union[List[T], List[ComparableValue[T]])428  # where the boolean indicates whether the second slot contains a List of T429  # (False) or List of ComparableValue[T] (True). In either case, the List430  # maintains heap invariance. When the contents of the List are431  # ComparableValue[T] they either all 'requires_hydration' or none do.432  # This accumulator representation allows us to minimize the data encoding433  # overheads. Creation of ComparableValues is elided for performance reasons434  # when there is no need for complicated comparison functions.435  def create_accumulator(self, *args, **kwargs):436    return (False, [])437  def add_input(self, accumulator, element, *args, **kwargs):438    # Caching to avoid paying the price of variadic expansion of args / kwargs439    # when it's not needed (for the 'if' case below).440    if self._less_than is None:441      if args or kwargs:442        self._less_than = lambda a, b: self._compare(a, b, *args, **kwargs)443      else:444        self._less_than = self._compare445    holds_comparables, heap = accumulator446    if self._less_than is not operator.lt or self._key:447      heap = self._hydrated_heap(heap)448      holds_comparables = True449    else:450      assert not holds_comparables451    comparable = (452        cy_combiners.ComparableValue(element, self._less_than, self._key)453        if holds_comparables else element)454    if len(heap) < self._n:455      heapq.heappush(heap, comparable)456    else:457      heapq.heappushpop(heap, comparable)458    return (holds_comparables, heap)459  def merge_accumulators(self, accumulators, *args, **kwargs):460    if args or kwargs:461      self._less_than = lambda a, b: self._compare(a, b, *args, **kwargs)462      add_input = lambda accumulator, element: self.add_input(463         
 accumulator, element, *args, **kwargs)464    else:465      self._less_than = self._compare466      add_input = self.add_input467    result_heap = None468    holds_comparables = None469    for accumulator in accumulators:470      holds_comparables, heap = accumulator471      if self._less_than is not operator.lt or self._key:472        heap = self._hydrated_heap(heap)473        holds_comparables = True474      else:475        assert not holds_comparables476      if result_heap is None:477        result_heap = heap478      else:479        for comparable in heap:480          _, result_heap = add_input(481              (holds_comparables, result_heap),482              comparable.value if holds_comparables else comparable)483    assert result_heap is not None and holds_comparables is not None484    return (holds_comparables, result_heap)485  def compact(self, accumulator, *args, **kwargs):486    holds_comparables, heap = accumulator487    # Unwrap to avoid serialization via pickle.488    if holds_comparables:489      return (False, [comparable.value for comparable in heap])490    else:491      return accumulator492  def extract_output(self, accumulator, *args, **kwargs):493    if args or kwargs:494      self._less_than = lambda a, b: self._compare(a, b, *args, **kwargs)495    else:496      self._less_than = self._compare497    holds_comparables, heap = accumulator498    if self._less_than is not operator.lt or self._key:499      if not holds_comparables:500        heap = self._hydrated_heap(heap)501        holds_comparables = True502    else:503      assert not holds_comparables504    assert len(heap) <= self._n505    heap.sort(reverse=True)506    return [507        comparable.value if holds_comparables else comparable508        for comparable in heap509    ]510class Largest(TopCombineFn):511  def default_label(self):512    return 'Largest(%s)' % self._n513class Smallest(TopCombineFn):514  def __init__(self, n):515    super(Smallest, self).__init__(n, reverse=True)516  
def default_label(self):517    return 'Smallest(%s)' % self._n518class Sample(object):519  """Combiners for sampling n elements without replacement."""520  # pylint: disable=no-self-argument521  class FixedSizeGlobally(ptransform.PTransform):522    """Sample n elements from the input PCollection without replacement."""523    def __init__(self, n):524      self._n = n525    def expand(self, pcoll):526      return pcoll | core.CombineGlobally(SampleCombineFn(self._n))527    def display_data(self):528      return {'n': self._n}529    def default_label(self):530      return 'FixedSizeGlobally(%d)' % self._n531  class FixedSizePerKey(ptransform.PTransform):532    """Sample n elements associated with each key without replacement."""533    def __init__(self, n):534      self._n = n535    def expand(self, pcoll):536      return pcoll | core.CombinePerKey(SampleCombineFn(self._n))537    def display_data(self):538      return {'n': self._n}539    def default_label(self):540      return 'FixedSizePerKey(%d)' % self._n541@with_input_types(T)542@with_output_types(List[T])543class SampleCombineFn(core.CombineFn):544  """CombineFn for all Sample transforms."""545  def __init__(self, n):546    super(SampleCombineFn, self).__init__()547    # Most of this combiner's work is done by a TopCombineFn. We could just548    # subclass TopCombineFn to make this class, but since sampling is not549    # really a kind of Top operation, we use a TopCombineFn instance as a550    # helper instead.551    self._top_combiner = TopCombineFn(n)552  def create_accumulator(self):553    return self._top_combiner.create_accumulator()554  def add_input(self, heap, element):555    # Before passing elements to the Top combiner, we pair them with random556    # numbers. 
The elements with the n largest random number "keys" will be557    # selected for the output.558    return self._top_combiner.add_input(heap, (random.random(), element))559  def merge_accumulators(self, heaps):560    return self._top_combiner.merge_accumulators(heaps)561  def compact(self, heap):562    return self._top_combiner.compact(heap)563  def extract_output(self, heap):564    # Here we strip off the random number keys we added in add_input.565    return [e for _, e in self._top_combiner.extract_output(heap)]566class _TupleCombineFnBase(core.CombineFn):567  def __init__(self, *combiners):568    self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners]569    self._named_combiners = combiners570  def display_data(self):571    combiners = [572        c.__name__ if hasattr(c, '__name__') else c.__class__.__name__573        for c in self._named_combiners574    ]575    return {'combiners': str(combiners)}576  def create_accumulator(self):577    return [c.create_accumulator() for c in self._combiners]578  def merge_accumulators(self, accumulators):579    return [580        c.merge_accumulators(a) for c,581        a in zip(self._combiners, zip(*accumulators))582    ]583  def compact(self, accumulator):584    return [c.compact(a) for c, a in zip(self._combiners, accumulator)]585  def extract_output(self, accumulator):586    return tuple(587        [c.extract_output(a) for c, a in zip(self._combiners, accumulator)])588class TupleCombineFn(_TupleCombineFnBase):589  """A combiner for combining tuples via a tuple of combiners.590  Takes as input a tuple of N CombineFns and combines N-tuples by591  combining the k-th element of each tuple with the k-th CombineFn,592  outputting a new N-tuple of combined values.593  """594  def add_input(self, accumulator, element):595    return [596        c.add_input(a, e) for c,597        a,598        e in zip(self._combiners, accumulator, element)599    ]600  def with_common_input(self):601    return 
SingleInputTupleCombineFn(*self._combiners)602class SingleInputTupleCombineFn(_TupleCombineFnBase):603  """A combiner for combining a single value via a tuple of combiners.604  Takes as input a tuple of N CombineFns and combines elements by605  applying each CombineFn to each input, producing an N-tuple of606  the outputs corresponding to each of the N CombineFn's outputs.607  """608  def add_input(self, accumulator, element):609    return [610        c.add_input(a, element) for c, a in zip(self._combiners, accumulator)611    ]612class ToList(ptransform.PTransform):613  """A global CombineFn that condenses a PCollection into a single list."""614  def __init__(self, label='ToList'):  # pylint: disable=useless-super-delegation615    super(ToList, self).__init__(label)616  def expand(self, pcoll):617    return pcoll | self.label >> core.CombineGlobally(ToListCombineFn())618@with_input_types(T)619@with_output_types(List[T])620class ToListCombineFn(core.CombineFn):621  """CombineFn for to_list."""622  def create_accumulator(self):623    return []624  def add_input(self, accumulator, element):625    accumulator.append(element)626    return accumulator627  def merge_accumulators(self, accumulators):628    return sum(accumulators, [])629  def extract_output(self, accumulator):630    return accumulator631class ToDict(ptransform.PTransform):632  """A global CombineFn that condenses a PCollection into a single dict.633  PCollections should consist of 2-tuples, notionally (key, value) pairs.634  If multiple values are associated with the same key, only one of the values635  will be present in the resulting dict.636  """637  def __init__(self, label='ToDict'):  # pylint: disable=useless-super-delegation638    super(ToDict, self).__init__(label)639  def expand(self, pcoll):640    return pcoll | self.label >> core.CombineGlobally(ToDictCombineFn())641@with_input_types(Tuple[K, V])642@with_output_types(Dict[K, V])643class ToDictCombineFn(core.CombineFn):644  """CombineFn for 
to_dict."""645  def create_accumulator(self):646    return dict()647  def add_input(self, accumulator, element):648    key, value = element649    accumulator[key] = value650    return accumulator651  def merge_accumulators(self, accumulators):652    result = dict()653    for a in accumulators:654      result.update(a)655    return result656  def extract_output(self, accumulator):657    return accumulator658class ToSet(ptransform.PTransform):659  """A global CombineFn that condenses a PCollection into a set."""660  def __init__(self, label='ToSet'):  # pylint: disable=useless-super-delegation661    super(ToSet, self).__init__(label)662  def expand(self, pcoll):663    return pcoll | self.label >> core.CombineGlobally(ToSetCombineFn())664@with_input_types(T)665@with_output_types(Set[T])666class ToSetCombineFn(core.CombineFn):667  """CombineFn for ToSet."""668  def create_accumulator(self):669    return set()670  def add_input(self, accumulator, element):671    accumulator.add(element)672    return accumulator673  def merge_accumulators(self, accumulators):674    return set.union(*accumulators)675  def extract_output(self, accumulator):676    return accumulator677class _CurriedFn(core.CombineFn):678  """Wrapped CombineFn with extra arguments."""679  def __init__(self, fn, args, kwargs):680    self.fn = fn681    self.args = args682    self.kwargs = kwargs683  def create_accumulator(self):684    return self.fn.create_accumulator(*self.args, **self.kwargs)685  def add_input(self, accumulator, element):686    return self.fn.add_input(accumulator, element, *self.args, **self.kwargs)687  def merge_accumulators(self, accumulators):688    return self.fn.merge_accumulators(accumulators, *self.args, **self.kwargs)689  def compact(self, accumulator):690    return self.fn.compact(accumulator, *self.args, **self.kwargs)691  def extract_output(self, accumulator):692    return self.fn.extract_output(accumulator, *self.args, **self.kwargs)693  def apply(self, elements):694    return 
self.fn.apply(elements, *self.args, **self.kwargs)695def curry_combine_fn(fn, args, kwargs):696  if not args and not kwargs:697    return fn698  else:699    return _CurriedFn(fn, args, kwargs)700class PhasedCombineFnExecutor(object):701  """Executor for phases of combine operations."""702  def __init__(self, phase, fn, args, kwargs):703    self.combine_fn = curry_combine_fn(fn, args, kwargs)704    if phase == 'all':705      self.apply = self.full_combine706    elif phase == 'add':707      self.apply = self.add_only708    elif phase == 'merge':709      self.apply = self.merge_only710    elif phase == 'extract':711      self.apply = self.extract_only712    else:713      raise ValueError('Unexpected phase: %s' % phase)714  def full_combine(self, elements):715    return self.combine_fn.apply(elements)716  def add_only(self, elements):717    return self.combine_fn.add_inputs(718        self.combine_fn.create_accumulator(), elements)719  def merge_only(self, accumulators):720    return self.combine_fn.merge_accumulators(accumulators)721  def extract_only(self, accumulator):722    return self.combine_fn.extract_output(accumulator)723class Latest(object):724  """Combiners for computing the latest element"""725  @with_input_types(T)726  @with_output_types(T)727  class Globally(ptransform.PTransform):728    """Compute the element with the latest timestamp from a729    PCollection."""730    @staticmethod731    def add_timestamp(element, timestamp=core.DoFn.TimestampParam):732      return [(element, timestamp)]733    def expand(self, pcoll):734      return (735          pcoll736          | core.ParDo(self.add_timestamp).with_output_types(737              Tuple[T, TimestampType])  # type: ignore[misc]738          | core.CombineGlobally(LatestCombineFn()))739  @with_input_types(Tuple[K, V])740  @with_output_types(Tuple[K, V])741  class PerKey(ptransform.PTransform):742    """Compute elements with the latest timestamp for each key743    from a keyed PCollection"""744    
@staticmethod745    def add_timestamp(element, timestamp=core.DoFn.TimestampParam):746      key, value = element747      return [(key, (value, timestamp))]748    def expand(self, pcoll):749      return (750          pcoll751          | core.ParDo(self.add_timestamp).with_output_types(752              Tuple[K, Tuple[T, TimestampType]])  # type: ignore[misc]753          | core.CombinePerKey(LatestCombineFn()))754@with_input_types(Tuple[T, TimestampType])  # type: ignore[misc]755@with_output_types(T)756class LatestCombineFn(core.CombineFn):757  """CombineFn to get the element with the latest timestamp758  from a PCollection."""759  def create_accumulator(self):760    return (None, window.MIN_TIMESTAMP)761  def add_input(self, accumulator, element):762    if accumulator[1] > element[1]:763      return accumulator764    else:765      return element766  def merge_accumulators(self, accumulators):767    result = self.create_accumulator()768    for accumulator in accumulators:769      result = self.add_input(result, accumulator)770    return result771  def extract_output(self, accumulator):...container.py
Source:container.py  
...41    @type _queue: List42      The end of the list represents the *front* of the queue, that is,43      the next item to be removed.44    @type _less_than: Callable[[Object, Object], bool]45      If x._less_than(y) is true, then x has higher priority than y46      and should be removed from the queue before y.47    === Representation Invariants ===48    - all elements of _queue are of the same type49    - the elements of _queue are appropriate arguments for function less_than50    - the elements of _queue are in order according to function less_than.51    """52    def __init__(self, less_than):53        """Initialize this to an empty PriorityQueue.54        @type self: PriorityQueue55        @type less_than: Callable[[Object, Object], bool]56            Determines the relative priority of two elements of the queue.57            If x._less_than(y) is true, then x has higher priority than y.58        @rtype: None59        """60        self._queue = []61        self._less_than = less_than62    def add(self, item):63        """Add <item> to this PriorityQueue.64        @type self: PriorityQueue65        @type item: Object66        @rtype: None67        >>> def shorter(a, b):68        ...    
return len(a) < len(b)69        ...70        >>>71        >>> # Define a PriorityQueue with priority on shorter strings.72        >>> # I.e., when we remove, we get the shortest remaining string.73        >>> pq = PriorityQueue(shorter)74        >>> pq.add('fred')75        >>> pq.add('arju')76        >>> pq.add('monalisa')77        >>> pq.add('hat')78        >>> pq._queue79        ['monalisa', 'arju', 'fred', 'hat']80        >>> pq.remove()81        'hat'82        >>> pq._queue83        ['monalisa', 'arju', 'fred']84        """85        if len(self._queue) == 0:86            self._queue.append(item)87        else:88            if self._less_than(item, self._queue[-1]):89                self._queue.append(item)90            elif self._less_than(self._queue[0], item):91                self._queue = [item] + self._queue92            else:93                for itr in self._queue:94                    if not self._less_than(item, itr):95                        self._queue.insert(self._queue.index(itr), item)96                        break97    def remove(self):98        """Remove and return the next item from this PriorityQueue.99        Precondition: this priority queue is non-empty.100        @type self: PriorityQueue101        @rtype: Object102        >>> def shorter(a, b):103        ...    return len(a) < len(b)104        ...105        >>>106        >>> # When we hit the tie, the one that was added first will be107        >>> # removed first.108        >>> pq = PriorityQueue(shorter)...leetcode-973.py
Source:leetcode-973.py  
...9        # 将数组的前 k 个元素堆化10        for idx in range(k // 2, -1, -1):11            self.adjust(points, idx, k - 1)12        for idx in range(k, len(points)):13            if self._less_than(points[0], points[idx]):14                continue15            points[0], points[idx] = points[idx], points[0]16            self.adjust(points, 0, k - 1)17        return points[:k]18    def _less_than(self, point1: List[int], point2: List[int]) -> bool:  # noqa19        return (point1[0] * point1[0] + point1[1] * point1[1]) < (point2[0] * point2[0] + point2[1] * point2[1])20    # 大根堆21    def adjust(self, arr: List[List[int]], start: int, end: int) -> None:  # noqa22        if start >= end:23            return24        while 2 * start + 1 <= end:25            left: int = 2 * start + 126            right: int = 2 * start + 227            # 如果没有右孩子28            if right > end:29                if self._less_than(arr[start], arr[left]):30                    arr[start], arr[left] = arr[left], arr[start]31                break32            if not self._less_than(arr[start], arr[left]) and not self._less_than(arr[start], arr[right]):33                break34            if not self._less_than(arr[left], arr[right]):35                # 与左孩子交换36                arr[start], arr[left] = arr[left], arr[start]37                start = left38                continue39            arr[start], arr[right] = arr[right], arr[start]...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
