Skip to content

Commit 439fc9d

Browse files
authored
REP-6634 Add time-series support (#140)
This changeset adds basic time-series support. Because of the verifier’s `_id` dependency, this verifies buckets rather than logical measurements. This implies a requirement that the migration copy time-series collections via buckets as well, since a logical replication would not preserve artifacts like bucket `_id`, and possibly not even the grouping of measurements. Because of the bucket-level verification, details in mismatch reports are not very useful for time-series collections because they reference bucket-level fields that the logical API doesn’t expose. This works with per-shard verification (i.e., it can verify with or without a view). It _does not_ currently support namespace filtering.
1 parent 25e7f29 commit 439fc9d

File tree

7 files changed

+411
-16
lines changed

7 files changed

+411
-16
lines changed

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,16 @@ Additionally, because the amount of data sent to migration-verifier doesn’t ac
357357
358358
- If the server’s memory usage rises after generation 0, try reducing `recheckMaxSizeMB`. This will shrink the queries that the verifier sends, which in turn should reduce the server’s memory usage. (The number of actual queries sent will rise, of course.)
359359
360+
## Time-Series Collections
361+
362+
Because the verifier compares documents by `_id`, it cannot compare logical time-series measurements (i.e., the data that users actually insert). Instead, it compares the server’s internal time-series “buckets”. Unfortunately, this makes mismatch details essentially useless for time-series collections, since those details describe time-series buckets, which users generally don’t see.
363+
364+
Bucket-level comparison also requires that migrations replicate the raw buckets rather than the logical measurements. This is because a logical migration would cause `_id` mismatches between source & destination buckets. A user application wouldn’t care (since it never sees the buckets’ `_id`s), but verification does.
365+
366+
NB: Because bucket documents can be large, hashed document comparison can be especially useful with time-series collections.
367+
360368
# Limitations
361369
362-
- The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event, the verification will fail.
370+
- The verifier’s iterative process can handle data changes while it is running, until you hit the writesOff endpoint. However, it cannot handle DDL commands. If the verifier receives a DDL change stream event from the source, the verification will fail permanently.
363371
364-
- The verifier crashes if it tries to compare time-series collections. The error will include a phrase like “Collection has nil UUID (most probably is a view)” and also mention “timeseries”.
372+
- The verifier cannot verify time-series collections under namespace filtering.

internal/verifier/change_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,12 @@ func (csr *ChangeStreamReader) createChangeStream(
622622
SetMaxAwaitTime(maxChangeStreamAwaitTime)
623623

624624
if csr.clusterInfo.VersionArray[0] >= 6 {
625-
opts = opts.SetCustomPipeline(bson.M{"showExpandedEvents": true})
625+
opts = opts.SetCustomPipeline(
626+
bson.M{
627+
"showSystemEvents": true,
628+
"showExpandedEvents": true,
629+
},
630+
)
626631
}
627632

628633
savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)

internal/verifier/check.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
202202
err = verifier.verificationDatabase().Drop(ctx)
203203
if err != nil {
204204
verifier.mux.Unlock()
205-
return err
205+
return errors.Wrap(err, "dropping metadata")
206206
}
207207
} else {
208208
genOpt, err := verifier.readGeneration(ctx)
209209
if err != nil {
210210
verifier.mux.Unlock()
211-
return err
211+
return errors.Wrap(err, "reading generation from metadata")
212212
}
213213

214214
if gen, has := genOpt.Get(); has {
@@ -221,6 +221,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
221221
}
222222
}
223223

224+
verifier.logger.Info().Msg("Starting change streams.")
225+
224226
// Now that we’ve initialized verifier.generation we can
225227
// start the change stream readers.
226228
verifier.initializeChangeStreamReaders()
@@ -230,7 +232,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
230232
func(ctx context.Context, _ *retry.FuncInfo) error {
231233
err = verifier.AddMetaIndexes(ctx)
232234
if err != nil {
233-
return err
235+
return errors.Wrap(err, "adding metadata indexes")
234236
}
235237

236238
err = verifier.doInMetaTransaction(
@@ -457,7 +459,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error
457459
}
458460
isPrimary, err := verifier.CreatePrimaryTaskIfNeeded(ctx)
459461
if err != nil {
460-
return err
462+
return errors.Wrap(err, "creating primary task")
461463
}
462464
if !isPrimary {
463465
verifier.logger.Info().Msg("Primary task already existed; skipping setup")
@@ -466,7 +468,7 @@ func (verifier *Verifier) CreateInitialTasksIfNeeded(ctx context.Context) error
466468
if verifier.verifyAll {
467469
err := verifier.setupAllNamespaceList(ctx)
468470
if err != nil {
469-
return err
471+
return errors.Wrap(err, "creating namespace list")
470472
}
471473
}
472474
for _, src := range verifier.srcNamespaces {

internal/verifier/list_namespaces.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/10gen/migration-verifier/internal/logger"
77
"github.com/10gen/migration-verifier/internal/util"
8+
"github.com/10gen/migration-verifier/mmongo"
89
"github.com/10gen/migration-verifier/mslices"
910
"go.mongodb.org/mongo-driver/bson"
1011
"go.mongodb.org/mongo-driver/mongo"
@@ -27,8 +28,12 @@ var (
2728
ExcludedSystemCollPrefix = "system."
2829
)
2930

30-
// Lists all the user collections on a cluster. Unlike mongosync, we don't use the internal $listCatalog, since we need to
31-
// work on old versions without that command. This means this does not run with read concern majority.
31+
// ListAllUserNamespaces lists all the user collections on a cluster,
32+
// in addition to time-series “system.buckets.*” collections.
33+
//
34+
// Unlike mongosync, we don't use the internal $listCatalog, since we need to
35+
// work on old versions without that command. Thus, this does *NOT* run with
36+
// majority read concern.
3237
func ListAllUserNamespaces(
3338
ctx context.Context,
3439
logger *logger.Logger,
@@ -62,10 +67,17 @@ func ListAllUserNamespaces(
6267
for _, dbName := range dbNames {
6368
db := client.Database(dbName)
6469

65-
filter := util.ExcludePrefixesQuery(
66-
"name",
67-
mslices.Of(ExcludedSystemCollPrefix),
68-
)
70+
filter := bson.D{
71+
{"$or", []bson.D{
72+
util.ExcludePrefixesQuery(
73+
"name",
74+
mslices.Of(ExcludedSystemCollPrefix),
75+
),
76+
{
77+
{"$expr", mmongo.StartsWithAgg("$name", timeseriesBucketsPrefix)},
78+
},
79+
}},
80+
}
6981

7082
specifications, err := db.ListCollectionSpecifications(ctx, filter, options.ListCollections().SetNameOnly(true))
7183
if err != nil {

internal/verifier/migration_verifier.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -822,8 +822,20 @@ func (verifier *Verifier) compareCollectionSpecifications(
822822
}
823823
}
824824

825-
// Don't compare view data; they have no data of their own.
826-
canCompareData := srcSpec.Type != "view"
825+
canCompareData := false
826+
827+
switch srcSpec.Type {
828+
case "collection":
829+
canCompareData = true
830+
case "view":
831+
case "timeseries":
832+
if !verifier.verifyAll {
833+
return nil, false, fmt.Errorf("cannot verify time-series collection (%#q) under namespace filtering", srcNs)
834+
}
835+
default:
836+
return nil, false, fmt.Errorf("unrecognized collection type (spec: %+v)", srcSpec)
837+
}
838+
827839
// Do not compare data between capped and uncapped collections because the partitioning is different.
828840
canCompareData = canCompareData && srcSpec.Options.Lookup("capped").Equal(dstSpec.Options.Lookup("capped"))
829841

internal/verifier/timeseries.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package verifier
2+
3+
const timeseriesBucketsPrefix = "system.buckets."

0 commit comments

Comments
 (0)