Skip to content

Commit e042a2d

Browse files
author
Marcin Kuthan
committed
workaround for #3, to be verified
1 parent 19087bf commit e042a2d

File tree

3 files changed

+8
-3
lines changed

3 files changed

+8
-3
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ Start Kafka producer:
4848

4949
Start Kafka consumer:
5050

51-
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic output
51+
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output
5252

5353
Run example application:
5454

55-
./sbt "run-main example.WordCountJob"
55+
sbt "runMain example.WordCountJob"
5656

5757
Publish a few words on input topic using Kafka console producer and check the processing result on output topic using Kafka console producer.
5858

build.sbt

+5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ lazy val customLibraryDependencies = Seq(
5757
"org.scalatest" %% "scalatest" % "2.2.4" % "test"
5858
)
5959

60+
lazy val commonExcludeDependencies = Seq(
61+
"org.slf4j" % "slf4j-log4j12"
62+
)
63+
6064
lazy val customJavaOptions = Seq(
6165
"-Xmx1024m",
6266
"-XX:-MaxFDLimit"
@@ -69,6 +73,7 @@ lazy val root = (project in file("."))
6973
.settings(commonSettings)
7074
.settings(scalacOptions ++= customScalacOptions)
7175
.settings(libraryDependencies ++= customLibraryDependencies)
76+
.settings(excludeDependencies ++= commonExcludeDependencies)
7277
.settings(fork in run := true)
7378
.settings(connectInput in run := true)
7479
.settings(javaOptions in run ++= customJavaOptions)

src/main/scala/org/mkuthan/spark/KafkaDStreamSink.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.log4j.Logger
2323
import org.apache.spark.TaskContext
2424
import org.apache.spark.streaming.dstream.DStream
2525

26-
class KafkaDStreamSink(dstream: DStream[KafkaPayload]) {
26+
class KafkaDStreamSink(@transient private val dstream: DStream[KafkaPayload]) extends Serializable {
2727

2828
def sendToKafka(config: Map[String, String], topic: String): Unit = {
2929
dstream.foreachRDD { rdd =>

0 commit comments

Comments
 (0)