Source: frames_test.py (excerpt)
...
    def wrapper(df):
      df = df.copy()
      func(df)
      return df

    self._run_test(wrapper, arg, **kwargs)

  def _run_test(
      self, func, *args, distributed=True, nonparallel=False, check_proxy=True):
    """Verify that func(*args) produces the same result in pandas and in Beam.

    Args:
        distributed (bool): Whether or not to use PartitioningSession to
            simulate parallel execution.
        nonparallel (bool): Whether or not this function contains a
            non-parallelizable operation. If True, the expression will be
            generated twice, once outside of an allow_non_parallel_operations
            block (to verify NonParallelOperation is raised), and again inside
            of an allow_non_parallel_operations block to actually generate an
            expression to verify.
        check_proxy (bool): Whether or not to check that the proxy of the
            generated expression matches the actual result, defaults to True.
            This option should NOT be set to False in tests added for new
            operations if at all possible. Instead make sure the new operation
            produces the correct proxy. This flag only exists as an escape
            hatch until existing failures can be addressed (BEAM-12379)."""
    # Compute expected value
    expected = func(*args)

    # Compute actual value
    deferred_args = _get_deferred_args(*args)
    if nonparallel:
      # First run outside a nonparallel block to confirm this raises as
      # expected
      with self.assertRaises(expressions.NonParallelOperation) as raised:
        func(*deferred_args)
      if raised.exception.msg.startswith(
          "Encountered non-parallelizable form of"):
        raise AssertionError(
            "Default NonParallelOperation raised, please specify a reason in "
            "the Singleton() partitioning requirement for this operation."
        ) from raised.exception

      # Re-run in an allow non parallel block to get an expression to verify
      with beam.dataframe.allow_non_parallel_operations():
        expr = func(*deferred_args)._expr
    else:
      expr = func(*deferred_args)._expr

    # Compute the result of the generated expression
    session_type = (
        expressions.PartitioningSession if distributed else expressions.Session)
    actual = session_type({}).evaluate(expr)

    # Verify
    if isinstance(expected, pd.core.generic.NDFrame):
      if distributed:
        if expected.index.is_unique:
          expected = expected.sort_index()
          actual = actual.sort_index()
        else:
          expected = expected.sort_values(list(expected.columns))
          actual = actual.sort_values(list(actual.columns))

      if isinstance(expected, pd.Series):
        pd.testing.assert_series_equal(expected, actual)
      elif isinstance(expected, pd.DataFrame):
        pd.testing.assert_frame_equal(expected, actual)
      else:
        raise ValueError(
            f"Expected value is a {type(expected)}, "
            "not a Series or DataFrame.")
    else:
      # Expectation is not a pandas object
      if isinstance(expected, float):
        if np.isnan(expected):
          cmp = np.isnan
        else:
          cmp = lambda x: np.isclose(expected, x)
      else:
        cmp = lambda x: x == expected
      self.assertTrue(
          cmp(actual), 'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))

    if check_proxy:
      # Verify that the actual result agrees with the proxy
      proxy = expr.proxy()

      if type(actual) in (np.float32, np.float64):
        self.assertTrue(type(actual) == type(proxy) or np.isnan(proxy))
      else:
        self.assertEqual(type(actual), type(proxy))

      if isinstance(expected, pd.core.generic.NDFrame):
        if isinstance(expected, pd.Series):
          self.assertEqual(actual.dtype, proxy.dtype)
          self.assertEqual(actual.name, proxy.name)
        elif isinstance(expected, pd.DataFrame):
          pd.testing.assert_series_equal(actual.dtypes, proxy.dtypes)
        else:
          raise ValueError(
              f"Expected value is a {type(expected)}, "
              "not a Series or DataFrame.")

        self.assertEqual(actual.index.names, proxy.index.names)

        for i in range(actual.index.nlevels):
          self.assertEqual(
              actual.index.get_level_values(i).dtype,
              proxy.index.get_level_values(i).dtype)
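  # For illustration only, a minimal hypothetical test built on this harness
  # (not from the original suite): the lambda receives the deferred argument,
  # and _run_test evaluates it both in pandas and in Beam and compares.
  #
  #   def test_example_usage(self):
  #     s = pd.Series([1, 2, 3])
  #     self._run_test(lambda s: s * s + s, s)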


class DeferredFrameTest(_AbstractFrameTest):
  """Miscellaneous tests for DataFrame operations."""
  def test_series_arithmetic(self):
    a = pd.Series([1, 2, 3])
    b = pd.Series([100, 200, 300])
    self._run_test(lambda a, b: a - 2 * b, a, b)
    self._run_test(lambda a, b: a.subtract(2).multiply(b).divide(a), a, b)

  def test_dataframe_arithmetic(self):
    df = pd.DataFrame({'a': [1, 2, 3], 'b': [100, 200, 300]})
    df2 = pd.DataFrame({'a': [3000, 1000, 2000], 'b': [7, 11, 13]})
    self._run_test(lambda df, df2: df - 2 * df2, df, df2)
    self._run_test(
        lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2)

  @unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3")
  def test_value_counts_dropna_false(self):
    df = pd.DataFrame({
        'first_name': ['John', 'Anne', 'John', 'Beth'],
        'middle_name': ['Smith', pd.NA, pd.NA, 'Louise']
    })
    # TODO(BEAM-12495): Remove this assertRaises when the underlying bug in
    # https://github.com/pandas-dev/pandas/issues/36470 is fixed.
    with self.assertRaises(NotImplementedError):
      self._run_test(lambda df: df.value_counts(dropna=False), df)

  def test_get_column(self):
    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self._run_test(lambda df: df['Animal'], df)
    self._run_test(lambda df: df.Speed, df)
    self._run_test(lambda df: df.get('Animal'), df)
    self._run_test(lambda df: df.get('FOO', df.Animal), df)

  def test_series_xs(self):
    # pandas doctests only verify DataFrame.xs; here we verify Series.xs as
    # well.
    d = {
        'num_legs': [4, 4, 2, 2],
        'num_wings': [0, 0, 2, 2],
        'class': ['mammal', 'mammal', 'mammal', 'bird'],
        'animal': ['cat', 'dog', 'bat', 'penguin'],
        'locomotion': ['walks', 'walks', 'flies', 'walks']
    }
    df = pd.DataFrame(data=d)
    df = df.set_index(['class', 'animal', 'locomotion'])

    self._run_test(lambda df: df.num_legs.xs('mammal'), df)
    self._run_test(lambda df: df.num_legs.xs(('mammal', 'dog')), df)
    self._run_test(lambda df: df.num_legs.xs('cat', level=1), df)
    self._run_test(
        lambda df: df.num_legs.xs(('bird', 'walks'), level=[0, 'locomotion']),
        df)
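  # For reference, a sketch of the standard pandas behavior verified above:
  # xs('mammal') selects the matching rows and drops the matched 'class'
  # level, e.g.
  #
  #   >>> df.num_legs.xs('mammal')
  #   animal  locomotion
  #   cat     walks         4
  #   dog     walks         4
  #   bat     flies         2
  #   Name: num_legs, dtype: int64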
  def test_set_column(self):
    def new_column(df):
      df['NewCol'] = df['Speed']

    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self._run_inplace_test(new_column, df)

  def test_set_column_from_index(self):
    def new_column(df):
      df['NewCol'] = df.index

    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self._run_inplace_test(new_column, df)

  def test_tz_localize_ambiguous_series(self):
    # This replicates a tz_localize doctest:
    #   s.tz_localize('CET', ambiguous=np.array([True, True, False]))
    # But using a DeferredSeries instead of a np array.
    s = pd.Series(
        range(3),
        index=pd.DatetimeIndex([
            '2018-10-28 01:20:00', '2018-10-28 02:36:00', '2018-10-28 03:46:00'
        ]))
    ambiguous = pd.Series([True, True, False], index=s.index)

    self._run_test(
        lambda s,
        ambiguous: s.tz_localize('CET', ambiguous=ambiguous),
        s,
        ambiguous)

  def test_tz_convert(self):
    # This replicates a tz_convert doctest:
    #   s.tz_convert('America/Los_Angeles')
    # But using a DeferredSeries with a tz-aware index.
    s = pd.Series(
        range(3),
        index=pd.DatetimeIndex([
            '2018-10-27 01:20:00', '2018-10-27 02:36:00', '2018-10-27 03:46:00'
        ],
                               tz='Europe/Berlin'))
    self._run_test(lambda s: s.tz_convert('America/Los_Angeles'), s)

  def test_sort_index_columns(self):
    df = pd.DataFrame({
        'c': range(10),
        'a': range(10),
        'b': range(10),
        np.nan: range(10),
    })

    self._run_test(lambda df: df.sort_index(axis=1), df)
    self._run_test(lambda df: df.sort_index(axis=1, ascending=False), df)
    self._run_test(lambda df: df.sort_index(axis=1, na_position='first'), df)

  def test_where_callable_args(self):
    df = pd.DataFrame(
        np.arange(10, dtype=np.int64).reshape(-1, 2), columns=['A', 'B'])

    self._run_test(
        lambda df: df.where(lambda df: df % 2 == 0, lambda df: df * 10), df)

  def test_where_concrete_args(self):
    df = pd.DataFrame(
        np.arange(10, dtype=np.int64).reshape(-1, 2), columns=['A', 'B'])

    self._run_test(
        lambda df: df.where(
            df % 2 == 0, pd.Series({
                'A': 123, 'B': 456
            }), axis=1),
        df)

  def test_combine_dataframe(self):
    df = pd.DataFrame({'A': [0, 0], 'B': [4, 4]})
    df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
    take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
    self._run_test(
        lambda df,
        df2: df.combine(df2, take_smaller),
        df,
        df2,
        nonparallel=True)

  def test_combine_dataframe_fill(self):
    df1 = pd.DataFrame({'A': [0, 0], 'B': [None, 4]})
    df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
    take_smaller = lambda s1, s2: s1 if s1.sum() < s2.sum() else s2
    self._run_test(
        lambda df1,
        df2: df1.combine(df2, take_smaller, fill_value=-5),
        df1,
        df2,
        nonparallel=True)

  def test_combine_Series(self):
    s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
    s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
    self._run_test(
        lambda s1,
        s2: s1.combine(s2, max),
        s1,
        s2,
        nonparallel=True,
        check_proxy=False)
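  # For reference, a sketch of the pandas behavior the test above verifies:
  # combine() aligns on the union of the two indexes, so a key missing from
  # one input surfaces as NaN:
  #
  #   >>> s1.combine(s2, max)
  #   duck        NaN
  #   eagle     200.0
  #   falcon    345.0
  #   dtype: float64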
  def test_combine_first_dataframe(self):
    df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
    df2 = pd.DataFrame({'A': [1, 1], 'B': [3, 3]})
    self._run_test(lambda df1, df2: df1.combine_first(df2), df1, df2)

  def test_combine_first_series(self):
    s1 = pd.Series([1, np.nan])
    s2 = pd.Series([3, 4])
    self._run_test(lambda s1, s2: s1.combine_first(s2), s1, s2)

  def test_add_prefix(self):
    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [3, 4, 5, 6]})
    s = pd.Series([1, 2, 3, 4])
    self._run_test(lambda df: df.add_prefix('col_'), df)
    self._run_test(lambda s: s.add_prefix('col_'), s)

  def test_add_suffix(self):
    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [3, 4, 5, 6]})
    s = pd.Series([1, 2, 3, 4])
    self._run_test(lambda df: df.add_suffix('_col'), df)
    self._run_test(lambda s: s.add_suffix('_col'), s)

  def test_set_index(self):
    df = pd.DataFrame({
        'index1': reversed(range(20)),  # [19, 18, ..]
        'index2': np.roll(range(20), 5),  # [15, 16, .., 0, 1, .., 13, 14]
        'values': [chr(ord('a') + i) * i for i in range(20)],  # ['', 'a', ..]
    })

    self._run_test(lambda df: df.set_index(['index1', 'index2']), df)
    self._run_test(lambda df: df.set_index(['index1', 'index2'], drop=True), df)
    self._run_test(lambda df: df.set_index('values'), df)

    self._run_error_test(lambda df: df.set_index('bad'), df)
    self._run_error_test(
        lambda df: df.set_index(['index2', 'bad', 'really_bad']), df)

  def test_set_axis(self):
    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}, index=['X', 'Y', 'Z'])

    self._run_test(lambda df: df.set_axis(['I', 'II'], axis='columns'), df)
    self._run_test(lambda df: df.set_axis([0, 1], axis=1), df)
    self._run_inplace_test(
        lambda df: df.set_axis(['i', 'ii'], axis='columns'), df)

    with self.assertRaises(NotImplementedError):
      self._run_test(lambda df: df.set_axis(['a', 'b', 'c'], axis='index'), df)
      self._run_test(lambda df: df.set_axis([0, 1, 2], axis=0), df)

  def test_series_set_axis(self):
    s = pd.Series(list(range(3)), index=['X', 'Y', 'Z'])
    with self.assertRaises(NotImplementedError):
      self._run_test(lambda s: s.set_axis(['a', 'b', 'c']), s)
      self._run_test(lambda s: s.set_axis([1, 2, 3]), s)

  def test_series_drop_ignore_errors(self):
    midx = pd.MultiIndex(
        levels=[['lama', 'cow', 'falcon'], ['speed', 'weight', 'length']],
        codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]])
    s = pd.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1, 0.3], index=midx)

    # drop() requires singleton partitioning unless errors are ignored.
    # Add some additional tests here to make sure the implementation works in
    # non-singleton partitioning.
    self._run_test(lambda s: s.drop('lama', level=0, errors='ignore'), s)
    self._run_test(lambda s: s.drop(('cow', 'speed'), errors='ignore'), s)
    self._run_test(lambda s: s.drop('falcon', level=0, errors='ignore'), s)
  def test_dataframe_drop_ignore_errors(self):
    midx = pd.MultiIndex(
        levels=[['lama', 'cow', 'falcon'], ['speed', 'weight', 'length']],
        codes=[[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]])
    df = pd.DataFrame(
        index=midx,
        columns=['big', 'small'],
        data=[[45, 30], [200, 100], [1.5, 1], [30, 20], [250, 150], [1.5, 0.8],
              [320, 250], [1, 0.8], [0.3, 0.2]])

    # drop() requires singleton partitioning unless errors are ignored.
    # Add some additional tests here to make sure the implementation works in
    # non-singleton partitioning.
    self._run_test(
        lambda df: df.drop(index='lama', level=0, errors='ignore'), df)
    self._run_test(
        lambda df: df.drop(index=('cow', 'speed'), errors='ignore'), df)
    self._run_test(
        lambda df: df.drop(index='falcon', level=0, errors='ignore'), df)
    self._run_test(
        lambda df: df.drop(index='cow', columns='small', errors='ignore'), df)

  def test_merge(self):
    # This is from the pandas doctests, but fails due to re-indexing being
    # order-sensitive.
    df1 = pd.DataFrame({
        'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
    })
    df2 = pd.DataFrame({
        'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
    })
    self._run_test(
        lambda df1,
        df2: df1.merge(df2, left_on='lkey', right_on='rkey').rename(
            index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)
    self._run_test(
        lambda df1,
        df2: df1.merge(
            df2, left_on='lkey', right_on='rkey', suffixes=('_left', '_right')).
        rename(index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)

  def test_merge_left_join(self):
    # This is from the pandas doctests, but fails due to re-indexing being
    # order-sensitive.
    df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
    df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})

    self._run_test(
        lambda df1,
        df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)

  def test_merge_on_index(self):
    # This is from the pandas doctests, but fails due to re-indexing being
    # order-sensitive.
    df1 = pd.DataFrame({
        'lkey': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
    }).set_index('lkey')
    df2 = pd.DataFrame({
        'rkey': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
    }).set_index('rkey')

    self._run_test(
        lambda df1,
        df2: df1.merge(df2, left_index=True, right_index=True),
        df1,
        df2,
        check_proxy=False)

  def test_merge_same_key(self):
    df1 = pd.DataFrame({
        'key': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
    })
    df2 = pd.DataFrame({
        'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
    })
    self._run_test(
        lambda df1,
        df2: df1.merge(df2, on='key').rename(index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)
    self._run_test(
        lambda df1,
        df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
            index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)
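  # For reference (standard pandas behavior): when both inputs carry a 'value'
  # column, merge() disambiguates with the default suffixes ('_x', '_y'):
  #
  #   >>> df1.merge(df2, on='key').columns
  #   Index(['key', 'value_x', 'value_y'], dtype='object')
  #
  # which is why the tests above also exercise explicit suffixes.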
  def test_merge_same_key_doctest(self):
    df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
    df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})

    self._run_test(
        lambda df1,
        df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)
    # Test without specifying 'on'
    self._run_test(
        lambda df1,
        df2: df1.merge(df2, how='left').rename(index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)

  def test_merge_same_key_suffix_collision(self):
    df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2], 'a_lsuffix': [5, 6]})
    df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4], 'a_rsuffix': [7, 8]})

    self._run_test(
        lambda df1,
        df2: df1.merge(
            df2, how='left', on='a', suffixes=('_lsuffix', '_rsuffix')).rename(
                index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)
    # Test without specifying 'on'
    self._run_test(
        lambda df1,
        df2: df1.merge(df2, how='left', suffixes=('_lsuffix', '_rsuffix')).
        rename(index=lambda x: '*'),
        df1,
        df2,
        nonparallel=True,
        check_proxy=False)

  def test_value_counts_with_nans(self):
    # Similar to the doctests that verify value_counts, but includes nan
    # values to make sure we handle them correctly.
    df = pd.DataFrame({
        'num_legs': [2, 4, 4, 6, np.nan, np.nan],
        'num_wings': [2, 0, 0, 0, np.nan, 2]
    },
                      index=['falcon', 'dog', 'cat', 'ant', 'car', 'plane'])

    self._run_test(lambda df: df.value_counts(), df)
    self._run_test(lambda df: df.value_counts(normalize=True), df)

    if PD_VERSION >= (1, 3):
      # dropna=False is new in pandas 1.3.
      # TODO(BEAM-12495): Remove this assertRaises when the underlying bug
      # in https://github.com/pandas-dev/pandas/issues/36470 is fixed.
      with self.assertRaises(NotImplementedError):
        self._run_test(lambda df: df.value_counts(dropna=False), df)

    # Test the defaults.
    self._run_test(lambda df: df.num_wings.value_counts(), df)
    self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df)
    self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df)

    # Test the combination interactions.
    for normalize in (True, False):
      for dropna in (True, False):
        self._run_test(
            lambda df,
            dropna=dropna,
            normalize=normalize: df.num_wings.value_counts(
                dropna=dropna, normalize=normalize),
            df)

  def test_value_counts_does_not_support_sort(self):
    df = pd.DataFrame({
        'num_legs': [2, 4, 4, 6, np.nan, np.nan],
        'num_wings': [2, 0, 0, 0, np.nan, 2]
    },
                      index=['falcon', 'dog', 'cat', 'ant', 'car', 'plane'])

    with self.assertRaisesRegex(frame_base.WontImplementError,
                                r"value_counts\(sort\=True\)"):
      self._run_test(lambda df: df.value_counts(sort=True), df)

    with self.assertRaisesRegex(frame_base.WontImplementError,
                                r"value_counts\(sort\=True\)"):
      self._run_test(lambda df: df.num_wings.value_counts(sort=True), df)
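  # For reference, a sketch of the Series-level behavior exercised above
  # (standard pandas; counts shown for the 'num_wings' column):
  #
  #   >>> df.num_wings.value_counts(dropna=False)
  #   0.0    3
  #   2.0    2
  #   NaN    1
  #   Name: num_wings, dtype: int64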
  def test_series_getitem(self):
    s = pd.Series([x**2 for x in range(10)])
    self._run_test(lambda s: s[...], s)
    self._run_test(lambda s: s[:], s)
    self._run_test(lambda s: s[s < 10], s)
    self._run_test(lambda s: s[lambda s: s < 10], s)

    s.index = s.index.map(float)
    self._run_test(lambda s: s[1.5:6], s)

  @parameterized.expand([
      (pd.Series(range(10)), ),  # unique
      (pd.Series(list(range(100)) + [0]), ),  # non-unique int
      (pd.Series(list(range(100)) + [0]) / 100, ),  # non-unique flt
      (pd.Series(['a', 'b', 'c', 'd']), ),  # unique str
      (pd.Series(['a', 'b', 'a', 'c', 'd']), ),  # non-unique str
  ])
  def test_series_is_unique(self, series):
    self._run_test(lambda s: s.is_unique, series)

  @parameterized.expand([
      (pd.Series(range(10)), ),  # False
      (pd.Series([1, 2, np.nan, 3, np.nan]), ),  # True
      (pd.Series(['a', 'b', 'c', 'd', 'e']), ),  # False
      (pd.Series(['a', 'b', None, 'c', None]), ),  # True
  ])
  def test_series_hasnans(self, series):
    self._run_test(lambda s: s.hasnans, series)

  def test_dataframe_getitem(self):
    df = pd.DataFrame({'A': [x**2 for x in range(6)], 'B': list('abcdef')})
    self._run_test(lambda df: df['A'], df)
    self._run_test(lambda df: df[['A', 'B']], df)

    self._run_test(lambda df: df[:], df)
    self._run_test(lambda df: df[df.A < 10], df)

    df.index = df.index.map(float)
    self._run_test(lambda df: df[1.5:4], df)

  def test_loc(self):
    dates = pd.date_range('1/1/2000', periods=8)
    # TODO(BEAM-11757): We do not preserve the freq attribute on a DateTime
    # index.
    dates.freq = None
    df = pd.DataFrame(
        np.arange(32).reshape((8, 4)),
        index=dates,
        columns=['A', 'B', 'C', 'D'])
    self._run_test(lambda df: df.loc[:], df)
    self._run_test(lambda df: df.loc[:, 'A'], df)
    self._run_test(lambda df: df.loc[:dates[3]], df)
    self._run_test(lambda df: df.loc[df.A > 10], df)
    self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)
    self._run_test(lambda df: df.C.loc[df.A > 10], df)
    self._run_test(lambda df, s: df.loc[s.loc[1:3]], df, pd.Series(dates))

  def test_append_sort(self):
    # yapf: disable
    df1 = pd.DataFrame({'int': [1, 2, 3], 'str': ['a', 'b', 'c']},
                       columns=['int', 'str'],
                       index=[1, 3, 5])
    df2 = pd.DataFrame({'int': [4, 5, 6], 'str': ['d', 'e', 'f']},
                       columns=['str', 'int'],
                       index=[2, 4, 6])
    # yapf: enable

    self._run_test(lambda df1, df2: df1.append(df2, sort=True), df1, df2)
    self._run_test(lambda df1, df2: df1.append(df2, sort=False), df1, df2)
    self._run_test(lambda df1, df2: df2.append(df1, sort=True), df1, df2)
    self._run_test(lambda df1, df2: df2.append(df1, sort=False), df1, df2)

  def test_smallest_largest(self):
    df = pd.DataFrame({'A': [1, 1, 2, 2], 'B': [2, 3, 5, 7]})
    self._run_test(lambda df: df.nlargest(1, 'A', keep='all'), df)
    self._run_test(lambda df: df.nsmallest(3, 'A', keep='all'), df)
    self._run_test(lambda df: df.nlargest(3, ['A', 'B'], keep='all'), df)

  def test_series_cov_corr(self):
    for s in [pd.Series([1, 2, 3]),
              pd.Series(range(100)),
              pd.Series([x**3 for x in range(-50, 50)])]:
      self._run_test(lambda s: s.std(), s)
      self._run_test(lambda s: s.var(), s)
      self._run_test(lambda s: s.corr(s), s)
      self._run_test(lambda s: s.corr(s + 1), s)
      self._run_test(lambda s: s.corr(s * s), s)
      self._run_test(lambda s: s.cov(s * s), s)
      self._run_test(lambda s: s.skew(), s)
      self._run_test(lambda s: s.kurtosis(), s)
      self._run_test(lambda s: s.kurt(), s)
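  # As a reminder of what the tests above compute: the default (Pearson)
  # correlation estimated by s.corr(other) is
  #
  #   r = s.cov(other) / (s.std() * other.std())
  #
  # so s.corr(s) is 1.0, and s.corr(s + 1) is also 1.0 (a constant shift does
  # not change correlation).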
  def test_dataframe_cov_corr(self):
    df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
    df.loc[df.index[:5], 'a'] = np.nan
    df.loc[df.index[5:10], 'b'] = np.nan
    self._run_test(lambda df: df.corr(), df)
    self._run_test(lambda df: df.cov(), df)
    self._run_test(lambda df: df.corr(min_periods=12), df)
    self._run_test(lambda df: df.cov(min_periods=12), df)
    self._run_test(lambda df: df.corrwith(df.a), df)
    self._run_test(lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]), df)

    df2 = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
    self._run_test(
        lambda df, df2: df.corrwith(df2, axis=1), df, df2, check_proxy=False)

  def test_corrwith_bad_axis(self):
    df = pd.DataFrame({'a': range(3), 'b': range(3, 6), 'c': range(6, 9)})
    self._run_error_test(lambda df: df.corrwith(df.a, axis=2), df)
    self._run_error_test(lambda df: df.corrwith(df, axis=5), df)

  @unittest.skipIf(PD_VERSION < (1, 2), "na_action added in pandas 1.2.0")
  def test_applymap_na_action(self):
    # Replicates a doctest for na_action which is incompatible with the
    # doctest framework.
    df = pd.DataFrame([[pd.NA, 2.12], [3.356, 4.567]])
    self._run_test(
        lambda df: df.applymap(lambda x: len(str(x)), na_action='ignore'),
        df,
        # TODO: generate proxy using naive type inference on fn
        check_proxy=False)

  def test_dataframe_eval_query(self):
    df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
    self._run_test(lambda df: df.eval('foo = a + b - c'), df)
    self._run_test(lambda df: df.query('a > b + c'), df)
    self._run_inplace_test(lambda df: df.eval('foo = a + b - c'), df)

    # Verify that attempting to access locals raises a useful error
    deferred_df = frame_base.DeferredFrame.wrap(
        expressions.ConstantExpression(df, df[0:0]))
    self.assertRaises(
        NotImplementedError, lambda: deferred_df.eval('foo = a + @b - c'))
    self.assertRaises(
        NotImplementedError, lambda: deferred_df.query('a > @b + c'))

  def test_index_name_assignment(self):
    df = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
    df = df.set_index(['a', 'b'], drop=False)

    def change_index_names(df):
      df.index.names = ['A', None]

    self._run_inplace_test(change_index_names, df)

  def test_quantile(self):
    df = pd.DataFrame(
        np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])

    self._run_test(
        lambda df: df.quantile(0.1, axis='columns'), df, check_proxy=False)

    with self.assertRaisesRegex(frame_base.WontImplementError,
                                r"df\.quantile\(q=0\.1, axis='columns'\)"):
      self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
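  # For reference, a sketch of what the first melt() call below produces
  # (standard pandas; ignore_index=False keeps the original index):
  #
  #   >>> df.melt(id_vars=['A'], value_vars=['B'], ignore_index=False)
  #      A variable  value
  #   0  a        B      1
  #   1  b        B      3
  #   2  c        B      5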
  def test_dataframe_melt(self):
    df = pd.DataFrame({
        'A': {0: 'a', 1: 'b', 2: 'c'},
        'B': {0: 1, 1: 3, 2: 5},
        'C': {0: 2, 1: 4, 2: 6}
    })

    self._run_test(
        lambda df: df.melt(id_vars=['A'], value_vars=['B'], ignore_index=False),
        df)
    self._run_test(
        lambda df: df.melt(
            id_vars=['A'], value_vars=['B', 'C'], ignore_index=False),
        df)
    self._run_test(
        lambda df: df.melt(
            id_vars=['A'],
            value_vars=['B'],
            var_name='myVarname',
            value_name='myValname',
            ignore_index=False),
        df)
    self._run_test(
        lambda df: df.melt(
            id_vars=['A'], value_vars=['B', 'C'], ignore_index=False),
        df)

    df.columns = [list('ABC'), list('DEF')]
    self._run_test(
        lambda df: df.melt(
            col_level=0, id_vars=['A'], value_vars=['B'], ignore_index=False),
        df)
    self._run_test(
        lambda df: df.melt(
            id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
        df)

  def test_fillna_columns(self):
    df = pd.DataFrame(
        [[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
         [np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
        columns=list('ABCD'))

    self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
    self._run_test(
        lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
    self._run_test(
        lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)

    # Intended behavior is unclear here. See
    # https://github.com/pandas-dev/pandas/issues/40989
    # self._run_test(lambda df: df.fillna(axis='columns', value=100,
    #                                     limit=2), df)

  def test_dataframe_fillna_dataframe_as_value(self):
    df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
                       [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
                      columns=list("ABCD"))
    df2 = pd.DataFrame(np.zeros((4, 4)), columns=list("ABCE"))

    self._run_test(lambda df, df2: df.fillna(df2), df, df2)

  def test_dataframe_fillna_series_as_value(self):
    df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
                       [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
                      columns=list("ABCD"))
    s = pd.Series(range(4), index=list("ABCE"))

    self._run_test(lambda df, s: df.fillna(s), df, s)

  def test_series_fillna_series_as_value(self):
    df = pd.DataFrame([[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1],
                       [np.nan, np.nan, np.nan, 5], [np.nan, 3, np.nan, 4]],
                      columns=list("ABCD"))
    df2 = pd.DataFrame(np.zeros((4, 4)), columns=list("ABCE"))

    self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)

  def test_append_verify_integrity(self):
    df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
    df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))

    self._run_error_test(
        lambda s1,
        s2: s1.append(s2, verify_integrity=True),
        df1['A'],
        df2['A'],
        construction_time=False)
    self._run_error_test(
        lambda df1,
        df2: df1.append(df2, verify_integrity=True),
        df1,
        df2,
        construction_time=False)

  def test_categorical_groupby(self):
    df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
    df['B'] = df['B'].astype(pd.CategoricalDtype(list('cab')))
    df = df.set_index('B')
    # TODO(BEAM-11190): These aggregations can be done in index partitions, but
    # it will require a little more complex logic.
    self._run_test(lambda df: df.groupby(level=0).sum(), df, nonparallel=True)
    self._run_test(lambda df: df.groupby(level=0).mean(), df, nonparallel=True)
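  # Note on the test above: CategoricalDtype(list('cab')) fixes the category
  # order to ['c', 'a', 'b'], and pandas emits groups of a categorical index
  # in category order rather than alphabetically, so the Beam result must
  # match that ordering too.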
  def test_dataframe_sum_nonnumeric_raises(self):
    # Attempting a numeric aggregation with the str column present should
    # raise, and suggest the numeric_only argument.
    with self.assertRaisesRegex(frame_base.WontImplementError, 'numeric_only'):
      self._run_test(lambda df: df.sum(), GROUPBY_DF)

    # numeric_only=True should work
    self._run_test(lambda df: df.sum(numeric_only=True), GROUPBY_DF)
    # projecting only numeric columns should too
    self._run_test(lambda df: df[['foo', 'bar']].sum(), GROUPBY_DF)

  def test_insert(self):
    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

    self._run_inplace_test(lambda df: df.insert(1, 'C', df.A * 2), df)
    self._run_inplace_test(
        lambda df: df.insert(0, 'foo', pd.Series([8], index=[1])),
        df,
        check_proxy=False)
    self._run_inplace_test(lambda df: df.insert(2, 'bar', value='q'), df)

  def test_insert_does_not_support_list_value(self):
    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

    with self.assertRaisesRegex(frame_base.WontImplementError,
                                r"insert\(value=list\)"):
      self._run_inplace_test(lambda df: df.insert(1, 'C', [7, 8, 9]), df)

  def test_drop_duplicates(self):
    df = pd.DataFrame({
        'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
        'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
        'rating': [4, 4, 3.5, 15, 5]
    })

    self._run_test(lambda df: df.drop_duplicates(keep=False), df)
    self._run_test(
        lambda df: df.drop_duplicates(subset=['brand'], keep=False), df)
    self._run_test(
        lambda df: df.drop_duplicates(subset=['brand', 'style'], keep=False),
        df)

  @parameterized.expand([
      (
          lambda base: base.from_dict({
              'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']
          }), ),
      (
          lambda base: base.from_dict({
              'row_1': [3, 2, 1, 0], 'row_2': ['a', 'b', 'c', 'd']
          },
                                      orient='index'), ),
      (
          lambda base: base.from_records(
              np.array([(3, 'a'), (2, 'b'), (1, 'c'), (0, 'd')],
                       dtype=[('col_1', 'i4'), ('col_2', 'U1')])), ),
  ])
  def test_create_methods(self, func):
    expected = func(pd.DataFrame)
    deferred_df = func(frames.DeferredDataFrame)
    actual = expressions.Session({}).evaluate(deferred_df._expr)
    pd.testing.assert_frame_equal(actual, expected)

  def test_replace(self):
    # Verify a replace() doctest case that doesn't quite work in Beam as it
    # uses the default method='pad'.
    df = pd.DataFrame({'A': ['bat', 'foo', 'bait'], 'B': ['abc', 'bar', 'xyz']})

    self._run_test(
        lambda df: df.replace(
            regex={
                r'^ba.$': 'new', 'foo': 'xyz'
            }, method=None),
        df)

  def test_sample_columns(self):
    df = pd.DataFrame({
        'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],
        'style': ['cup', 'cup', 'cup', 'pack', 'pack'],
        'rating': [4, 4, 3.5, 15, 5]
    })

    self._run_test(lambda df: df.sample(axis=1, n=2, random_state=1), df)
    self._run_error_test(lambda df: df.sample(axis=1, n=10, random_state=2), df)
    self._run_test(
        lambda df: df.sample(axis=1, n=10, random_state=3, replace=True), df)
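  # Note on the sampling tests above: the frame has only three columns, so
  # requesting n=10 columns without replacement is an error (hence the
  # _run_error_test), while replace=True makes the same request valid.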
  def test_cat(self):
    # Replicate the doctests from CategoricalAccessor.
    # These tests don't translate into pandas_doctests_test.py because it
    # tries to use astype("category") in Beam, which makes a non-deferred
    # column type.
    s = pd.Series(list("abbccc")).astype("category")

    self._run_test(lambda s: s.cat.rename_categories(list("cba")), s)
    self._run_test(lambda s: s.cat.reorder_categories(list("cba")), s)
    self._run_test(lambda s: s.cat.add_categories(["d", "e"]), s)
    self._run_test(lambda s: s.cat.remove_categories(["a", "c"]), s)
    self._run_test(lambda s: s.cat.set_categories(list("abcde")), s)
    self._run_test(lambda s: s.cat.as_ordered(), s)
    self._run_test(lambda s: s.cat.as_unordered(), s)
    self._run_test(lambda s: s.cat.codes, s)

  @parameterized.expand(frames.ELEMENTWISE_DATETIME_PROPERTIES)
  def test_dt_property(self, prop_name):
    # Generate a series with a lot of unique timestamps
    s = pd.Series(
        pd.date_range('1/1/2000', periods=100, freq='m') +
        pd.timedelta_range(start='0 days', end='70 days', periods=100))

    self._run_test(lambda s: getattr(s.dt, prop_name), s)

  @parameterized.expand([
      ('month_name', {}),
      ('day_name', {}),
      ('normalize', {}),
      (
          'strftime',
          {
              'date_format': '%B %d, %Y, %r'
          },
      ),
      ('tz_convert', {
          'tz': 'Europe/Berlin'
      }),
  ])
  def test_dt_method(self, op, kwargs):
    # Generate a series with a lot of unique timestamps
    s = pd.Series(
        pd.date_range(
            '1/1/2000', periods=100, freq='m', tz='America/Los_Angeles') +
        pd.timedelta_range(start='0 days', end='70 days', periods=100))

    self._run_test(lambda s: getattr(s.dt, op)(**kwargs), s)

  def test_dt_tz_localize_ambiguous_series(self):
    # This replicates a dt.tz_localize doctest:
    #   s.tz_localize('CET', ambiguous=np.array([True, True, False]))
    # But using a DeferredSeries instead of a np array.
    s = pd.to_datetime(
        pd.Series([
            '2018-10-28 01:20:00', '2018-10-28 02:36:00', '2018-10-28 03:46:00'
        ]))
    ambiguous = pd.Series([True, True, False], index=s.index)

    self._run_test(
        lambda s,
        ambiguous: s.dt.tz_localize('CET', ambiguous=ambiguous),
        s,
        ambiguous)

  def test_dt_tz_localize_nonexistent(self):
    # This replicates dt.tz_localize doctests that exercise `nonexistent`.
    # However they specify ambiguous='NaT' because the default,
    # ambiguous='infer', is not supported.
    s = pd.to_datetime(
        pd.Series(['2015-03-29 02:30:00', '2015-03-29 03:30:00']))

    self._run_test(
        lambda s: s.dt.tz_localize(
            'Europe/Warsaw', ambiguous='NaT', nonexistent='shift_forward'),
        s)
    self._run_test(
        lambda s: s.dt.tz_localize(
            'Europe/Warsaw', ambiguous='NaT', nonexistent='shift_backward'),
        s)
    self._run_test(
        lambda s: s.dt.tz_localize(
            'Europe/Warsaw', ambiguous='NaT', nonexistent=pd.Timedelta('1H')),
        s)


# pandas doesn't support kurtosis on GroupBys:
# https://github.com/pandas-dev/pandas/issues/40139
ALL_GROUPING_AGGREGATIONS = sorted(
    set(frames.ALL_AGGREGATIONS) - set(('kurt', 'kurtosis')))


class GroupByTest(_AbstractFrameTest):
  """Tests for DataFrame/Series GroupBy operations."""
  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_groupby_agg(self, agg_type):
    if agg_type == 'describe' and PD_VERSION < (1, 2):
      self.skipTest(
          "BEAM-12366: proxy generation of DataFrameGroupBy.describe "
          "fails in pandas < 1.2")
    self._run_test(
        lambda df: df.groupby('group').agg(agg_type),
        GROUPBY_DF,
        check_proxy=False)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_groupby_with_filter(self, agg_type):
    if agg_type == 'describe' and PD_VERSION < (1, 2):
      self.skipTest(
          "BEAM-12366: proxy generation of DataFrameGroupBy.describe "
          "fails in pandas < 1.2")
    self._run_test(
        lambda df: getattr(df[df.foo > 30].groupby('group'), agg_type)(),
        GROUPBY_DF,
        check_proxy=False)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_groupby(self, agg_type):
    if agg_type == 'describe' and PD_VERSION < (1, 2):
      self.skipTest(
          "BEAM-12366: proxy generation of DataFrameGroupBy.describe "
          "fails in pandas < 1.2")
    self._run_test(
        lambda df: getattr(df.groupby('group'), agg_type)(),
        GROUPBY_DF,
        check_proxy=False)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_groupby_series(self, agg_type):
    if agg_type == 'describe' and PD_VERSION < (1, 2):
      self.skipTest(
          "BEAM-12366: proxy generation of DataFrameGroupBy.describe "
          "fails in pandas < 1.2")
    self._run_test(
        lambda df: getattr(df[df.foo > 40].groupby(df.group), agg_type)(),
        GROUPBY_DF,
        check_proxy=False)

  def test_groupby_user_guide(self):
    # Example from https://pandas.pydata.org/docs/user_guide/groupby.html
    arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
              ['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
    index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
    df = pd.DataFrame({
        'A': [1, 1, 1, 1, 2, 2, 3, 3], 'B': np.arange(8)
    },
                      index=index)

    self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_groupby_project_series(self, agg_type):
    df = GROUPBY_DF
    if agg_type == 'describe':
      self.skipTest(
          "BEAM-12366: proxy generation of SeriesGroupBy.describe "
          "fails")
    if agg_type in ('corr', 'cov'):
      self.skipTest(
          "BEAM-12367: SeriesGroupBy.{corr, cov} do not raise the "
          "expected error.")

    self._run_test(lambda df: getattr(df.groupby('group').foo, agg_type)(), df)
    self._run_test(lambda df: getattr(df.groupby('group').bar, agg_type)(), df)
    self._run_test(
        lambda df: getattr(df.groupby('group')['foo'], agg_type)(), df)
    self._run_test(
        lambda df: getattr(df.groupby('group')['bar'], agg_type)(), df)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_groupby_project_dataframe(self, agg_type):
    if agg_type == 'describe' and PD_VERSION < (1, 2):
      self.skipTest(
          "BEAM-12366: proxy generation of DataFrameGroupBy.describe "
          "fails in pandas < 1.2")
    self._run_test(
        lambda df: getattr(df.groupby('group')[['bar', 'baz']], agg_type)(),
        GROUPBY_DF,
        check_proxy=False)

  def test_groupby_errors_bad_projection(self):
    df = GROUPBY_DF

    # non-existent projection column
    self._run_error_test(
        lambda df: df.groupby('group')[['bar', 'baz']].bar.median(), df)
    self._run_error_test(lambda df: df.groupby('group')[['bad']].median(), df)
    self._run_error_test(lambda df: df.groupby('group').bad.median(), df)

    self._run_error_test(
        lambda df: df.groupby('group')[['bar', 'baz']].bar.sum(), df)
    self._run_error_test(lambda df: df.groupby('group')[['bat']].sum(), df)
    self._run_error_test(lambda df: df.groupby('group').bat.sum(), df)

  def test_groupby_errors_non_existent_label(self):
    df = GROUPBY_DF

    # non-existent grouping label
    self._run_error_test(
        lambda df: df.groupby(['really_bad', 'foo', 'bad']).foo.sum(), df)
    self._run_error_test(lambda df: df.groupby('bad').foo.sum(), df)

  def test_groupby_callable(self):
    df = GROUPBY_DF

    self._run_test(lambda df: df.groupby(lambda x: x % 2).foo.sum(), df)
    self._run_test(lambda df: df.groupby(lambda x: x % 5).median(), df)

  def test_groupby_apply(self):
    df = GROUPBY_DF

    def median_sum_fn(x):
      return (x.foo + x.bar).median()

    # Note this is the same as DataFrameGroupBy.describe. Using it here is
    # just a convenient way to test apply() with a user fn that returns a
    # Series.
    describe = lambda df: df.describe()

    self._run_test(lambda df: df.groupby('group').foo.apply(describe), df)
    self._run_test(
        lambda df: df.groupby('group')[['foo', 'bar']].apply(describe), df)
    self._run_test(lambda df: df.groupby('group').apply(median_sum_fn), df)
    self._run_test(
        lambda df: df.set_index('group').foo.groupby(level=0).apply(describe),
        df)
    self._run_test(lambda df: df.groupby(level=0).apply(median_sum_fn), df)
    self._run_test(lambda df: df.groupby(lambda x: x % 3).apply(describe), df)
    self._run_test(
        lambda df: df.bar.groupby(lambda x: x % 3).apply(describe), df)
    self._run_test(
        lambda df: df.set_index(['str', 'group', 'bool']).groupby(
            level='group').apply(median_sum_fn),
        df)

  def test_groupby_apply_preserves_column_order(self):
    df = GROUPBY_DF

    self._run_test(
        lambda df: df[['foo', 'group', 'bar']].groupby('group').apply(
            lambda x: x),
        df)

  def test_groupby_transform(self):
    df = pd.DataFrame({
        "Date": [
            "2015-05-08",
            "2015-05-07",
            "2015-05-06",
            "2015-05-05",
            "2015-05-08",
            "2015-05-07",
            "2015-05-06",
            "2015-05-05"
        ],
        "Data": [5, 8, 6, 1, 50, 100, 60, 120],
    })

    self._run_test(lambda df: df.groupby('Date')['Data'].transform(np.sum), df)
    self._run_test(
        lambda df: df.groupby('Date')['Data'].transform(
            lambda x: (x - x.mean()) / x.std()),
        df)
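  # Note on the test above: transform() returns a result aligned to the
  # original index, e.g. for the '2015-05-08' group (values 5 and 50) the
  # np.sum transform emits 55 for both of those rows, and the lambda computes
  # a per-group z-score, (x - x.mean()) / x.std().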
  def test_groupby_apply_modified_index(self):
    df = GROUPBY_DF

    # If the apply fn modifies the index then the output will include the
    # grouped index.
    self._run_test(
        lambda df: df.groupby('group').apply(
            lambda x: x[x.foo > x.foo.median()]),
        df)

  @unittest.skip('BEAM-11710')
  def test_groupby_aggregate_grouped_column(self):
    df = pd.DataFrame({
        'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
        'foo': [None if i % 11 == 0 else i for i in range(100)],
        'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
        'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
    })

    self._run_test(lambda df: df.groupby('group').group.count(), df)
    self._run_test(lambda df: df.groupby('group')[['group', 'bar']].count(), df)
    self._run_test(
        lambda df: df.groupby('group')[['group', 'bar']].apply(
            lambda x: x.describe()),
        df)

  @parameterized.expand((x, ) for x in [
      0,
      [1],
      3,
      [0, 3],
      [2, 1],
      ['foo', 0],
      [1, 'str'],
      [3, 0, 2, 1],
  ])
  def test_groupby_level_agg(self, level):
    df = GROUPBY_DF.set_index(['group', 'foo', 'bar', 'str'], drop=False)
    self._run_test(lambda df: df.groupby(level=level).bar.max(), df)
    self._run_test(
        lambda df: df.groupby(level=level).sum(numeric_only=True), df)
    self._run_test(
        lambda df: df.groupby(level=level).apply(
            lambda x: (x.foo + x.bar).median()),
        df)

  @unittest.skipIf(PD_VERSION < (1, 1), "dropna added in pandas 1.1.0")
  def test_groupby_count_na(self):
    # Verify we can do a groupby.count() that doesn't drop NaN values
    self._run_test(
        lambda df: df.groupby('foo', dropna=True).bar.count(), GROUPBY_DF)
    self._run_test(
        lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)

  def test_groupby_sum_min_count(self):
    df = pd.DataFrame({
        'good': [1, 2, 3, np.nan],
        'bad': [np.nan, np.nan, np.nan, 4],
        'group': ['a', 'b', 'a', 'b']
    })

    self._run_test(lambda df: df.groupby('group').sum(min_count=2), df)

  def test_groupby_dtypes(self):
    self._run_test(
        lambda df: df.groupby('group').dtypes, GROUPBY_DF, check_proxy=False)
    self._run_test(
        lambda df: df.groupby(level=0).dtypes, GROUPBY_DF, check_proxy=False)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_dataframe_groupby_series(self, agg_type):
    if agg_type == 'describe' and PD_VERSION < (1, 2):
      self.skipTest(
          "BEAM-12366: proxy generation of DataFrameGroupBy.describe "
          "fails in pandas < 1.2")
    self._run_test(
        lambda df: df[df.foo > 40].groupby(df.group).agg(agg_type),
        GROUPBY_DF,
        check_proxy=False)
    self._run_test(
        lambda df: df[df.foo > 40].groupby(df.foo % 3).agg(agg_type),
        GROUPBY_DF,
        check_proxy=False)

  @parameterized.expand(ALL_GROUPING_AGGREGATIONS)
  def test_series_groupby_series(self, agg_type):
    if agg_type == 'describe':
      self.skipTest(
          "BEAM-12366: proxy generation of SeriesGroupBy.describe "
          "fails")
    if agg_type in ('corr', 'cov'):
      self.skipTest(
          "BEAM-12367: SeriesGroupBy.{corr, cov} do not raise the "
          "expected error.")
    self._run_test(
        lambda df: df[df.foo < 40].bar.groupby(df.group).agg(agg_type),
        GROUPBY_DF)
    self._run_test(
        lambda df: df[df.foo < 40].bar.groupby(df.foo % 3).agg(agg_type),
        GROUPBY_DF)
  def test_groupby_series_apply(self):
    df = GROUPBY_DF

    def median_sum_fn(x):
      return (x.foo + x.bar).median()

    # Note this is the same as DataFrameGroupBy.describe. Using it here is
    # just a convenient way to test apply() with a user fn that returns a
    # Series.
    describe = lambda df: df.describe()

    self._run_test(lambda df: df.groupby(df.group).foo.apply(describe), df)
    self._run_test(
        lambda df: df.groupby(df.group)[['foo', 'bar']].apply(describe), df)
    self._run_test(lambda df: df.groupby(df.group).apply(median_sum_fn), df)

  def test_groupby_multiindex_keep_nans(self):
    # Due to https://github.com/pandas-dev/pandas/issues/36470
    # groupby(dropna=False) doesn't work with multiple columns.
    with self.assertRaisesRegex(NotImplementedError, "BEAM-12495"):
      self._run_test(
          lambda df: df.groupby(['foo', 'bar'], dropna=False).sum(), GROUPBY_DF)


class AggregationTest(_AbstractFrameTest):
  """Tests for global aggregation methods on DataFrame/Series."""

  # corr, cov on Series require an `other` argument
  @parameterized.expand(
      sorted(set(frames.ALL_AGGREGATIONS) - set(['corr', 'cov'])))
  def test_series_agg(self, agg_method):
    s = pd.Series(list(range(16)))

    nonparallel = agg_method in (
        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')

    # TODO(BEAM-12379): max and min produce the wrong proxy
    check_proxy = agg_method not in ('max', 'min')

    self._run_test(
        lambda s: s.agg(agg_method),
        s,
        nonparallel=nonparallel,
        check_proxy=check_proxy)

  # corr, cov on Series require an `other` argument
  # Series.size is a property
  @parameterized.expand(
      sorted(set(frames.ALL_AGGREGATIONS) - set(['corr', 'cov', 'size'])))
  def test_series_agg_method(self, agg_method):
    s = pd.Series(list(range(16)))

    nonparallel = agg_method in (
        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')

    # TODO(BEAM-12379): max and min produce the wrong proxy
    check_proxy = agg_method not in ('max', 'min')

    self._run_test(
        lambda s: getattr(s, agg_method)(),
        s,
        nonparallel=nonparallel,
        check_proxy=check_proxy)

  @parameterized.expand(frames.ALL_AGGREGATIONS)
  def test_dataframe_agg(self, agg_method):
    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})

    nonparallel = agg_method in (
        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')

    # TODO(BEAM-12379): max and min produce the wrong proxy
    check_proxy = agg_method not in ('max', 'min')

    self._run_test(
        lambda df: df.agg(agg_method),
        df,
        nonparallel=nonparallel,
        check_proxy=check_proxy)

  # DataFrame.size is a property
  @parameterized.expand(sorted(set(frames.ALL_AGGREGATIONS) - set(['size'])))
  def test_dataframe_agg_method(self, agg_method):
    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})

    nonparallel = agg_method in (
        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')

    # TODO(BEAM-12379): max and min produce the wrong proxy
    check_proxy = agg_method not in ('max', 'min')

    self._run_test(
        lambda df: getattr(df, agg_method)(),
        df,
        nonparallel=nonparallel,
        check_proxy=check_proxy)

  def test_series_agg_modes(self):
    s = pd.Series(list(range(16)))

    self._run_test(lambda s: s.agg('sum'), s)
    self._run_test(lambda s: s.agg(['sum']), s)
    self._run_test(lambda s: s.agg(['sum', 'mean']), s, nonparallel=True)
    self._run_test(lambda s: s.agg(['mean']), s, nonparallel=True)
    self._run_test(lambda s: s.agg('mean'), s, nonparallel=True)
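  # For reference, the shape difference the tests above cover (standard
  # pandas): a single agg name returns a scalar, while a list of names
  # returns a Series keyed by function name:
  #
  #   >>> s = pd.Series(range(16))
  #   >>> s.agg('sum')
  #   120
  #   >>> s.agg(['sum', 'mean'])
  #   sum     120.0
  #   mean      7.5
  #   dtype: float64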
  def test_dataframe_agg_modes(self):
    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})

    self._run_test(lambda df: df.agg('sum'), df)
    self._run_test(lambda df: df.agg(['sum', 'mean']), df, nonparallel=True)
    self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
    self._run_test(
        lambda df: df.agg({
            'A': 'sum', 'B': 'mean'
        }), df, nonparallel=True)
    self._run_test(
        lambda df: df.agg({'A': ['sum', 'mean']}), df, nonparallel=True)
    self._run_test(
        lambda df: df.agg({
            'A': ['sum', 'mean'], 'B': 'min'
        }),
        df,
        nonparallel=True)

  def test_series_agg_level(self):
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.max(level=0), GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.median(level=0),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['foo', 'group']).bar.count(level=1),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.max(level=1), GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.max(level='foo'),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
        GROUPBY_DF)
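  # Note on the level= tests: in the pandas versions targeted by this file,
  # s.count(level=0) is shorthand for s.groupby(level=0).count(), so these
  # exercise the same grouped-aggregation machinery as the GroupBy tests.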
  def test_dataframe_agg_level(self):
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).max(
            level=0, numeric_only=False),
        GROUPBY_DF,
        check_proxy=False)
    # The pandas implementation doesn't respect the numeric_only argument here
    # (https://github.com/pandas-dev/pandas/issues/40788); it always acts as
    # if numeric_only=True. Our implementation respects it, so we need to make
    # it explicit.
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).sum(
            level=0, numeric_only=True),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo'])[['bar']].count(level=1),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).count(level=1), GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).max(
            level=1, numeric_only=False),
        GROUPBY_DF,
        check_proxy=False)
    # sum with str columns is order-sensitive
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).sum(
            level=1, numeric_only=True),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).median(
            level=0, numeric_only=True),
        GROUPBY_DF)
    self._run_test(
        lambda df: df.drop('str', axis=1).set_index(['foo', 'group']).median(
            level=1, numeric_only=True),
        GROUPBY_DF)

  def test_series_agg_multifunc_level(self):
    # level= is ignored for multiple agg fns
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).bar.agg(['min', 'max'],
                                                          level=0),
        GROUPBY_DF)

  def test_dataframe_agg_multifunc_level(self):
    # level= is ignored for multiple agg fns
    self._run_test(
        lambda df: df.set_index(['group', 'foo']).agg(['min', 'max'], level=0),
        GROUPBY_DF,
        check_proxy=False)
These are the only ones that actually work for1359    # the string inputs.1360    self._run_test(1361        lambda df: df.max(numeric_only=numeric_only),1362        GROUPBY_DF,1363        check_proxy=False)1364    self._run_test(1365        lambda df: df.min(numeric_only=numeric_only),1366        GROUPBY_DF,1367        check_proxy=False)1368  @unittest.skip(1369      "pandas implementation doesn't respect numeric_only= with "1370      "level= (https://github.com/pandas-dev/pandas/issues/40788)")1371  def test_dataframe_agg_level_numeric_only(self):1372    self._run_test(1373        lambda df: df.set_index('foo').sum(level=0, numeric_only=True),1374        GROUPBY_DF)1375    self._run_test(1376        lambda df: df.set_index('foo').max(level=0, numeric_only=True),1377        GROUPBY_DF)1378    self._run_test(1379        lambda df: df.set_index('foo').mean(level=0, numeric_only=True),1380        GROUPBY_DF)1381    self._run_test(1382        lambda df: df.set_index('foo').median(level=0, numeric_only=True),1383        GROUPBY_DF)1384  def test_dataframe_agg_bool_only(self):1385    df = pd.DataFrame({1386        'all': [True for i in range(10)],1387        'any': [i % 3 == 0 for i in range(10)],1388        'int': range(10)1389    })1390    self._run_test(lambda df: df.all(), df)1391    self._run_test(lambda df: df.any(), df)1392    self._run_test(lambda df: df.all(bool_only=True), df)1393    self._run_test(lambda df: df.any(bool_only=True), df)1394  @unittest.skip(1395      "pandas doesn't implement bool_only= with level= "1396      "(https://github.com/pandas-dev/pandas/blob/"1397      "v1.2.3/pandas/core/generic.py#L10573)")1398  def test_dataframe_agg_level_bool_only(self):1399    df = pd.DataFrame({1400        'all': [True for i in range(10)],1401        'any': [i % 3 == 0 for i in range(10)],1402        'int': range(10)1403    })1404    self._run_test(lambda df: df.set_index('int', drop=False).all(level=0), df)1405    self._run_test(lambda df: df.set_index('int', drop=False).any(level=0), df)1406    self._run_test(1407        lambda df: df.set_index('int', drop=False).all(level=0, bool_only=True),1408        df)1409    self._run_test(1410        lambda df: df.set_index('int', drop=False).any(level=0, bool_only=True),1411        df)1412  def test_series_agg_np_size(self):1413    self._run_test(1414        lambda df: df.set_index(['group', 'foo']).agg(np.size),1415        GROUPBY_DF,1416        check_proxy=False)1417  def test_df_agg_invalid_kwarg_raises(self):1418    self._run_error_test(lambda df: df.agg('mean', bool_only=True), GROUPBY_DF)1419    self._run_error_test(1420        lambda df: df.agg('any', numeric_only=True), GROUPBY_DF)1421    self._run_error_test(1422        lambda df: df.agg('median', min_count=3, numeric_only=True), GROUPBY_DF)1423  def test_series_agg_method_invalid_kwarg_raises(self):1424    self._run_error_test(lambda df: df.foo.median(min_count=3), GROUPBY_DF)1425    self._run_error_test(1426        lambda df: df.foo.agg('median', min_count=3), GROUPBY_DF)1427  @unittest.skipIf(1428      PD_VERSION < (1, 3),1429      (1430          "DataFrame.agg raises a different exception from the "1431          "aggregation methods. 
Fixed in "1432          "https://github.com/pandas-dev/pandas/pull/40543."))1433  def test_df_agg_method_invalid_kwarg_raises(self):1434    self._run_error_test(lambda df: df.mean(bool_only=True), GROUPBY_DF)1435    self._run_error_test(lambda df: df.any(numeric_only=True), GROUPBY_DF)1436    self._run_error_test(1437        lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)1438  def test_agg_min_count(self):1439    df = pd.DataFrame({1440        'good': [1, 2, 3, np.nan],1441        'bad': [np.nan, np.nan, np.nan, 4],1442    },1443                      index=['a', 'b', 'a', 'b'])1444    self._run_test(lambda df: df.sum(level=0, min_count=2), df)1445    self._run_test(lambda df: df.sum(min_count=3), df, nonparallel=True)1446    self._run_test(lambda df: df.sum(min_count=1), df, nonparallel=True)1447    self._run_test(lambda df: df.good.sum(min_count=2), df, nonparallel=True)1448    self._run_test(lambda df: df.bad.sum(min_count=2), df, nonparallel=True)1449  def test_series_agg_std(self):1450    s = pd.Series(range(10))1451    self._run_test(lambda s: s.agg('std'), s)1452    self._run_test(lambda s: s.agg('var'), s)1453    self._run_test(lambda s: s.agg(['std', 'sum']), s)1454    self._run_test(lambda s: s.agg(['var']), s)1455  def test_std_all_na(self):1456    s = pd.Series([np.nan] * 10)1457    self._run_test(lambda s: s.agg('std'), s)1458    self._run_test(lambda s: s.std(), s)1459  def test_std_mostly_na_with_ddof(self):1460    df = pd.DataFrame({1461        'one': [i if i % 8 == 0 else np.nan for i in range(8)],1462        'two': [i if i % 4 == 0 else np.nan for i in range(8)],1463        'three': [i if i % 2 == 0 else np.nan for i in range(8)],1464    },1465                      index=pd.MultiIndex.from_arrays(1466                          [list(range(8)), list(reversed(range(8)))],1467                          names=['forward', None]))1468    self._run_test(lambda df: df.std(), df)  # ddof=11469    self._run_test(lambda df: df.std(ddof=0), df)1470    self._run_test(lambda df: df.std(ddof=2), df)1471    self._run_test(lambda df: df.std(ddof=3), df)1472    self._run_test(lambda df: df.std(ddof=4), df)1473  def test_dataframe_std(self):1474    self._run_test(lambda df: df.std(numeric_only=True), GROUPBY_DF)1475    self._run_test(lambda df: df.var(numeric_only=True), GROUPBY_DF)1476  def test_dataframe_mode(self):1477    self._run_test(1478        lambda df: df.mode(), GROUPBY_DF, nonparallel=True, check_proxy=False)1479    self._run_test(1480        lambda df: df.mode(numeric_only=True),1481        GROUPBY_DF,1482        nonparallel=True,1483        check_proxy=False)1484    self._run_test(1485        lambda df: df.mode(dropna=True, numeric_only=True),1486        GROUPBY_DF,1487        nonparallel=True,1488        check_proxy=False)1489  def test_series_mode(self):1490    self._run_test(lambda df: df.foo.mode(), GROUPBY_DF, nonparallel=True)1491    self._run_test(1492        lambda df: df.baz.mode(dropna=True), GROUPBY_DF, nonparallel=True)1493class BeamSpecificTest(unittest.TestCase):1494  """Tests for functionality that's specific to the Beam DataFrame API.1495  These features don't exist in pandas so we must verify them independently."""1496  def assert_frame_data_equivalent(self, actual, expected):1497    """Verify that actual is the same as expected, ignoring the index and order1498    of the data."""1499    def sort_and_drop_index(df):1500      if isinstance(df, pd.Series):1501        df = df.sort_values()1502      elif isinstance(df, pd.DataFrame):1503        df = 
df.sort_values(by=list(df.columns))1504      return df.reset_index(drop=True)1505    actual = sort_and_drop_index(actual)1506    expected = sort_and_drop_index(expected)1507    if isinstance(expected, pd.Series):1508      pd.testing.assert_series_equal(actual, expected)1509    elif isinstance(expected, pd.DataFrame):1510      pd.testing.assert_frame_equal(actual, expected)1511  def _evaluate(self, func, *args, distributed=True):1512    deferred_args = [1513        frame_base.DeferredFrame.wrap(1514            expressions.ConstantExpression(arg, arg[0:0])) for arg in args1515    ]1516    session_type = (1517        expressions.PartitioningSession if distributed else expressions.Session)1518    return session_type({}).evaluate(func(*deferred_args)._expr)1519  def test_drop_duplicates_keep_any(self):1520    df = pd.DataFrame({1521        'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],1522        'style': ['cup', 'cup', 'cup', 'pack', 'pack'],1523        'rating': [4, 4, 3.5, 15, 5]1524    })1525    result = self._evaluate(lambda df: df.drop_duplicates(keep='any'), df)1526    # Verify that the result is the same as conventional drop_duplicates1527    self.assert_frame_data_equivalent(result, df.drop_duplicates())1528  def test_drop_duplicates_keep_any_subset(self):1529    df = pd.DataFrame({1530        'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],1531        'style': ['cup', 'cup', 'cup', 'pack', 'pack'],1532        'rating': [4, 4, 3.5, 15, 5]1533    })1534    result = self._evaluate(1535        lambda df: df.drop_duplicates(keep='any', subset=['brand']), df)1536    self.assertTrue(result.brand.unique)1537    self.assert_frame_data_equivalent(1538        result.brand, df.drop_duplicates(subset=['brand']).brand)1539  def test_series_drop_duplicates_keep_any(self):1540    df = pd.DataFrame({1541        'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],1542        'style': ['cup', 'cup', 'cup', 'pack', 'pack'],1543        'rating': [4, 4, 3.5, 15, 5]1544    })1545    result = self._evaluate(lambda df: df.brand.drop_duplicates(keep='any'), df)1546    self.assert_frame_data_equivalent(result, df.brand.drop_duplicates())1547  def test_duplicated_keep_any(self):1548    df = pd.DataFrame({1549        'brand': ['Yum Yum', 'Yum Yum', 'Indomie', 'Indomie', 'Indomie'],1550        'style': ['cup', 'cup', 'cup', 'pack', 'pack'],1551        'rating': [4, 4, 3.5, 15, 5]1552    })1553    result = self._evaluate(lambda df: df.duplicated(keep='any'), df)1554    # Verify that the result is the same as conventional duplicated1555    self.assert_frame_data_equivalent(result, df.duplicated())1556  def test_nsmallest_any(self):1557    df = pd.DataFrame({1558        'population': [1559            59000000,1560            65000000,1561            434000,1562            434000,1563            434000,1564            337000,1565            337000,1566            11300,1567            113001568        ],1569        'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],1570        'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]1571    },1572                      index=[1573                          "Italy",1574                          "France",1575                          "Malta",1576                          "Maldives",1577                          "Brunei",1578                          "Iceland",1579                          "Nauru",1580                          "Tuvalu",1581                          "Anguilla"1582                      
])1583    result = self._evaluate(1584        lambda df: df.population.nsmallest(3, keep='any'), df)1585    # keep='any' should produce the same result as keep='first',1586    # but not necessarily with the same index1587    self.assert_frame_data_equivalent(result, df.population.nsmallest(3))1588  def test_nlargest_any(self):1589    df = pd.DataFrame({1590        'population': [1591            59000000,1592            65000000,1593            434000,1594            434000,1595            434000,1596            337000,1597            337000,1598            11300,1599            113001600        ],1601        'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],1602        'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]1603    },1604                      index=[1605                          "Italy",1606                          "France",1607                          "Malta",1608                          "Maldives",1609                          "Brunei",1610                          "Iceland",1611                          "Nauru",1612                          "Tuvalu",1613                          "Anguilla"1614                      ])1615    result = self._evaluate(1616        lambda df: df.population.nlargest(3, keep='any'), df)1617    # keep='any' should produce the same result as keep='first',1618    # but not necessarily with the same index1619    self.assert_frame_data_equivalent(result, df.population.nlargest(3))1620  def test_sample(self):1621    df = pd.DataFrame({1622        'population': [1623            59000000,1624            65000000,1625            434000,1626            434000,1627            434000,1628            337000,1629            337000,1630            11300,1631            113001632        ],1633        'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],1634        'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]1635    },1636                      index=[1637                          "Italy",1638                          "France",1639                          "Malta",1640                          "Maldives",1641                          "Brunei",1642                          "Iceland",1643                          "Nauru",1644                          "Tuvalu",1645                          "Anguilla"1646                      ])1647    result = self._evaluate(lambda df: df.sample(n=3), df)1648    self.assertEqual(len(result), 3)1649    series_result = self._evaluate(lambda df: df.GDP.sample(n=3), df)1650    self.assertEqual(len(series_result), 3)1651    self.assertEqual(series_result.name, "GDP")1652  def test_sample_with_weights(self):1653    df = pd.DataFrame({1654        'population': [1655            59000000,1656            65000000,1657            434000,1658            434000,1659            434000,1660            337000,1661            337000,1662            11300,1663            113001664        ],1665        'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],1666        'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]1667    },1668                      index=[1669                          "Italy",1670                          "France",1671                          "Malta",1672                          "Maldives",1673                          "Brunei",1674                          "Iceland",1675                          "Nauru",1676                          "Tuvalu",1677                          "Anguilla"1678                      ])1679    weights = 
pd.Series([0, 0, 0, 0, 0, 0, 0, 1, 1], index=df.index)1680    result = self._evaluate(1681        lambda df, weights: df.sample(n=2, weights=weights), df, weights)1682    self.assertEqual(len(result), 2)1683    self.assertEqual(set(result.index), set(["Tuvalu", "Anguilla"]))1684    series_result = self._evaluate(1685        lambda df, weights: df.GDP.sample(n=2, weights=weights), df, weights)1686    self.assertEqual(len(series_result), 2)1687    self.assertEqual(series_result.name, "GDP")1688    self.assertEqual(set(series_result.index), set(["Tuvalu", "Anguilla"]))1689  def test_sample_with_missing_weights(self):1690    df = pd.DataFrame({1691        'population': [1692            59000000,1693            65000000,1694            434000,1695            434000,1696            434000,1697            337000,1698            337000,1699            11300,1700            113001701        ],1702        'GDP': [1937894, 2583560, 12011, 4520, 12128, 17036, 182, 38, 311],1703        'alpha-2': ["IT", "FR", "MT", "MV", "BN", "IS", "NR", "TV", "AI"]1704    },1705                      index=[1706                          "Italy",1707                          "France",1708                          "Malta",1709                          "Maldives",1710                          "Brunei",1711                          "Iceland",1712                          "Nauru",1713                          "Tuvalu",1714                          "Anguilla"1715                      ])1716    # Missing weights are treated as 01717    weights = pd.Series([.1, .01, np.nan, 0],1718                        index=["Nauru", "Iceland", "Anguilla", "Italy"])1719    result = self._evaluate(1720        lambda df, weights: df.sample(n=2, weights=weights), df, weights)1721    self.assertEqual(len(result), 2)1722    self.assertEqual(set(result.index), set(["Nauru", "Iceland"]))1723    series_result = self._evaluate(1724        lambda df, weights: df.GDP.sample(n=2, weights=weights), df, weights)1725    self.assertEqual(len(series_result), 2)1726    self.assertEqual(series_result.name, "GDP")1727    self.assertEqual(set(series_result.index), set(["Nauru", "Iceland"]))1728  def test_sample_with_weights_distribution(self):1729    target_prob = 0.251730    num_samples = 1001731    num_targets = 2001732    num_other_elements = 100001733    target_weight = target_prob / num_targets1734    other_weight = (1 - target_prob) / num_other_elements1735    self.assertTrue(target_weight > other_weight * 10, "weights too close")1736    result = self._evaluate(1737        lambda s,1738        weights: s.sample(n=num_samples, weights=weights).sum(),1739        # The first elements are 1, the rest are all 0.  
This means that when1740        # we sum all the sampled elements (above), the result should be the1741        # number of times the first elements (aka targets) were sampled.1742        pd.Series([1] * num_targets + [0] * num_other_elements),1743        pd.Series([target_weight] * num_targets +1744                  [other_weight] * num_other_elements))1745    # With the above constants, the probability of violating this invariant1746    # (as computed using the Bernoulli distribution) is about 0.0012%.1747    expected = num_samples * target_prob1748    self.assertTrue(expected / 3 < result < expected * 2, (expected, result))1749class AllowNonParallelTest(unittest.TestCase):1750  def _use_non_parallel_operation(self):1751    _ = frame_base.DeferredFrame.wrap(1752        expressions.PlaceholderExpression(pd.Series([1, 2, 3]))).replace(1753            'a', 'b', limit=1)1754  def test_disallow_non_parallel(self):1755    with self.assertRaises(expressions.NonParallelOperation):1756      self._use_non_parallel_operation()1757  def test_allow_non_parallel_in_context(self):1758    with beam.dataframe.allow_non_parallel_operations():1759      self._use_non_parallel_operation()1760  def test_allow_non_parallel_nesting(self):1761    # disallowed1762    with beam.dataframe.allow_non_parallel_operations():1763      # allowed1764      self._use_non_parallel_operation()1765      with beam.dataframe.allow_non_parallel_operations(False):1766        # disallowed again1767        with self.assertRaises(expressions.NonParallelOperation):1768          self._use_non_parallel_operation()1769      # allowed1770      self._use_non_parallel_operation()1771    # disallowed1772    with self.assertRaises(expressions.NonParallelOperation):1773      self._use_non_parallel_operation()1774class ConstructionTimeTest(unittest.TestCase):1775  """Tests for operations that can be executed eagerly."""1776  DF = pd.DataFrame({1777      'str_col': ['foo', 'bar'] * 3,1778      'int_col': [1, 2] * 3,1779      'flt_col': [1.1, 2.2] * 3,1780      'cat_col': pd.Series(list('aabbca'), dtype="category"),1781      'datetime_col': pd.Series(1782          pd.date_range(1783              '1/1/2000', periods=6, freq='m', tz='America/Los_Angeles'))1784  })1785  DEFERRED_DF = frame_base.DeferredFrame.wrap(1786      expressions.PlaceholderExpression(DF.iloc[:0]))1787  def _run_test(self, fn):1788    expected = fn(self.DF)1789    actual = fn(self.DEFERRED_DF)1790    if isinstance(expected, pd.Index):1791      pd.testing.assert_index_equal(expected, actual)1792    elif isinstance(expected, pd.Series):1793      pd.testing.assert_series_equal(expected, actual)1794    elif isinstance(expected, pd.DataFrame):1795      pd.testing.assert_frame_equal(expected, actual)1796    else:1797      self.assertEqual(expected, actual)1798  @parameterized.expand(DF.columns)1799  def test_series_name(self, col_name):1800    self._run_test(lambda df: df[col_name].name)1801  @parameterized.expand(DF.columns)1802  def test_series_dtype(self, col_name):1803    self._run_test(lambda df: df[col_name].dtype)1804    self._run_test(lambda df: df[col_name].dtypes)1805  def test_dataframe_columns(self):1806    self._run_test(lambda df: list(df.columns))1807  def test_dataframe_dtypes(self):1808    self._run_test(lambda df: list(df.dtypes))1809  def test_categories(self):1810    self._run_test(lambda df: df.cat_col.cat.categories)1811  def test_categorical_ordered(self):1812    self._run_test(lambda df: df.cat_col.cat.ordered)1813  def test_groupby_ndim(self):1814    
self._run_test(lambda df: df.groupby('int_col').ndim)1815  def test_groupby_project_ndim(self):1816    self._run_test(lambda df: df.groupby('int_col').flt_col.ndim)1817    self._run_test(1818        lambda df: df.groupby('int_col')[['flt_col', 'str_col']].ndim)1819  def test_get_column_default_None(self):1820    # .get just returns default_value=None at construction time if the column1821    # doesn't exist1822    self._run_test(lambda df: df.get('FOO'))1823  def test_datetime_tz(self):1824    self._run_test(lambda df: df.datetime_col.dt.tz)1825class DocstringTest(unittest.TestCase):1826  @parameterized.expand([1827      (frames.DeferredDataFrame, pd.DataFrame),1828      (frames.DeferredSeries, pd.Series),1829      #(frames._DeferredIndex, pd.Index),1830      (frames._DeferredStringMethods, pd.core.strings.StringMethods),1831      (1832          frames._DeferredCategoricalMethods,1833          pd.core.arrays.categorical.CategoricalAccessor),1834      (frames.DeferredGroupBy, pd.core.groupby.generic.DataFrameGroupBy),1835      (frames._DeferredGroupByCols, pd.core.groupby.generic.DataFrameGroupBy),1836      (1837          frames._DeferredDatetimeMethods,1838          pd.core.indexes.accessors.DatetimeProperties),...pegparser_test.py
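The pattern these tests rely on can be reproduced standalone: wrap a pandas object in a DeferredFrame together with a zero-row proxy, build a deferred expression, and evaluate it with a session. A minimal sketch using only the API surface visible in the listing (ConstantExpression, DeferredFrame.wrap, PartitioningSession); the sample data is hypothetical, and this assumes a Beam release where the deferred groupby/sum exercised above is available:

import pandas as pd

from apache_beam.dataframe import expressions, frame_base

# Hypothetical input; any pandas DataFrame works here.
df = pd.DataFrame({'group': ['a', 'a', 'b'], 'value': [1, 2, 3]})

# Wrap the concrete frame; the second argument (df[0:0]) is the zero-row
# proxy carrying the schema, mirroring _evaluate in the tests above.
deferred = frame_base.DeferredFrame.wrap(
    expressions.ConstantExpression(df, df[0:0]))

# Build a deferred expression, then evaluate it with PartitioningSession,
# which simulates distributed execution the way _run_test does.
expr = deferred.groupby('group').value.sum()._expr
result = expressions.PartitioningSession({}).evaluate(expr)
print(result)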
pegparser_test.py
Source:pegparser_test.py
...
import sys
import unittest

from pegparser import *


class PegParserTestCase(unittest.TestCase):

  def _run_test(self, grammar, text, expected,
          strings_are_tokens=False, whitespace_rule=None):
    """Utility for running a parser test and comparing results.

    Fails the test if expected does not match actual.

    Args:
      grammar -- the root rule to be used by the parser.
      text -- the text to parse.
      expected -- the expected abstract syntax tree. None means
        failure is expected.
      strings_are_tokens -- whether strings are treated as tokens.
      whitespace_rule -- the rule used for matching whitespace.
        Default is None, which means that no whitespace is tolerated.
    """
    parser = PegParser(grammar, whitespace_rule,
               strings_are_tokens=strings_are_tokens)
    actual = None
    error = None
    try:
      actual = parser.parse(text)
    except SyntaxError as e:
      error = e
    if actual != expected:
      msg = '''
CONTENT:
%s
EXPECTED:
%s
ACTUAL:
%s
ERROR: %s''' % (text, pprint.pformat(expected), pprint.pformat(actual), error)
      self.fail(msg)

  def test_sequence(self):
    sequence = SEQUENCE('A', 'BB', 'C')
    self._run_test(grammar=sequence, text='ABBC', expected=['A', 'BB', 'C'])
    self._run_test(grammar=sequence, text='BBAC', expected=None)
    # Syntax sugar: a plain list acts as a sequence
    sequence = ['A', 'BB', 'C']
    self._run_test(grammar=sequence, text='ABBC', expected=['A', 'BB', 'C'])
    self._run_test(grammar=sequence, text='BBAC', expected=None)

  def test_regex(self):
    regex = re.compile(r'[A-Za-z]*')
    self._run_test(grammar=regex, text='AaBb', expected='AaBb')
    self._run_test(grammar=regex, text='0AaBb', expected=None)
    self._run_test(grammar=regex, text='Aa0Bb', expected=None)

  def test_function(self):
    def Func():
      return 'ABC'

    self._run_test(grammar=Func, text='ABC', expected=('Func', 'ABC'))
    self._run_test(grammar=Func, text='XYZ', expected=None)

  def test_function_label(self):
    def func():
      return 'ABC'

    def _func():
      return 'ABC'

    self._run_test(grammar=func, text='ABC', expected=('func', 'ABC'))
    self._run_test(grammar=_func, text='ABC', expected='ABC')

  def test_label(self):
    sequence = [TOKEN('def'), LABEL('funcName', re.compile(r'[a-z0-9]*')),
          TOKEN('():')]
    self._run_test(grammar=sequence, text='def f1():',
      whitespace_rule=' ', expected=[('funcName', 'f1')])
    self._run_test(grammar=sequence, text='def f2():',
      whitespace_rule=' ', expected=[('funcName', 'f2')])

  def test_or(self):
    grammar = OR('A', 'B')
    self._run_test(grammar=grammar, text='A', expected='A')
    self._run_test(grammar=grammar, text='B', expected='B')
    self._run_test(grammar=grammar, text='C', expected=None)

  def test_maybe(self):
    seq = ['A', MAYBE('B'), 'C']
    self._run_test(grammar=seq, text='ABC', expected=['A', 'B', 'C'])
    self._run_test(grammar=seq, text='ADC', expected=None)
    self._run_test(grammar=seq, text='AC', expected=['A', 'C'])
    self._run_test(grammar=seq, text='AB', expected=None)

  def test_many(self):
    seq = ['A', MANY('B'), 'C']
    self._run_test(grammar=seq, text='ABC', expected=['A', 'B', 'C'])
    self._run_test(grammar=seq, text='ABBBBC',
      expected=['A', 'B', 'B', 'B', 'B', 'C'])
    self._run_test(grammar=seq, text='AC', expected=None)

  def test_many_with_separator(self):
    letter = OR('A', 'B', 'C')

    def _gram():
      return [letter, MAYBE([TOKEN(','), _gram])]

    self._run_test(grammar=_gram, text='A,B,C,B',
      expected=['A', 'B', 'C', 'B'])
    self._run_test(grammar=_gram, text='A B C', expected=None)

    shortergrammar = MANY(letter, TOKEN(','))
    self._run_test(grammar=shortergrammar, text='A,B,C,B',
      expected=['A', 'B', 'C', 'B'])
    self._run_test(grammar=shortergrammar, text='A B C', expected=None)

  def test_raise(self):
    self._run_test(grammar=['A', 'B'], text='AB',
      expected=['A', 'B'])
    try:
      self._run_test(grammar=['A', 'B', RAISE('test')], text='AB',
        expected=None)
      print('Expected RuntimeError')
      sys.exit(-1)
    except RuntimeError:
      return

  def test_whitespace(self):
    gram = MANY('A')
    self._run_test(grammar=gram, text='A A  A', expected=None)
    self._run_test(grammar=gram, whitespace_rule=' ', text='A A  A',
      expected=['A', 'A', 'A'])

  def test_math_expression_syntax(self):
    operator = LABEL('op', OR('+', '-', '/', '*'))
    literal = LABEL('num', re.compile(r'[0-9]+'))

    def _exp():
      return MANY(OR(literal, [TOKEN('('), _exp, TOKEN(')')]),
            separator=operator)

    self._run_test(grammar=_exp,
      text='(1-2)+3*((4*5)*6)+(7+8/9)-10',
      expected=[[('num', '1'), ('op', '-'), ('num', '2')],
        ('op', '+'),
        ('num', '3'),
        ('op', '*'),
        [[('num', '4'), ('op', '*'), ('num', '5')],
          ('op', '*'), ('num', '6')],
        ('op', '+'),
        [('num', '7'), ('op', '+'), ('num', '8'),
         ('op', '/'), ('num', '9')],
        ('op', '-'),
        ('num', '10')])

  def test_mini_language(self):
    def name():
      return re.compile(r'[a-z]+')

    def var_decl():
      return ['var', name, ';']

    def func_invoke():
      return [name, '(', ')', ';']

    def func_body():
      return MANY(OR(var_decl, func_invoke))

    def func_decl():
      return ['function', name, '(', ')', '{', func_body, '}']

    def args():
      return MANY(name, ',')

    def program():
      return MANY(OR(var_decl, func_decl))

    self._run_test(grammar=program,
      whitespace_rule=OR('\n', ' '),
      strings_are_tokens=True,
      text='var x;\nfunction f(){\n  var y;\n  g();\n}\n',
      expected=('program', [
             ('var_decl', [('name', 'x')]),
             ('func_decl', [('name', 'f'), ('func_body', [
              ('var_decl', [('name', 'y')]),
              ('func_invoke', [('name', 'g')])])])]))


if __name__ == "__main__":
  logging.config.fileConfig("logging.conf")
  ...
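The combinators exercised above compose directly outside of a test case. A short standalone sketch, assuming LABEL, MANY, TOKEN, and PegParser are importable individually from the same pegparser module (the tests use a star import), with the constructor signature taken from _run_test above; the grammar and the commented result are inferred from test_label and test_many_with_separator, not from the module's documentation:

import re

from pegparser import PegParser, LABEL, MANY, TOKEN

# A comma-separated list of lowercase identifiers, e.g. "foo,bar,baz".
identifier = LABEL('name', re.compile(r'[a-z]+'))
grammar = MANY(identifier, TOKEN(','))

# Second argument is the whitespace rule; None tolerates no whitespace.
parser = PegParser(grammar, None)
print(parser.parse('foo,bar,baz'))
# Following test_label and test_many_with_separator, this should yield:
# [('name', 'foo'), ('name', 'bar'), ('name', 'baz')]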
test_problems.py
Source:test_problems.py
...
from ..problems import *
from ..operators import RandomGenerator


class TestProblemsSimple(unittest.TestCase):

    def test_DTLZ1(self):
        self._run_test(DTLZ1(2))

    def test_DTLZ2(self):
        self._run_test(DTLZ2(2))

    def test_DTLZ3(self):
        self._run_test(DTLZ3(2))

    def test_DTLZ4(self):
        self._run_test(DTLZ4(2))

    def test_DTLZ7(self):
        self._run_test(DTLZ7(2))

    def test_WFG1(self):
        self._run_test(WFG1(2))

    def test_WFG2(self):
        self._run_test(WFG2(2))

    def test_WFG3(self):
        self._run_test(WFG3(2))

    def test_WFG4(self):
        self._run_test(WFG4(2))

    def test_WFG5(self):
        self._run_test(WFG5(2))

    def test_WFG6(self):
        self._run_test(WFG6(2))

    def test_WFG7(self):
        self._run_test(WFG7(2))

    def test_WFG8(self):
        self._run_test(WFG8(2))

    def test_WFG9(self):
        self._run_test(WFG9(2))

    def test_UF1(self):
        self._run_test(UF1())

    def test_UF2(self):
        self._run_test(UF2())

    def test_UF3(self):
        self._run_test(UF3())

    def test_UF4(self):
        self._run_test(UF4())

    def test_UF5(self):
        self._run_test(UF5())

    def test_UF6(self):
        self._run_test(UF6())

    def test_UF7(self):
        self._run_test(UF7())

    def test_UF8(self):
        self._run_test(UF8())

    def test_UF9(self):
        self._run_test(UF9())

    def test_UF10(self):
        self._run_test(UF10())

    def test_UF11(self):
        self._run_test(UF11())

    def test_UF12(self):
        self._run_test(UF12())

    def test_UF13(self):
        self._run_test(UF13())

    def test_CF1(self):
        self._run_test(CF1())

    def test_CF2(self):
        self._run_test(CF2())

    def test_CF3(self):
        self._run_test(CF3())

    def test_CF4(self):
        self._run_test(CF4())

    def test_CF5(self):
        self._run_test(CF5())

    def test_CF6(self):
        self._run_test(CF6())

    def test_CF7(self):
        self._run_test(CF7())

    def test_CF8(self):
        self._run_test(CF8())

    def test_CF9(self):
        self._run_test(CF9())

    def test_CF10(self):
        self._run_test(CF10())

    def test_ZDT1(self):
        self._run_test(ZDT1())

    def test_ZDT2(self):
        self._run_test(ZDT2())

    def test_ZDT3(self):
        self._run_test(ZDT3())

    def test_ZDT4(self):
        self._run_test(ZDT4())

    def test_ZDT5(self):
        self._run_test(ZDT5())

    def test_ZDT6(self):
        self._run_test(ZDT6())

    def _run_test(self, problem):
        if hasattr(problem, "random"):
            solution = problem.random()
        else:
            solution = RandomGenerator().generate(problem)
        ...
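Every test above funnels into the same _run_test helper: build a problem, then obtain a random solution from it. The equivalent standalone flow is sketched below, assuming this is the Platypus test suite, so the relative ..problems/..operators imports correspond to the installed platypus package; that mapping is an assumption, since the listing's package root is truncated:

# Sketch under the assumption that the relative imports in the listing
# map to the Platypus library's public modules.
from platypus import DTLZ2, RandomGenerator

problem = DTLZ2(2)  # two-objective instance, as in test_DTLZ2

# Mirror _run_test: prefer a problem-specific random() when one exists,
# otherwise fall back to the generic RandomGenerator.
if hasattr(problem, "random"):
    solution = problem.random()
else:
    solution = RandomGenerator().generate(problem)

print(len(solution.variables), problem.nobjs)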
