Skip to content

[feat][admin] PIP-415: Support getting message ID by index #24222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -5489,4 +5489,114 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}

protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long index, boolean authoritative) {
if (!pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"GetMessageIDByIndex is not allowed when broker entry metadata is disabled"));
}
if (index == null || index < 0) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
"Invalid message index: " + index));
}
int partitionIndex = topicName.getPartitionIndex();
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
return future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
}).thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(topicMetadata -> {
if (topicMetadata.partitions > 0) {
log.warn("[{}] Not supported getMessageIdByIndex operation on "
+ "partitioned-topic {}", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"GetMessageIDByIndex is not allowed on partitioned-topic");
}
});
}
}).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic persistentTopic)) {
log.error("[{}] Get message id by index on a non-persistent topic {} is not allowed",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Get message id by index on a non-persistent topic is not allowed"));
}
ManagedLedger managedLedger = persistentTopic.getManagedLedger();
return findMessageIndexByPosition(
PositionFactory.create(managedLedger.getFirstPosition().getLedgerId(), 0),
managedLedger)
.thenCompose(firstIndex -> {
if (index < firstIndex) {
return CompletableFuture.completedFuture(PositionFactory.EARLIEST);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or consider returning null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why you think null is better than earliest?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicit signal that no valid position exists for the given index.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, got.

PositionFactory.EARLIEST name feels odd, as its value (-1, -1) doesn't correspond to a real message and may mislead developers into thinking it's a valid starting point. Returning null would more clearly indicate that the index is out of range and no valid position exists when used as the public value.

It's just my opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return earliest is OK, but do we pass the index to the clients?
I seem to have some impression, but I'm not sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(-1, -1) doesn't correspond to a real message and may mislead developers into thinking it's a valid starting point.

It's already documented, so it's good.

but do we pass the index to the clients?

Do you mean the actual first available index? If so, I think the API could be unnecessarily complicated (a good API does not do multiple things)

} else {
return managedLedger.asyncFindPosition(entry -> {
try {
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
// Skip messages without index
if (brokerEntryMetadata == null) {
return true;
}
return brokerEntryMetadata.getIndex() < index;
} catch (Exception e) {
log.error("Error deserialize message for message position find", e);
} finally {
entry.release();
}
return false;
});
}
}).thenCompose(position -> {
Position lastPosition = managedLedger.getLastConfirmedEntry();
if (position == null || position.compareTo(lastPosition) > 0) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
"Message not found for index " + index));
} else {
return CompletableFuture.completedFuture(position);
}
});
}).thenCompose(position -> CompletableFuture.completedFuture(
new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex)));
}

protected CompletableFuture<Long> findMessageIndexByPosition(Position position, ManagedLedger managedLedger) {
CompletableFuture<Long> indexFuture = new CompletableFuture<>();
managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
if (brokerEntryMetadata == null) {
indexFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
"Broker entry metadata is not present in the message"));
} else {
long index = brokerEntryMetadata.getIndex();
if (index < 0) {
indexFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
"Invalid message index: " + index));
} else {
indexFuture.complete(index);
}
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to read position {} on topic {}",
clientAppId(), position, topicName, exception);
indexFuture.completeExceptionally(exception);
}
}, null);
return indexFuture;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5015,5 +5015,38 @@ public void removeAutoSubscriptionCreation(
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")
@ApiOperation(hidden = true, value = "Get Message ID by index.",
notes = "If the specified index is a system message, "
+ "it will return the message id of the later message.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or partitioned topic does not exist, "
+ "or the index is invalid"),
@ApiResponse(code = 406, message = "The topic is not a persistent topic"),
@ApiResponse(code = 412, message = "The broker is not enable broker entry metadata"),
})
public void getMessageIDByIndexAndPartitionID(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("index") long index,
@QueryParam("authoritative") @DefaultValue("false")
boolean authoritative){
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageIDByIndexAsync(index, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get message id by index for topic {}, partition id {}, index {}",
clientAppId(), topicName, index, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
package org.apache.pulsar.broker.service;

import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import static org.testng.AssertJUnit.fail;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAllowedException;
import javax.ws.rs.NotFoundException;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -420,4 +425,72 @@ public void testManagedLedgerTotalSize() throws Exception {

cursor.close();
}

@Test
public void testGetMessageIdByIndex() throws Exception {
// 1. test no partitioned topic
final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();
MessageIdImpl messageId = (MessageIdImpl) producer.send("test");
Message<byte[]>
message = admin.topics().getMessagesById(topicName, messageId.getLedgerId(), messageId.getEntryId()).get(0);
long index = message.getIndex().get();
MessageIdImpl messageIdByIndex = (MessageIdImpl) admin.topics().getMessageIdByIndex(topicName, index);
Assert.assertEquals(messageIdByIndex, messageId);

// 2. test partitioned topic
final String topicName2 = newTopicName();
final String partitionedTopicName = topicName2 + "-partition-" + 0;
admin.topics().createPartitionedTopic(topicName2, 10);
@Cleanup
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(topicName2)
.enableBatching(false)
.create();

MessageIdImpl messageId2 = null;
for (int i = 0; i < 200; i++) {
messageId2 = (MessageIdImpl) producer2.send("test" + i);
if (messageId2.getPartitionIndex() == 0) {
break;
}
}
Message<byte[]>
message2 = admin.topics().getMessagesById(partitionedTopicName,
messageId2.getLedgerId(), messageId2.getEntryId()).get(0);
long index2 = message2.getIndex().get();
// 2.1 test partitioned topic name with partition index
MessageIdImpl messageIdByIndex2 = (MessageIdImpl) admin.topics().getMessageIdByIndex(partitionedTopicName, index2);
Assert.assertEquals(messageIdByIndex2, messageId2);
// 2.2 test partitioned topic name without partition index
try {
messageIdByIndex2 = (MessageIdImpl) admin.topics().getMessageIdByIndex(topicName2, index2);
fail("Should not be able to get messageId by index for partitioned topic without partition index");
} catch (PulsarAdminException e) {
// Expected exception, as the topic name does not include partition index
Assert.assertTrue(e.getCause().getCause() instanceof NotAllowedException);
}

// 3. test invalid index
try {
admin.topics().getMessageIdByIndex(topicName, -1);
fail("Should not be able to get messageId by index for invalid index");
} catch (PulsarAdminException e) {
// Expected exception, as the index is invalid
Assert.assertTrue(e.getCause().getCause() instanceof NotFoundException);
}
try {
admin.topics().getMessageIdByIndex(topicName, 100000);
fail("Should not be able to get messageId by index for invalid index");
} catch (PulsarAdminException e) {
// Expected exception, as the index is invalid
Assert.assertTrue(e.getCause().getCause() instanceof NotFoundException);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4558,4 +4558,61 @@ default void createShadowTopic(String shadowTopic, String sourceTopic) throws Pu
default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic) {
return createShadowTopicAsync(shadowTopic, sourceTopic, null);
}

/**
* Get the message id by index. If the index points to a system message, return the first user message following it;
* if the specified message has expired and been deleted, return MessageId.Earliest.
* The messages without entry metadata will be skipped, and the next matched message whose index >= the specified
* index will be returned.
* @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0)
* or the original topic name for non-partitioned topics.
* @param index the index of a message
* @return the message id of the message.
* When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal
* storage unit for messages in Pulsar's persistence layer).
* If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
* Example Scenario (partition with 2 entries):
* | Entry | Ledger ID | Entry ID | Index | Messages |
* | :--- | ---: | ---: | ---: | ---: |
* | A | 0 | 0 | 2 | 0,1,2 |
* | B | 0 | 1 | 4 | 3,4 |
* Param with indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
* @throws NotAuthorizedException (HTTP 403 Forbidden) Client lacks permissions to access the topic/namespace.
* @throws NotFoundException (HTTP 404 Not Found) Source topic/namespace does not exist, or invalid index.
* @throws PulsarAdminException (HTTP 406 Not Acceptable) Specified topic is not a persistent topic.
* @throws PreconditionFailedException (HTTP 412 Precondition Failed) Broker entry metadata is disabled.
* @throws PulsarAdminException For other errors (e.g., HTTP 500 Internal Server Error).
*/
MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException;


/**
* Get the message id by index asynchronously. If the index points to a system message, return the first user
* message following it; if the specified message has expired and been deleted, return MessageId.Earliest.
* The messages without entry metadata will be skipped, and the next matched message whose index >= the specified
* index will be returned.
* @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or
* the original topic name for non-partitioned topics.
* @param index the index of a message
* When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal
* storage unit for messages in Pulsar's persistence layer).
* If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
* Example Scenario (partition with 2 entries):
* | Entry | Ledger ID | Entry ID | Index | Messages |
* | :--- | ---: | ---: | ---: | ---: |
* | A | 0 | 0 | 2 | 0,1,2 |
* | B | 0 | 1 | 4 | 3,4 |
* Param with indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
* @implNote The return {@link CompletableFuture<MessageId>} that completes with the message id of the message.
* The future may complete exceptionally with:
* <ul>
* <li>{@link NotAuthorizedException} (HTTP 403) Permission denied for topic/namespace access.</li>
* <li>{@link NotFoundException} (HTTP 404) Shadow topic/namespace does not exist or invalid index.</li>
* <li>{@link PulsarAdminException} (HTTP 406) Shadow topic is not a persistent topic.</li>
* <li>{@link PreconditionFailedException} (HTTP 412) Broker entry metadata is not enabled.</li>
* <li>{@link PulsarAdminException} (HTTP 307) Redirect required to the correct broker.</li>
* <li>{@link PulsarAdminException} Other errors (e.g., HTTP 500).</li>
* </ul>
*/
CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2819,5 +2819,31 @@ public CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String
});
}

@Override
public MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException {
return sync(() -> getMessageIdByIndexAsync(topicName, index));
}

@Override
public CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index) {
final CompletableFuture<MessageId> messageIdCompletableFuture = new CompletableFuture<>();
TopicName topic = validateTopic(topicName);
WebTarget path = topicPath(topic, "getMessageIdByIndex");
path = path.queryParam("index", index);
asyncGetRequest(path, new InvocationCallback<MessageIdImpl>(){

@Override
public void completed(MessageIdImpl messageId) {
messageIdCompletableFuture.complete(messageId);
}

@Override
public void failed(Throwable throwable) {
messageIdCompletableFuture.completeExceptionally(throwable);
}
});
return messageIdCompletableFuture;
}

private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,8 @@ public boolean matches(Long timestamp) {
cmdTopics.run(split("get-shadow-source persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1");


cmdTopics.run(split("get-message-id-by-index persistent://myprop/clust/ns1/ds1 -i 0"));
verify(mockTopics).getMessageIdByIndex("persistent://myprop/clust/ns1/ds1", 0);

}

Expand Down
Loading
Loading