Skip to content

Commit 849a82b

Browse files
authored
Locally-dispatched activity test flakiness hopefully improved (#1240)
At least, I finally ran it more than 5x in a row without failure. yay! We should probably just ban bare atomics. There are occasionally test races between test-completion and zap logging, but those are sorta un-resolvable unless we reliably stop everything in tests. Which would be excellent, but I think we're fairly far from achieving that at the moment. There also seem to still be some flaky runs of these locally-dispatched tests, but I'm not sure what the causes is yet. Still, seems improved.
1 parent 5ead824 commit 849a82b

File tree

2 files changed

+32
-29
lines changed

2 files changed

+32
-29
lines changed

internal/internal_workers_test.go

+31-26
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ package internal
2323

2424
import (
2525
"context"
26-
"sync/atomic"
2726
"testing"
2827
"time"
2928

3029
"github.com/golang/mock/gomock"
3130
"github.com/pborman/uuid"
3231
"github.com/stretchr/testify/suite"
32+
"go.uber.org/atomic"
3333
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
3434
m "go.uber.org/cadence/.gen/go/shared"
3535
"go.uber.org/cadence/internal/common"
@@ -663,16 +663,15 @@ func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID strin
663663
}
664664

665665
func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
666-
activityCalledCount := 0
666+
activityCalledCount := atomic.NewInt32(0) // must be accessed with atomics, worker uses goroutines to run activities
667667
activitySleep := func(duration time.Duration) error {
668668
time.Sleep(duration)
669-
activityCalledCount++
669+
activityCalledCount.Add(1)
670670
return nil
671671
}
672672

673673
doneCh := make(chan struct{})
674674

675-
isActivityResponseCompleted := false
676675
workflowFn := func(ctx Context, input []byte) error {
677676
ao := ActivityOptions{
678677
ScheduleToCloseTimeout: 1 * time.Second,
@@ -736,10 +735,11 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
736735
TaskToken: []byte("test-token")}
737736
return &m.RespondDecisionTaskCompletedResponse{ActivitiesToDispatchLocally: activitiesToDispatchLocally}, nil
738737
}).Times(1)
738+
isActivityResponseCompleted := atomic.NewBool(false)
739739
s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(func(ctx context.Context, request *m.RespondActivityTaskCompletedRequest, opts ...yarpc.CallOption,
740740
) error {
741741
defer close(doneCh)
742-
isActivityResponseCompleted = true
742+
isActivityResponseCompleted.Swap(true)
743743
return nil
744744
}).Times(1)
745745

@@ -756,32 +756,37 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
756756

757757
startWorkerAndWait(s, worker, &doneCh)
758758

759-
s.True(isActivityResponseCompleted)
760-
s.Equal(1, activityCalledCount)
759+
s.True(isActivityResponseCompleted.Load())
760+
s.Equal(int32(1), activityCalledCount.Load())
761761
}
762762

763763
func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
764-
var activityCalledCount uint32 = 0
764+
activityCalledCount := atomic.NewInt32(0)
765765
activitySleep := func(duration time.Duration) error {
766766
time.Sleep(duration)
767-
atomic.AddUint32(&activityCalledCount, 1)
767+
activityCalledCount.Add(1)
768768
return nil
769769
}
770770

771771
doneCh := make(chan struct{})
772772

773-
var activityCount uint32 = 5
773+
var activityCount int32 = 5
774774
workflowFn := func(ctx Context, input []byte) error {
775775
ao := ActivityOptions{
776776
ScheduleToCloseTimeout: 1 * time.Second,
777777
ScheduleToStartTimeout: 1 * time.Second,
778778
StartToCloseTimeout: 1 * time.Second,
779779
}
780780
ctx = WithActivityOptions(ctx, ao)
781-
for i := 1; i < int(activityCount); i++ {
782-
ExecuteActivity(ctx, activitySleep, 500*time.Millisecond)
781+
782+
// start all activities in parallel, and wait for them all to complete.
783+
var all []Future
784+
for i := 0; i < int(activityCount); i++ {
785+
all = append(all, ExecuteActivity(ctx, activitySleep, 500*time.Millisecond))
786+
}
787+
for i, f := range all {
788+
s.NoError(f.Get(ctx, nil), "activity %v should not have failed", i)
783789
}
784-
ExecuteActivity(ctx, activitySleep, 500*time.Millisecond).Get(ctx, nil)
785790
return nil
786791
}
787792

@@ -843,15 +848,13 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
843848
}
844849
return &m.RespondDecisionTaskCompletedResponse{ActivitiesToDispatchLocally: activitiesToDispatchLocally}, nil
845850
}).Times(1)
846-
var activityResponseCompletedCount uint32 = 0
851+
activityResponseCompletedCount := atomic.NewInt32(0)
847852
s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(func(ctx context.Context, request *m.RespondActivityTaskCompletedRequest, opts ...yarpc.CallOption,
848853
) error {
849-
defer func() {
850-
if atomic.LoadUint32(&activityResponseCompletedCount) == activityCount {
851-
close(doneCh)
852-
}
853-
}()
854-
atomic.AddUint32(&activityResponseCompletedCount, 1)
854+
counted := activityResponseCompletedCount.Add(1)
855+
if counted == activityCount {
856+
close(doneCh)
857+
}
855858
return nil
856859
}).MinTimes(1)
857860

@@ -862,31 +865,33 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
862865
)
863866
worker.RegisterActivityWithOptions(activitySleep, RegisterActivityOptions{Name: "activitySleep"})
864867
s.NotNil(worker.locallyDispatchedActivityWorker)
865-
worker.Start()
868+
err := worker.Start()
869+
s.NoError(err, "worker failed to start")
866870

867871
// wait for test to complete
868872
// This test currently never completes, however after the timeout the asserts are true
869873
// so the test passes, I believe this is an error.
870874
select {
871875
case <-doneCh:
872-
break
876+
s.T().Log("completed")
873877
case <-time.After(1 * time.Second):
878+
s.T().Log("timed out")
874879
}
875880
worker.Stop()
876881

877882
// for currently unbuffered channel at least one activity should be sent
878-
s.True(activityResponseCompletedCount > 0)
879-
s.True(activityCalledCount > 0)
883+
s.True(activityResponseCompletedCount.Load() > 0)
884+
s.True(activityCalledCount.Load() > 0)
880885
}
881886

882887
// wait for test to complete - timeout and fail after 10 seconds to not block execution of other tests
883888
func startWorkerAndWait(s *WorkersTestSuite, worker *aggregatedWorker, doneCh *chan struct{}) {
884889
s.T().Helper()
885-
worker.Start()
890+
err := worker.Start()
891+
s.NoError(err, "worker failed to start")
886892
// wait for test to complete
887893
select {
888894
case <-*doneCh:
889-
return
890895
case <-time.After(10 * time.Second):
891896
s.Fail("Test timed out")
892897
}

internal/internal_workflow_testsuite_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -1991,9 +1991,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_CancelChildWorkflow() {
19911991
err = f.Get(ctx, nil)
19921992
}).Select(ctx)
19931993

1994-
fmt.Println("####")
1995-
fmt.Println(err)
1996-
fmt.Println("####")
1994+
GetLogger(ctx).Info("child workflow returned error", zap.Error(err))
19971995
return err
19981996
}
19991997

0 commit comments

Comments
 (0)