Skip to content

feat: rewrite subquery into dependent join logical plan #16016

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 82 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
4ba36c0
chore: add test
duongcongtoai Feb 3, 2025
79eaca3
chore: more progress
duongcongtoai Feb 10, 2025
7ed0831
temp
duongcongtoai Mar 18, 2025
cc97879
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Mar 18, 2025
5096937
Merge remote-tracking branch 'origin/main' into 14554-unnest-subquery…
duongcongtoai Apr 10, 2025
68fd9ca
chore: some work
duongcongtoai Apr 16, 2025
ace332e
chore: some work on indexed algebra
duongcongtoai Apr 27, 2025
da8980c
chore: more progress
duongcongtoai May 4, 2025
483e3ac
chore: impl projection pull up
duongcongtoai May 4, 2025
f14b145
chore: complete unnesting simple subquery
duongcongtoai May 6, 2025
0cd8143
chore: correct join condition
duongcongtoai May 8, 2025
cc3e01c
chore: handle exist query
duongcongtoai May 8, 2025
9b5daa2
test: in sq test
duongcongtoai May 10, 2025
f26baf8
test: exist with no dependent column
duongcongtoai May 10, 2025
37852c1
test: exist with dependent columns
duongcongtoai May 10, 2025
2544478
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 10, 2025
e984a55
chore: remove redundant clone
duongcongtoai May 11, 2025
94aba08
feat: dummy implementation for aggregation
duongcongtoai May 13, 2025
0f039fe
feat: handle count bug
duongcongtoai May 15, 2025
898bdc4
feat: add sq alias step
duongcongtoai May 16, 2025
1a600b6
test: simple count decorrelate
duongcongtoai May 16, 2025
6ce21b3
chore: some work to support multiple subqueries per level
duongcongtoai May 17, 2025
67923d4
feat: support multiple subqueries decorrelation untested
duongcongtoai May 19, 2025
64538cc
feat: correct node rewriting rule
duongcongtoai May 19, 2025
957403f
fix: subquery alias
duongcongtoai May 19, 2025
a465459
fix: adjust test case expectation
duongcongtoai May 19, 2025
479ae64
feat: convert sq to dependent joins
duongcongtoai May 24, 2025
2171e52
feat: impl dependent join rewriter
duongcongtoai May 24, 2025
9d26437
chore: clean up unused function
duongcongtoai May 24, 2025
24d1223
chore: clean up debug slt
duongcongtoai May 24, 2025
3533cd1
chore: simple logical plan type for dependent join
duongcongtoai May 24, 2025
e1002f8
fix: recursive dependent join rewrite
duongcongtoai May 24, 2025
7ba92f1
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai May 24, 2025
e3c77d6
chore: some more note on further implementation
duongcongtoai May 24, 2025
1ae0926
chore: lint
duongcongtoai May 24, 2025
d15c2aa
chore: clippy
duongcongtoai May 24, 2025
e5baf2c
fix: test
duongcongtoai May 25, 2025
11dbb80
doc: draw diagram
duongcongtoai May 25, 2025
5856213
fix: proto
duongcongtoai May 25, 2025
a3f11a8
chore: revert unrelated change
duongcongtoai May 25, 2025
e2d9d14
chore: lint
duongcongtoai May 25, 2025
b298426
fix: subtrait
duongcongtoai May 25, 2025
cb1a757
fix: subtrait again
duongcongtoai May 25, 2025
baef066
fix: fail test
duongcongtoai May 25, 2025
a07b3b0
chore: clippy
duongcongtoai May 25, 2025
32db3a9
chore: add depth and data_type to correlated columns
duongcongtoai May 26, 2025
50d26f3
chore: rm snapshot
duongcongtoai May 26, 2025
b09e370
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai May 26, 2025
28dc7a4
feat: support alias and join
duongcongtoai May 26, 2025
cf830cb
feat: add lateral join fields to dependent join
duongcongtoai May 26, 2025
95994da
feat: rewrite lateral join
duongcongtoai May 27, 2025
9745a4f
feat: rewrite projection
duongcongtoai May 28, 2025
c2bf4d3
refactor: split rewrite logic
duongcongtoai May 28, 2025
c083501
feat: impl other api of logical plan for dependent join
duongcongtoai May 28, 2025
9512ccc
chore: rm debug file
duongcongtoai May 28, 2025
98d1c27
fix: not expose subquery expr for dependentjoin
duongcongtoai May 29, 2025
10f9aeb
chore: add data type to correlated column
duongcongtoai Jun 7, 2025
92bb175
fix: not expose subquery expr for dependentjoin
duongcongtoai May 29, 2025
29eff4b
spilt into rewrite_dependent_join & decorrelate_dependent_join
irenjj Jun 6, 2025
f4e332e
fix: cherry-pick conflict
duongcongtoai Jun 7, 2025
2a324bd
chore: move left over commit from feature branch
duongcongtoai Jun 7, 2025
f0c9f0b
chore: minor import format
duongcongtoai Jun 7, 2025
5e67945
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai Jun 7, 2025
e964d6e
chore: clippy
duongcongtoai Jun 7, 2025
309511c
Merge remote-tracking branch 'origin/main' into 14554-subquery-unnest…
duongcongtoai Jun 7, 2025
2eb723e
fix: err msg
duongcongtoai Jun 7, 2025
b8a8de8
test: some more test cases
duongcongtoai Jun 7, 2025
a3d0b65
refactor: shared rewrite function
duongcongtoai Jun 7, 2025
8e858b4
refactor: remove all unwrap
duongcongtoai Jun 7, 2025
30300d1
fix: test expectation
duongcongtoai Jun 7, 2025
a93f901
fix subquery in join filter
irenjj Jun 8, 2025
4aed14f
rename
irenjj Jun 8, 2025
6f2ce78
add todo
irenjj Jun 8, 2025
7534a49
Merge pull request #9 from irenjj/subquery_in_join_filter
duongcongtoai Jun 8, 2025
c330c24
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai Jun 8, 2025
5be430a
chore: more constraint on correlated subquery in join filter
duongcongtoai Jun 8, 2025
7dc3dd9
Merge pull request #10 from duongcongtoai/dependent-join-multiple-sub…
duongcongtoai Jun 9, 2025
dc656f0
chore: try fix snapshot
duongcongtoai Jun 9, 2025
612ae33
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai Jun 9, 2025
f4c4ec0
chore: use normal assert
duongcongtoai Jun 10, 2025
f1f4529
Merge branch 'main' into 14554-subquery-unnest-framework
duongcongtoai Jun 10, 2025
0f5278f
fix: correct err assert
duongcongtoai Jun 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,11 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: Analyze must be root of the plan"
)
}
LogicalPlan::DependentJoin(_) => {
return internal_err!(
"Optimizors have not completely remove dependent join"
)
}
};
Ok(exec_node)
}
Expand Down
69 changes: 65 additions & 4 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use crate::expr_rewriter::{
rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
Window,
Aggregate, Analyze, DependentJoin, Distinct, DistinctOn, EmptyRelation, Explain,
Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Prepare, Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest,
Values, Window,
};
use crate::select_expr::SelectExpr;
use crate::utils::{
Expand Down Expand Up @@ -883,6 +883,47 @@ impl LogicalPlanBuilder {
))))
}

/// Build a dependent join provided a subquery plan
/// this function should only be used by the optimizor
/// a dependent join node will provides all columns belonging to the LHS
/// and one additional column as the result of evaluating the subquery on the RHS
/// under the name "subquery_name.output"
pub fn dependent_join(
self,
right: LogicalPlan,
correlated_columns: Vec<(usize, Column, DataType)>,
subquery_expr: Option<Expr>,
subquery_depth: usize,
subquery_name: String,
lateral_join_condition: Option<(JoinType, Expr)>,
) -> Result<Self> {
let left = self.build()?;
let schema = left.schema();
// TODO: for lateral join, output schema is similar to a normal join
let qualified_fields = schema
.iter()
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.chain(
subquery_expr
.iter()
.map(|expr| subquery_output_field(&subquery_name, expr)),
)
.collect();
let metadata = schema.metadata().clone();
let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;

Ok(Self::new(LogicalPlan::DependentJoin(DependentJoin {
schema: DFSchemaRef::new(dfschema),
left: Arc::new(left),
right: Arc::new(right),
correlated_columns,
subquery_expr,
subquery_name,
subquery_depth,
lateral_join_condition,
})))
}

/// Apply a join to `right` using explicitly specified columns and an
/// optional filter expression.
///
Expand Down Expand Up @@ -1547,6 +1588,26 @@ fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
)
}

fn subquery_output_field(
subquery_alias: &str,
subquery_expr: &Expr,
) -> (Option<TableReference>, Arc<Field>) {
// TODO: check nullability
let field = match subquery_expr {
Expr::InSubquery(_) => Arc::new(Field::new("output", DataType::Boolean, false)),
Expr::Exists(_) => Arc::new(Field::new("output", DataType::Boolean, false)),
Expr::ScalarSubquery(sq) => {
let data_type = sq.subquery.schema().field(0).data_type().clone();
Arc::new(Field::new("output", data_type, false))
}
_ => {
unreachable!()
}
};

(Some(TableReference::bare(subquery_alias)), field)
}

/// Creates a schema for a join operation.
/// The fields from the left side are first
pub fn build_join_schema(
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {

object
}
LogicalPlan::DependentJoin(..) => json!({}),
LogicalPlan::Join(Join {
on: ref keys,
filter,
Expand Down
25 changes: 17 additions & 8 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,27 @@ pub fn check_subquery_expr(
}?;
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => {
| LogicalPlan::Filter(_)
| LogicalPlan::DependentJoin(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
}) => {
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
"Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"
"Correlated scalar subquery in the GROUP BY clause must \
also be in the aggregate expressions"
)
} else {
Ok(())
}
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
)
"Correlated scalar subquery can only be used in Projection, Filter, \
Aggregate, DependentJoin plan nodes"
),
}?;
}
check_correlations_in_subquery(inner_plan)
Expand All @@ -235,11 +242,12 @@ pub fn check_subquery_expr(
| LogicalPlan::TableScan(_)
| LogicalPlan::Window(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Join(_) => Ok(()),
| LogicalPlan::Join(_)
| LogicalPlan::DependentJoin(_) => Ok(()),
_ => plan_err!(
"In/Exist subquery can only be used in \
Projection, Filter, TableScan, Window functions, Aggregate and Join plan nodes, \
but was used in [{}]",
Projection, Filter, TableScan, Window functions, Aggregate, Join and \
Dependent Join plan nodes, but was used in [{}]",
outer_plan.display()
),
}?;
Expand Down Expand Up @@ -323,6 +331,7 @@ fn check_inner_plan(inner_plan: &LogicalPlan) -> Result<()> {
}
},
LogicalPlan::Extension(_) => Ok(()),
LogicalPlan::DependentJoin(_) => Ok(()),
plan => check_no_outer_references(plan),
}
}
Expand Down
11 changes: 6 additions & 5 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, ExplainFormat, Extension, FetchType, Filter,
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema, Aggregate, Analyze, ColumnUnnestList, DependentJoin,
DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainFormat,
Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Projection, RecursiveQuery, Repartition, SkipType, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
Unnest, Values, Window,
};
pub use statement::{
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
Expand Down
105 changes: 105 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,99 @@ pub enum LogicalPlan {
Unnest(Unnest),
/// A variadic query (e.g. "Recursive CTEs")
RecursiveQuery(RecursiveQuery),
/// A node type that only exist during subquery decorrelation
/// TODO: maybe we can avoid creating new type of LogicalPlan for this usecase
DependentJoin(DependentJoin),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DependentJoin {
pub schema: DFSchemaRef,
// All combinations of (subquery depth,Column and its DataType) on the RHS (and its descendant)
// which points to a column on the LHS of this dependent join
// Note that not all outer_refs from the RHS are mentioned in this vectors
// because RHS may reference columns provided somewhere from the above parent dependent join.
// Depths of each correlated_columns should always be gte current dependent join
// subquery_depth
pub correlated_columns: Vec<(usize, Column, DataType)>,
// the upper expr that containing the subquery expr
// i.e for predicates: where outer = scalar_sq + 1
// correlated exprs are `scalar_sq + 1`
pub subquery_expr: Option<Expr>,
// begins with depth = 1
pub subquery_depth: usize,
pub left: Arc<LogicalPlan>,
// dependent side accessing columns from left hand side (and maybe columns)
// belong to the parent dependent join node in case of recursion)
pub right: Arc<LogicalPlan>,
pub subquery_name: String,

pub lateral_join_condition: Option<(JoinType, Expr)>,
}

impl Display for DependentJoin {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let correlated_str = self
.correlated_columns
.iter()
.map(|(level, col, _)| format!("{col} lvl {level}"))
.collect::<Vec<String>>()
.join(", ");
let lateral_join_info =
if let Some((join_type, join_expr)) = &self.lateral_join_condition {
format!(" lateral {join_type} join with {join_expr}")
} else {
"".to_string()
};
let subquery_expr_str = if let Some(expr) = &self.subquery_expr {
format!(" with expr {expr}")
} else {
"".to_string()
};
write!(
f,
"DependentJoin on [{correlated_str}]{subquery_expr_str}\
{lateral_join_info} depth {0}",
self.subquery_depth,
)
}
}

impl PartialOrd for DependentJoin {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
#[derive(PartialEq, PartialOrd)]
struct ComparableJoin<'a> {
correlated_columns: &'a Vec<(usize, Column, DataType)>,
// the upper expr that containing the subquery expr
// i.e for predicates: where outer = scalar_sq + 1
// correlated exprs are `scalar_sq + 1`
subquery_expr: &'a Option<Expr>,

depth: &'a usize,
left: &'a Arc<LogicalPlan>,
// dependent side accessing columns from left hand side (and maybe columns)
// belong to the parent dependent join node in case of recursion)
right: &'a Arc<LogicalPlan>,
lateral_join_condition: &'a Option<(JoinType, Expr)>,
}
let comparable_self = ComparableJoin {
left: &self.left,
right: &self.right,
correlated_columns: &self.correlated_columns,
subquery_expr: &self.subquery_expr,
depth: &self.subquery_depth,
lateral_join_condition: &self.lateral_join_condition,
};
let comparable_other = ComparableJoin {
left: &other.left,
right: &other.right,
correlated_columns: &other.correlated_columns,
subquery_expr: &other.subquery_expr,
depth: &other.subquery_depth,
lateral_join_condition: &other.lateral_join_condition,
};
comparable_self.partial_cmp(&comparable_other)
}
}

impl Default for LogicalPlan {
Expand Down Expand Up @@ -318,6 +411,7 @@ impl LogicalPlan {
/// Get a reference to the logical plan's schema
pub fn schema(&self) -> &DFSchemaRef {
match self {
LogicalPlan::DependentJoin(DependentJoin { schema, .. }) => schema,
LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
LogicalPlan::Values(Values { schema, .. }) => schema,
LogicalPlan::TableScan(TableScan {
Expand Down Expand Up @@ -452,6 +546,9 @@ impl LogicalPlan {
LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
LogicalPlan::Sort(Sort { input, .. }) => vec![input],
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::DependentJoin(DependentJoin { left, right, .. }) => {
vec![left, right]
}
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
Expand Down Expand Up @@ -540,6 +637,7 @@ impl LogicalPlan {
| LogicalPlan::Limit(Limit { input, .. })
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
LogicalPlan::DependentJoin(..) => todo!(),
LogicalPlan::Join(Join {
left,
right,
Expand Down Expand Up @@ -647,6 +745,7 @@ impl LogicalPlan {
}) => Aggregate::try_new(input, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate),
LogicalPlan::Sort(_) => Ok(self),
LogicalPlan::DependentJoin(_) => todo!(),
LogicalPlan::Join(Join {
left,
right,
Expand Down Expand Up @@ -1138,6 +1237,7 @@ impl LogicalPlan {
unnest_with_options(input, columns.clone(), options.clone())?;
Ok(new_plan)
}
LogicalPlan::DependentJoin(_) => todo!(),
}
}

Expand Down Expand Up @@ -1290,6 +1390,7 @@ impl LogicalPlan {
/// If `Some(n)` then the plan can return at most `n` rows but may return fewer.
pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
match self {
LogicalPlan::DependentJoin(DependentJoin { left, .. }) => left.max_rows(),
LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
LogicalPlan::Filter(filter) => {
if filter.is_scalar() {
Expand Down Expand Up @@ -1882,6 +1983,10 @@ impl LogicalPlan {

Ok(())
}

LogicalPlan::DependentJoin(dependent_join) => {
Display::fmt(dependent_join, f)
},
LogicalPlan::Join(Join {
on: ref keys,
filter,
Expand Down
Loading
Loading