
Conversation

@Prajna1999 Prajna1999 commented Dec 2, 2025

Summary

Target issue is #460

Adds a threadpool-based concurrency model that lets up to 4 worker threads pick up (input, output) insert pairs, instead of a single blocking process doing all the heavy lifting. This improves the endpoint's ability to insert CSV files of up to 1000 line items within 60 seconds.
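For context, a minimal sketch of the pattern (insert_pair is a hypothetical stand-in for the real per-item call; the actual implementation lives in backend/app/crud/evaluations/langfuse.py and may differ):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def insert_pair(input_text: str, output_text: str) -> None:
    """Placeholder for the real per-item insert (e.g. a Langfuse API call)."""
    time.sleep(0.01)  # simulate network latency


def upload_pairs(pairs: list[tuple[str, str]]) -> int:
    """Upload (input, output) pairs with up to 4 worker threads."""

    def upload_item(pair: tuple[str, str]) -> bool:
        try:
            insert_pair(*pair)
            return True
        except Exception:
            return False  # a failed item should not abort the batch

    total_uploaded = 0
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(upload_item, p) for p in pairs]
        for future in as_completed(futures):
            if future.result():
                total_uploaded += 1
    return total_uploaded
```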

Chore: also added a Celery doc explaining how Celery is implemented in this repo. Largely unrelated to the core task.

Checklist

Before submitting a pull request, please ensure that you complete these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.
  • If you've fixed a bug or added code, ensure it is tested and has test cases.

Summary by CodeRabbit

  • Documentation

    • Added a comprehensive guide for the asynchronous task processing system covering architecture, configuration, scheduling, worker operation, monitoring, error handling, and best practices.
  • Improvements

    • Faster dataset uploads via concurrent processing, reducing overall synchronization time and improving reliability.



coderabbitai bot commented Dec 2, 2025

Walkthrough

Adds a new Celery architecture and operations guide and refactors Langfuse dataset uploading to use concurrent uploads via ThreadPoolExecutor with aggregated success counting and a single final flush.

Changes

  • Documentation (backend/CELERY_OVERVIEW.md): New comprehensive Celery overview describing the architecture (RabbitMQ broker, Redis result backend, Beat), file layout, task routing/queues, worker/beat management, monitoring, error-handling/reliability practices, the dynamic task execution pattern, CLI examples, and operational guidance.
  • Concurrent Upload Optimization (backend/app/crud/evaluations/langfuse.py): Replaces sequential per-item uploads with a ThreadPoolExecutor-based concurrent upload flow: adds an upload_item helper, builds upload_tasks, uses as_completed to count successes, removes per-item flushes and keeps a single final flush; adds the necessary concurrency imports.
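For orientation, a hedged sketch of the broker/backend/queue-routing setup the new guide describes (URLs, queue names, and task-module paths are illustrative assumptions, not the repo's actual configuration):

```python
from celery import Celery

# Illustrative only: RabbitMQ as the message broker and Redis as the result
# backend, matching the architecture the overview documents.
celery_app = Celery(
    "ai_platform",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0",
)

# Route tasks to priority queues (queue and module names are assumptions).
celery_app.conf.task_routes = {
    "app.celery.tasks.llm.*": {"queue": "high_priority"},
    "app.celery.tasks.sync.*": {"queue": "low_priority"},
}
```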

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Review ThreadPoolExecutor usage and potential shared-state/thread-safety in upload_dataset_to_langfuse.
  • Validate error handling in upload_item and correctness of success counting with as_completed.
  • Confirm final flush semantics with Langfuse API and that concurrency does not violate API expectations.

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • avirajsingh7

Poem

🐰 Hop, hop — the tasks align in rows,
Threads bustle where the dataset grows,
Celery hums, schedulers keep time,
Workers leap — each job a rhyme.
I nibble code and twitch my nose. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 50.00%, which is below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed: Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check ✅ Passed: The title accurately describes the main change: adding threadpool-based concurrency to speed up Langfuse dataset uploads, which aligns with the core modification in backend/app/crud/evaluations/langfuse.py.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc015ec and 08225a9.

📒 Files selected for processing (1)
  • backend/app/crud/evaluations/langfuse.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/crud/evaluations/langfuse.py
backend/app/crud/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Implement database access operations in backend/app/crud/

Files:

  • backend/app/crud/evaluations/langfuse.py
🧬 Code graph analysis (1)
backend/app/crud/evaluations/langfuse.py (1)
backend/app/core/langfuse/langfuse.py (1)
  • flush (108-109)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (7)
backend/app/crud/evaluations/langfuse.py (7)

12-12: LGTM!

The import of ThreadPoolExecutor and as_completed from concurrent.futures is appropriate for implementing concurrent uploads.


251-270: Return type annotation already present.

Per coding guidelines, type hints should be used. On closer inspection, the inner upload_item function already declares the -> bool return annotation on line 251, so no change is needed here.


251-270: LGTM!

The upload_item helper function is well-structured with proper type hints, error handling, and logging. Catching exceptions and returning False allows the concurrent execution to continue processing other items.


276-280: LGTM!

Precomputing upload_tasks as a list of tuples is a clean approach that separates task generation from concurrent execution.


299-299: LGTM!

Direct access to dataset.id is appropriate here since langfuse.create_dataset() is expected to return a valid dataset object, and any failure would be caught by the outer exception handler.


296-297: LGTM!

The final flush() call after all concurrent uploads complete ensures any batched operations are persisted. This is the correct pattern for concurrent processing.


284-294: No thread-safety concern — Langfuse SDK is designed for concurrent use.

The Langfuse Python SDK is thread-safe and handles concurrent operations safely. Since the code does not pass explicit IDs to create_dataset_item(), Langfuse auto-generates unique IDs server-side, eliminating ID conflict risks. The ThreadPoolExecutor with max_workers=4 is appropriate here.
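As an illustration of the call shape this refers to, a hedged sketch (SDK v2-style API; the dataset name and payloads are placeholders, and the real code wraps the client in backend/app/core/langfuse/langfuse.py):

```python
from langfuse import Langfuse

langfuse = Langfuse()  # credentials read from LANGFUSE_* environment variables

# No explicit item id is passed, so Langfuse assigns unique ids server-side,
# which is what makes concurrent calls from multiple threads safe here.
langfuse.create_dataset_item(
    dataset_name="my-eval-dataset",        # placeholder name
    input={"question": "..."},
    expected_output={"answer": "..."},
)

langfuse.flush()  # single final flush after all uploads complete
```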


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
backend/app/crud/evaluations/langfuse.py (3)

251-270: Preserve the return type annotation on the inner function.

Per coding guidelines, type hints should be used throughout. The upload_item helper already declares -> bool in its signature; ensure the annotation is preserved when Black reformats the file.


285-285: Consider making max_workers configurable.

The hardcoded max_workers=4 may not be optimal for all environments. Consider exposing this as a configuration parameter or deriving it from settings (similar to CELERY_WORKER_CONCURRENCY).

+from app.core.config import settings
+
+# In function or at module level
+MAX_UPLOAD_WORKERS = getattr(settings, "LANGFUSE_UPLOAD_WORKERS", 4)
+
 # Then use:
-        with ThreadPoolExecutor(max_workers=4) as executor:
+        with ThreadPoolExecutor(max_workers=MAX_UPLOAD_WORKERS) as executor:

287-290: Simplify by using executor.map or list comprehension with submit.

The current pattern of appending futures to a list in a loop can be simplified.

-            futures = []
-            for item, dup_num in upload_tasks:
-                future = executor.submit(upload_item, item, dup_num)
-                futures.append(future)
+            futures = [executor.submit(upload_item, item, dup_num) for item, dup_num in upload_tasks]
backend/CELERY_OVERVIEW.md (1)

15-25: Add language specifier to fenced code block.

The file structure code block is missing a language identifier, which triggers markdown lint warning MD040. Use text or plaintext for directory listings.

-```
+```text
 app/celery/
 ├── __init__.py              # Package initialization, exports celery_app
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17f427e and cc015ec.

📒 Files selected for processing (2)
  • backend/CELERY_OVERVIEW.md (1 hunks)
  • backend/app/crud/evaluations/langfuse.py (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/crud/evaluations/langfuse.py
backend/app/crud/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Implement database access operations in backend/app/crud/

Files:

  • backend/app/crud/evaluations/langfuse.py
🧠 Learnings (2)
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/celery/**/*.py : Keep Celery app configuration (priority queues, beat scheduler, workers) under backend/app/celery/

Applied to files:

  • backend/CELERY_OVERVIEW.md
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/celery/tasks/**/*.py : Define Celery tasks under backend/app/celery/tasks/

Applied to files:

  • backend/CELERY_OVERVIEW.md
🧬 Code graph analysis (1)
backend/app/crud/evaluations/langfuse.py (1)
backend/app/core/langfuse/langfuse.py (1)
  • flush (108-109)
🪛 GitHub Actions: AI Platform CI
backend/app/crud/evaluations/langfuse.py

[error] 1-1: Trailing whitespace detected and removed by pre-commit hook 'trailing-whitespace'. Re-run pre-commit to finalize changes.


[error] 1-1: Black formatter reformatted the file. Re-run pre-commit to commit the changes.

🪛 LanguageTool
backend/CELERY_OVERVIEW.md

[uncategorized] ~171-~171: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...Identical to high priority but uses the low priority queue get_task_status() (lines 7...

(EN_COMPOUND_ADJECTIVE_INTERNAL)


[grammar] ~555-~555: Use a hyphen to join words.
Context: ...vice** (app/services/llm/) - Uses high priority queue for real-time API calls -...

(QB_NEW_EN_HYPHEN)

🪛 markdownlint-cli2 (0.18.1)
backend/CELERY_OVERVIEW.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (2)
backend/CELERY_OVERVIEW.md (1)

1-588: Documentation looks comprehensive and well-structured.

This is a valuable addition that clearly explains the Celery architecture, queue priorities, task routing, and best practices. The end-to-end flow example and integration patterns are particularly helpful for onboarding developers. Based on learnings, the file structure aligns with project conventions for Celery configuration under backend/app/celery/.

backend/app/crud/evaluations/langfuse.py (1)

283-295: Langfuse client thread-safety is properly handled.

The Langfuse Python SDK is thread-safe for concurrent create_dataset_item calls when using a single client instance, which your code does. The internal queue and background worker handle batching, and langfuse.flush() at line 298 correctly ensures all items are delivered before the function returns.

Comment on lines +292 to 295

    for future in as_completed(futures):
        upload_successful = future.result()
        if upload_successful:
            total_uploaded += 1

⚠️ Potential issue | 🟡 Minor

Handle exceptions from future.result() to avoid silent failures.

If upload_item raises an unexpected exception (not caught internally), future.result() will propagate it here and abort the loop. Consider wrapping in a try-except to ensure all futures are processed and failures are logged.

             for future in as_completed(futures):
-                upload_successful = future.result()
-                if upload_successful:
-                    total_uploaded += 1
+                try:
+                    if future.result():
+                        total_uploaded += 1
+                except Exception as e:
+                    logger.error(f"[upload_dataset_to_langfuse] Unexpected error in upload task | {e}")
🤖 Prompt for AI Agents
In backend/app/crud/evaluations/langfuse.py around lines 292 to 295, the loop
calling future.result() does not catch exceptions from the completed futures
which can abort processing; wrap the future.result() call in a try-except block,
log the exception (with context such as which item or index failed), and treat
that future as a failed upload (do not increment total_uploaded). Ensure the
except block continues the loop so all futures are processed and consider
returning/recording failure details for higher-level handling.
