Skip to content

Commit 53e88e4

Browse files
mikemintzmanub
authored andcommitted
Implement withRunningKafkaOnFoundPort (#76)
* Make withRunningKafka generic * Factor out withTempDir helper method * Factor out withRunningZooKeeper helper method * Implement withRunningKafkaOnFoundPort This is useful for writing tests that configure kafka/zookeeper to listen on port 0, i.e. listen on an arbitrary available port.
1 parent 09d50c5 commit 53e88e4

File tree

4 files changed

+137
-19
lines changed

4 files changed

+137
-19
lines changed

README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,27 @@ class MySpec extends WordSpec with EmbeddedKafka {
7878
}
7979
```
8080

81+
If you want to run ZooKeeper and Kafka on arbitrary available ports, you can
82+
use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more
83+
reliable, especially when running tests in parallel or on machines where other
84+
tests or services may be running with port numbers you can't control.
85+
86+
```scala
87+
class MySpec extends WordSpec with EmbeddedKafka {
88+
89+
"runs with embedded kafka on arbitrary available ports" should {
90+
91+
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
92+
93+
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
94+
// now a kafka broker is listening on actualConfig.kafkaPort
95+
publishStringMessageToKafka("topic", "message")
96+
consumeFirstStringMessageFrom("topic") shouldBe "message"
97+
}
98+
99+
}
100+
```
101+
81102
The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties
82103

83104
```scala
@@ -102,7 +123,7 @@ class MySpec extends WordSpec with EmbeddedKafka {
102123
}
103124
```
104125

105-
This works for both `withRunningKafka` and `EmbeddedKafka.start()`
126+
This works for `withRunningKafka`, `withRunningKafkaOnFoundPort`, and `EmbeddedKafka.start()`
106127

107128
Also, it is now possible to provide custom properties to the broker while starting Kafka. `EmbeddedKafkaConfig` has a
108129
`customBrokerProperties` field which can be used to provide extra properties contained in a `Map[String, String]`.

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,26 +129,64 @@ sealed trait EmbeddedKafkaSupport {
129129
* @param body the function to execute
130130
* @param config an implicit [[EmbeddedKafkaConfig]]
131131
*/
132-
def withRunningKafka(body: => Any)(
133-
implicit config: EmbeddedKafkaConfig): Any = {
134-
135-
def cleanLogs(directories: Directory*): Unit = {
136-
directories.foreach(_.deleteRecursively())
132+
def withRunningKafka[T](body: => T)(
133+
implicit config: EmbeddedKafkaConfig): T = {
134+
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
135+
withTempDir("kafka") { kafkaLogsDir =>
136+
val broker = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
137+
try {
138+
body
139+
} finally {
140+
broker.shutdown()
141+
broker.awaitShutdown()
142+
}
143+
}
137144
}
145+
}
138146

139-
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
140-
val kafkaLogsDir = Directory.makeTemp("kafka")
147+
/**
148+
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
149+
* The actual ZooKeeper and Kafka ports will be detected and inserted into a copied version of
150+
* the EmbeddedKafkaConfig that gets passed to body. This is useful if you set either or both
151+
* port to 0, which will listen on an arbitrary available port.
152+
*
153+
* @param config the user-defined [[EmbeddedKafkaConfig]]
154+
* @param body the function to execute, given an [[EmbeddedKafkaConfig]] with the actual
155+
* ports Kafka and ZooKeeper are running on
156+
*/
157+
def withRunningKafkaOnFoundPort[T](config: EmbeddedKafkaConfig)(body: EmbeddedKafkaConfig => T): T = {
158+
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
159+
withTempDir("kafka") { kafkaLogsDir =>
160+
val broker: KafkaServer = startKafka(config.copy(zooKeeperPort = zkPort), kafkaLogsDir)
161+
val kafkaPort = broker.boundPort(broker.config.listeners.head.listenerName)
162+
val actualConfig = config.copy(kafkaPort = kafkaPort, zooKeeperPort = zkPort)
163+
try {
164+
body(actualConfig)
165+
} finally {
166+
broker.shutdown()
167+
broker.awaitShutdown()
168+
}
169+
}
170+
}
171+
}
141172

142-
val factory = startZooKeeper(config.zooKeeperPort, zkLogsDir)
143-
val broker = startKafka(config, kafkaLogsDir)
173+
private def withRunningZooKeeper[T](port: Int)(body: Int => T): T = {
174+
withTempDir("zookeeper-logs") { zkLogsDir =>
175+
val factory = startZooKeeper(port, zkLogsDir)
176+
try {
177+
body(factory.getLocalPort)
178+
} finally {
179+
factory.shutdown()
180+
}
181+
}
182+
}
144183

184+
private def withTempDir[T](prefix: String)(body: Directory => T): T = {
185+
val dir = Directory.makeTemp(prefix)
145186
try {
146-
body
187+
body(dir)
147188
} finally {
148-
broker.shutdown()
149-
broker.awaitShutdown()
150-
factory.shutdown()
151-
cleanLogs(zkLogsDir, kafkaLogsDir)
189+
dir.deleteRecursively()
152190
}
153191
}
154192

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package net.manub.embeddedkafka
2+
3+
class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec
4+
extends EmbeddedKafkaSpecSupport
5+
with EmbeddedKafka {
6+
7+
"the withRunningKafkaOnFoundPort method" should {
8+
"start and stop Kafka and Zookeeper successfully on non-zero ports" in {
9+
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 12346)
10+
val actualConfig = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig =>
11+
actualConfig shouldBe userDefinedConfig
12+
bothKafkaAndZkAreAvailable(actualConfig)
13+
actualConfig
14+
}
15+
bothKafkaAndZkAreNotAvailable(actualConfig)
16+
}
17+
18+
"start and stop multiple Kafka and Zookeeper successfully on arbitrary available ports" in {
19+
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
20+
val actualConfig1 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig1 =>
21+
bothKafkaAndZkAreAvailable(actualConfig1)
22+
publishStringMessageToKafka("topic", "message1")(actualConfig1)
23+
consumeFirstStringMessageFrom("topic")(actualConfig1) shouldBe "message1"
24+
val actualConfig2 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig2 =>
25+
bothKafkaAndZkAreAvailable(actualConfig2)
26+
publishStringMessageToKafka("topic", "message2")(actualConfig2)
27+
consumeFirstStringMessageFrom("topic")(actualConfig2) shouldBe "message2"
28+
val allConfigs = Seq(userDefinedConfig, actualConfig1, actualConfig2)
29+
// Confirm both actual configs are running on separate non-zero ports, but otherwise equal
30+
allConfigs.map(_.kafkaPort).distinct should have size 3
31+
allConfigs.map(_.zooKeeperPort).distinct should have size 3
32+
allConfigs.map(_.copy(kafkaPort = 0, zooKeeperPort = 0)).distinct should have size 1
33+
actualConfig2
34+
}
35+
bothKafkaAndZkAreNotAvailable(actualConfig2)
36+
actualConfig1
37+
}
38+
bothKafkaAndZkAreNotAvailable(actualConfig1)
39+
}
40+
41+
"work with a simple example using implicits" in {
42+
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
43+
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
44+
publishStringMessageToKafka("topic", "message")
45+
consumeFirstStringMessageFrom("topic") shouldBe "message"
46+
}
47+
}
48+
}
49+
50+
private def bothKafkaAndZkAreAvailable(config: EmbeddedKafkaConfig): Unit = {
51+
kafkaIsAvailable(config.kafkaPort)
52+
zookeeperIsAvailable(config.zooKeeperPort)
53+
}
54+
55+
private def bothKafkaAndZkAreNotAvailable(config: EmbeddedKafkaConfig): Unit = {
56+
kafkaIsNotAvailable(config.kafkaPort)
57+
zookeeperIsNotAvailable(config.zooKeeperPort)
58+
}
59+
}

embedded-kafka/src/test/scala/net/manub/embeddedkafka/embeddedKafkaSpecSupport.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ abstract class EmbeddedKafkaSpecSupport
6060
expectMsg(1 second, ConnectionSuccessful)
6161
}
6262

63-
def kafkaIsNotAvailable(): Unit = {
63+
def kafkaIsNotAvailable(kafkaPort: Int = 6001): Unit = {
6464
system.actorOf(
65-
TcpClient.props(new InetSocketAddress("localhost", 6001), testActor))
65+
TcpClient.props(new InetSocketAddress("localhost", kafkaPort), testActor))
6666
expectMsg(1 second, ConnectionFailed)
6767
}
6868

69-
def zookeeperIsNotAvailable(): Unit = {
69+
def zookeeperIsNotAvailable(zookeeperPort: Int = 6000): Unit = {
7070
system.actorOf(
71-
TcpClient.props(new InetSocketAddress("localhost", 6000), testActor))
71+
TcpClient.props(new InetSocketAddress("localhost", zookeeperPort), testActor))
7272
expectMsg(1 second, ConnectionFailed)
7373
}
7474
}

0 commit comments

Comments
 (0)