[Enhancement] Paimon limit push down when only has partition predicate. (backport #57006) #57271

Merged
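
This backport pushes a query's LIMIT into Paimon scan planning when the scan predicate is absent or references only partition columns. In that case Paimon's ReadBuilder can stop planning splits once the limit is covered instead of enumerating every manifest entry. The diff below adds the pushdown and a new onlyHasPartitionPredicate check to PaimonMetadata.getRemoteFiles, threads the limit through PaimonScanNode.setupScanRangeLocations and PlanFragmentBuilder.visitPhysicalPaimonScan, and adds a test that plans splits against a local Paimon catalog.

A minimal sketch of the resulting planning path, assuming a Paimon Table plus precomputed predicates and projection are in scope; the helper name and parameters are illustrative, not part of the diff:

    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.ReadBuilder;
    import org.apache.paimon.table.source.Split;
    import java.util.List;

    static List<Split> planSplits(Table paimonTable, List<Predicate> predicates,
                                  int[] projected, long limit,
                                  boolean onlyPartitionPredicate) {
        ReadBuilder readBuilder = paimonTable.newReadBuilder()
                .withFilter(predicates)       // pushed-down Paimon predicates
                .withProjection(projected);   // projected field indexes
        if (limit != -1 && limit < Integer.MAX_VALUE && onlyPartitionPredicate) {
            // The new behavior: Paimon may stop planning once `limit` rows are covered.
            readBuilder = readBuilder.withLimit((int) limit);
        }
        return readBuilder.newScan().plan().splits();
    }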
@@ -294,7 +294,13 @@ public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams par
int[] projected =
params.getFieldNames().stream().mapToInt(name -> (paimonTable.getFieldNames().indexOf(name))).toArray();
List<Predicate> predicates = extractPredicates(paimonTable, params.getPredicate());
InnerTableScan scan = (InnerTableScan) readBuilder.withFilter(predicates).withProjection(projected).newScan();
boolean pruneManifestsByLimit = params.getLimit() != -1 && params.getLimit() < Integer.MAX_VALUE
&& onlyHasPartitionPredicate(table, params.getPredicate());
readBuilder = readBuilder.withFilter(predicates).withProjection(projected);
if (pruneManifestsByLimit) {
readBuilder = readBuilder.withLimit((int) params.getLimit());
}
InnerTableScan scan = (InnerTableScan) readBuilder.newScan();
PaimonMetricRegistry paimonMetricRegistry = new PaimonMetricRegistry();
List<Split> splits = scan.withMetricsRegistry(paimonMetricRegistry).plan().splits();
traceScanMetrics(paimonMetricRegistry, splits, table.getCatalogTableName(), predicates);
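
Note the guard: -1 is the sentinel for "no limit", and limits at or above Integer.MAX_VALUE are skipped because ReadBuilder.withLimit takes an int (hence the cast on params.getLimit()).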
@@ -651,4 +657,35 @@ private void refreshPartitionInfo(Identifier identifier) {
LOG.warn("Current catalog {} does not support cache.", catalogName);
}
}

public static boolean onlyHasPartitionPredicate(Table table, ScalarOperator predicate) {
if (predicate == null) {
return true;
}

List<ScalarOperator> scalarOperators = Utils.extractConjuncts(predicate);

List<String> predicateColumns = new ArrayList<>();
for (ScalarOperator operator : scalarOperators) {
String columnName = null;
if (operator.getChild(0) instanceof ColumnRefOperator) {
columnName = ((ColumnRefOperator) operator.getChild(0)).getName();
}

if (columnName == null || columnName.isEmpty()) {
return false;
}

predicateColumns.add(columnName);
}

List<String> partitionColNames = table.getPartitionColumnNames();
for (String columnName : predicateColumns) {
if (!partitionColNames.contains(columnName)) {
return false;
}
}

return true;
}
}
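
For orientation, a sketch of how onlyHasPartitionPredicate classifies conjuncts, built from the same operator constructors the new test uses; paimonTable, the column ids, and the values are illustrative:

    // Table partitioned by create_date; `user` is an ordinary data column.
    ColumnRefOperator dt = new ColumnRefOperator(1, Type.STRING, "create_date", false);
    ColumnRefOperator user = new ColumnRefOperator(2, Type.STRING, "user", false);
    ScalarOperator onDt = new BinaryPredicateOperator(BinaryType.EQ, dt,
            ConstantOperator.createVarchar("20240101"));
    ScalarOperator onUser = new BinaryPredicateOperator(BinaryType.EQ, user,
            ConstantOperator.createVarchar("user_1"));

    onlyHasPartitionPredicate(paimonTable, null);    // true: no predicate at all
    onlyHasPartitionPredicate(paimonTable, onDt);    // true: partition column only
    onlyHasPartitionPredicate(paimonTable, onUser);  // false: non-partition column
    onlyHasPartitionPredicate(paimonTable,
            Utils.compoundAnd(onDt, onUser));        // false: mixed conjuncts

A conjunct whose first child is not a bare column reference, such as coalesce(create_date, 'unknown') = '20240101', also returns false because no column name can be extracted; the last case in the test below exercises exactly that.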
@@ -126,11 +126,12 @@ public long getEstimatedLength(long rowCount, TupleDescriptor tupleDescriptor) {
return rowCount * rowSize;
}

public void setupScanRangeLocations(TupleDescriptor tupleDescriptor, ScalarOperator predicate) {
public void setupScanRangeLocations(TupleDescriptor tupleDescriptor, ScalarOperator predicate, long limit) {
List<String> fieldNames =
tupleDescriptor.getSlots().stream().map(s -> s.getColumn().getName()).collect(Collectors.toList());
GetRemoteFilesParams params =
GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).build();
GetRemoteFilesParams.newBuilder().setPredicate(predicate).setFieldNames(fieldNames).setLimit(limit)
.build();
List<RemoteFileInfo> fileInfos;
try (Timer ignored = Tracers.watchScope(EXTERNAL, paimonTable.getCatalogTableName() + ".getPaimonRemoteFileInfos")) {
fileInfos = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(paimonTable, params);
@@ -1309,7 +1309,7 @@ public PlanFragment visitPhysicalPaimonScan(OptExpression optExpression, ExecPla
paimonScanNode.getConjuncts()
.add(ScalarOperatorToExpr.buildExecExpression(predicate, formatterContext));
}
paimonScanNode.setupScanRangeLocations(tupleDescriptor, node.getPredicate());
paimonScanNode.setupScanRangeLocations(tupleDescriptor, node.getPredicate(), node.getLimit());
HDFSScanNodePredicates scanNodePredicates = paimonScanNode.getScanNodePredicates();
prepareCommonExpr(scanNodePredicates, node.getScanOperatorPredicates(), context);
prepareMinMaxExpr(scanNodePredicates, node.getScanOperatorPredicates(), context, referenceTable);
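
Together with the setupScanRangeLocations change above, this completes the wiring: the physical Paimon scan operator's limit (-1 when absent) now reaches the metadata call that decides whether the pushdown is safe.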
@@ -16,7 +16,11 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.starrocks.analysis.BinaryType;
import com.starrocks.analysis.FunctionName;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Function;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.PaimonTable;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
@@ -32,33 +36,50 @@
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudType;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.MetadataMgr;
import com.starrocks.sql.optimizer.Memo;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
import com.starrocks.sql.optimizer.operator.logical.LogicalPaimonScanOperator;
import com.starrocks.sql.optimizer.operator.scalar.BinaryPredicateOperator;
import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rule.transformation.ExternalScanPartitionPruneRule;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
@@ -72,6 +93,7 @@
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
@@ -84,7 +106,9 @@
import org.junit.Before;
import org.junit.Test;

import java.nio.file.Files;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -316,6 +340,179 @@ public long getTableCreateTime(String dbName, String tblName) {
Assert.assertEquals(2, desc.getPaimonSplitsInfo().getPaimonSplits().size());
}

@Test
public void testGetRemoteFileInfosWithLimit() throws Exception {

java.nio.file.Path tmpDir = Files.createTempDirectory("tmp_");

Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(tmpDir.toString())));

catalog.createDatabase("test_db", true);

// create schema
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.partitionKeys("create_date");
schemaBuilder.column("create_date", DataTypes.STRING());
schemaBuilder.column("user", DataTypes.STRING());
schemaBuilder.column("record_time", DataTypes.STRING());

Options options = new Options();
options.set(CoreOptions.BUCKET, 2);
options.set(CoreOptions.BUCKET_KEY, "user");
schemaBuilder.options(options.toMap());

Schema schema = schemaBuilder.build();

// create table
Identifier identifier = Identifier.create("test_db", "test_table");
catalog.createTable(identifier, schema, true);

// insert data
org.apache.paimon.table.Table table = catalog.getTable(identifier);
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
BatchTableWrite write = writeBuilder.newWrite();

DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");

DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

LocalDateTime now = LocalDateTime.now();
GenericRow record1 = GenericRow.of(BinaryString.fromString(dateFormatter.format(now)),
BinaryString.fromString("user_1"),
BinaryString.fromString(dateTimeFormatter.format(now)));
GenericRow record2 = GenericRow.of(BinaryString.fromString(dateFormatter.format(now)),
BinaryString.fromString("user_2"),
BinaryString.fromString(dateTimeFormatter.format(now)));

now = now.minusDays(1);
GenericRow record3 = GenericRow.of(BinaryString.fromString(dateFormatter.format(now)),
BinaryString.fromString("user_1"),
BinaryString.fromString(dateTimeFormatter.format(now)));
GenericRow record4 = GenericRow.of(BinaryString.fromString(dateFormatter.format(now)),
BinaryString.fromString("user_2"),
BinaryString.fromString(dateTimeFormatter.format(now)));

now = now.minusDays(1);
GenericRow record5 = GenericRow.of(BinaryString.fromString(dateFormatter.format(now)),
BinaryString.fromString("user_1"),
BinaryString.fromString(dateTimeFormatter.format(now)));
GenericRow record6 = GenericRow.of(BinaryString.fromString(dateFormatter.format(now)),
BinaryString.fromString("user_2"),
BinaryString.fromString(dateTimeFormatter.format(now)));

write.write(record1);
write.write(record2);
write.write(record3);
write.write(record4);
write.write(record5);
write.write(record6);

List<CommitMessage> messages = write.prepareCommit();

BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
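// Three date partitions x two buckets (bucket key "user") -> six splits when nothing is pruned.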

List<String> fieldNames = Lists.newArrayList("create_date", "user", "record_time");

HdfsEnvironment environment = new HdfsEnvironment();
ConnectorProperties properties = new ConnectorProperties(ConnectorType.PAIMON);

// no predicate, limit 1: the limit is pushed down and planning stops at a single split
PaimonMetadata metadata = new PaimonMetadata("paimon", environment, catalog, properties);
GetRemoteFilesParams params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames).setLimit(1).build();
List<RemoteFileInfo> result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(1, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

// no predicate, no limit: all three partitions x two buckets = six splits
metadata = new PaimonMetadata("paimon", environment, catalog, properties);
params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames).setLimit(-1).build();
result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(6, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

ColumnRefOperator createDateColumn = new ColumnRefOperator(1, Type.STRING, "create_date", false);
ScalarOperator createDateEqualPredicate = new BinaryPredicateOperator(BinaryType.EQ, createDateColumn,
ConstantOperator.createVarchar(dateFormatter.format(now)));

// partition-only predicate, limit 1: pushdown applies, a single split
metadata = new PaimonMetadata("paimon", environment, catalog, properties);
params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames).setPredicate(createDateEqualPredicate)
.setLimit(1).build();
result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(1, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

// partition predicate, no limit: partition pruning alone leaves the matching partition's two buckets
metadata = new PaimonMetadata("paimon", environment, catalog, properties);
params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames).setPredicate(createDateEqualPredicate)
.setLimit(-1).build();
result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(2, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

ColumnRefOperator userColumn = new ColumnRefOperator(2, Type.STRING, "user", false);
ScalarOperator userEqualPredicate = new BinaryPredicateOperator(BinaryType.EQ, userColumn,
ConstantOperator.createVarchar("user_1"));

// non-partition predicate, limit 1: no pushdown; bucket pruning on the bucket key keeps one bucket per partition (three splits)
metadata = new PaimonMetadata("paimon", environment, catalog, properties);
params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames).setPredicate(userEqualPredicate)
.setLimit(1).build();
result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(3, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

ScalarOperator createDateGreaterPredicate = new BinaryPredicateOperator(BinaryType.GT, createDateColumn,
ConstantOperator.createVarchar(dateFormatter.format(now)));

// mixed partition and non-partition predicates, limit 1: no pushdown; two partitions match, one bucket each
metadata = new PaimonMetadata("paimon", environment, catalog, properties);
params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames)
.setPredicate(Utils.compoundAnd(createDateGreaterPredicate, userEqualPredicate))
.setLimit(1).build();
result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(2, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

Function coalesce = GlobalStateMgr.getCurrentState().getFunction(
new Function(new FunctionName(FunctionSet.COALESCE), Lists.newArrayList(Type.VARCHAR, Type.VARCHAR),
Type.VARCHAR, false),
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);

CallOperator createDateCoalesce = new CallOperator("coalesce", Type.VARCHAR,
List.of(createDateColumn, ConstantOperator.createVarchar("unknown")), coalesce);

ScalarOperator createDateCoalescePredicate = new BinaryPredicateOperator(BinaryType.EQ, createDateCoalesce,
ConstantOperator.createVarchar(dateFormatter.format(now)));

// partition column wrapped in a function, limit 1: not recognized as partition-only, so no pushdown and no pruning; all six splits
metadata = new PaimonMetadata("paimon", environment, catalog, properties);
params = GetRemoteFilesParams.newBuilder().setFieldNames(fieldNames).setPredicate(createDateCoalescePredicate)
.setLimit(1).build();
result = metadata.getRemoteFiles(metadata.getTable("test_db", "test_table"), params);
Assert.assertEquals(1, result.size());
Assert.assertEquals(1, result.get(0).getFiles().size());
Assert.assertEquals(6, ((PaimonRemoteFileDesc) result.get(0).getFiles().get(0))
.getPaimonSplitsInfo().getPaimonSplits().size());

catalog.dropTable(identifier, true);
catalog.dropDatabase("test_db", true, true);
Files.delete(tmpDir);
}

@Test
public void testGetCloudConfiguration() {
CloudConfiguration cc = metadata.getCloudConfiguration();