Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: modify the historical message retrieval interface to address the message gap problem caused by server crashes or redis seq cache expired. #856

Merged
merged 9 commits into from
Feb 11, 2025
Merged
22 changes: 12 additions & 10 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
var messageListCallback sdk.GetAdvancedHistoryMessageListCallback
var conversationID string
var startClientMsgID string
var startTime int64
var startTime, startSeq int64
var err error
var messageList sdk_struct.NewMsgList
conversationID = req.ConversationID
Expand All @@ -65,6 +65,8 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
return nil, err
}
startTime = m.SendTime
startClientMsgID = req.StartClientMsgID
startSeq = m.Seq
err = c.handleEndSeq(ctx, req, isReverse, m)
if err != nil {
return nil, err
Expand All @@ -77,7 +79,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd

log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, startClientMsgID, isReverse, req.ViewType, &messageListCallback)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, startSeq, startClientMsgID, isReverse, req.ViewType, &messageListCallback)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -135,7 +137,7 @@ func (c *Conversation) handleEndSeq(ctx context.Context, req sdk.GetAdvancedHist
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, startClientMsgID string, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
count int, startTime, startSeq int64, startClientMsgID string, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list, validMessages []*model_struct.LocalChatLog

Expand Down Expand Up @@ -197,16 +199,16 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati

return count - validateMessageNum
}
getNewStartMessageInfo := func(messages []*model_struct.LocalChatLog) (int64, string) {
getNewStartMessageInfo := func(messages []*model_struct.LocalChatLog) (int64, int64, string) {
if len(messages) == 0 {
return 0, ""
return 0, 0, ""
}
// Returns the SendTime and ClientMsgID of the last element in the message list
return messages[len(messages)-1].SendTime, messages[len(messages)-1].ClientMsgID
return messages[len(messages)-1].SendTime, messages[len(messages)-1].Seq, messages[len(messages)-1].ClientMsgID
}

t := time.Now()
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, startClientMsgID, isReverse)
list, err := c.db.GetMessageList(ctx, conversationID, count, startTime, startSeq, startClientMsgID, isReverse)
log.ZDebug(ctx, "db get messageList", "cost time", time.Since(t), "len", len(list), "err",
err, "conversationID", conversationID)

Expand All @@ -229,10 +231,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
// continue fetching recursively until the valid messages are sufficient or all messages have been fetched.
missingCount := shouldFetchMoreMessagesNum(list)
if missingCount > 0 && !messageListCallback.IsEnd {
newStartTime, newStartClientMsgID := getNewStartMessageInfo(list)
newStartTime, newStartSeq, newStartClientMsgID := getNewStartMessageInfo(list)
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
conversationID, "newStartTime", newStartTime)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, newStartClientMsgID, isReverse, viewType, messageListCallback)
conversationID, "newStartTime", newStartTime, "newStartSeq", newStartSeq, "newStartClientMsgID", newStartClientMsgID)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, newStartSeq, newStartClientMsgID, isReverse, viewType, messageListCallback)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/conversation_msg/revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *Conversation) revokeMessage(ctx context.Context, tips *sdkws.RevokeMsgT
log.ZDebug(ctx, "latestMsg", "latestMsg", &latestMsg, "seq", tips.Seq)
if latestMsg.Seq <= tips.Seq {
var newLatestMsg sdk_struct.MsgStruct
msgs, err := c.db.GetMessageList(ctx, tips.ConversationID, 1, 0, "", false)
msgs, err := c.db.GetMessageList(ctx, tips.ConversationID, 1, 0, 0, "", false)
if err != nil || len(msgs) == 0 {
log.ZError(ctx, "GetMessageListNoTime failed", err, "tips", &tips)
return errs.Wrap(err)
Expand Down
12 changes: 7 additions & 5 deletions pkg/db/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (d *DataBase) UpdateMessageTimeAndStatus(ctx context.Context, conversationI
Updates(model_struct.LocalChatLog{Status: status, SendTime: sendTime, ServerMsgID: serverMsgID}).Error, "UpdateMessageStatusBySourceID failed")
}

func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, count int, startTime, startSeq int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
if err = d.initChatLog(ctx, conversationID); err != nil {
log.ZWarn(ctx, "initChatLog err", err)
return nil, err
Expand All @@ -165,14 +165,16 @@ func (d *DataBase) GetMessageList(ctx context.Context, conversationID string, co
var condition, timeOrder, timeSymbol string
if isReverse {
timeOrder = "send_time ASC,seq ASC"
timeSymbol = ">="
timeSymbol = ">"
} else {
timeOrder = "send_time DESC,seq DESC"
timeSymbol = "<="
timeSymbol = "<"
}
if startTime > 0 {
condition = "send_time " + timeSymbol + " ? AND client_msg_id != ?"
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Where(condition, startTime, startClientMsgID).
condition = "send_time " + timeSymbol + " ? " +
"OR (send_time = ? AND (seq " + timeSymbol + " ? OR (seq = 0 AND client_msg_id != ?)))"
err = errs.WrapMsg(d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).
Where(condition, startTime, startTime, startSeq, startClientMsgID).
Order(timeOrder).Offset(0).Limit(count).Find(&result).Error, "GetMessageList failed")
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/db_interface/databse.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type MessageModel interface {
UpdateMessage(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error
UpdateMessageBySeq(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error
UpdateMessageTimeAndStatus(ctx context.Context, conversationID, clientMsgID string, serverMsgID string, sendTime int64, status int32) error
GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error)
GetMessageList(ctx context.Context, conversationID string, count int, startTime, startSeq int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error)
MarkConversationMessageAsReadDB(ctx context.Context, conversationID string, msgIDs []string) (rowsAffected int64, err error)
MarkConversationMessageAsReadBySeqs(ctx context.Context, conversationID string, seqs []int64) (rowsAffected int64, err error)
GetUnreadMessage(ctx context.Context, conversationID string) (result []*model_struct.LocalChatLog, err error)
Expand Down
7 changes: 6 additions & 1 deletion test/conversation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ func Test_FindMessageList(t *testing.T) {
}

func Test_GetAdvancedHistoryMessageList(t *testing.T) {
msgs, err := open_im_sdk.UserForSDK.Conversation().GetAdvancedHistoryMessageList(ctx, sdk_params_callback.GetAdvancedHistoryMessageListParams{})
msgs, err := open_im_sdk.UserForSDK.Conversation().GetAdvancedHistoryMessageList(ctx, sdk_params_callback.GetAdvancedHistoryMessageListParams{
ConversationID: "si_5318543822_9511766539",
StartClientMsgID: "",
Count: 40,
ViewType: 0,
})
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions wasm/indexdb/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func (i *LocalChatLogs) UpdateMessageTimeAndStatus(ctx context.Context, conversa
}

// GetMessageList retrieves a list of messages from the local chat log.
func (i *LocalChatLogs) GetMessageList(ctx context.Context, conversationID string, count int, startTime int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
msgList, err := exec.Exec(conversationID, count, startTime, startClientMsgID, isReverse, i.loginUserID)
func (i *LocalChatLogs) GetMessageList(ctx context.Context, conversationID string, count int, startTime, startSeq int64, startClientMsgID string, isReverse bool) (result []*model_struct.LocalChatLog, err error) {
msgList, err := exec.Exec(conversationID, count, startTime, startSeq, startClientMsgID, isReverse, i.loginUserID)
if err != nil {
return nil, err
} else {
Expand Down
Loading