Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ trait Metastore {
infoDate: LocalDate,
schema: Option[StructType],
hiveHelper: HiveHelper,
updateSchema: Boolean,
recreate: Boolean): Unit

def getStats(tableName: String, infoDate: LocalDate): MetaTableStats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper}
import java.time.{Instant, LocalDate}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

class MetastoreImpl(appConfig: Config,
tableDefsIn: Seq[MetaTable],
Expand Down Expand Up @@ -163,6 +164,7 @@ class MetastoreImpl(appConfig: Config,
infoDate: LocalDate,
schema: Option[StructType],
hiveHelper: HiveHelper,
updateSchema: Boolean,
recreate: Boolean): Unit = {
val mt = getTableDef(tableName)
val hiveTable = mt.hiveTable match {
Expand Down Expand Up @@ -197,24 +199,44 @@ class MetastoreImpl(appConfig: Config,
val fullTableName = HiveHelper.getFullTable(mt.hiveConfig.database, hiveTable)
val effectivePath = mt.hivePath.getOrElse(path)

var needAddPartition = false

if (recreate) {
log.info(s"Recreating Hive table '$fullTableName'")
hiveHelper.createOrUpdateHiveTable(effectivePath, format, effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable)
} else {
if (hiveHelper.doesTableExist(mt.hiveConfig.database, hiveTable)) {
if (mt.hivePreferAddPartition && mt.format.isInstanceOf[DataFormat.Parquet]) {
val location = new Path(effectivePath, s"${mt.infoDateColumn}=${infoDate}")
log.info(s"The table '$fullTableName' exists. Adding partition '$location'...")
hiveHelper.addPartition(mt.hiveConfig.database, hiveTable, Seq(mt.infoDateColumn), Seq(infoDate.toString), location.toString)
if (updateSchema) {
try {
log.info(s"The table '$fullTableName' exists. Updating schema of the Hive table '$fullTableName'")
hiveHelper.replaceHiveTableSchema(effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable)
// Schema changed in-place. We still need to add the new partition
needAddPartition = true
} catch {
case NonFatal(ex) =>
log.warn(s"Could not update Hive schema via ${hiveHelper.getClass.getName}. Recreating Hive table '$fullTableName'", ex)
hiveHelper.createOrUpdateHiveTable(effectivePath, format, effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable)
}
} else {
log.info(s"The table '$fullTableName' exists. Repairing it.")
hiveHelper.repairHiveTable(mt.hiveConfig.database, hiveTable, format)
// Schema didn't change, but we need to add the new partition
needAddPartition = true
}
} else {
log.info(s"The table '$fullTableName' does not exist. Creating it.")
hiveHelper.createOrUpdateHiveTable(effectivePath, format, effectiveSchema, Seq(mt.infoDateColumn), mt.hiveConfig.database, hiveTable)
}
}

if (needAddPartition) {
if (mt.hivePreferAddPartition && mt.format.isInstanceOf[DataFormat.Parquet]) {
val location = new Path(effectivePath, s"${mt.infoDateColumn}=${infoDate}")
log.info(s"The table '$fullTableName' exists. Adding partition '$location'...")
hiveHelper.addPartition(mt.hiveConfig.database, hiveTable, Seq(mt.infoDateColumn), Seq(infoDate.toString), location.toString)
} else {
log.info(s"The table '$fullTableName' exists. Repairing it.")
hiveHelper.repairHiveTable(mt.hiveConfig.database, hiveTable, format)
}
}
}

override def getStats(tableName: String, infoDate: LocalDate): MetaTableStats = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,7 @@ object HiveConfig {
* @return
*/
def fromConfigWithDefaults(conf: Config, defaults: HiveDefaultConfig, format: DataFormat, parent: String = ""): HiveConfig = {
val defaultTemplates = defaults.templates.getOrElse(format.name, HiveQueryTemplates(
DEFAULT_CREATE_TABLE_TEMPLATE,
DEFAULT_CREATE_ONLY_TABLE_TEMPLATE,
DEFAULT_REPAIR_TABLE_TEMPLATE,
DEFAULT_ADD_PARTITION_TEMPLATE,
DEFAULT_DROP_TABLE_TEMPLATE
))
val defaultTemplates = defaults.templates.getOrElse(format.name, HiveQueryTemplates.getDefaultQueryTemplates)

val hiveApi = if (conf.hasPath(HIVE_API_KEY))
HiveApi.fromString(conf.getString(HIVE_API_KEY))
Expand All @@ -111,6 +105,9 @@ object HiveConfig {
val createOnlyTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$CREATE_ONLY_TABLE_TEMPLATE_KEY")
.getOrElse(defaultTemplates.createOnlyTableTemplate)

val replaceSchemaTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$REPLACE_SCHEMA_TEMPLATE_KEY")
.getOrElse(defaultTemplates.replaceSchemaTemplate)

val repairTableTemplate = ConfigUtils.getOptionString(conf, s"$HIVE_TEMPLATE_CONFIG_PREFIX.$REPAIR_TABLE_TEMPLATE_KEY")
.getOrElse(defaultTemplates.repairTableTemplate)

Expand All @@ -126,7 +123,7 @@ object HiveConfig {
HiveConfig(
hiveApi = hiveApi,
database = database,
templates = HiveQueryTemplates(createTableTemplate, createOnlyTableTemplate, repairTableTemplate, addPartitionTableTemplate, dropTableTemplate),
templates = HiveQueryTemplates(createTableTemplate, createOnlyTableTemplate, replaceSchemaTemplate, repairTableTemplate, addPartitionTableTemplate, dropTableTemplate),
jdbcConfig = jdbcConfig,
ignoreFailures,
alwaysEscapeColumnNames,
Expand All @@ -142,21 +139,15 @@ object HiveConfig {
* @return Hive configuration with default query templates for the given format.
*/
def fromDefaults(defaults: HiveDefaultConfig, format: DataFormat): HiveConfig = {
val templates = defaults.templates.getOrElse(format.name, HiveQueryTemplates(
DEFAULT_CREATE_TABLE_TEMPLATE,
DEFAULT_CREATE_ONLY_TABLE_TEMPLATE,
DEFAULT_REPAIR_TABLE_TEMPLATE,
DEFAULT_ADD_PARTITION_TEMPLATE,
DEFAULT_DROP_TABLE_TEMPLATE
))
val templates = defaults.templates.getOrElse(format.name, HiveQueryTemplates.getDefaultQueryTemplates)

HiveConfig(defaults.hiveApi, defaults.database, templates, defaults.jdbcConfig, defaults.ignoreFailures, alwaysEscapeColumnNames = true, optimizeExistQuery = true)
}

def getNullConfig: HiveConfig = HiveConfig(
HiveApi.Sql,
None,
HiveQueryTemplates(DEFAULT_CREATE_TABLE_TEMPLATE, DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, DEFAULT_REPAIR_TABLE_TEMPLATE, DEFAULT_ADD_PARTITION_TEMPLATE, DEFAULT_DROP_TABLE_TEMPLATE),
HiveQueryTemplates(DEFAULT_CREATE_TABLE_TEMPLATE, DEFAULT_CREATE_ONLY_TABLE_TEMPLATE, DEFAULT_REPLACE_SCHEMA_TEMPLATE, DEFAULT_REPAIR_TABLE_TEMPLATE, DEFAULT_ADD_PARTITION_TEMPLATE, DEFAULT_DROP_TABLE_TEMPLATE),
None,
ignoreFailures = false,
alwaysEscapeColumnNames = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ trait Job {
*
* @param schema The schema of the Hive table
* @param infoDate The information date for which to update the Hive table
* @param updateSchema If true, the schema of the Hive table needs to be updated
* @param recreate Whether to force recreate the Hive table
* @return The list of warnings if Hive errors are ignored.
*/
def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, recreate: Boolean): Seq[String]
def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, updateSchema: Boolean, recreate: Boolean): Seq[String]
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ abstract class JobBase(operationDef: OperationDef,
}
}

override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, recreate: Boolean): Seq[String] = {
override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, updateSchema: Boolean, recreate: Boolean): Seq[String] = {
if (outputTableDef.hiveTable.isEmpty)
return Seq.empty

val hiveHelper = metastore.getHiveHelper(outputTableDef.name)

val attempt = Try {
metastore.repairOrCreateHiveTable(outputTableDef.name, infoDate, Option(schema), hiveHelper, recreate)
metastore.repairOrCreateHiveTable(outputTableDef.name, infoDate, Option(schema), hiveHelper, updateSchema, recreate)
}

attempt match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ abstract class TaskRunnerBase(conf: Config,
}

val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) {
val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || runtimeConfig.forceReCreateHiveTables
task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate)
val updateSchema = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || newSchemaRegisteredAfterTransform
task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, updateSchema, runtimeConfig.forceReCreateHiveTables)
} else {
Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.jobdef.TransformExpression
import za.co.absa.pramen.api.{CatalogTable, FieldChange}
import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.utils.JdbcSparkUtils.MAXIMUM_VARCHAR_LENGTH
import za.co.absa.pramen.core.utils.SparkMaster.Databricks

import java.io.ByteArrayOutputStream
Expand Down Expand Up @@ -388,7 +389,7 @@ object SparkUtils {
* otherwise `None`
*/
def getLengthFromMetadata(metadata: Metadata): Option[Int] = {
if (metadata.contains(MAX_LENGTH_METADATA_KEY)) {
val proposedLength = if (metadata.contains(MAX_LENGTH_METADATA_KEY)) {
val try1 = Try {
val length = metadata.getLong(MAX_LENGTH_METADATA_KEY).toInt
Option(length)
Expand All @@ -412,6 +413,8 @@ object SparkUtils {
} else {
None
}

proposedLength.filter(length => length < MAXIMUM_VARCHAR_LENGTH && length > 0)
}

/**
Expand All @@ -427,12 +430,27 @@ object SparkUtils {
* @return the resolved string-based DataType, which can be CharType, VarcharType, or StringType
*/
def getStringTypeFromMetadata(metadata: Metadata): DataType = {
def getLength(lenStr: String): Option[Int] = {
Try {
lenStr.toInt
}.toOption
.filter(len => len > 0 && len < MAXIMUM_VARCHAR_LENGTH)
}

if (metadata.contains(CHAR_VARCHAR_METADATA_KEY)) {
metadata.getString(CHAR_VARCHAR_METADATA_KEY) match {
case charVarcharTypePattern(kind, len) if kind.equalsIgnoreCase("char") =>
CharType(len.toInt)
val lenOpt = getLength(len)
lenOpt match {
case Some(l) => CharType(l)
case None => StringType
}
case charVarcharTypePattern(_, len) =>
VarcharType(len.toInt)
val lenOpt = getLength(len)
lenOpt match {
case Some(l) => VarcharType(l)
case None => StringType
}
case _ =>
getLengthFromMetadata(metadata).map(VarcharType.apply).getOrElse(StringType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ abstract class HiveHelper {
databaseName: Option[String],
tableName: String): Unit

def replaceHiveTableSchema(schema: StructType,
partitionBy: Seq[String],
databaseName: Option[String],
tableName: String): Unit

def repairHiveTable(databaseName: Option[String],
tableName: String,
format: HiveFormat): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ class HiveHelperSparkCatalog(spark: SparkSession) extends HiveHelper {
}
}

override def replaceHiveTableSchema(schema: StructType,
                                    partitionBy: Seq[String],
                                    databaseName: Option[String],
                                    tableName: String): Unit = {
  val fullTableName = HiveHelper.getFullTable(databaseName, tableName)

  // Partition columns are managed by the catalog itself, so they must not be
  // listed in the replacement column set. Comparison is case-insensitive
  // because Hive identifiers are case-insensitive.
  val partitionColumnsLc = partitionBy.map(_.toLowerCase())

  val dataFields = SparkUtils
    .transformSchemaForCatalog(schema)
    .filterNot(field => partitionColumnsLc.contains(field.name.toLowerCase()))
    .filterNot(field => field.name.trim.isEmpty)

  val columnsDdl = SparkUtils.escapeColumnsSparkDDL(StructType(dataFields).toDDL)

  // NOTE(review): this helper always uses the built-in template; it does not
  // honor a user-configured replace-schema template — confirm this is intended.
  val sql = HiveQueryTemplates.DEFAULT_REPLACE_SCHEMA_TEMPLATE
    .replace("@fullTableName", fullTableName)
    .replace("@schema", columnsDdl)

  log.info(s"Executing: $sql")
  spark.sql(sql).collect()
}

override def repairHiveTable(databaseName: Option[String],
tableName: String,
format: HiveFormat): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
}
}

override def replaceHiveTableSchema(schema: StructType,
                                    partitionBy: Seq[String],
                                    databaseName: Option[String],
                                    tableName: String): Unit = {
  val fullTableName = HiveHelper.getFullTable(databaseName, tableName)

  log.info(s"Updating schema Hive table: $fullTableName...")

  // Render the configured replace-schema template. The path argument is empty
  // because an in-place schema replacement does not relocate the table data.
  val replaceSchemaSql = applyTemplate(
    hiveConfig.replaceSchemaTemplate,
    fullTableName,
    "",
    HiveFormat.Parquet,
    getTableDDL(schema, partitionBy),
    getPartitionDDL(schema, partitionBy)
  )

  queryExecutor.execute(replaceSchemaSql)
}

override def repairHiveTable(databaseName: Option[String],
tableName: String,
format: HiveFormat): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.pramen.core.utils.ConfigUtils
case class HiveQueryTemplates(
createTableTemplate: String,
createOnlyTableTemplate: String,
replaceSchemaTemplate: String,
repairTableTemplate: String,
addPartitionTemplate: String,
dropTableTemplate: String
Expand All @@ -32,6 +33,7 @@ object HiveQueryTemplates {

val CREATE_TABLE_TEMPLATE_KEY = "create.table.template"
val CREATE_ONLY_TABLE_TEMPLATE_KEY = "create.only.table.template"
val REPLACE_SCHEMA_TEMPLATE_KEY = "replace.schema.template"
val REPAIR_TABLE_TEMPLATE_KEY = "repair.table.template"
val ADD_PARTITION_TEMPLATE_KEY = "add.partition.template"
val DROP_TABLE_TEMPLATE_KEY = "drop.table.template"
Expand All @@ -54,6 +56,8 @@ object HiveQueryTemplates {
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|LOCATION '@path';""".stripMargin

val DEFAULT_REPLACE_SCHEMA_TEMPLATE: String = "ALTER TABLE @fullTableName REPLACE COLUMNS ( @schema );"

val DEFAULT_REPAIR_TABLE_TEMPLATE: String = "MSCK REPAIR TABLE @fullTableName"

val DEFAULT_ADD_PARTITION_TEMPLATE: String =
Expand All @@ -68,6 +72,9 @@ object HiveQueryTemplates {
val createOnlyTableTemplate = ConfigUtils.getOptionString(conf, CREATE_ONLY_TABLE_TEMPLATE_KEY)
.getOrElse(DEFAULT_CREATE_ONLY_TABLE_TEMPLATE)

val replaceSchemaTemplate = ConfigUtils.getOptionString(conf, REPLACE_SCHEMA_TEMPLATE_KEY)
.getOrElse(DEFAULT_REPLACE_SCHEMA_TEMPLATE)

val repairTableTemplate = ConfigUtils.getOptionString(conf, REPAIR_TABLE_TEMPLATE_KEY)
.getOrElse(DEFAULT_REPAIR_TABLE_TEMPLATE)

Expand All @@ -80,6 +87,7 @@ object HiveQueryTemplates {
HiveQueryTemplates(
createTableTemplate = createTableTemplate,
createOnlyTableTemplate = createOnlyTableTemplate,
replaceSchemaTemplate = replaceSchemaTemplate,
repairTableTemplate = repairTableTemplate,
addPartitionTemplate = addPartitionTemplate,
dropTableTemplate = dropTableTemplate
Expand All @@ -90,6 +98,7 @@ object HiveQueryTemplates {
HiveQueryTemplates(
createTableTemplate = DEFAULT_CREATE_TABLE_TEMPLATE,
createOnlyTableTemplate = DEFAULT_CREATE_ONLY_TABLE_TEMPLATE,
replaceSchemaTemplate = DEFAULT_REPLACE_SCHEMA_TEMPLATE,
repairTableTemplate = DEFAULT_REPAIR_TABLE_TEMPLATE,
addPartitionTemplate = DEFAULT_ADD_PARTITION_TEMPLATE,
dropTableTemplate = DEFAULT_DROP_TABLE_TEMPLATE
Expand Down
Loading
Loading