
[improve][broker] optimize the problem of subscription snapshot cache not hitting #24300


Open
wants to merge 5 commits into base: master

Conversation

liudezhi2098
Contributor

Motivation

  • When message acknowledgment is slower than the message consumption rate, subscription cursor synchronization fails to complete. This occurs because:
  1. Current Behavior
    • With large receiver queues (e.g., receiverQueueSize=1000), the cursor never synchronizes
      // receiverQueueSize=1000: many messages are prefetched, so acknowledgments lag far behind dispatch
      Consumer<String> consumer = client.newConsumer(Schema.STRING)
              .topic(topic)
              .subscriptionName("sub")
              .receiverQueueSize(1000)
              .subscribe();
      while (true) {
          Message<String> msg = consumer.receive();
          consumer.acknowledge(msg);
          Thread.sleep(100);
      }
    
    
    • With small queues (e.g., receiverQueueSize=1), synchronization works properly
      // receiverQueueSize=1: messages are fetched one at a time, so acknowledgments keep up with dispatch
      Consumer<String> consumer = client.newConsumer(Schema.STRING)
              .topic(topic)
              .subscriptionName("sub")
              .receiverQueueSize(1)
              .subscribe();
      while (true) {
          Message<String> msg = consumer.receive();
          consumer.acknowledge(msg);
          Thread.sleep(100);
      }
    
    
  2. Root Cause:
    • The SnapshotCache updates too aggressively
    • When advancedMarkDeletePosition executes, valid cache entries are frequently unavailable

Modifications

  1. Cache Update Strategy:
    • Modified the cache to maintain mapping relationships for remote clusters.
  2. Eviction Policy Enhancement:
    • When the cache reaches capacity (maxSnapshotToCache):
      • Allow subsequent snapshots to be added through periodic dynamic adjustment
      • The latest snapshot replaces an intermediate snapshot in the cache, and updates become less frequent as the difference between the latest snapshot time and the mark delete position time increases (see the sketch below).
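
A rough sketch of this eviction idea, with hypothetical names that only illustrate the intended behavior (they do not mirror the actual ReplicatedSubscriptionSnapshotCache changes):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Illustrative sketch only; names and structure are hypothetical.
    class TimestampEvictionSketch<S> {
        private final int maxSnapshotToCache;
        private final long snapshotFrequencyMillis;
        // snapshots keyed by the publish time of their marker message
        private final NavigableMap<Long, S> snapshots = new TreeMap<>();

        TimestampEvictionSketch(int maxSnapshotToCache, long snapshotFrequencyMillis) {
            this.maxSnapshotToCache = maxSnapshotToCache;
            this.snapshotFrequencyMillis = snapshotFrequencyMillis;
        }

        synchronized void addNewSnapshot(long publishTime, S snapshot) {
            if (snapshots.size() < maxSnapshotToCache) {
                snapshots.put(publishTime, snapshot);
                return;
            }
            // The further the newest snapshot is from the oldest cached one (which sits
            // near the mark delete position), the larger each time slot gets, so
            // replacements become less frequent.
            long timeSinceFirstSnapshot = publishTime - snapshots.firstKey();
            long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache;
            long timeSinceLastSnapshot = publishTime - snapshots.lastKey();
            if (timeSinceLastSnapshot < timeWindowPerSlot) {
                return; // too soon; keep the existing entries
            }
            // Replace an intermediate entry with the latest snapshot so the cache keeps
            // both older entries (for a lagging mark delete position) and fresh ones.
            Long middleKey = snapshots.keySet().stream()
                    .skip(snapshots.size() / 2).findFirst().orElseThrow();
            snapshots.remove(middleKey);
            snapshots.put(publishTime, snapshot);
        }
    }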

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@liudezhi2098 liudezhi2098 self-assigned this May 14, 2025
@github-actions github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) May 14, 2025
@liangyepianzhou liangyepianzhou added this to the 4.1.0 milestone May 14, 2025
@lhotari
Member

lhotari commented May 14, 2025

  • When message acknowledgment is slower than the message consumption rate, subscription cursor synchronization fails to complete.

@liudezhi2098 Regarding this scenario, is there a way to find out that this happens?
Does the metric pulsar_replicated_subscriptions_timedout_snapshots added in #22381 help with detecting problems?

Member

@lhotari lhotari left a comment

Great work @liudezhi2098! Added a comment about adding code comments. :)

Member

@lhotari lhotari left a comment

Since the publishTime information comes from the Pulsar client, the logic could be brittle. In Pulsar, there's also an optional brokerPublishTime which is not enabled by default (it requires specific configuration in all brokers).

Have you considered what could happen when publishTime values aren't in sync?

I've understood that the "PIP-33: Replicated subscriptions" algorithm relies on vector clocks so that clock sync doesn't become a problem. (earlier discussion)
The snapshots are how the vector clocks are synchronized; at least that's how I interpret it from one viewpoint.

The changes in this PR don't currently make sense to me, mainly due to the use of publishTime.

I'd assume that in your problem scenario, the correct approach would be to tune replicatedSubscriptionsSnapshotFrequencyMillis, replicatedSubscriptionsSnapshotTimeoutSeconds and replicatedSubscriptionsSnapshotMaxCachedPerSubscription values.

@FieldContext(
        category = CATEGORY_SERVER,
        doc = "Frequency of snapshots for replicated subscriptions tracking.")
private int replicatedSubscriptionsSnapshotFrequencyMillis = 1_000;
@FieldContext(
        category = CATEGORY_SERVER,
        doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;
@FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
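
For reference, these correspond to broker.conf settings; the values below are only an illustrative tuning sketch, not recommendations:

    # Illustrative tuning only; pick values based on the actual acknowledgment lag.
    replicatedSubscriptionsSnapshotFrequencyMillis=1000
    replicatedSubscriptionsSnapshotTimeoutSeconds=30
    replicatedSubscriptionsSnapshotMaxCachedPerSubscription=50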

Have you already done this?

Currently it is a problem that it's necessary to tune the values to fix issues and it's also hard to notice that the problem is occurring.

It looks like future improvements are needed too.

@lhotari
Member

lhotari commented May 14, 2025

Current Behavior

  • With large receiver queues (e.g., receiverQueueSize=1000), the cursor never synchronizes

@liangyepianzhou Do you have a separate repro app where this could be observed with real brokers (let's say 2 Pulsar brokers within docker-compose plus some test app)? Creating a separate Git repo for such a repro would be one approach to share it. Having a runnable repro makes things easier for reviewers too.

@lhotari
Member

lhotari commented May 14, 2025

  • The SnapshotCache updates too aggressively
  • When advancedMarkDeletePosition executes, valid cache entries are frequently unavailable

@liudezhi2098 One thought here is that perhaps there could be interaction between ReplicatedSubscriptionsController and all ReplicatedSubscriptionSnapshotCache instances? Could there be a solution in place so that, when the cache "updates too aggressively", a snapshot would still be completed every replicatedSubscriptionsSnapshotFrequencyMillis?
Since the ReplicatedSubscriptionSnapshotCache is an internal interface, we don't need to keep it as a "cache". It's possible that it doesn't make sense in the revisited solution.
Do you have a chance to try something in this area instead, since I don't think that using publishTime in the solution makes sense?

@liudezhi2098
Contributor Author

liudezhi2098 commented May 14, 2025

Since the publishTime information comes from the Pulsar client, the logic could be brittle. In Pulsar, there's also an optional brokerPublishTime which is not enabled by default (it requires specific configuration in all brokers).

Have you considered what could happen when publishTime values aren't in sync?

I've understood that the "PIP-33: Replicated subscriptions" algorithm relies on vector clocks so that clock sync doesn't become a problem. (earlier discussion) The snapshots are how the vector clocks are synchronized; at least that's how I interpret it from one viewpoint.

The changes in this PR don't currently make sense to me, mainly due to the use of publishTime.

I'd assume that in your problem scenario, the correct approach would be to tune replicatedSubscriptionsSnapshotFrequencyMillis, replicatedSubscriptionsSnapshotTimeoutSeconds and replicatedSubscriptionsSnapshotMaxCachedPerSubscription values.

@FieldContext(
        category = CATEGORY_SERVER,
        doc = "Frequency of snapshots for replicated subscriptions tracking.")
private int replicatedSubscriptionsSnapshotFrequencyMillis = 1_000;
@FieldContext(
        category = CATEGORY_SERVER,
        doc = "Timeout for building a consistent snapshot for tracking replicated subscriptions state. ")
private int replicatedSubscriptionsSnapshotTimeoutSeconds = 30;
@FieldContext(
        category = CATEGORY_SERVER,
        doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

Have you already done this?

Currently it is a problem that it's necessary to tune the values to fix issues and it's also hard to notice that the problem is occurring.

It looks like future improvements are needed too.

@lhotari The generation of snapshots is completed through the exchange of snapshotRequest and snapshotResponse between two clusters. Ultimately, the ReplicatedSubscriptionsController writes Marker messages, and using publishTime is reliable because this behavior occurs within the same broker.

Of course, the topic may be transferred to another broker, but this is a low-frequency scenario, and its publishTime will not exhibit continuous jumps.

However, we can adopt a simpler approach that doesn't require using publishTime. Instead, we can record the current system time each time the snapshotCache is updated and use this timestamp for dynamic adjustments, but there is a flaw: it cannot truly reflect the time difference between two messages. In some scenarios, it will cause the cache update frequency to decrease.

Member

@lhotari lhotari left a comment

The generation of snapshots is completed through the exchange of snapshotRequest and snapshotResponse between two clusters. Ultimately, the ReplicatedSubscriptionsController writes Marker messages, and using publishTime is reliable because this behavior occurs within the same broker.

Of course, the topic may be transferred to another broker, but this is a low-frequency scenario, and its publishTime will not exhibit continuous jumps.

However, we can adopt a simpler approach that doesn't require using publishTime. Instead, we can record the current system time each time the snapshotCache is updated and use this timestamp for dynamic adjustments.

Thanks for explaining that. I missed the point that the marker messages originate from the same broker. publishTime would be fine due to that detail.

When looking at the current master branch code in ReplicatedSubscriptionSnapshotCache.addNewSnapshot, I'd assume that a potential solution to the problem could be that the current mark delete position is taken into account before purging entries.

It looks like the problem arises when there isn't at least one entry in the cache that is older than the current mark delete position.

I'd suggest revisiting the purging logic in this way:

  • modify addNewSnapshot and add a 2nd parameter which is the current mark delete position
  • always keep the newest entry that is before the current mark delete position when purging entries; all older entries can be purged
  • if the cache remains full after doing this, remove a single entry in the cache so that the new entry can be added.
    • keep the position of the last removed entry so that it's possible to continue the purging algorithm in subsequent calls
    • if there's no previous last removed entry, purge the 2nd entry (assuming that the first entry is the newest entry before the current mark delete position)
    • if there's a previous last removed entry, continue purging from the next entry after the last removed position by first skipping one entry and then removing the 2nd entry
    • if there are no more entries to remove, start removing from the beginning.

This purging logic should always result in making it possible to add a new entry. Since every 2nd entry is removed, it will result in "sampling" so that when the mark delete position finally advances, it advances to the most recent position.
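
To make that ordering concrete, here is a minimal sketch of the suggested purging, using a TreeMap keyed by position; the names are hypothetical and this is not the actual ReplicatedSubscriptionSnapshotCache API:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Sketch of the suggested purging order; names and types are hypothetical.
    class PurgingSketch<P extends Comparable<P>, S> {
        private final int maxSnapshotToCache;
        private final NavigableMap<P, S> snapshots = new TreeMap<>();
        private P lastRemovedKey; // where the "every 2nd entry" sampling left off

        PurgingSketch(int maxSnapshotToCache) {
            this.maxSnapshotToCache = maxSnapshotToCache;
        }

        void addNewSnapshot(P position, S snapshot, P currentMarkDeletePosition) {
            // Keep only the newest entry at or before the mark delete position;
            // everything older than that entry can be purged.
            P newestBeforeMarkDelete = snapshots.floorKey(currentMarkDeletePosition);
            if (newestBeforeMarkDelete != null) {
                snapshots.headMap(newestBeforeMarkDelete, false).clear();
            }
            // If the cache is still full, remove a single entry by sampling every
            // 2nd entry, continuing from where the previous call stopped.
            if (snapshots.size() >= maxSnapshotToCache) {
                P start = lastRemovedKey != null ? snapshots.higherKey(lastRemovedKey) : snapshots.firstKey();
                if (start == null) {
                    start = snapshots.firstKey(); // no more entries ahead; start from the beginning
                }
                P toRemove = snapshots.higherKey(start); // skip one entry, remove the next
                if (toRemove == null) {
                    toRemove = snapshots.firstKey();
                }
                snapshots.remove(toRemove);
                lastRemovedKey = toRemove;
            }
            snapshots.put(position, snapshot);
        }
    }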

There's a possibility to increase replicatedSubscriptionsSnapshotMaxCachedPerSubscription parameter to improve the resolution, if that's desirable.

Perhaps the intention of your timestamp based approach is already to achieve something similar?

@liudezhi2098
Contributor Author

liudezhi2098 commented May 14, 2025

When looking at the current master branch code in ReplicatedSubscriptionSnapshotCache.addNewSnapshot, I'd assume that a potential solution to the problem could be that the current mark delete position is taken into account before purging entries.

It looks like the problem arises when there isn't at least one entry in the cache that is older than the current mark delete position.

I'd suggest revisiting the purging logic in this way:

  • modify addNewSnapshot and add a 2nd parameter which is the current mark delete position

  • always keep the newest entry that is before the current mark delete position when purging entries; all older entries can be purged

  • if the cache remains full after doing this, remove a single entry in the cache so that the new entry can be added.

    • keep the position of the last removed entry so that it's possible to continue the purging algorithm in subsequent calls
    • if there's no previous last removed entry, purge the 2nd entry (assuming that the first entry is the newest entry before the current mark delete position)
    • if there's a previous last removed entry, continue purging from the next entry after the last removed position by first skipping one entry and then removing the 2nd entry
    • if there are no more entries to remove, start removing from the beginning.

This purging logic should always result in making it possible to add a new entry. Since every 2nd entry is removed, it will result in "sampling" so that when the mark delete position finally advances, it advances to the most recent position.

There's a possibility to increase replicatedSubscriptionsSnapshotMaxCachedPerSubscription parameter to improve the resolution, if that's desirable.

Perhaps the intention of your timestamp based approach is already to achieve something similar?

@lhotari The intention of the timestamp-based approach is to achieve this purpose; the key is how to update the cache when it is full, and there is no perfect algorithm to solve this problem.

I recommend using median-based eviction for simplicity and trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.

@lhotari
Member

lhotari commented May 14, 2025

The intention of the timestamp-based approach is to achieve this purpose; the key is how to update the cache when it is full, and there is no perfect algorithm to solve this problem.

I recommend using median-based eviction for simplicity and trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.

You are right about this. The added comments in the code make it easier to understand the intention of the logic. My previous comment about taking the mark deletion position into account in adding snapshots didn't make much sense after rethinking.
I'll review again.

@liudezhi2098 liudezhi2098 requested a review from lhotari May 15, 2025 03:36
@liudezhi2098 liudezhi2098 requested a review from lhotari May 15, 2025 13:49
Member

@lhotari lhotari left a comment

Great work, mainly comments about comments. The 2nd rule for skipping the addition of entries, timeSinceLastSnapshot < timeWindowPerSlot, seems risky to add since it could have surprising consequences. I think it would be better to remove it.

Comment on lines +93 to +95
// The time window length of each time slot, used for dynamic adjustment in the snapshot cache.
// The larger the time slot, the slower the update.
final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache;
Member

It's a bit hard to grasp what the "time slot" concept is here.
Let's say timeSinceFirstSnapshot is 25000 ms, snapshotFrequencyMillis is 1000 ms, and maxSnapshotToCache is 10; the result would be 2. What's the point of this?
With low values, this would be close to 0, I guess. This is also why I think this is just unnecessary complexity.

Contributor Author

@liudezhi2098 liudezhi2098 May 16, 2025

What if timeSinceFirstSnapshot is 25 minutes? The goal is that if timeSinceFirstSnapshot becomes longer, the update frequency should be lower.
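
For concreteness, the same formula with an assumed 25 minute gap:

    long timeSinceFirstSnapshot = 25L * 60 * 1000; // 25 minutes = 1_500_000 ms (assumed)
    long snapshotFrequencyMillis = 1000;
    int maxSnapshotToCache = 10;
    // 1_500_000 / 1000 / 10 = 150, so the window grows as the gap grows
    long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache;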

Member

What if timeSinceFirstSnapshot is 25 minutes? The goal is that if timeSinceFirstSnapshot becomes longer, the update frequency should be lower.

This doesn't seem to be a realistic case. I disagree that the update frequency should become lower. That's exactly my point: if there's a long delay, it will be enforced going forward. I think it's easier to make progress in this PR by removing this rule and adding it later if there's a specific reason to do so.

@lhotari
Member

lhotari commented May 16, 2025

I recommend using median-based eviction for simplicity and trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.

I don't see how median-based eviction could make sense. After the cache is filled up, when the median entry is removed and a new entry is added, and this repeats, the result will be that only entries after the median entry will be evicted (assuming no other events happen in between). Eventually there will be a large gap between the two entries in the middle.

Since time is already considered in the algorithm, it seems that an alternative approach would be to evict the entry with the shortest time distance to its adjacent entries. Does that make sense?
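
A small helper sketching that selection rule, assuming snapshots are keyed by publish time in a NavigableMap; the names are hypothetical and this is not actual Pulsar code:

    import java.util.NavigableMap;

    final class EvictionChooser {
        // Pick the entry whose time gap to its neighbours is smallest; dropping it
        // loses the least "resolution". The oldest and newest entries act as anchors.
        static Long pickEntryToEvict(NavigableMap<Long, ?> snapshotsByTime) {
            Long bestKey = null;
            long smallestGap = Long.MAX_VALUE;
            for (Long key : snapshotsByTime.keySet()) {
                Long prev = snapshotsByTime.lowerKey(key);
                Long next = snapshotsByTime.higherKey(key);
                if (prev == null || next == null) {
                    continue; // keep the first and last entries
                }
                long gap = Math.min(key - prev, next - key);
                if (gap < smallestGap) {
                    smallestGap = gap;
                    bestKey = key;
                }
            }
            return bestKey; // null when there are fewer than three entries
        }
    }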

@lhotari
Member

lhotari commented May 22, 2025

I recommend using median-based eviction for simplicity and trying to make the cache an arithmetic progression in time, because for shared-mode subscriptions there will be individual unconfirmed messages, presenting a very jumpy situation.

I don't see how median-based eviction could make sense. After the cache is filled up, when the median entry is removed and a new entry is added, and this repeats, the result will be that only entries after the median entry will be evicted (assuming no other events happen in between). Eventually there will be a large gap between the two entries in the middle.

Since time is already considered in the algorithm, it seems that an alternative approach would be to evict the entry with the shortest time distance to its adjacent entries. Does that make sense?

@liudezhi2098 Just wondering if you are fine with the provided feedback on this PR? It would be great to address this issue in replicated subscriptions and get this PR to completion.

@lhotari
Member

lhotari commented May 30, 2025

@liudezhi2098 Are you planning to continue working on this? I think that this is a really great improvement to address a long-standing issue with replicated subscriptions.

@lhotari lhotari requested review from merlimat, nodece and dao-jun May 30, 2025 05:37
@lhotari lhotari added the triage/lhotari/important label (lhotari's triaging label for important issues or PRs) May 30, 2025
@lhotari
Member

lhotari commented Jun 2, 2025

@liudezhi2098 There's also a long-standing issue #10054 which is addressed by #16651. I have updated the PR 16651 description, rebased it, and revisited it slightly. Please review.

Labels
doc-not-needed (Your PR changes do not impact docs), triage/lhotari/important (lhotari's triaging label for important issues or PRs)
3 participants