@@ -7,11 +7,13 @@ import (
77 "fmt"
88 "math/rand"
99 "path/filepath"
10+ "slices"
1011 "strconv"
1112 "testing"
1213 "time"
1314
1415 "github.com/cortexproject/promqlsmith"
16+ "github.com/prometheus/common/model"
1517 "github.com/prometheus/prometheus/model/labels"
1618 "github.com/stretchr/testify/require"
1719 "github.com/thanos-io/objstore"
@@ -23,7 +25,9 @@ import (
2325 e2edb "github.com/cortexproject/cortex/integration/e2e/db"
2426 "github.com/cortexproject/cortex/integration/e2ecortex"
2527 "github.com/cortexproject/cortex/pkg/storage/bucket"
28+ cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
2629 "github.com/cortexproject/cortex/pkg/storage/tsdb"
30+ "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2731 "github.com/cortexproject/cortex/pkg/util/log"
2832 cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
2933)
@@ -176,3 +180,213 @@ func TestParquetFuzz(t *testing.T) {
176180 require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_parquet_queryable_blocks_queried_total" }, e2e .WithLabelMatchers (
177181 labels .MustNewMatcher (labels .MatchEqual , "type" , "parquet" ))))
178182}
183+
184+ func TestParquetProjectionPushdownFuzz (t * testing.T ) {
185+ s , err := e2e .NewScenario (networkName )
186+ require .NoError (t , err )
187+ defer s .Close ()
188+
189+ consul := e2edb .NewConsulWithName ("consul" )
190+ memcached := e2ecache .NewMemcached ()
191+ require .NoError (t , s .StartAndWaitReady (consul , memcached ))
192+
193+ baseFlags := mergeFlags (AlertmanagerLocalFlags (), BlocksStorageFlags ())
194+ flags := mergeFlags (
195+ baseFlags ,
196+ map [string ]string {
197+ "-target" : "all,parquet-converter" ,
198+ "-blocks-storage.tsdb.block-ranges-period" : "1m,24h" ,
199+ "-blocks-storage.tsdb.ship-interval" : "1s" ,
200+ "-blocks-storage.bucket-store.sync-interval" : "1s" ,
201+ "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl" : "1s" ,
202+ "-blocks-storage.bucket-store.bucket-index.idle-timeout" : "1s" ,
203+ "-blocks-storage.bucket-store.bucket-index.enabled" : "true" ,
204+ "-blocks-storage.bucket-store.index-cache.backend" : tsdb .IndexCacheBackendInMemory ,
205+ "-querier.query-store-for-labels-enabled" : "true" ,
206+ // compactor
207+ "-compactor.cleanup-interval" : "1s" ,
208+ // Ingester.
209+ "-ring.store" : "consul" ,
210+ "-consul.hostname" : consul .NetworkHTTPEndpoint (),
211+ // Distributor.
212+ "-distributor.replication-factor" : "1" ,
213+ // Store-gateway.
214+ "-store-gateway.sharding-enabled" : "false" ,
215+ "--querier.store-gateway-addresses" : "nonExistent" , // Make sure we do not call Store gateways
216+ // alert manager
217+ "-alertmanager.web.external-url" : "http://localhost/alertmanager" ,
218+ // parquet-converter
219+ "-parquet-converter.ring.consul.hostname" : consul .NetworkHTTPEndpoint (),
220+ "-parquet-converter.conversion-interval" : "1s" ,
221+ "-parquet-converter.enabled" : "true" ,
222+ // Querier - Enable Thanos engine with projection optimizer
223+ "-querier.thanos-engine" : "true" ,
224+ "-querier.optimizers" : "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection" , // Enable all optimizers including projection
225+ "-querier.enable-parquet-queryable" : "true" ,
226+ "-querier.parquet-queryable-honor-projection-hints" : "true" , // Honor projection hints
227+ // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters
228+ // Since test queries are 24-48h old, they won't query ingesters and projection will be enabled
229+ "-querier.query-ingesters-within" : "2h" ,
230+ // Enable cache for parquet labels and chunks
231+ "-blocks-storage.bucket-store.parquet-labels-cache.backend" : "inmemory,memcached" ,
232+ "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
233+ "-blocks-storage.bucket-store.chunks-cache.backend" : "inmemory,memcached" ,
234+ "-blocks-storage.bucket-store.chunks-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
235+ },
236+ )
237+
238+ // make alert manager config dir
239+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
240+
241+ ctx := context .Background ()
242+ rnd := rand .New (rand .NewSource (time .Now ().Unix ()))
243+ dir := filepath .Join (s .SharedDir (), "data" )
244+ numSeries := 20
245+ numSamples := 100
246+ lbls := make ([]labels.Labels , 0 , numSeries )
247+ scrapeInterval := time .Minute
248+ statusCodes := []string {"200" , "400" , "404" , "500" , "502" }
249+ methods := []string {"GET" , "POST" , "PUT" , "DELETE" }
250+ now := time .Now ()
251+ // Make sure query time is old enough to not overlap with ingesters.
252+ start := now .Add (- time .Hour * 72 )
253+ end := now .Add (- time .Hour * 48 )
254+
255+ // Create series with multiple labels
256+ for i := range numSeries {
257+ lbls = append (lbls , labels .FromStrings (
258+ labels .MetricName , "http_requests_total" ,
259+ "job" , "api-server" ,
260+ "instance" , fmt .Sprintf ("instance-%d" , i % 5 ),
261+ "status_code" , statusCodes [i % len (statusCodes )],
262+ "method" , methods [i % len (methods )],
263+ "path" , fmt .Sprintf ("/api/v1/endpoint%d" , i % 3 ),
264+ "cluster" , "test-cluster" ,
265+ ))
266+ }
267+
268+ id , err := e2e .CreateBlock (ctx , rnd , dir , lbls , numSamples , start .UnixMilli (), end .UnixMilli (), scrapeInterval .Milliseconds (), 10 )
269+ require .NoError (t , err )
270+ minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
271+ require .NoError (t , s .StartAndWaitReady (minio ))
272+
273+ cortex := e2ecortex .NewSingleBinary ("cortex" , flags , "" )
274+ require .NoError (t , s .StartAndWaitReady (cortex ))
275+
276+ storage , err := e2ecortex .NewS3ClientForMinio (minio , flags ["-blocks-storage.s3.bucket-name" ])
277+ require .NoError (t , err )
278+ bkt := storage .GetBucket ()
279+ userBucket := bucket .NewUserBucketClient ("user-1" , bkt , nil )
280+
281+ err = block .Upload (ctx , log .Logger , userBucket , filepath .Join (dir , id .String ()), metadata .NoneFunc )
282+ require .NoError (t , err )
283+
284+ // Wait until we convert the blocks to parquet AND bucket index is updated
285+ cortex_testutil .Poll (t , 300 * time .Second , true , func () interface {} {
286+ // Check if parquet marker exists
287+ markerFound := false
288+ err := userBucket .Iter (context .Background (), "" , func (name string ) error {
289+ if name == fmt .Sprintf ("parquet-markers/%v-parquet-converter-mark.json" , id .String ()) {
290+ markerFound = true
291+ }
292+ return nil
293+ }, objstore .WithRecursiveIter ())
294+ if err != nil || ! markerFound {
295+ return false
296+ }
297+
298+ // Check if bucket index exists AND contains the parquet block metadata
299+ idx , err := bucketindex .ReadIndex (ctx , bkt , "user-1" , nil , log .Logger )
300+ if err != nil {
301+ return false
302+ }
303+
304+ // Verify the block is in the bucket index with parquet metadata
305+ for _ , b := range idx .Blocks {
306+ if b .ID == id && b .Parquet != nil {
307+ require .True (t , b .Parquet .Version == cortex_parquet .CurrentVersion )
308+ return true
309+ }
310+ }
311+ return false
312+ })
313+
314+ c , err := e2ecortex .NewClient ("" , cortex .HTTPEndpoint (), "" , "" , "user-1" )
315+ require .NoError (t , err )
316+
317+ testCases := []struct {
318+ name string
319+ query string
320+ expectedLabels []string // Labels that should be present in result
321+ }{
322+ {
323+ name : "vector selector query should not use projection" ,
324+ query : `http_requests_total` ,
325+ expectedLabels : []string {"__name__" , "job" , "instance" , "status_code" , "method" , "path" , "cluster" },
326+ },
327+ {
328+ name : "simple_sum_by_job" ,
329+ query : `sum by (job) (http_requests_total)` ,
330+ expectedLabels : []string {"job" },
331+ },
332+ {
333+ name : "rate_with_aggregation" ,
334+ query : `sum by (method) (rate(http_requests_total[5m]))` ,
335+ expectedLabels : []string {"method" },
336+ },
337+ {
338+ name : "multiple_grouping_labels" ,
339+ query : `sum by (job, status_code) (http_requests_total)` ,
340+ expectedLabels : []string {"job" , "status_code" },
341+ },
342+ {
343+ name : "aggregation without query" ,
344+ query : `sum without (instance, method) (http_requests_total)` ,
345+ expectedLabels : []string {"job" , "status_code" , "path" , "cluster" },
346+ },
347+ }
348+
349+ for _ , tc := range testCases {
350+ t .Run (tc .name , func (t * testing.T ) {
351+ t .Logf ("Testing: %s" , tc .query )
352+
353+ // Execute instant query
354+ result , err := c .Query (tc .query , end )
355+ require .NoError (t , err )
356+ require .NotNil (t , result )
357+
358+ // Verify we got results
359+ vector , ok := result .(model.Vector )
360+ require .True (t , ok , "result should be a vector" )
361+ require .NotEmpty (t , vector , "query should return results" )
362+
363+ t .Logf ("Query returned %d series" , len (vector ))
364+
365+ // Verify projection worked: series should only have the expected labels
366+ for _ , sample := range vector {
367+ actualLabels := make (map [string ]struct {})
368+ for label := range sample .Metric {
369+ actualLabels [string (label )] = struct {}{}
370+ }
371+
372+ // Check that all expected labels are present
373+ for _ , expectedLabel := range tc .expectedLabels {
374+ _ , ok := actualLabels [expectedLabel ]
375+ require .True (t , ok ,
376+ "series should have %s label" , expectedLabel )
377+ }
378+
379+ // Check that no unexpected labels are present
380+ for lbl := range actualLabels {
381+ if ! slices .Contains (tc .expectedLabels , lbl ) {
382+ require .Fail (t , "series should not have unexpected label: %s" , lbl )
383+ }
384+ }
385+ }
386+ })
387+ }
388+
389+ // Verify that parquet blocks were queried
390+ require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Greater (0 ), []string {"cortex_parquet_queryable_blocks_queried_total" }, e2e .WithLabelMatchers (
391+ labels .MustNewMatcher (labels .MatchEqual , "type" , "parquet" ))))
392+ }
0 commit comments