Skip to content

[feat][admin] PIP-415: Support getting message ID by index #24222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

liangyepianzhou
Copy link
Contributor

PIP:#24220

Motivation

we can now obtain the offset of a message by its message id:

  1. Get the message by id using get-message-by-id cmd
  2. Get the index of the message using Message.getIndex()

But we cannot obtain the message id by offset. Then we need to add a new API to get the message id by offset.

Modifications

Add a new http API to retrieve the message ID by offset.
We propose to add a new API to retrieve the message ID by offset, enabling us to cache the mapping between message ID and offset.
This will allow us to use offsets for seek and acknowledgment operations when consuming messages through the standardized API.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 27, 2025
@liangyepianzhou liangyepianzhou self-assigned this Apr 27, 2025
@liangyepianzhou liangyepianzhou marked this pull request as ready for review April 27, 2025 12:35
xiangying added 2 commits May 14, 2025 10:41
Copy link
Member

@StevenLuMT StevenLuMT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#24220 PIP has a lot of adjustments.
@liangyepianzhou please ping me after you modify the code and I will review it again.

@liangyepianzhou liangyepianzhou changed the title [feat][admin] PIP-415: Support getting message ID by offset [feat][admin] PIP-415: Support getting message ID by index May 29, 2025
@AuroraTwinkle
Copy link
Contributor

Good jobs

@ApiResponse(code = 406, message = "The topic is not a persistent topic"),
@ApiResponse(code = 412, message = "The broker is not enable broker entry metadata"),
})
public void getMessageIDByIndexAndPartitionID(@Suspended final AsyncResponse asyncResponse,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AndPartitionID suffix in the method seems meaningless (and confusing)?

} else {
indexFuture.complete(index);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You must release entry here.

I think you can add a common method for both asyncReadEntry and asyncFindPosition to use. Here is an example:

    private static Optional<Long> getIndexAndRelease(Entry entry) throws IOException {
        try {
            final var brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
            if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
                return Optional.of(brokerEntryMetadata.getIndex());
            } else {
                return Optional.empty();
            }
        } catch (Throwable throwable) {
            throw new IOException(throwable);
        }
    }

The implementation above forces the caller side to handle the two exceptional cases:

  • Optional.empty() is returned, which indicates the broker entry metadata does not exist or the index does not exist (your code does not take it into consideration)
  • The buffer is corrupted, where IOException will be thrown

But you can implement your own method if you care much about the overhead of Optional or don't want to use checked exception.

}
ManagedLedger managedLedger = persistentTopic.getManagedLedger();
return findMessageIndexByPosition(
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just pass managedLedger.getFirstPosition() here?

@BewareMyPower
Copy link
Contributor

Out of the scope of this PR, I think it would be better to add a method to ManagedLedger for it. Because the customized ML implementation (e.g. StreamNative's Ursa engine) might not depend on the broker entry metadata to get the message's index. It could be another PIP.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants