Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement nats-jetstreaming subscription with multiple consumer modes #85

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

goldensnitch123
Copy link

@goldensnitch123 goldensnitch123 commented Mar 2, 2025

This commit implements subscription based on the nats-jetstreaming queue, leveraging JetStream to offer multiple consumer modes and supporting various consumer configurations and consumption methods to meet the demands of different business scenarios.

Greptile Summary

This PR implements NATS JetStream subscription functionality in the go-queue library, providing flexible messaging capabilities with multiple consumer modes and configuration options.

  • Added natsmq/consumer/consumer.go with ConsumerManager supporting both push-based and pull-based consumption methods
  • Added natsmq/publisher/publisher.go with JetStreamPublisher for message publishing to subjects
  • Created natsmq/common package with stream management utilities and configuration structures
  • Implemented comprehensive consumer configuration in natsmq/consumer/config.go with support for different acknowledgment policies
  • Included example implementations and Docker Compose configuration for development testing

Greptile AI

💡 (1/5) You can manually trigger the bot by mentioning @greptileai in a comment!

This commit implements subscription based on the nats-jetstreaming queue, leveraging JetStream to offer multiple consumer modes and supporting various consumer configurations and consumption methods to meet the demands of different business scenarios.
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 file(s) reviewed, 20 comment(s)
Edit PR Review Bot Settings | Greptile

Comment on lines +7 to +8
- "--jetstream"
- "--debug"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Debug mode is enabled which may impact performance in production. Consider making this configurable or disabling for production deployments.

Comment on lines +13 to +15
ports:
- "4222:4222"
- "8222:8222"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ports are exposed directly to the host without any network isolation. Consider using internal networks for production deployments.

log.Fatalf("failed to create consumer manager: %v", err)
}

go cm.Start()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Start() is a blocking function that only returns when Stop() is called, but it's being run in a goroutine without any synchronization mechanism to ensure it's fully initialized before potential shutdown signals arrive.

Comment on lines +63 to +64
cm.Stop()
time.Sleep(time.Second)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using time.Sleep() for shutdown coordination is not ideal. Consider using a WaitGroup or a done channel to properly wait for the consumer manager to fully stop.

Comment on lines +38 to +42
- consumerConfig:
filterSubjects: ["user.recharge"]
delivery:
consumptionMethod: "fetchNoWait"
streamName: "user"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: This consumer configuration is missing a name and durable property, unlike the other configurations. While this may work, it could make tracking and managing this consumer difficult in production.

log.Printf("failed to initialize jetstream for stream %q: %v", streamName, err)
continue
}
ctx := context.Background()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using a background context without timeout for stream creation could lead to hanging operations if the NATS server is unresponsive.

//
// nc - pointer to the NATS connection
// cfgs - list of JetStreamConfig configurations to register
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: This function lacks validation for the nc parameter. If nil is passed, it will cause panic when used.

Suggested change
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
if nc == nil {
log.Printf("error: nil NATS connection provided to RegisterStreamInstances")
return
}

Comment on lines +61 to +84
for streamName, streamMgr := range streamRegistry {
streamInstLock.RLock()
_, exists := streamInstances[streamName]
streamInstLock.RUnlock()
if exists {
log.Printf("streamInstance %q already created", streamName)
continue
}
// Initialize JetStream context
if err := streamMgr.InitJetStream(nc); err != nil {
log.Printf("failed to initialize jetstream for stream %q: %v", streamName, err)
continue
}
ctx := context.Background()
stream, err := streamMgr.CreateStream(ctx)
if err != nil {
log.Printf("failed to create stream %q: %v", streamName, err)
continue
}
streamInstLock.Lock()
streamInstances[streamName] = stream
streamInstLock.Unlock()
log.Printf("streamInstance %q created", streamName)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Potential race condition when iterating over streamRegistry while other goroutines might modify it. Consider making a copy of the registry keys before iteration.

natsConf *common.NatsConfig

// List of global JetStream configurations parsed from a config file.
JetStreamConfigs []*common.JetStreamConfig
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: JetStreamConfigs field is exported but doesn't need to be. Consider making it private since it's only used internally.

Comment on lines +227 to +246
func (cm *ConsumerManager) runPullMessages(ctx context.Context, cfg *ConsumerQueueConfig, fetchFn func(num int) (jetstream.MessageBatch, error)) {
for {
select {
case <-ctx.Done():
return
default:
}
msgs, err := fetchFn(cfg.Delivery.FetchCount)
if err != nil {
log.Printf("error fetching messages: %v", err)
continue
}
for msg := range msgs.Messages() {
cm.ackMessage(cfg, msg)
}
if fetchErr := msgs.Error(); fetchErr != nil {
log.Printf("error after fetching messages: %v", fetchErr)
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: No backoff mechanism when errors occur in the pull loop. Consider adding exponential backoff to avoid hammering the server during error conditions.

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 file(s) reviewed, 10 comment(s)
Edit PR Review Bot Settings | Greptile

Comment on lines +3 to +4
services:
nats:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: No volume configuration for persistence. JetStream data will be lost when container restarts.

Comment on lines +41 to +45
var queueConfigs []*consumer.ConsumerQueueConfig
for i := range c.ConsumerQueues {
c.ConsumerQueues[i].Handler = &MyConsumeHandler{}
queueConfigs = append(queueConfigs, &c.ConsumerQueues[i])
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Creating a new slice and appending to it is redundant. Could directly initialize queueConfigs with the right capacity.

Comment on lines +70 to +72
return &JetStreamManager{
streamConf: streamConf,
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: JS field is declared in JetStreamManager struct but not initialized in NewJetStream function. This could lead to nil pointer dereference when using the JS field.

Comment on lines +25 to +29
func RegisterManager(streamID string, mgr *JetStreamManager) {
registryLock.Lock()
defer registryLock.Unlock()
streamRegistry[streamID] = mgr
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: This function lacks validation for the mgr parameter. If nil is passed, it will silently accept it and cause issues later.

Suggested change
func RegisterManager(streamID string, mgr *JetStreamManager) {
registryLock.Lock()
defer registryLock.Unlock()
streamRegistry[streamID] = mgr
}
func RegisterManager(streamID string, mgr *JetStreamManager) {
if mgr == nil {
log.Printf("error: nil JetStreamManager provided for stream %q", streamID)
return
}
registryLock.Lock()
defer registryLock.Unlock()
streamRegistry[streamID] = mgr
}

// ConsumerQueueConfig defines the full configuration for building a consumer queue.
// If StreamName is empty, the default stream (DefaultStream) will be used.
type ConsumerQueueConfig struct {
StreamName string `json:"streamName,optional"` // Name of the stream to associate with; if empty, uses default stream
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: The comment mentions 'DefaultStream' but there's no definition of this constant in the file. Make sure it's defined elsewhere or document where it comes from.

stream = s
}

ctx := context.Background()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider using a context with timeout here to prevent blocking indefinitely during consumer creation

Comment on lines +208 to +211
if err := cfg.Handler.Consume(msg); err != nil {
log.Printf("message processing error: %v", err)
return
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: No handling for message processing errors. Consider implementing negative acknowledgment (NAK) or retry logic when Handler.Consume returns an error

// This function blocks until Stop() is invoked.
func (cm *ConsumerManager) Start() {
// Initialize stream instances (register or update streams) based on the provided JetStream configurations.
common.RegisterStreamInstances(cm.nc, cm.JetStreamConfigs)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: No error handling for RegisterStreamInstances. If stream registration fails, the consumer will continue without streams

Comment on lines +250 to +260
func (cm *ConsumerManager) Stop() {
for _, consumerCtx := range cm.subscribers {
consumerCtx.Stop()
}
for _, cancel := range cm.cancelFuncs {
cancel()
}
if cm.nc != nil {
cm.nc.Close()
}
close(cm.stopCh)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: No graceful shutdown mechanism to ensure in-flight messages are processed before stopping. Consider adding a wait group or timeout

Comment on lines +12 to +13
// JetStreamPublisher implements the Publisher interface by utilizing an internal JetStream context for message publishing.
// Note: It is recommended to rename the package from "publiser" to "publisher" for better clarity.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: The comment mentions renaming the package from 'publiser' to 'publisher', but the package is already correctly named 'publisher' on line 1. This comment appears to be outdated and should be removed.

Suggested change
// JetStreamPublisher implements the Publisher interface by utilizing an internal JetStream context for message publishing.
// Note: It is recommended to rename the package from "publiser" to "publisher" for better clarity.
// JetStreamPublisher implements the Publisher interface by utilizing an internal JetStream context for message publishing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant