|
96 | 96 | import com.couchbase.client.core.topology.ClusterIdentifierUtil; |
97 | 97 | import com.couchbase.client.core.topology.ClusterTopology; |
98 | 98 | import com.couchbase.client.core.topology.ClusterTopologyWithBucket; |
| 99 | +import com.couchbase.client.core.topology.HostAndServicePorts; |
99 | 100 | import com.couchbase.client.core.topology.NodeIdentifier; |
100 | 101 | import com.couchbase.client.core.transaction.cleanup.CoreTransactionsCleanup; |
101 | 102 | import com.couchbase.client.core.transaction.context.CoreTransactionsContext; |
|
104 | 105 | import com.couchbase.client.core.util.Deadline; |
105 | 106 | import com.couchbase.client.core.util.LatestStateSubscription; |
106 | 107 | import com.couchbase.client.core.util.NanoTimestamp; |
107 | | -import org.slf4j.LoggerFactory; |
108 | 108 | import org.slf4j.Logger; |
| 109 | +import org.slf4j.LoggerFactory; |
109 | 110 | import reactor.core.Disposable; |
110 | 111 | import reactor.core.publisher.Flux; |
111 | 112 | import reactor.core.publisher.Mono; |
|
135 | 136 | import static com.couchbase.client.core.util.ConnectionStringUtil.sanityCheckPorts; |
136 | 137 | import static java.util.Collections.emptySet; |
137 | 138 | import static java.util.Objects.requireNonNull; |
| 139 | +import static java.util.stream.Collectors.toList; |
| 140 | +import static java.util.stream.Collectors.toSet; |
138 | 141 |
|
139 | 142 | /** |
140 | 143 | * The main entry point into the core layer. |
@@ -823,18 +826,35 @@ private Mono<Void> reconfigureBuckets(final Flux<ClusterTopologyWithBucket> buck |
823 | 826 | .then(); |
824 | 827 | } |
825 | 828 |
|
| 829 | + /** |
| 830 | + * Returns the given topology's nodes, but with the KV service removed from nodes that aren't hosting the bucket. |
| 831 | + * <p> |
| 832 | + * This prevents the SDK from creating doomed KV endpoints that can never connect because |
| 833 | + * "select bucket" returns an error when the node isn't hosting the bucket. |
| 834 | + */ |
| 835 | + private static List<HostAndServicePorts> nodesWithoutIrrelevantKvServices(ClusterTopologyWithBucket topology) { |
| 836 | + Set<NodeIdentifier> nodeIdsServicingBucket = |
| 837 | + topology.bucket().nodes().stream() |
| 838 | + .map(HostAndServicePorts::id) |
| 839 | + .collect(toSet()); |
| 840 | + |
| 841 | + return topology.nodes().stream() |
| 842 | + .map(node -> nodeIdsServicingBucket.contains(node.id()) ? node : node.without(ServiceType.KV)) |
| 843 | + .collect(toList()); |
| 844 | + } |
| 845 | + |
826 | 846 | /** |
827 | 847 | * @param bucketName pass non-null if using the topology to configure bucket-scoped services. |
828 | | - * |
829 | | - * @implNote Maybe in the future we can inspect the ClusterTopology to see if it has a BucketTopology, |
830 | | - * and get the bucket name from there. However, let's make it explicit for now; this leaves the door open |
831 | | - * to using a ClusterTopologyWithBucket to configure global services (by passing a null bucket name). |
832 | 848 | */ |
833 | 849 | private Mono<Void> reconfigureGlobalOrBucket( |
834 | 850 | ClusterTopology topology, |
835 | 851 | @Nullable String bucketName |
836 | 852 | ) { |
837 | | - return Flux.fromIterable(topology.nodes()) |
| 853 | + List<HostAndServicePorts> nodes = bucketName == null |
| 854 | + ? topology.nodes() |
| 855 | + : nodesWithoutIrrelevantKvServices(topology.requireBucket()); |
| 856 | + |
| 857 | + return Flux.fromIterable(nodes) |
838 | 858 | .flatMap(ni -> { |
839 | 859 | Flux<Void> serviceRemoveFlux = Flux |
840 | 860 | .fromArray(ServiceType.values()) |
|
0 commit comments