36
36
import io .grpc .xds .client .Bootstrapper .ServerInfo ;
37
37
import io .grpc .xds .client .EnvoyProtoData .Node ;
38
38
import io .grpc .xds .client .XdsClient .ProcessingTracker ;
39
- import io .grpc .xds .client .XdsClient .ResourceStore ;
40
39
import io .grpc .xds .client .XdsClient .XdsResponseHandler ;
41
40
import io .grpc .xds .client .XdsLogger .XdsLogLevel ;
42
- import io .grpc .xds .client .XdsTransportFactory .EventHandler ;
43
41
import io .grpc .xds .client .XdsTransportFactory .StreamingCall ;
44
42
import io .grpc .xds .client .XdsTransportFactory .XdsTransport ;
45
43
import java .util .Collection ;
@@ -65,40 +63,41 @@ final class ControlPlaneClient {
65
63
private final ServerInfo serverInfo ;
66
64
private final XdsTransport xdsTransport ;
67
65
private final XdsResponseHandler xdsResponseHandler ;
68
- private final ResourceStore resourceStore ;
66
+ private final XdsClient . ResourceStore resourceStore ;
69
67
private final ScheduledExecutorService timeService ;
70
68
private final BackoffPolicy .Provider backoffPolicyProvider ;
71
69
private final Stopwatch stopwatch ;
72
70
private final Node bootstrapNode ;
73
- private final XdsClient xdsClient ;
74
71
75
72
// Last successfully applied version_info for each resource type. Starts with empty string.
76
73
// A version_info is used to update management server with client's most recent knowledge of
77
74
// resources.
78
75
private final Map <XdsResourceType <?>, String > versions = new HashMap <>();
79
76
80
77
private boolean shutdown ;
78
+ private boolean lastStateWasReady ;
79
+ private boolean inError ;
80
+
81
81
@ Nullable
82
82
private AdsStream adsStream ;
83
83
@ Nullable
84
84
private BackoffPolicy retryBackoffPolicy ;
85
85
@ Nullable
86
86
private ScheduledHandle rpcRetryTimer ;
87
- private MessagePrettyPrinter messagePrinter ;
87
+ private final MessagePrettyPrinter messagePrinter ;
88
88
89
89
/** An entity that manages ADS RPCs over a single channel. */
90
90
ControlPlaneClient (
91
91
XdsTransport xdsTransport ,
92
92
ServerInfo serverInfo ,
93
93
Node bootstrapNode ,
94
94
XdsResponseHandler xdsResponseHandler ,
95
- ResourceStore resourceStore ,
95
+ XdsClient . ResourceStore resourceStore ,
96
96
ScheduledExecutorService
97
97
timeService ,
98
98
SynchronizationContext syncContext ,
99
99
BackoffPolicy .Provider backoffPolicyProvider ,
100
100
Supplier <Stopwatch > stopwatchSupplier ,
101
- XdsClient xdsClient ,
102
101
MessagePrettyPrinter messagePrinter ) {
103
102
this .serverInfo = checkNotNull (serverInfo , "serverInfo" );
104
103
this .xdsTransport = checkNotNull (xdsTransport , "xdsTransport" );
@@ -108,7 +107,6 @@ final class ControlPlaneClient {
108
107
this .timeService = checkNotNull (timeService , "timeService" );
109
108
this .syncContext = checkNotNull (syncContext , "syncContext" );
110
109
this .backoffPolicyProvider = checkNotNull (backoffPolicyProvider , "backoffPolicyProvider" );
111
- this .xdsClient = checkNotNull (xdsClient , "xdsClient" );
112
110
this .messagePrinter = checkNotNull (messagePrinter , "messagePrinter" );
113
111
stopwatch = checkNotNull (stopwatchSupplier , "stopwatchSupplier" ).get ();
114
112
logId = InternalLogId .allocate ("xds-client" , serverInfo .target ());
@@ -138,6 +136,10 @@ public String toString() {
138
136
return logId .toString ();
139
137
}
140
138
139
+ public ServerInfo getServerInfo () {
140
+ return serverInfo ;
141
+ }
142
+
141
143
/**
142
144
* Updates the resource subscription for the given resource type.
143
145
*/
@@ -148,7 +150,15 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
148
150
}
149
151
if (adsStream == null ) {
150
152
startRpcStream ();
153
+ // when the stream becomes ready, it will send the discovery requests
154
+ return ;
155
+ }
156
+
157
+ // We will do the rest of the method as part of the readyHandler when the stream is ready.
158
+ if (!lastStateWasReady ) {
159
+ return ;
151
160
}
161
+
152
162
Collection <String > resources = resourceStore .getSubscribedResources (serverInfo , resourceType );
153
163
if (resources == null ) {
154
164
resources = Collections .emptyList ();
@@ -203,25 +213,45 @@ boolean isInBackoff() {
203
213
204
214
// Must be synchronized.
205
215
boolean isReady () {
206
- return adsStream != null && adsStream .call != null && adsStream .call .isReady ();
216
+ return adsStream != null && adsStream .call != null
217
+ && adsStream .call .isReady () && !adsStream .closed ;
218
+ }
219
+
220
+ boolean isResponseReceived () {
221
+ return adsStream != null && adsStream .responseReceived ;
222
+ }
223
+
224
+ boolean isConnected () {
225
+ return lastStateWasReady ;
226
+ }
227
+
228
+ boolean isInError () {
229
+ return inError ;
207
230
}
208
231
232
+
209
233
/**
210
234
* Starts a timer for each requested resource that hasn't been responded to and
211
235
* has been waiting for the channel to get ready.
212
236
*/
213
237
// Must be synchronized.
214
238
void readyHandler () {
215
239
if (!isReady ()) {
240
+ logger .log (XdsLogLevel .DEBUG , "ADS stream ready handler called, but not ready {0}" , logId );
216
241
return ;
217
242
}
218
243
219
- if (isInBackoff ()) {
244
+ logger .log (XdsLogLevel .DEBUG , "ADS stream ready {0}" , logId );
245
+
246
+ if (rpcRetryTimer != null ) {
220
247
rpcRetryTimer .cancel ();
221
248
rpcRetryTimer = null ;
222
249
}
223
250
224
- xdsClient .startSubscriberTimersIfNeeded (serverInfo );
251
+ if (!lastStateWasReady ) {
252
+ lastStateWasReady = true ;
253
+ xdsResponseHandler .handleStreamRestarted (serverInfo );
254
+ }
225
255
}
226
256
227
257
/**
@@ -232,27 +262,50 @@ void readyHandler() {
232
262
private void startRpcStream () {
233
263
checkState (adsStream == null , "Previous adsStream has not been cleared yet" );
234
264
adsStream = new AdsStream ();
265
+ adsStream .start ();
235
266
logger .log (XdsLogLevel .INFO , "ADS stream started" );
236
267
stopwatch .reset ().start ();
237
268
}
238
269
270
+ void sendDiscoveryRequests () {
271
+ if (adsStream == null ) {
272
+ startRpcStream ();
273
+ // when the stream becomes ready, it will send the discovery requests
274
+ return ;
275
+ }
276
+
277
+ if (isConnected ()) {
278
+ adjustAllResourceSubscriptions ();
279
+ }
280
+ }
281
+
282
+ void adjustAllResourceSubscriptions () {
283
+ if (isInBackoff ()) {
284
+ return ;
285
+ }
286
+
287
+ Set <XdsResourceType <?>> subscribedResourceTypes =
288
+ new HashSet <>(resourceStore .getSubscribedResourceTypesWithTypeUrl ().values ());
289
+
290
+ for (XdsResourceType <?> type : subscribedResourceTypes ) {
291
+ adjustResourceSubscription (type );
292
+ }
293
+ }
294
+
239
295
@ VisibleForTesting
240
296
public final class RpcRetryTask implements Runnable {
241
297
@ Override
242
298
public void run () {
243
- if (shutdown ) {
299
+ logger .log (XdsLogLevel .DEBUG , "Retry timeout. Restart ADS stream {0}" , logId );
300
+ if (shutdown || isReady ()) {
244
301
return ;
245
302
}
246
- startRpcStream ();
247
- Set <XdsResourceType <?>> subscribedResourceTypes =
248
- new HashSet <>(resourceStore .getSubscribedResourceTypesWithTypeUrl ().values ());
249
- for (XdsResourceType <?> type : subscribedResourceTypes ) {
250
- Collection <String > resources = resourceStore .getSubscribedResources (serverInfo , type );
251
- if (resources != null ) {
252
- adsStream .sendDiscoveryRequest (type , resources );
253
- }
303
+
304
+ if (adsStream == null ) {
305
+ startRpcStream ();
254
306
}
255
- xdsResponseHandler .handleStreamRestarted (serverInfo );
307
+
308
+ // handling CPC management is triggered in readyHandler
256
309
}
257
310
}
258
311
@@ -262,7 +315,7 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
262
315
return resourceStore .getSubscribedResourceTypesWithTypeUrl ().get (typeUrl );
263
316
}
264
317
265
- private class AdsStream implements EventHandler <DiscoveryResponse > {
318
+ private class AdsStream implements XdsTransportFactory . EventHandler <DiscoveryResponse > {
266
319
private boolean responseReceived ;
267
320
private boolean closed ;
268
321
// Response nonce for the most recently received discovery responses of each resource type.
@@ -279,6 +332,9 @@ private class AdsStream implements EventHandler<DiscoveryResponse> {
279
332
private AdsStream () {
280
333
this .call = xdsTransport .createStreamingCall (methodDescriptor .getFullMethodName (),
281
334
methodDescriptor .getRequestMarshaller (), methodDescriptor .getResponseMarshaller ());
335
+ }
336
+
337
+ void start () {
282
338
call .start (this );
283
339
}
284
340
@@ -363,10 +419,13 @@ public void onStatusReceived(final Status status) {
363
419
final void handleRpcResponse (XdsResourceType <?> type , String versionInfo , List <Any > resources ,
364
420
String nonce ) {
365
421
checkNotNull (type , "type" );
422
+
366
423
if (closed ) {
367
424
return ;
368
425
}
426
+
369
427
responseReceived = true ;
428
+ inError = false ;
370
429
respNonces .put (type , nonce );
371
430
ProcessingTracker processingTracker = new ProcessingTracker (
372
431
() -> call .startRecvMessage (), syncContext );
@@ -376,6 +435,10 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
376
435
}
377
436
378
437
private void handleRpcStreamClosed (Status status ) {
438
+ if (this == adsStream || adsStream == null ) {
439
+ lastStateWasReady = false ;
440
+ }
441
+
379
442
if (closed ) {
380
443
return ;
381
444
}
@@ -385,13 +448,16 @@ private void handleRpcStreamClosed(Status status) {
385
448
// has never been initialized.
386
449
retryBackoffPolicy = backoffPolicyProvider .get ();
387
450
}
451
+
388
452
// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
389
453
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
390
454
// concurrently with the stopwatch and schedule.
455
+
391
456
long elapsed = stopwatch .elapsed (TimeUnit .NANOSECONDS );
392
457
long delayNanos = Math .max (0 , retryBackoffPolicy .nextBackoffNanos () - elapsed );
393
- rpcRetryTimer = syncContext .schedule (
394
- new RpcRetryTask (), delayNanos , TimeUnit .NANOSECONDS , timeService );
458
+
459
+ rpcRetryTimer =
460
+ syncContext .schedule (new RpcRetryTask (), delayNanos , TimeUnit .NANOSECONDS , timeService );
395
461
396
462
Status newStatus = status ;
397
463
if (responseReceived ) {
@@ -410,6 +476,7 @@ private void handleRpcStreamClosed(Status status) {
410
476
} else {
411
477
// If the ADS stream is closed without ever having received a response from the server, then
412
478
// the XdsClient should consider that a connectivity error (see gRFC A57).
479
+ inError = true ;
413
480
if (status .isOk ()) {
414
481
newStatus = Status .UNAVAILABLE .withDescription (
415
482
"ADS stream closed with OK before receiving a response" );
@@ -420,10 +487,8 @@ private void handleRpcStreamClosed(Status status) {
420
487
}
421
488
422
489
closed = true ;
423
- xdsResponseHandler .handleStreamClosed (newStatus );
490
+ xdsResponseHandler .handleStreamClosed (newStatus , ! responseReceived );
424
491
cleanUp ();
425
-
426
- logger .log (XdsLogLevel .INFO , "Retry ADS stream in {0} ns" , delayNanos );
427
492
}
428
493
429
494
private void close (Exception error ) {
@@ -441,4 +506,5 @@ private void cleanUp() {
441
506
}
442
507
}
443
508
}
509
+
444
510
}
0 commit comments