Skip to content

Commit cefb2b8

Browse files
committed
Several cleanups, addressing review comments
Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
1 parent 5b56355 commit cefb2b8

15 files changed

Lines changed: 91 additions & 86 deletions

docling_jobkit/config/target_config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44

55

66
class S3PresignedConfig(BaseModel):
7+
"""Server-managed presigned target policy injected by docling-serve.
8+
9+
``docling-serve`` builds this internal config from its artifact storage settings
10+
before constructing an orchestrator. The storage prefix lives on
11+
``s3_coords.key_prefix`` so jobkit does not carry a redundant top-level field.
12+
"""
13+
714
s3_coords: S3Coordinates
8-
key_prefix: str = "converted/"
915
date_partition_format: str = "%Y%m%d"
1016
url_expiration: int = Field(default=3600, ge=60, le=604800)

docling_jobkit/connectors/google_drive_source_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from io import BytesIO
44
from typing import TYPE_CHECKING, Iterator
55

6+
from typing_extensions import override
7+
68
from docling.datamodel.base_models import DocumentStream
79

810
if TYPE_CHECKING:
@@ -84,6 +86,7 @@ def _fetch_document_by_id(self, info: GoogleDriveFileIdentifier) -> DocumentStre
8486
stream=buffer,
8587
)
8688

89+
@override
8790
def _make_document_ref(
8891
self, info: GoogleDriveFileIdentifier, source_index: int
8992
) -> SourceDocumentRef[GoogleDriveFileIdentifier]:
@@ -93,5 +96,4 @@ def _make_document_ref(
9396
source_index=source_index,
9497
source_uri=source_uri,
9598
filename=info["name"],
96-
metadata={"mimeType": info["mimeType"]},
9799
)

docling_jobkit/connectors/http_source_processor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Iterator, TypedDict
22

3+
from typing_extensions import override
4+
35
from docling.datamodel.service.sources import FileSource, HttpSource
46
from docling_core.types.io import DocumentStream
57

@@ -43,6 +45,7 @@ def _fetch_document_by_id(self, identifier: HttpFileIdentifier) -> DocumentStrea
4345
else:
4446
raise ValueError(f"Unsupported source type: {type(source)}")
4547

48+
@override
4649
def _make_document_ref(
4750
self, identifier: HttpFileIdentifier, source_index: int
4851
) -> SourceDocumentRef[HttpFileIdentifier]:
@@ -60,14 +63,16 @@ def _make_document_ref(
6063
filename=filename,
6164
)
6265

66+
@override
6367
def fetch_converter_source_by_ref(
6468
self, ref: SourceDocumentRef[HttpFileIdentifier]
6569
) -> ConverterSource:
6670
source = ref.id["source"]
6771
if isinstance(source, HttpSource):
6872
return str(source.url)
69-
return self.fetch_document_by_ref(ref)
73+
return self._fetch_document_by_id(ref.id)
7074

75+
@override
7176
def headers_for_ref(
7277
self, ref: SourceDocumentRef[HttpFileIdentifier]
7378
) -> dict[str, object] | None:

docling_jobkit/connectors/local_path_source_processor.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from pathlib import Path
22
from typing import Iterator, TypedDict
33

4+
from typing_extensions import override
5+
46
from docling_core.types.io import DocumentStream
57

68
from docling_jobkit.connectors.source_processor import (
@@ -125,6 +127,7 @@ def _fetch_document_by_id(
125127

126128
return DocumentStream(name=str(file_path), stream=buffer)
127129

130+
@override
128131
def _make_document_ref(
129132
self, identifier: LocalPathFileIdentifier, source_index: int
130133
) -> SourceDocumentRef[LocalPathFileIdentifier]:
@@ -134,10 +137,6 @@ def _make_document_ref(
134137
source_index=source_index,
135138
source_uri=str(file_path),
136139
filename=str(file_path),
137-
metadata={
138-
"size": identifier["size"],
139-
"last_modified": identifier["last_modified"],
140-
},
141140
)
142141

143142
def _fetch_documents(self) -> Iterator[DocumentStream]:

docling_jobkit/connectors/s3_source_processor.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from io import BytesIO
22
from typing import Iterator, TypedDict
33

4+
from typing_extensions import override
5+
46
from docling.datamodel.service.sources import S3Coordinates
57
from docling_core.types.io import DocumentStream
68

@@ -52,6 +54,7 @@ def _count_documents(self) -> int:
5254
total += len(page.get("Contents", []))
5355
return total
5456

57+
@override
5558
def _make_document_ref(
5659
self, identifier: S3FileIdentifier, source_index: int
5760
) -> SourceDocumentRef[S3FileIdentifier]:
@@ -61,10 +64,6 @@ def _make_document_ref(
6164
source_index=source_index,
6265
source_uri=f"s3://{self._coords.bucket}/{key}",
6366
filename=key,
64-
metadata={
65-
"size": identifier["size"],
66-
"last_modified": identifier["last_modified"],
67-
},
6867
)
6968

7069
# ----------------- Document fetch -----------------

docling_jobkit/connectors/s3_upload_support.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,14 @@ def build_task_scoped_s3_key(
6666
artifact_filename: str,
6767
) -> str:
6868
# PresignedUrlTarget writes into operator-managed storage, so the full key
69-
# includes the managed prefix/tenant/date/task structure before the per-source hash.
69+
# includes the managed prefix/tenant/date/task structure before the per-source
70+
# hash. The storage prefix is injected by docling-serve onto
71+
# config.s3_coords.key_prefix when the orchestrator is constructed.
7072
source_key = hash_path_component(source_uri)
7173
date_partition = datetime.now(timezone.utc).strftime(config.date_partition_format)
7274

7375
path_parts: list[str] = []
74-
key_prefix = config.key_prefix.strip("/")
76+
key_prefix = config.s3_coords.key_prefix.strip("/")
7577
if key_prefix:
7678
path_parts.append(key_prefix)
7779

docling_jobkit/connectors/source_processor.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
from abc import ABC, abstractmethod
22
from contextlib import AbstractContextManager
33
from itertools import islice
4-
from typing import Any, Callable, Generic, Iterator, Sequence, TypeVar
4+
from typing import Callable, Generic, Iterator, Sequence, TypeVar
55

6-
from pydantic import BaseModel, ConfigDict, Field
6+
from pydantic import BaseModel, ConfigDict
77

88
from docling.datamodel.base_models import DocumentStream
99

@@ -21,11 +21,16 @@ class SourceDocumentRef(BaseModel, Generic[FileIdentifierT]):
2121
source_index: int
2222
source_uri: str
2323
filename: str
24-
metadata: dict[str, Any] = Field(default_factory=dict)
2524

2625

2726
class DocumentChunk(BaseModel, Generic[SourceT, FileIdentifierT]):
28-
"""A data-only source chunk plus a local fetcher convenience."""
27+
"""A serializable source chunk plus an optional local fetcher convenience.
28+
29+
Local/CLI callers may attach a fetcher so ``iter_documents()`` can materialize
30+
streams lazily from the refs. Cross-process callers such as Ray must strip that
31+
fetcher because it may capture initialized connector state that is not safe to
32+
serialize.
33+
"""
2934

3035
model_config = ConfigDict(arbitrary_types_allowed=True)
3136

@@ -53,6 +58,7 @@ def index(self) -> int:
5358
return self.chunk_index
5459

5560
def iter_documents(self) -> Iterator[DocumentStream]:
61+
"""Materialize documents for local callers when a fetcher is attached."""
5662
if self._fetcher is None:
5763
raise RuntimeError("DocumentChunk does not have an attached fetcher.")
5864
for ref in self.refs:
@@ -118,19 +124,21 @@ def source(self) -> SourceT:
118124
def _count_documents(self) -> int | None:
119125
return None
120126

121-
def fetch_document_by_ref(
122-
self, ref: SourceDocumentRef[FileIdentifierT]
123-
) -> DocumentStream:
124-
return self._fetch_document_by_id(ref.id)
125-
126127
def fetch_converter_source_by_ref(
127128
self, ref: SourceDocumentRef[FileIdentifierT]
128129
) -> ConverterSource:
129-
return self.fetch_document_by_ref(ref)
130+
"""Resolve a ref into the converter input expected by the backend.
131+
132+
Most connectors materialize a ``DocumentStream`` from the ref's identifier.
133+
Connectors with remote-fetch semantics may override this to return a lighter
134+
representation such as a source URL.
135+
"""
136+
return self._fetch_document_by_id(ref.id)
130137

131138
def headers_for_ref(
132139
self, ref: SourceDocumentRef[FileIdentifierT]
133-
) -> dict[str, Any] | None:
140+
) -> dict[str, object] | None:
141+
"""Return per-ref request headers when the converter should fetch remotely."""
134142
del ref
135143
return None
136144

0 commit comments

Comments
 (0)