Skip to content

Commit 0c7a8c0

Browse files
committed
JVMCBC-1693 Debounce GetConfig requests
Motivation ---------- During rebalance, the SDK receives many cluster topology change notifications. Instead of fetching the new topology once for each notification, the SDK should reduce the number of fetches by ignoring obsolete notifications. Modifications ------------- Process topology update notifications one at a time: * Serialize the topology update signal processing by using `concatMap` instead of `flatMap`. * Before fetching new topology, compare current topology against the latest version the server told us about. If we have up-to-date topology, ignore the obsolete notification. * Call `provider.propose{Bucket,Global}Config` in `doOnNext` instead of in an external subscriber. This ensures the SDK's "current" topology revision is updated immediately, before processing the next signal. -- Add `TopologyChangeNotificationBuffer` for storing notifications. Only ask server for latest topology if a notification is still relevant. Use this separate buffer for the topology revisions because storing them in a Flux would be excessively complex. -- Move the topology polling trigger flux creation code into DefaultConfigurationProvider, so it can be shared by the Global and KV refreshers. (The flux itself isn't shared -- only the code to create it). For consistency, and to ensure we don't block the KV threads, handle topology change notifications in the same thread as timer ticks (instead of in the KV thread). Change-Id: Icabc7b10eabea6977a376b931f4e4c43f05350bb Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/235174 Tested-by: Build Bot <[email protected]> Reviewed-by: Michael Reiche <[email protected]>
1 parent 006ec3c commit 0c7a8c0

File tree

12 files changed

+321
-71
lines changed

12 files changed

+321
-71
lines changed

core-io/src/integrationTest/java/com/couchbase/client/core/config/refresher/ProposedBucketConfigInspectingProvider.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
import com.couchbase.client.core.env.SeedNode;
99
import com.couchbase.client.core.io.CollectionIdentifier;
1010
import com.couchbase.client.core.io.CollectionMap;
11+
import com.couchbase.client.core.topology.TopologyRevision;
12+
import org.jspecify.annotations.Nullable;
1113
import reactor.core.publisher.Flux;
1214
import reactor.core.publisher.Mono;
1315

14-
import java.util.Collections;
16+
import java.time.Duration;
1517
import java.util.List;
1618
import java.util.Set;
1719
import java.util.concurrent.CopyOnWriteArrayList;
@@ -133,11 +135,16 @@ public void signalConfigRefreshFailed(ConfigRefreshFailure failure) {
133135
}
134136

135137
@Override
136-
public void signalConfigChanged() {
138+
public void signalNewTopologyAvailable(@Nullable String bucketName, @Nullable TopologyRevision availableRevision) {
137139
}
138140

139141
@Override
140-
public Flux<Long> configChangeNotifications() {
141-
return Flux.empty();
142+
public @Nullable TopologyRevision removeTopologyRevisionChangeNotification(@Nullable String bucketName) {
143+
return null;
144+
}
145+
146+
@Override
147+
public Flux<TopologyPollingTrigger> topologyPollingTriggers(Duration timerInterval) {
148+
return delegate.topologyPollingTriggers(timerInterval);
142149
}
143150
}

core-io/src/main/java/com/couchbase/client/core/config/ClusterConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.couchbase.client.core.service.ServiceType;
2121
import com.couchbase.client.core.topology.ClusterTopology;
2222
import com.couchbase.client.core.topology.ClusterTopologyWithBucket;
23+
import com.couchbase.client.core.topology.TopologyRevision;
2324
import reactor.util.annotation.Nullable;
2425

2526
import java.util.Collection;
@@ -77,6 +78,16 @@ public Collection<ClusterTopologyWithBucket> bucketTopologies() {
7778
return g == null ? null : g.asClusterTopology();
7879
}
7980

81+
public TopologyRevision globalTopologyRevision() {
82+
ClusterTopology t = globalTopology();
83+
return t == null ? TopologyRevision.ZERO : t.revision();
84+
}
85+
86+
public TopologyRevision bucketTopologyRevision(String bucketName) {
87+
ClusterTopology t = bucketTopology(bucketName);
88+
return t == null ? TopologyRevision.ZERO : t.revision();
89+
}
90+
8091
public BucketConfig bucketConfig(final String bucketName) {
8192
return bucketConfigs.get(bucketName);
8293
}

core-io/src/main/java/com/couchbase/client/core/config/ConfigurationProvider.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import com.couchbase.client.core.env.SeedNode;
2121
import com.couchbase.client.core.io.CollectionIdentifier;
2222
import com.couchbase.client.core.io.CollectionMap;
23+
import com.couchbase.client.core.topology.TopologyRevision;
24+
import org.jspecify.annotations.Nullable;
2325
import reactor.core.publisher.Flux;
2426
import reactor.core.publisher.Mono;
2527

28+
import java.time.Duration;
2629
import java.util.Set;
2730

2831
/**
@@ -170,17 +173,35 @@ default void republishCurrentConfig() {
170173
/**
171174
* Signals to the config provider that the server sent a notification
172175
* that the cluster topology or bucket config changed.
176+
*
177+
* @param bucketName null if the signal is for the global topology
178+
* @param availableRevision null if the server did not tell us the new revision
173179
*/
174-
void signalConfigChanged();
180+
void signalNewTopologyAvailable(
181+
@Nullable String bucketName,
182+
@Nullable TopologyRevision availableRevision
183+
);
175184

176185
/**
177-
* Returns a feed that emits {@value TRIGGERED_BY_CONFIG_CHANGE_NOTIFICATION}
178-
* whenever someone calls {@link #signalConfigChanged()}.
186+
* Returns a feed of polling triggers, a combination of timer ticks and server notifications.
179187
*/
180-
Flux<Long> configChangeNotifications();
188+
Flux<TopologyPollingTrigger> topologyPollingTriggers(Duration timerInterval);
189+
190+
enum TopologyPollingTrigger {
191+
TIMER,
192+
SERVER_NOTIFICATION,
193+
}
181194

182195
/**
183-
* The value emitted by the {@link #configChangeNotifications()} feed.
196+
* Returns and removes the latest unprocessed topology revision change notification
197+
* for the specified context (bucket/global), or null if there are no
198+
* unprocessed notifications for the specified context.
199+
* <p>
200+
* Might return the greatest possible topology revision if the server told us
201+
* our topology was obsolete but didn't tell us the latest revision.
202+
*
203+
* @param bucketName null means global
184204
*/
185-
long TRIGGERED_BY_CONFIG_CHANGE_NOTIFICATION = -1L;
205+
@Nullable
206+
TopologyRevision removeTopologyRevisionChangeNotification(@Nullable String bucketName);
186207
}

core-io/src/main/java/com/couchbase/client/core/config/DefaultConfigurationProvider.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import com.couchbase.client.core.topology.NodeIdentifier;
6969
import com.couchbase.client.core.topology.PortSelector;
7070
import com.couchbase.client.core.topology.TopologyParser;
71+
import com.couchbase.client.core.topology.TopologyRevision;
7172
import com.couchbase.client.core.util.ConnectionString;
7273
import com.couchbase.client.core.util.ConnectionStringUtil;
7374
import com.couchbase.client.core.util.NanoTimestamp;
@@ -79,6 +80,7 @@
7980
import reactor.core.publisher.Flux;
8081
import reactor.core.publisher.Mono;
8182
import reactor.core.publisher.Sinks;
83+
import reactor.core.scheduler.Scheduler;
8284
import reactor.core.scheduler.Schedulers;
8385
import reactor.util.annotation.Nullable;
8486
import reactor.util.retry.Retry;
@@ -100,6 +102,7 @@
100102
import static com.couchbase.client.core.Reactor.ignoreIfDone;
101103
import static com.couchbase.client.core.logging.RedactableArgument.redactMeta;
102104
import static com.couchbase.client.core.logging.RedactableArgument.redactSystem;
105+
import static com.couchbase.client.core.util.CbStrings.nullToEmpty;
103106
import static com.couchbase.client.core.util.ConnectionStringUtil.fromDnsSrvOrThrowIfTlsRequired;
104107
import static java.util.Collections.emptySet;
105108
import static java.util.Objects.requireNonNull;
@@ -185,7 +188,8 @@ public class DefaultConfigurationProvider implements ConfigurationProvider {
185188

186189
private volatile NanoTimestamp lastDnsSrvLookup = NanoTimestamp.never();
187190

188-
private final Sinks.Many<Long> configPollTrigger = Sinks.many().multicast().directBestEffort();
191+
private final Sinks.Many<TopologyPollingTrigger> topologyPollingTriggers = Sinks.many().multicast().onBackpressureBuffer(1);
192+
private final TopologyChangeNotificationBuffer topologyChangeNotificationBuffer = new TopologyChangeNotificationBuffer();
189193

190194
/**
191195
* Creates a new configuration provider.
@@ -812,14 +816,46 @@ public void signalConfigRefreshFailed(final ConfigRefreshFailure failure) {
812816
}
813817
}
814818

819+
private static final TopologyRevision NEWER_THAN_WHAT_WE_HAVE = new TopologyRevision(Long.MAX_VALUE, Long.MAX_VALUE) {
820+
@Override
821+
public String toString() {
822+
return "?.?";
823+
}
824+
};
825+
815826
@Override
816-
public synchronized void signalConfigChanged() {
817-
configPollTrigger.tryEmitNext(TRIGGERED_BY_CONFIG_CHANGE_NOTIFICATION);
827+
public void signalNewTopologyAvailable(
828+
@Nullable String bucketName,
829+
@Nullable TopologyRevision availableRevision
830+
) {
831+
832+
if (availableRevision == null) {
833+
availableRevision = NEWER_THAN_WHAT_WE_HAVE;
834+
}
835+
836+
if (topologyChangeNotificationBuffer.putIfNewer(nullToEmpty(bucketName), availableRevision)) {
837+
// synchronized to prevent tryEmitNext from returning EmitResult.FAIL_NON_SERIALIZED
838+
synchronized (this) {
839+
topologyPollingTriggers.tryEmitNext(TopologyPollingTrigger.SERVER_NOTIFICATION);
840+
}
841+
}
818842
}
819843

820844
@Override
821-
public Flux<Long> configChangeNotifications() {
822-
return configPollTrigger.asFlux();
845+
public Flux<TopologyPollingTrigger> topologyPollingTriggers(Duration timerInterval) {
846+
Scheduler scheduler = core.context().environment().scheduler();
847+
848+
return Flux.merge(
849+
Flux.interval(timerInterval, core.context().environment().scheduler())
850+
.onBackpressureDrop() // otherwise the `interval` operator signals an error
851+
.map(it -> TopologyPollingTrigger.TIMER),
852+
853+
topologyPollingTriggers.asFlux().publishOn(scheduler)
854+
);
855+
}
856+
857+
public @Nullable TopologyRevision removeTopologyRevisionChangeNotification(@Nullable String bucketName) {
858+
return topologyChangeNotificationBuffer.remove(bucketName);
823859
}
824860

825861
/**
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2025 Couchbase, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.couchbase.client.core.config;
18+
19+
import com.couchbase.client.core.annotation.Stability;
20+
import com.couchbase.client.core.topology.TopologyRevision;
21+
import org.jspecify.annotations.NullMarked;
22+
import org.jspecify.annotations.Nullable;
23+
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ConcurrentMap;
26+
27+
import static com.couchbase.client.core.util.CbStrings.nullToEmpty;
28+
29+
/**
30+
* "Squashes" notifications, retaining only the highest revision for each bucket (plus global).
31+
*/
32+
@NullMarked
33+
@Stability.Internal
34+
public class TopologyChangeNotificationBuffer {
35+
private final ConcurrentMap<String, TopologyRevision> bucketNameToRevision = new ConcurrentHashMap<>();
36+
37+
/**
38+
* Returns true if the given revision was added to the buffer,
39+
* or false if it was not newer than the existing entry.
40+
*
41+
* @param bucketName null means global
42+
*/
43+
public boolean putIfNewer(
44+
@Nullable String bucketName,
45+
TopologyRevision revision
46+
) {
47+
TopologyRevision updatedRevision = bucketNameToRevision.compute(
48+
key(bucketName),
49+
(k, existing) ->
50+
existing == null || revision.newerThan(existing)
51+
? revision
52+
: existing
53+
);
54+
55+
return revision == updatedRevision;
56+
}
57+
58+
/**
59+
* @param bucketName null means global
60+
*/
61+
public @Nullable TopologyRevision remove(@Nullable String bucketName) {
62+
return bucketNameToRevision.remove(key(bucketName));
63+
}
64+
65+
private static String key(@Nullable String bucketName) {
66+
return nullToEmpty(bucketName);
67+
}
68+
69+
@Override
70+
public String toString() {
71+
return "TopologyChangeNotificationBuffer{" +
72+
"bucketNameToRevision=" + bucketNameToRevision +
73+
'}';
74+
}
75+
}

core-io/src/main/java/com/couchbase/client/core/config/refresher/GlobalRefresher.java

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@
2323
import com.couchbase.client.core.config.ConfigRefreshFailure;
2424
import com.couchbase.client.core.config.ConfigVersion;
2525
import com.couchbase.client.core.config.ConfigurationProvider;
26+
import com.couchbase.client.core.config.ConfigurationProvider.TopologyPollingTrigger;
2627
import com.couchbase.client.core.config.GlobalConfig;
2728
import com.couchbase.client.core.config.PortInfo;
2829
import com.couchbase.client.core.config.ProposedGlobalConfigContext;
2930
import com.couchbase.client.core.msg.kv.CarrierGlobalConfigRequest;
3031
import com.couchbase.client.core.retry.FailFastRetryStrategy;
3132
import com.couchbase.client.core.service.ServiceType;
33+
import com.couchbase.client.core.topology.TopologyRevision;
3234
import com.couchbase.client.core.util.NanoTimestamp;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3337
import reactor.core.Disposable;
3438
import reactor.core.publisher.Flux;
3539
import reactor.core.publisher.Mono;
@@ -42,7 +46,6 @@
4246
import java.util.concurrent.atomic.AtomicLong;
4347
import java.util.stream.Collectors;
4448

45-
import static com.couchbase.client.core.config.DefaultConfigurationProvider.TRIGGERED_BY_CONFIG_CHANGE_NOTIFICATION;
4649
import static com.couchbase.client.core.config.refresher.KeyValueBucketRefresher.MAX_PARALLEL_FETCH;
4750
import static com.couchbase.client.core.config.refresher.KeyValueBucketRefresher.POLLER_INTERVAL;
4851
import static com.couchbase.client.core.config.refresher.KeyValueBucketRefresher.clampConfigRequestTimeout;
@@ -56,6 +59,7 @@
5659
* needed.
5760
*/
5861
public class GlobalRefresher {
62+
private final Logger log = LoggerFactory.getLogger(GlobalRefresher.class);
5963

6064
/**
6165
* Holds the parent configuration provider.
@@ -116,28 +120,49 @@ public GlobalRefresher(final ConfigurationProvider provider, final Core core) {
116120
configPollInterval = core.context().environment().ioConfig().configPollInterval();
117121
configRequestTimeout = clampConfigRequestTimeout(configPollInterval);
118122

119-
pollRegistration = Flux.merge(
120-
Flux.interval(pollerInterval(), core.context().environment().scheduler()),
121-
provider.configChangeNotifications()
122-
)
123-
// Proposing a new config should be quick, but just in case it gets held up we do
124-
// not want to terminate the refresher and just drop the ticks and keep going.
125-
.onBackpressureDrop()
126-
// If the refresher is not started yet, drop all intervals
127-
.filter(v -> started)
128-
// Since the POLLER_INTERVAL is smaller than the config poll interval, make sure
129-
// we only emit poll events if enough time has elapsed -- or if the server told us
130-
// there's a new config.
131-
.filter(v -> v == TRIGGERED_BY_CONFIG_CHANGE_NOTIFICATION || lastPoll.hasElapsed(configPollInterval))
132-
.flatMap(ign -> {
123+
pollRegistration = provider.topologyPollingTriggers(pollerInterval())
124+
.filter(trigger -> started)
125+
.concatMap(trigger -> {
126+
if (trigger == TopologyPollingTrigger.TIMER) {
127+
// It's a polling tick! Ticks can be more frequent than the configured polling interval, or might occur right after a triggered update, so maybe skip this tick.
128+
if (!lastPoll.hasElapsed(configPollInterval)) {
129+
log.trace("Ignoring tick because last poll for global topology was {} ago, which is less than config poll interval {}", lastPoll.elapsed(), configPollInterval);
130+
return Mono.empty();
131+
132+
} else {
133+
log.debug("Polling for global topology because polling interval has elapsed.");
134+
}
135+
136+
} else if (trigger == TopologyPollingTrigger.SERVER_NOTIFICATION) {
137+
TopologyRevision availableRevision = provider.removeTopologyRevisionChangeNotification(null);
138+
TopologyRevision currentRevision = provider.config().globalTopologyRevision();
139+
140+
if (availableRevision == null) {
141+
log.trace("Ignoring redundant global topology change notification.");
142+
return Mono.empty();
143+
}
144+
145+
if (availableRevision.newerThan(currentRevision)) {
146+
log.debug("Fetching updated global topology; available revision {} is newer than current ({}).", availableRevision, currentRevision);
147+
148+
} else {
149+
log.debug("Skipping global topology revision {} because it's not newer than current ({}).", availableRevision, currentRevision);
150+
return Mono.empty();
151+
}
152+
153+
} else {
154+
return Mono.error(new RuntimeException("Unexpected topology poll trigger: " + trigger));
155+
}
156+
133157
List<PortInfo> nodes = filterEligibleNodes();
134158
if (numFailedRefreshes.get() >= nodes.size()) {
135159
provider.signalConfigRefreshFailed(ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS);
136160
numFailedRefreshes.set(0);
137161
}
138-
return attemptUpdateGlobalConfig(Flux.fromIterable(nodes).take(MAX_PARALLEL_FETCH));
162+
return attemptUpdateGlobalConfig(Flux.fromIterable(nodes).take(MAX_PARALLEL_FETCH))
163+
.doOnNext(provider::proposeGlobalConfig);
139164
})
140-
.subscribe(provider::proposeGlobalConfig);
165+
.subscribe();
141166
}
142167

143168
/**

0 commit comments

Comments
 (0)