Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ private void initializeTopicId() {

if (partMetadataFile.exists()) {
Uuid fileTopicId = partMetadataFile.read().topicId();
if (topicId.isPresent() && !topicId.get().equals(fileTopicId)) {
if (topicId.filter(x -> !x.equals(fileTopicId)).isPresent()) {
throw new InconsistentTopicIdException("Tried to assign topic ID " + topicId + " to log for topic partition " + topicPartition() + "," +
"but log already contained topic ID " + fileTopicId);
}
Expand Down Expand Up @@ -645,10 +645,12 @@ public Optional<Long> firstUnstableOffset() {
private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException {
localLog.checkIfMemoryMappedBufferClosed();

// cache the current high watermark to avoid a concurrent update invalidating the range check
// cache the current high watermark and the first unstable offset metadata to avoid a concurrent update
// invalidating the range check breaking the isPresent check
LogOffsetMetadata highWatermarkMetadata = fetchHighWatermarkMetadata();
if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermarkMetadata.messageOffset) {
LogOffsetMetadata lom = firstUnstableOffsetMetadata.get();
Optional<LogOffsetMetadata> firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a comment or update the comment just above?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermarkMetadata.messageOffset) {
LogOffsetMetadata lom = firstUnstableOffsetMetadataCopy.get();
if (lom.messageOffsetOnly()) {
synchronized (lock) {
LogOffsetMetadata fullOffset = maybeConvertToOffsetMetadata(lom.messageOffset);
Expand All @@ -671,8 +673,10 @@ private LogOffsetMetadata fetchLastStableOffsetMetadata() throws IOException {
* beyond the high watermark.
*/
public long lastStableOffset() {
if (firstUnstableOffsetMetadata.isPresent() && firstUnstableOffsetMetadata.get().messageOffset < highWatermark()) {
return firstUnstableOffsetMetadata.get().messageOffset;
// cache the first unstable offset metadata to avoid a concurrent update breaking the isPresent check
Optional<LogOffsetMetadata> firstUnstableOffsetMetadataCopy = firstUnstableOffsetMetadata;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth adding a comment. There are a few places in this class where we say we cache a value to avoid concurrency issues.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (firstUnstableOffsetMetadataCopy.isPresent() && firstUnstableOffsetMetadataCopy.get().messageOffset < highWatermark()) {
return firstUnstableOffsetMetadataCopy.get().messageOffset;
} else {
return highWatermark();
}
Expand Down Expand Up @@ -750,15 +754,15 @@ public void assignTopicId(Uuid topicId) {
}
} else {
this.topicId = Optional.of(topicId);
if (partitionMetadataFile.isPresent()) {
PartitionMetadataFile file = partitionMetadataFile.get();
if (!file.exists()) {
file.record(topicId);
scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile);
}
} else {
logger.warn("The topic id {} will not be persisted to the partition metadata file since the partition is deleted", topicId);
}
partitionMetadataFile.ifPresentOrElse(
file -> {
if (!file.exists()) {
file.record(topicId);
scheduler().scheduleOnce("flush-metadata-file", this::maybeFlushMetadataFile);
}
},
() -> logger.warn("The topic id {} will not be persisted to the partition metadata file since the partition is deleted", topicId)
);
}
}

Expand Down