Skip to content

Commit 87ae513

Browse files
committed
xds: ClusterResolverLoadBalancer handle update for both resolved addresses and errors via ResolutionResult (grpc#11997)
1 parent 2448c8b commit 87ae513

File tree

2 files changed

+138
-79
lines changed

2 files changed

+138
-79
lines changed

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

+70-64
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.grpc.NameResolver;
3434
import io.grpc.NameResolver.ResolutionResult;
3535
import io.grpc.Status;
36+
import io.grpc.StatusOr;
3637
import io.grpc.SynchronizationContext;
3738
import io.grpc.SynchronizationContext.ScheduledHandle;
3839
import io.grpc.internal.BackoffPolicy;
@@ -657,79 +658,84 @@ private class NameResolverListener extends NameResolver.Listener2 {
657658

658659
@Override
659660
public void onResult(final ResolutionResult resolutionResult) {
660-
class NameResolved implements Runnable {
661-
@Override
662-
public void run() {
663-
if (shutdown) {
664-
return;
665-
}
666-
backoffPolicy = null; // reset backoff sequence if succeeded
667-
// Arbitrary priority notation for all DNS-resolved endpoints.
668-
String priorityName = priorityName(name, 0); // value doesn't matter
669-
List<EquivalentAddressGroup> addresses = new ArrayList<>();
670-
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
671-
// No weight attribute is attached, all endpoint-level LB policy should be able
672-
// to handle such it.
673-
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
674-
Attributes attr = eag.getAttributes().toBuilder()
675-
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
676-
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
677-
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
678-
.build();
679-
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
680-
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
681-
addresses.add(eag);
682-
}
683-
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
684-
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
685-
lbRegistry, Collections.<DropOverload>emptyList());
686-
status = Status.OK;
687-
resolved = true;
688-
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
689-
handleEndpointResourceUpdate();
661+
syncContext.execute(() -> onResult2(resolutionResult));
662+
}
663+
664+
@Override
665+
public Status onResult2(final ResolutionResult resolutionResult) {
666+
if (shutdown) {
667+
return Status.OK;
668+
}
669+
// Arbitrary priority notation for all DNS-resolved endpoints.
670+
String priorityName = priorityName(name, 0); // value doesn't matter
671+
List<EquivalentAddressGroup> addresses = new ArrayList<>();
672+
StatusOr<List<EquivalentAddressGroup>> addressesOrError =
673+
resolutionResult.getAddressesOrError();
674+
if (addressesOrError.hasValue()) {
675+
backoffPolicy = null; // reset backoff sequence if succeeded
676+
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
677+
// No weight attribute is attached, all endpoint-level LB policy should be able
678+
// to handle such it.
679+
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
680+
Attributes attr = eag.getAttributes().toBuilder()
681+
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
682+
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
683+
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
684+
.build();
685+
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
686+
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
687+
addresses.add(eag);
690688
}
689+
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
690+
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
691+
lbRegistry, Collections.<DropOverload>emptyList());
692+
status = Status.OK;
693+
resolved = true;
694+
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
695+
handleEndpointResourceUpdate();
696+
return Status.OK;
697+
} else {
698+
handleErrorInSyncContext(addressesOrError.getStatus());
699+
return addressesOrError.getStatus();
691700
}
692-
693-
syncContext.execute(new NameResolved());
694701
}
695702

696703
@Override
697704
public void onError(final Status error) {
698-
syncContext.execute(new Runnable() {
699-
@Override
700-
public void run() {
701-
if (shutdown) {
702-
return;
703-
}
704-
status = error;
705-
// NameResolver.Listener API cannot distinguish between address-not-found and
706-
// transient errors. If the error occurs in the first resolution, treat it as
707-
// address not found. Otherwise, either there is previously resolved addresses
708-
// previously encountered error, propagate the error to downstream/upstream and
709-
// let downstream/upstream handle it.
710-
if (!resolved) {
711-
resolved = true;
712-
handleEndpointResourceUpdate();
713-
} else {
714-
handleEndpointResolutionError();
715-
}
716-
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
717-
return;
718-
}
719-
if (backoffPolicy == null) {
720-
backoffPolicy = backoffPolicyProvider.get();
721-
}
722-
long delayNanos = backoffPolicy.nextBackoffNanos();
723-
logger.log(XdsLogLevel.DEBUG,
705+
syncContext.execute(() -> handleErrorInSyncContext(error));
706+
}
707+
708+
private void handleErrorInSyncContext(final Status error) {
709+
if (shutdown) {
710+
return;
711+
}
712+
status = error;
713+
// NameResolver.Listener API cannot distinguish between address-not-found and
714+
// transient errors. If the error occurs in the first resolution, treat it as
715+
// address not found. Otherwise, either there is previously resolved addresses
716+
// previously encountered error, propagate the error to downstream/upstream and
717+
// let downstream/upstream handle it.
718+
if (!resolved) {
719+
resolved = true;
720+
handleEndpointResourceUpdate();
721+
} else {
722+
handleEndpointResolutionError();
723+
}
724+
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
725+
return;
726+
}
727+
if (backoffPolicy == null) {
728+
backoffPolicy = backoffPolicyProvider.get();
729+
}
730+
long delayNanos = backoffPolicy.nextBackoffNanos();
731+
logger.log(XdsLogLevel.DEBUG,
724732
"Logical DNS resolver for cluster {0} encountered name resolution "
725-
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
733+
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
726734
name, error, delayNanos);
727-
scheduledRefresh =
735+
scheduledRefresh =
728736
syncContext.schedule(
729-
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
730-
timeService);
731-
}
732-
});
737+
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
738+
timeService);
733739
}
734740
}
735741
}

xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java

+68-15
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ public XdsClient returnObject(Object object) {
200200
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
201201
private int xdsClientRefs;
202202
private ClusterResolverLoadBalancer loadBalancer;
203+
private NameResolverProvider fakeNameResolverProvider;
203204

204205
@Before
205206
public void setUp() throws URISyntaxException {
@@ -216,7 +217,8 @@ public void setUp() throws URISyntaxException {
216217
.setServiceConfigParser(mock(ServiceConfigParser.class))
217218
.setChannelLogger(mock(ChannelLogger.class))
218219
.build();
219-
nsRegistry.register(new FakeNameResolverProvider());
220+
fakeNameResolverProvider = new FakeNameResolverProvider(false);
221+
nsRegistry.register(fakeNameResolverProvider);
220222
when(helper.getNameResolverRegistry()).thenReturn(nsRegistry);
221223
when(helper.getNameResolverArgs()).thenReturn(args);
222224
when(helper.getSynchronizationContext()).thenReturn(syncContext);
@@ -826,6 +828,17 @@ public void handleEdsResource_noHealthyEndpoint() {
826828

827829
@Test
828830
public void onlyLogicalDnsCluster_endpointsResolved() {
831+
do_onlyLogicalDnsCluster_endpointsResolved();
832+
}
833+
834+
@Test
835+
public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() {
836+
nsRegistry.deregister(fakeNameResolverProvider);
837+
nsRegistry.register(new FakeNameResolverProvider(true));
838+
do_onlyLogicalDnsCluster_endpointsResolved();
839+
}
840+
841+
void do_onlyLogicalDnsCluster_endpointsResolved() {
829842
ClusterResolverConfig config = new ClusterResolverConfig(
830843
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
831844
deliverLbConfig(config);
@@ -854,7 +867,6 @@ public void onlyLogicalDnsCluster_endpointsResolved() {
854867
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
855868
assertThat(childBalancer.addresses.get(1).getAttributes()
856869
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
857-
858870
}
859871

860872
@Test
@@ -874,37 +886,48 @@ public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
874886
}
875887

876888
@Test
877-
public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
889+
public void resolutionError_backoffAndRefresh() {
890+
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
891+
}
892+
893+
@Test
894+
public void oldListenerCallback_resolutionError_backoffAndRefresh() {
895+
nsRegistry.deregister(fakeNameResolverProvider);
896+
nsRegistry.register(new FakeNameResolverProvider(true));
897+
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
898+
}
899+
900+
void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
878901
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
879-
backoffPolicy1, backoffPolicy2);
902+
backoffPolicy1, backoffPolicy2);
880903
ClusterResolverConfig config = new ClusterResolverConfig(
881-
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
904+
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
882905
deliverLbConfig(config);
883906
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
884907
assertThat(childBalancers).isEmpty();
885908
Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
886909
resolver.deliverError(error);
887910
inOrder.verify(helper).updateBalancingState(
888-
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
911+
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
889912
assertPicker(pickerCaptor.getValue(), error, null);
890913
assertThat(resolver.refreshCount).isEqualTo(0);
891914
inOrder.verify(backoffPolicyProvider).get();
892915
inOrder.verify(backoffPolicy1).nextBackoffNanos();
893916
assertThat(fakeClock.getPendingTasks()).hasSize(1);
894917
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
895-
.isEqualTo(1L);
918+
.isEqualTo(1L);
896919
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
897920
assertThat(resolver.refreshCount).isEqualTo(1);
898921

899922
error = Status.UNKNOWN.withDescription("I am lost");
900923
resolver.deliverError(error);
901924
inOrder.verify(helper).updateBalancingState(
902-
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
925+
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
903926
inOrder.verify(backoffPolicy1).nextBackoffNanos();
904927
assertPicker(pickerCaptor.getValue(), error, null);
905928
assertThat(fakeClock.getPendingTasks()).hasSize(1);
906929
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
907-
.isEqualTo(10L);
930+
.isEqualTo(10L);
908931
fakeClock.forwardTime(10L, TimeUnit.SECONDS);
909932
assertThat(resolver.refreshCount).isEqualTo(2);
910933

@@ -914,7 +937,7 @@ public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
914937
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
915938
assertThat(childBalancers).hasSize(1);
916939
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2),
917-
Iterables.getOnlyElement(childBalancers).addresses);
940+
Iterables.getOnlyElement(childBalancers).addresses);
918941

919942
assertThat(fakeClock.getPendingTasks()).isEmpty();
920943
inOrder.verifyNoMoreInteractions();
@@ -1319,10 +1342,18 @@ void deliverError(Status error) {
13191342
}
13201343

13211344
private class FakeNameResolverProvider extends NameResolverProvider {
1345+
private final boolean useOldListenerCallback;
1346+
1347+
private FakeNameResolverProvider(boolean useOldListenerCallback) {
1348+
this.useOldListenerCallback = useOldListenerCallback;
1349+
}
1350+
13221351
@Override
13231352
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
13241353
assertThat(targetUri.getScheme()).isEqualTo("dns");
1325-
FakeNameResolver resolver = new FakeNameResolver(targetUri);
1354+
FakeNameResolver resolver = useOldListenerCallback
1355+
? new FakeNameResolverUsingOldListenerCallback(targetUri)
1356+
: new FakeNameResolver(targetUri);
13261357
resolvers.add(resolver);
13271358
return resolver;
13281359
}
@@ -1343,9 +1374,10 @@ protected int priority() {
13431374
}
13441375
}
13451376

1377+
13461378
private class FakeNameResolver extends NameResolver {
13471379
private final URI targetUri;
1348-
private Listener2 listener;
1380+
protected Listener2 listener;
13491381
private int refreshCount;
13501382

13511383
private FakeNameResolver(URI targetUri) {
@@ -1372,12 +1404,33 @@ public void shutdown() {
13721404
resolvers.remove(this);
13731405
}
13741406

1375-
private void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
1407+
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
1408+
syncContext.execute(() -> {
1409+
Status ret = listener.onResult2(ResolutionResult.newBuilder()
1410+
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
1411+
assertThat(ret.getCode()).isEqualTo(Status.Code.OK);
1412+
});
1413+
}
1414+
1415+
protected void deliverError(Status error) {
1416+
syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder()
1417+
.setAddressesOrError(StatusOr.fromStatus(error)).build()));
1418+
}
1419+
}
1420+
1421+
private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver {
1422+
private FakeNameResolverUsingOldListenerCallback(URI targetUri) {
1423+
super(targetUri);
1424+
}
1425+
1426+
@Override
1427+
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
13761428
listener.onResult(ResolutionResult.newBuilder()
1377-
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
1429+
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
13781430
}
13791431

1380-
private void deliverError(Status error) {
1432+
@Override
1433+
protected void deliverError(Status error) {
13811434
listener.onError(error);
13821435
}
13831436
}

0 commit comments

Comments
 (0)