Skip to content

Commit 8a5727a

Browse files
committed
feat: add batch nested resource access across all resources
Enable .files, .runs, .documents, and .datasets on batch resources, supporting concurrent .list() fan-out across multiple parent IDs.
1 parent 7d60b78 commit 8a5727a

11 files changed

Lines changed: 253 additions & 2 deletions

File tree

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ async def main():
8484
# Batch operations
8585
results = await client.v1.stages("id-1", "id-2", "id-3").get(concurrency=5)
8686

87+
# Batch nested resources — fan out .list() across multiple parents
88+
all_files = await client.v1.stages("id-1", "id-2").files.list()
89+
all_runs = await client.v1.stages("id-1", "id-2").runs.list()
90+
8791
asyncio.run(main())
8892
```
8993

@@ -116,6 +120,12 @@ await client.v1.stages("stage-id").delete()
116120
stages = await client.v1.stages("id-1", "id-2").get()
117121
await client.v1.stages("id-1", "id-2").delete()
118122

123+
# Batch nested resources — fan out .list() concurrently
124+
all_files = await client.v1.stages("id-1", "id-2").files.list()
125+
all_runs = await client.v1.stages("id-1", "id-2").runs.list()
126+
all_docs = await client.v1.search_stores("ss-1", "ss-2").documents.list()
127+
all_ds = await client.v1.repositories("r-1", "r-2").datasets.list()
128+
119129
# Nested resources
120130
file_content = await client.v1.stages("stage-id").files("file-id").get()
121131
presigned = await client.v1.stages("stage-id").files("file-id").presigned_url(ttl=3600)

pharia/resources/connectors.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,44 @@ async def list(self, page: int = 0, size: int = 100, status: str = "") -> RunLis
4646
)
4747

4848

49+
@dataclass
50+
class BatchConnectorFiles:
51+
"""Batch list operations for files across multiple connectors."""
52+
53+
client: "Client"
54+
connector_ids: list[str]
55+
56+
async def list(
57+
self, page: int = 0, size: int = 100, concurrency: int = 10
58+
) -> list[ConnectorFilesListResponse]:
59+
"""List files in multiple connectors concurrently."""
60+
coros = [
61+
ConnectorFiles(client=self.client, connector_id=cid).list(page=page, size=size)
62+
for cid in self.connector_ids
63+
]
64+
return await gather_with_limit(coros, concurrency)
65+
66+
67+
@dataclass
68+
class BatchConnectorRuns:
69+
"""Batch list operations for runs across multiple connectors."""
70+
71+
client: "Client"
72+
connector_ids: list[str]
73+
74+
async def list(
75+
self, page: int = 0, size: int = 100, status: str = "", concurrency: int = 10
76+
) -> list[RunListResponse]:
77+
"""List runs in multiple connectors concurrently."""
78+
coros = [
79+
ConnectorRuns(client=self.client, connector_id=cid).list(
80+
page=page, size=size, status=status
81+
)
82+
for cid in self.connector_ids
83+
]
84+
return await gather_with_limit(coros, concurrency)
85+
86+
4987
@dataclass
5088
class ConnectorResource:
5189
"""Instance-level operations for a single connector."""
@@ -79,6 +117,16 @@ class BatchConnectorResource:
79117
client: "Client"
80118
connector_ids: list[str]
81119

120+
@property
121+
def files(self) -> BatchConnectorFiles:
122+
"""Access files across multiple connectors."""
123+
return BatchConnectorFiles(client=self.client, connector_ids=self.connector_ids)
124+
125+
@property
126+
def runs(self) -> BatchConnectorRuns:
127+
"""Access runs across multiple connectors."""
128+
return BatchConnectorRuns(client=self.client, connector_ids=self.connector_ids)
129+
82130
async def get(self, concurrency: int = 10) -> list[Connector]:
83131
"""Retrieve multiple connectors concurrently."""
84132
coros = [self.client.request("GET", f"/connectors/{cid}") for cid in self.connector_ids]

pharia/resources/datasets.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,33 @@ async def create(self, **dataset_data: Unpack[CreateDatasetInput]) -> Dataset:
143143
return await self.client.request(
144144
"POST", f"/repositories/{self.repository_id}/datasets", json=payload
145145
)
146+
147+
148+
@dataclass
149+
class BatchRepositoryDatasets:
150+
"""Batch list operations for datasets across multiple repositories."""
151+
152+
client: "Client"
153+
repository_ids: list[str]
154+
155+
async def list(
156+
self,
157+
page: int = 0,
158+
size: int = 100,
159+
label: list[str] | None = None,
160+
created_after: str = "",
161+
created_before: str = "",
162+
concurrency: int = 10,
163+
) -> list[DatasetListResponse]:
164+
"""List datasets in multiple repositories concurrently."""
165+
coros = [
166+
RepositoryDatasets(client=self.client, repository_id=rid).list(
167+
page=page,
168+
size=size,
169+
label=label,
170+
created_after=created_after,
171+
created_before=created_before,
172+
)
173+
for rid in self.repository_ids
174+
]
175+
return await gather_with_limit(coros, concurrency)

pharia/resources/documents.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,28 @@ async def list(
122122
return await self.client.request(
123123
"GET", f"/search_stores/{self.search_store_id}/documents", params=params
124124
)
125+
126+
127+
@dataclass
128+
class BatchSearchStoreDocuments:
129+
"""Batch list operations for documents across multiple search stores."""
130+
131+
client: "Client"
132+
search_store_ids: list[str]
133+
134+
async def list(
135+
self,
136+
page: int = 1,
137+
size: int = 100,
138+
name: str = "",
139+
starts_with: str = "",
140+
concurrency: int = 10,
141+
) -> list[DocumentListResponse]:
142+
"""List documents in multiple search stores concurrently."""
143+
coros = [
144+
SearchStoreDocuments(client=self.client, search_store_id=sid).list(
145+
page=page, size=size, name=name, starts_with=starts_with
146+
)
147+
for sid in self.search_store_ids
148+
]
149+
return await gather_with_limit(coros, concurrency)

pharia/resources/files.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,33 @@ async def list(
109109
async def create(self, file_data: dict) -> File:
110110
"""Upload a file to this stage."""
111111
return await self.client.request("POST", f"/stages/{self.stage_id}/files", json=file_data)
112+
113+
114+
@dataclass
115+
class BatchStageFiles:
116+
"""Batch list operations for files across multiple stages."""
117+
118+
client: "Client"
119+
stage_ids: list[str]
120+
121+
async def list(
122+
self,
123+
page: int = 0,
124+
size: int = 100,
125+
name: str = "",
126+
created_after: str = "",
127+
created_before: str = "",
128+
concurrency: int = 10,
129+
) -> list[FileListResponse]:
130+
"""List files in multiple stages concurrently."""
131+
coros = [
132+
StageFiles(client=self.client, stage_id=sid).list(
133+
page=page,
134+
size=size,
135+
name=name,
136+
created_after=created_after,
137+
created_before=created_before,
138+
)
139+
for sid in self.stage_ids
140+
]
141+
return await gather_with_limit(coros, concurrency)

pharia/resources/repositories.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from pharia.models import RepositoryListResponse
99
from pharia.models import create_repository_to_api
1010
from pharia.resources.base import gather_with_limit
11-
from pharia.resources.datasets import DatasetResource
11+
from pharia.resources.datasets import BatchRepositoryDatasets
1212
from pharia.resources.datasets import RepositoryDatasets
1313

1414

@@ -44,6 +44,11 @@ class BatchRepositoryResource:
4444
client: "Client"
4545
repository_ids: list[str]
4646

47+
@property
48+
def datasets(self) -> BatchRepositoryDatasets:
49+
"""Access datasets across multiple repositories."""
50+
return BatchRepositoryDatasets(client=self.client, repository_ids=self.repository_ids)
51+
4752
async def get(self, concurrency: int = 10) -> list[Repository]:
4853
"""Retrieve multiple repositories concurrently."""
4954
coros = [self.client.request("GET", f"/repositories/{rid}") for rid in self.repository_ids]

pharia/resources/search_stores.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from pharia.models import search_input_to_api
1616
from pharia.models import update_search_store_to_api
1717
from pharia.resources.base import gather_with_limit
18+
from pharia.resources.documents import BatchSearchStoreDocuments
1819
from pharia.resources.documents import SearchStoreDocuments
1920

2021

@@ -79,6 +80,11 @@ class BatchSearchStoreResource:
7980
client: "Client"
8081
search_store_ids: list[str]
8182

83+
@property
84+
def documents(self) -> BatchSearchStoreDocuments:
85+
"""Access documents across multiple search stores."""
86+
return BatchSearchStoreDocuments(client=self.client, search_store_ids=self.search_store_ids)
87+
8288
async def get(self, concurrency: int = 10) -> list[SearchStore]:
8389
"""Retrieve multiple search stores concurrently."""
8490
coros = [

pharia/resources/stages.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from pharia.models import create_stage_to_api
1919
from pharia.models import update_stage_to_api
2020
from pharia.resources.base import gather_with_limit
21+
from pharia.resources.files import BatchStageFiles
2122
from pharia.resources.files import StageFiles
2223

2324

@@ -38,6 +39,24 @@ async def list(self, page: int = 0, size: int = 100, status: str = "") -> RunLis
3839
return await self.client.request("GET", f"/stages/{self.stage_id}/runs", params=params)
3940

4041

42+
@dataclass
43+
class BatchStageRuns:
44+
"""Batch list operations for runs across multiple stages."""
45+
46+
client: "Client"
47+
stage_ids: list[str]
48+
49+
async def list(
50+
self, page: int = 0, size: int = 100, status: str = "", concurrency: int = 10
51+
) -> list[RunListResponse]:
52+
"""List runs in multiple stages concurrently."""
53+
coros = [
54+
StageRuns(client=self.client, stage_id=sid).list(page=page, size=size, status=status)
55+
for sid in self.stage_ids
56+
]
57+
return await gather_with_limit(coros, concurrency)
58+
59+
4160
@dataclass
4261
class StageResource:
4362
"""Instance-level operations for a single stage."""
@@ -76,6 +95,16 @@ class BatchStageResource:
7695
client: "Client"
7796
stage_ids: list[str]
7897

98+
@property
99+
def files(self) -> BatchStageFiles:
100+
"""Access files across multiple stages."""
101+
return BatchStageFiles(client=self.client, stage_ids=self.stage_ids)
102+
103+
@property
104+
def runs(self) -> BatchStageRuns:
105+
"""Access runs across multiple stages."""
106+
return BatchStageRuns(client=self.client, stage_ids=self.stage_ids)
107+
79108
async def get(self, concurrency: int = 10) -> list[Stage]:
80109
"""Retrieve multiple stages concurrently."""
81110
coros = [self.client.request("GET", f"/stages/{sid}") for sid in self.stage_ids]

pharia/tests/test_stages.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,65 @@ async def test_stages_nested_files():
5050
assert hasattr(file_resource, "update")
5151
assert hasattr(file_resource, "delete")
5252
assert hasattr(file_resource, "presigned_url")
53+
54+
55+
@pytest.mark.asyncio
56+
async def test_batch_stages_has_files_and_runs():
57+
"""Test batch stages expose .files and .runs with .list() methods."""
58+
client = Client(base_url="https://api.example.com", api_key="test-key")
59+
60+
batch = client.v1.stages("id1", "id2")
61+
assert hasattr(batch, "files")
62+
assert hasattr(batch, "runs")
63+
64+
batch_files = batch.files
65+
assert hasattr(batch_files, "list")
66+
assert batch_files.stage_ids == ["id1", "id2"]
67+
68+
batch_runs = batch.runs
69+
assert hasattr(batch_runs, "list")
70+
assert batch_runs.stage_ids == ["id1", "id2"]
71+
72+
73+
@pytest.mark.asyncio
74+
async def test_batch_search_stores_has_documents():
75+
"""Test batch search stores expose .documents with .list() method."""
76+
client = Client(base_url="https://api.example.com", api_key="test-key")
77+
78+
batch = client.v1.search_stores("ss1", "ss2")
79+
assert hasattr(batch, "documents")
80+
81+
batch_docs = batch.documents
82+
assert hasattr(batch_docs, "list")
83+
assert batch_docs.search_store_ids == ["ss1", "ss2"]
84+
85+
86+
@pytest.mark.asyncio
87+
async def test_batch_repositories_has_datasets():
88+
"""Test batch repositories expose .datasets with .list() method."""
89+
client = Client(base_url="https://api.example.com", api_key="test-key")
90+
91+
batch = client.v1.repositories("r1", "r2")
92+
assert hasattr(batch, "datasets")
93+
94+
batch_datasets = batch.datasets
95+
assert hasattr(batch_datasets, "list")
96+
assert batch_datasets.repository_ids == ["r1", "r2"]
97+
98+
99+
@pytest.mark.asyncio
100+
async def test_batch_connectors_has_files_and_runs():
101+
"""Test batch connectors expose .files and .runs with .list() methods."""
102+
client = Client(base_url="https://api.example.com", api_key="test-key")
103+
104+
batch = client.v1.connectors("c1", "c2")
105+
assert hasattr(batch, "files")
106+
assert hasattr(batch, "runs")
107+
108+
batch_files = batch.files
109+
assert hasattr(batch_files, "list")
110+
assert batch_files.connector_ids == ["c1", "c2"]
111+
112+
batch_runs = batch.runs
113+
assert hasattr(batch_runs, "list")
114+
assert batch_runs.connector_ids == ["c1", "c2"]

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,6 @@ addopts = [
167167
"--strict-config",
168168
"--showlocals",
169169
]
170+
171+
[dependency-groups]
172+
dev = []

0 commit comments

Comments
 (0)