diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6638e00829e8..ffab506ea165 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3121,7 +3121,7 @@ public static enum ConfVars { TXN_MERGE_INSERT_X_LOCK("hive.txn.xlock.mergeinsert", false, "Ensures MERGE INSERT operations acquire EXCLUSIVE / EXCL_WRITE lock for transactional tables.\n" + "If enabled, prevents duplicates when MERGE statements are executed in parallel transactions."), - TXN_WRITE_X_LOCK("hive.txn.xlock.write", true, + TXN_WRITE_X_LOCK("hive.txn.xlock.write", false, "Manages concurrency levels for ACID resources. Provides better level of query parallelism by enabling " + "shared writes and write-write conflict resolution at the commit step." + "- If true - exclusive writes are used:\n" + @@ -3130,7 +3130,7 @@ public static enum ConfVars { " - INSERT acquires SHARED_READ locks\n" + "- If false - shared writes, transaction is aborted in case of conflicting changes:\n" + " - INSERT OVERWRITE acquires EXCL_WRITE locks\n" + - " - INSERT/UPDATE/DELETE acquire SHARED_READ locks"), + " - INSERT/UPDATE/DELETE acquire SHARED_WRITE locks"), HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true, "Whether Hive supports transactional stats (accurate stats for transactional tables)"), HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration", diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java index 20517f3e9052..442385dc8b49 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java +++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveLock.java @@ -19,7 +19,7 @@ package org.apache.iceberg.hive; -interface HiveLock { +public interface HiveLock { void lock() throws 
LockException; void ensureActive() throws LockException; diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 8a4d86637706..b7142a5f4914 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.JsonUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -476,6 +477,10 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c ConfigProperties.ENGINE_HIVE_ENABLED, true); } + private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) { + return hiveLockEnabled(metadata != null ? metadata.properties() : null, conf); + } + /** * Returns if the hive locking should be enabled on the table, or not. 
* @@ -489,14 +494,14 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c * TableProperties#HIVE_LOCK_ENABLED_DEFAULT} * * - * @param metadata Table metadata to use + * @param properties Table properties to use * @param conf The hive configuration to use * @return if the hive engine related values should be enabled or not */ - private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) { - if (metadata != null && metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) { + public static boolean hiveLockEnabled(Map<String, String> properties, Configuration conf) { + if (properties != null && properties.containsKey(TableProperties.HIVE_LOCK_ENABLED)) { // We know that the property is set, so default value will not be used, - return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, false); + return PropertyUtil.propertyAsBoolean(properties, TableProperties.HIVE_LOCK_ENABLED, false); } return conf.getBoolean( diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 534afe11afea..6bb5f27e76ef 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc; import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields; import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; @@ -70,6 +71,7 @@ import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import 
org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.hadoop.hive.ql.util.NullOrdering; @@ -116,9 +118,11 @@ import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.expressions.UnboundTerm; import org.apache.iceberg.hive.CachedClientPool; +import org.apache.iceberg.hive.HiveLock; import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.hive.MetastoreLock; +import org.apache.iceberg.hive.NoLock; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.mapping.MappingUtil; @@ -193,7 +197,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { private Transaction transaction; private AlterTableType currentAlterTableOp; private boolean createHMSTableInHook = false; - private MetastoreLock commitLock; + private HiveLock commitLock; private enum FileFormat { ORC("orc"), PARQUET("parquet"), AVRO("avro"); @@ -409,8 +413,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E context.getProperties().get(OLD_TABLE_NAME)).toString()); } if (commitLock == null) { - commitLock = new MetastoreLock(conf, new CachedClientPool(conf, Maps.fromProperties(catalogProperties)), - catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(), hmsTable.getTableName()); + commitLock = lockObject(hmsTable); } try { @@ -422,6 +425,22 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E } } + private HiveLock lockObject(org.apache.hadoop.hive.metastore.api.Table hmsTable) { + if (!HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf) || + SessionStateUtil.getQueryState(conf) + .map(QueryState::getHiveOperation) + .filter(opType -> HiveOperation.QUERY == opType) + .isPresent()) 
{ + return new NoLock(); + } else { + return new MetastoreLock( + conf, + new CachedClientPool(conf, Maps.fromProperties(catalogProperties)), + catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(), + hmsTable.getTableName()); + } + } + private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) throws MetaException { try { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 54ce20e3a580..013bf168cf3e 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -178,6 +178,7 @@ import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.hive.HiveSchemaUtil; +import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; @@ -836,13 +837,24 @@ private void checkAndMergeColStats(List statsNew, Table tbl) t */ @Override public LockType getLockType(WriteEntity writeEntity) { + org.apache.hadoop.hive.ql.metadata.Table hmsTable = writeEntity.getTable(); + boolean sharedWrite = !HiveConf.getBoolVar(conf, ConfVars.TXN_WRITE_X_LOCK); // Materialized views stored by Iceberg and the MV metadata is stored in HMS doesn't need write locking because // the locking is done by DbTxnManager.acquireMaterializationRebuildLock() if (TableType.MATERIALIZED_VIEW == writeEntity.getTable().getTableType()) { return LockType.SHARED_READ; } - if (WriteEntity.WriteType.INSERT_OVERWRITE == writeEntity.getWriteType()) { - return LockType.EXCL_WRITE; + if (HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf)) { + throw new 
RuntimeException("Hive locking on table `" + hmsTable.getFullTableName() + "` cannot be enabled when `engine.hive.lock-enabled`=`true`. " + "Disable `engine.hive.lock-enabled` to use Hive locking"); } switch (writeEntity.getWriteType()) { case INSERT_OVERWRITE: return LockType.EXCL_WRITE; case UPDATE: case DELETE: return sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE; } return LockType.SHARED_WRITE; } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java index aaf7138056d1..832676df55b7 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -130,6 +131,40 @@ public void testStatsWithInsert() { checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 5); } + @Test + public void testStatsWithPessimisticLockInsert() { + Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG); + TableIdentifier identifier = getTableIdentifierWithPessimisticLock("false"); + String insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false); + shell.executeStatement(insert); + + checkColStat(identifier.name(), "customer_id", true); + checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 2); + } + + @Test + public void testStatsWithPessimisticLockInsertWhenHiveLockEnabled() { + Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG); + TableIdentifier 
identifier = getTableIdentifierWithPessimisticLock("true"); + String insert = testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false); + AssertHelpers.assertThrows( + "Should throw RuntimeException when Hive locking is on with 'engine.hive.lock-enabled=true'", + RuntimeException.class, + () -> shell.executeStatement(insert) + ); + } + + private TableIdentifier getTableIdentifierWithPessimisticLock(String hiveLockEnabled) { + TableIdentifier identifier = TableIdentifier.of("default", "customers"); + + shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER.varname, true); + shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED.varname, true); + testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(), formatVersion, + ImmutableMap.of(TableProperties.HIVE_LOCK_ENABLED, hiveLockEnabled)); + return identifier; + } + @Test public void testStatsWithInsertOverwrite() { TableIdentifier identifier = TableIdentifier.of("default", "customers"); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 266e0ef299e1..2eaed2f1107f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -404,6 +404,7 @@ private IMetaStoreClient prepareParallelTest(String tableName, int val) throws Exception, MetaException, TException, NoSuchObjectException { hiveConf.setBoolean("hive.stats.autogather", true); hiveConf.setBoolean("hive.stats.column.autogather", true); + hiveConf.setBoolean("hive.txn.xlock.write", true); // Need to close the thread local Hive object so that configuration change is reflected to HMS. Hive.closeCurrent(); runStatementOnDriver("drop table if exists " + tableName);