Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -145,21 +146,28 @@ public static String toSanitizedString(
}

/**
* Extracts an expression that references only the given column IDs from the given expression.
* Returns an expression that retains only predicates which reference one of the given field IDs.
*
* <p>The result is inclusive. If a row would match the original filter, it must match the result
* filter.
*
* @param expression a filter Expression
* @param schema a Schema
* @param expression a filter expression
* @param schema schema for binding references
* @param caseSensitive whether binding is case sensitive
* @param ids field IDs used to match predicates to extract from the expression
* @return an Expression that selects at least the same rows as the original using only the IDs
* @param ids field IDs to retain predicates for
* @return expression containing only predicates that reference the given IDs
*/
public static Expression extractByIdInclusive(
Expression expression, Schema schema, boolean caseSensitive, int... ids) {
PartitionSpec spec = identitySpec(schema, ids);
return Projections.inclusive(spec, caseSensitive).project(Expressions.rewriteNot(expression));
if (ids == null || ids.length == 0) {
return Expressions.alwaysTrue();
}

ImmutableSet.Builder<Integer> retainIds = ImmutableSet.builder();
for (int id : ids) {
retainIds.add(id);
}

return ExpressionVisitors.visit(
Expressions.rewriteNot(expression),
new RetainPredicatesByFieldIdVisitor(schema, caseSensitive, retainIds.build()));
}

/**
Expand Down Expand Up @@ -262,6 +270,61 @@ public static <T> UnboundTerm<T> unbind(Term term) {
throw new UnsupportedOperationException("Cannot unbind unsupported term: " + term);
}

private static class RetainPredicatesByFieldIdVisitor
extends ExpressionVisitors.ExpressionVisitor<Expression> {
private final Schema schema;
private final boolean caseSensitive;
private final Set<Integer> retainFieldIds;

RetainPredicatesByFieldIdVisitor(
Schema schema, boolean caseSensitive, Set<Integer> retainFieldIds) {
this.schema = schema;
this.caseSensitive = caseSensitive;
this.retainFieldIds = retainFieldIds;
}

@Override
public Expression alwaysTrue() {
return Expressions.alwaysTrue();
}

@Override
public Expression alwaysFalse() {
return Expressions.alwaysFalse();
}

@Override
public Expression not(Expression result) {
return Expressions.not(result);
}

@Override
public Expression and(Expression leftResult, Expression rightResult) {
return Expressions.and(leftResult, rightResult);
}

@Override
public Expression or(Expression leftResult, Expression rightResult) {
return Expressions.or(leftResult, rightResult);
}

@Override
public <T> Expression predicate(BoundPredicate<T> pred) {
return retainFieldIds.contains(pred.ref().fieldId()) ? pred : Expressions.alwaysTrue();
}

@Override
public <T> Expression predicate(UnboundPredicate<T> pred) {
Expression bound = Binder.bind(schema.asStruct(), pred, caseSensitive);
if (bound instanceof BoundPredicate) {
return retainFieldIds.contains(((BoundPredicate<?>) bound).ref().fieldId())
? pred
: Expressions.alwaysTrue();
}
return Expressions.alwaysTrue();
}
}

private static class ExpressionSanitizer
extends ExpressionVisitors.ExpressionVisitor<Expression> {
private final long now;
Expand Down Expand Up @@ -697,14 +760,4 @@ private static String sanitizeVariantValue(
}
return builder.toString();
}

private static PartitionSpec identitySpec(Schema schema, int... ids) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);

for (int id : ids) {
specBuilder.identity(schema.findColumnName(id));
}

return specBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ public class TestExpressionUtil {
private static final Types.StructType FLOAT_TEST =
Types.StructType.of(Types.NestedField.optional(1, "test", Types.FloatType.get()));

/** Schema with struct, list, and map columns for {@link #testExtractByIdInclusiveNestedTypes}. */
private static final Schema NESTED_EXTRACT_SCHEMA =
new Schema(
Types.NestedField.required(1, "top_id", Types.LongType.get()),
Types.NestedField.optional(
2,
"st",
Types.StructType.of(
Types.NestedField.required(3, "inner_i", Types.IntegerType.get()))),
Types.NestedField.optional(
4, "arr", Types.ListType.ofRequired(5, Types.IntegerType.get())),
Types.NestedField.optional(
6,
"mp",
Types.MapType.ofRequired(7, 8, Types.StringType.get(), Types.IntegerType.get())));

private static final Types.StructType NESTED_EXTRACT_STRUCT = NESTED_EXTRACT_SCHEMA.asStruct();

@Test
public void testUnchangedUnaryPredicates() {
for (Expression unary :
Expand Down Expand Up @@ -825,6 +843,146 @@ public void testSanitizeStringFallback() {
}
}

@Test
public void testExtractByIdInclusive() {
Expression alwaysTrue = Expressions.alwaysTrue();
Expression idEq = Expressions.equal("id", 5L);
Expression valEq = Expressions.equal("val", 5);

assertThat(
ExpressionUtil.equivalent(
alwaysTrue,
ExpressionUtil.extractByIdInclusive(
Expressions.and(idEq, valEq), SCHEMA, true, new int[0]),
STRUCT,
true))
.isTrue();

assertThat(
ExpressionUtil.equivalent(
alwaysTrue,
ExpressionUtil.extractByIdInclusive(
Expressions.and(idEq, valEq), SCHEMA, true, (int[]) null),
STRUCT,
true))
.isTrue();

assertThat(
ExpressionUtil.equivalent(
idEq, ExpressionUtil.extractByIdInclusive(idEq, SCHEMA, true, 1), STRUCT, true))
.isTrue();

assertThat(
ExpressionUtil.equivalent(
alwaysTrue,
ExpressionUtil.extractByIdInclusive(valEq, SCHEMA, true, 1),
STRUCT,
true))
.isTrue();

assertThat(
ExpressionUtil.equivalent(
idEq,
ExpressionUtil.extractByIdInclusive(Expressions.and(idEq, valEq), SCHEMA, true, 1),
STRUCT,
true))
.isTrue();

Expression orBothId = Expressions.or(Expressions.equal("id", 1L), Expressions.equal("id", 2L));
assertThat(
ExpressionUtil.equivalent(
orBothId,
ExpressionUtil.extractByIdInclusive(orBothId, SCHEMA, true, 1),
STRUCT,
true))
.isTrue();
}

@Test
public void testExtractByIdInclusiveNestedTypes() {
Expression alwaysTrue = Expressions.alwaysTrue();
Expression structPred = Expressions.equal("st.inner_i", 1);
Expression listPred = Expressions.equal("arr.element", 42);
Expression mapKeyPred = Expressions.equal("mp.key", "k");
Expression mapValuePred = Expressions.equal("mp.value", 7);
Expression topPred = Expressions.equal("top_id", 9L);

assertThat(
ExpressionUtil.equivalent(
structPred,
ExpressionUtil.extractByIdInclusive(structPred, NESTED_EXTRACT_SCHEMA, true, 3),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
assertThat(
ExpressionUtil.equivalent(
alwaysTrue,
ExpressionUtil.extractByIdInclusive(structPred, NESTED_EXTRACT_SCHEMA, true, 1),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();

assertThat(
ExpressionUtil.equivalent(
listPred,
ExpressionUtil.extractByIdInclusive(listPred, NESTED_EXTRACT_SCHEMA, true, 5),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
assertThat(
ExpressionUtil.equivalent(
alwaysTrue,
ExpressionUtil.extractByIdInclusive(listPred, NESTED_EXTRACT_SCHEMA, true, 1),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();

assertThat(
ExpressionUtil.equivalent(
mapKeyPred,
ExpressionUtil.extractByIdInclusive(mapKeyPred, NESTED_EXTRACT_SCHEMA, true, 7),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
assertThat(
ExpressionUtil.equivalent(
mapValuePred,
ExpressionUtil.extractByIdInclusive(mapValuePred, NESTED_EXTRACT_SCHEMA, true, 8),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
assertThat(
ExpressionUtil.equivalent(
alwaysTrue,
ExpressionUtil.extractByIdInclusive(mapKeyPred, NESTED_EXTRACT_SCHEMA, true, 8),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();

Expression mixed = Expressions.and(structPred, Expressions.and(listPred, topPred));
assertThat(
ExpressionUtil.equivalent(
structPred,
ExpressionUtil.extractByIdInclusive(mixed, NESTED_EXTRACT_SCHEMA, true, 3),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
assertThat(
ExpressionUtil.equivalent(
listPred,
ExpressionUtil.extractByIdInclusive(mixed, NESTED_EXTRACT_SCHEMA, true, 5),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
assertThat(
ExpressionUtil.equivalent(
topPred,
ExpressionUtil.extractByIdInclusive(mixed, NESTED_EXTRACT_SCHEMA, true, 1),
NESTED_EXTRACT_STRUCT,
true))
.isTrue();
}

@Test
public void testIdenticalExpressionIsEquivalent() {
Expression[] exprs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,68 @@ public void testRewriteSummary() throws Exception {
EnvironmentContext.ENGINE_VERSION, v -> assertThat(v).startsWith("4.1"));
}

@TestTemplate
public void testRewritePositionDeletesWithArrayColumns() throws Exception {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, items ARRAY<STRUCT<value:BIGINT, count:INT>>) "
+ "USING iceberg TBLPROPERTIES "
+ "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
tableName);

sql(
"INSERT INTO %s VALUES "
+ "(1, 'a', array(named_struct('value', cast(10 as bigint), 'count', 1))), "
+ "(2, 'b', array(named_struct('value', cast(20 as bigint), 'count', 2))), "
+ "(3, 'c', array(named_struct('value', cast(30 as bigint), 'count', 3))), "
+ "(4, 'd', array(named_struct('value', cast(40 as bigint), 'count', 4))), "
+ "(5, 'e', array(named_struct('value', cast(50 as bigint), 'count', 5))), "
+ "(6, 'f', array(named_struct('value', cast(60 as bigint), 'count', 6)))",
tableName);

sql("DELETE FROM %s WHERE id = 1", tableName);
sql("DELETE FROM %s WHERE id = 2", tableName);

Table table = validationCatalog.loadTable(tableIdent);
assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);

sql(
"CALL %s.system.rewrite_position_delete_files("
+ "table => '%s',"
+ "options => map('rewrite-all','true'))",
catalogName, tableIdent);
}

@TestTemplate
public void testRewritePositionDeletesWithMapColumns() throws Exception {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, props MAP<STRING, BIGINT>) "
+ "USING iceberg TBLPROPERTIES "
+ "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read')",
tableName);

sql(
"INSERT INTO %s VALUES "
+ "(1, 'a', map('x', cast(10 as bigint))), "
+ "(2, 'b', map('y', cast(20 as bigint))), "
+ "(3, 'c', map('z', cast(30 as bigint))), "
+ "(4, 'd', map('w', cast(40 as bigint))), "
+ "(5, 'e', map('v', cast(50 as bigint))), "
+ "(6, 'f', map('u', cast(60 as bigint)))",
tableName);

sql("DELETE FROM %s WHERE id = 1", tableName);
sql("DELETE FROM %s WHERE id = 2", tableName);

Table table = validationCatalog.loadTable(tableIdent);
assertThat(TestHelpers.deleteFiles(table)).hasSizeGreaterThanOrEqualTo(1);

sql(
"CALL %s.system.rewrite_position_delete_files("
+ "table => '%s',"
+ "options => map('rewrite-all','true'))",
catalogName, tableIdent);
}

private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
Expand Down
Loading
Loading