diff --git a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java index 71ce7c74ca92d..37cbea45ffc60 100644 --- a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java +++ b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java @@ -21,9 +21,12 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.share.LogReader; import org.apache.kafka.server.storage.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,11 +34,13 @@ import java.util.LinkedHashMap; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import scala.Tuple2; import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; +import scala.jdk.javaapi.OptionConverters; import scala.runtime.BoxedUnit; /** @@ -92,4 +97,37 @@ public LinkedHashMap read( log.trace("Data successfully retrieved by replica manager: {}", responseData); return responseData; } + + @Override + public CompletableFuture readRemote(RemoteStorageFetchInfo remoteStorageFetchInfo) { + CompletableFuture future = new CompletableFuture<>(); + + Optional remoteLogManager = OptionConverters.toJava(replicaManager.remoteLogManager()); + if (remoteLogManager.isEmpty()) { + future.completeExceptionally(new IllegalStateException( + "Cannot read " + remoteStorageFetchInfo + " from remote storage as remote log manager is not configured.")); + return future; + } + + try { + // The read runs on the remote storage reader thread pool; the callback completes the + // future on that pool's 
thread, so the caller's thread is never blocked on remote IO. + remoteLogManager.get().asyncRead(remoteStorageFetchInfo, result -> { + if (result.error().isPresent()) { + future.completeExceptionally(result.error().get()); + } else if (result.fetchDataInfo().isPresent()) { + future.complete(result.fetchDataInfo().get()); + } else { + future.completeExceptionally(new IllegalStateException( + "Remote read for " + remoteStorageFetchInfo + " returned neither data nor error.")); + } + }); + } catch (Exception e) { + // e.g. RejectedExecutionException if the reader pool is shutting down. + log.warn("Unable to schedule remote read for {}.", remoteStorageFetchInfo, e); + future.completeExceptionally(e); + } + + return future; + } } diff --git a/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java b/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java new file mode 100644 index 0000000000000..4a638ecad9fd7 --- /dev/null +++ b/core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.share; + +import kafka.server.ReplicaManager; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.record.internal.MemoryRecords; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.LogReadResult; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; + +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import scala.Option; +import scala.Tuple2; +import scala.collection.immutable.Seq; +import scala.jdk.javaapi.CollectionConverters; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import 
static org.mockito.Mockito.when; + +public class ReplicaManagerLogReaderTest { + + private static final TopicIdPartition TOPIC_ID_PARTITION = + new TopicIdPartition(Uuid.randomUuid(), 0, "topic"); + + private static RemoteStorageFetchInfo remoteStorageFetchInfo() { + return new RemoteStorageFetchInfo( + 1024, + true, + TOPIC_ID_PARTITION, + new FetchRequest.PartitionData(TOPIC_ID_PARTITION.topicId(), 0L, 0L, 1024, Optional.empty()), + FetchIsolation.HIGH_WATERMARK); + } + + private static FetchParams fetchParams() { + return new FetchParams( + FetchRequest.CONSUMER_REPLICA_ID, -1, 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + } + + @Test + public void testReadReturnsEmptyWhenNoPartitionsToFetch() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + + ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager); + LinkedHashMap result = + logReader.read(fetchParams(), Set.of(), new LinkedHashMap<>(), new LinkedHashMap<>()); + + assertTrue(result.isEmpty()); + verify(replicaManager, never()).readFromLog(any(), any(), any(), anyBoolean()); + } + + @Test + public void testReadReturnsResultsFromReplicaManager() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + LogReadResult logReadResult = mock(LogReadResult.class); + Seq> readFromLogResult = + CollectionConverters.asScala(List.of(new Tuple2<>(TOPIC_ID_PARTITION, logReadResult))).toSeq(); + when(replicaManager.readFromLog(any(), any(), any(), anyBoolean())).thenReturn(readFromLogResult); + + LinkedHashMap offsets = new LinkedHashMap<>(); + offsets.put(TOPIC_ID_PARTITION, 5L); + LinkedHashMap maxBytes = new LinkedHashMap<>(); + maxBytes.put(TOPIC_ID_PARTITION, 1024); + + ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager); + LinkedHashMap result = + logReader.read(fetchParams(), Set.of(TOPIC_ID_PARTITION), offsets, maxBytes); + + assertEquals(1, result.size()); + assertSame(logReadResult, result.get(TOPIC_ID_PARTITION)); + } + 
+ @Test + public void testReadRemoteCompletesExceptionallyWhenRemoteLogManagerNotConfigured() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + when(replicaManager.remoteLogManager()).thenReturn(Option.empty()); + + ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager); + CompletableFuture future = logReader.readRemote(remoteStorageFetchInfo()); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS)); + assertInstanceOf(IllegalStateException.class, exception.getCause()); + } + + @Test + public void testReadRemoteCompletesWithFetchedData() throws Exception { + ReplicaManager replicaManager = mock(ReplicaManager.class); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY); + RemoteStorageFetchInfo fetchInfo = remoteStorageFetchInfo(); + doAnswer(invocation -> { + Consumer callback = invocation.getArgument(1); + callback.accept(new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty())); + return null; + }).when(remoteLogManager).asyncRead(eq(fetchInfo), any()); + + ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager); + CompletableFuture future = logReader.readRemote(fetchInfo); + + assertSame(fetchDataInfo, future.get(10, TimeUnit.SECONDS)); + } + + @Test + public void testReadRemoteCompletesExceptionallyWhenReadResultHasError() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + RuntimeException readError = new RuntimeException("remote read failed"); + doAnswer(invocation -> { + Consumer callback = 
invocation.getArgument(1); + callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(readError))); + return null; + }).when(remoteLogManager).asyncRead(any(), any()); + + ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager); + CompletableFuture future = logReader.readRemote(remoteStorageFetchInfo()); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS)); + assertSame(readError, exception.getCause()); + } + + @Test + public void testReadRemoteCompletesExceptionallyWhenReadResultIsEmpty() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + doAnswer(invocation -> { + Consumer callback = invocation.getArgument(1); + callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.empty())); + return null; + }).when(remoteLogManager).asyncRead(any(), any()); + + ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager); + CompletableFuture future = logReader.readRemote(remoteStorageFetchInfo()); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS)); + assertInstanceOf(IllegalStateException.class, exception.getCause()); + } + + @Test + public void testReadRemoteCompletesExceptionallyWhenSchedulingRejected() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + RejectedExecutionException rejected = new RejectedExecutionException("reader pool shutting down"); + doThrow(rejected).when(remoteLogManager).asyncRead(any(), any()); + + ReplicaManagerLogReader logReader 
= new ReplicaManagerLogReader(replicaManager); + CompletableFuture future = logReader.readRemote(remoteStorageFetchInfo()); + + assertTrue(future.isCompletedExceptionally()); + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS)); + assertSame(rejected, exception.getCause()); + } +} diff --git a/server/src/main/java/org/apache/kafka/server/share/LogReader.java b/server/src/main/java/org/apache/kafka/server/share/LogReader.java index 60e453e9b8693..9af68abe43044 100644 --- a/server/src/main/java/org/apache/kafka/server/share/LogReader.java +++ b/server/src/main/java/org/apache/kafka/server/share/LogReader.java @@ -18,10 +18,13 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.server.storage.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import java.util.LinkedHashMap; import java.util.Set; +import java.util.concurrent.CompletableFuture; /** * Abstraction for reading records from log. @@ -42,4 +45,21 @@ LinkedHashMap read( Set partitionsToFetch, LinkedHashMap topicPartitionFetchOffsets, LinkedHashMap partitionMaxBytes); + + /** + * Read records asynchronously from the remote tier for an offset that has been tiered off the + * local log. The {@link RemoteStorageFetchInfo} is the descriptor surfaced by + * {@link LogReadResult#info()} as {@link FetchDataInfo#delayedRemoteStorageFetch} when a + * preceding {@link #read} determined that the requested data resides in remote storage. + * + *
<p>
The read is performed off-thread (on the remote storage reader pool) so that the caller's + * thread is not blocked on remote storage IO. It is intended for low volume, best-effort reads. + * The returned future completes exceptionally when remote storage is not configured on the broker + * or the read could not be completed, allowing callers to gracefully skip the data instead of failing. + * + * @param remoteStorageFetchInfo The remote fetch descriptor obtained from a prior local read. + * @return A future that completes with the fetched data, or completes exceptionally if it could + * not be read remotely. + */ + CompletableFuture readRemote(RemoteStorageFetchInfo remoteStorageFetchInfo); } diff --git a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordFetcher.java b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordFetcher.java new file mode 100644 index 0000000000000..9943c0412e3f7 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordFetcher.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.share.dlq; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.internal.MemoryRecords; +import org.apache.kafka.common.record.internal.Record; +import org.apache.kafka.common.record.internal.RecordBatch; +import org.apache.kafka.common.record.internal.Records; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.share.LogReader; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * Reads the original source records for the offset range described by a {@link ShareGroupDLQRecordParameter} + * so they can be copied into a DLQ record. Local reads are performed inline in a loop; when an offset has + * been tiered off the local log the records are fetched asynchronously from the remote tier and the loop + * resumes once the read completes (so the caller's thread is never blocked on remote storage IO). + * + *

Best-effort: the returned future always completes normally with whatever records could be read. + * Offsets that cannot be read - locally or remotely - are simply absent from the map, leaving the caller + * to produce a DLQ record with headers only for them. + * + *

Instances are single-use: create one fetcher per {@link #fetch()} call. + */ +public class ShareGroupDLQRecordFetcher { + private static final Logger log = LoggerFactory.getLogger(ShareGroupDLQRecordFetcher.class); + + private final LogReader logReader; + private final Time time; + private final ShareGroupDLQRecordParameter param; + + private final TopicIdPartition tp; + private final long endOffset; + private final int recordCount; + private final long startTime; + private final Map recordMap; + // We are fetching data for one TopicIdPartition only. Hence, there is no need to keep recreating + // the maxBytes map, and we can re-use a single copy. In similar vein, we needn't clear the offsets + // map either and just update the value corresponding to the TopicIdPartition key across iterations. + private final LinkedHashMap offsets = new LinkedHashMap<>(); + private final LinkedHashMap maxBytesMap = new LinkedHashMap<>(); + private final CompletableFuture> result = new CompletableFuture<>(); + private final FetchParams fetchParams; + + public ShareGroupDLQRecordFetcher(LogReader logReader, Time time, ShareGroupDLQRecordParameter param, int maxFetchBytes) { + this.logReader = logReader; + this.time = time; + this.param = param; + this.tp = param.topicIdPartition(); + this.endOffset = param.lastOffset(); + this.recordCount = (int) (param.lastOffset() - param.firstOffset() + 1); + this.startTime = time.hiResClockMs(); + this.recordMap = new HashMap<>(recordCount); + this.maxBytesMap.put(tp, maxFetchBytes); + this.fetchParams = new FetchParams( + FetchRequest.CONSUMER_REPLICA_ID, // -1, reading as a consumer + -1, // replicaEpoch + 0L, // maxWaitMs - don't block + 1, // minBytes + maxFetchBytes, // maxBytes + FetchIsolation.HIGH_WATERMARK, // committed only + Optional.empty() // clientMetadata + ); + } + + /** + * Fetches the source records for the configured offset range. 
+ * + * @return A future that always completes normally with the records that could be read, keyed by offset. + */ + public CompletableFuture> fetch() { + try { + runFrom(param.firstOffset()); + } catch (Exception e) { + // Never let an unexpected error escape; skip record copy entirely. + log.warn("Unexpected error fetching records for {}. Skipping record copy.", param, e); + result.complete(Map.of()); + } + return result; + } + + // Visibility for testing + CompletableFuture> result() { + return result; + } + + /** + * Drives synchronous local reads in a loop. When an offset resides in the remote tier the records + * are read asynchronously: if the remote read is already complete the loop continues in place, and + * if it is still pending the loop returns and is resumed from the callback - so the synchronous path + * never recurses and the async path resumes on a fresh stack (the remote storage reader thread). + */ + private void runFrom(long startFrom) { + long nextOffset = startFrom; + while (nextOffset <= endOffset) { + OptionalLong advanced = fetchLocal(nextOffset); + if (advanced.isEmpty()) { + // The loop stopped: the result has already been completed (read error, or no progress), + // or a remote read is pending and will resume the loop from its callback. + return; + } + nextOffset = advanced.getAsLong(); + } + complete(); + } + + /** + * Reads readFrom from the local log. If the offset has been tiered off the local log, delegates to + * the remote tier. Returns the next offset to read from so the loop can continue, or + * OptionalLong.empty() if the loop should stop now - either because the result has been completed + * (read error, or no progress) or because a remote read is pending and will resume from its callback. 
+ */ + // Visibility for testing + OptionalLong fetchLocal(long readFrom) { + offsets.put(tp, readFrom); + + LinkedHashMap readResult = + logReader.read(fetchParams, Set.of(tp), offsets, maxBytesMap); + + LogReadResult res = readResult.get(tp); + if (res == null) { + log.warn("Unable to fetch actual record at offset {} for {}.", readFrom, param); + result.complete(Map.of()); + return OptionalLong.empty(); + } + + if (res.error().code() != Errors.NONE.code()) { + log.warn("Unable to fetch actual record at offset {} for {} due to error {}.", + readFrom, param, res.error()); + result.complete(Map.of()); + return OptionalLong.empty(); + } + + if (res.info().delayedRemoteStorageFetch.isPresent()) { + return fetchRemote(res.info().delayedRemoteStorageFetch.get(), readFrom); + } + + long advanced = collectRecords(res.info().records, readFrom); + // If the read position did not advance this iteration we have made no progress (reached HWM/LEO + // or only stale records were returned). Bail out to guarantee termination rather than + // re-fetching the same offset forever. + if (advanced <= readFrom) { + complete(); // no progress, stop + return OptionalLong.empty(); + } + return OptionalLong.of(advanced); + } + + /** + * Reads a tiered offset from the remote tier. Returns the next offset to read from so the loop can + * continue, or OptionalLong.empty() if the loop should stop now - either because the read is still + * pending (it will resume from its callback on the remote storage reader thread, leaving the sender + * thread unblocked) or because the read made no progress and the result has already been completed. 
+ */ + // Visibility for testing + OptionalLong fetchRemote(RemoteStorageFetchInfo remoteStorageFetchInfo, long readFrom) { + CompletableFuture remote = logReader.readRemote(remoteStorageFetchInfo); + + if (!remote.isDone()) { + remote.whenComplete((fetchDataInfo, exception) -> resumeRemote(readFrom, fetchDataInfo, exception)); + return OptionalLong.empty(); + } + + // The read is already complete, so process it in place without blocking and keep looping. + FetchDataInfo fetchDataInfo = null; + Throwable exception = null; + try { + // Safe (non-blocking) because the future is done. + fetchDataInfo = remote.getNow(null); + } catch (CompletionException e) { + exception = e.getCause(); + } + + long advanced = processRemoteOutcome(readFrom, fetchDataInfo, exception); + if (advanced <= readFrom) { + complete(); // no progress, stop + return OptionalLong.empty(); + } + return OptionalLong.of(advanced); + } + + /** + * Resumes the read loop after an asynchronous remote read completes. Runs after runFrom() has + * already returned, so invoking runFrom() here does not grow the original call stack. + */ + private void resumeRemote(long readFrom, FetchDataInfo fetchDataInfo, Throwable exception) { + try { + long advanced = processRemoteOutcome(readFrom, fetchDataInfo, exception); + if (advanced <= readFrom) { + complete(); // no progress, stop + } else { + runFrom(advanced); // resume the loop + } + } catch (Exception e) { + log.warn("Unexpected error processing remote records for {}. Skipping record copy.", param, e); + result.complete(Map.of()); + } + } + + /** + * Turns the outcome of a remote read into records and collects them. A failed or empty read leaves + * the offsets unread (skipped), consistent with any other unavailable offset. 
+ */ + private long processRemoteOutcome(long readFrom, FetchDataInfo fetchDataInfo, Throwable exception) { + Records records; + if (exception != null || fetchDataInfo == null) { + log.warn("Offset {} for {} is in remote storage but could not be read. Skipping it.", readFrom, param, exception); + records = MemoryRecords.EMPTY; + } else { + records = fetchDataInfo.records; + } + return collectRecords(records, readFrom); + } + + /** + * Adds the records within the requested range to the map and returns the offset to read from next + * (never moves backwards). Records below readFrom or above endOffset are ignored. + */ + private long collectRecords(Records records, long readFrom) { + long nextOffset = readFrom; + for (RecordBatch batch : records.batches()) { + for (Record record : batch) { + // A fetch can return a batch whose base offset is below the requested offset, so skip + // any record at or before the read position to avoid re-processing and dragging + // nextOffset backwards. + if (record.offset() < readFrom) continue; + if (record.offset() > endOffset) return nextOffset; + recordMap.put(record.offset(), record); + nextOffset = Math.max(nextOffset, record.offset() + 1); // never moves backwards + } + } + return nextOffset; + } + + /** + * Completes the result future with an immutable snapshot of the records collected so far. Offsets + * that could not be read are absent from the map; the caller produces a headers-only DLQ record for them. 
+ */ + private void complete() { + log.trace("Log fetch took {} ms for {} records starting at {} for {}", time.hiResClockMs() - startTime, + recordCount, param.firstOffset(), param); + if (recordCount != recordMap.size()) { + log.info("Total offsets requested: {}, Records found: {}", recordCount, recordMap.size()); + } + result.complete(Map.copyOf(recordMap)); + } +} diff --git a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java index 6bd1863aca880..06e4f6e708fac 100644 --- a/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java +++ b/server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java @@ -37,12 +37,10 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.internal.MemoryRecords; import org.apache.kafka.common.record.internal.Record; -import org.apache.kafka.common.record.internal.RecordBatch; import org.apache.kafka.common.record.internal.SimpleRecord; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; -import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.Time; @@ -50,13 +48,10 @@ import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.share.LogReader; import org.apache.kafka.server.share.metrics.ShareGroupMetrics; -import org.apache.kafka.server.storage.log.FetchIsolation; -import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.util.InterBrokerSendThread; import org.apache.kafka.server.util.RequestAndCompletionHandler; import org.apache.kafka.server.util.timer.Timer; import 
org.apache.kafka.server.util.timer.TimerTask; -import org.apache.kafka.storage.internals.log.LogReadResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +63,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -218,9 +212,6 @@ private void enqueue(ProduceRequestHandler requestHandler) { * @param handler The handler instance to add to the node map. */ private void addRequestToNodeMap(Node node, ProduceRequestHandler handler) { - if (!handler.isBatchable()) { - return; - } synchronized (nodeMapLock) { nodeRPCMap.computeIfAbsent(node, k -> new LinkedList<>()) .add(handler); @@ -235,9 +226,18 @@ class ProduceRequestHandler implements RequestCompletionHandler { private static final Logger LOG = LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class); private final ExponentialBackoffManager createTopicsBackoff; private final ExponentialBackoffManager produceRequestBackoff; - private Node dlqPartitionLeaderNode; - private int dlqDestinationPartition; - private ShareGroupDLQMetadataCacheHelper.TopicPartitionData dlqTopicPartitionData; + // These DLQ topic fields are written by populateDLQTopicData() and then read while building the + // produce request. Since records are resolved asynchronously, the handler can be published + // to the node map from the remote storage reader thread, so these are volatile to guarantee + // their writes are visible to the sender thread that subsequently reads them. + private volatile Node dlqPartitionLeaderNode; + private volatile int dlqDestinationPartition; + private volatile ShareGroupDLQMetadataCacheHelper.TopicPartitionData dlqTopicPartitionData; + // The original source records, resolved asynchronously (possibly via remote storage) before + // this handler is added to the node map for produce coalescing. 
Volatile because it is set on + // the thread that completes the record fetch (the sender thread or the remote storage reader + // pool) and read on the sender thread while building the produce request. + private volatile Map resolvedRecordData = Map.of(); public static final String HEADER_DLQ_ERRORS_TOPIC = "__dlq.errors.topic"; public static final String HEADER_DLQ_ERRORS_PARTITION = "__dlq.errors.partition"; @@ -293,19 +293,6 @@ public String name() { return "ProduceRequestHandler"; } - /** - * This method helps determine if the handler could - * participate in batching (added to nodeMap). This will - * be helpful if the RPCs which cannot be batched are included in - * this class as well. - * - * @return Boolean indicating whether this handler can be coalesced with others - * to reduce number of RPCs sent. - */ - boolean isBatchable() { - return true; - } - public void requestErrorResponse(Throwable exception) { this.result.completeExceptionally(exception); } @@ -334,10 +321,6 @@ public AbstractRequest.Builder createTopicBuilder() throws .setTopics(topicCollection)); } - public AbstractRequest.Builder requestBuilder() { - throw new RuntimeException("Produce requests are batchable, hence individual requests not needed."); - } - public void populateDLQTopicData() throws ConfigException { Optional dlqTopic = cacheHelper.shareGroupDlqTopic(param.groupId()); if (dlqTopic.isEmpty()) { @@ -369,7 +352,9 @@ public void populateDLQTopicData() throws ConfigException { } public ProduceRequestData.TopicProduceData topicProduceData() { - Map originalRecordData = maybeFetchRecordData(); + // Records have already been resolved (including any remote storage reads) before this + // handler was added to the node map, so no blocking fetch happens on the sender thread here. 
+ Map originalRecordData = resolvedRecordData; List simpleRecords = new ArrayList<>(); for (long i = param.firstOffset(); i <= param.lastOffset(); i++) { @@ -448,7 +433,7 @@ public boolean dlqTopicExists() { } catch (ConfigException e) { return false; } - addRequestToNodeMap(dlqPartitionLeaderNode, this); + resolveRecordsAndAddToNodeMap(dlqPartitionLeaderNode); } return isDlqTopicPresent; } @@ -542,11 +527,7 @@ private void handleCreateTopicsResponse(ClientResponse response) { try { populateDLQTopicData(); createTopicsBackoff.resetAttempts(); - if (this.isBatchable()) { - addRequestToNodeMap(this.dlqPartitionLeaderNode, this); - } else { - enqueue(this); - } + resolveRecordsAndAddToNodeMap(this.dlqPartitionLeaderNode); } catch (ConfigException e) { LOG.error("Error enqueueing after DLQ create topic response {}.", this, e); if (!createTopicsBackoff.canAttempt()) { @@ -687,78 +668,35 @@ private void handleProduceResponse(ClientResponse response) { } } - private Map maybeFetchRecordData() { - if (!cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) { - return Map.of(); - } - long startTime = time.hiResClockMs(); - TopicIdPartition tp = param.topicIdPartition(); - - FetchParams fetchParams = new FetchParams( - FetchRequest.CONSUMER_REPLICA_ID, // -1, reading as a consumer - -1, // replicaEpoch - 0L, // maxWaitMs - don't block - 1, // minBytes - DLQ_MAX_FETCH_BYTES, // maxBytes - FetchIsolation.HIGH_WATERMARK, // committed only - Optional.empty() // clientMetadata - ); - - long nextOffset = param.firstOffset(); - long endOffset = param.lastOffset(); - int recordCount = (int) (param.lastOffset() - param.firstOffset() + 1); - - Map recordMap = new HashMap<>(recordCount); - LinkedHashMap offsets = new LinkedHashMap<>(); - LinkedHashMap maxBytesMap = new LinkedHashMap<>(); - maxBytesMap.put(tp, DLQ_MAX_FETCH_BYTES); - - // We are fetching data for one TopicIdPartition only. 
Hence, there - // is no need to keep recreating the maxBytes map, and we can re-use a - // single copy. In similar vein, we needn't clear the offsets map - // either and just update the value corresponding to the TopicIdPartition - // key in offsets map within the while loop. - while (nextOffset <= endOffset) { - long readFrom = nextOffset; // offset requested for this iteration - offsets.put(tp, readFrom); - - LinkedHashMap result = - logReader.read(fetchParams, Set.of(tp), offsets, maxBytesMap); - - LogReadResult res = result.get(param.topicIdPartition()); - if (res == null || res.error().code() != Errors.NONE.code()) { - log.warn("Unable to fetch actual record at offset {} for handler {}.", readFrom, this); - return Map.of(); - } - - res.info().delayedRemoteStorageFetch.ifPresent(data -> log.info( - "Some offset data in is in remote storage. Skipping it.")); - - for (RecordBatch batch : res.info().records.batches()) { - for (Record record : batch) { - // A fetch can return a batch whose base offset is below the requested - // offset, so skip any record at or before the read position to avoid - // re-processing and dragging nextOffset backwards. - if (record.offset() < readFrom) continue; - if (record.offset() > param.lastOffset()) { - log.trace("Preempted log fetch took {} ms for {} records starting at {} for {}", time.hiResClockMs() - startTime, - recordCount, param.firstOffset(), this); - return Map.copyOf(recordMap); - } - recordMap.put(record.offset(), record); - nextOffset = Math.max(nextOffset, record.offset() + 1); // never moves backwards - } + /** + * Resolves the original source records for this handler (reading from the local log and, for + * any offsets tiered to remote storage, asynchronously from the remote tier) and only then + * adds the handler to the node map for produce coalescing. Resolving up front keeps the + * (potentially blocking) remote storage IO off the sender thread, which would otherwise stall + * all DLQ produce/create-topic RPCs. 
+ * + *

A failed fetch is non-fatal: the handler is still enqueued and the DLQ record is produced + * with headers only (no key/value), mirroring how individually unavailable offsets are skipped. + * + * @param node The destination node for the produce request once records are resolved. + */ + private void resolveRecordsAndAddToNodeMap(Node node) { + maybeFetchRecordData().whenComplete((records, exception) -> { + if (exception != null || records == null) { + LOG.warn("Unable to fetch original record data for handler {}. DLQ records will be produced with headers only.", this, exception); + this.resolvedRecordData = Map.of(); + } else { + this.resolvedRecordData = records; } + addRequestToNodeMap(node, this); + }); + } - // If the read position did not advance this iteration we have made no progress - // (reached HWM/LEO or only stale records were returned). Bail out to guarantee - // termination rather than re-fetching the same offset forever. - if (nextOffset <= readFrom) break; + private CompletableFuture> maybeFetchRecordData() { + if (!cacheHelper.isShareGroupDlqCopyRecordEnabled(param.groupId())) { + return CompletableFuture.completedFuture(Map.of()); } - log.trace("Full log fetch took {} ms for {} records starting at {} for {}", time.hiResClockMs() - startTime, - recordCount, param.firstOffset(), this); - log.info("Total offsets fetched: {}, Records found: {}", recordCount, recordMap.size()); - return Map.copyOf(recordMap); + return new ShareGroupDLQRecordFetcher(logReader, time, param, DLQ_MAX_FETCH_BYTES).fetch(); } } @@ -803,16 +741,9 @@ public Collection generateRequests() { log.error("Unable to create topic request for handler {}.", handler, exp); handler.requestErrorResponse(Errors.INVALID_CONFIG.exception()); } - } else { - if (!handler.isBatchable()) { - requests.add(new RequestAndCompletionHandler( - time.milliseconds(), - handler.dlqPartitionLeaderNode(), - handler.requestBuilder(), - handler - )); - } } + // When the DLQ topic already exists, the handler is 
added to the node map for produce + // coalescing (asynchronously, once its records are resolved), so nothing more to do here. } // { diff --git a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordFetcherTest.java b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordFetcherTest.java new file mode 100644 index 0000000000000..ce646f362c4fe --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordFetcherTest.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.server.share.dlq; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.internal.MemoryRecords; +import org.apache.kafka.common.record.internal.Record; +import org.apache.kafka.common.record.internal.Records; +import org.apache.kafka.common.record.internal.SimpleRecord; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.share.LogReader; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class ShareGroupDLQRecordFetcherTest { + + private static final MockTime MOCK_TIME = new MockTime(); + private static final 
String GROUP_ID = "test-group"; + private static final TopicIdPartition TOPIC_ID_PARTITION = + new TopicIdPartition(Uuid.randomUuid(), 0, "source-topic"); + private static final int MAX_FETCH_BYTES = 1024 * 1024; + + private final LogReader logReader = mock(LogReader.class); + + private static ShareGroupDLQRecordParameter param(long firstOffset, long lastOffset) { + return new ShareGroupDLQRecordParameter( + GROUP_ID, TOPIC_ID_PARTITION, firstOffset, lastOffset, Optional.empty(), Optional.empty()); + } + + private ShareGroupDLQRecordFetcher fetcher(ShareGroupDLQRecordParameter param) { + return new ShareGroupDLQRecordFetcher(logReader, MOCK_TIME, param, MAX_FETCH_BYTES); + } + + private Map fetch(ShareGroupDLQRecordParameter param) throws Exception { + return fetcher(param).fetch().get(10, TimeUnit.SECONDS); + } + + private static RemoteStorageFetchInfo remoteFetchInfo() { + return new RemoteStorageFetchInfo( + MAX_FETCH_BYTES, + true, + TOPIC_ID_PARTITION, + new FetchRequest.PartitionData(TOPIC_ID_PARTITION.topicId(), 0L, 0L, MAX_FETCH_BYTES, Optional.empty()), + FetchIsolation.HIGH_WATERMARK); + } + + private static LogReadResult localResult(SimpleRecord... records) { + LogReadResult result = mock(LogReadResult.class); + when(result.error()).thenReturn(Errors.NONE); + when(result.info()).thenReturn(new FetchDataInfo(null, MemoryRecords.withRecords(Compression.NONE, records))); + return result; + } + + private static LogReadResult errorResult(Errors error) { + LogReadResult result = mock(LogReadResult.class); + when(result.error()).thenReturn(error); + return result; + } + + // A local read result indicating the requested offset has been tiered off to remote storage. 
+ private static LogReadResult remoteResult() { + LogReadResult result = mock(LogReadResult.class); + when(result.error()).thenReturn(Errors.NONE); + when(result.info()).thenReturn(new FetchDataInfo( + null, MemoryRecords.EMPTY, false, Optional.empty(), Optional.of(remoteFetchInfo()))); + return result; + } + + private static FetchDataInfo records(SimpleRecord... records) { + return new FetchDataInfo(null, MemoryRecords.withRecords(Compression.NONE, records)); + } + + @SuppressWarnings("unchecked") + private void whenLocalReads(LogReadResult first, LogReadResult... rest) { + LinkedHashMap[] maps = new LinkedHashMap[rest.length + 1]; + maps[0] = mapOf(first); + for (int i = 0; i < rest.length; i++) { + maps[i + 1] = mapOf(rest[i]); + } + var stub = when(logReader.read(any(), anySet(), any(), any())).thenReturn(maps[0]); + for (int i = 1; i < maps.length; i++) { + stub = stub.thenReturn(maps[i]); + } + } + + private static LinkedHashMap mapOf(LogReadResult result) { + LinkedHashMap map = new LinkedHashMap<>(); + map.put(TOPIC_ID_PARTITION, result); + return map; + } + + private static SimpleRecord record(String key, String value) { + return new SimpleRecord(MOCK_TIME.milliseconds(), + key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)); + } + + private static void assertRecord(Map result, long offset, String key, String value) { + Record record = result.get(offset); + assertTrue(record != null, "Expected a record at offset " + offset); + assertArrayEquals(key.getBytes(StandardCharsets.UTF_8), toArray(record.key())); + assertArrayEquals(value.getBytes(StandardCharsets.UTF_8), toArray(record.value())); + } + + private static byte[] toArray(ByteBuffer buffer) { + return Utils.toArray(buffer); + } + + @Test + public void testFetchAllLocalRecordsInSingleRead() throws Exception { + whenLocalReads(localResult(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"))); + + Map result = fetch(param(0L, 2L)); + + assertEquals(3, result.size()); + 
assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + verify(logReader, never()).readRemote(any()); + } + + @Test + public void testFetchLocalRecordsAcrossMultipleReads() throws Exception { + // First read returns only the first two offsets; the next read returns the batch containing the + // remaining offset (records at or before the already-read position are skipped by the fetcher). + whenLocalReads( + localResult(record("k0", "v0"), record("k1", "v1")), + localResult(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"))); + + Map result = fetch(param(0L, 2L)); + + assertEquals(3, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + verify(logReader, never()).readRemote(any()); + } + + @Test + public void testLocalReadErrorYieldsNoRecords() throws Exception { + whenLocalReads(errorResult(Errors.UNKNOWN_SERVER_ERROR)); + + Map result = fetch(param(0L, 2L)); + + assertTrue(result.isEmpty()); + verify(logReader, never()).readRemote(any()); + } + + @Test + public void testMissingPartitionInReadResultYieldsNoRecords() throws Exception { + when(logReader.read(any(), anySet(), any(), any())).thenReturn(new LinkedHashMap<>()); + + Map result = fetch(param(0L, 2L)); + + assertTrue(result.isEmpty()); + } + + @Test + public void testFetchAllRecordsFromRemoteStorage() throws Exception { + whenLocalReads(remoteResult()); + when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture( + records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2")))); + + Map result = fetch(param(0L, 2L)); + + assertEquals(3, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + verify(logReader).readRemote(any()); + } + + @Test + public void testRemoteFetchFailureSkipsRecords() throws Exception { + whenLocalReads(remoteResult()); + 
when(logReader.readRemote(any())).thenReturn(CompletableFuture.failedFuture( + new IllegalStateException("remote log manager not configured"))); + + Map result = fetch(param(0L, 2L)); + + assertTrue(result.isEmpty()); + verify(logReader).readRemote(any()); + } + + @Test + public void testFetchMixedLocalAndRemoteRecords() throws Exception { + // Offset 0 is read locally; the next read indicates the rest has been tiered to remote storage. + whenLocalReads(localResult(record("k0", "v0")), remoteResult()); + when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture( + records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2")))); + + Map result = fetch(param(0L, 2L)); + + assertEquals(3, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + verify(logReader).readRemote(any()); + } + + @Test + public void testAsyncRemoteCompletionResumesLoop() throws Exception { + whenLocalReads(remoteResult()); + // Return a future that is not yet complete to exercise the asynchronous resume path (the fetch + // returns before the remote read completes, and the loop is resumed from the callback). 
+ CompletableFuture remoteFuture = new CompletableFuture<>(); + when(logReader.readRemote(any())).thenReturn(remoteFuture); + + CompletableFuture> resultFuture = + new ShareGroupDLQRecordFetcher(logReader, MOCK_TIME, param(0L, 2L), MAX_FETCH_BYTES).fetch(); + assertFalse(resultFuture.isDone(), "Fetch should be waiting on the pending remote read"); + + remoteFuture.complete(records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"))); + + Map result = resultFuture.get(10, TimeUnit.SECONDS); + assertEquals(3, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + } + + @Test + public void testSingleOffsetFetch() throws Exception { + whenLocalReads(localResult(record("k0", "v0"))); + + Map result = fetch(param(0L, 0L)); + + assertEquals(1, result.size()); + assertRecord(result, 0L, "k0", "v0"); + } + + @Test + public void testFetchInterleavingTwoLocalAndTwoRemoteReads() throws Exception { + // Offsets 0..3 served by alternating reads: offset 0 local, offset 1 remote, offset 2 local, + // offset 3 remote. This drives the loop through two local reads and two remote reads. Each read + // returns a batch starting at base offset 0, so records at or before the requested offset are + // skipped and only the offset being requested in that iteration is collected. 
+ whenLocalReads( + localResult(record("k0", "v0")), // offset 0 (local) + remoteResult(), // offset 1 (tiered) + localResult(record("k0", "v0"), record("k1", "v1"), record("k2", "v2")), // offset 2 (local) + remoteResult()); // offset 3 (tiered) + + when(logReader.readRemote(any())) + .thenReturn(CompletableFuture.completedFuture( + records(record("k0", "v0"), record("k1", "v1")))) // serves offset 1 + .thenReturn(CompletableFuture.completedFuture( + records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"), record("k3", "v3")))); // serves offset 3 + + Map result = fetch(param(0L, 3L)); + + assertEquals(4, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + assertRecord(result, 3L, "k3", "v3"); + verify(logReader, times(4)).read(any(), anySet(), any(), any()); + verify(logReader, times(2)).readRemote(any()); + } + + // ---- fetchLocal ---- + + @Test + public void testFetchLocalReturnsNextOffsetOnLocalRecords() { + whenLocalReads(localResult(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"))); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchLocal(0L); + + // All three requested offsets were read, so the loop should continue past the range. 
+ assertEquals(OptionalLong.of(3L), advanced); + assertFalse(fetcher.result().isDone(), "Result should remain pending while there is progress"); + verify(logReader, never()).readRemote(any()); + } + + @Test + public void testFetchLocalAbortsWhenPartitionMissing() throws Exception { + when(logReader.read(any(), anySet(), any(), any())).thenReturn(new LinkedHashMap<>()); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchLocal(0L); + + assertEquals(OptionalLong.empty(), advanced); + assertTrue(fetcher.result().isDone()); + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } + + @Test + public void testFetchLocalAbortsOnReadError() throws Exception { + whenLocalReads(errorResult(Errors.UNKNOWN_SERVER_ERROR)); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchLocal(0L); + + assertEquals(OptionalLong.empty(), advanced); + assertTrue(fetcher.result().isDone()); + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } + + @Test + public void testFetchLocalCompletesWhenNoProgress() throws Exception { + // A read that returns no records does not advance the read position. + whenLocalReads(localResult()); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchLocal(0L); + + assertEquals(OptionalLong.empty(), advanced); + assertTrue(fetcher.result().isDone()); + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } + + @Test + public void testFetchLocalDelegatesToRemoteWhenTiered() { + whenLocalReads(remoteResult()); + when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture( + records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2")))); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchLocal(0L); + + // The tiered read returned all offsets, so fetchLocal returns the remote read's result. 
+ assertEquals(OptionalLong.of(3L), advanced); + assertFalse(fetcher.result().isDone()); + verify(logReader).readRemote(any()); + } + + // ---- fetchRemote ---- + + @Test + public void testFetchRemoteReturnsNextOffsetWhenComplete() { + when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture( + records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2")))); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchRemote(remoteFetchInfo(), 0L); + + assertEquals(OptionalLong.of(3L), advanced); + assertFalse(fetcher.result().isDone()); + } + + @Test + public void testFetchRemoteSkipsAndCompletesOnFailure() throws Exception { + when(logReader.readRemote(any())).thenReturn(CompletableFuture.failedFuture( + new IllegalStateException("remote log manager not configured"))); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchRemote(remoteFetchInfo(), 0L); + + // The failed remote read read nothing, so no progress -> the loop stops and the result completes. + assertEquals(OptionalLong.empty(), advanced); + assertTrue(fetcher.result().isDone()); + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } + + @Test + public void testFetchRemotePendingReturnsEmptyAndResumesOnCompletion() throws Exception { + CompletableFuture remoteFuture = new CompletableFuture<>(); + when(logReader.readRemote(any())).thenReturn(remoteFuture); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchRemote(remoteFetchInfo(), 0L); + + // Pending remote read: fetchRemote returns empty (loop suspends) and the result stays pending. + assertEquals(OptionalLong.empty(), advanced); + assertFalse(fetcher.result().isDone()); + + // Completing the remote read resumes the loop from the callback and completes the fetch. 
+ remoteFuture.complete(records(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"))); + + Map result = fetcher.result().get(10, TimeUnit.SECONDS); + assertEquals(3, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertRecord(result, 2L, "k2", "v2"); + } + + // ---- additional branch coverage ---- + + @Test + public void testFetchCompletesEmptyWhenLocalReadThrows() throws Exception { + // An unexpected error from the log reader must not escape; the copy is skipped entirely. + when(logReader.read(any(), anySet(), any(), any())).thenThrow(new RuntimeException("boom")); + + Map result = fetch(param(0L, 2L)); + + assertTrue(result.isEmpty()); + } + + @Test + public void testCollectStopsAtEndOffsetWhenReadReturnsExtraRecords() throws Exception { + // The read returns more records than the requested range [0, 1]; offsets beyond endOffset are ignored. + whenLocalReads(localResult(record("k0", "v0"), record("k1", "v1"), record("k2", "v2"))); + + Map result = fetch(param(0L, 1L)); + + assertEquals(2, result.size()); + assertRecord(result, 0L, "k0", "v0"); + assertRecord(result, 1L, "k1", "v1"); + assertNull(result.get(2L)); + } + + @Test + public void testFetchRemoteSkipsWhenReadReturnsNullData() throws Exception { + // A remote read that completes with null data leaves the offsets unread (skipped). 
+ when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture(null)); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + OptionalLong advanced = fetcher.fetchRemote(remoteFetchInfo(), 0L); + + assertEquals(OptionalLong.empty(), advanced); + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } + + @Test + public void testFetchRemotePendingCompletesWhenNoProgress() throws Exception { + CompletableFuture remoteFuture = new CompletableFuture<>(); + when(logReader.readRemote(any())).thenReturn(remoteFuture); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + assertEquals(OptionalLong.empty(), fetcher.fetchRemote(remoteFetchInfo(), 0L)); + assertFalse(fetcher.result().isDone()); + + // Completing with no records makes no progress, so the resumed loop stops and completes empty. + remoteFuture.complete(records()); + + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } + + @Test + public void testFetchRemotePendingCompletesEmptyWhenProcessingThrows() throws Exception { + CompletableFuture remoteFuture = new CompletableFuture<>(); + when(logReader.readRemote(any())).thenReturn(remoteFuture); + + ShareGroupDLQRecordFetcher fetcher = fetcher(param(0L, 2L)); + fetcher.fetchRemote(remoteFetchInfo(), 0L); + + // An unexpected error while processing the resumed remote records must not escape the callback. 
+ Records throwing = mock(Records.class); + when(throwing.batches()).thenThrow(new RuntimeException("boom")); + remoteFuture.complete(new FetchDataInfo(null, throwing)); + + assertTrue(fetcher.result().get(10, TimeUnit.SECONDS).isEmpty()); + } +} diff --git a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java index 2da51e250d3e4..b3d1ee9c63ca0 100644 --- a/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManagerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.record.internal.SimpleRecord; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; +import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.Time; @@ -41,6 +42,7 @@ import org.apache.kafka.server.share.LogReader; import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper.TopicPartitionData; import org.apache.kafka.server.share.metrics.ShareGroupMetrics; +import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.timer.MockTimer; import org.apache.kafka.server.util.timer.SystemTimer; @@ -48,6 +50,7 @@ import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -1706,6 +1709,214 @@ public void testDLQRecordCopyEnabledWithErrorOnLogRead() throws Exception { verify(mockMetrics, 
never()).recordDLQProduceFailed(any()); } + @Test + public void testDLQRecordCopyEnabledWithRemoteFetch() throws Exception { + MockClient client = new MockClient(MOCK_TIME); + List capturedProduces = new ArrayList<>(); + client.prepareResponseFrom( + captureProduce(capturedProduces), + successfulProduceResponse(0), + DEFAULT_LEADER + ); + + ShareGroupDLQRecordParameter param = param(); // 3 offsets requested + byte[] keyData1 = "key1".getBytes(StandardCharsets.UTF_8); + byte[] valueData1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] keyData2 = "key2".getBytes(StandardCharsets.UTF_8); + byte[] valueData2 = "value2".getBytes(StandardCharsets.UTF_8); + byte[] keyData3 = "key3".getBytes(StandardCharsets.UTF_8); + byte[] valueData3 = "value3".getBytes(StandardCharsets.UTF_8); + + LogReader logReader = mock(LogReader.class); + // The local read indicates that the data has been tiered off to remote storage. + LogReadResult readResult = mock(LogReadResult.class); + when(readResult.error()).thenReturn(Errors.NONE); + when(readResult.info()).thenReturn(remoteFetchDataInfo(param.topicIdPartition())); + LinkedHashMap readResultMap = new LinkedHashMap<>(); + readResultMap.put(param.topicIdPartition(), readResult); + when(logReader.read(any(), anySet(), any(), any())).thenReturn(readResultMap); + + // The remote read returns all the requested records. 
+ when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture(new FetchDataInfo( + null, + MemoryRecords.withRecords( + Compression.NONE, + new SimpleRecord(MOCK_TIME.milliseconds(), keyData1, valueData1), + new SimpleRecord(MOCK_TIME.milliseconds(), keyData2, valueData2), + new SimpleRecord(MOCK_TIME.milliseconds(), keyData3, valueData3) + ) + ))); + + ShareGroupDLQMetadataCacheHelper cacheHelper = cacheHelper(DEFAULT_LEADER); + when(cacheHelper.isShareGroupDlqCopyRecordEnabled(any())).thenReturn(true); + stateManager = builder().withClient(client).withLogReader(logReader).withCacheHelper(cacheHelper).build(); + stateManager.start(); + assertNull(stateManager.dlq(param).get(10, TimeUnit.SECONDS)); + + assertEquals(1, capturedProduces.size()); + assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of( + 0, new ExpectedDlqPartition(0L, 2L, Map.of( + HEADER_DLQ_ERRORS_TOPIC, "source-topic", + HEADER_DLQ_ERRORS_PARTITION, "0", + HEADER_DLQ_ERRORS_GROUP, GROUP_ID, + HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1", + HEADER_DLQ_ERRORS_MESSAGE, "simulated cause" + ), List.of(keyData1, keyData2, keyData3), List.of(valueData1, valueData2, valueData3)) + )); + verify(logReader).readRemote(any()); + verify(mockMetrics).recordDLQProduce(GROUP_ID); + verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3); + verify(mockMetrics, never()).recordDLQProduceFailed(any()); + } + + @Test + public void testDLQRecordCopyEnabledWithMixedLocalAndRemoteFetch() throws Exception { + MockClient client = new MockClient(MOCK_TIME); + List capturedProduces = new ArrayList<>(); + client.prepareResponseFrom( + captureProduce(capturedProduces), + successfulProduceResponse(0), + DEFAULT_LEADER + ); + + ShareGroupDLQRecordParameter param = param(); // 3 offsets requested (0, 1, 2) + byte[] keyData1 = "key1".getBytes(StandardCharsets.UTF_8); + byte[] valueData1 = "value1".getBytes(StandardCharsets.UTF_8); + byte[] keyData2 = "key2".getBytes(StandardCharsets.UTF_8); + byte[] valueData2 = 
"value2".getBytes(StandardCharsets.UTF_8); + byte[] keyData3 = "key3".getBytes(StandardCharsets.UTF_8); + byte[] valueData3 = "value3".getBytes(StandardCharsets.UTF_8); + + LogReader logReader = mock(LogReader.class); + // First local read returns offset 0 locally; the next read (for offset 1) indicates the + // remaining data has been tiered off to remote storage. + LogReadResult localResult = mock(LogReadResult.class); + when(localResult.error()).thenReturn(Errors.NONE); + when(localResult.info()).thenReturn(new FetchDataInfo( + null, + MemoryRecords.withRecords( + Compression.NONE, + new SimpleRecord(MOCK_TIME.milliseconds(), keyData1, valueData1) + ) + )); + LogReadResult remoteResult = mock(LogReadResult.class); + when(remoteResult.error()).thenReturn(Errors.NONE); + when(remoteResult.info()).thenReturn(remoteFetchDataInfo(param.topicIdPartition())); + + LinkedHashMap localMap = new LinkedHashMap<>(); + localMap.put(param.topicIdPartition(), localResult); + LinkedHashMap remoteMap = new LinkedHashMap<>(); + remoteMap.put(param.topicIdPartition(), remoteResult); + when(logReader.read(any(), anySet(), any(), any())) + .thenReturn(localMap) + .thenReturn(remoteMap); + + // The remote read returns the batch covering the requested offset; it starts at base offset 0 + // so records at or before the already-read position (offset 0) are skipped by the caller, and + // only the tiered offsets (1 and 2) are collected. 
+ when(logReader.readRemote(any())).thenReturn(CompletableFuture.completedFuture(new FetchDataInfo( + null, + MemoryRecords.withRecords( + Compression.NONE, + new SimpleRecord(MOCK_TIME.milliseconds(), keyData1, valueData1), + new SimpleRecord(MOCK_TIME.milliseconds(), keyData2, valueData2), + new SimpleRecord(MOCK_TIME.milliseconds(), keyData3, valueData3) + ) + ))); + + ShareGroupDLQMetadataCacheHelper cacheHelper = cacheHelper(DEFAULT_LEADER); + when(cacheHelper.isShareGroupDlqCopyRecordEnabled(any())).thenReturn(true); + stateManager = builder().withClient(client).withLogReader(logReader).withCacheHelper(cacheHelper).build(); + stateManager.start(); + assertNull(stateManager.dlq(param).get(10, TimeUnit.SECONDS)); + + assertEquals(1, capturedProduces.size()); + assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of( + 0, new ExpectedDlqPartition(0L, 2L, Map.of( + HEADER_DLQ_ERRORS_TOPIC, "source-topic", + HEADER_DLQ_ERRORS_PARTITION, "0", + HEADER_DLQ_ERRORS_GROUP, GROUP_ID, + HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1", + HEADER_DLQ_ERRORS_MESSAGE, "simulated cause" + ), List.of(keyData1, keyData2, keyData3), List.of(valueData1, valueData2, valueData3)) + )); + verify(logReader).readRemote(any()); + verify(mockMetrics).recordDLQProduce(GROUP_ID); + verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3); + verify(mockMetrics, never()).recordDLQProduceFailed(any()); + } + + @Test + public void testDLQRecordCopyEnabledWithRemoteFetchFailureSkipsRecords() throws Exception { + MockClient client = new MockClient(MOCK_TIME); + List capturedProduces = new ArrayList<>(); + client.prepareResponseFrom( + captureProduce(capturedProduces), + successfulProduceResponse(0), + DEFAULT_LEADER + ); + + ShareGroupDLQRecordParameter param = param(); + LogReader logReader = mock(LogReader.class); + // The local read indicates the data is in remote storage. 
+ LogReadResult readResult = mock(LogReadResult.class); + when(readResult.error()).thenReturn(Errors.NONE); + when(readResult.info()).thenReturn(remoteFetchDataInfo(param.topicIdPartition())); + LinkedHashMap readResultMap = new LinkedHashMap<>(); + readResultMap.put(param.topicIdPartition(), readResult); + when(logReader.read(any(), anySet(), any(), any())).thenReturn(readResultMap); + + // The remote read fails (e.g. remote storage not configured or segment unavailable). The + // offsets are gracefully skipped and the DLQ record is produced with headers only. + when(logReader.readRemote(any())).thenReturn(CompletableFuture.failedFuture( + new IllegalStateException("remote log manager not configured"))); + + ShareGroupDLQMetadataCacheHelper cacheHelper = cacheHelper(DEFAULT_LEADER); + when(cacheHelper.isShareGroupDlqCopyRecordEnabled(any())).thenReturn(true); + stateManager = builder().withClient(client).withLogReader(logReader).withCacheHelper(cacheHelper).build(); + stateManager.start(); + assertNull(stateManager.dlq(param).get(10, TimeUnit.SECONDS)); + + assertEquals(1, capturedProduces.size()); + assertDlqProduceRecordHeaders(capturedProduces.get(0), Map.of( + 0, new ExpectedDlqPartition(0L, 2L, Map.of( + HEADER_DLQ_ERRORS_TOPIC, "source-topic", + HEADER_DLQ_ERRORS_PARTITION, "0", + HEADER_DLQ_ERRORS_GROUP, GROUP_ID, + HEADER_DLQ_ERRORS_DELIVERY_COUNT, "1", + HEADER_DLQ_ERRORS_MESSAGE, "simulated cause" + ), List.of(), List.of()) + )); + verify(logReader).readRemote(any()); + verify(mockMetrics).recordDLQProduce(GROUP_ID); + verify(mockMetrics).recordDLQRecordWrite(GROUP_ID, 3); + verify(mockMetrics, never()).recordDLQProduceFailed(any()); + } + + // Builds a FetchDataInfo that mimics a local read whose requested offset has been tiered off the + // local log: empty local records plus a remote fetch descriptor that triggers a remote read. 
+ private static FetchDataInfo remoteFetchDataInfo(TopicIdPartition topicIdPartition) { + int maxBytes = 1024 * 1024; + RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo( + maxBytes, + true, + topicIdPartition, + new FetchRequest.PartitionData(topicIdPartition.topicId(), 0L, 0L, maxBytes, Optional.empty()), + FetchIsolation.HIGH_WATERMARK + ); + return new FetchDataInfo(null, MemoryRecords.EMPTY, false, Optional.empty(), Optional.of(remoteStorageFetchInfo)); + } + + private static MockClient.RequestMatcher captureProduce(List capturedProduces) { + return body -> { + if (body instanceof ProduceRequest pr) { + capturedProduces.add(pr); + return true; + } + return false; + }; + } + private static ShareGroupDLQStateManager.ProduceRequestHandler newHandlerForCoalesceTest( ShareGroupDLQStateManager manager, String groupId,