diff --git a/.github/workflows/all.yml b/.github/workflows/all.yml index d5bd8701..280ee945 100644 --- a/.github/workflows/all.yml +++ b/.github/workflows/all.yml @@ -28,6 +28,41 @@ jobs: - mongodb_versions: [ '4.2.5', '6.0' ] topology: replset + - mongodb_versions: [ '4.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '4.2', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '4.4', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '5.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '6.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '7.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + + - mongodb_versions: [ '8.0', '8.0' ] + topology: replset + srcChangeReader: tailOplog + dstChangeReader: tailOplog + exclude: - mongodb_versions: [ '4.2', '4.2' ] toHashedIndexKey: true @@ -69,6 +104,9 @@ jobs: toHashedIndexKey: [true, false] + srcChangeReader: [changeStream] + dstChangeReader: [changeStream] + topology: - replset - replset-to-sharded @@ -78,7 +116,7 @@ jobs: # versions need. runs-on: ubuntu-22.04 - name: ${{ matrix.mongodb_versions[0] }} to ${{ matrix.mongodb_versions[1] }}, ${{ matrix.topology }}${{ matrix.toHashedIndexKey && ', hashed doc compare' || '' }} + name: ${{ matrix.mongodb_versions[0] }} to ${{ matrix.mongodb_versions[1] }}, ${{ matrix.topology }}${{ matrix.toHashedIndexKey && ', hashed doc compare' || '' }}, srcChangeReader=${{ matrix.srcChangeReader }}, dstChangeReader=${{ matrix.dstChangeReader }} steps: - run: uname -a @@ -124,6 +162,9 @@ jobs: env: MVTEST_DOC_COMPARE_METHOD: ${{matrix.toHashedIndexKey && 'toHashedIndexKey' || ''}} + MVTEST_SRC_CHANGE_READER: ${{matrix.srcChangeReader}} + MVTEST_DST_CHANGE_READER: ${{matrix.dstChangeReader}} + MVTEST_SRC: ${{ (matrix.topology == 'sharded') && env.shardedSrcConnStr || env.replsetSrcConnStr }} MVTEST_DST: ${{ (matrix.topology == 'sharded' || matrix.topology == 'replset-to-sharded') && env.shardedDstConnStr || env.replsetDstConnStr }} diff --git a/README.md b/README.md index 48926001..e1c0afa5 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,8 @@ The verifier will now check to completion to make sure that there are no inconsi | `--dstNamespace ` | destination namespaces to check | | `--metaDBName ` | name of the database in which to store verification metadata (default: "migration_verification_metadata") | | `--docCompareMethod` | How to compare documents. See below for details. | +| `--srcChangeReader` | How to read changes from the source. See below for details. | +| `--dstChangeReader` | How to read changes from the destination. See below for details. | | `--start` | Start checking documents right away rather than waiting for a `/check` API request. | | `--verifyAll` | If set, verify all user namespaces | | `--clean` | If set, drop all previous verification metadata before starting | @@ -198,8 +200,6 @@ connection strings in the following environment variables: The migration-verifier has two steps: - - 1. The initial check 1. The verifier partitions up the data into 400MB (configurable) chunks and spins up many worker goroutines (threads) to read from both the source and destination. 2. 
The verifier compares the documents on the source and destination by bytes and if they are different, it then checks field by field in case the field ordering has changed (since field ordering isn't required to be the same for the migration to be a success) @@ -392,6 +392,19 @@ Full-document verification methods allow migration-verifier to diagnose mismatch Additionally, because the amount of data sent to migration-verifier doesn’t actually reflect the documents’ size, no meaningful statistics are shown concerning the collection data size. Document counts, of course, are still shown. +# Change reading methods + +NB: If the verifier restarts, it **MUST** use the same change reader options +as before, or it will fail immediately. + +## `changeStream` + +The default. The verifier will read a change stream, which works seamlessly on sharded or unsharded clusters. + +## `tailOplog` + +The verifier will read the oplog continually instead of reading a change stream. This is generally faster, but it doesn’t work in sharded clusters. + # Known Issues - The verifier may report missing documents on the destination that don’t actually appear to be missing (i.e., a nonexistent problem). This has been hard to reproduce. If missing documents are reported, it is good practice to check for false positives. diff --git a/agg/agg.go b/agg/agg.go new file mode 100644 index 00000000..3633a1c7 --- /dev/null +++ b/agg/agg.go @@ -0,0 +1,208 @@ +// Package agg provides convenience types for aggregation operators. +// This yields two major advantages over using bson.D or bson.M: +// - simpler syntax +// - auto-completion (i.e., via gopls) +// +// Guiding principles are: +// - Prefer [1]any for unary operators (e.g., `$bsonSize`). +// - Prefer struct types for operators with named parameters. +// - Use functions sparingly, e.g., for “tuple” operators like `$in`. 
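To illustrate how these types are meant to compose, here is a small, hypothetical usage sketch. The `agg` types and their `MarshalBSON` methods are the ones defined in this package; the expression itself is made up:

```go
package main

import (
	"fmt"

	"github.com/10gen/migration-verifier/agg"
	"go.mongodb.org/mongo-driver/v2/bson"
)

func main() {
	// Builds the equivalent of:
	//   {$cond: {if: {$and: [{$eq: ["$a", "$b"]},
	//                        {$eq: [{$type: "$a"}, "string"]}]},
	//            then: "same", else: "different"}}
	expr := agg.Cond{
		If: agg.And{
			agg.Eq{"$a", "$b"},
			agg.Eq{agg.Type{"$a"}, "string"},
		},
		Then: "same",
		Else: "different",
	}

	raw, err := expr.MarshalBSON()
	if err != nil {
		panic(err)
	}

	// The composed expression can be embedded anywhere a BSON value is
	// accepted, e.g. inside a $project stage or under $expr in a $match.
	fmt.Println(bson.Raw(raw))
}
```

Because each type implements bson.Marshaler, nested agg values also marshal correctly when embedded in a bson.D or a mongo.Pipeline.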
+package agg + +import ( + "go.mongodb.org/mongo-driver/v2/bson" +) + +type Eq []any + +var _ bson.Marshaler = Eq{} + +func (e Eq) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$eq", []any(e)}}) +} + +// --------------------------------------------- + +func In[T any](needle any, haystack []T) bson.D { + return bson.D{{"$in", bson.A{needle, haystack}}} +} + +// --------------------------------------------- + +type BSONSize [1]any + +var _ bson.Marshaler = BSONSize{} + +func (b BSONSize) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$bsonSize", b[0]}}) +} + +// --------------------------------------------- + +type Type [1]any + +var _ bson.Marshaler = Type{} + +func (t Type) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$type", t[0]}}) +} + +// --------------------------------------------- + +type Not [1]any + +var _ bson.Marshaler = Type{} + +func (n Not) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{{"$not", n[0]}}) +} + +// --------------------------------------------- + +type And []any + +var _ bson.Marshaler = And{} + +func (a And) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$and", []any(a)}, + }) +} + +// --------------------------------------------- + +type Or []any + +var _ bson.Marshaler = Or{} + +func (o Or) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$or", []any(o)}, + }) +} + +// --------------------------------------------- + +type MergeObjects []any + +var _ bson.Marshaler = MergeObjects{} + +func (m MergeObjects) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$mergeObjects", []any(m)}, + }) +} + +// --------------------------------------------- + +type Cond struct { + If, Then, Else any +} + +var _ bson.Marshaler = Cond{} + +func (c Cond) D() bson.D { + return bson.D{ + {"$cond", bson.D{ + {"if", c.If}, + {"then", c.Then}, + {"else", c.Else}, + }}, + } +} + +func (c Cond) MarshalBSON() ([]byte, error) { + return bson.Marshal(c.D()) +} + +// --------------------------------------------- + +type Switch struct { + Branches []SwitchCase + Default any +} + +var _ bson.Marshaler = Switch{} + +type SwitchCase struct { + Case any + Then any +} + +func (s Switch) D() bson.D { + return bson.D{{"$switch", bson.D{ + {"branches", s.Branches}, + {"default", s.Default}, + }}} +} + +func (s Switch) MarshalBSON() ([]byte, error) { + return bson.Marshal(s.D()) +} + +// --------------------------------------------- + +type ArrayElemAt struct { + Array any + Index int +} + +var _ bson.Marshaler = ArrayElemAt{} + +func (a ArrayElemAt) D() bson.D { + return bson.D{{"$arrayElemAt", bson.A{ + a.Array, + a.Index, + }}} +} + +func (a ArrayElemAt) MarshalBSON() ([]byte, error) { + return bson.Marshal(a.D()) +} + +// --------------------------------------------- + +type Map struct { + Input, As, In any +} + +var _ bson.Marshaler = Map{} + +func (m Map) D() bson.D { + return bson.D{ + {"$map", bson.D{ + {"input", m.Input}, + {"as", m.As}, + {"in", m.In}, + }}, + } +} + +func (m Map) MarshalBSON() ([]byte, error) { + return bson.Marshal(m.D()) +} + +// ------------------------------------------ + +type Filter struct { + Input, As, Cond, Limit any +} + +var _ bson.Marshaler = Filter{} + +func (f Filter) D() bson.D { + d := bson.D{ + {"input", f.Input}, + {"as", f.As}, + {"cond", f.Cond}, + } + + if f.Limit != nil { + d = append(d, bson.E{"limit", f.Limit}) + } + return bson.D{{"$filter", d}} +} + +func (f Filter) MarshalBSON() ([]byte, error) { + return bson.Marshal(f.D()) +} diff 
--git a/agg/helpers/exist.go b/agg/helpers/exist.go new file mode 100644 index 00000000..0d507066 --- /dev/null +++ b/agg/helpers/exist.go @@ -0,0 +1,12 @@ +package helpers + +import ( + "github.com/10gen/migration-verifier/agg" + "go.mongodb.org/mongo-driver/v2/bson" +) + +type Exists [1]any + +func (e Exists) MarshalBSON() ([]byte, error) { + return bson.Marshal(agg.Not{agg.Eq{"missing", agg.Type{e[0]}}}) +} diff --git a/agg/helpers/string.go b/agg/helpers/string.go new file mode 100644 index 00000000..a84523bf --- /dev/null +++ b/agg/helpers/string.go @@ -0,0 +1,27 @@ +// Package helpers exposes functions that express common operations +// that don’t map to a single aggregation operator. +package helpers + +import ( + "go.mongodb.org/mongo-driver/v2/bson" +) + +// StringHasPrefix parallels Go’s strings.HasPrefix. +type StringHasPrefix struct { + FieldRef any + Prefix string +} + +func (sp StringHasPrefix) MarshalBSON() ([]byte, error) { + return bson.Marshal(bson.D{ + {"$eq", bson.A{ + 0, + bson.D{{"$indexOfCP", bson.A{ + sp.FieldRef, + sp.Prefix, + 0, + 1, + }}}, + }}, + }) +} diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 62f74e8d..7ab5959b 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -3,6 +3,7 @@ package retry import ( "context" "fmt" + "slices" "time" "github.com/10gen/migration-verifier/contextplus" @@ -180,18 +181,20 @@ func (r *Retryer) runRetryLoop( // Not a transient error? Fail immediately. if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) { - return cbErr + return wrapErrWithDescriptions(cbErr, descriptions) } // Our error is transient. If we've exhausted the allowed time // then fail. if failedFuncInfo.GetDurationSoFar() > li.durationLimit { - return RetryDurationLimitExceededErr{ + var err error = RetryDurationLimitExceededErr{ attempts: li.attemptsSoFar, duration: failedFuncInfo.GetDurationSoFar(), lastErr: groupErr.errFromCallback, } + + return wrapErrWithDescriptions(err, descriptions) } // Sleep and increase the sleep time for the next retry, @@ -235,6 +238,17 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event { return event } +func wrapErrWithDescriptions(err error, descriptions []string) error { + reversed := slices.Clone(descriptions) + slices.Reverse(reversed) + + for _, d := range reversed { + err = errors.Wrap(err, d) + } + + return err +} + // // For the above function, there have historically been concerns regarding majority write concern // upon retrying a write operation to the server. 
Mongomirror explicitly handled this: diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index e0a680df..79abf1c7 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -10,6 +10,7 @@ import ( "github.com/10gen/migration-verifier/internal/util" "github.com/pkg/errors" "github.com/samber/lo" + "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -90,6 +91,60 @@ func convertDocsToAnys(docs []bson.D) []any { return anys } +func KillTransactions( + ctx context.Context, + t *testing.T, + client *mongo.Client, +) { + cursor, err := client.Database( + "admin", + options.Database().SetReadConcern(readconcern.Local()), + ).Aggregate( + ctx, + mongo.Pipeline{ + {{"$currentOp", bson.D{}}}, + {{"$match", bson.D{ + {"transaction.parameters.txnNumber", bson.D{ + {"$exists", true}, + }}, + }}}, + }, + ) + require.NoError(t, err) + + type txn struct { + LSID struct { + ID bson.Binary + } + } + + var txns []txn + require.NoError(t, cursor.All(ctx, &txns)) + + if len(txns) == 0 { + return + } + + t.Logf("Killing %d transaction(s) via killSessions …", len(txns)) + + sessionsToKill := lo.Map( + txns, + func(t txn, _ int) bson.D { + return bson.D{{"id", t.LSID.ID}} + }, + ) + + require.NoError( + t, + client.Database("admin").RunCommand( + ctx, + bson.D{ + {"killSessions", sessionsToKill}, + }, + ).Err(), + ) +} + func KillApplicationChangeStreams( ctx context.Context, t *testing.T, diff --git a/internal/util/cluster_time.go b/internal/util/cluster_time.go index da9bef04..294d36b4 100644 --- a/internal/util/cluster_time.go +++ b/internal/util/cluster_time.go @@ -10,6 +10,10 @@ import ( func GetClusterTimeFromSession(sess *mongo.Session) (bson.Timestamp, error) { clusterTimeRaw := sess.ClusterTime() + if clusterTimeRaw == nil { + panic("found empty session cluster time but need nonempty") + } + ctrv, err := clusterTimeRaw.LookupErr("$clusterTime", "clusterTime") if err != nil { return bson.Timestamp{}, errors.Wrapf(err, "finding clusterTime in session cluster time document (%v)", clusterTimeRaw) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 056fdbc5..3e0273fd 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -6,12 +6,14 @@ import ( "github.com/10gen/migration-verifier/history" "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/samber/lo" + "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -21,7 +23,7 @@ import ( type ddlEventHandling string const ( - fauxDocSizeForDeleteEvents = 1024 + defaultUserDocumentSize = 1024 // The number of batches we’ll hold in memory at once. 
batchChanBufferSize = 100 @@ -33,7 +35,7 @@ const ( type changeReader interface { getWhichCluster() whichCluster - getReadChannel() <-chan changeEventBatch + getReadChannel() <-chan eventBatch getStartTimestamp() bson.Timestamp getLastSeenClusterTime() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] @@ -58,9 +60,9 @@ type ChangeReaderCommon struct { resumeTokenTSExtractor func(bson.Raw) (bson.Timestamp, error) - running bool - changeEventBatchChan chan changeEventBatch - writesOffTs *util.Eventual[bson.Timestamp] + running bool + eventBatchChan chan eventBatch + writesOffTs *util.Eventual[bson.Timestamp] lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]] @@ -69,17 +71,20 @@ type ChangeReaderCommon struct { lag *msync.TypedAtomic[option.Option[time.Duration]] batchSizeHistory *history.History[int] + createIteratorCb func(context.Context, *mongo.Session) (bson.Timestamp, error) + iterateCb func(context.Context, *retry.FuncInfo, *mongo.Session) error + onDDLEvent ddlEventHandling } func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { return ChangeReaderCommon{ - readerType: clusterName, - changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), - writesOffTs: util.NewEventual[bson.Timestamp](), - lag: msync.NewTypedAtomic(option.None[time.Duration]()), - lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), - batchSizeHistory: history.New[int](time.Minute), + readerType: clusterName, + eventBatchChan: make(chan eventBatch, batchChanBufferSize), + writesOffTs: util.NewEventual[bson.Timestamp](), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), + lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), + batchSizeHistory: history.New[int](time.Minute), onDDLEvent: lo.Ternary( clusterName == dst, onDDLEventAllow, @@ -108,8 +113,8 @@ func (rc *ChangeReaderCommon) isRunning() bool { return rc.running } -func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch { - return rc.changeEventBatchChan +func (rc *ChangeReaderCommon) getReadChannel() <-chan eventBatch { + return rc.eventBatchChan } func (rc *ChangeReaderCommon) getLastSeenClusterTime() option.Option[bson.Timestamp] { @@ -120,7 +125,7 @@ func (rc *ChangeReaderCommon) getLastSeenClusterTime() option.Option[bson.Timest // as a fraction. If saturation rises, that means we’re reading events faster // than we can persist them. func (rc *ChangeReaderCommon) getBufferSaturation() float64 { - return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) + return util.DivideToF64(len(rc.eventBatchChan), cap(rc.eventBatchChan)) } // getLag returns the observed change stream lag (i.e., the delta between @@ -155,43 +160,137 @@ func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { return option.None[float64]() } +func (rc *ChangeReaderCommon) start( + ctx context.Context, + eg *errgroup.Group, +) error { + // This channel holds the first change stream creation's result, whether + // success or failure. Rather than using a Result we could make separate + // Timestamp and error channels, but the single channel is cleaner since + // there's no chance of "nonsense" like both channels returning a payload. + initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) + + eg.Go( + func() error { + // Closing changeEventBatchChan at the end of change stream goroutine + // notifies the verifier's change event handler to exit. + defer func() { + rc.logger.Debug(). 
+ Str("reader", string(rc.readerType)). + Msg("Finished.") + + close(rc.eventBatchChan) + }() + + retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) + + parentThreadWaiting := true + + err := retryer.WithCallback( + func(ctx context.Context, ri *retry.FuncInfo) error { + sess, err := rc.watcherClient.StartSession() + if err != nil { + return errors.Wrap(err, "failed to start session") + } + + if rc.createIteratorCb == nil { + panic("rc.createIteratorCb should be set") + } + + startTs, err := rc.createIteratorCb(ctx, sess) + if err != nil { + logEvent := rc.logger.Debug(). + Err(err). + Str("reader", string(rc.readerType)) + + if parentThreadWaiting { + logEvent.Msg("First change stream open failed.") + + initialCreateResultChan <- mo.Err[bson.Timestamp](err) + return nil + } + + logEvent.Msg("Retried change stream open failed.") + + return err + } + + logEvent := rc.logger.Debug(). + Str("reader", string(rc.readerType)). + Any("startTimestamp", startTs) + + if parentThreadWaiting { + logEvent.Msg("First change stream open succeeded.") + + initialCreateResultChan <- mo.Ok(startTs) + close(initialCreateResultChan) + parentThreadWaiting = false + } else { + logEvent.Msg("Retried change stream open succeeded.") + } + + return rc.iterateCb(ctx, ri, sess) + }, + "reading %s’s changes", rc.readerType, + ).Run(ctx, rc.logger) + + return err + }, + ) + + result := <-initialCreateResultChan + + startTs, err := result.Get() + if err != nil { + return errors.Wrapf(err, "creating change stream") + } + + rc.startAtTs = &startTs + + rc.running = true + + return nil +} + func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error { + ts, err := rc.resumeTokenTSExtractor(token) + if err != nil { + return errors.Wrapf(err, "parsing resume token %#q", token) + } + + if ts.IsZero() { + panic("empty ts in resume token is invalid!") + } + coll := rc.metaDB.Collection(changeReaderCollectionName) - _, err := coll.ReplaceOne( + _, err = coll.ReplaceOne( ctx, - bson.D{{"_id", rc.resumeTokenDocID()}}, + bson.D{{"_id", resumeTokenDocID(rc.getWhichCluster())}}, token, options.Replace().SetUpsert(true), ) - if err == nil { - ts, err := rc.resumeTokenTSExtractor(token) + if err != nil { + return errors.Wrapf(err, "persisting %s resume token (%v)", rc.readerType, token) + } - logEvent := rc.logger.Debug() + logEvent := rc.logger.Debug() - if err == nil { - logEvent = addTimestampToLogEvent(ts, logEvent) - } else { - rc.logger.Warn().Err(err). 
- Msg("failed to extract resume token timestamp") - } + logEvent = addTimestampToLogEvent(ts, logEvent) - logEvent.Msgf("Persisted %s's resume token.", rc.readerType) - - return nil - } + logEvent.Msgf("Persisted %s’s resume token.", rc.readerType) - return errors.Wrapf(err, "failed to persist %s resume token (%v)", rc.readerType, token) + return nil } -func (rc *ChangeReaderCommon) resumeTokenDocID() string { - switch rc.readerType { +func resumeTokenDocID(clusterType whichCluster) string { + switch clusterType { case src: return "srcResumeToken" case dst: return "dstResumeToken" default: - panic("unknown readerType: " + rc.readerType) + panic("unknown readerType: " + clusterType) } } @@ -204,7 +303,7 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio token, err := coll.FindOne( ctx, - bson.D{{"_id", rc.resumeTokenDocID()}}, + bson.D{{"_id", resumeTokenDocID(rc.getWhichCluster())}}, ).Raw() if err != nil { diff --git a/internal/verifier/change_reader_test.go b/internal/verifier/change_reader_test.go new file mode 100644 index 00000000..83482a1e --- /dev/null +++ b/internal/verifier/change_reader_test.go @@ -0,0 +1,58 @@ +package verifier + +import "github.com/10gen/migration-verifier/internal/util" + +// TestFailChangeReaderOptChange confirms that verifier fails if it restarts +// with different change-reader settings. +func (suite *IntegrationTestSuite) TestFailChangeReaderOptChange() { + if suite.GetTopology(suite.srcMongoClient) == util.TopologySharded { + suite.T().Skipf("sharded source can only read changes via change stream") + } + + ctx := suite.Context() + + v1 := suite.BuildVerifier() + suite.Require().NoError( + v1.SetSrcChangeReaderMethod(ChangeReaderOptChangeStream), + ) + suite.Require().NoError( + v1.SetDstChangeReaderMethod(ChangeReaderOptChangeStream), + ) + + v1Runner := RunVerifierCheck(ctx, suite.T(), v1) + suite.Require().NoError( + v1Runner.AwaitGenerationEnd(), + ) + + badSrcOptVerifier := suite.BuildVerifier() + suite.Require().NoError( + badSrcOptVerifier.SetSrcChangeReaderMethod(ChangeReaderOptOplog), + ) + suite.Require().NoError( + badSrcOptVerifier.SetDstChangeReaderMethod(ChangeReaderOptChangeStream), + ) + + err := RunVerifierCheck(ctx, suite.T(), badSrcOptVerifier). + AwaitGenerationEnd() + + suite.Require().Error(err, "wrong source opt should fail") + suite.Assert().ErrorAs(err, &changeReaderOptMismatchErr{}) + + if suite.GetTopology(suite.dstMongoClient) == util.TopologySharded { + return + } + + badDstOptVerifier := suite.BuildVerifier() + suite.Require().NoError( + badDstOptVerifier.SetSrcChangeReaderMethod(ChangeReaderOptChangeStream), + ) + suite.Require().NoError( + badDstOptVerifier.SetDstChangeReaderMethod(ChangeReaderOptOplog), + ) + + err = RunVerifierCheck(ctx, suite.T(), badDstOptVerifier). 
+ AwaitGenerationEnd() + + suite.Require().Error(err, "wrong destination opt should fail") + suite.Assert().ErrorAs(err, &changeReaderOptMismatchErr{}) +} diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 81732825..e4549841 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -8,17 +8,16 @@ import ( "github.com/10gen/migration-verifier/internal/keystring" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/internal/verifier/namespaces" "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" "github.com/pkg/errors" - "github.com/samber/mo" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "golang.org/x/exp/slices" - "golang.org/x/sync/errgroup" ) var supportedEventOpTypes = mapset.NewSet( @@ -29,8 +28,9 @@ var supportedEventOpTypes = mapset.NewSet( ) const ( - minChangeStreamPersistInterval = time.Second * 10 - maxChangeStreamAwaitTime = time.Second + maxChangeStreamAwaitTime = time.Second + + ChangeReaderOptChangeStream = "changeStream" ) type UnknownEventError struct { @@ -42,6 +42,7 @@ func (uee UnknownEventError) Error() string { } type ChangeStreamReader struct { + changeStream *mongo.ChangeStream ChangeReaderCommon } @@ -66,6 +67,9 @@ func (v *Verifier) newChangeStreamReader( csr := &ChangeStreamReader{ChangeReaderCommon: common} + csr.createIteratorCb = csr.createChangeStream + csr.iterateCb = csr.iterateChangeStream + return csr } @@ -87,7 +91,7 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) {{"$match", util.ExcludePrefixesQuery( "ns.db", append( - slices.Clone(ExcludedDBPrefixes), + slices.Clone(namespaces.ExcludedDBPrefixes), csr.metaDB.Name(), ), )}}, @@ -145,10 +149,9 @@ func (csr *ChangeStreamReader) GetChangeStreamFilter() (pipeline mongo.Pipeline) // is unideal but shouldn’t impede correctness since post-writesOff events // shouldn’t really happen anyway by definition. 
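As a condensed sketch of the batch-draining pattern that `readAndHandleOneChangeEventBatch` (below) builds on, where `TryNext` plus `RemainingBatchLength` drains whatever the server has already returned without blocking longer than the await time, consider the following. The helper name and return type are illustrative, not part of this change:

```go
package sketch

import (
	"context"
	"slices"

	"github.com/pkg/errors"
	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

// readOneBatch drains the change stream's currently-buffered batch (at most
// one server round-trip) and returns copies of the raw events.
func readOneBatch(ctx context.Context, cs *mongo.ChangeStream) ([]bson.Raw, error) {
	var events []bson.Raw

	for hasMore := true; hasMore; hasMore = cs.RemainingBatchLength() > 0 {
		if !cs.TryNext(ctx) {
			if cs.Err() != nil {
				return nil, errors.Wrap(cs.Err(), "change stream iteration failed")
			}

			break // nothing arrived within the await time
		}

		// cs.Current is only valid until the next TryNext call, so copy it.
		events = append(events, slices.Clone(cs.Current))
	}

	return events, nil
}
```

The real implementation additionally parses each event into a ParsedEvent, tracks batch byte sizes and lag, and forwards the batch (with its resume token) onto `eventBatchChan`.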
func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( - ctx context.Context, + sctx context.Context, ri *retry.FuncInfo, cs *mongo.ChangeStream, - sess *mongo.Session, ) error { eventsRead := 0 var changeEvents []ParsedEvent @@ -157,7 +160,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( var batchTotalBytes int for hasEventInBatch := true; hasEventInBatch; hasEventInBatch = cs.RemainingBatchLength() > 0 { - gotEvent := cs.TryNext(ctx) + gotEvent := cs.TryNext(sctx) if cs.Err() != nil { return errors.Wrap(cs.Err(), "change stream iteration failed") @@ -224,6 +227,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead++ } + sess := mongo.SessionFromContext(sctx) + csr.updateLag(sess, cs.ResumeToken()) if eventsRead == 0 { @@ -244,11 +249,10 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( ri.NoteSuccess("parsed %d-event batch", len(changeEvents)) select { - case <-ctx.Done(): - return util.WrapCtxErrWithCause(ctx) - case csr.changeEventBatchChan <- changeEventBatch{ - events: changeEvents, - + case <-sctx.Done(): + return util.WrapCtxErrWithCause(sctx) + case csr.eventBatchChan <- eventBatch{ + events: changeEvents, resumeToken: cs.ResumeToken(), }: } @@ -261,9 +265,14 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, - cs *mongo.ChangeStream, sess *mongo.Session, ) error { + sctx := mongo.NewSessionContext(ctx, sess) + + cs := csr.changeStream + + defer cs.Close(sctx) + for { var err error var gotwritesOffTimestamp bool @@ -314,7 +323,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( break } - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) + err = csr.readAndHandleOneChangeEventBatch(sctx, ri, cs) if err != nil { return errors.Wrap(err, "finishing change stream after writes-off") @@ -322,7 +331,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } default: - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) + err = csr.readAndHandleOneChangeEventBatch(sctx, ri, cs) if err != nil { return err @@ -355,7 +364,8 @@ func (csr *ChangeStreamReader) iterateChangeStream( func (csr *ChangeStreamReader) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, *mongo.Session, bson.Timestamp, error) { + sess *mongo.Session, +) (bson.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). SetMaxAwaitTime(maxChangeStreamAwaitTime) @@ -371,14 +381,14 @@ func (csr *ChangeStreamReader) createChangeStream( savedResumeToken, err := csr.loadResumeToken(ctx) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return bson.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() if token, hasToken := savedResumeToken.Get(); hasToken { logEvent := csStartLogEvent. 
- Stringer(csr.resumeTokenDocID(), token) + Stringer(resumeTokenDocID(csr.readerType), token) ts, err := csr.resumeTokenTSExtractor(token) if err == nil { @@ -400,24 +410,25 @@ func (csr *ChangeStreamReader) createChangeStream( csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) } - sess, err := csr.watcherClient.StartSession() - if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to start session") - } sctx := mongo.NewSessionContext(ctx, sess) + changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "opening change stream") + return bson.Timestamp{}, errors.Wrap(err, "opening change stream") } - err = csr.persistResumeToken(ctx, changeStream.ResumeToken()) + resumeToken := changeStream.ResumeToken() + + err = csr.persistResumeToken(ctx, resumeToken) if err != nil { - return nil, nil, bson.Timestamp{}, err + changeStream.Close(sctx) + return bson.Timestamp{}, err } - startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken()) + startTs, err := csr.resumeTokenTSExtractor(resumeToken) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + changeStream.Close(sctx) + return bson.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } // With sharded clusters the resume token might lead the cluster time @@ -425,7 +436,8 @@ func (csr *ChangeStreamReader) createChangeStream( // otherwise we will get errors. clusterTime, err := util.GetClusterTimeFromSession(sess) if err != nil { - return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + changeStream.Close(sctx) + return bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } csr.logger.Debug(). @@ -438,93 +450,9 @@ func (csr *ChangeStreamReader) createChangeStream( startTs = clusterTime } - return changeStream, sess, startTs, nil -} - -// StartChangeStream starts the change stream. -func (csr *ChangeStreamReader) start( - ctx context.Context, - eg *errgroup.Group, -) error { - // This channel holds the first change stream creation's result, whether - // success or failure. Rather than using a Result we could make separate - // Timestamp and error channels, but the single channel is cleaner since - // there's no chance of "nonsense" like both channels returning a payload. - initialCreateResultChan := make(chan mo.Result[bson.Timestamp]) - - eg.Go( - func() error { - // Closing changeEventBatchChan at the end of change stream goroutine - // notifies the verifier's change event handler to exit. - defer func() { - csr.logger.Debug(). - Stringer("changeStreamReader", csr). - Msg("Closing change event batch channel.") - - close(csr.changeEventBatchChan) - }() - - retryer := retry.New().WithErrorCodes(util.CursorKilledErrCode) - - parentThreadWaiting := true - - err := retryer.WithCallback( - func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, sess, startTs, err := csr.createChangeStream(ctx) - if err != nil { - logEvent := csr.logger.Debug(). - Err(err). - Stringer("changeStreamReader", csr) - - if parentThreadWaiting { - logEvent.Msg("First change stream open failed.") - - initialCreateResultChan <- mo.Err[bson.Timestamp](err) - return nil - } - - logEvent.Msg("Retried change stream open failed.") - - return err - } - - defer changeStream.Close(ctx) - - logEvent := csr.logger.Debug(). 
- Stringer("changeStreamReader", csr). - Any("startTimestamp", startTs) - - if parentThreadWaiting { - logEvent.Msg("First change stream open succeeded.") - - initialCreateResultChan <- mo.Ok(startTs) - close(initialCreateResultChan) - parentThreadWaiting = false - } else { - logEvent.Msg("Retried change stream open succeeded.") - } - - return csr.iterateChangeStream(ctx, ri, changeStream, sess) - }, - "running %s", csr, - ).Run(ctx, csr.logger) - - return err - }, - ) - - result := <-initialCreateResultChan - - startTs, err := result.Get() - if err != nil { - return errors.Wrapf(err, "creating change stream") - } - - csr.startAtTs = &startTs + csr.changeStream = changeStream - csr.running = true - - return nil + return startTs, nil } func (csr *ChangeStreamReader) String() string { diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 62a39bca..1c0208e3 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -345,15 +345,20 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() { assert.Eventually( suite.T(), func() bool { - rt, err := changeStreamMetaColl.FindOne(ctx, bson.D{}).Raw() + rt, err := changeStreamMetaColl.FindOne( + ctx, + bson.D{ + {"_id", resumeTokenDocID(src)}, + }, + ).Raw() require.NoError(suite.T(), err) - suite.T().Logf("found rt: %v\n", rt) + suite.T().Logf("found rt: %v", rt) return !bytes.Equal(rt, originalResumeToken) }, time.Minute, - 50*time.Millisecond, + 500*time.Millisecond, "should see a new change stream resume token persisted", ) @@ -695,10 +700,9 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() { startAtTs = verifier.srcChangeReader.getStartTimestamp() - suite.Assert().Equal( - *postEventsSessionTime, - startAtTs, - "verifier.srcStartAtTs should now be our session timestamp", + suite.Assert().False( + startAtTs.Before(*postEventsSessionTime), + "verifier.srcStartAtTs should now be at least at the session timestamp", ) } @@ -951,7 +955,7 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() { eventErr := UnknownEventError{} suite.Require().ErrorAs(err, &eventErr) - suite.Assert().Equal("create", eventErr.Event.Lookup("operationType").StringValue()) + suite.Assert().Contains(string(eventErr.Event), "create") } func (suite *IntegrationTestSuite) TestTolerateDestinationCollMod() { diff --git a/internal/verifier/check.go b/internal/verifier/check.go index be003ba6..f310b3d0 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -31,6 +31,11 @@ var ( verificationTaskFailed, verificationTaskMetadataMismatch, ) + + ChangeReaderOpts = mslices.Of( + ChangeReaderOptChangeStream, + ChangeReaderOptOplog, + ) ) // Check is the asynchronous entry point to Check, should only be called by the web server. Use @@ -215,9 +220,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // Now that we’ve initialized verifier.generation we can // start the change readers. - verifier.initializeChangeReaders() + err = verifier.initializeChangeReaders() verifier.mux.Unlock() + if err != nil { + return err + } + err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { err = verifier.AddMetaIndexes(ctx) @@ -333,7 +342,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // caught again on the next iteration. if verifier.writesOff { verifier.logger.Debug(). 
- Msg("Waiting for change readers to end.") + Msg("Waiting for change handling to finish.") // It's necessary to wait for the change reader to finish before incrementing the // generation number, or the last changes will not be checked. @@ -610,18 +619,44 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error { } } -func (v *Verifier) initializeChangeReaders() { - v.srcChangeReader = v.newChangeStreamReader( - v.srcNamespaces, - src, - v.srcClient, - *v.srcClusterInfo, - ) +func (v *Verifier) initializeChangeReaders() error { + switch v.srcChangeReaderMethod { + case ChangeReaderOptOplog: + v.srcChangeReader = v.newOplogReader( + v.srcNamespaces, + src, + v.srcClient, + *v.srcClusterInfo, + ) + case ChangeReaderOptChangeStream: + v.srcChangeReader = v.newChangeStreamReader( + v.srcNamespaces, + src, + v.srcClient, + *v.srcClusterInfo, + ) + default: + return fmt.Errorf("bad source change reader: %#q", v.srcChangeReaderMethod) + } - v.dstChangeReader = v.newChangeStreamReader( - v.dstNamespaces, - dst, - v.dstClient, - *v.dstClusterInfo, - ) + switch v.dstChangeReaderMethod { + case ChangeReaderOptOplog: + v.dstChangeReader = v.newOplogReader( + v.dstNamespaces, + dst, + v.dstClient, + *v.dstClusterInfo, + ) + case ChangeReaderOptChangeStream: + v.dstChangeReader = v.newChangeStreamReader( + v.dstNamespaces, + dst, + v.dstClient, + *v.dstClusterInfo, + ) + default: + return fmt.Errorf("bad destination change reader: %#q", v.srcChangeReaderMethod) + } + + return nil } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 678cbf9b..e63adbd3 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -536,6 +536,9 @@ func iterateCursorToChannel( defer close(writer) sess := mongo.SessionFromContext(sctx) + if sess == nil { + panic("need a session") + } for cursor.Next(sctx) { state.NoteSuccess("received a document") @@ -577,7 +580,7 @@ func getMapKey(docKeyValues []bson.RawValue) string { } func (verifier *Verifier) getDocumentsCursor( - ctx context.Context, + sctx context.Context, collection *mongo.Collection, clusterInfo *util.ClusterInfo, readConcernTS bson.Timestamp, @@ -663,7 +666,7 @@ func (verifier *Verifier) getDocumentsCursor( ) } - sess := mongo.SessionFromContext(ctx) + sess := mongo.SessionFromContext(sctx) if sess == nil { panic("No session?!?") @@ -685,6 +688,7 @@ func (verifier *Verifier) getDocumentsCursor( // quite long. if !task.IsRecheck() { if verifier.logger.Trace().Enabled() { + evt := verifier.logger.Trace(). Any("task", task.PrimaryKey) @@ -697,10 +701,15 @@ func (verifier *Verifier) getDocumentsCursor( Str("cmd", string(cmdStr)). Str("options", fmt.Sprintf("%v", *runCommandOptions)). 
Msg("getDocuments command.") + } } - return collection.Database().RunCommandCursor(ctx, cmd, runCommandOptions) + return collection.Database().RunCommandCursor( + sctx, + cmd, + runCommandOptions, + ) } func transformPipelineForToHashedIndexKey( diff --git a/internal/verifier/generation.go b/internal/verifier/generation.go index 9f267d82..0f5f8f52 100644 --- a/internal/verifier/generation.go +++ b/internal/verifier/generation.go @@ -12,13 +12,14 @@ import ( ) const ( - generationCollName = "generation" - generationFieldName = "generation" + generationCollName = "generation" ) type generationDoc struct { - Generation int - MetadataVersion int + Generation int + MetadataVersion int + SourceChangeReaderOpt string + DestinationChangeReaderOpt string } type metadataMismatchErr struct { @@ -32,6 +33,20 @@ func (mme metadataMismatchErr) Error() string { ) } +type changeReaderOptMismatchErr struct { + reader whichCluster + persistedOpt string + currentOpt string +} + +func (crme changeReaderOptMismatchErr) Error() string { + return fmt.Sprintf("new %s change reader opt is %#q, but %#q was used previously; either use the old option, or restart verification", + crme.reader, + crme.currentOpt, + crme.persistedOpt, + ) +} + func (v *Verifier) persistGenerationWhileLocked(ctx context.Context) error { generation, _ := v.getGenerationWhileLocked() @@ -41,8 +56,10 @@ func (v *Verifier) persistGenerationWhileLocked(ctx context.Context) error { ctx, bson.D{}, generationDoc{ - Generation: generation, - MetadataVersion: verifierMetadataVersion, + Generation: generation, + MetadataVersion: verifierMetadataVersion, + SourceChangeReaderOpt: v.srcChangeReaderMethod, + DestinationChangeReaderOpt: v.dstChangeReaderMethod, }, options.Replace().SetUpsert(true), ) @@ -78,7 +95,22 @@ func (v *Verifier) readGeneration(ctx context.Context) (option.Option[int], erro if parsed.MetadataVersion != verifierMetadataVersion { return option.None[int](), metadataMismatchErr{parsed.MetadataVersion} + } + if parsed.SourceChangeReaderOpt != v.srcChangeReaderMethod { + return option.None[int](), changeReaderOptMismatchErr{ + reader: src, + persistedOpt: parsed.SourceChangeReaderOpt, + currentOpt: v.srcChangeReaderMethod, + } + } + + if parsed.DestinationChangeReaderOpt != v.dstChangeReaderMethod { + return option.None[int](), changeReaderOptMismatchErr{ + reader: dst, + persistedOpt: parsed.DestinationChangeReaderOpt, + currentOpt: v.dstChangeReaderMethod, + } } return option.Some(parsed.Generation), nil diff --git a/internal/verifier/integration_test_suite.go b/internal/verifier/integration_test_suite.go index 19aff0ab..df86b3ff 100644 --- a/internal/verifier/integration_test_suite.go +++ b/internal/verifier/integration_test_suite.go @@ -1,6 +1,7 @@ package verifier import ( + "cmp" "context" "os" "strings" @@ -8,6 +9,7 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/util" mapset "github.com/deckarep/golang-set/v2" "github.com/pkg/errors" @@ -117,6 +119,9 @@ func (suite *IntegrationTestSuite) SetupTest() { suite.initialDbNames.Add(dbName) } } + + testutil.KillTransactions(ctx, suite.T(), suite.srcMongoClient) + testutil.KillTransactions(ctx, suite.T(), suite.dstMongoClient) } func (suite *IntegrationTestSuite) TearDownTest() { @@ -191,11 +196,26 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier { "should set metadata connection string", ) 
verifier.SetMetaDBName(metaDBName) - verifier.initializeChangeReaders() + + envSrcChangeReader := cmp.Or( + os.Getenv("MVTEST_SRC_CHANGE_READER"), + ChangeReaderOptChangeStream, + ) + suite.Require().NoError(verifier.SetSrcChangeReaderMethod(envSrcChangeReader)) + + envDstChangeReader := cmp.Or( + os.Getenv("MVTEST_DST_CHANGE_READER"), + ChangeReaderOptChangeStream, + ) + + suite.Require().NoError(verifier.SetDstChangeReaderMethod(envDstChangeReader)) + + suite.Require().NoError(verifier.initializeChangeReaders()) suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx)) suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx)) suite.Require().NoError(verifier.AddMetaIndexes(ctx)) + return verifier } diff --git a/internal/verifier/list_namespaces.go b/internal/verifier/list_namespaces.go index 7608469f..ebb28fa0 100644 --- a/internal/verifier/list_namespaces.go +++ b/internal/verifier/list_namespaces.go @@ -5,6 +5,7 @@ import ( "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/internal/verifier/namespaces" "github.com/10gen/migration-verifier/mmongo" "github.com/10gen/migration-verifier/mslices" "go.mongodb.org/mongo-driver/v2/bson" @@ -12,28 +13,6 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" ) -const ( - // ExcludedSystemCollPrefix is the prefix of system collections, - // which we ignore. - ExcludedSystemCollPrefix = "system." - - // MongoDBInternalDBPrefix is the prefix for MongoDB-internal databases. - // (e.g., Atlas’s availability canary) - MongoDBInternalDBPrefix = "__mdb_internal" -) - -var ( - ExcludedDBPrefixes = mslices.Of( - // mongosync metadata: - "mongosync_internal_", - "mongosync_reserved_", - MongoDBInternalDBPrefix, - ) - - // ExcludedSystemDBs are system databases that are excluded from verification. - ExcludedSystemDBs = []string{"admin", "config", "local"} -) - // ListAllUserNamespaces lists all the user collections on a cluster, // in addition to time-series “system.buckets.*” collections. // @@ -48,7 +27,7 @@ func ListAllUserNamespaces( ) ([]string, error) { excludedDBs := []string{} excludedDBs = append(excludedDBs, additionalExcludedDBs...) - excludedDBs = append(excludedDBs, ExcludedSystemDBs...) + excludedDBs = append(excludedDBs, namespaces.ExcludedSystemDBs...) var excluded []any for _, e := range excludedDBs { @@ -58,7 +37,7 @@ func ListAllUserNamespaces( dbNames, err := client.ListDatabaseNames(ctx, bson.D{ {"$and", []bson.D{ {{"name", bson.D{{"$nin", excluded}}}}, - util.ExcludePrefixesQuery("name", ExcludedDBPrefixes), + util.ExcludePrefixesQuery("name", namespaces.ExcludedDBPrefixes), }}, }) @@ -77,7 +56,7 @@ func ListAllUserNamespaces( {"$or", []bson.D{ util.ExcludePrefixesQuery( "name", - mslices.Of(ExcludedSystemCollPrefix), + mslices.Of(namespaces.ExcludedSystemCollPrefix), ), { {"$expr", mmongo.StartsWithAgg("$name", timeseriesBucketsPrefix)}, diff --git a/internal/verifier/metadata.go b/internal/verifier/metadata.go index 906117c9..310048a6 100644 --- a/internal/verifier/metadata.go +++ b/internal/verifier/metadata.go @@ -6,5 +6,6 @@ package verifier // 3: Enqueued rechecks now reference the generation in which they’ll be // rechecked rather than the generation during which they were enqueued. // 4: Use “changeReader” instead of “changeStream” collection name. +// 5: Metadata now stores source & destination change reader options. 
-const verifierMetadataVersion = 4 +const verifierMetadataVersion = 5 diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..de97f8c3 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -103,6 +103,9 @@ type Verifier struct { srcEventRecorder *EventRecorder dstEventRecorder *EventRecorder + srcChangeReaderMethod string + dstChangeReaderMethod string + changeHandlingErr *util.Eventual[error] // Used only with generation 0 to defer the first @@ -245,7 +248,7 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error { } verifier.writesOff = true - verifier.logger.Debug().Msg("Signalling that writes are done.") + verifier.logger.Debug().Msg("Signaling that writes are done.") srcFinalTs, err = GetNewClusterTime( ctx, @@ -378,6 +381,42 @@ func (verifier *Verifier) SetDocCompareMethod(method DocCompareMethod) { verifier.docCompareMethod = method } +func (verifier *Verifier) SetSrcChangeReaderMethod(method string) error { + err := validateChangeReaderOpt(method, *verifier.srcClusterInfo) + if err != nil { + return errors.Wrap(err, "setting source change reader method") + } + + verifier.srcChangeReaderMethod = method + + return nil +} + +func (verifier *Verifier) SetDstChangeReaderMethod(method string) error { + err := validateChangeReaderOpt(method, *verifier.dstClusterInfo) + if err != nil { + return errors.Wrap(err, "setting source change reader method") + } + + verifier.dstChangeReaderMethod = method + + return nil +} + +func validateChangeReaderOpt( + method string, + clusterInfo util.ClusterInfo, +) error { + switch method { + case ChangeReaderOptOplog: + if clusterInfo.Topology == util.TopologySharded { + return fmt.Errorf("cannot read oplog from sharded cluster") + } + } + + return nil +} + func (verifier *Verifier) SetVerifyAll(arg bool) { verifier.verifyAll = arg } diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index e714491e..f799af29 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -25,6 +25,7 @@ import ( "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/internal/verifier/namespaces" "github.com/10gen/migration-verifier/internal/verifier/recheck" "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/mslices" @@ -696,7 +697,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() err := verifier.PersistChangeEvents( ctx, - changeEventBatch{ + eventBatch{ events: []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll2"}, @@ -712,7 +713,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() err = verifier.PersistChangeEvents( ctx, - changeEventBatch{ + eventBatch{ events: []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll1"}, @@ -981,7 +982,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { }, } - batch := changeEventBatch{ + batch := eventBatch{ events: mslices.Of(event), } @@ -1698,19 +1699,21 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { func (suite *IntegrationTestSuite) TestVerifierDocMismatches() { ctx := suite.Context() + dbName := suite.DBNameForTest() + suite.Require().NoError( suite.srcMongoClient. - Database("test"). 
+ Database(dbName). Collection("coll").Drop(ctx), ) suite.Require().NoError( suite.dstMongoClient. - Database("test"). + Database(dbName). Collection("coll").Drop(ctx), ) _, err := suite.srcMongoClient. - Database("test"). + Database(dbName). Collection("coll"). InsertMany( ctx, @@ -1729,7 +1732,7 @@ func (suite *IntegrationTestSuite) TestVerifierDocMismatches() { // The first has a mismatched `foo` value, // and the 2nd lacks `foo` entirely. _, err = suite.dstMongoClient. - Database("test"). + Database(dbName). Collection("coll"). InsertMany(ctx, lo.ToAnySlice([]bson.D{ {{"_id", 100000}, {"foo", 1}}, @@ -1740,7 +1743,7 @@ func (suite *IntegrationTestSuite) TestVerifierDocMismatches() { verifier := suite.BuildVerifier() verifier.failureDisplaySize = 10 - ns := "test.coll" + ns := dbName + ".coll" verifier.SetSrcNamespaces([]string{ns}) verifier.SetDstNamespaces([]string{ns}) verifier.SetNamespaceMap() @@ -2070,13 +2073,6 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() { srcColl := suite.srcMongoClient.Database(suite.DBNameForTest()).Collection("coll") dstColl := suite.dstMongoClient.Database(suite.DBNameForTest()).Collection("coll") - verifier := suite.BuildVerifier() - - ns := srcColl.Database().Name() + "." + srcColl.Name() - verifier.SetSrcNamespaces([]string{ns}) - verifier.SetDstNamespaces([]string{ns}) - verifier.SetNamespaceMap() - for _, coll := range mslices.Of(srcColl, dstColl) { _, err := coll.InsertOne(ctx, bson.M{"_id": 1, "x": 42}) suite.Require().NoError(err) @@ -2090,6 +2086,13 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() { ) suite.Require().NoError(err) + verifier := suite.BuildVerifier() + + ns := srcColl.Database().Name() + "." + srcColl.Name() + verifier.SetSrcNamespaces([]string{ns}) + verifier.SetDstNamespaces([]string{ns}) + verifier.SetNamespaceMap() + runner := RunVerifierCheck(ctx, suite.T(), verifier) suite.Require().NoError(runner.AwaitGenerationEnd()) @@ -2118,23 +2121,37 @@ func (suite *IntegrationTestSuite) TestMetadataMismatchAndPartitioning() { suite.Require().Equal(verificationTaskVerifyCollection, tasks[1].Type) suite.Require().Equal(verificationTaskMetadataMismatch, tasks[1].Status) - suite.Require().NoError(runner.StartNextGeneration()) - suite.Require().NoError(runner.AwaitGenerationEnd()) + // When tailing the oplog sometimes the verifier starts up “in the past”, + // which can cause extra rechecks that we wouldn’t normally expect. This + // waits for any of those to clear out. 
+ suite.Require().Eventually( + func() bool { + suite.Require().NoError(runner.StartNextGeneration()) + suite.Require().NoError(runner.AwaitGenerationEnd()) - cursor, err = verifier.verificationTaskCollection().Aggregate( - ctx, - append( - mongo.Pipeline{ - bson.D{{"$match", bson.D{{"generation", 1}}}}, - }, - testutil.SortByListAgg("type", sortedTaskTypes)..., - ), - ) - suite.Require().NoError(err) + cursor, err = verifier.verificationTaskCollection().Aggregate( + ctx, + append( + mongo.Pipeline{ + bson.D{{"$match", bson.D{{"generation", verifier.generation}}}}, + }, + testutil.SortByListAgg("type", sortedTaskTypes)..., + ), + ) + suite.Require().NoError(err) - suite.Require().NoError(cursor.All(ctx, &tasks)) + suite.Require().NoError(cursor.All(ctx, &tasks)) + + suite.Require().GreaterOrEqual(len(tasks), 1, "we always expect >=1 task") - suite.Require().Len(tasks, 1, "generation 1 should only have done 1 task") + return len(tasks) == 1 + }, + time.Minute, + time.Millisecond, + "wait until verifier has caught up with itself", + ) + + suite.Require().Len(tasks, 1, "should eventually only have 1 task; tasks=%+v", tasks) suite.Require().Equal(verificationTaskVerifyCollection, tasks[0].Type) suite.Require().Equal(verificationTaskMetadataMismatch, tasks[0].Status) } @@ -2243,7 +2260,7 @@ func (suite *IntegrationTestSuite) TestGenerationalRechecking() { func (suite *IntegrationTestSuite) TestMongoDBInternalDB() { ctx := suite.Context() - dbName := MongoDBInternalDBPrefix + "internalDBTest" + dbName := namespaces.MongoDBInternalDBPrefix + "internalDBTest" _, err := suite.srcMongoClient. Database(dbName). @@ -2283,21 +2300,14 @@ func (suite *IntegrationTestSuite) TestMongoDBInternalDB() { } func (suite *IntegrationTestSuite) TestVerifierWithFilter() { + ctx := suite.Context() + zerolog.SetGlobalLevel(zerolog.DebugLevel) dbname1 := suite.DBNameForTest("1") dbname2 := suite.DBNameForTest("2") filter := bson.D{{"inFilter", bson.M{"$ne": false}}} - verifier := suite.BuildVerifier() - verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"}) - verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"}) - verifier.SetNamespaceMap() - verifier.SetDocCompareMethod(DocCompareIgnoreOrder) - // Set this value low to test the verifier with multiple partitions. - verifier.partitionSizeInBytes = 50 - - ctx := suite.Context() srcColl := suite.srcMongoClient.Database(dbname1).Collection("testColl1") dstColl := suite.dstMongoClient.Database(dbname2).Collection("testColl3") @@ -2320,6 +2330,14 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { _, err = srcColl.InsertMany(ctx, docs) suite.Require().NoError(err) + verifier := suite.BuildVerifier() + verifier.SetSrcNamespaces([]string{dbname1 + ".testColl1"}) + verifier.SetDstNamespaces([]string{dbname2 + ".testColl3"}) + verifier.SetNamespaceMap() + verifier.SetDocCompareMethod(DocCompareIgnoreOrder) + // Set this value low to test the verifier with multiple partitions. + verifier.partitionSizeInBytes = 50 + checkDoneChan := make(chan struct{}) checkContinueChan := make(chan struct{}) go func() { @@ -2348,10 +2366,29 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { // Wait for one generation to finish. <-checkDoneChan status := waitForTasks() + suite.Require().Greater(status.CompletedTasks, 1) suite.Require().Greater(status.TotalTasks, 1) suite.Require().Zero(status.FailedTasks, "there should be no failed tasks") + // When reading the oplog, verifier often sees the “near past”. 
+ // Wait for it to do initial checks before continuing. + suite.Require().Eventually( + func() bool { + suite.T().Logf("Checking whether verifier has caught up to itself …") + + checkContinueChan <- struct{}{} + <-checkDoneChan + status, err = verifier.GetVerificationStatus(ctx) + suite.Require().NoError(err) + + return status.TotalTasks == 0 + }, + time.Minute, + time.Millisecond, + "verifier must reach stasis before continuing", + ) + // Insert another document that is not in the filter. // This should trigger a recheck despite being outside the filter. _, err = srcColl.InsertOne(ctx, bson.M{"_id": 200, "x": 200, "inFilter": false}) @@ -2372,15 +2409,23 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { _, err = srcColl.InsertOne(ctx, bson.M{"_id": 201, "x": 201, "inFilter": true}) suite.Require().NoError(err) - // Tell check to start the next generation. - checkContinueChan <- struct{}{} + suite.Require().Eventually( + func() bool { + suite.T().Log("checking to see if a failure was found yet") - // Wait for one generation to finish. - <-checkDoneChan - status = waitForTasks() + // Tell check to start the next generation. + checkContinueChan <- struct{}{} - // There should be a failure from the src insert of a document in the filter. - suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status) + // Wait for one generation to finish. + <-checkDoneChan + status = waitForTasks() + + return *status == VerificationStatus{TotalTasks: 1, FailedTasks: 1} + }, + time.Minute, + time.Second, + "we should see a failure from the src insert of a document in the filter.", + ) // Now patch up the destination. _, err = dstColl.InsertOne(ctx, bson.M{"_id": 201, "x": 201, "inFilter": true}) @@ -2396,6 +2441,8 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() { // There should be no failures now, since they are equivalent at this point in time. suite.Require().Equal(VerificationStatus{TotalTasks: 1, CompletedTasks: 1}, *status) + suite.T().Log("Finalizing test") + // Turn writes off. suite.Require().NoError(verifier.WritesOff(ctx)) diff --git a/internal/verifier/namespaces/exclude.go b/internal/verifier/namespaces/exclude.go new file mode 100644 index 00000000..d80cc40f --- /dev/null +++ b/internal/verifier/namespaces/exclude.go @@ -0,0 +1,26 @@ +package namespaces + +import ( + "github.com/10gen/migration-verifier/mslices" +) + +const ( + // ExcludedSystemCollPrefix is the prefix of system collections, + // which we ignore. + ExcludedSystemCollPrefix = "system." + + // MongoDBInternalDBPrefix is the prefix for MongoDB-internal databases. + // (e.g., Atlas’s availability canary) + MongoDBInternalDBPrefix = "__mdb_internal" +) + +var ( + ExcludedDBPrefixes = mslices.Of( + "mongosync_internal_", + "mongosync_reserved_", + MongoDBInternalDBPrefix, + ) + + // ExcludedSystemDBs are system databases that are excluded from verification. 
+ ExcludedSystemDBs = []string{"admin", "config", "local"} +) diff --git a/internal/verifier/oplog/oplog.go b/internal/verifier/oplog/oplog.go new file mode 100644 index 00000000..a718e66a --- /dev/null +++ b/internal/verifier/oplog/oplog.go @@ -0,0 +1,174 @@ +package oplog + +import ( + "encoding/binary" + "fmt" + "slices" + + "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" +) + +const ( + rtBSONLength = 4 + 1 + 2 + 1 + 8 + 1 +) + +// Op represents the parts of an oplog entry that we care about. +type Op struct { + // Op holds the oplog entry’s `op`. + Op string + + // TS holds the oplog entry’s `ts`. + TS bson.Timestamp + + // Ns holds the oplog entry’s `ns`. + Ns string + + // CmdName is the first field name in the oplog entry’s `o` document. + CmdName option.Option[string] `bson:",omitempty"` + + // DocLen is the length, in bytes, of whatever document the oplog entry + // describes. This will only be meaningful for insert & replace entries. + DocLen int32 `bson:"docLen"` + + // DocID is the `_id` of whatever document the oplog entry describes. + // This won’t be populated for multi-op Op instances. + DocID bson.RawValue `bson:"docID"` + + // Ops holds the ops in an `applyOps` oplog entry. + Ops []Op `bson:",omitempty"` +} + +func (*Op) UnmarshalBSON([]byte) error { + panic("Use UnmarshalFromBSON.") +} + +// UnmarshalFromBSON unmarshals an Op as transformed by the oplog reader’s +// projection of the oplog. It’s more efficient than the standard +// bson.Unmarshal function. When verifier reads a v4.4+ server, this function +// is called for every oplog entry, so that efficiency is material. 
+func (o *Op) UnmarshalFromBSON(in []byte) error { + for el, err := range mbson.RawElements(bson.Raw(in)) { + if err != nil { + return errors.Wrap(err, "iterating BSON document") + } + + key, err := el.KeyErr() + if err != nil { + return errors.Wrap(err, "reading BSON field name") + } + + switch key { + case "op": + err := mbson.UnmarshalElementValue(el, &o.Op) + if err != nil { + return errors.Wrapf(err, "parsing %#q", key) + } + case "ts": + err := mbson.UnmarshalElementValue(el, &o.TS) + if err != nil { + return errors.Wrapf(err, "parsing %#q", key) + } + case "ns": + err := mbson.UnmarshalElementValue(el, &o.Ns) + if err != nil { + return errors.Wrapf(err, "parsing %#q", key) + } + case "cmdName": + var cmdName string + err := mbson.UnmarshalElementValue(el, &cmdName) + if err != nil { + return errors.Wrapf(err, "parsing %#q", key) + } + o.CmdName = option.Some(cmdName) + case "docLen": + err := mbson.UnmarshalElementValue(el, &o.DocLen) + if err != nil { + return errors.Wrapf(err, "parsing %#q", key) + } + case "docID": + o.DocID, err = el.ValueErr() + if err != nil { + return errors.Wrapf(err, "parsing %#q value", key) + } + o.DocID.Value = slices.Clone(o.DocID.Value) + case "ops": + var arr bson.RawArray + err := errors.Wrapf( + mbson.UnmarshalElementValue(el, &arr), + "parsing ops", + ) + + if err != nil { + return err + } + + vals, err := arr.Values() + if err != nil { + return errors.Wrap(err, "parsing applyOps") + } + + o.Ops = make([]Op, len(vals)) + + for i, val := range vals { + + var opRaw bson.Raw + err := mbson.UnmarshalRawValue(val, &opRaw) + if err != nil { + return errors.Wrapf(err, "parsing applyOps field") + } + + if err := (&o.Ops[i]).UnmarshalFromBSON(opRaw); err != nil { + return errors.Wrapf(err, "parsing applyOps[%d]", i) + } + } + case "o": + // The projection emits `o` only for op=c entries; Op doesn’t need it, + // so skip it here. + default: + return errors.Errorf("unexpected field %#q", key) + } + } + + return nil +} + +// ResumeToken is Migration Verifier’s internal format for storing the +// timestamp to resume an oplog reader. +type ResumeToken struct { + TS bson.Timestamp +} + +func (ResumeToken) MarshalBSON() ([]byte, error) { + panic("Use MarshalToBSON.") +} + +// MarshalToBSON marshals a ResumeToken to BSON. Unlike with the standard +// bson.Marshaler interface, this method never fails. It’s also faster/lighter +// because it avoids reflection, which is relevant because this is called for +// every batch of ops. +func (rt ResumeToken) MarshalToBSON() []byte { + buf := make([]byte, 4, rtBSONLength) + + binary.LittleEndian.PutUint32(buf, uint32(cap(buf))) + + buf = bsoncore.AppendTimestampElement(buf, "ts", rt.TS.T, rt.TS.I) + + buf = append(buf, 0) + + if len(buf) != rtBSONLength { + panic(fmt.Sprintf("bad resume token BSON length: %d", len(buf))) + } + + return buf +} + +// GetRawResumeTokenTimestamp extracts the timestamp from a given oplog entry.
+func GetRawResumeTokenTimestamp(token bson.Raw) (bson.Timestamp, error) { + rv, err := token.LookupErr("ts") + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "getting ts") + } + + return mbson.CastRawValue[bson.Timestamp](rv) +} diff --git a/internal/verifier/oplog/oplog_test.go b/internal/verifier/oplog/oplog_test.go new file mode 100644 index 00000000..8d4b6f03 --- /dev/null +++ b/internal/verifier/oplog/oplog_test.go @@ -0,0 +1,46 @@ +package oplog + +import ( + "testing" + + "github.com/10gen/migration-verifier/mbson" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/v2/bson" +) + +func TestOpUnmarshal(t *testing.T) { + op := Op{ + Op: "hey", + TS: bson.Timestamp{345, 456}, + Ns: "hohqohewoqhwe", + DocLen: 777, + DocID: mbson.ToRawValue("haha"), + } + + raw := bson.Raw(lo.Must(bson.Marshal(op))) + + rt := &Op{} + require.NoError(t, rt.UnmarshalFromBSON(raw)) + + assert.Equal(t, &op, rt, "Op should round-trip BSON (raw: %+v)", raw) +} + +func TestResumeTokenBSON(t *testing.T) { + token := ResumeToken{ + TS: bson.Timestamp{T: 234234, I: 11}, + } + + raw := token.MarshalToBSON() + + ts, err := GetRawResumeTokenTimestamp(raw) + require.NoError(t, err) + + assert.Equal(t, token.TS, ts, "extracted timestamp should match") + + var rt ResumeToken + require.NoError(t, bson.Unmarshal(raw, &rt)) + + assert.Equal(t, token, rt) +} diff --git a/internal/verifier/oplog/start_time.go b/internal/verifier/oplog/start_time.go new file mode 100644 index 00000000..cd2368fe --- /dev/null +++ b/internal/verifier/oplog/start_time.go @@ -0,0 +1,163 @@ +package oplog + +import ( + "context" + "fmt" + + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readconcern" +) + +// GetTailingStartTimes returns the earliest transaction timestamp and the +// timestamp of the latest-visible op in the oplog. +func GetTailingStartTimes( + ctx context.Context, + client *mongo.Client, +) (OpTime, OpTime, error) { + oldestTxn, err := getOldestTransactionTime(ctx, client) + if err != nil { + return OpTime{}, OpTime{}, errors.Wrapf(err, "finding oldest txn") + } + + latestTime, err := getLatestVisibleOplogOpTime(ctx, client) + if err != nil { + return OpTime{}, OpTime{}, errors.Wrapf(err, "finding latest optime") + } + + if oldestTime, has := oldestTxn.Get(); has { + return oldestTime, latestTime, nil + } + + return latestTime, latestTime, nil +} + +type OpTime struct { + TS bson.Timestamp + T int64 + H option.Option[int64] +} + +func (ot OpTime) Equals(ot2 OpTime) bool { + if !ot.TS.Equal(ot2.TS) { + return false + } + + if ot.T != ot2.T { + return false + } + + return ot.H.OrZero() == ot2.H.OrZero() +} + +// GetLatestOplogOpTime returns the optime of the most recent oplog +// record satisfying the given `query` or a zero-value db.OpTime{} if +// no oplog record matches. This method does not ensure that all prior oplog +// entries are visible (i.e. have been storage-committed). +func getLatestOplogOpTime( + ctx context.Context, + client *mongo.Client, +) (OpTime, error) { + var optime OpTime + + opts := options.FindOne(). + SetProjection(bson.M{"ts": 1, "t": 1, "h": 1}). 
+ SetSort(bson.D{{"$natural", -1}}) + + coll := client.Database("local").Collection("oplog.rs") + + res := coll.FindOne(ctx, bson.D{}, opts) + if err := res.Err(); err != nil { + return OpTime{}, err + } + + if err := res.Decode(&optime); err != nil { + return OpTime{}, err + } + return optime, nil +} + +func getLatestVisibleOplogOpTime( + ctx context.Context, + client *mongo.Client, +) (OpTime, error) { + + latestOpTime, err := getLatestOplogOpTime(ctx, client) + if err != nil { + return OpTime{}, err + } + + coll := client.Database("local").Collection("oplog.rs") + + // Do a forward scan starting at the last op fetched to ensure that + // all operations with earlier oplog times have been storage-committed. + result, err := coll.FindOne(ctx, + bson.M{"ts": bson.M{"$gte": latestOpTime.TS}}, + + //nolint SA1019 + options.FindOne().SetOplogReplay(true), + ).Raw() + if err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return OpTime{}, fmt.Errorf( + "last op was not confirmed. last optime: %+v. confirmation time was not found", + latestOpTime, + ) + } + return OpTime{}, err + } + + var optime OpTime + + if err := bson.Unmarshal(result, &optime); err != nil { + return OpTime{}, errors.Wrap(err, "local.oplog.rs error") + } + + if !optime.Equals(latestOpTime) { + return OpTime{}, fmt.Errorf( + "last op was not confirmed. last optime: %+v. confirmation time: %+v", + latestOpTime, + optime, + ) + } + + return latestOpTime, nil +} + +func getOldestTransactionTime( + ctx context.Context, + client *mongo.Client, +) (option.Option[OpTime], error) { + coll := client.Database("config"). + Collection( + "transactions", + options.Collection().SetReadConcern(readconcern.Majority()), + ) + + decoded := struct { + StartOpTime OpTime + }{} + + err := coll.FindOne( + ctx, + bson.D{ + {"state", bson.D{ + {"$in", bson.A{"prepared", "inProgress"}}, + }}, + }, + options.FindOne().SetSort(bson.D{{"startOpTime", 1}}), + ).Decode(&decoded) + + if errors.Is(err, mongo.ErrNoDocuments) { + return option.None[OpTime](), nil + } + + if err != nil { + return option.None[OpTime](), errors.Wrap(err, "config.transactions.findOne") + } + + return option.Some(decoded.StartOpTime), nil +} diff --git a/internal/verifier/oplog_reader.go b/internal/verifier/oplog_reader.go new file mode 100644 index 00000000..80203993 --- /dev/null +++ b/internal/verifier/oplog_reader.go @@ -0,0 +1,716 @@ +package verifier + +import ( + "context" + "fmt" + "strings" + + "github.com/10gen/migration-verifier/agg" + "github.com/10gen/migration-verifier/agg/helpers" + "github.com/10gen/migration-verifier/internal/retry" + "github.com/10gen/migration-verifier/internal/types" + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/internal/verifier/namespaces" + "github.com/10gen/migration-verifier/internal/verifier/oplog" + "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mmongo" + "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/option" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readconcern" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "golang.org/x/exp/slices" +) + +const ( + ChangeReaderOptOplog = "tailOplog" +) + +// OplogReader reads change events via oplog tailing instead of a change stream. 
+// This significantly lightens server load and allows verification of heavier +// workloads than change streams allow. It only works with replica sets. +type OplogReader struct { + ChangeReaderCommon + + curDocs []bson.Raw + scratch []byte + cursor *mongo.Cursor + allowDDLBeforeTS bson.Timestamp +} + +var _ changeReader = &OplogReader{} + +func (v *Verifier) newOplogReader( + namespaces []string, + cluster whichCluster, + client *mongo.Client, + clusterInfo util.ClusterInfo, +) *OplogReader { + common := newChangeReaderCommon(cluster) + common.namespaces = namespaces + common.watcherClient = client + common.clusterInfo = clusterInfo + + common.logger = v.logger + common.metaDB = v.metaClient.Database(v.metaDBName) + + common.resumeTokenTSExtractor = oplog.GetRawResumeTokenTimestamp + + o := &OplogReader{ChangeReaderCommon: common} + + o.createIteratorCb = o.createCursor + o.iterateCb = o.iterateCursor + + return o +} + +func (o *OplogReader) createCursor( + ctx context.Context, + sess *mongo.Session, +) (bson.Timestamp, error) { + savedResumeToken, err := o.loadResumeToken(ctx) + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "loading persisted resume token") + } + + var allowDDLBeforeTS bson.Timestamp + + var startTS bson.Timestamp + + if token, has := savedResumeToken.Get(); has { + var rt oplog.ResumeToken + if err := bson.Unmarshal(token, &rt); err != nil { + return bson.Timestamp{}, errors.Wrap(err, "parsing persisted resume token") + } + + ddlAllowanceResult := o.getMetadataCollection().FindOne( + ctx, + bson.D{ + {"_id", o.ddlAllowanceDocID()}, + }, + ) + + allowanceRaw, err := ddlAllowanceResult.Raw() + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "fetching DDL allowance timestamp") + } + + allowDDLBeforeTS, err = mbson.Lookup[bson.Timestamp](allowanceRaw, "ts") + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "parsing DDL allowance timestamp doc") + } + + startTS = rt.TS + } else { + startOpTime, latestOpTime, err := oplog.GetTailingStartTimes(ctx, o.watcherClient) + if err != nil { + return bson.Timestamp{}, errors.Wrapf(err, "getting start optime from %s", o.readerType) + } + + allowDDLBeforeTS = latestOpTime.TS + + _, err = o.getMetadataCollection().ReplaceOne( + ctx, + bson.D{ + {"_id", o.ddlAllowanceDocID()}, + }, + bson.D{ + {"ts", allowDDLBeforeTS}, + }, + options.Replace().SetUpsert(true), + ) + if err != nil { + return bson.Timestamp{}, errors.Wrapf(err, "persisting DDL-allowance timestamp") + } + + startTS = startOpTime.TS + + err = o.persistResumeToken(ctx, oplog.ResumeToken{startTS}.MarshalToBSON()) + if err != nil { + return bson.Timestamp{}, errors.Wrap(err, "persisting resume token") + } + } + + o.logger.Info(). + Any("startReadTs", startTS). + Any("currentOplogTs", allowDDLBeforeTS). + Msg("Tailing oplog.") + + sctx := mongo.NewSessionContext(ctx, sess) + + findOpts := options.Find(). + SetCursorType(options.TailableAwait) + + if util.ClusterHasBSONSize([2]int(o.clusterInfo.VersionArray)) { + findOpts.SetProjection(o.getExprProjection()) + } + + oplogFilter := bson.D{{"$and", []any{ + bson.D{{"ts", bson.D{{"$gte", startTS}}}}, + + bson.D{{"$expr", agg.Or{ + // plain ops: one write per op + append( + agg.And{agg.In("$op", mslices.Of("d", "i", "u"))}, + o.getNSFilter("$$ROOT")..., + ), + + // op=n is for no-ops, so we stay up-to-date. + agg.Eq{"$op", "n"}, + + // op=c is for applyOps, and also to detect forbidden DDL. 
+ agg.And{ + agg.Eq{"$op", "c"}, + agg.Not{helpers.StringHasPrefix{ + FieldRef: "$ns", + Prefix: "config.", + }}, + }, + }}}, + }}} + + cursor, err := o.watcherClient. + Database("local"). + Collection( + "oplog.rs", + options.Collection().SetReadConcern(readconcern.Majority()), + ). + Find( + sctx, + oplogFilter, + findOpts, + ) + + if err != nil { + return bson.Timestamp{}, errors.Wrapf(err, "opening cursor to tail %s’s oplog", o.readerType) + } + + o.cursor = cursor + o.allowDDLBeforeTS = allowDDLBeforeTS + + return startTS, nil +} + +func (o *OplogReader) getExprProjection() bson.D { + return bson.D{ + {"ts", 1}, + {"op", agg.Cond{ + If: agg.And{ + agg.Eq{"$op", "u"}, + helpers.Exists{"$o._id"}, + }, + Then: "r", + Else: "$op", + }}, + {"ns", 1}, + + {"docLen", getOplogDocLenExpr("$$ROOT")}, + + {"docID", getOplogDocIDExpr("$$ROOT")}, + + {"cmdName", agg.Cond{ + If: agg.Eq{"$op", "c"}, + Then: agg.ArrayElemAt{ + Array: agg.Map{ + Input: bson.D{ + {"$objectToArray", "$o"}, + }, + As: "field", + In: "$$field.k", + }, + Index: 0, + }, + Else: "$$REMOVE", + }}, + + {"o", agg.Cond{ + If: agg.And{ + agg.Eq{"$op", "c"}, + agg.Eq{"missing", agg.Type{"$o.applyOps"}}, + }, + Then: "$o", + Else: "$$REMOVE", + }}, + + {"ops", agg.Cond{ + If: agg.And{ + agg.Eq{"$op", "c"}, + agg.Eq{agg.Type{"$o.applyOps"}, "array"}, + }, + Then: agg.Map{ + Input: agg.Filter{ + Input: "$o.applyOps", + As: "opEntry", + Cond: o.getNSFilter("$$opEntry"), + }, + As: "opEntry", + In: bson.D{ + {"op", "$$opEntry.op"}, + {"ns", "$$opEntry.ns"}, + {"docID", getOplogDocIDExpr("$$opEntry")}, + {"docLen", getOplogDocLenExpr("$$opEntry")}, + }, + }, + Else: "$$REMOVE", + }}, + } +} + +func (o *OplogReader) ddlAllowanceDocID() string { + return string(o.readerType) + "-ddlAllowanceTS" +} + +func (o *OplogReader) iterateCursor( + ctx context.Context, + _ *retry.FuncInfo, + sess *mongo.Session, +) error { + sctx := mongo.NewSessionContext(ctx, sess) + cursor := o.cursor + allowDDLBeforeTS := o.allowDDLBeforeTS + +CursorLoop: + for { + var err error + + select { + case <-sctx.Done(): + return sctx.Err() + case <-o.writesOffTs.Ready(): + o.logger.Debug(). + Stringer("reader", o). + Any("timestamp", o.writesOffTs.Get()). + Msg("Received writes-off timestamp.") + + break CursorLoop + default: + err = o.readAndHandleOneBatch(sctx, cursor, allowDDLBeforeTS) + if err != nil { + return err + } + } + } + + writesOffTS := o.writesOffTs.Get() + + for { + if !o.lastChangeEventTime.Load().OrZero().Before(writesOffTS) { + o.logger.Debug(). + Stringer("reader", o). + Any("lastChangeEventTS", o.lastChangeEventTime.Load()). + Any("writesOffTS", writesOffTS). + Msg("Reached writes-off timestamp.") + + break + } + + err := o.readAndHandleOneBatch(sctx, cursor, allowDDLBeforeTS) + if err != nil { + return err + } + } + + o.running = false + + infoLog := o.logger.Info() + if ts, has := o.lastChangeEventTime.Load().Get(); has { + infoLog = infoLog.Any("lastEventTime", ts) + o.startAtTs = lo.ToPtr(ts) + } else { + infoLog = infoLog.Str("lastEventTime", "none") + } + + infoLog. + Stringer("reader", o). + Msg("Oplog reader is done.") + + return nil +} + +var oplogOpToOperationType = map[string]string{ + "i": "insert", + "r": "replace", // NB: This doesn’t happen in the oplog; we project it. 
+ "u": "update", + "d": "delete", +} + +func (o *OplogReader) readAndHandleOneBatch( + sctx context.Context, + cursor *mongo.Cursor, + allowDDLBeforeTS bson.Timestamp, +) error { + var err error + + o.curDocs = o.curDocs[:0] + o.scratch = o.scratch[:0] + + o.curDocs, o.scratch, err = mmongo.GetBatch(sctx, cursor, o.curDocs, o.scratch) + if err != nil { + return errors.Wrap(err, "reading cursor") + } + + if len(o.curDocs) == 0 { + // If there were no oplog events, then there’s nothing for us to do. + return nil + } + + var latestTS bson.Timestamp + + events := make([]ParsedEvent, 0, len(o.curDocs)) + + if util.ClusterHasBSONSize([2]int(o.clusterInfo.VersionArray)) { + events, latestTS, err = o.parseExprProjectedOps(events, allowDDLBeforeTS) + } else { + events, latestTS, err = o.parseRawOps(events, allowDDLBeforeTS) + } + + if err != nil { + return err + } + + sess := mongo.SessionFromContext(sctx) + resumeToken := oplog.ResumeToken{latestTS}.MarshalToBSON() + + o.updateLag(sess, resumeToken) + + // NB: events can legitimately be empty here because we might only have + // gotten op=n oplog entries, which we just use to advance the reader. + // (Similar to a change stream’s post-batch resume token.) + if len(events) > 0 { + o.batchSizeHistory.Add(len(events)) + } + + select { + case <-sctx.Done(): + return err + case o.eventBatchChan <- eventBatch{ + events: events, + resumeToken: resumeToken, + }: + } + + o.lastChangeEventTime.Store(option.Some(latestTS)) + + return nil +} + +func (o *OplogReader) parseRawOps(events []ParsedEvent, allowDDLBeforeTS bson.Timestamp) ([]ParsedEvent, bson.Timestamp, error) { + var latestTS bson.Timestamp + + nsPrefixesToExclude := o.getExcludedNSPrefixes() + + parseOneDocumentOp := func(opName string, ts bson.Timestamp, rawDoc bson.Raw) error { + nsStr, err := mbson.Lookup[string](rawDoc, "ns") + if err != nil { + return err + } + + // Things we always ignore: + for _, prefix := range nsPrefixesToExclude { + if strings.HasPrefix(nsStr, prefix) { + return nil + } + } + + // Honor namespace filtering: + if len(o.namespaces) > 0 && !slices.Contains(o.namespaces, nsStr) { + return nil + } + + var docID bson.RawValue + var docLength option.Option[types.ByteCount] + var docField string + + switch opName { + case "i": + docField = "o" + case "d": + docID, err = rawDoc.LookupErr("o", "_id") + if err != nil { + return errors.Wrap(err, "extracting o._id from delete") + } + case "u": + _, err := rawDoc.LookupErr("o", "_id") + if err == nil { + // replace, so we have the full doc + docField = "o" + } else if errors.Is(err, bsoncore.ErrElementNotFound) { + docID, err = rawDoc.LookupErr("o2", "_id") + if err != nil { + return errors.Wrap(err, "extracting o2._id from update") + } + } else { + return errors.Wrap(err, "extracting o._id from update") + } + default: + panic(fmt.Sprintf("op=%#q unexpected (%v)", opName, rawDoc)) + } + + if docField != "" { + if opName == "u" { + opName = "r" + } + + doc, err := mbson.Lookup[bson.Raw](rawDoc, docField) + if err != nil { + return errors.Wrap(err, "extracting doc from op") + } + + docLength = option.Some(types.ByteCount(len(doc))) + docID, err = doc.LookupErr("_id") + if err != nil { + return errors.Wrap(err, "extracting doc ID from op") + } + } else { + if docID.IsZero() { + panic("zero doc ID!") + } + } + + docID.Value = slices.Clone(docID.Value) + + events = append( + events, + ParsedEvent{ + OpType: oplogOpToOperationType[opName], + Ns: NewNamespace(SplitNamespace(nsStr)), + DocID: docID, + FullDocLen: docLength, + ClusterTime: 
lo.ToPtr(ts), + }, + ) + + return nil + } + + for _, rawDoc := range o.curDocs { + opName, err := mbson.Lookup[string](rawDoc, "op") + if err != nil { + return nil, bson.Timestamp{}, err + } + + err = mbson.LookupTo(rawDoc, &latestTS, "ts") + if err != nil { + return nil, bson.Timestamp{}, err + } + + switch opName { + case "n": + // Ignore. + case "c": + oDoc, err := mbson.Lookup[bson.Raw](rawDoc, "o") + if err != nil { + return nil, bson.Timestamp{}, err + } + + el, err := oDoc.IndexErr(0) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "getting first el of o doc") + } + + cmdName, err := el.KeyErr() + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "getting first field name of o doc") + } + + if cmdName != "applyOps" { + if o.onDDLEvent == onDDLEventAllow { + o.logIgnoredDDL(rawDoc) + continue + } + + if !latestTS.After(allowDDLBeforeTS) { + o.logger.Info(). + Stringer("event", rawDoc). + Msg("Ignoring unrecognized write from the past.") + + continue + } + + return nil, bson.Timestamp{}, UnknownEventError{rawDoc} + } + + var opsArray bson.Raw + err = mbson.UnmarshalElementValue(el, &opsArray) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "parsing applyOps") + } + + arrayVals, err := opsArray.Values() + if err != nil { + return nil, bson.Timestamp{}, errors.Wrap(err, "getting applyOps values") + } + + // Might as well ... + events = slices.Grow(events, len(arrayVals)) + + for i, opRV := range arrayVals { + opRaw, err := mbson.CastRawValue[bson.Raw](opRV) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "extracting applyOps[%d]", i) + } + + opName, err := mbson.Lookup[string](opRaw, "op") + if err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "extracting applyOps[%d].op", i) + } + + err = parseOneDocumentOp(opName, latestTS, opRaw) + if err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "processing applyOps[%d]", i) + } + } + default: + err := parseOneDocumentOp(opName, latestTS, rawDoc) + if err != nil { + return nil, bson.Timestamp{}, err + } + } + } + + return events, latestTS, nil +} + +func (o *OplogReader) parseExprProjectedOps(events []ParsedEvent, allowDDLBeforeTS bson.Timestamp) ([]ParsedEvent, bson.Timestamp, error) { + + var latestTS bson.Timestamp + + for _, rawDoc := range o.curDocs { + var op oplog.Op + + if err := (&op).UnmarshalFromBSON(rawDoc); err != nil { + return nil, bson.Timestamp{}, errors.Wrapf(err, "reading oplog entry") + } + + latestTS = op.TS + + switch op.Op { + case "n": + // Ignore. + case "c": + cmdName, has := op.CmdName.Get() + if !has { + return nil, bson.Timestamp{}, fmt.Errorf("no cmdname in op=c: %+v", op) + } + + if cmdName != "applyOps" { + if o.onDDLEvent == onDDLEventAllow { + o.logIgnoredDDL(rawDoc) + continue + } + + if !op.TS.After(allowDDLBeforeTS) { + o.logger.Info(). + Stringer("event", rawDoc). 
+ Msg("Ignoring unrecognized write from the past.") + + continue + } + + return nil, bson.Timestamp{}, UnknownEventError{rawDoc} + } + + events = append( + events, + lo.Map( + op.Ops, + func(subOp oplog.Op, _ int) ParsedEvent { + return ParsedEvent{ + OpType: oplogOpToOperationType[subOp.Op], + Ns: NewNamespace(SplitNamespace(subOp.Ns)), + DocID: subOp.DocID, + FullDocLen: option.Some(types.ByteCount(subOp.DocLen)), + ClusterTime: &op.TS, + } + }, + )..., + ) + default: + events = append( + events, + ParsedEvent{ + OpType: oplogOpToOperationType[op.Op], + Ns: NewNamespace(SplitNamespace(op.Ns)), + DocID: op.DocID, + FullDocLen: option.Some(types.ByteCount(op.DocLen)), + ClusterTime: &op.TS, + }, + ) + } + } + + return events, latestTS, nil +} + +func (o *OplogReader) getExcludedNSPrefixes() []string { + return append( + slices.Clone(namespaces.ExcludedDBPrefixes), + + o.metaDB.Name()+".", + "config.", + "admin.", + ) +} + +func (o *OplogReader) getNSFilter(docroot string) agg.And { + filter := agg.And(lo.Map( + o.getExcludedNSPrefixes(), + func(prefix string, _ int) any { + return agg.Not{helpers.StringHasPrefix{ + FieldRef: docroot + ".ns", + Prefix: prefix, + }} + }, + )) + + if len(o.namespaces) > 0 { + filter = append( + filter, + agg.In(docroot+".ns", o.namespaces), + ) + } + + return filter +} + +func getOplogDocLenExpr(docroot string) any { + return agg.Cond{ + If: agg.Or{ + agg.Eq{docroot + ".op", "i"}, + agg.And{ + agg.Eq{docroot + ".op", "u"}, + helpers.Exists{docroot + ".o._id"}, + }, + }, + Then: agg.BSONSize{docroot + ".o"}, + Else: "$$REMOVE", + } +} + +func getOplogDocIDExpr(docroot string) any { + // $switch was new in MongoDB 4.4, so use $cond instead. + return agg.Switch{ + Branches: []agg.SwitchCase{ + { + Case: agg.Eq{docroot + ".op", "c"}, + Then: "$$REMOVE", + }, + { + Case: agg.In(docroot+".op", mslices.Of("i", "d")), + Then: docroot + ".o._id", + }, + { + Case: agg.Eq{docroot + ".op", "u"}, + Then: docroot + ".o2._id", + }, + }, + } +} + +func (o *OplogReader) String() string { + return fmt.Sprintf("%s oplog reader", o.readerType) +} diff --git a/internal/verifier/oplog_reader_test.go b/internal/verifier/oplog_reader_test.go new file mode 100644 index 00000000..26045d38 --- /dev/null +++ b/internal/verifier/oplog_reader_test.go @@ -0,0 +1,310 @@ +package verifier + +import ( + "time" + + "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mslices" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" +) + +// TestOplogReader_SourceDDL verifies that source DDL crashes the oplog reader. 
+func (suite *IntegrationTestSuite) TestOplogReader_SourceDDL() { + ctx := suite.Context() + + verifier := suite.BuildVerifier() + + if suite.GetTopology(verifier.srcClient) == util.TopologySharded { + suite.T().Skipf("oplog mode is only for unsharded clusters") + } + + var reader changeReader = verifier.newOplogReader( + nil, + src, + verifier.srcClient, + *verifier.srcClusterInfo, + ) + + dbName := suite.DBNameForTest() + + coll := verifier.srcClient.Database(dbName).Collection("coll") + + eg, egCtx := contextplus.ErrGroup(ctx) + suite.Require().NoError(reader.start(egCtx, eg)) + lo.Must(coll.InsertOne(ctx, bson.D{{"_id", "hey"}})) + + batchReceiver := reader.getReadChannel() + + timer := time.NewTimer(time.Minute) + + channelOpen := true + for channelOpen { + var batch eventBatch + select { + case <-ctx.Done(): + suite.Require().NoError(ctx.Err()) + case <-timer.C: + suite.Require().Fail("should read batch channel") + case batch, channelOpen = <-batchReceiver: + if channelOpen { + suite.T().Logf("got batch: %+v", batch) + } + } + } + + err := eg.Wait() + suite.Assert().ErrorAs(err, &UnknownEventError{}) + + // Confirm that the error text is wrapped: + suite.Assert().Contains(err.Error(), "reading") +} + +// TestOplogReader_Documents verifies that the oplog reader sees & publishes +// document changes on the source. +func (suite *IntegrationTestSuite) TestOplogReader_Documents() { + ctx := suite.Context() + + verifier := suite.BuildVerifier() + + dbName := suite.DBNameForTest() + + coll := verifier.srcClient.Database(dbName).Collection("coll") + + outFilterColl := verifier.srcClient. + Database(suite.DBNameForTest()). + Collection("coll2") + + lo.Must(coll.InsertOne(ctx, bson.D{{"_id", "hey"}})) + lo.Must(outFilterColl.InsertOne(ctx, bson.D{{"_id", "hey"}})) + + if suite.GetTopology(verifier.srcClient) == util.TopologySharded { + suite.T().Skipf("oplog mode is only for unsharded clusters") + } + + var reader changeReader = verifier.newOplogReader( + mslices.Of(FullName(coll)), + src, + verifier.srcClient, + *verifier.srcClusterInfo, + ) + + batchReceiver := reader.getReadChannel() + + var lastResumeTokenTS bson.Timestamp + + getBatch := func() eventBatch { + batch, isOpen := <-batchReceiver + suite.Require().True(isOpen, "channel should still be open") + + rtTS, err := mbson.Lookup[bson.Timestamp](batch.resumeToken, "ts") + suite.Require().NoError(err) + + lastResumeTokenTS = rtTS + + suite.Require().False( + rtTS.Before(*lo.LastOrEmpty(batch.events).ClusterTime), + "resume token must not predate the last event", + ) + + return batch + } + + eg, egCtx := contextplus.ErrGroup(ctx) + suite.Require().NoError(reader.start(egCtx, eg)) + + // NB: This should be the first event we see because the most + // recent op before this was for an out-filter namespace. 
+ suite.Run( + "insert one", + func() { + + raw := lo.Must(bson.Marshal(bson.D{{"_id", "ho"}})) + lo.Must(coll.InsertOne(ctx, raw)) + batch := getBatch() + event := batch.events[0] + + suite.Assert().Equal( + NewNamespace(dbName, coll.Name()), + event.Ns, + ) + suite.Assert().Equal("insert", event.OpType) + suite.Assert().Equal("ho", lo.Must(mbson.CastRawValue[string](event.DocID))) + suite.Assert().EqualValues(len(raw), event.FullDocLen.MustGet(), "doc length") + }, + ) + + suite.Run( + "update one", + func() { + lo.Must(coll.UpdateOne( + ctx, + bson.D{{"_id", "hey"}}, + bson.D{{"$set", bson.D{{"foo", "bar"}}}}, + )) + + batch := getBatch() + event := batch.events[0] + + suite.Assert().Equal( + NewNamespace(dbName, coll.Name()), + event.Ns, + ) + suite.Assert().Equal("update", event.OpType) + suite.Assert().Equal("hey", lo.Must(mbson.CastRawValue[string](event.DocID))) + }, + ) + + suite.Run( + "replace one", + func() { + raw := lo.Must(bson.Marshal(bson.D{{"_id", "ho"}, {"a", "b"}})) + + lo.Must(coll.ReplaceOne(ctx, bson.D{{"_id", "ho"}}, raw)) + batch := getBatch() + event := batch.events[0] + + suite.Assert().Equal( + NewNamespace(dbName, coll.Name()), + event.Ns, + ) + suite.Assert().Equal("replace", event.OpType) + suite.Assert().Equal("ho", lo.Must(mbson.CastRawValue[string](event.DocID))) + suite.Assert().EqualValues(len(raw), event.FullDocLen.MustGet(), "doc length") + }, + ) + + suite.Run( + "delete one", + func() { + // Now check that the reader understands bulk inserts. + lo.Must(coll.DeleteOne(ctx, bson.D{{"_id", "hey"}})) + batch := getBatch() + event := batch.events[0] + + suite.Assert().Equal( + NewNamespace(dbName, coll.Name()), + event.Ns, + ) + suite.Assert().Equal("delete", event.OpType) + suite.Assert().Equal("hey", lo.Must(mbson.CastRawValue[string](event.DocID))) + }, + ) + + bulkDocs := []bson.D{ + {{"_id", 1.25}}, + {{"_id", 1.5}}, + {{"_id", 1.75}}, + {{"_id", 2.25}}, + } + + suite.Run( + "bulk insert", + func() { + lo.Must(coll.InsertMany(ctx, lo.ToAnySlice(bulkDocs))) + + docLen := len(lo.Must(bson.Marshal(bulkDocs[0]))) + + events := []ParsedEvent{} + + for len(events) < 4 { + batch := getBatch() + events = append(events, batch.events...) + } + + suite.Require().Len(events, 4) + + for i, event := range events { + suite.Assert().Equal("insert", event.OpType) + suite.Assert().EqualValues(docLen, event.FullDocLen.MustGet()) + + suite.Assert().Equal( + bulkDocs[i][0].Value, + lo.Must(mbson.CastRawValue[float64](event.DocID)), + "events[%d].DocID", i, + ) + } + }, + ) + + suite.Run( + "bulk update", + func() { + docIDs := lo.Map( + bulkDocs, + func(d bson.D, _ int) any { + return d[0].Value + }, + ) + + lo.Must(coll.UpdateMany( + ctx, + bson.D{{"_id", bson.D{{"$in", docIDs}}}}, + bson.D{{"$set", bson.D{{"aa", "bb"}}}}, + )) + + events := []ParsedEvent{} + + for len(events) < 4 { + batch := getBatch() + events = append(events, batch.events...) 
+ } + + suite.Require().Len(events, 4) + + for _, event := range events { + suite.Assert().Equal("update", event.OpType) + } + + eventDocIDs := lo.Map( + events, + func(event ParsedEvent, _ int) any { + return lo.Must(mbson.CastRawValue[float64](event.DocID)) + }, + ) + + suite.Assert().ElementsMatch(docIDs, eventDocIDs) + }, + ) + + suite.Run( + "bulk delete", + func() { + docIDs := lo.Map( + bulkDocs, + func(d bson.D, _ int) any { + return d[0].Value + }, + ) + + lo.Must(coll.DeleteMany(ctx, bson.D{{"_id", bson.D{{"$in", docIDs}}}})) + + events := []ParsedEvent{} + + for len(events) < 4 { + batch := getBatch() + events = append(events, batch.events...) + } + + suite.Require().Len(events, 4) + + for _, event := range events { + suite.Assert().Equal("delete", event.OpType) + } + + eventDocIDs := lo.Map( + events, + func(event ParsedEvent, _ int) any { + return lo.Must(mbson.CastRawValue[float64](event.DocID)) + }, + ) + + suite.Assert().ElementsMatch(docIDs, eventDocIDs) + }, + ) + + reader.setWritesOff(lastResumeTokenTS) + suite.Require().NoError(eg.Wait()) +} diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go index 82db17fc..dfb90c7c 100644 --- a/internal/verifier/recheck_persist.go +++ b/internal/verifier/recheck_persist.go @@ -10,11 +10,15 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" ) -type changeEventBatch struct { +type eventBatch struct { events []ParsedEvent resumeToken bson.Raw } +const ( + minResumeTokenPersistInterval = 10 * time.Second +) + // RunChangeEventPersistor persists rechecks from change event batches. // It needs to be started after the reader starts and should run in its own // goroutine. @@ -30,7 +34,7 @@ func (verifier *Verifier) RunChangeEventPersistor( var lastPersistedTime time.Time persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) { - if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval { + if time.Since(lastPersistedTime) >= minResumeTokenPersistInterval { persistErr := persistCallback(ctx, token) if persistErr != nil { verifier.logger.Warn(). @@ -70,7 +74,7 @@ HandlerLoop: err = errors.Wrap( verifier.PersistChangeEvents(ctx, batch, clusterName), - "failed to handle change stream events", + "persisting rechecks for change events", ) if err == nil && batch.resumeToken != nil { @@ -83,19 +87,19 @@ HandlerLoop: } // PersistChangeEvents performs the necessary work for change events after receiving a batch. -func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeEventBatch, eventOrigin whichCluster) error { +func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch eventBatch, eventOrigin whichCluster) error { if len(batch.events) == 0 { return nil } - dbNames := make([]string, len(batch.events)) - collNames := make([]string, len(batch.events)) - docIDs := make([]bson.RawValue, len(batch.events)) - dataSizes := make([]int32, len(batch.events)) + dbNames := make([]string, 0, len(batch.events)) + collNames := make([]string, 0, len(batch.events)) + docIDs := make([]bson.RawValue, 0, len(batch.events)) + dataSizes := make([]int32, 0, len(batch.events)) latestTimestamp := bson.Timestamp{} - for i, changeEvent := range batch.events { + for _, changeEvent := range batch.events { if !supportedEventOpTypes.Contains(changeEvent.OpType) { panic(fmt.Sprintf("Unsupported optype in event; should have failed already! 
event=%+v", changeEvent)) } @@ -123,10 +127,14 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE srcDBName = changeEvent.Ns.DB srcCollName = changeEvent.Ns.Coll } else { + if changeEvent.Ns.DB == verifier.metaDBName { + continue + } + dstNs := fmt.Sprintf("%s.%s", changeEvent.Ns.DB, changeEvent.Ns.Coll) srcNs, exist := verifier.nsMap.GetSrcNamespace(dstNs) if !exist { - return errors.Errorf("no source namespace corresponding to the destination namepsace %s", dstNs) + return errors.Errorf("no source namespace matches the destination namepsace %#q", dstNs) } srcDBName, srcCollName = SplitNamespace(srcNs) } @@ -139,21 +147,24 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE panic(fmt.Sprintf("unknown event origin: %s", eventOrigin)) } - dbNames[i] = srcDBName - collNames[i] = srcCollName - docIDs[i] = changeEvent.DocID + dbNames = append(dbNames, srcDBName) + collNames = append(collNames, srcCollName) + docIDs = append(docIDs, changeEvent.DocID) + var dataSize int32 if changeEvent.FullDocLen.OrZero() > 0 { - dataSizes[i] = int32(changeEvent.FullDocLen.OrZero()) + dataSize = int32(changeEvent.FullDocLen.OrZero()) } else if changeEvent.FullDocument == nil { // This happens for deletes and for some updates. // The document is probably, but not necessarily, deleted. - dataSizes[i] = fauxDocSizeForDeleteEvents + dataSize = defaultUserDocumentSize } else { // This happens for inserts, replaces, and most updates. - dataSizes[i] = int32(len(changeEvent.FullDocument)) + dataSize = int32(len(changeEvent.FullDocument)) } + dataSizes = append(dataSizes, dataSize) + if err := eventRecorder.AddEvent(&changeEvent); err != nil { return errors.Wrapf( err, diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 0da76289..f9e93f86 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -64,7 +64,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { err := verifier.PersistChangeEvents( ctx, - changeEventBatch{events: mslices.Of(event)}, + eventBatch{events: mslices.Of(event)}, src, ) suite.Require().NoError(err) diff --git a/main/migration_verifier.go b/main/migration_verifier.go index aa1c8a65..29a3858b 100644 --- a/main/migration_verifier.go +++ b/main/migration_verifier.go @@ -33,6 +33,8 @@ const ( logPath = "logPath" srcNamespace = "srcNamespace" dstNamespace = "dstNamespace" + srcChangeReader = "srcChangeReader" + dstChangeReader = "dstChangeReader" metaDBName = "metaDBName" docCompareMethod = "docCompareMethod" verifyAll = "verifyAll" @@ -126,6 +128,22 @@ func main() { Name: dstNamespace, Usage: "destination `namespaces` to check", }), + altsrc.NewStringFlag(cli.StringFlag{ + Name: srcChangeReader, + Value: verifier.ChangeReaderOptChangeStream, + Usage: "How to read changes from the source. One of: " + strings.Join( + verifier.ChangeReaderOpts, + ", ", + ), + }), + altsrc.NewStringFlag(cli.StringFlag{ + Name: dstChangeReader, + Value: verifier.ChangeReaderOptChangeStream, + Usage: "How to read changes from the destination. 
One of: " + strings.Join( + verifier.ChangeReaderOpts, + ", ", + ), + }), altsrc.NewStringFlag(cli.StringFlag{ Name: metaDBName, Value: "migration_verification_metadata", @@ -344,9 +362,27 @@ func handleArgs(ctx context.Context, cCtx *cli.Context) (*verifier.Verifier, err } v.SetMetaDBName(cCtx.String(metaDBName)) + srcChangeReaderVal := cCtx.String(srcChangeReader) + if !slices.Contains(verifier.ChangeReaderOpts, srcChangeReaderVal) { + return nil, errors.Errorf("invalid %#q (%s); valid values are: %#q", srcChangeReader, srcChangeReaderVal, verifier.ChangeReaderOpts) + } + err = v.SetSrcChangeReaderMethod(srcChangeReaderVal) + if err != nil { + return nil, err + } + + dstChangeReaderVal := cCtx.String(dstChangeReader) + if !slices.Contains(verifier.ChangeReaderOpts, dstChangeReaderVal) { + return nil, errors.Errorf("invalid %#q (%s); valid values are: %#q", dstChangeReader, dstChangeReaderVal, verifier.ChangeReaderOpts) + } + err = v.SetDstChangeReaderMethod(srcChangeReaderVal) + if err != nil { + return nil, err + } + docCompareMethod := verifier.DocCompareMethod(cCtx.String(docCompareMethod)) if !slices.Contains(verifier.DocCompareMethods, docCompareMethod) { - return nil, errors.Errorf("invalid doc compare method (%s); valid value are: %v", docCompareMethod, verifier.DocCompareMethods) + return nil, errors.Errorf("invalid doc compare method (%s); valid values are: %#q", docCompareMethod, verifier.DocCompareMethods) } v.SetDocCompareMethod(docCompareMethod) diff --git a/mbson/raw_value.go b/mbson/raw_value.go index b0b1c96c..9993232f 100644 --- a/mbson/raw_value.go +++ b/mbson/raw_value.go @@ -9,7 +9,8 @@ import ( ) type bsonCastRecipient interface { - bson.Raw | bson.Timestamp | bson.ObjectID | string | int32 + bson.Raw | bson.RawArray | bson.Timestamp | bson.ObjectID | + string | int32 | float64 } type bsonSourceTypes interface { @@ -36,6 +37,10 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { if doc, isDoc := in.DocumentOK(); isDoc { return any(doc).(T), nil } + case bson.RawArray: + if arr, ok := in.ArrayOK(); ok { + return any(arr).(T), nil + } case bson.Timestamp: if t, i, ok := in.TimestampOK(); ok { return any(bson.Timestamp{t, i}).(T), nil @@ -52,6 +57,10 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { if val, ok := in.Int32OK(); ok { return any(val).(T), nil } + case float64: + if val, ok := in.DoubleOK(); ok { + return any(val).(T), nil + } default: panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonType?)", in)) } diff --git a/mbson/raw_value_test.go b/mbson/raw_value_test.go index 3c35c32c..718b7c1c 100644 --- a/mbson/raw_value_test.go +++ b/mbson/raw_value_test.go @@ -6,6 +6,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/v2/bson" ) @@ -109,6 +110,29 @@ func TestRaw(t *testing.T) { } } +func TestRawArray(t *testing.T) { + vals := lo.Map( + []bson.RawArray{ + lo.Must(bson.Marshal(bson.D{})), + lo.Must(bson.Marshal(bson.D{{"0", nil}})), + lo.Must(bson.Marshal(bson.D{{"0", 1.2}, {"1", "abc"}})), + }, + func(ra bson.RawArray, _ int) bson.RawValue { + return bson.RawValue{ + Type: bson.TypeArray, + Value: []byte(ra), + } + }, + ) + + for _, cur := range vals { + ra, err := CastRawValue[bson.RawArray](cur) + require.NoError(t, err) + + assert.Equal(t, cur.Value, []byte(ra), "expect same bytes") + } +} + func TestTimestamp(t *testing.T) { vals := []bson.Timestamp{ {0, 0}, diff --git a/mmongo/cursor.go b/mmongo/cursor.go new 
file mode 100644 index 00000000..d9a4c7f0 --- /dev/null +++ b/mmongo/cursor.go @@ -0,0 +1,52 @@ +package mmongo + +import ( + "context" + "fmt" + "slices" + + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" +) + +// GetBatch returns a batch of documents from a cursor. It does so by appending +// to passed-in slices, which lets you optimize memory handling. +func GetBatch( + ctx context.Context, + cursor *mongo.Cursor, + docs []bson.Raw, + buffer []byte, +) ([]bson.Raw, []byte, error) { + var docsCount, expectedCount int + + for hasDocs := true; hasDocs; hasDocs = cursor.RemainingBatchLength() > 0 { + got := cursor.TryNext(ctx) + + if cursor.Err() != nil { + return nil, nil, errors.Wrap(cursor.Err(), "cursor iteration failed") + } + + if !got { + if docsCount != 0 { + panic(fmt.Sprintf("Docs batch ended after %d but expected %d", docsCount, expectedCount)) + } + + break + } + + // This ensures we only reallocate once (if at all): + if docsCount == 0 { + expectedCount = 1 + cursor.RemainingBatchLength() + docs = slices.Grow(docs, expectedCount) + } + + docsCount++ + + docPos := len(buffer) + buffer = append(buffer, cursor.Current...) + docs = append(docs, buffer[docPos:]) + } + + return docs, buffer, nil +}
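
A minimal sketch (not part of this patch) of how mmongo.GetBatch is intended to be driven: the caller truncates and re-passes the same docs/buffer slices on every call, which is the pattern OplogReader.readAndHandleOneBatch uses above. The package name, drainCursor, and handle are illustrative, and error handling is abbreviated.

package example

import (
	"context"

	"github.com/10gen/migration-verifier/mmongo"
	"github.com/pkg/errors"
	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
)

// drainCursor reads batch after batch from cursor, handing each raw document
// to handle. The docs and buffer slices are reused across iterations, so
// steady-state reads allocate little or nothing.
func drainCursor(ctx context.Context, cursor *mongo.Cursor, handle func(bson.Raw) error) error {
	var docs []bson.Raw
	var buffer []byte

	for cursor.ID() != 0 || cursor.RemainingBatchLength() > 0 {
		// Truncate rather than reallocate; GetBatch appends to these.
		docs = docs[:0]
		buffer = buffer[:0]

		var err error
		docs, buffer, err = mmongo.GetBatch(ctx, cursor, docs, buffer)
		if err != nil {
			return errors.Wrap(err, "reading batch")
		}

		for _, doc := range docs {
			if err := handle(doc); err != nil {
				return err
			}
		}
	}

	return nil
}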