Skip to content

Commit 8248424

Browse files
authored
Fail durable tasks immediately for non-retryable errors (#66)
* Fail durable tasks immediately for non-retryable errors Currently, we classify only a few error types (including errors from user steps) as retryable. Everything else is non-retryable, and causes the task to fail immediately, without any retries * Run fmt
1 parent 3eb3259 commit 8248424

10 files changed

Lines changed: 402 additions & 48 deletions

File tree

sql/schema.sql

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -857,12 +857,16 @@ begin
857857
end;
858858
$$;
859859

860+
--- Marks a run as failed.
861+
--- If p_force_fail is true, then the retry policy and `p_retry_at` are ignored,
862+
--- and the task is immediately failed (as though it had reached the max retries)
860863
create function durable.fail_run (
861864
p_queue_name text,
862865
p_task_id uuid,
863866
p_run_id uuid,
864867
p_reason jsonb,
865-
p_retry_at timestamptz default null
868+
p_retry_at timestamptz default null,
869+
p_force_fail boolean default false
866870
)
867871
returns void
868872
language plpgsql
@@ -941,8 +945,8 @@ begin
941945
v_task_state_after := 'failed';
942946
v_recorded_attempt := v_attempt;
943947

944-
-- Compute the next retry time
945-
if v_max_attempts is null or v_next_attempt <= v_max_attempts then
948+
-- Compute the next retry time, unless we're forcing a failure
949+
if (not p_force_fail) and (v_max_attempts is null or v_next_attempt <= v_max_attempts) then
946950
if p_retry_at is not null then
947951
v_next_available := p_retry_at;
948952
else

src/context.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,13 @@ where
186186
}
187187

188188
// Execute the step
189-
let result = f(params, self.durable.state().clone()).await?;
189+
let result =
190+
f(params, self.durable.state().clone())
191+
.await
192+
.map_err(|e| TaskError::Step {
193+
base_name: base_name.to_string(),
194+
error: e,
195+
})?;
190196

191197
// Persist checkpoint (also extends claim lease)
192198
#[cfg(feature = "telemetry")]
@@ -262,7 +268,8 @@ where
262268
.bind(self.run_id)
263269
.bind(self.claim_timeout.as_secs() as i32)
264270
.execute(self.durable.pool())
265-
.await?;
271+
.await
272+
.map_err(TaskError::from_sqlx_error)?;
266273

267274
self.checkpoint_cache.insert(name.to_string(), state_json);
268275

@@ -301,7 +308,8 @@ where
301308
.bind(&checkpoint_name)
302309
.bind(duration_ms)
303310
.fetch_one(self.durable.pool())
304-
.await?;
311+
.await
312+
.map_err(TaskError::from_sqlx_error)?;
305313

306314
if needs_suspend {
307315
return Err(TaskError::Control(ControlFlow::Suspend));
@@ -379,7 +387,8 @@ where
379387
.bind(event_name)
380388
.bind(timeout_secs)
381389
.fetch_one(self.durable.pool())
382-
.await?;
390+
.await
391+
.map_err(TaskError::from_sqlx_error)?;
383392

384393
if result.should_suspend {
385394
return Err(TaskError::Control(ControlFlow::Suspend));
@@ -480,7 +489,8 @@ where
480489
.bind(self.run_id)
481490
.bind(extend_by.as_secs() as i32)
482491
.execute(self.durable.pool())
483-
.await?;
492+
.await
493+
.map_err(TaskError::from_sqlx_error)?;
484494

485495
// Notify worker that lease was extended so it can reset timers
486496
self.lease_extender.notify(extend_by);
@@ -743,7 +753,8 @@ where
743753
.bind(&event_name)
744754
.bind(None::<i32>) // No timeout
745755
.fetch_one(self.durable.pool())
746-
.await?;
756+
.await
757+
.map_err(TaskError::from_sqlx_error)?;
747758

748759
if result.should_suspend {
749760
return Err(TaskError::Control(ControlFlow::Suspend));

src/error.rs

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,41 @@ pub enum TaskError {
128128
error_data: JsonValue,
129129
},
130130

131-
/// An internal error from user task code.
132-
///
133-
/// This is the catch-all variant for errors propagated via `?` on anyhow errors.
134-
/// For structured user errors, prefer using [`TaskError::user()`].
135-
#[error(transparent)]
136-
TaskInternal(#[from] anyhow::Error),
131+
//// The user callback provided to `step` failed.
132+
/// We treat this as a non-deterministic error, and will retry the task
133+
#[error("user step `{base_name}` failed: {error}")]
134+
Step {
135+
base_name: String,
136+
error: anyhow::Error,
137+
},
138+
139+
/// The task panicked.
140+
#[error("task panicked: {message}")]
141+
TaskPanicked {
142+
/// The error message from the task.
143+
message: String,
144+
},
145+
}
146+
147+
impl TaskError {
148+
pub fn retryable(&self) -> bool {
149+
match self {
150+
// These are non-deterministic errors, which might succeed on a retry
151+
// (which will have the same checkpoint cache up to the point of the error)
152+
TaskError::Timeout { .. } | TaskError::Database(_) | TaskError::Step { .. } => true,
153+
// Everything else is considered to be a deterministic error, which will fail again
154+
// on a retry
155+
TaskError::SubtaskSpawnFailed { .. }
156+
| TaskError::EmitEventFailed { .. }
157+
| TaskError::Control(_)
158+
| TaskError::Serialization(_)
159+
| TaskError::ChildFailed { .. }
160+
| TaskError::ChildCancelled { .. }
161+
| TaskError::Validation { .. }
162+
| TaskError::User { .. }
163+
| TaskError::TaskPanicked { .. } => false,
164+
}
165+
}
137166
}
138167

139168
/// Result type alias for task execution.
@@ -183,8 +212,10 @@ impl From<serde_json::Error> for TaskError {
183212
}
184213
}
185214

186-
impl From<sqlx::Error> for TaskError {
187-
fn from(err: sqlx::Error) -> Self {
215+
impl TaskError {
216+
// This is explicitly *not* a `From<sqlx::Error> for TaskError` impl,
217+
// because we don't want user code to be performing database queries directly.
218+
pub(crate) fn from_sqlx_error(err: sqlx::Error) -> Self {
188219
if is_cancelled_error(&err) {
189220
TaskError::Control(ControlFlow::Cancelled)
190221
} else {
@@ -275,11 +306,17 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue {
275306
"error_data": error_data,
276307
})
277308
}
278-
TaskError::TaskInternal(e) => {
309+
TaskError::Step { base_name, error } => {
279310
serde_json::json!({
280-
"name": "TaskInternal",
281-
"message": e.to_string(),
282-
"backtrace": format!("{:?}", e)
311+
"name": "Step",
312+
"base_name": base_name,
313+
"message": error.to_string(),
314+
})
315+
}
316+
TaskError::TaskPanicked { message } => {
317+
serde_json::json!({
318+
"name": "TaskPanicked",
319+
"message": message,
283320
})
284321
}
285322
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
-- Add p_force_fail parameter to durable.fail_run function
2+
-- When p_force_fail is true, the retry policy and p_retry_at are ignored,
3+
-- and the task is immediately failed (as though it had reached the max retries)
4+
5+
drop function if exists durable.fail_run(text, uuid, uuid, jsonb, timestamptz);
6+
7+
create function durable.fail_run (
8+
p_queue_name text,
9+
p_task_id uuid,
10+
p_run_id uuid,
11+
p_reason jsonb,
12+
p_retry_at timestamptz default null,
13+
p_force_fail boolean default false
14+
)
15+
returns void
16+
language plpgsql
17+
as $$
18+
declare
19+
v_run_task_id uuid;
20+
v_attempt integer;
21+
v_retry_strategy jsonb;
22+
v_max_attempts integer;
23+
v_now timestamptz := durable.current_time();
24+
v_next_attempt integer;
25+
v_delay_seconds double precision := 0;
26+
v_next_available timestamptz;
27+
v_retry_kind text;
28+
v_base double precision;
29+
v_factor double precision;
30+
v_max_seconds double precision;
31+
v_first_started timestamptz;
32+
v_cancellation jsonb;
33+
v_max_duration bigint;
34+
v_task_state text;
35+
v_task_cancel boolean := false;
36+
v_new_run_id uuid;
37+
v_task_state_after text;
38+
v_recorded_attempt integer;
39+
v_last_attempt_run uuid := p_run_id;
40+
v_cancelled_at timestamptz := null;
41+
begin
42+
-- Lock task first to keep a consistent task -> run lock order.
43+
execute format(
44+
'select retry_strategy, max_attempts, first_started_at, cancellation, state
45+
from durable.%I
46+
where task_id = $1
47+
for update',
48+
't_' || p_queue_name
49+
)
50+
into v_retry_strategy, v_max_attempts, v_first_started, v_cancellation, v_task_state
51+
using p_task_id;
52+
53+
if v_task_state is null then
54+
raise exception 'Task "%" not found in queue "%"', p_task_id, p_queue_name;
55+
end if;
56+
57+
-- Lock run after task and ensure it's still eligible
58+
execute format(
59+
'select task_id, attempt
60+
from durable.%I
61+
where run_id = $1
62+
and state in (''running'', ''sleeping'')
63+
for update',
64+
'r_' || p_queue_name
65+
)
66+
into v_run_task_id, v_attempt
67+
using p_run_id;
68+
69+
if v_run_task_id is null then
70+
raise exception 'Run "%" cannot be failed in queue "%"', p_run_id, p_queue_name;
71+
end if;
72+
73+
if v_run_task_id <> p_task_id then
74+
raise exception 'Run "%" does not belong to task "%"', p_run_id, p_task_id;
75+
end if;
76+
77+
-- Actually fail the run
78+
execute format(
79+
'update durable.%I
80+
set state = ''failed'',
81+
wake_event = null,
82+
failed_at = $2,
83+
failure_reason = $3
84+
where run_id = $1',
85+
'r_' || p_queue_name
86+
) using p_run_id, v_now, p_reason;
87+
88+
v_next_attempt := v_attempt + 1;
89+
v_task_state_after := 'failed';
90+
v_recorded_attempt := v_attempt;
91+
92+
-- Compute the next retry time, unless we're forcing a failure
93+
if (not p_force_fail) and (v_max_attempts is null or v_next_attempt <= v_max_attempts) then
94+
if p_retry_at is not null then
95+
v_next_available := p_retry_at;
96+
else
97+
v_retry_kind := coalesce(v_retry_strategy->>'kind', 'none');
98+
if v_retry_kind = 'fixed' then
99+
v_base := coalesce((v_retry_strategy->>'base_seconds')::double precision, 60);
100+
v_delay_seconds := v_base;
101+
elsif v_retry_kind = 'exponential' then
102+
v_base := coalesce((v_retry_strategy->>'base_seconds')::double precision, 30);
103+
v_factor := coalesce((v_retry_strategy->>'factor')::double precision, 2);
104+
v_delay_seconds := v_base * power(v_factor, greatest(v_attempt - 1, 0));
105+
v_max_seconds := (v_retry_strategy->>'max_seconds')::double precision;
106+
if v_max_seconds is not null then
107+
v_delay_seconds := least(v_delay_seconds, v_max_seconds);
108+
end if;
109+
else
110+
v_delay_seconds := 0;
111+
end if;
112+
v_next_available := v_now + (v_delay_seconds * interval '1 second');
113+
end if;
114+
115+
if v_next_available < v_now then
116+
v_next_available := v_now;
117+
end if;
118+
119+
if v_cancellation is not null then
120+
v_max_duration := (v_cancellation->>'max_duration')::bigint;
121+
if v_max_duration is not null and v_first_started is not null then
122+
if extract(epoch from (v_next_available - v_first_started)) >= v_max_duration then
123+
v_task_cancel := true;
124+
end if;
125+
end if;
126+
end if;
127+
128+
-- Set up the new run if not cancelling
129+
if not v_task_cancel then
130+
v_task_state_after := case when v_next_available > v_now then 'sleeping' else 'pending' end;
131+
v_new_run_id := durable.portable_uuidv7();
132+
v_recorded_attempt := v_next_attempt;
133+
v_last_attempt_run := v_new_run_id;
134+
execute format(
135+
'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)
136+
values ($1, $2, $3, %L, $4, null, null, null, null)',
137+
'r_' || p_queue_name,
138+
v_task_state_after
139+
)
140+
using v_new_run_id, p_task_id, v_next_attempt, v_next_available;
141+
end if;
142+
end if;
143+
144+
if v_task_cancel then
145+
v_task_state_after := 'cancelled';
146+
v_cancelled_at := v_now;
147+
v_recorded_attempt := greatest(v_recorded_attempt, v_attempt);
148+
v_last_attempt_run := p_run_id;
149+
end if;
150+
151+
execute format(
152+
'update durable.%I
153+
set state = %L,
154+
attempts = greatest(attempts, $3),
155+
last_attempt_run = $4,
156+
cancelled_at = coalesce(cancelled_at, $5)
157+
where task_id = $1',
158+
't_' || p_queue_name,
159+
v_task_state_after
160+
) using p_task_id, v_task_state_after, v_recorded_attempt, v_last_attempt_run, v_cancelled_at;
161+
162+
-- Delete wait registrations for this run
163+
execute format(
164+
'delete from durable.%I where run_id = $1',
165+
'w_' || p_queue_name
166+
) using p_run_id;
167+
168+
-- If task reached terminal state, cleanup (emit event, cascade cancel)
169+
if v_task_state_after in ('failed', 'cancelled') then
170+
perform durable.cleanup_task_terminal(
171+
p_queue_name,
172+
p_task_id,
173+
v_task_state_after,
174+
jsonb_build_object('error', p_reason),
175+
true -- cascade cancel children
176+
);
177+
end if;
178+
end;
179+
$$;

src/worker.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ impl Worker {
309309
durable.queue_name(),
310310
task.task_id,
311311
task.run_id,
312-
&e.into(),
312+
&TaskError::from_sqlx_error(e),
313313
)
314314
.await;
315315
return;
@@ -412,8 +412,9 @@ impl Worker {
412412
Ok(r) => Some(r),
413413
Err(e) if e.is_cancelled() => None, // Task was aborted
414414
Err(e) => {
415-
tracing::error!("Task {} panicked: {}", task_label, e);
416-
Some(Err(TaskError::TaskInternal(anyhow::anyhow!("Task panicked: {e}"))))
415+
let message = format!("Task {} panicked: {}", task_label, e);
416+
tracing::error!("{}", message);
417+
Some(Err(TaskError::TaskPanicked { message }))
417418
}
418419
}
419420
}
@@ -563,13 +564,15 @@ impl Worker {
563564
error: &TaskError,
564565
) {
565566
let error_json = serialize_task_error(error);
566-
let query = "SELECT durable.fail_run($1, $2, $3, $4, $5)";
567+
let query = "SELECT durable.fail_run($1, $2, $3, $4, $5, $6)";
568+
let force_fail = !error.retryable();
567569
if let Err(e) = sqlx::query(query)
568570
.bind(queue_name)
569571
.bind(task_id)
570572
.bind(run_id)
571573
.bind(&error_json)
572574
.bind(None::<DateTime<Utc>>)
575+
.bind(force_fail)
573576
.execute(pool)
574577
.await
575578
{

0 commit comments

Comments
 (0)