
Commit 1f2a4a9

metrics: add compression counters
Add running counters that keep track of how many bytes were compressed and decompressed. The counters are segregated along the same lines where compression settings can differ: L5 vs L6 vs other levels, and data vs value vs other blocks. The intention is to estimate the CPU usage change for a different compression profile (in conjunction with data about each algorithm's performance, as obtained by the compression analyzer).
1 parent: caae028

32 files changed: +577 −150 lines
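The ByLevel and ByKind generics and the block.CompressionCounters type that this diff wires through the DB are defined in sstable/block, which is not among the changed files shown here. Purely for orientation, a minimal sketch consistent with the call sites in this diff (the Compressed/Decompressed halves, ForLevel, and the Load* snapshots) could look like the following; the atomic.Uint64 representation, the exact field layout, and the int-typed ForLevel parameter are assumptions (the real call sites pass a base.Level built with base.MakeLevel):

	// Illustrative sketch only — not the actual sstable/block definitions.
	package block

	import "sync/atomic"

	// ByKind holds one value per block category whose compression settings
	// can differ: data blocks, value blocks, and all other blocks.
	type ByKind[T any] struct {
		DataBlocks  T
		ValueBlocks T
		OtherBlocks T
	}

	// ByLevel holds one value per level bucket. L5 and L6 get dedicated
	// buckets because their compression profiles can be configured
	// independently; the remaining levels share OtherLevels.
	type ByLevel[T any] struct {
		OtherLevels T
		L5          T
		L6          T
	}

	// ForLevel returns the bucket for an LSM level. Assumption: the real
	// method takes a base.Level (see the base.MakeLevel call sites in this
	// diff), not an int.
	func (b *ByLevel[T]) ForLevel(level int) *T {
		switch level {
		case 5:
			return &b.L5
		case 6:
			return &b.L6
		default:
			return &b.OtherLevels
		}
	}

	// CompressionCounters is the mutable form embedded in DB. Writers bump
	// Compressed (via WriterOptions.CompressionCounters) and readers bump
	// Decompressed (via ReaderOptions.CompressionCounters).
	type CompressionCounters struct {
		Compressed   ByLevel[ByKind[atomic.Uint64]]
		Decompressed ByLevel[ByKind[atomic.Uint64]]
	}

	// LoadCompressed snapshots the compressed-bytes counters into plain
	// uint64s, matching how (*DB).Metrics consumes them; LoadDecompressed
	// would mirror it over the Decompressed half.
	func (c *CompressionCounters) LoadCompressed() ByLevel[ByKind[uint64]] {
		load := func(a *ByKind[atomic.Uint64]) ByKind[uint64] {
			return ByKind[uint64]{
				DataBlocks:  a.DataBlocks.Load(),
				ValueBlocks: a.ValueBlocks.Load(),
				OtherBlocks: a.OtherBlocks.Load(),
			}
		}
		return ByLevel[ByKind[uint64]]{
			OtherLevels: load(&c.Compressed.OtherLevels),
			L5:          load(&c.Compressed.L5),
			L6:          load(&c.Compressed.L6),
		}
	}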

compaction.go

Lines changed: 2 additions & 0 deletions

@@ -2945,6 +2945,8 @@ func (d *DB) runCopyCompaction(
 	var wrote uint64
 	err = d.fileCache.withReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) error {
 		var err error
+		writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat())
+		writerOpts.CompressionCounters = d.compressionCounters.Compressed.ForLevel(base.MakeLevel(c.outputLevel.level))
 		// TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
 		// or update category stats).
 		wrote, err = sstable.CopySpan(ctx,

db.go

Lines changed: 5 additions & 0 deletions

@@ -547,6 +547,8 @@ type DB struct {
 	// compaction concurrency
 	openedAt time.Time

+	compressionCounters block.CompressionCounters
+
 	iterTracker *inflight.Tracker
 }

@@ -2013,6 +2015,9 @@ func (d *DB) Metrics() *Metrics {
 	blobCompressionMetrics := blobCompressionStatsAnnotator.Annotation(&vers.BlobFiles)
 	metrics.BlobFiles.Compression.MergeWith(&blobCompressionMetrics)

+	metrics.CompressionCounters.LogicalBytesCompressed = d.compressionCounters.LoadCompressed()
+	metrics.CompressionCounters.LogicalBytesDecompressed = d.compressionCounters.LoadDecompressed()
+
 	metrics.BlockCache = d.opts.Cache.Metrics()
 	metrics.FileCache, metrics.Filter = d.fileCache.Metrics()
 	metrics.TableIters = d.fileCache.IterCount()
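Note that Metrics() copies the counters out through LoadCompressed/LoadDecompressed rather than reading fields directly. The accessors are defined in sstable/block (not shown in this diff); presumably the live counters are atomics that writers and readers bump concurrently, with the Load* methods snapshotting them into the plain block.ByLevel[block.ByKind[uint64]] buckets exposed on Metrics, as sketched above.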

flushable_test.go

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

 	// We can reuse the ingestLoad function for this test even if we're
 	// not actually ingesting a file.
-	lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheHandle, pendingOutputs)
+	lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheHandle, &d.compressionCounters, pendingOutputs)
 	if err != nil {
 		t.Fatal(err)
 	}

ingest.go

Lines changed: 7 additions & 2 deletions

@@ -257,6 +257,7 @@ func ingestLoad1(
 	fmv FormatMajorVersion,
 	readable objstorage.Readable,
 	cacheHandle *cache.Handle,
+	compressionCounters *block.CompressionCounters,
 	tableNum base.TableNum,
 	rangeKeyValidator rangeKeyIngestValidator,
 ) (

@@ -270,6 +271,9 @@ func ingestLoad1(
 		CacheHandle: cacheHandle,
 		FileNum:     base.PhysicalTableDiskFileNum(tableNum),
 	}
+	if compressionCounters != nil {
+		o.CompressionCounters = &compressionCounters.Decompressed
+	}
 	r, err := sstable.NewReader(ctx, readable, o)
 	if err != nil {
 		return nil, keyspan.Span{}, base.BlockReadStats{}, errors.CombineErrors(err, readable.Close())

@@ -498,6 +502,7 @@ func ingestLoad(
 	shared []SharedSSTMeta,
 	external []ExternalFile,
 	cacheHandle *cache.Handle,
+	compressionCounters *block.CompressionCounters,
 	pending []base.TableNum,
 ) (ingestLoadResult, error) {
 	localFileNums := pending[:len(paths)]

@@ -531,7 +536,7 @@ func ingestLoad(
 		if !shouldDisableRangeKeyChecks {
 			rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
 		}
-		m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
+		m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, compressionCounters, localFileNums[i], rangeKeyValidator)
 		if err != nil {
 			return ingestLoadResult{}, err
 		}

@@ -1480,7 +1485,7 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats, error) {
 	// Load the metadata for all the files being ingested. This step detects
 	// and elides empty sstables.
 	loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external,
-		d.cacheHandle, pendingOutputs)
+		d.cacheHandle, &d.compressionCounters, pendingOutputs)
 	if err != nil {
 		return IngestOperationStats{}, err
 	}
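Design note: the new compressionCounters parameter is threaded through ingestLoad and ingestLoad1 as a nullable pointer and nil-checked before being wired into the reader options, so callers without a DB (such as the unit tests below) can simply pass nil.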

ingest_test.go

Lines changed: 3 additions & 3 deletions

@@ -148,7 +148,7 @@ func TestIngestLoad(t *testing.T) {
 		FS: mem,
 	}
 	opts.WithFSDefaults()
-	lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, nil, []base.TableNum{1})
+	lr, err := ingestLoad(context.Background(), opts, dbVersion, []string{"ext"}, nil, nil, nil, nil, []base.TableNum{1})
 	if err != nil {
 		return err.Error()
 	}

@@ -247,7 +247,7 @@ func TestIngestLoadRand(t *testing.T) {
 	}
 	opts.WithFSDefaults()
 	opts.EnsureDefaults()
-	lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, nil, pending)
+	lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, nil, nil, pending)
 	require.NoError(t, err)

 	// Reset flaky stats.

@@ -272,7 +272,7 @@ func TestIngestLoadInvalid(t *testing.T) {
 		FS: mem,
 	}
 	opts.WithFSDefaults()
-	if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, nil, []base.TableNum{1}); err == nil {
+	if _, err := ingestLoad(context.Background(), opts, internalFormatNewest, []string{"invalid"}, nil, nil, nil, nil, []base.TableNum{1}); err == nil {
 		t.Fatalf("expected error, but found success")
 	}
 }

internal.go

Lines changed: 6 additions & 1 deletion

@@ -4,7 +4,10 @@

 package pebble

-import "github.com/cockroachdb/pebble/internal/base"
+import (
+	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/sstable/block"
+)

 // SeqNum exports the base.SeqNum type.
 type SeqNum = base.SeqNum

@@ -80,3 +83,5 @@ type ShortAttribute = base.ShortAttribute
 // LazyValue.Clone requires a pointer to a LazyFetcher struct to avoid
 // allocations. No code outside Pebble needs to peer into a LazyFetcher.
 type LazyFetcher = base.LazyFetcher
+
+type CompressionCounters = block.CompressionCounters

metrics.go

Lines changed: 55 additions & 5 deletions

@@ -349,7 +349,7 @@ type Metrics struct {
 	BackingTableCount uint64
 	// The sum of the sizes of the BackingTableCount sstables that are backing virtual tables.
 	BackingTableSize uint64
-	// Compression statistics for sstable data (does not include blob files).
+	// Compression statistics for the current sstables.
 	Compression CompressionMetrics

 	// Local file sizes.

@@ -447,9 +447,17 @@
 			ZombieCount uint64
 		}

+		// Compression statistics for the current blob files.
 		Compression CompressionMetrics
 	}

+	// CompressionCounters are cumulative counters for the number of logical
+	// (uncompressed) bytes that went through compression and decompression.
+	CompressionCounters struct {
+		LogicalBytesCompressed   block.ByLevel[block.ByKind[uint64]]
+		LogicalBytesDecompressed block.ByLevel[block.ByKind[uint64]]
+	}
+
 	FileCache FileCacheMetrics

 	// Count of the number of open sstable iterators.

@@ -500,7 +508,7 @@

 // CompressionMetrics contains compression metrics for sstables or blob files.
 type CompressionMetrics struct {
-	// NoCompressionBytes is the total number of bytes in files that do are not
+	// NoCompressionBytes is the total number of bytes in files that are not
 	// compressed. Data can be uncompressed when 1) compression is disabled; 2)
 	// for certain special types of blocks; and 3) for blocks that are not
 	// compressible.

@@ -845,6 +853,16 @@
 		table.Div(),
 		table.String("blob files", 13, table.AlignRight, func(i compressionInfo) string { return i.blobFiles }),
 	)
+	compressionCountersTableHeader = ` | logical bytes compressed / decompressed`
+	compressionCountersTable       = table.Define[compressionCountersInfo](
+		table.String("level", 5, table.AlignRight, func(i compressionCountersInfo) string { return i.level }),
+		table.Div(),
+		table.String("data blocks", 14, table.AlignCenter, func(i compressionCountersInfo) string { return i.DataBlocks }),
+		table.Div(),
+		table.String("value blocks", 14, table.AlignCenter, func(i compressionCountersInfo) string { return i.ValueBlocks }),
+		table.Div(),
+		table.String("other blocks", 14, table.AlignCenter, func(i compressionCountersInfo) string { return i.OtherBlocks }),
+	)
 )

 type commitPipelineInfo struct {

@@ -973,6 +991,34 @@ func makeCompressionInfo(algorithm string, table, blob CompressionStatsForSetting
 	return i
 }

+type compressionCountersInfo struct {
+	level string
+	block.ByKind[string]
+}
+
+func makeCompressionCountersInfo(m *Metrics) []compressionCountersInfo {
+	var result []compressionCountersInfo
+	isZero := func(c *block.ByKind[uint64]) bool {
+		return c.DataBlocks == 0 && c.ValueBlocks == 0 && c.OtherBlocks == 0
+	}
+	addLevel := func(level string, compressed, decompressed *block.ByKind[uint64]) {
+		if isZero(compressed) && isZero(decompressed) {
+			return
+		}
+		result = append(result, compressionCountersInfo{
+			level: level,
+			ByKind: block.ByKind[string]{
+				DataBlocks:  humanizeBytes(compressed.DataBlocks) + " / " + humanizeBytes(decompressed.DataBlocks),
+				ValueBlocks: humanizeBytes(compressed.ValueBlocks) + " / " + humanizeBytes(decompressed.ValueBlocks),
+				OtherBlocks: humanizeBytes(compressed.OtherBlocks) + " / " + humanizeBytes(decompressed.OtherBlocks)},
+		})
+	}
+	addLevel("L0-L4", &m.CompressionCounters.LogicalBytesCompressed.OtherLevels, &m.CompressionCounters.LogicalBytesDecompressed.OtherLevels)
+	addLevel("L5", &m.CompressionCounters.LogicalBytesCompressed.L5, &m.CompressionCounters.LogicalBytesDecompressed.L5)
+	addLevel("L6", &m.CompressionCounters.LogicalBytesCompressed.L6, &m.CompressionCounters.LogicalBytesDecompressed.L6)
+	return result
+}
+
 // String pretty-prints the metrics.
 //
 // See testdata/metrics for an example.

@@ -1160,7 +1206,11 @@ func (m *Metrics) String() string {
 	compressionContents = slices.DeleteFunc(compressionContents, func(i compressionInfo) bool {
 		return i.tables == "" && i.blobFiles == ""
 	})
-	compressionTable.Render(cur, table.RenderOptions{}, compressionContents...)
+	cur = compressionTable.Render(cur, table.RenderOptions{}, compressionContents...)
+
+	cur = cur.NewlineReturn()
+	cur = cur.WriteString(compressionCountersTableHeader).NewlineReturn()
+	compressionCountersTable.Render(cur, table.RenderOptions{}, makeCompressionCountersInfo(m)...)

 	return wb.String()
 }

@@ -1190,8 +1240,8 @@ func (m *Metrics) StringForTests() string {

 	// We recalculate the file cache size using the 64-bit sizes, and we ignore
 	// the genericcache metadata size which is harder to adjust.
-	const sstableReaderSize64bit = 280
-	const blobFileReaderSize64bit = 112
+	const sstableReaderSize64bit = 288
+	const blobFileReaderSize64bit = 120
 	mCopy.FileCache.Size = mCopy.FileCache.TableCount*sstableReaderSize64bit + mCopy.FileCache.BlobFileCount*blobFileReaderSize64bit
 	if math.MaxInt == math.MaxInt64 {
 		// Verify the 64-bit sizes, so they are kept updated.
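For orientation, the rendered counters table should look roughly like the following illustration, extrapolated from the column definitions and header string above with made-up values; the exact widths, alignment, and humanized units come from the table and humanize helpers and may differ:

	      |    logical bytes compressed / decompressed
	level |  data blocks   |  value blocks  |  other blocks
	L0-L4 |  10GB / 100GB  | 100GB / 1.0TB  |  1.0GB / 10GB
	   L5 |  50GB / 500GB  | 500GB / 4.9TB  |  5.0GB / 50GB
	   L6 |  60GB / 600GB  | 600GB / 5.9TB  |  6.0GB / 60GB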

metrics_test.go

Lines changed: 18 additions & 0 deletions

@@ -184,6 +184,24 @@ func exampleMetrics() Metrics {
 	m.BlobFiles.Compression.Zstd.CompressedBytes = 100 * GB
 	m.BlobFiles.Compression.Zstd.UncompressedBytes = 500 * GB

+	byKind := func(n uint64) block.ByKind[uint64] {
+		return block.ByKind[uint64]{
+			DataBlocks:  n * 10 * GB,
+			ValueBlocks: n * 100 * GB,
+			OtherBlocks: n * GB,
+		}
+	}
+	m.CompressionCounters.LogicalBytesCompressed = block.ByLevel[block.ByKind[uint64]]{
+		L5:          byKind(5),
+		L6:          byKind(6),
+		OtherLevels: byKind(1),
+	}
+	m.CompressionCounters.LogicalBytesDecompressed = block.ByLevel[block.ByKind[uint64]]{
+		L5:          byKind(50),
+		L6:          byKind(60),
+		OtherLevels: byKind(10),
+	}
+
 	m.FileCache.Size = 1 * MB
 	m.FileCache.TableCount = 180
 	m.FileCache.BlobFileCount = 181

open.go

Lines changed: 4 additions & 2 deletions

@@ -476,7 +476,9 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
 		defer opts.FileCache.Unref()
 	}
-	d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
+	fileCacheReaderOpts := d.opts.MakeReaderOptions()
+	fileCacheReaderOpts.CompressionCounters = &d.compressionCounters.Decompressed
+	d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, fileCacheReaderOpts, d.reportCorruption)
 	d.newIters = d.fileCache.newIters
 	d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)

@@ -875,7 +877,7 @@ func (d *DB) replayIngestedFlushable(
 	}
 	// NB: ingestLoad1 will close readable.
 	meta[i], lastRangeKey, _, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
-		readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
+		readable, d.cacheHandle, &d.compressionCounters, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
 	if err != nil {
 		return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
 	}

options.go

Lines changed: 7 additions & 4 deletions

@@ -2634,17 +2634,20 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstable.WriterOptions {
 // makeWriterOptions constructs sstable.WriterOptions for the specified level
 // using the current DB options and format.
 func (d *DB) makeWriterOptions(level int) sstable.WriterOptions {
-	return d.opts.MakeWriterOptions(level, d.TableFormat())
+	o := d.opts.MakeWriterOptions(level, d.TableFormat())
+	o.CompressionCounters = d.compressionCounters.Compressed.ForLevel(base.MakeLevel(level))
+	return o
 }

 // makeBlobWriterOptions constructs blob.FileWriterOptions using the current DB
 // options and format.
 func (d *DB) makeBlobWriterOptions(level int) blob.FileWriterOptions {
 	lo := &d.opts.Levels[level]
 	return blob.FileWriterOptions{
-		Format:       d.BlobFileFormat(),
-		Compression:  lo.Compression(),
-		ChecksumType: block.ChecksumTypeCRC32c,
+		Format:              d.BlobFileFormat(),
+		Compression:         lo.Compression(),
+		CompressionCounters: d.compressionCounters.Compressed.ForLevel(base.MakeLevel(level)),
+		ChecksumType:        block.ChecksumTypeCRC32c,
 		FlushGovernor: block.MakeFlushGovernor(
 			lo.BlockSize,
 			lo.BlockSizeThreshold,
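Design note: makeWriterOptions and makeBlobWriterOptions both attach d.compressionCounters.Compressed.ForLevel(...) for the output level, so sstable blocks and blob-file blocks written at a given level increment the same per-level compressed-bytes buckets that the metrics table later aggregates into its L0-L4/L5/L6 rows.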
