Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ protected Policies getNamespacePolicies(String tenant, String cluster, String na
return getNamespacePolicies(ns);
}

/**
* Directly get the replication clusters for a namespace, without checking allowed clusters.
*/
protected CompletableFuture<Set<String>> getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
return namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -792,9 +793,11 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
.map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
.thenApply(policies -> policies.replication_clusters.contains(cluster)
? namespaceName : null))
.collect(Collectors.toList());
.thenApply(policies -> {
boolean allowed = pulsar().getBrokerService()
.isCurrentClusterAllowed(NamespaceName.get(namespaceName), policies);
return allowed ? namespaceName : null;
})).collect(Collectors.toList());
return FutureUtil.waitForAll(namespaceNamesInCluster).thenApply(
__ -> namespaceNamesInCluster.stream()
.map(CompletableFuture::join)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
precheckWhenDeleteNamespace(namespaceName, force)
.thenCompose(policies -> {
final CompletableFuture<List<String>> topicsFuture;
if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){
if (policies == null || !pulsar().getBrokerService()
.isCurrentClusterAllowed(namespaceName, policies)) {
topicsFuture = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
} else {
topicsFuture = pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
Expand Down Expand Up @@ -418,22 +419,20 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns
return CompletableFuture.completedFuture(null);
}
Policies policies = policiesOpt.get();
Set<String> replicationClusters = policies.replication_clusters;
if (replicationClusters.size() > 1) {
String cluster = policies.getClusterThatCanDeleteNamespace();
if (cluster == null) {
// There are still more than one clusters configured for the global namespace
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot delete the global namespace " + nsName + ". There are still more than "
+ "one replication clusters configured.");
"Cannot delete the global namespace " + nsName + ". There are still more than "
+ "one replication clusters configured.");
}
if (replicationClusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
if (!cluster.equals(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = new ArrayList<>(policies.replication_clusters).get(0);
return clusterResources().getClusterAsync(replCluster)
return clusterResources().getClusterAsync(cluster)
.thenCompose(replClusterDataOpt -> {
ClusterData replClusterData = replClusterDataOpt
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
"Cluster " + cluster + " does not exist"));
URL replClusterUrl;
try {
if (!replClusterData.isBrokerClientTlsEnabled()) {
Expand All @@ -453,7 +452,7 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns
.replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
clientAppId(), redirect, replCluster);
clientAppId(), redirect, cluster);
}
throw new WebApplicationException(
Response.temporaryRedirect(redirect).build());
Expand Down Expand Up @@ -503,22 +502,20 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
.thenCompose(policies -> {
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (namespaceName.isGlobal()) {

if (policies.replication_clusters.size() > 1) {
String cluster = policies.getClusterThatCanDeleteNamespace();
if (cluster == null) {
// There are still more than one clusters configured for the global namespace
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ namespaceName
+ ". There are still more than one replication clusters configured.");
}
if (policies.replication_clusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
if (!cluster.equals(config().getClusterName())) { // No need to change.
// the only replication cluster is other cluster, redirect
String replCluster = new ArrayList<>(policies.replication_clusters).get(0);
future = clusterResources().getClusterAsync(replCluster)
future = clusterResources().getClusterAsync(cluster)
.thenCompose(clusterData -> {
if (clusterData.isEmpty()) {
throw new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist");
"Cluster " + cluster + " does not exist");
}
ClusterData replClusterData = clusterData.get();
URL replClusterUrl;
Expand All @@ -542,7 +539,7 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
.replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
clientAppId(), redirect, replCluster);
clientAppId(), redirect, cluster);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
});
Expand Down Expand Up @@ -739,6 +736,9 @@ protected CompletableFuture<Void> internalRevokePermissionsOnSubscriptionAsync(S
subscriptionName, role, null/* additional auth-data json */));
}

/**
* Directly get the replication clusters for a namespace, without checking allowed clusters.
*/
protected CompletableFuture<Set<String>> internalGetNamespaceReplicationClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.READ)
.thenAccept(__ -> {
Expand Down Expand Up @@ -778,21 +778,19 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
.thenCompose(nsPolicies -> {
if (nsPolicies.allowed_clusters.isEmpty()) {
return validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId);
}
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
String msg = String.format("Cluster [%s] is not in the "
+ "list of allowed clusters list for namespace "
+ "[%s]", clusterId, namespaceName.toString());
log.info(msg);
throw new RestException(Status.FORBIDDEN, msg);
}
return CompletableFuture.completedFuture(null);
}));
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
.thenCompose(nsPolicies -> {
if (!Policies.checkNewReplicationClusters(nsPolicies,
replicationClusterSet)) {
String msg = String.format("Cluster [%s] is not in the "
+ "list of allowed clusters list for namespace "
+ "[%s]", clusterId, namespaceName.toString());
log.info(msg);
throw new RestException(Status.BAD_REQUEST, msg);
}
return validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId);
}));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
}))
Expand Down Expand Up @@ -1967,13 +1965,17 @@ protected BundlesData validateBundlesData(BundlesData initialBundles) {
}

private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns, Policies policies) {
if (ns.isV2() && policies.replication_clusters.isEmpty()) {
// Default to local cluster
policies.replication_clusters = Collections.singleton(config().getClusterName());
if (!policies.checkAllowedAndReplicationClusters()) {
String msg = String.format("[%s] All replication clusters should be included in allowed clusters."
+ " Repl clusters: %s, allowed clusters: %s",
ns.toString(), policies.replication_clusters, policies.allowed_clusters);
log.info(msg);
throw new RestException(Status.FORBIDDEN, msg);
}
pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, policies);

// Validate cluster names and permissions
return policies.replication_clusters.stream()
return Stream.concat(policies.replication_clusters.stream(), policies.allowed_clusters.stream())
.map(cluster -> validateClusterForTenantAsync(ns.getTenant(), cluster))
.reduce(CompletableFuture.completedFuture(null), (a, b) -> a.thenCompose(ignore -> b))
.thenAccept(__ -> {
Expand Down Expand Up @@ -2864,16 +2866,15 @@ protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<Strin
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of allowed clusters");
}
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
if (!clusterIds.contains(replicationCluster)) {
throw new RestException(Status.BAD_REQUEST,
String.format("Allowed clusters do not contain the replication cluster %s. "
+ "Please remove the replication cluster if the cluster is not allowed "
+ "for this namespace", replicationCluster));
}
});
return Sets.newHashSet(clusterIds);
return getNamespacePoliciesAsync(this.namespaceName).thenApply(nsPolicies -> {
Set<String> clusterSet = Sets.newHashSet(clusterIds);
if (!Policies.checkNewAllowedClusters(nsPolicies, clusterSet)){
throw new RestException(Status.BAD_REQUEST,
String.format("Allowed clusters do not contain the replication cluster %s. "
+ "Please remove the replication cluster if the cluster is not allowed "
+ "for this namespace", nsPolicies.replication_clusters));
}
return clusterSet;
});
})
// Verify the allowed clusters are valid and they do not contain the peer clusters.
Expand All @@ -2896,6 +2897,9 @@ protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<Strin
}));
}

/**
* Directly get the allowed clusters for a namespace, without checking replication clusters.
*/
protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
.thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
return CompletableFuture.completedFuture(null);
}
// Query the topic-level policies only if the namespace-level policies exist.
// Global policies does not affet Replication.
// Global policies does not affect Replication.
final var namespacePolicies = optionalPolicies.get();
return pulsar().getTopicPoliciesService().getTopicPoliciesAsync(topicName,
TopicPoliciesService.GetType.LOCAL_ONLY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

protected final String topic;
protected final NamespaceName namespace;

// Reference to the CompletableFuture returned when creating this topic in BrokerService.
// Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact
Expand Down Expand Up @@ -182,6 +183,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.namespace = TopicName.get(topic).getNamespaceObject();
this.clock = brokerService.getClock();
this.brokerService = brokerService;
this.producers = new ConcurrentHashMap<>();
Expand Down
Loading
Loading