Skip to content

Define blocking variants of everything in terms of non-blocking variant. #585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: gh/zdevito/46/base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 9 additions & 81 deletions monarch_hyperactor/src/alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use pyo3::prelude::*;
use pyo3::types::PyDict;
use tokio::process::Command;

use crate::actor::PyPythonTask;
use crate::channel::PyChannelAddr;
use crate::runtime::signal_safe_block_on;

Expand Down Expand Up @@ -190,37 +191,19 @@ impl PyLocalAllocator {
PyLocalAllocator {}
}

fn allocate_nonblocking<'py>(
&self,
py: Python<'py>,
spec: &PyAllocSpec,
) -> PyResult<Bound<'py, PyAny>> {
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
// pretty easily.
let spec = spec.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
PyPythonTask::new(async move {
LocalAllocator
.allocate(spec)
.await
.map(|inner| PyAlloc::new(Box::new(inner)))
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
})
}

fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
// We could use Bound here, and acquire the GIL inside of
// `signal_safe_block_on`, but it is rather awkward with the current
// APIs, and we can anyway support Arc/Mutex pretty easily.
let spec = spec.inner.clone();
signal_safe_block_on(py, async move {
LocalAllocator
.allocate(spec)
.await
.map(|inner| PyAlloc::new(Box::new(inner)))
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
})?
}
}

#[pyclass(
Expand All @@ -237,37 +220,19 @@ impl PySimAllocator {
PySimAllocator {}
}

fn allocate_nonblocking<'py>(
&self,
py: Python<'py>,
spec: &PyAllocSpec,
) -> PyResult<Bound<'py, PyAny>> {
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
// pretty easily.
let spec = spec.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
PyPythonTask::new(async move {
SimAllocator
.allocate(spec)
.await
.map(|inner| PyAlloc::new(Box::new(inner)))
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
})
}

fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
// We could use Bound here, and acquire the GIL inside of
// `signal_safe_block_on`, but it is rather awkward with the current
// APIs, and we can anyway support Arc/Mutex pretty easily.
let spec = spec.inner.clone();
signal_safe_block_on(py, async move {
SimAllocator
.allocate(spec)
.await
.map(|inner| PyAlloc::new(Box::new(inner)))
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
})?
}
}

#[pyclass(
Expand Down Expand Up @@ -296,17 +261,13 @@ impl PyProcessAllocator {
}
}

fn allocate_nonblocking<'py>(
&self,
py: Python<'py>,
spec: &PyAllocSpec,
) -> PyResult<Bound<'py, PyAny>> {
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
// pretty easily.
let instance = Arc::clone(&self.inner);
let spec = spec.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
PyPythonTask::new(async move {
instance
.lock()
.await
Expand All @@ -316,23 +277,6 @@ impl PyProcessAllocator {
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
})
}

fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
// We could use Bound here, and acquire the GIL inside of
// `signal_safe_block_on`, but it is rather awkward with the current
// APIs, and we can anyway support Arc/Mutex pretty easily.
let instance = Arc::clone(&self.inner);
let spec = spec.inner.clone();
signal_safe_block_on(py, async move {
instance
.lock()
.await
.allocate(spec)
.await
.map(|inner| PyAlloc::new(Box::new(inner)))
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
})?
}
}

/// A `[hyperactor_mesh::alloc::RemoteProcessAllocInitializer]` wrapper to enable subclassing from Python.
Expand Down Expand Up @@ -486,34 +430,18 @@ impl PyRemoteAllocator {
})
}

fn allocate_nonblocking<'py>(
&self,
py: Python<'py>,
spec: &PyAllocSpec,
) -> PyResult<Bound<'py, PyAny>> {
fn allocate_nonblocking(&self, spec: &PyAllocSpec) -> PyResult<PyPythonTask> {
let spec = spec.inner.clone();
let mut cloned = self.clone();

pyo3_async_runtimes::tokio::future_into_py(py, async move {
PyPythonTask::new(async move {
cloned
.allocate(spec)
.await
.map(|alloc| PyAlloc::new(Box::new(alloc)))
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
})
}
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
let spec = spec.inner.clone();
let mut cloned = self.clone();

signal_safe_block_on(py, async move {
cloned
.allocate(spec)
.await
.map(|alloc| PyAlloc::new(Box::new(alloc)))
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
})?
}
}

pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
Expand Down
92 changes: 14 additions & 78 deletions monarch_hyperactor/src/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use pyo3::types::PyType;
use tokio::sync::Mutex;
use tokio::sync::mpsc;

use crate::actor::PyPythonTask;
use crate::actor::PythonTask;
use crate::actor_mesh::PythonActorMesh;
use crate::alloc::PyAlloc;
use crate::mailbox::PyMailbox;
Expand Down Expand Up @@ -120,7 +122,7 @@ pub struct PyProcMesh {
unhealthy_event: Arc<Mutex<Option<Option<ProcEvent>>>>,
}

fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'py, PyAny>> {
fn allocate_proc_mesh(alloc: &PyAlloc) -> PyResult<PyPythonTask> {
let alloc = match alloc.take() {
Some(alloc) => alloc,
None => {
Expand All @@ -129,7 +131,7 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'
));
}
};
pyo3_async_runtimes::tokio::future_into_py(py, async move {
PyPythonTask::new(async move {
let world_id = alloc.world_id().clone();
let mesh = ProcMesh::allocate(alloc)
.await
Expand All @@ -138,24 +140,6 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'
})
}

fn allocate_proc_mesh_blocking<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<PyProcMesh> {
let alloc = match alloc.take() {
Some(alloc) => alloc,
None => {
return Err(PyException::new_err(
"Alloc object already been used".to_string(),
));
}
};
signal_safe_block_on(py, async move {
let world_id = alloc.world_id().clone();
let mesh = ProcMesh::allocate(alloc)
.await
.map_err(|err| PyException::new_err(err.to_string()))?;
Ok(PyProcMesh::monitored(mesh, world_id))
})?
}

impl PyProcMesh {
/// Create a new [`PyProcMesh`] with self health status monitoring.
fn monitored(mut proc_mesh: ProcMesh, world_id: WorldId) -> Self {
Expand Down Expand Up @@ -281,71 +265,34 @@ impl PyProcMesh {
_cls: &Bound<'_, PyType>,
py: Python<'py>,
alloc: &PyAlloc,
) -> PyResult<Bound<'py, PyAny>> {
allocate_proc_mesh(py, alloc)
}

#[classmethod]
fn allocate_blocking<'py>(
_cls: &Bound<'_, PyType>,
py: Python<'py>,
alloc: &PyAlloc,
) -> PyResult<PyProcMesh> {
allocate_proc_mesh_blocking(py, alloc)
) -> PyResult<PyPythonTask> {
allocate_proc_mesh(alloc)
}

fn spawn_nonblocking<'py>(
&self,
py: Python<'py>,
name: String,
actor: &Bound<'py, PyType>,
) -> PyResult<Bound<'py, PyAny>> {
) -> PyResult<PyPythonTask> {
let unhealthy_event = Arc::clone(&self.unhealthy_event);
let pickled_type = PickledPyObject::pickle(actor.as_any())?;
let proc_mesh = self.try_inner()?;
let keepalive = self.keepalive.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
PyPythonTask::new(async move {
ensure_mesh_healthy(&unhealthy_event).await?;

let mailbox = proc_mesh.client().clone();
let actor_mesh = proc_mesh.spawn(&name, &pickled_type).await?;
let actor_events = actor_mesh.with_mut(|a| a.events()).await.unwrap().unwrap();
let python_actor_mesh = PythonActorMesh::monitored(
Ok(PythonActorMesh::monitored(
actor_mesh,
PyMailbox { inner: mailbox },
keepalive,
actor_events,
);
Python::with_gil(|py| python_actor_mesh.into_py_any(py))
))
})
}

fn spawn_blocking<'py>(
&self,
py: Python<'py>,
name: String,
actor: &Bound<'py, PyType>,
) -> PyResult<PyObject> {
let unhealthy_event = Arc::clone(&self.unhealthy_event);
let pickled_type = PickledPyObject::pickle(actor.as_any())?;
let proc_mesh = self.try_inner()?;
let keepalive = self.keepalive.clone();
signal_safe_block_on(py, async move {
ensure_mesh_healthy(&unhealthy_event).await?;

let mailbox = proc_mesh.client().clone();
let actor_mesh = proc_mesh.spawn(&name, &pickled_type).await?;
let actor_events = actor_mesh.with_mut(|a| a.events()).await.unwrap().unwrap();
let python_actor_mesh = PythonActorMesh::monitored(
actor_mesh,
PyMailbox { inner: mailbox },
keepalive,
actor_events,
);
Python::with_gil(|py| python_actor_mesh.into_py_any(py))
})?
}

// User can call this to monitor the proc mesh events. This will override
// the default monitor that exits the client on process crash, so user can
// handle the process crash in their own way.
Expand Down Expand Up @@ -383,27 +330,16 @@ impl PyProcMesh {
Ok(self.try_inner()?.shape().clone().into())
}

fn stop_nonblocking<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
fn stop_nonblocking<'py>(&self) -> PyResult<PyPythonTask> {
// Clone the necessary fields from self to avoid capturing self in the async block
let inner = self.inner.clone();
let proc_events = self.proc_events.clone();

pyo3_async_runtimes::tokio::future_into_py(py, async move {
Ok(PythonTask::new(async move {
Self::stop_mesh(inner, proc_events).await?;
PyResult::Ok(())
Python::with_gil(|py| Ok(py.None()))
})
}

fn stop_blocking<'py>(&self, py: Python<'py>) -> PyResult<()> {
// Clone the necessary fields from self to avoid capturing self in the async block
let inner = self.inner.clone();
let proc_events = self.proc_events.clone();

signal_safe_block_on(py, async move {
Self::stop_mesh(inner, proc_events)
.await
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
})?
.into())
}
}

Expand Down
Loading
Loading