diff --git a/README.md b/README.md index d844ca4..7447e27 100644 --- a/README.md +++ b/README.md @@ -78,11 +78,14 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | + +| Key | Type | Req? | Notes | +|---------------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | +| :ketu.source/poll-error-handler | `(fn [^Consumer consumer opts] ...)` | optional | Called when `poll` throws (non-wakeup) exception; should return a (possibly empty) collection of records. May mutate consumer (e.g. 
`seek`) and/or opts (consumer options) | +| :ketu.source/error-skip-offset-amount | int | optional | Used by the default poll error handler: how many records to skip, on every assigned partition, when `poll` throws. Defaults to 1. | #### Producer-sink options @@ -145,7 +148,8 @@ for example when managing the offset manually, auto-commit should usually set to In this example we use the decorator to run commands in the polling thread context. The consumer is paused/resumed based on commands sent from the application. -The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn). +The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn). + ```clojure (ns consumer-decorator-example (:require [clojure.core.async :as async] @@ -226,9 +230,10 @@ The decorator processes all immediately available commands in the commands-chan, ``` ## Java Kafka client versions + - `ketu` version 1.0.0+ uses `org.apache.kafka/kafka-clients` version 3.3.1 - `ketu` version 2.1.0+ uses `org.apache.kafka/kafka-clients` version 3.9.1 - - For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md) + - For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md) ## Development & Contribution diff --git a/project.clj b/project.clj index 98acaac..ed6cb2f 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/ketu "2.1.0" +(defproject com.appsflyer/ketu "2.2.0-SNAPSHOT" :description "Clojure Apache Kafka client with core.async api" :url "https://github.com/AppsFlyer/ketu" :license {:name "Apache License, Version 2.0" diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 051db46..827ee75 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -8,6 +8,7 @@ [ketu.util.log :as log]) (:import (java.time Duration) (org.apache.kafka.clients.consumer Consumer) + (org.apache.kafka.common TopicPartition)
(org.apache.kafka.common.errors WakeupException) (org.apache.kafka.common.serialization Deserializer) (org.slf4j Logger LoggerFactory))) @@ -24,26 +25,26 @@ "Returns opts with final :ketu.apache.consumer/config entry" [opts] (let [original-config (:ketu.apache.consumer/config opts) - config (util/set-ketu-to-apache-opts original-config opts)] + config (util/set-ketu-to-apache-opts original-config opts)] (assoc opts :ketu.apache.consumer/config config))) (defn default-consumer-supplier [opts] - (let [key-type (:ketu.source/key-type opts) - key-deserializer (when (instance? Deserializer key-type) key-type) - value-type (:ketu.source/value-type opts) + (let [key-type (:ketu.source/key-type opts) + key-deserializer (when (instance? Deserializer key-type) key-type) + value-type (:ketu.source/value-type opts) value-deserializer (when (instance? Deserializer value-type) value-type)] (consumer/consumer (:ketu.apache.consumer/config opts) key-deserializer value-deserializer))) (defn default-opts [] - {:ketu.source/consumer-supplier default-consumer-supplier - :ketu.source/key-type :byte-array - :ketu.source/value-type :byte-array - :ketu.source/poll-timeout-ms 100 - :ketu.source/done-putting-timeout-ms 60000 - :ketu.source/consumer-close-timeout-ms 60000 + {:ketu.source/consumer-supplier default-consumer-supplier + :ketu.source/key-type :byte-array + :ketu.source/value-type :byte-array + :ketu.source/poll-timeout-ms 100 + :ketu.source/done-putting-timeout-ms 60000 + :ketu.source/consumer-close-timeout-ms 60000 :ketu.source/consumer-thread-timeout-ms 60000 - :ketu.source/close-out-chan? true - :ketu.source/close-consumer? true}) + :ketu.source/close-out-chan? true + :ketu.source/close-consumer? true}) (defn- finalize-opts [opts] (-> (default-opts) @@ -53,13 +54,13 @@ (defn- subscribe-fn "Returns a function that takes a consumer and subscribes to either a topic list or a pattern according to opts." 
[opts] - (let [subscribe (cond - (:ketu.source/topic opts) consumer/subscribe-to-topic! - (:ketu.source/topic-list opts) consumer/subscribe-to-list! - (:ketu.source/topic-pattern opts) consumer/subscribe-to-pattern!) - topic (or (:ketu.source/topic opts) - (:ketu.source/topic-list opts) - (:ketu.source/topic-pattern opts)) + (let [subscribe (cond + (:ketu.source/topic opts) consumer/subscribe-to-topic! + (:ketu.source/topic-list opts) consumer/subscribe-to-list! + (:ketu.source/topic-pattern opts) consumer/subscribe-to-pattern!) + topic (or (:ketu.source/topic opts) + (:ketu.source/topic-list opts) + (:ketu.source/topic-pattern opts)) create-listener (:ketu.source/create-rebalance-listener-obj opts)] (if create-listener (fn [consumer] (subscribe consumer topic (create-listener {:ketu.source/consumer consumer}))) @@ -69,27 +70,68 @@ "Returns a function that takes a consumer and assigns specific partitions according to opts." [opts] (when-let [assign-tps (:ketu.source/assign-single-topic-partitions opts)] - (let [topic (:ketu.source.assign/topic assign-tps) + (let [topic (:ketu.source.assign/topic assign-tps) partitions (:ketu.source.assign/partition-nums assign-tps)] (fn [consumer] (consumer/assign! consumer (consumer/topic-partitions topic partitions)))))) +(defn- increment-offsets-for-assigned-partitions! + "Increments the offset by records-to-skip for all assigned partitions to skip faulty messages. + If records-to-skip is not provided, default to 1." + ([^Consumer consumer source-name records-to-skip] + (try + (let [assigned-partitions (consumer/assignment consumer) + skip-amount (or records-to-skip 1)] + (doseq [^TopicPartition partition assigned-partitions] + (try + (let [current-position (consumer/position consumer partition) + next-offset (+ current-position skip-amount)] + (consumer/seek! 
consumer partition next-offset)
+             (log/info logger "[source={}] Incremented offset for partition {} from {} to {} (skip amount: {})"
+                       source-name partition current-position next-offset skip-amount))
+           (catch Exception e
+             (log/error logger "[source={}] Failed to increment offset for partition {}"
+                        source-name partition e)))))
+     (catch Exception e
+       (log/error logger "[source={}] Failed to get assigned partitions for offset increment"
+                  source-name e)))))
+
+(defn- default-poll-error-handler [consumer opts]
+  (let [records-to-skip (:ketu.source/error-skip-offset-amount opts)
+        source-name (:ketu/name opts)]
+    (increment-offsets-for-assigned-partitions! consumer source-name records-to-skip)
+    []))
+
+(defn- get-error-handler [opts] ; Picks the handler poll-fn invokes when poll throws a non-wakeup exception.
+  (let [provided-error-fn (:ketu.source/poll-error-handler opts)
+        error-handler-fn
+        (cond
+          (nil? provided-error-fn) ; option not supplied: use the default handler
+          default-poll-error-handler
+
+          (fn? provided-error-fn)
+          provided-error-fn
+
+          :else ; supplied but not a function: report it and fall back instead of failing
+          (do
+            (log/error logger "[source={}] Invalid :ketu.source/poll-error-handler (must be fn [consumer opts] -> coll), got: {}. Using default error handler."
+                       (:ketu/name opts) (type provided-error-fn)) ; one argument per {} placeholder, source name first
+            default-poll-error-handler))]
+    error-handler-fn))
+
 (defn- poll-fn [^Consumer consumer should-poll? opts]
   (when @should-poll?
     (let [source-name (:ketu/name opts)
-          poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))
-          catching-poll? (:ketu.source.legacy/catching-poll? opts)]
-      (if catching-poll?
-        ;TODO Eliminate catching poll ASAP.
-        ; Just in case of a production issue and generic error handling wasn't implemented yet.
-        (fn []
-          (try
-            (consumer/poll! consumer poll-timeout-duration)
-            (catch Exception e
-              (log/error logger "[source={}] Caught poll exception" source-name e)
-              [])))
-        (fn []
-          (consumer/poll! consumer poll-timeout-duration))))))
+          error-handler-fn (get-error-handler opts)
+          poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))]
+      (fn []
+        (try
+          (consumer/poll!
consumer poll-timeout-duration) + (catch WakeupException e + (throw e)) + (catch Exception e + (log/error logger "[source={}] Caught poll exception, skipping faulty batch" source-name e) + (error-handler-fn consumer opts))))))) (defn- ->data-fn [{:keys [ketu.source/shape] :as opts}] (cond @@ -99,61 +141,61 @@ (defn- source-existing-consumer [^Consumer consumer out-chan opts] - (let [source-name (:ketu/name opts) - ^String thread-name (str "ketu-source-" source-name) - close-out-chan? (:ketu.source/close-out-chan? opts) - ^long close-consumer? (:ketu.source/close-consumer? opts) + (let [source-name (:ketu/name opts) + ^String thread-name (str "ketu-source-" source-name) + close-out-chan? (:ketu.source/close-out-chan? opts) + ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) - should-poll? (volatile! true) - abort-pending-put (async/chan) - done-putting (async/chan) - subscribe! (or (subscribe-fn opts) (assign-fn opts)) - poll-impl (poll-fn consumer should-poll? opts) - poll! (if (some? (:ketu.source/consumer-decorator opts)) - (consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts) - poll-impl) - ->data (->data-fn opts) - put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) + should-poll? (volatile! true) + abort-pending-put (async/chan) + done-putting (async/chan) + subscribe! (or (subscribe-fn opts) (assign-fn opts)) + poll-impl (poll-fn consumer should-poll? opts) + poll! (if (some? (:ketu.source/consumer-decorator opts)) + (consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts) + poll-impl) + ->data (->data-fn opts) + put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) consumer-thread - (async/thread - (try - (.info logger "[source={}] Start consumer thread" source-name) - (.setName (Thread/currentThread) thread-name) - - (subscribe! 
consumer) - - (loop [] - (when-let [records (poll!)] - (run! put! records) - (recur))) - - (catch WakeupException e - ; We wakeup the consumer on graceful shutdown after should-poll? is false. - ; If it's not false, somebody else woke the consumer up unexpectedly. - (when @should-poll? - (log/error logger "[source={}] Unexpected consumer wakeup" source-name e))) - (catch Exception e - (log/error logger "[source={}] Unrecoverable consumer error" source-name e)) - (finally - (log/info logger "[source={}] Done consuming" source-name) - (async/close! done-putting) - (when close-out-chan? - (log/info logger "[source={}] Close out channel" source-name) - (async/close! out-chan)) - (when close-consumer? - (log/info logger "[source={}] Close consumer" source-name) - (consumer/close! consumer consumer-close-timeout-ms)) - (log/info logger "[source={}] Exit consumer thread" source-name)))) - - - state {:ketu/source-opts opts - :ketu.source/out-chan out-chan - :ketu.source/consumer consumer - :ketu.source/should-poll? should-poll? - :ketu.source/abort-pending-put abort-pending-put - :ketu.source/done-putting done-putting - :ketu.source/consumer-thread consumer-thread}] + (async/thread + (try + (.info logger "[source={}] Start consumer thread" source-name) + (.setName (Thread/currentThread) thread-name) + + (subscribe! consumer) + + (loop [] + (when-let [records (poll!)] + (run! put! records) + (recur))) + + (catch WakeupException e + ; We wakeup the consumer on graceful shutdown after should-poll? is false. + ; If it's not false, somebody else woke the consumer up unexpectedly. + (when @should-poll? + (log/error logger "[source={}] Unexpected consumer wakeup" source-name e))) + (catch Exception e + (log/error logger "[source={}] Unrecoverable consumer error" source-name e)) + (finally + (log/info logger "[source={}] Done consuming" source-name) + (async/close! done-putting) + (when close-out-chan? + (log/info logger "[source={}] Close out channel" source-name) + (async/close! 
out-chan)) + (when close-consumer? + (log/info logger "[source={}] Close consumer" source-name) + (consumer/close! consumer consumer-close-timeout-ms)) + (log/info logger "[source={}] Exit consumer thread" source-name)))) + + + state {:ketu/source-opts opts + :ketu.source/out-chan out-chan + :ketu.source/consumer consumer + :ketu.source/should-poll? should-poll? + :ketu.source/abort-pending-put abort-pending-put + :ketu.source/done-putting done-putting + :ketu.source/consumer-thread consumer-thread}] state)) @@ -161,18 +203,18 @@ "Starts consuming into a channel. Returns a state map including out-chan. Pass it to the `stop!` function when done." [ch opts] - (let [opts (ketu.spec/assert-and-conform :ketu/public-source-opts opts) - opts (finalize-opts opts) - source-name (:ketu/name opts) + (let [opts (ketu.spec/assert-and-conform :ketu/public-source-opts opts) + opts (finalize-opts opts) + source-name (:ketu/name opts) consumer-supplier (:ketu.source/consumer-supplier opts) - consumer (try - (consumer-supplier opts) - (catch Exception e - (log/error logger "[source={}] Error creating consumer-source" source-name e) - (when (opts :ketu.source/close-out-chan?) - (log/warn logger "[source={}] Close consumer channel" source-name) - (async/close! ch)) - (throw e)))] + consumer (try + (consumer-supplier opts) + (catch Exception e + (log/error logger "[source={}] Error creating consumer-source" source-name e) + (when (opts :ketu.source/close-out-chan?) + (log/warn logger "[source={}] Close consumer channel" source-name) + (async/close! ch)) + (throw e)))] (try (source-existing-consumer consumer ch opts) (catch Exception e @@ -194,9 +236,9 @@ (consumer/wakeup! consumer))) (defn- wait-for-put! 
[state] - (let [done-putting (:ketu.source/done-putting state) + (let [done-putting (:ketu.source/done-putting state) done-putting-timeout-ms (-> state :ketu/source-opts :ketu.source/done-putting-timeout-ms) - timeout (async/timeout done-putting-timeout-ms)] + timeout (async/timeout done-putting-timeout-ms)] (async/alts!! [done-putting timeout] :priority true))) (defn- abort-pending-put! [state] @@ -212,9 +254,9 @@ (:ketu.source/consumer-thread state)) (defn- wait-for-the-thread! [state] - (let [consumer-thread (:ketu.source/consumer-thread state) + (let [consumer-thread (:ketu.source/consumer-thread state) consumer-thread-timeout-ms (-> state :ketu/source-opts :ketu.source/consumer-thread-timeout-ms) - timeout (async/timeout consumer-thread-timeout-ms)] + timeout (async/timeout consumer-thread-timeout-ms)] (async/alts!! [consumer-thread timeout] :priority true))) (defn stop! [state] diff --git a/test/ketu/async/source_test.clj b/test/ketu/async/source_test.clj index 8b0a678..a92a10b 100644 --- a/test/ketu/async/source_test.clj +++ b/test/ketu/async/source_test.clj @@ -2,6 +2,7 @@ (:require [clojure.test :refer [deftest testing is]] [clojure.core.async :as async] [clojure.core.async.impl.protocols] + [clojure.string :as str] [ketu.test.log :as log] [ketu.test.util :as u] [ketu.clients.consumer :as consumer] @@ -92,7 +93,7 @@ (deftest shape (testing "Put ConsumerRecord objects by default" - (let [record (ConsumerRecord. "topic" 0 0 "k" "v") + (let [record (ConsumerRecord. "topic" 0 1 "k" "v") consumer (doto (mock-consumer "topic") (add-record record)) ch (async/chan) @@ -140,22 +141,67 @@ [:info "[source=test] Exit consumer thread"]] (log/events log-ctx)))))))) -(deftest unrecoverable-exception-logs - (testing "Throw unrecoverable exception on first poll" - (log/with-test-appender - (log/ns-logger 'ketu.async.source) - (fn [log-ctx] - (let [consumer (doto (mock-consumer) - (.setPollException (KafkaException. 
"test exception"))) - ch (async/chan) - source (source/source ch {:name "test" - :topic "test-topic" - :ketu.source/consumer-supplier (constantly consumer)})] - (u/try-take! (source/done-chan source)) - (is (= [[:info "[source=test] Start consumer thread"] - [:error "[source=test] Unrecoverable consumer error"] - [:info "[source=test] Done consuming"] - [:info "[source=test] Close out channel"] - [:info "[source=test] Close consumer"] - [:info "[source=test] Exit consumer thread"]] - (log/events log-ctx)))))))) +(deftest poll-catch-fn + (testing "Custom catch function is called, receives correct parameters, and can return empty collection" + (let [received-opts (atom nil) + topic "test-topic" + partition (consumer/topic-partition topic 0) + poll-error-handler (fn [consumer opts] + (reset! received-opts opts) + (consumer/seek! consumer partition 1) + []) ; Return empty collection + consumer (doto (mock-consumer topic) + (.setPollException (KafkaException. "test exception"))) + ch (async/chan) + opts {:name "test" + :topic topic + :ketu.source/consumer-supplier (constantly consumer) + :ketu.source/poll-error-handler poll-error-handler + :ketu.source/close-out-chan? false + :custom-opt "custom-value"} + source (source/source ch opts)] + (add-record consumer (ConsumerRecord. topic 0 0 "test-key" "test-value")) + (Thread/sleep 100) + (is (= "custom-value" (:custom-opt @received-opts))) + (is (not (channel-closed? ch))) + (let [timeout-ch (async/timeout 100) + [item _] (async/alts!! [ch timeout-ch] :priority true)] + (is (nil? item) "Catch function returns [], so no records should be put on channel")) + (source/stop! source))) + + (testing "Default catch function handles faulty message gracefully and then processes healthy message" + (let [orig-poll consumer/poll!] + (with-redefs [ketu.clients.consumer/poll! 
+ (fn [c t] + (let [records (orig-poll c t)] + (if (some #(or (= "faulty-key" (.key %)) + (= "faulty-value" (.value %))) + records) + (do + ;; Reset position to 0 to simulate that we are stuck at the faulty record + ;; Since orig-poll advanced it, we must rewind for the test logic to be valid. + (consumer/seek! c (consumer/topic-partition "test-topic" 0) 0) + (throw (KafkaException. "Simulated corruption"))) + records)))] + (log/with-test-appender + (log/ns-logger 'ketu.async.source) + (fn [log-ctx] + (let [consumer (mock-consumer "test-topic") + ch (async/chan) + source (source/source ch {:name "test" + :topic "test-topic" + :ketu.source/consumer-supplier (constantly consumer) + :ketu.source/close-out-chan? false})] + (add-record consumer (ConsumerRecord. "test-topic" 0 0 "faulty-key" "faulty-value")) + (Thread/sleep 100) + (is (some #(and (= :info (first %)) + (str/includes? (second %) "Incremented offset for partition")) + (log/events log-ctx)) + "Default handler should skip faulty batch by incrementing partition offset") + (add-record consumer (ConsumerRecord. "test-topic" 0 1 "healthy-key" "healthy-value")) + (Thread/sleep 100) + (let [received-record (u/try-take! ch)] + (is (= "healthy-key" (.key received-record))) + (is (= "healthy-value" (.value received-record))) + (is (= 1 (.offset received-record)))) + (source/stop! source)))))))) \ No newline at end of file