Skip to content

Commit 96cecb2

Browse files
committed
Fix fallback to pass all tests
1 parent 57c0352 commit 96cecb2

File tree

6 files changed

+69
-68
lines changed

6 files changed

+69
-68
lines changed

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,14 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
137137
private XdsClient xdsClient;
138138
@GuardedBy("lock")
139139
private int refCount;
140-
private final Constructor backoffProviderConstructor;
140+
private final Constructor<? extends BackoffPolicy.Provider> backoffProviderConstructor;
141141

142142
@VisibleForTesting
143143
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
144144
Class<? extends BackoffPolicy.Provider> rawBackoffProviderClass) {
145145
this.bootstrapInfo = checkNotNull(bootstrapInfo);
146146
this.target = target;
147-
Class backoffProviderClass = rawBackoffProviderClass != null
147+
Class<? extends BackoffPolicy.Provider> backoffProviderClass = rawBackoffProviderClass != null
148148
? rawBackoffProviderClass
149149
: ExponentialBackoffPolicy.Provider.class;
150150
try {

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ final class ControlPlaneClient {
9999
SynchronizationContext syncContext,
100100
BackoffPolicy.Provider backoffPolicyProvider,
101101
Supplier<Stopwatch> stopwatchSupplier,
102-
XdsClient xdsClient, // Has been replaced by xdsResponseHandler
103102
MessagePrettyPrinter messagePrinter) {
104103
this.serverInfo = checkNotNull(serverInfo, "serverInfo");
105104
this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
@@ -152,10 +151,13 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType, String authorit
152151
}
153152
if (adsStream == null) {
154153
startRpcStream();
154+
// when the stream becomes ready, it will send the discovery requests
155+
return;
155156
}
157+
156158
Collection<String> resources =
157159
resourceStore.getSubscribedResources(serverInfo, resourceType, authority);
158-
if (resources != null) {
160+
if (resources != null && !resources.isEmpty()) {
159161
adsStream.sendDiscoveryRequest(resourceType, resources);
160162
}
161163
}
@@ -244,13 +246,16 @@ void sendDiscoveryRequests(String authority) {
244246
new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
245247
for (XdsResourceType<?> type : subscribedResourceTypes) {
246248
Collection<String> resources =
247-
type.updateInPlaceOnFallback()
248-
? resourceStore.getAllResources(type, authority)
249-
: resourceStore.getSubscribedResources(serverInfo, type, authority);
249+
resourceStore.getSubscribedResources(serverInfo, type, authority);
250250
if (resources != null && !resources.isEmpty()) {
251251
adsStream.sendDiscoveryRequest(type, resources);
252252
}
253+
}
254+
}
253255

256+
public <T extends XdsClient.ResourceUpdate> void removeNonceForType(XdsResourceType<T> type) {
257+
if (!shutdown && adsStream != null) {
258+
adsStream.respNonces.remove(type);
254259
}
255260
}
256261

@@ -278,6 +283,7 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
278283
private class AdsStream implements EventHandler<DiscoveryResponse> {
279284
private boolean responseReceived;
280285
private boolean closed;
286+
private boolean hasGoneReady = false;
281287
// Response nonce for the most recently received discovery responses of each resource type.
282288
// Client initiated requests start response nonce with empty string.
283289
// Nonce in each response is echoed back in the following ACK/NACK request. It is
@@ -337,6 +343,11 @@ final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> reso
337343

338344
@Override
339345
public void onReady() {
346+
if (shutdown || closed || hasGoneReady) {
347+
return;
348+
}
349+
350+
hasGoneReady = true;
340351
syncContext.execute(ControlPlaneClient.this::readyHandler);
341352
}
342353

@@ -439,7 +450,11 @@ private void cleanUp() {
439450

440451
private long scheduleRpcRetry() {
441452
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
453+
if (retryBackoffPolicy == null) {
454+
retryBackoffPolicy = backoffPolicyProvider.get();
455+
}
442456
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
457+
443458
rpcRetryTimer =
444459
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
445460
return delayNanos;

xds/src/main/java/io/grpc/xds/client/XdsClient.java

+9-11
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ void handleResourceResponse(
417417
* Is expected do manage the ControlPlanClients and cache updates associated with
418418
* Moving to or from a fallback server.
419419
*
420-
* Must be synchronized.
420+
* <p>Must be synchronized.
421421
*/
422422
void handleStreamReady(ServerInfo serverInfo);
423423
}
@@ -434,17 +434,15 @@ public interface ResourceStore {
434434
@Nullable
435435
Collection<String> getSubscribedResources(ServerInfo serverInfo,
436436
XdsResourceType<? extends ResourceUpdate> type);
437-
default Collection<String> getSubscribedResources(
438-
ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type, String authority) {
439-
return getSubscribedResources(serverInfo, type);
440-
};
441437

442-
Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
443-
444-
Collection<String> getAllResources(XdsResourceType<?> type);
445-
default Collection<String> getAllResources(XdsResourceType<?> type, String authority) {
446-
return getAllResources(type);
447-
}
438+
/**
439+
* Like {@link #getSubscribedResources(ServerInfo, XdsResourceType)}, but limits the results to
440+
* those matching the given authority.
441+
*/
442+
@Nullable
443+
Collection<String> getSubscribedResources(
444+
ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type, String authority);
448445

446+
Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
449447
}
450448
}

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

+29-45
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,20 @@ public void uncaughtException(Thread t, Throwable e) {
8282
}
8383
});
8484

85-
private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap =
86-
new HashMap<>();
87-
final Map<ServerInfo, LoadReportClient> serverLrsClientMap =
88-
new HashMap<>();
85+
private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
86+
final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
87+
/** Map of authority to its active control plane client (affected by xds fallback). */
88+
private final Map<String, ControlPlaneClient> activeCpClients = new HashMap<>();
8989

9090
private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();
91+
92+
/** Maps resource type to the corresponding map of subscribers (keyed by subscriber name). */
9193
private final Map<XdsResourceType<? extends ResourceUpdate>,
9294
Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
9395
resourceSubscribers = new HashMap<>();
96+
/** Maps typeUrl to the corresponding XdsResourceType. */
9497
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
98+
9599
private final XdsTransportFactory xdsTransportFactory;
96100
private final Bootstrapper.BootstrapInfo bootstrapInfo;
97101
private final ScheduledExecutorService timeService;
@@ -103,10 +107,6 @@ public void uncaughtException(Thread t, Throwable e) {
103107
private final XdsLogger logger;
104108
private volatile boolean isShutdown;
105109
private final MessagePrettyPrinter messagePrinter;
106-
/**
107-
* Map of authority to its active control plane client.
108-
*/
109-
private Map<String, ControlPlaneClient> activeCpClients = new HashMap<>();
110110

111111
public XdsClientImpl(
112112
XdsTransportFactory xdsTransportFactory,
@@ -172,7 +172,7 @@ public Collection<String> getSubscribedResources(
172172
@Override
173173
public Collection<String> getSubscribedResources(
174174
ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type, String authority) {
175-
return getSubscribedResources(serverInfo, type, authority, false);
175+
return getSubscribedResources(serverInfo, type, authority, true);
176176
}
177177

178178
private Collection<String> getSubscribedResources(ServerInfo serverInfo, XdsResourceType<?
@@ -183,44 +183,21 @@ private Collection<String> getSubscribedResources(ServerInfo serverInfo, XdsReso
183183
ControlPlaneClient controlPlaneClient = serverCpClientMap.get(serverInfo);
184184
for (String key : resources.keySet()) {
185185
ResourceSubscriber<? extends ResourceUpdate> resource = resources.get(key);
186-
if (resource.controlPlaneClient.equals(controlPlaneClient) &&
187-
(!useAuthority || Objects.equals(authority, resource.authority))) {
186+
if (resource.controlPlaneClient.equals(controlPlaneClient)
187+
&& (!useAuthority || Objects.equals(authority, resource.authority))) {
188188
builder.add(key);
189189
}
190190
}
191191
Collection<String> retVal = builder.build();
192192
return retVal.isEmpty() ? null : retVal;
193193
}
194194

195-
@Override
196-
public Collection<String> getAllResources(XdsResourceType<?> type) {
197-
if (!resourceSubscribers.containsKey(type)) {
198-
return null;
199-
}
200-
return ImmutableList.copyOf(resourceSubscribers.get(type).keySet());
201-
}
202-
203-
@Override
204-
public Collection<String> getAllResources(XdsResourceType<?> type, String authority) {
205-
if (!resourceSubscribers.containsKey(type)) {
206-
return null;
207-
}
208-
209-
List<String> matchingResources = new ArrayList<>();
210-
for (ResourceSubscriber<?> resource : resourceSubscribers.get(type).values()) {
211-
if (Objects.equals(authority, resource.authority)) {
212-
matchingResources.add(resource.resource);
213-
}
214-
}
215-
return matchingResources;
216-
}
217-
218195

219196
// As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
220197
// ResourceTypes that do not have subscribers does not show up in the snapshot keys.
221198
@Override
222199
public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
223-
getSubscribedResourcesMetadataSnapshot() {
200+
getSubscribedResourcesMetadataSnapshot() {
224201
final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
225202
SettableFuture.create();
226203
syncContext.execute(new Runnable() {
@@ -290,17 +267,22 @@ public void run() {
290267
if (subscriber == null) {
291268
logger.log(XdsLogLevel.WARNING, "double cancel of resource watch for {0}:{1}",
292269
type.typeName(), resourceName);
270+
return;
293271
}
294272
subscriber.removeWatcher(watcher);
295273
if (!subscriber.isWatched()) {
296274
subscriber.cancelResourceWatch();
297275
resourceSubscribers.get(type).remove(resourceName);
298-
if (subscriber.controlPlaneClient != null) {
299-
subscriber.controlPlaneClient.adjustResourceSubscription(type, subscriber.authority);
276+
ControlPlaneClient controlPlaneClient = subscriber.controlPlaneClient;
277+
if (controlPlaneClient != null) {
278+
controlPlaneClient.adjustResourceSubscription(type, subscriber.authority);
300279
}
301280
if (resourceSubscribers.get(type).isEmpty()) {
302281
resourceSubscribers.remove(type);
303282
subscribedResourceTypeUrls.remove(type.typeUrl());
283+
if (controlPlaneClient != null) {
284+
controlPlaneClient.removeNonceForType(type);
285+
}
304286
}
305287
}
306288
}
@@ -432,7 +414,6 @@ private ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo)
432414
syncContext,
433415
backoffPolicyProvider,
434416
stopwatchSupplier,
435-
this,
436417
messagePrinter
437418
);
438419

@@ -1015,8 +996,8 @@ private void internalHandleStreamReady(
1015996
return;
1016997
}
1017998

1018-
if (activeCpClient.isReady() &&
1019-
compareCpClients(activeCpClient, controlPlaneClient, authority) < 0) {
999+
if (activeCpClient.isReady()
1000+
&& compareCpClients(activeCpClient, controlPlaneClient, authority) < 0) {
10201001
logger.log(XdsLogLevel.INFO, "Ignoring stream restart for lower priority server {0}",
10211002
serverInfo.target());
10221003
return;
@@ -1027,7 +1008,7 @@ private void internalHandleStreamReady(
10271008
resourceSubscribers.values()) {
10281009
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
10291010
if (subscriber.controlPlaneClient == controlPlaneClient
1030-
&& subscriber.authority == authority) {
1011+
&& Objects.equals(subscriber.authority, authority)) {
10311012
subscriber.restartTimer();
10321013
}
10331014
}
@@ -1036,9 +1017,10 @@ private void internalHandleStreamReady(
10361017
if (activeCpClient != controlPlaneClient) {
10371018
activeCpClients.put(authority, controlPlaneClient);
10381019
updateRootResources(controlPlaneClient, authority, false);
1039-
controlPlaneClient.sendDiscoveryRequests(authority);
10401020
}
10411021

1022+
controlPlaneClient.sendDiscoveryRequests(authority);
1023+
10421024
// Shutdown any lower priority control plane clients.
10431025
Iterator<ControlPlaneClient> iterator = serverCpClientMap.values().iterator();
10441026
while (iterator.hasNext()) {
@@ -1049,7 +1031,6 @@ private void internalHandleStreamReady(
10491031
}
10501032
}
10511033
}
1052-
10531034
}
10541035

10551036
private int compareCpClients(ControlPlaneClient base, ControlPlaneClient other,
@@ -1059,10 +1040,13 @@ private int compareCpClients(ControlPlaneClient base, ControlPlaneClient other,
10591040
}
10601041

10611042
ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
1062-
if (!serverInfos.contains(base.getServerInfo()) || !serverInfos.contains(other.getServerInfo())) {
1043+
ServerInfo baseServerInfo = base.getServerInfo();
1044+
ServerInfo otherServerInfo = other.getServerInfo();
1045+
1046+
if (!serverInfos.contains(baseServerInfo) || !serverInfos.contains(otherServerInfo)) {
10631047
return -100; // At least one of them isn't serving this authority
10641048
}
10651049

1066-
return serverInfos.indexOf(base.getServerInfo()) - serverInfos.indexOf(other.getServerInfo());
1050+
return serverInfos.indexOf(baseServerInfo) - serverInfos.indexOf(otherServerInfo);
10671051
}
10681052
}

xds/src/test/java/io/grpc/xds/CsdsServiceTest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -455,15 +455,18 @@ public Collection<String> getSubscribedResources(ServerInfo serverInfo,
455455
return null;
456456
}
457457

458+
@Nullable
458459
@Override
459-
public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
460-
return ImmutableMap.of();
460+
public Collection<String> getSubscribedResources(
461+
ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type, String authority) {
462+
return null;
461463
}
462464

463465
@Override
464-
public Collection<String> getAllResources(XdsResourceType<?> type) {
465-
return null;
466+
public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
467+
return ImmutableMap.of();
466468
}
469+
467470
}
468471

469472
private static class FakeXdsClientPoolFactory implements XdsClientPoolFactory {

xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class XdsClientFallbackTest {
6969
public void onChanged(XdsListenerResource.LdsUpdate update) {
7070
log.info("LDS update: " + update);
7171
}
72+
7273
@Override
7374
public void onError(Status error) {
7475
log.info("LDS update error: " + error.getDescription());
@@ -109,7 +110,7 @@ public void setUp() throws XdsInitializationException {
109110

110111
SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider();
111112
clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride());
112-
// clientPoolProvider.setBackoffProviderClass(MinimalBackoffPolicyProvider.class);
113+
// clientPoolProvider.setBackoffProviderClass(MinimalBackoffPolicyProvider.class);
113114
xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET);
114115
}
115116

0 commit comments

Comments
 (0)