Skip to content

[improve][broker] optimize the problem of subscription snapshot cache not hitting #24300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload, msgMetadata.getPublishTime());
metadataAndPayload.readerIndex(readerIndex);
}

Expand Down Expand Up @@ -358,13 +358,13 @@ protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int con
&& maxConsumersPerSubscription <= consumerSize;
}

private void processReplicatedSubscriptionSnapshot(Position pos, ByteBuf headersAndPayload) {
private void processReplicatedSubscriptionSnapshot(Position pos, ByteBuf headersAndPayload, long publishTime) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);

try {
ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
subscription.processReplicatedSubscriptionSnapshot(snapshot);
subscription.processReplicatedSubscriptionSnapshot(snapshot, publishTime);
} catch (Throwable t) {
log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ default long getNumberOfEntriesDelayed() {

boolean isSubscriptionMigrated();

default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) {
// Default is no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public boolean setReplicated(boolean replicated) {
this.replicatedSubscriptionSnapshotCache = null;
} else if (this.replicatedSubscriptionSnapshotCache == null) {
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription(),
config.getReplicatedSubscriptionsSnapshotFrequencyMillis());
}

if (this.cursor != null) {
Expand Down Expand Up @@ -1523,10 +1524,10 @@ protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperti
}

@Override
public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) {
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot), publishTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -32,32 +34,114 @@
@Slf4j
public class ReplicatedSubscriptionSnapshotCache {
private final String subscription;
private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot> snapshots;
private final NavigableMap<Position, SnapshotEntry> snapshots;
private final int maxSnapshotToCache;
private final int snapshotFrequencyMillis;

public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) {
public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache,
int snapshotFrequencyMillis) {
this.subscription = subscription;
this.snapshots = new TreeMap<>();
this.maxSnapshotToCache = maxSnapshotToCache;
this.snapshotFrequencyMillis = snapshotFrequencyMillis;
}

public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
/**
* Adds a new replicated subscription snapshot to the cache with time-based eviction policies.
* This method handles 5 distinct cases for cache management:
* <ol>
* <li><b>Empty Cache</b>: Directly inserts the snapshot if cache is empty.</li>
* <li><b>Position Precedes First Entry</b>: Clears entire cache and inserts new snapshot.</li>
* <li><b>Position in Middle</b>: Trims later entries and inserts new snapshot.</li>
* <li><b>Cache Not Full</b>: Inserts snapshot if cache has available space.</li>
* <li><b>Cache Full</b>: Evicts median-aged entry before insertion (time-window optimized).</li>
* </ol>
*
* <p>Time-based eviction considers two conditions:
* <ul>
* <li>Minimum frequency interval ({@code snapshotFrequencyMillis})</li>
* <li>Dynamic time window per slot ({@code timeWindowPerSlot})</li>
* </ul>
*
* @param snapshot The replicated subscription snapshot to add (non-null)
* @param publishTime The publish time of the ReplicatedSubscriptionsSnapshot marker message (entry).
* Because the publish time originates from the same broker, it is reliable and clock skew is
* not a problem.
* @throws NullPointerException If snapshot argument is null
* @see ReplicatedSubscriptionsSnapshot
* @see #findPositionByIndex(int) For median position calculation in eviction case
*/
public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) {
SnapshotEntry snapshotEntry = new SnapshotEntry(snapshot, publishTime);
MarkersMessageIdData msgId = snapshot.getLocalMessageId();
Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId());

if (log.isDebugEnabled()) {
log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
snapshot.getSnapshotId());
}
// Case 1: cache if empty
if (snapshots.lastEntry() == null) {
snapshots.put(position, snapshotEntry);
return;
}

snapshots.put(position, snapshot);
// Time elapsed between the new snapshot's publish time and that of the first (lowest-position) cached entry
final long timeSinceFirstSnapshot = publishTime - snapshots.firstEntry().getValue().timestamp();
// Time elapsed between the new snapshot's publish time and that of the last (highest-position) cached entry
final long timeSinceLastSnapshot = publishTime - snapshots.lastEntry().getValue().timestamp();
// The time window length of each time slot, used for dynamic adjustment in the snapshot cache.
// The larger the time slot, the slower the update.
final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache;
Comment on lines +92 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit hard to grasp what the "time slot" concept is here.
Let's say timeSinceFirstSnapshot is 25000 ms, snapshotFrequencyMillis is 1000 ms, and maxSnapshotToCache is 10: the result would be 2 (25000 / 1000 / 10, using integer division). What's the point of this?
With low values, this would be close to 0, I guess. This is also why I think this is just unnecessary complexity.

Copy link
Contributor Author

@liudezhi2098 liudezhi2098 May 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if timeSinceFirstSnapshot is 25 minutes? The goal is that if timeSinceFirstSnapshot becomes longer, the update frequency should be lower.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if timeSinceFirstSnapshot is 25 minutes? The goal is that if timeSinceFirstSnapshot becomes longer, the update frequency should be lower.

This doesn't seem to be a realistic case. I disagree that the update frequency should become lower. That's exactly my point: if there's a long delay, the lower frequency will be enforced going forward. I think it's easier to make progress in this PR by removing this rule and adding it later if there's a specific reason to do so.


// Prune the cache
while (snapshots.size() > maxSnapshotToCache) {
snapshots.pollFirstEntry();
if (position.compareTo(snapshots.firstKey()) < 0) {
// Case 2: Reset cursor if position precedes first entry
snapshots.clear();
snapshots.put(position, snapshotEntry);
return;
} else if (position.compareTo(snapshots.lastKey()) < 0) {
// Case 3: Cursor was reset into the middle of the cache; drop the cached entries after that position
while (position.compareTo(snapshots.lastKey()) < 0) {
snapshots.pollLastEntry();
}
snapshots.put(position, snapshotEntry);
}
// Time-based eviction conditions
// timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency
// timeSinceLastSnapshot < timeWindowPerSlot, implementing dynamic adjustments
if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) {
return;
}
if (snapshots.size() < maxSnapshotToCache) {
// Case 4: Add to cache if not full
snapshots.put(position, snapshotEntry);
} else {
// Case 5: Median-based eviction when cache is full
int medianIndex = maxSnapshotToCache / 2;
Position positionToRemove = findPositionByIndex(medianIndex);
if (positionToRemove != null) {
snapshots.remove(positionToRemove);
}
snapshots.put(position, snapshotEntry);
}
}

/**
 * Returns the key located at {@code targetIndex} (zero-based) when walking {@code snapshots}
 * in its iteration order, or {@code null} when no key exists at that index.
 */
private Position findPositionByIndex(int targetIndex) {
    int index = 0;
    for (Position key : snapshots.keySet()) {
        if (index == targetIndex) {
            return key;
        }
        index++;
    }
    // targetIndex is negative or not smaller than the number of cached entries
    return null;
}

/**
* Signal that the mark-delete position on the subscription has been advanced. If there is a snapshot that
* corresponds to this position, it will be returned; otherwise it will return null.
Expand All @@ -72,7 +156,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P
} else {
// This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we
// can use
snapshot = snapshots.pollFirstEntry().getValue();
snapshot = snapshots.pollFirstEntry().getValue().snapshot();
}
}

Expand All @@ -88,4 +172,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P
}
return snapshot;
}

/**
 * Value stored in the snapshot cache: a snapshot together with the timestamp it was added with.
 *
 * @param snapshot the cached replicated-subscription snapshot
 * @param timestamp the time associated with the snapshot; within this class it is populated from the
 *                  {@code publishTime} argument of {@code addNewSnapshot} and compared against
 *                  {@code snapshotFrequencyMillis} (so presumably milliseconds -- confirm with callers)
 */
private record SnapshotEntry(ReplicatedSubscriptionsSnapshot snapshot, long timestamp) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@

@Test(groups = "broker")
public class ReplicatedSubscriptionSnapshotCacheTest {
int snapshotFrequencyMillis = 1000;

@Test
public void testSnapshotCache() {
ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 10);

ReplicatedSubscriptionSnapshotCache cache =
new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, snapshotFrequencyMillis);

assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0)));
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(100, 0)));
Expand All @@ -51,11 +54,11 @@ public void testSnapshotCache() {
ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-7");
s7.setLocalMessageId().setLedgerId(7 ).setEntryId(7);

cache.addNewSnapshot(s1);
cache.addNewSnapshot(s2);
cache.addNewSnapshot(s5);
cache.addNewSnapshot(s7);
long publishTime = System.currentTimeMillis();
cache.addNewSnapshot(s1, publishTime);
cache.addNewSnapshot(s2, publishTime + snapshotFrequencyMillis);
cache.addNewSnapshot(s5, publishTime + 2L * snapshotFrequencyMillis);
cache.addNewSnapshot(s7, publishTime + 3L * snapshotFrequencyMillis);

assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0)));
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 0)));
Expand All @@ -72,9 +75,73 @@ public void testSnapshotCache() {
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(5, 5)));
}

/**
 * Verifies how the cache reacts when snapshots arrive at positions lower than ones already cached
 * (i.e. after the cursor has been reset): first below the lowest cached position, then between the
 * lowest and highest cached positions.
 */
@Test
public void testSnapshotCacheByRestCursor() {
    final ReplicatedSubscriptionSnapshotCache snapshotCache =
            new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, snapshotFrequencyMillis);
    final long baseTime = System.currentTimeMillis();

    // Phase 1: the reset cursor position (1:1) is smaller than the first cached position (10:10).
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-1", 10, 10), baseTime);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-2", 20, 20), baseTime + 2L * snapshotFrequencyMillis);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-3", 30, 30), baseTime + 3L * snapshotFrequencyMillis);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-4", 1, 1), baseTime + 4L * snapshotFrequencyMillis);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-5", 10, 10), baseTime + 5L * snapshotFrequencyMillis);

    // Only the snapshots added from the reset onwards are expected to be served.
    assertEquals(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(1, 1)).getSnapshotId(),
            "snapshot-4");
    assertEquals(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(10, 10)).getSnapshotId(),
            "snapshot-5");
    assertNull(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(20, 20)));
    assertNull(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(30, 30)));

    // Phase 2: the reset cursor position (20:20) is smaller than the last cached position (30:30).
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-6", 10, 10), baseTime + 6L * snapshotFrequencyMillis);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-7", 20, 20), baseTime + 7L * snapshotFrequencyMillis);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-8", 30, 30), baseTime + 8L * snapshotFrequencyMillis);
    snapshotCache.addNewSnapshot(snapshotAt("snapshot-9", 20, 20), baseTime + 9L * snapshotFrequencyMillis);

    // snapshot-9 is expected at 20:20 instead of snapshot-7, and nothing is expected at 30:30.
    assertEquals(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(10, 10)).getSnapshotId(),
            "snapshot-6");
    assertEquals(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(20, 20)).getSnapshotId(),
            "snapshot-9");
    assertNull(snapshotCache.advancedMarkDeletePosition(PositionFactory.create(30, 30)));
}

/** Builds a snapshot with the given id whose local message id is {@code ledgerId:entryId}. */
private static ReplicatedSubscriptionsSnapshot snapshotAt(String snapshotId, long ledgerId, long entryId) {
    ReplicatedSubscriptionsSnapshot snapshot = new ReplicatedSubscriptionsSnapshot()
            .setSnapshotId(snapshotId);
    snapshot.setLocalMessageId().setLedgerId(ledgerId).setEntryId(entryId);
    return snapshot;
}

@Test
public void testSnapshotCachePruning() {
ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 3);
ReplicatedSubscriptionSnapshotCache cache =
new ReplicatedSubscriptionSnapshotCache("my-subscription", 4, snapshotFrequencyMillis);

ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-1");
Expand All @@ -91,20 +158,37 @@ public void testSnapshotCachePruning() {
ReplicatedSubscriptionsSnapshot s4 = new ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-4");
s4.setLocalMessageId().setLedgerId(4).setEntryId(4);

cache.addNewSnapshot(s1);
cache.addNewSnapshot(s2);
cache.addNewSnapshot(s3);
cache.addNewSnapshot(s4);

// Snapshot-1 was already pruned
assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)));
ReplicatedSubscriptionsSnapshot s5 = new ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-5");
s5.setLocalMessageId().setLedgerId(5).setEntryId(5);
ReplicatedSubscriptionsSnapshot s6 = new ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-6");
s6.setLocalMessageId().setLedgerId(6).setEntryId(6);
ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot()
.setSnapshotId("snapshot-7");
s7.setLocalMessageId().setLedgerId(7).setEntryId(7);

long publishTime = System.currentTimeMillis();
cache.addNewSnapshot(s1, publishTime + snapshotFrequencyMillis);
cache.addNewSnapshot(s2, publishTime + 2L * snapshotFrequencyMillis);
cache.addNewSnapshot(s3, publishTime + 3L * snapshotFrequencyMillis);
cache.addNewSnapshot(s4, publishTime + 4L * snapshotFrequencyMillis);
cache.addNewSnapshot(s5, publishTime + 5L * snapshotFrequencyMillis);
cache.addNewSnapshot(s6, publishTime + 5L * snapshotFrequencyMillis);
cache.addNewSnapshot(s7, publishTime + 7L * snapshotFrequencyMillis);
// snapshots = [s1, s2, s5, s7]
cache.advancedMarkDeletePosition(PositionFactory.create(1, 1));
ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(2, 2));
assertNotNull(snapshot);
assertEquals(snapshot.getSnapshotId(), "snapshot-2");

snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(3, 3));
assertNull(snapshot);
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(4, 4));
assertNull(snapshot);
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 5));
assertNotNull(snapshot);
assertEquals(snapshot.getSnapshotId(), "snapshot-4");
assertEquals(snapshot.getSnapshotId(), "snapshot-5");
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(6, 6));
assertNull(snapshot);
snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(7, 7));
assertEquals(snapshot.getSnapshotId(), "snapshot-7");
}
}
Loading