11import asyncio
22import logging
3+ import sys
34
45from a2a .types import (
56 A2AError ,
@@ -39,6 +40,8 @@ def __init__(self) -> None:
3940 """Initializes the EventQueue."""
4041 self .queue : asyncio .Queue [Event ] = asyncio .Queue ()
4142 self ._children : list [EventQueue ] = []
43+ self ._is_closed = False
44+ self ._lock = asyncio .Lock ()
4245 logger .debug ('EventQueue initialized.' )
4346
4447 def enqueue_event (self , event : Event ):
@@ -47,6 +50,9 @@ def enqueue_event(self, event: Event):
4750 Args:
4851 event: The event object to enqueue.
4952 """
53+ if self ._is_closed :
54+ logger .warning ('Queue is closed. Event will not be enqueued.' )
55+ return
5056 logger .debug (f'Enqueuing event of type: { type (event )} ' )
5157 self .queue .put_nowait (event )
5258 for child in self ._children :
@@ -55,6 +61,20 @@ def enqueue_event(self, event: Event):
5561 async def dequeue_event (self , no_wait : bool = False ) -> Event :
5662 """Dequeues an event from the queue.
5763
64+ This implementation expects that dequeue to raise an exception when
65+ the queue has been closed. In python 3.13+ this is naturally provided
66+ by the QueueShutDown exception generated when the queue has closed and
67+ the user is awaiting the queue.get method. Python<=3.12 this needs to
68+ manage this lifecycle itself. The current implementation can lead to
69+ blocking if the dequeue_event is called before the EventQueue has been
70+ closed but when there are no events on the queue. Two ways to avoid this
71+ are to call this with no_wait = True which won't block, but is the
72+ callers responsibility to retry as appropriate. Alternatively, one can
73+ use a async Task management solution to cancel the get task if the queue
74+ has closed or some other condition is met. The implementation of the
75+ EventConsumer uses an async.wait with a timeout to abort the
76+ dequeue_event call and retry, when it will return with a closed error.
77+
5878 Args:
5979 no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`.
6080 If False (default), wait until an event is available.
@@ -66,6 +86,11 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
6686 asyncio.QueueEmpty: If `no_wait` is True and the queue is empty.
6787 asyncio.QueueShutDown: If the queue has been closed and is empty.
6888 """
89+ async with self ._lock :
90+ if self ._is_closed and self .queue .empty ():
91+ logger .warning ('Queue is closed. Event will not be dequeued.' )
92+ raise asyncio .QueueEmpty ('Queue is closed.' )
93+
6994 if no_wait :
7095 logger .debug ('Attempting to dequeue event (no_wait=True).' )
7196 event = self .queue .get_nowait ()
@@ -99,13 +124,30 @@ def tap(self) -> 'EventQueue':
99124 self ._children .append (queue )
100125 return queue
101126
102- def close (self ):
127+ async def close (self ):
103128 """Closes the queue for future push events.
104129
105130 Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
106131 when the queue is empty. Also closes all child queues.
107132 """
108133 logger .debug ('Closing EventQueue.' )
109- self .queue .shutdown ()
110- for child in self ._children :
111- child .close ()
134+ async with self ._lock :
135+ # If already closed, just return.
136+ if self ._is_closed :
137+ return
138+ self ._is_closed = True
139+ # If using python 3.13 or higher, use the shutdown method
140+ if sys .version_info >= (3 , 13 ):
141+ self .queue .shutdown ()
142+ for child in self ._children :
143+ child .close ()
144+ # Otherwise, join the queue
145+ else :
146+ tasks = [asyncio .create_task (self .queue .join ())]
147+ for child in self ._children :
148+ tasks .append (asyncio .create_task (child .close ()))
149+ await asyncio .wait (tasks , return_when = asyncio .ALL_COMPLETED )
150+
151+ def is_closed (self ) -> bool :
152+ """Checks if the queue is closed."""
153+ return self ._is_closed
0 commit comments