Skip to content

Commit f3b8f4d

Browse files
authored
Evaluation: Upload Dataset Improvements (#450)
* cleanup upload
* added check for duplicate headers
* added check for duplicate headers
* cleanup
* cleanup
* cleanups
* cleanups
* refactor upload dataset
* updated failing testcases
1 parent 1e62db0 commit f3b8f4d

File tree

6 files changed: +130 -319 lines changed

6 files changed: +130 -319 lines changed

backend/app/api/routes/evaluation.py

Lines changed: 35 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
list_datasets,
1818
start_evaluation_batch,
1919
upload_csv_to_object_store,
20-
upload_dataset_to_langfuse_from_csv,
20+
upload_dataset_to_langfuse,
2121
)
2222
from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud
2323
from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud
@@ -41,6 +41,19 @@
4141
router = APIRouter(tags=["evaluation"])
4242

4343

44+
def _dataset_to_response(dataset) -> DatasetUploadResponse:
45+
"""Convert a dataset model to a DatasetUploadResponse."""
46+
return DatasetUploadResponse(
47+
dataset_id=dataset.id,
48+
dataset_name=dataset.name,
49+
total_items=dataset.dataset_metadata.get("total_items_count", 0),
50+
original_items=dataset.dataset_metadata.get("original_items_count", 0),
51+
duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1),
52+
langfuse_dataset_id=dataset.langfuse_dataset_id,
53+
object_store_url=dataset.object_store_url,
54+
)
55+
56+
4457
def sanitize_dataset_name(name: str) -> str:
4558
"""
4659
Sanitize dataset name for Langfuse compatibility.
@@ -164,24 +177,32 @@ async def upload_dataset(
164177
try:
165178
csv_text = csv_content.decode("utf-8")
166179
csv_reader = csv.DictReader(io.StringIO(csv_text))
167-
csv_reader.fieldnames = [name.strip() for name in csv_reader.fieldnames]
168180

169-
# Validate headers
170-
if (
171-
"question" not in csv_reader.fieldnames
172-
or "answer" not in csv_reader.fieldnames
173-
):
181+
if not csv_reader.fieldnames:
182+
raise HTTPException(status_code=422, detail="CSV file has no headers")
183+
184+
# Normalize headers for case-insensitive matching
185+
clean_headers = {
186+
field.strip().lower(): field for field in csv_reader.fieldnames
187+
}
188+
189+
# Validate required headers (case-insensitive)
190+
if "question" not in clean_headers or "answer" not in clean_headers:
174191
raise HTTPException(
175192
status_code=422,
176-
detail=f"CSV must contain 'question' and 'answer' columns. "
193+
detail=f"CSV must contain 'question' and 'answer' columns "
177194
f"Found columns: {csv_reader.fieldnames}",
178195
)
179196

197+
# Get the actual column names from the CSV
198+
question_col = clean_headers["question"]
199+
answer_col = clean_headers["answer"]
200+
180201
# Count original items
181202
original_items = []
182203
for row in csv_reader:
183-
question = row.get("question", "").strip()
184-
answer = row.get("answer", "").strip()
204+
question = row.get(question_col, "").strip()
205+
answer = row.get(answer_col, "").strip()
185206
if question and answer:
186207
original_items.append({"question": question, "answer": answer})
187208

@@ -237,9 +258,9 @@ async def upload_dataset(
237258
)
238259

239260
# Upload to Langfuse
240-
langfuse_dataset_id, _ = upload_dataset_to_langfuse_from_csv(
261+
langfuse_dataset_id, _ = upload_dataset_to_langfuse(
241262
langfuse=langfuse,
242-
csv_content=csv_content,
263+
items=original_items,
243264
dataset_name=dataset_name,
244265
duplication_factor=duplication_factor,
245266
)
@@ -316,24 +337,7 @@ def list_datasets_endpoint(
316337
offset=offset,
317338
)
318339

319-
# Convert to response format
320-
response = []
321-
for dataset in datasets:
322-
response.append(
323-
DatasetUploadResponse(
324-
dataset_id=dataset.id,
325-
dataset_name=dataset.name,
326-
total_items=dataset.dataset_metadata.get("total_items_count", 0),
327-
original_items=dataset.dataset_metadata.get("original_items_count", 0),
328-
duplication_factor=dataset.dataset_metadata.get(
329-
"duplication_factor", 1
330-
),
331-
langfuse_dataset_id=dataset.langfuse_dataset_id,
332-
object_store_url=dataset.object_store_url,
333-
)
334-
)
335-
336-
return response
340+
return [_dataset_to_response(dataset) for dataset in datasets]
337341

338342

339343
@router.get(
@@ -364,15 +368,7 @@ def get_dataset(
364368
status_code=404, detail=f"Dataset {dataset_id} not found or not accessible"
365369
)
366370

367-
return DatasetUploadResponse(
368-
dataset_id=dataset.id,
369-
dataset_name=dataset.name,
370-
total_items=dataset.dataset_metadata.get("total_items_count", 0),
371-
original_items=dataset.dataset_metadata.get("original_items_count", 0),
372-
duplication_factor=dataset.dataset_metadata.get("duplication_factor", 1),
373-
langfuse_dataset_id=dataset.langfuse_dataset_id,
374-
object_store_url=dataset.object_store_url,
375-
)
371+
return _dataset_to_response(dataset)
376372

377373

378374
@router.delete(

backend/app/crud/evaluations/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525
from app.crud.evaluations.langfuse import (
2626
create_langfuse_dataset_run,
2727
update_traces_with_cosine_scores,
28-
upload_dataset_to_langfuse_from_csv,
28+
upload_dataset_to_langfuse,
2929
)
3030
from app.crud.evaluations.processing import (
3131
check_and_process_evaluation,
@@ -62,5 +62,5 @@
6262
# Langfuse
6363
"create_langfuse_dataset_run",
6464
"update_traces_with_cosine_scores",
65-
"upload_dataset_to_langfuse_from_csv",
65+
"upload_dataset_to_langfuse",
6666
]

backend/app/crud/evaluations/core.py

Lines changed: 16 additions & 131 deletions
Original file line number | Diff line number | Diff line change
@@ -1,138 +1,13 @@
1-
import csv
2-
import io
31
import logging
42

5-
from fastapi import HTTPException
63
from sqlmodel import Session, select
74

85
from app.core.util import now
9-
from app.models import EvaluationRun, UserProjectOrg
10-
from app.models.evaluation import DatasetUploadResponse
11-
from app.utils import get_langfuse_client
6+
from app.models import EvaluationRun
127

138
logger = logging.getLogger(__name__)
149

1510

16-
async def upload_dataset_to_langfuse(
17-
csv_content: bytes,
18-
dataset_name: str,
19-
dataset_id: int,
20-
duplication_factor: int,
21-
_session: Session,
22-
_current_user: UserProjectOrg,
23-
) -> tuple[bool, DatasetUploadResponse | None, str | None]:
24-
"""
25-
Upload a CSV dataset to Langfuse with duplication for flakiness testing.
26-
27-
Args:
28-
csv_content: Raw CSV file content as bytes
29-
dataset_name: Name for the dataset in Langfuse
30-
dataset_id: Database ID of the created dataset
31-
duplication_factor: Number of times to duplicate each item (default 5)
32-
_session: Database session
33-
_current_user: Current user organization
34-
35-
Returns:
36-
Tuple of (success, dataset_response, error_message)
37-
"""
38-
try:
39-
# Get Langfuse client
40-
try:
41-
langfuse = get_langfuse_client(
42-
session=_session,
43-
org_id=_current_user.organization_id,
44-
project_id=_current_user.project_id,
45-
)
46-
except HTTPException as http_exc:
47-
return False, None, http_exc.detail
48-
49-
# Parse CSV content
50-
csv_text = csv_content.decode("utf-8")
51-
csv_reader = csv.DictReader(io.StringIO(csv_text))
52-
53-
# Validate CSV headers
54-
if (
55-
"question" not in csv_reader.fieldnames
56-
or "answer" not in csv_reader.fieldnames
57-
):
58-
return (
59-
False,
60-
None,
61-
"CSV must contain 'question' and 'answer' columns. "
62-
f"Found columns: {csv_reader.fieldnames}",
63-
)
64-
65-
# Read all rows from CSV
66-
original_items = []
67-
for row in csv_reader:
68-
question = row.get("question", "").strip()
69-
answer = row.get("answer", "").strip()
70-
71-
if not question or not answer:
72-
logger.warning(f"Skipping row with empty question or answer: {row}")
73-
continue
74-
75-
original_items.append({"question": question, "answer": answer})
76-
77-
if not original_items:
78-
return False, None, "No valid items found in CSV file."
79-
80-
logger.info(
81-
f"Parsed {len(original_items)} items from CSV. "
82-
f"Will duplicate {duplication_factor}x for a total of {len(original_items) * duplication_factor} items."
83-
)
84-
85-
# Create or get dataset in Langfuse
86-
dataset = langfuse.create_dataset(name=dataset_name)
87-
88-
# Upload items with duplication
89-
total_uploaded = 0
90-
for item in original_items:
91-
# Duplicate each item N times
92-
for duplicate_num in range(duplication_factor):
93-
try:
94-
langfuse.create_dataset_item(
95-
dataset_name=dataset_name,
96-
input={"question": item["question"]},
97-
expected_output={"answer": item["answer"]},
98-
metadata={
99-
"original_question": item["question"],
100-
"duplicate_number": duplicate_num + 1,
101-
"duplication_factor": duplication_factor,
102-
},
103-
)
104-
total_uploaded += 1
105-
except Exception as e:
106-
logger.error(
107-
f"Failed to upload item (duplicate {duplicate_num + 1}): {item['question'][:50]}... Error: {e}"
108-
)
109-
110-
# Flush to ensure all items are uploaded
111-
langfuse.flush()
112-
113-
logger.info(
114-
f"Successfully uploaded {total_uploaded} items to dataset '{dataset_name}' "
115-
f"({len(original_items)} original × {duplication_factor} duplicates)"
116-
)
117-
118-
return (
119-
True,
120-
DatasetUploadResponse(
121-
dataset_id=dataset_id,
122-
dataset_name=dataset_name,
123-
total_items=total_uploaded,
124-
original_items=len(original_items),
125-
duplication_factor=duplication_factor,
126-
langfuse_dataset_id=dataset.id if hasattr(dataset, "id") else None,
127-
),
128-
None,
129-
)
130-
131-
except Exception as e:
132-
logger.error(f"Error uploading dataset: {str(e)}", exc_info=True)
133-
return False, None, f"Failed to upload dataset: {str(e)}"
134-
135-
13611
def create_evaluation_run(
13712
session: Session,
13813
run_name: str,
@@ -170,8 +45,13 @@ def create_evaluation_run(
17045
)
17146

17247
session.add(eval_run)
173-
session.commit()
174-
session.refresh(eval_run)
48+
try:
49+
session.commit()
50+
session.refresh(eval_run)
51+
except Exception as e:
52+
session.rollback()
53+
logger.error(f"Failed to create EvaluationRun: {e}", exc_info=True)
54+
raise
17555

17656
logger.info(f"Created EvaluationRun record: id={eval_run.id}, run_name={run_name}")
17757

@@ -214,7 +94,7 @@ def list_evaluation_runs(
21494
f"project_id={project_id}"
21595
)
21696

217-
return list(runs)
97+
return runs
21898

21999

220100
def get_evaluation_run_by_id(
@@ -302,7 +182,12 @@ def update_evaluation_run(
302182

303183
# Persist to database
304184
session.add(eval_run)
305-
session.commit()
306-
session.refresh(eval_run)
185+
try:
186+
session.commit()
187+
session.refresh(eval_run)
188+
except Exception as e:
189+
session.rollback()
190+
logger.error(f"Failed to update EvaluationRun: {e}", exc_info=True)
191+
raise
307192

308193
return eval_run

0 commit comments

Comments
 (0)