From d524452eee8b25d256b339bbca79b6c1b4f21f45 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Wed, 20 Aug 2025 11:04:13 -0700 Subject: [PATCH 1/4] Use tokio::time::Duration and tokio::time::Instant for timekeeping Summary: Previously we had to use u64 for serialization reasons but those reasons no longer exist Differential Revision: D80556690 --- hyperactor/src/channel/sim.rs | 20 +++++----- hyperactor/src/clock.rs | 45 +++++++++++---------- hyperactor/src/simnet.rs | 73 +++++++++++++++++++---------------- 3 files changed, 72 insertions(+), 66 deletions(-) diff --git a/hyperactor/src/channel/sim.rs b/hyperactor/src/channel/sim.rs index 41a41487e..724a7b67d 100644 --- a/hyperactor/src/channel/sim.rs +++ b/hyperactor/src/channel/sim.rs @@ -22,7 +22,6 @@ use super::*; use crate::channel; use crate::clock::Clock; use crate::clock::RealClock; -use crate::clock::SimClock; use crate::data::Serialized; use crate::mailbox::MessageEnvelope; use crate::simnet; @@ -129,7 +128,7 @@ pub(crate) struct MessageDeliveryEvent { src_addr: Option, dest_addr: ChannelAddr, data: Serialized, - duration_ms: u64, + duration: tokio::time::Duration, } impl MessageDeliveryEvent { @@ -139,7 +138,7 @@ impl MessageDeliveryEvent { src_addr, dest_addr, data, - duration_ms: 100, + duration: tokio::time::Duration::from_millis(100), } } } @@ -158,8 +157,8 @@ impl Event for MessageDeliveryEvent { Ok(()) } - fn duration_ms(&self) -> u64 { - self.duration_ms + fn duration(&self) -> tokio::time::Duration { + self.duration } fn summary(&self) -> String { @@ -178,12 +177,12 @@ impl Event for MessageDeliveryEvent { src: src_addr.clone(), dst: self.dest_addr.clone(), }; - self.duration_ms = topology + self.duration = topology .lock() .await .topology .get(&edge) - .map_or_else(|| 1, |v| v.latency.as_millis() as u64); + .map_or_else(|| tokio::time::Duration::from_millis(1), |v| v.latency); } } } @@ -332,7 +331,7 @@ impl Tx for SimTx { self.dst_addr.clone(), data, )), - time: 
SimClock.millis_since_start(RealClock.now()), + time: RealClock.now(), }), _ => handle.send_event(Box::new(MessageDeliveryEvent::new( self.src_addr.clone(), @@ -551,7 +550,10 @@ mod tests { .await .unwrap(); - assert_eq!(SimClock.millis_since_start(RealClock.now()), 0); + assert_eq!( + SimClock.duration_since_start(RealClock.now()), + tokio::time::Duration::ZERO + ); // Fast forward real time to 5 seconds tokio::time::advance(tokio::time::Duration::from_secs(5)).await; { diff --git a/hyperactor/src/clock.rs b/hyperactor/src/clock.rs index 84c572ecc..70fdb25f4 100644 --- a/hyperactor/src/clock.rs +++ b/hyperactor/src/clock.rs @@ -197,11 +197,7 @@ impl Clock for SimClock { simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx.bind(), mailbox, duration)) .unwrap(); rx.recv().await.unwrap(); } @@ -212,11 +208,7 @@ impl Clock for SimClock { simnet_handle() .unwrap() - .send_nonadvanceable_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_nonadvanceable_event(SleepEvent::new(tx.bind(), mailbox, duration)) .unwrap(); rx.recv().await.unwrap(); } @@ -247,11 +239,7 @@ impl Clock for SimClock { simnet_handle() .unwrap() - .send_event(SleepEvent::new( - tx.bind(), - mailbox, - duration.as_millis() as u64, - )) + .send_event(SleepEvent::new(tx.bind(), mailbox, duration)) .unwrap(); let fut = f; @@ -290,14 +278,19 @@ impl SimClock { } /// Advance the sumulator's time to the specified instant - pub fn advance_to(&self, millis: u64) { + pub fn advance_to(&self, time: tokio::time::Instant) { let mut guard = SIM_TIME.now.lock().unwrap(); - *guard = SIM_TIME.start + tokio::time::Duration::from_millis(millis); + *guard = time; } /// Get the number of milliseconds elapsed since the start of the simulation - pub fn millis_since_start(&self, instant: tokio::time::Instant) -> u64 { - instant.duration_since(SIM_TIME.start).as_millis() as u64 + pub fn 
duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration { + instant.duration_since(SIM_TIME.start) + } + + /// Instant marking the start of the simulation + pub fn start(&self) -> tokio::time::Instant { + SIM_TIME.start.clone() } } @@ -347,10 +340,16 @@ mod tests { #[tokio::test] async fn test_sim_clock_simple() { let start = SimClock.now(); - assert_eq!(SimClock.millis_since_start(start), 0); - SimClock.advance_to(10000); + assert_eq!( + SimClock.duration_since_start(start), + tokio::time::Duration::ZERO + ); + SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000)); let end = SimClock.now(); - assert_eq!(SimClock.millis_since_start(end), 10000); + assert_eq!( + SimClock.duration_since_start(end), + tokio::time::Duration::from_millis(10000) + ); assert_eq!( end.duration_since(start), tokio::time::Duration::from_secs(10) @@ -360,7 +359,7 @@ mod tests { #[tokio::test] async fn test_sim_clock_system_time() { let start = SimClock.system_time_now(); - SimClock.advance_to(10000); + SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000)); let end = SimClock.system_time_now(); assert_eq!( end.duration_since(start).unwrap(), diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index def1fc9f5..1ee69729a 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -70,7 +70,7 @@ const OPERATIONAL_MESSAGE_BUFFER_SIZE: usize = 8; pub trait Address: Hash + Debug + Eq + PartialEq + Ord + PartialOrd + Clone {} impl Address for A {} -type SimulatorTimeInstant = u64; +type SimulatorTimeInstant = tokio::time::Instant; /// The unit of execution for the simulator. /// Using handle(), simnet can schedule executions in the network. @@ -99,7 +99,7 @@ pub trait Event: Send + Sync + Debug { /// The latency of the event. This could be network latency, induced latency (sleep), or /// GPU work latency. 
- fn duration_ms(&self) -> u64; + fn duration(&self) -> tokio::time::Duration; /// Read the simnet config and update self accordingly. async fn read_simnet_config(&mut self, _topology: &Arc>) {} @@ -126,8 +126,8 @@ impl Event for NodeJoinEvent { self.handle().await } - fn duration_ms(&self) -> u64 { - 0 + fn duration(&self) -> tokio::time::Duration { + tokio::time::Duration::ZERO } fn summary(&self) -> String { @@ -139,15 +139,19 @@ impl Event for NodeJoinEvent { pub(crate) struct SleepEvent { done_tx: OncePortRef<()>, mailbox: Mailbox, - duration_ms: u64, + duration: tokio::time::Duration, } impl SleepEvent { - pub(crate) fn new(done_tx: OncePortRef<()>, mailbox: Mailbox, duration_ms: u64) -> Box { + pub(crate) fn new( + done_tx: OncePortRef<()>, + mailbox: Mailbox, + duration: tokio::time::Duration, + ) -> Box { Box::new(Self { done_tx, mailbox, - duration_ms, + duration, }) } } @@ -166,12 +170,12 @@ impl Event for SleepEvent { Ok(()) } - fn duration_ms(&self) -> u64 { - self.duration_ms + fn duration(&self) -> tokio::time::Duration { + self.duration } fn summary(&self) -> String { - format!("Sleeping for {} ms", self.duration_ms) + format!("Sleeping for {} ms", self.duration.as_millis()) } } @@ -200,8 +204,8 @@ impl Event for TorchOpEvent { Ok(()) } - fn duration_ms(&self) -> u64 { - 100 + fn duration(&self) -> tokio::time::Duration { + tokio::time::Duration::from_millis(100) } fn summary(&self) -> String { @@ -561,22 +565,20 @@ impl SimNet { // Get latency event.read_simnet_config(&self.config).await; ScheduledEvent { - time: SimClock.millis_since_start( - SimClock.now() + tokio::time::Duration::from_millis(event.duration_ms()), - ), + time: SimClock.now() + event.duration(), event, } } /// Schedule the event into the network. 
fn schedule_event(&mut self, scheduled_event: ScheduledEvent, advanceable: bool) { - let start_at = SimClock.millis_since_start(SimClock.now()); + let start_at = SimClock.now(); let end_at = scheduled_event.time; self.records.push(SimulatorEventRecord { summary: scheduled_event.event.summary(), - start_at, - end_at, + start_at: SimClock.duration_since_start(start_at).as_millis() as u64, + end_at: SimClock.duration_since_start(end_at).as_millis() as u64, }); if advanceable { @@ -604,7 +606,7 @@ impl SimNet { ) -> Vec { // The simulated number of milliseconds the training script // has spent waiting for the backend to resolve a future - let mut training_script_waiting_time: u64 = 0; + let mut training_script_waiting_time = tokio::time::Duration::from_millis(0); // Duration elapsed while only non_advanceable_events has events let mut debounce_timer: Option = None; 'outer: loop { @@ -638,9 +640,7 @@ impl SimNet { .scheduled_events .first_key_value() .is_some_and(|(time, _)| { - *time - > SimClock.millis_since_start(RealClock.now()) - + training_script_waiting_time + *time > RealClock.now() + training_script_waiting_time }) { tokio::task::yield_now().await; @@ -705,8 +705,7 @@ impl SimNet { continue; }; if training_script_state_rx.borrow().is_waiting() { - let advanced_time = - scheduled_time - SimClock.millis_since_start(SimClock.now()); + let advanced_time = scheduled_time - SimClock.now(); training_script_waiting_time += advanced_time; } SimClock.advance_to(scheduled_time); @@ -749,9 +748,9 @@ pub struct SimulatorEventRecord { /// Event dependent summary for user pub summary: String, /// The time at which the message delivery was started. - pub start_at: SimulatorTimeInstant, + pub start_at: u64, /// The time at which the message was delivered to the receiver. - pub end_at: SimulatorTimeInstant, + pub end_at: u64, } /// A configuration for the network topology. 
@@ -805,7 +804,7 @@ mod tests { src_addr: SimAddr, dest_addr: SimAddr, data: Serialized, - duration_ms: u64, + duration: tokio::time::Duration, dispatcher: Option, } @@ -823,8 +822,8 @@ mod tests { } Ok(()) } - fn duration_ms(&self) -> u64 { - self.duration_ms + fn duration(&self) -> tokio::time::Duration { + self.duration } fn summary(&self) -> String { @@ -840,12 +839,12 @@ mod tests { src: self.src_addr.addr().clone(), dst: self.dest_addr.addr().clone(), }; - self.duration_ms = config + self.duration = config .lock() .await .topology .get(&edge) - .map_or_else(|| 1, |v| v.latency.as_millis() as u64); + .map_or_else(|| tokio::time::Duration::from_millis(1), |v| v.latency); } } @@ -860,7 +859,7 @@ mod tests { src_addr, dest_addr, data, - duration_ms: 1, + duration: tokio::time::Duration::from_millis(1), dispatcher, } } @@ -1132,12 +1131,18 @@ mod tests { start(); let start = SimClock.now(); - assert_eq!(SimClock.millis_since_start(start), 0); + assert_eq!( + SimClock.duration_since_start(start), + tokio::time::Duration::ZERO + ); SimClock.sleep(tokio::time::Duration::from_secs(10)).await; let end = SimClock.now(); - assert_eq!(SimClock.millis_since_start(end), 10000); + assert_eq!( + SimClock.duration_since_start(end), + tokio::time::Duration::from_secs(10) + ); } #[tokio::test] From 082ee4a927a53376f08573b941ed49d973468a6b Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Wed, 20 Aug 2025 11:04:13 -0700 Subject: [PATCH 2/4] Use tokio::oneshot for sim clock Summary: There was an open TODO to remove the global mailbox for SimClock. 
We don't actually even need mailboxes for sim clock and a oneshot works just fine Differential Revision: D80029571 --- hyperactor/src/channel/sim.rs | 2 +- hyperactor/src/clock.rs | 98 +++++++++++++++++++---------------- hyperactor/src/simnet.rs | 64 +++++------------------ 3 files changed, 66 insertions(+), 98 deletions(-) diff --git a/hyperactor/src/channel/sim.rs b/hyperactor/src/channel/sim.rs index 724a7b67d..1ec2393f3 100644 --- a/hyperactor/src/channel/sim.rs +++ b/hyperactor/src/channel/sim.rs @@ -145,7 +145,7 @@ impl MessageDeliveryEvent { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { // Send the message to the correct receiver. SENDER .send( diff --git a/hyperactor/src/clock.rs b/hyperactor/src/clock.rs index 70fdb25f4..b05152e63 100644 --- a/hyperactor/src/clock.rs +++ b/hyperactor/src/clock.rs @@ -12,25 +12,17 @@ use std::error::Error; use std::fmt; use std::sync::LazyLock; use std::sync::Mutex; -use std::sync::OnceLock; use std::time::SystemTime; +use async_trait::async_trait; use futures::pin_mut; use hyperactor_telemetry::TelemetryClock; use serde::Deserialize; use serde::Serialize; -use crate::Mailbox; use crate::channel::ChannelAddr; -use crate::data::Named; -use crate::id; -use crate::mailbox::DeliveryError; -use crate::mailbox::MailboxSender; -use crate::mailbox::MessageEnvelope; -use crate::mailbox::Undeliverable; -use crate::mailbox::UndeliverableMailboxSender; -use crate::mailbox::monitored_return_handle; -use crate::simnet::SleepEvent; +use crate::simnet::Event; +use crate::simnet::SimNetError; use crate::simnet::simnet_handle; struct SimTime { @@ -183,6 +175,45 @@ impl ClockKind { } } +#[derive(Debug)] +struct SleepEvent { + done_tx: Option>, + duration: tokio::time::Duration, +} + +impl SleepEvent { + pub(crate) fn new( + done_tx: tokio::sync::oneshot::Sender<()>, + duration: tokio::time::Duration, + ) -> Box { + 
Box::new(Self { + done_tx: Some(done_tx), + duration, + }) + } +} + +#[async_trait] +impl Event for SleepEvent { + async fn handle(&mut self) -> Result<(), SimNetError> { + self.done_tx + .take() + .unwrap() + .send(()) + .map_err(|_| SimNetError::PanickedTask)?; + + Ok(()) + } + + fn duration(&self) -> tokio::time::Duration { + self.duration + } + + fn summary(&self) -> String { + format!("Sleeping for {} ms", self.duration.as_millis()) + } +} + /// Clock to be used in simulator runs that allows the simnet to create a scheduled event for. /// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's /// time to the wakeup time and use the transmitter to wake up this green thread @@ -192,25 +223,25 @@ pub struct SimClock; impl Clock for SimClock { /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet async fn sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new(tx.bind(), mailbox, duration)) + .send_event(SleepEvent::new(tx, duration)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn non_advancing_sleep(&self, duration: tokio::time::Duration) { - let mailbox = SimClock::mailbox().clone(); - let (tx, rx) = mailbox.open_once_port::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_nonadvanceable_event(SleepEvent::new(tx.bind(), mailbox, duration)) + .send_nonadvanceable_event(SleepEvent::new(tx, duration)) .unwrap(); - rx.recv().await.unwrap(); + + rx.await.unwrap(); } async fn sleep_until(&self, deadline: tokio::time::Instant) { @@ -234,19 +265,18 @@ impl Clock for SimClock { where F: std::future::Future, { - let mailbox = SimClock::mailbox().clone(); - let (tx, deadline_rx) = mailbox.open_once_port::<()>(); + let 
(tx, deadline_rx) = tokio::sync::oneshot::channel::<()>(); simnet_handle() .unwrap() - .send_event(SleepEvent::new(tx.bind(), mailbox, duration)) + .send_event(SleepEvent::new(tx, duration)) .unwrap(); let fut = f; pin_mut!(fut); tokio::select! { - _ = deadline_rx.recv() => { + _ = deadline_rx => { Err(TimeoutError) } res = &mut fut => Ok(res) @@ -255,28 +285,6 @@ impl Clock for SimClock { } impl SimClock { - // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox - // from upstack and handle undeliverable messages properly. - fn mailbox() -> &'static Mailbox { - static SIMCLOCK_MAILBOX: OnceLock = OnceLock::new(); - SIMCLOCK_MAILBOX.get_or_init(|| { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); - let (undeliverable_messages, mut rx) = - mailbox.open_port::>(); - undeliverable_messages.bind_to(Undeliverable::::port()); - tokio::spawn(async move { - while let Ok(Undeliverable(mut envelope)) = rx.recv().await { - envelope.try_set_error(DeliveryError::BrokenLink( - "message returned to undeliverable port".to_string(), - )); - UndeliverableMailboxSender - .post(envelope, /*unused */ monitored_return_handle()) - } - }); - mailbox - }) - } - /// Advance the sumulator's time to the specified instant pub fn advance_to(&self, time: tokio::time::Instant) { let mut guard = SIM_TIME.now.lock().unwrap(); diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index 1ee69729a..19db9691a 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -88,12 +88,12 @@ pub trait Event: Send + Sync + Debug { /// For a proc spawn, it will be creating the proc object and instantiating it. /// For any event that manipulates the network (like adding/removing nodes etc.) /// implement handle_network(). 
- async fn handle(&self) -> Result<(), SimNetError>; + async fn handle(&mut self) -> Result<(), SimNetError>; /// This is the method that will be called when the simulator fires the event /// Unless you need to make changes to the network, you do not have to implement this. /// Only implement handle() method for all non-simnet requirements. - async fn handle_network(&self, _phantom: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _phantom: &SimNet) -> Result<(), SimNetError> { self.handle().await } @@ -117,11 +117,11 @@ struct NodeJoinEvent { #[async_trait] impl Event for NodeJoinEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, simnet: &SimNet) -> Result<(), SimNetError> { simnet.bind(self.channel_addr.clone()).await; self.handle().await } @@ -135,50 +135,6 @@ impl Event for NodeJoinEvent { } } -#[derive(Debug)] -pub(crate) struct SleepEvent { - done_tx: OncePortRef<()>, - mailbox: Mailbox, - duration: tokio::time::Duration, -} - -impl SleepEvent { - pub(crate) fn new( - done_tx: OncePortRef<()>, - mailbox: Mailbox, - duration: tokio::time::Duration, - ) -> Box { - Box::new(Self { - done_tx, - mailbox, - duration, - }) - } -} - -#[async_trait] -impl Event for SleepEvent { - async fn handle(&self) -> Result<(), SimNetError> { - Ok(()) - } - - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { - self.done_tx - .clone() - .send(&self.mailbox, ()) - .map_err(|_err| SimNetError::Closed("TODO".to_string()))?; - Ok(()) - } - - fn duration(&self) -> tokio::time::Duration { - self.duration - } - - fn summary(&self) -> String { - format!("Sleeping for {} ms", self.duration.as_millis()) - } -} - #[derive(Debug)] /// A pytorch operation pub struct TorchOpEvent { @@ -192,11 +148,11 @@ pub struct TorchOpEvent { #[async_trait] impl 
Event for TorchOpEvent { - async fn handle(&self) -> Result<(), SimNetError> { + async fn handle(&mut self) -> Result<(), SimNetError> { Ok(()) } - async fn handle_network(&self, _simnet: &SimNet) -> Result<(), SimNetError> { + async fn handle_network(&mut self, _simnet: &SimNet) -> Result<(), SimNetError> { self.done_tx .clone() .send(&self.mailbox, ()) @@ -308,6 +264,10 @@ pub enum SimNetError { /// SimnetHandle being accessed without starting simnet #[error("simnet not started")] NotStarted, + + /// A task has panicked. + #[error("panicked task")] + PanickedTask, } struct State { @@ -709,7 +669,7 @@ impl SimNet { training_script_waiting_time += advanced_time; } SimClock.advance_to(scheduled_time); - for scheduled_event in scheduled_events { + for mut scheduled_event in scheduled_events { self.pending_event_count .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); if scheduled_event.event.handle_network(self).await.is_err() { @@ -810,7 +770,7 @@ mod tests { #[async_trait] impl Event for MessageDeliveryEvent { - async fn handle(&self) -> Result<(), simnet::SimNetError> { + async fn handle(&mut self) -> Result<(), simnet::SimNetError> { if let Some(dispatcher) = &self.dispatcher { dispatcher .send( From 514fed6302a8ea3109671c3b8ffd8e53251ba4f6 Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Wed, 20 Aug 2025 11:04:13 -0700 Subject: [PATCH 3/4] Configurable debounce (#854) Summary: Pull Request resolved: https://github.com/meta-pytorch/monarch/pull/854 When we increase the number of actors in our simulation it takes longer for all the events at a certain time to complete so we need to wait for longer. If we wait too long then the simulation just runs slower than it needs to so it's nice to make this configurable. In the long term we will come up with a more robust solution to this but in the meantime that is not a priority. 
See EX528476 to understand the underlying problem the debounce is remedying Differential Revision: D80137965 Reviewed By: pablorfb-meta --- hyperactor/src/simnet.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index 19db9691a..2e03eab1e 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -569,6 +569,12 @@ impl SimNet { let mut training_script_waiting_time = tokio::time::Duration::from_millis(0); // Duration elapsed while only non_advanceable_events has events let mut debounce_timer: Option = None; + + let debounce_duration = std::env::var("SIM_DEBOUNCE") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(1); + 'outer: loop { // Check if we should stop if stop_signal.load(Ordering::SeqCst) { @@ -576,7 +582,10 @@ impl SimNet { } while let Ok(Some((event, advanceable, time))) = RealClock - .timeout(tokio::time::Duration::from_millis(1), event_rx.recv()) + .timeout( + tokio::time::Duration::from_millis(debounce_duration), + event_rx.recv(), + ) .await { let scheduled_event = match time { From d54780207c10bbde948ffc733eb2a3f00d82124b Mon Sep 17 00:00:00 2001 From: Thomas Wang Date: Wed, 20 Aug 2025 11:10:13 -0700 Subject: [PATCH 4/4] Compute resource aware simulation (#857) Summary: Pull Request resolved: https://github.com/meta-pytorch/monarch/pull/857 The sim allocator will now register the location (region, dc, zone, rack, host, gpu) of every ProcId upon creation with the simnet. 
Reviewed By: pablorfb-meta Differential Revision: D80137963 --- hyperactor/src/simnet.rs | 9 ++++++++ hyperactor_mesh/src/alloc/sim.rs | 32 ++++++++++++++++++++------ python/monarch/_src/actor/proc_mesh.py | 22 ++++++++++++++++-- 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/hyperactor/src/simnet.rs b/hyperactor/src/simnet.rs index 2e03eab1e..41d5b5c2b 100644 --- a/hyperactor/src/simnet.rs +++ b/hyperactor/src/simnet.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use dashmap::DashMap; use dashmap::DashSet; use enum_as_inner::EnumAsInner; +use ndslice::view::Point; use serde::Deserialize; use serde::Deserializer; use serde::Serialize; @@ -43,6 +44,7 @@ use tokio::time::interval; use crate::ActorId; use crate::Mailbox; use crate::OncePortRef; +use crate::ProcId; use crate::channel::ChannelAddr; use crate::clock::Clock; use crate::clock::RealClock; @@ -299,6 +301,7 @@ pub struct SimNetHandle { training_script_state_tx: tokio::sync::watch::Sender, /// Signal to stop the simnet loop stop_signal: Arc, + resources: DashMap, } impl SimNetHandle { @@ -409,6 +412,11 @@ impl SimNetHandle { "timeout waiting for received events to be scheduled".to_string(), )) } + + /// Register the location in resource space for a Proc + pub fn register_proc(&self, proc_id: ProcId, point: Point) { + self.resources.insert(proc_id, point); + } } pub(crate) type Topology = DashMap; @@ -482,6 +490,7 @@ pub fn start() { pending_event_count, training_script_state_tx, stop_signal, + resources: DashMap::new(), }); } diff --git a/hyperactor_mesh/src/alloc/sim.rs b/hyperactor_mesh/src/alloc/sim.rs index 03bf946bf..fbd7a54cd 100644 --- a/hyperactor_mesh/src/alloc/sim.rs +++ b/hyperactor_mesh/src/alloc/sim.rs @@ -11,6 +11,7 @@ #![allow(dead_code)] // until it is used outside of testing use async_trait::async_trait; +use hyperactor::ProcId; use hyperactor::WorldId; use hyperactor::channel::ChannelAddr; use hyperactor::channel::ChannelTransport; @@ -61,12 +62,23 @@ pub struct SimAlloc { 
impl SimAlloc { fn new(spec: AllocSpec) -> Self { - Self { - inner: LocalAlloc::new_with_transport( - spec, - ChannelTransport::Sim(Box::new(ChannelTransport::Unix)), - ), - } + let inner = LocalAlloc::new_with_transport( + spec, + ChannelTransport::Sim(Box::new(ChannelTransport::Unix)), + ); + let client_proc_id = ProcId::Ranked(WorldId(format!("{}_manager", inner.name())), 0); + + let ext = inner.extent(); + + hyperactor::simnet::simnet_handle() + .expect("simnet event loop not running") + .register_proc( + client_proc_id.clone(), + ext.point(ext.sizes().iter().map(|_| 0).collect()) + .expect("should be valid point"), + ); + + Self { inner } } /// A chaos monkey that can be used to stop procs at random. pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static { @@ -90,7 +102,13 @@ impl SimAlloc { #[async_trait] impl Alloc for SimAlloc { async fn next(&mut self) -> Option { - self.inner.next().await + let proc_state = self.inner.next().await; + if let Some(ProcState::Created { proc_id, point, .. 
}) = &proc_state { + hyperactor::simnet::simnet_handle() + .expect("simnet event loop not running") + .register_proc(proc_id.clone(), point.clone()); + } + proc_state } fn extent(&self) -> &Extent { diff --git a/python/monarch/_src/actor/proc_mesh.py b/python/monarch/_src/actor/proc_mesh.py index 6aee0a973..7b4efdd8b 100644 --- a/python/monarch/_src/actor/proc_mesh.py +++ b/python/monarch/_src/actor/proc_mesh.py @@ -548,8 +548,26 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh: return _proc_mesh_from_allocator(allocator=LocalAllocator(), gpus=gpus, hosts=hosts) -def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh: - return _proc_mesh_from_allocator(allocator=SimAllocator(), gpus=gpus, hosts=hosts) +def sim_proc_mesh( + *, + gpus: int = 1, + hosts: int = 1, + racks: int = 1, + zones: int = 1, + dcs: int = 1, + regions: int = 1, +) -> ProcMesh: + spec: AllocSpec = AllocSpec( + AllocConstraints(), + hosts=hosts, + gpus=gpus, + racks=racks, + zones=zones, + dcs=dcs, + regions=regions, + ) + alloc = SimAllocator().allocate(spec) + return ProcMesh.from_alloc(alloc, None, True) _BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"