Skip to content

Replace oneshot channel with JoinHandle in IndexReader prefetch#21744

Open
himshikhagupta wants to merge 1 commit into
opensearch-project:mainfrom
himshikhagupta:fix/prefetch-panic-thread-stuck
Open

Replace oneshot channel with JoinHandle in IndexReader prefetch#21744
himshikhagupta wants to merge 1 commit into
opensearch-project:mainfrom
himshikhagupta:fix/prefetch-panic-thread-stuck

Conversation

@himshikhagupta
Copy link
Copy Markdown
Contributor

Description

IndexReader used a oneshot channel between spawn_blocking and the poll loop for row group prefetch. When the blocking task panicked (e.g. DelegationPossible reaching subtree_cost in the bitmap tree evaluator), the sender was dropped without sending. The Poll::Ready(Err(_)) arm retried unconditionally, creating an infinite loop that left the Java search thread stuck on future.join() forever.

Replace the oneshot channel with JoinHandle from spawn_blocking, which provides explicit panic detection via JoinError::is_panic():

  • Panic: propagate immediately as DataFusionError with the panic message
  • Cancelled (runtime shutdown): retry (transient, preserves existing behavior)

This also fixes waker registration: JoinHandle properly wakes the parent task on completion, unlike the oneshot pattern which required the receiver to be polled before it could register a waker.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@himshikhagupta himshikhagupta requested a review from a team as a code owner May 19, 2026 17:54
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

PR Reviewer Guide 🔍

(Review updated until commit 3fecbee)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Waker not registered

When poll_next_row_group returns Poll::Pending after starting a prefetch (line 237), the waker is not registered with the JoinHandle. The task will complete but won't wake the parent future. This occurs when the JoinHandle hasn't been polled yet (lines 199-238 check pending_prefetch but only poll if it exists). The first call to start_prefetch creates the handle but returns Pending without polling it, so no waker is registered. Subsequent polls will eventually poll the handle, but the initial Pending return leaves the future unwakeable until an external event triggers another poll.

    let handle = tokio::task::spawn_blocking(move || {
        Self::fetch_row_group(&evaluator, &row_groups, rg_idx, doc_range)
    });
    self.pending_prefetch = Some(handle);
}

fn poll_next_row_group(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<std::result::Result<Option<PrefetchedRowGroup>, DataFusionError>> {
    loop {
        if self.current_rg_idx >= self.row_groups.len() {
            return Poll::Ready(Ok(None));
        }
        if let Some(result) = self.cached_result.take() {
            self.current_rg_idx += 1;
            self.start_prefetch(self.current_rg_idx);
            match result {
                Ok(Some(p)) => return Poll::Ready(Ok(Some(p))),
                Ok(None) => {
                    // RG had no candidates — skipped without a
                    // parquet read. Count for EXPLAIN ANALYZE.
                    if let Some(ref c) = self.rg_skipped {
                        c.add(1);
                    }
                    continue;
                }
                Err(e) => return Poll::Ready(Err(DataFusionError::External(e.into()))),
            }
        }
        if let Some(ref mut rx) = self.pending_prefetch {
            match Pin::new(rx).poll(cx) {
                Poll::Ready(Ok(result)) => {
                    // If we had parked on this receiver, account the
                    // elapsed wall-clock as prefetch_wait_time.
                    if let Some(started) = self.pending_since.take() {
                        if let Some(ref t) = self.prefetch_wait_time {
                            t.add_duration(started.elapsed());
                        }
                    }
                    self.pending_prefetch = None;
                    self.cached_result = Some(result);
                    continue;
                }
                Poll::Ready(Err(join_error)) => {
                    // The spawn_blocking task failed to complete.
                    // JoinError distinguishes panic from cancellation.
                    self.pending_prefetch = None;
                    self.pending_since = None;
                    if join_error.is_panic() {
                        // Deterministic failure (e.g. subtree_cost invariant
                        // violation). Propagate immediately — retrying would
                        // loop forever and hang the calling Java thread.
                        let payload = join_error.into_panic();
                        let panic_msg = payload
                            .downcast_ref::<String>()
                            .cloned()
                            .or_else(|| payload.downcast_ref::<&str>().map(|s| s.to_string()))
                            .unwrap_or_else(|| "unknown panic".into());
                        return Poll::Ready(Err(DataFusionError::Execution(
                            format!(
                                "prefetch for row group {} panicked: {}",
                                self.current_rg_idx, panic_msg
                            ),
                        )));
                    }
                    // Task was cancelled (runtime shutting down) — retry once
                    self.start_prefetch(self.current_rg_idx);
                    return Poll::Pending;

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 431aaf4: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

IndexReader used a oneshot channel between spawn_blocking and the poll
loop for row group prefetch. When the blocking task panicked (e.g.
DelegationPossible reaching subtree_cost in the bitmap tree evaluator),
the sender was dropped without sending. The Poll::Ready(Err(_)) arm
retried unconditionally, creating an infinite loop that left the Java
search thread stuck on future.join() forever.

Replace the oneshot channel with JoinHandle from spawn_blocking, which
provides explicit panic detection via JoinError::is_panic():
- Panic: propagate immediately as DataFusionError with the panic message
- Cancelled (runtime shutdown): retry (transient, preserves existing behavior)

This also fixes waker registration: JoinHandle properly wakes the parent
task on completion, unlike the oneshot pattern which required the receiver
to be polled before it could register a waker.

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
@himshikhagupta himshikhagupta force-pushed the fix/prefetch-panic-thread-stuck branch from 431aaf4 to 3fecbee Compare May 20, 2026 05:42
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3fecbee

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 3fecbee: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant