Skip to content

Commit 02bdac5

Browse files
committed
External storage
1 parent 6cc1a02 commit 02bdac5

File tree

9 files changed

+1772
-43
lines changed

9 files changed

+1772
-43
lines changed

README.md

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ informal introduction to the features and their implementation.
5454
- [Data Conversion](#data-conversion)
5555
- [Pydantic Support](#pydantic-support)
5656
- [Custom Type Data Conversion](#custom-type-data-conversion)
57+
- [External Storage](#external-storage)
5758
- [Workers](#workers)
5859
- [Workflows](#workflows)
5960
- [Definition](#definition)
@@ -309,8 +310,9 @@ other_ns_client = Client(**config)
309310

310311
Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
311312
`temporalio.converter.DataConverter` can be set via the `data_converter` parameter of the `Client` constructor. Data
312-
converters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert
313-
Python values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
313+
converters are a combination of payload converters, external storage, payload codecs, and failure converters. Payload
314+
converters convert Python values to/from serialized bytes. External storage optionally stores and retrieves payloads
315+
to/from external storage services using drivers. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
314316
Failure converters convert exceptions to/from serialized failures.
315317

316318
The default data converter supports converting multiple types including:
@@ -455,6 +457,119 @@ my_data_converter = dataclasses.replace(
455457

456458
Now `IPv4Address` can be used in type hints including collections, optionals, etc.
457459

460+
##### External Storage
461+
462+
⚠️ **External storage support is currently at an experimental release stage.** ⚠️
463+
464+
External storage allows large payloads to be offloaded to an external storage service (such as Amazon S3) rather than stored inline in workflow history. This is useful when workflows or activities work with data that would otherwise exceed Temporal's payload size limits.
465+
466+
External storage is configured via the `external_storage` parameter on `DataConverter`, which accepts a `temporalio.extstore.Options` instance. Any driver used to store payloads must also be configured on the component that retrieves them — for example, if the client stores workflow inputs using a driver, the worker must include that driver in its `Options.drivers` list to retrieve them.
467+
468+
The simplest setup uses a single storage driver:
469+
470+
```python
471+
import dataclasses
472+
from temporalio.client import Client
473+
from temporalio.converter import DataConverter
474+
from temporalio.extstore import Options
475+
476+
driver = MyDriver()
477+
478+
client = await Client.connect(
479+
"localhost:7233",
480+
data_converter=dataclasses.replace(
481+
DataConverter.default,
482+
external_storage=Options(drivers=[driver]),
483+
),
484+
)
485+
```
486+
487+
Some things to note about external storage:
488+
489+
* Only payloads that meet or exceed `Options.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
490+
* External storage applies transparently to workflow inputs/outputs, activity inputs/outputs, signals, updates, queries, and failure details.
491+
* The `DataConverter`'s `payload_codec` (if configured) is applied to the *reference* payload stored in workflow history, not to the externally stored bytes. To encrypt or compress the bytes handed to a driver, use `Options.external_converter`.
492+
* Setting `Options.payload_size_threshold` to `None` causes every payload to be considered for external storage regardless of size.
493+
494+
###### Multiple Drivers and Driver Selection
495+
496+
When multiple storage backends are needed, list all drivers in `Options.drivers` and provide a `driver_selector` to control which driver stores new payloads. Any driver in the list not chosen for storing is still available for retrieval, which is useful when migrating between storage backends.
497+
498+
```python
499+
from temporalio.extstore import Options
500+
501+
options = Options(
502+
drivers=[hot_driver, cold_driver],
503+
driver_selector=lambda context, payload: (
504+
hot_driver if payload.ByteSize() < 5 * 1024 * 1024 else cold_driver
505+
),
506+
)
507+
```
508+
509+
For stateful or class-based selection logic, implement `temporalio.extstore.DriverSelector`:
510+
511+
```python
512+
from temporalio.extstore import Driver, DriverContext, DriverSelector
513+
from temporalio.api.common.v1 import Payload
514+
515+
class MyDriverSelector(DriverSelector):
516+
def select_driver(self, context: DriverContext, payload: Payload) -> Driver | None:
517+
# Return None to store the payload inline rather than externally
518+
if payload.ByteSize() < 256 * 1024:
519+
return None
520+
return hot_driver
521+
```
522+
523+
Some things to note about driver selection:
524+
525+
* When no `driver_selector` is set, the first driver in `Options.drivers` is always used for storing.
526+
* Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
527+
* The driver returned by the selector must be registered in `Options.drivers`. If it is not, a `DriverNotFoundError` is raised.
528+
529+
###### Custom Drivers
530+
531+
Implement `temporalio.extstore.Driver` to integrate with any external storage system:
532+
533+
```python
534+
from collections.abc import Sequence
535+
from temporalio.extstore import Driver, DriverClaim, DriverContext
536+
from temporalio.api.common.v1 import Payload
537+
538+
class MyDriver(Driver):
539+
def name(self) -> str:
540+
return "my-org:driver:custom"
541+
542+
def type(self) -> str:
543+
return "my-org:driver:custom"
544+
545+
async def store(
546+
self, context: DriverContext, payloads: Sequence[Payload]
547+
) -> list[DriverClaim]:
548+
claims = []
549+
for payload in payloads:
550+
key = await my_storage.put(payload.SerializeToString())
551+
claims.append(DriverClaim(data={"key": key}))
552+
return claims
553+
554+
async def retrieve(
555+
self, context: DriverContext, claims: Sequence[DriverClaim]
556+
) -> list[Payload]:
557+
payloads = []
558+
for claim in claims:
559+
data = await my_storage.get(claim.data["key"])
560+
p = Payload()
561+
p.ParseFromString(data)
562+
payloads.append(p)
563+
return payloads
564+
```
565+
566+
Some things to note about implementing a custom driver:
567+
568+
* `store` and `retrieve` must return lists of the same length as their respective input sequences.
569+
* `Driver.name()` must return a string that is unique among all drivers in `Options.drivers`. This name is embedded in the reference payload stored in workflow history and used to look up the correct driver during retrieval — changing it after payloads have been stored will break retrieval.
570+
* `Driver.type()` should be consistent across all instances of the same driver class.
571+
* Implement `temporalio.converter.WithSerializationContext` on your driver to receive workflow or activity context (namespace, workflow ID, activity ID, etc.) at serialization time.
572+
458573
### Workers
459574

460575
Workers host workflows and/or activities. Here's how to run a worker:

temporalio/bridge/worker.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,9 @@ async def decode_activation(
303303
decode_headers: bool,
304304
) -> None:
305305
"""Decode all payloads in the activation."""
306-
if data_converter._decode_payload_has_effect:
307-
await CommandAwarePayloadVisitor(
308-
skip_search_attributes=True, skip_headers=not decode_headers
309-
).visit(_Visitor(data_converter._decode_payload_sequence), activation)
306+
await CommandAwarePayloadVisitor(
307+
skip_search_attributes=True, skip_headers=not decode_headers
308+
).visit(_Visitor(data_converter._decode_payload_sequence), activation)
310309

311310

312311
async def encode_completion(

temporalio/converter.py

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from itertools import zip_longest
2222
from logging import getLogger
2323
from typing import (
24+
TYPE_CHECKING,
2425
Any,
2526
ClassVar,
2627
Literal,
@@ -44,6 +45,9 @@
4445
import temporalio.exceptions
4546
import temporalio.types
4647

48+
if TYPE_CHECKING:
49+
from temporalio.extstore import StorageOptions, _ExternalStorageMiddleware
50+
4751
if sys.version_info < (3, 11):
4852
# Python's datetime.fromisoformat doesn't support certain formats pre-3.11
4953
from dateutil import parser # type: ignore
@@ -922,11 +926,42 @@ def to_failure(
922926
failure: temporalio.api.failure.v1.Failure,
923927
) -> None:
924928
"""See base class."""
929+
from temporalio.extstore import (
930+
DriverError,
931+
PayloadNotFoundError,
932+
)
933+
925934
# If already a failure error, use that
926935
if isinstance(exception, temporalio.exceptions.FailureError):
927936
self._error_to_failure(exception, payload_converter, failure)
928937
elif isinstance(exception, nexusrpc.HandlerError):
929938
self._nexus_handler_error_to_failure(exception, payload_converter, failure)
939+
elif isinstance(exception, PayloadNotFoundError):
940+
# Convert to failure error
941+
failure_error = temporalio.exceptions.ApplicationError(
942+
str(exception),
943+
{
944+
"driver_name": exception.driver_name,
945+
"driver_claim": exception.driver_claim,
946+
},
947+
type=exception.__class__.__name__,
948+
non_retryable=True,
949+
)
950+
failure_error.__traceback__ = exception.__traceback__
951+
failure_error.__cause__ = exception.__cause__
952+
self._error_to_failure(failure_error, payload_converter, failure)
953+
elif isinstance(exception, DriverError):
954+
# Convert to failure error
955+
failure_error = temporalio.exceptions.ApplicationError(
956+
str(exception),
957+
{
958+
"driver_name": exception.driver_name,
959+
},
960+
type=exception.__class__.__name__,
961+
)
962+
failure_error.__traceback__ = exception.__traceback__
963+
failure_error.__cause__ = exception.__cause__
964+
self._error_to_failure(failure_error, payload_converter, failure)
930965
else:
931966
# Convert to failure error
932967
failure_error = temporalio.exceptions.ApplicationError(
@@ -1289,15 +1324,27 @@ class DataConverter(WithSerializationContext):
12891324
payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
12901325
"""Settings for payload size limits."""
12911326

1327+
external_storage: StorageOptions | None = None
1328+
"""Options for external storage. If None, external storage is disabled.
1329+
1330+
.. warning::
1331+
This API is experimental.
1332+
"""
1333+
12921334
default: ClassVar[DataConverter]
12931335
"""Singleton default data converter."""
12941336

1337+
_external_storage_middleware: "_ExternalStorageMiddleware" = dataclasses.field(
1338+
init=False
1339+
)
1340+
12951341
_payload_error_limits: _ServerPayloadErrorLimits | None = None
12961342
"""Server-reported limits for payloads."""
12971343

12981344
def __post_init__(self) -> None: # noqa: D105
12991345
object.__setattr__(self, "payload_converter", self.payload_converter_class())
13001346
object.__setattr__(self, "failure_converter", self.failure_converter_class())
1347+
self._reset_external_storage_middleware()
13011348

13021349
async def encode(
13031350
self, values: Sequence[Any]
@@ -1375,27 +1422,45 @@ def with_context(self, context: SerializationContext) -> Self:
13751422
payload_converter = self.payload_converter
13761423
payload_codec = self.payload_codec
13771424
failure_converter = self.failure_converter
1425+
external_storage = self.external_storage
13781426
if isinstance(payload_converter, WithSerializationContext):
13791427
payload_converter = payload_converter.with_context(context)
13801428
if isinstance(payload_codec, WithSerializationContext):
13811429
payload_codec = payload_codec.with_context(context)
13821430
if isinstance(failure_converter, WithSerializationContext):
13831431
failure_converter = failure_converter.with_context(context)
1432+
if isinstance(external_storage, WithSerializationContext):
1433+
external_storage = external_storage.with_context(context)
13841434
if all(
13851435
new is orig
13861436
for new, orig in [
13871437
(payload_converter, self.payload_converter),
13881438
(payload_codec, self.payload_codec),
13891439
(failure_converter, self.failure_converter),
1440+
(external_storage, self.external_storage),
13901441
]
13911442
):
13921443
return self
13931444
cloned = dataclasses.replace(self)
13941445
object.__setattr__(cloned, "payload_converter", payload_converter)
13951446
object.__setattr__(cloned, "payload_codec", payload_codec)
13961447
object.__setattr__(cloned, "failure_converter", failure_converter)
1448+
object.__setattr__(cloned, "external_storage", external_storage)
1449+
cloned._reset_external_storage_middleware(context)
13971450
return cloned
13981451

1452+
def _reset_external_storage_middleware(
1453+
self, context: SerializationContext | None = None
1454+
) -> None:
1455+
# Lazy import to avoid circular dependency
1456+
from temporalio.extstore import _ExternalStorageMiddleware
1457+
1458+
object.__setattr__(
1459+
self,
1460+
"_external_storage_middleware",
1461+
_ExternalStorageMiddleware(self.external_storage, context),
1462+
)
1463+
13991464
def _with_payload_error_limits(
14001465
self, limits: _ServerPayloadErrorLimits | None
14011466
) -> DataConverter:
@@ -1453,48 +1518,47 @@ async def _encode_memo_existing(
14531518
async def _encode_payload(
14541519
self, payload: temporalio.api.common.v1.Payload
14551520
) -> temporalio.api.common.v1.Payload:
1521+
payload = await self._external_storage_middleware.store_payload(payload)
14561522
if self.payload_codec:
14571523
payload = (await self.payload_codec.encode([payload]))[0]
14581524
self._validate_payload_limits([payload])
14591525
return payload
14601526

14611527
async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
1462-
if self.payload_codec:
1463-
await self.payload_codec.encode_wrapper(payloads)
1464-
self._validate_payload_limits(payloads.payloads)
1528+
encoded_payloads = await self._encode_payload_sequence(payloads.payloads)
1529+
del payloads.payloads[:]
1530+
payloads.payloads.extend(encoded_payloads)
14651531

14661532
async def _encode_payload_sequence(
14671533
self, payloads: Sequence[temporalio.api.common.v1.Payload]
14681534
) -> list[temporalio.api.common.v1.Payload]:
1469-
encoded_payloads = list(payloads)
1535+
result = await self._external_storage_middleware.store_payloads(payloads)
14701536
if self.payload_codec:
1471-
encoded_payloads = await self.payload_codec.encode(encoded_payloads)
1472-
self._validate_payload_limits(encoded_payloads)
1473-
return encoded_payloads
1537+
result = await self.payload_codec.encode(result)
1538+
self._validate_payload_limits(result)
1539+
return result
14741540

14751541
async def _decode_payload(
14761542
self, payload: temporalio.api.common.v1.Payload
14771543
) -> temporalio.api.common.v1.Payload:
14781544
if self.payload_codec:
14791545
payload = (await self.payload_codec.decode([payload]))[0]
1546+
payload = await self._external_storage_middleware.retrieve_payload(payload)
14801547
return payload
14811548

14821549
async def _decode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
1483-
if self.payload_codec:
1484-
await self.payload_codec.decode_wrapper(payloads)
1550+
decoded_payloads = await self._decode_payload_sequence(payloads.payloads)
1551+
del payloads.payloads[:]
1552+
payloads.payloads.extend(decoded_payloads)
14851553

14861554
async def _decode_payload_sequence(
14871555
self, payloads: Sequence[temporalio.api.common.v1.Payload]
14881556
) -> list[temporalio.api.common.v1.Payload]:
1489-
if not self.payload_codec:
1490-
return list(payloads)
1491-
return await self.payload_codec.decode(payloads)
1492-
1493-
# Temporary shortcircuit detection while the _decode_* methods may no-op if
1494-
# a payload codec is not configured. Remove once those paths have more to them.
1495-
@property
1496-
def _decode_payload_has_effect(self) -> bool:
1497-
return self.payload_codec is not None
1557+
result = list(payloads)
1558+
if self.payload_codec:
1559+
result = await self.payload_codec.decode(result)
1560+
result = await self._external_storage_middleware.retrieve_payloads(result)
1561+
return result
14981562

14991563
@staticmethod
15001564
async def _apply_to_failure_payloads(

0 commit comments

Comments
 (0)