Skip to content

[refactor][ml] Replace cache eviction algorithm with centralized removal queue and job #24363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented May 29, 2025

Motivation

This PR fixes fundamental inefficiencies and correctness issues in the current Pulsar broker entry cache eviction algorithm. The current implementation has flawed size-based eviction that doesn't remove the oldest entries and incorrect timestamp-based eviction with high CPU overhead. These fixes ensure that size-based eviction properly removes the oldest entries and timestamp-based eviction works correctly. Additionally, this PR serves as a foundation for future improvements to efficiently handle catch-up reads and Key_Shared subscription scenarios.

Mailing list discussion about this PR: https://lists.apache.org/thread/ddzzc17b0c218ozq9tx0r3rx5sgljfb0

Problems with the Current Broker Entry Cache Implementation

  1. Size-Based Eviction doesn't remove oldest entries: The existing EntryCacheDefaultEvictionPolicy uses an algorithm for keeping the cache size under the limit but cannot guarantee removal of the oldest entries from the cache. The algorithm:

    • Sorts caches by size in descending order
    • Selects caches representing a percentage of total size (PercentOfSizeToConsiderForEviction, default 0.5)
    • Attempts to evict proportional amounts from each selected cache
    • This approach doesn't ensure that the oldest entries are removed, leading to inefficient cache utilization
  2. Inefficient and Incorrect Timestamp-Based Eviction: The current timestamp eviction has both performance and correctness issues:

    • Performance problems:
      • Iterates through ALL cache instances (there's an instance for each persistent topic / ManagedLedgerImpl)
      • Causes remarkable CPU and memory pressure due to constant frequent iterations when there's a high number of topics running with high throughput.
      • Runs every 10 milliseconds by default (managedLedgerCacheEvictionIntervalMs=10) - 100 times per second!
    • Correctness problems:
      • Assumes entries are ordered by both position and timestamp, which breaks when:
        • Cache is used for catch-up reads (backlogged cursors)
        • Individual entries are cached out of order
        • Multiple read patterns (tailing + catch-up) access the same cache simultaneously
  3. Limited Cache Scope: The original RangeCache was designed for tailing reads. Later changes added support for backlogged cursors, but the eviction algorithms weren't updated to handle mixed read patterns effectively.

  4. Unnecessary Complexity: Generic type parameters in RangeCache add complexity without providing value, as the cache is only used for entry storage.

Modifications

1. Centralized Removal Queue (RangeCacheRemovalQueue)

  • Single-threaded eviction: All cache evictions are handled by one thread to avoid contention
  • Insertion-order tracking: Uses MpscUnboundedArrayQueue to maintain entry insertion order
  • Accurate timestamp eviction: Entries are processed in insertion order, making timestamp-based eviction reliable
  • Efficient size-based eviction: Oldest entries are removed first when freeing cache space

2. Simplified Cache Implementation

  • Removed generics: Dropped unnecessary type parameters from RangeCache to reduce complexity
  • Unified eviction handling: All cache instances use the same central removal queue
  • Improved consistency: Race conditions in eviction are minimized through centralized processing

3. Foundation for Future Improvements

The existing broker cache has limitations:

  • Unnecessary BookKeeper reads during catch-up read scenarios
    • Causes increased network costs and resource usage on BookKeeper nodes
    • Cascading performance issues under high fan-out catch-up reads
    • Current backlogged cursors caching solution has multiple gaps
  • Poor cache hit rates for Key_Shared subscriptions with slow consumers since entries get put into the replay queue and once the consumer has sent permits, these entries are read from BookKeeper (unless cacheEvictionByMarkDeletedPosition=true)

This refactoring prepares the cache system for:

  • Enhanced catch-up read optimization
  • Efficient replay queue caching for Key_Shared subscriptions

Algorithm Comparison

Before (EntryCacheDefaultEvictionPolicy)

Size Based Eviction

  1. Sort all caches by size (largest first)
  2. Select caches until reaching PercentOfSizeToConsiderForEviction (0.5)
  3. For each selected cache:
    • Calculate proportional eviction amount
    • Remove entries (no guarantee of age-based removal)
  4. Problem: May remove newer entries while keeping older ones

@Override
public void doEviction(List<EntryCache> caches, long sizeToFree) {
checkArgument(sizeToFree > 0);
checkArgument(!caches.isEmpty());
caches.sort(reverseOrder());
long totalSize = 0;
for (EntryCache cache : caches) {
totalSize += cache.getSize();
}
// This algorithm apply the eviction only the group of caches whose combined size reaches the
// PercentOfSizeToConsiderForEviction
List<EntryCache> cachesToEvict = new ArrayList();
long cachesToEvictTotalSize = 0;
long sizeToConsiderForEviction = (long) (totalSize * PercentOfSizeToConsiderForEviction);
log.debug("Need to gather at least {} from caches", sizeToConsiderForEviction);
int cacheIdx = 0;
while (cachesToEvictTotalSize < sizeToConsiderForEviction) {
// This condition should always be true, considering that we cannot free more size that what we have in
// cache
checkArgument(cacheIdx < caches.size());
EntryCache entryCache = caches.get(cacheIdx++);
cachesToEvictTotalSize += entryCache.getSize();
cachesToEvict.add(entryCache);
log.debug("Added cache {} with size {}", entryCache.getName(), entryCache.getSize());
}
int evictedEntries = 0;
long evictedSize = 0;
for (EntryCache entryCache : cachesToEvict) {
// To each entryCache chosen to for eviction, we'll ask to evict a proportional amount of data
long singleCacheSizeToFree = (long) (sizeToFree * (entryCache.getSize() / (double) cachesToEvictTotalSize));
if (singleCacheSizeToFree == 0) {
// If the size of this cache went to 0, it probably means that its entries has been removed from the
// cache since the time we've computed the ranking
continue;
}
Pair<Integer, Long> evicted = entryCache.evictEntries(singleCacheSizeToFree);
evictedEntries += evicted.getLeft();
evictedSize += evicted.getRight();
}
log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries,
cachesToEvict.size(), evictedSize / RangeEntryCacheManagerImpl.MB);
}

Timestamp eviction

  1. Iterate all caches
  2. For each cache:
    • Start from the first position in the cache
    • Remove entries from the cache until the cache is empty or there's a valid entry that hasn't yet been expired
  3. Problem: Iterating all caches and entries cause a lot of unnecessary CPU and memory pressure due to iterations. By default, this is performed every 10 milliseconds, 100 times per second. (managedLedgerCacheEvictionIntervalMs=10)

private synchronized void doCacheEviction() {
long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;
ledgers.values().forEach(mlfuture -> {
if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
ManagedLedgerImpl ml = mlfuture.getNow(null);
if (ml != null) {
ml.doCacheEviction(maxTimestamp);
}
}
});
}

After (RangeCacheRemovalQueue)

  1. All entries added to insertion-order queue when cached
  2. For timestamp eviction:
    • Process queue from oldest to newest
    • Remove entries older than threshold
    • Stop when hitting newer entry (leverages insertion order)
  3. For size eviction:
    • Process queue from oldest to newest
    • Remove entries until target size freed
    • Guarantees oldest entries are removed first

https://github.com/apache/pulsar/blob/b72bc4ff3aa5c9c45d9233d2d000429b3cf0ce1a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalQueue.java

Note: There's a single shared removal queue for all ManagedLedgerImpl instances instead of having to do the check in multiple instances.

Verifying this change

This change is already covered by existing tests:

  • All existing cache-related tests continue to pass
  • RangeCacheTest validates the new removal queue functionality
  • EntryCacheManagerTest verifies eviction behavior remains correct
  • Integration tests ensure no regression in cache performance

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.1.0 milestone May 29, 2025
@lhotari lhotari self-assigned this May 29, 2025
@lhotari lhotari requested review from merlimat, nodece and dao-jun May 29, 2025 18:32
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label May 29, 2025
@codecov-commenter
Copy link

codecov-commenter commented May 29, 2025

Codecov Report

Attention: Patch coverage is 85.57457% with 59 lines in your changes missing coverage. Please review.

Project coverage is 74.25%. Comparing base (bbc6224) to head (b72bc4f).
Report is 1125 commits behind head on master.

Files with missing lines Patch % Lines
...ache/bookkeeper/mledger/impl/cache/RangeCache.java 75.26% 10 Missing and 13 partials ⚠️
...che/bookkeeper/mledger/impl/AbstractEntryImpl.java 72.72% 13 Missing and 2 partials ⚠️
...kkeeper/mledger/impl/ManagedLedgerFactoryImpl.java 73.68% 5 Missing ⚠️
...per/mledger/impl/cache/RangeCacheEntryWrapper.java 91.66% 2 Missing and 2 partials ⚠️
...l/cache/RangeEntryCacheManagerEvictionHandler.java 76.47% 2 Missing and 2 partials ⚠️
...mledger/impl/cache/RangeEntryCacheManagerImpl.java 92.50% 0 Missing and 3 partials ⚠️
...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java 90.90% 0 Missing and 2 partials ⚠️
...bookkeeper/mledger/impl/cache/CachedEntryImpl.java 91.66% 0 Missing and 1 partial ⚠️
...keeper/mledger/impl/cache/PendingReadsManager.java 80.00% 1 Missing ⚠️
...per/mledger/impl/cache/RangeCacheRemovalQueue.java 97.61% 0 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24363      +/-   ##
============================================
+ Coverage     73.57%   74.25%   +0.68%     
+ Complexity    32624    32241     -383     
============================================
  Files          1877     1871       -6     
  Lines        139502   145193    +5691     
  Branches      15299    16595    +1296     
============================================
+ Hits         102638   107819    +5181     
+ Misses        28908    28843      -65     
- Partials       7956     8531     +575     
Flag Coverage Δ
inttests 26.74% <64.79%> (+2.16%) ⬆️
systests 23.36% <63.81%> (-0.96%) ⬇️
unittests 73.74% <85.57%> (+0.89%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
.../org/apache/bookkeeper/mledger/impl/EntryImpl.java 100.00% <100.00%> (+21.51%) ⬆️
...kkeeper/mledger/impl/cache/EntryCacheDisabled.java 75.55% <100.00%> (+4.72%) ⬆️
.../mledger/impl/cache/RangeCacheRemovalCounters.java 100.00% <100.00%> (ø)
...keeper/mledger/impl/cache/RangeEntryCacheImpl.java 63.18% <100.00%> (+4.43%) ⬆️
...oker/service/nonpersistent/NonPersistentTopic.java 70.90% <ø> (+1.44%) ⬆️
...bookkeeper/mledger/impl/cache/CachedEntryImpl.java 91.66% <91.66%> (ø)
...keeper/mledger/impl/cache/PendingReadsManager.java 88.20% <80.00%> (+1.54%) ⬆️
...per/mledger/impl/cache/RangeCacheRemovalQueue.java 97.61% <97.61%> (ø)
...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java 81.51% <90.90%> (+0.85%) ⬆️
...mledger/impl/cache/RangeEntryCacheManagerImpl.java 94.50% <92.50%> (-1.65%) ⬇️
... and 5 more

... and 1075 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Slf4j
class RangeCache {
private final ConcurrentNavigableMap<Position, RangeCacheEntryWrapper> entries;
private final RangeCacheRemovalQueue removalQueue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is removal O(1)? Do we need to scan items in the queue to remove an item?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use some basic LRU??

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's O(1). there's no need to do any scanning. the eviction happens always in FIFO order.

Actual cached entries might be evicted in other ways (let's say when a ledger is removed), but that doesn't impact the removal queue. When a cached entry has already been evicted, the "empty" wrapper will be in the queue and it will get removed once the queue processing proceeds due to expiration or by sized based eviction. Just to say that there won't be any scanning to remove the entries from the queue when EntryCache instances are cleared.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. what happens if some entries are reused repeatedly? Do we move them to the tail of the removal queue? Shouldn't we deprioritize evicting those entries?(e.g large fanout cases, subscriptions with many consumers)

  2. Dont we still need to scan entire list when all items timestamp are less than the threshold?(we dont do early stop here)

  3. Is this cache per ledger? If yes, Why not a single global cache shared with all ledgers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Do we need to worry about the size limit of the removal queue from too many empty wrapper entries?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2. Dont we still need to scan entire list when all items timestamp are less than the threshold?(we dont do early stop here)

there's no need to scan the entire queue currently. For future work with different thresholds, there could be multiple phases somewhat similar way as garbage collection algorithms do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3. Is this cache per ledger? If yes, Why not a single global cache shared with all ledgers?

this PR doesn't change that. I think it's an efficient solution that caches are per ledger for the broker use cases. I don't think that a single global cache would bring any benefits over the current solution.

This PR addresses the problems in the existing implementation about evicting expired entries and keeping the total global cache size under a limit. That has been a problem in having the individual caches, but after this PR, the problem is solved.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4. Do we need to worry about the size limit of the removal queue from too many empty wrapper entries?

The RangeCacheEntryWrapper causes an overhead about 100 bytes per entry currently. With 100,000 entries in the queue, that would be about 10 MB of heap. That shouldn't be a problem.

More details about RangeCacheEntryWrapper and its background:
The wrapper entries are recycled so the wrapper entries don't add GC overhead. These wrappers are already used in the RangeCache to ensure consistency while using StampedLocks to avoid synchronization and blocking. The wrapper solution was added to RangeCache in #22789, with follow ups in #22814, #22818, #23903 and #23955. Without the wrapper there were issues where the entry got evicted and recycled while it was getting used. It's explained in the PR descriptions and there are also test cases to reproduce the problems.
The design of the RangeCache is intentionally "racy" so that we wouldn't need to synchronize and block. StampedLock provides the necessary consistency together with the reference counting. The RangeCache's EntryWrapper is also a foundation for this removal queue work since it makes it possible to have the separate EntryCache instances per ledger and centralize the expiration and size based eviction to the queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the good questions @heesung-sn

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for answering my questions. I think this PR and direction looks good to me with the future improvements.

But I guess this requires more discussions in the Pulsar community, too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants