Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to

## [Unreleased]

### Fixed

- 🐛(backend) stream document content with an async iterator under ASGI

## [v5.2.0] - 2026-06-03

### Added
Expand Down
7 changes: 2 additions & 5 deletions src/backend/core/api/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from core.tasks.mail import send_ask_for_access_mail
from core.utils.analytics import PosthogEventName, posthog_capture
from core.utils.paths import filter_descendants
from core.utils.s3_response_stream import content_stream
from core.utils.treebeard import create_tree_node_with_retry
from core.utils.users import users_sharing_documents_with
from core.utils.yjs import extract_attachments
Expand Down Expand Up @@ -2155,12 +2156,8 @@ def content_retrieve(self, request, *args, **kwargs):
settings.CONTENT_METADATA_CACHE_TIMEOUT,
)

def _stream(body):
yield from body.iter_chunks()
body.close()

response = StreamingHttpResponse(
streaming_content=_stream(s3_response["Body"]),
streaming_content=content_stream(s3_response["Body"]),
content_type="text/plain",
status=status.HTTP_200_OK,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from django.utils import timezone

import pytest
from asgiref.sync import sync_to_async
from rest_framework import status
from rest_framework.test import APIClient

Expand Down Expand Up @@ -164,6 +165,36 @@ def test_api_documents_content_retrieve_file_not_in_storage():
assert not cache.get(get_content_metadata_cache_key(document.id))


# The data created in this test through `sync_to_async` is written on a
# separate thread-local database connection, outside the atomic transaction
# pytest-django uses to isolate tests. `transaction=True` makes pytest-django
# flush the tables after the test instead of relying on a rollback, so the row
# does not leak into the rest of the suite.
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio(loop_scope="function")
async def test_api_documents_content_retrieve_async(monkeypatch):
"""
Test the content retrieve method in async should use the async generator in the streaming
response.
"""
monkeypatch.setenv("PYTHON_SERVER_MODE", "async")

document = await sync_to_async(factories.DocumentFactory)(link_reach="public")
client = APIClient()

response = await sync_to_async(client.get)(
f"/api/v1.0/documents/{document.id!s}/content/"
)

assert response.status_code == status.HTTP_200_OK
# Wait for the streaming content to be fully received => async iterator -> list
# This fails it the streaming is not an async generator
response_content = b"".join(
[content async for content in response.streaming_content]
).decode("utf-8")
assert response_content == factories.YDOC_HELLO_WORLD_BASE64


def test_api_documents_content_retrieve_content_length_header():
"""The response includes the Content-Length header when available from storage."""
user = factories.UserFactory()
Expand Down
125 changes: 125 additions & 0 deletions src/backend/core/tests/test_utils_s3_response_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Test the s3 response stream utilities."""

from collections.abc import AsyncIterator, Iterator

import pytest
from asgiref.sync import async_to_sync

from core.utils.s3_response_stream import async_stream, content_stream, sync_stream

pytestmark = pytest.mark.django_db


class FakeS3Body:
"""Minimal stand-in for a botocore StreamingBody."""

def __init__(self, chunks):
self._chunks = chunks
self.closed = False

def iter_chunks(self):
"""Yield the configured chunks, like StreamingBody.iter_chunks."""
yield from self._chunks

def close(self):
"""Record that the body has been closed."""
self.closed = True


def collect_async(async_gen):
"""Consume an async generator synchronously and return its items as a list."""

async def _collect():
return [chunk async for chunk in async_gen]

return async_to_sync(_collect)()


# -- sync_stream --


def test_sync_stream_yields_all_chunks():
"""Should yield every chunk of the body in order."""
body = FakeS3Body([b"hello", b"world", b"!"])

assert list(sync_stream(body)) == [b"hello", b"world", b"!"]


def test_sync_stream_empty_body():
"""Should yield nothing when the body is empty."""
body = FakeS3Body([])

assert not list(sync_stream(body))


def test_sync_stream_closes_body():
"""Should close the body once it has been fully consumed."""
body = FakeS3Body([b"hello"])

assert body.closed is False
list(sync_stream(body))
assert body.closed is True


# -- async_stream --


def test_async_stream_yields_all_chunks():
"""Should yield every chunk of the body in order."""
body = FakeS3Body([b"hello", b"world", b"!"])

assert collect_async(async_stream(body)) == [b"hello", b"world", b"!"]


def test_async_stream_empty_body():
"""Should yield nothing when the body is empty."""
body = FakeS3Body([])

assert not collect_async(async_stream(body))


def test_async_stream_closes_body():
"""Should close the body once it has been fully consumed."""
body = FakeS3Body([b"hello"])

assert body.closed is False
collect_async(async_stream(body))
assert body.closed is True


# -- content_stream --


def test_content_stream_async_mode(monkeypatch):
"""In async mode, content_stream should return an async iterator."""
monkeypatch.setenv("PYTHON_SERVER_MODE", "async")
body = FakeS3Body([b"hello", b"world"])

stream = content_stream(body)

assert isinstance(stream, AsyncIterator)
assert collect_async(stream) == [b"hello", b"world"]


def test_content_stream_sync_mode(monkeypatch):
"""In sync mode, content_stream should return a sync iterator."""
monkeypatch.setenv("PYTHON_SERVER_MODE", "sync")
body = FakeS3Body([b"hello", b"world"])

stream = content_stream(body)

assert not isinstance(stream, AsyncIterator)
assert isinstance(stream, Iterator)
assert list(stream) == [b"hello", b"world"]


def test_content_stream_defaults_to_sync(monkeypatch):
"""When PYTHON_SERVER_MODE is not set, content_stream should default to sync."""
monkeypatch.delenv("PYTHON_SERVER_MODE", raising=False)
body = FakeS3Body([b"hello", b"world"])

stream = content_stream(body)

assert not isinstance(stream, AsyncIterator)
assert isinstance(stream, Iterator)
assert list(stream) == [b"hello", b"world"]
40 changes: 40 additions & 0 deletions src/backend/core/utils/s3_response_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Utils module to stream content to a StreamingHttpResponse"""

import os

from asgiref.sync import sync_to_async
from botocore.response import StreamingBody


def sync_stream(body: StreamingBody):
"""Synchronous generator consuming s3 response body."""
yield from body.iter_chunks()
body.close()


async def async_stream(body: StreamingBody):
"""Asynchronous generator consuming s3 response body"""
# The botocore stream is blocking, so each read is offloaded with
# sync_to_async to avoid blocking the event loop.
chunks = await sync_to_async(body.iter_chunks)()
sentinel = object()
while True:
chunk = await sync_to_async(next)(chunks, sentinel)
if chunk is sentinel:
break
yield chunk
await sync_to_async(body.close)()


def content_stream(body: StreamingBody):
"""
Depending on the server mode (set through the PYTHON_SERVER_MODE
environment variable in impress/asgi.py and impress/wsgi.py), the
content is streamed back with either an asynchronous or a synchronous
iterator. Under ASGI, a synchronous iterator would trigger a Django
warning and be consumed synchronously, defeating the purpose of
streaming.
"""
is_async_server = os.environ.get("PYTHON_SERVER_MODE", "sync") == "async"

return async_stream(body) if is_async_server else sync_stream(body)
1 change: 1 addition & 0 deletions src/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ dev = [
"pyfakefs==6.2.0",
"pylint-django==2.7.0",
"pylint<4.0.0",
"pytest-asyncio==1.4.0",
"pytest-cov==7.1.0",
"pytest-django==4.12.0",
"pytest==9.0.3",
Expand Down
14 changes: 14 additions & 0 deletions src/backend/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading