
Commit ba0e3a0

Merge pull request #13 from polygon-io/branch-50-upgrade
Merge our internal changes and fix conflicts for DF branch 50
2 parents 10343c1 + acd9ddf commit ba0e3a0

77 files changed: +1499 additions, -225 deletions


.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion

@@ -44,4 +44,4 @@ jobs:
       - name: Run audit check
         # Ignored until https://github.com/apache/datafusion/issues/15571
         # ignored py03 warning until arrow 55 upgrade
-        run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020
+        run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020 --ignore RUSTSEC-2025-0047

datafusion/common/src/config.rs

Lines changed: 14 additions & 1 deletion

@@ -539,7 +539,7 @@ config_namespace! {

         /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
         /// and `Binary/BinaryLarge` with `BinaryView`.
-        pub schema_force_view_types: bool, default = true
+        pub schema_force_view_types: bool, default = false

         /// (reading) If true, parquet reader will read columns of
         /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.

@@ -2521,6 +2521,10 @@ config_namespace! {
         // The input regex for Nulls when loading CSVs.
         pub null_regex: Option<String>, default = None
         pub comment: Option<u8>, default = None
+        // Whether to allow truncated rows when parsing.
+        // By default this is set to false and will error if the CSV rows have different lengths.
+        // When set to true then it will allow records with less than the expected number of columns
+        pub truncated_rows: Option<bool>, default = None
     }
 }

@@ -2613,6 +2617,15 @@ impl CsvOptions {
         self
     }

+    /// Whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
+    /// If the record’s schema is not nullable, then it will still return an error.
+    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
+        self.truncated_rows = Some(allow);
+        self
+    }
+
     /// The delimiter character.
     pub fn delimiter(&self) -> u8 {
         self.delimiter
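
The new `with_truncated_rows` builder is the option most of this commit exercises. A minimal usage sketch, assuming the `CsvOptions`/`CsvFormat::with_options` path used by the tests later in this diff; it is an illustration, not part of the committed code:

use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion_common::config::CsvOptions;

fn csv_format_tolerating_short_rows() -> CsvFormat {
    // Rows shorter than the schema are padded with NULLs instead of raising an
    // error, provided the missing fields are nullable.
    let options = CsvOptions::default().with_truncated_rows(true);
    CsvFormat::default()
        .with_has_header(true)
        .with_options(options)
}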

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 178 additions & 1 deletion

@@ -48,7 +48,7 @@ mod tests {
     use datafusion_physical_plan::{collect, ExecutionPlan};

     use arrow::array::{
-        BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
+        Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
     };
     use arrow::compute::concat_batches;
     use arrow::csv::ReaderBuilder;

@@ -1256,4 +1256,181 @@ mod tests {
             .build_decoder();
         DecoderDeserializer::new(CsvDecoder::new(decoder))
     }
+
+    fn csv_deserializer_with_truncated(
+        batch_size: usize,
+        schema: &Arc<Schema>,
+    ) -> impl BatchDeserializer<Bytes> {
+        // using Arrow's ReaderBuilder and enabling truncated_rows
+        let decoder = ReaderBuilder::new(schema.clone())
+            .with_batch_size(batch_size)
+            .with_truncated_rows(true) // <- enable runtime truncated_rows
+            .build_decoder();
+        DecoderDeserializer::new(CsvDecoder::new(decoder))
+    }
+
+    #[tokio::test]
+    async fn infer_schema_with_truncated_rows_true() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+
+        // CSV: header has 3 columns, but first data row has only 2 columns, second row has 3
+        let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n");
+        let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        // Construct CsvFormat and enable truncated_rows via CsvOptions
+        let csv_options = CsvOptions::default().with_truncated_rows(true);
+        let csv_format = CsvFormat::default()
+            .with_has_header(true)
+            .with_options(csv_options)
+            .with_schema_infer_max_rec(10);
+
+        let inferred_schema = csv_format
+            .infer_schema(
+                &state,
+                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await?;
+
+        // header has 3 columns; inferred schema should also have 3
+        assert_eq!(inferred_schema.fields().len(), 3);
+
+        // inferred columns should be nullable
+        for f in inferred_schema.fields() {
+            assert!(f.is_nullable());
+        }
+
+        Ok(())
+    }
+    #[test]
+    fn test_decoder_truncated_rows_runtime() -> Result<()> {
+        // Synchronous test: Decoder API used here is synchronous
+        let schema = csv_schema(); // helper already defined in file
+
+        // Construct a decoder that enables truncated_rows at runtime
+        let mut deserializer = csv_deserializer_with_truncated(10, &schema);
+
+        // Provide two rows: first row complete, second row missing last column
+        let input = Bytes::from("0,0.0,true,0-string\n1,1.0,true\n");
+        deserializer.digest(input);
+
+        // Finish and collect output
+        deserializer.finish();
+
+        let output = deserializer.next()?;
+        match output {
+            DeserializerOutput::RecordBatch(batch) => {
+                // ensure at least two rows present
+                assert!(batch.num_rows() >= 2);
+                // column 4 (index 3) should be a StringArray where second row is NULL
+                let col4 = batch
+                    .column(3)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("column 4 should be StringArray");
+
+                // first row present, second row should be null
+                assert!(!col4.is_null(0));
+                assert!(col4.is_null(1));
+            }
+            other => panic!("expected RecordBatch but got {other:?}"),
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn infer_schema_truncated_rows_false_error() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+
+        // CSV: header has 4 cols, first data row has 3 cols -> truncated at end
+        let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n");
+        let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        // CsvFormat without enabling truncated_rows (default behavior = false)
+        let csv_format = CsvFormat::default()
+            .with_has_header(true)
+            .with_schema_infer_max_rec(10);
+
+        let res = csv_format
+            .infer_schema(
+                &state,
+                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await;
+
+        // Expect an error due to unequal lengths / incorrect number of fields
+        assert!(
+            res.is_err(),
+            "expected infer_schema to error on truncated rows when disabled"
+        );
+
+        // Optional: check message contains indicative text (two known possibilities)
+        if let Err(err) = res {
+            let msg = format!("{err}");
+            assert!(
+                msg.contains("Encountered unequal lengths")
+                    || msg.contains("incorrect number of fields"),
+                "unexpected error message: {msg}",
+            );
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
+        use std::io::Write;
+
+        // create a SessionContext
+        let ctx = SessionContext::new();
+
+        // Create a temp file with a .csv suffix so the reader accepts it
+        let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv
+        // CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete.
+        write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
+        let path = tmp.path().to_str().unwrap().to_string();
+
+        // Build CsvReadOptions: header present, enable truncated_rows.
+        // (Use the exact builder method your crate exposes: `truncated_rows(true)` here,
+        // if the method name differs in your codebase use the appropriate one.)
+        let options = CsvReadOptions::default().truncated_rows(true);
+
+        println!("options: {}, path: {path}", options.truncated_rows);
+
+        // Call the API under test
+        let df = ctx.read_csv(&path, options).await?;
+
+        // Collect the results and combine batches so we can inspect columns
+        let batches = df.collect().await?;
+        let combined = concat_batches(&batches[0].schema(), &batches)?;
+
+        // Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL.
+        let col_c = combined.column(2);
+        assert!(
+            col_c.is_null(0),
+            "expected first row column 'c' to be NULL due to truncated row"
+        );
+
+        // Also ensure we read at least one row
+        assert!(combined.num_rows() >= 2);
+
+        Ok(())
+    }
 }
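
The `csv_deserializer_with_truncated` helper above leans on arrow-csv's `ReaderBuilder::with_truncated_rows`. A standalone sketch of that underlying behavior, outside DataFusion (the schema and data here are illustrative, not from this commit):

use std::{io::Cursor, sync::Arc};

use arrow::array::Array;
use arrow::csv::ReaderBuilder;
use arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Three nullable Int64 columns; the second data row omits column `c`.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
    ]));
    let data = "1,2,3\n4,5\n";

    let mut reader = ReaderBuilder::new(schema)
        .with_truncated_rows(true) // tolerate rows with fewer columns than the schema
        .build(Cursor::new(data))?;

    let batch = reader.next().expect("expected one batch")?;
    assert_eq!(batch.num_rows(), 2);
    assert!(batch.column(2).is_null(1)); // the missing `c` in row 2 becomes NULL
    Ok(())
}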

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 17 additions & 1 deletion

@@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
     /// Optional regex to match null values
     pub null_regex: Option<String>,
+    /// Whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
+    /// If the record’s schema is not nullable, then it will still return an error.
+    pub truncated_rows: bool,
 }

 impl Default for CsvReadOptions<'_> {

@@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
             file_sort_order: vec![],
             comment: None,
             null_regex: None,
+            truncated_rows: false,
         }
     }

@@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
         self.null_regex = null_regex;
         self
     }
+
+    /// Configure whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths
+    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
+    /// If the record’s schema is not nullable, then it will still return an error.
+    pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
+        self.truncated_rows = truncated_rows;
+        self
+    }
 }

 /// Options that control the reading of Parquet files.

@@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_newlines_in_values(self.newlines_in_values)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
             .with_file_compression_type(self.file_compression_type.to_owned())
-            .with_null_regex(self.null_regex.clone());
+            .with_null_regex(self.null_regex.clone())
+            .with_truncated_rows(self.truncated_rows);

         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
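
Wired through the `ReadOptions` impl above, the flag can be used end to end from the DataFrame API. A minimal sketch assuming a ragged CSV file on disk (the path and option chain are illustrative, mirroring the temp-file test in csv.rs):

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn read_ragged_csv(path: &str) -> Result<()> {
    let ctx = SessionContext::new();
    let options = CsvReadOptions::new()
        .has_header(true)
        .truncated_rows(true); // short rows are padded with NULLs instead of erroring
    let df = ctx.read_csv(path, options).await?;
    df.show().await?;
    Ok(())
}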

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 2 additions & 2 deletions

@@ -581,11 +581,11 @@ mod tests {
         assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
         assert_eq!(
             string_truncation_stats.max_value,
-            Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
+            Precision::Inexact(Utf8(Some("b".repeat(63) + "c")))
         );
         assert_eq!(
             string_truncation_stats.min_value,
-            Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
+            Precision::Inexact(Utf8(Some("a".repeat(64))))
         );

         Ok(())
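
This statistics test changes because the `schema_force_view_types` default flips to `false` in config.rs earlier in this commit, so Parquet string statistics now surface as `Utf8` rather than `Utf8View`. A hedged sketch of opting back into view types per session, assuming the standard `datafusion.execution.parquet.schema_force_view_types` config key:

use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_view_types() -> SessionContext {
    // Assumed config key; re-enables Utf8View/BinaryView reads for Parquet scans.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.schema_force_view_types", true);
    SessionContext::new_with_config(config)
}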

datafusion/core/src/execution/session_state.rs

Lines changed: 8 additions & 2 deletions

@@ -67,6 +67,9 @@ use datafusion_physical_expr::create_physical_expr;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::node_id::{
+    annotate_node_id_for_execution_plan, NodeIdAnnotator,
+};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_session::Session;
 use datafusion_sql::parser::{DFParserBuilder, Statement};

@@ -647,9 +650,12 @@
         logical_plan: &LogicalPlan,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         let logical_plan = self.optimize(logical_plan)?;
-        self.query_planner
+        let physical_plan = self
+            .query_planner
             .create_physical_plan(&logical_plan, self)
-            .await
+            .await?;
+        let mut id_annotator = NodeIdAnnotator::new();
+        annotate_node_id_for_execution_plan(&physical_plan, &mut id_annotator)
     }

     /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 3 additions & 1 deletion

@@ -165,7 +165,9 @@ async fn page_index_filter_one_col() {

     // 5.create filter date_string_col == "01/01/09"`;
     // Note this test doesn't apply type coercion so the literal must match the actual view type
-    let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09")));
+    // xudong: use new_utf8, because schema_force_view_types was changed to false now.
+    // qi: when schema_force_view_types setting to true, we should change back to utf8view
+    let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8("01/01/09")));
     let batches = get_filter_results(&state, filter.clone(), false).await;
     assert_eq!(batches[0].num_rows(), 14);

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 3 additions & 2 deletions

@@ -3615,18 +3615,19 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
     );

     // Apply the function
-    let result = replace_order_preserving_variants(dist_context)?;
+    let result = replace_order_preserving_variants(dist_context, false)?;

     // Verify the plan was transformed to CoalescePartitionsExec
     result
+        .0
         .plan
         .as_any()
         .downcast_ref::<CoalescePartitionsExec>()
         .expect("Expected CoalescePartitionsExec");

     // Verify fetch was preserved
     assert_eq!(
-        result.plan.fetch(),
+        result.0.plan.fetch(),
         Some(5),
         "Fetch value was not preserved after transformation"
     );
