Skip to content

Commit d3d0132

Browse files
committed
Iceberg: Table-level column stats filter support
1 parent f06aa95 commit d3d0132

File tree

4 files changed

+94
-63
lines changed

4 files changed

+94
-63
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

+48-52
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.Executors;
4343
import java.util.concurrent.atomic.AtomicInteger;
4444
import java.util.function.Function;
45+
import java.util.function.Predicate;
4546
import java.util.stream.Collectors;
4647
import java.util.stream.Stream;
4748
import org.apache.commons.collections.MapUtils;
@@ -637,20 +638,26 @@ private boolean writeColStats(List<ColumnStatistics> colStats, Table tbl) {
637638
long snapshotId = tbl.currentSnapshot().snapshotId();
638639
long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
639640

640-
colStats.forEach(statsObj -> {
641-
byte[] serializeColStats = SerializationUtils.serialize(statsObj);
642-
puffinWriter.add(
643-
new Blob(
644-
ColumnStatisticsObj.class.getSimpleName(),
645-
ImmutableList.of(1),
646-
snapshotId,
647-
snapshotSequenceNumber,
648-
ByteBuffer.wrap(serializeColStats),
649-
PuffinCompressionCodec.NONE,
650-
ImmutableMap.of("partition",
651-
String.valueOf(statsObj.getStatsDesc().getPartName()))
652-
));
653-
});
641+
colStats.forEach(stats -> {
642+
boolean isTblLevel = stats.getStatsDesc().isIsTblLevel();
643+
644+
for (Serializable statsObj : (isTblLevel) ? stats.getStatsObj() : Collections.singletonList(stats)) {
645+
byte[] serializeColStats = SerializationUtils.serialize(statsObj);
646+
puffinWriter.add(
647+
new Blob(
648+
ColumnStatisticsObj.class.getSimpleName(),
649+
ImmutableList.of((isTblLevel) ? tbl.spec().schema().findField(
650+
((ColumnStatisticsObj) statsObj).getColName()).fieldId() : 1),
651+
snapshotId,
652+
snapshotSequenceNumber,
653+
ByteBuffer.wrap(serializeColStats),
654+
PuffinCompressionCodec.NONE,
655+
(isTblLevel) ?
656+
ImmutableMap.of("specId", String.valueOf(tbl.spec().specId())) :
657+
ImmutableMap.of("partition", String.valueOf(stats.getStatsDesc().getPartName()))
658+
));
659+
}});
660+
654661
puffinWriter.finish();
655662

656663
statisticsFile =
@@ -693,17 +700,27 @@ private boolean canProvideColStats(Table table, long snapshotId) {
693700
}
694701

695702
@Override
696-
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
703+
public List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable, List<String> colNames) {
697704
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
698705
Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
699706

700-
ColumnStatistics emptyStats = new ColumnStatistics();
701707
if (snapshot != null) {
702-
return IcebergTableUtil.getColStatsPath(table, snapshot.snapshotId())
703-
.map(statsPath -> readColStats(table, statsPath, null).get(0))
704-
.orElse(emptyStats).getStatsObj();
708+
709+
Predicate<BlobMetadata> filter;
710+
if (colNames != null) {
711+
Set<String> columns = Sets.newHashSet(colNames);
712+
filter = metadata -> {
713+
int specId = Integer.parseInt(metadata.properties().get("specId"));
714+
String column = table.specs().get(specId).schema().findColumnName(metadata.inputFields().get(0));
715+
return columns.contains(column);
716+
};
717+
} else {
718+
filter = null;
719+
}
720+
721+
return IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);
705722
}
706-
return emptyStats.getStatsObj();
723+
return Lists.newArrayList();
707724
}
708725

709726
@Override
@@ -720,9 +737,10 @@ public AggrStats getAggrColStatsFor(org.apache.hadoop.hive.ql.metadata.Table hms
720737
MetastoreConf.ConfVars.STATS_NDV_DENSITY_FUNCTION);
721738
double ndvTuner = MetastoreConf.getDoubleVar(getConf(), MetastoreConf.ConfVars.STATS_NDV_TUNER);
722739

723-
List<ColumnStatistics> partStats = IcebergTableUtil.getColStatsPath(table, snapshot.snapshotId())
724-
.map(statsPath -> readColStats(table, statsPath, Sets.newHashSet(partNames)))
725-
.orElse(Collections.emptyList());
740+
Set<String> partitions = Sets.newHashSet(partNames);
741+
Predicate<BlobMetadata> filter = metadata -> partitions.contains(metadata.properties().get("partition"));
742+
743+
List<ColumnStatistics> partStats = IcebergTableUtil.readColStats(table, snapshot.snapshotId(), filter);
726744

727745
partStats.forEach(colStats ->
728746
colStats.getStatsObj().removeIf(statsObj -> !colNames.contains(statsObj.getColName())));
@@ -736,30 +754,6 @@ public AggrStats getAggrColStatsFor(org.apache.hadoop.hive.ql.metadata.Table hms
736754
return new AggrStats(colStatsList, partStats.size());
737755
}
738756

739-
private List<ColumnStatistics> readColStats(Table table, Path statsPath, Set<String> partNames) {
740-
List<ColumnStatistics> colStats = Lists.newArrayList();
741-
742-
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
743-
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
744-
745-
if (partNames != null) {
746-
blobMetadata = blobMetadata.stream()
747-
.filter(metadata -> partNames.contains(metadata.properties().get("partition")))
748-
.collect(Collectors.toList());
749-
}
750-
Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
751-
LOG.info("Using col stats from : {}", statsPath);
752-
753-
while (it.hasNext()) {
754-
byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
755-
colStats.add(SerializationUtils.deserialize(byteBuffer));
756-
}
757-
} catch (Exception e) {
758-
LOG.warn(" Unable to read col stats: ", e);
759-
}
760-
return colStats;
761-
}
762-
763757
@Override
764758
public boolean canComputeQueryUsingStats(Partish partish) {
765759
org.apache.hadoop.hive.ql.metadata.Table hmsTable = partish.getTable();
@@ -799,22 +793,24 @@ private boolean shouldRewriteColStats(Table tbl) {
799793
private void checkAndMergeColStats(List<ColumnStatistics> statsNew, Table tbl) throws InvalidObjectException {
800794
Long previousSnapshotId = tbl.currentSnapshot().parentId();
801795
if (previousSnapshotId != null && canProvideColStats(tbl, previousSnapshotId)) {
802-
List<ColumnStatistics> statsOld = IcebergTableUtil.getColStatsPath(tbl, previousSnapshotId)
803-
.map(statsPath -> readColStats(tbl, statsPath, null))
804-
.orElse(Collections.emptyList());
805796

806797
boolean isTblLevel = statsNew.get(0).getStatsDesc().isIsTblLevel();
807798
Map<String, ColumnStatistics> oldStatsMap = Maps.newHashMap();
808799

800+
List<?> statsOld = IcebergTableUtil.readColStats(tbl, previousSnapshotId, null);
801+
809802
if (!isTblLevel) {
810-
for (ColumnStatistics statsObjOld : statsOld) {
803+
for (ColumnStatistics statsObjOld : (List<ColumnStatistics>) statsOld) {
811804
oldStatsMap.put(statsObjOld.getStatsDesc().getPartName(), statsObjOld);
812805
}
806+
} else {
807+
statsOld = Collections.singletonList(
808+
new ColumnStatistics(null, (List<ColumnStatisticsObj>) statsOld));
813809
}
814810
for (ColumnStatistics statsObjNew : statsNew) {
815811
String partitionKey = statsObjNew.getStatsDesc().getPartName();
816812
ColumnStatistics statsObjOld = isTblLevel ?
817-
statsOld.get(0) : oldStatsMap.get(partitionKey);
813+
(ColumnStatistics) statsOld.get(0) : oldStatsMap.get(partitionKey);
818814

819815
if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && !statsObjNew.getStatsObj().isEmpty()) {
820816
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java

+38-9
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,15 @@
2020
package org.apache.iceberg.mr.hive;
2121

2222
import java.io.IOException;
23+
import java.nio.ByteBuffer;
2324
import java.time.ZoneId;
24-
import java.util.Collections;
25-
import java.util.Comparator;
26-
import java.util.List;
27-
import java.util.Map;
28-
import java.util.Optional;
29-
import java.util.Properties;
25+
import java.util.*;
3026
import java.util.function.BinaryOperator;
3127
import java.util.function.Function;
28+
import java.util.function.Predicate;
3229
import java.util.stream.Collectors;
30+
31+
import org.apache.commons.lang3.SerializationUtils;
3332
import org.apache.commons.lang3.StringUtils;
3433
import org.apache.hadoop.conf.Configuration;
3534
import org.apache.hadoop.fs.Path;
@@ -77,14 +76,17 @@
7776
import org.apache.iceberg.io.CloseableIterable;
7877
import org.apache.iceberg.mr.Catalogs;
7978
import org.apache.iceberg.mr.InputFormatConfig;
79+
import org.apache.iceberg.puffin.BlobMetadata;
80+
import org.apache.iceberg.puffin.Puffin;
81+
import org.apache.iceberg.puffin.PuffinReader;
8082
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
83+
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
84+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
8185
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
8286
import org.apache.iceberg.types.Conversions;
8387
import org.apache.iceberg.types.Type;
8488
import org.apache.iceberg.types.Types;
85-
import org.apache.iceberg.util.PropertyUtil;
86-
import org.apache.iceberg.util.SnapshotUtil;
87-
import org.apache.iceberg.util.StructProjection;
89+
import org.apache.iceberg.util.*;
8890
import org.slf4j.Logger;
8991
import org.slf4j.LoggerFactory;
9092

@@ -547,4 +549,31 @@ public static TransformSpec getTransformSpec(Table table, String transformName,
547549
return spec;
548550
}
549551

552+
public static <T> List<T> readColStats(Table table, Long snapshotId, Predicate<BlobMetadata> filter) {
553+
List<T> colStats = Lists.newArrayList();
554+
555+
Optional<Path> statsPath = IcebergTableUtil.getColStatsPath(table, snapshotId);
556+
if (!statsPath.isPresent()) {
557+
return colStats;
558+
}
559+
try (PuffinReader reader = Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
560+
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
561+
562+
if (filter != null) {
563+
blobMetadata = blobMetadata.stream().filter(filter)
564+
.collect(Collectors.toList());
565+
}
566+
Iterator<ByteBuffer> it = Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
567+
LOG.info("Using col stats from : {}", statsPath);
568+
569+
while (it.hasNext()) {
570+
byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
571+
colStats.add(SerializationUtils.deserialize(byteBuffer));
572+
}
573+
} catch (Exception e) {
574+
LOG.warn(" Unable to read col stats: ", e);
575+
}
576+
return colStats;
577+
}
578+
550579
}

ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -6121,7 +6121,7 @@ public List<ColumnStatisticsObj> getTableColumnStatistics(
61216121
List<ColumnStatisticsObj> retv = null;
61226122
try {
61236123
if (tbl.isNonNative() && tbl.getStorageHandler().canProvideColStatistics(tbl)) {
6124-
return tbl.getStorageHandler().getColStatistics(tbl);
6124+
return tbl.getStorageHandler().getColStatistics(tbl, colNames);
61256125
}
61266126
if (checkTransactional) {
61276127
AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl);

ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,18 @@ default boolean canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata
286286
/**
287287
* Returns column statistics (upper/lower bounds, number of Null/NaN values, NDVs, histogram).
288288
* @param table table object
289+
* @param colNames list of column names
289290
* @return list of ColumnStatisticsObj objects
290291
*/
291-
default List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table table) {
292+
default List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table table, List<String> colNames) {
292293
return null;
293294
}
294295

296+
@Deprecated
297+
default List<ColumnStatisticsObj> getColStatistics(org.apache.hadoop.hive.ql.metadata.Table table) {
298+
return getColStatistics(table, null);
299+
}
300+
295301
/**
296302
* Returns an aggregated column statistics for the supplied partition list
297303
* @param table table object

0 commit comments

Comments
 (0)