diff --git a/changes/4252.feature.md b/changes/4252.feature.md new file mode 100644 index 00000000000..149600ffb4c --- /dev/null +++ b/changes/4252.feature.md @@ -0,0 +1 @@ +Add detection and event notifications for dangling kernel/container mismatches diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index f0863dc0323..50b50013b42 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -145,6 +145,7 @@ from ai.backend.common.metrics.metric import CommonMetricRegistry from ai.backend.common.metrics.types import UTILIZATION_METRIC_INTERVAL from ai.backend.common.plugin.monitor import ErrorPluginContext, StatsPluginContext +from ai.backend.common.runner import LoopRunner from ai.backend.common.service_ports import parse_service_ports from ai.backend.common.types import ( MODEL_SERVICE_RUNTIME_PROFILES, @@ -201,6 +202,7 @@ AbstractKernel, match_distro_data, ) +from .probe import AgentProbe from .resources import ( AbstractAllocMap, AbstractComputeDevice, @@ -711,6 +713,7 @@ class AbstractAgent( _ongoing_exec_batch_tasks: weakref.WeakSet[asyncio.Task] _ongoing_destruction_tasks: weakref.WeakValueDictionary[KernelId, asyncio.Task] _metric_registry: CommonMetricRegistry + _probe_runner: LoopRunner def __init__( self, @@ -874,6 +877,9 @@ async def _pipeline(r: Redis): self.last_registry_written_time = time.monotonic() self.container_lifecycle_handler = loop.create_task(self.process_lifecycle_events()) + self._probe_runner = self._init_probe_runner_obj() + await self._probe_runner.run() + # Notify the gateway. await self.anycast_event(AgentStartedEvent(reason="self-started")) @@ -915,6 +921,7 @@ async def shutdown(self, stop_signal: signal.Signals) -> None: It must call this super method in an appropriate order, only once. """ await cancel_tasks(self._ongoing_exec_batch_tasks) + await self._probe_runner.close() async with self.registry_lock: # Close all pending kernel runners. 
@@ -1474,6 +1481,17 @@ async def reconstruct_resource_usage(self) -> None: kernel_id, ) + def get_kernel_registry(self) -> Mapping[KernelId, AbstractKernel]: + return self.kernel_registry + + def _init_probe_runner_obj(self) -> LoopRunner: + probe = AgentProbe( + self.enumerate_containers, + self.get_kernel_registry, + self.event_producer, + ) + return LoopRunner.with_nop_resource_ctx(11.0, [probe]) + async def sync_container_lifecycles(self, interval: float) -> None: """ Periodically synchronize the alive/known container sets, @@ -1852,6 +1870,10 @@ async def scan_running_kernels(self) -> None: self._iterate_batch_result(kernel_obj.kernel_id), ), ) + if kernel_obj.runner is not None: + kernel_obj.runner.event_producer = self.event_producer + await kernel_obj.runner.__ainit__() + await kernel_obj.init_probe_runner() async with self.registry_lock: for kernel_id, container in await self.enumerate_containers( ACTIVE_STATUS_SET | DEAD_STATUS_SET, diff --git a/src/ai/backend/agent/docker/agent.py b/src/ai/backend/agent/docker/agent.py index 718d18a7e10..047301e634c 100644 --- a/src/ai/backend/agent/docker/agent.py +++ b/src/ai/backend/agent/docker/agent.py @@ -105,7 +105,7 @@ ) from ..exception import ContainerCreationError, UnsupportedResource from ..fs import create_scratch_filesystem, destroy_scratch_filesystem -from ..kernel import AbstractKernel +from ..kernel import AbstractKernel, KernelInitArgs from ..plugin.network import ContainerNetworkCapability, ContainerNetworkInfo, NetworkPluginContext from ..proxy import DomainSocketProxy, proxy_connection from ..resources import AbstractComputePlugin, KernelResourceSpec, Mount, known_slot_types @@ -118,7 +118,6 @@ KernelOwnershipData, LifecycleEvent, MountInfo, - Port, ) from ..utils import ( closing_async, @@ -130,7 +129,7 @@ from .kernel import DockerKernel from .metadata.server import MetadataServer from .resources import load_resources, scan_available_resources -from .utils import PersistentServiceContainer +from .utils import PersistentServiceContainer, container_from_docker_container if TYPE_CHECKING: from ai.backend.common.auth import PublicKey @@ -153,27 +152,6 @@ } -def container_from_docker_container(src: DockerContainer) -> Container: - ports = [] - for private_port, host_ports in src["NetworkSettings"]["Ports"].items(): - private_port = int(private_port.split("/")[0]) - if host_ports is None: - host_ip = "127.0.0.1" - host_port = 0 - else: - host_ip = host_ports[0]["HostIp"] - host_port = int(host_ports[0]["HostPort"]) - ports.append(Port(host_ip, private_port, host_port)) - return Container( - id=src._id, - status=src["State"]["Status"], - image=src["Config"]["Image"], - labels=src["Config"]["Labels"], - ports=ports, - backend_obj=src, - ) - - async def _clean_scratch( loop: asyncio.AbstractEventLoop, scratch_type: str, @@ -881,16 +859,19 @@ def chown_idfile(uid: Optional[int], gid: Optional[int]) -> None: ) kernel_obj = DockerKernel( - self.ownership_data, - self.kernel_config["network_id"], - self.image_ref, - self.kspec_version, - cluster_info["network_config"].get("mode", "bridge"), - agent_config=self.local_config, - service_ports=service_ports, - resource_spec=resource_spec, - environ=environ, - data={}, + KernelInitArgs( + ownership_data=self.ownership_data, + network_id=self.kernel_config["network_id"], + image=self.image_ref, + version=self.kspec_version, + agent_config=self.local_config, + service_ports=service_ports, + resource_spec=resource_spec, + environ=environ, + data={}, + event_producer=self.event_producer, + ), + 
network_driver=cluster_info["network_config"].get("mode", "bridge"), ) return kernel_obj diff --git a/src/ai/backend/agent/docker/kernel.py b/src/ai/backend/agent/docker/kernel.py index f054f73df27..15456626504 100644 --- a/src/ai/backend/agent/docker/kernel.py +++ b/src/ai/backend/agent/docker/kernel.py @@ -12,7 +12,7 @@ import subprocess import textwrap from pathlib import Path, PurePosixPath -from typing import Any, Dict, Final, FrozenSet, Mapping, Optional, Sequence, Tuple, cast, override +from typing import Any, Final, FrozenSet, Mapping, Optional, Sequence, Tuple, cast, override import janus import pkg_resources @@ -21,18 +21,20 @@ from aiotools import TaskGroup from ai.backend.agent.docker.utils import PersistentServiceContainer -from ai.backend.common.docker import ImageRef from ai.backend.common.events.dispatcher import EventProducer from ai.backend.common.lock import FileLock +from ai.backend.common.runner import LoopRunner from ai.backend.common.types import CommitStatus, KernelId, Sentinel from ai.backend.common.utils import current_loop from ai.backend.logging import BraceStyleAdapter from ai.backend.plugin.entrypoint import scan_entrypoints -from ..kernel import AbstractCodeRunner, AbstractKernel -from ..resources import KernelResourceSpec -from ..types import AgentEventData, KernelOwnershipData +from ..kernel import AbstractCodeRunner, AbstractKernel, KernelInitArgs +from ..types import ( + AgentEventData, +) from ..utils import closing_async, get_arch_name +from .probe import DockerKernelProbe log = BraceStyleAdapter(logging.getLogger(__spec__.name)) @@ -45,35 +47,13 @@ class DockerKernel(AbstractKernel): def __init__( self, - ownership_data: KernelOwnershipData, - network_id: str, - image: ImageRef, - version: int, + args: KernelInitArgs, network_driver: str, - *, - agent_config: Mapping[str, Any], - resource_spec: KernelResourceSpec, - service_ports: Any, # TODO: type-annotation - environ: Mapping[str, Any], - data: Dict[str, Any], ) -> None: - super().__init__( - ownership_data, - network_id, - image, - version, - agent_config=agent_config, - resource_spec=resource_spec, - service_ports=service_ports, - data=data, - environ=environ, - ) + super().__init__(args) self.network_driver = network_driver - async def close(self) -> None: - pass - def __getstate__(self): props = super().__getstate__() return props @@ -83,6 +63,16 @@ def __setstate__(self, props): props["network_driver"] = "bridge" super().__setstate__(props) + @override + def _init_probe_runner_obj(self) -> LoopRunner: + probe = DockerKernelProbe( + self.kernel_id, + self.get_kernel_lifecycle_state, + self.get_container_id, + self._event_producer, + ) + return LoopRunner.with_nop_resource_ctx(5.0, [probe]) + async def create_code_runner( self, event_producer: EventProducer, *, client_features: FrozenSet[str], api_version: int ) -> AbstractCodeRunner: diff --git a/src/ai/backend/agent/docker/probe.py b/src/ai/backend/agent/docker/probe.py new file mode 100644 index 00000000000..485d9a48dfb --- /dev/null +++ b/src/ai/backend/agent/docker/probe.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from typing import Callable, Optional + +from aiodocker.docker import Docker +from aiodocker.exceptions import DockerError + +from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.common.events.event_types.agent.anycast import ( + DanglingKernelDetected, +) +from ai.backend.common.types import ContainerId, KernelId + +from ..probe import DanglingKernel +from ..types import ( + Container, + 
ContainerStatus, + KernelLifecycleStatus, +) +from ..utils import closing_async +from .utils import container_from_docker_container + + +class DockerKernelProbe: + def __init__( + self, + kernel_id: KernelId, + kernel_state_getter: Callable[..., KernelLifecycleStatus], + container_id_getter: Callable[..., Optional[ContainerId]], + event_producer: EventProducer, + ) -> None: + self._kernel_id = kernel_id + self._container_id_getter = container_id_getter + self._kernel_state_getter = kernel_state_getter + self._event_producer = event_producer + + async def _get_container_info(self) -> Optional[Container]: + cid = self._container_id_getter() + if cid is None: + return None + async with closing_async(Docker()) as docker: + try: + container = await docker.containers.get(str(cid)) + except DockerError as e: + if e.status == 404: + raise DanglingKernel + # Re-raise unexpected Docker errors; falling through would leave `container` unbound. + raise + return container_from_docker_container(container) + + def _compare_with_container(self, container: Optional[Container]) -> None: + kernel_state = self._kernel_state_getter() + match kernel_state: + case KernelLifecycleStatus.PREPARING: + if container is not None: + # container exists but kernel is hanging in PREPARING state + raise DanglingKernel + case KernelLifecycleStatus.RUNNING: + if container is None or container.status != ContainerStatus.RUNNING: + raise DanglingKernel + case KernelLifecycleStatus.TERMINATING: + # There might be a delay in the container status change + # while the kernel is being terminated. + pass + + async def probe(self, resource_ctx: None) -> None: + try: + container = await self._get_container_info() + self._compare_with_container(container) + except DanglingKernel: + await self._event_producer.anycast_event(DanglingKernelDetected(self._kernel_id)) diff --git a/src/ai/backend/agent/docker/utils.py b/src/ai/backend/agent/docker/utils.py index 966b8d9b046..93c3399ec41 100644 --- a/src/ai/backend/agent/docker/utils.py +++ b/src/ai/backend/agent/docker/utils.py @@ -7,12 +7,13 @@ from typing import Any, Final, Mapping, Optional, Tuple import pkg_resources -from aiodocker.docker import Docker +from aiodocker.docker import Docker, DockerContainer from aiodocker.exceptions import DockerError from ai.backend.logging import BraceStyleAdapter from ..exception import InitializationError +from ..types import Container, Port from ..utils import closing_async, get_arch_name, update_nested_dict log = BraceStyleAdapter(logging.getLogger(__spec__.name)) @@ -182,3 +183,24 @@ async def start(self) -> None: async with closing_async(Docker()) as docker: c = docker.containers.container(self.container_name) await c.start() + + +def container_from_docker_container(src: DockerContainer) -> Container: + ports = [] + for private_port, host_ports in src["NetworkSettings"]["Ports"].items(): + private_port = int(private_port.split("/")[0]) + if host_ports is None: + host_ip = "127.0.0.1" + host_port = 0 + else: + host_ip = host_ports[0]["HostIp"] + host_port = int(host_ports[0]["HostPort"]) + ports.append(Port(host_ip, private_port, host_port)) + return Container( + id=src._id, + status=src["State"]["Status"], + image=src["Config"]["Image"], + labels=src["Config"]["Labels"], + ports=ports, + backend_obj=src, + ) diff --git a/src/ai/backend/agent/dummy/agent.py b/src/ai/backend/agent/dummy/agent.py index 9bf74de90c5..2aac1a839db 100644 --- a/src/ai/backend/agent/dummy/agent.py +++ b/src/ai/backend/agent/dummy/agent.py @@ -47,7 +47,7 @@ ScanImagesResult, ) from ..exception import UnsupportedResource -from ..kernel 
import AbstractKernel, KernelInitArgs from ..resources import AbstractComputePlugin, KernelResourceSpec, Mount, known_slot_types from ..types import Container, ContainerStatus, KernelOwnershipData, MountInfo from .config import DEFAULT_CONFIG_PATH, dummy_local_config @@ -175,15 +175,18 @@ async def prepare_container( delay = self.creation_ctx_config["delay"]["spawn"] await asyncio.sleep(delay) return DummyKernel( - self.ownership_data, - self.kernel_config["network_id"], - self.image_ref, - self.kspec_version, - agent_config=self.local_config, - service_ports=service_ports, - resource_spec=resource_spec, - environ=environ, - data={}, + KernelInitArgs( + ownership_data=self.ownership_data, + network_id=self.kernel_config["network_id"], + image=self.image_ref, + version=self.kspec_version, + agent_config=self.local_config, + resource_spec=resource_spec, + service_ports=service_ports, + environ=environ, + data={}, + event_producer=self.event_producer, + ), dummy_config=self.dummy_config, ) diff --git a/src/ai/backend/agent/dummy/kernel.py b/src/ai/backend/agent/dummy/kernel.py index 1da9a610318..514ed215ba4 100644 --- a/src/ai/backend/agent/dummy/kernel.py +++ b/src/ai/backend/agent/dummy/kernel.py @@ -3,15 +3,15 @@ import asyncio import os from collections import OrderedDict -from typing import Any, Dict, FrozenSet, Mapping, Sequence, override +from collections.abc import Mapping, Sequence +from typing import Any, FrozenSet, override -from ai.backend.common.docker import ImageRef from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.common.runner import LoopRunner from ai.backend.common.types import CommitStatus -from ..kernel import AbstractCodeRunner, AbstractKernel, NextResult, ResultRecord -from ..resources import KernelResourceSpec -from ..types import AgentEventData, KernelOwnershipData +from ..kernel import AbstractCodeRunner, AbstractKernel, KernelInitArgs, NextResult, ResultRecord +from ..types import AgentEventData class DummyKernel(AbstractKernel): @@ -19,29 +19,10 @@ class DummyKernel(AbstractKernel): def __init__( self, - ownership_data: KernelOwnershipData, - network_id: str, - image: ImageRef, - version: int, - *, - agent_config: Mapping[str, Any], - resource_spec: KernelResourceSpec, - service_ports: Any, # TODO: type-annotation - environ: Mapping[str, Any], - data: Dict[str, Any], + args: KernelInitArgs, dummy_config: Mapping[str, Any], ) -> None: - super().__init__( - ownership_data, - network_id, - image, - version, - agent_config=agent_config, - resource_spec=resource_spec, - service_ports=service_ports, - data=data, - environ=environ, - ) + super().__init__(args) self.is_commiting = False self.dummy_config = dummy_config self.dummy_kernel_cfg = self.dummy_config["kernel"] @@ -49,6 +30,10 @@ def __init__( async def close(self) -> None: pass + @override + def _init_probe_runner_obj(self) -> LoopRunner: + return LoopRunner.nop() + async def create_code_runner( self, event_producer: EventProducer, diff --git a/src/ai/backend/agent/kernel.py b/src/ai/backend/agent/kernel.py index 0625bead265..a015137be66 100644 --- a/src/ai/backend/agent/kernel.py +++ b/src/ai/backend/agent/kernel.py @@ -49,9 +49,11 @@ ModelServiceStatusAnycastEvent, ) from ai.backend.common.json import dump_json, load_json +from ai.backend.common.runner import LoopRunner from ai.backend.common.types import ( AgentId, CommitStatus, + ContainerId, KernelId, ModelServiceStatus, ServicePort, @@ -161,6 +163,21 @@ class NextResult(TypedDict): console: NotRequired[Sequence[Any]] +@dataclass 
+class KernelInitArgs: + ownership_data: KernelOwnershipData + network_id: str + image: ImageRef + version: int + agent_config: Mapping[str, Any] + resource_spec: KernelResourceSpec + service_ports: Any # TODO: type-annotation + data: dict[str, Any] + environ: Mapping[str, Any] + event_producer: EventProducer + session_type: SessionTypes = SessionTypes.INTERACTIVE # TODO: Remove default value + + class AbstractKernel(UserDict, aobject, metaclass=ABCMeta): version: int ownership_data: KernelOwnershipData @@ -181,6 +198,7 @@ class AbstractKernel(UserDict, aobject, metaclass=ABCMeta): # FIXME: apply TypedDict to data in Python 3.8 environ: Mapping[str, Any] state: KernelLifecycleStatus + _event_producer: EventProducer session_type: SessionTypes _tasks: Set[asyncio.Task] @@ -189,38 +207,30 @@ class AbstractKernel(UserDict, aobject, metaclass=ABCMeta): def __init__( self, - ownership_data: KernelOwnershipData, - network_id: str, - image: ImageRef, - version: int, - *, - agent_config: Mapping[str, Any], - resource_spec: KernelResourceSpec, - service_ports: Any, # TODO: type-annotation - data: Dict[Any, Any], - environ: Mapping[str, Any], - session_type: SessionTypes = SessionTypes.INTERACTIVE, + args: KernelInitArgs, ) -> None: - self.agent_config = agent_config - self.ownership_data = ownership_data - self.kernel_id = ownership_data.kernel_id - self.session_id = ownership_data.session_id - self.agent_id = ownership_data.agent_id - self.network_id = network_id - self.image = image - self.version = version - self.resource_spec = resource_spec - self.service_ports = service_ports - self.data = data + self.agent_config = args.agent_config + self.ownership_data = args.ownership_data + self.kernel_id = self.ownership_data.kernel_id + self.session_id = self.ownership_data.session_id + self.agent_id = self.ownership_data.agent_id + self.network_id = args.network_id + self.image = args.image + self.version = args.version + self.resource_spec = args.resource_spec + self.service_ports = args.service_ports + self.data = args.data self.last_used = time.monotonic() self.termination_reason = None self.clean_event = None self.stats_enabled = False - self.environ = environ + self._tasks = set() + self.environ = args.environ self.runner = None self.container_id = None self.state = KernelLifecycleStatus.PREPARING - self.session_type = session_type + self._event_producer = args.event_producer + self.session_type = args.session_type async def init(self, event_producer: EventProducer) -> None: log.debug( @@ -232,11 +242,15 @@ async def init(self, event_producer: EventProducer) -> None: self.runner = await self.create_code_runner( event_producer, client_features=default_client_features, api_version=default_api_version ) + await self.init_probe_runner() def __getstate__(self) -> Mapping[str, Any]: props = self.__dict__.copy() del props["agent_config"] del props["clean_event"] + del props["_tasks"] + del props["_event_producer"] + del props["_probe_runner"] return props def __setstate__(self, props) -> None: @@ -252,16 +266,21 @@ def __setstate__(self, props) -> None: if "session_type" not in props: props["session_type"] = SessionTypes.INTERACTIVE self.__dict__.update(props) - # agent_config is set by the pickle.loads() caller. + # agent_config and _event_producer are set by the pickle.loads() caller. 
self.clean_event = None - @abstractmethod + def get_container_id(self) -> Optional[ContainerId]: + return ContainerId(self.container_id) if self.container_id is not None else None + + def get_kernel_lifecycle_state(self) -> KernelLifecycleStatus: + return self.state + async def close(self) -> None: """ Release internal resources used for interacting with the kernel. Note that this does NOT terminate the container. """ - pass + await self._probe_runner.close() # We don't have "allocate_slots()" method here because: # - resource_spec is initialized by allocating slots at computer's alloc_map @@ -277,6 +296,14 @@ def release_slots(self, computer_ctxs) -> None: for accel_key, accel_alloc in self.resource_spec.allocations.items(): computer_ctxs[accel_key].alloc_map.free(accel_alloc) + @abstractmethod + def _init_probe_runner_obj(self) -> LoopRunner: + raise NotImplementedError + + async def init_probe_runner(self) -> None: + self._probe_runner = self._init_probe_runner_obj() + await self._probe_runner.run() + @abstractmethod async def create_code_runner( self, diff --git a/src/ai/backend/agent/kubernetes/agent.py b/src/ai/backend/agent/kubernetes/agent.py index 2ebbb07d97c..f577624470a 100644 --- a/src/ai/backend/agent/kubernetes/agent.py +++ b/src/ai/backend/agent/kubernetes/agent.py @@ -66,7 +66,7 @@ ScanImagesResult, ) from ..exception import K8sError, UnsupportedResource -from ..kernel import AbstractKernel +from ..kernel import AbstractKernel, KernelInitArgs from ..resources import AbstractComputePlugin, KernelResourceSpec, Mount, known_slot_types from ..types import Container, ContainerStatus, KernelOwnershipData, MountInfo, Port from .kernel import KubernetesKernel @@ -647,15 +647,18 @@ def _write_config(file_name: str, content: str): # TODO: Mark shmem feature as unsupported when advertising agent kernel_obj = KubernetesKernel( - self.ownership_data, - self.kernel_config["network_id"], - self.image_ref, - self.kspec_version, - agent_config=self.local_config, - service_ports=service_ports, - resource_spec=resource_spec, - environ=environ, - data={}, + KernelInitArgs( + ownership_data=self.ownership_data, + network_id=self.kernel_config["network_id"], + image=self.image_ref, + version=self.kspec_version, + agent_config=self.local_config, + resource_spec=resource_spec, + service_ports=service_ports, + environ=environ, + data={}, + event_producer=self.event_producer, + ) ) return kernel_obj diff --git a/src/ai/backend/agent/kubernetes/kernel.py b/src/ai/backend/agent/kubernetes/kernel.py index 412879869ea..11510a478ed 100644 --- a/src/ai/backend/agent/kubernetes/kernel.py +++ b/src/ai/backend/agent/kubernetes/kernel.py @@ -5,7 +5,7 @@ import shutil import textwrap from pathlib import Path, PurePosixPath -from typing import Any, Dict, FrozenSet, Mapping, Optional, Sequence, Tuple, override +from typing import Any, FrozenSet, Mapping, Optional, Sequence, Tuple, override import pkg_resources import zmq @@ -16,15 +16,14 @@ from kubernetes_asyncio import watch from ai.backend.agent.utils import get_arch_name -from ai.backend.common.docker import ImageRef from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.common.runner import LoopRunner from ai.backend.common.utils import current_loop from ai.backend.logging import BraceStyleAdapter from ai.backend.plugin.entrypoint import scan_entrypoints -from ..kernel import AbstractCodeRunner, AbstractKernel -from ..resources import KernelResourceSpec -from ..types import AgentEventData, KernelOwnershipData +from ..kernel import 
AbstractCodeRunner, AbstractKernel, KernelInitArgs +from ..types import AgentEventData log = BraceStyleAdapter(logging.getLogger(__spec__.name)) @@ -32,36 +31,19 @@ class KubernetesKernel(AbstractKernel): deployment_name: str - def __init__( - self, - ownership_data: KernelOwnershipData, - network_id: str, - image: ImageRef, - version: int, - *, - agent_config: Mapping[str, Any], - resource_spec: KernelResourceSpec, - service_ports: Any, # TODO: type-annotation - data: Dict[str, Any], - environ: Mapping[str, Any], - ) -> None: - super().__init__( - ownership_data, - network_id, - image, - version, - agent_config=agent_config, - resource_spec=resource_spec, - service_ports=service_ports, - data=data, - environ=environ, - ) + def __init__(self, args: KernelInitArgs) -> None: + super().__init__(args) - self.deployment_name = f"kernel-{ownership_data.kernel_id}" + self.deployment_name = f"kernel-{self.ownership_data.kernel_id}" async def close(self) -> None: + await super().close() await self.scale(0) + @override + def _init_probe_runner_obj(self) -> LoopRunner: + return LoopRunner.nop() + async def create_code_runner( self, event_producer: EventProducer, *, client_features: FrozenSet[str], api_version: int ) -> AbstractCodeRunner: diff --git a/src/ai/backend/agent/probe.py b/src/ai/backend/agent/probe.py new file mode 100644 index 00000000000..234a265c0ff --- /dev/null +++ b/src/ai/backend/agent/probe.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Awaitable, Callable + +from ai.backend.common.events.dispatcher import EventProducer +from ai.backend.common.events.event_types.agent.anycast import ( + DanglingContainerDetected, + DanglingKernelDetected, +) +from ai.backend.common.types import KernelId +from ai.backend.logging import BraceStyleAdapter + +from .types import ( + Container, + ContainerStatus, +) + +if TYPE_CHECKING: + from .kernel import AbstractKernel + +log = BraceStyleAdapter(logging.getLogger(__spec__.name)) + + +class DanglingKernel(Exception): + pass + + +class DanglingContainer(Exception): + pass + + +class AgentProbe: + def __init__( + self, + container_enumerator: Callable[ + [frozenset[ContainerStatus]], Awaitable[Sequence[tuple[KernelId, Container]]] + ], + kernel_registry_getter: Callable[..., Mapping[KernelId, AbstractKernel]], + event_producer: EventProducer, + ) -> None: + self._container_enumerator = container_enumerator + self._kernel_registry_getter = kernel_registry_getter + self._event_producer = event_producer + + async def _check_dangling_container( + self, + containers: Sequence[tuple[KernelId, Container]], + kernel_registry: Mapping[KernelId, AbstractKernel], + ) -> None: + """ + Check that every scanned container has a kernel entry in the kernel registry. + If not, produce a DanglingContainerDetected event. + """ + for existing_kernel, container in containers: + if existing_kernel not in kernel_registry: + log.warning( + "detected dangling container without a registered kernel (k:{}, c:{})", + existing_kernel, + container.id, + ) + await self._event_producer.anycast_event( + DanglingContainerDetected(container.id), + ) + + async def _check_dangling_kernel( + self, + containers: Sequence[tuple[KernelId, Container]], + kernel_registry: Mapping[KernelId, AbstractKernel], + ) -> None: + """ + Check that every registered kernel has a corresponding container. + If not, produce a DanglingKernelDetected event. 
+ """ + existing_kernel_ids = {k for k, _ in containers} + for registered_kernel_id in kernel_registry: + if registered_kernel_id not in existing_kernel_ids: + log.warning( + "detected dangling kernel without a corresponding container (k:{})", + registered_kernel_id, + ) + await self._event_producer.anycast_event( + DanglingKernelDetected(registered_kernel_id), + ) + + async def probe(self, resource_ctx: None) -> None: + """ + Scan containers and compare with kernel registry. + """ + try: + async with asyncio.timeout(20): + containers = await self._container_enumerator(ContainerStatus.all()) + except asyncio.TimeoutError: + log.warning("container enumeration timed out; skipping this probe cycle") + return + + kernel_registry = self._kernel_registry_getter() + await self._check_dangling_container(containers, kernel_registry) + await self._check_dangling_kernel(containers, kernel_registry) diff --git a/src/ai/backend/agent/types.py b/src/ai/backend/agent/types.py index d16fb2deb2f..52fb243eaee 100644 --- a/src/ai/backend/agent/types.py +++ b/src/ai/backend/agent/types.py @@ -3,12 +3,13 @@ import uuid from dataclasses import dataclass from pathlib import Path -from typing import Any, Awaitable, Callable, Mapping, Optional, Sequence +from typing import Any, Awaitable, Callable, Mapping, Optional, Self, Sequence import attrs from aiohttp import web from aiohttp.typedefs import Handler +from ai.backend.common.docker import LabelName from ai.backend.common.events.kernel import KernelLifecycleEventReason from ai.backend.common.types import AgentId, ContainerId, KernelId, MountTypes, SessionId @@ -55,6 +56,10 @@ class ContainerStatus(enum.StrEnum): DEAD = "dead" REMOVING = "removing" + @classmethod + def all(cls) -> frozenset[Self]: + return frozenset(cls) + @attrs.define(auto_attribs=True, slots=True) class Container: @@ -65,6 +70,22 @@ class Container: ports: Sequence[Port] backend_obj: Any # used to keep the backend-specific data + @property + def kernel_id(self) -> Optional[KernelId]: + raw_kernel_id = self.labels.get(LabelName.KERNEL_ID.value) + try: + return KernelId(uuid.UUID(raw_kernel_id)) + except (TypeError, ValueError): + return None + + @property + def session_id(self) -> Optional[SessionId]: + _session_id = self.labels.get(LabelName.SESSION_ID.value) + try: + return SessionId(uuid.UUID(_session_id)) + except (TypeError, ValueError): + return None + class KernelLifecycleStatus(enum.StrEnum): """ diff --git a/src/ai/backend/common/events/event_types/agent/anycast.py b/src/ai/backend/common/events/event_types/agent/anycast.py index 32bb64eb615..335570cf440 100644 --- a/src/ai/backend/common/events/event_types/agent/anycast.py +++ b/src/ai/backend/common/events/event_types/agent/anycast.py @@ -1,3 +1,4 @@ +import uuid from dataclasses import dataclass, field from typing import Any, Mapping, Optional, Self, override @@ -6,7 +7,7 @@ EventDomain, ) from ai.backend.common.events.user_event.user_event import UserEvent -from ai.backend.common.types import AgentId +from ai.backend.common.types import AgentId, ContainerId, KernelId from ai.backend.logging.types import LogLevel @@ -158,3 +159,41 @@ def deserialize(cls, value: tuple) -> Self: @override def event_name(cls) -> str: return "do_agent_resource_check" + + +@dataclass +class DanglingKernelDetected(AgentOperationEvent): + kernel_id: KernelId + + def serialize(self) -> tuple: + return (str(self.kernel_id),) + + @classmethod + def deserialize(cls, value: tuple) -> Self: + return cls( + KernelId(uuid.UUID(value[0])), + ) + + @classmethod + @override + def event_name(cls) 
-> str: + return "dangling_kernel_detected" + + +@dataclass +class DanglingContainerDetected(AgentOperationEvent): + container_id: ContainerId + + def serialize(self) -> tuple: + return (str(self.container_id),) + + @classmethod + def deserialize(cls, value: tuple) -> Self: + return cls( + ContainerId(value[0]), + ) + + @classmethod + @override + def event_name(cls) -> str: + return "dangling_container_detected" diff --git a/src/ai/backend/common/runner/BUILD b/src/ai/backend/common/runner/BUILD new file mode 100644 index 00000000000..73574424040 --- /dev/null +++ b/src/ai/backend/common/runner/BUILD @@ -0,0 +1 @@ +python_sources(name="src") diff --git a/src/ai/backend/common/runner/__init__.py b/src/ai/backend/common/runner/__init__.py new file mode 100644 index 00000000000..d9d5e8b7134 --- /dev/null +++ b/src/ai/backend/common/runner/__init__.py @@ -0,0 +1,6 @@ +from .runner import LoopRunner, NopResourceCtx + +__all__ = ( + "LoopRunner", + "NopResourceCtx", +) diff --git a/src/ai/backend/common/runner/runner.py b/src/ai/backend/common/runner/runner.py new file mode 100644 index 00000000000..8e24a81fef7 --- /dev/null +++ b/src/ai/backend/common/runner/runner.py @@ -0,0 +1,111 @@ +import asyncio +from collections.abc import Sequence +from typing import Generic, Optional, Protocol, TypeVar + +TResourceCtx = TypeVar("TResourceCtx") +TResourceCtx_Co = TypeVar("TResourceCtx_Co", covariant=True) +TResourceCtx_Contra = TypeVar("TResourceCtx_Contra", contravariant=True) + + +class ResourceCtx(Protocol[TResourceCtx_Co]): + async def open(self) -> TResourceCtx_Co: + pass + + async def close(self) -> None: + pass + + +class NopResourceCtx: + def __init__(self) -> None: + pass + + async def open(self) -> None: + return None + + async def close(self) -> None: + return None + + +class Probe(Protocol[TResourceCtx_Contra]): + async def probe(self, resource_ctx: TResourceCtx_Contra) -> None: + pass + + +class HeartbeatService(Protocol[TResourceCtx_Contra]): + async def heartbeat(self, resource_ctx: TResourceCtx_Contra) -> None: + pass + + +class LoopRunner(Generic[TResourceCtx]): + _closed: bool + _runner_task: Optional[asyncio.Task] + + _resource_ctx: ResourceCtx[TResourceCtx] + + _interval: float + _probes: Sequence[Probe[TResourceCtx]] + _heartbeat_services: Sequence[HeartbeatService[TResourceCtx]] + + def __init__( + self, + interval: float, + resource_ctx: ResourceCtx[TResourceCtx], + probes: Sequence[Probe[TResourceCtx]], + heartbeat_services: Sequence[HeartbeatService[TResourceCtx]] = tuple(), + ) -> None: + self._closed = False + self._runner_task = None + + self._resource_ctx = resource_ctx + + self._interval = interval + self._probes = probes + self._heartbeat_services = heartbeat_services + + @classmethod + def with_nop_resource_ctx( + cls, + interval: float, + probes: Sequence[Probe[None]], + heartbeat_services: Sequence[HeartbeatService[None]] = tuple(), + ) -> "LoopRunner[None]": + obj = LoopRunner[None](interval, NopResourceCtx(), probes, heartbeat_services) + return obj + + @classmethod + def nop(cls) -> "LoopRunner[None]": + obj = LoopRunner[None](0, NopResourceCtx(), []) + obj._closed = True + return obj + + async def close(self) -> None: + self._closed = True + if self._runner_task is not None: + self._runner_task.cancel() + await self._resource_ctx.close() + + async def _run_probes(self, resource: TResourceCtx) -> None: + for probe in self._probes: + await probe.probe(resource) + + async def _heartbeat(self, resource: TResourceCtx) -> None: + for service in 
self._heartbeat_services: + await service.heartbeat(resource) + + async def _task(self) -> None: + while not self._closed: + try: + resource = await self._resource_ctx.open() + await self._run_probes(resource) + await self._heartbeat(resource) + finally: + await self._resource_ctx.close() + + if self._closed: + break + await asyncio.sleep(self._interval) + + async def run(self) -> None: + self._runner_task = asyncio.create_task(self._task()) + # Yield control once so the runner task gets scheduled. + await asyncio.sleep(0) diff --git a/src/ai/backend/manager/event_dispatcher/dispatch.py b/src/ai/backend/manager/event_dispatcher/dispatch.py index 4b02d708bd7..5367b4d13ef 100644 --- a/src/ai/backend/manager/event_dispatcher/dispatch.py +++ b/src/ai/backend/manager/event_dispatcher/dispatch.py @@ -193,6 +193,15 @@ def _dispatch_agent_events( self._agent_event_handler.handle_agent_error, name="agent.error", ) + # evd.consume( + # DanglingKernelDetected, self, handle_dangling_kernel, name="api.session.dangling-kernel" + # ) + # evd.consume( + # DanglingContainerDetected, + # self, + # handle_dangling_container, + # name="api.session.dangling-container", + # ) def _dispatch_image_events(self, event_dispatcher: EventDispatcher) -> None: evd = event_dispatcher.with_reporters([EventLogger(self._db)]) diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index d248eeca8b3..92d406fdca4 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -64,6 +64,8 @@ AgentImagesRemoveEvent, AgentStartedEvent, AgentTerminatedEvent, + DanglingContainerDetected, + DanglingKernelDetected, DoAgentResourceCheckEvent, ) from ai.backend.common.events.event_types.image.anycast import ( @@ -4356,6 +4358,20 @@ async def handle_check_agent_resource( log.info("agent@{0} occupied slots: {1}", source, row["occupied_slots"].to_json()) +async def handle_dangling_kernel( + context: AgentRegistry, source: AgentId, event: DanglingKernelDetected +) -> None: + # TODO: Impl dangling kernel handler + pass + + +async def handle_dangling_container( + context: AgentRegistry, source: AgentId, event: DanglingContainerDetected +) -> None: + # TODO: Impl dangling container handler + pass + + +async def check_scaling_group( conn: SAConnection, scaling_group: str | None,