Skip to content

Commit 3058c0f

Browse files
committed
very rough WIP on CASSANDRA-20639
1 parent ab1ce59 commit 3058c0f

File tree

8 files changed

+109
-41
lines changed

8 files changed

+109
-41
lines changed

src/java/org/apache/cassandra/db/partitions/PartitionIterators.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,6 @@ public static void consume(PartitionIterator iterator)
9393
}
9494
}
9595

96-
/**
97-
* Consumes all rows in the next partition of the provided partition iterator.
98-
*/
99-
public static void consumeNext(PartitionIterator iterator)
100-
{
101-
if (iterator.hasNext())
102-
{
103-
try (RowIterator partition = iterator.next())
104-
{
105-
while (partition.hasNext())
106-
partition.next();
107-
}
108-
}
109-
}
110-
11196
/**
11297
* Wraps the provided iterator so it logs the returned rows for debugging purposes.
11398
* <p>

src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ public interface MergeListener
100100

101101
public void close();
102102

103+
public default void checkpoint() { }
104+
103105
public static MergeListener NOOP = new MergeListener()
104106
{
105107
public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {}
@@ -540,6 +542,13 @@ public void close()
540542
listener.close();
541543
}
542544

545+
@Override
546+
public void checkpoint()
547+
{
548+
if (listener != null)
549+
listener.checkpoint();
550+
}
551+
543552
private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
544553
{
545554
private final MergeListener listener;

src/java/org/apache/cassandra/db/transform/BaseIterator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public final void close()
115115
maybeFail(fail);
116116
}
117117

118+
public void checkpoint()
119+
{
120+
input.checkpoint();
121+
}
122+
118123
public final O next()
119124
{
120125
if (next == null && !hasNext())

src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.cassandra.concurrent.Stage;
3434
import org.apache.cassandra.db.Clustering;
3535
import org.apache.cassandra.db.ColumnFamilyStore;
36-
import org.apache.cassandra.db.Columns;
3736
import org.apache.cassandra.db.ConsistencyLevel;
3837
import org.apache.cassandra.db.DecoratedKey;
3938
import org.apache.cassandra.db.DeletionTime;
@@ -46,12 +45,12 @@
4645
import org.apache.cassandra.db.filter.DataLimits;
4746
import org.apache.cassandra.db.filter.RowFilter;
4847
import org.apache.cassandra.db.partitions.PartitionIterator;
49-
import org.apache.cassandra.db.partitions.PartitionIterators;
5048
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
5149
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
5250
import org.apache.cassandra.db.rows.EncodingStats;
5351
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
5452
import org.apache.cassandra.db.rows.Row;
53+
import org.apache.cassandra.db.rows.RowIterator;
5554
import org.apache.cassandra.db.rows.Rows;
5655
import org.apache.cassandra.db.rows.Unfiltered;
5756
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -71,6 +70,7 @@
7170
import org.apache.cassandra.service.ClientWarn;
7271
import org.apache.cassandra.service.StorageProxy;
7372
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
73+
import org.apache.cassandra.service.reads.repair.PartitionIteratorMergeListener;
7474
import org.apache.cassandra.tracing.Tracing;
7575
import org.apache.cassandra.transport.Dispatcher;
7676
import org.apache.cassandra.utils.NoSpamLogger;
@@ -120,6 +120,12 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
120120
*/
121121
private final List<Queue<PartitionBuilder>> originalPartitions;
122122

123+
/** Whether to consume entire partitions or not in {@link #queryProtectedPartitions}. */
124+
private final boolean consumeEntirePartitions;
125+
126+
/** Tracks the row iterator of the current partition when not consuming entire partitions in {@link #queryProtectedPartitions}. */
127+
private RowIterator currentRowIterator = null;
128+
123129
ReplicaFilteringProtection(ReadCoordinator coordinator,
124130
Keyspace keyspace,
125131
ReadCommand command,
@@ -132,6 +138,7 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
132138
this.coordinator = coordinator;
133139
this.keyspace = keyspace;
134140
this.command = command;
141+
this.consumeEntirePartitions = command.limits().isUnlimited() || !command.isLimitedToOnePartition() || command.rowFilter().hasStaticExpression();
135142
this.consistency = consistency;
136143
this.requestTime = requestTime;
137144
this.sources = sources;
@@ -195,14 +202,14 @@ public void close()
195202
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
196203
{
197204
List<PartitionBuilder> builders = new ArrayList<>(sources.size());
198-
RegularAndStaticColumns columns = columns(versions);
205+
RegularAndStaticColumns columns = PartitionIteratorMergeListener.columns(versions);
199206
EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
200207

201208
for (int i = 0; i < sources.size(); i++)
202209
builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats));
203210

204-
boolean[] silentRowAt = new boolean[builders.size()];
205-
boolean[] silentColumnAt = new boolean[builders.size()];
211+
final boolean[] silentRowAt = new boolean[builders.size()];
212+
final boolean[] silentColumnAt = new boolean[builders.size()];
206213

207214
return new UnfilteredRowIterators.MergeListener()
208215
{
@@ -270,6 +277,11 @@ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTomb
270277

271278
@Override
272279
public void close()
280+
{
281+
}
282+
283+
@Override
284+
public void checkpoint()
273285
{
274286
for (int i = 0; i < sources.size(); i++)
275287
originalPartitions.get(i).add(builders.get(i));
@@ -313,22 +325,6 @@ private void releaseCachedRows(int count)
313325
currentRowsCached -= count;
314326
}
315327

316-
private static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
317-
{
318-
Columns statics = Columns.NONE;
319-
Columns regulars = Columns.NONE;
320-
for (UnfilteredRowIterator iter : versions)
321-
{
322-
if (iter == null)
323-
continue;
324-
325-
RegularAndStaticColumns cols = iter.columns();
326-
statics = statics.mergeTo(cols.statics);
327-
regulars = regulars.mergeTo(cols.regulars);
328-
}
329-
return new RegularAndStaticColumns(statics, regulars);
330-
}
331-
332328
/**
333329
* Returns the protected results for the specified replica. These are generated fetching the extra rows and merging
334330
* them with the cached original filtered results for that replica.
@@ -350,16 +346,66 @@ public TableMetadata metadata()
350346
}
351347

352348
@Override
353-
public void close() { }
349+
public void close()
350+
{
351+
if (currentRowIterator != null)
352+
currentRowIterator.close();
353+
}
354354

355355
@Override
356356
public boolean hasNext()
357357
{
358358
// If there are no cached partition builders for this source, advance the first phase iterator, which
359-
// will force the RFP merge listener to load at least the next protected partition.
359+
// will force the RFP merge listener to load rows from the next protected partition.
360360
if (partitions.isEmpty())
361361
{
362-
PartitionIterators.consumeNext(merged);
362+
if (consumeEntirePartitions)
363+
{
364+
if (merged.hasNext())
365+
{
366+
try (RowIterator partition = merged.next())
367+
{
368+
while (partition.hasNext())
369+
partition.next();
370+
371+
partition.checkpoint();
372+
}
373+
}
374+
}
375+
else
376+
{
377+
if (currentRowIterator == null || !currentRowIterator.hasNext())
378+
{
379+
// If there is an iterator, it's done, so just close it.
380+
if (currentRowIterator != null)
381+
{
382+
currentRowIterator.close();
383+
currentRowIterator = null;
384+
}
385+
386+
// Take the next filtered partition from the merged partition iterator.
387+
if (merged.hasNext())
388+
currentRowIterator = merged.next();
389+
}
390+
391+
if (currentRowIterator != null)
392+
{
393+
int i = 0;
394+
395+
// Consume LIMIT filtered rows from the current partition, unless there are fewer results.
396+
// The underlying iterator is short-read protected, and limiting the number of rows we
397+
// consume avoids needless SRP reads when there are many more than LIMIT results.
398+
while (i < command.limits().count() && currentRowIterator.hasNext())
399+
{
400+
currentRowIterator.next();
401+
i++;
402+
}
403+
404+
// If we actually consumed a row, checkpoint to populate the builders.
405+
if (i > 0)
406+
currentRowIterator.checkpoint();
407+
}
408+
}
363409
}
364410

365411
return !partitions.isEmpty();
@@ -491,6 +537,8 @@ public Row staticRow()
491537
public void close()
492538
{
493539
releaseCachedRows(partitionRowsCached);
540+
toFetch = null;
541+
// TODO: the counters might not be accurate for the static row at this point?
494542
}
495543

496544
@Override

src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey par
4949
return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair);
5050
}
5151

52-
protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
52+
public static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
5353
{
5454
Columns statics = Columns.NONE;
5555
Columns regulars = Columns.NONE;

src/java/org/apache/cassandra/utils/CloseableIterator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
2525
{
2626
public void close();
27+
28+
public default void checkpoint() { }
2729

2830
public static <T> CloseableIterator<T> wrap(Iterator<T> iter)
2931
{

test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public MultiNodeTableWalkWithoutReadRepairTest()
3333
protected void preCheck(Cluster cluster, Property.StatefulBuilder builder)
3434
{
3535
// if a failing seed is detected, populate here
36-
// Example: builder.withSeed(42L);
36+
// builder.withSeed(1210048824849624538L).withExamples(1);
37+
// builder.withSeed(-7862021736520593557L).withExamples(1);
38+
// builder.withSeed(602472426346856339L).withExamples(1);
39+
builder.withExamples(Integer.MAX_VALUE);
3740
// CQL operations may have operators such as +, -, and / (example 4 + 4), to "apply" them to get a constant value
3841
// CQL_DEBUG_APPLY_OPERATOR = true;
3942
// When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after the write to see if it was safe at the time.

test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,22 @@ public void testShortReadWithRegularColumns()
216216
assertRows(initialRows, row(0, 1, 2));
217217
}
218218

219+
@Test
220+
public void testNoShortReadAtLimit()
221+
{
222+
CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.short_read_no_srp (k int, c int, a int, PRIMARY KEY (k, c)) WITH read_repair = 'NONE'"));
223+
CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s.short_read_no_srp(a) USING 'sai'"));
224+
SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
225+
226+
CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5"));
227+
CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 3, 1) USING TIMESTAMP 6"));
228+
229+
// TODO: Add some counter verification to this for SRP (we should not have to short read here after getting one good result)
230+
String select = withKeyspace("SELECT * FROM %s.short_read_no_srp WHERE k = 0 AND a = 1 LIMIT 1");
231+
Iterator<Object[]> initialRows = CLUSTER.coordinator(1).executeWithPaging(select, ConsistencyLevel.ALL, 2);
232+
assertRows(initialRows, row(0, 2, 1));
233+
}
234+
219235
@Test
220236
public void testShortReadWithStaticColumn()
221237
{

0 commit comments

Comments
 (0)