From f5cdf743d7265de1b611c48e0924d8f0cacd6fe6 Mon Sep 17 00:00:00 2001 From: Matt Conroy Date: Fri, 7 Oct 2016 12:40:50 -0600 Subject: [PATCH] - Allow cluster name to be specified --- Dockerfile | 4 ++-- core/src/main/java/forklift/consumer/Consumer.java | 1 - doc/forklift.adoc | 5 +++++ plugins/replay/build.sbt | 2 +- .../src/main/java/forklift/replay/ReplayES.java | 8 ++++---- .../main/java/forklift/replay/ReplayESWriter.java | 14 ++++++++++---- server/build.sbt | 4 ++-- server/src/main/java/forklift/ForkliftOpts.java | 11 +++++++++++ server/src/main/java/forklift/ForkliftServer.java | 2 +- 9 files changed, 36 insertions(+), 15 deletions(-) diff --git a/Dockerfile b/Dockerfile index 63643ae..81db2bc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,9 +9,9 @@ RUN apk add --no-cache bash # Add forklift server WORKDIR /tmp -ADD server/target/universal/forklift-server-0.30.zip forklift.zip +ADD server/target/universal/forklift-server-0.31.zip forklift.zip RUN yes | unzip -d /usr/local forklift.zip -RUN ln -s /usr/local/forklift-server-0.30 /usr/local/forklift +RUN ln -s /usr/local/forklift-server-0.31 /usr/local/forklift RUN rm forklift.zip RUN mkdir -p /usr/local/forklift/consumers diff --git a/core/src/main/java/forklift/consumer/Consumer.java b/core/src/main/java/forklift/consumer/Consumer.java index af62711..239e8b8 100644 --- a/core/src/main/java/forklift/consumer/Consumer.java +++ b/core/src/main/java/forklift/consumer/Consumer.java @@ -55,7 +55,6 @@ public class Consumer { private Logger log; private static AtomicInteger id = new AtomicInteger(1); - private final ClassLoader classLoader; private final ForkliftConnectorI connector; private final Map, List>> injectFields; diff --git a/doc/forklift.adoc b/doc/forklift.adoc index b3ae641..1aefce7 100644 --- a/doc/forklift.adoc +++ b/doc/forklift.adoc @@ -285,6 +285,11 @@ and used to recreate an event. events to a log file with enough data to be able to replay the message. In essence making it possible to resend a message that has already been processed. +===== Stats Collector +The stats collector plugin appends statistical processing data to the properties +of each message processed. This works really well in conjunction with the replay +plugin which stores this information in a datastore like elastic search. + ## Quickstart Guide * Download the https://github.com/dcshock/forklift/releases/download[forklift-server-x.x.zip] release. diff --git a/plugins/replay/build.sbt b/plugins/replay/build.sbt index 7c3db12..11c3b22 100644 --- a/plugins/replay/build.sbt +++ b/plugins/replay/build.sbt @@ -2,7 +2,7 @@ organization := "com.github.dcshock" name := "forklift-replay" -version := "0.12" +version := "0.13" javacOptions ++= Seq("-source", "1.8") diff --git a/plugins/replay/src/main/java/forklift/replay/ReplayES.java b/plugins/replay/src/main/java/forklift/replay/ReplayES.java index acbf44a..227da3a 100644 --- a/plugins/replay/src/main/java/forklift/replay/ReplayES.java +++ b/plugins/replay/src/main/java/forklift/replay/ReplayES.java @@ -38,11 +38,11 @@ public class ReplayES { private final ConsumerThread thread; private final Consumer consumer; - public ReplayES(boolean clientOnly, boolean ssl, String hostname, ForkliftConnectorI connector) { - this(clientOnly, ssl, hostname, 9200, connector); + public ReplayES(boolean clientOnly, String hostname, String clusterName, ForkliftConnectorI connector) { + this(clientOnly, hostname, 9200, clusterName, connector); } - public ReplayES(boolean clientOnly, boolean ssl, String hostname, int port, ForkliftConnectorI connector) { + public ReplayES(boolean clientOnly, String hostname, int port, String clusterName, ForkliftConnectorI connector) { /* * Setup the connection to the server. If we are only a client we'll not setup a node locally to run. * This will help developers and smaller setups avoid the pain of setting up elastic search. @@ -65,7 +65,7 @@ public ReplayES(boolean clientOnly, boolean ssl, String hostname, int port, Fork } } - this.writer = new ReplayESWriter(ssl, hostname, port); + this.writer = new ReplayESWriter(hostname, port, clusterName); this.writer.start(); Runtime.getRuntime().addShutdownHook(new Thread() { diff --git a/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java b/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java index ab4c390..770bfef 100644 --- a/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java +++ b/plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java @@ -2,6 +2,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -17,12 +18,17 @@ public class ReplayESWriter extends ReplayStoreThread { private final TransportClient client; - public ReplayESWriter(boolean ssl, String hostname) { - this(ssl, hostname, 9200); + public ReplayESWriter(String hostname) { + this(hostname, 9300, "elasticsearch"); } - public ReplayESWriter(boolean ssl, String hostname, int port) { - this.client = TransportClient.builder().build() + public ReplayESWriter(String hostname, int port, String clusterName) { + final Settings settings = Settings.settingsBuilder() + .put("cluster.name", clusterName).build(); + + this.client = TransportClient.builder() + .settings(settings) + .build() .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(hostname, port))); } diff --git a/server/build.sbt b/server/build.sbt index a70d1c7..819f02b 100644 --- a/server/build.sbt +++ b/server/build.sbt @@ -2,7 +2,7 @@ organization := "com.github.dcshock" name := "forklift-server" -version := "0.30" +version := "0.31" enablePlugins(JavaAppPackaging) @@ -18,7 +18,7 @@ libraryDependencies ++= Seq( "com.github.dcshock" % "forklift" % "0.22", "com.github.dcshock" % "forklift-activemq" % "0.10", "org.apache.activemq" % "activemq-broker" % "5.14.0", - "com.github.dcshock" % "forklift-replay" % "0.12", + "com.github.dcshock" % "forklift-replay" % "0.13", "com.github.dcshock" % "forklift-retry" % "0.11", "com.github.dcshock" % "forklift-stats" % "0.1", "com.github.dcshock" % "consul-rest-client" % "0.10", diff --git a/server/src/main/java/forklift/ForkliftOpts.java b/server/src/main/java/forklift/ForkliftOpts.java index 6c33e46..2d976ca 100644 --- a/server/src/main/java/forklift/ForkliftOpts.java +++ b/server/src/main/java/forklift/ForkliftOpts.java @@ -45,6 +45,9 @@ public class ForkliftOpts { @Option(name="-consulHost", usage="consul host name") private String consulHost = "localhost"; + @Option(name="-replayESCluster", usage="name of the elastic search cluster to use for replay logs.") + private String replayESCluster = "elasticsearch"; + public String getConsumerDir() { return consumerDir; } @@ -156,4 +159,12 @@ public void setRunRetries(boolean runRetries) { public boolean isRunRetries() { return runRetries; } + + public String getReplayESCluster() { + return replayESCluster; + } + + public void setReplayESCluster(String replayESCluster) { + this.replayESCluster = replayESCluster; + } } diff --git a/server/src/main/java/forklift/ForkliftServer.java b/server/src/main/java/forklift/ForkliftServer.java index aca2a11..f36fe95 100644 --- a/server/src/main/java/forklift/ForkliftServer.java +++ b/server/src/main/java/forklift/ForkliftServer.java @@ -138,7 +138,7 @@ public static void main(String[] args) throws Throwable { if (opts.getReplayESHost() == null) replayES = null; else - replayES = new ReplayES(!opts.isReplayESServer(), opts.isReplayESSsl(), opts.getReplayESHost(), opts.getReplayESPort(), connector); + replayES = new ReplayES(!opts.isReplayESServer(), opts.getReplayESHost(), opts.getReplayESPort(), opts.getReplayESCluster(), connector); // Setup retry handling. if (opts.getRetryDir() != null)