Skip to content

Commit cca0cc7

Browse files
authored
Support caching bucket for cleaner (#6778)
* support caching bucket for cleaner
  Signed-off-by: yeya24 <[email protected]>
* add changelog
  Signed-off-by: yeya24 <[email protected]>
* add separate flag
  Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: yeya24 <[email protected]>
Signed-off-by: Ben Ye <[email protected]>
1 parent 1b10efe commit cca0cc7

File tree

5 files changed

+67
-11
lines changed

5 files changed

+67
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714
3737
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
3838
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
39+
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
3940
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
4041
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
4142
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

docs/blocks-storage/compactor.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,8 @@ compactor:
333333
# service, which serves as the source of truth for block status
334334
# CLI flag: -compactor.caching-bucket-enabled
335335
[caching_bucket_enabled: <boolean> | default = false]
336+
337+
# When enabled, caching bucket will be used for cleaner
338+
# CLI flag: -compactor.cleaner-caching-bucket-enabled
339+
[cleaner_caching_bucket_enabled: <boolean> | default = false]
336340
```

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2413,6 +2413,10 @@ sharding_ring:
24132413
# service, which serves as the source of truth for block status
24142414
# CLI flag: -compactor.caching-bucket-enabled
24152415
[caching_bucket_enabled: <boolean> | default = false]
2416+
2417+
# When enabled, caching bucket will be used for cleaner
2418+
# CLI flag: -compactor.cleaner-caching-bucket-enabled
2419+
[cleaner_caching_bucket_enabled: <boolean> | default = false]
24162420
```
24172421

24182422
### `configs_config`

pkg/compactor/compactor.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,9 @@ type Config struct {
300300
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
301301
CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`
302302

303-
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
304-
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
303+
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
304+
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
305+
CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"`
305306
}
306307

307308
// RegisterFlags registers the Compactor flags.
@@ -345,6 +346,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
345346

346347
f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
347348
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
349+
f.BoolVar(&cfg.CleanerCachingBucketEnabled, "compactor.cleaner-caching-bucket-enabled", false, "When enabled, caching bucket will be used for cleaner")
348350

349351
f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long shuffle sharding planner would wait before running planning code. This delay would prevent double compaction when two compactors claimed same partition in grouper at same time.")
350352
}
@@ -650,8 +652,17 @@ func (c *Compactor) starting(ctx context.Context) error {
650652
// Wrap the bucket client to write block deletion marks in the global location too.
651653
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)
652654

655+
cleanerBucketClient := c.bucketClient
656+
657+
if c.compactorCfg.CleanerCachingBucketEnabled {
658+
cleanerBucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, true, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "cleaner"}, c.registerer))
659+
if err != nil {
660+
return errors.Wrap(err, "create caching bucket for cleaner")
661+
}
662+
}
663+
653664
// Create the users scanner.
654-
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)
665+
c.usersScanner = cortex_tsdb.NewUsersScanner(cleanerBucketClient, c.ownUserForCleanUp, c.parentLogger)
655666

656667
var cleanerRingLifecyclerID = "default-cleaner"
657668
// Initialize the compactors ring if sharding is enabled.
@@ -727,18 +738,13 @@ func (c *Compactor) starting(ctx context.Context) error {
727738
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
728739
ShardingStrategy: c.compactorCfg.ShardingStrategy,
729740
CompactionStrategy: c.compactorCfg.CompactionStrategy,
730-
}, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
741+
}, cleanerBucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
731742
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
732743

733744
if c.compactorCfg.CachingBucketEnabled {
734-
matchers := cortex_tsdb.NewMatchers()
735-
// Do not cache tenant deletion marker and block deletion marker for compactor
736-
matchers.SetMetaFileMatcher(func(name string) bool {
737-
return strings.HasSuffix(name, "/"+metadata.MetaFilename)
738-
})
739-
c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
745+
c.bucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, false, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
740746
if err != nil {
741-
return errors.Wrap(err, "create caching bucket")
747+
return errors.Wrap(err, "create caching bucket for compactor")
742748
}
743749
}
744750
return nil

pkg/storage/tsdb/caching_bucket.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,47 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
245245
return storecache.NewCachingBucket(bkt, cfg, logger, reg)
246246
}
247247

// CreateCachingBucketForCompactor wraps bkt with a metadata caching layer for use
// by the compactor (cleaner == false) or the cleaner (cleaner == true).
// If no metadata cache backend is configured, bkt is returned unchanged.
//
// Note the meta-file matcher matches meta.json AND tenant deletion markers;
// block deletion markers are intentionally not matched, so they are never cached.
func CreateCachingBucketForCompactor(metadataConfig MetadataCacheConfig, cleaner bool, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
	matchers := NewMatchers()
	// Treat meta.json and tenant deletion markers as cacheable "meta" files.
	// Block deletion markers deliberately fall outside this matcher (not cached).
	matchers.SetMetaFileMatcher(func(name string) bool {
		return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile)
	})
	cfg := cache.NewCachingBucketConfig()
	// Tracks whether any cache backend was actually configured; if not, we
	// return the original bucket untouched.
	cachingConfigured := false

	metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg)
	if err != nil {
		return nil, errors.Wrapf(err, "metadata-cache")
	}
	if metadataCache != nil {
		cachingConfigured = true
		metadataCache = cache.NewTracingCache(metadataCache)

		// Iter results are JSON-encoded then snappy-compressed in the cache.
		codec := snappyIterCodec{storecache.JSONIterCodec{}}
		cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec, "")
		cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL)

		// Don't cache bucket index get and tenant blocks iter if it is cleaner.
		// The cleaner is the source of truth for block status, so it must see
		// fresh existence/listing results.
		if !cleaner {
			cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
			cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
			cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
			cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec, "")
		} else {
			// Cache only GET for metadata and don't cache exists and not exists.
			cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, 0, 0)
		}
	}

	if !cachingConfigured {
		// No caching is configured.
		return bkt, nil
	}

	return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}
288+
248289
func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
249290
switch cacheBackend.Backend {
250291
case "":

0 commit comments

Comments
 (0)