Skip to content

Commit 1bf305f

Browse files
committed
chore: Used unbounded channel
1 parent c8df7fe commit 1bf305f

File tree

2 files changed

+15
-15
lines changed

2 files changed

+15
-15
lines changed

datafusion/physical-plan/src/repartition/distributor_channels.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,12 @@ pub fn channels<T>(
8282
pub fn tokio_channels<T>(
8383
n: usize,
8484
) -> (
85-
Vec<tokio::sync::mpsc::Sender<T>>,
86-
Vec<tokio::sync::mpsc::Receiver<T>>,
85+
Vec<tokio::sync::mpsc::UnboundedSender<T>>,
86+
Vec<tokio::sync::mpsc::UnboundedReceiver<T>>,
8787
) {
88-
// only used for OnDemandRepartitionExec, so no need for unbounded capacity
89-
let (senders, receivers) = (0..n).map(|_| tokio::sync::mpsc::channel(2)).unzip();
88+
let (senders, receivers) = (0..n)
89+
.map(|_| tokio::sync::mpsc::unbounded_channel())
90+
.unzip();
9091
(senders, receivers)
9192
}
9293

@@ -103,8 +104,9 @@ pub fn partition_aware_channels<T>(
103104
(0..n_in).map(|_| channels(n_out)).unzip()
104105
}
105106

106-
type OnDemandPartitionAwareSenders<T> = Vec<Vec<tokio::sync::mpsc::Sender<T>>>;
107-
type OnDemandPartitionAwareReceivers<T> = Vec<Vec<tokio::sync::mpsc::Receiver<T>>>;
107+
type OnDemandPartitionAwareSenders<T> = Vec<Vec<tokio::sync::mpsc::UnboundedSender<T>>>;
108+
type OnDemandPartitionAwareReceivers<T> =
109+
Vec<Vec<tokio::sync::mpsc::UnboundedReceiver<T>>>;
108110

109111
pub fn on_demand_partition_aware_channels<T>(
110112
n_in: usize,

datafusion/physical-plan/src/repartition/on_demand_repartition.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
9797
/// └─────────────────┘ Distribute data to the output partitions
9898
///
9999
/// ```
100-
type OnDemandDistributionSender = tokio::sync::mpsc::Sender<MaybeBatch>;
101-
type OnDemandDistributionReceiver = tokio::sync::mpsc::Receiver<MaybeBatch>;
100+
type OnDemandDistributionSender = tokio::sync::mpsc::UnboundedSender<MaybeBatch>;
101+
type OnDemandDistributionReceiver = tokio::sync::mpsc::UnboundedReceiver<MaybeBatch>;
102102

103103
type OnDemandInputPartitionsToCurrentPartitionSender = Vec<OnDemandDistributionSender>;
104104
type OnDemandInputPartitionsToCurrentPartitionReceiver =
@@ -631,10 +631,8 @@ impl OnDemandRepartitionExec {
631631
metrics: OnDemandRepartitionMetrics,
632632
context: Arc<TaskContext>,
633633
) -> Result<()> {
634-
let num_output_partition = partitioning.partition_count();
635634
// initialize buffer channel so that we can pre-fetch from input
636-
let (buffer_tx, mut buffer_rx) =
637-
tokio::sync::mpsc::channel(num_output_partition * 2);
635+
let (buffer_tx, mut buffer_rx) = tokio::sync::mpsc::channel(2);
638636
// execute the child operator in a separate task
639637
// that pushes batches into buffer channel with limited capacity
640638
let processing_task = SpawnedTask::spawn(Self::process_input(
@@ -683,7 +681,7 @@ impl OnDemandRepartitionExec {
683681
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
684682
reservation.lock().try_grow(size)?;
685683

686-
if tx.send(Some(Ok(batch))).await.is_err() {
684+
if tx.send(Some(Ok(batch))).is_err() {
687685
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
688686
reservation.lock().shrink(size);
689687
output_channels.remove(&partition);
@@ -737,7 +735,7 @@ impl OnDemandRepartitionExec {
737735
"Join Error".to_string(),
738736
Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))),
739737
));
740-
tx.send(Some(err)).await.ok();
738+
tx.send(Some(err)).ok();
741739
}
742740
}
743741
// Error from running input task
@@ -748,14 +746,14 @@ impl OnDemandRepartitionExec {
748746
for (_, tx) in txs {
749747
// wrap it because need to send error to all output partitions
750748
let err = Err(DataFusionError::from(&e));
751-
tx.send(Some(err)).await.ok();
749+
tx.send(Some(err)).ok();
752750
}
753751
}
754752
// Input task completed successfully
755753
Ok(Ok(())) => {
756754
// notify each output partition that this input partition has no more data
757755
for (_, tx) in txs {
758-
tx.send(None).await.ok();
756+
tx.send(None).ok();
759757
}
760758
}
761759
}

0 commit comments

Comments
 (0)