Skip to content

KAFKA-20678: Support share group DLQ remote record fetch.#22601

Open
smjn wants to merge 2 commits into
apache:trunkfrom
smjn:KAFKA-20678
Open

KAFKA-20678: Support share group DLQ remote record fetch.#22601
smjn wants to merge 2 commits into
apache:trunkfrom
smjn:KAFKA-20678

Conversation

@smjn

@smjn smjn commented Jun 17, 2026

Copy link
Copy Markdown
Collaborator
  • In this PR, we have extracted the record fetch logic from
    ShareGroupDLQStateManager and encapsulated it in
    ShareGroupDLQRecordFetcher.
  • The fetcher also gets the ability to fetch the records from the remote
    tier and then returns the combined record map via its public fetch
    method.
  • Since the remote record fetch is inherently async, there is a change
    in the way the records are populated:
    • Earlier records population was lazy in that it was invoked when
      handlers were coalesced. Now, we will issue the record fetch call when
      the handler is enqueued. Otherwise, record population will block the
      Sender thread.
    • Furthermore, we also require storing additional state in the fetcher
      so
      that local and remote records could be held until combined.
      This is also divergent from previous code.
  • Lastly, we have added tests to ShareGroupDLQRecordFetcherTest and
    ReplicaManagerLogReaderTest.

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Jun 17, 2026
@smjn

smjn commented Jun 17, 2026

Copy link
Copy Markdown
Collaborator Author

The record fetcher pseudo code is:

state:
        endOffset = param.lastOffset
        recordMap = {}                       // offset -> Record (sparse)
        result    = new CompletableFuture()

    fetch() -> Future<Map>:                  // public
        try: runFrom(param.firstOffset)
        catch: result.complete({})           // empty on err
        return result

    runFrom(offset):                         // synchronous loop
        while offset <= endOffset:
            adv = fetchLocal(offset)
            if adv.isEmpty: return            
            offset = adv.get
        complete()

   fetchLocal(readFrom) -> OptionalLong:     // empty = stop, present = next offset
        res = logReader.read(readFrom)
        if res == null or res.error: result.complete({}); return empty   // abort
        if res.isTiered: return fetchRemote(res.descriptor, readFrom)     // delegate
        return advanceOrStop(collect(res.records, readFrom), readFrom)

    fetchRemote(descriptor, readFrom) -> OptionalLong:
        f = logReader.readRemote(descriptor)                 // async
        if not f.isDone:
            f.whenComplete((data, ex) -> resumeRemote(readFrom, data, ex))
            return empty                                     // resume from callback
        return advanceOrStop(process(readFrom, f.getNow, f.error), readFrom)

    resumeRemote(readFrom, data, ex):         // runs on reader thread, after runFrom returned
        try:
            adv = process(readFrom, data, ex)
            if adv <= readFrom: complete()
            else:               runFrom(adv)  // resume; fresh stack
        catch: result.complete({})

    process(readFrom, data, ex) -> long:
        records = (ex != null or data == null) ? EMPTY : data.records   // skip on failure
        return collect(records, readFrom)

    collect(records, readFrom) -> long:       // fill recordMap, return next offset
        next = readFrom
        for r in records:
            if r.offset < readFrom: skip
            if r.offset > endOffset: break
            recordMap[r.offset] = r
            next = max(next, r.offset + 1)
        return next

    advanceOrStop(advanced, readFrom) -> OptionalLong:   // inlined in real code
        if advanced <= readFrom: complete(); return empty   // no progress
        return of(advanced)

    complete():
        result.complete(immutableCopy(recordMap))           //missing offsets => absent

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant