Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
646 changes: 646 additions & 0 deletions integration/parquet_gateway_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestCortex(t *testing.T) {
IndexCache: tsdb.IndexCacheConfig{
Backend: tsdb.IndexCacheBackendInMemory,
},
BucketStoreType: string(tsdb.TSDBBucketStore),
},
UsersScanner: tsdb.UsersScannerConfig{
Strategy: tsdb.UserScanStrategyList,
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0")
ErrInvalidBucketStoreType = errors.New("invalid bucket store type")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -292,6 +293,7 @@ type BucketStoreConfig struct {
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
BucketStoreType string `yaml:"bucket_store_type"`

// Chunk pool.
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
Expand Down Expand Up @@ -378,6 +380,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.LazyExpandedPostingGroupMaxKeySeriesRatio, "blocks-storage.bucket-store.lazy-expanded-posting-group-max-key-series-ratio", 100, "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.")
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
f.StringVar(&cfg.BucketStoreType, "blocks-storage.bucket-store.bucket-store-type", "tsdb", "Type of bucket store to use (tsdb or parquet).")
f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", ")))
f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size")
f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
Expand Down Expand Up @@ -415,6 +418,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) {
return ErrInvalidTokenBucketBytesLimiterMode
}
if !util.StringsContain(supportedBucketStoreTypes, cfg.BucketStoreType) {
return ErrInvalidBucketStoreType
}
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
}
Expand Down Expand Up @@ -450,6 +456,19 @@ var supportedBlockDiscoveryStrategies = []string{
string(BucketIndexDiscovery),
}

// BucketStoreType represents the type of bucket store
type BucketStoreType string

const (
TSDBBucketStore BucketStoreType = "tsdb"
ParquetBucketStore BucketStoreType = "parquet"
)

var supportedBucketStoreTypes = []string{
string(TSDBBucketStore),
string(ParquetBucketStore),
}

type TokenBucketBytesLimiterMode string

const (
Expand Down
89 changes: 45 additions & 44 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,15 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

// BucketStores is a multi-tenant wrapper of Thanos BucketStore.
type BucketStores struct {
// BucketStores defines the methods that any bucket stores implementation must provide
type BucketStores interface {
storepb.StoreServer
SyncBlocks(ctx context.Context) error
InitialSync(ctx context.Context) error
}

// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore.
type ThanosBucketStores struct {
logger log.Logger
cfg tsdb.BlocksStorageConfig
limits *validation.Overrides
Expand Down Expand Up @@ -74,7 +81,7 @@ type BucketStores struct {
storesMu sync.RWMutex
stores map[string]*store.BucketStore

// Keeps the last sync error for the bucket store for each tenant.
// Keeps the last sync error for the bucket store for each tenant.
storesErrorsMu sync.RWMutex
storesErrors map[string]error

Expand All @@ -86,8 +93,7 @@ type BucketStores struct {
userScanner users.Scanner

// Keeps number of inflight requests
inflightRequestCnt int
inflightRequestMu sync.RWMutex
inflightRequests *util.InflightRequestTracker

// Metrics.
syncTimes prometheus.Histogram
Expand All @@ -99,7 +105,19 @@ type BucketStores struct {
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")

// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) {
switch cfg.BucketStore.BucketStoreType {
case string(tsdb.ParquetBucketStore):
return newParquetBucketStores(cfg, bucketClient, limits, logger, reg)
case string(tsdb.TSDBBucketStore):
return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg)
default:
return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType)
}
}

// newThanosBucketStores creates a new TSDB-based bucket stores
func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) {
matchers := tsdb.NewMatchers()
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
if err != nil {
Expand All @@ -114,7 +132,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
Help: "Number of maximum concurrent queries allowed.",
}).Set(float64(cfg.BucketStore.MaxConcurrent))

u := &BucketStores{
u := &ThanosBucketStores{
logger: logger,
cfg: cfg,
limits: limits,
Expand All @@ -128,6 +146,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_bucket_stores_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Expand Down Expand Up @@ -187,7 +206,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
}

// InitialSync does an initial synchronization of blocks for all users.
func (u *BucketStores) InitialSync(ctx context.Context) error {
func (u *ThanosBucketStores) InitialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")

if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
Expand All @@ -202,13 +221,13 @@ func (u *BucketStores) InitialSync(ctx context.Context) error {
}

// SyncBlocks synchronizes the stores state with the Bucket store for every user.
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error {
return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
return s.SyncBlocks(ctx)
})
}

func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
retries := backoff.New(ctx, backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
Expand All @@ -232,7 +251,7 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co
return lastErr
}

func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) {
func (u *ThanosBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) {
defer func(start time.Time) {
u.syncTimes.Observe(time.Since(start).Seconds())
if returnErr == nil {
Expand Down Expand Up @@ -330,7 +349,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
}

// Series makes a series request to the underlying user bucket store.
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series")
defer spanLog.Finish()

Expand All @@ -356,12 +375,12 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri

maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests
if maxInflightRequests > 0 {
if u.getInflightRequestCnt() >= maxInflightRequests {
if u.inflightRequests.Count() >= maxInflightRequests {
return ErrTooManyInflightRequests
}

u.incrementInflightRequestCnt()
defer u.decrementInflightRequestCnt()
u.inflightRequests.Inc()
defer u.inflightRequests.Dec()
}

err = store.Series(req, spanSeriesServer{
Expand All @@ -372,26 +391,8 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
return err
}

func (u *BucketStores) getInflightRequestCnt() int {
u.inflightRequestMu.RLock()
defer u.inflightRequestMu.RUnlock()
return u.inflightRequestCnt
}

func (u *BucketStores) incrementInflightRequestCnt() {
u.inflightRequestMu.Lock()
u.inflightRequestCnt++
u.inflightRequestMu.Unlock()
}

func (u *BucketStores) decrementInflightRequestCnt() {
u.inflightRequestMu.Lock()
u.inflightRequestCnt--
u.inflightRequestMu.Unlock()
}

// LabelNames implements the Storegateway proto service.
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
func (u *ThanosBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")
defer spanLog.Finish()

Expand Down Expand Up @@ -421,7 +422,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe
}

// LabelValues implements the Storegateway proto service.
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
func (u *ThanosBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues")
defer spanLog.Finish()

Expand Down Expand Up @@ -450,7 +451,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues

// scanUsers in the bucket and return the list of found users. It includes active and deleting users
// but not deleted users.
func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
func (u *ThanosBucketStores) scanUsers(ctx context.Context) ([]string, error) {
activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx)
if err != nil {
return nil, err
Expand All @@ -477,13 +478,13 @@ func deduplicateUsers(users []string) []string {
return uniqueUsers
}

func (u *BucketStores) getStore(userID string) *store.BucketStore {
func (u *ThanosBucketStores) getStore(userID string) *store.BucketStore {
u.storesMu.RLock()
defer u.storesMu.RUnlock()
return u.stores[userID]
}

func (u *BucketStores) getStoreError(userID string) error {
func (u *ThanosBucketStores) getStoreError(userID string) error {
u.storesErrorsMu.RLock()
defer u.storesErrorsMu.RUnlock()
return u.storesErrors[userID]
Expand All @@ -499,7 +500,7 @@ var (
// If bucket store doesn't exist, returns errBucketStoreNotFound.
// If bucket store is not empty, returns errBucketStoreNotEmpty.
// Otherwise returns error from closing the bucket store.
func (u *BucketStores) closeEmptyBucketStore(userID string) error {
func (u *ThanosBucketStores) closeEmptyBucketStore(userID string) error {
u.storesMu.Lock()
unlockInDefer := true
defer func() {
Expand Down Expand Up @@ -537,11 +538,11 @@ func isEmptyBucketStore(bs *store.BucketStore) bool {
return min == math.MaxInt64 && max == math.MinInt64
}

func (u *BucketStores) syncDirForUser(userID string) string {
func (u *ThanosBucketStores) syncDirForUser(userID string) string {
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
}

func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
// Check if the store already exists.
bs := u.getStore(userID)
if bs != nil {
Expand Down Expand Up @@ -721,7 +722,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro

// deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current
// shard.
func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
func (u *ThanosBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) {
files, err := os.ReadDir(u.cfg.BucketStore.SyncDir)
if err != nil {
return
Expand Down Expand Up @@ -760,13 +761,13 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str
}
}

func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
func (u *ThanosBucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
u.userTokenBucketsMu.RLock()
defer u.userTokenBucketsMu.RUnlock()
return u.userTokenBuckets[userID]
}

func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
func (u *ThanosBucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
tokensToRetrieve := float64(tokens)
switch dataType {
case store.PostingsFetched:
Expand Down
Loading
Loading