From 625b689deae70184ecc39e4fcda96e56ccf18027 Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Fri, 22 Aug 2025 16:28:58 -0700 Subject: [PATCH 1/6] Update ring DynamoDB Client to write in a transaction when batching Signed-off-by: Anna Tran --- CHANGELOG.md | 1 + pkg/ring/kv/dynamodb/client.go | 19 +++-- pkg/ring/kv/dynamodb/client_test.go | 64 +++++++-------- pkg/ring/kv/dynamodb/dynamodb.go | 98 ++++++++++++++-------- pkg/ring/kv/dynamodb/dynamodb_test.go | 112 ++++++++++++++------------ pkg/ring/kv/dynamodb/metrics.go | 6 +- 6 files changed, 171 insertions(+), 129 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6987b41ecdc..78568102d54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ * [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895 * [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950 * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976 +* [ENHANCEMENT] Implement versioned transactions for batch writes to Ring DynamoDB. #6986 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 0fb53294d1d..e527b4e5afd 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -185,9 +185,12 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou continue } - putRequests := map[dynamodbKey][]byte{} + putRequests := map[dynamodbKey]dynamodbItem{} for childKey, bytes := range buf { - putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = bytes + putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{ + data: bytes, + version: resp[childKey].version, + } } deleteRequests := make([]dynamodbKey, 0, len(toDelete)) @@ -273,8 +276,8 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, continue } - for key, bytes := range out { - decoded, err := c.codec.Decode(bytes) + for key, ddbItem := range out { + decoded, err := c.codec.Decode(ddbItem.data) if err != nil { level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) continue @@ -293,8 +296,12 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, } } -func (c *Client) decodeMultikey(data map[string][]byte) (codec.MultiKey, error) { - res, err := c.codec.DecodeMultiKey(data) +func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) { + multiKeyData := make(map[string][]byte, len(data)) + for key, ddbItem := range data { + multiKeyData[key] = ddbItem.data + } + res, err := c.codec.DecodeMultiKey(multiKeyData) if err != nil { return nil, err } diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go index 7cefe64f752..b70e7f5cc5e 100644 --- a/pkg/ring/kv/dynamodb/client_test.go +++ b/pkg/ring/kv/dynamodb/client_test.go @@ -34,7 +34,7 @@ func Test_CAS_ErrorNoRetry(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedErr := errors.Errorf("test") - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice() descMock.On("Clone").Return(descMock).Once() @@ -52,9 +52,9 @@ func Test_CAS_Backoff(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedErr := errors.Errorf("test") - ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once() - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() - ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, expectedErr).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() + ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once() @@ -78,7 +78,7 @@ func Test_CAS_Failed(t *testing.T) { descMock := &DescMock{} c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config) - ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test")) + ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("test")) err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -98,12 +98,12 @@ func Test_CAS_Update(t *testing.T) { expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), } - expectedBatch := map[dynamodbKey][]byte{ - {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), - {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + expectedBatch := map[dynamodbKey]dynamodbItem{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])}, + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once() @@ -130,12 +130,12 @@ func Test_CAS_Delete(t *testing.T) { {primaryKey: key, sortKey: expectedToDelete[1]}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once() codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once() - ddbMock.On("Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch) + ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch) err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -143,7 +143,7 @@ func Test_CAS_Delete(t *testing.T) { require.NoError(t, err) ddbMock.AssertNumberOfCalls(t, "Batch", 1) - ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey][]byte{}, expectedBatch) + ddbMock.AssertCalled(t, "Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch) } func Test_CAS_Update_Delete(t *testing.T) { @@ -156,9 +156,9 @@ func Test_CAS_Update_Delete(t *testing.T) { expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), } - expectedUpdateBatch := map[dynamodbKey][]byte{ - {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), - {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + expectedUpdateBatch := map[dynamodbKey]dynamodbItem{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])}, + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])}, } expectedToDelete := []string{"test", "test2"} expectedDeleteBatch := []dynamodbKey{ @@ -166,7 +166,7 @@ func Test_CAS_Update_Delete(t *testing.T) { {primaryKey: key, sortKey: expectedToDelete[1]}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once() @@ -189,7 +189,7 @@ func Test_WatchKey(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff) timesCalled := 0 - ddbMock.On("Query").Return(map[string][]byte{}, nil) + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil) codecMock.On("DecodeMultiKey").Return(descMock, nil) c.WatchKey(context.TODO(), key, func(i interface{}) bool { @@ -207,7 +207,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) { c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) staleData := &DescMock{} - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(staleData, nil) c.WatchKey(context.TODO(), key, func(i interface{}) bool { @@ -217,7 +217,7 @@ func Test_WatchKey_UpdateStale(t *testing.T) { return false }) - ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("failed")) + ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("failed")) staleData.On("Clone").Return(staleData).Once() c.WatchKey(context.TODO(), key, func(i interface{}) bool { @@ -241,12 +241,12 @@ func Test_CAS_UpdateStale(t *testing.T) { expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), expectedUpdatedKeys[1]: []byte(expectedUpdatedKeys[1]), } - expectedBatch := map[dynamodbKey][]byte{ - {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: []byte(expectedUpdatedKeys[0]), - {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: []byte(expectedUpdatedKeys[1]), + expectedBatch := map[dynamodbKey]dynamodbItem{ + {primaryKey: key, sortKey: expectedUpdatedKeys[0]}: {data: []byte(expectedUpdatedKeys[0])}, + {primaryKey: key, sortKey: expectedUpdatedKeys[1]}: {data: []byte(expectedUpdatedKeys[1])}, } - ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() codecMock.On("DecodeMultiKey").Return(descMock, nil).Once() descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once() @@ -266,17 +266,17 @@ func Test_WatchPrefix(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) - data := map[string][]byte{} + data := map[string]dynamodbItem{} dataKey := []string{"t1", "t2"} - data[dataKey[0]] = []byte(dataKey[0]) - data[dataKey[1]] = []byte(dataKey[1]) + data[dataKey[0]] = dynamodbItem{data: []byte(dataKey[0])} + data[dataKey[1]] = dynamodbItem{data: []byte(dataKey[1])} calls := 0 ddbMock.On("Query").Return(data, nil) codecMock.On("Decode").Twice() c.WatchPrefix(context.TODO(), key, func(key string, i interface{}) bool { - require.EqualValues(t, string(data[key]), i) + require.EqualValues(t, string(data[key].data), i) delete(data, key) calls++ return calls < 2 @@ -358,13 +358,13 @@ func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float } return args.Get(0).([]string), 0, err } -func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) { +func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string]dynamodbItem, float64, error) { args := m.Called() var err error if args.Get(1) != nil { err = args.Get(1).(error) } - return args.Get(0).(map[string][]byte), 0, err + return args.Get(0).(map[string]dynamodbItem), 0, err } func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error { m.Called(ctx, key) @@ -374,7 +374,7 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by m.Called(ctx, key, data) return nil } -func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { m.Called(ctx, put, delete) return nil } @@ -471,7 +471,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynam } } -func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { +func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { select { case <-ctx.Done(): return nil, 0, ctx.Err() @@ -498,7 +498,7 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo } } -func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { select { case <-ctx.Done(): return ctx.Err() diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index 2dc4769d6e2..5e2f7e1a65d 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -27,10 +27,10 @@ type dynamodbKey struct { type dynamoDbClient interface { List(ctx context.Context, key dynamodbKey) ([]string, float64, error) - Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) + Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) Delete(ctx context.Context, key dynamodbKey) error Put(ctx context.Context, key dynamodbKey, data []byte) error - Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error + Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error } type dynamodbKV struct { @@ -40,11 +40,17 @@ type dynamodbKV struct { ttlValue time.Duration } +type dynamodbItem struct { + data []byte + version string +} + var ( primaryKey = "RingKey" sortKey = "InstanceKey" contentData = "Data" timeToLive = "ttl" + version = "version" ) func newDynamodbKV(cfg Config, logger log.Logger) (dynamodbKV, error) { @@ -120,8 +126,8 @@ func (kv dynamodbKV) List(ctx context.Context, key dynamodbKey) ([]string, float return keys, totalCapacity, nil } -func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { - keys := make(map[string][]byte) +func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { + keys := make(map[string]dynamodbItem) var totalCapacity float64 co := dynamodb.ComparisonOperatorEq if isPrefix { @@ -145,7 +151,15 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) err := kv.ddbClient.QueryPagesWithContext(ctx, input, func(output *dynamodb.QueryOutput, _ bool) bool { totalCapacity += getCapacityUnits(output.ConsumedCapacity) for _, item := range output.Items { - keys[*item[sortKey].S] = item[contentData].B + var itemVersion string + if item[version] != nil { + itemVersion = *item[version].N + } + keys[*item[sortKey].S] = dynamodbItem{ + data: item[contentData].B, + version: itemVersion, + } + } return true }) @@ -174,7 +188,7 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (flo input := &dynamodb.PutItemInput{ TableName: kv.tableName, ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - Item: kv.generatePutItemRequest(key, data), + Item: kv.generatePutItemRequest(key, dynamodbItem{data: data}), } totalCapacity := float64(0) output, err := kv.ddbClient.PutItemWithContext(ctx, input) @@ -184,26 +198,31 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (flo return totalCapacity, err } -func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) (float64, error) { +func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (float64, error) { totalCapacity := float64(0) writeRequestSize := len(put) + len(delete) if writeRequestSize == 0 { return totalCapacity, nil } - writeRequestsSlices := make([][]*dynamodb.WriteRequest, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) + writeRequestsSlices := make([][]*dynamodb.TransactWriteItem, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) for i := 0; i < len(writeRequestsSlices); i++ { - writeRequestsSlices[i] = make([]*dynamodb.WriteRequest, 0, DdbBatchSizeLimit) + writeRequestsSlices[i] = make([]*dynamodb.TransactWriteItem, 0, DdbBatchSizeLimit) } - currIdx := 0 - for key, data := range put { - item := kv.generatePutItemRequest(key, data) - writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{ - Item: item, - }, - }) + for key, ddbItem := range put { + item := kv.generatePutItemRequest(key, ddbItem) + ddbPut := &dynamodb.Put{ + TableName: kv.tableName, + Item: item, + } + if ddbItem.version != "" { + ddbPut.ConditionExpression = aws.String("version = :v") + ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ + ":v": {N: aws.String(ddbItem.version)}, + } + } + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{Put: ddbPut}) if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { currIdx++ } @@ -211,9 +230,10 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele for _, key := range delete { item := generateItemKey(key) - writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.WriteRequest{ - DeleteRequest: &dynamodb.DeleteRequest{ - Key: item, + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{ + Delete: &dynamodb.Delete{ + TableName: kv.tableName, + Key: item, }, }) if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { @@ -222,33 +242,29 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey][]byte, dele } for _, slice := range writeRequestsSlices { - input := &dynamodb.BatchWriteItemInput{ - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), - RequestItems: map[string][]*dynamodb.WriteRequest{ - *kv.tableName: slice, - }, + transactItems := &dynamodb.TransactWriteItemsInput{ + TransactItems: slice, } - - resp, err := kv.ddbClient.BatchWriteItemWithContext(ctx, input) + resp, err := kv.ddbClient.TransactWriteItems(transactItems) if err != nil { return totalCapacity, err } for _, consumedCapacity := range resp.ConsumedCapacity { totalCapacity += getCapacityUnits(consumedCapacity) } - - if len(resp.UnprocessedItems) > 0 { - return totalCapacity, fmt.Errorf("error processing batch request for %s requests", resp.UnprocessedItems) - } } return totalCapacity, nil } -func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[string]*dynamodb.AttributeValue { +func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbItem) map[string]*dynamodb.AttributeValue { item := generateItemKey(key) item[contentData] = &dynamodb.AttributeValue{ - B: data, + B: ddbItem.data, + } + itemVersion := getItemVersion(ddbItem, kv) + item[version] = &dynamodb.AttributeValue{ + N: aws.String(itemVersion), } if kv.getTTL() > 0 { item[timeToLive] = &dynamodb.AttributeValue{ @@ -259,6 +275,18 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st return item } +func getItemVersion(ddbItem dynamodbItem, kv dynamodbKV) string { + if ddbItem.version != "" { + itemVersion, err := strconv.ParseInt(ddbItem.version, 10, 64) + if err != nil { + kv.logger.Log("msg", "error converting version to int", "version", ddbItem.version, "err", err) + return "" + } + return strconv.FormatInt(itemVersion+1, 10) + } + return "0" +} + type dynamodbKVWithTimeout struct { ddbClient dynamoDbClient timeout time.Duration @@ -274,7 +302,7 @@ func (d *dynamodbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]st return d.ddbClient.List(ctx, key) } -func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { +func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { ctx, cancel := context.WithTimeout(ctx, d.timeout) defer cancel() return d.ddbClient.Query(ctx, key, isPrefix) @@ -292,7 +320,7 @@ func (d *dynamodbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data [ return d.ddbClient.Put(ctx, key, data) } -func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { ctx, cancel := context.WithTimeout(ctx, d.timeout) defer cancel() return d.ddbClient.Batch(ctx, put, delete) diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index 7e253716d12..37499241e92 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "testing" "time" @@ -53,27 +54,41 @@ func Test_TTL(t *testing.T) { func Test_Batch(t *testing.T) { tableName := "TEST" - ddbKeyUpdate := dynamodbKey{ - primaryKey: "PKUpdate", - sortKey: "SKUpdate", + ddbKeyUpdate1 := dynamodbKey{ + primaryKey: "PKUpdate1", + sortKey: "SKUpdate1", + } + ddbKeyUpdate2 := dynamodbKey{ + primaryKey: "PKUpdate2", + sortKey: "SKUpdate2", } ddbKeyDelete := dynamodbKey{ primaryKey: "PKDelete", sortKey: "SKDelete", } - update := map[dynamodbKey][]byte{ - ddbKeyUpdate: {}, + update := map[dynamodbKey]dynamodbItem{ + ddbKeyUpdate1: dynamodbItem{ + data: []byte{}, + version: "", + }, + ddbKeyUpdate2: dynamodbItem{ + data: []byte{}, + version: "0", + }, } delete := []dynamodbKey{ddbKeyDelete} ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - require.NotNil(t, input.RequestItems[tableName]) - require.EqualValues(t, 2, len(input.RequestItems[tableName])) + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { + require.NotNil(t, input.TransactItems) + require.EqualValues(t, 3, len(input.TransactItems)) require.True(t, - (checkPutRequestForItem(input.RequestItems[tableName][0], ddbKeyUpdate) || checkPutRequestForItem(input.RequestItems[tableName][1], ddbKeyUpdate)) && - (checkDeleteRequestForItem(input.RequestItems[tableName][0], ddbKeyDelete) || checkDeleteRequestForItem(input.RequestItems[tableName][1], ddbKeyDelete))) - return &dynamodb.BatchWriteItemOutput{}, nil + (checkPutForItem(input.TransactItems[0].Put, ddbKeyUpdate1)) && + (checkPutForNoConditionalExpression(input.TransactItems[0].Put, ddbKeyUpdate2)) && + (checkPutForItem(input.TransactItems[1].Put, ddbKeyUpdate2)) && + (checkPutForConditionalExpression(input.TransactItems[1].Put, ddbKeyUpdate2)) && + (checkDeleteForItem(input.TransactItems[2].Delete, ddbKeyDelete))) + return &dynamodb.TransactWriteItemsOutput{}, nil }, } @@ -90,9 +105,9 @@ func Test_BatchSlices(t *testing.T) { } numOfCalls := 0 ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { numOfCalls++ - return &dynamodb.BatchWriteItemOutput{}, nil + return &dynamodb.TransactWriteItemsOutput{}, nil }, } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) @@ -144,7 +159,7 @@ func Test_EmptyBatch(t *testing.T) { require.NoError(t, err) } -func Test_Batch_UnprocessedItems(t *testing.T) { +func Test_Batch_Error(t *testing.T) { tableName := "TEST" ddbKeyDelete := dynamodbKey{ primaryKey: "PKDelete", @@ -153,60 +168,47 @@ func Test_Batch_UnprocessedItems(t *testing.T) { delete := []dynamodbKey{ddbKeyDelete} ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{ - tableName: {&dynamodb.WriteRequest{ - PutRequest: &dynamodb.PutRequest{Item: generateItemKey(ddbKeyDelete)}}, - }, - }, - }, nil + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { + return &dynamodb.TransactWriteItemsOutput{}, fmt.Errorf("mocked error") }, } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) _, err := ddb.Batch(context.TODO(), nil, delete) - require.Errorf(t, err, "error processing batch dynamodb") + require.Errorf(t, err, "mocked error") } -func Test_Batch_Error(t *testing.T) { - tableName := "TEST" - ddbKeyDelete := dynamodbKey{ - primaryKey: "PKDelete", - sortKey: "SKDelete", - } - delete := []dynamodbKey{ddbKeyDelete} - - ddbClientMock := &mockDynamodb{ - batchWriteItem: func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) { - return &dynamodb.BatchWriteItemOutput{}, fmt.Errorf("mocked error") - }, - } +func checkPutForItem(request *dynamodb.Put, key dynamodbKey) bool { + return request != nil && + request.Item != nil && + request.Item[primaryKey] != nil && + request.Item[sortKey] != nil && + *request.Item[primaryKey].S == key.primaryKey && + *request.Item[sortKey].S == key.sortKey +} - ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), nil, delete) - require.Errorf(t, err, "mocked error") +func checkDeleteForItem(request *dynamodb.Delete, key dynamodbKey) bool { + return request != nil && + request.Key[primaryKey] != nil && + request.Key[sortKey] != nil && + *request.Key[primaryKey].S == key.primaryKey && + *request.Key[sortKey].S == key.sortKey } -func checkPutRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool { - return request.PutRequest != nil && - request.PutRequest.Item[primaryKey] != nil && - request.PutRequest.Item[sortKey] != nil && - *request.PutRequest.Item[primaryKey].S == key.primaryKey && - *request.PutRequest.Item[sortKey].S == key.sortKey +func checkPutForConditionalExpression(request *dynamodb.Put, key dynamodbKey) bool { + return request != nil && + request.ConditionExpression != nil && + strings.Contains(*request.ConditionExpression, "version = :v") } -func checkDeleteRequestForItem(request *dynamodb.WriteRequest, key dynamodbKey) bool { - return request.DeleteRequest != nil && - request.DeleteRequest.Key[primaryKey] != nil && - request.DeleteRequest.Key[sortKey] != nil && - *request.DeleteRequest.Key[primaryKey].S == key.primaryKey && - *request.DeleteRequest.Key[sortKey].S == key.sortKey +func checkPutForNoConditionalExpression(request *dynamodb.Put, key dynamodbKey) bool { + return request != nil && request.ConditionExpression == nil } type mockDynamodb struct { - putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput - batchWriteItem func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) + putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput + batchWriteItem func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) + transactWriteItem func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) dynamodbiface.DynamoDBAPI } @@ -219,6 +221,10 @@ func (m *mockDynamodb) BatchWriteItemWithContext(ctx context.Context, input *dyn return m.batchWriteItem(input) } +func (m *mockDynamodb) TransactWriteItems(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { + return m.transactWriteItem(input) +} + func newDynamodbClientMock(tableName string, mock *mockDynamodb, ttl time.Duration) *dynamodbKV { ddbKV := &dynamodbKV{ ddbClient: mock, diff --git a/pkg/ring/kv/dynamodb/metrics.go b/pkg/ring/kv/dynamodb/metrics.go index fc5e35a9e94..60b86687895 100644 --- a/pkg/ring/kv/dynamodb/metrics.go +++ b/pkg/ring/kv/dynamodb/metrics.go @@ -59,8 +59,8 @@ func (d dynamodbInstrumentation) List(ctx context.Context, key dynamodbKey) ([]s return resp, totalCapacity, err } -func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { - var resp map[string][]byte +func (d dynamodbInstrumentation) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) { + var resp map[string]dynamodbItem var totalCapacity float64 err := instrument.CollectedRequest(ctx, "Query", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { var err error @@ -87,7 +87,7 @@ func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data }) } -func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { totalCapacity, err := d.kv.Batch(ctx, put, delete) d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity) From 814bbe7e7eb6b0c81cb3e8214d01c3be586583d4 Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Tue, 26 Aug 2025 16:28:45 -0700 Subject: [PATCH 2/6] Retry on DDB KV batch error for conditional check failed Signed-off-by: Anna Tran --- pkg/ring/kv/dynamodb/client.go | 8 ++- pkg/ring/kv/dynamodb/client_test.go | 91 +++++++++++++++++++-------- pkg/ring/kv/dynamodb/dynamodb.go | 17 ++--- pkg/ring/kv/dynamodb/dynamodb_test.go | 21 +++---- pkg/ring/kv/dynamodb/metrics.go | 10 ++- 5 files changed, 96 insertions(+), 51 deletions(-) diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index e527b4e5afd..26b8ea8cb91 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -199,9 +199,13 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou } if len(putRequests) > 0 || len(deleteRequests) > 0 { - err = c.kv.Batch(ctx, putRequests, deleteRequests) + retry, err := c.kv.Batch(ctx, putRequests, deleteRequests) if err != nil { - return err + if !retry { + return err + } + bo.Wait() + continue } c.updateStaleData(key, r, time.Now().UTC()) return nil diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go index b70e7f5cc5e..f82a72de439 100644 --- a/pkg/ring/kv/dynamodb/client_test.go +++ b/pkg/ring/kv/dynamodb/client_test.go @@ -46,25 +46,60 @@ func Test_CAS_ErrorNoRetry(t *testing.T) { } func Test_CAS_Backoff(t *testing.T) { - ddbMock := NewDynamodbClientMock() - codecMock := &CodecMock{} - descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) - expectedErr := errors.Errorf("test") + testCases := []struct { + name string + setupMocks func(*MockDynamodbClient, *CodecMock, *DescMock, map[dynamodbKey]dynamodbItem, []dynamodbKey) + expectedQueryCalls int + expectedBatchCalls int + }{ + { + name: "query_fails_and_backs_off", + setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) { + ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed")).Once() + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once() + }, + expectedQueryCalls: 2, + expectedBatchCalls: 1, + }, + { + name: "batch_fails_and_backs_off", + setupMocks: func(ddbMock *MockDynamodbClient, codecMock *CodecMock, descMock *DescMock, expectedBatch map[dynamodbKey]dynamodbItem, expectedDelete []dynamodbKey) { + ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Twice() + ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(true, errors.Errorf("batch failed")).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, expectedDelete).Return(false, nil).Once() + }, + expectedQueryCalls: 2, + expectedBatchCalls: 2, + }, + } - ddbMock.On("Query").Return(map[string]dynamodbItem{}, expectedErr).Once() - ddbMock.On("Query").Return(map[string]dynamodbItem{}, nil).Once() - ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}}).Once() - codecMock.On("DecodeMultiKey").Return(descMock, nil).Twice() - descMock.On("Clone").Return(descMock).Once() - descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Once() - codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Twice() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ddbMock := NewDynamodbClientMock() + codecMock := &CodecMock{} + descMock := &DescMock{} + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) - err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { - return descMock, true, nil - }) + expectedBatch := map[dynamodbKey]dynamodbItem{} + expectedDelete := []dynamodbKey{{primaryKey: "test", sortKey: "childkey"}} - require.NoError(t, err) + tc.setupMocks(ddbMock, codecMock, descMock, expectedBatch, expectedDelete) + + codecMock.On("DecodeMultiKey").Return(descMock, nil).Times(tc.expectedQueryCalls) + descMock.On("Clone").Return(descMock).Times(tc.expectedQueryCalls) + descMock.On("FindDifference", descMock).Return(descMock, []string{"childkey"}, nil).Times(tc.expectedBatchCalls) + codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Times(tc.expectedBatchCalls) + + err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { + return descMock, true, nil + }) + + require.NoError(t, err) + ddbMock.AssertNumberOfCalls(t, "Query", tc.expectedQueryCalls) + ddbMock.AssertNumberOfCalls(t, "Batch", tc.expectedBatchCalls) + }) + } } func Test_CAS_Failed(t *testing.T) { @@ -108,7 +143,7 @@ func Test_CAS_Update(t *testing.T) { descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, []string{}, nil).Once() codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() - ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -135,7 +170,7 @@ func Test_CAS_Delete(t *testing.T) { descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once() codecMock.On("EncodeMultiKey").Return(map[string][]byte{}, nil).Once() - ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch) + ddbMock.On("Batch", context.TODO(), map[dynamodbKey]dynamodbItem{}, expectedBatch).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -171,7 +206,7 @@ func Test_CAS_Update_Delete(t *testing.T) { descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMock).Return(descMock, expectedToDelete, nil).Once() codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() - ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch) + ddbMock.On("Batch", context.TODO(), expectedUpdateBatch, expectedDeleteBatch).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMock, true, nil @@ -251,7 +286,7 @@ func Test_CAS_UpdateStale(t *testing.T) { descMock.On("Clone").Return(descMock).Once() descMock.On("FindDifference", descMockResult).Return(descMockResult, []string{}, nil).Once() codecMock.On("EncodeMultiKey").Return(expectedUpdated, nil).Once() - ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Once() + ddbMock.On("Batch", context.TODO(), expectedBatch, []dynamodbKey{}).Return(false, nil).Once() err := c.CAS(context.TODO(), key, func(in interface{}) (out interface{}, retry bool, err error) { return descMockResult, true, nil @@ -321,7 +356,7 @@ func Test_DynamodbKVWithTimeout(t *testing.T) { err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{}) require.True(t, errors.Is(err, context.DeadlineExceeded)) - err = dbWithTimeout.Batch(ctx, nil, nil) + _, err = dbWithTimeout.Batch(ctx, nil, nil) require.True(t, errors.Is(err, context.DeadlineExceeded)) } @@ -374,9 +409,13 @@ func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []by m.Called(ctx, key, data) return nil } -func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { - m.Called(ctx, put, delete) - return nil +func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { + args := m.Called(ctx, put, delete) + var err error + if args.Get(1) != nil { + err = args.Get(1).(error) + } + return args.Get(0).(bool), err } type TestLogger struct { @@ -498,10 +537,10 @@ func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamo } } -func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { +func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { select { case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() case <-time.After(d.delay): return d.ddbClient.Batch(ctx, put, delete) } diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index 5e2f7e1a65d..7b4566a620e 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -2,6 +2,7 @@ package dynamodb import ( "context" + "errors" "fmt" "math" "strconv" @@ -30,7 +31,7 @@ type dynamoDbClient interface { Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) Delete(ctx context.Context, key dynamodbKey) error Put(ctx context.Context, key dynamodbKey, data []byte) error - Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error + Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) } type dynamodbKV struct { @@ -198,11 +199,11 @@ func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (flo return totalCapacity, err } -func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (float64, error) { +func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (float64, bool, error) { totalCapacity := float64(0) writeRequestSize := len(put) + len(delete) if writeRequestSize == 0 { - return totalCapacity, nil + return totalCapacity, false, nil } writeRequestsSlices := make([][]*dynamodb.TransactWriteItem, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit)))) @@ -217,11 +218,12 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem Item: item, } if ddbItem.version != "" { - ddbPut.ConditionExpression = aws.String("version = :v") + ddbPut.ConditionExpression = aws.String("attribute_not_exists(version) OR version = :v") ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ ":v": {N: aws.String(ddbItem.version)}, } } + writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{Put: ddbPut}) if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit { currIdx++ @@ -247,14 +249,15 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem } resp, err := kv.ddbClient.TransactWriteItems(transactItems) if err != nil { - return totalCapacity, err + var checkFailed *dynamodb.ConditionalCheckFailedException + return totalCapacity, errors.As(err, &checkFailed), err } for _, consumedCapacity := range resp.ConsumedCapacity { totalCapacity += getCapacityUnits(consumedCapacity) } } - return totalCapacity, nil + return totalCapacity, false, nil } func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbItem) map[string]*dynamodb.AttributeValue { @@ -320,7 +323,7 @@ func (d *dynamodbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data [ return d.ddbClient.Put(ctx, key, data) } -func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { +func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { ctx, cancel := context.WithTimeout(ctx, d.timeout) defer cancel() return d.ddbClient.Batch(ctx, put, delete) diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index 37499241e92..fd7fa9a291f 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -67,11 +67,11 @@ func Test_Batch(t *testing.T) { sortKey: "SKDelete", } update := map[dynamodbKey]dynamodbItem{ - ddbKeyUpdate1: dynamodbItem{ + ddbKeyUpdate1: { data: []byte{}, version: "", }, - ddbKeyUpdate2: dynamodbItem{ + ddbKeyUpdate2: { data: []byte{}, version: "0", }, @@ -84,7 +84,7 @@ func Test_Batch(t *testing.T) { require.EqualValues(t, 3, len(input.TransactItems)) require.True(t, (checkPutForItem(input.TransactItems[0].Put, ddbKeyUpdate1)) && - (checkPutForNoConditionalExpression(input.TransactItems[0].Put, ddbKeyUpdate2)) && + (checkPutForNoConditionalExpression(input.TransactItems[0].Put, ddbKeyUpdate1)) && (checkPutForItem(input.TransactItems[1].Put, ddbKeyUpdate2)) && (checkPutForConditionalExpression(input.TransactItems[1].Put, ddbKeyUpdate2)) && (checkDeleteForItem(input.TransactItems[2].Delete, ddbKeyDelete))) @@ -93,7 +93,7 @@ func Test_Batch(t *testing.T) { } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), update, delete) + _, _, err := ddb.Batch(context.TODO(), update, delete) require.NoError(t, err) } @@ -141,7 +141,7 @@ func Test_BatchSlices(t *testing.T) { delete = append(delete, ddbKeyDelete) } - _, err := ddb.Batch(context.TODO(), nil, delete) + _, _, err := ddb.Batch(context.TODO(), nil, delete) require.NoError(t, err) require.EqualValues(t, tc.expectedCalls, numOfCalls) @@ -155,7 +155,7 @@ func Test_EmptyBatch(t *testing.T) { ddbClientMock := &mockDynamodb{} ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), nil, nil) + _, _, err := ddb.Batch(context.TODO(), nil, nil) require.NoError(t, err) } @@ -174,7 +174,7 @@ func Test_Batch_Error(t *testing.T) { } ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, err := ddb.Batch(context.TODO(), nil, delete) + _, _, err := ddb.Batch(context.TODO(), nil, delete) require.Errorf(t, err, "mocked error") } @@ -207,20 +207,15 @@ func checkPutForNoConditionalExpression(request *dynamodb.Put, key dynamodbKey) type mockDynamodb struct { putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput - batchWriteItem func(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error) transactWriteItem func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) dynamodbiface.DynamoDBAPI } -func (m *mockDynamodb) PutItemWithContext(_ context.Context, input *dynamodb.PutItemInput, _ ...request.Option) (*dynamodb.PutItemOutput, error) { +func (m *mockDynamodb) PutItemWithContext(_ aws.Context, input *dynamodb.PutItemInput, _ ...request.Option) (*dynamodb.PutItemOutput, error) { return m.putItem(input), nil } -func (m *mockDynamodb) BatchWriteItemWithContext(ctx context.Context, input *dynamodb.BatchWriteItemInput, opts ...request.Option) (*dynamodb.BatchWriteItemOutput, error) { - return m.batchWriteItem(input) -} - func (m *mockDynamodb) TransactWriteItems(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { return m.transactWriteItem(input) } diff --git a/pkg/ring/kv/dynamodb/metrics.go b/pkg/ring/kv/dynamodb/metrics.go index 60b86687895..72ede5e9dae 100644 --- a/pkg/ring/kv/dynamodb/metrics.go +++ b/pkg/ring/kv/dynamodb/metrics.go @@ -87,12 +87,16 @@ func (d dynamodbInstrumentation) Put(ctx context.Context, key dynamodbKey, data }) } -func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) error { - return instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { - totalCapacity, err := d.kv.Batch(ctx, put, delete) +func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error) { + retry := false + err := instrument.CollectedRequest(ctx, "Batch", d.ddbMetrics.dynamodbRequestDuration, errorCode, func(ctx context.Context) error { + var err error + totalCapacity, shouldRetry, err := d.kv.Batch(ctx, put, delete) + retry = shouldRetry d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity) return err }) + return retry, err } // errorCode converts an error into an error code string. From 65c90a39f6771fef1400ab209353cbebbd86c173 Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Tue, 26 Aug 2025 21:34:38 -0700 Subject: [PATCH 3/6] Use int64 as type for DDB KV item version Signed-off-by: Anna Tran --- pkg/ring/kv/dynamodb/dynamodb.go | 37 ++++++++++----------------- pkg/ring/kv/dynamodb/dynamodb_test.go | 30 ++++++---------------- 2 files changed, 22 insertions(+), 45 deletions(-) diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index 7b4566a620e..b2be6eaea4e 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -43,7 +43,7 @@ type dynamodbKV struct { type dynamodbItem struct { data []byte - version string + version int64 } var ( @@ -152,10 +152,16 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) err := kv.ddbClient.QueryPagesWithContext(ctx, input, func(output *dynamodb.QueryOutput, _ bool) bool { totalCapacity += getCapacityUnits(output.ConsumedCapacity) for _, item := range output.Items { - var itemVersion string + itemVersion := int64(0) if item[version] != nil { - itemVersion = *item[version].N + parsedVersion, err := strconv.ParseInt(*item[version].N, 10, 0) + if err != nil { + kv.logger.Log("msg", "failed to parse item version", "version", *item[version].N, "err", err) + } else { + itemVersion = parsedVersion + } } + keys[*item[sortKey].S] = dynamodbItem{ data: item[contentData].B, version: itemVersion, @@ -217,11 +223,9 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem TableName: kv.tableName, Item: item, } - if ddbItem.version != "" { - ddbPut.ConditionExpression = aws.String("attribute_not_exists(version) OR version = :v") - ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ - ":v": {N: aws.String(ddbItem.version)}, - } + ddbPut.ConditionExpression = aws.String("attribute_not_exists(version) OR version = :v") + ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ + ":v": {N: aws.String(strconv.FormatInt(ddbItem.version, 10))}, } writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{Put: ddbPut}) @@ -247,7 +251,7 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem transactItems := &dynamodb.TransactWriteItemsInput{ TransactItems: slice, } - resp, err := kv.ddbClient.TransactWriteItems(transactItems) + resp, err := kv.ddbClient.TransactWriteItemsWithContext(ctx, transactItems) if err != nil { var checkFailed *dynamodb.ConditionalCheckFailedException return totalCapacity, errors.As(err, &checkFailed), err @@ -265,9 +269,8 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbIte item[contentData] = &dynamodb.AttributeValue{ B: ddbItem.data, } - itemVersion := getItemVersion(ddbItem, kv) item[version] = &dynamodb.AttributeValue{ - N: aws.String(itemVersion), + N: aws.String(strconv.FormatInt(ddbItem.version+1, 10)), } if kv.getTTL() > 0 { item[timeToLive] = &dynamodb.AttributeValue{ @@ -278,18 +281,6 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbIte return item } -func getItemVersion(ddbItem dynamodbItem, kv dynamodbKV) string { - if ddbItem.version != "" { - itemVersion, err := strconv.ParseInt(ddbItem.version, 10, 64) - if err != nil { - kv.logger.Log("msg", "error converting version to int", "version", ddbItem.version, "err", err) - return "" - } - return strconv.FormatInt(itemVersion+1, 10) - } - return "0" -} - type dynamodbKVWithTimeout struct { ddbClient dynamoDbClient timeout time.Duration diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index fd7fa9a291f..e0313666c24 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -54,26 +54,18 @@ func Test_TTL(t *testing.T) { func Test_Batch(t *testing.T) { tableName := "TEST" - ddbKeyUpdate1 := dynamodbKey{ + ddbKeyUpdate := dynamodbKey{ primaryKey: "PKUpdate1", sortKey: "SKUpdate1", } - ddbKeyUpdate2 := dynamodbKey{ - primaryKey: "PKUpdate2", - sortKey: "SKUpdate2", - } ddbKeyDelete := dynamodbKey{ primaryKey: "PKDelete", sortKey: "SKDelete", } update := map[dynamodbKey]dynamodbItem{ - ddbKeyUpdate1: { - data: []byte{}, - version: "", - }, - ddbKeyUpdate2: { + ddbKeyUpdate: { data: []byte{}, - version: "0", + version: 0, }, } delete := []dynamodbKey{ddbKeyDelete} @@ -81,13 +73,11 @@ func Test_Batch(t *testing.T) { ddbClientMock := &mockDynamodb{ transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { require.NotNil(t, input.TransactItems) - require.EqualValues(t, 3, len(input.TransactItems)) + require.EqualValues(t, 2, len(input.TransactItems)) require.True(t, - (checkPutForItem(input.TransactItems[0].Put, ddbKeyUpdate1)) && - (checkPutForNoConditionalExpression(input.TransactItems[0].Put, ddbKeyUpdate1)) && - (checkPutForItem(input.TransactItems[1].Put, ddbKeyUpdate2)) && - (checkPutForConditionalExpression(input.TransactItems[1].Put, ddbKeyUpdate2)) && - (checkDeleteForItem(input.TransactItems[2].Delete, ddbKeyDelete))) + (checkPutForItem(input.TransactItems[0].Put, ddbKeyUpdate)) && + (checkPutForConditionalExpression(input.TransactItems[0].Put, ddbKeyUpdate)) && + (checkDeleteForItem(input.TransactItems[1].Delete, ddbKeyDelete))) return &dynamodb.TransactWriteItemsOutput{}, nil }, } @@ -201,10 +191,6 @@ func checkPutForConditionalExpression(request *dynamodb.Put, key dynamodbKey) bo strings.Contains(*request.ConditionExpression, "version = :v") } -func checkPutForNoConditionalExpression(request *dynamodb.Put, key dynamodbKey) bool { - return request != nil && request.ConditionExpression == nil -} - type mockDynamodb struct { putItem func(input *dynamodb.PutItemInput) *dynamodb.PutItemOutput transactWriteItem func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) @@ -216,7 +202,7 @@ func (m *mockDynamodb) PutItemWithContext(_ aws.Context, input *dynamodb.PutItem return m.putItem(input), nil } -func (m *mockDynamodb) TransactWriteItems(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { +func (m *mockDynamodb) TransactWriteItemsWithContext(_ aws.Context, input *dynamodb.TransactWriteItemsInput, _ ...request.Option) (*dynamodb.TransactWriteItemsOutput, error) { return m.transactWriteItem(input) } From dc189d06c3280f940647da46d89728cdec2581ef Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Thu, 28 Aug 2025 09:21:08 -0700 Subject: [PATCH 4/6] Default version 0 if instance joining DDB ring for the first time Signed-off-by: Anna Tran --- CHANGELOG.md | 2 +- pkg/ring/kv/dynamodb/client.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78568102d54..5ee7ca0f17d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,7 +72,7 @@ * [ENHANCEMENT] API: add request ID injection to context to enable tracking requests across downstream services. #6895 * [ENHANCEMENT] gRPC: Add gRPC Channelz monitoring. #6950 * [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976 -* [ENHANCEMENT] Implement versioned transactions for batch writes to Ring DynamoDB. #6986 +* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 26b8ea8cb91..ba0d0387693 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -187,9 +187,13 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou putRequests := map[dynamodbKey]dynamodbItem{} for childKey, bytes := range buf { + version := int64(0) + if ddbItem, ok := resp[childKey]; ok { + version = ddbItem.version + } putRequests[dynamodbKey{primaryKey: key, sortKey: childKey}] = dynamodbItem{ data: bytes, - version: resp[childKey].version, + version: version, } } From ad51603fd50f849511db70b2f4afd8109fadaf95 Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Thu, 28 Aug 2025 13:33:11 -0700 Subject: [PATCH 5/6] Add test verifying retry on DDB ConditionalCheckFailedException Signed-off-by: Anna Tran --- pkg/ring/kv/dynamodb/dynamodb.go | 2 ++ pkg/ring/kv/dynamodb/dynamodb_test.go | 46 +++++++++++++++++++-------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index b2be6eaea4e..170f8d2a58a 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -223,6 +223,8 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem TableName: kv.tableName, Item: item, } + // condition for optimistic locking; DynamoDB will only succeed the request if either the version attribute does not exist + // (for backwards compatibility) or the object version has not changed since it was last read ddbPut.ConditionExpression = aws.String("attribute_not_exists(version) OR version = :v") ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{ ":v": {N: aws.String(strconv.FormatInt(ddbItem.version, 10))}, diff --git a/pkg/ring/kv/dynamodb/dynamodb_test.go b/pkg/ring/kv/dynamodb/dynamodb_test.go index e0313666c24..c9dcd85ef12 100644 --- a/pkg/ring/kv/dynamodb/dynamodb_test.go +++ b/pkg/ring/kv/dynamodb/dynamodb_test.go @@ -55,8 +55,8 @@ func Test_TTL(t *testing.T) { func Test_Batch(t *testing.T) { tableName := "TEST" ddbKeyUpdate := dynamodbKey{ - primaryKey: "PKUpdate1", - sortKey: "SKUpdate1", + primaryKey: "PKUpdate", + sortKey: "SKUpdate", } ddbKeyDelete := dynamodbKey{ primaryKey: "PKDelete", @@ -151,21 +151,41 @@ func Test_EmptyBatch(t *testing.T) { func Test_Batch_Error(t *testing.T) { tableName := "TEST" - ddbKeyDelete := dynamodbKey{ - primaryKey: "PKDelete", - sortKey: "SKDelete", - } - delete := []dynamodbKey{ddbKeyDelete} - ddbClientMock := &mockDynamodb{ - transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { - return &dynamodb.TransactWriteItemsOutput{}, fmt.Errorf("mocked error") + testCases := []struct { + name string + mockError error + expectedRetry bool + }{ + { + name: "generic_error_no_retry", + mockError: fmt.Errorf("mocked error"), + expectedRetry: false, + }, + { + name: "conditional_check_failed_should_retry", + mockError: &dynamodb.ConditionalCheckFailedException{}, + expectedRetry: true, }, } - ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) - _, _, err := ddb.Batch(context.TODO(), nil, delete) - require.Errorf(t, err, "mocked error") + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ddbClientMock := &mockDynamodb{ + transactWriteItem: func(input *dynamodb.TransactWriteItemsInput) (*dynamodb.TransactWriteItemsOutput, error) { + return nil, tc.mockError + }, + } + + ddb := newDynamodbClientMock(tableName, ddbClientMock, 5*time.Hour) + + delete := []dynamodbKey{{primaryKey: "PKDelete", sortKey: "SKDelete"}} + _, retry, err := ddb.Batch(context.TODO(), nil, delete) + + require.Error(t, err) + require.Equal(t, tc.expectedRetry, retry) + }) + } } func checkPutForItem(request *dynamodb.Put, key dynamodbKey) bool { From 524b144425ed9b4b6ef80286cfd7058ffa8ddebc Mon Sep 17 00:00:00 2001 From: Anna Tran Date: Thu, 28 Aug 2025 14:32:25 -0700 Subject: [PATCH 6/6] Add metrics and logs to track conditional check failures Signed-off-by: Anna Tran --- pkg/ring/kv/dynamodb/dynamodb.go | 6 +++++- pkg/ring/kv/dynamodb/metrics.go | 22 ++++++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index 170f8d2a58a..57246497016 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -256,7 +256,11 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem resp, err := kv.ddbClient.TransactWriteItemsWithContext(ctx, transactItems) if err != nil { var checkFailed *dynamodb.ConditionalCheckFailedException - return totalCapacity, errors.As(err, &checkFailed), err + isCheckFailedException := errors.As(err, &checkFailed) + if isCheckFailedException { + kv.logger.Log("msg", "conditional check failed on DynamoDB Batch", "item", fmt.Sprintf("%v", checkFailed.Item), "err", err) + } + return totalCapacity, isCheckFailedException, err } for _, consumedCapacity := range resp.ConsumedCapacity { totalCapacity += getCapacityUnits(consumedCapacity) diff --git a/pkg/ring/kv/dynamodb/metrics.go b/pkg/ring/kv/dynamodb/metrics.go index 72ede5e9dae..1d0f051da0e 100644 --- a/pkg/ring/kv/dynamodb/metrics.go +++ b/pkg/ring/kv/dynamodb/metrics.go @@ -17,9 +17,10 @@ type dynamodbInstrumentation struct { } type dynamodbMetrics struct { - dynamodbRequestDuration *instrument.HistogramCollector - dynamodbUsageMetrics *prometheus.CounterVec - dynamodbCasAttempts prometheus.Counter + dynamodbRequestDuration *instrument.HistogramCollector + dynamodbUsageMetrics *prometheus.CounterVec + dynamodbCasAttempts prometheus.Counter + dynamodbConditionalCheckFailures prometheus.Counter } func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics { @@ -39,10 +40,16 @@ func newDynamoDbMetrics(registerer prometheus.Registerer) *dynamodbMetrics { Help: "DynamoDB KV Store Attempted CAS operations", }) + dynamodbConditionalCheckFailures := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "dynamodb_kv_conditional_check_failed_total", + Help: "Total number of DynamoDB conditional check failures", + }) + dynamodbMetrics := dynamodbMetrics{ - dynamodbRequestDuration: dynamodbRequestDurationCollector, - dynamodbUsageMetrics: dynamodbUsageMetrics, - dynamodbCasAttempts: dynamodbCasAttempts, + dynamodbRequestDuration: dynamodbRequestDurationCollector, + dynamodbUsageMetrics: dynamodbUsageMetrics, + dynamodbCasAttempts: dynamodbCasAttempts, + dynamodbConditionalCheckFailures: dynamodbConditionalCheckFailures, } return &dynamodbMetrics } @@ -93,6 +100,9 @@ func (d dynamodbInstrumentation) Batch(ctx context.Context, put map[dynamodbKey] var err error totalCapacity, shouldRetry, err := d.kv.Batch(ctx, put, delete) retry = shouldRetry + if retry { + d.ddbMetrics.dynamodbConditionalCheckFailures.Inc() + } d.ddbMetrics.dynamodbUsageMetrics.WithLabelValues("Batch").Add(totalCapacity) return err })