Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,26 @@

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.share.LogReader;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import scala.jdk.javaapi.OptionConverters;
import scala.runtime.BoxedUnit;

/**
Expand Down Expand Up @@ -92,4 +97,37 @@ public LinkedHashMap<TopicIdPartition, LogReadResult> read(
log.trace("Data successfully retrieved by replica manager: {}", responseData);
return responseData;
}

@Override
public CompletableFuture<FetchDataInfo> readRemote(RemoteStorageFetchInfo remoteStorageFetchInfo) {
CompletableFuture<FetchDataInfo> future = new CompletableFuture<>();

Optional<RemoteLogManager> remoteLogManager = OptionConverters.toJava(replicaManager.remoteLogManager());
if (remoteLogManager.isEmpty()) {
future.completeExceptionally(new IllegalStateException(
"Cannot read " + remoteStorageFetchInfo + " from remote storage as remote log manager is not configured."));
return future;
}

try {
// The read runs on the remote storage reader thread pool; the callback completes the
// future on that pool's thread, so the caller's thread is never blocked on remote IO.
remoteLogManager.get().asyncRead(remoteStorageFetchInfo, result -> {
if (result.error().isPresent()) {
future.completeExceptionally(result.error().get());
} else if (result.fetchDataInfo().isPresent()) {
future.complete(result.fetchDataInfo().get());
} else {
future.completeExceptionally(new IllegalStateException(
"Remote read for " + remoteStorageFetchInfo + " returned neither data nor error."));
}
});
} catch (Exception e) {
// e.g. RejectedExecutionException if the reader pool is shutting down.
log.warn("Unable to schedule remote read for {}.", remoteStorageFetchInfo, e);
future.completeExceptionally(e);
}

return future;
}
}
208 changes: 208 additions & 0 deletions core/src/test/java/kafka/server/share/ReplicaManagerLogReaderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.share;

import kafka.server.ReplicaManager;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

import org.junit.jupiter.api.Test;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import scala.Option;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.jdk.javaapi.CollectionConverters;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class ReplicaManagerLogReaderTest {

private static final TopicIdPartition TOPIC_ID_PARTITION =
new TopicIdPartition(Uuid.randomUuid(), 0, "topic");

private static RemoteStorageFetchInfo remoteStorageFetchInfo() {
return new RemoteStorageFetchInfo(
1024,
true,
TOPIC_ID_PARTITION,
new FetchRequest.PartitionData(TOPIC_ID_PARTITION.topicId(), 0L, 0L, 1024, Optional.empty()),
FetchIsolation.HIGH_WATERMARK);
}

private static FetchParams fetchParams() {
return new FetchParams(
FetchRequest.CONSUMER_REPLICA_ID, -1, 0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
}

@Test
public void testReadReturnsEmptyWhenNoPartitionsToFetch() {
ReplicaManager replicaManager = mock(ReplicaManager.class);

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
LinkedHashMap<TopicIdPartition, LogReadResult> result =
logReader.read(fetchParams(), Set.of(), new LinkedHashMap<>(), new LinkedHashMap<>());

assertTrue(result.isEmpty());
verify(replicaManager, never()).readFromLog(any(), any(), any(), anyBoolean());
}

@Test
public void testReadReturnsResultsFromReplicaManager() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
LogReadResult logReadResult = mock(LogReadResult.class);
Seq<Tuple2<TopicIdPartition, LogReadResult>> readFromLogResult =
CollectionConverters.asScala(List.of(new Tuple2<>(TOPIC_ID_PARTITION, logReadResult))).toSeq();
when(replicaManager.readFromLog(any(), any(), any(), anyBoolean())).thenReturn(readFromLogResult);

LinkedHashMap<TopicIdPartition, Long> offsets = new LinkedHashMap<>();
offsets.put(TOPIC_ID_PARTITION, 5L);
LinkedHashMap<TopicIdPartition, Integer> maxBytes = new LinkedHashMap<>();
maxBytes.put(TOPIC_ID_PARTITION, 1024);

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
LinkedHashMap<TopicIdPartition, LogReadResult> result =
logReader.read(fetchParams(), Set.of(TOPIC_ID_PARTITION), offsets, maxBytes);

assertEquals(1, result.size());
assertSame(logReadResult, result.get(TOPIC_ID_PARTITION));
}

@Test
public void testReadRemoteCompletesExceptionallyWhenRemoteLogManagerNotConfigured() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.empty());

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
CompletableFuture<FetchDataInfo> future = logReader.readRemote(remoteStorageFetchInfo());

assertTrue(future.isCompletedExceptionally());
ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS));
assertInstanceOf(IllegalStateException.class, exception.getCause());
}

@Test
public void testReadRemoteCompletesWithFetchedData() throws Exception {
ReplicaManager replicaManager = mock(ReplicaManager.class);
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY);
RemoteStorageFetchInfo fetchInfo = remoteStorageFetchInfo();
doAnswer(invocation -> {
Consumer<RemoteLogReadResult> callback = invocation.getArgument(1);
callback.accept(new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty()));
return null;
}).when(remoteLogManager).asyncRead(eq(fetchInfo), any());

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
CompletableFuture<FetchDataInfo> future = logReader.readRemote(fetchInfo);

assertSame(fetchDataInfo, future.get(10, TimeUnit.SECONDS));
}

@Test
public void testReadRemoteCompletesExceptionallyWhenReadResultHasError() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

RuntimeException readError = new RuntimeException("remote read failed");
doAnswer(invocation -> {
Consumer<RemoteLogReadResult> callback = invocation.getArgument(1);
callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(readError)));
return null;
}).when(remoteLogManager).asyncRead(any(), any());

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
CompletableFuture<FetchDataInfo> future = logReader.readRemote(remoteStorageFetchInfo());

assertTrue(future.isCompletedExceptionally());
ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS));
assertSame(readError, exception.getCause());
}

@Test
public void testReadRemoteCompletesExceptionallyWhenReadResultIsEmpty() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

doAnswer(invocation -> {
Consumer<RemoteLogReadResult> callback = invocation.getArgument(1);
callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.empty()));
return null;
}).when(remoteLogManager).asyncRead(any(), any());

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
CompletableFuture<FetchDataInfo> future = logReader.readRemote(remoteStorageFetchInfo());

assertTrue(future.isCompletedExceptionally());
ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS));
assertInstanceOf(IllegalStateException.class, exception.getCause());
}

@Test
public void testReadRemoteCompletesExceptionallyWhenSchedulingRejected() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

RejectedExecutionException rejected = new RejectedExecutionException("reader pool shutting down");
doThrow(rejected).when(remoteLogManager).asyncRead(any(), any());

ReplicaManagerLogReader logReader = new ReplicaManagerLogReader(replicaManager);
CompletableFuture<FetchDataInfo> future = logReader.readRemote(remoteStorageFetchInfo());

assertTrue(future.isCompletedExceptionally());
ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS));
assertSame(rejected, exception.getCause());
}
}
20 changes: 20 additions & 0 deletions server/src/main/java/org/apache/kafka/server/share/LogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Abstraction for reading records from log.
Expand All @@ -42,4 +45,21 @@ LinkedHashMap<TopicIdPartition, LogReadResult> read(
Set<TopicIdPartition> partitionsToFetch,
LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes);

/**
* Read records asynchronously from the remote tier for an offset that has been tiered off the
* local log. The {@link RemoteStorageFetchInfo} is the descriptor surfaced by
* {@link LogReadResult#info()} as {@link FetchDataInfo#delayedRemoteStorageFetch} when a
* preceding {@link #read} determined that the requested data resides in remote storage.
*
* <p>The read is performed off-thread (on the remote storage reader pool) so that the caller's
* thread is not blocked on remote storage IO. It is intended for low volume, best-effort reads.
* The returned future completes exceptionally when remote storage is not configured on the broker
* or the read could not be completed, allowing callers to gracefully skip the data instead of failing.
*
* @param remoteStorageFetchInfo The remote fetch descriptor obtained from a prior local read.
* @return A future that completes with the fetched data, or completes exceptionally if it could
* not be read remotely.
*/
CompletableFuture<FetchDataInfo> readRemote(RemoteStorageFetchInfo remoteStorageFetchInfo);
}
Loading
Loading