diff --git a/CHANGELOG.md b/CHANGELOG.md index a02f8c2955c..6fc6b0c741d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ * [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895 * [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950 * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976 +* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986 * [ENHANCEMENT] Add source metadata to requests(api vs ruler) #6947 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 0fb53294d1d..ba0d0387693 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -185,9 +185,16 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou continue } - putRequests := map[dynamodbKey][]byte{} + putRequests := map[dynamodbKey]dynamodbItem{} for childKey, bytes := range buf { - putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes + version := int64(0) + if ddbItem, ok := resp[childKey]; ok { + version = ddbItem.version + } + putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{ + data: bytes, + version: version, + } } deleteRequests := make([]dynamodbKey, 0, len(toDelete)) @@ -196,9 +203,13 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou } if len(putRequests) > 0 || len(deleteRequests) > 0 { - err = c.kv.Batch(ctx, putRequests, deleteRequests) + retry, err := c.kv.Batch(ctx, putRequests, deleteRequests) if err != nil { - return err + if !retry { + return err + } + bo.Wait() + continue } c.updateStaleData(key, r, time.Now().UTC()) return nil @@ -273,8 +284,8 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, continue } - for key, bytes := range out { - decoded, err := c.codec.Decode(bytes) + for key, ddbItem := range out { + decoded, err := c.codec.Decode(ddbItem.data) if err != nil { level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) continue @@ -293,8 +304,12 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, } } -func (c *Client) decodeMultikey(data map[string][]byte) (codec.MultiKey, error) { - res, err := c.codec.DecodeMultiKey(data) +func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) { + multiKeyData := make(map[string][]byte, len(data)) + for key, ddbItem := range data { + multiKeyData[key] = ddbItem.data + } + res, err := c.codec.DecodeMultiKey(multiKeyData) if err != nil { return nil, err } diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go index 7cefe64f752..f82a72de439 100644 --- a/pkg/ring/kv/dynamodb/client_test.go +++ b/pkg/ring/kv/dynamodb/client_test.go @@ -34,7 +34,7 @@ func Test_CAS_ErrorNoRetry(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedErr := errors.Errorf("test") - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice() descMock.On("Clone").Return(descMock).Once() @@ -46,25 +46,60 @@ func Test_CAS_ErrorNoRetry(t *testing.T) { } func Test_CAS_Backoff(t *testing.T) { - ddbMock := NewDynamodbClientMock() - codecMock := &CodecMock{} - descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) - expectedErr := errors.Errorf("test") + testCases := []struct { + name string + setupMocks func(*MockDynamodbClient, *CodecMock, *DescMock, map[dynamodbKey]dynamodbItem, []dynamodbKey) + expectedQueryCalls int + expectedBatchCalls int + }{ + { + name: "query_fails_and_backs_off", + setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) { + ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed")).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once() + }, + expectedQueryCalls: 2, + expectedBatchCalls: 1, + }, + { + name: "batch_fails_and_backs_off", + setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) { + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Twice() + ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(true, errors.Errorf("batch failed")).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once() + }, + expectedQueryCalls: 2, + expectedBatchCalls: 2, + }, + } - ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once() - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() - ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once() - codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice() - descMock.On("Clone").Return(descMock).Once() - descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once() - codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Twice() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ddbMock := NewDynamodbClientMock() + codecMock := &CodecMock{} + descMock := &DescMock{} + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) - err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { - return descMock, true, nil - }) + expectedBatch := map[dynamodbKey]dynamodbItem{} + expectedDelete := []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}} - require.NoError(t, err) + tc.setupMocks(ddbMock, codecMock, descMock, expectedBatch, expectedDelete) + + codecMock.On("DecodeMultiKey").Return(descMock, nil).Times(tc.expectedQueryCalls) + descMock.On("Clone").Return(descMock).Times(tc.expectedQueryCalls) + descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Times(tc.expectedBatchCalls) + codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Times(tc.expectedBatchCalls) + + err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { + return descMock, true, nil + }) + + require.NoError(t, err) + ddbMock.AssertNumberOfCalls(t, "Query", tc.expectedQueryCalls) + ddbMock.AssertNumberOfCalls(t, "Batch", tc.expectedBatchCalls) + }) + } } func Test_CAS_Failed(t *testing.T) { @@ -78,7 +113,7 @@ func Test_CAS_Failed(t *testing.T) { descMock := &DescMock{} c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config) - ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test")) + ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("test")) err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -98,17 +133,17 @@ func Test_CAS_Update(t *testing.T) { expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), } - expectedBatch := map[dynamodbKey][]byte{ - {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), - {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + expectedBatch := map[dynamodbKey]dynamodbItem{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])}, + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once() codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() - ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -130,12 +165,12 @@ func Test_CAS_Delete(t *testing.T) { {primaryKey: key, sortKey: expectedToDelete[1]}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once() codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once() - ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch) + ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -143,7 +178,7 @@ func Test_CAS_Delete(t *testing.T) { require.NoError(t, err) ddbMock.AssertNumberOfCalls(t, "Batch", 1) - ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch) + ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch) } func Test_CAS_Update_Delete(t *testing.T) { @@ -156,9 +191,9 @@ func Test_CAS_Update_Delete(t *testing.T) { expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), } - expectedUpdateBatch := map[dynamodbKey][]byte{ - {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), - {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + expectedUpdateBatch := map[dynamodbKey]dynamodbItem{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])}, + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])}, } expectedToDelete := []string{"test", "test2"} expectedDeleteBatch := []dynamodbKey{ @@ -166,12 +201,12 @@ func Test_CAS_Update_Delete(t *testing.T) { {primaryKey: key, sortKey: expectedToDelete[1]}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once() codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() - ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch) + ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -189,7 +224,7 @@ func Test_WatchKey(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff) timesCalled := 0 - ddbMock.On("Query").Return(map[string][]byte{}, nil) + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil) codecMock.On("DecodeMultiKey").Return(descMock, nil) c.WatchKey(context.TODO(), key, func(i interface{}) bool { @@ -207,7 +242,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) staleData := &DescMock{} - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(staleData, nil) c.WatchKey(context.TODO(), key, func(i interface{}) bool { @@ -217,7 +252,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) { return false }) - ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("failed")) + ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("failed")) staleData.On("Clone").Return(staleData).Once() c.WatchKey(context.TODO(), key, func(i interface{}) bool { @@ -241,17 +276,17 @@ func Test_CAS_UpdateStale(t *testing.T) { expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), } - expectedBatch := map[dynamodbKey][]byte{ - {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), - {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + expectedBatch := map[dynamodbKey]dynamodbItem{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])}, + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once() codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() - ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMockResult, true, nil @@ -266,17 +301,17 @@ func Test_WatchPrefix(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) - data := map[string][]byte{} + data := map[string]dynamodbItem{} dataKey := []string{"t1", "t2"} - data[dataKey[0]] = []byte(dataKey[0]) - data[dataKey[1]] = []byte(dataKey[1]) + data[dataKey[0]] = dynamodbItem{data: []byte(dataKey[0])} + data[dataKey[1]] = dynamodbItem{data: []byte(dataKey[1])} calls := 0 ddbMock.On("Query").Return(data, nil) codecMock.On("Decode").Twice() c.WatchPrefix(context.TODO(), key, func(key string, i interface{}) bool { - require.EqualValues(t, string(data[key]), i) + require.EqualValues(t, string(data[key].data), i) delete(data, key) calls++ return calls < 2 @@ -321,7 +356,7 @@ func Test_DynamodbKVWithTimeout(t *testing.T) { err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{}) require.True(t, errors.Is(err, context.DeadlineExceeded)) - err = dbWithTimeout.Batch(ctx, nil, nil) + _, err = dbWithTimeout.Batch(ctx, nil, nil) require.True(t, errors.Is(err, context.DeadlineExceeded)) } @@ -358,13 +393,13 @@ func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float } return args.Get(0).([]string), 0, err } -func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) { +func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string]dynamodbItem, float64, error) { args := m.Called() var err error if args.Get(1) != nil { err = args.Get(1).(error) } - return args.Get(0).(map[string][]byte), 0, err + return args.Get(0).(map[string]dynamodbItem), 0, err } func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error { m.Called(ctx, key) @@ -374,9 +409,13 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by m.Called(ctx, key, data) return nil } -func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { - m.Called(ctx, put, delete) - return nil +func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { + args := m.Called(ctx, put, delete) + var err error + if args.Get(1) != nil { + err = args.Get(1).(error) + } + return args.Get(0).(bool), err } type TestLogger struct { @@ -471,7 +510,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynam } } -func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { +func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { select { case <-ctx.Done(): return nil, 0, ctx.Err() @@ -498,10 +537,10 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo } } -func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { select { case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() case <-time.After(d.delay): return d.ddbClient.Batch(ctx, put, delete) } diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index 2dc4769d6e2..57246497016 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -2,6 +2,7 @@ package dynamodb import ( "context" + "errors" "fmt" "math" "strconv" @@ -27,10 +28,10 @@ type dynamodbKey struct { type dynamoDbClient interface { List(ctx context.Context, key dynamodbKey) ([]string, float64, error) - Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) + Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) Delete(ctx context.Context, key dynamodbKey) error Put(ctx context.Context, key dynamodbKey, data []byte) error - Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error + Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) } type dynamodbKV struct { @@ -40,11 +41,17 @@ type dynamodbKV struct { ttlValue time.Duration } +type dynamodbItem struct { + data []byte + version int64 +} + var ( primaryKey = "RingKey" sortKey = "InstanceKey" contentData = "Data" timeToLive = "ttl" + version = "version" ) func newDynamodbKV(cfg Config, logger log.Logger) (dynamodbKV, error) { @@ -120,8 +127,8 @@ func (kv dynamodbKV) List(ctx context.Context, key dynamodbKey) ([]string, float return keys, totalCapacity, nil } -func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { - keys := make(map[string][]byte) +func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { + keys := make(map[string]dynamodbItem) var totalCapacity float64 co := dynamodb.ComparisonOperatorEq if isPrefix { @@ -145,7 +152,21 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) err := kv.ddbClient.QueryPagesWithContext(ctx, input, func(output *dynamodb.QueryOutput, _ bool) bool { totalCapacity += getCapacityUnits(output.ConsumedCapacity) for _, item := range output.Items { - keys[*item[sortKey].S] = item[contentData].B + itemVersion := int64(0) + if item[version] != nil { + parsedVersion, err := strconv.ParseInt(*item[version].N, 10, 0) + if err != nil { + kv.logger.Log("msg", "failed to parse item version", "version", *item[version].N, "err", err) + } else { + itemVersion = parsedVersion + } + } + + keys[*item[sortKey].S] = dynamodbItem{ + data: item[contentData].B, + version: itemVersion, + } + } return true }) @@ -174,7 +195,7 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (flo input := &dynamodb.PutItemInput{ TableName: kv.tableName, ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - Item: kv.generatePutItemRequest(key, data), + Item: kv.generatePutItemRequest(key, dynamodbItem{data: data}), } totalCapacity := float64(0) output, err := kv.ddbClient.PutItemWithContext(ctx, input) @@ -184,26 +205,32 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (flo return totalCapacity, err } -func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) { +func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (float64, bool, error) { totalCapacity := float64(0) writeRequestSize := len(put) + len(delete) if writeRequestSize == 0 { - return totalCapacity, nil + return totalCapacity, false, nil } - writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) + writeRequestsSlices := make([][]*dynamodb.TransactWriteItem, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) for i := 0; i < len(writeRequestsSlices); i++ { - writeRequestsSlices[i] = make([]*dynamodb.WriteRequest, 0, DdbBatchSizeLimit) + writeRequestsSlices[i] = make([]*dynamodb.TransactWriteItem, 0, DdbBatchSizeLimit) } - currIdx := 0 - for key, data := range put { - item := kv.generatePutItemRequest(key, data) - writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{ - Item: item, - }, - }) + for key, ddbItem := range put { + item := kv.generatePutItemRequest(key, ddbItem) + ddbPut := &dynamodb.Put{ + TableName: kv.tableName, + Item: item, + } + // condition for optimistic locking; DynamoDB will only succeed the request if either the version attribute does not exist + // (for backwards compatibility) or the object version has not changed since it was last read + ddbPut.ConditionExpression = aws.String("attribute_not_exists(version) OR version = :v") + ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ + ":v": {N: aws.String(strconv.FormatInt(ddbItem.version, 10))}, + } + + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{Put: ddbPut}) if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { currIdx++ } @@ -211,9 +238,10 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele for _, key := range delete { item := generateItemKey(key) - writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.WriteRequest{ - DeleteRequest: &dynamodb.DeleteRequest{ - Key: item, + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{ + Delete: &dynamodb.Delete{ + TableName: kv.tableName, + Key: item, }, }) if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { @@ -222,33 +250,33 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele } for _, slice := range writeRequestsSlices { - input := &dynamodb.BatchWriteItemInput{ - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - RequestItems: map[string][]*dynamodb.WriteRequest{ - *kv.tableName: slice, - }, + transactItems := &dynamodb.TransactWriteItemsInput{ + TransactItems: slice, } - - resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input) + resp, err := kv.ddbClient.TransactWriteItemsWithContext(ctx, transactItems) if err != nil { - return totalCapacity, err + var checkFailed *dynamodb.ConditionalCheckFailedException + isCheckFailedException := errors.As(err, &checkFailed) + if isCheckFailedException { + kv.logger.Log("msg", "conditional check failed on DynamoDB Batch", "item", fmt.Sprintf("%v", checkFailed.Item), "err", err) + } + return totalCapacity, isCheckFailedException, err } for _, consumedCapacity := range resp.ConsumedCapacity { totalCapacity += getCapacityUnits(consumedCapacity) } - - if len(resp.UnprocessedItems) > 0 { - return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems) - } } - return totalCapacity, nil + return totalCapacity, false, nil } -func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue { +func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbItem) map[string]*dynamodb.AttributeValue { item := generateItemKey(key) item[contentData] = &dynamodb.AttributeValue{ - B: data, + B: ddbItem.data, + } + item[version] = &dynamodb.AttributeValue{ + N: aws.String(strconv.FormatInt(ddbItem.version+1, 10)), } if kv.getTTL() > 0 { item[timeToLive] = &dynamodb.AttributeValue{ @@ -274,7 +302,7 @@ func (d *dynamodbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]st return d.ddbClient.List(ctx, key) } -func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { +func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { ctx, cancel := context.WithTimeout(ctx, d.timeout) defer cancel() return d.ddbClient.Query(ctx, key, isPrefix) @@ -292,7 +320,7 @@ func (d *dynamodbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data [ return d.ddbClient.Put(ctx, key, data) } -func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { ctx, cancel := context.WithTimeout(ctx, d.timeout) defer cancel() return d.ddbClient.Batch(ctx, put, delete) diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index 7e253716d12..c9dcd85ef12 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "testing" "time" @@ -61,24 +62,28 @@ func Test_Batch(t *testing.T) { primaryKey: "PKDelete", sortKey: "SKDelete", } - update := map[dynamodbKey][]byte{ - ddbKeyUpdate: {}, + update := map[dynamodbKey]dynamodbItem{ + ddbKeyUpdate: { + data: []byte{}, + version: 0, + }, } delete := []dynamodbKey{ddbKeyDelete} ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - require.NotNil(t, input.RequestItems[tableName]) - require.EqualValues(t, 2, len(input.RequestItems[tableName])) + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { + require.NotNil(t, input.TransactItems) + require.EqualValues(t, 2, len(input.TransactItems)) require.True(t, - (checkPutRequestForItem(input.RequestItems[tableName][0], ddbKeyUpdate) || checkPutRequestForItem(input.RequestItems[tableName][1], ddbKeyUpdate)) && - (checkDeleteRequestForItem(input.RequestItems[tableName][0], ddbKeyDelete) || checkDeleteRequestForItem(input.RequestItems[tableName][1], ddbKeyDelete))) - return &dynamodb.BatchWriteItemOutput{}, nil + (checkPutForItem(input.TransactItems[0].Put, ddbKeyUpdate)) && + (checkPutForConditionalExpression(input.TransactItems[0].Put, ddbKeyUpdate)) && + (checkDeleteForItem(input.TransactItems[1].Delete, ddbKeyDelete))) + return &dynamodb.TransactWriteItemsOutput{}, nil }, } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), update, delete) + _, _, err := ddb.Batch(context.TODO(), update, delete) require.NoError(t, err) } @@ -90,9 +95,9 @@ func Test_BatchSlices(t *testing.T) { } numOfCalls := 0 ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { numOfCalls++ - return &dynamodb.BatchWriteItemOutput{}, nil + return &dynamodb.TransactWriteItemsOutput{}, nil }, } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) @@ -126,7 +131,7 @@ func Test_BatchSlices(t *testing.T) { delete = append(delete, ddbKeyDelete) } - _, err := ddb.Batch(context.TODO(), nil, delete) + _, _, err := ddb.Batch(context.TODO(), nil, delete) require.NoError(t, err) require.EqualValues(t, tc.expectedCalls, numOfCalls) @@ -140,83 +145,85 @@ func Test_EmptyBatch(t *testing.T) { ddbClientMock := &mockDynamodb{} ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), nil, nil) + _, _, err := ddb.Batch(context.TODO(), nil, nil) require.NoError(t, err) } -func Test_Batch_UnprocessedItems(t *testing.T) { +func Test_Batch_Error(t *testing.T) { tableName := "TEST" - ddbKeyDelete := dynamodbKey{ - primaryKey: "PKDelete", - sortKey: "SKDelete", - } - delete := []dynamodbKey{ddbKeyDelete} - ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{ - tableName: {&dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{Item: generateItemKey(ddbKeyDelete)}}, - }, - }, - }, nil + testCases := []struct { + name string + mockError error + expectedRetry bool + }{ + { + name: "generic_error_no_retry", + mockError: fmt.Errorf("mocked error"), + expectedRetry: false, + }, + { + name: "conditional_check_failed_should_retry", + mockError: &dynamodb.ConditionalCheckFailedException{}, + expectedRetry: true, }, } - ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), nil, delete) - require.Errorf(t, err, "error processing batch dynamodb") -} + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ddbClientMock := &mockDynamodb{ + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { + return nil, tc.mockError + }, + } -func Test_Batch_Error(t *testing.T) { - tableName := "TEST" - ddbKeyDelete := dynamodbKey{ - primaryKey: "PKDelete", - sortKey: "SKDelete", - } - delete := []dynamodbKey{ddbKeyDelete} + ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - return &dynamodb.BatchWriteItemOutput{}, fmt.Errorf("mocked error") - }, + delete := []dynamodbKey{{primaryKey: "PKDelete", sortKey: "SKDelete"}} + _, retry, err := ddb.Batch(context.TODO(), nil, delete) + + require.Error(t, err) + require.Equal(t, tc.expectedRetry, retry) + }) } +} - ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), nil, delete) - require.Errorf(t, err, "mocked error") +func checkPutForItem(request *dynamodb.Put, key dynamodbKey) bool { + return request != nil && + request.Item != nil && + request.Item[primaryKey] != nil && + request.Item[sortKey] != nil && + *request.Item[primaryKey].S == key.primaryKey && + *request.Item[sortKey].S == key.sortKey } -func checkPutRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool { - return request.PutRequest != nil && - request.PutRequest.Item[primaryKey] != nil && - request.PutRequest.Item[sortKey] != nil && - *request.PutRequest.Item[primaryKey].S == key.primaryKey && - *request.PutRequest.Item[sortKey].S == key.sortKey +func checkDeleteForItem(request *dynamodb.Delete, key dynamodbKey) bool { + return request != nil && + request.Key[primaryKey] != nil && + request.Key[sortKey] != nil && + *request.Key[primaryKey].S == key.primaryKey && + *request.Key[sortKey].S == key.sortKey } -func checkDeleteRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool { - return request.DeleteRequest != nil && - request.DeleteRequest.Key[primaryKey] != nil && - request.DeleteRequest.Key[sortKey] != nil && - *request.DeleteRequest.Key[primaryKey].S == key.primaryKey && - *request.DeleteRequest.Key[sortKey].S == key.sortKey +func checkPutForConditionalExpression(request *dynamodb.Put, key dynamodbKey) bool { + return request != nil && + request.ConditionExpression != nil && + strings.Contains(*request.ConditionExpression, "version = :v") } type mockDynamodb struct { - putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput - batchWriteItem func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) + putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput + transactWriteItem func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) dynamodbiface.DynamoDBAPI } -func (m *mockDynamodb) PutItemWithContext(_ context.Context, input *dynamodb.PutItemInput, _ ...request.Option) (*dynamodb.PutItemOutput, error) { +func (m *mockDynamodb) PutItemWithContext(_ aws.Context, input *dynamodb.PutItemInput, _ ...request.Option) (*dynamodb.PutItemOutput, error) { return m.putItem(input), nil } -func (m *mockDynamodb) BatchWriteItemWithContext(ctx context.Context, input *dynamodb.BatchWriteItemInput, opts ...request.Option) (*dynamodb.BatchWriteItemOutput, error) { - return m.batchWriteItem(input) +func (m *mockDynamodb) TransactWriteItemsWithContext(_ aws.Context, input *dynamodb.TransactWriteItemsInput, _ ...request.Option) (*dynamodb.TransactWriteItemsOutput, error) { + return m.transactWriteItem(input) } func newDynamodbClientMock(tableName string, mock *mockDynamodb, ttl time.Duration) *dynamodbKV { diff --git a/pkg/ring/kv/dynamodb/metrics.go b/pkg/ring/kv/dynamodb/metrics.go index fc5e35a9e94..1d0f051da0e 100644 --- a/pkg/ring/kv/dynamodb/metrics.go +++ b/pkg/ring/kv/dynamodb/metrics.go @@ -17,9 +17,10 @@ type dynamodbInstrumentation struct { } type dynamodbMetrics struct { - dynamodbRequestDuration *instrument.HistogramCollector - dynamodbUsageMetrics *prometheus.CounterVec - dynamodbCasAttempts prometheus.Counter + dynamodbRequestDuration *instrument.HistogramCollector + dynamodbUsageMetrics *prometheus.CounterVec + dynamodbCasAttempts prometheus.Counter + dynamodbConditionalCheckFailures prometheus.Counter } func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics { @@ -39,10 +40,16 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics { Help: "DynamoDB KV Store Attempted CAS operations", }) + dynamodbConditionalCheckFailures := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "dynamodb_kv_conditional_check_failed_total", + Help: "Total number of DynamoDB conditional check failures", + }) + dynamodbMetrics := dynamodbMetrics{ - dynamodbRequestDuration: dynamodbRequestDurationCollector, - dynamodbUsageMetrics: dynamodbUsageMetrics, - dynamodbCasAttempts: dynamodbCasAttempts, + dynamodbRequestDuration: dynamodbRequestDurationCollector, + dynamodbUsageMetrics: dynamodbUsageMetrics, + dynamodbCasAttempts: dynamodbCasAttempts, + dynamodbConditionalCheckFailures: dynamodbConditionalCheckFailures, } return &dynamodbMetrics } @@ -59,8 +66,8 @@ func (d dynamodbInstrumentation) List(ctx context.Context, key dynamodbKey) ([]s return resp, totalCapacity, err } -func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { - var resp map[string][]byte +func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { + var resp map[string]dynamodbItem var totalCapacity float64 err := instrument.CollectedRequest(ctx, "Query", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { var err error @@ -87,12 +94,19 @@ func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data }) } -func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { - return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { - totalCapacity, err := d.kv.Batch(ctx, put, delete) +func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { + retry := false + err := instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { + var err error + totalCapacity, shouldRetry, err := d.kv.Batch(ctx, put, delete) + retry = shouldRetry + if retry { + d.ddbMetrics.dynamodbConditionalCheckFailures.Inc() + } d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity) return err }) + return retry, err } // errorCode converts an error into an error code string.