-
Notifications
You must be signed in to change notification settings - Fork 16.8k
triggerer: TriggerCommsDecoder.asend() asyncio.Lock is not thread-safe — concurrent sync_to_async calls cause "Response read out of order" and fatal TriggerRunner crash #64620
Description
Apache Airflow Provider(s)
core (triggerer)
Versions of Apache Airflow Providers
Observed in Apache Airflow 3.1.8 (Astronomer Runtime 3.1-14).
Apache Airflow version
3.1.8
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Astronomer (managed)
Deployment details
Managed Astronomer deployment with triggerer enabled, using deferrable operators including CloudDataTransferServiceS3ToGCSOperator (Google Cloud Storage Transfer Service) and BigQuery deferrable operators. Single triggerer replica, DEFAULT_CAPACITY=1000, 0.5 vCPU / 1.92 GiB memory.
What happened
The triggerer's TriggerRunner subprocess crashes with RuntimeError: Response read out of order! Got frame.id=N, expect_id=N+1 raised from TriggerCommsDecoder._aget_response. When this exception propagates up through TriggerRunner.arun() → sync_state_to_supervisor(), it kills the entire TriggerRunner subprocess rather than just the individual failing trigger. After this fatal crash, the triggerer pod remains alive (its HTTP server continues serving requests) but the TriggerRunner subprocess does not restart, leaving all deferred tasks stuck indefinitely.
The error was observed repeatedly — at least a dozen times over an 11-hour window — across multiple TriggerRunner subprocess restarts. The final fatal crash propagated to arun() itself and the subprocess did not recover:
2026-04-02T05:43:00.310571Z [error] Trigger runner failed [airflow.jobs.triggerer_job_runner]
RuntimeError: Response read out of order! Got frame.id=18314, expect_id=18315
File triggerer_job_runner.py:880, in arun
File triggerer_job_runner.py:1090, in sync_state_to_supervisor
File triggerer_job_runner.py:1101, in asend
File triggerer_job_runner.py:801, in asend
File triggerer_job_runner.py:791, in _aget_response
An earlier instance at 02:37:35 UTC shows the full call chain, originating from a BigQuery trigger calling a synchronous method from within an async context:
RuntimeError: Response read out of order! Got frame.id=25902, expect_id=25903
task: <Task finished name='kingpd-dimensions/manual__2026-04-01T00:00:00+00:00/d_kingpd_flavour_f_act_sum.insert-step-1/-1/1 (ID 158353)'
exception=RuntimeError('Response read out of order! Got frame.id=25902, expect_id=25903')>
Full traceback:
greenback/_impl.py:116 greenback_shim
greenback/_impl.py:201 _greenback_shim
greenback/_impl.py:81 trampoline
outcome/_impl.py:185 send
triggerer_job_runner.py:1152 run_trigger
providers/google/cloud/triggers/bigquery.py:199 run
providers/google/cloud/triggers/bigquery.py:157 safe_to_cancel
providers/google/cloud/triggers/bigquery.py:131 get_task_state
asgiref/sync.py:439 __call__ ← sync_to_async wrapping a sync fn
greenback/_impl.py:210 _greenback_shim
concurrent/futures/thread.py:59 run ← runs in thread pool
asgiref/sync.py:491 thread_handler
sdk/execution_time/task_runner.py:514 get_task_states
triggerer_job_runner.py:772 send ← TriggerCommsDecoder.send()
asgiref/sync.py:262 __call__ ← async_to_sync()
concurrent/futures/_base.py:449 result
concurrent/futures/_base.py:401 __get_result
asgiref/sync.py:300 main_wrap
triggerer_job_runner.py:801 asend
triggerer_job_runner.py:791 _aget_response ← frame.id mismatch raised here
After the fatal 05:43 crash, the triggerer pod served only HTTP 404 Not Found responses for trigger log requests — confirming no triggers were being executed — for the remainder of the observation window (07:30–07:53+ UTC).
Root Cause Analysis
The bug is a thread-safety violation in TriggerCommsDecoder.asend().
# triggerer_job_runner.py
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
bytes = frame.as_bytes()
async with self._async_lock: # ← asyncio.Lock: only safe within ONE event loop
self._async_writer.write(bytes)
return await self._aget_response(frame.id)TriggerCommsDecoder.send() calls async_to_sync(self.asend)(msg). When async_to_sync is called from a thread (e.g., from asgiref.sync_to_async running a synchronous method of a trigger), it spins up a new asyncio event loop in that thread. The asyncio.Lock (self._async_lock) is bound to a single event loop and provides no mutual exclusion across threads — it only serializes coroutines within the same event loop.
When two concurrent callers invoke TriggerCommsDecoder.send():
- Caller A:
sync_state_to_supervisor()from the mainTriggerRunner.arun()event loop - Caller B: a BigQuery trigger's
get_task_state()viasync_to_async→ thread →async_to_sync(asend)on a separate event loop
Both callers write their request frames and then await the response for their own frame ID. Because the writes and reads are not mutually exclusive across threads, Caller A reads the response intended for Caller B (frame IDs arrive out of order), raising RuntimeError: Response read out of order! Got frame.id=N, expect_id=N+1.
The specific trigger type that initiates the cross-thread send is any trigger that:
- calls a synchronous method from its
async def run()loop (viaasgiref.sync_to_asyncorgreenback) - and that synchronous method calls
TriggerCommsDecoder.send()(e.g., viatask_runner.get_task_states)
In the observed incidents, BigQueryTableExistenceTrigger calling safe_to_cancel → get_task_state → task_runner.get_task_states is the initiating trigger. But the victim can be any trigger running concurrently in the same TriggerRunner subprocess.
What you think should happen instead
TriggerCommsDecoder.asend() should be safe to call from multiple threads simultaneously. The asyncio.Lock should be replaced with a threading.Lock (or a threading.RLock) that provides mutual exclusion across threads, not just within a single event loop. Alternatively, the communication channel could be restructured so that cross-thread sends use a different mechanism (e.g., asyncio.run_coroutine_threadsafe with the parent event loop rather than async_to_sync).
A minimal fix would be:
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
bytes = frame.as_bytes()
with self._thread_lock: # threading.Lock — cross-thread safe
self._async_writer.write(bytes)
return await self._aget_response(frame.id)where self._thread_lock = threading.Lock() is added alongside the existing asyncio.Lock.
Note: self.id_counter (a itertools.count) is also shared across threads; it should be verified as thread-safe (it is in CPython due to the GIL, but worth noting).
How to reproduce
- Run a deployment with multiple deferrable operators active concurrently, including at least one trigger type that calls
task_runner.get_task_states(or any synchronous SDK method) from insideasync def run()viasync_to_asyncorgreenback. - Set
AIRFLOW__TRIGGERER__DEFAULT_CAPACITYhigh (e.g., 1000) to maximize concurrency. - Observe
RuntimeError: Response read out of order!in triggerer logs when the race condition fires. - Observe
Trigger runner failedwith the same error logged fromarunwhen it propagates to the main loop. - After the fatal crash, all deferred tasks remain stuck in
DEFERREDstate indefinitely; the triggerer pod is alive but serves only404 Not Foundfor trigger log requests.
Relationship to existing issues
This is related to but distinct from #64213, which covers a different RuntimeError from the same TriggerCommsDecoder.send() path (Task got Future attached to a different loop). Both issues share the same root cause (thread-unsafe asyncio.Lock in asend), but produce different error messages depending on which async/thread boundary is crossed first.
Anything else
- The error repeats across multiple
TriggerRunnersubprocess restarts (observed ~12 times in 11 hours before the fatal crash). - The fatal variant (
Trigger runner failedpropagating througharun) is more severe than non-fatal variants: theTriggerRunnersubprocess does not restart, requiring a full triggerer pod restart to recover. AIRFLOW__TRIGGERER__DEFAULT_CAPACITY=1000on a 0.5 vCPU pod was a contributing factor — higher concurrency increases the probability of the race condition.- Confirmed on Airflow 3.1.8 / Python 3.12 /
asgiref3.x /greenbackinstalled.