Add parquet convert failure, block delay and total block to convert metrics #6821

Merged: 4 commits, Jun 17, 2025
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -39,7 +39,7 @@
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
-* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
+* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 #6821
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
7 changes: 4 additions & 3 deletions integration/parquet_querier_test.go
@@ -41,9 +41,10 @@ func TestParquetFuzz(t *testing.T) {
flags := mergeFlags(
baseFlags,
map[string]string{
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
36 changes: 28 additions & 8 deletions pkg/compactor/blocks_cleaner.go
@@ -19,6 +19,7 @@ import (
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
@@ -43,6 +44,7 @@ type BlocksCleanerConfig struct {
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
ShardingStrategy string
CompactionStrategy string
BlockRanges []int64
}

type BlocksCleaner struct {
@@ -73,6 +75,7 @@ type BlocksCleaner struct {
blocksMarkedForDeletion *prometheus.CounterVec
tenantBlocks *prometheus.GaugeVec
tenantParquetBlocks *prometheus.GaugeVec
tenantParquetUnConvertedBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
@@ -160,6 +163,10 @@ func NewBlocksCleaner(
Name: "cortex_bucket_parquet_blocks_count",
Help: "Total number of parquet blocks in the bucket. Blocks marked for deletion are included.",
}, commonLabels),
tenantParquetUnConvertedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_parquet_unconverted_blocks_count",
Help: "Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.",
}, commonLabels),
tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_marked_for_deletion_count",
Help: "Total number of blocks marked for deletion in the bucket.",
@@ -377,6 +384,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
if !isActive[userID] && !isMarkedForDeletion[userID] {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
@@ -483,6 +491,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
// Given all blocks have been deleted, we can also remove the metrics.
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantParquetBlocks.DeleteLabelValues(userID)
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
@@ -708,14 +717,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
}
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}
-c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
-c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
-c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
-c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
-c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
-if parquetEnabled {
-c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
-}
+c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin = time.Now()
@@ -725,6 +727,24 @@
return nil
}

func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64) {
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction)
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(partials))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
if parquetEnabled {
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
remainingBlocksToConvert := 0
for _, b := range idx.NonParquetBlocks() {
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges) {
remainingBlocksToConvert++
}
}
c.tenantParquetUnConvertedBlocks.WithLabelValues(userID).Set(float64(remainingBlocksToConvert))
}
}

func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
path string
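For context on the eligibility check used by `updateBucketMetrics` above: the remaining-to-convert count delegates to `cortex_parquet.ShouldConvertBlockToParquet`. Below is a minimal sketch of that check, assuming it qualifies a block once its time span exceeds the smallest configured compaction range; the real helper lives in `pkg/storage/parquet` and may differ in detail.

```go
// shouldConvertBlockToParquet is a sketch, not the actual
// pkg/storage/parquet implementation. minTime and maxTime are block
// bounds in milliseconds; blockRanges holds the compactor's configured
// ranges in milliseconds, smallest first.
func shouldConvertBlockToParquet(minTime, maxTime int64, blockRanges []int64) bool {
	if len(blockRanges) == 0 {
		return false
	}
	// Blocks still inside the smallest range may be compacted again,
	// so converting them to parquet now would be wasted work.
	return maxTime-minTime > blockRanges[0]
}
```

Under this assumption, a block spanning 3h with ranges `2h,12h` counts as remaining, while a freshly shipped block still inside the smallest range does not, which matches the expectations in `TestBlocksCleaner_ParquetMetrics` below.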
108 changes: 107 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
@@ -80,6 +80,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

logger := log.NewNopLogger()
@@ -182,6 +183,8 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions

// Create Parquet marker
block13 := createTSDBBlock(t, bucketClient, "user-6", 30, 50, nil)
// This block should be converted to Parquet format, so it is counted as remaining.
block14 := createTSDBBlock(t, bucketClient, "user-6", 30, 50, nil)
createParquetMarker(t, bucketClient, "user-6", block13)

// The fixtures have been created. If the bucket client wasn't wrapped to write
@@ -196,6 +199,7 @@
CleanupConcurrency: options.concurrency,
BlockDeletionMarksMigrationEnabled: options.markersMigrationEnabled,
TenantCleanupDelay: options.tenantDeletionDelay,
BlockRanges: (&tsdb.DurationList{2 * time.Hour}).ToMilliseconds(),
}

reg := prometheus.NewPedanticRegistry()
@@ -251,6 +255,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
{path: path.Join("user-3", block10.String(), parquet.ConverterMarkerFileName), expectedExists: false},
{path: path.Join("user-4", block.DebugMetas, "meta.json"), expectedExists: options.user4FilesExist},
{path: path.Join("user-6", block13.String(), parquet.ConverterMarkerFileName), expectedExists: true},
{path: path.Join("user-6", block14.String(), parquet.ConverterMarkerFileName), expectedExists: false},
} {
exists, err := bucketClient.Exists(ctx, tc.path)
require.NoError(t, err)
@@ -296,6 +301,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
}, {
userID: "user-3",
expectedIndex: false,
}, {
userID: "user-6",
expectedIndex: true,
expectedBlocks: []ulid.ULID{block13, block14},
expectedMarks: []ulid.ULID{},
},
} {
idx, err := bucketindex.ReadIndex(ctx, bucketClient, tc.userID, nil, logger)
@@ -318,7 +328,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
cortex_bucket_blocks_count{user="user-1"} 2
cortex_bucket_blocks_count{user="user-2"} 1
cortex_bucket_blocks_count{user="user-5"} 2
cortex_bucket_blocks_count{user="user-6"} 1
cortex_bucket_blocks_count{user="user-6"} 2
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
@@ -341,9 +351,14 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
# TYPE cortex_bucket_parquet_blocks_count gauge
cortex_bucket_parquet_blocks_count{user="user-5"} 0
cortex_bucket_parquet_blocks_count{user="user-6"} 1
# HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.
# TYPE cortex_bucket_parquet_unconverted_blocks_count gauge
cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 0
cortex_bucket_parquet_unconverted_blocks_count{user="user-6"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_parquet_blocks_count",
"cortex_bucket_parquet_unconverted_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_marked_for_no_compaction_count",
"cortex_bucket_blocks_partials_count",
@@ -378,6 +393,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

logger := log.NewNopLogger()
@@ -447,6 +463,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
DeletionDelay: deletionDelay,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

logger := log.NewNopLogger()
@@ -508,6 +525,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

ctx := context.Background()
@@ -657,6 +675,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

ctx := context.Background()
@@ -889,6 +908,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
CleanupConcurrency: 1,
ShardingStrategy: util.ShardingStrategyShuffle,
CompactionStrategy: util.CompactionStrategyPartitioning,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

ctx := context.Background()
@@ -964,6 +984,7 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
CleanupConcurrency: 1,
ShardingStrategy: util.ShardingStrategyShuffle,
CompactionStrategy: util.CompactionStrategyPartitioning,
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
}

ctx := context.Background()
@@ -1021,6 +1042,91 @@
require.True(t, userBucket.IsObjNotFoundErr(err))
}

func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
// Create metrics
reg := prometheus.NewPedanticRegistry()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_compactor_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
},
[]string{"user", "reason"},
)
remainingPlannedCompactions := promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of remaining planned compactions.",
},
[]string{"user"},
)

// Create the blocks cleaner
cleaner := NewBlocksCleaner(
BlocksCleanerConfig{
BlockRanges: (&tsdb.DurationList{
2 * time.Hour,
12 * time.Hour,
}).ToMilliseconds(),
},
nil, // bucket not needed
nil, // usersScanner not needed
0,
&mockConfigProvider{
parquetConverterEnabled: map[string]bool{
"user1": true,
},
},
log.NewNopLogger(),
"test",
reg,
0,
0,
blocksMarkedForDeletion,
remainingPlannedCompactions,
)

// Create test blocks in the index
now := time.Now()
idx := &bucketindex.Index{
Blocks: bucketindex.Blocks{
{
ID: ulid.MustNew(ulid.Now(), rand.Reader),
MinTime: now.Add(-3 * time.Hour).UnixMilli(),
MaxTime: now.UnixMilli(),
Parquet: &parquet.ConverterMarkMeta{},
},
{
ID: ulid.MustNew(ulid.Now(), rand.Reader),
MinTime: now.Add(-3 * time.Hour).UnixMilli(),
MaxTime: now.UnixMilli(),
Parquet: nil,
},
{
ID: ulid.MustNew(ulid.Now(), rand.Reader),
MinTime: now.Add(-5 * time.Hour).UnixMilli(),
MaxTime: now.UnixMilli(),
Parquet: nil,
},
},
}

// Update metrics
cleaner.updateBucketMetrics("user1", true, idx, 0, 0)

// Verify metrics
require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetBlocks, strings.NewReader(`
# HELP cortex_bucket_parquet_blocks_count Total number of parquet blocks in the bucket. Blocks marked for deletion are included.
# TYPE cortex_bucket_parquet_blocks_count gauge
cortex_bucket_parquet_blocks_count{user="user1"} 1
`)))

require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetUnConvertedBlocks, strings.NewReader(`
# HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.
# TYPE cortex_bucket_parquet_unconverted_blocks_count gauge
cortex_bucket_parquet_unconverted_blocks_count{user="user1"} 2
`)))
}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
parquetConverterEnabled map[string]bool
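The diff view truncates `mockConfigProvider` here. For the cleaner to consult the per-tenant toggle, the mock presumably satisfies the config-provider interface with an accessor along these lines (a hypothetical completion; the method name is an assumption, not shown in this diff):

```go
// Hypothetical accessor backing the parquetConverterEnabled field;
// the real method set is cut off in this diff view.
func (m *mockConfigProvider) ParquetConverterEnabled(userID string) bool {
	return m.parquetConverterEnabled[userID]
}
```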
1 change: 1 addition & 0 deletions pkg/compactor/compactor.go
@@ -752,6 +752,7 @@ func (c *Compactor) starting(ctx context.Context) error {
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
ShardingStrategy: c.compactorCfg.ShardingStrategy,
CompactionStrategy: c.compactorCfg.CompactionStrategy,
BlockRanges: c.compactorCfg.BlockRanges.ToMilliseconds(),
}, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)

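Every `BlocksCleanerConfig.BlockRanges` value wired up in this PR comes from `DurationList.ToMilliseconds()`, as in the line added above. A minimal sketch of that conversion, assuming `tsdb.DurationList` is a slice of `time.Duration` (consistent with the literals used in the tests):

```go
package tsdb

import "time"

// DurationList models a comma-separated list of durations, e.g. the
// "1m,24h" value passed to -blocks-storage.tsdb.block-ranges-period.
type DurationList []time.Duration

// ToMilliseconds converts each duration to int64 milliseconds, the
// unit used for TSDB block MinTime/MaxTime comparisons.
func (d DurationList) ToMilliseconds() []int64 {
	out := make([]int64, 0, len(d))
	for _, dur := range d {
		out = append(out, dur.Milliseconds())
	}
	return out
}
```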