-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[improve][broker] optimize the problem of subscription snapshot cache not hitting #24300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
*/ | ||
package org.apache.pulsar.broker.service.persistent; | ||
|
||
import java.util.Iterator; | ||
import java.util.Map; | ||
import java.util.NavigableMap; | ||
import java.util.TreeMap; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
@@ -34,28 +36,110 @@ public class ReplicatedSubscriptionSnapshotCache { | |
private final String subscription; | ||
private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot> snapshots; | ||
private final int maxSnapshotToCache; | ||
private final int snapshotFrequencyMillis; | ||
|
||
public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) { | ||
public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache, | ||
int snapshotFrequencyMillis) { | ||
this.subscription = subscription; | ||
this.snapshots = new TreeMap<>(); | ||
this.maxSnapshotToCache = maxSnapshotToCache; | ||
this.snapshotFrequencyMillis = snapshotFrequencyMillis; | ||
} | ||
|
||
public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { | ||
/** | ||
* Adds a new replicated subscription snapshot to the cache with time-based eviction policies. | ||
* This method handles 5 distinct cases for cache management: | ||
* <ol> | ||
* <li><b>Empty Cache</b>: Directly inserts the snapshot if cache is empty.</li> | ||
* <li><b>Position Precedes First Entry</b>: Clears entire cache and inserts new snapshot.</li> | ||
* <li><b>Position in Middle</b>: Trims later entries and inserts new snapshot.</li> | ||
* <li><b>Cache Not Full</b>: Inserts snapshot if cache has available space.</li> | ||
* <li><b>Cache Full</b>: Evicts median-aged entry before insertion (time-window optimized).</li> | ||
* </ol> | ||
* | ||
* <p>Time-based eviction considers two conditions: | ||
* <ul> | ||
* <li>Minimum frequency interval ({@code snapshotFrequencyMillis})</li> | ||
* <li>Dynamic time window per slot ({@code timeWindowPerSlot})</li> | ||
* </ul> | ||
* | ||
* @param snapshot The replicated subscription snapshot to add (non-null) | ||
* @param publishTime Publish time of the ReplicatedSubscriptionsSnapshot marker message (entry). | ||
* It originates from the same broker, so the time is reliable and clock skew is | ||
* not a problem. | ||
* @throws NullPointerException If snapshot argument is null | ||
* @see ReplicatedSubscriptionsSnapshot | ||
* @see #findPositionByIndex(int) For median position calculation in eviction case | ||
*/ | ||
public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { | ||
snapshot.setMarkersTimestamp(publishTime); | ||
MarkersMessageIdData msgId = snapshot.getLocalMessageId(); | ||
Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); | ||
|
||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position, | ||
snapshot.getSnapshotId()); | ||
} | ||
// Case 1: cache if empty | ||
if (snapshots.lastEntry() == null) { | ||
snapshots.put(position, snapshot); | ||
return; | ||
} | ||
|
||
snapshots.put(position, snapshot); | ||
// The time elapsed between the earliest cache entry and the new snapshot | ||
final long timeSinceFirstSnapshot = publishTime - snapshots.firstEntry().getValue().getMarkersTimestamp(); | ||
// The time elapsed between the latest cache entry and the new snapshot | ||
final long timeSinceLastSnapshot = publishTime - snapshots.lastEntry().getValue().getMarkersTimestamp(); | ||
// The time window length of each time slot, used for dynamic adjustment in the snapshot cache. | ||
// The larger the time slot, the slower the update. | ||
final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; | ||
Comment on lines
+92
to
+94
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is a bit hard to grasp what the "time slot" concept is here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What if timeSinceFirstSnapshot is 25 minutes? The goal is that if timeSinceFirstSnapshot becomes longer, the update frequency should be lower. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to be a realistic case. I disagree that the update frequency should become lower. That's exactly my point that if there's a long delay, it will be enforced going forward. I think it's easier to make progress in this PR by removing this rule and adding it later if there's a specific reason to do so. |
||
|
||
// Prune the cache | ||
while (snapshots.size() > maxSnapshotToCache) { | ||
snapshots.pollFirstEntry(); | ||
if (position.compareTo(snapshots.firstKey()) < 0) { | ||
// Case 2: Reset cursor if position precedes first entry | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
snapshots.clear(); | ||
snapshots.put(position, snapshot); | ||
return; | ||
} else if (position.compareTo(snapshots.lastKey()) < 0) { | ||
// Case 3: Reset cursor. If the position is in the middle, delete the cached entries after that position | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
while (position.compareTo(snapshots.lastKey()) < 0) { | ||
snapshots.pollLastEntry(); | ||
} | ||
snapshots.put(position, snapshot); | ||
} | ||
// Time-based eviction conditions | ||
// timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency | ||
// timeSinceLastSnapshot < timeWindowPerSlot, implementing dynamic adjustments | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) { | ||
return; | ||
} | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (snapshots.size() < maxSnapshotToCache) { | ||
// Case 4: Add to cache if not full | ||
snapshots.put(position, snapshot); | ||
} else { | ||
// Case 5: Median-based eviction when cache is full | ||
int medianIndex = maxSnapshotToCache / 2; | ||
Position positionToRemove = findPositionByIndex(medianIndex); | ||
if (positionToRemove != null) { | ||
snapshots.remove(positionToRemove); | ||
} | ||
snapshots.put(position, snapshot); | ||
} | ||
} | ||
|
||
/** | ||
* Find the Position in NavigableMap according to the target index. | ||
*/ | ||
private Position findPositionByIndex(int targetIndex) { | ||
Iterator<Map.Entry<Position, ReplicatedSubscriptionsSnapshot>> it = snapshots.entrySet().iterator(); | ||
int currentIndex = 0; | ||
while (it.hasNext()) { | ||
Map.Entry<Position, ReplicatedSubscriptionsSnapshot> entry = it.next(); | ||
if (currentIndex == targetIndex) { | ||
return entry.getKey(); | ||
} | ||
currentIndex++; | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
|
Uh oh!
There was an error while loading. Please reload this page.