diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 84d55430f8f4f..bfa1fdc812b7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -514,6 +514,9 @@ protected Policies getNamespacePolicies(String tenant, String cluster, String na return getNamespacePolicies(ns); } + /** + * Directly get the replication clusters for a namespace, without checking allowed clusters. + */ protected CompletableFuture> getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) { return namespaceResources().getPoliciesAsync(namespaceName) .thenApply(policies -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index d24a3255b5556..0ae18fc2e54f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -59,6 +59,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -792,9 +793,11 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { List> namespaceNamesInCluster = namespaces.stream() .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName) - .thenApply(policies -> policies.replication_clusters.contains(cluster) - ? namespaceName : null)) - .collect(Collectors.toList()); + .thenApply(policies -> { + boolean allowed = pulsar().getBrokerService() + .isCurrentClusterAllowed(NamespaceName.get(namespaceName), policies); + return allowed ? namespaceName : null; + })).collect(Collectors.toList()); return FutureUtil.waitForAll(namespaceNamesInCluster).thenApply( __ -> namespaceNamesInCluster.stream() .map(CompletableFuture::join) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 0e821c83e610d..4491701a60d99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -27,7 +27,6 @@ import java.net.URI; import java.net.URL; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -221,7 +220,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime precheckWhenDeleteNamespace(namespaceName, force) .thenCompose(policies -> { final CompletableFuture> topicsFuture; - if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){ + if (policies == null || !pulsar().getBrokerService() + .isCurrentClusterAllowed(namespaceName, policies)) { topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); } else { topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName); @@ -418,22 +418,25 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns return CompletableFuture.completedFuture(null); } Policies policies = policiesOpt.get(); - Set replicationClusters = policies.replication_clusters; - if (replicationClusters.size() > 1) { + // Just keep the behavior of V1 namespace being the same as before. + if (!nsName.isV2() && policies.replication_clusters.isEmpty() + && policies.allowed_clusters.isEmpty()) { + return CompletableFuture.completedFuture(policies); + } + String cluster = policies.getClusterThatCanDeleteNamespace(); + if (cluster == null) { // There are still more than one clusters configured for the global namespace throw new RestException(Status.PRECONDITION_FAILED, - "Cannot delete the global namespace " + nsName + ". There are still more than " - + "one replication clusters configured."); + "Cannot delete the global namespace " + nsName + ". There are still more than " + + "one replication clusters configured."); } - if (replicationClusters.size() == 1 - && !policies.replication_clusters.contains(config().getClusterName())) { + if (!cluster.equals(config().getClusterName())) { // the only replication cluster is other cluster, redirect - String replCluster = new ArrayList<>(policies.replication_clusters).get(0); - return clusterResources().getClusterAsync(replCluster) + return clusterResources().getClusterAsync(cluster) .thenCompose(replClusterDataOpt -> { ClusterData replClusterData = replClusterDataOpt .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Cluster " + replCluster + " does not exist")); + "Cluster " + cluster + " does not exist")); URL replClusterUrl; try { if (!replClusterData.isBrokerClientTlsEnabled()) { @@ -453,7 +456,7 @@ private CompletableFuture precheckWhenDeleteNamespace(NamespaceName ns .replaceQueryParam("authoritative", false).build(); if (log.isDebugEnabled()) { log.debug("[{}] Redirecting the rest call to {}: cluster={}", - clientAppId(), redirect, replCluster); + clientAppId(), redirect, cluster); } throw new WebApplicationException( Response.temporaryRedirect(redirect).build()); @@ -503,22 +506,25 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund .thenCompose(policies -> { CompletableFuture future = CompletableFuture.completedFuture(null); if (namespaceName.isGlobal()) { - - if (policies.replication_clusters.size() > 1) { + // Just keep the behavior of V1 namespace being the same as before. + if (!namespaceName.isV2() && policies.replication_clusters.isEmpty() + && policies.allowed_clusters.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + String cluster = policies.getClusterThatCanDeleteNamespace(); + if (cluster == null) { // There are still more than one clusters configured for the global namespace throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace " + namespaceName + ". There are still more than one replication clusters configured."); } - if (policies.replication_clusters.size() == 1 - && !policies.replication_clusters.contains(config().getClusterName())) { + if (!cluster.equals(config().getClusterName())) { // No need to change. // the only replication cluster is other cluster, redirect - String replCluster = new ArrayList<>(policies.replication_clusters).get(0); - future = clusterResources().getClusterAsync(replCluster) + future = clusterResources().getClusterAsync(cluster) .thenCompose(clusterData -> { if (clusterData.isEmpty()) { throw new RestException(Status.NOT_FOUND, - "Cluster " + replCluster + " does not exist"); + "Cluster " + cluster + " does not exist"); } ClusterData replClusterData = clusterData.get(); URL replClusterUrl; @@ -542,7 +548,7 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund .replaceQueryParam("authoritative", false).build(); if (log.isDebugEnabled()) { log.debug("[{}] Redirecting the rest call to {}: cluster={}", - clientAppId(), redirect, replCluster); + clientAppId(), redirect, cluster); } throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); }); @@ -739,6 +745,9 @@ protected CompletableFuture internalRevokePermissionsOnSubscriptionAsync(S subscriptionName, role, null/* additional auth-data json */)); } + /** + * Directly get the replication clusters for a namespace, without checking allowed clusters. + */ protected CompletableFuture> internalGetNamespaceReplicationClustersAsync() { return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ) .thenAccept(__ -> { @@ -778,21 +787,19 @@ protected CompletableFuture internalSetNamespaceReplicationClusters(List getNamespacePoliciesAsync(this.namespaceName) - .thenCompose(nsPolicies -> { - if (nsPolicies.allowed_clusters.isEmpty()) { - return validateClusterForTenantAsync( - namespaceName.getTenant(), clusterId); - } - if (!nsPolicies.allowed_clusters.contains(clusterId)) { - String msg = String.format("Cluster [%s] is not in the " - + "list of allowed clusters list for namespace " - + "[%s]", clusterId, namespaceName.toString()); - log.info(msg); - throw new RestException(Status.FORBIDDEN, msg); - } - return CompletableFuture.completedFuture(null); - })); + .thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName) + .thenCompose(nsPolicies -> { + if (!Policies.checkNewReplicationClusters(nsPolicies, + replicationClusterSet)) { + String msg = String.format("Cluster [%s] is not in the " + + "list of allowed clusters list for namespace " + + "[%s]", clusterId, namespaceName.toString()); + log.info(msg); + throw new RestException(Status.BAD_REQUEST, msg); + } + return validateClusterForTenantAsync( + namespaceName.getTenant(), clusterId); + })); }).collect(Collectors.toList()); return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet); })) @@ -1967,13 +1974,17 @@ protected BundlesData validateBundlesData(BundlesData initialBundles) { } private CompletableFuture validatePoliciesAsync(NamespaceName ns, Policies policies) { - if (ns.isV2() && policies.replication_clusters.isEmpty()) { - // Default to local cluster - policies.replication_clusters = Collections.singleton(config().getClusterName()); + if (!policies.checkAllowedAndReplicationClusters()) { + String msg = String.format("[%s] All replication clusters should be included in allowed clusters." + + " Repl clusters: %s, allowed clusters: %s", + ns.toString(), policies.replication_clusters, policies.allowed_clusters); + log.info(msg); + throw new RestException(Status.BAD_REQUEST, msg); } + pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, policies); // Validate cluster names and permissions - return policies.replication_clusters.stream() + return Stream.concat(policies.replication_clusters.stream(), policies.allowed_clusters.stream()) .map(cluster -> validateClusterForTenantAsync(ns.getTenant(), cluster)) .reduce(CompletableFuture.completedFuture(null), (a, b) -> a.thenCompose(ignore -> b)) .thenAccept(__ -> { @@ -2864,16 +2875,15 @@ protected CompletableFuture internalSetNamespaceAllowedClusters(List { - namespacePolicies.replication_clusters.forEach(replicationCluster -> { - if (!clusterIds.contains(replicationCluster)) { - throw new RestException(Status.BAD_REQUEST, - String.format("Allowed clusters do not contain the replication cluster %s. " - + "Please remove the replication cluster if the cluster is not allowed " - + "for this namespace", replicationCluster)); - } - }); - return Sets.newHashSet(clusterIds); + return getNamespacePoliciesAsync(this.namespaceName).thenApply(nsPolicies -> { + Set clusterSet = Sets.newHashSet(clusterIds); + if (!Policies.checkNewAllowedClusters(nsPolicies, clusterSet)){ + throw new RestException(Status.BAD_REQUEST, + String.format("Allowed clusters do not contain the replication cluster %s. " + + "Please remove the replication cluster if the cluster is not allowed " + + "for this namespace", nsPolicies.replication_clusters)); + } + return clusterSet; }); }) // Verify the allowed clusters are valid and they do not contain the peer clusters. @@ -2896,6 +2906,9 @@ protected CompletableFuture internalSetNamespaceAllowedClusters(List> internalGetNamespaceAllowedClustersAsync() { return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ) .thenAccept(__ -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ec61d58d2afe4..9318246afc13e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -514,7 +514,7 @@ private CompletableFuture> getReplicationClusters() { return CompletableFuture.completedFuture(null); } // Query the topic-level policies only if the namespace-level policies exist. - // Global policies does not affet Replication. + // Global policies does not affect Replication. final var namespacePolicies = optionalPolicies.get(); return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3ec6f5a0cd5e6..50f0f96934287 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -96,6 +96,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; protected final String topic; + protected final NamespaceName namespace; // Reference to the CompletableFuture returned when creating this topic in BrokerService. // Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact @@ -182,6 +183,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; + this.namespace = TopicName.get(topic).getNamespaceObject(); this.clock = brokerService.getClock(); this.brokerService = brokerService; this.producers = new ConcurrentHashMap<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 79dffdf7aadef..6a8c7f60e3cb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -155,6 +155,7 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor; +import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; @@ -2758,26 +2759,27 @@ private void unloadDeletedReplNamespace(Policies data, NamespaceName namespace) return; } final String localCluster = this.pulsar.getConfiguration().getClusterName(); - if (!data.replication_clusters.contains(localCluster)) { - pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundlesAsync(namespace).thenAccept(bundles -> { - bundles.getBundles().forEach(bundle -> { - pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist -> { - if (isExist) { - this.pulsar().getExecutor().execute(() -> { - try { - pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(), - bundle.getBundleRange()); - } catch (Exception e) { - log.error("Failed to unload namespace-bundle {} that not owned by {}, {}", - bundle.toString(), localCluster, e.getMessage()); - } - }); - } - }); + if (pulsar.getBrokerService().isCurrentClusterAllowed(namespace, data)) { + return; + } + pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespace).thenAccept(bundles -> { + bundles.getBundles().forEach(bundle -> { + pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist -> { + if (isExist) { + this.pulsar().getExecutor().execute(() -> { + try { + pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(), + bundle.getBundleRange()); + } catch (Exception e) { + log.error("Failed to unload namespace-bundle {} that not owned by {}, {}", + bundle.toString(), localCluster, e.getMessage()); + } + }); + } }); }); - } + }); } public PulsarService pulsar() { @@ -3878,4 +3880,41 @@ private TopicFactory createPersistentTopicFactory() throws Exception { public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { this.pulsarChannelInitFactory = factory; } + + /*** + * After PIP-321 Introduce allowed-cluster at the namespace level, the condition that whether the cluster is + * allowed to access by the current cluster was defined by two fields: + * - {@link Policies#replication_clusters} + * - {@link Policies#allowed_clusters} + * {@link Policies#allowed_clusters} has higher priority. Once it's set, {@link Policies#replication_clusters} only + * means the default replication clusters for the topics under the namespace. + */ + public boolean isCurrentClusterAllowed(NamespaceName nsName, Policies nsPolicies) { + // Compatibility with v1 version namespace. + if (Constants.GLOBAL_CLUSTER.equalsIgnoreCase(nsName.getCluster())) { + return nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName()); + } + // If allowed clusters has been set, only check allowed clusters. + if (!nsPolicies.allowed_clusters.isEmpty()) { + return nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName()); + } + // Otherwise, replication clusters means allowed clusters. + return nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName()); + } + + public void setCurrentClusterAllowedIfNoClusterIsAllowed(NamespaceName nsName, Policies nsPolicies) { + // Compatibility with v1 version namespace. + if (!nsName.isV2()) { + return; + } + if (nsPolicies.replication_clusters.contains(pulsar.getConfig().getClusterName()) + || nsPolicies.allowed_clusters.contains(pulsar.getConfig().getClusterName())) { + return; + } + if (nsPolicies.replication_clusters.isEmpty()) { + nsPolicies.replication_clusters.add(pulsar.getConfig().getClusterName()); + } else { + nsPolicies.allowed_clusters.add(pulsar.getConfig().getClusterName()); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 58ca93a1b3cee..4b28171ea99c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2088,17 +2088,17 @@ protected CompletableFuture removeTopicIfLocalClusterNotAllowed() { } protected CompletableFuture checkAllowedCluster(String localCluster) { - List replicationClusters = topicPolicies.getReplicationClusters().get(); + List topicRepls = topicPolicies.getReplicationClusters().get(); return brokerService.pulsar().getPulsarResources().getNamespaceResources() - .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> { + .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(nsPolicies -> { Set allowedClusters = Set.of(); - if (policiesOptional.isPresent()) { - allowedClusters = policiesOptional.get().allowed_clusters; + if (nsPolicies.isPresent()) { + allowedClusters = nsPolicies.get().allowed_clusters; } - if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster) + if (TopicName.get(topic).isGlobal() && !topicRepls.contains(localCluster) && !allowedClusters.contains(localCluster)) { log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}", - localCluster, replicationClusters, allowedClusters); + localCluster, topicRepls, allowedClusters); return CompletableFuture.completedFuture(false); } else { return CompletableFuture.completedFuture(true); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 353f2fa6f2ecd..65489eaa34b43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -912,8 +912,7 @@ public static CompletableFuture checkLocalOrGetPeerReplicationC localCluster, namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg)); - } else if (!policies.replication_clusters.contains(localCluster) && !policies.allowed_clusters - .contains(localCluster)) { + } else if (!pulsarService.getBrokerService().isCurrentClusterAllowed(namespace, policies)) { getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters, policies.allowed_clusters) .thenAccept(ownerPeerCluster -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java index da7d95d677af8..09d8adcacc8dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceIsolationMultiBrokersTest.java @@ -77,7 +77,8 @@ public void setupClusters() throws Exception { public void testNamespaceIsolationPolicyForReplNS() throws Exception { // Verify that namespace is not present in cluster-2. - Set replicationClusters = localAdmin.namespaces().getPolicies("prop-ig/ns1").replication_clusters; + Set replicationClusters = localAdmin.namespaces() + .getPolicies("prop-ig/ns1").replication_clusters; // Nothing is needed to be changed. Assert.assertFalse(replicationClusters.contains("cluster-2")); // setup ns-isolation-policy in both the clusters. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index 2b24265a36434..c8a75af36602b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -278,7 +278,7 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception { destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2, "invalid-localCluster", false, null, null); verify(asyncResponse).resume(arg.capture()); - assertEquals(arg.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); + assertEquals(arg.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 094a29c1c6c53..5d88cd3881384 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -868,6 +868,7 @@ public void testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClusters "Cluster [r3] is not in the list of allowed clusters list for tenant [my-tenant]"); } // 3. Clean up + admin.namespaces().setNamespaceAllowedClusters(namespace, Set.of(pulsar.getConfig().getClusterName())); admin.namespaces().deleteNamespace(namespace, true); admin.tenants().deleteTenant(tenant, true); for (String cluster : clusters) { @@ -935,7 +936,9 @@ public void testNewAllowedClusterAdminAPIAndItsImpactOnReplicationClusterAPI() t namespaces.setNamespaceReplicationClusters(namespace, replicationClustersExcel); fail(); //Todo: The status code in the old implementation is confused. - } catch (PulsarAdminException.NotAuthorizedException ignore) {} + } catch (PulsarAdminException ignore) { + assertTrue(ignore.getMessage().contains("allowed clusters list")); + } // 2.2 Peer cluster can not be a part of the allowed clusters. LinkedHashSet peerCluster = new LinkedHashSet<>(); @@ -950,6 +953,7 @@ public void testNewAllowedClusterAdminAPIAndItsImpactOnReplicationClusterAPI() t // CleanUp: Namespace with replication clusters can not be deleted by force. namespaces.setNamespaceReplicationClusters(namespace, Set.of(conf.getClusterName())); + namespaces.setNamespaceAllowedClusters(namespace, Set.of(conf.getClusterName())); admin.namespaces().deleteNamespace(namespace, true); admin.tenants().deleteTenant(tenant, true); for (String cluster : clusters) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 7e20bdb8780ad..4f8c5f8ded285 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -24,9 +24,13 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.time.Duration; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -37,11 +41,16 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -545,4 +554,132 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception { public void testPartitionedTopicWithTopicPolicyAndNoReplicationClusters() throws Exception { super.testPartitionedTopicWithTopicPolicyAndNoReplicationClusters(); } + + @Test + public void testUpdateNamespaceIsolationPolicy() throws Exception { + // Create a namespace and allow both clusters to access. + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + final String topic = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp"); + admin2.namespaces().createNamespace(ns1); + admin2.namespaces().setNamespaceAllowedClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin1.topics().createNonPartitionedTopic(topic); + Producer p = client1.newProducer(Schema.STRING).topic(topic).create(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topic, false) + .join().get(); + + // Set namespace isolation policy. + // It will trigger a namespace unloading. + String policyName = "policy-1"; + String namespaceRegex = ns1; + String brokerName = pulsar1.getAdvertisedAddress(); + Map parameters1 = new HashMap<>(); + parameters1.put("min_limit", "1"); + parameters1.put("usage_threshold", "100"); + NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder() + .namespaces(Collections.singletonList(namespaceRegex)) + .primary(Collections.singletonList(brokerName)) + .secondary(Collections.singletonList(brokerName + ".*")) + .autoFailoverPolicy(AutoFailoverPolicyData.builder() + .policyType(AutoFailoverPolicyType.min_available) + .parameters(parameters1) + .build()) + .build(); + admin1.clusters().createNamespaceIsolationPolicy(cluster1, policyName, nsPolicyData1); + + // Verify: the namespace was unloaded. + Awaitility.await().untilAsserted(() -> { + assertTrue(persistentTopic.isClosingOrDeleting()); + }); + // Verify: the producer still works. + p.send("msg-1"); + + // cleanup. + p.close(); + admin1.clusters().deleteNamespaceIsolationPolicy(cluster1, policyName); + admin1.topics().delete(topic); + } + + /** + * Namespace deletion should not be allowed if more than one cluster is allowed to access. + */ + @Test + public void testDeleteNamespaceIfTwoClustersAllowed() throws Exception { + // Create a namespace and allow both clusters to access. + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + admin1.namespaces().createNamespace(ns1); + Awaitility.await().untilAsserted(() -> { + List clusters = admin1.namespaces().getNamespaceReplicationClusters(ns1); + assertEquals(clusters.size(), 1); + assertTrue(clusters.contains(cluster1)); + }); + admin1.namespaces().setNamespaceAllowedClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + Awaitility.await().untilAsserted(() -> { + List clusters = admin1.namespaces().getNamespaceAllowedClusters(ns1); + assertEquals(clusters.size(), 2); + assertTrue(clusters.contains(cluster1)); + assertTrue(clusters.contains(cluster2)); + }); + try { + admin1.namespaces().deleteNamespace(ns1); + fail("namespace deletion should not be allowed if more than one cluster to access"); + } catch (PulsarAdminException.PreconditionFailedException e) { + // expected. + } + } + + @Test + public void testSetClustersAndAllowedClusters() throws Exception { + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + admin1.namespaces().createNamespace(ns1); + Awaitility.await().untilAsserted(() -> { + List clusters = admin1.namespaces().getNamespaceReplicationClusters(ns1); + assertEquals(clusters.size(), 1); + assertTrue(clusters.contains(cluster1)); + }); + + // New allowed clusters should include all replication clusters + try { + admin1.namespaces().setNamespaceAllowedClusters(ns1, new HashSet<>(Arrays.asList(cluster2))); + fail("New allowed clusters should include all replication clusters."); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("do not contain the replication cluster")); + } + + admin1.namespaces().setNamespaceAllowedClusters(ns1, new HashSet<>(Arrays.asList(cluster1))); + + // New replication clusters should be included in allowed clusters. + try { + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + fail("New replication clusters should be included in allowed clusters."); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("is not in the list of allowed clusters list")); + } + } + + @Test + public void testUpdateNamespacePolicies() throws Exception { + // Create a namespace and allow both clusters to access. + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + final String topic = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp"); + admin2.namespaces().createNamespace(ns1); + admin2.namespaces().setNamespaceAllowedClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin1.topics().createNonPartitionedTopic(topic); + Producer p = client1.newProducer(Schema.STRING).topic(topic).create(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topic, false) + .join().get(); + + admin1.namespaces().setRetention(ns1, new RetentionPolicies(10, 10)); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin1.namespaces().getRetention(ns1), new RetentionPolicies(10, 10)); + }); + + // Verify: the namespace will not be unloaded, because the topic can be updated in memory. + assertFalse(persistentTopic.isClosingOrDeleting()); + // Verify: the producer still works. + p.send("msg-1"); + + // cleanup. + p.close(); + admin1.topics().delete(topic); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index d5e08a1f50cc0..a24df3e7ad442 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -36,6 +36,10 @@ public class Policies { public final AuthPolicies auth_policies = AuthPolicies.builder().build(); @SuppressWarnings("checkstyle:MemberName") public Set replication_clusters = new HashSet<>(); + /** + * This field has a unique usage: defines whether a namespace is allowed to access by the current cluster. + * Instead of access this field directly, please call {@link BrokerService#isCurrentClusterAllowed}. + */ @SuppressWarnings("checkstyle:MemberName") public Set allowed_clusters = new HashSet<>(); public BundlesData bundles; @@ -216,9 +220,64 @@ public boolean equals(Object obj) { && Objects.equals(dispatcherPauseOnAckStatePersistentEnabled, other.dispatcherPauseOnAckStatePersistentEnabled); } - return false; } + /** + * Get the cluster that can delete the namespace. + */ + public String getClusterThatCanDeleteNamespace() { + if (this.replication_clusters.size() != 1 || this.allowed_clusters.size() > 1) { + return null; + } + String cluster = this.replication_clusters.iterator().next(); + // The namespace can be deleted if the current cluster is the only one cluster who can access it. + if (!this.allowed_clusters.isEmpty() && this.allowed_clusters.contains(cluster)) { + return cluster; + } else { + return cluster; + } + } + + /** + * Replication clusters should be included in allowed clusters. + */ + public static boolean checkNewReplicationClusters(Policies oldNsPolicies, Set newReplicationClusters) { + if (oldNsPolicies.allowed_clusters.isEmpty()) { + return true; + } + for (String newCluster : newReplicationClusters) { + if (!oldNsPolicies.allowed_clusters.contains(newCluster)) { + return false; + } + } + return true; + } + + /** + * Allowed cluster should contain all clusters that are defined in replication clusters. + */ + public static boolean checkNewAllowedClusters(Policies oldNsPolicies, Set newAllowedClusters) { + for (String oldReplicationCluster : oldNsPolicies.replication_clusters) { + if (!newAllowedClusters.contains(oldReplicationCluster)) { + return false; + } + } + return true; + } + /** + * Replication clusters should be included in allowed clusters if allowed clusters are not empty. + */ + public boolean checkAllowedAndReplicationClusters() { + if (this.allowed_clusters.isEmpty()) { + return true; + } + for (String replicationCluster : this.replication_clusters) { + if (!this.allowed_clusters.contains(replicationCluster)) { + return false; + } + } + return true; + } }