Skip to content

[monarch] add some more logs #574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ impl<A: Actor> ActorHandle<A> {
/// Signal the actor to drain its current messages and then stop
/// (delivers `Signal::DrainAndStop` to the actor's cell).
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
pub fn drain_and_stop(&self) -> Result<(), ActorError> {
    // Log before signaling so the stop request is visible even if `signal` fails.
    let actor_id = self.actor_id();
    tracing::info!("ActorHandle::drain_and_stop called: {}", actor_id);
    self.cell.signal(Signal::DrainAndStop)
}

Expand Down
3 changes: 3 additions & 0 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ impl Proc {
match entry.value().upgrade() {
None => (), // the root's cell has been dropped
Some(cell) => {
tracing::info!("sending stop signal to {}", cell.actor_id());
if let Err(err) = cell.signal(Signal::DrainAndStop) {
tracing::error!(
"{}: failed to send stop signal to pid {}: {:?}",
Expand Down Expand Up @@ -855,6 +856,7 @@ impl<A: Actor> Instance<A> {
/// Signal the actor to stop. This is a drain-then-stop: the actor is sent
/// `Signal::DrainAndStop`, so it finishes its queued messages before exiting.
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `ActorError`.
pub fn stop(&self) -> Result<(), ActorError> {
    let actor_id = self.cell.actor_id();
    tracing::info!("Instance::stop called, {}", actor_id);
    self.cell.signal(Signal::DrainAndStop)
}

Expand Down Expand Up @@ -1053,6 +1055,7 @@ impl<A: Actor> Instance<A> {
}
signal = self.signal_receiver.recv() => {
let signal = signal.map_err(ActorError::from);
tracing::debug!("Received signal {signal:?}");
match signal? {
signal@(Signal::Stop | Signal::DrainAndStop) => {
need_drain = matches!(signal, Signal::DrainAndStop);
Expand Down
3 changes: 3 additions & 0 deletions hyperactor_mesh/src/alloc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,17 @@ impl Child {
};
let group = self.group.clone();
let stop_reason = self.stop_reason.clone();
tracing::info!("spawning watchdog");
tokio::spawn(async move {
let exit_timeout =
hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
#[allow(clippy::disallowed_methods)]
if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
tracing::info!("watchdog timeout, killing process");
let _ = stop_reason.set(ProcStopReason::Watchdog);
group.fail();
}
tracing::info!("Watchdog task exit");
});
}

Expand Down
16 changes: 11 additions & 5 deletions hyperactor_mesh/src/alloc/remoteprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,13 @@ impl RemoteProcessAllocator {
if let Some(active_allocation) = active_allocation.take() {
tracing::info!("previous alloc found, stopping");
active_allocation.cancel_token.cancel();
// should be ok to wait even if original caller has gone since heartbeat
// will eventually timeout and exit the loop.
if let Err(e) = active_allocation.handle.await {
tracing::error!("allocation handler failed: {}", e);
match active_allocation.handle.await {
Ok(_) => {
tracing::info!("allocation stopped.")
}
Err(e) => {
tracing::error!("allocation handler failed: {}", e);
}
}
}
}
Expand All @@ -197,6 +200,7 @@ impl RemoteProcessAllocator {

ensure_previous_alloc_stopped(&mut active_allocation).await;

tracing::info!("allocating...");
match process_allocator.allocate(spec.clone()).await {
Ok(alloc) => {
let cancel_token = CancellationToken::new();
Expand Down Expand Up @@ -259,6 +263,7 @@ impl RemoteProcessAllocator {
heartbeat_interval: Duration,
cancel_token: CancellationToken,
) {
tracing::info!("handle allocation request, bootstrap_addr: {bootstrap_addr}");
// start proc message forwarder
let (forwarder_addr, forwarder_rx) =
match channel::serve(ChannelAddr::any(bootstrap_addr.transport())).await {
Expand Down Expand Up @@ -326,6 +331,7 @@ impl RemoteProcessAllocator {
heartbeat_interval: Duration,
cancel_token: CancellationToken,
) {
tracing::info!("starting handle allocation loop");
let tx = match channel::dial(bootstrap_addr) {
Ok(tx) => tx,
Err(err) => {
Expand Down Expand Up @@ -417,7 +423,7 @@ impl RemoteProcessAllocator {
}
}
}
tracing::debug!("allocation handler loop exited");
tracing::info!("allocation handler loop exited");
if running {
tracing::info!("stopping processes");
if let Err(e) = alloc.stop_and_wait().await {
Expand Down
6 changes: 6 additions & 0 deletions monarch_hyperactor/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ impl PythonActorMeshRef {

impl Drop for PythonActorMesh {
    /// Log the mesh being dropped and abort its supervision monitor task.
    fn drop(&mut self) {
        // Do NOT unwrap `borrow()` here: a panic inside `drop` that occurs
        // while the thread is already unwinding aborts the whole process.
        // `borrow()` can fail if the inner mesh was already taken (e.g. by an
        // explicit stop), so log best-effort instead of panicking.
        match self.inner.borrow() {
            Ok(inner) => {
                tracing::info!("Dropping PythonActorMesh: {}", inner.name());
            }
            Err(_) => {
                // NOTE(review): inner already taken/invalidated; still worth
                // recording that the wrapper itself is being dropped.
                tracing::info!("Dropping PythonActorMesh (inner mesh already taken)");
            }
        }
        self.monitor.abort();
    }
}
Expand Down Expand Up @@ -390,6 +394,7 @@ async fn get_next(
},
Some(event) => PyActorSupervisionEvent::from(event.clone()),
};
tracing::info!("recv supervision event: {supervision_event:?}");

Python::with_gil(|py| supervision_event.into_py_any(py))
}
Expand Down Expand Up @@ -476,6 +481,7 @@ impl MonitoredPythonOncePortReceiver {
name = "ActorSupervisionEvent",
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
)]
#[derive(Debug)]
pub struct PyActorSupervisionEvent {
/// Actor ID of the actor where supervision event originates from.
#[pyo3(get)]
Expand Down
Loading