diff --git a/integration/parquet_gateway_test.go b/integration/parquet_gateway_test.go new file mode 100644 index 0000000000..2711cf46d1 --- /dev/null +++ b/integration/parquet_gateway_test.go @@ -0,0 +1,646 @@ +//go:build integration_parquet_gateway +// +build integration_parquet_gateway + +package integration + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/log" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestParquetGatewayWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { + tests := map[string]struct { + blocksShardingStrategy string // Empty means sharding is disabled. + tenantShardSize int + indexCacheBackend string + chunkCacheBackend string + bucketIndexEnabled bool + }{ + "blocks sharding disabled, memcached index cache": { + blocksShardingStrategy: "", + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + chunkCacheBackend: tsdb.CacheBackendMemcached, + }, + "blocks sharding disabled, multilevel index cache (inmemory, memcached)": { + blocksShardingStrategy: "", + indexCacheBackend: fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendMemcached), + chunkCacheBackend: tsdb.CacheBackendMemcached, + }, + "blocks sharding disabled, redis index cache": { + blocksShardingStrategy: "", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendRedis, + }, + "blocks sharding disabled, multilevel index cache (inmemory, redis)": { + blocksShardingStrategy: "", + indexCacheBackend: fmt.Sprintf("%v,%v", tsdb.IndexCacheBackendInMemory, tsdb.IndexCacheBackendRedis), + chunkCacheBackend: tsdb.CacheBackendRedis, + }, + "blocks default sharding, inmemory index cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendInMemory, + }, + "blocks default sharding, memcached index cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + chunkCacheBackend: tsdb.CacheBackendMemcached, + }, + "blocks shuffle sharding, memcached index cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + chunkCacheBackend: tsdb.CacheBackendMemcached, + }, + "blocks default sharding, inmemory index cache, bucket index enabled": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendInMemory, + bucketIndexEnabled: true, + }, + "blocks shuffle sharding, memcached index cache, bucket index enabled": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendInMemory, + bucketIndexEnabled: true, + }, + "blocks default sharding, redis index cache, bucket index enabled": { + blocksShardingStrategy: "default", + 
indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendRedis, + bucketIndexEnabled: true, + }, + "blocks shuffle sharding, redis index cache, bucket index enabled": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendRedis, + bucketIndexEnabled: true, + }, + "blocks sharding disabled, in-memory chunk cache": { + blocksShardingStrategy: "", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, + "blocks default sharding, in-memory chunk cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, + "blocks shuffle sharding, in-memory chunk cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, + "block sharding disabled, multi-level chunk cache": { + blocksShardingStrategy: "", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis), + bucketIndexEnabled: true, + }, + "block default sharding, multi-level chunk cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis), + bucketIndexEnabled: true, + }, + "block shuffle sharding, multi-level chunk cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis), + bucketIndexEnabled: true, + }, + } + + for testName, testCfg := range tests { + t.Run(testName, func(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + numberOfCacheBackends := len(strings.Split(testCfg.indexCacheBackend, ",")) + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. 
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend, + "-blocks-storage.bucket-store.chunks-cache.backend": testCfg.chunkCacheBackend, + "-store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingStrategy != ""), + "-store-gateway.sharding-strategy": testCfg.blocksShardingStrategy, + "-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize), + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled), + "-blocks-storage.bucket-store.bucket-store-type": "parquet", + // Enable parquet converter + "-parquet-converter.enabled": "true", + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.ring.consul.hostname": "consul:8500", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + memcached := e2ecache.NewMemcached() + redis := e2ecache.NewRedis() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached, redis)) + + // Add the cache address to the flags. + if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendMemcached) { + flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + } + if strings.Contains(testCfg.indexCacheBackend, tsdb.IndexCacheBackendRedis) { + flags["-blocks-storage.bucket-store.index-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort) + } + if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendMemcached) { + flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + } + if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) { + flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort) + } + + // Start Cortex components. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + storeGateway1 := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + storeGateway2 := e2ecortex.NewStoreGateway("store-gateway-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + storeGateways := e2ecortex.NewCompositeCortexService(storeGateway1, storeGateway2) + require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway1, storeGateway2)) + + // Start the querier with configuring store-gateway addresses if sharding is disabled. 
+ if testCfg.blocksShardingStrategy == "" { + flags = mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": strings.Join([]string{storeGateway1.NetworkGRPCEndpoint(), storeGateway2.NetworkGRPCEndpoint()}, ","), + }) + } + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(querier)) + + // Wait until both the distributor and querier have updated the ring. The querier will also watch + // the store-gateway ring if blocks sharding is enabled. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + if testCfg.blocksShardingStrategy != "" { + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(512+(512*storeGateways.NumInstances()))), "cortex_ring_tokens_total")) + } else { + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + } + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Prepare test data similar to parquet_querier_test.go + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + numSeries := 10 + numSamples := 60 + lbls := make([]labels.Labels, 0, numSeries*2) + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + now := time.Now() + start := now.Add(-time.Hour * 24) + end := now.Add(-time.Hour) + + for i := 0; i < numSeries; i++ { + lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "series", strconv.Itoa(i%3), "status_code", statusCodes[i%5])) + lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "series", strconv.Itoa((i+1)%3), "status_code", statusCodes[(i+1)%5])) + } + + // Create a block with test data + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + + // Upload the block to storage + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait until we convert the blocks to parquet + cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { + found := false + foundBucketIndex := false + + err := bkt.Iter(context.Background(), "", func(name string) error { + if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { + found = true + } + if name == "bucket-index.json.gz" { + foundBucketIndex = true + } + return nil + }, objstore.WithRecursiveIter()) + require.NoError(t, err) + return found && foundBucketIndex + }) + + // Push some series to Cortex for real-time data + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, 
res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2nd series is in the head. + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) + + // Push another series to further compact another block and delete the first block + // due to expired retention. + series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2) + series3, expectedVector3 := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"}) + + res, err = c.Push(series3) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total")) + + if testCfg.bucketIndexEnabled { + // Start the compactor to have the bucket index created before querying. + compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(compactor)) + } else { + // Wait until the querier has discovered the uploaded blocks. + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics)) + } + + // Wait until the store-gateway has synched the new uploaded blocks. When sharding is enabled + // we don't known which store-gateway instance will synch the blocks, so we need to wait on + // metrics extracted from all instances. + if testCfg.blocksShardingStrategy != "" { + // If shuffle sharding is enabled and we have tenant shard size set to 1, + // then the metric only appears in one store gateway instance. + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_bucket_store_blocks_loaded"}, e2e.SkipMissingMetrics)) + } else { + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Equals(float64(2*storeGateways.NumInstances())), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics)) + } + + // Check how many tenants have been discovered and synced by store-gateways. + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_discovered")) + if testCfg.blocksShardingStrategy == "shuffle-sharding" { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_bucket_stores_tenants_synced")) + } else { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_synced")) + } + + // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). 
+ result, err := c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + result, err = c.Query("series_2", series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector2, result.(model.Vector)) + + result, err = c.Query("series_3", series3Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector3, result.(model.Vector)) + + // Query the pre-uploaded test data + result, err = c.Query("test_series_a", now.Add(-time.Hour)) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + // Should have some results from the pre-uploaded data + assert.Greater(t, len(result.(model.Vector)), 0) + + // Check the in-memory index cache metrics (in the store-gateway). + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total")) + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty + + // Query back again the 1st series from storage. This time it should use the index cache. + result, err = c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + if numberOfCacheBackends > 1 { + // 6 requests for Expanded Postings, 5 for Postings and 3 for Series. + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Equals(float64(6+5+3)), []string{"thanos_store_index_cache_requests_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "level", "L0"), + ))) + // In case of L0 cache hits, store gateway might send fewer requests. Should be within range 12 ~ 14. + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.EqualsAmong(float64(12), float64(14)), []string{"thanos_store_index_cache_requests_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "level", "L1"), + ))) + l1IndexCacheRequests, err := storeGateways.SumMetrics([]string{"thanos_store_index_cache_requests_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "level", "L1"), + )) + require.NoError(t, err) + l0IndexCacheHits, err := storeGateways.SumMetrics([]string{"thanos_store_index_cache_hits_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "level", "L0"), + )) + require.NoError(t, err) + // Make sure l1 cache requests + l0 cache hits is 14. + require.Equal(t, float64(14), l1IndexCacheRequests[0]+l0IndexCacheHits[0]) + } else { + // 6 requests for Expanded Postings, 5 for Postings and 3 for Series. + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(6+5+3)), "thanos_store_index_cache_requests_total")) + } + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache + + // Query metadata. + testMetadataQueriesWithBlocksStorage(t, c, series1[0], series2[0], series3[0], blockRangePeriod) + + // Ensure no service-specific metrics prefix is used by the wrong service. 
+ assertServiceMetricsPrefixes(t, Distributor, distributor) + assertServiceMetricsPrefixes(t, Ingester, ingester) + assertServiceMetricsPrefixes(t, Querier, querier) + assertServiceMetricsPrefixes(t, StoreGateway, storeGateway1) + assertServiceMetricsPrefixes(t, StoreGateway, storeGateway2) + + // Verify that parquet bucket stores are being used + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_bucket_stores_cache_hits_total"}, e2e.SkipMissingMetrics)) + }) + } +} + +func TestParquetGatewayWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { + tests := map[string]struct { + blocksShardingEnabled bool + indexCacheBackend string + bucketIndexEnabled bool + }{ + "blocks sharding enabled, inmemory index cache": { + blocksShardingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendInMemory, + }, + "blocks sharding disabled, memcached index cache": { + blocksShardingEnabled: false, + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + }, + "blocks sharding enabled, memcached index cache": { + blocksShardingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + }, + "blocks sharding enabled, memcached index cache, bucket index enabled": { + blocksShardingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + bucketIndexEnabled: true, + }, + "blocks sharding disabled,redis index cache": { + blocksShardingEnabled: false, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + }, + "blocks sharding enabled, redis index cache": { + blocksShardingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + }, + "blocks sharding enabled, redis index cache, bucket index enabled": { + blocksShardingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + bucketIndexEnabled: true, + }, + } + + for testName, testCfg := range tests { + t.Run(testName, func(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName) + memcached := e2ecache.NewMemcached() + redis := e2ecache.NewRedis() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached, redis)) + + // Setting the replication factor equal to the number of Cortex replicas + // make sure each replica creates the same blocks, so the total number of + // blocks is stable and easy to assert on. + const seriesReplicationFactor = 2 + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags( + BlocksStorageFlags(), + AlertmanagerLocalFlags(), + map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend, + "-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled), + "-blocks-storage.bucket-store.bucket-store-type": "parquet", + "-querier.query-store-for-labels-enabled": "true", + // Enable parquet converter + "-parquet-converter.enabled": "true", + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.ring.consul.hostname": "consul:8500", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. 
+ "-distributor.replication-factor": strconv.FormatInt(seriesReplicationFactor, 10), + // Store-gateway. + "-store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingEnabled), + "-store-gateway.sharding-ring.store": "consul", + "-store-gateway.sharding-ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-store-gateway.sharding-ring.replication-factor": "1", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml))) + + // Add the cache address to the flags. + switch testCfg.indexCacheBackend { + case tsdb.IndexCacheBackendMemcached: + flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + case tsdb.IndexCacheBackendRedis: + flags["-blocks-storage.bucket-store.index-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort) + } + + // Start Cortex replicas. + cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags, "") + cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags, "") + cluster := e2ecortex.NewCompositeCortexService(cortex1, cortex2) + require.NoError(t, s.StartAndWaitReady(cortex1, cortex2)) + + // Wait until Cortex replicas have updated the ring state. + for _, replica := range cluster.Instances() { + numTokensPerInstance := 512 // Ingesters ring. + if testCfg.blocksShardingEnabled { + numTokensPerInstance += 512 * 2 // Store-gateway ring (read both by the querier and store-gateway). + } + + require.NoError(t, replica.WaitSumMetrics(e2e.Equals(float64(numTokensPerInstance*cluster.NumInstances())), "cortex_ring_tokens_total")) + } + + c, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2nd series is in the head. + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_memory_series")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_created_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total")) + + // Push another series to further compact another block and delete the first block + // due to expired retention. 
+ series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2) + series3, expectedVector3 := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"}) + + res, err = c.Push(series3) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_memory_series")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(3*cluster.NumInstances())), "cortex_ingester_memory_series_created_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total")) + + if testCfg.bucketIndexEnabled { + // Start the compactor to have the bucket index created before querying. We need to run the compactor + // as a separate service because it's currently not part of the single binary. + compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(compactor)) + } else { + // Wait until the querier has discovered the uploaded blocks (discovered both by the querier and store-gateway). + require.NoError(t, cluster.WaitSumMetricsWithOptions(e2e.Equals(float64(2*cluster.NumInstances()*2)), []string{"cortex_blocks_meta_synced"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "component", "querier")))) + } + + // Wait until the store-gateway has synched the new uploaded blocks. The number of blocks loaded + // may be greater than expected if the compactor is running (there may have been compacted). + const shippedBlocks = 2 + if testCfg.blocksShardingEnabled { + require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor)), "cortex_bucket_store_blocks_loaded")) + } else { + require.NoError(t, cluster.WaitSumMetrics(e2e.GreaterOrEqual(float64(shippedBlocks*seriesReplicationFactor*cluster.NumInstances())), "cortex_bucket_store_blocks_loaded")) + } + + // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). + result, err := c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + result, err = c.Query("series_2", series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector2, result.(model.Vector)) + + result, err = c.Query("series_3", series3Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector3, result.(model.Vector)) + + // Check the in-memory index cache metrics (in the store-gateway). + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((5+5+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) // 5 for expanded postings and postings, 2 for series + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty + + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(21*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 14 gets + 7 sets + } + + // Query back again the 1st series from storage. This time it should use the index cache. 
+ result, err = c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((12+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache + + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((21+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets + } + + // Query metadata. + testMetadataQueriesWithBlocksStorage(t, c, series1[0], series2[0], series3[0], blockRangePeriod) + + // Verify that parquet bucket stores are being used + require.NoError(t, cluster.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_bucket_stores_cache_hits_total"}, e2e.SkipMissingMetrics)) + }) + } +} + +func getMetricName(lbls []prompb.Label) string { + for _, lbl := range lbls { + if lbl.Name == labels.MetricName { + return lbl.Value + } + } + + panic(fmt.Sprintf("series %v has no metric name", lbls)) +} + +func prompbLabelsToModelMetric(pbLabels []prompb.Label) model.Metric { + metric := model.Metric{} + + for _, l := range pbLabels { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + + return metric +} diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 6cac224319..fce486be3b 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -86,6 +86,7 @@ func TestCortex(t *testing.T) { IndexCache: tsdb.IndexCacheConfig{ Backend: tsdb.IndexCacheBackendInMemory, }, + BucketStoreType: string(tsdb.TSDBBucketStore), }, UsersScanner: tsdb.UsersScannerConfig{ Strategy: tsdb.UserScanStrategyList, diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index cc1be08b13..4e9c46c50b 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -61,6 +61,7 @@ var ( ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode") ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0") + ErrInvalidBucketStoreType = errors.New("invalid bucket store type") ) // BlocksStorageConfig holds the config information for the blocks storage. @@ -292,6 +293,7 @@ type BucketStoreConfig struct { IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` BucketIndex BucketIndexConfig `yaml:"bucket_index"` BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` + BucketStoreType string `yaml:"bucket_store_type"` // Chunk pool. MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -378,6 +380,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.LazyExpandedPostingGroupMaxKeySeriesRatio, "blocks-storage.bucket-store.lazy-expanded-posting-group-max-key-series-ratio", 100, "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 
0 disables the limit.") f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.") f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.") + f.StringVar(&cfg.BucketStoreType, "blocks-storage.bucket-store.bucket-store-type", "tsdb", "Type of bucket store to use (tsdb or parquet).") f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", "))) f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size") f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size") @@ -415,6 +418,9 @@ func (cfg *BucketStoreConfig) Validate() error { if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) { return ErrInvalidTokenBucketBytesLimiterMode } + if !util.StringsContain(supportedBucketStoreTypes, cfg.BucketStoreType) { + return ErrInvalidBucketStoreType + } if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 { return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio } @@ -450,6 +456,19 @@ var supportedBlockDiscoveryStrategies = []string{ string(BucketIndexDiscovery), } +// BucketStoreType represents the type of bucket store +type BucketStoreType string + +const ( + TSDBBucketStore BucketStoreType = "tsdb" + ParquetBucketStore BucketStoreType = "parquet" +) + +var supportedBucketStoreTypes = []string{ + string(TSDBBucketStore), + string(ParquetBucketStore), +} + type TokenBucketBytesLimiterMode string const ( diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index b9da057ae2..644c68583f 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -44,8 +44,15 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -// BucketStores is a multi-tenant wrapper of Thanos BucketStore. -type BucketStores struct { +// BucketStores defines the methods that any bucket stores implementation must provide +type BucketStores interface { + storepb.StoreServer + SyncBlocks(ctx context.Context) error + InitialSync(ctx context.Context) error +} + +// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore. 
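The bucket-store-type value added above is checked in `Validate()` against `supportedBucketStoreTypes`, so a typo fails at config load rather than at query time. A hedged sketch of that behaviour (the test below is illustrative and not part of this change):

```go
// Hypothetical test illustrating the new bucket-store-type validation in
// pkg/storage/tsdb/config.go. BucketStoreConfig, RegisterFlags, Validate,
// ErrInvalidBucketStoreType and the tsdb/parquet values come from this PR;
// the test itself is not part of the diff.
package tsdb_test

import (
	"flag"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestBucketStoreTypeValidation(t *testing.T) {
	cfg := tsdb.BucketStoreConfig{}
	// Registering flags populates the defaults, including bucket-store-type=tsdb.
	cfg.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
	require.NoError(t, cfg.Validate())

	// The parquet bucket store type is accepted...
	cfg.BucketStoreType = string(tsdb.ParquetBucketStore)
	require.NoError(t, cfg.Validate())

	// ...while unknown values are rejected.
	cfg.BucketStoreType = "columnar"
	require.ErrorIs(t, cfg.Validate(), tsdb.ErrInvalidBucketStoreType)
}
```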
+type ThanosBucketStores struct { logger log.Logger cfg tsdb.BlocksStorageConfig limits *validation.Overrides @@ -74,7 +81,7 @@ type BucketStores struct { storesMu sync.RWMutex stores map[string]*store.BucketStore - // Keeps the last sync error for the bucket store for each tenant. + // Keeps the last sync error for the bucket store for each tenant. storesErrorsMu sync.RWMutex storesErrors map[string]error @@ -86,8 +93,7 @@ type BucketStores struct { userScanner users.Scanner // Keeps number of inflight requests - inflightRequestCnt int - inflightRequestMu sync.RWMutex + inflightRequests *util.InflightRequestTracker // Metrics. syncTimes prometheus.Histogram @@ -99,7 +105,19 @@ type BucketStores struct { var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") // NewBucketStores makes a new BucketStores. -func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) { + switch cfg.BucketStore.BucketStoreType { + case string(tsdb.ParquetBucketStore): + return newParquetBucketStores(cfg, bucketClient, limits, logger, reg) + case string(tsdb.TSDBBucketStore): + return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg) + default: + return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType) + } +} + +// newThanosBucketStores creates a new TSDB-based bucket stores +func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) { matchers := tsdb.NewMatchers() cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg) if err != nil { @@ -114,7 +132,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra Help: "Number of maximum concurrent queries allowed.", }).Set(float64(cfg.BucketStore.MaxConcurrent)) - u := &BucketStores{ + u := &ThanosBucketStores{ logger: logger, cfg: cfg, limits: limits, @@ -128,6 +146,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra queryGate: queryGate, partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), userTokenBuckets: make(map[string]*util.TokenBucket), + inflightRequests: util.NewInflightRequestTracker(), syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_bucket_stores_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -187,7 +206,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra } // InitialSync does an initial synchronization of blocks for all users. 
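With `NewBucketStores` returning the `BucketStores` interface and switching on `BucketStoreType`, callers such as the store-gateway never need to know which implementation they hold. A wiring sketch under that assumption (only the exported names shown in this diff are taken from the change; the wrapper function and the import paths are illustrative):

```go
// Illustrative wiring only: selecting the parquet-backed implementation through
// the new BucketStoreType field. NewBucketStores, BucketStores, ShardingStrategy
// and the tsdb constants come from this PR; everything else is assumed.
package storegatewayexample

import (
	"context"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"
	"github.com/weaveworks/common/logging" // assumed import path for logging.Level

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/cortexproject/cortex/pkg/storegateway"
	"github.com/cortexproject/cortex/pkg/util/validation"
)

func newParquetBackedStores(
	ctx context.Context,
	cfg tsdb.BlocksStorageConfig,
	strategy storegateway.ShardingStrategy,
	bkt objstore.InstrumentedBucket,
	limits *validation.Overrides,
	logLevel logging.Level,
	logger log.Logger,
	reg prometheus.Registerer,
) (storegateway.BucketStores, error) {
	// "parquet" routes NewBucketStores to the parquet implementation;
	// "tsdb" (the default) keeps the existing Thanos BucketStore path.
	cfg.BucketStore.BucketStoreType = string(tsdb.ParquetBucketStore)

	stores, err := storegateway.NewBucketStores(cfg, strategy, bkt, limits, logLevel, logger, reg)
	if err != nil {
		return nil, err
	}
	// Callers depend only on the BucketStores interface, so the same code path
	// serves both implementations.
	if err := stores.InitialSync(ctx); err != nil {
		return nil, err
	}
	return stores, nil
}
```

The flag default remains `tsdb`, so existing deployments keep the Thanos BucketStore path unless they opt in.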
-func (u *BucketStores) InitialSync(ctx context.Context) error { +func (u *ThanosBucketStores) InitialSync(ctx context.Context) error { level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users") if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error { @@ -202,13 +221,13 @@ func (u *BucketStores) InitialSync(ctx context.Context) error { } // SyncBlocks synchronizes the stores state with the Bucket store for every user. -func (u *BucketStores) SyncBlocks(ctx context.Context) error { +func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error { return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error { return s.SyncBlocks(ctx) }) } -func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { +func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { retries := backoff.New(ctx, backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 10 * time.Second, @@ -232,7 +251,7 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co return lastErr } -func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) { +func (u *ThanosBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) { defer func(start time.Time) { u.syncTimes.Observe(time.Since(start).Seconds()) if returnErr == nil { @@ -330,7 +349,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte } // Series makes a series request to the underlying user bucket store. -func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { +func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series") defer spanLog.Finish() @@ -356,12 +375,12 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests if maxInflightRequests > 0 { - if u.getInflightRequestCnt() >= maxInflightRequests { + if u.inflightRequests.Count() >= maxInflightRequests { return ErrTooManyInflightRequests } - u.incrementInflightRequestCnt() - defer u.decrementInflightRequestCnt() + u.inflightRequests.Inc() + defer u.inflightRequests.Dec() } err = store.Series(req, spanSeriesServer{ @@ -372,26 +391,8 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return err } -func (u *BucketStores) getInflightRequestCnt() int { - u.inflightRequestMu.RLock() - defer u.inflightRequestMu.RUnlock() - return u.inflightRequestCnt -} - -func (u *BucketStores) incrementInflightRequestCnt() { - u.inflightRequestMu.Lock() - u.inflightRequestCnt++ - u.inflightRequestMu.Unlock() -} - -func (u *BucketStores) decrementInflightRequestCnt() { - u.inflightRequestMu.Lock() - u.inflightRequestCnt-- - u.inflightRequestMu.Unlock() -} - // LabelNames implements the Storegateway proto service. 
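The hand-rolled inflight counter removed here is replaced by `util.InflightRequestTracker`, whose source is not part of this excerpt. A minimal sketch of a tracker exposing the `NewInflightRequestTracker`, `Count`, `Inc` and `Dec` calls used above, mirroring the removed RWMutex-guarded counter (the actual implementation in the PR may differ, e.g. it could use an atomic):

```go
// Hypothetical sketch of util.InflightRequestTracker, which is referenced by
// this PR but not shown in this diff.
package util

import "sync"

// InflightRequestTracker counts requests currently being served.
type InflightRequestTracker struct {
	mu  sync.RWMutex
	cnt int
}

// NewInflightRequestTracker returns an empty tracker.
func NewInflightRequestTracker() *InflightRequestTracker {
	return &InflightRequestTracker{}
}

// Count returns the number of inflight requests.
func (t *InflightRequestTracker) Count() int {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.cnt
}

// Inc records the start of a request.
func (t *InflightRequestTracker) Inc() {
	t.mu.Lock()
	t.cnt++
	t.mu.Unlock()
}

// Dec records the completion of a request.
func (t *InflightRequestTracker) Dec() {
	t.mu.Lock()
	t.cnt--
	t.mu.Unlock()
}
```

Both `ThanosBucketStores` and `ParquetBucketStores` enforce `MaxInflightRequests` through this shared tracker, which is why the counter moved out of the Thanos-specific struct.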
-func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { +func (u *ThanosBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") defer spanLog.Finish() @@ -421,7 +422,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe } // LabelValues implements the Storegateway proto service. -func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { +func (u *ThanosBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues") defer spanLog.Finish() @@ -450,7 +451,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues // scanUsers in the bucket and return the list of found users. It includes active and deleting users // but not deleted users. -func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { +func (u *ThanosBucketStores) scanUsers(ctx context.Context) ([]string, error) { activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx) if err != nil { return nil, err @@ -477,13 +478,13 @@ func deduplicateUsers(users []string) []string { return uniqueUsers } -func (u *BucketStores) getStore(userID string) *store.BucketStore { +func (u *ThanosBucketStores) getStore(userID string) *store.BucketStore { u.storesMu.RLock() defer u.storesMu.RUnlock() return u.stores[userID] } -func (u *BucketStores) getStoreError(userID string) error { +func (u *ThanosBucketStores) getStoreError(userID string) error { u.storesErrorsMu.RLock() defer u.storesErrorsMu.RUnlock() return u.storesErrors[userID] @@ -499,7 +500,7 @@ var ( // If bucket store doesn't exist, returns errBucketStoreNotFound. // If bucket store is not empty, returns errBucketStoreNotEmpty. // Otherwise returns error from closing the bucket store. -func (u *BucketStores) closeEmptyBucketStore(userID string) error { +func (u *ThanosBucketStores) closeEmptyBucketStore(userID string) error { u.storesMu.Lock() unlockInDefer := true defer func() { @@ -537,11 +538,11 @@ func isEmptyBucketStore(bs *store.BucketStore) bool { return min == math.MaxInt64 && max == math.MinInt64 } -func (u *BucketStores) syncDirForUser(userID string) string { +func (u *ThanosBucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } -func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) { +func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) { // Check if the store already exists. bs := u.getStore(userID) if bs != nil { @@ -721,7 +722,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro // deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current // shard. 
-func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) { +func (u *ThanosBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) { files, err := os.ReadDir(u.cfg.BucketStore.SyncDir) if err != nil { return @@ -760,13 +761,13 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str } } -func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket { +func (u *ThanosBucketStores) getUserTokenBucket(userID string) *util.TokenBucket { u.userTokenBucketsMu.RLock() defer u.userTokenBucketsMu.RUnlock() return u.userTokenBuckets[userID] } -func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 { +func (u *ThanosBucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 { tokensToRetrieve := float64(tokens) switch dataType { case store.PostingsFetched: diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 674a2bae27..3254b398cf 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -132,17 +132,19 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Should set the error on user-1 require.NoError(t, stores.InitialSync(ctx)) if tc.mockInitialSync { - s, ok := status.FromError(stores.storesErrors["user-1"]) + thanosStores := stores.(*ThanosBucketStores) + s, ok := status.FromError(thanosStores.storesErrors["user-1"]) require.True(t, ok) require.Equal(t, s.Code(), codes.PermissionDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) + require.ErrorIs(t, thanosStores.storesErrors["user-2"], nil) } require.NoError(t, stores.SyncBlocks(context.Background())) if tc.mockInitialSync { - s, ok := status.FromError(stores.storesErrors["user-1"]) + thanosStores := stores.(*ThanosBucketStores) + s, ok := status.FromError(thanosStores.storesErrors["user-1"]) require.True(t, ok) require.Equal(t, s.Code(), codes.PermissionDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) + require.ErrorIs(t, thanosStores.storesErrors["user-2"], nil) } mBucket.GetFailures = tc.GetFailures @@ -168,8 +170,9 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Cleaning the error mBucket.GetFailures = map[string]error{} require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.storesErrors["user-1"], nil) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) + thanosStores := stores.(*ThanosBucketStores) + require.ErrorIs(t, thanosStores.storesErrors["user-1"], nil) + require.ErrorIs(t, thanosStores.storesErrors["user-2"], nil) _, _, err = querySeries(stores, "user-1", "series", 0, 100) require.NoError(t, err) _, _, err = querySeries(stores, "user-2", "series", 0, 100) @@ -259,7 +262,8 @@ func TestBucketStores_InitialSync(t *testing.T) { "cortex_bucket_stores_gate_queries_in_flight", )) - assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) + thanosStores := stores.(*ThanosBucketStores) + assert.Greater(t, testutil.ToFloat64(thanosStores.syncLastSuccess), float64(0)) } func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { @@ -319,7 +323,8 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { "cortex_bucket_store_blocks_loaded", )) - assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) + thanosStores := stores.(*ThanosBucketStores) + assert.Greater(t, testutil.ToFloat64(thanosStores.syncLastSuccess), float64(0)) } func 
TestBucketStores_SyncBlocks(t *testing.T) { @@ -389,7 +394,8 @@ func TestBucketStores_SyncBlocks(t *testing.T) { "cortex_bucket_stores_gate_queries_in_flight", )) - assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) + thanosStores := stores.(*ThanosBucketStores) + assert.Greater(t, testutil.ToFloat64(thanosStores.syncLastSuccess), float64(0)) } func TestBucketStores_syncUsersBlocks(t *testing.T) { @@ -442,7 +448,8 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { // Sync user stores and count the number of times the callback is called. var storesCount atomic.Int32 - err = stores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { + thanosStores := stores.(*ThanosBucketStores) + err = thanosStores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { storesCount.Inc() return nil }) @@ -474,7 +481,7 @@ func TestBucketStores_scanUsers(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - stores := &BucketStores{ + stores := &ThanosBucketStores{ userScanner: testData.scanner, } @@ -574,9 +581,11 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) - stores.inflightRequestMu.Lock() - stores.inflightRequestCnt = 10 - stores.inflightRequestMu.Unlock() + thanosStores := stores.(*ThanosBucketStores) + // Set inflight requests to the limit + for i := 0; i < 10; i++ { + thanosStores.inflightRequests.Inc() + } series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) assert.ErrorIs(t, err, ErrTooManyInflightRequests) assert.Empty(t, series) @@ -595,9 +604,11 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) - stores.inflightRequestMu.Lock() - stores.inflightRequestCnt = 10 // max_inflight_request is set to 0 by default = disabled - stores.inflightRequestMu.Unlock() + thanosStores := stores.(*ThanosBucketStores) + // Set inflight requests to the limit (max_inflight_request is set to 0 by default = disabled) + for i := 0; i < 10; i++ { + thanosStores.inflightRequests.Inc() + } series, _, err := querySeries(stores, "user_id", "series_1", 0, 100) require.NoError(t, err) assert.Equal(t, 1, len(series)) @@ -715,7 +726,26 @@ func generateStorageBlock(t *testing.T, storageDir, userID string, metricName st require.NoError(t, db.Snapshot(userDir, true)) } -func querySeries(stores *BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) { +func querySeries(stores BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) { + req := &storepb.SeriesRequest{ + MinTime: minT, + MaxTime: maxT, + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: metricName, + }}, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + ctx := setUserIDToGRPCContext(context.Background(), userID) + srv := newBucketStoreSeriesServer(ctx) + err := stores.Series(req, srv) + + return srv.SeriesSet, srv.Warnings, err +} + +func querySeriesWithBlockIDs(stores BucketStores, userID, metricName string, minT, maxT int64, blocks []string) ([]*storepb.Series, annotations.Annotations, error) { req := &storepb.SeriesRequest{ MinTime: minT, MaxTime: maxT, @@ -734,7 +764,7 @@ func querySeries(stores 
*BucketStores, userID, metricName string, minT, maxT int return srv.SeriesSet, srv.Warnings, err } -func queryLabelsNames(stores *BucketStores, userID, metricName string, start, end int64) (*storepb.LabelNamesResponse, error) { +func queryLabelsNames(stores BucketStores, userID, metricName string, start, end int64) (*storepb.LabelNamesResponse, error) { req := &storepb.LabelNamesRequest{ Start: start, End: end, @@ -750,7 +780,7 @@ func queryLabelsNames(stores *BucketStores, userID, metricName string, start, en return stores.LabelNames(ctx, req) } -func queryLabelsValues(stores *BucketStores, userID, labelName, metricName string, start, end int64) (*storepb.LabelValuesResponse, error) { +func queryLabelsValues(stores BucketStores, userID, labelName, metricName string, start, end int64) (*storepb.LabelValuesResponse, error) { req := &storepb.LabelValuesRequest{ Start: start, End: end, @@ -910,32 +940,34 @@ func TestBucketStores_tokenBuckets(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) assert.NoError(t, err) - assert.NotNil(t, stores.instanceTokenBucket) + thanosStores := stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.instanceTokenBucket) assert.NoError(t, stores.InitialSync(ctx)) - assert.NotNil(t, stores.getUserTokenBucket("user-1")) - assert.NotNil(t, stores.getUserTokenBucket("user-2")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-1")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-2")) sharding.users = []string{user1} assert.NoError(t, stores.SyncBlocks(ctx)) - assert.NotNil(t, stores.getUserTokenBucket("user-1")) - assert.Nil(t, stores.getUserTokenBucket("user-2")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-1")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-2")) sharding.users = []string{} assert.NoError(t, stores.SyncBlocks(ctx)) - assert.Nil(t, stores.getUserTokenBucket("user-1")) - assert.Nil(t, stores.getUserTokenBucket("user-2")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-1")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-2")) cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDryRun) sharding.users = []string{user1, user2} reg = prometheus.NewPedanticRegistry() stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) assert.NoError(t, err) - assert.NotNil(t, stores.instanceTokenBucket) + thanosStores = stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.instanceTokenBucket) assert.NoError(t, stores.InitialSync(ctx)) - assert.NotNil(t, stores.getUserTokenBucket("user-1")) - assert.NotNil(t, stores.getUserTokenBucket("user-2")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-1")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-2")) cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDisabled) sharding.users = []string{user1, user2} @@ -944,9 +976,10 @@ func TestBucketStores_tokenBuckets(t *testing.T) { assert.NoError(t, err) assert.NoError(t, stores.InitialSync(ctx)) - assert.Nil(t, stores.instanceTokenBucket) - assert.Nil(t, stores.getUserTokenBucket("user-1")) - assert.Nil(t, stores.getUserTokenBucket("user-2")) + thanosStores = stores.(*ThanosBucketStores) + assert.Nil(t, thanosStores.instanceTokenBucket) + assert.Nil(t, 
thanosStores.getUserTokenBucket("user-1")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-2")) } func TestBucketStores_getTokensToRetrieve(t *testing.T) { @@ -966,12 +999,13 @@ func TestBucketStores_getTokensToRetrieve(t *testing.T) { stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) assert.NoError(t, err) - assert.Equal(t, int64(2), stores.getTokensToRetrieve(2, store.PostingsFetched)) - assert.Equal(t, int64(4), stores.getTokensToRetrieve(2, store.PostingsTouched)) - assert.Equal(t, int64(6), stores.getTokensToRetrieve(2, store.SeriesFetched)) - assert.Equal(t, int64(8), stores.getTokensToRetrieve(2, store.SeriesTouched)) - assert.Equal(t, int64(0), stores.getTokensToRetrieve(2, store.ChunksFetched)) - assert.Equal(t, int64(1), stores.getTokensToRetrieve(2, store.ChunksTouched)) + thanosStores := stores.(*ThanosBucketStores) + assert.Equal(t, int64(2), thanosStores.getTokensToRetrieve(2, store.PostingsFetched)) + assert.Equal(t, int64(4), thanosStores.getTokensToRetrieve(2, store.PostingsTouched)) + assert.Equal(t, int64(6), thanosStores.getTokensToRetrieve(2, store.SeriesFetched)) + assert.Equal(t, int64(8), thanosStores.getTokensToRetrieve(2, store.SeriesTouched)) + assert.Equal(t, int64(0), thanosStores.getTokensToRetrieve(2, store.ChunksFetched)) + assert.Equal(t, int64(1), thanosStores.getTokensToRetrieve(2, store.ChunksTouched)) } func getUsersInDir(t *testing.T, dir string) []string { diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 9e61d63abf..94f7d65ace 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -118,7 +118,7 @@ type StoreGateway struct { gatewayCfg Config storageCfg cortex_tsdb.BlocksStorageConfig logger log.Logger - stores *BucketStores + stores BucketStores // Ring used for sharding blocks. ringLifecycler *ring.BasicLifecycler diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 798d1221a2..3c9ea79f55 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -111,8 +111,8 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.DetailedMetricsEnabled, ringFlagsPrefix+"detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.") // Wait stability flags. - f.DurationVar(&cfg.WaitStabilityMinDuration, ringFlagsPrefix+"wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.") - f.DurationVar(&cfg.WaitStabilityMaxDuration, ringFlagsPrefix+"wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the store-gateway ring keeps changing after this period of time, the store-gateway will start anyway.") + f.DurationVar(&cfg.WaitStabilityMinDuration, ringFlagsPrefix+"wait-stability-min-duration", 0, "Minimum time to wait for ring stability at startup. 0 to disable.") + f.DurationVar(&cfg.WaitStabilityMaxDuration, ringFlagsPrefix+"wait-stability-max-duration", 5*time.Second, "Maximum time to wait for ring stability at startup. 
If the store-gateway ring keeps changing after this period of time, the store-gateway will start anyway.") f.DurationVar(&cfg.FinalSleep, ringFlagsPrefix+"final-sleep", 0*time.Second, "The sleep seconds when store-gateway is shutting down. Need to be close to or larger than KV Store information propagation delay") diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index b9070c236e..68f03f2060 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -185,10 +185,11 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { assert.Equal(t, RingNumTokens, len(g.ringLifecycler.GetTokens())) assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens) - assert.NotNil(t, g.stores.getStore("user-1")) - assert.NotNil(t, g.stores.getStore("user-2")) - assert.Nil(t, g.stores.getStore("user-disabled")) - assert.Nil(t, g.stores.getStore("user-unknown")) + thanosStores := g.stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.getStore("user-1")) + assert.NotNil(t, thanosStores.getStore("user-2")) + assert.Nil(t, thanosStores.getStore("user-disabled")) + assert.Nil(t, thanosStores.getStore("user-unknown")) }) } } @@ -219,10 +220,11 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) { bucketClient.MockExists(path.Join("user-disabled", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) - assert.NotNil(t, g.stores.getStore("user-1")) - assert.NotNil(t, g.stores.getStore("user-2")) - assert.Nil(t, g.stores.getStore("user-disabled")) - assert.Nil(t, g.stores.getStore("user-unknown")) + thanosStores := g.stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.getStore("user-1")) + assert.NotNil(t, thanosStores.getStore("user-2")) + assert.Nil(t, thanosStores.getStore("user-disabled")) + assert.Nil(t, thanosStores.getStore("user-unknown")) } func TestStoreGateway_InitialSyncFailure(t *testing.T) { diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go new file mode 100644 index 0000000000..61a33515fe --- /dev/null +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -0,0 +1,762 @@ +package storegateway + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gogo/protobuf/types" + "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/convert" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus-community/parquet-common/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/hintspb" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/httpgrpc" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/cortexproject/cortex/pkg/querysharding" + "github.com/cortexproject/cortex/pkg/storage/bucket" + 
"github.com/cortexproject/cortex/pkg/storage/tsdb" + cortex_util "github.com/cortexproject/cortex/pkg/util" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" + "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// ParquetBucketStores is a multi-tenant wrapper for parquet bucket stores +type ParquetBucketStores struct { + logger log.Logger + cfg tsdb.BlocksStorageConfig + limits *validation.Overrides + bucket objstore.Bucket + + storesMu sync.RWMutex + stores map[string]*parquetBucketStore + + // Keeps the last sync error for the bucket store for each tenant. + storesErrorsMu sync.RWMutex + storesErrors map[string]error + + chunksDecoder *schema.PrometheusParquetChunksDecoder + + matcherCache storecache.MatchersCache + + inflightRequests *cortex_util.InflightRequestTracker +} + +// parquetBucketStore represents a single tenant's parquet store +type parquetBucketStore struct { + logger log.Logger + bucket objstore.Bucket + limits *validation.Overrides + concurrency int + + chunksDecoder *schema.PrometheusParquetChunksDecoder + + matcherCache storecache.MatchersCache +} + +// newParquetBucketStores creates a new multi-tenant parquet bucket stores +func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*ParquetBucketStores, error) { + // Create caching bucket client for parquet bucket stores + cachingBucket, err := createCachingBucketClientForParquet(cfg, bucketClient, "parquet-storegateway", logger, reg) + if err != nil { + return nil, err + } + + u := &ParquetBucketStores{ + logger: logger, + cfg: cfg, + limits: limits, + bucket: cachingBucket, + stores: map[string]*parquetBucketStore{}, + storesErrors: map[string]error{}, + chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()), + inflightRequests: cortex_util.NewInflightRequestTracker(), + } + + if cfg.BucketStore.MatchersCacheMaxItems > 0 { + r := prometheus.NewRegistry() + reg.MustRegister(tsdb.NewMatchCacheMetrics("cortex_storegateway", r, logger)) + u.matcherCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.BucketStore.MatchersCacheMaxItems), storecache.WithPromRegistry(r)) + if err != nil { + return nil, err + } + } else { + u.matcherCache = storecache.NoopMatchersCache + } + + return u, nil +} + +// Series implements BucketStores +func (u *ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + spanLog, spanCtx := spanlogger.New(srv.Context(), "ParquetBucketStores.Series") + defer spanLog.Finish() + + userID := getUserIDFromGRPCContext(spanCtx) + if userID == "" { + return fmt.Errorf("no userID") + } + + err := u.getStoreError(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + if err != nil { + if cortex_errors.ErrorIs(err, userBkt.IsAccessDeniedErr) { + return httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + } + + return err + } + + store, err := u.getOrCreateStore(userID) + if err != nil { + return status.Error(codes.Internal, err.Error()) + } + + maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests + if maxInflightRequests > 0 { + if u.inflightRequests.Count() >= maxInflightRequests { + return ErrTooManyInflightRequests + } + + u.inflightRequests.Inc() + defer u.inflightRequests.Dec() + } + + return store.Series(req, spanSeriesServer{ + Store_SeriesServer: srv, + ctx: spanCtx, + }) +} + +// 
LabelNames implements BucketStores +func (u *ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + userID := getUserIDFromGRPCContext(ctx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + err := u.getStoreError(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + if err != nil { + if cortex_errors.ErrorIs(err, userBkt.IsAccessDeniedErr) { + return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + } + + return nil, err + } + + store, err := u.getOrCreateStore(userID) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return store.LabelNames(ctx, req) +} + +// LabelValues implements BucketStores +func (u *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + userID := getUserIDFromGRPCContext(ctx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + err := u.getStoreError(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + if err != nil { + if cortex_errors.ErrorIs(err, userBkt.IsAccessDeniedErr) { + return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + } + + return nil, err + } + + store, err := u.getOrCreateStore(userID) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return store.LabelValues(ctx, req) +} + +// SyncBlocks implements BucketStores +func (u *ParquetBucketStores) SyncBlocks(ctx context.Context) error { + return nil +} + +// InitialSync implements BucketStores +func (u *ParquetBucketStores) InitialSync(ctx context.Context) error { + return nil +} + +func (u *ParquetBucketStores) getStoreError(userID string) error { + u.storesErrorsMu.RLock() + defer u.storesErrorsMu.RUnlock() + return u.storesErrors[userID] +} + +// getOrCreateStore gets or creates a parquet bucket store for the given user +func (u *ParquetBucketStores) getOrCreateStore(userID string) (*parquetBucketStore, error) { + u.storesMu.RLock() + store, exists := u.stores[userID] + u.storesMu.RUnlock() + + if exists { + return store, nil + } + + u.storesMu.Lock() + defer u.storesMu.Unlock() + + // Double-check after acquiring write lock + if store, exists = u.stores[userID]; exists { + return store, nil + } + + // Check if there was an error creating this store + if err, exists := u.storesErrors[userID]; exists { + return nil, err + } + + // Create new store + userLogger := log.With(u.logger, "user", userID) + store, err := u.createParquetBucketStore(userID, userLogger) + if err != nil { + u.storesErrors[userID] = err + return nil, err + } + + u.stores[userID] = store + return store, nil +} + +// createParquetBucketStore creates a new parquet bucket store for a user +func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger log.Logger) (*parquetBucketStore, error) { + level.Info(userLogger).Log("msg", "creating parquet bucket store") + + // Create user-specific bucket client + userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + + store := &parquetBucketStore{ + logger: userLogger, + bucket: userBucket, + limits: u.limits, + concurrency: 4, // TODO: make this configurable + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + } + + return store, nil +} + +// findParquetBlocks finds parquet shards for the given user and time range +func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatchers 
[]storepb.LabelMatcher) ([]*parquetBlock, error) { + if len(blockMatchers) != 1 || blockMatchers[0].Type != storepb.LabelMatcher_RE || blockMatchers[0].Name != block.BlockIDLabel { + return nil, status.Error(codes.InvalidArgument, "only one block matcher is supported") + } + + blockIDs := strings.Split(blockMatchers[0].Value, "|") + blocks := make([]*parquetBlock, 0, len(blockIDs)) + bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket) + noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx)) + for _, blockID := range blockIDs { + block, err := p.newParquetBlock(ctx, blockID, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota) + if err != nil { + return nil, err + } + blocks = append(blocks, block) + } + + return blocks, nil +} + +// Series implements the store interface for a single parquet bucket store +func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { + matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...) + if err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + + ctx := srv.Context() + resHints := &hintspb.SeriesResponseHints{} + var anyHints *types.Any + + var blockMatchers []storepb.LabelMatcher + if req.Hints != nil { + reqHints := &hintspb.SeriesRequestHints{} + if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { + return status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal series request hints").Error()) + } + blockMatchers = reqHints.BlockMatchers + } + + ctx = injectShardInfoIntoContext(ctx, req.ShardInfo) + + // Find parquet shards for the time range + shards, err := p.findParquetBlocks(ctx, blockMatchers) + if err != nil { + return fmt.Errorf("failed to find parquet shards: %w", err) + } + + seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards)) + errGroup, ctx := errgroup.WithContext(srv.Context()) + errGroup.SetLimit(p.concurrency) + + for i, shard := range shards { + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: shard.name, + }) + errGroup.Go(func() error { + ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers) + seriesSet[i] = ss + return err + }) + } + + if err = errGroup.Wait(); err != nil { + return err + } + + ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger()) + for ss.Next() { + cs := ss.At() + cIter := cs.Iterator(nil) + chunks := make([]storepb.AggrChunk, 0) + for cIter.Next() { + chunk := cIter.At() + chunks = append(chunks, storepb.AggrChunk{ + MinTime: chunk.MinTime, + MaxTime: chunk.MaxTime, + Raw: &storepb.Chunk{ + Type: chunkToStoreEncoding(chunk.Chunk.Encoding()), + Data: chunk.Chunk.Bytes(), + }, + }) + } + if err = srv.Send(storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(cs.Labels()), + Chunks: chunks, + })); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) + return + } + } + + if anyHints, err = types.MarshalAny(resHints); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error()) + return + } + + if err = srv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error()) + return + } + + return nil +} + +// LabelNames implements the store interface for a single parquet bucket store +func (p *parquetBucketStore) LabelNames(ctx 
context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + resHints := &hintspb.LabelNamesResponseHints{} + + var blockMatchers []storepb.LabelMatcher + if req.Hints != nil { + reqHints := &hintspb.LabelNamesRequestHints{} + if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { + return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal label names request hints").Error()) + } + blockMatchers = reqHints.BlockMatchers + } + + // Find parquet shards for the time range + shards, err := p.findParquetBlocks(ctx, blockMatchers) + if err != nil { + return nil, fmt.Errorf("failed to find parquet shards: %w", err) + } + + resNameSets := make([][]string, len(shards)) + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.concurrency) + + for i, s := range shards { + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: s.name, + }) + errGroup.Go(func() error { + r, err := s.LabelNames(ctx, req.Limit, matchers) + resNameSets[i] = r + return err + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + anyHints, err := types.MarshalAny(resHints) + if err != nil { + return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label names response hints").Error()) + } + result := util.MergeUnsortedSlices(int(req.Limit), resNameSets...) + + return &storepb.LabelNamesResponse{ + Names: result, + Hints: anyHints, + }, nil +} + +// LabelValues implements the store interface for a single parquet bucket store +func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + resHints := &hintspb.LabelValuesResponseHints{} + var blockMatchers []storepb.LabelMatcher + if req.Hints != nil { + reqHints := &hintspb.LabelValuesRequestHints{} + if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { + return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal label values request hints").Error()) + } + blockMatchers = reqHints.BlockMatchers + } + + // Find parquet shards for the time range + shards, err := p.findParquetBlocks(ctx, blockMatchers) + if err != nil { + return nil, fmt.Errorf("failed to find parquet shards: %w", err) + } + + resNameValues := make([][]string, len(shards)) + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.concurrency) + + for i, s := range shards { + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: s.name, + }) + errGroup.Go(func() error { + r, err := s.LabelValues(ctx, req.Label, req.Limit, matchers) + resNameValues[i] = r + return err + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + anyHints, err := types.MarshalAny(resHints) + if err != nil { + return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label values response hints").Error()) + } + result := util.MergeUnsortedSlices(int(req.Limit), resNameValues...) 
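+	// result holds the label values merged across the queried parquet blocks.
+	// req.Limit is assumed to cap the merged output, with a non-positive limit
+	// treated as "no limit".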
+ + return &storepb.LabelValuesResponse{ + Values: result, + Hints: anyHints, + }, nil +} + +type parquetBlock struct { + name string + shard parquet_storage.ParquetShard + m *search.Materializer + concurrency int +} + +func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) { + shard, err := parquet_storage.NewParquetShardOpener( + context.WithoutCancel(ctx), + name, + labelsFileOpener, + chunksFileOpener, + 0, + parquet_storage.WithFileOptions( + parquet.SkipMagicBytes(true), + parquet.ReadBufferSize(100*1024), + parquet.SkipBloomFilters(true), + parquet.OptimisticRead(true), + ), + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to open parquet shard. block: %v", name) + } + + s, err := shard.TSDBSchema() + if err != nil { + return nil, err + } + m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback) + if err != nil { + return nil, err + } + + return &parquetBlock{ + shard: shard, + m: m, + concurrency: p.concurrency, + name: name, + }, nil +} + +type contextKey int + +var ( + shardInfoCtxKey contextKey = 1 +) + +func injectShardInfoIntoContext(ctx context.Context, si *storepb.ShardInfo) context.Context { + return context.WithValue(ctx, shardInfoCtxKey, si) +} + +func extractShardInfoFromContext(ctx context.Context) (*storepb.ShardInfo, bool) { + if si := ctx.Value(shardInfoCtxKey); si != nil { + return si.(*storepb.ShardInfo), true + } + + return nil, false +} + +func materializedLabelsFilterCallback(ctx context.Context, _ *prom_storage.SelectHints) (search.MaterializedLabelsFilter, bool) { + shardInfo, exists := extractShardInfoFromContext(ctx) + if !exists { + return nil, false + } + sm := shardInfo.Matcher(&querysharding.Buffers) + if !sm.IsSharded() { + return nil, false + } + return &shardMatcherLabelsFilter{shardMatcher: sm}, true +} + +type shardMatcherLabelsFilter struct { + shardMatcher *storepb.ShardMatcher +} + +func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool { + return f.shardMatcher.MatchesLabels(lbls) +} + +func (f *shardMatcherLabelsFilter) Close() { + f.shardMatcher.Close() +} + +func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + rowGroupCount := len(b.shard.LabelsFile().RowGroups()) + results := make([][]prom_storage.ChunkSeries, rowGroupCount) + for i := range results { + results[i] = make([]prom_storage.ChunkSeries, 0, 1024/rowGroupCount) + } + + for rgi := range rowGroupCount { + errGroup.Go(func() error { + cs, err := search.MatchersToConstraints(matchers...) + if err != nil { + return err + } + err = search.Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return err + } + rr, err := search.Filter(ctx, b.shard, rgi, cs...) 
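+			// rr holds the row ranges within row group rgi that satisfy the matcher
+			// constraints; an empty result means no series in this row group match.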
+ if err != nil { + return err + } + + if len(rr) == 0 { + return nil + } + + seriesSetIter, err := b.m.Materialize(ctx, nil, rgi, mint, maxt, skipChunks, rr) + if err != nil { + return err + } + defer func() { _ = seriesSetIter.Close() }() + for seriesSetIter.Next() { + results[rgi] = append(results[rgi], seriesSetIter.At()) + } + sort.Sort(byLabels(results[rgi])) + return seriesSetIter.Err() + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + totalResults := 0 + for _, res := range results { + totalResults += len(res) + } + + resultsFlattened := make([]prom_storage.ChunkSeries, 0, totalResults) + for _, res := range results { + resultsFlattened = append(resultsFlattened, res...) + } + sort.Sort(byLabels(resultsFlattened)) + + return convert.NewChunksSeriesSet(resultsFlattened), nil +} + +func (b *parquetBlock) LabelNames(ctx context.Context, limit int64, matchers []*labels.Matcher) ([]string, error) { + if len(matchers) == 0 { + return b.m.MaterializeAllLabelNames(), nil + } + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + + for rgi := range b.shard.LabelsFile().RowGroups() { + errGroup.Go(func() error { + cs, err := search.MatchersToConstraints(matchers...) + if err != nil { + return err + } + err = search.Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return err + } + rr, err := search.Filter(ctx, b.shard, rgi, cs...) + if err != nil { + return err + } + series, err := b.m.MaterializeLabelNames(ctx, rgi, rr) + if err != nil { + return err + } + results[rgi] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil +} + +func (b *parquetBlock) LabelValues(ctx context.Context, name string, limit int64, matchers []*labels.Matcher) ([]string, error) { + if len(matchers) == 0 { + return b.allLabelValues(ctx, name, limit) + } + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + + for rgi := range b.shard.LabelsFile().RowGroups() { + errGroup.Go(func() error { + cs, err := search.MatchersToConstraints(matchers...) + if err != nil { + return err + } + err = search.Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return err + } + rr, err := search.Filter(ctx, b.shard, rgi, cs...) 
+ if err != nil { + return err + } + series, err := b.m.MaterializeLabelValues(ctx, name, rgi, rr) + if err != nil { + return err + } + results[rgi] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil +} + +func (b *parquetBlock) allLabelValues(ctx context.Context, name string, limit int64) ([]string, error) { + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + + for i := range b.shard.LabelsFile().RowGroups() { + errGroup.Go(func() error { + series, err := b.m.MaterializeAllLabelValues(ctx, name, i) + if err != nil { + return err + } + results[i] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil +} + +type byLabels []prom_storage.ChunkSeries + +func (b byLabels) Len() int { return len(b) } +func (b byLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 } + +func chunkToStoreEncoding(in chunkenc.Encoding) storepb.Chunk_Encoding { + switch in { + case chunkenc.EncXOR: + return storepb.Chunk_XOR + case chunkenc.EncHistogram: + return storepb.Chunk_HISTOGRAM + case chunkenc.EncFloatHistogram: + return storepb.Chunk_FLOAT_HISTOGRAM + default: + panic("unknown chunk encoding") + } +} + +// createCachingBucketClientForParquet creates a caching bucket client for parquet bucket stores +func createCachingBucketClientForParquet(storageCfg tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, name string, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { + // Create caching bucket using the existing infrastructure + matchers := tsdb.NewMatchers() + cachingBucket, err := tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, storageCfg.BucketStore.ParquetLabelsCache, matchers, bucketClient, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "create caching bucket for parquet") + } + return cachingBucket, nil +} diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go new file mode 100644 index 0000000000..bf815e2b24 --- /dev/null +++ b/pkg/storegateway/parquet_bucket_stores_test.go @@ -0,0 +1,323 @@ +package storegateway + +import ( + "context" + "errors" + "testing" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func TestParquetBucketStores_Series_NoUserID(t *testing.T) { + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + } + + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: 
"test_metric", + }}, + } + + srv := newBucketStoreSeriesServer(context.Background()) + err := stores.Series(req, srv) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no userID") +} + +func TestParquetBucketStores_Series_StoreCreationError(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + bucket: mockBucket, + stores: make(map[string]*parquetBucketStore), + storesErrors: make(map[string]error), + } + + // Simulate a store creation error + stores.storesErrors["user-1"] = errors.New("store creation failed") + + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: "test_metric", + }}, + } + + ctx := setUserIDToGRPCContext(context.Background(), "user-1") + srv := newBucketStoreSeriesServer(ctx) + err := stores.Series(req, srv) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "store creation failed") +} + +func TestParquetBucketStores_LabelNames_NoUserID(t *testing.T) { + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + } + + req := &storepb.LabelNamesRequest{ + Start: 0, + End: 100, + } + + _, err := stores.LabelNames(context.Background(), req) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no userID") +} + +func TestParquetBucketStores_LabelValues_NoUserID(t *testing.T) { + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + } + + req := &storepb.LabelValuesRequest{ + Start: 0, + End: 100, + Label: "__name__", + } + + _, err := stores.LabelValues(context.Background(), req) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no userID") +} + +func TestParquetBucketStore_FindParquetBlocks_InvalidMatchers(t *testing.T) { + store := &parquetBucketStore{ + logger: log.NewNopLogger(), + } + + // Test with no matchers + _, err := store.findParquetBlocks(context.Background(), nil) + assert.Error(t, err) + s, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + + // Test with multiple matchers + _, err = store.findParquetBlocks(context.Background(), []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: block.BlockIDLabel, Value: "block1"}, + {Type: storepb.LabelMatcher_RE, Name: block.BlockIDLabel, Value: "block2"}, + }) + assert.Error(t, err) + s, ok = status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + + // Test with wrong matcher type + _, err = store.findParquetBlocks(context.Background(), []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: "block1"}, + }) + assert.Error(t, err) + s, ok = status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + + // Test with wrong matcher name + _, err = store.findParquetBlocks(context.Background(), []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "wrong_name", Value: "block1"}, + }) + assert.Error(t, err) + s, ok = status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) +} + +func TestChunkToStoreEncoding(t *testing.T) { + tests := []struct { + name string + encoding chunkenc.Encoding + expected storepb.Chunk_Encoding + }{ + { + name: "XOR encoding", + encoding: chunkenc.EncXOR, + expected: storepb.Chunk_XOR, + }, + { + name: "Histogram encoding", + encoding: chunkenc.EncHistogram, + expected: storepb.Chunk_HISTOGRAM, + }, + { + name: "Float histogram 
encoding", + encoding: chunkenc.EncFloatHistogram, + expected: storepb.Chunk_FLOAT_HISTOGRAM, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := chunkToStoreEncoding(tt.encoding) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestParquetBucketStoresWithCaching(t *testing.T) { + // Create a temporary directory for the test + tempDir := t.TempDir() + + // Create storage configuration with caching enabled + storageCfg := cortex_tsdb.BlocksStorageConfig{ + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: tempDir, + }, + }, + BucketStore: cortex_tsdb.BucketStoreConfig{ + ChunksCache: cortex_tsdb.ChunksCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + MetadataCache: cortex_tsdb.MetadataCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + ParquetLabelsCache: cortex_tsdb.ParquetLabelsCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + }, + } + + // Create a mock bucket client + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + // Create limits + limits := validation.NewOverrides(validation.Limits{}, nil) + + // Create parquet bucket stores with caching + parquetStores, err := newParquetBucketStores(storageCfg, bucketClient, limits, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + require.NotNil(t, parquetStores) + + // Verify that the bucket is a caching bucket (it should be wrapped) + // The caching bucket should be different from the original bucket client + require.NotEqual(t, bucketClient, parquetStores.bucket) +} + +func TestCreateCachingBucketClientForParquet(t *testing.T) { + // Create a temporary directory for the test + tempDir := t.TempDir() + + // Create storage configuration with caching enabled + storageCfg := cortex_tsdb.BlocksStorageConfig{ + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: tempDir, + }, + }, + BucketStore: cortex_tsdb.BucketStoreConfig{ + ChunksCache: cortex_tsdb.ChunksCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + MetadataCache: cortex_tsdb.MetadataCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + ParquetLabelsCache: cortex_tsdb.ParquetLabelsCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + }, + } + + // Create a mock bucket client + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + // Create caching bucket client + cachingBucket, err := createCachingBucketClientForParquet(storageCfg, bucketClient, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + require.NotNil(t, cachingBucket) + + // Verify that the caching bucket is different from the original bucket client + require.NotEqual(t, bucketClient, cachingBucket) +} + +func TestParquetBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore) + cfg.BucketStore.MaxInflightRequests = 10 + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + 
generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + parquetStores := stores.(*ParquetBucketStores) + // Set inflight requests to the limit + for i := 0; i < 10; i++ { + parquetStores.inflightRequests.Inc() + } + series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) + assert.ErrorIs(t, err, ErrTooManyInflightRequests) + assert.Empty(t, series) + assert.Empty(t, warnings) +} + +//func TestParquetBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) { +// cfg := prepareStorageConfig(t) +// cfg.BucketStore.BucketStoreType = string(ParquetBucketStore) +// reg := prometheus.NewPedanticRegistry() +// storageDir := t.TempDir() +// generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) +// bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) +// require.NoError(t, err) +// +// stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) +// require.NoError(t, err) +// require.NoError(t, stores.InitialSync(context.Background())) +// +// parquetStores := stores.(*ParquetBucketStores) +// // Set inflight requests to the limit (max_inflight_request is set to 0 by default = disabled) +// for i := 0; i < 10; i++ { +// parquetStores.inflightRequests.Inc() +// } +// series, _, err := querySeriesWithBlockIDs(stores, "user_id", "series_1", 0, 100) +// require.NoError(t, err) +// assert.Equal(t, 1, len(series)) +//} diff --git a/pkg/util/inflight.go b/pkg/util/inflight.go new file mode 100644 index 0000000000..3ad1326b3c --- /dev/null +++ b/pkg/util/inflight.go @@ -0,0 +1,30 @@ +package util + +import "sync" + +type InflightRequestTracker struct { + mu sync.RWMutex + cnt int +} + +func NewInflightRequestTracker() *InflightRequestTracker { + return &InflightRequestTracker{} +} + +func (t *InflightRequestTracker) Inc() { + t.mu.Lock() + t.cnt++ + t.mu.Unlock() +} + +func (t *InflightRequestTracker) Dec() { + t.mu.Lock() + t.cnt-- + t.mu.Unlock() +} + +func (t *InflightRequestTracker) Count() int { + t.mu.RLock() + defer t.mu.RUnlock() + return t.cnt +}
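A note on the inflight-request limiting added above: ParquetBucketStores.Series calls u.inflightRequests.Count() and then Inc() as two separate operations, so two concurrent requests that both observe a count just below the limit can both proceed and briefly exceed cfg.BucketStore.MaxInflightRequests. If strict enforcement ever matters, the check and the increment can be combined under a single lock acquisition. The sketch below is a hypothetical TryInc helper for package util, not part of this change; it treats a non-positive limit as "no limit".

package util

// TryInc is a hypothetical companion to Inc/Dec: it performs the limit check
// and the increment under one lock acquisition, so concurrent callers cannot
// both slip past the check. A non-positive limit means "no limit".
func (t *InflightRequestTracker) TryInc(limit int) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if limit > 0 && t.cnt >= limit {
		return false
	}
	t.cnt++
	return true
}

The Series path could then replace the Count()/Inc() pair with something like `if !u.inflightRequests.TryInc(maxInflightRequests) { return ErrTooManyInflightRequests }` followed by `defer u.inflightRequests.Dec()`. When the limit is 0, TryInc always succeeds, so the tracker is still updated but never rejects requests.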