Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/workflows/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,14 +500,35 @@ async def wait_for_event(
Raises:
asyncio.TimeoutError: If the timeout elapses.

Notes:
This API pauses a step by raising an internal exception and the step will
be re-run from the beginning once the matching event arrives. In other
words, code before the `await ctx.wait_for_event(...)` call will be
executed again when the step resumes. Make sure any logic before the call
is safe to replay (idempotent) or guarded by persisted state to avoid
duplicate side effects.

- Use `waiter_id` to ensure the optional `waiter_event` is written to the
stream only once across replays.
- Persist replay guards in `ctx.store` or another durable place if you
need to avoid repeating outbound network calls, writes, or other
external effects.

Examples:
```python
@step
async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
# Guard side-effects before the wait; this code may run again
invited = await ctx.store.get("invited")
if not invited:
# e.g., send an invite only once across replays
send_invite_email()
await ctx.store.set("invited", True)

response = await ctx.wait_for_event(
HumanResponseEvent,
waiter_event=InputRequiredEvent(msg="What's your name?"),
waiter_id="user_name",
waiter_event=InputRequiredEvent(msg="What's your name?"), # emitted once per waiter_id
waiter_id="user_name", # stable across replays
timeout=60,
)
return StopEvent(result=response.response)
Expand Down