From 8fe68442d10bc70c26e9fa18a226983efee84314 Mon Sep 17 00:00:00 2001 From: Manuel Raynaud Date: Tue, 2 Jun 2026 17:23:30 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B(backend)=20stream=20document=20con?= =?UTF-8?q?tent=20with=20an=20async=20iterator=20under=20ASGI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When docs is run using an ASGI server, the Django StreamingHttpResponse expects to consume an async generator. If a sync generator is used, it fully consumes it and then returns all the content, and we lose the benefits of using a StreamingHttpResponse. --- CHANGELOG.md | 1 + src/backend/core/api/viewsets.py | 7 +- .../test_api_documents_content_retrieve.py | 31 +++++ .../tests/test_utils_s3_response_stream.py | 125 ++++++++++++++++++ src/backend/core/utils/s3_response_stream.py | 40 ++++++ src/backend/pyproject.toml | 1 + src/backend/uv.lock | 14 ++ 7 files changed, 214 insertions(+), 5 deletions(-) create mode 100644 src/backend/core/tests/test_utils_s3_response_stream.py create mode 100644 src/backend/core/utils/s3_response_stream.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e3c426beb..ca8b7128fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to OperationalError #2385 - 🐛(frontend) fix crash when orphaned threads #2395 - 🐛(backend) order trashbin response by most recently deleted +- 🐛(backend) stream document content with an async iterator under ASGI #2381 ## [v5.2.0] - 2026-06-03 diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index 4caafd70f9..b5d4304afc 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -69,6 +69,7 @@ from core.tasks.mail import send_ask_for_access_mail from core.utils.analytics import PosthogEventName, posthog_capture from core.utils.paths import filter_descendants +from core.utils.s3_response_stream import content_stream from core.utils.treebeard import 
create_tree_node_with_retry from core.utils.users import users_sharing_documents_with from core.utils.yjs import extract_attachments @@ -2161,12 +2162,8 @@ def content_retrieve(self, request, *args, **kwargs): settings.CONTENT_METADATA_CACHE_TIMEOUT, ) - def _stream(body): - yield from body.iter_chunks() - body.close() - response = StreamingHttpResponse( - streaming_content=_stream(s3_response["Body"]), + streaming_content=content_stream(s3_response["Body"]), content_type="text/plain", status=status.HTTP_200_OK, ) diff --git a/src/backend/core/tests/documents/test_api_documents_content_retrieve.py b/src/backend/core/tests/documents/test_api_documents_content_retrieve.py index a4e8ae4b94..59bd40f548 100644 --- a/src/backend/core/tests/documents/test_api_documents_content_retrieve.py +++ b/src/backend/core/tests/documents/test_api_documents_content_retrieve.py @@ -10,6 +10,7 @@ from django.utils import timezone import pytest +from asgiref.sync import sync_to_async from rest_framework import status from rest_framework.test import APIClient @@ -164,6 +165,36 @@ def test_api_documents_content_retrieve_file_not_in_storage(): assert not cache.get(get_content_metadata_cache_key(document.id)) +# The data created in this test through `sync_to_async` is written on a +# separate thread-local database connection, outside the atomic transaction +# pytest-django uses to isolate tests. `transaction=True` makes pytest-django +# flush the tables after the test instead of relying on a rollback, so the row +# does not leak into the rest of the suite. +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio(loop_scope="function") +async def test_api_documents_content_retrieve_async(monkeypatch): + """ + Test the content retrieve method in async should use the async generator in the streaming + response. 
+ """ + monkeypatch.setenv("PYTHON_SERVER_MODE", "async") + + document = await sync_to_async(factories.DocumentFactory)(link_reach="public") + client = APIClient() + + response = await sync_to_async(client.get)( + f"/api/v1.0/documents/{document.id!s}/content/" + ) + + assert response.status_code == status.HTTP_200_OK + # Wait for the streaming content to be fully received => async iterator -> list + # This fails if the streaming is not an async generator + response_content = b"".join( + [content async for content in response.streaming_content] + ).decode("utf-8") + assert response_content == factories.YDOC_HELLO_WORLD_BASE64 + + def test_api_documents_content_retrieve_content_length_header(): """The response includes the Content-Length header when available from storage.""" user = factories.UserFactory() diff --git a/src/backend/core/tests/test_utils_s3_response_stream.py b/src/backend/core/tests/test_utils_s3_response_stream.py new file mode 100644 index 0000000000..e42c1d79fd --- /dev/null +++ b/src/backend/core/tests/test_utils_s3_response_stream.py @@ -0,0 +1,125 @@ +"""Test the s3 response stream utilities.""" + +from collections.abc import AsyncIterator, Iterator + +import pytest +from asgiref.sync import async_to_sync + +from core.utils.s3_response_stream import async_stream, content_stream, sync_stream + +pytestmark = pytest.mark.django_db + + +class FakeS3Body: + """Minimal stand-in for a botocore StreamingBody.""" + + def __init__(self, chunks): + self._chunks = chunks + self.closed = False + + def iter_chunks(self): + """Yield the configured chunks, like StreamingBody.iter_chunks.""" + yield from self._chunks + + def close(self): + """Record that the body has been closed.""" + self.closed = True + + +def collect_async(async_gen): + """Consume an async generator synchronously and return its items as a list.""" + + async def _collect(): + return [chunk async for chunk in async_gen] + + return async_to_sync(_collect)() + + +# -- sync_stream -- + + +def 
test_sync_stream_yields_all_chunks(): + """Should yield every chunk of the body in order.""" + body = FakeS3Body([b"hello", b"world", b"!"]) + + assert list(sync_stream(body)) == [b"hello", b"world", b"!"] + + +def test_sync_stream_empty_body(): + """Should yield nothing when the body is empty.""" + body = FakeS3Body([]) + + assert not list(sync_stream(body)) + + +def test_sync_stream_closes_body(): + """Should close the body once it has been fully consumed.""" + body = FakeS3Body([b"hello"]) + + assert body.closed is False + list(sync_stream(body)) + assert body.closed is True + + +# -- async_stream -- + + +def test_async_stream_yields_all_chunks(): + """Should yield every chunk of the body in order.""" + body = FakeS3Body([b"hello", b"world", b"!"]) + + assert collect_async(async_stream(body)) == [b"hello", b"world", b"!"] + + +def test_async_stream_empty_body(): + """Should yield nothing when the body is empty.""" + body = FakeS3Body([]) + + assert not collect_async(async_stream(body)) + + +def test_async_stream_closes_body(): + """Should close the body once it has been fully consumed.""" + body = FakeS3Body([b"hello"]) + + assert body.closed is False + collect_async(async_stream(body)) + assert body.closed is True + + +# -- content_stream -- + + +def test_content_stream_async_mode(monkeypatch): + """In async mode, content_stream should return an async iterator.""" + monkeypatch.setenv("PYTHON_SERVER_MODE", "async") + body = FakeS3Body([b"hello", b"world"]) + + stream = content_stream(body) + + assert isinstance(stream, AsyncIterator) + assert collect_async(stream) == [b"hello", b"world"] + + +def test_content_stream_sync_mode(monkeypatch): + """In sync mode, content_stream should return a sync iterator.""" + monkeypatch.setenv("PYTHON_SERVER_MODE", "sync") + body = FakeS3Body([b"hello", b"world"]) + + stream = content_stream(body) + + assert not isinstance(stream, AsyncIterator) + assert isinstance(stream, Iterator) + assert list(stream) == [b"hello", b"world"] 
+ + +def test_content_stream_defaults_to_sync(monkeypatch): + """When PYTHON_SERVER_MODE is not set, content_stream should default to sync.""" + monkeypatch.delenv("PYTHON_SERVER_MODE", raising=False) + body = FakeS3Body([b"hello", b"world"]) + + stream = content_stream(body) + + assert not isinstance(stream, AsyncIterator) + assert isinstance(stream, Iterator) + assert list(stream) == [b"hello", b"world"] diff --git a/src/backend/core/utils/s3_response_stream.py b/src/backend/core/utils/s3_response_stream.py new file mode 100644 index 0000000000..a029a356af --- /dev/null +++ b/src/backend/core/utils/s3_response_stream.py @@ -0,0 +1,40 @@ +"""Utils module to stream content to a StreamingHttpResponse""" + +import os + +from asgiref.sync import sync_to_async +from botocore.response import StreamingBody + + +def sync_stream(body: StreamingBody): + """Synchronous generator consuming s3 response body.""" + yield from body.iter_chunks() + body.close() + + +async def async_stream(body: StreamingBody): + """Asynchronous generator consuming s3 response body""" + # The botocore stream is blocking, so each read is offloaded with + # sync_to_async to avoid blocking the event loop. + chunks = await sync_to_async(body.iter_chunks)() + sentinel = object() + while True: + chunk = await sync_to_async(next)(chunks, sentinel) + if chunk is sentinel: + break + yield chunk + await sync_to_async(body.close)() + + +def content_stream(body: StreamingBody): + """ + Depending on the server mode (set through the PYTHON_SERVER_MODE + environment variable in impress/asgi.py and impress/wsgi.py), the + content is streamed back with either an asynchronous or a synchronous + iterator. Under ASGI, a synchronous iterator would trigger a Django + warning and be consumed synchronously, defeating the purpose of + streaming. 
+ """ + is_async_server = os.environ.get("PYTHON_SERVER_MODE", "sync") == "async" + + return async_stream(body) if is_async_server else sync_stream(body) diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index 39c8532343..fdd0423423 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -88,6 +88,7 @@ dev = [ "pyfakefs==6.2.0", "pylint-django==2.7.0", "pylint<4.0.0", + "pytest-asyncio==1.4.0", "pytest-cov==7.1.0", "pytest-django==4.12.0", "pytest==9.0.3", diff --git a/src/backend/uv.lock b/src/backend/uv.lock index 3ce8df42e9..5fcfbdf316 100644 --- a/src/backend/uv.lock +++ b/src/backend/uv.lock @@ -945,6 +945,7 @@ dev = [ { name = "pylint" }, { name = "pylint-django" }, { name = "pytest" }, + { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "pytest-django" }, { name = "pytest-icdiff" }, @@ -1005,6 +1006,7 @@ requires-dist = [ { name = "pylint", marker = "extra == 'dev'", specifier = "<4.0.0" }, { name = "pylint-django", marker = "extra == 'dev'", specifier = "==2.7.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = "==9.0.3" }, + { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.4.0" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = "==7.1.0" }, { name = "pytest-django", marker = "extra == 'dev'", specifier = "==4.12.0" }, { name = "pytest-icdiff", marker = "extra == 'dev'", specifier = "==0.9" }, @@ -1929,6 +1931,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, ] +[[package]] +name = "pytest-asyncio" +version = "1.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/43/7c/d36d04db312ecf4298932ef77e6e4a9e8ad017906e24e34f0b0c361a2473/pytest_asyncio-1.4.0.tar.gz", hash = "sha256:c6c0d2259945122819f171a32ecea2c349ead889ee28176caaf492143424be42", size = 58514, upload-time = "2026-05-26T09:56:04.083Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/03/e2/08a497ef684b88559c9cc5f4ad53a37e7b99e727094a86d6ea32536d5d3c/pytest_asyncio-1.4.0-py3-none-any.whl", hash = "sha256:933ca923a23075a87fb7070c0ec272a6848489824d887c85c812670932835aa1", size = 16930, upload-time = "2026-05-26T09:56:02.576Z" }, +] + [[package]] name = "pytest-cov" version = "7.1.0"