Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19703: Ensure prepared_statement INSERT timestamp precedes eviction DELETE #3917

Open
wants to merge 1 commit into
base: cassandra-4.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/cql3/QueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public static class Prepared

public final MD5Digest resultMetadataId;

/**
* Timestamp at which this prepared statement was created. It is used as the write timestamp when the statement
* is persisted from the QueryProcessor.preparedStatements cache, so that the timestamp of a later eviction
* delete is always greater than the timestamp of the insert.
*/
public final long timestamp;

/**
* Contains the CQL statement source if the statement has been "regularly" prepared via
* {@link QueryHandler#prepare(String, ClientState, Map)}.
Expand All @@ -82,6 +88,7 @@ public Prepared(CQLStatement statement, String rawCQLStatement, boolean fullyQua
this.resultMetadataId = ResultSet.ResultMetadata.fromPrepared(statement).getResultMetadataId();
this.fullyQualified = fullyQualified;
this.keyspace = keyspace;
this.timestamp = ClientState.getTimestamp();
}
}
}
37 changes: 32 additions & 5 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ private enum InternalStateInstance
}

/**
 * Preloads prepared statements using a page size of 5000 by delegating to
 * {@link #preloadPreparedStatements(int)}.
 */
public void preloadPreparedStatements()
{
    final int defaultPageSize = 5000;
    preloadPreparedStatements(defaultPageSize);
}

@VisibleForTesting
public void preloadPreparedStatements(int pageSize)
{
int count = SystemKeyspace.loadPreparedStatements((id, query, keyspace) -> {
try
Expand All @@ -178,7 +184,7 @@ public void preloadPreparedStatements()
SystemKeyspace.removePreparedStatement(id);
return false;
}
});
}, pageSize);
logger.info("Preloaded {} prepared statements", count);
}

Expand Down Expand Up @@ -549,12 +555,33 @@ public static UntypedResultSet execute(String query, ConsistencyLevel cl, QueryS
public static UntypedResultSet executeInternalWithPaging(String query, int pageSize, Object... values)
{
Prepared prepared = prepareInternal(query);
if (!(prepared.statement instanceof SelectStatement))

return executeInternalWithPaging(prepared.statement, pageSize, values);
}

/**
 * Parses and validates {@code query} against the internal client state, without preparing it, and then
 * executes it with paging.
 * <p>
 * {@link #executeInternalWithPaging(String, int, Object...)} is normally the better choice. This variant exists
 * for callers such as the loading of persisted prepared statements in {@code SystemKeyspace}, which runs while
 * {@link #preparedStatements} itself is being populated, so preparing yet another statement there would make
 * no sense.
 *
 * @param query    the CQL text to execute; only SELECT statements can be paged
 * @param pageSize the page size handed to the paged result set
 * @param values   the values to bind to the statement
 * @return the paged result set
 */
public static UntypedResultSet executeOnceInternalWithPaging(String query, int pageSize, Object... values)
{
    final QueryState internalState = internalQueryState();

    final CQLStatement parsed = parseStatement(query, internalState.getClientState());
    parsed.validate(internalState.getClientState());

    return executeInternalWithPaging(parsed, pageSize, values);
}

private static UntypedResultSet executeInternalWithPaging(CQLStatement statement, int pageSize, Object... values)
{
if (!(statement instanceof SelectStatement))
throw new IllegalArgumentException("Only SELECTs can be paged");

SelectStatement select = (SelectStatement)prepared.statement;
SelectStatement select = (SelectStatement) statement;
int nowInSec = FBUtilities.nowInSeconds();
QueryPager pager = select.getQuery(makeInternalOptionsWithNowInSec(prepared.statement, nowInSec, values), nowInSec).getPager(null, ProtocolVersion.CURRENT);
QueryPager pager = select.getQuery(makeInternalOptionsWithNowInSec(select, nowInSec, values), nowInSec).getPager(null, ProtocolVersion.CURRENT);
return UntypedResultSet.create(select, pager, pageSize);
}

Expand Down Expand Up @@ -793,7 +820,7 @@ public static ResultMessage.Prepared storePreparedStatement(String queryString,

Prepared previous = preparedStatements.get(statementId, (ignored_) -> prepared);
if (previous == prepared)
SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString);
SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString, prepared.timestamp);

ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(prepared.statement);
ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(prepared.statement);
Expand Down
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithNowInSec;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternalWithPaging;
import static org.apache.cassandra.service.paxos.Commit.latest;
import static org.apache.cassandra.utils.CassandraVersion.NULL_VERSION;
import static org.apache.cassandra.utils.CassandraVersion.UNREADABLE_VERSION;
Expand Down Expand Up @@ -1842,11 +1843,11 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
}
}

public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql)
public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql, long timestamp)
{
executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)",
executeInternal(format("INSERT INTO %s (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?) USING TIMESTAMP ?",
PreparedStatements.toString()),
loggedKeyspace, key.byteBuffer(), cql);
loggedKeyspace, key.byteBuffer(), cql, timestamp);
logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql);
}

Expand All @@ -1863,9 +1864,14 @@ public static void resetPreparedStatements()
}

/**
 * Loads the persisted prepared statements using a page size of 5000 by delegating to
 * {@code loadPreparedStatements(onLoaded, pageSize)}.
 *
 * @param onLoaded callback passed through unchanged to the paged overload
 * @return the value returned by the paged overload
 */
public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Boolean> onLoaded)
{
    final int defaultPageSize = 5000;
    return loadPreparedStatements(onLoaded, defaultPageSize);
}

public static int loadPreparedStatements(TriFunction<MD5Digest, String, String, Boolean> onLoaded, int pageSize)
{
String query = String.format("SELECT prepared_id, logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);
UntypedResultSet resultSet = executeOnceInternal(query);
UntypedResultSet resultSet = executeOnceInternalWithPaging(query, pageSize);
int counter = 0;
for (UntypedResultSet.Row row : resultSet)
{
Expand Down
97 changes: 93 additions & 4 deletions test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Before;
Expand All @@ -42,6 +46,8 @@

public class PstmtPersistenceTest extends CQLTester
{
private static final CompletableFuture<?>[] futureArray = new CompletableFuture[0];

@Before
public void setUp()
{
Expand Down Expand Up @@ -104,7 +110,7 @@ public void testCachedPreparedStatements() throws Throwable
Assert.assertNotNull(prepared);
}

// add anther prepared statement and sync it to table
// add another prepared statement and sync it to table
prepareStatement(statement2, "foo", "bar", clientState);

// statement1 will have two statements prepared because of `setKeyspace` usage
Expand Down Expand Up @@ -142,19 +148,103 @@ public void testPstmtInvalidation() throws Throwable

createTable("CREATE TABLE %s (key int primary key, val int)");

long initialEvicted = numberOfEvictedStatements();

for (int cnt = 1; cnt < 10000; cnt++)
{
prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);

if (numberOfEvictedStatements() > 0)
if (numberOfEvictedStatements() - initialEvicted > 0)
{
assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());

// prepare more statements to trigger more evictions
for (int cnt2 = cnt + 1; cnt2 < cnt + 10; cnt2++)
prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);

// each new prepared statement should have caused an eviction
assertEquals("eviction count didn't increase by the expected number", 10, numberOfEvictedStatements() - initialEvicted);
assertEquals("Number of statements in memory (expected) and table (actual) don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());

return;
}
}

fail("Prepared statement eviction does not work");
}



/**
 * Prepares a large number of distinct statements from several threads at once and checks that, after evictions
 * have happened, the number of statements persisted in the table equals the number held in the cache.
 */
@Test
public void testAsyncPstmtInvalidation() throws Throwable
{
    ClientState clientState = ClientState.forInternalCalls();
    createTable("CREATE TABLE %s (key int primary key, val int)");

    // prepare statements concurrently in a thread pool to exercise the bug encountered in CASSANDRA-19703 where
    // the delete from the table occurs before the insert due to early eviction.
    final ExecutorService executor = Executors.newFixedThreadPool(10);

    long initialEvicted = numberOfEvictedStatements();
    try
    {
        int statementsToPrepare = 10000;
        List<CompletableFuture<MD5Digest>> prepareFutures = new ArrayList<>(statementsToPrepare);
        // inclusive upper bound so that exactly statementsToPrepare statements are submitted
        for (int cnt = 1; cnt <= statementsToPrepare; cnt++)
        {
            final int localCnt = cnt;
            prepareFutures.add(CompletableFuture.supplyAsync(() -> prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + localCnt, clientState), executor));
        }

        // Await completion
        CompletableFuture.allOf(prepareFutures.toArray(futureArray)).get(10, TimeUnit.SECONDS);

        assertNotEquals("Should have evicted some prepared statements", 0, numberOfEvictedStatements() - initialEvicted);

        // The number of statements on disk must match the number in memory; if there are more statements on
        // disk than in memory, evicted statements were leaked into the table.
        assertEquals("Number of statements in memory (expected) and table (actual) don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
    }
    finally
    {
        // shutdownNow() instead of shutdown(): if the wait above timed out, still-queued prepare tasks must be
        // discarded rather than left running after the test has already failed. When every future completed
        // there is nothing left to cancel, so the success path is unaffected.
        executor.shutdownNow();
    }
}

/**
 * Prepares more statements than fit in one page, clears only the in-memory cache, reloads it through
 * {@code preloadPreparedStatements(int)} and checks that the cache returns to its previous size.
 */
@Test
public void testPreloadPreparedStatements() throws Throwable
{
    long evictedBeforeTest = numberOfEvictedStatements();
    ClientState internalState = ClientState.forInternalCalls();
    createTable("CREATE TABLE %s (key int primary key, val int)");

    // the page size is deliberately smaller than the statement count so that loading has to page
    final int statementsToPrepare = 300;
    final int pageSize = 100;

    int prepared = 0;
    while (prepared < statementsToPrepare)
    {
        prepared++;
        prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + prepared, internalState);
    }

    // remember the cache size before it is cleared; cache and table must agree at this point
    long cachedBeforeClear = numberOfStatementsInMemory();
    long persistedBeforeClear = numberOfStatementsOnDisk();
    assertEquals(persistedBeforeClear, cachedBeforeClear);

    // empty the cache while leaving the table untouched
    QueryProcessor.clearPreparedStatements(true);
    assertEquals(0, numberOfStatementsInMemory());

    // reloading from the table must bring the cache back to the size it had before clearing
    QueryProcessor.instance.preloadPreparedStatements(pageSize);
    long cachedAfterReload = numberOfStatementsInMemory();
    assertEquals("Statements in cache previously (expected) does not match statements loaded (actual)", cachedBeforeClear, cachedAfterReload);

    // the cache size must equal the number of statements prepared minus those evicted during this test
    long evictedDuringTest = numberOfEvictedStatements() - evictedBeforeTest;
    assertEquals("Statements prepared - evicted (expected) does not match statements in memory (actual)", statementsToPrepare - evictedDuringTest, cachedAfterReload);
}

private long numberOfStatementsOnDisk() throws Throwable
{
UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
Expand All @@ -178,7 +268,6 @@ private MD5Digest prepareStatement(String stmt, ClientState clientState)

private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
{
System.out.println(stmt + String.format(stmt, keyspace + "." + table));
return QueryProcessor.instance.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId;
return QueryProcessor.instance.prepare(String.format(stmt, keyspace + '.' + table), clientState).statementId;
}
}