Skip to content

Commit fbe74da

Browse files
committed
address PR feedback
1 parent 9b231bb commit fbe74da

File tree

3 files changed

+302
-64
lines changed

3 files changed

+302
-64
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 5 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ use crate::{
2525
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
2626
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
2727
};
28-
use arrow::compute::can_cast_types;
29-
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3028
use datafusion_datasource::file_meta::FileMeta;
3129
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3230
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
@@ -539,7 +537,7 @@ fn should_enable_page_index(
539537
.unwrap_or(false)
540538
}
541539

542-
use datafusion_physical_expr::expressions;
540+
use datafusion_physical_expr::PhysicalExprSchemaRewriter;
543541

544542
/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
545543
/// is cast to the specified data type.
@@ -553,68 +551,11 @@ pub fn cast_expr_to_schema(
553551
partition_values: Vec<ScalarValue>,
554552
partition_fields: &[FieldRef],
555553
) -> Result<Arc<dyn PhysicalExpr>> {
556-
expr.transform(|expr| {
557-
if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
558-
let logical_field = match logical_file_schema.field_with_name(column.name()) {
559-
Ok(field) => field,
560-
Err(e) => {
561-
// If the column is a partition field, we can use the partition value
562-
for (partition_field, partition_value) in
563-
partition_fields.iter().zip(partition_values.iter())
564-
{
565-
if partition_field.name() == column.name() {
566-
return Ok(Transformed::yes(expressions::lit(
567-
partition_value.clone(),
568-
)));
569-
}
570-
}
571-
// If the column is not found in the logical schema, return an error
572-
// This should probably never be hit unless something upstream broke, but nonetheless it's better
573-
// for us to return a handleable error than to panic / do something unexpected.
574-
return Err(e.into());
575-
}
576-
};
577-
let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
578-
else {
579-
if !logical_field.is_nullable() {
580-
return exec_err!(
581-
"Non-nullable column '{}' is missing from the physical schema",
582-
column.name()
583-
);
584-
}
585-
// If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
586-
// TODO: do we need to sync this with what the `SchemaAdapter` actually does?
587-
// While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else!
588-
let value = ScalarValue::Null.cast_to(logical_field.data_type())?;
589-
return Ok(Transformed::yes(expressions::lit(value)));
590-
};
591-
592-
if logical_field.data_type() == physical_field.data_type() {
593-
return Ok(Transformed::no(expr));
594-
}
595-
596-
// If the logical field and physical field are different, we need to cast
597-
// the column to the logical field's data type.
598-
// We will try later to move the cast to literal values if possible, which is computationally cheaper.
599-
if !can_cast_types(logical_field.data_type(), physical_field.data_type()) {
600-
return exec_err!(
601-
"Cannot cast column '{}' from '{}' (file data type) to '{}' (table data type)",
602-
column.name(),
603-
logical_field.data_type(),
604-
physical_field.data_type()
605-
);
606-
}
607-
let casted_expr = Arc::new(expressions::CastExpr::new(
608-
expr,
609-
logical_field.data_type().clone(),
610-
None,
611-
));
612-
return Ok(Transformed::yes(casted_expr));
613-
}
554+
let rewriter =
555+
PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
556+
.with_partition_columns(partition_fields.to_vec(), partition_values);
614557

615-
Ok(Transformed::no(expr))
616-
})
617-
.data()
558+
rewriter.rewrite(expr)
618559
}
619560

620561
#[cfg(test)]

datafusion/physical-expr/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mod partitioning;
3737
mod physical_expr;
3838
pub mod planner;
3939
mod scalar_function;
40+
pub mod schema_rewriter;
4041
pub mod statistics;
4142
pub mod utils;
4243
pub mod window;
@@ -67,6 +68,7 @@ pub use datafusion_physical_expr_common::sort_expr::{
6768

6869
pub use planner::{create_physical_expr, create_physical_exprs};
6970
pub use scalar_function::ScalarFunctionExpr;
71+
pub use schema_rewriter::PhysicalExprSchemaRewriter;
7072
pub use utils::{conjunction, conjunction_opt, split_conjunction};
7173

7274
// For backwards compatibility

0 commit comments

Comments
 (0)