Skip to content

Commit 112607a

Browse files
committed
feat: add option to rebuild gRPC connection on error
Signed-off-by: Todd Baert <[email protected]>
1 parent 1a85794 commit 112607a

File tree

6 files changed

+208
-24
lines changed

6 files changed

+208
-24
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public final class Config {
2020
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
2121
static final int DEFAULT_OFFLINE_POLL_MS = 5000;
2222
static final long DEFAULT_KEEP_ALIVE = 0;
23+
static final String DEFAULT_REINITIALIZE_ON_ERROR = "false";
2324

2425
static final String RESOLVER_ENV_VAR = "FLAGD_RESOLVER";
2526
static final String HOST_ENV_VAR_NAME = "FLAGD_HOST";
@@ -51,6 +52,7 @@ public final class Config {
5152
static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS";
5253
static final String TARGET_URI_ENV_VAR_NAME = "FLAGD_TARGET_URI";
5354
static final String STREAM_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD";
55+
static final String REINITIALIZE_ON_ERROR_ENV_VAR_NAME = "FLAGD_REINITIALIZE_ON_ERROR";
5456

5557
static final String RESOLVER_RPC = "rpc";
5658
static final String RESOLVER_IN_PROCESS = "in-process";

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,16 @@ public class FlagdOptions {
204204
@Builder.Default
205205
private String defaultAuthority = fallBackToEnvOrDefault(Config.DEFAULT_AUTHORITY_ENV_VAR_NAME, null);
206206

207+
/**
208+
* !EXPERIMENTAL!
209+
* Whether to reinitialize the channel (TCP connection) after the grace period is exceeded.
210+
* This can help recover from connection issues by creating fresh connections.
211+
* Particularly useful for troubleshooting network issues related to proxies or service meshes.
212+
*/
213+
@Builder.Default
214+
private boolean reinitializeOnError = Boolean.parseBoolean(
215+
fallBackToEnvOrDefault(Config.REINITIALIZE_ON_ERROR_ENV_VAR_NAME, Config.DEFAULT_REINITIALIZE_ON_ERROR));
216+
207217
/**
208218
* Builder overwrite in order to customize the "build" method.
209219
*

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class InProcessResolver implements Resolver {
4141
private final Consumer<FlagdProviderEvent> onConnectionEvent;
4242
private final Operator operator;
4343
private final String scope;
44+
private final QueueSource queueSource;
4445

4546
/**
4647
* Resolves flag values using
@@ -52,7 +53,8 @@ public class InProcessResolver implements Resolver {
5253
* connection/stream
5354
*/
5455
public InProcessResolver(FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
55-
this.flagStore = new FlagStore(getConnector(options, onConnectionEvent));
56+
this.queueSource = getQueueSource(options, onConnectionEvent);
57+
this.flagStore = new FlagStore(queueSource);
5658
this.onConnectionEvent = onConnectionEvent;
5759
this.operator = new Operator();
5860
this.scope = options.getSelector();
@@ -94,6 +96,21 @@ public void init() throws Exception {
9496
stateWatcher.start();
9597
}
9698

99+
/**
100+
* Called when the provider enters error state after grace period.
101+
* Attempts to reinitialize the sync connector if enabled.
102+
*/
103+
@Override
104+
public void onError() {
105+
if (queueSource instanceof SyncStreamQueueSource) {
106+
SyncStreamQueueSource syncConnector = (SyncStreamQueueSource) queueSource;
107+
if (syncConnector.getStreamQueue() != null) {
108+
// Only reinitialize if option is enabled
109+
syncConnector.reinitializeChannelComponents();
110+
}
111+
}
112+
}
113+
97114
/**
98115
* Shutdown in-process resolver.
99116
*
@@ -147,7 +164,7 @@ public ProviderEvaluation<Value> objectEvaluation(String key, Value defaultValue
147164
.build();
148165
}
149166

150-
static QueueSource getConnector(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
167+
static QueueSource getQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
151168
if (options.getCustomConnector() != null) {
152169
return options.getCustomConnector();
153170
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@
2828
import lombok.extern.slf4j.Slf4j;
2929

3030
/**
31-
* Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract.
31+
* Implements the {@link QueueSource} contract and emit flags obtained from
32+
* flagd sync gRPC contract.
3233
*/
3334
@Slf4j
3435
@SuppressFBWarnings(
3536
value = {"EI_EXPOSE_REP"},
36-
justification = "Random is used to generate a variation & flag configurations require exposing")
37+
justification = "We need to expose the BlockingQueue to allow consumers to read from it")
3738
public class SyncStreamQueueSource implements QueueSource {
3839
private static final int QUEUE_SIZE = 5;
3940

@@ -45,13 +46,17 @@ public class SyncStreamQueueSource implements QueueSource {
4546
private final String selector;
4647
private final String providerId;
4748
private final boolean syncMetadataDisabled;
48-
private final ChannelConnector channelConnector;
49+
private final boolean reinitializeOnError;
50+
private final FlagdOptions options;
51+
private final Consumer<FlagdProviderEvent> onConnectionEvent;
4952
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
50-
private final FlagSyncServiceStub flagSyncStub;
51-
private final FlagSyncServiceBlockingStub metadataStub;
53+
private volatile ChannelConnector channelConnector;
54+
private volatile FlagSyncServiceStub flagSyncStub;
55+
private volatile FlagSyncServiceBlockingStub metadataStub;
5256

5357
/**
54-
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
58+
* Creates a new SyncStreamQueueSource responsible for observing the event
59+
* stream.
5560
*/
5661
public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderEvent> onConnectionEvent) {
5762
streamDeadline = options.getStreamDeadlineMs();
@@ -60,11 +65,10 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer<FlagdProviderE
6065
providerId = options.getProviderId();
6166
maxBackoffMs = options.getRetryBackoffMaxMs();
6267
syncMetadataDisabled = options.isSyncMetadataDisabled();
63-
channelConnector = new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
64-
flagSyncStub =
65-
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
66-
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
67-
.withWaitForReady();
68+
reinitializeOnError = options.isReinitializeOnError();
69+
this.options = options;
70+
this.onConnectionEvent = onConnectionEvent;
71+
initializeChannelComponents();
6872
}
6973

7074
// internal use only
@@ -81,7 +85,52 @@ protected SyncStreamQueueSource(
8185
maxBackoffMs = options.getRetryBackoffMaxMs();
8286
flagSyncStub = stubMock;
8387
syncMetadataDisabled = options.isSyncMetadataDisabled();
88+
reinitializeOnError = options.isReinitializeOnError();
8489
metadataStub = blockingStubMock;
90+
this.options = options;
91+
this.onConnectionEvent = null;
92+
}
93+
94+
/** Initialize channel connector and stubs. */
95+
private synchronized void initializeChannelComponents() {
96+
ChannelConnector newConnector =
97+
new ChannelConnector(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
98+
FlagSyncServiceStub newFlagSyncStub =
99+
FlagSyncServiceGrpc.newStub(newConnector.getChannel()).withWaitForReady();
100+
FlagSyncServiceBlockingStub newMetadataStub =
101+
FlagSyncServiceGrpc.newBlockingStub(newConnector.getChannel()).withWaitForReady();
102+
103+
// Atomic assignment of all components
104+
channelConnector = newConnector;
105+
flagSyncStub = newFlagSyncStub;
106+
metadataStub = newMetadataStub;
107+
}
108+
109+
/** Reinitialize channel connector and stubs on error. */
110+
public synchronized void reinitializeChannelComponents() {
111+
if (!reinitializeOnError || shutdown.get()) {
112+
return;
113+
}
114+
115+
log.info("Reinitializing channel gRPC components in attempt to restore stream...");
116+
ChannelConnector oldConnector = channelConnector;
117+
118+
try {
119+
// Create new channel components first
120+
initializeChannelComponents();
121+
} catch (Exception e) {
122+
log.error("Failed to reinitialize channel components", e);
123+
return;
124+
}
125+
126+
// Shutdown old connector after successful reinitialization
127+
if (oldConnector != null) {
128+
try {
129+
oldConnector.shutdown();
130+
} catch (Exception e) {
131+
log.debug("Error shutting down old channel connector during reinitialization", e);
132+
}
133+
}
85134
}
86135

87136
/** Initialize sync stream connector. */
@@ -159,7 +208,8 @@ private void observeSyncStream() {
159208
log.info("Shutdown invoked, exiting event stream listener");
160209
}
161210

162-
// TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
211+
// TODO: remove the metadata call entirely after
212+
// https://github.com/open-feature/flagd/issues/1584
163213
private Struct getMetadata() {
164214
if (syncMetadataDisabled) {
165215
return null;
@@ -180,7 +230,8 @@ private Struct getMetadata() {
180230

181231
return null;
182232
} catch (StatusRuntimeException e) {
183-
// In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we
233+
// In newer versions of flagd, metadata is part of the sync stream. If the
234+
// method is unimplemented, we
184235
// can ignore the error
185236
if (e.getStatus() != null
186237
&& Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
import static org.junit.jupiter.api.Assertions.assertNotNull;
1818
import static org.junit.jupiter.api.Assertions.assertThrows;
1919
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.never;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
2025

2126
import dev.openfeature.contrib.providers.flagd.Config;
2227
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
@@ -51,6 +56,36 @@
5156
import org.junit.jupiter.api.Test;
5257

5358
class InProcessResolverTest {
59+
@Test
60+
void onError_reinitializesOnlyIfOptionTrue() throws Exception {
61+
// Setup: option true, should call reinitializeChannelComponents
62+
FlagdOptions options = FlagdOptions.builder().reinitializeOnError(true).build();
63+
SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class);
64+
// Mock getStreamQueue to return a non-null queue
65+
when(mockConnector.getStreamQueue()).thenReturn(new LinkedBlockingQueue<>());
66+
InProcessResolver resolver = new InProcessResolver(options, e -> {});
67+
// Inject mock connector
68+
java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource");
69+
queueSourceField.setAccessible(true);
70+
queueSourceField.set(resolver, mockConnector);
71+
resolver.onError();
72+
73+
verify(mockConnector, times(1)).reinitializeChannelComponents();
74+
}
75+
76+
@Test
77+
void onError_doesNotReinitializeIfOptionFalse() throws Exception {
78+
// Setup: option false, should NOT call reinitializeChannelComponents
79+
FlagdOptions options = FlagdOptions.builder().reinitializeOnError(false).build();
80+
SyncStreamQueueSource mockConnector = mock(SyncStreamQueueSource.class);
81+
InProcessResolver resolver = new InProcessResolver(options, e -> {});
82+
// Inject mock connector
83+
java.lang.reflect.Field queueSourceField = InProcessResolver.class.getDeclaredField("queueSource");
84+
queueSourceField.setAccessible(true);
85+
queueSourceField.set(resolver, mockConnector);
86+
resolver.onError();
87+
verify(mockConnector, never()).reinitializeChannelComponents();
88+
}
5489

5590
@Test
5691
public void connectorSetup() {
@@ -70,9 +105,9 @@ public void connectorSetup() {
70105
.build();
71106

72107
// then
73-
assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getConnector(forGrpcOptions, e -> {}));
74-
assertInstanceOf(FileQueueSource.class, InProcessResolver.getConnector(forOfflineOptions, e -> {}));
75-
assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions, e -> {}));
108+
assertInstanceOf(SyncStreamQueueSource.class, InProcessResolver.getQueueSource(forGrpcOptions, e -> {}));
109+
assertInstanceOf(FileQueueSource.class, InProcessResolver.getQueueSource(forOfflineOptions, e -> {}));
110+
assertInstanceOf(MockConnector.class, InProcessResolver.getQueueSource(forCustomConnectorOptions, e -> {}));
76111
}
77112

78113
@Test

0 commit comments

Comments
 (0)