diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6ff1a4b2ce..30d8d16d67 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,7 @@
 * [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830
 * [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
 * [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
+* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
 * [BUGFIX] Ingester: Fix labelset data race condition. #6573
 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
diff --git a/go.mod b/go.mod
index fe03be51c3..24210961be 100644
--- a/go.mod
+++ b/go.mod
@@ -83,7 +83,7 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
 	github.com/oklog/ulid/v2 v2.1.1
 	github.com/parquet-go/parquet-go v0.25.1
-	github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994
+	github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643
 	github.com/prometheus/procfs v0.16.1
 	github.com/sercand/kuberesolver/v5 v5.1.1
 	github.com/tjhop/slog-gokit v0.1.4
diff --git a/go.sum b/go.sum
index 26277164a4..3e0c1eff20 100644
--- a/go.sum
+++ b/go.sum
@@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
 github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
 github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
 github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
-github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 h1:xHR2Xex5XWYl5rQKObX8sVqykPXzlL0Rytd9mKo0sss=
-github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
+github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg=
+github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
 github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
 github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
 github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go
index 0c55a15c58..7e0418d463 100644
--- a/pkg/querier/error_translate_queryable.go
+++ b/pkg/querier/error_translate_queryable.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/gogo/status"
 	"github.com/pkg/errors"
+	"github.com/prometheus-community/parquet-common/search"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/storage"
@@ -48,6 +49,11 @@ func TranslateToPromqlAPIError(err error) error {
 		return err // 422
 	}
 
+	if search.IsResourceExhausted(err) {
+		cause := errors.Cause(err)
+		return cause // 422
+	}
+
 	s, ok := status.FromError(err)
 
 	if !ok {
diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go
index b5d13a96e4..b1b3414909 100644
--- a/pkg/querier/error_translate_queryable_test.go
+++ b/pkg/querier/error_translate_queryable_test.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/grafana/regexp"
 	"github.com/pkg/errors"
+	"github.com/prometheus-community/parquet-common/search"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/promslog"
 	"github.com/prometheus/common/route"
@@ -82,6 +83,12 @@ func TestApiStatusCodes(t *testing.T) {
 			expectedCode: 422,
 		},
 
+		{
+			err:            search.NewQuota(1).Reserve(2),
+			expectedString: "resource exhausted (used 1)",
+			expectedCode:   422,
+		},
+
 		// 505 is translated to 500
 		{
 			err:          httpgrpc.Errorf(http.StatusHTTPVersionNotSupported, "test"),
diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go
index 481034ef4d..8d7fe7152e 100644
--- a/pkg/querier/parquet_queryable.go
+++ b/pkg/querier/parquet_queryable.go
@@ -23,11 +23,13 @@ import (
 	"github.com/thanos-io/thanos/pkg/strutil"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/cortexproject/cortex/pkg/cortexpb"
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	"github.com/cortexproject/cortex/pkg/tenant"
 	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/multierror"
 	"github.com/cortexproject/cortex/pkg/util/services"
@@ -132,6 +134,62 @@ func NewParquetQueryable(
 
 	cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())
 
+	parquetQueryableOpts := []queryable.QueryableOpts{
+		queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 {
+			// Ignore error as this shouldn't happen.
+			// If failed to resolve tenant we will just use the default limit value.
+			userID, _ := tenant.TenantID(ctx)
+			return int64(limits.ParquetMaxFetchedRowCount(userID))
+		}),
+		queryable.WithChunkBytesLimitFunc(func(ctx context.Context) int64 {
+			// Ignore error as this shouldn't happen.
+			// If failed to resolve tenant we will just use the default limit value.
+			userID, _ := tenant.TenantID(ctx)
+			return int64(limits.ParquetMaxFetchedChunkBytes(userID))
+		}),
+		queryable.WithDataBytesLimitFunc(func(ctx context.Context) int64 {
+			// Ignore error as this shouldn't happen.
+			// If failed to resolve tenant we will just use the default limit value.
+			userID, _ := tenant.TenantID(ctx)
+			return int64(limits.ParquetMaxFetchedDataBytes(userID))
+		}),
+		queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
+			queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
+			lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))
+			for _, series := range cs {
+				chkCount := 0
+				chunkSize := 0
+				lblSize := 0
+				lblAdapter := cortexpb.FromLabelsToLabelAdapters(series.Labels())
+				lbls = append(lbls, lblAdapter)
+				for _, lbl := range lblAdapter {
+					lblSize += lbl.Size()
+				}
+				iter := series.Iterator(nil)
+				for iter.Next() {
+					chk := iter.At()
+					chunkSize += len(chk.Chunk.Bytes())
+					chkCount++
+				}
+				if chkCount > 0 {
+					if err := queryLimiter.AddChunks(chkCount); err != nil {
+						return validation.LimitError(err.Error())
+					}
+					if err := queryLimiter.AddChunkBytes(chunkSize); err != nil {
+						return validation.LimitError(err.Error())
+					}
+				}
+
+				if err := queryLimiter.AddDataBytes(chunkSize + lblSize); err != nil {
+					return validation.LimitError(err.Error())
+				}
+			}
+			if err := queryLimiter.AddSeries(lbls...); err != nil {
+				return validation.LimitError(err.Error())
+			}
+			return nil
+		}),
+	}
 	parquetQueryable, err := queryable.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]parquet_storage.ParquetShard, error) {
 		userID, err := tenant.TenantID(ctx)
 		if err != nil {
@@ -182,7 +240,7 @@
 		}
 
 		return shards, errGroup.Wait()
-	})
+	}, parquetQueryableOpts...)
 
 	p := &parquetQueryableWithFallback{
 		subservices: manager,
@@ -376,7 +434,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
 
 	userID, err := tenant.TenantID(ctx)
 	if err != nil {
-		storage.ErrSeriesSet(err)
+		return storage.ErrSeriesSet(err)
 	}
 
 	if q.limits.QueryVerticalShardSize(userID) > 1 {
diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go
index 33e8f73bed..13cdde6cd5 100644
--- a/pkg/querier/parquet_queryable_test.go
+++ b/pkg/querier/parquet_queryable_test.go
@@ -2,26 +2,42 @@ package querier
 
 import (
 	"context"
+	"fmt"
+	"math/rand"
+	"path/filepath"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/oklog/ulid/v2"
+	"github.com/prometheus-community/parquet-common/convert"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/util/annotations"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/weaveworks/common/user"
 
+	"github.com/cortexproject/cortex/integration/e2e"
 	"github.com/cortexproject/cortex/pkg/cortexpb"
 	"github.com/cortexproject/cortex/pkg/querier/series"
+	"github.com/cortexproject/cortex/pkg/storage/bucket"
+	"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
 	"github.com/cortexproject/cortex/pkg/storage/parquet"
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
+	cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
+	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
@@ -409,6 +425,209 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 	})
 }
 
+func TestParquetQueryable_Limits(t *testing.T) {
+	t.Parallel()
+
+	const (
+		metricName = "test_metric"
+		minT       = int64(0)
+		maxT       = int64(1000)
+	)
+
+	bkt, tempDir := cortex_testutil.PrepareFilesystemBucket(t)
+
+	config := Config{
+		QueryStoreAfter:                         0,
+		StoreGatewayQueryStatsEnabled:           false,
+		StoreGatewayConsistencyCheckMaxAttempts: 3,
+		ParquetQueryableShardCacheSize:          100,
+		ParquetQueryableDefaultBlockStore:       "parquet",
+	}
+
+	storageCfg := cortex_tsdb.BlocksStorageConfig{
+		Bucket: bucket.Config{
+			Backend: "filesystem",
+			Filesystem: filesystem.Config{
+				Directory: tempDir,
+			},
+		},
+	}
+
+	ctx := context.Background()
+	seriesCount := 100
+	lbls := make([]labels.Labels, seriesCount)
+	for i := 0; i < seriesCount; i++ {
+		lbls[i] = labels.Labels{
+			{Name: labels.MetricName, Value: metricName},
+			{Name: "series", Value: fmt.Sprintf("%d", i)},
+		}
+	}
+
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+	blockID, err := e2e.CreateBlock(ctx, rnd, tempDir, lbls, 100, 0, 1000, 10, 1000)
+	require.NoError(t, err)
+
+	blockDir := filepath.Join(tempDir, blockID.String())
+	userBkt := bucket.NewUserBucketClient("user-1", bkt, nil)
+	err = block.Upload(ctx, log.NewNopLogger(), userBkt, blockDir, metadata.NoneFunc)
+	require.NoError(t, err)
+
+	err = convertBlockToParquet(t, ctx, userBkt, blockID, blockDir)
+	require.NoError(t, err)
+
+	// Create a mocked bucket index blocks finder
+	finder := &blocksFinderMock{}
+	finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
+		&bucketindex.Block{ID: blockID, Parquet: &parquet.ConverterMarkMeta{Version: parquet.CurrentVersion}},
+	}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
+
+	tests := map[string]struct {
+		limits       *validation.Overrides
+		queryLimiter *limiter.QueryLimiter
+		expectedErr  error
+	}{
+		"row count limit hit - Parquet Queryable": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				limits.ParquetMaxFetchedRowCount = 1
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
+			expectedErr:  fmt.Errorf("would fetch too many rows: resource exhausted (used 1)"),
+		},
+		"max series per query limit hit": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0),
+			expectedErr:  validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
+		},
+		"max chunks per query limit hit": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(0, 0, 1, 0),
+			expectedErr:  validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)),
+		},
+		"max chunk page size limit hit - Parquet Queryable": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				limits.ParquetMaxFetchedChunkBytes = 1
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(0, 1, 0, 0),
+			expectedErr:  fmt.Errorf("materializer failed to materialize chunks: would fetch too many chunk bytes: resource exhausted (used 1)"),
+		},
+		"max chunk bytes per query limit hit": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(0, 1, 0, 0),
+			expectedErr:  validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 1)),
+		},
+		"max data bytes per query limit hit": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				limits.ParquetMaxFetchedDataBytes = 1
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1),
+			expectedErr:  fmt.Errorf("error materializing labels: materializer failed to materialize columns: would fetch too many data bytes: resource exhausted (used 1)"),
+		},
+		"limits within bounds - should succeed": {
+			limits: func() *validation.Overrides {
+				limits := validation.Limits{}
+				flagext.DefaultValues(&limits)
+				limits.MaxFetchedSeriesPerQuery = 1000
+				limits.MaxFetchedChunkBytesPerQuery = 1000000
+				limits.MaxFetchedDataBytesPerQuery = 1000000
+				return validation.NewOverrides(limits, nil)
+			}(),
+			queryLimiter: limiter.NewQueryLimiter(1000, 1000000, 1000, 1000000),
+			expectedErr:  nil,
+		},
+	}
+
+	for testName, testData := range tests {
+		testData := testData
+		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
+
+			ctx := user.InjectOrgID(context.Background(), "user-1")
+			ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)
+
+			mockBlocksStoreQueryable := &BlocksStoreQueryable{finder: finder, Service: services.NewIdleService(func(_ context.Context) error {
+				return nil
+			}, func(_ error) error {
+				return nil
+			})}
+
+			parquetQueryable, err := NewParquetQueryable(config, storageCfg, testData.limits, mockBlocksStoreQueryable, log.NewNopLogger(), prometheus.NewRegistry())
+			require.NoError(t, err)
+			err = services.StartAndAwaitRunning(ctx, parquetQueryable.(*parquetQueryableWithFallback))
+			require.NoError(t, err)
+
+			querier, err := parquetQueryable.Querier(minT, maxT)
+			require.NoError(t, err)
+			defer querier.Close()
+
+			matchers := []*labels.Matcher{
+				labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName),
+			}
+
+			set := querier.Select(ctx, true, nil, matchers...)
+			if testData.expectedErr != nil {
+				require.False(t, set.Next())
+				err = set.Err()
+				require.EqualError(t, err, testData.expectedErr.Error())
+				return
+			}
+
+			require.NoError(t, set.Err())
+		})
+	}
+}
+
+// convertBlockToParquet converts a TSDB block to parquet and uploads it to the bucket
+func convertBlockToParquet(t *testing.T, ctx context.Context, userBucketClient objstore.Bucket, blockID ulid.ULID, blockDir string) error {
+	tsdbBlock, err := tsdb.OpenBlock(nil, blockDir, chunkenc.NewPool(), tsdb.DefaultPostingsDecoderFactory)
+	require.NoError(t, err)
+
+	converterOpts := []convert.ConvertOption{
+		convert.WithSortBy(labels.MetricName),
+		convert.WithColDuration(time.Hour * 8),
+		convert.WithRowGroupSize(1000),
+		convert.WithName(blockID.String()),
+	}
+
+	_, err = convert.ConvertTSDBBlock(
+		ctx,
+		userBucketClient,
+		tsdbBlock.MinTime(),
+		tsdbBlock.MaxTime(),
+		[]convert.Convertible{tsdbBlock},
+		converterOpts...,
+	)
+	require.NoError(t, err)
+
+	_ = tsdbBlock.Close()
+
+	// Write parquet converter marker
+	err = parquet.WriteConverterMark(ctx, blockID, userBucketClient)
+	require.NoError(t, err)
+
+	return nil
+}
+
 func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides {
 	limits := validation.Limits{}
 	flagext.DefaultValues(&limits)
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 372c1b2c1e..fcd96fea36 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -187,6 +187,11 @@ type Limits struct {
 	QueryVerticalShardSize int  `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"`
 	QueryPartialData       bool `yaml:"query_partial_data" json:"query_partial_data" doc:"nocli|description=Enable to allow queries to be evaluated with data from a single zone, if other zones are not available.|default=false"`
 
+	// Parquet Queryable enforced limits.
+	ParquetMaxFetchedRowCount   int `yaml:"parquet_max_fetched_row_count" json:"parquet_max_fetched_row_count" doc:"hidden"`
+	ParquetMaxFetchedChunkBytes int `yaml:"parquet_max_fetched_chunk_bytes" json:"parquet_max_fetched_chunk_bytes" doc:"hidden"`
+	ParquetMaxFetchedDataBytes  int `yaml:"parquet_max_fetched_data_bytes" json:"parquet_max_fetched_data_bytes" doc:"hidden"`
+
 	// Query Frontend / Scheduler enforced limits.
 	MaxOutstandingPerTenant int           `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"`
 	QueryPriority           QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."`
@@ -320,6 +325,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
 	f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")
 
+	// Parquet Queryable enforced limits.
+	f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.")
+	f.IntVar(&l.ParquetMaxFetchedChunkBytes, "querier.parquet-queryable.max-fetched-chunk-bytes", 0, "The maximum number of bytes that can be used to fetch chunk column pages when querying parquet storage. 0 to disable.")
+	f.IntVar(&l.ParquetMaxFetchedDataBytes, "querier.parquet-queryable.max-fetched-data-bytes", 0, "The maximum number of bytes that can be used to fetch all column pages when querying parquet storage. 0 to disable.")
+
 	// Store-gateway.
 	f.Float64Var(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size will be a percentage of the total store-gateways.")
 	f.IntVar(&l.MaxDownloadedBytesPerRequest, "store-gateway.max-downloaded-bytes-per-request", 0, "The maximum number of data bytes to download per gRPC request in Store Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.")
@@ -894,6 +904,21 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool {
 	return o.GetOverridesForUser(userID).ParquetConverterEnabled
 }
 
+// ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage.
+func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int {
+	return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount
+}
+
+// ParquetMaxFetchedChunkBytes returns the maximum number of bytes that can be used to fetch chunk column pages when querying parquet storage.
+func (o *Overrides) ParquetMaxFetchedChunkBytes(userID string) int {
+	return o.GetOverridesForUser(userID).ParquetMaxFetchedChunkBytes
+}
+
+// ParquetMaxFetchedDataBytes returns the maximum number of bytes that can be used to fetch all column pages when querying parquet storage.
+func (o *Overrides) ParquetMaxFetchedDataBytes(userID string) int {
+	return o.GetOverridesForUser(userID).ParquetMaxFetchedDataBytes
+}
+
 // CompactorPartitionIndexSizeBytes returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
 func (o *Overrides) CompactorPartitionIndexSizeBytes(userID string) int64 {
 	return o.GetOverridesForUser(userID).CompactorPartitionIndexSizeBytes
diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go
index 3bcd8bcbd4..c4b2996f5a 100644
--- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go
+++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go
@@ -34,11 +34,19 @@ import (
 type ShardsFinderFunction func(ctx context.Context, mint, maxt int64) ([]storage.ParquetShard, error)
 
 type queryableOpts struct {
-	concurrency int
+	concurrency                int
+	rowCountLimitFunc          search.QuotaLimitFunc
+	chunkBytesLimitFunc        search.QuotaLimitFunc
+	dataBytesLimitFunc         search.QuotaLimitFunc
+	materializedSeriesCallback search.MaterializedSeriesFunc
 }
 
 var DefaultQueryableOpts = queryableOpts{
-	concurrency: runtime.GOMAXPROCS(0),
+	concurrency:                runtime.GOMAXPROCS(0),
+	rowCountLimitFunc:          search.NoopQuotaLimitFunc,
+	chunkBytesLimitFunc:        search.NoopQuotaLimitFunc,
+	dataBytesLimitFunc:         search.NoopQuotaLimitFunc,
+	materializedSeriesCallback: search.NoopMaterializedSeriesFunc,
 }
 
 type QueryableOpts func(*queryableOpts)
@@ -50,6 +58,35 @@ func WithConcurrency(concurrency int) QueryableOpts {
 	}
 }
 
+// WithRowCountLimitFunc sets a callback function to get limit for matched row count.
+func WithRowCountLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.rowCountLimitFunc = fn
+	}
+}
+
+// WithChunkBytesLimitFunc sets a callback function to get limit for chunk column page bytes fetched.
+func WithChunkBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.chunkBytesLimitFunc = fn
+	}
+}
+
+// WithDataBytesLimitFunc sets a callback function to get limit for data (including label and chunk)
+// column page bytes fetched.
+func WithDataBytesLimitFunc(fn search.QuotaLimitFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.dataBytesLimitFunc = fn
+	}
+}
+
+// WithMaterializedSeriesCallback sets a callback function to process the materialized series.
+func WithMaterializedSeriesCallback(fn search.MaterializedSeriesFunc) QueryableOpts {
+	return func(opts *queryableOpts) {
+		opts.materializedSeriesCallback = fn
+	}
+}
+
 type parquetQueryable struct {
 	shardsFinder ShardsFinderFunction
 	d            *schema.PrometheusParquetChunksDecoder
@@ -191,8 +228,11 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) (
 		return nil, err
 	}
 	qBlocks := make([]*queryableShard, len(shards))
+	rowCountQuota := search.NewQuota(p.opts.rowCountLimitFunc(ctx))
+	chunkBytesQuota := search.NewQuota(p.opts.chunkBytesLimitFunc(ctx))
+	dataBytesQuota := search.NewQuota(p.opts.dataBytesLimitFunc(ctx))
 	for i, shard := range shards {
-		qb, err := newQueryableShard(p.opts, shard, p.d)
+		qb, err := newQueryableShard(p.opts, shard, p.d, rowCountQuota, chunkBytesQuota, dataBytesQuota)
 		if err != nil {
 			return nil, err
 		}
@@ -207,12 +247,12 @@ type queryableShard struct {
 	concurrency int
 }
 
-func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) {
+func newQueryableShard(opts *queryableOpts, block storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*queryableShard, error) {
 	s, err := block.TSDBSchema()
 	if err != nil {
 		return nil, err
 	}
-	m, err := search.NewMaterializer(s, d, block, opts.concurrency)
+	m, err := search.NewMaterializer(s, d, block, opts.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, opts.materializedSeriesCallback)
 	if err != nil {
 		return nil, err
 	}
diff --git a/vendor/github.com/prometheus-community/parquet-common/search/limits.go b/vendor/github.com/prometheus-community/parquet-common/search/limits.go
new file mode 100644
index 0000000000..dbb96aada9
--- /dev/null
+++ b/vendor/github.com/prometheus-community/parquet-common/search/limits.go
@@ -0,0 +1,82 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache 2.0 license found in the LICENSE file or at:
+// https://opensource.org/licenses/Apache-2.0
+
+// This package is a modified copy from
+// https://github.com/thanos-io/thanos-parquet-gateway/blob/cfc1279f605d1c629c4afe8b1e2a340e8b15ecdc/internal/limits/limit.go.
+
+package search
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+)
+
+type resourceExhausted struct {
+	used int64
+}
+
+func (re *resourceExhausted) Error() string {
+	return fmt.Sprintf("resource exhausted (used %d)", re.used)
+}
+
+// IsResourceExhausted checks if the error is a resource exhausted error.
+func IsResourceExhausted(err error) bool {
+	var re *resourceExhausted
+	return errors.As(err, &re)
+}
+
+// Quota is a limiter for a resource.
+type Quota struct {
+	mu sync.Mutex
+	q  int64
+	u  int64
+}
+
+// NewQuota creates a new quota with the given limit.
+func NewQuota(n int64) *Quota {
+	return &Quota{q: n, u: n}
+}
+
+// UnlimitedQuota creates a new quota with no limit.
+func UnlimitedQuota() *Quota {
+	return NewQuota(0)
+}
+
+func (q *Quota) Reserve(n int64) error {
+	if q.q == 0 {
+		return nil
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	if q.u-n < 0 {
+		return &resourceExhausted{used: q.q}
+	}
+	q.u -= n
+	return nil
+}
+
+// QuotaLimitFunc is a function that returns the limit value.
+type QuotaLimitFunc func(ctx context.Context) int64
+
+// NoopQuotaLimitFunc returns 0 which means no limit.
+func NoopQuotaLimitFunc(ctx context.Context) int64 {
+	return 0
+}
diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go
index 51538182b0..2f485503e3 100644
--- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go
+++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"iter"
 	"maps"
 	"slices"
 	"sync"
@@ -43,12 +44,31 @@ type Materializer struct {
 	concurrency int
 
 	dataColToIndex []int
+
+	rowCountQuota   *Quota
+	chunkBytesQuota *Quota
+	dataBytesQuota  *Quota
+
+	materializedSeriesCallback MaterializedSeriesFunc
+}
+
+// MaterializedSeriesFunc is a callback function that can be used to add limiter or statistic logics for
+// materialized series.
+type MaterializedSeriesFunc func(ctx context.Context, series []prom_storage.ChunkSeries) error
+
+// NoopMaterializedSeriesFunc is a noop callback function that does nothing.
+func NoopMaterializedSeriesFunc(_ context.Context, _ []prom_storage.ChunkSeries) error {
+	return nil
 }
 
 func NewMaterializer(s *schema.TSDBSchema,
 	d *schema.PrometheusParquetChunksDecoder,
 	block storage.ParquetShard,
 	concurrency int,
+	rowCountQuota *Quota,
+	chunkBytesQuota *Quota,
+	dataBytesQuota *Quota,
+	materializeSeriesCallback MaterializedSeriesFunc,
 ) (*Materializer, error) {
 	colIdx, ok := block.LabelsFile().Schema().Lookup(schema.ColIndexes)
 	if !ok {
@@ -66,19 +86,26 @@ func NewMaterializer(s *schema.TSDBSchema,
 	}
 
 	return &Materializer{
-		s:              s,
-		d:              d,
-		b:              block,
-		colIdx:         colIdx.ColumnIndex,
-		concurrency:    concurrency,
-		partitioner:    util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
-		dataColToIndex: dataColToIndex,
+		s:                          s,
+		d:                          d,
+		b:                          block,
+		colIdx:                     colIdx.ColumnIndex,
+		concurrency:                concurrency,
+		partitioner:                util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
+		dataColToIndex:             dataColToIndex,
+		rowCountQuota:              rowCountQuota,
+		chunkBytesQuota:            chunkBytesQuota,
+		dataBytesQuota:             dataBytesQuota,
+		materializedSeriesCallback: materializeSeriesCallback,
 	}, nil
 }
 
 // Materialize reconstructs the ChunkSeries that belong to the specified row ranges (rr).
 // It uses the row group index (rgi) and time bounds (mint, maxt) to filter and decode the series.
 func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) {
+	if err := m.checkRowCountQuota(rr); err != nil {
+		return nil, err
+	}
 	sLbls, err := m.materializeAllLabels(ctx, rgi, rr)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error materializing labels")
@@ -106,6 +133,10 @@ func (m *Materializer) Materialize(ctx context.Context, rgi int, mint, maxt int64, skipChunks bool, rr []RowRange) ([]prom_storage.ChunkSeries, error) {
 			return len(cs.(*concreteChunksSeries).chks) == 0
 		})
 	}
+
+	if err := m.materializedSeriesCallback(ctx, results); err != nil {
+		return nil, err
+	}
 	return results, err
 }
 
@@ -125,7 +156,7 @@ func (m *Materializer) MaterializeAllLabelNames() []string {
 func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) {
 	labelsRg := m.b.LabelsFile().RowGroups()[rgi]
 	cc := labelsRg.ColumnChunks()[m.colIdx]
-	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 	if err != nil {
 		return nil, errors.Wrap(err, "materializer failed to materialize columns")
 	}
@@ -164,7 +195,7 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string,
 		return []string{}, nil
 	}
 	cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex]
-	values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 	if err != nil {
 		return nil, errors.Wrap(err, "materializer failed to materialize columns")
 	}
@@ -208,7 +239,7 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name string,
 func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) {
 	labelsRg := m.b.LabelsFile().RowGroups()[rgi]
 	cc := labelsRg.ColumnChunks()[m.colIdx]
-	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+	colsIdxs, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 	if err != nil {
 		return nil, errors.Wrap(err, "materializer failed to materialize columns")
 	}
@@ -232,7 +263,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []RowRange) ([][]labels.Label, error) {
 	for cIdx, v := range colsMap {
 		errGroup.Go(func() error {
 			cc := labelsRg.ColumnChunks()[cIdx]
-			values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr)
+			values, err := m.materializeColumn(ctx, m.b.LabelsFile(), rgi, cc, rr, false)
 			if err != nil {
 				return errors.Wrap(err, "failed to materialize labels values")
 			}
@@ -279,7 +310,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt
 	r := make([][]chunks.Meta, totalRows(rr))
 
 	for i := minDataCol; i <= min(maxDataCol, len(m.dataColToIndex)-1); i++ {
-		values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr)
+		values, err := m.materializeColumn(ctx, m.b.ChunksFile(), rgi, rg.ColumnChunks()[m.dataColToIndex[i]], rr, true)
 		if err != nil {
 			return r, err
 		}
@@ -296,7 +327,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, maxt
 	return r, nil
 }
 
-func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange) ([]parquet.Value, error) {
+func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) {
 	if len(rr) == 0 {
 		return nil, nil
 	}
@@ -331,6 +362,9 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) {
 			}
 		}
 	}
+	if err := m.checkBytesQuota(maps.Keys(pagesToRowsMap), oidx, chunkColumn); err != nil {
+		return nil, err
+	}
 
 	pageRanges := m.coalescePageRanges(pagesToRowsMap, oidx)
 
@@ -464,6 +498,34 @@ func (m *Materializer) coalescePageRanges(pagedIdx map[int][]RowRange, offset parquet.OffsetIndex
 	return r
 }
 
+func (m *Materializer) checkRowCountQuota(rr []RowRange) error {
+	if err := m.rowCountQuota.Reserve(totalRows(rr)); err != nil {
+		return fmt.Errorf("would fetch too many rows: %w", err)
+	}
+	return nil
+}
+
+func (m *Materializer) checkBytesQuota(pages iter.Seq[int], oidx parquet.OffsetIndex, chunkColumn bool) error {
+	total := totalBytes(pages, oidx)
+	if chunkColumn {
+		if err := m.chunkBytesQuota.Reserve(total); err != nil {
+			return fmt.Errorf("would fetch too many chunk bytes: %w", err)
+		}
+	}
+	if err := m.dataBytesQuota.Reserve(total); err != nil {
+		return fmt.Errorf("would fetch too many data bytes: %w", err)
+	}
+	return nil
+}
+
+func totalBytes(pages iter.Seq[int], oidx parquet.OffsetIndex) int64 {
+	res := int64(0)
+	for i := range pages {
+		res += oidx.CompressedPageSize(i)
+	}
+	return res
+}
+
 type valuesIterator struct {
 	p parquet.Page
 
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 58a73b0822..8f3f592537 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb
 # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
 ## explicit
 github.com/pmezard/go-difflib/difflib
-# github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994
+# github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643
 ## explicit; go 1.23.4
 github.com/prometheus-community/parquet-common/convert
 github.com/prometheus-community/parquet-common/queryable