Skip to content

Commit c6db989

Browse files
committed
Add flag to guard xds fallback
1 parent 3608277 commit c6db989

File tree

6 files changed

+52
-20
lines changed

6 files changed

+52
-20
lines changed

census/src/test/java/io/grpc/census/CensusModulesTest.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import io.grpc.ClientInterceptors;
5757
import io.grpc.ClientStreamTracer;
5858
import io.grpc.Context;
59+
import io.grpc.KnownLength;
5960
import io.grpc.Metadata;
6061
import io.grpc.MethodDescriptor;
6162
import io.grpc.ServerCall;
@@ -99,6 +100,7 @@
99100
import io.opencensus.trace.Tracer;
100101
import io.opencensus.trace.propagation.BinaryFormat;
101102
import io.opencensus.trace.propagation.SpanContextParseException;
103+
import java.io.IOException;
102104
import java.io.InputStream;
103105
import java.util.HashSet;
104106
import java.util.List;
@@ -136,7 +138,7 @@ public class CensusModulesTest {
136138
ClientStreamTracer.StreamInfo.newBuilder()
137139
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();
138140

139-
private static class StringInputStream extends InputStream {
141+
private static class StringInputStream extends InputStream implements KnownLength {
140142
final String string;
141143

142144
StringInputStream(String string) {
@@ -149,6 +151,11 @@ public int read() {
149151
// passed to the InProcess server and consumed by MARSHALLER.parse().
150152
throw new UnsupportedOperationException("Should not be called");
151153
}
154+
155+
@Override
156+
public int available() throws IOException {
157+
return string == null ? 0 : string.length();
158+
}
152159
}
153160

154161
private static final MethodDescriptor.Marshaller<String> MARSHALLER =

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

+20
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
@Internal
4242
public abstract class BootstrapperImpl extends Bootstrapper {
4343

44+
public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK =
45+
"GRPC_EXPERIMENTAL_XDS_FALLBACK";
46+
private static Boolean xdsFallbackEnabled = null;
47+
4448
// Client features.
4549
@VisibleForTesting
4650
public static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
@@ -59,11 +63,21 @@ protected BootstrapperImpl() {
5963
logger = XdsLogger.withLogId(InternalLogId.allocate("bootstrapper", null));
6064
}
6165

66+
// Delayed initialization of xdsFallbackEnabled to allow for flag initialization.
67+
public static boolean isEnabledXdsFallback() {
68+
if (xdsFallbackEnabled == null) {
69+
xdsFallbackEnabled = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, false);
70+
}
71+
return xdsFallbackEnabled;
72+
}
73+
6274
protected abstract String getJsonContent() throws IOException, XdsInitializationException;
6375

6476
protected abstract Object getImplSpecificConfig(Map<String, ?> serverConfig, String serverUri)
6577
throws XdsInitializationException;
6678

79+
80+
6781
/**
6882
* Reads and parses bootstrap config. The config is expected to be in JSON format.
6983
*/
@@ -102,6 +116,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map<String, ?> rawData)
102116
throw new XdsInitializationException("Invalid bootstrap: 'xds_servers' does not exist.");
103117
}
104118
List<ServerInfo> servers = parseServerInfos(rawServerConfigs, logger);
119+
if (servers.size() > 1 && !isEnabledXdsFallback()) {
120+
servers = ImmutableList.of(servers.get(0));
121+
}
105122
builder.servers(servers);
106123

107124
Node.Builder nodeBuilder = Node.newBuilder();
@@ -208,6 +225,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map<String, ?> rawData)
208225
if (rawAuthorityServers == null || rawAuthorityServers.isEmpty()) {
209226
authorityServers = servers;
210227
} else {
228+
if (rawAuthorityServers.size() > 1 && !isEnabledXdsFallback()) {
229+
rawAuthorityServers = ImmutableList.of(rawAuthorityServers.get(0));
230+
}
211231
authorityServers = parseServerInfos(rawAuthorityServers, logger);
212232
}
213233
authorityInfoMapBuilder.put(

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

+13-14
Original file line numberDiff line numberDiff line change
@@ -402,11 +402,7 @@ public void run() {
402402
public void onStatusReceived(final Status status) {
403403
lastStateWasReady = false;
404404
syncContext.execute(() -> {
405-
if (status.isOk()) {
406-
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
407-
} else {
408-
handleRpcStreamClosed(status);
409-
}
405+
handleRpcStreamClosed(status);
410406
});
411407
}
412408

@@ -425,7 +421,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
425421
processingTracker.onComplete();
426422
}
427423

428-
private void handleRpcStreamClosed(Status error) {
424+
private void handleRpcStreamClosed(Status status) {
429425
if (closed) {
430426
return;
431427
}
@@ -440,15 +436,18 @@ private void handleRpcStreamClosed(Status error) {
440436
// concurrently with the stopwatch and schedule.
441437
long delayNanos = scheduleRpcRetry();
442438

443-
checkArgument(!error.isOk(), "unexpected OK status");
444-
String errorMsg = error.getDescription() != null
445-
&& error.getDescription().equals(CLOSED_BY_SERVER)
446-
? "ADS stream closed with status {0}: {1}. Cause: {2}"
447-
: "ADS stream failed with status {0}: {1}. Cause: {2}";
448-
logger.log(
449-
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
439+
if (status.isOk()) {
440+
logger.log(XdsLogLevel.WARNING, "ADS stream closed, backoff {0} ns", delayNanos);
441+
} else {
442+
String errorMsg = status.getDescription() != null
443+
&& status.getDescription().equals(CLOSED_BY_SERVER)
444+
? "ADS stream closed with status {0}: {1}. Cause: {2}"
445+
: "ADS stream failed with status {0}: {1}. Cause: {2}";
446+
logger.log(
447+
XdsLogLevel.ERROR, errorMsg, status.getCode(), status.getDescription(), status.getCause());
448+
}
450449
closed = true;
451-
xdsResponseHandler.handleStreamClosed(error);
450+
xdsResponseHandler.handleStreamClosed(status);
452451
cleanUp();
453452

454453
logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.base.Preconditions.checkArgument;
2020
import static com.google.common.base.Preconditions.checkNotNull;
2121
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22+
import static io.grpc.xds.client.ControlPlaneClient.CLOSED_BY_SERVER;
2223
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
2324
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
2425

@@ -996,28 +997,30 @@ public void handleResourceResponse(
996997
}
997998

998999
@Override
999-
public void handleStreamClosed(Status error) {
1000+
public void handleStreamClosed(Status status) {
10001001
syncContext.throwIfNotInThisSynchronizationContext();
10011002
ControlPlaneClient cpcForThisStream = serverCpClientMap.get(serverInfo);
10021003
cleanUpResourceTimers(cpcForThisStream);
10031004

1004-
boolean hadError = false;
1005+
Status error = status.isOk() ? Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER) : status;
1006+
boolean checkForFallback = !status.isOk() && BootstrapperImpl.isEnabledXdsFallback();
1007+
10051008
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
10061009
resourceSubscribers.values()) {
10071010
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
10081011
if (subscriber.hasResult()
10091012
|| (subscriber.cpcFixed && !cpcForThisStream.equals(subscriber.controlPlaneClient))) {
10101013
continue;
10111014
}
1012-
if (!hadError) {
1013-
logger.log(XdsLogLevel.WARNING, "ADS stream closed with error: {0}", error);
1015+
if (checkForFallback) {
10141016
// try to fallback to lower priority control plane client
10151017
if (doFallbackForAuthority(cpcForThisStream,
10161018
serverInfo, subscriber.serverInfos, subscriber.authority)) {
10171019
return;
10181020
}
1021+
checkForFallback = false;
10191022
}
1020-
hadError = true;
1023+
10211024
subscriber.onError(error, null);
10221025
}
10231026
}

xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class GrpcBootstrapperImplTest {
6565
@Before
6666
public void setUp() {
6767
saveEnvironment();
68+
System.setProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK, "true");
6869
bootstrapper.bootstrapPathFromEnvVar = BOOTSTRAP_FILE_PATH;
6970
}
7071

xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.common.collect.ImmutableMap;
3131
import io.grpc.Status;
3232
import io.grpc.internal.ObjectPool;
33+
import io.grpc.xds.client.BootstrapperImpl;
3334
import io.grpc.xds.client.XdsClient;
3435
import io.grpc.xds.client.XdsClientImpl;
3536
import io.grpc.xds.client.XdsInitializationException;
@@ -119,6 +120,7 @@ public void onResourceDoesNotExist(String resourceName) {
119120

120121
@Before
121122
public void setUp() throws XdsInitializationException {
123+
System.setProperty(BootstrapperImpl.GRPC_EXPERIMENTAL_XDS_FALLBACK, "true");
122124
setAdsConfig(mainTdServer, MAIN_SERVER);
123125
setAdsConfig(fallbackServer, FALLBACK_SERVER);
124126

0 commit comments

Comments
 (0)