Skip to content

Commit 7954703

Browse files
jcrossleyjenkins
authored and
jenkins
committed
finagle/finagle-core: Introduce SLOStatsFilter for recording SLO violations
Problem Currently, server-side stats expose success rate and latency metrics, but there is no way to know how often a target latency is being violated. Solution Introduce SLOStatsFilter (still in development, not yet added to the default server stack) that can be configured with a target latency. This filter records both latency violations and response failure violations to provide a picture of the server's SLO compliance. Differential Revision: https://phabricator.twitter.biz/D1191333
1 parent 8b6572d commit 7954703

File tree

2 files changed

+317
-0
lines changed

2 files changed

+317
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package com.twitter.finagle.filter
2+
3+
import com.twitter.finagle.FailureFlags
4+
import com.twitter.finagle.stats.StatsReceiver
5+
import com.twitter.finagle.param
6+
import com.twitter.finagle.Service
7+
import com.twitter.finagle.ServiceFactory
8+
import com.twitter.finagle.SimpleFilter
9+
import com.twitter.finagle.Stack
10+
import com.twitter.finagle.Stackable
11+
import com.twitter.finagle.service.ReqRep
12+
import com.twitter.finagle.service.ResponseClass
13+
import com.twitter.finagle.service.ResponseClassifier
14+
import com.twitter.util.Duration
15+
import com.twitter.util.Future
16+
import com.twitter.util.Stopwatch
17+
import com.twitter.util.Throw
18+
import com.twitter.util.Try
19+
20+
private[twitter] object SLOStatsFilter {
21+
val role = Stack.Role("SLOStats")
22+
23+
sealed trait Param {
24+
def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
25+
}
26+
27+
object Param {
28+
case class Configured(
29+
latency: Duration)
30+
extends Param
31+
32+
case object Disabled extends Param
33+
34+
implicit val param: Stack.Param[SLOStatsFilter.Param] = Stack.Param(Disabled)
35+
}
36+
37+
val Disabled: Param = Param.Disabled
38+
39+
def configured(latency: Duration): Param = {
40+
Param.Configured(latency)
41+
}
42+
43+
def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
44+
new Stack.Module3[param.Stats, param.ResponseClassifier, Param, ServiceFactory[Req, Rep]] {
45+
val role = SLOStatsFilter.role
46+
val description =
47+
"Record number of SLO violations of underlying service"
48+
override def make(
49+
_stats: param.Stats,
50+
_responseClassifier: param.ResponseClassifier,
51+
params: Param,
52+
next: ServiceFactory[Req, Rep]
53+
): ServiceFactory[Req, Rep] = {
54+
params match {
55+
case Param.Disabled => next
56+
case Param.Configured(latency) =>
57+
val param.Stats(statsReceiver) = _stats
58+
val param.ResponseClassifier(responseClassifier) = _responseClassifier
59+
new SLOStatsFilter(
60+
statsReceiver.scope("slo"),
61+
latency.inNanoseconds,
62+
responseClassifier).andThen(next)
63+
}
64+
}
65+
}
66+
}
67+
68+
/**
69+
* A [[com.twitter.finagle.Filter]] that records the number of slo violations from the underlying
70+
* service. A request is classified as violating the slo if any of the following occur:
71+
* - The response returns after `latency` duration has elapsed
72+
* - The response is classified as a failure according to the ResponseClassifier (but is not
73+
* ignorable or interrupted)
74+
*/
75+
private[finagle] class SLOStatsFilter[Req, Rep](
76+
statsReceiver: StatsReceiver,
77+
latencyNanos: Long,
78+
responseClassifier: ResponseClassifier,
79+
nowNanos: () => Long = Stopwatch.systemNanos)
80+
extends SimpleFilter[Req, Rep] {
81+
82+
private[this] val violationsScope = statsReceiver.scope("violations")
83+
private[this] val violationsTotalCounter = violationsScope.counter("total")
84+
private[this] val violationsFailuresCounter = violationsScope.counter("failures")
85+
private[this] val violationsLatencyCounter = violationsScope.counter("latency")
86+
87+
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
88+
val start = nowNanos()
89+
service(request).respond { response =>
90+
if (!isIgnorable(response)) {
91+
var violated = false
92+
if (nowNanos() - start > latencyNanos) {
93+
violated = true
94+
violationsLatencyCounter.incr()
95+
}
96+
97+
if (isFailure(request, response)) {
98+
violated = true
99+
violationsFailuresCounter.incr()
100+
}
101+
102+
if (violated) {
103+
violationsTotalCounter.incr()
104+
}
105+
}
106+
}
107+
}
108+
109+
private[this] def isFailure(request: Req, response: Try[Rep]): Boolean = {
110+
responseClassifier
111+
.applyOrElse(ReqRep(request, response), ResponseClassifier.Default) match {
112+
case ResponseClass.Failed(_) if !isInterrupted(response) => true
113+
case _ => false
114+
}
115+
}
116+
117+
private[this] def isIgnorable(response: Try[Rep]): Boolean = response match {
118+
case Throw(f: FailureFlags[_]) => f.isFlagged(FailureFlags.Ignorable)
119+
case _ => false
120+
}
121+
122+
private[this] def isInterrupted(response: Try[Rep]): Boolean = response match {
123+
case Throw(f: FailureFlags[_]) => f.isFlagged(FailureFlags.Interrupted)
124+
case _ => false
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package com.twitter.finagle.filter
2+
3+
import com.twitter.conversions.DurationOps._
4+
import com.twitter.finagle.Failure
5+
import com.twitter.finagle.FailureFlags
6+
import com.twitter.finagle.Service
7+
import com.twitter.finagle.ServiceFactory
8+
import com.twitter.finagle.Stack
9+
import com.twitter.finagle.param.Stats
10+
import com.twitter.finagle.service.ReqRep
11+
import com.twitter.finagle.service.ResponseClass
12+
import com.twitter.finagle.service.ResponseClassifier
13+
import com.twitter.finagle.service.ResponseClassifier.named
14+
import com.twitter.finagle.stats.InMemoryStatsReceiver
15+
import com.twitter.finagle.stats.StatsReceiver
16+
import com.twitter.util.Await
17+
import com.twitter.util.Future
18+
import com.twitter.util.MockTimer
19+
import com.twitter.util.Return
20+
import com.twitter.util.Time
21+
import org.scalatest.funsuite.AnyFunSuite
22+
23+
class SLOStatsFilterTest extends AnyFunSuite {
24+
25+
private[this] val unitSvc: Service[Unit, Unit] = Service.mk { _ =>
26+
Future.Unit
27+
}
28+
29+
private[this] def mkService(
30+
statsReceiver: StatsReceiver,
31+
params: Stack.Params,
32+
underlying: Service[Unit, Unit] = unitSvc
33+
): Service[Unit, Unit] = {
34+
val factory = SLOStatsFilter.module
35+
.toStack(Stack.leaf(Stack.Role("test"), ServiceFactory.const(underlying)))
36+
.make(params + Stats(statsReceiver))
37+
Await.result(factory())
38+
}
39+
40+
test("Does not create the filter if not configured") {
41+
val statsReceiver = new InMemoryStatsReceiver
42+
val service = mkService(statsReceiver, Stack.Params.empty)
43+
Await.result(service(()))
44+
assert(!statsReceiver.counters.contains(Seq("slo", "violations", "total")))
45+
}
46+
47+
test("Does not create the filter if disabled") {
48+
val statsReceiver = new InMemoryStatsReceiver
49+
val service = mkService(statsReceiver, Stack.Params.empty + SLOStatsFilter.Disabled)
50+
Await.result(service(()))
51+
52+
assert(!statsReceiver.counters.contains(Seq("slo", "violations", "total")))
53+
}
54+
55+
test("Creates the filter if configured") {
56+
val statsReceiver = new InMemoryStatsReceiver
57+
val service =
58+
mkService(statsReceiver, Stack.Params.empty + SLOStatsFilter.configured(5.seconds))
59+
Await.result(service(()))
60+
61+
assert(statsReceiver.counters.contains(Seq("slo", "violations", "total")))
62+
}
63+
64+
test("Records latency violation if latency is violated") {
65+
val latency = 5.seconds
66+
val statsReceiver = new InMemoryStatsReceiver
67+
Time.withCurrentTimeFrozen { tc =>
68+
val timer = new MockTimer
69+
val filter = new SLOStatsFilter[Unit, Unit](
70+
statsReceiver = statsReceiver,
71+
latencyNanos = latency.inNanoseconds,
72+
responseClassifier = ResponseClassifier.Default,
73+
nowNanos = () => Time.now.inNanoseconds
74+
)
75+
76+
val service: Service[Unit, Unit] = Service.mk { _ =>
77+
Future.Unit.delayed(latency + 1.second)(timer)
78+
}
79+
80+
val res = filter((), service)
81+
82+
tc.advance(latency + 1.second)
83+
timer.tick()
84+
85+
Await.result(res)
86+
87+
assert(statsReceiver.counters(Seq("violations", "total")) == 1)
88+
assert(statsReceiver.counters(Seq("violations", "latency")) == 1)
89+
assert(statsReceiver.counters(Seq("violations", "failures")) == 0)
90+
}
91+
}
92+
93+
test("Records failure violation if response is a failure according to classifier") {
94+
val latency = 5.seconds
95+
val statsReceiver = new InMemoryStatsReceiver
96+
val responseClassifier = named("SuccessIsFailure") {
97+
case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
98+
}
99+
100+
val filter = new SLOStatsFilter[Unit, Unit](
101+
statsReceiver = statsReceiver,
102+
latencyNanos = latency.inNanoseconds,
103+
responseClassifier = responseClassifier,
104+
nowNanos = () => Time.now.inNanoseconds
105+
)
106+
107+
val service: Service[Unit, Unit] = Service.mk { _ =>
108+
Future.Done
109+
}
110+
111+
Await.result(filter((), service))
112+
113+
assert(statsReceiver.counters(Seq("violations", "total")) == 1)
114+
assert(statsReceiver.counters(Seq("violations", "latency")) == 0)
115+
assert(statsReceiver.counters(Seq("violations", "failures")) == 1)
116+
}
117+
118+
test(
119+
"Records latency and failure violation if violated, does not double-count total violations") {
120+
val latency = 5.seconds
121+
val statsReceiver = new InMemoryStatsReceiver
122+
Time.withCurrentTimeFrozen { tc =>
123+
val timer = new MockTimer
124+
val responseClassifier = named("SuccessIsFailure") {
125+
case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
126+
}
127+
val filter = new SLOStatsFilter[Unit, Unit](
128+
statsReceiver = statsReceiver,
129+
latencyNanos = latency.inNanoseconds,
130+
responseClassifier = responseClassifier,
131+
nowNanos = () => Time.now.inNanoseconds
132+
)
133+
134+
val service: Service[Unit, Unit] = Service.mk { _ =>
135+
Future.Unit.delayed(latency + 1.second)(timer)
136+
}
137+
138+
val res = filter((), service)
139+
140+
tc.advance(latency + 1.second)
141+
timer.tick()
142+
143+
Await.result(res)
144+
145+
assert(statsReceiver.counters(Seq("violations", "total")) == 1)
146+
assert(statsReceiver.counters(Seq("violations", "latency")) == 1)
147+
assert(statsReceiver.counters(Seq("violations", "failures")) == 1)
148+
}
149+
}
150+
151+
test("Does not record violation if response is ignorable failure") {
152+
val latency = 5.seconds
153+
val statsReceiver = new InMemoryStatsReceiver
154+
val responseClassifier = named("SuccessIsFailure") {
155+
case ReqRep(_, Return(_)) => ResponseClass.NonRetryableFailure
156+
}
157+
158+
val filter = new SLOStatsFilter[Unit, Unit](
159+
statsReceiver = statsReceiver,
160+
latencyNanos = latency.inNanoseconds,
161+
responseClassifier = responseClassifier,
162+
nowNanos = () => Time.now.inNanoseconds
163+
)
164+
165+
val service: Service[Unit, Unit] = Service.mk { _ =>
166+
Future.exception(new Failure("boom!", flags = FailureFlags.Ignorable))
167+
}
168+
169+
intercept[Exception](Await.result(filter((), service)))
170+
assert(statsReceiver.counters(Seq("violations", "total")) == 0)
171+
}
172+
173+
test("Does not record violation if response is interrupted failure") {
174+
val latency = 5.seconds
175+
val statsReceiver = new InMemoryStatsReceiver
176+
177+
val filter = new SLOStatsFilter[Unit, Unit](
178+
statsReceiver = statsReceiver,
179+
latencyNanos = latency.inNanoseconds,
180+
responseClassifier = ResponseClassifier.Default,
181+
nowNanos = () => Time.now.inNanoseconds
182+
)
183+
184+
val service: Service[Unit, Unit] = Service.mk { _ =>
185+
Future.exception(new Failure("boom!", flags = FailureFlags.Interrupted))
186+
}
187+
188+
intercept[Exception](Await.result(filter((), service)))
189+
assert(statsReceiver.counters(Seq("violations", "total")) == 0)
190+
}
191+
}

0 commit comments

Comments
 (0)