Skip to content

[feat][admin] PIP-415: Support getting message ID by index #24222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -5489,4 +5489,65 @@ protected CompletableFuture<AutoSubscriptionCreationOverride> internalGetAutoSub
return null;
}));
}

protected CompletableFuture<MessageId> internalGetMessageIDByIndexAsync(Long offset, boolean authoritative) {
if (pulsar().getBrokerService().isBrokerEntryMetadataEnabled()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"GetMessageIDByIndex is not allowed when broker entry metadata is disabled"));
}
int partitionIndex = topicName.getPartitionIndex();
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES);
return future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
}).thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
} else {
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(topicMetadata -> {
if (topicMetadata.partitions > 0) {
log.warn("[{}] Not supported getMessageIdByIndex operation on "
+ "partitioned-topic {}", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"GetMessageIDByIndex is not allowed on partitioned-topic");
}
});
}
}).thenCompose(ignore -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic persistentTopic)) {
log.error("[{}] Get message id by offset on a non-persistent topic {} is not allowed",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Get message id by offset on a non-persistent topic is not allowed"));
}
ManagedLedger managedLedger = persistentTopic.getManagedLedger();
return managedLedger.asyncFindPosition(entry -> {
try {
BrokerEntryMetadata brokerEntryMetadata =
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
return brokerEntryMetadata.getIndex() < offset;
} catch (Exception e) {
log.error("Error deserialize message for message position find", e);
} finally {
entry.release();
}
return false;
});
}).thenCompose(position -> {
if (position == null) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
"Message not found for offset " + offset));
} else {
return CompletableFuture
.completedFuture(new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
partitionIndex));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5015,5 +5015,38 @@ public void removeAutoSubscriptionCreation(
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/getMessageIdByIndex")
@ApiOperation(hidden = true, value = "Get Message ID by index.",
notes = "If the specified index is a system message, "
+ "it will return the message id of the later message.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or partitioned topic does not exist, "
+ "or the index is invalid"),
@ApiResponse(code = 406, message = "The topic is not a persistent topic"),
@ApiResponse(code = 412, message = "The broker is not enable broker entry metadata"),
})
public void getMessageIDByIndexAndPartitionID(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("index") long index,
@QueryParam("authoritative") @DefaultValue("false")
boolean authoritative){
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageIDByIndexAsync(index, authoritative)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get message id by index for topic {}, partition id {}, index {}",
clientAppId(), topicName, index, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,46 @@ public void testManagedLedgerTotalSize() throws Exception {

cursor.close();
}

@Test
public void testGetMessageIdByIndex() throws Exception {
// 1. test no partitioned topic
final String topicName = newTopicName();
admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();
MessageIdImpl messageId = (MessageIdImpl) producer.send("test");
Message<byte[]>
message = admin.topics().getMessagesById(topicName, messageId.getLedgerId(), messageId.getEntryId()).get(0);
long index = message.getIndex().get();
MessageIdImpl messageIdByIndex = (MessageIdImpl) admin.topics().getMessageIdByIndex(topicName, index);
Assert.assertEquals(messageIdByIndex, messageId);

// 2. test partitioned topic
final String topicName2 = newTopicName();
final String partitionedTopicName = topicName2 + "-partition-" + 0;
admin.topics().createPartitionedTopic(topicName2, 10);
@Cleanup
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(topicName2)
.enableBatching(false)
.create();

MessageIdImpl messageId2 = null;
for (int i = 0; i < 200; i++) {
messageId2 = (MessageIdImpl) producer2.send("test" + i);
if (messageId2.getPartitionIndex() == 0) {
break;
}
}
Message<byte[]>
message2 = admin.topics().getMessagesById(partitionedTopicName,
messageId2.getLedgerId(), messageId2.getEntryId()).get(0);
long index2 = message2.getIndex().get();
MessageIdImpl messageIdByIndex2 = (MessageIdImpl) admin.topics().getMessageIdByIndex(partitionedTopicName, index2);
Assert.assertEquals(messageIdByIndex2, messageId2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4558,4 +4558,20 @@ default void createShadowTopic(String shadowTopic, String sourceTopic) throws Pu
default CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String sourceTopic) {
return createShadowTopicAsync(shadowTopic, sourceTopic, null);
}

/**
* Get the message id by index.
* @param topicName the partitioned topic name or non-partitioned topic name
* @param index the index of a message
* @return the message id of the message
*/
MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException;

/**
* Get the message id by index asynchronously.
* @param topicName the topic name
* @param index the index of a message
* @return the message id of the message
*/
CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2819,5 +2819,31 @@ public CompletableFuture<Void> createShadowTopicAsync(String shadowTopic, String
});
}

@Override
public MessageId getMessageIdByIndex(String topicName, long offset) throws PulsarAdminException {
return sync(() -> getMessageIdByIndexAsync(topicName, offset));
}

@Override
public CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long offset) {
final CompletableFuture<MessageId> messageIdCompletableFuture = new CompletableFuture<>();
TopicName topic = validateTopic(topicName);
WebTarget path = topicPath(topic, "getMessageIdByOffset");
path = path.queryParam("offset", offset);
asyncGetRequest(path, new InvocationCallback<MessageIdImpl>(){

@Override
public void completed(MessageIdImpl messageId) {
messageIdCompletableFuture.complete(messageId);
}

@Override
public void failed(Throwable throwable) {
messageIdCompletableFuture.completeExceptionally(throwable);
}
});
return messageIdCompletableFuture;
}

private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,8 @@ public boolean matches(Long timestamp) {
cmdTopics.run(split("get-shadow-source persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getShadowSource("persistent://myprop/clust/ns1/ds1");


cmdTopics.run(split("get-message-id-by-index persistent://myprop/clust/ns1/ds1 -i 0"));
verify(mockTopics).getMessageIdByIndex("persistent://myprop/clust/ns1/ds1", 0);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());

addCommand("trim-topic", new TrimTopic());

addCommand("get-message-id-by-index", new GetMessageIdByIndex());
}

@Command(description = "Get the list of topics under a namespace.")
Expand Down Expand Up @@ -3058,4 +3060,20 @@ void run() throws PulsarAdminException {
getAdmin().topics().trimTopic(topic);
}
}

@Command(description = "Get message id by index")
private class GetMessageIdByIndex extends CliCommand {

@Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
private String topicName;

@Option(names = { "--index", "-i" }, description = "Index to get message id for the topic", required = true)
private Long index;

@Override
void run() throws Exception {
String topic = validateTopicName(topicName);
getAdmin().topics().getMessageIdByIndex(topic, index);
}
}
}
Loading