PipelineTask deadlock: cancel() before StartFrame reaches sink hangs worker permanently #4276

@miie

Summary

When task.cancel() is called while _process_push_queue is blocked in _wait_for_pipeline_start(), the queued CancelFrame can never be consumed and the pipeline worker hangs indefinitely in a busy state. This regression was introduced in v0.0.108 and is still present in main.

Environment

  • pipecat-ai version: 0.0.108 (latest; confirmed present in main as of 2026-04-13)
  • Transport: Daily
  • Python: 3.13

Steps to Reproduce

  1. Start a PipelineTask with a Daily transport.
  2. Call task.cancel() within the first ~1–2 seconds of session start (i.e. before the StartFrame has finished propagating to the sink — typically during the client connection handshake or cold-start initialisation).
  3. Observe that the task never finishes: has_finished() remains False and the worker process stays in a BUSY state permanently.

A concrete trigger: a user clicks a "Stop session" / disconnect button immediately after initiating a session, before the pipeline has fully started.

Root Cause

_process_push_queue blocks at line 777 (v0.0.108 / main):

await self._wait_for_pipeline_start(start_frame)  # blocks here until _pipeline_start_event is set

while running:
    frame = await self._push_queue.get()           # CancelFrame is stuck here — never reached
    ...

_pipeline_start_event is only set when the StartFrame reaches the sink (_sink_push_frame, line 853). If task.cancel() is called before that happens, _cancel() enqueues a CancelFrame via queue_frame():

async def _cancel(self, *, reason=None):
    ...
    await self.queue_frame(CancelFrame(reason=reason))  # → puts CancelFrame in _push_queue

But _process_push_queue is still blocked in _pipeline_start_event.wait(), so the CancelFrame sits in _push_queue forever. Neither _pipeline_end_event nor _pipeline_finished_event is ever set, and run() waits in _wait_for_pipeline_finished() indefinitely, leaving the worker permanently busy.
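
The hang can be reproduced without pipecat or Daily. The following is a minimal asyncio model, not pipecat's code: `ModelTask`, `process_push_queue`, `finishes_after_early_cancel` and the string frames are hypothetical stand-ins for `PipelineTask`, `_process_push_queue` and the real frame classes, and the 0.2 s timeout is arbitrary. With `guard=True` the worker waits on a start event that is never set, so the enqueued `CancelFrame` is never read. With `guard=False`, which models the loop as it was before v0.0.108 according to this report, the same early cancel completes.

```python
import asyncio


class ModelTask:
    """Hypothetical stand-in for PipelineTask's push-queue worker (not pipecat code)."""

    def __init__(self, guard: bool) -> None:
        self.guard = guard  # True models v0.0.108+, False models the earlier loop
        self.push_queue = asyncio.Queue()
        self.pipeline_start_event = asyncio.Event()  # "StartFrame reached the sink"
        self.finished_event = asyncio.Event()

    async def process_push_queue(self) -> None:
        if self.guard:
            # Models _wait_for_pipeline_start(): nothing is read until start.
            await self.pipeline_start_event.wait()
        while True:
            frame = await self.push_queue.get()
            if frame == "CancelFrame":
                self.finished_event.set()
                return

    async def cancel(self) -> None:
        # Models _cancel(): the CancelFrame is only enqueued.
        await self.push_queue.put("CancelFrame")


async def finishes_after_early_cancel(guard: bool, timeout: float = 0.2) -> bool:
    """Return True if the worker finished after cancel(), False if it hung."""
    task = ModelTask(guard)
    worker = asyncio.create_task(task.process_push_queue())
    await asyncio.sleep(0)  # let the worker reach its first await
    await task.cancel()  # the start event is never set in this model
    try:
        await asyncio.wait_for(task.finished_event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        worker.cancel()  # tear down the worker if it is still stuck
        await asyncio.gather(worker, return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(finishes_after_early_cancel(guard=True)))   # False: hangs
    print(asyncio.run(finishes_after_early_cancel(guard=False)))  # True
```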

Why This Is a Regression

Prior to v0.0.108, _process_push_queue did not call _wait_for_pipeline_start() before entering the frame loop. The _wait_for_pipeline_start guard was added in v0.0.108 to ensure frames aren't processed before the pipeline is ready — a valid goal — but the cancellation path was not updated to account for the queue being blocked.

Notably, the codebase already handles a similar "blocked push queue" scenario for interruptions (line 821–823):

# InterruptionTaskFrame bypasses the push queue directly
await self._pipeline.queue_frame(InterruptionFrame())

The same bypass pattern is not applied for the cancel path.
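
For comparison, a bypass for the cancel path could look like the asyncio model below. It is a hypothetical sketch, not pipecat code: `BypassModelTask`, `pipeline_inbox` (standing in for `self._pipeline.queue_frame()`), `run_pipeline` and `demo` are invented names, and the model assumes the pipeline drains its own input independently of the push-queue guard. The `CancelFrame` reaches the pipeline even though the start event is never set, but a frame queued before the cancel is left behind in the push queue, which is the kind of ordering implication noted under Suggested Fix.

```python
import asyncio


class BypassModelTask:
    """Hypothetical model: CancelFrame skips the push queue, like InterruptionFrame."""

    def __init__(self) -> None:
        self.push_queue = asyncio.Queue()      # normal path, gated on start
        self.pipeline_inbox = asyncio.Queue()  # stands in for self._pipeline.queue_frame()
        self.pipeline_start_event = asyncio.Event()
        self.finished_event = asyncio.Event()

    async def process_push_queue(self) -> None:
        await self.pipeline_start_event.wait()  # still blocks before StartFrame
        while True:
            await self.pipeline_inbox.put(await self.push_queue.get())

    async def run_pipeline(self) -> None:
        # Assumption: the pipeline consumes its inbox regardless of the guard.
        while True:
            frame = await self.pipeline_inbox.get()
            if frame == "CancelFrame":
                self.finished_event.set()
                return

    async def cancel(self) -> None:
        # Bypass: deliver the CancelFrame to the pipeline directly.
        await self.pipeline_inbox.put("CancelFrame")


async def demo(timeout: float = 0.2) -> tuple:
    """Return (finished, frames still stranded in the push queue)."""
    task = BypassModelTask()
    workers = [
        asyncio.create_task(task.process_push_queue()),
        asyncio.create_task(task.run_pipeline()),
    ]
    await asyncio.sleep(0)
    await task.push_queue.put("TextFrame")  # queued before the cancel
    await task.cancel()
    try:
        await asyncio.wait_for(task.finished_event.wait(), timeout)
        finished = True
    except asyncio.TimeoutError:
        finished = False
    stranded = task.push_queue.qsize()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return finished, stranded
```

Here `asyncio.run(demo())` returns `(True, 1)`: the task finishes, and one frame is still stranded in the push queue.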

Workaround

Manually set _pipeline_start_event before calling task.cancel() to unblock the queue worker:

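import asyncio

from loguru import logger  # the logger pipecat itself uses
from pipecat.pipeline.task import PipelineTask


async def _safe_cancel(task: PipelineTask) -> None:
    try:
        if not task.has_finished():
            # Private attribute; may change in future pipecat versions.
            ev = getattr(task, "_pipeline_start_event", None)
            if ev is not None and not ev.is_set():
                ev.set()  # unblock _wait_for_pipeline_start so the CancelFrame can be consumed
            await task.cancel()
    except Exception as e:
        logger.error(f"Error cancelling task: {e}")


# Must run inside the event loop (e.g. from a disconnect handler). Keep a
# reference so the background task isn't garbage-collected before it finishes.
cancel_job = asyncio.create_task(_safe_cancel(task))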

This works but relies on a private attribute (_pipeline_start_event), making it fragile across future pipecat versions.

Suggested Fix

In _cancel(), check whether _pipeline_start_event has been set and set it if not, before queuing the CancelFrame:

async def _cancel(self, *, reason: Optional[str] = None):
    if not self._cancelled:
        logger.debug(f"Cancelling pipeline task {self}")
        self._cancelled = True
        # Unblock _process_push_queue if StartFrame hasn't reached the sink yet,
        # otherwise CancelFrame would be enqueued but never consumed.
        if not self._pipeline_start_event.is_set():
            self._pipeline_start_event.set()
        await self.queue_frame(CancelFrame(reason=reason))

Alternatively, the CancelFrame could bypass the push queue entirely (as InterruptionFrame does), though that may have different ordering implications.
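
The suggested fix can be sanity-checked with a small asyncio model. `FixedModelTask` and `demo` are hypothetical names; the model copies the `_cancel()` body proposed above but uses strings instead of frame objects and a plain queue instead of pipecat's pipeline. In the model, a second `_cancel()` call is a no-op thanks to `_cancelled`, and any frame already queued ahead of the `CancelFrame` is released along with it, which is one of the ordering effects to weigh against the bypass approach.

```python
import asyncio
from typing import Optional


class FixedModelTask:
    """Hypothetical model of PipelineTask with the suggested _cancel() fix applied."""

    def __init__(self) -> None:
        self._push_queue = asyncio.Queue()
        self._pipeline_start_event = asyncio.Event()
        self._finished_event = asyncio.Event()
        self._cancelled = False
        self.consumed = []  # frames the push-queue worker read, in order

    async def _process_push_queue(self) -> None:
        await self._pipeline_start_event.wait()
        while True:
            frame = await self._push_queue.get()
            self.consumed.append(frame)
            if frame.startswith("CancelFrame"):
                self._finished_event.set()
                return

    async def _cancel(self, *, reason: Optional[str] = None) -> None:
        if not self._cancelled:
            self._cancelled = True
            # The suggested fix: release the worker if StartFrame never reached the sink.
            if not self._pipeline_start_event.is_set():
                self._pipeline_start_event.set()
            await self._push_queue.put(f"CancelFrame(reason={reason!r})")


async def demo(timeout: float = 0.2) -> list:
    task = FixedModelTask()
    worker = asyncio.create_task(task._process_push_queue())
    await asyncio.sleep(0)  # worker is now blocked on the start event
    await task._push_queue.put("TextFrame")  # queued before StartFrame reached the sink
    await task._cancel(reason="user stopped session")
    await task._cancel(reason="second call is a no-op")
    await asyncio.wait_for(task._finished_event.wait(), timeout)
    await worker
    return task.consumed
```

`asyncio.run(demo())` returns both frames in queue order, `TextFrame` first, and only one `CancelFrame` despite the two `_cancel()` calls.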

Impact

Any downstream application that lets users cancel a session during the first few seconds of start-up (before the pipeline is fully initialised) leaves one worker permanently busy per such cancellation; repeated occurrences exhaust the worker pool. No recovery is possible without restarting the worker process.
