diff --git a/Dockerfile b/Dockerfile index 5b5268d..5ebb57b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,9 @@ ENV DF_DOCKER_HOST="unix:///var/run/docker.sock" \ DF_NOTIFY_LABEL="com.df.notify" \ DF_INCLUDE_NODE_IP_INFO="false" \ DF_SERVICE_POLLING_INTERVAL="-1" \ - DF_USE_DOCKER_SERVICE_EVENTS="true" + DF_USE_DOCKER_SERVICE_EVENTS="true" \ + DF_NODE_POLLING_INTERVAL="-1" \ + DF_USE_DOCKER_NODE_EVENTS="true" EXPOSE 8080 diff --git a/args.go b/args.go index 65f1ba8..8424060 100644 --- a/args.go +++ b/args.go @@ -7,6 +7,7 @@ import ( type args struct { ServicePollingInterval int + NodePollingInterval int Retry int RetryInterval int } @@ -14,8 +15,9 @@ type args struct { func getArgs() *args { return &args{ ServicePollingInterval: getValue(-1, "DF_SERVICE_POLLING_INTERVAL"), - Retry: getValue(1, "DF_RETRY"), - RetryInterval: getValue(0, "DF_RETRY_INTERVAL"), + NodePollingInterval: getValue(-1, "DF_NODE_POLLING_INTERVAL"), + Retry: getValue(1, "DF_RETRY"), + RetryInterval: getValue(0, "DF_RETRY_INTERVAL"), } } diff --git a/docs/config.md b/docs/config.md index 1d0c646..21e25d7 100644 --- a/docs/config.md +++ b/docs/config.md @@ -15,3 +15,5 @@ The following environment variables can be used when creating the `swarm-listene |DF_RETRY_INTERVAL |Time between each notificationo request retry, in seconds.
**Default**: `5`
**Example**:`10`| |DF_SERVICE_POLLING_INTERVAL |Time between each service polling request, in seconds. When this value is set less than or equal to zero, service polling is disabled.
**Default**: `-1`
**Example**:`20`| |DF_USE_DOCKER_SERVICE_EVENTS|Use docker events api to get service updates.
**Default**:`true`| +|DF_NODE_POLLING_INTERVAL |Time between each node polling request, in seconds. When this value is set less than or equal to zero, node polling is disabled.
**Default**: `-1`
**Example**:`20`| +|DF_USE_DOCKER_NODE_EVENTS|Use docker events api to get node updates.
**Default**:`true`| diff --git a/main.go b/main.go index bbf0144..42d0f54 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,9 @@ func main() { l.Printf("Starting Docker Flow: Swarm Listener") args := getArgs() - swarmListener, err := service.NewSwarmListenerFromEnv(args.Retry, args.RetryInterval, args.ServicePollingInterval, l) + swarmListener, err := service.NewSwarmListenerFromEnv( + args.Retry, args.RetryInterval, + args.ServicePollingInterval, args.NodePollingInterval, l) if err != nil { l.Printf("Failed to initialize Docker Flow: Swarm Listener") l.Printf("ERROR: %v", err) diff --git a/service/mocks.go b/service/mocks.go index d170ed5..775af6c 100644 --- a/service/mocks.go +++ b/service/mocks.go @@ -131,6 +131,16 @@ func (m *nodeCacherMock) Get(ID string) (NodeMini, bool) { return args.Get(0).(NodeMini), args.Bool(1) } +func (m *nodeCacherMock) IsNewOrUpdated(n NodeMini) bool { + args := m.Called(n) + return args.Bool(0) +} + +func (m *nodeCacherMock) Keys() map[string]struct{} { + args := m.Called() + return args.Get(0).(map[string]struct{}) +} + type notifyDistributorMock struct { mock.Mock } @@ -154,3 +164,11 @@ type swarmServicePollingMock struct { func (m *swarmServicePollingMock) Run(eventChan chan<- Event) { m.Called(eventChan) } + +type nodePollingMock struct { + mock.Mock +} + +func (m *nodePollingMock) Run(eventChan chan<- Event) { + m.Called(eventChan) +} diff --git a/service/nodecache.go b/service/nodecache.go index 808b682..70e1d53 100644 --- a/service/nodecache.go +++ b/service/nodecache.go @@ -1,16 +1,21 @@ package service +import "sync" + // NodeCacher caches sevices type NodeCacher interface { InsertAndCheck(n NodeMini) bool + IsNewOrUpdated(n NodeMini) bool Delete(ID string) Get(ID string) (NodeMini, bool) + Keys() map[string]struct{} } // NodeCache implements `NodeCacher` // Not threadsafe! type NodeCache struct { cache map[string]NodeMini + mux sync.RWMutex } // NewNodeCache creates a new `NewNodeCache` @@ -23,6 +28,9 @@ func NewNodeCache() *NodeCache { // InsertAndCheck inserts `NodeMini` into cache // If the node is new or updated `InsertAndCheck` returns true. func (c *NodeCache) InsertAndCheck(n NodeMini) bool { + c.mux.Lock() + defer c.mux.Unlock() + cachedNode, ok := c.cache[n.ID] c.cache[n.ID] = n @@ -31,11 +39,37 @@ func (c *NodeCache) InsertAndCheck(n NodeMini) bool { // Delete removes node from cache func (c *NodeCache) Delete(ID string) { + c.mux.Lock() + defer c.mux.Unlock() + delete(c.cache, ID) } // Get gets node from cache -func (c NodeCache) Get(ID string) (NodeMini, bool) { +func (c *NodeCache) Get(ID string) (NodeMini, bool) { + c.mux.RLock() + defer c.mux.RUnlock() + v, ok := c.cache[ID] return v, ok } + +// IsNewOrUpdated returns true if node is new or updated +func (c *NodeCache) IsNewOrUpdated(n NodeMini) bool { + c.mux.RLock() + defer c.mux.RUnlock() + + cachedNode, ok := c.cache[n.ID] + return !ok || !n.Equal(cachedNode) +} + +// Keys return the keys of the cache +func (c *NodeCache) Keys() map[string]struct{} { + c.mux.RLock() + defer c.mux.RUnlock() + output := map[string]struct{}{} + for key := range c.cache { + output[key] = struct{}{} + } + return output +} diff --git a/service/nodecache_test.go b/service/nodecache_test.go index b51a1d1..6298cb6 100644 --- a/service/nodecache_test.go +++ b/service/nodecache_test.go @@ -149,6 +149,43 @@ func (s *NodeCacheTestSuite) Test_GetAndRemove_NotInCache_ReturnsFalse() { s.False(ok) } +func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeInCache() { + s.Cache.InsertAndCheck(s.NMini) + s.AssertInCache(s.NMini) + + newOrUpdated := s.Cache.IsNewOrUpdated(s.NMini) + s.False(newOrUpdated) +} + +func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeNotInCache() { + newOrUpdated := s.Cache.IsNewOrUpdated(s.NMini) + s.True(newOrUpdated) +} + +func (s *NodeCacheTestSuite) Test_IsNewOrUpdated_NodeIsDifferentCache() { + + s.Cache.InsertAndCheck(s.NMini) + s.AssertInCache(s.NMini) + + anotherNMini := getNewNodeMini() + anotherNMini.State = swarm.NodeStateDown + + newOrUpdated := s.Cache.IsNewOrUpdated(anotherNMini) + s.True(newOrUpdated) + +} + +func (s *NodeCacheTestSuite) Test_Keys() { + s.Cache.InsertAndCheck(s.NMini) + s.AssertInCache(s.NMini) + + keys := s.Cache.Keys() + + s.Require().Len(keys, 1) + s.Contains(keys, s.NMini.ID) + +} + func (s *NodeCacheTestSuite) AssertInCache(nm NodeMini) { ss, ok := s.Cache.Get(nm.ID) s.True(ok) diff --git a/service/nodepoller.go b/service/nodepoller.go new file mode 100644 index 0000000..fbcac15 --- /dev/null +++ b/service/nodepoller.go @@ -0,0 +1,87 @@ +package service + +import ( + "context" + "log" + "time" + + "github.com/docker/docker/api/types/swarm" +) + +// NodePolling provides an interface for polling node changes +type NodePolling interface { + Run(eventChan chan<- Event) +} + +// NodePoller implements `NodePolling` +type NodePoller struct { + Client NodeInspector + Cache NodeCacher + PollingInterval int + MinifyFunc func(swarm.Node) NodeMini + Log *log.Logger +} + +// NewNodePoller creates a new `NodePoller` +func NewNodePoller( + client NodeInspector, + cache NodeCacher, + pollingInterval int, + minifyFunc func(swarm.Node) NodeMini, + log *log.Logger, +) *NodePoller { + return &NodePoller{ + Client: client, + Cache: cache, + PollingInterval: pollingInterval, + MinifyFunc: minifyFunc, + Log: log, + } +} + +// Run starts poller and places events onto `eventChan` +func (n NodePoller) Run(eventChan chan<- Event) { + + if n.PollingInterval <= 0 { + return + } + + ctx := context.Background() + + n.Log.Printf("Polling for Node Changes") + time.Sleep(time.Duration(n.PollingInterval) * time.Second) + + for { + nodes, err := n.Client.NodeList(ctx) + if err != nil { + n.Log.Printf("ERROR (NodePoller): %v", err) + } else { + nowTimeNano := time.Now().UTC().UnixNano() + keys := n.Cache.Keys() + for _, node := range nodes { + delete(keys, node.ID) + + nodeMini := n.MinifyFunc(node) + if n.Cache.IsNewOrUpdated(nodeMini) { + eventChan <- Event{ + Type: EventTypeCreate, + ID: node.ID, + TimeNano: nowTimeNano, + UseCache: true, + } + } + } + + // Remaining key sare removal events + for k := range keys { + eventChan <- Event{ + Type: EventTypeRemove, + ID: k, + TimeNano: nowTimeNano, + UseCache: true, + } + } + } + time.Sleep(time.Duration(n.PollingInterval) * time.Second) + } +} diff --git a/service/nodepoller_test.go b/service/nodepoller_test.go new file mode 100644 index 0000000..41d8d98 --- /dev/null +++ b/service/nodepoller_test.go @@ -0,0 +1,190 @@ +package service + +import ( + "bytes" + "log" + "testing" + "time" + + "github.com/docker/docker/api/types/swarm" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type NodePollerTestSuite struct { + suite.Suite + NodeClientMock *nodeInspectorMock + NodeCacheMock *nodeCacherMock + + NodePoller *NodePoller + Logger *log.Logger + LogBytes *bytes.Buffer +} + +func TestNodePollerUnitTestSuite(t *testing.T) { + suite.Run(t, new(NodePollerTestSuite)) +} + +func (s *NodePollerTestSuite) SetupTest() { + s.NodeClientMock = new(nodeInspectorMock) + s.NodeCacheMock = new(nodeCacherMock) + + s.LogBytes = new(bytes.Buffer) + s.Logger = log.New(s.LogBytes, "", 0) + + s.NodePoller = NewNodePoller( + s.NodeClientMock, + s.NodeCacheMock, + 1, + MinifyNode, + s.Logger, + ) +} + +func (s *NodePollerTestSuite) Test_Run_NoCache() { + + expNodes := []swarm.Node{ + {ID: "nodeID1"}, {ID: "nodeID2"}, + } + keys := map[string]struct{}{} + miniNode1 := NodeMini{ID: "nodeID1", EngineLabels: map[string]string{}, NodeLabels: map[string]string{}} + miniNode2 := NodeMini{ID: "nodeID2", EngineLabels: map[string]string{}, NodeLabels: map[string]string{}} + + eventChan := make(chan Event) + + s.NodeClientMock. + On("NodeList", mock.AnythingOfType("*context.emptyCtx")).Return(expNodes, nil) + + s.NodeCacheMock. + On("Keys").Return(keys). + On("IsNewOrUpdated", miniNode1).Return(true). + On("IsNewOrUpdated", miniNode2).Return(true) + + go s.NodePoller.Run(eventChan) + + timeout := time.NewTimer(time.Second * 5).C + eventsNum := 0 + + for { + if eventsNum == 2 { + break + } + select { + case event := <-eventChan: + s.Require().Equal(EventTypeCreate, event.Type) + eventsNum++ + case <-timeout: + s.FailNow("Timeout") + } + } + + s.Equal(2, eventsNum) + s.NodeClientMock.AssertExpectations(s.T()) + s.NodeCacheMock.AssertExpectations(s.T()) +} + +func (s *NodePollerTestSuite) Test_Run_HalfInCache() { + expNodes := []swarm.Node{ + {ID: "nodeID1"}, {ID: "nodeID2"}, + } + miniNode1 := NodeMini{ID: "nodeID1", EngineLabels: map[string]string{}, NodeLabels: map[string]string{}} + miniNode2 := NodeMini{ID: "nodeID2", EngineLabels: map[string]string{}, NodeLabels: map[string]string{}} + + keys := map[string]struct{}{} + keys["nodeID1"] = struct{}{} + + eventChan := make(chan Event) + + s.NodeClientMock. + On("NodeList", mock.AnythingOfType("*context.emptyCtx")).Return(expNodes, nil) + + s.NodeCacheMock. + On("Keys").Return(keys). + On("IsNewOrUpdated", miniNode1).Return(false). + On("IsNewOrUpdated", miniNode2).Return(true) + + go s.NodePoller.Run(eventChan) + + timeout := time.NewTimer(time.Second * 5).C + var eventCreate *Event + eventsNum := 0 + + for { + if eventsNum == 1 { + break + } + select { + case event := <-eventChan: + if event.ID == "nodeID2" { + eventCreate = &event + } + eventsNum++ + case <-timeout: + s.FailNow("Timeout") + } + } + + s.Equal(1, eventsNum) + s.Require().NotNil(eventCreate) + + s.Equal("nodeID2", eventCreate.ID) + s.NodeClientMock.AssertExpectations(s.T()) + s.NodeCacheMock.AssertExpectations(s.T()) +} + +func (s *NodePollerTestSuite) Test_Run_MoreInCache() { + expNodes := []swarm.Node{ + {ID: "nodeID1"}, {ID: "nodeID2"}, + } + miniNode1 := NodeMini{ID: "nodeID1", EngineLabels: map[string]string{}, NodeLabels: map[string]string{}} + miniNode2 := NodeMini{ID: "nodeID2", EngineLabels: map[string]string{}, NodeLabels: map[string]string{}} + + keys := map[string]struct{}{} + keys["nodeID1"] = struct{}{} + keys["nodeID2"] = struct{}{} + keys["nodeID3"] = struct{}{} + + eventChan := make(chan Event) + + s.NodeClientMock. + On("NodeList", mock.AnythingOfType("*context.emptyCtx")).Return(expNodes, nil) + + s.NodeCacheMock. + On("Keys").Return(keys). + On("IsNewOrUpdated", miniNode1).Return(true). + On("IsNewOrUpdated", miniNode2).Return(false) + + go s.NodePoller.Run(eventChan) + + timeout := time.NewTimer(time.Second * 5).C + var eventCreate *Event + var eventRemove *Event + eventsNum := 0 + + for { + if eventsNum == 2 { + break + } + select { + case event := <-eventChan: + if event.ID == "nodeID1" { + eventCreate = &event + } else if event.ID == "nodeID3" { + eventRemove = &event + } + eventsNum++ + case <-timeout: + s.FailNow("Timeout") + } + } + + s.Equal(2, eventsNum) + s.Require().NotNil(eventCreate) + s.Require().NotNil(eventRemove) + + s.Equal("nodeID1", eventCreate.ID) + s.Equal("nodeID3", eventRemove.ID) + s.NodeClientMock.AssertExpectations(s.T()) + s.NodeCacheMock.AssertExpectations(s.T()) + +} diff --git a/service/swarmlistener.go b/service/swarmlistener.go index 76f084c..6b438c0 100644 --- a/service/swarmlistener.go +++ b/service/swarmlistener.go @@ -32,9 +32,11 @@ type SwarmListener struct { SSEventChan chan Event SSNotificationChan chan Notification - NodeListener NodeListening - NodeClient NodeInspector - NodeCache NodeCacher + NodeListener NodeListening + NodeClient NodeInspector + NodeCache NodeCacher + NodePoller NodePolling + NodeEventChan chan Event NodeNotificationChan chan Notification @@ -44,6 +46,7 @@ type SwarmListener struct { NodeCancelManager CancelManaging IncludeNodeInfo bool UseDockerServiceEvents bool + UseDockerNodeEvents bool IgnoreKey string IncludeKey string Log *log.Logger @@ -60,6 +63,8 @@ func newSwarmListener( nodeListener NodeListening, nodeClient NodeInspector, nodeCache NodeCacher, + nodePoller NodePolling, + nodeEventChan chan Event, nodeNotificationChan chan Notification, @@ -69,28 +74,32 @@ func newSwarmListener( nodeCancelManager CancelManaging, includeNodeInfo bool, useDockerServiceEvents bool, + useDockerNodeEvents bool, ignoreKey string, includeKey string, logger *log.Logger, ) *SwarmListener { return &SwarmListener{ - SSListener: ssListener, - SSClient: ssClient, - SSCache: ssCache, - SSPoller: ssPoller, - SSEventChan: ssEventChan, - SSNotificationChan: ssNotificationChan, - NodeListener: nodeListener, - NodeClient: nodeClient, - NodeCache: nodeCache, - NodeEventChan: nodeEventChan, - NodeNotificationChan: nodeNotificationChan, - NotifyDistributor: notifyDistributor, - ServiceCancelManager: serviceCancelManager, - NodeCancelManager: nodeCancelManager, + SSListener: ssListener, + SSClient: ssClient, + SSCache: ssCache, + SSPoller: ssPoller, + SSEventChan: ssEventChan, + SSNotificationChan: ssNotificationChan, + NodeListener: nodeListener, + NodeClient: nodeClient, + NodeCache: nodeCache, + NodePoller: nodePoller, + NodeEventChan: nodeEventChan, + NodeNotificationChan: nodeNotificationChan, + NotifyDistributor: notifyDistributor, + ServiceCancelManager: serviceCancelManager, + NodeCancelManager: nodeCancelManager, + IncludeNodeInfo: includeNodeInfo, UseDockerServiceEvents: useDockerServiceEvents, + UseDockerNodeEvents: useDockerNodeEvents, IgnoreKey: ignoreKey, IncludeKey: includeKey, Log: logger, @@ -99,7 +108,8 @@ func newSwarmListener( // NewSwarmListenerFromEnv creats `SwarmListener` from environment variables func NewSwarmListenerFromEnv( - retries, interval, servicePollingInterval int, logger *log.Logger) (*SwarmListener, error) { + retries, interval, servicePollingInterval, + nodePollingInterval int, logger *log.Logger) (*SwarmListener, error) { ignoreKey := os.Getenv("DF_NOTIFY_LABEL") includeNodeInfo, err := strconv.ParseBool(os.Getenv("DF_INCLUDE_NODE_IP_INFO")) if err != nil { @@ -109,6 +119,10 @@ func NewSwarmListenerFromEnv( if err != nil { useDockerServiceEvents = false } + useDockerNodeEvents, err := strconv.ParseBool(os.Getenv("DF_USE_DOCKER_NODE_EVENTS")) + if err != nil { + useDockerNodeEvents = false + } dockerClient, err := NewDockerClientFromEnv() if err != nil { @@ -143,12 +157,13 @@ func NewSwarmListenerFromEnv( nodeEventChan = make(chan Event) nodeNotificationChan = make(chan Notification) } - ssPoller := NewSwarmServicePoller( ssClient, ssCache, servicePollingInterval, includeNodeInfo, func(ss SwarmService) SwarmServiceMini { return MinifySwarmService(ss, ignoreKey, "com.docker.stack.namespace") }, logger) + nodePoller := NewNodePoller( + nodeClient, nodeCache, nodePollingInterval, MinifyNode, logger) return newSwarmListener( ssListener, @@ -160,6 +175,7 @@ func NewSwarmListenerFromEnv( nodeListener, nodeClient, nodeCache, + nodePoller, nodeEventChan, nodeNotificationChan, notifyDistributor, @@ -167,6 +183,7 @@ func NewSwarmListenerFromEnv( NewCancelManager(), includeNodeInfo, useDockerServiceEvents, + useDockerNodeEvents, ignoreKey, "com.docker.stack.namespace", logger, @@ -189,8 +206,13 @@ func (l *SwarmListener) Run() { } if l.NotifyDistributor.HasNodeListeners() { l.connectNodeChannels() - l.NodeListener.ListenForNodeEvents(l.NodeEventChan) - l.Log.Printf("Listening to Docker Node Events") + + if l.UseDockerNodeEvents { + l.NodeListener.ListenForNodeEvents(l.NodeEventChan) + l.Log.Printf("Listening to Docker Node Events") + } + + go l.NodePoller.Run(l.NodeEventChan) } l.NotifyDistributor.Run(l.SSNotificationChan, l.NodeNotificationChan) diff --git a/service/swarmlistener_test.go b/service/swarmlistener_test.go index cd30324..13cf54d 100644 --- a/service/swarmlistener_test.go +++ b/service/swarmlistener_test.go @@ -22,7 +22,8 @@ type SwarmListenerTestSuite struct { NodeClientMock *nodeInspectorMock NodeCacheMock *nodeCacherMock - SSPoller *swarmServicePollingMock + SSPollerMock *swarmServicePollingMock + NodePollerMock *nodePollingMock NotifyDistributorMock *notifyDistributorMock @@ -44,7 +45,8 @@ func (s *SwarmListenerTestSuite) SetupTest() { s.NodeClientMock = new(nodeInspectorMock) s.NodeCacheMock = new(nodeCacherMock) - s.SSPoller = new(swarmServicePollingMock) + s.SSPollerMock = new(swarmServicePollingMock) + s.NodePollerMock = new(nodePollingMock) s.NotifyDistributorMock = new(notifyDistributorMock) s.LogBytes = new(bytes.Buffer) @@ -54,12 +56,13 @@ func (s *SwarmListenerTestSuite) SetupTest() { s.SSListenerMock, s.SSClientMock, s.SSCacheMock, - s.SSPoller, + s.SSPollerMock, make(chan Event), make(chan Notification), s.NodeListeningMock, s.NodeClientMock, s.NodeCacheMock, + s.NodePollerMock, make(chan Event), make(chan Notification), s.NotifyDistributorMock, @@ -67,6 +70,7 @@ func (s *SwarmListenerTestSuite) SetupTest() { NewCancelManager(), false, true, + true, "com.df.notify", "com.docker.stack.namespace", s.Logger, @@ -95,7 +99,7 @@ func (s *SwarmListenerTestSuite) Test_Run_ServicesChannel() { On("HasServiceListeners").Return(true). On("HasNodeListeners").Return(false). On("Run", mock.AnythingOfType("<-chan service.Notification"), mock.AnythingOfType("<-chan service.Notification")) - s.SSPoller. + s.SSPollerMock. On("Run", mock.AnythingOfType("chan<- service.Event")) s.SwarmListener.Run() @@ -157,7 +161,7 @@ func (s *SwarmListenerTestSuite) Test_Run_ServicesChannel() { s.SSClientMock.AssertExpectations(s.T()) s.SSCacheMock.AssertExpectations(s.T()) s.NotifyDistributorMock.AssertExpectations(s.T()) - s.SSPoller.AssertExpectations(s.T()) + s.SSPollerMock.AssertExpectations(s.T()) } @@ -186,6 +190,8 @@ func (s *SwarmListenerTestSuite) Test_Run_NodeChannel() { On("HasServiceListeners").Return(false). On("HasNodeListeners").Return(true). On("Run", mock.AnythingOfType("<-chan service.Notification"), mock.AnythingOfType("<-chan service.Notification")) + s.NodePollerMock. + On("Run", mock.AnythingOfType("chan<- service.Event")) s.SwarmListener.Run() @@ -233,6 +239,7 @@ L: s.NodeClientMock.AssertExpectations(s.T()) s.NodeCacheMock.AssertExpectations(s.T()) s.NotifyDistributorMock.AssertExpectations(s.T()) + s.NodePollerMock.AssertExpectations(s.T()) }