Skip to content

Commit d9e6b75

Browse files
committed
Enh 37387064 - [37381796->24.09.1] Topics: general refactoring and hardening
(merge ce/main -> ce/24.09 113488) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 113492]
1 parent ae8019e commit d9e6b75

File tree

2 files changed

+126
-332
lines changed

2 files changed

+126
-332
lines changed

prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/*
2-
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
66
*/
77

88
package com.tangosol.internal.net.topic.impl.paged;
99

10+
import com.oracle.coherence.common.base.Blocking;
1011
import com.oracle.coherence.common.base.Exceptions;
1112
import com.oracle.coherence.common.base.Logger;
1213

@@ -261,6 +262,25 @@ public void closeSubscription(ConnectedSubscriber<V> subscriber, boolean fDestro
261262
// finalizers in the JVM are not reliable that is probably not such a good idea.
262263
destroy(f_caches, f_subscriberGroupId, m_subscriptionId);
263264
}
265+
266+
// We need to ensure that the subscription has really gone.
267+
// During a fail-over situation the subscriber may still exist in the configmap
268+
// so we need to repeat the closure notification
269+
TopicSubscription subscription = getSubscription(subscriber, m_subscriptionId);
270+
while (subscription != null && subscription.getSubscriberTimestamp(f_subscriberId) != Long.MAX_VALUE)
271+
{
272+
try
273+
{
274+
Blocking.sleep(100);
275+
}
276+
catch (InterruptedException e)
277+
{
278+
break;
279+
}
280+
Logger.fine("Repeating subscriber closed notification for topic subscriber: " + subscriber);
281+
PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId);
282+
subscription = getSubscription(subscriber, m_subscriptionId);
283+
}
264284
}
265285

266286
@Override

0 commit comments

Comments
 (0)