-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[improve][broker] optimize the problem of subscription snapshot cache not hitting #24300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
*/ | ||
package org.apache.pulsar.broker.service.persistent; | ||
|
||
import java.util.Iterator; | ||
import java.util.Map; | ||
import java.util.NavigableMap; | ||
import java.util.TreeMap; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
@@ -33,29 +35,96 @@ | |
public class ReplicatedSubscriptionSnapshotCache { | ||
private final String subscription; | ||
private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot> snapshots; | ||
// Used to record the timestamp of snapshots location, which will be used to adjust cache update frequency later. | ||
private final NavigableMap<Position, Long> positionToTimestamp; | ||
private final int maxSnapshotToCache; | ||
private final int snapshotFrequencyMillis; | ||
|
||
public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) { | ||
public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache, | ||
int snapshotFrequencyMillis) { | ||
this.subscription = subscription; | ||
this.snapshots = new TreeMap<>(); | ||
this.positionToTimestamp = new TreeMap<>(); | ||
this.maxSnapshotToCache = maxSnapshotToCache; | ||
this.snapshotFrequencyMillis = snapshotFrequencyMillis; | ||
} | ||
|
||
public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { | ||
public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
MarkersMessageIdData msgId = snapshot.getLocalMessageId(); | ||
Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); | ||
|
||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position, | ||
snapshot.getSnapshotId()); | ||
} | ||
// Case 1: cache if empty | ||
if (positionToTimestamp.lastEntry() == null) { | ||
snapshots.put(position, snapshot); | ||
positionToTimestamp.put(position, publishTime); | ||
return; | ||
} | ||
|
||
snapshots.put(position, snapshot); | ||
// Time elapsed between the new snapshot's publish time and the earliest cached entry | ||
final long timeSinceFirstSnapshot = publishTime - positionToTimestamp.firstEntry().getValue(); | ||
// Time elapsed between the new snapshot's publish time and the latest cached entry | ||
final long timeSinceLastSnapshot = publishTime - positionToTimestamp.lastEntry().getValue(); | ||
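// Age of the earliest cached entry, expressed in snapshot intervals and divided evenly across the cache slots. | ||
// The larger this value, the less often a new snapshot is admitted into the cache. | ||
// NOTE(review): this appears to be a count of intervals, yet it is compared with a millisecond duration below - confirm the intended unit. | ||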
final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; | ||
Comment on lines
+92
to
+94
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a bit hard to grasp: what is the "time slot" concept here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What if timeSinceFirstSnapshot is 25 minutes? The goal is that if timeSinceFirstSnapshot becomes longer, the update frequency should be lower. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to be a realistic case. I disagree that the update frequency should become lower. That's exactly my point that if there's a long delay, it will be enforced going forward. I think it's easier to make progress in this PR by removing this rule and adding it later if there's a specific reason to do so. |
||
|
||
// Prune the cache | ||
while (snapshots.size() > maxSnapshotToCache) { | ||
snapshots.pollFirstEntry(); | ||
if (position.compareTo(positionToTimestamp.firstKey()) < 0) { | ||
// Case 2: Reset cursor if position precedes first entry | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
positionToTimestamp.clear(); | ||
snapshots.clear(); | ||
snapshots.put(position, snapshot); | ||
positionToTimestamp.put(position, publishTime); | ||
return; | ||
} else if (position.compareTo(positionToTimestamp.lastKey()) < 0) { | ||
// Case 3: The cursor was reset to a position in the middle of the cache; drop every cached entry after that position | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
while (position.compareTo(positionToTimestamp.lastKey()) < 0) { | ||
positionToTimestamp.pollLastEntry(); | ||
snapshots.pollLastEntry(); | ||
} | ||
snapshots.put(position, snapshot); | ||
positionToTimestamp.put(position, publishTime); | ||
} | ||
// Time-based eviction conditions | ||
// timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency | ||
// timeSinceLastSnapshot < timeWindowPerSlot, implementing dynamic adjustments | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) { | ||
return; | ||
} | ||
liudezhi2098 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (snapshots.size() < maxSnapshotToCache) { | ||
// Case 4: Add to cache if not full | ||
snapshots.put(position, snapshot); | ||
positionToTimestamp.put(position, publishTime); | ||
} else { | ||
// Case 5: Median-based eviction when cache is full | ||
int medianIndex = maxSnapshotToCache / 2; | ||
Position positionToRemove = findPositionByIndex(medianIndex); | ||
if (positionToRemove != null) { | ||
positionToTimestamp.remove(positionToRemove); | ||
snapshots.remove(positionToRemove); | ||
} | ||
positionToTimestamp.put(position, publishTime); | ||
snapshots.put(position, snapshot); | ||
} | ||
} | ||
|
||
/** | ||
* Returns the key located at the given zero-based index when iterating {@code positionToTimestamp} | ||
* in ascending key order. The lookup walks the entries one by one from the first entry. | ||
* | ||
* @param targetIndex zero-based index of the entry to look up | ||
* @return the key at that index, or {@code null} if the iteration ends before reaching it | ||
*/ | ||
private Position findPositionByIndex(int targetIndex) { | ||
Iterator<Map.Entry<Position, Long>> it = positionToTimestamp.entrySet().iterator(); | ||
int currentIndex = 0; | ||
while (it.hasNext()) { | ||
Map.Entry<Position, Long> entry = it.next(); | ||
if (currentIndex == targetIndex) { | ||
return entry.getKey(); | ||
} | ||
currentIndex++; | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
|
@@ -73,6 +142,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P | |
// This snapshot is potentially good. Continue the search to see if there is a higher snapshot we | ||
// can use | ||
snapshot = snapshots.pollFirstEntry().getValue(); | ||
positionToTimestamp.pollFirstEntry(); | ||
} | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.