@@ -5,6 +5,12 @@ import com.twitter.finagle.Failure
5
5
import com .twitter .finagle .FailureFlags
6
6
import com .twitter .finagle .Service
7
7
import com .twitter .finagle .Status
8
+ import com .twitter .finagle .stats .DefaultStatsReceiver
9
+ import com .twitter .finagle .stats .HistogramFormat
10
+ import com .twitter .finagle .stats .MetricBuilder
11
+ import com .twitter .finagle .stats .MetricBuilder .HistogramType
12
+ import com .twitter .finagle .stats .MetricUsageHint
13
+ import com .twitter .finagle .stats .StatsReceiver
8
14
import com .twitter .logging .Logger
9
15
import com .twitter .util ._
10
16
@@ -22,13 +28,29 @@ import com.twitter.util._
22
28
* result of another request unless the connection is stuck, and does not
23
29
* look like it will make progress. Use `stallTimeout` to configure this timeout.
24
30
*/
25
- final class PipeliningClientPushSession [In , Out ](
31
+ class PipeliningClientPushSession [In , Out ](
26
32
handle : PushChannelHandle [In , Out ],
27
33
stallTimeout : Duration ,
28
- timer : Timer )
34
+ timer : Timer ,
35
+ statsReceiver : StatsReceiver = DefaultStatsReceiver )
29
36
extends PushSession [In , Out ](handle) { self =>
30
37
31
38
private [this ] val logger = Logger .get
39
+ private [this ] val scopedStatsReceived = statsReceiver.scope(" pipelining_client" )
40
+ private [this ] val epollQueueDelay = scopedStatsReceived.stat(
41
+ MetricBuilder (metricType = HistogramType )
42
+ .withHistogramFormat(HistogramFormat .FullSummary )
43
+ .withPercentiles(0.5 , 0.9 , 0.99 , 0.999 , 0.9999 )
44
+ .withMetricUsageHints(Set (MetricUsageHint .HighContention ))
45
+ .withName(" epoll_queue_delay_ns" )
46
+ )
47
+ private [this ] val messageSendLatency = scopedStatsReceived.stat(
48
+ MetricBuilder (metricType = HistogramType )
49
+ .withHistogramFormat(HistogramFormat .FullSummary )
50
+ .withPercentiles(0.5 , 0.9 , 0.99 , 0.999 , 0.9999 )
51
+ .withMetricUsageHints(Set (MetricUsageHint .HighContention ))
52
+ .withName(" message_send_latency_ns" )
53
+ )
32
54
33
55
// used only within SerialExecutor
34
56
private [this ] val h_queue = new java.util.ArrayDeque [Promise [In ]]()
@@ -93,7 +115,9 @@ final class PipeliningClientPushSession[In, Out](
93
115
})
94
116
}
95
117
}
96
- handle.serialExecutor.execute(new Runnable { def run (): Unit = handleDispatch(request, p) })
118
+
119
+ val requestStartTime = System .nanoTime()
120
+ handle.serialExecutor.execute(() => handleDispatch(request, p, requestStartTime))
97
121
p
98
122
}
99
123
@@ -123,12 +147,16 @@ final class PipeliningClientPushSession[In, Out](
123
147
}
124
148
}
125
149
126
- private [this ] def handleDispatch (request : Out , p : Promise [In ]): Unit = {
150
+ private [this ] def handleDispatch (request : Out , p : Promise [In ], requestStartTime : Long ): Unit = {
151
+ val handleStartTime = System .nanoTime()
152
+ epollQueueDelay.add(handleStartTime - requestStartTime)
127
153
if (! h_running) p.setException(new ChannelClosedException (handle.remoteAddress))
128
154
else {
129
155
h_queue.offer(p)
130
156
h_queueSize += 1
131
- handle.sendAndForget(request)
157
+ handle.send(request) { _ =>
158
+ messageSendLatency.add(System .nanoTime() - handleStartTime)
159
+ }
132
160
}
133
161
}
134
162
}
0 commit comments