diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs
index 5805def7..783e81cd 100644
--- a/hyperactor_mesh/src/alloc/remoteprocess.rs
+++ b/hyperactor_mesh/src/alloc/remoteprocess.rs
@@ -99,6 +99,13 @@ pub struct RemoteProcessAllocator {
     cancel_token: CancellationToken,
 }
 
+async fn conditional_sleeper<F: Future<Output = ()>>(t: Option<F>) {
+    match t {
+        Some(timer) => timer.await,
+        None => futures::future::pending().await,
+    }
+}
+
 impl RemoteProcessAllocator {
     /// Create a new allocator. It will not start until start() is called.
     pub fn new() -> Arc<Self> {
@@ -125,19 +132,27 @@ impl RemoteProcessAllocator {
     /// 4. Allocator sends Done message to bootstrap_addr when Alloc is done.
     ///
     /// At any point, client can send Stop message to serve_addr to stop the allocator.
-    pub async fn start(&self, cmd: Command, serve_addr: ChannelAddr) -> Result<(), anyhow::Error> {
+    pub async fn start(
+        &self,
+        cmd: Command,
+        serve_addr: ChannelAddr,
+        timeout: Option<Duration>,
+    ) -> Result<(), anyhow::Error> {
         let process_allocator = ProcessAllocator::new(cmd);
-        self.start_with_allocator(serve_addr, process_allocator)
+        self.start_with_allocator(serve_addr, process_allocator, timeout)
             .await
     }
 
     /// Start a remote process allocator with given allocator listening for
     /// RemoteProcessAllocatorMessage on serve_addr.
+    /// If timeout is Some, the allocator will exit if no client connects within
+    /// that timeout and no child allocation is running.
     /// Used for testing.
     pub async fn start_with_allocator<A: Allocator + Send + Sync + 'static>(
         &self,
         serve_addr: ChannelAddr,
         mut process_allocator: A,
+        timeout: Option<Duration>,
     ) -> Result<(), anyhow::Error>
    where
        <A as Allocator>::Alloc: Send,
@@ -166,6 +181,9 @@ impl RemoteProcessAllocator {
         let mut active_allocation: Option<ActiveAllocation> = None;
         loop {
+            // Re-create the sleep future on each loop iteration so the idle
+            // timer is reset whenever a message is received.
+            let sleep = conditional_sleeper(timeout.map(|t| RealClock.sleep(t)));
             tokio::select! {
                 msg = rx.recv() => {
                     match msg {
@@ -218,6 +236,16 @@
                     break;
                 }
+                _ = sleep => {
+                    // If there is an active allocation, reset the timeout.
+                    if active_allocation.is_some() {
+                        continue;
+                    }
+                    // Else, exit the loop as no client has connected in a
+                    // reasonable amount of time.
+                    tracing::warn!("timeout elapsed without any allocations, exiting");
+                    break;
+                }
             }
         }
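For reference, a minimal self-contained sketch of the optional-timeout `select!` pattern this hunk introduces (only tokio is assumed; the channel and message type are illustrative, not the allocator's real message type):

```rust
use std::time::Duration;

// Mirrors `conditional_sleeper` above: an optional timer that either sleeps or
// never completes, so `select!` can poll it uniformly in both configurations.
async fn conditional_sleeper<F: std::future::Future<Output = ()>>(t: Option<F>) {
    match t {
        Some(timer) => timer.await,
        None => std::future::pending().await,
    }
}

#[tokio::main]
async fn main() {
    let timeout = Some(Duration::from_millis(100)); // None = wait forever
    let (_tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
    loop {
        // Recreated every iteration, so each received message resets the timer.
        let sleep = conditional_sleeper(timeout.map(tokio::time::sleep));
        tokio::select! {
            msg = rx.recv() => match msg {
                Some(()) => continue, // handle the message; the timer resets
                None => break,        // channel closed
            },
            _ = sleep => {
                println!("idle timeout elapsed, exiting");
                break;
            }
        }
    }
}
```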
+ tracing::warn!("timeout elapsed without any allocations, exiting"); + break; + } } } @@ -1143,7 +1171,7 @@ mod test { let remote_allocator = remote_allocator.clone(); async move { remote_allocator - .start_with_allocator(serve_addr, allocator) + .start_with_allocator(serve_addr, allocator, None) .await } }); @@ -1280,7 +1308,7 @@ mod test { let remote_allocator = remote_allocator.clone(); async move { remote_allocator - .start_with_allocator(serve_addr, allocator) + .start_with_allocator(serve_addr, allocator, None) .await } }); @@ -1376,7 +1404,7 @@ mod test { let remote_allocator = remote_allocator.clone(); async move { remote_allocator - .start_with_allocator(serve_addr, allocator) + .start_with_allocator(serve_addr, allocator, None) .await } }); @@ -1483,7 +1511,7 @@ mod test { let remote_allocator = remote_allocator.clone(); async move { remote_allocator - .start_with_allocator(serve_addr, allocator) + .start_with_allocator(serve_addr, allocator, None) .await } }); @@ -1566,7 +1594,7 @@ mod test { let remote_allocator = remote_allocator.clone(); async move { remote_allocator - .start_with_allocator(serve_addr, allocator) + .start_with_allocator(serve_addr, allocator, None) .await } }); @@ -1640,14 +1668,14 @@ mod test_alloc { let task1_allocator_handle = tokio::spawn(async move { tracing::info!("spawning task1"); task1_allocator_copy - .start(task1_cmd, task1_addr) + .start(task1_cmd, task1_addr, None) .await .unwrap(); }); let task2_allocator_copy = task2_allocator.clone(); let task2_allocator_handle = tokio::spawn(async move { task2_allocator_copy - .start(task2_cmd, task2_addr) + .start(task2_cmd, task2_addr, None) .await .unwrap(); }); @@ -1763,7 +1791,7 @@ mod test_alloc { let task1_allocator_handle = tokio::spawn(async move { tracing::info!("spawning task1"); task1_allocator_copy - .start(task1_cmd, task1_addr) + .start(task1_cmd, task1_addr, None) .await .unwrap(); tracing::info!("task1 terminated"); @@ -1771,7 +1799,7 @@ mod test_alloc { let task2_allocator_copy = task2_allocator.clone(); let task2_allocator_handle = tokio::spawn(async move { task2_allocator_copy - .start(task2_cmd, task2_addr) + .start(task2_cmd, task2_addr, None) .await .unwrap(); tracing::info!("task2 terminated"); @@ -1884,14 +1912,14 @@ mod test_alloc { let task1_allocator_handle = tokio::spawn(async move { tracing::info!("spawning task1"); task1_allocator_copy - .start(task1_cmd, task1_addr) + .start(task1_cmd, task1_addr, None) .await .unwrap(); }); let task2_allocator_copy = task2_allocator.clone(); let task2_allocator_handle = tokio::spawn(async move { task2_allocator_copy - .start(task2_cmd, task2_addr) + .start(task2_cmd, task2_addr, None) .await .unwrap(); }); diff --git a/monarch_hyperactor/src/bin/process_allocator/common.rs b/monarch_hyperactor/src/bin/process_allocator/common.rs index f4839e0a..2476406b 100644 --- a/monarch_hyperactor/src/bin/process_allocator/common.rs +++ b/monarch_hyperactor/src/bin/process_allocator/common.rs @@ -13,6 +13,7 @@ use clap::command; use hyperactor::channel::ChannelAddr; use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocator; use tokio::process::Command; +use tokio::time::Duration; #[derive(Parser, Debug)] #[command(about = "Runs hyperactor's process allocator")] @@ -38,18 +39,26 @@ pub struct Args { help = "The path to the binary that this process allocator spawns on an `allocate` request" )] pub program: String, + + #[arg( + long, + default_value_t = 0, + help = "If non-zero, a timeout for the allocator to wait before exiting. 
0 means infinite wait" + )] + pub timeout: u64, } pub fn main_impl( serve_address: ChannelAddr, - program: String, + program: Command, + timeout: Option, ) -> tokio::task::JoinHandle> { tracing::info!("bind address is: {}", serve_address); - tracing::info!("program to spawn on allocation request: [{}]", &program); + tracing::info!("program to spawn on allocation request: [{:?}]", &program); - tokio::spawn(async { + tokio::spawn(async move { RemoteProcessAllocator::new() - .start(Command::new(program), serve_address) + .start(program, serve_address, timeout) .await }) } @@ -61,6 +70,8 @@ mod tests { use clap::Parser; use hyperactor::WorldId; use hyperactor::channel::ChannelTransport; + use hyperactor::clock::Clock; + use hyperactor::clock::RealClock; use hyperactor_mesh::alloc; use hyperactor_mesh::alloc::Alloc; use hyperactor_mesh::alloc::remoteprocess; @@ -100,8 +111,8 @@ mod tests { hyperactor::initialize_with_current_runtime(); let serve_address = ChannelAddr::any(ChannelTransport::Unix); - let program = String::from("/bin/date"); // date is usually a unix built-in command - let server_handle = main_impl(serve_address.clone(), program); + let program = Command::new("/bin/date"); // date is usually a unix built-in command + let server_handle = main_impl(serve_address.clone(), program, None); let spec = alloc::AllocSpec { // NOTE: x cannot be more than 1 since we created a single process-allocator server instance! @@ -158,4 +169,228 @@ mod tests { server_handle.abort(); Ok(()) } + + /// Tests that an allocator with a timeout and no messages will exit and not + /// finish allocating. + #[tokio::test] + async fn test_timeout() -> Result<(), anyhow::Error> { + hyperactor::initialize_with_current_runtime(); + + let serve_address = ChannelAddr::any(ChannelTransport::Unix); + let program = Command::new("/bin/date"); // date is usually a unix built-in command + // 1 second quick timeout to check that it fails. + let timeout = Duration::from_millis(500); + let server_handle = main_impl(serve_address.clone(), program, Some(timeout)); + + let spec = alloc::AllocSpec { + // NOTE: x cannot be more than 1 since we created a single process-allocator server instance! + shape: shape! { x=1, y=4 }, + constraints: Default::default(), + }; + + let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new(); + initializer.expect_initialize_alloc().return_once(move || { + Ok(vec![remoteprocess::RemoteProcessAllocHost { + hostname: serve_address.to_string(), + id: serve_address.to_string(), + }]) + }); + + let heartbeat = std::time::Duration::from_millis(100); + let world_id = WorldId("__unused__".to_string()); + + // Wait at least as long as the timeout before sending any messages. + RealClock.sleep(timeout * 2).await; + + // Attempt to allocate, it should fail because a timeout happens before + let mut alloc = remoteprocess::RemoteProcessAlloc::new( + spec.clone(), + world_id.clone(), + ChannelTransport::Unix, + 0, + heartbeat, + initializer, + ) + .await + .unwrap(); + let res = alloc.next().await.unwrap(); + // Should fail because the allocator timed out. + if let alloc::ProcState::Failed { + world_id: msg_world_id, + description, + } = res + { + assert_eq!(msg_world_id, world_id); + assert!(description.contains("no process has ever been allocated")); + } else { + panic!("Unexpected ProcState: {:?}", res); + } + + server_handle.abort(); + Ok(()) + } + + /// Tests that an allocator with a timeout and some messages will still exit + /// after the allocation finishes. 
+    #[tokio::test]
+    async fn test_timeout_after_message() -> Result<(), anyhow::Error> {
+        hyperactor::initialize_with_current_runtime();
+
+        let serve_address = ChannelAddr::any(ChannelTransport::Unix);
+        let program = Command::new("/bin/date"); // date is usually a unix built-in command
+        // Slower timeout so we can send a message in time.
+        let timeout = Duration::from_millis(1500);
+        let server_handle = main_impl(serve_address.clone(), program, Some(timeout));
+
+        let spec = alloc::AllocSpec {
+            // NOTE: x cannot be more than 1 since we created a single process-allocator server instance!
+            shape: shape! { x=1, y=4 },
+            constraints: Default::default(),
+        };
+
+        let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new();
+        let alloc_host = remoteprocess::RemoteProcessAllocHost {
+            hostname: serve_address.to_string(),
+            id: serve_address.to_string(),
+        };
+        let alloc_host_clone = alloc_host.clone();
+        initializer
+            .expect_initialize_alloc()
+            .return_once(move || Ok(vec![alloc_host_clone]));
+
+        let heartbeat = std::time::Duration::from_millis(100);
+        let world_id = WorldId("__unused__".to_string());
+
+        // Attempt to allocate; it should succeed because we connect before the timeout.
+        let mut alloc = remoteprocess::RemoteProcessAlloc::new(
+            spec.clone(),
+            world_id.clone(),
+            ChannelTransport::Unix,
+            0,
+            heartbeat,
+            initializer,
+        )
+        .await
+        .unwrap();
+        // Ensure the process starts.
+        alloc.next().await.unwrap();
+        // Now stop the alloc and wait for a timeout to ensure the allocator exited.
+        alloc.stop_and_wait().await.unwrap();
+
+        // Wait at least as long as the timeout before sending any messages.
+        RealClock.sleep(timeout * 2).await;
+
+        // Allocate again to observe the error.
+        let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new();
+        initializer
+            .expect_initialize_alloc()
+            .return_once(move || Ok(vec![alloc_host]));
+        let mut alloc = remoteprocess::RemoteProcessAlloc::new(
+            spec.clone(),
+            world_id.clone(),
+            ChannelTransport::Unix,
+            0,
+            heartbeat,
+            initializer,
+        )
+        .await
+        .unwrap();
+        let res = alloc.next().await.unwrap();
+        // Should fail because the allocator timed out.
+        if let alloc::ProcState::Failed {
+            world_id: msg_world_id,
+            description,
+        } = res
+        {
+            assert_eq!(msg_world_id, world_id);
+            assert!(description.contains("no process has ever been allocated"));
+        } else {
+            panic!("Unexpected ProcState: {:?}", res);
+        }
+
+        server_handle.abort();
+        Ok(())
+    }
+
+    /// Tests that an allocator with a timeout, that has a process running and
+    /// receives no messages, will keep running as long as the processes do.
+    #[tokio::test]
+    async fn test_timeout_not_during_execution() -> Result<(), anyhow::Error> {
+        hyperactor::initialize_with_current_runtime();
+
+        let serve_address = ChannelAddr::any(ChannelTransport::Unix);
+        let mut program = Command::new("/usr/bin/sleep"); // use a command that waits for a while
+        program.arg("3");
+        let timeout = Duration::from_millis(500);
+        let server_handle = main_impl(serve_address.clone(), program, Some(timeout));
+
+        let spec = alloc::AllocSpec {
+            // NOTE: x cannot be more than 1 since we created a single process-allocator server instance!
+            shape: shape! { x=1, y=4 },
+            constraints: Default::default(),
+        };
+
+        let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new();
+        let alloc_host = remoteprocess::RemoteProcessAllocHost {
+            hostname: serve_address.to_string(),
+            id: serve_address.to_string(),
+        };
+        initializer
+            .expect_initialize_alloc()
+            .return_once(move || Ok(vec![alloc_host]));
+
+        let heartbeat = std::time::Duration::from_millis(100);
+        let world_id = WorldId("__unused__".to_string());
+
+        // Attempt to allocate; it should succeed because the timeout hasn't elapsed yet.
+        let mut alloc = remoteprocess::RemoteProcessAlloc::new(
+            spec.clone(),
+            world_id.clone(),
+            ChannelTransport::Unix,
+            0,
+            heartbeat,
+            initializer,
+        )
+        .await
+        .unwrap();
+        // Ensure the processes start. Since the command is "sleep", they should
+        // keep running once started.
+        // Make sure we account for `world_size` Created (and later Stopped) proc states.
+        let world_size = spec.shape.slice().iter().count();
+        let mut created_ranks: HashSet<usize> = HashSet::new();
+
+        while created_ranks.len() < world_size {
+            let proc_state = alloc.next().await.unwrap();
+            match proc_state {
+                alloc::ProcState::Created { proc_id, .. } => {
+                    created_ranks.insert(proc_id.rank());
+                }
+                _ => {
+                    panic!("Unexpected message: {:?}", proc_state)
+                }
+            }
+        }
+        // Now that all procs have started, wait at least as long as the timeout
+        // before sending any messages. This way we ensure the remote allocator
+        // stays alive as long as the child processes stay alive.
+        RealClock.sleep(timeout * 2).await;
+        // Now wait for more events and ensure they are ProcState::Stopped
+        let mut stopped_ranks: HashSet<usize> = HashSet::new();
+        while stopped_ranks.len() < world_size {
+            let proc_state = alloc.next().await.unwrap();
+            match proc_state {
+                alloc::ProcState::Created { .. } => {
+                    // ignore
+                }
+                alloc::ProcState::Stopped { proc_id, .. } => {
+                    stopped_ranks.insert(proc_id.rank() % world_size);
+                }
+                _ => {
+                    panic!("Unexpected message: {:?}", proc_state)
+                }
+            }
+        }
+        server_handle.abort();
+        Ok(())
+    }
 }
diff --git a/monarch_hyperactor/src/bin/process_allocator/main.rs b/monarch_hyperactor/src/bin/process_allocator/main.rs
index af4a8509..368d4790 100644
--- a/monarch_hyperactor/src/bin/process_allocator/main.rs
+++ b/monarch_hyperactor/src/bin/process_allocator/main.rs
@@ -14,6 +14,8 @@ use clap::Parser;
 use common::Args;
 use common::main_impl;
 use hyperactor::channel::ChannelAddr;
+use tokio::process::Command;
+use tokio::time::Duration;
 
 #[tokio::main]
 async fn main() {
@@ -25,6 +27,12 @@ async fn main() {
         .unwrap_or_else(|| format!("tcp![::]:{}", args.port));
     let serve_address = ChannelAddr::from_str(&bind).unwrap();
 
+    let program = Command::new(args.program);
+    let timeout = if args.timeout > 0 {
+        Some(Duration::from_secs(args.timeout))
+    } else {
+        None
+    };
 
-    let _ = main_impl(serve_address, args.program).await.unwrap();
+    let _ = main_impl(serve_address, program, timeout).await.unwrap();
 }
diff --git a/python/tests/test_allocator.py b/python/tests/test_allocator.py
index f363f1a0..8b80b02a 100644
--- a/python/tests/test_allocator.py
+++ b/python/tests/test_allocator.py
@@ -6,6 +6,7 @@
 
 # pyre-strict
 
+import asyncio
 import contextlib
 import importlib.resources
 import logging
@@ -86,15 +87,23 @@ async def log(self, message: str) -> None:
 
 
 @contextlib.contextmanager
-def remote_process_allocator(addr: Optional[str] = None) -> Generator[str, None, None]:
+def remote_process_allocator(
+    addr: Optional[str] = None, timeout: Optional[int] = None
+) -> Generator[str, None, None]:
+    """Start a remote process allocator on addr. If timeout is not None, have it
+    time out after that many seconds if no messages come in."""
+
     with importlib.resources.path(__package__, "") as package_path:
         addr = addr or ChannelAddr.any(ChannelTransport.Unix)
+        args = [
+            "process_allocator",
+            f"--addr={addr}",
+        ]
+        if timeout is not None:
+            args.append(f"--timeout={timeout}")
         process_allocator = subprocess.Popen(
-            args=[
-                "process_allocator",
-                f"--addr={addr}",
-            ],
+            args=args,
             env={
                 # prefix PATH with this test module's directory to
                 # give 'process_allocator' and 'monarch_bootstrap' binary resources
@@ -325,6 +334,23 @@ async def test_stop_proc_mesh_context_manager_multiple_times(self) -> None:
         # now we doing casting without accessing the wrapped type.
         del actor
 
+    async def test_remote_allocator_with_no_connection(self) -> None:
+        spec = AllocSpec(AllocConstraints(), host=1, gpu=4)
+
+        with remote_process_allocator(timeout=1) as host1:
+            # Wait 3 seconds without allocating any processes; it should exit.
+            await asyncio.sleep(3)
+            allocator = RemoteAllocator(
+                world_id="test_remote_allocator",
+                initializer=StaticRemoteAllocInitializer(host1),
+                heartbeat_interval=_100_MILLISECONDS,
+            )
+            with self.assertRaisesRegex(
+                Exception, "no process has ever been allocated on"
+            ):
+                alloc = await allocator.allocate(spec)
+                await ProcMesh.from_alloc(alloc)
+
     async def test_stacked_1d_meshes(self) -> None:
         # create two stacked actor meshes on the same host
         # each actor mesh running on separate process-allocators
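Finally, a minimal sketch of exercising the new flag on the success path, using the helpers from the Python test module above (remote_process_allocator, RemoteAllocator, StaticRemoteAllocInitializer, _100_MILLISECONDS); the 5-second timeout and world id are illustrative:

```python
async def allocate_before_idle_timeout() -> None:
    # Start an allocator that exits after 5 idle seconds, then connect well
    # before that deadline so allocation succeeds.
    with remote_process_allocator(timeout=5) as host:
        allocator = RemoteAllocator(
            world_id="example_world",
            initializer=StaticRemoteAllocInitializer(host),
            heartbeat_interval=_100_MILLISECONDS,
        )
        alloc = await allocator.allocate(AllocSpec(AllocConstraints(), host=1, gpu=4))
        await ProcMesh.from_alloc(alloc)
```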