
[WIP] Broker cache refactoring and add CacheEvictionByExpectedReadCount eviction strategy #207


Closed
wants to merge 77 commits into from

Conversation

lhotari
Owner

@lhotari lhotari commented May 26, 2025

Motivation

Pulsar's broker cache contains multiple gaps as described in an email
https://lists.apache.org/thread/xm095hnjo0cffbdy8ckysmzzm90gsbnp
There's also a slightly related issue apache#23466

Status

These changes are WIP and many tests might still fail. This work was started in October 2024 and has been rebased several times.

Modifications

  • refactor the cache implementation
    • remove unnecessary generics usage from RangeCache
  • add a new cache eviction strategy, CacheEvictionByExpectedReadCount, based on the expected read count of entries (see the sketch after this list)
    • when an entry is read by one cursor, there are usually more cursors that will read the same entry from the cache very soon. This strategy ensures that the later cursors find the entry in the cache unless it has already been evicted due to size or time limits.
  • replace per-cache eviction with eviction handled by a single shared eviction queue
    • each ledger has its own cache instance, and previously eviction was handled separately per cache instance. This was a limiting factor in adding the new CacheEvictionByExpectedReadCount strategy
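
A rough sketch of the expected-read-count idea (class, field, and method names below are illustrative only, not the actual types in this PR): each cached entry tracks how many more cursor reads are still expected, and eviction treats the entry as removable once that count reaches zero, while size- and time-based eviction still apply regardless.

class ExpectedReadCountEntrySketch {
    // how many more cursor reads of this entry are still expected
    private final java.util.concurrent.atomic.AtomicInteger expectedReadCount;

    ExpectedReadCountEntrySketch(int expectedReads) {
        this.expectedReadCount = new java.util.concurrent.atomic.AtomicInteger(expectedReads);
    }

    // called when a cursor reads the entry from the cache
    void onCacheRead() {
        expectedReadCount.decrementAndGet();
    }

    // called when another cursor is discovered that will also read this entry
    void onAdditionalExpectedRead() {
        expectedReadCount.incrementAndGet();
    }

    // eviction may treat the entry as removable once no further reads are expected
    boolean noFurtherReadsExpected() {
        return expectedReadCount.get() <= 0;
    }
}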

UPDATE about progress

Changes will be split into at least 2 PRs.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
int expectedReadCountVal = expectedReadCount.getAsInt();

Can we place this line before the for loop?

Owner Author

yes
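
For clarity, a sketch of the suggested reordering (only the quoted lines are shown, rearranged; the surrounding method and declarations are omitted):

int expectedReadCountVal = expectedReadCount.getAsInt(); // resolved once, before the loop
for (LedgerEntry e : ledgerEntries) {
    EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
    // ... expectedReadCountVal is reused for each created entry ...
}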


public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long timestampNanos) {
return evictEntries(
(e, c) -> e.timestampNanos < timestampNanos ? EvictionResult.REMOVE : EvictionResult.STASH_AND_STOP,
@berg223 berg223 May 28, 2025

Why do we set STASH_AND_STOP here instead of STASH? The difference is that with STASH_AND_STOP we stop processing the queue, so the method behaves as if it won't evict all entries before the timestamp, because there may still be entries before the timestamp left in removalQueue.

Owner Author

The most recent version of the code is clearer.

Since all entries in the queue are in insert order, there's no longer a need to keep processing the remaining entries after hitting an entry that hasn't expired. The difference between STASH and STASH_AND_STOP is that STASH adds the entry to the stash and continues, while STASH_AND_STOP also stops processing the remaining entries. STASH_AND_STOP is used when evicting by timestamp, as here; plain STASH is used when evicting by size.

While evicting by size, entries that aren't evictable or haven't expired are added to the stash.
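
A minimal sketch of the two eviction passes (the enum name and the timestamp lambda mirror the quoted code; the Entry/Counters shapes and the size predicate are simplified assumptions, not the PR's exact types):

import java.util.function.BiFunction;

class EvictionSketch {
    enum EvictionResult { REMOVE, STASH, STASH_AND_STOP }

    static class Entry { long timestampNanos; }
    static class Counters { long bytesStillToEvict; }

    // evict by timestamp: the queue is in insert order, so processing can stop
    // at the first entry that has not expired yet
    static BiFunction<Entry, Counters, EvictionResult> beforeTimestamp(long cutoffNanos) {
        return (e, c) -> e.timestampNanos < cutoffNanos
                ? EvictionResult.REMOVE
                : EvictionResult.STASH_AND_STOP;
    }

    // evict by size: keep scanning until enough bytes are freed; entries that
    // cannot be evicted yet are stashed and processing continues
    static BiFunction<Entry, Counters, EvictionResult> toFreeBytes() {
        return (e, c) -> c.bytesStillToEvict > 0
                ? EvictionResult.REMOVE
                : EvictionResult.STASH;
    }
}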


Yes! That's reasonable!

class RangeCacheRemovalQueue {
// The removal queue is unbounded, but we allocate memory in chunks to avoid frequent memory allocations.
private static final int REMOVAL_QUEUE_CHUNK_SIZE = 128 * 1024;
private final MpscUnboundedArrayQueue<RangeCacheEntryWrapper> removalQueue = new MpscUnboundedArrayQueue<>(
@berg223 berg223 May 28, 2025

Would it be better if removalQueue were a PriorityQueue ordered by expectedReadCount? Although a PriorityQueue is not fast when expectedReadCount changes very frequently.


I got it. The queue is used to keep the insert order.

Owner Author

Thanks for asking! These questions help explain the design and will be useful content for the PIP when I write it.

Answer:
Perhaps. The reason for the MPSC queue is that it has minimal overhead. The broker cache in Pulsar is a performance hotspot, and minimizing the overhead of adding and removing entries is one of the design goals.
Without proper benchmarks, it's obviously hard to compare the actual difference in performance.
It's just a gut feeling that minimizing work and simplifying things usually ends up producing the fastest algorithm.

I would assume that some sort of "generation" based removal could be a useful direction, taking some inspiration from garbage collection algorithms. The current removal queue + stash are already a step in that direction.

Another reason why this problem is more like GC is that expectedReadCount is dynamic. It is incremented in at least 2 cases:

  • when entries get added to the replay queue
  • when a cache entry already exists and an attempt is made to add a new entry for the same position

Since expectedReadCount is dynamic, typical priority queue algorithms wouldn't be able to handle that.

Yet another design goal is to minimize synchronization in the broker cache so that it can scale when the system has a lot of CPU cores. In the current design, eviction itself is intentionally single threaded. Later on, sharding could be added if this part became a bottleneck, but that is unlikely.
Minimizing synchronization helps ensure that insertions and lookups in the cache aren't constrained by shared locks and can scale when the broker runs on more CPU cores. This is also why StampedLock is used in RangeCacheEntryWrapper. Many operations in RangeCache are racy, and stamped locks are used to ensure sufficient consistency.
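
To illustrate the StampedLock point, a minimal sketch (field and method names are illustrative; RangeCacheEntryWrapper in the PR has its own shape): readers use an optimistic read and only fall back to a read lock when a concurrent writer invalidated the stamp, so lookups rarely block, while the single eviction thread takes the write lock only briefly.

import java.util.concurrent.locks.StampedLock;

class EntryWrapperSketch {
    private final StampedLock lock = new StampedLock();
    private Object value; // the cached entry, null once evicted

    Object get() {
        long stamp = lock.tryOptimisticRead();
        Object v = value;
        if (!lock.validate(stamp)) {
            // a writer was active; retry under a real read lock
            stamp = lock.readLock();
            try {
                v = value;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return v;
    }

    // the single eviction thread clears the slot under the write lock
    Object clearForEviction() {
        long stamp = lock.writeLock();
        try {
            Object old = value;
            value = null;
            return old;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}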


if (log.isDebugEnabled()) {
log.debug("[{}] Adding entry to cache: {} - size: {}", ml.getName(), entry.getPosition(),
entryLength);
}

Position position = entry.getPosition();
if (entries.exists(position)) {
return false;
CachedEntry previousEntry = entries.get(position);

Should we call previousEntry.release() after its lifecycle ends? Because the retain method was called in entries.get(position).

Owner Author

good catch, fixed.

CachedEntry previousEntry = entries.get(position);
if (previousEntry != null && entry.getReadCountHandler() != null) {
// If the entry is already in the cache, increase the expected read count on the existing entry
if (previousEntry.increaseReadCount(entry.getReadCountHandler().getExpectedReadCount())) {

Why do we call increaseReadCount instead of setReadCount here? Why do we need to handle this case?

Owner Author

Since the existing cached entry isn't replaced, it's possible that there are now new active consumers which might read this entry later. It's not possible to be accurate about the expected read count, so this is a best-effort way to keep entries in the cache longer, until expiration. The inaccuracy doesn't cause much harm, since by the time the cache fills up many entries might have already reached their expected read count, and that would free up more space.
Entries will eventually get removed after the timeout. Increasing the timeout for cache hits could be problematic, since cached entries actually retain more direct memory than just the entry itself. This is because the cached ByteBuf is sliced from a parent buffer which could be larger, and that parent buffer is retained until all slices have been released. The managedLedgerCacheCopyEntries setting makes a copy each time instead, but that comes with the tradeoff of allocating a new buffer and copying into it.
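
To make the direct-memory point concrete, a small sketch of the slice-vs-copy tradeoff (standard Netty ByteBuf semantics, not code from this PR):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

class CacheBufferSketch {
    // a retained slice shares the parent's memory: the parent buffer cannot be
    // freed until every slice held by the cache has been released
    static ByteBuf cacheSliced(ByteBuf parent, int index, int length) {
        return parent.retainedSlice(index, length);
    }

    // copying (what managedLedgerCacheCopyEntries opts into) costs an allocation
    // and a copy, but the cached entry no longer pins the larger parent buffer
    static ByteBuf cacheCopied(ByteBufAllocator allocator, ByteBuf parent, int index, int length) {
        ByteBuf copy = allocator.directBuffer(length);
        copy.writeBytes(parent, index, length);
        return copy;
    }
}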

entry.setRefCnt(1);
entry.decreaseReadCountOnRelease = true;
@berg223 berg223 May 28, 2025

This seems dangerous, because we need to prevent bad usage in the future. Maybe it would be better to invoke setDecreaseReadCountOnRelease and release after the cursor completes the read. By the way, is there any other way to decrease expectedReadCount? I only found it decreased in the EntryImpl.release method.

Owner Author

Sure, there's potential for misuse in internal APIs like this. It can be improved with comments.
More comments should be added explaining how the read count decrementing works. The EntryImpl#create(Entry other) method is used in 2 locations where a new wrapper is returned for an existing entry. In most cases, it's desirable to decrement the read count when the entry is released. However, there are still corner cases with PendingReadsManager: how to handle merging of reads, and how to merge expected read counts when 2 separate reads get merged into one. It's possible that more information than just the count is needed to get the expected read count right. Having actual tests that simulate pending reads and verify the read count could be useful.

@berg223 berg223 May 28, 2025

Thanks for your patient answer! Why not explicitly call a method to decrease the count after the cursor or ledger completes the read? Do you think it's bad to couple them?

Owner Author

Yes, it's better to decouple that. Calling release is already the way to signal that the use of the entry is done.
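
A small sketch of that release-as-signal pattern (field and method names are illustrative; the PR's EntryImpl and read count handling differ in detail):

class WrappedEntrySketch {
    private final CachedEntrySketch cached;
    private final boolean decreaseReadCountOnRelease;

    WrappedEntrySketch(CachedEntrySketch cached, boolean decreaseReadCountOnRelease) {
        this.cached = cached;
        this.decreaseReadCountOnRelease = decreaseReadCountOnRelease;
    }

    // release() is the existing signal that the caller is done with the entry,
    // so the expected read count is decremented here rather than via a separate
    // explicit call after the cursor finishes reading
    void release() {
        if (decreaseReadCountOnRelease) {
            cached.decrementExpectedReadCount();
        }
        // reference-count bookkeeping elided
    }
}

class CachedEntrySketch {
    private final java.util.concurrent.atomic.AtomicInteger expectedReadCount =
            new java.util.concurrent.atomic.AtomicInteger();

    void decrementExpectedReadCount() {
        expectedReadCount.decrementAndGet();
    }
}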

@lhotari
Owner Author

lhotari commented May 29, 2025

In order to make it easier to get these changes merged to Pulsar master for Pulsar 4.1, I'll split the changes:

  • Broker cache eviction refactoring and use of single queue for removal
  • expected read count based retention of entries in the cache

@lhotari
Owner Author

lhotari commented May 29, 2025

  • Broker cache eviction refactoring and use of single queue for removal

This is WIP in #208

@lhotari
Owner Author

lhotari commented May 30, 2025

  • Broker cache eviction refactoring and use of single queue for removal

This is WIP in #208

PR created upstream in apache#24363. @berg223 please review
Mailing list discussion thread: https://lists.apache.org/thread/ddzzc17b0c218ozq9tx0r3rx5sgljfb0

@lhotari
Owner Author

lhotari commented May 30, 2025

  • expected read count based retention of entries in the cache

WIP in #209

@lhotari lhotari closed this May 30, 2025