Skip to content

refactor: add apply_window_if_present and get_window_order_by methods #1947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from bigframes.core import window_spec
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
from bigframes.operations import aggregations as agg_ops

NULLARY_OP_REGISTRATION = reg.OpRegistration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from bigframes.core import window_spec
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
from bigframes.operations import aggregations as agg_ops
Expand Down
29 changes: 0 additions & 29 deletions bigframes/core/compile/sqlglot/aggregations/utils.py

This file was deleted.

153 changes: 153 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import typing

import sqlglot.expressions as sge

from bigframes.core import utils, window_spec
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
import bigframes.core.ordering as ordering_spec


def apply_window_if_present(
value: sge.Expression,
window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
if window is None:
return value

if window.is_row_bounded and not window.ordering:
raise ValueError("No ordering provided for ordered analytic function")
elif (
not window.is_row_bounded
and not window.is_range_bounded
and not window.ordering
):
# Unbound grouping window.
order_by = None
elif window.is_range_bounded:
# Note that, when the window is range-bounded, we only need one ordering key.
# There are two reasons:
# 1. Manipulating null positions requires more than one ordering key, which
# is forbidden by SQL window syntax for range rolling.
# 2. Pandas does not allow range rolling on timeseries with nulls.
order_by = get_window_order_by((window.ordering[0],), override_null_order=False)
else:
order_by = get_window_order_by(window.ordering, override_null_order=True)

order = sge.Order(expressions=order_by) if order_by else None

group_by = (
[scalar_compiler.compile_scalar_expression(key) for key in window.grouping_keys]
if window.grouping_keys
else None
)

# This is the key change. Don't create a spec for the default window frame
# if there's no ordering. This avoids generating an `ORDER BY NULL` clause.
if not window.bounds and not order:
return sge.Window(this=value, partition_by=group_by)

kind = (
"ROWS" if isinstance(window.bounds, window_spec.RowsWindowBounds) else "RANGE"
)

start: typing.Union[int, float, None] = None
end: typing.Union[int, float, None] = None
if isinstance(window.bounds, window_spec.RangeWindowBounds):
if window.bounds.start is not None:
start = utils.timedelta_to_micros(window.bounds.start)
if window.bounds.end is not None:
end = utils.timedelta_to_micros(window.bounds.end)
elif window.bounds:
start = window.bounds.start
end = window.bounds.end

start_value, start_side = _get_window_bounds(start, is_preceding=True)
end_value, end_side = _get_window_bounds(end, is_preceding=False)

spec = sge.WindowSpec(
kind=kind,
start=start_value,
start_side=start_side,
end=end_value,
end_side=end_side,
over="OVER",
)

return sge.Window(this=value, partition_by=group_by, order=order, spec=spec)


def get_window_order_by(
ordering: typing.Tuple[ordering_spec.OrderingExpression, ...],
override_null_order: bool = False,
) -> typing.Optional[tuple[sge.Ordered, ...]]:
"""Returns the SQL order by clause for a window specification."""
if not ordering:
return None

order_by = []
for ordering_spec_item in ordering:
expr = scalar_compiler.compile_scalar_expression(
ordering_spec_item.scalar_expression
)
desc = not ordering_spec_item.direction.is_ascending
nulls_first = not ordering_spec_item.na_last

if override_null_order:
# Bigquery SQL considers NULLS to be "smallest" values, but we need
# to override in these cases.
is_null_expr = sge.Is(this=expr, expression=sge.Null())
if nulls_first and desc:
order_by.append(
sge.Ordered(
this=is_null_expr,
desc=desc,
nulls_first=nulls_first,
)
)
elif not nulls_first and not desc:
order_by.append(
sge.Ordered(
this=is_null_expr,
desc=desc,
nulls_first=nulls_first,
)
)

order_by.append(
sge.Ordered(
this=expr,
desc=desc,
nulls_first=nulls_first,
)
)
return tuple(order_by)


def _get_window_bounds(
value, is_preceding: bool
) -> tuple[typing.Union[str, sge.Expression], typing.Optional[str]]:
"""Compiles a single boundary value into its SQL components."""
if value is None:
side = "PRECEDING" if is_preceding else "FOLLOWING"
return "UNBOUNDED", side

if value == 0:
return "CURRENT ROW", None

side = "PRECEDING" if value < 0 else "FOLLOWING"
return sge.convert(abs(value)), side
19 changes: 9 additions & 10 deletions bigframes/core/compile/sqlglot/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite
from bigframes.core.compile import configs
import bigframes.core.compile.sqlglot.aggregate_compiler as aggregate_compiler
from bigframes.core.compile.sqlglot.aggregations import windows
from bigframes.core.compile.sqlglot.expressions import typed_expr
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
Expand Down Expand Up @@ -272,18 +273,16 @@ def compile_random_sample(
def compile_aggregate(
self, node: nodes.AggregateNode, child: ir.SQLGlotIR
) -> ir.SQLGlotIR:
ordering_cols = tuple(
sge.Ordered(
this=scalar_compiler.compile_scalar_expression(
ordering.scalar_expression
),
desc=ordering.direction.is_ascending is False,
nulls_first=ordering.na_last is False,
)
for ordering in node.order_by
ordering_cols = windows.get_window_order_by(
node.order_by, override_null_order=True
)
aggregations: tuple[tuple[str, sge.Expression], ...] = tuple(
(id.sql, aggregate_compiler.compile_aggregate(agg, order_by=ordering_cols))
(
id.sql,
aggregate_compiler.compile_aggregate(
agg, order_by=ordering_cols if ordering_cols else ()
),
)
for agg, id in node.aggregations
)
by_cols: tuple[sge.Expression, ...] = tuple(
Expand Down
141 changes: 141 additions & 0 deletions tests/unit/core/compile/sqlglot/aggregations/test_windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import pandas as pd
import pytest
import sqlglot.expressions as sge

from bigframes.core import window_spec
from bigframes.core.compile.sqlglot.aggregations.windows import (
apply_window_if_present,
get_window_order_by,
)
import bigframes.core.expression as ex
import bigframes.core.ordering as ordering


class WindowsTest(unittest.TestCase):
def test_get_window_order_by_empty(self):
self.assertIsNone(get_window_order_by(tuple()))

def test_get_window_order_by(self):
result = get_window_order_by((ordering.OrderingExpression(ex.deref("col1")),))
self.assertEqual(
sge.Order(expressions=result).sql(dialect="bigquery"),
"ORDER BY `col1` ASC NULLS LAST",
)

def test_get_window_order_by_override_nulls(self):
result = get_window_order_by(
(ordering.OrderingExpression(ex.deref("col1")),),
override_null_order=True,
)
self.assertEqual(
sge.Order(expressions=result).sql(dialect="bigquery"),
"ORDER BY `col1` IS NULL ASC NULLS LAST, `col1` ASC NULLS LAST",
)

def test_get_window_order_by_override_nulls_desc(self):
result = get_window_order_by(
(
ordering.OrderingExpression(
ex.deref("col1"),
direction=ordering.OrderingDirection.DESC,
na_last=False,
),
),
override_null_order=True,
)
self.assertEqual(
sge.Order(expressions=result).sql(dialect="bigquery"),
"ORDER BY `col1` IS NULL DESC NULLS FIRST, `col1` DESC NULLS FIRST",
)

def test_apply_window_if_present_no_window(self):
value = sge.func(
"SUM", sge.Column(this=sge.to_identifier("col_0", quoted=True))
)
result = apply_window_if_present(value)
self.assertEqual(result, value)

def test_apply_window_if_present_row_bounded_no_ordering_raises(self):
with pytest.raises(
ValueError, match="No ordering provided for ordered analytic function"
):
apply_window_if_present(
sge.Var(this="value"),
window_spec.WindowSpec(
bounds=window_spec.RowsWindowBounds(start=-1, end=1)
),
)

def test_apply_window_if_present_unbounded_grouping_no_ordering(self):
result = apply_window_if_present(
sge.Var(this="value"),
window_spec.WindowSpec(
grouping_keys=(ex.deref("col1"),),
),
)
self.assertEqual(
result.sql(dialect="bigquery"),
"value OVER (PARTITION BY `col1`)",
)

def test_apply_window_if_present_range_bounded(self):
result = apply_window_if_present(
sge.Var(this="value"),
window_spec.WindowSpec(
ordering=(ordering.OrderingExpression(ex.deref("col1")),),
bounds=window_spec.RangeWindowBounds(start=None, end=pd.Timedelta(0)),
),
)
self.assertEqual(
result.sql(dialect="bigquery"),
"value OVER (ORDER BY `col1` ASC NULLS LAST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)",
)

def test_apply_window_if_present_range_bounded_timedelta(self):
result = apply_window_if_present(
sge.Var(this="value"),
window_spec.WindowSpec(
ordering=(ordering.OrderingExpression(ex.deref("col1")),),
bounds=window_spec.RangeWindowBounds(
start=pd.Timedelta(days=-1), end=pd.Timedelta(hours=12)
),
),
)
self.assertEqual(
result.sql(dialect="bigquery"),
"value OVER (ORDER BY `col1` ASC NULLS LAST RANGE BETWEEN 86400000000 PRECEDING AND 43200000000 FOLLOWING)",
)

def test_apply_window_if_present_all_params(self):
result = apply_window_if_present(
sge.Var(this="value"),
window_spec.WindowSpec(
grouping_keys=(ex.deref("col1"),),
ordering=(ordering.OrderingExpression(ex.deref("col2")),),
bounds=window_spec.RowsWindowBounds(start=-1, end=0),
),
)
self.assertEqual(
result.sql(dialect="bigquery"),
"value OVER (PARTITION BY `col1` ORDER BY `col2` IS NULL ASC NULLS LAST, `col2` ASC NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)",
)


if __name__ == "__main__":
unittest.main()