Skip to content

Commit 6710269

Browse files
authored
Merge pull request #2082 from authzed/memdb-emit-checkpoints-after-changes
emit memdb checkpoints after changes
2 parents bfd80f7 + a00bf58 commit 6710269

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

Diff for: internal/datastore/memdb/watch.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -114,18 +114,18 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti
114114
changes = append(changes, &change.changes)
115115
}
116116

117+
if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
118+
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
119+
changes = append(changes, &change.changes)
120+
}
121+
117122
if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision {
118123
changes = append(changes, &datastore.RevisionChanges{
119124
Revision: revisions.NewForTimestamp(change.revisionNanos),
120125
IsCheckpoint: true,
121126
})
122127
}
123128

124-
if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
125-
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
126-
changes = append(changes, &change.changes)
127-
}
128-
129129
lastRevision = change.revisionNanos
130130
}
131131

Diff for: internal/testserver/datastore/postgres.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,12 @@ func (b *postgresTester) NewDatabase(t testing.TB) string {
129129
}
130130

131131
const (
132-
retryCount = 3
133-
timeBetweenRetries = 100 * time.Millisecond
132+
retryCount = 4
133+
timeBetweenRetries = 1 * time.Second
134134
)
135135

136136
func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore {
137-
for i := 0; i < retryCount; i++ {
137+
for i := 1; i <= retryCount; i++ {
138138
connectStr := b.NewDatabase(t)
139139

140140
migrationDriver, err := pgmigrations.NewAlembicPostgresDriver(context.Background(), connectStr, datastore.NoCredentialsProvider)
@@ -144,11 +144,11 @@ func (b *postgresTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore
144144
return initFunc("postgres", connectStr)
145145
}
146146

147-
if i == retryCount-1 {
147+
if i == retryCount {
148148
require.NoError(t, err, "got error when trying to create migration driver")
149149
}
150150

151-
time.Sleep(timeBetweenRetries)
151+
time.Sleep(time.Duration(i) * timeBetweenRetries)
152152
}
153153

154154
require.Fail(t, "failed to create datastore for testing")

Diff for: pkg/datastore/test/watch.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) {
726726
require.NoError(err)
727727

728728
changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
729-
Content: datastore.WatchCheckpoints | datastore.WatchRelationships,
729+
Content: datastore.WatchCheckpoints | datastore.WatchRelationships | datastore.WatchSchema,
730730
CheckpointInterval: 100 * time.Millisecond,
731731
})
732732
require.Zero(len(errchan))
@@ -735,6 +735,14 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) {
735735
tuple.Parse("document:firstdoc#viewer@user:tom"),
736736
)
737737
require.NoError(err)
738+
739+
verifyCheckpointUpdate(require, afterTouchRevision, changes)
740+
741+
afterTouchRevision, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
742+
return rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{Name: "doesnotexist"})
743+
})
744+
require.NoError(err)
745+
738746
verifyCheckpointUpdate(require, afterTouchRevision, changes)
739747
}
740748

@@ -743,14 +751,25 @@ func verifyCheckpointUpdate(
743751
expectedRevision datastore.Revision,
744752
changes <-chan *datastore.RevisionChanges,
745753
) {
754+
var relChangeEmitted, schemaChangeEmitted bool
746755
changeWait := time.NewTimer(waitForChangesTimeout)
747756
for {
748757
select {
749758
case change, ok := <-changes:
750759
require.True(ok)
760+
if len(change.ChangedDefinitions) > 0 {
761+
schemaChangeEmitted = true
762+
}
763+
if len(change.RelationshipChanges) > 0 {
764+
relChangeEmitted = true
765+
}
751766
if change.IsCheckpoint {
752-
require.True(change.Revision.Equal(change.Revision) || change.Revision.GreaterThan(expectedRevision))
753-
return
767+
if change.Revision.Equal(expectedRevision) || change.Revision.GreaterThan(expectedRevision) {
768+
require.True(relChangeEmitted || schemaChangeEmitted, "expected relationship/schema changes before checkpoint")
769+
return
770+
}
771+
772+
// we received a past revision checkpoint, ignore
754773
}
755774
case <-changeWait.C:
756775
require.Fail("Timed out", "waited for checkpoint")

0 commit comments

Comments
 (0)