Skip to content

Commit d6b6a74

Browse files
nguyenuymanub
authored andcommitted
Add batch consumption of messages (#67)
1 parent e122a2c commit d6b6a74

File tree

2 files changed

+124
-4
lines changed

2 files changed

+124
-4
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import org.scalatest.Suite
1616

1717
import scala.collection.JavaConversions.mapAsJavaMap
1818
import scala.collection.mutable
19+
import scala.collection.mutable.ListBuffer
1920
import scala.concurrent.duration._
2021
import scala.concurrent.{ExecutionContext, TimeoutException}
2122
import scala.language.{higherKinds, postfixOps}
@@ -210,10 +211,23 @@ sealed trait EmbeddedKafkaSupport {
210211
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
211212
)
212213

214+
/**
  * Builds the consumer properties shared by the consume* helpers.
  *
  * The returned [[java.util.Properties]] is a fresh instance on every call, so callers
  * may override entries (e.g. "enable.auto.commit") without affecting one another.
  *
  * @param config an implicit [[EmbeddedKafkaConfig]]; its kafkaPort is used for the bootstrap server
  * @return properties with a fixed group id, a localhost bootstrap server,
  *         "earliest" offset reset and auto-commit disabled
  */
private def baseConsumerConfig(implicit config: EmbeddedKafkaConfig): Properties = {
  val consumerProps = new Properties()
  val defaults: List[(String, String)] = List(
    "group.id" -> "embedded-kafka-spec",
    "bootstrap.servers" -> s"localhost:${config.kafkaPort}",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "false"
  )
  defaults.foreach { case (key, value) => consumerProps.put(key, value) }
  consumerProps
}
222+
213223
def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
214224
implicit config: EmbeddedKafkaConfig): String =
215225
consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())
216226

227+
/**
  * Consumes `number` messages from the given topic, deserializing each value as a String.
  *
  * Delegates to consumeNumberMessagesFrom with a StringDeserializer.
  *
  * @param topic      the topic to consume messages from
  * @param number     the number of messages to consume
  * @param autoCommit forwarded to consumeNumberMessagesFrom. Defaulted to false.
  * @param config     an implicit [[EmbeddedKafkaConfig]]
  * @return the consumed messages, as a List of Strings
  */
def consumeNumberStringMessagesFrom(topic: String, number: Int, autoCommit: Boolean = false)(
    implicit config: EmbeddedKafkaConfig): List[String] = {
  val stringDeserializer: Deserializer[String] = new StringDeserializer()
  consumeNumberMessagesFrom[String](topic, number, autoCommit)(config, stringDeserializer)
}
230+
217231
/**
218232
* Consumes the first message available in a given topic, deserializing it as a String.
219233
*
@@ -238,10 +252,7 @@ sealed trait EmbeddedKafkaSupport {
238252

239253
import scala.collection.JavaConversions._
240254

241-
val props = new Properties()
242-
props.put("group.id", s"embedded-kafka-spec")
243-
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
244-
props.put("auto.offset.reset", "earliest")
255+
val props = baseConsumerConfig
245256
props.put("enable.auto.commit", autoCommit.toString)
246257

247258
val consumer =
@@ -271,6 +282,69 @@ sealed trait EmbeddedKafkaSupport {
271282
}.get
272283
}
273284

285+
/**
  * Consumes the first `number` messages available in a given topic, deserializing each value
  * with the implicit deserializer, and returns them as a List in the order they were read.
  *
  * The offset of every returned message is committed synchronously, one message at a time.
  * Records that were polled but not returned are not committed by this explicit commit.
  *
  * @param topic        the topic to consume messages from
  * @param number       the number of messages to consume in a batch
  * @param autoCommit   the value used for the consumer's "enable.auto.commit" setting.
  *                     Defaulted to false.
  * @param config       an implicit [[EmbeddedKafkaConfig]]
  * @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
  * @return the first `number` messages consumed from the given topic, as a List of [[T]]
  * @throws TimeoutException if a poll returns no records within 5 seconds before
  *                          `number` messages have been read
  * @throws KafkaUnavailableException if unable to connect to Kafka
  */
def consumeNumberMessagesFrom[T](topic: String, number: Int, autoCommit: Boolean = false)(
    implicit config: EmbeddedKafkaConfig,
    deserializer: Deserializer[T]): List[T] = {

  import scala.collection.JavaConverters._

  val consumerProps = baseConsumerConfig
  consumerProps.put("enable.auto.commit", autoCommit.toString)

  val consumer =
    new KafkaConsumer[String, T](consumerProps, new StringDeserializer, deserializer)

  val outcome = Try {
    val collected = ListBuffer.empty[T]
    consumer.subscribe(List(topic).asJava)
    consumer.partitionsFor(topic)

    // Keep polling until enough messages were gathered; an empty poll aborts with a timeout.
    while (collected.size < number) {
      val batch = consumer.poll(5000)
      if (batch.isEmpty) {
        throw new TimeoutException("Unable to retrieve a message from Kafka in 5000ms")
      }

      // Take only as many records from this batch as are still missing.
      batch.iterator().asScala.take(number - collected.size).foreach { record =>
        collected += record.value()
        val partition = new TopicPartition(record.topic(), record.partition())
        // The committed offset is the position of the next record to read.
        val nextOffset = new OffsetAndMetadata(record.offset() + 1)
        consumer.commitSync(Map(partition -> nextOffset).asJava)
      }
    }
    collected.toList
  }

  // The consumer is closed whether or not consumption succeeded.
  consumer.close()
  outcome.recover {
    case ex: KafkaException => throw new KafkaUnavailableException(ex)
  }.get
}
346+
347+
274348
object aKafkaProducer {
275349
private[this] var producers = Vector.empty[KafkaProducer[_, _]]
276350

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,52 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
195195
}
196196
}
197197

198+
"the consumeNumberStringMessagesFrom method" should {

  // Publishes every message to the topic with a String producer, flushes, runs the body,
  // and always closes the producer afterwards, even when the body's assertion fails.
  def withPublishedMessages[A](topic: String, messages: Set[String])(body: => A): A = {
    val producer = new KafkaProducer[String, String](Map(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
    ))

    try {
      messages.foreach { message =>
        producer.send(new ProducerRecord[String, String](topic, message))
      }

      producer.flush()

      body
    } finally {
      producer.close()
    }
  }

  "consume set number of messages when multiple messages have been published to a topic" in {
    val messages = Set("message 1", "message 2", "message 3")
    val topic = "consume_test_topic"

    withPublishedMessages(topic, messages) {
      val consumedMessages = consumeNumberStringMessagesFrom(topic, messages.size)

      consumedMessages.toSet shouldEqual messages
    }
  }

  "timeout and throw a TimeoutException when n messages are not received in time" in {
    val messages = Set("message 1", "message 2", "message 3")
    // NOTE(review): same topic as the previous test; presumably this relies on the offsets
    // committed there so that only the three new messages are available - confirm.
    val topic = "consume_test_topic"

    withPublishedMessages(topic, messages) {
      a[TimeoutException] shouldBe thrownBy {
        consumeNumberStringMessagesFrom(topic, messages.size + 1)
      }
    }
  }
}
243+
198244
"the aKafkaProducerThat method" should {
199245
"return a producer that encodes messages for the given encoder" in {
200246
val producer = aKafkaProducer thatSerializesValuesWith classOf[ByteArraySerializer]

0 commit comments

Comments
 (0)