Skip to content

Commit

Permalink
- Allow cluster name to be specified
Browse files Browse the repository at this point in the history
  • Loading branch information
dcshock committed Oct 7, 2016
1 parent a0f63af commit f5cdf74
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ RUN apk add --no-cache bash

# Add forklift server
WORKDIR /tmp
ADD server/target/universal/forklift-server-0.30.zip forklift.zip
ADD server/target/universal/forklift-server-0.31.zip forklift.zip
RUN yes | unzip -d /usr/local forklift.zip
RUN ln -s /usr/local/forklift-server-0.30 /usr/local/forklift
RUN ln -s /usr/local/forklift-server-0.31 /usr/local/forklift
RUN rm forklift.zip
RUN mkdir -p /usr/local/forklift/consumers

Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/forklift/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class Consumer {
private Logger log;
private static AtomicInteger id = new AtomicInteger(1);


private final ClassLoader classLoader;
private final ForkliftConnectorI connector;
private final Map<Class, Map<Class<?>, List<Field>>> injectFields;
Expand Down
5 changes: 5 additions & 0 deletions doc/forklift.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ and used to recreate an event.
events to a log file with enough data to be able to replay the message. In
essence making it possible to resend a message that has already been processed.

===== Stats Collector
The stats collector plugin appends statistical processing data to the properties
of each message processed. This works really well in conjunction with the replay
plugin which stores this information in a datastore like elastic search.

## Quickstart Guide

* Download the https://github.com/dcshock/forklift/releases/download[forklift-server-x.x.zip] release.
Expand Down
2 changes: 1 addition & 1 deletion plugins/replay/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ organization := "com.github.dcshock"

name := "forklift-replay"

version := "0.12"
version := "0.13"

javacOptions ++= Seq("-source", "1.8")

Expand Down
8 changes: 4 additions & 4 deletions plugins/replay/src/main/java/forklift/replay/ReplayES.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public class ReplayES {
private final ConsumerThread thread;
private final Consumer consumer;

public ReplayES(boolean clientOnly, boolean ssl, String hostname, ForkliftConnectorI connector) {
this(clientOnly, ssl, hostname, 9200, connector);
public ReplayES(boolean clientOnly, String hostname, String clusterName, ForkliftConnectorI connector) {
this(clientOnly, hostname, 9200, clusterName, connector);
}

public ReplayES(boolean clientOnly, boolean ssl, String hostname, int port, ForkliftConnectorI connector) {
public ReplayES(boolean clientOnly, String hostname, int port, String clusterName, ForkliftConnectorI connector) {
/*
* Setup the connection to the server. If we are only a client we'll not setup a node locally to run.
* This will help developers and smaller setups avoid the pain of setting up elastic search.
Expand All @@ -65,7 +65,7 @@ public ReplayES(boolean clientOnly, boolean ssl, String hostname, int port, Fork
}
}

this.writer = new ReplayESWriter(ssl, hostname, port);
this.writer = new ReplayESWriter(hostname, port, clusterName);
this.writer.start();

Runtime.getRuntime().addShutdownHook(new Thread() {
Expand Down
14 changes: 10 additions & 4 deletions plugins/replay/src/main/java/forklift/replay/ReplayESWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
Expand All @@ -17,12 +18,17 @@ public class ReplayESWriter extends ReplayStoreThread<ReplayESWriterMsg> {

private final TransportClient client;

public ReplayESWriter(boolean ssl, String hostname) {
this(ssl, hostname, 9200);
public ReplayESWriter(String hostname) {
this(hostname, 9300, "elasticsearch");
}

public ReplayESWriter(boolean ssl, String hostname, int port) {
this.client = TransportClient.builder().build()
public ReplayESWriter(String hostname, int port, String clusterName) {
final Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName).build();

this.client = TransportClient.builder()
.settings(settings)
.build()
.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(hostname, port)));
}

Expand Down
4 changes: 2 additions & 2 deletions server/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ organization := "com.github.dcshock"

name := "forklift-server"

version := "0.30"
version := "0.31"

enablePlugins(JavaAppPackaging)

Expand All @@ -18,7 +18,7 @@ libraryDependencies ++= Seq(
"com.github.dcshock" % "forklift" % "0.22",
"com.github.dcshock" % "forklift-activemq" % "0.10",
"org.apache.activemq" % "activemq-broker" % "5.14.0",
"com.github.dcshock" % "forklift-replay" % "0.12",
"com.github.dcshock" % "forklift-replay" % "0.13",
"com.github.dcshock" % "forklift-retry" % "0.11",
"com.github.dcshock" % "forklift-stats" % "0.1",
"com.github.dcshock" % "consul-rest-client" % "0.10",
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/forklift/ForkliftOpts.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ForkliftOpts {
@Option(name="-consulHost", usage="consul host name")
private String consulHost = "localhost";

@Option(name="-replayESCluster", usage="name of the elastic search cluster to use for replay logs.")
private String replayESCluster = "elasticsearch";

public String getConsumerDir() {
return consumerDir;
}
Expand Down Expand Up @@ -156,4 +159,12 @@ public void setRunRetries(boolean runRetries) {
public boolean isRunRetries() {
return runRetries;
}

public String getReplayESCluster() {
return replayESCluster;
}

public void setReplayESCluster(String replayESCluster) {
this.replayESCluster = replayESCluster;
}
}
2 changes: 1 addition & 1 deletion server/src/main/java/forklift/ForkliftServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static void main(String[] args) throws Throwable {
if (opts.getReplayESHost() == null)
replayES = null;
else
replayES = new ReplayES(!opts.isReplayESServer(), opts.isReplayESSsl(), opts.getReplayESHost(), opts.getReplayESPort(), connector);
replayES = new ReplayES(!opts.isReplayESServer(), opts.getReplayESHost(), opts.getReplayESPort(), opts.getReplayESCluster(), connector);

// Setup retry handling.
if (opts.getRetryDir() != null)
Expand Down

0 comments on commit f5cdf74

Please sign in to comment.