diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 9716a8d3b..9c50adf4b 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -14,6 +14,7 @@ ### Updated - `EnableGeoSpatialSupport` no longer requires `EnableComplexDatatypeSupport=1`. Geospatial types (GEOMETRY, GEOGRAPHY) can now be enabled independently of complex type support (ARRAY, MAP, STRUCT). +- Server-side operations are now closed proactively when all results are consumed (`next()` returns false) or `ResultSet.close()` is called, improving resource utilization. The client-side Statement remains open and reusable for re-execution. ### Fixed diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index a83956528..510095009 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -283,11 +283,18 @@ public boolean next() throws SQLException { cachedTelemetryCollector.recordResultSetIteration( statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext); } + if (!hasNext) { + // All rows consumed — proactively close server operation to release resources. + // The client-side Statement remains open for reuse. + closeServerOperation(); + } return hasNext; } @Override public void close() throws DatabricksSQLException { + // Proactively close server operation when ResultSet is closed explicitly. + closeServerOperation(); isClosed = true; this.executionResult.close(); if (parentStatement != null) { @@ -295,6 +302,13 @@ public void close() throws DatabricksSQLException { } } + /** Proactively closes the server-side operation via the parent statement. */ + private void closeServerOperation() { + if (parentStatement instanceof DatabricksStatement) { + ((DatabricksStatement) parentStatement).closeServerOperation(); + } + } + private static TelemetryCollector resolveTelemetryCollector( IDatabricksStatementInternal parentStatement) { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index c9b6733e7..991f98254 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -59,6 +59,9 @@ public class DatabricksStatement implements IDatabricksStatement, IDatabricksSta // closed the operation — no further RPCs for this statement ID are possible. The JDBC Statement // itself remains open for re-execution. Reset on each new execution. Volatile because cancel() // can be called from a different thread (JDBC spec requirement). + private volatile boolean serverOperationClosed = false; // Client proactively closed the server + // operation after all results were consumed or ResultSet was closed. Prevents duplicate + // closeStatement RPC when Statement.close() is called later. Reset on each new execution. protected Boolean shouldReturnResultSet = null; // Cached result of shouldReturnResultSetWithConfig() @@ -149,15 +152,18 @@ public void close(boolean removeFromSession) throws DatabricksSQLException { LOGGER.warn(warningMsg); warnings = WarningUtil.addWarning(warnings, warningMsg); } else { - // Skip server-side close if the server already closed the operation (direct results). - // The operation handle is gone on the server side, so closeStatement would fail. - if (!directResultsReceived) { + // Skip server-side close if operation was already closed: + // - directResultsReceived: server closed it (inline results) + // - serverOperationClosed: client proactively closed it (results consumed or RS closed) + if (!directResultsReceived && !serverOperationClosed) { this.connection.getSession().getDatabricksClient().closeStatement(statementId); } else { LOGGER.debug( - "Statement {} closed locally (direct results — server operation already closed, " - + "skipping closeStatement RPC)", - statementId); + "Statement {} closed locally (server operation already closed — " + + "directResults={}, proactivelyClosed={}, skipping closeStatement RPC)", + statementId, + directResultsReceived, + serverOperationClosed); } if (resultSet != null) { this.resultSet.close(); @@ -246,12 +252,11 @@ public void cancel() throws SQLException { LOGGER.debug("public void cancel()"); checkIfClosed(); - if (statementId != null && !directResultsReceived) { + if (statementId != null && !directResultsReceived && !serverOperationClosed) { this.connection.getSession().getDatabricksClient().cancelStatement(statementId); DatabricksThreadContextHolder.clearStatementInfo(); - } else if (directResultsReceived) { - String warningMsg = - "Statement's server operation was already closed (direct results); cancel has no effect."; + } else if (directResultsReceived || serverOperationClosed) { + String warningMsg = "Statement's server operation was already closed; cancel has no effect."; LOGGER.debug(warningMsg); warnings = WarningUtil.addWarning(warnings, warningMsg); } else { @@ -694,17 +699,18 @@ public ResultSet getExecutionResult() throws SQLException { "No execution available for statement", DatabricksDriverErrorCode.INPUT_VALIDATION_ERROR); } - // For direct results, the server already closed the operation — making an RPC - // would return "not found". Return the cached result set instead. - if (directResultsReceived) { + // For direct results or proactively closed operations, the server operation is gone — + // making an RPC would return "not found". Return the cached result set instead. + if (directResultsReceived || serverOperationClosed) { if (resultSet != null) { LOGGER.debug( - "Returning cached result for statement {} (direct results received)", statementId); + "Returning cached result for statement {} (server operation already closed)", + statementId); return resultSet; } throw new DatabricksSQLException( - "Direct results were received but no result set is available. " - + "The server closed the operation and no further results can be fetched.", + "Server operation was already closed and no result set is available. " + + "No further results can be fetched.", DatabricksDriverErrorCode.INVALID_STATE); } @@ -953,6 +959,36 @@ public void markDirectResultsReceived() { this.directResultsReceived = true; } + /** + * Proactively closes the server-side operation to release server resources while keeping the + * client-side Statement open for reuse. Called when all results have been consumed (next() + * returns false) or when ResultSet.close() is called. + * + *

After this call, {@link #close(boolean)} will skip the closeStatement RPC since the server + * operation is already closed. The Statement can still be re-executed. + */ + void closeServerOperation() { + if (serverOperationClosed || directResultsReceived || statementId == null || isClosed) { + return; + } + try { + this.connection.getSession().getDatabricksClient().closeStatement(statementId); + // Only mark closed on success — if the RPC fails (transient network error), + // Statement.close() should retry the closeStatement RPC rather than skip it, + // to avoid leaving the server operation alive until session timeout. + this.serverOperationClosed = true; + LOGGER.debug( + "Proactively closed server operation for statement {} (results consumed)", statementId); + } catch (Exception e) { + // Best-effort — don't fail the user's iteration/close for a server cleanup failure. + // serverOperationClosed stays false so Statement.close() will retry the RPC. + LOGGER.debug( + "Failed to proactively close server operation for statement {}: {}", + statementId, + e.getMessage()); + } + } + /** * Resets statement state before a new execution (sync or async). Closes the previous server-side * operation handle (if still open) and the local ResultSet, clears flags, and nulls the @@ -969,9 +1005,10 @@ private void resetForNewExecution() { // when the server returns unexpected responses (e.g., WireMock 404 in tests). // For direct results, the server already closed the handle. - directResultsReceived = false; - // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet. + // Close BEFORE resetting flags — resultSet.close() → closeServerOperation() needs + // to see the current directResultsReceived/serverOperationClosed state to decide + // whether to send closeStatement RPC. if (resultSet != null) { try { resultSet.close(); @@ -981,6 +1018,10 @@ private void resetForNewExecution() { resultSet = null; } + // Reset flags AFTER closing old ResultSet + directResultsReceived = false; + serverOperationClosed = false; + // Null out statementId so that if the new execution fails before setStatementId(), // close() takes the statementId==null branch instead of sending closeStatement(stale-id) statementId = null; diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index ff5d9e7ec..179742ac3 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -16,6 +16,7 @@ import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException; import com.databricks.jdbc.model.core.StatementStatus; +import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; import com.databricks.sdk.service.sql.StatementState; import java.io.InputStream; import java.sql.*; @@ -1682,4 +1683,196 @@ public void testGetResultSet_WithNonRowcountQueryPrefixes_ReturnsResultSet() thr statement.close(); } + + // ========================================================================= + // Proactive server operation close + // ========================================================================= + + @Test + public void testCloseServerOperation_closesServerAndSkipsRpcOnStatementClose() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Proactively close the server operation (simulates next() returning false) + statement.closeServerOperation(); + + // Statement should still be open + assertFalse(statement.isClosed()); + + // closeStatement should have been called once (proactive close) + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Now close the statement — should NOT call closeStatement again + statement.close(); + verify(client, times(1)).closeStatement(STATEMENT_ID); // still 1, not 2 + } + + @Test + public void testCloseServerOperation_idempotent() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Call twice — should only close once + statement.closeServerOperation(); + statement.closeServerOperation(); + + verify(client, times(1)).closeStatement(STATEMENT_ID); + } + + @Test + public void testCloseServerOperation_skippedForDirectResults() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + statement.markDirectResultsReceived(); + + // Should be a no-op since directResults already closed the server operation + statement.closeServerOperation(); + + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testCloseServerOperation_skippedWhenNoStatementId() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + // statementId is null — should be a no-op + + statement.closeServerOperation(); + + verify(client, never()).closeStatement(any(StatementId.class)); + } + + @Test + public void testCloseServerOperation_resetsAfterFlagClear() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // First proactive close + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Second call is no-op (flag set) + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // Statement.close() is also no-op for server RPC (flag set) + statement.close(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + } + + @Test + public void testCloseServerOperation_errorSwallowed() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Server close fails — should not throw + doThrow(new DatabricksSQLException("Server error", DatabricksDriverErrorCode.SDK_CLIENT_ERROR)) + .when(client) + .closeStatement(STATEMENT_ID); + + assertDoesNotThrow(() -> statement.closeServerOperation()); + + // Flag should NOT be set on failure — Statement.close() should retry + verify(client, times(1)).closeStatement(STATEMENT_ID); + // The second closeServerOperation call should retry since flag wasn't set + reset(client); // clear the throw stub + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + } + + @Test + public void testCloseServerOperation_cancelSkipsAfterProactiveClose() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + + // Proactively close server operation + statement.closeServerOperation(); + verify(client, times(1)).closeStatement(STATEMENT_ID); + + // cancel() should be a no-op — server operation already closed + statement.cancel(); + verify(client, never()).cancelStatement(any(StatementId.class)); + } + + @Test + public void testCloseServerOperation_getExecutionResultReturnsCachedAfterClose() + throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + statement.setStatementId(STATEMENT_ID); + // Set resultSet field via reflection to simulate executeQuery having run + java.lang.reflect.Field rsField = DatabricksStatement.class.getDeclaredField("resultSet"); + rsField.setAccessible(true); + rsField.set(statement, resultSet); + + // Proactively close server operation + statement.closeServerOperation(); + + // getExecutionResult() should return cached result, not make RPC + ResultSet cached = statement.getExecutionResult(); + assertNotNull(cached); + assertEquals(resultSet, cached); + verify(client, never()) + .getStatementResult(any(StatementId.class), any(IDatabricksSession.class), any()); + } + + @Test + public void testStatementReusableAfterProactiveClose() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(resultSet); + + // First execution + statement.executeQuery(STATEMENT); + statement.closeServerOperation(); + assertFalse(statement.isClosed(), "Statement should stay open after proactive close"); + + // Re-execute — should work, flag reset by resetForNewExecution + statement.executeQuery(STATEMENT); + assertFalse(statement.isClosed()); + + // Both executions should have called executeStatement + verify(client, times(2)) + .executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any()); + } }