Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ prost = { workspace = true }
substrait = { version = "0.58", features = ["serde"] }
url = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
uuid = { version = "1.17.0", features = ["v4"] }

[dev-dependencies]
datafusion = { workspace = true, features = ["nested_expressions"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,20 @@ pub async fn from_project_rel(
// to transform it into a column reference
window_exprs.insert(e.clone());
}
explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);
// Substrait plans are ordinal based, so they do not provide names for columns.
// Names for columns are generated by DataFusion during conversion, and for literals
// DataFusion produces names based on the literal value. It is possible to construct
// valid Substrait plans that result in duplicated names if the same literal value is
// used in multiple relations. To avoid this issue, we alias literals with unique names.
// The name tracker ensures that two literals in the same projection get unique
// names, but it does not deduplicate them against literal columns produced by an
// earlier projection (for example, one that sits below a join).
// See: https://github.com/apache/datafusion/pull/17299
let maybe_apply_alias = match e {
lit @ Expr::Literal(_, _) => lit.alias(uuid::Uuid::new_v4().to_string()),
_ => e,
};
explicit_exprs.push(name_tracker.get_uniquely_named_expr(maybe_apply_alias)?);
}

let input = if !window_exprs.is_empty() {
Expand Down
40 changes: 24 additions & 16 deletions datafusion/substrait/tests/cases/consumer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,23 +647,31 @@ mod tests {
#[tokio::test]
async fn test_multiple_unions() -> Result<()> {
let plan_str = test_plan_to_string("multiple_unions.json").await?;
assert_snapshot!(
plan_str,
@r#"
Projection: Utf8("people") AS product_category, Utf8("people")__temp__0 AS product_type, product_key
Union
Projection: Utf8("people"), Utf8("people") AS Utf8("people")__temp__0, sales.product_key
Left Join: sales.product_key = food.@food_id
TableScan: sales
TableScan: food
Union
Projection: people.$f3, people.$f5, people.product_key0
Left Join: people.product_key0 = food.@food_id
TableScan: people
TableScan: food
TableScan: more_products
"#

let mut settings = insta::Settings::clone_current();
settings.add_filter(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
"[UUID]",
);
settings.bind(|| {
assert_snapshot!(
plan_str,
@r#"
Projection: [UUID] AS product_category, [UUID] AS product_type, product_key
Union
Projection: Utf8("people") AS [UUID], Utf8("people") AS [UUID], sales.product_key
Left Join: sales.product_key = food.@food_id
TableScan: sales
TableScan: food
Union
Projection: people.$f3, people.$f5, people.product_key0
Left Join: people.product_key0 = food.@food_id
TableScan: people
TableScan: food
TableScan: more_products
"#
);
});

Ok(())
}
Expand Down
41 changes: 41 additions & 0 deletions datafusion/substrait/tests/cases/logical_plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,47 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn null_literal_before_and_after_joins() -> Result<()> {
// Regression test: the same NULL literal is projected once below a join (inside a
// union branch) and once above it, each time for a different output column. The
// converted plan must keep those literal columns distinct, both in its printed form
// and when it is actually executed.

// File generated with substrait-java's Isthmus:
// ./isthmus-cli/build/graal/isthmus --create "create table A (a int); create table B (a int, c int); create table C (a int, d int)" "select t.*, C.d, CAST(NULL AS VARCHAR) as e from (select a, CAST(NULL AS VARCHAR) as c from A UNION ALL select a, c from B) t LEFT JOIN C ON t.a = C.a"
let proto_plan =
read_json("tests/testdata/test_plans/disambiguate_literals_with_same_name.substrait.json");
// NOTE(review): presumably registers the tables referenced by the plan on the
// session context so the plan can be resolved — confirm against the helper.
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

// The literal aliases in the converted plan are UUID-shaped strings that differ on
// every run, so rewrite anything matching the UUID pattern to the fixed placeholder
// `[UUID]` before comparing against the inline snapshot.
let mut settings = insta::Settings::clone_current();
settings.add_filter(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
"[UUID]",
);
// The filter only applies to assertions made inside this closure.
settings.bind(|| {
assert_snapshot!(
plan,
@r#"
Projection: left.A, left.[UUID] AS C, right.D, Utf8(NULL) AS [UUID] AS E
Left Join: left.A = right.A
SubqueryAlias: left
Union
Projection: A.A, Utf8(NULL) AS [UUID]
TableScan: A
Projection: B.A, CAST(B.C AS Utf8)
TableScan: B
SubqueryAlias: right
TableScan: C
"#
);
});

// Trigger execution to ensure plan validity: the snapshot only checks the plan's
// textual form, whereas running it would surface an error that only appears at
// execution time.
DataFrame::new(ctx.state(), plan).show().await?;

Ok(())
}

#[tokio::test]
async fn non_nullable_lists() -> Result<()> {
// DataFusion's Substrait consumer treats all lists as nullable, even if the Substrait plan specifies them as non-nullable.
Expand Down
Loading
Loading