@@ -176,14 +176,32 @@ impl PythonActorMesh {
176
176
. map ( PyActorId :: from) )
177
177
}
178
178
179
- // Start monitoring the actor mesh by subscribing to its supervision events. For each supervision
180
- // event, it is consumed by PythonActorMesh first, then gets sent to the monitor for user to consume.
181
- fn monitor < ' py > ( & self , py : Python < ' py > ) -> PyResult < PyObject > {
182
- let receiver = self . user_monitor_sender . subscribe ( ) ;
183
- let monitor_instance = PyActorMeshMonitor {
184
- receiver : SharedCell :: from ( Mutex :: new ( receiver) ) ,
179
+ fn supervise_port < ' py > (
180
+ & self ,
181
+ py : Python < ' py > ,
182
+ receiver : & PythonPortReceiver ,
183
+ ) -> PyResult < PyObject > {
184
+ let rx = MonitoredPythonPortReceiver {
185
+ inner : receiver. inner ( ) ,
186
+ monitor : ActorMeshMonitor {
187
+ receiver : SharedCell :: from ( Mutex :: new ( self . user_monitor_sender . subscribe ( ) ) ) ,
188
+ } ,
185
189
} ;
186
- Ok ( monitor_instance. into_py ( py) )
190
+ rx. into_py_any ( py)
191
+ }
192
+
193
+ fn supervise_once_port < ' py > (
194
+ & self ,
195
+ py : Python < ' py > ,
196
+ receiver : & PythonOncePortReceiver ,
197
+ ) -> PyResult < PyObject > {
198
+ let rx = MonitoredPythonOncePortReceiver {
199
+ inner : receiver. inner ( ) ,
200
+ monitor : ActorMeshMonitor {
201
+ receiver : SharedCell :: from ( Mutex :: new ( self . user_monitor_sender . subscribe ( ) ) ) ,
202
+ } ,
203
+ } ;
204
+ rx. into_py_any ( py)
187
205
}
188
206
189
207
#[ pyo3( signature = ( * * kwargs) ) ]
@@ -335,83 +353,46 @@ impl Drop for PythonActorMesh {
335
353
}
336
354
}
337
355
338
- #[ pyclass(
339
- name = "ActorMeshMonitor" ,
340
- module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
341
- ) ]
342
- pub struct PyActorMeshMonitor {
356
+ #[ derive( Debug , Clone ) ]
357
+ struct ActorMeshMonitor {
343
358
receiver : SharedCell < Mutex < tokio:: sync:: broadcast:: Receiver < Option < ActorSupervisionEvent > > > > ,
344
359
}
345
360
346
- #[ pymethods]
347
- impl PyActorMeshMonitor {
348
- fn __aiter__ ( slf : PyRef < ' _ , Self > ) -> PyRef < ' _ , Self > {
349
- slf
350
- }
351
-
352
- pub fn __anext__ ( & self , py : Python < ' _ > ) -> PyResult < PyObject > {
361
+ impl ActorMeshMonitor {
362
+ pub async fn next ( & self ) -> PyActorSupervisionEvent {
353
363
let receiver = self . receiver . clone ( ) ;
354
- Ok ( pyo3_async_runtimes:: tokio:: future_into_py ( py, get_next ( receiver) ) ?. into ( ) )
355
- }
356
- }
357
-
358
- impl PyActorMeshMonitor {
359
- pub async fn next ( & self ) -> PyResult < PyObject > {
360
- get_next ( self . receiver . clone ( ) ) . await
361
- }
362
- }
363
-
364
- impl Clone for PyActorMeshMonitor {
365
- fn clone ( & self ) -> Self {
366
- Self {
367
- receiver : self . receiver . clone ( ) ,
364
+ let receiver = receiver
365
+ . borrow ( )
366
+ . expect ( "`Actor mesh receiver` is shutdown" ) ;
367
+ let mut receiver = receiver. lock ( ) . await ;
368
+ let event = receiver. recv ( ) . await . unwrap ( ) ;
369
+ match event {
370
+ None => PyActorSupervisionEvent {
371
+ // Dummy actor as place holder to indicate the whole mesh is stopped
372
+ // TODO(albertli): remove this when pushing all supervision logic to rust.
373
+ actor_id : id ! ( default [ 0 ] . actor[ 0 ] ) . into ( ) ,
374
+ actor_status : "actor mesh is stopped due to proc mesh shutdown" . to_string ( ) ,
375
+ } ,
376
+ Some ( event) => PyActorSupervisionEvent :: from ( event. clone ( ) ) ,
368
377
}
369
378
}
370
379
}
371
380
372
- async fn get_next (
373
- receiver : SharedCell < Mutex < tokio:: sync:: broadcast:: Receiver < Option < ActorSupervisionEvent > > > > ,
374
- ) -> PyResult < PyObject > {
375
- let receiver = receiver. clone ( ) ;
376
-
377
- let receiver = receiver
378
- . borrow ( )
379
- . expect ( "`Actor mesh receiver` is shutdown" ) ;
380
- let mut receiver = receiver. lock ( ) . await ;
381
- let event = receiver. recv ( ) . await . unwrap ( ) ;
382
-
383
- let supervision_event = match event {
384
- None => PyActorSupervisionEvent {
385
- // Dummy actor as place holder to indicate the whole mesh is stopped
386
- // TODO(albertli): remove this when pushing all supervision logic to rust.
387
- actor_id : id ! ( default [ 0 ] . actor[ 0 ] ) . into ( ) ,
388
- actor_status : "actor mesh is stopped due to proc mesh shutdown" . to_string ( ) ,
389
- } ,
390
- Some ( event) => PyActorSupervisionEvent :: from ( event. clone ( ) ) ,
391
- } ;
392
-
393
- Python :: with_gil ( |py| supervision_event. into_py_any ( py) )
394
- }
395
-
396
- // TODO(albertli): this is temporary remove this when pushing all supervision logic to rust.
381
+ // Values of this (private) type can only be created by calling
382
+ // `PythonActorMesh::supervise_port()`.
397
383
#[ pyclass(
398
384
name = "MonitoredPortReceiver" ,
399
385
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
400
386
) ]
401
- pub ( super ) struct MonitoredPythonPortReceiver {
387
+ struct MonitoredPythonPortReceiver {
402
388
inner : Arc < tokio:: sync:: Mutex < PortReceiver < PythonMessage > > > ,
403
- monitor : PyActorMeshMonitor ,
389
+ monitor : ActorMeshMonitor ,
404
390
}
405
391
406
392
#[ pymethods]
407
393
impl MonitoredPythonPortReceiver {
408
- #[ new]
409
- fn new ( receiver : & PythonPortReceiver , monitor : & PyActorMeshMonitor ) -> Self {
410
- let inner = receiver. inner ( ) ;
411
- MonitoredPythonPortReceiver {
412
- inner,
413
- monitor : monitor. clone ( ) ,
414
- }
394
+ fn __repr__ ( & self ) -> & ' static str {
395
+ "<MonitoredPortReceiver>"
415
396
}
416
397
417
398
fn recv < ' py > ( & mut self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
@@ -424,7 +405,7 @@ impl MonitoredPythonPortReceiver {
424
405
result. map_err( |err| PyErr :: new:: <PyEOFError , _>( format!( "port closed: {}" , err) ) )
425
406
}
426
407
event = monitor. next( ) => {
427
- Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event. unwrap ( ) ) ) )
408
+ Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event) ) )
428
409
}
429
410
}
430
411
} )
@@ -440,31 +421,28 @@ impl MonitoredPythonPortReceiver {
440
421
result. map_err( |err| PyErr :: new:: <PyEOFError , _>( format!( "port closed: {}" , err) ) )
441
422
}
442
423
event = monitor. next( ) => {
443
- Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event. unwrap ( ) ) ) )
424
+ Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event) ) )
444
425
}
445
426
}
446
427
} ) ?
447
428
}
448
429
}
449
430
431
+ // Values of this (private) type can only be created by calling
432
+ // `PythonActorMesh::supervise_once_port()`.
450
433
#[ pyclass(
451
434
name = "MonitoredOncePortReceiver" ,
452
435
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
453
436
) ]
454
- pub ( super ) struct MonitoredPythonOncePortReceiver {
437
+ struct MonitoredPythonOncePortReceiver {
455
438
inner : Arc < std:: sync:: Mutex < Option < OncePortReceiver < PythonMessage > > > > ,
456
- monitor : PyActorMeshMonitor ,
439
+ monitor : ActorMeshMonitor ,
457
440
}
458
441
459
442
#[ pymethods]
460
443
impl MonitoredPythonOncePortReceiver {
461
- #[ new]
462
- fn new ( receiver : & PythonOncePortReceiver , monitor : & PyActorMeshMonitor ) -> Self {
463
- let inner = receiver. inner ( ) ;
464
- MonitoredPythonOncePortReceiver {
465
- inner,
466
- monitor : monitor. clone ( ) ,
467
- }
444
+ fn __repr__ ( & self ) -> & ' static str {
445
+ "<MonitoredOncePortReceiver>"
468
446
}
469
447
470
448
fn recv < ' py > ( & mut self , py : Python < ' py > ) -> PyResult < Bound < ' py , PyAny > > {
@@ -478,7 +456,7 @@ impl MonitoredPythonOncePortReceiver {
478
456
result. map_err( |err| PyErr :: new:: <PyEOFError , _>( format!( "port closed: {}" , err) ) )
479
457
}
480
458
event = monitor. next( ) => {
481
- Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event. unwrap ( ) ) ) )
459
+ Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event) ) )
482
460
}
483
461
}
484
462
} )
@@ -495,7 +473,7 @@ impl MonitoredPythonOncePortReceiver {
495
473
result. map_err( |err| PyErr :: new:: <PyEOFError , _>( format!( "port closed: {}" , err) ) )
496
474
}
497
475
event = monitor. next( ) => {
498
- Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event. unwrap ( ) ) ) )
476
+ Err ( PyErr :: new:: <SupervisionError , _>( format!( "supervision error: {:?}" , event) ) )
499
477
}
500
478
}
501
479
} ) ?
@@ -506,6 +484,7 @@ impl MonitoredPythonOncePortReceiver {
506
484
name = "ActorSupervisionEvent" ,
507
485
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
508
486
) ]
487
+ #[ derive( Debug ) ]
509
488
pub struct PyActorSupervisionEvent {
510
489
/// Actor ID of the actor where supervision event originates from.
511
490
#[ pyo3( get) ]
@@ -538,7 +517,6 @@ impl From<ActorSupervisionEvent> for PyActorSupervisionEvent {
538
517
pub fn register_python_bindings ( hyperactor_mod : & Bound < ' _ , PyModule > ) -> PyResult < ( ) > {
539
518
hyperactor_mod. add_class :: < PythonActorMesh > ( ) ?;
540
519
hyperactor_mod. add_class :: < PythonActorMeshRef > ( ) ?;
541
- hyperactor_mod. add_class :: < PyActorMeshMonitor > ( ) ?;
542
520
hyperactor_mod. add_class :: < MonitoredPythonPortReceiver > ( ) ?;
543
521
hyperactor_mod. add_class :: < MonitoredPythonOncePortReceiver > ( ) ?;
544
522
hyperactor_mod. add_class :: < PyActorSupervisionEvent > ( ) ?;
0 commit comments