
Commit 5a85b7d

Merge pull request #14 from polygon-io/branch-50-upgrade

Add more fix PRs to branch-50

2 parents: ba0e3a0 + e78eafe

File tree: 12 files changed, 270 additions & 9 deletions


datafusion/common/src/dfschema.rs

Lines changed: 5 additions & 1 deletion
@@ -913,7 +913,11 @@ impl TryFrom<SchemaRef> for DFSchema {
             field_qualifiers: vec![None; field_count],
             functional_dependencies: FunctionalDependencies::empty(),
         };
-        dfschema.check_names()?;
+        // Skip name checking, because the schema here may have duplicate field names.
+        // For example, Partial AggregateMode will generate duplicate field names from
+        // state_fields.
+        // See <https://github.com/apache/datafusion/issues/17715>
+        // dfschema.check_names()?;
         Ok(dfschema)
     }
 }
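
For context, a minimal sketch of what this change permits, assuming only the arrow Schema and DFSchema types shown above: DFSchema::try_from now accepts schemas whose unqualified field names repeat.

use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{DFSchema, Result};

fn duplicate_names_ok() -> Result<()> {
    // Two fields named "is_set", as Partial-mode aggregate state can produce.
    let schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("is_set", DataType::Boolean, true),
        Field::new("is_set", DataType::Boolean, true),
    ]));
    // With check_names() commented out above, this conversion succeeds.
    let _df_schema = DFSchema::try_from(schema)?;
    Ok(())
}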

datafusion/core/src/execution/context/mod.rs

Lines changed: 8 additions & 0 deletions
@@ -1727,6 +1727,14 @@ impl FunctionRegistry for SessionContext {
     ) -> Result<()> {
         self.state.write().register_expr_planner(expr_planner)
     }
+
+    fn udafs(&self) -> HashSet<String> {
+        self.state.read().udafs()
+    }
+
+    fn udwfs(&self) -> HashSet<String> {
+        self.state.read().udwfs()
+    }
 }

 /// Create a new task context instance from SessionContext
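
A hedged usage sketch of the two new accessors; the FunctionRegistry import path and the exact contents of a default registry are assumptions, not guarantees:

use std::collections::HashSet;
use datafusion::execution::FunctionRegistry; // assumed re-export path
use datafusion::prelude::SessionContext;

fn list_registered_functions() {
    let ctx = SessionContext::new();
    // Names of every registered user-defined aggregate and window function.
    let udafs: HashSet<String> = ctx.udafs();
    let udwfs: HashSet<String> = ctx.udwfs();
    println!("{} aggregate UDFs, {} window UDFs", udafs.len(), udwfs.len());
}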

datafusion/core/src/execution/session_state.rs

Lines changed: 8 additions & 0 deletions
@@ -1881,6 +1881,14 @@ impl FunctionRegistry for SessionState {
         self.expr_planners.push(expr_planner);
         Ok(())
     }
+
+    fn udafs(&self) -> HashSet<String> {
+        self.aggregate_functions.keys().cloned().collect()
+    }
+
+    fn udwfs(&self) -> HashSet<String> {
+        self.window_functions.keys().cloned().collect()
+    }
 }

 impl OptimizerConfig for SessionState {

datafusion/core/tests/dataframe/mod.rs

Lines changed: 112 additions & 3 deletions
@@ -32,6 +32,7 @@ use arrow::datatypes::{
 };
 use arrow::error::ArrowError;
 use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{SortOptions, TimeUnit};
 use datafusion::{assert_batches_eq, dataframe};
 use datafusion_functions_aggregate::count::{count_all, count_all_window};
 use datafusion_functions_aggregate::expr_fn::{
@@ -64,8 +65,8 @@ use datafusion::test_util::{
 use datafusion_catalog::TableProvider;
 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
 use datafusion_common::{
-    assert_contains, Constraint, Constraints, DataFusionError, ParamValues, ScalarValue,
-    TableReference, UnnestOptions,
+    assert_contains, Constraint, Constraints, DFSchema, DataFusionError, ParamValues,
+    ScalarValue, TableReference, UnnestOptions,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::file_format::format_as_file_type;
@@ -79,10 +80,16 @@ use datafusion_expr::{
     LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, WindowFrame,
     WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
 };
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_plan::{displayable, ExecutionPlanProperties};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::empty::EmptyExec;
+use datafusion_physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};

 // Get string representation of the plan
 async fn physical_plan_to_string(df: &DataFrame) -> String {
@@ -6322,3 +6329,105 @@ async fn test_copy_to_preserves_order() -> Result<()> {
     );
     Ok(())
 }
+
+#[tokio::test]
+async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Simple schema with just the fields we need
+    let file_schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+            true,
+        ),
+        Field::new("ticker", DataType::Utf8, true),
+        Field::new("value", DataType::Float64, true),
+        Field::new("date", DataType::Utf8, false),
+    ]));
+
+    let df_schema = DFSchema::try_from(file_schema.clone())?;
+
+    let timestamp = col("timestamp");
+    let value = col("value");
+    let ticker = col("ticker");
+    let date = col("date");
+
+    let mock_exec = Arc::new(EmptyExec::new(file_schema.clone()));
+
+    // Build first_value aggregate
+    let first_value = Arc::new(
+        AggregateExprBuilder::new(
+            datafusion_functions_aggregate::first_last::first_value_udaf(),
+            vec![ctx.create_physical_expr(value.clone(), &df_schema)?],
+        )
+        .alias("first_value(value)")
+        .order_by(vec![PhysicalSortExpr::new(
+            ctx.create_physical_expr(timestamp.clone(), &df_schema)?,
+            SortOptions::new(false, false),
+        )])
+        .schema(file_schema.clone())
+        .build()
+        .expect("Failed to build first_value"),
+    );
+
+    // Build last_value aggregate
+    let last_value = Arc::new(
+        AggregateExprBuilder::new(
+            datafusion_functions_aggregate::first_last::last_value_udaf(),
+            vec![ctx.create_physical_expr(value.clone(), &df_schema)?],
+        )
+        .alias("last_value(value)")
+        .order_by(vec![PhysicalSortExpr::new(
+            ctx.create_physical_expr(timestamp.clone(), &df_schema)?,
+            SortOptions::new(false, false),
+        )])
+        .schema(file_schema.clone())
+        .build()
+        .expect("Failed to build last_value"),
+    );
+
+    let partial_agg = AggregateExec::try_new(
+        AggregateMode::Partial,
+        PhysicalGroupBy::new_single(vec![
+            (
+                ctx.create_physical_expr(date.clone(), &df_schema)?,
+                "date".to_string(),
+            ),
+            (
+                ctx.create_physical_expr(ticker.clone(), &df_schema)?,
+                "ticker".to_string(),
+            ),
+        ]),
+        vec![first_value, last_value],
+        vec![None, None],
+        mock_exec,
+        file_schema,
+    )
+    .expect("Failed to build partial agg");
+
+    // Assert that the schema field names match the expected names
+    let expected_field_names = vec![
+        "date",
+        "ticker",
+        "first_value(value)[first_value]",
+        "timestamp@0",
+        "is_set",
+        "last_value(value)[last_value]",
+        "timestamp@0",
+        "is_set",
+    ];
+
+    let binding = partial_agg.schema();
+    let actual_field_names: Vec<_> = binding.fields().iter().map(|f| f.name()).collect();
+    assert_eq!(actual_field_names, expected_field_names);
+
+    // Ensure that DFSchema::try_from does not fail
+    let partial_agg_exec_schema = DFSchema::try_from(partial_agg.schema());
+    assert!(
+        partial_agg_exec_schema.is_ok(),
+        "Expected getting the AggregateExec schema to succeed with duplicate state fields"
+    );
+
+    Ok(())
+}
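
The duplicate names this test expects arise because first_value and last_value each flatten their state fields (the value, any ordering columns, and an is_set flag) into the Partial-mode output schema, and both order by timestamp. A rough illustration of that flattening, using a hypothetical helper rather than DataFusion internals:

// Hypothetical helper: group-by names first, then every aggregate's
// state-field names appended in order, as in a Partial-mode schema.
fn partial_schema_names(groups: &[&str], state_fields: &[Vec<&str>]) -> Vec<String> {
    groups
        .iter()
        .copied()
        .chain(state_fields.iter().flatten().copied())
        .map(str::to_string)
        .collect()
}

fn main() {
    let names = partial_schema_names(
        &["date", "ticker"],
        &[
            vec!["first_value(value)[first_value]", "timestamp@0", "is_set"],
            vec!["last_value(value)[last_value]", "timestamp@0", "is_set"],
        ],
    );
    // "timestamp@0" and "is_set" each appear twice; this is what used to
    // make DFSchema::check_names() fail.
    assert_eq!(names[3], names[6]);
    assert_eq!(names[4], names[7]);
}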

datafusion/datasource-parquet/src/reader.rs

Lines changed: 3 additions & 2 deletions
@@ -209,6 +209,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
             file_metrics,
             file_meta,
             metadata_cache: Arc::clone(&self.metadata_cache),
+            metadata_size_hint,
         }))
     }
 }
@@ -222,6 +223,7 @@ pub struct CachedParquetFileReader {
     pub inner: ParquetObjectReader,
     file_meta: FileMeta,
     metadata_cache: Arc<dyn FileMetadataCache>,
+    metadata_size_hint: Option<usize>,
 }

 impl AsyncFileReader for CachedParquetFileReader {
@@ -261,11 +263,10 @@ impl AsyncFileReader for CachedParquetFileReader {
         #[cfg(not(feature = "parquet_encryption"))]
         let file_decryption_properties = None;

-        // TODO there should be metadata prefetch hint here
-        // https://github.com/apache/datafusion/issues/17279
         DFParquetMetadata::new(&self.store, &file_meta.object_meta)
             .with_decryption_properties(file_decryption_properties)
             .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
+            .with_metadata_size_hint(self.metadata_size_hint)
             .fetch_metadata()
             .await
             .map_err(|e| {
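
For background, a metadata size hint tells the reader how many trailing bytes to prefetch, so the Parquet footer and the metadata it points at usually arrive in one request instead of two. A standalone sketch of that arithmetic; the helper is hypothetical, not a DataFusion or parquet-rs API:

use std::ops::Range;

// Hypothetical helper: byte range to prefetch from the end of a Parquet file.
// The 8-byte floor covers the 4-byte footer length plus the "PAR1" magic.
fn footer_fetch_range(file_len: u64, metadata_size_hint: Option<usize>) -> Range<u64> {
    let suffix = metadata_size_hint.unwrap_or(8).max(8) as u64;
    let start = file_len.saturating_sub(suffix);
    start..file_len
}

fn main() {
    // A 64 KiB hint on a 1 MiB file prefetches the last 64 KiB.
    assert_eq!(footer_fetch_range(1 << 20, Some(64 * 1024)), 983_040..1_048_576);
}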

datafusion/datasource/src/file_scan_config.rs

Lines changed: 71 additions & 2 deletions
@@ -590,7 +590,11 @@
         // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
         match reassign_predicate_columns(filter, &schema, true) {
             Ok(filter) => {
-                match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
+                match Self::add_filter_equivalence_info(
+                    filter,
+                    &mut eq_properties,
+                    &schema,
+                ) {
                     Ok(()) => {}
                     Err(e) => {
                         warn!("Failed to add filter equivalence info: {e}");
@@ -758,9 +762,24 @@
     fn add_filter_equivalence_info(
         filter: Arc<dyn PhysicalExpr>,
         eq_properties: &mut EquivalenceProperties,
+        schema: &Schema,
     ) -> Result<()> {
+        macro_rules! ignore_dangling_col {
+            ($col:expr) => {
+                if let Some(col) = $col.as_any().downcast_ref::<Column>() {
+                    if schema.index_of(col.name()).is_err() {
+                        continue;
+                    }
+                }
+            };
+        }
+
        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
        for (lhs, rhs) in equal_pairs {
+            // Ignore any binary expressions that reference non-existent columns in the current schema
+            // (e.g. due to unnecessary projections being removed)
+            ignore_dangling_col!(lhs);
+            ignore_dangling_col!(rhs);
             eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
         }
         Ok(())
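
The macro uses continue, so it can only live inside the loop; as a standalone sketch under the same assumptions (the Column expression and schema.index_of from the surrounding code), the check could also be written as a predicate:

use std::sync::Arc;
use arrow_schema::Schema;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

// Sketch: true when the expression is a Column missing from the
// (possibly projection-reduced) schema, i.e. a dangling reference.
fn is_dangling_col(expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
    expr.as_any()
        .downcast_ref::<Column>()
        .map(|col| schema.index_of(col.name()).is_err())
        .unwrap_or(false)
}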
@@ -1449,6 +1468,7 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_util::col;
     use crate::{
         generate_test_files, test_util::MockSource, tests::aggr_test_schema,
         verify_sort_integrity,
@@ -1457,8 +1477,9 @@
     use arrow::array::{Int32Array, RecordBatch};
     use datafusion_common::stats::Precision;
     use datafusion_common::{assert_batches_eq, internal_err};
-    use datafusion_expr::SortExpr;
+    use datafusion_expr::{Operator, SortExpr};
     use datafusion_physical_expr::create_physical_sort_expr;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};

     /// Returns the column names on the schema
     pub fn columns(schema: &Schema) -> Vec<String> {
@@ -2214,6 +2235,54 @@
         assert_eq!(config.output_ordering.len(), 1);
     }

+    #[test]
+    fn equivalence_properties_after_schema_change() {
+        let file_schema = aggr_test_schema();
+        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+        // Create a file source with a filter
+        let file_source: Arc<dyn FileSource> =
+            Arc::new(MockSource::default().with_filter(Arc::new(BinaryExpr::new(
+                col("c2", &file_schema).unwrap(),
+                Operator::Eq,
+                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+            ))));
+
+        let config = FileScanConfigBuilder::new(
+            object_store_url.clone(),
+            Arc::clone(&file_schema),
+            Arc::clone(&file_source),
+        )
+        .with_projection(Some(vec![0, 1, 2]))
+        .build();
+
+        // Simulate projection being updated. Since the filter has already been pushed down,
+        // the new projection won't include the filtered column.
+        let data_source = config
+            .try_swapping_with_projection(&[ProjectionExpr::new(
+                col("c3", &file_schema).unwrap(),
+                "c3".to_string(),
+            )])
+            .unwrap()
+            .unwrap();
+
+        // Gather the equivalence properties from the new data source. There should
+        // be no equivalence class for column c2 since it was removed by the projection.
+        let eq_properties = data_source.eq_properties();
+        let eq_group = eq_properties.eq_group();
+
+        for class in eq_group.iter() {
+            for expr in class.iter() {
+                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                    assert_ne!(
+                        col.name(),
+                        "c2",
+                        "c2 should not be present in any equivalence class"
+                    );
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_file_scan_config_builder_defaults() {
         let file_schema = aggr_test_schema();

datafusion/datasource/src/test_util.rs

Lines changed: 12 additions & 0 deletions
@@ -34,6 +34,14 @@ pub(crate) struct MockSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    filter: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl MockSource {
+    pub fn with_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
+        self.filter = Some(filter);
+        self
+    }
 }

 impl FileSource for MockSource {
@@ -50,6 +58,10 @@
         self
     }

+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.filter.clone()
+    }
+
     fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }

datafusion/execution/src/task.rs

Lines changed: 8 additions & 0 deletions
@@ -201,6 +201,14 @@ impl FunctionRegistry for TaskContext {
     fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
         vec![]
     }
+
+    fn udafs(&self) -> HashSet<String> {
+        self.aggregate_functions.keys().cloned().collect()
+    }
+
+    fn udwfs(&self) -> HashSet<String> {
+        self.window_functions.keys().cloned().collect()
+    }
 }

 #[cfg(test)]
