Large payload storage - Python SDK
The Temporal Service enforces a ~2 MB per payload limit. When your Workflows or Activities handle data larger than the limit, you can offload payloads to external storage, such as S3, and pass a small reference token through the event history instead. This is sometimes called the claim check pattern.
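The claim check pattern is easy to see in miniature. This toy sketch is plain Python with an in-memory dict standing in for S3 (the names `offload`, `resolve`, and `blob_store` are illustrative, not SDK APIs): large payloads go to external storage and only a small token travels inline.

```python
import uuid

# Toy claim check: an in-memory "blob store" stands in for S3.
blob_store: dict[str, bytes] = {}

THRESHOLD = 256 * 1024  # offload anything larger than 256 KiB


def offload(payload: bytes) -> bytes:
    """Replace a large payload with a small reference token."""
    if len(payload) <= THRESHOLD:
        return payload  # small enough to stay inline
    key = str(uuid.uuid4())
    blob_store[key] = payload
    return b"claim:" + key.encode()  # lightweight reference


def resolve(payload: bytes) -> bytes:
    """Fetch the original payload back if this is a reference token."""
    if payload.startswith(b"claim:"):
        return blob_store[payload[len(b"claim:"):].decode()]
    return payload


big = b"x" * (1024 * 1024)    # 1 MiB payload
token = offload(big)          # stored externally, token returned
assert len(token) < 100       # only the reference travels inline
assert resolve(token) == big  # round-trips back to the original
```

The real storage driver described below does the same thing, but for protobuf `Payload` objects and a durable store.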
External storage sits at the end of the data pipeline, after both the Payload Converter and the Payload Codec:
User code → PayloadConverter → PayloadCodec → External Storage → Temporal Service
When a payload exceeds a configurable size threshold (default 256 KiB), the storage driver uploads it to your external store and replaces it with a lightweight reference. Payloads below the threshold stay inline in the event history. On the way back, reference payloads are retrieved from external storage before the codec decodes them.
Because external storage runs after the codec, if you use an encryption codec, payloads are already encrypted before they're uploaded to your store.
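For example, a converter can combine both stages. This is a sketch: `EncryptionCodec` stands in for your own codec class, and `S3StorageDriver` and the `ExternalStorage` options are covered below.

```python
from temporalio.converter import DataConverter, ExternalStorage

converter = DataConverter(
    payload_codec=EncryptionCodec(),      # runs first: encrypts payloads
    external_storage=ExternalStorage(     # runs second: uploads ciphertext
        drivers=[S3StorageDriver("my-bucket")],
    ),
)
```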
Store and retrieve large payloads using external storage
To offload large payloads, implement a StorageDriver and configure it on your DataConverter. The driver needs a
store() method to upload payloads and a retrieve() method to fetch them back.
Once you implement a storage driver, configure it on your DataConverter and use it when creating your Client and
Worker. All Workflows and Activities running on the Worker use the storage driver automatically, with no changes to
your business logic. You can also configure the size threshold and use multiple storage drivers.
Implement a storage driver
Extend the StorageDriver abstract class:
```python
from typing import Sequence

import boto3

from temporalio.api.common.v1 import Payload
from temporalio.converter import (
    StorageDriver,
    StorageDriverClaim,
    StorageDriverRetrieveContext,
    StorageDriverStoreContext,
)


class S3StorageDriver(StorageDriver):
    def __init__(self, bucket: str) -> None:
        self._bucket = bucket
        self._s3 = boto3.client("s3")

    def name(self) -> str:
        return "s3"

    async def store(
        self, context: StorageDriverStoreContext, payloads: Sequence[Payload]
    ) -> Sequence[StorageDriverClaim]:
        # Upload payloads, return claims (see below)
        ...

    async def retrieve(
        self, context: StorageDriverRetrieveContext, claims: Sequence[StorageDriverClaim]
    ) -> Sequence[Payload]:
        # Download payloads using claims (see below)
        ...
```
Store payloads
The store() method receives a sequence of payloads and must return exactly one StorageDriverClaim per payload. A
claim is a set of string key-value pairs that the driver uses to locate the payload later — typically a storage key or
URL.
Sample implementation:
Retrieve payloads
The retrieve() method receives the claims that store() produced and must return the original payloads:
Sample implementation:
Configure external storage on the Data Converter
Pass an ExternalStorage instance to your DataConverter:
```python
from temporalio.converter import DataConverter, ExternalStorage

converter = DataConverter(
    external_storage=ExternalStorage(
        drivers=[S3StorageDriver("my-bucket")],
        payload_size_threshold=256 * 1024,  # 256 KiB (default)
    ),
)
```
Use this converter when creating your Client and Worker:
```python
from temporalio.client import Client
from temporalio.worker import Worker

client = await Client.connect(
    "localhost:7233",
    data_converter=converter,
)

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
)
```
The Worker inherits the Client's data converter, so no further configuration is needed there.
Adjust the size threshold
The payload_size_threshold controls which payloads get offloaded. Payloads smaller than this value stay inline in the
event history.
```python
ExternalStorage(
    drivers=[driver],
    payload_size_threshold=100 * 1024,  # 100 KiB
)
```
Set it to None to externalize all payloads regardless of size.
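For example, to externalize everything:

```python
ExternalStorage(
    drivers=[driver],
    payload_size_threshold=None,  # no minimum: every payload is offloaded
)
```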
Use multiple storage drivers
When you have multiple drivers (for example, hot and cold storage tiers), provide a driver_selector function that
chooses which driver handles each payload:
```python
hot_driver = S3StorageDriver("hot-bucket")
cold_driver = S3StorageDriver("cold-bucket")

ExternalStorage(
    drivers=[hot_driver, cold_driver],
    driver_selector=lambda context, payload: (
        cold_driver if payload.ByteSize() > 1_000_000 else hot_driver
    ),
    payload_size_threshold=100 * 1024,
)
```
Return None from the selector to keep a specific payload inline.
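For instance, this sketch offloads only very large payloads to a single driver and leaves everything else inline (the 5 MB cutoff is an arbitrary illustrative value):

```python
ExternalStorage(
    drivers=[cold_driver],
    driver_selector=lambda context, payload: (
        # Returning None keeps the payload inline in the event history.
        cold_driver if payload.ByteSize() > 5_000_000 else None
    ),
)
```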