Use new API to do Iceberg partition. #13688
base: main
Conversation
Signed-off-by: Chong Gao <[email protected]>
Pull Request Overview
This PR refactors the Iceberg partition logic to use a new API for improved performance and simplicity. The changes replace the previous sorting-based approach with a new contiguousSplitGroupsAndGenUniqKeys method that handles grouping and splitting in a single operation.
- Simplifies partitioning logic by eliminating manual sorting and upper bound operations
- Uses new CUDF API for more efficient group-by and splitting operations
- Adds proper resource management for the input columnar batch in tests
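As a rough, CPU-side illustration only (plain Scala collections, not the cuDF API): a combined group-and-split operation takes rows tagged with partition keys and returns the unique keys together with one contiguous group of rows per key, which is what lets the PR drop the separate sort and upper-bound steps.

```scala
// Illustrative sketch only: models the *shape* of the result that a combined
// "split groups and generate unique keys" call produces, using plain Scala.
def groupAndSplit[K, V](rows: Seq[(K, V)]): (Seq[K], Seq[Seq[V]]) = {
  val grouped = rows.groupBy { case (key, _) => key }.toSeq
  // unique partition keys, one entry per group
  val uniqueKeys = grouped.map { case (key, _) => key }
  // the row values of each group, kept contiguous per key
  val groups = grouped.map { case (_, keyedRows) => keyedRows.map(_._2) }
  (uniqueKeys, groups)
}

// Two distinct keys yield two groups in a single pass, with no explicit
// sort or upper-bound step.
val (keys, groups) = groupAndSplit(Seq(("a", 1), ("b", 2), ("a", 3)))
```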
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| GpuIcebergPartitioner.scala | Refactors partitioning implementation to use new contiguousSplitGroupsAndGenUniqKeys API |
| GpuIcebergPartitionerSuite.scala | Adds proper cleanup of input columnar batch after partitioning |
```scala
private def makeValueIndices(spillableInput: SpillableColumnarBatch): Array[Int] = {
  val numInputCols = spillableInput.getColumnarBatch().numCols()
```
Copilot AI (Oct 29, 2025)
This method calls spillableInput.getColumnarBatch() which may trigger spilling/unspilling. Consider caching the result or passing numInputCols as a parameter to avoid potential repeated operations.
Suggested change:
```diff
-private def makeValueIndices(spillableInput: SpillableColumnarBatch): Array[Int] = {
-  val numInputCols = spillableInput.getColumnarBatch().numCols()
+private def makeValueIndices(numInputCols: Int): Array[Int] = {
```
Greptile Overview
Greptile Summary
This PR refactors the Iceberg partitioning implementation to use a new cuDF API (contiguousSplitGroupsAndGenUniqKeys) that consolidates multi-step partitioning logic into a single native operation. Previously, the partitioner performed separate operations for computing keys, generating row indices, sorting multiple times, grouping, and splitting. The new approach combines key computation with value columns and delegates the entire partition operation to cuDF, eliminating multiple GPU round-trips. Three new helper methods (computeKeysTable, makeKeysValuesTable, makeValueIndices) prepare the combined keys+values table structure required by the new API. A corresponding test fix adds proper resource cleanup (originalCb.close()) to prevent GPU memory leaks. This change fits into the broader Iceberg integration module (iceberg/) which provides GPU-accelerated Apache Iceberg support within the RAPIDS Accelerator for Spark.
Important Files Changed
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 3/5 | Replaces multi-step partitioning algorithm with single cuDF API call; introduces three helper methods but contains potential resource management issues in nested withResource blocks |
| tests/src/test/spark350/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitionerSuite.scala | 5/5 | Adds single line to close ColumnarBatch after partitioning to prevent GPU memory leak |
Confidence score: 3/5
- This PR requires careful review due to resource management concerns in the core partitioning logic and dependency on external cuDF changes.
- Score reflects potential resource leaks in nested `withResource` calls within helper methods (`computeKeysTable`, `makeKeysValuesTable`) where intermediate `getColumnarBatch()` calls could leak GPU memory if exceptions occur between resource acquisition and the outer `withResource` scope. The PR also depends on an external cuDF PR which may not yet be merged or tested in integration.
- Pay close attention to iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala, specifically lines 62-96 where nested resource management could fail under exception conditions, and verify that the cuDF dependency (rapidsai/cudf#20391) is properly integrated before merge.
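The leak pattern flagged above is the gap between acquiring a resource and handing it to `withResource`. A minimal, self-contained sketch in plain Scala; the `withResource` shown is a simplified stand-in for the codebase's Arm utility, and all other names are illustrative:

```scala
// Simplified stand-in for the Arm-style withResource helper.
def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
  try body(resource) finally resource.close()

final class FakeBatch extends AutoCloseable {
  def numCols(): Int = 3
  override def close(): Unit = ()
}

// Risky: `batch` is acquired before any guard exists. If the intermediate
// step throws, withResource is never reached and the batch leaks.
def risky(acquire: () => FakeBatch, intermediate: () => Unit): Int = {
  val batch = acquire()
  intermediate() // a throw here leaks `batch`
  withResource(batch)(_.numCols())
}

// Safe: the resource is guarded from the moment it is acquired, so the
// finally-based close runs on every path.
def safe(acquire: () => FakeBatch): Int =
  withResource(acquire())(_.numCols())
```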
Sequence Diagram
```mermaid
sequenceDiagram
    participant User
    participant GpuIcebergPartitioner
    participant SpillableColumnarBatch
    participant Table as "cuDF Table"
    participant GpuExpression
    participant GpuIcebergPartitionerObj as "GpuIcebergPartitioner Object"
    User->>GpuIcebergPartitioner: "partition(input: ColumnarBatch)"
    GpuIcebergPartitioner->>GpuIcebergPartitioner: "Check if numRows == 0"
    alt numRows == 0
        GpuIcebergPartitioner-->>User: "Return empty Seq"
    end
    GpuIcebergPartitioner->>SpillableColumnarBatch: "Create SpillableColumnarBatch from input"
    SpillableColumnarBatch-->>GpuIcebergPartitioner: "spillableInput"
    GpuIcebergPartitioner->>GpuIcebergPartitioner: "makeValueIndices(spillableInput)"
    GpuIcebergPartitioner->>GpuIcebergPartitioner: "makeKeysValuesTable(spillableInput)"
    GpuIcebergPartitioner->>GpuIcebergPartitioner: "computeKeysTable(spillableInput)"
    GpuIcebergPartitioner->>SpillableColumnarBatch: "getColumnarBatch()"
    SpillableColumnarBatch-->>GpuIcebergPartitioner: "inputBatch"
    loop For each partition expression
        GpuIcebergPartitioner->>GpuExpression: "columnarEval(inputBatch)"
        GpuExpression-->>GpuIcebergPartitioner: "Column result"
    end
    GpuIcebergPartitioner->>Table: "Create keysTable from evaluated columns"
    Table-->>GpuIcebergPartitioner: "keysTable"
    GpuIcebergPartitioner->>SpillableColumnarBatch: "getColumnarBatch()"
    SpillableColumnarBatch-->>GpuIcebergPartitioner: "inputBatch"
    GpuIcebergPartitioner->>Table: "Create inputTable from inputBatch"
    Table-->>GpuIcebergPartitioner: "inputTable"
    GpuIcebergPartitioner->>Table: "Combine keysTable and inputTable"
    Table-->>GpuIcebergPartitioner: "keysValuesTable"
    GpuIcebergPartitioner->>Table: "groupBy(keyColIndices).contiguousSplitGroupsAndGenUniqKeys(valueColumnIndices)"
    Table-->>GpuIcebergPartitioner: "splitRet (unique keys + partitioned groups)"
    GpuIcebergPartitioner->>GpuIcebergPartitionerObj: "toPartitionKeys(partitionType, partitionSparkType, uniqKeyTable)"
    GpuIcebergPartitionerObj-->>GpuIcebergPartitioner: "Array[SparkStructLike] partitionKeys"
    GpuIcebergPartitioner->>GpuIcebergPartitioner: "Get groups from splitRet"
    loop For each partition key and group
        GpuIcebergPartitioner->>SpillableColumnarBatch: "Create SpillableColumnarBatch from partition"
        SpillableColumnarBatch-->>GpuIcebergPartitioner: "spillablePartition"
        GpuIcebergPartitioner->>GpuIcebergPartitioner: "Create ColumnarBatchWithPartition(spillablePartition, partitionKey)"
    end
    GpuIcebergPartitioner-->>User: "Return Seq[ColumnarBatchWithPartition]"
```
2 files reviewed, 2 comments
```scala
private def computeKeysTable(spillableInput: SpillableColumnarBatch): Table = {
  val keyCols = withResource(spillableInput.getColumnarBatch()) { inputBatch =>
    partitionExprs.safeMap(_.columnarEval(inputBatch))
  }
  withResource(keyCols) { _ =>
    val arr = new Array[CudfColumnVector](partitionExprs.size)
    for (i <- partitionExprs.indices) {
      arr(i) = keyCols(i).getBase
    }
    new Table(arr:_*)
  }
}
```
logic: calling `getColumnarBatch()` on line 63 without `withResource` wrapping it means the batch reference isn't auto-closed if an exception occurs before line 66's `withResource` scope begins
```scala
private def makeValueIndices(spillableInput: SpillableColumnarBatch): Array[Int] = {
  val numInputCols = spillableInput.getColumnarBatch().numCols()
  (keyColNum until (keyColNum + numInputCols)).toArray
}
```
style: calling `getColumnarBatch()` here is not wrapped in resource management; if `numCols()` throws or if the reference isn't properly managed elsewhere, this could cause issues
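One way to sidestep the concern entirely, echoing the earlier suggestion, is to pass the column count in rather than materializing the batch just to count columns. A small sketch, assuming `keyColNum` is the number of key columns prepended ahead of the input columns:

```scala
// Pure index arithmetic: the input columns sit immediately after the key
// columns in the combined table, so their indices form a simple range.
def makeInputIndices(keyColNum: Int, numInputCols: Int): Array[Int] =
  (keyColNum until (keyColNum + numInputCols)).toArray

// e.g. 2 key columns followed by 3 input columns -> indices 2, 3, 4
```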
Greptile Overview
Greptile Summary
This review covers only the changes made since the last review, not the entire PR. The latest changes address the resource management concern raised in previous reviews by removing the problematic standalone call to getColumnarBatch() that was used solely to determine column count. The developer has refactored the value indices computation to occur at a different point in the code flow where resource management is already properly handled by existing withResource blocks, eliminating the potential for spilling/unspilling issues that were flagged earlier. This change completes the PR's goal of replacing the old multi-step Iceberg partitioning approach (which involved separate key computation, row index generation, multiple sorts, and gather/split operations) with a single native cuDF call to contiguousSplitGroupsAndGenUniqKeys, which should significantly reduce GPU memory allocations and improve performance.
Important Files Changed
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 4/5 | Refactored to compute value column indices earlier in the flow, avoiding standalone getColumnarBatch() call that could trigger repeated spilling operations |
| tests/src/test/spark350/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitionerSuite.scala | 4/5 | No changes in this iteration; previous removal of originalCb.close() remains unchanged |
Confidence score: 4/5
- This PR is safe to merge with minimal remaining risk now that the resource management issue has been addressed.
- Score reflects that the latest changes directly address the caching concern from the previous review, but one point is deducted because the test file still rests on the unverified assumption that `partitioner.partition()` now handles input batch cleanup internally; this should be confirmed through code inspection or testing to ensure no GPU memory leaks occur.
- Pay close attention to iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala to verify that the new placement of the `valueColumnIndices` computation is logically correct and that `numCols()` will always return the same value when called on the spillable batch at this point in execution.
2 files reviewed, no comments
Greptile Overview
Greptile Summary
This review covers only the changes made since the last review, not the entire PR. The latest changes address previous feedback by moving the getColumnarBatch() call inside the withRetryNoSplit resource management block and passing numInputCols as a parameter to avoid repeated operations. The core refactoring now uses cuDF's new contiguousSplitGroupsAndGenUniqKeys API to consolidate the Iceberg partitioning workflow, which previously required separate sort, group-by, and split operations. This change eliminates intermediate GPU memory allocations for sorted tables and row indices, directly returning both the split groups and unique keys. The method makeValueIndices was renamed to makeInputIndices to better reflect that it references original input columns rather than intermediate values, and an explicit call to closeUniqKeyTable() was added after extracting partition keys to the host, reducing GPU memory pressure during subsequent processing. The refactoring fits into the broader GpuIcebergPartitioner component, which transforms Spark ColumnarBatch data into Iceberg-compatible partition splits by grouping rows according to partition key columns defined in the Iceberg table schema.
Important Files Changed
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 4/5 | Refactored partition logic to use new cuDF API contiguousSplitGroupsAndGenUniqKeys, eliminated intermediate sort/row-index steps, renamed makeValueIndices to makeInputIndices, and added explicit unique key table closure for memory optimization |
Confidence score: 4/5
- This PR is safe to merge with low risk; the refactoring simplifies the code and improves resource management.
- Score reflects that the latest changes correctly address all previous resource management concerns by moving `getColumnarBatch()` inside `withRetryNoSplit` and passing `numInputCols` as a parameter, and the new API consolidates multiple operations safely within `withResource` blocks.
- No files require special attention; resource management is now properly scoped throughout the partition method.
1 file reviewed, 1 comment
```scala
val allCols = new Array[CudfColumnVector](totalColCount)
withRetryNoSplit(spillableInput) { scb =>
  // make table: [key columns, input columns]
  val keysAndInputTable = makeKeysAndInputTable(scb)
```
logic: the `keysAndInputTable` variable is assigned outside `withResource` but used inside the closure on lines 115-116; if an exception occurs between lines 110 and 114, the table won't be properly closed
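One way to close that window is to hand the table to `withResource` in the same expression that creates it, so no unguarded gap exists. A sketch reusing the names from this diff; the exact `groupBy`/`contiguousSplitGroupsAndGenUniqKeys` signatures follow the usage shown elsewhere in this thread and are an assumption here:

```scala
// Sketch: no named intermediate between creation and the guard, so an
// exception anywhere inside still closes the table on the way out.
withRetryNoSplit(spillableInput) { scb =>
  withResource(makeKeysAndInputTable(scb)) { keysAndInputTable =>
    keysAndInputTable.groupBy(keyColIndices: _*)
      .contiguousSplitGroupsAndGenUniqKeys(inputColumnIndices: _*)
  }
}
```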
```scala
}

val numRows = input.numRows()
val inputColumnIndices = makeInputIndices(input.numCols())
```
@liurenjie1024 Unlike `keyColIndices`, we compute `inputColumnIndices` for each `partition` call. Could we get the input column count via `inputSchema.columns.size` or `dataSparkType.fields.size` instead? Is the following right?
`input.numCols() == inputSchema.columns.size == dataSparkType.fields.size`
Yes, it's correct.
LGTM! Just need to wait for the JNI part.
Greptile Overview
Greptile Summary
Refactors Iceberg partitioning to use new cuDF API contiguousSplitGroupsAndGenUniqKeys, simplifying the implementation by eliminating manual sorting, row index generation, and upper bound operations.
Major changes:
- Replaced multi-step partitioning (sort, groupBy, upperBound, gather, split) with a single `contiguousSplitGroupsAndGenUniqKeys` call
- Introduced `makeKeysAndInputTable` helper to create the combined key+input table
- Removed dependencies on `OrderByArg` and `Scalar`
- Simplified resource management flow in the main partition method

Critical issue found: `makeKeysAndInputTable` returns a `Table` with references to columns from already-closed source objects, creating dangling pointers to freed GPU memory.
Confidence Score: 0/5
- Critical memory management bug makes this PR unsafe to merge.
- The `makeKeysAndInputTable` method returns a Table with references to already-freed GPU columns, which will cause memory corruption, segfaults, or undefined behavior when the table is used.
- iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala requires immediate attention; the `makeKeysAndInputTable` method must be fixed before merge.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 1/5 | Refactored to use new API contiguousSplitGroupsAndGenUniqKeys but introduced critical memory management bug in makeKeysAndInputTable where returned Table references freed columns |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant GpuIcebergPartitioner
    participant makeKeysAndInputTable
    participant cuDF
    participant splitRet
    Client->>GpuIcebergPartitioner: partition(input)
    GpuIcebergPartitioner->>GpuIcebergPartitioner: create SpillableColumnarBatch
    GpuIcebergPartitioner->>makeKeysAndInputTable: makeKeysAndInputTable(spillableInput)
    makeKeysAndInputTable->>makeKeysAndInputTable: compute key columns via partitionExprs
    makeKeysAndInputTable->>makeKeysAndInputTable: combine [key cols, input cols] into Table
    makeKeysAndInputTable-->>GpuIcebergPartitioner: keysAndInputTable
    GpuIcebergPartitioner->>cuDF: keysAndInputTable.groupBy(keyColIndices)
    cuDF->>cuDF: contiguousSplitGroupsAndGenUniqKeys(inputColumnIndices)
    cuDF-->>GpuIcebergPartitioner: splitRet
    GpuIcebergPartitioner->>splitRet: getUniqKeyTable()
    GpuIcebergPartitioner->>GpuIcebergPartitioner: toPartitionKeys(uniqKeyTable)
    GpuIcebergPartitioner->>splitRet: closeUniqKeyTable()
    GpuIcebergPartitioner->>splitRet: getGroups()
    GpuIcebergPartitioner->>GpuIcebergPartitioner: zip partitionKeys with groups
    GpuIcebergPartitioner-->>Client: Seq[ColumnarBatchWithPartition]
```
1 file reviewed, 1 comment
```scala
private def makeKeysAndInputTable(spillableInput: SpillableColumnarBatch): Table = {
  withResource(spillableInput.getColumnarBatch()) { inputBatch =>
    // compute keys columns
    val keyCols = partitionExprs.safeMap(_.columnarEval(inputBatch))

    // combine keys columns and input columns into a new table
    withResource(keyCols) { _ =>
      withResource(GpuColumnVector.from(inputBatch)) { inputTable =>
        val numCols = keyCols.size + inputTable.getNumberOfColumns
        val cols = new Array[CudfColumnVector](numCols)
        for (i <- keyCols.indices) {
          cols(i) = keyCols(i).getBase
        }
        for (i <- 0 until inputTable.getNumberOfColumns) {
          cols(i + keyCols.size) = inputTable.getColumn(i)
        }
        new Table(cols:_*)
      }
    }
  }
}
```
logic: the `Table` created on line 83 holds column references from `keyCols` and `inputTable`, but those source objects are closed by their `withResource` wrappers (lines 73, 74) before the table is returned, so the returned table contains dangling references to freed GPU memory.
The columns need `incRefCount()` before being added to the new table, or the table construction needs to happen before the source resources are closed.
Fixes #13679.
Use new API to do Iceberg partition.
Signed-off-by: Chong Gao [email protected]
Depends on `contiguousSplitGroupsAndGenUniqKeys` rapidsai/cudf#20391

Checklists