Skip to content

Conversation

@erratic-pattern
Copy link

@erratic-pattern erratic-pattern commented Oct 21, 2025

Related tracking issue: https://github.com/influxdata/influxdb_iox/issues/15268

Context:
We are adding additional information to track down errors in prod:

Included Patches

  1. All patches from Patched DF 49.0.2 (take 1) #73 except those that have been removed (see below)
  2. @wiedld 's patch Apply single patch to ver49.02-a #75
  3. Backport of DataFusion 50 fix for CI runners running out of space: [branch-50] Backport change to avoid debug symbols in ci builds to 50.0.0 apache/datafusion#17795
  4. disable skip_physical_aggregate_shcema_check and log warning but do not error (4e7ad0d)

Removed Patches

  1. chore: default=true for skip_physical_aggregate_schema_check, and add warn logging:

crepererum and others added 12 commits September 5, 2025 12:54
)

Bumps [tracing-subscriber](https://github.com/tokio-rs/tracing) from 0.3.19 to 0.3.20.
- [Release notes](https://github.com/tokio-rs/tracing/releases)
- [Commits](tokio-rs/tracing@tracing-subscriber-0.3.19...tracing-subscriber-0.3.20)

---
updated-dependencies:
- dependency-name: tracing-subscriber
  dependency-version: 0.3.20
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…rceDistribution) which later causes an error during EnforceSort (without our patch). The next DataFusion version 46 upgrade does the proper fix, which is to not insert the coalesce in the first place.

test: recreating the iox plan:
* demonstrate the insertion of coalesce after the use of column estimates, and the removal of the test scenario's forcing of rr repartitioning

test: reproducer of SanityCheck failure after EnforceSorting removes the coalesce added in the EnforceDistribution

fix: special case to not remove the needed coalesce
…pache#17003)

* Support centroids config for `approx_percentile_cont_with_weight`

* Match two functions' signature

* Update docs

* Address comments and unify centroids config
…ntile_cont_with_weight` (apache#16999)

* Add sqllogictests

* Allow both new and old sytanx for approx_percentile_cont and approx_percentile_cont_with_weight

* Update docs

* Add documentation and more tests
* feat: support distinct for window

* fix

* fix

* fisx

* fix unparse

* fix test

* fix test

* easy way

* add test

* add comments
…he#17404)

* test: regression test for apache#17372

* test: add more direct regression for apache#17372

* fix: return ALL constants in `EquivalenceProperties::constants`
…he#17431)

* feat: Support binary data types for `SortMergeJoin` `on` clause

* Add sql level tests for merge join on binary keys

---------

Co-authored-by: Andrew Lamb <[email protected]>
Copy link
Collaborator

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this PR is missing some content

git diff influxdata/upgrade-df-ver4902-b influxdata/upgrade-df-ver4902-c

Results in

diff --git a/Cargo.toml b/Cargo.toml
index fe6667b7a..bb2809810 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -197,7 +197,6 @@ rpath = false
 strip = false            # Retain debug info for flamegraphs

 [profile.ci]
-debug = false
 inherits = "dev"
 incremental = false

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index df24c19f7..a05aa510f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -712,10 +712,15 @@ impl DefaultPhysicalPlanner {
                             differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
                         }
                     }
-                    return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
-                        .iter()
-                        .map(|s| format!("\n\t- {s}"))
-                        .join(""));
+
+                    log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
+
+                    //influx: temporarily remove error and only log so that we can find a
+                    //reproducer in production
+                    // return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
+                    //     .iter()
+                    //     .map(|s| format!("\n\t- {s}"))
+                    //     .join(""));
                 }

                 let groups = self.create_grouping_physical_expr(
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index a0d2b6a96..15c0dd57a 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -459,7 +459,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

         // exclude the first function argument(= column) in ordered set aggregate function,
         // because it is duplicated with the WITHIN GROUP clause in schema name.
-        let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
+        let args = if self.is_ordered_set_aggregate() {
             &args[1..]
         } else {
             &args[..]
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index a0d2b6a96..15c0dd57a 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -459,7 +459,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

         // exclude the first function argument(= column) in ordered set aggregate function,
         // because it is duplicated with the WITHIN GROUP clause in schema name.
-        let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
+        let args = if self.is_ordered_set_aggregate() {
             &args[1..]
         } else {
             &args[..]
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index ab31a87b9..467140834 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -1771,29 +1771,6 @@ c 122
 d 124
 e 115

-
-# using approx_percentile_cont on 2 columns with same signature
-query TII
-SELECT c1, approx_percentile_cont(c2, 0.95) AS c2, approx_percentile_cont(c3, 0.95) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
-----
-a 5 73
-b 5 68
-c 5 122
-d 5 124
-e 5 115
-
-# error is unique to this UDAF
-query TRR
-SELECT c1, avg(c2) AS c2, avg(c3) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
-----
-a 2.857142857143 -18.333333333333
-b 3.263157894737 -5.842105263158
-c 2.666666666667 -1.333333333333
-d 2.444444444444 25.444444444444
-e 3 40.333333333333
-
-
-
 query TI
 SELECT c1, approx_percentile_cont(0.95) WITHIN GROUP (ORDER BY c3 DESC) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
 ----

Looks like we need to cherry-pick over

I am not sure where the change in b/datafusion/expr/src/udaf.rs came from 🤔

Edit: it came from 680b2aa

@alamb
Copy link
Collaborator

alamb commented Oct 21, 2025

I will do the cherry picking

* test: reproducer of bug
* fix: make schema names unique for approx_percentile_cont
* test: regression test is now resolved
@alamb
Copy link
Collaborator

alamb commented Oct 21, 2025

I looked at the diff, and it is looking good to me now

@alamb
Copy link
Collaborator

alamb commented Oct 22, 2025

Here is the diff compared to the currently deployed version (#75):

git diff influxdata/upgrade-df-ver4902-a-patched influxdata/upgrade-df-ver4902-c
diff --git a/Cargo.toml b/Cargo.toml
index bb2809810..fe6667b7a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -197,6 +197,7 @@ rpath = false
 strip = false            # Retain debug info for flamegraphs

 [profile.ci]
+debug = false
 inherits = "dev"
 incremental = false

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 6758ed479..31159d4a8 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -389,7 +389,7 @@ config_namespace! {
         ///
         /// This is used to workaround bugs in the planner that are now caught by
         /// the new schema verification step.
-        pub skip_physical_aggregate_schema_check: bool, default = true
+        pub skip_physical_aggregate_schema_check: bool, default = false

         /// Sets the compression codec used when spilling data to disk.
         ///
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 6d2393c99..a05aa510f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -715,10 +715,12 @@ impl DefaultPhysicalPlanner {

                     log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());

-                    return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
-                        .iter()
-                        .map(|s| format!("\n\t- {s}"))
-                        .join(""));
+                    //influx: temporarily remove error and only log so that we can find a
+                    //reproducer in production
+                    // return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
+                    //     .iter()
+                    //     .map(|s| format!("\n\t- {s}"))
+                    //     .join(""));
                 }

                 let groups = self.create_grouping_physical_expr(
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 0f1fb892c..f76e436e0 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -259,7 +259,7 @@ datafusion.execution.parquet.writer_version 1.0
 datafusion.execution.planning_concurrency 13
 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
-datafusion.execution.skip_physical_aggregate_schema_check true
+datafusion.execution.skip_physical_aggregate_schema_check false
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -371,7 +371,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater t
hen partial aggregation will skip aggregation for further input
 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
-datafusion.execution.skip_physical_aggregate_schema_check true When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
+datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 62be57ec5..d453cb068 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -82,7 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.maximum_parallel_row_group_writers         | 1                         | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.                                                                                                                                                                                                                                                                                                                                                                                                                                          |
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2                         | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.                                                                                                                                                                                                                                                                                                                                                                                                                                          |
 | datafusion.execution.planning_concurrency                               | 0                         | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
-| datafusion.execution.skip_physical_aggregate_schema_check               | true                      | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+| datafusion.execution.skip_physical_aggregate_schema_check               | false                     | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
 | datafusion.execution.spill_compression                                  | uncompressed              | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed.                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
 | datafusion.execution.sort_spill_reservation_bytes                       | 10485760                  | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
 | datafusion.execution.sort_in_place_threshold_bytes                      | 1048576                   | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants