Stream document to db as embedding is occurring to reduce memory consumption #214
base: main
Conversation
…ration and DB insertion. This provides better memory efficiency by immediately inserting embeddings as they're generated.
# hint: 750 seems to be a good balance between speed and memory usage for text-embedding-3-small embeddings
EMBEDDING_BATCH_SIZE = int(get_env_variable("EMBEDDING_BATCH_SIZE", "0"))
# when EMBEDDING_BATCH_SIZE is set, this controls the max size the queue of batches to process can contain at any one time
EMBEDIING_MAX_QUEUE_SIZE = int(get_env_variable("EMBEDIING_MAX_QUEUE_SIZE", "3"))
typo?
Thanks for the PR, will review soon
Pull request overview
This PR implements a streaming approach to document embedding and vector database insertion to reduce memory consumption when processing large files. Instead of embedding entire documents in memory before inserting them, the code now processes documents in configurable batches, inserting each batch as embeddings are generated.
Key changes:
- Introduces configurable batch processing via the `EMBEDDING_BATCH_SIZE` environment variable
- Implements an async producer-consumer pipeline for concurrent embedding generation and database insertion (a sketch of this pattern follows this list)
- Adds rollback mechanisms to ensure no partial documents remain in the database on failure
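For readers skimming the diff, here is a minimal sketch of the producer-consumer shape described above; the function parameters `embed_batch` and `insert_batch` are illustrative placeholders, not the PR's actual helpers:

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence


async def stream_embed_and_insert(
    batches: Sequence[list],
    embed_batch: Callable[[list], Awaitable[list]],
    insert_batch: Callable[[list], Awaitable[List[str]]],
    max_queue_size: int = 3,
) -> List[str]:
    """Embed batches and insert each one as soon as it is ready."""
    # Bounded queue: the producer blocks when the consumer falls behind,
    # which caps how many embedded batches are held in memory at once.
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
    inserted_ids: List[str] = []

    async def producer() -> None:
        for batch in batches:
            await queue.put(await embed_batch(batch))
        await queue.put(None)  # sentinel: signal the consumer to stop

    async def consumer() -> None:
        while (item := await queue.get()) is not None:
            inserted_ids.extend(await insert_batch(item))

    await asyncio.gather(producer(), consumer())
    return inserted_ids
```

Because the queue is bounded by `max_queue_size` (mirroring `EMBEDIING_MAX_QUEUE_SIZE` in the diff), only a few embedded batches are ever held in memory at the same time.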
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 18 comments.
| File | Description |
|---|---|
| app/config.py | Adds new configuration variables for embedding batch size and queue size to control memory usage |
| app/routes/document_routes.py | Implements async pipeline with producer-consumer pattern for streaming embeddings, adds batched sync fallback, and updates main insertion logic to use new streaming approach when batch size is configured |
logger.error(f"Batch {batch_idx + 1} failed: {batch_error}")

# Rollback entire file from vector store
if all_ids: # If we have some successful inserts
Copilot AI · Nov 26, 2025
[nitpick] The rollback logic checks for all_ids to determine if any successful inserts occurred, but then attempts to delete by file_id. If individual batch insertions succeeded, all_ids will contain multiple IDs for the same file_id. The deletion should work correctly since it deletes by file_id, but the comment "If we have some successful inserts" is misleading - it should check if any batches succeeded, not just if all_ids is non-empty. Consider clarifying this logic or comment.
Suggested change:
- if all_ids: # If we have some successful inserts
+ if all_ids: # If any batch succeeded (i.e., any chunks for this file were inserted)
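For illustration, a rough sketch of the insert-then-rollback pattern this comment is describing; it assumes the store exposes `add_documents` and a delete-by-ids method, and is not the PR's exact code:

```python
from typing import List


def insert_with_rollback(batches, file_id: str, vector_store) -> List[str]:
    """Insert batches one at a time; on any failure, remove everything
    already written for this file so no partial document remains."""
    all_ids: List[str] = []
    try:
        for batch in batches:
            # Every batch is tagged with the same file_id, so a single
            # rollback can undo all previously inserted batches.
            all_ids.extend(vector_store.add_documents(batch, ids=[file_id] * len(batch)))
        return all_ids
    except Exception:
        if all_ids:  # at least one batch made it into the store
            vector_store.delete(ids=all_ids)  # assumption: delete-by-ids is supported
        raise
```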
from app.config import EMBEDDINGS_PROVIDER, init_embeddings, EMBEDDINGS_MODEL
embedding = init_embeddings(EMBEDDINGS_PROVIDER, EMBEDDINGS_MODEL)
Copilot AI · Nov 26, 2025
The embedding function is initialized inside the conditional block for every document upload when EMBEDDING_BATCH_SIZE > 0. This import and initialization could be expensive and should ideally be done once at module level or cached. Consider moving the import to the top of the file and potentially caching the embedding function initialization.
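One possible way to address this, reusing the imports shown in the diff (a sketch, not a drop-in change; `get_embedding_function` is an illustrative name):

```python
from functools import lru_cache

from app.config import EMBEDDINGS_MODEL, EMBEDDINGS_PROVIDER, init_embeddings


@lru_cache(maxsize=1)
def get_embedding_function():
    # Initialized once on the first upload, then reused by every later request.
    return init_embeddings(EMBEDDINGS_PROVIDER, EMBEDDINGS_MODEL)
```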
temp_vector_store = AsyncPgVector(
    connection_string=vector_store.connection_string,
    embedding_function=request_embedding_func,
    collection_name=vector_store.collection_name,
)
Copilot AI · Nov 26, 2025
[nitpick] Creating a new AsyncPgVector instance for each document upload could be inefficient. This creates a new database connection and configuration for each upload. Consider whether the existing vector_store could be used with a request-specific embedding function wrapper, or if connection pooling is properly configured to handle this pattern efficiently.
# hint: 750 seems to be a good balance between speed and memory usage for text-embedding-3-small embeddings
EMBEDDING_BATCH_SIZE = int(get_env_variable("EMBEDDING_BATCH_SIZE", "0"))
# when EMBEDDING_BATCH_SIZE is set, this controls the max size the queue of batches to process can contain at any one time
EMBEDIING_MAX_QUEUE_SIZE = int(get_env_variable("EMBEDIING_MAX_QUEUE_SIZE", "3"))
Copilot AI · Nov 26, 2025
Typo in variable name: EMBEDIING_MAX_QUEUE_SIZE should be EMBEDDING_MAX_QUEUE_SIZE (missing 'D' in 'EMBEDDING'). This constant name should match the corrected import.
Suggested change:
- EMBEDIING_MAX_QUEUE_SIZE = int(get_env_variable("EMBEDIING_MAX_QUEUE_SIZE", "3"))
+ EMBEDDING_MAX_QUEUE_SIZE = int(get_env_variable("EMBEDDING_MAX_QUEUE_SIZE", "3"))
else:
    ids = vector_store.add_documents(docs, ids=[file_id] * len(documents))
# asynchronously embed the file and insert into vector store as it is embedding
# to lesson memory impact and speed up slightly as the majority of the document
Copilot AI · Nov 26, 2025
Typo: "lesson" should be "lessen" (meaning to reduce or decrease).
Suggested change:
- # to lesson memory impact and speed up slightly as the majority of the document
+ # to lessen memory impact and speed up slightly as the majority of the document
    vector_store
) -> List[str]:
    """
    Fallback batched processing for sync vector stores.
Copilot AI · Nov 26, 2025
[nitpick] The function docstring is incomplete. It should document the parameters (documents, file_id, request_embedding_func, vector_store) and return value (List[str]). Consider adding a complete docstring with parameter descriptions, return value, and potential exceptions raised.
Suggested change:
- Fallback batched processing for sync vector stores.
+ Processes documents in batches and adds them to a synchronous vector store.
+
+ Args:
+     documents (List[Document]): The list of Document objects to process and add.
+     file_id (str): The identifier for the file associated with these documents.
+     request_embedding_func: The embedding function to use for processing documents.
+     vector_store: The synchronous vector store instance to which documents are added.
+
+ Returns:
+     List[str]: A list of IDs corresponding to the successfully added documents.
+
+ Raises:
+     Exception: If any batch fails to process, the exception is raised after attempting rollback.
# Wait for both to complete
# return_exceptions = false will force catching the exception below in this try
results = await asyncio.gather(producer_task, consumer_task, return_exceptions=False)

# Get final results
batch_results = await results_queue.get()
Copilot AI · Nov 26, 2025
The comment says "return_exceptions = false will force catching the exception below" but asyncio.gather() is called with return_exceptions=False. When return_exceptions=False, exceptions are propagated immediately and will be caught by the outer try-except block. However, if an exception occurs, the code on line 446 (batch_results = await results_queue.get()) may never execute because the exception would have already been raised. Consider restructuring this logic to ensure proper error handling flow.
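One way the flow could be restructured so the success path and the failure path are explicit (a sketch using the task and queue names from the diff; the cleanup details are assumptions, not the PR's code):

```python
import asyncio


async def wait_for_pipeline(producer_task, consumer_task, results_queue):
    try:
        # With return_exceptions=False the first failure propagates here,
        # so nothing below this line runs on error.
        await asyncio.gather(producer_task, consumer_task, return_exceptions=False)
        # Only reached when both tasks finished cleanly.
        return await results_queue.get()
    except Exception:
        # Make sure neither task is left running before re-raising.
        for task in (producer_task, consumer_task):
            if not task.done():
                task.cancel()
        await asyncio.gather(producer_task, consumer_task, return_exceptions=True)
        raise
```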
finally:
    # Cleanup
    del temp_vector_store, request_embedding_func
Copilot AI · Nov 26, 2025
[nitpick] The explicit del statement is unnecessary. These objects will be automatically garbage collected when they go out of scope. This explicit deletion provides no benefit and can be removed for cleaner code.
Suggested change:
- del temp_vector_store, request_embedding_func
if not consumer_task.done():
    consumer_task.cancel()
if not producer_task.done():
    producer_task.cancel()
Copilot AI · Nov 26, 2025
After cancelling tasks, you should await them to ensure proper cleanup. Task cancellation in asyncio requires awaiting the cancelled task to handle the CancelledError exception. Add await asyncio.gather(consumer_task, producer_task, return_exceptions=True) after the cancellation to ensure proper cleanup.
Suggested change:
- producer_task.cancel()
+ producer_task.cancel()
+ # Await cancelled tasks to ensure proper cleanup
+ await asyncio.gather(consumer_task, producer_task, return_exceptions=True)
try:
    # Wait for both to complete
    # return_exceptions = false will force catching the exception below in this try
    results = await asyncio.gather(producer_task, consumer_task, return_exceptions=False)
Copilot AI · Nov 26, 2025
Variable results is not used.
Suggested change:
- results = await asyncio.gather(producer_task, consumer_task, return_exceptions=False)
+ await asyncio.gather(producer_task, consumer_task, return_exceptions=False)
@MarcAmick please fix the typos and address each Copilot comment
Currently RAG creates embeddings for the file, holding them all in memory, and then bulk inserts them all into the vector DB. When embedding very large files this consumes a vast amount of memory. When memory limits are set in an AKS/EKS environment, the pod is more likely to hit the memory limit, causing it to crash and restart. This is largely solved by changing the logic so that it breaks the file up into chunks, embeds each one separately, and asynchronously bulk inserts each chunk as the embedding process completes it. Less memory is consumed because each chunk's embeddings are released once they have been inserted. It may also slightly increase speed, because the database has already inserted most of the document by the time the last bulk insert runs, so the final insert is much smaller and completes quickly. If any chunk results in an error, the entire document is removed from the DB, so there is no chance of a partial file in the vector DB.
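The chunking itself is plain slicing; a minimal sketch of how a file's documents could be split into batches of `EMBEDDING_BATCH_SIZE` (the helper name `iter_batches` is illustrative, not part of the PR):

```python
from typing import Iterator, List, Sequence


def iter_batches(documents: Sequence, batch_size: int) -> Iterator[List]:
    """Yield successive slices of `documents`, each at most `batch_size` long."""
    for start in range(0, len(documents), batch_size):
        yield list(documents[start:start + batch_size])


# e.g. with EMBEDDING_BATCH_SIZE = 750, a 2,000-chunk file yields batches of 750, 750, and 500
```

Each yielded batch can then be embedded and inserted before the next one is materialized, which keeps peak memory roughly proportional to the batch size rather than to the file size.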