Skip to content

Commit c5eeebb

Browse files
committed
Convert process_events_async to take an asynchronous Persister
Also provide a wrapper to allow a sync kvstore to be used.
1 parent 6095c18 commit c5eeebb

File tree

2 files changed

+249
-61
lines changed

2 files changed

+249
-61
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStoreSync, PersisterSync};
43+
#[cfg(feature = "std")]
44+
use lightning::util::persist::PersisterSync;
45+
use lightning::util::persist::{KVStoreSync, Persister};
4446
use lightning::util::sweep::OutputSweeper;
4547
#[cfg(feature = "std")]
4648
use lightning::util::sweep::OutputSweeperSync;
@@ -311,6 +313,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311313
true
312314
}
313315

316+
macro_rules! maybe_await {
317+
(true, $e:expr) => {
318+
$e.await
319+
};
320+
(false, $e:expr) => {
321+
$e
322+
};
323+
}
324+
314325
macro_rules! define_run_body {
315326
(
316327
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +330,7 @@ macro_rules! define_run_body {
319330
$peer_manager: ident, $gossip_sync: ident,
320331
$process_sweeper: expr,
321332
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
333+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
323334
) => { {
324335
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325336
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +386,7 @@ macro_rules! define_run_body {
375386

376387
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377388
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
389+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
379390
log_trace!($logger, "Done persisting ChannelManager.");
380391
}
381392
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +447,7 @@ macro_rules! define_run_body {
436447
log_trace!($logger, "Persisting network graph.");
437448
}
438449

439-
if let Err(e) = $persister.persist_graph(network_graph) {
450+
if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) {
440451
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441452
}
442453

@@ -464,8 +475,8 @@ macro_rules! define_run_body {
464475
} else {
465476
log_trace!($logger, "Persisting scorer");
466477
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
468-
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
478+
if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) {
479+
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469480
}
470481
}
471482
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
@@ -487,16 +498,16 @@ macro_rules! define_run_body {
487498
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488499
// some races where users quit while channel updates were in-flight, with
489500
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
501+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
491502

492503
// Persist Scorer on exit
493504
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
505+
maybe_await!($async, $persister.persist_scorer(&scorer))?;
495506
}
496507

497508
// Persist NetworkGraph on exit
498509
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
510+
maybe_await!($async, $persister.persist_graph(network_graph))?;
500511
}
501512

502513
Ok(())
@@ -643,22 +654,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643654
/// ```
644655
/// # use lightning::io;
645656
/// # use lightning::events::ReplayEvent;
646-
/// # use lightning::util::sweep::OutputSweeper;
647657
/// # use std::sync::{Arc, RwLock};
648658
/// # use std::sync::atomic::{AtomicBool, Ordering};
649659
/// # use std::time::SystemTime;
650660
/// # use lightning_background_processor::{process_events_async, GossipSync};
661+
/// # use core::future::Future;
662+
/// # use core::pin::Pin;
651663
/// # struct Logger {}
652664
/// # impl lightning::util::logger::Logger for Logger {
653665
/// # fn log(&self, _record: lightning::util::logger::Record) {}
654666
/// # }
655-
/// # struct Store {}
656-
/// # impl lightning::util::persist::KVStore for Store {
667+
/// # struct StoreSync {}
668+
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
657669
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658670
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659671
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660672
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661673
/// # }
674+
/// # struct Store {}
675+
/// # impl lightning::util::persist::KVStore for Store {
676+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
677+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
678+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
679+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
680+
/// # }
662681
/// # struct EventHandler {}
663682
/// # impl EventHandler {
664683
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -669,22 +688,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
669688
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
670689
/// # fn disconnect_socket(&mut self) {}
671690
/// # }
672-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>, Arc<lightning::sign::KeysManager>>;
691+
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
673692
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
674693
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
675694
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
676695
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
677696
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678697
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679-
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
680-
/// #
698+
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
699+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
700+
///
681701
/// # struct Node<
682702
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683703
/// # F: lightning::chain::Filter + Send + Sync + 'static,
684704
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685705
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686706
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688707
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689708
/// # > {
690709
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +716,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697716
/// # persister: Arc<Store>,
698717
/// # logger: Arc<Logger>,
699718
/// # scorer: Arc<Scorer>,
700-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
719+
/// # sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
701720
/// # }
702721
/// #
703722
/// # async fn setup_background_processing<
@@ -706,10 +725,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706725
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707726
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708727
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710728
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711-
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
712-
/// let background_persister = Arc::clone(&node.persister);
729+
/// # >(node: Node<B, F, FE, UL, D, O>) {
730+
/// let background_persister = Arc::new(Arc::clone(&node.persister));
713731
/// let background_event_handler = Arc::clone(&node.event_handler);
714732
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715733
/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -814,7 +832,7 @@ where
814832
F::Target: 'static + FeeEstimator,
815833
L::Target: 'static + Logger,
816834
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
835+
PS::Target: 'static + Persister<'a, CM, L, S>,
818836
ES::Target: 'static + EntropySource,
819837
CM::Target: AChannelManager,
820838
OM::Target: AOnionMessenger,
@@ -841,7 +859,7 @@ where
841859
if let Some(duration_since_epoch) = fetch_time() {
842860
if update_scorer(scorer, &event, duration_since_epoch) {
843861
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
862+
if let Err(e) = persister.persist_scorer(&*scorer).await {
845863
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846864
// We opt not to abort early on persistence failure here as persisting
847865
// the scorer is non-critical and we still hope that it will have
@@ -919,6 +937,7 @@ where
919937
},
920938
mobile_interruptable_platform,
921939
fetch_time,
940+
true,
922941
)
923942
}
924943

@@ -1098,6 +1117,7 @@ impl BackgroundProcessor {
10981117
.expect("Time should be sometime after 1970"),
10991118
)
11001119
},
1120+
false,
11011121
)
11021122
});
11031123
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1162,6 +1182,8 @@ mod tests {
11621182
use bitcoin::transaction::Version;
11631183
use bitcoin::transaction::{Transaction, TxOut};
11641184
use bitcoin::{Amount, ScriptBuf, Txid};
1185+
use core::future::Future;
1186+
use core::pin::Pin;
11651187
use core::sync::atomic::{AtomicBool, Ordering};
11661188
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
11671189
use lightning::chain::transaction::OutPoint;
@@ -1186,7 +1208,7 @@ mod tests {
11861208
use lightning::types::payment::PaymentHash;
11871209
use lightning::util::config::UserConfig;
11881210
use lightning::util::persist::{
1189-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
1211+
KVStore, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
11901212
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
11911213
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
11921214
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -1483,6 +1505,42 @@ mod tests {
14831505
}
14841506
}
14851507

1508+
struct PersisterSyncWrapper(PersisterSync);
1509+
1510+
impl KVStore for PersisterSyncWrapper {
1511+
fn read(
1512+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1513+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
1514+
let res = self.0.read(primary_namespace, secondary_namespace, key);
1515+
1516+
Box::pin(async move { res })
1517+
}
1518+
1519+
fn write(
1520+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1521+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
1522+
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
1523+
1524+
Box::pin(async move { res })
1525+
}
1526+
1527+
fn remove(
1528+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1529+
) -> Result<(), lightning::io::Error> {
1530+
let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
1531+
1532+
return res;
1533+
}
1534+
1535+
fn list(
1536+
&self, primary_namespace: &str, secondary_namespace: &str,
1537+
) -> Result<Vec<String>, lightning::io::Error> {
1538+
let res = self.0.list(primary_namespace, secondary_namespace);
1539+
1540+
return res;
1541+
}
1542+
}
1543+
14861544
struct TestScorer {
14871545
event_expectations: Option<VecDeque<TestResult>>,
14881546
}
@@ -2107,9 +2165,9 @@ mod tests {
21072165
open_channel!(nodes[0], nodes[1], 100000);
21082166

21092167
let data_dir = nodes[0].kv_store.get_data_dir();
2110-
let persister = Arc::new(
2111-
PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2112-
);
2168+
let persister_sync =
2169+
PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test");
2170+
let persister = Arc::new(Arc::new(PersisterSyncWrapper(persister_sync)));
21132171

21142172
let bp_future = super::process_events_async(
21152173
persister,
@@ -2618,8 +2676,8 @@ mod tests {
26182676
let (_, nodes) =
26192677
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
26202678
let data_dir = nodes[0].kv_store.get_data_dir();
2621-
let persister =
2622-
Arc::new(PersisterSync::new(data_dir).with_graph_persistence_notifier(sender));
2679+
let persister_sync = PersisterSync::new(data_dir).with_graph_persistence_notifier(sender);
2680+
let persister = Arc::new(Arc::new(PersisterSyncWrapper(persister_sync)));
26232681

26242682
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
26252683
let bp_future = super::process_events_async(
@@ -2835,7 +2893,8 @@ mod tests {
28352893

28362894
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
28372895
let data_dir = nodes[0].kv_store.get_data_dir();
2838-
let persister = Arc::new(PersisterSync::new(data_dir));
2896+
let persister_sync = PersisterSync::new(data_dir);
2897+
let persister = Arc::new(Arc::new(PersisterSyncWrapper(persister_sync)));
28392898

28402899
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
28412900

0 commit comments

Comments
 (0)