Skip to content

[SPARK-52968][SS] Emit additional state store metrics #51679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: master
Choose a base branch
from

Conversation

dylanwong250
Copy link
Contributor

@dylanwong250 dylanwong250 commented Jul 28, 2025

What changes were proposed in this pull request?

Add additional metrics in structured streaming:

State Store Commit Metrics

"rocksdbChangeLogWriterCommitLatencyMs"
"rocksdbSaveZipFilesLatencyMs"

State Store Load Metrics

"rocksdbLoadFromSnapshotLatencyMs"
"rocksdbLoadLatencyMs"
"rocksdbReplayChangeLogLatencyMs"
"rocksdbNumReplayChangelogFiles"

Why are the changes needed?

Currently there are no metrics emitted related to loading the state store and replaying the change log.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests to verify that the new metrics are populated and that query progress contains the correct metrics.

Was this patch authored or co-authored using generative AI tooling?

No

)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we set recordedMetrics here but not in load?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed that load can be read only. For more context, metricsOpt is how the RocksDBMetrics is accessed. It uses recordedMetrics which was previously only updated in commit(). This caused metrics from loadFromSnapshot to not be up to date since it is currently only used as read-only which does not commit().

I added the metrics refresh in load().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dylanwong250 please add comment to say why you added this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment in both load() and loadFromSnapshot().

When I added recordedMetrics = Some(metrics) in load() it caused some tests to fail. This was due to cases when abort() was called and then after that metrics were expected to be empty. I fixed this by clearing the metrics in rollback. I will see if that fixes the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dylanwong250 Why do we want the metrics to be empty after rollback?

Copy link
Contributor Author

@dylanwong250 dylanwong250 Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was running into a test case where in TransformWithStateExec there is the statement:

  if (isStreaming) {
    store.commit()
  } else {
    store.abort()
  }

The abort path would be taken and then the metrics would be set. But doing the set I was getting NPEs because the metrics did not exist in the metrics Map. I think this has something to do with how metrics are being initialized when isStreaming = false for TransformWithStateExec. The test that was failing was testTransformWithState.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed with Micheal to remove the refresh of the metrics on load() since these metrics are only currently reported on commit and the additional calls could increase latency of loading the store.

@@ -1350,6 +1392,7 @@ class RocksDB(
totalSSTFilesBytes,
nativeOpsLatencyMicros,
commitLatencyMs,
loadMetrics,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any concern about race conditions if we're not creating a copy of the maps here? I know that this is a pre-existing pattern, but could you just verify with a quick test that if we get the metrics, reload the state store (resetting the maps), and then read the metrics, that the metrics are correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. It seems that the existing commitLatencyMs also has the issue where the metrics in the maps from the call to metricsOpt can change. I added a clone for these maps and added a test.

)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you doing this? This pulls the rocksdb engine metrics right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Response in the other comment thread related to this.

)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dylanwong250 please add comment to say why you added this

@@ -1101,7 +1144,8 @@ class RocksDB(
numInternalKeysOnLoadedVersion = numInternalKeysOnWritingVersion
loadedVersion = newVersion
commitLatencyMs ++= Map(
"fileSync" -> fileSyncTimeMs
"fileSync" -> fileSyncTimeMs,
"saveZipFiles" -> fileManagerMetrics.saveZipFilesTimeMs.getOrElse(0L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there should be a better way to do this. Also even if there was no zip upload in this commit, this would return the metrics from when maintenance thread did upload. That can confuse the user or eng debugging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed. I added a flag to only set this metric in commit when uploadSnapshot has been called which clears and recreates the fileManagerMetrics.

)

// Refresh the recorded metrics after loading from snapshot
recordedMetrics = Some(metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dylanwong250 Why do we want the metrics to be empty after rollback?

@@ -1312,7 +1376,8 @@ class RocksDB(
pinnedBlocksMemUsage,
totalSSTFilesBytes,
nativeOpsLatencyMicros,
commitLatencyMs,
commitLatencyMs.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making this change, can you just add a quick comment explaining why we need to clone?

@liviazhu
Copy link
Contributor

Change looks good mostly. Besides nits, just had 1 question about rollback

@@ -1034,14 +1078,22 @@ class RocksDB(
if (shouldForceSnapshot.get()) {
assert(snapshot.isDefined)
uploadSnapshot(snapshot.get)
uploadSnapshotCalled = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to introduce new flag here. We already have the isUploaded flag here to detect if we uploaded snapshot

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use a single flag.

Copy link
Contributor

@micheal-o micheal-o left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making this change. Stamped with one last comment. PTAL

/**
* Refresh the recorded metrics with the latest metrics.
*/
def refreshRecordedMetrics(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this for test only? if so make it private[state] and name refreshRecordedMetricsForTest

@@ -156,6 +156,8 @@ class RocksDB(
private val byteArrayPair = new ByteArrayPair()
private val commitLatencyMs = new mutable.HashMap[String, Long]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dylanwong250 please can we reset this commitLatencyMs map too just like you do for loadMetrics. We currently don't reset it and might be returning metrics from previous batch (existing bug). Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants