
Conversation


@jordepic jordepic commented Oct 22, 2025

Description

Prior to this change, Trino's Kafka connector read from dynamically loaded offsets fetched at query time. This is not conducive to snapshot reads, which require specifying a fixed start and end offset per topic partition.

In this change, we enable snapshot reads by setting
kafka.override-topic-partition-offsets to true in the Kafka catalog configuration file.

From there, the session-level property
kafka_topic_partition_offset_overrides specifies manual overrides for a set of topic partitions. If specified, these overrides replace the offsets that the connector loads at runtime.
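For illustration, the two settings are used together like this (the offsets are examples; the syntax matches the usage shown in the connector docs below):

```properties
# etc/catalog/kafka.properties
kafka.override-topic-partition-offsets=true
```

```sql
-- Read offsets [0, 10) on partition 0 and [5, 20) on partition 1 of topic x
SET SESSION kafka.kafka_topic_partition_offset_overrides = 'x-0=0-10,x-1=5-20';
```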

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

Add support for reading user-specified offsets in the Kafka connector. See the Kafka connector documentation for details.

Summary by Sourcery

Enable snapshot-style reads from Kafka by allowing users to specify manual start and end offsets per topic partition via a new catalog flag and session property.

New Features:

  • Add config property 'kafka.override-topic-partition-offsets' to require explicit offset overrides for Kafka reads
  • Introduce session property 'kafka_topic_partition_offset_overrides' to specify custom offset ranges per topic-partition

Enhancements:

  • Introduce TopicPartitionOffsetProvider interface with Current and Overridden implementations to centralize offset retrieval
  • Refactor KafkaSplitManager and KafkaFilterManager to use unified KafkaPartitionInputs record

Documentation:

  • Document the new offset override config and session property in the Kafka connector documentation

Tests:

  • Add unit tests for OverriddenTopicPartitionOffsetProvider and update KafkaConfig tests to cover the new catalog setting

@cla-bot

cla-bot bot commented Oct 22, 2025

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@sourcery-ai

sourcery-ai bot commented Oct 22, 2025

Reviewer's Guide

This PR implements a pluggable offset-range provider in the Kafka connector, allowing users to supply explicit start/end offsets per partition (snapshot reads). It introduces a TopicPartitionOffsetProvider interface with current and overridden implementations, wires it through configuration and session properties, refactors split generation and filtering to use a unified KafkaPartitionInputs record, updates related config/session classes and docs, and adds tests.
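The provider abstraction described above can be sketched as follows. The type names follow the PR, but the signatures here are simplified assumptions: the real methods take a KafkaConsumer and ConnectorSession and return KafkaPartitionInputs rather than a bare map.

```java
import java.util.Map;

// Simplified sketch of the pluggable provider (signatures are assumptions).
interface TopicPartitionOffsetProvider
{
    // Maps partition number -> {beginOffset (inclusive), endOffset (exclusive)}.
    Map<Integer, long[]> offsetRangesPerPartition(String topic);
}

// Default behavior: use whatever offsets the broker currently reports.
class CurrentTopicPartitionOffsetProvider implements TopicPartitionOffsetProvider
{
    @Override
    public Map<Integer, long[]> offsetRangesPerPartition(String topic)
    {
        // The real implementation queries the broker via
        // KafkaConsumer.beginningOffsets/endOffsets; this stub just returns a placeholder.
        return Map.of(0, new long[] {0L, Long.MAX_VALUE});
    }
}

// Override behavior: serve the ranges parsed from the session property.
class OverriddenTopicPartitionOffsetProvider implements TopicPartitionOffsetProvider
{
    private final Map<Integer, long[]> overrides;

    OverriddenTopicPartitionOffsetProvider(Map<Integer, long[]> overrides)
    {
        this.overrides = overrides;
    }

    @Override
    public Map<Integer, long[]> offsetRangesPerPartition(String topic)
    {
        return overrides;
    }
}
```

The connector module then binds one implementation or the other based on the kafka.override-topic-partition-offsets config flag.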

ER diagram for Kafka connector offset configuration properties

```mermaid
erDiagram
    KAFKA_CONFIG ||--o{ KAFKA_SESSION_PROPERTY : has
    KAFKA_CONFIG {
        boolean overrideTopicPartitionOffsets
    }
    KAFKA_SESSION_PROPERTY {
        string KAFKA_TOPIC_PARTITION_OFFSET_OVERRIDES
    }
```

Class diagram for new and updated Kafka offset provider types

```mermaid
classDiagram
    class TopicPartitionOffsetProvider {
        <<interface>>
        +offsetRangesPerPartition(KafkaConsumer, String, ConnectorSession) KafkaPartitionInputs
    }
    class CurrentTopicPartitionOffsetProvider {
        +offsetRangesPerPartition(KafkaConsumer, String, ConnectorSession) KafkaPartitionInputs
    }
    class OverriddenTopicPartitionOffsetProvider {
        +offsetRangesPerPartition(KafkaConsumer, String, ConnectorSession) KafkaPartitionInputs
        +parseOverriddenInputs(String) KafkaPartitionInputs
    }
    TopicPartitionOffsetProvider <|.. CurrentTopicPartitionOffsetProvider
    TopicPartitionOffsetProvider <|.. OverriddenTopicPartitionOffsetProvider
    class KafkaPartitionInputs {
        +partitionInfos: List~PartitionInfo~
        +partitionBeginOffsets: Map~TopicPartition, Long~
        +partitionEndOffsets: Map~TopicPartition, Long~
    }
```

Class diagram for updated KafkaSplitManager and related classes

```mermaid
classDiagram
    class KafkaSplitManager {
        -consumerFactory: KafkaConsumerFactory
        -messagesPerSplit: int
        -kafkaFilterManager: KafkaFilterManager
        -contentSchemaProvider: ContentSchemaProvider
        -topicPartitionOffsetProvider: TopicPartitionOffsetProvider
        +getSplits(...)
        +toTopicPartition(PartitionInfo) TopicPartition
    }
    class KafkaFilterManager {
        +getKafkaFilterResult(session, kafkaTableHandle, KafkaPartitionInputs) KafkaPartitionInputs
    }
    KafkaSplitManager --> TopicPartitionOffsetProvider
    KafkaSplitManager --> KafkaFilterManager
    KafkaSplitManager --> ContentSchemaProvider
    KafkaSplitManager --> KafkaConsumerFactory
    KafkaSplitManager --> KafkaPartitionInputs
    KafkaFilterManager --> KafkaPartitionInputs
```

Class diagram for updated KafkaConfig and KafkaSessionProperties

```mermaid
classDiagram
    class KafkaConfig {
        +overrideTopicPartitionOffsets: boolean
        +isOverrideTopicPartitionOffsets() boolean
        +setOverrideTopicPartitionOffsets(boolean) KafkaConfig
    }
    class KafkaSessionProperties {
        +KAFKA_TOPIC_PARTITION_OFFSET_OVERRIDES: String
        +getSessionProperties() List~PropertyMetadata~
    }
```

File-Level Changes

Change: Introduce pluggable offset provider abstraction
  • Define TopicPartitionOffsetProvider interface
  • Implement CurrentTopicPartitionOffsetProvider
  • Implement OverriddenTopicPartitionOffsetProvider to parse and merge session overrides
  • Bind the appropriate provider in KafkaConnectorModule based on config
  Files: TopicPartitionOffsetProvider.java, CurrentTopicPartitionOffsetProvider.java, OverriddenTopicPartitionOffsetProvider.java, KafkaConnectorModule.java

Change: Enhance connector and session configs for overrides
  • Add kafka.override-topic-partition-offsets flag in KafkaConfig
  • Add kafka_topic_partition_offset_overrides session property in KafkaSessionProperties
  • Update TestKafkaConfig to cover the new config flag
  Files: KafkaConfig.java, KafkaSessionProperties.java, TestKafkaConfig.java

Change: Refactor split manager and filter logic to use unified inputs
  • Inject TopicPartitionOffsetProvider into KafkaSplitManager
  • Replace direct beginning/end offset calls with provider.offsetRangesPerPartition
  • Rename KafkaFilteringResult to KafkaPartitionInputs and update the KafkaFilterManager signature
  • Adjust the split loop to consume KafkaPartitionInputs instead of raw offsets
  Files: KafkaSplitManager.java, KafkaFilterManager.java, KafkaPartitionInputs.java

Change: Update documentation for manual offset overrides
  • Add kafka.override-topic-partition-offsets description and usage example
  • Document the session parameter syntax
  • Fix table formatting in connector docs
  Files: docs/src/main/sphinx/connector/kafka.md

Change: Add and update tests for override functionality
  • Add TestOverriddenTopicPartitionOffsetProvider to validate parsing logic
  • Update catalog properties for the default override setting
  Files: TestOverriddenTopicPartitionOffsetProvider.java, testing/trino-server-dev/etc/catalog/kafka.properties
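The split-manager refactor above can be sketched as follows; this is an illustrative simplification (the record and method names are assumptions, not the PR's actual KafkaSplitManager code), showing how per-partition offset ranges are chunked into splits of at most messagesPerSplit messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka split: a half-open offset range on one partition.
record Split(String topic, int partition, long beginOffset, long endOffset) {}

class SplitBuilder
{
    // offsetRanges maps partition number -> {beginOffset (inclusive), endOffset (exclusive)},
    // whether those offsets came from the broker or from session overrides.
    static List<Split> buildSplits(String topic, Map<Integer, long[]> offsetRanges, int messagesPerSplit)
    {
        List<Split> splits = new ArrayList<>();
        for (Map.Entry<Integer, long[]> entry : offsetRanges.entrySet()) {
            long begin = entry.getValue()[0];
            long end = entry.getValue()[1];
            // Chunk each partition's [begin, end) range into fixed-size splits.
            for (long start = begin; start < end; start += messagesPerSplit) {
                splits.add(new Split(topic, entry.getKey(), start, Math.min(start + messagesPerSplit, end)));
            }
        }
        return splits;
    }
}
```

Because the provider hands back a single inputs object, the same loop serves both the default broker-offset path and the override path.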


@github-actions github-actions bot added docs kafka Kafka connector labels Oct 22, 2025

@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • OverriddenTopicPartitionOffsetProvider.parseOverriddenInputs splits on all hyphens, which will break for topic names containing hyphens; update parsing to split at the last hyphen (e.g. via lastIndexOf) so you correctly extract the partition and preserve full topic names.
  • Add explicit validation and clear error messages when the session override string is malformed (e.g. missing ‘=’ or incorrect range format) rather than letting generic parse exceptions bubble up.
  • Introduce a unit test for CurrentTopicPartitionOffsetProvider.offsetRangesPerPartition to verify it returns the expected partition list and begin/end offsets, similar to the coverage added for the overridden provider.

## Individual Comments

### Comment 1
<location> `plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/OverriddenTopicPartitionOffsetProvider.java:55` </location>
<code_context>
+
+    public static KafkaPartitionInputs parseOverriddenInputs(String inputString)
+    {
+        List<String> perTopicOffsets = Arrays.stream(inputString.replace(" ", "").split(",")).toList();
+        Map<TopicPartition, Long> overriddenPartitionBeginOffsets = perTopicOffsets.stream()
+                .map(aEntry -> aEntry.split(PARTITION_TO_OFFSET_DELIMITER))
</code_context>

<issue_to_address>
**issue:** Input parsing does not validate malformed or empty entries.

Filter out empty entries and validate each entry's format before processing to prevent runtime exceptions.
</issue_to_address>

### Comment 2
<location> `plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/OverriddenTopicPartitionOffsetProvider.java:37` </location>
<code_context>
+
+    private static TopicPartition parseTopicPartition(String topicPartitionString)
+    {
+        String[] parts = topicPartitionString.split(DELIMITER);
+        return new TopicPartition(parts[0], Integer.parseInt(parts[1]));
+    }
</code_context>

<issue_to_address>
**issue (bug_risk):** No bounds checking on split results for topic partition parsing.

Accessing parts[0] and parts[1] without validation may cause an ArrayIndexOutOfBoundsException if the input is malformed. Please add checks to handle invalid input gracefully.
</issue_to_address>

### Comment 3
<location> `plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/OverriddenTopicPartitionOffsetProvider.java:43` </location>
<code_context>
+
+    private static long parseBeginningOffset(String offsetString)
+    {
+        String[] parts = offsetString.split(DELIMITER);
+        return Long.parseLong(parts[0]);
+    }
</code_context>

<issue_to_address>
**issue:** Offset parsing assumes both beginning and end offsets are present.

Validate that the split result contains both expected parts and throw a descriptive error if the format is invalid.
</issue_to_address>
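Taken together, comments 1-3 suggest a parser that splits each key at its last hyphen and validates every piece before use. A minimal sketch follows; the class name, method name, and flat map return type are illustrative assumptions (the PR's parser returns KafkaPartitionInputs keyed by TopicPartition):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical hardened parser for entries like "my-topic-0=5-20":
// topic = "my-topic", partition = 0, begin = 5 (inclusive), end = 20 (exclusive).
public class OffsetOverrideParser
{
    public static Map<String, long[]> parse(String input)
    {
        Map<String, long[]> result = new HashMap<>();
        for (String entry : input.replace(" ", "").split(",")) {
            if (entry.isEmpty()) {
                continue; // tolerate trailing commas and stray whitespace
            }
            String[] keyValue = entry.split("=");
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Malformed override entry (expected topic-partition=begin-end): " + entry);
            }
            // Split at the LAST hyphen so topic names containing hyphens survive.
            int lastHyphen = keyValue[0].lastIndexOf('-');
            if (lastHyphen <= 0) {
                throw new IllegalArgumentException("Malformed topic-partition (expected topic-partition): " + keyValue[0]);
            }
            String topic = keyValue[0].substring(0, lastHyphen);
            int partition = Integer.parseInt(keyValue[0].substring(lastHyphen + 1));
            String[] range = keyValue[1].split("-");
            if (range.length != 2) {
                throw new IllegalArgumentException("Malformed offset range (expected begin-end): " + keyValue[1]);
            }
            result.put(topic + "-" + partition, new long[] {Long.parseLong(range[0]), Long.parseLong(range[1])});
        }
        return result;
    }
}
```

With this shape, a malformed entry such as "x-0?1-10" fails fast with a descriptive message instead of a generic ArrayIndexOutOfBoundsException.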

### Comment 4
<location> `plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestOverriddenTopicPartitionOffsetProvider.java:23-24` </location>
<code_context>
+                .setOverrideTopicPartitionOffsets(false));
     }

     @Test
</code_context>

<issue_to_address>
**suggestion (testing):** Missing tests for invalid or malformed offset override strings.

Add tests for cases like missing delimiters, non-numeric offsets, empty strings, and duplicate topic-partition entries to ensure robust error handling in `parseOverriddenInputs`.
</issue_to_address>

### Comment 5
<location> `plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java:76-80` </location>
<code_context>
                 .put("kafka.timestamp-upper-bound-force-push-down-enabled", "true")
                 .put("kafka.config.resources", resource1.toString() + "," + resource2.toString())
                 .put("kafka.internal-column-prefix", "the_most_unexpected_prefix_")
+                .put("kafka.override-topic-partition-offsets", "true")
                 .buildOrThrow();

</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test for invalid values for 'kafka.override-topic-partition-offsets'.

Testing with invalid values will help verify that configuration parsing and error handling are robust.

Suggested implementation:

```java
@Test
public void testInvalidOverrideTopicPartitionOffsetsValue()
{
    Map<String, String> properties = ImmutableMap.<String, String>builder()
            .put("kafka.timestamp-upper-bound-force-push-down-enabled", "true")
            .put("kafka.config.resources", resource1.toString() + "," + resource2.toString())
            .put("kafka.internal-column-prefix", "the_most_unexpected_prefix_")
            .put("kafka.override-topic-partition-offsets", "not_a_boolean")
            .buildOrThrow();

    // Build the config from the properties map with the same helper the
    // existing tests use; an invalid boolean value should be rejected here.
    assertThrows(IllegalArgumentException.class, () -> buildConfig(properties));
}
```

- Ensure that `assertThrows` is statically imported from the appropriate test library (e.g., `org.junit.jupiter.api.Assertions.assertThrows`).
- `buildConfig` stands in for however the existing tests materialize a `KafkaConfig` from a properties map; if that path does not throw `IllegalArgumentException` for invalid boolean values, adjust the expected exception type accordingly.
- If `resource1` and `resource2` are not available in the test context, use dummy values or refactor the test setup accordingly.
</issue_to_address>


@jordepic jordepic force-pushed the trino-kafka-offsets branch from 40037a9 to d5276d5 on October 23, 2025 00:27
@cla-bot

cla-bot bot commented Oct 23, 2025


Comment on lines +230 to +231
To read offsets 0 to 10 (exclusive) on partition 0 and 5 to 20 (exclusive) on partition 1 of topic x:
`SET SESSION kafka.kafka_topic_partition_offset_overrides = 'x-0=0-10,x-1=5-20';`

P1: I think it'd be awesome to support reading the offset overrides from a file (as well as inline as a string)

```java
@Test
public void testCanParseOverriddenOffsets()
{
    KafkaPartitionInputs inputs = OverriddenTopicPartitionOffsetProvider.parseOverriddenInputs("x-0=0-10, y-1=5-20 ");
```

Consider adding test cases for parse errors in the new override syntax (e.g. "x-0?1-10").
