From ced47f0ccab9e7af705f212579586762fb74c0ca Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 16 Jul 2025 15:13:39 +0200 Subject: [PATCH 1/3] Remove Persister In preparation for the addition of an async KVStore, we here remove the Persister pseudo-wrapper. The wrapper is thin, would need to be duplicated for async, and KVStore isn't fully abstracted anyway anymore because the sweeper takes it directly. --- lightning-background-processor/src/lib.rs | 112 +++++++++++++++------- lightning/src/ln/channelmanager.rs | 3 +- lightning/src/util/persist.rs | 71 +++----------- 3 files changed, 92 insertions(+), 94 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 235bb39c7d4..cc622e2ac53 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -17,6 +17,7 @@ extern crate alloc; extern crate lightning; extern crate lightning_rapid_gossip_sync; +use crate::lightning::util::ser::Writeable; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; @@ -40,7 +41,13 @@ use lightning::sign::ChangeDestinationSourceSync; use lightning::sign::EntropySource; use lightning::sign::OutputSpender; use lightning::util::logger::Logger; -use lightning::util::persist::{KVStore, Persister}; +use lightning::util::persist::{ + KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning::util::sweep::OutputSweeper; #[cfg(feature = "std")] use lightning::util::sweep::OutputSweeperSync; @@ -313,7 +320,8 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri macro_rules! define_run_body { ( - $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $kv_store: ident, + $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, $onion_messenger: ident, $process_onion_message_handler_events: expr, $peer_manager: ident, $gossip_sync: ident, @@ -375,7 +383,12 @@ macro_rules! define_run_body { if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&$channel_manager)?; + $kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &$channel_manager.get_cm().encode(), + )?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { @@ -436,7 +449,12 @@ macro_rules! define_run_body { log_trace!($logger, "Persisting network graph."); } - if let Err(e) = $persister.persist_graph(network_graph) { + if let Err(e) = $kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } @@ -464,7 +482,12 @@ macro_rules! 
define_run_body { } else { log_trace!($logger, "Persisting scorer"); } - if let Err(e) = $persister.persist_scorer(&scorer) { + if let Err(e) = $kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -487,16 +510,31 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - $persister.persist_manager(&$channel_manager)?; + $kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &$channel_manager.get_cm().encode(), + )?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { - $persister.persist_scorer(&scorer)?; + $kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + )?; } // Persist NetworkGraph on exit if let Some(network_graph) = $gossip_sync.network_graph() { - $persister.persist_graph(network_graph)?; + $kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + )?; } Ok(()) @@ -684,7 +722,6 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, /// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static, -/// # K: lightning::util::persist::KVStore + Send + Sync + 'static, /// # O: lightning::sign::OutputSpender + Send + Sync + 'static, /// # > { /// # peer_manager: Arc>, @@ -697,7 +734,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # persister: Arc, /// # logger: Arc, /// # scorer: Arc, -/// # sweeper: Arc, Arc, Arc, Arc, Arc, Arc, Arc>>, +/// # sweeper: Arc, Arc, Arc, Arc, Arc, Arc, Arc>>, /// # } /// # /// # async fn setup_background_processing< @@ -706,9 +743,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, /// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static, -/// # K: lightning::util::persist::KVStore + Send + Sync + 'static, /// # O: lightning::sign::OutputSpender + Send + Sync + 'static, -/// # >(node: Node) { +/// # >(node: Node) { /// let background_persister = Arc::clone(&node.persister); /// let background_event_handler = Arc::clone(&node.event_handler); /// let background_chain_mon = Arc::clone(&node.chain_monitor); @@ -780,7 +816,6 @@ pub async fn process_events_async< P: 'static + Deref, EventHandlerFuture: core::future::Future>, EventHandler: Fn(Event) -> EventHandlerFuture, - PS: 'static + Deref + Send, ES: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P, ES>> @@ -802,7 +837,7 @@ pub async fn process_events_async< Sleeper: Fn(Duration) -> SleepFuture, FetchTime: Fn() -> Option, >( - persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + kv_store: K, event_handler: 
EventHandler, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, @@ -814,7 +849,6 @@ where F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, P::Target: 'static + Persist<::Signer>, - PS::Target: 'static + Persister<'a, CM, L, S>, ES::Target: 'static + EntropySource, CM::Target: AChannelManager, OM::Target: AOnionMessenger, @@ -830,7 +864,7 @@ where let event_handler = &event_handler; let scorer = &scorer; let logger = &logger; - let persister = &persister; + let kv_store = &kv_store; let fetch_time = &fetch_time; // We should be able to drop the Box once our MSRV is 1.68 Box::pin(async move { @@ -841,7 +875,12 @@ where if let Some(duration_since_epoch) = fetch_time() { if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&*scorer) { + if let Err(e) = kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); // We opt not to abort early on persistence failure here as persisting // the scorer is non-critical and we still hope that it will have @@ -855,7 +894,7 @@ where }) }; define_run_body!( - persister, + kv_store, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, @@ -928,21 +967,21 @@ impl BackgroundProcessor { /// documentation]. /// /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or - /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling + /// [`KVStore`] returns an error. In case of an error, the error is retrieved by calling /// either [`join`] or [`stop`]. /// /// # Data Persistence /// - /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or + /// [`KVStore`] is responsible for writing out the [`ChannelManager`] to disk, and/or /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's /// provided implementation. /// - /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if + /// [`KVStore`] is also responsible for writing out the [`NetworkGraph`] to disk, if /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. /// See the `lightning-persister` crate for LDK's provided implementation. /// - /// Typically, users should either implement [`Persister::persist_manager`] to never return an + /// Typically, users should either implement [`KVStore`] to never return an /// error or call [`join`] and handle any error that may arise. For the latter case, /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error. 
/// @@ -964,8 +1003,6 @@ impl BackgroundProcessor { /// [`stop`]: Self::stop /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable - /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager - /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< @@ -978,7 +1015,6 @@ impl BackgroundProcessor { L: 'static + Deref + Send, P: 'static + Deref, EH: 'static + EventHandler + Send, - PS: 'static + Deref + Send, ES: 'static + Deref + Send, M: 'static + Deref< @@ -996,10 +1032,10 @@ impl BackgroundProcessor { SC: for<'b> WriteableScore<'b>, D: 'static + Deref, O: 'static + Deref, - K: 'static + Deref, + K: 'static + Deref + Send, OS: 'static + Deref> + Send, >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM, onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, ) -> Self @@ -1010,7 +1046,6 @@ impl BackgroundProcessor { F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, P::Target: 'static + Persist<::Signer>, - PS::Target: 'static + Persister<'a, CM, L, S>, ES::Target: 'static + EntropySource, CM::Target: AChannelManager, OM::Target: AOnionMessenger, @@ -1035,7 +1070,12 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"); if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer) { + if let Err(e) = kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -1043,7 +1083,7 @@ impl BackgroundProcessor { event_handler.handle_event(event) }; define_run_body!( - persister, + kv_store, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, @@ -1256,7 +1296,7 @@ mod tests { Arc, Arc, Arc, - Arc, + Arc, Arc, >; @@ -1314,7 +1354,7 @@ mod tests { >, liquidity_manager: Arc, chain_monitor: Arc, - kv_store: Arc, + kv_store: Arc, tx_broadcaster: Arc, network_graph: Arc>>, logger: Arc, @@ -1326,7 +1366,7 @@ mod tests { Arc, Arc, Arc, - Arc, + Arc, Arc, Arc, >, @@ -1418,6 +1458,10 @@ mod tests { fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { Self { scorer_error: Some((error, message)), ..self } } + + pub fn get_data_dir(&self) -> PathBuf { + self.kv_store.get_data_dir() + } } impl KVStore for Persister { @@ -1662,7 +1706,7 @@ mod tests { )); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = - Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); + Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new( diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 
d0d429a0abc..02f79707023 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1883,7 +1883,7 @@ where /// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly /// every minute /// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a -/// [`Persister`] such as a [`KVStore`] implementation +/// [`KVStore`] implementation /// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation /// /// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining @@ -2465,7 +2465,6 @@ where /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events /// [`timer_tick_occurred`]: Self::timer_tick_occurred /// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence -/// [`Persister`]: crate::util::persist::Persister /// [`KVStore`]: crate::util::persist::KVStore /// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future /// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 97a7687cb7b..a229432a7b6 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -9,6 +9,7 @@ //! and [`ChannelMonitor`] all in one place. //! //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +//! [`NetworkGraph`]: crate::routing::gossip::NetworkGraph use bitcoin::hashes::hex::FromHex; use bitcoin::{BlockHash, Txid}; @@ -24,10 +25,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; -use crate::ln::channelmanager::AChannelManager; use crate::ln::types::ChannelId; -use crate::routing::gossip::NetworkGraph; -use crate::routing::scoring::WriteableScore; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -65,17 +63,29 @@ pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archiv pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The primary namespace under which the [`NetworkGraph`] will be persisted. +/// +/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; /// The secondary namespace under which the [`NetworkGraph`] will be persisted. +/// +/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`NetworkGraph`] will be persisted. +/// +/// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph"; /// The primary namespace under which the [`WriteableScore`] will be persisted. +/// +/// [`WriteableScore`]: crate::routing::scoring::WriteableScore pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; /// The secondary namespace under which the [`WriteableScore`] will be persisted. +/// +/// [`WriteableScore`]: crate::routing::scoring::WriteableScore pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`WriteableScore`] will be persisted. 
+/// +/// [`WriteableScore`]: crate::routing::scoring::WriteableScore pub const SCORER_PERSISTENCE_KEY: &str = "scorer"; /// The primary namespace under which [`OutputSweeper`] state will be persisted. @@ -199,61 +209,6 @@ pub fn migrate_kv_store_data( Ok(()) } -/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. -/// -/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager -pub trait Persister<'a, CM: Deref, L: Deref, S: Deref> -where - CM::Target: 'static + AChannelManager, - L::Target: 'static + Logger, - S::Target: WriteableScore<'a>, -{ - /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. - /// - /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>; - - /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; - - /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed. - fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; -} - -impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A -where - CM::Target: 'static + AChannelManager, - L::Target: 'static + Logger, - S::Target: WriteableScore<'a>, -{ - fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { - self.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode(), - ) - } - - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { - self.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - ) - } - - fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { - self.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ) - } -} - impl Persist for K { // TODO: We really need a way for the persister to inform the user that its time to crash/shut // down once these start returning failure. 
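With `Persister` gone, each object is persisted directly through `KVStore` under the well-known namespace constants, exactly as the calls the patch inlines above do. As a rough standalone illustration of the same pattern (a minimal sketch, assuming the trait as it stands after this patch, i.e. still synchronous; the helper name `persist_manager_bytes` is hypothetical and not part of the patch):

```rust
use lightning::io;
use lightning::util::persist::{
	KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
	CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
};

// Hypothetical helper mirroring what the background processor now does inline: write the
// already-encoded ChannelManager under the well-known namespace constants.
fn persist_manager_bytes<K: KVStore>(
	kv_store: &K, encoded_manager: &[u8],
) -> Result<(), io::Error> {
	kv_store.write(
		CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
		CHANNEL_MANAGER_PERSISTENCE_KEY,
		encoded_manager,
	)
}
```

The scorer and network-graph writes in the patch follow the same shape with the respective `SCORER_*` and `NETWORK_GRAPH_*` constants.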
From 03032ddf311d65bc6e75deff64e94d62f7a2f73a Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 30 Jun 2025 14:51:39 +0200 Subject: [PATCH 2/3] Rename KVStore trait to KVStoreSync --- lightning-background-processor/src/lib.rs | 23 +++++------ lightning-liquidity/tests/common/mod.rs | 4 +- lightning-persister/src/fs_store.rs | 6 +-- lightning-persister/src/test_utils.rs | 6 +-- lightning/src/ln/channelmanager.rs | 4 +- lightning/src/util/persist.rs | 48 +++++++++++------------ lightning/src/util/sweep.rs | 24 ++++++------ lightning/src/util/test_utils.rs | 4 +- 8 files changed, 60 insertions(+), 59 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index cc622e2ac53..2ee75577d40 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -42,7 +42,7 @@ use lightning::sign::EntropySource; use lightning::sign::OutputSpender; use lightning::util::logger::Logger; use lightning::util::persist::{ - KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -690,8 +690,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # impl lightning::util::logger::Logger for Logger { /// # fn log(&self, _record: lightning::util::logger::Record) {} /// # } -/// # struct Store {} -/// # impl lightning::util::persist::KVStore for Store { +/// # struct StoreSync {} +/// # impl lightning::util::persist::KVStoreSync for StoreSync { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } @@ -707,14 +707,14 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } /// # fn disconnect_socket(&mut self) {} /// # } -/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc, Arc>; +/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc, Arc>; /// # type NetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type P2PGossipSync
    = lightning::routing::gossip::P2PGossipSync, Arc
      , Arc>; /// # type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager, B, FE, Logger>; /// # type OnionMessenger = lightning::onion_message::messenger::OnionMessenger, Arc, Arc, Arc>, Arc, Arc, Arc>>, Arc>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>; /// # type LiquidityManager = lightning_liquidity::LiquidityManager, Arc>, Arc>; /// # type Scorer = RwLock, Arc>>; -/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
        , Logger, F, Store>; +/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
          , Logger, F, StoreSync>; /// # /// # struct Node< /// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, @@ -731,10 +731,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # liquidity_manager: Arc>, /// # chain_monitor: Arc>, /// # gossip_sync: Arc>, -/// # persister: Arc, +/// # persister: Arc, /// # logger: Arc, /// # scorer: Arc, -/// # sweeper: Arc, Arc, Arc, Arc, Arc, Arc, Arc>>, +/// # sweeper: Arc, Arc, Arc, Arc, Arc, Arc, Arc>>, /// # } /// # /// # async fn setup_background_processing< @@ -856,7 +856,7 @@ where LM::Target: ALiquidityManager, O::Target: 'static + OutputSpender, D::Target: 'static + ChangeDestinationSource, - K::Target: 'static + KVStore, + K::Target: 'static + KVStoreSync, { let mut should_break = false; let async_event_handler = |event| { @@ -1053,7 +1053,7 @@ impl BackgroundProcessor { LM::Target: ALiquidityManager, D::Target: ChangeDestinationSourceSync, O::Target: 'static + OutputSpender, - K::Target: 'static + KVStore, + K::Target: 'static + KVStoreSync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = Arc::clone(&stop_thread); @@ -1226,7 +1226,8 @@ mod tests { use lightning::types::payment::PaymentHash; use lightning::util::config::UserConfig; use lightning::util::persist::{ - KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1464,7 +1465,7 @@ mod tests { } } - impl KVStore for Persister { + impl KVStoreSync for Persister { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { diff --git a/lightning-liquidity/tests/common/mod.rs b/lightning-liquidity/tests/common/mod.rs index ebf2afdaadd..6fa355a1340 100644 --- a/lightning-liquidity/tests/common/mod.rs +++ b/lightning-liquidity/tests/common/mod.rs @@ -27,7 +27,7 @@ use lightning::routing::scoring::{ChannelUsage, ScoreLookUp, ScoreUpdate}; use lightning::sign::{InMemorySigner, KeysManager}; use lightning::util::config::UserConfig; use lightning::util::persist::{ - KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -194,7 +194,7 @@ impl Persister { } } -impl KVStore for Persister { +impl KVStoreSync for Persister { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 86c19de2144..9f490eb6fb2 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -2,7 +2,7 @@ use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; use lightning::types::string::PrintableString; -use lightning::util::persist::{KVStore, MigratableKVStore}; +use lightning::util::persist::{KVStoreSync, MigratableKVStore}; use std::collections::HashMap; use 
std::fs; @@ -33,7 +33,7 @@ fn path_to_windows_str>(path: &T) -> Vec { // The number of read/write/remove/list operations after which we clean up our `locks` HashMap. const GC_LOCK_INTERVAL: usize = 25; -/// A [`KVStore`] implementation that writes to and reads from the file system. +/// A [`KVStoreSync`] implementation that writes to and reads from the file system. pub struct FilesystemStore { data_dir: PathBuf, tmp_file_counter: AtomicUsize, @@ -92,7 +92,7 @@ impl FilesystemStore { } } -impl KVStore for FilesystemStore { +impl KVStoreSync for FilesystemStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result> { diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index c6617e8be1e..18d643c7443 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -4,7 +4,7 @@ use lightning::ln::functional_test_utils::{ create_network, create_node_cfgs, create_node_chanmgrs, send_payment, }; use lightning::util::persist::{ - migrate_kv_store_data, read_channel_monitors, KVStore, MigratableKVStore, + migrate_kv_store_data, read_channel_monitors, KVStoreSync, MigratableKVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, }; use lightning::util::test_utils; @@ -12,7 +12,7 @@ use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event use std::panic::RefUnwindSafe; -pub(crate) fn do_read_write_remove_list_persist(kv_store: &K) { +pub(crate) fn do_read_write_remove_list_persist(kv_store: &K) { let data = [42u8; 32]; let primary_namespace = "testspace"; @@ -113,7 +113,7 @@ pub(crate) fn do_test_data_migration // Integration-test the given KVStore implementation. Test relaying a few payments and check that // the persisted data is updated the appropriate number of times. 
-pub(crate) fn do_test_store(store_0: &K, store_1: &K) { +pub(crate) fn do_test_store(store_0: &K, store_1: &K) { let chanmon_cfgs = create_chanmon_cfgs(2); let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 02f79707023..828067635f4 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1883,7 +1883,7 @@ where /// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly /// every minute /// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a -/// [`KVStore`] implementation +/// [`KVStoreSync`] implementation /// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation /// /// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining @@ -2465,7 +2465,7 @@ where /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events /// [`timer_tick_occurred`]: Self::timer_tick_occurred /// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence -/// [`KVStore`]: crate::util::persist::KVStore +/// [`KVStoreSync`]: crate::util::persist::KVStoreSync /// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future /// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync /// [`lightning-transaction-sync`]: https://docs.rs/lightning_transaction_sync/latest/lightning_transaction_sync diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index a229432a7b6..6e93cc83f16 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -4,7 +4,7 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! This module contains a simple key-value store trait [`KVStore`] that +//! This module contains a simple key-value store trait [`KVStoreSync`] that //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`], //! and [`ChannelMonitor`] all in one place. //! @@ -129,7 +129,7 @@ pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2]; /// **Note:** Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister` /// interface can use a concatenation of `[{primary_namespace}/[{secondary_namespace}/]]{key}` to /// recover a `key` compatible with the data model previously assumed by `KVStorePersister::persist`. -pub trait KVStore { +pub trait KVStoreSync { /// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and /// `key`. /// @@ -152,7 +152,7 @@ pub trait KVStore { /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily /// remove the given `key` at some point in time after the method returns, e.g., as part of an /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to - /// [`KVStore::list`] might include the removed key until the changes are actually persisted. + /// [`KVStoreSync::list`] might include the removed key until the changes are actually persisted.
/// /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could @@ -175,12 +175,12 @@ pub trait KVStore { ) -> Result, io::Error>; } -/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] +/// Provides additional interface methods that are required for [`KVStoreSync`]-to-[`KVStoreSync`] /// data migration. -pub trait MigratableKVStore: KVStore { +pub trait MigratableKVStore: KVStoreSync { /// Returns *all* known keys as a list of `primary_namespace`, `secondary_namespace`, `key` tuples. /// - /// This is useful for migrating data from [`KVStore`] implementation to [`KVStore`] + /// This is useful for migrating data from [`KVStoreSync`] implementation to [`KVStoreSync`] /// implementation. /// /// Must exhaustively return all entries known to the store to ensure no data is missed, but @@ -209,7 +209,7 @@ pub fn migrate_kv_store_data( Ok(()) } -impl Persist for K { +impl Persist for K { // TODO: We really need a way for the persister to inform the user that its time to crash/shut // down once these start returning failure. // Then we should return InProgress rather than UnrecoverableError, implying we should probably @@ -277,7 +277,7 @@ pub fn read_channel_monitors( kv_store: K, entropy_source: ES, signer_provider: SP, ) -> Result::EcdsaSigner>)>, io::Error> where - K::Target: KVStore, + K::Target: KVStoreSync, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, { @@ -322,7 +322,7 @@ where /// /// # Overview /// -/// The main benefit this provides over the [`KVStore`]'s [`Persist`] implementation is decreased +/// The main benefit this provides over the [`KVStoreSync`]'s [`Persist`] implementation is decreased /// I/O bandwidth and storage churn, at the expense of more IOPS (including listing, reading, and /// deleting) and complexity. This is because it writes channel monitor differential updates, /// whereas the other (default) implementation rewrites the entire monitor on each update. For @@ -330,7 +330,7 @@ where /// of megabytes (or more). Updates can be as small as a few hundred bytes. /// /// Note that monitors written with `MonitorUpdatingPersister` are _not_ backward-compatible with -/// the default [`KVStore`]'s [`Persist`] implementation. They have a prepended byte sequence, +/// the default [`KVStoreSync`]'s [`Persist`] implementation. They have a prepended byte sequence, /// [`MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL`], applied to prevent deserialization with other /// persisters. This is because monitors written by this struct _may_ have unapplied updates. In /// order to downgrade, you must ensure that all updates are applied to the monitor, and remove the @@ -382,7 +382,7 @@ where /// /// ## EXTREMELY IMPORTANT /// -/// It is extremely important that your [`KVStore::read`] implementation uses the +/// It is extremely important that your [`KVStoreSync::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in /// that circumstance (not when there is really a permissions error, for example). This is because /// neither channel monitor reading function lists updates. Instead, either reads the monitor, and @@ -394,7 +394,7 @@ where /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`. 
/// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates` /// are deleted. -/// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions +/// The `lazy` flag is used on the [`KVStoreSync::remove`] method, so there are no guarantees that the deletions /// will complete. However, stale updates are not a problem for data integrity, since updates are /// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`. /// @@ -403,7 +403,7 @@ where /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. pub struct MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, @@ -423,7 +423,7 @@ where impl MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, @@ -463,7 +463,7 @@ where /// Reads all stored channel monitors, along with any stored updates for them. /// - /// It is extremely important that your [`KVStore::read`] implementation uses the + /// It is extremely important that your [`KVStoreSync::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. pub fn read_all_channel_monitors_with_updates( @@ -485,7 +485,7 @@ where /// Read a single channel monitor, along with any stored updates for it. /// - /// It is extremely important that your [`KVStore::read`] implementation uses the + /// It is extremely important that your [`KVStoreSync::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. /// @@ -612,7 +612,7 @@ where /// This function works by first listing all monitors, and then for each of them, listing all /// updates. The updates that have an `update_id` less than or equal to than the stored monitor /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will - /// be passed to [`KVStore::remove`]. + /// be passed to [`KVStoreSync::remove`]. pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { let monitor_keys = self.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, @@ -651,7 +651,7 @@ impl< FE: Deref, > Persist for MonitorUpdatingPersister where - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, @@ -659,7 +659,7 @@ where FE::Target: FeeEstimator, { /// Persists a new channel. This means writing the entire monitor to the - /// parametrized [`KVStore`]. + /// parametrized [`KVStoreSync`]. fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { @@ -692,11 +692,11 @@ where } } - /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible. + /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. 
/// /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: /// - /// - No full monitor is found in [`KVStore`] + /// - No full monitor is found in [`KVStoreSync`] /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] /// - LDK commands re-persisting the entire monitor through this function, specifically when /// `update` is `None`. @@ -806,7 +806,7 @@ impl MonitorUpdatingPersister where ES::Target: EntropySource + Sized, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, SP::Target: SignerProvider + Sized, BI::Target: BroadcasterInterface, @@ -887,7 +887,7 @@ pub enum MonitorName { } impl MonitorName { - /// Attempts to construct a `MonitorName` from a storage key returned by [`KVStore::list`]. + /// Attempts to construct a `MonitorName` from a storage key returned by [`KVStoreSync::list`]. /// /// This is useful when you need to reconstruct the original data the key represents. fn from_str(monitor_key: &str) -> Result { @@ -1426,7 +1426,7 @@ mod tests { #[test] fn kvstore_trait_object_usage() { - let store: Arc = Arc::new(TestStore::new(false)); + let store: Arc = Arc::new(TestStore::new(false)); assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store))); } } diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 10558b3fdea..ee79793e313 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -5,7 +5,7 @@ // licenses. //! This module contains an [`OutputSweeper`] utility that keeps track of -//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries +//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStoreSync`] and regularly retries //! sweeping them. use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; @@ -23,7 +23,7 @@ use crate::sync::Arc; use crate::sync::Mutex; use crate::util::logger::Logger; use crate::util::persist::{ - KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStoreSync, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::util::ser::{Readable, ReadableArgs, Writeable}; @@ -328,7 +328,7 @@ impl_writeable_tlv_based_enum!(OutputSpendStatus, ); /// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given -/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor +/// [`KVStoreSync`] and regularly retries sweeping them based on a callback given to the constructor /// methods. /// /// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`]. 
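Since this patch's rename is purely mechanical — the method signatures are unchanged and only the trait name moves from `KVStore` to `KVStoreSync`, freeing the `KVStore` name for the async trait added in the next patch — an implementor only has to update the trait name. As a rough sketch of a minimal implementation under the new name (the in-memory `MemoryStore` type is illustrative only and not part of the patch; note that, as the `MonitorUpdatingPersister` docs above stress, `read` must use `io::ErrorKind::NotFound` exactly and only for missing keys):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use lightning::io;
use lightning::util::persist::KVStoreSync;

// Illustrative in-memory store; entries live under (primary_namespace, secondary_namespace, key).
#[derive(Default)]
struct MemoryStore {
	entries: Mutex<HashMap<(String, String, String), Vec<u8>>>,
}

impl KVStoreSync for MemoryStore {
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Result<Vec<u8>, io::Error> {
		let entries = self.entries.lock().unwrap();
		entries
			.get(&(primary_namespace.to_owned(), secondary_namespace.to_owned(), key.to_owned()))
			.cloned()
			// `NotFound` only when the key is genuinely absent, as required above.
			.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
	}
	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
	) -> Result<(), io::Error> {
		self.entries.lock().unwrap().insert(
			(primary_namespace.to_owned(), secondary_namespace.to_owned(), key.to_owned()),
			buf.to_vec(),
		);
		Ok(())
	}
	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
	) -> Result<(), io::Error> {
		self.entries.lock().unwrap().remove(&(
			primary_namespace.to_owned(),
			secondary_namespace.to_owned(),
			key.to_owned(),
		));
		Ok(())
	}
	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Result<Vec<String>, io::Error> {
		let entries = self.entries.lock().unwrap();
		Ok(entries
			.keys()
			.filter(|(p, s, _)| p.as_str() == primary_namespace && s.as_str() == secondary_namespace)
			.map(|(_, _, k)| k.clone())
			.collect())
	}
}
```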
@@ -347,7 +347,7 @@ where D::Target: ChangeDestinationSource, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -369,7 +369,7 @@ where D::Target: ChangeDestinationSource, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -684,7 +684,7 @@ where D::Target: ChangeDestinationSource, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -731,7 +731,7 @@ where D::Target: ChangeDestinationSource, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -828,7 +828,7 @@ where D::Target: ChangeDestinationSource, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -877,7 +877,7 @@ where D::Target: ChangeDestinationSource, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -929,7 +929,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -943,7 +943,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { @@ -1017,7 +1017,7 @@ where D::Target: ChangeDestinationSourceSync, E::Target: FeeEstimator, F::Target: Filter + Sync + Send, - K::Target: KVStore, + K::Target: KVStoreSync, L::Target: Logger, O::Target: OutputSpender, { diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index e0869bf4364..17ef2a52af7 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -57,7 +57,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; -use crate::util::persist::{KVStore, MonitorName}; +use crate::util::persist::{KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; @@ -800,7 +800,7 @@ impl TestStore { } } -impl KVStore for TestStore { +impl KVStoreSync for TestStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { From 59881951391e64b9275d05beb6ea8beba6492577 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 17 Jul 2025 15:04:57 +0200 Subject: [PATCH 3/3] Add async KVStore --- lightning-background-processor/src/lib.rs | 187 +++++++--- lightning/src/util/persist.rs | 89 ++++- lightning/src/util/sweep.rs | 410 ++++++++++++++++------ 3 files changed, 535 insertions(+), 151 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 2ee75577d40..faf7898730c 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -42,15 +42,16 @@ use lightning::sign::EntropySource; use lightning::sign::OutputSpender; use lightning::util::logger::Logger; use lightning::util::persist::{ - KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, 
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::sweep::OutputSweeper; #[cfg(feature = "std")] use lightning::util::sweep::OutputSweeperSync; +use lightning::util::sweep::OutputSweeperSyncKVStore; #[cfg(feature = "std")] use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; @@ -318,6 +319,15 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri true } +macro_rules! maybe_await { + (true, $e:expr) => { + $e.await + }; + (false, $e:expr) => { + $e + }; +} + macro_rules! define_run_body { ( $kv_store: ident, @@ -327,7 +337,7 @@ macro_rules! define_run_body { $peer_manager: ident, $gossip_sync: ident, $process_sweeper: expr, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, + $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async_persist: tt, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.get_cm().timer_tick_occurred(); @@ -383,12 +393,12 @@ macro_rules! define_run_body { if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $kv_store.write( + maybe_await!($async_persist, $kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, &$channel_manager.get_cm().encode(), - )?; + ))?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { @@ -449,12 +459,12 @@ macro_rules! define_run_body { log_trace!($logger, "Persisting network graph."); } - if let Err(e) = $kv_store.write( + if let Err(e) = maybe_await!($async_persist, $kv_store.write( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, &network_graph.encode(), - ) { + )) { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } @@ -482,12 +492,12 @@ macro_rules! define_run_body { } else { log_trace!($logger, "Persisting scorer"); } - if let Err(e) = $kv_store.write( + if let Err(e) = maybe_await!($async_persist, $kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, &scorer.encode(), - ) { + )) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } } @@ -510,31 +520,31 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. 
- $kv_store.write( + maybe_await!($async_persist, $kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, &$channel_manager.get_cm().encode(), - )?; + ))?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { - $kv_store.write( + maybe_await!($async_persist, $kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, &scorer.encode(), - )?; + ))?; } // Persist NetworkGraph on exit if let Some(network_graph) = $gossip_sync.network_graph() { - $kv_store.write( + maybe_await!($async_persist, $kv_store.write( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, &network_graph.encode(), - )?; + ))?; } Ok(()) @@ -681,11 +691,12 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// ``` /// # use lightning::io; /// # use lightning::events::ReplayEvent; -/// # use lightning::util::sweep::OutputSweeper; /// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; -/// # use lightning_background_processor::{process_events_async, GossipSync}; +/// # use lightning_background_processor::{process_events_full_async, GossipSync}; +/// # use core::future::Future; +/// # use core::pin::Pin; /// # struct Logger {} /// # impl lightning::util::logger::Logger for Logger { /// # fn log(&self, _record: lightning::util::logger::Record) {} @@ -697,6 +708,13 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } +/// # struct Store {} +/// # impl lightning::util::persist::KVStore for Store { +/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin, io::Error>> + 'static + Send>> { todo!() } +/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin> + 'static + Send>> { todo!() } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin> + 'static + Send>> { todo!() } +/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin, io::Error>> + 'static + Send>> { todo!() } +/// # } /// # struct EventHandler {} /// # impl EventHandler { /// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) } @@ -715,7 +733,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # type LiquidityManager = lightning_liquidity::LiquidityManager, Arc>, Arc>; /// # type Scorer = RwLock, Arc>>; /// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
            , Logger, F, StoreSync>; -/// # +/// # type OutputSweeper = lightning::util::sweep::OutputSweeper, Arc, Arc, Arc, Arc, Arc, Arc>; +/// /// # struct Node< /// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, /// # F: lightning::chain::Filter + Send + Sync + 'static, @@ -731,10 +750,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// # liquidity_manager: Arc>, /// # chain_monitor: Arc>, /// # gossip_sync: Arc>, -/// # persister: Arc, +/// # persister: Arc, /// # logger: Arc, /// # scorer: Arc, -/// # sweeper: Arc, Arc, Arc, Arc, Arc, Arc, Arc>>, +/// # sweeper: Arc>, /// # } /// # /// # async fn setup_background_processing< @@ -780,7 +799,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();" )] #[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")] -/// process_events_async( +/// process_events_full_async( /// background_persister, /// |e| background_event_handler.handle_event(e), /// background_chain_mon, @@ -805,7 +824,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; #[cfg_attr(feature = "std", doc = " handle.await.unwrap()")] /// # } ///``` -pub async fn process_events_async< +pub async fn process_events_full_async< 'a, UL: 'static + Deref, CF: 'static + Deref, @@ -856,7 +875,7 @@ where LM::Target: ALiquidityManager, O::Target: 'static + OutputSpender, D::Target: 'static + ChangeDestinationSource, - K::Target: 'static + KVStoreSync, + K::Target: 'static + KVStore, { let mut should_break = false; let async_event_handler = |event| { @@ -875,12 +894,15 @@ where if let Some(duration_since_epoch) = fetch_time() { if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ) { + if let Err(e) = kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await + { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e); // We opt not to abort early on persistence failure here as persisting // the scorer is non-critical and we still hope that it will have @@ -958,7 +980,83 @@ where }, mobile_interruptable_platform, fetch_time, + true, + ) +} + +/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for +/// synchronous background persistence. 
+pub async fn process_events_async< + UL: 'static + Deref, + CF: 'static + Deref, + T: 'static + Deref, + F: 'static + Deref, + G: 'static + Deref>, + L: 'static + Deref + Send + Sync, + P: 'static + Deref, + EventHandlerFuture: core::future::Future>, + EventHandler: Fn(Event) -> EventHandlerFuture, + ES: 'static + Deref + Send, + M: 'static + + Deref::Signer, CF, T, F, L, P, ES>> + + Send + + Sync, + CM: 'static + Deref + Send + Sync, + OM: 'static + Deref, + PGS: 'static + Deref>, + RGS: 'static + Deref>, + PM: 'static + Deref, + LM: 'static + Deref, + D: 'static + Deref, + O: 'static + Deref, + K: 'static + Deref, + OS: 'static + Deref>, + S: 'static + Deref + Send + Sync, + SC: for<'b> WriteableScore<'b>, + SleepFuture: core::future::Future + core::marker::Unpin, + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, +>( + kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, + onion_messenger: Option, gossip_sync: GossipSync, peer_manager: PM, + liquidity_manager: Option, sweeper: Option, logger: L, scorer: Option, + sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, +) -> Result<(), lightning::io::Error> +where + UL::Target: 'static + UtxoLookup, + CF::Target: 'static + chain::Filter, + T::Target: 'static + BroadcasterInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist<::Signer>, + ES::Target: 'static + EntropySource, + CM::Target: AChannelManager, + OM::Target: AOnionMessenger, + PM::Target: APeerManager, + LM::Target: ALiquidityManager, + O::Target: 'static + OutputSpender, + D::Target: 'static + ChangeDestinationSource, + K::Target: 'static + KVStoreSync, +{ + let kv_store = Arc::new(KVStoreSyncWrapper(kv_store)); + let sweeper = sweeper.map(|s| s.sweeper_async()); + process_events_full_async( + kv_store, + event_handler, + chain_monitor, + channel_manager, + onion_messenger, + gossip_sync, + peer_manager, + liquidity_manager, + sweeper, + logger, + scorer, + sleeper, + mobile_interruptable_platform, + fetch_time, ) + .await } #[cfg(feature = "std")] @@ -1138,6 +1236,7 @@ impl BackgroundProcessor { .expect("Time should be sometime after 1970"), ) }, + false, ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } @@ -1226,7 +1325,7 @@ mod tests { use lightning::types::payment::PaymentHash; use lightning::util::config::UserConfig; use lightning::util::persist::{ - KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, + KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, @@ -2151,12 +2250,13 @@ mod tests { open_channel!(nodes[0], nodes[1], 100000); let data_dir = nodes[0].kv_store.get_data_dir(); - let persister = Arc::new( + let kv_store_sync = Arc::new( Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); + let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); - let bp_future = super::process_events_async( - persister, + let bp_future = super::process_events_full_async( + kv_store, |_: _| async { Ok(()) }, Arc::clone(&nodes[0].chain_monitor), Arc::clone(&nodes[0].node), @@ -2659,11 +2759,13 @@ mod tests { let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async"); let data_dir = nodes[0].kv_store.get_data_dir(); - let persister = 
 
 #[cfg(feature = "std")]
@@ -1138,6 +1236,7 @@ impl BackgroundProcessor {
							.expect("Time should be sometime after 1970"),
					)
				},
+				false,
			)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1226,7 +1325,7 @@ mod tests {
	use lightning::types::payment::PaymentHash;
	use lightning::util::config::UserConfig;
	use lightning::util::persist::{
-		KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
+		KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
		CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
		NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -2151,12 +2250,13 @@ mod tests {
		open_channel!(nodes[0], nodes[1], 100000);
 
		let data_dir = nodes[0].kv_store.get_data_dir();
-		let persister = Arc::new(
+		let kv_store_sync = Arc::new(
			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
		);
+		let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
 
-		let bp_future = super::process_events_async(
-			persister,
+		let bp_future = super::process_events_full_async(
+			kv_store,
			|_: _| async { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
@@ -2659,11 +2759,13 @@ mod tests {
		let (_, nodes) =
			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
-		let persister =
-			Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
+		let kv_store_sync =
+			Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
+		let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
 
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
-		let bp_future = super::process_events_async(
-			persister,
+		let bp_future = super::process_events_full_async(
+			kv_store,
			|_: _| async { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
@@ -2875,12 +2977,13 @@ mod tests {
		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
-		let persister = Arc::new(Persister::new(data_dir));
+		let kv_store_sync = Arc::new(Persister::new(data_dir));
+		let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
 
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
-		let bp_future = super::process_events_async(
-			persister,
+		let bp_future = super::process_events_full_async(
+			kv_store,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index 6e93cc83f16..c1cdd0d5fb3 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -14,7 +14,9 @@ use bitcoin::hashes::hex::FromHex;
 use bitcoin::{BlockHash, Txid};
 
 use core::cmp;
+use core::future::Future;
 use core::ops::Deref;
+use core::pin::Pin;
 use core::str::FromStr;
 
 use crate::prelude::*;
@@ -108,6 +110,78 @@ pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";
 /// updates applied to be current) with another implementation.
 pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
 
+/// A synchronous version of the [`KVStore`] trait.
+pub trait KVStoreSync {
+	/// A synchronous version of the [`KVStore::read`] method.
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> Result<Vec<u8>, io::Error>;
+	/// A synchronous version of the [`KVStore::write`] method.
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+	) -> Result<(), io::Error>;
+	/// A synchronous version of the [`KVStore::remove`] method.
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> Result<(), io::Error>;
+	/// A synchronous version of the [`KVStore::list`] method.
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> Result<Vec<String>, io::Error>;
+}
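To make the split concrete, here is a minimal sketch (not from this patch) of a `KVStoreSync` backend: an in-memory map keyed by the concatenated namespaces. The `NotFound` behaviour follows the trait documentation; everything else, including the type's name, is illustrative.

    use lightning::util::persist::KVStoreSync;
    use std::collections::HashMap;
    use std::io;
    use std::sync::Mutex;

    #[derive(Default)]
    pub struct InMemoryStore {
        entries: Mutex<HashMap<String, Vec<u8>>>,
    }

    fn full_key(primary_namespace: &str, secondary_namespace: &str, key: &str) -> String {
        format!("{}/{}/{}", primary_namespace, secondary_namespace, key)
    }

    impl KVStoreSync for InMemoryStore {
        fn read(
            &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
        ) -> Result<Vec<u8>, io::Error> {
            self.entries
                .lock()
                .unwrap()
                .get(&full_key(primary_namespace, secondary_namespace, key))
                .cloned()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
        }
        fn write(
            &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
        ) -> Result<(), io::Error> {
            self.entries
                .lock()
                .unwrap()
                .insert(full_key(primary_namespace, secondary_namespace, key), buf.to_vec());
            Ok(())
        }
        fn remove(
            &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
        ) -> Result<(), io::Error> {
            self.entries
                .lock()
                .unwrap()
                .remove(&full_key(primary_namespace, secondary_namespace, key));
            Ok(())
        }
        fn list(
            &self, primary_namespace: &str, secondary_namespace: &str,
        ) -> Result<Vec<String>, io::Error> {
            let prefix = format!("{}/{}/", primary_namespace, secondary_namespace);
            Ok(self
                .entries
                .lock()
                .unwrap()
                .keys()
                .filter_map(|k| k.strip_prefix(&prefix).map(String::from))
                .collect())
        }
    }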
+
+/// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait.
+pub struct KVStoreSyncWrapper<K: Deref>(pub K)
+where
+	K::Target: KVStoreSync;
+
+impl<K: Deref> Deref for KVStoreSyncWrapper<K>
+where
+	K::Target: KVStoreSync,
+{
+	type Target = Self;
+	fn deref(&self) -> &Self {
+		self
+	}
+}
+
+impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
+where
+	K::Target: KVStoreSync,
+{
+	fn read(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+	) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> {
+		let res = self.0.read(primary_namespace, secondary_namespace, key);
+
+		Box::pin(async move { res })
+	}
+
+	fn write(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
+		let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
+
+		Box::pin(async move { res })
+	}
+
+	fn remove(
+		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
+		let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
+
+		Box::pin(async move { res })
+	}
+
+	fn list(
+		&self, primary_namespace: &str, secondary_namespace: &str,
+	) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> {
+		let res = self.0.list(primary_namespace, secondary_namespace);
+
+		Box::pin(async move { res })
+	}
+}
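Note that the wrapper executes the underlying store operation eagerly and only defers delivery of the result, so the relative order of `write` calls is fixed at call time rather than at poll time. A hypothetical usage, reusing the `InMemoryStore` sketched above:

    use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper};
    use std::sync::Arc;

    async fn demo(store: KVStoreSyncWrapper<Arc<InMemoryStore>>) {
        // The write hits the underlying store inside this call...
        let fut = store.write("ns", "sub", "key", b"value");
        // ...so it is observable even before the future is awaited.
        assert!(store.0.read("ns", "sub", "key").is_ok());
        fut.await.unwrap();
    }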
+
 /// Provides an interface that allows storage and retrieval of persisted values that are associated
 /// with given keys.
 ///
@@ -129,7 +203,7 @@ pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
 /// **Note:** Users migrating custom persistence backends from the pre-v0.0.117 `KVStorePersister`
 /// interface can use a concatenation of `[{primary_namespace}/[{secondary_namespace}/]]{key}` to
 /// recover a `key` compatible with the data model previously assumed by `KVStorePersister::persist`.
-pub trait KVStoreSync {
+pub trait KVStore {
 	/// Returns the data stored for the given `primary_namespace`, `secondary_namespace`, and
 	/// `key`.
 	///
@@ -139,14 +213,15 @@ pub trait KVStoreSync {
 	/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
 	fn read(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
-	) -> Result<Vec<u8>, io::Error>;
-	/// Persists the given data under the given `key`.
+	) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>>;
+	/// Persists the given data under the given `key`. Note that the order of multiple write calls
+	/// needs to be retained when persisting asynchronously.
 	///
 	/// Will create the given `primary_namespace` and `secondary_namespace` if not already present
 	/// in the store.
 	fn write(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
-	) -> Result<(), io::Error>;
+	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>>;
 	/// Removes any data that had previously been persisted under the given `key`.
 	///
 	/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
@@ -164,7 +239,7 @@ pub trait KVStoreSync {
 	/// invokation or not.
 	fn remove(
 		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
-	) -> Result<(), io::Error>;
+	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>>;
 	/// Returns a list of keys that are stored under the given `secondary_namespace` in
 	/// `primary_namespace`.
 	///
@@ -172,10 +247,10 @@ pub trait KVStoreSync {
 	/// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
-	) -> Result<Vec<String>, io::Error>;
+	) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>>;
 }
 
-/// Provides additional interface methods that are required for [`KVStoreSync`]-to-[`KVStoreSync`]
+/// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
 /// data migration.
 pub trait MigratableKVStore: KVStoreSync {
 	/// Returns *all* known keys as a list of `primary_namespace`, `secondary_namespace`, `key` tuples.
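The sweep.rs changes below repeatedly apply one locking pattern: mutate the sweeper state and create the persistence future while the `Mutex` is held, then await that future only after the guard has been dropped, re-marking the state dirty on failure. Because `KVStoreSyncWrapper` issues the write eagerly, creating the future under the lock also pins down write ordering. A stripped-down sketch of the pattern with stand-in types (nothing here is LDK API):

    use core::future::Future;
    use core::pin::Pin;
    use std::io;
    use std::sync::Mutex;

    struct State {
        dirty: bool,
    }

    struct SweeperLike {
        state: Mutex<State>,
    }

    impl SweeperLike {
        // Stand-in for `persist_state`: the encoding (here faked) happens
        // eagerly; only delivery of the result is deferred into the future.
        fn persist_state(
            &self, state: &State,
        ) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
            let _encoded = state.dirty; // imagine `state.encode()` here
            Box::pin(async move { Ok(()) })
        }

        async fn mutate_and_persist(&self) -> Result<(), io::Error> {
            // Take the lock, mutate, and *create* the persistence future...
            let persist_fut = {
                let mut state = self.state.lock().unwrap();
                state.dirty = false;
                self.persist_state(&state)
            }; // ...the guard drops here, before any await point.
            persist_fut.await.map_err(|e| {
                // On failure, re-mark dirty so a later pass retries the write.
                self.state.lock().unwrap().dirty = true;
                e
            })
        }
    }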
diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs
index ee79793e313..c1f0229bdd1 100644
--- a/lightning/src/util/sweep.rs
+++ b/lightning/src/util/sweep.rs
@@ -23,8 +23,8 @@ use crate::sync::Arc;
 use crate::sync::Mutex;
 use crate::util::logger::Logger;
 use crate::util::persist::{
-	KVStoreSync, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
-	OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
+	KVStore, KVStoreSync, KVStoreSyncWrapper, OUTPUT_SWEEPER_PERSISTENCE_KEY,
+	OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
 };
 use crate::util::ser::{Readable, ReadableArgs, Writeable};
 use crate::{impl_writeable_tlv_based, log_debug, log_error};
@@ -36,6 +36,7 @@ use bitcoin::{BlockHash, ScriptBuf, Transaction, Txid};
 
 use core::future::Future;
 use core::ops::Deref;
+use core::pin::Pin;
 use core::sync::atomic::{AtomicBool, Ordering};
 use core::task;
@@ -347,7 +348,7 @@ where
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter,
-	K::Target: KVStoreSync,
+	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
@@ -369,7 +370,7 @@ where
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter,
-	K::Target: KVStoreSync,
+	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
@@ -411,7 +412,7 @@ where
	/// Returns `Err` on persistence failure, in which case the call may be safely retried.
	///
	/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
-	pub fn track_spendable_outputs(
+	pub async fn track_spendable_outputs(
		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
		exclude_static_outputs: bool, delay_until_height: Option<u32>,
	) -> Result<(), ()> {
@@ -427,29 +428,38 @@ where
			return Ok(());
		}
 
-		let mut state_lock = self.sweeper_state.lock().unwrap();
-		for descriptor in relevant_descriptors {
-			let output_info = TrackedSpendableOutput {
-				descriptor,
-				channel_id,
-				status: OutputSpendStatus::PendingInitialBroadcast {
-					delayed_until_height: delay_until_height,
-				},
-			};
-
-			if state_lock.outputs.iter().find(|o| o.descriptor == output_info.descriptor).is_some()
-			{
-				continue;
+		let persist_fut = {
+			let mut state_lock = self.sweeper_state.lock().unwrap();
+			for descriptor in relevant_descriptors {
+				let output_info = TrackedSpendableOutput {
+					descriptor,
+					channel_id,
+					status: OutputSpendStatus::PendingInitialBroadcast {
+						delayed_until_height: delay_until_height,
+					},
+				};
+
+				if state_lock
+					.outputs
+					.iter()
+					.find(|o| o.descriptor == output_info.descriptor)
+					.is_some()
+				{
+					continue;
+				}
+
+				state_lock.outputs.push(output_info);
			}
 
-			state_lock.outputs.push(output_info);
-		}
-		self.persist_state(&*state_lock).map_err(|e| {
-			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
-		})?;
-		state_lock.dirty = false;
+			state_lock.dirty = false;
+			self.persist_state(&state_lock)
+		};
+
+		persist_fut.await.map_err(|e| {
+			self.sweeper_state.lock().unwrap().dirty = true;
 
-		Ok(())
+			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
+		})
	}
 
	/// Returns a list of the currently tracked spendable outputs.
@@ -505,22 +515,30 @@ where
		};
 
		// See if there is anything to sweep before requesting a change address.
-		{
+		let (persist_fut, has_respends) = {
			let mut sweeper_state = self.sweeper_state.lock().unwrap();
 
			let cur_height = sweeper_state.best_block.height;
			let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
-			if !has_respends {
-				// If there is nothing to sweep, we still persist the state if it is dirty.
-				if sweeper_state.dirty {
-					self.persist_state(&sweeper_state).map_err(|e| {
-						log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
-					})?;
-					sweeper_state.dirty = false;
-				}
-
-				return Ok(());
+			// If there is nothing to sweep, we still persist the state if it is dirty.
+			if !has_respends && sweeper_state.dirty {
+				sweeper_state.dirty = false;
+				(Some(self.persist_state(&sweeper_state)), has_respends)
+			} else {
+				(None, has_respends)
			}
+		};
+
+		if let Some(persist_fut) = persist_fut {
+			persist_fut.await.map_err(|e| {
+				self.sweeper_state.lock().unwrap().dirty = true;
+
+				log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
+			})?;
+		};
+
+		if !has_respends {
+			return Ok(());
		}
 
		// Request a new change address outside of the mutex to avoid the mutex crossing await.
@@ -528,7 +546,7 @@ where
			self.change_destination_source.get_change_destination_script().await?;
 
		// Sweep the outputs.
-		{
+		let persist_fut = {
			let mut sweeper_state = self.sweeper_state.lock().unwrap();
 
			let cur_height = sweeper_state.best_block.height;
@@ -541,53 +559,51 @@ where
				.map(|o| &o.descriptor)
				.collect();
 
-			if respend_descriptors.is_empty() {
-				// It could be that a tx confirmed and there is now nothing to sweep anymore. We still persist the state
-				// if it is dirty.
- if sweeper_state.dirty { - self.persist_state(&sweeper_state).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; - sweeper_state.dirty = false; - } - + // Exit if there is nothing to spend anymore and there also is no need to persist the state. + if respend_descriptors.is_empty() && !sweeper_state.dirty { return Ok(()); } - let spending_tx = self - .spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script) - .map_err(|e| { - log_error!(self.logger, "Error spending outputs: {:?}", e); - })?; - - log_debug!( - self.logger, - "Generating and broadcasting sweeping transaction {}", - spending_tx.compute_txid() - ); - - // As we didn't modify the state so far, the same filter_fn yields the same elements as - // above. - let respend_outputs = - sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); - for output_info in respend_outputs { - if let Some(filter) = self.chain_data_source.as_ref() { - let watched_output = output_info.to_watched_output(cur_hash); - filter.register_output(watched_output); + // Generate the spending transaction and broadcast it. + if !respend_descriptors.is_empty() { + let spending_tx = self + .spend_outputs(&sweeper_state, &respend_descriptors, change_destination_script) + .map_err(|e| { + log_error!(self.logger, "Error spending outputs: {:?}", e); + })?; + + log_debug!( + self.logger, + "Generating and broadcasting sweeping transaction {}", + spending_tx.compute_txid() + ); + + // As we didn't modify the state so far, the same filter_fn yields the same elements as + // above. + let respend_outputs = + sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o, cur_height)); + for output_info in respend_outputs { + if let Some(filter) = self.chain_data_source.as_ref() { + let watched_output = output_info.to_watched_output(cur_hash); + filter.register_output(watched_output); + } + + output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); } - output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); + self.broadcaster.broadcast_transactions(&[&spending_tx]); } - self.persist_state(&sweeper_state).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - })?; + // Either the state was already dirty or we modified it above, so we persist it. 
+			sweeper_state.dirty = false;
+			self.persist_state(&sweeper_state)
+		};
 
-			self.broadcaster.broadcast_transactions(&[&spending_tx]);
-		}
+		persist_fut.await.map_err(|e| {
+			self.sweeper_state.lock().unwrap().dirty = true;
 
-		Ok(())
+			log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
+		})
	}
 
	fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
@@ -612,25 +628,17 @@ where
		sweeper_state.dirty = true;
	}
 
-	fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
-		self.kv_store
-			.write(
-				OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
-				OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
-				OUTPUT_SWEEPER_PERSISTENCE_KEY,
-				&sweeper_state.encode(),
-			)
-			.map_err(|e| {
-				log_error!(
-					self.logger,
-					"Write for key {}/{}/{} failed due to: {}",
-					OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
-					OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
-					OUTPUT_SWEEPER_PERSISTENCE_KEY,
-					e
-				);
-				e
-			})
+	fn persist_state<'a>(
+		&self, sweeper_state: &SweeperState,
+	) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'a + Send>> {
+		let encoded = sweeper_state.encode();
+
+		self.kv_store.write(
+			OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
+			OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
+			OUTPUT_SWEEPER_PERSISTENCE_KEY,
+			&encoded,
+		)
	}
 
	fn spend_outputs(
@@ -684,7 +692,7 @@ where
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter + Sync + Send,
-	K::Target: KVStoreSync,
+	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
@@ -731,7 +739,7 @@ where
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter + Sync + Send,
-	K::Target: KVStoreSync,
+	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
@@ -828,7 +836,7 @@ where
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter + Sync + Send,
-	K::Target: KVStoreSync,
+	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
@@ -877,7 +885,7 @@ where
	D::Target: ChangeDestinationSource,
	E::Target: FeeEstimator,
	F::Target: Filter + Sync + Send,
-	K::Target: KVStoreSync,
+	K::Target: KVStore,
	L::Target: Logger,
	O::Target: OutputSpender,
{
@@ -922,6 +930,173 @@ where
	}
}
 
+/// A wrapper around [`OutputSweeper`] to be used with a synchronous [`KVStoreSync`] store.
+pub struct OutputSweeperSyncKVStore<
+	B: Deref,
+	D: Deref,
+	E: Deref,
+	F: Deref,
+	K: Deref,
+	L: Deref,
+	O: Deref,
+> where
+	B::Target: BroadcasterInterface,
+	D::Target: ChangeDestinationSource,
+	E::Target: FeeEstimator,
+	F::Target: Filter,
+	K::Target: KVStoreSync,
+	L::Target: Logger,
+	O::Target: OutputSpender,
+{
+	sweeper: Arc<OutputSweeper<B, D, E, F, Arc<KVStoreSyncWrapper<K>>, L, O>>,
+}
+
+impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
+	OutputSweeperSyncKVStore<B, D, E, F, K, L, O>
+where
+	B::Target: BroadcasterInterface,
+	D::Target: ChangeDestinationSource,
+	E::Target: FeeEstimator,
+	F::Target: Filter,
+	K::Target: KVStoreSync,
+	L::Target: Logger,
+	O::Target: OutputSpender,
+{
+	/// Constructs a new [`OutputSweeperSyncKVStore`] instance.
+	pub fn new(
+		best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
+		output_spender: O, change_destination_source: D, kv_store: K, logger: L,
+	) -> Self {
+		let kv_store = Arc::new(KVStoreSyncWrapper(kv_store));
+
+		let sweeper = OutputSweeper::new(
+			best_block,
+			broadcaster,
+			fee_estimator,
+			chain_data_source,
+			output_spender,
+			change_destination_source,
+			kv_store,
+			logger,
+		);
+		Self { sweeper: Arc::new(sweeper) }
+	}
+
+	/// Regenerates and broadcasts the spending transaction for any outputs that are pending. Wraps
+	/// [`OutputSweeper::regenerate_and_broadcast_spend_if_necessary`].
+	pub async fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
+		self.sweeper.regenerate_and_broadcast_spend_if_necessary().await
+	}
+
+	/// Wrapper around [`OutputSweeper::track_spendable_outputs`].
+	pub async fn track_spendable_outputs(
+		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
+		exclude_static_outputs: bool, delay_until_height: Option<u32>,
+	) -> Result<(), ()> {
+		self.sweeper
+			.track_spendable_outputs(
+				output_descriptors,
+				channel_id,
+				exclude_static_outputs,
+				delay_until_height,
+			)
+			.await
+	}
+
+	/// Returns a list of the currently tracked spendable outputs. Wraps [`OutputSweeper::tracked_spendable_outputs`].
+	pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
+		self.sweeper.tracked_spendable_outputs()
+	}
+
+	/// Wraps [`OutputSweeper::current_best_block`].
+	pub fn current_best_block(&self) -> BestBlock {
+		self.sweeper.current_best_block()
+	}
+
+	/// Returns the underlying async [`OutputSweeper`].
+	pub fn sweeper_async(
+		&self,
+	) -> Arc<OutputSweeper<B, D, E, F, Arc<KVStoreSyncWrapper<K>>, L, O>> {
+		Arc::clone(&self.sweeper)
+	}
+}
+
+impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
+	for OutputSweeperSyncKVStore<B, D, E, F, K, L, O>
+where
+	B::Target: BroadcasterInterface,
+	D::Target: ChangeDestinationSource,
+	E::Target: FeeEstimator,
+	F::Target: Filter + Sync + Send,
+	K::Target: KVStoreSync,
+	L::Target: Logger,
+	O::Target: OutputSpender,
+{
+	fn transactions_confirmed(
+		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
+	) {
+		self.sweeper.transactions_confirmed(header, txdata, height)
+	}
+
+	fn transaction_unconfirmed(&self, txid: &Txid) {
+		self.sweeper.transaction_unconfirmed(txid)
+	}
+
+	fn best_block_updated(&self, header: &Header, height: u32) {
+		self.sweeper.best_block_updated(header, height);
+	}
+
+	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
+		self.sweeper.get_relevant_txids()
+	}
+}
+
+impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
+	for OutputSweeperSyncKVStore<B, D, E, F, K, L, O>
+where
+	B::Target: BroadcasterInterface,
+	D::Target: ChangeDestinationSource,
+	E::Target: FeeEstimator,
+	F::Target: Filter + Sync + Send,
+	K::Target: KVStoreSync,
+	L::Target: Logger,
+	O::Target: OutputSpender,
+{
+	fn filtered_block_connected(
+		&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
+	) {
+		self.sweeper.filtered_block_connected(header, txdata, height);
+	}
+
+	fn block_disconnected(&self, header: &Header, height: u32) {
+		self.sweeper.block_disconnected(header, height);
+	}
+}
+
+impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
+	ReadableArgs<(B, E, Option<F>, O, D, K, L)> for OutputSweeperSyncKVStore<B, D, E, F, K, L, O>
+where
+	B::Target: BroadcasterInterface,
+	D::Target: ChangeDestinationSource,
+	E::Target: FeeEstimator,
+	F::Target: Filter + Sync + Send,
+	K::Target: KVStoreSync,
+	L::Target: Logger,
+	O::Target: OutputSpender,
+{
+	fn read<R: io::Read>(
+		reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
+	) -> Result<Self, DecodeError> {
+		let kv_store = Arc::new(KVStoreSyncWrapper(args.5));
+
+		let args = (args.0, args.1, args.2, args.3, args.4, kv_store, args.6);
+
+		let sweeper = OutputSweeper::read(reader, args)?;
+
+		Ok(Self { sweeper: Arc::new(sweeper) })
+	}
+}
+
 /// A synchronous wrapper around [`OutputSweeper`] to be used in contexts where async is not available.
 pub struct OutputSweeperSync<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
 where
	B::Target: BroadcasterInterface,
	D::Target: ChangeDestinationSourceSync,
	E::Target: FeeEstimator,
	F::Target: Filter,
	K::Target: KVStoreSync,
	L::Target: Logger,
	O::Target: OutputSpender,
 {
-	sweeper: Arc<OutputSweeper<B, Arc<ChangeDestinationSourceSyncWrapper<D>>, E, F, K, L, O>>,
+	sweeper: Arc<
+		OutputSweeper<
+			B,
+			Arc<ChangeDestinationSourceSyncWrapper<D>>,
+			E,
+			F,
+			Arc<KVStoreSyncWrapper<K>>,
+			L,
+			O,
+		>,
+	>,
 }
 
 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
	OutputSweeperSync<B, D, E, F, K, L, O>
@@ -955,6 +1140,8 @@ where
		let change_destination_source =
			Arc::new(ChangeDestinationSourceSyncWrapper::new(change_destination_source));
 
+		let kv_store = Arc::new(KVStoreSyncWrapper(kv_store));
+
		let sweeper = OutputSweeper::new(
			best_block,
			broadcaster,
@@ -983,17 +1170,26 @@ where
		}
	}
 
-	/// Tells the sweeper to track the given outputs descriptors. Wraps [`OutputSweeper::track_spendable_outputs`].
+	/// Wrapper around [`OutputSweeper::track_spendable_outputs`].
	pub fn track_spendable_outputs(
		&self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
		exclude_static_outputs: bool, delay_until_height: Option<u32>,
	) -> Result<(), ()> {
-		self.sweeper.track_spendable_outputs(
+		let mut fut = Box::pin(self.sweeper.track_spendable_outputs(
			output_descriptors,
			channel_id,
			exclude_static_outputs,
			delay_until_height,
-		)
+		));
+		let mut waker = dummy_waker();
+		let mut ctx = task::Context::from_waker(&mut waker);
+		match fut.as_mut().poll(&mut ctx) {
+			task::Poll::Ready(result) => result,
+			task::Poll::Pending => {
+				// In a sync context, we can't wait for the future to complete.
+				unreachable!("OutputSweeper::track_spendable_outputs should not be pending in a sync context");
+			},
+		}
	}
 
	/// Returns a list of the currently tracked spendable outputs. Wraps [`OutputSweeper::tracked_spendable_outputs`].
@@ -1005,7 +1201,17 @@ where
	#[cfg(any(test, feature = "_test_utils"))]
	pub fn sweeper_async(
		&self,
-	) -> Arc<OutputSweeper<B, Arc<ChangeDestinationSourceSyncWrapper<D>>, E, F, K, L, O>> {
+	) -> Arc<
+		OutputSweeper<
+			B,
+			Arc<ChangeDestinationSourceSyncWrapper<D>>,
+			E,
+			F,
+			Arc<KVStoreSyncWrapper<K>>,
+			L,
+			O,
+		>,
+	> {
		Arc::clone(&self.sweeper)
	}
 }
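The `dummy_waker` trick above deserves a note: polling once with a no-op waker is only sound because the futures produced via `KVStoreSyncWrapper` complete all their work before they are returned, so the first poll is always `Ready`. A self-contained sketch of the same bridging technique (the helper names are ours, not LDK's):

    use core::future::Future;
    use core::pin::Pin;
    use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

    // A no-op waker, similar in spirit to the `dummy_waker` used above.
    fn noop_waker() -> Waker {
        fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(core::ptr::null(), &VTABLE)
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
    }

    // Polls a future exactly once, panicking if it is not immediately ready.
    // This is only sound for futures that complete their work eagerly and
    // never return `Poll::Pending`, like those built by `KVStoreSyncWrapper`.
    fn block_on_ready<F: Future>(fut: F) -> F::Output {
        let mut fut = Box::pin(fut);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => output,
            Poll::Pending => panic!("future was expected to be immediately ready"),
        }
    }

    fn main() {
        assert_eq!(block_on_ready(async { 21 * 2 }), 42);
    }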