Skip to content

[improve][broker] Deny removing local cluster from topic level replicated cluster policy #24351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,12 @@ protected static boolean isNotFoundException(Throwable ex) {
== Status.NOT_FOUND.getStatusCode();
}

/**
 * Checks whether the given failure is a REST exception carrying a client-error (4xx) HTTP status.
 *
 * @param ex the failure to inspect; it is first unwrapped with
 *           {@code FutureUtil.unwrapCompletionException}
 * @return {@code true} if the unwrapped cause is a {@link WebApplicationException} whose response
 *         status lies in the range 400-499, {@code false} otherwise
 */
protected static boolean is4xxRestException(Throwable ex) {
    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
    if (!(realCause instanceof WebApplicationException)) {
        return false;
    }
    // Integer division yields the status class (4 for 400-499). A modulo check would compare the
    // trailing digits instead, matching 404/504 while missing 400, 409 and 412.
    return ((WebApplicationException) realCause).getResponse().getStatus() / 100 == 4;
}

protected static boolean isConflictException(Throwable ex) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
return realCause instanceof WebApplicationException
Expand All @@ -984,6 +990,10 @@ protected static boolean isNot307And404And400Exception(Throwable ex) {
return !isRedirectException(ex) && !isNotFoundException(ex) && !isBadRequest(ex);
}

/**
 * Tells whether the given failure is neither a redirect nor a 4xx REST exception.
 *
 * @param ex the failure to classify
 * @return {@code true} only when both {@code isRedirectException(ex)} and
 *         {@code is4xxRestException(ex)} are {@code false}
 */
protected static boolean isNot307And4xxException(Throwable ex) {
    if (isRedirectException(ex)) {
        return false;
    }
    return !is4xxRestException(ex);
}

/**
 * Builds the error message used when a topic cannot be found.
 *
 * @param topic the topic name to embed in the message
 * @return the text {@code "Topic <topic> not found"}
 */
protected static String getTopicNotFoundErrorMessage(String topic) {
    return "Topic " + topic + " not found";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4988,7 +4988,7 @@ protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal)

protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
if (isNot307And404And400Exception(cause)) {
if (isNot307And4xxException(cause)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
Expand All @@ -46,6 +47,7 @@
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -2332,7 +2334,26 @@ public void setReplicationClusters(
@ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> preValidation(authoritative)).thenCompose(__ -> {
// Setting topic-level replicated clusters that do not contain the local cluster is not meaningful, except
// in the following scenario: the user has two clusters with Geo-Replication enabled through a global
// metadata store, so the partitioned topic metadata and the namespace-level "replicated clusters"
// policy are shared between the clusters. In that setup Pulsar can hardly delete a specific
// partitioned topic from one cluster only. To support this use case, the following steps could be used:
// 1. Set global topic-level replicated clusters that do not contain the local cluster.
// 2. The local cluster will then remove the sub-topics automatically, and remove the schemas and local
// topic policies, leaving only the global topic policies, which prevent the namespace-level
// replicated clusters policy from taking effect.
// TODO The API "pulsar-admin topics set-replication-clusters" does not support global policies yet;
// a PIP is needed to support this scenario.
boolean clustersDoesNotContainsLocal = CollectionUtils.isEmpty(clusterIds)
|| !clusterIds.contains(pulsar().getConfig().getClusterName());
if (clustersDoesNotContainsLocal) {
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
"Can not remove local cluster from the topic-level replication clusters policy"));
}
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2015,9 +2015,15 @@ CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
}

});
// TODO regarding the topic level policies, it will be deleted at a separate PR.
// Because there is an issue related to Global policies that has not been solved so
// far.
// There is only one case in which the local cluster will be removed: when using a global
// metadata store, namespaces share policies across multiple clusters, including
// "replicated clusters" and "partitioned topic metadata", so we can hardly delete a
// partitioned topic from one cluster and keep it existing in another. Removing the
// local cluster from the namespace-level "replicated clusters" can do this.
// TODO: there is no way to delete a specific partitioned topic if users have enabled
// Geo-Replication with a global metadata store, a PIP is needed.
// Since the system topic "__change_events" under the namespace will also be
// deleted, we can skip deleting topic-level policies.
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -3797,4 +3799,35 @@ public void testSetSubRateWithSub() throws Exception {
.ratePeriodInSecond(10)
.build());
}

/**
 * Data provider yielding one row per topic type: {@code PARTITIONED} and {@code NON_PARTITIONED}.
 */
@DataProvider
public Object[][] topicTypes() {
    Object[][] rows = new Object[2][];
    rows[0] = new Object[]{TopicType.PARTITIONED};
    rows[1] = new Object[]{TopicType.NON_PARTITIONED};
    return rows;
}

/**
 * Verifies that setting topic-level replication clusters which do not include the local cluster is
 * rejected with a {@code PreconditionFailedException}, for the topic type supplied by the
 * {@code topicTypes} data provider.
 *
 * @param topicType whether the topic under test is created as partitioned or non-partitioned
 */
@Test(dataProvider = "topicTypes")
public void testRemoveLocalCluster(TopicType topicType) throws Exception {
    // Use a name owned by this test, distinct per topic type, so the two invocations and other tests
    // in the namespace never operate on the same topic.
    String topic = "persistent://" + myNamespace + "/testRemoveLocalCluster-" + topicType.name();
    boolean partitioned = TopicType.PARTITIONED.equals(topicType);
    // Create the kind of topic that the data provider actually asked for.
    if (partitioned) {
        admin.topics().createPartitionedTopic(topic, 2);
    } else {
        admin.topics().createNonPartitionedTopic(topic);
    }
    try {
        admin.topics().setReplicationClusters(topic, Arrays.asList("not-local-cluster"));
        fail("Can not remove local cluster from the topic-level replication clusters policy");
    } catch (PulsarAdminException.PreconditionFailedException e) {
        assertTrue(e.getMessage().contains("Can not remove local cluster from the topic-level replication clusters"
                + " policy"));
    } finally {
        // cleanup, even when the assertion above fails.
        if (partitioned) {
            admin.topics().deletePartitionedTopic(topic, false);
        } else {
            admin.topics().delete(topic, false);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -1422,4 +1423,88 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
SchemaCompatibilityStrategy.FORWARD);
}

/**
 * Manually modifying topic policies by Rest API.
 * - Global topic level policies:
 *   - Add: replicate
 *   - Update: replicate
 *   - Delete a single policy(it is equivalent to specify updating): delete both local and remote policies.
 * - Local topic level policies:
 *   - Add: never replicate
 *   - Update: never replicate
 *   - Delete a single policy(it is equivalent to specify updating): delete local policies only.
 * Delete Topic triggers that both local and global policies will be deleted in local cluster, but will not delete
 * the remote cluster's global policies. This test case will be covered by
 * "OneWayReplicatorUsingGlobalPartitionedTest.testRemoveCluster".
 */
@Test
public void testTopicPoliciesReplicationRule() throws Exception {
    // Init Pulsar resources.
    final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
    final TopicName topicNameObj = TopicName.get(topicName);
    final String subscriptionName = "s1";
    admin1.topics().createNonPartitionedTopic(topicName);
    Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
    waitReplicatorStarted(topicName);
    producer1.close();
    assertTrue(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(topicNameObj).join());
    // createSubscription takes (topic, subscriptionName, messageId): create the subscription once per
    // cluster, passing the topic first.
    admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
    admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

    // Case 1: Global topic level policies -> Add: replicate.
    PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
    admin1.topicPolicies(true).setPublishRate(topicName, publishRateAddGlobal);
    // Case 4: Local topic level policies -> Add: never replicate.
    PublishRate publishRateAddLocal = new PublishRate(200, 20000);
    admin1.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal);
    waitChangeEventsReplicated(replicatedNamespace);
    // Extra delay before asserting that the local policy did NOT show up on the remote cluster.
    Thread.sleep(2000);
    Awaitility.await().untilAsserted(() -> {
        PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
        assertEquals(valueGlobal, publishRateAddGlobal);
        PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
        assertNull(valueLocal);
    });

    // Case 2: Global topic level policies -> Update: replicate.
    PublishRate publishRateUpdateGlobal = new PublishRate(300, 30000);
    admin1.topicPolicies(true).setPublishRate(topicName, publishRateUpdateGlobal);
    // Case 5: Local topic level policies -> Update: never replicate.
    PublishRate publishRateUpdateLocal = new PublishRate(400, 40000);
    admin1.topicPolicies(false).setPublishRate(topicName, publishRateUpdateLocal);
    waitChangeEventsReplicated(replicatedNamespace);
    Thread.sleep(2000);
    Awaitility.await().untilAsserted(() -> {
        PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
        assertEquals(valueGlobal, publishRateUpdateGlobal);
        PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
        assertNull(valueLocal);
    });

    // Case 3: Global topic level policies -> Delete: delete both local and remote policies.
    admin1.topicPolicies(true).removePublishRate(topicName);
    waitChangeEventsReplicated(replicatedNamespace);
    Thread.sleep(2000);
    Awaitility.await().untilAsserted(() -> {
        PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
        assertNull(valueGlobal);
    });

    // Case 6: Local topic level policies -> Delete: never replicate.
    // Give the remote cluster its own local policy, then remove the local policy on cluster 1 and
    // check that the remote cluster's local policy is left untouched.
    PublishRate publishRateAddLocal2 = new PublishRate(500, 50000);
    admin2.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal2);
    Awaitility.await().untilAsserted(() -> {
        PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
        assertEquals(valueLocal, publishRateAddLocal2);
    });
    admin1.topicPolicies(false).removePublishRate(topicName);
    waitChangeEventsReplicated(replicatedNamespace);
    Thread.sleep(2000);
    Awaitility.await().untilAsserted(() -> {
        PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
        assertEquals(valueLocal, publishRateAddLocal2);
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
Expand All @@ -48,6 +51,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -506,4 +510,41 @@ protected void waitReplicatorStopped(PulsarService sourceCluster, PulsarService
|| !persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
});
}

/**
 * Asserts that the replicators of the namespace's change-events system topic on {@code pulsar1} that
 * target the remote cluster "c2" have no backlog, i.e. their cursor's mark-deleted position is not
 * behind the managed ledger's last confirmed entry.
 *
 * <p>The check is performed once per replicator; this method does not retry.
 *
 * @param ns the namespace whose change-events topic is inspected
 */
protected void waitChangeEventsReplicated(String ns) {
    final String eventsTopic = "persistent://" + ns + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
    final TopicName eventsTopicName = TopicName.get(eventsTopic);
    final Optional<PartitionedTopicMetadata> partitionedMetadata = pulsar1.getPulsarResources()
            .getNamespaceResources()
            .getPartitionedTopicResources()
            .getPartitionedTopicMetadataAsync(eventsTopicName)
            .join();

    // Replicators towards clusters other than "c2" are ignored (treated as having no backlog).
    final Function<Replicator, Boolean> hasNoBacklog = candidate -> {
        if (!candidate.getRemoteCluster().equals("c2")) {
            return true;
        }
        PersistentReplicator toRemote = (PersistentReplicator) candidate;
        Position lastConfirmed = toRemote.getCursor().getManagedLedger().getLastConfirmedEntry();
        Position markDeleted = toRemote.getCursor().getMarkDeletedPosition();
        return markDeleted.compareTo(lastConfirmed) >= 0;
    };

    // No partitioned metadata: inspect the topic itself.
    if (!partitionedMetadata.isPresent()) {
        PersistentTopic loadedTopic =
                (PersistentTopic) pulsar1.getBrokerService().getTopic(eventsTopic, false).join().get();
        for (Replicator replicator : loadedTopic.getReplicators().values()) {
            assertTrue(hasNoBacklog.apply(replicator));
        }
        return;
    }

    // Partitioned metadata present: inspect every partition.
    final int partitionCount = partitionedMetadata.get().partitions;
    for (int partition = 0; partition < partitionCount; partition++) {
        String partitionTopic = eventsTopicName.getPartition(partition).toString();
        PersistentTopic loadedPartition =
                (PersistentTopic) pulsar1.getBrokerService().getTopic(partitionTopic, false).join().get();
        for (Replicator replicator : loadedPartition.getReplicators().values()) {
            assertTrue(hasNoBacklog.apply(replicator));
        }
    }
}
}
Loading
Loading