Skip to content

Commit 04a7f54

Browse files
authored
receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDBStore, fixed multitsdb benchmarks. (thanos-io#3046)
Fixed thanos-io#3013 Also: * Fixed other quite big issue with reusing chunk slice. * Fixed framing - previously it was wrongly sending single-chunk frames, taking huge amount of time. Fix: We deletage closer now to ensure multitsdb operate on valid data. Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent 61b8382 commit 04a7f54

File tree

8 files changed

+577
-92
lines changed

8 files changed

+577
-92
lines changed

pkg/store/bucket_test.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,7 +1175,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
11751175
var (
11761176
logger = log.NewNopLogger()
11771177
blocks []*bucketBlock
1178-
series []storepb.Series
1178+
series []*storepb.Series
11791179
random = rand.New(rand.NewSource(120))
11801180
)
11811181

@@ -1210,7 +1210,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
12101210
// This allows to pick time range that will correspond to number of series picked 1:1.
12111211
for bi := 0; bi < numOfBlocks; bi++ {
12121212
head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
1213-
Dir: tmpDir,
1213+
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)),
12141214
SamplesPerSeries: samplesPerSeriesPerBlock,
12151215
Series: seriesPerBlock,
12161216
PrependLabels: extLset,
@@ -1533,17 +1533,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
15331533
}
15341534

15351535
// Create TSDB blocks.
1536-
opts := storetestutil.HeadGenOptions{
1537-
Dir: tmpDir,
1536+
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
1537+
TSDBDir: filepath.Join(tmpDir, "0"),
15381538
SamplesPerSeries: 1,
15391539
Series: 2,
15401540
PrependLabels: extLset,
15411541
Random: random,
1542-
}
1543-
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts)
1542+
})
15441543
block1 := createBlockFromHead(t, bktDir, head)
15451544
testutil.Ok(t, head.Close())
1546-
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts)
1545+
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
1546+
TSDBDir: filepath.Join(tmpDir, "1"),
1547+
SamplesPerSeries: 1,
1548+
Series: 2,
1549+
PrependLabels: extLset,
1550+
Random: random,
1551+
})
15471552
block2 := createBlockFromHead(t, bktDir, head2)
15481553
testutil.Ok(t, head2.Close())
15491554

@@ -1607,7 +1612,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
16071612
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
16081613
},
16091614
},
1610-
ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
1615+
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
16111616
ExpectedHints: []hintspb.SeriesResponseHints{
16121617
{
16131618
QueriedBlocks: []hintspb.Block{

pkg/store/multitsdb.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ package store
66
import (
77
"context"
88
"fmt"
9+
"io"
910
"sync"
1011

1112
"github.com/go-kit/kit/log"
1213
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
1314
"github.com/opentracing/opentracing-go"
1415
"github.com/pkg/errors"
1516
"github.com/prometheus/client_golang/prometheus"
17+
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
18+
"github.com/thanos-io/thanos/pkg/runutil"
1619
"golang.org/x/sync/errgroup"
1720
"google.golang.org/grpc"
1821
"google.golang.org/grpc/codes"
@@ -96,6 +99,8 @@ type tenantSeriesSetServer struct {
9699

97100
err error
98101
tenant string
102+
103+
closers []io.Closer
99104
}
100105

101106
// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
@@ -156,6 +161,18 @@ func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
156161
}
157162
}
158163

164+
func (s *tenantSeriesSetServer) Delegate(closer io.Closer) {
165+
s.closers = append(s.closers, closer)
166+
}
167+
168+
func (s *tenantSeriesSetServer) Close() error {
169+
var merr tsdb_errors.MultiError
170+
for _, c := range s.closers {
171+
merr.Add(c.Close())
172+
}
173+
return merr.Err()
174+
}
175+
159176
func (s *tenantSeriesSetServer) Next() (ok bool) {
160177
s.cur, ok = <-s.recv
161178
return ok
@@ -188,6 +205,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
188205
// Each might be quite large (multi chunk long series given by sidecar).
189206
respSender, respCh := newCancelableRespChannel(gctx, 10)
190207

208+
var closers []io.Closer
191209
g.Go(func() error {
192210
// This go routine is responsible for calling store's Series concurrently. Merged results
193211
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
@@ -216,6 +234,8 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
216234
defer wg.Done()
217235
ss.Series(store, r)
218236
}()
237+
238+
closers = append(closers, ss)
219239
seriesSet = append(seriesSet, ss)
220240
}
221241

@@ -237,7 +257,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
237257
}
238258
return nil
239259
})
240-
return g.Wait()
260+
err := g.Wait()
261+
for _, c := range closers {
262+
runutil.CloseWithLogOnErr(s.logger, c, "close tenant series request")
263+
}
264+
return err
265+
241266
}
242267

243268
// LabelNames returns all known label names.

pkg/store/multitsdb_test.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,29 +91,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
9191
}
9292
}()
9393
for j := range dbs {
94+
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))
95+
9496
head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
95-
Dir: tmpDir,
97+
TSDBDir: tsdbDir,
9698
SamplesPerSeries: samplesPerSeriesPerTSDB,
9799
Series: seriesPerTSDB,
98-
WithWAL: true,
100+
WithWAL: !flushToBlocks,
99101
Random: random,
100102
SkipChunks: t.IsBenchmark(),
101103
})
102-
testutil.Ok(t, head.Close())
103-
104-
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))
105-
106104
for i := 0; i < len(created); i++ {
107-
resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i]))
105+
resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i]))
108106
}
109107

110108
if flushToBlocks {
111-
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
112-
testutil.Ok(t, err)
113-
114-
testutil.Ok(t, db.FlushWAL(tmpDir))
115-
testutil.Ok(t, db.Close())
109+
_ = createBlockFromHead(t, tsdbDir, head)
116110
}
111+
testutil.Ok(t, head.Close())
117112

118113
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
119114
testutil.Ok(t, err)
@@ -128,7 +123,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
128123

129124
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })
130125

131-
var expected []storepb.Series
126+
var expected []*storepb.Series
132127
lastLabels := storepb.Series{}
133128
for _, resp := range resps {
134129
for _, r := range resp {
@@ -140,7 +135,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
140135
continue
141136
}
142137
lastLabels = x
143-
expected = append(expected, *r.GetSeries())
138+
expected = append(expected, r.GetSeries())
144139
}
145140
}
146141

pkg/store/proxy_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"math"
1212
"math/rand"
1313
"os"
14+
"path/filepath"
1415
"sort"
1516
"testing"
1617
"time"
@@ -1616,17 +1617,16 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
16161617
var resps []*storepb.SeriesResponse
16171618

16181619
head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
1619-
Dir: tmpDir,
1620+
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)),
16201621
SamplesPerSeries: samplesPerSeriesPerClient,
16211622
Series: seriesPerClient,
1622-
MaxFrameBytes: storetestutil.RemoteReadFrameLimit,
16231623
Random: random,
16241624
SkipChunks: t.IsBenchmark(),
16251625
})
16261626
testutil.Ok(t, head.Close())
16271627

16281628
for i := 0; i < len(created); i++ {
1629-
resps = append(resps, storepb.NewSeriesResponse(&created[i]))
1629+
resps = append(resps, storepb.NewSeriesResponse(created[i]))
16301630
}
16311631

16321632
clients[j] = &testClient{
@@ -1647,23 +1647,22 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
16471647
}
16481648

16491649
var allResps []*storepb.SeriesResponse
1650-
var expected []storepb.Series
1650+
var expected []*storepb.Series
16511651
lastLabels := storepb.Series{}
16521652
for _, c := range clients {
16531653
m := c.(*testClient).StoreClient.(*mockedStoreAPI)
16541654

1655+
// NOTE: Proxy will merge all series with same labels without any frame limit (https://github.com/thanos-io/thanos/issues/2332).
16551656
for _, r := range m.RespSeries {
16561657
allResps = append(allResps, r)
16571658

1658-
// Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332).
1659-
// Let's do this here as well.
16601659
x := storepb.Series{Labels: r.GetSeries().Labels}
16611660
if x.String() == lastLabels.String() {
16621661
expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...)
16631662
continue
16641663
}
16651664
lastLabels = x
1666-
expected = append(expected, *r.GetSeries())
1665+
expected = append(expected, r.GetSeries())
16671666
}
16681667

16691668
}
@@ -1700,7 +1699,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
17001699
// In this we expect exactly the same response as input.
17011700
expected = expected[:0]
17021701
for _, r := range allResps {
1703-
expected = append(expected, *r.GetSeries())
1702+
expected = append(expected, r.GetSeries())
17041703
}
17051704
storetestutil.TestServerSeries(t, store,
17061705
&storetestutil.SeriesCase{

0 commit comments

Comments
 (0)