Commit d9ed06e

add retry readme section, add to deterministic utc time with sequence, increase test coverage for worker and registry
Signed-off-by: Filinto Duran <[email protected]>
1 parent 366b2f6 commit d9ed06e

File tree

9 files changed, +529 −112 lines

Makefile

Lines changed: 8 additions & 1 deletion

```diff
@@ -7,6 +7,13 @@ test-unit:
 test-e2e:
 	pytest -m e2e --verbose
 
+coverage-clean:
+	rm -f .coverage .coverage.* coverage.xml
+
+coverage-all: coverage-clean
+	pytest -m "not e2e" --durations=0 --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml
+	pytest -m e2e --durations=0 --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml --cov-append
+
 install:
 	python3 -m pip install .
 
@@ -18,4 +25,4 @@ gen-proto:
 	python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
 	rm durabletask/internal/*.proto
 
-.PHONY: init test-unit test-e2e gen-proto install
+.PHONY: init test-unit test-e2e coverage-clean coverage-unit coverage-e2e coverage-all gen-proto install
```

README.md

Lines changed: 53 additions & 1 deletion

```diff
@@ -126,10 +126,62 @@ Orchestrations can be continued as new using the `continue_as_new` API. This API
 
 Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
 
-### Retry policies (TODO)
+### Retry policies
 
 Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
 
+#### Creating a retry policy
+
+```python
+from datetime import timedelta
+from durabletask import task
+
+retry_policy = task.RetryPolicy(
+    first_retry_interval=timedelta(seconds=1),  # Initial delay before first retry
+    max_number_of_attempts=5,                   # Maximum total attempts (includes first attempt)
+    backoff_coefficient=2.0,                    # Exponential backoff multiplier (must be >= 1)
+    max_retry_interval=timedelta(seconds=30),   # Cap on retry delay
+    retry_timeout=timedelta(minutes=5),         # Total time limit for all retries (optional)
+)
+```
+
+**Notes:**
+- `max_number_of_attempts` **includes the initial attempt**. For example, `max_number_of_attempts=5` means 1 initial attempt + up to 4 retries.
+- `retry_timeout` is optional. If omitted or set to `None`, retries continue until `max_number_of_attempts` is reached.
+- `backoff_coefficient` controls exponential backoff: delay = `first_retry_interval * (backoff_coefficient ^ retry_number)`, capped by `max_retry_interval`.
+- `non_retryable_error_types` (optional) can specify additional exception types to treat as non-retryable (e.g., `[ValueError, TypeError]`). `NonRetryableError` is always non-retryable regardless of this setting.
+
+#### Using retry policies
+
+Apply retry policies to activities or sub-orchestrations:
+
+```python
+def my_orchestrator(ctx: task.OrchestrationContext, input):
+    # Retry an activity
+    result = yield ctx.call_activity(my_activity, input=data, retry_policy=retry_policy)
+
+    # Retry a sub-orchestration
+    result = yield ctx.call_sub_orchestrator(child_orchestrator, input=data, retry_policy=retry_policy)
+```
+
+#### Non-retryable errors
+
+For errors that should not be retried (e.g., validation failures, permanent errors), raise a `NonRetryableError`:
+
+```python
+from durabletask.task import NonRetryableError
+
+def my_activity(ctx: task.ActivityContext, input):
+    if input is None:
+        # This error will bypass retry logic and fail immediately
+        raise NonRetryableError("Input cannot be None")
+
+    # Transient errors (network, timeouts, etc.) will be retried
+    return call_external_service(input)
+```
+
+Even with a retry policy configured, `NonRetryableError` will fail immediately without retrying.
+
 ## Getting Started
 
 ### Prerequisites
```
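The backoff formula documented above (`delay = first_retry_interval * backoff_coefficient ^ retry_number`, capped by `max_retry_interval`) can be sanity-checked with a short standalone sketch. This is illustrative only, not SDK code; the parameter names simply mirror the documented `RetryPolicy` fields:

```python
from datetime import timedelta

def retry_delays(first_retry_interval: timedelta, backoff_coefficient: float,
                 max_retry_interval: timedelta, max_number_of_attempts: int) -> list[timedelta]:
    """Compute the delay before each retry under the documented backoff formula."""
    delays = []
    # max_number_of_attempts includes the initial attempt, so there are N-1 retries
    for retry_number in range(max_number_of_attempts - 1):
        delay = first_retry_interval * (backoff_coefficient ** retry_number)
        delays.append(min(delay, max_retry_interval))  # capped by max_retry_interval
    return delays

# Using the example policy values from the section above
delays = retry_delays(timedelta(seconds=1), 2.0, timedelta(seconds=30), 5)
print([d.total_seconds() for d in delays])  # → [1.0, 2.0, 4.0, 8.0]
```

Note that with these values the cap never triggers; with more attempts the 6th and later retries would be clamped to 30 seconds.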

durabletask/deterministic.py

Lines changed: 40 additions & 19 deletions

```diff
@@ -25,8 +25,8 @@
 import uuid
 from collections.abc import Sequence
 from dataclasses import dataclass
-from datetime import datetime
-from typing import Optional, Protocol, TypeVar, runtime_checkable
+from datetime import datetime, timedelta
+from typing import Optional, TypeVar
 
 
 @dataclass
@@ -99,17 +99,6 @@ def deterministic_uuid_v5(instance_id: str, current_datetime: datetime, counter:
     return uuid.uuid5(namespace, name)
 
 
-@runtime_checkable
-class DeterministicContextProtocol(Protocol):
-    """Protocol for contexts that provide deterministic operations."""
-
-    @property
-    def instance_id(self) -> str: ...
-
-    @property
-    def current_utc_datetime(self) -> datetime: ...
-
-
 class DeterministicContextMixin:
     """
     Mixin providing deterministic helpers for workflow contexts.
@@ -121,25 +110,25 @@ class DeterministicContextMixin:
     """
 
     def __init__(self, *args, **kwargs):
-        """Initialize the mixin with a UUID counter."""
+        """Initialize the mixin with UUID and timestamp counters."""
         super().__init__(*args, **kwargs)
         # Counter for deterministic UUID generation (matches .NET newGuidCounter)
         # This counter resets to 0 on each replay, ensuring determinism
         self._uuid_counter: int = 0
+        # Counter for deterministic timestamp sequencing (resets on replay)
+        self._timestamp_counter: int = 0
 
     def now(self) -> datetime:
-        """Return orchestration time (deterministic UTC)."""
-        value = self.current_utc_datetime  # type: ignore[attr-defined]
-        assert isinstance(value, datetime)
-        return value
+        """Alias for deterministic current_utc_datetime."""
+        return self.current_utc_datetime  # type: ignore[attr-defined]
 
     def random(self) -> random.Random:
         """Return a PRNG seeded deterministically from instance id and orchestration time."""
         rnd = deterministic_random(
             self.instance_id,  # type: ignore[attr-defined]
             self.current_utc_datetime,  # type: ignore[attr-defined]
         )
-        # Mark as deterministic for sandbox detector whitelisting of bound methods
+        # Mark as deterministic for asyncio sandbox detector whitelisting of bound methods (randint, random)
         try:
             rnd._dt_deterministic = True
         except Exception:
@@ -201,3 +190,35 @@ def random_choice(self, sequence: Sequence[T]) -> T:
             raise IndexError("Cannot choose from empty sequence")
         rnd = self.random()
         return rnd.choice(sequence)
+
+    def now_with_sequence(self) -> datetime:
+        """
+        Return a deterministic timestamp with a microsecond increment per call.
+
+        Each call returns: current_utc_datetime + (counter * 1 microsecond)
+
+        This provides ordered, unique timestamps for tracing/telemetry while maintaining
+        determinism across replays. The counter resets to 0 on each replay (similar to
+        the _uuid_counter pattern).
+
+        Useful for preserving event ordering within a workflow without requiring activities.
+
+        Returns:
+            datetime: Deterministic timestamp that increments on each call
+
+        Example:
+            ```python
+            def workflow(ctx):
+                t1 = ctx.now_with_sequence()  # 2024-01-01 12:00:00.000000
+                result = yield ctx.call_activity(some_activity, input="data")
+                t2 = ctx.now_with_sequence()  # 2024-01-01 12:00:00.000001
+                # t1 < t2, preserving order for telemetry
+            ```
+        """
+        offset = timedelta(microseconds=self._timestamp_counter)
+        self._timestamp_counter += 1
+        return self.current_utc_datetime + offset  # type: ignore[attr-defined]
+
+    def current_utc_datetime_with_sequence(self):
+        """Alias for now_with_sequence for API parity with other SDKs."""
+        return self.now_with_sequence()
```
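The counter-based sequencing added in this diff can be illustrated with a minimal standalone stand-in for the mixin. The toy `Ctx` class below is hypothetical (the real implementation is `DeterministicContextMixin` in `durabletask/deterministic.py`); it shows why the pattern yields timestamps that are both strictly ordered within a run and identical across replays:

```python
from datetime import datetime, timedelta, timezone

class Ctx:
    """Toy stand-in for the mixin: frozen orchestration time plus a per-replay counter."""
    def __init__(self, frozen_time: datetime):
        self.current_utc_datetime = frozen_time  # fixed for the current orchestration step
        self._timestamp_counter = 0              # resets to 0 on each replay

    def now_with_sequence(self) -> datetime:
        # Same logic as the diff: base time plus (counter * 1 microsecond)
        offset = timedelta(microseconds=self._timestamp_counter)
        self._timestamp_counter += 1
        return self.current_utc_datetime + offset

t0 = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

ctx = Ctx(t0)
stamps = [ctx.now_with_sequence() for _ in range(3)]  # strictly increasing, 1 µs apart

replay = Ctx(t0)  # a replay starts from the same frozen time with a fresh counter
replay_stamps = [replay.now_with_sequence() for _ in range(3)]
print(stamps == replay_stamps)  # → True: deterministic across replays
```

Because the counter lives on the context and resets with it, the sequence never depends on wall-clock time, which is what keeps replay deterministic.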

durabletask/worker.py

Lines changed: 8 additions & 3 deletions

```diff
@@ -19,7 +19,7 @@
 import durabletask.internal.orchestrator_service_pb2 as pb
 import durabletask.internal.orchestrator_service_pb2_grpc as stubs
 import durabletask.internal.shared as shared
-from durabletask import task
+from durabletask import deterministic, task
 from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
 
 TInput = TypeVar("TInput")
@@ -159,6 +159,8 @@ class TaskHubGrpcWorker:
             interceptors to apply to the channel. Defaults to None.
         concurrency_options (Optional[ConcurrencyOptions], optional): Configuration for
             controlling worker concurrency limits. If None, default settings are used.
+        stop_timeout (float, optional): Maximum time in seconds to wait for the worker thread
+            to stop when calling stop(). Defaults to 30.0. Useful to set lower values in tests.
 
     Attributes:
         concurrency_options (ConcurrencyOptions): The current concurrency configuration.
@@ -224,6 +226,7 @@ def __init__(
         interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
         concurrency_options: Optional[ConcurrencyOptions] = None,
         channel_options: Optional[Sequence[tuple[str, Any]]] = None,
+        stop_timeout: float = 30.0,
     ):
         self._registry = _Registry()
         self._host_address = host_address if host_address else shared.get_default_host_address()
@@ -232,6 +235,7 @@
         self._is_running = False
         self._secure_channel = secure_channel
         self._channel_options = channel_options
+        self._stop_timeout = stop_timeout
         # Track in-flight activity executions for graceful draining
         import threading as _threading
 
@@ -512,7 +516,7 @@ def stop(self):
         if self._response_stream is not None:
             self._response_stream.cancel()
         if self._runLoop is not None:
-            self._runLoop.join(timeout=30)
+            self._runLoop.join(timeout=self._stop_timeout)
         self._async_worker_manager.shutdown()
         self._logger.info("Worker shutdown completed")
         self._is_running = False
@@ -659,11 +663,12 @@ def _execute_activity(
         )
 
 
-class _RuntimeOrchestrationContext(task.OrchestrationContext):
+class _RuntimeOrchestrationContext(task.OrchestrationContext, deterministic.DeterministicContextMixin):
     _generator: Optional[Generator[task.Task, Any, Any]]
     _previous_task: Optional[task.Task]
 
     def __init__(self, instance_id: str):
+        super().__init__()
         self._generator = None
         self._is_replaying = True
         self._is_suspended = False
```
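The `stop_timeout` change above replaces a hard-coded `join(timeout=30)` with a configurable bound, which is what lets tests shut workers down quickly. A minimal sketch of the same pattern, using only the standard library (the `MiniWorker` class is hypothetical, not the real `TaskHubGrpcWorker`):

```python
import threading

class MiniWorker:
    """Toy sketch of the stop()/join pattern with a configurable stop timeout."""
    def __init__(self, stop_timeout: float = 30.0):
        self._stop_timeout = stop_timeout
        self._shutdown = threading.Event()
        self._run_loop = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Poll for work until asked to stop; Event.wait doubles as the sleep
        while not self._shutdown.wait(timeout=0.01):
            pass

    def start(self):
        self._run_loop.start()

    def stop(self) -> bool:
        self._shutdown.set()
        # Bounded wait: join() returns after the thread exits or the timeout elapses,
        # so stop() can never hang longer than the configured stop_timeout
        self._run_loop.join(timeout=self._stop_timeout)
        return not self._run_loop.is_alive()

w = MiniWorker(stop_timeout=0.5)  # a short timeout keeps test teardown fast
w.start()
stopped = w.stop()
print(stopped)  # → True
```

Since `Thread.join(timeout=...)` does not raise on timeout, checking `is_alive()` afterwards is the standard way to tell whether the worker actually drained in time.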
