Skip to content
This repository was archived by the owner on Sep 12, 2024. It is now read-only.

Commit 84998d6

Browse files
committed
[HISTORY]: Sync Redis and Collection
1 parent a0d9999 commit 84998d6

File tree

2 files changed

+463
-6
lines changed

2 files changed

+463
-6
lines changed

jaclang_jaseci/jaseci/datasources/collection.py

Lines changed: 307 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import (
55
Any,
66
AsyncGenerator,
7+
Generator,
78
Generic,
89
Iterable,
910
Mapping,
@@ -22,7 +23,20 @@
2223
AsyncIOMotorLatentCommandCursor,
2324
)
2425

25-
from pymongo import DeleteMany, DeleteOne, IndexModel, InsertOne, UpdateMany, UpdateOne
26+
from pymongo import (
27+
DeleteMany,
28+
DeleteOne,
29+
IndexModel,
30+
InsertOne,
31+
MongoClient,
32+
UpdateMany,
33+
UpdateOne,
34+
)
35+
from pymongo.client_session import ClientSession
36+
from pymongo.collection import Collection as PyMongoCollection
37+
from pymongo.command_cursor import CommandCursor
38+
from pymongo.cursor import Cursor
39+
from pymongo.database import Database
2640
from pymongo.results import (
2741
BulkWriteResult,
2842
DeleteResult,
@@ -46,6 +60,298 @@ class Collection(Generic[T]):
4660
# ---------- Child Properties ---------- #
4761
##########################################
4862

63+
# Collection Name
64+
__collection__: str | None = None
65+
# Singleton Collection Instance
66+
__collection_obj__: PyMongoCollection | None = None
67+
68+
# Custom Index Declaration
69+
__indexes__: list[dict] = []
70+
__default_indexes__: list[dict] = []
71+
72+
# Transient Field List
73+
__excluded__: list = []
74+
# Converted Mapping of Transient Fields
75+
__excluded_obj__: Mapping[str, Any] | None = None
76+
77+
##########################################
78+
# ---------- Parent Properties --------- #
79+
##########################################
80+
81+
# Singleton client instance
82+
__client__: MongoClient | None = None
83+
# Singleton database instance
84+
__database__: Database | None = None
85+
86+
@staticmethod
87+
def apply_indexes() -> None:
88+
"""Apply Indexes."""
89+
queue: list[type[Collection]] = Collection.__subclasses__()
90+
while queue:
91+
cls = queue.pop(-1)
92+
93+
if scls := cls.__subclasses__():
94+
queue.extend(scls)
95+
96+
if cls.__collection__ is None:
97+
continue
98+
99+
if cls.__excluded__:
100+
excl_obj = {}
101+
for excl in cls.__excluded__:
102+
excl_obj[excl] = False
103+
cls.__excluded_obj__ = excl_obj
104+
105+
idxs = []
106+
if cls.__default_indexes__:
107+
for idx in cls.__default_indexes__:
108+
idxs.append(IndexModel(**idx))
109+
110+
if cls.__indexes__:
111+
for idx in cls.__indexes__:
112+
idxs.append(IndexModel(**idx))
113+
114+
if idxs:
115+
cls.collection().create_indexes(idxs)
116+
117+
@classmethod
118+
def __document__(cls, doc: Mapping[str, Any]) -> T:
119+
"""
120+
Return parsed version of document.
121+
122+
This the default parser after getting a single document.
123+
You may override this to specify how/which class it will be casted/based.
124+
"""
125+
return cast(T, doc)
126+
127+
@classmethod
128+
def __documents__(cls, docs: Cursor) -> Generator[T, None]:
129+
"""
130+
Return parsed version of multiple documents.
131+
132+
This the default parser after getting a list of documents.
133+
You may override this to specify how/which class it will be casted/based.
134+
"""
135+
return (cls.__document__(doc) for doc in docs)
136+
137+
@staticmethod
138+
def get_client() -> MongoClient:
139+
"""Return pymongo.database.Database for mongodb connection."""
140+
if not isinstance(Collection.__client__, MongoClient):
141+
Collection.__client__ = MongoClient(
142+
getenv(
143+
"DATABASE_HOST",
144+
"mongodb://localhost/?retryWrites=true&w=majority",
145+
),
146+
server_api=ServerApi("1"),
147+
)
148+
149+
return Collection.__client__
150+
151+
@staticmethod
152+
def get_session() -> ClientSession:
153+
"""Return pymongo.client_session.ClientSession used for mongodb transactional operations."""
154+
return await Collection.get_client().start_session()
155+
156+
@staticmethod
157+
def get_database() -> Database:
158+
"""Return pymongo.database.Database for database connection based from current client connection."""
159+
if not isinstance(Collection.__database__, Database):
160+
Collection.__database__ = Collection.get_client().get_database(
161+
getenv("DATABASE_NAME", "jaseci")
162+
)
163+
164+
return Collection.__database__
165+
166+
@staticmethod
167+
def get_collection(collection: str) -> PyMongoCollection:
168+
"""Return pymongo.collection.Collection for collection connection based from current database connection."""
169+
return Collection.get_database().get_collection(collection)
170+
171+
@classmethod
172+
def collection(cls) -> PyMongoCollection:
173+
"""Return pymongo.collection.Collection for collection connection based from attribute of it's child class."""
174+
if not isinstance(cls.__collection_obj__, PyMongoCollection):
175+
cls.__collection_obj__ = cls.get_collection(
176+
getattr(cls, "__collection__", None) or cls.__name__.lower()
177+
)
178+
179+
return cls.__collection_obj__
180+
181+
@classmethod
182+
def insert_one(
183+
cls,
184+
doc: dict,
185+
session: ClientSession | None = None,
186+
**kwargs: Any, # noqa: ANN401
187+
) -> InsertOneResult:
188+
"""Insert single document and return the inserted id."""
189+
return cls.collection().insert_one(doc, session=session, **kwargs)
190+
191+
@classmethod
192+
def insert_many(
193+
cls,
194+
docs: list[dict],
195+
session: ClientSession | None = None,
196+
**kwargs: Any, # noqa: ANN401
197+
) -> InsertManyResult:
198+
"""Insert multiple documents and return the inserted ids."""
199+
return cls.collection().insert_many(docs, session=session, **kwargs)
200+
201+
@classmethod
202+
def update_one(
203+
cls,
204+
filter: dict,
205+
update: dict,
206+
session: ClientSession | None = None,
207+
**kwargs: Any, # noqa: ANN401
208+
) -> UpdateResult:
209+
"""Update single document and return if it's modified or not."""
210+
return cls.collection().update_one(filter, update, session=session, **kwargs)
211+
212+
@classmethod
213+
def update_many(
214+
cls,
215+
filter: dict,
216+
update: dict,
217+
session: ClientSession | None = None,
218+
**kwargs: Any, # noqa: ANN401
219+
) -> UpdateResult:
220+
"""Update multiple documents and return how many docs are modified."""
221+
return cls.collection().update_many(filter, update, session=session, **kwargs)
222+
223+
@classmethod
224+
def update_by_id(
225+
cls,
226+
id: str | ObjectId,
227+
update: dict,
228+
session: ClientSession | None = None,
229+
**kwargs: Any, # noqa: ANN401
230+
) -> UpdateResult:
231+
"""Update single document via ID and return if it's modified or not."""
232+
return cls.update_one({"_id": ObjectId(id)}, update, session, **kwargs)
233+
234+
@classmethod
235+
def find(
236+
cls,
237+
filter: Mapping[str, Any] | None = None,
238+
projection: Mapping[str, Any] | Iterable[str] | None = None,
239+
session: ClientSession | None = None,
240+
**kwargs: Any, # noqa: ANN401
241+
) -> Generator[T, None]:
242+
"""Retrieve multiple documents."""
243+
if projection is None:
244+
projection = cls.__excluded_obj__
245+
246+
docs = cls.collection().find(filter, projection, session=session, **kwargs)
247+
return cls.__documents__(docs)
248+
249+
@classmethod
250+
def find_one(
251+
cls,
252+
filter: Mapping[str, Any] | None = None,
253+
projection: Mapping[str, Any] | Iterable[str] | None = None,
254+
session: ClientSession | None = None,
255+
**kwargs: Any, # noqa: ANN401
256+
) -> T | None:
257+
"""Retrieve single document from db."""
258+
if projection is None:
259+
projection = cls.__excluded_obj__
260+
261+
if ops := cls.collection().find_one(
262+
filter, projection, session=session, **kwargs
263+
):
264+
return cls.__document__(ops)
265+
return None
266+
267+
@classmethod
268+
def find_by_id(
269+
cls,
270+
id: str | ObjectId,
271+
projection: Mapping[str, Any] | Iterable[str] | None = None,
272+
session: ClientSession | None = None,
273+
**kwargs: Any, # noqa: ANN401
274+
) -> T | None:
275+
"""Retrieve single document via ID."""
276+
return cls.find_one(
277+
{"_id": ObjectId(id)}, projection, session=session, **kwargs
278+
)
279+
280+
@classmethod
281+
def delete(
282+
cls,
283+
filter: dict,
284+
session: ClientSession | None = None,
285+
**kwargs: Any, # noqa: ANN401
286+
) -> DeleteResult:
287+
"""Delete document/s via filter and return how many documents are deleted."""
288+
return cls.collection().delete_many(filter, session=session, **kwargs)
289+
290+
@classmethod
291+
def delete_one(
292+
cls,
293+
filter: dict,
294+
session: ClientSession | None = None,
295+
**kwargs: Any, # noqa: ANN401
296+
) -> DeleteResult:
297+
"""Delete single document via filter and return if it's deleted or not."""
298+
return cls.collection().delete_one(filter, session=session, **kwargs)
299+
300+
@classmethod
301+
def delete_by_id(
302+
cls,
303+
id: str | ObjectId,
304+
session: ClientSession | None = None,
305+
**kwargs: Any, # noqa: ANN401
306+
) -> DeleteResult:
307+
"""Delete single document via ID and return if it's deleted or not."""
308+
return cls.delete_one({"_id": ObjectId(id)}, session, **kwargs)
309+
310+
@classmethod
311+
def bulk_write(
312+
cls,
313+
ops: list[InsertOne | DeleteMany | DeleteOne | UpdateMany | UpdateOne],
314+
ordered: bool = True,
315+
session: ClientSession | None = None,
316+
**kwargs: Any, # noqa: ANN401
317+
) -> BulkWriteResult:
318+
"""Bulk write operations."""
319+
return cls.collection().bulk_write(
320+
ops, ordered=ordered, session=session, **kwargs
321+
)
322+
323+
@classmethod
324+
def count(
325+
cls,
326+
filter: dict,
327+
session: ClientSession | None = None,
328+
**kwargs: Any, # noqa: ANN401
329+
) -> int:
330+
"""Bulk write operations."""
331+
return cls.collection().count_documents(filter, session, **kwargs)
332+
333+
@classmethod
334+
def aggregate(
335+
cls,
336+
pipeline: list[dict],
337+
session: ClientSession | None = None,
338+
**kwargs: Any, # noqa: ANN401
339+
) -> CommandCursor:
340+
"""Bulk write operations."""
341+
return cls.collection().aggregate(pipeline, session, **kwargs)
342+
343+
344+
class AsyncCollection(Generic[T]):
345+
"""
346+
Base collection interface.
347+
348+
This interface use for connecting to mongodb.
349+
"""
350+
351+
##########################################
352+
# ---------- Child Properties ---------- #
353+
##########################################
354+
49355
# Collection Name
50356
__collection__: str | None = None
51357
# Singleton Collection Instance

0 commit comments

Comments
 (0)