From cf38abd7ef6817a6711bdc4702c5b7552081b31f Mon Sep 17 00:00:00 2001 From: Thomas Fan Date: Thu, 31 May 2018 21:30:02 -0400 Subject: [PATCH] RFC: Simplifies cancel managing (#16) --- service/cancelmanager.go | 33 ++------- service/cancelmanager_test.go | 30 ++------ service/notifydistributor.go | 4 +- service/notifydistributor_test.go | 8 +-- service/swarmlistener.go | 109 +++++++++--------------------- service/swarmlistener_test.go | 6 +- 6 files changed, 53 insertions(+), 137 deletions(-) diff --git a/service/cancelmanager.go b/service/cancelmanager.go index 3b74d82..fcd8ae8 100644 --- a/service/cancelmanager.go +++ b/service/cancelmanager.go @@ -9,7 +9,6 @@ import ( type CancelManaging interface { Add(rootCtx context.Context, id string, reqID int64) context.Context Delete(id string, reqID int64) bool - ForceDelete(id string) bool } type cancelPair struct { @@ -19,28 +18,26 @@ type cancelPair struct { // CancelManager implements the `CancelManaging` interface that is thread safe type CancelManager struct { - v map[string]cancelPair - mux sync.Mutex - cancelBeforeAdding bool + v map[string]cancelPair + mux sync.Mutex } // NewCancelManager creates a new `CancelManager` -func NewCancelManager(cancelBeforeAdding bool) *CancelManager { +func NewCancelManager() *CancelManager { return &CancelManager{ - v: map[string]cancelPair{}, - mux: sync.Mutex{}, - cancelBeforeAdding: cancelBeforeAdding, + v: map[string]cancelPair{}, + mux: sync.Mutex{}, } } // Add creates an context for `id` and `reqID` and returns that context. -// If `id` exists in memory and cancelBeforeAdding is true, the task with that `id` will be canceled. +// If `id` exists in memory, the task with that `id` will be canceled. func (m *CancelManager) Add(rootCtx context.Context, id string, reqID int64) context.Context { m.mux.Lock() defer m.mux.Unlock() pair, ok := m.v[id] - if m.cancelBeforeAdding && ok { + if ok { pair.Cancel() delete(m.v, id) } @@ -71,19 +68,3 @@ func (m *CancelManager) Delete(id string, reqID int64) bool { delete(m.v, id) return true } - -// ForceDelete deletes an id without looking at the `reqID` and count -func (m *CancelManager) ForceDelete(id string) bool { - m.mux.Lock() - defer m.mux.Unlock() - - pair, ok := m.v[id] - - if !ok { - return false - } - - pair.Cancel() - delete(m.v, id) - return true -} diff --git a/service/cancelmanager_test.go b/service/cancelmanager_test.go index 8718a02..b21ca21 100644 --- a/service/cancelmanager_test.go +++ b/service/cancelmanager_test.go @@ -22,7 +22,7 @@ func (s *CancelManagerTestSuite) SetupSuite() { } func (s *CancelManagerTestSuite) Test_Add_IDEqual_CancelsContext_Returns_Context() { - cm := NewCancelManager(true) + cm := NewCancelManager() ctx := cm.Add(s.ctx, "id1", 1) cm.Add(s.ctx, "id1", 2) @@ -42,7 +42,7 @@ L: func (s *CancelManagerTestSuite) Test_Add_IDNotExist_Returns_Context() { - cm := NewCancelManager(true) + cm := NewCancelManager() firstCtx := cm.Add(s.ctx, "id1", 1) s.NotNil(firstCtx) @@ -51,7 +51,7 @@ func (s *CancelManagerTestSuite) Test_Add_IDNotExist_Returns_Context() { } func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDNotEqual_DoesNothing() { - cm := NewCancelManager(true) + cm := NewCancelManager() cm.Add(s.ctx, "id1", 1) s.Require().Len(cm.v, 1) @@ -63,7 +63,7 @@ func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDNotEqual_DoesNothing() } func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CallsCancel_RemovesFromMemory() { - cm := NewCancelManager(true) + cm := NewCancelManager() ctx := cm.Add(s.ctx, "id1", 1) s.Require().Len(cm.v, 1) @@ -85,7 +85,7 @@ L: func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CntNotZero_StaysInMemory() { // Set startingCnt to 2 - cm := NewCancelManager(true) + cm := NewCancelManager() cm.Add(s.ctx, "id1", 1) s.Require().Len(cm.v, 1) s.Require().Contains(cm.v, "id1") @@ -93,23 +93,3 @@ func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CntNotZero_Stays s.True(cm.Delete("id1", 1)) s.Require().Len(cm.v, 0) } - -func (s *CancelManagerTestSuite) Test_ForceDelete() { - cm := NewCancelManager(true) - ctx := cm.Add(s.ctx, "id1", 1) - s.Require().Len(cm.v, 1) - - s.False(cm.ForceDelete("DOESNOTEXIST")) - s.True(cm.ForceDelete("id1")) - -L: - for { - select { - case <-time.After(time.Second * 5): - s.Fail("Timeout") - return - case <-ctx.Done(): - break L - } - } -} diff --git a/service/notifydistributor.go b/service/notifydistributor.go index 1bac456..e86eb4b 100644 --- a/service/notifydistributor.go +++ b/service/notifydistributor.go @@ -99,8 +99,8 @@ func newNotifyDistributorfromStrings(serviceCreateAddrs, serviceRemoveAddrs, nod return newNotifyDistributor( notifyEndpoints, - NewCancelManager(true), - NewCancelManager(true), + NewCancelManager(), + NewCancelManager(), interval, logger) } diff --git a/service/notifydistributor_test.go b/service/notifydistributor_test.go index 4c8e925..9eddb7c 100644 --- a/service/notifydistributor_test.go +++ b/service/notifydistributor_test.go @@ -304,8 +304,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints }, } - notifyD := newNotifyDistributor(endpoints, NewCancelManager(true), - NewCancelManager(true), 1, s.log) + notifyD := newNotifyDistributor(endpoints, NewCancelManager(), + NewCancelManager(), 1, s.log) serviceChan := make(chan Notification) notifyD.Run(serviceChan, nil) @@ -379,8 +379,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints }, } - notifyD := newNotifyDistributor(endpoints, NewCancelManager(true), - NewCancelManager(true), 1, s.log) + notifyD := newNotifyDistributor(endpoints, NewCancelManager(), + NewCancelManager(), 1, s.log) nodeChan := make(chan Notification) notifyD.Run(nil, nodeChan) diff --git a/service/swarmlistener.go b/service/swarmlistener.go index 9fb3aab..76f084c 100644 --- a/service/swarmlistener.go +++ b/service/swarmlistener.go @@ -22,41 +22,6 @@ type SwarmListening interface { GetNodesParameters(ctx context.Context) ([]map[string]string, error) } -// CreateRemoveCancelManager combines two cancel managers for creating and -// removing services -type CreateRemoveCancelManager struct { - createCancelManager CancelManaging - removeCancelManager CancelManaging - mux sync.RWMutex -} - -// AddEvent controls canceling for creating and removing services -// A create event will cancel delete events with the same ID -// A remove event will cancel create events with the same ID -func (c *CreateRemoveCancelManager) AddEvent(event Event) context.Context { - c.mux.Lock() - defer c.mux.Unlock() - if event.Type == EventTypeCreate { - c.removeCancelManager.ForceDelete(event.ID) - return c.createCancelManager.Add(context.Background(), event.ID, event.TimeNano) - } - // EventTypeRemove - c.createCancelManager.ForceDelete(event.ID) - return c.removeCancelManager.Add(context.Background(), event.ID, event.TimeNano) -} - -// RemoveEvent removes and cancels event from its corresponding -// cancel manager -func (c *CreateRemoveCancelManager) RemoveEvent(event Event) bool { - c.mux.Lock() - defer c.mux.Unlock() - if event.Type == EventTypeCreate { - return c.createCancelManager.Delete(event.ID, event.TimeNano) - } - // EventTypeRemove - return c.removeCancelManager.Delete(event.ID, event.TimeNano) -} - // SwarmListener provides public api type SwarmListener struct { SSListener SwarmServiceListening @@ -75,13 +40,13 @@ type SwarmListener struct { NotifyDistributor NotifyDistributing - ServiceCreateRemoveCancelManager *CreateRemoveCancelManager - NodeCreateRemoveCancelManager *CreateRemoveCancelManager - IncludeNodeInfo bool - UseDockerServiceEvents bool - IgnoreKey string - IncludeKey string - Log *log.Logger + ServiceCancelManager CancelManaging + NodeCancelManager CancelManaging + IncludeNodeInfo bool + UseDockerServiceEvents bool + IgnoreKey string + IncludeKey string + Log *log.Logger } func newSwarmListener( @@ -100,10 +65,8 @@ func newSwarmListener( notifyDistributor NotifyDistributing, - serviceCreateCancelManager CancelManaging, - serviceRemoveCancelManager CancelManaging, - nodeCreateCancelManager CancelManaging, - nodeRemoveCancelManager CancelManaging, + serviceCancelManager CancelManaging, + nodeCancelManager CancelManaging, includeNodeInfo bool, useDockerServiceEvents bool, ignoreKey string, @@ -112,24 +75,20 @@ func newSwarmListener( ) *SwarmListener { return &SwarmListener{ - SSListener: ssListener, - SSClient: ssClient, - SSCache: ssCache, - SSPoller: ssPoller, - SSEventChan: ssEventChan, - SSNotificationChan: ssNotificationChan, - NodeListener: nodeListener, - NodeClient: nodeClient, - NodeCache: nodeCache, - NodeEventChan: nodeEventChan, - NodeNotificationChan: nodeNotificationChan, - NotifyDistributor: notifyDistributor, - ServiceCreateRemoveCancelManager: &CreateRemoveCancelManager{ - createCancelManager: serviceCreateCancelManager, - removeCancelManager: serviceRemoveCancelManager}, - NodeCreateRemoveCancelManager: &CreateRemoveCancelManager{ - createCancelManager: nodeCreateCancelManager, - removeCancelManager: nodeRemoveCancelManager}, + SSListener: ssListener, + SSClient: ssClient, + SSCache: ssCache, + SSPoller: ssPoller, + SSEventChan: ssEventChan, + SSNotificationChan: ssNotificationChan, + NodeListener: nodeListener, + NodeClient: nodeClient, + NodeCache: nodeCache, + NodeEventChan: nodeEventChan, + NodeNotificationChan: nodeNotificationChan, + NotifyDistributor: notifyDistributor, + ServiceCancelManager: serviceCancelManager, + NodeCancelManager: nodeCancelManager, IncludeNodeInfo: includeNodeInfo, UseDockerServiceEvents: useDockerServiceEvents, IgnoreKey: ignoreKey, @@ -204,10 +163,8 @@ func NewSwarmListenerFromEnv( nodeEventChan, nodeNotificationChan, notifyDistributor, - NewCancelManager(true), - NewCancelManager(true), - NewCancelManager(true), - NewCancelManager(true), + NewCancelManager(), + NewCancelManager(), includeNodeInfo, useDockerServiceEvents, ignoreKey, @@ -253,8 +210,8 @@ func (l *SwarmListener) connectServiceChannels() { } func (l *SwarmListener) processServiceEventCreate(event Event) { - ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event) - defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event) + ctx := l.ServiceCancelManager.Add(context.Background(), event.ID, event.TimeNano) + defer l.ServiceCancelManager.Delete(event.ID, event.TimeNano) errChan := make(chan error) @@ -303,8 +260,8 @@ func (l *SwarmListener) processServiceEventCreate(event Event) { } func (l *SwarmListener) processServiceEventRemove(event Event) { - ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event) - defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event) + ctx := l.ServiceCancelManager.Add(context.Background(), event.ID, event.TimeNano) + defer l.ServiceCancelManager.Delete(event.ID, event.TimeNano) errChan := make(chan error) @@ -353,8 +310,8 @@ func (l *SwarmListener) connectNodeChannels() { } func (l *SwarmListener) processNodeEventCreate(event Event) { - ctx := l.NodeCreateRemoveCancelManager.AddEvent(event) - defer l.NodeCreateRemoveCancelManager.RemoveEvent(event) + ctx := l.NodeCancelManager.Add(context.Background(), event.ID, event.TimeNano) + defer l.NodeCancelManager.Delete(event.ID, event.TimeNano) errChan := make(chan error) @@ -398,8 +355,8 @@ func (l *SwarmListener) processNodeEventCreate(event Event) { } func (l *SwarmListener) processNodeEventRemove(event Event) { - ctx := l.NodeCreateRemoveCancelManager.AddEvent(event) - defer l.NodeCreateRemoveCancelManager.RemoveEvent(event) + ctx := l.NodeCancelManager.Add(context.Background(), event.ID, event.TimeNano) + defer l.NodeCancelManager.Delete(event.ID, event.TimeNano) errChan := make(chan error) go func() { diff --git a/service/swarmlistener_test.go b/service/swarmlistener_test.go index 526692a..cd30324 100644 --- a/service/swarmlistener_test.go +++ b/service/swarmlistener_test.go @@ -63,10 +63,8 @@ func (s *SwarmListenerTestSuite) SetupTest() { make(chan Event), make(chan Notification), s.NotifyDistributorMock, - NewCancelManager(true), - NewCancelManager(true), - NewCancelManager(true), - NewCancelManager(true), + NewCancelManager(), + NewCancelManager(), false, true, "com.df.notify",