Fix Kafka commit ordering after producer writes#2311
Fix Kafka commit ordering after producer writes#2311officialasishkumar wants to merge 2 commits into
Conversation
Delay manual offset completion until the downstream Kafka producer callback succeeds. Propagate producer failures through the processing future so single and batch commit paths do not acknowledge records that were not written to the next topic. Add KafkaMosipEventBus tests for producer-success commit ordering and producer-failure behavior in both single and batch processing paths. Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughRefactors KafkaMosipEventBus to defer consumer offset commit until downstream producer write confirms; centralizes MDC and promise resolution into helpers and moves span closure into branch-specific locations. Adds unit tests covering producer success and failure in single and batch commit modes. Changes
Sequence Diagram(s)sequenceDiagram
participant Consumer
participant KafkaMosipEventBus
participant KafkaProducer
participant KafkaBroker
participant Promise
Consumer->>KafkaMosipEventBus: processRecord(record)
activate KafkaMosipEventBus
KafkaMosipEventBus->>KafkaMosipEventBus: handle record, extract MDC, start span
alt processing success
KafkaMosipEventBus->>KafkaProducer: write(producerRecord, callback)
activate KafkaProducer
KafkaProducer->>KafkaBroker: send message (async)
activate KafkaBroker
KafkaBroker-->>KafkaProducer: ack / failure
deactivate KafkaBroker
KafkaProducer-->>KafkaMosipEventBus: callback(success/failure)
deactivate KafkaProducer
alt producer success
KafkaMosipEventBus->>Promise: completeProcessedRecord() / commitOffset()
KafkaMosipEventBus->>KafkaMosipEventBus: closeSpan()
else producer failure
KafkaMosipEventBus->>Promise: failPromise(cause)
KafkaMosipEventBus->>KafkaMosipEventBus: closeSpan()
end
else processing failure
KafkaMosipEventBus->>Promise: failPromise(cause)
KafkaMosipEventBus->>KafkaMosipEventBus: closeSpan()
end
Promise-->>Consumer: resolved / failed
deactivate KafkaMosipEventBus
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~22 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.java`:
- Around line 374-382: The partition is only resumed on successful
producer.write; when handler.failed() you must also resume the paused partition
so the consumer can continue/retry; update the failure branch in
KafkaMosipEventBus (the handler.failed() branch that calls failPromise(promise,
handler.cause(), "Failed kafkaProducer.write")) to invoke resumePartition(...)
for the current record/partition before/after failing the promise (mirroring the
success path where completeProcessedRecord(...) and resumePartition are called),
and ensure any tracing cleanup (eventTracingHandler.closeSpan(span) and
MDC.clear()) still runs.
In
`@registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java`:
- Around line 363-405: The tests
testProcessRecordCommitsAfterProducerWriteSuccess,
testProcessRecordFailsAndDoesNotCommitWhenProducerWriteFails, and
testProcessRecordFailsBatchFutureWhenProducerWriteFails call
kafkaMosipEventBus.processRecord(...) and assert result.succeeded()/failed()
synchronously which relies on synchronous mocks
(mockKafkaProducerWriteResult/mockCommitResult); change each test to assert
completion inside the Future's asynchronous callback (e.g., result.onComplete or
using TestContext.async) so assertions (result.isSucceeded()/isFailed(),
result.cause(), and the verify/inOrder checks for kafkaProducer.write and
kafkaConsumer.commit) run after the Future actually completes, ensuring the
tests do not race if producer/commit handlers become async.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0a18e653-d660-4209-a056-9e8cb9a70d0f
📒 Files selected for processing (2)
registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBus.javaregistration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java
Description
Fixes #2305
Tests
JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64 PATH=/usr/lib/jvm/java-21-openjdk-amd64/bin:$PATH mvn -pl registration-processor-core -Dtest=KafkaMosipEventBusTest -DfailIfNoTests=false -Dgpg.skip=true -Dmaven.javadoc.skip=true testSummary by CodeRabbit
Refactor
Tests