Skip to content

refactor: Refactor udf definitions #1814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions bigframes/core/compile/ibis_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@
# limitations under the License.
from __future__ import annotations

import typing
from typing import cast, Dict, Iterable, Optional, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.ibis
import bigframes_vendored.ibis.backends.bigquery.datatypes as third_party_ibis_bqtypes
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
from bigframes_vendored.ibis.expr.datatypes.core import (
dtype as python_type_to_ibis_type,
)
import bigframes_vendored.ibis.expr.types as ibis_types
import db_dtypes # type: ignore
import geopandas as gpd # type: ignore
import google.cloud.bigquery as bigquery
import pandas as pd
import pyarrow as pa

Expand Down Expand Up @@ -439,45 +433,3 @@ def literal_to_ibis_scalar(
)

return scalar_expr


class UnsupportedTypeError(ValueError):
    """Raised when a type falls outside the supported set for a conversion.

    The offending type and the set of supported types are kept as attributes
    so callers can inspect them programmatically.
    """

    def __init__(self, type_, supported_types):
        # Retain both pieces of context for handlers that want more than the message.
        self.type = type_
        self.supported_types = supported_types
        message = f"'{type_}' is not one of the supported types {supported_types}"
        super().__init__(message)


def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
    """Convert a Python type to its ibis dtype equivalent.

    Raises:
        UnsupportedTypeError: if ``t`` is not a supported remote-function
            input/output Python type.
    """
    supported = bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES
    if t in supported:
        return python_type_to_ibis_type(t)
    raise UnsupportedTypeError(t, supported)


def ibis_array_output_type_from_python_type(t: type) -> ibis_dtypes.DataType:
    """Convert a parameterized array type (e.g. ``list[int]``) to an ibis dtype.

    Raises:
        UnsupportedTypeError: if the array's element type is not supported as
            a remote-function array output type.
    """
    element_type = typing.get_args(t)[0]
    allowed = bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
    if element_type not in allowed:
        raise UnsupportedTypeError(element_type, allowed)
    return python_type_to_ibis_type(t)


def ibis_type_from_bigquery_type(
    type_: bigquery.StandardSqlDataType,
) -> ibis_dtypes.DataType:
    """Convert a BigQuery standard SQL type to an ibis dtype.

    Only intended for remote functions; it does not handle all BigQuery types.

    Raises:
        UnsupportedTypeError: if the type kind is not supported for
            remote-function I/O.
    """
    kind = type_.type_kind
    supported = bigframes.dtypes.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS
    if kind not in supported:
        raise UnsupportedTypeError(kind, supported)
    if kind == "ARRAY":
        # Recurse into the element type to build the corresponding ibis array.
        element = typing.cast(
            bigquery.StandardSqlDataType, type_.array_element_type
        )
        return ibis_dtypes.Array(value_type=ibis_type_from_bigquery_type(element))
    return third_party_ibis_bqtypes.BigQueryType.to_ibis(kind)
83 changes: 49 additions & 34 deletions bigframes/core/compile/scalar_op_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import functools
import typing

import bigframes_vendored.constants as constants
import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
import bigframes_vendored.ibis.expr.operations.generic as ibis_generic
Expand All @@ -30,6 +29,7 @@
import bigframes.core.compile.default_ordering
import bigframes.core.compile.ibis_types
import bigframes.core.expression as ex
import bigframes.dtypes
import bigframes.operations as ops

_ZERO = typing.cast(ibis_types.NumericValue, ibis_types.literal(0))
Expand Down Expand Up @@ -1284,17 +1284,58 @@ def timedelta_floor_op_impl(x: ibis_types.NumericValue):

@scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True)
def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp):
ibis_node = getattr(op.func, "ibis_node", None)
if ibis_node is None:
raise TypeError(
f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
)
x_transformed = ibis_node(x)
udf_sig = op.function_def.signature
ibis_py_sig = (udf_sig.py_input_types, udf_sig.py_output_type)

@ibis_udf.scalar.builtin(
name=str(op.function_def.routine_ref), signature=ibis_py_sig
)
def udf(input):
...

x_transformed = udf(x)
if not op.apply_on_null:
x_transformed = ibis_api.case().when(x.isnull(), x).else_(x_transformed).end()
return ibis_api.case().when(x.isnull(), x).else_(x_transformed).end()
return x_transformed


@scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True)
def binary_remote_function_op_impl(
    x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp
):
    """Compile a two-argument BigQuery routine call into an ibis expression."""
    udf_sig = op.function_def.signature
    # (input types, output type) pair in the shape ibis expects for a signature.
    ibis_py_sig = (udf_sig.py_input_types, udf_sig.py_output_type)

    # Declare the deployed BigQuery routine as an ibis "builtin" scalar UDF so
    # the compiler emits a direct call to it by its fully-qualified name.
    @ibis_udf.scalar.builtin(
        name=str(op.function_def.routine_ref), signature=ibis_py_sig
    )
    def udf(input1, input2):
        ...

    x_transformed = udf(x, y)
    return x_transformed


@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True)
def nary_remote_function_op_impl(
    *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp
):
    """Compile an n-ary BigQuery routine call into an ibis expression."""
    udf_sig = op.function_def.signature
    # (input types, output type) pair in the shape ibis expects for a signature.
    ibis_py_sig = (udf_sig.py_input_types, udf_sig.py_output_type)
    # Preserve the routine's declared parameter names in the generated SQL.
    arg_names = tuple(arg.name for arg in udf_sig.input_types)

    # Declare the deployed BigQuery routine as an ibis "builtin" scalar UDF so
    # the compiler emits a direct call to it by its fully-qualified name.
    @ibis_udf.scalar.builtin(
        name=str(op.function_def.routine_ref),
        signature=ibis_py_sig,
        param_name_overrides=arg_names,
    )
    def udf(*inputs):
        ...

    result = udf(*operands)
    return result


@scalar_op_compiler.register_unary_op(ops.MapOp, pass_op=True)
def map_op_impl(x: ibis_types.Value, op: ops.MapOp):
case = ibis_api.case()
Expand Down Expand Up @@ -1918,19 +1959,6 @@ def manhattan_distance_impl(
return vector_distance(vector1, vector2, "MANHATTAN")


@scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True)
def binary_remote_function_op_impl(
x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp
):
ibis_node = getattr(op.func, "ibis_node", None)
if ibis_node is None:
raise TypeError(
f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
)
x_transformed = ibis_node(x, y)
return x_transformed


# Blob Ops
@scalar_op_compiler.register_binary_op(ops.obj_make_ref_op)
def obj_make_ref_op(x: ibis_types.Value, y: ibis_types.Value):
Expand Down Expand Up @@ -1992,19 +2020,6 @@ def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value:
return case_val.end() # type: ignore


@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True)
def nary_remote_function_op_impl(
*operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp
):
ibis_node = getattr(op.func, "ibis_node", None)
if ibis_node is None:
raise TypeError(
f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
)
result = ibis_node(*operands)
return result


@scalar_op_compiler.register_nary_op(ops.SqlScalarOp, pass_op=True)
def sql_scalar_op_impl(*operands: ibis_types.Value, op: ops.SqlScalarOp):
return ibis_generic.SqlScalar(
Expand Down
40 changes: 17 additions & 23 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.functions
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.ai
Expand Down Expand Up @@ -4470,15 +4471,17 @@ def _prepare_export(
return array_value, id_overrides

def map(self, func, na_action: Optional[str] = None) -> DataFrame:
if not callable(func):
if not isinstance(func, bigframes.functions.BigqueryCallableRoutine):
raise TypeError("the first argument must be callable")

if na_action not in {None, "ignore"}:
raise ValueError(f"na_action={na_action} not supported")

# TODO(shobs): Support **kwargs
return self._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None))
ops.RemoteFunctionOp(
function_def=func.udf_def, apply_on_null=(na_action is None)
)
)

def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
Expand All @@ -4492,13 +4495,18 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
)
warnings.warn(msg, category=bfe.FunctionAxisOnePreviewWarning)

if not hasattr(func, "bigframes_bigquery_function"):
if not isinstance(
func,
(
bigframes.functions.BigqueryCallableRoutine,
bigframes.functions.BigqueryCallableRowRoutine,
),
):
raise ValueError(
"For axis=1 a BigFrames BigQuery function must be used."
)

is_row_processor = getattr(func, "is_row_processor")
if is_row_processor:
if func.is_row_processor:
# Early check whether the dataframe dtypes are currently supported
# in the bigquery function
# NOTE: Keep in sync with the value converters used in the gcf code
Expand Down Expand Up @@ -4552,7 +4560,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):

# Apply the function
result_series = rows_as_json_series._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True)
)
else:
# This is a special case where we are providing not-pandas-like
Expand All @@ -4567,7 +4575,7 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
# compatible with the data types of the input params
# 3. The order of the columns in the dataframe must correspond
# to the order of the input params in the function
udf_input_dtypes = getattr(func, "input_dtypes")
udf_input_dtypes = func.udf_def.signature.bf_input_types
if len(udf_input_dtypes) != len(self.columns):
raise ValueError(
f"BigFrames BigQuery function takes {len(udf_input_dtypes)}"
Expand All @@ -4581,25 +4589,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):

series_list = [self[col] for col in self.columns]
result_series = series_list[0]._apply_nary_op(
ops.NaryRemoteFunctionOp(func=func), series_list[1:]
ops.NaryRemoteFunctionOp(function_def=func.udf_def), series_list[1:]
)
result_series.name = None

# If the result type is string but the function output is intended
# to be an array, reconstruct the array from the string assuming it
# is a json serialized form of the array.
if bigframes.dtypes.is_string_like(
result_series.dtype
) and bigframes.dtypes.is_array_like(func.output_dtype):
import bigframes.bigquery as bbq

result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
func.output_dtype.pyarrow_dtype.value_type
)
result_series = bbq.json_extract_string_array(
result_series, value_dtype=result_dtype
)

result_series = func._post_process_series(result_series)
return result_series

# At this point column-wise or element-wise bigquery function operation will
Expand Down
28 changes: 0 additions & 28 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,32 +870,4 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
return result


### Remote functions use only
# TODO: Refactor into remote function module

# Input and output types supported by BigQuery DataFrames remote functions.
# TODO(shobs): Extend the support to all types supported by BQ remote functions
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}

# Support array output types in BigQuery DataFrames remote functions even though
# it is not currently (2024-10-06) supported in BigQuery remote functions.
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
# TODO(b/284515241): remove this special handling when BigQuery remote functions
# support array.
RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES = {bool, float, int, str}

# BigQuery type kinds accepted for remote-function I/O. Includes the aliases
# BigQuery exposes for the same underlying type (BOOL/BOOLEAN, FLOAT/FLOAT64,
# INT64/INTEGER).
RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
    "BOOLEAN",
    "BOOL",
    "BYTES",
    "FLOAT",
    "FLOAT64",
    "INT64",
    "INTEGER",
    "STRING",
    "ARRAY",
}


TIMEDELTA_DESCRIPTION_TAG = "#microseconds"
9 changes: 9 additions & 0 deletions bigframes/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from bigframes.functions.function import (
BigqueryCallableRoutine,
BigqueryCallableRowRoutine,
)

__all__ = [
"BigqueryCallableRoutine",
"BigqueryCallableRowRoutine",
]
Loading