Skip to content

Commit

Permalink
RFC: Simplifies cancel managing (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjpfan authored Jun 1, 2018
1 parent 40da6c3 commit cf38abd
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 137 deletions.
33 changes: 7 additions & 26 deletions service/cancelmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
type CancelManaging interface {
Add(rootCtx context.Context, id string, reqID int64) context.Context
Delete(id string, reqID int64) bool
ForceDelete(id string) bool
}

type cancelPair struct {
Expand All @@ -19,28 +18,26 @@ type cancelPair struct {

// CancelManager implements the `CancelManaging` interface that is thread safe
type CancelManager struct {
v map[string]cancelPair
mux sync.Mutex
cancelBeforeAdding bool
v map[string]cancelPair
mux sync.Mutex
}

// NewCancelManager creates a new `CancelManager`
func NewCancelManager(cancelBeforeAdding bool) *CancelManager {
func NewCancelManager() *CancelManager {
return &CancelManager{
v: map[string]cancelPair{},
mux: sync.Mutex{},
cancelBeforeAdding: cancelBeforeAdding,
v: map[string]cancelPair{},
mux: sync.Mutex{},
}
}

// Add creates an context for `id` and `reqID` and returns that context.
// If `id` exists in memory and cancelBeforeAdding is true, the task with that `id` will be canceled.
// If `id` exists in memory, the task with that `id` will be canceled.
func (m *CancelManager) Add(rootCtx context.Context, id string, reqID int64) context.Context {
m.mux.Lock()
defer m.mux.Unlock()

pair, ok := m.v[id]
if m.cancelBeforeAdding && ok {
if ok {
pair.Cancel()
delete(m.v, id)
}
Expand Down Expand Up @@ -71,19 +68,3 @@ func (m *CancelManager) Delete(id string, reqID int64) bool {
delete(m.v, id)
return true
}

// ForceDelete deletes an id without looking at the `reqID` and count
func (m *CancelManager) ForceDelete(id string) bool {
m.mux.Lock()
defer m.mux.Unlock()

pair, ok := m.v[id]

if !ok {
return false
}

pair.Cancel()
delete(m.v, id)
return true
}
30 changes: 5 additions & 25 deletions service/cancelmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s *CancelManagerTestSuite) SetupSuite() {
}

func (s *CancelManagerTestSuite) Test_Add_IDEqual_CancelsContext_Returns_Context() {
cm := NewCancelManager(true)
cm := NewCancelManager()
ctx := cm.Add(s.ctx, "id1", 1)
cm.Add(s.ctx, "id1", 2)

Expand All @@ -42,7 +42,7 @@ L:

func (s *CancelManagerTestSuite) Test_Add_IDNotExist_Returns_Context() {

cm := NewCancelManager(true)
cm := NewCancelManager()
firstCtx := cm.Add(s.ctx, "id1", 1)
s.NotNil(firstCtx)

Expand All @@ -51,7 +51,7 @@ func (s *CancelManagerTestSuite) Test_Add_IDNotExist_Returns_Context() {
}

func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDNotEqual_DoesNothing() {
cm := NewCancelManager(true)
cm := NewCancelManager()
cm.Add(s.ctx, "id1", 1)

s.Require().Len(cm.v, 1)
Expand All @@ -63,7 +63,7 @@ func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDNotEqual_DoesNothing()
}

func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CallsCancel_RemovesFromMemory() {
cm := NewCancelManager(true)
cm := NewCancelManager()
ctx := cm.Add(s.ctx, "id1", 1)

s.Require().Len(cm.v, 1)
Expand All @@ -85,31 +85,11 @@ L:

func (s *CancelManagerTestSuite) Test_Delete_IDEqual_ReqIDEqual_CntNotZero_StaysInMemory() {
// Set startingCnt to 2
cm := NewCancelManager(true)
cm := NewCancelManager()
cm.Add(s.ctx, "id1", 1)
s.Require().Len(cm.v, 1)
s.Require().Contains(cm.v, "id1")

s.True(cm.Delete("id1", 1))
s.Require().Len(cm.v, 0)
}

func (s *CancelManagerTestSuite) Test_ForceDelete() {
cm := NewCancelManager(true)
ctx := cm.Add(s.ctx, "id1", 1)
s.Require().Len(cm.v, 1)

s.False(cm.ForceDelete("DOESNOTEXIST"))
s.True(cm.ForceDelete("id1"))

L:
for {
select {
case <-time.After(time.Second * 5):
s.Fail("Timeout")
return
case <-ctx.Done():
break L
}
}
}
4 changes: 2 additions & 2 deletions service/notifydistributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func newNotifyDistributorfromStrings(serviceCreateAddrs, serviceRemoveAddrs, nod

return newNotifyDistributor(
notifyEndpoints,
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(),
NewCancelManager(),
interval,
logger)
}
Expand Down
8 changes: 4 additions & 4 deletions service/notifydistributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
},
}

notifyD := newNotifyDistributor(endpoints, NewCancelManager(true),
NewCancelManager(true), 1, s.log)
notifyD := newNotifyDistributor(endpoints, NewCancelManager(),
NewCancelManager(), 1, s.log)
serviceChan := make(chan Notification)

notifyD.Run(serviceChan, nil)
Expand Down Expand Up @@ -379,8 +379,8 @@ func (s *NotifyDistributorTestSuite) Test_RunDistributesNotificationsToEndpoints
},
}

notifyD := newNotifyDistributor(endpoints, NewCancelManager(true),
NewCancelManager(true), 1, s.log)
notifyD := newNotifyDistributor(endpoints, NewCancelManager(),
NewCancelManager(), 1, s.log)
nodeChan := make(chan Notification)

notifyD.Run(nil, nodeChan)
Expand Down
109 changes: 33 additions & 76 deletions service/swarmlistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,6 @@ type SwarmListening interface {
GetNodesParameters(ctx context.Context) ([]map[string]string, error)
}

// CreateRemoveCancelManager combines two cancel managers for creating and
// removing services
type CreateRemoveCancelManager struct {
createCancelManager CancelManaging
removeCancelManager CancelManaging
mux sync.RWMutex
}

// AddEvent controls canceling for creating and removing services
// A create event will cancel delete events with the same ID
// A remove event will cancel create events with the same ID
func (c *CreateRemoveCancelManager) AddEvent(event Event) context.Context {
c.mux.Lock()
defer c.mux.Unlock()
if event.Type == EventTypeCreate {
c.removeCancelManager.ForceDelete(event.ID)
return c.createCancelManager.Add(context.Background(), event.ID, event.TimeNano)
}
// EventTypeRemove
c.createCancelManager.ForceDelete(event.ID)
return c.removeCancelManager.Add(context.Background(), event.ID, event.TimeNano)
}

// RemoveEvent removes and cancels event from its corresponding
// cancel manager
func (c *CreateRemoveCancelManager) RemoveEvent(event Event) bool {
c.mux.Lock()
defer c.mux.Unlock()
if event.Type == EventTypeCreate {
return c.createCancelManager.Delete(event.ID, event.TimeNano)
}
// EventTypeRemove
return c.removeCancelManager.Delete(event.ID, event.TimeNano)
}

// SwarmListener provides public api
type SwarmListener struct {
SSListener SwarmServiceListening
Expand All @@ -75,13 +40,13 @@ type SwarmListener struct {

NotifyDistributor NotifyDistributing

ServiceCreateRemoveCancelManager *CreateRemoveCancelManager
NodeCreateRemoveCancelManager *CreateRemoveCancelManager
IncludeNodeInfo bool
UseDockerServiceEvents bool
IgnoreKey string
IncludeKey string
Log *log.Logger
ServiceCancelManager CancelManaging
NodeCancelManager CancelManaging
IncludeNodeInfo bool
UseDockerServiceEvents bool
IgnoreKey string
IncludeKey string
Log *log.Logger
}

func newSwarmListener(
Expand All @@ -100,10 +65,8 @@ func newSwarmListener(

notifyDistributor NotifyDistributing,

serviceCreateCancelManager CancelManaging,
serviceRemoveCancelManager CancelManaging,
nodeCreateCancelManager CancelManaging,
nodeRemoveCancelManager CancelManaging,
serviceCancelManager CancelManaging,
nodeCancelManager CancelManaging,
includeNodeInfo bool,
useDockerServiceEvents bool,
ignoreKey string,
Expand All @@ -112,24 +75,20 @@ func newSwarmListener(
) *SwarmListener {

return &SwarmListener{
SSListener: ssListener,
SSClient: ssClient,
SSCache: ssCache,
SSPoller: ssPoller,
SSEventChan: ssEventChan,
SSNotificationChan: ssNotificationChan,
NodeListener: nodeListener,
NodeClient: nodeClient,
NodeCache: nodeCache,
NodeEventChan: nodeEventChan,
NodeNotificationChan: nodeNotificationChan,
NotifyDistributor: notifyDistributor,
ServiceCreateRemoveCancelManager: &CreateRemoveCancelManager{
createCancelManager: serviceCreateCancelManager,
removeCancelManager: serviceRemoveCancelManager},
NodeCreateRemoveCancelManager: &CreateRemoveCancelManager{
createCancelManager: nodeCreateCancelManager,
removeCancelManager: nodeRemoveCancelManager},
SSListener: ssListener,
SSClient: ssClient,
SSCache: ssCache,
SSPoller: ssPoller,
SSEventChan: ssEventChan,
SSNotificationChan: ssNotificationChan,
NodeListener: nodeListener,
NodeClient: nodeClient,
NodeCache: nodeCache,
NodeEventChan: nodeEventChan,
NodeNotificationChan: nodeNotificationChan,
NotifyDistributor: notifyDistributor,
ServiceCancelManager: serviceCancelManager,
NodeCancelManager: nodeCancelManager,
IncludeNodeInfo: includeNodeInfo,
UseDockerServiceEvents: useDockerServiceEvents,
IgnoreKey: ignoreKey,
Expand Down Expand Up @@ -204,10 +163,8 @@ func NewSwarmListenerFromEnv(
nodeEventChan,
nodeNotificationChan,
notifyDistributor,
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(),
NewCancelManager(),
includeNodeInfo,
useDockerServiceEvents,
ignoreKey,
Expand Down Expand Up @@ -253,8 +210,8 @@ func (l *SwarmListener) connectServiceChannels() {
}

func (l *SwarmListener) processServiceEventCreate(event Event) {
ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event)
defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event)
ctx := l.ServiceCancelManager.Add(context.Background(), event.ID, event.TimeNano)
defer l.ServiceCancelManager.Delete(event.ID, event.TimeNano)

errChan := make(chan error)

Expand Down Expand Up @@ -303,8 +260,8 @@ func (l *SwarmListener) processServiceEventCreate(event Event) {
}

func (l *SwarmListener) processServiceEventRemove(event Event) {
ctx := l.ServiceCreateRemoveCancelManager.AddEvent(event)
defer l.ServiceCreateRemoveCancelManager.RemoveEvent(event)
ctx := l.ServiceCancelManager.Add(context.Background(), event.ID, event.TimeNano)
defer l.ServiceCancelManager.Delete(event.ID, event.TimeNano)

errChan := make(chan error)

Expand Down Expand Up @@ -353,8 +310,8 @@ func (l *SwarmListener) connectNodeChannels() {
}

func (l *SwarmListener) processNodeEventCreate(event Event) {
ctx := l.NodeCreateRemoveCancelManager.AddEvent(event)
defer l.NodeCreateRemoveCancelManager.RemoveEvent(event)
ctx := l.NodeCancelManager.Add(context.Background(), event.ID, event.TimeNano)
defer l.NodeCancelManager.Delete(event.ID, event.TimeNano)

errChan := make(chan error)

Expand Down Expand Up @@ -398,8 +355,8 @@ func (l *SwarmListener) processNodeEventCreate(event Event) {
}

func (l *SwarmListener) processNodeEventRemove(event Event) {
ctx := l.NodeCreateRemoveCancelManager.AddEvent(event)
defer l.NodeCreateRemoveCancelManager.RemoveEvent(event)
ctx := l.NodeCancelManager.Add(context.Background(), event.ID, event.TimeNano)
defer l.NodeCancelManager.Delete(event.ID, event.TimeNano)

errChan := make(chan error)
go func() {
Expand Down
6 changes: 2 additions & 4 deletions service/swarmlistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ func (s *SwarmListenerTestSuite) SetupTest() {
make(chan Event),
make(chan Notification),
s.NotifyDistributorMock,
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(true),
NewCancelManager(),
NewCancelManager(),
false,
true,
"com.df.notify",
Expand Down

0 comments on commit cf38abd

Please sign in to comment.