
Task::next() Deadlocks with Pipeline Dependencies in Single-threaded Execution #11442

Open
yjshen opened this issue Nov 5, 2024 · 6 comments
Labels
bug Something isn't working

Comments

yjshen commented Nov 5, 2024

Bug description

Current Behavior

In Velox's single-threaded execution mode, Task::next() uses folly::collectAll to wait for all blocked drivers, even when some drivers are waiting for others in the same task.

Specific example with hash join:

  1. The build side driver blocks temporarily, waiting for input from a custom operator.
  2. The probe side driver blocks waiting for the build side to complete.
  3. Task::next() waits on both drivers' futures via collectAll.
  4. Execution deadlocks: the probe future cannot complete until the build pipeline finishes, and the build pipeline is never resumed because the caller is still waiting on the combined future.

Expected Behavior

The implementation should:

  1. Allow drivers to unblock independently when their blocking conditions are met
  2. Not require all blocked drivers to become unblocked simultaneously
  3. Respect pipeline data dependencies

Technical Details

The bug occurs in Task::next():

// In Task::next()
if (runnableDrivers == 0) {
  if (blockedDrivers > 0) {
    if (!future) {
      VELOX_FAIL(
          "Cannot make progress as all remaining drivers are blocked and user are not expected to wait.");
    } else {
      std::vector<ContinueFuture> notReadyFutures;
      for (auto& continueFuture : futures) {
        if (!continueFuture.isReady()) {
          notReadyFutures.emplace_back(std::move(continueFuture));
        }
      }
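      // NOTE: collectAll() completes only after *every* future in
      // notReadyFutures completes, so a driver blocked on another driver of
      // the same task (e.g. probe waiting for build) keeps the returned
      // future pending even after the external block clears.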
      *future = folly::collectAll(std::move(notReadyFutures)).unit();
    }
  }
  return nullptr;
}

Proposed Solution

Replace collectAll with collectAny so that the caller is woken as soon as any blocked driver can make progress.
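
A minimal sketch of the proposed change, keeping the surrounding Task::next() code from above unchanged and only swapping the combinator (names are taken from the snippet above):

      // Sketch: complete the caller's future as soon as *any* blocked driver
      // becomes unblocked, instead of waiting for all of them.
      *future = folly::collectAny(std::move(notReadyFutures)).unit();

One caveat to verify: with collectAny the caller may be woken while other drivers are still blocked, so Task::next() must tolerate being called again in that state.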

System information

I'm using an old version of Velox, but I checked the current code for `Task::next()`, and it is unchanged.

Velox System Info v0.0.2
Commit: 5d315fb
CMake Version: 3.28.3
System: Linux-6.8.0-1017-gcp
Arch: x86_64
C++ Compiler: /usr/bin/c++
C++ Compiler Version: 11.4.0
C Compiler: /usr/bin/cc
C Compiler Version: 11.4.0
CMake Prefix Path: /usr/local;/usr;/;/usr/local/lib/python3.10/dist-packages/cmake/data;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

No response

yjshen added the bug (Something isn't working) and triage (Newly created issue that needs attention) labels Nov 5, 2024
Yuhta (Contributor) commented Nov 5, 2024

This is probably not a problem with collectAll. What is blocking the build side from finishing? It looks like it is just blocked normally on the custom operator, not deadlocked.

yjshen (Author) commented Nov 5, 2024

In our scenario we use Gluten + Velox with custom operators preceding the join build. The caller of Task::next() starts waiting the first time the build pipeline blocks, but to make progress it needs both the build and probe pipelines to become kNotBlocked. A sketch of the custom operator:

class CustomOperator : public Operator {
 protected:
  // States for async input processing
  bool noMoreInput_{false};
  mutable std::mutex mutex_;
  folly::Future<folly::Unit> inputProcessingTask_;  // Key field for blocking
  folly::Executor* executor_{nullptr};

 public:
  // Called when operator needs to process input asynchronously
  void startAsyncInputProcessing(RowVectorPtr input) {
    // Create a promise/future pair to track async processing
    folly::Promise<folly::Unit> promise;
    inputProcessingTask_ = promise.getFuture();  // Future becomes valid here

    // Schedule async task
    auto task = [this, input = std::move(input), promise = std::move(promise)]() {
        // Process input asynchronously...
        
        // When done, fulfill the promise
        promise.setValue();
    };
    executor_->add(std::move(task));
  }

  // Check if operator is blocked
  BlockingReason isBlockedDefault(ContinueFuture* future) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (inputProcessingTask_.valid()) {
      // We're blocked waiting for async processing
      if (future) {
        *future = std::move(inputProcessingTask_);
      }
      return BlockingReason::kWaitForConnector;
    }
    return BlockingReason::kNotBlocked;
  }

  // Add new input
  void addInputDefault(RowVectorPtr input) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (inputProcessingTask_.valid()) {
      VELOX_FAIL("Cannot add input while processing previous input");
    }
    startAsyncInputProcessing(std::move(input));
  }
};

Yuhta (Contributor) commented Nov 5, 2024

Probe is waiting on build, and build is waiting on inputProcessingTask_. The whole task cannot proceed until inputProcessingTask_ is fulfilled, which is correct.

yjshen (Author) commented Nov 5, 2024

Yes, I think the issue is that when inputProcessingTask_ is fulfilled later, the whole task still cannot proceed, because the caller (WholeStageResultIterator) is already waiting on the future from collectAll, and that future won't complete until the probe side is also runnable (see the sketch below). Am I missing something crucial here?
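
To make the point concrete, here is a small, self-contained illustration of the folly semantics involved (hypothetical promises standing in for the build and probe futures; this is not Velox code): the future returned by collectAll stays pending until every input future is fulfilled.

#include <folly/futures/Future.h>
#include <cassert>
#include <chrono>
#include <vector>

int main() {
  // Stand-ins for the two blocked drivers' futures.
  folly::Promise<folly::Unit> buildBlocked;  // fulfilled when the custom operator finishes
  folly::Promise<folly::Unit> probeBlocked;  // fulfilled only after the build pipeline finishes

  std::vector<folly::SemiFuture<folly::Unit>> futures;
  futures.push_back(buildBlocked.getSemiFuture());
  futures.push_back(probeBlocked.getSemiFuture());

  auto all = folly::collectAll(std::move(futures));

  buildBlocked.setValue();  // the external block clears...

  // ...but 'all' completes only once probeBlocked is also fulfilled, and the
  // probe side can only unblock if Task::next() is driven again, which the
  // caller will not do while it is still waiting on 'all'.
  all.wait(std::chrono::seconds(1));
  assert(!all.isReady());
  return 0;
}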

Yuhta (Contributor) commented Nov 5, 2024

I see. We should only wait on futures for external blocking; the probe is blocked internally (on the build side of the same task) and its future should not be part of the list.

@xiaoxmeng Any idea how we can differentiate external blocking vs internal blocking futures in this case?
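
One possible direction, sketched only: it assumes Task::next() records each blocked driver's BlockingReason alongside its future, and that reasons resolved inside the task (e.g. BlockingReason::kWaitForJoinBuild, kWaitForJoinProbe) can be told apart from external ones such as kWaitForConnector. The helper and the per-driver reason bookkeeping below are hypothetical, and the exact partition of reasons would need review.

#include "velox/exec/Driver.h"

using facebook::velox::exec::BlockingReason;

// Hypothetical helper: decide whether a blocking reason can only be resolved
// by an event outside the task (external) or by another driver of the same
// task (internal).
bool isExternalBlocking(BlockingReason reason) {
  switch (reason) {
    case BlockingReason::kWaitForJoinBuild:
    case BlockingReason::kWaitForJoinProbe:
      return false;  // resolved by other drivers of the same task
    default:
      return true;   // e.g. kWaitForConnector: resolved by an external event
  }
}

// Task::next() would then collect only externally blocked drivers' futures
// (blockingReasons is assumed to be gathered alongside 'futures'):
//
//   if (!continueFuture.isReady() && isExternalBlocking(blockingReasons[i])) {
//     notReadyFutures.emplace_back(std::move(continueFuture));
//   }

Internally blocked drivers would then no longer keep the combined future pending, so the task is resumed as soon as the external block (here, the custom operator's inputProcessingTask_) clears.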

FelixYBW (Contributor) commented Nov 6, 2024

@zhztheplayer

assignUser removed the triage (Newly created issue that needs attention) label Nov 8, 2024