Skip to content

Commit b017848

Browse files
authored
Refactored test code in order to speed up builds. (#54)
1 parent 5d7897d commit b017848

File tree

5 files changed

+304
-317
lines changed

5 files changed

+304
-317
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package net.manub.embeddedkafka
2+
3+
import java.util.concurrent.TimeoutException
4+
5+
import kafka.admin.AdminUtils
6+
import kafka.utils.ZkUtils
7+
import org.apache.kafka.clients.consumer.KafkaConsumer
8+
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
9+
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer, StringSerializer}
10+
import org.scalatest.BeforeAndAfterAll
11+
12+
import scala.collection.JavaConversions._
13+
14+
15+
class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka with BeforeAndAfterAll {
16+
17+
override def beforeAll(): Unit = {
18+
super.beforeAll()
19+
EmbeddedKafka.start()
20+
}
21+
22+
override def afterAll(): Unit = {
23+
EmbeddedKafka.stop()
24+
super.afterAll()
25+
}
26+
27+
"the publishToKafka method" should {
28+
"publish synchronously a String message to Kafka" in {
29+
implicit val serializer = new StringSerializer()
30+
val message = "hello world!"
31+
val topic = "test_topic"
32+
33+
publishToKafka(topic, message)
34+
35+
val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
36+
consumer.subscribe(List("test_topic"))
37+
38+
val records = consumer.poll(ConsumerPollTimeout)
39+
40+
records.iterator().hasNext shouldBe true
41+
val record = records.iterator().next()
42+
43+
record.value() shouldBe message
44+
45+
consumer.close()
46+
47+
}
48+
49+
"publish synchronously a String message with String key to Kafka" in {
50+
implicit val serializer = new StringSerializer()
51+
val key = "key"
52+
val message = "hello world!"
53+
val topic = "test_topic"
54+
55+
publishToKafka(topic, key, message)
56+
57+
val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
58+
consumer.subscribe(List("test_topic"))
59+
60+
val records = consumer.poll(ConsumerPollTimeout)
61+
62+
records.iterator().hasNext shouldBe true
63+
val record = records.iterator().next()
64+
65+
record.key() shouldBe key
66+
record.value() shouldBe message
67+
68+
69+
consumer.close()
70+
}
71+
}
72+
73+
"the createCustomTopic method" should {
74+
"create a topic with a custom configuration" in {
75+
implicit val config = EmbeddedKafkaConfig(customBrokerProperties = Map("log.cleaner.dedupe.buffer.size" -> "2000000"))
76+
val topic = "test_custom_topic"
77+
78+
79+
createCustomTopic(topic, Map("cleanup.policy" -> "compact"))
80+
81+
val zkSessionTimeoutMs = 10000
82+
val zkConnectionTimeoutMs = 10000
83+
val zkSecurityEnabled = false
84+
85+
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
86+
try {
87+
AdminUtils.topicExists(zkUtils, topic) shouldBe true
88+
} finally zkUtils.close()
89+
90+
}
91+
92+
"create a topic with custom number of partitions" in {
93+
implicit val config = EmbeddedKafkaConfig()
94+
val topic = "test_custom_topic_with_custom_partitions"
95+
96+
97+
createCustomTopic(topic, Map("cleanup.policy" -> "compact"), partitions = 2)
98+
99+
val zkSessionTimeoutMs = 10000
100+
val zkConnectionTimeoutMs = 10000
101+
val zkSecurityEnabled = false
102+
103+
val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
104+
try {
105+
AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata().size shouldBe 2
106+
} finally zkUtils.close()
107+
108+
}
109+
}
110+
111+
"the consumeFirstStringMessageFrom method" should {
112+
"return a message published to a topic" in {
113+
val message = "hello world!"
114+
val topic = "test_topic"
115+
116+
val producer = new KafkaProducer[String, String](Map(
117+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
118+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
119+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
120+
))
121+
122+
whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
123+
consumeFirstStringMessageFrom(topic) shouldBe message
124+
}
125+
126+
producer.close()
127+
}
128+
129+
"return a message published to a topic with implicit decoder" in {
130+
val message = "hello world!"
131+
val topic = "test_topic"
132+
133+
val producer = new KafkaProducer[String, String](Map(
134+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
135+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
136+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
137+
))
138+
139+
import Codecs._
140+
whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
141+
consumeFirstMessageFrom[Array[Byte]](topic) shouldBe message.getBytes
142+
}
143+
144+
producer.close()
145+
}
146+
147+
"return a message published to a topic with custom decoder" in {
148+
149+
import avro._
150+
151+
val message = TestAvroClass("name")
152+
val topic = "test_topic"
153+
implicit val testAvroClassDecoder = specificAvroDeserializer[TestAvroClass](TestAvroClass.SCHEMA$)
154+
155+
val producer = new KafkaProducer[String, TestAvroClass](Map(
156+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001"
157+
), new StringSerializer, specificAvroSerializer[TestAvroClass])
158+
159+
whenReady(producer.send(new ProducerRecord(topic, message))) { _ =>
160+
consumeFirstMessageFrom[TestAvroClass](topic) shouldBe message
161+
}
162+
163+
producer.close()
164+
}
165+
166+
"throw a TimeoutExeption when a message is not available" in {
167+
a[TimeoutException] shouldBe thrownBy {
168+
consumeFirstStringMessageFrom("non_existing_topic")
169+
}
170+
}
171+
}
172+
173+
"the aKafkaProducerThat method" should {
174+
"return a producer that encodes messages for the given encoder" in {
175+
val producer = aKafkaProducer thatSerializesValuesWith classOf[ByteArraySerializer]
176+
producer.send(new ProducerRecord[String, Array[Byte]]("a_topic", "a message".getBytes))
177+
producer.close()
178+
}
179+
}
180+
181+
"the aKafkaProducer object" should {
182+
"return a producer that encodes messages for the given type" in {
183+
import Codecs._
184+
val producer = aKafkaProducer[String]
185+
producer.send(new ProducerRecord[String, String]("a_topic", "a message"))
186+
producer.close()
187+
}
188+
189+
"return a producer that encodes messages for a custom type" in {
190+
import avro._
191+
val producer = aKafkaProducer[TestAvroClass]
192+
producer.send(new ProducerRecord[String, TestAvroClass]("a_topic", TestAvroClass("name")))
193+
producer.close()
194+
}
195+
}
196+
197+
val ConsumerPollTimeout = 3000
198+
199+
}

0 commit comments

Comments
 (0)