diff --git a/src/java/org/apache/cassandra/cache/ChunkCache.java b/src/java/org/apache/cassandra/cache/ChunkCache.java index 5fb0bc6a2f90..ac9bdd0489ea 100644 --- a/src/java/org/apache/cassandra/cache/ChunkCache.java +++ b/src/java/org/apache/cassandra/cache/ChunkCache.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -326,4 +327,13 @@ public long weightedSize() .map(policy -> policy.weightedSize().orElseGet(cache::estimatedSize)) .orElseGet(cache::estimatedSize); } + + /** + * Returns the number of cached chunks of given file. + */ + @VisibleForTesting + public int sizeOfFile(String filePath) + { + return (int) cache.asMap().keySet().stream().filter(x -> x.path.equals(filePath)).count(); + } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/lifecycle/WrappedLifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/WrappedLifecycleTransaction.java index 12c46a9573ca..12eed440e1e8 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/WrappedLifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/WrappedLifecycleTransaction.java @@ -28,7 +28,7 @@ public class WrappedLifecycleTransaction implements ILifecycleTransaction { - final ILifecycleTransaction delegate; + protected final ILifecycleTransaction delegate; public WrappedLifecycleTransaction(ILifecycleTransaction delegate) { this.delegate = delegate; diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java index 7aad38511f16..a724fc26b339 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java @@ -82,7 +82,7 @@ protected TrieIndexEntry createRowIndexEntry(DecoratedKey key, DeletionTime part } @SuppressWarnings({ "resource", "RedundantSuppression" }) // dataFile is closed along with the reader - private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supplier partitionIndexSupplier) + private BtiTableReader openInternal(OpenReason openReason, long lengthOverride, Supplier partitionIndexSupplier) { IFilter filter = null; FileHandle dataFile = null; @@ -99,7 +99,7 @@ private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supp partitionIndex = partitionIndexSupplier.get(); rowIndexFile = indexWriter.rowIndexFHBuilder.complete(); - dataFile = openDataFile(isFinal ? NO_LENGTH_OVERRIDE : dataWriter.getLastFlushOffset(), builder.getStatsMetadata()); + dataFile = openDataFile(lengthOverride, builder.getStatsMetadata()); filter = indexWriter.getFilterCopy(); return builder.setPartitionIndex(partitionIndex) @@ -121,11 +121,12 @@ private BtiTableReader openInternal(OpenReason openReason, boolean isFinal, Supp @Override public void openEarly(Consumer callWhenReady) { - long dataLength = dataWriter.position(); + // Because the partition index writer is one partition behind, we want the file to stop at the start of the + // last partition that was written. + long dataLength = partitionWriter.getInitialPosition(); indexWriter.buildPartial(dataLength, partitionIndex -> { - indexWriter.rowIndexFHBuilder.withLengthOverride(indexWriter.rowIndexWriter.getLastFlushOffset()); - BtiTableReader reader = openInternal(OpenReason.EARLY, false, () -> partitionIndex); + BtiTableReader reader = openInternal(OpenReason.EARLY, dataLength, () -> partitionIndex); callWhenReady.accept(reader); }); } @@ -151,7 +152,7 @@ protected SSTableReader openFinal(OpenReason openReason) if (maxDataAge < 0) maxDataAge = Clock.Global.currentTimeMillis(); - return openInternal(openReason, true, indexWriter::completedPartitionIndex); + return openInternal(openReason, NO_LENGTH_OVERRIDE, indexWriter::completedPartitionIndex); } /** @@ -214,7 +215,14 @@ public long append(DecoratedKey key, AbstractRowIndexEntry indexEntry) throws IO public boolean buildPartial(long dataPosition, Consumer callWhenReady) { - return partitionIndex.buildPartial(callWhenReady, rowIndexWriter.position(), dataPosition); + long rowIndexPosition = rowIndexWriter.position(); + return partitionIndex.buildPartial(partitionIndex -> + { + rowIndexFHBuilder.withLengthOverride(rowIndexPosition); + callWhenReady.accept(partitionIndex); + rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE); + }, + rowIndexPosition, dataPosition); } public void mark() @@ -262,8 +270,8 @@ void complete() throws FSWriteError PartitionIndex completedPartitionIndex() { complete(); - rowIndexFHBuilder.withLengthOverride(0); - partitionIndexFHBuilder.withLengthOverride(0); + rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE); + partitionIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE); try { return PartitionIndex.load(partitionIndexFHBuilder, metadata.getLocal().partitioner, false); diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java b/src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java index b096b9d05ffb..803f43efe2af 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java @@ -116,7 +116,7 @@ private void refreshReadableBoundary() } finally { - fhBuilder.withLengthOverride(-1); + fhBuilder.withLengthOverride(FileHandle.Builder.NO_LENGTH_OVERRIDE); } } diff --git a/src/java/org/apache/cassandra/io/util/FileHandle.java b/src/java/org/apache/cassandra/io/util/FileHandle.java index 67bfd239d61e..7e6f06eba290 100644 --- a/src/java/org/apache/cassandra/io/util/FileHandle.java +++ b/src/java/org/apache/cassandra/io/util/FileHandle.java @@ -395,7 +395,7 @@ public FileHandle complete(Function channelProxyFactory) channel = channelProxyFactory.apply(file); long fileLength = (compressionMetadata != null) ? compressionMetadata.compressedFileLength : channel.size(); - long length = lengthOverride > 0 ? lengthOverride : fileLength; + long length = lengthOverride >= 0 ? lengthOverride : fileLength; RebuffererFactory rebuffererFactory; if (length == 0) diff --git a/test/unit/org/apache/cassandra/io/sstable/EarlyOpenIterationTest.java b/test/unit/org/apache/cassandra/io/sstable/EarlyOpenIterationTest.java new file mode 100644 index 000000000000..0b5bdc59045f --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/EarlyOpenIterationTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.sstable; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.carrotsearch.randomizedtesting.generators.RandomStrings; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +import static org.apache.cassandra.config.CassandraRelevantProperties.SSTABLE_FORMAT_DEFAULT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class EarlyOpenIterationTest extends CQLTester +{ + @Parameterized.Parameters(name = "format={0}") + public static Collection generateParameters() + { + // We need to set up the class here, as the parameterized test runner will not call the @BeforeClass method + paramaterizedSetUpClass(); + + return Lists.newArrayList(DatabaseDescriptor.getSSTableFormats().values()); + } + + public static void paramaterizedSetUpClass() + { + CQLTester.setUpClass(); + DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard); + } + + Random rand = new Random(); + + @Parameterized.Parameter + public SSTableFormat format = DatabaseDescriptor.getSelectedSSTableFormat(); + + @BeforeClass + public static void setUpClass() // override CQLTester's setUpClass + { + // No-op, as initialization was done in paramaterizedSetUpClass, and we don't want to call CQLTester.setUpClass again + } + + @Test + public void testFinalOpenIteration() throws InterruptedException + { + SSTABLE_FORMAT_DEFAULT.setString(format.name()); + createTable("CREATE TABLE %s (pkey text, ckey text, val blob, PRIMARY KEY (pkey, ckey))"); + + for (int i = 0; i < 800; i++) + { + String pkey = RandomStrings.randomAsciiOfLengthBetween(rand, 10, 10); + for (int j = 0; j < 100; j++) + execute("INSERT INTO %s (pkey, ckey, val) VALUES (?, ?, ?)", pkey, "" + j, ByteBuffer.allocate(300)); + } + flush(); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + AtomicInteger opened = new AtomicInteger(0); + assertEquals(1, cfs.getLiveSSTables().size()); + SSTableReader source = cfs.getLiveSSTables().iterator().next(); + + Consumer> consumer = current -> { + for (SSTableReader s : current) + { + readAllAndVerifyKeySpan(s); + if (s.openReason == SSTableReader.OpenReason.EARLY) + opened.incrementAndGet(); + } + }; + + SSTableReader finalReader; + try (WrappedLifecycleTransaction txn = new WrappedLifecycleTransaction(cfs.getTracker().tryModify(source, OperationType.COMPACTION)) + { + @Override + public void checkpoint() + { + consumer.accept(((LifecycleTransaction) delegate).current()); + super.checkpoint(); + } + }; + SSTableRewriter writer = new SSTableRewriter(txn, 1000, 100L << 10, false)) + { + writer.switchWriter(SSTableWriterTestBase.getWriter(format, cfs, cfs.getDirectories().getDirectoryForNewSSTables(), txn)); + var iter = source.getScanner(); + while (iter.hasNext()) + { + var next = iter.next(); + writer.append(next); + } + finalReader = writer.finish().iterator().next(); + } + assertTrue("No early opening occured", opened.get() > 0); + + assertEquals(Sets.newHashSet(finalReader), cfs.getLiveSSTables()); + readAllAndVerifyKeySpan(finalReader); + } + + private static void readAllAndVerifyKeySpan(SSTableReader s) + { + DecoratedKey firstKey = null; + DecoratedKey lastKey = null; + try (var iter = s.getScanner()) + { + while (iter.hasNext()) + { + try (var partition = iter.next()) + { + // consume all rows, so that the data is cached + partition.forEachRemaining(column -> { + // consume all columns + }); + if (firstKey == null) + firstKey = partition.partitionKey(); + lastKey = partition.partitionKey(); + } + } + } + assertEquals("Simple scanner does not iterate all content", s.getFirst(), firstKey); + assertEquals("Simple scanner does not iterate all content", s.getLast(), lastKey); + } +} diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java index 061591500ee5..1ccd74ffe16c 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java @@ -36,9 +36,11 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -153,27 +155,41 @@ public static void validateCFS(ColumnFamilyStore cfs) assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty()); } - public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient) + public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient) + { + return getWriter(cfs.newSSTableDescriptor(directory), cfs, txn, repairedAt, pendingRepair, isTransient); + } + + public static SSTableWriter getWriter(SSTableFormat format, ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient) + { + return getWriter(cfs.newSSTableDescriptor(directory, format), cfs, txn, repairedAt, pendingRepair, isTransient); + } + + public static SSTableWriter getWriter(Descriptor desc, ColumnFamilyStore cfs, ILifecycleTransaction txn, long repairedAt, TimeUUID pendingRepair, boolean isTransient) { - Descriptor desc = cfs.newSSTableDescriptor(directory); return desc.getFormat().getWriterFactory().builder(desc) - .setTableMetadataRef(cfs.metadata) - .setKeyCount(0) - .setRepairedAt(repairedAt) - .setPendingRepair(pendingRepair) - .setTransientSSTable(isTransient) - .setSerializationHeader(new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)) - .setSecondaryIndexGroups(cfs.indexManager.listIndexGroups()) - .setMetadataCollector(new MetadataCollector(cfs.metadata().comparator)) - .addDefaultComponents(cfs.indexManager.listIndexGroups()) - .build(txn, cfs); + .setTableMetadataRef(cfs.metadata) + .setKeyCount(0) + .setRepairedAt(repairedAt) + .setPendingRepair(pendingRepair) + .setTransientSSTable(isTransient) + .setSerializationHeader(new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)) + .setSecondaryIndexGroups(cfs.indexManager.listIndexGroups()) + .setMetadataCollector(new MetadataCollector(cfs.metadata().comparator)) + .addDefaultComponents(cfs.indexManager.listIndexGroups()) + .build(txn, cfs); } - public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) + public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn) { return getWriter(cfs, directory, txn, 0, null, false); } + public static SSTableWriter getWriter(SSTableFormat format, ColumnFamilyStore cfs, File directory, ILifecycleTransaction txn) + { + return getWriter(format, cfs, directory, txn, 0, null, false); + } + public static ByteBuffer random(int i, int size) { byte[] bytes = new byte[size + 4];