diff --git a/bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py index 720ce743a6..99e3562b42 100644 --- a/bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py +++ b/bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py @@ -20,7 +20,7 @@ from bigframes.core import window_spec import bigframes.core.compile.sqlglot.aggregations.op_registration as reg -from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present +from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present from bigframes.operations import aggregations as agg_ops NULLARY_OP_REGISTRATION = reg.OpRegistration() diff --git a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py index 75ba090bc4..eddf7f56d2 100644 --- a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py +++ b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py @@ -20,7 +20,7 @@ from bigframes.core import window_spec import bigframes.core.compile.sqlglot.aggregations.op_registration as reg -from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present +from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr import bigframes.core.compile.sqlglot.sqlglot_ir as ir from bigframes.operations import aggregations as agg_ops diff --git a/bigframes/core/compile/sqlglot/aggregations/utils.py b/bigframes/core/compile/sqlglot/aggregations/utils.py deleted file mode 100644 index 57470cde5b..0000000000 --- a/bigframes/core/compile/sqlglot/aggregations/utils.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from __future__ import annotations - -import typing - -import sqlglot.expressions as sge - -from bigframes.core import window_spec - - -def apply_window_if_present( - value: sge.Expression, - window: typing.Optional[window_spec.WindowSpec] = None, -) -> sge.Expression: - if window is not None: - raise NotImplementedError("Can't apply window to the expression.") - return value diff --git a/bigframes/core/compile/sqlglot/aggregations/windows.py b/bigframes/core/compile/sqlglot/aggregations/windows.py new file mode 100644 index 0000000000..47fd43bd08 --- /dev/null +++ b/bigframes/core/compile/sqlglot/aggregations/windows.py @@ -0,0 +1,153 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from __future__ import annotations
+
+import typing
+
+import sqlglot.expressions as sge
+
+from bigframes.core import utils, window_spec
+import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
+import bigframes.core.ordering as ordering_spec
+
+
+def apply_window_if_present(
+    value: sge.Expression,
+    window: typing.Optional[window_spec.WindowSpec] = None,
+) -> sge.Expression:
+    """Wraps an expression in the window clause described by ``window``.
+
+    Args:
+        value: The expression to evaluate over the window.
+        window: The window specification. When None, ``value`` is returned
+            unchanged.
+
+    Returns:
+        ``value`` itself when no window is given, otherwise an ``sge.Window``
+        built from the grouping keys, ordering and bounds of ``window``.
+
+    Raises:
+        ValueError: If the window is row-bounded or range-bounded but has no
+            ordering.
+    """
+    if window is None:
+        return value
+
+    is_bounded = window.is_row_bounded or window.is_range_bounded
+    if is_bounded and not window.ordering:
+        # The range-bounded branch below reads ``window.ordering[0]``, so a
+        # missing ordering is rejected here for both kinds of bounded windows.
+        raise ValueError("No ordering provided for ordered analytic function")
+
+    if not window.ordering:
+        # Unbound grouping window.
+        order_by = None
+    elif window.is_range_bounded:
+        # Note that, when the window is range-bounded, we only need one ordering key.
+        # There are two reasons:
+        # 1. Manipulating null positions requires more than one ordering key, which
+        #    is forbidden by SQL window syntax for range rolling.
+        # 2. Pandas does not allow range rolling on timeseries with nulls.
+        order_by = get_window_order_by((window.ordering[0],), override_null_order=False)
+    else:
+        order_by = get_window_order_by(window.ordering, override_null_order=True)
+
+    order = sge.Order(expressions=order_by) if order_by else None
+
+    group_by = (
+        [scalar_compiler.compile_scalar_expression(key) for key in window.grouping_keys]
+        if window.grouping_keys
+        else None
+    )
+
+    # Don't create a frame spec for the default window frame if there's no
+    # ordering. This avoids generating an `ORDER BY NULL` clause.
+    if not window.bounds and not order:
+        return sge.Window(this=value, partition_by=group_by)
+
+    # Only explicit row bounds produce a ROWS frame; everything else is RANGE.
+    kind = (
+        "ROWS" if isinstance(window.bounds, window_spec.RowsWindowBounds) else "RANGE"
+    )
+
+    start: typing.Union[int, float, None] = None
+    end: typing.Union[int, float, None] = None
+    if isinstance(window.bounds, window_spec.RangeWindowBounds):
+        # Range bounds are converted with ``utils.timedelta_to_micros`` before
+        # being rendered as numeric offsets.
+        if window.bounds.start is not None:
+            start = utils.timedelta_to_micros(window.bounds.start)
+        if window.bounds.end is not None:
+            end = utils.timedelta_to_micros(window.bounds.end)
+    elif window.bounds:
+        start = window.bounds.start
+        end = window.bounds.end
+
+    start_value, start_side = _get_window_bounds(start, is_preceding=True)
+    end_value, end_side = _get_window_bounds(end, is_preceding=False)
+
+    spec = sge.WindowSpec(
+        kind=kind,
+        start=start_value,
+        start_side=start_side,
+        end=end_value,
+        end_side=end_side,
+    )
+
+    return sge.Window(this=value, partition_by=group_by, order=order, spec=spec)
+
+
+def get_window_order_by(
+    ordering: typing.Tuple[ordering_spec.OrderingExpression, ...],
+    override_null_order: bool = False,
+) -> typing.Optional[tuple[sge.Ordered, ...]]:
+    """Returns the SQL order by clause for a window specification.
+
+    Args:
+        ordering: The ordering expressions to compile, in priority order.
+        override_null_order: When True, an extra ``<expr> IS NULL`` key is
+            emitted ahead of every key that is either descending with nulls
+            first or ascending with nulls last.
+
+    Returns:
+        None when ``ordering`` is empty, otherwise the compiled ordering keys.
+    """
+    if not ordering:
+        return None
+
+    order_by: typing.List[sge.Ordered] = []
+    for ordering_spec_item in ordering:
+        expr = scalar_compiler.compile_scalar_expression(
+            ordering_spec_item.scalar_expression
+        )
+        desc = not ordering_spec_item.direction.is_ascending
+        nulls_first = not ordering_spec_item.na_last
+
+        # Bigquery SQL considers NULLS to be "smallest" values, but we need
+        # to override in these cases: "DESC NULLS FIRST" and "ASC NULLS LAST"
+        # get an explicit null-indicator key with the same direction.
+        if override_null_order and nulls_first == desc:
+            order_by.append(
+                sge.Ordered(
+                    this=sge.Is(this=expr, expression=sge.Null()),
+                    desc=desc,
+                    nulls_first=nulls_first,
+                )
+            )
+
+        order_by.append(
+            sge.Ordered(
+                this=expr,
+                desc=desc,
+                nulls_first=nulls_first,
+            )
+        )
+    return tuple(order_by)
+
+
+def _get_window_bounds(
+    value: typing.Union[int, float, None], is_preceding: bool
+) -> tuple[typing.Union[str, sge.Expression], typing.Optional[str]]:
+    """Compiles a single boundary value into its SQL components.
+
+    Args:
+        value: Signed boundary offset. None means unbounded, 0 means the
+            current row, a negative value precedes and a positive value
+            follows the current row.
+        is_preceding: Side to use for an unbounded boundary; it is ignored
+            for every other value.
+
+    Returns:
+        A ``(value, side)`` pair. ``side`` is None for ``CURRENT ROW``.
+    """
+    if value is None:
+        side = "PRECEDING" if is_preceding else "FOLLOWING"
+        return "UNBOUNDED", side
+
+    if value == 0:
+        return "CURRENT ROW", None
+
+    # The sign only selects the side; the rendered offset is always positive.
+    side = "PRECEDING" if value < 0 else "FOLLOWING"
+    return sge.convert(abs(value)), side
diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py
index 1c5aaf50a8..2ae6b4bb9c 100644
--- a/bigframes/core/compile/sqlglot/compiler.py
+++ b/bigframes/core/compile/sqlglot/compiler.py
@@ -23,6 +23,7 @@
 from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite
 from bigframes.core.compile import configs
 import bigframes.core.compile.sqlglot.aggregate_compiler as aggregate_compiler
+from bigframes.core.compile.sqlglot.aggregations import windows
 from bigframes.core.compile.sqlglot.expressions import typed_expr
 import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
 import bigframes.core.compile.sqlglot.sqlglot_ir as ir
@@ -272,18 +273,16 @@ def compile_random_sample(
     def compile_aggregate(
         self, node: nodes.AggregateNode, child: ir.SQLGlotIR
     ) -> ir.SQLGlotIR:
-        ordering_cols = tuple(
-            sge.Ordered(
-                this=scalar_compiler.compile_scalar_expression(
-                    ordering.scalar_expression
-                ),
-                desc=ordering.direction.is_ascending is False,
-                nulls_first=ordering.na_last is False,
-            )
-            for ordering in node.order_by
+ ordering_cols = windows.get_window_order_by( + node.order_by, override_null_order=True ) aggregations: tuple[tuple[str, sge.Expression], ...] = tuple( - (id.sql, aggregate_compiler.compile_aggregate(agg, order_by=ordering_cols)) + ( + id.sql, + aggregate_compiler.compile_aggregate( + agg, order_by=ordering_cols if ordering_cols else () + ), + ) for agg, id in node.aggregations ) by_cols: tuple[sge.Expression, ...] = tuple( diff --git a/tests/unit/core/compile/sqlglot/aggregations/test_windows.py b/tests/unit/core/compile/sqlglot/aggregations/test_windows.py new file mode 100644 index 0000000000..609d3441a5 --- /dev/null +++ b/tests/unit/core/compile/sqlglot/aggregations/test_windows.py @@ -0,0 +1,141 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import unittest
+
+import pandas as pd
+import pytest
+import sqlglot.expressions as sge
+
+from bigframes.core import window_spec
+from bigframes.core.compile.sqlglot.aggregations.windows import (
+    apply_window_if_present,
+    get_window_order_by,
+)
+import bigframes.core.expression as ex
+import bigframes.core.ordering as ordering
+
+
+class WindowsTest(unittest.TestCase):
+    """Checks the SQL rendered by the window helpers for the BigQuery dialect."""
+
+    @staticmethod
+    def _order_sql(keys):
+        """Renders ordering keys as a BigQuery ORDER BY clause."""
+        return sge.Order(expressions=keys).sql(dialect="bigquery")
+
+    def test_get_window_order_by_empty(self):
+        """An empty ordering produces no ordering keys."""
+        self.assertIsNone(get_window_order_by(()))
+
+    def test_get_window_order_by(self):
+        """A default ordering expression renders as ascending, nulls last."""
+        col1_order = ordering.OrderingExpression(ex.deref("col1"))
+        keys = get_window_order_by((col1_order,))
+        self.assertEqual(self._order_sql(keys), "ORDER BY `col1` ASC NULLS LAST")
+
+    def test_get_window_order_by_override_nulls(self):
+        """Overriding null order adds an IS NULL key before the column key."""
+        col1_order = ordering.OrderingExpression(ex.deref("col1"))
+        keys = get_window_order_by((col1_order,), override_null_order=True)
+        expected = "ORDER BY `col1` IS NULL ASC NULLS LAST, `col1` ASC NULLS LAST"
+        self.assertEqual(self._order_sql(keys), expected)
+
+    def test_get_window_order_by_override_nulls_desc(self):
+        """Descending with nulls first also gets the IS NULL key."""
+        col1_desc = ordering.OrderingExpression(
+            ex.deref("col1"),
+            direction=ordering.OrderingDirection.DESC,
+            na_last=False,
+        )
+        keys = get_window_order_by((col1_desc,), override_null_order=True)
+        expected = "ORDER BY `col1` IS NULL DESC NULLS FIRST, `col1` DESC NULLS FIRST"
+        self.assertEqual(self._order_sql(keys), expected)
+
+    def test_apply_window_if_present_no_window(self):
+        """Without a window the input expression is returned as is."""
+        col_0 = sge.Column(this=sge.to_identifier("col_0", quoted=True))
+        total = sge.func("SUM", col_0)
+        self.assertEqual(apply_window_if_present(total), total)
+
+    def test_apply_window_if_present_row_bounded_no_ordering_raises(self):
+        """Row bounds without an ordering are rejected with a ValueError."""
+        message = "No ordering provided for ordered analytic function"
+        with pytest.raises(ValueError, match=message):
+            apply_window_if_present(
+                sge.Var(this="value"),
+                window_spec.WindowSpec(
+                    bounds=window_spec.RowsWindowBounds(start=-1, end=1)
+                ),
+            )
+
+    def test_apply_window_if_present_unbounded_grouping_no_ordering(self):
+        """Grouping keys alone render as a bare PARTITION BY window."""
+        grouped = window_spec.WindowSpec(grouping_keys=(ex.deref("col1"),))
+        windowed = apply_window_if_present(sge.Var(this="value"), grouped)
+        self.assertEqual(
+            windowed.sql(dialect="bigquery"),
+            "value OVER (PARTITION BY `col1`)",
+        )
+
+    def test_apply_window_if_present_range_bounded(self):
+        """An open start and a zero end render as UNBOUNDED .. CURRENT ROW."""
+        ranged = window_spec.WindowSpec(
+            ordering=(ordering.OrderingExpression(ex.deref("col1")),),
+            bounds=window_spec.RangeWindowBounds(start=None, end=pd.Timedelta(0)),
+        )
+        windowed = apply_window_if_present(sge.Var(this="value"), ranged)
+        self.assertEqual(
+            windowed.sql(dialect="bigquery"),
+            "value OVER (ORDER BY `col1` ASC NULLS LAST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)",
+        )
+
+    def test_apply_window_if_present_range_bounded_timedelta(self):
+        """Timedelta range bounds render as microsecond offsets."""
+        one_day_back = pd.Timedelta(days=-1)
+        half_day_ahead = pd.Timedelta(hours=12)
+        ranged = window_spec.WindowSpec(
+            ordering=(ordering.OrderingExpression(ex.deref("col1")),),
+            bounds=window_spec.RangeWindowBounds(
+                start=one_day_back, end=half_day_ahead
+            ),
+        )
+        windowed = apply_window_if_present(sge.Var(this="value"), ranged)
+        self.assertEqual(
+            windowed.sql(dialect="bigquery"),
+            "value OVER (ORDER BY `col1` ASC NULLS LAST RANGE BETWEEN 86400000000 PRECEDING AND 43200000000 FOLLOWING)",
+        )
+
+    def test_apply_window_if_present_all_params(self):
+        """Grouping, ordering and row bounds all appear in the window clause."""
+        full_window = window_spec.WindowSpec(
+            grouping_keys=(ex.deref("col1"),),
+            ordering=(ordering.OrderingExpression(ex.deref("col2")),),
+            bounds=window_spec.RowsWindowBounds(start=-1, end=0),
+        )
+        windowed = apply_window_if_present(sge.Var(this="value"), full_window)
+        self.assertEqual(
+            windowed.sql(dialect="bigquery"),
+            "value OVER (PARTITION BY `col1` ORDER BY `col2` IS NULL ASC NULLS LAST, `col2` ASC NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)",
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()