diff --git a/python/monarch/_src/actor/proc_mesh.py b/python/monarch/_src/actor/proc_mesh.py index 89163ff5..27fb0226 100644 --- a/python/monarch/_src/actor/proc_mesh.py +++ b/python/monarch/_src/actor/proc_mesh.py @@ -14,6 +14,7 @@ from typing import ( Any, + Callable, cast, Dict, List, @@ -41,7 +42,9 @@ _ActorMeshRefImpl, Actor, ActorMeshRef, + endpoint, fake_sync_state, + MonarchContext, ) from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator from monarch._src.actor.code_sync import ( @@ -50,7 +53,6 @@ WorkspaceLocation, WorkspaceShape, ) -from monarch._src.actor.code_sync.auto_reload import AutoReloadActor from monarch._src.actor.debugger import ( _DEBUG_MANAGER_ACTOR_NAME, DebugClient, @@ -79,6 +81,27 @@ DeviceMesh = Any +class SetupActor(Actor): + """ + A helper actor to setup the proc mesh with user defined setup method. + Typically used to setup the environment variables. + """ + + def __init__(self, env: Callable[[MonarchContext], None]) -> None: + """ + Initialize the setup actor with the user defined setup method. + """ + self._setup_method = env + + @endpoint + async def setup(self) -> None: + """ + Call the user defined setup method with the monarch context. + """ + ctx = MonarchContext.get() + self._setup_method(ctx) + + T = TypeVar("T") try: from __manifest__ import fbmake # noqa @@ -88,8 +111,20 @@ IN_PAR = False -async def _allocate_nonblocking(alloc: Alloc) -> "ProcMesh": - return ProcMesh(await HyProcMesh.allocate_nonblocking(alloc)) +async def _allocate_nonblocking( + alloc: Alloc, setup: Callable[[MonarchContext], None] | None = None +) -> "ProcMesh": + _proc_mesh = await HyProcMesh.allocate_nonblocking(alloc) + if setup is None: + return ProcMesh(_proc_mesh) + # If the user has passed the setup lambda, we need to call + # it here before any of the other actors are spawned so that + # the environment variables are set up before cuda init. 
+ proc_mesh = ProcMesh(_proc_mesh) + setup_actor = await proc_mesh.spawn("setup", SetupActor, setup) + await setup_actor.setup.call() + del setup_actor + return proc_mesh class ProcMesh(MeshTrait): @@ -171,9 +206,19 @@ async def monitor_loop(monitor): return await self._proc_mesh.monitor() @classmethod - def from_alloc(self, alloc: Alloc) -> Future["ProcMesh"]: + def from_alloc( + self, alloc: Alloc, setup: Callable[[MonarchContext], None] | None = None + ) -> Future["ProcMesh"]: + """ + Allocate a process mesh according to the provided alloc. + Returns when the mesh is fully allocated. + + Arguments: + - `alloc`: The alloc to allocate according to. + - `setup`: A lambda taking MonarchContext as param, can be used to setup env vars on the allocated mesh + """ return Future( - impl=lambda: _allocate_nonblocking(alloc), + impl=lambda: _allocate_nonblocking(alloc, setup), requires_loop=False, ) @@ -366,7 +411,11 @@ def _get_bootstrap_args() -> tuple[str, Optional[list[str]], dict[str, str]]: async def proc_mesh_nonblocking( - *, gpus: Optional[int] = None, hosts: int = 1, env: Optional[dict[str, str]] = None + *, + gpus: Optional[int] = None, + hosts: int = 1, + env: dict[str, str] | None = None, + setup: Callable[[MonarchContext], None] | None = None, ) -> ProcMesh: if gpus is None: gpus = _local_device_count() @@ -375,18 +424,32 @@ async def proc_mesh_nonblocking( # in the order of the dimensions. 
spec = AllocSpec(AllocConstraints(), hosts=hosts, gpus=gpus) env = env or {} - cmd, args, base_env = _get_bootstrap_args() - env.update(base_env) + # Todo: Deprecate the env field from the ProcessAllocator + # The PAR_MAIN_OVERRIDE needs to be passed as an env + # to the proc mesh construction in rust, so can not be moved to the + # SetupActor yet + cmd, args, bootstrap_env = _get_bootstrap_args() + env.update(bootstrap_env) allocator = ProcessAllocator(cmd, args, env) alloc = await allocator.allocate(spec) - return await ProcMesh.from_alloc(alloc) + + return await ProcMesh.from_alloc( + alloc, + setup=setup, + ) def proc_mesh( - *, gpus: Optional[int] = None, hosts: int = 1, env: Optional[dict[str, str]] = None + *, + gpus: Optional[int] = None, + hosts: int = 1, + env: dict[str, str] | None = None, + setup: Callable[[MonarchContext], None] | None = None, ) -> Future[ProcMesh]: return Future( - impl=lambda: proc_mesh_nonblocking(gpus=gpus, hosts=hosts, env=env), + impl=lambda: proc_mesh_nonblocking( + gpus=gpus, hosts=hosts, env=env, setup=setup + ), requires_loop=False, ) diff --git a/python/tests/test_allocator.py b/python/tests/test_allocator.py index 5fff8284..24ce6689 100644 --- a/python/tests/test_allocator.py +++ b/python/tests/test_allocator.py @@ -33,8 +33,10 @@ ChannelTransport, ) +from monarch._src.actor.actor_mesh import MonarchContext from monarch._src.actor.allocator import ( ALLOC_LABEL_PROC_MESH_NAME, + LocalAllocator, RemoteAllocator, StaticRemoteAllocInitializer, TorchXRemoteAllocInitializer, @@ -60,6 +62,18 @@ UNUSED = "__UNUSED__" +class EnvCheckActor(Actor): + """Actor that checks for the presence of an environment variable""" + + def __init__(self) -> None: + pass + + @endpoint + async def get_env_var(self, var_name: str) -> str: + """Return the value of the specified environment variable or 'NOT_SET' if not found""" + return os.environ.get(var_name, "NOT_SET") + + class TestActor(Actor): """Silly actor that computes the world size by 
all-reducing rank-hot tensors""" @@ -128,6 +142,82 @@ def remote_process_allocator( process_allocator.kill() +class TestSetupActorInAllocator(unittest.IsolatedAsyncioTestCase): + @classmethod + def setUpClass(cls) -> None: + cloudpickle.register_pickle_by_value(sys.modules[TestActor.__module__]) + + @classmethod + def tearDownClass(cls) -> None: + cloudpickle.unregister_pickle_by_value(sys.modules[TestActor.__module__]) + + async def test_setup_lambda_with_multiple_env_vars(self) -> None: + """Test that the setup lambda can set multiple environment variables""" + env_vars: dict[str, str] = { + "TEST_ENV_VAR_1": "value_1", + "TEST_ENV_VAR_2": "value_2", + "TEST_ENV_VAR_3": "value_3", + } + + def setup_multiple_env_vars(ctx: MonarchContext) -> None: + for name, value in env_vars.items(): + os.environ[name] = value + + spec = AllocSpec(AllocConstraints(), gpus=1, hosts=1) + allocator = LocalAllocator() + alloc = await allocator.allocate(spec) + + proc_mesh = await ProcMesh.from_alloc(alloc, setup=setup_multiple_env_vars) + + try: + actor = await proc_mesh.spawn("env_check", EnvCheckActor) + + for name, expected_value in env_vars.items(): + actual_value = await actor.get_env_var.call_one(name) + self.assertEqual( + actual_value, + expected_value, + f"Environment variable {name} was not set correctly", + ) + finally: + await proc_mesh.stop() + + async def test_setup_lambda_with_context_info(self) -> None: + """Test that the setup lambda can access context information""" + context_var_name: str = "PROC_MESH_CONTEXT_INFO" + + def setup_with_context(ctx: MonarchContext) -> None: + context_info = f"proc_id:{ctx.proc_id},point_rank:{ctx.point.rank}" + os.environ[context_var_name] = context_info + + spec = AllocSpec(AllocConstraints(), gpus=1, hosts=1) + allocator = LocalAllocator() + alloc = await allocator.allocate(spec) + + proc_mesh = await ProcMesh.from_alloc(alloc, setup=setup_with_context) + + try: + actor = await proc_mesh.spawn("env_check", EnvCheckActor) + + 
context_info = await actor.get_env_var.call_one(context_var_name) + + self.assertNotEqual( + context_info, + "NOT_SET", + "Context information was not stored in the environment variable", + ) + self.assertIn( + "proc_id:", context_info, "Context information does not contain proc_id" + ) + self.assertIn( + "point_rank:0", + context_info, + f"Context information {context_info} does not contain point_rank", + ) + finally: + await proc_mesh.stop() + + class TestRemoteAllocator(unittest.IsolatedAsyncioTestCase): @classmethod def setUpClass(cls) -> None: @@ -179,7 +269,7 @@ class EmptyAllocInitializer(StaticRemoteAllocInitializer): """test initializer that returns an empty list of addresses""" async def initialize_alloc(self, match_labels: dict[str, str]) -> list[str]: - _ = match_labels # Suppress unused variable warning + _ = match_labels return [] empty_initializer = EmptyAllocInitializer() @@ -339,6 +429,41 @@ async def test_stop_proc_mesh_context_manager(self) -> None: # now we doing casting without accessing the wrapped type. 
del actor + async def test_setup_lambda_sets_env_vars(self) -> None: + """Test that the setup lambda can set environment variables during proc_mesh allocation""" + test_var_name: str = "TEST_ENV_VAR_FOR_PROC_MESH" + test_var_value: str = "test_value_123" + + def setup_env_vars(ctx: MonarchContext) -> None: + os.environ[test_var_name] = test_var_value + + hosts = 2 + gpus = 4 + spec = AllocSpec(AllocConstraints(), host=hosts, gpu=gpus) + + with remote_process_allocator() as host1, remote_process_allocator() as host2: + allocator = RemoteAllocator( + world_id="test_remote_allocator", + initializer=StaticRemoteAllocInitializer(host1, host2), + heartbeat_interval=_100_MILLISECONDS, + ) + alloc = await allocator.allocate(spec) + proc_mesh = await ProcMesh.from_alloc(alloc, setup=setup_env_vars) + + try: + actor = await proc_mesh.spawn("env_check", EnvCheckActor) + + env_var_values = await actor.get_env_var.call(test_var_name) + env_var_value = env_var_values.item(host=0, gpu=0) + + self.assertEqual( + env_var_value, + test_var_value, + f"Environment variable {test_var_name} was not set correctly", + ) + finally: + await proc_mesh.stop() + async def test_stop_proc_mesh_context_manager_multiple_times(self) -> None: spec = AllocSpec(AllocConstraints(), host=2, gpu=4) diff --git a/python/tests/test_env_before_cuda.py b/python/tests/test_env_before_cuda.py new file mode 100644 index 00000000..6e959419 --- /dev/null +++ b/python/tests/test_env_before_cuda.py @@ -0,0 +1,162 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 

# pyre-strict

import os
import sys
import unittest
from typing import Dict, List

import cloudpickle

import torch
from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints, AllocSpec
from monarch._src.actor.actor_mesh import MonarchContext
from monarch._src.actor.allocator import LocalAllocator
from monarch._src.actor.proc_mesh import proc_mesh
from monarch.actor import Actor, endpoint, ProcMesh


class CudaInitTestActor(Actor):
    """Actor that snapshots environment variables and then initializes CUDA."""

    def __init__(self) -> None:
        # Values observed by init_cuda_and_check_env; entries accumulate
        # across calls because the same dict is reused and returned.
        self.env_vars_before_init: Dict[str, str] = {}
        # Flipped to True only if torch reports CUDA as available.
        self.cuda_initialized: bool = False

    @endpoint
    async def init_cuda_and_check_env(self, env_var_names: List[str]) -> Dict[str, str]:
        """
        Record the requested environment variables, then initialize CUDA.

        Arguments:
        - `env_var_names`: Names of the environment variables to look up.

        Returns a mapping from variable name to its value in this process,
        or to "NOT_SET" when the variable is absent. The lookup happens
        before `torch.cuda.init()` is called; CUDA is only initialized when
        `torch.cuda.is_available()` is true.
        """
        for var_name in env_var_names:
            self.env_vars_before_init[var_name] = os.environ.get(var_name, "NOT_SET")

        if torch.cuda.is_available():
            torch.cuda.init()
            self.cuda_initialized = True

        return self.env_vars_before_init

    @endpoint
    async def is_cuda_initialized(self) -> bool:
        """Return whether init_cuda_and_check_env initialized CUDA."""
        return self.cuda_initialized


class TestEnvBeforeCuda(unittest.IsolatedAsyncioTestCase):
    """Test that environment variables are set up before CUDA is initialized."""

    @classmethod
    def setUpClass(cls) -> None:
        # Pickle this module by value; presumably so that mesh procs do not
        # need to import the test module themselves (verify against
        # cloudpickle usage elsewhere in the test suite).
        cloudpickle.register_pickle_by_value(sys.modules[CudaInitTestActor.__module__])

    @classmethod
    def tearDownClass(cls) -> None:
        cloudpickle.unregister_pickle_by_value(
            sys.modules[CudaInitTestActor.__module__]
        )

    def _restore_env_after_test(self, names: List[str]) -> None:
        """Register a cleanup that puts the named variables back as they are now."""
        # The setup callables below write to os.environ. If the mesh procs
        # share this process (NOTE(review): looks likely for LocalAllocator,
        # confirm), those writes land in the test runner's own environment
        # and would otherwise leak into later tests.
        saved = {name: os.environ.get(name) for name in names}

        def restore() -> None:
            for name, value in saved.items():
                if value is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = value

        self.addCleanup(restore)

    async def _assert_env_seen_before_cuda_init(
        self, mesh: ProcMesh, expected: Dict[str, str]
    ) -> None:
        """Spawn a CudaInitTestActor on `mesh` and check it observes `expected`."""
        actor = await mesh.spawn("cuda_init", CudaInitTestActor)

        env_vars = await actor.init_cuda_and_check_env.call_one(list(expected))

        # Whether CUDA is actually available depends on the host, so only
        # check that the endpoint answers with a real boolean.
        self.assertIsInstance(await actor.is_cuda_initialized.call_one(), bool)

        for name, expected_value in expected.items():
            self.assertEqual(
                env_vars.get(name),
                expected_value,
                f"Environment variable {name} was not set correctly before CUDA initialization",
            )

    async def test_lambda_sets_env_vars_before_cuda_init(self) -> None:
        """Test that a setup callable given to ProcMesh.from_alloc sets env vars before CUDA init"""
        cuda_env_vars: Dict[str, str] = {
            "CUDA_VISIBLE_DEVICES": "0",
            "CUDA_CACHE_PATH": "/tmp/cuda_cache_test",
            "CUDA_LAUNCH_BLOCKING": "1",
        }
        self._restore_env_after_test(list(cuda_env_vars))

        def setup_cuda_env(_: MonarchContext) -> None:
            os.environ.update(cuda_env_vars)

        spec = AllocSpec(AllocConstraints(), gpus=1, hosts=1)
        allocator = LocalAllocator()
        alloc = await allocator.allocate(spec)

        # Named proc_mesh_instance (as in the other tests) so the imported
        # proc_mesh function is not shadowed.
        proc_mesh_instance = await ProcMesh.from_alloc(alloc, setup=setup_cuda_env)

        try:
            await self._assert_env_seen_before_cuda_init(
                proc_mesh_instance, cuda_env_vars
            )
        finally:
            await proc_mesh_instance.stop()

    async def test_proc_mesh_with_lambda_env(self) -> None:
        """Test that the proc_mesh function runs a callable passed as the setup parameter"""
        cuda_env_vars: Dict[str, str] = {
            "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
            "CUDA_MODULE_LOADING": "LAZY",
            "CUDA_DEVICE_MAX_CONNECTIONS": "1",
        }
        self._restore_env_after_test(list(cuda_env_vars))

        def setup_cuda_env(_: MonarchContext) -> None:
            os.environ.update(cuda_env_vars)

        proc_mesh_instance = await proc_mesh(gpus=1, hosts=1, setup=setup_cuda_env)

        try:
            await self._assert_env_seen_before_cuda_init(
                proc_mesh_instance, cuda_env_vars
            )
        finally:
            await proc_mesh_instance.stop()

    async def test_proc_mesh_with_dictionary_env(self) -> None:
        """Test that proc_mesh function works with dictionary for env parameter"""
        cuda_env_vars: Dict[str, str] = {
            "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
            "CUDA_MODULE_LOADING": "LAZY",
            "CUDA_DEVICE_MAX_CONNECTIONS": "1",
        }

        # Pass a copy: proc_mesh_nonblocking merges its bootstrap variables
        # into the dict it is given, which would otherwise alter the
        # expectations checked below.
        proc_mesh_instance = await proc_mesh(gpus=1, hosts=1, env=dict(cuda_env_vars))

        try:
            await self._assert_env_seen_before_cuda_init(
                proc_mesh_instance, cuda_env_vars
            )
        finally:
            await proc_mesh_instance.stop()