diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 972aeb095815a..85e85212b114f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -483,7 +483,7 @@ private void initializeTopicId() { if (partMetadataFile.exists()) { Uuid fileTopicId = partMetadataFile.read().topicId(); - if (topicId.isPresent() && !topicId.get().equals(fileTopicId)) { + if (topicId.filter(x -> !x.equals(fileTopicId)).isPresent()) { throw new InconsistentTopicIdException("Tried to assign topic ID " + topicId + " to log for topic partition " + topicPartition() + "," + "but log already contained topic ID " + fileTopicId); } @@ -645,10 +645,12 @@ public Optional firstUnstableOffset() { private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException { localLog.checkIfMemoryMappedBufferClosed(); - // cache the current high watermark to avoid a concurrent update invalidating the range check + // cache the current high watermark and the first unstable offset metadata to avoid a concurrent update + // invalidating the range check breaking the isPresent check LogOffsetMetadata highWatermarkMetadata = fetchHighWatermarkMetadata(); - if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermarkMetadata.messageOffset) { - LogOffsetMetadata lom = firstUnstableOffsetMetadata.get(); + Optional firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata; + if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermarkMetadata.messageOffset) { + LogOffsetMetadata lom = firstUnstableOffsetMetadataCopy.get(); if (lom.messageOffsetOnly()) { synchronized (lock) { LogOffsetMetadata fullOffset = maybeConvertToOffsetMetadata(lom.messageOffset); @@ -671,8 +673,10 @@ private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException { * beyond the high watermark. */ public long lastStableOffset() { - if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermark()) { - return firstUnstableOffsetMetadata.get().messageOffset; + // cache the first unstable offset metadata to avoid a concurrent update breaking the isPresent check + Optional firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata; + if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermark()) { + return firstUnstableOffsetMetadataCopy.get().messageOffset; } else { return highWatermark(); } @@ -750,15 +754,15 @@ public void assignTopicId(Uuid topicId) { } } else { this.topicId = Optional.of(topicId); - if (partitionMetadataFile.isPresent()) { - PartitionMetadataFile file = partitionMetadataFile.get(); - if (!file.exists()) { - file.record(topicId); - scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile); - } - } else { - logger.warn("The topic id {} will not be persisted to the partition metadata file since the partition is deleted", topicId); - } + partitionMetadataFile.ifPresentOrElse( + file -> { + if (!file.exists()) { + file.record(topicId); + scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile); + } + }, + () -> logger.warn("The topic id {} will not be persisted to the partition metadata file since the partition is deleted", topicId) + ); } }