Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 8c983ab

Browse files
committed
Merge branch 'develop'
2 parents 684b7cf + 9f0d180 commit 8c983ab

File tree

4 files changed

+17
-8
lines changed

4 files changed

+17
-8
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ name := "kafka-streams-scala"
44

55
organization := "com.lightbend"
66

7-
version := "0.1.1"
7+
version := "0.1.2"
88

99
scalaVersion := Versions.Scala_2_12_Version
1010

src/main/scala/com/lightbend/kafka/scala/streams/KGroupedStreamS.scala

+6-2
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@ class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
3636
def reduce(reducer: (V, V) => V,
3737
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] = {
3838

39-
inner.reduce((v1: V, v2: V) => reducer(v1, v2), materialized)
39+
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
40+
// works perfectly with Scala 2.12 though
41+
inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, materialized)
4042
}
4143

4244
def reduce(reducer: (V, V) => V,
4345
storeName: String): KTableS[K, V] = {
4446

45-
inner.reduce((v1: V, v2: V) => reducer(v1, v2), Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))
47+
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
48+
// works perfectly with Scala 2.12 though
49+
inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))
4650
}
4751

4852
def aggregate[VR](initializer: () => VR,

src/main/scala/com/lightbend/kafka/scala/streams/KGroupedTableS.scala

+6-3
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@ class KGroupedTableS[K, V](inner: KGroupedTable[K, V]) {
2828
def reduce(adder: (V, V) => V,
2929
subtractor: (V, V) => V): KTableS[K, V] = {
3030

31-
inner.reduce((v1, v2) => adder(v1, v2), (v1, v2) => subtractor(v1, v2))
31+
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
32+
// works perfectly with Scala 2.12 though
33+
inner.reduce(((v1, v2) => adder(v1, v2)).asReducer, ((v1, v2) => subtractor(v1, v2)).asReducer)
3234
}
3335

3436
def reduce(adder: (V, V) => V,
3537
subtractor: (V, V) => V,
3638
materialized: Materialized[K, V, ByteArrayKVStore]): KTableS[K, V] = {
3739

38-
inner.reduce((v1, v2) => adder(v1, v2), (v1, v2) => subtractor(v1, v2), materialized)
40+
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
41+
// works perfectly with Scala 2.12 though
42+
inner.reduce(((v1, v2) => adder(v1, v2)).asReducer, ((v1, v2) => subtractor(v1, v2)).asReducer, materialized)
3943
}
4044

4145
def aggregate[VR](initializer: () => VR,
@@ -50,7 +54,6 @@ class KGroupedTableS[K, V](inner: KGroupedTable[K, V]) {
5054
subtractor: (K, V, VR) => VR,
5155
materialized: Materialized[K, VR, ByteArrayKVStore]): KTableS[K, VR] = {
5256

53-
5457
inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
5558
}
5659
}

src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTest.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ object StreamToTableJoinScalaIntegrationTest extends TestSuite[KafkaLocalServer]
7373
p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-standard-consumer")
7474
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
7575
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
76-
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
76+
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
7777
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
7878
p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
7979
p
@@ -97,7 +97,9 @@ object StreamToTableJoinScalaIntegrationTest extends TestSuite[KafkaLocalServer]
9797

9898
// Compute the total per region by summing the individual click counts per region.
9999
.groupByKey(Serialized.`with`(stringSerde, longSerde))
100-
.reduce(_ + _)
100+
101+
// .reduce(_ + _, "local_state_data") // doesn't work in Scala 2.11, works with Scala 2.12
102+
.reduce((firstClicks: Long, secondClicks: Long) => firstClicks + secondClicks, "local_state_data")
101103

102104
// Write the (continuously updating) results to the output topic.
103105
clicksPerRegion.toStream.to(outputTopic, Produced.`with`(stringSerde, longSerde))

0 commit comments

Comments
 (0)