
Fix incorrect data file length for early open sstables #4114


Status: Open. Wants to merge 1 commit into trunk.
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/cache/ChunkCache.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -326,4 +327,13 @@ public long weightedSize()
                     .map(policy -> policy.weightedSize().orElseGet(cache::estimatedSize))
                     .orElseGet(cache::estimatedSize);
     }
+
+    /**
+     * Returns the number of cached chunks of the given file.
+     */
+    @VisibleForTesting
+    public int sizeOfFile(String filePath)
+    {
+        return (int) cache.asMap().keySet().stream().filter(x -> x.path.equals(filePath)).count();
+    }
 }

Review comment (Contributor): Depending on whether this is on a hot path, we might consider removing streams in favor of a plain for loop.
https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36

Reply (Contributor, Author): It is only meant to be used in tests.
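For reference, the stream-free variant suggested in the review could look like the following sketch. It is only a sketch, not part of this PR: Key is assumed to be the chunk cache's key type exposing the same path field that the stream version filters on.

    // Sketch of the reviewer's suggestion: count matching chunks with a plain
    // loop instead of a stream. The author kept the stream since the helper is
    // test-only and not on a hot path.
    @VisibleForTesting
    public int sizeOfFile(String filePath)
    {
        int count = 0;
        for (Key key : cache.asMap().keySet())
        {
            if (key.path.equals(filePath))
                count++;
        }
        return count;
    }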
src/java/org/apache/cassandra/db/lifecycle/WrappedLifecycleTransaction.java

@@ -28,7 +28,7 @@
 public class WrappedLifecycleTransaction implements ILifecycleTransaction
 {

-    final ILifecycleTransaction delegate;
+    protected final ILifecycleTransaction delegate;
     public WrappedLifecycleTransaction(ILifecycleTransaction delegate)
     {
         this.delegate = delegate;
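Making delegate protected (instead of package-private) lets tests subclass the wrapper and observe transaction callbacks. The new test below relies on exactly this, overriding checkpoint() to inspect the transaction's current readers:

    // From EarlyOpenIterationTest in this PR: an anonymous subclass that peeks
    // at the delegate transaction on every checkpoint.
    new WrappedLifecycleTransaction(cfs.getTracker().tryModify(source, OperationType.COMPACTION))
    {
        @Override
        public void checkpoint()
        {
            consumer.accept(((LifecycleTransaction) delegate).current());
            super.checkpoint();
        }
    };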
src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java

@@ -82,7 +82,7 @@ protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime part
     }

     @SuppressWarnings({ "resource", "RedundantSuppression" }) // dataFile is closed along with the reader
-    private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier<PartitionIndex> partitionIndexSupplier)
+    private BtiTableReader openInternal(OpenReason openReason, long lengthOverride, Supplier<PartitionIndex> partitionIndexSupplier)
     {
         IFilter filter = null;
         FileHandle dataFile = null;
@@ -99,7 +99,7 @@ private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supp

             partitionIndex = partitionIndexSupplier.get();
             rowIndexFile = indexWriter.rowIndexFHBuilder.complete();
-            dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata());
+            dataFile = openDataFile(lengthOverride, builder.getStatsMetadata());
             filter = indexWriter.getFilterCopy();

             return builder.setPartitionIndex(partitionIndex)
@@ -121,11 +121,12 @@ private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supp
     @Override
     public void openEarly(Consumer<SSTableReader> callWhenReady)
     {
-        long dataLength = dataWriter.position();
+        // Because the partition index writer is one partition behind, we want the file to stop at the start of the
+        // last partition that was written.
+        long dataLength = partitionWriter.getInitialPosition();
         indexWriter.buildPartial(dataLength, partitionIndex ->
         {
             indexWriter.rowIndexFHBuilder.withLengthOverride(indexWriter.rowIndexWriter.getLastFlushOffset());
-            BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex);
+            BtiTableReader reader = openInternal(OpenReason.EARLY, dataLength, () -> partitionIndex);
             callWhenReady.accept(reader);
         });
     }
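A small worked example (hypothetical offsets) of why the early-open length comes from partitionWriter.getInitialPosition() rather than dataWriter.position():

    // Illustrative sketch only; the offsets are hypothetical. With partitions
    // A, B and C written at [0, 1000), [1000, 2400) and [2400, 3100), the
    // partition index writer has indexed only A and B when C has just been written.
    long dataWriterPosition = 3100; // dataWriter.position(): end of unindexed C
    long lastPartitionStart = 2400; // partitionWriter.getInitialPosition(): start of C

    // An early-open reader must not see bytes the partition index cannot reach,
    // so the data file length is capped at the start of the last written partition.
    long earlyOpenDataLength = lastPartitionStart;
    assert earlyOpenDataLength <= dataWriterPosition;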
@@ -151,7 +152,7 @@ protected SSTableReader openFinal(OpenReason openReason)
         if (maxDataAge < 0)
             maxDataAge = Clock.Global.currentTimeMillis();

-        return openInternal(openReason, true, indexWriter::completedPartitionIndex);
+        return openInternal(openReason, NO_LENGTH_OVERRIDE, indexWriter::completedPartitionIndex);
     }

     /**
@@ -214,7 +215,14 @@ public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IO

     public boolean buildPartial(long dataPosition, Consumer<PartitionIndex> callWhenReady)
     {
-        return partitionIndex.buildPartial(callWhenReady, rowIndexWriter.position(), dataPosition);
+        long rowIndexPosition = rowIndexWriter.position();
+        return partitionIndex.buildPartial(partitionIndex ->
+        {
+            rowIndexFHBuilder.withLengthOverride(rowIndexPosition);
+            callWhenReady.accept(partitionIndex);
+            rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
+        },
+        rowIndexPosition, dataPosition);
     }

     public void mark()
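In other words, the row index length override is scoped to the callback: it is set just before the early reader is constructed and cleared immediately afterwards. In sketch form (the try/finally here is for illustration only; the change itself resets unconditionally after the callback returns):

    // Expose only the flushed prefix of the row index while the early-open
    // reader is built, then restore the default of using the on-disk length.
    rowIndexFHBuilder.withLengthOverride(rowIndexPosition);
    try
    {
        callWhenReady.accept(partitionIndex); // builds the EARLY reader
    }
    finally
    {
        rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
    }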
@@ -262,8 +270,8 @@ void complete() throws FSWriteError
     PartitionIndex completedPartitionIndex()
     {
         complete();
-        rowIndexFHBuilder.withLengthOverride(0);
-        partitionIndexFHBuilder.withLengthOverride(0);
+        rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
+        partitionIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
         try
         {
             return PartitionIndex.load(partitionIndexFHBuilder, metadata.getLocal().partitioner, false);
@@ -116,7 +116,7 @@ private void refreshReadableBoundary()
         }
         finally
         {
-            fhBuilder.withLengthOverride(-1);
+            fhBuilder.withLengthOverride(FileHandle.Builder.NO_LENGTH_OVERRIDE);
         }

     }
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/util/FileHandle.java
@@ -395,7 +395,7 @@ public FileHandle complete(Function<File, ChannelProxy> channelProxyFactory)
             channel = channelProxyFactory.apply(file);

             long fileLength = (compressionMetadata != null) ? compressionMetadata.compressedFileLength : channel.size();
-            long length = lengthOverride > 0 ? lengthOverride : fileLength;
+            long length = lengthOverride >= 0 ? lengthOverride : fileLength;

             RebuffererFactory rebuffererFactory;
             if (length == 0)
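This one-character change is the heart of the fix: a length override of 0 is now honored instead of silently falling back to the on-disk file length. A minimal sketch of the resulting semantics, assuming the NO_LENGTH_OVERRIDE sentinel is -1 (consistent with the withLengthOverride(-1) call replaced elsewhere in this PR):

    // Sketch of the length selection in FileHandle.Builder.complete() after the fix.
    static final long NO_LENGTH_OVERRIDE = -1; // assumed sentinel value

    static long effectiveLength(long lengthOverride, long fileLength)
    {
        // Any non-negative override wins, including 0 (a component with nothing
        // flushed yet). Previously, lengthOverride > 0 made a zero override fall
        // back to the full on-disk length, exposing bytes past the readable
        // boundary of an early-open sstable.
        return lengthOverride >= 0 ? lengthOverride : fileLength;
    }

This is also why completedPartitionIndex() above now resets the builders with NO_LENGTH_OVERRIDE rather than 0: once zero is a meaningful override, it can no longer double as the "no override" marker.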
157 changes: 157 additions & 0 deletions test/unit/org/apache/cassandra/io/sstable/EarlyOpenIterationTest.java
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.sstable;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;

import static org.apache.cassandra.config.CassandraRelevantProperties.SSTABLE_FORMAT_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class EarlyOpenIterationTest extends CQLTester
{
@Parameterized.Parameters(name = "format={0}")
public static Collection<Object> generateParameters()
{
// We need to set up the class here, as the parameterized test runner will not call the @BeforeClass method
parameterizedSetUpClass();

return Lists.newArrayList(DatabaseDescriptor.getSSTableFormats().values());
}

public static void parameterizedSetUpClass()
{
CQLTester.setUpClass();
DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
}

Random rand = new Random();

@Parameterized.Parameter
public SSTableFormat<?, ?> format = DatabaseDescriptor.getSelectedSSTableFormat();

@BeforeClass
public static void setUpClass() // override CQLTester's setUpClass
{
// No-op, as initialization was done in parameterizedSetUpClass, and we don't want to call CQLTester.setUpClass again
}

@Test
public void testFinalOpenIteration() throws InterruptedException
{
SSTABLE_FORMAT_DEFAULT.setString(format.name());
createTable("CREATE TABLE %s (pkey text, ckey text, val blob, PRIMARY KEY (pkey, ckey))");

for (int i = 0; i < 800; i++)
{
String pkey = RandomStrings.randomAsciiOfLengthBetween(rand, 10, 10);
for (int j = 0; j < 100; j++)
execute("INSERT INTO %s (pkey, ckey, val) VALUES (?, ?, ?)", pkey, "" + j, ByteBuffer.allocate(300));
}
flush();
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();

AtomicInteger opened = new AtomicInteger(0);
assertEquals(1, cfs.getLiveSSTables().size());
SSTableReader source = cfs.getLiveSSTables().iterator().next();

Consumer<Iterable<SSTableReader>> consumer = current -> {
for (SSTableReader s : current)
{
readAllAndVerifyKeySpan(s);
if (s.openReason == SSTableReader.OpenReason.EARLY)
opened.incrementAndGet();
}
};

SSTableReader finalReader;
try (WrappedLifecycleTransaction txn = new WrappedLifecycleTransaction(cfs.getTracker().tryModify(source, OperationType.COMPACTION))
{
@Override
public void checkpoint()
{
consumer.accept(((LifecycleTransaction) delegate).current());
super.checkpoint();
}
};
SSTableRewriter writer = new SSTableRewriter(txn, 1000, 100L << 10, false))
{
writer.switchWriter(SSTableWriterTestBase.getWriter(format, cfs, cfs.getDirectories().getDirectoryForNewSSTables(), txn));
try (var iter = source.getScanner())
{
    while (iter.hasNext())
    {
        var next = iter.next();
        writer.append(next);
    }
}
finalReader = writer.finish().iterator().next();
}
assertTrue("No early opening occured", opened.get() > 0);

assertEquals(Sets.newHashSet(finalReader), cfs.getLiveSSTables());
readAllAndVerifyKeySpan(finalReader);
}

private static void readAllAndVerifyKeySpan(SSTableReader s)
{
DecoratedKey firstKey = null;
DecoratedKey lastKey = null;
try (var iter = s.getScanner())
{
while (iter.hasNext())
{
try (var partition = iter.next())
{
// consume all rows, so that the data is cached
partition.forEachRemaining(column -> {
// consume all columns
});
if (firstKey == null)
firstKey = partition.partitionKey();
lastKey = partition.partitionKey();
}
}
}
assertEquals("Simple scanner does not iterate all content", s.getFirst(), firstKey);
assertEquals("Simple scanner does not iterate all content", s.getLast(), lastKey);
}
}
test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java

@@ -36,9 +36,11 @@
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -153,27 +155,41 @@ public static void validateCFS(ColumnFamilyStore cfs)
         assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty());
     }

-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
     {
+        return getWriter(cfs.newSSTableDescriptor(directory), cfs, txn, repairedAt, pendingRepair, isTransient);
+    }
+
+    public static SSTableWriter getWriter(SSTableFormat<?, ?> format, ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
+    {
+        return getWriter(cfs.newSSTableDescriptor(directory, format), cfs, txn, repairedAt, pendingRepair, isTransient);
+    }
+
+    public static SSTableWriter getWriter(Descriptor desc, ColumnFamilyStore cfs, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient)
+    {
-        Descriptor desc = cfs.newSSTableDescriptor(directory);
         return desc.getFormat().getWriterFactory().builder(desc)
-                                                  .setTableMetadataRef(cfs.metadata)
-                                                  .setKeyCount(0)
-                                                  .setRepairedAt(repairedAt)
-                                                  .setPendingRepair(pendingRepair)
-                                                  .setTransientSSTable(isTransient)
-                                                  .setSerializationHeader(new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))
-                                                  .setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
-                                                  .setMetadataCollector(new MetadataCollector(cfs.metadata().comparator))
-                                                  .addDefaultComponents(cfs.indexManager.listIndexGroups())
-                                                  .build(txn, cfs);
+                   .setTableMetadataRef(cfs.metadata)
+                   .setKeyCount(0)
+                   .setRepairedAt(repairedAt)
+                   .setPendingRepair(pendingRepair)
+                   .setTransientSSTable(isTransient)
+                   .setSerializationHeader(new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))
+                   .setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
+                   .setMetadataCollector(new MetadataCollector(cfs.metadata().comparator))
+                   .addDefaultComponents(cfs.indexManager.listIndexGroups())
+                   .build(txn, cfs);
     }

-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn)
     {
         return getWriter(cfs, directory, txn, 0, null, false);
     }
+
+    public static SSTableWriter getWriter(SSTableFormat<?, ?> format, ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn)
+    {
+        return getWriter(format, cfs, directory, txn, 0, null, false);
+    }

     public static ByteBuffer random(int i, int size)
     {
         byte[] bytes = new byte[size + 4];
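For context, the format-parameterized overload added above is what EarlyOpenIterationTest uses to create a writer for each sstable format under test:

    // From the test above: open a writer for the format being exercised.
    writer.switchWriter(SSTableWriterTestBase.getWriter(format, cfs, cfs.getDirectories().getDirectoryForNewSSTables(), txn));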