19 changes: 12 additions & 7 deletions README.md
@@ -78,11 +78,14 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |

| Key | Type | Req? | Notes |
|---------------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |
| :ketu.source/poll-error-handler       | `(fn [^Consumer consumer opts] ...)`                                                          | optional | Called when `poll` throws a non-wakeup exception; must return a (possibly empty) collection of records. May mutate the consumer (e.g. `seek`) and/or `opts` (the consumer options) |
| :ketu.source/error-skip-offset-amount | int                                                                                           | optional | Number of offsets the default poll error handler advances on each assigned partition after a poll exception. If not set, falls back to Kafka `max.poll.records` from `:internal-config` (string or numeric), else 1 |
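
As a rough illustration of the `:ketu.source/poll-error-handler` contract in the table above, the sketch below defines a handler that only counts failures and returns an empty batch. The names `poll-failures`, `counting-poll-error-handler` and `source-opts` are made up for this example, and the options map deliberately omits the connection settings a real source needs.

```clojure
;; Hypothetical example: none of these names are part of ketu.
(def poll-failures
  "Counts how many times the handler was invoked."
  (atom 0))

(defn counting-poll-error-handler
  "Matches the documented shape (fn [consumer opts] ...).
  Ignores both arguments, records the failure and returns an empty batch."
  [_consumer _opts]
  (swap! poll-failures inc)
  [])

;; Partial options map; only keys that appear in this document are used.
(def source-opts
  {:name "example-source"
   :topic "example-topic"
   :group-id "example-group"
   :ketu.source/poll-error-handler counting-poll-error-handler})
```

A handler that returns `[]` without moving the consumer will most likely hit the same error on the next poll, so a real handler would usually also `seek` past the faulty records.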

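The fallback order for the skip amount can be read as the following sketch. `resolve-skip-amount` is a hypothetical helper written for illustration, not a ketu function, and it assumes the Kafka properties sit under `:internal-config` as a map with string keys.

```clojure
;; Hypothetical helper: mirrors the documented fallback order only.
(defn resolve-skip-amount
  "Returns the explicit :ketu.source/error-skip-offset-amount if present,
  else max.poll.records from :internal-config (string or number), else 1."
  [opts]
  (let [explicit    (:ketu.source/error-skip-offset-amount opts)
        from-config (get-in opts [:internal-config "max.poll.records"])]
    (cond
      (some? explicit)      (long explicit)
      (string? from-config) (Long/parseLong from-config)
      (number? from-config) (long from-config)
      :else                 1)))

(resolve-skip-amount {:ketu.source/error-skip-offset-amount 5})
(resolve-skip-amount {:internal-config {"max.poll.records" "500"}})
(resolve-skip-amount {})
```

The default handler in this change adds the resolved amount to the current position of every assigned partition, so with an amount of 500 a partition positioned at offset 42 would be moved to offset 542.
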
#### Producer-sink options

@@ -145,7 +148,8 @@ for example when managing the offset manually, auto-commit should usually set to

In this example we use the decorator to run commands in the polling thread context.
The consumer is paused/resumed based on commands sent from the application.
The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn).

```clojure
(ns consumer-decorator-example
(:require [clojure.core.async :as async]
@@ -226,9 +230,10 @@ The decorator processes all immediately available commands in the commands-chan,
```

## Java Kafka client versions

- `ketu` version 1.0.0+ uses `org.apache.kafka/kafka-clients` version 3.3.1
- `ketu` version 2.1.0+ uses `org.apache.kafka/kafka-clients` version 3.9.1
- For a comprehensive list of changes in the java client, see [here](kafka-client-changes-analysis.md)

## Development & Contribution

2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject com.appsflyer/ketu "2.1.0"
(defproject com.appsflyer/ketu "2.2.0-SNAPSHOT"
:description "Clojure Apache Kafka client with core.async api"
:url "https://github.com/AppsFlyer/ketu"
:license {:name "Apache License, Version 2.0"
66 changes: 53 additions & 13 deletions src/ketu/async/source.clj
@@ -8,6 +8,7 @@
[ketu.util.log :as log])
(:import (java.time Duration)
(org.apache.kafka.clients.consumer Consumer)
(org.apache.kafka.common TopicPartition)
(org.apache.kafka.common.errors WakeupException)
(org.apache.kafka.common.serialization Deserializer)
(org.slf4j Logger LoggerFactory)))
@@ -74,22 +75,61 @@
(fn [consumer]
(consumer/assign! consumer (consumer/topic-partitions topic partitions))))))

(defn- get-max-poll-records-from-opts
"Gets max.poll.records from opts config, defaulting to 1 if not found."
[opts]
(try
(let [config (:ketu.apache.consumer/config opts)
max-poll-records (get config "max.poll.records")]
(if max-poll-records
(if (string? max-poll-records)
(Long/parseLong max-poll-records)
(long max-poll-records))
1))
(catch Exception _
(log/error logger "Failed to get max.poll.records from opts {}" opts)
1)))

(defn- increment-offsets-for-assigned-partitions!
"Increments the offset by records-to-skip for all assigned partitions to skip faulty messages.
Uses max.poll.records from opts config if records-to-skip is not provided."
([^Consumer consumer source-name opts records-to-skip]
(try
(let [assigned-partitions (consumer/assignment consumer)
records-to-skip (or records-to-skip (get-max-poll-records-from-opts opts))]
(doseq [^TopicPartition partition assigned-partitions]
(try
(let [current-position (consumer/position consumer partition)
next-offset (+ current-position records-to-skip)]
(consumer/seek! consumer partition next-offset)
(log/info logger "[source={}] Incremented offset for partition {} from {} to {} (skip amount: {})"
source-name partition current-position next-offset records-to-skip))
(catch Exception e
(log/error logger "[source={}] Failed to increment offset for partition {}"
source-name partition e)))))
(catch Exception e
(log/error logger "[source={}] Failed to get assigned partitions for offset increment"
source-name e)))))

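(defn default-poll-error-handler
  "Default `:ketu.source/poll-error-handler`. Advances the offset of every assigned partition
  by `:ketu.source/error-skip-offset-amount` (falling back to `max.poll.records`, else 1)
  and returns an empty batch of records."
  [consumer opts]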
(let [records-to-skip (:ketu.source/error-skip-offset-amount opts)
source-name (:ketu/name opts)]
(increment-offsets-for-assigned-partitions! consumer source-name opts records-to-skip)
[]))

(defn- poll-fn [^Consumer consumer should-poll? opts]
(when @should-poll?
(let [source-name (:ketu/name opts)
poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))
catching-poll? (:ketu.source.legacy/catching-poll? opts)]
(if catching-poll?
;TODO Eliminate catching poll ASAP.
; Just in case of a production issue and generic error handling wasn't implemented yet.
(fn []
(try
(consumer/poll! consumer poll-timeout-duration)
(catch Exception e
(log/error logger "[source={}] Caught poll exception" source-name e)
[])))
(fn []
(consumer/poll! consumer poll-timeout-duration))))))
;; Pass the handler itself as the default; calling it here would run it eagerly and bind its `[]` result.
error-handler (:ketu.source/poll-error-handler opts default-poll-error-handler)
poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))]
(fn []
(try
(consumer/poll! consumer poll-timeout-duration)
(catch WakeupException e
(throw e))
(catch Exception e
(log/error logger "[source={}] Caught poll exception, skipping faulty batch" source-name e)
(error-handler consumer opts)))))))

(defn- ->data-fn [{:keys [ketu.source/shape] :as opts}]
(cond
86 changes: 66 additions & 20 deletions test/ketu/async/source_test.clj
@@ -2,6 +2,7 @@
(:require [clojure.test :refer [deftest testing is]]
[clojure.core.async :as async]
[clojure.core.async.impl.protocols]
[clojure.string :as str]
[ketu.test.log :as log]
[ketu.test.util :as u]
[ketu.clients.consumer :as consumer]
@@ -92,7 +93,7 @@

(deftest shape
(testing "Put ConsumerRecord objects by default"
(let [record (ConsumerRecord. "topic" 0 0 "k" "v")
(let [record (ConsumerRecord. "topic" 0 1 "k" "v")
consumer (doto (mock-consumer "topic")
(add-record record))
ch (async/chan)
@@ -140,22 +141,67 @@
[:info "[source=test] Exit consumer thread"]]
(log/events log-ctx))))))))

(deftest unrecoverable-exception-logs
(testing "Throw unrecoverable exception on first poll"
(log/with-test-appender
(log/ns-logger 'ketu.async.source)
(fn [log-ctx]
(let [consumer (doto (mock-consumer)
(.setPollException (KafkaException. "test exception")))
ch (async/chan)
source (source/source ch {:name "test"
:topic "test-topic"
:ketu.source/consumer-supplier (constantly consumer)})]
(u/try-take! (source/done-chan source))
(is (= [[:info "[source=test] Start consumer thread"]
[:error "[source=test] Unrecoverable consumer error"]
[:info "[source=test] Done consuming"]
[:info "[source=test] Close out channel"]
[:info "[source=test] Close consumer"]
[:info "[source=test] Exit consumer thread"]]
(log/events log-ctx))))))))
(deftest poll-catch-fn
(testing "Custom catch function is called, receives correct parameters, and can return empty collection"
(let [received-opts (atom nil)
topic "test-topic"
partition (consumer/topic-partition topic 0)
poll-error-handler (fn [consumer opts]
(reset! received-opts opts)
(consumer/seek! consumer partition 1)
[]) ; Return empty collection
consumer (doto (mock-consumer topic)
(.setPollException (KafkaException. "test exception")))
ch (async/chan)
opts {:name "test"
:topic topic
:ketu.source/consumer-supplier (constantly consumer)
:ketu.source/poll-error-handler poll-error-handler
:ketu.source/close-out-chan? false
:custom-opt "custom-value"}
source (source/source ch opts)]
(add-record consumer (ConsumerRecord. topic 0 0 "test-key" "test-value"))
(Thread/sleep 100)
(is (= "custom-value" (:custom-opt @received-opts)))
(is (not (channel-closed? ch)))
(let [timeout-ch (async/timeout 100)
[item _] (async/alts!! [ch timeout-ch] :priority true)]
(is (nil? item) "Catch function returns [], so no records should be put on channel"))
(source/stop! source)))

(testing "Default catch function handles faulty message gracefully and then processes healthy message"
(let [orig-poll consumer/poll!]
(with-redefs [ketu.clients.consumer/poll!
(fn [c t]
(let [records (orig-poll c t)]
(if (some #(or (= "faulty-key" (.key %))
(= "faulty-value" (.value %)))
records)
(do
;; Reset position to 0 to simulate that we are stuck at the faulty record
;; Since orig-poll advanced it, we must rewind for the test logic to be valid.
(consumer/seek! c (consumer/topic-partition "test-topic" 0) 0)
(throw (KafkaException. "Simulated corruption")))
records)))]
(log/with-test-appender
(log/ns-logger 'ketu.async.source)
(fn [log-ctx]
(let [consumer (mock-consumer "test-topic")
ch (async/chan)
source (source/source ch {:name "test"
:topic "test-topic"
:ketu.source/consumer-supplier (constantly consumer)
:ketu.source/close-out-chan? false})]
(add-record consumer (ConsumerRecord. "test-topic" 0 0 "faulty-key" "faulty-value"))
(Thread/sleep 100)
(is (some #(and (= :info (first %))
(str/includes? (second %) "Incremented offset for partition"))
(log/events log-ctx))
"Default handler should skip faulty batch by incrementing partition offset")
(add-record consumer (ConsumerRecord. "test-topic" 0 1 "healthy-key" "healthy-value"))
(Thread/sleep 100)
(let [received-record (u/try-take! ch)]
(is (= "healthy-key" (.key received-record)))
(is (= "healthy-value" (.value received-record)))
(is (= 1 (.offset received-record))))
(source/stop! source))))))))