diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 27660d2ba7..f7de5c051a 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -459,7 +459,7 @@ def project_window_op( for column in inputs: clauses.append((column.isnull(), ibis_types.null())) if window_spec.min_periods and len(inputs) > 0: - if expression.op.skips_nulls: + if not expression.op.nulls_count_for_min_values: # Most operations do not count NULL values towards min_periods per_col_does_count = (column.notnull() for column in inputs) # All inputs must be non-null for observation to count diff --git a/bigframes/core/groupby/dataframe_group_by.py b/bigframes/core/groupby/dataframe_group_by.py index a2c4cf2867..40bedd93d6 100644 --- a/bigframes/core/groupby/dataframe_group_by.py +++ b/bigframes/core/groupby/dataframe_group_by.py @@ -263,6 +263,48 @@ def kurt( kurtosis = kurt + @validations.requires_ordering() + def first(self, numeric_only: bool = False, min_count: int = -1) -> df.DataFrame: + window_spec = window_specs.unbound( + grouping_keys=tuple(self._by_col_ids), + min_periods=min_count if min_count >= 0 else 0, + ) + target_cols, index = self._aggregated_columns(numeric_only) + block, firsts_ids = self._block.multi_apply_window_op( + target_cols, + agg_ops.FirstNonNullOp(), + window_spec=window_spec, + ) + block, _ = block.aggregate( + self._by_col_ids, + tuple( + aggs.agg(firsts_id, agg_ops.AnyValueOp()) for firsts_id in firsts_ids + ), + dropna=self._dropna, + column_labels=index, + ) + return df.DataFrame(block) + + @validations.requires_ordering() + def last(self, numeric_only: bool = False, min_count: int = -1) -> df.DataFrame: + window_spec = window_specs.unbound( + grouping_keys=tuple(self._by_col_ids), + min_periods=min_count if min_count >= 0 else 0, + ) + target_cols, index = self._aggregated_columns(numeric_only) + block, lasts_ids = self._block.multi_apply_window_op( + target_cols, + agg_ops.LastNonNullOp(), + 
window_spec=window_spec, + ) + block, _ = block.aggregate( + self._by_col_ids, + tuple(aggs.agg(lasts_id, agg_ops.AnyValueOp()) for lasts_id in lasts_ids), + dropna=self._dropna, + column_labels=index, + ) + return df.DataFrame(block) + def all(self) -> df.DataFrame: return self._aggregate_all(agg_ops.all_op) diff --git a/bigframes/core/groupby/series_group_by.py b/bigframes/core/groupby/series_group_by.py index a29bb45a32..24b5cba130 100644 --- a/bigframes/core/groupby/series_group_by.py +++ b/bigframes/core/groupby/series_group_by.py @@ -36,6 +36,7 @@ import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df +import bigframes.dtypes import bigframes.operations.aggregations as agg_ops import bigframes.series as series @@ -162,6 +163,54 @@ def kurt(self, *args, **kwargs) -> series.Series: kurtosis = kurt + @validations.requires_ordering() + def first(self, numeric_only: bool = False, min_count: int = -1) -> series.Series: + if numeric_only and not bigframes.dtypes.is_numeric( + self._block.expr.get_column_type(self._value_column) + ): + raise TypeError( + f"Cannot use 'numeric_only' with non-numeric column {self._value_name}." 
+ ) + window_spec = window_specs.unbound( + grouping_keys=tuple(self._by_col_ids), + min_periods=min_count if min_count >= 0 else 0, + ) + block, firsts_id = self._block.apply_window_op( + self._value_column, + agg_ops.FirstNonNullOp(), + window_spec=window_spec, + ) + block, _ = block.aggregate( + self._by_col_ids, + (aggs.agg(firsts_id, agg_ops.AnyValueOp()),), + dropna=self._dropna, + ) + return series.Series(block.with_column_labels([self._value_name])) + + @validations.requires_ordering() + def last(self, numeric_only: bool = False, min_count: int = -1) -> series.Series: + if numeric_only and not bigframes.dtypes.is_numeric( + self._block.expr.get_column_type(self._value_column) + ): + raise TypeError( + f"Cannot use 'numeric_only' with non-numeric column {self._value_name}." + ) + window_spec = window_specs.unbound( + grouping_keys=tuple(self._by_col_ids), + min_periods=min_count if min_count >= 0 else 0, + ) + block, firsts_id = self._block.apply_window_op( + self._value_column, + agg_ops.LastNonNullOp(), + window_spec=window_spec, + ) + block, _ = block.aggregate( + self._by_col_ids, + (aggs.agg(firsts_id, agg_ops.AnyValueOp()),), + dropna=self._dropna, + ) + return series.Series(block.with_column_labels([self._value_name])) + def prod(self, *args) -> series.Series: return self._aggregate(agg_ops.product_op) @@ -314,7 +363,7 @@ def _apply_window_op( discard_name=False, window: typing.Optional[window_specs.WindowSpec] = None, never_skip_nulls: bool = False, - ): + ) -> series.Series: """Apply window op to groupby. 
Defaults to grouped cumulative window.""" window_spec = window or window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 1c321c0bf8..984f7d3798 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -33,6 +33,11 @@ def skips_nulls(self): """Whether the window op skips null rows.""" return True + @property + def nulls_count_for_min_values(self) -> bool: + """Whether null values count for min_values.""" + return not self.skips_nulls + @property def implicitly_inherits_order(self): """ @@ -480,6 +485,10 @@ class FirstNonNullOp(UnaryWindowOp): def skips_nulls(self): return False + @property + def nulls_count_for_min_values(self) -> bool: + return False + @dataclasses.dataclass(frozen=True) class LastOp(UnaryWindowOp): @@ -492,6 +501,10 @@ class LastNonNullOp(UnaryWindowOp): def skips_nulls(self): return False + @property + def nulls_count_for_min_values(self) -> bool: + return False + @dataclasses.dataclass(frozen=True) class ShiftOp(UnaryWindowOp): diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index 0af173adc8..5d3f015de8 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -768,3 +768,101 @@ def test_series_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q): pd.testing.assert_series_equal( pd_result, bf_result, check_dtype=False, check_index_type=False ) + + +@pytest.mark.parametrize( + ("numeric_only", "min_count"), + [ + (True, 2), + (False, -1), + ], +) +def test_series_groupby_first( + scalars_df_index, scalars_pandas_df_index, numeric_only, min_count +): + bf_result = ( + scalars_df_index.groupby("string_col")["int64_col"].first( + numeric_only=numeric_only, min_count=min_count + ) + ).to_pandas() + pd_result = scalars_pandas_df_index.groupby("string_col")["int64_col"].first( + numeric_only=numeric_only, min_count=min_count + ) 
+ pd.testing.assert_series_equal( + pd_result, + bf_result, + ) + + +@pytest.mark.parametrize( + ("numeric_only", "min_count"), + [ + (False, 4), + (True, 0), + ], +) +def test_series_groupby_last( + scalars_df_index, scalars_pandas_df_index, numeric_only, min_count +): + bf_result = ( + scalars_df_index.groupby("string_col")["int64_col"].last( + numeric_only=numeric_only, min_count=min_count + ) + ).to_pandas() + pd_result = scalars_pandas_df_index.groupby("string_col")["int64_col"].last( + numeric_only=numeric_only, min_count=min_count + ) + pd.testing.assert_series_equal(pd_result, bf_result) + + +@pytest.mark.parametrize( + ("numeric_only", "min_count"), + [ + (False, 4), + (True, 0), + ], +) +def test_dataframe_groupby_first( + scalars_df_index, scalars_pandas_df_index, numeric_only, min_count +): + # min_count seems to not work properly on older pandas + pytest.importorskip("pandas", minversion="2.0.0") + # bytes, dates not handling min_count properly in pandas + bf_result = ( + scalars_df_index.drop(columns=["bytes_col", "date_col"]) + .groupby(scalars_df_index.int64_col % 2) + .first(numeric_only=numeric_only, min_count=min_count) + ).to_pandas() + pd_result = ( + scalars_pandas_df_index.drop(columns=["bytes_col", "date_col"]) + .groupby(scalars_pandas_df_index.int64_col % 2) + .first(numeric_only=numeric_only, min_count=min_count) + ) + pd.testing.assert_frame_equal( + pd_result, + bf_result, + ) + + +@pytest.mark.parametrize( + ("numeric_only", "min_count"), + [ + (True, 2), + (False, -1), + ], +) +def test_dataframe_groupby_last( + scalars_df_index, scalars_pandas_df_index, numeric_only, min_count +): + bf_result = ( + scalars_df_index.groupby(scalars_df_index.int64_col % 2).last( + numeric_only=numeric_only, min_count=min_count + ) + ).to_pandas() + pd_result = scalars_pandas_df_index.groupby( + scalars_pandas_df_index.int64_col % 2 + ).last(numeric_only=numeric_only, min_count=min_count) + pd.testing.assert_frame_equal( + pd_result, + bf_result, + ) 
diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index ebfbfa8830..777846ff80 100644 --- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -537,6 +537,80 @@ def kurtosis( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def first(self, numeric_only: bool = False, min_count: int = -1): + """ + Compute the first entry of each column within each group. + + Defaults to skipping NA elements. + + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame(dict(A=[1, 1, 3], B=[None, 5, 6], C=[1, 2, 3])) + >>> df.groupby("A").first() + B C + A + 1 5.0 1 + 3 6.0 3 + <BLANKLINE> + [2 rows x 2 columns] + + >>> df.groupby("A").first(min_count=2) + B C + A + 1 <NA> 1 + 3 <NA> <NA> + <BLANKLINE> + [2 rows x 2 columns] + + Args: + numeric_only (bool, default False): + Include only float, int, boolean columns. If None, will attempt to use + everything, then use only numeric data. + min_count (int, default -1): + The required number of valid values to perform the operation. If fewer + than ``min_count`` valid values are present the result will be NA. + + Returns: + bigframes.pandas.DataFrame or bigframes.pandas.Series: + First of values within each group. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def last(self, numeric_only: bool = False, min_count: int = -1): + """ + Compute the last entry of each column within each group. + + Defaults to skipping NA elements. + + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> df = bpd.DataFrame(dict(A=[1, 1, 3], B=[5, None, 6], C=[1, 2, 3])) + >>> df.groupby("A").last() + B C + A + 1 5.0 2 + 3 6.0 3 + <BLANKLINE> + [2 rows x 2 columns] + + Args: + numeric_only (bool, default False): + Include only float, int, boolean columns. 
If None, will attempt to use + everything, then use only numeric data. + min_count (int, default -1): + The required number of valid values to perform the operation. If fewer + than ``min_count`` valid values are present the result will be NA. + + Returns: + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Last of values within each group. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def sum( self, numeric_only: bool = False,