From e32ae3f5843c5c8b103a4788812edb3313e27df8 Mon Sep 17 00:00:00 2001 From: irenjj Date: Thu, 15 May 2025 22:46:49 +0800 Subject: [PATCH 1/7] Fix Correlated Subquery With Depth Larger Than One --- datafusion/sql/src/expr/subquery.rs | 5 ++--- datafusion/sql/src/planner.rs | 8 ++++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 602d39233d58..7caa3f6cf724 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -16,6 +16,7 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use arrow::datatypes::Field; use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans}; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::{Expr, LogicalPlan, Subquery}; @@ -98,8 +99,7 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + planner_context.extend_outer_query_schema(&Arc::new(input_schema.clone()))?; let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -112,7 +112,6 @@ impl SqlToRel<'_, S> { } let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); self.validate_single_column( &sub_plan, diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 3325c98aa74b..0c06ff8294cb 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -249,6 +249,14 @@ impl PlannerContext { schema } + pub fn extend_outer_query_schema(&mut self, schema: &DFSchemaRef) -> Result<()> { + match self.outer_query_schema.as_mut() { + Some(from_schema) => Arc::make_mut(from_schema).merge(schema), + None => self.outer_query_schema = Some(Arc::clone(schema)), + }; + Ok(()) + } + pub fn set_table_schema( &mut self, mut schema: Option, From f3d1c310abf63948dd8afc406981395c60f70727 Mon Sep 17 00:00:00 2001 From: irenjj Date: Sun, 18 May 2025 19:28:28 +0800 Subject: [PATCH 2/7] Make planner_context perceivable by create_plan --- benchmarks/src/cancellation.rs | 6 ++- datafusion-cli/src/exec.rs | 20 ++++++-- datafusion-cli/src/object_storage.rs | 25 ++++++++-- datafusion/core/src/execution/context/mod.rs | 6 ++- .../core/src/execution/session_state.rs | 6 ++- datafusion/core/tests/sql/sql_api.rs | 10 +++- .../tests/cases/roundtrip_logical_plan.rs | 5 +- datafusion/sql/src/expr/subquery.rs | 2 +- datafusion/sql/src/statement.rs | 48 +++++++++++++------ 9 files changed, 96 insertions(+), 32 deletions(-) diff --git a/benchmarks/src/cancellation.rs b/benchmarks/src/cancellation.rs index f5740bdc96e0..27bd2179890b 100644 --- a/benchmarks/src/cancellation.rs +++ b/benchmarks/src/cancellation.rs @@ -33,6 +33,7 @@ use datafusion::execution::TaskContext; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; +use datafusion::sql::planner::PlannerContext; use datafusion_common::instant::Instant; use futures::TryStreamExt; use object_store::ObjectStore; @@ -185,7 +186,10 @@ async fn datafusion(store: Arc) -> Result<()> { .await?; println!("Creating logical plan..."); - let logical_plan = ctx.state().create_logical_plan(query).await?; + let logical_plan = ctx + .state() + .create_logical_plan(query, &mut PlannerContext::new()) + .await?; println!("Creating physical plan..."); let physical_plan = Arc::new(CoalescePartitionsExec::new( diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index fa4cd9c5fd3b..e664d73744d9 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -26,6 +26,7 @@ use crate::{ object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, }; +use datafusion::sql::planner::PlannerContext; use futures::StreamExt; use std::collections::HashMap; use std::fs::File; @@ -231,7 +232,8 @@ pub(super) async fn exec_and_print( let adjusted = AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement); - let plan = create_plan(ctx, statement).await?; + let mut planner_context = PlannerContext::new(); + let plan = create_plan(ctx, statement, &mut planner_context).await?; let adjusted = adjusted.with_plan(&plan); let df = ctx.execute_logical_plan(plan).await?; @@ -348,8 +350,12 @@ fn config_file_type_from_str(ext: &str) -> Option { async fn create_plan( ctx: &dyn CliSessionContext, statement: Statement, + planner_context: &mut PlannerContext, ) -> Result { - let mut plan = ctx.session_state().statement_to_plan(statement).await?; + let mut plan = ctx + .session_state() + .statement_to_plan(statement, planner_context) + .await?; // Note that cmd is a mutable reference so that create_external_table function can remove all // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion @@ -453,7 +459,10 @@ mod tests { async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); - let plan = ctx.state().create_logical_plan(sql).await?; + let plan = ctx + .state() + .create_logical_plan(sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { let format = config_file_type_from_str(&cmd.file_type); @@ -479,7 +488,10 @@ mod tests { let ctx = SessionContext::new(); // AWS CONFIG register. - let plan = ctx.state().create_logical_plan(sql).await?; + let plan = ctx + .state() + .create_logical_plan(sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Copy(cmd) = &plan { let format = config_file_type_from_str(&cmd.file_type.get_ext()); diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index c31310093ac6..642fc7716322 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -493,7 +493,10 @@ mod tests { ); let ctx = SessionContext::new(); - let mut plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx + .state() + .create_logical_plan(&sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { ctx.register_table_options_extension_from_scheme(scheme); @@ -538,7 +541,10 @@ mod tests { ); let ctx = SessionContext::new(); - let mut plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx + .state() + .create_logical_plan(&sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { ctx.register_table_options_extension_from_scheme(scheme); @@ -564,7 +570,10 @@ mod tests { ) LOCATION '{location}'" ); - let mut plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx + .state() + .create_logical_plan(&sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { ctx.register_table_options_extension_from_scheme(scheme); @@ -592,7 +601,10 @@ mod tests { let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.oss.endpoint' '{endpoint}') LOCATION '{location}'"); let ctx = SessionContext::new(); - let mut plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx + .state() + .create_logical_plan(&sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { ctx.register_table_options_extension_from_scheme(scheme); @@ -629,7 +641,10 @@ mod tests { let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET OPTIONS('gcp.service_account_path' '{service_account_path}', 'gcp.service_account_key' '{service_account_key}', 'gcp.application_credentials_path' '{application_credentials_path}') LOCATION '{location}'"); let ctx = SessionContext::new(); - let mut plan = ctx.state().create_logical_plan(&sql).await?; + let mut plan = ctx + .state() + .create_logical_plan(&sql, &mut PlannerContext::new()) + .await?; if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { ctx.register_table_options_extension_from_scheme(scheme); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index ecc3bd2990f4..81e37d9d7f95 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -87,6 +87,7 @@ use datafusion_session::SessionStore; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_sql::planner::PlannerContext; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; @@ -620,7 +621,10 @@ impl SessionContext { sql: &str, options: SQLOptions, ) -> Result { - let plan = self.state().create_logical_plan(sql).await?; + let plan = self + .state() + .create_logical_plan(sql, &mut PlannerContext::new()) + .await?; options.verify_plan(&plan)?; self.execute_logical_plan(plan).await diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 8aa812cc5258..699e56127beb 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -461,6 +461,7 @@ impl SessionState { pub async fn statement_to_plan( &self, statement: Statement, + planner_context: &mut PlannerContext, ) -> datafusion_common::Result { let references = self.resolve_table_references(&statement)?; @@ -482,7 +483,7 @@ impl SessionState { } let query = SqlToRel::new_with_options(&provider, self.get_parser_options()); - query.statement_to_plan(statement) + query.statement_to_plan(statement, planner_context) } fn get_parser_options(&self) -> ParserOptions { @@ -514,10 +515,11 @@ impl SessionState { pub async fn create_logical_plan( &self, sql: &str, + planner_context: &mut PlannerContext, ) -> datafusion_common::Result { let dialect = self.config.options().sql_parser.dialect.as_str(); let statement = self.sql_to_statement(sql, dialect)?; - let plan = self.statement_to_plan(statement).await?; + let plan = self.statement_to_plan(statement, planner_context).await?; Ok(plan) } diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index ec086bcc50c7..fa7199d2c6ca 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -162,7 +162,9 @@ async fn empty_statement_returns_error() { let state = ctx.state(); // Give it an empty string which contains no statements - let plan_res = state.create_logical_plan("").await; + let plan_res = state + .create_logical_plan("", &mut PlannerContext::new()) + .await; assert_eq!( plan_res.unwrap_err().strip_backtrace(), "Error during planning: No SQL statements were provided in the query string" @@ -180,6 +182,7 @@ async fn multiple_statements_returns_error() { let plan_res = state .create_logical_plan( "INSERT INTO test (x) VALUES (1); INSERT INTO test (x) VALUES (2)", + &mut PlannerContext::new(), ) .await; assert_eq!( @@ -199,7 +202,10 @@ async fn ddl_can_not_be_planned_by_session_state() { // can not create a logical plan for catalog DDL let sql = "DROP TABLE test"; - let plan = state.create_logical_plan(sql).await.unwrap(); + let plan = state + .create_logical_plan(sql, &mut PlannerContext::new()) + .await + .unwrap(); let physical_plan = state.create_physical_plan(&plan).await; assert_eq!( physical_plan.unwrap_err().strip_backtrace(), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index eee1bc23c9ad..ed73dea2206e 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -288,7 +288,10 @@ async fn roundtrip_custom_listing_tables() -> Result<()> { LOCATION '../core/tests/data/window_2.csv' OPTIONS ('format.has_header' 'true')"; - let plan = ctx.state().create_logical_plan(query).await?; + let plan = ctx + .state() + .create_logical_plan(query, &mut PlannerContext::new()) + .await?; let bytes = logical_plan_to_bytes(&plan)?; let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 7caa3f6cf724..8f43538314c8 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -16,7 +16,6 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use arrow::datatypes::Field; use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans}; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::{Expr, LogicalPlan, Subquery}; @@ -112,6 +111,7 @@ impl SqlToRel<'_, S> { } let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); + dbg!(&outer_ref_columns); self.validate_single_column( &sub_plan, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 1f1c235fee6f..73e0275e7472 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -177,26 +177,37 @@ fn calc_inline_constraints_from_columns(columns: &[ColumnDef]) -> Vec SqlToRel<'_, S> { /// Generate a logical plan from an DataFusion SQL statement - pub fn statement_to_plan(&self, statement: DFStatement) -> Result { + pub fn statement_to_plan( + &self, + statement: DFStatement, + planner_context: &mut PlannerContext, + ) -> Result { match statement { DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s), - DFStatement::Statement(s) => self.sql_statement_to_plan(*s), + DFStatement::Statement(s) => self.sql_statement_to_plan(*s, planner_context), DFStatement::CopyTo(s) => self.copy_to_plan(s), DFStatement::Explain(ExplainStatement { verbose, analyze, format, statement, - }) => self.explain_to_plan(verbose, analyze, format, *statement), + }) => self.explain_to_plan( + verbose, + analyze, + format, + *statement, + planner_context, + ), } } /// Generate a logical plan from an SQL statement - pub fn sql_statement_to_plan(&self, statement: Statement) -> Result { - self.sql_statement_to_plan_with_context_impl( - statement, - &mut PlannerContext::new(), - ) + pub fn sql_statement_to_plan( + &self, + statement: Statement, + planner_context: &mut PlannerContext, + ) -> Result { + self.sql_statement_to_plan_with_context_impl(statement, planner_context) } /// Generate a logical plan from an SQL statement @@ -229,7 +240,7 @@ impl SqlToRel<'_, S> { } => { let format = format.map(|format| format.to_string()); let statement = DFStatement::Statement(statement); - self.explain_to_plan(verbose, analyze, format, statement) + self.explain_to_plan(verbose, analyze, format, statement, planner_context) } Statement::Query(query) => self.query_to_plan(*query, planner_context), Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable), @@ -1295,7 +1306,10 @@ impl SqlToRel<'_, S> { let query = "SELECT * FROM information_schema.tables;"; let mut rewrite = DFParser::parse_sql(query)?; assert_eq!(rewrite.len(), 1); - self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1 + self.statement_to_plan( + rewrite.pop_front().unwrap(), + &mut PlannerContext::new(), + ) // length of rewrite is 1 } else { plan_err!("SHOW TABLES is not supported unless information_schema is enabled") } @@ -1629,8 +1643,9 @@ impl SqlToRel<'_, S> { analyze: bool, format: Option, statement: DFStatement, + planner_context: &mut PlannerContext, ) -> Result { - let plan = self.statement_to_plan(statement)?; + let plan = self.statement_to_plan(statement, planner_context)?; if matches!(plan, LogicalPlan::Explain(_)) { return plan_err!("Nested EXPLAINs are not supported"); } @@ -1723,7 +1738,7 @@ impl SqlToRel<'_, S> { let mut rewrite = DFParser::parse_sql(&query)?; assert_eq!(rewrite.len(), 1); - self.statement_to_plan(rewrite.pop_front().unwrap()) + self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new()) } fn set_variable_to_plan( @@ -2109,7 +2124,8 @@ impl SqlToRel<'_, S> { let mut rewrite = DFParser::parse_sql(&query)?; assert_eq!(rewrite.len(), 1); - self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1 + self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new()) + // length of rewrite is 1 } /// Rewrite `SHOW FUNCTIONS` to another SQL query @@ -2193,7 +2209,8 @@ ON p.function_name = r.routine_name ); let mut rewrite = DFParser::parse_sql(&query)?; assert_eq!(rewrite.len(), 1); - self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1 + self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new()) + // length of rewrite is 1 } fn show_create_table_to_plan( @@ -2221,7 +2238,8 @@ ON p.function_name = r.routine_name let mut rewrite = DFParser::parse_sql(&query)?; assert_eq!(rewrite.len(), 1); - self.statement_to_plan(rewrite.pop_front().unwrap()) // length of rewrite is 1 + self.statement_to_plan(rewrite.pop_front().unwrap(), &mut PlannerContext::new()) + // length of rewrite is 1 } /// Return true if there is a table provider available for "schema.table" From 5f935c79ce5828c70101f3ce0e2ad63d5355ebec Mon Sep 17 00:00:00 2001 From: irenjj Date: Mon, 19 May 2025 20:58:32 +0800 Subject: [PATCH 3/7] split outer_query_schema into different level --- datafusion/sql/src/expr/identifier.rs | 77 ++++++++++++++------------- datafusion/sql/src/expr/subquery.rs | 23 +++++--- datafusion/sql/src/planner.rs | 68 +++++++++++++++++------ datafusion/sql/src/relation/mod.rs | 7 +-- 4 files changed, 111 insertions(+), 64 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 7c276ce53e35..2544d4e3c99e 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -165,48 +165,49 @@ impl SqlToRel<'_, S> { not_impl_err!("compound identifier: {ids:?}") } else { // Check the outer_query_schema and try to find a match - if let Some(outer) = planner_context.outer_query_schema() { - let search_result = search_dfschema(&ids, outer); - match search_result { - // Found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) - if !nested_names.is_empty() => - { - // TODO: remove when can support nested identifiers for OuterReferenceColumn - not_impl_err!( - "Nested identifiers are not yet supported for OuterReferenceColumn {}", - Column::from((qualifier, field)).quoted_flat_name() - ) + // TODO: Put the depth somewhere to record it like (OuterReferenceColumn) + for (_depth, schema) in + planner_context.iter_outer_query_schemas_rev() + { + if let Some(outer) = schema { + let search_result = search_dfschema(&ids, outer); + match search_result { + // Found matching field with spare identifier(s) for nested field(s) in structure + Some((field, qualifier, nested_names)) + if !nested_names.is_empty() => + { + // TODO: remove when can support nested identifiers for OuterReferenceColumn + return not_impl_err!("Nested identifiers are not yet supported for OuterReferenceColumn {}", Column::from((qualifier, field)).quoted_flat_name()); + } + // Found matching field with no spare identifier(s) + Some((field, qualifier, _nested_names)) => { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + return Ok(Expr::OuterReferenceColumn( + field.data_type().clone(), + Column::from((qualifier, field)), + )); + } + // Found no matching field, will return a default + None => continue, } - // Found matching field with no spare identifier(s) - Some((field, qualifier, _nested_names)) => { - // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - Ok(Expr::OuterReferenceColumn( - field.data_type().clone(), - Column::from((qualifier, field)), - )) - } - // Found no matching field, will return a default - None => { - let s = &ids[0..ids.len()]; - // safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = - form_identifier(s).unwrap(); - Ok(Expr::Column(Column::new(relation, column_name))) - } - } - } else { - let s = &ids[0..ids.len()]; - // Safe unwrap as s can never be empty or exceed the bounds - let (relation, column_name) = form_identifier(s).unwrap(); - let mut column = Column::new(relation, column_name); - if self.options.collect_spans { - if let Some(span) = ids_span { - column.spans_mut().add_span(span); + } else { + // Only depth=0 outer_query_schema can reach here. + let s = &ids[0..ids.len()]; + // Safe unwrap as s can never be empty or exceed the bounds + let (relation, column_name) = form_identifier(s).unwrap(); + let mut column = Column::new(relation, column_name); + if self.options.collect_spans { + if let Some(span) = ids_span { + column.spans_mut().add_span(span); + } } + return Ok(Expr::Column(column)); } - Ok(Expr::Column(column)) } + let s = &ids[0..ids.len()]; + // safe unwrap as s can never be empty or exceed the bounds + let (relation, column_name) = form_identifier(s).unwrap(); + return Ok(Expr::Column(Column::new(relation, column_name))); } } } diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 8f43538314c8..7d5348a14173 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -31,11 +31,15 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + // TODO + planner_context.push_outer_query_schema(Some(input_schema.clone().into())); + planner_context.increase_depth(); + let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); + + planner_context.decrease_depth(); + Ok(Expr::Exists(Exists { subquery: Subquery { subquery: Arc::new(sub_plan), @@ -54,8 +58,9 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let old_outer_query_schema = - planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + // TODO + planner_context.push_outer_query_schema(Some(input_schema.clone().into())); + planner_context.increase_depth(); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { @@ -70,7 +75,6 @@ impl SqlToRel<'_, S> { let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_outer_query_schema); self.validate_single_column( &sub_plan, @@ -81,6 +85,8 @@ impl SqlToRel<'_, S> { let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?; + planner_context.decrease_depth(); + Ok(Expr::InSubquery(InSubquery::new( Box::new(expr_obj), Subquery { @@ -98,7 +104,8 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - planner_context.extend_outer_query_schema(&Arc::new(input_schema.clone()))?; + planner_context.push_outer_query_schema(Some(input_schema.clone().into())); + planner_context.increase_depth(); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -120,6 +127,8 @@ impl SqlToRel<'_, S> { "Select only one column in the subquery", )?; + planner_context.decrease_depth(); + Ok(Expr::ScalarSubquery(Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0c06ff8294cb..32f4f62472aa 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -198,13 +198,35 @@ pub struct PlannerContext { /// Map of CTE name to logical plan of the WITH clause. /// Use `Arc` to allow cheap cloning ctes: HashMap>, - /// The query schema of the outer query plan, used to resolve the columns in subquery - outer_query_schema: Option, /// The joined schemas of all FROM clauses planned so far. When planning LATERAL /// FROM clauses, this should become a suffix of the `outer_query_schema`. outer_from_schema: Option, /// The query schema defined by the table create_table_schema: Option, + + // TODO: take outer_from_schema and create_table_schema into consideration. + /// All levels query schema of the outer query plan, used to resolve the columns in subquery. + /// Use `depth` to index different level outer_query_schema. + /// For example: + /// SELECT name <---------------------------------------- depth = 0 + /// FROM employees e + /// WHERE salary > ( + /// SELECT AVG(salary) <----------------------------- depth = 1 + /// FROM employees e2 + /// WHERE e2.department_id = e.department_id + /// AND e.department_id IN ( + /// SELECT department_id <--------------------- depth = 2 + /// FROM employees + /// GROUP BY department_id + /// HAVING AVG(salary) > ( + /// SELECT AVG(salary) + /// FROM employees + /// ) + /// ) + /// ); + outer_query_schemas: Vec>, + /// Current depth of query, starting from 0. + cur_depth: usize, } impl Default for PlannerContext { @@ -219,9 +241,10 @@ impl PlannerContext { Self { prepare_param_data_types: Arc::new(vec![]), ctes: HashMap::new(), - outer_query_schema: None, outer_from_schema: None, create_table_schema: None, + outer_query_schemas: vec![None], // depth 0 has no outer query schema + cur_depth: 0, } } @@ -234,27 +257,40 @@ impl PlannerContext { self } + // TODO: replace all places with outer_query_schemas() // Return a reference to the outer query's schema pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schema.as_ref().map(|s| s.as_ref()) + self.outer_query_schemas[self.cur_depth] + .as_ref() + .map(|s| s.as_ref()) + } + + pub fn outer_query_schemas(&self) -> &[Option] { + &self.outer_query_schemas + } + + /// Returns an iterator over the outer query schemas from back to front, + /// along with their indices. + pub fn iter_outer_query_schemas_rev(&self) -> impl Iterator)> { + self.outer_query_schemas + .iter() + .enumerate() + .rev() + .map(|(i, schema_ref)| (i, schema_ref.as_ref().map(|s| s.as_ref()))) } /// Sets the outer query schema, returning the existing one, if /// any - pub fn set_outer_query_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_query_schema, &mut schema); - schema + pub fn push_outer_query_schema(&mut self, schema: Option) { + self.outer_query_schemas.push(schema); } - pub fn extend_outer_query_schema(&mut self, schema: &DFSchemaRef) -> Result<()> { - match self.outer_query_schema.as_mut() { - Some(from_schema) => Arc::make_mut(from_schema).merge(schema), - None => self.outer_query_schema = Some(Arc::clone(schema)), - }; - Ok(()) + pub fn increase_depth(&mut self) { + self.cur_depth += 1; + } + + pub fn decrease_depth(&mut self) { + self.cur_depth -= 1; } pub fn set_table_schema( diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index dee855f8c000..ef17cea20f19 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -184,7 +184,8 @@ impl SqlToRel<'_, S> { let old_from_schema = planner_context .set_outer_from_schema(None) .unwrap_or_else(|| Arc::new(DFSchema::empty())); - let new_query_schema = match planner_context.outer_query_schema() { + // TODO + let _new_query_schema = match planner_context.outer_query_schema() { Some(old_query_schema) => { let mut new_query_schema = old_from_schema.as_ref().clone(); new_query_schema.merge(old_query_schema); @@ -192,12 +193,12 @@ impl SqlToRel<'_, S> { } None => Some(Arc::clone(&old_from_schema)), }; - let old_query_schema = planner_context.set_outer_query_schema(new_query_schema); + // let old_query_schema = planner_context.set_outer_query_schema(new_query_schema); let plan = self.create_relation(subquery, planner_context)?; let outer_ref_columns = plan.all_out_ref_exprs(); - planner_context.set_outer_query_schema(old_query_schema); + // planner_context.set_outer_query_schema(old_query_schema); planner_context.set_outer_from_schema(Some(old_from_schema)); // We can omit the subquery wrapper if there are no columns From ded701ff02ae8fc57ebbce2abd998e4784ba3f31 Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 20 May 2025 07:54:38 +0800 Subject: [PATCH 4/7] add depth to Subquery LogicalPlan --- datafusion/expr/src/expr_fn.rs | 15 +++++++---- datafusion/expr/src/expr_schema.rs | 1 + datafusion/expr/src/logical_plan/plan.rs | 4 +++ datafusion/expr/src/logical_plan/tree_node.rs | 2 ++ .../optimizer/src/analyzer/type_coercion.rs | 4 +++ .../src/decorrelate_predicate_subquery.rs | 26 ++++++++++++------- datafusion/sql/src/expr/subquery.rs | 7 +++++ datafusion/sql/src/planner.rs | 8 +++++- datafusion/sql/src/relation/mod.rs | 3 +++ 9 files changed, 54 insertions(+), 16 deletions(-) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index cee356a2b42c..8441fd1f9cbc 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -231,12 +231,13 @@ pub fn in_list(expr: Expr, list: Vec, negated: bool) -> Expr { } /// Create an EXISTS subquery expression -pub fn exists(subquery: Arc) -> Expr { +pub fn exists(subquery: Arc, depth: usize) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::Exists(Exists { subquery: Subquery { subquery, outer_ref_columns, + depth, spans: Spans::new(), }, negated: false, @@ -244,12 +245,13 @@ pub fn exists(subquery: Arc) -> Expr { } /// Create a NOT EXISTS subquery expression -pub fn not_exists(subquery: Arc) -> Expr { +pub fn not_exists(subquery: Arc, depth: usize) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::Exists(Exists { subquery: Subquery { subquery, outer_ref_columns, + depth, spans: Spans::new(), }, negated: true, @@ -257,13 +259,14 @@ pub fn not_exists(subquery: Arc) -> Expr { } /// Create an IN subquery expression -pub fn in_subquery(expr: Expr, subquery: Arc) -> Expr { +pub fn in_subquery(expr: Expr, subquery: Arc, depth: usize) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::InSubquery(InSubquery::new( Box::new(expr), Subquery { subquery, outer_ref_columns, + depth, spans: Spans::new(), }, false, @@ -271,13 +274,14 @@ pub fn in_subquery(expr: Expr, subquery: Arc) -> Expr { } /// Create a NOT IN subquery expression -pub fn not_in_subquery(expr: Expr, subquery: Arc) -> Expr { +pub fn not_in_subquery(expr: Expr, subquery: Arc, depth: usize) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::InSubquery(InSubquery::new( Box::new(expr), Subquery { subquery, outer_ref_columns, + depth, spans: Spans::new(), }, true, @@ -285,11 +289,12 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc) -> Expr { } /// Create a scalar subquery expression -pub fn scalar_subquery(subquery: Arc) -> Expr { +pub fn scalar_subquery(subquery: Arc, depth: usize) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::ScalarSubquery(Subquery { subquery, outer_ref_columns, + depth, spans: Spans::new(), }) } diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 3786180e2cfa..c543f3e5be70 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -661,6 +661,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result { self.assert_no_expressions(expr)?; @@ -950,6 +951,7 @@ impl LogicalPlan { Ok(LogicalPlan::Subquery(Subquery { subquery: Arc::new(subquery), outer_ref_columns: outer_ref_columns.clone(), + depth: *depth, spans: spans.clone(), })) } @@ -3833,6 +3835,7 @@ pub struct Subquery { pub subquery: Arc, /// The outer references used in the subquery pub outer_ref_columns: Vec, + pub depth: usize, /// Span information for subquery projection columns pub spans: Spans, } @@ -3869,6 +3872,7 @@ impl Subquery { Subquery { subquery: plan, outer_ref_columns: self.outer_ref_columns.clone(), + depth: self.depth, spans: Spans::new(), } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 7f6e1e025387..be4194bfb478 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -159,11 +159,13 @@ impl TreeNode for LogicalPlan { LogicalPlan::Subquery(Subquery { subquery, outer_ref_columns, + depth, spans, }) => subquery.map_elements(f)?.update_data(|subquery| { LogicalPlan::Subquery(Subquery { subquery, outer_ref_columns, + depth, spans, }) }), diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 860e041bb423..a9f187eef396 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -315,6 +315,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { Expr::ScalarSubquery(Subquery { subquery, outer_ref_columns, + depth, spans, }) => { let new_plan = @@ -322,6 +323,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { subquery: Arc::new(new_plan), outer_ref_columns, + depth, spans, }))) } @@ -335,6 +337,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery: Subquery { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, + depth: subquery.depth, spans: subquery.spans, }, negated, @@ -359,6 +362,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { let new_subquery = Subquery { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, + depth: subquery.depth, spans: subquery.spans, }; Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index a72657bf689d..c000f0d52256 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -134,19 +134,23 @@ fn rewrite_inner_subqueries( let alias = config.alias_generator(); let expr_without_subqueries = expr.transform(|e| match e { Expr::Exists(Exists { - subquery: Subquery { subquery, .. }, + subquery: Subquery { + subquery, depth, .. + }, negated, }) => match mark_join(&cur_input, Arc::clone(&subquery), None, negated, alias)? { Some((plan, exists_expr)) => { cur_input = plan; Ok(Transformed::yes(exists_expr)) } - None if negated => Ok(Transformed::no(not_exists(subquery))), - None => Ok(Transformed::no(exists(subquery))), + None if negated => Ok(Transformed::no(not_exists(subquery, depth))), + None => Ok(Transformed::no(exists(subquery, depth))), }, Expr::InSubquery(InSubquery { expr, - subquery: Subquery { subquery, .. }, + subquery: Subquery { + subquery, depth, .. + }, negated, }) => { let in_predicate = subquery @@ -165,8 +169,10 @@ fn rewrite_inner_subqueries( cur_input = plan; Ok(Transformed::yes(exists_expr)) } - None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))), - None => Ok(Transformed::no(in_subquery(*expr, subquery))), + None if negated => { + Ok(Transformed::no(not_in_subquery(*expr, subquery, depth))) + } + None => Ok(Transformed::no(in_subquery(*expr, subquery, depth))), } } _ => Ok(Transformed::no(e)), @@ -409,12 +415,12 @@ impl SubqueryInfo { pub fn expr(self) -> Expr { match self.where_in_expr { Some(expr) => match self.negated { - true => not_in_subquery(expr, self.query.subquery), - false => in_subquery(expr, self.query.subquery), + true => not_in_subquery(expr, self.query.subquery, self.query.depth), + false => in_subquery(expr, self.query.subquery, self.query.depth), }, None => match self.negated { - true => not_exists(self.query.subquery), - false => exists(self.query.subquery), + true => not_exists(self.query.subquery, self.query.depth), + false => exists(self.query.subquery, self.query.depth), }, } } diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 7d5348a14173..a07767fc7ad5 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -34,6 +34,7 @@ impl SqlToRel<'_, S> { // TODO planner_context.push_outer_query_schema(Some(input_schema.clone().into())); planner_context.increase_depth(); + let depth = planner_context.cur_depth(); let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); @@ -44,6 +45,7 @@ impl SqlToRel<'_, S> { subquery: Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, + depth, spans: Spans::new(), }, negated, @@ -61,6 +63,7 @@ impl SqlToRel<'_, S> { // TODO planner_context.push_outer_query_schema(Some(input_schema.clone().into())); planner_context.increase_depth(); + let depth = planner_context.cur_depth(); let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { @@ -92,6 +95,7 @@ impl SqlToRel<'_, S> { Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, + depth, spans, }, negated, @@ -106,6 +110,8 @@ impl SqlToRel<'_, S> { ) -> Result { planner_context.push_outer_query_schema(Some(input_schema.clone().into())); planner_context.increase_depth(); + let depth = planner_context.cur_depth(); + let mut spans = Spans::new(); if let SetExpr::Select(select) = subquery.body.as_ref() { for item in &select.projection { @@ -132,6 +138,7 @@ impl SqlToRel<'_, S> { Ok(Expr::ScalarSubquery(Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, + depth, spans, })) } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 32f4f62472aa..0036ba1ce097 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -271,7 +271,9 @@ impl PlannerContext { /// Returns an iterator over the outer query schemas from back to front, /// along with their indices. - pub fn iter_outer_query_schemas_rev(&self) -> impl Iterator)> { + pub fn iter_outer_query_schemas_rev( + &self, + ) -> impl Iterator)> { self.outer_query_schemas .iter() .enumerate() @@ -293,6 +295,10 @@ impl PlannerContext { self.cur_depth -= 1; } + pub fn cur_depth(&self) -> usize { + self.cur_depth + } + pub fn set_table_schema( &mut self, mut schema: Option, diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index ef17cea20f19..f01d30dc26ab 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -207,12 +207,14 @@ impl SqlToRel<'_, S> { return Ok(plan); } + // TODO: handle depth match plan { LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { subquery_alias( LogicalPlan::Subquery(Subquery { subquery: input, outer_ref_columns, + depth: 1, spans: Spans::new(), }), alias, @@ -221,6 +223,7 @@ impl SqlToRel<'_, S> { plan => Ok(LogicalPlan::Subquery(Subquery { subquery: Arc::new(plan), outer_ref_columns, + depth: 1, spans: Spans::new(), })), } From ee95eebf0dfc2de2c14bea7551aa2fcd6f29d402 Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 20 May 2025 08:19:18 +0800 Subject: [PATCH 5/7] fix build --- datafusion-cli/src/cli_context.rs | 4 +++- datafusion-cli/src/exec.rs | 2 +- .../substrait/src/logical_plan/consumer/expr/subquery.rs | 3 +++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs index 516929ebacf1..13da8716548c 100644 --- a/datafusion-cli/src/cli_context.rs +++ b/datafusion-cli/src/cli_context.rs @@ -22,7 +22,7 @@ use datafusion::{ error::DataFusionError, execution::{context::SessionState, TaskContext}, logical_expr::LogicalPlan, - prelude::SessionContext, + prelude::SessionContext, sql::planner::PlannerContext, }; use object_store::ObjectStore; @@ -51,6 +51,7 @@ pub trait CliSessionContext { async fn execute_logical_plan( &self, plan: LogicalPlan, + planner_context: &mut PlannerContext, ) -> Result; } @@ -92,6 +93,7 @@ impl CliSessionContext for SessionContext { async fn execute_logical_plan( &self, plan: LogicalPlan, + planner_context: &mut PlannerContext, ) -> Result { self.execute_logical_plan(plan).await } diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index e664d73744d9..cc49fd607ae5 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -236,7 +236,7 @@ pub(super) async fn exec_and_print( let plan = create_plan(ctx, statement, &mut planner_context).await?; let adjusted = adjusted.with_plan(&plan); - let df = ctx.execute_logical_plan(plan).await?; + let df = ctx.execute_logical_plan(plan, &mut planner_context).await?; let physical_plan = df.create_physical_plan().await?; // Track memory usage for the query result if it's bounded diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs index f7e4c2bb0fbd..49977a1d38c3 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs @@ -49,6 +49,7 @@ pub async fn from_subquery( subquery: Subquery { subquery: Arc::new(haystack_expr), outer_ref_columns: outer_refs, + depth: 1, spans: Spans::new(), }, negated: false, @@ -68,6 +69,7 @@ pub async fn from_subquery( Ok(Expr::ScalarSubquery(Subquery { subquery: Arc::new(plan), outer_ref_columns, + depth: 1, spans: Spans::new(), })) } @@ -84,6 +86,7 @@ pub async fn from_subquery( Subquery { subquery: Arc::new(plan), outer_ref_columns, + depth: 1, spans: Spans::new(), }, false, From 61d5a9142e5e21a1cc34ad9c119b09de526beb3b Mon Sep 17 00:00:00 2001 From: irenjj Date: Wed, 21 May 2025 20:54:06 +0800 Subject: [PATCH 6/7] add planner_context to session_state --- datafusion-cli/src/cli_context.rs | 9 ++++--- datafusion-cli/src/exec.rs | 4 ++- datafusion/core/src/execution/context/mod.rs | 25 ++++++++++++++++--- .../core/src/execution/session_state.rs | 8 ++++++ 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs index 13da8716548c..ec6353240421 100644 --- a/datafusion-cli/src/cli_context.rs +++ b/datafusion-cli/src/cli_context.rs @@ -22,7 +22,8 @@ use datafusion::{ error::DataFusionError, execution::{context::SessionState, TaskContext}, logical_expr::LogicalPlan, - prelude::SessionContext, sql::planner::PlannerContext, + prelude::SessionContext, + sql::planner::PlannerContext, }; use object_store::ObjectStore; @@ -51,7 +52,7 @@ pub trait CliSessionContext { async fn execute_logical_plan( &self, plan: LogicalPlan, - planner_context: &mut PlannerContext, + planner_context: Option, ) -> Result; } @@ -93,8 +94,8 @@ impl CliSessionContext for SessionContext { async fn execute_logical_plan( &self, plan: LogicalPlan, - planner_context: &mut PlannerContext, + planner_context: Option, ) -> Result { - self.execute_logical_plan(plan).await + self.execute_logical_plan(plan, planner_context).await } } diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index cc49fd607ae5..819dc4bab1bd 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -236,7 +236,9 @@ pub(super) async fn exec_and_print( let plan = create_plan(ctx, statement, &mut planner_context).await?; let adjusted = adjusted.with_plan(&plan); - let df = ctx.execute_logical_plan(plan, &mut planner_context).await?; + let df = ctx + .execute_logical_plan(plan, Some(planner_context)) + .await?; let physical_plan = df.create_physical_plan().await?; // Track memory usage for the query result if it's bounded diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 81e37d9d7f95..2b30020918ba 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -627,7 +627,8 @@ impl SessionContext { .await?; options.verify_plan(&plan)?; - self.execute_logical_plan(plan).await + // TODO: fetch planner_context + self.execute_logical_plan(plan, None).await } /// Creates logical expressions from SQL query text. @@ -663,7 +664,11 @@ impl SessionContext { /// If you wish to limit the type of plan that can be run from /// SQL, see [`Self::sql_with_options`] and /// [`SQLOptions::verify_plan`]. - pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result { + pub async fn execute_logical_plan( + &self, + plan: LogicalPlan, + planner_context: Option, + ) -> Result { match plan { LogicalPlan::Ddl(ddl) => { // Box::pin avoids allocating the stack space within this function's frame @@ -738,7 +743,10 @@ impl SessionContext { .remove_prepared(deallocate.name.as_str())?; self.return_empty_dataframe() } - plan => Ok(DataFrame::new(self.state(), plan)), + plan => Ok(DataFrame::new( + self.with_planner_context_state(planner_context), + plan, + )), } } @@ -1651,6 +1659,17 @@ impl SessionContext { state } + /// Return a new [`SessionState`] with specific [`PlannerContext`]. + pub fn with_planner_context_state( + &self, + planner_context: Option, + ) -> SessionState { + let mut state = self.state.read().clone(); + state.set_planner_context(planner_context); + state.execution_props_mut().start_execution(); + state + } + /// Get reference to [`SessionState`] pub fn state_ref(&self) -> Arc> { Arc::clone(&self.state) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 699e56127beb..fb902dda4e93 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -179,6 +179,7 @@ pub struct SessionState { /// Cache logical plans of prepared statements for later execution. /// Key is the prepared statement name. prepared_plans: HashMap>, + planner_context: Option, } impl Debug for SessionState { @@ -207,6 +208,7 @@ impl Debug for SessionState { .field("aggregate_functions", &self.aggregate_functions) .field("window_functions", &self.window_functions) .field("prepared_plans", &self.prepared_plans) + .field("planner_context", &self.planner_context) .finish() } } @@ -353,6 +355,11 @@ impl SessionState { self.function_factory.as_ref() } + /// Set the planner_context. + pub fn set_planner_context(&mut self, planner_context: Option) { + self.planner_context = planner_context; + } + /// Get the table factories pub fn table_factories(&self) -> &HashMap> { &self.table_factories @@ -1385,6 +1392,7 @@ impl SessionStateBuilder { runtime_env, function_factory, prepared_plans: HashMap::new(), + planner_context: None, }; if let Some(file_formats) = file_formats { From 797d09ecc0f60400a0cb6ea91d7117cd8393b6b6 Mon Sep 17 00:00:00 2001 From: irenjj Date: Thu, 22 May 2025 21:35:14 +0800 Subject: [PATCH 7/7] put PlannerContext into ExecutionProps --- benchmarks/src/cancellation.rs | 2 +- datafusion-cli/src/cli_context.rs | 3 +- datafusion-cli/src/exec.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 4 +- .../core/src/execution/session_state.rs | 13 +- datafusion/expr/src/execution_props.rs | 10 + datafusion/expr/src/lib.rs | 1 + datafusion/expr/src/planner_context.rs | 210 ++++++++++++++++++ datafusion/sql/src/cte.rs | 6 +- datafusion/sql/src/expr/function.rs | 3 +- datafusion/sql/src/expr/grouping_set.rs | 3 +- datafusion/sql/src/expr/identifier.rs | 3 +- datafusion/sql/src/expr/mod.rs | 3 +- datafusion/sql/src/expr/order_by.rs | 3 +- datafusion/sql/src/expr/subquery.rs | 3 +- datafusion/sql/src/expr/substring.rs | 3 +- datafusion/sql/src/expr/unary_op.rs | 5 +- datafusion/sql/src/expr/value.rs | 3 +- datafusion/sql/src/planner.rs | 191 +--------------- datafusion/sql/src/query.rs | 3 +- datafusion/sql/src/relation/join.rs | 4 +- datafusion/sql/src/relation/mod.rs | 3 +- datafusion/sql/src/select.rs | 3 +- datafusion/sql/src/set_expr.rs | 4 +- datafusion/sql/src/statement.rs | 3 +- datafusion/sql/src/values.rs | 4 +- 26 files changed, 273 insertions(+), 222 deletions(-) create mode 100644 datafusion/expr/src/planner_context.rs diff --git a/benchmarks/src/cancellation.rs b/benchmarks/src/cancellation.rs index 27bd2179890b..a78cb531adb2 100644 --- a/benchmarks/src/cancellation.rs +++ b/benchmarks/src/cancellation.rs @@ -30,10 +30,10 @@ use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::{ListingOptions, ListingTableUrl}; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::execution::TaskContext; +use datafusion::logical_expr::planner_context::PlannerContext; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; -use datafusion::sql::planner::PlannerContext; use datafusion_common::instant::Instant; use futures::TryStreamExt; use object_store::ObjectStore; diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs index ec6353240421..5a147b1e0984 100644 --- a/datafusion-cli/src/cli_context.rs +++ b/datafusion-cli/src/cli_context.rs @@ -21,9 +21,8 @@ use datafusion::{ dataframe::DataFrame, error::DataFusionError, execution::{context::SessionState, TaskContext}, - logical_expr::LogicalPlan, + logical_expr::{planner_context::PlannerContext, LogicalPlan}, prelude::SessionContext, - sql::planner::PlannerContext, }; use object_store::ObjectStore; diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 819dc4bab1bd..10b3893d16c3 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -26,7 +26,7 @@ use crate::{ object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, }; -use datafusion::sql::planner::PlannerContext; +use datafusion::logical_expr::planner_context::PlannerContext; use futures::StreamExt; use std::collections::HashMap; use std::fs::File; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 2b30020918ba..7e4d7b972dcb 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -74,6 +74,7 @@ pub use datafusion_execution::config::SessionConfig; use datafusion_execution::registry::SerializerRegistry; pub use datafusion_execution::TaskContext; pub use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{ expr_rewriter::FunctionRewrite, logical_plan::{DdlStatement, Statement}, @@ -87,7 +88,6 @@ use datafusion_session::SessionStore; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_sql::planner::PlannerContext; use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; @@ -1665,7 +1665,7 @@ impl SessionContext { planner_context: Option, ) -> SessionState { let mut state = self.state.read().clone(); - state.set_planner_context(planner_context); + state.set_planner_context_in_execution_props(planner_context); state.execution_props_mut().start_execution(); state } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index fb902dda4e93..448682d37a09 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -52,6 +52,7 @@ use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::planner::{ExprPlanner, TypePlanner}; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry}; use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::var_provider::{is_system_variables, VarType}; @@ -70,7 +71,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::ExecutionPlan; use datafusion_session::Session; use datafusion_sql::parser::{DFParserBuilder, Statement}; -use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel}; +use datafusion_sql::planner::{ContextProvider, ParserOptions, SqlToRel}; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -356,8 +357,11 @@ impl SessionState { } /// Set the planner_context. - pub fn set_planner_context(&mut self, planner_context: Option) { - self.planner_context = planner_context; + pub fn set_planner_context_in_execution_props( + &mut self, + planner_context: Option, + ) { + self.execution_props.set_planner_context(planner_context); } /// Get the table factories @@ -1991,11 +1995,12 @@ mod tests { use datafusion_common::DFSchema; use datafusion_common::Result; use datafusion_execution::config::SessionConfig; + use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::Expr; use datafusion_optimizer::optimizer::OptimizerRule; use datafusion_optimizer::Optimizer; use datafusion_physical_plan::display::DisplayableExecutionPlan; - use datafusion_sql::planner::{PlannerContext, SqlToRel}; + use datafusion_sql::planner::SqlToRel; use std::collections::HashMap; use std::sync::Arc; diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index d672bd1acc46..072102319538 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::planner_context::PlannerContext; use crate::var_provider::{VarProvider, VarType}; use chrono::{DateTime, TimeZone, Utc}; use datafusion_common::alias::AliasGenerator; @@ -37,6 +38,7 @@ pub struct ExecutionProps { pub alias_generator: Arc, /// Providers for scalar variables pub var_providers: Option>>, + pub planner_context: Option, } impl Default for ExecutionProps { @@ -54,6 +56,7 @@ impl ExecutionProps { query_execution_start_time: Utc.timestamp_nanos(0), alias_generator: Arc::new(AliasGenerator::new()), var_providers: None, + planner_context: None, } } @@ -66,6 +69,13 @@ impl ExecutionProps { self } + pub fn set_planner_context( + &mut self, + planner_context: Option, + ) { + self.planner_context = planner_context; + } + /// Marks the execution of query started timestamp. /// This also instantiates a new alias generator. pub fn start_execution(&mut self) -> &Self { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 48931d6525af..e226ee502f94 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -42,6 +42,7 @@ mod udaf; mod udf; mod udwf; +pub mod planner_context; pub mod conditional_expressions; pub mod execution_props; pub mod expr; diff --git a/datafusion/expr/src/planner_context.rs b/datafusion/expr/src/planner_context.rs new file mode 100644 index 000000000000..318fc1056af5 --- /dev/null +++ b/datafusion/expr/src/planner_context.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion_common::{DFSchema, DFSchemaRef, HashMap, Result}; + +use crate::LogicalPlan; + +/// Struct to store the states used by the Planner. The Planner will leverage the states +/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include +/// Common Table Expression (CTE) provided with WITH clause and +/// Parameter Data Types provided with PREPARE statement and the query schema of the +/// outer query plan. +/// +/// # Cloning +/// +/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned. +/// This helps resolve scoping issues of CTEs. +/// By using cloning, a subquery can inherit CTEs from the outer query +/// and can also define its own private CTEs without affecting the outer query. +/// +#[derive(Debug, Clone)] +pub struct PlannerContext { + /// Data types for numbered parameters ($1, $2, etc), if supplied + /// in `PREPARE` statement + prepare_param_data_types: Arc>, + /// Map of CTE name to logical plan of the WITH clause. + /// Use `Arc` to allow cheap cloning + ctes: HashMap>, + /// The joined schemas of all FROM clauses planned so far. When planning LATERAL + /// FROM clauses, this should become a suffix of the `outer_query_schema`. + outer_from_schema: Option, + /// The query schema defined by the table + create_table_schema: Option, + + // TODO: take outer_from_schema and create_table_schema into consideration. + /// All levels query schema of the outer query plan, used to resolve the columns in subquery. + /// Use `depth` to index different level outer_query_schema. + /// For example: + /// SELECT name <---------------------------------------- depth = 0 + /// FROM employees e + /// WHERE salary > ( + /// SELECT AVG(salary) <----------------------------- depth = 1 + /// FROM employees e2 + /// WHERE e2.department_id = e.department_id + /// AND e.department_id IN ( + /// SELECT department_id <--------------------- depth = 2 + /// FROM employees + /// GROUP BY department_id + /// HAVING AVG(salary) > ( + /// SELECT AVG(salary) + /// FROM employees + /// ) + /// ) + /// ); + outer_query_schemas: Vec>, + /// Current depth of query, starting from 0. + cur_depth: usize, +} + +impl Default for PlannerContext { + fn default() -> Self { + Self::new() + } +} + +impl PlannerContext { + /// Create an empty PlannerContext + pub fn new() -> Self { + Self { + prepare_param_data_types: Arc::new(vec![]), + ctes: HashMap::new(), + outer_from_schema: None, + create_table_schema: None, + outer_query_schemas: vec![None], // depth 0 has no outer query schema + cur_depth: 0, + } + } + + /// Update the PlannerContext with provided prepare_param_data_types + pub fn with_prepare_param_data_types( + mut self, + prepare_param_data_types: Vec, + ) -> Self { + self.prepare_param_data_types = prepare_param_data_types.into(); + self + } + + // TODO: replace all places with outer_query_schemas() + // Return a reference to the outer query's schema + pub fn outer_query_schema(&self) -> Option<&DFSchema> { + self.outer_query_schemas[self.cur_depth] + .as_ref() + .map(|s| s.as_ref()) + } + + pub fn outer_query_schemas(&self) -> &[Option] { + &self.outer_query_schemas + } + + /// Returns an iterator over the outer query schemas from back to front, + /// along with their indices. + pub fn iter_outer_query_schemas_rev( + &self, + ) -> impl Iterator)> { + self.outer_query_schemas + .iter() + .enumerate() + .rev() + .map(|(i, schema_ref)| (i, schema_ref.as_ref().map(|s| s.as_ref()))) + } + + /// Sets the outer query schema, returning the existing one, if + /// any + pub fn push_outer_query_schema(&mut self, schema: Option) { + self.outer_query_schemas.push(schema); + } + + pub fn increase_depth(&mut self) { + self.cur_depth += 1; + } + + pub fn decrease_depth(&mut self) { + self.cur_depth -= 1; + } + + pub fn cur_depth(&self) -> usize { + self.cur_depth + } + + pub fn set_table_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.create_table_schema, &mut schema); + schema + } + + pub fn table_schema(&self) -> Option { + self.create_table_schema.clone() + } + + // Return a clone of the outer FROM schema + pub fn outer_from_schema(&self) -> Option> { + self.outer_from_schema.clone() + } + + /// Sets the outer FROM schema, returning the existing one, if any + pub fn set_outer_from_schema( + &mut self, + mut schema: Option, + ) -> Option { + std::mem::swap(&mut self.outer_from_schema, &mut schema); + schema + } + + /// Extends the FROM schema, returning the existing one, if any + pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> { + match self.outer_from_schema.as_mut() { + Some(from_schema) => Arc::make_mut(from_schema).merge(schema), + None => self.outer_from_schema = Some(Arc::clone(schema)), + }; + Ok(()) + } + + /// Return the types of parameters (`$1`, `$2`, etc) if known + pub fn prepare_param_data_types(&self) -> &[DataType] { + &self.prepare_param_data_types + } + + /// Returns true if there is a Common Table Expression (CTE) / + /// Subquery for the specified name + pub fn contains_cte(&self, cte_name: &str) -> bool { + self.ctes.contains_key(cte_name) + } + + /// Inserts a LogicalPlan for the Common Table Expression (CTE) / + /// Subquery for the specified name + pub fn insert_cte(&mut self, cte_name: impl Into, plan: LogicalPlan) { + let cte_name = cte_name.into(); + self.ctes.insert(cte_name, Arc::new(plan)); + } + + /// Return a plan for the Common Table Expression (CTE) / Subquery for the + /// specified name + pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> { + self.ctes.get(cte_name).map(|cte| cte.as_ref()) + } + + /// Remove the plan of CTE / Subquery for the specified name + pub fn remove_cte(&mut self, cte_name: &str) { + self.ctes.remove(cte_name); + } +} diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs index 3650aea9c3c2..881a11f5bfe4 100644 --- a/datafusion/sql/src/cte.rs +++ b/datafusion/sql/src/cte.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use arrow::datatypes::Schema; use datafusion_common::{ @@ -25,7 +25,9 @@ use datafusion_common::{ tree_node::{TreeNode, TreeNodeRecursion}, Result, }; -use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, TableSource}; +use datafusion_expr::{ + planner_context::PlannerContext, LogicalPlan, LogicalPlanBuilder, TableSource, +}; use sqlparser::ast::{Query, SetExpr, SetOperator, With}; impl SqlToRel<'_, S> { diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 535300427ad8..4169369feafa 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use arrow::datatypes::DataType; use datafusion_common::{ @@ -24,6 +24,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::{ScalarFunction, Unnest, WildcardOptions}; use datafusion_expr::planner::{PlannerResult, RawAggregateExpr, RawWindowExpr}; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{ expr, Expr, ExprFunctionExt, ExprSchemable, WindowFrame, WindowFunctionDefinition, }; diff --git a/datafusion/sql/src/expr/grouping_set.rs b/datafusion/sql/src/expr/grouping_set.rs index bedbf2a7d347..b0ce2e6d9c90 100644 --- a/datafusion/sql/src/expr/grouping_set.rs +++ b/datafusion/sql/src/expr/grouping_set.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::plan_err; use datafusion_common::{DFSchema, Result}; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{Expr, GroupingSet}; use sqlparser::ast::Expr as SQLExpr; diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 2544d4e3c99e..52fab312b25a 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -21,10 +21,11 @@ use datafusion_common::{ DataFusionError, Result, Span, TableReference, }; use datafusion_expr::planner::PlannerResult; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{Case, Expr}; use sqlparser::ast::{CaseWhen, Expr as SQLExpr, Ident}; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_expr::UNNAMED_TABLE; impl SqlToRel<'_, S> { diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index d29ccdc6a7e9..cb16195734aa 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -19,6 +19,7 @@ use arrow::datatypes::{DataType, TimeUnit}; use datafusion_expr::planner::{ PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr, }; +use datafusion_expr::planner_context::PlannerContext; use sqlparser::ast::{ AccessExpr, BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry, @@ -37,7 +38,7 @@ use datafusion_expr::{ Operator, TryCast, }; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; mod binary_op; mod function; diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index d357c3753e13..ba609b5de0c0 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{ not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, Result, }; use datafusion_expr::expr::Sort; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{Expr, SortExpr}; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index a07767fc7ad5..5a7c0d87f693 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans}; use datafusion_expr::expr::{Exists, InSubquery}; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{Expr, LogicalPlan, Subquery}; use sqlparser::ast::Expr as SQLExpr; use sqlparser::ast::{Query, SelectItem, SetExpr}; diff --git a/datafusion/sql/src/expr/substring.rs b/datafusion/sql/src/expr/substring.rs index 59c78bc713cc..0b241a717011 100644 --- a/datafusion/sql/src/expr/substring.rs +++ b/datafusion/sql/src/expr/substring.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{not_impl_err, plan_err}; use datafusion_common::{DFSchema, Result, ScalarValue}; use datafusion_expr::planner::PlannerResult; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::Expr; use sqlparser::ast::Expr as SQLExpr; diff --git a/datafusion/sql/src/expr/unary_op.rs b/datafusion/sql/src/expr/unary_op.rs index e0c94543f601..8fdd2a88adec 100644 --- a/datafusion/sql/src/expr/unary_op.rs +++ b/datafusion/sql/src/expr/unary_op.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{not_impl_err, plan_err, DFSchema, Diagnostic, Result}; use datafusion_expr::{ - type_coercion::{is_interval, is_timestamp}, - Expr, ExprSchemable, + planner_context::PlannerContext, type_coercion::{is_interval, is_timestamp}, Expr, ExprSchemable }; use sqlparser::ast::{Expr as SQLExpr, UnaryOperator, Value, ValueWithSpan}; diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs index be4a45a25750..0860f6225022 100644 --- a/datafusion/sql/src/expr/value.rs +++ b/datafusion/sql/src/expr/value.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use arrow::compute::kernels::cast_utils::{ parse_interval_month_day_nano_config, IntervalParseConfig, IntervalUnit, }; @@ -30,6 +30,7 @@ use datafusion_common::{ }; use datafusion_expr::expr::{BinaryExpr, Placeholder}; use datafusion_expr::planner::PlannerResult; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{lit, Expr, Operator}; use log::debug; use sqlparser::ast::{ diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 0036ba1ce097..929058a00e99 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -16,7 +16,6 @@ // under the License. //! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST) -use std::collections::HashMap; use std::sync::Arc; use std::vec; @@ -25,11 +24,12 @@ use datafusion_common::config::SqlParserOptions; use datafusion_common::error::add_possible_columns_to_diag; use datafusion_common::TableReference; use datafusion_common::{ - field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic, + field_not_found, internal_err, plan_datafusion_err, Diagnostic, SchemaError, }; use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result}; use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::utils::find_column_exprs; use datafusion_expr::{col, Expr}; use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo}; @@ -177,193 +177,6 @@ impl IdentNormalizer { } } -/// Struct to store the states used by the Planner. The Planner will leverage the states -/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include -/// Common Table Expression (CTE) provided with WITH clause and -/// Parameter Data Types provided with PREPARE statement and the query schema of the -/// outer query plan. -/// -/// # Cloning -/// -/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned. -/// This helps resolve scoping issues of CTEs. -/// By using cloning, a subquery can inherit CTEs from the outer query -/// and can also define its own private CTEs without affecting the outer query. -/// -#[derive(Debug, Clone)] -pub struct PlannerContext { - /// Data types for numbered parameters ($1, $2, etc), if supplied - /// in `PREPARE` statement - prepare_param_data_types: Arc>, - /// Map of CTE name to logical plan of the WITH clause. - /// Use `Arc` to allow cheap cloning - ctes: HashMap>, - /// The joined schemas of all FROM clauses planned so far. When planning LATERAL - /// FROM clauses, this should become a suffix of the `outer_query_schema`. - outer_from_schema: Option, - /// The query schema defined by the table - create_table_schema: Option, - - // TODO: take outer_from_schema and create_table_schema into consideration. - /// All levels query schema of the outer query plan, used to resolve the columns in subquery. - /// Use `depth` to index different level outer_query_schema. - /// For example: - /// SELECT name <---------------------------------------- depth = 0 - /// FROM employees e - /// WHERE salary > ( - /// SELECT AVG(salary) <----------------------------- depth = 1 - /// FROM employees e2 - /// WHERE e2.department_id = e.department_id - /// AND e.department_id IN ( - /// SELECT department_id <--------------------- depth = 2 - /// FROM employees - /// GROUP BY department_id - /// HAVING AVG(salary) > ( - /// SELECT AVG(salary) - /// FROM employees - /// ) - /// ) - /// ); - outer_query_schemas: Vec>, - /// Current depth of query, starting from 0. - cur_depth: usize, -} - -impl Default for PlannerContext { - fn default() -> Self { - Self::new() - } -} - -impl PlannerContext { - /// Create an empty PlannerContext - pub fn new() -> Self { - Self { - prepare_param_data_types: Arc::new(vec![]), - ctes: HashMap::new(), - outer_from_schema: None, - create_table_schema: None, - outer_query_schemas: vec![None], // depth 0 has no outer query schema - cur_depth: 0, - } - } - - /// Update the PlannerContext with provided prepare_param_data_types - pub fn with_prepare_param_data_types( - mut self, - prepare_param_data_types: Vec, - ) -> Self { - self.prepare_param_data_types = prepare_param_data_types.into(); - self - } - - // TODO: replace all places with outer_query_schemas() - // Return a reference to the outer query's schema - pub fn outer_query_schema(&self) -> Option<&DFSchema> { - self.outer_query_schemas[self.cur_depth] - .as_ref() - .map(|s| s.as_ref()) - } - - pub fn outer_query_schemas(&self) -> &[Option] { - &self.outer_query_schemas - } - - /// Returns an iterator over the outer query schemas from back to front, - /// along with their indices. - pub fn iter_outer_query_schemas_rev( - &self, - ) -> impl Iterator)> { - self.outer_query_schemas - .iter() - .enumerate() - .rev() - .map(|(i, schema_ref)| (i, schema_ref.as_ref().map(|s| s.as_ref()))) - } - - /// Sets the outer query schema, returning the existing one, if - /// any - pub fn push_outer_query_schema(&mut self, schema: Option) { - self.outer_query_schemas.push(schema); - } - - pub fn increase_depth(&mut self) { - self.cur_depth += 1; - } - - pub fn decrease_depth(&mut self) { - self.cur_depth -= 1; - } - - pub fn cur_depth(&self) -> usize { - self.cur_depth - } - - pub fn set_table_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.create_table_schema, &mut schema); - schema - } - - pub fn table_schema(&self) -> Option { - self.create_table_schema.clone() - } - - // Return a clone of the outer FROM schema - pub fn outer_from_schema(&self) -> Option> { - self.outer_from_schema.clone() - } - - /// Sets the outer FROM schema, returning the existing one, if any - pub fn set_outer_from_schema( - &mut self, - mut schema: Option, - ) -> Option { - std::mem::swap(&mut self.outer_from_schema, &mut schema); - schema - } - - /// Extends the FROM schema, returning the existing one, if any - pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> { - match self.outer_from_schema.as_mut() { - Some(from_schema) => Arc::make_mut(from_schema).merge(schema), - None => self.outer_from_schema = Some(Arc::clone(schema)), - }; - Ok(()) - } - - /// Return the types of parameters (`$1`, `$2`, etc) if known - pub fn prepare_param_data_types(&self) -> &[DataType] { - &self.prepare_param_data_types - } - - /// Returns true if there is a Common Table Expression (CTE) / - /// Subquery for the specified name - pub fn contains_cte(&self, cte_name: &str) -> bool { - self.ctes.contains_key(cte_name) - } - - /// Inserts a LogicalPlan for the Common Table Expression (CTE) / - /// Subquery for the specified name - pub fn insert_cte(&mut self, cte_name: impl Into, plan: LogicalPlan) { - let cte_name = cte_name.into(); - self.ctes.insert(cte_name, Arc::new(plan)); - } - - /// Return a plan for the Common Table Expression (CTE) / Subquery for the - /// specified name - pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> { - self.ctes.get(cte_name).map(|cte| cte.as_ref()) - } - - /// Remove the plan of CTE / Subquery for the specified name - pub(super) fn remove_cte(&mut self, cte_name: &str) { - self.ctes.remove(cte_name); - } -} - /// SQL query planner and binder /// /// This struct is used to convert a SQL AST into a [`LogicalPlan`]. diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index f42a3ad138c4..067e5fbcc4d8 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -17,12 +17,13 @@ use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use crate::stack::StackGuard; use datafusion_common::{not_impl_err, Constraints, DFSchema, Result}; use datafusion_expr::expr::Sort; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, }; diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs index 8a3c20e3971b..dd020d8c45b3 100644 --- a/datafusion/sql/src/relation/join.rs +++ b/datafusion/sql/src/relation/join.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{not_impl_err, plan_datafusion_err, Column, Result}; -use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{planner_context::PlannerContext, JoinType, LogicalPlan, LogicalPlanBuilder}; use sqlparser::ast::{ Join, JoinConstraint, JoinOperator, ObjectName, TableFactor, TableWithJoins, }; diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index f01d30dc26ab..cbcc75362a99 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -17,13 +17,14 @@ use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference, }; use datafusion_expr::builder::subquery_alias; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_expr::{Subquery, SubqueryAlias}; use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor}; diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index c8837c947723..71b973772dba 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -19,7 +19,7 @@ use std::collections::HashSet; use std::ops::ControlFlow; use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use crate::query::to_order_by_exprs_with_select; use crate::utils::{ check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs, @@ -35,6 +35,7 @@ use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, }; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::utils::{ expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, diff --git a/datafusion/sql/src/set_expr.rs b/datafusion/sql/src/set_expr.rs index 78c63d7db7bd..8597821be9f7 100644 --- a/datafusion/sql/src/set_expr.rs +++ b/datafusion/sql/src/set_expr.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{ not_impl_err, plan_err, DataFusionError, Diagnostic, Result, Span, }; -use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{planner_context::PlannerContext, LogicalPlan, LogicalPlanBuilder}; use sqlparser::ast::{SetExpr, SetOperator, SetQuantifier, Spanned}; impl SqlToRel<'_, S> { diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 73e0275e7472..e6980f427583 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -25,7 +25,7 @@ use crate::parser::{ LexOrdering, Statement as DFStatement, }; use crate::planner::{ - object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel, + object_name_to_qualifier, ContextProvider, SqlToRel, }; use crate::utils::normalize_ident; @@ -42,6 +42,7 @@ use datafusion_expr::dml::{CopyTo, InsertOp}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; use datafusion_expr::logical_plan::DdlStatement; +use datafusion_expr::planner_context::PlannerContext; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, diff --git a/datafusion/sql/src/values.rs b/datafusion/sql/src/values.rs index dd8957c95470..376f98079361 100644 --- a/datafusion/sql/src/values.rs +++ b/datafusion/sql/src/values.rs @@ -17,9 +17,9 @@ use std::sync::Arc; -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use crate::planner::{ContextProvider, SqlToRel}; use datafusion_common::{DFSchema, Result}; -use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{planner_context::PlannerContext, LogicalPlan, LogicalPlanBuilder}; use sqlparser::ast::Values as SQLValues; impl SqlToRel<'_, S> {