Skip to content

Commit be1856c

Browse files
authored
Merge pull request #1247 from Nikita-Shupletsov/virtual_topic_fix
[AMQ-9530]Fix SelectorAwareVirtualTopicInterceptor ClassCastException if next is not Topic.
2 parents 871f942 + 473267b commit be1856c

File tree

5 files changed

+173
-15
lines changed

5 files changed

+173
-15
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.broker.region.virtual;
18+
19+
import org.apache.activemq.broker.region.BaseDestination;
20+
import org.apache.activemq.broker.region.Destination;
21+
import org.apache.activemq.broker.region.DestinationFilter;
22+
23+
import java.util.Optional;
24+
25+
public class BaseVirtualDestinationFilter extends DestinationFilter {
26+
27+
public BaseVirtualDestinationFilter(Destination next) {
28+
super(next);
29+
}
30+
31+
BaseDestination getBaseDestination(Destination virtualDest) {
32+
if (virtualDest instanceof BaseDestination) {
33+
return (BaseDestination) virtualDest;
34+
} else if (virtualDest instanceof DestinationFilter) {
35+
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
36+
}
37+
return null;
38+
}
39+
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
*/
1717
package org.apache.activemq.broker.region.virtual;
1818

19+
import java.util.Optional;
1920
import java.util.Set;
2021

2122
import org.apache.activemq.broker.ConnectionContext;
2223
import org.apache.activemq.broker.region.BaseDestination;
2324
import org.apache.activemq.broker.region.Destination;
24-
import org.apache.activemq.broker.region.DestinationFilter;
2525
import org.apache.activemq.broker.region.IndirectMessageReference;
2626
import org.apache.activemq.broker.region.RegionBroker;
2727
import org.apache.activemq.broker.region.Subscription;
@@ -34,7 +34,7 @@
3434
* Creates a mapped Queue that can recover messages from subscription recovery
3535
* policy of its Virtual Topic.
3636
*/
37-
public class MappedQueueFilter extends DestinationFilter {
37+
public class MappedQueueFilter extends BaseVirtualDestinationFilter {
3838

3939
private final ActiveMQDestination virtualDestination;
4040

@@ -87,15 +87,6 @@ public synchronized void addSubscription(ConnectionContext context, Subscription
8787
}
8888
}
8989

90-
private BaseDestination getBaseDestination(Destination virtualDest) {
91-
if (virtualDest instanceof BaseDestination) {
92-
return (BaseDestination) virtualDest;
93-
} else if (virtualDest instanceof DestinationFilter) {
94-
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
95-
}
96-
return null;
97-
}
98-
9990
@Override
10091
public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
10192
super.removeSubscription(context, sub, lastDeliveredSequenceId);

activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package org.apache.activemq.broker.region.virtual;
1818

1919
import org.apache.activemq.broker.Broker;
20+
import org.apache.activemq.broker.region.BaseDestination;
2021
import org.apache.activemq.broker.region.Destination;
2122
import org.apache.activemq.broker.region.Subscription;
22-
import org.apache.activemq.broker.region.Topic;
2323
import org.apache.activemq.command.Message;
2424
import org.apache.activemq.filter.BooleanExpression;
2525
import org.apache.activemq.filter.MessageEvaluationContext;
@@ -41,8 +41,11 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
4141

4242
public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
4343
super(next, virtualTopic);
44+
BaseDestination baseDestination = getBaseDestination(next);
4445
selectorCachePlugin = (SubQueueSelectorCacheBroker)
45-
((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
46+
(baseDestination != null
47+
? baseDestination.createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class)
48+
: null);
4649
}
4750

4851
/**

activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.activemq.broker.ConnectionContext;
2626
import org.apache.activemq.broker.ProducerBrokerExchange;
2727
import org.apache.activemq.broker.region.Destination;
28-
import org.apache.activemq.broker.region.DestinationFilter;
2928
import org.apache.activemq.broker.region.Topic;
3029
import org.apache.activemq.command.ActiveMQDestination;
3130
import org.apache.activemq.command.ActiveMQQueue;
@@ -39,7 +38,7 @@
3938
/**
4039
* A Destination which implements <a href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a>
4140
*/
42-
public class VirtualTopicInterceptor extends DestinationFilter {
41+
public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter {
4342

4443
private final String prefix;
4544
private final String postfix;
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.broker.virtual;
18+
19+
import jakarta.jms.Destination;
20+
import jakarta.jms.JMSException;
21+
import jakarta.jms.MessageConsumer;
22+
import jakarta.jms.MessageProducer;
23+
import jakarta.jms.Session;
24+
import org.apache.activemq.broker.Broker;
25+
import org.apache.activemq.broker.BrokerService;
26+
import org.apache.activemq.broker.ConnectionContext;
27+
import org.apache.activemq.broker.region.DestinationFilter;
28+
import org.apache.activemq.broker.region.DestinationInterceptor;
29+
import org.apache.activemq.broker.region.virtual.VirtualDestination;
30+
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
31+
import org.apache.activemq.broker.region.virtual.VirtualTopic;
32+
import org.apache.activemq.command.ActiveMQDestination;
33+
import org.apache.activemq.command.ActiveMQQueue;
34+
import org.apache.activemq.command.ActiveMQTopic;
35+
import org.apache.activemq.spring.ConsumerBean;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
public class VirtualTopicSelectorWithAnotherInterceptorTest extends CompositeTopicTest {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorWithAnotherInterceptorTest.class);
42+
43+
protected Destination getConsumer1Dsetination() {
44+
return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
45+
}
46+
47+
protected Destination getConsumer2Dsetination() {
48+
return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
49+
}
50+
51+
protected Destination getProducerDestination() {
52+
return new ActiveMQTopic("VirtualTopic.TEST");
53+
}
54+
55+
@Override
56+
protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
57+
messageList1.assertMessagesArrived(total / 2);
58+
messageList2.assertMessagesArrived(total / 2);
59+
60+
messageList1.flushMessages();
61+
messageList2.flushMessages();
62+
63+
LOG.info("validate no other messages on queues");
64+
try {
65+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
66+
67+
Destination destination1 = getConsumer1Dsetination();
68+
Destination destination2 = getConsumer2Dsetination();
69+
MessageConsumer c1 = session.createConsumer(destination1, null);
70+
MessageConsumer c2 = session.createConsumer(destination2, null);
71+
c1.setMessageListener(messageList1);
72+
c2.setMessageListener(messageList2);
73+
74+
75+
LOG.info("send one simple message that should go to both consumers");
76+
MessageProducer producer = session.createProducer(getProducerDestination());
77+
assertNotNull(producer);
78+
79+
producer.send(session.createTextMessage("Last Message"));
80+
81+
messageList1.assertMessagesArrived(1);
82+
messageList2.assertMessagesArrived(1);
83+
84+
} catch (JMSException e) {
85+
e.printStackTrace();
86+
fail("unexpeced ex while waiting for last messages: " + e);
87+
}
88+
}
89+
90+
@Override
91+
protected BrokerService createBroker() throws Exception {
92+
// use message selectors on consumers that need to propagate up to the virtual
93+
// topic dispatch so that un matched messages do not linger on subscription queues
94+
messageSelector1 = "odd = 'yes'";
95+
messageSelector2 = "odd = 'no'";
96+
97+
BrokerService broker = new BrokerService();
98+
broker.setPersistent(false);
99+
100+
VirtualTopic virtualTopic = new VirtualTopic();
101+
// the new config that enables selectors on the interceptor
102+
virtualTopic.setSelectorAware(true);
103+
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
104+
interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
105+
TestDestinationInterceptor testInterceptor = new TestDestinationInterceptor();
106+
broker.setDestinationInterceptors(new DestinationInterceptor[]{testInterceptor, interceptor});
107+
return broker;
108+
}
109+
110+
private static class TestDestinationInterceptor implements DestinationInterceptor {
111+
112+
@Override
113+
public org.apache.activemq.broker.region.Destination intercept(org.apache.activemq.broker.region.Destination destination) {
114+
return new DestinationFilter(destination);
115+
}
116+
117+
@Override
118+
public void remove(org.apache.activemq.broker.region.Destination destination) {
119+
}
120+
121+
@Override
122+
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
123+
}
124+
}
125+
126+
}

0 commit comments

Comments
 (0)