Skip to content

Commit d8dbc40

Browse files
committed
Update CamelServletContextListener.java and add node.js STOMP websocket test.
Reworked CamelServletContextListener.java to switch servlet annotation, and revise message connections and endpoints. Added a new package.json file for websocket-testing and a processor for message-to-json processing. Created a new javascript file for testing STOMP over websockets using node.js. This commit lays the groundwork for more effective messaging and debugging.
1 parent f6bf553 commit d8dbc40

File tree

4 files changed

+131
-22
lines changed

4 files changed

+131
-22
lines changed

cwms-data-api/src/main/java/cwms/cda/api/messaging/CamelServletContextListener.java

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424

2525
package cwms.cda.api.messaging;
2626

27-
import cwms.cda.datasource.DelegatingDataSource;
28-
import oracle.jdbc.driver.OracleConnection;
2927
import oracle.jms.AQjmsFactory;
3028
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
3129
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -41,12 +39,11 @@
4139
import javax.jms.TopicConnectionFactory;
4240
import javax.servlet.ServletContextEvent;
4341
import javax.servlet.ServletContextListener;
44-
import javax.servlet.ServletException;
42+
import javax.servlet.annotation.WebListener;
4543
import javax.sql.DataSource;
4644
import java.net.InetAddress;
47-
import java.sql.Connection;
48-
import java.sql.SQLException;
4945

46+
@WebListener
5047
public final class CamelServletContextListener implements ServletContextListener {
5148

5249
@Resource(name = "jdbc/CWMS3")
@@ -58,26 +55,15 @@ public void contextInitialized(ServletContextEvent servletContextEvent) {
5855
try {
5956
//wrapped DelegatingDataSource is used because internally AQJMS casts the returned connection
6057
//as an OracleConnection, but the JNDI pool is returning us a proxy, so unwrap it
61-
CamelContext camelContext = new DefaultCamelContext();
62-
TopicConnectionFactory connectionFactory = AQjmsFactory.getTopicConnectionFactory(new DelegatingDataSource(cwms)
63-
{
64-
@Override
65-
public Connection getConnection() throws SQLException {
66-
return super.getConnection().unwrap(OracleConnection.class);
67-
}
68-
69-
@Override
70-
public Connection getConnection(String username, String password) throws SQLException {
71-
return super.getConnection(username, password).unwrap(OracleConnection.class);
72-
}
73-
}, true);
58+
camelContext = new DefaultCamelContext();
59+
TopicConnectionFactory connectionFactory = AQjmsFactory.getTopicConnectionFactory(new DataSourceWrapper(cwms), true);
7460
camelContext.addComponent("oracleAQ", JmsComponent.jmsComponent(connectionFactory));
7561
//TODO: determine how the port is configured
76-
String activeMqUrl = "tcp://" + InetAddress.getLocalHost().getHostName() + ":61616";
62+
String activeMqUrl = "tcp://" + InetAddress.getLocalHost().getHostName() + ":61616?protocols=STOMP&webSocketEncoderType=text";
7763
ActiveMQServer server = ActiveMQServers.newActiveMQServer(new ConfigurationImpl()
7864
.addAcceptorConfiguration("tcp", activeMqUrl)
79-
.setPersistenceEnabled(true)
80-
.setJournalDirectory("build/data/journal")
65+
.setPersistenceEnabled(false)
66+
// .setJournalDirectory("build/data/journal")
8167
//Need to update to verify roles
8268
.setSecurityEnabled(false)
8369
.addAcceptorConfiguration("invm", "vm://0"));
@@ -90,9 +76,10 @@ public void configure() {
9076
//TODO: determine clientId - should be unique to CDA version?
9177
from("oracleAQ:topic:CWMS_20.SWT_TS_STORED?durableSubscriptionName=CDA_SWT_TS_STORED&clientId=CDA")
9278
.log("Received message from ActiveMQ.Queue : ${body}")
79+
.process(new MapMessageToJsonProcessor(camelContext))
9380
//TODO: define standard naming
9481
//TODO: register artemis queue names with Swagger UI
95-
.to("artemis:topic:ActiveMQ.Queue");
82+
.to("artemis:topic:SWT_TS_STORED");
9683
}
9784
});
9885
server.start();
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 Hydrologic Engineering Center
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package cwms.cda.api.messaging;
26+
27+
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import org.apache.camel.CamelContext;
29+
import org.apache.camel.Exchange;
30+
import org.apache.camel.Message;
31+
import org.apache.camel.Processor;
32+
import org.apache.camel.component.jms.JmsMessage;
33+
34+
import javax.jms.MapMessage;
35+
import java.util.Map;
36+
37+
final class MapMessageToJsonProcessor implements Processor {
38+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
39+
private final CamelContext context;
40+
41+
MapMessageToJsonProcessor(CamelContext context) {
42+
this.context = context;
43+
}
44+
45+
@SuppressWarnings("unchecked")
46+
@Override
47+
public void process(Exchange exchange) throws Exception {
48+
Message inMessage = exchange.getIn();
49+
//If we use types other than MapMessage or TextMessage, we'd need to handle here
50+
if (((JmsMessage) inMessage).getJmsMessage() instanceof MapMessage) {
51+
Map<String, Object> map = inMessage.getBody(Map.class);
52+
String payload = null;
53+
54+
if (map != null) {
55+
payload = OBJECT_MAPPER.writeValueAsString(map);
56+
}
57+
inMessage.setBody(payload);
58+
inMessage.setHeader(Exchange.CONTENT_TYPE, "application/json");
59+
}
60+
}
61+
}

websocket-testing/package.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "websocket-testing",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "node ./src/main/javascript/stomp-ws-testing.js"
8+
},
9+
"type": "module",
10+
"keywords": [],
11+
"author": "",
12+
"license": "ISC",
13+
"dependencies": {
14+
"@stomp/stompjs": "^7.0.0",
15+
"ws": "^8.17.0"
16+
}
17+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 Hydrologic Engineering Center
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
import {Client} from '@stomp/stompjs';
26+
27+
import {WebSocket} from 'ws';
28+
29+
Object.assign(global, {WebSocket});
30+
const client = new Client({
31+
logRawCommunication: true,
32+
brokerURL: 'ws://tacocat:61616/topic', connectionTimeout: 1000, onConnect: () => {
33+
console.log("Connected")
34+
client.subscribe('SWT_TS_STORED', message => {
35+
console.log(`Received: ${message.body}`);
36+
message.ack();
37+
}, {ack: 'client'});
38+
}, onStompError: (frame) => {
39+
console.log('Broker reported error: ' + frame.headers['message']);
40+
console.log('Additional details: ' + frame.body);
41+
}
42+
});
43+
44+
client.activate();

0 commit comments

Comments
 (0)