Skip to content

Commit 1fa2092

Browse files
: actor: port receiver supervision (#578)
Summary: Pull Request resolved: #578 Differential Revision: D78528860
1 parent bc7a6c0 commit 1fa2092

File tree

5 files changed

+75
-118
lines changed

5 files changed

+75
-118
lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,32 @@ impl PythonActorMesh {
190190
.map(PyActorId::from))
191191
}
192192

193-
// Start monitoring the actor mesh by subscribing to its supervision events. For each supervision
194-
// event, it is consumed by PythonActorMesh first, then gets sent to the monitor for user to consume.
195-
fn monitor<'py>(&self, py: Python<'py>) -> PyResult<PyObject> {
196-
let receiver = self.user_monitor_sender.subscribe();
197-
let monitor_instance = PyActorMeshMonitor {
198-
receiver: SharedCell::from(Mutex::new(receiver)),
193+
fn supervise_port<'py>(
194+
&self,
195+
py: Python<'py>,
196+
receiver: &PythonPortReceiver,
197+
) -> PyResult<PyObject> {
198+
let rx = MonitoredPythonPortReceiver {
199+
inner: receiver.inner(),
200+
monitor: PyActorMeshMonitor {
201+
receiver: SharedCell::from(Mutex::new(self.user_monitor_sender.subscribe())),
202+
},
199203
};
200-
Ok(monitor_instance.into_py(py))
204+
Ok(rx.into_py(py))
205+
}
206+
207+
fn supervise_once_port<'py>(
208+
&self,
209+
py: Python<'py>,
210+
receiver: &PythonOncePortReceiver,
211+
) -> PyResult<PyObject> {
212+
let rx = MonitoredPythonOncePortReceiver {
213+
inner: receiver.inner(),
214+
monitor: PyActorMeshMonitor {
215+
receiver: SharedCell::from(Mutex::new(self.user_monitor_sender.subscribe())),
216+
},
217+
};
218+
Ok(rx.into_py(py))
201219
}
202220

203221
#[pyo3(signature = (**kwargs))]
@@ -349,27 +367,22 @@ impl Drop for PythonActorMesh {
349367
}
350368
}
351369

370+
// `PyActorMeshMonitor` is not accessed directly from Python. It is
371+
// marked with `#[pyclass]` so it can be used as a field inside
372+
// `MonitoredPythonPortReceiver`.
352373
#[pyclass(
353374
name = "ActorMeshMonitor",
354375
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
355376
)]
356-
pub struct PyActorMeshMonitor {
377+
struct PyActorMeshMonitor {
357378
receiver: SharedCell<Mutex<tokio::sync::broadcast::Receiver<Option<ActorSupervisionEvent>>>>,
358379
}
359380

360-
#[pymethods]
361381
impl PyActorMeshMonitor {
362-
fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
363-
slf
364-
}
365-
366-
pub fn __anext__(&self, py: Python<'_>) -> PyResult<PyObject> {
367-
let receiver = self.receiver.clone();
368-
Ok(pyo3_async_runtimes::tokio::future_into_py(py, get_next(receiver))?.into())
382+
fn __repr__(&self) -> &'static str {
383+
"<ActorMeshMonitor>"
369384
}
370-
}
371385

372-
impl PyActorMeshMonitor {
373386
pub async fn next(&self) -> PyResult<PyObject> {
374387
get_next(self.receiver.clone()).await
375388
}
@@ -407,25 +420,21 @@ async fn get_next(
407420
Ok(Python::with_gil(|py| supervision_event.into_py(py)))
408421
}
409422

410-
// TODO(albertli): this is temporary remove this when pushing all supervision logic to rust.
423+
// Values of this (private) type can only be created by calling
424+
// `PythonActorMesh::supervise_port()`.
411425
#[pyclass(
412426
name = "MonitoredPortReceiver",
413427
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
414428
)]
415-
pub(super) struct MonitoredPythonPortReceiver {
429+
struct MonitoredPythonPortReceiver {
416430
inner: Arc<tokio::sync::Mutex<PortReceiver<PythonMessage>>>,
417431
monitor: PyActorMeshMonitor,
418432
}
419433

420434
#[pymethods]
421435
impl MonitoredPythonPortReceiver {
422-
#[new]
423-
fn new(receiver: &PythonPortReceiver, monitor: &PyActorMeshMonitor) -> Self {
424-
let inner = receiver.inner();
425-
MonitoredPythonPortReceiver {
426-
inner,
427-
monitor: monitor.clone(),
428-
}
436+
fn __repr__(&self) -> &'static str {
437+
"<MonitoredPortReceiver>"
429438
}
430439

431440
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
@@ -461,24 +470,21 @@ impl MonitoredPythonPortReceiver {
461470
}
462471
}
463472

473+
// Values of this (private) type can only be created by calling
474+
// `PythonActorMesh::supervise_once_port()`.
464475
#[pyclass(
465476
name = "MonitoredOncePortReceiver",
466477
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
467478
)]
468-
pub(super) struct MonitoredPythonOncePortReceiver {
479+
struct MonitoredPythonOncePortReceiver {
469480
inner: Arc<std::sync::Mutex<Option<OncePortReceiver<PythonMessage>>>>,
470481
monitor: PyActorMeshMonitor,
471482
}
472483

473484
#[pymethods]
474485
impl MonitoredPythonOncePortReceiver {
475-
#[new]
476-
fn new(receiver: &PythonOncePortReceiver, monitor: &PyActorMeshMonitor) -> Self {
477-
let inner = receiver.inner();
478-
MonitoredPythonOncePortReceiver {
479-
inner,
480-
monitor: monitor.clone(),
481-
}
486+
fn __repr__(&self) -> &'static str {
487+
"<MonitoredOncePortReceiver>"
482488
}
483489

484490
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {

python/monarch/_rust_bindings/monarch_hyperactor/actor_mesh.pyi

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,15 @@ class PythonActorMesh:
8888
"""
8989
...
9090

91-
# TODO(albertli): remove this when pushing all supervision logic to Rust
92-
def monitor(self) -> ActorMeshMonitor:
91+
def supervise_port(self, r: PortReceiver) -> MonitoredPortReceiver:
9392
"""
94-
Returns a supervision monitor for this mesh.
93+
Return a monitored port receiver.
94+
"""
95+
...
96+
97+
def supervise_once_port(self, r: OncePortReceiver) -> MonitoredOncePortReceiver:
98+
"""
99+
Return a monitored once port receiver.
95100
"""
96101
...
97102

@@ -113,31 +118,11 @@ class PythonActorMesh:
113118
"""
114119
...
115120

116-
@final
117-
class ActorMeshMonitor:
118-
def __aiter__(self) -> AsyncIterator["ActorSupervisionEvent"]:
119-
"""
120-
Returns an async iterator for this monitor.
121-
"""
122-
...
123-
124-
async def __anext__(self) -> "ActorSupervisionEvent":
125-
"""
126-
Returns the next proc event in the proc mesh.
127-
"""
128-
...
129-
130121
@final
131122
class MonitoredPortReceiver:
123+
"""A monitored receiver to which PythonMessages are sent. Values
124+
of this type cannot be constructed directly in Python.
132125
"""
133-
A monitored receiver to which PythonMessages are sent.
134-
"""
135-
136-
def __init__(self, receiver: PortReceiver, monitor: ActorMeshMonitor) -> None:
137-
"""
138-
Create a new monitored receiver from a PortReceiver.
139-
"""
140-
...
141126

142127
async def recv(self) -> PythonMessage:
143128
"""Receive a PythonMessage from the port's sender."""
@@ -148,15 +133,9 @@ class MonitoredPortReceiver:
148133

149134
@final
150135
class MonitoredOncePortReceiver:
136+
"""A monitored once receiver to which PythonMessages are sent.
137+
Values of this type cannot be constructed directly in Python.
151138
"""
152-
A variant of monitored PortReceiver that can only receive a single message.
153-
"""
154-
155-
def __init__(self, receiver: OncePortReceiver, monitor: ActorMeshMonitor) -> None:
156-
"""
157-
Create a new monitored receiver from a PortReceiver.
158-
"""
159-
...
160139

161140
async def recv(self) -> PythonMessage:
162141
"""Receive a single PythonMessage from the port's sender."""

python/monarch/_src/actor/actor_mesh.py

Lines changed: 20 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,7 @@
5050
PythonMessage,
5151
PythonMessageKind,
5252
)
53-
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import (
54-
ActorMeshMonitor,
55-
MonitoredOncePortReceiver,
56-
MonitoredPortReceiver,
57-
PythonActorMesh,
58-
)
53+
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
5954
from monarch._rust_bindings.monarch_hyperactor.mailbox import (
6055
Mailbox,
6156
OncePortReceiver,
@@ -306,6 +301,9 @@ def _send(
306301
def _port(self, once: bool = False) -> "PortTuple[R]":
307302
pass
308303

304+
def _supervise(self, r: HyPortReceiver | OncePortReceiver) -> Any:
305+
return r
306+
309307
# the following are all 'adverbs' or different ways to handle the
310308
# return values of this endpoint. Adverbs should only ever take *args, **kwargs
311309
# of the original call. If we want to add syntax sugar for something that needs additional
@@ -399,6 +397,14 @@ def __init__(
399397
self._signature: inspect.Signature = inspect.signature(impl)
400398
self._mailbox = mailbox
401399

400+
def _supervise(self, r: HyPortReceiver | OncePortReceiver) -> Any:
401+
mesh = self._actor_mesh._actor_mesh
402+
return (
403+
mesh.supervise_once_port(r)
404+
if isinstance(r, OncePortReceiver)
405+
else mesh.supervise_port(r)
406+
)
407+
402408
def _send(
403409
self,
404410
args: Tuple[Any, ...],
@@ -430,12 +436,8 @@ def _send(
430436
return Extent(shape.labels, shape.ndslice.sizes)
431437

432438
def _port(self, once: bool = False) -> "PortTuple[R]":
433-
monitor = (
434-
None
435-
if self._actor_mesh._actor_mesh is None
436-
else self._actor_mesh._actor_mesh.monitor()
437-
)
438-
return PortTuple.create(self._mailbox, monitor, once)
439+
p, r = PortTuple.create(self._mailbox, once)
440+
return PortTuple(p, PortReceiver(self._mailbox, self._supervise(r._receiver)))
439441

440442

441443
class Accumulator(Generic[P, R, A]):
@@ -589,21 +591,11 @@ class PortTuple(NamedTuple, Generic[R]):
589591
receiver: "PortReceiver[R]"
590592

591593
@staticmethod
592-
def create(
593-
mailbox: Mailbox, monitor: Optional[ActorMeshMonitor], once: bool = False
594-
) -> "PortTuple[Any]":
594+
def create(mailbox: Mailbox, once: bool = False) -> "PortTuple[Any]":
595595
handle, receiver = mailbox.open_once_port() if once else mailbox.open_port()
596596
port_ref = handle.bind()
597-
if monitor is not None:
598-
receiver = (
599-
MonitoredOncePortReceiver(receiver, monitor)
600-
if isinstance(receiver, OncePortReceiver)
601-
else MonitoredPortReceiver(receiver, monitor)
602-
)
603-
604597
return PortTuple(
605-
Port(port_ref, mailbox, rank=None),
606-
PortReceiver(mailbox, receiver),
598+
Port(port_ref, mailbox, rank=None), PortReceiver(mailbox, receiver)
607599
)
608600
else:
609601

@@ -612,21 +604,11 @@ class PortTuple(NamedTuple):
612604
receiver: "PortReceiver[Any]"
613605

614606
@staticmethod
615-
def create(
616-
mailbox: Mailbox, monitor: Optional[ActorMeshMonitor], once: bool = False
617-
) -> "PortTuple[Any]":
607+
def create(mailbox: Mailbox, once: bool = False) -> "PortTuple[Any]":
618608
handle, receiver = mailbox.open_once_port() if once else mailbox.open_port()
619609
port_ref = handle.bind()
620-
if monitor is not None:
621-
receiver = (
622-
MonitoredOncePortReceiver(receiver, monitor)
623-
if isinstance(receiver, OncePortReceiver)
624-
else MonitoredPortReceiver(receiver, monitor)
625-
)
626-
627610
return PortTuple(
628-
Port(port_ref, mailbox, rank=None),
629-
PortReceiver(mailbox, receiver),
611+
Port(port_ref, mailbox, rank=None), PortReceiver(mailbox, receiver)
630612
)
631613

632614

@@ -648,18 +630,10 @@ class PortReceiver(Generic[R]):
648630
def __init__(
649631
self,
650632
mailbox: Mailbox,
651-
receiver: MonitoredPortReceiver
652-
| MonitoredOncePortReceiver
653-
| HyPortReceiver
654-
| OncePortReceiver,
633+
receiver: HyPortReceiver | OncePortReceiver,
655634
) -> None:
656635
self._mailbox: Mailbox = mailbox
657-
self._receiver: (
658-
MonitoredPortReceiver
659-
| MonitoredOncePortReceiver
660-
| HyPortReceiver
661-
| OncePortReceiver
662-
) = receiver
636+
self._receiver: HyPortReceiver | OncePortReceiver = receiver
663637

664638
async def _recv(self) -> R:
665639
return self._process(await self._receiver.recv())

python/monarch/common/remote.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def _port(self, once: bool = False) -> "PortTuple[R]":
144144
"Cannot create raw port objects with an old-style tensor engine controller."
145145
)
146146
mailbox: Mailbox = mesh_controller._mailbox
147-
return PortTuple.create(mailbox, None, once)
147+
return PortTuple.create(mailbox, once)
148148

149149
@property
150150
def _resolvable(self):

python/monarch/mesh_controller.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,7 @@ def fetch(
149149
defs: Tuple["Tensor", ...],
150150
uses: Tuple["Tensor", ...],
151151
) -> "OldFuture": # the OldFuture is a lie
152-
sender, receiver = PortTuple.create(
153-
self._mesh_controller._mailbox, None, once=True
154-
)
152+
sender, receiver = PortTuple.create(self._mesh_controller._mailbox, once=True)
155153

156154
ident = self.new_node(defs, uses, cast("OldFuture", sender))
157155
process = mesh._process(shard)

0 commit comments

Comments
 (0)