// deduplicateUsers returns the distinct entries of users, keeping the first
// occurrence of each entry and preserving the relative order in which the
// entries were first seen. The input slice is not modified.
//
// The returned slice is always non-nil (it is empty when users is empty) and
// is allocated once, sized for the worst case in which every entry is unique.
func deduplicateUsers(users []string) []string {
	seen := make(map[string]struct{}, len(users))
	uniqueUsers := make([]string, 0, len(users))

	for _, user := range users {
		if _, ok := seen[user]; ok {
			// Already emitted: only the first occurrence is kept.
			continue
		}

		seen[user] = struct{}{}
		uniqueUsers = append(uniqueUsers, user)
	}

	return uniqueUsers
}
"user-2"}, + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + + stores := &BucketStores{ + userScanner: testData.scanner, + } + + users, err := stores.scanUsers(context.Background()) + + assert.NoError(t, err) + assert.ElementsMatch(t, testData.expectedRes, users) + }) + } +} + func TestBucketStores_Series_ShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *testing.T) { for _, lazyLoadingEnabled := range []bool{true, false} { t.Run(fmt.Sprintf("lazy loading enabled = %v", lazyLoadingEnabled), func(t *testing.T) { @@ -996,3 +1028,11 @@ func (f *failFirstGetBucket) Get(ctx context.Context, name string) (io.ReadClose return f.Bucket.Get(ctx, name) } + +type mockScanner struct { + res []string +} + +func (m *mockScanner) ScanUsers(_ context.Context) (active, deleting, deleted []string, err error) { + return m.res, nil, nil, nil +}