Skip to content

Commit 939c920

Browse files
committed
Enh 37387064 - [37381796->24.09.1] Topics: general refactoring and hardening
(merge ce/main -> ce/24.09 113314) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 113315]
1 parent 1cad1d2 commit 939c920

File tree

5 files changed

+128
-55
lines changed

5 files changed

+128
-55
lines changed

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.coherence.component.util.safeNamedTopic;
99

10+
import com.oracle.coherence.common.base.Logger;
1011
import com.tangosol.coherence.component.util.SafeNamedTopic;
1112

1213
import com.tangosol.internal.net.topic.NamedTopicPublisher;
@@ -25,7 +26,7 @@
2526
import java.util.Arrays;
2627
import java.util.List;
2728

28-
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CompletionStage;
2930
import java.util.concurrent.locks.Lock;
3031
import java.util.concurrent.locks.ReentrantLock;
3132

@@ -160,12 +161,24 @@ public void removeListener(NamedTopicPublisher.PublisherListener listener)
160161
protected PublisherConnector<V> ensureRunningConnector()
161162
{
162163
PublisherConnector<V> connector = m_connector;
163-
if (connector == null || !connector.isActive())
164+
TopicService service = getTopicService();
165+
if (!service.isRunning() || connector == null || !connector.isActive())
164166
{
165167
f_lock.lock();
166168
try
167169
{
168170
connector = m_connector;
171+
172+
if (connector != null)
173+
{
174+
service = getTopicService();
175+
if (!service.isRunning())
176+
{
177+
Logger.info("Restarting Publisher connector, topic=" + getTopicName());
178+
connector = null;
179+
}
180+
}
181+
169182
if (connector == null || !connector.isActive())
170183
{
171184
if (isReleased() || isDestroyed())
@@ -260,7 +273,7 @@ public void ensureConnected()
260273
}
261274

262275
@Override
263-
public CompletableFuture<?> initialize()
276+
public CompletionStage<?> initialize()
264277
{
265278
return ensureRunningChannelConnector().initialize();
266279
}
@@ -272,7 +285,7 @@ public void offer(Object oCookie, List<Binary> listBinary, int nNotifyPostFull,
272285
}
273286

274287
@Override
275-
public CompletableFuture<?> prepareOfferRetry(Object oCookie)
288+
public CompletionStage<?> prepareOfferRetry(Object oCookie)
276289
{
277290
return ensureRunningChannelConnector().prepareOfferRetry(oCookie);
278291
}
@@ -283,17 +296,38 @@ public TopicDependencies getTopicDependencies()
283296
return SafePublisherConnector.this.getTopicDependencies();
284297
}
285298

299+
@Override
300+
public TopicService getTopicService()
301+
{
302+
return f_safeTopic.getTopicService();
303+
}
304+
286305
// ----- helper methods ---------------------------------------------
287306

288307
protected PublisherChannelConnector<V> ensureRunningChannelConnector()
289308
{
290309
PublisherChannelConnector<V> connector = m_channelConnector;
291-
if (connector == null || !connector.isActive())
310+
TopicService service = getTopicService();
311+
if (!service.isRunning())
312+
{
313+
System.out.println();
314+
}
315+
if (!service.isRunning() || connector == null || !connector.isActive())
292316
{
293317
f_lock.lock();
294318
try
295319
{
296320
connector = m_channelConnector;
321+
if (connector != null)
322+
{
323+
service = getTopicService();
324+
if (!service.isRunning())
325+
{
326+
Logger.info("Restarting Publisher channel connector, topic=" + getTopicName() + ", channel=" + f_nChannel);
327+
connector = null;
328+
}
329+
}
330+
297331
if (connector == null || !connector.isActive())
298332
{
299333
connector = m_channelConnector = ensureRunningConnector().createChannelConnector(f_nChannel);

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/ConverterPublisherConnector.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -18,7 +18,7 @@
1818
import java.util.List;
1919
import java.util.Map;
2020

21-
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionStage;
2222
import java.util.concurrent.ConcurrentHashMap;
2323

2424
import java.util.function.BiConsumer;
@@ -195,7 +195,7 @@ public void ensureConnected()
195195
}
196196

197197
@Override
198-
public CompletableFuture<?> initialize()
198+
public CompletionStage<?> initialize()
199199
{
200200
return f_channelConnector.initialize();
201201
}
@@ -209,7 +209,7 @@ public void offer(Object oCookie, List<Binary> listBinary, int nNotifyPostFull,
209209
}
210210

211211
@Override
212-
public CompletableFuture<?> prepareOfferRetry(Object oCookie)
212+
public CompletionStage<?> prepareOfferRetry(Object oCookie)
213213
{
214214
return f_channelConnector.prepareOfferRetry(oCookie);
215215
}
@@ -220,6 +220,12 @@ public TopicDependencies getTopicDependencies()
220220
return f_channelConnector.getTopicDependencies();
221221
}
222222

223+
@Override
224+
public TopicService getTopicService()
225+
{
226+
return f_channelConnector.getTopicService();
227+
}
228+
223229
// ----- data members -----------------------------------------------
224230

225231
/**

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicPublisherChannel.java

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -36,6 +36,8 @@
3636

3737
import java.util.concurrent.CompletableFuture;
3838

39+
import java.util.concurrent.locks.Lock;
40+
import java.util.concurrent.locks.ReentrantLock;
3941
import java.util.function.BiConsumer;
4042
import java.util.function.BiFunction;
4143

@@ -115,52 +117,67 @@ public CompletableFuture<Publisher.Status> publish(Binary binValue)
115117
*/
116118
private void ensureConnected()
117119
{
118-
TopicDependencies dependencies = m_connector.getTopicDependencies();
119-
long retry = dependencies.getReconnectRetryMillis();
120-
long now = System.currentTimeMillis();
121-
long timeout = now + dependencies.getReconnectTimeoutMillis();
122-
Throwable error = null;
120+
if (m_state != State.Active || m_connector == null)
121+
{
122+
// we're closed
123+
return;
124+
}
123125

124-
while (now < timeout)
126+
f_lock.lock();
127+
try
125128
{
126-
if (m_state != State.Active || m_connector == null)
127-
{
128-
// we're closed
129-
return;
130-
}
129+
TopicDependencies dependencies = m_connector.getTopicDependencies();
130+
long retry = dependencies.getReconnectRetryMillis();
131+
long now = System.currentTimeMillis();
132+
long timeout = now + dependencies.getReconnectTimeoutMillis();
133+
Throwable error = null;
131134

132-
try
135+
while (now < timeout)
133136
{
134-
m_connector.ensureConnected();
135-
break;
136-
}
137-
catch (Throwable thrown)
138-
{
139-
error = thrown;
140-
if (error instanceof TopicException)
137+
if (m_state != State.Active || m_connector == null)
141138
{
142-
break;
139+
// we're closed
140+
return;
143141
}
144-
}
145-
now = System.currentTimeMillis();
146-
if (now < timeout)
147-
{
148-
Logger.finer("Failed to reconnect publisher, will retry in "
149-
+ retry + " millis " + this + " due to " + error.getMessage());
142+
150143
try
151144
{
152-
Thread.sleep(retry);
145+
m_connector.ensureConnected();
146+
error = null;
147+
break;
153148
}
154-
catch (InterruptedException e)
149+
catch (Throwable thrown)
155150
{
156-
// ignored
151+
error = thrown;
152+
if (error instanceof TopicException)
153+
{
154+
break;
155+
}
156+
}
157+
now = System.currentTimeMillis();
158+
if (now < timeout)
159+
{
160+
Logger.finer("Failed to reconnect publisher, will retry in "
161+
+ retry + " millis " + this + " due to " + error.getMessage());
162+
try
163+
{
164+
Thread.sleep(retry);
165+
}
166+
catch (InterruptedException e)
167+
{
168+
// ignored
169+
}
157170
}
158171
}
159-
}
160172

161-
if (error != null)
173+
if (error != null)
174+
{
175+
throw Exceptions.ensureRuntimeException(error);
176+
}
177+
}
178+
finally
162179
{
163-
throw Exceptions.ensureRuntimeException(error);
180+
f_lock.unlock();
164181
}
165182
}
166183

@@ -598,10 +615,12 @@ public void run()
598615

599616
// ----- data members ---------------------------------------------------
600617

618+
private final Lock f_lock = new ReentrantLock();
619+
601620
/**
602621
* The publisher connector to use to connect to back end resources.
603622
*/
604-
private PublisherChannelConnector<V> m_connector;
623+
private PublisherChannelConnector<V> m_connector;
605624

606625
/**
607626
* The identifier of the parent publisher.

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/PublisherChannelConnector.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.internal.net.topic;
99

10+
import com.tangosol.net.TopicService;
1011
import com.tangosol.net.topic.TopicDependencies;
1112

1213
import com.tangosol.util.Binary;
1314

1415
import java.util.List;
1516

16-
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.CompletionStage;
1718

1819
import java.util.function.BiConsumer;
1920

@@ -64,7 +65,7 @@ public interface PublisherChannelConnector<V>
6465
* @return an opaque cookie to pass to the {@link #offer(Object, List, int, BiConsumer)} method.
6566
*/
6667
@SuppressWarnings("TypeParameterExplicitlyExtendsObject")
67-
CompletableFuture<? extends Object> initialize();
68+
CompletionStage<? extends Object> initialize();
6869

6970
/**
7071
* Publish a value to the underlying topic.
@@ -79,17 +80,23 @@ public interface PublisherChannelConnector<V>
7980
/**
8081
* Perform any set-up required to retry an offer.
8182
*
82-
* @param oCookie the cookie used by the connector.
83-
*
84-
* @return a {@link CompletableFuture} that completes when the set-up is completed
83+
* @param oCookie the cookie used by the connector.
84+
* @return a {@link CompletionStage} that completes when the set-up is completed
8585
*/
8686
@SuppressWarnings("TypeParameterExplicitlyExtendsObject")
87-
CompletableFuture<? extends Object> prepareOfferRetry(Object oCookie);
87+
CompletionStage<? extends Object> prepareOfferRetry(Object oCookie);
8888

8989
/**
9090
* Return the dependencies for the topic.
9191
*
9292
* @return the dependencies for the topic
9393
*/
9494
TopicDependencies getTopicDependencies();
95+
96+
/**
97+
* Return the underlying topic service.
98+
*
99+
* @return the underlying topic service
100+
*/
101+
TopicService getTopicService();
95102
}

0 commit comments

Comments
 (0)