Skip to content

Commit 71cff0d

Browse files
committed
External payload storage
1 parent af89be6 commit 71cff0d

File tree

9 files changed

+2131
-43
lines changed

9 files changed

+2131
-43
lines changed

README.md

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ informal introduction to the features and their implementation.
5454
- [Data Conversion](#data-conversion)
5555
- [Pydantic Support](#pydantic-support)
5656
- [Custom Type Data Conversion](#custom-type-data-conversion)
57+
- [External Payload Storage](#external-payload-storage)
5758
- [Workers](#workers)
5859
- [Workflows](#workflows)
5960
- [Definition](#definition)
@@ -309,8 +310,9 @@ other_ns_client = Client(**config)
309310

310311
Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
311312
`temporalio.converter.DataConverter` can be set via the `data_converter` parameter of the `Client` constructor. Data
312-
converters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert
313-
Python values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
313+
converters are a combination of payload converters, external payload storage, payload codecs, and failure converters. Payload
314+
converters convert Python values to/from serialized bytes. External payload storage optionally stores and retrieves payloads
315+
to/from external storage services using drivers. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
314316
Failure converters convert exceptions to/from serialized failures.
315317

316318
The default data converter supports converting multiple types including:
@@ -455,6 +457,119 @@ my_data_converter = dataclasses.replace(
455457

456458
Now `IPv4Address` can be used in type hints including collections, optionals, etc.
457459

460+
##### External Payload Storage
461+
462+
⚠️ **External payload storage support is currently at an experimental release stage.** ⚠️
463+
464+
External payload storage allows large payloads to be offloaded to an external storage service (such as Amazon S3) rather than stored inline in workflow history. This is useful when workflows or activities work with data that would otherwise exceed Temporal's payload size limits.
465+
466+
External payload storage is configured via the `external_storage` parameter on `DataConverter`, which accepts a `temporalio.extstore.Options` instance. Any driver used to store payloads must also be configured on the component that retrieves them — for example, if the client stores workflow inputs using a driver, the worker must include that driver in its `Options.drivers` list to retrieve them.
467+
468+
The simplest setup uses a single storage driver:
469+
470+
```python
471+
import dataclasses
472+
from temporalio.client import Client
473+
from temporalio.converter import DataConverter
474+
from temporalio.extstore import Options
475+
476+
driver = MyDriver()
477+
478+
client = await Client.connect(
479+
"localhost:7233",
480+
data_converter=dataclasses.replace(
481+
DataConverter.default,
482+
external_storage=Options(drivers=[driver]),
483+
),
484+
)
485+
```
486+
487+
Some things to note about external payload storage:
488+
489+
* Only payloads that meet or exceed `Options.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
490+
* External payload storage applies transparently to workflow inputs/outputs, activity inputs/outputs, signals, updates, queries, and failure details.
491+
* The `DataConverter`'s `payload_codec` (if configured) is applied to the *reference* payload stored in workflow history, not to the externally stored bytes. To encrypt or compress the bytes handed to a driver, use `Options.external_converter`.
492+
* Setting `Options.payload_size_threshold` to `None` causes every payload to be considered for external payload storage regardless of size.
493+
494+
###### Multiple Drivers and Driver Selection
495+
496+
When multiple storage backends are needed, list all drivers in `Options.drivers` and provide a `driver_selector` to control which driver stores new payloads. Any driver in the list not chosen for storing is still available for retrieval, which is useful when migrating between storage backends.
497+
498+
```python
499+
from temporalio.extstore import Options
500+
501+
options = Options(
502+
drivers=[hot_driver, cold_driver],
503+
driver_selector=lambda context, payload: (
504+
hot_driver if payload.ByteSize() < 5 * 1024 * 1024 else cold_driver
505+
),
506+
)
507+
```
508+
509+
For stateful or class-based selection logic, implement `temporalio.extstore.DriverSelector`:
510+
511+
```python
512+
from temporalio.extstore import Driver, DriverContext, DriverSelector
513+
from temporalio.api.common.v1 import Payload
514+
515+
class MyDriverSelector(DriverSelector):
516+
def select_driver(self, context: DriverContext, payload: Payload) -> Driver | None:
517+
# Return None to store the payload inline rather than externally
518+
if payload.ByteSize() < 256 * 1024:
519+
return None
520+
return hot_driver
521+
```
522+
523+
Some things to note about driver selection:
524+
525+
* When no `driver_selector` is set, the first driver in `Options.drivers` is always used for storing.
526+
* Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
527+
* The driver returned by the selector must be registered in `Options.drivers`. If it is not, a `DriverNotFoundError` is raised.
528+
529+
###### Custom Drivers
530+
531+
Implement `temporalio.extstore.Driver` to integrate with any external payload storage system:
532+
533+
```python
534+
from collections.abc import Sequence
535+
from temporalio.extstore import Driver, DriverClaim, DriverContext
536+
from temporalio.api.common.v1 import Payload
537+
538+
class MyDriver(Driver):
539+
def __init__(self, driver_name: str | None = None):
540+
self._driver_name = driver_name or "my-org:driver:my-driver"
541+
542+
def name(self) -> str:
543+
return self._driver_name
544+
545+
async def store(
546+
self, context: DriverContext, payloads: Sequence[Payload]
547+
) -> list[DriverClaim]:
548+
claims = []
549+
for payload in payloads:
550+
key = await my_storage.put(payload.SerializeToString())
551+
claims.append(DriverClaim(data={"key": key}))
552+
return claims
553+
554+
async def retrieve(
555+
self, context: DriverContext, claims: Sequence[DriverClaim]
556+
) -> list[Payload]:
557+
payloads = []
558+
for claim in claims:
559+
data = await my_storage.get(claim.data["key"])
560+
p = Payload()
561+
p.ParseFromString(data)
562+
payloads.append(p)
563+
return payloads
564+
```
565+
566+
Some things to note about implementing a custom driver:
567+
568+
* `store` and `retrieve` must return lists of the same length as their respective input sequences.
569+
* `Driver.name()` must return a string that is unique among all drivers in `Options.drivers`. This name is embedded in the reference payload stored in workflow history and used to look up the correct driver during retrieval — changing it after payloads have been stored will break retrieval.
570+
* `Driver.type()` is automatically implemented to return the name of the class. This can be overriden in subclasses but must remain consistent across all instances of the subclass.
571+
* Implement `temporalio.converter.WithSerializationContext` on your driver to receive workflow or activity context (namespace, workflow ID, activity ID, etc.) at serialization time.
572+
458573
### Workers
459574

460575
Workers host workflows and/or activities. Here's how to run a worker:

temporalio/bridge/worker.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,9 @@ async def decode_activation(
303303
decode_headers: bool,
304304
) -> None:
305305
"""Decode all payloads in the activation."""
306-
if data_converter._decode_payload_has_effect:
307-
await CommandAwarePayloadVisitor(
308-
skip_search_attributes=True, skip_headers=not decode_headers
309-
).visit(_Visitor(data_converter._decode_payload_sequence), activation)
306+
await CommandAwarePayloadVisitor(
307+
skip_search_attributes=True, skip_headers=not decode_headers
308+
).visit(_Visitor(data_converter._decode_payload_sequence), activation)
310309

311310

312311
async def encode_completion(

temporalio/converter.py

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from itertools import zip_longest
2222
from logging import getLogger
2323
from typing import (
24+
TYPE_CHECKING,
2425
Any,
2526
ClassVar,
2627
Literal,
@@ -44,6 +45,9 @@
4445
import temporalio.exceptions
4546
import temporalio.types
4647

48+
if TYPE_CHECKING:
49+
from temporalio.extstore import StorageOptions, _ExternalStorageMiddleware
50+
4751
if sys.version_info < (3, 11):
4852
# Python's datetime.fromisoformat doesn't support certain formats pre-3.11
4953
from dateutil import parser # type: ignore
@@ -924,11 +928,42 @@ def to_failure(
924928
failure: temporalio.api.failure.v1.Failure,
925929
) -> None:
926930
"""See base class."""
931+
from temporalio.extstore import (
932+
DriverError,
933+
PayloadNotFoundError,
934+
)
935+
927936
# If already a failure error, use that
928937
if isinstance(exception, temporalio.exceptions.FailureError):
929938
self._error_to_failure(exception, payload_converter, failure)
930939
elif isinstance(exception, nexusrpc.HandlerError):
931940
self._nexus_handler_error_to_failure(exception, payload_converter, failure)
941+
elif isinstance(exception, PayloadNotFoundError):
942+
# Convert to failure error
943+
failure_error = temporalio.exceptions.ApplicationError(
944+
str(exception),
945+
{
946+
"driver_name": exception.driver_name,
947+
"driver_claim": exception.driver_claim,
948+
},
949+
type=exception.__class__.__name__,
950+
non_retryable=True,
951+
)
952+
failure_error.__traceback__ = exception.__traceback__
953+
failure_error.__cause__ = exception.__cause__
954+
self._error_to_failure(failure_error, payload_converter, failure)
955+
elif isinstance(exception, DriverError):
956+
# Convert to failure error
957+
failure_error = temporalio.exceptions.ApplicationError(
958+
str(exception),
959+
{
960+
"driver_name": exception.driver_name,
961+
},
962+
type=exception.__class__.__name__,
963+
)
964+
failure_error.__traceback__ = exception.__traceback__
965+
failure_error.__cause__ = exception.__cause__
966+
self._error_to_failure(failure_error, payload_converter, failure)
932967
else:
933968
# Convert to failure error
934969
failure_error = temporalio.exceptions.ApplicationError(
@@ -1359,15 +1394,27 @@ class DataConverter(WithSerializationContext):
13591394
payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
13601395
"""Settings for payload size limits."""
13611396

1397+
external_storage: StorageOptions | None = None
1398+
"""Options for external storage. If None, external storage is disabled.
1399+
1400+
.. warning::
1401+
This API is experimental.
1402+
"""
1403+
13621404
default: ClassVar[DataConverter]
13631405
"""Singleton default data converter."""
13641406

1407+
_external_storage_middleware: "_ExternalStorageMiddleware" = dataclasses.field(
1408+
init=False
1409+
)
1410+
13651411
_payload_error_limits: _ServerPayloadErrorLimits | None = None
13661412
"""Server-reported limits for payloads."""
13671413

13681414
def __post_init__(self) -> None: # noqa: D105
13691415
object.__setattr__(self, "payload_converter", self.payload_converter_class())
13701416
object.__setattr__(self, "failure_converter", self.failure_converter_class())
1417+
self._reset_external_storage_middleware()
13711418

13721419
async def encode(
13731420
self, values: Sequence[Any]
@@ -1445,27 +1492,45 @@ def with_context(self, context: SerializationContext) -> Self:
14451492
payload_converter = self.payload_converter
14461493
payload_codec = self.payload_codec
14471494
failure_converter = self.failure_converter
1495+
external_storage = self.external_storage
14481496
if isinstance(payload_converter, WithSerializationContext):
14491497
payload_converter = payload_converter.with_context(context)
14501498
if isinstance(payload_codec, WithSerializationContext):
14511499
payload_codec = payload_codec.with_context(context)
14521500
if isinstance(failure_converter, WithSerializationContext):
14531501
failure_converter = failure_converter.with_context(context)
1502+
if isinstance(external_storage, WithSerializationContext):
1503+
external_storage = external_storage.with_context(context)
14541504
if all(
14551505
new is orig
14561506
for new, orig in [
14571507
(payload_converter, self.payload_converter),
14581508
(payload_codec, self.payload_codec),
14591509
(failure_converter, self.failure_converter),
1510+
(external_storage, self.external_storage),
14601511
]
14611512
):
14621513
return self
14631514
cloned = dataclasses.replace(self)
14641515
object.__setattr__(cloned, "payload_converter", payload_converter)
14651516
object.__setattr__(cloned, "payload_codec", payload_codec)
14661517
object.__setattr__(cloned, "failure_converter", failure_converter)
1518+
object.__setattr__(cloned, "external_storage", external_storage)
1519+
cloned._reset_external_storage_middleware(context)
14671520
return cloned
14681521

1522+
def _reset_external_storage_middleware(
1523+
self, context: SerializationContext | None = None
1524+
) -> None:
1525+
# Lazy import to avoid circular dependency
1526+
from temporalio.extstore import _ExternalStorageMiddleware
1527+
1528+
object.__setattr__(
1529+
self,
1530+
"_external_storage_middleware",
1531+
_ExternalStorageMiddleware(self.external_storage, context),
1532+
)
1533+
14691534
def _with_payload_error_limits(
14701535
self, limits: _ServerPayloadErrorLimits | None
14711536
) -> DataConverter:
@@ -1523,48 +1588,47 @@ async def _encode_memo_existing(
15231588
async def _encode_payload(
15241589
self, payload: temporalio.api.common.v1.Payload
15251590
) -> temporalio.api.common.v1.Payload:
1591+
payload = await self._external_storage_middleware.store_payload(payload)
15261592
if self.payload_codec:
15271593
payload = (await self.payload_codec.encode([payload]))[0]
15281594
self._validate_payload_limits([payload])
15291595
return payload
15301596

15311597
async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
1532-
if self.payload_codec:
1533-
await self.payload_codec.encode_wrapper(payloads)
1534-
self._validate_payload_limits(payloads.payloads)
1598+
encoded_payloads = await self._encode_payload_sequence(payloads.payloads)
1599+
del payloads.payloads[:]
1600+
payloads.payloads.extend(encoded_payloads)
15351601

15361602
async def _encode_payload_sequence(
15371603
self, payloads: Sequence[temporalio.api.common.v1.Payload]
15381604
) -> list[temporalio.api.common.v1.Payload]:
1539-
encoded_payloads = list(payloads)
1605+
result = await self._external_storage_middleware.store_payloads(payloads)
15401606
if self.payload_codec:
1541-
encoded_payloads = await self.payload_codec.encode(encoded_payloads)
1542-
self._validate_payload_limits(encoded_payloads)
1543-
return encoded_payloads
1607+
result = await self.payload_codec.encode(result)
1608+
self._validate_payload_limits(result)
1609+
return result
15441610

15451611
async def _decode_payload(
15461612
self, payload: temporalio.api.common.v1.Payload
15471613
) -> temporalio.api.common.v1.Payload:
15481614
if self.payload_codec:
15491615
payload = (await self.payload_codec.decode([payload]))[0]
1616+
payload = await self._external_storage_middleware.retrieve_payload(payload)
15501617
return payload
15511618

15521619
async def _decode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
1553-
if self.payload_codec:
1554-
await self.payload_codec.decode_wrapper(payloads)
1620+
decoded_payloads = await self._decode_payload_sequence(payloads.payloads)
1621+
del payloads.payloads[:]
1622+
payloads.payloads.extend(decoded_payloads)
15551623

15561624
async def _decode_payload_sequence(
15571625
self, payloads: Sequence[temporalio.api.common.v1.Payload]
15581626
) -> list[temporalio.api.common.v1.Payload]:
1559-
if not self.payload_codec:
1560-
return list(payloads)
1561-
return await self.payload_codec.decode(payloads)
1562-
1563-
# Temporary shortcircuit detection while the _decode_* methods may no-op if
1564-
# a payload codec is not configured. Remove once those paths have more to them.
1565-
@property
1566-
def _decode_payload_has_effect(self) -> bool:
1567-
return self.payload_codec is not None
1627+
result = list(payloads)
1628+
if self.payload_codec:
1629+
result = await self.payload_codec.decode(result)
1630+
result = await self._external_storage_middleware.retrieve_payloads(result)
1631+
return result
15681632

15691633
@staticmethod
15701634
async def _apply_to_failure_payloads(

0 commit comments

Comments
 (0)