diff --git a/example/natsmq/consumer/config.yaml b/example/natsmq/consumer/config.yaml new file mode 100644 index 0000000..0fd8b3e --- /dev/null +++ b/example/natsmq/consumer/config.yaml @@ -0,0 +1,48 @@ +# config.yaml + +nats: + url: "nats://127.0.0.1:4222" + +streams: + - name: "user" + description: "user stream" + subjects: ["user.>"] + +consumerQueues: + - consumerConfig: + name: "taskRegister" + durable: "taskRegister" + filterSubjects: ["user.register"] + delivery: + consumptionMethod: "consumer" + queueConsumerCount: 3 + streamName: "user" + + - consumerConfig: + name: "activityRegister" + durable: "activityRegister" + filterSubjects: ["user.register"] + delivery: + consumptionMethod: "consumer" + streamName: "user" + + - consumerConfig: + name: "taskRecharger" + durable: "taskRecharger" + filterSubjects: ["user.recharge"] + delivery: + consumptionMethod: "fetch" + queueConsumerCount: 2 + streamName: "user" + + - consumerConfig: + filterSubjects: ["user.recharge"] + delivery: + consumptionMethod: "fetchNoWait" + streamName: "user" + + - consumerConfig: + filterSubjects: ["subject.activity.*"] + ordered: true + delivery: + consumptionMethod: "consumer" diff --git a/example/natsmq/consumer/consumer.go b/example/natsmq/consumer/consumer.go new file mode 100644 index 0000000..12a5dee --- /dev/null +++ b/example/natsmq/consumer/consumer.go @@ -0,0 +1,65 @@ +package main + +import ( + "flag" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/zeromicro/go-queue/natsmq/common" + "github.com/zeromicro/go-queue/natsmq/consumer" + "github.com/zeromicro/go-zero/core/conf" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +var configFile = flag.String("f", "config.yaml", "Specify the config file") + +type ConsumerExampleConfig struct { + Streams []*common.JetStreamConfig `json:"streams"` + Nats NatsConf `json:"nats"` + ConsumerQueues []consumer.ConsumerQueueConfig `json:"consumerQueues"` +} + +type NatsConf struct { + URL string `json:"url"` +} + +type MyConsumeHandler struct{} + +func (h *MyConsumeHandler) Consume(msg jetstream.Msg) error { + log.Printf("subject [%s] Received message: %s", msg.Subject(), string(msg.Data())) + return nil +} + +func main() { + flag.Parse() + var c ConsumerExampleConfig + conf.MustLoad(*configFile, &c) + + var queueConfigs []*consumer.ConsumerQueueConfig + for i := range c.ConsumerQueues { + c.ConsumerQueues[i].Handler = &MyConsumeHandler{} + queueConfigs = append(queueConfigs, &c.ConsumerQueues[i]) + } + + natsConf := &common.NatsConfig{ + URL: c.Nats.URL, + Options: []nats.Option{}, + } + + cm, err := consumer.NewConsumerManager(natsConf, c.Streams, queueConfigs) + if err != nil { + log.Fatalf("failed to create consumer manager: %v", err) + } + + go cm.Start() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigChan + log.Printf("Received signal %s, shutting down...", sig) + cm.Stop() + time.Sleep(time.Second) +} diff --git a/example/natsmq/consumer/nats-jetsteam-compose.yaml b/example/natsmq/consumer/nats-jetsteam-compose.yaml new file mode 100644 index 0000000..1e2656f --- /dev/null +++ b/example/natsmq/consumer/nats-jetsteam-compose.yaml @@ -0,0 +1,15 @@ +version: "3.6" + +services: + nats: + image: nats:2.1.8-alpine3.11 + command: + - "--jetstream" + - "--debug" + - "--port" + - "4222" + - "--http_port" + - "8222" + ports: + - "4222:4222" + - "8222:8222" \ No newline at end of file diff --git a/example/natsmq/publisher/config.yaml b/example/natsmq/publisher/config.yaml new file mode 100644 index 0000000..05c324d --- /dev/null +++ b/example/natsmq/publisher/config.yaml @@ -0,0 +1,4 @@ +# config.yaml + +nats: + url: "nats://127.0.0.1:4222" diff --git a/example/natsmq/publisher/publisher.go b/example/natsmq/publisher/publisher.go new file mode 100644 index 0000000..d017abd --- /dev/null +++ b/example/natsmq/publisher/publisher.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "flag" + "github.com/nats-io/nats.go" + "github.com/zeromicro/go-queue/natsmq/common" + "github.com/zeromicro/go-queue/natsmq/publisher" + "github.com/zeromicro/go-zero/core/conf" + + "log" + "time" +) + +var configFile = flag.String("f", "config.yaml", "Specify the config file") + +type PublisherExampleConfig struct { + Nats NatsConf `json:"nats"` +} + +type NatsConf struct { + URL string `json:"url"` +} + +func main() { + flag.Parse() + var c PublisherExampleConfig + conf.MustLoad(*configFile, &c) + + natsConf := &common.NatsConfig{ + URL: c.Nats.URL, + Options: []nats.Option{}, + } + + jSPublisher, err := publisher.NewJetStreamPublisher(natsConf) + + if err != nil { + log.Fatalf("failed to NewJetStreamPublisher message: %v", err) + } + + subjects := []string{ + "user.register", + "user.recharge", + "subject.activity.example", + } + messages := []string{ + "Test message: user.register message", + "Test message: user.recharge message", + "Test message: subject.activity message", + } + ctx := context.Background() + for i, subj := range subjects { + msg := []byte(messages[i]) + for j := 0; j < 3; j++ { + go func(s string, m []byte) { + ack, err := jSPublisher.Publish(ctx, s, m) + if err != nil { + log.Fatalf("failed to publish message: %v", err) + } + log.Printf("published message to %s, ack: %+v", s, ack.Stream) + }(subj, msg) + } + } + + time.Sleep(2 * time.Second) +} diff --git a/natsmq/common/common.go b/natsmq/common/common.go new file mode 100644 index 0000000..c50c5d4 --- /dev/null +++ b/natsmq/common/common.go @@ -0,0 +1,155 @@ +package common + +import ( + "context" + "fmt" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "log" + "sync" + "time" +) + +const ( + DefaultStream = "defaultStream" +) + +var ( + streamRegistry = make(map[string]*JetStreamManager) + registryLock sync.RWMutex + streamInstances = make(map[string]jetstream.Stream) + streamInstLock sync.RWMutex +) + +// RegisterManager registers a JetStreamManager with the specified streamID. +func RegisterManager(streamID string, mgr *JetStreamManager) { + registryLock.Lock() + defer registryLock.Unlock() + streamRegistry[streamID] = mgr +} + +// GetManager retrieves the JetStreamManager for the given streamID. +// Returns the manager and true if found; otherwise returns nil and false. +func GetManager(streamID string) (*JetStreamManager, bool) { + registryLock.RLock() + defer registryLock.RUnlock() + mgr, ok := streamRegistry[streamID] + return mgr, ok +} + +// RegisterStreamInstances initializes the JetStream contexts (if needed), +// creates or updates streams based on the provided JetStream configurations, +// and stores the stream instances in a global map for later usage. +// Parameters: +// +// nc - pointer to the NATS connection +// cfgs - list of JetStreamConfig configurations to register +func RegisterStreamInstances(nc *nats.Conn, cfgs []*JetStreamConfig) { + // Register managers for each provided configuration if not already registered. + if len(cfgs) > 0 { + for _, cfg := range cfgs { + if _, ok := GetManager(cfg.Name); !ok { + mgr := NewJetStream(cfg) + RegisterManager(cfg.Name, mgr) + } else { + log.Printf("manager for stream %q already registered", cfg.Name) + } + } + } + + // Iterate through all registered stream managers to initialize JetStream and create stream instances. + for streamName, streamMgr := range streamRegistry { + streamInstLock.RLock() + _, exists := streamInstances[streamName] + streamInstLock.RUnlock() + if exists { + log.Printf("streamInstance %q already created", streamName) + continue + } + // Initialize JetStream context + if err := streamMgr.InitJetStream(nc); err != nil { + log.Printf("failed to initialize jetstream for stream %q: %v", streamName, err) + continue + } + ctx := context.Background() + stream, err := streamMgr.CreateStream(ctx) + if err != nil { + log.Printf("failed to create stream %q: %v", streamName, err) + continue + } + streamInstLock.Lock() + streamInstances[streamName] = stream + streamInstLock.Unlock() + log.Printf("streamInstance %q created", streamName) + } +} + +// GetStream retrieves a JetStream stream instance by streamID. +// Returns the stream instance and true if found. +func GetStream(streamID string) (jetstream.Stream, bool) { + streamInstLock.RLock() + defer streamInstLock.RUnlock() + stream, ok := streamInstances[streamID] + return stream, ok +} + +func init() { + // Registers a default stream manager if one hasn't been registered. + if _, ok := GetManager(DefaultStream); !ok { + defaultCfg := &JetStreamConfig{ + Name: DefaultStream, + Subjects: []string{"subject.*.*"}, + Description: DefaultStream, + Retention: 0, + MaxConsumers: 30, + MaxMsgs: -1, + MaxBytes: -1, + Discard: 0, + DiscardNewPerSubject: false, + MaxAge: 0, + MaxMsgsPerSubject: 10000, + MaxMsgSize: 10000, + NoAck: false, + } + defaultManager := NewJetStream(defaultCfg) + RegisterManager(DefaultStream, defaultManager) + log.Printf("default stream %q registered", DefaultStream) + } +} + +// InitJetStream initializes the JetStream context for the manager using the given NATS connection. +// Parameters: +// +// nc - pointer to the NATS connection +// +// Returns: +// +// error - non-nil error if JetStream context creation fails +func (jsm *JetStreamManager) InitJetStream(nc *nats.Conn) error { + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return fmt.Errorf("failed to create JetStream context: %w", err) + } + jsm.JS = js + return nil +} + +// CreateStream creates or updates a JetStream stream using the manager's configuration. +// Parameters: +// +// ctx - context to control request timeout +// +// Returns: +// +// jetstream.Stream - the created/updated stream instance +// error - non-nil error if stream creation fails +func (jsm *JetStreamManager) CreateStream(ctx context.Context) (jetstream.Stream, error) { + streamCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + stream, err := jsm.JS.CreateOrUpdateStream(streamCtx, jsm.streamConf) + if err != nil { + return nil, err + } + return stream, nil +} diff --git a/natsmq/common/config.go b/natsmq/common/config.go new file mode 100644 index 0000000..ff4dc8c --- /dev/null +++ b/natsmq/common/config.go @@ -0,0 +1,73 @@ +package common + +import ( + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "time" +) + +// NatsConfig holds the configuration for connecting to a NATS server. +type NatsConfig struct { + URL string // NATS server URL + Options []nats.Option // Additional NATS connection options +} + +// JetStreamManager manages JetStream operations and holds the stream configuration. +type JetStreamManager struct { + JS jetstream.JetStream // JetStream context + streamConf jetstream.StreamConfig // Configuration for the JetStream stream +} + +// JetStreamConfig defines the configuration parameters for a JetStream stream. +type JetStreamConfig struct { + // Basic configuration + Name string `json:"name"` // Stream name (required) + Description string `json:"description,optional,default=desc"` // Stream description, default "desc" + Subjects []string `json:"subjects"` // Subjects associated with the stream (required) + + // Storage and retention policies + Retention int `json:"retention,options=0|1|2,default=0"` + MaxConsumers int `json:"maxConsumers,optional,default=-1"` + MaxMsgs int64 `json:"maxMsgs,optional,default=-1"` + MaxBytes int64 `json:"maxBytes,optional,default=-1"` + Discard int `json:"discard,options=0|1,default=0"` + DiscardNewPerSubject bool `json:"discardNewPerSubject,optional,default=false"` + + // Message lifecycle settings + MaxAge time.Duration `json:"maxAge,optional,default=0"` + MaxMsgsPerSubject int64 `json:"maxMsgsPerSubject,optional,default=10000"` + MaxMsgSize int32 `json:"maxMsgSize,optional,default=10000"` + + // Message acknowledgment policy + NoAck bool `json:"noAck,optional,default=false"` +} + +// NewJetStream creates a new instance of JetStreamManager based on the provided configuration. +// Parameters: +// +// cfg - pointer to a JetStreamConfig containing stream settings +// +// Returns: +// +// *JetStreamManager - the new JetStreamManager instance +func NewJetStream(cfg *JetStreamConfig) *JetStreamManager { + // Map custom configuration to jetstream.StreamConfig + streamConf := jetstream.StreamConfig{ + Name: cfg.Name, + Subjects: cfg.Subjects, + Retention: jetstream.RetentionPolicy(cfg.Retention), + MaxConsumers: cfg.MaxConsumers, + MaxMsgs: cfg.MaxMsgs, + MaxBytes: cfg.MaxBytes, + Discard: jetstream.DiscardPolicy(cfg.Discard), + DiscardNewPerSubject: cfg.DiscardNewPerSubject, + MaxAge: cfg.MaxAge, + MaxMsgsPerSubject: cfg.MaxMsgsPerSubject, + MaxMsgSize: cfg.MaxMsgSize, + NoAck: cfg.NoAck, + } + + return &JetStreamManager{ + streamConf: streamConf, + } +} diff --git a/natsmq/consumer/config.go b/natsmq/consumer/config.go new file mode 100644 index 0000000..a661066 --- /dev/null +++ b/natsmq/consumer/config.go @@ -0,0 +1,63 @@ +package consumer + +import ( + "github.com/nats-io/nats.go/jetstream" + "time" +) + +// ConsumerConfig combines core consumer settings with advanced parameters. +// Fields marked as optional are not strictly required, but you may want to provide +// default values in your configuration if consistency is needed. +type ConsumerConfig struct { + // Core parameters + Name string `json:"name,optional"` // Consumer name (optional) + Durable string `json:"durable,optional"` // Durable name for persistent consumer (optional) + FilterSubjects []string `json:"filterSubjects"` // Subjects filtered by this consumer (required) + Description string `json:"description,optional"` // Consumer description (optional) + + // Advanced parameters + AckPolicy int `json:"ackPolicy,options=0|1|2,default=0"` // Acknowledgment policy (e.g., 0 for auto-ack, others for manual ack) + Ordered bool `json:"ordered,optional,default=false"` // Whether the consumer is ordered + OrderedConsumerOptions OrderedConsumerOptions `json:"orderedConsumerOptions,optional"` // Extra options for ordered consumers +} + +// OrderedConsumerOptions defines additional options used when the consumer is configured to be ordered. +type OrderedConsumerOptions struct { + DeliverPolicy int `json:"deliverPolicy,options=0|1|2|3|4|5,default=0"` // Delivery policy option for ordered consumers + OptStartSeq uint64 `json:"optStartSeq,optional"` // Optional starting sequence for message consumption + OptStartTime *time.Time `json:"optStartTime,optional"` // Optional starting time for message consumption + ReplayPolicy int `json:"replayPolicy,options=0|1,default=0"` // Replay policy for ordered consumers +} + +// DeliveryConfig groups the consumption method and pull-related settings. +// ConsumptionMethod can be push-based ("consumer") or pull-based ("fetch"/"fetchNoWait"). +// FetchCount defines how many messages to pull in one batch. +type DeliveryConfig struct { + ConsumptionMethod ConsumerType `json:"consumptionMethod,options=consumer|fetch|fetchNoWait,default=consumer"` + FetchCount int `json:"fetchCount,optional,default=10"` +} + +// ConsumerQueueConfig defines the full configuration for building a consumer queue. +// If StreamName is empty, the default stream (DefaultStream) will be used. +type ConsumerQueueConfig struct { + StreamName string `json:"streamName,optional"` // Name of the stream to associate with; if empty, uses default stream + ConsumerConfig ConsumerConfig `json:"consumerConfig"` // Consumer core and advanced configuration + QueueConsumerCount int `json:"queueConsumerCount,optional,default=1"` // Number of consumer instances to create for this queue + Delivery DeliveryConfig `json:"delivery"` // Delivery settings including consumption method and pull batch size + Handler ConsumeHandler `json:"-"` // Message handler to process incoming messages +} + +// ConsumerType defines the mode of consumption: push-based or pull-based. +type ConsumerType string + +const ( + Consumer ConsumerType = "consumer" // Push-based consumption (server pushes messages) + Pull ConsumerType = "fetch" // Pull-based consumption (client actively pulls messages) + PullNoWait ConsumerType = "fetchNoWait" // Pull-based consumption without waiting for messages +) + +// ConsumeHandler defines an interface for message processing. +// Users need to implement the Consume method to handle individual messages. +type ConsumeHandler interface { + Consume(msg jetstream.Msg) error +} diff --git a/natsmq/consumer/consumer.go b/natsmq/consumer/consumer.go new file mode 100644 index 0000000..701da1b --- /dev/null +++ b/natsmq/consumer/consumer.go @@ -0,0 +1,261 @@ +package consumer + +import ( + "context" + "errors" + "fmt" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/zeromicro/go-queue/natsmq/common" + "github.com/zeromicro/go-zero/core/queue" + "log" +) + +// ConsumerManager manages consumer operations including NATS connection, JetStream stream initialization, +// and consumer queue creation and subscription. +type ConsumerManager struct { + // NATS connection configuration and instance. + nc *nats.Conn + natsConf *common.NatsConfig + + // List of global JetStream configurations parsed from a config file. + JetStreamConfigs []*common.JetStreamConfig + + // List of consumer queue configurations. + queues []*ConsumerQueueConfig + + // Channel to signal stop of the consumer manager. + stopCh chan struct{} + + // List of push-based subscription contexts. + subscribers []jetstream.ConsumeContext + + // List of cancel functions for managing pull-based consumption contexts. + cancelFuncs []context.CancelFunc +} + +// NewConsumerManager creates a new ConsumerManager instance. +// Parameters: +// +// natsConf - pointer to NatsConfig defining the NATS connection settings +// jetStreamConfigs - list of JetStreamConfig, used to initialize global streams +// cq - list of ConsumerQueueConfig defining individual consumer queue settings +// +// Returns: +// +// queue.MessageQueue - the created consumer manager (implements MessageQueue) +// error - error if no consumer queues provided or if connection fails +func NewConsumerManager(natsConf *common.NatsConfig, jetStreamConfigs []*common.JetStreamConfig, cq []*ConsumerQueueConfig) (queue.MessageQueue, error) { + if len(cq) == 0 { + return nil, errors.New("no consumer queues provided") + } + cm := &ConsumerManager{ + natsConf: natsConf, + JetStreamConfigs: jetStreamConfigs, + queues: cq, + stopCh: make(chan struct{}), + subscribers: []jetstream.ConsumeContext{}, + cancelFuncs: []context.CancelFunc{}, + } + if err := cm.connectToNATS(); err != nil { + return nil, err + } + return cm, nil +} + +// connectToNATS establishes a connection to the NATS server. +// Returns: +// +// error - non-nil error if connection fails +func (cm *ConsumerManager) connectToNATS() error { + nc, err := nats.Connect(cm.natsConf.URL, cm.natsConf.Options...) + if err != nil { + return fmt.Errorf("failed to connect to NATS: %w", err) + } + cm.nc = nc + return nil +} + +// Start initializes stream instances and creates consumers according to the provided consumer queue configurations. +// This function blocks until Stop() is invoked. +func (cm *ConsumerManager) Start() { + // Initialize stream instances (register or update streams) based on the provided JetStream configurations. + common.RegisterStreamInstances(cm.nc, cm.JetStreamConfigs) + + // Iterate over each consumer queue configuration to create the consumer on the corresponding stream. + for _, cfg := range cm.queues { + var stream jetstream.Stream + if cfg.StreamName != "" { + s, ok := common.GetStream(cfg.StreamName) + if !ok { + log.Printf("stream %s not found, skipping consumer: %s", cfg.StreamName, cfg.ConsumerConfig.Name) + continue + } else { + stream = s + } + } else { + s, ok := common.GetStream(common.DefaultStream) + if !ok { + log.Printf("default stream not found, skipping consumer: %s", cfg.ConsumerConfig.Name) + continue + } + stream = s + } + + ctx := context.Background() + if err := cm.createConsumer(ctx, cfg, stream); err != nil { + log.Printf("failed to create consumer %s: %v", cfg.ConsumerConfig.Name, err) + continue + } + } + <-cm.stopCh +} + +// createConsumer creates a consumer for a given queue configuration and attaches it to the provided JetStream stream. +// Parameters: +// +// ctx - context to manage cancellation and timeout during consumer creation +// cfg - pointer to ConsumerQueueConfig containing consumer settings and delivery options +// stream - JetStream stream instance to be used +// +// Returns: +// +// error - non-nil error if creating the consumer or subscribing fails +func (cm *ConsumerManager) createConsumer(ctx context.Context, cfg *ConsumerQueueConfig, stream jetstream.Stream) error { + var consumer jetstream.Consumer + var err error + + // Create an ordered consumer or a standard consumer based on the configuration. + if cfg.ConsumerConfig.Ordered { + opts := jetstream.OrderedConsumerConfig{ + FilterSubjects: cfg.ConsumerConfig.FilterSubjects, + DeliverPolicy: jetstream.DeliverPolicy(cfg.ConsumerConfig.OrderedConsumerOptions.DeliverPolicy), + OptStartSeq: cfg.ConsumerConfig.OrderedConsumerOptions.OptStartSeq, + OptStartTime: cfg.ConsumerConfig.OrderedConsumerOptions.OptStartTime, + ReplayPolicy: jetstream.ReplayPolicy(cfg.ConsumerConfig.OrderedConsumerOptions.ReplayPolicy), + } + consumer, err = stream.OrderedConsumer(ctx, opts) + if err != nil { + return fmt.Errorf("failed to create ordered consumer: %w", err) + } + } else { + consumer, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Name: cfg.ConsumerConfig.Name, + Durable: cfg.ConsumerConfig.Durable, + Description: cfg.ConsumerConfig.Description, + FilterSubjects: cfg.ConsumerConfig.FilterSubjects, + AckPolicy: jetstream.AckPolicy(cfg.ConsumerConfig.AckPolicy), + }) + if err != nil { + return fmt.Errorf("failed to create standard consumer: %w", err) + } + } + + // Create consumer instances based on the specified number for the consumer queue. Each instance + // uses either push-based (subscription) or pull-based consumption. + for i := 0; i < cfg.QueueConsumerCount; i++ { + log.Printf("Consumer [%s] instance [%d] with filterSubjects %v created successfully", cfg.ConsumerConfig.Name, i, cfg.ConsumerConfig.FilterSubjects) + switch cfg.Delivery.ConsumptionMethod { + case Consumer: + consumerCtx, err := cm.consumerSubscription(consumer, cfg) + if err != nil { + return fmt.Errorf("failed to subscribe to push messages: %w", err) + } + cm.subscribers = append(cm.subscribers, consumerCtx) + case Pull, PullNoWait: + pullFn := func(num int) (jetstream.MessageBatch, error) { + if cfg.Delivery.ConsumptionMethod == Pull { + return consumer.Fetch(num) + } + return consumer.FetchNoWait(num) + } + pullCtx, cancel := context.WithCancel(ctx) + cm.cancelFuncs = append(cm.cancelFuncs, cancel) + go cm.runPullMessages(pullCtx, cfg, pullFn) + default: + return fmt.Errorf("unsupported consumption method: %v", cfg.Delivery.ConsumptionMethod) + } + } + return nil +} + +// consumerSubscription sets up a push-based subscription using the provided JetStream consumer. +// Parameters: +// +// consumer - JetStream consumer instance to be used for subscription +// cfg - pointer to ConsumerQueueConfig containing consumer settings and the message handler +// +// Returns: +// +// jetstream.ConsumeContext - context to manage the subscription lifecycle +// error - non-nil error if subscription fails +func (cm *ConsumerManager) consumerSubscription(consumer jetstream.Consumer, cfg *ConsumerQueueConfig) (jetstream.ConsumeContext, error) { + consumerCtx, err := consumer.Consume(func(msg jetstream.Msg) { + cm.ackMessage(cfg, msg) + }) + if err != nil { + return nil, fmt.Errorf("failed to subscribe to messages: %w", err) + } + return consumerCtx, nil +} + +// ackMessage processes a message using the user-provided handler and acknowledges the message if required. +// Parameters: +// +// cfg - pointer to ConsumerQueueConfig containing the message handler and acknowledgement settings +// msg - the JetStream message to process +func (cm *ConsumerManager) ackMessage(cfg *ConsumerQueueConfig, msg jetstream.Msg) { + if err := cfg.Handler.Consume(msg); err != nil { + log.Printf("message processing error: %v", err) + return + } + + // Acknowledge the message unless using AckNonePolicy or in an ordered consumer scenario. + if jetstream.AckPolicy(cfg.ConsumerConfig.AckPolicy) != jetstream.AckNonePolicy && !cfg.ConsumerConfig.Ordered { + if err := msg.Ack(); err != nil { + log.Printf("failed to acknowledge message: %v", err) + } + } +} + +// runPullMessages continuously pulls messages in batches using the provided fetch function. +// Parameters: +// +// ctx - context to control the pull loop (supports cancellation) +// cfg - pointer to ConsumerQueueConfig with pull configuration, including the fetch count +// fetchFn - function that fetches a batch of messages; takes an integer defining the number of messages +func (cm *ConsumerManager) runPullMessages(ctx context.Context, cfg *ConsumerQueueConfig, fetchFn func(num int) (jetstream.MessageBatch, error)) { + for { + select { + case <-ctx.Done(): + return + default: + } + msgs, err := fetchFn(cfg.Delivery.FetchCount) + if err != nil { + log.Printf("error fetching messages: %v", err) + continue + } + for msg := range msgs.Messages() { + cm.ackMessage(cfg, msg) + } + if fetchErr := msgs.Error(); fetchErr != nil { + log.Printf("error after fetching messages: %v", fetchErr) + } + } +} + +// Stop terminates all active subscriptions and pull routines, +// closes the underlying NATS connection, and signals exit via stopCh. +func (cm *ConsumerManager) Stop() { + for _, consumerCtx := range cm.subscribers { + consumerCtx.Stop() + } + for _, cancel := range cm.cancelFuncs { + cancel() + } + if cm.nc != nil { + cm.nc.Close() + } + close(cm.stopCh) +} diff --git a/natsmq/publisher/publisher.go b/natsmq/publisher/publisher.go new file mode 100644 index 0000000..d3fe201 --- /dev/null +++ b/natsmq/publisher/publisher.go @@ -0,0 +1,84 @@ +package publisher + +import ( + "context" + "fmt" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/zeromicro/go-queue/natsmq/common" + "log" +) + +// JetStreamPublisher implements the Publisher interface by utilizing an internal JetStream context for message publishing. +// Note: It is recommended to rename the package from "publiser" to "publisher" for better clarity. +type JetStreamPublisher struct { + natsConf *common.NatsConfig + conn *nats.Conn + js jetstream.JetStream +} + +// NewJetStreamPublisher creates a new JetStreamPublisher instance based on the provided NATS configuration. +func NewJetStreamPublisher(natsConf *common.NatsConfig) (*JetStreamPublisher, error) { + p := &JetStreamPublisher{ + natsConf: natsConf, + } + + // Establish a connection to the NATS server using the URL and options from the configuration. + if err := p.connectToNATS(); err != nil { + return nil, fmt.Errorf("connect to NATS failed: %w", err) + } + + // Initialize the JetStream context. + if err := p.initJetStream(); err != nil { + return nil, fmt.Errorf("initialize JetStream failed: %w", err) + } + + return p, nil +} + +// connectToNATS establishes a connection to the NATS server using the URL and Options specified in the configuration. +func (p *JetStreamPublisher) connectToNATS() error { + conn, err := nats.Connect(p.natsConf.URL, p.natsConf.Options...) + if err != nil { + return fmt.Errorf("failed to connect to NATS at %s: %w", p.natsConf.URL, err) + } + p.conn = conn + log.Printf("Successfully connected to NATS: %s", p.natsConf.URL) + return nil +} + +// initJetStream initializes the JetStream context. +// For newer versions of the NATS library, consider using p.conn.JetStream() instead. +func (p *JetStreamPublisher) initJetStream() error { + // Using the jetstream.New method (legacy API); if there are no special requirements, + // you can switch to the newer API: + // js, err := p.conn.JetStream() + // if err != nil { + // return fmt.Errorf("failed to create JetStream context: %w", err) + // } + // p.js = js + js, err := jetstream.New(p.conn) + if err != nil { + return fmt.Errorf("failed to create JetStream context: %w", err) + } + p.js = js + log.Printf("JetStream context initialized") + return nil +} + +// Publish synchronously publishes a message to the specified subject and waits for a server acknowledgment. +func (p *JetStreamPublisher) Publish(ctx context.Context, subject string, payload []byte) (*jetstream.PubAck, error) { + ack, err := p.js.Publish(ctx, subject, payload) + if err != nil { + return nil, fmt.Errorf("failed to publish message on subject %s: %w", subject, err) + } + return ack, nil +} + +// Close terminates the NATS connection used by the JetStreamPublisher and releases all associated resources. +func (p *JetStreamPublisher) Close() { + if p.conn != nil { + p.conn.Close() + log.Printf("NATS connection closed") + } +}