Provide API hooks for customizing the SubscribableKafkaMessageSource's RuntimeErrorHandler #567

@filpano

Enhancement Description

The current iteration of the SubscribableKafkaMessageSource supports extending some, but not all, functionality via its builder pattern, e.g. here.

Notably, however, it does not support easily extending or overriding the RuntimeErrorHandler, which is used centrally when an exception propagates up to the fetcher thread; see here.

In our use case, we have a product which makes use of the Axon Kafka extension to consume Kafka records. Here we would like to mark all errors during event handling as fatal errors and as a result fail consumption. In other words, we would rather have no data than incorrect data in this instance.

Here it would be very useful to be able to customize the RuntimeErrorHandler and provide something like a back-off period so that new consumers aren't (re-)started in an infinite loop.
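
To illustrate the kind of handler we have in mind, here is a minimal sketch of a restart handler with capped exponential back-off. The `RuntimeErrorHandler` interface below is a local stand-in (its exact signature in the extension is an assumption), and `BackoffRestartHandler`, `delayMillis`, and the `restart` callback are hypothetical names, not part of the extension.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in for the extension's RuntimeErrorHandler.
// Assumption: the real interface may declare a different parameter type.
@FunctionalInterface
interface RuntimeErrorHandler {
    void handle(Exception e);
}

// Hypothetical handler: schedules the restart after a delay that doubles
// with every consecutive failure, up to a fixed maximum.
final class BackoffRestartHandler implements RuntimeErrorHandler {

    private final Runnable restart;                  // e.g. () -> addConsumer(consumerIndex)
    private final ScheduledExecutorService scheduler;
    private final long initialMs;
    private final long maxMs;
    private final AtomicInteger failures = new AtomicInteger();

    BackoffRestartHandler(Runnable restart, ScheduledExecutorService scheduler,
                          long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("require 0 < initialMs <= maxMs");
        }
        this.restart = restart;
        this.scheduler = scheduler;
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    // Delay for the given zero-based attempt: initialMs * 2^attempt, capped at maxMs.
    static long delayMillis(int attempt, long initialMs, long maxMs) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    @Override
    public void handle(Exception e) {
        long delay = delayMillis(failures.getAndIncrement(), initialMs, maxMs);
        // Restart off the failing thread, after the back-off period has elapsed.
        scheduler.schedule(restart, delay, TimeUnit.MILLISECONDS);
    }

    // Call once the consumer is healthy again so the next failure starts from initialMs.
    void reset() {
        failures.set(0);
    }
}
```

A handler for our "no data rather than incorrect data" case could follow the same shape but never invoke `restart` at all, leaving the consumer stopped until an operator intervenes.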

Current Behaviour

The current behaviour immediately restarts a consumer upon encountering a fatal exception:

    private RuntimeErrorHandler restartOnError(int consumerIndex) {
        return e -> {
            logger.warn("Consumer had a fatal exception, starting a new one", e);
            addConsumer(consumerIndex);
        };
    }

This approach works well in general, as most transient fatal errors will eventually subside.

Wanted Behaviour

Ideally, the SubscribableKafkaMessageSource.Builder should support adding a custom RuntimeErrorHandler:

    public static class Builder<K, V> extends TopicSubscriberBuilder<Builder<K, V>> {

        // ... other builder fields
        private RuntimeErrorHandler errorHandler;

        /**
         * Sets the handler invoked when the fetcher hits a fatal exception.
         * May be null, in which case the current restart behaviour is used.
         */
        public Builder<K, V> errorHandler(RuntimeErrorHandler errorHandler) {
            this.errorHandler = errorHandler;
            return this;
        }

        // ... other builder methods
    }

The constructor would then pick it up:

    protected SubscribableKafkaMessageSource(Builder<K, V> builder) {
        builder.validate();
        // ... other fields
        this.errorHandler = builder.errorHandler;
    }

and:

    private void addConsumer(int consumerIndex) {
        Consumer<K, V> consumer = consumerFactory.createConsumer(groupId);
        subscriber.subscribeTopics(consumer);

        Registration closeConsumer = fetcher.poll(
                consumer,
                consumerRecords -> StreamSupport.stream(consumerRecords.spliterator(), false)
                                                .map(messageConverter::readKafkaMessage)
                                                .filter(Optional::isPresent)
                                                .map(Optional::get)
                                                .collect(Collectors.toList()),
                eventMessages -> eventProcessors.forEach(eventProcessor -> eventProcessor.accept(eventMessages)),
                // Updated line
                this.errorHandler != null ? this.errorHandler : restartOnError(consumerIndex) 
        );
        fetcherRegistrations.put(consumerIndex, closeConsumer);
    }

Possible Workarounds

Currently, only explicitly extending the SubscribableKafkaMessageSource (with significant code duplication) would allow getting the same behaviour.
