From 9b6e316c63d6aa51b90e6f72c72ae04fa92d6d67 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Tue, 2 Dec 2025 13:21:28 +0200 Subject: [PATCH 01/10] Add exception handling for consumer polling --- README.md | 19 +++++--- src/ketu/async/source.clj | 64 ++++++++++++++++++++----- test/ketu/async/source_test.clj | 83 +++++++++++++++++++++++++-------- 3 files changed, 128 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index d844ca4..5cf4d14 100644 --- a/README.md +++ b/README.md @@ -78,11 +78,14 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | + +| Key | Type | Req? | Notes | +|---------------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. 
[Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | +| :ketu.source/custom-catch-fn | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts | +| :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. If not set, falls back to Kafka `max.poll.records` from `:internal-config` (string or numeric), else 1 | #### Producer-sink options @@ -145,7 +148,8 @@ for example when managing the offset manually, auto-commit should usually set to In this example we use the decorator to run commands in the polling thread context. The consumer is paused/resumed based on commands sent from the application. -The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn). +The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn). 
+ ```clojure (ns consumer-decorator-example (:require [clojure.core.async :as async] @@ -226,9 +230,10 @@ The decorator processes all immediately available commands in the commands-chan, ``` ## Java Kafka client versions + - `ketu` version 1.0.0+ uses `org.apache.kafka/kafka-clients` version 3.3.1 - `ketu` version 2.1.0+ uses `org.apache.kafka/kafka-clients` version 3.9.1 - - For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md) + - For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md) ## Development & Contribution diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 051db46..7d39c4e 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -8,6 +8,7 @@ [ketu.util.log :as log]) (:import (java.time Duration) (org.apache.kafka.clients.consumer Consumer) + (org.apache.kafka.common TopicPartition) (org.apache.kafka.common.errors WakeupException) (org.apache.kafka.common.serialization Deserializer) (org.slf4j Logger LoggerFactory))) @@ -74,22 +75,61 @@ (fn [consumer] (consumer/assign! consumer (consumer/topic-partitions topic partitions)))))) +(defn- get-max-poll-records-from-opts + "Gets max.poll.records from opts config, defaulting to 1 if not found." + [opts] + (try + (let [config (:ketu.apache.consumer/config opts) + max-poll-records (get config "max.poll.records")] + (if max-poll-records + (if (string? max-poll-records) + (Long/parseLong max-poll-records) + (long max-poll-records)) + 1)) + (catch Exception _ + (log/error logger "Failed to get max.poll.records from opts" opts) + 1))) + +(defn- increment-offsets-for-assigned-partitions! + "Increments the offset by skip-amount for all assigned partitions to skip faulty messages. + Uses max.poll.records from opts config if skip-amount is not provided." + ([^Consumer consumer source-name opts] + (increment-offsets-for-assigned-partitions! 
consumer source-name opts nil)) + ([^Consumer consumer source-name opts skip-amount] + (try + (let [assigned-partitions (consumer/assignment consumer) + skip-amount (or skip-amount (get-max-poll-records-from-opts opts))] + (doseq [^TopicPartition partition assigned-partitions] + (try + (let [current-position (consumer/position consumer partition) + next-offset (+ current-position skip-amount)] + (consumer/seek! consumer partition next-offset) + (log/info logger "[source={}] Incremented offset for partition {} from {} to {} (skip amount: {})" + source-name partition current-position next-offset skip-amount)) + (catch Exception e + (log/error logger "[source={}] Failed to increment offset for partition {}" + source-name partition e))))) + (catch Exception e + (log/error logger "[source={}] Failed to get assigned partitions for offset increment" + source-name e))))) + (defn- poll-fn [^Consumer consumer should-poll? opts] (when @should-poll? (let [source-name (:ketu/name opts) poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts)) - catching-poll? (:ketu.source.legacy/catching-poll? opts)] - (if catching-poll? - ;TODO Eliminate catching poll ASAP. - ; Just in case of a production issue and generic error handling wasn't implemented yet. - (fn [] - (try - (consumer/poll! consumer poll-timeout-duration) - (catch Exception e - (log/error logger "[source={}] Caught poll exception" source-name e) - []))) - (fn [] - (consumer/poll! consumer poll-timeout-duration)))))) + custom-catch-fn (:ketu.source/custom-catch-fn opts)] + (fn [] + (try + (consumer/poll! consumer poll-timeout-duration) + (catch WakeupException e + (throw e)) + (catch Exception e + (if (some? custom-catch-fn) + (custom-catch-fn consumer opts) + (let [skip-amount (:ketu.source/error-skip-offset-amount opts)] + (log/error logger "[source={}] Caught poll exception, skipping faulty batch" source-name e) + (increment-offsets-for-assigned-partitions! 
consumer source-name opts skip-amount) + [])))))))) (defn- ->data-fn [{:keys [ketu.source/shape] :as opts}] (cond diff --git a/test/ketu/async/source_test.clj b/test/ketu/async/source_test.clj index 8b0a678..f74f2d2 100644 --- a/test/ketu/async/source_test.clj +++ b/test/ketu/async/source_test.clj @@ -2,6 +2,7 @@ (:require [clojure.test :refer [deftest testing is]] [clojure.core.async :as async] [clojure.core.async.impl.protocols] + [clojure.string :as str] [ketu.test.log :as log] [ketu.test.util :as u] [ketu.clients.consumer :as consumer] @@ -140,22 +141,66 @@ [:info "[source=test] Exit consumer thread"]] (log/events log-ctx)))))))) -(deftest unrecoverable-exception-logs - (testing "Throw unrecoverable exception on first poll" - (log/with-test-appender - (log/ns-logger 'ketu.async.source) - (fn [log-ctx] - (let [consumer (doto (mock-consumer) - (.setPollException (KafkaException. "test exception"))) - ch (async/chan) - source (source/source ch {:name "test" - :topic "test-topic" - :ketu.source/consumer-supplier (constantly consumer)})] - (u/try-take! (source/done-chan source)) - (is (= [[:info "[source=test] Start consumer thread"] - [:error "[source=test] Unrecoverable consumer error"] - [:info "[source=test] Done consuming"] - [:info "[source=test] Close out channel"] - [:info "[source=test] Close consumer"] - [:info "[source=test] Exit consumer thread"]] - (log/events log-ctx)))))))) +(deftest poll-catch-fn + (testing "Custom catch function is called, receives correct parameters, and can return empty collection" + (let [received-opts (atom nil) + topic "test-topic" + partition (consumer/topic-partition topic 0) + custom-catch-fn (fn [consumer opts] + (reset! received-opts opts) + (consumer/seek! consumer partition 1) + []) ; Return empty collection + consumer (doto (mock-consumer topic) + (.setPollException (KafkaException. 
"test exception"))) + ch (async/chan) + opts {:name "test" + :topic topic + :ketu.source/consumer-supplier (constantly consumer) + :ketu.source/custom-catch-fn custom-catch-fn + :ketu.source/close-out-chan? false + :custom-opt "custom-value"} + source (source/source ch opts)] + (add-record consumer (ConsumerRecord. topic 0 0 "test-key" "test-value")) + (Thread/sleep 100) + (is (= "custom-value" (:custom-opt @received-opts))) + (is (not (channel-closed? ch))) + (let [timeout-ch (async/timeout 100) + [item _] (async/alts!! [ch timeout-ch] :priority true)] + (is (nil? item) "Catch function returns [], so no records should be put on channel")) + (source/stop! source))) + + (testing "Default catch function handles faulty message gracefully and then processes healthy message" + (let [orig-poll consumer/poll!] + (with-redefs [ketu.clients.consumer/poll! + (fn [c t] + (let [records (orig-poll c t)] + (if (some #(or (= "faulty-key" (.key %)) + (= "faulty-value" (.value %))) + records) + (do + ;; Reset position to 0 to simulate that we are stuck at the faulty record + ;; Since orig-poll advanced it, we must rewind for the test logic to be valid. + (consumer/seek! c (consumer/topic-partition "test-topic" 0) 0) + (throw (KafkaException. "Simulated corruption"))) + records)))] + (log/with-test-appender + (log/ns-logger 'ketu.async.source) + (fn [log-ctx] + (let [consumer (mock-consumer "test-topic") + ch (async/chan) + source (source/source ch {:name "test" + :topic "test-topic" + :ketu.source/consumer-supplier (constantly consumer) + :ketu.source/close-out-chan? false})] + (add-record consumer (ConsumerRecord. "test-topic" 0 0 "faulty-key" "faulty-value")) + (Thread/sleep 100) + (is (some #(and (= :error (first %)) + (str/includes? (second %) "Caught poll exception")) + (log/events log-ctx))) + (add-record consumer (ConsumerRecord. "test-topic" 0 1 "healthy-key" "healthy-value")) + (Thread/sleep 100) + (let [received-record (u/try-take! 
ch)] + (is (= "healthy-key" (.key received-record))) + (is (= "healthy-value" (.value received-record))) + (is (= 1 (.offset received-record)))) + (source/stop! source)))))))) \ No newline at end of file From 798240fd00b13d739491579344e17322d952fa43 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Wed, 3 Dec 2025 14:44:25 +0200 Subject: [PATCH 02/10] refactoring --- README.md | 14 +++++++------- src/ketu/async/source.clj | 32 +++++++++++++++---------------- test/ketu/async/source_test.clj | 34 ++++++++++++++++----------------- 3 files changed, 40 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 5cf4d14..6b6f488 100644 --- a/README.md +++ b/README.md @@ -79,13 +79,13 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. #### Consumer-source options -| Key | Type | Req? | Notes | -|---------------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | -| :ketu.source/custom-catch-fn | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts | -| :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. If not set, falls back to Kafka `max.poll.records` from `:internal-config` (string or numeric), else 1 | +| Key | Type | Req? 
| Notes | +|---------------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | +| :ketu.source/poll-error-handler | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts (consumer options) | +| :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. If not set, falls back to Kafka `max.poll.records` from `:internal-config` (string or numeric), else 1 | #### Producer-sink options diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 7d39c4e..7dd4829 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -91,21 +91,19 @@ 1))) (defn- increment-offsets-for-assigned-partitions! - "Increments the offset by skip-amount for all assigned partitions to skip faulty messages. - Uses max.poll.records from opts config if skip-amount is not provided." - ([^Consumer consumer source-name opts] - (increment-offsets-for-assigned-partitions! consumer source-name opts nil)) - ([^Consumer consumer source-name opts skip-amount] + "Increments the offset by records-to-skip for all assigned partitions to skip faulty messages. + Uses max.poll.records from opts config if records-to-skip is not provided." 
+ ([^Consumer consumer source-name opts records-to-skip] (try (let [assigned-partitions (consumer/assignment consumer) - skip-amount (or skip-amount (get-max-poll-records-from-opts opts))] + records-to-skip (or records-to-skip (get-max-poll-records-from-opts opts))] (doseq [^TopicPartition partition assigned-partitions] (try (let [current-position (consumer/position consumer partition) - next-offset (+ current-position skip-amount)] + next-offset (+ current-position records-to-skip)] (consumer/seek! consumer partition next-offset) (log/info logger "[source={}] Incremented offset for partition {} from {} to {} (skip amount: {})" - source-name partition current-position next-offset skip-amount)) + source-name partition current-position next-offset records-to-skip)) (catch Exception e (log/error logger "[source={}] Failed to increment offset for partition {}" source-name partition e))))) @@ -113,23 +111,25 @@ (log/error logger "[source={}] Failed to get assigned partitions for offset increment" source-name e))))) +(defn default-poll-error-handler [consumer opts] + (let [records-to-skip (:ketu.source/error-skip-offset-amount opts) + source-name (:ketu/name opts)] + (increment-offsets-for-assigned-partitions! consumer source-name opts records-to-skip) + [])) + (defn- poll-fn [^Consumer consumer should-poll? opts] (when @should-poll? (let [source-name (:ketu/name opts) - poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts)) - custom-catch-fn (:ketu.source/custom-catch-fn opts)] + error-handler (:ketu.source/poll-error-handler opts (default-poll-error-handler consumer opts)) + poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))] (fn [] (try (consumer/poll! consumer poll-timeout-duration) (catch WakeupException e (throw e)) (catch Exception e - (if (some? 
custom-catch-fn) - (custom-catch-fn consumer opts) - (let [skip-amount (:ketu.source/error-skip-offset-amount opts)] - (log/error logger "[source={}] Caught poll exception, skipping faulty batch" source-name e) - (increment-offsets-for-assigned-partitions! consumer source-name opts skip-amount) - [])))))))) + (log/error logger "[source={}] Caught poll exception, skipping faulty batch" source-name e) + (error-handler consumer opts))))))) (defn- ->data-fn [{:keys [ketu.source/shape] :as opts}] (cond diff --git a/test/ketu/async/source_test.clj b/test/ketu/async/source_test.clj index f74f2d2..0cfdf41 100644 --- a/test/ketu/async/source_test.clj +++ b/test/ketu/async/source_test.clj @@ -143,23 +143,23 @@ (deftest poll-catch-fn (testing "Custom catch function is called, receives correct parameters, and can return empty collection" - (let [received-opts (atom nil) - topic "test-topic" - partition (consumer/topic-partition topic 0) - custom-catch-fn (fn [consumer opts] - (reset! received-opts opts) - (consumer/seek! consumer partition 1) - []) ; Return empty collection - consumer (doto (mock-consumer topic) - (.setPollException (KafkaException. "test exception"))) - ch (async/chan) - opts {:name "test" - :topic topic - :ketu.source/consumer-supplier (constantly consumer) - :ketu.source/custom-catch-fn custom-catch-fn - :ketu.source/close-out-chan? false - :custom-opt "custom-value"} - source (source/source ch opts)] + (let [received-opts (atom nil) + topic "test-topic" + partition (consumer/topic-partition topic 0) + poll-error-handler (fn [consumer opts] + (reset! received-opts opts) + (consumer/seek! consumer partition 1) + []) ; Return empty collection + consumer (doto (mock-consumer topic) + (.setPollException (KafkaException. "test exception"))) + ch (async/chan) + opts {:name "test" + :topic topic + :ketu.source/consumer-supplier (constantly consumer) + :ketu.source/poll-error-handler poll-error-handler + :ketu.source/close-out-chan? 
false + :custom-opt "custom-value"} + source (source/source ch opts)] (add-record consumer (ConsumerRecord. topic 0 0 "test-key" "test-value")) (Thread/sleep 100) (is (= "custom-value" (:custom-opt @received-opts))) From f4b887fd0e94ac3ec34ce53f2e7bd2b00fee8d32 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Mon, 22 Dec 2025 12:58:50 +0200 Subject: [PATCH 03/10] Fix formatting of options in consumer polling configuration --- test/ketu/async/source_test.clj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/ketu/async/source_test.clj b/test/ketu/async/source_test.clj index 0cfdf41..9b5134e 100644 --- a/test/ketu/async/source_test.clj +++ b/test/ketu/async/source_test.clj @@ -156,9 +156,9 @@ opts {:name "test" :topic topic :ketu.source/consumer-supplier (constantly consumer) - :ketu.source/poll-error-handler poll-error-handler - :ketu.source/close-out-chan? false - :custom-opt "custom-value"} + :ketu.source/poll-error-handler poll-error-handler + :ketu.source/close-out-chan? false + :custom-opt "custom-value"} source (source/source ch opts)] (add-record consumer (ConsumerRecord. topic 0 0 "test-key" "test-value")) (Thread/sleep 100) From 3937349149ff57062737d9f30a7d91395c0b2b08 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Mon, 22 Dec 2025 16:17:01 +0200 Subject: [PATCH 04/10] Fix ConsumerRecord offset in shape test --- test/ketu/async/source_test.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/ketu/async/source_test.clj b/test/ketu/async/source_test.clj index 9b5134e..6b87f70 100644 --- a/test/ketu/async/source_test.clj +++ b/test/ketu/async/source_test.clj @@ -93,7 +93,7 @@ (deftest shape (testing "Put ConsumerRecord objects by default" - (let [record (ConsumerRecord. "topic" 0 0 "k" "v") + (let [record (ConsumerRecord. 
"topic" 0 1 "k" "v") consumer (doto (mock-consumer "topic") (add-record record)) ch (async/chan) @@ -156,7 +156,7 @@ opts {:name "test" :topic topic :ketu.source/consumer-supplier (constantly consumer) - :ketu.source/poll-error-handler poll-error-handler + :ketu.source/poll-error-handler poll-error-handler :ketu.source/close-out-chan? false :custom-opt "custom-value"} source (source/source ch opts)] From 30bca28cf2e7d111577723ee17fad74459d70d6d Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Wed, 24 Dec 2025 13:01:01 +0200 Subject: [PATCH 05/10] Update test to verify offset increment on faulty batch handling --- test/ketu/async/source_test.clj | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/ketu/async/source_test.clj b/test/ketu/async/source_test.clj index 6b87f70..a92a10b 100644 --- a/test/ketu/async/source_test.clj +++ b/test/ketu/async/source_test.clj @@ -194,9 +194,10 @@ :ketu.source/close-out-chan? false})] (add-record consumer (ConsumerRecord. "test-topic" 0 0 "faulty-key" "faulty-value")) (Thread/sleep 100) - (is (some #(and (= :error (first %)) - (str/includes? (second %) "Caught poll exception")) - (log/events log-ctx))) + (is (some #(and (= :info (first %)) + (str/includes? (second %) "Incremented offset for partition")) + (log/events log-ctx)) + "Default handler should skip faulty batch by incrementing partition offset") (add-record consumer (ConsumerRecord. "test-topic" 0 1 "healthy-key" "healthy-value")) (Thread/sleep 100) (let [received-record (u/try-take! 
ch)] From 00682467a01a56b6deddcb7a709c65f0e0463566 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Wed, 24 Dec 2025 14:03:37 +0200 Subject: [PATCH 06/10] Bump project version to 2.2.0-SNAPSHOT --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 98acaac..ed6cb2f 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/ketu "2.1.0" +(defproject com.appsflyer/ketu "2.2.0-SNAPSHOT" :description "Clojure Apache Kafka client with core.async api" :url "https://github.com/AppsFlyer/ketu" :license {:name "Apache License, Version 2.0" From 5749006d3c4eecec716d01aa5521416d3002a9d7 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Mon, 19 Jan 2026 14:11:55 +0200 Subject: [PATCH 07/10] Enhance error handling with custom catch function in polling mechanism --- src/ketu/async/source.clj | 200 +++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 91 deletions(-) diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 7dd4829..eaabbbe 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -25,26 +25,26 @@ "Returns opts with final :ketu.apache.consumer/config entry" [opts] (let [original-config (:ketu.apache.consumer/config opts) - config (util/set-ketu-to-apache-opts original-config opts)] + config (util/set-ketu-to-apache-opts original-config opts)] (assoc opts :ketu.apache.consumer/config config))) (defn default-consumer-supplier [opts] - (let [key-type (:ketu.source/key-type opts) - key-deserializer (when (instance? Deserializer key-type) key-type) - value-type (:ketu.source/value-type opts) + (let [key-type (:ketu.source/key-type opts) + key-deserializer (when (instance? Deserializer key-type) key-type) + value-type (:ketu.source/value-type opts) value-deserializer (when (instance? 
Deserializer value-type) value-type)] (consumer/consumer (:ketu.apache.consumer/config opts) key-deserializer value-deserializer))) (defn default-opts [] - {:ketu.source/consumer-supplier default-consumer-supplier - :ketu.source/key-type :byte-array - :ketu.source/value-type :byte-array - :ketu.source/poll-timeout-ms 100 - :ketu.source/done-putting-timeout-ms 60000 - :ketu.source/consumer-close-timeout-ms 60000 + {:ketu.source/consumer-supplier default-consumer-supplier + :ketu.source/key-type :byte-array + :ketu.source/value-type :byte-array + :ketu.source/poll-timeout-ms 100 + :ketu.source/done-putting-timeout-ms 60000 + :ketu.source/consumer-close-timeout-ms 60000 :ketu.source/consumer-thread-timeout-ms 60000 - :ketu.source/close-out-chan? true - :ketu.source/close-consumer? true}) + :ketu.source/close-out-chan? true + :ketu.source/close-consumer? true}) (defn- finalize-opts [opts] (-> (default-opts) @@ -54,13 +54,13 @@ (defn- subscribe-fn "Returns a function that takes a consumer and subscribes to either a topic list or a pattern according to opts." [opts] - (let [subscribe (cond - (:ketu.source/topic opts) consumer/subscribe-to-topic! - (:ketu.source/topic-list opts) consumer/subscribe-to-list! - (:ketu.source/topic-pattern opts) consumer/subscribe-to-pattern!) - topic (or (:ketu.source/topic opts) - (:ketu.source/topic-list opts) - (:ketu.source/topic-pattern opts)) + (let [subscribe (cond + (:ketu.source/topic opts) consumer/subscribe-to-topic! + (:ketu.source/topic-list opts) consumer/subscribe-to-list! + (:ketu.source/topic-pattern opts) consumer/subscribe-to-pattern!) 
+ topic (or (:ketu.source/topic opts) + (:ketu.source/topic-list opts) + (:ketu.source/topic-pattern opts)) create-listener (:ketu.source/create-rebalance-listener-obj opts)] (if create-listener (fn [consumer] (subscribe consumer topic (create-listener {:ketu.source/consumer consumer}))) @@ -70,7 +70,7 @@ "Returns a function that takes a consumer and assigns specific partitions according to opts." [opts] (when-let [assign-tps (:ketu.source/assign-single-topic-partitions opts)] - (let [topic (:ketu.source.assign/topic assign-tps) + (let [topic (:ketu.source.assign/topic assign-tps) partitions (:ketu.source.assign/partition-nums assign-tps)] (fn [consumer] (consumer/assign! consumer (consumer/topic-partitions topic partitions)))))) @@ -96,7 +96,7 @@ ([^Consumer consumer source-name opts records-to-skip] (try (let [assigned-partitions (consumer/assignment consumer) - records-to-skip (or records-to-skip (get-max-poll-records-from-opts opts))] + records-to-skip (or records-to-skip (get-max-poll-records-from-opts opts))] (doseq [^TopicPartition partition assigned-partitions] (try (let [current-position (consumer/position consumer partition) @@ -111,16 +111,34 @@ (log/error logger "[source={}] Failed to get assigned partitions for offset increment" source-name e))))) -(defn default-poll-error-handler [consumer opts] +(defn- default-poll-error-handler [consumer opts] (let [records-to-skip (:ketu.source/error-skip-offset-amount opts) - source-name (:ketu/name opts)] + source-name (:ketu/name opts)] (increment-offsets-for-assigned-partitions! consumer source-name opts records-to-skip) [])) +(defn- get-custom-error-handler [opts] + (let [provided-catch-fn (:ketu.source/custom-catch-fn opts) + custom-catch-fn + (cond + (nil? provided-catch-fn) + nil + + (fn? provided-catch-fn) + provided-catch-fn + + :else + (do + (log/error logger "[source={}] Invalid :ketu.source/custom-catch-fn (must be fn [consumer opts] -> coll), got: %s. Using default error handler." 
+ (type provided-catch-fn)) + nil))] + custom-catch-fn)) + (defn- poll-fn [^Consumer consumer should-poll? opts] (when @should-poll? (let [source-name (:ketu/name opts) - error-handler (:ketu.source/poll-error-handler opts (default-poll-error-handler consumer opts)) + custom-error-handler (get-custom-error-handler opts) + error-handler (or custom-error-handler (default-poll-error-handler consumer opts)) poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))] (fn [] (try @@ -139,61 +157,61 @@ (defn- source-existing-consumer [^Consumer consumer out-chan opts] - (let [source-name (:ketu/name opts) - ^String thread-name (str "ketu-source-" source-name) - close-out-chan? (:ketu.source/close-out-chan? opts) - ^long close-consumer? (:ketu.source/close-consumer? opts) + (let [source-name (:ketu/name opts) + ^String thread-name (str "ketu-source-" source-name) + close-out-chan? (:ketu.source/close-out-chan? opts) + ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) - should-poll? (volatile! true) - abort-pending-put (async/chan) - done-putting (async/chan) - subscribe! (or (subscribe-fn opts) (assign-fn opts)) - poll-impl (poll-fn consumer should-poll? opts) - poll! (if (some? (:ketu.source/consumer-decorator opts)) - (consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts) - poll-impl) - ->data (->data-fn opts) - put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) + should-poll? (volatile! true) + abort-pending-put (async/chan) + done-putting (async/chan) + subscribe! (or (subscribe-fn opts) (assign-fn opts)) + poll-impl (poll-fn consumer should-poll? opts) + poll! (if (some? (:ketu.source/consumer-decorator opts)) + (consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts) + poll-impl) + ->data (->data-fn opts) + put! (fn [record] (put-or-abort-pending! 
out-chan (->data record) abort-pending-put)) consumer-thread - (async/thread - (try - (.info logger "[source={}] Start consumer thread" source-name) - (.setName (Thread/currentThread) thread-name) - - (subscribe! consumer) - - (loop [] - (when-let [records (poll!)] - (run! put! records) - (recur))) - - (catch WakeupException e - ; We wakeup the consumer on graceful shutdown after should-poll? is false. - ; If it's not false, somebody else woke the consumer up unexpectedly. - (when @should-poll? - (log/error logger "[source={}] Unexpected consumer wakeup" source-name e))) - (catch Exception e - (log/error logger "[source={}] Unrecoverable consumer error" source-name e)) - (finally - (log/info logger "[source={}] Done consuming" source-name) - (async/close! done-putting) - (when close-out-chan? - (log/info logger "[source={}] Close out channel" source-name) - (async/close! out-chan)) - (when close-consumer? - (log/info logger "[source={}] Close consumer" source-name) - (consumer/close! consumer consumer-close-timeout-ms)) - (log/info logger "[source={}] Exit consumer thread" source-name)))) - - - state {:ketu/source-opts opts - :ketu.source/out-chan out-chan - :ketu.source/consumer consumer - :ketu.source/should-poll? should-poll? - :ketu.source/abort-pending-put abort-pending-put - :ketu.source/done-putting done-putting - :ketu.source/consumer-thread consumer-thread}] + (async/thread + (try + (.info logger "[source={}] Start consumer thread" source-name) + (.setName (Thread/currentThread) thread-name) + + (subscribe! consumer) + + (loop [] + (when-let [records (poll!)] + (run! put! records) + (recur))) + + (catch WakeupException e + ; We wakeup the consumer on graceful shutdown after should-poll? is false. + ; If it's not false, somebody else woke the consumer up unexpectedly. + (when @should-poll? 
+ (log/error logger "[source={}] Unexpected consumer wakeup" source-name e))) + (catch Exception e + (log/error logger "[source={}] Unrecoverable consumer error" source-name e)) + (finally + (log/info logger "[source={}] Done consuming" source-name) + (async/close! done-putting) + (when close-out-chan? + (log/info logger "[source={}] Close out channel" source-name) + (async/close! out-chan)) + (when close-consumer? + (log/info logger "[source={}] Close consumer" source-name) + (consumer/close! consumer consumer-close-timeout-ms)) + (log/info logger "[source={}] Exit consumer thread" source-name)))) + + + state {:ketu/source-opts opts + :ketu.source/out-chan out-chan + :ketu.source/consumer consumer + :ketu.source/should-poll? should-poll? + :ketu.source/abort-pending-put abort-pending-put + :ketu.source/done-putting done-putting + :ketu.source/consumer-thread consumer-thread}] state)) @@ -201,18 +219,18 @@ "Starts consuming into a channel. Returns a state map including out-chan. Pass it to the `stop!` function when done." [ch opts] - (let [opts (ketu.spec/assert-and-conform :ketu/public-source-opts opts) - opts (finalize-opts opts) - source-name (:ketu/name opts) + (let [opts (ketu.spec/assert-and-conform :ketu/public-source-opts opts) + opts (finalize-opts opts) + source-name (:ketu/name opts) consumer-supplier (:ketu.source/consumer-supplier opts) - consumer (try - (consumer-supplier opts) - (catch Exception e - (log/error logger "[source={}] Error creating consumer-source" source-name e) - (when (opts :ketu.source/close-out-chan?) - (log/warn logger "[source={}] Close consumer channel" source-name) - (async/close! ch)) - (throw e)))] + consumer (try + (consumer-supplier opts) + (catch Exception e + (log/error logger "[source={}] Error creating consumer-source" source-name e) + (when (opts :ketu.source/close-out-chan?) + (log/warn logger "[source={}] Close consumer channel" source-name) + (async/close! 
ch)) + (throw e)))] (try (source-existing-consumer consumer ch opts) (catch Exception e @@ -234,9 +252,9 @@ (consumer/wakeup! consumer))) (defn- wait-for-put! [state] - (let [done-putting (:ketu.source/done-putting state) + (let [done-putting (:ketu.source/done-putting state) done-putting-timeout-ms (-> state :ketu/source-opts :ketu.source/done-putting-timeout-ms) - timeout (async/timeout done-putting-timeout-ms)] + timeout (async/timeout done-putting-timeout-ms)] (async/alts!! [done-putting timeout] :priority true))) (defn- abort-pending-put! [state] @@ -252,9 +270,9 @@ (:ketu.source/consumer-thread state)) (defn- wait-for-the-thread! [state] - (let [consumer-thread (:ketu.source/consumer-thread state) + (let [consumer-thread (:ketu.source/consumer-thread state) consumer-thread-timeout-ms (-> state :ketu/source-opts :ketu.source/consumer-thread-timeout-ms) - timeout (async/timeout consumer-thread-timeout-ms)] + timeout (async/timeout consumer-thread-timeout-ms)] (async/alts!! [consumer-thread timeout] :priority true))) (defn stop! [state] From 0e3bdfd3ef56bbeb5b3bb59b42182358c86e9669 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Mon, 19 Jan 2026 14:44:54 +0200 Subject: [PATCH 08/10] Refactor error handler retrieval in polling mechanism --- src/ketu/async/source.clj | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index eaabbbe..e191dd1 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -117,12 +117,12 @@ (increment-offsets-for-assigned-partitions! consumer source-name opts records-to-skip) [])) -(defn- get-custom-error-handler [opts] - (let [provided-catch-fn (:ketu.source/custom-catch-fn opts) +(defn- get-error-handler [opts] + (let [provided-catch-fn (:ketu.source/poll-error-handler opts) custom-catch-fn (cond (nil? provided-catch-fn) - nil + default-poll-error-handler (fn? 
provided-catch-fn) provided-catch-fn @@ -131,14 +131,13 @@ (do (log/error logger "[source={}] Invalid :ketu.source/custom-catch-fn (must be fn [consumer opts] -> coll), got: %s. Using default error handler." (type provided-catch-fn)) - nil))] + default-poll-error-handler))] custom-catch-fn)) (defn- poll-fn [^Consumer consumer should-poll? opts] (when @should-poll? (let [source-name (:ketu/name opts) - custom-error-handler (get-custom-error-handler opts) - error-handler (or custom-error-handler (default-poll-error-handler consumer opts)) + error-handler-fn (get-error-handler opts) poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))] (fn [] (try @@ -147,7 +146,7 @@ (throw e)) (catch Exception e (log/error logger "[source={}] Caught poll exception, skipping faulty batch" source-name e) - (error-handler consumer opts))))))) + (error-handler-fn consumer opts))))))) (defn- ->data-fn [{:keys [ketu.source/shape] :as opts}] (cond From 4863221539f0cf212f9c956a6d8a1a14a908f414 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Wed, 21 Jan 2026 12:27:29 +0200 Subject: [PATCH 09/10] Refactor offset increment logic and simplify error handler retrieval in polling mechanism --- src/ketu/async/source.clj | 43 +++++++++++++-------------------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index e191dd1..827ee75 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -75,35 +75,20 @@ (fn [consumer] (consumer/assign! consumer (consumer/topic-partitions topic partitions)))))) -(defn- get-max-poll-records-from-opts - "Gets max.poll.records from opts config, defaulting to 1 if not found." - [opts] - (try - (let [config (:ketu.apache.consumer/config opts) - max-poll-records (get config "max.poll.records")] - (if max-poll-records - (if (string? 
max-poll-records) - (Long/parseLong max-poll-records) - (long max-poll-records)) - 1)) - (catch Exception _ - (log/error logger "Failed to get max.poll.records from opts" opts) - 1))) - (defn- increment-offsets-for-assigned-partitions! "Increments the offset by records-to-skip for all assigned partitions to skip faulty messages. - Uses max.poll.records from opts config if records-to-skip is not provided." - ([^Consumer consumer source-name opts records-to-skip] + If records-to-skip is not provided, default to 1." + ([^Consumer consumer source-name records-to-skip] (try (let [assigned-partitions (consumer/assignment consumer) - records-to-skip (or records-to-skip (get-max-poll-records-from-opts opts))] + skip-amount (or records-to-skip 1)] (doseq [^TopicPartition partition assigned-partitions] (try (let [current-position (consumer/position consumer partition) - next-offset (+ current-position records-to-skip)] + next-offset (+ current-position skip-amount)] (consumer/seek! consumer partition next-offset) (log/info logger "[source={}] Incremented offset for partition {} from {} to {} (skip amount: {})" - source-name partition current-position next-offset records-to-skip)) + source-name partition current-position next-offset skip-amount)) (catch Exception e (log/error logger "[source={}] Failed to increment offset for partition {}" source-name partition e))))) @@ -114,25 +99,25 @@ (defn- default-poll-error-handler [consumer opts] (let [records-to-skip (:ketu.source/error-skip-offset-amount opts) source-name (:ketu/name opts)] - (increment-offsets-for-assigned-partitions! consumer source-name opts records-to-skip) + (increment-offsets-for-assigned-partitions! consumer source-name records-to-skip) [])) (defn- get-error-handler [opts] - (let [provided-catch-fn (:ketu.source/poll-error-handler opts) - custom-catch-fn + (let [provided-error-fn (:ketu.source/poll-error-handler opts) + error-handler-fn (cond - (nil? provided-catch-fn) + (nil? 
provided-error-fn) default-poll-error-handler - (fn? provided-catch-fn) - provided-catch-fn + (fn? provided-error-fn) + provided-error-fn :else (do - (log/error logger "[source={}] Invalid :ketu.source/custom-catch-fn (must be fn [consumer opts] -> coll), got: %s. Using default error handler." - (type provided-catch-fn)) + (log/error logger "[source={}] Invalid :ketu.source/poll-error-handler (must be fn [consumer opts] -> coll), got: %s. Using default error handler." + (type provided-error-fn)) default-poll-error-handler))] - custom-catch-fn)) + error-handler-fn)) (defn- poll-fn [^Consumer consumer should-poll? opts] (when @should-poll? From 7ea482d8fdcd82ca5fd7fe7323100afa815bcef1 Mon Sep 17 00:00:00 2001 From: "yevgeni.blenki" Date: Thu, 22 Jan 2026 10:42:19 +0200 Subject: [PATCH 10/10] Update README to clarify default behavior for error-skip-offset-amount --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6b6f488..7447e27 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | | :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | | :ketu.source/poll-error-handler | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. `seek`) and/or opts (consumer options) | -| :ketu.source/error-skip-offset-amount | int | optional | Number of records to skip on a poll exception. 
If not set, falls back to Kafka `max.poll.records` from `:internal-config` (string or numeric), else 1 | +| :ketu.source/error-skip-offset-amount | int | optional | Number of records the default poll error handler skips on each assigned partition after a poll exception. If not set, defaults to 1. | #### Producer-sink options