Skip to content

Commit 154a3d2

Browse files
committed
test: add Rust integration tests for CDC parquet metadata verification
Add content_defined_chunking.rs with 2 tests that write parquet files using ArrowWriter with CDC-enabled WriterProperties, then inspect file metadata to verify CDC is wired through correctly:

- cdc_data_round_trip: write/read 5000 rows, verify count and range
- cdc_affects_page_boundaries: compare column uncompressed sizes between CDC and non-CDC writes to verify CDC changes the page layout

Also fix a clippy warning on the CdcOptions::default() inherent method.
1 parent e8cc367 commit 154a3d2

File tree

4 files changed

+217
-23
lines changed

4 files changed

+217
-23
lines changed

datafusion/common/src/config.rs

Lines changed: 19 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -715,6 +715,7 @@ pub struct CdcOptions {
715715
// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`.
716716
impl CdcOptions {
717717
/// Returns a new `CdcOptions` with default values.
718+
#[expect(clippy::should_implement_trait)]
718719
pub fn default() -> Self {
719720
Self {
720721
min_chunk_size: 256 * 1024,
@@ -731,10 +732,7 @@ impl ConfigField for CdcOptions {
731732
"min_chunk_size" => self.min_chunk_size.set(rem, value),
732733
"max_chunk_size" => self.max_chunk_size.set(rem, value),
733734
"norm_level" => self.norm_level.set(rem, value),
734-
_ => _config_err!(
735-
"Config value \"{}\" not found on CdcOptions",
736-
key
737-
),
735+
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
738736
}
739737
}
740738

@@ -774,10 +772,7 @@ impl ConfigField for CdcOptions {
774772
self.norm_level.reset(rem)
775773
}
776774
}
777-
_ => _config_err!(
778-
"Config value \"{}\" not found on CdcOptions",
779-
key
780-
),
775+
_ => _config_err!("Config value \"{}\" not found on CdcOptions", key),
781776
}
782777
}
783778
}
@@ -809,8 +804,7 @@ impl ConfigField for Option<CdcOptions> {
809804
),
810805
}
811806
} else {
812-
self.get_or_insert_with(CdcOptions::default)
813-
.set(key, value)
807+
self.get_or_insert_with(CdcOptions::default).set(key, value)
814808
}
815809
}
816810

@@ -819,8 +813,7 @@ impl ConfigField for Option<CdcOptions> {
819813
*self = None;
820814
Ok(())
821815
} else {
822-
self.get_or_insert_with(CdcOptions::default)
823-
.reset(key)
816+
self.get_or_insert_with(CdcOptions::default).reset(key)
824817
}
825818
}
826819
}
@@ -3731,11 +3724,13 @@ mod tests {
37313724
use crate::config::ConfigOptions;
37323725

37333726
let mut config = ConfigOptions::default();
3734-
assert!(config
3735-
.execution
3736-
.parquet
3737-
.use_content_defined_chunking
3738-
.is_none());
3727+
assert!(
3728+
config
3729+
.execution
3730+
.parquet
3731+
.use_content_defined_chunking
3732+
.is_none()
3733+
);
37393734

37403735
// Setting to "true" should enable CDC with default options
37413736
config
@@ -3761,11 +3756,13 @@ mod tests {
37613756
"false",
37623757
)
37633758
.unwrap();
3764-
assert!(config
3765-
.execution
3766-
.parquet
3767-
.use_content_defined_chunking
3768-
.is_none());
3759+
assert!(
3760+
config
3761+
.execution
3762+
.parquet
3763+
.use_content_defined_chunking
3764+
.is_none()
3765+
);
37693766
}
37703767

37713768
#[cfg(feature = "parquet")]
Lines changed: 197 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,197 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Tests for parquet content-defined chunking (CDC).
19+
//!
20+
//! These tests verify that CDC options are correctly wired through to the
21+
//! parquet writer by inspecting file metadata (compressed sizes, page
22+
//! boundaries) on the written files.
23+
24+
use arrow::array::{Int32Array, StringArray};
25+
use arrow::datatypes::{DataType, Field, Schema};
26+
use arrow::record_batch::RecordBatch;
27+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
28+
use datafusion_common::config::{CdcOptions, TableParquetOptions};
29+
use parquet::arrow::ArrowWriter;
30+
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
31+
use parquet::file::properties::WriterProperties;
32+
use std::fs::File;
33+
use std::sync::Arc;
34+
use tempfile::NamedTempFile;
35+
36+
/// Create a RecordBatch with enough data to exercise CDC chunking.
37+
fn make_test_batch(num_rows: usize) -> RecordBatch {
38+
let ids: Vec<i32> = (0..num_rows as i32).collect();
39+
// ~100 bytes per row to generate enough data for CDC page splits
40+
let payloads: Vec<String> = (0..num_rows)
41+
.map(|i| format!("row-{i:06}-payload-{}", "x".repeat(80)))
42+
.collect();
43+
44+
let schema = Arc::new(Schema::new(vec![
45+
Field::new("id", DataType::Int32, false),
46+
Field::new("payload", DataType::Utf8, false),
47+
]));
48+
49+
RecordBatch::try_new(
50+
schema,
51+
vec![
52+
Arc::new(Int32Array::from(ids)),
53+
Arc::new(StringArray::from(payloads)),
54+
],
55+
)
56+
.unwrap()
57+
}
58+
59+
/// Build WriterProperties from TableParquetOptions, exercising the same
60+
/// code path that DataFusion's parquet sink uses.
61+
fn writer_props(
62+
opts: &mut TableParquetOptions,
63+
schema: &Arc<Schema>,
64+
) -> WriterProperties {
65+
opts.arrow_schema(schema);
66+
parquet::file::properties::WriterPropertiesBuilder::try_from(
67+
opts as &TableParquetOptions,
68+
)
69+
.unwrap()
70+
.build()
71+
}
72+
73+
/// Write a batch to a temp parquet file and return the file handle.
74+
fn write_parquet_file(batch: &RecordBatch, props: WriterProperties) -> NamedTempFile {
75+
let tmp = tempfile::Builder::new()
76+
.suffix(".parquet")
77+
.tempfile()
78+
.unwrap();
79+
let mut writer =
80+
ArrowWriter::try_new(tmp.reopen().unwrap(), batch.schema(), Some(props)).unwrap();
81+
writer.write(batch).unwrap();
82+
writer.close().unwrap();
83+
tmp
84+
}
85+
86+
/// Read parquet metadata from a file.
87+
fn read_metadata(file: &NamedTempFile) -> parquet::file::metadata::ParquetMetaData {
88+
let f = File::open(file.path()).unwrap();
89+
let reader_meta = ArrowReaderMetadata::load(&f, Default::default()).unwrap();
90+
reader_meta.metadata().as_ref().clone()
91+
}
92+
93+
/// Write parquet with CDC enabled, read it back via DataFusion, and verify
94+
/// the data round-trips correctly.
95+
#[tokio::test]
96+
async fn cdc_data_round_trip() {
97+
let batch = make_test_batch(5000);
98+
99+
let mut opts = TableParquetOptions::default();
100+
opts.global.use_content_defined_chunking = Some(CdcOptions::default());
101+
let props = writer_props(&mut opts, &batch.schema());
102+
103+
let tmp = write_parquet_file(&batch, props);
104+
105+
// Read back via DataFusion and verify row count
106+
let ctx = SessionContext::new();
107+
ctx.register_parquet(
108+
"data",
109+
tmp.path().to_str().unwrap(),
110+
ParquetReadOptions::default(),
111+
)
112+
.await
113+
.unwrap();
114+
115+
let result = ctx
116+
.sql("SELECT COUNT(*), MIN(id), MAX(id) FROM data")
117+
.await
118+
.unwrap()
119+
.collect()
120+
.await
121+
.unwrap();
122+
123+
let row = &result[0];
124+
let count = row
125+
.column(0)
126+
.as_any()
127+
.downcast_ref::<arrow::array::Int64Array>()
128+
.unwrap()
129+
.value(0);
130+
let min_id = row
131+
.column(1)
132+
.as_any()
133+
.downcast_ref::<Int32Array>()
134+
.unwrap()
135+
.value(0);
136+
let max_id = row
137+
.column(2)
138+
.as_any()
139+
.downcast_ref::<Int32Array>()
140+
.unwrap()
141+
.value(0);
142+
143+
assert_eq!(count, 5000);
144+
assert_eq!(min_id, 0);
145+
assert_eq!(max_id, 4999);
146+
}
147+
148+
/// Verify that CDC options are reflected in the parquet file metadata.
149+
/// With small chunk sizes, CDC should produce different page boundaries
150+
/// compared to default (no CDC) writing.
151+
#[tokio::test]
152+
async fn cdc_affects_page_boundaries() {
153+
let batch = make_test_batch(5000);
154+
155+
// Write WITHOUT CDC
156+
let mut no_cdc_opts = TableParquetOptions::default();
157+
let no_cdc_file =
158+
write_parquet_file(&batch, writer_props(&mut no_cdc_opts, &batch.schema()));
159+
let no_cdc_meta = read_metadata(&no_cdc_file);
160+
161+
// Write WITH CDC using small chunk sizes to maximize effect
162+
let mut cdc_opts = TableParquetOptions::default();
163+
cdc_opts.global.use_content_defined_chunking = Some(CdcOptions {
164+
min_chunk_size: 512,
165+
max_chunk_size: 2048,
166+
norm_level: 0,
167+
});
168+
let cdc_file =
169+
write_parquet_file(&batch, writer_props(&mut cdc_opts, &batch.schema()));
170+
let cdc_meta = read_metadata(&cdc_file);
171+
172+
// Both files should have the same number of rows
173+
assert_eq!(
174+
no_cdc_meta.file_metadata().num_rows(),
175+
cdc_meta.file_metadata().num_rows(),
176+
);
177+
178+
// Compare the uncompressed sizes of columns across all row groups.
179+
// CDC with small chunk sizes should produce different page boundaries.
180+
let no_cdc_sizes: Vec<i64> = no_cdc_meta
181+
.row_groups()
182+
.iter()
183+
.flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size()))
184+
.collect();
185+
186+
let cdc_sizes: Vec<i64> = cdc_meta
187+
.row_groups()
188+
.iter()
189+
.flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size()))
190+
.collect();
191+
192+
assert_ne!(
193+
no_cdc_sizes, cdc_sizes,
194+
"CDC with small chunk sizes should produce different page layouts \
195+
than default writing. no_cdc={no_cdc_sizes:?}, cdc={cdc_sizes:?}"
196+
);
197+
}

datafusion/core/tests/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties};
4444
use std::sync::Arc;
4545
use tempfile::NamedTempFile;
4646

47+
mod content_defined_chunking;
4748
mod custom_reader;
4849
#[cfg(feature = "parquet_encryption")]
4950
mod encryption;

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1275,7 +1275,6 @@ pub(crate) fn csv_writer_options_from_proto(
12751275

12761276
#[cfg(test)]
12771277
mod tests {
1278-
use super::*;
12791278
use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions};
12801279

12811280
fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {

0 commit comments

Comments (0)