Skip to content

Commit f45bec4

Browse files
committed
Rework ensure_coop to base itself on evaluation and scheduling properties
1 parent 935db91 commit f45bec4

File tree

13 files changed

+112
-98
lines changed

13 files changed

+112
-98
lines changed

datafusion/datasource/src/sink.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use datafusion_physical_plan::{
3737

3838
use async_trait::async_trait;
3939
use futures::StreamExt;
40+
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
4041

4142
/// `DataSink` implements writing streams of [`RecordBatch`]es to
4243
/// user defined destinations.
@@ -140,7 +141,8 @@ impl DataSinkExec {
140141
Partitioning::UnknownPartitioning(1),
141142
input.pipeline_behavior(),
142143
input.boundedness(),
143-
)
144+
).with_scheduling_type(SchedulingType::Cooperative)
145+
.with_evaluation_type(EvaluationType::Eager)
144146
}
145147
}
146148

@@ -246,10 +248,6 @@ impl ExecutionPlan for DataSinkExec {
246248
fn metrics(&self) -> Option<MetricsSet> {
247249
self.sink.metrics()
248250
}
249-
250-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
251-
Some(self)
252-
}
253251
}
254252

255253
/// Create a output record batch with a count

datafusion/datasource/src/source.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::fmt;
2222
use std::fmt::{Debug, Formatter};
2323
use std::sync::Arc;
2424

25-
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
25+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType, SchedulingType};
2626
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2727
use datafusion_physical_plan::projection::ProjectionExec;
2828
use datafusion_physical_plan::{
@@ -262,10 +262,6 @@ impl ExecutionPlan for DataSourceExec {
262262
.map(|stream| cooperative_wrapper(stream))
263263
}
264264

265-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
266-
Some(self)
267-
}
268-
269265
fn metrics(&self) -> Option<MetricsSet> {
270266
Some(self.data_source.metrics().clone_inner())
271267
}
@@ -379,7 +375,7 @@ impl DataSourceExec {
379375
data_source.output_partitioning(),
380376
EmissionType::Incremental,
381377
Boundedness::Bounded,
382-
)
378+
).with_scheduling_type(SchedulingType::Cooperative)
383379
}
384380

385381
/// Downcast the `DataSourceExec`'s `data_source` to a specific file source

datafusion/physical-optimizer/src/ensure_coop.rs

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ use datafusion_common::config::ConfigOptions;
2929
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
3030
use datafusion_common::Result;
3131
use datafusion_physical_plan::coop::CooperativeExec;
32-
use datafusion_physical_plan::ExecutionPlan;
32+
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
33+
use datafusion_physical_plan::{displayable, ExecutionPlan};
3334

34-
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that finds every leaf node in
35-
/// the plan and replaces it with a variant that yields cooperatively if supported.
36-
/// If the node does not provide a built-in yielding variant via
37-
/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`CooperativeExec`] parent.
35+
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for
36+
/// sub plans that do not participate in cooperative scheduling. The plan is subdivided into sub
37+
/// plans on eager evaluation boundaries. Leaf nodes and eager evaluation roots are checked
38+
/// to see if they participate in cooperative scheduling. Those that do no are wrapped in
39+
/// a [`CooperativeExec`] parent.
3840
pub struct EnsureCooperative {}
3941

4042
impl EnsureCooperative {
@@ -51,36 +53,31 @@ impl Default for EnsureCooperative {
5153

5254
impl Debug for EnsureCooperative {
5355
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54-
f.debug_struct("InsertYieldExec").finish()
56+
f.debug_struct("EnsureCooperative").finish()
5557
}
5658
}
5759

5860
impl PhysicalOptimizerRule for EnsureCooperative {
5961
fn name(&self) -> &str {
60-
"insert_yield_exec"
62+
"ensure_cooperative"
6163
}
6264

6365
fn optimize(
6466
&self,
6567
plan: Arc<dyn ExecutionPlan>,
6668
_config: &ConfigOptions,
6769
) -> Result<Arc<dyn ExecutionPlan>> {
68-
plan.transform_down(|plan| {
69-
if !plan.children().is_empty() {
70-
// Not a leaf, keep recursing down.
71-
return Ok(Transformed::no(plan));
70+
plan.transform_up(|plan| {
71+
let is_leaf = plan.children().is_empty();
72+
let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager;
73+
if (is_leaf || is_exchange) && plan.properties().scheduling_type != SchedulingType::Cooperative {
74+
// Wrap non-cooperative leaves or eager evaluation roots in a cooperative exec to
75+
// ensure the plans they participate in are properly cooperative.
76+
Ok(Transformed::new(Arc::new(CooperativeExec::new(Arc::clone(&plan))), true, TreeNodeRecursion::Continue))
77+
} else {
78+
Ok(Transformed::no(plan))
7279
}
73-
// For leaf nodes, try to get a built-in cooperative-yielding variant.
74-
let new_plan =
75-
Arc::clone(&plan)
76-
.with_cooperative_yields()
77-
.unwrap_or_else(|| {
78-
// Only if no built-in variant exists, insert a `CooperativeExec`.
79-
Arc::new(CooperativeExec::new(plan))
80-
});
81-
Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
82-
})
83-
.map(|t| t.data)
80+
}).map(|t| t.data)
8481
}
8582

8683
fn schema_check(&self) -> bool {

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use super::{
2727
DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
2828
Statistics,
2929
};
30-
use crate::execution_plan::CardinalityEffect;
30+
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
3131
use crate::projection::{make_with_child, ProjectionExec};
3232
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
3333

@@ -72,6 +72,16 @@ impl CoalescePartitionsExec {
7272

7373
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
7474
fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
75+
let input_partitions = input.output_partitioning().partition_count();
76+
let (drive, scheduling) = if input_partitions > 1 {
77+
(EvaluationType::Eager, SchedulingType::Cooperative)
78+
} else {
79+
(
80+
input.properties().evaluation_type,
81+
input.properties().scheduling_type,
82+
)
83+
};
84+
7585
// Coalescing partitions loses existing orderings:
7686
let mut eq_properties = input.equivalence_properties().clone();
7787
eq_properties.clear_orderings();
@@ -82,6 +92,8 @@ impl CoalescePartitionsExec {
8292
input.pipeline_behavior(),
8393
input.boundedness(),
8494
)
95+
.with_evaluation_type(drive)
96+
.with_scheduling_type(scheduling)
8597
}
8698
}
8799

datafusion/physical-plan/src/coop.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use arrow_schema::Schema;
3030
use datafusion_common::{internal_err, Result, Statistics};
3131
use datafusion_execution::TaskContext;
3232

33+
use crate::execution_plan::SchedulingType;
3334
use crate::stream::RecordBatchStreamAdapter;
3435
use futures::{FutureExt, Stream};
3536
use pin_project_lite::pin_project;
@@ -107,14 +108,20 @@ pub struct CooperativeExec {
107108
/// The child execution plan that this operator "wraps" to make it
108109
/// cooperate with the runtime.
109110
child: Arc<dyn ExecutionPlan>,
111+
properties: PlanProperties,
110112
}
111113

112114
impl CooperativeExec {
113115
/// Create a new `CooperativeExec` operator that wraps the given child
114116
/// execution plan and yields control back to the runtime every `frequency`
115117
/// batches.
116118
pub fn new(child: Arc<dyn ExecutionPlan>) -> Self {
117-
Self { child }
119+
let properties = child
120+
.properties()
121+
.clone()
122+
.with_scheduling_type(SchedulingType::Cooperative);
123+
124+
Self { child, properties }
118125
}
119126

120127
/// Returns the child execution plan this operator "wraps" to make it
@@ -148,7 +155,7 @@ impl ExecutionPlan for CooperativeExec {
148155
}
149156

150157
fn properties(&self) -> &PlanProperties {
151-
self.child.properties()
158+
&self.properties
152159
}
153160

154161
fn maintains_input_order(&self) -> Vec<bool> {

datafusion/physical-plan/src/empty.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion_common::{internal_err, Result};
3333
use datafusion_execution::TaskContext;
3434
use datafusion_physical_expr::EquivalenceProperties;
3535

36+
use crate::execution_plan::SchedulingType;
3637
use log::trace;
3738

3839
/// Execution plan for empty relation with produce_one_row=false
@@ -81,6 +82,7 @@ impl EmptyExec {
8182
EmissionType::Incremental,
8283
Boundedness::Bounded,
8384
)
85+
.with_scheduling_type(SchedulingType::Cooperative)
8486
}
8587
}
8688

@@ -173,10 +175,6 @@ impl ExecutionPlan for EmptyExec {
173175
None,
174176
))
175177
}
176-
177-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
178-
Some(self)
179-
}
180178
}
181179

182180
#[cfg(test)]

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ use crate::coalesce_partitions::CoalescePartitionsExec;
4141
use crate::display::DisplayableExecutionPlan;
4242
use crate::metrics::MetricsSet;
4343
use crate::projection::ProjectionExec;
44-
use crate::repartition::RepartitionExec;
45-
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
4644
use crate::stream::RecordBatchStreamAdapter;
4745

4846
use arrow::array::{Array, RecordBatch};
@@ -559,16 +557,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
559557
child_pushdown_result,
560558
))
561559
}
562-
563-
/// Returns a version of this plan that cooperates with the runtime via
564-
/// built‐in yielding. If such a version doesn't exist, returns `None`.
565-
/// You do not need to do provide such a version of a custom operator,
566-
/// but DataFusion will utilize it while optimizing the plan if it exists.
567-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
568-
// Conservative default implementation assumes that a leaf does not
569-
// cooperate with yielding.
570-
None
571-
}
572560
}
573561

574562
/// [`ExecutionPlan`] Invariant Level
@@ -743,6 +731,26 @@ pub enum EmissionType {
743731
Both,
744732
}
745733

734+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
735+
pub enum SchedulingType {
736+
/// The stream generated by [`execute`](ExecutionPlan::execute) does not participate in cooperative scheduling
737+
Blocking,
738+
/// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in cooperative scheduling
739+
/// by consuming task budget
740+
Cooperative,
741+
}
742+
743+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
744+
pub enum EvaluationType {
745+
/// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
746+
/// instances when it is demanded by invoking `Stream::poll_next`.
747+
Lazy,
748+
/// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
749+
/// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
750+
/// `Stream::poll_next` is called.
751+
Eager,
752+
}
753+
746754
/// Utility to determine an operator's boundedness based on its children's boundedness.
747755
///
748756
/// Assumes boundedness can be inferred from child operators:
@@ -831,6 +839,8 @@ pub struct PlanProperties {
831839
pub emission_type: EmissionType,
832840
/// See [ExecutionPlanProperties::boundedness]
833841
pub boundedness: Boundedness,
842+
pub evaluation_type: EvaluationType,
843+
pub scheduling_type: SchedulingType,
834844
/// See [ExecutionPlanProperties::output_ordering]
835845
output_ordering: Option<LexOrdering>,
836846
}
@@ -850,6 +860,8 @@ impl PlanProperties {
850860
partitioning,
851861
emission_type,
852862
boundedness,
863+
evaluation_type: EvaluationType::Lazy,
864+
scheduling_type: SchedulingType::Blocking,
853865
output_ordering,
854866
}
855867
}
@@ -881,6 +893,16 @@ impl PlanProperties {
881893
self
882894
}
883895

896+
pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
897+
self.scheduling_type = scheduling_type;
898+
self
899+
}
900+
901+
pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
902+
self.evaluation_type = drive_type;
903+
self
904+
}
905+
884906
/// Overwrite constraints with its new value.
885907
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
886908
self.eq_properties = self.eq_properties.with_constraints(constraints);
@@ -912,25 +934,7 @@ impl PlanProperties {
912934
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
913935
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
914936
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
915-
if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
916-
!matches!(
917-
repartition.properties().output_partitioning(),
918-
Partitioning::RoundRobinBatch(_)
919-
)
920-
} else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
921-
{
922-
coalesce.input().output_partitioning().partition_count() > 1
923-
} else if let Some(sort_preserving_merge) =
924-
plan.as_any().downcast_ref::<SortPreservingMergeExec>()
925-
{
926-
sort_preserving_merge
927-
.input()
928-
.output_partitioning()
929-
.partition_count()
930-
> 1
931-
} else {
932-
false
933-
}
937+
plan.properties().evaluation_type == EvaluationType::Lazy
934938
}
935939

936940
/// Returns a copy of this plan if we change any child according to the pointer comparison.

datafusion/physical-plan/src/memory.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424

2525
use crate::coop::cooperative;
26-
use crate::execution_plan::{Boundedness, EmissionType};
26+
use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
2727
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2828
use crate::{
2929
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
@@ -163,7 +163,9 @@ impl LazyMemoryExec {
163163
Partitioning::RoundRobinBatch(generators.len()),
164164
EmissionType::Incremental,
165165
Boundedness::Bounded,
166-
);
166+
)
167+
.with_scheduling_type(SchedulingType::Cooperative);
168+
167169
Ok(Self {
168170
schema,
169171
batch_generators: generators,
@@ -273,10 +275,6 @@ impl ExecutionPlan for LazyMemoryExec {
273275
Ok(Box::pin(cooperative(stream)))
274276
}
275277

276-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
277-
Some(self)
278-
}
279-
280278
fn metrics(&self) -> Option<MetricsSet> {
281279
Some(self.metrics.clone_inner())
282280
}

0 commit comments

Comments
 (0)