Skip to content

cassandra-11565 Prevents replaying commit log segments with invalid mutations from being replayed over and over #1587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public abstract class AbstractCommitLogSegmentManager
*/
private final AtomicLong size = new AtomicLong();

public static CommitLogSegmentHandler commitLogSegmentHandler = new CommitLogSegmentHandler();

@VisibleForTesting
Thread managerThread;
protected final CommitLog commitLog;
Expand Down Expand Up @@ -414,21 +416,12 @@ void archiveAndDiscard(final CommitLogSegment segment)
*/
void handleReplayedSegment(final File file)
{
handleReplayedSegment(file, false);
handleReplayedSegment(file, false, false);
}

void handleReplayedSegment(final File file, boolean hasInvalidOrFailedMutations)
void handleReplayedSegment(final File file, boolean hasInvalidAndNoFailedMutations, boolean hasFailedMutations)
{
if (!hasInvalidOrFailedMutations)
{
// (don't decrease managed size, since this was never a "live" segment)
logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
FileUtils.deleteWithConfirm(file);
}
else
{
logger.debug("File {} should not be deleted as it contains invalid or failed mutations", file.name());
}
commitLogSegmentHandler.handleReplayedSegment(file, hasInvalidAndNoFailedMutations, hasFailedMutations);
}

/**
Expand Down
31 changes: 26 additions & 5 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -33,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -91,8 +94,9 @@ public class CommitLog implements CommitLogMBean
public final CommitLogArchiver archiver;
public final CommitLogMetrics metrics;
final AbstractCommitLogService executor;
private Set<String> segmentsWithInvalidOrFailedMutations;

private Set<String> segmentsWithInvalidMutations;
private Set<String> segmentsWithFailedMutations;
private final String nameOfInvalidMutationDirectory = "INVALID_MUTATIONS";
volatile Configuration configuration;
private boolean started = false;

Expand Down Expand Up @@ -233,10 +237,16 @@ public Map<Keyspace, Integer> recoverSegmentsOnDisk(ColumnFamilyStore.FlushReaso
replayedKeyspaces = recoverFiles(flushReason, files);
logger.info("Log replay complete, {} replayed mutations", replayedKeyspaces.values().stream().reduce(Integer::sum).orElse(0));

Set<String> segmentsWithInvalidAndNoFailedMutations = new HashSet<>(segmentsWithInvalidMutations).stream()
.filter(segment -> !segmentsWithFailedMutations.contains(segment))
.collect(Collectors.toSet());

// We retain all segments with failed mutations in the commit log directory of the host so that they can be replayed again.
// Move all segments with invalid (and no failed) mutations to a different sub-directory and delete the segment from the commit log directory of the host.
for (File f : files)
{
boolean hasInvalidOrFailedMutations = segmentsWithInvalidOrFailedMutations.contains(f.name());
segmentManager.handleReplayedSegment(f, hasInvalidOrFailedMutations);
boolean hasFailedMutations = segmentsWithFailedMutations.contains(f.name());
segmentManager.handleReplayedSegment(f, segmentsWithInvalidAndNoFailedMutations.contains(f.name()), hasFailedMutations);
}
}

Expand All @@ -258,10 +268,16 @@ public Map<Keyspace, Integer> recoverFiles(ColumnFamilyStore.FlushReason flushRe
replayer.replayFiles(clogs);

Map<Keyspace, Integer> res = replayer.blockForWrites(flushReason);
segmentsWithInvalidOrFailedMutations = replayer.getSegmentWithInvalidOrFailedMutations();
segmentsWithFailedMutations = replayer.getSegmentWithFailedMutations();
segmentsWithInvalidMutations = replayer.getSegmentWithInvalidMutations();
return res;
}

public String getNameOfInvalidMutationDirectory()
{
return nameOfInvalidMutationDirectory;
}

public void recoverPath(String path, boolean tolerateTruncation) throws IOException
{
CommitLogReplayer replayer = CommitLogReplayer.construct(this, getLocalHostId());
Expand All @@ -282,6 +298,11 @@ public void recover(String path) throws IOException
recoverPath(path, false);
}

public void setCommitLogSegmentHandler(CommitLogSegmentHandler handler)
{
AbstractCommitLogSegmentManager.commitLogSegmentHandler = handler;
}

/**
* @return a CommitLogPosition which, if {@code >= one} returned from add(), implies add() was started
* (but not necessarily finished) prior to this call
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,16 @@ public Set<String> getSegmentWithInvalidOrFailedMutations()
return union;
}

public Set<String> getSegmentWithFailedMutations()
{
return segmentsWithFailedMutations;
}

public Set<String> getSegmentWithInvalidMutations()
{
return commitLogReader.getSegmentsWithInvalidMutations();
}

public void handleInvalidMutation(TableId id)
{
mutationInitiator.onInvalidMutation(id);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;

import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogSegmentHandler
{
private static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentHandler.class);
public void handleReplayedSegment(final File file)
{
// no-op by default
};

public void handleReplayedSegment(final File file, boolean hasInvalidAndNoFailedMutations, boolean hasFailedMutations)
{
if (!hasFailedMutations)
{
// (don't decrease managed size, since this was never a "live" segment)
logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
FileUtils.deleteWithConfirm(file);
}
else
{
logger.debug("File {} should not be deleted as it contains invalid or failed mutations", file.name());
}
}
}