@@ -22,6 +22,7 @@ import java.time.Instant

case class PipelineInfo(
pipelineName: String,
pipelineDefinitionId: String,
environment: String,
runtimeInfo: RuntimeInfo,
startedAt: Instant,
16 changes: 16 additions & 0 deletions pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala
@@ -75,6 +75,22 @@ trait Pramen {
* @return An instance of the TokenLockFactory, which allows for token-based locking functionality.
*/
def tokenLockFactory: TokenLockFactory

def setComputeEngineId(computeEngineId: String): Unit

def setNumberOfExecutorsMin(n: Int): Unit

def setNumberOfExecutorsMax(n: Int): Unit

def setExecutorType(executorType: String): Unit

def setNumberOfRecordsIngested(count: Long): Unit

def addNumberOfRecordsIngested(count: Long): Unit

def setMaximumNumberOfColumns(count: Long): Unit

def setExecutionAdditionalOption(key: String, value: String): Unit
Comment on lines +79 to +93
⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Implementers of Pramen in this repo:"
rg -nP --type=scala '\b(class|object)\b[^{\n]*\b(extends|with)\s+Pramen\b' -C2

echo
echo "Binary compatibility / versioning config hints:"
rg -nP --type=sbt 'mima|MiMa|versionScheme|binary' -C2

Repository: AbsaOSS/pramen



🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check PramenImpl for the new methods
echo "=== PramenImpl implementations ==="
grep -n "setComputeEngineId\|setNumberOfExecutorsMin\|setNumberOfExecutorsMax\|setExecutorType\|setNumberOfRecordsIngested\|addNumberOfRecordsIngested\|setMaximumNumberOfColumns\|setExecutionAdditionalOption" pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala || echo "Methods not found in PramenImpl"

echo
echo "=== DummyPramen implementations ==="
grep -n "setComputeEngineId\|setNumberOfExecutorsMin\|setNumberOfExecutorsMax\|setExecutorType\|setNumberOfRecordsIngested\|addNumberOfRecordsIngested\|setMaximumNumberOfColumns\|setExecutionAdditionalOption" pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala || echo "Methods not found in DummyPramen"

echo
echo "=== Build configuration files ==="
find . -maxdepth 2 -name "build.sbt" -o -name "pom.xml" | head -5 | xargs -I {} sh -c 'echo "File: {}"; head -30 "{}"'

echo
echo "=== Version info in build files ==="
rg -l "version|versionScheme" --type=scala --type=xml . | head -10

Repository: AbsaOSS/pramen



🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get the full content of Pramen trait to see all methods
echo "=== Full Pramen trait definition ==="
wc -l pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala

echo
echo "=== Lines 70-100 of Pramen.scala ==="
sed -n '70,100p' pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala

Repository: AbsaOSS/pramen



All in-repo implementers properly updated; verify breaking change is documented as intentional.

Adding abstract methods to a public trait is binary-incompatible for external implementers. Both PramenImpl and DummyPramen are already updated with implementations, confirming this is an intentional coordinated change. Ensure the breaking change is clearly documented in release notes and consider explicitly bumping the major version (currently 1.13.11-SNAPSHOT) or adding migration guidance if external users rely on this API.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala` around lines 79-93:
you added new abstract methods to the public trait Pramen
(setComputeEngineId, setNumberOfExecutorsMin, setNumberOfExecutorsMax,
setExecutorType, setNumberOfRecordsIngested, addNumberOfRecordsIngested,
setMaximumNumberOfColumns, setExecutionAdditionalOption) which is a
binary-breaking API change; update the changelog/release notes to explicitly
document this breaking change, add migration guidance for external implementers
(list the new methods and expected semantics/behaviour), and bump the major
version or explicitly mark the release as incompatible (update version from
1.13.11-SNAPSHOT accordingly) so consumers know to recompile custom
implementations.
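The binary-compatibility concern raised above could also be addressed in the API itself. A minimal sketch (not the actual PR change, which keeps the methods abstract): giving the new methods no-op default bodies keeps old external implementers source-compatible, and on Scala 2.12+ (where trait default methods compile to JVM default methods) typically binary-compatible as well.

```scala
// Sketch only: the PR declares these methods abstract; default no-op
// bodies are an alternative that avoids breaking external implementers.
// (Other members of the real Pramen trait are omitted here.)
trait Pramen {
  def setComputeEngineId(computeEngineId: String): Unit = {}
  def setNumberOfExecutorsMin(n: Int): Unit = {}
  def setNumberOfExecutorsMax(n: Int): Unit = {}
  def setExecutorType(executorType: String): Unit = {}
  def setNumberOfRecordsIngested(count: Long): Unit = {}
  def addNumberOfRecordsIngested(count: Long): Unit = {}
  def setMaximumNumberOfColumns(count: Long): Unit = {}
  def setExecutionAdditionalOption(key: String, value: String): Unit = {}
}

// An implementer written against the old trait still compiles unchanged:
class LegacyPramen extends Pramen
```

The trade-off is that silent no-ops can hide forgotten overrides, which may be why the PR opts for abstract methods plus updated in-repo implementations.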

}

object Pramen {
@@ -19,8 +19,16 @@ package za.co.absa.pramen.api.status
sealed trait PipelineStatus

object PipelineStatus {
case object Success extends PipelineStatus
case object Warning extends PipelineStatus
case object PartialSuccess extends PipelineStatus
case object Failure extends PipelineStatus
case object Success extends PipelineStatus {
override def toString: String = "succeeded"
}
case object Warning extends PipelineStatus {
override def toString: String = "succeeded with warnings"
}
case object PartialSuccess extends PipelineStatus {
override def toString: String = "partially succeeded"
}
case object Failure extends PipelineStatus {
override def toString: String = "failed"
}
}
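The overridden toString values surface wherever a status is interpolated into a message, for example in notifications. A small self-contained example of the pattern above:

```scala
sealed trait PipelineStatus

object PipelineStatus {
  case object Success extends PipelineStatus {
    override def toString: String = "succeeded"
  }
  case object Warning extends PipelineStatus {
    override def toString: String = "succeeded with warnings"
  }
}

// String interpolation picks up the overridden toString:
val status: PipelineStatus = PipelineStatus.Warning
val message = s"The pipeline $status"
// message == "The pipeline succeeded with warnings"
```

Because the trait stays sealed, exhaustive pattern matching on statuses is unaffected by the new toString overrides.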
@@ -40,4 +40,20 @@ class DummyPramen extends Pramen {
override def setWarningFlag(): Unit = null

override def tokenLockFactory: TokenLockFactory = null

override def setComputeEngineId(computeEngineId: String): Unit = {}

override def setNumberOfExecutorsMin(n: Int): Unit = {}

override def setNumberOfExecutorsMax(n: Int): Unit = {}

override def setExecutorType(executorType: String): Unit = {}

override def setNumberOfRecordsIngested(count: Long): Unit = {}

override def addNumberOfRecordsIngested(count: Long): Unit = {}

override def setMaximumNumberOfColumns(count: Long): Unit = {}

override def setExecutionAdditionalOption(key: String, value: String): Unit = {}
}
@@ -86,6 +86,22 @@ class PramenImpl extends Pramen {
throw new IllegalStateException("Token lock factory is not available at the context.")
)

override def setComputeEngineId(computeEngineId: String): Unit = _pipelineState.foreach(_.setComputeEngineId(computeEngineId))

override def setNumberOfExecutorsMin(n: Int): Unit = _pipelineState.foreach(_.setNumberOfExecutorsMin(n))

override def setNumberOfExecutorsMax(n: Int): Unit = _pipelineState.foreach(_.setNumberOfExecutorsMax(n))

override def setExecutorType(executorType: String): Unit = _pipelineState.foreach(_.setExecutorType(executorType))

override def setNumberOfRecordsIngested(count: Long): Unit = _pipelineState.foreach(_.setNumberOfRecordsIngested(count))

override def addNumberOfRecordsIngested(count: Long): Unit = _pipelineState.foreach(_.addNumberOfRecordsIngested(count))

override def setMaximumNumberOfColumns(count: Long): Unit = _pipelineState.foreach(_.setMaximumNumberOfColumns(count))

override def setExecutionAdditionalOption(key: String, value: String): Unit = _pipelineState.foreach(_.setExecutionAdditionalOption(key, value))

private[core] def setWorkflowConfig(config: Config): Unit = synchronized {
_workflowConfig = Option(config)
}
@@ -48,7 +48,8 @@ case class RuntimeConfig(
sparkAppDescriptionTemplate: Option[String],
attempt: Int, // Current attempt number for the pipeline run (for auto-retry automation)
maxAttempts: Int, // Maximum number of attempts allowed for the pipeline run
forceReCreateHiveTables: Boolean
forceReCreateHiveTables: Boolean,
executionOptions: Map[String, String]
)

object RuntimeConfig {
@@ -78,6 +79,7 @@ object RuntimeConfig {
val ATTEMPT = "pramen.runtime.attempt"
val MAX_ATTEMPTS = "pramen.runtime.max.attempts"
val FORCE_RECREATE_HIVE_TABLES = "pramen.runtime.hive.force.recreate"
val EXECUTION_EXTRA_OPTIONS_PREFIX = "pramen.execution.option"

def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
@@ -144,6 +146,7 @@
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)
val attempt = ConfigUtils.getOptionInt(conf, ATTEMPT).getOrElse(1)
val maxAttempts = ConfigUtils.getOptionInt(conf, MAX_ATTEMPTS).getOrElse(1)
val executionOptions = ConfigUtils.getExtraOptions(conf, EXECUTION_EXTRA_OPTIONS_PREFIX)

RuntimeConfig(
isDryRun = isDryRun,
@@ -166,7 +169,8 @@
sparkAppDescriptionTemplate,
attempt,
maxAttempts,
forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false)
forceReCreateHiveTables = ConfigUtils.getOptionBoolean(conf, FORCE_RECREATE_HIVE_TABLES).getOrElse(false),
executionOptions
)
}

@@ -192,7 +196,8 @@
sparkAppDescriptionTemplate = None,
attempt = 1,
maxAttempts = 1,
forceReCreateHiveTables = false
forceReCreateHiveTables = false,
Map.empty
)
}
}
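The new `EXECUTION_EXTRA_OPTIONS_PREFIX` collects arbitrary `pramen.execution.option.*` keys into the `executionOptions` map. The exact behaviour of `ConfigUtils.getExtraOptions` is not shown in this diff; a minimal sketch of prefix collection over a flattened config, with hypothetical keys, could look like this:

```scala
// Hypothetical sketch: collect flattened config keys under a prefix into
// an options map (the real ConfigUtils.getExtraOptions is not shown here).
def getExtraOptions(flatConf: Map[String, String], prefix: String): Map[String, String] = {
  val p = prefix + "."
  flatConf.collect {
    case (key, value) if key.startsWith(p) => key.stripPrefix(p) -> value
  }
}

val conf = Map(
  "pramen.execution.option.team"     -> "data-eng",
  "pramen.execution.option.priority" -> "high",
  "pramen.runtime.attempt"           -> "1"
)

val executionOptions = getExtraOptions(conf, "pramen.execution.option")
// executionOptions == Map("team" -> "data-eng", "priority" -> "high")
```

Keys outside the prefix (such as `pramen.runtime.attempt` above) are left untouched, so existing runtime settings and execution options coexist in one config.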
@@ -201,13 +201,15 @@ object Bookkeeper {
case HadoopFormat.Delta =>
bookkeepingConfig.deltaTablePrefix match {
case Some(tablePrefix) =>
val fullTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix)
log.info(s"Using Delta Lake managed table '$fullTableName' for the journal.")
val journalTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "journal")
val executionsTableName = JournalHadoopDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "executions")
log.info(s"Using Delta Lake managed tables '$journalTableName' and '$executionsTableName' for the journal.")
new JournalHadoopDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix)
case None =>
val path = bookkeepingConfig.bookkeepingLocation.get + "/journal"
log.info(s"Using Delta Lake for the journal at $path")
new JournalHadoopDeltaPath(path)
val journalPath = bookkeepingConfig.bookkeepingLocation.get + "/journal"
val executionsPath = bookkeepingConfig.bookkeepingLocation.get + "/executions"
log.info(s"Using Delta Lake for the journal at '$journalPath' and '$executionsPath'")
new JournalHadoopDeltaPath(journalPath, executionsPath)
}
}
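The call sites above now pass a third argument (`"journal"` or `"executions"`) to `getFullTableName`. The real implementation in `JournalHadoopDeltaTable` is not shown in this diff; a hypothetical sketch of what such a helper might do:

```scala
// Hypothetical sketch of a 3-argument getFullTableName; the actual
// JournalHadoopDeltaTable implementation is not part of this diff.
def getFullTableName(database: Option[String], prefix: String, suffix: String): String = {
  val table = s"${prefix}_$suffix"
  database.fold(table)(db => s"$db.$table")
}

val journalTable    = getFullTableName(Some("bookkeeping"), "pramen", "journal")
val executionsTable = getFullTableName(None, "pramen", "executions")
// journalTable    == "bookkeeping.pramen_journal"
// executionsTable == "pramen_executions"
```

Deriving both names from one prefix keeps the journal and executions tables side by side in the same database or bookkeeping location.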

@@ -792,20 +792,20 @@ object BookkeeperDynamoDb {
val DEFAULT_TABLE_PREFIX = "pramen"

// Attribute names for bookkeeping table
val ATTR_TABLE_NAME = "tableName"
val ATTR_INFO_DATE = "infoDate"
val ATTR_INFO_DATE_SORT_KEY = "infoDateSortKey" // Composite: "infoDate#jobFinished"
val ATTR_INFO_DATE_BEGIN = "infoDateBegin"
val ATTR_INFO_DATE_END = "infoDateEnd"
val ATTR_INPUT_RECORD_COUNT = "inputRecordCount"
val ATTR_OUTPUT_RECORD_COUNT = "outputRecordCount"
val ATTR_JOB_STARTED = "jobStarted"
val ATTR_JOB_FINISHED = "jobFinished"
val ATTR_BATCH_ID = "batchId"
val ATTR_APPENDED_RECORD_COUNT = "appendedRecordCount"
val ATTR_TABLE_NAME = "table_name"
val ATTR_INFO_DATE = "info_date"
val ATTR_INFO_DATE_SORT_KEY = "info_date_sort_key" // Composite: "infoDate#jobFinished"
val ATTR_INFO_DATE_BEGIN = "info_date_begin"
val ATTR_INFO_DATE_END = "info_date_end"
val ATTR_INPUT_RECORD_COUNT = "input_record_count"
val ATTR_OUTPUT_RECORD_COUNT = "output_record_count"
val ATTR_JOB_STARTED = "job_started"
val ATTR_JOB_FINISHED = "job_finished"
val ATTR_BATCH_ID = "batch_id"
val ATTR_APPENDED_RECORD_COUNT = "appended_record_count"

// Attribute names for schema table
val ATTR_SCHEMA_JSON = "schemaJson"
val ATTR_SCHEMA_JSON = "schema_json"

Comment on lines +795 to 809
⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Attribute-key rename can break existing bookkeeping/schema DynamoDB tables.

These constant changes alter key and field names used in all query/update paths. Existing tables created with legacy attribute names won’t match new key-condition expressions, causing runtime failures unless a migration/compatibility layer is added.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 795-809: the recent renames of DynamoDB attribute constants
(e.g., ATTR_TABLE_NAME, ATTR_INFO_DATE, ATTR_INFO_DATE_SORT_KEY,
ATTR_INFO_DATE_BEGIN, ATTR_INFO_DATE_END, ATTR_INPUT_RECORD_COUNT,
ATTR_OUTPUT_RECORD_COUNT, ATTR_JOB_STARTED, ATTR_JOB_FINISHED, ATTR_BATCH_ID,
ATTR_APPENDED_RECORD_COUNT, ATTR_SCHEMA_JSON) in BookkeeperDynamoDb.scala will
break compatibility with existing DynamoDB tables; restore compatibility by
either reverting these constants to their previous names or implementing a
compatibility layer that accepts both legacy and new attribute names in all
query/update paths (e.g., in methods that build KeyConditionExpressions,
UpdateExpressions, and attribute maps), and add a migration helper or detection
logic to prefer existing attributes when present; ensure all usages of these
constants across BookkeeperDynamoDb (and any helpers that serialize/deserialize
schema) are updated to consult the compatibility mapping so existing tables
continue to function without a forced migration.
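The compatibility layer the review asks for could start from an explicit mapping between the legacy and new attribute names. A sketch (the names come from this diff; the dual-read logic itself is hypothetical and not part of the PR):

```scala
// Legacy camelCase attribute names mapped to the new snake_case ones.
val legacyToNew: Map[String, String] = Map(
  "tableName"           -> "table_name",
  "infoDate"            -> "info_date",
  "infoDateSortKey"     -> "info_date_sort_key",
  "infoDateBegin"       -> "info_date_begin",
  "infoDateEnd"         -> "info_date_end",
  "inputRecordCount"    -> "input_record_count",
  "outputRecordCount"   -> "output_record_count",
  "jobStarted"          -> "job_started",
  "jobFinished"         -> "job_finished",
  "batchId"             -> "batch_id",
  "appendedRecordCount" -> "appended_record_count",
  "schemaJson"          -> "schema_json"
)

// When reading an item, prefer the new name but fall back to the legacy one:
def readAttr(item: Map[String, String], newName: String): Option[String] = {
  val legacy = legacyToNew.collectFirst { case (old, nw) if nw == newName => old }
  item.get(newName).orElse(legacy.flatMap(item.get))
}
```

Note this fallback only helps for non-key attributes; DynamoDB partition and sort key names are fixed at table creation, so renamed keys still require table recreation or migration.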

val MODEL_VERSION = 1

@@ -555,15 +555,15 @@ object OffsetManagerDynamoDb {
val DEFAULT_TABLE_PREFIX = "pramen"

// Attribute names for offset table
val ATTR_PRAMEN_TABLE_NAME = "pramenTableName"
val ATTR_COMPOSITE_KEY = "compositeKey" // Format: "infoDate#createdAtMilli"
val ATTR_INFO_DATE = "infoDate"
val ATTR_DATA_TYPE = "dataType"
val ATTR_MIN_OFFSET = "minOffset"
val ATTR_MAX_OFFSET = "maxOffset"
val ATTR_BATCH_ID = "batchId"
val ATTR_CREATED_AT = "createdAt"
val ATTR_COMMITTED_AT = "committedAt"
val ATTR_PRAMEN_TABLE_NAME = "pramen_table_name"
val ATTR_COMPOSITE_KEY = "composite_key" // Format: "infoDate#createdAtMilli"
val ATTR_INFO_DATE = "info_date"
val ATTR_DATA_TYPE = "data_type"
val ATTR_MIN_OFFSET = "min_offset"
val ATTR_MAX_OFFSET = "max_offset"
val ATTR_BATCH_ID = "batch_id"
val ATTR_CREATED_AT = "created_at"
val ATTR_COMMITTED_AT = "committed_at"
Comment on lines +558 to +566
⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Snake_case key rename can break existing offset tables at runtime.

Line 558 and Line 559 change the partition/sort key attribute names. For already-provisioned tables using legacy keys, DynamoDB key-condition validation will fail because this code now queries with different key names and does not migrate/recreate existing tables.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 558-566: the rename of partition/sort key attributes
(ATTR_PRAMEN_TABLE_NAME and ATTR_COMPOSITE_KEY) will break existing DynamoDB
tables; update the code to be backward-compatible by detecting and supporting
legacy key names: either query DescribeTable once (e.g., in the
OffsetManagerDynamoDb initialization) to read the actual attribute names and
store which set to use, or implement dual-key logic in methods that build
key-condition expressions (use ATTR_PRAMEN_TABLE_NAME/ATTR_COMPOSITE_KEY but
fall back to legacy names if the first query returns a validation error). Ensure
this detection/dual-use is applied wherever the constants are used to construct
key conditions, reads or writes so existing tables continue to work without
migration.
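The renames in both DynamoDB objects follow one mechanical camelCase-to-snake_case rule. A small helper (hypothetical, not part of the PR) makes that rule explicit and could drive a compatibility mapping instead of hand-maintained constants:

```scala
// Convert a camelCase attribute name to the new snake_case convention.
def toSnakeCase(name: String): String =
  name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase

val renamed = Seq("pramenTableName", "compositeKey", "minOffset").map(toSnakeCase)
// renamed == Seq("pramen_table_name", "composite_key", "min_offset")
```

Generating the legacy-to-new mapping this way keeps the two naming schemes from drifting apart if further attributes are added.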


/**
* Builder for creating OffsetManagerDynamoDb instances.
@@ -17,6 +17,8 @@
package za.co.absa.pramen.core.config

object Keys {
val PIPELINE_DEFINITION_ID = "pramen.pipeline.definition.id"

val INFORMATION_DATE_COLUMN = "pramen.information.date.column"
val INFORMATION_DATE_FORMAT_APP = "pramen.information.date.format"

@@ -16,7 +16,7 @@

package za.co.absa.pramen.core.journal

import za.co.absa.pramen.core.journal.model.TaskCompleted
import za.co.absa.pramen.core.journal.model.{Execution, TaskCompleted}

import java.time.Instant

@@ -27,6 +27,8 @@ trait Journal extends AutoCloseable {

def addEntry(entry: TaskCompleted): Unit

def addPipelineEntry(execution: Execution): Unit

def getEntries(from: Instant, to: Instant): Seq[TaskCompleted]

override def close(): Unit = {}
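The new `addPipelineEntry` method extends every `Journal` implementation (Delta table, Delta path, and any test doubles). A hypothetical no-op implementation in the style of `DummyPramen`, with the `TaskCompleted` and `Execution` models stubbed for illustration:

```scala
import java.time.Instant

// Stubs standing in for the real journal models, which are not shown here.
case class TaskCompleted(tableName: String)
case class Execution(pipelineId: String)

trait Journal extends AutoCloseable {
  def addEntry(entry: TaskCompleted): Unit
  def addPipelineEntry(execution: Execution): Unit
  def getEntries(from: Instant, to: Instant): Seq[TaskCompleted]
  override def close(): Unit = {}
}

// Hypothetical no-op journal, useful as a test double.
class NoOpJournal extends Journal {
  override def addEntry(entry: TaskCompleted): Unit = {}
  override def addPipelineEntry(execution: Execution): Unit = {}
  override def getEntries(from: Instant, to: Instant): Seq[TaskCompleted] = Seq.empty
}
```

As with the `Pramen` trait, the new abstract method means external `Journal` implementations must be recompiled with an override added.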