
Commit c1e9fb5

Author: Katrin Shechtman
Commit message: adding streaming example and providing setup instructions and tests
1 parent 375a2bc

10 files changed: +140 -10 lines

README.md (+1 -1)

@@ -30,7 +30,7 @@ You should see `scala>` after some more updates (Use `:q` for quitting). You're
 
 # Different ways to execute the code
 
-`sbt test` - executes all tests
+`sbt "testOnly sample.<name-of-the-test>"` - executes specific test
 
 `sbt run` - executing Main with Spark in-memory
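For example, to run only the word-count suite from this commit (the suite name `sample.WordCountTest` is taken from the test files moved into the `sample` package below):

    sbt "testOnly sample.WordCountTest"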

build.sbt (+7 -2)

@@ -13,7 +13,10 @@ libraryDependencies ++= {
     ExclusionRule(organization = "org.scalactic"),
     ExclusionRule(organization = "org.scalatest")
   ),
-  "org.apache.spark" %% "spark-hive" % sparkVersion
+  "org.apache.spark" %% "spark-hive" % sparkVersion,
+  "org.apache.spark" % "spark-streaming_2.11" % sparkVersion % "provided",
+  "com.typesafe" % "config" % "1.3.1",
+  "org.jfarcand" % "wcs" % "1.5"
 )
 }

@@ -25,9 +28,11 @@ assemblyMergeStrategy in assembly := {
 fork in run := true
 javaOptions in run ++= Seq(
   "-Dlog4j.configuration=log4j.properties")
-fork in Test := true
+fork in Test := false
 javaOptions in Test ++= Seq(
   "-Dlog4j.configuration=log4j.properties")
 
+parallelExecution in Test := false
+
 run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
 runMain in Compile <<= Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))
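Two of these settings work together: `fork in Test := false` keeps the suites in the sbt JVM, and `parallelExecution in Test := false` runs them one at a time, which avoids competing SparkContexts since Spark allows only one active context per JVM. Note also that `spark-streaming_2.11` pins the Scala binary version by hand; a sketch of the conventional cross-built form, assuming `scalaVersion` is set to a 2.11.x release:

    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"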

src/main/resources/application.conf (+3, new file)

@@ -0,0 +1,3 @@
+// NOTE: it assumes these jars are present at this location in Spark executors.
+//spark.jars.location=["local:///usr/local/spark-2.0.1-bin-hadoop2.7/mysql-connector-java-6.0.5.jar"]
+spark.master="local[*]" // "spark://127.0.0.1:7077"
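`local[*]` runs Spark inside the driver JVM with one worker thread per CPU core; the commented-out URL is the standalone-cluster alternative. A minimal sketch to see what the setting yields (the `conf-check` app name is made up for illustration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[*]").appName("conf-check").getOrCreate()
    println(spark.sparkContext.master)             // local[*]
    println(spark.sparkContext.defaultParallelism) // one task slot per local core
    spark.stop()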
src/main/scala/sample/CryptoCurrencyFeedReceiver.scala (+32, new file)

@@ -0,0 +1,32 @@
+package sample
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import org.jfarcand.wcs.{TextListener, WebSocket}
+
+
+class CryptoCurrencyFeedReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable {
+
+  private[this] lazy val thread: Thread = new Thread(this)
+
+  private[this] lazy val webSocket: WebSocket =
+    WebSocket().open("wss://stream.binance.com:9443/ws/ethbtc@aggTrade")
+
+  override def onStart() = {
+    thread.start()
+  }
+
+  override def onStop() = {
+    webSocket.close
+    thread.interrupt()
+  }
+
+  override def run() = {
+    webSocket.listener(new TextListener {
+      override def onMessage(message: String) {
+        println(message)
+        store(message)
+      }
+    })
+  }
+}
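The receiver subscribes to Binance's aggregated-trade WebSocket feed for the ETH/BTC pair and hands each raw JSON message to Spark via `store`. A minimal sketch of wiring it into a streaming job with the `Utils` helper added later in this commit (the `StreamingDemo` object itself is hypothetical, not part of the commit):

    package sample

    import com.typesafe.config.ConfigFactory
    import org.apache.spark.streaming.Seconds

    object StreamingDemo extends App {
      val ssc = Utils.createSparkStreamingContext(ConfigFactory.load(), Seconds(10))
      val trades = ssc.receiverStream(new CryptoCurrencyFeedReceiver())
      trades.count().print() // trade messages received per 10-second batch
      ssc.start()
      ssc.awaitTermination()
    }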

src/main/scala/Main.scala → src/main/scala/sample/Main.scala (renamed, +2 -1)

@@ -1,7 +1,8 @@
+package sample
 
 import java.io.File
 
-import org.apache.spark._
+import org.apache.spark.{SparkConf, SparkContext}
 
 object Main extends App {
src/main/scala/sample/Utils.scala (+33, new file)

@@ -0,0 +1,33 @@
+package sample
+
+import com.typesafe.config.Config
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Duration, StreamingContext}
+
+object Utils {
+
+  def getOrCreateSparkSession(conf: Config) = {
+
+    val appName = "Scala for Spark"
+    val master = conf.getString("spark.master")
+    // val jars = conf.getString("spark.jars.location")
+    val session = SparkSession.builder
+      .master(master)
+      .appName(appName)
+      // .config("spark.jars", jars)
+      .getOrCreate()
+    session
+  }
+
+  def createSparkStreamingContext(conf: Config, time: Duration) = {
+
+    val appName = "Scala for Spark"
+    val master = conf.getString("spark.master")
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    new StreamingContext(sparkConf, time)
+  }
+}
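Both helpers read `spark.master` from Typesafe Config, so the `application.conf` files in this commit decide where the job runs. A quick smoke test of the session helper, e.g. from a REPL (a sketch, not part of the commit):

    import com.typesafe.config.ConfigFactory
    import sample.Utils

    val spark = Utils.getOrCreateSparkSession(ConfigFactory.load())
    spark.range(5).show() // prints ids 0 through 4 if the session came up
    spark.stop()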

src/test/resources/application.conf (+7, new file)

@@ -0,0 +1,7 @@
+db.default.port=3307
+db.default.driver="com.mysql.cj.jdbc.Driver"
+db.default.url="jdbc:mysql://127.0.0.1:"${db.default.port}"/test"
+db.default.user="root"
+db.default.password=""
+spark.jars.location=""
+spark.master="local[*]"
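Note the HOCON substitution in `db.default.url`: at load time the quoted fragments are concatenated around the resolved `${db.default.port}`. A sketch of checking the resolved value (assuming this file is on the test classpath):

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.load()
    println(conf.getString("db.default.url")) // jdbc:mysql://127.0.0.1:3307/test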
src/test/scala/sample/HelloLocalSparkSpec.scala (+47, new file)

@@ -0,0 +1,47 @@
+package sample
+
+import com.typesafe.config.ConfigFactory
+import org.apache.spark.streaming.Seconds
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.collection.mutable
+
+class HelloLocalSparkSpec extends WordSpecLike
+  with Matchers
+  with BeforeAndAfterAll
+  with Eventually {
+
+  val conf = ConfigFactory.load()
+
+  lazy val ssc = Utils.createSparkStreamingContext(conf, Seconds(10))
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    ssc.stop()
+  }
+
+  "Hello Spark Streaming" should {
+
+    "successfully receives multiple batches" in {
+
+      val items = mutable.Map.empty[Int, Long].withDefaultValue(0L)
+
+      val stream = ssc.receiverStream(new CryptoCurrencyFeedReceiver())
+
+      stream.foreachRDD(rdd => items += rdd.id -> (items(rdd.id) + rdd.count()))
+
+      ssc.start()
+
+      Thread.sleep(22000)
+
+      println(items)
+
+      assert(items.exists {
+        case (_, trades) => trades > 0
+      })
+
+    }
+  }
+}
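The spec mixes in `Eventually` but then blocks on a fixed `Thread.sleep(22000)` before asserting. A sketch of the polling alternative the mixin enables, dropped into the same test body (renaming ScalaTest's `Seconds` is only to avoid clashing with the Spark `Seconds` already imported):

    import org.scalatest.time.{Seconds => Secs, Span}

    // replaces Thread.sleep(22000) and the final assert
    eventually(timeout(Span(30, Secs)), interval(Span(1, Secs))) {
      assert(items.exists { case (_, trades) => trades > 0 })
    }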

src/test/scala/WordCountTest.scala → src/test/scala/sample/WordCountTest.scala (renamed, +6 -6)

@@ -1,15 +1,15 @@
+package sample
+
+import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkConf
 
 class WordCountTest extends FunSuite with BeforeAndAfterAll {
 
-  private var sparkConf: SparkConf = _
-  private var sc: SparkContext = _
+  lazy val sparkConf: SparkConf = new SparkConf().setAppName("unit-testing").setMaster("local")
+  lazy val sc: SparkContext = new SparkContext(sparkConf)
 
   override def beforeAll() {
-    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local")
-    sc = new SparkContext(sparkConf)
+
   }
 
 test("get word count rdd") {
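With the switch to lazy vals the context is created only when a test first touches `sc`, which is why `beforeAll` is now empty. The matching cleanup is worth noting (a sketch, not in the commit):

    override def afterAll() {
      sc.stop() // release the lazily created SparkContext once the suite finishes
    }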

src/test/scala/WordCountTestWithTestingBase.scala → src/test/scala/sample/WordCountTestWithTestingBase.scala (renamed, +2)

@@ -1,3 +1,5 @@
+package sample
+
 import com.holdenkarau.spark.testing.SharedSparkContext
 import org.scalatest.FunSuite
