Skip to content

Commit

Permalink
Ensure prepared_statement INSERT timestamp precedes eviction DELETE
Browse files Browse the repository at this point in the history
Updates SystemKeyspace.writePreparedStatement to accept a timestamp
associated with the Prepared creation time. Using this timestamp
will ensure that an INSERT into system.prepared_statements will
always precede the timestamp for the same Prepared in
SystemKeyspace.removePreparedStatement.

Additionally, any clusters currently experiencing a leaky
system.prepared_statements table from this bug may struggle to
bounce into a version with this fix as
SystemKeyspace.loadPreparedStatements currently does
not paginate the query to system.prepared_statements, causing heap
OOMs.  To fix this, this patch adds pagination at 5000 rows,
which should allow nodes to come up and delete older prepared
statements that may no longer be used as
the cache fills up (which should happen immediately).

Adds #testAsyncPstmtInvalidation which almost always reproduces the
issue without this fix.

Adds #testPreloadPreparedStatements to verify pagination behavior.

This patch does not address the issue of Caffeine immediately evicting
a prepared statement, however it will prevent the
system.prepared_statements table from growing unbounded.  For most users
this should be adequate, as the cache should only be filled when there
are erroneously many unique prepared statements. In such a case we can
expect that clients will constantly prepare statements regardless
of whether or not the cache is evicting statements.
  • Loading branch information
tolbertam committed Feb 20, 2025
1 parent cd3e564 commit 1e639da
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 16 deletions.
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/cql3/QueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public static class Prepared

public final MD5Digest resultMetadataId;

/**
* Timestamp of when this prepared statement was created. Used in QueryProcessor.preparedStatements cache
* to ensure that the deletion timestamp always succeeds the insert timestamp.
*/
public final long timestamp;

/**
* Contains the CQL statement source if the statement has been "regularly" prepared via
* {@link QueryHandler#prepare(String, ClientState, Map)}.
Expand All @@ -78,6 +84,7 @@ public Prepared(CQLStatement statement, String rawCQLStatement, boolean fullyQua
this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(statement).getResultMetadataId();
this.fullyQualified = fullyQualified;
this.keyspace = keyspace;
this.timestamp = ClientState.getTimestamp();
}
}
}
37 changes: 32 additions & 5 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ private enum InternalStateInstance
}

/**
 * Preloads prepared statements by delegating to {@link #preloadPreparedStatements(int)}
 * with a page size of 5000 rows.
 */
public void preloadPreparedStatements()
{
    final int defaultPageSize = 5000;
    preloadPreparedStatements(defaultPageSize);
}

@VisibleForTesting
public void preloadPreparedStatements(int pageSize)
{
int count = SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
try
Expand All @@ -177,7 +183,7 @@ public void preloadPreparedStatements()
SystemKeyspace.removePreparedStatement(id);
return false;
}
});
}, pageSize);
logger.info("Preloaded {} prepared statements", count);
}

Expand Down Expand Up @@ -543,12 +549,33 @@ public static UntypedResultSet execute(String query, ConsistencyLevel cl, QueryS
public static UntypedResultSet executeInternalWithPaging(String query, int pageSize, Object... values)
{
Prepared prepared = prepareInternal(query);
if (!(prepared.statement instanceof SelectStatement))

return executeInternalWithPaging(prepared.statement, pageSize, values);
}

/**
 * Runs a paged internal query by parsing and validating {@code query} directly rather than preparing it.
 * {@link #executeInternalWithPaging(String, int, Object...)} is generally the better choice; this variant
 * exists for niche callers such as
 * {@link SystemKeyspace#loadPreparedStatements(SystemKeyspace.TriFunction, int)}, which runs while prepared
 * statements are being loaded into {@link #preparedStatements}, a context in which preparing a statement
 * does not make sense.
 */
public static UntypedResultSet executeOnceInternalWithPaging(String query, int pageSize, Object... values)
{
    QueryState internalState = internalQueryState();

    // Parse and validate against the internal client state; nothing is added to the prepared cache here.
    CQLStatement parsed = parseStatement(query, internalState.getClientState());
    parsed.validate(internalState.getClientState());

    return executeInternalWithPaging(parsed, pageSize, values);
}

private static UntypedResultSet executeInternalWithPaging(CQLStatement statement, int pageSize, Object... values)
{
if (!(statement instanceof SelectStatement))
throw new IllegalArgumentException("Only SELECTs can be paged");

SelectStatement select = (SelectStatement)prepared.statement;
SelectStatement select = (SelectStatement) statement;
int nowInSec = FBUtilities.nowInSeconds();
QueryPager pager = select.getQuery(makeInternalOptionsWithNowInSec(prepared.statement, nowInSec, values), nowInSec).getPager(null, ProtocolVersion.CURRENT);
QueryPager pager = select.getQuery(makeInternalOptionsWithNowInSec(select, nowInSec, values), nowInSec).getPager(null, ProtocolVersion.CURRENT);
return UntypedResultSet.create(select, pager, pageSize);
}

Expand Down Expand Up @@ -787,7 +814,7 @@ public static ResultMessage.Prepared storePreparedStatement(String queryString,
MD5Digest statementId = computeId(queryString, keyspace);
Prepared previous = preparedStatements.get(statementId, (ignored_) -> prepared);
if (previous == prepared)
SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString);
SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString, prepared.timestamp);

ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared.statement);
ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared.statement);
Expand Down
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithNowInSec;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternalWithPaging;
import static org.apache.cassandra.service.paxos.Commit.latest;
import static org.apache.cassandra.utils.CassandraVersion.NULL_VERSION;
import static org.apache.cassandra.utils.CassandraVersion.UNREADABLE_VERSION;
Expand Down Expand Up @@ -1815,11 +1816,11 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
}
}

public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql)
public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql, long timestamp)
{
executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)",
executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?) USING TIMESTAMP ?",
PreparedStatements.toString()),
loggedKeyspace, key.byteBuffer(), cql);
loggedKeyspace, key.byteBuffer(), cql, timestamp);
logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql);
}

Expand All @@ -1836,9 +1837,14 @@ public static void resetPreparedStatements()
}

/**
 * Loads prepared statements with a page size of 5000 rows by delegating to
 * {@link #loadPreparedStatements(TriFunction, int)}.
 *
 * @param onLoaded callback handed through unchanged to the paged overload
 * @return the value returned by the paged overload
 */
public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Boolean> onLoaded)
{
    final int defaultPageSize = 5000;
    return loadPreparedStatements(onLoaded, defaultPageSize);
}

public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Boolean> onLoaded, int pageSize)
{
String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
UntypedResultSet resultSet = executeOnceInternal(query);
UntypedResultSet resultSet = executeOnceInternalWithPaging(query, pageSize);
int counter = 0;
for (UntypedResultSet.Row row : resultSet)
{
Expand Down
93 changes: 86 additions & 7 deletions test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Before;
Expand All @@ -42,6 +46,8 @@

public class PstmtPersistenceTest extends CQLTester
{
private static final CompletableFuture<?>[] futureArray = new CompletableFuture[0];

@Before
public void setUp()
{
Expand Down Expand Up @@ -104,7 +110,7 @@ public void testCachedPreparedStatements() throws Throwable
Assert.assertNotNull(prepared);
}

// add anther prepared statement and sync it to table
// add another prepared statement and sync it to table
prepareStatement(statement2, "foo", "bar", clientState);

// statement1 will have two statements prepared because of `setKeyspace` usage
Expand Down Expand Up @@ -142,21 +148,23 @@ public void testPstmtInvalidation() throws Throwable

createTable("CREATE TABLE %s (key int primary key, val int)");

long initialEvicted = numberOfEvictedStatements();

for (int cnt = 1; cnt < 10000; cnt++)
{
prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);

if (numberOfEvictedStatements() > 0)
if (numberOfEvictedStatements() - initialEvicted > 0)
{
assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());

// prepare more statements to trigger more evictions
for (int cnt2 = 1; cnt2 < 10; cnt2++)
for (int cnt2 = cnt + 1; cnt2 < cnt + 10; cnt2++)
prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);

// each new prepared statement should have caused an eviction
assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10);
assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
assertEquals("eviction count didn't increase by the expected number", 10, numberOfEvictedStatements() - initialEvicted);
assertEquals("Number of statements in memory (expected) and table (actual) don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());

return;
}
Expand All @@ -165,6 +173,78 @@ public void testPstmtInvalidation() throws Throwable
fail("Prepared statement eviction does not work");
}



/**
 * Prepares a large number of distinct statements concurrently, then checks that at least one eviction
 * happened and that the number of statements in the prepared statements table equals the number held
 * in memory. More rows on disk than in memory would indicate leaked (never deleted) entries.
 */
@Test
public void testAsyncPstmtInvalidation() throws Throwable
{
    ClientState clientState = ClientState.forInternalCalls();
    createTable("CREATE TABLE %s (key int primary key, val int)");

    // prepare statements concurrently in a thread pool to exercise bug encountered in CASSANDRA-19703 where
    // delete from table occurs before the insert due to early eviction.
    final ExecutorService executor = Executors.newFixedThreadPool(10);

    long initialEvicted = numberOfEvictedStatements();
    try
    {
        int statementsToPrepare = 10000;
        List<CompletableFuture<MD5Digest>> prepareFutures = new ArrayList<>(statementsToPrepare);
        // Inclusive upper bound so that exactly statementsToPrepare statements are submitted.
        for (int cnt = 1; cnt <= statementsToPrepare; cnt++)
        {
            final int localCnt = cnt;
            prepareFutures.add(CompletableFuture.supplyAsync(() -> prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + localCnt, clientState), executor));
        }

        // Await completion
        CompletableFuture.allOf(prepareFutures.toArray(futureArray)).get(10, TimeUnit.SECONDS);

        assertNotEquals("Should have evicted some prepared statements", 0, numberOfEvictedStatements() - initialEvicted);

        // ensure the number of statements on disk match the number in memory, if number of statements on disk eclipses in memory, there was a leak.
        assertEquals("Number of statements in memory (expected) and table (actual) don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
    }
    finally
    {
        // Discard any still-queued prepare tasks: if the wait above timed out or an assertion failed,
        // leftover tasks must not keep preparing statements while later tests are running.
        executor.shutdownNow();
    }
}

/**
 * Prepares more statements than fit in one page, clears the in-memory cache, reloads it with a small
 * page size, and checks that the cache ends up the same size as before it was cleared.
 */
@Test
public void testPreloadPreparedStatements() throws Throwable
{
    long evictedAtStart = numberOfEvictedStatements();
    ClientState internalClient = ClientState.forInternalCalls();
    createTable("CREATE TABLE %s (key int primary key, val int)");

    // The statement count exceeds the page size so that loading has to span several pages.
    int statementsToPrepare = 300;
    int pageSize = 100;

    int cnt = 1;
    while (cnt <= statementsToPrepare)
    {
        prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, internalClient);
        cnt++;
    }

    // Snapshot cache and table sizes before the cache is cleared; they are expected to agree.
    long cachedBeforeClear = numberOfStatementsInMemory();
    long persistedBeforeClear = numberOfStatementsOnDisk();
    assertEquals(persistedBeforeClear, cachedBeforeClear);

    // drop prepared statements from cache only
    QueryProcessor.clearPreparedStatements(true);
    assertEquals(0, numberOfStatementsInMemory());

    // Reload with the small page size; the cache should return to its earlier size.
    QueryProcessor.instance.preloadPreparedStatements(pageSize);
    long cachedAfterReload = numberOfStatementsInMemory();
    assertEquals("Statements in cache previously (expected) does not match statements loaded (actual)", cachedBeforeClear, cachedAfterReload);

    // The cache size should equal the number of statements prepared minus those evicted during this test.
    long evictedDuringTest = numberOfEvictedStatements() - evictedAtStart;
    assertEquals("Statements prepared - evicted (expected) does not match statements in memory (actual)", statementsToPrepare - evictedDuringTest, cachedAfterReload);
}

private long numberOfStatementsOnDisk() throws Throwable
{
UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
Expand All @@ -188,7 +268,6 @@ private MD5Digest prepareStatement(String stmt, ClientState clientState)

private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
{
System.out.println(stmt + String.format(stmt, keyspace + "." + table));
return QueryProcessor.instance.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId;
return QueryProcessor.instance.prepare(String.format(stmt, keyspace + '.' + table), clientState).statementId;
}
}

0 comments on commit 1e639da

Please sign in to comment.