Skip to content

Commit 2014cf5

Browse files
committed
Convert process_events_async to take an asynchronous Persister
Also provide a wrapper to allow a sync kvstore to be used.
1 parent cf5bae8 commit 2014cf5

File tree

2 files changed

+273
-58
lines changed

2 files changed

+273
-58
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 172 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStoreSync, PersisterSync};
43+
use lightning::util::persist::KVStoreSync;
44+
use lightning::util::persist::Persister;
45+
use lightning::util::persist::PersisterSync;
4446
use lightning::util::sweep::OutputSweeper;
4547
#[cfg(feature = "std")]
4648
use lightning::util::sweep::OutputSweeperSync;
@@ -50,7 +52,9 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
5052

5153
use lightning_liquidity::ALiquidityManager;
5254

55+
use core::future::Future;
5356
use core::ops::Deref;
57+
use core::pin::Pin;
5458
use core::time::Duration;
5559

5660
#[cfg(feature = "std")]
@@ -311,6 +315,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311315
true
312316
}
313317

318+
macro_rules! maybe_await {
319+
(true, $e:expr) => {
320+
$e.await
321+
};
322+
(false, $e:expr) => {
323+
$e
324+
};
325+
}
326+
314327
macro_rules! define_run_body {
315328
(
316329
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +332,7 @@ macro_rules! define_run_body {
319332
$peer_manager: ident, $gossip_sync: ident,
320333
$process_sweeper: expr,
321334
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
335+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
323336
) => { {
324337
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325338
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +388,7 @@ macro_rules! define_run_body {
375388

376389
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377390
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
391+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
379392
log_trace!($logger, "Done persisting ChannelManager.");
380393
}
381394
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +449,7 @@ macro_rules! define_run_body {
436449
log_trace!($logger, "Persisting network graph.");
437450
}
438451

439-
if let Err(e) = $persister.persist_graph(network_graph) {
452+
if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) {
440453
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441454
}
442455

@@ -464,7 +477,7 @@ macro_rules! define_run_body {
464477
} else {
465478
log_trace!($logger, "Persisting scorer");
466479
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
480+
if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) {
468481
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469482
}
470483
}
@@ -487,16 +500,16 @@ macro_rules! define_run_body {
487500
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488501
// some races where users quit while channel updates were in-flight, with
489502
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
503+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
491504

492505
// Persist Scorer on exit
493506
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
507+
maybe_await!($async, $persister.persist_scorer(&scorer))?;
495508
}
496509

497510
// Persist NetworkGraph on exit
498511
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
512+
maybe_await!($async, $persister.persist_graph(network_graph))?;
500513
}
501514

502515
Ok(())
@@ -643,22 +656,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643656
/// ```
644657
/// # use lightning::io;
645658
/// # use lightning::events::ReplayEvent;
646-
/// # use lightning::util::sweep::OutputSweeper;
647659
/// # use std::sync::{Arc, RwLock};
648660
/// # use std::sync::atomic::{AtomicBool, Ordering};
649661
/// # use std::time::SystemTime;
650-
/// # use lightning_background_processor::{process_events_async, GossipSync};
662+
/// # use lightning_background_processor::{process_events_full_async, GossipSync};
663+
/// # use core::future::Future;
664+
/// # use core::pin::Pin;
651665
/// # struct Logger {}
652666
/// # impl lightning::util::logger::Logger for Logger {
653667
/// # fn log(&self, _record: lightning::util::logger::Record) {}
654668
/// # }
655-
/// # struct Store {}
656-
/// # impl lightning::util::persist::KVStoreSync for Store {
669+
/// # struct StoreSync {}
670+
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
657671
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658672
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659673
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660674
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661675
/// # }
676+
/// # struct Store {}
677+
/// # impl lightning::util::persist::KVStore for Store {
678+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
679+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
680+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
681+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
682+
/// # }
662683
/// # struct EventHandler {}
663684
/// # impl EventHandler {
664685
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -669,22 +690,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
669690
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
670691
/// # fn disconnect_socket(&mut self) {}
671692
/// # }
672-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>, Arc<lightning::sign::KeysManager>>;
693+
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
673694
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
674695
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
675696
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
676697
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
677698
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678699
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679-
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
680-
/// #
700+
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
701+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
702+
///
681703
/// # struct Node<
682704
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683705
/// # F: lightning::chain::Filter + Send + Sync + 'static,
684706
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685707
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686708
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStoreSync + Send + Sync + 'static,
688709
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689710
/// # > {
690711
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +718,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697718
/// # persister: Arc<Store>,
698719
/// # logger: Arc<Logger>,
699720
/// # scorer: Arc<Scorer>,
700-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
721+
/// # sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
701722
/// # }
702723
/// #
703724
/// # async fn setup_background_processing<
@@ -706,10 +727,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706727
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707728
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708729
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStoreSync + Send + Sync + 'static,
710730
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711-
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
712-
/// let background_persister = Arc::clone(&node.persister);
731+
/// # >(node: Node<B, F, FE, UL, D, O>) {
732+
/// let background_persister = Arc::new(Arc::clone(&node.persister));
713733
/// let background_event_handler = Arc::clone(&node.event_handler);
714734
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715735
/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -744,7 +764,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
744764
doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
745765
)]
746766
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
747-
/// process_events_async(
767+
/// process_events_full_async(
748768
/// background_persister,
749769
/// |e| background_event_handler.handle_event(e),
750770
/// background_chain_mon,
@@ -769,7 +789,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
769789
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
770790
/// # }
771791
///```
772-
pub async fn process_events_async<
792+
pub async fn process_events_full_async<
773793
'a,
774794
UL: 'static + Deref,
775795
CF: 'static + Deref,
@@ -814,7 +834,7 @@ where
814834
F::Target: 'static + FeeEstimator,
815835
L::Target: 'static + Logger,
816836
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
837+
PS::Target: 'static + Persister<'a, CM, L, S>,
818838
ES::Target: 'static + EntropySource,
819839
CM::Target: AChannelManager,
820840
OM::Target: AOnionMessenger,
@@ -841,7 +861,7 @@ where
841861
if let Some(duration_since_epoch) = fetch_time() {
842862
if update_scorer(scorer, &event, duration_since_epoch) {
843863
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
864+
if let Err(e) = persister.persist_scorer(&*scorer).await {
845865
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846866
// We opt not to abort early on persistence failure here as persisting
847867
// the scorer is non-critical and we still hope that it will have
@@ -919,9 +939,136 @@ where
919939
},
920940
mobile_interruptable_platform,
921941
fetch_time,
942+
true,
922943
)
923944
}
924945

946+
/// A wrapper that turns a synchronous [`PersisterSync`] into an async [`Persister`].
947+
struct PersisterSyncWrapper<'a, PS, CM, L, S> {
948+
inner: PS,
949+
_phantom: core::marker::PhantomData<(&'a (), CM, L, S)>,
950+
}
951+
952+
impl<'a, PS, CM, L, S> Deref for PersisterSyncWrapper<'_, PS, CM, L, S> {
953+
type Target = Self;
954+
fn deref(&self) -> &Self {
955+
self
956+
}
957+
}
958+
959+
impl<'a, PS, CM, L, S> PersisterSyncWrapper<'a, PS, CM, L, S> {
960+
/// Constructs a new [`PersisterSyncWrapper`] from the given sync persister.
961+
pub fn new(inner: PS) -> Self {
962+
Self { inner, _phantom: core::marker::PhantomData }
963+
}
964+
}
965+
966+
impl<'a, PS: Deref, CM: Deref + 'static, L: Deref + 'static, S: Deref + 'static>
967+
Persister<'a, CM, L, S> for PersisterSyncWrapper<'a, PS, CM, L, S>
968+
where
969+
PS::Target: PersisterSync<'a, CM, L, S>,
970+
CM::Target: AChannelManager,
971+
L::Target: Logger,
972+
S::Target: WriteableScore<'a>,
973+
{
974+
fn persist_manager(
975+
&self, channel_manager: &CM,
976+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
977+
let res = self.inner.persist_manager(&channel_manager);
978+
Box::pin(async move { res })
979+
}
980+
981+
fn persist_graph(
982+
&self, network_graph: &NetworkGraph<L>,
983+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
984+
let res = self.inner.persist_graph(&network_graph);
985+
Box::pin(async move { res })
986+
}
987+
988+
fn persist_scorer(
989+
&self, scorer: &S,
990+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
991+
let res = self.inner.persist_scorer(&scorer);
992+
Box::pin(async move { res })
993+
}
994+
}
995+
996+
/// Async events processor that is based on [`process_events_async`] but allows for [`PersisterSync`] to be used for
997+
/// synchronous background persistence.
998+
pub async fn process_events_async<
999+
UL: 'static + Deref,
1000+
CF: 'static + Deref,
1001+
T: 'static + Deref,
1002+
F: 'static + Deref,
1003+
G: 'static + Deref<Target = NetworkGraph<L>>,
1004+
L: 'static + Deref + Send + Sync,
1005+
P: 'static + Deref,
1006+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1007+
EventHandler: Fn(Event) -> EventHandlerFuture,
1008+
PS: 'static + Deref + Send + Sync,
1009+
ES: 'static + Deref + Send,
1010+
M: 'static
1011+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1012+
+ Send
1013+
+ Sync,
1014+
CM: 'static + Deref + Send + Sync,
1015+
OM: 'static + Deref,
1016+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
1017+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
1018+
PM: 'static + Deref,
1019+
LM: 'static + Deref,
1020+
D: 'static + Deref,
1021+
O: 'static + Deref,
1022+
K: 'static + Deref,
1023+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
1024+
S: 'static + Deref<Target = SC> + Send + Sync,
1025+
SC: for<'b> WriteableScore<'b>,
1026+
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1027+
Sleeper: Fn(Duration) -> SleepFuture,
1028+
FetchTime: Fn() -> Option<Duration>,
1029+
>(
1030+
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1031+
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1032+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1033+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1034+
) -> Result<(), lightning::io::Error>
1035+
where
1036+
UL::Target: 'static + UtxoLookup,
1037+
CF::Target: 'static + chain::Filter,
1038+
T::Target: 'static + BroadcasterInterface,
1039+
F::Target: 'static + FeeEstimator,
1040+
L::Target: 'static + Logger,
1041+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1042+
PS::Target: 'static + PersisterSync<'static, CM, L, S>,
1043+
ES::Target: 'static + EntropySource,
1044+
CM::Target: AChannelManager,
1045+
OM::Target: AOnionMessenger,
1046+
PM::Target: APeerManager,
1047+
LM::Target: ALiquidityManager,
1048+
O::Target: 'static + OutputSpender,
1049+
D::Target: 'static + ChangeDestinationSource,
1050+
K::Target: 'static + KVStoreSync,
1051+
{
1052+
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
1053+
process_events_full_async(
1054+
persister,
1055+
event_handler,
1056+
chain_monitor,
1057+
channel_manager,
1058+
onion_messenger,
1059+
gossip_sync,
1060+
peer_manager,
1061+
liquidity_manager,
1062+
sweeper,
1063+
logger,
1064+
scorer,
1065+
sleeper,
1066+
mobile_interruptable_platform,
1067+
fetch_time,
1068+
)
1069+
.await
1070+
}
1071+
9251072
#[cfg(feature = "std")]
9261073
impl BackgroundProcessor {
9271074
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -1098,6 +1245,7 @@ impl BackgroundProcessor {
10981245
.expect("Time should be sometime after 1970"),
10991246
)
11001247
},
1248+
false,
11011249
)
11021250
});
11031251
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

0 commit comments

Comments
 (0)