Skip to content

Commit 25b95d4

Browse files
committed
do the heartbeat handling here
1 parent 4dfd0e9 commit 25b95d4

4 files changed

Lines changed: 40 additions & 14 deletions

File tree

src/context.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use uuid::Uuid;
77

88
use crate::Durable;
99
use crate::error::{ControlFlow, TaskError, TaskResult};
10-
use crate::heartbeat::{HeartbeatHandle, Heartbeater};
10+
use crate::heartbeat::{HeartbeatHandle, Heartbeater, StepState};
1111
use crate::task::Task;
1212
use crate::types::DurableEventPayload;
1313
use crate::types::{
@@ -165,9 +165,9 @@ where
165165
/// # Example
166166
///
167167
/// ```ignore
168-
/// let payment_id = ctx.step("charge-payment", ctx.task_id, |task_id, _state| async {
168+
/// let payment_id = ctx.step("charge-payment", ctx.task_id, |task_id, step_state| async {
169169
/// let idempotency_key = format!("{}:charge", task_id);
170-
/// stripe::charge(amount, &idempotency_key).await
170+
/// stripe::charge(amount, &idempotency_key, &step_state.state).await
171171
/// }).await?;
172172
/// ```
173173
#[cfg_attr(
@@ -182,7 +182,7 @@ where
182182
&mut self,
183183
base_name: &str,
184184
params: P,
185-
f: fn(P, State) -> Fut,
185+
f: fn(P, StepState<State>) -> Fut,
186186
) -> TaskResult<T>
187187
where
188188
P: Serialize,
@@ -206,13 +206,14 @@ where
206206
span.record("cached", false);
207207

208208
// Execute the step
209-
let result =
210-
f(params, self.durable.state().clone())
211-
.await
212-
.map_err(|e| TaskError::Step {
213-
base_name: base_name.to_string(),
214-
error: e,
215-
})?;
209+
let step_state = StepState {
210+
state: self.durable.state().clone(),
211+
heartbeater: self.heartbeat_handle.clone(),
212+
};
213+
let result = f(params, step_state).await.map_err(|e| TaskError::Step {
214+
base_name: base_name.to_string(),
215+
error: e,
216+
})?;
216217

217218
// Persist checkpoint (also extends claim lease)
218219
#[cfg(feature = "telemetry")]

src/heartbeat.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,28 @@ impl Heartbeater for NoopHeartbeater {
9696
Ok(())
9797
}
9898
}
99+
100+
/// State provided to `step()` closures, wrapping the user's application state
101+
/// alongside a [`HeartbeatHandle`] for extending the task lease.
102+
///
103+
/// This is passed as the second argument to every `step()` closure, making
104+
/// heartbeating available without the consumer needing to thread it manually.
105+
///
106+
/// # Example
107+
///
108+
/// ```ignore
109+
/// ctx.step("long-operation", params, |params, step_state| async move {
110+
/// for item in &params.items {
111+
/// process(item, &step_state.state).await?;
112+
/// // Extend lease during long-running work
113+
/// let _ = step_state.heartbeater.heartbeat(None).await;
114+
/// }
115+
/// Ok(result)
116+
/// }).await?;
117+
/// ```
118+
pub struct StepState<State> {
119+
/// The user's application state.
120+
pub state: State,
121+
/// Handle for extending the task lease during long-running operations.
122+
pub heartbeater: HeartbeatHandle,
123+
}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub use client::{Durable, DurableBuilder};
110110
pub use context::TaskContext;
111111
pub use cron::{ScheduleFilter, ScheduleInfo, ScheduleOptions, setup_pgcron};
112112
pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult};
113-
pub use heartbeat::{HeartbeatHandle, Heartbeater, NoopHeartbeater};
113+
pub use heartbeat::{HeartbeatHandle, Heartbeater, NoopHeartbeater, StepState};
114114
pub use task::{ErasedTask, Task, TaskWrapper};
115115
pub use types::{
116116
CancellationPolicy, ClaimedTask, DurableEventPayload, RetryStrategy, SpawnDefaults,

tests/execution_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -745,13 +745,13 @@ impl durable::Task<AppState> for WriteToDbTask {
745745
) -> durable::TaskResult<Self::Output> {
746746
// Use the app state's db pool to write to a table
747747
let row_id: i64 = ctx
748-
.step("insert", params, |params, state| async move {
748+
.step("insert", params, |params, step_state| async move {
749749
let (id,): (i64,) = sqlx::query_as(
750750
"INSERT INTO test_state_table (key, value) VALUES ($1, $2) RETURNING id",
751751
)
752752
.bind(&params.key)
753753
.bind(&params.value)
754-
.fetch_one(&state.db_pool)
754+
.fetch_one(&step_state.state.db_pool)
755755
.await
756756
.map_err(|e| anyhow::anyhow!("DB error: {}", e))?;
757757
Ok(id)

0 commit comments

Comments
 (0)