diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index dbf7630..e6725ff 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -18,7 +18,9 @@ #endif module Database.MongoDB.Internal.Protocol ( + -- * Global command arguments FullCollection, + ReadPreferenceMode(..), setReadPreferenceMode, -- * Pipe Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg, -- ** Notice @@ -74,8 +76,7 @@ import qualified Database.MongoDB.Transport as Tr #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, mkWeakMVar, isEmptyMVar) -import GHC.List (foldl1') -import Conduit (repeatWhileMC, (.|), runConduit, foldlC) +import GHC.List (foldl', foldl1') #else import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, addMVarFinalizer) @@ -206,7 +207,7 @@ pcall p@Pipeline{..} message = do liftIO $ atomically $ writeTChan responseQueue var return $ readMVar var >>= either throwIO return -- return promise -pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response) +pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO Response -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. pcallOpMsg p@Pipeline{..} message flagbit params = do @@ -221,7 +222,7 @@ pcallOpMsg p@Pipeline{..} message flagbit params = do -- put var into the response-queue so that it can -- fetch the latest response liftIO $ atomically $ writeTChan responseQueue var - return $ readMVar var >>= either throwIO return -- return promise + readMVar var >>= either throwIO return -- return promise -- * Pipe @@ -257,66 +258,65 @@ call pipe notices request = do check requestId (responseTo, reply) = if requestId == responseTo then reply else error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" -callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply) +callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO Reply -- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. callOpMsg pipe request flagBit params = do requestId <- genRequestId promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params - promise' <- promise :: IO Response - return $ snd <$> produce requestId promise' + produce requestId promise where - -- We need to perform streaming here as within the OP_MSG protocol mongoDB expects - -- our client to keep receiving messages after the MoreToCome flagbit was - -- set by the server until our client receives an empty flagbit. After the - -- first MoreToCome flagbit was set the responseTo field in the following - -- headers will reference the cursorId that was set in the previous message. - -- see: - -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses - checkFlagBit p = - case p of - (_, r) -> - case r of - ReplyOpMsg{..} -> flagBits == [MoreToCome] - -- This is called by functions using the OP_MSG protocol, - -- so this has to be ReplyOpMsg - _ -> error "Impossible" - produce reqId p = runConduit $ - case p of - (rt, r) -> - case r of - ReplyOpMsg{..} -> - if flagBits == [MoreToCome] - then yieldResponses .| foldlC mergeResponses p - else return $ (rt, check reqId p) - _ -> error "Impossible" -- see comment above - yieldResponses = repeatWhileMC - (do - var <- newEmptyMVar - liftIO $ atomically $ writeTChan (responseQueue pipe) var - readMVar var >>= either throwIO return :: IO Response - ) - checkFlagBit - mergeResponses p@(rt,rep) p' = - case (p, p') of - ((_, r), (_, r')) -> - case (r, r') of - (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do - let (section, section') = (head sec, head sec') - (cur, cur') = (maybe Nothing cast $ look "cursor" section, - maybe Nothing cast $ look "cursor" section') - case (cur, cur') of - (Just doc, Just doc') -> do - let (docs, docs') = - ( fromJust $ cast $ valueAt "nextBatch" doc :: [Document] - , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document]) - id' = fromJust $ cast $ valueAt "id" doc' :: Int32 - (rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++) - -- Since we use this to process moreToCome messages, we - -- know that there will be a nextBatch key in the document - _ -> error "Impossible" - _ -> error "Impossible" -- see comment above - check requestId (responseTo, reply) = if requestId == responseTo then reply else - error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" + -- The OP_MSG protocol expects our client to receive multiple messages if the ExhaustAllowed + -- bit is set on the request. Continue to poll for messages until the MoreToCome flagbit is + -- no longer set. + -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.md#moretocome + produce reqId (rt, rep) + | rt /= reqId = error $ "expected response id (" ++ show rt ++ ") to match request id (" ++ show reqId ++ ")" + | checkFlagBit rep = foldr mergeResponses rep <$> generateResponseChain + | otherwise = return rep + generateResponseChain = loop id + where + loop f = do + x <- snd <$> yieldNextResponse + if checkFlagBit x + then loop (f . (x:)) + else return (f [x]) + yieldNextResponse = do + var <- newEmptyMVar + liftIO $ atomically $ writeTChan (responseQueue pipe) var + readMVar var >>= either throwIO return :: IO Response + checkFlagBit rep = + case rep of + ReplyOpMsg{..} -> flagBits == [MoreToCome] + -- This is called by functions using the OP_MSG protocol, + -- so this has to be ReplyOpMsg + _ -> error "Impossible" + mergeResponses rep' rep = + case (rep, rep') of + (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do + -- Look for the nested data in the GetMore responses + -- https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands/find_getmore_killcursors_commands.md#getmore + let (section, section') = (head sec, head sec') + (cur, cur') = ( cast =<< look "cursor" section + , cast =<< look "cursor" section' + ) + case (cur, cur') of + (Just doc, Just doc') -> do + let (docs, docs') = + ( fromJust $ cast $ valueAt "nextBatch" doc :: [Document] + , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] + ) + rep { sections = docs' ++ docs } -- todo: avoid (++) + -- Extra case for the processed first batch + (Nothing, Just doc') -> do + let (docs, docs') = + ( sec -- already processed + , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document] + ) + rep { sections = docs' ++ docs } -- todo: avoid (++) + -- Since we use this to process moreToCome messages, we + -- know that there will be a nextBatch key in the document + _ -> error "Impossible" + _ -> error "Impossible" -- see comment above -- * Message @@ -570,21 +570,23 @@ putOpMsg cmd requestId flagBit params = do Query{..} -> do let n = T.splitOn "." qFullCollection db = head n - sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector] + sec0 = foldl1' merge [qReadPreference, optionsToDoc qOptions, qProjector, [ "$db" =: db ], qSelector] putInt32 biT putInt8 0 putDocument sec0 GetMore{..} -> do let n = T.splitOn "." gFullCollection (db, coll) = (head n, last n) - pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize] + pre = merge + gReadPreference + ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize] putInt32 (bit $ bitOpMsg $ ExhaustAllowed) putInt8 0 putDocument pre Message{..} -> do putInt32 biT putInt8 0 - putDocument $ merge [ "$db" =: mDatabase ] mParams + putDocument $ foldl1' merge [mReadPreference, [ "$db" =: mDatabase ], mParams] Kc k -> case k of KillC{..} -> do let n = T.splitOn "." kFullCollection @@ -649,6 +651,8 @@ bitOpMsg ExhaustAllowed = 16 -- ** Request -- | A request is a message that is sent with a 'Reply' expected in return +-- Read preference is a global argument required for requests to secondaries using the OP_MSG protocol. +-- https://github.com/mongodb/specifications/blob/ffa75b41736f669c754dff9c0a0d60bb70eb319f/source/server-selection/server-selection.md?plain=1#L503 data Request = Query { qOptions :: [QueryOption], @@ -656,14 +660,17 @@ data Request = qSkip :: Int32, -- ^ Number of initial matching documents to skip qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size. qSelector :: Document, -- ^ @[]@ = return all documents in collection - qProjector :: Document -- ^ @[]@ = return whole document + qProjector :: Document, -- ^ @[]@ = return whole document + qReadPreference :: Document } | GetMore { gFullCollection :: FullCollection, gBatchSize :: Int32, - gCursorId :: CursorId + gCursorId :: CursorId, + gReadPreference :: Document } | Message { mDatabase :: Text, - mParams :: Document + mParams :: Document, + mReadPreference :: Document } deriving (Show, Eq) @@ -679,6 +686,15 @@ data QueryOption = | Partial -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error. deriving (Show, Eq) +-- https://www.mongodb.com/docs/manual/core/read-preference/ +data ReadPreferenceMode + = Primary -- Default mode. + | Secondary + | PrimaryPreferred + | SecondaryPreferred -- Corresponds to slaveOk access mode. + | Nearest + deriving (Show, Eq) + -- *** Binary format qOpcode :: Request -> Opcode @@ -721,6 +737,32 @@ qBit Database.MongoDB.Internal.Protocol.Partial = bit 7 qBits :: [QueryOption] -> Int32 qBits = bitOr . map qBit +optionToDoc :: QueryOption -> Document +optionToDoc qOpt = + case qOpt of + TailableCursor -> ["tailable" =: True] + NoCursorTimeout -> ["noCursorTimeout" =: True] + AwaitData -> ["awaitData" =: True] + Partial -> ["allowPartialResults" =: True] + SlaveOK -> [] + +optionsToDoc :: [QueryOption] -> Document +optionsToDoc = foldl' merge [] . map optionToDoc + +readPreferenceModeToText :: ReadPreferenceMode -> Text +readPreferenceModeToText mode = + case mode of + Primary -> "primary" + Secondary -> "secondary" + PrimaryPreferred -> "primaryPreferred" + SecondaryPreferred -> "secondaryPreferred" + Nearest -> "nearest" + +setReadPreferenceMode :: ReadPreferenceMode -> Document +setReadPreferenceMode mode = + ["mode" =: (readPreferenceModeToText mode)] + + -- ** Reply -- | A reply is a message received in response to a 'Request' diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index b83e316..8744a14 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -133,9 +133,8 @@ import Database.MongoDB.Internal.Protocol pwKey, FlagBit (..) ) -import Control.Monad.Trans.Except import qualified Database.MongoDB.Internal.Protocol as P -import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>), splitDot) +import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>)) import System.Mem.Weak (Weak) import Text.Read (readMaybe) import Prelude hiding (lookup) @@ -205,6 +204,12 @@ data Upserted = Upserted , upsertedId :: ObjectId } deriving Show +-- MongoDB 5.1 Removed support OP_QUERY, and other CRUD specific OpCodes. +-- OP_MSG is go forward. +-- https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#legacy-opcodes +opMsgOnlyWireVersion :: Int +opMsgOnlyWireVersion = 17 + master :: AccessMode -- ^ Same as 'ConfirmWrites' [] master = ConfirmWrites [] @@ -221,6 +226,15 @@ readMode :: AccessMode -> ReadMode readMode ReadStaleOk = StaleOk readMode _ = Fresh +-- https://github.com/mongodb/specifications/blob/ffa75b41736f669c754dff9c0a0d60bb70eb319f/source/server-selection/server-selection.md?plain=1#L548 +readPreferenceDoc :: ReadMode -> Document +readPreferenceDoc mode = + case mode of + -- "secondaryPreferred" matches the replicaSet host selection critieria of secondaryOk() + -- https://www.mongodb.com/docs/manual/core/read-preference/#mongodb-readmode-secondaryPreferred + StaleOk -> ["$readPreference" =: P.setReadPreferenceMode P.SecondaryPreferred] + _ -> [] + writeMode :: AccessMode -> WriteMode writeMode ReadStaleOk = Confirm [] writeMode UnconfirmedWrites = NoConfirm @@ -412,7 +426,7 @@ allCollections = do docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]} (return . filter (not . isSpecial db)) (map (dropDbPrefix . at "name") docs) else - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do r <- runCommand1 "listCollections" let curData = do @@ -430,7 +444,7 @@ allCollections = do return $ mapMaybe (\d -> d !? "name") docs else do let q = Query [] (Select ["listCollections" =: (1 :: Int)] "$cmd") [] 0 0 [] False 0 [] - qr <- queryRequestOpMsg False q + qr <- queryRequestOpMsg q dBatch <- liftIO $ requestOpMsg p qr [] db <- thisDatabase nc <- newCursor db "$cmd" 0 dBatch @@ -594,7 +608,7 @@ insertBlock opts col (prevCount, docs) = do case errorMessage of Just failure -> return $ Left failure Nothing -> return $ Right $ map (valueAt "_id") docs - else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + else if maxWireVersion sd == 2 then do mode <- asks mongoWriteMode let writeConcern = case mode of NoConfirm -> ["w" =: (0 :: Int32)] @@ -726,17 +740,21 @@ update opts (Select sel col) up = do pipe <- asks mongoPipe db <- thisDatabase let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do ctx <- ask liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx else do + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> params liftIOE ConnectionFailure $ P.sendOpMsg pipe [Nc (Update (db <.> col) opts sel up)] (Just P.MoreToCome) - ["writeConcern" =: ["w" =: (0 :: Int32)]] + writeConcern updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document updateCommandDocument col ordered updates writeConcern = @@ -843,7 +861,7 @@ updateBlock ordered col (prevCount, docs) = do let sd = P.serverData p if maxWireVersion sd < 2 then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6" - else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + else if maxWireVersion sd == 2 then do mode <- asks mongoWriteMode let writeConcern = case mode of NoConfirm -> ["w" =: (0 :: Int32)] @@ -999,9 +1017,9 @@ delete :: (MonadIO m) delete s = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then deleteHelper [] s - else deleteMany (coll s) [([], [])] >> return () + else void $ deleteMany (coll s) [(selector s, [])] deleteOne :: (MonadIO m) => Selection -> Action m () @@ -1009,7 +1027,7 @@ deleteOne :: (MonadIO m) deleteOne sel@((Select sel' col)) = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then deleteHelper [SingleRemove] sel else do -- Starting with v6 confirming writes via getLastError as it is @@ -1113,7 +1131,7 @@ deleteBlock ordered col (prevCount, docs) = do let sd = P.serverData p if maxWireVersion sd < 2 then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6" - else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + else if maxWireVersion sd == 2 then do mode <- asks mongoWriteMode let writeConcern = case mode of NoConfirm -> ["w" =: (0 :: Int32)] @@ -1239,9 +1257,9 @@ data Query = Query { skip :: Word32, -- ^ Number of initial matching documents to skip. Default = 0 limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit. Default = 0 sort :: Order, -- ^ Sort results by this order, @[]@ = no sort. Default = @[]@ - snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = @False@ + snapshot :: Bool, -- ^ Deprecated since MongoDB 4.0. If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = @False@ batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0 - hint :: Order -- ^ Force MongoDB to use this index, @[]@ = no hint. Default = @[]@ + hint :: Hint -- ^ Force MongoDB to use this index, @[]@ = no hint. Default = @[]@ } deriving (Show, Eq) type Projector = Document @@ -1253,6 +1271,11 @@ type Limit = Word32 type Order = Document -- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[\"x\" =: 1, \"y\" =: -1]@ means sort by @x@ ascending then @y@ descending +-- The current specification allows this to be passed as a string or a document. For backwards compatibility and simplicity, I'm leaving this unchanged to continue being required as a document. +-- https://github.com/mongodb/specifications/blob/e651ebe819743f1cffae938f5488c69d5277c21f/source/crud/crud.md?plain=1#L1953 +type Hint = Document +-- ^ Index specification. If specified, then the query system will only consider plans using the hinted index. Specify the index name as a key pattern, e.g. { age: 1 } + type BatchSize = Word32 -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. @@ -1297,13 +1320,13 @@ find q@Query{selection, batchSize} = do pipe <- asks mongoPipe db <- thisDatabase let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do qr <- queryRequest False q dBatch <- liftIO $ request pipe [] qr newCursor db (coll selection) batchSize dBatch else do - qr <- queryRequestOpMsg False q + qr <- queryRequestOpMsg q let newQr = case fst qr of Req P.Query{..} -> @@ -1319,7 +1342,7 @@ findCommand :: (MonadIO m) => Query -> Action m Cursor findCommand q@Query{..} = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do let aColl = coll selection response <- runCommand $ @@ -1362,10 +1385,10 @@ findOne q = do then legacyQuery else do let sd = P.serverData pipe - if (maxWireVersion sd < 17) + if maxWireVersion sd < opMsgOnlyWireVersion then legacyQuery else do - qr <- queryRequestOpMsg False q {limit = 1} + qr <- queryRequestOpMsg q {limit = 1} let newQr = case fst qr of Req P.Query{..} -> @@ -1503,15 +1526,16 @@ queryRequest isExplain Query{..} = do mExplain = if isExplain then Just ("$explain" =: True) else Nothing special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection + qReadPreference = [] -queryRequestOpMsg :: (Monad m, MonadIO m) => Bool -> Query -> Action m (Cmd, Maybe Limit) +queryRequestOpMsg :: (Monad m, MonadIO m) => Query -> Action m (Cmd, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. -queryRequestOpMsg isExplain Query{..} = do +queryRequestOpMsg Query{..} = do ctx <- ask return $ queryRequest' (mongoReadMode ctx) (mongoDatabase ctx) where queryRequest' rm db = (Req P.Query{..}, remainingLimit) where - qOptions = readModeOption rm ++ options + qOptions = options qFullCollection = db <.> coll selection qSkip = fromIntegral skip (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit) @@ -1520,16 +1544,15 @@ queryRequestOpMsg isExplain Query{..} = do -- the relevant fields to the selector isNotCommand = null $ catMaybes $ map (\l -> look l (selector selection)) (noticeCommands ++ adminCommands) mOrder = if null sort then Nothing else Just ("sort" =: sort) - mSnapshot = if snapshot then Just ("snapshot" =: True) else Nothing mHint = if null hint then Nothing else Just ("hint" =: hint) - mExplain = if isExplain then Just ("$explain" =: True) else Nothing - special = catMaybes [mOrder, mSnapshot, mHint, mExplain] + special = catMaybes [mOrder, mHint] qProjector = if isNotCommand then ["projection" =: project] else project qSelector = if isNotCommand then c else s where s = selector selection bSize = if qBatchSize == 0 then Nothing else Just ("batchSize" =: qBatchSize) mLimit = if limit == 0 then Nothing else maybe Nothing (\rL -> Just ("limit" =: (fromIntegral rL :: Int32))) remainingLimit c = ("filter" =: s) : special ++ maybeToList bSize ++ maybeToList mLimit + qReadPreference = readPreferenceDoc rm batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit @@ -1560,8 +1583,7 @@ requestOpMsg :: Pipe -> (Cmd, Maybe Limit) -> Document -> IO DelayedBatch -- ^ Send notices and request and return promised batch requestOpMsg pipe (Req r, remainingLimit) params = do promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params - let protectedPromise = liftIOE ConnectionFailure promise - return $ fromReply remainingLimit =<< protectedPromise + return $ fromReply remainingLimit promise requestOpMsg _ _ _ = error "requestOpMsg: Only messages of type Query are supported" fromReply :: Maybe Limit -> Reply -> DelayedBatch @@ -1625,7 +1647,10 @@ nextBatch (Cursor fcol batchSize var) = liftDB $ modifyMVar var $ \dBatch -> do (0, _) -> return (emptyBatch, resultDocs) (_, Just 0) -> do pipe <- asks mongoPipe - liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + let sd = P.serverData pipe + if maxWireVersion sd < opMsgOnlyWireVersion + then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (emptyBatch, resultDocs) (_, _) -> (, resultDocs) <$> getNextBatch @@ -1640,10 +1665,11 @@ fulfill' fcol batchSize dBatch = do nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch nextBatch' fcol batchSize limit cid = do pipe <- asks mongoPipe + mode <- asks mongoReadMode let sd = P.serverData pipe - if maxWireVersion sd < 17 - then liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) - else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid, remLimit) [] + if maxWireVersion sd < opMsgOnlyWireVersion + then liftIO $ request pipe [] (GetMore fcol batchSize' cid [], remLimit) + else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid (readPreferenceDoc mode), remLimit) [] where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit next :: MonadIO m => Cursor -> Action m (Maybe Document) @@ -1667,7 +1693,7 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where when (newLimit == Just 0) $ unless (cid == 0) $ do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (dBatch', Just doc) @@ -1686,11 +1712,14 @@ rest :: MonadIO m => Cursor -> Action m [Document] rest c = loop (next c) closeCursor :: MonadIO m => Cursor -> Action m () -closeCursor (Cursor _ _ var) = liftDB $ modifyMVar var $ \dBatch -> do +closeCursor (Cursor fcol _ var) = liftDB $ modifyMVar var $ \dBatch -> do Batch _ cid _ <- fulfill dBatch unless (cid == 0) $ do pipe <- asks mongoPipe - liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + let sd = P.serverData pipe + if maxWireVersion sd < opMsgOnlyWireVersion + then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (return $ Batch (Just 0) 0 [], ()) isCursorClosed :: MonadIO m => Cursor -> Action m Bool @@ -1731,14 +1760,14 @@ aggregateCursor :: (MonadIO m) => Collection -> Pipeline -> AggregateConfig -> A aggregateCursor aColl agg cfg = do pipe <- asks mongoPipe let sd = P.serverData pipe - if maxWireVersion sd < 17 + if maxWireVersion sd < opMsgOnlyWireVersion then do response <- runCommand (aggregateCommand aColl agg cfg) getCursorFromResponse aColl response >>= either (liftIO . throwIO . AggregateFailure) return else do let q = select (aggregateCommand aColl agg cfg) aColl - qr <- queryRequestOpMsg False q + qr <- queryRequestOpMsg q dBatch <- liftIO $ requestOpMsg pipe qr [] db <- thisDatabase Right <$> newCursor db aColl 0 dBatch @@ -1882,7 +1911,7 @@ type Command = Document runCommand :: (MonadIO m) => Command -> Action m Document runCommand params = do pipe <- asks mongoPipe - if isHandshake params || maxWireVersion (P.serverData pipe) < 17 + if isHandshake params || maxWireVersion (P.serverData pipe) < opMsgOnlyWireVersion then runCommandLegacy pipe params else runCommand' pipe params @@ -1898,7 +1927,8 @@ runCommandLegacy pipe params = do runCommand' :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document runCommand' pipe params = do ctx <- ask - rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params), Just 1) [] + let readPreference = readPreferenceDoc $ mongoReadMode ctx + rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params readPreference), Just 1) [] Batch _ _ docs <- liftDB $ fulfill rq case docs of [doc] -> pure doc diff --git a/docker-compose.yml b/docker-compose.yml index f79c6ee..6391f27 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,16 +1,17 @@ -version: '3' services: mongodb: ports: - 27017:27017 - image: mongo:4.0 + image: mongo:5.0 mongodb-haskell: - image: phadej/ghc:8.0.2 + image: phadej/ghc:8.10.4-focal environment: - HASKELL_MONGODB_TEST_HOST=mongodb + - MONGO_VERSION=mongo:5.0 entrypoint: - /bin/bash volumes: - ./:/opt/mongodb-haskell + working_dir: /opt/mongodb-haskell # vim: ts=2 et sw=2 ai diff --git a/mongoDB.cabal b/mongoDB.cabal index f78a68f..74ab503 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -102,6 +102,7 @@ test-suite test , old-locale , text , time + build-tool-depends: hspec-discover:hspec-discover default-language: Haskell2010 default-extensions: OverloadedStrings diff --git a/test/Main.hs b/test/Main.hs index 267f7ee..9b13867 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -6,7 +6,6 @@ import Data.Maybe (isJust) import qualified Spec import System.Environment (getEnv, lookupEnv) import Test.Hspec.Runner -import TestImport main :: IO () main = do