Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 43 additions & 22 deletions src/guidellm/benchmark/outputs/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ async def finalize(self, report: GenerativeBenchmarksReport) -> Path:
with output_path.open("w", newline="") as file:
writer = csv.writer(file)

row_maps: list[dict[tuple[str, ...], str | int | float]] = []
ordered_headers: dict[tuple[str, ...], None] = {}
all_headers: list[list[list[str]]] = []
all_values: list[list[str | int | float]] = []

for benchmark in report.benchmarks:
benchmark_headers: list[list[str]] = []
Expand All @@ -145,34 +145,55 @@ async def finalize(self, report: GenerativeBenchmarksReport) -> Path:
self._add_scheduler_info(benchmark, benchmark_headers, benchmark_values)
self._add_runtime_info(report, benchmark_headers, benchmark_values)

row_map: dict[tuple[str, ...], str | int | float] = {}
for header_parts, value in zip(
benchmark_headers, benchmark_values, strict=False
):
header_key = tuple(header_parts)
row_map[header_key] = value
all_headers.append(benchmark_headers)
all_values.append(benchmark_values)

if header_key not in ordered_headers:
ordered_headers[header_key] = None

row_maps.append(row_map)

header_keys = list(ordered_headers.keys())
headers = [list(header_key) for header_key in header_keys]

data_rows: list[list[str | int | float]] = []
for row_map in row_maps:
aligned_row_values = [
row_map.get(header_key, "") for header_key in header_keys
]
data_rows.append(aligned_row_values)
headers, data_rows = self._align_columns(all_headers, all_values)

self._write_multirow_header(writer, headers)
for row in data_rows:
writer.writerow(row)

return output_path

@staticmethod
def _align_columns(
all_headers: list[list[list[str]]],
all_values: list[list[str | int | float]],
) -> tuple[list[list[str]], list[list[str | int | float]]]:
"""
Align columns across multiple benchmarks that may have different column sets.

Builds a unified header list from all benchmarks (preserving first-seen order)
and pads each row with empty strings for columns it doesn't have.

:param all_headers: Per-benchmark list of column header hierarchies
:param all_values: Per-benchmark list of column values
:return: Tuple of (unified headers, aligned data rows)
"""
ordered_headers: dict[tuple[str, ...], None] = {}
row_maps: list[dict[tuple[str, ...], str | int | float]] = []

for benchmark_headers, benchmark_values in zip(
all_headers, all_values, strict=True
):
row_map: dict[tuple[str, ...], str | int | float] = {}
for header_parts, value in zip(
benchmark_headers, benchmark_values, strict=False
):
header_key = tuple(header_parts)
row_map[header_key] = value
if header_key not in ordered_headers:
ordered_headers[header_key] = None
row_maps.append(row_map)

header_keys = list(ordered_headers.keys())
headers = [list(k) for k in header_keys]
data_rows: list[list[str | int | float]] = [
[row_map.get(k, "") for k in header_keys] for row_map in row_maps
]
return headers, data_rows

def _write_multirow_header(self, writer: Any, headers: list[list[str]]) -> None:
"""
Write multi-row header to CSV for hierarchical metric organization.
Expand Down
265 changes: 174 additions & 91 deletions tests/unit/benchmark/test_csv_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,181 @@
from guidellm.benchmark.outputs.csv import GenerativeBenchmarkerCSV


def _make_report(benchmarks):
"""Build a minimal benchmark report for CSV output tests."""
return SimpleNamespace(
benchmarks=benchmarks,
class TestAlignColumns:
    """
    Unit tests for GenerativeBenchmarkerCSV._align_columns, covering how
    column sets that differ between benchmarks are unified and padded.

    ## WRITTEN BY AI ##
    """

    @pytest.mark.regression
    def test_headers_merge_in_first_seen_order(self):
        """
        The unified header list is the union of every benchmark's columns,
        ordered by first appearance.

        ## WRITTEN BY AI ##
        """
        first_headers = [["GroupA", "Field1", ""], ["GroupB", "Field2", ""]]
        second_headers = [["GroupA", "Field1", ""], ["GroupC", "Field3", ""]]

        merged, aligned = GenerativeBenchmarkerCSV._align_columns(
            [first_headers, second_headers],
            [["v1", "v2"], ["v1_b2", "v3"]],
        )

        assert merged == [
            ["GroupA", "Field1", ""],
            ["GroupB", "Field2", ""],
            ["GroupC", "Field3", ""],
        ]
        assert aligned[0] == ["v1", "v2", ""]
        assert aligned[1] == ["v1_b2", "", "v3"]

    @pytest.mark.regression
    def test_missing_columns_filled_with_empty_string(self):
        """
        A column present in the first benchmark but absent from the second
        shows up as an empty string in the second row.

        ## WRITTEN BY AI ##
        """
        merged, aligned = GenerativeBenchmarkerCSV._align_columns(
            [
                [["G", "A", ""], ["G", "B", ""]],
                [["G", "A", ""]],
            ],
            [["a", "b"], ["a2"]],
        )

        assert merged == [["G", "A", ""], ["G", "B", ""]]
        assert aligned[0] == ["a", "b"]
        assert aligned[1] == ["a2", ""]

    @pytest.mark.regression
    def test_first_benchmark_missing_columns(self):
        """
        A column introduced only by the second benchmark is appended to the
        headers, and the first row carries an empty string for it.

        ## WRITTEN BY AI ##
        """
        merged, aligned = GenerativeBenchmarkerCSV._align_columns(
            [
                [["G", "A", ""]],
                [["G", "A", ""], ["G", "B", ""]],
            ],
            [["a1"], ["a2", "b2"]],
        )

        assert merged == [["G", "A", ""], ["G", "B", ""]]
        assert aligned[0] == ["a1", ""]
        assert aligned[1] == ["a2", "b2"]

    @pytest.mark.regression
    def test_identical_columns_no_padding(self):
        """
        Benchmarks sharing exactly the same columns come through without any
        padding.

        ## WRITTEN BY AI ##
        """
        same_headers_a = [["G", "X", ""], ["G", "Y", ""]]
        same_headers_b = [["G", "X", ""], ["G", "Y", ""]]

        merged, aligned = GenerativeBenchmarkerCSV._align_columns(
            [same_headers_a, same_headers_b],
            [["1", "2"], ["3", "4"]],
        )

        assert merged == [["G", "X", ""], ["G", "Y", ""]]
        assert aligned[0] == ["1", "2"]
        assert aligned[1] == ["3", "4"]

    @pytest.mark.smoke
    def test_empty_benchmarks_list(self):
        """
        With no benchmarks, both the headers and the data rows are empty.

        ## WRITTEN BY AI ##
        """
        merged, aligned = GenerativeBenchmarkerCSV._align_columns([], [])

        assert merged == []
        assert aligned == []

    @pytest.mark.smoke
    def test_single_benchmark(self):
        """
        One benchmark on its own gets its headers and values back unchanged.

        ## WRITTEN BY AI ##
        """
        only_headers = [["A", "B", "C"], ["D", "E", "F"]]
        only_values = [10, 20]

        merged, aligned = GenerativeBenchmarkerCSV._align_columns(
            [only_headers], [only_values]
        )

        assert merged == [["A", "B", "C"], ["D", "E", "F"]]
        assert aligned == [[10, 20]]

    @pytest.mark.regression
    def test_three_benchmarks_disjoint_columns(self):
        """
        Three benchmarks with no columns in common yield the full union of
        columns, each row blank everywhere except its own column.

        ## WRITTEN BY AI ##
        """
        merged, aligned = GenerativeBenchmarkerCSV._align_columns(
            [
                [["G", "A", ""]],
                [["G", "B", ""]],
                [["G", "C", ""]],
            ],
            [["a"], ["b"], ["c"]],
        )

        assert merged == [["G", "A", ""], ["G", "B", ""], ["G", "C", ""]]
        assert aligned[0] == ["a", "", ""]
        assert aligned[1] == ["", "b", ""]
        assert aligned[2] == ["", "", "c"]


@pytest.mark.asyncio
@pytest.mark.sanity
async def test_finalize_aligns_columns_in_written_csv(tmp_path: Path):
"""
Integration test: finalize writes a CSV where all rows (headers + data)
have the same column count, even when benchmarks produce different columns.

Uses patching to control the column shape without constructing full
benchmark objects.

## WRITTEN BY AI ##
"""
report = SimpleNamespace(
benchmarks=[
SimpleNamespace(_test_fields=[(("G", "A", ""), "a1")]),
SimpleNamespace(
_test_fields=[(("G", "A", ""), "a2"), (("G", "B", ""), "b2")]
),
],
metadata=SimpleNamespace(model_dump_json=lambda: "{}"),
args=SimpleNamespace(model_dump_json=lambda: "{}"),
)


def _make_csv_output(tmp_path: Path, benchmarks):
"""Create a CSV output instance with only the fields this test needs."""
report = _make_report(benchmarks)
out = GenerativeBenchmarkerCSV(output_path=tmp_path)

# Disable unrelated metric emitters so the test controls the output shape.
# Stub all emitters except _add_run_info so we control column shape
for name in [
"_add_benchmark_info",
"_add_timing_info",
Expand All @@ -40,94 +200,17 @@ def _add_run_info(self, benchmark, headers, values):
values.append(val)

out._add_run_info = _add_run_info.__get__(out, out.__class__)
return out, report


@pytest.mark.asyncio
@pytest.mark.regression
async def test_headers_merge_and_order(tmp_path: Path):
"""
Ensure headers from multiple benchmarks are merged in first-seen order.

### WRITTEN BY AI ###
"""
bench1 = SimpleNamespace(
_test_fields=[
(("GroupA", "Field1", ""), "v1"),
(("GroupB", "Field2", ""), "v2"),
]
)

bench2 = SimpleNamespace(
_test_fields=[
(("GroupA", "Field1", ""), "v1_b2"),
(("GroupC", "Field3", ""), "v3"),
]
)

out, report = _make_csv_output(tmp_path, [bench1, bench2])
path = await out.finalize(report)

rows = list(csv.reader(path.open()))
header_rows = rows[:3]
assert len(rows) == 5 # 3 header rows + 2 data rows

reconstructed = [
tuple(col[i] for col in header_rows) for i in range(len(header_rows[0]))
]
# All rows must have the same column count
col_counts = {len(row) for row in rows}
assert len(col_counts) == 1, f"Expected uniform column count, got {col_counts}"

assert reconstructed == [
("GroupA", "Field1", ""),
("GroupB", "Field2", ""),
("GroupC", "Field3", ""),
]


@pytest.mark.asyncio
@pytest.mark.regression
async def test_values_alignment(tmp_path: Path):
"""
Ensure missing columns are written as blanks for each aligned row.

### WRITTEN BY AI ###
"""
bench1 = SimpleNamespace(
_test_fields=[(("G", "A", ""), "a"), (("G", "B", ""), "b")]
)
bench2 = SimpleNamespace(_test_fields=[(("G", "A", ""), "a2")])

out, report = _make_csv_output(tmp_path, [bench1, bench2])
path = await out.finalize(report)
rows = list(csv.reader(path.open()))
data_rows = rows[3:]

assert data_rows[0] == ["a", "b"]
assert data_rows[1] == ["a2", ""]


@pytest.mark.asyncio
@pytest.mark.regression
async def test_first_benchmark_missing_columns(tmp_path: Path):
"""
When the first benchmark lacks columns that the second has, those columns
should still appear in the output and the first row gets blank values.

### WRITTEN BY AI ###
"""
bench1 = SimpleNamespace(_test_fields=[(("G", "A", ""), "a1")])
bench2 = SimpleNamespace(
_test_fields=[(("G", "A", ""), "a2"), (("G", "B", ""), "b2")]
)

out, report = _make_csv_output(tmp_path, [bench1, bench2])
path = await out.finalize(report)
rows = list(csv.reader(path.open()))
header_rows = rows[:3]
# Data row for first benchmark should have blank in column B
data_rows = rows[3:]

reconstructed = [
tuple(col[i] for col in header_rows) for i in range(len(header_rows[0]))
]

assert reconstructed == [("G", "A", ""), ("G", "B", "")]
assert data_rows[0] == ["a1", ""]
assert data_rows[1] == ["a2", "b2"]