Skip to content

Commit d649959

Browse files
authored
Querier: Allow switching between engines via header (#6777)
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent e8891e2 commit d649959

27 files changed

+8186
-148
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
45
* [BUGFIX] ThanosEngine: Only enable default optimizers. #6776
56
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
67
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769

docs/api/_index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ GET,POST <legacy-http-prefix>/api/v1/query
331331
```
332332

333333
Prometheus-compatible instant query endpoint.
334+
PromQL engine can be selected using `X-PromQL-EngineType` header with values `prometheus` (default) and `thanos`.
334335

335336
_For more information, please check out the Prometheus [instant query](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries) documentation._
336337

@@ -346,6 +347,7 @@ GET,POST <legacy-http-prefix>/api/v1/query_range
346347
```
347348

348349
Prometheus-compatible range query endpoint. When the request is sent through the query-frontend, the query will be accelerated by query-frontend (results caching and execution parallelisation).
350+
PromQL engine can be selected using `X-PromQL-EngineType` header with values `prometheus` (default) and `thanos`.
349351

350352
_For more information, please check out the Prometheus [range query](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries) documentation._
351353

pkg/api/queryapi/query_api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
v1 "github.com/prometheus/prometheus/web/api/v1"
1818
"github.com/weaveworks/common/httpgrpc"
1919

20+
"github.com/cortexproject/cortex/pkg/engine"
2021
"github.com/cortexproject/cortex/pkg/util"
2122
"github.com/cortexproject/cortex/pkg/util/api"
2223
)
@@ -95,6 +96,8 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
9596
if err != nil {
9697
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
9798
}
99+
100+
ctx = engine.AddEngineTypeToContext(ctx, r)
98101
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
99102
if err != nil {
100103
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
@@ -148,6 +151,8 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
148151
if err != nil {
149152
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
150153
}
154+
155+
ctx = engine.AddEngineTypeToContext(ctx, r)
151156
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
152157
if err != nil {
153158
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

pkg/engine/engine.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package engine
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/client_golang/prometheus/promauto"
10+
"github.com/prometheus/prometheus/promql"
11+
"github.com/prometheus/prometheus/storage"
12+
thanosengine "github.com/thanos-io/promql-engine/engine"
13+
"github.com/thanos-io/promql-engine/logicalplan"
14+
)
15+
16+
type engineKeyType struct{}
17+
18+
var engineKey = engineKeyType{}
19+
20+
const TypeHeader = "X-PromQL-EngineType"
21+
22+
type Type string
23+
24+
const (
25+
Prometheus Type = "prometheus"
26+
Thanos Type = "thanos"
27+
None Type = "none"
28+
)
29+
30+
func AddEngineTypeToContext(ctx context.Context, r *http.Request) context.Context {
31+
ng := Type(r.Header.Get(TypeHeader))
32+
switch ng {
33+
case Prometheus, Thanos:
34+
return context.WithValue(ctx, engineKey, ng)
35+
default:
36+
return context.WithValue(ctx, engineKey, None)
37+
}
38+
}
39+
40+
func GetEngineType(ctx context.Context) Type {
41+
if ng, ok := ctx.Value(engineKey).(Type); ok {
42+
return ng
43+
}
44+
return None
45+
}
46+
47+
type Engine struct {
48+
prometheusEngine *promql.Engine
49+
thanosEngine *thanosengine.Engine
50+
51+
fallbackQueriesTotal prometheus.Counter
52+
engineSwitchQueriesTotal *prometheus.CounterVec
53+
}
54+
55+
func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine {
56+
prometheusEngine := promql.NewEngine(opts)
57+
58+
var thanosEngine *thanosengine.Engine
59+
if enableThanosEngine {
60+
thanosEngine = thanosengine.New(thanosengine.Opts{
61+
EngineOpts: opts,
62+
LogicalOptimizers: logicalplan.DefaultOptimizers,
63+
EnableAnalysis: true,
64+
})
65+
}
66+
67+
return &Engine{
68+
prometheusEngine: prometheusEngine,
69+
thanosEngine: thanosEngine,
70+
fallbackQueriesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
71+
Name: "cortex_thanos_engine_fallback_queries_total",
72+
Help: "Total number of fallback queries due to not implementation in thanos engine",
73+
}),
74+
engineSwitchQueriesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
75+
Name: "cortex_engine_switch_queries_total",
76+
Help: "Total number of queries where engine_type is set explicitly",
77+
}, []string{"engine_type"}),
78+
}
79+
}
80+
81+
func (qf *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
82+
if engineType := GetEngineType(ctx); engineType == Prometheus {
83+
qf.engineSwitchQueriesTotal.WithLabelValues(string(Prometheus)).Inc()
84+
goto prom
85+
} else if engineType == Thanos {
86+
qf.engineSwitchQueriesTotal.WithLabelValues(string(Thanos)).Inc()
87+
}
88+
89+
if qf.thanosEngine != nil {
90+
res, err := qf.thanosEngine.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
91+
if err != nil {
92+
if thanosengine.IsUnimplemented(err) {
93+
// fallback to use prometheus engine
94+
qf.fallbackQueriesTotal.Inc()
95+
goto prom
96+
}
97+
return nil, err
98+
}
99+
return res, nil
100+
}
101+
102+
prom:
103+
return qf.prometheusEngine.NewInstantQuery(ctx, q, opts, qs, ts)
104+
}
105+
106+
func (qf *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
107+
if engineType := GetEngineType(ctx); engineType == Prometheus {
108+
qf.engineSwitchQueriesTotal.WithLabelValues(string(Prometheus)).Inc()
109+
goto prom
110+
} else if engineType == Thanos {
111+
qf.engineSwitchQueriesTotal.WithLabelValues(string(Thanos)).Inc()
112+
}
113+
if qf.thanosEngine != nil {
114+
res, err := qf.thanosEngine.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
115+
if err != nil {
116+
if thanosengine.IsUnimplemented(err) {
117+
// fallback to use prometheus engine
118+
qf.fallbackQueriesTotal.Inc()
119+
goto prom
120+
}
121+
return nil, err
122+
}
123+
return res, nil
124+
}
125+
126+
prom:
127+
return qf.prometheusEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
128+
}
129+
130+
func fromPromQLOpts(opts promql.QueryOpts) *thanosengine.QueryOpts {
131+
if opts == nil {
132+
return &thanosengine.QueryOpts{}
133+
}
134+
return &thanosengine.QueryOpts{
135+
LookbackDeltaParam: opts.LookbackDelta(),
136+
EnablePerStepStatsParam: opts.EnablePerStepStats(),
137+
}
138+
}

pkg/engine/engine_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package engine
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"net/http"
7+
"testing"
8+
"time"
9+
10+
"github.com/go-kit/log"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/testutil"
13+
"github.com/prometheus/prometheus/promql"
14+
"github.com/prometheus/prometheus/promql/parser"
15+
"github.com/prometheus/prometheus/promql/promqltest"
16+
"github.com/stretchr/testify/require"
17+
18+
utillog "github.com/cortexproject/cortex/pkg/util/log"
19+
)
20+
21+
func TestEngine_Fallback(t *testing.T) {
22+
// add unimplemented function
23+
parser.Functions["unimplemented"] = &parser.Function{
24+
Name: "unimplemented",
25+
ArgTypes: []parser.ValueType{parser.ValueTypeVector},
26+
ReturnType: parser.ValueTypeVector,
27+
}
28+
29+
ctx := context.Background()
30+
reg := prometheus.NewRegistry()
31+
32+
now := time.Now()
33+
start := time.Now().Add(-time.Minute * 5)
34+
step := time.Minute
35+
queryable := promqltest.LoadedStorage(t, "")
36+
opts := promql.EngineOpts{
37+
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
38+
Reg: reg,
39+
}
40+
queryEngine := New(opts, true, reg)
41+
42+
// instant query, should go to fallback
43+
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now)
44+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
45+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
46+
# TYPE cortex_thanos_engine_fallback_queries_total counter
47+
cortex_thanos_engine_fallback_queries_total 1
48+
`), "cortex_thanos_engine_fallback_queries_total"))
49+
50+
// range query, should go to fallback
51+
_, _ = queryEngine.NewRangeQuery(ctx, queryable, nil, "unimplemented(foo)", start, now, step)
52+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
53+
# HELP cortex_thanos_engine_fallback_queries_total Total number of fallback queries due to not implementation in thanos engine
54+
# TYPE cortex_thanos_engine_fallback_queries_total counter
55+
cortex_thanos_engine_fallback_queries_total 2
56+
`), "cortex_thanos_engine_fallback_queries_total"))
57+
}
58+
59+
func TestEngine_Switch(t *testing.T) {
60+
ctx := context.Background()
61+
reg := prometheus.NewRegistry()
62+
63+
now := time.Now()
64+
start := time.Now().Add(-time.Minute * 5)
65+
step := time.Minute
66+
queryable := promqltest.LoadedStorage(t, "")
67+
opts := promql.EngineOpts{
68+
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
69+
Reg: reg,
70+
}
71+
queryEngine := New(opts, true, reg)
72+
73+
// Query Prometheus engine
74+
r := &http.Request{Header: http.Header{}}
75+
r.Header.Set(TypeHeader, string(Prometheus))
76+
ctx = AddEngineTypeToContext(ctx, r)
77+
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "foo", now)
78+
_, _ = queryEngine.NewRangeQuery(ctx, queryable, nil, "foo", start, now, step)
79+
80+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
81+
# HELP cortex_engine_switch_queries_total Total number of queries where engine_type is set explicitly
82+
# TYPE cortex_engine_switch_queries_total counter
83+
cortex_engine_switch_queries_total{engine_type="prometheus"} 2
84+
`), "cortex_engine_switch_queries_total"))
85+
86+
// Query Thanos engine
87+
r.Header.Set(TypeHeader, string(Thanos))
88+
ctx = AddEngineTypeToContext(ctx, r)
89+
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "foo", now)
90+
_, _ = queryEngine.NewRangeQuery(ctx, queryable, nil, "foo", start, now, step)
91+
92+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
93+
# HELP cortex_engine_switch_queries_total Total number of queries where engine_type is set explicitly
94+
# TYPE cortex_engine_switch_queries_total counter
95+
cortex_engine_switch_queries_total{engine_type="prometheus"} 2
96+
cortex_engine_switch_queries_total{engine_type="thanos"} 2
97+
`), "cortex_engine_switch_queries_total"))
98+
}

pkg/querier/engine_factory.go

Lines changed: 0 additions & 88 deletions
This file was deleted.

0 commit comments

Comments
 (0)