Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23441 Sql. Cancellation of script execution #4706

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
void executeScript(String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
* @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @throws SqlException If failed.
*/
void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
Expand All @@ -389,4 +399,15 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
* @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @return Operation future.
* @throws SqlException If failed.
*/
CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments);

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public static CompletableFuture<Void> process(
HybridTimestamp clientTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
transactions.updateObservableTimestamp(clientTs);

// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Pass cancellation token to the query processor.
return IgniteSqlImpl.executeScriptCore(
sql, transactions.observableTimestampTracker(), () -> true, () -> {}, script, arguments, props.toSqlProps()
sql, transactions.observableTimestampTracker(), () -> true, () -> {}, script, null, arguments, props.toSqlProps()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public void executeScript(String query, @Nullable Object... arguments) {
}
}

/** {@inheritDoc} */
@Override
public void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token.
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
Expand Down Expand Up @@ -335,6 +342,14 @@ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object
return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
}

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token.
throw new UnsupportedOperationException();
}

private static void packProperties(
PayloadOutputChannel w,
@Nullable Map<String, Object> statementProps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,22 @@ public void executeScript(String query, @Nullable Object... arguments) {
attachmentLock.consumeAttached(ignite -> ignite.sql().executeScript(query, arguments));
}

@Override
public void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) {
attachmentLock.consumeAttached(ignite -> ignite.sql().executeScript(cancellationToken, query, arguments));
}

@Override
public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments) {
return attachmentLock.attachedAsync(ignite -> ignite.sql().executeScriptAsync(query, arguments));
}

@Override
public CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments) {
return attachmentLock.attachedAsync(ignite -> ignite.sql().executeScriptAsync(cancellationToken, query, arguments));
}

@Override
public <T> T unwrap(Class<T> classToUnwrap) {
return attachmentLock.attached(ignite -> Wrappers.unwrap(ignite.sql(), classToUnwrap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -36,15 +39,20 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.ColumnMetadataImpl;
import org.apache.ignite.internal.sql.ColumnMetadataImpl.ColumnOriginImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.CursorClosedException;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.ErrorGroups.Sql;
Expand All @@ -65,10 +73,12 @@
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AssertionFailureBuilder;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -929,6 +939,46 @@ public void testQueryTimeout() {
});
}

@Test
public void cancelScript() {
IgniteSql sql = igniteSql();

// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token and remove assumption.
Assumptions.assumeFalse(sql instanceof ClientSql, "The thin client does not yet support canceling requests.");

sql("CREATE TABLE test (id INT PRIMARY KEY);");

String query = "INSERT INTO test SELECT x FROM system_range(0, 10000000000); SELECT 1;";

CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();

CompletableFuture<Void> scriptFut = IgniteTestUtils.runAsync(() -> executeScript(sql, token, query));

// Wait until FIRST script statement is started to execute.
Awaitility.await().untilAsserted(() -> assertThat(queryProcessor().runningQueries(), greaterThan(1)));

assertThat(scriptFut.isDone(), is(false));

cancelHandle.cancel();

assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
"The query was cancelled while executing.",
() -> IgniteTestUtils.await(scriptFut)
);

assertThat(queryProcessor().runningQueries(), is(0));

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
"The query was cancelled while executing.",
() -> executeScript(sql, token, "SELECT 1; SELECT 2;")
);
assertThat(queryProcessor().runningQueries(), is(0));
}

@Test
public void testQueryTimeoutIsPropagatedFromTheServer() throws Exception {
Statement stmt = igniteSql().statementBuilder()
Expand Down Expand Up @@ -1041,6 +1091,8 @@ protected ResultProcessor execute(IgniteSql sql, String query, Object... args) {

protected abstract void executeScript(IgniteSql sql, String query, Object... args);

protected abstract void executeScript(IgniteSql sql, CancellationToken token, String query, Object... args);

protected abstract void rollback(Transaction outerTx);

protected abstract void commit(Transaction outerTx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.api;

import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -120,6 +121,17 @@ public void cancelQueryString() throws InterruptedException {

return sql.executeAsync(transaction, token, query);
});

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();
cancelHandle.cancel();

assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
"The query was cancelled while executing.",
() -> await(sql.executeAsync(null, token, "SELECT 1"))
);
}

@Test
Expand Down Expand Up @@ -221,6 +233,11 @@ protected void executeScript(IgniteSql sql, String query, Object... args) {
await(sql.executeScriptAsync(query, args));
}

@Override
protected void executeScript(IgniteSql sql, CancellationToken cancellationToken, String query, Object... args) {
await(sql.executeScriptAsync(cancellationToken, query, args));
}

@Override
protected void rollback(Transaction tx) {
await(tx.rollbackAsync());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.api;

import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -76,6 +77,18 @@ public void cancelQueryString() throws InterruptedException {
Transaction transaction = igniteTx().begin();
return sql.execute(transaction, token, statement);
});

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();
cancelHandle.cancel();

//noinspection resource
assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
"The query was cancelled while executing.",
() -> sql.execute(null, token, "SELECT 1")
);
}

@Test
Expand Down Expand Up @@ -160,6 +173,11 @@ protected void executeScript(IgniteSql sql, String query, Object... args) {
sql.executeScript(query, args);
}

@Override
protected void executeScript(IgniteSql sql, CancellationToken cancellationToken, String query, Object... args) {
sql.executeScript(cancellationToken, query, args);
}

@Override
protected void rollback(Transaction tx) {
tx.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.internal.sql.engine.util.CursorUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;

Expand Down Expand Up @@ -68,21 +69,27 @@ void executeScript(String query, Object ... params) {

/** Fully executes multi-statements query without reading cursor data. */
void executeScript(String query, @Nullable InternalTransaction tx, Object ... params) {
iterateThroughResultsAndCloseThem(runScript(query, tx, params));
iterateThroughResultsAndCloseThem(runScript(tx, null, query, params));
}

/** Initiates multi-statements query execution. */
AsyncSqlCursor<InternalSqlRow> runScript(String query) {
return runScript(query, null);
AsyncSqlCursor<InternalSqlRow> runScript(String query, Object... params) {
return runScript(null, null, query, params);
}

AsyncSqlCursor<InternalSqlRow> runScript(String query, @Nullable InternalTransaction tx, Object ... params) {
/** Initiates multi-statements query execution. */
AsyncSqlCursor<InternalSqlRow> runScript(CancellationToken cancellationToken, String query, Object... params) {
return runScript(null, cancellationToken, query, params);
}

AsyncSqlCursor<InternalSqlRow> runScript(@Nullable InternalTransaction tx, @Nullable CancellationToken cancellationToken, String query,
Object... params) {
SqlProperties properties = SqlPropertiesHelper.newBuilder()
.set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.ALL)
.build();

AsyncSqlCursor<InternalSqlRow> cursor = await(
queryProcessor().queryAsync(properties, observableTimeTracker(), tx, null, query, params)
queryProcessor().queryAsync(properties, observableTimeTracker(), tx, cancellationToken, query, params)
);

return Objects.requireNonNull(cursor);
Expand Down
Loading