18
18
import * as protoLoader from '@grpc/proto-loader' ;
19
19
// This is a non-public, unstable API, but it's very convenient
20
20
import { loadProtosWithOptionsSync } from '@grpc/proto-loader/build/src/util' ;
21
- import { loadPackageDefinition , StatusObject , status , logVerbosity , Metadata , experimental , ChannelOptions , ClientDuplexStream , ServiceError , ChannelCredentials , Channel } from '@grpc/grpc-js' ;
21
+ import { loadPackageDefinition , StatusObject , status , logVerbosity , Metadata , experimental , ChannelOptions , ClientDuplexStream , ServiceError , ChannelCredentials , Channel , connectivityState } from '@grpc/grpc-js' ;
22
22
import * as adsTypes from './generated/ads' ;
23
23
import * as lrsTypes from './generated/lrs' ;
24
24
import { loadBootstrapInfo } from './xds-bootstrap' ;
@@ -255,6 +255,7 @@ export class XdsClient {
255
255
DiscoveryRequest ,
256
256
DiscoveryResponse__Output
257
257
> | null = null ;
258
+ private receivedAdsResponseOnCurrentStream = false ;
258
259
259
260
private lrsNode : Node | null = null ;
260
261
private lrsClient : LoadReportingServiceClient | null = null ;
@@ -373,6 +374,9 @@ export class XdsClient {
373
374
{ channelOverride : channel }
374
375
) ;
375
376
this . maybeStartAdsStream ( ) ;
377
+ channel . watchConnectivityState ( channel . getConnectivityState ( false ) , Infinity , ( ) => {
378
+ this . handleAdsConnectivityStateUpdate ( ) ;
379
+ } )
376
380
377
381
this . lrsClient = new protoDefinitions . envoy . service . load_stats . v3 . LoadReportingService (
378
382
serverUri ,
@@ -394,7 +398,29 @@ export class XdsClient {
394
398
clearInterval ( this . statsTimer ) ;
395
399
}
396
400
401
+ private handleAdsConnectivityStateUpdate ( ) {
402
+ if ( ! this . adsClient ) {
403
+ return ;
404
+ }
405
+ const state = this . adsClient . getChannel ( ) . getConnectivityState ( false ) ;
406
+ if ( state === connectivityState . READY && this . adsCall ) {
407
+ this . reportAdsStreamStarted ( ) ;
408
+ }
409
+ if ( state === connectivityState . TRANSIENT_FAILURE ) {
410
+ this . reportStreamError ( {
411
+ code : status . UNAVAILABLE ,
412
+ details : 'No connection established to xDS server' ,
413
+ metadata : new Metadata ( )
414
+ } ) ;
415
+ }
416
+ this . adsClient . getChannel ( ) . watchConnectivityState ( state , Infinity , ( ) => {
417
+ this . handleAdsConnectivityStateUpdate ( ) ;
418
+ } ) ;
419
+ }
420
+
397
421
private handleAdsResponse ( message : DiscoveryResponse__Output ) {
422
+ this . receivedAdsResponseOnCurrentStream = true ;
423
+ this . adsBackoff . reset ( ) ;
398
424
let handleResponseResult : {
399
425
result : HandleResponseResult ;
400
426
serviceKind : AdsServiceKind ;
@@ -466,7 +492,7 @@ export class XdsClient {
466
492
'ADS stream ended. code=' + streamStatus . code + ' details= ' + streamStatus . details
467
493
) ;
468
494
this . adsCall = null ;
469
- if ( streamStatus . code !== status . OK ) {
495
+ if ( streamStatus . code !== status . OK && ! this . receivedAdsResponseOnCurrentStream ) {
470
496
this . reportStreamError ( streamStatus ) ;
471
497
}
472
498
/* If the backoff timer is no longer running, we do not need to wait any
@@ -496,7 +522,9 @@ export class XdsClient {
496
522
if ( this . adsCall !== null ) {
497
523
return ;
498
524
}
499
- this . adsCall = this . adsClient . StreamAggregatedResources ( ) ;
525
+ this . receivedAdsResponseOnCurrentStream = false ;
526
+ const metadata = new Metadata ( { waitForReady : true } ) ;
527
+ this . adsCall = this . adsClient . StreamAggregatedResources ( metadata ) ;
500
528
this . adsCall . on ( 'data' , ( message : DiscoveryResponse__Output ) => {
501
529
this . handleAdsResponse ( message ) ;
502
530
} ) ;
@@ -515,7 +543,9 @@ export class XdsClient {
515
543
this . updateNames ( service ) ;
516
544
}
517
545
}
518
- this . reportAdsStreamStarted ( ) ;
546
+ if ( this . adsClient . getChannel ( ) . getConnectivityState ( false ) === connectivityState . READY ) {
547
+ this . reportAdsStreamStarted ( ) ;
548
+ }
519
549
}
520
550
521
551
private maybeSendAdsMessage ( typeUrl : string , resourceNames : string [ ] , responseNonce : string , versionInfo : string , errorMessage ?: string ) {
@@ -547,10 +577,6 @@ export class XdsClient {
547
577
* version info are updated so that it sends the post-update values.
548
578
*/
549
579
ack ( serviceKind : AdsServiceKind ) {
550
- /* An ack is the best indication of a successful interaction between the
551
- * client and the server, so we can reset the backoff timer here. */
552
- this . adsBackoff . reset ( ) ;
553
-
554
580
this . updateNames ( serviceKind ) ;
555
581
}
556
582
0 commit comments