Skip to content

Commit af9c239

Browse files
committed
Resolve conflicts on pull and rewrite local winner as new version to be pushed back
1 parent 93304eb commit af9c239

7 files changed

+163
-60
lines changed

db/hybrid_logical_vector.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -340,13 +340,13 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto
340340
// for source if the local version for that source is lower
341341
for i, v := range otherVector.PreviousVersions {
342342
if hlv.PreviousVersions[i] == 0 {
343-
hlv.setPreviousVersion(i, v)
343+
hlv.SetPreviousVersion(i, v)
344344
} else {
345345
// if we get here then there is entry for this source in PV so we must check if its newer or not
346346
otherHLVPVValue := v
347347
localHLVPVValue := hlv.PreviousVersions[i]
348348
if localHLVPVValue < otherHLVPVValue {
349-
hlv.setPreviousVersion(i, v)
349+
hlv.SetPreviousVersion(i, v)
350350
}
351351
}
352352
}
@@ -375,8 +375,8 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi
375375
return outputSpec
376376
}
377377

378-
// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map
379-
func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) {
378+
// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map
379+
func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) {
380380
if hlv.PreviousVersions == nil {
381381
hlv.PreviousVersions = make(HLVVersions)
382382
}

rest/utilities_testing_blip_client.go

+103-15
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,22 @@ const (
3737
RevtreeSubtestName = "revTree"
3838
)
3939

40+
type BlipTesterClientConflictResolverType string
41+
42+
const (
43+
ConflictResolverLastWriteWins BlipTesterClientConflictResolverType = "lww"
44+
45+
ConflictResolverDefault = ConflictResolverLastWriteWins
46+
)
47+
48+
func (c BlipTesterClientConflictResolverType) IsValid() bool {
49+
switch c {
50+
case ConflictResolverLastWriteWins:
51+
return true
52+
}
53+
return false
54+
}
55+
4056
type BlipTesterClientOpts struct {
4157
ClientDeltas bool // Support deltas on the client side
4258
Username string
@@ -62,6 +78,8 @@ type BlipTesterClientOpts struct {
6278

6379
// SourceID is used to define the SourceID for the blip client
6480
SourceID string
81+
82+
ConflictResolver BlipTesterClientConflictResolverType
6583
}
6684

6785
// defaultBlipTesterClientRevsLimit is the number of revisions sent as history when the client replicates - older revisions are not sent, and may not be stored.
@@ -281,6 +299,12 @@ func (cd *clientDoc) currentVersion(t testing.TB) *db.Version {
281299
return &rev.version.CV
282300
}
283301

302+
func (cd *clientDoc) _currentVersion(t testing.TB) *db.Version {
303+
rev, err := cd._latestRev()
304+
require.NoError(t, err)
305+
return &rev.version.CV
306+
}
307+
284308
type BlipTesterCollectionClient struct {
285309
parent *BlipTesterClient
286310

@@ -571,20 +595,41 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
571595
doc.lock.Lock()
572596
defer doc.lock.Unlock()
573597

598+
var incomingVersion DocVersion
574599
var newVersion DocVersion
575600
var hlv db.HybridLogicalVector
576601
if btc.UseHLV() {
602+
var incomingHLV *db.HybridLogicalVector
577603
if revHistory != "" {
578-
existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory)
604+
incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory)
579605
require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err)
580-
hlv = *existingVersion
606+
hlv = *incomingHLV
581607
}
582-
v, err := db.ParseVersion(revID)
608+
incomingCV, err := db.ParseVersion(revID)
583609
require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err)
584-
newVersion = DocVersion{CV: v}
585-
require.NoError(btr.TB(), hlv.AddVersion(v))
610+
incomingVersion = DocVersion{CV: incomingCV}
611+
612+
clientCV := doc._currentVersion(btc.TB())
613+
// incoming rev older than stored client version and comes from a different source - need to resolve
614+
if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID {
615+
btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV)
616+
switch btc.BlipTesterClientOpts.ConflictResolver {
617+
case ConflictResolverLastWriteWins:
618+
// generate a new version for the resolution and write it to the remote HLV
619+
v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())}
620+
require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV")
621+
newVersion = DocVersion{CV: v}
622+
hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value)
623+
default:
624+
btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver)
625+
}
626+
} else {
627+
newVersion = DocVersion{CV: incomingCV}
628+
}
629+
require.NoError(btc.TB(), hlv.AddVersion(newVersion.CV), "couldn't add newVersion CV into doc HLV")
586630
} else {
587631
newVersion = DocVersion{RevTreeID: revID}
632+
incomingVersion = newVersion
588633
}
589634

590635
docRev := clientDocRev{
@@ -614,12 +659,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
614659
// store the new sequence for a replaced rev for tests waiting for this specific rev
615660
doc._seqsByVersions[replacedVersion] = newClientSeq
616661
}
617-
doc._latestServerVersion = newVersion
662+
// store the _incoming_ version - not newVersion - since we may have written a resolved conflict which will need pushing back
663+
doc._latestServerVersion = incomingVersion
618664

619665
if !msg.NoReply() {
620666
response := msg.Response()
621667
response.SetBody([]byte(`[]`))
622668
}
669+
670+
// new sequence written, wake up changes feeds for push
671+
btcr._seqCond.Broadcast()
623672
return
624673
}
625674

@@ -794,24 +843,53 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
794843
doc.lock.Lock()
795844
defer doc.lock.Unlock()
796845

797-
var newVersion DocVersion
846+
var incomingVersion DocVersion
847+
var versionToWrite DocVersion
798848
var hlv db.HybridLogicalVector
799849
if btc.UseHLV() {
850+
var incomingHLV *db.HybridLogicalVector
800851
if revHistory != "" {
801-
existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory)
852+
incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory)
802853
require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err)
803-
hlv = *existingVersion
854+
hlv = *incomingHLV
804855
}
805-
v, err := db.ParseVersion(revID)
856+
incomingCV, err := db.ParseVersion(revID)
806857
require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err)
807-
newVersion = DocVersion{CV: v}
808-
require.NoError(btr.TB(), hlv.AddVersion(v))
858+
incomingVersion = DocVersion{CV: incomingCV}
859+
860+
// fetch client's latest version to do conflict check and resolution
861+
latestClientRev, err := doc._latestRev()
862+
require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID)
863+
if latestClientRev != nil {
864+
clientCV := latestClientRev.version.CV
865+
866+
// incoming rev older than stored client version and comes from a different source - need to resolve
867+
if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID {
868+
btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV)
869+
switch btc.BlipTesterClientOpts.ConflictResolver {
870+
case ConflictResolverLastWriteWins:
871+
// local wins so write the local body back as a new resolved version (based on incoming HLV) to push
872+
body = latestClientRev.body
873+
v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())}
874+
require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV")
875+
versionToWrite = DocVersion{CV: v}
876+
hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value)
877+
default:
878+
btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver)
879+
}
880+
} else {
881+
// no conflict - accept incoming rev
882+
versionToWrite = DocVersion{CV: incomingCV}
883+
}
884+
}
885+
require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV")
809886
} else {
810-
newVersion = DocVersion{RevTreeID: revID}
887+
versionToWrite = DocVersion{RevTreeID: revID}
888+
incomingVersion = versionToWrite
811889
}
812890
docRev := clientDocRev{
813891
clientSeq: newClientSeq,
814-
version: newVersion,
892+
version: versionToWrite,
815893
HLV: hlv,
816894
body: body,
817895
message: msg,
@@ -835,12 +913,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
835913
// store the new sequence for a replaced rev for tests waiting for this specific rev
836914
doc._seqsByVersions[replacedVersion] = newClientSeq
837915
}
838-
doc._latestServerVersion = newVersion
916+
// store the _incoming_ version - not versionToWrite - since we may have written a resolved conflict which will need pushing back
917+
doc._latestServerVersion = incomingVersion
839918

840919
if !msg.NoReply() {
841920
response := msg.Response()
842921
response.SetBody([]byte(`[]`))
843922
}
923+
924+
// new sequence written, wake up changes feeds for push
925+
btcr._seqCond.Broadcast()
844926
}
845927

846928
btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = func(msg *blip.Message) {
@@ -1007,6 +1089,12 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes
10071089
if !opts.AllowCreationWithoutBlipTesterClientRunner && !btcRunner.initialisedInsideRunnerCode {
10081090
require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method")
10091091
}
1092+
if opts.ConflictResolver == "" {
1093+
opts.ConflictResolver = ConflictResolverDefault
1094+
}
1095+
if !opts.ConflictResolver.IsValid() {
1096+
require.FailNow(btcRunner.TB(), "invalid conflict resolver %q", opts.ConflictResolver)
1097+
}
10101098
if opts.SourceID == "" {
10111099
opts.SourceID = "blipclient"
10121100
}

topologytest/couchbase_lite_mock_peer_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID
6565
bodyBytes, meta := p.getLatestDocVersion(dsName, docID)
6666
require.NotNil(p.TB(), meta, "docID:%s not found on %s", docID, p)
6767
var body db.Body
68-
require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body))
68+
// it's easier if all clients can return consistent bodies for tombstones
69+
// lets just settle on nil, since we still need special handling anyway for `` vs `{}` so unmarshal doesn't barf
70+
if len(bodyBytes) > 0 && string(bodyBytes) != base.EmptyDocument {
71+
require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body))
72+
}
6973
return *meta, body
7074
}
7175

topologytest/couchbase_server_peer_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) (
327327
require.NoError(peer.TB(), err)
328328
// get hlv to construct DocVersion
329329
var body db.Body
330-
require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body))
330+
if len(docBytes) > 0 {
331+
require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body))
332+
}
331333
return getDocVersion(docID, peer, cas, xattrs), body
332334
}

topologytest/hlv_test.go

+27-21
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/couchbase/sync_gateway/base"
1717
"github.com/couchbase/sync_gateway/db"
18+
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
)
2021

@@ -65,6 +66,25 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p
6566
}
6667
}
6768

69+
// waitForConvergingVersion waits for the same document version to reach all peers.
70+
func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) {
71+
t.Logf("waiting for converged doc versions across all peers")
72+
require.EventuallyWithT(t, func(c *assert.CollectT) {
73+
for peerAid, peerA := range peers.SortedPeers() {
74+
docMetaA, bodyA := peerA.GetDocument(dsName, docID)
75+
for peerBid, peerB := range peers.SortedPeers() {
76+
if peerAid == peerBid {
77+
continue
78+
}
79+
docMetaB, bodyB := peerB.GetDocument(dsName, docID)
80+
cvA, cvB := docMetaA.CV(t), docMetaB.CV(t)
81+
require.Equalf(c, cvA, cvB, "CV mismatch: %s:%#v != %s:%#v", peerAid, docMetaA, peerBid, docMetaB)
82+
require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB)
83+
}
84+
}
85+
}, totalWaitTime, pollInterval)
86+
}
87+
6888
// removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS
6989
// so we can skip creating docs on these peers (avoiding conflicts between docs created on the SGW and cbs)
7090
func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool {
@@ -80,9 +100,9 @@ func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool {
80100
return peersToRemove
81101
}
82102

83-
// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
84-
// returns the last peer to have a doc created on it
85-
func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) {
103+
// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents.
104+
// It is not known at this stage which write the "winner" will be, since conflict resolution can happen at replication time which may not be LWW, or may be LWW but with a new value.
105+
func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) {
86106
backingPeers := removeSyncGatewayBackingPeers(peers)
87107
documentVersion := make([]BodyAndVersion, 0, len(peers))
88108
for peerName, peer := range peers {
@@ -94,15 +114,10 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee
94114
t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
95115
documentVersion = append(documentVersion, docVersion)
96116
}
97-
index := len(documentVersion) - 1
98-
lastWrite = documentVersion[index]
99-
100-
return lastWrite
101117
}
102118

103-
// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
104-
// returns the last peer to have a doc updated on it.
105-
func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) {
119+
// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations
120+
func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) {
106121
backingPeers := removeSyncGatewayBackingPeers(peers)
107122
var documentVersion []BodyAndVersion
108123
for peerName, peer := range peers {
@@ -114,15 +129,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee
114129
t.Logf("updateVersion: %#v", docVersion.docMeta)
115130
documentVersion = append(documentVersion, docVersion)
116131
}
117-
index := len(documentVersion) - 1
118-
lastWrite = documentVersion[index]
119-
120-
return lastWrite
121132
}
122133

123-
// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
124-
// returns the last peer to have a doc deleted on it
125-
func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) (lastWrite BodyAndVersion) {
134+
// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions
135+
func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) {
126136
backingPeers := removeSyncGatewayBackingPeers(peers)
127137
var documentVersion []BodyAndVersion
128138
for peerName, peer := range peers {
@@ -133,10 +143,6 @@ func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers
133143
t.Logf("deleteVersion: %#v", deleteVersion)
134144
documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
135145
}
136-
index := len(documentVersion) - 1
137-
lastWrite = documentVersion[index]
138-
139-
return lastWrite
140146
}
141147

142148
// getDocID returns a unique doc ID for the test case. Note: when running with Couchbase Server and -count > 1, this will return duplicate IDs for count 2 and higher and they can conflict due to the way bucket pool works.

0 commit comments

Comments
 (0)