Skip to content
4 changes: 2 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -3121,7 +3121,7 @@ public static enum ConfVars {
TXN_MERGE_INSERT_X_LOCK("hive.txn.xlock.mergeinsert", false,
"Ensures MERGE INSERT operations acquire EXCLUSIVE / EXCL_WRITE lock for transactional tables.\n" +
"If enabled, prevents duplicates when MERGE statements are executed in parallel transactions."),
TXN_WRITE_X_LOCK("hive.txn.xlock.write", true,
TXN_WRITE_X_LOCK("hive.txn.xlock.write", false,
"Manages concurrency levels for ACID resources. Provides better level of query parallelism by enabling " +
"shared writes and write-write conflict resolution at the commit step." +
"- If true - exclusive writes are used:\n" +
Expand All @@ -3130,7 +3130,7 @@ public static enum ConfVars {
" - INSERT acquires SHARED_READ locks\n" +
"- If false - shared writes, transaction is aborted in case of conflicting changes:\n" +
" - INSERT OVERWRITE acquires EXCL_WRITE locks\n" +
" - INSERT/UPDATE/DELETE acquire SHARED_READ locks"),
" - INSERT/UPDATE/DELETE acquire SHARED_WRITE locks"),
HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
"Whether Hive supports transactional stats (accurate stats for transactional tables)"),
HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iceberg.hive;

interface HiveLock {
public interface HiveLock {
void lock() throws LockException;

void ensureActive() throws LockException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -476,6 +477,10 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c
ConfigProperties.ENGINE_HIVE_ENABLED, true);
}

/**
 * Convenience overload: extracts the table properties from {@code metadata} (when present)
 * and delegates to {@link #hiveLockEnabled(Map, Configuration)}.
 *
 * @param metadata table metadata, may be {@code null}
 * @param conf the Hive configuration consulted when the table property is unset
 * @return whether Hive locking should be used for commits on this table
 */
private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) {
  Map<String, String> tableProperties = null;
  if (metadata != null) {
    tableProperties = metadata.properties();
  }
  return hiveLockEnabled(tableProperties, conf);
}

/**
* Returns if the hive locking should be enabled on the table, or not.
*
Expand All @@ -489,14 +494,14 @@ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration c
* TableProperties#HIVE_LOCK_ENABLED_DEFAULT}
* </ol>
*
* @param metadata Table metadata to use
* @param properties Table properties to use
* @param conf The hive configuration to use
* @return if the hive engine related values should be enabled or not
*/
private static boolean hiveLockEnabled(TableMetadata metadata, Configuration conf) {
if (metadata != null && metadata.properties().get(TableProperties.HIVE_LOCK_ENABLED) != null) {
public static boolean hiveLockEnabled(Map<String, String> properties, Configuration conf) {
if (properties != null && properties.containsKey(TableProperties.HIVE_LOCK_ENABLED)) {
// We know that the property is set, so default value will not be used,
return metadata.propertyAsBoolean(TableProperties.HIVE_LOCK_ENABLED, false);
return PropertyUtil.propertyAsBoolean(properties, TableProperties.HIVE_LOCK_ENABLED, false);
}

return conf.getBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
Expand All @@ -70,6 +71,7 @@
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.util.NullOrdering;
Expand Down Expand Up @@ -116,9 +118,11 @@
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.expressions.UnboundTerm;
import org.apache.iceberg.hive.CachedClientPool;
import org.apache.iceberg.hive.HiveLock;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.hive.MetastoreLock;
import org.apache.iceberg.hive.NoLock;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mapping.MappingUtil;
Expand Down Expand Up @@ -193,7 +197,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
private Transaction transaction;
private AlterTableType currentAlterTableOp;
private boolean createHMSTableInHook = false;
private MetastoreLock commitLock;
private HiveLock commitLock;

private enum FileFormat {
ORC("orc"), PARQUET("parquet"), AVRO("avro");
Expand Down Expand Up @@ -409,8 +413,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
context.getProperties().get(OLD_TABLE_NAME)).toString());
}
if (commitLock == null) {
commitLock = new MetastoreLock(conf, new CachedClientPool(conf, Maps.fromProperties(catalogProperties)),
catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(), hmsTable.getTableName());
commitLock = lockObject(hmsTable);
}

try {
Expand All @@ -422,6 +425,22 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
}
}

/**
 * Chooses the commit-lock implementation for the given HMS table.
 *
 * <p>Returns a {@link NoLock} when table-level Hive locking is disabled, or when the current
 * operation is a QUERY (presumably covered by Hive's own transactional locking — confirm with
 * the storage-handler lock-type logic); otherwise falls back to a {@link MetastoreLock}.
 *
 * @param hmsTable the metastore table being committed
 * @return the lock object guarding the commit
 */
private HiveLock lockObject(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
  boolean lockEnabled = HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf);
  boolean queryOperation = SessionStateUtil.getQueryState(conf)
      .map(QueryState::getHiveOperation)
      .filter(HiveOperation.QUERY::equals)
      .isPresent();
  if (!lockEnabled || queryOperation) {
    return new NoLock();
  }
  return new MetastoreLock(
      conf,
      new CachedClientPool(conf, Maps.fromProperties(catalogProperties)),
      catalogProperties.getProperty(Catalogs.NAME),
      hmsTable.getDbName(),
      hmsTable.getTableName());
}

private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context)
throws MetaException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
Expand Down Expand Up @@ -836,13 +837,24 @@ private void checkAndMergeColStats(List<ColumnStatistics> statsNew, Table tbl) t
*/
@Override
public LockType getLockType(WriteEntity writeEntity) {
org.apache.hadoop.hive.ql.metadata.Table hmsTable = writeEntity.getTable();
boolean sharedWrite = !HiveConf.getBoolVar(conf, ConfVars.TXN_WRITE_X_LOCK);
// Materialized views stored by Iceberg and the MV metadata is stored in HMS doesn't need write locking because
// the locking is done by DbTxnManager.acquireMaterializationRebuildLock()
if (TableType.MATERIALIZED_VIEW == writeEntity.getTable().getTableType()) {
return LockType.SHARED_READ;
}
if (WriteEntity.WriteType.INSERT_OVERWRITE == writeEntity.getWriteType()) {
return LockType.EXCL_WRITE;
if (HiveTableOperations.hiveLockEnabled(hmsTable.getParameters(), conf)) {
throw new RuntimeException("Hive locking on table `" + hmsTable.getFullTableName() +
"`cannot be enabled when `engine.hive.lock-enabled`=`true`. " +
"Disable `engine.hive.lock-enabled` to use Hive locking");
}
switch (writeEntity.getWriteType()) {
case INSERT_OVERWRITE:
return LockType.EXCL_WRITE;
case UPDATE:
case DELETE:
return sharedWrite ? LockType.SHARED_WRITE : LockType.EXCL_WRITE;
}
return LockType.SHARED_WRITE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
Expand Down Expand Up @@ -130,6 +131,40 @@ public void testStatsWithInsert() {
checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 5);
}

@Test
public void testStatsWithPessimisticLockInsert() {
  // Pessimistic locking setup is only exercised for HiveCatalog tables.
  Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG);
  // Table created with engine.hive.lock-enabled=false.
  TableIdentifier identifier = getTableIdentifierWithPessimisticLock("false");
  shell.executeStatement(
      testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false));

  // Auto-gathered column stats should still be collected and correct after the insert.
  checkColStat(identifier.name(), "customer_id", true);
  checkColStatMinMaxValue(identifier.name(), "customer_id", 0, 2);
}

@Test
public void testStatsWithPessimisticLockInsertWhenHiveLockEnabled() {
  // Pessimistic locking setup is only exercised for HiveCatalog tables.
  Assume.assumeTrue(testTableType == TestTables.TestTableType.HIVE_CATALOG);
  // Table created with engine.hive.lock-enabled=true, which conflicts with Hive locking.
  TableIdentifier identifier = getTableIdentifierWithPessimisticLock("true");
  String insertQuery =
      testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, identifier, false);
  AssertHelpers.assertThrows(
      "Should throw RuntimeException when Hive locking is on with 'engine.hive.lock-enabled=true'",
      RuntimeException.class,
      () -> shell.executeStatement(insertQuery));
}

/**
 * Creates the default.customers test table with stats autogather and external txn locking
 * enabled in the session, setting {@code engine.hive.lock-enabled} to the given value.
 *
 * @param hiveLockEnabled value ("true"/"false") for the HIVE_LOCK_ENABLED table property
 * @return the identifier of the created table
 */
private TableIdentifier getTableIdentifierWithPessimisticLock(String hiveLockEnabled) {
  shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_STATS_AUTOGATHER.varname, true);
  shell.setHiveSessionValue(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED.varname, true);

  TableIdentifier identifier = TableIdentifier.of("default", "customers");
  testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
      PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(), formatVersion,
      ImmutableMap.of(TableProperties.HIVE_LOCK_ENABLED, hiveLockEnabled));
  return identifier;
}

@Test
public void testStatsWithInsertOverwrite() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
Expand Down
1 change: 1 addition & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ private IMetaStoreClient prepareParallelTest(String tableName, int val)
throws Exception, MetaException, TException, NoSuchObjectException {
hiveConf.setBoolean("hive.stats.autogather", true);
hiveConf.setBoolean("hive.stats.column.autogather", true);
hiveConf.setBoolean("hive.txn.xlock.write", true);
// Need to close the thread local Hive object so that configuration change is reflected to HMS.
Hive.closeCurrent();
runStatementOnDriver("drop table if exists " + tableName);
Expand Down
Loading