@@ -45,8 +45,13 @@ mod sp_repartition_fuzz_tests {
45
45
use test_utils:: add_empty_batches;
46
46
47
47
use datafusion_physical_expr_common:: sort_expr:: LexOrdering ;
48
- use datafusion_physical_plan:: memory:: MemorySourceConfig ;
49
- use datafusion_physical_plan:: source:: DataSourceExec ;
48
+ use datafusion_physical_plan:: {
49
+ memory:: MemorySourceConfig ,
50
+ repartition:: on_demand_repartition:: OnDemandRepartitionExec ,
51
+ } ;
52
+ use datafusion_physical_plan:: {
53
+ repartition:: on_demand_repartition, source:: DataSourceExec ,
54
+ } ;
50
55
use itertools:: izip;
51
56
use rand:: { rngs:: StdRng , seq:: SliceRandom , Rng , SeedableRng } ;
52
57
@@ -296,25 +301,40 @@ mod sp_repartition_fuzz_tests {
296
301
// behaviour. We can choose n_distinct as we like. However,
297
302
// we chose a large number to decrease the probability of having identical rows in the table.
298
303
let n_distinct = 1_000_000 ;
299
- for ( is_first_roundrobin, is_first_sort_preserving) in
300
- [ ( false , false ) , ( false , true ) , ( true , false ) , ( true , true ) ]
301
- {
302
- for is_second_roundrobin in [ false , true ] {
303
- let mut handles = Vec :: new ( ) ;
304
-
305
- for seed in seed_start..seed_end {
306
- #[ allow( clippy:: disallowed_methods) ] // spawn allowed only in tests
307
- let job = tokio:: spawn ( run_sort_preserving_repartition_test (
308
- make_staggered_batches :: < true > ( n_row, n_distinct, seed as u64 ) ,
309
- is_first_roundrobin,
310
- is_first_sort_preserving,
311
- is_second_roundrobin,
312
- ) ) ;
313
- handles. push ( job) ;
314
- }
315
-
316
- for job in handles {
317
- job. await . unwrap ( ) ;
304
+ for use_on_demand_repartition in [ false , true ] {
305
+ for ( is_first_roundrobin, is_first_sort_preserving) in
306
+ [ ( false , false ) , ( false , true ) , ( true , false ) , ( true , true ) ]
307
+ {
308
+ for is_second_roundrobin in [ false , true ] {
309
+ // On demand repartition only replaces the roundrobin repartition
310
+ if use_on_demand_repartition
311
+ && !is_first_roundrobin
312
+ && !is_second_roundrobin
313
+ {
314
+ continue ;
315
+ }
316
+ let mut handles = Vec :: new ( ) ;
317
+
318
+ for seed in seed_start..seed_end {
319
+ #[ allow( clippy:: disallowed_methods) ]
320
+ // spawn allowed only in tests
321
+ let job = tokio:: spawn ( run_sort_preserving_repartition_test (
322
+ make_staggered_batches :: < true > (
323
+ n_row,
324
+ n_distinct,
325
+ seed as u64 ,
326
+ ) ,
327
+ is_first_roundrobin,
328
+ is_first_sort_preserving,
329
+ is_second_roundrobin,
330
+ use_on_demand_repartition,
331
+ ) ) ;
332
+ handles. push ( job) ;
333
+ }
334
+
335
+ for job in handles {
336
+ job. await . unwrap ( ) ;
337
+ }
318
338
}
319
339
}
320
340
}
@@ -343,9 +363,14 @@ mod sp_repartition_fuzz_tests {
343
363
// If `true`, second repartition executor after `DataSourceExec` will be in `RoundRobin` mode
344
364
// else it will be in `Hash` mode
345
365
is_second_roundrobin : bool ,
366
+ // If `true`, `OnDemandRepartitionExec` will be used instead of `RepartitionExec`
367
+ use_on_demand_repartition : bool ,
346
368
) {
347
369
let schema = input1[ 0 ] . schema ( ) ;
348
- let session_config = SessionConfig :: new ( ) . with_batch_size ( 50 ) ;
370
+ let mut session_config = SessionConfig :: new ( ) . with_batch_size ( 50 ) ;
371
+ if use_on_demand_repartition {
372
+ session_config. options_mut ( ) . optimizer . prefer_round_robin_repartition = false ;
373
+ }
349
374
let ctx = SessionContext :: new_with_config ( session_config) ;
350
375
let mut sort_keys = LexOrdering :: default ( ) ;
351
376
for ordering_col in [ "a" , "b" , "c" ] {
@@ -367,16 +392,32 @@ mod sp_repartition_fuzz_tests {
367
392
let hash_exprs = vec ! [ col( "c" , & schema) . unwrap( ) ] ;
368
393
369
394
let intermediate = match ( is_first_roundrobin, is_first_sort_preserving) {
370
- ( true , true ) => sort_preserving_repartition_exec_round_robin ( running_source) ,
371
- ( true , false ) => repartition_exec_round_robin ( running_source) ,
395
+ ( true , true ) => {
396
+ if use_on_demand_repartition {
397
+ sort_preserving_repartition_exec_on_demand ( running_source)
398
+ } else {
399
+ sort_preserving_repartition_exec_round_robin ( running_source)
400
+ }
401
+ }
402
+ ( true , false ) => {
403
+ if use_on_demand_repartition {
404
+ repartition_exec_on_demand ( running_source)
405
+ } else {
406
+ repartition_exec_round_robin ( running_source)
407
+ }
408
+ }
372
409
( false , true ) => {
373
410
sort_preserving_repartition_exec_hash ( running_source, hash_exprs. clone ( ) )
374
411
}
375
412
( false , false ) => repartition_exec_hash ( running_source, hash_exprs. clone ( ) ) ,
376
413
} ;
377
414
378
415
let intermediate = if is_second_roundrobin {
379
- sort_preserving_repartition_exec_round_robin ( intermediate)
416
+ if use_on_demand_repartition {
417
+ sort_preserving_repartition_exec_on_demand ( intermediate)
418
+ } else {
419
+ sort_preserving_repartition_exec_round_robin ( intermediate)
420
+ }
380
421
} else {
381
422
sort_preserving_repartition_exec_hash ( intermediate, hash_exprs. clone ( ) )
382
423
} ;
@@ -399,6 +440,16 @@ mod sp_repartition_fuzz_tests {
399
440
)
400
441
}
401
442
443
+ fn sort_preserving_repartition_exec_on_demand (
444
+ input : Arc < dyn ExecutionPlan > ,
445
+ ) -> Arc < dyn ExecutionPlan > {
446
+ Arc :: new (
447
+ OnDemandRepartitionExec :: try_new ( input, Partitioning :: OnDemand ( 2 ) )
448
+ . unwrap ( )
449
+ . with_preserve_order ( ) ,
450
+ )
451
+ }
452
+
402
453
fn repartition_exec_round_robin (
403
454
input : Arc < dyn ExecutionPlan > ,
404
455
) -> Arc < dyn ExecutionPlan > {
@@ -407,6 +458,14 @@ mod sp_repartition_fuzz_tests {
407
458
)
408
459
}
409
460
461
+ fn repartition_exec_on_demand (
462
+ input : Arc < dyn ExecutionPlan > ,
463
+ ) -> Arc < dyn ExecutionPlan > {
464
+ Arc :: new (
465
+ OnDemandRepartitionExec :: try_new ( input, Partitioning :: OnDemand ( 2 ) ) . unwrap ( ) ,
466
+ )
467
+ }
468
+
410
469
fn sort_preserving_repartition_exec_hash (
411
470
input : Arc < dyn ExecutionPlan > ,
412
471
hash_expr : Vec < Arc < dyn PhysicalExpr > > ,
0 commit comments