Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change the login and initialization logic to prevent the SD… #889

Merged
merged 8 commits into from
Mar 20, 2025
1 change: 0 additions & 1 deletion integration_test/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func GetConf() sdk_struct.IMConfig {
cf.WsAddr = WsAddr
cf.DataDir = DataDir
cf.LogLevel = LogLevel
cf.IsExternalExtensions = true
cf.PlatformID = int32(PlatformID)
cf.LogFilePath = LogFilePath
cf.IsLogStandardOutput = IsLogStandardOutput
Expand Down
2 changes: 1 addition & 1 deletion integration_test/internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *MetaManager) BuildCtx(ctx context.Context) context.Context {
}
ctx = ccontext.WithInfo(ctx, &ccontext.GlobalConfig{
Token: m.token,
IMConfig: m.IMConfig,
IMConfig: &m.IMConfig,
})
ctx = ccontext.WithOperationID(ctx, utils.OperationIDGenerator())
ctx = mcontext.SetOpUserID(ctx, "admin")
Expand Down
7 changes: 4 additions & 3 deletions integration_test/internal/pkg/sdk_user_simulator/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@ func GetRelativeServerTime() int64 {
return utils.GetCurrentTimestampByMill() + timeOffset
}

func InitSDK(userID string, cf sdk_struct.IMConfig) (*open_im_sdk.LoginMgr, error) {
func InitSDK(userID string, cf sdk_struct.IMConfig) (*open_im_sdk.UserContext, error) {
userForSDK := open_im_sdk.NewLoginMgr()
var testConnListener testConnListener
testConnListener.UserID = userID
isInit := userForSDK.InitSDK(cf, &testConnListener)
isInit := userForSDK.InitSDK(&cf, &testConnListener)
if !isInit {
return nil, errs.New("sdk init failed").Wrap()
}
userForSDK.InitResources()

SetListener(userForSDK, userID)

return userForSDK, nil
}

func SetListener(userForSDK *open_im_sdk.LoginMgr, userID string) {
func SetListener(userForSDK *open_im_sdk.UserContext, userID string) {
var testConversation conversationCallBack
userForSDK.SetConversationListener(&testConversation)
var testUser userCallback
Expand Down
4 changes: 2 additions & 2 deletions integration_test/internal/sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ var (
type TestSDK struct {
UserID string
Num int
SDK *open_im_sdk.LoginMgr
SDK *open_im_sdk.UserContext
}

func NewTestSDK(userID string, num int, loginMgr *open_im_sdk.LoginMgr) *TestSDK {
func NewTestSDK(userID string, num int, loginMgr *open_im_sdk.UserContext) *TestSDK {
return &TestSDK{
UserID: userID,
Num: num,
Expand Down
4 changes: 2 additions & 2 deletions internal/conversation_msg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *Conversation) checkID(ctx context.Context, s *sdk_struct.MsgStruct,
return nil, sdkerrs.ErrArgs
}
s.SendID = c.loginUserID
s.SenderPlatformID = c.platformID
s.SenderPlatformID = c.platform
lc := &model_struct.LocalConversation{LatestMsgSendTime: s.CreateTime}
//assemble messages and conversations based on single or group chat types
if recvID == "" {
Expand Down Expand Up @@ -950,7 +950,7 @@ func (c *Conversation) initBasicInfo(ctx context.Context, message *sdk_struct.Ms
message.ClientMsgID = ClientMsgID
message.MsgFrom = msgFrom
message.ContentType = contentType
message.SenderPlatformID = c.platformID
message.SenderPlatformID = c.platform
return nil
}

Expand Down
28 changes: 19 additions & 9 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/internal/third/file"
"github.com/openimsdk/openim-sdk-core/v3/internal/user"
"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/ccontext"
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
Expand Down Expand Up @@ -58,7 +57,7 @@ type Conversation struct {
recvCh chan common.Cmd2Value
msgSyncerCh chan common.Cmd2Value
loginUserID string
platformID int32
platform int32
DataDir string
relation *relation.Relation
group *group.Group
Expand All @@ -79,6 +78,22 @@ type Conversation struct {
typing *typing
}

func (c *Conversation) SetDataBase(db db_interface.DataBase) {
c.db = db
}

func (c *Conversation) SetLoginUserID(loginUserID string) {
c.loginUserID = loginUserID
}

func (c *Conversation) SetPlatform(platform int32) {
c.platform = platform
}

func (c *Conversation) SetDataDir(DataDir string) {
c.DataDir = DataDir
}

func (c *Conversation) SetMsgListener(msgListener func() open_im_sdk_callback.OnAdvancedMsgListener) {
c.msgListener = msgListener
}
Expand All @@ -91,22 +106,17 @@ func (c *Conversation) SetBusinessListener(businessListener func() open_im_sdk_c
c.businessListener = businessListener
}

func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr, db db_interface.DataBase,
func NewConversation(longConnMgr *interaction.LongConnMgr,
recvCh, msgSyncerCh chan common.Cmd2Value, relation *relation.Relation, group *group.Group, user *user.User,
file *file.File) *Conversation {
info := ccontext.Info(ctx)
n := &Conversation{db: db,
n := &Conversation{
LongConnMgr: longConnMgr,
recvCh: recvCh,
msgSyncerCh: msgSyncerCh,
loginUserID: info.UserID(),
platformID: info.PlatformID(),
DataDir: info.DataDir(),
relation: relation,
group: group,
user: user,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullForwardEndSeqMap: cache.NewConversationSeqContextCache(),
messagePullReverseEndSeqMap: cache.NewConversationSeqContextCache(),
Expand Down
39 changes: 0 additions & 39 deletions internal/group/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ func (g *Group) DismissGroup(ctx context.Context, groupID string) error {
return nil
}

//func (g *Group) SetGroupApplyMemberFriend(ctx context.Context, groupID string, rule int32) error {
// return g.SetGroupInfo(ctx, &sdkws.GroupInfoForSet{GroupID: groupID, ApplyMemberFriend: wrapperspb.Int32(rule)})
//}
//
//func (g *Group) SetGroupLookMemberInfo(ctx context.Context, groupID string, rule int32) error {
// return g.SetGroupInfo(ctx, &sdkws.GroupInfoForSet{GroupID: groupID, LookMemberInfo: wrapperspb.Int32(rule)})
//}
//
//func (g *Group) SetGroupVerification(ctx context.Context, groupID string, verification int32) error {
// return g.SetGroupInfo(ctx, &sdkws.GroupInfoForSet{GroupID: groupID, NeedVerification: wrapperspb.Int32(verification)})
//}

func (g *Group) ChangeGroupMute(ctx context.Context, groupID string, isMute bool) (err error) {
if isMute {
err = g.muteGroup(ctx, groupID)
Expand Down Expand Up @@ -183,14 +171,6 @@ func (g *Group) SetGroupMemberInfo(ctx context.Context, groupMemberInfo *group.S
return g.IncrSyncGroupAndMember(ctx, groupMemberInfo.GroupID)
}

//func (g *Group) SetGroupMemberRoleLevel(ctx context.Context, groupID, userID string, roleLevel int) error {
// return g.SetGroupMemberInfo(ctx, &group.SetGroupMemberInfo{GroupID: groupID, UserID: userID, RoleLevel: wrapperspb.Int32(int32(roleLevel))})
//}
//
//func (g *Group) SetGroupMemberNickname(ctx context.Context, groupID, userID string, groupMemberNickname string) error {
// return g.SetGroupMemberInfo(ctx, &group.SetGroupMemberInfo{GroupID: groupID, UserID: userID, Nickname: wrapperspb.String(groupMemberNickname)})
//}

func (g *Group) GetJoinedGroupList(ctx context.Context) ([]*model_struct.LocalGroup, error) {
return g.db.GetJoinedGroupListDB(ctx)
}
Expand Down Expand Up @@ -524,22 +504,3 @@ func (g *Group) HandlerGroupApplication(ctx context.Context, req *group.GroupApp
func (g *Group) GetGroupMemberNameAndFaceURL(ctx context.Context, groupID string, userIDs []string) (map[string]*model_struct.LocalGroupMember, error) {
return g.GetGroupMembersInfo(ctx, groupID, userIDs)
}

//func (g *Group) SearchGroupMembersV2(ctx context.Context, req *group.SearchGroupMemberReq) ([]*model_struct.LocalGroupMember, error) {
// if err := req.Check(); err != nil {
// return nil, err
// }
// info, err := g.db.GetGroupInfoByGroupID(ctx, req.GroupID)
// if err != nil {
// return nil, err
// }
// if info.MemberCount <= pconstant.MaxSyncPullNumber {
// return g.db.SearchGroupMembersDB(ctx, req.Keyword, req.GroupID, true, false,
// int((req.Pagination.PageNumber-1)*req.Pagination.ShowNumber), int(req.Pagination.ShowNumber))
// }
// resp, err := util.CallApi[group.SearchGroupMemberResp](ctx, constant.SearchGroupMember, req)
// if err != nil {
// return nil, err
// }
// return datautil.Slice(resp.Members, g.pbGroupMemberToLocal), nil
//}
14 changes: 11 additions & 3 deletions internal/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ const (
groupMemberSyncLimit = 1000
)

func NewGroup(loginUserID string, db db_interface.DataBase,
func NewGroup(
conversationCh chan common.Cmd2Value) *Group {
g := &Group{
loginUserID: loginUserID,
db: db,
conversationCh: conversationCh,
}
g.initSyncer()
Expand Down Expand Up @@ -311,3 +309,13 @@ func (g *Group) FetchGroupOrError(ctx context.Context, groupID string) (*model_s
func (g *Group) delLocalGroupRequest(ctx context.Context, groupID, userID string) error {
return g.db.DeleteGroupRequest(ctx, groupID, userID)
}

// SetDataBase sets the DataBase field in Group struct
func (g *Group) SetDataBase(db db_interface.DataBase) {
g.db = db
}

// SetLoginUserID sets the loginUserID field in Group struct
func (g *Group) SetLoginUserID(loginUserID string) {
g.loginUserID = loginUserID
}
63 changes: 30 additions & 33 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,33 @@ type MsgSyncer struct {
syncedMaxSeqs map[string]int64 // map of the maximum synced SEQ numbers for all group IDs
syncedMaxSeqsLock sync.RWMutex // syncedMaxSeqs map lock
db db_interface.DataBase // data store
syncTimes int // times of sync
ctx context.Context // context
reinstalled bool //true if the app was uninstalled and reinstalled
isSyncing bool // indicates whether data is being synced
isSyncingLock sync.Mutex // lock for syncing state

}

func (m *MsgSyncer) SetLoginUserID(loginUserID string) {
m.loginUserID = loginUserID
}

func (m *MsgSyncer) SetDataBase(db db_interface.DataBase) {
m.db = db
}

// NewMsgSyncer creates a new instance of the message synchronizer.
func NewMsgSyncer(ctx context.Context, conversationCh, recvCh chan common.Cmd2Value,
loginUserID string, longConnMgr *LongConnMgr, db db_interface.DataBase, syncTimes int) (*MsgSyncer, error) {
m := &MsgSyncer{
loginUserID: loginUserID,
func NewMsgSyncer(conversationCh, recvCh chan common.Cmd2Value,
longConnMgr *LongConnMgr) *MsgSyncer {
return &MsgSyncer{
longConnMgr: longConnMgr,
recvCh: recvCh,
conversationCh: conversationCh,
ctx: ctx,
syncedMaxSeqs: make(map[string]int64),
db: db,
syncTimes: syncTimes,
}
if err := m.loadSeq(ctx); err != nil {
log.ZError(ctx, "loadSeq err", err)
return nil, err
}
return m, nil
}

// seq The db reads the data to the memory,set syncedMaxSeqs
func (m *MsgSyncer) loadSeq(ctx context.Context) error {
// LoadSeq seq The db reads the data to the memory,set syncedMaxSeqs
func (m *MsgSyncer) LoadSeq(ctx context.Context) error {
conversationIDList, err := m.db.GetAllConversationIDList(ctx)
if err != nil {
log.ZError(ctx, "get conversation id list failed", err)
Expand Down Expand Up @@ -179,7 +176,7 @@ func (m *MsgSyncer) DoListener(ctx context.Context) {
case cmd := <-m.recvCh:
m.handlePushMsgAndEvent(cmd)
case <-ctx.Done():
log.ZInfo(m.ctx, "msg syncer done, sdk logout.....")
log.ZInfo(ctx, "msg syncer done, sdk logout.....")
return
}
}
Expand Down Expand Up @@ -271,7 +268,7 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
}
m.reinstalled = false
}()
_ = m.syncAndTriggerReinstallMsgs(m.ctx, needSyncSeqMap, pullNums)
_ = m.syncAndTriggerReinstallMsgs(ctx, needSyncSeqMap, pullNums)
} else {
for conversationID, maxSeq := range maxSeqToSync {
if syncedMaxSeq, ok := m.syncedMaxSeqs[conversationID]; ok {
Expand All @@ -284,7 +281,7 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
}
}
}
_ = m.syncAndTriggerMsgs(m.ctx, needSyncSeqMap, pullNums)
_ = m.syncAndTriggerMsgs(ctx, needSyncSeqMap, pullNums)
}
}

Expand Down Expand Up @@ -351,45 +348,45 @@ func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pushMessages map[str
func (m *MsgSyncer) doConnected(ctx context.Context) {
reinstalled := m.reinstalled
if reinstalled {
common.TriggerCmdSyncFlag(m.ctx, constant.AppDataSyncStart, m.conversationCh)
common.TriggerCmdSyncFlag(ctx, constant.AppDataSyncStart, m.conversationCh)
} else {
common.TriggerCmdSyncFlag(m.ctx, constant.MsgSyncBegin, m.conversationCh)
common.TriggerCmdSyncFlag(ctx, constant.MsgSyncBegin, m.conversationCh)
}
var resp sdkws.GetMaxSeqResp
if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(m.ctx, "get max seq error", err)
common.TriggerCmdSyncFlag(m.ctx, constant.MsgSyncFailed, m.conversationCh)
if err := m.longConnMgr.SendReqWaitResp(ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(ctx, "get max seq error", err)
common.TriggerCmdSyncFlag(ctx, constant.MsgSyncFailed, m.conversationCh)
return
} else {
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
log.ZDebug(ctx, "get max seq success", "resp", resp.MaxSeqs)
}
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, connectPullNums)
if reinstalled {
common.TriggerCmdSyncFlag(m.ctx, constant.AppDataSyncFinish, m.conversationCh)
common.TriggerCmdSyncFlag(ctx, constant.AppDataSyncFinish, m.conversationCh)
} else {
common.TriggerCmdSyncFlag(m.ctx, constant.MsgSyncEnd, m.conversationCh)
common.TriggerCmdSyncFlag(ctx, constant.MsgSyncEnd, m.conversationCh)
}
}

func (m *MsgSyncer) doWakeupDataSync(ctx context.Context) {
common.TriggerCmdSyncData(ctx, m.conversationCh)
var resp sdkws.GetMaxSeqResp
if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(m.ctx, "get max seq error", err)
if err := m.longConnMgr.SendReqWaitResp(ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(ctx, "get max seq error", err)
return
} else {
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
log.ZDebug(ctx, "get max seq success", "resp", resp.MaxSeqs)
}
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, defaultPullNums)
}

func (m *MsgSyncer) doIMMessageSync(ctx context.Context) {
var resp sdkws.GetMaxSeqResp
if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(m.ctx, "get max seq error", err)
if err := m.longConnMgr.SendReqWaitResp(ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(ctx, "get max seq error", err)
return
} else {
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
log.ZDebug(ctx, "get max seq success", "resp", resp.MaxSeqs)
}
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, defaultPullNums)
}
Expand Down
16 changes: 13 additions & 3 deletions internal/relation/relation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const (
friendSyncLimit int64 = 10000
)

func NewFriend(loginUserID string, db db_interface.DataBase, user *user.User, conversationCh chan common.Cmd2Value) *Relation {
r := &Relation{loginUserID: loginUserID, db: db, user: user, conversationCh: conversationCh}
func NewRelation(conversationCh chan common.Cmd2Value, user *user.User) *Relation {
r := &Relation{conversationCh: conversationCh, user: user}
r.initSyncer()
return r
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func (r *Relation) initSyncer() {
return r.db.DeleteFriendDB(ctx, value.FriendUserID)
}),
syncer.WithUpdate[*model_struct.LocalFriend, relation.GetPaginationFriendsResp, [2]string](func(ctx context.Context, server, local *model_struct.LocalFriend) error {
r.user.UserCache.Delete(server.FriendUserID)
r.user.UserCache().Delete(server.FriendUserID)
return r.db.UpdateFriend(ctx, server)
}),
syncer.WithUUID[*model_struct.LocalFriend, relation.GetPaginationFriendsResp, [2]string](func(value *model_struct.LocalFriend) [2]string {
Expand Down Expand Up @@ -216,3 +216,13 @@ func (r *Relation) SetListener(listener func() open_im_sdk_callback.OnFriendship
func (r *Relation) SetListenerForService(listener open_im_sdk_callback.OnListenerForService) {
r.listenerForService = listener
}

// SetDataBase sets the DataBase field in Relation struct
func (r *Relation) SetDataBase(db db_interface.DataBase) {
r.db = db
}

// SetLoginUserID sets the loginUserID field in Relation struct
func (r *Relation) SetLoginUserID(loginUserID string) {
r.loginUserID = loginUserID
}
Loading
Loading