Skip to content

Commit ac70b1d

Browse files
committed
Merge branch 'tuoyun'
2 parents 1f45b64 + d720082 commit ac70b1d

File tree

31 files changed

+2836
-518
lines changed

31 files changed

+2836
-518
lines changed

cmd/Open-IM-SDK-Core

cmd/open_im_api/main.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,29 @@ import (
1010
"Open_IM/internal/api/office"
1111
apiThird "Open_IM/internal/api/third"
1212
"Open_IM/internal/api/user"
13-
"Open_IM/pkg/common/constant"
13+
"Open_IM/pkg/common/config"
1414
"Open_IM/pkg/common/log"
1515
"Open_IM/pkg/utils"
1616
"flag"
17+
"io"
18+
"os"
1719
"strconv"
1820

1921
"github.com/gin-gonic/gin"
2022
//"syscall"
23+
"Open_IM/pkg/common/constant"
2124
)
2225

2326
func main() {
27+
log.NewPrivateLog(constant.LogFileName)
2428
gin.SetMode(gin.ReleaseMode)
29+
f, _ := os.Create("../logs/api.log")
30+
gin.DefaultWriter = io.MultiWriter(f)
31+
2532
r := gin.Default()
2633
r.Use(utils.CorsHandler())
34+
35+
log.Info("load config: ", config.Config)
2736
// user routing group, which handles user registration and login services
2837
userRouterGroup := r.Group("/user")
2938
{
@@ -126,8 +135,7 @@ func main() {
126135
officeGroup.POST("/send_msg_to_tag", office.SendMsg2Tag)
127136
officeGroup.POST("/get_send_tag_log", office.GetTagSendLogs)
128137
}
129-
apiThird.MinioInit()
130-
log.NewPrivateLog(constant.LogFileName)
138+
go apiThird.MinioInit()
131139
ginPort := flag.Int("port", 10000, "get ginServerPort from cmd,default 10000 as port")
132140
flag.Parse()
133141
r.Run(":" + strconv.Itoa(*ginPort))

cmd/open_im_demo/main.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,22 @@ package main
22

33
import (
44
"Open_IM/internal/demo/register"
5-
"Open_IM/pkg/common/constant"
6-
"Open_IM/pkg/common/log"
75
"Open_IM/pkg/utils"
86
"flag"
7+
"io"
8+
"os"
99
"strconv"
1010

11+
"Open_IM/pkg/common/constant"
12+
"Open_IM/pkg/common/log"
1113
"github.com/gin-gonic/gin"
1214
)
1315

1416
func main() {
17+
log.NewPrivateLog(constant.LogFileName)
18+
gin.SetMode(gin.ReleaseMode)
19+
f, _ := os.Create("../logs/api.log")
20+
gin.DefaultWriter = io.MultiWriter(f)
1521

1622
r := gin.Default()
1723
r.Use(utils.CorsHandler())
@@ -24,7 +30,7 @@ func main() {
2430
authRouterGroup.POST("/login", register.Login)
2531
authRouterGroup.POST("/reset_password", register.ResetPassword)
2632
}
27-
log.NewPrivateLog(constant.LogFileName)
33+
2834
ginPort := flag.Int("port", 42233, "get ginServerPort from cmd,default 42233 as port")
2935
flag.Parse()
3036
r.Run(":" + strconv.Itoa(*ginPort))

cmd/open_im_msg_gateway/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package main
22

33
import (
44
"Open_IM/internal/msg_gateway/gate"
5+
"Open_IM/pkg/common/constant"
6+
"Open_IM/pkg/common/log"
57
"flag"
68
"sync"
79
)
810

911
func main() {
12+
log.NewPrivateLog(constant.LogFileName)
1013
rpcPort := flag.Int("rpc_port", 10400, "rpc listening port")
1114
wsPort := flag.Int("ws_port", 17778, "ws listening port")
1215
flag.Parse()

cmd/open_im_msg_transfer/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package main
22

33
import (
44
"Open_IM/internal/msg_transfer/logic"
5+
"Open_IM/pkg/common/constant"
6+
"Open_IM/pkg/common/log"
57
"sync"
68
)
79

810
func main() {
911
var wg sync.WaitGroup
1012
wg.Add(1)
13+
log.NewPrivateLog(constant.LogFileName)
1114
logic.Init()
1215
logic.Run()
1316
wg.Wait()

cmd/open_im_push/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package main
22

33
import (
44
"Open_IM/internal/push/logic"
5+
"Open_IM/pkg/common/constant"
6+
"Open_IM/pkg/common/log"
57
"flag"
68
"sync"
79
)
@@ -11,6 +13,7 @@ func main() {
1113
flag.Parse()
1214
var wg sync.WaitGroup
1315
wg.Add(1)
16+
log.NewPrivateLog(constant.LogFileName)
1417
logic.Init(*rpcPort)
1518
logic.Run()
1619
wg.Wait()

cmd/open_im_timer_task/main.go

+22-25
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ package main
22

33
import (
44
"Open_IM/pkg/common/constant"
5-
commonDB "Open_IM/pkg/common/db"
6-
"Open_IM/pkg/common/db/mysql_model/im_mysql_model"
75
"Open_IM/pkg/common/log"
8-
"time"
96
)
107

118
func main() {
@@ -40,27 +37,27 @@ func main() {
4037
// time.Sleep(time.Duration(sleepTime) * time.Second)
4138
// }
4239
//}
43-
for {
44-
uidList, err := im_mysql_model.SelectAllUserID()
45-
if err != nil {
46-
//log.NewError("999999", err.Error())
47-
} else {
48-
for _, v := range uidList {
49-
minSeq, err := commonDB.DB.GetMinSeqFromMongo(v)
50-
if err != nil {
51-
//log.NewError("999999", "get user minSeq err", err.Error(), v)
52-
continue
53-
} else {
54-
err := commonDB.DB.SetUserMinSeq(v, minSeq)
55-
if err != nil {
56-
//log.NewError("999999", "set user minSeq err", err.Error(), v)
57-
}
58-
}
59-
time.Sleep(time.Duration(100) * time.Millisecond)
60-
}
61-
62-
}
63-
64-
}
40+
//for {
41+
// uidList, err := im_mysql_model.SelectAllUserID()
42+
// if err != nil {
43+
// //log.NewError("999999", err.Error())
44+
// } else {
45+
// for _, v := range uidList {
46+
// minSeq, err := commonDB.DB.GetMinSeqFromMongo(v)
47+
// if err != nil {
48+
// //log.NewError("999999", "get user minSeq err", err.Error(), v)
49+
// continue
50+
// } else {
51+
// err := commonDB.DB.SetUserMinSeq(v, minSeq)
52+
// if err != nil {
53+
// //log.NewError("999999", "set user minSeq err", err.Error(), v)
54+
// }
55+
// }
56+
// time.Sleep(time.Duration(100) * time.Millisecond)
57+
// }
58+
//
59+
// }
60+
//
61+
//}
6562

6663
}

config/config.yaml

+3-2
Original file line numberDiff line numberDiff line change
@@ -588,5 +588,6 @@ demo:
588588
smtpAddr: "smtp.qq.com"
589589
smtpPort: 25 #需开放此端口 出口方向
590590

591-
592-
591+
rtc:
592+
port: 11300
593+
address: 127.0.0.1

internal/api/auth/auth.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,16 @@ func UserRegister(c *gin.Context) {
3535
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImAuthName)
3636
client := rpc.NewAuthClient(etcdConn)
3737
reply, err := client.UserRegister(context.Background(), req)
38-
if err != nil || reply.CommonResp.ErrCode != 0 {
39-
log.NewError(req.OperationID, "UserRegister failed ", err, reply.CommonResp.ErrCode)
38+
if err != nil {
39+
log.NewError(req.OperationID, "call rpc err ", err)
40+
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "internal service err"})
41+
return
42+
}
43+
if reply.CommonResp.ErrCode != 0 {
44+
log.NewError(req.OperationID, "UserRegister failed ", err)
4045
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": reply.CommonResp.ErrMsg})
4146
return
47+
4248
}
4349

4450
pbDataToken := &rpc.UserTokenReq{Platform: params.Platform, FromUserID: params.UserID, OperationID: params.OperationID}

internal/api/third/minio_init.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,21 @@ var (
1515
)
1616

1717
func MinioInit() {
18-
log.NewInfo("", utils.GetSelfFuncName())
18+
operationID := utils.OperationIDGenerator()
19+
log.NewInfo(operationID, utils.GetSelfFuncName(), "minio config: ", config.Config.Credential.Minio)
1920
minioUrl, err := url2.Parse(config.Config.Credential.Minio.Endpoint)
2021
if err != nil {
21-
log.NewError("", utils.GetSelfFuncName(), "parse failed, please check config/config.yaml", err.Error())
22+
log.NewError(operationID, utils.GetSelfFuncName(), "parse failed, please check config/config.yaml", err.Error())
2223
return
2324
}
25+
log.NewInfo(operationID, utils.GetSelfFuncName(), "Parse ok ", config.Config.Credential.Minio)
2426
minioClient, err = minio.New(minioUrl.Host, &minio.Options{
2527
Creds: credentials.NewStaticV4(config.Config.Credential.Minio.AccessKeyID, config.Config.Credential.Minio.SecretAccessKey, ""),
2628
Secure: false,
2729
})
30+
log.NewInfo(operationID, utils.GetSelfFuncName(), "new ok ", config.Config.Credential.Minio)
2831
if err != nil {
29-
log.NewError("", utils.GetSelfFuncName(), "init minio client failed", err.Error())
32+
log.NewError(operationID, utils.GetSelfFuncName(), "init minio client failed", err.Error())
3033
return
3134
}
3235
opt := minio.MakeBucketOptions{
@@ -35,15 +38,15 @@ func MinioInit() {
3538
}
3639
err = minioClient.MakeBucket(context.Background(), config.Config.Credential.Minio.Bucket, opt)
3740
if err != nil {
38-
log.NewInfo("", utils.GetSelfFuncName(), err.Error())
41+
log.NewError(operationID, utils.GetSelfFuncName(), "MakeBucket failed ", err.Error())
3942
exists, err := minioClient.BucketExists(context.Background(), config.Config.Credential.Minio.Bucket)
4043
if err == nil && exists {
41-
log.NewInfo("", utils.GetSelfFuncName(), "We already own %s\n", config.Config.Credential.Minio.Bucket)
44+
log.NewWarn(operationID, utils.GetSelfFuncName(), "We already own ", config.Config.Credential.Minio.Bucket)
4245
} else {
4346
if err != nil {
44-
log.NewError("", utils.GetSelfFuncName(), err.Error())
47+
log.NewError(operationID, utils.GetSelfFuncName(), err.Error())
4548
}
46-
log.NewError("", utils.GetSelfFuncName(), "create bucket failed and bucket not exists")
49+
log.NewError(operationID, utils.GetSelfFuncName(), "create bucket failed and bucket not exists")
4750
return
4851
}
4952
}
@@ -53,5 +56,5 @@ func MinioInit() {
5356
// log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in web", err.Error())
5457
// return
5558
//}
56-
log.NewInfo("", utils.GetSelfFuncName(), "minio create and set policy success")
59+
log.NewInfo(operationID, utils.GetSelfFuncName(), "minio create and set policy success")
5760
}

internal/msg_gateway/gate/init.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ package gate
22

33
import (
44
"Open_IM/pkg/common/config"
5-
"Open_IM/pkg/common/constant"
6-
"Open_IM/pkg/common/log"
5+
76
"Open_IM/pkg/statistics"
87
"fmt"
98
"github.com/go-playground/validator/v10"
@@ -21,7 +20,7 @@ var (
2120

2221
func Init(rpcPort, wsPort int) {
2322
//log initialization
24-
log.NewPrivateLog(constant.LogFileName)
23+
2524
rwLock = new(sync.RWMutex)
2625
validate = validator.New()
2726
statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 300)

internal/msg_gateway/gate/logic.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ import (
66
"Open_IM/pkg/common/log"
77
"Open_IM/pkg/grpc-etcdv3/getcdv3"
88
pbChat "Open_IM/pkg/proto/chat"
9+
pbRtc "Open_IM/pkg/proto/rtc"
910
sdk_ws "Open_IM/pkg/proto/sdk_ws"
11+
"Open_IM/pkg/utils"
1012
"bytes"
1113
"context"
1214
"encoding/gob"
1315
"github.com/golang/protobuf/proto"
1416
"github.com/gorilla/websocket"
17+
"google.golang.org/grpc"
1518
"runtime"
19+
"strconv"
1620
"strings"
1721
)
1822

@@ -200,38 +204,61 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) {
200204
nReply := new(pbChat.SendMsgResp)
201205
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg)
202206
if isPass {
203-
isPass2, errCode2, errMsg2, signalResp, msgData := ws.signalMessageAssemble(pData.(*sdk_ws.SignalReq), m.OperationID)
204-
if isPass2 {
207+
signalResp := pbRtc.SignalResp{}
208+
//isPass2, errCode2, errMsg2, signalResp, msgData := ws.signalMessageAssemble(pData.(*sdk_ws.SignalReq), m.OperationID)
209+
connGrpc, err := grpc.Dial(config.Config.Rtc.Address+":"+strconv.Itoa(config.Config.Rtc.Port), grpc.WithInsecure())
210+
if err != nil {
211+
log.NewError(m.OperationID, utils.GetSelfFuncName(), "grpc.Dial failed", err.Error())
212+
ws.sendSignalMsgResp(conn, 204, "create grpc failed"+err.Error(), m, nil)
213+
return
214+
}
215+
rtcClient := pbRtc.NewRtcServiceClient(connGrpc)
216+
req := &pbRtc.SignalMessageAssembleReq{
217+
SignalReq: pData.(*pbRtc.SignalReq),
218+
OperationID: m.OperationID,
219+
}
220+
respPb, err := rtcClient.SignalMessageAssemble(context.Background(), req)
221+
if err != nil {
222+
log.NewError(m.OperationID, utils.GetSelfFuncName(), "SignalMessageAssemble", err.Error(), config.Config.Rtc.Address+":"+strconv.Itoa(config.Config.Rtc.Port))
223+
ws.sendSignalMsgResp(conn, 204, "grpc SignalMessageAssemble failed: "+err.Error(), m, &signalResp)
224+
return
225+
}
226+
signalResp.Payload = respPb.SignalResp.Payload
227+
msgData := sdk_ws.MsgData{}
228+
utils.CopyStructFields(&msgData, respPb.MsgData)
229+
log.NewInfo(m.OperationID, utils.GetSelfFuncName(), respPb.String())
230+
if respPb.IsPass {
205231
pbData := pbChat.SendMsgReq{
206232
Token: m.Token,
207233
OperationID: m.OperationID,
208-
MsgData: msgData,
234+
MsgData: &msgData,
209235
}
236+
log.NewInfo(m.OperationID, utils.GetSelfFuncName(), "pbData: ", pbData)
210237
log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq middle", m.ReqIdentifier, m.SendID, m.MsgIncr, msgData)
211238
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
212239
client := pbChat.NewChatClient(etcdConn)
213240
reply, err := client.SendMsg(context.Background(), &pbData)
214241
if err != nil {
215-
log.NewError(pbData.OperationID, "rpc sendMsg err", err.Error())
242+
log.NewError(pbData.OperationID, utils.GetSelfFuncName(), "rpc sendMsg err", err.Error())
216243
nReply.ErrCode = 200
217244
nReply.ErrMsg = err.Error()
218-
ws.sendSignalMsgResp(conn, 200, err.Error(), m, signalResp)
245+
ws.sendSignalMsgResp(conn, 200, err.Error(), m, &signalResp)
219246
} else {
220247
log.NewInfo(pbData.OperationID, "rpc call success to sendMsgReq", reply.String())
221-
ws.sendSignalMsgResp(conn, 0, "", m, signalResp)
248+
ws.sendSignalMsgResp(conn, 0, "", m, &signalResp)
222249
}
223250
} else {
224-
log.NewError(m.OperationID, isPass2, errCode2, errMsg2)
225-
ws.sendSignalMsgResp(conn, errCode2, errMsg2, m, signalResp)
251+
log.NewError(m.OperationID, utils.GetSelfFuncName(), respPb.IsPass, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg)
252+
ws.sendSignalMsgResp(conn, respPb.CommonResp.ErrCode, respPb.CommonResp.ErrMsg, m, &signalResp)
226253
}
227254
} else {
228255
ws.sendSignalMsgResp(conn, errCode, errMsg, m, nil)
229256
}
230257

231258
}
232-
func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *sdk_ws.SignalResp) {
259+
func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) {
233260
// := make(map[string]interface{})
234-
261+
log.Debug(m.OperationID, "SignalMsgResp is", pb.String())
235262
b, _ := proto.Marshal(pb)
236263
mReply := Resp{
237264
ReqIdentifier: m.ReqIdentifier,

0 commit comments

Comments
 (0)