Skip to content

Commit b46e801

Browse files
author
Jonathan Knight
committed
Enh 37387065 - [37381796->25.03] Topics: general refactoring and hardening
(merge main -> ce/main 112890) [git-p4: depot-paths = "//dev/coherence-ce/main/": change = 112900]
1 parent 8a05843 commit b46e801

File tree

122 files changed

+15619
-5885
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

122 files changed

+15619
-5885
lines changed

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/SafeNamedTopic.java

Lines changed: 377 additions & 369 deletions
Large diffs are not rendered by default.

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/daemon/queueProcessor/service/grid/partitionedService/partitionedCache/PagedTopic.java

Lines changed: 319 additions & 70 deletions
Large diffs are not rendered by default.
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package com.tangosol.coherence.component.util.safeNamedTopic;
9+
10+
import com.tangosol.coherence.Component;
11+
12+
import com.tangosol.coherence.component.util.SafeNamedTopic;
13+
14+
import com.tangosol.internal.net.topic.PublisherConnector;
15+
import com.tangosol.internal.net.topic.SubscriberConnector;
16+
17+
import com.tangosol.internal.net.topic.impl.paged.PagedTopic;
18+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicCaches;
19+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicPublisherConnector;
20+
import com.tangosol.internal.net.topic.impl.paged.PagedTopicSubscriberConnector;
21+
22+
import com.tangosol.internal.net.topic.impl.paged.model.SubscriberGroupId;
23+
24+
import com.tangosol.net.topic.Publisher;
25+
import com.tangosol.net.topic.Subscriber;
26+
27+
import com.tangosol.util.Filter;
28+
import com.tangosol.util.ValueExtractor;
29+
30+
/**
31+
* A safe wrapper around a paged topic.
32+
*
33+
* @author Jonathan Knight 2024.11.26
34+
*/
35+
@SuppressWarnings({"rawtypes"})
36+
public class SafePagedTopic<V>
37+
extends SafeNamedTopic<V>
38+
{
39+
public SafePagedTopic()
40+
{
41+
this(null, null, true);
42+
}
43+
44+
public SafePagedTopic(String sName, Component compParent, boolean fInit)
45+
{
46+
super(sName, compParent, fInit);
47+
}
48+
49+
@Override
50+
public PublisherConnector<V> createPublisherConnector(Publisher.Option<? super V>[] options)
51+
{
52+
return new PagedTopicPublisherConnector<>(__m_PagedTopicCaches, getChannelCount(), options);
53+
}
54+
55+
@Override
56+
public <U> SubscriberConnector<U> createSubscriberConnector(Subscriber.Option<? super V, U>[] options)
57+
{
58+
return new PagedTopicSubscriberConnector<>(__m_PagedTopicCaches, options);
59+
}
60+
61+
@Override
62+
public boolean isDestroyed()
63+
{
64+
return super.isDestroyed() || getPagedTopicCaches().isDestroyed();
65+
}
66+
67+
@Override
68+
public void destroy()
69+
{
70+
getPagedTopicCaches().destroy();
71+
super.destroy();
72+
}
73+
74+
@Override
75+
public boolean isReleased()
76+
{
77+
return super.isReleased() || getPagedTopicCaches().isReleased();
78+
}
79+
80+
@Override
81+
public void release()
82+
{
83+
getPagedTopicCaches().release();
84+
super.release();
85+
}
86+
87+
@Override
88+
public void ensureSubscriberGroup(String sGroupName, Filter filter, ValueExtractor extractor)
89+
{
90+
if (sGroupName == null)
91+
{
92+
throw new IllegalArgumentException("invalid group name");
93+
}
94+
PagedTopicCaches pagedTopicCaches = getPagedTopicCaches();
95+
pagedTopicCaches.ensureSubscriberGroup(sGroupName, filter, extractor);
96+
}
97+
98+
@Override
99+
public void destroySubscriberGroup(String sGroupName)
100+
{
101+
if (sGroupName == null)
102+
{
103+
throw new IllegalArgumentException("invalid group name");
104+
}
105+
106+
PagedTopicCaches pagedTopicCaches = getPagedTopicCaches();
107+
PagedTopic.destroy(pagedTopicCaches, SubscriberGroupId.withName(sGroupName), 0L);
108+
}
109+
110+
111+
public PagedTopicCaches getPagedTopicCaches()
112+
{
113+
return __m_PagedTopicCaches;
114+
}
115+
116+
public void setPagedTopicCaches(PagedTopicCaches cachesTopic)
117+
{
118+
__m_PagedTopicCaches = cachesTopic;
119+
}
120+
121+
// ----- data members ---------------------------------------------------
122+
123+
private PagedTopicCaches __m_PagedTopicCaches;
124+
}

0 commit comments

Comments
 (0)