diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala index c64ae9e2..14c74dfd 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala @@ -50,6 +50,7 @@ trait Metastore { infoDate: LocalDate, schema: Option[StructType], hiveHelper: HiveHelper, + updateSchema: Boolean, recreate: Boolean): Unit def getStats(tableName: String, infoDate: LocalDate): MetaTableStats diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 9e781e88..53e11837 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -38,6 +38,7 @@ import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper} import java.time.{Instant, LocalDate} import scala.collection.mutable import scala.collection.mutable.ListBuffer +import scala.util.control.NonFatal class MetastoreImpl(appConfig: Config, tableDefsIn: Seq[MetaTable], @@ -163,6 +164,7 @@ class MetastoreImpl(appConfig: Config, infoDate: LocalDate, schema: Option[StructType], hiveHelper: HiveHelper, + updateSchema: Boolean, recreate: Boolean): Unit = { val mt = getTableDef(tableName) val hiveTable = mt.hiveTable match { @@ -197,24 +199,44 @@ class MetastoreImpl(appConfig: Config, val fullTableName = HiveHelper.getFullTable(mt.hiveConfig.database, hiveTable) val effectivePath = mt.hivePath.getOrElse(path) + var needAddPartition = false + if (recreate) { log.info(s"Recreating Hive table '$fullTableName'") hiveHelper.createOrUpdateHiveTable(effectivePath, format, effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable) } else { if (hiveHelper.doesTableExist(mt.hiveConfig.database, hiveTable)) { - if (mt.hivePreferAddPartition && mt.format.isInstanceOf[DataFormat.Parquet]) { - val location = new Path(effectivePath, s"${mt.infoDateColumn}=${infoDate}") - log.info(s"The table '$fullTableName' exists. Adding partition '$location'...") - hiveHelper.addPartition(mt.hiveConfig.database, hiveTable, Seq(mt.infoDateColumn), Seq(infoDate.toString), location.toString) + if (updateSchema) { + try { + log.info(s"The table '$fullTableName' exists. Updating schema of the Hive table '$fullTableName'") + hiveHelper.replaceHiveTableSchema(effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable) + // Schema changed in-place. We still need to add the new partition + needAddPartition = true + } catch { + case NonFatal(ex) => + log.warn(s"Could not update Hive schema via ${hiveHelper.getClass.getName}. Recreating Hive table '$fullTableName'", ex) + hiveHelper.createOrUpdateHiveTable(effectivePath, format, effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable) + } } else { - log.info(s"The table '$fullTableName' exists. Repairing it.") - hiveHelper.repairHiveTable(mt.hiveConfig.database, hiveTable, format) + // Schema didn't change, but we need to add the new partition + needAddPartition = true } } else { log.info(s"The table '$fullTableName' does not exist. Creating it.") hiveHelper.createOrUpdateHiveTable(effectivePath, format, effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable) } } + + if (needAddPartition) { + if (mt.hivePreferAddPartition && mt.format.isInstanceOf[DataFormat.Parquet]) { + val location = new Path(effectivePath, s"${mt.infoDateColumn}=${infoDate}") + log.info(s"The table '$fullTableName' exists. Adding partition '$location'...") + hiveHelper.addPartition(mt.hiveConfig.database, hiveTable, Seq(mt.infoDateColumn), Seq(infoDate.toString), location.toString) + } else { + log.info(s"The table '$fullTableName' exists. Repairing it.") + hiveHelper.repairHiveTable(mt.hiveConfig.database, hiveTable, format) + } + } } override def getStats(tableName: String, infoDate: LocalDate): MetaTableStats = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala index ecd63e29..9317ed4d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/HiveConfig.scala @@ -82,13 +82,7 @@ object HiveConfig { * @return */ def fromConfigWithDefaults(conf: Config, defaults: HiveDefaultConfig, format: DataFormat, parent: String = ""): HiveConfig = { - val defaultTemplates = defaults.templates.getOrElse(format.name, HiveQueryTemplates( - DEFAULT_CREATE_TABLE_TEMPLATE, - DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, - DEFAULT_REPAIR_TABLE_TEMPLATE, - DEFAULT_ADD_PARTITION_TEMPLATE, - DEFAULT_DROP_TABLE_TEMPLATE - )) + val defaultTemplates = defaults.templates.getOrElse(format.name, HiveQueryTemplates.getDefaultQueryTemplates) val hiveApi = if (conf.hasPath(HIVE_API_KEY)) HiveApi.fromString(conf.getString(HIVE_API_KEY)) @@ -111,6 +105,9 @@ object HiveConfig { val createOnlyTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$CREATE_ONLY_TABLE_TEMPLATE_KEY") .getOrElse(defaultTemplates.createOnlyTableTemplate) + val replaceSchemaTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$REPLACE_SCHEMA_TEMPLATE_KEY") + .getOrElse(defaultTemplates.replaceSchemaTemplate) + val repairTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$REPAIR_TABLE_TEMPLATE_KEY") .getOrElse(defaultTemplates.repairTableTemplate) @@ -126,7 +123,7 @@ object HiveConfig { HiveConfig( hiveApi = hiveApi, database = database, - templates = HiveQueryTemplates(createTableTemplate, createOnlyTableTemplate, repairTableTemplate, addPartitionTableTemplate, dropTableTemplate), + templates = HiveQueryTemplates(createTableTemplate, createOnlyTableTemplate, replaceSchemaTemplate, repairTableTemplate, addPartitionTableTemplate, dropTableTemplate), jdbcConfig = jdbcConfig, ignoreFailures, alwaysEscapeColumnNames, @@ -142,13 +139,7 @@ object HiveConfig { * @return Hive configuration with default query templates for the given format. */ def fromDefaults(defaults: HiveDefaultConfig, format: DataFormat): HiveConfig = { - val templates = defaults.templates.getOrElse(format.name, HiveQueryTemplates( - DEFAULT_CREATE_TABLE_TEMPLATE, - DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, - DEFAULT_REPAIR_TABLE_TEMPLATE, - DEFAULT_ADD_PARTITION_TEMPLATE, - DEFAULT_DROP_TABLE_TEMPLATE - )) + val templates = defaults.templates.getOrElse(format.name, HiveQueryTemplates.getDefaultQueryTemplates) HiveConfig(defaults.hiveApi, defaults.database, templates, defaults.jdbcConfig, defaults.ignoreFailures, alwaysEscapeColumnNames = true, optimizeExistQuery = true) } @@ -156,7 +147,7 @@ object HiveConfig { def getNullConfig: HiveConfig = HiveConfig( HiveApi.Sql, None, - HiveQueryTemplates(DEFAULT_CREATE_TABLE_TEMPLATE, DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, DEFAULT_REPAIR_TABLE_TEMPLATE, DEFAULT_ADD_PARTITION_TEMPLATE, DEFAULT_DROP_TABLE_TEMPLATE), + HiveQueryTemplates(DEFAULT_CREATE_TABLE_TEMPLATE, DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, DEFAULT_REPLACE_SCHEMA_TEMPLATE, DEFAULT_REPAIR_TABLE_TEMPLATE, DEFAULT_ADD_PARTITION_TEMPLATE, DEFAULT_DROP_TABLE_TEMPLATE), None, ignoreFailures = false, alwaysEscapeColumnNames = true, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala index 99840544..3b6083bc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala @@ -102,8 +102,9 @@ trait Job { * * @param schema The schema of the Hive table * @param infoDate The information date for which to update the Hive table + * @param updateSchema If true, the schema needs update * @param recreate Whether to force recreate the Hive table * @return The list of warnings if Hive errors are ignored. */ - def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, recreate: Boolean): Seq[String] + def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, updateSchema: Boolean, recreate: Boolean): Seq[String] } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index d02fbd86..b8a17bd6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -106,14 +106,14 @@ abstract class JobBase(operationDef: OperationDef, } } - override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, recreate: Boolean): Seq[String] = { + override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, updateSchema: Boolean, recreate: Boolean): Seq[String] = { if (outputTableDef.hiveTable.isEmpty) return Seq.empty val hiveHelper = metastore.getHiveHelper(outputTableDef.name) val attempt = Try { - metastore.repairOrCreateHiveTable(outputTableDef.name, infoDate, Option(schema), hiveHelper, recreate) + metastore.repairOrCreateHiveTable(outputTableDef.name, infoDate, Option(schema), hiveHelper, updateSchema, recreate) } attempt match { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index d6b57bbc..d60126ce 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -423,8 +423,8 @@ abstract class TaskRunnerBase(conf: Config, } val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) { - val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || runtimeConfig.forceReCreateHiveTables - task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate) + val updateSchema = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || newSchemaRegisteredAfterTransform + task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, updateSchema, runtimeConfig.forceReCreateHiveTables) } else { Seq.empty } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index de5b3e21..faae0e78 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory import za.co.absa.pramen.api.jobdef.TransformExpression import za.co.absa.pramen.api.{CatalogTable, FieldChange} import za.co.absa.pramen.core.expr.DateExprEvaluator +import za.co.absa.pramen.core.utils.JdbcSparkUtils.MAXIMUM_VARCHAR_LENGTH import za.co.absa.pramen.core.utils.SparkMaster.Databricks import java.io.ByteArrayOutputStream @@ -388,7 +389,7 @@ object SparkUtils { * otherwise `None` */ def getLengthFromMetadata(metadata: Metadata): Option[Int] = { - if (metadata.contains(MAX_LENGTH_METADATA_KEY)) { + val proposedLength = if (metadata.contains(MAX_LENGTH_METADATA_KEY)) { val try1 = Try { val length = metadata.getLong(MAX_LENGTH_METADATA_KEY).toInt Option(length) @@ -412,6 +413,8 @@ object SparkUtils { } else { None } + + proposedLength.filter(length => length < MAXIMUM_VARCHAR_LENGTH && length > 0) } /** @@ -427,12 +430,27 @@ object SparkUtils { * @return the resolved string-based DataType, which can be CharType, VarcharType, or StringType */ def getStringTypeFromMetadata(metadata: Metadata): DataType = { + def getLength(lenStr: String): Option[Int] = { + Try { + lenStr.toInt + }.toOption + .filter(len => len > 0 && len < MAXIMUM_VARCHAR_LENGTH) + } + if (metadata.contains(CHAR_VARCHAR_METADATA_KEY)) { metadata.getString(CHAR_VARCHAR_METADATA_KEY) match { case charVarcharTypePattern(kind, len) if kind.equalsIgnoreCase("char") => - CharType(len.toInt) + val lenOpt = getLength(len) + lenOpt match { + case Some(l) => CharType(l) + case None => StringType + } case charVarcharTypePattern(_, len) => - VarcharType(len.toInt) + val lenOpt = getLength(len) + lenOpt match { + case Some(l) => VarcharType(l) + case None => StringType + } case _ => getLengthFromMetadata(metadata).map(VarcharType.apply).getOrElse(StringType) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala index 40854b93..541cfc51 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelper.scala @@ -37,6 +37,11 @@ abstract class HiveHelper { databaseName: Option[String], tableName: String): Unit + def replaceHiveTableSchema(schema: StructType, + partitionBy: Seq[String], + databaseName: Option[String], + tableName: String): Unit + def repairHiveTable(databaseName: Option[String], tableName: String, format: HiveFormat): Unit diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala index 95922d04..9ac102fd 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSparkCatalog.scala @@ -70,6 +70,27 @@ class HiveHelperSparkCatalog(spark: SparkSession) extends HiveHelper { } } + override def replaceHiveTableSchema(schema: StructType, + partitionBy: Seq[String], + databaseName: Option[String], + tableName: String): Unit = { + val fullTableName = HiveHelper.getFullTable(databaseName, tableName) + + val partitionColsLower = partitionBy.map(_.toLowerCase()) + val nonPartitionFields = SparkUtils.transformSchemaForCatalog(schema) + .filter(field => !partitionColsLower.contains(field.name.toLowerCase())) + .filter(field => field.name.trim.nonEmpty) + + val schemaDDL = SparkUtils.escapeColumnsSparkDDL(StructType(nonPartitionFields).toDDL) + + val sql = HiveQueryTemplates.DEFAULT_REPLACE_SCHEMA_TEMPLATE + .replace("@fullTableName", fullTableName) + .replace("@schema", schemaDDL) + + log.info(s"Executing: $sql") + spark.sql(sql).collect() + } + override def repairHiveTable(databaseName: Option[String], tableName: String, format: HiveFormat): Unit = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala index b599d02b..a784deb6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala @@ -54,6 +54,26 @@ class HiveHelperSql(val queryExecutor: QueryExecutor, } } + override def replaceHiveTableSchema(schema: StructType, + partitionBy: Seq[String], + databaseName: Option[String], + tableName: String): Unit = { + val fullTableName = HiveHelper.getFullTable(databaseName, tableName) + + log.info(s"Updating schema Hive table: $fullTableName...") + + val sqlHiveCreate = applyTemplate( + hiveConfig.replaceSchemaTemplate, + fullTableName, + "", + HiveFormat.Parquet, + getTableDDL(schema, partitionBy), + getPartitionDDL(schema, partitionBy) + ) + + queryExecutor.execute(sqlHiveCreate) + } + override def repairHiveTable(databaseName: Option[String], tableName: String, format: HiveFormat): Unit = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala index 34f1f562..5d2dd2e5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveQueryTemplates.scala @@ -22,6 +22,7 @@ import za.co.absa.pramen.core.utils.ConfigUtils case class HiveQueryTemplates( createTableTemplate: String, createOnlyTableTemplate: String, + replaceSchemaTemplate: String, repairTableTemplate: String, addPartitionTemplate: String, dropTableTemplate: String @@ -32,6 +33,7 @@ object HiveQueryTemplates { val CREATE_TABLE_TEMPLATE_KEY = "create.table.template" val CREATE_ONLY_TABLE_TEMPLATE_KEY = "create.only.table.template" + val REPLACE_SCHEMA_TEMPLATE_KEY = "replace.schema.template" val REPAIR_TABLE_TEMPLATE_KEY = "repair.table.template" val ADD_PARTITION_TEMPLATE_KEY = "add.partition.template" val DROP_TABLE_TEMPLATE_KEY = "drop.table.template" @@ -54,6 +56,8 @@ object HiveQueryTemplates { |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |LOCATION '@path';""".stripMargin + val DEFAULT_REPLACE_SCHEMA_TEMPLATE: String = "ALTER TABLE @fullTableName REPLACE COLUMNS ( @schema );" + val DEFAULT_REPAIR_TABLE_TEMPLATE: String = "MSCK REPAIR TABLE @fullTableName" val DEFAULT_ADD_PARTITION_TEMPLATE: String = @@ -68,6 +72,9 @@ object HiveQueryTemplates { val createOnlyTableTemplate = ConfigUtils.getOptionString(conf, CREATE_ONLY_TABLE_TEMPLATE_KEY) .getOrElse(DEFAULT_CREATE_ONLY_TABLE_TEMPLATE) + val replaceSchemaTemplate = ConfigUtils.getOptionString(conf, REPLACE_SCHEMA_TEMPLATE_KEY) + .getOrElse(DEFAULT_REPLACE_SCHEMA_TEMPLATE) + val repairTableTemplate = ConfigUtils.getOptionString(conf, REPAIR_TABLE_TEMPLATE_KEY) .getOrElse(DEFAULT_REPAIR_TABLE_TEMPLATE) @@ -80,6 +87,7 @@ object HiveQueryTemplates { HiveQueryTemplates( createTableTemplate = createTableTemplate, createOnlyTableTemplate = createOnlyTableTemplate, + replaceSchemaTemplate = replaceSchemaTemplate, repairTableTemplate = repairTableTemplate, addPartitionTemplate = addPartitionTemplate, dropTableTemplate = dropTableTemplate @@ -90,6 +98,7 @@ object HiveQueryTemplates { HiveQueryTemplates( createTableTemplate = DEFAULT_CREATE_TABLE_TEMPLATE, createOnlyTableTemplate = DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, + replaceSchemaTemplate = DEFAULT_REPLACE_SCHEMA_TEMPLATE, repairTableTemplate = DEFAULT_REPAIR_TABLE_TEMPLATE, addPartitionTemplate = DEFAULT_ADD_PARTITION_TEMPLATE, dropTableTemplate = DEFAULT_DROP_TABLE_TEMPLATE diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 4a845bed..393bd563 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -294,26 +294,37 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val qe = new QueryExecutorMock(tableExists = true) val hh = new HiveHelperSql(qe, defaultTemplates, true) - m.repairOrCreateHiveTable("table1", infoDate, Option(schema), hh, recreate = false) + m.repairOrCreateHiveTable("table1", infoDate, Option(schema), hh, updateSchema = false, recreate = false) assert(qe.queries.isEmpty) } - "repair existing table" in { + "repair existing table with no schema change" in { val qe = new QueryExecutorMock(tableExists = true) val hh = new HiveHelperSql(qe, defaultTemplates, false) - m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, recreate = false) + m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, updateSchema = false, recreate = false) assert(qe.queries.length == 1) assert(qe.queries.exists(_.contains("ALTER TABLE"))) } + "repair existing table with schema change" in { + val qe = new QueryExecutorMock(tableExists = true) + val hh = new HiveHelperSql(qe, defaultTemplates, false) + + m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, updateSchema = true, recreate = false) + + assert(qe.queries.length == 2) + assert(qe.queries.exists(_.contains("ALTER TABLE"))) + assert(qe.queries.exists(_.contains("REPLACE COLUMNS"))) + } + "do nothing for a delta since it does not need repairing" in { val qe = new QueryExecutorMock(tableExists = true) val hh = new HiveHelperSql(qe, defaultTemplates, true) - m.repairOrCreateHiveTable("table_hive_delta", infoDate, Option(schema), hh, recreate = false) + m.repairOrCreateHiveTable("table_hive_delta", infoDate, Option(schema), hh, updateSchema = false, recreate = false) assert(qe.queries.isEmpty) } @@ -322,7 +333,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val qe = new QueryExecutorMock(tableExists = false) val hh = new HiveHelperSql(qe, defaultTemplates, false) - m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, recreate = false) + m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, updateSchema = false, recreate = false) assert(qe.queries.length == 3) assert(qe.queries.exists(_.contains("DROP"))) @@ -335,7 +346,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val qe = new QueryExecutorMock(tableExists = false) val hh = new HiveHelperSql(qe, defaultTemplates, true) - m.repairOrCreateHiveTable("table_hive_delta", infoDate, Option(schema), hh, recreate = false) + m.repairOrCreateHiveTable("table_hive_delta", infoDate, Option(schema), hh, updateSchema = false, recreate = false) assert(qe.queries.length == 3) assert(qe.queries.exists(_.contains("DROP"))) @@ -348,7 +359,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val qe = new QueryExecutorMock(tableExists = true) val hh = new HiveHelperSql(qe, defaultTemplates, false) - m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, recreate = true) + m.repairOrCreateHiveTable("table_hive_parquet", infoDate, Option(schema), hh, updateSchema = false, recreate = true) assert(qe.queries.length == 3) assert(qe.queries.exists(_.contains("DROP"))) @@ -361,7 +372,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val hh = new HiveHelperSql(qe, defaultTemplates, true) val ex = intercept[IllegalArgumentException] { - m.repairOrCreateHiveTable("table_hive_not_supported", infoDate, Option(schema), hh, recreate = false) + m.repairOrCreateHiveTable("table_hive_not_supported", infoDate, Option(schema), hh, updateSchema = false, recreate = false) } assert(ex.getMessage.contains("Unsupported query type 'table'")) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala index ce61b914..57807e9c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala @@ -29,7 +29,7 @@ class HiveConfigSuite extends AnyWordSpec { val defaultConfig = HiveDefaultConfig( HiveApi.SparkCatalog, Some("mydb1"), - Map("parquet" -> HiveQueryTemplates("create1", "create_only1", "repair1", "add_partition1", "drop1")), + Map("parquet" -> HiveQueryTemplates("create1", "create_only1", "update1", "repair1", "add_partition1", "drop1")), None, ignoreFailures = true, alwaysEscapeColumnNames = false, @@ -45,6 +45,7 @@ class HiveConfigSuite extends AnyWordSpec { assert(hiveConfig.optimizeExistQuery) assert(hiveConfig.templates.createTableTemplate.contains("create1")) assert(hiveConfig.templates.createOnlyTableTemplate.contains("create_only1")) + assert(hiveConfig.templates.replaceSchemaTemplate.contains("update1")) assert(hiveConfig.templates.repairTableTemplate.contains("repair1")) assert(hiveConfig.templates.addPartitionTemplate.contains("add_partition1")) assert(hiveConfig.templates.dropTableTemplate.contains("drop1")) @@ -68,6 +69,7 @@ class HiveConfigSuite extends AnyWordSpec { |conf { | create.table.template = "create2" | create.only.table.template = "create_only2" + | replace.schema.template = "replace2" | repair.table.template = "repair2" | add.partition.template = "add_partition2" | drop.table.template = "drop2" @@ -78,7 +80,7 @@ class HiveConfigSuite extends AnyWordSpec { val defaultConfig = HiveDefaultConfig( HiveApi.Sql, Some("mydb1"), - Map("parquet" -> HiveQueryTemplates("create1", "create_only1", "repair1", "add_partition1", "drop1")), + Map("parquet" -> HiveQueryTemplates("create1", "create_only1", "replace1", "repair1", "add_partition1", "drop1")), None, ignoreFailures = false, alwaysEscapeColumnNames = false, @@ -95,6 +97,7 @@ class HiveConfigSuite extends AnyWordSpec { assert(!hiveConfig.optimizeExistQuery) assert(hiveConfig.templates.createTableTemplate.contains("create2")) assert(hiveConfig.templates.createOnlyTableTemplate.contains("create_only2")) + assert(hiveConfig.templates.replaceSchemaTemplate.contains("replace2")) assert(hiveConfig.templates.repairTableTemplate.contains("repair2")) assert(hiveConfig.templates.addPartitionTemplate.contains("add_partition2")) assert(hiveConfig.templates.dropTableTemplate.contains("drop2")) @@ -106,7 +109,7 @@ class HiveConfigSuite extends AnyWordSpec { val defaultConfig = HiveDefaultConfig( HiveApi.Sql, Some("mydb"), - Map("parquet" -> HiveQueryTemplates("create", "create_only", "repair", "add_partition1", "drop")), + Map("parquet" -> HiveQueryTemplates("create", "create_only", "update", "repair", "add_partition1", "drop")), None, ignoreFailures = true, alwaysEscapeColumnNames = true, @@ -122,6 +125,7 @@ class HiveConfigSuite extends AnyWordSpec { assert(hiveConfig.optimizeExistQuery) assert(hiveConfig.templates.createTableTemplate.contains("create")) assert(hiveConfig.templates.createOnlyTableTemplate.contains("create_only")) + assert(hiveConfig.templates.replaceSchemaTemplate.contains("update")) assert(hiveConfig.templates.repairTableTemplate.contains("repair")) assert(hiveConfig.templates.addPartitionTemplate.contains("add_partition1")) assert(hiveConfig.templates.dropTableTemplate.contains("drop")) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala index fe97a4f5..48beb639 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala @@ -230,7 +230,7 @@ class MetaTableSuite extends AnyWordSpec { val defaultHiveConfig = HiveDefaultConfig(HiveApi.Sql, Some("mydb"), - Map("parquet" -> HiveQueryTemplates("create", "create_only", "repair", "add_partition", "drop")), + Map("parquet" -> HiveQueryTemplates("create", "create_only", "update", "repair", "add_partition", "drop")), Some(JdbcConfig("driver", Some("url"), user = Some("user"), password = Some("pass") @@ -246,6 +246,7 @@ class MetaTableSuite extends AnyWordSpec { assert(metaTable.hiveConfig.jdbcConfig.exists(_.driver == "driver")) assert(metaTable.hiveConfig.templates.createTableTemplate == "create") assert(metaTable.hiveConfig.templates.createOnlyTableTemplate == "create_only") + assert(metaTable.hiveConfig.templates.replaceSchemaTemplate == "update") assert(metaTable.hiveConfig.templates.repairTableTemplate == "repair") assert(metaTable.hiveConfig.templates.addPartitionTemplate == "add_partition") assert(metaTable.hiveConfig.templates.dropTableTemplate == "drop") @@ -289,6 +290,7 @@ class MetaTableSuite extends AnyWordSpec { | conf { | create.table.template = "create2" | create.only.table.template = "create_only2" + | replace.schema.template = "replace2" | repair.table.template = "repair2" | add.partition.template = "add_partition2" | drop.table.template = "drop2" @@ -299,7 +301,7 @@ class MetaTableSuite extends AnyWordSpec { val defaultHiveConfig = HiveDefaultConfig( HiveApi.Sql, Some("mydb1"), - Map("parquet" -> HiveQueryTemplates("create1", "create_only1", "repair1", "add_partition1", "drop1")), + Map("parquet" -> HiveQueryTemplates("create1", "create_only1", "replace1", "repair1", "add_partition1", "drop1")), Some(JdbcConfig("driver1", Some("url1"), user = Some("user1"), password = Some("pass1") @@ -315,6 +317,7 @@ class MetaTableSuite extends AnyWordSpec { assert(metaTable.hiveConfig.jdbcConfig.exists(_.driver == "driver2")) assert(metaTable.hiveConfig.templates.createTableTemplate == "create2") assert(metaTable.hiveConfig.templates.createOnlyTableTemplate == "create_only2") + assert(metaTable.hiveConfig.templates.replaceSchemaTemplate == "replace2") assert(metaTable.hiveConfig.templates.repairTableTemplate == "repair2") assert(metaTable.hiveConfig.templates.addPartitionTemplate == "add_partition2") assert(metaTable.hiveConfig.templates.dropTableTemplate == "drop2") diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala index cb1886c4..3ecda9d9 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala @@ -55,6 +55,7 @@ class JobSpy(jobName: String = "Dummy Job", var saveCount = 0 var saveDf: DataFrame = _ var createHiveTableCount = 0 + var updateSchemaHiveTable = false var recreateHiveTable = false override def taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(outputTable = MetaTable.getMetaTableDef(outputTable)) @@ -108,8 +109,9 @@ class JobSpy(jobName: String = "Dummy Job", SaveResult(saveStats) } - override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, recreate: Boolean): Seq[String] = { + override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, updateSchema: Boolean, recreate: Boolean): Seq[String] = { createHiveTableCount += 1 + updateSchemaHiveTable = updateSchema recreateHiveTable = recreate Nil } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index 6f1d6c53..cffd942c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -48,7 +48,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), writeOptions: Map[String, String] = Map.empty[String, String]) extends Metastore { val saveTableInvocations = new ListBuffer[(String, LocalDate, DataFrame)] - var hiveCreationInvocations = new ListBuffer[(String, LocalDate, Option[StructType], Boolean)] + var hiveCreationInvocations = new ListBuffer[(String, LocalDate, Option[StructType], Boolean, Boolean)] val queryExecutorMock = new QueryExecutorMock(true) val metadataManagerMock = new MetadataManagerNull(false) private val incrementalTables = new mutable.HashSet[String] @@ -96,11 +96,12 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), infoDate: LocalDate, schema: Option[StructType], hiveHelper: HiveHelper, + updateSchema: Boolean, recreate: Boolean): Unit = { if (failHive) { throw new RuntimeException("Test exception") } else - hiveCreationInvocations.append((tableName, infoDate, schema, recreate)) + hiveCreationInvocations.append((tableName, infoDate, schema, updateSchema, recreate)) } override def getStats(tableName: String, infoDate: LocalDate): MetaTableStats = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/JobBaseSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/JobBaseSuite.scala index ca1295bd..1d23753f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/JobBaseSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/JobBaseSuite.scala @@ -217,7 +217,7 @@ class JobBaseSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix "do nothing if Hive table is not defined" in { val job = getUseCase() - val warnings = job.createOrRefreshHiveTable(null, infoDate, recreate = false) + val warnings = job.createOrRefreshHiveTable(null, infoDate, updateSchema = false, recreate = false) assert(warnings.isEmpty) @@ -226,7 +226,7 @@ class JobBaseSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix "return an empty seq of warnings when the operation succeeded" in { val job = getUseCase(hiveTable = Some("test_hive_table")) - val warnings = job.createOrRefreshHiveTable(null, infoDate, recreate = false) + val warnings = job.createOrRefreshHiveTable(null, infoDate, updateSchema = false, recreate = false) assert(warnings.isEmpty) } @@ -234,7 +234,7 @@ class JobBaseSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix "return warnings if ignore failures enabled" in { val job = getUseCase(hiveTable = Some("test_hive_table"), hiveFailure = true, ignoreHiveFailures = true) - val warnings = job.createOrRefreshHiveTable(null, infoDate, recreate = false) + val warnings = job.createOrRefreshHiveTable(null, infoDate, updateSchema = false, recreate = false) assert(warnings.nonEmpty) assert(warnings.head == "Failed to create or update Hive table 'test_hive_table': Test exception") @@ -244,7 +244,7 @@ class JobBaseSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix val job = getUseCase(hiveTable = Some("test_hive_table"), hiveFailure = true) val ex = intercept[RuntimeException] { - job.createOrRefreshHiveTable(null, infoDate, recreate = false) + job.createOrRefreshHiveTable(null, infoDate, updateSchema = false, recreate = false) } assert(ex.getMessage == "Test exception") diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSparkCatalogSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSparkCatalogSuite.scala index 57648716..d82c5af5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSparkCatalogSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSparkCatalogSuite.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.tests.utils.hive import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, SaveMode} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.core.base.SparkTestBase @@ -64,6 +65,22 @@ class HiveHelperSparkCatalogSuite extends AnyWordSpec with SparkTestBase with Te assert(hiveHelper.doesTableExist(Some("default"), "tbl2")) } } + + "replace schema of a partitioned Parquet table" in { + withTempDirectory("hive_test") { tempDir => + val path = getParquetPath(tempDir) + + val hiveHelper = new HiveHelperSparkCatalog(spark) + val schema = spark.read.parquet(path).withColumn("b", lit(1)).schema + + hiveHelper.createHiveTable(path, HiveFormat.Parquet, schema, "a" :: "b" :: Nil, Some("default"), "tbl3") + assert(hiveHelper.doesTableExist(Some("default"), "tbl3")) + + assertThrows[AnalysisException] { + hiveHelper.replaceHiveTableSchema(StructType(schema.drop(1)), "a" :: "b" :: Nil, Some("default"), "tbl3") + } + } + } } "createOrUpdateHiveTable()" should { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala index 228224e9..a9e53d6b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala @@ -82,7 +82,6 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixt |MSCK REPAIR TABLE `db`.`tbl` |""".stripMargin - val qe = new QueryExecutorMock(tableExists = false) val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true) val schema = spark.read.parquet(path).withColumn("b", lit(1)).schema @@ -123,6 +122,24 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixt } } + "execute expected query for replacing schema of a partitioned table" in { + withTempDirectory("hive_test") { tempDir => + val path = getParquetPath(tempDir) + + val expected = "ALTER TABLE `db`.`tbl` REPLACE COLUMNS ( `c` INT );".stripMargin + + val qe = new QueryExecutorMock(tableExists = false) + val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true) + val schema = spark.read.parquet(path).withColumn("b", lit(1)).schema + + hiveHelper.replaceHiveTableSchema(schema, "a" :: "b" :: Nil, Some("db"), "tbl") + + val actual = qe.queries.mkString("\n") + + compareText(actual, expected) + } + } + "repair table with database" in { val expected = "MSCK REPAIR TABLE `db`.`tbl`"