Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement nats-jetstreaming subscription with multiple consumer modes #85

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions example/natsmq/consumer/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# config.yaml

nats:
url: "nats://127.0.0.1:4222"

streams:
- name: "user"
description: "user stream"
subjects: ["user.>"]

consumerQueues:
- consumerConfig:
name: "taskRegister"
durable: "taskRegister"
filterSubjects: ["user.register"]
delivery:
consumptionMethod: "consumer"
queueConsumerCount: 3
streamName: "user"

- consumerConfig:
name: "activityRegister"
durable: "activityRegister"
filterSubjects: ["user.register"]
delivery:
consumptionMethod: "consumer"
streamName: "user"

- consumerConfig:
name: "taskRecharger"
durable: "taskRecharger"
filterSubjects: ["user.recharge"]
delivery:
consumptionMethod: "fetch"
queueConsumerCount: 2
streamName: "user"

- consumerConfig:
filterSubjects: ["user.recharge"]
delivery:
consumptionMethod: "fetchNoWait"
streamName: "user"
Comment on lines +38 to +42
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: This consumer configuration is missing a name and durable property, unlike the other configurations. While this may work, it could make tracking and managing this consumer difficult in production.


- consumerConfig:
filterSubjects: ["subject.activity.*"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: The subject pattern 'subject.activity.*' doesn't match any subjects in the defined 'user' stream (which only includes 'user.>' subjects). This consumer won't receive messages unless using the default stream.

ordered: true
delivery:
consumptionMethod: "consumer"
65 changes: 65 additions & 0 deletions example/natsmq/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"flag"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/zeromicro/go-queue/natsmq/common"
"github.com/zeromicro/go-queue/natsmq/consumer"
"github.com/zeromicro/go-zero/core/conf"
"log"
"os"
"os/signal"
"syscall"
"time"
)

var configFile = flag.String("f", "config.yaml", "Specify the config file")

type ConsumerExampleConfig struct {
Streams []*common.JetStreamConfig `json:"streams"`
Nats NatsConf `json:"nats"`
ConsumerQueues []consumer.ConsumerQueueConfig `json:"consumerQueues"`
}

type NatsConf struct {
URL string `json:"url"`
}

type MyConsumeHandler struct{}

func (h *MyConsumeHandler) Consume(msg jetstream.Msg) error {
log.Printf("subject [%s] Received message: %s", msg.Subject(), string(msg.Data()))
return nil
}

func main() {
flag.Parse()
var c ConsumerExampleConfig
conf.MustLoad(*configFile, &c)

var queueConfigs []*consumer.ConsumerQueueConfig
for i := range c.ConsumerQueues {
c.ConsumerQueues[i].Handler = &MyConsumeHandler{}
queueConfigs = append(queueConfigs, &c.ConsumerQueues[i])
}
Comment on lines +41 to +45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Creating a new slice and appending to it is redundant. Could directly initialize queueConfigs with the right capacity.


natsConf := &common.NatsConfig{
URL: c.Nats.URL,
Options: []nats.Option{},
}

cm, err := consumer.NewConsumerManager(natsConf, c.Streams, queueConfigs)
if err != nil {
log.Fatalf("failed to create consumer manager: %v", err)
}

go cm.Start()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Start() is a blocking function that only returns when Stop() is called, but it's being run in a goroutine without any synchronization mechanism to ensure it's fully initialized before potential shutdown signals arrive.


sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Printf("Received signal %s, shutting down...", sig)
cm.Stop()
time.Sleep(time.Second)
Comment on lines +63 to +64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using time.Sleep() for shutdown coordination is not ideal. Consider using a WaitGroup or a done channel to properly wait for the consumer manager to fully stop.

}
15 changes: 15 additions & 0 deletions example/natsmq/consumer/nats-jetsteam-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: "3.6"

services:
nats:
Comment on lines +3 to +4
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: No volume configuration for persistence. JetStream data will be lost when container restarts.

image: nats:2.1.8-alpine3.11
command:
- "--jetstream"
- "--debug"
Comment on lines +7 to +8
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Debug mode is enabled which may impact performance in production. Consider making this configurable or disabling for production deployments.

- "--port"
- "4222"
- "--http_port"
- "8222"
ports:
- "4222:4222"
- "8222:8222"
Comment on lines +13 to +15
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ports are exposed directly to the host without any network isolation. Consider using internal networks for production deployments.

4 changes: 4 additions & 0 deletions example/natsmq/publisher/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# config.yaml

nats:
url: "nats://127.0.0.1:4222"
66 changes: 66 additions & 0 deletions example/natsmq/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"flag"
"github.com/nats-io/nats.go"
"github.com/zeromicro/go-queue/natsmq/common"
"github.com/zeromicro/go-queue/natsmq/publisher"
"github.com/zeromicro/go-zero/core/conf"

"log"
"time"
)

var configFile = flag.String("f", "config.yaml", "Specify the config file")

type PublisherExampleConfig struct {
Nats NatsConf `json:"nats"`
}

type NatsConf struct {
URL string `json:"url"`
}

func main() {
flag.Parse()
var c PublisherExampleConfig
conf.MustLoad(*configFile, &c)

natsConf := &common.NatsConfig{
URL: c.Nats.URL,
Options: []nats.Option{},
}

jSPublisher, err := publisher.NewJetStreamPublisher(natsConf)

if err != nil {
log.Fatalf("failed to NewJetStreamPublisher message: %v", err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Error message is inconsistent with the function name. 'failed to NewJetStreamPublisher message' should be 'failed to create JetStreamPublisher'

Suggested change
log.Fatalf("failed to NewJetStreamPublisher message: %v", err)
log.Fatalf("failed to create JetStreamPublisher: %v", err)

}

subjects := []string{
"user.register",
"user.recharge",
"subject.activity.example",
}
messages := []string{
"Test message: user.register message",
"Test message: user.recharge message",
"Test message: subject.activity message",
}
ctx := context.Background()
for i, subj := range subjects {
msg := []byte(messages[i])
for j := 0; j < 3; j++ {
go func(s string, m []byte) {
ack, err := jSPublisher.Publish(ctx, s, m)
if err != nil {
log.Fatalf("failed to publish message: %v", err)
}
log.Printf("published message to %s, ack: %+v", s, ack.Stream)
}(subj, msg)
}
Comment on lines +55 to +62
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Using log.Fatalf in goroutines will terminate the entire program if any single publish fails. Consider using a more graceful error handling approach like logging the error and continuing.

}

time.Sleep(2 * time.Second)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using a fixed sleep time is unreliable. Consider using a WaitGroup to properly wait for all goroutines to complete.

}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Missing defer jSPublisher.Close() to properly clean up resources when the program exits.

155 changes: 155 additions & 0 deletions natsmq/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package common

import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"log"
"sync"
"time"
)

const (
DefaultStream = "defaultStream"
)

var (
streamRegistry = make(map[string]*JetStreamManager)
registryLock sync.RWMutex
streamInstances = make(map[string]jetstream.Stream)
streamInstLock sync.RWMutex
)

// RegisterManager registers a JetStreamManager with the specified streamID.
func RegisterManager(streamID string, mgr *JetStreamManager) {
registryLock.Lock()
defer registryLock.Unlock()
streamRegistry[streamID] = mgr
}
Comment on lines +25 to +29
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: This function lacks validation for the mgr parameter. If nil is passed, it will silently accept it and cause issues later.

Suggested change
func RegisterManager(streamID string, mgr *JetStreamManager) {
registryLock.Lock()
defer registryLock.Unlock()
streamRegistry[streamID] = mgr
}
func RegisterManager(streamID string, mgr *JetStreamManager) {
if mgr == nil {
log.Printf("error: nil JetStreamManager provided for stream %q", streamID)
return
}
registryLock.Lock()
defer registryLock.Unlock()
streamRegistry[streamID] = mgr
}


// GetManager retrieves the JetStreamManager for the given streamID.
// Returns the manager and true if found; otherwise returns nil and false.
func GetManager(streamID string) (*JetStreamManager, bool) {
registryLock.RLock()
defer registryLock.RUnlock()
mgr, ok := streamRegistry[streamID]
return mgr, ok
}

// RegisterStreamInstances initializes the JetStream contexts (if needed),
// creates or updates streams based on the provided JetStream configurations,
// and stores the stream instances in a global map for later usage.
// Parameters:
//
// nc - pointer to the NATS connection
// cfgs - list of JetStreamConfig configurations to register
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: This function lacks validation for the nc parameter. If nil is passed, it will cause panic when used.

Suggested change
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) {
if nc == nil {
log.Printf("error: nil NATS connection provided to RegisterStreamInstances")
return
}

// Register managers for each provided configuration if not already registered.
if len(cfgs) > 0 {
for _, cfg := range cfgs {
if _, ok := GetManager(cfg.Name); !ok {
mgr := NewJetStream(cfg)
RegisterManager(cfg.Name, mgr)
} else {
log.Printf("manager for stream %q already registered", cfg.Name)
}
}
}

// Iterate through all registered stream managers to initialize JetStream and create stream instances.
for streamName, streamMgr := range streamRegistry {
streamInstLock.RLock()
_, exists := streamInstances[streamName]
streamInstLock.RUnlock()
if exists {
log.Printf("streamInstance %q already created", streamName)
continue
}
// Initialize JetStream context
if err := streamMgr.InitJetStream(nc); err != nil {
log.Printf("failed to initialize jetstream for stream %q: %v", streamName, err)
continue
}
ctx := context.Background()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Using a background context without timeout for stream creation could lead to hanging operations if the NATS server is unresponsive.

stream, err := streamMgr.CreateStream(ctx)
if err != nil {
log.Printf("failed to create stream %q: %v", streamName, err)
continue
}
streamInstLock.Lock()
streamInstances[streamName] = stream
streamInstLock.Unlock()
log.Printf("streamInstance %q created", streamName)
}
Comment on lines +61 to +84
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Potential race condition when iterating over streamRegistry while other goroutines might modify it. Consider making a copy of the registry keys before iteration.

}

// GetStream retrieves a JetStream stream instance by streamID.
// Returns the stream instance and true if found.
func GetStream(streamID string) (jetstream.Stream, bool) {
streamInstLock.RLock()
defer streamInstLock.RUnlock()
stream, ok := streamInstances[streamID]
return stream, ok
}

func init() {
// Registers a default stream manager if one hasn't been registered.
if _, ok := GetManager(DefaultStream); !ok {
defaultCfg := &JetStreamConfig{
Name: DefaultStream,
Subjects: []string{"subject.*.*"},
Description: DefaultStream,
Retention: 0,
MaxConsumers: 30,
MaxMsgs: -1,
MaxBytes: -1,
Discard: 0,
DiscardNewPerSubject: false,
MaxAge: 0,
MaxMsgsPerSubject: 10000,
MaxMsgSize: 10000,
NoAck: false,
}
defaultManager := NewJetStream(defaultCfg)
RegisterManager(DefaultStream, defaultManager)
log.Printf("default stream %q registered", DefaultStream)
}
}

// InitJetStream initializes the JetStream context for the manager using the given NATS connection.
// Parameters:
//
// nc - pointer to the NATS connection
//
// Returns:
//
// error - non-nil error if JetStream context creation fails
func (jsm *JetStreamManager) InitJetStream(nc *nats.Conn) error {
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return fmt.Errorf("failed to create JetStream context: %w", err)
Comment on lines +129 to +132
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Closing the NATS connection here is problematic as the connection is owned by the caller. This could unexpectedly terminate connections that are still needed by other components.

Suggested change
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
return fmt.Errorf("failed to create JetStream context: %w", err)
js, err := jetstream.New(nc)
if err != nil {
return fmt.Errorf("failed to create JetStream context: %w", err)
}

}
jsm.JS = js
return nil
}

// CreateStream creates or updates a JetStream stream using the manager's configuration.
// Parameters:
//
// ctx - context to control request timeout
//
// Returns:
//
// jetstream.Stream - the created/updated stream instance
// error - non-nil error if stream creation fails
func (jsm *JetStreamManager) CreateStream(ctx context.Context) (jetstream.Stream, error) {
streamCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
stream, err := jsm.JS.CreateOrUpdateStream(streamCtx, jsm.streamConf)
if err != nil {
return nil, err
}
return stream, nil
}
Loading