Skip to content

Commit 49ca5c9

Browse files
withchaomo3etFGadvancericey-yu
authored
feat: msg queue push (#2434)
* fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * friend incr sync * friend incr sync * friend incr sync * friend incr sync * friend incr sync * mage * optimization version log * optimization version log * sync * sync * sync * group sync * sync option * sync option * refactor: replace `friend` package with `realtion`. * refactor: update lastest commit to relation. * sync option * sync option * sync option * sync * sync * go.mod * seq * update: go mod * refactor: change incremental to full * feat: get full friend user ids * feat: api and config * seq * group version * merge * seq * seq * seq * fix: sort by id avoid unstable sort friends. * group * group * group * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * user version * seq * seq * seq user * user online * implement minio expire delete. * user online * config * fix * fix * implement minio expire delete logic. * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * feat: implement scheduled delete outdated object in minio. * update gomake version * update gomake version * implement FindExpires pagination. * remove unnesseary incr. * fix uncorrect args call. * online push * online push * online push * resolving conflicts * resolving conflicts * test * api prommetrics * api prommetrics * api prommetrics * api prommetrics * api prommetrics * rpc prommetrics * rpc prommetrics * online status * online status * online status * online status * sub * conversation version incremental * merge seq * merge online * merge online * merge online * merge seq * GetOwnerConversation * fix: change incremental syncer router name. * rockscache batch get * rockscache seq batch get * fix: GetMsgDocModelByIndex bug * update go.mod * update go.mod * merge * feat: prometheus * feat: prometheus * group member sort * sub * sub * fix: seq conversion bug * fix: redis pipe exec * sort version * sort version * sort version * remove old version online subscription * remove old version online subscription * version log index * version log index * batch push * batch push --------- Co-authored-by: withchao <[email protected]> Co-authored-by: Monet Lee <[email protected]> Co-authored-by: OpenIM-Gordon <[email protected]> Co-authored-by: icey-yu <[email protected]>
1 parent 8087f70 commit 49ca5c9

File tree

3 files changed

+87
-43
lines changed

3 files changed

+87
-43
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
1414
github.com/mitchellh/mapstructure v1.5.0
1515
github.com/openimsdk/protocol v0.0.69-alpha.38
16-
github.com/openimsdk/tools v0.0.49-alpha.51
16+
github.com/openimsdk/tools v0.0.49-alpha.52
1717
github.com/pkg/errors v0.9.1 // indirect
1818
github.com/prometheus/client_golang v1.18.0
1919
github.com/stretchr/testify v1.9.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
321321
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
322322
github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
323323
github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
324-
github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
325-
github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
324+
github.com/openimsdk/tools v0.0.49-alpha.52 h1:NwAAtBO4BV96qG6Z0P2btGEqn4AI2DFgaHvLMXNHal0=
325+
github.com/openimsdk/tools v0.0.49-alpha.52/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
326326
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
327327
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
328328
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

internal/msggateway/hub_server.go

+84-40
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ import (
2222
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
2323
"github.com/openimsdk/protocol/constant"
2424
"github.com/openimsdk/protocol/msggateway"
25+
"github.com/openimsdk/protocol/sdkws"
2526
"github.com/openimsdk/tools/discovery"
2627
"github.com/openimsdk/tools/errs"
2728
"github.com/openimsdk/tools/log"
2829
"github.com/openimsdk/tools/mcontext"
30+
"github.com/openimsdk/tools/mq/memamq"
31+
"github.com/openimsdk/tools/utils/datautil"
2932
"google.golang.org/grpc"
33+
"sync/atomic"
3034
)
3135

3236
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -57,6 +61,7 @@ type Server struct {
5761
pushTerminal map[int]struct{}
5862
ready func(srv *Server) error
5963
userRcp rpcclient.UserRpcClient
64+
queue *memamq.MemoryQueue
6065
}
6166

6267
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
7075
pushTerminal: make(map[int]struct{}),
7176
config: conf,
7277
ready: ready,
78+
queue: memamq.NewMemoryQueue(512, 1024*16),
7379
}
7480
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
7581
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@@ -125,55 +131,93 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
125131
return nil, nil
126132
}
127133

128-
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
129-
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
130-
var singleUserResults []*msggateway.SingleMsgToUserResults
131-
for _, v := range req.PushToUserIDs {
132-
var resp []*msggateway.SingleMsgToUserPlatform
133-
results := &msggateway.SingleMsgToUserResults{
134-
UserID: v,
134+
func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
135+
clients, ok := s.LongConnServer.GetUserAllCons(userID)
136+
if !ok {
137+
log.ZDebug(ctx, "push user not online", "userID", userID)
138+
return &msggateway.SingleMsgToUserResults{
139+
UserID: userID,
135140
}
136-
clients, ok := s.LongConnServer.GetUserAllCons(v)
137-
if !ok {
138-
log.ZDebug(ctx, "push user not online", "userID", v)
139-
results.Resp = resp
140-
singleUserResults = append(singleUserResults, results)
141+
}
142+
log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
143+
result := &msggateway.SingleMsgToUserResults{
144+
UserID: userID,
145+
Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
146+
}
147+
for _, client := range clients {
148+
if client == nil {
141149
continue
142150
}
143-
144-
log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
145-
for _, client := range clients {
146-
if client == nil {
147-
continue
151+
userPlatform := &msggateway.SingleMsgToUserPlatform{
152+
RecvPlatFormID: int32(client.PlatformID),
153+
}
154+
if !client.IsBackground ||
155+
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
156+
err := client.PushMessage(ctx, msgData)
157+
if err != nil {
158+
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
159+
} else {
160+
if _, ok := s.pushTerminal[client.PlatformID]; ok {
161+
result.OnlinePush = true
162+
}
148163
}
164+
} else {
165+
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
166+
}
167+
result.Resp = append(result.Resp, userPlatform)
168+
}
169+
return result
170+
}
149171

150-
userPlatform := &msggateway.SingleMsgToUserPlatform{
151-
RecvPlatFormID: int32(client.PlatformID),
172+
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
173+
if len(req.PushToUserIDs) == 0 {
174+
return &msggateway.OnlineBatchPushOneMsgResp{}, nil
175+
}
176+
ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
177+
var count atomic.Int64
178+
count.Add(int64(len(req.PushToUserIDs)))
179+
for i := range req.PushToUserIDs {
180+
userID := req.PushToUserIDs[i]
181+
err := s.queue.PushCtx(ctx, func() {
182+
ch <- s.pushToUser(ctx, userID, req.MsgData)
183+
if count.Add(-1) == 0 {
184+
close(ch)
152185
}
153-
if !client.IsBackground ||
154-
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
155-
err := client.PushMessage(ctx, req.MsgData)
156-
if err != nil {
157-
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
158-
resp = append(resp, userPlatform)
159-
} else {
160-
if _, ok := s.pushTerminal[client.PlatformID]; ok {
161-
results.OnlinePush = true
162-
resp = append(resp, userPlatform)
163-
}
164-
}
165-
} else {
166-
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
167-
resp = append(resp, userPlatform)
186+
})
187+
if err != nil {
188+
if count.Add(-1) == 0 {
189+
close(ch)
190+
}
191+
log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
192+
ch <- &msggateway.SingleMsgToUserResults{
193+
UserID: userID,
168194
}
169195
}
170-
results.Resp = resp
171-
singleUserResults = append(singleUserResults, results)
172196
}
173-
174-
return &msggateway.OnlineBatchPushOneMsgResp{
175-
SinglePushResult: singleUserResults,
176-
}, nil
197+
resp := &msggateway.OnlineBatchPushOneMsgResp{
198+
SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
199+
}
200+
for {
201+
select {
202+
case <-ctx.Done():
203+
log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
204+
userIDSet := datautil.SliceSet(req.PushToUserIDs)
205+
for _, results := range resp.SinglePushResult {
206+
delete(userIDSet, results.UserID)
207+
}
208+
for userID := range userIDSet {
209+
resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
210+
UserID: userID,
211+
})
212+
}
213+
return resp, nil
214+
case res, ok := <-ch:
215+
if !ok {
216+
return resp, nil
217+
}
218+
resp.SinglePushResult = append(resp.SinglePushResult, res)
219+
}
220+
}
177221
}
178222

179223
func (s *Server) KickUserOffline(

0 commit comments

Comments
 (0)