#738 Add executions - the pipeline level journal table (#741)
Walkthrough

This PR introduces pipeline-level execution journaling for Pramen, complementing the existing task-level journal. It captures workflow metadata (executor configuration, timing, status, records ingested) and persists execution records across multiple backends (DynamoDB, MongoDB, Delta Lake, JDBC), enabling pipeline-wide observability and auditing.

Changes: Pipeline Execution Journaling and Tracking
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 15
🧹 Nitpick comments (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala (1)
54-57: ⚡ Quick win: Parameterize environment-specific values instead of hardcoding them.

Lines 54–57 and line 69 embed UAT bucket/catalog identifiers directly. This makes the utility brittle across accounts/environments. Read these from `args` or config/env vars.

Also applies to: 69-70
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala` around lines 54 - 57, The test GlueIcebergSuite currently hardcodes UAT-specific values into Spark configs (e.g., the "spark.sql.catalog.glue_catalog.warehouse" warehouse path, "spark.sql.catalog.glue_catalog.catalog-impl", "spark.sql.catalog.glue_catalog.io", and the access-point key like "spark.sql.catalog.glue_catalog.s3.access-points.ursamajor-afs1-dev-edla-aqdt-za"); change these to read from test args or environment/configuration variables and inject them into the same .config(...) calls so the suite is environment-agnostic: add parameters (or System.getenv/Config) used by GlueIcebergSuite to supply the warehouse URI, catalog impl, file IO class and access-point ARN instead of the hardcoded strings (also update the other hardcoded values referenced later in the file the comment mentions).

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala (1)
83-95: ⚡ Quick win: Order-dependent executor min/max reconciliation may cause confusion.

The reconciliation logic adjusts `numberOfExecutorsMin` or `numberOfExecutorsMax` to maintain the invariant min ≤ max, but the final values depend on the order of setter calls:

- `setNumberOfExecutorsMin(10)` then `setNumberOfExecutorsMax(5)` → final: min=5, max=5
- `setNumberOfExecutorsMax(5)` then `setNumberOfExecutorsMin(10)` → final: min=10, max=10

While maintaining the invariant is good, this order dependency could lead to unexpected test behavior. Consider documenting this behavior or validating inputs instead of auto-adjusting.
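The validating alternative can be sketched as follows. Only the setter names come from the spy under review; the wrapper class and messages are illustrative:

```scala
// Illustrative sketch of validating setters instead of auto-adjusting the
// opposite bound. Only the setter names are taken from PipelineStateSpy;
// the surrounding class is hypothetical.
class ExecutorBoundsSketch {
  private var numberOfExecutorsMin: Option[Int] = None
  private var numberOfExecutorsMax: Option[Int] = None

  def setNumberOfExecutorsMin(nIn: Int): Unit = synchronized {
    if (numberOfExecutorsMax.exists(_ < nIn))
      throw new IllegalArgumentException(
        s"min=$nIn exceeds the already-set max=${numberOfExecutorsMax.get}")
    numberOfExecutorsMin = Some(nIn)
  }

  def setNumberOfExecutorsMax(nIn: Int): Unit = synchronized {
    if (numberOfExecutorsMin.exists(_ > nIn))
      throw new IllegalArgumentException(
        s"max=$nIn is below the already-set min=${numberOfExecutorsMin.get}")
    numberOfExecutorsMax = Some(nIn)
  }
}
```

With this shape, both call orders in the example above fail fast instead of silently converging to different final states.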
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala` around lines 83 - 95, The current synchronized setters setNumberOfExecutorsMin and setNumberOfExecutorsMax auto-adjust the opposite bound (numberOfExecutorsMax / numberOfExecutorsMin) which makes final state order-dependent; change the behavior to be deterministic by validating inputs instead of auto-mangling the other field: in setNumberOfExecutorsMin(nIn: Int) check if numberOfExecutorsMax.exists(_ < nIn) and if so throw an IllegalArgumentException (or return a failure) rather than changing numberOfExecutorsMax, and likewise in setNumberOfExecutorsMax(nIn: Int) check if numberOfExecutorsMin.exists(_ > nIn) and throw if inconsistent; keep the synchronized block and update tests to expect validation errors (or document the validation behavior) so callers cannot rely on order-dependent auto-adjustment.

pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala (1)
172-180: 💤 Low value: Silent exception swallowing when accessing `Pramen.instance` may hide configuration issues.

The code wraps `Pramen.instance` access in `Try {...}.toOption` to prevent ingestion failure, which is defensible. However, silently ignoring exceptions could mask initialization problems or configuration issues that should be surfaced.

Consider adding logging when `Pramen.instance` is unavailable to aid troubleshooting:

📋 Proposed improvement

      if (!outputTable.format.isRaw) {
        val pramenOpt = Try {
          Pramen.instance
    -   }.toOption
    +   }.toOption.orElse {
    +     log.debug(s"Pramen.instance not available; skipping ingestion metrics recording for ${outputTable.name}")
    +     None
    +   }

        pramenOpt.foreach { pramen =>

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala` around lines 172 - 180, The current Try{ Pramen.instance }.toOption in IngestionJob silently swallows exceptions; change it to capture the failure and log the exception so configuration/initialization errors are visible: replace the silent toOption usage around Pramen.instance with pattern matching or fold on Try/Option to call processLogger (or the class logger) with a clear message plus the caught exception before skipping the call to pramen.addNumberOfRecordsIngested(stats.recordCount.getOrElse(0L)); keep the existing behavior of not failing ingestion for non-raw formats but ensure the exception and context (accessing Pramen.instance for outputTable.format.isRaw check and stats.recordCount) are logged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala`:
- Around line 79-93: You added new abstract methods to the public trait Pramen
(setComputeEngineId, setNumberOfExecutorsMin, setNumberOfExecutorsMax,
setExecutorType, setNumberOfRecordsIngested, addNumberOfRecordsIngested,
setMaximumNumberOfColumns, setExecutionAdditionalOption) which is a
binary-breaking API change; update the changelog/release notes to explicitly
document this breaking change, add migration guidance for external implementers
(list the new methods and expected semantics/behaviour), and bump the major
version or explicitly mark the release as incompatible (update version from
1.13.11-SNAPSHOT accordingly) so consumers know to recompile custom
implementations.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 795-809: The recent renames of DynamoDB attribute constants (e.g.,
ATTR_TABLE_NAME, ATTR_INFO_DATE, ATTR_INFO_DATE_SORT_KEY, ATTR_INFO_DATE_BEGIN,
ATTR_INFO_DATE_END, ATTR_INPUT_RECORD_COUNT, ATTR_OUTPUT_RECORD_COUNT,
ATTR_JOB_STARTED, ATTR_JOB_FINISHED, ATTR_BATCH_ID, ATTR_APPENDED_RECORD_COUNT,
ATTR_SCHEMA_JSON) in BookkeeperDynamoDb.scala will break compatibility with
existing DynamoDB tables; restore compatibility by either reverting these
constants to their previous names or implementing a compatibility layer that
accepts both legacy and new attribute names in all query/update paths (e.g., in
methods that build KeyConditionExpressions, UpdateExpressions, and attribute
maps), and add a migration helper or detection logic to prefer existing
attributes when present; ensure all usages of these constants across
BookkeeperDynamoDb (and any helpers that serialize/deserialize schema) are
updated to consult the compatibility mapping so existing tables continue to
function without a forced migration.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 558-566: The rename of partition/sort key attributes
(ATTR_PRAMEN_TABLE_NAME and ATTR_COMPOSITE_KEY) will break existing DynamoDB
tables; update the code to be backward-compatible by detecting and supporting
legacy key names: either query DescribeTable once (e.g., in the
OffsetManagerDynamoDb initialization) to read the actual attribute names and
store which set to use, or implement dual-key logic in methods that build
key-condition expressions (use ATTR_PRAMEN_TABLE_NAME/ATTR_COMPOSITE_KEY but
fall back to legacy names if the first query returns a validation error). Ensure
this detection/dual-use is applied wherever the constants are used to construct
key conditions, reads or writes so existing tables continue to work without
migration.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Line 406: ATTR_BATCH_ID in JournalDynamoDB.scala is set to "batchId" but must
be "batch_id" to match the snake_case convention used across journal and
executions attributes (see ATTR_EXEC_BATCH_ID). Change the constant
ATTR_BATCH_ID's string value from "batchId" to "batch_id" in the JournalDynamoDB
object/class, search for any usages of ATTR_BATCH_ID to ensure consistency, and
add a short note in the changelog or migration plan to handle migrating existing
DynamoDB journal records (mapping old "batchId" to new "batch_id") where
applicable.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala`:
- Around line 96-98: The addPipelineEntry method in JournalHadoopCsv currently
no-ops which silently drops Execution records; update
JournalHadoopCsv.addPipelineEntry(execution: Execution) to either persist the
execution to the CSV journal using the same CSV writing/append logic used for
other journal rows (serialize the relevant Execution fields into the CSV schema
and flush/close the writer) or, if CSV journaling cannot support execution-level
entries, fail fast by throwing a clear
NotImplementedException/IllegalStateException that includes "addPipelineEntry"
and the Execution id to prevent silent data loss; locate the CSV write/append
helpers used elsewhere in JournalHadoopCsv and reuse them for consistency.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala`:
- Around line 57-62: The DataFrameWriter call on recordDf currently uses
.option("format", "delta") which does not set the data source; replace that
option with .format("delta") so Delta becomes the writer format, keeping the
existing .option("mergeSchema", "true") and .mode(SaveMode.Append). Update the
chain around recordDf.write (the call that ends with .save(executionsPath)) to
use .format("delta") instead of .option("format", "delta") to ensure Delta
output.
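Assuming the surrounding code looks as the comment describes (`recordDf` and `executionsPath` are the names used in the comment), the corrected chain would be along these lines:

```scala
import org.apache.spark.sql.SaveMode

// .format("delta") selects the data source; .option("format", "delta")
// would merely set an option that the default writer ignores.
recordDf.write
  .format("delta")
  .option("mergeSchema", "true")
  .mode(SaveMode.Append)
  .save(executionsPath)
```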
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala`:
- Around line 73-75: The three index definitions idx3, idx4, and idx5 currently
point to finishedAt but their names indicate they should index batchId,
computeEngineId, and pipelineId respectively; update the index calls for idx3 to
use batchId, idx4 to use computeEngineId, and idx5 to use pipelineId (preserving
names and unique=false) so each index matches its intended column.
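If the table follows the usual Slick lifted-embedding style (an assumption; only the column names come from the comment), the fix is a one-column change per index, e.g.:

```scala
// Hypothetical Slick index definitions; the index name strings here are
// placeholders. The point is that each index now targets the column its
// name implies, rather than finishedAt.
def idx3 = index("idx_executions_batch_id", batchId, unique = false)
def idx4 = index("idx_executions_compute_engine_id", computeEngineId, unique = false)
def idx5 = index("idx_executions_pipeline_id", pipelineId, unique = false)
```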
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala`:
- Around line 36-37: The constants ATTR_EXPIRES and ATTR_CREATED_AT were renamed
and will break mixed-version rollouts; update getTicket (and any code that reads
those attributes) to attempt reads from both the new keys (ATTR_EXPIRES /
ATTR_CREATED_AT) and the old keys (e.g. "expiresAt"/"createdAt") in a
backward-compatible way (check presence before calling .n()/.s()), preferring
the new keys but falling back to the old ones, and adjust the DynamoDB
delete/conditional expressions in the lock-release path to accept either the new
timestamp attributes or the legacy names so deletes succeed during transitional
mixed deployments (refer to getTicket, the delete/condition expression building
code, ATTR_EXPIRES and ATTR_CREATED_AT to locate changes).
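A minimal sketch of such a dual-key read, assuming AWS SDK v2 item maps (the helper name and the exact legacy key strings are illustrative):

```scala
import software.amazon.awssdk.services.dynamodb.model.AttributeValue

// Prefer the new snake_case attribute, fall back to the legacy camelCase one.
// Uses containsKey/get directly so it works on any Scala version without
// collection converters.
def readNumericAttr(item: java.util.Map[String, AttributeValue],
                    newKey: String,
                    legacyKey: String): Option[Long] = {
  if (item.containsKey(newKey)) Some(item.get(newKey).n().toLong)
  else if (item.containsKey(legacyKey)) Some(item.get(legacyKey).n().toLong)
  else None
}

// e.g. readNumericAttr(item, "expires_at", "expiresAt")  // key names assumed
```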
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 251-256: The rename of DynamoDB attribute constants
(ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED,
ATTR_TABLE_NAME, ATTR_INFO_DATE) is a breaking change for existing records;
restore backward compatibility by either keeping the original attribute names or
implementing dual-read/write logic: when reading, attempt to read both the new
and old attribute keys and merge/fallback (e.g., check ATTR_METADATA_VALUE then
the old name), when writing, write both the new and old attribute names (or
provide a one-time migration tool to rename existing items). Update
MetadataManagerDynamoDb methods that build/get item attributes and queries to
consult both key names or write both keys so old items remain accessible, and
add a short migration helper (or flag) to eventually remove legacy keys once
migration is complete.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala`:
- Around line 185-189: The numeric Spark config parsing (e.g., the
maxNumExecutors val in AppRunner.scala used by logExecutorNodes) currently calls
toInt on config strings and can throw NumberFormatException; change parsing to a
safe parse (e.g., use s.toIntOption or Try(s.toInt).toOption) and fall back to
the existing default (executors.size) when parsing fails or the option is
missing, and apply the same hardened parsing to the other similar config parses
referenced (the blocks around logExecutorNodes at the other occurrences).
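A hardened version could look like this. The config key shown is the standard Spark dynamic-allocation key, though the exact key AppRunner reads may differ; `String.toIntOption` only exists on Scala 2.13, so `Try` is the portable choice:

```scala
import scala.util.Try

// Safe parse: falls back to the observed executor count if the option is
// missing or not a valid integer, instead of throwing NumberFormatException.
val maxNumExecutors: Int =
  Try(spark.conf.get("spark.dynamicAllocation.maxExecutors").toInt)
    .getOrElse(executors.size)
```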
- Around line 165-168: The current AppRunner logging loop
(spark.conf.getAll.foreach with log.info) exposes raw Spark config values;
change it to avoid printing sensitive values by either (a) implementing an
allowlist of safe keys to log and only emitting those, or (b)
filtering/redacting keys that match sensitive patterns (e.g., "password",
"secret", "token", "key", "credential", "aws", "access", "private") and
replacing their values with a fixed redaction string before calling log.info;
update the code around AppRunner.scala's use of spark.conf.getAll and the
log.info calls so keys/values are sanitized or limited prior to logging.
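Option (b) can be sketched as a small filter over `spark.conf.getAll` (the pattern list mirrors the one suggested above; tune it as needed):

```scala
// Redact values whose keys look sensitive before logging.
val sensitivePatterns = Seq("password", "secret", "token", "key",
                            "credential", "aws", "access", "private")

spark.conf.getAll.foreach { case (key, value) =>
  val shown =
    if (sensitivePatterns.exists(key.toLowerCase.contains(_))) "***redacted***"
    else value
  log.info(s"$key = $shown")
}
```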
- Around line 182-183: The code currently filters executor memory status with
.filter(_ != "driver") which never matches BlockManagerId keys; instead use
SparkStatusTracker to get real executor ids and exclude the driver: call
spark.sparkContext.statusTracker.getExecutorInfos.map(_.id).toSet to build
executorIds (this set contains "driver" for the driver process), then replace
the current executors calculation with filtering memoryStatus.keySet by whether
the key corresponds to one of those executorIds (e.g.
memoryStatus.keySet.filter(k => executorIds.exists(id => k.contains(id))) and/or
explicitly exclude id == "driver"); apply this change in both setMinMaxExecutors
and getNumberOfExecutorMemoryGb, referencing the executors variable and the
SparkStatusTracker usage.
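One caveat when applying this suggestion: `SparkExecutorInfo` exposes `host`/`port` rather than an executor id, so the `.map(_.id)` shown above may need adapting. A simpler count-based sketch that still uses the status tracker, relying on its documented behaviour of including one entry for the driver:

```scala
// getExecutorInfos returns one entry per executor plus one for the driver,
// so subtracting 1 yields the executor count without any string matching.
val executorCount: Int =
  (spark.sparkContext.statusTracker.getExecutorInfos.length - 1).max(0)
```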
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala`:
- Around line 269-272: addJournalEntry() and sendNotificationEmail() in
PipelineStateImpl.onAppFinish can throw and currently run unguarded, which may
prevent TokenLockRegistry.releaseAllLocks() from executing and leak locks; wrap
the journal/email calls in try/catch (or a try { ... } finally {
TokenLockRegistry.releaseAllLocks() }) so that releaseAllLocks() always runs on
shutdown, log any exceptions from addJournalEntry()/sendNotificationEmail()
rather than letting them propagate, and apply the same protection to the other
similar block (the occurrence around lines 338-379) to ensure locks are released
even if external I/O fails.
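The guarded shape could look like this; method names follow the comment, and the registry reference is illustrative:

```scala
import scala.util.control.NonFatal

try {
  // Log-and-continue for external I/O so one failure does not mask the other.
  try addJournalEntry()
  catch { case NonFatal(ex) => log.error("Failed to add the journal entry", ex) }

  try sendNotificationEmail()
  catch { case NonFatal(ex) => log.error("Failed to send the notification email", ex) }
} finally {
  // Always release locks, even if journaling or notification failed.
  tokenLockRegistry.releaseAllLocks()
}
```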
In `@pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala`:
- Around line 47-71: The SparkSession created in main is never stopped; wrap the
body of main (the SparkSession creation and subsequent actions like df1 and
log.error(showString(df1))) in a try/finally where the finally calls
spark.stop(), ensuring SparkSession is referenced by the variable SparkSession
spark (or the existing spark) and terminated even on exceptions; update the main
method to use try { ... } finally { spark.stop() } around the code that uses the
SparkSession.
- Around line 53-58: Add the missing catalog implementation key and correct the
FileIO key so the Glue catalog initializes: set the config
"spark.sql.catalog.glue_catalog" to the Iceberg Glue catalog class
(org.apache.iceberg.aws.glue.GlueCatalog) and rename
"spark.sql.catalog.glue_catalog.io" to "spark.sql.catalog.glue_catalog.io-impl"
with the S3FileIO implementation (org.apache.iceberg.aws.s3.S3FileIO) in
GlueIcebergSuite.scala, keeping the existing access-points and
warehouse/defaultCatalog settings intact.
---
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala`:
- Around line 172-180: The current Try{ Pramen.instance }.toOption in
IngestionJob silently swallows exceptions; change it to capture the failure and
log the exception so configuration/initialization errors are visible: replace
the silent toOption usage around Pramen.instance with pattern matching or fold
on Try/Option to call processLogger (or the class logger) with a clear message
plus the caught exception before skipping the call to
pramen.addNumberOfRecordsIngested(stats.recordCount.getOrElse(0L)); keep the
existing behavior of not failing ingestion for non-raw formats but ensure the
exception and context (accessing Pramen.instance for outputTable.format.isRaw
check and stats.recordCount) are logged.
In `@pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala`:
- Around line 54-57: The test GlueIcebergSuite currently hardcodes UAT-specific
values into Spark configs (e.g., the "spark.sql.catalog.glue_catalog.warehouse"
warehouse path, "spark.sql.catalog.glue_catalog.catalog-impl",
"spark.sql.catalog.glue_catalog.io", and the access-point key like
"spark.sql.catalog.glue_catalog.s3.access-points.ursamajor-afs1-dev-edla-aqdt-za");
change these to read from test args or environment/configuration variables and
inject them into the same .config(...) calls so the suite is
environment-agnostic—add parameters (or System.getenv/Config) used by
GlueIcebergSuite to supply the warehouse URI, catalog impl, file IO class and
access-point ARN instead of the hardcoded strings (also update the other
hardcoded values referenced later in the file the comment mentions).
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala`:
- Around line 83-95: The current synchronized setters setNumberOfExecutorsMin
and setNumberOfExecutorsMax auto-adjust the opposite bound (numberOfExecutorsMax
/ numberOfExecutorsMin) which makes final state order-dependent; change the
behavior to be deterministic by validating inputs instead of auto-mangling the
other field: in setNumberOfExecutorsMin(nIn: Int) check if
numberOfExecutorsMax.exists(_ < nIn) and if so throw an IllegalArgumentException
(or return a failure) rather than changing numberOfExecutorsMax, and likewise in
setNumberOfExecutorsMax(nIn: Int) check if numberOfExecutorsMin.exists(_ > nIn)
and throw if inconsistent; keep the synchronized block and update tests to
expect validation errors (or document the validation behavior) so callers cannot
rely on order-dependent auto-adjustment.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 51991362-ccfc-4b41-9b47-e80d2f5ed0c4
📒 Files selected for processing (40)
- pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala
- pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala
- pramen/api/src/main/scala/za/co/absa/pramen/api/status/PipelineStatus.scala
- pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaPath.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopDeltaTable.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalMongoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalNull.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/Execution.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/ExecutionsTable.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/GlueIcebergSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/journal/JournalMock.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalHadoopDeltaPathLongSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/AppRunnerSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala
- pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala
    def setComputeEngineId(computeEngineId: String): Unit

    def setNumberOfExecutorsMin(n: Int): Unit

    def setNumberOfExecutorsMax(n: Int): Unit

    def setExecutorType(executorType: String): Unit

    def setNumberOfRecordsIngested(count: Long): Unit

    def addNumberOfRecordsIngested(count: Long): Unit

    def setMaximumNumberOfColumns(count: Long): Unit

    def setExecutionAdditionalOption(key: String, value: String): Unit
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Implementers of Pramen in this repo:"
rg -nP --type=scala '\b(class|object)\b[^{\n]*\b(extends|with)\s+Pramen\b' -C2
echo
echo "Binary compatibility / versioning config hints:"
rg -nP --type=sbt 'mima|MiMa|versionScheme|binary' -C2

Repository: AbsaOSS/pramen
Length of output: 1268
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check PramenImpl for the new methods
echo "=== PramenImpl implementations ==="
grep -n "setComputeEngineId\|setNumberOfExecutorsMin\|setNumberOfExecutorsMax\|setExecutorType\|setNumberOfRecordsIngested\|addNumberOfRecordsIngested\|setMaximumNumberOfColumns\|setExecutionAdditionalOption" pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala || echo "Methods not found in PramenImpl"
echo
echo "=== DummyPramen implementations ==="
grep -n "setComputeEngineId\|setNumberOfExecutorsMin\|setNumberOfExecutorsMax\|setExecutorType\|setNumberOfRecordsIngested\|addNumberOfRecordsIngested\|setMaximumNumberOfColumns\|setExecutionAdditionalOption" pramen/api/src/test/scala/za/co/absa/pramen/api/mocks/DummyPramen.scala || echo "Methods not found in DummyPramen"
echo
echo "=== Build configuration files ==="
find . -maxdepth 2 -name "build.sbt" -o -name "pom.xml" | head -5 | xargs -I {} sh -c 'echo "File: {}"; head -30 "{}"'
echo
echo "=== Version info in build files ==="
rg -l "version|versionScheme" --type=scala --type=xml . | head -10

Repository: AbsaOSS/pramen
Length of output: 4707
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get the full content of Pramen trait to see all methods
echo "=== Full Pramen trait definition ==="
wc -l pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala
echo
echo "=== Lines 70-100 of Pramen.scala ==="
sed -n '70,100p' pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala

Repository: AbsaOSS/pramen
Length of output: 1134
All in-repo implementers properly updated; verify breaking change is documented as intentional.
Adding abstract methods to a public trait is binary-incompatible for external implementers. Both PramenImpl and DummyPramen are already updated with implementations, confirming this is an intentional coordinated change. Ensure the breaking change is clearly documented in release notes and consider explicitly bumping the major version (currently 1.13.11-SNAPSHOT) or adding migration guidance if external users rely on this API.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pramen/api/src/main/scala/za/co/absa/pramen/api/Pramen.scala` around lines 79
- 93, You added new abstract methods to the public trait Pramen
(setComputeEngineId, setNumberOfExecutorsMin, setNumberOfExecutorsMax,
setExecutorType, setNumberOfRecordsIngested, addNumberOfRecordsIngested,
setMaximumNumberOfColumns, setExecutionAdditionalOption) which is a
binary-breaking API change; update the changelog/release notes to explicitly
document this breaking change, add migration guidance for external implementers
(list the new methods and expected semantics/behaviour), and bump the major
version or explicitly mark the release as incompatible (update version from
1.13.11-SNAPSHOT accordingly) so consumers know to recompile custom
implementations.
    val ATTR_TABLE_NAME = "table_name"
    val ATTR_INFO_DATE = "info_date"
    val ATTR_INFO_DATE_SORT_KEY = "info_date_sort_key" // Composite: "infoDate#jobFinished"
    val ATTR_INFO_DATE_BEGIN = "info_date_begin"
    val ATTR_INFO_DATE_END = "info_date_end"
    val ATTR_INPUT_RECORD_COUNT = "input_record_count"
    val ATTR_OUTPUT_RECORD_COUNT = "output_record_count"
    val ATTR_JOB_STARTED = "job_started"
    val ATTR_JOB_FINISHED = "job_finished"
    val ATTR_BATCH_ID = "batch_id"
    val ATTR_APPENDED_RECORD_COUNT = "appended_record_count"

    // Attribute names for schema table
-   val ATTR_SCHEMA_JSON = "schemaJson"
+   val ATTR_SCHEMA_JSON = "schema_json"
Attribute-key rename can break existing bookkeeping/schema DynamoDB tables.
These constant changes alter key and field names used in all query/update paths. Existing tables created with legacy attribute names won’t match new key-condition expressions, causing runtime failures unless a migration/compatibility layer is added.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 795 - 809, The recent renames of DynamoDB attribute constants
(e.g., ATTR_TABLE_NAME, ATTR_INFO_DATE, ATTR_INFO_DATE_SORT_KEY,
ATTR_INFO_DATE_BEGIN, ATTR_INFO_DATE_END, ATTR_INPUT_RECORD_COUNT,
ATTR_OUTPUT_RECORD_COUNT, ATTR_JOB_STARTED, ATTR_JOB_FINISHED, ATTR_BATCH_ID,
ATTR_APPENDED_RECORD_COUNT, ATTR_SCHEMA_JSON) in BookkeeperDynamoDb.scala will
break compatibility with existing DynamoDB tables; restore compatibility by
either reverting these constants to their previous names or implementing a
compatibility layer that accepts both legacy and new attribute names in all
query/update paths (e.g., in methods that build KeyConditionExpressions,
UpdateExpressions, and attribute maps), and add a migration helper or detection
logic to prefer existing attributes when present; ensure all usages of these
constants across BookkeeperDynamoDb (and any helpers that serialize/deserialize
schema) are updated to consult the compatibility mapping so existing tables
continue to function without a forced migration.
    val ATTR_PRAMEN_TABLE_NAME = "pramen_table_name"
    val ATTR_COMPOSITE_KEY = "composite_key" // Format: "infoDate#createdAtMilli"
    val ATTR_INFO_DATE = "info_date"
    val ATTR_DATA_TYPE = "data_type"
    val ATTR_MIN_OFFSET = "min_offset"
    val ATTR_MAX_OFFSET = "max_offset"
    val ATTR_BATCH_ID = "batch_id"
    val ATTR_CREATED_AT = "created_at"
    val ATTR_COMMITTED_AT = "committed_at"
Snake_case key rename can break existing offset tables at runtime.
Line 558 and Line 559 change the partition/sort key attribute names. For already-provisioned tables using legacy keys, DynamoDB key-condition validation will fail because this code now queries with different key names and does not migrate/recreate existing tables.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`
around lines 558 - 566, The rename of partition/sort key attributes
(ATTR_PRAMEN_TABLE_NAME and ATTR_COMPOSITE_KEY) will break existing DynamoDB
tables; update the code to be backward-compatible by detecting and supporting
legacy key names: either query DescribeTable once (e.g., in the
OffsetManagerDynamoDb initialization) to read the actual attribute names and
store which set to use, or implement dual-key logic in methods that build
key-condition expressions (use ATTR_PRAMEN_TABLE_NAME/ATTR_COMPOSITE_KEY but
fall back to legacy names if the first query returns a validation error). Ensure
this detection/dual-use is applied wherever the constants are used to construct
key conditions, reads or writes so existing tables continue to work without
migration.
    val ATTR_ENVIRONMENT_NAME = "environment_name"
    val ATTR_TENANT = "tenant"
    val ATTR_COUNTRY = "country"
    val ATTR_BATCH_ID = "batchId"
🧩 Analysis chain
🏁 Script executed:

```bash
# Check attribute name constants in JournalDynamoDB.scala
rg -n "val ATTR_" pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala | grep -v "= \"[a-z_]*\"" || echo "All attributes use snake_case"

# Check how ATTR_BATCH_ID is used in the file
rg -n "ATTR_BATCH_ID" pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala

# Inspect the executions table attributes, journal attributes, and usages
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala | sed -n '357,410p'
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala | sed -n '100,120p'
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala | sed -n '210,225p'
```
Change "batchId" to "batch_id" for consistency with all other journal and executions table attributes.
Line 406 defines ATTR_BATCH_ID = "batchId" as camelCase—the only attribute among 48+ attributes across both the journal table (lines 385-405) and executions table (lines 357-382) that does not follow snake_case. All other attributes, including ATTR_EXEC_BATCH_ID = "batch_id", consistently use snake_case. This aligns with the PR summary's goal of "alignment of attribute naming (snake_case) with JDBC bookkeeping tables."
Update the attribute to "batch_id" and plan for data migration of existing journal table records if applicable.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`
at line 406, ATTR_BATCH_ID in JournalDynamoDB.scala is set to "batchId" but must
be "batch_id" to match the snake_case convention used across journal and
executions attributes (see ATTR_EXEC_BATCH_ID). Change the constant
ATTR_BATCH_ID's string value from "batchId" to "batch_id" in the JournalDynamoDB
object/class, search for any usages of ATTR_BATCH_ID to ensure consistency, and
add a short note in the changelog or migration plan to handle migrating existing
DynamoDB journal records (mapping old "batchId" to new "batch_id") where
applicable.
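The migration step suggested above amounts to rewriting legacy items so their attribute names follow the snake_case convention. A minimal sketch of such a mapping (the conversion helper is illustrative and not part of the PR; only `batchId` is known to be affected here):

```scala
// Illustrative camelCase -> snake_case conversion for migrating legacy
// attribute names such as "batchId" to "batch_id"; not part of the PR.
def toSnakeCase(name: String): String =
  name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase

// Applying the mapping to a legacy item's attribute names; values are
// made-up placeholders, real items would hold DynamoDB AttributeValues.
val legacyItem = Map("batchId" -> "12345", "info_date" -> "2024-05-01")
val migrated   = legacyItem.map { case (k, v) => toSnakeCase(k) -> v }
```

Attributes already in snake_case pass through unchanged, so the same rewrite can be applied idempotently to partially migrated tables.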
```scala
override def addPipelineEntry(execution: Execution): Unit = {
  // ToDo add the implementation for CSV
}
```
Don't silently drop pipeline execution records in CSV journaling.
Line 96 currently no-ops addPipelineEntry, which causes silent data loss for execution-level journal entries on this backend. Either implement persistence or fail fast.
Proposed fix
```diff
 override def addPipelineEntry(execution: Execution): Unit = {
-  // ToDo add the implementation for CSV
+  throw new UnsupportedOperationException(
+    "Pipeline execution journaling is not implemented for JournalHadoopCsv."
+  )
 }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala`
around lines 96 - 98, The addPipelineEntry method in JournalHadoopCsv currently
no-ops which silently drops Execution records; update
JournalHadoopCsv.addPipelineEntry(execution: Execution) to either persist the
execution to the CSV journal using the same CSV writing/append logic used for
other journal rows (serialize the relevant Execution fields into the CSV schema
and flush/close the writer) or, if CSV journaling cannot support execution-level
entries, fail fast by throwing a clear
NotImplementedException/IllegalStateException that includes "addPipelineEntry"
and the Execution id to prevent silent data loss; locate the CSV write/append
helpers used elsewhere in JournalHadoopCsv and reuse them for consistency.
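If the persistence route is taken instead of failing fast, one option is to append a CSV line per execution, mirroring the task-level rows. A minimal sketch using a simplified stand-in record (the field names are assumptions, not Pramen's actual `Execution` model):

```scala
import java.time.Instant

// Simplified stand-in for Pramen's Execution model; real field names differ.
final case class ExecutionRecord(pipelineId: String,
                                 status: String,
                                 startedAt: Instant,
                                 recordsIngested: Long)

// Quote free-text fields so separators and quotes inside them stay intact,
// then join everything into a single CSV line.
def toCsvLine(e: ExecutionRecord): String = {
  def quote(s: String): String = "\"" + s.replace("\"", "\"\"") + "\""
  Seq(quote(e.pipelineId), quote(e.status), e.startedAt.toString, e.recordsIngested.toString)
    .mkString(",")
}
```

In the real `JournalHadoopCsv` this line would presumably be appended through the same Hadoop FS writer the task-level entries use, keeping both journals in one consistent format.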
```scala
val executors = spark.sparkContext.getExecutorMemoryStatus.keySet
  .filter(_ != "driver")
```
🧩 Analysis chain
🌐 Web query:
In Apache Spark 3.x, what is the exact key format returned by SparkContext.getExecutorMemoryStatus? Specifically: does it include a literal "driver" key or host:port keys (including driver), and what is the recommended way to exclude the driver when counting executors?
💡 Result:
In Spark 3.x, SparkContext.getExecutorMemoryStatus returns a Map whose keys correspond to the BlockManagerId "block manager" identities known to the driver, and the map includes an entry for the driver's BlockManager as well (i.e., it is not a pure "executors-only" list). [1][2]

1) Does it include a literal "driver" key? No source I found for Spark 3.x indicates that the key is the literal string "driver". The public SparkContext API docs only say it returns a map "from the block manager" (not how keys are formatted), and Spark's status APIs indicate that driver and executors are both represented as SparkExecutorInfo/SparkStatusTracker entries (the driver is included). [1][2]

2) Are the keys host:port (including driver)? The keys are derived from BlockManagerId. Spark's status APIs model each executor (and the driver) with separate host and port fields (SparkExecutorInfo has host and port). This implies the corresponding BlockManagerId-based identifier includes host/port (or equivalent) rather than a literal "driver" string. [3][2]

3) Recommended way to exclude the driver when counting executors: do not try to filter by a guessed key string format (e.g., a literal "driver"). Instead, use Spark's executor status APIs that explicitly distinguish host/port for each reported executor entry, and then exclude the driver by matching against the driver's own host+port (or by using the fact that SparkStatusTracker includes driver and executors). [2]

Concrete pattern (conceptual):

- Use SparkStatusTracker / executor summary to retrieve all known executor infos (it "include[s] information for both the driver and executors"). [2]
- Count only those entries where the executor's identity is not the driver's identity (match host and port against the driver info you capture from the same API call / context). [2][3]

If you still use getExecutorMemoryStatus specifically: treat the keys as BlockManagerId-derived strings and exclude the driver by detecting the driver's BlockManagerId (again by matching against the driver identity from SparkStatusTracker/SparkExecutorInfo, rather than relying on a literal "driver" key). This avoids relying on any undocumented string formatting details. [2]

Note: Many practitioners observe startup timing effects where local/clusters may temporarily show only the driver-like executor(s); that is consistent with the driver being counted as an executor-like entity in some configurations. [4]
Citations:
- 1: https://api-docs.databricks.com/scala/spark/latest/org/apache/spark/SparkContext.html
- 2: https://downloads.apache.org/spark/docs/3.3.3/api/java/org/apache/spark/SparkStatusTracker.html
- 3: https://downloads.apache.org/spark/docs/3.0.3/api/java/org/apache/spark/SparkExecutorInfo.html
- 4: https://stackoverflow.com/questions/51342460/getexecutormemorystatus-size-not-outputting-correct-num-of-executors
🏁 Script executed:

```bash
# First, check the specific lines mentioned in the review
wc -l pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala

# Read the specific lines with context (lines 175-190 and 215-230)
sed -n '175,190p' pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala
sed -n '215,230p' pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala

# Search for other uses of getExecutorMemoryStatus in the codebase to see patterns
rg "getExecutorMemoryStatus" -B 3 -A 3

# Search for "driver" filtering patterns in the codebase
rg "driver" -A 2 -B 2 --glob="*.scala" | head -80

# Get the file path correct and read lines around both locations
fd "AppRunner.scala" -t f
head -250 pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala | tail -100

# Check Spark version used in the project
rg "sparkVersion|spark.*version" --max-count=5 pramen/project/Dependencies.scala
```
Replace != "driver" string comparison with proper BlockManagerId matching to exclude driver executors.
The literal "driver" key does not exist in getExecutorMemoryStatus.keySet. Spark 3.x maps executor and driver identities as BlockManagerId-derived keys (typically host:port format), so the current filter has no effect and fails to exclude the driver entry. This causes executor counts and memory metadata to be incorrect.
Use SparkStatusTracker to explicitly distinguish executors from the driver, or match the driver's BlockManagerId identity instead of relying on string comparison.
Locations affected
- Line 182-183: `setMinMaxExecutors` method (affects min/max executor counts) - Line 222-223: `getNumberOfExecutorMemoryGb` method (affects executor memory detection)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala`
around lines 182 - 183, The code currently filters executor memory status with
.filter(_ != "driver") which never matches BlockManagerId keys; instead use
SparkStatusTracker to get real executor ids and exclude the driver: call
spark.sparkContext.statusTracker.getExecutorInfos.map(_.id).toSet to build
executorIds (this set contains "driver" for the driver process), then replace
the current executors calculation with filtering memoryStatus.keySet by whether
the key corresponds to one of those executorIds (e.g.
memoryStatus.keySet.filter(k => executorIds.exists(id => k.contains(id))) and/or
explicitly exclude id == "driver"); apply this change in both setMinMaxExecutors
and getNumberOfExecutorMemoryGb, referencing the executors variable and the
SparkStatusTracker usage.
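Stripped of the Spark plumbing, the fix reduces to excluding the driver by an explicit identity rather than a guessed key string. A pure-logic sketch (the host:port values are made up; in a real run the driver identity would come from Spark's status APIs or the `spark.driver.host`/`spark.driver.port` configuration, not from the memory-status keys themselves):

```scala
// getExecutorMemoryStatus keys are "host:port" strings for the driver and
// each executor, so filtering by a literal "driver" never matches anything.
// Instead, remove the entry matching the driver's own host:port identity.
def executorKeys(memoryStatusKeys: Set[String],
                 driverHost: String,
                 driverPort: Int): Set[String] =
  memoryStatusKeys - s"$driverHost:$driverPort"

// Hypothetical cluster snapshot: one driver entry plus two executors.
val keys = Set("10.0.0.1:7077", "10.0.0.2:41234", "10.0.0.3:41234")
val executors = executorKeys(keys, "10.0.0.1", 7077) // two executors remain
```

The same set would then back both the min/max executor counts and the per-executor memory lookup, so the two affected methods stay consistent.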
535d5a8 to 0e4acfe — Compare
Closes #738
Summary by CodeRabbit
Release Notes