adapt filter expressions to file schema during parquet scan #16461


Open · wants to merge 18 commits into main

Conversation

@adriangb (Contributor) commented Jun 19, 2025

The idea here is to move us one step closer to #15780 although there is still work to do (e.g. #15780 (comment)).

My goal is that this immediately unblocks other work (in particular #15057) while being a relatively small change that we can build upon with more optimizations, etc.

@github-actions github-actions bot added the datasource Changes to the datasource crate label Jun 19, 2025
Comment on lines 553 to 556
return exec_err!(
"Non-nullable column '{}' is missing from the physical schema",
column.name()
);
@adriangb (Contributor, Author):

Might be useful to include some sort of file identifier here?

Contributor:

agreed -- that might also be a nice use case for a builder-style struct:

let new_predicate = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
  .with_identifier(file_name)
  .convert(predicate)?;

🤔

let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 1);
@adriangb (Contributor, Author) commented Jun 19, 2025:

This assertion fails on main: all 3 rows are passed because the row filter cannot handle the partition columns. This PR somewhat coincidentally happens to allow the row filter to handle predicates that depend on partition and data columns!

@adriangb adriangb requested a review from alamb June 19, 2025 19:20
@alamb (Contributor) left a comment:

Thank you @adriangb -- other than the removed test I think this PR makes sense to me and could be merged

I think it would be useful to refactor this code a bit and invest in testing infrastructure as we proceed towards adding nicer features (like unwrapping casts on columns for example)

/// Preference is always given to casting literal values to the data type of the column
/// since casting the column to the literal value's data type can be significantly more expensive.
/// Given two columns the cast is applied arbitrarily to the first column.
pub fn cast_expr_to_schema(
Contributor:

I think this is more general than just parquet, so perhaps we could move it into the datafusion-physical-schema crate or perhaps somewhere near the physical planner
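
For context, the literal-vs-column cast preference described in the doc comment above can be sketched as follows. This is a hypothetical, simplified model using placeholder `Expr` and `DataType` enums, not DataFusion's actual API: when a comparison mixes types, the cast is moved onto the literal (a one-time cost) instead of the column (a per-row cost).

```rust
// Hypothetical, simplified model of the cast-placement decision: all types
// here are stand-ins, not DataFusion's Expr/DataType.

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Int64,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, dtype: DataType },
    Literal { value: i64, dtype: DataType },
    Eq(Box<Expr>, Box<Expr>),
    Cast(Box<Expr>, DataType),
}

fn cast_expr_to_schema(expr: Expr) -> Expr {
    match expr {
        Expr::Eq(l, r) => match (*l, *r) {
            // Prefer casting the literal once to the column's type, rather
            // than casting every row of the column to the literal's type.
            (Expr::Column { name, dtype }, Expr::Literal { value, dtype: lit_ty })
                if dtype != lit_ty =>
            {
                let target = dtype.clone();
                Expr::Eq(
                    Box::new(Expr::Column { name, dtype }),
                    Box::new(Expr::Cast(
                        Box::new(Expr::Literal { value, dtype: lit_ty }),
                        target,
                    )),
                )
            }
            (l, r) => Expr::Eq(Box::new(l), Box::new(r)),
        },
        other => other,
    }
}

fn main() {
    let predicate = Expr::Eq(
        Box::new(Expr::Column { name: "a".into(), dtype: DataType::Int32 }),
        Box::new(Expr::Literal { value: 1, dtype: DataType::Int64 }),
    );
    // The cast lands on the literal, not the column.
    println!("{:?}", cast_expr_to_schema(predicate));
}
```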

Contributor:

I also think it would really help (perhaps as a follow on PR) to add some more specific unit tests.

I wonder if an API like the following makes sense:

struct PhysicalExprSchemaRewriter { 
...
}


// rewrite a predicate
let new_predicate = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema)
  // optionally provide partition values
  .with_partition_columns(partition_fields, partition_values)
  .convert(predicate)?;

Then I think writing unit tests would be easy and we could adapt / extend the code over time -- and it would set us up for adapting more sophisticated expressions like field extraction...
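
A plain-Rust sketch of that builder shape (placeholder types throughout -- the real rewriter would hold Arrow schemas and `Arc<dyn PhysicalExpr>`, and `convert` here is a stub rather than an actual rewrite):

```rust
// Hypothetical sketch of the proposed builder API. Schema and predicate are
// stand-in types; this is the shape of the API, not its implementation.

#[derive(Debug, Clone)]
struct Schema(Vec<String>); // stand-in for an Arrow schema

#[derive(Debug, Clone)]
struct PhysicalExprSchemaRewriter {
    physical_file_schema: Schema,
    logical_file_schema: Schema,
    partition_columns: Option<(Vec<String>, Vec<String>)>,
}

impl PhysicalExprSchemaRewriter {
    fn new(physical_file_schema: Schema, logical_file_schema: Schema) -> Self {
        Self {
            physical_file_schema,
            logical_file_schema,
            partition_columns: None,
        }
    }

    // Optionally provide partition fields and this file's partition values.
    fn with_partition_columns(mut self, fields: Vec<String>, values: Vec<String>) -> Self {
        self.partition_columns = Some((fields, values));
        self
    }

    // Stub: a real implementation would walk the predicate tree, replace
    // partition columns with literals, and insert casts where the physical
    // and logical schemas disagree.
    fn convert(self, predicate: String) -> Result<String, String> {
        let _ = (&self.physical_file_schema, &self.logical_file_schema);
        Ok(predicate)
    }
}

fn main() {
    let new_predicate = PhysicalExprSchemaRewriter::new(
        Schema(vec!["a".into()]),
        Schema(vec!["a".into(), "part".into()]),
    )
    .with_partition_columns(vec!["part".into()], vec!["2025-06-19".into()])
    .convert("a = 1".into())
    .unwrap();
    println!("{new_predicate}");
}
```

With the state captured in one struct, each unit test becomes: build a rewriter with two small schemas, convert a predicate, and assert on the result.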

@adriangb (Contributor, Author):

Sounds good to me, I will work on this next! Agreed on the unit tests 😄


assert!(candidate.is_none());
}

#[test]
fn test_filter_type_coercion() {
Contributor:

Is there any way to keep this test (or something like it) that shows reading with a predicate of a different schema is correctly coerced?

@adriangb (Contributor, Author):

I think so - I'll have to make it more end-to-end since it is no longer specific to the row filter. We have some other similar tests; I'll work this into those.

Comment on lines 557 to 558
if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
let logical_field = match logical_file_schema.field_with_name(column.name()) {
Contributor:

nit: we could reduce a level of nesting with something like

Suggested change
if let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() {
let logical_field = match logical_file_schema.field_with_name(column.name()) {
let Some(column) = expr.as_any().downcast_ref::<expressions::Column>() else {
    return Ok(Transformed::no(expr));
};
// If the column is not found in the logical schema, return an error
// This should probably never be hit unless something upstream broke, but nonetheless it's better
// for us to return a handleable error than to panic / do something unexpected.
Contributor:

👍


// If the logical field and physical field are different, we need to cast
// the column to the logical field's data type.
// We will try later to move the cast to literal values if possible, which is computationally cheaper.
Contributor:

👍

@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label Jun 21, 2025
@adriangb (Contributor, Author):

@alamb I've created the builder, moved the implementation, and added some unit tests.

Labels: datasource (Changes to the datasource crate), physical-expr (Changes to the physical-expr crates)
2 participants