Skip to content

Commit ce33b79

Browse files
FGadvancerwithchaowangchuxiao-devBanTangerhanzhixiao
authored
fix: conflict resolve main (#537)
* statistics user register * refactor: router change * minio init * UserRegisterCount * push use local conn * refactor: user pb update * remove online push close grpc conn * refactor: user pb update * refactor:pb file * msgs statistics * msgs statistics * revoke userID * refactor: errcode update * active user * active user * active user * refactor: errcode update * feat: conn update token * active user * active user * feat: conn update token * active user * feat: conn update token * feat: conn update token * feat: conn update token * add tx_oss cos * active user * active user * group create * group create * feat: group notification show to conversation * feat: group notification show to conversation * group active * user active * sendNotificationWithName * withname * privateChat * a2r call option * grpc with detail return error * change log error * chain unary interceptor * api nil slice map * fix sync has read * fix: text update * fix: update add model * set conversations update * set privateChat * fix: content update * remove unuse rpc * msgDestruct * cron use rpc mw * set IsMsgDestruct * msg destruct * msgDestruct * s3 minio, cos, oss support * feat: add implement of GetUsersOnlineStatus, #472 (#477) * s3 minio, cos, oss support * s3 route * remove extendMsg code * s3 route * remove unuse code * s3 pb * s3 pb * s3 pb * s3 presigned put * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * Update .gitignore (#482) * s3 debug log * s3 debug log * cron add log and fix cron * add log * cron * s3 config * fix kick user bug * s3 cos * add kick log * s3 cos test * s3 cos test * s3 cos test * kick user log * kickuserlog * s3 cos copy * s3 cos copy * s3 url * s3 url * s3 AccessURL * log * s3 InitiateMultipartUpload add ExpireTime * feat: regenerate pb file * feat: regenerate pb file * Revert "feat: regenerate pb file" This reverts commit 434f225. * Delete .idea directory * feat: regenerate pb file * fix: remove import C * fix: add msg transfer main file * fix: get user online status fix --------- Co-authored-by: withchao <[email protected]> Co-authored-by: wangchuxiao <[email protected]> Co-authored-by: BanTanger <[email protected]> Co-authored-by: withchao <[email protected]> Co-authored-by: Alan <[email protected]>
1 parent 4cacc3f commit ce33b79

File tree

47 files changed

+616
-2889
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+616
-2889
lines changed

Diff for: config/config.yaml

+23-40
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ mysql:
3636
mongo:
3737
uri: #不为空则直接使用该值
3838
address: [ 127.0.0.1:37017 ] #单机时为mongo地址,使用分片集群时,为mongos地址
39-
database: openIM_v3 #mongo db 默认即可
39+
database: openIM_v3 #mongo db 默认即可
4040
username: root #用户名
4141
password: openIM123 #密码
4242
maxPoolSize: 100
@@ -56,14 +56,11 @@ kafka:
5656
topic: "offlineMsgToMongoMysql" #不建议修改
5757
msgToPush:
5858
topic: "msgToPush" #不建议修改
59-
msgToModify:
60-
topic: "msgToModify" #不建议修改
6159
consumerGroupID: #消费者组,不建议修改
6260
msgToRedis: redis #
6361
msgToMongo: mongo #
6462
msgToMySql: mysql #
6563
msgToPush: push #
66-
msgToModify: modify #
6764

6865

6966
rpc:
@@ -76,41 +73,26 @@ api:
7673
listenIP: #默认为0.0.0.0
7774

7875
object:
79-
enable: minio #使用minio
80-
apiURL: http://127.0.0.1:10002/third/object
76+
enable: "minio" #使用minio
77+
apiURL: "http://127.0.0.1:10002/object/"
8178
minio:
82-
tempBucket: "openim" #不建议修改
83-
dataBucket: "openim" #不建议修改
84-
location: us-east-1 #不建议修改
85-
endpoint: http://127.0.0.1:10005 #minio对外服务的ip和端口,app要能访问此ip和端口
86-
accessKeyID: root #ID
87-
secretAccessKey: openIM123 #秘钥
88-
isDistributedMod: false #是否分布式多硬盘部署,如果是多硬盘部署,需要修改为true
89-
tencent: #tencent cos
90-
appID:
91-
region:
92-
bucket:
93-
secretID:
94-
secretKey:
95-
ali: #ali oss
96-
regionID:
97-
accessKeyID:
98-
accessKeySecret:
99-
stsEndpoint:
100-
ossEndpoint:
101-
bucket:
102-
finalHost:
103-
stsDurationSeconds:
104-
OssRoleArn:
105-
aws:
106-
accessKeyID:
107-
accessKeySecret:
108-
region:
109-
bucket:
110-
finalHost:
111-
roleArn:
112-
externalId:
113-
roleSessionName:
79+
bucket: "openim" #不建议修改
80+
endpoint: "http://127.0.0.1:10005" #minio对外服务的ip和端口,app要能访问此ip和端口
81+
accessKeyID: "root" #ID
82+
secretAccessKey: "openIM123" #秘钥
83+
sessionToken: "" #token
84+
cos: #tencent cos
85+
bucketURL: "https://temp-1252357374.cos.ap-chengdu.myqcloud.com"
86+
secretID: ""
87+
secretKey: ""
88+
sessionToken: ""
89+
oss: #ali oss
90+
endpoint: "https://oss-cn-chengdu.aliyuncs.com"
91+
bucket: "demo-9999999"
92+
bucketURL: "https://demo-9999999.oss-cn-chengdu.aliyuncs.com"
93+
accessKeyID: ""
94+
accessKeySecret: ""
95+
sessionToken: ""
11496

11597
rpcPort: #rpc服务端口,不建议修改,端口由脚本读取后传入程序,如启动多个程序,只需要填入多个端口,用逗号隔开,如 [10110, 10111]
11698
openImUserPort: [ 10110 ]
@@ -135,7 +117,7 @@ rpcRegisterName: #rpc注册服务名,不建议修改
135117
openImThirdName: Third
136118

137119
log:
138-
storageLocation: ../../../../../logs/ #TODO: 存放目录
120+
storageLocation: ../logs/ #存放目录
139121
rotationTime: 24 #日志旋转时间
140122
remainRotationCount: 2 #日志数量
141123
remainLogLevel: 6 #日志级别 6表示全都打印,
@@ -182,7 +164,8 @@ groupMessageHasReadReceiptEnable: true #群聊已读是否开
182164
singleMessageHasReadReceiptEnable: true #单聊已读是否开启
183165

184166
retainChatRecords: 365 #mongo保存离线消息时间(天)
185-
chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期(超过retainChatRecords时间)消息
167+
chatRecordsClearTime: "0 2 * * 3" #每周三凌晨2点清理mongo中的过期(超过retainChatRecords时间)消息,这个删除是为了清理满足上个配置retainChatRecords的过期消息,不会发送通知,仅仅作为清理磁盘使用
168+
msgDestructTime: "0 2 * * *" #消息自动删除时间,每天凌晨2点删除过期消息,这个删除是为了删除保留时间超过超过会话字段msg_destruct_time(秒)的消息。
186169

187170
secret: tuoyun #秘钥,获取token时校验
188171

Diff for: internal/msgtransfer/init.go

+9-23
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ import (
55
"sync"
66
"time"
77

8-
"google.golang.org/grpc"
9-
"google.golang.org/grpc/credentials/insecure"
10-
118
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
129
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
1310
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -19,6 +16,8 @@ import (
1916
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
2017
openKeeper "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry/zookeeper"
2118
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
19+
"google.golang.org/grpc"
20+
"google.golang.org/grpc/credentials/insecure"
2221
)
2322

2423
type MsgTransfer struct {
@@ -47,18 +46,9 @@ func StartTransfer(prometheusPort int) error {
4746
if err := mongo.CreateMsgIndex(); err != nil {
4847
return err
4948
}
50-
client, err := openKeeper.NewClient(
51-
config.Config.Zookeeper.ZkAddr,
52-
config.Config.Zookeeper.Schema,
53-
openKeeper.WithFreq(
54-
time.Hour,
55-
),
56-
openKeeper.WithRoundRobin(),
57-
openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
58-
config.Config.Zookeeper.Password),
59-
openKeeper.WithTimeout(10),
60-
openKeeper.WithLogger(log.NewZkLogger()),
61-
)
49+
client, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
50+
openKeeper.WithFreq(time.Hour), openKeeper.WithRoundRobin(), openKeeper.WithUserNameAndPassword(config.Config.Zookeeper.Username,
51+
config.Config.Zookeeper.Password), openKeeper.WithTimeout(10), openKeeper.WithLogger(log.NewZkLogger()))
6252
if err != nil {
6353
return err
6454
}
@@ -68,9 +58,8 @@ func StartTransfer(prometheusPort int) error {
6858
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
6959
msgModel := cache.NewMsgCacheModel(rdb)
7060
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
71-
msgMysModel := relation.NewChatLogGorm(db)
72-
chatLogDatabase := controller.NewChatLogDatabase(msgMysModel)
73-
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel, msgMysModel)
61+
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
62+
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
7463
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
7564
groupRpcClient := rpcclient.NewGroupRpcClient(client)
7665
msgTransfer := NewMsgTransfer(chatLogDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient)
@@ -81,11 +70,8 @@ func StartTransfer(prometheusPort int) error {
8170
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
8271
msgDatabase controller.CommonMsgDatabase,
8372
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer {
84-
return &MsgTransfer{
85-
persistentCH: NewPersistentConsumerHandler(chatLogDatabase),
86-
historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
87-
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase),
88-
}
73+
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
74+
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase)}
8975
}
9076

9177
func (m *MsgTransfer) initPrometheus() {

Diff for: internal/push/push_to_client.go

+14-89
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,9 @@ type Pusher struct {
3838

3939
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
4040

41-
func NewPusher(
42-
discov discoveryregistry.SvcDiscoveryRegistry,
43-
offlinePusher offlinepush.OfflinePusher,
44-
database controller.PushDatabase,
45-
groupLocalCache *localcache.GroupLocalCache,
46-
conversationLocalCache *localcache.ConversationLocalCache,
47-
conversationRpcClient *rpcclient.ConversationRpcClient,
48-
groupRpcClient *rpcclient.GroupRpcClient,
49-
msgRpcClient *rpcclient.MessageRpcClient,
50-
) *Pusher {
41+
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
42+
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache,
43+
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher {
5144
return &Pusher{
5245
discov: discov,
5346
database: database,
@@ -94,18 +87,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
9487
return err
9588
}
9689
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
97-
log.ZDebug(
98-
ctx,
99-
"push_result",
100-
"ws push result",
101-
wsResults,
102-
"sendData",
103-
msg,
104-
"isOfflinePush",
105-
isOfflinePush,
106-
"push_to_userID",
107-
userIDs,
108-
)
90+
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
10991
p.successCount++
11092
for _, userID := range userIDs {
11193
if isOfflinePush && userID != msg.SendID {
@@ -156,15 +138,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
156138
}
157139
defer func(groupID string, userIDs []string) {
158140
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
159-
log.ZError(
160-
ctx,
161-
"MemberQuitNotification DeleteMemberAndSetConversationSeq",
162-
err,
163-
"groupID",
164-
groupID,
165-
"userIDs",
166-
userIDs,
167-
)
141+
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
168142
}
169143
}(groupID, []string{tips.QuitUser.UserID})
170144
pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID)
@@ -173,21 +147,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
173147
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
174148
return err
175149
}
176-
kickedUsers := utils.Slice(
177-
tips.KickedUserList,
178-
func(e *sdkws.GroupMemberFullInfo) string { return e.UserID },
179-
)
150+
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
180151
defer func(groupID string, userIDs []string) {
181152
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
182-
log.ZError(
183-
ctx,
184-
"MemberKickedNotification DeleteMemberAndSetConversationSeq",
185-
err,
186-
"groupID",
187-
groupID,
188-
"userIDs",
189-
userIDs,
190-
)
153+
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
191154
}
192155
}(groupID, kickedUsers)
193156
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
@@ -197,16 +160,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
197160
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
198161
return err
199162
}
200-
log.ZInfo(
201-
ctx,
202-
"GroupDismissedNotificationInfo****",
203-
"groupID",
204-
groupID,
205-
"num",
206-
len(pushToUserIDs),
207-
"list",
208-
pushToUserIDs,
209-
)
163+
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
210164
if len(config.Config.Manager.UserID) > 0 {
211165
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
212166
}
@@ -270,35 +224,17 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
270224
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
271225
return err
272226
}
273-
_, err := p.GetConnsAndOnlinePush(
274-
ctx,
275-
msg,
276-
utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs),
277-
)
227+
_, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
278228
if err != nil {
279-
log.ZError(
280-
ctx,
281-
"offlinePushMsg failed",
282-
err,
283-
"groupID",
284-
groupID,
285-
"msg",
286-
msg,
287-
"userIDs",
288-
utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs),
289-
)
229+
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
290230
return err
291231
}
292232
}
293233
}
294234
return nil
295235
}
296236

297-
func (p *Pusher) GetConnsAndOnlinePush(
298-
ctx context.Context,
299-
msg *sdkws.MsgData,
300-
pushToUserIDs []string,
301-
) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
237+
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
302238
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
303239
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
304240
if err != nil {
@@ -307,10 +243,7 @@ func (p *Pusher) GetConnsAndOnlinePush(
307243
//Online push message
308244
for _, v := range conns {
309245
msgClient := msggateway.NewMsgGatewayClient(v)
310-
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(
311-
ctx,
312-
&msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs},
313-
)
246+
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
314247
if err != nil {
315248
continue
316249
}
@@ -323,12 +256,7 @@ func (p *Pusher) GetConnsAndOnlinePush(
323256
return wsResults, nil
324257
}
325258

326-
func (p *Pusher) offlinePushMsg(
327-
ctx context.Context,
328-
conversationID string,
329-
msg *sdkws.MsgData,
330-
offlinePushUserIDs []string,
331-
) error {
259+
func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
332260
title, content, opts, err := p.getOfflinePushInfos(conversationID, msg)
333261
if err != nil {
334262
return err
@@ -362,10 +290,7 @@ func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts,
362290
return opts, nil
363291
}
364292

365-
func (p *Pusher) getOfflinePushInfos(
366-
conversationID string,
367-
msg *sdkws.MsgData,
368-
) (title, content string, opts *offlinepush.Opts, err error) {
293+
func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
369294
if p.offlinePusher == nil {
370295
err = errNoOfflinePusher
371296
return

0 commit comments

Comments
 (0)