74 changes: 35 additions & 39 deletions backend/app/api/routes/evaluation.py
@@ -17,7 +17,7 @@
list_datasets,
start_evaluation_batch,
upload_csv_to_object_store,
upload_dataset_to_langfuse_from_csv,
upload_dataset_to_langfuse,
)
from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud
from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud
@@ -41,6 +41,19 @@
router = APIRouter(tags=["evaluation"])


def _dataset_to_response(dataset) -> DatasetUploadResponse:
"""Convert a dataset model to a DatasetUploadResponse."""
return DatasetUploadResponse(
dataset_id=dataset.id,
dataset_name=dataset.name,
total_items=dataset.dataset_metadata.get("total_items_count", 0),
original_items=dataset.dataset_metadata.get("original_items_count", 0),
duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1),
langfuse_dataset_id=dataset.langfuse_dataset_id,
object_store_url=dataset.object_store_url,
)
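
For reference, a minimal sketch of the shape `_dataset_to_response` populates. The field list comes from the helper above; the base class and exact types are assumptions, since `app.models.evaluation` is not part of this diff:

```python
# Hypothetical sketch of app.models.evaluation.DatasetUploadResponse.
# Field names mirror _dataset_to_response; types and defaults are inferred, not confirmed.
from pydantic import BaseModel


class DatasetUploadResponse(BaseModel):
    dataset_id: int
    dataset_name: str
    total_items: int = 0
    original_items: int = 0
    duplication_factor: int = 1
    langfuse_dataset_id: str | None = None
    object_store_url: str | None = None
```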


def sanitize_dataset_name(name: str) -> str:
"""
Sanitize dataset name for Langfuse compatibility.
@@ -164,24 +177,32 @@ async def upload_dataset(
try:
csv_text = csv_content.decode("utf-8")
csv_reader = csv.DictReader(io.StringIO(csv_text))
csv_reader.fieldnames = [name.strip() for name in csv_reader.fieldnames]

# Validate headers
if (
"question" not in csv_reader.fieldnames
or "answer" not in csv_reader.fieldnames
):
if not csv_reader.fieldnames:
raise HTTPException(status_code=422, detail="CSV file has no headers")

# Normalize headers for case-insensitive matching
clean_headers = {
field.strip().lower(): field for field in csv_reader.fieldnames
}

# Validate required headers (case-insensitive)
if "question" not in clean_headers or "answer" not in clean_headers:
raise HTTPException(
status_code=422,
detail=f"CSV must contain 'question' and 'answer' columns. "
detail=f"CSV must contain 'question' and 'answer' columns "
f"Found columns: {csv_reader.fieldnames}",
)

# Get the actual column names from the CSV
question_col = clean_headers["question"]
answer_col = clean_headers["answer"]
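
To make the header handling concrete, a self-contained sketch with hypothetical header values showing how `clean_headers` resolves the original column names regardless of case and stray whitespace:

```python
# Hypothetical headers; the dict maps normalized names back to the originals,
# so row lookups still use the exact keys csv.DictReader produced.
fieldnames = [" Question ", "ANSWER", "notes"]
clean_headers = {field.strip().lower(): field for field in fieldnames}

assert clean_headers["question"] == " Question "
assert clean_headers["answer"] == "ANSWER"
```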

# Count original items
original_items = []
for row in csv_reader:
question = row.get("question", "").strip()
answer = row.get("answer", "").strip()
question = (row.get(question_col) or "").strip()  # "or" guards against None from short rows
answer = (row.get(answer_col) or "").strip()
if question and answer:
original_items.append({"question": question, "answer": answer})

@@ -237,9 +258,9 @@ async def upload_dataset(
)

# Upload to Langfuse
langfuse_dataset_id, _ = upload_dataset_to_langfuse_from_csv(
langfuse_dataset_id, _ = upload_dataset_to_langfuse(
langfuse=langfuse,
csv_content=csv_content,
items=original_items,
dataset_name=dataset_name,
duplication_factor=duplication_factor,
)
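
The renamed helper now receives pre-parsed rows rather than raw CSV bytes, so parsing and validation happen once in the route. Its implementation lives in `app.crud.evaluations.langfuse` and is not shown in this diff; a hedged sketch of the signature implied by the call site:

```python
# Assumed signature only; inferred from the call
# "langfuse_dataset_id, _ = upload_dataset_to_langfuse(...)". The second
# element of the returned tuple is a guess (the old core.py version tracked
# a total_uploaded count).
def upload_dataset_to_langfuse(
    *,
    langfuse,                     # Langfuse client from get_langfuse_client
    items: list[dict[str, str]],  # [{"question": ..., "answer": ...}, ...]
    dataset_name: str,
    duplication_factor: int,
) -> tuple[str | None, int]:
    ...
```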
@@ -316,24 +337,7 @@ def list_datasets_endpoint(
offset=offset,
)

# Convert to response format
response = []
for dataset in datasets:
response.append(
DatasetUploadResponse(
dataset_id=dataset.id,
dataset_name=dataset.name,
total_items=dataset.dataset_metadata.get("total_items_count", 0),
original_items=dataset.dataset_metadata.get("original_items_count", 0),
duplication_factor=dataset.dataset_metadata.get(
"duplication_factor", 1
),
langfuse_dataset_id=dataset.langfuse_dataset_id,
object_store_url=dataset.object_store_url,
)
)

return response
return [_dataset_to_response(dataset) for dataset in datasets]


@router.get(
@@ -364,15 +368,7 @@ def get_dataset(
status_code=404, detail=f"Dataset {dataset_id} not found or not accessible"
)

return DatasetUploadResponse(
dataset_id=dataset.id,
dataset_name=dataset.name,
total_items=dataset.dataset_metadata.get("total_items_count", 0),
original_items=dataset.dataset_metadata.get("original_items_count", 0),
duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1),
langfuse_dataset_id=dataset.langfuse_dataset_id,
object_store_url=dataset.object_store_url,
)
return _dataset_to_response(dataset)


@router.delete(
4 changes: 2 additions & 2 deletions backend/app/crud/evaluations/__init__.py
@@ -25,7 +25,7 @@
from app.crud.evaluations.langfuse import (
create_langfuse_dataset_run,
update_traces_with_cosine_scores,
upload_dataset_to_langfuse_from_csv,
upload_dataset_to_langfuse,
)
from app.crud.evaluations.processing import (
check_and_process_evaluation,
@@ -62,5 +62,5 @@
# Langfuse
"create_langfuse_dataset_run",
"update_traces_with_cosine_scores",
"upload_dataset_to_langfuse_from_csv",
"upload_dataset_to_langfuse",
]
147 changes: 16 additions & 131 deletions backend/app/crud/evaluations/core.py
@@ -1,138 +1,13 @@
import csv
import io
import logging

from fastapi import HTTPException
from sqlmodel import Session, select

from app.core.util import now
from app.models import EvaluationRun, UserProjectOrg
from app.models.evaluation import DatasetUploadResponse
from app.utils import get_langfuse_client
from app.models import EvaluationRun

logger = logging.getLogger(__name__)


async def upload_dataset_to_langfuse(
csv_content: bytes,
dataset_name: str,
dataset_id: int,
duplication_factor: int,
_session: Session,
_current_user: UserProjectOrg,
) -> tuple[bool, DatasetUploadResponse | None, str | None]:
"""
Upload a CSV dataset to Langfuse with duplication for flakiness testing.

Args:
csv_content: Raw CSV file content as bytes
dataset_name: Name for the dataset in Langfuse
dataset_id: Database ID of the created dataset
duplication_factor: Number of times to duplicate each item (default 5)
_session: Database session
_current_user: Current user organization

Returns:
Tuple of (success, dataset_response, error_message)
"""
try:
# Get Langfuse client
try:
langfuse = get_langfuse_client(
session=_session,
org_id=_current_user.organization_id,
project_id=_current_user.project_id,
)
except HTTPException as http_exc:
return False, None, http_exc.detail

# Parse CSV content
csv_text = csv_content.decode("utf-8")
csv_reader = csv.DictReader(io.StringIO(csv_text))

# Validate CSV headers
if (
"question" not in csv_reader.fieldnames
or "answer" not in csv_reader.fieldnames
):
return (
False,
None,
"CSV must contain 'question' and 'answer' columns. "
f"Found columns: {csv_reader.fieldnames}",
)

# Read all rows from CSV
original_items = []
for row in csv_reader:
question = row.get("question", "").strip()
answer = row.get("answer", "").strip()

if not question or not answer:
logger.warning(f"Skipping row with empty question or answer: {row}")
continue

original_items.append({"question": question, "answer": answer})

if not original_items:
return False, None, "No valid items found in CSV file."

logger.info(
f"Parsed {len(original_items)} items from CSV. "
f"Will duplicate {duplication_factor}x for a total of {len(original_items) * duplication_factor} items."
)

# Create or get dataset in Langfuse
dataset = langfuse.create_dataset(name=dataset_name)

# Upload items with duplication
total_uploaded = 0
for item in original_items:
# Duplicate each item N times
for duplicate_num in range(duplication_factor):
try:
langfuse.create_dataset_item(
dataset_name=dataset_name,
input={"question": item["question"]},
expected_output={"answer": item["answer"]},
metadata={
"original_question": item["question"],
"duplicate_number": duplicate_num + 1,
"duplication_factor": duplication_factor,
},
)
total_uploaded += 1
except Exception as e:
logger.error(
f"Failed to upload item (duplicate {duplicate_num + 1}): {item['question'][:50]}... Error: {e}"
)

# Flush to ensure all items are uploaded
langfuse.flush()

logger.info(
f"Successfully uploaded {total_uploaded} items to dataset '{dataset_name}' "
f"({len(original_items)} original × {duplication_factor} duplicates)"
)

return (
True,
DatasetUploadResponse(
dataset_id=dataset_id,
dataset_name=dataset_name,
total_items=total_uploaded,
original_items=len(original_items),
duplication_factor=duplication_factor,
langfuse_dataset_id=dataset.id if hasattr(dataset, "id") else None,
),
None,
)

except Exception as e:
logger.error(f"Error uploading dataset: {str(e)}", exc_info=True)
return False, None, f"Failed to upload dataset: {str(e)}"


def create_evaluation_run(
session: Session,
run_name: str,
@@ -170,8 +45,13 @@ def create_evaluation_run(
)

session.add(eval_run)
session.commit()
session.refresh(eval_run)
try:
session.commit()
session.refresh(eval_run)
except Exception as e:
session.rollback()
logger.error(f"Failed to create EvaluationRun: {e}", exc_info=True)
raise

logger.info(f"Created EvaluationRun record: id={eval_run.id}, run_name={run_name}")

@@ -214,7 +94,7 @@ def list_evaluation_runs(
f"project_id={project_id}"
)

return list(runs)
return runs


def get_evaluation_run_by_id(
@@ -302,7 +182,12 @@ def update_evaluation_run(

# Persist to database
session.add(eval_run)
session.commit()
session.refresh(eval_run)
try:
session.commit()
session.refresh(eval_run)
except Exception as e:
session.rollback()
logger.error(f"Failed to update EvaluationRun: {e}", exc_info=True)
raise
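
`create_evaluation_run` and `update_evaluation_run` now share the same commit/refresh/rollback shape. A hypothetical helper, not part of this diff, showing how that pattern could be factored out:

```python
# Hypothetical refactor: roll back on a failed commit so the session stays
# usable, then re-raise for the route layer to handle.
def commit_and_refresh(session: Session, instance: EvaluationRun) -> EvaluationRun:
    try:
        session.commit()
        session.refresh(instance)
    except Exception:
        session.rollback()
        raise
    return instance
```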

return eval_run