Skip to content

Commit edc5ccf

Browse files
authored
Merge pull request grpc#817 from menghanl/finish_clientstream_on_error
Finish trace when newclientsteram returns error
2 parents 111daf1 + d2b50c7 commit edc5ccf

File tree

1 file changed

+36
-26
lines changed

1 file changed

+36
-26
lines changed

stream.go

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,10 @@ type ClientStream interface {
9797

9898
// NewClientStream creates a new Stream for the client side. This is called
9999
// by generated code.
100-
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
100+
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
101101
var (
102102
t transport.ClientTransport
103103
s *transport.Stream
104-
err error
105104
put func()
106105
)
107106
c := defaultCallInfo
@@ -118,27 +117,24 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
118117
if cc.dopts.cp != nil {
119118
callHdr.SendCompress = cc.dopts.cp.Type()
120119
}
121-
cs := &clientStream{
122-
opts: opts,
123-
c: c,
124-
desc: desc,
125-
codec: cc.dopts.codec,
126-
cp: cc.dopts.cp,
127-
dc: cc.dopts.dc,
128-
tracing: EnableTracing,
129-
}
130-
if cc.dopts.cp != nil {
131-
callHdr.SendCompress = cc.dopts.cp.Type()
132-
cs.cbuf = new(bytes.Buffer)
133-
}
134-
if cs.tracing {
135-
cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
136-
cs.trInfo.firstLine.client = true
120+
var trInfo traceInfo
121+
if EnableTracing {
122+
trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
123+
trInfo.firstLine.client = true
137124
if deadline, ok := ctx.Deadline(); ok {
138-
cs.trInfo.firstLine.deadline = deadline.Sub(time.Now())
125+
trInfo.firstLine.deadline = deadline.Sub(time.Now())
139126
}
140-
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
141-
ctx = trace.NewContext(ctx, cs.trInfo.tr)
127+
trInfo.tr.LazyLog(&trInfo.firstLine, false)
128+
ctx = trace.NewContext(ctx, trInfo.tr)
129+
defer func() {
130+
if err != nil {
131+
// Need to call tr.finish() if error is returned.
132+
// Because tr will not be returned to caller.
133+
trInfo.tr.LazyPrintf("RPC: [%v]", err)
134+
trInfo.tr.SetError()
135+
trInfo.tr.Finish()
136+
}
137+
}()
142138
}
143139
gopts := BalancerGetOptions{
144140
BlockingWait: !c.failFast,
@@ -168,7 +164,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
168164
}
169165
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
170166
if c.failFast {
171-
cs.finish(err)
172167
return nil, toRPCErr(err)
173168
}
174169
continue
@@ -177,10 +172,25 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
177172
}
178173
break
179174
}
180-
cs.put = put
181-
cs.t = t
182-
cs.s = s
183-
cs.p = &parser{r: s}
175+
cs := &clientStream{
176+
opts: opts,
177+
c: c,
178+
desc: desc,
179+
codec: cc.dopts.codec,
180+
cp: cc.dopts.cp,
181+
dc: cc.dopts.dc,
182+
183+
put: put,
184+
t: t,
185+
s: s,
186+
p: &parser{r: s},
187+
188+
tracing: EnableTracing,
189+
trInfo: trInfo,
190+
}
191+
if cc.dopts.cp != nil {
192+
cs.cbuf = new(bytes.Buffer)
193+
}
184194
// Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
185195
// when there is no pending I/O operations on this stream.
186196
go func() {

0 commit comments

Comments
 (0)