Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
* [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895
* [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
* [ENHANCEMENT] Implement versioned transactions for batch writes to Ring DynamoDB. #6986
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
19 changes: 13 additions & 6 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
continue
}

putRequests := map[dynamodbKey][]byte{}
putRequests := map[dynamodbKey]dynamodbItem{}
for childKey, bytes := range buf {
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes
putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{
data: bytes,
version: resp[childKey].version,
}
}

deleteRequests := make([]dynamodbKey, 0, len(toDelete))
Expand Down Expand Up @@ -273,8 +276,8 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
continue
}

for key, bytes := range out {
decoded, err := c.codec.Decode(bytes)
for key, ddbItem := range out {
decoded, err := c.codec.Decode(ddbItem.data)
if err != nil {
level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
continue
Expand All @@ -293,8 +296,12 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
}
}

func (c *Client) decodeMultikey(data map[string][]byte) (codec.MultiKey, error) {
res, err := c.codec.DecodeMultiKey(data)
func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) {
multiKeyData := make(map[string][]byte, len(data))
for key, ddbItem := range data {
multiKeyData[key] = ddbItem.data
}
res, err := c.codec.DecodeMultiKey(multiKeyData)
if err != nil {
return nil, err
}
Expand Down
64 changes: 32 additions & 32 deletions pkg/ring/kv/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Test_CAS_ErrorNoRetry(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
expectedErr := errors.Errorf("test")

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
descMock.On("Clone").Return(descMock).Once()

Expand All @@ -52,9 +52,9 @@ func Test_CAS_Backoff(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
expectedErr := errors.Errorf("test")

ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once()
ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, expectedErr).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once()
Expand All @@ -78,7 +78,7 @@ func Test_CAS_Failed(t *testing.T) {
descMock := &DescMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config)

ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test"))
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("test"))

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
Expand All @@ -98,12 +98,12 @@ func Test_CAS_Update(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once()
Expand All @@ -130,20 +130,20 @@ func Test_CAS_Delete(t *testing.T) {
{primaryKey: key, sortKey: expectedToDelete[1]},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch)

err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return descMock, true, nil
})

require.NoError(t, err)
ddbMock.AssertNumberOfCalls(t, "Batch", 1)
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch)
ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch)
}

func Test_CAS_Update_Delete(t *testing.T) {
Expand All @@ -156,17 +156,17 @@ func Test_CAS_Update_Delete(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedUpdateBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedUpdateBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}
expectedToDelete := []string{"test", "test2"}
expectedDeleteBatch := []dynamodbKey{
{primaryKey: key, sortKey: expectedToDelete[0]},
{primaryKey: key, sortKey: expectedToDelete[1]},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once()
Expand All @@ -189,7 +189,7 @@ func Test_WatchKey(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff)
timesCalled := 0

ddbMock.On("Query").Return(map[string][]byte{}, nil)
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil)
codecMock.On("DecodeMultiKey").Return(descMock, nil)

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -207,7 +207,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
staleData := &DescMock{}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(staleData, nil)

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -217,7 +217,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
return false
})

ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("failed"))
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("failed"))
staleData.On("Clone").Return(staleData).Once()

c.WatchKey(context.TODO(), key, func(i interface{}) bool {
Expand All @@ -241,12 +241,12 @@ func Test_CAS_UpdateStale(t *testing.T) {
expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]),
expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]),
}
expectedBatch := map[dynamodbKey][]byte{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]),
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]),
expectedBatch := map[dynamodbKey]dynamodbItem{
{primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])},
{primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])},
}

ddbMock.On("Query").Return(map[string][]byte{}, nil).Once()
ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once()
codecMock.On("DecodeMultiKey").Return(descMock, nil).Once()
descMock.On("Clone").Return(descMock).Once()
descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once()
Expand All @@ -266,17 +266,17 @@ func Test_WatchPrefix(t *testing.T) {
ddbMock := NewDynamodbClientMock()
codecMock := &CodecMock{}
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff)
data := map[string][]byte{}
data := map[string]dynamodbItem{}
dataKey := []string{"t1", "t2"}
data[dataKey[0]] = []byte(dataKey[0])
data[dataKey[1]] = []byte(dataKey[1])
data[dataKey[0]] = dynamodbItem{data: []byte(dataKey[0])}
data[dataKey[1]] = dynamodbItem{data: []byte(dataKey[1])}
calls := 0

ddbMock.On("Query").Return(data, nil)
codecMock.On("Decode").Twice()

c.WatchPrefix(context.TODO(), key, func(key string, i interface{}) bool {
require.EqualValues(t, string(data[key]), i)
require.EqualValues(t, string(data[key].data), i)
delete(data, key)
calls++
return calls < 2
Expand Down Expand Up @@ -358,13 +358,13 @@ func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float
}
return args.Get(0).([]string), 0, err
}
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) {
func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string]dynamodbItem, float64, error) {
args := m.Called()
var err error
if args.Get(1) != nil {
err = args.Get(1).(error)
}
return args.Get(0).(map[string][]byte), 0, err
return args.Get(0).(map[string]dynamodbItem), 0, err
}
func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error {
m.Called(ctx, key)
Expand All @@ -374,7 +374,7 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by
m.Called(ctx, key, data)
return nil
}
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error {
m.Called(ctx, put, delete)
return nil
}
Expand Down Expand Up @@ -471,7 +471,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynam
}
}

func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
Expand All @@ -498,7 +498,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo
}
}

func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand Down
Loading
Loading