
# Add proposal for parquet storage #6712

Open · wants to merge 4 commits into `master`
139 changes: 139 additions & 0 deletions docs/proposals/parquet-storage.md
---
title: "Parquet-based Storage"
linkTitle: "Parquet-based Storage"
weight: 1
slug: parquet-storage
---

- Authors: [Alan Protasio](https://github.com/alanprot), [Ben Ye](https://github.com/yeya24)
- Date: April 2025
- Status: Proposed

## Background

Since the introduction of Block Storage in Cortex, the TSDB format and the Store Gateway have been the de facto way to query long-term data on object storage. However, this approach presents several significant challenges:

### TSDB Format Limitations

The TSDB format, while efficient for write-heavy workloads on local SSDs, is not designed for object storage:
- The index relies heavily on random reads to serve queries, and each random read becomes a request to the object store
- To reduce the number of requests to the object store, reads need to be merged, which leads to higher overfetch
- The index relies on postings, which can become a major bottleneck for high-cardinality data

### Store Gateway Operational Challenges

The Store Gateway was originally introduced in [Thanos](https://thanos.io/). The Cortex and Thanos communities have collaborated on many optimizations to the Store Gateway, but it still has problems inherent to its design.

1. Resource Intensive
   - Requires significant local disk space to store index headers
   - High memory utilization due to index-header mmap
   - Often needs over-provisioning to handle query spikes

2. State Management and Scaling Difficulties
   - Requires complex data sharding when scaling, which often causes issues such as consistency-check failures and is hard for users to configure
   - Initial sync causes long startup times, which affects service availability in both scaling and failure-recovery scenarios

3. Query Inefficiencies
   - Attempts to minimize storage requests often lead to overfetching, causing high bandwidth usage
   - Complex caching logic with varying effectiveness; latency varies a lot on cache misses
   - Processes a single block with one goroutine, leading to high latency for large blocks; it cannot scale further without complex data partitioning

### Why Parquet?

[Apache Parquet](https://parquet.apache.org/) is a columnar storage format designed specifically for efficient data storage and retrieval from object storage systems. It offers several key advantages that directly address the problems we face with TSDB and Store Gateway:

- Data is organized by columns rather than rows, which reduces the number of requests to object storage since only limited IO is needed to fetch a whole column
- Rich file metadata and indexes mean no local state such as an index header is required to query the data, making readers stateless
- Advanced compression techniques reduce storage costs and improve query performance
- Parallel-processing friendly thanks to Parquet row groups

There are other benefits of the Parquet format, but they are not directly related to this proposal:

- Wide ecosystem and tooling support
- Column pruning opportunity using projection pushdown

## Out of Scope

- Allowing the Ingester and Compactor to create Parquet files directly instead of TSDB blocks. This could be on the future roadmap, but this proposal only focuses on converting and querying Parquet files.

## Proposed Design

### Components

Two new Cortex components/modules are introduced in this design.

#### 1. Parquet Converter

The Parquet Converter is a new component that converts TSDB blocks on the object store to the Parquet file format.

It is similar to the Compactor; however, it only converts a single block at a time. The converted Parquet files are stored in the same TSDB block folder so that the lifecycle of the Parquet files is managed together with the block.

Conversion can be restricted to certain blocks based on block duration; for example, only blocks whose duration is >= 12h are converted.
> **Comment (Contributor):** Maybe we could add a sentence on why we wouldn't want to automatically convert all blocks to the Parquet format? 🤔

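To make the duration-based filter above concrete, here is a minimal Go sketch of the selection logic the converter could apply. The `Block` type, field names, and helper functions are hypothetical illustrations, not the actual Cortex implementation.

```go
package main

import (
	"fmt"
	"time"
)

// Block is a hypothetical, minimal view of a TSDB block's metadata.
type Block struct {
	ID      string
	MinTime time.Time
	MaxTime time.Time
}

// shouldConvert applies the duration-based filter described above: only blocks
// covering at least minDuration are converted to the Parquet format.
func shouldConvert(b Block, minDuration time.Duration) bool {
	return b.MaxTime.Sub(b.MinTime) >= minDuration
}

func mustTime(s string) time.Time {
	v, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return v
}

func main() {
	blocks := []Block{
		{ID: "block-2h", MinTime: mustTime("2025-04-01T00:00:00Z"), MaxTime: mustTime("2025-04-01T02:00:00Z")},
		{ID: "block-12h", MinTime: mustTime("2025-04-01T00:00:00Z"), MaxTime: mustTime("2025-04-01T12:00:00Z")},
	}
	for _, b := range blocks {
		if shouldConvert(b, 12*time.Hour) {
			fmt.Printf("converting %s to Parquet alongside the TSDB block\n", b.ID)
		} else {
			fmt.Printf("skipping %s: duration below the 12h threshold\n", b.ID)
		}
	}
}
```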

#### 2. Parquet Queryable

Similar to the existing `distributorQueryable` and `blockStorageQueryable`, the Parquet queryable is a queryable implementation that allows Cortex to query Parquet files; it can be used in both the Cortex Querier and the Ruler.

If the Parquet queryable is enabled, the block storage queryable is disabled and the Cortex Querier no longer queries the Store Gateway. `distributorQueryable` remains unchanged, so it still queries Ingesters.
> **Comment (danielblando, Apr 22, 2025):** If we have the Parquet converter configured for only blocks >= 12h, would `blockStorageQueryable` still be enabled when querying blocks < 12h?
>
> **Reply (author):** Existing flags like query-ingesters-within and query-store-after will still be used, so we can fall back to Ingesters. We could also do some kind of fallback to the Store Gateway, as you said, during a migration phase, but those are implementation details. The long-term solution is for the Compactor to create and compact Parquet files, same as what we have today.


The Parquet queryable uses a bucket index to discover Parquet files in object storage. The bucket index is the same as the existing TSDB bucket index file but uses a different name, `bucket-index-parquet.json.gz`. It is updated periodically by the Cortex Compactor/Parquet Converter when Parquet storage is enabled.
> **Comment (danielblando, Apr 22, 2025):** How do we still query the 12h TSDB blocks while we don't have them in the Parquet index? For example, if we have 8h blocks and we are compacting to 12h blocks, I assume that after the new 12h TSDB blocks are created we convert them to Parquet, but while this hasn't finished we only have them in the default index, and the 8h blocks would be removed from the default index by the compactor, no?
>
> **Reply (yeya24, Apr 22, 2025):** We cannot query them until the Parquet file is created and added to the bucket index. There is some tradeoff here, and users can configure whichever option works for them:
>
> - You can configure converting only 12h+ blocks. This has a longer delay in Parquet file creation but requires fewer resources for conversion. Users need to expect a fallback to some other storage to query that data.
> - You can configure converting 2h blocks. Maybe we can configure it to only convert blocks after deduplication so Parquet files are available earlier, but more compactors are required to do the conversion.
>
> **Comment (Contributor):** Cool, makes more sense now. I think Parquet would then need to do some kind of merge between both indexes for the 12h scenario.


Cortex querier remains a stateless component when Parquet queryable is enabled.
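As a rough illustration of how the Parquet queryable could sit next to `distributorQueryable`, here is a minimal Go sketch. The interface shape, type names, and the time-based split are simplified assumptions for illustration; the real implementation would build on the Prometheus storage interfaces and the existing Cortex flags mentioned above.

```go
package main

import (
	"fmt"
	"time"
)

// queryable is a simplified stand-in for the queryable abstraction used by the
// Cortex Querier; the real code builds on the Prometheus storage interfaces.
type queryable interface {
	name() string
	canServe(minT, maxT time.Time) bool
}

// parquetQueryable serves data older than queryStoreAfter from Parquet files
// discovered through bucket-index-parquet.json.gz (discovery not shown here).
type parquetQueryable struct{ queryStoreAfter time.Duration }

func (q parquetQueryable) name() string { return "parquet" }
func (q parquetQueryable) canServe(minT, _ time.Time) bool {
	return time.Since(minT) >= q.queryStoreAfter
}

// distributorQueryable keeps serving recent data from Ingesters, unchanged by
// this proposal; it covers the query-ingesters-within window.
type distributorQueryable struct{ queryIngestersWithin time.Duration }

func (q distributorQueryable) name() string { return "distributor" }
func (q distributorQueryable) canServe(_, maxT time.Time) bool {
	return time.Since(maxT) <= q.queryIngestersWithin
}

func main() {
	queryables := []queryable{
		parquetQueryable{queryStoreAfter: 12 * time.Hour},
		distributorQueryable{queryIngestersWithin: 13 * time.Hour},
	}
	// A query over the last 24h touches both: Ingesters for recent samples and
	// Parquet files for older samples; the results are merged by the querier.
	minT, maxT := time.Now().Add(-24*time.Hour), time.Now()
	for _, q := range queryables {
		fmt.Printf("%s queryable consulted: %v\n", q.name(), q.canServe(minT, maxT))
	}
}
```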

### Architecture

```
┌──────────┐    ┌─────────────┐    ┌──────────────┐
│ Ingester │───>│    TSDB     │───>│   Parquet    │
└──────────┘    │   Blocks    │    │  Converter   │
                └─────────────┘    └──────────────┘
                                           │
                                           v
┌──────────┐    ┌─────────────┐    ┌──────────────┐
│  Query   │───>│   Parquet   │───>│   Parquet    │
│ Frontend │    │   Querier   │    │    Files     │
└──────────┘    └─────────────┘    └──────────────┘
```

### Data Format

A Parquet file is converted from a TSDB block, so it follows the same time range constraints.

If the largest block covers 1 day, then a Parquet file can cover up to 1 day. The maximum block range is configurable in Cortex, but the default value is 24h, so the following schema uses 24h as the example.

#### Schema Overview

The Parquet format consists of two types of files:

1. **Labels Parquet File**
   - Each row represents a unique time series
   - Each column corresponds to a label name (e.g., `__name__`, `label1`, ..., `labelN`)
   - Row groups are sorted by `__name__` alphabetically in ascending order
> **Comment (Contributor):** You probably mean alphabetical here, letters cannot ascend 😄


2. **Chunks Parquet File**
   - Maintains row and row group order matching the Labels file
   - Contains multiple chunk columns for time-series data ordered by time. With 3 chunk columns, for example, each column covers 8h of chunks: 0-8h, 8h-16h, and 16h-24h. A single TSDB chunk may span the time ranges of two columns, in which case the Parquet file writer needs to split and re-encode the chunk for each chunk column (see the sketch after this list).
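Below is a minimal sketch of the split-and-re-encode step described in the last bullet above. The `sample` and `chunk` types and the splitting helper are hypothetical simplifications (real TSDB chunks are XOR-encoded and must be re-encoded rather than merely partitioned); the sketch only illustrates how a chunk crossing a column boundary is assigned to the `s_data_{n}` columns.

```go
package main

import "fmt"

// sample and chunk are simplified stand-ins for TSDB samples and chunks.
// Timestamps are milliseconds relative to the block's start time (0..24h).
type sample struct {
	ts int64
	v  float64
}

type chunk struct {
	samples []sample
}

const (
	blockRangeMs  int64 = 24 * 3600 * 1000
	dataColsCount int64 = 3 // default number of s_data_{n} columns
	colRangeMs          = blockRangeMs / dataColsCount
)

// splitByColumn partitions a chunk's samples so that every output chunk fits
// entirely inside one s_data_{n} time range. The real writer would then
// re-encode each partition as a proper TSDB chunk before writing the column.
func splitByColumn(c chunk) map[int64]chunk {
	out := map[int64]chunk{}
	for _, s := range c.samples {
		col := s.ts / colRangeMs
		if col >= dataColsCount {
			col = dataColsCount - 1 // clamp samples sitting exactly on the block's right edge
		}
		part := out[col]
		part.samples = append(part.samples, s)
		out[col] = part
	}
	return out
}

func main() {
	// A chunk spanning 7h..9h crosses the 8h boundary between columns 0 and 1.
	c := chunk{samples: []sample{
		{ts: 7 * 3600 * 1000, v: 1},
		{ts: 9 * 3600 * 1000, v: 2},
	}}
	for col, part := range splitByColumn(c) {
		fmt.Printf("s_data_%d gets %d sample(s)\n", col, len(part.samples))
	}
}
```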

#### Column Specifications
> **Comment (Contributor):** Is this column specification obtained by joining the two Parquet files above? Maybe I'm missing something obvious, but it would be nice to include the rationale for splitting into two files. Rows are ordered the same way in both files, so I'm not sure why they need to be split.
>
> **Reply (author):** Yes, you are definitely right. We experimented with a single file containing both labels and chunks. The reason for splitting into two files is that labels and chunks have quite different sizes and read patterns, so we can configure the Parquet reader differently (for example, the read buffer) to read more efficiently. There is also a POC from Cloudflare which uses two files so that they can choose to store them differently: they can cache the labels file in memory and leave the chunks file on object store because of its size, for more efficient index queries. Overall, two files seem like the more flexible approach. Maybe @alanprot can share more info.
>
> **Comment (MichaHoffmann, Apr 22, 2025):** Two files are useful since the labels Parquet file is tiny and can be stored on disk or memoized if wanted. This reduces requests to object storage for any label-related lookups.
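To illustrate why matching row order across the two files matters, here is a hedged sketch of the read path: the small labels file is scanned (or served from cache) to find the rows whose labels match a query, and only those same row indexes are then fetched from the much larger chunks file. The types and matching logic below are illustrative assumptions, not the proposed implementation.

```go
package main

import "fmt"

// labelsRow mirrors one row of the labels Parquet file: one value per s_lbl_* column.
type labelsRow map[string]string

// chunksRow mirrors one row of the chunks Parquet file. Row i holds the chunk
// columns for the same series as labels-file row i, so a row index found in
// the labels file can be used directly against the chunks file.
type chunksRow struct{ sData [3][]byte }

// matchRows scans the (small, cacheable) labels file for series matching all
// the given equality matchers and returns their row indexes.
func matchRows(labels []labelsRow, matchers map[string]string) []int {
	var rows []int
	for i, lr := range labels {
		ok := true
		for name, val := range matchers {
			if lr[name] != val {
				ok = false
				break
			}
		}
		if ok {
			rows = append(rows, i)
		}
	}
	return rows
}

func main() {
	labels := []labelsRow{
		{"__name__": "up", "job": "api"},
		{"__name__": "up", "job": "db"},
	}
	chunks := []chunksRow{{}, {}} // same row order as the labels file

	// Find matching series in the labels file, then fetch only those rows from
	// the (much larger) chunks file, in practice via targeted range reads.
	for _, row := range matchRows(labels, map[string]string{"__name__": "up", "job": "db"}) {
		fmt.Printf("fetch chunks-file row %d: %d chunk columns\n", row, len(chunks[row].sData))
	}
}
```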


| Column Name | Description | Type | Encoding/Compression/skipPageBounds | Required |
|-------------|-------------|------|-------------------------------------|----------|
| `s_hash` | Hash of all labels | INT64 | None/Zstd/Yes | No |
| `s_col_indexes` | Bitmap indicating which columns store the label set for this row (series) | ByteArray (bitmap) | DeltaByteArray/Zstd/Yes | Yes |
| `s_lbl_{labelName}` | Values for a given label name. Rows are sorted by metric name | ByteArray (string) | RLE_DICTIONARY/Zstd/No | Yes |
| `s_data_{n}` | Chunk columns (0 to data_cols_count). Each column contains data from `[n*duration, (n+1)*duration]`, where duration is `24h/data_cols_count` | ByteArray (encoded chunks) | DeltaByteArray/Zstd/Yes | Yes |

> **Comment (Contributor), on `s_hash`:** What will this be used for? Projection? Or maybe for consistency checks too? prometheus/prometheus#16423 I believe projection might not be so easy :/
>
> **Reply (author):** For projection and potentially other use cases like cardinality analysis.
> **Comment (Contributor):** Maybe a dumb question, but should the column count always result in columns being split by full hours (e.g. every 6 / 8 / 12 hours)? Are there any consequences if that's not so?
>
> **Reply (author):** I will mention that we re-encode the chunks a bit at the writer so that they fall into the configured column time ranges.


`data_cols_count` will be stored as Parquet file metadata. Its value defaults to 3, but it is configurable to adjust for different use cases.
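For illustration, here is a small sketch of how a reader could use the `data_cols_count` metadata to fetch only the `s_data_{n}` columns overlapping a query's time range. The column-duration arithmetic follows the table above; the function and parameter names are hypothetical.

```go
package main

import "fmt"

// columnsForRange returns the indexes of the s_data_{n} columns that overlap
// the query range [qMinMs, qMaxMs), given the block range (24h here) and the
// data_cols_count value read from the Parquet file metadata.
func columnsForRange(qMinMs, qMaxMs, blockRangeMs, dataColsCount int64) []int64 {
	duration := blockRangeMs / dataColsCount // each column covers [n*duration, (n+1)*duration]
	first := qMinMs / duration
	last := (qMaxMs - 1) / duration
	if first < 0 {
		first = 0
	}
	if last >= dataColsCount {
		last = dataColsCount - 1
	}
	cols := []int64{}
	for n := first; n <= last; n++ {
		cols = append(cols, n)
	}
	return cols
}

func main() {
	const hourMs int64 = 3600 * 1000
	// A query covering 6h..10h of a 24h block with 3 chunk columns needs
	// columns 0 (0-8h) and 1 (8h-16h); column 2 (16h-24h) is skipped.
	fmt.Println(columnsForRange(6*hourMs, 10*hourMs, 24*hourMs, 3))
}
```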

## Open Questions

1. Should we use a Parquet Gateway to replace the Store Gateway?
> **Comment (Contributor):** Having a fully compatible API between the Parquet Gateway and the Store Gateway would make the migration easier as well, no?
>
> **Reply (yeya24, Apr 22, 2025):** It is easier to just replace the querier with the Parquet querier, I believe, as it is stateless. But yeah, migration is not part of the proposal; we probably need to create a migration guide later.

- It separates the query engine and storage
- We can make the Parquet Gateway semi-stateful (for example, with data locality) for better performance

## Acknowledgement

We'd like to give huge credit to the people from the Thanos community who started this initiative.

- [Filip Petkovski](https://github.com/fpetkovski) and his initial [talk about Parquet](https://www.youtube.com/watch?v=V8Y4VuUwg8I)
- [Michael Hoffmann](https://github.com/MichaHoffmann) and his great work on the [parquet POC](https://github.com/cloudflare/parquet-tsdb-poc)