35
35
//! - New source operators that do not make use of Tokio resources
36
36
//! - Exchange like operators that do not use Tokio's `Channel` implementation to pass data between
37
37
//! tasks
38
- //!
38
+ //!
39
39
//! # Available utilities
40
- //!
40
+ //!
41
41
//! This module provides two function that can be used to add cooperative scheduling to existing
42
42
//! `Stream` implementations.
43
- //!
43
+ //!
44
44
//! [`cooperative`] is a generic function that takes ownership of the wrapped [`RecordBatchStream`].
45
45
//! This function has the benefit of not requiring an additional heap allocation and can avoid
46
46
//! dynamic dispatch.
47
- //!
47
+ //!
48
48
//! [`make_cooperative`] is a non-generic function that wraps a [`SendableRecordBatchStream`]. This
49
49
//! can be used to wrap dynamically typed, heap allocated [`RecordBatchStream`]s.
50
50
51
- #[ cfg( datafusion_coop = "tokio_fallback" ) ]
51
+ #[ cfg( any(
52
+ datafusion_coop = "tokio_fallback" ,
53
+ not( any( datafusion_coop = "tokio" , datafusion_coop = "per_stream" ) )
54
+ ) ) ]
52
55
use futures:: Future ;
53
56
use std:: any:: Any ;
54
57
use std:: pin:: Pin ;
@@ -72,18 +75,18 @@ use futures::{Stream, StreamExt};
72
75
/// A stream that passes record batches through unchanged while cooperating with the Tokio runtime.
73
76
/// It consumes cooperative scheduling budget for each returned [`RecordBatch`],
74
77
/// allowing other tasks to execute when the budget is exhausted.
75
- ///
76
- /// See the [module level documentation](crate::coop) for an in-depth discussion.
78
+ ///
79
+ /// See the [module level documentation](crate::coop) for an in-depth discussion.
77
80
pub struct CooperativeStream < T >
78
81
where
79
82
T : RecordBatchStream + Unpin ,
80
83
{
81
84
inner : T ,
82
- #[ cfg( not ( any ( datafusion_coop = "tokio" , datafusion_coop = "tokio_fallback" ) ) ) ]
85
+ #[ cfg( datafusion_coop = "per_stream" ) ]
83
86
budget : u8 ,
84
87
}
85
88
86
- #[ cfg( not ( any ( datafusion_coop = "tokio" , datafusion_coop = "tokio_fallback" ) ) ) ]
89
+ #[ cfg( datafusion_coop = "per_stream" ) ]
87
90
// Magic value that matches Tokio's task budget value
88
91
const YIELD_FREQUENCY : u8 = 128 ;
89
92
@@ -97,10 +100,7 @@ where
97
100
pub fn new ( inner : T ) -> Self {
98
101
Self {
99
102
inner,
100
- #[ cfg( not( any(
101
- datafusion_coop = "tokio" ,
102
- datafusion_coop = "tokio_fallback"
103
- ) ) ) ]
103
+ #[ cfg( datafusion_coop = "per_stream" ) ]
104
104
budget : YIELD_FREQUENCY ,
105
105
}
106
106
}
@@ -128,10 +128,13 @@ where
128
128
value
129
129
}
130
130
131
- #[ cfg( datafusion_coop = "tokio_fallback" ) ]
131
+ #[ cfg( any(
132
+ datafusion_coop = "tokio_fallback" ,
133
+ not( any( datafusion_coop = "tokio" , datafusion_coop = "per_stream" ) )
134
+ ) ) ]
132
135
{
133
- // This is a temporary placeholder implementation
134
- // that may have slightly worse performance compared to `poll_proceed`
136
+ // This is a temporary placeholder implementation that may have slightly
137
+ // worse performance compared to `poll_proceed`
135
138
if !tokio:: task:: coop:: has_budget_remaining ( ) {
136
139
cx. waker ( ) . wake_by_ref ( ) ;
137
140
return Poll :: Pending ;
@@ -151,7 +154,7 @@ where
151
154
value
152
155
}
153
156
154
- #[ cfg( not ( any ( datafusion_coop = "tokio" , datafusion_coop = "tokio_fallback" ) ) ) ]
157
+ #[ cfg( datafusion_coop = "per_stream" ) ]
155
158
{
156
159
if self . budget == 0 {
157
160
self . budget = YIELD_FREQUENCY ;
0 commit comments