Skip to content

Commit bbeea4f

Browse files
committed
initial commit
1 parent a395431 commit bbeea4f

11 files changed

+772
-0
lines changed

.gitignore

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Java #
2+
*.class
3+
4+
# Package Files #
5+
*.jar
6+
*.war
7+
*.ear
8+
9+
# IDEA #
10+
*.iml
11+
.idea
12+
*~
13+
14+
# eclipse specific git ignore
15+
.project
16+
.metadata
17+
.classpath
18+
.checkstyle
19+
.settings/
20+
.checkstyle
21+
22+
# Maven files #
23+
data/
24+
target
25+
pom.xml.tag
26+
pom.xml.releaseBackup
27+
pom.xml.versionsBackup
28+
pom.xml.next
29+
release.properties
30+
dependency-reduced-pom.xml
31+
32+
# Misc #
33+
*.log
34+
.DS_Store

pom.xml

+209
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
6+
<modelVersion>4.0.0</modelVersion>
7+
<parent>
8+
<groupId>org.graylog.plugins</groupId>
9+
<artifactId>graylog-plugin-parent</artifactId>
10+
<version>2.4.6</version>
11+
</parent>
12+
<groupId>com.github.exabrial</groupId>
13+
<artifactId>graylog-plugin-openwire</artifactId>
14+
<version>1.0.0-SNAPSHOT</version>
15+
<packaging>jar</packaging>
16+
17+
<name>Openwire Graylog Plugin</name>
18+
<description>Provides an Openwire input for Graylog using the ActiveMQ client library</description>
19+
20+
21+
<url>https://github.com/exabrial/graylog-plugin-openwire</url>
22+
<scm>
23+
<connection>scm:git:ssh://[email protected]/exabrial/graylog-plugin-openwire.git</connection>
24+
<developerConnection>scm:git:ssh://[email protected]/exabrial/graylog-plugin-openwire.git</developerConnection>
25+
<url>https://github.com/exabrial/graylog-plugin-openwire/tree/master</url>
26+
<tag>HEAD</tag>
27+
</scm>
28+
29+
<properties>
30+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
31+
<gpg.skip>true</gpg.skip>
32+
</properties>
33+
34+
<licenses>
35+
<license>
36+
<name>The GNU General Public License, Version 3.0</name>
37+
<url>http://www.gnu.org/licenses/gpl-3.0.txt</url>
38+
<distribution>repo</distribution>
39+
</license>
40+
</licenses>
41+
42+
<developers>
43+
<developer>
44+
<id>exabrial</id>
45+
<name>Jonathan S. Fisher</name>
46+
<properties>
47+
<dev.pgp.fingerprint>871638A21A7F2C38066471420306A354336B4F0D</dev.pgp.fingerprint>
48+
</properties>
49+
</developer>
50+
</developers>
51+
52+
<dependencies>
53+
<dependency>
54+
<groupId>org.apache.activemq</groupId>
55+
<artifactId>activemq-client</artifactId>
56+
<version>5.15.5</version>
57+
<scope>compile</scope>
58+
</dependency>
59+
<dependency>
60+
<groupId>com.google.auto.value</groupId>
61+
<artifactId>auto-value</artifactId>
62+
<version>1.5.4</version>
63+
<scope>provided</scope>
64+
</dependency>
65+
<dependency>
66+
<groupId>com.google.auto.service</groupId>
67+
<artifactId>auto-service</artifactId>
68+
<version>1.0-rc4</version>
69+
<scope>provided</scope>
70+
</dependency>
71+
<dependency>
72+
<groupId>org.graylog2</groupId>
73+
<artifactId>graylog2-server</artifactId>
74+
<scope>provided</scope>
75+
</dependency>
76+
</dependencies>
77+
78+
<build>
79+
<resources>
80+
<resource>
81+
<directory>src/main/resources</directory>
82+
<filtering>true</filtering>
83+
</resource>
84+
</resources>
85+
<plugins>
86+
<plugin>
87+
<groupId>org.apache.maven.plugins</groupId>
88+
<artifactId>maven-gpg-plugin</artifactId>
89+
<executions>
90+
<execution>
91+
<id>gpg-sign</id>
92+
<phase>verify</phase>
93+
<goals>
94+
<goal>sign</goal>
95+
</goals>
96+
</execution>
97+
</executions>
98+
</plugin>
99+
<plugin>
100+
<groupId>org.apache.maven.plugins</groupId>
101+
<artifactId>maven-compiler-plugin</artifactId>
102+
<configuration>
103+
<source>1.8</source>
104+
<target>1.8</target>
105+
<compilerArguments>
106+
<parameters />
107+
</compilerArguments>
108+
<annotationProcessors>
109+
<annotationProcessor>com.google.auto.service.processor.AutoServiceProcessor</annotationProcessor>
110+
<annotationProcessor>com.google.auto.value.processor.AutoValueProcessor</annotationProcessor>
111+
</annotationProcessors>
112+
</configuration>
113+
</plugin>
114+
<plugin>
115+
<groupId>org.apache.maven.plugins</groupId>
116+
<artifactId>maven-jar-plugin</artifactId>
117+
<configuration>
118+
<archive>
119+
<manifestEntries>
120+
<Graylog-Plugin-Properties-Path>${project.groupId}.${project.artifactId}</Graylog-Plugin-Properties-Path>
121+
</manifestEntries>
122+
</archive>
123+
</configuration>
124+
</plugin>
125+
<plugin>
126+
<groupId>org.apache.maven.plugins</groupId>
127+
<artifactId>maven-shade-plugin</artifactId>
128+
<configuration>
129+
<createDependencyReducedPom>false</createDependencyReducedPom>
130+
<minimizeJar>false</minimizeJar>
131+
</configuration>
132+
<executions>
133+
<execution>
134+
<phase>package</phase>
135+
<goals>
136+
<goal>shade</goal>
137+
</goals>
138+
<configuration>
139+
<transformers>
140+
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
141+
</transformers>
142+
</configuration>
143+
</execution>
144+
</executions>
145+
</plugin>
146+
<plugin>
147+
<groupId>org.apache.maven.plugins</groupId>
148+
<artifactId>maven-toolchains-plugin</artifactId>
149+
<version>1.1</version>
150+
<executions>
151+
<execution>
152+
<id>toolchain-exec</id>
153+
<goals>
154+
<goal>toolchain</goal>
155+
</goals>
156+
</execution>
157+
</executions>
158+
<configuration>
159+
<toolchains>
160+
<jdk>
161+
<version>1.8</version>
162+
</jdk>
163+
</toolchains>
164+
</configuration>
165+
</plugin>
166+
<plugin>
167+
<groupId>org.sonatype.plugins</groupId>
168+
<artifactId>nexus-staging-maven-plugin</artifactId>
169+
<version>1.6.8</version>
170+
<extensions>true</extensions>
171+
<configuration>
172+
<serverId>ossrh</serverId>
173+
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
174+
<autoReleaseAfterClose>true</autoReleaseAfterClose>
175+
</configuration>
176+
</plugin>
177+
</plugins>
178+
</build>
179+
180+
<pluginRepositories>
181+
<pluginRepository>
182+
<id>central</id>
183+
<name>Central Repository</name>
184+
<releases>
185+
<checksumPolicy>fail</checksumPolicy>
186+
</releases>
187+
<url>https://repo.maven.apache.org/maven2</url>
188+
</pluginRepository>
189+
</pluginRepositories>
190+
<distributionManagement>
191+
<snapshotRepository>
192+
<id>ossrh</id>
193+
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
194+
</snapshotRepository>
195+
<repository>
196+
<id>ossrh</id>
197+
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
198+
</repository>
199+
</distributionManagement>
200+
201+
<profiles>
202+
<profile>
203+
<id>sign</id>
204+
<properties>
205+
<gpg.skip>false</gpg.skip>
206+
</properties>
207+
</profile>
208+
</profiles>
209+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package com.github.exabrial.graylog;
2+
3+
import java.nio.charset.StandardCharsets;
4+
import java.util.concurrent.ScheduledExecutorService;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
8+
import javax.jms.Connection;
9+
import javax.jms.Destination;
10+
import javax.jms.ExceptionListener;
11+
import javax.jms.JMSException;
12+
import javax.jms.Message;
13+
import javax.jms.MessageConsumer;
14+
import javax.jms.MessageListener;
15+
import javax.jms.Session;
16+
import javax.jms.TextMessage;
17+
18+
import org.apache.activemq.ActiveMQConnectionFactory;
19+
import org.graylog2.plugin.inputs.MessageInput;
20+
import org.graylog2.plugin.journal.RawMessage;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
public class OpenwireConsumer {
25+
private final String brokerUrl;
26+
private final String queue;
27+
private final MessageInput sourceInput;
28+
private final OpenwireTransport openwireTransport;
29+
30+
private AtomicLong totalBytesRead = new AtomicLong(0);
31+
private AtomicLong lastSecBytesRead = new AtomicLong(0);
32+
private AtomicLong lastSecBytesReadTmp = new AtomicLong(0);
33+
private Connection connection;
34+
35+
OpenwireConsumer(String brokerUrl, String queue, MessageInput sourceInput, ScheduledExecutorService scheduler,
36+
OpenwireTransport openwireTransport) {
37+
this.brokerUrl = brokerUrl;
38+
this.queue = queue;
39+
this.sourceInput = sourceInput;
40+
this.openwireTransport = openwireTransport;
41+
42+
scheduler.scheduleAtFixedRate(new Runnable() {
43+
@Override
44+
public void run() {
45+
lastSecBytesRead.set(lastSecBytesReadTmp.getAndSet(0));
46+
}
47+
}, 1, 1, TimeUnit.SECONDS);
48+
}
49+
50+
public void run() throws JMSException {
51+
if (!isConnected()) {
52+
connect();
53+
}
54+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
55+
Destination destination = session.createQueue(queue);
56+
MessageConsumer consumer = session.createConsumer(destination);
57+
consumer.setMessageListener(new MessageListener() {
58+
@Override
59+
public void onMessage(Message message) {
60+
try {
61+
TextMessage textMessage = (TextMessage) message;
62+
String body = textMessage.getText();
63+
totalBytesRead.addAndGet(body.length());
64+
lastSecBytesReadTmp.addAndGet(body.length());
65+
final RawMessage rawMessage = new RawMessage(body.getBytes(StandardCharsets.UTF_8));
66+
if (openwireTransport.isThrottled()) {
67+
openwireTransport.blockUntilUnthrottled();
68+
}
69+
sourceInput.processRawMessage(rawMessage);
70+
} catch (JMSException exception) {
71+
Logger log = LoggerFactory.getLogger(getClass());
72+
log.error("onMessage() error", exception);
73+
throw new RuntimeException(exception);
74+
}
75+
}
76+
});
77+
}
78+
79+
public void connect() throws JMSException {
80+
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
81+
connection = connectionFactory.createConnection();
82+
connection.start();
83+
connection.setExceptionListener(new ExceptionListener() {
84+
@Override
85+
public void onException(JMSException exception) {
86+
Logger log = LoggerFactory.getLogger(getClass());
87+
log.error("onException() error", exception);
88+
}
89+
});
90+
}
91+
92+
public void stop() throws JMSException {
93+
if (connection != null) {
94+
try {
95+
connection.close();
96+
} finally {
97+
connection = null;
98+
}
99+
}
100+
}
101+
102+
public boolean isConnected() {
103+
return connection != null;
104+
}
105+
106+
public AtomicLong getLastSecBytesRead() {
107+
return lastSecBytesRead;
108+
}
109+
110+
public AtomicLong getTotalBytesRead() {
111+
return totalBytesRead;
112+
}
113+
}

0 commit comments

Comments
 (0)