Skip to content

Commit 645ff1a

Browse files
authored
Revert "feat: use buildAndRegisterGlobal (#43)" (#44)
This reverts commit b9cd0b7.
1 parent b9cd0b7 commit 645ff1a

1 file changed

Lines changed: 3 additions & 11 deletions

File tree

src/main/java/io/goboolean/streams/kafka/KafkaAggregateConsumerService.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
public class KafkaAggregateConsumerService {
1313

14-
private static final Logger logger = LoggerFactory.getLogger(KafkaAggregateConsumerService.class);
15-
1614
private final Properties props;
1715
private final Consumer<Integer, Model.Aggregate> consumer;
1816
private final KafkaConsumerListener listener;
@@ -33,15 +31,9 @@ public void run(String[] topics) {
3331

3432
pollingThread = new Thread(() -> {
3533
while (!Thread.currentThread().isInterrupted()) {
36-
try {
37-
consumer.poll(100).forEach(record -> {
38-
logger.debug("Received message - Topic: {}, Partition: {}, Offset: {}",
39-
record.topic(), record.partition(), record.offset());
40-
listener.onMessage(record.value());
41-
});
42-
} catch (Exception e) {
43-
logger.error("Error polling Kafka", e);
44-
}
34+
consumer.poll(100).forEach(record -> {
35+
listener.onMessage(record.value());
36+
});
4537
}
4638
});
4739
pollingThread.start();

0 commit comments

Comments
 (0)