From 4da2925e6bf8de73b17c8d021e7d154ea6dea3fd Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 14 May 2025 11:55:31 +0200 Subject: [PATCH 1/3] fix --- .../org/apache/kafka/storage/internals/log/UnifiedLog.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 972aeb095815a..47b972b9dde33 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -647,8 +647,9 @@ private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException { // cache the current high watermark to avoid a concurrent update invalidating the range check LogOffsetMetadata highWatermarkMetadata = fetchHighWatermarkMetadata(); - if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermarkMetadata.messageOffset) { - LogOffsetMetadata lom = firstUnstableOffsetMetadata.get(); + Optional firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata; + if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermarkMetadata.messageOffset) { + LogOffsetMetadata lom = firstUnstableOffsetMetadataCopy.get(); if (lom.messageOffsetOnly()) { synchronized (lock) { LogOffsetMetadata fullOffset = maybeConvertToOffsetMetadata(lom.messageOffset); From c443bdd3b25d1fd313aab9ab96b903a824a61079 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Thu, 15 May 2025 12:47:37 +0200 Subject: [PATCH 2/3] other fixes --- .../storage/internals/log/UnifiedLog.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 47b972b9dde33..08801dd202b97 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -483,7 +483,7 @@ private void initializeTopicId() { if (partMetadataFile.exists()) { Uuid fileTopicId = partMetadataFile.read().topicId(); - if (topicId.isPresent() && !topicId.get().equals(fileTopicId)) { + if (topicId.filter(x -> !x.equals(fileTopicId)).isPresent()) { throw new InconsistentTopicIdException("Tried to assign topic ID " + topicId + " to log for topic partition " + topicPartition() + "," + "but log already contained topic ID " + fileTopicId); } @@ -672,8 +672,9 @@ private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException { * beyond the high watermark. */ public long lastStableOffset() { - if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermark()) { - return firstUnstableOffsetMetadata.get().messageOffset; + Optional firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata; + if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermark()) { + return firstUnstableOffsetMetadataCopy.get().messageOffset; } else { return highWatermark(); } @@ -751,15 +752,15 @@ public void assignTopicId(Uuid topicId) { } } else { this.topicId = Optional.of(topicId); - if (partitionMetadataFile.isPresent()) { - PartitionMetadataFile file = partitionMetadataFile.get(); - if (!file.exists()) { - file.record(topicId); - scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile); - } - } else { - logger.warn("The topic id {} will not be persisted to the partition metadata file since the partition is deleted", topicId); - } + partitionMetadataFile.ifPresentOrElse( + file -> { + if (!file.exists()) { + file.record(topicId); + scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile); + } + }, + () -> logger.warn("The topic id {} will not be persisted to the partition metadata file since the partition is deleted", topicId) + ); } } From 66d15fb326c70b863486ef32c33bc3f92247850c Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Fri, 16 May 2025 20:12:50 +0200 Subject: [PATCH 3/3] commnets --- .../org/apache/kafka/storage/internals/log/UnifiedLog.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 08801dd202b97..85e85212b114f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -645,7 +645,8 @@ public Optional firstUnstableOffset() { private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException { localLog.checkIfMemoryMappedBufferClosed(); - // cache the current high watermark to avoid a concurrent update invalidating the range check + // cache the current high watermark and the first unstable offset metadata to avoid a concurrent update + // invalidating the range check breaking the isPresent check LogOffsetMetadata highWatermarkMetadata = fetchHighWatermarkMetadata(); Optional firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata; if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermarkMetadata.messageOffset) { @@ -672,6 +673,7 @@ private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException { * beyond the high watermark. */ public long lastStableOffset() { + // cache the first unstable offset metadata to avoid a concurrent update breaking the isPresent check Optional firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata; if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermark()) { return firstUnstableOffsetMetadataCopy.get().messageOffset;