@@ -7,12 +7,14 @@ import (
77 "fmt"
88 "math/rand"
99 "path/filepath"
10+ "slices"
1011 "strconv"
1112 "testing"
1213 "time"
1314
1415 "github.com/cortexproject/promqlsmith"
1516 "github.com/prometheus/prometheus/model/labels"
17+ "github.com/prometheus/prometheus/promql"
1618 "github.com/stretchr/testify/require"
1719 "github.com/thanos-io/objstore"
1820 "github.com/thanos-io/thanos/pkg/block"
@@ -176,3 +178,190 @@ func TestParquetFuzz(t *testing.T) {
176178 require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_parquet_queryable_blocks_queried_total" }, e2e .WithLabelMatchers (
177179 labels .MustNewMatcher (labels .MatchEqual , "type" , "parquet" ))))
178180}
181+
182+ func TestParquetProjectionPushdown (t * testing.T ) {
183+ s , err := e2e .NewScenario (networkName )
184+ require .NoError (t , err )
185+ defer s .Close ()
186+
187+ consul := e2edb .NewConsulWithName ("consul" )
188+ memcached := e2ecache .NewMemcached ()
189+ require .NoError (t , s .StartAndWaitReady (consul , memcached ))
190+
191+ baseFlags := mergeFlags (AlertmanagerLocalFlags (), BlocksStorageFlags ())
192+ flags := mergeFlags (
193+ baseFlags ,
194+ map [string ]string {
195+ "-target" : "all,parquet-converter" ,
196+ "-blocks-storage.tsdb.block-ranges-period" : "1m,24h" ,
197+ "-blocks-storage.tsdb.ship-interval" : "1s" ,
198+ "-blocks-storage.bucket-store.sync-interval" : "1s" ,
199+ "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl" : "1s" ,
200+ "-blocks-storage.bucket-store.bucket-index.idle-timeout" : "1s" ,
201+ "-blocks-storage.bucket-store.bucket-index.enabled" : "true" ,
202+ "-blocks-storage.bucket-store.index-cache.backend" : tsdb .IndexCacheBackendInMemory ,
203+ "-querier.query-store-for-labels-enabled" : "true" ,
204+ // compactor
205+ "-compactor.cleanup-interval" : "1s" ,
206+ // Ingester.
207+ "-ring.store" : "consul" ,
208+ "-consul.hostname" : consul .NetworkHTTPEndpoint (),
209+ // Distributor.
210+ "-distributor.replication-factor" : "1" ,
211+ // Store-gateway.
212+ "-store-gateway.sharding-enabled" : "false" ,
213+ "--querier.store-gateway-addresses" : "nonExistent" , // Make sure we do not call Store gateways
214+ // alert manager
215+ "-alertmanager.web.external-url" : "http://localhost/alertmanager" ,
216+ // parquet-converter
217+ "-parquet-converter.ring.consul.hostname" : consul .NetworkHTTPEndpoint (),
218+ "-parquet-converter.conversion-interval" : "1s" ,
219+ "-parquet-converter.enabled" : "true" ,
220+ // Querier - Enable Thanos engine with projection optimizer
221+ "-querier.thanos-engine" : "true" ,
222+ "-querier.optimizers" : "default,projection" , // Enable projection optimizer
223+ "-querier.enable-parquet-queryable" : "true" ,
224+ "-querier.parquet-queryable-honor-projection-hints" : "true" , // Honor projection hints
225+ // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters
226+ // Since test queries are 24-48h old, they won't query ingesters and projection will be enabled
227+ "-querier.query-ingesters-within" : "2h" ,
228+ // Enable cache for parquet labels and chunks
229+ "-blocks-storage.bucket-store.parquet-labels-cache.backend" : "inmemory,memcached" ,
230+ "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
231+ "-blocks-storage.bucket-store.chunks-cache.backend" : "inmemory,memcached" ,
232+ "-blocks-storage.bucket-store.chunks-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
233+ },
234+ )
235+
236+ // make alert manager config dir
237+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
238+
239+ ctx := context .Background ()
240+ rnd := rand .New (rand .NewSource (time .Now ().Unix ()))
241+ dir := filepath .Join (s .SharedDir (), "data" )
242+ numSeries := 20
243+ numSamples := 100
244+ lbls := make ([]labels.Labels , 0 , numSeries )
245+ scrapeInterval := time .Minute
246+ statusCodes := []string {"200" , "400" , "404" , "500" , "502" }
247+ methods := []string {"GET" , "POST" , "PUT" , "DELETE" }
248+ now := time .Now ()
249+ // Make sure query time is old enough to not overlap with ingesters
250+ // With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters
251+ // Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled
252+ start := now .Add (- time .Hour * 48 )
253+ end := now .Add (- time .Hour * 24 )
254+
255+ // Create series with multiple labels
256+ for i := 0 ; i < numSeries ; i ++ {
257+ lbls = append (lbls , labels .FromStrings (
258+ labels .MetricName , "http_requests_total" ,
259+ "job" , "api-server" ,
260+ "instance" , fmt .Sprintf ("instance-%d" , i % 5 ),
261+ "status_code" , statusCodes [i % len (statusCodes )],
262+ "method" , methods [i % len (methods )],
263+ "path" , fmt .Sprintf ("/api/v1/endpoint%d" , i % 3 ),
264+ "cluster" , "test-cluster" ,
265+ ))
266+ }
267+
268+ id , err := e2e .CreateBlock (ctx , rnd , dir , lbls , numSamples , start .UnixMilli (), end .UnixMilli (), scrapeInterval .Milliseconds (), 10 )
269+ require .NoError (t , err )
270+ minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
271+ require .NoError (t , s .StartAndWaitReady (minio ))
272+
273+ cortex := e2ecortex .NewSingleBinary ("cortex" , flags , "" )
274+ require .NoError (t , s .StartAndWaitReady (cortex ))
275+
276+ storage , err := e2ecortex .NewS3ClientForMinio (minio , flags ["-blocks-storage.s3.bucket-name" ])
277+ require .NoError (t , err )
278+ bkt := bucket .NewUserBucketClient ("user-1" , storage .GetBucket (), nil )
279+
280+ err = block .Upload (ctx , log .Logger , bkt , filepath .Join (dir , id .String ()), metadata .NoneFunc )
281+ require .NoError (t , err )
282+
283+ // Wait until we convert the blocks to parquet
284+ cortex_testutil .Poll (t , 30 * time .Second , true , func () interface {} {
285+ found := false
286+ foundBucketIndex := false
287+
288+ err := bkt .Iter (context .Background (), "" , func (name string ) error {
289+ if name == fmt .Sprintf ("parquet-markers/%v-parquet-converter-mark.json" , id .String ()) {
290+ found = true
291+ }
292+ if name == "bucket-index.json.gz" {
293+ foundBucketIndex = true
294+ }
295+ return nil
296+ }, objstore .WithRecursiveIter ())
297+ require .NoError (t , err )
298+ return found && foundBucketIndex
299+ })
300+
301+ c , err := e2ecortex .NewClient ("" , cortex .HTTPEndpoint (), "" , "" , "user-1" )
302+ require .NoError (t , err )
303+
304+ // Test queries that should use projection hints
305+ testCases := []struct {
306+ name string
307+ query string
308+ expectedLabels []string // Labels that should be present in result (besides __name__)
309+ }{
310+ {
311+ name : "simple_sum_by_job" ,
312+ query : `sum by (job) (http_requests_total)` ,
313+ expectedLabels : []string {"job" },
314+ },
315+ {
316+ name : "rate_with_aggregation" ,
317+ query : `sum by (method) (rate(http_requests_total[5m]))` ,
318+ expectedLabels : []string {"method" },
319+ },
320+ {
321+ name : "multiple_grouping_labels" ,
322+ query : `sum by (job, status_code) (http_requests_total)` ,
323+ expectedLabels : []string {"job" , "status_code" },
324+ },
325+ }
326+
327+ for _ , tc := range testCases {
328+ t .Run (tc .name , func (t * testing.T ) {
329+ t .Logf ("Testing: %s" , tc .query )
330+
331+ // Execute instant query
332+ result , err := c .Query (tc .query , end )
333+ require .NoError (t , err )
334+ require .NotNil (t , result )
335+
336+ // Verify we got results
337+ matrix := result .(promql.Matrix )
338+ require .NotEmpty (t , matrix , "query should return results" )
339+
340+ t .Logf ("Query returned %d series" , len (matrix ))
341+
342+ // Verify projection worked: series should only have the expected labels
343+ for i , series := range matrix {
344+ actualLabels := make (map [string ]struct {})
345+ for _ , label := range series .Metric {
346+ actualLabels [label .Name ] = struct {}{}
347+ }
348+
349+ // Check that no unexpected labels are present
350+ for lbl := range actualLabels {
351+ if ! slices .Contains (tc .expectedLabels , lbl ) {
352+ require .Fail (t , "series should not have %s label" , lbl )
353+ }
354+ }
355+ // Check that all expected labels are present
356+ for _ , expectedLabel := range tc .expectedLabels {
357+ require .True (t , actualLabels [expectedLabel ],
358+ "series should have %s label" , expectedLabel )
359+ }
360+ }
361+ })
362+ }
363+
364+ // Verify that parquet blocks were queried
365+ require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_parquet_queryable_blocks_queried_total" }, e2e .WithLabelMatchers (
366+ labels .MustNewMatcher (labels .MatchEqual , "type" , "parquet" ))))
367+ }
0 commit comments