Skip to content

Allow passing a lambda to from_alloc to set env vars based on MonarchContext #622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 74 additions & 11 deletions python/monarch/_src/actor/proc_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import (
Any,
Callable,
cast,
Dict,
List,
Expand Down Expand Up @@ -41,7 +42,9 @@
_ActorMeshRefImpl,
Actor,
ActorMeshRef,
endpoint,
fake_sync_state,
MonarchContext,
)
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator
from monarch._src.actor.code_sync import (
Expand All @@ -50,7 +53,6 @@
WorkspaceLocation,
WorkspaceShape,
)
from monarch._src.actor.code_sync.auto_reload import AutoReloadActor
from monarch._src.actor.debugger import (
_DEBUG_MANAGER_ACTOR_NAME,
DebugClient,
Expand Down Expand Up @@ -79,6 +81,27 @@
DeviceMesh = Any


class SetupActor(Actor):
    """
    Helper actor that runs a user-supplied setup callable on the proc mesh.

    The callable is handed the current ``MonarchContext``; the typical use is
    setting environment variables.
    """

    def __init__(self, env: Callable[[MonarchContext], None]) -> None:
        """
        Remember the user-defined setup callable for a later ``setup`` call.
        """
        # Despite the parameter name, ``env`` is the callable itself.
        self._setup_method = env

    @endpoint
    async def setup(self) -> None:
        """
        Run the stored setup callable, passing it the current monarch context.
        """
        self._setup_method(MonarchContext.get())


T = TypeVar("T")
try:
from __manifest__ import fbmake # noqa
Expand All @@ -88,8 +111,20 @@
IN_PAR = False


async def _allocate_nonblocking(alloc: Alloc) -> "ProcMesh":
return ProcMesh(await HyProcMesh.allocate_nonblocking(alloc))
async def _allocate_nonblocking(
    alloc: Alloc, setup: Callable[[MonarchContext], None] | None = None
) -> "ProcMesh":
    """
    Allocate a ``ProcMesh`` from ``alloc`` and optionally run ``setup`` on it.

    Arguments:
    - `alloc`: The alloc to allocate according to.
    - `setup`: Optional callable taking a MonarchContext; when given, it is run
      through a ``SetupActor`` before the mesh is returned.
    """
    mesh = ProcMesh(await HyProcMesh.allocate_nonblocking(alloc))
    if setup is not None:
        # The user-provided setup has to run here, ahead of any other actor
        # being spawned, so that the environment variables are in place
        # before cuda init.
        runner = await mesh.spawn("setup", SetupActor, setup)
        await runner.setup.call()
        del runner
    return mesh


class ProcMesh(MeshTrait):
Expand Down Expand Up @@ -171,9 +206,19 @@ async def monitor_loop(monitor):
return await self._proc_mesh.monitor()

@classmethod
def from_alloc(self, alloc: Alloc) -> Future["ProcMesh"]:
def from_alloc(
self, alloc: Alloc, setup: Callable[[MonarchContext], None] | None = None
) -> Future["ProcMesh"]:
"""
Allocate a process mesh according to the provided alloc.
Returns when the mesh is fully allocated.

Arguments:
- `alloc`: The alloc to allocate according to.
- `setup`: A lambda taking MonarchContext as param, can be used to setup env vars on the allocated mesh
"""
return Future(
impl=lambda: _allocate_nonblocking(alloc),
impl=lambda: _allocate_nonblocking(alloc, setup),
requires_loop=False,
)

Expand Down Expand Up @@ -366,7 +411,11 @@ def _get_bootstrap_args() -> tuple[str, Optional[list[str]], dict[str, str]]:


async def proc_mesh_nonblocking(
*, gpus: Optional[int] = None, hosts: int = 1, env: Optional[dict[str, str]] = None
*,
gpus: Optional[int] = None,
hosts: int = 1,
env: dict[str, str] | None = None,
setup: Callable[[MonarchContext], None] | None = None,
) -> ProcMesh:
if gpus is None:
gpus = _local_device_count()
Expand All @@ -375,18 +424,32 @@ async def proc_mesh_nonblocking(
# in the order of the dimensions.
spec = AllocSpec(AllocConstraints(), hosts=hosts, gpus=gpus)
env = env or {}
cmd, args, base_env = _get_bootstrap_args()
env.update(base_env)
# Todo: Deprecate the env field from the ProcessAllocator
# The PAR_MAIN_OVERRIDE needs to be passed as an env
# to the proc mesh construction in rust, so can not be moved to the
# SetupActor yet
cmd, args, bootstrap_env = _get_bootstrap_args()
env.update(bootstrap_env)
allocator = ProcessAllocator(cmd, args, env)
alloc = await allocator.allocate(spec)
return await ProcMesh.from_alloc(alloc)

return await ProcMesh.from_alloc(
alloc,
setup=setup,
)


def proc_mesh(
*, gpus: Optional[int] = None, hosts: int = 1, env: Optional[dict[str, str]] = None
*,
gpus: Optional[int] = None,
hosts: int = 1,
env: dict[str, str] | None = None,
setup: Callable[[MonarchContext], None] | None = None,
) -> Future[ProcMesh]:
return Future(
impl=lambda: proc_mesh_nonblocking(gpus=gpus, hosts=hosts, env=env),
impl=lambda: proc_mesh_nonblocking(
gpus=gpus, hosts=hosts, env=env, setup=setup
),
requires_loop=False,
)

Expand Down
127 changes: 126 additions & 1 deletion python/tests/test_allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
ChannelTransport,
)

from monarch._src.actor.actor_mesh import MonarchContext
from monarch._src.actor.allocator import (
ALLOC_LABEL_PROC_MESH_NAME,
LocalAllocator,
RemoteAllocator,
StaticRemoteAllocInitializer,
TorchXRemoteAllocInitializer,
Expand All @@ -60,6 +62,18 @@
UNUSED = "__UNUSED__"


class EnvCheckActor(Actor):
    """Actor used by the tests to look up environment variables"""

    def __init__(self) -> None:
        pass

    @endpoint
    async def get_env_var(self, var_name: str) -> str:
        """Look up `var_name` in `os.environ`; yields 'NOT_SET' when it is absent"""
        try:
            return os.environ[var_name]
        except KeyError:
            return "NOT_SET"


class TestActor(Actor):
"""Silly actor that computes the world size by all-reducing rank-hot tensors"""

Expand Down Expand Up @@ -128,6 +142,82 @@ def remote_process_allocator(
process_allocator.kill()


class TestSetupActorInAllocator(unittest.IsolatedAsyncioTestCase):
    """Exercises the `setup` callable accepted by `ProcMesh.from_alloc` with a `LocalAllocator`."""

    @classmethod
    def setUpClass(cls) -> None:
        # Pickle this test module by value.
        # NOTE(review): presumably needed so the actors and setup callables
        # defined here can be loaded on the mesh side - confirm.
        cloudpickle.register_pickle_by_value(sys.modules[TestActor.__module__])

    @classmethod
    def tearDownClass(cls) -> None:
        cloudpickle.unregister_pickle_by_value(sys.modules[TestActor.__module__])

    def _isolate_env_vars(self, *names: str) -> None:
        """
        Remove `names` from this process's environment now and again when the
        test finishes.

        Clearing up front keeps a stale value from making the assertions pass
        vacuously; the registered cleanup keeps values written by the setup
        callable from leaking into later tests if it ran in this process.
        """
        for name in names:
            os.environ.pop(name, None)
            self.addCleanup(os.environ.pop, name, None)

    async def test_setup_lambda_with_multiple_env_vars(self) -> None:
        """Test that the setup lambda can set multiple environment variables"""
        env_vars: dict[str, str] = {
            "TEST_ENV_VAR_1": "value_1",
            "TEST_ENV_VAR_2": "value_2",
            "TEST_ENV_VAR_3": "value_3",
        }
        self._isolate_env_vars(*env_vars)

        def setup_multiple_env_vars(ctx: MonarchContext) -> None:
            for name, value in env_vars.items():
                os.environ[name] = value

        spec = AllocSpec(AllocConstraints(), gpus=1, hosts=1)
        allocator = LocalAllocator()
        alloc = await allocator.allocate(spec)

        proc_mesh = await ProcMesh.from_alloc(alloc, setup=setup_multiple_env_vars)

        try:
            actor = await proc_mesh.spawn("env_check", EnvCheckActor)

            for name, expected_value in env_vars.items():
                actual_value = await actor.get_env_var.call_one(name)
                self.assertEqual(
                    actual_value,
                    expected_value,
                    f"Environment variable {name} was not set correctly",
                )
        finally:
            # Always stop the mesh, even when an assertion above fails.
            await proc_mesh.stop()

    async def test_setup_lambda_with_context_info(self) -> None:
        """Test that the setup lambda can access context information"""
        context_var_name: str = "PROC_MESH_CONTEXT_INFO"
        self._isolate_env_vars(context_var_name)

        def setup_with_context(ctx: MonarchContext) -> None:
            context_info = f"proc_id:{ctx.proc_id},point_rank:{ctx.point.rank}"
            os.environ[context_var_name] = context_info

        spec = AllocSpec(AllocConstraints(), gpus=1, hosts=1)
        allocator = LocalAllocator()
        alloc = await allocator.allocate(spec)

        proc_mesh = await ProcMesh.from_alloc(alloc, setup=setup_with_context)

        try:
            actor = await proc_mesh.spawn("env_check", EnvCheckActor)

            context_info = await actor.get_env_var.call_one(context_var_name)

            self.assertNotEqual(
                context_info,
                "NOT_SET",
                "Context information was not stored in the environment variable",
            )
            self.assertIn(
                "proc_id:", context_info, "Context information does not contain proc_id"
            )
            self.assertIn(
                "point_rank:0",
                context_info,
                f"Context information {context_info} does not contain point_rank",
            )
        finally:
            # Always stop the mesh, even when an assertion above fails.
            await proc_mesh.stop()


class TestRemoteAllocator(unittest.IsolatedAsyncioTestCase):
@classmethod
def setUpClass(cls) -> None:
Expand Down Expand Up @@ -179,7 +269,7 @@ class EmptyAllocInitializer(StaticRemoteAllocInitializer):
"""test initializer that returns an empty list of addresses"""

async def initialize_alloc(self, match_labels: dict[str, str]) -> list[str]:
_ = match_labels # Suppress unused variable warning
_ = match_labels
return []

empty_initializer = EmptyAllocInitializer()
Expand Down Expand Up @@ -339,6 +429,41 @@ async def test_stop_proc_mesh_context_manager(self) -> None:
# now we doing casting without accessing the wrapped type.
del actor

async def test_setup_lambda_sets_env_vars(self) -> None:
    """Check that a setup callable passed to `ProcMesh.from_alloc` sets env vars on a remotely allocated mesh"""
    test_var_name: str = "TEST_ENV_VAR_FOR_PROC_MESH"
    test_var_value: str = "test_value_123"

    def setup_env_vars(ctx: MonarchContext) -> None:
        os.environ[test_var_name] = test_var_value

    # Two hosts with four gpus each, matching the two allocators started below.
    spec = AllocSpec(AllocConstraints(), host=2, gpu=4)

    with remote_process_allocator() as host1, remote_process_allocator() as host2:
        initializer = StaticRemoteAllocInitializer(host1, host2)
        allocator = RemoteAllocator(
            world_id="test_remote_allocator",
            initializer=initializer,
            heartbeat_interval=_100_MILLISECONDS,
        )
        alloc = await allocator.allocate(spec)
        proc_mesh = await ProcMesh.from_alloc(alloc, setup=setup_env_vars)

        try:
            actor = await proc_mesh.spawn("env_check", EnvCheckActor)

            # Query every proc, then inspect the value reported at host=0, gpu=0.
            env_var_values = await actor.get_env_var.call(test_var_name)
            self.assertEqual(
                env_var_values.item(host=0, gpu=0),
                test_var_value,
                f"Environment variable {test_var_name} was not set correctly",
            )
        finally:
            await proc_mesh.stop()

async def test_stop_proc_mesh_context_manager_multiple_times(self) -> None:
spec = AllocSpec(AllocConstraints(), host=2, gpu=4)

Expand Down
Loading
Loading