-
Notifications
You must be signed in to change notification settings - Fork 39
[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions #1415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
00b8d4b
6130304
d3f0c47
cedf144
5d9aadc
fa92347
291be6c
fe84cc4
53db645
994dbc2
a51bccc
723ce06
7534523
d24d62a
7631ee3
469a459
a037ae1
b881d4c
1910332
a859117
458aa5e
a40466a
832a1c8
fc46126
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |||||||||||||
| import com.databricks.jdbc.common.Nullable; | ||||||||||||||
| import com.databricks.jdbc.common.StatementType; | ||||||||||||||
| import com.databricks.jdbc.common.util.WarningUtil; | ||||||||||||||
| import com.databricks.jdbc.dbclient.IDatabricksClient; | ||||||||||||||
| import com.databricks.jdbc.dbclient.impl.common.StatementId; | ||||||||||||||
| import com.databricks.jdbc.exception.DatabricksParsingException; | ||||||||||||||
| import com.databricks.jdbc.exception.DatabricksSQLException; | ||||||||||||||
|
|
@@ -123,6 +124,7 @@ public DatabricksResultSet( | |||||||||||||
| this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); | ||||||||||||||
| this.isClosed = false; | ||||||||||||||
| this.wasNull = false; | ||||||||||||||
| startHeartbeatIfEnabled(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @VisibleForTesting | ||||||||||||||
|
|
@@ -191,6 +193,7 @@ public DatabricksResultSet( | |||||||||||||
| this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); | ||||||||||||||
| this.isClosed = false; | ||||||||||||||
| this.wasNull = false; | ||||||||||||||
| startHeartbeatIfEnabled(); // C4 fix: Thrift result sets also need heartbeat | ||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [High] Heartbeat starts on already-closed operations — 10 ticks of failed RPCs per tiny query The eligibility check at construction time evaluates Thrift Empirically verified against pecotestingworkspace (interval=5s for fast turnaround):
The self-stop after exactly 11 ticks at the 5s interval is the precise signature of the 10-strike-failure path firing. mitmproxy captured the wire-level evidence: At default Important refinement of the prior reviewer's finding: the bug is not Thrift-direct-results-specific. It affects any inline/small-result query on either protocol. The Fix options:
|
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /* Constructing results for getUDTs, getTypeInfo, getProcedures metadata calls */ | ||||||||||||||
|
|
@@ -278,23 +281,198 @@ public DatabricksResultSet( | |||||||||||||
| @Override | ||||||||||||||
| public boolean next() throws SQLException { | ||||||||||||||
| checkIfClosed(); | ||||||||||||||
| if (executionResult == null) { | ||||||||||||||
| throw new DatabricksSQLException( | ||||||||||||||
| "Cannot iterate: no result data available. " | ||||||||||||||
| + "For async execution, call getExecutionResult() first.", | ||||||||||||||
| DatabricksDriverErrorCode.INVALID_STATE); | ||||||||||||||
| } | ||||||||||||||
| boolean hasNext = this.executionResult.next(); | ||||||||||||||
| if (cachedTelemetryCollector != null) { | ||||||||||||||
| cachedTelemetryCollector.recordResultSetIteration( | ||||||||||||||
| statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext); | ||||||||||||||
| } | ||||||||||||||
| if (!hasNext) { | ||||||||||||||
| stopHeartbeat(); | ||||||||||||||
| } | ||||||||||||||
| return hasNext; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public void close() throws DatabricksSQLException { | ||||||||||||||
| stopHeartbeat(); | ||||||||||||||
| isClosed = true; | ||||||||||||||
| this.executionResult.close(); | ||||||||||||||
| if (executionResult != null) { | ||||||||||||||
| executionResult.close(); | ||||||||||||||
| } | ||||||||||||||
| if (parentStatement != null) { | ||||||||||||||
| parentStatement.handleResultSetClose(this); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** Starts heartbeat polling if enabled on the connection and this result set is eligible. */ | ||||||||||||||
| private void startHeartbeatIfEnabled() { | ||||||||||||||
| if (parentStatement == null || statementId == null) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| if (!isHeartbeatEligible()) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| try { | ||||||||||||||
| // C3 fix: Use JDBC unwrap() to handle pooled connection wrappers (HikariCP, DBCP) | ||||||||||||||
| java.sql.Connection rawConn = parentStatement.getStatement().getConnection(); | ||||||||||||||
| DatabricksConnection conn; | ||||||||||||||
| if (rawConn instanceof DatabricksConnection) { | ||||||||||||||
| conn = (DatabricksConnection) rawConn; | ||||||||||||||
| } else if (rawConn.isWrapperFor(DatabricksConnection.class)) { | ||||||||||||||
| conn = rawConn.unwrap(DatabricksConnection.class); | ||||||||||||||
| } else { | ||||||||||||||
| LOGGER.debug("Cannot start heartbeat: connection is not a DatabricksConnection"); | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| ResultHeartbeatManager mgr = conn.getHeartbeatManager(); | ||||||||||||||
| if (mgr == null) { | ||||||||||||||
| return; // heartbeat not enabled | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // C2 fix: Capture only what the lambda needs — avoid capturing 'this' to prevent | ||||||||||||||
| // abandoned ResultSets from keeping the warehouse alive via heartbeat. | ||||||||||||||
| // Note: capturing 'client' retains a reference to the session/connection. If the | ||||||||||||||
| // connection is GC'd without close(), heartbeat RPCs will fail and self-stop after | ||||||||||||||
| // maxConsecutiveFailures (10 ticks, ~10 min at 60s interval). Acceptable tradeoff. | ||||||||||||||
| final IDatabricksClient client = conn.getSession().getDatabricksClient(); | ||||||||||||||
| final StatementId capturedStatementId = this.statementId; | ||||||||||||||
| final int maxConsecutiveFailures = 10; | ||||||||||||||
| final java.util.concurrent.atomic.AtomicInteger consecutiveFailures = | ||||||||||||||
| new java.util.concurrent.atomic.AtomicInteger(0); | ||||||||||||||
| // C1 fix: Read the stopped flag from the manager on each tick instead of pre-capturing. | ||||||||||||||
| // Pre-capturing caused an orphan-flag bug: startHeartbeat() internally calls | ||||||||||||||
| // stopHeartbeat() which removes and replaces the flag, leaving the lambda with a | ||||||||||||||
| // permanently-true reference. Reading from the manager each tick always gets the | ||||||||||||||
| // current flag. | ||||||||||||||
| final ResultHeartbeatManager capturedMgr = mgr; | ||||||||||||||
|
|
||||||||||||||
| Runnable heartbeatTask = | ||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [CRITICAL] Lambda strong-captures This lambda invokes The future is held in
The C# ADBC reference avoids this: its poller is per-statement with linked cancellation, so even GC of the statement helps. The Java implementation here is connection-scoped, so GC of the ResultSet alone won't help — the future keeps a hard reference back to the ResultSet. Fix: Don't capture |
||||||||||||||
| () -> { | ||||||||||||||
| // C1 fix: read current flag each tick | ||||||||||||||
| java.util.concurrent.atomic.AtomicBoolean stopped = | ||||||||||||||
| capturedMgr.getStoppedFlag(capturedStatementId); | ||||||||||||||
| if (stopped.get()) { | ||||||||||||||
| return; // client/session may be closed, skip RPC | ||||||||||||||
| } | ||||||||||||||
| try { | ||||||||||||||
| boolean alive = client.checkStatementAlive(capturedStatementId); | ||||||||||||||
| consecutiveFailures.set(0); // reset on success | ||||||||||||||
| if (!alive) { | ||||||||||||||
| LOGGER.info( | ||||||||||||||
| "Heartbeat detected terminal state for statement {}, stopping", | ||||||||||||||
| capturedStatementId); | ||||||||||||||
| capturedMgr.stopHeartbeat(capturedStatementId); | ||||||||||||||
| } | ||||||||||||||
| } catch (Exception e) { | ||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Medium] The heartbeat lambda at line 374 catches
Empirically demonstrated: a JUnit test wires a task that throws
The consequence is worse than swallowing exceptions: there's no

Fix:

```java
} catch (Throwable t) {
  if (capturedMgr.getStoppedFlag(capturedStatementId).get()) return;
  // ... same failure-counter logic ...
  if (t instanceof Error && !(t instanceof VirtualMachineError)) {
    // log + stop cleanly; VirtualMachineError should still propagate
    capturedMgr.stopHeartbeat(capturedStatementId);
  }
  if (t instanceof VirtualMachineError) throw (VirtualMachineError) t;
}
```

(Or at minimum, change the catch to |
||||||||||||||
| // Re-read flag — may have been set during the RPC (connection closing) | ||||||||||||||
| if (capturedMgr.getStoppedFlag(capturedStatementId).get()) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| int failures = consecutiveFailures.incrementAndGet(); | ||||||||||||||
| if (failures == 1) { | ||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Medium] Default The default Empirically verified with a JUnit test that builds a no-override
User-visible consequence: if anyone wires a custom The WARN says "results may expire" — but the actual cause is a missing client-side override. Fix options:
|
||||||||||||||
| LOGGER.info( | ||||||||||||||
| "Heartbeat failed for statement {} (first failure): {}", | ||||||||||||||
| capturedStatementId, | ||||||||||||||
| e.getMessage()); | ||||||||||||||
| } else { | ||||||||||||||
| LOGGER.debug( | ||||||||||||||
| "Heartbeat failed for statement {} (failure {}/{}): {}", | ||||||||||||||
| capturedStatementId, | ||||||||||||||
| failures, | ||||||||||||||
| maxConsecutiveFailures, | ||||||||||||||
| e.getMessage()); | ||||||||||||||
| } | ||||||||||||||
| if (failures >= maxConsecutiveFailures) { | ||||||||||||||
| LOGGER.warn( | ||||||||||||||
| "Heartbeat stopped for statement {} after {} consecutive failures. " | ||||||||||||||
| + "Server-side results may expire. Last error: {}", | ||||||||||||||
| capturedStatementId, | ||||||||||||||
| failures, | ||||||||||||||
| e.getMessage()); | ||||||||||||||
| capturedMgr.stopHeartbeat(capturedStatementId); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| mgr.startHeartbeat(capturedStatementId, heartbeatTask); | ||||||||||||||
| LOGGER.debug( | ||||||||||||||
| "Heartbeat started for statement {} (resultType={}, interval={}s)", | ||||||||||||||
| capturedStatementId, | ||||||||||||||
| resultSetType, | ||||||||||||||
| mgr.getIntervalSeconds()); | ||||||||||||||
| } catch (Exception e) { | ||||||||||||||
| LOGGER.debug("Failed to start heartbeat: {}", e.getMessage()); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** Stops the heartbeat for this result set's statement. Idempotent. */ | ||||||||||||||
| private void stopHeartbeat() { | ||||||||||||||
| if (parentStatement == null || statementId == null) { | ||||||||||||||
| return; | ||||||||||||||
| } | ||||||||||||||
| try { | ||||||||||||||
| DatabricksConnection conn = | ||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

[High] Asymmetric with the C3 fix on the start path.

```java
DatabricksConnection conn =
    (DatabricksConnection) parentStatement.getStatement().getConnection();
```

On any pooled connection this throws `ClassCastException` …

Empirical verification:
Fix: Extract a |
||||||||||||||
| (DatabricksConnection) parentStatement.getStatement().getConnection(); | ||||||||||||||
| ResultHeartbeatManager mgr = conn.getHeartbeatManager(); | ||||||||||||||
| if (mgr != null) { | ||||||||||||||
| mgr.stopHeartbeat(statementId); | ||||||||||||||
| } | ||||||||||||||
| } catch (Exception e) { | ||||||||||||||
| LOGGER.debug("Failed to stop heartbeat: {}", e.getMessage()); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** | ||||||||||||||
| * Determines whether this result set is eligible for heartbeat polling. Package-visible for | ||||||||||||||
| * testing. | ||||||||||||||
| * | ||||||||||||||
| * <p>Heartbeat is NOT needed when: | ||||||||||||||
| * | ||||||||||||||
| * <ul> | ||||||||||||||
| * <li>No execution result (nothing to fetch, also covers async PENDING/RUNNING with no data) | ||||||||||||||
| * <li>SEA inline (InlineJsonResult): all rows loaded in memory at construction | ||||||||||||||
| * <li>Update count (DML): no result rows to keep alive | ||||||||||||||
| * <li>Direct results (CLOSED state): server already closed, data fully delivered | ||||||||||||||
| * <li>Async execution (PENDING/RUNNING): user controls polling via getExecutionResult() | ||||||||||||||
| * </ul> | ||||||||||||||
| */ | ||||||||||||||
| boolean isHeartbeatEligible() { | ||||||||||||||
| // No execution result — nothing to fetch | ||||||||||||||
| if (executionResult == null) { | ||||||||||||||
| return false; | ||||||||||||||
| } | ||||||||||||||
| // SEA inline — all data loaded in memory at construction | ||||||||||||||
| if (resultSetType == ResultSetType.SEA_INLINE) { | ||||||||||||||
| return false; | ||||||||||||||
| } | ||||||||||||||
| // Update count — no result rows | ||||||||||||||
| if (statementType == StatementType.UPDATE) { | ||||||||||||||
| return false; | ||||||||||||||
| } | ||||||||||||||
| // Check execution state | ||||||||||||||
| if (executionStatus != null) { | ||||||||||||||
| com.databricks.jdbc.api.ExecutionState state = executionStatus.getExecutionState(); | ||||||||||||||
| // Direct results — server already closed | ||||||||||||||
| if (state == com.databricks.jdbc.api.ExecutionState.CLOSED) { | ||||||||||||||
| return false; | ||||||||||||||
| } | ||||||||||||||
| // Async execution — user controls polling | ||||||||||||||
| if (state == com.databricks.jdbc.api.ExecutionState.PENDING | ||||||||||||||
| || state == com.databricks.jdbc.api.ExecutionState.RUNNING) { | ||||||||||||||
| return false; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| return true; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| private static TelemetryCollector resolveTelemetryCollector( | ||||||||||||||
| IDatabricksStatementInternal parentStatement) { | ||||||||||||||
| try { | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -172,6 +172,13 @@ public void close(boolean removeFromSession) throws DatabricksSQLException { | |
| this.connection.closeStatement(this); | ||
| } | ||
| DatabricksThreadContextHolder.clearStatementInfo(); | ||
| // Safety net: stop any heartbeat for this statement | ||
| if (statementId != null) { | ||
| ResultHeartbeatManager mgr = connection.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

[HIGH] This After

Fix: Add a heartbeat stop to `cancel()`:

```java
public void cancel() throws SQLException {
  ...
  if (statementId != null) {
    ResultHeartbeatManager mgr = connection.getHeartbeatManager();
    if (mgr != null) {
      mgr.stopHeartbeat(statementId);
    }
  }
  this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
  ...
}
```
|
||
| } | ||
| } | ||
| shutDownExecutor(); | ||
| this.updateCount = -1; | ||
| this.isClosed = true; | ||
|
|
@@ -246,6 +253,15 @@ public void cancel() throws SQLException { | |
| LOGGER.debug("public void cancel()"); | ||
| checkIfClosed(); | ||
|
|
||
| // H11 fix: Stop heartbeat on cancel — server operation is being cancelled, | ||
| // no point continuing to poll it | ||
| if (statementId != null) { | ||
| ResultHeartbeatManager mgr = connection.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); | ||
| } | ||
| } | ||
|
|
||
| if (statementId != null && !directResultsReceived) { | ||
| this.connection.getSession().getDatabricksClient().cancelStatement(statementId); | ||
| DatabricksThreadContextHolder.clearStatementInfo(); | ||
|
|
@@ -672,6 +688,8 @@ public ResultSet executeAsync(String sql) throws SQLException { | |
| LOGGER.debug("ResultSet executeAsync() for statement {%s}", sql); | ||
| checkIfClosed(); | ||
|
|
||
| // No heartbeat during async wait — the user controls polling via getExecutionResult(). | ||
| // Heartbeat starts later when the ResultSet is constructed (after getExecutionResult()). | ||
| resetForNewExecution(); | ||
|
|
||
| IDatabricksClient client = connection.getSession().getDatabricksClient(); | ||
|
|
@@ -969,6 +987,16 @@ private void resetForNewExecution() { | |
| // when the server returns unexpected responses (e.g., WireMock 404 in tests). | ||
| // For direct results, the server already closed the handle. | ||
|
|
||
| // Stop heartbeat for the previous execution before clearing state. | ||
| // Without this, the old heartbeat (keyed by old statementId) would fail and self-terminate | ||
| // after 10 consecutive failures — wasteful and noisy in logs. | ||
| if (statementId != null) { | ||
| ResultHeartbeatManager mgr = connection.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); | ||
| } | ||
| } | ||
|
|
||
| directResultsReceived = false; | ||
|
|
||
| // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CRITICAL] Heartbeat never starts on Thrift result sets — feature is dead-on-arrival on the Thrift path
The Thrift constructor (this method, lines 153-196) does not call `startHeartbeatIfEnabled()`. Only the SEA constructor at line 127 does.

All Thrift result sets are constructed via `DatabricksThriftAccessor` (`executeStatement`, `getStatementResult`, etc.) using this constructor — so on a `transportMode=thrift` connection with `EnableHeartbeat=1`, the manager is created and the eligibility logic correctly returns true for `THRIFT_INLINE` / `THRIFT_ARROW_ENABLED`, but no heartbeat ever starts.

Per the design doc's eligibility table, Thrift inline (data only on cluster, server-evictable) is one of the most critical scenarios this feature is meant to cover. It's silently broken.
The eligibility tests in `ResultSetHeartbeatEligibilityTest.testThriftInlineIsEligible` / `testThriftArrowIsEligible` mock the instance via reflection and bypass the constructor entirely, so they pass while production reality is broken.

Fix: Add `startHeartbeatIfEnabled();` at the end of this constructor (line 196). Add a real-constructor smoke test that builds a Thrift `DatabricksResultSet` via the production constructor and asserts `mgr.getActiveHeartbeatCount() == 1`.