diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 257780690d644..84d55430f8f4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -962,6 +962,12 @@ protected static boolean isNotFoundException(Throwable ex) { == Status.NOT_FOUND.getStatusCode(); } + protected static boolean is4xxRestException(Throwable ex) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + return realCause instanceof WebApplicationException + && (((WebApplicationException) realCause).getResponse().getStatus() % 100 == 4); + } + protected static boolean isConflictException(Throwable ex) { Throwable realCause = FutureUtil.unwrapCompletionException(ex); return realCause instanceof WebApplicationException @@ -984,6 +990,10 @@ protected static boolean isNot307And404And400Exception(Throwable ex) { return !isRedirectException(ex) && !isNotFoundException(ex) && !isBadRequest(ex); } + protected static boolean isNot307And4xxException(Throwable ex) { + return !isRedirectException(ex) && !is4xxRestException(ex); + } + protected static String getTopicNotFoundErrorMessage(String topic) { return String.format("Topic %s not found", topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index c666a19b61d55..20d748f99a710 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4988,7 +4988,7 @@ protected CompletableFuture internalRemoveSubscribeRate(boolean isGlobal) protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { Throwable cause = thr.getCause(); - if (isNot307And404And400Exception(cause)) { + if (isNot307And4xxException(cause)) { log.error("[{}] Failed to perform {} on topic {}", clientAppId(), methodName, topicName, cause); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 538731eda4315..5f4384c9a317e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; @@ -46,6 +47,7 @@ import javax.ws.rs.core.Response; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -2332,7 +2334,26 @@ public void setReplicationClusters( @ApiParam(value = "List of replication clusters", required = true) List clusterIds) { validateTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE) - .thenCompose(__ -> preValidation(authoritative)) + .thenCompose(__ -> preValidation(authoritative)).thenCompose(__ -> { + // Set a topic-level replicated clusters that do not contain local cluster is not meaningful, except + // the following scenario: User has two clusters, which enabled Geo-Replication through a global + // metadata store, the resources named partitioned topic metadata and the resource namespace-level + // "replicated clusters" are shared between multi clusters. Pulsar can hardly delete a specify + // partitioned topic. To support this use case, the following steps can implement it: + // 1. set a global topic-level replicated clusters that do not contain local cluster. + // 2. the local cluster will remove the subtopics automatically, and remove the schemas and local + // topic policies. Just leave the global topic policies there, which prevents the namespace level + // replicated clusters policy taking affect. + // TODO But the API "pulsar-admin topics set-replication-clusters" does not support global policy, + // to support this scenario, a PIP is needed. + boolean clustersDoesNotContainsLocal = CollectionUtils.isEmpty(clusterIds) + || !clusterIds.contains(pulsar().getConfig().getClusterName()); + if (clustersDoesNotContainsLocal) { + return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, + "Can not remove local cluster from the topic-level replication clusters policy")); + } + return CompletableFuture.completedFuture(null); + }) .thenCompose(__ -> internalSetReplicationClusters(clusterIds)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ae22175f375aa..a7c895ee28d91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2015,9 +2015,15 @@ CompletableFuture deleteSchemaAndPoliciesIfClusterRemoved() { } }); - // TODO regarding the topic level policies, it will be deleted at a seperate PR. - // Because there is an issue related to Global policies has not been solved so - // far. + // There are only one cases that will remove local clusters: using global metadata + // store, namespaces will share policies cross multi clusters, including + // "replicated clusters" and "partitioned topic metadata", we can hardly delete + // partitioned topic from one cluster and keep it exists in another. Removing + // local cluster from the namespace level "replicated clusters" can do this. + // TODO: there is no way to delete a specify partitioned topic if users have enabled + // Geo-Replication with a global metadata store, a PIP is needed. + // Since the system topic "__change_events" under the namespace will also be + // deleted, we can skip to delete topic-level policies. } } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index bf2c3e5b2aa18..5eee0844ef65d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -32,6 +32,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -96,6 +97,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; @@ -3797,4 +3799,35 @@ public void testSetSubRateWithSub() throws Exception { .ratePeriodInSecond(10) .build()); } + + @DataProvider + public Object[][] topicTypes() { + return new Object[][]{ + {TopicType.PARTITIONED}, + {TopicType.NON_PARTITIONED} + }; + } + + @Test(dataProvider = "topicTypes") + public void testRemoveLocalCluster(TopicType topicType) throws Exception { + String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub"; + if (TopicType.PARTITIONED.equals(topicType)) { + admin.topics().createNonPartitionedTopic(topic); + } else { + admin.topics().createPartitionedTopic(topic, 2); + } + try { + admin.topics().setReplicationClusters(topic, Arrays.asList("not-local-cluster")); + fail("Can not remove local cluster from the topic-level replication clusters policy"); + } catch (PulsarAdminException.PreconditionFailedException e) { + assertTrue(e.getMessage().contains("Can not remove local cluster from the topic-level replication clusters" + + " policy")); + } + // cleanup. + if (TopicType.PARTITIONED.equals(topicType)) { + admin.topics().delete(topic, false); + } else { + admin.topics().deletePartitionedTopic(topic, false); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 3c7edaec44d00..1244300378a8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -91,6 +91,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -1422,4 +1423,88 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace, SchemaCompatibilityStrategy.FORWARD); } + + /*** + * Manually modifying topic policies by Rest API. + * - Global topic level policies: + * - Add: replicate + * - Update: replicate + * - Delete a single policy(it is equivalent to specify updating): delete both local and remote policies. + * - Local topic level policies: + * - Add: never replicate + * - Update: never replicate + * - Delete a single policy(it is equivalent to specify updating): delete local policies only. + * Delete Topic triggers that both local and global policies will be deleted in local cluster, but will not delete + * the remote cluster's global policies. This test case will be covered by + * "OneWayReplicatorUsingGlobalPartitionedTest.testRemoveCluster". + */ + @Test + public void testTopicPoliciesReplicationRule() throws Exception { + // Init Pulsar resources. + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + final TopicName topicNameObj = TopicName.get(topicName); + final String subscriptionName = "s1"; + admin1.topics().createNonPartitionedTopic(topicName); + Producer producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create(); + waitReplicatorStarted(topicName); + producer1.close(); + assertTrue(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(topicNameObj).join()); + admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + admin1.topics().createSubscription(subscriptionName, topicName, MessageId.earliest); + admin2.topics().createSubscription(subscriptionName, topicName, MessageId.earliest); + + // Case 1: Global topic level policies -> Add: replicate. + PublishRate publishRateAddGlobal = new PublishRate(100, 10000); + admin1.topicPolicies(true).setPublishRate(topicName, publishRateAddGlobal); + // Case 4: Local topic level policies -> Add: never replicate. + PublishRate publishRateAddLocal = new PublishRate(200, 20000); + admin1.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal); + waitChangeEventsReplicated(replicatedNamespace); + Thread.sleep(2000); + Awaitility.await().untilAsserted(() -> { + PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName); + assertEquals(valueGlobal, publishRateAddGlobal); + PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName); + assertNull(valueLocal); + }); + + // Case 2: Global topic level policies -> Update: replicate. + PublishRate publishRateUpdateGlobal = new PublishRate(300, 30000); + admin1.topicPolicies(true).setPublishRate(topicName, publishRateUpdateGlobal); + // Case 5: Local topic level policies -> Update: never replicate. + PublishRate publishRateUpdateLocal = new PublishRate(400, 40000); + admin1.topicPolicies(false).setPublishRate(topicName, publishRateUpdateLocal); + waitChangeEventsReplicated(replicatedNamespace); + Thread.sleep(2000); + Awaitility.await().untilAsserted(() -> { + PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName); + assertEquals(valueGlobal, publishRateUpdateGlobal); + PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName); + assertNull(valueLocal); + }); + + // Case 3: Global topic level policies -> Delete: delete both local and remote policies. + admin1.topicPolicies(true).removePublishRate(topicName); + waitChangeEventsReplicated(replicatedNamespace); + Thread.sleep(2000); + Awaitility.await().untilAsserted(() -> { + PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName); + assertNull(valueGlobal); + }); + + // Case 6: Local topic level policies -> Delete: never replicate. + PublishRate publishRateAddLocal2 = new PublishRate(500, 50000); + admin2.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal2); + Awaitility.await().untilAsserted(() -> { + PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName); + assertEquals(valueLocal, publishRateAddLocal2); + }); + admin1.topicPolicies(false).removePublishRate(topicName); + waitChangeEventsReplicated(replicatedNamespace); + Thread.sleep(2000); + Awaitility.await().untilAsserted(() -> { + PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName); + assertEquals(valueLocal, publishRateAddLocal2); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 534da8bbc491b..d222abde3a36f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -36,9 +36,12 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ClientBuilder; @@ -48,6 +51,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -506,4 +510,41 @@ protected void waitReplicatorStopped(PulsarService sourceCluster, PulsarService || !persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected()); }); } + + protected void waitChangeEventsReplicated(String ns) { + String topicName = "persistent://" + ns + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + TopicName topicNameObj = TopicName.get(topicName); + Optional metadata = pulsar1.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(topicNameObj).join(); + Function ensureNoBacklog = new Function() { + + @Override + public Boolean apply(Replicator replicator) { + if (!replicator.getRemoteCluster().equals("c2")) { + return true; + } + PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; + Position lac = persistentReplicator.getCursor().getManagedLedger().getLastConfirmedEntry(); + Position mdPos = persistentReplicator.getCursor().getMarkDeletedPosition(); + return mdPos.compareTo(lac) >= 0; + } + }; + if (metadata.isPresent()) { + for (int index = 0; index < metadata.get().partitions; index++) { + String partitionName = topicNameObj.getPartition(index).toString(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(partitionName, false).join().get(); + persistentTopic.getReplicators().values().forEach(replicator -> { + assertTrue(ensureNoBacklog.apply(replicator)); + }); + } + } else { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + persistentTopic.getReplicators().values().forEach(replicator -> { + assertTrue(ensureNoBacklog.apply(replicator)); + }); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java index 585fe6ececaf9..a97dbfa4efd0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY; +import static org.apache.pulsar.broker.service.TopicPoliciesService.GetType.LOCAL_ONLY; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -34,6 +36,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.protocol.schema.StoredSchema; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -181,6 +185,12 @@ public void testRemoveCluster() throws Exception { admin1.namespaces().createNamespace(ns1); admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); admin1.topics().createPartitionedTopic(topic, 2); + PublishRate publishRateAddGlobal = new PublishRate(100, 10000); + admin1.topicPolicies(true).setPublishRate(topic, publishRateAddGlobal); + PublishRate publishRateAddLocal1 = new PublishRate(200, 20000); + admin1.topicPolicies(false).setPublishRate(topic, publishRateAddLocal1); + PublishRate publishRateAddLocal2 = new PublishRate(300, 30000); + admin2.topicPolicies(false).setPublishRate(topic, publishRateAddLocal2); Awaitility.await().untilAsserted(() -> { assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() .partitionedTopicExists(TopicName.get(topic))); @@ -190,6 +200,10 @@ public void testRemoveCluster() throws Exception { List> schemaList21 = pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); assertEquals(schemaList21.size(), 0); + PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topic); + assertEquals(valueGlobal, publishRateAddGlobal); + PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topic); + assertEquals(valueLocal, publishRateAddLocal2); }); // Wait for copying messages. @@ -201,6 +215,10 @@ public void testRemoveCluster() throws Exception { assertTrue(tps.containsKey(topicP0)); assertTrue(tps.containsKey(topicP1)); assertTrue(tps.containsKey(topicChangeEvents)); + Map>> tps2 = pulsar2.getBrokerService().getTopics(); + assertTrue(tps2.containsKey(topicP0)); + assertTrue(tps2.containsKey(topicP1)); + assertTrue(tps2.containsKey(topicChangeEvents)); List> schemaList12 = pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); assertEquals(schemaList12.size(), 1); @@ -227,6 +245,17 @@ public void testRemoveCluster() throws Exception { List> schemaList23 = pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get(); assertEquals(schemaList23.size(), 1); + // Verify: the topic policies will be removed in local cluster, but remote cluster will not. + Optional globalPolicies2 = pulsar2.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(topic), GLOBAL_ONLY).join(); + assertTrue(globalPolicies2.isPresent(), "Remote cluster should have global policies."); + assertEquals(globalPolicies2.get().getPublishRate(), publishRateAddGlobal, + "Remote cluster should have global policies: publish rate."); + Optional localPolicies2 = pulsar2.getTopicPoliciesService() + .getTopicPoliciesAsync(TopicName.get(topic), LOCAL_ONLY).join(); + assertTrue(localPolicies2.isPresent(), "Remote cluster should have local policies."); + assertEquals(localPolicies2.get().getPublishRate(), publishRateAddLocal2, + "Remote cluster should have local policies: publish rate."); }); // cleanup. @@ -239,4 +268,10 @@ public void testRemoveCluster() throws Exception { public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception { super.testIncompatibleMultiVersionSchema(enableDeduplication); } + + @Override + @Test + public void testTopicPoliciesReplicationRule() throws Exception { + super.testTopicPoliciesReplicationRule(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 5cbea8df1293f..450370a60d06f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -229,4 +229,10 @@ public void testRemoveCluster() throws Exception { public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception { super.testIncompatibleMultiVersionSchema(enableDeduplication); } + + @Override + @Test + public void testTopicPoliciesReplicationRule() throws Exception { + super.testTopicPoliciesReplicationRule(); + } }