diff --git a/conf/hstream.yaml b/conf/hstream.yaml index 5e6ec57b2..10c99d0f5 100644 --- a/conf/hstream.yaml +++ b/conf/hstream.yaml @@ -106,6 +106,8 @@ hserver: # gossip-interval: 1000000 # 1 sec # probe-interval: 2000000 # 2 sec # roundtrip-timeout: 500000 # 0.5 sec + # + # recover-tasks-delay-ms: 0 # TODO: Auth tokens # - store tokens safely diff --git a/hstream/app/server.hs b/hstream/app/server.hs index 69218f752..c8ab7b6d6 100644 --- a/hstream/app/server.hs +++ b/hstream/app/server.hs @@ -12,7 +12,8 @@ {-# LANGUAGE TypeApplications #-} import Control.Concurrent (forkIO, newEmptyMVar, - putMVar, readMVar) + putMVar, readMVar, + threadDelay) import qualified Control.Concurrent.Async as Async import Control.Exception (bracket, handle) import Control.Monad (forM, forM_, join, void, @@ -55,6 +56,7 @@ import HStream.Server.Config (AdvertisedListeners, FileLoggerSettings (..), ListenersSecurityProtocolMap, MetaStoreAddr (..), + RecoverOpts (..), SecurityProtocolMap, ServerCli (..), ServerOpts (..), TlsConfig, @@ -219,7 +221,10 @@ serve sc@ServerContext{..} rpcOpts enableStreamV2 = do _ -> do getProtoTimestamp >>= \x -> upsertMeta @Proto.Timestamp clusterStartTimeId x metaHandle handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle + Log.info "deleted all TaskAllocation records" -- recover tasks + when (serverOpts._recover_opts._recover_tasks_delay_ms > 0) $ do + threadDelay $ 1000 * serverOpts._recover_opts._recover_tasks_delay_ms Log.info "recovering local io tasks" Cluster.recoverLocalTasks sc scIOWorker Log.info "recovering local query tasks" diff --git a/hstream/src/HStream/Server/Config.hs b/hstream/src/HStream/Server/Config.hs index 068b3f9ea..6ae346bfb 100644 --- a/hstream/src/HStream/Server/Config.hs +++ b/hstream/src/HStream/Server/Config.hs @@ -28,6 +28,8 @@ module HStream.Server.Config , readProtocol #endif , parseHostPorts + + , RecoverOpts (..) ) where import Control.Exception (throwIO) @@ -122,6 +124,7 @@ data ServerOpts = ServerOpts , _gossipOpts :: !GossipOpts , _ioOptions :: !IO.IOOptions + , _recover_opts :: !RecoverOpts , _querySnapshotPath :: !FilePath , experimentalFeatures :: ![ExperimentalFeature] @@ -272,6 +275,9 @@ parseJSONToOptions CliOptions{..} obj = do tokensCfg <- nodeCfgObj .:? "tokens" .!= mempty let serverTokens = map encodeUtf8 tokensCfg + _recover_tasks_delay_ms <- nodeCfgObj .:? "recover-tasks-delay-ms" .!= 0 + let !_recover_opts = RecoverOpts{..} + return ServerOpts {..} ------------------------------------------------------------------------------- @@ -325,3 +331,9 @@ readWithErrLog :: Read a => String -> String -> a readWithErrLog opt v = case readEither v of Right x -> x Left _err -> errorWithoutStackTrace $ "Failed to parse value " <> show v <> " for option " <> opt + +------------------------------------------------------------------------------- +data RecoverOpts + = RecoverOpts + { _recover_tasks_delay_ms :: Int + } deriving (Show, Eq) diff --git a/hstream/test/HStream/ConfigSpec.hs b/hstream/test/HStream/ConfigSpec.hs index e3bde07fe..e486eb87e 100644 --- a/hstream/test/HStream/ConfigSpec.hs +++ b/hstream/test/HStream/ConfigSpec.hs @@ -31,6 +31,7 @@ import HStream.Gossip (GossipOpts (..), import HStream.IO.Types (IOOptions (..)) import HStream.Server.Config (CliOptions (..), MetaStoreAddr (..), + RecoverOpts (RecoverOpts), ServerOpts (..), TlsConfig (..), parseHostPorts, @@ -124,6 +125,8 @@ defaultConfig = ServerOpts , _gossipOpts = defaultGossipOpts , _ioOptions = defaultIOOptions + , _recover_opts = RecoverOpts 0 + , _querySnapshotPath = "/data/query_snapshots" , experimentalFeatures = [] , grpcChannelArgs = [] @@ -294,6 +297,8 @@ instance Arbitrary ServerOpts where let experimentalFeatures = [] let grpcChannelArgs = [] let serverTokens = [] + + let _recover_opts = RecoverOpts 0 pure ServerOpts{..} instance Arbitrary CliOptions where