Skip to content

Commit 0bd7376

Browse files
authored
fix: add server isEnd determination criteria for message retrieval. (#812)
* fix: go mod dep repo update and fix reverse fetch message duplicate. (#810) Signed-off-by: Gordon <[email protected]> * refactor: change some tips. Signed-off-by: Gordon <[email protected]> * refactor: change some tips. Signed-off-by: Gordon <[email protected]> * fix: Add server isEnd determination criteria for message retrieval. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent 344d15b commit 0bd7376

File tree

5 files changed

+171
-75
lines changed

5 files changed

+171
-75
lines changed

go.mod

+14-10
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,28 @@
11
module github.com/openimsdk/openim-sdk-core/v3
22

3-
go 1.21
3+
go 1.22.7
4+
5+
toolchain go1.22.10
46

57
require (
68
github.com/golang/protobuf v1.5.4
79
github.com/gorilla/websocket v1.4.2
810
github.com/jinzhu/copier v0.4.0
911
github.com/pkg/errors v0.9.1
10-
google.golang.org/protobuf v1.33.0
12+
google.golang.org/protobuf v1.35.1 // indirect
1113
gorm.io/driver/sqlite v1.5.5
1214
nhooyr.io/websocket v1.8.10
1315
)
1416

15-
require golang.org/x/net v0.22.0 // indirect
17+
require golang.org/x/net v0.29.0 // indirect
1618

1719
require (
1820
github.com/google/go-cmp v0.6.0
19-
github.com/openimsdk/protocol v0.0.72
21+
github.com/openimsdk/protocol v0.0.72-alpha.63
2022
github.com/openimsdk/tools v0.0.50-alpha.21
2123
github.com/patrickmn/go-cache v2.1.0+incompatible
2224
golang.org/x/image v0.15.0
23-
golang.org/x/sync v0.6.0
24-
google.golang.org/grpc v1.62.1
25+
golang.org/x/sync v0.8.0
2526
gorm.io/gorm v1.25.10
2627
)
2728

@@ -53,9 +54,12 @@ require (
5354
go.uber.org/multierr v1.6.0 // indirect
5455
go.uber.org/zap v1.24.0 // indirect
5556
golang.org/x/arch v0.3.0 // indirect
56-
golang.org/x/crypto v0.21.0 // indirect
57-
golang.org/x/sys v0.18.0 // indirect
58-
golang.org/x/text v0.14.0 // indirect
59-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
57+
golang.org/x/crypto v0.27.0 // indirect
58+
golang.org/x/sys v0.25.0 // indirect
59+
golang.org/x/text v0.18.0 // indirect
60+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
61+
google.golang.org/grpc v1.68.0 // indirect
6062
gopkg.in/yaml.v3 v3.0.1 // indirect
6163
)
64+
65+
replace nhooyr.io/websocket => github.com/coder/websocket v1.8.10

go.sum

+20-20
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX
66
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
77
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
88
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
9+
github.com/coder/websocket v1.8.10 h1:K+NrQte1lq04N7V/E3avmuuuCGEaInbjTWukHZsN17g=
10+
github.com/coder/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
911
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1012
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1113
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -64,8 +66,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
6466
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
6567
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
6668
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
67-
github.com/openimsdk/protocol v0.0.72 h1:K+vslwaR7lDXyBzb07UuEQITaqsgighz7NyXVIWsu6A=
68-
github.com/openimsdk/protocol v0.0.72/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
69+
github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0=
70+
github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
6971
github.com/openimsdk/tools v0.0.50-alpha.21 h1:ZKgSFkiBjz6KcNZlNwvrSoUYJ7K5Flan8wHuRBH3VqY=
7072
github.com/openimsdk/tools v0.0.50-alpha.21/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
7173
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
@@ -103,26 +105,26 @@ go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
103105
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
104106
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
105107
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
106-
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
107-
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
108+
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
109+
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
108110
golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
109111
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
110-
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
111-
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
112-
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
113-
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
112+
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
113+
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
114+
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
115+
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
114116
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
115117
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
116-
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
117-
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
118-
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
119-
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
120-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
121-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
122-
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
123-
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
124-
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
125-
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
118+
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
119+
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
120+
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
121+
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
122+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
123+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
124+
google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
125+
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
126+
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
127+
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
126128
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
127129
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
128130
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -132,6 +134,4 @@ gorm.io/driver/sqlite v1.5.5 h1:7MDMtUZhV065SilG62E0MquljeArQZNfJnjd9i9gx3E=
132134
gorm.io/driver/sqlite v1.5.5/go.mod h1:6NgQ7sQWAIFsPrJJl1lSNSu2TABh0ZZ/zm5fosATavE=
133135
gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
134136
gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
135-
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
136-
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
137137
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=

internal/conversation_msg/conversation.go

+59-36
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
pbConversation "github.com/openimsdk/protocol/conversation"
4040
)
4141

42+
const MaxRecursionDepth = 3
43+
4244
func (c *Conversation) setConversation(ctx context.Context, apiReq *pbConversation.SetConversationsReq, localConversation *model_struct.LocalConversation) error {
4345
apiReq.Conversation.ConversationID = localConversation.ConversationID
4446
apiReq.Conversation.ConversationType = localConversation.ConversationType
@@ -77,19 +79,12 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
7779
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
7880
t = time.Now()
7981

80-
var thisEndSeq int64
81-
thisEndSeq, messageList = c.LocalChatLog2MsgStruct(list, isReverse)
82+
messageList = c.LocalChatLog2MsgStruct(list)
8283
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
8384
t = time.Now()
8485
if !isReverse {
8586
sort.Sort(messageList)
86-
if thisEndSeq != 0 {
87-
c.messagePullForwardEndSeqMap.Store(conversationID, thisEndSeq)
88-
}
89-
} else {
90-
if thisEndSeq != 0 {
91-
c.messagePullReverseEndSeqMap.Store(conversationID, thisEndSeq)
92-
}
87+
9388
}
9489
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
9590
messageListCallback.MessageList = messageList
@@ -104,20 +99,60 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
10499

105100
// Get the number of invalid messages in this batch to recursive fetching from earlier points.
106101
shouldFetchMoreMessagesNum := func(messages []*model_struct.LocalChatLog) int {
107-
if len(messages) == 0 {
108-
return count
109-
}
110-
102+
var thisEndSeq int64
111103
// Represents the number of valid messages in the batch
112104
validateMessageNum := 0
113105
for _, msg := range messages {
114-
if msg.Status < constant.MsgStatusHasDeleted {
115-
validateMessageNum++
116-
validMessages = append(validMessages, msg)
106+
if msg.Seq != 0 && thisEndSeq == 0 {
107+
thisEndSeq = msg.Seq
108+
}
109+
if isReverse {
110+
if msg.Seq > thisEndSeq && thisEndSeq != 0 {
111+
thisEndSeq = msg.Seq
112+
}
113+
117114
} else {
115+
if msg.Seq < thisEndSeq && msg.Seq != 0 {
116+
thisEndSeq = msg.Seq
117+
}
118+
}
119+
if msg.Status >= constant.MsgStatusHasDeleted {
118120
log.ZDebug(ctx, "this message has been deleted or exception message", "msg", msg)
121+
continue
122+
}
123+
124+
validateMessageNum++
125+
validMessages = append(validMessages, msg)
126+
127+
}
128+
if !isReverse {
129+
if thisEndSeq != 0 {
130+
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
131+
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key)
132+
if value < lastEndSeq || lastEndSeq == 0 {
133+
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
134+
return true
135+
}
136+
log.ZWarn(ctx, "The end sequence number of the message is more than the last end sequence number",
137+
nil, "conversationID", key, "value", value, "lastEndSeq", lastEndSeq)
138+
return false
139+
})
140+
}
141+
} else {
142+
if thisEndSeq != 0 {
143+
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
144+
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key)
145+
if value > lastEndSeq || lastEndSeq == 0 {
146+
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
147+
return true
148+
}
149+
log.ZWarn(ctx, "The end sequence number of the message is less than the last end sequence number",
150+
nil, "conversationID", key, "value", value, "lastEndSeq", lastEndSeq)
151+
return false
152+
})
119153
}
120154
}
155+
121156
return count - validateMessageNum
122157
}
123158
getNewStartTime := func(messages []*model_struct.LocalChatLog) int64 {
@@ -139,11 +174,11 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
139174
t = time.Now()
140175
thisStartSeq := c.validateAndFillInternalGaps(ctx, conversationID, isReverse,
141176
count, startTime, &list, messageListCallback)
142-
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t))
177+
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
143178
t = time.Now()
144179
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
145180
isReverse, count, startTime, &list, messageListCallback)
146-
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t))
181+
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
147182
t = time.Now()
148183
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
149184
count, startTime, &list, messageListCallback)
@@ -152,8 +187,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
152187
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
153188
missingCount := shouldFetchMoreMessagesNum(list)
154189
if missingCount > 0 && !messageListCallback.IsEnd {
155-
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID)
156-
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, getNewStartTime(list), isReverse, messageListCallback)
190+
newStartTime := getNewStartTime(list)
191+
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
192+
conversationID, "newStartTime", newStartTime)
193+
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, messageListCallback)
157194
if err != nil {
158195
return nil, err
159196
}
@@ -164,31 +201,17 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
164201
return validMessages, nil
165202
}
166203

167-
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog, isReverse bool) (int64, []*sdk_struct.MsgStruct) {
204+
func (c *Conversation) LocalChatLog2MsgStruct(list []*model_struct.LocalChatLog) []*sdk_struct.MsgStruct {
168205
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
169-
var thisEndSeq int64
170206
for _, v := range list {
171-
if v.Seq != 0 && thisEndSeq == 0 {
172-
thisEndSeq = v.Seq
173-
}
174-
if isReverse {
175-
if v.Seq > thisEndSeq && thisEndSeq != 0 {
176-
thisEndSeq = v.Seq
177-
}
178-
179-
} else {
180-
if v.Seq < thisEndSeq && v.Seq != 0 {
181-
thisEndSeq = v.Seq
182-
}
183-
}
184207
temp := LocalChatLogToMsgStruct(v)
185208

186209
if temp.AttachedInfoElem.IsPrivateChat && temp.SendTime+int64(temp.AttachedInfoElem.BurnDuration) < time.Now().Unix() {
187210
continue
188211
}
189212
messageList = append(messageList, temp)
190213
}
191-
return thisEndSeq, messageList
214+
return messageList
192215
}
193216

194217
func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {

0 commit comments

Comments
 (0)