From 3ee25dbd64d03b24272657769a103a3f8c467bd1 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 9 Dec 2024 13:36:40 -0800 Subject: [PATCH 1/3] Rebased onto master --- .../io/grpc/xds/client/BootstrapperImpl.java | 13 + .../grpc/xds/client/ControlPlaneClient.java | 226 +++++-- .../java/io/grpc/xds/client/XdsClient.java | 37 +- .../io/grpc/xds/client/XdsClientImpl.java | 566 +++++++++++++----- .../java/io/grpc/xds/ControlPlaneRule.java | 109 +++- .../java/io/grpc/xds/CsdsServiceTest.java | 15 +- .../io/grpc/xds/GrpcBootstrapperImplTest.java | 4 + .../grpc/xds/GrpcXdsClientImplTestBase.java | 59 +- .../io/grpc/xds/XdsClientFallbackTest.java | 522 ++++++++++++++++ .../grpc/xds/XdsSecurityClientServerTest.java | 1 + .../grpc/xds/XdsTestControlPlaneService.java | 8 + .../CommonBootstrapperTestUtils.java | 84 ++- .../ClientSslContextProviderFactoryTest.java | 2 +- .../SecurityProtocolNegotiatorsTest.java | 2 +- .../ServerSslContextProviderFactoryTest.java | 2 +- .../security/TlsContextManagerTest.java | 2 +- ...tProviderClientSslContextProviderTest.java | 2 +- ...tProviderServerSslContextProviderTest.java | 2 +- 18 files changed, 1365 insertions(+), 291 deletions(-) create mode 100644 xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java rename xds/src/test/java/io/grpc/xds/{ => client}/CommonBootstrapperTestUtils.java (65%) diff --git a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java index 9930417348b..a48313fd21e 100644 --- a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java @@ -41,6 +41,9 @@ @Internal public abstract class BootstrapperImpl extends Bootstrapper { + public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK = + "GRPC_EXPERIMENTAL_XDS_FALLBACK"; + // Client features. @VisibleForTesting public static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING = @@ -52,6 +55,9 @@ public abstract class BootstrapperImpl extends Bootstrapper { private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion"; private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server"; + @VisibleForTesting + static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, false); + protected final XdsLogger logger; protected FileReader reader = LocalFileReader.INSTANCE; @@ -65,6 +71,7 @@ protected BootstrapperImpl() { protected abstract Object getImplSpecificConfig(Map serverConfig, String serverUri) throws XdsInitializationException; + /** * Reads and parses bootstrap config. The config is expected to be in JSON format. */ @@ -103,6 +110,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map rawData) throw new XdsInitializationException("Invalid bootstrap: 'xds_servers' does not exist."); } List servers = parseServerInfos(rawServerConfigs, logger); + if (servers.size() > 1 && !enableXdsFallback) { + servers = ImmutableList.of(servers.get(0)); + } builder.servers(servers); Node.Builder nodeBuilder = Node.newBuilder(); @@ -209,6 +219,9 @@ protected BootstrapInfo.Builder bootstrapBuilder(Map rawData) if (rawAuthorityServers == null || rawAuthorityServers.isEmpty()) { authorityServers = servers; } else { + if (rawAuthorityServers.size() > 1 && !enableXdsFallback) { + rawAuthorityServers = ImmutableList.of(rawAuthorityServers.get(0)); + } authorityServers = parseServerInfos(rawAuthorityServers, logger); } authorityInfoMapBuilder.put( diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 62076fb8bf1..047f8a2e315 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -39,7 +39,6 @@ import io.grpc.xds.client.XdsClient.ResourceStore; import io.grpc.xds.client.XdsClient.XdsResponseHandler; import io.grpc.xds.client.XdsLogger.XdsLogLevel; -import io.grpc.xds.client.XdsTransportFactory.EventHandler; import io.grpc.xds.client.XdsTransportFactory.StreamingCall; import io.grpc.xds.client.XdsTransportFactory.XdsTransport; import java.util.Collection; @@ -70,7 +69,6 @@ final class ControlPlaneClient { private final BackoffPolicy.Provider backoffPolicyProvider; private final Stopwatch stopwatch; private final Node bootstrapNode; - private final XdsClient xdsClient; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -78,14 +76,15 @@ final class ControlPlaneClient { private final Map, String> versions = new HashMap<>(); private boolean shutdown; - private boolean streamClosedNoResponse; + private boolean inError; + @Nullable private AdsStream adsStream; @Nullable private BackoffPolicy retryBackoffPolicy; @Nullable private ScheduledHandle rpcRetryTimer; - private MessagePrettyPrinter messagePrinter; + private final MessagePrettyPrinter messagePrinter; /** An entity that manages ADS RPCs over a single channel. */ ControlPlaneClient( @@ -99,7 +98,6 @@ final class ControlPlaneClient { SynchronizationContext syncContext, BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier, - XdsClient xdsClient, MessagePrettyPrinter messagePrinter) { this.serverInfo = checkNotNull(serverInfo, "serverInfo"); this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport"); @@ -109,7 +107,6 @@ final class ControlPlaneClient { this.timeService = checkNotNull(timeService, "timeService"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.xdsClient = checkNotNull(xdsClient, "xdsClient"); this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter"); stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); logId = InternalLogId.allocate("xds-client", serverInfo.target()); @@ -139,22 +136,36 @@ public String toString() { return logId.toString(); } + public ServerInfo getServerInfo() { + return serverInfo; + } + /** * Updates the resource subscription for the given resource type. */ // Must be synchronized. void adjustResourceSubscription(XdsResourceType resourceType) { - if (isInBackoff()) { + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { return; } if (adsStream == null) { startRpcStream(); + // when the stream becomes ready, it will send the discovery requests + return; + } + + // We will do the rest of the method as part of the readyHandler when the stream is ready. + if (!isConnected()) { + return; } + Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); if (resources == null) { resources = Collections.emptyList(); } adsStream.sendDiscoveryRequest(resourceType, resources); + resourceStore.startMissingResourceTimers(resources, resourceType); + if (resources.isEmpty()) { // The resource type no longer has subscribing resources; clean up references to it versions.remove(resourceType); @@ -194,50 +205,44 @@ void nackResponse(XdsResourceType type, String nonce, String errorDetail) { adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail); } - /** - * Returns {@code true} if the resource discovery is currently in backoff. - */ // Must be synchronized. - boolean isInBackoff() { - return rpcRetryTimer != null && rpcRetryTimer.isPending(); + boolean isReady() { + return adsStream != null && adsStream.call != null + && adsStream.call.isReady() && !adsStream.closed; } - // Must be synchronized. - boolean isReady() { - return adsStream != null && adsStream.call != null && adsStream.call.isReady(); + boolean isConnected() { + return adsStream != null && adsStream.sentInitialRequest; } /** - * Starts a timer for each requested resource that hasn't been responded to and - * has been waiting for the channel to get ready. + * Used for identifying whether or not when getting a control plane for authority that this + * control plane should be skipped over if there is a fallback. + * + *

Also used by metric to consider this control plane to not be "active". + * + *

A ControlPlaneClient is considered to be in error during the time from when an + * {@link AdsStream} closed without having received a response to the time an AdsStream does + * receive a response. */ - // Must be synchronized. - void readyHandler() { - if (!isReady()) { - return; - } - - if (isInBackoff()) { - rpcRetryTimer.cancel(); - rpcRetryTimer = null; - } - - xdsClient.startSubscriberTimersIfNeeded(serverInfo); + boolean isInError() { + return inError; } + /** - * Indicates whether there is an active ADS stream. - * - *

Return {@code true} when the {@code AdsStream} is created. - * {@code false} when the ADS stream fails without a response. Resets to true - * upon receiving the first response on a new ADS stream. + * Cleans up outstanding rpcRetryTimer if present, since we are communicating. + * If we haven't sent the initial discovery request for this RPC stream, we will delegate to + * xdsResponseHandler (in practice XdsClientImpl) to do any initialization for a new active + * stream such as starting timers. We then send the initial discovery request. */ - // Must be synchronized - boolean hasWorkingAdsStream() { - return !streamClosedNoResponse; + // Must be synchronized. + void readyHandler(boolean shouldSendInitialRequest) { + if (shouldSendInitialRequest) { + sendDiscoveryRequests(); + } } - /** * Establishes the RPC connection by creating a new RPC stream on the given channel for * xDS protocol communication. @@ -245,28 +250,51 @@ boolean hasWorkingAdsStream() { // Must be synchronized. private void startRpcStream() { checkState(adsStream == null, "Previous adsStream has not been cleared yet"); + + if (rpcRetryTimer != null) { + rpcRetryTimer.cancel(); + rpcRetryTimer = null; + } + adsStream = new AdsStream(); + adsStream.start(); logger.log(XdsLogLevel.INFO, "ADS stream started"); stopwatch.reset().start(); } + void sendDiscoveryRequests() { + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { + return; + } + + if (adsStream == null) { + startRpcStream(); + // when the stream becomes ready, it will send the discovery requests + return; + } + + if (isConnected()) { + Set> subscribedResourceTypes = + new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values()); + + for (XdsResourceType type : subscribedResourceTypes) { + adjustResourceSubscription(type); + } + } + } + @VisibleForTesting public final class RpcRetryTask implements Runnable { @Override public void run() { + logger.log(XdsLogLevel.DEBUG, "Retry timeout. Restart ADS stream {0}", logId); if (shutdown) { return; } + startRpcStream(); - Set> subscribedResourceTypes = - new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values()); - for (XdsResourceType type : subscribedResourceTypes) { - Collection resources = resourceStore.getSubscribedResources(serverInfo, type); - if (resources != null) { - adsStream.sendDiscoveryRequest(type, resources); - } - } - xdsResponseHandler.handleStreamRestarted(serverInfo); + + // handling CPC management is triggered in readyHandler } } @@ -276,8 +304,9 @@ XdsResourceType fromTypeUrl(String typeUrl) { return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); } - private class AdsStream implements EventHandler { + private class AdsStream implements XdsTransportFactory.EventHandler { private boolean responseReceived; + private boolean sentInitialRequest; private boolean closed; // Response nonce for the most recently received discovery responses of each resource type. // Client initiated requests start response nonce with empty string. @@ -293,6 +322,9 @@ private class AdsStream implements EventHandler { private AdsStream() { this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(), methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller()); + } + + void start() { call.start(this); } @@ -338,7 +370,19 @@ final void sendDiscoveryRequest(XdsResourceType type, Collection reso @Override public void onReady() { - syncContext.execute(ControlPlaneClient.this::readyHandler); + syncContext.execute(() -> { + if (!isReady()) { + logger.log(XdsLogLevel.DEBUG, + "ADS stream ready handler called, but not ready {0}", logId); + return; + } + + logger.log(XdsLogLevel.DEBUG, "ADS stream ready {0}", logId); + + boolean hadSentInitialRequest = sentInitialRequest; + sentInitialRequest = true; + readyHandler(!hadSentInitialRequest); + }); } @Override @@ -346,8 +390,13 @@ public void onRecvMessage(DiscoveryResponse response) { syncContext.execute(new Runnable() { @Override public void run() { - // Reset flag as message has been received on a stream - streamClosedNoResponse = false; + if (closed) { + return; + } + boolean isFirstResponse = !responseReceived; + responseReceived = true; + inError = false; + XdsResourceType type = fromTypeUrl(response.getTypeUrl()); if (logger.isLoggable(XdsLogLevel.DEBUG)) { logger.log( @@ -364,7 +413,7 @@ public void run() { return; } handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), - response.getNonce()); + response.getNonce(), isFirstResponse); } }); } @@ -377,17 +426,14 @@ public void onStatusReceived(final Status status) { } final void handleRpcResponse(XdsResourceType type, String versionInfo, List resources, - String nonce) { + String nonce, boolean isFirstResponse) { checkNotNull(type, "type"); - if (closed) { - return; - } - responseReceived = true; + respNonces.put(type, nonce); ProcessingTracker processingTracker = new ProcessingTracker( () -> call.startRecvMessage(), syncContext); xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce, - processingTracker); + isFirstResponse, processingTracker); processingTracker.onComplete(); } @@ -401,13 +447,16 @@ private void handleRpcStreamClosed(Status status) { // has never been initialized. retryBackoffPolicy = backoffPolicyProvider.get(); } + // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks // to avoid TSAN races, since tests may wait until callbacks are called but then would run // concurrently with the stopwatch and schedule. + long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed); - rpcRetryTimer = syncContext.schedule( - new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); + + rpcRetryTimer = + syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); Status newStatus = status; if (responseReceived) { @@ -424,9 +473,9 @@ private void handleRpcStreamClosed(Status status) { "ADS stream closed by server after a response was received"); } } else { - streamClosedNoResponse = true; // If the ADS stream is closed without ever having received a response from the server, then // the XdsClient should consider that a connectivity error (see gRFC A57). + inError = true; if (status.isOk()) { newStatus = Status.UNAVAILABLE.withDescription( "ADS stream closed with OK before receiving a response"); @@ -437,10 +486,8 @@ private void handleRpcStreamClosed(Status status) { } closed = true; - xdsResponseHandler.handleStreamClosed(newStatus); + xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived); cleanUp(); - - logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos); } private void close(Exception error) { @@ -458,4 +505,55 @@ private void cleanUp() { } } } + + @VisibleForTesting + static class FailingXdsTransport implements XdsTransport { + Status error; + + public FailingXdsTransport(Status error) { + this.error = error; + } + + @Override + public StreamingCall + createStreamingCall(String fullMethodName, + MethodDescriptor.Marshaller reqMarshaller, + MethodDescriptor.Marshaller respMarshaller) { + return new FailingXdsStreamingCall<>(); + } + + @Override + public void shutdown() { + // no-op + } + + private class FailingXdsStreamingCall implements StreamingCall { + + @Override + public void start(XdsTransportFactory.EventHandler eventHandler) { + eventHandler.onStatusReceived(error); + } + + @Override + public void sendMessage(ReqT message) { + // no-op + } + + @Override + public void startRecvMessage() { + // no-op + } + + @Override + public void sendError(Exception e) { + // no-op + } + + @Override + public boolean isReady() { + return false; + } + } + } + } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index 06f15005c22..36f8bd591c7 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -306,14 +306,6 @@ public Object getSecurityConfig() { throw new UnsupportedOperationException(); } - /** - * For all subscriber's for the specified server, if the resource hasn't yet been - * resolved then start a timer for it. - */ - protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) { - throw new UnsupportedOperationException(); - } - /** * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as * they are at the moment of the call. @@ -428,30 +420,39 @@ interface XdsResponseHandler { /** Called when a xds response is received. */ void handleResourceResponse( XdsResourceType resourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce, ProcessingTracker processingTracker); + List resources, String nonce, boolean isFirstResponse, + ProcessingTracker processingTracker); /** Called when the ADS stream is closed passively. */ // Must be synchronized. - void handleStreamClosed(Status error); - - /** Called when the ADS stream has been recreated. */ - // Must be synchronized. - void handleStreamRestarted(ServerInfo serverInfo); + void handleStreamClosed(Status error, boolean shouldTryFallback); } public interface ResourceStore { + /** - * Returns the collection of resources currently subscribing to or {@code null} if not - * subscribing to any resources for the given type. + * Returns the collection of resources currently subscribed to which have an authority matching + * one of those for which the ControlPlaneClient associated with the specified ServerInfo is + * the active one, or {@code null} if no such resources are currently subscribed to. * *

Note an empty collection indicates subscribing to resources of the given type with * wildcard mode. + * + * @param serverInfo the xds server to get the resources from + * @param type the type of the resources that should be retrieved */ // Must be synchronized. @Nullable - Collection getSubscribedResources(ServerInfo serverInfo, - XdsResourceType type); + Collection getSubscribedResources( + ServerInfo serverInfo, XdsResourceType type); Map> getSubscribedResourceTypesWithTypeUrl(); + + /** + * For any of the subscribers to one of the specified resources, if there isn't a result or + * an existing timer for the resource, start a timer for the resource. + */ + void startMissingResourceTimers(Collection resourceNames, + XdsResourceType resourceType); } } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 529ac2747df..0368e72efc0 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -26,8 +26,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; @@ -42,10 +42,13 @@ import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.XdsClient.ResourceStore; import io.grpc.xds.client.XdsLogger.XdsLogLevel; +import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -54,6 +57,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -74,21 +78,25 @@ public void uncaughtException(Thread t, Throwable e) { XdsLogLevel.ERROR, "Uncaught exception in XdsClient SynchronizationContext. Panic!", e); - // TODO(chengyuanzhang): better error handling. + // TODO: better error handling. throw new AssertionError(e); } }); - private final Map loadStatsManagerMap = - new HashMap<>(); - final Map serverLrsClientMap = - new HashMap<>(); - + private final Map loadStatsManagerMap = new HashMap<>(); + final Map serverLrsClientMap = new HashMap<>(); + /** Map of authority to its activated control plane client (affected by xds fallback). + * The last entry in the list for each value is the "active" CPC for the matching key */ + private final Map> activatedCpClients = new HashMap<>(); private final Map serverCpClientMap = new HashMap<>(); + + /** Maps resource type to the corresponding map of subscribers (keyed by resource name). */ private final Map, Map>> resourceSubscribers = new HashMap<>(); + /** Maps typeUrl to the corresponding XdsResourceType. */ private final Map> subscribedResourceTypeUrls = new HashMap<>(); + private final XdsTransportFactory xdsTransportFactory; private final Bootstrapper.BootstrapInfo bootstrapInfo; private final ScheduledExecutorService timeService; @@ -126,48 +134,6 @@ public XdsClientImpl( logger.log(XdsLogLevel.INFO, "Created"); } - private void handleResourceResponse( - XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce, ProcessingTracker processingTracker) { - checkNotNull(xdsResourceType, "xdsResourceType"); - syncContext.throwIfNotInThisSynchronizationContext(); - Set toParseResourceNames = - xdsResourceType.shouldRetrieveResourceKeysForArgs() - ? getResourceKeys(xdsResourceType) - : null; - XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, - bootstrapInfo, securityConfig, toParseResourceNames); - handleResourceUpdate(args, resources, xdsResourceType, processingTracker); - } - - private void handleStreamClosed(Status error, ServerInfo serverInfo) { - syncContext.throwIfNotInThisSynchronizationContext(); - cleanUpResourceTimers(); - if (!error.isOk()) { - metricReporter.reportServerFailure(1L, serverInfo.target()); - for (Map> subscriberMap : - resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (!subscriber.hasResult()) { - subscriber.onError(error, null); - } - } - } - } - } - - private void handleStreamRestarted(ServerInfo serverInfo) { - syncContext.throwIfNotInThisSynchronizationContext(); - for (Map> subscriberMap : - resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (subscriber.serverInfo.equals(serverInfo)) { - subscriber.restartTimer(); - } - } - } - } - @Override public void shutdown() { syncContext.execute( @@ -184,7 +150,8 @@ public void run() { for (final LoadReportClient lrsClient : serverLrsClientMap.values()) { lrsClient.stopLoadReporting(); } - cleanUpResourceTimers(); + cleanUpResourceTimers(null); + activatedCpClients.clear(); } }); } @@ -199,20 +166,53 @@ public Map> getSubscribedResourceTypesWithTypeUrl() { return Collections.unmodifiableMap(subscribedResourceTypeUrls); } + private ControlPlaneClient getActiveCpc(String authority) { + List controlPlaneClients = activatedCpClients.get(authority); + if (controlPlaneClients == null || controlPlaneClients.isEmpty()) { + return null; + } + + return controlPlaneClients.get(controlPlaneClients.size() - 1); + } + @Nullable @Override - public Collection getSubscribedResources(ServerInfo serverInfo, - XdsResourceType type) { + public Collection getSubscribedResources( + ServerInfo serverInfo, XdsResourceType type) { + ControlPlaneClient targetCpc = serverCpClientMap.get(serverInfo); + if (targetCpc == null) { + return null; + } + + // This should include all of the authorities that targetCpc or a fallback from it is serving + List authorities = activatedCpClients.entrySet().stream() + .filter(entry -> entry.getValue().contains(targetCpc)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + Map> resources = resourceSubscribers.getOrDefault(type, Collections.emptyMap()); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String key : resources.keySet()) { - if (resources.get(key).serverInfo.equals(serverInfo)) { - builder.add(key); + + Collection retVal = resources.entrySet().stream() + .filter(entry -> authorities.contains(entry.getValue().authority)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + return retVal.isEmpty() ? null : retVal; + } + + @Override + public void startMissingResourceTimers(Collection resourceNames, + XdsResourceType resourceType) { + Map> subscriberMap = + resourceSubscribers.get(resourceType); + + for (String resourceName : resourceNames) { + ResourceSubscriber subscriber = subscriberMap.get(resourceName); + if (subscriber.respTimer == null && !subscriber.hasResult()) { + subscriber.restartTimer(); } } - Collection retVal = builder.build(); - return retVal.isEmpty() ? null : retVal; } // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic. @@ -228,7 +228,7 @@ public void run() { // A map from a "resource type" to a map ("resource name": "resource metadata") ImmutableMap.Builder, Map> metadataSnapshot = ImmutableMap.builder(); - for (XdsResourceType resourceType: resourceSubscribers.keySet()) { + for (XdsResourceType resourceType : resourceSubscribers.keySet()) { ImmutableMap.Builder metadataMap = ImmutableMap.builder(); for (Map.Entry> resourceEntry : resourceSubscribers.get(resourceType).entrySet()) { @@ -249,9 +249,9 @@ public Object getSecurityConfig() { @Override public void watchXdsResource(XdsResourceType type, - String resourceName, - ResourceWatcher watcher, - Executor watcherExecutor) { + String resourceName, + ResourceWatcher watcher, + Executor watcherExecutor) { syncContext.execute(new Runnable() { @Override @SuppressWarnings("unchecked") @@ -262,36 +262,125 @@ public void run() { } ResourceSubscriber subscriber = (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); + if (subscriber == null) { logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName); subscriber = new ResourceSubscriber<>(type, resourceName); resourceSubscribers.get(type).put(resourceName, subscriber); - if (subscriber.controlPlaneClient != null) { - subscriber.controlPlaneClient.adjustResourceSubscription(type); + + if (subscriber.errorDescription == null) { + CpcWithFallbackState cpcToUse = manageControlPlaneClient(subscriber); + if (cpcToUse.cpc != null) { + cpcToUse.cpc.adjustResourceSubscription(type); + } } } + subscriber.addWatcher(watcher, watcherExecutor); } }); } + /** + * Gets a ControlPlaneClient for the subscriber's authority, creating one if necessary. + * If there already was an active CPC for this authority, and it is different from the one + * identified, then do fallback to the identified one (cpcToUse). + * + * @return identified CPC or {@code null} (if there are no valid ServerInfos associated with the + * subscriber's authority or CPC's for all are in backoff), and whether did a fallback. + */ + @VisibleForTesting + private CpcWithFallbackState manageControlPlaneClient( + ResourceSubscriber subscriber) { + + ControlPlaneClient cpcToUse; + boolean didFallback = false; + try { + cpcToUse = getOrCreateControlPlaneClient(subscriber.authority); + } catch (IllegalArgumentException e) { + if (subscriber.errorDescription == null) { + subscriber.errorDescription = "Bad configuration: " + e.getMessage(); + } + + subscriber.onError( + Status.INVALID_ARGUMENT.withDescription(subscriber.errorDescription), null); + return new CpcWithFallbackState(null, false); + } catch (IOException e) { + logger.log(XdsLogLevel.DEBUG, + "Could not create a control plane client for authority {0}: {1}", + subscriber.authority, e.getMessage()); + return new CpcWithFallbackState(null, false); + } + + ControlPlaneClient activeCpClient = getActiveCpc(subscriber.authority); + if (cpcToUse != activeCpClient) { + addCpcToAuthority(subscriber.authority, cpcToUse); // makes it active + if (activeCpClient != null) { + didFallback = cpcToUse != null && !cpcToUse.isInError(); + if (didFallback) { + logger.log(XdsLogLevel.INFO, "Falling back to XDS server {0}", + cpcToUse.getServerInfo().target()); + } else { + logger.log(XdsLogLevel.WARNING, "No working fallback XDS Servers found from {0}", + activeCpClient.getServerInfo().target()); + } + } + } + + return new CpcWithFallbackState(cpcToUse, didFallback); + } + + private void addCpcToAuthority(String authority, ControlPlaneClient cpcToUse) { + List controlPlaneClients = + activatedCpClients.computeIfAbsent(authority, k -> new ArrayList<>()); + + if (controlPlaneClients.contains(cpcToUse)) { + return; + } + + // if there are any missing CPCs between the last one and cpcToUse, add them + add cpcToUse + ImmutableList serverInfos = getServerInfos(authority); + for (int i = controlPlaneClients.size(); i < serverInfos.size(); i++) { + ServerInfo serverInfo = serverInfos.get(i); + ControlPlaneClient cpc = serverCpClientMap.get(serverInfo); + controlPlaneClients.add(cpc); + logger.log(XdsLogLevel.DEBUG, "Adding control plane client {0} to authority {1}", + cpc, authority); + cpcToUse.sendDiscoveryRequests(); + if (cpc == cpcToUse) { + break; + } + } + } + @Override public void cancelXdsResourceWatch(XdsResourceType type, - String resourceName, - ResourceWatcher watcher) { + String resourceName, + ResourceWatcher watcher) { syncContext.execute(new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { ResourceSubscriber subscriber = (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); + if (subscriber == null) { + logger.log(XdsLogLevel.WARNING, "double cancel of resource watch for {0}:{1}", + type.typeName(), resourceName); + return; + } subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.cancelResourceWatch(); resourceSubscribers.get(type).remove(resourceName); - if (subscriber.controlPlaneClient != null) { - subscriber.controlPlaneClient.adjustResourceSubscription(type); + + List controlPlaneClients = + activatedCpClients.get(subscriber.authority); + if (controlPlaneClients != null) { + controlPlaneClients.forEach((cpc) -> { + cpc.adjustResourceSubscription(type); + }); } + if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); subscribedResourceTypeUrls.remove(type.typeUrl()); @@ -344,30 +433,6 @@ public String toString() { return logId.toString(); } - @Override - protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) { - if (isShutDown()) { - return; - } - - syncContext.execute(new Runnable() { - @Override - public void run() { - if (isShutDown()) { - return; - } - - for (Map> subscriberMap : resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) { - subscriber.restartTimer(); - } - } - } - } - }); - } - private Set getResourceKeys(XdsResourceType xdsResourceType) { if (!resourceSubscribers.containsKey(xdsResourceType)) { return null; @@ -376,53 +441,74 @@ private Set getResourceKeys(XdsResourceType xdsResourceType) { return resourceSubscribers.get(xdsResourceType).keySet(); } - private void cleanUpResourceTimers() { + // cpcForThisStream is null when doing shutdown + private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) { + Collection authoritiesForCpc = getActiveAuthorities(cpcForThisStream); + for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { - subscriber.stopTimer(); + if (cpcForThisStream == null || authoritiesForCpc.contains(subscriber.authority)) { + subscriber.stopTimer(); + } + } + } + } + + private ControlPlaneClient getOrCreateControlPlaneClient(String authority) throws IOException { + // Optimize for the common case of a working ads stream already exists for the authority + ControlPlaneClient activeCpc = getActiveCpc(authority); + if (activeCpc != null && !activeCpc.isInError()) { + return activeCpc; + } + + ImmutableList serverInfos = getServerInfos(authority); + if (serverInfos == null) { + throw new IllegalArgumentException("No xds servers found for authority " + authority); + } + + for (ServerInfo serverInfo : serverInfos) { + ControlPlaneClient cpc = getOrCreateControlPlaneClient(serverInfo); + if (cpc.isInError()) { + continue; } + return cpc; } + + // Everything existed and is in backoff so throw + throw new IOException("All xds transports for authority " + authority + " are in backoff"); } - public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) { + private ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) { syncContext.throwIfNotInThisSynchronizationContext(); if (serverCpClientMap.containsKey(serverInfo)) { return serverCpClientMap.get(serverInfo); } - XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo); + logger.log(XdsLogLevel.DEBUG, "Creating control plane client for {0}", serverInfo.target()); + XdsTransportFactory.XdsTransport xdsTransport; + try { + xdsTransport = xdsTransportFactory.create(serverInfo); + } catch (Exception e) { + String msg = String.format("Failed to create xds transport for %s: %s", + serverInfo.target(), e.getMessage()); + logger.log(XdsLogLevel.WARNING, msg); + xdsTransport = + new ControlPlaneClient.FailingXdsTransport(Status.UNAVAILABLE.withDescription(msg)); + } + ControlPlaneClient controlPlaneClient = new ControlPlaneClient( xdsTransport, serverInfo, bootstrapInfo.node(), - new XdsResponseHandler() { - - @Override - public void handleResourceResponse( - XdsResourceType resourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce, ProcessingTracker processingTracker) { - XdsClientImpl.this.handleResourceResponse(resourceType, serverInfo, versionInfo, - resources, nonce, - processingTracker); - } - - @Override - public void handleStreamClosed(Status error) { - XdsClientImpl.this.handleStreamClosed(error, serverInfo); - } - - @Override - public void handleStreamRestarted(ServerInfo serverInfo) { - XdsClientImpl.this.handleStreamRestarted(serverInfo); - } - }, + new ResponseHandler(serverInfo), this, timeService, syncContext, backoffPolicyProvider, stopwatchSupplier, - this, - messagePrinter); + messagePrinter + ); + serverCpClientMap.put(serverInfo, controlPlaneClient); LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier); @@ -441,32 +527,48 @@ public Map getServerLrsClientMap() { return ImmutableMap.copyOf(serverLrsClientMap); } - @Nullable - private ServerInfo getServerInfo(String resource) { + private String getAuthority(String resource) { + String authority; if (resource.startsWith(XDSTP_SCHEME)) { URI uri = URI.create(resource); - String authority = uri.getAuthority(); + authority = uri.getAuthority(); if (authority == null) { authority = ""; } + } else { + authority = null; + } + + return authority; + } + + @Nullable + private ImmutableList getServerInfos(String authority) { + if (authority != null) { AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority); if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) { return null; } - return authorityInfo.xdsServers().get(0); + return authorityInfo.xdsServers(); } else { - return bootstrapInfo.servers().get(0); // use first server + return bootstrapInfo.servers(); } } @SuppressWarnings("unchecked") private void handleResourceUpdate( XdsResourceType.Args args, List resources, XdsResourceType xdsResourceType, - ProcessingTracker processingTracker) { + boolean isFirstResponse, ProcessingTracker processingTracker) { + ControlPlaneClient controlPlaneClient = serverCpClientMap.get(args.serverInfo); + + if (isFirstResponse) { + shutdownLowerPriorityCpcs(controlPlaneClient); + } + ValidatedResourceUpdate result = xdsResourceType.parse(args, resources); logger.log(XdsLogger.XdsLogLevel.INFO, "Received {0} Response version {1} nonce {2}. Parsed resources: {3}", - xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); + xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); Map> parsedResources = result.parsedResources; Set invalidResources = result.invalidResources; metricReporter.reportResourceUpdates(Long.valueOf(parsedResources.size()), @@ -477,14 +579,14 @@ private void handleResourceUpdate( String errorDetail = null; if (errors.isEmpty()) { checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); - serverCpClientMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo, + controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce); } else { errorDetail = Joiner.on('\n').join(errors); logger.log(XdsLogLevel.WARNING, "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail); - serverCpClientMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail); + controlPlaneClient.nackResponse(xdsResourceType, args.nonce, errorDetail); } long updateTime = timeProvider.currentTimeNanos(); @@ -523,12 +625,53 @@ private void handleResourceUpdate( // For State of the World services, notify watchers when their watched resource is missing // from the ADS update. Note that we can only do this if the resource update is coming from // the same xDS server that the ResourceSubscriber is subscribed to. - if (subscriber.serverInfo.equals(args.serverInfo)) { - subscriber.onAbsent(processingTracker); + if (getActiveCpc(subscriber.authority) == controlPlaneClient) { + subscriber.onAbsent(processingTracker, args.serverInfo); } } } + @Override + public Future reportServerConnections(ServerConnectionCallback callback) { + SettableFuture future = SettableFuture.create(); + syncContext.execute(() -> { + serverCpClientMap.forEach((serverInfo, controlPlaneClient) -> + callback.reportServerConnectionGauge( + !controlPlaneClient.isInError(), serverInfo.target())); + future.set(null); + }); + return future; + } + + private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) { + // For each authority, remove any control plane clients, with lower priority than the activated + // one, from activatedCpClients storing them all in cpcsToShutdown. + Set cpcsToShutdown = new HashSet<>(); + for ( List cpcsForAuth : activatedCpClients.values()) { + if (cpcsForAuth == null) { + continue; + } + int index = cpcsForAuth.indexOf(activatedCpc); + if (index > -1) { + cpcsToShutdown.addAll(cpcsForAuth.subList(index + 1, cpcsForAuth.size())); + cpcsForAuth.subList(index + 1, cpcsForAuth.size()).clear(); // remove lower priority cpcs + } + } + + // Shutdown any lower priority control plane clients identified above that aren't still being + // used by another authority. If they are still being used let the XDS server know that we + // no longer are interested in subscriptions for authorities we are no longer responsible for. + for (ControlPlaneClient cpc : cpcsToShutdown) { + if (activatedCpClients.values().stream().noneMatch(list -> list.contains(cpc))) { + cpc.shutdown(); + serverCpClientMap.remove(cpc.getServerInfo()); + } else { + cpc.sendDiscoveryRequests(); + } + } + } + + @Override public Future reportServerConnections(ServerConnectionCallback callback) { SettableFuture future = SettableFuture.create(); @@ -543,50 +686,52 @@ public Future reportServerConnections(ServerConnectionCallback callback) { /** Tracks a single subscribed resource. */ private final class ResourceSubscriber { - @Nullable private final ServerInfo serverInfo; - @Nullable private final ControlPlaneClient controlPlaneClient; + @Nullable + private final String authority; private final XdsResourceType type; private final String resource; private final Map, Executor> watchers = new HashMap<>(); - @Nullable private T data; + @Nullable + private T data; private boolean absent; // Tracks whether the deletion has been ignored per bootstrap server feature. // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md private boolean resourceDeletionIgnored; - @Nullable private ScheduledHandle respTimer; - @Nullable private ResourceMetadata metadata; - @Nullable private String errorDescription; + @Nullable + private ScheduledHandle respTimer; + @Nullable + private ResourceMetadata metadata; + @Nullable + private String errorDescription; ResourceSubscriber(XdsResourceType type, String resource) { syncContext.throwIfNotInThisSynchronizationContext(); this.type = type; this.resource = resource; - this.serverInfo = getServerInfo(resource); - if (serverInfo == null) { + this.authority = getAuthority(resource); + if (getServerInfos(authority) == null) { this.errorDescription = "Wrong configuration: xds server does not exist for resource " + resource; - this.controlPlaneClient = null; return; } + // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); + } - ControlPlaneClient controlPlaneClient = null; - try { - controlPlaneClient = getOrCreateControlPlaneClient(serverInfo); - if (controlPlaneClient.isInBackoff()) { - return; - } - } catch (IllegalArgumentException e) { - controlPlaneClient = null; - this.errorDescription = "Bad configuration: " + e.getMessage(); - return; - } finally { - this.controlPlaneClient = controlPlaneClient; - } - - restartTimer(); + @Override + public String toString() { + return "ResourceSubscriber{" + + "resource='" + resource + '\'' + + ", authority='" + authority + '\'' + + ", type=" + type + + ", watchers=" + watchers.size() + + ", data=" + data + + ", absent=" + absent + + ", resourceDeletionIgnored=" + resourceDeletionIgnored + + ", errorDescription='" + errorDescription + '\'' + + '}'; } void addWatcher(ResourceWatcher watcher, Executor watcherExecutor) { @@ -607,7 +752,7 @@ void addWatcher(ResourceWatcher watcher, Executor watcherExecutor) { }); } - void removeWatcher(ResourceWatcher watcher) { + void removeWatcher(ResourceWatcher watcher) { checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher); watchers.remove(watcher); } @@ -616,7 +761,9 @@ void restartTimer() { if (data != null || absent) { // resource already resolved return; } - if (!controlPlaneClient.isReady()) { // When client becomes ready, it triggers a restartTimer + ControlPlaneClient activeCpc = getActiveCpc(authority); + if (activeCpc == null || !activeCpc.isReady()) { + // When client becomes ready, it triggers a restartTimer for all relevant subscribers. return; } @@ -626,7 +773,7 @@ public void run() { logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); respTimer = null; - onAbsent(null); + onAbsent(null, activeCpc.getServerInfo()); } @Override @@ -638,6 +785,9 @@ public String toString() { // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED. metadata = ResourceMetadata.newResourceMetadataRequested(); + if (respTimer != null) { + respTimer.cancel(); + } respTimer = syncContext.schedule( new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); @@ -661,8 +811,7 @@ void cancelResourceWatch() { message += " for which we previously ignored a deletion"; logLevel = XdsLogLevel.FORCE_INFO; } - logger.log(logLevel, message, type, resource, - serverInfo != null ? serverInfo.target() : "unknown"); + logger.log(logLevel, message, type, resource, getTarget()); } boolean isWatched() { @@ -687,7 +836,7 @@ void onData(ParsedResource parsedResource, String version, long updateTime, if (resourceDeletionIgnored) { logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " + "of resource for which we previously ignored a deletion: type {1} name {2}", - serverInfo != null ? serverInfo.target() : "unknown", type, resource); + getTarget(), type, resource); resourceDeletionIgnored = false; } if (!Objects.equals(oldData, data)) { @@ -704,15 +853,21 @@ void onData(ParsedResource parsedResource, String version, long updateTime, } } - void onAbsent(@Nullable ProcessingTracker processingTracker) { + private String getTarget() { + ControlPlaneClient activeCpc = getActiveCpc(authority); + return (activeCpc != null) + ? activeCpc.getServerInfo().target() + : "unknown"; + } + + void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverInfo) { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } // Ignore deletion of State of the World resources when this feature is on, // and the resource is reusable. - boolean ignoreResourceDeletionEnabled = - serverInfo != null && serverInfo.ignoreResourceDeletion(); + boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion(); if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { if (!resourceDeletionIgnored) { logger.log(XdsLogLevel.FORCE_WARNING, @@ -785,4 +940,89 @@ private void notifyWatcher(ResourceWatcher watcher, T update) { } } + private class ResponseHandler implements XdsResponseHandler { + final ServerInfo serverInfo; + + ResponseHandler(ServerInfo serverInfo) { + this.serverInfo = serverInfo; + } + + @Override + public void handleResourceResponse( + XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, + List resources, String nonce, boolean isFirstResponse, + ProcessingTracker processingTracker) { + checkNotNull(xdsResourceType, "xdsResourceType"); + syncContext.throwIfNotInThisSynchronizationContext(); + Set toParseResourceNames = + xdsResourceType.shouldRetrieveResourceKeysForArgs() + ? getResourceKeys(xdsResourceType) + : null; + XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, + bootstrapInfo, securityConfig, toParseResourceNames); + handleResourceUpdate(args, resources, xdsResourceType, isFirstResponse, processingTracker); + } + + @Override + public void handleStreamClosed(Status status, boolean shouldTryFallback) { + syncContext.throwIfNotInThisSynchronizationContext(); + + ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo); + if (cpcClosed == null) { + return; + } + + cleanUpResourceTimers(cpcClosed); + + if (status.isOk()) { + return; // Not considered an error + } + + metricReporter.reportServerFailure(1L, serverInfo.target()); + + Collection authoritiesForClosedCpc = getActiveAuthorities(cpcClosed); + for (Map> subscriberMap : + resourceSubscribers.values()) { + for (ResourceSubscriber subscriber : subscriberMap.values()) { + if (subscriber.hasResult() || !authoritiesForClosedCpc.contains(subscriber.authority)) { + continue; + } + + // try to fallback to lower priority control plane client + if (shouldTryFallback && manageControlPlaneClient(subscriber).didFallback) { + authoritiesForClosedCpc.remove(subscriber.authority); + if (authoritiesForClosedCpc.isEmpty()) { + return; // optimization: no need to continue once all authorities have done fallback + } + continue; // since we did fallback, don't consider it an error + } + + subscriber.onError(status, null); + } + } + } + + } + + private static class CpcWithFallbackState { + ControlPlaneClient cpc; + boolean didFallback; + + private CpcWithFallbackState(ControlPlaneClient cpc, boolean didFallback) { + this.cpc = cpc; + this.didFallback = didFallback; + } + } + + private Collection getActiveAuthorities(ControlPlaneClient cpc) { + List asList = activatedCpClients.entrySet().stream() + .filter(entry -> !entry.getValue().isEmpty() + && cpc == entry.getValue().get(entry.getValue().size() - 1)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + // Since this is usually used for contains, use a set when the list is large + return (asList.size() < 100) ? asList : new HashSet<>(asList); + } + } diff --git a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java index 69fde29a0a9..ac1c4829c74 100644 --- a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java +++ b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java @@ -57,6 +57,7 @@ import io.grpc.InsecureServerCredentials; import io.grpc.NameResolverRegistry; import io.grpc.Server; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -88,9 +89,11 @@ public class ControlPlaneRule extends TestWatcher { private XdsTestControlPlaneService controlPlaneService; private XdsTestLoadReportingService loadReportingService; private XdsNameResolverProvider nameResolverProvider; + private int port; // Only change from 0 to actual port used in the server. public ControlPlaneRule() { serverHostName = "test-server"; + this.port = 0; } public ControlPlaneRule setServerHostName(String serverHostName) { @@ -117,11 +120,7 @@ public Server getServer() { try { controlPlaneService = new XdsTestControlPlaneService(); loadReportingService = new XdsTestLoadReportingService(); - server = Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()) - .addService(controlPlaneService) - .addService(loadReportingService) - .build() - .start(); + createAndStartXdsServer(); } catch (Exception e) { throw new AssertionError("unable to start the control plane server", e); } @@ -146,6 +145,42 @@ public Server getServer() { NameResolverRegistry.getDefaultRegistry().deregister(nameResolverProvider); } + /** + * Will shutdown existing server if needed. + * Then creates a new server in the same way as {@link #starting(Description)} and starts it. + */ + public void restartXdsServer() { + + if (getServer() != null && !getServer().isTerminated()) { + getServer().shutdownNow(); + try { + if (!getServer().awaitTermination(5, TimeUnit.SECONDS)) { + logger.log(Level.SEVERE, "Timed out waiting for server shutdown"); + } + } catch (InterruptedException e) { + throw new AssertionError("unable to shut down control plane server", e); + } + } + + try { + createAndStartXdsServer(); + } catch (Exception e) { + throw new AssertionError("unable to restart the control plane server", e); + } + } + + private void createAndStartXdsServer() throws IOException { + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(controlPlaneService) + .addService(loadReportingService) + .build() + .start(); + + if (port == 0) { + port = server.getPort(); + } + } + /** * For test purpose, use boostrapOverride to programmatically provide bootstrap info. */ @@ -175,46 +210,69 @@ void setLdsConfig(Listener serverListener, Listener clientListener) { } void setRdsConfig(RouteConfiguration routeConfiguration) { - getService().setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(RDS_NAME, routeConfiguration)); + setRdsConfig(RDS_NAME, routeConfiguration); + } + + public void setRdsConfig(String rdsName, RouteConfiguration routeConfiguration) { + getService().setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(rdsName, routeConfiguration)); } void setCdsConfig(Cluster cluster) { + setCdsConfig(CLUSTER_NAME, cluster); + } + + void setCdsConfig(String clusterName, Cluster cluster) { getService().setXdsConfig(ADS_TYPE_URL_CDS, - ImmutableMap.of(CLUSTER_NAME, cluster)); + ImmutableMap.of(clusterName, cluster)); } void setEdsConfig(ClusterLoadAssignment clusterLoadAssignment) { + setEdsConfig(EDS_NAME, clusterLoadAssignment); + } + + void setEdsConfig(String edsName, ClusterLoadAssignment clusterLoadAssignment) { getService().setXdsConfig(ADS_TYPE_URL_EDS, - ImmutableMap.of(EDS_NAME, clusterLoadAssignment)); + ImmutableMap.of(edsName, clusterLoadAssignment)); } /** * Builds a new default RDS configuration. */ static RouteConfiguration buildRouteConfiguration(String authority) { - io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHost = VirtualHost.newBuilder() + return buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME); + } + + static RouteConfiguration buildRouteConfiguration(String authority, String rdsName, + String clusterName) { + VirtualHost.Builder vhBuilder = VirtualHost.newBuilder() + .setName(rdsName) .addDomains(authority) .addRoutes( Route.newBuilder() .setMatch( RouteMatch.newBuilder().setPrefix("/").build()) .setRoute( - RouteAction.newBuilder().setCluster(CLUSTER_NAME) + RouteAction.newBuilder().setCluster(clusterName) .setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build()) - .build()).build()).build(); - return RouteConfiguration.newBuilder().setName(RDS_NAME).addVirtualHosts(virtualHost).build(); + .build())); + VirtualHost virtualHost = vhBuilder.build(); + return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build(); } /** * Builds a new default CDS configuration. */ static Cluster buildCluster() { + return buildCluster(CLUSTER_NAME, EDS_NAME); + } + + static Cluster buildCluster(String clusterName, String edsName) { return Cluster.newBuilder() - .setName(CLUSTER_NAME) + .setName(clusterName) .setType(Cluster.DiscoveryType.EDS) .setEdsClusterConfig( Cluster.EdsClusterConfig.newBuilder() - .setServiceName(EDS_NAME) + .setServiceName(edsName) .setEdsConfig( ConfigSource.newBuilder() .setAds(AggregatedConfigSource.newBuilder().build()) @@ -228,7 +286,13 @@ static Cluster buildCluster() { * Builds a new default EDS configuration. */ static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, String endpointHostname, - int port) { + int port) { + return buildClusterLoadAssignment(hostName, endpointHostname, port, EDS_NAME); + } + + static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, String endpointHostname, + int port, String edsName) { + Address address = Address.newBuilder() .setSocketAddress( SocketAddress.newBuilder().setAddress(hostName).setPortValue(port).build()).build(); @@ -243,7 +307,7 @@ static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, String .setHealthStatus(HealthStatus.HEALTHY) .build()).build(); return ClusterLoadAssignment.newBuilder() - .setClusterName(EDS_NAME) + .setClusterName(edsName) .addEndpoints(endpoints) .build(); } @@ -252,8 +316,17 @@ static ClusterLoadAssignment buildClusterLoadAssignment(String hostName, String * Builds a new client listener. */ static Listener buildClientListener(String name) { + return buildClientListener(name, "terminal-filter"); + } + + + static Listener buildClientListener(String name, String identifier) { + return buildClientListener(name, identifier, RDS_NAME); + } + + static Listener buildClientListener(String name, String identifier, String rdsName) { HttpFilter httpFilter = HttpFilter.newBuilder() - .setName("terminal-filter") + .setName(identifier) .setTypedConfig(Any.pack(Router.newBuilder().build())) .setIsOptional(true) .build(); @@ -262,7 +335,7 @@ static Listener buildClientListener(String name) { .HttpConnectionManager.newBuilder() .setRds( Rds.newBuilder() - .setRouteConfigName(RDS_NAME) + .setRouteConfigName(rdsName) .setConfigSource( ConfigSource.newBuilder() .setAds(AggregatedConfigSource.getDefaultInstance()))) diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index 7c6821dc560..df0687f6706 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -108,7 +108,7 @@ public void setUp() { // because true->false return mutation prevents fetchClientStatus from completing the request. csdsStub = ClientStatusDiscoveryServiceGrpc .newBlockingStub(grpcServerRule.getChannel()) - .withDeadline(Deadline.after(3, TimeUnit.SECONDS)); + .withDeadline(Deadline.after(30, TimeUnit.SECONDS)); csdsAsyncStub = ClientStatusDiscoveryServiceGrpc.newStub(grpcServerRule.getChannel()); } @@ -498,11 +498,17 @@ public BootstrapInfo getBootstrapInfo() { @Nullable @Override - public Collection getSubscribedResources(ServerInfo serverInfo, - XdsResourceType type) { + public Collection getSubscribedResources( + ServerInfo serverInfo, XdsResourceType type) { return null; } + @Override + public void startMissingResourceTimers(Collection resourceNames, + XdsResourceType resourceType) { + // do nothing + } + @Override public Map> getSubscribedResourceTypesWithTypeUrl() { return ImmutableMap.of(); @@ -511,8 +517,7 @@ public Map> getSubscribedResourceTypesWithTypeUrl() { private static class FakeXdsClientPoolFactory implements XdsClientPoolFactory { private final Map xdsClientMap = new HashMap<>(); - private boolean isOldStyle - ; + private boolean isOldStyle; private FakeXdsClientPoolFactory(@Nullable XdsClient xdsClient) { if (xdsClient != null) { diff --git a/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java b/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java index 475b6e00a07..d2a9bf3316d 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java +++ b/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java @@ -32,6 +32,7 @@ import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.BootstrapperImpl; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.EnvoyProtoData.Node; import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsInitializationException; @@ -61,10 +62,12 @@ public class GrpcBootstrapperImplTest { private String originalBootstrapPathFromSysProp; private String originalBootstrapConfigFromEnvVar; private String originalBootstrapConfigFromSysProp; + private boolean originalExperimentalXdsFallbackFlag; @Before public void setUp() { saveEnvironment(); + originalExperimentalXdsFallbackFlag = CommonBootstrapperTestUtils.setEnableXdsFallback(true); bootstrapper.bootstrapPathFromEnvVar = BOOTSTRAP_FILE_PATH; } @@ -81,6 +84,7 @@ public void restoreEnvironment() { bootstrapper.bootstrapPathFromSysProp = originalBootstrapPathFromSysProp; bootstrapper.bootstrapConfigFromEnvVar = originalBootstrapConfigFromEnvVar; bootstrapper.bootstrapConfigFromSysProp = originalBootstrapConfigFromSysProp; + CommonBootstrapperTestUtils.setEnableXdsFallback(originalExperimentalXdsFallbackFlag); } @Test diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 198faea7fdc..b326eb7d02d 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -3599,42 +3599,52 @@ public void streamClosedAndRetryWithBackoff() { call.verifyRequest(RDS, RDS_RESOURCE, "5", "6764", NODE); call.sendError(Status.DEADLINE_EXCEEDED.asException()); + fakeClock.forwardNanos(100L); + call = resourceDiscoveryCalls.poll(); + call.sendError(Status.DEADLINE_EXCEEDED.asException()); + + // Already received LDS and RDS, so they only error twice. verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); - verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); + verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); + verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); // Check metric data. callback_ReportServerConnection(); - verifyServerConnection(3, true, xdsServerInfo.target()); + verifyServerConnection(2, true, xdsServerInfo.target()); + verifyServerConnection(4, false, xdsServerInfo.target()); // Reset backoff sequence and retry after backoff. inOrder.verify(backoffPolicyProvider).get(); - inOrder.verify(backoffPolicy2).nextBackoffNanos(); + inOrder.verify(backoffPolicy2, times(2)).nextBackoffNanos(); retryTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); - assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(20L); - fakeClock.forwardNanos(20L); + fakeClock.forwardNanos(retryTask.getDelay(TimeUnit.NANOSECONDS)); call = resourceDiscoveryCalls.poll(); call.verifyRequest(LDS, LDS_RESOURCE, "63", "", NODE); call.verifyRequest(RDS, RDS_RESOURCE, "5", "", NODE); call.verifyRequest(CDS, CDS_RESOURCE, "", "", NODE); call.verifyRequest(EDS, EDS_RESOURCE, "", "", NODE); + // Check metric data, should be in error since haven't gotten a response. + callback_ReportServerConnection(); + verifyServerConnection(2, true, xdsServerInfo.target()); + verifyServerConnection(5, false, xdsServerInfo.target()); + // Management server becomes unreachable again. call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); + verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); + verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); // Check metric data. callback_ReportServerConnection(); - verifyServerConnection(4, false, xdsServerInfo.target()); + verifyServerConnection(6, false, xdsServerInfo.target()); // Retry after backoff. inOrder.verify(backoffPolicy2).nextBackoffNanos(); @@ -3650,7 +3660,12 @@ public void streamClosedAndRetryWithBackoff() { // Check metric data. callback_ReportServerConnection(); - verifyServerConnection(5, false, xdsServerInfo.target()); + verifyServerConnection(7, false, xdsServerInfo.target()); + + // Send a response so CPC is considered working + call.sendResponse(LDS, listeners, "63", "3242"); + callback_ReportServerConnection(); + verifyServerConnection(3, true, xdsServerInfo.target()); inOrder.verifyNoMoreInteractions(); } @@ -3750,6 +3765,19 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe // Check metric data. callback_ReportServerConnection(); verifyServerConnection(4, true, xdsServerInfo.target()); + verify(cdsResourceWatcher, never()).onError(errorCaptor.capture()); // We had a response + + fakeClock.forwardTime(5, TimeUnit.SECONDS); + DiscoveryRpcCall call2 = resourceDiscoveryCalls.poll(); + call2.sendError(Status.UNAVAILABLE.asException()); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(edsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + + fakeClock.forwardTime(5, TimeUnit.SECONDS); + DiscoveryRpcCall call3 = resourceDiscoveryCalls.poll(); + assertThat(call3).isNotNull(); fakeClock.forwardNanos(10L); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); @@ -3962,11 +3990,14 @@ public void sendingToStoppedServer() throws Exception { @Test public void sendToBadUrl() throws Exception { // Setup xdsClient to fail on stream creation - XdsClientImpl client = createXdsClient("some. garbage"); + String garbageUri = "some. garbage"; + XdsClientImpl client = createXdsClient(garbageUri); client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); fakeClock.forwardTime(20, TimeUnit.SECONDS); - verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any()); + verify(ldsResourceWatcher, Mockito.timeout(5000).atLeastOnce()) + .onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getDescription()).contains(garbageUri); client.shutdown(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java new file mode 100644 index 00000000000..5e32036c961 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -0,0 +1,522 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.grpc.MetricRecorder; +import io.grpc.Status; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.internal.ObjectPool; +import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; +import io.grpc.xds.client.LoadReportClient; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClientMetricReporter; +import io.grpc.xds.client.XdsInitializationException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class XdsClientFallbackTest { + private static final Logger log = Logger.getLogger(XdsClientFallbackTest.class.getName()); + + private static final String MAIN_SERVER = "main-server"; + private static final String FALLBACK_SERVER = "fallback-server"; + private static final String DUMMY_TARGET = "TEST_TARGET"; + private static final String RDS_NAME = "route-config.googleapis.com"; + private static final String FALLBACK_RDS_NAME = "fallback-" + RDS_NAME; + private static final String CLUSTER_NAME = "cluster0"; + private static final String FALLBACK_CLUSTER_NAME = "fallback-" + CLUSTER_NAME; + private static final String EDS_NAME = "eds-service-0"; + private static final String FALLBACK_EDS_NAME = "fallback-" + EDS_NAME; + private static final HttpConnectionManager MAIN_HTTP_CONNECTION_MANAGER = + HttpConnectionManager.forRdsName(0, RDS_NAME, ImmutableList.of( + new Filter.NamedFilterConfig(MAIN_SERVER, RouterFilter.ROUTER_CONFIG))); + private static final HttpConnectionManager FALLBACK_HTTP_CONNECTION_MANAGER = + HttpConnectionManager.forRdsName(0, RDS_NAME, ImmutableList.of( + new Filter.NamedFilterConfig(FALLBACK_SERVER, RouterFilter.ROUTER_CONFIG))); + private ObjectPool xdsClientPool; + private XdsClient xdsClient; + private boolean originalEnableXdsFallback; + private final FakeClock fakeClock = new FakeClock(); + private final MetricRecorder metricRecorder = new MetricRecorder() {}; + + @Mock + private XdsClientMetricReporter xdsClientMetricReporter; + + @Captor + private ArgumentCaptor errorCaptor; + + + private final XdsClient.ResourceWatcher raalLdsWatcher = + new XdsClient.ResourceWatcher() { + + @Override + public void onChanged(XdsListenerResource.LdsUpdate update) { + log.log(Level.FINE, "LDS update: " + update); + } + + @Override + public void onError(Status error) { + log.log(Level.FINE, "LDS update error: " + error.getDescription()); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + log.log(Level.FINE, "LDS resource does not exist: " + resourceName); + } + }; + + @SuppressWarnings("unchecked") + private final XdsClient.ResourceWatcher ldsWatcher = + mock(XdsClient.ResourceWatcher.class, delegatesTo(raalLdsWatcher)); + @Mock + private XdsClient.ResourceWatcher ldsWatcher2; + + @Mock + private XdsClient.ResourceWatcher rdsWatcher; + @Mock + private XdsClient.ResourceWatcher rdsWatcher2; + @Mock + private XdsClient.ResourceWatcher rdsWatcher3; + + private final XdsClient.ResourceWatcher raalCdsWatcher = + new XdsClient.ResourceWatcher() { + + @Override + public void onChanged(XdsClusterResource.CdsUpdate update) { + log.log(Level.FINE, "CDS update: " + update); + } + + @Override + public void onError(Status error) { + log.log(Level.FINE, "CDS update error: " + error.getDescription()); + } + + @Override + public void onResourceDoesNotExist(String resourceName) { + log.log(Level.FINE, "CDS resource does not exist: " + resourceName); + } + }; + + @SuppressWarnings("unchecked") + private final XdsClient.ResourceWatcher cdsWatcher = + mock(XdsClient.ResourceWatcher.class, delegatesTo(raalCdsWatcher)); + @Mock + private XdsClient.ResourceWatcher cdsWatcher2; + + @Rule(order = 0) + public ControlPlaneRule mainXdsServer = + new ControlPlaneRule().setServerHostName(MAIN_SERVER); + + @Rule(order = 1) + public ControlPlaneRule fallbackServer = + new ControlPlaneRule().setServerHostName(MAIN_SERVER); + + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + + @Before + public void setUp() throws XdsInitializationException { + originalEnableXdsFallback = CommonBootstrapperTestUtils.setEnableXdsFallback(true); + if (mainXdsServer == null) { + throw new XdsInitializationException("Failed to create ControlPlaneRule for main TD server"); + } + setAdsConfig(mainXdsServer, MAIN_SERVER); + setAdsConfig(fallbackServer, FALLBACK_SERVER); + + SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider(); + clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride()); + xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET, metricRecorder); + } + + @After + public void cleanUp() { + if (xdsClientPool != null) { + xdsClientPool.returnObject(xdsClient); + } + CommonBootstrapperTestUtils.setEnableXdsFallback(originalEnableXdsFallback); + } + + private static void setAdsConfig(ControlPlaneRule controlPlane, String serverName) { + InetSocketAddress edsInetSocketAddress = + (InetSocketAddress) controlPlane.getServer().getListenSockets().get(0); + boolean isMainServer = serverName.equals(MAIN_SERVER); + String rdsName = isMainServer + ? RDS_NAME + : FALLBACK_RDS_NAME; + String clusterName = isMainServer ? CLUSTER_NAME : FALLBACK_CLUSTER_NAME; + String edsName = isMainServer ? EDS_NAME : FALLBACK_EDS_NAME; + + controlPlane.setLdsConfig(ControlPlaneRule.buildServerListener(), + ControlPlaneRule.buildClientListener(MAIN_SERVER, serverName)); + + controlPlane.setRdsConfig(rdsName, + ControlPlaneRule.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName)); + controlPlane.setCdsConfig(clusterName, ControlPlaneRule.buildCluster(clusterName, edsName)); + + controlPlane.setEdsConfig(edsName, + ControlPlaneRule.buildClusterLoadAssignment(edsInetSocketAddress.getHostName(), + DataPlaneRule.ENDPOINT_HOST_NAME, edsInetSocketAddress.getPort(), edsName)); + log.log(Level.FINE, + String.format("Set ADS config for %s with address %s", serverName, edsInetSocketAddress)); + } + + // This is basically a control test to make sure everything is set up correctly. + @Test + public void everything_okay() { + mainXdsServer.restartXdsServer(); + fallbackServer.restartXdsServer(); + xdsClient = xdsClientPool.getObject(); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener( + MAIN_HTTP_CONNECTION_MANAGER)); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + verify(rdsWatcher, timeout(5000)).onChanged(any()); + } + + @Test + public void mainServerDown_fallbackServerUp() { + mainXdsServer.getServer().shutdownNow(); + fallbackServer.restartXdsServer(); + xdsClient = xdsClientPool.getObject(); + log.log(Level.FINE, "Fallback port = " + fallbackServer.getServer().getPort()); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener( + FALLBACK_HTTP_CONNECTION_MANAGER)); + } + + @Test + public void useBadAuthority() { + xdsClient = xdsClientPool.getObject(); + InOrder inOrder = inOrder(ldsWatcher, rdsWatcher, rdsWatcher2, rdsWatcher3); + + String badPrefix = "xdstp://authority.xds.bad/envoy.config.listener.v3.Listener/"; + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), + badPrefix + "listener.googleapis.com", ldsWatcher); + inOrder.verify(ldsWatcher, timeout(5000)).onError(any()); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + badPrefix + "route-config.googleapis.bad", rdsWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + badPrefix + "route-config2.googleapis.bad", rdsWatcher2); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + badPrefix + "route-config3.googleapis.bad", rdsWatcher3); + inOrder.verify(rdsWatcher, timeout(5000).times(1)).onError(any()); + inOrder.verify(rdsWatcher2, timeout(5000).times(1)).onError(any()); + inOrder.verify(rdsWatcher3, timeout(5000).times(1)).onError(any()); + verify(rdsWatcher, never()).onChanged(any()); + + // even after an error, a valid one will still work + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher2); + verify(ldsWatcher2, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + } + + @Test + public void both_down_restart_main() { + mainXdsServer.getServer().shutdownNow(); + fallbackServer.getServer().shutdownNow(); + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + verify(ldsWatcher, timeout(5000)).onError(any()); + verify(ldsWatcher, timeout(5000).times(0)).onChanged(any()); + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher2); + verify(rdsWatcher2, timeout(5000)).onError(any()); + + mainXdsServer.restartXdsServer(); + + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + + verify(ldsWatcher, timeout(16000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + verify(rdsWatcher, timeout(5000)).onChanged(any()); + verify(rdsWatcher2, timeout(5000)).onChanged(any()); + } + + @Test + public void mainDown_fallbackUp_restart_main() { + mainXdsServer.getServer().shutdownNow(); + fallbackServer.restartXdsServer(); + xdsClient = xdsClientPool.getObject(); + InOrder inOrder = inOrder(ldsWatcher, rdsWatcher, cdsWatcher, cdsWatcher2); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + inOrder.verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), FALLBACK_CLUSTER_NAME, cdsWatcher); + inOrder.verify(cdsWatcher, timeout(5000)).onChanged(any()); + + assertThat(fallbackServer.getService().getSubscriberCounts() + .get("type.googleapis.com/envoy.config.listener.v3.Listener")).isEqualTo(1); + verifyNoSubscribers(mainXdsServer); + + mainXdsServer.restartXdsServer(); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + inOrder.verify(rdsWatcher, timeout(5000)).onChanged(any()); + verifyNoSubscribers(fallbackServer); + + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher2); + inOrder.verify(cdsWatcher2, timeout(5000)).onChanged(any()); + + verifyNoSubscribers(fallbackServer); + assertThat(mainXdsServer.getService().getSubscriberCounts() + .get("type.googleapis.com/envoy.config.listener.v3.Listener")).isEqualTo(1); + } + + private static void verifyNoSubscribers(ControlPlaneRule rule) { + for (Map.Entry me : rule.getService().getSubscriberCounts().entrySet()) { + String type = me.getKey(); + Integer count = me.getValue(); + assertWithMessage("Type with non-zero subscribers is: %s", type) + .that(count).isEqualTo(0); + } + } + + // This test takes a long time because of the 16 sec timeout for non-existent resource + @Test + public void connect_then_mainServerDown_fallbackServerUp() throws InterruptedException { + mainXdsServer.restartXdsServer(); + fallbackServer.restartXdsServer(); + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher); + verify(rdsWatcher, timeout(5000)).onChanged(any()); + + mainXdsServer.getServer().shutdownNow(); + TimeUnit.SECONDS.sleep(5); // TODO(lsafran) Use FakeClock so test runs faster + + // Shouldn't do fallback since all watchers are loaded + verify(ldsWatcher, never()).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + + // Should just get from cache + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher2); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher2); + verify(ldsWatcher2, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + verify(ldsWatcher, never()).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + // Make sure that rdsWatcher wasn't called again + verify(rdsWatcher, times(1)).onChanged(any()); + verify(rdsWatcher2, timeout(5000)).onChanged(any()); + + // Asking for something not in cache should force a fallback + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), FALLBACK_CLUSTER_NAME, cdsWatcher); + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + verify(ldsWatcher2, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(FALLBACK_HTTP_CONNECTION_MANAGER)); + verify(cdsWatcher, timeout(16000)).onChanged(any()); + + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), FALLBACK_RDS_NAME, rdsWatcher3); + verify(rdsWatcher3, timeout(5000)).onChanged(any()); + + // Test that resource defined in main but not fallback is handled correctly + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher2); + verify(cdsWatcher2, timeout(16000)).onResourceDoesNotExist(eq(CLUSTER_NAME)); + } + + @Test + public void connect_then_mainServerRestart_fallbackServerdown() { + mainXdsServer.restartXdsServer(); + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + mainXdsServer.getServer().shutdownNow(); + fallbackServer.getServer().shutdownNow(); + + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher); + + mainXdsServer.restartXdsServer(); + + verify(cdsWatcher, timeout(5000)).onChanged(any()); + verify(ldsWatcher, timeout(5000).atLeastOnce()).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + } + + @Test + public void fallbackFromBadUrlToGoodOne() { + // Setup xdsClient to fail on stream creation + String garbageUri = "some. garbage"; + + String validUri = "localhost:" + mainXdsServer.getServer().getPort(); + XdsClientImpl client = CommonBootstrapperTestUtils.createXdsClient( + Arrays.asList(garbageUri, validUri), DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, + new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); + + client.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener( + MAIN_HTTP_CONNECTION_MANAGER)); + verify(ldsWatcher, never()).onError(any()); + + client.shutdown(); + } + + @Test + public void testGoodUrlFollowedByBadUrl() { + // Setup xdsClient to fail on stream creation + String garbageUri = "some. garbage"; + String validUri = "localhost:" + mainXdsServer.getServer().getPort(); + + XdsClientImpl client = CommonBootstrapperTestUtils.createXdsClient( + Arrays.asList(validUri, garbageUri), DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, + new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); + + client.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener( + MAIN_HTTP_CONNECTION_MANAGER)); + verify(ldsWatcher, never()).onError(any()); + + client.shutdown(); + } + + @Test + public void testTwoBadUrl() { + // Setup xdsClient to fail on stream creation + String garbageUri1 = "some. garbage"; + String garbageUri2 = "other garbage"; + + XdsClientImpl client = CommonBootstrapperTestUtils.createXdsClient( + Arrays.asList(garbageUri1, garbageUri2), DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, + new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter); + + client.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + verify(ldsWatcher, Mockito.timeout(5000).atLeastOnce()).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getDescription()).contains(garbageUri2); + verify(ldsWatcher, never()).onChanged(any()); + client.shutdown(); + } + + private Bootstrapper.ServerInfo getLrsServerInfo(String target) { + for (Map.Entry entry + : xdsClient.getServerLrsClientMap().entrySet()) { + if (entry.getKey().target().equals(target)) { + return entry.getKey(); + } + } + return null; + } + + @Test + public void used_then_mainServerRestart_fallbackServerUp() { + xdsClient = xdsClientPool.getObject(); + + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); + + verify(ldsWatcher, timeout(5000)).onChanged( + XdsListenerResource.LdsUpdate.forApiListener(MAIN_HTTP_CONNECTION_MANAGER)); + + mainXdsServer.restartXdsServer(); + + assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull(); + assertThat(getLrsServerInfo("localhost:" + mainXdsServer.getServer().getPort())).isNotNull(); + + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CLUSTER_NAME, cdsWatcher); + + verify(cdsWatcher, timeout(5000)).onChanged(any()); + assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull(); + } + + private Map defaultBootstrapOverride() { + return ImmutableMap.of( + "node", ImmutableMap.of( + "id", UUID.randomUUID().toString(), + "cluster", CLUSTER_NAME), + "xds_servers", ImmutableList.of( + ImmutableMap.of( + "server_uri", "localhost:" + mainXdsServer.getServer().getPort(), + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ), + ImmutableMap.of( + "server_uri", "localhost:" + fallbackServer.getServer().getPort(), + "channel_creds", Collections.singletonList( + ImmutableMap.of("type", "insecure") + ), + "server_features", Collections.singletonList("xds_v3") + ) + ), + "fallback-policy", "fallback" + ); + } + +} diff --git a/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java b/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java index 590b6c79a10..6915ac6c13e 100644 --- a/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java @@ -67,6 +67,7 @@ import io.grpc.xds.XdsServerTestHelper.FakeXdsClient; import io.grpc.xds.XdsServerTestHelper.FakeXdsClientPoolFactory; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.internal.Matchers.HeaderMatcher; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import io.grpc.xds.internal.security.SslContextProviderSupplier; diff --git a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java index cc12e3863ba..98f5fcbfef9 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java @@ -202,4 +202,12 @@ private DiscoveryResponse generateResponse(String resourceType, String version, } return responseBuilder.build(); } + + public Map getSubscriberCounts() { + Map subscriberCounts = new HashMap<>(); + for (String type : subscribers.keySet()) { + subscriberCounts.put(type, subscribers.get(type).size()); + } + return subscriberCounts; + } } diff --git a/xds/src/test/java/io/grpc/xds/CommonBootstrapperTestUtils.java b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java similarity index 65% rename from xds/src/test/java/io/grpc/xds/CommonBootstrapperTestUtils.java rename to xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java index 6a8eced298c..27a0d4ba1d9 100644 --- a/xds/src/test/java/io/grpc/xds/CommonBootstrapperTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/client/CommonBootstrapperTestUtils.java @@ -14,21 +14,46 @@ * limitations under the License. */ -package io.grpc.xds; +package io.grpc.xds.client; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.grpc.ChannelCredentials; +import io.grpc.InsecureChannelCredentials; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; -import io.grpc.xds.client.Bootstrapper; +import io.grpc.internal.TimeProvider; import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.EnvoyProtoData; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; +import io.grpc.xds.internal.security.TlsContextManagerImpl; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; public class CommonBootstrapperTestUtils { + private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create(); + private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com"; + private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; + + private static final long TIME_INCREMENT = TimeUnit.SECONDS.toNanos(1); + + /** Fake time provider increments time TIME_INCREMENT each call. */ + private static TimeProvider newTimeProvider() { + return new TimeProvider() { + private long count; + + @Override + public long currentTimeNanos() { + return ++count * TIME_INCREMENT; + } + }; + } + private static final String FILE_WATCHER_CONFIG = "{\"path\": \"/etc/secret/certs\"}"; private static final String MESHCA_CONFIG = "{\n" @@ -145,4 +170,57 @@ public static Bootstrapper.BootstrapInfo buildBootstrapInfo( .certProviders(certProviders) .build(); } + + public static boolean setEnableXdsFallback(boolean target) { + boolean oldValue = BootstrapperImpl.enableXdsFallback; + BootstrapperImpl.enableXdsFallback = target; + return oldValue; + } + + public static XdsClientImpl createXdsClient(List serverUris, + XdsTransportFactory xdsTransportFactory, + FakeClock fakeClock, + BackoffPolicy.Provider backoffPolicyProvider, + MessagePrettyPrinter messagePrinter, + XdsClientMetricReporter xdsClientMetricReporter) { + Bootstrapper.BootstrapInfo bootstrapInfo = buildBootStrap(serverUris); + return new XdsClientImpl( + xdsTransportFactory, + bootstrapInfo, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier(), + newTimeProvider(), + messagePrinter, + new TlsContextManagerImpl(bootstrapInfo), + xdsClientMetricReporter); + } + + public static Bootstrapper.BootstrapInfo buildBootStrap(List serverUris) { + + List serverInfos = new ArrayList<>(); + for (String uri : serverUris) { + serverInfos.add(ServerInfo.create(uri, CHANNEL_CREDENTIALS, false, true)); + } + EnvoyProtoData.Node node = EnvoyProtoData.Node.newBuilder().setId("node-id").build(); + + return Bootstrapper.BootstrapInfo.builder() + .servers(serverInfos) + .node(node) + .authorities(ImmutableMap.of( + "authority.xds.com", + Bootstrapper.AuthorityInfo.create( + "xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))), + "", + Bootstrapper.AuthorityInfo.create( + "xdstp:///envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) + .certProviders(ImmutableMap.of("cert-instance-name", + Bootstrapper.CertificateProviderInfo.create("file-watcher", ImmutableMap.of()))) + .build(); + } + } diff --git a/xds/src/test/java/io/grpc/xds/internal/security/ClientSslContextProviderFactoryTest.java b/xds/src/test/java/io/grpc/xds/internal/security/ClientSslContextProviderFactoryTest.java index 23e30883307..9c8340123da 100644 --- a/xds/src/test/java/io/grpc/xds/internal/security/ClientSslContextProviderFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/security/ClientSslContextProviderFactoryTest.java @@ -28,9 +28,9 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsCertificate; import io.envoyproxy.envoy.type.matcher.v3.StringMatcher; -import io.grpc.xds.CommonBootstrapperTestUtils; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsInitializationException; import io.grpc.xds.internal.security.certprovider.CertProviderClientSslContextProviderFactory; import io.grpc.xds.internal.security.certprovider.CertificateProvider; diff --git a/xds/src/test/java/io/grpc/xds/internal/security/SecurityProtocolNegotiatorsTest.java b/xds/src/test/java/io/grpc/xds/internal/security/SecurityProtocolNegotiatorsTest.java index fdfcd369937..955c812233a 100644 --- a/xds/src/test/java/io/grpc/xds/internal/security/SecurityProtocolNegotiatorsTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/security/SecurityProtocolNegotiatorsTest.java @@ -45,12 +45,12 @@ import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; import io.grpc.netty.InternalProtocolNegotiators; import io.grpc.netty.ProtocolNegotiationEvent; -import io.grpc.xds.CommonBootstrapperTestUtils; import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.InternalXdsAttributes; import io.grpc.xds.TlsContextManager; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.internal.security.SecurityProtocolNegotiators.ClientSecurityHandler; import io.grpc.xds.internal.security.SecurityProtocolNegotiators.ClientSecurityProtocolNegotiator; import io.grpc.xds.internal.security.certprovider.CommonCertProviderTestUtils; diff --git a/xds/src/test/java/io/grpc/xds/internal/security/ServerSslContextProviderFactoryTest.java b/xds/src/test/java/io/grpc/xds/internal/security/ServerSslContextProviderFactoryTest.java index c455385dae9..cf86b511f1f 100644 --- a/xds/src/test/java/io/grpc/xds/internal/security/ServerSslContextProviderFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/security/ServerSslContextProviderFactoryTest.java @@ -24,10 +24,10 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.envoyproxy.envoy.type.matcher.v3.StringMatcher; -import io.grpc.xds.CommonBootstrapperTestUtils; import io.grpc.xds.EnvoyServerProtoData; import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.client.XdsInitializationException; import io.grpc.xds.internal.security.certprovider.CertProviderServerSslContextProviderFactory; import io.grpc.xds.internal.security.certprovider.CertificateProvider; diff --git a/xds/src/test/java/io/grpc/xds/internal/security/TlsContextManagerTest.java b/xds/src/test/java/io/grpc/xds/internal/security/TlsContextManagerTest.java index 29d131cb8d7..035096a3528 100644 --- a/xds/src/test/java/io/grpc/xds/internal/security/TlsContextManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/security/TlsContextManagerTest.java @@ -30,10 +30,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.grpc.xds.CommonBootstrapperTestUtils; import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.internal.security.ReferenceCountingMap.ValueFactory; import org.junit.Rule; import org.junit.Test; diff --git a/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderClientSslContextProviderTest.java b/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderClientSslContextProviderTest.java index 7c300c88297..b0800458d66 100644 --- a/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderClientSslContextProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderClientSslContextProviderTest.java @@ -33,9 +33,9 @@ import com.google.common.util.concurrent.MoreExecutors; import io.envoyproxy.envoy.config.core.v3.DataSource; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; -import io.grpc.xds.CommonBootstrapperTestUtils; import io.grpc.xds.EnvoyServerProtoData; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil.TestCallback; import java.util.Queue; diff --git a/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderServerSslContextProviderTest.java b/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderServerSslContextProviderTest.java index 82af7d1dc27..423829ff5af 100644 --- a/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderServerSslContextProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/internal/security/certprovider/CertProviderServerSslContextProviderTest.java @@ -32,9 +32,9 @@ import io.envoyproxy.envoy.config.core.v3.DataSource; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; import io.envoyproxy.envoy.type.matcher.v3.StringMatcher; -import io.grpc.xds.CommonBootstrapperTestUtils; import io.grpc.xds.EnvoyServerProtoData; import io.grpc.xds.client.Bootstrapper; +import io.grpc.xds.client.CommonBootstrapperTestUtils; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil.TestCallback; import io.grpc.xds.internal.security.certprovider.CertProviderClientSslContextProviderTest.QueuedExecutor; From 62ce71bc211221ecfb51a06bd9fb01d43c7737a1 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 9 Dec 2024 13:43:10 -0800 Subject: [PATCH 2/3] More fixes --- .../main/java/io/grpc/xds/client/XdsClientImpl.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 0368e72efc0..791ba3cc62d 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -672,18 +672,6 @@ private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) { } - @Override - public Future reportServerConnections(ServerConnectionCallback callback) { - SettableFuture future = SettableFuture.create(); - syncContext.execute(() -> { - serverCpClientMap.forEach((serverInfo, controlPlaneClient) -> - callback.reportServerConnectionGauge( - controlPlaneClient.hasWorkingAdsStream(), serverInfo.target())); - future.set(null); - }); - return future; - } - /** Tracks a single subscribed resource. */ private final class ResourceSubscriber { @Nullable From 984ea31a58b9e69991b69ae85d335a8471e54b2a Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 9 Dec 2024 14:35:50 -0800 Subject: [PATCH 3/3] Fix flaky test --- xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java index 5e32036c961..6df27db0450 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFallbackTest.java @@ -273,11 +273,11 @@ public void both_down_restart_main() { xdsClient = xdsClientPool.getObject(); xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher); - verify(ldsWatcher, timeout(5000)).onError(any()); + verify(ldsWatcher, timeout(5000).atLeastOnce()).onError(any()); verify(ldsWatcher, timeout(5000).times(0)).onChanged(any()); xdsClient.watchXdsResource( XdsRouteConfigureResource.getInstance(), RDS_NAME, rdsWatcher2); - verify(rdsWatcher2, timeout(5000)).onError(any()); + verify(rdsWatcher2, timeout(5000).atLeastOnce()).onError(any()); mainXdsServer.restartXdsServer();