Skip to content

Commit f7e3262

Browse files
committed
BulkObservable should look at item.IsValid not response.IsValid
This also feeds the failing documents through the callback whether we continue afterwards or not
1 parent 36c2024 commit f7e3262

File tree

4 files changed

+19
-16
lines changed

4 files changed

+19
-16
lines changed

src/Nest/Document/Multiple/Bulk/BulkResponseItem/BulkResponseItemBase.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public bool IsValid
3434
switch (this.Operation.ToLowerInvariant())
3535
{
3636
case "delete": return this.Status == 200 || this.Status == 404;
37-
case "update":
37+
case "update":
3838
case "index":
3939
case "create":
4040
return this.Status == 200 || this.Status == 201;
@@ -44,6 +44,7 @@ public bool IsValid
4444
}
4545
}
4646

47-
public override string ToString() => $"{Operation} returned {Status} _index: {Index} _type: {Type} _id: {Id} _version: {Version} error: {Error}";
47+
public override string ToString() =>
48+
$"{Operation} returned {Status} _index: {Index} _type: {Type} _id: {Id} _version: {Version} error: {Error}";
4849
}
49-
}
50+
}

src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
124124
this.HandleDroppedDocuments(documentsWithResponse, response);
125125

126126
var retryDocuments = documentsWithResponse
127-
.Where(x=> !response.IsValid && this._retryPredicate(x.Item1, x.Item2))
127+
.Where(x=> !x.Item1.IsValid && this._retryPredicate(x.Item1, x.Item2))
128128
.Select(x => x.Item2)
129129
.ToList();
130130

@@ -140,14 +140,12 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
140140
private void HandleDroppedDocuments(List<Tuple<BulkResponseItemBase, T>> documentsWithResponse, IBulkResponse response)
141141
{
142142
var droppedDocuments = documentsWithResponse
143-
.Where(x => !response.IsValid && !this._retryPredicate(x.Item1, x.Item2))
143+
.Where(x => !x.Item1.IsValid && !this._retryPredicate(x.Item1, x.Item2))
144144
.ToList();
145-
if (droppedDocuments.Count > 0 && !this._partionedBulkRequest.ContinueAfterDroppedDocuments)
145+
if (droppedDocuments.Count <= 0) return;
146+
foreach (var dropped in droppedDocuments) this._droppedDocumentCallBack(dropped.Item1, dropped.Item2);
147+
if (!this._partionedBulkRequest.ContinueAfterDroppedDocuments)
146148
throw this.ThrowOnBadBulk(response, $"BulkAll halted after receiving failures that can not be retried from _bulk");
147-
else if (droppedDocuments.Count > 0 && this._partionedBulkRequest.ContinueAfterDroppedDocuments)
148-
{
149-
foreach (var dropped in droppedDocuments) this._droppedDocumentCallBack(dropped.Item1, dropped.Item2);
150-
}
151149
}
152150

153151
private async Task<IBulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, IBulkResponse response)

src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ public interface IBulkAllRequest<T> where T : class
8181
bool ContinueAfterDroppedDocuments { get; set; }
8282

8383
/// <summary>
84-
/// If <see cref="ContinueAfterDroppedDocuments"/> is set to true dropped messages will be fed through
85-
/// this callback. Use this if you don't expect many failures and want to feed these dropped messages in a dead letter queue
86-
/// for instance.
84+
/// If a bulk operation fails because it receives documents it can not retry they will be fed to this callback.
85+
/// If <see cref="ContinueAfterDroppedDocuments"/> is set to true processing will continue, so this callback can be used
86+
/// to feed into a dead letter queue. Otherwise the bulk all indexation will be halted.
8787
/// </summary>
8888
Action<BulkResponseItemBase, T> DroppedDocumentCallback { get; set; }
8989

src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,26 @@ private void ScrollAll(string index, int size, int numberOfShards, int numberOfD
5656
private void BulkAll(string index, IEnumerable<SmallObject> documents, int size, int pages, int numberOfDocuments)
5757
{
5858
var seenPages = 0;
59+
60+
var droppedDocuments = new ConcurrentBag<Tuple<BulkResponseItemBase,SmallObject>>();
5961
//first we setup our cold observable
6062
var observableBulk = this.Client.BulkAll(documents, f => f
6163
.MaxDegreeOfParallelism(8)
6264
.BackOffTime(TimeSpan.FromSeconds(10))
6365
.BackOffRetries(2)
66+
.DroppedDocumentCallback((b, i) => droppedDocuments.Add(Tuple.Create(b, i)))
6467
.Size(size)
6568
.RefreshOnCompleted()
6669
.Index(index)
6770
);
6871
//we set up an observer
6972
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages));
7073

71-
seenPages.Should().Be(pages);
74+
droppedDocuments.Take(10).Should().BeEmpty();
75+
bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0, "All buffers are expected to be indexed");
76+
seenPages.Should().Be(pages, "BulkAll() did not run to completion");
7277
var count = this.Client.Count<SmallObject>(f => f.Index(index));
73-
count.Count.Should().Be(numberOfDocuments);
74-
bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0);
78+
count.Count.Should().Be(numberOfDocuments, "Target index should have the same document count as source index");
7579
}
7680
}
7781
}

0 commit comments

Comments
 (0)