Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
* [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895
* [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
31 changes: 23 additions & 8 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,16 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
continue
}

putRequests := map[dynamodbKey][]byte{}
putRequests := map[dynamodbKey]dynamodbItem{}
for childKey, bytes := range buf {
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes
version := int64(0)
if ddbItem, ok := resp[childKey]; ok {
version = ddbItem.version
}
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{
data: bytes,
version: version,
}
}

deleteRequests := make([]dynamodbKey, 0, len(toDelete))
Expand All @@ -196,9 +203,13 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
}

if len(putRequests) > 0 || len(deleteRequests) > 0 {
err = c.kv.Batch(ctx, putRequests, deleteRequests)
retry, err := c.kv.Batch(ctx, putRequests, deleteRequests)
if err != nil {
return err
if !retry {
return err
}
bo.Wait()
continue
}
c.updateStaleData(key, r, time.Now().UTC())
return nil
Expand Down Expand Up @@ -273,8 +284,8 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
continue
}

for key, bytes := range out {
decoded, err := c.codec.Decode(bytes)
for key, ddbItem := range out {
decoded, err := c.codec.Decode(ddbItem.data)
if err != nil {
level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
continue
Expand All @@ -293,8 +304,12 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
}
}

func (c *Client) decodeMultikey(data map[string][]byte) (codec.MultiKey, error) {
res, err := c.codec.DecodeMultiKey(data)
func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) {
multiKeyData := make(map[string][]byte, len(data))
for key, ddbItem := range data {
multiKeyData[key] = ddbItem.data
}
res, err := c.codec.DecodeMultiKey(multiKeyData)
if err != nil {
return nil, err
}
Expand Down
143 changes: 91 additions & 52 deletions pkg/ring/kv/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
expectedErr := errors.Errorf("test")

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
descMock.On("Clone").Return(descMock).Once()

Expand All @@ -46,25 +46,60 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
}

func Test_CAS_Backoff(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
expectedErr := errors.Errorf("test")
testCases := []struct {
name string
setupMocks func(*MockDynamodbClient, *CodecMock, *DescMock, map[dynamodbKey]dynamodbItem, []dynamodbKey)
expectedQueryCalls int
expectedBatchCalls int
}{
{
name: "query_fails_and_backs_off",
setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) {
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed")).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once()
},
expectedQueryCalls: 2,
expectedBatchCalls: 1,
},
{
name: "batch_fails_and_backs_off",
setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) {
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Twice()
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(true, errors.Errorf("batch failed")).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once()
},
expectedQueryCalls: 2,
expectedBatchCalls: 2,
},
}

ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once()
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once()
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Twice()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})
expectedBatch := map[dynamodbKey]dynamodbItem{}
expectedDelete := []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}

require.NoError(t, err)
tc.setupMocks(ddbMock, codecMock, descMock, expectedBatch, expectedDelete)

codecMock.On("DecodeMultiKey").Return(descMock, nil).Times(tc.expectedQueryCalls)
descMock.On("Clone").Return(descMock).Times(tc.expectedQueryCalls)
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Times(tc.expectedBatchCalls)
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Times(tc.expectedBatchCalls)

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})

require.NoError(t, err)
ddbMock.AssertNumberOfCalls(t, "Query", tc.expectedQueryCalls)
ddbMock.AssertNumberOfCalls(t, "Batch", tc.expectedBatchCalls)
})
}
}

func Test_CAS_Failed(t *testing.T) {
Expand All @@ -78,7 +113,7 @@ func Test_CAS_Failed(t *testing.T) {
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config)

ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test"))
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("test"))

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -98,17 +133,17 @@ func Test_CAS_Update(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once()
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -130,20 +165,20 @@ func Test_CAS_Delete(t *testing.T) {
{primaryKey: key, sortKey: expectedToDelete[1]},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})

require.NoError(t, err)
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch)
}

func Test_CAS_Update_Delete(t *testing.T) {
Expand All @@ -156,22 +191,22 @@ func Test_CAS_Update_Delete(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedUpdateBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedUpdateBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}
expectedToDelete := []string{"test", "test2"}
expectedDeleteBatch := []dynamodbKey{
{primaryKey: key, sortKey: expectedToDelete[0]},
{primaryKey: key, sortKey: expectedToDelete[1]},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch)
ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -189,7 +224,7 @@ func Test_WatchKey(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff)
timesCalled := 0

ddbMock.On("Query").Return(map[string][]byte{}, nil)
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil)
codecMock.On("DecodeMultiKey").Return(descMock, nil)

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -207,7 +242,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
staleData := &DescMock{}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(staleData, nil)

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -217,7 +252,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
return false
})

ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("failed"))
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("failed"))
staleData.On("Clone").Return(staleData).Once()

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -241,17 +276,17 @@ func Test_CAS_UpdateStale(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once()
codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once()
ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once()

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMockResult, true, nil
Expand All @@ -266,17 +301,17 @@ func Test_WatchPrefix(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
data := map[string][]byte{}
data := map[string]dynamodbItem{}
dataKey := []string{"t1", "t2"}
data[dataKey[0]] = []byte(dataKey[0])
data[dataKey[1]] = []byte(dataKey[1])
data[dataKey[0]] = dynamodbItem{data: []byte(dataKey[0])}
data[dataKey[1]] = dynamodbItem{data: []byte(dataKey[1])}
calls := 0

ddbMock.On("Query").Return(data, nil)
codecMock.On("Decode").Twice()

c.WatchPrefix(context.TODO(), key, func(key string, i interface{}) bool {
require.EqualValues(t, string(data[key]), i)
require.EqualValues(t, string(data[key].data), i)
delete(data, key)
calls++
return calls < 2
Expand Down Expand Up @@ -321,7 +356,7 @@ func Test_DynamodbKVWithTimeout(t *testing.T) {
err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{})
require.True(t, errors.Is(err, context.DeadlineExceeded))

err = dbWithTimeout.Batch(ctx, nil, nil)
_, err = dbWithTimeout.Batch(ctx, nil, nil)
require.True(t, errors.Is(err, context.DeadlineExceeded))
}

Expand Down Expand Up @@ -358,13 +393,13 @@ func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float
}
return args.Get(0).([]string), 0, err
}
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) {
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string]dynamodbItem, float64, error) {
args := m.Called()
var err error
if args.Get(1) != nil {
err = args.Get(1).(error)
}
return args.Get(0).(map[string][]byte), 0, err
return args.Get(0).(map[string]dynamodbItem), 0, err
}
func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error {
m.Called(ctx, key)
Expand All @@ -374,9 +409,13 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by
m.Called(ctx, key, data)
return nil
}
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
m.Called(ctx, put, delete)
return nil
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) {
args := m.Called(ctx, put, delete)
var err error
if args.Get(1) != nil {
err = args.Get(1).(error)
}
return args.Get(0).(bool), err
}

type TestLogger struct {
Expand Down Expand Up @@ -471,7 +510,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynam
}
}

func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
Expand All @@ -498,10 +537,10 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo
}
}

func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) {
select {
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
case <-time.After(d.delay):
return d.ddbClient.Batch(ctx, put, delete)
}
Expand Down
Loading
Loading