Commit 782fc70

Merge pull request rapidsai#18830 from rapidsai/branch-25.06
Forward-merge branch-25.06 into branch-25.08
2 parents: 773e21f + 9e3e5b4

3 files changed: +67, -13 lines changed

python/cudf_polars/cudf_polars/dsl/ir.py

Lines changed: 15 additions & 6 deletions
@@ -408,9 +408,18 @@ def __init__(
                     "Multi-character comment prefix not supported for CSV reader"
                 )
             if not self.reader_options["has_header"]:
-                # Need to do some file introspection to get the number
-                # of columns so that column projection works right.
-                raise NotImplementedError("Reading CSV without header")
+                # TODO: To support reading headerless CSV files without requiring new
+                # column names, we would need to do file introspection to infer the number
+                # of columns so column projection works right.
+                reader_schema = self.reader_options.get("schema")
+                if not (
+                    reader_schema
+                    and isinstance(schema, dict)
+                    and "fields" in reader_schema
+                ):
+                    raise NotImplementedError(
+                        "Reading CSV without header requires user-provided column names via new_columns"
+                    )
         elif self.typ == "ndjson":
             # TODO: consider handling the low memory option here
             # (maybe use chunked JSON reader)

@@ -510,8 +519,8 @@ def read_csv_header(
             # file provides column names
             column_names = None
             usecols = with_columns
-            # TODO: support has_header=False
-            header = 0
+            has_header = reader_options["has_header"]
+            header = 0 if has_header else -1

             # polars defaults to no null recognition
             null_values = [""]

@@ -557,7 +566,7 @@ def read_csv_header(
                 options.set_names([str(name) for name in column_names])
             else:
                 if (
-                    not POLARS_VERSION_LT_128 and skip_rows > header
+                    not POLARS_VERSION_LT_128 and header > -1 and skip_rows > header
                 ):  # pragma: no cover
                     # We need to read the header otherwise we would skip it
                     column_names = read_csv_header(path, str(sep))
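
In effect, this change lets the GPU engine scan a headerless CSV as long as the user supplies column names. A minimal user-facing sketch, assuming an illustrative file path and column names (not taken from this commit):

import polars as pl

# Headerless CSV plus new_columns: the schema comes from the user-supplied
# names, so no file introspection is needed for column projection.
q = pl.scan_csv(
    "data_no_header.csv",  # hypothetical path
    has_header=False,
    new_columns=["a", "b", "c"],
).select(pl.col("a"), pl.col("c"))

# With raise_on_fail=True an unsupported query raises instead of silently
# falling back to the CPU engine.
result = q.collect(engine=pl.GPUEngine(raise_on_fail=True))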

python/cudf_polars/cudf_polars/testing/io.py

Lines changed: 7 additions & 2 deletions
@@ -23,6 +23,7 @@ def make_partitioned_source(
     *,
     n_files: int = 1,
     row_group_size: int | None = None,
+    write_kwargs: dict | None = None,
 ) -> None:
     """
     Write the Polars DataFrame to one or more files of the desired format.

@@ -39,19 +40,23 @@ def make_partitioned_source(
         If greater than 1, splits the data into multiple files.
     row_group_size : optional, int
         Only used for Parquet. Specifies the row group size per file.
+    write_kwargs : dict, optional
+        Additional keyword arguments to pass to the write_* functions.
     """
     path = Path(path)
+    write_kwargs = write_kwargs or {}

     def write(part: pl.DataFrame, file_path: Path) -> None:
         match fmt:
             case "csv":
-                part.write_csv(file_path)
+                part.write_csv(file_path, **write_kwargs)
             case "ndjson":
-                part.write_ndjson(file_path)
+                part.write_ndjson(file_path, **write_kwargs)
             case "parquet" | "chunked_parquet":
                 part.write_parquet(
                     file_path,
                     row_group_size=row_group_size or (len(part) // 2),
+                    **write_kwargs,
                 )
             case _:
                 raise ValueError(f"Unsupported format: {fmt}")
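
A minimal usage sketch of the extended helper, mirroring how the new tests below call it (the import path follows the file location; the DataFrame and output path are illustrative):

import polars as pl
from cudf_polars.testing.io import make_partitioned_source

df = pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# write_kwargs is forwarded verbatim to the underlying write_csv /
# write_ndjson / write_parquet call; here it produces a headerless CSV.
make_partitioned_source(
    df, "/tmp/test.csv", "csv", write_kwargs={"include_header": False}
)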

python/cudf_polars/tests/test_scan.py

Lines changed: 45 additions & 5 deletions
@@ -63,7 +63,7 @@ def mask(request):
     params=[None, (1, 1)],
     ids=["no_slice", "slice_second"],
 )
-def slice(request):
+def zlice(request):
     # For use in testing that we handle
     # polars slice pushdown correctly
     return request.param

@@ -85,7 +85,7 @@ def scan_fn(format):


 def test_scan(
-    tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request
+    tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, zlice, request
 ):
     name, offset = row_index
     is_chunked = format == "chunked_parquet"

@@ -102,7 +102,7 @@ def test_scan(
             pytest.mark.xfail(
                 condition=(
                     not POLARS_VERSION_LT_128
-                    and slice is not None
+                    and zlice is not None
                     and scan_fn is pl.scan_ndjson
                 ),
                 reason="slice pushdown not supported in the libcudf JSON reader",

@@ -116,8 +116,8 @@ def test_scan(
     )
     engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked})

-    if slice is not None:
-        q = q.slice(*slice)
+    if zlice is not None:
+        q = q.slice(*zlice)
     if mask is not None:
         q = q.filter(mask)
     if columns is not None:

@@ -422,3 +422,43 @@ def test_select_arbitrary_order_with_row_index_column(request, tmp_path):
         [pl.col("a"), pl.col("foo")]
     )
     assert_gpu_result_equal(q)
+
+
+@pytest.mark.parametrize(
+    "has_header,new_columns",
+    [
+        (True, None),
+        (False, ["a", "b", "c"]),
+    ],
+)
+def test_scan_csv_with_and_without_header(
+    df, tmp_path, has_header, new_columns, row_index, columns, zlice
+):
+    path = tmp_path / "test.csv"
+    make_partitioned_source(
+        df, path, "csv", write_kwargs={"include_header": has_header}
+    )
+
+    name, offset = row_index
+
+    q = pl.scan_csv(
+        path,
+        has_header=has_header,
+        new_columns=new_columns,
+        row_index_name=name,
+        row_index_offset=offset,
+    )
+
+    if zlice is not None:
+        q = q.slice(*zlice)
+    if columns is not None:
+        q = q.select(columns)
+
+    assert_gpu_result_equal(q)
+
+
+def test_scan_csv_without_header_and_new_column_names_raises(df, tmp_path):
+    path = tmp_path / "test.csv"
+    make_partitioned_source(df, path, "csv", write_kwargs={"include_header": False})
+    q = pl.scan_csv(path, has_header=False)
+    assert_ir_translation_raises(q, NotImplementedError)
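
Aside: the slice -> zlice fixture rename above is presumably to stop the parameter from shadowing Python's builtin slice inside the test bodies; a tiny illustration of that hazard (names are made up):

def test_example(slice):  # fixture value, e.g. (1, 1) or None
    # The builtin `slice` is shadowed here, so constructing a slice object
    # by name fails at runtime.
    window = slice(0, 2)  # TypeError: 'tuple' object is not callable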
