@@ -8,6 +8,7 @@ import com.twitter.finagle.mysql.LostSyncException.const
8
8
import com .twitter .finagle .mysql .param .{MaxConcurrentPrepareStatements , UnsignedColumns }
9
9
import com .twitter .finagle .mysql .transport .{MysqlBuf , MysqlBufReader , Packet }
10
10
import com .twitter .finagle .param .Stats
11
+ import com .twitter .finagle .stats .{Counter , LazyStatsReceiver , StatsReceiver }
11
12
import com .twitter .finagle .transport .Transport
12
13
import com .twitter .finagle .{Service , ServiceProxy , Stack }
13
14
import com .twitter .util ._
@@ -24,13 +25,32 @@ case class ServerError(code: Short, sqlState: String, message: String) extends E
24
25
* the chances of leaking prepared statements and can simplify the
25
26
* implementation of prepared statements in the presence of a connection pool.
26
27
*/
27
- private [mysql] class PrepareCache (svc : Service [Request , Result ], cache : Caffeine [Object , Object ])
28
+ private [mysql] class PrepareCache (
29
+ svc : Service [Request , Result ],
30
+ cache : Caffeine [Object , Object ],
31
+ statsReceiver : StatsReceiver )
28
32
extends ServiceProxy [Request , Result ](svc) {
29
33
34
+ private [this ] val scopedStatsReceiver = new LazyStatsReceiver (
35
+ statsReceiver.scope(" pstmt-cache" )
36
+ )
37
+
38
+ private [this ] val evictionCounters = {
39
+ val counters = new Array [Counter ](RemovalCause .values().length)
40
+ for (value <- RemovalCause .values()) {
41
+ counters(value.ordinal()) =
42
+ scopedStatsReceiver.counter(s " evicted_ ${value.name().toLowerCase}" )
43
+ }
44
+ counters
45
+ }
46
+ private [this ] val callCounter = scopedStatsReceiver.counter(" calls" )
47
+ private [this ] val missCounter = scopedStatsReceiver.counter(" misses" )
48
+
30
49
private [this ] val fn = {
31
50
val listener = new RemovalListener [Request , Future [Result ]] {
32
51
// make sure prepared futures get removed eventually
33
52
def onRemoval (request : Request , response : Future [Result ], cause : RemovalCause ): Unit = {
53
+ evictionCounters(cause.ordinal()).incr()
34
54
response.respond {
35
55
case Return (r : PrepareOK ) =>
36
56
svc(CloseRequest (r.id)).unit
@@ -43,15 +63,23 @@ private[mysql] class PrepareCache(svc: Service[Request, Result], cache: Caffeine
43
63
.removalListener(listener)
44
64
.build[Request , Future [Result ]]()
45
65
46
- CaffeineCache .fromCache(Service .mk { req : Request => svc(req) }, underlying)
66
+ CaffeineCache .fromCache(
67
+ fn = { req : Request =>
68
+ missCounter.incr()
69
+ svc(req)
70
+ },
71
+ cache = underlying
72
+ )
47
73
}
48
74
49
75
/**
50
76
* Populate cache with unique prepare requests identified by their
51
77
* sql queries.
52
78
*/
53
79
override def apply (req : Request ): Future [Result ] = req match {
54
- case _ : PrepareRequest => fn(req)
80
+ case _ : PrepareRequest =>
81
+ callCounter.incr()
82
+ fn(req)
55
83
case _ => super .apply(req)
56
84
}
57
85
}
@@ -67,8 +95,9 @@ private[finagle] object ClientDispatcher {
67
95
def apply (trans : Transport [Packet , Packet ], params : Stack .Params ): Service [Request , Result ] = {
68
96
val maxConcurrentPrepareStatements = params[MaxConcurrentPrepareStatements ].num
69
97
new PrepareCache (
70
- new ClientDispatcher (trans, params),
71
- Caffeine .newBuilder().maximumSize(maxConcurrentPrepareStatements)
98
+ svc = new ClientDispatcher (trans, params),
99
+ cache = Caffeine .newBuilder().maximumSize(maxConcurrentPrepareStatements),
100
+ statsReceiver = params[Stats ].statsReceiver
72
101
)
73
102
}
74
103
@@ -171,8 +200,9 @@ private[finagle] final class ClientDispatcher(
171
200
(seq2, _) <- readTx(req, ok.numOfCols)
172
201
ps <- Future .collect(seq1.map { p => const(Field (p)) })
173
202
cs <- Future .collect(seq2.map { p => const(Field (p)) })
174
- } yield ok.copy(params = ps, columns = cs)
175
-
203
+ } yield {
204
+ ok.copy(params = ps, columns = cs)
205
+ }
176
206
result.ensure { signal.setDone() }
177
207
178
208
// decode OK Result
0 commit comments