From add3bf9b24c07801ffbe5a512f0e24f70eb19a63 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 8 Dec 2025 18:44:20 +0900 Subject: [PATCH 1/4] Refactor: move parquet shard cache to parquet util pkg Signed-off-by: SungJin1212 --- docs/blocks-storage/querier.md | 8 +- docs/configuration/config-file-reference.md | 8 +- pkg/querier/parquet_queryable.go | 160 +----------------- pkg/querier/parquet_queryable_test.go | 116 +------------ pkg/querier/querier.go | 13 +- pkg/util/parquetutil/cache.go | 176 ++++++++++++++++++++ pkg/util/parquetutil/cache_test.go | 130 +++++++++++++++ 7 files changed, 329 insertions(+), 282 deletions(-) create mode 100644 pkg/util/parquetutil/cache.go create mode 100644 pkg/util/parquetutil/cache_test.go diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 732324fec1f..614713f0196 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -294,6 +294,10 @@ querier: # CLI flag: -querier.parquet-queryable-shard-cache-size [parquet_queryable_shard_cache_size: | default = 512] + # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-queryable-shard-cache-ttl + [parquet_queryable_shard_cache_ttl: | default = 24h] + # [Experimental] Parquet queryable's default block store to query. Valid # options are tsdb and parquet. If it is set to tsdb, parquet queryable always # fallback to store gateway. @@ -307,10 +311,6 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] - - # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. - # CLI flag: -querier.parquet-queryable-shard-cache-ttl - [parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 58d1c89ebb9..4109092cd6c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4812,6 +4812,10 @@ thanos_engine: # CLI flag: -querier.parquet-queryable-shard-cache-size [parquet_queryable_shard_cache_size: | default = 512] +# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-queryable-shard-cache-ttl +[parquet_queryable_shard_cache_ttl: | default = 24h] + # [Experimental] Parquet queryable's default block store to query. Valid options # are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback # to store gateway. @@ -4824,10 +4828,6 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] - -# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. -# CLI flag: -querier.parquet-queryable-shard-cache-ttl -[parquet_queryable_shard_cache_ttl: | default = 24h] ``` ### `query_frontend_config` diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 40be5ff8997..77f578225cf 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -7,7 +7,6 @@ import ( "time" "github.com/go-kit/log" - lru "github.com/hashicorp/golang-lru/v2" "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" @@ -21,6 +20,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/multierror" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -51,8 +52,6 @@ const ( parquetBlockStore blockStoreType = "parquet" ) -const defaultMaintenanceInterval = time.Minute - var ( validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore} ) @@ -99,7 +98,7 @@ type parquetQueryableWithFallback struct { fallbackDisabled bool queryStoreAfter time.Duration parquetQueryable storage.Queryable - cache cacheInterface[parquet_storage.ParquetShard] + cache parquetutil.CacheInterface[parquet_storage.ParquetShard] blockStorageQueryable *BlocksStoreQueryable finder BlocksFinder @@ -135,7 +134,7 @@ func NewParquetQueryable( return nil, err } - cache, err := newCache[parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, config.ParquetQueryableShardCacheTTL, defaultMaintenanceInterval, newCacheMetrics(reg)) + cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { return nil, err } @@ -623,157 +622,6 @@ func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHint return &shardMatcherLabelsFilter{shardMatcher: sm}, true } -type cacheInterface[T any] interface { - Get(path string) T - Set(path string, reader T) - Close() -} - -type cacheMetrics struct { - hits *prometheus.CounterVec - misses *prometheus.CounterVec - evictions *prometheus.CounterVec - size *prometheus.GaugeVec -} - -func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { - return &cacheMetrics{ - hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_hits_total", - Help: "Total number of parquet cache hits", - }, []string{"name"}), - misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_misses_total", - Help: "Total number of parquet cache misses", - }, []string{"name"}), - evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_evictions_total", - Help: "Total number of parquet cache evictions", - }, []string{"name"}), - size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_parquet_queryable_cache_item_count", - Help: "Current number of cached parquet items", - }, []string{"name"}), - } -} - -type cacheEntry[T any] struct { - value T - expiresAt time.Time -} - -type Cache[T any] struct { - cache *lru.Cache[string, *cacheEntry[T]] - name string - metrics *cacheMetrics - ttl time.Duration - stopCh chan struct{} -} - -func newCache[T any](name string, size int, ttl, maintenanceInterval time.Duration, metrics *cacheMetrics) (cacheInterface[T], error) { - if size <= 0 { - return &noopCache[T]{}, nil - } - cache, err := lru.NewWithEvict(size, func(key string, value *cacheEntry[T]) { - metrics.evictions.WithLabelValues(name).Inc() - metrics.size.WithLabelValues(name).Dec() - }) - if err != nil { - return nil, err - } - - c := &Cache[T]{ - cache: cache, - name: name, - metrics: metrics, - ttl: ttl, - stopCh: make(chan struct{}), - } - - if ttl > 0 { - go c.maintenanceLoop(maintenanceInterval) - } - - return c, nil -} - -func (c *Cache[T]) maintenanceLoop(interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - now := time.Now() - keys := c.cache.Keys() - for _, key := range keys { - if entry, ok := c.cache.Peek(key); ok { - // we use a Peek() because the Get() change LRU order. - if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { - c.cache.Remove(key) - } - } - } - case <-c.stopCh: - return - } - } -} - -func (c *Cache[T]) Get(path string) (r T) { - if entry, ok := c.cache.Get(path); ok { - isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) - - if isExpired { - c.cache.Remove(path) - c.metrics.misses.WithLabelValues(c.name).Inc() - return - } - - c.metrics.hits.WithLabelValues(c.name).Inc() - return entry.value - } - c.metrics.misses.WithLabelValues(c.name).Inc() - return -} - -func (c *Cache[T]) Set(path string, reader T) { - if !c.cache.Contains(path) { - c.metrics.size.WithLabelValues(c.name).Inc() - } - - var expiresAt time.Time - if c.ttl > 0 { - expiresAt = time.Now().Add(c.ttl) - } - - entry := &cacheEntry[T]{ - value: reader, - expiresAt: expiresAt, - } - - c.cache.Add(path, entry) -} - -func (c *Cache[T]) Close() { - close(c.stopCh) -} - -type noopCache[T any] struct { -} - -func (n noopCache[T]) Get(_ string) (r T) { - return -} - -func (n noopCache[T]) Set(_ string, _ T) { - -} - -func (n noopCache[T]) Close() { - -} - var ( shardInfoCtxKey contextKey = 1 ) diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 250b1831442..82bc29f80a0 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -1,7 +1,6 @@ package querier import ( - "bytes" "context" "fmt" "math/rand" @@ -15,7 +14,6 @@ import ( "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -41,6 +39,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/services" cortex_testutil "github.com/cortexproject/cortex/pkg/util/testutil" "github.com/cortexproject/cortex/pkg/util/validation" @@ -402,8 +401,10 @@ func TestParquetQueryable_Limits(t *testing.T) { QueryStoreAfter: 0, StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, - ParquetQueryableShardCacheSize: 100, - ParquetQueryableDefaultBlockStore: "parquet", + ParquetShardCache: parquetutil.CacheConfig{ + ParquetQueryableShardCacheSize: 100, + }, + ParquetQueryableDefaultBlockStore: "parquet", } storageCfg := cortex_tsdb.BlocksStorageConfig{ @@ -883,110 +884,3 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { }) }) } - -func Test_Cache_LRUEviction(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - cache, err := newCache[string]("test", 2, 0, time.Minute, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - cache.Set("key2", "value2") - - _ = cache.Get("key1") // hit - // "key2" deleted by LRU eviction - cache.Set("key3", "value3") - - val1 := cache.Get("key1") // hit - require.Equal(t, "value1", val1) - val3 := cache.Get("key3") // hit - require.Equal(t, "value3", val3) - val2 := cache.Get("key2") // miss - require.Equal(t, "", val2) - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 3 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 2 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 - `))) -} - -func Test_Cache_TTLEvictionByGet(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - - cache, err := newCache[string]("test", 10, 100*time.Millisecond, time.Minute, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - - val := cache.Get("key1") - require.Equal(t, "value1", val) - - // sleep longer than TTL - time.Sleep(150 * time.Millisecond) - - val = cache.Get("key1") - require.Equal(t, "", val) - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 - `))) -} - -func Test_Cache_TTLEvictionByLoop(t *testing.T) { - reg := prometheus.NewRegistry() - metrics := newCacheMetrics(reg) - - cache, err := newCache[string]("test", 10, 100*time.Millisecond, 100*time.Millisecond, metrics) - require.NoError(t, err) - defer cache.Close() - - cache.Set("key1", "value1") - - val := cache.Get("key1") - require.Equal(t, "value1", val) - - // sleep longer than TTL - time.Sleep(150 * time.Millisecond) - - if c, ok := cache.(*Cache[string]); ok { - // should delete by maintenance loop - _, ok := c.cache.Peek("key1") - require.False(t, ok) - } - - require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - `))) -} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index a6912ea024a..636300d64df 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -32,6 +32,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -92,11 +93,10 @@ type Config struct { EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` // Query Parquet files if available - EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` - ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + EnableParquetQueryable bool `yaml:"enable_parquet_queryable"` + ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` + ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"` + ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } @@ -147,8 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") - f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") - f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, "querier.parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + cfg.ParquetShardCache.RegisterFlagsWithPrefix("querier.", f) f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") diff --git a/pkg/util/parquetutil/cache.go b/pkg/util/parquetutil/cache.go new file mode 100644 index 00000000000..d9a4fc67ecb --- /dev/null +++ b/pkg/util/parquetutil/cache.go @@ -0,0 +1,176 @@ +package parquetutil + +import ( + "flag" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + defaultMaintenanceInterval = time.Minute +) + +type CacheInterface[T any] interface { + Get(path string) T + Set(path string, reader T) + Close() +} + +type cacheMetrics struct { + hits *prometheus.CounterVec + misses *prometheus.CounterVec + evictions *prometheus.CounterVec + size *prometheus.GaugeVec +} + +func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { + return &cacheMetrics{ + hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_queryable_cache_hits_total", + Help: "Total number of parquet cache hits", + }, []string{"name"}), + misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_queryable_cache_misses_total", + Help: "Total number of parquet cache misses", + }, []string{"name"}), + evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_queryable_cache_evictions_total", + Help: "Total number of parquet cache evictions", + }, []string{"name"}), + size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_parquet_queryable_cache_item_count", + Help: "Current number of cached parquet items", + }, []string{"name"}), + } +} + +type cacheEntry[T any] struct { + value T + expiresAt time.Time +} + +type Cache[T any] struct { + cache *lru.Cache[string, *cacheEntry[T]] + name string + metrics *cacheMetrics + ttl time.Duration + stopCh chan struct{} +} + +type CacheConfig struct { + ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` + ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` + MaintenanceInterval time.Duration `yaml:"-"` +} + +func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.ParquetQueryableShardCacheSize, prefix+"parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, prefix+"parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + cfg.MaintenanceInterval = defaultMaintenanceInterval +} + +func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { + if cfg.ParquetQueryableShardCacheSize <= 0 { + return &noopCache[T]{}, nil + } + metrics := newCacheMetrics(reg) + cache, err := lru.NewWithEvict(cfg.ParquetQueryableShardCacheSize, func(key string, value *cacheEntry[T]) { + metrics.evictions.WithLabelValues(name).Inc() + metrics.size.WithLabelValues(name).Dec() + }) + if err != nil { + return nil, err + } + + c := &Cache[T]{ + cache: cache, + name: name, + metrics: metrics, + ttl: cfg.ParquetQueryableShardCacheTTL, + stopCh: make(chan struct{}), + } + + if cfg.ParquetQueryableShardCacheTTL > 0 { + go c.maintenanceLoop(cfg.MaintenanceInterval) + } + + return c, nil +} + +func (c *Cache[T]) maintenanceLoop(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + keys := c.cache.Keys() + for _, key := range keys { + if entry, ok := c.cache.Peek(key); ok { + // we use a Peek() because the Get() change LRU order. + if !entry.expiresAt.IsZero() && now.After(entry.expiresAt) { + c.cache.Remove(key) + } + } + } + case <-c.stopCh: + return + } + } +} + +func (c *Cache[T]) Get(path string) (r T) { + if entry, ok := c.cache.Get(path); ok { + isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) + + if isExpired { + c.cache.Remove(path) + c.metrics.misses.WithLabelValues(c.name).Inc() + return + } + + c.metrics.hits.WithLabelValues(c.name).Inc() + return entry.value + } + c.metrics.misses.WithLabelValues(c.name).Inc() + return +} + +func (c *Cache[T]) Set(path string, reader T) { + if !c.cache.Contains(path) { + c.metrics.size.WithLabelValues(c.name).Inc() + } + + var expiresAt time.Time + if c.ttl > 0 { + expiresAt = time.Now().Add(c.ttl) + } + + entry := &cacheEntry[T]{ + value: reader, + expiresAt: expiresAt, + } + + c.cache.Add(path, entry) +} + +func (c *Cache[T]) Close() { + close(c.stopCh) +} + +type noopCache[T any] struct { +} + +func (n noopCache[T]) Get(_ string) (r T) { + return +} + +func (n noopCache[T]) Set(_ string, _ T) { + +} + +func (n noopCache[T]) Close() {} diff --git a/pkg/util/parquetutil/cache_test.go b/pkg/util/parquetutil/cache_test.go new file mode 100644 index 00000000000..c9ad1f48ec5 --- /dev/null +++ b/pkg/util/parquetutil/cache_test.go @@ -0,0 +1,130 @@ +package parquetutil + +import ( + "bytes" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func Test_Cache_LRUEviction(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetQueryableShardCacheSize: 2, + ParquetQueryableShardCacheTTL: 0, + MaintenanceInterval: time.Minute, + } + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + cache.Set("key2", "value2") + + _ = cache.Get("key1") // hit + // "key2" deleted by LRU eviction + cache.Set("key3", "value3") + + val1 := cache.Get("key1") // hit + require.Equal(t, "value1", val1) + val3 := cache.Get("key3") // hit + require.Equal(t, "value3", val3) + val2 := cache.Get("key2") // miss + require.Equal(t, "", val2) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 2 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByGet(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetQueryableShardCacheSize: 10, + ParquetQueryableShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: time.Minute, + } + + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + val = cache.Get("key1") + require.Equal(t, "", val) + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_queryable_cache_misses_total counter + cortex_parquet_queryable_cache_misses_total{name="test"} 1 + `))) +} + +func Test_Cache_TTLEvictionByLoop(t *testing.T) { + reg := prometheus.NewRegistry() + cfg := &CacheConfig{ + ParquetQueryableShardCacheSize: 10, + ParquetQueryableShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: 100 * time.Millisecond, + } + + cache, err := NewCache[string](cfg, "test", reg) + require.NoError(t, err) + defer cache.Close() + + cache.Set("key1", "value1") + + val := cache.Get("key1") + require.Equal(t, "value1", val) + + // sleep longer than TTL + time.Sleep(150 * time.Millisecond) + + if c, ok := cache.(*Cache[string]); ok { + // should delete by maintenance loop + _, ok := c.cache.Peek("key1") + require.False(t, ok) + } + + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_queryable_cache_evictions_total counter + cortex_parquet_queryable_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_queryable_cache_hits_total counter + cortex_parquet_queryable_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_queryable_cache_item_count gauge + cortex_parquet_queryable_cache_item_count{name="test"} 0 + `))) +} From d84092a32bd67673e036fc96378fb8122ddff2c5 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 11 Dec 2025 15:27:16 +0900 Subject: [PATCH 2/4] delete queryable Signed-off-by: SungJin1212 --- docs/blocks-storage/querier.md | 15 ++-- docs/configuration/config-file-reference.md | 13 ++-- pkg/querier/parquet_queryable_test.go | 2 +- pkg/util/parquetutil/cache.go | 26 +++---- pkg/util/parquetutil/cache_test.go | 84 ++++++++++----------- schemas/cortex-config-schema.json | 12 +-- 6 files changed, 75 insertions(+), 77 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 614713f0196..a6e0a29e67f 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -289,14 +289,13 @@ querier: # CLI flag: -querier.enable-parquet-queryable [enable_parquet_queryable: | default = false] - # [Experimental] Maximum size of the Parquet queryable shard cache. 0 to - # disable. - # CLI flag: -querier.parquet-queryable-shard-cache-size - [parquet_queryable_shard_cache_size: | default = 512] - - # [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. - # CLI flag: -querier.parquet-queryable-shard-cache-ttl - [parquet_queryable_shard_cache_ttl: | default = 24h] + # [Experimental] Maximum size of the Parquet shard cache. 0 to disable. + # CLI flag: -querier.parquet-shard-cache-size + [parquet_shard_cache_size: | default = 512] + + # [Experimental] TTL of the Parquet shard cache. 0 to no TTL. + # CLI flag: -querier.parquet-shard-cache-ttl + [parquet_shard_cache_ttl: | default = 24h] # [Experimental] Parquet queryable's default block store to query. Valid # options are tsdb and parquet. If it is set to tsdb, parquet queryable always diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4109092cd6c..75257b54541 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4807,14 +4807,13 @@ thanos_engine: # CLI flag: -querier.enable-parquet-queryable [enable_parquet_queryable: | default = false] -# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to -# disable. -# CLI flag: -querier.parquet-queryable-shard-cache-size -[parquet_queryable_shard_cache_size: | default = 512] +# [Experimental] Maximum size of the Parquet shard cache. 0 to disable. +# CLI flag: -querier.parquet-shard-cache-size +[parquet_shard_cache_size: | default = 512] -# [Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL. -# CLI flag: -querier.parquet-queryable-shard-cache-ttl -[parquet_queryable_shard_cache_ttl: | default = 24h] +# [Experimental] TTL of the Parquet shard cache. 0 to no TTL. +# CLI flag: -querier.parquet-shard-cache-ttl +[parquet_shard_cache_ttl: | default = 24h] # [Experimental] Parquet queryable's default block store to query. Valid options # are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 82bc29f80a0..70c2a5cdacd 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -402,7 +402,7 @@ func TestParquetQueryable_Limits(t *testing.T) { StoreGatewayQueryStatsEnabled: false, StoreGatewayConsistencyCheckMaxAttempts: 3, ParquetShardCache: parquetutil.CacheConfig{ - ParquetQueryableShardCacheSize: 100, + ParquetShardCacheSize: 100, }, ParquetQueryableDefaultBlockStore: "parquet", } diff --git a/pkg/util/parquetutil/cache.go b/pkg/util/parquetutil/cache.go index d9a4fc67ecb..952e84bea3b 100644 --- a/pkg/util/parquetutil/cache.go +++ b/pkg/util/parquetutil/cache.go @@ -29,19 +29,19 @@ type cacheMetrics struct { func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics { return &cacheMetrics{ hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_hits_total", + Name: "cortex_parquet_cache_hits_total", Help: "Total number of parquet cache hits", }, []string{"name"}), misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_misses_total", + Name: "cortex_parquet_cache_misses_total", Help: "Total number of parquet cache misses", }, []string{"name"}), evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_parquet_queryable_cache_evictions_total", + Name: "cortex_parquet_cache_evictions_total", Help: "Total number of parquet cache evictions", }, []string{"name"}), size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_parquet_queryable_cache_item_count", + Name: "cortex_parquet_cache_item_count", Help: "Current number of cached parquet items", }, []string{"name"}), } @@ -61,23 +61,23 @@ type Cache[T any] struct { } type CacheConfig struct { - ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"` - ParquetQueryableShardCacheTTL time.Duration `yaml:"parquet_queryable_shard_cache_ttl"` - MaintenanceInterval time.Duration `yaml:"-"` + ParquetShardCacheSize int `yaml:"parquet_shard_cache_size"` + ParquetShardCacheTTL time.Duration `yaml:"parquet_shard_cache_ttl"` + MaintenanceInterval time.Duration `yaml:"-"` } func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.IntVar(&cfg.ParquetQueryableShardCacheSize, prefix+"parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.") - f.DurationVar(&cfg.ParquetQueryableShardCacheTTL, prefix+"parquet-queryable-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.") + f.IntVar(&cfg.ParquetShardCacheSize, prefix+"parquet-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.") + f.DurationVar(&cfg.ParquetShardCacheTTL, prefix+"parquet-shard-cache-ttl", 24*time.Hour, "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.") cfg.MaintenanceInterval = defaultMaintenanceInterval } func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { - if cfg.ParquetQueryableShardCacheSize <= 0 { + if cfg.ParquetShardCacheSize <= 0 { return &noopCache[T]{}, nil } metrics := newCacheMetrics(reg) - cache, err := lru.NewWithEvict(cfg.ParquetQueryableShardCacheSize, func(key string, value *cacheEntry[T]) { + cache, err := lru.NewWithEvict(cfg.ParquetShardCacheSize, func(key string, value *cacheEntry[T]) { metrics.evictions.WithLabelValues(name).Inc() metrics.size.WithLabelValues(name).Dec() }) @@ -89,11 +89,11 @@ func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) ( cache: cache, name: name, metrics: metrics, - ttl: cfg.ParquetQueryableShardCacheTTL, + ttl: cfg.ParquetShardCacheTTL, stopCh: make(chan struct{}), } - if cfg.ParquetQueryableShardCacheTTL > 0 { + if cfg.ParquetShardCacheTTL > 0 { go c.maintenanceLoop(cfg.MaintenanceInterval) } diff --git a/pkg/util/parquetutil/cache_test.go b/pkg/util/parquetutil/cache_test.go index c9ad1f48ec5..b71b77207bf 100644 --- a/pkg/util/parquetutil/cache_test.go +++ b/pkg/util/parquetutil/cache_test.go @@ -13,9 +13,9 @@ import ( func Test_Cache_LRUEviction(t *testing.T) { reg := prometheus.NewRegistry() cfg := &CacheConfig{ - ParquetQueryableShardCacheSize: 2, - ParquetQueryableShardCacheTTL: 0, - MaintenanceInterval: time.Minute, + ParquetShardCacheSize: 2, + ParquetShardCacheTTL: 0, + MaintenanceInterval: time.Minute, } cache, err := NewCache[string](cfg, "test", reg) require.NoError(t, err) @@ -36,27 +36,27 @@ func Test_Cache_LRUEviction(t *testing.T) { require.Equal(t, "", val2) require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 3 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 2 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 3 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 2 + # HELP cortex_parquet_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_cache_misses_total counter + cortex_parquet_cache_misses_total{name="test"} 1 `))) } func Test_Cache_TTLEvictionByGet(t *testing.T) { reg := prometheus.NewRegistry() cfg := &CacheConfig{ - ParquetQueryableShardCacheSize: 10, - ParquetQueryableShardCacheTTL: 100 * time.Millisecond, - MaintenanceInterval: time.Minute, + ParquetShardCacheSize: 10, + ParquetShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: time.Minute, } cache, err := NewCache[string](cfg, "test", reg) @@ -75,27 +75,27 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { require.Equal(t, "", val) require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 - # HELP cortex_parquet_queryable_cache_misses_total Total number of parquet cache misses - # TYPE cortex_parquet_queryable_cache_misses_total counter - cortex_parquet_queryable_cache_misses_total{name="test"} 1 + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 0 + # HELP cortex_parquet_cache_misses_total Total number of parquet cache misses + # TYPE cortex_parquet_cache_misses_total counter + cortex_parquet_cache_misses_total{name="test"} 1 `))) } func Test_Cache_TTLEvictionByLoop(t *testing.T) { reg := prometheus.NewRegistry() cfg := &CacheConfig{ - ParquetQueryableShardCacheSize: 10, - ParquetQueryableShardCacheTTL: 100 * time.Millisecond, - MaintenanceInterval: 100 * time.Millisecond, + ParquetShardCacheSize: 10, + ParquetShardCacheTTL: 100 * time.Millisecond, + MaintenanceInterval: 100 * time.Millisecond, } cache, err := NewCache[string](cfg, "test", reg) @@ -117,14 +117,14 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { } require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_parquet_queryable_cache_evictions_total Total number of parquet cache evictions - # TYPE cortex_parquet_queryable_cache_evictions_total counter - cortex_parquet_queryable_cache_evictions_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_hits_total Total number of parquet cache hits - # TYPE cortex_parquet_queryable_cache_hits_total counter - cortex_parquet_queryable_cache_hits_total{name="test"} 1 - # HELP cortex_parquet_queryable_cache_item_count Current number of cached parquet items - # TYPE cortex_parquet_queryable_cache_item_count gauge - cortex_parquet_queryable_cache_item_count{name="test"} 0 + # HELP cortex_parquet_cache_evictions_total Total number of parquet cache evictions + # TYPE cortex_parquet_cache_evictions_total counter + cortex_parquet_cache_evictions_total{name="test"} 1 + # HELP cortex_parquet_cache_hits_total Total number of parquet cache hits + # TYPE cortex_parquet_cache_hits_total counter + cortex_parquet_cache_hits_total{name="test"} 1 + # HELP cortex_parquet_cache_item_count Current number of cached parquet items + # TYPE cortex_parquet_cache_item_count gauge + cortex_parquet_cache_item_count{name="test"} 0 `))) } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 66fbc2d76db..30efc2eb96c 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5840,17 +5840,17 @@ "type": "boolean", "x-cli-flag": "querier.parquet-queryable-fallback-disabled" }, - "parquet_queryable_shard_cache_size": { + "parquet_shard_cache_size": { "default": 512, - "description": "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.", + "description": "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.", "type": "number", - "x-cli-flag": "querier.parquet-queryable-shard-cache-size" + "x-cli-flag": "querier.parquet-shard-cache-size" }, - "parquet_queryable_shard_cache_ttl": { + "parquet_shard_cache_ttl": { "default": "24h0m0s", - "description": "[Experimental] TTL of the Parquet queryable shard cache. 0 to no TTL.", + "description": "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.", "type": "string", - "x-cli-flag": "querier.parquet-queryable-shard-cache-ttl", + "x-cli-flag": "querier.parquet-shard-cache-ttl", "x-format": "duration" }, "per_step_stats_enabled": { From 51b5c1d7ccde082d1bc68139b5db1fd5ac31cbb3 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 16 Dec 2025 16:11:34 +0900 Subject: [PATCH 3/4] add changelog Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d59fc2f6f7..5ad6054e5be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [CHANGE] Querier: Renamed `cortex_parquet_queryable_cache_*` metrics to `cortex_parquet_cache_*`. #7146 * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125 From 1e041e78dbb36d37961ffbf90bb1c965b4e0de79 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 16 Dec 2025 16:38:02 +0900 Subject: [PATCH 4/4] Rename parquet shard cache funcs Signed-off-by: SungJin1212 --- pkg/querier/parquet_queryable.go | 2 +- pkg/util/parquetutil/{cache.go => shard_cache.go} | 14 +++++++------- .../{cache_test.go => shard_cache_test.go} | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) rename pkg/util/parquetutil/{cache.go => shard_cache.go} (90%) rename pkg/util/parquetutil/{cache_test.go => shard_cache_test.go} (94%) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 77f578225cf..b821dced7ee 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -134,7 +134,7 @@ func NewParquetQueryable( return nil, err } - cache, err := parquetutil.NewCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) + cache, err := parquetutil.NewParquetShardCache[parquet_storage.ParquetShard](&config.ParquetShardCache, "parquet-shards", extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) if err != nil { return nil, err } diff --git a/pkg/util/parquetutil/cache.go b/pkg/util/parquetutil/shard_cache.go similarity index 90% rename from pkg/util/parquetutil/cache.go rename to pkg/util/parquetutil/shard_cache.go index 952e84bea3b..004a35c9eaf 100644 --- a/pkg/util/parquetutil/cache.go +++ b/pkg/util/parquetutil/shard_cache.go @@ -52,7 +52,7 @@ type cacheEntry[T any] struct { expiresAt time.Time } -type Cache[T any] struct { +type ParquetShardCache[T any] struct { cache *lru.Cache[string, *cacheEntry[T]] name string metrics *cacheMetrics @@ -72,7 +72,7 @@ func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) cfg.MaintenanceInterval = defaultMaintenanceInterval } -func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { +func NewParquetShardCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) (CacheInterface[T], error) { if cfg.ParquetShardCacheSize <= 0 { return &noopCache[T]{}, nil } @@ -85,7 +85,7 @@ func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) ( return nil, err } - c := &Cache[T]{ + c := &ParquetShardCache[T]{ cache: cache, name: name, metrics: metrics, @@ -100,7 +100,7 @@ func NewCache[T any](cfg *CacheConfig, name string, reg prometheus.Registerer) ( return c, nil } -func (c *Cache[T]) maintenanceLoop(interval time.Duration) { +func (c *ParquetShardCache[T]) maintenanceLoop(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -123,7 +123,7 @@ func (c *Cache[T]) maintenanceLoop(interval time.Duration) { } } -func (c *Cache[T]) Get(path string) (r T) { +func (c *ParquetShardCache[T]) Get(path string) (r T) { if entry, ok := c.cache.Get(path); ok { isExpired := !entry.expiresAt.IsZero() && time.Now().After(entry.expiresAt) @@ -140,7 +140,7 @@ func (c *Cache[T]) Get(path string) (r T) { return } -func (c *Cache[T]) Set(path string, reader T) { +func (c *ParquetShardCache[T]) Set(path string, reader T) { if !c.cache.Contains(path) { c.metrics.size.WithLabelValues(c.name).Inc() } @@ -158,7 +158,7 @@ func (c *Cache[T]) Set(path string, reader T) { c.cache.Add(path, entry) } -func (c *Cache[T]) Close() { +func (c *ParquetShardCache[T]) Close() { close(c.stopCh) } diff --git a/pkg/util/parquetutil/cache_test.go b/pkg/util/parquetutil/shard_cache_test.go similarity index 94% rename from pkg/util/parquetutil/cache_test.go rename to pkg/util/parquetutil/shard_cache_test.go index b71b77207bf..d8b4e8c531d 100644 --- a/pkg/util/parquetutil/cache_test.go +++ b/pkg/util/parquetutil/shard_cache_test.go @@ -17,7 +17,7 @@ func Test_Cache_LRUEviction(t *testing.T) { ParquetShardCacheTTL: 0, MaintenanceInterval: time.Minute, } - cache, err := NewCache[string](cfg, "test", reg) + cache, err := NewParquetShardCache[string](cfg, "test", reg) require.NoError(t, err) defer cache.Close() @@ -59,7 +59,7 @@ func Test_Cache_TTLEvictionByGet(t *testing.T) { MaintenanceInterval: time.Minute, } - cache, err := NewCache[string](cfg, "test", reg) + cache, err := NewParquetShardCache[string](cfg, "test", reg) require.NoError(t, err) defer cache.Close() @@ -98,7 +98,7 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { MaintenanceInterval: 100 * time.Millisecond, } - cache, err := NewCache[string](cfg, "test", reg) + cache, err := NewParquetShardCache[string](cfg, "test", reg) require.NoError(t, err) defer cache.Close() @@ -110,7 +110,7 @@ func Test_Cache_TTLEvictionByLoop(t *testing.T) { // sleep longer than TTL time.Sleep(150 * time.Millisecond) - if c, ok := cache.(*Cache[string]); ok { + if c, ok := cache.(*ParquetShardCache[string]); ok { // should delete by maintenance loop _, ok := c.cache.Peek("key1") require.False(t, ok)