
Add custom fanout to CacheableCombineAccumulate.InitialCombineGlobally #183

Open
@cyc


Recently I have been running benchmarks on TFT with a variety of datasets. One thing I have noticed is that CacheableCombineAccumulate appears to scale poorly, in particular with tft.bucketize and its QuantilesCombiner.

As a concrete example, I ran an experiment on a dataset of 20 billion rows and around 1000 dense scalar columns. I added just one analyzer, tft.bucketize(num_buckets=100, elementwise=True), and applied it to all 1000 columns concatenated into a single tensor.
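To make the shape of this analyzer concrete, here is a rough numpy model of what elementwise=True computes on the concatenated [rows, 1000] tensor: one independent set of bucket boundaries per column. This is an illustration only, not TFT code. TFT derives boundaries from an approximate quantile sketch (the QuantilesCombiner), whereas this sketch uses exact np.quantile as a stand-in, and the exact boundary conventions may differ. The function name elementwise_bucketize is hypothetical. Because every column needs its own boundaries, the combiner has to track per-column quantile state, which is plausibly why merging its accumulators becomes expensive at 1000 columns.

```python
import numpy as np


def elementwise_bucketize(x: np.ndarray, num_buckets: int) -> np.ndarray:
    """Hypothetical model of per-column (elementwise) bucketization.

    Exact quantiles from np.quantile stand in for TFT's approximate
    quantile sketch, so boundaries will not match TFT exactly.
    """
    # Interior quantile levels, e.g. 0.01, 0.02, ..., 0.99 for 100 buckets.
    levels = np.linspace(0.0, 1.0, num_buckets + 1)[1:-1]
    # One column of (num_buckets - 1) boundaries per input column:
    # shape is (num_buckets - 1, n_cols).
    boundaries = np.quantile(x, levels, axis=0)
    out = np.empty_like(x, dtype=np.int64)
    for col in range(x.shape[1]):
        # Bucket index in [0, num_buckets - 1], using only this column's boundaries.
        out[:, col] = np.searchsorted(boundaries[:, col], x[:, col], side="right")
    return out


# Usage: 10,000 rows of 5 columns, 100 buckets per column.
rng = np.random.default_rng(0)
x = rng.normal(size=(10_000, 5))
buckets = elementwise_bucketize(x, 100)
print(buckets.shape, buckets.min(), buckets.max())
```

Rescaling one column leaves every other column's buckets unchanged, which is the "each element independently" behavior that elementwise=True asks for.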

Dataflow job ID: 2020-06-12_11_25_12-8283309558685381372
Elapsed time: 21 hr 57 min
Total vCPU time: 11,455.279 vCPU hr

Around 19 of the 22 wall-clock hours were spent on the InitialCombineGlobally step.

To verify that InitialCombineGlobally was the bottleneck, I forked _IntermediateAccumulateCombineImpl and added .with_fanout(1000) to its beam.CombineGlobally.
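The effect of with_fanout can be illustrated with a simplified pure-Python model. In Beam, CombineGlobally without fanout ends in a single merge step, so one worker merges every partial accumulator; with_fanout(F) adds an intermediate stage that spreads partial accumulators over F keys, merges each group in parallel, and leaves only about F accumulators for the final merge. The names MeanCombineFn and combine_globally below are hypothetical, and the toy mean combiner only mimics the create/add/merge/extract shape of a Beam CombineFn; it is a sketch of the idea, not Beam's implementation.

```python
import random


class MeanCombineFn:
    """Toy combiner with the create/add/merge/extract shape of a Beam CombineFn."""

    def create_accumulator(self):
        return (0.0, 0)  # (running total, count)

    def add_input(self, acc, value):
        total, count = acc
        return (total + value, count + 1)

    def merge_accumulators(self, accs):
        total, count = 0.0, 0
        for t, c in accs:
            total += t
            count += c
        return (total, count)

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


def combine_globally(bundles, fn, fanout=None, seed=0):
    """Model of a global combine over per-bundle inputs.

    Returns (result, number of accumulators the final single-worker
    merge has to process).
    """
    # Stage 1: each bundle builds its own partial accumulator (parallel in Beam).
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    if fanout:
        # Intermediate stage: scatter partials over `fanout` random keys
        # and merge each group separately (also parallel in Beam).
        rng = random.Random(seed)
        groups = [[] for _ in range(fanout)]
        for acc in partials:
            groups[rng.randrange(fanout)].append(acc)
        partials = [fn.merge_accumulators(g) for g in groups if g]
    # Final stage: one worker merges whatever accumulators remain.
    final_inputs = len(partials)
    return fn.extract_output(fn.merge_accumulators(partials)), final_inputs


# Usage: 5,000 bundles of 10 values each.
bundles = [[float(i)] * 10 for i in range(5000)]
plain = combine_globally(bundles, MeanCombineFn())
fanned = combine_globally(bundles, MeanCombineFn(), fanout=100)
print(plain, fanned)
```

Both runs produce the same mean, but the final merge handles 5,000 accumulators without fanout and at most 100 with it. When each accumulator is a large quantile sketch, shrinking that single-worker step is where the time would be saved.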

Dataflow job ID: 2020-06-22_13_45_28-14720730178676069772
Elapsed time: 2 hr 43 min
Total vCPU time: 6,638.944 vCPU hr
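For reference, the two runs above compare as follows, using only the elapsed and vCPU figures reported for the two Dataflow jobs.

```python
def to_minutes(hours: int, minutes: int) -> int:
    """Convert an 'H hr M min' duration to minutes."""
    return hours * 60 + minutes


baseline_min = to_minutes(21, 57)  # original CombineGlobally
fanout_min = to_minutes(2, 43)     # with .with_fanout(1000)

wall_clock_speedup = baseline_min / fanout_min
vcpu_saving = 1 - 6638.944 / 11455.279  # fraction of vCPU hours saved

print(f"{wall_clock_speedup:.1f}x faster wall clock, {vcpu_saving:.0%} less vCPU time")
# prints "8.1x faster wall clock, 42% less vCPU time"
```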

It would be useful to be able to annotate our analyzers/combiners with a desired fanout so that we can control this if it arises. Also, although the experiment above uses a rather contrived combination of dataset and transformation that is unlikely to occur in practice, the workflow is similar to some real-world problems I am working on, so a fix would be very helpful.
