@@ -20,7 +20,6 @@ import (
20
20
"context"
21
21
"errors"
22
22
"net"
23
- "runtime/debug"
24
23
"sync"
25
24
"sync/atomic"
26
25
@@ -34,7 +33,7 @@ type Channel interface {
34
33
ID () int64
35
34
36
35
// Write a message through the Pipeline
37
- Write (Message ) bool
36
+ Write (Message ) error
38
37
39
38
// Trigger user event
40
39
Trigger (event Event )
@@ -75,43 +74,46 @@ type Channel interface {
75
74
76
75
// NewChannel create a ChannelFactory
77
76
func NewChannel (capacity int ) ChannelFactory {
78
- return func (id int64 , ctx context.Context , pipeline Pipeline , transport transport.Transport ) Channel {
79
- return newChannelWith (ctx , pipeline , transport , id , capacity )
80
- }
81
- }
82
-
83
- // NewBufferedChannel create a ChannelFactory with buffered transport
84
- func NewBufferedChannel (capacity int , sizeRead int ) ChannelFactory {
85
- return func (id int64 , ctx context.Context , pipeline Pipeline , tran transport.Transport ) Channel {
86
- tran = transport .BufferedTransport (tran , sizeRead )
87
- return newChannelWith (ctx , pipeline , tran , id , capacity )
77
+ return func (id int64 , ctx context.Context , pipeline Pipeline , transport transport.Transport , executor Executor ) Channel {
78
+ return newChannelWith (ctx , pipeline , transport , executor , id , capacity )
88
79
}
89
80
}
90
81
91
82
// newChannelWith internal method for NewChannel & NewBufferedChannel
92
- func newChannelWith (ctx context.Context , pipeline Pipeline , transport transport.Transport , id int64 , capacity int ) Channel {
83
+ func newChannelWith (ctx context.Context , pipeline Pipeline , transport transport.Transport , executor Executor , id int64 , capacity int ) Channel {
93
84
childCtx , cancel := context .WithCancel (ctx )
94
85
return & channel {
95
- id : id ,
96
- ctx : childCtx ,
97
- cancel : cancel ,
98
- pipeline : pipeline ,
99
- transport : transport ,
100
- sendQueue : make (chan [][]byte , capacity ),
86
+ id : id ,
87
+ ctx : childCtx ,
88
+ cancel : cancel ,
89
+ pipeline : pipeline ,
90
+ transport : transport ,
91
+ executor : executor ,
92
+ sendQueue : utils .NewRingBuffer (uint64 (capacity )),
93
+ writeBuffers : make (net.Buffers , 0 , capacity / 3 + 8 ),
94
+ writeIndexes : make ([]int , 0 , capacity / 3 + 8 ),
101
95
}
102
96
}
103
97
98
+ const idle = 0
99
+ const running = 1
100
+
104
101
// implement of Channel
105
102
type channel struct {
106
- id int64
107
- ctx context.Context
108
- cancel context.CancelFunc
109
- transport transport.Transport
110
- pipeline Pipeline
111
- attachment Attachment
112
- sendQueue chan [][]byte
113
- activeWait sync.WaitGroup
114
- closed int32
103
+ id int64
104
+ ctx context.Context
105
+ cancel context.CancelFunc
106
+ transport transport.Transport
107
+ executor Executor
108
+ pipeline Pipeline
109
+ attachment Attachment
110
+ sendQueue * utils.RingBuffer
111
+ writeBuffers net.Buffers
112
+ writeIndexes []int
113
+ activeWait sync.WaitGroup
114
+ closed int32
115
+ running int32
116
+ closeErr error
115
117
}
116
118
117
119
// ID get channel id
@@ -120,22 +122,22 @@ func (c *channel) ID() int64 {
120
122
}
121
123
122
124
// Write a message through the Pipeline
123
- func (c * channel ) Write (message Message ) bool {
124
-
125
- select {
126
- case <- c .ctx .Done ():
127
- return false
128
- default :
129
- c .invokeMethod (func () {
130
- c .pipeline .FireChannelWrite (message )
131
- })
132
- return true
125
+ func (c * channel ) Write (message Message ) error {
126
+ if ! c .IsActive () {
127
+ select {
128
+ case <- c .ctx .Done ():
129
+ return c .closeErr
130
+ }
133
131
}
132
+
133
+ c .invokeMethod (func () {
134
+ c .pipeline .FireChannelWrite (message )
135
+ })
136
+ return nil
134
137
}
135
138
136
139
// Trigger trigger event
137
140
func (c * channel ) Trigger (event Event ) {
138
-
139
141
c .invokeMethod (func () {
140
142
c .pipeline .FireChannelEvent (event )
141
143
})
@@ -144,27 +146,34 @@ func (c *channel) Trigger(event Event) {
144
146
// Close through the Pipeline
145
147
func (c * channel ) Close (err error ) {
146
148
if atomic .CompareAndSwapInt32 (& c .closed , 0 , 1 ) {
147
- c .cancel ()
149
+ c .closeErr = err
150
+ c .sendQueue .Dispose ()
148
151
c .transport .Close ()
152
+ c .cancel ()
149
153
150
154
c .invokeMethod (func () {
151
- c .pipeline .FireChannelInactive (AsException ( err , debug . Stack ()) )
155
+ c .pipeline .FireChannelInactive (err )
152
156
})
153
157
}
154
158
}
155
159
156
160
// Writev to write [][]byte for optimize syscall
157
161
func (c * channel ) Writev (p [][]byte ) (n int64 , err error ) {
158
-
159
- select {
160
- case <- c .ctx .Done ():
161
- return 0 , errors .New ("broken pipe" )
162
- case c .sendQueue <- p :
163
- for _ , d := range p {
164
- n += int64 (len (d ))
162
+ if ! c .IsActive () {
163
+ select {
164
+ case <- c .ctx .Done ():
165
+ return 0 , c .closeErr
165
166
}
166
- return
167
167
}
168
+ // put packet to send queue
169
+ if err = c .sendQueue .Put (p ); nil != err {
170
+ return 0 , err
171
+ }
172
+ // try send
173
+ if atomic .CompareAndSwapInt32 (& c .running , idle , running ) {
174
+ c .executor .Exec (c .writeOnce )
175
+ }
176
+ return utils .CountOf (p ), nil
168
177
}
169
178
170
179
// IsActive return true if the Channel is active and so connected
@@ -210,20 +219,19 @@ func (c *channel) Context() context.Context {
210
219
// serveChannel start write & read routines
211
220
func (c * channel ) serveChannel () {
212
221
c .activeWait .Add (1 )
213
- go c .readLoop ()
214
- go c .writeLoop ()
222
+ c .executor .Exec (c .readLoop )
215
223
c .activeWait .Wait ()
216
224
}
217
225
218
226
func (c * channel ) invokeMethod (fn func ()) {
219
227
220
228
defer func () {
221
229
if err := recover (); nil != err && 0 == atomic .LoadInt32 (& c .closed ) {
222
- c .pipeline .FireChannelException (AsException (err , debug . Stack () ))
230
+ c .pipeline .FireChannelException (AsException (err ))
223
231
224
232
if e , ok := err .(error ); ok {
225
233
var ne net.Error
226
- if errors .As (e , & ne ) && ! ne .Temporary () {
234
+ if errors .As (e , & ne ) && ! ne .Timeout () {
227
235
c .Close (e )
228
236
}
229
237
}
@@ -237,11 +245,7 @@ func (c *channel) invokeMethod(fn func()) {
237
245
func (c * channel ) readLoop () {
238
246
239
247
defer func () {
240
- if err := recover (); nil != err {
241
- c .Close (AsException (err , debug .Stack ()))
242
- } else {
243
- c .Close (nil )
244
- }
248
+ c .Close (AsException (recover ()))
245
249
}()
246
250
247
251
func () {
@@ -261,56 +265,56 @@ func (c *channel) readLoop() {
261
265
}
262
266
}
263
267
264
- // writeLoop sending message of channel
265
- func (c * channel ) writeLoop () {
268
+ // writeOnce sending messages of channel
269
+ func (c * channel ) writeOnce () {
266
270
267
271
defer func () {
268
272
if err := recover (); nil != err {
269
- c .Close (AsException (err , debug .Stack ()))
270
- } else {
271
- c .Close (nil )
273
+ c .Close (AsException (err ))
272
274
}
273
275
}()
274
276
275
- var bufferCap = cap (c .sendQueue )
276
- var buffers = make (net.Buffers , 0 , bufferCap )
277
- var indexes = make ([]int , 0 , bufferCap )
277
+ for {
278
+ // reuse buffer.
279
+ sendBuffers := c .writeBuffers [:0 ]
280
+ sendIndexes := c .writeIndexes [:0 ]
281
+
282
+ // more packet will be merged
283
+ for c .sendQueue .Len () > 0 && len (sendBuffers ) < cap (sendBuffers ) {
284
+ // poll packet
285
+ item , err := c .sendQueue .Poll (- 1 )
286
+ if nil != err {
287
+ break
288
+ }
278
289
279
- // Try to combine packet sending to optimize sending performance
280
- sendWithWritev := func (data [][]byte , queue <- chan [][]byte ) (int64 , error ) {
290
+ // combine send bytes to reduce syscall.
291
+ pkts := item .([][]byte )
292
+ sendBuffers = append (sendBuffers , pkts ... )
293
+ sendIndexes = append (sendIndexes , len (sendBuffers ))
294
+ }
281
295
282
- // reuse buffer.
283
- sendBuffers := buffers [:0 ]
284
- sendIndexes := indexes [:0 ]
285
-
286
- // append first packet.
287
- sendBuffers = append (sendBuffers , data ... )
288
- sendIndexes = append (sendIndexes , len (sendBuffers ))
289
-
290
- // more packet will be merged.
291
- for {
292
- select {
293
- case data := <- queue :
294
- sendBuffers = append (sendBuffers , data ... )
295
- sendIndexes = append (sendIndexes , len (sendBuffers ))
296
- if len (sendIndexes ) >= bufferCap {
297
- return c .transport .Writev (transport.Buffers {Buffers : sendBuffers , Indexes : sendIndexes })
296
+ if len (sendBuffers ) > 0 {
297
+ utils .AssertLong (c .transport .Writev (transport.Buffers {Buffers : sendBuffers , Indexes : sendIndexes }))
298
+ utils .Assert (c .transport .Flush ())
299
+
300
+ // clear buffer ref
301
+ for index := range sendBuffers {
302
+ sendBuffers [index ] = nil // avoid memory leak
303
+ if index < len (sendIndexes ) {
304
+ sendIndexes [index ] = - 1 // for safety
298
305
}
299
- default :
300
- return c .transport .Writev (transport.Buffers {Buffers : sendBuffers , Indexes : sendIndexes })
301
306
}
302
307
}
303
- }
304
308
305
- for {
306
- select {
307
- case buf := <- c .sendQueue :
308
- // combine send bytes to reduce syscall.
309
- utils .AssertLong (sendWithWritev (buf , c .sendQueue ))
310
- // flush buffer
311
- utils .Assert (c .transport .Flush ())
312
- case <- c .ctx .Done ():
313
- return
309
+ // double check
310
+ atomic .StoreInt32 (& c .running , idle )
311
+ if size := c .sendQueue .Len (); size > 0 {
312
+ if atomic .CompareAndSwapInt32 (& c .running , idle , running ) {
313
+ continue
314
+ }
314
315
}
316
+
317
+ // no packets to send
318
+ break
315
319
}
316
320
}
0 commit comments