From 85dafd1d883e859b1e834e0b2c70116cba1bc261 Mon Sep 17 00:00:00 2001 From: "subarna.chatterjee" Date: Wed, 19 Feb 2025 15:34:13 -0500 Subject: [PATCH 1/3] CNDB-11565: Prevent replaying commit log with invalid mutations --- .../AbstractCommitLogSegmentManager.java | 17 ++----- .../cassandra/db/commitlog/CommitLog.java | 31 +++++++++++-- .../db/commitlog/CommitLogReplayer.java | 10 ++++ .../db/commitlog/CommitLogSegmentHandler.java | 46 +++++++++++++++++++ 4 files changed, 87 insertions(+), 17 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index 7211e3a4ca8b..c9e003ff2238 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -105,6 +105,8 @@ public abstract class AbstractCommitLogSegmentManager */ private final AtomicLong size = new AtomicLong(); + public static CommitLogSegmentHandler commitLogSegmentHandler = new CommitLogSegmentHandler(); + @VisibleForTesting Thread managerThread; protected final CommitLog commitLog; @@ -414,21 +416,12 @@ void archiveAndDiscard(final CommitLogSegment segment) */ void handleReplayedSegment(final File file) { - handleReplayedSegment(file, false); + handleReplayedSegment(file, false, false); } - void handleReplayedSegment(final File file, boolean hasInvalidOrFailedMutations) + void handleReplayedSegment(final File file, boolean hasInvalidAndNoFailedMutations, boolean hasFailedMutations) { - if (!hasInvalidOrFailedMutations) - { - // (don't decrease managed size, since this was never a "live" segment) - logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); - FileUtils.deleteWithConfirm(file); - } - else - { - logger.debug("File {} should not be 
deleted as it contains invalid or failed mutations", file.name()); - } + commitLogSegmentHandler.handleReplayedSegment(file, hasInvalidAndNoFailedMutations, hasFailedMutations); } /** diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 965ee67d54fe..3414e70048f5 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.FileStore; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; @@ -91,8 +94,9 @@ public class CommitLog implements CommitLogMBean public final CommitLogArchiver archiver; public final CommitLogMetrics metrics; final AbstractCommitLogService executor; - private Set segmentsWithInvalidOrFailedMutations; - + private Set segmentsWithInvalidMutations; + private Set segmentsWithFailedMutations; + private final String nameOfInvalidMutationDirectory = "INVALID_MUTATIONS"; volatile Configuration configuration; private boolean started = false; @@ -233,10 +237,16 @@ public Map recoverSegmentsOnDisk(ColumnFamilyStore.FlushReaso replayedKeyspaces = recoverFiles(flushReason, files); logger.info("Log replay complete, {} replayed mutations", replayedKeyspaces.values().stream().reduce(Integer::sum).orElse(0)); + Set segmentsWithInvalidAndNoFailedMutations = new HashSet<>(segmentsWithInvalidMutations).stream() + .filter(segment -> !segmentsWithFailedMutations.contains(segment)) + 
.collect(Collectors.toSet()); + + // We retain all segments with failed mutations in the commit log directory of the host so that they can be replayed again. + // Move all segments with invalid (and no failed) mutations to a different sub-directory and delete the segment from the commit log directory of the host. for (File f : files) { - boolean hasInvalidOrFailedMutations = segmentsWithInvalidOrFailedMutations.contains(f.name()); - segmentManager.handleReplayedSegment(f, hasInvalidOrFailedMutations); + boolean hasFailedMutations = segmentsWithFailedMutations.contains(f.name()); + segmentManager.handleReplayedSegment(f, segmentsWithInvalidAndNoFailedMutations.contains(f.name()), hasFailedMutations); } } @@ -258,10 +268,16 @@ public Map recoverFiles(ColumnFamilyStore.FlushReason flushRe replayer.replayFiles(clogs); Map res = replayer.blockForWrites(flushReason); - segmentsWithInvalidOrFailedMutations = replayer.getSegmentWithInvalidOrFailedMutations(); + segmentsWithFailedMutations = replayer.getSegmentWithFailedMutations(); + segmentsWithInvalidMutations = replayer.getSegmentWithInvalidMutations(); return res; } + public String getNameOfInvalidMutationDirectory() + { + return nameOfInvalidMutationDirectory; + } + public void recoverPath(String path, boolean tolerateTruncation) throws IOException { CommitLogReplayer replayer = CommitLogReplayer.construct(this, getLocalHostId()); @@ -282,6 +298,11 @@ public void recover(String path) throws IOException recoverPath(path, false); } + public void setCommitLogSegmentHandler(CommitLogSegmentHandler handler) + { + AbstractCommitLogSegmentManager.commitLogSegmentHandler = handler; + } + /** * @return a CommitLogPosition which, if {@code >= one} returned from add(), implies add() was started * (but not necessarily finished) prior to this call diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 18bdf6091193..1be0980c8004 100644 
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -573,6 +573,16 @@ public Set getSegmentWithInvalidOrFailedMutations() return union; } + public Set getSegmentWithFailedMutations() + { + return segmentsWithFailedMutations; + } + + public Set getSegmentWithInvalidMutations() + { + return commitLogReader.getSegmentsWithInvalidMutations(); + } + public void handleInvalidMutation(TableId id) { mutationInitiator.onInvalidMutation(id); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java new file mode 100644 index 000000000000..85e0fc7b26e8 --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CommitLogSegmentHandler +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentHandler.class); + public void handleReplayedSegment(final File file) + { + // no-op by default + }; + + public void handleReplayedSegment(final File file, boolean hasInvalidAndNoFailedMutations, boolean hasFailedMutations) + { + if (!hasFailedMutations) + { + // (don't decrease managed size, since this was never a "live" segment) + logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); + FileUtils.deleteWithConfirm(file); + } + else + { + logger.debug("File {} should not be deleted as it contains invalid or failed mutations", file.name()); + } + } +} From 484d1637a0358858f8a14f11954d407c863d4ccb Mon Sep 17 00:00:00 2001 From: "niksa.jakovljevic" Date: Fri, 19 Sep 2025 12:30:03 +0200 Subject: [PATCH 2/3] Tweaks and adds test --- .../AbstractCommitLogSegmentManager.java | 6 +- .../cassandra/db/commitlog/CommitLog.java | 15 +--- .../db/commitlog/CommitLogReplayer.java | 7 -- .../db/commitlog/CommitLogSegmentHandler.java | 12 +-- .../db/commitlog/CommitLogReplayerTest.java | 4 +- .../CommitLogSegmentHandlerTest.java | 88 +++++++++++++++++++ 6 files changed, 100 insertions(+), 32 deletions(-) create mode 100644 test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentHandlerTest.java diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index c9e003ff2238..d46b00f78612 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -105,7 +105,7 @@ public abstract class 
AbstractCommitLogSegmentManager */ private final AtomicLong size = new AtomicLong(); - public static CommitLogSegmentHandler commitLogSegmentHandler = new CommitLogSegmentHandler(); + public static volatile CommitLogSegmentHandler commitLogSegmentHandler = new CommitLogSegmentHandler(); @VisibleForTesting Thread managerThread; @@ -419,9 +419,9 @@ void handleReplayedSegment(final File file) handleReplayedSegment(file, false, false); } - void handleReplayedSegment(final File file, boolean hasInvalidAndNoFailedMutations, boolean hasFailedMutations) + void handleReplayedSegment(final File file, boolean hasInvalidMutations, boolean hasFailedMutations) { - commitLogSegmentHandler.handleReplayedSegment(file, hasInvalidAndNoFailedMutations, hasFailedMutations); + commitLogSegmentHandler.handleReplayedSegment(file, hasInvalidMutations, hasFailedMutations); } /** diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 3414e70048f5..c9763ddc642b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -96,7 +96,6 @@ public class CommitLog implements CommitLogMBean final AbstractCommitLogService executor; private Set segmentsWithInvalidMutations; private Set segmentsWithFailedMutations; - private final String nameOfInvalidMutationDirectory = "INVALID_MUTATIONS"; volatile Configuration configuration; private boolean started = false; @@ -237,16 +236,9 @@ public Map recoverSegmentsOnDisk(ColumnFamilyStore.FlushReaso replayedKeyspaces = recoverFiles(flushReason, files); logger.info("Log replay complete, {} replayed mutations", replayedKeyspaces.values().stream().reduce(Integer::sum).orElse(0)); - Set segmentsWithInvalidAndNoFailedMutations = new HashSet<>(segmentsWithInvalidMutations).stream() - .filter(segment -> !segmentsWithFailedMutations.contains(segment)) - .collect(Collectors.toSet()); - - // We retain all segments 
with failed mutations in the commit log directory of the host so that they can be replayed again. - // Move all segments with invalid (and no failed) mutations to a different sub-directory and delete the segment from the commit log directory of the host. for (File f : files) { - boolean hasFailedMutations = segmentsWithFailedMutations.contains(f.name()); - segmentManager.handleReplayedSegment(f, segmentsWithInvalidAndNoFailedMutations.contains(f.name()), hasFailedMutations); + segmentManager.handleReplayedSegment(f, segmentsWithInvalidMutations.contains(f.name()), segmentsWithFailedMutations.contains(f.name())); } } @@ -273,11 +265,6 @@ public Map recoverFiles(ColumnFamilyStore.FlushReason flushRe return res; } - public String getNameOfInvalidMutationDirectory() - { - return nameOfInvalidMutationDirectory; - } - public void recoverPath(String path, boolean tolerateTruncation) throws IOException { CommitLogReplayer replayer = CommitLogReplayer.construct(this, getLocalHostId()); diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 1be0980c8004..2750695613ab 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -566,13 +566,6 @@ protected boolean pointInTimeExceeded(Mutation fm) return false; } - public Set getSegmentWithInvalidOrFailedMutations() - { - Set union = new HashSet<>(segmentsWithFailedMutations); - union.addAll(commitLogReader.getSegmentsWithInvalidMutations()); - return union; - } - public Set getSegmentWithFailedMutations() { return segmentsWithFailedMutations; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java index 85e0fc7b26e8..be2aa2f2db75 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java +++ 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentHandler.java @@ -22,17 +22,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * After recovery of commit logs is performed, this class is responsible for handling the commit log files that were + * replayed. + */ public class CommitLogSegmentHandler { private static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentHandler.class); - public void handleReplayedSegment(final File file) - { - // no-op by default - }; - public void handleReplayedSegment(final File file, boolean hasInvalidAndNoFailedMutations, boolean hasFailedMutations) + public void handleReplayedSegment(final File file, boolean hasInvalidMutations, boolean hasFailedMutations) { - if (!hasFailedMutations) + if (!hasFailedMutations && !hasInvalidMutations) { // (don't decrease managed size, since this was never a "live" segment) logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file); diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReplayerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReplayerTest.java index edf03176f773..2b222b6deb36 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogReplayerTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogReplayerTest.java @@ -58,7 +58,7 @@ public void testTrackingSegmentsWhenMutationFails() when(descriptor.fileName()).thenReturn(failedSegment); CompletableFuture mutationFuture = mutationInitiator.initiateMutation(mock(Mutation.class), descriptor, 0, 0, replayer); Assert.assertThrows(ExecutionException.class, () -> mutationFuture.get()); - Assert.assertTrue(!replayer.getSegmentWithInvalidOrFailedMutations().isEmpty()); - Assert.assertTrue(replayer.getSegmentWithInvalidOrFailedMutations().contains(failedSegment)); + Assert.assertTrue(!replayer.getSegmentWithFailedMutations().isEmpty()); + Assert.assertTrue(replayer.getSegmentWithFailedMutations().contains(failedSegment)); } } 
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentHandlerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentHandlerTest.java new file mode 100644 index 000000000000..c81f3cbaf483 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentHandlerTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.commitlog; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.io.util.File; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CommitLogSegmentHandlerTest +{ + private CommitLogSegmentHandler handler; + private Path tempDir; + private File testFile; + + @Before + public void setUp() throws IOException + { + handler = new CommitLogSegmentHandler(); + tempDir = Files.createTempDirectory("commitlog_test"); + testFile = new File(tempDir.resolve("test_segment.log")); + testFile.createFileIfNotExists(); + } + + @After + public void tearDown() throws IOException + { + if (testFile.exists()) + testFile.delete(); + Files.deleteIfExists(tempDir); + } + + @Test + public void testHandleReplayedSegmentDeletesFileWhenNoIssues() + { + assertTrue("Test file should exist before handling", testFile.exists()); + handler.handleReplayedSegment(testFile, false, false); + assertFalse("File should be deleted when no invalid or failed mutations", testFile.exists()); + } + + @Test + public void testHandleReplayedSegmentKeepsFileWhenHasInvalidMutations() + { + assertTrue("Test file should exist before handling", testFile.exists()); + handler.handleReplayedSegment(testFile, true, false); + assertTrue("File should be kept when has invalid mutations", testFile.exists()); + } + + @Test + public void testHandleReplayedSegmentKeepsFileWhenHasFailedMutations() + { + assertTrue("Test file should exist before handling", testFile.exists()); + handler.handleReplayedSegment(testFile, false, true); + assertTrue("File should be kept when has failed mutations", testFile.exists()); + } + + @Test + public void testHandleReplayedSegmentKeepsFileWhenHasBothIssues() + { + assertTrue("Test file should exist before handling", testFile.exists()); + 
handler.handleReplayedSegment(testFile, true, true); + assertTrue("File should be kept when has both invalid and failed mutations", testFile.exists()); + } +} From 112ba52f403a6a19b2b87af8e745840604ba2547 Mon Sep 17 00:00:00 2001 From: "niksa.jakovljevic" Date: Mon, 27 Oct 2025 15:57:28 +0100 Subject: [PATCH 3/3] Tweak code docs --- .../org/apache/cassandra/db/commitlog/CommitLogReader.java | 2 +- .../apache/cassandra/db/commitlog/CommitLogReplayer.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index 8c7a43d58ee4..d47fbaea544e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -56,7 +56,7 @@ public class CommitLogReader @VisibleForTesting public static final int ALL_MUTATIONS = -1; private final CRC32 checksum; - private final Map invalidMutations; + private final Map invalidMutations; // if we can't find a table for a mutation, we count it here private final Set segmentsWithInvalidMutations; private byte[] buffer; diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 2750695613ab..d834ac3823e9 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -99,7 +99,7 @@ public class CommitLogReplayer implements CommitLogReadHandler private final Map keyspacesReplayed; private final Queue> futures; - private final Set segmentsWithFailedMutations; + private final Set segmentsWithFailedMutations; // segments containing mutations that failed to apply private final Map> cfPersisted; private final CommitLogPosition globalPosition; @@ -571,6 +571,10 @@ public Set getSegmentWithFailedMutations() return segmentsWithFailedMutations; } + /** + * Get
segments with invalid mutations. + * Invalid mutations are mutations for which the table cannot be found. + */ public Set getSegmentWithInvalidMutations() { return commitLogReader.getSegmentsWithInvalidMutations();