diff --git a/hyperactor/src/actor.rs b/hyperactor/src/actor.rs index 7e8f4b24..c07122d8 100644 --- a/hyperactor/src/actor.rs +++ b/hyperactor/src/actor.rs @@ -529,6 +529,7 @@ impl ActorHandle { /// Signal the actor to drain its current messages and then stop. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`. pub fn drain_and_stop(&self) -> Result<(), ActorError> { + tracing::info!("ActorHandle::drain_and_stop called: {}", self.actor_id()); self.cell.signal(Signal::DrainAndStop) } diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index c8f72db5..5e298c33 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -523,6 +523,7 @@ impl Proc { match entry.value().upgrade() { None => (), // the root's cell has been dropped Some(cell) => { + tracing::info!("sending stop signal to {}", cell.actor_id()); if let Err(err) = cell.signal(Signal::DrainAndStop) { tracing::error!( "{}: failed to send stop signal to pid {}: {:?}", @@ -855,6 +856,7 @@ impl Instance { /// Signal the actor to stop. #[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`. pub fn stop(&self) -> Result<(), ActorError> { + tracing::info!("Instance::stop called, {}", self.cell.actor_id()); self.cell.signal(Signal::DrainAndStop) } @@ -1053,6 +1055,7 @@ impl Instance { } signal = self.signal_receiver.recv() => { let signal = signal.map_err(ActorError::from); + tracing::debug!("Received signal {signal:?}"); match signal? 
{ signal@(Signal::Stop | Signal::DrainAndStop) => { need_drain = matches!(signal, Signal::DrainAndStop); diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index 798fb77e..dc6c4b42 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -266,14 +266,17 @@ impl Child { }; let group = self.group.clone(); let stop_reason = self.stop_reason.clone(); + tracing::info!("spawning watchdog"); tokio::spawn(async move { let exit_timeout = hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT); #[allow(clippy::disallowed_methods)] if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() { + tracing::info!("watchdog timeout, killing process"); let _ = stop_reason.set(ProcStopReason::Watchdog); group.fail(); } + tracing::info!("Watchdog task exit"); }); } diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 783e81cd..41043d03 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -171,10 +171,13 @@ impl RemoteProcessAllocator { if let Some(active_allocation) = active_allocation.take() { tracing::info!("previous alloc found, stopping"); active_allocation.cancel_token.cancel(); - // should be ok to wait even if original caller has gone since heartbeat - // will eventually timeout and exit the loop. 
- if let Err(e) = active_allocation.handle.await { - tracing::error!("allocation handler failed: {}", e); + match active_allocation.handle.await { + Ok(_) => { + tracing::info!("allocation stopped.") + } + Err(e) => { + tracing::error!("allocation handler failed: {}", e); + } } } } @@ -197,6 +200,7 @@ impl RemoteProcessAllocator { ensure_previous_alloc_stopped(&mut active_allocation).await; + tracing::info!("allocating..."); match process_allocator.allocate(spec.clone()).await { Ok(alloc) => { let cancel_token = CancellationToken::new(); @@ -259,6 +263,7 @@ impl RemoteProcessAllocator { heartbeat_interval: Duration, cancel_token: CancellationToken, ) { + tracing::info!("handle allocation request, bootstrap_addr: {bootstrap_addr}"); // start proc message forwarder let (forwarder_addr, forwarder_rx) = match channel::serve(ChannelAddr::any(bootstrap_addr.transport())).await { @@ -326,6 +331,7 @@ impl RemoteProcessAllocator { heartbeat_interval: Duration, cancel_token: CancellationToken, ) { + tracing::info!("starting handle allocation loop"); let tx = match channel::dial(bootstrap_addr) { Ok(tx) => tx, Err(err) => { @@ -417,7 +423,7 @@ impl RemoteProcessAllocator { } } } - tracing::debug!("allocation handler loop exited"); + tracing::info!("allocation handler loop exited"); if running { tracing::info!("stopping processes"); if let Err(e) = alloc.stop_and_wait().await { diff --git a/monarch_hyperactor/src/actor_mesh.rs b/monarch_hyperactor/src/actor_mesh.rs index c769f969..6dcb76b6 100644 --- a/monarch_hyperactor/src/actor_mesh.rs +++ b/monarch_hyperactor/src/actor_mesh.rs @@ -332,6 +332,10 @@ impl PythonActorMeshRef { impl Drop for PythonActorMesh { fn drop(&mut self) { + match self.inner.borrow() { // a failed borrow must not panic inside Drop + Ok(mesh) => tracing::info!("Dropping PythonActorMesh: {}", mesh.name()), + Err(_) => tracing::info!("Dropping PythonActorMesh: <inner mesh unavailable>"), + } self.monitor.abort(); } } @@ -390,6 +394,7 @@ async fn get_next( }, Some(event) => PyActorSupervisionEvent::from(event.clone()), }; + tracing::info!("recv supervision event: {supervision_event:?}"); 
Python::with_gil(|py| supervision_event.into_py_any(py)) } @@ -476,6 +481,7 @@ impl MonitoredPythonOncePortReceiver { name = "ActorSupervisionEvent", module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh" )] +#[derive(Debug)] pub struct PyActorSupervisionEvent { /// Actor ID of the actor where supervision event originates from. #[pyo3(get)]