Skip to content

Commit 5bc56a9

Browse files
committed
Bug 37235974 - [37181828->24.09.1] When gRPC proxy restarts Listeners don't appear to be cleaned up
(merge ce/main -> ce/24.09) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 112233]
1 parent 50dcab5 commit 5bc56a9

File tree

15 files changed

+1084
-54
lines changed

15 files changed

+1084
-54
lines changed

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/BaseGrpcAcceptorController.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import com.oracle.coherence.common.base.Exceptions;
1111

12+
import com.oracle.coherence.common.base.Logger;
1213
import com.tangosol.internal.net.service.peer.acceptor.DefaultGrpcAcceptorDependencies;
1314
import com.tangosol.internal.net.service.peer.acceptor.GrpcAcceptorDependencies;
1415

@@ -98,6 +99,21 @@ public final void stop()
9899
m_fRunning = false;
99100
m_healthStatusManager.enterTerminalState();
100101
m_healthStatusManager = null;
102+
List<BindableGrpcProxyService> list = m_listServices;
103+
if (list != null)
104+
{
105+
for (BindableGrpcProxyService service : list)
106+
{
107+
try
108+
{
109+
service.close();
110+
}
111+
catch (Exception e)
112+
{
113+
Logger.err(e);
114+
}
115+
}
116+
}
101117
stopInternal();
102118
m_listServices = null;
103119
}

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/BaseGrpcServiceImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343

4444
import io.grpc.Status;
4545

46+
import java.io.Closeable;
4647
import java.util.Objects;
4748
import java.util.Optional;
4849

4950
import java.util.concurrent.Callable;
5051
import java.util.concurrent.CompletionStage;
52+
import java.util.concurrent.ConcurrentLinkedQueue;
5153
import java.util.concurrent.Executor;
5254

5355
import java.util.function.Function;
@@ -405,6 +407,16 @@ public Serializer getSerializer(String sFormat, ClassLoader loader)
405407
return serializer;
406408
}
407409

410+
public void addCloseable(Closeable closeable)
411+
{
412+
f_listCloseable.add(closeable);
413+
}
414+
415+
public void removeCloseable(Closeable closeable)
416+
{
417+
f_listCloseable.remove(closeable);
418+
}
419+
408420
// ----- inner interface: Dependencies ----------------------------------
409421

410422
/**
@@ -524,4 +536,9 @@ public void setConfigurableCacheFactorySupplier(Function<String, ConfigurableCac
524536
* The transfer threshold used for paged requests.
525537
*/
526538
protected long transferThreshold = DEFAULT_TRANSFER_THRESHOLD;
539+
540+
/**
541+
* A list of things to close when this service is stopped.
542+
*/
543+
protected ConcurrentLinkedQueue<Closeable> f_listCloseable = new ConcurrentLinkedQueue<>();
527544
}

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/BindableGrpcProxyService.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,18 @@
1717
public interface BindableGrpcProxyService
1818
extends GrpcProxyService, BindableService
1919
{
20+
/**
21+
* Close this service and clean up any state.
22+
*/
23+
void close();
24+
25+
/**
26+
* Return {@code true} if this service is enabled, otherwise {@code false}.
27+
*
28+
* @return {@code true} if this service is enabled, otherwise {@code false}
29+
*/
30+
default boolean isEnabled()
31+
{
32+
return true;
33+
}
2034
}

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/BindableServiceFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ static List<BindableGrpcProxyService> discoverServices(GrpcServiceDependencies d
3939
List<BindableGrpcProxyService> list = factory.createServices(depsService);
4040
if (list != null)
4141
{
42-
listService.addAll(list);
42+
list.stream()
43+
.filter(BindableGrpcProxyService::isEnabled)
44+
.forEach(listService::add);
4345
}
4446
}
4547
return listService;

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/ProxyServiceChannel.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import io.grpc.StatusRuntimeException;
4242
import io.grpc.stub.StreamObserver;
4343

44+
import java.io.Closeable;
45+
import java.io.IOException;
4446
import java.net.InetSocketAddress;
4547
import java.net.SocketAddress;
4648

@@ -61,7 +63,7 @@
6163
*/
6264
@SuppressWarnings("rawtypes")
6365
public class ProxyServiceChannel
64-
implements StreamObserver<ProxyRequest>
66+
implements StreamObserver<ProxyRequest>, Closeable
6567
{
6668
/**
6769
* Create a {@link ProxyServiceChannel}.
@@ -86,6 +88,7 @@ protected ProxyServiceChannel(GrpcService service, StreamObserver<ProxyResponse>
8688
f_service = service;
8789
f_observer = SafeStreamObserver.ensureSafeObserver(LockingStreamObserver.ensureLockingObserver(observer));
8890
f_memberSupplier = Objects.requireNonNullElse(memberSupplier, () -> CacheFactory.getCluster().getLocalMember());
91+
service.addCloseable(this);
8992
}
9093

9194
// ----- StreamObserver methods -----------------------------------------
@@ -163,6 +166,7 @@ public void onCompleted()
163166
{
164167
m_protocol.close();
165168
}
169+
f_service.removeCloseable(this);
166170
}
167171

168172
/**
@@ -175,6 +179,12 @@ public Serializer getSerializer()
175179
return m_protocol.getSerializer();
176180
}
177181

182+
@Override
183+
public void close() throws IOException
184+
{
185+
onCompleted();
186+
}
187+
178188
// ----- helper methods ---------------------------------------------
179189

180190
/**

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/ProxyServiceGrpcImpl.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,23 @@
77

88
package com.oracle.coherence.grpc.proxy.common;
99

10+
import com.oracle.coherence.common.base.Logger;
1011
import com.oracle.coherence.grpc.messages.proxy.v1.ProxyRequest;
1112
import com.oracle.coherence.grpc.messages.proxy.v1.ProxyResponse;
1213
import com.oracle.coherence.grpc.services.proxy.v1.ProxyServiceGrpc;
1314

15+
import com.tangosol.coherence.config.Config;
1416
import com.tangosol.net.grpc.GrpcDependencies;
1517

1618
import io.grpc.ServerServiceDefinition;
1719

1820
import io.grpc.stub.StreamObserver;
1921

22+
import java.io.IOException;
23+
import java.util.List;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentLinkedQueue;
26+
2027
/**
2128
* The implementation of the generated {@link ProxyServiceGrpc.AsyncService}.
2229
*/
@@ -44,20 +51,43 @@ public final ServerServiceDefinition bindService()
4451
return ProxyServiceGrpc.bindService(this);
4552
}
4653

54+
@Override
55+
public boolean isEnabled()
56+
{
57+
return Config.getBoolean(ProxyServiceGrpc.class.getName() + ".enabled", true);
58+
}
59+
4760
// ----- NamedCacheChannelGrpc.NamedCacheChannelImplBase methods --------
4861

4962
@Override
5063
public StreamObserver<ProxyRequest> subChannel(StreamObserver<ProxyResponse> observer)
5164
{
5265
ProxyServiceChannel channel = new ProxyServiceChannel(this, observer);
5366
GrpcDependencies.ServerType type = getDependencies().getServerType();
67+
f_channels.add(channel);
5468
if (type == GrpcDependencies.ServerType.Asynchronous)
5569
{
5670
return channel.async(f_executor);
5771
}
5872
return channel;
5973
}
6074

75+
@Override
76+
public void close()
77+
{
78+
f_listCloseable.forEach(closeable ->
79+
{
80+
try
81+
{
82+
closeable.close();
83+
}
84+
catch (Exception e)
85+
{
86+
Logger.err(e);
87+
}
88+
});
89+
}
90+
6191
// ----- inner interface: Dependencies ----------------------------------
6292

6393
/**
@@ -99,4 +129,9 @@ public DefaultDependencies(Dependencies deps)
99129
* The name to use for the management MBean.
100130
*/
101131
public static final String MBEAN_NAME = "type=GrpcProxy";
132+
133+
/**
134+
*
135+
*/
136+
private final ConcurrentLinkedQueue<ProxyServiceChannel> f_channels = new ConcurrentLinkedQueue<>();
102137
}

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/cache/NamedCacheProxyProtocol.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,21 @@ public Serializer getSerializer()
113113
@Override
114114
public void close()
115115
{
116-
m_aProxy.clear();
116+
f_lock.lock();
117+
try
118+
{
119+
m_fClosed = true;
120+
for (NamedCacheProxy proxy : m_aProxy)
121+
{
122+
com.tangosol.net.messaging.Channel channel = proxy.getChannel();
123+
proxy.unregisterChannel(channel);
124+
}
125+
m_aProxy.clear();
126+
}
127+
finally
128+
{
129+
f_lock.unlock();
130+
}
117131
super.close();
118132
}
119133

@@ -335,6 +349,10 @@ protected void onDestroyCache(int nId, StreamObserver<NamedCacheResponse> observ
335349
f_lock.lock();
336350
try
337351
{
352+
if (m_fClosed)
353+
{
354+
throw new IllegalStateException("this proxy channel is closed");
355+
}
338356
NamedCacheProxy proxy = m_aProxy.remove(nId);
339357
if (proxy != null)
340358
{
@@ -354,6 +372,11 @@ protected void onEnsureCache(EnsureCacheRequest request, StreamObserver<NamedCac
354372
f_lock.lock();
355373
try
356374
{
375+
if (m_fClosed)
376+
{
377+
throw new IllegalStateException("this proxy channel is closed");
378+
}
379+
357380
int cacheId;
358381
do
359382
{
@@ -1144,4 +1167,9 @@ private void send(ResponseType type)
11441167
* An array of {@link NamedCacheProxy} instances indexed by the cache identifier.
11451168
*/
11461169
protected final LongArray<NamedCacheProxy> m_aProxy = new SparseArray<>();
1170+
1171+
/**
1172+
* A flag indicating whether this proxy protocol is closed.
1173+
*/
1174+
protected boolean m_fClosed;
11471175
}

prj/coherence-grpc-proxy-common/src/main/java/com/oracle/coherence/grpc/proxy/common/v0/BaseNamedCacheServiceImpl.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@
5353

5454
import io.grpc.Status;
5555

56+
import io.grpc.StatusRuntimeException;
5657
import io.grpc.stub.StreamObserver;
5758

59+
import java.util.ArrayList;
5860
import java.util.Comparator;
5961
import java.util.HashMap;
6062
import java.util.List;
@@ -65,6 +67,8 @@
6567
import java.util.concurrent.CompletionStage;
6668
import java.util.concurrent.Executor;
6769

70+
import java.util.concurrent.locks.Lock;
71+
import java.util.concurrent.locks.ReentrantLock;
6872
import java.util.stream.Collectors;
6973

7074
import static com.oracle.coherence.grpc.proxy.common.v0.ResponseHandlers.handleUnary;
@@ -95,6 +99,33 @@ public BaseNamedCacheServiceImpl(NamedCacheService.Dependencies dependencies)
9599

96100
// ----- BaseGrpcServiceImpl implementation -----------------------------
97101

102+
@Override
103+
public void close()
104+
{
105+
f_lock.lock();
106+
try
107+
{
108+
m_fClosed = true;
109+
}
110+
finally
111+
{
112+
f_lock.unlock();
113+
}
114+
115+
f_listCloseable.forEach(closeable ->
116+
{
117+
try
118+
{
119+
closeable.close();
120+
}
121+
catch (Exception e)
122+
{
123+
Logger.err(e);
124+
}
125+
});
126+
f_listCloseable.clear();
127+
}
128+
98129
// ----- addIndex -------------------------------------------------------
99130

100131
/**
@@ -253,7 +284,21 @@ public void destroy(DestroyRequest request, StreamObserver<Empty> observer)
253284
@Override
254285
public StreamObserver<MapListenerRequest> events(StreamObserver<MapListenerResponse> observer)
255286
{
256-
return new MapListenerProxy(this, SafeStreamObserver.ensureSafeObserver(observer), m_nEventsHeartbeat);
287+
f_lock.lock();
288+
try
289+
{
290+
if (m_fClosed)
291+
{
292+
throw Status.UNAVAILABLE.asRuntimeException();
293+
}
294+
MapListenerProxy proxy = new MapListenerProxy(this, SafeStreamObserver.ensureSafeObserver(observer), m_nEventsHeartbeat);
295+
addCloseable(proxy);
296+
return proxy;
297+
}
298+
finally
299+
{
300+
f_lock.unlock();
301+
}
257302
}
258303

259304
// ----- getAll ---------------------------------------------------------
@@ -617,4 +662,8 @@ public <Req> CacheRequestHolder<Req, Void> createRequestHolder(Req request,
617662
// ----- data members ---------------------------------------------------
618663

619664
private final long m_nEventsHeartbeat;
665+
666+
protected volatile boolean m_fClosed = false;
667+
668+
protected final Lock f_lock = new ReentrantLock();
620669
}

0 commit comments

Comments
 (0)