Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ func (c *connection) ReadByte() (b byte, err error) {

// Malloc implements Connection.
func (c *connection) Malloc(n int) (buf []byte, err error) {
if !c.IsActive() {
return nil, Exception(ErrConnClosed, "when malloc")
}
return c.outputBuffer.Malloc(n)
}

Expand Down Expand Up @@ -273,31 +276,49 @@ func (c *connection) Flush() error {

// MallocAck implements Connection.
func (c *connection) MallocAck(n int) (err error) {
if !c.IsActive() {
return Exception(ErrConnClosed, "when malloc ack")
}
return c.outputBuffer.MallocAck(n)
}

// Append implements Connection.
func (c *connection) Append(w Writer) (err error) {
if !c.IsActive() {
return Exception(ErrConnClosed, "when append")
}
return c.outputBuffer.Append(w)
}

// WriteString implements Connection.
func (c *connection) WriteString(s string) (n int, err error) {
if !c.IsActive() {
return 0, Exception(ErrConnClosed, "when write string")
}
return c.outputBuffer.WriteString(s)
}

// WriteBinary implements Connection.
func (c *connection) WriteBinary(b []byte) (n int, err error) {
if !c.IsActive() {
return 0, Exception(ErrConnClosed, "when write binary")
}
return c.outputBuffer.WriteBinary(b)
}

// WriteDirect implements Connection.
func (c *connection) WriteDirect(p []byte, remainCap int) (err error) {
if !c.IsActive() {
return Exception(ErrConnClosed, "when write direct")
}
return c.outputBuffer.WriteDirect(p, remainCap)
}

// WriteByte implements Connection.
func (c *connection) WriteByte(b byte) (err error) {
if !c.IsActive() {
return Exception(ErrConnClosed, "when write byte")
}
return c.outputBuffer.WriteByte(b)
}

Expand Down
39 changes: 39 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,45 @@ func TestConnectionServerClose(t *testing.T) {
wg.Wait()
}

func TestWriterAfterClose(t *testing.T) {
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)

err := wconn.Close()
MustNil(t, err)

for wconn.IsActive() {
runtime.Gosched()
}

methods := []struct {
name string
fn func() error
}{
{"Malloc", func() error { _, err := wconn.Malloc(1); return err }},
{"MallocAck", func() error { return wconn.MallocAck(0) }},
{"WriteBinary", func() error { _, err := wconn.WriteBinary([]byte("hi")); return err }},
{"WriteString", func() error { _, err := wconn.WriteString("hi"); return err }},
{"WriteByte", func() error { return wconn.WriteByte('a') }},
{"WriteDirect", func() error { return wconn.WriteDirect([]byte("hi"), 0) }},
{"Flush", func() error { return wconn.Flush() }},
}
for _, tc := range methods {
t.Run(tc.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("Writer.%s panicked after Close: %v", tc.name, r)
}
}()
err := tc.fn()
Assert(t, err != nil, fmt.Sprintf("Writer.%s should return error after Close", tc.name))
})
}
rconn.Close()
}

func TestConnectionDailTimeoutAndClose(t *testing.T) {
ln := createTestTCPListener(t)
defer ln.Close()
Expand Down
3 changes: 3 additions & 0 deletions mux/shard_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (q *ShardQueue) foreach() {

// deal is used to get deal of netpoll.Writer.
func (q *ShardQueue) deal(gts []WriterGetter) {
if !q.conn.IsActive() {
return
}
writer := q.conn.Writer()
for _, gt := range gts {
buf, isNil := gt()
Expand Down
2 changes: 0 additions & 2 deletions netpoll_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,6 @@ func TestServerReadAndClose(t *testing.T) {
runtime.Gosched() // wait for poller close connection
}
_, err = conn.Writer().WriteBinary(sendMsg)
MustNil(t, err)
err = conn.Writer().Flush()
Assert(t, errors.Is(err, ErrConnClosed), err)

err = loop.Shutdown(context.Background())
Expand Down