HIVE-28590: Iceberg: Add support for FILE_SIZE_THRESHOLD to compaction command
Dmitriy Fingerman committed Nov 13, 2024
1 parent 7b3a4be commit db5c2ad
Showing 20 changed files with 430 additions and 42 deletions.
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -493,6 +493,7 @@ public enum ErrorMsg {
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
COMPACTION_THREAD_INITIALIZATION(10442, "Compaction thread failed during initialization", false),
ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10443, "Filter expression can contain only partition columns."),
UNSUPPORTED_COMPACTION_REQUEST_WITH_FILE_SIZE_THRESHOLD(10444, "File size threshold is supported only with major and minor compaction for Iceberg tables"),

//========================== 20000 range starts here ========================//

@@ -50,6 +50,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
@@ -505,7 +506,13 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
.map(x -> x.getJobConf().get(IcebergCompactionService.PARTITION_PATH))
.orElse(null);

commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath);
Long fileSizeThreshold = jobContexts.stream()
.findAny()
.map(x -> x.getJobConf().get(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD))
.map(Long::parseLong)
.orElse(null);

commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath, fileSizeThreshold);
} else {
commitOverwrite(table, branchName, snapshotId, startTime, filesForCommit);
}
@@ -597,9 +604,10 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
* @param partitionPath The path of the compacted partition
*/
private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
String partitionPath) {
List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionPath);
List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionPath);
String partitionPath, Long fileSizeThreshold) {
List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionPath, fileSizeThreshold);
List<DeleteFile> existingDeleteFiles = fileSizeThreshold == null ?
IcebergTableUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();

RewriteFiles rewriteFiles = table.newRewrite();
existingDataFiles.forEach(rewriteFiles::deleteFile);
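The file size threshold reaches the commit phase as a plain string in the job configuration and is parsed back only if present. A minimal, self-contained sketch of that round trip is shown below; the key name and class are illustrative stand-ins rather than the CompactorContext constant the commit actually uses.

import java.util.Optional;

import org.apache.hadoop.mapred.JobConf;

public class ThresholdRoundTripSketch {

  // Illustrative key; the commit reads CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD instead.
  private static final String THRESHOLD_KEY = "example.compaction.file.size.threshold";

  // Mirrors the Optional chain in commitTable: absent key -> null, present key -> parsed Long.
  static Long readThreshold(JobConf conf) {
    return Optional.ofNullable(conf.get(THRESHOLD_KEY))
        .map(Long::parseLong)
        .orElse(null);
  }

  public static void main(String[] args) {
    JobConf conf = new JobConf(false);
    System.out.println(readThreshold(conf));   // null: no threshold was requested
    conf.setLong(THRESHOLD_KEY, 365L);
    System.out.println(readThreshold(conf));   // 365
  }
}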
@@ -2255,24 +2255,13 @@ boolean canCompact(HiveConf hiveConf, Table icebergTable, String partitionPath,
return true;
}

int dataFilesCount = IcebergTableUtil.getDataFiles(icebergTable, partitionPath).size();
int dataFilesCount = IcebergTableUtil.getDataFiles(icebergTable, partitionPath, null).size();
if (dataFilesCount < 2) {
return false;
}

long fileSizeInBytesThreshold;
switch (compactionType) {
case MAJOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
case MINOR:
fileSizeInBytesThreshold = HiveConf.getSizeVar(hiveConf,
ConfVars.HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
default:
throw new HiveException(String.format("Invalid compaction type: %s", compactionType.name()));
}
long fileSizeInBytesThreshold = HiveConf.toSizeBytes(IcebergTableUtil.getFileSizeTreshold(hiveConf,
compactionType));

float fileSizeRatioThreshold = HiveConf.getFloatVar(hiveConf, ConfVars.ICEBERG_COMPACTION_FILE_SIZE_RATIO);
float fileSizeRatio = IcebergTableUtil.getFileSizeRatio(icebergTable, partitionPath, fileSizeInBytesThreshold);
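canCompact now resolves the byte threshold through the shared IcebergTableUtil helper and compares a file size ratio against ICEBERG_COMPACTION_FILE_SIZE_RATIO. The exact formula behind getFileSizeRatio is not visible in this hunk; the sketch below assumes a simple count-based ratio (the share of data files smaller than the threshold) purely to illustrate the shape of the check.

import java.util.List;

public class FileSizeRatioSketch {

  // Assumed, simplified ratio: fraction of data files below the threshold.
  // The real IcebergTableUtil.getFileSizeRatio may compute this differently.
  static float fileSizeRatio(List<Long> fileSizesInBytes, long thresholdBytes) {
    if (fileSizesInBytes.isEmpty()) {
      return 0f;
    }
    long smallFiles = fileSizesInBytes.stream().filter(size -> size < thresholdBytes).count();
    return (float) smallFiles / fileSizesInBytes.size();
  }

  public static void main(String[] args) {
    List<Long> sizes = List.of(120L, 250L, 4_000_000L, 90L);
    float ratio = fileSizeRatio(sizes, 365L);   // 3 of 4 files are under 365 bytes -> 0.75
    System.out.println(ratio);
    // A high ratio presumably indicates enough small files to make compaction worthwhile.
  }
}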
@@ -36,11 +36,13 @@
import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -417,19 +419,23 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
* 1. If the table is unpartitioned, returns all data files.
* 2. If partitionPath is not provided, returns all data files that belong to the non-latest partition spec.
* 3. If partitionPath is provided, returns all data files that belong to the corresponding partition.
* 4. If fileSizeThreshold is not null, the file size threshold is also applied in all of the cases above.
* @param table the iceberg table
* @param partitionPath partition path
* @param fileSizeThreshold file size threshold
* @return list of data files
*/
public static List<DataFile> getDataFiles(Table table, String partitionPath) {
public static List<DataFile> getDataFiles(Table table, String partitionPath, Long fileSizeThreshold) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return !table.spec().isPartitioned() ||
return (!table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath)) &&
(fileSizeThreshold == null || file.fileSizeInBytes() < fileSizeThreshold);
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
}
@@ -441,6 +447,7 @@ public static List<DataFile> getDataFiles(Table table, String partitionPath) {
* 3. If partitionPath is provided, returns all delete files that belong to corresponding partition.
* @param table the iceberg table
* @param partitionPath partition path
* @return list of delete files
*/
public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath) {
Table deletesTable =
@@ -654,4 +661,21 @@ public static Snapshot getTableSnapshot(org.apache.hadoop.hive.ql.metadata.Table
}
return snapshot;
}

public static String getFileSizeTreshold(HiveConf hiveConf, CompactionType compactionType) throws HiveException {
String fileSizeInBytesThreshold;
switch (compactionType) {
case MAJOR:
fileSizeInBytesThreshold = HiveConf.getVar(hiveConf,
HiveConf.ConfVars.HIVE_ICEBERG_MAJOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
case MINOR:
fileSizeInBytesThreshold = HiveConf.getVar(hiveConf,
HiveConf.ConfVars.HIVE_ICEBERG_MINOR_COMPACTION_FILE_SIZE_THRESHOLD);
break;
default:
throw new HiveException(String.format("Invalid compaction type: %s", compactionType.name()));
}
return fileSizeInBytesThreshold;
}
}
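The extended getDataFiles filter keeps a data file only if it passes the partition selection and, when a threshold is supplied, is strictly smaller than it. A standalone sketch of that composed predicate follows; it uses simplified stand-in types instead of Iceberg's FileScanTask/DataFile and collapses the partition-evolution branch for brevity.

import java.util.List;
import java.util.function.Predicate;

public class DataFileFilterSketch {

  // Simplified stand-in for the metadata carried by an Iceberg DataFile.
  record FileInfo(String partitionPath, long sizeInBytes) {}

  // Keep a file when it matches the requested partition (or no partition was requested)
  // AND it is below the threshold (or no threshold was requested).
  static Predicate<FileInfo> compactionCandidate(String partitionPath, Long fileSizeThreshold) {
    Predicate<FileInfo> partitionMatch =
        file -> partitionPath == null || partitionPath.equals(file.partitionPath());
    Predicate<FileInfo> underThreshold =
        file -> fileSizeThreshold == null || file.sizeInBytes() < fileSizeThreshold;
    return partitionMatch.and(underThreshold);
  }

  public static void main(String[] args) {
    List<FileInfo> files = List.of(
        new FileInfo("dept=hr", 120),
        new FileInfo("dept=hr", 9_000_000),
        new FileInfo("dept=eng", 80));
    files.stream()
        .filter(compactionCandidate("dept=hr", 365L))
        .forEach(System.out::println);   // only the 120-byte dept=hr file survives
  }
}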
@@ -38,8 +38,8 @@ public IcebergCompactionService() {

public Boolean compact(Table table, CompactionInfo ci) throws Exception {

if (!ci.isMajorCompaction()) {
ci.errorMessage = "Presently Iceberg tables support only Major compaction";
if (!ci.isMajorCompaction() && !ci.isMinorCompaction()) {
ci.errorMessage = String.format("Iceberg tables do not support %s compaction type", ci.type.name());
LOG.error(ci.errorMessage + " Compaction info: {}", ci);
try {
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.DriverUtils;
@@ -43,9 +44,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergMajorQueryCompactor extends QueryCompactor {
public class IcebergQueryCompactor extends QueryCompactor {

private static final Logger LOG = LoggerFactory.getLogger(IcebergMajorQueryCompactor.class.getName());
private static final Logger LOG = LoggerFactory.getLogger(IcebergQueryCompactor.class.getName());

@Override
public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException {
@@ -62,20 +63,35 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
String compactionQuery;
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
String fileSizeCond = null;

if (ci.type == CompactionType.MINOR) {
String userProvidedFileSizeThreshold = context.getCompactionInfo()
.getProperty(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD);
Long fileSizeInBytesThreshold = HiveConf.toSizeBytes(userProvidedFileSizeThreshold == null ?
IcebergTableUtil.getFileSizeTreshold(conf, ci.type) : userProvidedFileSizeThreshold);
fileSizeCond = String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
// An IOW query containing a join with the Iceberg .files metadata table fails with an exception because the
// Iceberg AVRO format doesn't support vectorization, hence it is disabled in this case.
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}

if (partSpec == null) {
if (!icebergTable.spec().isPartitioned()) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s", compactTableName, orderBy);
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
fileSizeCond == null ? "" : "where " + fileSizeCond, orderBy);
} else if (icebergTable.specs().size() > 1) {
// Compacting partitions of old partition specs on a partitioned table with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
// A single filter on a virtual column causes errors during compilation,
// added another filter on file_path as a workaround.
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
"where %2$s != %3$d and %4$s is not null %5$s",
"where %2$s != %3$d and %4$s is not null %5$s %6$s",
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
VirtualColumn.FILE_PATH.getName(), orderBy);
VirtualColumn.FILE_PATH.getName(), fileSizeCond == null ? "" : "and " + fileSizeCond, orderBy);
} else {
// Partitioned table without partition evolution with partition spec as null in the compaction request - this
// code branch is not supposed to be reachable
@@ -90,8 +106,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);

compactionQuery = String.format("insert overwrite table %1$s select * from %1$s where %2$s=%3$d " +
"and %4$s is not null %5$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
VirtualColumn.FILE_PATH.getName(), orderBy);
"and %4$s is not null %5$s %6$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
VirtualColumn.FILE_PATH.getName(), fileSizeCond == null ? "" : "and " + fileSizeCond, orderBy);
}

SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
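For a minor compaction of an unpartitioned table, the rewrite is a single insert-overwrite whose WHERE clause restricts the input to files below the threshold via the Iceberg .files metadata table. The sketch below assembles that string in the same way as the format calls above; the table name, virtual column name, and threshold are illustrative values, not taken from the commit.

public class CompactionQuerySketch {

  public static void main(String[] args) {
    String compactTableName = "default.ice_orc";   // illustrative table name
    String filePathColumn = "FILE__PATH";          // stand-in for VirtualColumn.FILE_PATH.getName()
    long fileSizeInBytesThreshold = 365L;
    String orderBy = "";                           // no ORDER BY clause requested

    String fileSizeCond = String.format(
        "%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
        filePathColumn, compactTableName, fileSizeInBytesThreshold);

    String compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s",
        compactTableName, "where " + fileSizeCond, orderBy);

    System.out.println(compactionQuery);
    // insert overwrite table default.ice_orc select * from default.ice_orc
    //   where FILE__PATH in (select file_path from default.ice_orc.files where file_size_in_bytes < 365)
  }
}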
@@ -0,0 +1,48 @@
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
-- Mask random uuid
--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
-- Mask a random snapshot id
--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
-- Mask added file size
--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask total file size
--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask current-snapshot-timestamp-ms
--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
-- Mask the enqueue time which is based on current time
--! qt:replace:/(MINOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^[0-9]/#Masked#/
-- Mask removed file size
--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc (
first_name string,
last_name string
)
stored by iceberg stored as orc
tblproperties ('format-version'='2');

insert into ice_orc VALUES ('fn1','ln1');
insert into ice_orc VALUES ('fn2','ln2'), ('fn3','ln3');
insert into ice_orc VALUES ('fn4','ln4'), ('fn5','ln5'), ('fn6','ln6'), ('fn7','ln7');

select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'minor' and wait file_size_threshold = '365bytes' pool 'iceberg';
explain optimize table ice_orc rewrite data file_size_threshold = '365bytes' pool 'iceberg';

alter table ice_orc COMPACT 'minor' and wait file_size_threshold = '365bytes' pool 'iceberg';

select * from ice_orc;
describe formatted ice_orc;
show compactions;