- 
                Notifications
    You must be signed in to change notification settings 
- Fork 5
fix duplicate nats jobs #189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| WalkthroughThe pull request introduces enhancements to the NATS jobs driver in the RoadRunner project, focusing on improving state management and handling of in-progress jobs. A new  Changes
 Sequence DiagramsequenceDiagram
    participant Driver
    participant Listener
    participant Job
    
    Driver->>Listener: Start Job Processing
    Listener->>Listener: Check inProgressItems
    alt Job Not In Progress
        Listener->>Job: Process Job
        Listener->>Driver: Update inProgressItems
    else Job Already In Progress
        Listener->>Listener: Log Duplicate Job
        Listener->>Driver: Skip Processing
    end
    Listener->>Driver: Cleanup Job
Poem
 ✨ Finishing Touches
 Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit: 
 Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
 Other keywords and placeholders
 CodeRabbit Configuration File ( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
natsjobs/listener.go (1)
106-119: Consider refactoring nested function declarations for better readability.While the implementation is functionally correct, the nested function declarations could be simplified for better maintainability.
Consider extracting the requeue cleanup wrapper:
-item.Options.requeueFn = func(item *Item) error { - return c.wrapCleanupFn(item.ID(), func() error { - return c.requeue(item) - })() -} +func (c *Driver) wrapRequeueCleanup(item *Item) func() error { + return c.wrapCleanupFn(item.ID(), func() error { + return c.requeue(item) + }) +} + +item.Options.requeueFn = func(item *Item) error { + return c.wrapRequeueCleanup(item)() +}tests/php_test_files/jobs/jobs_long_task.php (1)
1-25: Consider enhancing error handling in the test script.While the script serves its purpose for testing, the error handling could be more robust.
Consider adding more specific error handling:
try { sleep(35); $task->ack(); } catch (\Throwable $e) { - $task->error((string)$e); + $error_message = sprintf( + "Task failed: %s\nFile: %s\nLine: %d", + $e->getMessage(), + $e->getFile(), + $e->getLine() + ); + $task->error($error_message); }tests/jobs_nats_test.go (1)
1258-1339: Consider adding more specific test assertions.While the test covers the basic functionality, it could benefit from more detailed assertions.
Consider adding assertions for the exact message content and timing:
require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len()) require.Equal(t, 1, oLogger.FilterMessageSnippet("job already in progress").Len()) + +// Verify timing of the duplicate job detection +messages := oLogger.FilterMessageSnippet("job already in progress") +require.Equal(t, 1, len(messages)) +firstMessage := messages[0] +require.Contains(t, firstMessage.Message, "test") // Verify job ID + +// Verify the time difference between start and duplicate detection +startMessages := oLogger.FilterMessageSnippet("job processing was started") +require.Equal(t, 1, len(startMessages)) +timeDiff := messages[0].Time.Sub(startMessages[0].Time) +require.Greater(t, timeDiff.Seconds(), 30.0) // Verify minimum processing timetests/configs/.rr-nats-long-task.yaml (1)
1-40: Consider documenting the test configuration.The configuration file would benefit from comments explaining:
- The purpose of this test configuration
- The relationship with the duplicate jobs fix
- Why specific values were chosen (e.g., timeouts, prefetch)
Add comments like:
version: '3' +# Test configuration for verifying NATS duplicate jobs fix +# - Uses long-running tasks (35s) to trigger potential duplicates +# - Single worker with prefetch=1 to control message delivery +# - Configured for at-least-once delivery semantics testing
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- go.work.sumis excluded by- !**/*.sum
📒 Files selected for processing (5)
- natsjobs/driver.go(2 hunks)
- natsjobs/listener.go(3 hunks)
- tests/configs/.rr-nats-long-task.yaml(1 hunks)
- tests/jobs_nats_test.go(1 hunks)
- tests/php_test_files/jobs/jobs_long_task.php(1 hunks)
🔇 Additional comments (7)
natsjobs/listener.go (3)
45-54: Well-structured cleanup wrapper implementation!The implementation is concise and handles the cleanup of
inProgressItemscorrectly. The error handling ensures the map is only cleaned up on successful operations.
84-99: Effective duplicate job prevention implementation!The use of
LoadOrStoreprovides thread-safe checking for in-progress items. The error handling is comprehensive, and the logging provides clear visibility into duplicate job scenarios.
Line range hint
134-152: Well-implemented auto-ack handling!The implementation properly handles auto-ack scenarios with appropriate error handling and cleanup. The sequence of operations (ack → delete → cleanup) is correct.
natsjobs/driver.go (1)
39-47: Clean integration of inProgressItems tracking!The addition of
inProgressItemsto the Driver struct and its initialization is well-implemented. The use ofsync.Mapensures thread-safe operations.Also applies to: 134-140
tests/configs/.rr-nats-long-task.yaml (3)
31-38: Configuration aligns well with duplicate job prevention.The NATS pipeline configuration is well-structured for testing the duplicate jobs fix:
prefetch: 1ensures controlled message delivery
deliver_new: "true"prioritizes new messages
delete_after_ack: truehelps maintain clean state
23-27: Worker pool configuration supports the test scenario.The worker pool settings are appropriate for testing the duplicate jobs fix:
- Single worker ensures sequential processing
- No max jobs limit (
max_jobs: 0) prevents artificial constraints- Generous timeouts (60s) accommodate the long-running tasks
7-9: Verify the relay timeout configuration.The relay timeout is set to 20s but according to the summary, the test involves tasks running for 35s. This mismatch could cause premature termination of long-running tasks.
Run this script to check the task duration in the PHP file:
✅ Verification successful
The relay timeout configuration is correct for this test case
The 20-second relay timeout combined with a 35-second task duration is intentionally configured this way to test the system's behavior with long-running tasks that exceed the timeout period. This setup is appropriate for verifying the duplicate jobs issue mentioned in the PR objectives.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the sleep duration in the PHP test file rg "sleep|usleep" "php_test_files/jobs/jobs_long_task.php"Length of output: 137
Script:
#!/bin/bash # Find PHP files containing "long" or "task" in their names fd -e php -E vendor '(long|task)'Length of output: 80
Script:
#!/bin/bash # Check sleep duration in the test file cat "tests/php_test_files/jobs/jobs_long_task.php"Length of output: 575
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (4)
natsjobs/listener.go (4)
84-99: Simplify error handling pattern.The nested error handling could be simplified to improve readability while maintaining the same functionality.
Consider this more concise approach:
if _, loaded := c.inProgressItems.LoadOrStore(item.ID(), struct{}{}); loaded { - err = m.InProgress() - if err != nil { - errn := m.Nak() - if errn != nil { - c.log.Error("failed to send Nak state", zap.Error(errn), zap.Error(err)) - continue - } - c.log.Error("failed to send InProgress state", zap.Error(err)) - continue - } + if err := m.InProgress(); err != nil { + if errn := m.Nak(); errn != nil { + c.log.Error("failed to send Nak state", zap.Error(errn), zap.Error(err)) + } else { + c.log.Error("failed to send InProgress state", zap.Error(err)) + } + continue + } c.log.Info("job already in progress", zap.String("id", item.ID())) span.End() continue }
106-119: Reduce code duplication in wrapper functions.The wrapper pattern for
nakWithDelayandrequeueFnis duplicated. Consider extracting a helper function to make the code more DRY.Consider this approach:
+// wrapDelayedCleanupFn helper for operations that need additional setup +func (c *Driver) wrapDelayedCleanupFn(id string, setupFn func() func() error) func() error { + return c.wrapCleanupFn(id, func() error { + return setupFn()() + }) +} // in listenerStart: -item.Options.nakWithDelay = func(delay time.Duration) error { - return c.wrapCleanupFn(item.ID(), func() error { - return m.NakWithDelay(delay) - })() -} +item.Options.nakWithDelay = func(delay time.Duration) error { + return c.wrapDelayedCleanupFn(item.ID(), func() func() error { + return func() error { return m.NakWithDelay(delay) } + })() +}
134-137: Maintain consistent error handling patterns.The error handling pattern here differs from other parts of the code. Consider maintaining consistency with the simplified error handling pattern suggested earlier.
Consider this approach:
if item.Options.AutoAck { c.log.Debug("auto_ack option enabled") - err := item.Options.ack() - if err != nil { + if err := item.Options.ack(); err != nil { item = nil c.log.Error("message acknowledge", zap.Error(err)) span.RecordError(err)
Line range hint
1-180: Overall implementation effectively addresses task duplication.The implementation successfully addresses the task duplication issue by using a thread-safe
sync.Mapto track in-progress items and implementing proper cleanup mechanisms. The code is well-structured and includes appropriate error handling and logging.While there are some suggested improvements for code organization and error handling patterns, the core functionality is solid and achieves the PR objectives.
Consider adding metrics/monitoring for:
- Number of in-progress items
- Frequency of duplicate job attempts
- Cleanup success/failure rates
This would help track the effectiveness of the solution in production.
| // wrapCleanupFn wrap the cleanup function to ensure the inProgressItems map is updated correctly | ||
| func (c *Driver) wrapCleanupFn(id string, fn func() error) func() error { | ||
| return func() error { | ||
| err := fn() | ||
| if err == nil { | ||
| c.inProgressItems.Delete(id) | ||
| } | ||
| return err | ||
| } | ||
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider cleanup on all execution paths.
The cleanup function only removes items from inProgressItems when err == nil. This could lead to memory leaks if a job consistently fails, as failed items will remain in the map indefinitely.
Consider cleaning up regardless of the error state:
 func (c *Driver) wrapCleanupFn(id string, fn func() error) func() error {
 	return func() error {
 		err := fn()
+		// Always clean up, regardless of error
+		c.inProgressItems.Delete(id)
-		if err == nil {
-			c.inProgressItems.Delete(id)
-		}
 		return err
 	}
 }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // wrapCleanupFn wrap the cleanup function to ensure the inProgressItems map is updated correctly | |
| func (c *Driver) wrapCleanupFn(id string, fn func() error) func() error { | |
| return func() error { | |
| err := fn() | |
| if err == nil { | |
| c.inProgressItems.Delete(id) | |
| } | |
| return err | |
| } | |
| } | |
| // wrapCleanupFn wrap the cleanup function to ensure the inProgressItems map is updated correctly | |
| func (c *Driver) wrapCleanupFn(id string, fn func() error) func() error { | |
| return func() error { | |
| err := fn() | |
| // Always clean up, regardless of error | |
| c.inProgressItems.Delete(id) | |
| return err | |
| } | |
| } | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
natsjobs/listener.go (2)
87-102: Enhance error handling and span management.Consider these improvements:
- Add job ID to error logs for better debugging
- Move
span.End()after error handling to capture error contextif _, loaded := c.inProgressItems.LoadOrStore(item.ID(), struct{}{}); loaded { err = m.InProgress() if err != nil { errn := m.Nak() if errn != nil { - c.log.Error("failed to send Nak state", zap.Error(errn), zap.Error(err)) + c.log.Error("failed to send Nak state", + zap.String("job_id", item.ID()), + zap.Error(errn), + zap.Error(err)) + span.RecordError(errn) + span.End() continue } - c.log.Error("failed to send InProgress state", zap.Error(err)) + c.log.Error("failed to send InProgress state", + zap.String("job_id", item.ID()), + zap.Error(err)) + span.RecordError(err) + span.End() continue } c.log.Info("job already in progress", zap.String("id", item.ID())) span.End() continue }
109-122: Simplify nested function creation.The nested function creation in
nakWithDelayandrequeueFncould be simplified for better readability.-item.Options.nakWithDelay = func(delay time.Duration) error { - return c.wrapCleanupFn(item.ID(), func() error { - return m.NakWithDelay(delay) - })() -} +item.Options.nakWithDelay = func(delay time.Duration) error { + nakFn := func() error { return m.NakWithDelay(delay) } + return c.wrapCleanupFn(item.ID(), nakFn)() +} -item.Options.requeueFn = func(item *Item) error { - return c.wrapCleanupFn(item.ID(), func() error { - return c.requeue(item) - })() -} +item.Options.requeueFn = func(item *Item) error { + requeueFn := func() error { return c.requeue(item) } + return c.wrapCleanupFn(item.ID(), requeueFn)() +}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- natsjobs/listener.go(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Analyze (go)
- GitHub Check: NATS plugin (Go stable, PHP 8.3, OS ubuntu-latest)
- GitHub Check: NATS durability plugin (Go stable, PHP 8.3, OS ubuntu-latest)
🔇 Additional comments (3)
natsjobs/listener.go (3)
18-20: Great documentation!The comment clearly explains the NATS server's message redelivery behavior, which is crucial context for understanding the duplicate jobs issue being fixed.
48-57: Consider cleanup on all execution paths.The cleanup function only removes items from
inProgressItemswhenerr == nil. This could lead to memory leaks if a job consistently fails, as failed items will remain in the map indefinitely.
137-140:⚠️ Potential issueEnsure cleanup happens even when message deletion fails.
When auto-ack is enabled, the
inProgressItemsentry should be cleaned up even if the subsequent message deletion fails. Currently, if message deletion fails, the item remains in the map.if item.Options.AutoAck { c.log.Debug("auto_ack option enabled") err := item.Options.ack() if err != nil { item = nil c.log.Error("message acknowledge", zap.Error(err)) span.RecordError(err) span.End() continue } if item.Options.deleteAfterAck { err = c.stream.DeleteMsg(context.Background(), meta.Sequence.Stream) if err != nil { + // Clean up the item from inProgressItems even if deletion fails + c.inProgressItems.Delete(item.ID()) c.log.Error("delete message", zap.Error(err)) item = nil span.RecordError(err) span.End() continue } }Likely invalid or redundant comment.
| Hey @shellphy 👋🏻 I'm not sure this is the correct way to handle case with the re-delivery: 
 I think, naturally, while processing the message, should be implemented something like a ticker with a currently in progress messages. To naturally notify the Nats server about in-progress messages instead of skipping them. I understand that this solution won't work for everyone, but worth saying that increasing ackwait timeout would also solve that problem. | 
Reason for This PR
Avoid duplication of tasks
Description of Changes
NATS consumer implements at-least-once message delivery semantics. By default, if a task is being processed but no "in-progress" signal is sent back to the NATS server, the server will re-deliver the same message every 30 seconds, which leads to duplicate task executions when the task processing time exceeds 30 seconds.
The issue can be reproduced by running the test case
TestNATSLongTaskErragainst the original version. The test fails because the task is executed twice, which validates my concern about duplicate task executions.To prevent duplicate task executions, I implemented a concurrency-safe solution using a sync.Map named inProgressItems to track the status of running tasks. This map maintains the state of currently executing tasks, and entries are promptly cleaned up after task completion (ACK), failure (NACK), or termination (TERM) operations.
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.][Reviewer TODO: Verify that these criteria are met. Request changes if not]git commit -s).CHANGELOG.md.Summary by CodeRabbit
Release Notes
New Features
Tests
The changes improve the robustness of job processing by tracking in-progress items and preventing duplicate job executions.