diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ab9913dea4889..eacd3ac904e65 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.time.Clock; import java.util.ArrayList; @@ -114,6 +115,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + @VisibleForTesting + @Getter protected volatile boolean isFenced; protected final HierarchyTopicPolicies topicPolicies; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ae22175f375aa..1fdad8294a385 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -270,6 +270,7 @@ protected TopicStatsHelper initialValue() { private volatile double lastUpdatedAvgPublishRateInMsg = 0; private volatile double lastUpdatedAvgPublishRateInByte = 0; + @Getter private volatile boolean isClosingOrDeleting = false; private ScheduledFuture fencedTopicMonitoringTask = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 
55483708fdf6a..ae4927da5ce0a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -33,23 +33,24 @@ public interface RawReader { */ static CompletableFuture create(PulsarClient client, String topic, String subscription) { - return create(client, topic, subscription, true); + return create(client, topic, subscription, true, true); } static CompletableFuture create(PulsarClient client, String topic, String subscription, - boolean createTopicIfDoesNotExist) { + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) { CompletableFuture> future = new CompletableFuture<>(); RawReader r = - new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist); + new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist, + retryOnRecoverableErrors); return future.thenApply(__ -> r); } static CompletableFuture create(PulsarClient client, ConsumerConfigurationData consumerConfiguration, - boolean createTopicIfDoesNotExist) { + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) { CompletableFuture> future = new CompletableFuture<>(); RawReader r = new RawReaderImpl((PulsarClientImpl) client, - consumerConfiguration, future, createTopicIfDoesNotExist); + consumerConfiguration, future, createTopicIfDoesNotExist, retryOnRecoverableErrors); return future.thenApply(__ -> r); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 5ac051d227119..32f75d71dc332 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.pulsar.client.api.Consumer; import 
org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; @@ -52,7 +53,7 @@ public class RawReaderImpl implements RawReader { public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, CompletableFuture> consumerFuture, - boolean createTopicIfDoesNotExist) { + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) { consumerConfiguration = new ConsumerConfigurationData<>(); consumerConfiguration.getTopicNames().add(topic); consumerConfiguration.setSubscriptionName(subscription); @@ -62,14 +63,16 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); consumerConfiguration.setAckReceiptEnabled(true); - consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); + consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist, + retryOnRecoverableErrors); } public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData consumerConfiguration, CompletableFuture> consumerFuture, - boolean createTopicIfDoesNotExist) { + boolean createTopicIfDoesNotExist, boolean retryOnRecoverableErrors) { this.consumerConfiguration = consumerConfiguration; - consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); + consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist, + retryOnRecoverableErrors); } @@ -117,9 +120,11 @@ public String toString() { static class RawConsumerImpl extends ConsumerImpl { final BlockingQueue incomingRawMessages; final Queue> pendingRawReceives; + final boolean retryOnRecoverableErrors; RawConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData conf, - CompletableFuture> consumerFuture, boolean createTopicIfDoesNotExist) { + CompletableFuture> consumerFuture, boolean createTopicIfDoesNotExist, + boolean retryOnRecoverableErrors) { super(client, conf.getSingleTopic(), conf, @@ -135,6 +140,16 @@ static class RawConsumerImpl extends ConsumerImpl { ); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); + this.retryOnRecoverableErrors = retryOnRecoverableErrors; + } + + /** Treats ServiceNotReady as unrecoverable when retries are disabled; otherwise defers to the base rules. */ + @Override + protected boolean isUnrecoverableError(Throwable t) { + if (!retryOnRecoverableErrors && (t instanceof PulsarClientException.ServiceNotReadyException)) { + return true; + } + return super.isUnrecoverableError(t); } void tryCompletePending() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index 983443432ff49..d37298757db9d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -56,7 +56,7 @@ public Compactor(ServiceConfiguration conf, } public CompletableFuture compact(String topic) { - return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false).thenComposeAsync( + return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false, false).thenComposeAsync( this::compactAndCloseReader, scheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java index 3a446101990f5..66fff46b6604f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java @@ -18,14 +18,19 @@ */ package org.apache.pulsar.client.impl; +import static org.testng.Assert.assertTrue; +import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -76,7 +81,7 @@ public void testInterruptedWhenCreateConsumer() throws InterruptedException { .subscribe(); Assert.fail("Should have thrown an exception"); } catch (PulsarClientException e) { - Assert.assertTrue(e.getCause() instanceof InterruptedException); + assertTrue(e.getCause() instanceof InterruptedException); } }); startConsumer.start(); @@ -88,4 +93,39 @@ public void testInterruptedWhenCreateConsumer() throws InterruptedException { Assert.assertEquals(clientImpl.consumersCount(), 0); }); } + + @Test + public void testReceiveWillDoneAfterClosedConsumer() throws Exception { + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String subName = "test-sub"; + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subName, MessageId.earliest); + ConsumerImpl consumer = + (ConsumerImpl) pulsarClient.newConsumer().topic(tpName).subscriptionName(subName).subscribe(); + CompletableFuture> future = consumer.receiveAsync(); + consumer.close(); + Awaitility.await().untilAsserted(() -> { + assertTrue(future.isDone()); + }); + } + + @Test + public void testReceiveWillDoneAfterTopicDeleted() throws Exception { + String namespace = "public/default"; + admin.namespaces().setAutoTopicCreation(namespace, 
AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(false).build()); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String subName = "test-sub"; + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subName, MessageId.earliest); + ConsumerImpl consumer = + (ConsumerImpl) pulsarClient.newConsumer().topic(tpName).subscriptionName(subName).subscribe(); + CompletableFuture> future = consumer.receiveAsync(); + admin.topics().delete(tpName, true); + Awaitility.await().untilAsserted(() -> { + assertTrue(future.isDone()); + }); + // cleanup. + admin.namespaces().removeAutoTopicCreation(namespace); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index d9ddc00b2e863..a88d590f202c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -27,12 +27,17 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -44,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; 
import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -54,12 +60,17 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; @Test(groups = "broker-impl") @Slf4j @@ -215,7 +226,7 @@ public void testRawReaderWithConfigurationCreation() throws Exception { consumerConfiguration.setReadCompacted(true); consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); consumerConfiguration.setAckReceiptEnabled(true); - RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true).get(); + RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true, true).get(); MessageId lastMessageId = reader.getLastMessageIdAsync().get(); while (true) { @@ -547,11 +558,62 @@ public void testAutoCreateTopic() throws ExecutionException, InterruptedExceptio String topic2 = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); try { - reader = RawReader.create(pulsarClient, topic2, subscription, false).get(); + reader = RawReader.create(pulsarClient, topic2, subscription, false, true).get(); Assert.fail(); } catch (Exception e) { Assert.assertTrue(e.getCause() instanceof PulsarClientException.TopicDoesNotExistException); } 
reader.closeAsync().join(); } + + @Test(timeOut = 60000) + public void testReconnectsWhenServiceNotReady() throws Exception { + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); + String subscriptionName = "s1"; + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); + + // Delay the cursor close so the topic unload stalls and the raw reader gets a ServiceNotReady error. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursor compactionCursor = ml.openCursor(subscriptionName); + ManagedCursor spyCompactionCursor = spy(compactionCursor); + CountDownLatch delayCloseCursorSignal = new CountDownLatch(1); + Answer answer = new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + delayCloseCursorSignal.await(); + return invocationOnMock.callRealMethod(); + } + }; + doAnswer(answer).when(spyCompactionCursor).asyncClose(any(AsyncCallbacks.CloseCallback.class), any()); + ml.getCursors().removeCursor(subscriptionName); + ml.getCursors().add(spyCompactionCursor, ml.getLastConfirmedEntry()); + + // Unload topic after reader is connected. + // The topic becomes "fenced", so the RawReader gets a ServiceNotReady error. + CompletableFuture msgFuture = reader.readNextAsync(); + CompletableFuture unloadFuture = admin.topics().unloadAsync(topic); + Awaitility.await().untilAsserted(() -> { + Assert.assertTrue(persistentTopic.isFenced()); + }); + + // Verify: RawReader reconnects after the unloading has finished, and it can consume successfully. 
+ delayCloseCursorSignal.countDown(); + unloadFuture.get(); + MessageIdImpl msgIdSent = (MessageIdImpl) producer.send("msg"); + RawMessage rawMessage = msgFuture.get(); + Assert.assertNotNull(rawMessage); + MessageIdImpl msgIdReceived = (MessageIdImpl) rawMessage.getMessageId(); + Assert.assertEquals(msgIdSent.getLedgerId(), msgIdReceived.getLedgerId()); + Assert.assertEquals(msgIdSent.getEntryId(), msgIdReceived.getEntryId()); + + // cleanup. + rawMessage.close(); + producer.close(); + reader.closeAsync().get(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index ff9026dbba6e5..f43e2a1c672c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -20,7 +20,11 @@ import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -60,9 +64,11 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.OpenBuilder; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import 
org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; @@ -101,6 +107,8 @@ import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -2320,6 +2328,65 @@ public void testAcknowledgeWithReconnection() throws Exception { producer.close(); } + @Test(timeOut = 120 * 1000) + public void testConcurrentCompactionAndTopicDelete() throws Exception { + final String topicName = newUniqueName("persistent://my-tenant/my-ns/tp"); + admin.topics().createNonPartitionedTopic(topicName); + // Load up the topic. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + + // Inject a reading delay to the compaction task, + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursor compactionCursor = ml.openCursor(COMPACTION_SUBSCRIPTION); + ManagedCursor spyCompactionCursor = spy(compactionCursor); + CountDownLatch delayReadSignal = new CountDownLatch(1); + Answer answer = new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + delayReadSignal.await(); + return invocationOnMock.callRealMethod(); + } + }; + doAnswer(answer).when(spyCompactionCursor).asyncReadEntries(anyInt(), + any(AsyncCallbacks.ReadEntriesCallback.class), any(), any(Position.class)); + doAnswer(answer).when(spyCompactionCursor).asyncReadEntries(anyInt(), anyLong(), + any(AsyncCallbacks.ReadEntriesCallback.class), any(), any(Position.class)); + doAnswer(answer).when(spyCompactionCursor).asyncReadEntriesOrWait(anyInt(), 
anyLong(), + any(AsyncCallbacks.ReadEntriesCallback.class), any(), any(Position.class)); + ml.getCursors().removeCursor(COMPACTION_SUBSCRIPTION); + ml.getCursors().add(spyCompactionCursor, ml.getLastConfirmedEntry()); + + // Trigger a compaction task. + for (int i = 0; i < 2000; i++) { + producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send(); + } + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).readCompacted(true).subscriptionName("s1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + persistentTopic.triggerCompaction(); + Awaitility.await().untilAsserted(() -> { + assertEquals(persistentTopic.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 1); + }); + + // Since we injected a delay reading, the compaction task started and not finish yet. + // Call topic deletion, they two tasks are concurrently executed. + producer.close(); + consumer.close(); + CompletableFuture deleteTopicFuture = persistentTopic.deleteForcefully(); + + // Remove the injection after 3s. + Thread.sleep(3000); + delayReadSignal.countDown(); + + // Verify: topic deletion is successfully executed. 
+ Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(deleteTopicFuture.isDone()); + }); + } + @Test public void testEarliestSubsAfterRollover() throws Exception { final String topicName = newUniqueName("persistent://my-tenant/my-ns/testEarliestSubsAfterRollover"); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 6bcba51defdd1..5005549f476ee 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -362,6 +362,17 @@ public TooManyRequestsException(String msg) { } } + /** + * Relates to server-side errors: + * ServiceUnitNotReadyException, TopicFencedException and SubscriptionFencedException. + */ + public static class ServiceNotReadyException extends LookupException { + + public ServiceNotReadyException(String msg) { + super(msg); + } + } + /** * Connect exception thrown by Pulsar client. 
*/ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 24fd0034eb080..0563fa7e66667 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1353,7 +1353,7 @@ public static PulsarClientException getPulsarClientException(ServerError error, case PersistenceError: return new PulsarClientException.BrokerPersistenceException(errorMsg); case ServiceNotReady: - return new PulsarClientException.LookupException(errorMsg); + return new PulsarClientException.ServiceNotReadyException(errorMsg); case TooManyRequests: return new PulsarClientException.TooManyRequestsException(errorMsg); case ProducerBlockedQuotaExceededError: diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index fb5b6788d08e5..a4e262b35aedf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -64,7 +64,15 @@ interface Connection { * @apiNote If the returned future is completed exceptionally, reconnectLater will be called. */ CompletableFuture connectionOpened(ClientCnx cnx); - default void connectionFailed(PulsarClientException e) { + + /** + * Called when the connection handler failed to get a connection to the broker. + * @param e The error that happened when trying to get a connection + * @return If "true", the connection handler retries to get a connection; otherwise it stops getting a new + * connection. If it returns "false", you should release resources that consumers/producers occupied. 
+ */ + default boolean connectionFailed(PulsarClientException e) { + return true; } } @@ -142,22 +150,24 @@ protected void grabCnx(Optional hostURI) { } private Void handleConnectionError(Throwable exception) { + boolean toRetry = true; try { log.warn("[{}] [{}] Error connecting to broker: {}", state.topic, state.getHandlerName(), exception.getMessage()); if (exception instanceof PulsarClientException) { - connection.connectionFailed((PulsarClientException) exception); + toRetry = connection.connectionFailed((PulsarClientException) exception); } else if (exception.getCause() instanceof PulsarClientException) { - connection.connectionFailed((PulsarClientException) exception.getCause()); + toRetry = connection.connectionFailed((PulsarClientException) exception.getCause()); } else { - connection.connectionFailed(new PulsarClientException(exception)); + toRetry = connection.connectionFailed(new PulsarClientException(exception)); } } catch (Throwable throwable) { log.error("[{}] [{}] Unexpected exception after the connection", state.topic, state.getHandlerName(), throwable); } - - reconnectLater(exception); + if (toRetry) { + reconnectLater(exception); + } return null; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 275966c26b33e..f575a127c96c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -974,6 +974,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { if (e.getCause() instanceof PulsarClientException && PulsarClientException.isRetriableError(e.getCause()) + && !isUnrecoverableError(e.getCause()) && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { future.completeExceptionally(e.getCause()); } else if (!subscribeFuture.isDone()) { @@ -985,18 +986,8 @@ public 
CompletableFuture connectionOpened(final ClientCnx cnx) { + "with subscription name %s when connecting to the broker", topicName.toString(), subscription))); client.cleanupConsumer(this); - } else if (e.getCause() instanceof TopicDoesNotExistException) { - // The topic was deleted after the consumer was created, and we're - // not allowed to recreate the topic. This can happen in few cases: - // * Regex consumer getting error after topic gets deleted - // * Regular consumer after topic is manually delete and with - // auto-topic-creation set to false - // No more retries are needed in this case. - setState(State.Failed); - closeConsumerTasks(); - client.cleanupConsumer(this); - log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", - topic, subscription, cnx.channel().remoteAddress()); + } else if (isUnrecoverableError(e.getCause())) { + closeWhenReceivedUnrecoverableError(e.getCause(), cnx); } else { // consumer was subscribed and connected but we got some error, keep trying future.completeExceptionally(e.getCause()); @@ -1011,6 +1002,37 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { return future; } + /** + * Whether {@code t} must not be retried; subclasses may extend the set of unrecoverable errors. + */ + protected boolean isUnrecoverableError(Throwable t) { + // TopicDoesNotExistException: topic has been deleted. + // NotFoundException: topic has been deleted. + // IllegalStateException: consumer has been closed. + return (t instanceof TopicDoesNotExistException) || (t instanceof IllegalStateException) + || (t instanceof PulsarClientException.NotFoundException); + } + + protected void closeWhenReceivedUnrecoverableError(Throwable t, ClientCnx cnx) { + // The topic was deleted after the consumer was created, and we're + // not allowed to recreate the topic. 
This can happen in few cases: + // * Regex consumer getting error after topic gets deleted + // * Regular consumer after topic is manually delete and with + // auto-topic-creation set to false + // No more retries are needed in this case. + final String cnxStr = cnx == null ? "null" : String.valueOf(cnx.channel().remoteAddress()); + log.warn("[{}][{}] {} Closed consumer because get an error that does not support to retry: {} {}", + topic, subscription, cnxStr, t.getClass().getName(), t.getMessage()); + closeAsync().whenComplete((__, ex) -> { + if (ex == null) { + setState(State.Failed); + return; + } + log.error("[{}][{}] {} Failed to close consumer after got an error that does not support to retry: {} {}", + topic, subscription, cnxStr, t.getClass().getName(), t.getMessage(), ex); + }); + } + protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) { log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription, cnx.channel().remoteAddress(), consumerId); @@ -1091,7 +1113,7 @@ private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) { } @Override - public void connectionFailed(PulsarClientException exception) { + public boolean connectionFailed(PulsarClientException exception) { boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); boolean timeout = System.currentTimeMillis() > lookupDeadline; if (nonRetriableError || timeout) { @@ -1107,10 +1129,18 @@ public void connectionFailed(PulsarClientException exception) { closeConsumerTasks(); deregisterFromClientCnx(); client.cleanupConsumer(this); + return false; + } else { + Throwable actError = FutureUtil.unwrapCompletionException(exception); + if (isUnrecoverableError(actError)) { + closeWhenReceivedUnrecoverableError(actError, null); + return false; + } } } else { previousExceptionCount.incrementAndGet(); } + return true; } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9960af6046ac3..b6c27c4ca3d27 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1973,13 +1973,8 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { } // Close the producer since topic does not exist. - if (cause instanceof PulsarClientException.TopicDoesNotExistException) { - closeAsync().whenComplete((v, ex) -> { - if (ex != null) { - log.error("Failed to close producer on TopicDoesNotExistException.", ex); - } - producerCreatedFuture.completeExceptionally(cause); - }); + if (isUnrecoverableError(cause)) { + closeWhenReceivedUnrecoverableError(cause, cnx); future.complete(null); return null; } @@ -2051,8 +2046,28 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { return future; } + protected boolean isUnrecoverableError(Throwable t) { + // TopicDoesNotExistException: topic has been deleted. + // NotFoundException: topic has been deleted. + // IllegalStateException: producer has been closed. + return (t instanceof PulsarClientException.TopicDoesNotExistException) || (t instanceof IllegalStateException) + || (t instanceof PulsarClientException.NotFoundException); + } + + protected void closeWhenReceivedUnrecoverableError(Throwable t, ClientCnx cnx) { + final String cnxStr = cnx == null ? 
"null" : String.valueOf(cnx.channel().remoteAddress()); + log.warn("[{}][{}] {} Closed producer because get an error that does not support to retry: {} {}", + topic, producerName, cnxStr, t.getClass().getName(), t.getMessage()); + closeAsync().whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}][{}] Failed to close producer after unrecoverable error", topic, producerName, ex); + } + producerCreatedFuture.completeExceptionally(t); + }); + } + @Override - public void connectionFailed(PulsarClientException exception) { + public boolean connectionFailed(PulsarClientException exception) { boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); boolean timeout = System.currentTimeMillis() > lookupDeadline; if (nonRetriableError || timeout) { @@ -2067,10 +2082,18 @@ public void connectionFailed(PulsarClientException exception) { closeProducerTasks(); setState(State.Failed); client.cleanupProducer(this); + return false; + } else { + Throwable actError = FutureUtil.unwrapCompletionException(exception); + if (isUnrecoverableError(actError)) { + closeWhenReceivedUnrecoverableError(actError, null); + return false; + } } } else { previousExceptionCount.incrementAndGet(); } + return true; } private void closeProducerTasks() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java index 93fa7082f33c6..5357b15462255 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java @@ -89,7 +89,7 @@ public TopicListWatcher(PatternConsumerUpdateQueue patternConsumerUpdateQueue, } @Override - public void connectionFailed(PulsarClientException exception) { + public boolean connectionFailed(PulsarClientException exception) { boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); if (nonRetriableError) { 
exception.setPreviousExceptionCount(previousExceptionCount); @@ -98,10 +98,12 @@ public void connectionFailed(PulsarClientException exception) { log.info("[{}] Watcher creation failed for {} with non-retriable error {}", topic, name, exception.getMessage()); deregisterFromClientCnx(); + return false; } } else { previousExceptionCount.incrementAndGet(); } + return true; } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index a1fe78d7290da..4ca742d98ea48 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -122,7 +122,7 @@ public void start() { } @Override - public void connectionFailed(PulsarClientException exception) { + public boolean connectionFailed(PulsarClientException exception) { boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); boolean timeout = System.currentTimeMillis() > lookupDeadline; if (nonRetriableError || timeout) { @@ -136,10 +136,12 @@ public void connectionFailed(PulsarClientException exception) { + "timeout", transactionCoordinatorId, exception); } setState(State.Failed); + return false; } } else { previousExceptionCount.getAndIncrement(); } + return true; } @Override