// Package memamq provides an in-memory asynchronous queue that runs submitted
// functions on a fixed set of worker goroutines fed by a buffered channel.
package memamq
import (
"errors"
"sync"
"time"
)
// AsyncQueue describes a queue that accepts functions and processes them
// asynchronously.
//
// NOTE(review): MemoryQueue.Initialize in this file takes only
// (workerCount, bufferSize) and has no processFunc parameter, so *MemoryQueue
// does not satisfy this interface as declared — confirm which signature is
// the intended one.
type AsyncQueue interface {
	// Initialize prepares the queue with the given number of workers and
	// buffer size. The role of processFunc is not shown in this file.
	Initialize(processFunc func(), workerCount, bufferSize int)

	// Push hands task to the queue and reports an error if it could not be
	// accepted.
	Push(task func()) error
}
// MemoryQueue is a channel-backed task queue: functions are sent on taskChan
// and executed by worker goroutines that are tracked through wg.
type MemoryQueue struct {
	// taskChan carries the queued functions to the workers.
	taskChan chan func()
	// wg counts the running worker goroutines so Stop can wait for them.
	wg sync.WaitGroup

	// stopMutex guards isStopped.
	stopMutex sync.Mutex
	// isStopped is set once the queue has been stopped; Push rejects new
	// tasks while it is true.
	isStopped bool
}
// NewMemoryQueue allocates a MemoryQueue, initializes it with workerCount
// workers and a task buffer of bufferSize, and returns it.
func NewMemoryQueue(workerCount int, bufferSize int) *MemoryQueue {
	queue := new(MemoryQueue)
	queue.Initialize(workerCount, bufferSize)
	return queue
}
// Initialize creates the task channel with capacity bufferSize, clears the
// stopped flag, and launches workerCount goroutines that execute tasks read
// from the channel. Each goroutine is registered with the queue's WaitGroup
// and exits once the channel is closed and drained.
func (mq *MemoryQueue) Initialize(workerCount int, bufferSize int) {
	mq.taskChan = make(chan func(), bufferSize)
	mq.isStopped = false

	// drain is the body of every worker: run tasks until taskChan is closed,
	// then mark this worker as finished.
	drain := func() {
		defer mq.wg.Done()
		for job := range mq.taskChan {
			job()
		}
	}

	for started := 0; started < workerCount; started++ {
		mq.wg.Add(1)
		go drain()
	}
}
// Push submits task to the queue so that a worker goroutine can execute it.
//
// It returns an error when:
//   - task is nil (a nil function would panic the worker that dequeues it),
//   - the queue has been stopped, or
//   - the task could not be placed on the channel within 100ms because the
//     buffer is full.
//
// Push may be called concurrently with Stop: if the channel is closed while
// Push is sending, the stopped error is returned instead of panicking.
func (mq *MemoryQueue) Push(task func()) (err error) {
	if task == nil {
		return errors.New("push failed: task is nil")
	}

	mq.stopMutex.Lock()
	stopped := mq.isStopped
	mq.stopMutex.Unlock()
	if stopped {
		return errors.New("push failed: queue is stopped")
	}

	// The check above and the send below are not atomic: Stop can close
	// taskChan in between, and sending on a closed channel panics. The send
	// is the only operation after this point that can panic, so any recovered
	// panic is reported as "queue is stopped".
	defer func() {
		if r := recover(); r != nil {
			err = errors.New("push failed: queue is stopped")
		}
	}()

	// An explicit timer (rather than time.After) is stopped on return, so a
	// successful send does not leave a pending timer behind.
	timer := time.NewTimer(100 * time.Millisecond)
	defer timer.Stop()

	select {
	case mq.taskChan <- task:
		return nil
	case <-timer.C: // Timeout so a full queue does not block the caller forever.
		return errors.New("push failed: queue is full")
	}
}
// Stop marks the queue as stopped, closes the task channel so the workers
// exit after draining the tasks already queued, and blocks until every
// worker goroutine has finished.
//
// Stop is idempotent: calling it again, or calling it on a queue whose
// channel was never created, does not panic. Later calls still wait for the
// workers.
func (mq *MemoryQueue) Stop() {
	mq.stopMutex.Lock()
	if !mq.isStopped {
		mq.isStopped = true
		// Closing a nil or already-closed channel panics, so close only on
		// the first transition to stopped and only if the channel exists.
		if mq.taskChan != nil {
			close(mq.taskChan)
		}
	}
	// Release the lock before waiting so that Push calls are rejected
	// instead of blocking on the mutex while the workers drain.
	mq.stopMutex.Unlock()

	mq.wg.Wait()
}