diff --git a/dask_sql/physical/rel/custom/create_model.py b/dask_sql/physical/rel/custom/create_model.py
index 2e6cdeb0a..1a8f89846 100644
--- a/dask_sql/physical/rel/custom/create_model.py
+++ b/dask_sql/physical/rel/custom/create_model.py
@@ -1,6 +1,7 @@
 import logging
 from typing import TYPE_CHECKING
 
+import numpy as np
 from dask import delayed
 
 from dask_sql.datacontainer import DataContainer
@@ -183,7 +184,16 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
             delayed_model = [
                 delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)
             ]
             model = delayed_model[0].compute()
-            model = ParallelPostFit(estimator=model)
+            if "sklearn" in model_class:
+                output_meta = np.array([])
+                model = ParallelPostFit(
+                    estimator=model,
+                    predict_meta=output_meta,
+                    predict_proba_meta=output_meta,
+                    transform_meta=output_meta,
+                )
+            else:
+                model = ParallelPostFit(estimator=model)
         else:
             model.fit(X, y, **fit_kwargs)
diff --git a/dask_sql/physical/rel/custom/predict.py b/dask_sql/physical/rel/custom/predict.py
index eb5e4b69f..dfdf29a25 100644
--- a/dask_sql/physical/rel/custom/predict.py
+++ b/dask_sql/physical/rel/custom/predict.py
@@ -2,6 +2,15 @@
 import uuid
 from typing import TYPE_CHECKING
 
+import numpy as np
+
+try:
+    from dask_ml.wrappers import ParallelPostFit
+
+    dask_ml_flag = True
+except ImportError:  # pragma: no cover
+    dask_ml_flag = False
+
 from dask_sql.datacontainer import ColumnContainer, DataContainer
 from dask_sql.physical.rel.base import BaseRelPlugin
 
@@ -59,7 +68,27 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
         model, training_columns = context.schema[schema_name].models[model_name]
         df = context.sql(sql_select)
-        prediction = model.predict(df[training_columns])
+        part = df[training_columns]
+
+        if dask_ml_flag:
+            if isinstance(model, ParallelPostFit):
+                output_meta = model.predict_meta
+                if output_meta is None:
+                    output_meta = model.estimator.predict(part._meta_nonempty)
+                try:
+                    prediction = part.map_partitions(
+                        self._predict,
+                        output_meta,
+                        model.estimator,
+                        meta=output_meta,
+                    )
+                except ValueError:
+                    prediction = model.predict(part)
+            else:
+                prediction = model.predict(part)
+        else:
+            prediction = model.predict(part)
+
         predicted_df = df.assign(target=prediction)
 
         # Create a temporary context, which includes the
@@ -79,3 +108,39 @@
         dc = DataContainer(predicted_df, cc)
 
         return dc
+
+    def _predict(self, part, predict_meta, estimator):
+        if part.shape[0] == 0 and predict_meta is not None:
+            empty_output = self.handle_empty_partitions(predict_meta)
+            if empty_output is not None:
+                return empty_output
+        return estimator.predict(part)
+
+    def handle_empty_partitions(self, output_meta):
+        if hasattr(output_meta, "__array_function__"):
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+            ar = np.zeros(
+                shape=shape,
+                dtype=output_meta.dtype,
+                like=output_meta,
+            )
+            return ar
+        elif "scipy.sparse" in type(output_meta).__module__:
+            # Sparse matrices don't support `like` because they have a
+            # non-implemented __array_function__, see
+            # https://github.com/scipy/scipy/issues/10362
+            # Note: the construction below works for both cupy and scipy
+            # sparse matrices
+            if len(output_meta.shape) == 1:
+                shape = 0
+            else:
+                shape = list(output_meta.shape)
+                shape[0] = 0
+
+            ar = type(output_meta)(shape, dtype=output_meta.dtype)
+            return ar
+        elif hasattr(output_meta, "iloc"):
+            return output_meta.iloc[:0, :]
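As context for the `_predict` and `handle_empty_partitions` helpers above, here is a minimal standalone sketch (plain NumPy and scikit-learn, independent of dask-sql) of the failure mode they guard against: dask infers output metadata by evaluating functions on zero-row chunks, and many scikit-learn estimators reject zero-sample input, so the wrapper short-circuits and fabricates an empty, dtype-correct result instead of calling `predict`.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

# Train a small model; any scikit-learn estimator works for this demo.
X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])
est = LogisticRegression().fit(X, y)

# A zero-row chunk, as produced during dask's meta inference,
# is rejected by most estimators:
try:
    est.predict(np.empty((0, 1)))
except ValueError as exc:
    print(f"empty input rejected: {exc}")

# The handle_empty_partitions path instead builds an empty result whose
# dtype (and array backend, via `like=`) matches a known-good prediction:
meta = est.predict(X[:1])
empty = np.zeros(shape=0, dtype=meta.dtype, like=meta)
print(empty.shape, empty.dtype)  # (0,) plus the prediction dtype
```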
diff --git a/tests/integration/test_model.py b/tests/integration/test_model.py
index 044a56fcc..8c97b770c 100644
--- a/tests/integration/test_model.py
+++ b/tests/integration/test_model.py
@@ -924,3 +924,79 @@ def test_experiment_automl_regressor(c, client, training_df):
     ), "Best model was not registered"
 
     check_trained_model(c, "my_automl_exp2")
+
+
+# TODO - many ML tests fail on clusters without sklearn - can we avoid this?
+@skip_if_external_scheduler
+def test_predict_with_nullable_types(c):
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": [0, 1, 2, 3],
+            "prev_day_inches_rained": [0.0, 1.0, 2.0, 3.0],
+            "rained": [False, False, False, True],
+        }
+    )
+    c.create_table("train_set", df)
+
+    model_class = "'sklearn.linear_model.LogisticRegression'"
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    expected = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    df = pd.DataFrame(
+        {
+            "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"),
+            "prev_day_inches_rained": pd.Series([0.0, 1.0, 2.0, 3.0], dtype="Float32"),
+            "rained": pd.Series([False, False, False, True]),
+        }
+    )
+    c.create_table("train_set", df)
+
+    c.sql(
+        f"""
+        CREATE OR REPLACE MODEL model WITH (
+            model_class = {model_class},
+            wrap_predict = True,
+            wrap_fit = False,
+            target_column = 'rained'
+        ) AS (
+            SELECT *
+            FROM train_set
+        )
+        """
+    )
+
+    result = c.sql(
+        """
+        SELECT * FROM PREDICT(
+            MODEL model,
+            SELECT * FROM train_set
+        )
+        """
+    )
+
+    assert_eq(
+        expected,
+        result,
+        check_dtype=False,
+    )
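As background for the new test, a small pandas-only illustration (not part of the patch) of why nullable extension dtypes such as `Int32` are awkward here: they have no native NumPy representation, so conversions can land on `object` or a different numeric dtype, which is exactly the kind of drift that motivates the explicit `predict_meta` and the test's `check_dtype=False`.

```python
import numpy as np
import pandas as pd

# A nullable integer column like the test's "rough_day_of_year".
s = pd.Series([0, 1, None, 3], dtype="Int32")
print(s.dtype)             # Int32
print(s.to_numpy().dtype)  # object -- pd.NA has no native NumPy counterpart

# An explicit conversion is needed to land on a concrete numeric dtype:
print(s.to_numpy(dtype="float64", na_value=np.nan))  # [ 0.  1. nan  3.]
```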