Skip to content

Commit 428e6d3

Browse files
authored
fix(NODE-3515): do proper opTime merging in bulk results (#3011)
1 parent 564b0d7 commit 428e6d3

File tree

2 files changed

+163
-32
lines changed

2 files changed

+163
-32
lines changed

lib/bulk/common.js

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,15 @@ class WriteError {
414414
}
415415
}
416416

417+
/**
418+
* Converts the number to a Long or returns it.
419+
*
420+
* @ignore
421+
*/
422+
function longOrConvert(value) {
423+
return typeof value === 'number' ? Long.fromNumber(value) : value;
424+
}
425+
417426
/**
418427
* Merges results into shared data structure
419428
* @ignore
@@ -445,42 +454,37 @@ function mergeBatchResults(batch, bulkResult, err, result) {
445454
return;
446455
}
447456

448-
// Deal with opTime if available
457+
// The server write command specification states that lastOp is an optional
458+
// mongod only field that has a type of timestamp. Across various scarce specs
459+
// where opTime is mentioned, it is an "opaque" object that can have a "ts" and
460+
// "t" field with Timestamp and Long as their types respectively.
461+
// The "lastOp" field of the bulk write result is never mentioned in the driver
462+
// specifications or the bulk write spec, so we should probably just keep its
463+
// value consistent since it seems to vary.
464+
// See: https://github.com/mongodb/specifications/blob/master/source/driver-bulk-update.rst#results-object
449465
if (result.opTime || result.lastOp) {
450-
const opTime = result.lastOp || result.opTime;
451-
let lastOpTS = null;
452-
let lastOpT = null;
466+
let opTime = result.lastOp || result.opTime;
453467

454-
// We have a time stamp
455-
if (opTime && opTime._bsontype === 'Timestamp') {
456-
if (bulkResult.lastOp == null) {
457-
bulkResult.lastOp = opTime;
458-
} else if (opTime.greaterThan(bulkResult.lastOp)) {
459-
bulkResult.lastOp = opTime;
460-
}
461-
} else {
462-
// Existing TS
463-
if (bulkResult.lastOp) {
464-
lastOpTS =
465-
typeof bulkResult.lastOp.ts === 'number'
466-
? Long.fromNumber(bulkResult.lastOp.ts)
467-
: bulkResult.lastOp.ts;
468-
lastOpT =
469-
typeof bulkResult.lastOp.t === 'number'
470-
? Long.fromNumber(bulkResult.lastOp.t)
471-
: bulkResult.lastOp.t;
472-
}
473-
474-
// Current OpTime TS
475-
const opTimeTS = typeof opTime.ts === 'number' ? Long.fromNumber(opTime.ts) : opTime.ts;
476-
const opTimeT = typeof opTime.t === 'number' ? Long.fromNumber(opTime.t) : opTime.t;
468+
// If the opTime is a Timestamp, convert it to a consistent format to be
469+
// able to compare easily. Converting to the object from a timestamp is
470+
// much more straightforward than the other direction.
471+
if (opTime._bsontype === 'Timestamp') {
472+
opTime = { ts: opTime, t: Long.ZERO };
473+
}
477474

478-
// Compare the opTime's
479-
if (bulkResult.lastOp == null) {
480-
bulkResult.lastOp = opTime;
481-
} else if (opTimeTS.greaterThan(lastOpTS)) {
475+
// If there's no lastOp, just set it.
476+
if (!bulkResult.lastOp) {
477+
bulkResult.lastOp = opTime;
478+
} else {
479+
// First compare the ts values and set if the opTimeTS value is greater.
480+
const lastOpTS = longOrConvert(bulkResult.lastOp.ts);
481+
const opTimeTS = longOrConvert(opTime.ts);
482+
if (opTimeTS.greaterThan(lastOpTS)) {
482483
bulkResult.lastOp = opTime;
483484
} else if (opTimeTS.equals(lastOpTS)) {
485+
// If the ts values are equal, then compare using the t values.
486+
const lastOpT = longOrConvert(bulkResult.lastOp.t);
487+
const opTimeT = longOrConvert(opTime.t);
484488
if (opTimeT.greaterThan(lastOpT)) {
485489
bulkResult.lastOp = opTime;
486490
}
@@ -1387,6 +1391,7 @@ Object.defineProperty(BulkOperationBase.prototype, 'length', {
13871391
module.exports = {
13881392
Batch,
13891393
BulkOperationBase,
1394+
mergeBatchResults,
13901395
bson,
13911396
INSERT: INSERT,
13921397
UPDATE: UPDATE,

test/unit/bulk_write.test.js

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
const expect = require('chai').expect;
44
const mock = require('mongodb-mock-server');
5-
const BulkWriteResult = require('../../lib/bulk/common').BulkWriteResult;
5+
const Long = require('../../lib/core').BSON.Long;
6+
const Timestamp = require('../../lib/core').BSON.Timestamp;
7+
const common = require('../../lib/bulk/common');
8+
const BulkWriteResult = common.BulkWriteResult;
9+
const mergeBatchResults = common.mergeBatchResults;
610

711
describe('Bulk Writes', function() {
812
const test = {};
@@ -131,4 +135,126 @@ describe('Bulk Writes', function() {
131135

132136
expect(() => result.insertedIds).to.not.throw();
133137
});
138+
139+
describe('#mergeBatchResults', function() {
140+
let opTime;
141+
let lastOp;
142+
const bulkResult = {
143+
ok: 1,
144+
writeErrors: [],
145+
writeConcernErrors: [],
146+
insertedIds: [],
147+
nInserted: 0,
148+
nUpserted: 0,
149+
nMatched: 0,
150+
nModified: 0,
151+
nRemoved: 1,
152+
upserted: []
153+
};
154+
const result = {
155+
n: 8,
156+
nModified: 8,
157+
electionId: '7fffffff0000000000000028',
158+
ok: 1,
159+
$clusterTime: {
160+
clusterTime: '7020546605669417498',
161+
signature: {
162+
hash: 'AAAAAAAAAAAAAAAAAAAAAAAAAAA=',
163+
keyId: 0
164+
}
165+
},
166+
operationTime: '7020546605669417498'
167+
};
168+
const batch = [];
169+
170+
context('when lastOp is an object', function() {
171+
context('when the opTime is a Timestamp', function() {
172+
before(function() {
173+
lastOp = { ts: 7020546605669417496, t: 10 };
174+
opTime = Timestamp.fromNumber(8020546605669417496);
175+
bulkResult.lastOp = lastOp;
176+
result.opTime = opTime;
177+
});
178+
179+
it('replaces the lastOp with the properly formatted object', function() {
180+
mergeBatchResults(batch, bulkResult, null, result);
181+
expect(bulkResult.lastOp).to.deep.equal({ ts: opTime, t: Long.ZERO });
182+
});
183+
});
184+
185+
context('when the opTime is an object', function() {
186+
context('when the ts is greater', function() {
187+
before(function() {
188+
lastOp = { ts: 7020546605669417496, t: 10 };
189+
opTime = { ts: 7020546605669417497, t: 10 };
190+
bulkResult.lastOp = lastOp;
191+
result.opTime = opTime;
192+
});
193+
194+
it('replaces the lastOp with the new opTime', function() {
195+
mergeBatchResults(batch, bulkResult, null, result);
196+
expect(bulkResult.lastOp).to.deep.equal(opTime);
197+
});
198+
});
199+
200+
context('when the ts is equal', function() {
201+
context('when the t is greater', function() {
202+
before(function() {
203+
lastOp = { ts: 7020546605669417496, t: 10 };
204+
opTime = { ts: 7020546605669417496, t: 20 };
205+
bulkResult.lastOp = lastOp;
206+
result.opTime = opTime;
207+
});
208+
209+
it('replaces the lastOp with the new opTime', function() {
210+
mergeBatchResults(batch, bulkResult, null, result);
211+
expect(bulkResult.lastOp).to.deep.equal(opTime);
212+
});
213+
});
214+
215+
context('when the t is equal', function() {
216+
before(function() {
217+
lastOp = { ts: 7020546605669417496, t: 10 };
218+
opTime = { ts: 7020546605669417496, t: 10 };
219+
bulkResult.lastOp = lastOp;
220+
result.opTime = opTime;
221+
});
222+
223+
it('does not replace the lastOp with the new opTime', function() {
224+
mergeBatchResults(batch, bulkResult, null, result);
225+
expect(bulkResult.lastOp).to.deep.equal(lastOp);
226+
});
227+
});
228+
229+
context('when the t is less', function() {
230+
before(function() {
231+
lastOp = { ts: 7020546605669417496, t: 10 };
232+
opTime = { ts: 7020546605669417496, t: 5 };
233+
bulkResult.lastOp = lastOp;
234+
result.opTime = opTime;
235+
});
236+
237+
it('does not replace the lastOp with the new opTime', function() {
238+
mergeBatchResults(batch, bulkResult, null, result);
239+
expect(bulkResult.lastOp).to.deep.equal(lastOp);
240+
});
241+
});
242+
});
243+
244+
context('when the ts is less', function() {
245+
before(function() {
246+
lastOp = { ts: 7020546605669417496, t: 10 };
247+
opTime = { ts: 7020546605669417495, t: 10 };
248+
bulkResult.lastOp = lastOp;
249+
result.opTime = opTime;
250+
});
251+
252+
it('does not replace the lastOp with the new opTime', function() {
253+
mergeBatchResults(batch, bulkResult, null, result);
254+
expect(bulkResult.lastOp).to.deep.equal(lastOp);
255+
});
256+
});
257+
});
258+
});
259+
});
134260
});

0 commit comments

Comments
 (0)