Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::any::Any;

use arrow::datatypes::Field;
use arrow::{
array::{ArrayRef, AsArray, Float64Array},
Expand All @@ -33,10 +30,14 @@ use datafusion::logical_expr::function::{
};
use datafusion::logical_expr::simplify::SimplifyInfo;
use datafusion::logical_expr::{
Expr, PartitionEvaluator, Signature, WindowFrame, WindowFunctionDefinition,
WindowUDF, WindowUDFImpl,
Expr, LimitEffect, PartitionEvaluator, Signature, WindowFrame,
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::prelude::*;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::any::Any;
use std::sync::Arc;

/// This example shows how to use the full WindowUDFImpl API to implement a user
/// defined window function. As in the `simple_udwf.rs` example, this struct implements
Expand Down Expand Up @@ -91,6 +92,10 @@ impl WindowUDFImpl for SmoothItUdf {
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
    // Output column: Float64, nullable, carrying the name supplied in `field_args`.
    let output = Field::new(field_args.name(), DataType::Float64, true);
    Ok(output.into())
}

/// Declares no known limit-pushdown effect for this example function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// The arguments are ignored; `Unknown` means "undeclared or only known at run time".
LimitEffect::Unknown
}
}

/// This implements the lowest level evaluation for a window function
Expand Down Expand Up @@ -211,6 +216,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
    // Nullable Float64 result, named by the caller-provided field arguments.
    let result_field = Field::new(field_args.name(), DataType::Float64, true);
    Ok(result_field.into())
}

/// Reports that this function's effect on limit pushdown is not declared.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are not inspected.
LimitEffect::Unknown
}
}

// create local execution context with `cars.csv` registered as a table named `cars`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use datafusion::prelude::SessionContext;
use datafusion_common::exec_datafusion_err;
use datafusion_expr::ptr_eq::PtrEq;
use datafusion_expr::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
LimitEffect, PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF,
WindowUDFImpl,
};
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_functions_window_common::{
Expand Down Expand Up @@ -570,6 +571,10 @@ impl OddCounter {
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
    // Nullable Int64 result column, named from `field_args`.
    let counter_field = Field::new(field_args.name(), DataType::Int64, true);
    Ok(counter_field.into())
}

/// Declares no known limit-pushdown effect for this test function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are ignored.
LimitEffect::Unknown
}
}

ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))
Expand Down Expand Up @@ -689,6 +694,10 @@ impl WindowUDFImpl for VariadicWindowUDF {
fn field(&self, _: WindowUDFFieldArgs) -> Result<FieldRef> {
// Panics if called; the message marks the output field as not needed by the tests.
unimplemented!("unnecessary for testing");
}

/// Declares no known limit-pushdown effect for this test function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are ignored.
LimitEffect::Unknown
}
}

#[test]
Expand Down Expand Up @@ -842,6 +851,10 @@ impl WindowUDFImpl for MetadataBasedWindowUdf {
.with_metadata(self.metadata.clone())
.into())
}

/// Declares no known limit-pushdown effect for this test function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are ignored.
LimitEffect::Unknown
}
}

#[derive(Debug)]
Expand Down
9 changes: 7 additions & 2 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::ptr_eq::PtrEq;
use crate::select_expr::SelectExpr;
use crate::{
conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery,
AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, ScalarFunctionArgs,
ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
AggregateUDF, Expr, LimitEffect, LogicalPlan, Operator, PartitionEvaluator,
ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
};
use crate::{
AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl,
Expand All @@ -42,6 +42,7 @@ use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use sqlparser::ast::NullTreatment;
use std::any::Any;
use std::fmt::Debug;
Expand Down Expand Up @@ -691,6 +692,10 @@ impl WindowUDFImpl for SimpleWindowUDF {
true,
)))
}

/// Reports that this function's effect on limit pushdown is not declared.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are not inspected; `Unknown` is returned unconditionally.
LimitEffect::Unknown
}
}

pub fn interval_year_month_lit(value: &str) -> Expr {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ pub use udaf::{
pub use udf::{
scalar_doc_sections, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
};
pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF, WindowUDFImpl};
pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF};
pub use udwf::{LimitEffect, WindowUDFImpl};
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};

#[cfg(test)]
Expand Down
40 changes: 38 additions & 2 deletions datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,13 @@ where
/// # use std::sync::LazyLock;
/// # use arrow::datatypes::{DataType, Field, FieldRef};
/// # use datafusion_common::{DataFusionError, plan_err, Result};
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation};
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation, LimitEffect};
/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
/// # use datafusion_functions_window_common::field::WindowUDFFieldArgs;
/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
/// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL;
/// # use datafusion_physical_expr_common::physical_expr;
/// # use std::sync::Arc;
///
/// #[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// struct SmoothIt {
Expand Down Expand Up @@ -295,6 +297,9 @@ where
/// fn documentation(&self) -> Option<&Documentation> {
/// Some(get_doc())
/// }
/// fn limit_effect(&self, _args: &[Arc<dyn physical_expr::PhysicalExpr>]) -> LimitEffect {
/// LimitEffect::Unknown
/// }
/// }
///
/// // Create a new WindowUDF from the implementation
Expand Down Expand Up @@ -414,6 +419,23 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync {
fn documentation(&self) -> Option<&Documentation> {
// Default: no documentation unless an implementation overrides this method.
None
}

/// Returns the effect this function has on limit pushdown.
///
/// `args` are the physical expressions the function is invoked with. The
/// default is [`LimitEffect::Unknown`]; implementations that can state their
/// effect should override this, returning [`LimitEffect::None`] when the
/// function is causal.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
LimitEffect::Unknown
}
}

/// The effect a window function has on limit pushdown.
///
/// Returned by `WindowUDFImpl::limit_effect`. The derives make the value
/// printable in diagnostics, comparable in tests and cheap to pass by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LimitEffect {
    /// Does not affect the limit (i.e. this is causal)
    None,
    /// Either undeclared, or dynamic (only evaluatable at run time)
    Unknown,
    /// Grow the limit by N rows
    Relative(usize),
    /// Limit needs to be at least N rows
    Absolute(usize),
}

pub enum ReversedUDWF {
Expand Down Expand Up @@ -522,6 +544,10 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
fn documentation(&self) -> Option<&Documentation> {
// Forward to the wrapped implementation.
self.inner.documentation()
}

/// Forwards to the wrapped implementation, passing `args` through unchanged.
fn limit_effect(&self, args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
self.inner.limit_effect(args)
}
}

// Window UDF doc sections for use in public documentation
Expand Down Expand Up @@ -557,15 +583,17 @@ pub mod window_doc_sections {

#[cfg(test)]
mod test {
use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl};
use crate::{LimitEffect, PartitionEvaluator, WindowUDF, WindowUDFImpl};
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::Result;
use datafusion_expr_common::signature::{Signature, Volatility};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::any::Any;
use std::cmp::Ordering;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct AWindowUDF {
Expand Down Expand Up @@ -604,6 +632,10 @@ mod test {
fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
// Panics if called; this test type provides no output field.
unimplemented!()
}

/// Declares no known limit-pushdown effect for this test function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are ignored.
LimitEffect::Unknown
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -643,6 +675,10 @@ mod test {
fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
// Panics if called; this test type provides no output field.
unimplemented!()
}

/// Declares no known limit-pushdown effect for this test function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are ignored.
LimitEffect::Unknown
}
}

#[test]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/ffi/src/udwf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use arrow::{
datatypes::{DataType, SchemaRef},
};
use arrow_schema::{Field, FieldRef};
use datafusion::logical_expr::LimitEffect;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{
error::DataFusionError,
logical_expr::{
Expand Down Expand Up @@ -348,6 +350,10 @@ impl WindowUDFImpl for ForeignWindowUDF {
let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref().into();
options.map(|s| s.into())
}

/// Always reports [`LimitEffect::Unknown`] for a foreign window function.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Neither `_args` nor `self.udf` is consulted here.
LimitEffect::Unknown
}
}

#[repr(C)]
Expand Down
7 changes: 6 additions & 1 deletion datafusion/functions-window/src/cume_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
use datafusion_common::Result;
use datafusion_expr::{
Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_macros::user_doc;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use field::WindowUDFFieldArgs;
use std::any::Any;
use std::fmt::Debug;
Expand Down Expand Up @@ -110,6 +111,10 @@ impl WindowUDFImpl for CumeDist {
fn documentation(&self) -> Option<&Documentation> {
// Delegate to this type's `doc()` helper.
self.doc()
}

/// Reports that this function's effect on limit pushdown is not declared.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are not inspected.
LimitEffect::Unknown
}
}

#[derive(Debug, Default)]
Expand Down
37 changes: 30 additions & 7 deletions datafusion/functions-window/src/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use datafusion_common::arrow::datatypes::Field;
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL;
use datafusion_expr::{
Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature,
Volatility, WindowUDFImpl,
Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature,
TypeSignature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::expr::ExpressionArgs;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::expressions;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::any::Any;
use std::cmp::min;
Expand Down Expand Up @@ -95,7 +96,7 @@ pub fn lead(
}

/// Selects which shifting window function a `WindowShift` implements.
///
/// Fieldless, so it is `Copy`: callers holding the reference returned by
/// `WindowShift::kind` can dereference it instead of matching through `&`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WindowShiftKind {
    /// The `lag` function.
    Lag,
    /// The `lead` function.
    Lead,
}
Expand Down Expand Up @@ -148,6 +149,10 @@ impl WindowShift {
pub fn lead() -> Self {
// Build the `Lead` flavour of this function.
Self::new(WindowShiftKind::Lead)
}

/// Returns a reference to this function's [`WindowShiftKind`] (lag or lead).
pub fn kind(&self) -> &WindowShiftKind {
&self.kind
}
}

static LAG_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
Expand Down Expand Up @@ -299,6 +304,26 @@ impl WindowUDFImpl for WindowShift {
WindowShiftKind::Lead => Some(get_lead_doc()),
}
}

/// Reports how many rows past the current one this shift may need, so limit
/// pushdown can keep enough input.
///
/// `args[0]` is the shifted expression and `args[1]`, when present, the
/// offset. Only a literal `Int64` offset can be evaluated here; with a
/// single argument the default offset of 1 applies.
///
/// * `lead` with a known offset `n` returns `Relative(max(n, 0))`.
/// * `lead` with an unknown offset or no arguments returns `Unknown`.
/// * `lag` returns `None`, except for a negative literal offset.
///
/// NOTE(review): a negative `lag` offset is assumed to shift forward like
/// `lead` — confirm against the partition evaluator. Reporting `Relative`
/// there is the conservative choice either way.
fn limit_effect(&self, args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
    // Signed offset as written in the call, or `None` when it cannot be
    // determined statically.
    let offset = match args {
        [_] => Some(1), // default value
        [_, expr, ..] => match expr
            .as_any()
            .downcast_ref::<expressions::Literal>()
            .map(|lit| lit.value())
        {
            // we should only get int64 from the parser
            Some(ScalarValue::Int64(Some(amount))) => Some(*amount),
            _ => None,
        },
        _ => None, // invalid arguments
    };

    // Number of following rows that may be read.
    let look_ahead: u64 = match (&self.kind, offset) {
        (WindowShiftKind::Lead, Some(amount)) => amount.max(0).unsigned_abs(),
        (WindowShiftKind::Lead, None) => return LimitEffect::Unknown,
        (WindowShiftKind::Lag, Some(amount)) if amount < 0 => amount.unsigned_abs(),
        (WindowShiftKind::Lag, _) => return LimitEffect::None,
    };

    // Checked conversion instead of `as usize`, which would silently truncate
    // on 32-bit targets; an unrepresentable distance is reported as unknown.
    usize::try_from(look_ahead).map_or(LimitEffect::Unknown, LimitEffect::Relative)
}
}

/// When `lead`/`lag` is evaluated on a `NULL` expression we attempt to
Expand Down Expand Up @@ -330,10 +355,8 @@ fn parse_expr(

let default_value = get_scalar_value_from_args(input_exprs, 2)?;
default_value.map_or(Ok(expr), |value| {
ScalarValue::try_from(&value.data_type()).map(|v| {
Arc::new(datafusion_physical_expr::expressions::Literal::new(v))
as Arc<dyn PhysicalExpr>
})
ScalarValue::try_from(&value.data_type())
.map(|v| Arc::new(expressions::Literal::new(v)) as Arc<dyn PhysicalExpr>)
})
}

Expand Down
15 changes: 12 additions & 3 deletions datafusion/functions-window/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ use datafusion_common::{exec_datafusion_err, exec_err, Result, ScalarValue};
use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL;
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::{
Documentation, Literal, PartitionEvaluator, ReversedUDWF, Signature, TypeSignature,
Volatility, WindowUDFImpl,
Documentation, LimitEffect, Literal, PartitionEvaluator, ReversedUDWF, Signature,
TypeSignature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use field::WindowUDFFieldArgs;
use std::any::Any;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Range;
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};

get_or_init_udwf!(
First,
Expand Down Expand Up @@ -126,6 +127,10 @@ impl NthValue {
pub fn nth() -> Self {
// Build the `Nth` flavour of this function.
Self::new(NthValueKind::Nth)
}

/// Returns a reference to this function's [`NthValueKind`].
pub fn kind(&self) -> &NthValueKind {
&self.kind
}
}

static FIRST_VALUE_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
Expand Down Expand Up @@ -337,6 +342,10 @@ impl WindowUDFImpl for NthValue {
NthValueKind::Nth => Some(get_nth_value_doc()),
}
}

/// Reports that this function does not affect limit pushdown.
fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
// Arguments are not inspected; the same value is returned for every kind.
LimitEffect::None // NthValue is causal
}
}

#[derive(Debug, Clone)]
Expand Down
Loading
Loading