Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,24 @@ config_namespace! {
}
}

config_namespace! {
    /// Options for content-defined chunking (CDC) when writing parquet files.
    /// See [`ParquetOptions::use_content_defined_chunking`].
    ///
    /// CDC uses a rolling hash over the data to choose row-group split points,
    /// bounded by the min/max chunk sizes below.
    pub struct CdcOptions {
        /// Minimum chunk size in bytes. The rolling hash will not trigger a split
        /// until this many bytes have been accumulated. Default is 256 KiB.
        pub min_chunk_size: usize, default = 256 * 1024

        /// Maximum chunk size in bytes. A split is forced when the accumulated
        /// size exceeds this value. Default is 1 MiB.
        pub max_chunk_size: usize, default = 1024 * 1024

        /// Normalization level. Increasing this improves deduplication ratio
        /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
        pub norm_level: i64, default = 0
    }
}

config_namespace! {
/// Options for reading and writing parquet files
///
Expand Down Expand Up @@ -845,6 +863,7 @@ config_namespace! {
/// default parquet writer setting
pub bloom_filter_ndv: Option<u64>, default = None


/// (writing) Controls whether DataFusion will attempt to speed up writing
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file are serialized in parallel
Expand Down Expand Up @@ -872,6 +891,12 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
/// automatically disabled since the chunker state must persist across row groups.
pub use_content_defined_chunking: Option<CdcOptions>, default = None
}
}

Expand Down Expand Up @@ -1820,6 +1845,7 @@ config_field!(usize);
config_field!(f64);
config_field!(u64);
config_field!(u32);
config_field!(i64);

impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
Expand Down
69 changes: 67 additions & 2 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
global,
column_specific_options,
key_value_metadata,
crypto: _,
..
} = table_parquet_options;

let mut builder = global.into_writer_properties_builder()?;
Expand Down Expand Up @@ -191,6 +191,7 @@ impl ParquetOptions {
bloom_filter_on_write,
bloom_filter_fpp,
bloom_filter_ndv,
use_content_defined_chunking,

// not in WriterProperties
enable_page_index: _,
Expand Down Expand Up @@ -247,6 +248,15 @@ impl ParquetOptions {
if let Some(encoding) = encoding {
builder = builder.set_encoding(parse_encoding_string(encoding)?);
}
if let Some(cdc) = use_content_defined_chunking {
builder = builder.set_content_defined_chunking(Some(
parquet::file::properties::CdcOptions {
min_chunk_size: cdc.min_chunk_size,
max_chunk_size: cdc.max_chunk_size,
norm_level: cdc.norm_level as i32,
},
));
}

Ok(builder)
}
Expand Down Expand Up @@ -388,7 +398,9 @@ mod tests {
use super::*;
#[cfg(feature = "parquet_encryption")]
use crate::config::ConfigFileEncryptionProperties;
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
use crate::config::{
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
};
use crate::parquet_config::DFParquetWriterVersion;
use parquet::basic::Compression;
use parquet::file::properties::{
Expand Down Expand Up @@ -460,6 +472,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
}
}

Expand Down Expand Up @@ -576,6 +589,13 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
CdcOptions {
min_chunk_size: c.min_chunk_size,
max_chunk_size: c.max_chunk_size,
norm_level: c.norm_level as i64,
}
}),
},
column_specific_options,
key_value_metadata,
Expand Down Expand Up @@ -786,6 +806,51 @@ mod tests {
);
}

#[test]
fn test_cdc_enabled_with_custom_options() {
    // Non-default CDC settings must survive conversion into WriterProperties.
    let cdc_opts = CdcOptions {
        min_chunk_size: 128 * 1024,
        max_chunk_size: 512 * 1024,
        norm_level: 2,
    };
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.use_content_defined_chunking = Some(cdc_opts);
    table_opts.arrow_schema(&Arc::new(Schema::empty()));

    let writer_props = WriterPropertiesBuilder::try_from(&table_opts)
        .unwrap()
        .build();
    let chunking = writer_props
        .content_defined_chunking()
        .expect("CDC should be set");
    assert_eq!(chunking.min_chunk_size, 128 * 1024);
    assert_eq!(chunking.max_chunk_size, 512 * 1024);
    assert_eq!(chunking.norm_level, 2);
}

#[test]
fn test_cdc_disabled_by_default() {
    // With default options, no CDC configuration reaches WriterProperties.
    let mut table_opts = TableParquetOptions::default();
    table_opts.arrow_schema(&Arc::new(Schema::empty()));

    let writer_props = WriterPropertiesBuilder::try_from(&table_opts)
        .unwrap()
        .build();
    assert!(writer_props.content_defined_chunking().is_none());
}

#[test]
fn test_cdc_round_trip_through_writer_props() {
    // CDC settings must round-trip: TableParquetOptions -> WriterProperties
    // -> back through session_config_from_writer_props.
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 64 * 1024,
        max_chunk_size: 2 * 1024 * 1024,
        norm_level: -1,
    });
    table_opts.arrow_schema(&Arc::new(Schema::empty()));

    let writer_props = WriterPropertiesBuilder::try_from(&table_opts)
        .unwrap()
        .build();
    let recovered = session_config_from_writer_props(&writer_props);
    let chunking = recovered
        .global
        .use_content_defined_chunking
        .unwrap();

    assert_eq!(chunking.min_chunk_size, 64 * 1024);
    assert_eq!(chunking.max_chunk_size, 2 * 1024 * 1024);
    assert_eq!(chunking.norm_level, -1);
}

#[test]
fn test_bloom_filter_set_ndv_only() {
// the TableParquetOptions::default, with only ndv set
Expand Down
6 changes: 5 additions & 1 deletion datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,11 @@ impl FileSink for ParquetSink {

while let Some((path, mut rx)) = file_stream_rx.recv().await {
let parquet_props = self.create_writer_props(&runtime, &path).await?;
if !parquet_opts.global.allow_single_file_parallelism {
// CDC requires the sequential writer: the chunker state lives in ArrowWriter
// and persists across row groups. The parallel path bypasses ArrowWriter entirely.
if !parquet_opts.global.allow_single_file_parallelism
|| parquet_opts.global.use_content_defined_chunking.is_some()
{
let mut writer = self
.create_async_arrow_writer(
&path,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ name = "datafusion_proto_common"
[features]
default = []
json = ["serde", "pbjson"]
parquet = ["datafusion-common/parquet", "dep:parquet"]

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true }
parquet = { workspace = true, optional = true }
pbjson = { workspace = true, optional = true }
prost = { workspace = true }
serde = { version = "1.0", optional = true }
Expand Down
8 changes: 8 additions & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,14 @@ message ParquetOptions {
oneof max_predicate_cache_size_opt {
uint64 max_predicate_cache_size = 33;
}

CdcOptions content_defined_chunking = 35;
}

// Content-defined chunking (CDC) parameters for parquet writes.
// Mirrors `datafusion_common::config::CdcOptions`.
message CdcOptions {
  // Minimum chunk size in bytes; no split is triggered before this many
  // bytes have accumulated.
  uint64 min_chunk_size = 1;
  // Maximum chunk size in bytes; a split is forced beyond this value.
  uint64 max_chunk_size = 2;
  // Normalization level; higher values improve deduplication ratio but
  // increase fragmentation. Recommended range is [-3, 3].
  int32 norm_level = 3;
}

enum JoinSide {
Expand Down
103 changes: 98 additions & 5 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_common::{
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
arrow_datafusion_err,
config::{
CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
TableParquetOptions,
},
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
Expand Down Expand Up @@ -1090,6 +1090,14 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
}).unwrap_or(None),
use_content_defined_chunking: value.content_defined_chunking.map(|cdc| {
let defaults = CdcOptions::default();
CdcOptions {
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size },
max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size },
norm_level: cdc.norm_level as i64,
}
}),
})
}
}
Expand Down Expand Up @@ -1152,17 +1160,17 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
column_specific_options.insert(column_name.clone(), options.try_into()?);
}
}
Ok(TableParquetOptions {
let opts = TableParquetOptions {
global: value
.global
.as_ref()
.map(|v| v.try_into())
.unwrap()
.unwrap(),
column_specific_options,
key_value_metadata: Default::default(),
crypto: Default::default(),
})
..Default::default()
};
Ok(opts)
}
}

Expand Down Expand Up @@ -1262,3 +1270,88 @@ pub(crate) fn csv_writer_options_from_proto(
.with_null(writer_options.null_value.clone())
.with_double_quote(writer_options.double_quote))
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions};

    /// Encodes `ParquetOptions` to its protobuf form and decodes it back.
    fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
        let encoded: crate::protobuf_common::ParquetOptions =
            (&opts).try_into().expect("to_proto");
        ParquetOptions::try_from(&encoded).expect("from_proto")
    }

    /// Encodes `TableParquetOptions` to its protobuf form and decodes it back.
    fn table_parquet_options_proto_round_trip(
        opts: TableParquetOptions,
    ) -> TableParquetOptions {
        let encoded: crate::protobuf_common::TableParquetOptions =
            (&opts).try_into().expect("to_proto");
        TableParquetOptions::try_from(&encoded).expect("from_proto")
    }

    #[test]
    fn test_parquet_options_cdc_disabled_round_trip() {
        // CDC is off by default and must remain off across a proto round trip.
        let original = ParquetOptions::default();
        assert!(original.use_content_defined_chunking.is_none());
        let recovered = parquet_options_proto_round_trip(original.clone());
        assert_eq!(original, recovered);
    }

    #[test]
    fn test_parquet_options_cdc_enabled_round_trip() {
        // Explicit CDC settings must survive the proto round trip intact.
        let original = ParquetOptions {
            use_content_defined_chunking: Some(CdcOptions {
                min_chunk_size: 128 * 1024,
                max_chunk_size: 512 * 1024,
                norm_level: 2,
            }),
            ..ParquetOptions::default()
        };
        let cdc = parquet_options_proto_round_trip(original)
            .use_content_defined_chunking
            .unwrap();
        assert_eq!(cdc.min_chunk_size, 128 * 1024);
        assert_eq!(cdc.max_chunk_size, 512 * 1024);
        assert_eq!(cdc.norm_level, 2);
    }

    #[test]
    fn test_parquet_options_cdc_negative_norm_level_round_trip() {
        // Negative normalization levels are valid (recommended range is [-3, 3])
        // and must not be clamped or lost in serialization.
        let original = ParquetOptions {
            use_content_defined_chunking: Some(CdcOptions {
                norm_level: -3,
                ..CdcOptions::default()
            }),
            ..ParquetOptions::default()
        };
        let recovered = parquet_options_proto_round_trip(original);
        assert_eq!(
            recovered.use_content_defined_chunking.unwrap().norm_level,
            -3
        );
    }

    #[test]
    fn test_table_parquet_options_cdc_round_trip() {
        // CDC set via the global table options must round-trip as well.
        let mut original = TableParquetOptions::default();
        original.global.use_content_defined_chunking = Some(CdcOptions {
            min_chunk_size: 64 * 1024,
            max_chunk_size: 2 * 1024 * 1024,
            norm_level: -1,
        });

        let cdc = table_parquet_options_proto_round_trip(original)
            .global
            .use_content_defined_chunking
            .unwrap();
        assert_eq!(cdc.min_chunk_size, 64 * 1024);
        assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024);
        assert_eq!(cdc.norm_level, -1);
    }

    #[test]
    fn test_table_parquet_options_cdc_disabled_round_trip() {
        // Absent CDC on table options stays absent after the round trip.
        let original = TableParquetOptions::default();
        assert!(original.global.use_content_defined_chunking.is_none());
        let recovered = table_parquet_options_proto_round_trip(original);
        assert!(recovered.global.use_content_defined_chunking.is_none());
    }
}
Loading
Loading