Skip to content

Commit c2b5f15

Browse files
authored
add static func for encode message to byte (#39)
1 parent 4e809ac commit c2b5f15

File tree

7 files changed

+81
-5
lines changed

7 files changed

+81
-5
lines changed

agent/agent.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Agent struct {
2121
conn acceptor.AcceptorConn // the Conn
2222
agentLog *zap.Logger // logger
2323
chSend chan *bytebuffer.ByteBuffer // send message channel
24+
chSendByte chan []byte // send message byte channel
2425
chStopWrite chan struct{} // close message send
2526
chStopHeartbeat chan struct{} // close heartbeat
2627
chDie chan struct{} // wait for close
@@ -108,6 +109,10 @@ func (a *Agent) write() {
108109
return
109110
}
110111
bytebuffer.Put(msg)
112+
case msg := <-a.chSendByte:
113+
if _, err := a.conn.Write(msg); err != nil {
114+
return
115+
}
111116
case <-a.chStopWrite:
112117
return
113118
}
@@ -179,10 +184,8 @@ func (a *Agent) Send(in interface{}, name ...string) error {
179184
}
180185

181186
func (a *Agent) SendData(data []byte) {
182-
b := bytebuffer.Get()
183-
b.Set(data)
184187
select {
185-
case a.chSend <- b:
188+
case a.chSendByte <- data:
186189
case <-a.chDie:
187190
}
188191
}

agent/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (af *AgentFacotry) NewAgent(conn acceptor.AcceptorConn) *Agent {
6060
conn: conn,
6161
agentLog: gslog.NewLog("agent").With(zap.Int64("agent_id", agentId)),
6262
chSend: make(chan *bytebuffer.ByteBuffer, af.messagesBufferSize),
63+
chSendByte: make(chan []byte, af.messagesBufferSize),
6364
chStopWrite: make(chan struct{}),
6465
chStopHeartbeat: make(chan struct{}),
6566
chDie: make(chan struct{}),

app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func NewApp(config *config.Config) *App {
7171
MessageServer: appBuidler.messageServer,
7272
webServer: appBuidler.webServer,
7373
}
74+
message.DefaultMessageServer = app.MessageServer
7475

7576
//初始化便捷服务
7677
helper := newAppHelper(app)

message/static.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package message
2+
3+
var DefaultMessageServer *MessageServer
4+
5+
func EncodeMessage(in interface{}, name ...string) ([]byte, error) {
6+
packet, err := DefaultMessageServer.EncodeMessage(in, name...)
7+
if err != nil {
8+
return nil, err
9+
}
10+
return packet.ToByte(), nil
11+
}

packet/packet.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,32 @@ func (p *Packet) ToData() *bytebuffer.ByteBuffer {
8181
}()
8282

8383
b := bytebuffer.Get()
84-
if p.HeaderByte[0] == HeaderFlag {
84+
if len(p.HeaderByte) > 0 && p.HeaderByte[0] == HeaderFlag {
8585
_, _ = b.Write(p.HeaderByte)
8686
}
8787
_, _ = b.Write(p.Data)
8888

8989
return b
9090
}
9191

92+
func (p *Packet) ToByte() []byte {
93+
defer func() {
94+
PutPacket(p)
95+
}()
96+
97+
length := len(p.Data)
98+
if len(p.HeaderByte) > 0 && p.HeaderByte[0] == HeaderFlag {
99+
length += HeaderLength
100+
data := make([]byte, length)
101+
copy(data, p.HeaderByte)
102+
copy(data[HeaderLength:], p.Data)
103+
return data
104+
}
105+
data := make([]byte, length)
106+
copy(data, p.Data)
107+
return data
108+
}
109+
92110
// packetType: 2bit // 0x01: system, 0x02: service
93111
// module: 6bit
94112
// action: 16bit

packet/packet_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,26 @@ func TestPacket_ToData(t *testing.T) {
318318
B: []byte{0x7E, 0x0A, 0x41, 0x00, 0x02, 0x00, 0x00, 0x0F, 0x0A, 0x0D, 0x31, 0x36, 0x36, 0x30, 0x33, 0x36, 0x30, 0x39, 0x31, 0x35, 0x35, 0x35, 0x36},
319319
},
320320
},
321+
{
322+
name: "to data2",
323+
fields: fields{
324+
HeaderByte: []byte{},
325+
Data: []byte{0x0A, 0x0D, 0x31, 0x36, 0x36, 0x30, 0x33, 0x36, 0x30, 0x39, 0x31, 0x35, 0x35, 0x35, 0x36},
326+
},
327+
want: &bytebuffer.ByteBuffer{
328+
B: []byte{0x0A, 0x0D, 0x31, 0x36, 0x36, 0x30, 0x33, 0x36, 0x30, 0x39, 0x31, 0x35, 0x35, 0x35, 0x36},
329+
},
330+
},
331+
{
332+
name: "to data2",
333+
fields: fields{
334+
HeaderByte: []byte{0x20},
335+
Data: []byte{0x0A, 0x0D, 0x31, 0x36, 0x36, 0x30, 0x33, 0x36, 0x30, 0x39, 0x31, 0x35, 0x35, 0x35, 0x36},
336+
},
337+
want: &bytebuffer.ByteBuffer{
338+
B: []byte{0x0A, 0x0D, 0x31, 0x36, 0x36, 0x30, 0x33, 0x36, 0x30, 0x39, 0x31, 0x35, 0x35, 0x35, 0x36},
339+
},
340+
},
321341
}
322342
for _, tt := range tests {
323343
t.Run(tt.name, func(t *testing.T) {
@@ -329,6 +349,9 @@ func TestPacket_ToData(t *testing.T) {
329349
if got := p.ToData(); !reflect.DeepEqual(got, tt.want) {
330350
t.Errorf("Packet.ToData() = %v, want %v", got, tt.want)
331351
}
352+
if !reflect.DeepEqual(p.ToData().B, p.ToByte()) {
353+
t.Errorf("Packet.ToData() = %v, want %v", p.ToByte(), p.ToData().B)
354+
}
332355
})
333356
}
334357
}

session/static.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package session
22

3-
import "github.com/metagogs/gogs/utils/slicex"
3+
import (
4+
"github.com/metagogs/gogs/utils/slicex"
5+
)
46

57
var DefaultSessionPool SessionPool
68

@@ -21,6 +23,12 @@ func SendMessageByID(sessionId int64, in interface{}) {
2123
}
2224
}
2325

26+
func SendDataByID(sessionId int64, in []byte) {
27+
if sess, err := DefaultSessionPool.GetSessionByID(sessionId); err == nil {
28+
sess.SendData(in)
29+
}
30+
}
31+
2432
func ListSessions() []*Session {
2533
return DefaultSessionPool.ListSessions()
2634
}
@@ -37,3 +45,14 @@ func BroadcastMessage(users []string, send interface{}, filter *SessionFilter, e
3745
}
3846
}
3947
}
48+
49+
func BroadcastData(users []string, send []byte, filter *SessionFilter, exclude ...string) {
50+
for _, u := range users {
51+
if slicex.InSlice(u, exclude) {
52+
continue
53+
}
54+
if result, _ := GetSessionByUID(u, filter); len(result) > 0 {
55+
SendDataByID(result[0], send)
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)