From 94ee11f6abaa80166b1310998ab1f14f5b113ece Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Fri, 30 Aug 2024 15:53:16 -0600 Subject: [PATCH 01/14] Do not send snapshot field in OP_MSG This field was depecrated in MongoDB 4.0. It's an unrecognized field as of MongoDB 6.0. https://github.com/mongodb/specifications/blob/e9f02f328a93d6a0321694cc30d031079c555e4a/source/crud/crud.md?plain=1#L681 --- Database/MongoDB/Query.hs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index b83e316..7717b78 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1239,7 +1239,7 @@ data Query = Query { skip :: Word32, -- ^ Number of initial matching documents to skip. Default = 0 limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit. Default = 0 sort :: Order, -- ^ Sort results by this order, @[]@ = no sort. Default = @[]@ - snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = @False@ + snapshot :: Bool, -- ^ Deprecated since MongoDB 4.0. If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = @False@ batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0 hint :: Order -- ^ Force MongoDB to use this index, @[]@ = no hint. Default = @[]@ } deriving (Show, Eq) @@ -1520,10 +1520,9 @@ queryRequestOpMsg isExplain Query{..} = do -- the relevant fields to the selector isNotCommand = null $ catMaybes $ map (\l -> look l (selector selection)) (noticeCommands ++ adminCommands) mOrder = if null sort then Nothing else Just ("sort" =: sort) - mSnapshot = if snapshot then Just ("snapshot" =: True) else Nothing mHint = if null hint then Nothing else Just ("hint" =: hint) mExplain = if isExplain then Just ("$explain" =: True) else Nothing - special = catMaybes [mOrder, mSnapshot, mHint, mExplain] + special = catMaybes [mOrder, mHint, mExplain] qProjector = if isNotCommand then ["projection" =: project] else project qSelector = if isNotCommand then c else s where s = selector selection From c55743e4e6d2743baa989ff269501d32843469c8 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Fri, 30 Aug 2024 16:27:54 -0600 Subject: [PATCH 02/14] Remove obsolete explain modifier from OpMsg queries https://github.com/mongodb/specifications/blob/e9f02f328a93d6a0321694cc30d031079c555e4a/source/find_getmore_killcursors_commands.md?plain=1#L439 --- Database/MongoDB/Query.hs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 7717b78..fd9ff8a 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -430,7 +430,7 @@ allCollections = do return $ mapMaybe (\d -> d !? "name") docs else do let q = Query [] (Select ["listCollections" =: (1 :: Int)] "$cmd") [] 0 0 [] False 0 [] - qr <- queryRequestOpMsg False q + qr <- queryRequestOpMsg q dBatch <- liftIO $ requestOpMsg p qr [] db <- thisDatabase nc <- newCursor db "$cmd" 0 dBatch @@ -1303,7 +1303,7 @@ find q@Query{selection, batchSize} = do dBatch <- liftIO $ request pipe [] qr newCursor db (coll selection) batchSize dBatch else do - qr <- queryRequestOpMsg False q + qr <- queryRequestOpMsg q let newQr = case fst qr of Req P.Query{..} -> @@ -1365,7 +1365,7 @@ findOne q = do if (maxWireVersion sd < 17) then legacyQuery else do - qr <- queryRequestOpMsg False q {limit = 1} + qr <- queryRequestOpMsg q {limit = 1} let newQr = case fst qr of Req P.Query{..} -> @@ -1504,9 +1504,9 @@ queryRequest isExplain Query{..} = do special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -queryRequestOpMsg :: (Monad m, MonadIO m) => Bool -> Query -> Action m (Cmd, Maybe Limit) +queryRequestOpMsg :: (Monad m, MonadIO m) => Query -> Action m (Cmd, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. -queryRequestOpMsg isExplain Query{..} = do +queryRequestOpMsg Query{..} = do ctx <- ask return $ queryRequest' (mongoReadMode ctx) (mongoDatabase ctx) where @@ -1521,8 +1521,7 @@ queryRequestOpMsg isExplain Query{..} = do isNotCommand = null $ catMaybes $ map (\l -> look l (selector selection)) (noticeCommands ++ adminCommands) mOrder = if null sort then Nothing else Just ("sort" =: sort) mHint = if null hint then Nothing else Just ("hint" =: hint) - mExplain = if isExplain then Just ("$explain" =: True) else Nothing - special = catMaybes [mOrder, mHint, mExplain] + special = catMaybes [mOrder, mHint] qProjector = if isNotCommand then ["projection" =: project] else project qSelector = if isNotCommand then c else s where s = selector selection @@ -1737,7 +1736,7 @@ aggregateCursor aColl agg cfg = do >>= either (liftIO . throwIO . AggregateFailure) return else do let q = select (aggregateCommand aColl agg cfg) aColl - qr <- queryRequestOpMsg False q + qr <- queryRequestOpMsg q dBatch <- liftIO $ requestOpMsg pipe qr [] db <- thisDatabase Right <$> newCursor db aColl 0 dBatch From 79dc67912d172fa781b4ae4291ad6fa94d78ffee Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Thu, 5 Sep 2024 08:47:08 -0600 Subject: [PATCH 03/14] Correct delete behavior for OP_MSG protocol --- Database/MongoDB/Query.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index fd9ff8a..25b5bff 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1001,7 +1001,7 @@ delete s = do let sd = P.serverData pipe if maxWireVersion sd < 17 then deleteHelper [] s - else deleteMany (coll s) [([], [])] >> return () + else void $ deleteMany (coll s) [(selector s, [])] deleteOne :: (MonadIO m) => Selection -> Action m () From 95d3507ab448e7bb90b2e81991271ca14785cbbf Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Thu, 5 Sep 2024 10:19:54 -0600 Subject: [PATCH 04/14] Respect update write concerns for OP_MSG protocol --- Database/MongoDB/Query.hs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 25b5bff..bcfcf6c 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -731,12 +731,16 @@ update opts (Select sel col) up = do ctx <- ask liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx else do + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> params liftIOE ConnectionFailure $ P.sendOpMsg pipe [Nc (Update (db <.> col) opts sel up)] (Just P.MoreToCome) - ["writeConcern" =: ["w" =: (0 :: Int32)]] + writeConcern updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document updateCommandDocument col ordered updates writeConcern = From 93a14ea5e096fb6d58506276dafb96e585c2466f Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Tue, 3 Sep 2024 10:51:46 -0600 Subject: [PATCH 05/14] Distinguish Query Hint and Order field types --- Database/MongoDB/Query.hs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index bcfcf6c..2930264 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1245,7 +1245,7 @@ data Query = Query { sort :: Order, -- ^ Sort results by this order, @[]@ = no sort. Default = @[]@ snapshot :: Bool, -- ^ Deprecated since MongoDB 4.0. If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = @False@ batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0 - hint :: Order -- ^ Force MongoDB to use this index, @[]@ = no hint. Default = @[]@ + hint :: Hint -- ^ Force MongoDB to use this index, @[]@ = no hint. Default = @[]@ } deriving (Show, Eq) type Projector = Document @@ -1257,6 +1257,11 @@ type Limit = Word32 type Order = Document -- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[\"x\" =: 1, \"y\" =: -1]@ means sort by @x@ ascending then @y@ descending +-- The current specification allows this to be passed as a string or a document. For backwards compatibility and simplicity, I'm leaving this unchanged to continue being required as a document. +-- https://github.com/mongodb/specifications/blob/e651ebe819743f1cffae938f5488c69d5277c21f/source/crud/crud.md?plain=1#L1953 +type Hint = Document +-- ^ Index specification. If specified, then the query system will only consider plans using the hinted index. Specify the index name as a key pattern, e.g. { age: 1 } + type BatchSize = Word32 -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. From 2952292b96c3fcda9606e64b4062a4cc762d8e78 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Tue, 3 Sep 2024 15:49:57 -0600 Subject: [PATCH 06/14] Refactor the Legacy breaking Mongo wire version number --- Database/MongoDB/Query.hs | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 2930264..5080398 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -133,9 +133,8 @@ import Database.MongoDB.Internal.Protocol pwKey, FlagBit (..) ) -import Control.Monad.Trans.Except import qualified Database.MongoDB.Internal.Protocol as P -import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>), splitDot) +import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>)) import System.Mem.Weak (Weak) import Text.Read (readMaybe) import Prelude hiding (lookup) @@ -205,6 +204,12 @@ data Upserted = Upserted , upsertedId :: ObjectId } deriving Show +-- MongoDB 5.1 Removed support OP_QUERY, and other CRUD specific OpCodes. +-- OP_MSG is go forward. +-- https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#legacy-opcodes +opMsgOnlyWireVersion :: Int +opMsgOnlyWireVersion = 17 + master :: AccessMode -- ^ Same as 'ConfirmWrites' [] master = ConfirmWrites [] @@ -412,7 +417,7 @@ allCollections = do docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]} (return . filter (not . isSpecial db)) (map (dropDbPrefix . at "name") docs) else - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do r <- runCommand1 "listCollections" let curData = do @@ -594,7 +599,7 @@ insertBlock opts col (prevCount, docs) = do case errorMessage of Just failure -> return $ Left failure Nothing -> return $ Right $ map (valueAt "_id") docs - else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + else if maxWireVersion sd == 2 then do mode <- asks mongoWriteMode let writeConcern = case mode of NoConfirm -> ["w" =: (0 :: Int32)] @@ -726,7 +731,7 @@ update opts (Select sel col) up = do pipe <- asks mongoPipe db <- thisDatabase let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do ctx <- ask liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx @@ -847,7 +852,7 @@ updateBlock ordered col (prevCount, docs) = do let sd = P.serverData p if maxWireVersion sd < 2 then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6" - else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + else if maxWireVersion sd == 2 then do mode <- asks mongoWriteMode let writeConcern = case mode of NoConfirm -> ["w" =: (0 :: Int32)] @@ -1003,7 +1008,7 @@ delete :: (MonadIO m) delete s = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then deleteHelper [] s else void $ deleteMany (coll s) [(selector s, [])] @@ -1013,7 +1018,7 @@ deleteOne :: (MonadIO m) deleteOne sel@((Select sel' col)) = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then deleteHelper [SingleRemove] sel else do -- Starting with v6 confirming writes via getLastError as it is @@ -1117,7 +1122,7 @@ deleteBlock ordered col (prevCount, docs) = do let sd = P.serverData p if maxWireVersion sd < 2 then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6" - else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + else if maxWireVersion sd == 2 then do mode <- asks mongoWriteMode let writeConcern = case mode of NoConfirm -> ["w" =: (0 :: Int32)] @@ -1306,7 +1311,7 @@ find q@Query{selection, batchSize} = do pipe <- asks mongoPipe db <- thisDatabase let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do qr <- queryRequest False q dBatch <- liftIO $ request pipe [] qr @@ -1328,7 +1333,7 @@ findCommand :: (MonadIO m) => Query -> Action m Cursor findCommand q@Query{..} = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do let aColl = coll selection response <- runCommand $ @@ -1371,7 +1376,7 @@ findOne q = do then legacyQuery else do let sd = P.serverData pipe - if (maxWireVersion sd < 17) + if maxWireVersion sd < opMsgOnlyWireVersion then legacyQuery else do qr <- queryRequestOpMsg q {limit = 1} @@ -1648,7 +1653,7 @@ nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> Curso nextBatch' fcol batchSize limit cid = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid, remLimit) [] where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit @@ -1674,7 +1679,7 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where when (newLimit == Just 0) $ unless (cid == 0) $ do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (dBatch', Just doc) @@ -1738,7 +1743,7 @@ aggregateCursor :: (MonadIO m) => Collection -> Pipeline -> AggregateConfig -> A aggregateCursor aColl agg cfg = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do response <- runCommand (aggregateCommand aColl agg cfg) getCursorFromResponse aColl response @@ -1889,7 +1894,7 @@ type Command = Document runCommand :: (MonadIO m) => Command -> Action m Document runCommand params = do pipe <- asks mongoPipe - if isHandshake params || maxWireVersion (P.serverData pipe) < 17 + if isHandshake params || maxWireVersion (P.serverData pipe) < opMsgOnlyWireVersion then runCommandLegacy pipe params else runCommand' pipe params From 070cdd864516ecd65066f9bd7059031674572a71 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Mon, 26 Aug 2024 13:45:42 -0600 Subject: [PATCH 07/14] Send OP_MSG query request options The readPreference field is required to read from replicaSet secondaries. --- Database/MongoDB/Internal/Protocol.hs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index dbf7630..b7b3ba9 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -74,7 +74,7 @@ import qualified Database.MongoDB.Transport as Tr #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, mkWeakMVar, isEmptyMVar) -import GHC.List (foldl1') +import GHC.List (foldl', foldl1') import Conduit (repeatWhileMC, (.|), runConduit, foldlC) #else import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, @@ -570,7 +570,7 @@ putOpMsg cmd requestId flagBit params = do Query{..} -> do let n = T.splitOn "." qFullCollection db = head n - sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector] + sec0 = foldl1' merge [optionsToDoc qOptions, qProjector, [ "$db" =: db ], qSelector] putInt32 biT putInt8 0 putDocument sec0 @@ -721,6 +721,21 @@ qBit Database.MongoDB.Internal.Protocol.Partial = bit 7 qBits :: [QueryOption] -> Int32 qBits = bitOr . map qBit +optionToDoc :: QueryOption -> Document +optionToDoc qOpt = + case qOpt of + TailableCursor -> ["tailable" =: True] + NoCursorTimeout -> ["noCursorTimeout" =: True] + AwaitData -> ["awaitData" =: True] + Partial -> ["allowPartialResults" =: True] + -- "secondaryPreferred" matches the replicaSet host selection critieria of secondaryOk() + -- https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md + -- https://www.mongodb.com/docs/manual/core/read-preference/#mongodb-readmode-secondaryPreferred + SlaveOK -> ["$readPreference" =: ["mode" =: ("secondaryPreferred" :: Text)]] + +optionsToDoc :: [QueryOption] -> Document +optionsToDoc = foldl' merge [] . map optionToDoc + -- ** Reply -- | A reply is a message received in response to a 'Request' From 3481393ed59ea28510b874cac257f56ae9484549 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Wed, 11 Sep 2024 14:15:49 -0600 Subject: [PATCH 08/14] Upgrade mongoDB test image to 5.0 plus QOL testing improvements --- docker-compose.yml | 7 ++++--- mongoDB.cabal | 1 + test/Main.hs | 1 - 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f79c6ee..6391f27 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,16 +1,17 @@ -version: '3' services: mongodb: ports: - 27017:27017 - image: mongo:4.0 + image: mongo:5.0 mongodb-haskell: - image: phadej/ghc:8.0.2 + image: phadej/ghc:8.10.4-focal environment: - HASKELL_MONGODB_TEST_HOST=mongodb + - MONGO_VERSION=mongo:5.0 entrypoint: - /bin/bash volumes: - ./:/opt/mongodb-haskell + working_dir: /opt/mongodb-haskell # vim: ts=2 et sw=2 ai diff --git a/mongoDB.cabal b/mongoDB.cabal index f78a68f..74ab503 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -102,6 +102,7 @@ test-suite test , old-locale , text , time + build-tool-depends: hspec-discover:hspec-discover default-language: Haskell2010 default-extensions: OverloadedStrings diff --git a/test/Main.hs b/test/Main.hs index 267f7ee..9b13867 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -6,7 +6,6 @@ import Data.Maybe (isJust) import qualified Spec import System.Environment (getEnv, lookupEnv) import Test.Hspec.Runner -import TestImport main :: IO () main = do From 26862d3917114ce84f4d2f3773e84a448882eb51 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Wed, 18 Sep 2024 07:00:51 -0600 Subject: [PATCH 09/14] Send read preference argument in OP_MSG requests --- Database/MongoDB/Internal/Protocol.hs | 49 +++++++++++++++++++++------ Database/MongoDB/Query.hs | 21 +++++++++--- 2 files changed, 56 insertions(+), 14 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index b7b3ba9..6dd7c5d 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -18,7 +18,9 @@ #endif module Database.MongoDB.Internal.Protocol ( + -- * Global command arguments FullCollection, + ReadPreferenceMode(..), setReadPreferenceMode, -- * Pipe Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg, -- ** Notice @@ -570,21 +572,23 @@ putOpMsg cmd requestId flagBit params = do Query{..} -> do let n = T.splitOn "." qFullCollection db = head n - sec0 = foldl1' merge [optionsToDoc qOptions, qProjector, [ "$db" =: db ], qSelector] + sec0 = foldl1' merge [qReadPreference, optionsToDoc qOptions, qProjector, [ "$db" =: db ], qSelector] putInt32 biT putInt8 0 putDocument sec0 GetMore{..} -> do let n = T.splitOn "." gFullCollection (db, coll) = (head n, last n) - pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize] + pre = merge + gReadPreference + ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize] putInt32 (bit $ bitOpMsg $ ExhaustAllowed) putInt8 0 putDocument pre Message{..} -> do putInt32 biT putInt8 0 - putDocument $ merge [ "$db" =: mDatabase ] mParams + putDocument $ foldl1' merge [mReadPreference, [ "$db" =: mDatabase ], mParams] Kc k -> case k of KillC{..} -> do let n = T.splitOn "." kFullCollection @@ -649,6 +653,8 @@ bitOpMsg ExhaustAllowed = 16 -- ** Request -- | A request is a message that is sent with a 'Reply' expected in return +-- Read preference is a global argument required for requests to secondaries using the OP_MSG protocol. +-- https://github.com/mongodb/specifications/blob/ffa75b41736f669c754dff9c0a0d60bb70eb319f/source/server-selection/server-selection.md?plain=1#L503 data Request = Query { qOptions :: [QueryOption], @@ -656,14 +662,17 @@ data Request = qSkip :: Int32, -- ^ Number of initial matching documents to skip qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size. qSelector :: Document, -- ^ @[]@ = return all documents in collection - qProjector :: Document -- ^ @[]@ = return whole document + qProjector :: Document, -- ^ @[]@ = return whole document + qReadPreference :: Document } | GetMore { gFullCollection :: FullCollection, gBatchSize :: Int32, - gCursorId :: CursorId + gCursorId :: CursorId, + gReadPreference :: Document } | Message { mDatabase :: Text, - mParams :: Document + mParams :: Document, + mReadPreference :: Document } deriving (Show, Eq) @@ -679,6 +688,15 @@ data QueryOption = | Partial -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error. deriving (Show, Eq) +-- https://www.mongodb.com/docs/manual/core/read-preference/ +data ReadPreferenceMode + = Primary -- Default mode. + | Secondary + | PrimaryPreferred + | SecondaryPreferred -- Corresponds to slaveOk access mode. + | Nearest + deriving (Show, Eq) + -- *** Binary format qOpcode :: Request -> Opcode @@ -728,14 +746,25 @@ optionToDoc qOpt = NoCursorTimeout -> ["noCursorTimeout" =: True] AwaitData -> ["awaitData" =: True] Partial -> ["allowPartialResults" =: True] - -- "secondaryPreferred" matches the replicaSet host selection critieria of secondaryOk() - -- https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md - -- https://www.mongodb.com/docs/manual/core/read-preference/#mongodb-readmode-secondaryPreferred - SlaveOK -> ["$readPreference" =: ["mode" =: ("secondaryPreferred" :: Text)]] + SlaveOK -> [] optionsToDoc :: [QueryOption] -> Document optionsToDoc = foldl' merge [] . map optionToDoc +readPreferenceModeToText :: ReadPreferenceMode -> Text +readPreferenceModeToText mode = + case mode of + Primary -> "primary" + Secondary -> "secondary" + PrimaryPreferred -> "primaryPreferred" + SecondaryPreferred -> "secondaryPreferred" + Nearest -> "nearest" + +setReadPreferenceMode :: ReadPreferenceMode -> Document +setReadPreferenceMode mode = + ["mode" =: (readPreferenceModeToText mode)] + + -- ** Reply -- | A reply is a message received in response to a 'Request' diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 5080398..13eb9c7 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -226,6 +226,15 @@ readMode :: AccessMode -> ReadMode readMode ReadStaleOk = StaleOk readMode _ = Fresh +-- https://github.com/mongodb/specifications/blob/ffa75b41736f669c754dff9c0a0d60bb70eb319f/source/server-selection/server-selection.md?plain=1#L548 +readPreferenceDoc :: ReadMode -> Document +readPreferenceDoc mode = + case mode of + -- "secondaryPreferred" matches the replicaSet host selection critieria of secondaryOk() + -- https://www.mongodb.com/docs/manual/core/read-preference/#mongodb-readmode-secondaryPreferred + StaleOk -> ["$readPreference" =: P.setReadPreferenceMode P.SecondaryPreferred] + _ -> [] + writeMode :: AccessMode -> WriteMode writeMode ReadStaleOk = Confirm [] writeMode UnconfirmedWrites = NoConfirm @@ -1517,6 +1526,7 @@ queryRequest isExplain Query{..} = do mExplain = if isExplain then Just ("$explain" =: True) else Nothing special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection + qReadPreference = [] queryRequestOpMsg :: (Monad m, MonadIO m) => Query -> Action m (Cmd, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. @@ -1525,7 +1535,7 @@ queryRequestOpMsg Query{..} = do return $ queryRequest' (mongoReadMode ctx) (mongoDatabase ctx) where queryRequest' rm db = (Req P.Query{..}, remainingLimit) where - qOptions = readModeOption rm ++ options + qOptions = options qFullCollection = db <.> coll selection qSkip = fromIntegral skip (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit) @@ -1542,6 +1552,7 @@ queryRequestOpMsg Query{..} = do bSize = if qBatchSize == 0 then Nothing else Just ("batchSize" =: qBatchSize) mLimit = if limit == 0 then Nothing else maybe Nothing (\rL -> Just ("limit" =: (fromIntegral rL :: Int32))) remainingLimit c = ("filter" =: s) : special ++ maybeToList bSize ++ maybeToList mLimit + qReadPreference = readPreferenceDoc rm batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit @@ -1652,10 +1663,11 @@ fulfill' fcol batchSize dBatch = do nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch nextBatch' fcol batchSize limit cid = do pipe <- asks mongoPipe + mode <- asks mongoReadMode let sd = P.serverData pipe if maxWireVersion sd < opMsgOnlyWireVersion - then liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) - else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid, remLimit) [] + then liftIO $ request pipe [] (GetMore fcol batchSize' cid [], remLimit) + else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid (readPreferenceDoc mode), remLimit) [] where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit next :: MonadIO m => Cursor -> Action m (Maybe Document) @@ -1910,7 +1922,8 @@ runCommandLegacy pipe params = do runCommand' :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document runCommand' pipe params = do ctx <- ask - rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params), Just 1) [] + let readPreference = readPreferenceDoc $ mongoReadMode ctx + rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params readPreference), Just 1) [] Batch _ _ docs <- liftDB $ fulfill rq case docs of [doc] -> pure doc From c083ead0a0139278b6e77cb8ac4b94cb7d7ea7f6 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Wed, 23 Oct 2024 09:27:22 -0600 Subject: [PATCH 10/14] Use OP_MSG to kill cursors --- Database/MongoDB/Query.hs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 13eb9c7..147220e 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1648,7 +1648,10 @@ nextBatch (Cursor fcol batchSize var) = liftDB $ modifyMVar var $ \dBatch -> do (0, _) -> return (emptyBatch, resultDocs) (_, Just 0) -> do pipe <- asks mongoPipe - liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + let sd = P.serverData pipe + if maxWireVersion sd < opMsgOnlyWireVersion + then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (emptyBatch, resultDocs) (_, _) -> (, resultDocs) <$> getNextBatch @@ -1710,11 +1713,14 @@ rest :: MonadIO m => Cursor -> Action m [Document] rest c = loop (next c) closeCursor :: MonadIO m => Cursor -> Action m () -closeCursor (Cursor _ _ var) = liftDB $ modifyMVar var $ \dBatch -> do +closeCursor (Cursor fcol _ var) = liftDB $ modifyMVar var $ \dBatch -> do Batch _ cid _ <- fulfill dBatch unless (cid == 0) $ do pipe <- asks mongoPipe - liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + let sd = P.serverData pipe + if maxWireVersion sd < opMsgOnlyWireVersion + then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (return $ Batch (Just 0) 0 [], ()) isCursorClosed :: MonadIO m => Cursor -> Action m Bool From 5825c90c66ae45fbbfbe36dabb432f697569b224 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Tue, 19 Nov 2024 10:20:24 -0700 Subject: [PATCH 11/14] Fix request and response checking in MoreToCome processing Addresses https://github.com/mongodb-haskell/mongodb/issues/153 --- Database/MongoDB/Internal/Protocol.hs | 47 +++++++++++++-------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 6dd7c5d..f38cfcd 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -287,9 +287,11 @@ callOpMsg pipe request flagBit params = do (rt, r) -> case r of ReplyOpMsg{..} -> - if flagBits == [MoreToCome] - then yieldResponses .| foldlC mergeResponses p - else return $ (rt, check reqId p) + if rt /= reqId then + error $ "expected response id (" ++ show rt ++ ") to match request id (" ++ show reqId ++ ")" + else + if flagBits == [MoreToCome] then yieldResponses .| foldlC mergeResponses p + else return (rt, r) _ -> error "Impossible" -- see comment above yieldResponses = repeatWhileMC (do @@ -298,27 +300,24 @@ callOpMsg pipe request flagBit params = do readMVar var >>= either throwIO return :: IO Response ) checkFlagBit - mergeResponses p@(rt,rep) p' = - case (p, p') of - ((_, r), (_, r')) -> - case (r, r') of - (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do - let (section, section') = (head sec, head sec') - (cur, cur') = (maybe Nothing cast $ look "cursor" section, - maybe Nothing cast $ look "cursor" section') - case (cur, cur') of - (Just doc, Just doc') -> do - let (docs, docs') = - ( fromJust $ cast $ valueAt "nextBatch" doc :: [Document] - , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document]) - id' = fromJust $ cast $ valueAt "id" doc' :: Int32 - (rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++) - -- Since we use this to process moreToCome messages, we - -- know that there will be a nextBatch key in the document - _ -> error "Impossible" - _ -> error "Impossible" -- see comment above - check requestId (responseTo, reply) = if requestId == responseTo then reply else - error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" + mergeResponses (rt, rep) (_, rep') = + case (rep, rep') of + (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do + let (section, section') = (head sec, head sec') + (cur, cur') = ( cast =<< look "cursor" section + , cast =<< look "cursor" section' + ) + case (cur, cur') of + (Just doc, Just doc') -> do + let (docs, docs') = + ( fromJust $ cast $ valueAt "nextBatch" doc :: [Document] + , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] + ) + (rt, rep{ sections = docs' ++ docs }) -- todo: avoid (++) + -- Since we use this to process moreToCome messages, we + -- know that there will be a nextBatch key in the document + _ -> error "Impossible" + _ -> error "Impossible" -- see comment above -- * Message From c84a9cd11e4fac157286808545d6fca1dfa4ba8a Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Tue, 19 Nov 2024 12:40:01 -0700 Subject: [PATCH 12/14] Add special case to deal with first GetMore response When a batchSize is specified on a GetMore request to MongoDB its first response with the first batch of docs is returned at the top level instead of the expected cursor structure. It sometimes occurs even without the presence of a batchSize parameter in the request. --- Database/MongoDB/Internal/Protocol.hs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index f38cfcd..8dcfe6a 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -303,6 +303,8 @@ callOpMsg pipe request flagBit params = do mergeResponses (rt, rep) (_, rep') = case (rep, rep') of (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do + -- Look for the nested data in the GetMore responses + -- https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands/find_getmore_killcursors_commands.md#getmore let (section, section') = (head sec, head sec') (cur, cur') = ( cast =<< look "cursor" section , cast =<< look "cursor" section' @@ -314,6 +316,13 @@ callOpMsg pipe request flagBit params = do , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] ) (rt, rep{ sections = docs' ++ docs }) -- todo: avoid (++) + -- Extra case for the processed first batch + (Nothing, Just doc') -> do + let (docs, docs') = + ( sec -- already processed + , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] + ) + (rt, rep{ sections = docs' ++ docs }) -- todo: avoid (++) -- Since we use this to process moreToCome messages, we -- know that there will be a nextBatch key in the document _ -> error "Impossible" From 0d754b3527592858f7b06a462314cd5481eb2630 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Tue, 19 Nov 2024 14:06:52 -0700 Subject: [PATCH 13/14] Replace buggy conduit streaming with monad loop Re-write the MoreToCome response handling logic to use monadic loops instead of conduit streaming to yield GetMore responses. The conduit logic ends up swallowing responses into the abyss instead of producing and consolidating them as intended. --- Database/MongoDB/Internal/Protocol.hs | 67 ++++++++++++--------------- 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 8dcfe6a..35046f5 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -77,7 +77,6 @@ import qualified Database.MongoDB.Transport as Tr import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, mkWeakMVar, isEmptyMVar) import GHC.List (foldl', foldl1') -import Conduit (repeatWhileMC, (.|), runConduit, foldlC) #else import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, addMVarFinalizer) @@ -265,42 +264,34 @@ callOpMsg pipe request flagBit params = do requestId <- genRequestId promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params promise' <- promise :: IO Response - return $ snd <$> produce requestId promise' + return $ produce requestId promise' where - -- We need to perform streaming here as within the OP_MSG protocol mongoDB expects - -- our client to keep receiving messages after the MoreToCome flagbit was - -- set by the server until our client receives an empty flagbit. After the - -- first MoreToCome flagbit was set the responseTo field in the following - -- headers will reference the cursorId that was set in the previous message. - -- see: - -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses - checkFlagBit p = - case p of - (_, r) -> - case r of - ReplyOpMsg{..} -> flagBits == [MoreToCome] - -- This is called by functions using the OP_MSG protocol, - -- so this has to be ReplyOpMsg - _ -> error "Impossible" - produce reqId p = runConduit $ - case p of - (rt, r) -> - case r of - ReplyOpMsg{..} -> - if rt /= reqId then - error $ "expected response id (" ++ show rt ++ ") to match request id (" ++ show reqId ++ ")" - else - if flagBits == [MoreToCome] then yieldResponses .| foldlC mergeResponses p - else return (rt, r) - _ -> error "Impossible" -- see comment above - yieldResponses = repeatWhileMC - (do - var <- newEmptyMVar - liftIO $ atomically $ writeTChan (responseQueue pipe) var - readMVar var >>= either throwIO return :: IO Response - ) - checkFlagBit - mergeResponses (rt, rep) (_, rep') = + -- The OP_MSG protocol expects our client to receive multiple messages if the ExhaustAllowed + -- bit is set on the request. Continue to poll for messages until the MoreToCome flagbit is + -- no longer set. + -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.md#moretocome + produce reqId (rt, rep) + | rt /= reqId = error $ "expected response id (" ++ show rt ++ ") to match request id (" ++ show reqId ++ ")" + | checkFlagBit rep = foldr mergeResponses rep <$> generateResponseChain + | otherwise = return rep + generateResponseChain = loop id + where + loop f = do + x <- snd <$> yieldNextResponse + if checkFlagBit x + then loop (f . (x:)) + else return (f [x]) + yieldNextResponse = do + var <- newEmptyMVar + liftIO $ atomically $ writeTChan (responseQueue pipe) var + readMVar var >>= either throwIO return :: IO Response + checkFlagBit rep = + case rep of + ReplyOpMsg{..} -> flagBits == [MoreToCome] + -- This is called by functions using the OP_MSG protocol, + -- so this has to be ReplyOpMsg + _ -> error "Impossible" + mergeResponses rep' rep = case (rep, rep') of (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do -- Look for the nested data in the GetMore responses @@ -315,14 +306,14 @@ callOpMsg pipe request flagBit params = do ( fromJust $ cast $ valueAt "nextBatch" doc :: [Document] , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] ) - (rt, rep{ sections = docs' ++ docs }) -- todo: avoid (++) + rep { sections = docs' ++ docs } -- todo: avoid (++) -- Extra case for the processed first batch (Nothing, Just doc') -> do let (docs, docs') = ( sec -- already processed , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] ) - (rt, rep{ sections = docs' ++ docs }) -- todo: avoid (++) + rep { sections = docs' ++ docs } -- todo: avoid (++) -- Since we use this to process moreToCome messages, we -- know that there will be a nextBatch key in the document _ -> error "Impossible" From f38e5f9e9f5f74633e1ef0bca73c99861e55ab09 Mon Sep 17 00:00:00 2001 From: Dary Cabrera Date: Tue, 19 Nov 2024 14:17:36 -0700 Subject: [PATCH 14/14] Remove the extra IO onion layer from OP_MSG call chain If nothing else, it simplifies the code. --- Database/MongoDB/Internal/Protocol.hs | 9 ++++----- Database/MongoDB/Query.hs | 3 +-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 35046f5..e6725ff 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -207,7 +207,7 @@ pcall p@Pipeline{..} message = do liftIO $ atomically $ writeTChan responseQueue var return $ readMVar var >>= either throwIO return -- return promise -pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response) +pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO Response -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. pcallOpMsg p@Pipeline{..} message flagbit params = do @@ -222,7 +222,7 @@ pcallOpMsg p@Pipeline{..} message flagbit params = do -- put var into the response-queue so that it can -- fetch the latest response liftIO $ atomically $ writeTChan responseQueue var - return $ readMVar var >>= either throwIO return -- return promise + readMVar var >>= either throwIO return -- return promise -- * Pipe @@ -258,13 +258,12 @@ call pipe notices request = do check requestId (responseTo, reply) = if requestId == responseTo then reply else error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" -callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply) +callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO Reply -- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. callOpMsg pipe request flagBit params = do requestId <- genRequestId promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params - promise' <- promise :: IO Response - return $ produce requestId promise' + produce requestId promise where -- The OP_MSG protocol expects our client to receive multiple messages if the ExhaustAllowed -- bit is set on the request. Continue to poll for messages until the MoreToCome flagbit is diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 147220e..8744a14 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1583,8 +1583,7 @@ requestOpMsg :: Pipe -> (Cmd, Maybe Limit) -> Document -> IO DelayedBatch -- ^ Send notices and request and return promised batch requestOpMsg pipe (Req r, remainingLimit) params = do promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params - let protectedPromise = liftIOE ConnectionFailure promise - return $ fromReply remainingLimit =<< protectedPromise + return $ fromReply remainingLimit promise requestOpMsg _ _ _ = error "requestOpMsg: Only messages of type Query are supported" fromReply :: Maybe Limit -> Reply -> DelayedBatch