16
16
*/
17
17
package com .alipay .sofa .rpc .interceptor ;
18
18
19
+ import com .alipay .common .tracer .core .context .trace .SofaTraceContext ;
20
+ import com .alipay .common .tracer .core .holder .SofaTraceContextHolder ;
21
+ import com .alipay .common .tracer .core .span .SofaTracerSpan ;
19
22
import com .alipay .sofa .rpc .config .ConsumerConfig ;
23
+ import com .alipay .sofa .rpc .context .RpcInternalContext ;
20
24
import com .alipay .sofa .rpc .context .RpcInvokeContext ;
21
25
import com .alipay .sofa .rpc .context .RpcRunningState ;
26
+ import com .alipay .sofa .rpc .core .exception .RpcErrorType ;
27
+ import com .alipay .sofa .rpc .core .exception .SofaRouteException ;
28
+ import com .alipay .sofa .rpc .core .exception .SofaRpcException ;
29
+ import com .alipay .sofa .rpc .core .exception .SofaTimeOutException ;
22
30
import com .alipay .sofa .rpc .core .request .SofaRequest ;
31
+ import com .alipay .sofa .rpc .core .response .SofaResponse ;
32
+ import com .alipay .sofa .rpc .event .ClientAsyncReceiveEvent ;
33
+ import com .alipay .sofa .rpc .event .EventBus ;
23
34
import com .alipay .sofa .rpc .server .triple .TripleContants ;
24
35
import com .alipay .sofa .rpc .tracer .sofatracer .TripleTracerAdapter ;
25
36
import io .grpc .CallOptions ;
@@ -59,12 +70,15 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
59
70
60
71
@ Override
61
72
public void start (Listener <RespT > responseListener , Metadata requestHeader ) {
62
-
73
+ RpcInternalContext internalContext = RpcInternalContext . getContext ();
63
74
RpcInvokeContext context = RpcInvokeContext .getContext ();
64
75
SofaRequest sofaRequest = (SofaRequest ) context .get (TripleContants .SOFA_REQUEST_KEY );
65
76
66
- ConsumerConfig consumerConfig = (ConsumerConfig ) context .get (TripleContants .SOFA_CONSUMER_CONFIG_KEY );
67
- TripleTracerAdapter .beforeSend (sofaRequest , consumerConfig , requestHeader );
77
+ ConsumerConfig <?> consumerConfig = (ConsumerConfig <?>) context
78
+ .get (TripleContants .SOFA_CONSUMER_CONFIG_KEY );
79
+ TripleTracerAdapter .beforeSend (sofaRequest , consumerConfig , requestHeader , method );
80
+ SofaTraceContext sofaTraceContext = SofaTraceContextHolder .getSofaTraceContext ();
81
+ SofaTracerSpan clientSpan = sofaTraceContext .getCurrentSpan ();
68
82
if (RpcRunningState .isDebugMode ()) {
69
83
LOGGER .info ("[2]prepare to send from client:{}" , requestHeader );
70
84
}
@@ -80,18 +94,48 @@ public void onHeaders(Metadata responseHeader) {
80
94
81
95
@ Override
82
96
public void onMessage (RespT message ) {
83
- if (RpcRunningState .isDebugMode ()) {
84
- LOGGER .info ("[4]response message received from server:{}" , message );
97
+ // onMessage -> onNext()
98
+ try {
99
+ if (sofaRequest .isAsync ()) {
100
+ RpcInvokeContext .setContext (context );
101
+ sofaTraceContext .push (clientSpan );
102
+ }
103
+ if (RpcRunningState .isDebugMode ()) {
104
+ LOGGER .info ("[4]response message received from server:{}" , message );
105
+ }
106
+ super .onMessage (message );
107
+ } finally {
108
+ if (sofaRequest .isAsync ()) {
109
+ sofaTraceContext .clear ();
110
+ RpcInvokeContext .removeContext ();
111
+ }
85
112
}
86
- super .onMessage (message );
87
113
}
88
114
89
115
@ Override
90
116
public void onClose (Status status , Metadata trailers ) {
91
- if (RpcRunningState .isDebugMode ()) {
92
- LOGGER .info ("[5]response close received from server:{},trailers:{}" , status , trailers );
117
+ // onClose -> onComplete() or onError()
118
+ try {
119
+ if (sofaRequest .isAsync ()) {
120
+ RpcInvokeContext .setContext (context );
121
+ sofaTraceContext .push (clientSpan );
122
+ }
123
+ if (RpcRunningState .isDebugMode ()) {
124
+ LOGGER .info ("[5]response close received from server:{},trailers:{}" , status , trailers );
125
+ }
126
+ super .onClose (status , trailers );
127
+ } finally {
128
+ if (sofaRequest .isAsync ()) {
129
+ Throwable throwable = getThrowableFromStatus (status );
130
+ RpcInternalContext .setContext (internalContext );
131
+ if (EventBus .isEnable (ClientAsyncReceiveEvent .class )) {
132
+ EventBus .post (new ClientAsyncReceiveEvent (consumerConfig , null ,
133
+ sofaRequest , new SofaResponse (), throwable ));
134
+ }
135
+ RpcInvokeContext .removeContext ();
136
+ RpcInternalContext .removeAllContext ();
137
+ }
93
138
}
94
- super .onClose (status , trailers );
95
139
}
96
140
97
141
@ Override
@@ -104,6 +148,27 @@ public void onReady() {
104
148
}, requestHeader );
105
149
}
106
150
151
+ @ Override
152
+ public void sendMessage (ReqT message ) {
153
+ try {
154
+ super .sendMessage (message );
155
+ } catch (Throwable t ) {
156
+ LOGGER .error ("Client invoke grpc sendMessage meet error:" , t );
157
+ throw t ;
158
+ }
159
+ }
107
160
};
108
161
}
162
+
163
+ private static Throwable getThrowableFromStatus (Status status ) {
164
+ if (status .getCode () == Status .OK .getCode ()) {
165
+ return null ;
166
+ } else if (status .getCode () == Status .UNAVAILABLE .getCode ()) {
167
+ return new SofaRouteException (status .getDescription (), status .getCause ());
168
+ } else if (status .getCode () == Status .DEADLINE_EXCEEDED .getCode ()) {
169
+ return new SofaTimeOutException (status .getDescription (), status .getCause ());
170
+ } else {
171
+ return new SofaRpcException (RpcErrorType .UNKNOWN , status .getDescription (), status .getCause ());
172
+ }
173
+ }
109
174
}
0 commit comments