Commit 44187d1

Accelerate change stream handling (#132)
This optimizes change stream handling thus:

1. `fullDocument` is replaced, in v4.4+, with just the size of that BSON document. (v4.2 & prior lack the required `$bsonSize` aggregation operator.)
2. Both change streams can now enqueue rechecks concurrently.

In local testing these changes allowed a verification that consistently lagged the change stream (and eventually fell off the oplog) to keep up.
1 parent 13a1b2d commit 44187d1
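
For the first of those changes, here is a minimal, self-contained sketch — not the verifier's own code; the URI and namespace are placeholders — of how a change stream pipeline can swap `fullDocument` for its `$bsonSize` on MongoDB 4.4+, mirroring the stage this commit adds in `GetChangeStreamFilter`:

package main

import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx := context.Background()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(ctx)

	// Requires MongoDB 4.4+ ($bsonSize): record the full document's size,
	// then drop the document itself so events stay small on the wire.
	pipeline := mongo.Pipeline{
		{{"$addFields", bson.D{
			{"_fullDocLen", bson.D{{"$bsonSize", "$fullDocument"}}},
			{"fullDocument", "$$REMOVE"},
		}}},
	}

	cs, err := client.Database("test").Collection("coll").Watch(
		ctx,
		pipeline,
		options.ChangeStream().SetFullDocument(options.UpdateLookup),
	)
	if err != nil {
		panic(err)
	}
	defer cs.Close(ctx)

	for cs.Next(ctx) {
		// _fullDocLen is null/absent for deletes, which carry no fullDocument.
		fmt.Println(
			cs.Current.Lookup("operationType"),
			cs.Current.Lookup("_fullDocLen"),
		)
	}
}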

File tree

4 files changed: +103 −38 lines changed


internal/verifier/change_stream.go

Lines changed: 41 additions & 9 deletions
@@ -8,6 +8,7 @@ import (
 	"github.com/10gen/migration-verifier/internal/keystring"
 	"github.com/10gen/migration-verifier/internal/logger"
 	"github.com/10gen/migration-verifier/internal/retry"
+	"github.com/10gen/migration-verifier/internal/types"
 	"github.com/10gen/migration-verifier/internal/util"
 	"github.com/10gen/migration-verifier/msync"
 	"github.com/10gen/migration-verifier/option"
@@ -40,12 +41,13 @@ var supportedEventOpTypes = mapset.NewSet(
 
 // ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
 type ParsedEvent struct {
-	ID           any                  `bson:"_id"`
-	OpType       string               `bson:"operationType"`
-	Ns           *Namespace           `bson:"ns,omitempty"`
-	DocKey       DocKey               `bson:"documentKey,omitempty"`
-	FullDocument bson.Raw             `bson:"fullDocument,omitempty"`
-	ClusterTime  *primitive.Timestamp `bson:"clusterTime,omitEmpty"`
+	ID           any                            `bson:"_id"`
+	OpType       string                         `bson:"operationType"`
+	Ns           *Namespace                     `bson:"ns,omitempty"`
+	DocKey       DocKey                         `bson:"documentKey,omitempty"`
+	FullDocument bson.Raw                       `bson:"fullDocument,omitempty"`
+	FullDocLen   option.Option[types.ByteCount] `bson:"_fullDocLen"`
+	ClusterTime  *primitive.Timestamp           `bson:"clusterTime,omitEmpty"`
 }
 
 func (pe *ParsedEvent) String() string {
@@ -244,7 +246,9 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch
 		collNames[i] = srcCollName
 		docIDs[i] = changeEvent.DocKey.ID
 
-		if changeEvent.FullDocument == nil {
+		if changeEvent.FullDocLen.OrZero() > 0 {
+			dataSizes[i] = int(changeEvent.FullDocLen.OrZero())
+		} else if changeEvent.FullDocument == nil {
			// This happens for deletes and for some updates.
			// The document is probably, but not necessarily, deleted.
			dataSizes[i] = fauxDocSizeForDeleteEvents
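
The new branch prefers the server-computed size. Extracted as a standalone sketch — `chooseDataSize` is a hypothetical helper name; `ParsedEvent`, `FullDocLen`, and `fauxDocSizeForDeleteEvents` come from this diff, and the final fallback is assumed from context since it lies outside the hunk:

// chooseDataSize mirrors the branching above: prefer the $bsonSize that a
// 4.4+ server computed, else fall back to the raw fullDocument's length,
// else (deletes and some updates carry no document) use a fixed guess.
func chooseDataSize(ev ParsedEvent) int {
	if size := ev.FullDocLen.OrZero(); size > 0 {
		return int(size)
	}
	if ev.FullDocument == nil {
		return fauxDocSizeForDeleteEvents
	}
	return len(ev.FullDocument)
}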
@@ -316,14 +320,38 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline)
 		}
 	}
 
-	return append(
+	pipeline = append(
 		pipeline,
 		bson.D{
 			{"$unset", []string{
 				"updateDescription",
 			}},
 		},
 	)
+
+	if csr.hasBsonSize() {
+		pipeline = append(
+			pipeline,
+			bson.D{
+				{"$addFields", bson.D{
+					{"_fullDocLen", bson.D{{"$bsonSize", "$fullDocument"}}},
+					{"fullDocument", "$$REMOVE"},
+				}},
+			},
+		)
+	}
+
+	return pipeline
+}
+
+func (csr *ChangeStreamReader) hasBsonSize() bool {
+	major := csr.clusterInfo.VersionArray[0]
+
+	if major == 4 {
+		return csr.clusterInfo.VersionArray[1] >= 4
+	}
+
+	return major > 4
 }
 
 // This function reads a single `getMore` response into a slice.
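
The net effect of the gated `$addFields` stage on a single insert event, sketched in shell-style extended JSON (field values here are invented for illustration):

  // Without the stage (pre-4.4 servers):
  { operationType: "insert", documentKey: { _id: "abc" },
    fullDocument: { _id: "abc", bigstring: "xxx…" }, … }

  // With the stage (4.4+):
  { operationType: "insert", documentKey: { _id: "abc" },
    _fullDocLen: 10041, … }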
@@ -345,6 +373,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
 
 	latestEvent := option.None[ParsedEvent]()
 
+	var batchTotalBytes int
 	for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 {
 		gotEvent := cs.TryNext(ctx)
 
@@ -364,6 +393,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
 			changeEvents = make([]ParsedEvent, batchSize)
 		}
 
+		batchTotalBytes += len(cs.Current)
+
 		if err := cs.Decode(&changeEvents[eventsRead]); err != nil {
 			return errors.Wrapf(err, "failed to decode change event to %T", changeEvents[eventsRead])
 		}
@@ -373,7 +404,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
 			Stringer("changeStream", csr).
 			Any("event", changeEvents[eventsRead]).
 			Int("eventsPreviouslyReadInBatch", eventsRead).
-			Int("batchSize", len(changeEvents)).
+			Int("batchEvents", len(changeEvents)).
+			Int("batchBytes", batchTotalBytes).
 			Msg("Received a change event.")
 
 		opType := changeEvents[eventsRead].OpType

internal/verifier/change_stream_test.go

Lines changed: 57 additions & 20 deletions
@@ -9,6 +9,7 @@ import (
 	"github.com/10gen/migration-verifier/contextplus"
 	"github.com/10gen/migration-verifier/internal/logger"
 	"github.com/10gen/migration-verifier/internal/testutil"
+	"github.com/10gen/migration-verifier/internal/types"
 	"github.com/10gen/migration-verifier/internal/util"
 	"github.com/10gen/migration-verifier/mslices"
 	"github.com/10gen/migration-verifier/mstrings"
@@ -85,31 +86,67 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
 		"updateDescription",
 		"update description should be filtered out",
 	)
+}
+
+func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() {
+	ctx := suite.Context()
+
+	verifier := suite.BuildVerifier()
+	if !verifier.srcChangeStreamReader.hasBsonSize() {
+		suite.T().Skip("Need a source version that has $bsonSize")
+	}
 
-	/*
-		suite.Assert().Contains(
+	srcColl := verifier.srcClient.Database(suite.DBNameForTest()).Collection("coll")
 
-			,
-			bson.D{
-				{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
-			},
+	_, err := srcColl.InsertOne(ctx, bson.D{{"_id", 123}})
+	suite.Require().NoError(err)
+
+	verifier.srcChangeStreamReader.namespaces = mslices.Of(FullName(srcColl))
 
-		)
-		verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
-		suite.Assert().Contains(
+	filter := verifier.srcChangeStreamReader.GetChangeStreamFilter()
 
-			verifier.srcChangeStreamReader.GetChangeStreamFilter(),
-			bson.D{{"$match", bson.D{
-				{"$or", []bson.D{
-					{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
-					{{"ns", bson.D{{"db", "foo"}, {"coll", "baz"}}}},
-					{{"ns", bson.D{{"db", "test"}, {"coll", "car"}}}},
-					{{"ns", bson.D{{"db", "test"}, {"coll", "chaz"}}}},
-				}},
-			}}},
+	cs, err := suite.srcMongoClient.Watch(
+		ctx,
+		filter,
+		options.ChangeStream().SetFullDocument("updateLookup"),
+	)
+	suite.Require().NoError(err)
+	defer cs.Close(ctx)
+
+	// Now create a large document and verify that the corresponding event
+	// does NOT contain that full document but _does_ contain the document’s
+	// length.
+
+	_, err = srcColl.InsertOne(ctx, bson.D{
+		{"_id", "abc"},
+		{"bigstring", strings.Repeat("x", 10_000)},
+	})
+	suite.Require().NoError(err)
+
+	suite.Require().True(cs.Next(ctx), "should get event")
+
+	suite.Require().Equal(
+		"abc",
+		cs.Current.Lookup("documentKey", "_id").StringValue(),
+		"event should reference expected document",
+	)
+	suite.Assert().Less(len(cs.Current), 10_000, "event should not be large")
+
+	parsed := ParsedEvent{}
+	suite.Require().NoError(cs.Decode(&parsed))
+	suite.Require().Equal("insert", parsed.OpType)
+
+	suite.Require().True(parsed.FullDocLen.IsSome(), "full doc len should be in event")
+	suite.Assert().Greater(parsed.FullDocLen.MustGet(), types.ByteCount(10_000))
+
+	_, err = srcColl.DeleteOne(ctx, bson.D{{"_id", "abc"}})
+	suite.Require().NoError(err)
 
-		)
-	*/
+	suite.Require().True(cs.Next(ctx), "should get event")
+	parsed = ParsedEvent{}
+	suite.Require().NoError(cs.Decode(&parsed))
+	suite.Require().Equal("delete", parsed.OpType)
+	suite.Require().True(parsed.FullDocLen.IsNone(), "full doc len not in delete")
 }
 
 func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() {

internal/verifier/migration_verifier.go

Lines changed: 2 additions & 2 deletions
@@ -446,9 +446,9 @@ func (verifier *Verifier) getGenerationWhileLocked() (int, bool) {
 
 	// As long as no other goroutine has locked the mux this will
 	// usefully panic if the caller neglected the lock.
-	wasUnlocked := verifier.mux.TryRLock()
+	wasUnlocked := verifier.mux.TryLock()
 	if wasUnlocked {
-		verifier.mux.RUnlock()
+		verifier.mux.Unlock()
 		panic("getGenerationWhileLocked() while unlocked")
 	}
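
Why this assertion had to change from `TryRLock` to `TryLock`: `insertRecheckDocs` (below) now holds only a read lock, and `TryRLock` succeeds alongside other readers, so it would report "unlocked" — and panic — precisely when a caller was correctly read-locked. `TryLock` fails while any lock is held, read or write, which is the property the guard needs. A minimal standalone illustration of the `sync.RWMutex` semantics:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mux sync.RWMutex

	mux.RLock() // a caller holding only a read lock, as insertRecheckDocs now does

	// TryRLock succeeds even though the mux is read-locked, so it cannot
	// prove that the caller forgot the lock:
	if mux.TryRLock() {
		mux.RUnlock() // release the extra read lock we just took
		fmt.Println("TryRLock succeeded despite the held read lock")
	}

	// TryLock fails while ANY lock is held, so its success really does
	// mean "nobody holds the mux":
	fmt.Println("TryLock while read-locked:", mux.TryLock()) // false

	mux.RUnlock()
}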

internal/verifier/recheck.go

Lines changed: 3 additions & 7 deletions
@@ -92,8 +92,8 @@ func (verifier *Verifier) insertRecheckDocs(
 	documentIDs []any,
 	dataSizes []int,
 ) error {
-	verifier.mux.Lock()
-	defer verifier.mux.Unlock()
+	verifier.mux.RLock()
+	defer verifier.mux.RUnlock()
 
 	generation, _ := verifier.getGenerationWhileLocked()
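
This is the other half of the speedup: with a shared read lock instead of an exclusive one, the source and destination change stream readers can both be inside `insertRecheckDocs` at once, while a writer (e.g., whatever rotates the generation) still takes the write lock to exclude them both. A minimal sketch of the pattern — goroutine names are illustrative, not the verifier's:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mux sync.RWMutex
	var wg sync.WaitGroup

	// Two "change stream readers" enqueue rechecks concurrently: RLock is
	// shared, so neither blocks the other.
	for _, name := range []string{"source", "destination"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			mux.RLock()
			defer mux.RUnlock()
			fmt.Println(name, "enqueueing rechecks under the shared read lock")
		}(name)
	}
	wg.Wait()

	// A generation change still takes the exclusive write lock, so it
	// waits for (and then excludes) all enqueuers.
	mux.Lock()
	fmt.Println("generation rotation holds the write lock exclusively")
	mux.Unlock()
}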

@@ -105,8 +105,6 @@ func (verifier *Verifier) insertRecheckDocs(
 	genCollection := verifier.getRecheckQueueCollection(generation)
 
 	for _, curThreadIndexes := range indexesPerThread {
-		curThreadIndexes := curThreadIndexes
-
 		eg.Go(func() error {
 			models := make([]mongo.WriteModel, len(curThreadIndexes))
 			for m, i := range curThreadIndexes {
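
The deleted `curThreadIndexes := curThreadIndexes` line was the standard pre-Go-1.22 guard against goroutines capturing one shared loop variable; since Go 1.22 each iteration gets a fresh variable, so the copy is dead weight (this cleanup assumes the module targets Go 1.22+). The old hazard in miniature:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	for _, s := range []string{"a", "b", "c"} {
		s := s // required before Go 1.22; a redundant no-op from 1.22 on
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(s) // pre-1.22 without the copy: often prints the last element three times
		}()
	}
	wg.Wait()
}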
@@ -162,9 +160,7 @@ func (verifier *Verifier) insertRecheckDocs(
 		})
 	}
 
-	err := eg.Wait()
-
-	if err != nil {
+	if err := eg.Wait(); err != nil {
 		return errors.Wrapf(
 			err,
 			"failed to persist %d recheck(s) for generation %d",
