Skip to content

Commit bc7a6c0

Browse files
supervision: new enum Unhealthy (#569)
Summary: - introduced new type `supervision::Unhealthy` ```rust #[derive(Debug, Clone)] pub(crate) enum Unhealthy<Event> { SoFarSoGood, // Still healthy StreamClosed, // Event stream closed Crashed(Event), // Bad health event received } ``` - in `PyProcMesh` - replace: - from: `Arc<Mutex<Option<Option<ProcEvent>>>>` - to: `Arc<Mutex<Unhealthy<ProcEvent>>>` - in `PythonActorMesh` - replace: - from: `Arc<Mutex<Option<Option<ActorSupervisionEvent>>>>` - to: `Arc<Mutex<Unhealthy<ActorSupervisionEvent>>>` Differential Revision: D78498195
1 parent cef93c7 commit bc7a6c0

File tree

3 files changed

+71
-36
lines changed

3 files changed

+71
-36
lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::runtime::signal_safe_block_on;
4444
use crate::selection::PySelection;
4545
use crate::shape::PyShape;
4646
use crate::supervision::SupervisionError;
47+
use crate::supervision::Unhealthy;
4748

4849
#[pyclass(
4950
name = "PythonActorMesh",
@@ -53,7 +54,7 @@ pub struct PythonActorMesh {
5354
inner: SharedCell<RootActorMesh<'static, PythonActor>>,
5455
client: PyMailbox,
5556
_keepalive: Keepalive,
56-
unhealthy_event: Arc<std::sync::Mutex<Option<Option<ActorSupervisionEvent>>>>,
57+
unhealthy_event: Arc<std::sync::Mutex<Unhealthy<ActorSupervisionEvent>>>,
5758
user_monitor_sender: tokio::sync::broadcast::Sender<Option<ActorSupervisionEvent>>,
5859
monitor: tokio::task::JoinHandle<()>,
5960
}
@@ -69,11 +70,11 @@ impl PythonActorMesh {
6970
) -> Self {
7071
let (user_monitor_sender, _) =
7172
tokio::sync::broadcast::channel::<Option<ActorSupervisionEvent>>(1);
72-
let unhealthy_event = Arc::new(std::sync::Mutex::new(None));
73+
let unhealthy_event = Arc::new(std::sync::Mutex::new(Unhealthy::SoFarSoGood));
7374
let monitor = tokio::spawn(Self::actor_mesh_monitor(
7475
events,
7576
user_monitor_sender.clone(),
76-
unhealthy_event.clone(),
77+
Arc::clone(&unhealthy_event),
7778
));
7879
Self {
7980
inner,
@@ -90,15 +91,19 @@ impl PythonActorMesh {
9091
async fn actor_mesh_monitor(
9192
mut events: ActorSupervisionEvents,
9293
user_sender: tokio::sync::broadcast::Sender<Option<ActorSupervisionEvent>>,
93-
unhealthy_event: Arc<std::sync::Mutex<Option<Option<ActorSupervisionEvent>>>>,
94+
unhealthy_event: Arc<std::sync::Mutex<Unhealthy<ActorSupervisionEvent>>>,
9495
) {
9596
loop {
9697
let event = events.next().await;
9798
let mut inner_unhealthy_event = unhealthy_event.lock().unwrap();
98-
*inner_unhealthy_event = Some(event.clone());
99+
match &event {
100+
None => *inner_unhealthy_event = Unhealthy::StreamClosed,
101+
Some(event) => *inner_unhealthy_event = Unhealthy::Crashed(event.clone()),
102+
}
99103

100-
// Ignore the sender error when there is no receiver, which happens when there
101-
// is no active requests to this mesh.
104+
// Ignore the sender error when there is no receiver,
105+
// which happens when there is no active requests to this
106+
// mesh.
102107
let _ = user_sender.send(event.clone());
103108

104109
if event.is_none() {
@@ -130,11 +135,20 @@ impl PythonActorMesh {
130135
.unhealthy_event
131136
.lock()
132137
.expect("failed to acquire unhealthy_event lock");
133-
if let Some(ref event) = *unhealthy_event {
134-
return Err(PyRuntimeError::new_err(format!(
135-
"actor mesh is unhealthy with reason: {:?}",
136-
event
137-
)));
138+
139+
match &*unhealthy_event {
140+
Unhealthy::SoFarSoGood => (),
141+
Unhealthy::Crashed(event) => {
142+
return Err(PyRuntimeError::new_err(format!(
143+
"actor mesh is unhealthy with reason: {:?}",
144+
event
145+
)));
146+
}
147+
Unhealthy::StreamClosed => {
148+
return Err(PyRuntimeError::new_err(
149+
"actor mesh is stopped due to proc mesh shutdown".to_string(),
150+
));
151+
}
138152
}
139153

140154
self.try_inner()?
@@ -154,15 +168,16 @@ impl PythonActorMesh {
154168
.lock()
155169
.expect("failed to acquire unhealthy_event lock");
156170

157-
Ok(unhealthy_event.as_ref().map(|event| match event {
158-
None => PyActorSupervisionEvent {
171+
match &*unhealthy_event {
172+
Unhealthy::SoFarSoGood => Ok(None),
173+
Unhealthy::StreamClosed => Ok(Some(PyActorSupervisionEvent {
159174
// Dummy actor as place holder to indicate the whole mesh is stopped
160175
// TODO(albertli): remove this when pushing all supervision logic to rust.
161176
actor_id: id!(default[0].actor[0]).into(),
162177
actor_status: "actor mesh is stopped due to proc mesh shutdown".to_string(),
163-
},
164-
Some(event) => PyActorSupervisionEvent::from(event.clone()),
165-
}))
178+
})),
179+
Unhealthy::Crashed(event) => Ok(Some(PyActorSupervisionEvent::from(event.clone()))),
180+
}
166181
}
167182

168183
// Consider defining a "PythonActorRef", which carries specifically

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::mailbox::PyMailbox;
4444
use crate::runtime::signal_safe_block_on;
4545
use crate::shape::PyShape;
4646
use crate::supervision::SupervisionError;
47+
use crate::supervision::Unhealthy;
4748

4849
// A wrapper around `ProcMesh` which keeps track of all `RootActorMesh`s that it spawns.
4950
pub struct TrackedProcMesh {
@@ -117,7 +118,7 @@ pub struct PyProcMesh {
117118
proc_events: SharedCell<Mutex<ProcEvents>>,
118119
user_monitor_receiver: SharedCell<Mutex<mpsc::UnboundedReceiver<ProcEvent>>>,
119120
user_monitor_registered: Arc<AtomicBool>,
120-
unhealthy_event: Arc<Mutex<Option<Option<ProcEvent>>>>,
121+
unhealthy_event: Arc<Mutex<Unhealthy<ProcEvent>>>,
121122
}
122123

123124
fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'py, PyAny>> {
@@ -162,15 +163,15 @@ impl PyProcMesh {
162163
let proc_events = SharedCell::from(Mutex::new(proc_mesh.events().unwrap()));
163164
let (user_sender, user_receiver) = mpsc::unbounded_channel::<ProcEvent>();
164165
let user_monitor_registered = Arc::new(AtomicBool::new(false));
165-
let unhealthy_event = Arc::new(Mutex::new(None));
166+
let unhealthy_event = Arc::new(Mutex::new(Unhealthy::SoFarSoGood));
166167
let monitor = tokio::spawn(Self::default_proc_mesh_monitor(
167168
proc_events
168169
.borrow()
169170
.expect("borrowing immediately after creation"),
170171
world_id,
171172
user_sender,
172-
user_monitor_registered.clone(),
173-
unhealthy_event.clone(),
173+
Arc::clone(&user_monitor_registered),
174+
Arc::clone(&unhealthy_event),
174175
));
175176
Self {
176177
inner: SharedCell::from(TrackedProcMesh::from(proc_mesh)),
@@ -188,7 +189,7 @@ impl PyProcMesh {
188189
world_id: WorldId,
189190
user_sender: mpsc::UnboundedSender<ProcEvent>,
190191
user_monitor_registered: Arc<AtomicBool>,
191-
unhealthy_event: Arc<Mutex<Option<Option<ProcEvent>>>>,
192+
unhealthy_event: Arc<Mutex<Unhealthy<ProcEvent>>>,
192193
) {
193194
loop {
194195
let mut proc_events = events.lock().await;
@@ -197,15 +198,15 @@ impl PyProcMesh {
197198
let mut inner_unhealthy_event = unhealthy_event.lock().await;
198199
match event {
199200
None => {
200-
*inner_unhealthy_event = Some(None);
201+
*inner_unhealthy_event = Unhealthy::StreamClosed;
201202
tracing::info!("ProcMesh {}: alloc has stopped", world_id);
202203
break;
203204
}
204205
Some(event) => match event {
205206
// Graceful stops can be ignored.
206207
ProcEvent::Stopped(_, ProcStopReason::Stopped) => continue,
207208
event => {
208-
*inner_unhealthy_event = Some(Some(event.clone()));
209+
*inner_unhealthy_event = Unhealthy::Crashed(event.clone());
209210
tracing::info!("ProcMesh {}: {}", world_id, event);
210211
if user_monitor_registered.load(std::sync::atomic::Ordering::SeqCst) {
211212
if user_sender.send(event).is_err() {
@@ -218,7 +219,7 @@ impl PyProcMesh {
218219
}
219220
_ = events.preempted() => {
220221
let mut inner_unhealthy_event = unhealthy_event.lock().await;
221-
*inner_unhealthy_event = Some(None);
222+
*inner_unhealthy_event = Unhealthy::StreamClosed;
222223
tracing::info!("ProcMesh {}: is stopped", world_id);
223224
break;
224225
}
@@ -259,19 +260,18 @@ impl PyProcMesh {
259260
}
260261
}
261262

262-
// Return with error if the mesh is unhealthy.
263-
async fn ensure_mesh_healthy(
264-
unhealthy_event: &Mutex<Option<Option<ProcEvent>>>,
265-
) -> Result<(), PyErr> {
263+
async fn ensure_mesh_healthy(unhealthy_event: &Mutex<Unhealthy<ProcEvent>>) -> Result<(), PyErr> {
266264
let locked = unhealthy_event.lock().await;
267-
if let Some(event) = &*locked {
268-
let msg = match event {
269-
Some(e) => format!("proc mesh is stopped with reason: {:?}", e),
270-
None => "proc mesh is stopped with reason: alloc is stopped".to_string(),
271-
};
272-
return Err(SupervisionError::new_err(msg));
265+
match &*locked {
266+
Unhealthy::SoFarSoGood => Ok(()),
267+
Unhealthy::StreamClosed => Err(SupervisionError::new_err(
268+
"proc mesh is stopped with reason: alloc is stopped".to_string(),
269+
)),
270+
Unhealthy::Crashed(event) => Err(SupervisionError::new_err(format!(
271+
"proc mesh is stopped with reason: {:?}",
272+
event
273+
))),
273274
}
274-
Ok(())
275275
}
276276

277277
#[pymethods]

monarch_hyperactor/src/supervision.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,23 @@ pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
2323
module.add("SupervisionError", py.get_type::<SupervisionError>())?;
2424
Ok(())
2525
}
26+
27+
// Shared between mesh types.
28+
#[derive(Debug, Clone)]
29+
pub(crate) enum Unhealthy<Event> {
30+
SoFarSoGood, // Still healthy
31+
StreamClosed, // Event stream closed
32+
Crashed(Event), // Bad health event received
33+
}
34+
35+
impl<Event> Unhealthy<Event> {
36+
#[allow(dead_code)] // No uses yet.
37+
pub(crate) fn is_healthy(&self) -> bool {
38+
matches!(self, Unhealthy::SoFarSoGood)
39+
}
40+
41+
#[allow(dead_code)] // No uses yet.
42+
pub(crate) fn is_crashed(&self) -> bool {
43+
matches!(self, Unhealthy::Crashed(_))
44+
}
45+
}

0 commit comments

Comments
 (0)