Skip to content
Open
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

### Updated
- `EnableGeoSpatialSupport` no longer requires `EnableComplexDatatypeSupport=1`. Geospatial types (GEOMETRY, GEOGRAPHY) can now be enabled independently of complex type support (ARRAY, MAP, STRUCT).
- Server-side operations are now closed proactively when all results are consumed (`next()` returns false) or `ResultSet.close()` is called, improving resource utilization. The client-side Statement remains open and reusable for re-execution.

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,18 +283,32 @@ public boolean next() throws SQLException {
cachedTelemetryCollector.recordResultSetIteration(
statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext);
}
if (!hasNext) {
// All rows consumed — proactively close server operation to release resources.
// The client-side Statement remains open for reuse.
closeServerOperation();
}
return hasNext;
}

@Override
public void close() throws DatabricksSQLException {
  // Per the JDBC spec, calling close() on an already-closed ResultSet must be a no-op.
  // Without this guard a second close() would re-run executionResult.close() and notify
  // the parent statement twice via handleResultSetClose().
  if (isClosed) {
    return;
  }
  // Proactively close server operation when ResultSet is closed explicitly.
  closeServerOperation();
  isClosed = true;
  this.executionResult.close();
  if (parentStatement != null) {
    parentStatement.handleResultSetClose(this);
  }
}

/**
 * Best-effort hook that delegates the proactive server-side close to the owning statement.
 * Only {@code DatabricksStatement} knows how to release the server operation; any other
 * parent implementation (or a null parent) is left untouched.
 */
private void closeServerOperation() {
  if (!(parentStatement instanceof DatabricksStatement)) {
    return;
  }
  ((DatabricksStatement) parentStatement).closeServerOperation();
}

private static TelemetryCollector resolveTelemetryCollector(
IDatabricksStatementInternal parentStatement) {
try {
Expand Down
77 changes: 59 additions & 18 deletions src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class DatabricksStatement implements IDatabricksStatement, IDatabricksSta
// closed the operation — no further RPCs for this statement ID are possible. The JDBC Statement
// itself remains open for re-execution. Reset on each new execution. Volatile because cancel()
// can be called from a different thread (JDBC spec requirement).
private volatile boolean serverOperationClosed = false; // Client proactively closed the server
// operation after all results were consumed or ResultSet was closed. Prevents duplicate
// closeStatement RPC when Statement.close() is called later. Reset on each new execution.
protected Boolean shouldReturnResultSet =
null; // Cached result of shouldReturnResultSetWithConfig()

Expand Down Expand Up @@ -149,15 +152,18 @@ public void close(boolean removeFromSession) throws DatabricksSQLException {
LOGGER.warn(warningMsg);
warnings = WarningUtil.addWarning(warnings, warningMsg);
} else {
// Skip server-side close if the server already closed the operation (direct results).
// The operation handle is gone on the server side, so closeStatement would fail.
if (!directResultsReceived) {
// Skip server-side close if operation was already closed:
// - directResultsReceived: server closed it (inline results)
// - serverOperationClosed: client proactively closed it (results consumed or RS closed)
if (!directResultsReceived && !serverOperationClosed) {
this.connection.getSession().getDatabricksClient().closeStatement(statementId);
} else {
LOGGER.debug(
"Statement {} closed locally (direct results — server operation already closed, "
+ "skipping closeStatement RPC)",
statementId);
"Statement {} closed locally (server operation already closed — "
+ "directResults={}, proactivelyClosed={}, skipping closeStatement RPC)",
statementId,
directResultsReceived,
serverOperationClosed);
}
if (resultSet != null) {
this.resultSet.close();
Expand Down Expand Up @@ -246,12 +252,11 @@ public void cancel() throws SQLException {
LOGGER.debug("public void cancel()");
checkIfClosed();

if (statementId != null && !directResultsReceived) {
if (statementId != null && !directResultsReceived && !serverOperationClosed) {
this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
DatabricksThreadContextHolder.clearStatementInfo();
} else if (directResultsReceived) {
String warningMsg =
"Statement's server operation was already closed (direct results); cancel has no effect.";
} else if (directResultsReceived || serverOperationClosed) {
String warningMsg = "Statement's server operation was already closed; cancel has no effect.";
LOGGER.debug(warningMsg);
warnings = WarningUtil.addWarning(warnings, warningMsg);
} else {
Expand Down Expand Up @@ -694,17 +699,18 @@ public ResultSet getExecutionResult() throws SQLException {
"No execution available for statement", DatabricksDriverErrorCode.INPUT_VALIDATION_ERROR);
}

// For direct results, the server already closed the operation — making an RPC
// would return "not found". Return the cached result set instead.
if (directResultsReceived) {
// For direct results or proactively closed operations, the server operation is gone —
// making an RPC would return "not found". Return the cached result set instead.
if (directResultsReceived || serverOperationClosed) {
if (resultSet != null) {
LOGGER.debug(
"Returning cached result for statement {} (direct results received)", statementId);
"Returning cached result for statement {} (server operation already closed)",
statementId);
return resultSet;
}
throw new DatabricksSQLException(
"Direct results were received but no result set is available. "
+ "The server closed the operation and no further results can be fetched.",
"Server operation was already closed and no result set is available. "
+ "No further results can be fetched.",
DatabricksDriverErrorCode.INVALID_STATE);
}

Expand Down Expand Up @@ -953,6 +959,36 @@ public void markDirectResultsReceived() {
this.directResultsReceived = true;
}

/**
 * Eagerly releases the server-side operation while keeping this client-side Statement usable
 * for further executions. Invoked once all rows have been consumed ({@code next()} returned
 * false) or the ResultSet was closed explicitly.
 *
 * <p>On success the {@code serverOperationClosed} flag makes {@link #close(boolean)} skip the
 * redundant closeStatement RPC; on failure the flag stays unset so close() retries the RPC
 * instead of leaving the server operation alive until the session times out.
 */
void closeServerOperation() {
  // No live server handle to release: statement closed locally or never executed.
  if (isClosed || statementId == null) {
    return;
  }
  // Operation already gone: closed proactively by us, or by the server (direct results).
  if (serverOperationClosed || directResultsReceived) {
    return;
  }
  try {
    this.connection.getSession().getDatabricksClient().closeStatement(statementId);
    // Flip the flag only after a successful RPC so a transient failure keeps the retry
    // path in Statement.close() open.
    this.serverOperationClosed = true;
    LOGGER.debug(
        "Proactively closed server operation for statement {} (results consumed)", statementId);
  } catch (Exception e) {
    // Best-effort cleanup: a server-side close failure must never surface to the caller's
    // iteration or close() call.
    LOGGER.debug(
        "Failed to proactively close server operation for statement {}: {}",
        statementId,
        e.getMessage());
  }
}

/**
* Resets statement state before a new execution (sync or async). Closes the previous server-side
* operation handle (if still open) and the local ResultSet, clears flags, and nulls the
Expand All @@ -969,9 +1005,10 @@ private void resetForNewExecution() {
// when the server returns unexpected responses (e.g., WireMock 404 in tests).
// For direct results, the server already closed the handle.

directResultsReceived = false;

// Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet.
// Close BEFORE resetting flags — resultSet.close() → closeServerOperation() needs
// to see the current directResultsReceived/serverOperationClosed state to decide
// whether to send closeStatement RPC.
if (resultSet != null) {
try {
resultSet.close();
Expand All @@ -981,6 +1018,10 @@ private void resetForNewExecution() {
resultSet = null;
}

// Reset flags AFTER closing old ResultSet
directResultsReceived = false;
serverOperationClosed = false;

// Null out statementId so that if the new execution fails before setStatementId(),
// close() takes the statementId==null branch instead of sending closeStatement(stale-id)
statementId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException;
import com.databricks.jdbc.model.core.StatementStatus;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import com.databricks.sdk.service.sql.StatementState;
import java.io.InputStream;
import java.sql.*;
Expand Down Expand Up @@ -1682,4 +1683,196 @@ public void testGetResultSet_WithNonRowcountQueryPrefixes_ReturnsResultSet() thr

statement.close();
}

// =========================================================================
// Proactive server operation close
// =========================================================================

// Happy path: a proactive close sends exactly one closeStatement RPC, leaves the client-side
// Statement open, and a later Statement.close() does NOT repeat the RPC.
@Test
public void testCloseServerOperation_closesServerAndSkipsRpcOnStatementClose() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);

// Proactively close the server operation (simulates next() returning false)
statement.closeServerOperation();

// Statement should still be open
assertFalse(statement.isClosed());

// closeStatement should have been called once (proactive close)
verify(client, times(1)).closeStatement(STATEMENT_ID);

// Now close the statement — should NOT call closeStatement again
statement.close();
verify(client, times(1)).closeStatement(STATEMENT_ID); // still 1, not 2
}

// Idempotency: a second closeServerOperation() call is a no-op because the
// serverOperationClosed flag is set by the first successful call.
@Test
public void testCloseServerOperation_idempotent() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);

// Call twice — should only close once
statement.closeServerOperation();
statement.closeServerOperation();

verify(client, times(1)).closeStatement(STATEMENT_ID);
}

// Direct results: the server already closed the operation when it returned inline results,
// so the proactive close must not issue any closeStatement RPC.
@Test
public void testCloseServerOperation_skippedForDirectResults() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);
statement.markDirectResultsReceived();

// Should be a no-op since directResults already closed the server operation
statement.closeServerOperation();

verify(client, never()).closeStatement(any(StatementId.class));
}

// No execution yet: without a statementId there is no server handle, so the proactive
// close must not issue any RPC.
@Test
public void testCloseServerOperation_skippedWhenNoStatementId() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
// statementId is null — should be a no-op

statement.closeServerOperation();

verify(client, never()).closeStatement(any(StatementId.class));
}

// NOTE(review): the name suggests this covers the flag being RESET (resetForNewExecution),
// but the body never re-executes or clears the flag — it only re-verifies idempotency plus
// Statement.close(), largely overlapping testCloseServerOperation_idempotent. Consider
// re-executing the statement here to actually exercise the reset path, or renaming.
@Test
public void testCloseServerOperation_resetsAfterFlagClear() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);

// First proactive close
statement.closeServerOperation();
verify(client, times(1)).closeStatement(STATEMENT_ID);

// Second call is no-op (flag set)
statement.closeServerOperation();
verify(client, times(1)).closeStatement(STATEMENT_ID);

// Statement.close() is also no-op for server RPC (flag set)
statement.close();
verify(client, times(1)).closeStatement(STATEMENT_ID);
}

// Failure path: a closeStatement RPC error is swallowed (best-effort), the flag stays
// unset, and a subsequent call retries the RPC. Mockito reset() clears both the throwing
// stub and the recorded invocation count, so the post-reset verify counts only the retry.
@Test
public void testCloseServerOperation_errorSwallowed() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);

// Server close fails — should not throw
doThrow(new DatabricksSQLException("Server error", DatabricksDriverErrorCode.SDK_CLIENT_ERROR))
.when(client)
.closeStatement(STATEMENT_ID);

assertDoesNotThrow(() -> statement.closeServerOperation());

// Flag should NOT be set on failure — Statement.close() should retry
verify(client, times(1)).closeStatement(STATEMENT_ID);
// The second closeServerOperation call should retry since flag wasn't set
reset(client); // clear the throw stub
statement.closeServerOperation();
verify(client, times(1)).closeStatement(STATEMENT_ID);
}

// cancel() interaction: once the server operation is proactively closed, cancel() must not
// send a cancelStatement RPC (the handle no longer exists server-side).
@Test
public void testCloseServerOperation_cancelSkipsAfterProactiveClose() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);

// Proactively close server operation
statement.closeServerOperation();
verify(client, times(1)).closeStatement(STATEMENT_ID);

// cancel() should be a no-op — server operation already closed
statement.cancel();
verify(client, never()).cancelStatement(any(StatementId.class));
}

// getExecutionResult() interaction: after a proactive close, the cached ResultSet is
// returned without a getStatementResult RPC (which would fail with "not found").
// The private resultSet field is seeded via reflection to simulate a prior executeQuery.
@Test
public void testCloseServerOperation_getExecutionResultReturnsCachedAfterClose()
throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);
statement.setStatementId(STATEMENT_ID);
// Set resultSet field via reflection to simulate executeQuery having run
java.lang.reflect.Field rsField = DatabricksStatement.class.getDeclaredField("resultSet");
rsField.setAccessible(true);
rsField.set(statement, resultSet);

// Proactively close server operation
statement.closeServerOperation();

// getExecutionResult() should return cached result, not make RPC
ResultSet cached = statement.getExecutionResult();
assertNotNull(cached);
assertEquals(resultSet, cached);
verify(client, never())
.getStatementResult(any(StatementId.class), any(IDatabricksSession.class), any());
}

// Reuse: a proactive close keeps the Statement open, and a second executeQuery succeeds
// because resetForNewExecution() clears the serverOperationClosed flag. Both executions
// must reach the client's executeStatement.
@Test
public void testStatementReusableAfterProactiveClose() throws Exception {
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
DatabricksStatement statement = new DatabricksStatement(connection);

when(client.executeStatement(
eq(STATEMENT),
eq(new Warehouse(WAREHOUSE_ID)),
eq(new HashMap<>()),
eq(StatementType.QUERY),
any(IDatabricksSession.class),
eq(statement),
any()))
.thenReturn(resultSet);

// First execution
statement.executeQuery(STATEMENT);
statement.closeServerOperation();
assertFalse(statement.isClosed(), "Statement should stay open after proactive close");

// Re-execute — should work, flag reset by resetForNewExecution
statement.executeQuery(STATEMENT);
assertFalse(statement.isClosed());

// Both executions should have called executeStatement
verify(client, times(2))
.executeStatement(
eq(STATEMENT),
eq(new Warehouse(WAREHOUSE_ID)),
eq(new HashMap<>()),
eq(StatementType.QUERY),
any(IDatabricksSession.class),
eq(statement),
any());
}
}