diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala index c98d9e1cd..1c9ddf4c0 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala @@ -22,6 +22,7 @@ import java.time.Instant case class PipelineInfo( pipelineName: String, + pipelineDefinitionId: String, environment: String, runtimeInfo: RuntimeInfo, startedAt: Instant, diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala index 2460f3331..c7e1de1f8 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala @@ -75,6 +75,22 @@ trait Pramen { * @return An instance of the TokenLockFactory, which allows for token-based locking functionality. */ def tokenLockFactory: TokenLockFactory + + def setComputeEngineId(computeEngineId: String): Unit + + def setNumberOfExecutorsMin(n: Int): Unit + + def setNumberOfExecutorsMax(n: Int): Unit + + def setExecutorType(executorType: String): Unit + + def setNumberOfRecordsIngested(count: Long): Unit + + def addNumberOfRecordsIngested(count: Long): Unit + + def setMaximumNumberOfColumns(count: Long): Unit + + def setExecutionAdditionalOption(key: String, value: String): Unit } object Pramen { diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala index a094b04f1..13afdf967 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala @@ -19,8 +19,16 @@ package za.co.absa.pramen.api.status sealed trait PipelineStatus object PipelineStatus { - case object Success extends PipelineStatus - case object Warning extends PipelineStatus - case object PartialSuccess extends PipelineStatus - case object Failure extends PipelineStatus + case object Success extends PipelineStatus { + override def toString: String = "succeeded" + } + case object Warning extends PipelineStatus{ + override def toString: String = "succeeded with warnings" + } + case object PartialSuccess extends PipelineStatus{ + override def toString: String = "partially succeeded" + } + case object Failure extends PipelineStatus{ + override def toString: String = "failed" + } } diff --git a/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala b/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala index c09cc1c0d..cb9b6b131 100644 --- a/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala +++ b/pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala @@ -40,4 +40,20 @@ class DummyPramen extends Pramen { override def setWarningFlag(): Unit = null override def tokenLockFactory: TokenLockFactory = null + + override def setComputeEngineId(computeEngineId: String): Unit = {} + + override def setNumberOfExecutorsMin(n: Int): Unit = {} + + override def setNumberOfExecutorsMax(n: Int): Unit = {} + + override def setExecutorType(executorType: String): Unit = {} + + override def setNumberOfRecordsIngested(count: Long): Unit = {} + + override def addNumberOfRecordsIngested(count: Long): Unit = {} + + override def setMaximumNumberOfColumns(count: Long): Unit = {} + + override def setExecutionAdditionalOption(key: String, value: 
String): Unit = {} } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala index f1e25e762..3be5318e6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala @@ -86,6 +86,22 @@ class PramenImpl extends Pramen { throw new IllegalStateException("Token lock factory is not available at the context.") ) + override def setComputeEngineId(computeEngineId: String): Unit = _pipelineState.foreach(_.setComputeEngineId(computeEngineId)) + + override def setNumberOfExecutorsMin(n: Int): Unit = _pipelineState.foreach(_.setNumberOfExecutorsMin(n)) + + override def setNumberOfExecutorsMax(n: Int): Unit = _pipelineState.foreach(_.setNumberOfExecutorsMax(n)) + + override def setExecutorType(executorType: String): Unit = _pipelineState.foreach(_.setExecutorType(executorType)) + + override def setNumberOfRecordsIngested(count: Long): Unit = _pipelineState.foreach(_.setNumberOfRecordsIngested(count)) + + override def addNumberOfRecordsIngested(count: Long): Unit = _pipelineState.foreach(_.addNumberOfRecordsIngested(count)) + + override def setMaximumNumberOfColumns(count: Long): Unit = _pipelineState.foreach(_.setMaximumNumberOfColumns(count)) + + override def setExecutionAdditionalOption(key: String, value: String): Unit = _pipelineState.foreach(_.setExecutionAdditionalOption(key, value)) + private[core] def setWorkflowConfig(config: Config): Unit = synchronized { _workflowConfig = Option(config) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index 5463666ba..444708c2d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -48,7 +48,8 @@ case class RuntimeConfig( sparkAppDescriptionTemplate: Option[String], attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation) maxAttempts: Int, // Maximum number of attempts allowed for the pipeline run - forceReCreateHiveTables: Boolean + forceReCreateHiveTables: Boolean, + executionOptions: Map[String, String] ) object RuntimeConfig { @@ -78,6 +79,7 @@ object RuntimeConfig { val ATTEMPT = "pramen.runtime.attempt" val MAX_ATTEMPTS = "pramen.runtime.max.attempts" val FORCE_RECREATE_HIVE_TABLES = "pramen.runtime.hive.force.recreate" + val EXECUTION_EXTRA_OPTIONS_PREFIX = "pramen.execution.option" def fromConfig(conf: Config): RuntimeConfig = { val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP) @@ -144,6 +146,7 @@ object RuntimeConfig { val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE) val attempt = ConfigUtils.getOptionInt(conf, ATTEMPT).getOrElse(1) val maxAttempts = ConfigUtils.getOptionInt(conf, MAX_ATTEMPTS).getOrElse(1) + val executionOptions = ConfigUtils.getExtraOptions(conf, EXECUTION_EXTRA_OPTIONS_PREFIX) RuntimeConfig( isDryRun = isDryRun, @@ -166,7 +169,8 @@ object RuntimeConfig { sparkAppDescriptionTemplate, attempt, maxAttempts, - forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false) + forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false), + executionOptions ) } @@ -192,7 +196,8 @@ object RuntimeConfig { 
sparkAppDescriptionTemplate = None, attempt = 1, maxAttempts = 1, - forceReCreateHiveTables = false + forceReCreateHiveTables = false, + Map.empty ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index 8f0602754..c92e76932 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -201,13 +201,15 @@ object Bookkeeper { case HadoopFormat.Delta => bookkeepingConfig.deltaTablePrefix match { case Some(tablePrefix) => - val fullTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix) - log.info(s"Using Delta Lake managed table '$fullTableName' for the journal.") + val journalTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "journal") + val executionsTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "executions") + log.info(s"Using Delta Lake managed table '$journalTableName' and '$executionsTableName' for the journal.") new JournalHadoopDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix) case None => - val path = bookkeepingConfig.bookkeepingLocation.get + "/journal" - log.info(s"Using Delta Lake for the journal at $path") - new JournalHadoopDeltaPath(path) + val journalPath = bookkeepingConfig.bookkeepingLocation.get + "/journal" + val executionsPath = bookkeepingConfig.bookkeepingLocation.get + "/executions" + log.info(s"Using Delta Lake for the journal at '$journalPath' and '$executionsPath'") + new JournalHadoopDeltaPath(journalPath, executionsPath) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala index 734a9ff23..3dda811e8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala @@ -792,20 +792,20 @@ object BookkeeperDynamoDb { val DEFAULT_TABLE_PREFIX = "pramen" // Attribute names for bookkeeping table - val ATTR_TABLE_NAME = "tableName" - val ATTR_INFO_DATE = "infoDate" - val ATTR_INFO_DATE_SORT_KEY = "infoDateSortKey" // Composite: "infoDate#jobFinished" - val ATTR_INFO_DATE_BEGIN = "infoDateBegin" - val ATTR_INFO_DATE_END = "infoDateEnd" - val ATTR_INPUT_RECORD_COUNT = "inputRecordCount" - val ATTR_OUTPUT_RECORD_COUNT = "outputRecordCount" - val ATTR_JOB_STARTED = "jobStarted" - val ATTR_JOB_FINISHED = "jobFinished" - val ATTR_BATCH_ID = "batchId" - val ATTR_APPENDED_RECORD_COUNT = "appendedRecordCount" + val ATTR_TABLE_NAME = "table_name" + val ATTR_INFO_DATE = "info_date" + val ATTR_INFO_DATE_SORT_KEY = "info_date_sort_key" // Composite: "infoDate#jobFinished" + val ATTR_INFO_DATE_BEGIN = "info_date_begin" + val ATTR_INFO_DATE_END = "info_date_end" + val ATTR_INPUT_RECORD_COUNT = "input_record_count" + val ATTR_OUTPUT_RECORD_COUNT = "output_record_count" + val ATTR_JOB_STARTED = "job_started" + val ATTR_JOB_FINISHED = "job_finished" + val ATTR_BATCH_ID = "batch_id" + val ATTR_APPENDED_RECORD_COUNT = "appended_record_count" // Attribute names for schema table - val ATTR_SCHEMA_JSON = "schemaJson" + val ATTR_SCHEMA_JSON = "schema_json" val MODEL_VERSION = 1 diff --git 
a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala index 0c5f872d8..36627538d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala @@ -555,15 +555,15 @@ object OffsetManagerDynamoDb { val DEFAULT_TABLE_PREFIX = "pramen" // Attribute names for offset table - val ATTR_PRAMEN_TABLE_NAME = "pramenTableName" - val ATTR_COMPOSITE_KEY = "compositeKey" // Format: "infoDate#createdAtMilli" - val ATTR_INFO_DATE = "infoDate" - val ATTR_DATA_TYPE = "dataType" - val ATTR_MIN_OFFSET = "minOffset" - val ATTR_MAX_OFFSET = "maxOffset" - val ATTR_BATCH_ID = "batchId" - val ATTR_CREATED_AT = "createdAt" - val ATTR_COMMITTED_AT = "committedAt" + val ATTR_PRAMEN_TABLE_NAME = "pramen_table_name" + val ATTR_COMPOSITE_KEY = "composite_key" // Format: "infoDate#createdAtMilli" + val ATTR_INFO_DATE = "info_date" + val ATTR_DATA_TYPE = "data_type" + val ATTR_MIN_OFFSET = "min_offset" + val ATTR_MAX_OFFSET = "max_offset" + val ATTR_BATCH_ID = "batch_id" + val ATTR_CREATED_AT = "created_at" + val ATTR_COMMITTED_AT = "committed_at" /** * Builder for creating OffsetManagerDynamoDb instances. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala index b3b64c231..ebfe453a1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala @@ -17,6 +17,8 @@ package za.co.absa.pramen.core.config object Keys { + val PIPELINE_DEFINITION_ID = "pramen.pipeline.definition.id" + val INFORMATION_DATE_COLUMN = "pramen.information.date.column" val INFORMATION_DATE_FORMAT_APP = "pramen.information.date.format" diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala index b7d70ff81..6605a4778 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.journal -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.time.Instant @@ -27,6 +27,8 @@ trait Journal extends AutoCloseable { def addEntry(entry: TaskCompleted): Unit + def addPipelineEntry(execution: Execution): Unit + def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] override def close(): Unit = {} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala index a9453d3ed..fb58535cf 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala @@ -24,7 +24,7 @@ import software.amazon.awssdk.services.dynamodb.model._ import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.bookkeeper.BookkeeperDynamoDb import za.co.absa.pramen.core.bookkeeper.BookkeeperDynamoDb.waitForTableActive -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import 
java.net.URI import java.time.{Instant, LocalDate} @@ -51,9 +51,12 @@ class JournalDynamoDB private ( private val journalTableBaseName = s"${tablePrefix}_${JournalDynamoDB.DEFAULT_JOURNAL_TABLE}" private val journalTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, journalTableBaseName) + private val executionsTableBaseName = s"${tablePrefix}_${JournalDynamoDB.DEFAULT_EXECUTIONS_TABLE}" + private val executionsTableFullName = BookkeeperDynamoDb.getFullTableName(tableArn, executionsTableBaseName) - // Initialize table on creation + // Initialize tables on creation createJournalTableIfNotExists() + createExecutionsTableIfNotExists() /** * Add a task completion entry to the journal. @@ -119,6 +122,49 @@ class JournalDynamoDB private ( } } + override def addPipelineEntry(execution: Execution): Unit = { + val itemBuilder = Map.newBuilder[String, AttributeValue] + + // Primary key: composite of pipelineName and batchId + itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_NAME -> AttributeValue.builder().s(execution.pipelineName).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_BATCH_ID -> AttributeValue.builder().n(execution.batchId.toString).build()) + + itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_ID -> AttributeValue.builder().s(execution.pipelineId).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_PIPELINE_DEFINITION_ID -> AttributeValue.builder().s(execution.pipelineDefinitionId).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_ENVIRONMENT_NAME -> AttributeValue.builder().s(execution.environmentName).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_SPARK_APP_ID -> AttributeValue.builder().s(execution.sparkApplicationId).build()) + execution.computeEngineId.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_COMPUTE_ENGINE_ID -> AttributeValue.builder().s(v).build())) + execution.tenant.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_TENANT -> AttributeValue.builder().s(v).build())) + execution.country.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_COUNTRY -> AttributeValue.builder().s(v).build())) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_RUN_DATE_FROM -> AttributeValue.builder().s(execution.runDateFrom).build()) + execution.runDateTo.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_RUN_DATE_TO -> AttributeValue.builder().s(v).build())) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_STARTED_AT -> AttributeValue.builder().n(execution.startedAt.toString).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_FINISHED_AT -> AttributeValue.builder().n(execution.finishedAt.toString).build()) + execution.numberOfExecutorsMin.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_EXECUTORS_MIN -> AttributeValue.builder().n(v.toString).build())) + execution.numberOfExecutorsMax.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_EXECUTORS_MAX -> AttributeValue.builder().n(v.toString).build())) + execution.executorType.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_EXECUTOR_TYPE -> AttributeValue.builder().s(v).build())) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_STATUS -> AttributeValue.builder().s(execution.status).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_IS_RERUN -> AttributeValue.builder().s(execution.isRerun.toString).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_ATTEMPT_NUMBER -> AttributeValue.builder().s(execution.attemptNumber.toString).build()) + itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_ATTEMPTS ->
AttributeValue.builder().s(execution.numberOfAttempts.toString).build()) + execution.failureReason.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_FAILURE_REASON -> AttributeValue.builder().s(v).build())) + execution.numberOfRecordsIngested.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_NUMBER_OF_RECORDS_INGESTED -> AttributeValue.builder().n(v.toString).build())) + execution.maxNumberOfColumns.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_MAX_NUMBER_OF_COLUMNS -> AttributeValue.builder().n(v.toString).build())) + execution.additionalOptions.foreach(v => itemBuilder += (JournalDynamoDB.ATTR_EXEC_ADDITIONAL_OPTIONS -> AttributeValue.builder().s(v).build())) + + try { + val putRequest = PutItemRequest.builder() + .tableName(executionsTableFullName) + .item(itemBuilder.result().asJava) + .build() + + dynamoDbClient.putItem(putRequest) + } catch { + case NonFatal(ex) => + log.error(s"Unable to write to the executions table '$executionsTableFullName'.", ex) + } + } + /** * Get journal entries within a time range. */ @@ -177,6 +223,61 @@ class JournalDynamoDB private ( } } + /** + * Creates the executions table if it doesn't exist. + */ + private def createExecutionsTableIfNotExists(): Unit = { + try { + val describeRequest = DescribeTableRequest.builder() + .tableName(executionsTableFullName) + .build() + + dynamoDbClient.describeTable(describeRequest) + log.info(s"Executions table '$executionsTableFullName' already exists") + } catch { + case _: ResourceNotFoundException => + log.info(s"Creating executions table '$executionsTableFullName'") + createExecutionsTable() + case NonFatal(ex) => + log.error(s"Error checking if executions table exists", ex) + throw ex + } + } + + /** + * Creates the executions table in DynamoDB. + */ + private def createExecutionsTable(): Unit = { + val createRequest = CreateTableRequest.builder() + .tableName(executionsTableFullName) + .attributeDefinitions( + AttributeDefinition.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_PIPELINE_NAME) + .attributeType(ScalarAttributeType.S) + .build(), + AttributeDefinition.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_BATCH_ID) + .attributeType(ScalarAttributeType.N) + .build() + ) + .keySchema( + KeySchemaElement.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_PIPELINE_NAME) + .keyType(KeyType.HASH) + .build(), + KeySchemaElement.builder() + .attributeName(JournalDynamoDB.ATTR_EXEC_BATCH_ID) + .keyType(KeyType.RANGE) + .build() + ) + .billingMode(BillingMode.PAY_PER_REQUEST) + .build() + + dynamoDbClient.createTable(createRequest) + waitForTableActive(executionsTableFullName, dynamoDbClient) + log.info(s"Executions table '$executionsTableFullName' created successfully") + } + /** * Creates the journal table if it doesn't exist.
*/ @@ -247,34 +348,62 @@ class JournalDynamoDB private ( object JournalDynamoDB { val DEFAULT_JOURNAL_TABLE = "journal" + val DEFAULT_EXECUTIONS_TABLE = "executions" val DEFAULT_TABLE_PREFIX = "pramen" // Maximum length for failure reason (4KB minus some overhead) val MAX_FAILURE_REASON_LENGTH = 4000 + // Attribute names for executions table + val ATTR_EXEC_PIPELINE_ID = "pipeline_id" + val ATTR_EXEC_PIPELINE_DEFINITION_ID = "pipeline_definition_id" + val ATTR_EXEC_PIPELINE_NAME = "pipeline_name" + val ATTR_EXEC_ENVIRONMENT_NAME = "environment_name" + val ATTR_EXEC_BATCH_ID = "batch_id" + val ATTR_EXEC_SPARK_APP_ID = "spark_application_id" + val ATTR_EXEC_COMPUTE_ENGINE_ID = "compute_engine_id" + val ATTR_EXEC_TENANT = "tenant" + val ATTR_EXEC_COUNTRY = "country" + val ATTR_EXEC_RUN_DATE_FROM = "run_date_from" + val ATTR_EXEC_RUN_DATE_TO = "run_date_to" + val ATTR_EXEC_STARTED_AT = "started_at" + val ATTR_EXEC_FINISHED_AT = "finished_at" + val ATTR_EXEC_NUMBER_OF_EXECUTORS_MIN = "number_of_executors_min" + val ATTR_EXEC_NUMBER_OF_EXECUTORS_MAX = "number_of_executors_max" + val ATTR_EXEC_EXECUTOR_TYPE = "executor_type" + val ATTR_EXEC_STATUS = "status" + val ATTR_EXEC_IS_RERUN = "is_rerun" + val ATTR_EXEC_ATTEMPT_NUMBER = "attempt_number" + val ATTR_EXEC_NUMBER_OF_ATTEMPTS = "number_of_attempts" + val ATTR_EXEC_FAILURE_REASON = "failure_reason" + val ATTR_EXEC_NUMBER_OF_RECORDS_INGESTED = "number_of_records_ingested" + val ATTR_EXEC_MAX_NUMBER_OF_COLUMNS = "max_number_of_columns" + + val ATTR_EXEC_ADDITIONAL_OPTIONS = "additional_options" + // Attribute names for journal table - val ATTR_JOB_NAME = "jobName" - val ATTR_TABLE_NAME = "tableName" - val ATTR_PERIOD_BEGIN = "periodBegin" - val ATTR_PERIOD_END = "periodEnd" - val ATTR_INFO_DATE = "infoDate" - val ATTR_INPUT_RECORD_COUNT = "inputRecordCount" - val ATTR_INPUT_RECORD_COUNT_OLD = "inputRecordCountOld" - val ATTR_OUTPUT_RECORD_COUNT = "outputRecordCount" - val ATTR_OUTPUT_RECORD_COUNT_OLD = "outputRecordCountOld" - val ATTR_APPENDED_RECORD_COUNT = "appendedRecordCount" - val ATTR_OUTPUT_SIZE = "outputSize" - val ATTR_STARTED_AT = "startedAt" - val ATTR_FINISHED_AT = "finishedAt" + val ATTR_JOB_NAME = "job_name" + val ATTR_TABLE_NAME = "table_name" + val ATTR_PERIOD_BEGIN = "period_begin" + val ATTR_PERIOD_END = "period_end" + val ATTR_INFO_DATE = "info_date" + val ATTR_INPUT_RECORD_COUNT = "input_record_count" + val ATTR_INPUT_RECORD_COUNT_OLD = "input_record_count_old" + val ATTR_OUTPUT_RECORD_COUNT = "output_record_count" + val ATTR_OUTPUT_RECORD_COUNT_OLD = "output_record_count_old" + val ATTR_APPENDED_RECORD_COUNT = "appended_record_count" + val ATTR_OUTPUT_SIZE = "output_size" + val ATTR_STARTED_AT = "started_at" + val ATTR_FINISHED_AT = "finished_at" val ATTR_STATUS = "status" - val ATTR_FAILURE_REASON = "failureReason" - val ATTR_SPARK_APP_ID = "sparkApplicationId" - val ATTR_PIPELINE_ID = "pipelineId" - val ATTR_PIPELINE_NAME = "pipelineName" - val ATTR_ENVIRONMENT_NAME = "environmentName" + val ATTR_FAILURE_REASON = "failure_reason" + val ATTR_SPARK_APP_ID = "spark_application_id" + val ATTR_PIPELINE_ID = "pipeline_id" + val ATTR_PIPELINE_NAME = "pipeline_name" + val ATTR_ENVIRONMENT_NAME = "environment_name" val ATTR_TENANT = "tenant" val ATTR_COUNTRY = "country" - val ATTR_BATCH_ID = "batchId" + val ATTR_BATCH_ID = "batch_id" /** * Builder for creating JournalDynamoDB instances.
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala index a27b7a0e5..4d68b3458 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import za.co.absa.pramen.core.app.config.InfoDateConfig -import za.co.absa.pramen.core.journal.model.{TaskCompleted, TaskCompletedCsv} +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted, TaskCompletedCsv} import za.co.absa.pramen.core.utils.{CsvUtils, FsUtils, SparkUtils} import java.time.{Instant, LocalDate} @@ -93,6 +93,11 @@ class JournalHadoopCsv(journalPath: String) )) } + override def addPipelineEntry(execution: Execution): Unit = { + // ToDo add the implementation for CSV + } + + private def serializeCompletedTaskCsv(t: TaskCompleted): String = { val periodBegin = t.periodBegin.format(dateFormatter) val periodEnd = t.periodEnd.format(dateFormatter) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala index 6493029a8..7a8ab710f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala @@ -20,12 +20,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SaveMode, SparkSession} -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import za.co.absa.pramen.core.utils.FsUtils import java.time.Instant -class JournalHadoopDeltaPath(journalPath: String) +class JournalHadoopDeltaPath(journalPath: String, executionsPath: String) (implicit spark: SparkSession) extends Journal { import spark.implicits._ @@ -42,11 +42,26 @@ class JournalHadoopDeltaPath(journalPath: String) recordDf .write .mode(SaveMode.Append) - .option("format", "delta") + .format("delta") .option("mergeSchema", "true") .save(journalPath) } + override def addPipelineEntry(execution: Execution): Unit = { + val recordDf = Seq(execution).toDS().toDF() + + if (spark.version.split('.').head.toInt < 3) { + throw new IllegalArgumentException("Delta Lake for bookkeeping is only available in Spark 3+") + } + + recordDf + .write + .mode(SaveMode.Append) + .format("delta") + .option("mergeSchema", "true") + .save(executionsPath) + } + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { import spark.implicits._ @@ -56,7 +71,7 @@ class JournalHadoopDeltaPath(journalPath: String) val df = spark .read - .option("format", "delta") + .format("delta") .load(journalPath) df.filter(col("finishedAt") >= from.getEpochSecond && col("finishedAt") <= to.getEpochSecond) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala index 18e096671..11425820a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.journal import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SaveMode, SparkSession} -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.time.Instant @@ -38,20 +38,35 @@ class JournalHadoopDeltaTable(database: Option[String], recordDf .write .mode(SaveMode.Append) - .option("format", "delta") + .format("delta") .option("mergeSchema", "true") - .saveAsTable(getFullTableName(database, tablePrefix)) + .saveAsTable(getFullTableName(database, tablePrefix, "journal")) + } + + override def addPipelineEntry(execution: Execution): Unit = { + val recordDf = Seq(execution).toDS().toDF() + + if (spark.version.split('.').head.toInt < 3) { + throw new IllegalArgumentException("Delta Lake for bookkeeping is only available in Spark 3+") + } + + recordDf + .write + .mode(SaveMode.Append) + .format("delta") + .option("mergeSchema", "true") + .saveAsTable(getFullTableName(database, tablePrefix, "executions")) } override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { import spark.implicits._ - if (!spark.catalog.tableExists(getFullTableName(database, tablePrefix))) { + if (!spark.catalog.tableExists(getFullTableName(database, tablePrefix, "journal"))) { return Seq.empty[TaskCompleted] } val df = spark - .table(getFullTableName(database, tablePrefix)) + .table(getFullTableName(database, tablePrefix, "journal")) df.filter(col("finishedAt") >= from.getEpochSecond && col("finishedAt") <= to.getEpochSecond) .orderBy(col("finishedAt")) @@ -61,10 +76,10 @@ class JournalHadoopDeltaTable(database: Option[String], } object JournalHadoopDeltaTable { - def getFullTableName(databaseOpt: Option[String], tablePrefix: String): String = { + def getFullTableName(databaseOpt: Option[String], tablePrefix: String, tableName: String): String = { databaseOpt match { - case Some(db) => s"$db.${tablePrefix}journal" - case None => s"${tablePrefix}journal" + case Some(db) => s"$db.$tablePrefix$tableName" + case None => s"$tablePrefix$tableName" } } } \ No newline at end of file diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala index 0558af397..d9a5afa6e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala @@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory import slick.jdbc.JdbcBackend.Database import slick.jdbc.JdbcProfile import za.co.absa.pramen.core.app.config.InfoDateConfig -import za.co.absa.pramen.core.journal.model.{JournalTable, JournalTask, TaskCompleted} +import za.co.absa.pramen.core.journal.model._ import za.co.absa.pramen.core.utils.SlickUtils import java.time.{Instant, LocalDate} @@ -38,6 +38,10 @@ class JournalJdbc(db: Database, slickProfile: JdbcProfile) extends Journal { override val profile = slickProfile } + private val executionsTable = new ExecutionsTable { + override val profile = slickProfile + } + override def addEntry(entry: TaskCompleted): Unit = { val periodBegin = entry.periodBegin.format(dateFormatter) val periodEnd = entry.periodEnd.format(dateFormatter) @@ -76,6 +80,17 @@ class JournalJdbc(db: Database, slickProfile: JdbcProfile) extends Journal { } } + override def addPipelineEntry(execution: Execution): Unit = { + 
try { + slickUtils.ensureDbConnected(db) + db.run( + executionsTable.records += execution + ).execute() + } catch { + case NonFatal(ex) => log.error(s"Unable to write to the executions table.", ex) + } + } + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { val fromSec = from.getEpochSecond val toSec = to.getEpochSecond diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala index e1e1c7c0c..4ecacbaa4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala @@ -16,8 +16,6 @@ package za.co.absa.pramen.core.journal -import java.time.Instant - import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries} import org.bson.codecs.configuration.CodecRegistry import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY @@ -25,13 +23,15 @@ import org.mongodb.scala.bson.codecs.Macros._ import org.mongodb.scala.model.Filters import za.co.absa.pramen.core.dao.MongoDb import za.co.absa.pramen.core.dao.model.{ASC, IndexField} -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import za.co.absa.pramen.core.mongo.MongoDbConnection +import java.time.Instant import scala.util.control.NonFatal object JournalMongoDb { - val collectionName = "journal" + val tasksCollectionName = "journal" + val executionsCollectionName = "executions" } class JournalMongoDb(mongoDbConnection: MongoDbConnection) extends Journal { @@ -39,15 +39,20 @@ class JournalMongoDb(mongoDbConnection: MongoDbConnection) extends Journal { import JournalMongoDb._ import za.co.absa.pramen.core.dao.ScalaMongoImplicits._ - private val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[TaskCompleted]), DEFAULT_CODEC_REGISTRY) + private val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[TaskCompleted], classOf[Execution]), DEFAULT_CODEC_REGISTRY) private val db = mongoDbConnection.getDatabase initCollection() - private val collection = db.getCollection[TaskCompleted](collectionName).withCodecRegistry(codecRegistry) + private val tasksCollection = db.getCollection[TaskCompleted](tasksCollectionName).withCodecRegistry(codecRegistry) + private val executionsCollection = db.getCollection[Execution](executionsCollectionName).withCodecRegistry(codecRegistry) override def addEntry(entry: TaskCompleted): Unit = { - collection.insertOne(entry).execute() + tasksCollection.insertOne(entry).execute() + } + + override def addPipelineEntry(execution: Execution): Unit = { + executionsCollection.insertOne(execution).execute() } override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = { @@ -59,17 +64,26 @@ class JournalMongoDb(mongoDbConnection: MongoDbConnection) extends Journal { Filters.gte("finishedAt", instant0.getEpochSecond), Filters.lte("finishedAt", instant1.getEpochSecond) ) - collection.find(filter).execute() + tasksCollection.find(filter).execute() } private def initCollection(): Unit = { try { val d = new MongoDb(db) - if (!d.doesCollectionExists(collectionName)) { - d.createCollection(collectionName) - d.createIndex(collectionName, IndexField("startedAt", ASC) :: Nil) - d.createIndex(collectionName, IndexField("finishedAt", ASC) :: Nil) + if (!d.doesCollectionExists(tasksCollectionName)) { + d.createCollection(tasksCollectionName) + 
d.createIndex(tasksCollectionName, IndexField("startedAt", ASC) :: Nil) + d.createIndex(tasksCollectionName, IndexField("finishedAt", ASC) :: Nil) + } + if (!d.doesCollectionExists(executionsCollectionName)) { + d.createCollection(executionsCollectionName) + d.createIndex(executionsCollectionName, IndexField("startedAt", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("finishedAt", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("batchId", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("computeEngineId", ASC) :: Nil) + d.createIndex(executionsCollectionName, IndexField("pipelineId", ASC) :: Nil) } + } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to connect to MongoDb instance: ${mongoDbConnection.getConnectionString}", ex) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala index b02bea6b1..403671aef 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala @@ -16,7 +16,8 @@ package za.co.absa.pramen.core.journal -import za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} + import java.time.Instant /** @@ -26,5 +27,7 @@ import java.time.Instant class JournalNull extends Journal { override def addEntry(entry: TaskCompleted): Unit = {} + override def addPipelineEntry(pipelineStatus: Execution): Unit = {} + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = Nil } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala new file mode 100644 index 000000000..a0556b4b2 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.journal.model + +case class Execution( + pipelineId: String, + pipelineDefinitionId: String, + pipelineName: String, + environmentName: String, + batchId: Long, + sparkApplicationId: String, + computeEngineId: Option[String], + tenant: Option[String], + country: Option[String], + runDateFrom: String, + runDateTo: Option[String], + startedAt: Long, + finishedAt: Long, + numberOfExecutorsMin: Option[Int], + numberOfExecutorsMax: Option[Int], + executorType: Option[String], + status: String, + isRerun: Boolean, + attemptNumber: Int, + numberOfAttempts: Int, + failureReason: Option[String], + numberOfRecordsIngested: Option[Long], + maxNumberOfColumns: Option[Long], + additionalOptions: Option[String] + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala new file mode 100644 index 000000000..acdb4516f --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala @@ -0,0 +1,79 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.journal.model + +import slick.jdbc.JdbcProfile + +trait ExecutionsTable { + val profile: JdbcProfile + import profile.api._ + + class ExecutionsRecords(tag: Tag) extends Table[Execution](tag, "executions") { + def pipelineId = column[String]("pipeline_id", O.Length(40)) + def pipelineDefinitionId = column[String]("pipeline_definition_id", O.Length(50)) + def pipelineName = column[String]("pipeline_name", O.Length(200)) + def environmentName = column[String]("environment_name", O.Length(128)) + def batchId = column[Long]("batch_id") + def sparkApplicationId = column[String]("spark_application_id", O.Length(128)) + def computeEngineId = column[Option[String]]("compute_engine_id", O.Length(128)) + def tenant = column[Option[String]]("tenant", O.Length(200)) + def country = column[Option[String]]("country", O.Length(50)) + def runDateFrom = column[String]("run_date_from", O.Length(20)) + def runDateTo = column[Option[String]]("run_date_to", O.Length(20)) + def startedAt = column[Long]("started_at") + def finishedAt = column[Long]("finished_at") + def numberOfExecutorsMin = column[Option[Int]]("number_of_executors_min") + def numberOfExecutorsMax = column[Option[Int]]("number_of_executors_max") + def executorType = column[Option[String]]("executor_type", O.Length(128)) + def status = column[String]("status", O.Length(50)) + def isRerun = column[Boolean]("is_rerun") + def attemptNumber = column[Int]("attempt_number") + def numberOfAttempts = column[Int]("number_of_attempts") + def failureReason = column[Option[String]]("failure_reason") + def numberOfRecordsIngested = column[Option[Long]]("number_of_records_ingested") + def maxNumberOfColumns = column[Option[Long]]("max_number_of_columns") + def additionalOptions = column[Option[String]]("additional_options") + + private 
type ExecutionTuple = ( + (String, String, String, String, Long, String, Option[String], Option[String], Option[String], String, Option[String], Long, Long), + (Option[Int], Option[Int], Option[String], String, Boolean, Int, Int, Option[String], Option[Long], Option[Long], Option[String]) + ) + + private def toExecution(t: ExecutionTuple): Execution = Execution( + t._1._1, t._1._2, t._1._3, t._1._4, t._1._5, t._1._6, t._1._7, t._1._8, t._1._9, t._1._10, t._1._11, t._1._12, t._1._13, + t._2._1, t._2._2, t._2._3, t._2._4, t._2._5, t._2._6, t._2._7, t._2._8, t._2._9, t._2._10, t._2._11 + ) + + private def fromExecution(e: Execution): Option[ExecutionTuple] = Some( + (e.pipelineId, e.pipelineDefinitionId, e.pipelineName, e.environmentName, e.batchId, e.sparkApplicationId, e.computeEngineId, e.tenant, e.country, e.runDateFrom, e.runDateTo, e.startedAt, e.finishedAt), + (e.numberOfExecutorsMin, e.numberOfExecutorsMax, e.executorType, e.status, e.isRerun, e.attemptNumber, e.numberOfAttempts, e.failureReason, e.numberOfRecordsIngested, e.maxNumberOfColumns, e.additionalOptions) + ) + + def * = ( + (pipelineId, pipelineDefinitionId, pipelineName, environmentName, batchId, sparkApplicationId, computeEngineId, tenant, country, runDateFrom, runDateTo, startedAt, finishedAt), + (numberOfExecutorsMin, numberOfExecutorsMax, executorType, status, isRerun, attemptNumber, numberOfAttempts, failureReason, numberOfRecordsIngested, maxNumberOfColumns, additionalOptions) + ) <> (toExecution, fromExecution) + + def idx1 = index("idx_exec_started_at", startedAt, unique = false) + def idx2 = index("idx_exec_finished_at", finishedAt, unique = false) + def idx3 = index("idx_exec_batchid", batchId, unique = false) + def idx4 = index("idx_exec_compute_engine_id", computeEngineId, unique = false) + def idx5 = index("idx_exec_pipeline_id", pipelineId, unique = false) + } + + lazy val records = TableQuery[ExecutionsRecords] +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala index 454e90900..10dafdc9f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala @@ -33,8 +33,8 @@ object TokenLockDynamoDb { // Attribute names val ATTR_TOKEN = "job_token" // 'token' is a reserved word in DynamoDb and can't be used as an attribute val ATTR_OWNER = "job_owner" // 'owner' is a reserved word in DynamoDb and can't be used as an attribute - val ATTR_EXPIRES = "expiresAt" - val ATTR_CREATED_AT = "createdAt" + val ATTR_EXPIRES = "expires_at" + val ATTR_CREATED_AT = "created_at" val TICKETS_HARD_EXPIRE_DAYS = 1 } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala index 31b8a8932..25e62017e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala @@ -248,12 +248,12 @@ object MetadataManagerDynamoDb { val DEFAULT_TABLE_PREFIX = "pramen" // Attribute names for metadata table - val ATTR_COMPOSITE_KEY = "compositeKey" // tableName#infoDate - val ATTR_METADATA_KEY = "metadataKey" - val ATTR_METADATA_VALUE = "metadataValue" - val ATTR_LAST_UPDATED = "lastUpdated" - val ATTR_TABLE_NAME = "tableName" // For filtering/queries - 
val ATTR_INFO_DATE = "infoDate" // For filtering/queries + val ATTR_COMPOSITE_KEY = "composite_key" // tableName#infoDate + val ATTR_METADATA_KEY = "metadata_key" + val ATTR_METADATA_VALUE = "metadata_value" + val ATTR_LAST_UPDATED = "last_updated" + val ATTR_TABLE_NAME = "table_name" // For filtering/queries + val ATTR_INFO_DATE = "info_date" // For filtering/queries /** * Builder for creating MetadataManagerDynamoDb instances. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index 17bde2bc6..441395016 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} -import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult} +import za.co.absa.pramen.api._ import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore @@ -33,6 +33,7 @@ import za.co.absa.pramen.core.utils.Emoji.WARNING import za.co.absa.pramen.core.utils.SparkUtils._ import java.time.{Instant, LocalDate} +import scala.util.Try class IngestionJob(operationDef: OperationDef, metastore: Metastore, @@ -168,6 +169,16 @@ class IngestionJob(operationDef: OperationDef, inputRecordCount: Option[Long]): SaveResult = { val stats = metastore.saveTable(outputTable.name, infoDate, df, inputRecordCount) + if (!outputTable.format.isRaw) { + val pramenOpt = Try { + Pramen.instance + }.toOption + + pramenOpt.foreach { pramen => + pramen.addNumberOfRecordsIngested(stats.recordCount.getOrElse(0L)) + } + } + try { source.postProcess( sourceTable.query, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index d00f24ec1..056ac3f1c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -22,7 +22,7 @@ import slick.jdbc.{JdbcBackend, JdbcProfile} import slick.util.AsyncExecutor import za.co.absa.pramen.api.Pramen import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingTable, MetadataTable, OffsetTable, SchemaTable} -import za.co.absa.pramen.core.journal.model.JournalTable +import za.co.absa.pramen.core.journal.model.{ExecutionsTable, JournalTable} import za.co.absa.pramen.core.lock.model.LockTicketTable import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION import za.co.absa.pramen.core.reader.JdbcUrlSelector @@ -53,6 +53,9 @@ class PramenDb(val jdbcConfig: JdbcConfig, val journalTable: JournalTable = new JournalTable { override val profile = slickProfile } + val executionsTable: ExecutionsTable = new ExecutionsTable { + override val profile = slickProfile + } val lockTicketTable: LockTicketTable = new LockTicketTable { override val profile = slickProfile } @@ -121,6 +124,10 @@ class PramenDb(val jdbcConfig: JdbcConfig, addColumn(bookkeepingTable.records.baseTableRow.tableName, "appended_record_count", "bigint") addColumn(journalTable.records.baseTableRow.tableName, "batch_id", "bigint") } + + if (dbVersion < 10) { + initTable(executionsTable.records.schema) + 
} } private def initTable(schema: slickProfile.SchemaDescription): Unit = { @@ -165,7 +172,7 @@ object PramenDb { private val log = LoggerFactory.getLogger(this.getClass) private val conf = Pramen.getConfig - val MODEL_VERSION = 9 + val MODEL_VERSION = 10 val DEFAULT_RETRIES: Int = conf.getInt("pramen.internal.connection.retries.default") val BACKOFF_MIN_MS: Int = conf.getInt("pramen.internal.connection.backoff.min.ms") val BACKOFF_MAX_MS: Int = conf.getInt("pramen.internal.connection.backoff.max.ms") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index fcfeabbf5..32c609f2c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -70,6 +70,7 @@ object AppRunner { _ <- logBanner(spark) _ <- logExecutorNodes(conf, state, spark) appContext <- createAppContext(conf, state, spark) + _ <- Try { state.setJournal(appContext.journal) } taskRunner <- createTaskRunner(conf, state, appContext, spark.sparkContext.applicationId) pipeline <- getPipelineDef(conf, state, appContext) _ <- addSinkTables(state, pipeline, appContext) @@ -157,6 +158,9 @@ object AppRunner { hosts.foreach(host => log.info(s"Executor node: $host")) } + setExecutorNodeType(spark, state) + setMinMaxExecutors(spark, state) + setExecutionAdditionalProperties(spark, state) }, state, "Spark List of executor nodes") } @@ -168,6 +172,105 @@ object AppRunner { data.mapPartitions { _ => Iterable(java.net.InetAddress.getLocalHost.getHostName).iterator }.collect().distinct.sorted } + private[core] def setMinMaxExecutors(implicit spark: SparkSession, state: PipelineState): Unit = { + val executors = spark.sparkContext.getExecutorMemoryStatus.keySet + .filter(_ != "driver") + + val maxNumExecutors = spark.conf.getOption("spark.dynamicAllocation.maxExecutors").orElse( + spark.conf.getOption("spark.executor.instances")) match { + case Some(s) => Try(s.toInt).toOption.getOrElse(executors.size) + case None => executors.size + } + + state.setNumberOfExecutorsMax(maxNumExecutors) + + val dynamicAllocEnabled = spark.conf.get("spark.dynamicAllocation.enabled", "false").toBoolean + + val minNumExecutors = if (dynamicAllocEnabled) { + spark.conf.getOption("spark.dynamicAllocation.minExecutors") match { + case Some(s) => Try(s.toInt).toOption.getOrElse(1) + case None => 1 + } + } else { + maxNumExecutors + } + + state.setNumberOfExecutorsMin(minNumExecutors) + } + + private[core] def getNumberOfExecutorCores(spark: SparkSession): Int = { + spark.conf.getOption("spark.executor.cores") match { + case Some(s) => Try(s.toInt).toOption.getOrElse(Runtime.getRuntime.availableProcessors()) + case None => Runtime.getRuntime.availableProcessors() + } + } + + private[core] def getNumberOfExecutorMemoryGb(spark: SparkSession): Int = { + val memAttempt1 = spark.conf.getOption("spark.executor.memory") match { + case Some(s) => parseMemorySizeInGb(s) + case None => None + } + + memAttempt1.getOrElse { + val executorMemoryStatus = spark.sparkContext.getExecutorMemoryStatus + val executorEntries = executorMemoryStatus.filterKeys(_ != "driver") + + if (executorEntries.nonEmpty) { + val (_, (maxMemory, _)) = executorEntries.head + val memoryGb = maxMemory / (1024L * 1024L * 1024L) + memoryGb.toInt + } else { + log.warn("No executors found to determine the amount of memory of the executor") + 0 + } + } + } + + private[core] def parseMemorySizeInGb(s: 
String): Option[Int] = { + val trimmed = s.trim.toLowerCase + val memoryGb = Try { + if (trimmed.endsWith("g")) { + trimmed.dropRight(1).toDouble + } else if (trimmed.endsWith("m")) { + trimmed.dropRight(1).toDouble / 1024.0 + } else if (trimmed.endsWith("k")) { + trimmed.dropRight(1).toDouble / (1024.0 * 1024.0) + } else if (trimmed.endsWith("t")) { + trimmed.dropRight(1).toDouble * 1024.0 + } else { + trimmed.toDouble / (1024.0 * 1024.0 * 1024.0) + } + } + memoryGb.map(Math.round(_).toInt).toOption + } + + private[core] def setExecutionAdditionalProperties(implicit spark: SparkSession, state: PipelineState): Unit = { + spark.conf.getOption("spark.glue.JOB_RUN_ID").foreach { glueId => + state.setComputeEngineId(glueId) + } + + spark.conf.getOption("spark.glue.GLUE_VERSION").foreach { glueVersion => + state.setExecutionAdditionalOption("glue_version", glueVersion) + } + + spark.conf.getOption("spark.glue.accountId").foreach { awsAccount => + state.setExecutionAdditionalOption("aws_account_id", awsAccount) + } + + spark.conf.getOption("spark.glue.JOB_NAME").foreach { glueJobName => + state.setExecutionAdditionalOption("glue_job_name", glueJobName) + } + } + + private[core] def setExecutorNodeType(implicit spark: SparkSession, state: PipelineState): Unit = { + // Get first executor and construct a string like: + // C32M64 meaning 32 virtual CPUs and 64 GB of memory + val cpus = getNumberOfExecutorCores(spark) + val memoryGb = getNumberOfExecutorMemoryGb(spark) + + state.setExecutorType(s"C${cpus}M$memoryGb") + } + private[core] def logBanner(implicit spark: SparkSession): Try[Unit] = { if (!bannerShown) { Try { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index d60126ce4..e5e34cff6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -40,7 +40,7 @@ import za.co.absa.pramen.core.state.PipelineState import za.co.absa.pramen.core.utils.Emoji._ import za.co.absa.pramen.core.utils.SparkUtils._ import za.co.absa.pramen.core.utils.hive.HiveHelper -import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils} +import za.co.absa.pramen.core.utils.{ConfigUtils, SparkUtils, ThreadUtils, TimeUtils} import java.sql.Date import java.time.{Instant, LocalDate} @@ -432,8 +432,9 @@ abstract class TaskRunnerBase(conf: Config, val outputMetastoreHiveTable = task.job.outputTable.hiveTable.map(table => HiveHelper.getFullTable(task.job.outputTable.hiveConfig.database, table)) val hiveTableUpdates = (saveResult.hiveTablesUpdates ++ outputMetastoreHiveTable).distinct - val stats = saveResult.stats + pipelineState.setMaximumNumberOfColumns(SparkUtils.getTotalNumberOfColumns(dfTransformed.schema)) + val stats = saveResult.stats val finished = Instant.now() val completionReason = if (validationResult.status == NeedsUpdate || (validationResult.status == AlreadyRan && task.reason != TaskRunReason.Rerun)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala index cd61e3efe..454485606 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala @@ -17,6 +17,7 @@ package za.co.absa.pramen.core.state import 
za.co.absa.pramen.api.status.{PipelineStateSnapshot, TaskResult} +import za.co.absa.pramen.core.journal.Journal trait PipelineState extends AutoCloseable { def getState: PipelineStateSnapshot @@ -33,6 +34,24 @@ trait PipelineState extends AutoCloseable { def setSparkAppId(sparkAppId: String): Unit + def setJournal(journal: Journal): Unit + + def setComputeEngineId(computeEngineId: String): Unit + + def setNumberOfExecutorsMin(n: Int): Unit + + def setNumberOfExecutorsMax(n: Int): Unit + + def setExecutorType(executorType: String): Unit + + def setExecutionAdditionalOption(key: String, value: String): Unit + + def setNumberOfRecordsIngested(count: Long): Unit + + def addNumberOfRecordsIngested(count: Long): Unit + + def setMaximumNumberOfColumns(count: Long): Unit + def addTaskCompletion(statuses: Seq[TaskResult]): Unit def getExitCode: Int diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index e4c671b3c..3111b802a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -16,16 +16,20 @@ package za.co.absa.pramen.core.state +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.typesafe.config.Config import org.slf4j.{Logger, LoggerFactory} import sun.misc.Signal import za.co.absa.pramen.api.status.RunStatus.{NotRan, Succeeded} import za.co.absa.pramen.api.status._ -import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotificationTarget} +import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotificationTarget, RunMode} import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHANGES, UNDERCOVER} import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig} -import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS} +import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, PIPELINE_DEFINITION_ID, WARN_THROUGHPUT_RPS} import za.co.absa.pramen.core.exceptions.OsSignalException +import za.co.absa.pramen.core.journal.Journal +import za.co.absa.pramen.core.journal.model.Execution import za.co.absa.pramen.core.lock.TokenLockRegistry import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager} import za.co.absa.pramen.core.notify.PipelineNotificationTargetFactory @@ -34,6 +38,7 @@ import za.co.absa.pramen.core.pipeline.PipelineDef._ import za.co.absa.pramen.core.utils.{ConfigUtils, JvmUtils} import java.time.Instant +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} @@ -63,6 +68,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification private val taskResults = new ListBuffer[TaskResult] private val pipelineNotificationFailures = new ListBuffer[PipelineNotificationFailure] private val signalHandlers = new ListBuffer[PramenSignalHandler] + private val executionAdditionalOptions: mutable.Map[String, String] = new mutable.HashMap[String, String] ++ runtimeConfig.executionOptions @volatile private var failureException: Option[Throwable] = None @volatile private var signalException: Option[Throwable] = None @volatile private var exitedNormally = false @@ -70,6 +76,13 @@ class PipelineStateImpl(implicit conf: Config, 
notificationBuilder: Notification @volatile private var customShutdownHookCanRun = false @volatile private var sparkAppId: Option[String] = None @volatile private var warningFlag: Boolean = false + @volatile private var journalOpt: Option[Journal] = None + @volatile private var computeEngineId: Option[String] = None + @volatile private var numberOfExecutorsMin: Option[Int] = None + @volatile private var numberOfExecutorsMax: Option[Int] = None + @volatile private var executorType: Option[String] = None + @volatile private var numberOfRecordsIngested: Option[Long] = None + @volatile private var maxNumberOfColumns: Option[Long] = None init() @@ -111,6 +124,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } else failureException + val pipelineDefinitionId = ConfigUtils.getOptionString(conf, PIPELINE_DEFINITION_ID).getOrElse("") val minRps = ConfigUtils.getOptionInt(conf, WARN_THROUGHPUT_RPS).getOrElse(0) val goodRps = ConfigUtils.getOptionInt(conf, GOOD_THROUGHPUT_RPS).getOrElse(0) val dryRun = ConfigUtils.getOptionBoolean(conf, DRY_RUN).getOrElse(false) @@ -119,6 +133,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification PipelineInfo( pipelineName, + pipelineDefinitionId, environmentName, RuntimeInfo( runtimeConfig.runDate, @@ -178,6 +193,48 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification this.sparkAppId = Option(sparkAppId) } + override def setJournal(journal: Journal): Unit = synchronized { + this.journalOpt = Option(journal) + } + + override def setComputeEngineId(computeEngineIdIn: String): Unit = synchronized { + computeEngineId = Option(computeEngineIdIn) + } + + override def setNumberOfExecutorsMin(nIn: Int): Unit = synchronized { + numberOfExecutorsMin = Option(nIn) + if (numberOfExecutorsMax.exists(_ < nIn)) { + numberOfExecutorsMax = Option(nIn) + } + } + + override def setNumberOfExecutorsMax(nIn: Int): Unit = synchronized { + numberOfExecutorsMax = Option(nIn) + if (numberOfExecutorsMin.exists(_ > nIn)) { + numberOfExecutorsMin = Option(nIn) + } + } + + override def setExecutorType(executorTypeIn: String): Unit = synchronized { + executorType = Option(executorTypeIn) + } + + override def setExecutionAdditionalOption(key: String, value: String): Unit = synchronized { + executionAdditionalOptions.put(key, value) + } + + override def setNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(count) + } + + override def addNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(numberOfRecordsIngested.getOrElse(0L) + count) + } + + override def setMaximumNumberOfColumns(count: Long): Unit = synchronized { + maxNumberOfColumns = Option(Math.max(maxNumberOfColumns.getOrElse(0L), count)) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { taskResults ++= statuses.filter(_.runStatus != NotRan) if (statuses.exists(_.runStatus.isFailure)) { @@ -209,6 +266,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification sendPipelineNotifications() runCustomShutdownHook() removeSignalHandlers() + Try(addJournalEntry()).recover { + case NonFatal(ex) => + log.error("Unable to write pipeline execution journal entry.", ex) + } sendNotificationEmail() TokenLockRegistry.releaseAllLocks() } @@ -277,6 +338,60 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification } } + protected def addJournalEntry(): Unit = { + val pipelineInfo = 
getPipelineInfo + val failureReason = if (pipelineInfo.status == PipelineStatus.Success || pipelineInfo.status == PipelineStatus.Warning) { + None + } else { + pipelineInfo.failureException match { + case Some(ex) => Option(RunStatus.getShortExceptionDescription(ex)) + case None => + val firstReason = taskResults.map(_.runStatus).find(_.isFailure).map(_.getReason.getOrElse("")) + firstReason + } + } + journalOpt.foreach { journal => + val execution = Execution( + pipelineId, + pipelineInfo.pipelineDefinitionId, + pipelineName, + environmentName, + batchId, + sparkAppId.getOrElse(""), + computeEngineId, + tenant, + country, + runtimeConfig.runDate.toString, + runtimeConfig.runDateTo.map(_.toString), + startedInstant.getEpochSecond, + finishedInstant.getOrElse(Instant.now()).getEpochSecond, + numberOfExecutorsMin, + numberOfExecutorsMax, + executorType, + pipelineInfo.status.toString, + runtimeConfig.isRerun || runtimeConfig.historicalRunMode == RunMode.ForceRun, + runtimeConfig.attempt, + runtimeConfig.maxAttempts, + failureReason.map(_.take(1000)), + numberOfRecordsIngested, + maxNumberOfColumns, + getExecutionAdditionalOptions + ) + + journal.addPipelineEntry(execution) + } + } + + private def getExecutionAdditionalOptions: Option[String] = { + if (executionAdditionalOptions.isEmpty) + None + else { + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + Some(mapper.writeValueAsString(executionAdditionalOptions.toMap)) + } + } + protected def sendNotificationEmail(): Unit = { failureException.foreach(ex => log.error(s"The job has FAILED.", ex)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index faae0e78a..4726696d3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -934,6 +934,34 @@ object SparkUtils { } } + /** + * Calculates the total number of columns in a given schema, including all nested columns + * within struct types. Array types are traversed to count any nested struct columns within + * their element types, but the array itself is not counted as an additional column beyond + * its top-level entry. 
+ * + * @param schema the StructType representing the schema whose columns are to be counted + * @return the total number of columns, including all columns found in nested struct types + */ + def getTotalNumberOfColumns(schema: StructType): Int = { + def countNestedColumns(dataType: DataType): Int = { + dataType match { + case struct: StructType => + struct.fields.foldLeft(0) { (count, field) => + count + 1 + countNestedColumns(field.dataType) + } + case arr: ArrayType => + countNestedColumns(arr.elementType) + case _ => + 0 + } + } + + schema.fields.foldLeft(0) { (count, field) => + count + 1 + countNestedColumns(field.dataType) + } + } + private def getActualProcessingTimeUdf: UserDefinedFunction = { udf((_: Long) => Instant.now().getEpochSecond) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index 28edb251c..ec3a18a4e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -43,7 +43,8 @@ object RuntimeConfigFactory { sparkAppDescriptionTemplate: Option[String] = None, attempt: Int = 1, maxAttempts: Int = 1, - forceReCreateHiveTables: Boolean = false): RuntimeConfig = { + forceReCreateHiveTables: Boolean = false, + executionOptions: Map[String, String] = Map.empty): RuntimeConfig = { RuntimeConfig(isDryRun, isRerun, runTables, @@ -64,7 +65,8 @@ object RuntimeConfigFactory { sparkAppDescriptionTemplate, attempt, maxAttempts, - forceReCreateHiveTables) + forceReCreateHiveTables, + executionOptions) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala index 206e2f723..29aaa643a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala @@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate} object PipelineInfoFactory { def getDummyPipelineInfo(pipelineName: String = "Dummy Pipeline", + pipelineDefinitionId: String = "", environment: String = "DEV", runtimeInfo: RuntimeInfo = RuntimeInfo(LocalDate.parse("2022-02-18")), startedAt: Instant = Instant.ofEpochSecond(1718609409), @@ -36,6 +37,6 @@ object PipelineInfoFactory { tenant: Option[String] = Some("Dummy tenant"), country: Option[String] = Some("noname"), batchId: Long = 123L): PipelineInfo = { - PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, pipelineStatus, failureException, pipelineNotificationFailures, pipelineId, tenant, country, batchId) + PipelineInfo(pipelineName, pipelineDefinitionId, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, pipelineStatus, failureException, pipelineNotificationFailures, pipelineId, tenant, country, batchId) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala index 6a4fce5b8..8a45f2bba 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala @@ -17,16 +17,19 @@ package za.co.absa.pramen.core.mocks.journal import za.co.absa.pramen.core.journal.Journal -import 
za.co.absa.pramen.core.journal.model.TaskCompleted +import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted} import java.time.Instant import scala.collection.mutable.ListBuffer class JournalMock extends Journal { val entries: ListBuffer[TaskCompleted] = new ListBuffer[TaskCompleted] + val executions: ListBuffer[Execution] = new ListBuffer[Execution] override def addEntry(entry: TaskCompleted): Unit = entries += entry + override def addPipelineEntry(execution: Execution): Unit = executions += execution + override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = entries.filter(e => e.finishedAt >= from.getEpochSecond && e.finishedAt <= to.getEpochSecond).toSeq } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala index fd68398bd..fb3a192c3 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala @@ -17,9 +17,11 @@ package za.co.absa.pramen.core.mocks.state import za.co.absa.pramen.api.status.{PipelineStateSnapshot, TaskResult} +import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.mocks.{PipelineInfoFactory, PipelineStateSnapshotFactory} import za.co.absa.pramen.core.state.PipelineState +import scala.collection.mutable import scala.collection.mutable.ListBuffer class PipelineStateSpy extends PipelineState { @@ -32,6 +34,14 @@ class PipelineStateSpy extends PipelineState { val completedStatuses = new ListBuffer[TaskResult] var closeCalled = 0 var sparkAppId: Option[String] = None + var journalOpt: Option[Journal] = None + val executionAdditionalOptions: mutable.Map[String, String] = new mutable.HashMap[String, String] + var computeEngineId: Option[String] = None + var numberOfExecutorsMin: Option[Int] = None + var numberOfExecutorsMax: Option[Int] = None + var executorType: Option[String] = None + var numberOfRecordsIngested: Option[Long] = None + var maxNumberOfColumns: Option[Long] = None override def getState: PipelineStateSnapshot = { PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId), @@ -62,6 +72,48 @@ class PipelineStateSpy extends PipelineState { this.sparkAppId = Option(sparkAppId) } + override def setJournal(journal: Journal): Unit = synchronized { + this.journalOpt = Option(journal) + } + + override def setComputeEngineId(computeEngineIdIn: String): Unit = synchronized { + computeEngineId = Option(computeEngineIdIn) + } + + override def setNumberOfExecutorsMin(nIn: Int): Unit = synchronized { + numberOfExecutorsMin = Option(nIn) + if (numberOfExecutorsMax.exists(_ < nIn)) { + numberOfExecutorsMax = Option(nIn) + } + } + + override def setNumberOfExecutorsMax(nIn: Int): Unit = synchronized { + numberOfExecutorsMax = Option(nIn) + if (numberOfExecutorsMin.exists(_ > nIn)) { + numberOfExecutorsMin = Option(nIn) + } + } + + override def setExecutorType(executorTypeIn: String): Unit = synchronized { + executorType = Option(executorTypeIn) + } + + override def setNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(count) + } + + override def addNumberOfRecordsIngested(count: Long): Unit = synchronized { + numberOfRecordsIngested = Option(numberOfRecordsIngested.getOrElse(0L) + count) + } + + override def setMaximumNumberOfColumns(count: Long): Unit = 
synchronized { + maxNumberOfColumns = Option(Math.max(maxNumberOfColumns.getOrElse(0L), count)) + } + + override def setExecutionAdditionalOption(key: String, value: String): Unit = synchronized { + executionAdditionalOptions.put(key, value) + } + override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized { completedStatuses ++= statuses } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala index dac7216e8..502e37b5c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala @@ -72,7 +72,7 @@ class JournalHadoopDeltaPathLongSuite extends AnyWordSpec with SparkTestBase wit } private def getJournal(path: String): Journal = { - new JournalHadoopDeltaPath(new Path(path, "journal").toString) + new JournalHadoopDeltaPath(new Path(path, "journal").toString, new Path(path, "executions").toString) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala index d3e0825b1..9d63f85dd 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala @@ -30,8 +30,8 @@ class JournalMongoDbSuite extends AnyWordSpec with MongoDbFixture with BeforeAnd before { if (db != null) { - if (db.doesCollectionExists(collectionName)) { - db.dropCollection(collectionName) + if (db.doesCollectionExists(tasksCollectionName)) { + db.dropCollection(tasksCollectionName) } journal = new JournalMongoDb(connection) } @@ -42,9 +42,9 @@ class JournalMongoDbSuite extends AnyWordSpec with MongoDbFixture with BeforeAnd "Initialize an empty database" in { db.doesCollectionExists("collectionName") - assert(db.doesCollectionExists(collectionName)) + assert(db.doesCollectionExists(tasksCollectionName)) - val indexes = dbRaw.getCollection(collectionName).listIndexes().execute() + val indexes = dbRaw.getCollection(tasksCollectionName).listIndexes().execute() assert(indexes.size == 3) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala index 92e6f4977..b741ec11f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala @@ -246,6 +246,58 @@ class AppRunnerSuite extends AnyWordSpec with SparkTestBase { } } + "setMinMaxExecutors" should { + "set min and max executors on the pipeline state" in { + val state = new PipelineStateSpy + + AppRunner.setMinMaxExecutors(spark, state) + + assert(state.numberOfExecutorsMin.isDefined) + assert(state.numberOfExecutorsMax.isDefined) + assert(state.numberOfExecutorsMin.get >= 1) + assert(state.numberOfExecutorsMax.get >= state.numberOfExecutorsMin.get) + } + } + + "getNumberOfExecutorCores" should { + "return the number of executor cores from the Spark session" in { + val cores = AppRunner.getNumberOfExecutorCores(spark) + + assert(cores >= 1) + } + } + + "getNumberOfExecutorMemoryGb" should { + "return the executor memory in GB from 
the Spark session" in { + val memoryGb = AppRunner.getNumberOfExecutorMemoryGb(spark) + + assert(memoryGb > 0.0) + } + } + + "parseMemorySizeInGb" should { + "parse gigabytes" in { + assert(AppRunner.parseMemorySizeInGb("2g").get == 2) + assert(AppRunner.parseMemorySizeInGb("4G").get == 4) + } + + "parse megabytes" in { + assert(AppRunner.parseMemorySizeInGb("1024m").get == 1) + assert(AppRunner.parseMemorySizeInGb("2048M").get == 2) + assert(AppRunner.parseMemorySizeInGb("512m").get == 1) + } + + "parse terabytes" in { + assert(AppRunner.parseMemorySizeInGb("1t").get == 1024) + assert(AppRunner.parseMemorySizeInGb("2T").get == 2048) + } + + "parse kilobytes" in { + assert(AppRunner.parseMemorySizeInGb("1048576k").get == 1) + assert(AppRunner.parseMemorySizeInGb("1048576K").get == 1) + } + } + "handleFailure()" should { val state = getMockPipelineState diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index 22a23bb33..4f06ed983 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -1146,4 +1146,128 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture } } + "getTotalNumberOfColumns" should { + "return the number of columns for a flat schema" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("age", IntegerType) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 3) + } + + "return the total number of columns including nested struct fields" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("address", StructType(Array( + StructField("street", StringType), + StructField("city", StringType), + StructField("zip", StringType) + ))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 5) + } + + "return the total number of columns including array of structs" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("phones", ArrayType(StructType(Array( + StructField("type", StringType), + StructField("number", StringType) + )))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 4) + } + + "return the total number of columns for a deeply nested schema" in { + val schema = StructType(Array( + StructField("id", LongType), + StructField("level1", StructType(Array( + StructField("field1", StringType), + StructField("level2", StructType(Array( + StructField("field2", IntegerType), + StructField("level3", ArrayType(StructType(Array( + StructField("field3", StringType), + StructField("field4", DoubleType) + )))) + ))) + ))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 8) + } + + "return 0 for an empty schema" in { + val schema = StructType(Array.empty[StructField]) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 
0) + } + + "count array of primitives as a single column" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("tags", ArrayType(StringType)), + StructField("name", StringType) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 3) + } + + "handle multiple nested structs at the same level" in { + val schema = StructType(Array( + StructField("id", IntegerType), + StructField("struct1", StructType(Array( + StructField("a", StringType), + StructField("b", StringType) + ))), + StructField("struct2", StructType(Array( + StructField("c", IntegerType), + StructField("d", IntegerType), + StructField("e", IntegerType) + ))) + )) + + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 8) + } + + "handle the test case schema from NestedDataFrameFactory" in { + val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], NestedDataFrameFactory.testCaseSchema) + + val actual = SparkUtils.getTotalNumberOfColumns(df.schema) + + assert(actual == 29) + } + } + } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala index a9e53d6b0..d7cbd873b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala @@ -126,7 +126,7 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixt withTempDirectory("hive_test") { tempDir => val path = getParquetPath(tempDir) - val expected = "ALTER TABLE `db`.`tbl` REPLACE COLUMNS ( `c` INT );".stripMargin + val expected = "ALTER TABLE `db`.`tbl` REPLACE COLUMNS ( `c` INT )".stripMargin val qe = new QueryExecutorMock(tableExists = false) val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true) diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala index 1809bf209..ced98cc1c 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala @@ -62,7 +62,7 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec { val task3 = TestPrototypes.taskNotification.copy(taskDef = taskDef3) notificationTarget.sendNotification( - PipelineInfo("Dummy", "DEV", RuntimeInfo(LocalDate.parse("2022-02-18")), Instant.now, None, warningFlag = false, None, PipelineStatus.Success, None, Seq.empty, "pid_123", None, None, 123L), + PipelineInfo("Dummy", "", "DEV", RuntimeInfo(LocalDate.parse("2022-02-18")), Instant.now, None, warningFlag = false, None, PipelineStatus.Success, None, Seq.empty, "pid_123", None, None, 123L), Seq(task1, task2, task3), CustomNotification(Seq.empty, Seq.empty) )
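
Usage sketch: the snippet below shows how the helpers introduced in this change might be exercised from calling code. SparkUtils.getTotalNumberOfColumns and the new Pramen setters (setComputeEngineId, addNumberOfRecordsIngested, setMaximumNumberOfColumns, setExecutionAdditionalOption) come from the diff above; the Pramen.instance accessor, the example schema, and the metric values are illustrative assumptions only, not part of this patch.

import org.apache.spark.sql.types._
import za.co.absa.pramen.api.Pramen
import za.co.absa.pramen.core.utils.SparkUtils

// A small schema: 2 top-level fields plus 2 nested struct fields = 4 columns in total.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType)
  )))
))

assert(SparkUtils.getTotalNumberOfColumns(schema) == 4)

// Reporting execution metrics that end up in the pipeline journal entry.
// Assumes the Pramen.instance accessor; the compute engine id and the option key/value are made up.
val pramen = Pramen.instance
pramen.setComputeEngineId("jr_0123456789abcdef")   // e.g. an AWS Glue job run id
pramen.addNumberOfRecordsIngested(1000L)
pramen.setMaximumNumberOfColumns(SparkUtils.getTotalNumberOfColumns(schema))
pramen.setExecutionAdditionalOption("ingestion_mode", "incremental")

// Additional execution options can also be supplied via configuration keys
// prefixed with "pramen.execution.option." (see EXECUTION_EXTRA_OPTIONS_PREFIX in RuntimeConfig above).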