diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 050b0b10..3ce8e3c6 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -649,6 +649,14 @@ void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const callback(result, msg); } +static void logErrorTopicNameForAcknowledge(const std::string& topic) { + if (topic.empty()) { + LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer"); + } else { + LOG_ERROR("Message of topic: " << topic << " not in consumers"); + } +} + void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, const ResultCallback& callback) { if (state_ != Ready) { interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId); @@ -657,19 +665,14 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, const Res } const std::string& topicPartitionName = msgId.getTopicName(); - if (topicPartitionName.empty()) { - LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer"); - callback(ResultOperationNotSupported); - return; - } auto optConsumer = consumers_.find(topicPartitionName); if (optConsumer) { unAckedMessageTrackerPtr_->remove(msgId); optConsumer.value()->acknowledgeAsync(msgId, callback); } else { - LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker"); - callback(ResultUnknownError); + logErrorTopicNameForAcknowledge(topicPartitionName); + callback(ResultOperationNotSupported); } } @@ -684,7 +687,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis for (const MessageId& messageId : messageIdList) { const auto& topicName = messageId.getTopicName(); if (topicName.empty()) { - LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer"); + logErrorTopicNameForAcknowledge(topicName); callback(ResultOperationNotSupported); return; } @@ -710,19 +713,22 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis unAckedMessageTrackerPtr_->remove(kv.second); optConsumer.value()->acknowledgeAsync(kv.second, cb); } else { - LOG_ERROR("Message of topic: " << kv.first << " not in consumers"); - callback(ResultUnknownError); + logErrorTopicNameForAcknowledge(kv.first); + callback(ResultOperationNotSupported); } } } void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, const ResultCallback& callback) { - msgId.getTopicName(); + const auto& topic = msgId.getTopicName(); auto optConsumer = consumers_.find(msgId.getTopicName()); if (optConsumer) { unAckedMessageTrackerPtr_->removeMessagesTill(msgId); optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback); + } else { + logErrorTopicNameForAcknowledge(topic); + callback(ResultOperationNotSupported); } } diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc index 0694fa4b..d59b50dc 100644 --- a/tests/MultiTopicsConsumerTest.cc +++ b/tests/MultiTopicsConsumerTest.cc @@ -103,3 +103,42 @@ TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) { client.close(); } + +TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) { + const std::string topicPrefix = "multi-topics-consumer-ack-invalid-msg-id"; + Client client{lookupUrl}; + std::vector topics(2); + for (size_t i = 0; i < topics.size(); i++) { + Producer producer; + auto topic = topicPrefix + unique_str(); + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build())); + topics[i] = std::move(topic); + } + + Consumer consumer; + ConsumerConfiguration conf; + conf.setSubscriptionInitialPosition(InitialPositionEarliest); + ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer)); + + std::vector msgIds(topics.size()); + for (size_t i = 0; i < topics.size(); i++) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + std::string serialized; + msg.getMessageId().serialize(serialized); + msgIds[i] = MessageId::deserialize(serialized); + } + + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0])); + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds)); + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1])); + + msgIds[0].setTopicName("invalid-topic"); + msgIds[1].setTopicName("invalid-topic"); + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0])); + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds)); + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1])); + + client.close(); +}