Skip to content

Commit 38dba2f

Browse files
GH-3542: API for adding record interceptors instead of overriding them (#3937)
Fixes: #3542 * Providing the ability to add record interceptors instead of overriding them Change `RecordInterceptor` to `List<RecordInterceptor>` in `MessageListenerContainer` will allow the addition of multiple `RecordInterceptor` instances instead of overriding the existing one Currently, only a single `RecordInterceptor` is supported. Users may want to register multiple `RecordInterceptors`. There are some workarounds, but they are not clean or ideal solutions. By supporting `List<RecordInterceptor`>, users can add their own interceptors via `setRecordInterceptor(...)`. * Adding new API for addRecordInterceptor. * Addressing PR review Signed-off-by: Sanghyeok An <[email protected]>
1 parent cadd306 commit 38dba2f

File tree

5 files changed

+91
-13
lines changed

5 files changed

+91
-13
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,35 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
2222

2323
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
2424

25+
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method.
26+
If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured.
27+
The following example shows how to do so:
28+
29+
[source, java]
30+
----
31+
public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
32+
CompositeRecordInterceptor compositeInterceptor;
33+
34+
RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
35+
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
36+
compositeInterceptor = interceptor;
37+
} else {
38+
compositeInterceptor = new CompositeRecordInterceptor<>();
39+
container.setRecordInterceptor(compositeInterceptor);
40+
}
41+
42+
if (previousInterceptor != null) {
43+
compositeRecordInterceptor.addRecordInterceptor(previousInterceptor);
44+
}
45+
46+
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
47+
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};
48+
49+
compositeInterceptor.addRecordInterceptor(recordInterceptor1);
50+
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
51+
}
52+
----
53+
2554
By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
2655
You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
2756
Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.
@@ -265,4 +294,3 @@ The listener containers implement `SmartLifecycle`, and `autoStartup` is `true`
265294
The containers are started in a late phase (`Integer.MAX-VALUE - 100`).
266295
Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
267296
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
268-

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,9 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
7676

7777
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
7878
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].
79+
80+
[[x40-add-record-interceptor]]
81+
=== Configure additional `RecordInterceptor`
82+
83+
Listener containers now support interceptor customization via `getRecordInterceptor()`.
84+
See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details.

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,12 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
460460
this.kafkaAdmin = kafkaAdmin;
461461
}
462462

463-
protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
463+
/**
464+
* Get the {@link RecordInterceptor} for modification, if configured.
465+
* @return the {@link RecordInterceptor}, or {@code null} if not configured
466+
* @since 4.0
467+
*/
468+
public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
464469
return this.recordInterceptor;
465470
}
466471

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
*
3636
* @author Artem Bilan
3737
* @author Gary Russell
38+
* @author Sanghyeok An
3839
* @since 2.3
3940
*
4041
*/
@@ -92,4 +93,13 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
9293
this.delegates.forEach(del -> del.afterRecord(record, consumer));
9394
}
9495

96+
/**
97+
* Add an {@link RecordInterceptor} to delegates.
98+
* @param recordInterceptor the interceptor.
99+
* @since 4.0
100+
*/
101+
public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
102+
this.delegates.add(recordInterceptor);
103+
}
104+
95105
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3842,7 +3842,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
38423842
containerProps.setClientId("clientId");
38433843

38443844
CountDownLatch afterLatch = new CountDownLatch(1);
3845-
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
3845+
RecordInterceptor<Integer, String> recordInterceptor1 = spy(new RecordInterceptor<Integer, String>() {
38463846

38473847
@Override
38483848
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
@@ -3858,25 +3858,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
38583858

38593859
});
38603860

3861+
RecordInterceptor<Integer, String> recordInterceptor2 = spy(new RecordInterceptor<Integer, String>() {
3862+
3863+
@Override
3864+
public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
3865+
Consumer<Integer, String> consumer) {
3866+
3867+
return record;
3868+
}
3869+
3870+
@Override
3871+
public void clearThreadState(Consumer<?, ?> consumer) {
3872+
afterLatch.countDown();
3873+
}
3874+
3875+
});
3876+
38613877
KafkaMessageListenerContainer<Integer, String> container =
38623878
new KafkaMessageListenerContainer<>(cf, containerProps);
3863-
container.setRecordInterceptor(recordInterceptor);
3879+
container.setRecordInterceptor(new CompositeRecordInterceptor<>());
3880+
if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor<Integer, String> composite) {
3881+
composite.addRecordInterceptor(recordInterceptor1);
3882+
composite.addRecordInterceptor(recordInterceptor2);
3883+
}
3884+
38643885
container.start();
38653886
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
38663887
assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue();
38673888

3868-
InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer);
3869-
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
3889+
InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer);
3890+
inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer));
3891+
inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer));
38703892
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
3871-
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
3893+
inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer));
3894+
inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer));
38723895
inOrder.verify(messageListener).onMessage(eq(firstRecord));
3873-
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
3874-
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
3875-
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
3896+
inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer));
3897+
inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer));
3898+
inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer));
3899+
inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer));
3900+
inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer));
3901+
inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer));
38763902
inOrder.verify(messageListener).onMessage(eq(secondRecord));
3877-
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
3878-
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
3879-
inOrder.verify(recordInterceptor).clearThreadState(eq(consumer));
3903+
inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer));
3904+
inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer));
3905+
inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer));
3906+
inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer));
3907+
inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer));
3908+
inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer));
38803909
container.stop();
38813910
}
38823911

0 commit comments

Comments
 (0)