diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs index 50860535b720..ba4c377fd676 100644 --- a/datafusion-examples/examples/advanced_udwf.rs +++ b/datafusion-examples/examples/advanced_udwf.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use std::any::Any; - use arrow::datatypes::Field; use arrow::{ array::{ArrayRef, AsArray, Float64Array}, @@ -33,10 +30,14 @@ use datafusion::logical_expr::function::{ }; use datafusion::logical_expr::simplify::SimplifyInfo; use datafusion::logical_expr::{ - Expr, PartitionEvaluator, Signature, WindowFrame, WindowFunctionDefinition, - WindowUDF, WindowUDFImpl, + Expr, LimitEffect, PartitionEvaluator, Signature, WindowFrame, + WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; +use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::*; +use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; +use std::any::Any; +use std::sync::Arc; /// This example shows how to use the full WindowUDFImpl API to implement a user /// defined window function. 
As in the `simple_udwf.rs` example, this struct implements @@ -91,6 +92,10 @@ impl WindowUDFImpl for SmoothItUdf { fn field(&self, field_args: WindowUDFFieldArgs) -> Result { Ok(Field::new(field_args.name(), DataType::Float64, true).into()) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } /// This implements the lowest level evaluation for a window function @@ -211,6 +216,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf { fn field(&self, field_args: WindowUDFFieldArgs) -> Result { Ok(Field::new(field_args.name(), DataType::Float64, true).into()) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } // create local execution context with `cars.csv` registered as a table named `cars` diff --git a/datafusion/core/tests/user_defined/user_defined_window_functions.rs b/datafusion/core/tests/user_defined/user_defined_window_functions.rs index b3542f4da89f..cb7a04607c98 100644 --- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs @@ -30,7 +30,8 @@ use datafusion::prelude::SessionContext; use datafusion_common::exec_datafusion_err; use datafusion_expr::ptr_eq::PtrEq; use datafusion_expr::{ - PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl, + LimitEffect, PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, + WindowUDFImpl, }; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_functions_window_common::{ @@ -570,6 +571,10 @@ impl OddCounter { fn field(&self, field_args: WindowUDFFieldArgs) -> Result { Ok(Field::new(field_args.name(), DataType::Int64, true).into()) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state))) @@ -689,6 +694,10 @@ impl WindowUDFImpl for VariadicWindowUDF { fn field(&self, _: 
WindowUDFFieldArgs) -> Result { unimplemented!("unnecessary for testing"); } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[test] @@ -842,6 +851,10 @@ impl WindowUDFImpl for MetadataBasedWindowUdf { .with_metadata(self.metadata.clone()) .into()) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[derive(Debug)] diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 63b4333f218a..213930ac103c 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -29,8 +29,8 @@ use crate::ptr_eq::PtrEq; use crate::select_expr::SelectExpr; use crate::{ conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery, - AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, ScalarFunctionArgs, - ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, + AggregateUDF, Expr, LimitEffect, LogicalPlan, Operator, PartitionEvaluator, + ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, }; use crate::{ AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl, @@ -42,6 +42,7 @@ use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference}; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use sqlparser::ast::NullTreatment; use std::any::Any; use std::fmt::Debug; @@ -691,6 +692,10 @@ impl WindowUDFImpl for SimpleWindowUDF { true, ))) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } pub fn interval_year_month_lit(value: &str) -> Expr { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 4ce02391253c..96197a2d3a10 100644 --- a/datafusion/expr/src/lib.rs +++ 
b/datafusion/expr/src/lib.rs @@ -111,7 +111,8 @@ pub use udaf::{ pub use udf::{ scalar_doc_sections, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, }; -pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF, WindowUDFImpl}; +pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF}; +pub use udwf::{LimitEffect, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; #[cfg(test)] diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 29bf379f60fb..dbae7142db5e 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -244,11 +244,13 @@ where /// # use std::sync::LazyLock; /// # use arrow::datatypes::{DataType, Field, FieldRef}; /// # use datafusion_common::{DataFusionError, plan_err, Result}; -/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation}; +/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation, LimitEffect}; /// # use datafusion_expr::{WindowUDFImpl, WindowUDF}; /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; /// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; +/// # use datafusion_physical_expr_common::physical_expr; +/// # use std::sync::Arc; /// /// #[derive(Debug, Clone, PartialEq, Eq, Hash)] /// struct SmoothIt { @@ -295,6 +297,9 @@ where /// fn documentation(&self) -> Option<&Documentation> { /// Some(get_doc()) /// } +/// fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { +/// LimitEffect::Unknown +/// } /// } /// /// // Create a new WindowUDF from the implementation @@ -414,6 +419,23 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync { fn documentation(&self) -> Option<&Documentation> { None } + + /// If not causal, returns the effect this function will have on the window + fn 
limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } +} + +/// the effect this function will have on the limit pushdown +pub enum LimitEffect { + /// Does not affect the limit (i.e. this is causal) + None, + /// Either undeclared, or dynamic (only evaluatable at run time) + Unknown, + /// Grow the limit by N rows + Relative(usize), + /// Limit needs to be at least N rows + Absolute(usize), } pub enum ReversedUDWF { @@ -522,6 +544,10 @@ impl WindowUDFImpl for AliasedWindowUDFImpl { fn documentation(&self) -> Option<&Documentation> { self.inner.documentation() } + + fn limit_effect(&self, args: &[Arc]) -> LimitEffect { + self.inner.limit_effect(args) + } } // Window UDF doc sections for use in public documentation @@ -557,15 +583,17 @@ pub mod window_doc_sections { #[cfg(test)] mod test { - use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl}; + use crate::{LimitEffect, PartitionEvaluator, WindowUDF, WindowUDFImpl}; use arrow::datatypes::{DataType, FieldRef}; use datafusion_common::Result; use datafusion_expr_common::signature::{Signature, Volatility}; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::any::Any; use std::cmp::Ordering; use std::hash::{DefaultHasher, Hash, Hasher}; + use std::sync::Arc; #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct AWindowUDF { @@ -604,6 +632,10 @@ mod test { fn field(&self, _field_args: WindowUDFFieldArgs) -> Result { unimplemented!() } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -643,6 +675,10 @@ mod test { fn field(&self, _field_args: WindowUDFFieldArgs) -> Result { unimplemented!() } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[test] diff --git a/datafusion/ffi/src/udwf/mod.rs 
b/datafusion/ffi/src/udwf/mod.rs index 5362734db2f7..cc23e801d6c9 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -25,6 +25,8 @@ use arrow::{ datatypes::{DataType, SchemaRef}, }; use arrow_schema::{Field, FieldRef}; +use datafusion::logical_expr::LimitEffect; +use datafusion::physical_expr::PhysicalExpr; use datafusion::{ error::DataFusionError, logical_expr::{ @@ -348,6 +350,10 @@ impl WindowUDFImpl for ForeignWindowUDF { let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref().into(); options.map(|s| s.into()) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[repr(C)] diff --git a/datafusion/functions-window/src/cume_dist.rs b/datafusion/functions-window/src/cume_dist.rs index 9b4c682f2e36..372086b12d5e 100644 --- a/datafusion/functions-window/src/cume_dist.rs +++ b/datafusion/functions-window/src/cume_dist.rs @@ -23,11 +23,12 @@ use datafusion_common::arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::Field; use datafusion_common::Result; use datafusion_expr::{ - Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, + Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_macros::user_doc; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::any::Any; use std::fmt::Debug; @@ -110,6 +111,10 @@ impl WindowUDFImpl for CumeDist { fn documentation(&self) -> Option<&Documentation> { self.doc() } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[derive(Debug, Default)] diff --git a/datafusion/functions-window/src/lead_lag.rs b/datafusion/functions-window/src/lead_lag.rs index 7950cc93f8b4..d5687bc687aa 100644 --- a/datafusion/functions-window/src/lead_lag.rs +++ 
b/datafusion/functions-window/src/lead_lag.rs @@ -25,12 +25,13 @@ use datafusion_common::arrow::datatypes::Field; use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; use datafusion_expr::{ - Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature, - Volatility, WindowUDFImpl, + Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature, + TypeSignature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::expr::ExpressionArgs; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +use datafusion_physical_expr::expressions; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::any::Any; use std::cmp::min; @@ -95,7 +96,7 @@ pub fn lead( } #[derive(Debug, PartialEq, Eq, Hash)] -enum WindowShiftKind { +pub enum WindowShiftKind { Lag, Lead, } @@ -148,6 +149,10 @@ impl WindowShift { pub fn lead() -> Self { Self::new(WindowShiftKind::Lead) } + + pub fn kind(&self) -> &WindowShiftKind { + &self.kind + } } static LAG_DOCUMENTATION: LazyLock = LazyLock::new(|| { @@ -299,6 +304,26 @@ impl WindowUDFImpl for WindowShift { WindowShiftKind::Lead => Some(get_lead_doc()), } } + + fn limit_effect(&self, args: &[Arc]) -> LimitEffect { + if self.kind == WindowShiftKind::Lag { + return LimitEffect::None; + } + match args { + [_, expr, ..] 
=> { + let Some(lit) = expr.as_any().downcast_ref::() + else { + return LimitEffect::Unknown; + }; + let ScalarValue::Int64(Some(amount)) = lit.value() else { + return LimitEffect::Unknown; // we should only get int64 from the parser + }; + LimitEffect::Relative((*amount).max(0) as usize) + } + [_] => LimitEffect::Relative(1), // default value + _ => LimitEffect::Unknown, // invalid arguments + } + } } /// When `lead`/`lag` is evaluated on a `NULL` expression we attempt to @@ -330,10 +355,8 @@ fn parse_expr( let default_value = get_scalar_value_from_args(input_exprs, 2)?; default_value.map_or(Ok(expr), |value| { - ScalarValue::try_from(&value.data_type()).map(|v| { - Arc::new(datafusion_physical_expr::expressions::Literal::new(v)) - as Arc - }) + ScalarValue::try_from(&value.data_type()) + .map(|v| Arc::new(expressions::Literal::new(v)) as Arc) }) } diff --git a/datafusion/functions-window/src/nth_value.rs b/datafusion/functions-window/src/nth_value.rs index 309978e9e718..7a27e8a18b18 100644 --- a/datafusion/functions-window/src/nth_value.rs +++ b/datafusion/functions-window/src/nth_value.rs @@ -26,18 +26,19 @@ use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue}; use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; use datafusion_expr::window_state::WindowAggState; use datafusion_expr::{ - Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature, - Volatility, WindowUDFImpl, + Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature, + TypeSignature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::any::Any; use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; use std::ops::Range; -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; get_or_init_udwf!( 
First, @@ -126,6 +127,10 @@ impl NthValue { pub fn nth() -> Self { Self::new(NthValueKind::Nth) } + + pub fn kind(&self) -> &NthValueKind { + &self.kind + } } static FIRST_VALUE_DOCUMENTATION: LazyLock = LazyLock::new(|| { @@ -337,6 +342,10 @@ impl WindowUDFImpl for NthValue { NthValueKind::Nth => Some(get_nth_value_doc()), } } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::None // NthValue is causal + } } #[derive(Debug, Clone)] diff --git a/datafusion/functions-window/src/ntile.rs b/datafusion/functions-window/src/ntile.rs index f8deac6b3365..b26be377e923 100644 --- a/datafusion/functions-window/src/ntile.rs +++ b/datafusion/functions-window/src/ntile.rs @@ -25,11 +25,13 @@ use datafusion_common::arrow::array::{ArrayRef, UInt64Array}; use datafusion_common::arrow::datatypes::{DataType, Field}; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr::{ - Documentation, Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, + Documentation, Expr, LimitEffect, PartitionEvaluator, Signature, Volatility, + WindowUDFImpl, }; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_macros::user_doc; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::any::Any; use std::fmt::Debug; @@ -158,6 +160,10 @@ impl WindowUDFImpl for Ntile { fn documentation(&self) -> Option<&Documentation> { self.doc() } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[derive(Debug)] diff --git a/datafusion/functions-window/src/rank.rs b/datafusion/functions-window/src/rank.rs index bc88572a921e..26b501f6286d 100644 --- a/datafusion/functions-window/src/rank.rs +++ b/datafusion/functions-window/src/rank.rs @@ -29,10 +29,11 @@ use datafusion_common::utils::get_row_at_idx; use datafusion_common::{exec_err, Result, ScalarValue}; use 
datafusion_expr::window_doc_sections::DOC_SECTION_RANKING; use datafusion_expr::{ - Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, + Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::any::Any; use std::fmt::Debug; @@ -240,6 +241,14 @@ impl WindowUDFImpl for Rank { RankType::Percent => Some(get_percent_rank_doc()), } } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + match self.rank_type { + RankType::Basic => LimitEffect::None, + RankType::Dense => LimitEffect::None, + RankType::Percent => LimitEffect::Unknown, + } + } } /// State for the RANK(rank) built-in window function. diff --git a/datafusion/functions-window/src/row_number.rs b/datafusion/functions-window/src/row_number.rs index 2ef490c3c3ed..d7d298cecead 100644 --- a/datafusion/functions-window/src/row_number.rs +++ b/datafusion/functions-window/src/row_number.rs @@ -25,15 +25,17 @@ use datafusion_common::arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::Field; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ - Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, + Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::field; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_macros::user_doc; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use field::WindowUDFFieldArgs; use std::any::Any; use std::fmt::Debug; use std::ops::Range; +use std::sync::Arc; define_udwf_and_expr!( RowNumber, @@ -121,6 +123,10 @@ impl WindowUDFImpl for RowNumber { fn documentation(&self) -> Option<&Documentation> { self.doc() } + + fn 
limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::None + } } /// State for the `row_number` built-in window function. @@ -140,7 +146,7 @@ impl PartitionEvaluator for NumRowsEvaluator { _values: &[ArrayRef], num_rows: usize, ) -> Result { - Ok(std::sync::Arc::new(UInt64Array::from_iter_values( + Ok(Arc::new(UInt64Array::from_iter_values( 1..(num_rows as u64) + 1, ))) } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 3de5a80a9782..7e391dafc111 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -2043,6 +2043,7 @@ mod tests { }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; + use datafusion_physical_expr::PhysicalExpr; use std::hash::Hash; use std::{ collections::HashMap, @@ -4387,6 +4388,10 @@ mod tests { fn field(&self, _field_args: WindowUDFFieldArgs) -> Result { unimplemented!("not needed for tests") } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[derive(Debug, PartialEq, Eq, Hash)] struct VolatileUdf { diff --git a/datafusion/physical-expr/src/window/standard_window_function_expr.rs b/datafusion/physical-expr/src/window/standard_window_function_expr.rs index 871f735e9a96..ca7c3a4db3d4 100644 --- a/datafusion/physical-expr/src/window/standard_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/standard_window_function_expr.rs @@ -21,7 +21,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::{FieldRef, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_expr::PartitionEvaluator; +use datafusion_expr::{LimitEffect, PartitionEvaluator}; use std::any::Any; use std::sync::Arc; @@ -90,4 +90,6 @@ pub trait StandardWindowFunctionExpr: Send + Sync + 
std::fmt::Debug { fn get_result_ordering(&self, _schema: &SchemaRef) -> Option { None } + + fn limit_effect(&self) -> LimitEffect; } diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs index e2e5a839ef07..1c671cd07488 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs @@ -19,12 +19,18 @@ use crate::PhysicalOptimizerRule; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::ScalarValue; -use datafusion_expr::{WindowFrameBound, WindowFrameUnits}; +use datafusion_expr::{LimitEffect, WindowFrameBound, WindowFrameUnits}; +use datafusion_physical_expr::window::{ + PlainAggregateWindowExpr, SlidingAggregateWindowExpr, StandardWindowExpr, + StandardWindowFunctionExpr, WindowExpr, +}; use datafusion_physical_plan::execution_plan::CardinalityEffect; use datafusion_physical_plan::limit::GlobalLimitExec; +use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; -use datafusion_physical_plan::windows::BoundedWindowAggExec; -use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use std::cmp; use std::sync::Arc; @@ -41,6 +47,29 @@ impl LimitPushPastWindows { } } +#[derive(Eq, PartialEq)] +enum Phase { + FindOrGrow, + Apply, +} + +#[derive(Default)] +struct TraverseState { + pub limit: Option, + pub lookahead: usize, +} + +impl TraverseState { + pub fn reset_limit(&mut self, limit: Option) { + self.limit = limit; + self.lookahead = 0; + } + + pub fn max_lookahead(&mut self, new_val: usize) { + self.lookahead = self.lookahead.max(new_val); + } 
+} + impl PhysicalOptimizerRule for LimitPushPastWindows { fn optimize( &self, @@ -50,67 +79,62 @@ impl PhysicalOptimizerRule for LimitPushPastWindows { if !config.optimizer.enable_window_limits { return Ok(original); } - let mut latest_limit: Option = None; - let mut latest_max = 0; + let mut ctx = TraverseState::default(); + let mut phase = Phase::FindOrGrow; let result = original.transform_down(|node| { // helper closure to DRY out most the early return cases - let mut reset = |node, - max: &mut usize| + let reset = |node, + ctx: &mut TraverseState| -> datafusion_common::Result< Transformed>, > { - latest_limit = None; - *max = 0; + ctx.limit = None; + ctx.lookahead = 0; Ok(Transformed::no(node)) }; // traversing sides of joins will require more thought if node.children().len() > 1 { - return reset(node, &mut latest_max); + return reset(node, &mut ctx); } // grab the latest limit we see - if let Some(limit) = node.as_any().downcast_ref::() { - latest_limit = limit.fetch().map(|fetch| fetch + limit.skip()); - latest_max = 0; + if phase == Phase::FindOrGrow && get_limit(&node, &mut ctx) { return Ok(Transformed::no(node)); } // grow the limit if we hit a window function if let Some(window) = node.as_any().downcast_ref::() { - for expr in window.window_expr().iter() { - let frame = expr.get_window_frame(); - if frame.units != WindowFrameUnits::Rows { - return reset(node, &mut latest_max); // expression-based limits? 
- } - let Some(end_bound) = bound_to_usize(&frame.end_bound) else { - return reset(node, &mut latest_max); - }; - latest_max = cmp::max(end_bound, latest_max); + phase = Phase::Apply; + if !grow_limit(window, &mut ctx) { + return reset(node, &mut ctx); } return Ok(Transformed::no(node)); } - // Apply the limit if we hit a sort node - if let Some(sort) = node.as_any().downcast_ref::() { - let latest = latest_limit.take(); - let Some(fetch) = latest else { - latest_max = 0; - return Ok(Transformed::no(node)); - }; - let fetch = match sort.fetch() { - None => fetch + latest_max, - Some(existing) => cmp::min(existing, fetch + latest_max), - }; - let sort: Arc = Arc::new(sort.with_fetch(Some(fetch))); - latest_max = 0; - return Ok(Transformed::complete(sort)); + // Apply the limit if we hit a sortpreservingmerge node + if phase == Phase::Apply { + if let Some(out) = apply_limit(&node, &mut ctx) { + return Ok(out); + } } - // we can't push the limit past nodes that decrease row count + // nodes along the way + if !node.supports_limit_pushdown() { + return reset(node, &mut ctx); + } + if let Some(part) = node.as_any().downcast_ref::() { + let output = part.partitioning().partition_count(); + let input = part.input().output_partitioning().partition_count(); + if output < input { + return reset(node, &mut ctx); + } + } match node.cardinality_effect() { + CardinalityEffect::Unknown => return reset(node, &mut ctx), + CardinalityEffect::LowerEqual => return reset(node, &mut ctx), CardinalityEffect::Equal => {} - _ => return reset(node, &mut latest_max), + CardinalityEffect::GreaterEqual => {} } Ok(Transformed::no(node)) @@ -127,6 +151,99 @@ impl PhysicalOptimizerRule for LimitPushPastWindows { } } +fn grow_limit(window: &BoundedWindowAggExec, ctx: &mut TraverseState) -> bool { + let mut max_rel = 0; + for expr in window.window_expr().iter() { + // grow based on function requirements + match get_limit_effect(expr) { + LimitEffect::None => {} + LimitEffect::Unknown => return 
false, + LimitEffect::Relative(rel) => max_rel = max_rel.max(rel), + LimitEffect::Absolute(val) => { + let cur = ctx.limit.unwrap_or(0); + ctx.limit = Some(cur.max(val)) + } + } + + // grow based on frames + let frame = expr.get_window_frame(); + if frame.units != WindowFrameUnits::Rows { + return false; // expression-based limits not statically evaluatable + } + let Some(end_bound) = bound_to_usize(&frame.end_bound) else { + return false; // can't optimize unbounded window expressions + }; + ctx.max_lookahead(end_bound); + } + + // finish grow + ctx.max_lookahead(ctx.lookahead + max_rel); + true +} + +fn apply_limit( + node: &Arc, + ctx: &mut TraverseState, +) -> Option>> { + if !node.as_any().is::() && !node.as_any().is::() { + return None; + } + let latest = ctx.limit.take(); + let Some(fetch) = latest else { + ctx.limit = None; + ctx.lookahead = 0; + return Some(Transformed::no(Arc::clone(node))); + }; + let fetch = match node.fetch() { + None => fetch + ctx.lookahead, + Some(existing) => cmp::min(existing, fetch + ctx.lookahead), + }; + Some(Transformed::complete(node.with_fetch(Some(fetch)).unwrap())) +} + +fn get_limit(node: &Arc, ctx: &mut TraverseState) -> bool { + if let Some(limit) = node.as_any().downcast_ref::() { + ctx.reset_limit(limit.fetch().map(|fetch| fetch + limit.skip())); + return true; + } + if let Some(limit) = node.as_any().downcast_ref::() { + ctx.reset_limit(limit.fetch()); + return true; + } + false +} + +/// Examines the `WindowExpr` and decides: +/// 1. The expression does not change the window size +/// 2. The expression grows it by X amount +/// 3. 
We don't know +/// +/// # Arguments +/// +/// * `expr` the expression to examine +/// +/// # Returns +/// +/// The effect on the limit +fn get_limit_effect(expr: &Arc) -> LimitEffect { + // White list aggregates + if expr.as_any().is::() + || expr.as_any().is::() + { + return LimitEffect::None; + } + + // Grab the window function + let Some(swe) = expr.as_any().downcast_ref::() else { + return LimitEffect::Unknown; // should be only remaining type + }; + let swfe = swe.get_standard_func_expr(); + let Some(udf) = swfe.as_any().downcast_ref::() else { + return LimitEffect::Unknown; // should be only remaining type + }; + udf.limit_effect() +} + fn bound_to_usize(bound: &WindowFrameBound) -> Option { match bound { WindowFrameBound::Preceding(_) => Some(0), @@ -137,5 +254,3 @@ fn bound_to_usize(bound: &WindowFrameBound) -> Option { _ => None, } } - -// tests: all branches are covered by sqllogictests diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index ddc2bfa10ea7..e9249f12d9ea 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -33,7 +33,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow_schema::{FieldRef, SortOptions}; use datafusion_common::{exec_err, Result}; use datafusion_expr::{ - PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame, + LimitEffect, PartitionEvaluator, ReversedUDWF, SetMonotonicity, WindowFrame, WindowFunctionDefinition, WindowUDF, }; use datafusion_functions_window_common::expr::ExpressionArgs; @@ -281,6 +281,10 @@ impl StandardWindowFunctionExpr for WindowUDFExpr { PhysicalSortExpr { expr, options } }) } + + fn limit_effect(&self) -> LimitEffect { + self.fun.inner().limit_effect(self.args.as_slice()) + } } pub(crate) fn calc_requirements< diff --git a/datafusion/proto/tests/cases/mod.rs b/datafusion/proto/tests/cases/mod.rs index 1c52a872df13..aec6c1de3030 100644 --- a/datafusion/proto/tests/cases/mod.rs +++ 
b/datafusion/proto/tests/cases/mod.rs @@ -17,17 +17,19 @@ use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; use datafusion_common::plan_err; use datafusion_expr::function::AccumulatorArgs; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, PartitionEvaluator, ScalarFunctionArgs, ScalarUDFImpl, - Signature, Volatility, WindowUDFImpl, + Accumulator, AggregateUDFImpl, LimitEffect, PartitionEvaluator, ScalarFunctionArgs, + ScalarUDFImpl, Signature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use std::any::Any; use std::fmt::Debug; use std::hash::Hash; +use std::sync::Arc; mod roundtrip_logical_plan; mod roundtrip_physical_plan; @@ -173,6 +175,10 @@ impl WindowUDFImpl for CustomUDWF { ) -> datafusion_common::Result { Ok(Field::new(field_args.name(), DataType::UInt64, false).into()) } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } #[derive(Debug)] diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index c76036a4344f..fb9b64b7bc5e 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -60,6 +60,7 @@ use datafusion::functions_window::expr_fn::{ cume_dist, dense_rank, lag, lead, ntile, percent_rank, rank, row_number, }; use datafusion::functions_window::rank::rank_udwf; +use datafusion::physical_expr::PhysicalExpr; use datafusion::prelude::*; use datafusion::test_util::{TestTableFactory, TestTableProvider}; use datafusion_common::config::TableOptions; @@ -75,10 +76,10 @@ use datafusion_expr::expr::{ }; use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore}; use datafusion_expr::{ - Accumulator, AggregateUDF, ColumnarValue, ExprFunctionExt, 
ExprSchemable, Literal, - LogicalPlan, Operator, PartitionEvaluator, ScalarUDF, Signature, TryCast, Volatility, - WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, - WindowUDFImpl, + Accumulator, AggregateUDF, ColumnarValue, ExprFunctionExt, ExprSchemable, + LimitEffect, Literal, LogicalPlan, Operator, PartitionEvaluator, ScalarUDF, + Signature, TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, + WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::expr_fn::{ @@ -2544,6 +2545,10 @@ fn roundtrip_window() { ) } } + + fn limit_effect(&self, _args: &[Arc]) -> LimitEffect { + LimitEffect::Unknown + } } fn make_partition_evaluator() -> Result> { diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index e81662a75319..f07ca0de0f33 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -5966,8 +5966,8 @@ physical_plan 01)ProjectionExec: expr=[c1@2 as c1, c2@3 as c2, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum1, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum2, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as count1, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as array_agg1, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, 
test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@8 as array_agg2] 02)--GlobalLimitExec: skip=0, fetch=5 03)----BoundedWindowAggExec: wdw=[sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "sum(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "count(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "array_agg(test.c2) FILTER (WHERE test.c2 >= Int64(2) AND test.c2 < Int64(4) AND test.c1 > Int64(0)) ORDER BY [test.c1 ASC NULLS LAST, test.c2 ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -04)------SortPreservingMergeExec: [c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST] -05)--------SortExec: expr=[c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], preserve_partitioning=[true] +04)------SortPreservingMergeExec: [c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], fetch=5 +05)--------SortExec: TopK(fetch=5), expr=[c1@2 ASC NULLS LAST, c2@3 ASC NULLS LAST], preserve_partitioning=[true] 06)----------ProjectionExec: expr=[__common_expr_3@0 as __common_expr_1, __common_expr_3@0 AND c2@2 < 4 AND c1@1 > 0 as __common_expr_2, c1@1 as c1, c2@2 as c2] 07)------------ProjectionExec: expr=[c2@1 >= 2 as __common_expr_3, c1@0 as c1, c2@1 as c2] 08)--------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-0.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-1.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-2.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_csv/partition-3.csv]]}, projection=[c1, c2], file_type=csv, 
has_header=false diff --git a/datafusion/sqllogictest/test_files/window_limits.slt b/datafusion/sqllogictest/test_files/window_limits.slt new file mode 100644 index 000000000000..c1e680084f4b --- /dev/null +++ b/datafusion/sqllogictest/test_files/window_limits.slt @@ -0,0 +1,769 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# see https://datafusion.apache.org/user-guide/sql/window_functions.html#syntax for field names & examples +statement ok +CREATE EXTERNAL TABLE employees ( + depname VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + empno INT NOT NULL, + salary BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL, + hire_date DATE NOT NULL, + c15 TIMESTAMP NOT NULL, +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100_with_dates.csv' +OPTIONS ('format.has_header' 'true'); + +# lead defaults to 1 and should grow limit +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query I +SELECT LEAD(empno) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +299 +363 +417 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query I +SELECT LEAD(empno) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +299 +363 +417 + +query TT +EXPLAIN +SELECT LEAD(empno) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +logical_plan +01)Projection: lead(employees.empno) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +02)--Limit: skip=0, fetch=3 +03)----WindowAggr: windowExpr=[[lead(employees.empno) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno] +physical_plan +01)ProjectionExec: expr=[lead(employees.empno) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lead(employees.empno) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +02)--GlobalLimitExec: skip=0, fetch=3 +03)----BoundedWindowAggExec: wdw=[lead(employees.empno) ORDER 
BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lead(employees.empno) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=4), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno], file_type=csv, has_header=true + +# lead can look ahead by any amount and should grow the limit +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query I +SELECT LEAD(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +363 +417 +794 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query I +SELECT LEAD(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +363 +417 +794 + +query TT +EXPLAIN +SELECT LEAD(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +logical_plan +01)Projection: lead(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +02)--Limit: skip=0, fetch=3 +03)----WindowAggr: windowExpr=[[lead(employees.empno, Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno] +physical_plan +01)ProjectionExec: expr=[lead(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as lead(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +02)--GlobalLimitExec: 
skip=0, fetch=3 +03)----BoundedWindowAggExec: wdw=[lead(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lead(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=5), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno], file_type=csv, has_header=true + +# Should use the max of leads +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query IIII +SELECT + empno, + LEAD(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead1, + LEAD(salary, 3) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead3, + LEAD(salary, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead5 +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 28774375 557517119 4015442341 +299 1865307672 4061635107 3542840110 +363 557517119 4015442341 1088543984 +417 4061635107 3542840110 1362369177 +794 4015442341 1088543984 145294611 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query IIII +SELECT + empno, + LEAD(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead1, + LEAD(salary, 3) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead3, + LEAD(salary, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead5 +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 28774375 557517119 4015442341 +299 1865307672 4061635107 3542840110 +363 557517119 4015442341 1088543984 +417 4061635107 3542840110 1362369177 
+794 4015442341 1088543984 145294611 + +query TT +EXPLAIN +SELECT + empno, + LEAD(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead1, + LEAD(salary, 3) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead3, + LEAD(salary, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead5 +FROM employees +ORDER BY empno +LIMIT 5; +---- +logical_plan +01)Sort: employees.empno ASC NULLS LAST, fetch=5 +02)--Projection: employees.empno, lead(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lead1, lead(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lead3, lead(employees.salary,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lead5 +03)----WindowAggr: windowExpr=[[lead(employees.salary, Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lead(employees.salary, Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lead(employees.salary, Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno, salary] +physical_plan +01)ProjectionExec: expr=[empno@0 as empno, lead(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as lead1, lead(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as lead3, lead(employees.salary,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as lead5] +02)--GlobalLimitExec: skip=0, fetch=5 +03)----BoundedWindowAggExec: wdw=[lead(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lead(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lead(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lead(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lead(employees.salary,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lead(employees.salary,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=10), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno, salary], file_type=csv, has_header=true + +# 2 < 3... 
nth_value should not grow the limit +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query I +SELECT NTH_VALUE(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +NULL +299 +299 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query I +SELECT NTH_VALUE(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +NULL +299 +299 + +query TT +EXPLAIN +SELECT NTH_VALUE(empno, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +logical_plan +01)Projection: nth_value(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +02)--Limit: skip=0, fetch=3 +03)----WindowAggr: windowExpr=[[nth_value(employees.empno, Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno] +physical_plan +01)ProjectionExec: expr=[nth_value(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as nth_value(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +02)--GlobalLimitExec: skip=0, fetch=3 +03)----BoundedWindowAggExec: wdw=[nth_value(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "nth_value(employees.empno,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=3), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno], file_type=csv, has_header=true + +# 5 > 3... nth_value still won't grow the limit - it's causal +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query I +SELECT NTH_VALUE(empno, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +NULL +NULL +NULL + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query I +SELECT NTH_VALUE(empno, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +NULL +NULL +NULL + +query TT +EXPLAIN +SELECT NTH_VALUE(empno, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM employees LIMIT 3 +---- +logical_plan +01)Projection: nth_value(employees.empno,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +02)--Limit: skip=0, fetch=3 +03)----WindowAggr: windowExpr=[[nth_value(employees.empno, Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno] +physical_plan +01)ProjectionExec: expr=[nth_value(employees.empno,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as nth_value(employees.empno,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +02)--GlobalLimitExec: skip=0, fetch=3 +03)----BoundedWindowAggExec: wdw=[nth_value(employees.empno,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "nth_value(employees.empno,Int64(5)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 
+04)------SortExec: TopK(fetch=3), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno], file_type=csv, has_header=true + +# aggregate functions shouldn't affect the window +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query TIIRII +SELECT + depname, + empno, + SUM(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sum, + AVG(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_avg, + MIN(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_min, + MAX(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_max +FROM employees +LIMIT 5; +---- +a 102 3276123488 3276123488 3276123488 3276123488 +e 299 3304897863 1652448931.5 28774375 3276123488 +a 363 5170205535 1723401845 28774375 3276123488 +e 417 5727722654 1431930663.5 28774375 3276123488 +d 794 9789357761 1957871552.2 28774375 4061635107 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query TIIRII +SELECT + depname, + empno, + SUM(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sum, + AVG(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_avg, + MIN(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_min, + MAX(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_max +FROM employees +LIMIT 5; +---- +a 102 3276123488 3276123488 3276123488 3276123488 +e 299 3304897863 1652448931.5 28774375 3276123488 +a 363 5170205535 1723401845 28774375 3276123488 +e 417 5727722654 1431930663.5 28774375 3276123488 +d 794 9789357761 1957871552.2 28774375 4061635107 + +query TT +EXPLAIN +SELECT + depname, + empno, + SUM(salary) OVER 
(ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_sum, + AVG(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_avg, + MIN(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_min, + MAX(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_max +FROM employees +LIMIT 5; +---- +logical_plan +01)Projection: employees.depname, employees.empno, sum(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS running_sum, avg(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS running_avg, min(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS running_min, max(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS running_max +02)--Limit: skip=0, fetch=5 +03)----WindowAggr: windowExpr=[[sum(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, avg(CAST(employees.salary AS Float64)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, min(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, max(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[depname, empno, salary] +physical_plan +01)ProjectionExec: expr=[depname@0 as depname, empno@1 as empno, sum(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as running_sum, avg(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as running_avg, min(employees.salary) ORDER BY [employees.empno ASC 
NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as running_min, max(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as running_max] +02)--GlobalLimitExec: skip=0, fetch=5 +03)----BoundedWindowAggExec: wdw=[sum(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "sum(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, avg(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "avg(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, min(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "min(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, max(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "max(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=5), expr=[empno@1 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno, salary], file_type=csv, has_header=true + +# ranking functions that don't affect the limit +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query IIII +SELECT + empno, + row_number() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn, + rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rnk, + dense_rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS drnk +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 1 1 1 +299 2 2 2 +363 3 3 3 +417 4 4 4 +794 5 5 5 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query IIII +SELECT + empno, + row_number() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn, + rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rnk, + dense_rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS drnk +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 1 1 1 +299 2 2 2 +363 3 3 3 +417 4 4 4 +794 5 5 5 + +query TT +EXPLAIN +SELECT + empno, + row_number() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn, + rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rnk, + dense_rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS drnk +FROM employees +ORDER BY empno +LIMIT 5; +---- +logical_plan +01)Sort: employees.empno ASC NULLS LAST, fetch=5 +02)--Projection: employees.empno, row_number() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn, rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rnk, dense_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS drnk +03)----WindowAggr: windowExpr=[[row_number() ORDER BY 
[employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, dense_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno] +physical_plan +01)ProjectionExec: expr=[empno@0 as empno, row_number() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as rn, rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as rnk, dense_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as drnk] +02)--GlobalLimitExec: skip=0, fetch=5 +03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "row_number() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, dense_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "dense_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=5), expr=[empno@0 ASC NULLS LAST], 
preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno], file_type=csv, has_header=true + +# Unoptimizable global ranking functions +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query IRRI +SELECT + empno, + percent_rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pr, + cume_dist() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cd, + ntile(4) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nt +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 0 0.01 1 +299 0.010101010101 0.02 1 +363 0.020202020202 0.03 1 +417 0.030303030303 0.04 1 +794 0.040404040404 0.05 1 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query IRRI +SELECT + empno, + percent_rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pr, + cume_dist() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cd, + ntile(4) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nt +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 0 0.01 1 +299 0.010101010101 0.02 1 +363 0.020202020202 0.03 1 +417 0.030303030303 0.04 1 +794 0.040404040404 0.05 1 + +query TT +EXPLAIN +SELECT + empno, + percent_rank() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pr, + cume_dist() OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cd, + ntile(4) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nt +FROM employees +ORDER BY empno +LIMIT 5; +---- +logical_plan +01)Sort: employees.empno ASC NULLS LAST, fetch=5 +02)--Projection: employees.empno, percent_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS pr, cume_dist() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW AS cd, ntile(Int64(4)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS nt +03)----WindowAggr: windowExpr=[[percent_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, cume_dist() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, ntile(Int64(4)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno] +physical_plan +01)ProjectionExec: expr=[empno@0 as empno, percent_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as pr, cume_dist() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as cd, ntile(Int64(4)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as nt] +02)--GlobalLimitExec: skip=0, fetch=5 +03)----WindowAggExec: wdw=[percent_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "percent_rank() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }, cume_dist() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "cume_dist() ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }, ntile(Int64(4)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { 
name: "ntile(Int64(4)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }] +04)------SortExec: expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno], file_type=csv, has_header=true + +# Analytical functions that don't lookahead +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query IIIII +SELECT + empno, + first_value(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS fv, + lag(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS l1, + last_value(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lv, + nth_value(salary, 3) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS n3 +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 3276123488 NULL 3276123488 NULL +299 3276123488 3276123488 28774375 NULL +363 3276123488 28774375 1865307672 1865307672 +417 3276123488 1865307672 557517119 1865307672 +794 3276123488 557517119 4061635107 1865307672 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query IIIII +SELECT + empno, + first_value(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS fv, + lag(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS l1, + last_value(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lv, + nth_value(salary, 3) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS n3 +FROM employees +ORDER BY empno +LIMIT 5; +---- +102 3276123488 NULL 3276123488 NULL +299 3276123488 3276123488 
28774375 NULL +363 3276123488 28774375 1865307672 1865307672 +417 3276123488 1865307672 557517119 1865307672 +794 3276123488 557517119 4061635107 1865307672 + +query TT +EXPLAIN +SELECT + empno, + first_value(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS fv, + lag(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS l1, + last_value(salary) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lv, + nth_value(salary, 3) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS n3 +FROM employees +ORDER BY empno +LIMIT 5; +---- +logical_plan +01)Sort: employees.empno ASC NULLS LAST, fetch=5 +02)--Projection: employees.empno, first_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS fv, lag(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS l1, last_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lv, nth_value(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS n3 +03)----WindowAggr: windowExpr=[[first_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lag(employees.salary, Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, last_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(employees.salary, Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno, salary] +physical_plan +01)ProjectionExec: expr=[empno@0 as empno, first_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW@2 as fv, lag(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as l1, last_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as lv, nth_value(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as n3] +02)--GlobalLimitExec: skip=0, fetch=5 +03)----BoundedWindowAggExec: wdw=[first_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "first_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, lag(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lag(employees.salary,Int64(1)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, last_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "last_value(employees.salary) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "nth_value(employees.salary,Int64(3)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=5), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno, salary], file_type=csv, has_header=true + +# should handle partition by unoptimized +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query TIII +SELECT depname, empno, salary, SUM(salary) OVER ( + PARTITION BY depname + ORDER BY empno + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW + ) AS running_sum +FROM employees +ORDER BY depname +LIMIT 5 +---- +a 102 3276123488 3276123488 +a 363 1865307672 5141431160 +a 829 4015442341 5880750013 +a 2555 145294611 4160736952 +a 2809 754775609 900070220 + +query TT +EXPLAIN +SELECT depname, empno, salary, SUM(salary) OVER ( + PARTITION BY depname + ORDER BY empno + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW + ) AS running_sum +FROM employees +ORDER BY depname +LIMIT 5 +---- +logical_plan +01)Sort: employees.depname ASC NULLS LAST, fetch=5 +02)--Projection: employees.depname, employees.empno, employees.salary, sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW AS running_sum +03)----WindowAggr: windowExpr=[[sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[depname, empno, salary] +physical_plan +01)SortPreservingMergeExec: [depname@0 ASC NULLS LAST], fetch=5 +02)--ProjectionExec: expr=[depname@0 as depname, empno@1 as empno, salary@2 as salary, sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW@3 as running_sum] 
+03)----BoundedWindowAggExec: wdw=[sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW: Field { name: "sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=4 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno, salary], file_type=csv, has_header=true + +# should handle partition by optimized +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query TIII +SELECT depname, empno, salary, SUM(salary) OVER ( + PARTITION BY depname + ORDER BY empno + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW + ) AS running_sum +FROM employees +ORDER BY depname +LIMIT 5 +---- +a 102 3276123488 3276123488 +a 363 1865307672 5141431160 +a 829 4015442341 5880750013 +a 2555 145294611 4160736952 +a 2809 754775609 900070220 + +query TT +EXPLAIN +SELECT depname, empno, salary, SUM(salary) OVER ( + PARTITION BY depname + ORDER BY empno + ROWS BETWEEN 1 PRECEDING AND CURRENT ROW + ) AS running_sum +FROM employees +ORDER BY depname +LIMIT 5 +---- +logical_plan +01)Sort: employees.depname ASC NULLS LAST, fetch=5 +02)--Projection: employees.depname, employees.empno, employees.salary, sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW AS running_sum 
+03)----WindowAggr: windowExpr=[[sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[depname, empno, salary] +physical_plan +01)SortPreservingMergeExec: [depname@0 ASC NULLS LAST], fetch=5 +02)--ProjectionExec: expr=[depname@0 as depname, empno@1 as empno, salary@2 as salary, sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW@3 as running_sum] +03)----BoundedWindowAggExec: wdw=[sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW: Field { name: "sum(employees.salary) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=5), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST], preserve_partitioning=[true] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=4 +07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +08)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[depname, empno, salary], file_type=csv, has_header=true + +# unbounded following +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query I +SELECT LEAD(salary) OVER (ORDER BY empno ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM employees +LIMIT 5; +---- +28774375 +1865307672 +557517119 +4061635107 +4015442341 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query I +SELECT LEAD(salary) OVER (ORDER BY empno 
ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) +FROM employees +LIMIT 5; +---- +28774375 +1865307672 +557517119 +4061635107 +4015442341 + +# RANGE +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query I +SELECT LEAD(salary) OVER (ORDER BY empno RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +FROM employees +LIMIT 5; +---- +28774375 +1865307672 +557517119 +4061635107 +4015442341 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query I +SELECT LEAD(salary) OVER (ORDER BY empno RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +FROM employees +LIMIT 5; +---- +28774375 +1865307672 +557517119 +4061635107 +4015442341 + +# multiple windows +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query II +SELECT + LEAD(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), + LEAD(salary, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +FROM employees +LIMIT 5; +---- +28774375 4015442341 +1865307672 3542840110 +557517119 1088543984 +4061635107 1362369177 +4015442341 145294611 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query II +SELECT + LEAD(salary, 1) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), + LEAD(salary, 5) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +FROM employees +LIMIT 5; +---- +28774375 4015442341 +1865307672 3542840110 +557517119 1088543984 +4061635107 1362369177 +4015442341 145294611 + +# sliding +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query III +SELECT + empno, + salary, + SUM(salary) OVER (ORDER BY empno ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sliding_sum +FROM employees +LIMIT 3; +---- +102 3276123488 3276123488 +299 28774375 3304897863 +363 1865307672 5170205535 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query III +SELECT + empno, + salary, + SUM(salary) OVER (ORDER BY empno 
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sliding_sum +FROM employees +LIMIT 3; +---- +102 3276123488 3276123488 +299 28774375 3304897863 +363 1865307672 5170205535 + +# sliding lead +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query III +SELECT + empno, + salary, + LEAD(salary, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead2 +FROM employees +LIMIT 3; +---- +102 3276123488 1865307672 +299 28774375 557517119 +363 1865307672 4061635107 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query III +SELECT + empno, + salary, + LEAD(salary, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead2 +FROM employees +LIMIT 3; +---- +102 3276123488 1865307672 +299 28774375 557517119 +363 1865307672 4061635107 + +query TT +EXPLAIN +SELECT + empno, + salary, + LEAD(salary, 2) OVER (ORDER BY empno ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lead2 +FROM employees +LIMIT 3; +---- +logical_plan +01)Projection: employees.empno, employees.salary, lead(employees.salary,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS lead2 +02)--Limit: skip=0, fetch=3 +03)----WindowAggr: windowExpr=[[lead(employees.salary, Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +04)------TableScan: employees projection=[empno, salary] +physical_plan +01)ProjectionExec: expr=[empno@0 as empno, salary@1 as salary, lead(employees.salary,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as lead2] +02)--GlobalLimitExec: skip=0, fetch=3 +03)----BoundedWindowAggExec: wdw=[lead(employees.salary,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lead(employees.salary,Int64(2)) ORDER BY [employees.empno ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] +04)------SortExec: TopK(fetch=5), expr=[empno@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[empno, salary], file_type=csv, has_header=true diff --git a/testing b/testing index d2a137123034..0d60ccae40d0 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit d2a13712303498963395318a4eb42872e66aead7 +Subproject commit 0d60ccae40d0e8f2d22c15fafb01c5d4be8c63a6