Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
continue
}

if marker.Version == cortex_parquet.CurrentVersion {
// We don't convert blocks again if they already have a valid converter mark.
if cortex_parquet.ValidConverterMarkVersion(marker.Version) {
continue
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package parquetconverter

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -414,3 +416,71 @@ func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
return m.limits
}

func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
cfg := prepareConfig()
user := "user"
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
dir := t.TempDir()

cfg.Ring.InstanceID = "parquet-converter-1"
cfg.Ring.InstanceAddr = "1.2.3.4"
cfg.Ring.KVStore.Mock = ringStore
bucketClient, err := filesystem.NewBucket(t.TempDir())
require.NoError(t, err)
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)

ctx := context.Background()

lbls := labels.FromStrings("__name__", "test")

// Create a block
rnd := rand.New(rand.NewSource(time.Now().Unix()))
blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 2*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
require.NoError(t, err)

// Upload the block to the bucket
blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
require.NoError(t, err)

// Write a converter mark with version 1 to simulate an already converted block
markerV1 := parquet.ConverterMark{
Version: parquet.ParquetConverterMarkVersion1,
}
markerBytes, err := json.Marshal(markerV1)
require.NoError(t, err)
markerPath := path.Join(blockID.String(), parquet.ConverterMarkerFileName)
err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes))
require.NoError(t, err)

// Verify the marker exists with version 1
marker, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
require.NoError(t, err)
require.Equal(t, parquet.ParquetConverterMarkVersion1, marker.Version)

// Start the converter
err = services.StartAndAwaitRunning(context.Background(), c)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck

// Wait a bit for the converter to process blocks
time.Sleep(5 * time.Second)

// Verify the marker version is still 1 (i.e., the block was not converted again)
markerAfter, err := parquet.ReadConverterMark(ctx, blockID, userBucket, logger)
require.NoError(t, err)
require.Equal(t, parquet.ParquetConverterMarkVersion1, markerAfter.Version, "block with existing marker version 1 should not be converted again")

// Verify that no conversion happened by checking the convertedBlocks metric
// It should be 0 since the block was already converted
assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
}
11 changes: 10 additions & 1 deletion pkg/storage/parquet/converter_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import (
const (
ConverterMarkerPrefix = "parquet-markers"
ConverterMarkerFileName = "parquet-converter-mark.json"
CurrentVersion = 1

CurrentVersion = ParquetConverterMarkVersion2
ParquetConverterMarkVersion1 = 1
// ParquetConverterMarkVersion2 has an additional series hash
// column which is used for projection pushdown.
ParquetConverterMarkVersion2 = 2
)

type ConverterMark struct {
Expand Down Expand Up @@ -64,3 +69,7 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
type ConverterMarkMeta struct {
Version int `json:"version"`
}

func ValidConverterMarkVersion(version int) bool {
return version == ParquetConverterMarkVersion1 || version == ParquetConverterMarkVersion2
}
Loading