diff --git a/changes/710.feature.md b/changes/710.feature.md new file mode 100644 index 00000000000..a1265e3dd3b --- /dev/null +++ b/changes/710.feature.md @@ -0,0 +1,2 @@ +Add support for FileBrowser in Storage Proxy. +Migrate to mono-repository. diff --git a/src/ai/backend/client/cli/__init__.py b/src/ai/backend/client/cli/__init__.py index e45d170cc94..d24feaab4b9 100644 --- a/src/ai/backend/client/cli/__init__.py +++ b/src/ai/backend/client/cli/__init__.py @@ -2,6 +2,7 @@ from . import config # noqa # type: ignore from . import dotfile # noqa # type: ignore from . import extensions # noqa # type: ignore +from . import filebrowser # noqa # type: ignore from . import model # noqa # type: ignore from . import server_log # noqa # type: ignore from . import service # noqa # type: ignore diff --git a/src/ai/backend/client/cli/admin/__init__.py b/src/ai/backend/client/cli/admin/__init__.py index 1388dcda6b8..17e9f4873d4 100644 --- a/src/ai/backend/client/cli/admin/__init__.py +++ b/src/ai/backend/client/cli/admin/__init__.py @@ -13,6 +13,7 @@ def admin(): agent, domain, etcd, + filebrowser, group, image, keypair, diff --git a/src/ai/backend/client/cli/admin/filebrowser.py b/src/ai/backend/client/cli/admin/filebrowser.py new file mode 100644 index 00000000000..1306a9b78ea --- /dev/null +++ b/src/ai/backend/client/cli/admin/filebrowser.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from . import admin + + +@admin.group() +def filebrowser() -> None: + """ + FileBrowser administration commands. 
+ """ diff --git a/src/ai/backend/client/cli/filebrowser.py b/src/ai/backend/client/cli/filebrowser.py new file mode 100644 index 00000000000..417b0cf25c1 --- /dev/null +++ b/src/ai/backend/client/cli/filebrowser.py @@ -0,0 +1,61 @@ +import sys +from typing import List + +import click + +from ai.backend.client.session import Session + +from .main import main +from .pretty import print_error + + +@main.group() +def filebrowser(): + """Set of filebrowser operations""" + + +@filebrowser.command() +@click.option( + "-host", + "--Host", + help="Host:Volume reference for a filebrowser session.", + type=str, + metavar="HOST", + multiple=False, +) +@click.option( + "-vf", + "--vfolders", + help="Vfolder to be attached for a filebrowser session.", + type=str, + metavar="VFOLDERS", + multiple=True, +) +def create(host: str, vfolders: List[str]) -> None: + """Create or update filebrowser session""" + vfolder = list(vfolders) + + with Session() as session: + try: + session.FileBrowser.create_or_update_browser(host, vfolder) + except Exception as e: + print_error(e) + sys.exit(1) + + +@filebrowser.command() +@click.option( + "-cid", + "--container_id", + help="Container ID of user filebrowser session.", + type=str, + metavar="CID", +) +def destroy(container_id: str) -> None: + """Destroy filebrowser session using Container ID.""" + with Session() as session: + try: + session.FileBrowser.destroy_browser(container_id) + except Exception as e: + print_error(e) + sys.exit(1) diff --git a/src/ai/backend/client/func/filebrowser.py b/src/ai/backend/client/func/filebrowser.py new file mode 100644 index 00000000000..526d7b83424 --- /dev/null +++ b/src/ai/backend/client/func/filebrowser.py @@ -0,0 +1,48 @@ +import asyncio +import webbrowser + +from ..request import Request +from .base import BaseFunction, api_function + +__all__ = ("FileBrowser",) + + +class FileBrowser(BaseFunction): + @api_function + @classmethod + async def create_or_update_browser(self, host: str, vfolders: 
list[str]) -> str: + rqst = Request("POST", "/storage/filebrowser/create") + rqst.set_json({"host": host, "vfolders": vfolders}) + async with rqst.fetch() as resp: + # give a grace period for filebrowser server to initialize and start + await asyncio.sleep(2) + result = await resp.json() + if result["status"] == "ok": + if result["addr"] == "0": + print("the number of container exceeds the maximum limit.") + print( + f""" + File Browser started. + Container ID: + {result['container_id']} + URL: {result['addr']} + """, + ) + webbrowser.open_new_tab(result["addr"]) + else: + raise Exception + return result + + @api_function + @classmethod + async def destroy_browser(self, container_id: str) -> str: + rqst = Request("DELETE", "/storage/filebrowser/destroy") + rqst.set_json({"container_id": container_id}) + + async with rqst.fetch() as resp: + result = await resp.json() + if result["status"] == "ok": + print("File Browser destroyed.") + else: + raise Exception + return result diff --git a/src/ai/backend/client/session.py b/src/ai/backend/client/session.py index a17090411a9..9da7c2044d0 100644 --- a/src/ai/backend/client/session.py +++ b/src/ai/backend/client/session.py @@ -270,6 +270,7 @@ class BaseSession(metaclass=abc.ABCMeta): "Resource", "KeypairResourcePolicy", "VFolder", + "FileBrowser", "Dotfile", "ServerLog", "Permission", @@ -303,6 +304,7 @@ def __init__( from .func.domain import Domain from .func.dotfile import Dotfile from .func.etcd import EtcdConfig + from .func.filebrowser import FileBrowser from .func.group import Group from .func.image import Image from .func.keypair import KeyPair @@ -340,6 +342,7 @@ def __init__( self.ScalingGroup = ScalingGroup self.SessionTemplate = SessionTemplate self.VFolder = VFolder + self.FileBrowser = FileBrowser self.Dotfile = Dotfile self.ServerLog = ServerLog self.Permission = Permission diff --git a/src/ai/backend/common/utils.py b/src/ai/backend/common/utils.py index 76324dc55bc..5b5e789fb69 100644 --- 
a/src/ai/backend/common/utils.py +++ b/src/ai/backend/common/utils.py @@ -7,6 +7,7 @@ import sys import uuid from collections import OrderedDict +from contextlib import asynccontextmanager from datetime import timedelta from itertools import chain from typing import TYPE_CHECKING, Any, Iterable, Iterator, Mapping, Tuple, TypeVar, Union @@ -284,3 +285,11 @@ async def remove_by_mountpoint(self, mountpoint): if entry: return await self.remove_entry(entry) return False + + +@asynccontextmanager +async def closing_async(thing): + try: + yield thing + finally: + await thing.close() diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py new file mode 100644 index 00000000000..d92bef4c8d6 --- /dev/null +++ b/src/ai/backend/manager/api/filebrowser.py @@ -0,0 +1,295 @@ +import functools +import logging +from typing import Any, Awaitable, Callable, Iterable, Mapping, Tuple + +import aiohttp +import aiohttp_cors +import sqlalchemy as sa +import trafaret as t +from aiohttp import web + +from ai.backend.common.logging import BraceStyleAdapter +from ai.backend.manager.models.storage import AUTH_TOKEN_HDR + +from ..exceptions import InvalidArgument +from ..models import ( + VFolderAccessStatus, + VFolderPermission, + query_accessible_vfolders, + vfolder_permissions, + vfolders, +) +from .auth import auth_required +from .context import RootContext +from .exceptions import InvalidAPIParameters, VFolderNotFound +from .manager import READ_ALLOWED, server_status_required +from .types import CORSOptions, WebMiddleware +from .utils import check_api_params +from .vfolder import ensure_vfolder_status + +logger = logging.getLogger(__name__) + +log = BraceStyleAdapter(logging.getLogger(__name__)) + +VFolderRow = Mapping[str, Any] + + +async def get_vfid(root_ctx: RootContext, host: str, name: str) -> str: + async with root_ctx.db.begin_readonly() as conn: + query = ( + sa.select([vfolders.c.id]) + .select_from(vfolders) + .where((vfolders.c.host == 
host) & (vfolders.c.name == name)) + ) + folder_id = await conn.scalar(query) + return folder_id + + +def vfolder_permission_required(perm: VFolderPermission): + """ + Checks if the target vfolder exists and is either: + - owned by the current access key, or + - allowed accesses by the access key under the specified permission. + + The decorated handler should accept an extra argument + which contains a dict object describing the matched VirtualFolder table row. + """ + + def _wrapper(handler: Callable[..., Awaitable[web.Response]]): + @functools.wraps(handler) + async def _wrapped(request: web.Request, *args, **kwargs) -> web.Response: + root_ctx: RootContext = request.app["_root.context"] + domain_name = request["user"]["domain_name"] + user_role = request["user"]["role"] + user_uuid = request["user"]["uuid"] + + params = await request.json() + folder_names = params["vfolders"] + + for folder_name in folder_names: + allowed_vfolder_types = await root_ctx.shared_config.get_vfolder_types() + vf_user_cond = None + vf_group_cond = None + if perm == VFolderPermission.READ_ONLY: + # if READ_ONLY is requested, any permission accepts. + invited_perm_cond = vfolder_permissions.c.permission.in_( + [ + VFolderPermission.READ_ONLY, + VFolderPermission.READ_WRITE, + VFolderPermission.RW_DELETE, + ] + ) + if not request["is_admin"]: + vf_group_cond = vfolders.c.permission.in_( + [ + VFolderPermission.READ_ONLY, + VFolderPermission.READ_WRITE, + VFolderPermission.RW_DELETE, + ] + ) + elif perm == VFolderPermission.READ_WRITE: + invited_perm_cond = vfolder_permissions.c.permission.in_( + [ + VFolderPermission.READ_WRITE, + VFolderPermission.RW_DELETE, + ] + ) + if not request["is_admin"]: + vf_group_cond = vfolders.c.permission.in_( + [ + VFolderPermission.READ_WRITE, + VFolderPermission.RW_DELETE, + ] + ) + elif perm == VFolderPermission.RW_DELETE: + # If RW_DELETE is requested, only RW_DELETE accepts. 
+ invited_perm_cond = ( + vfolder_permissions.c.permission == VFolderPermission.RW_DELETE + ) + if not request["is_admin"]: + vf_group_cond = vfolders.c.permission == VFolderPermission.RW_DELETE + else: + # Otherwise, just compare it as-is (for future compatibility). + invited_perm_cond = vfolder_permissions.c.permission == perm + if not request["is_admin"]: + vf_group_cond = vfolders.c.permission == perm + async with root_ctx.db.begin() as conn: + entries = await query_accessible_vfolders( + conn, + user_uuid, + user_role=user_role, + domain_name=domain_name, + allowed_vfolder_types=allowed_vfolder_types, + extra_vf_conds=(vfolders.c.name == folder_name), + extra_invited_vf_conds=invited_perm_cond, + extra_vf_user_conds=vf_user_cond, + extra_vf_group_conds=vf_group_cond, + ) + if len(entries) == 0: + raise VFolderNotFound("Your operation may be permission denied.") + return await handler(request, *args, **kwargs) + + return _wrapped + + return _wrapper + + +def vfolder_check_exists(): + def _wrapper(handler: Callable[..., Awaitable[web.Response]]): + @functools.wraps(handler) + async def _wrapped(request: web.Request, *args, **kwargs) -> web.Response: + root_ctx: RootContext = request.app["_root.context"] + user_uuid = request["user"]["uuid"] + + params = await request.json() + folder_names = params["vfolders"] + + for folder_name in folder_names: + async with root_ctx.db.begin() as conn: + j = sa.join( + vfolders, + vfolder_permissions, + vfolders.c.id == vfolder_permissions.c.vfolder, + isouter=True, + ) + query = ( + sa.select("*") + .select_from(j) + .where( + ( + (vfolders.c.user == user_uuid) + | (vfolder_permissions.c.user == user_uuid) + ) + & (vfolders.c.name == folder_name) + ) + ) + try: + result = await conn.execute(query) + row = result.first() + except sa.exc.DataError: + raise InvalidAPIParameters + if row is None: + raise VFolderNotFound(folder_name) + return await handler(request, *args, **kwargs) + + return _wrapped + + return _wrapper + + 
+@auth_required +@server_status_required(READ_ALLOWED) +@check_api_params( + t.Dict( + { + t.Key("host"): t.String, + t.Key("vfolders"): t.List(t.String), + }, + ), +) +@vfolder_permission_required(VFolderPermission.READ_WRITE) +@vfolder_check_exists() +async def create_or_update_filebrowser( + request: web.Request, + params: Any, +) -> web.Response: + root_ctx: RootContext = request.app["_root.context"] + vfolders = [] + host = params["host"] + for vfolder_name in params["vfolders"]: + vfid = (await get_vfid(root_ctx, host, vfolder_name),) + vfolders.append( + { + "name": vfolder_name, + "vfid": str(vfid[0]), + }, + ) + await ensure_vfolder_status(request, VFolderAccessStatus.READABLE, folder_id=str(vfid[0])) + + proxy_name, tmp = root_ctx.storage_manager.split_host(host) + try: + proxy_info = root_ctx.storage_manager._proxies[proxy_name] + except KeyError: + raise InvalidArgument("There is no such storage proxy", proxy_name) + headers = {} + + headers[AUTH_TOKEN_HDR] = proxy_info.secret + try: + async with proxy_info.session.request( + "POST", + proxy_info.manager_api_url / "storage/filebrowser/create", + headers=headers, + json={"host": host, "vfolders": vfolders}, + ) as client_resp: + return web.json_response(await client_resp.json()) + except aiohttp.ClientResponseError: + raise + + +@auth_required +@server_status_required(READ_ALLOWED) +@check_api_params( + t.Dict( + { + t.Key("container_id"): t.String, + }, + ), +) +async def destroy_filebrowser( + request: web.Request, + params: Any, +) -> web.Response: + root_ctx: RootContext = request.app["_root.context"] + container_id = params["container_id"] + + volumes = await root_ctx.storage_manager.get_all_volumes() + + # search for volume among available volumes which has file browser container id in order to destroy + for volume in volumes: + proxy_name = volume[0] + try: + proxy_info = root_ctx.storage_manager._proxies[proxy_name] + except KeyError: + raise InvalidArgument("There is no such storage proxy", 
proxy_name) + + headers = {} + headers[AUTH_TOKEN_HDR] = proxy_info.secret + auth_token = proxy_info.secret + try: + async with proxy_info.session.request( + "DELETE", + proxy_info.manager_api_url / "storage/filebrowser/destroy", + headers=headers, + json={"container_id": container_id, "auth_token": auth_token}, + ) as client_resp: + return web.json_response(await client_resp.json()) + except aiohttp.ClientResponseError: + raise + return web.json_response({"status": "fail"}) + + +async def init(app: web.Application) -> None: + pass + + +async def shutdown(app: web.Application) -> None: + pass + + +def create_app( + default_cors_options: CORSOptions, +) -> Tuple[web.Application, Iterable[WebMiddleware]]: + app = web.Application() + app["prefix"] = "storage/filebrowser" + app["api_versions"] = ( + 2, + 3, + 4, + ) + app.on_startup.append(init) + app.on_shutdown.append(shutdown) + cors = aiohttp_cors.setup(app, defaults=default_cors_options) + cors.add(app.router.add_route("POST", "/create", create_or_update_filebrowser)) + cors.add(app.router.add_route("DELETE", "/destroy", destroy_filebrowser)) + + return app, [] diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index f7f2e1a4bca..70682e836d3 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -669,6 +669,7 @@ async def server_main( ".auth", ".ratelimit", ".vfolder", + ".filebrowser", ".admin", ".service", ".session", diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index dc6952112d7..bf79f3ee5b9 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -22,6 +22,7 @@ from ..abc import AbstractVolume from ..context import Context from ..exception import InvalidSubpathError, StorageProxyError, VFolderNotFoundError +from ..filebrowser import filebrowser from ..types import VFolderCreationOptions from ..utils import check_params, log_manager_api_entry @@ -649,12 +650,57 @@ 
async def delete_files(request: web.Request) -> web.Response: ) +async def create_or_update_filebrowser(request: web.Request) -> web.Response: + ctx: Context = request.app["ctx"] + params = await request.json() + host: str + port: int + container_id: str + host, port, container_id = await filebrowser.create_or_update( + ctx, + params["host"], + params["vfolders"], + ) + log.info("Filebrowser created at %s:%d", host, port) + return web.json_response( + { + "addr": f"http://{host}:{port}", # TODO: SSL? + "container_id": f"{container_id}", + "status": "ok", + }, + ) + + +async def destroy_filebrowser(request: web.Request) -> web.Response: + async with check_params( + request, + t.Dict( + { + t.Key("auth_token"): t.String, + t.Key("container_id"): t.String, + }, + ), + ) as params: + await log_manager_api_entry(log, "destroy_filebrowser", params) + ctx: Context = request.app["ctx"] + try: + await filebrowser.destroy_container(ctx, params["container_id"]) + return web.json_response( + { + "status": "ok", + }, + ) + except Exception: + raise Exception + + async def init_manager_app(ctx: Context) -> web.Application: app = web.Application( middlewares=[ token_auth_middleware, ], ) + app["ctx"] = ctx app.router.add_route("GET", "/", get_status) app.router.add_route("GET", "/volumes", get_volumes) @@ -679,4 +725,10 @@ async def init_manager_app(ctx: Context) -> web.Application: app.router.add_route("POST", "/folder/file/download", create_download_session) app.router.add_route("POST", "/folder/file/upload", create_upload_session) app.router.add_route("POST", "/folder/file/delete", delete_files) + app.router.add_route( + "POST", + "/storage/filebrowser/create", + create_or_update_filebrowser, + ) + app.router.add_route("DELETE", "/storage/filebrowser/destroy", destroy_filebrowser) return app diff --git a/src/ai/backend/storage/config.py b/src/ai/backend/storage/config.py index 3cd3b6a641b..e96e87c1a99 100644 --- a/src/ai/backend/storage/config.py +++ 
b/src/ai/backend/storage/config.py @@ -52,6 +52,36 @@ t.Key("aiomonitor-port", default=48300): t.Int[1:65535], }, ), + t.Key("filebrowser"): t.Dict( + { + t.Key("image"): t.String, + t.Key("service_ip"): t.IP, + t.Key("service_port", default=None): t.Int, + t.Key("port_range", default="4000-5000"): t.String, + t.Key("settings_path", default=None): tx.Path(type="dir"), + t.Key("mount_path", default=None): tx.Path(type="dir"), + t.Key("db_path", default=None): tx.Path( + type="file", + allow_nonexisting=True, + allow_devnull=True, + ), + t.Key( + "filebrowser_key", + ): t.String, + t.Key("max_cpu", default=1): t.Int[1:_max_cpu_count], + t.Key("max_mem", default="1g"): tx.BinarySize, + t.Key("max_containers", default=32): t.Int[1:], + t.Key("user_id", default=None): tx.UserID( + default_uid=_file_perm.st_uid, + ), + t.Key("group_id", default=None): tx.GroupID( + default_gid=_file_perm.st_gid, + ), + t.Key("activity_check_timeout", default=30): t.Int, + t.Key("activity_check_freq", default=1): t.Int, + t.Key("idle_timeout", default=300): t.Int, + }, + ), t.Key("logging"): logging_config_iv, t.Key("api"): t.Dict( { diff --git a/src/ai/backend/storage/context.py b/src/ai/backend/storage/context.py index 20635cae763..e5284921845 100644 --- a/src/ai/backend/storage/context.py +++ b/src/ai/backend/storage/context.py @@ -34,6 +34,7 @@ class Context: pid: int etcd: AsyncEtcd local_config: Mapping[str, Any] + container_id: Any def __init__( self, diff --git a/src/ai/backend/storage/filebrowser/BUILD b/src/ai/backend/storage/filebrowser/BUILD new file mode 100644 index 00000000000..db46e8d6c97 --- /dev/null +++ b/src/ai/backend/storage/filebrowser/BUILD @@ -0,0 +1 @@ +python_sources() diff --git a/src/ai/backend/storage/filebrowser/__init__.py b/src/ai/backend/storage/filebrowser/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/ai/backend/storage/filebrowser/config_browser_app.py b/src/ai/backend/storage/filebrowser/config_browser_app.py new file 
mode 100644 index 00000000000..23b5475c4c4 --- /dev/null +++ b/src/ai/backend/storage/filebrowser/config_browser_app.py @@ -0,0 +1,89 @@ +import json +from pathlib import Path + +import aiofiles + + +async def prepare_filebrowser_app_config( + settings_path: Path, + service_port: int, + filebrowser_key: str, +) -> None: + filebrowser_config = { + "settings": { + "key": filebrowser_key, + "signup": False, + "createUserDir": False, + "defaults": { + "scope": ".", + "locale": "en", + "viewMode": "list", + "singleClick": False, + "sorting": { + "by": "name", + "asc": False, + }, + "perm": { + "admin": False, + "execute": True, + "create": True, + "rename": True, + "modify": True, + "delete": True, + "share": True, + "download": True, + }, + "commands": [], + "hideDotfiles": False, + }, + "authMethod": "noauth", + "branding": { + "name": "BACKEND.AI Web Browser", + "disableExternal": True, + "files": "/filebrowser_app/branding/", + "theme": "", + }, + "commands": { + "after_copy": [], + "after_delete": [], + "after_rename": [], + "after_save": [], + "after_upload": [], + "before_copy": [], + "before_delete": [], + "before_rename": [], + "before_save": [], + "before_upload": [], + }, + "shell": [], + "rules": [], + }, + "server": { + "root": "/data/", + "baseURL": "", + "socket": "", + "tlsKey": "", + "tlsCert": "", + "port": str(service_port), + "address": "", + "log": "stdout", + "enableThumbnails": False, + "resizePreview": False, + "enableExec": False, + }, + "auther": {"recaptcha": None}, + } + + async with aiofiles.open(settings_path / "config.json", mode="w") as file: + await file.write(json.dumps(filebrowser_config)) + + filebrowser_app_settings = { + "port": service_port, + "baseURL": "", + "address": "", + "log": "stdout", + "database": f"/filebrowser_app/filebrowser_{service_port}.db", + "root": "/data/", + } + async with aiofiles.open(settings_path / "settings.json", mode="w") as file: + await file.write(json.dumps(filebrowser_app_settings)) diff --git 
a/src/ai/backend/storage/filebrowser/database.py b/src/ai/backend/storage/filebrowser/database.py new file mode 100644 index 00000000000..a044f3a7b53 --- /dev/null +++ b/src/ai/backend/storage/filebrowser/database.py @@ -0,0 +1,78 @@ +from typing import Any + +import aiosqlite + + +class FilebrowserTrackerDB: + def __init__(self, db_path: str): + self.db_path = db_path + + async def __ainit__(self): + async with aiosqlite.connect(self.db_path) as db: + await db.execute(""" + CREATE TABLE IF NOT EXISTS containers ( + container_id TEXT PRIMARY KEY, + container_name TEXT, + service_ip TEXT, + service_port INTEGER, + config TEXT, + status TEXT, + timestamp TEXT + ) + """) + await db.commit() + + @classmethod + async def create(cls, db_path: str) -> "FilebrowserTrackerDB": + self = cls(db_path) + await self.__ainit__() + return self + + async def get_all_containers(self) -> Any: + async with aiosqlite.connect(self.db_path) as db: + async with db.cursor() as cursor: + await cursor.execute("SELECT * FROM containers;") + rows = await cursor.fetchall() + return rows + + async def get_filebrowser_by_container_id(self, container_id: str) -> Any: + async with aiosqlite.connect(self.db_path) as db: + async with db.cursor() as cursor: + await cursor.execute( + "SELECT * FROM containers WHERE container_id=?", (container_id,) + ) + row = await cursor.fetchone() + return row + + async def insert_new_container( + self, + container_id: str, + container_name: str, + service_ip: str, + service_port: int, + config: dict[str, Any], + status: str, + timestamp: str, + ): + async with aiosqlite.connect(self.db_path) as db: + await db.execute( + ( + "INSERT INTO containers (container_id, container_name, service_ip," + " service_port, config, status, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?);" + ), + ( + container_id, + container_name, + service_ip, + service_port, + str(config), + status, + timestamp, + ), + ) + await db.commit() + + async def delete_container_record(self, container_id: str) 
-> None: + async with aiosqlite.connect(self.db_path) as db: + await db.execute("DELETE FROM containers WHERE container_id=?", (container_id,)) + await db.commit() diff --git a/src/ai/backend/storage/filebrowser/filebrowser.py b/src/ai/backend/storage/filebrowser/filebrowser.py new file mode 100644 index 00000000000..cb6bc541276 --- /dev/null +++ b/src/ai/backend/storage/filebrowser/filebrowser.py @@ -0,0 +1,244 @@ +from __future__ import annotations + +import logging +from datetime import datetime +from pathlib import Path +from typing import Any, List, Mapping, NamedTuple, Type +from uuid import UUID + +import aiodocker +import pkg_resources + +from ai.backend.common.logging import BraceStyleAdapter +from ai.backend.common.utils import closing_async +from ai.backend.storage.abc import AbstractVolume +from ai.backend.storage.context import Context +from ai.backend.storage.utils import get_available_port +from ai.backend.storage.vfs import BaseVolume + +from .config_browser_app import prepare_filebrowser_app_config +from .database import FilebrowserTrackerDB + +logger = logging.getLogger(__name__) +log = BraceStyleAdapter(logging.getLogger(__name__)) + +BACKENDS: Mapping[str, Type[AbstractVolume]] = { + "vfs": BaseVolume, +} + + +__all__ = ( + "create_or_update", + "destroy_container", + "get_container_by_id", + "get_filebrowsers", + "get_network_stats", + "check_container_existance", +) + + +class FileBrowserResult(NamedTuple): + container_id: str + port: int + token: str + + +class NetworkStatsResult(NamedTuple): + rx_bytes: int + tx_bytes: int + + +async def create_or_update( + ctx: Context, + host: str, + vfolders: list[dict], +) -> FileBrowserResult: + image = ctx.local_config["filebrowser"]["image"] + service_ip = ctx.local_config["filebrowser"]["service_ip"] + service_port = ctx.local_config["filebrowser"]["service_port"] + max_containers = ctx.local_config["filebrowser"]["max_containers"] + user_id = ctx.local_config["filebrowser"]["user_id"] + group_id = 
ctx.local_config["filebrowser"]["group_id"] + cpu_count = ctx.local_config["filebrowser"]["max_cpu"] + memory = ctx.local_config["filebrowser"]["max_mem"] + db_path = ctx.local_config["filebrowser"]["db_path"] + p = Path(pkg_resources.resource_filename(__name__, "")) + storage_proxy_root_path_index = p.parts.index("storage") + settings_path = Path(*p.parts[0 : storage_proxy_root_path_index + 1]) / "filebrowser_app/" + _, requested_volume = host.split(":", maxsplit=1) + found_volume = False + volumes = ctx.local_config["volume"] + for volume_name in volumes.keys(): + if requested_volume == volume_name: + volume_cls: Type[AbstractVolume] = BACKENDS[volumes.get(volume_name)["backend"]] + mount_path = Path(volumes.get(volume_name)["path"]) + volume_obj = volume_cls( + local_config=ctx.local_config, + mount_path=mount_path, + fsprefix=None, + options={}, + ) + found_volume = True + break + if not found_volume: + raise ValueError( + f"Requested volume '{requested_volume}' does not exist in the configuration." + ) + + port_range = ctx.local_config["filebrowser"]["port_range"].split("-") + service_port = get_available_port(port_range) + running_docker_containers = await get_filebrowsers() + if len(running_docker_containers) >= max_containers: + log.error("Can't create new container. 
Number of containers exceed the maximum limit.") + return FileBrowserResult("0", 0, "0") + await prepare_filebrowser_app_config( + settings_path, service_port, ctx.local_config["filebrowser"]["filebrowser_key"] + ) + async with closing_async(aiodocker.Docker()) as docker: + config = { + "Cmd": [ + "/filebrowser_app/start.sh", + f"{user_id}", + f"{group_id}", + f"{service_port}", + ], + "ExposedPorts": { + f"{service_port}/tcp": {}, + }, + "Image": image, + "HostConfig": { + "PortBindings": { + f"{service_port}/tcp": [ + { + "HostIp": f"{service_ip}", + "HostPort": f"{service_port}/tcp", + }, + ], + }, + "CpuCount": cpu_count, + "Memory": memory, + "Mounts": [ + { + "Target": "/filebrowser_app/", + "Source": f"{settings_path}", + "Type": "bind", + }, + ], + }, + } + for vfolder in vfolders: + filebrowser_mount_path = str( + volume_obj.mangle_vfpath(UUID(vfolder["vfid"])), + ) + config["HostConfig"]["Mounts"].append( + { + "Target": f"/data/{vfolder['name']}", + "Source": filebrowser_mount_path, + "Type": "bind", + }, + ) + container_name = f"ai.backend.container-filebrowser-{service_port}" + container = await docker.containers.create_or_replace( + config=config, + name=container_name, + ) + container_id = container._id + await container.start() + tracker_db = await FilebrowserTrackerDB.create(db_path) + await tracker_db.insert_new_container( + container_id, + container_name, + service_ip, + service_port, + config, + "RUNNING", + str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")), + ) + return FileBrowserResult(service_ip, service_port, container_id) + + +async def recreate_container(container_name: str, config: dict[str, Any]) -> None: + async with closing_async(aiodocker.Docker()) as docker: + try: + docker = aiodocker.Docker() + container = await docker.containers.create_or_replace( + config=config, + name=container_name, + ) + await container.start() + except aiodocker.exceptions.DockerError as e: + logger.error(f"Failure to recreate container: {container_name}. 
Error: {e}") + + +async def check_container_existance(container_id: str) -> bool: + async with closing_async(aiodocker.Docker()) as docker: + for container in await docker.containers.list(): + if container._id == container_id: + return True + return False + + +async def close_all_filebrowser_containers(ctx) -> None: + db_path = ctx.local_config["filebrowser"]["db_path"] + tracker_db = await FilebrowserTrackerDB.create(db_path) + async with closing_async(aiodocker.Docker()) as docker: + for container in await docker.containers.list(all=True): + if "ai.backend.container-filebrowser" in container._container["Names"][0]: + await container.stop() + await container.delete(force=True) + await tracker_db.delete_container_record(container._id) + + +async def destroy_container(ctx: Context, container_id: str) -> None: + db_path = ctx.local_config["filebrowser"]["db_path"] + tracker_db = await FilebrowserTrackerDB.create(db_path) + async with closing_async(aiodocker.Docker()) as docker: + if await check_container_existance(container_id) is True: + container = aiodocker.docker.DockerContainers(docker).container( + container_id=container_id, + ) + try: + await container.stop() + await container.delete(force=True) + await tracker_db.delete_container_record(container_id) + except aiodocker.exceptions.DockerError as e: + log.error(f"Failure to destroy container {container_id[0:7]} ", e) + + +async def get_container_by_id(container_id: str) -> Any: + async with closing_async(aiodocker.Docker()) as docker: + container = aiodocker.docker.DockerContainers(docker).container( + container_id=container_id, + ) + return container + + +async def get_filebrowsers() -> List[str]: + container_list = [] + async with closing_async(aiodocker.Docker()) as docker: + containers = await aiodocker.docker.DockerContainers(docker).list(all=True) + for container in containers: + stats = await container.stats(stream=False) + name = stats[0]["name"] + cnt_id = stats[0]["id"] + if 
"ai.backend.container-filebrowser" in name: + container_list.append(cnt_id) + return container_list + + +async def get_network_stats(container_id: str) -> NetworkStatsResult: + async with closing_async(aiodocker.Docker()) as docker: + try: + container = aiodocker.docker.DockerContainers(docker).container( + container_id=container_id, + ) + if container in await docker.containers.list(): + stats = await container.stats(stream=False) + else: + return NetworkStatsResult(0, 0) + except aiodocker.exceptions.DockerError as e: + log.error(f"Failure to get network stats for container {container_id[0:7]}, {e} ") + return NetworkStatsResult(0, 0) + return NetworkStatsResult( + int(stats[0]["networks"]["eth0"]["rx_bytes"]), int(stats[0]["networks"]["eth0"]["tx_bytes"]) + ) diff --git a/src/ai/backend/storage/filebrowser/monitor.py b/src/ai/backend/storage/filebrowser/monitor.py new file mode 100644 index 00000000000..936913f4e88 --- /dev/null +++ b/src/ai/backend/storage/filebrowser/monitor.py @@ -0,0 +1,105 @@ +import asyncio +import logging +import time + +import aiotools + +from ai.backend.storage.context import Context + +from .filebrowser import destroy_container, get_filebrowsers, get_network_stats + +log = logging.getLogger(__name__) + + +async def network_monitor( + ctx: Context, + container_id: str, + activity_check_freq: int, + activity_check_timeout: int, +) -> None: + start_time = time.monotonic() + network_window = [] + stats = (0, 0) + idle_timeout = ctx.local_config["filebrowser"]["idle_timeout"] + while True: + current_time = time.monotonic() + if container_id not in await get_filebrowsers(): + break + for attempt in range(2): + try: + stats = await get_network_stats(container_id) + break + except Exception as e: + log.error("Failed to get network stats ", e) + if attempt == 1: + raise e + await asyncio.sleep(idle_timeout) + network_total_transfer = stats[0] + stats[1] + network_window.append(network_total_transfer) + if current_time - start_time > 
async def network_monitor(
    ctx: Context,
    container_id: str,
    activity_check_freq: int,
    activity_check_timeout: int,
) -> None:
    """Destroy a FileBrowser container once its network traffic goes flat.

    Samples cumulative rx+tx bytes every *activity_check_freq* seconds; when
    the total has not changed across an *activity_check_timeout* window, the
    container is considered inactive and destroyed.
    """
    start_time = time.monotonic()
    network_window = []
    stats = (0, 0)
    idle_timeout = ctx.local_config["filebrowser"]["idle_timeout"]
    while True:
        current_time = time.monotonic()
        # Stop monitoring as soon as the container disappears.
        if container_id not in await get_filebrowsers():
            break
        for attempt in range(2):
            try:
                stats = await get_network_stats(container_id)
                break
            except Exception as e:
                # BUGFIX: the exception was passed as a stray positional arg to
                # log.error() with no placeholder; interpolate it instead.
                log.error(f"Failed to get network stats: {e}")
                if attempt == 1:
                    raise
                # BUGFIX: idle_timeout may be None (idle timeout disabled),
                # which would crash asyncio.sleep(); use a short retry delay then.
                await asyncio.sleep(idle_timeout if idle_timeout is not None else 1)
        network_total_transfer = stats[0] + stats[1]
        network_window.append(network_total_transfer)
        if current_time - start_time > activity_check_timeout:
            network_utilization_change = network_window[-1] - network_window[0]
            if network_utilization_change == 0:
                # No traffic over the whole window: tear the container down.
                start_time = current_time
                try:
                    await destroy_container(ctx, container_id)
                except Exception as e:
                    log.error(
                        f"Failure to destroy container based on networking timeout "
                        f"{container_id}: {e}",
                    )
                break
            else:
                # Traffic observed: restart the observation window.
                network_window = []
                start_time = current_time
        await asyncio.sleep(activity_check_freq)


async def idle_timeout_monitor(
    ctx: Context,
    container_id: str,
    idle_timeout: int,
) -> None:
    """Destroy a FileBrowser container after *idle_timeout* seconds of lifetime."""
    start_time = time.monotonic()
    while True:
        if container_id not in await get_filebrowsers():
            break
        current_time = time.monotonic()
        if current_time - start_time >= idle_timeout:
            try:
                await destroy_container(ctx, container_id)
            except Exception as e:
                # BUGFIX: stray positional arg to log.error(); interpolate it.
                log.error(
                    f"Failure to destroy container based on Idle timeout {container_id}: {e}"
                )
            break
        # BUGFIX: the original loop had no sleep at all and busy-polled the
        # Docker daemon as fast as possible; yield between checks.
        await asyncio.sleep(1)
async def keep_monitors_running(ctx: Context) -> None:
    """Spawn network/idle monitors for every running FileBrowser container.

    Polls the Docker daemon and attaches a ``network_monitor`` (and, when the
    idle timeout is configured, an ``idle_timeout_monitor``) to each browser
    container not yet monitored.  Returns when no browsers remain.
    """
    idle_timeout = ctx.local_config["filebrowser"]["idle_timeout"]
    activity_check_freq = ctx.local_config["filebrowser"]["activity_check_freq"]
    activity_check_timeout = ctx.local_config["filebrowser"]["activity_check_timeout"]
    network_monitored_list = []
    idle_time_monitored_list = []
    while True:
        await asyncio.sleep(30)  # grace period for FileBrowser to start before monitoring.
        browsers = await get_filebrowsers()
        if len(browsers) > 0:
            # NOTE(review): TaskGroup waits for every monitor task to finish
            # before the next poll iteration, so browsers created while the
            # monitors run are not picked up until then — confirm intended.
            async with aiotools.TaskGroup() as tg:
                for browser in browsers:
                    if browser not in network_monitored_list:
                        network_monitored_list.append(browser)
                        tg.create_task(
                            network_monitor(
                                ctx,
                                browser,
                                activity_check_freq,
                                activity_check_timeout,
                            ),
                        )
                    if (idle_timeout is not None) and (browser not in idle_time_monitored_list):
                        idle_time_monitored_list.append(browser)
                        tg.create_task(idle_timeout_monitor(ctx, browser, idle_timeout))
        else:
            # NOTE(review): despite the function name, monitoring stops for
            # good once no browsers are found after the grace period.
            network_monitored_list = []
            idle_time_monitored_list = []
            break
        # BUGFIX: idle_timeout may be None (idle timeout disabled), which
        # would crash asyncio.sleep(); skip the extra delay in that case.
        if idle_timeout is not None:
            await asyncio.sleep(idle_timeout)
# Cross-process guard so only one storage-proxy worker runs the FileBrowser
# monitor loop.
storage_proxy_server_path = Path(pkg_resources.resource_filename(__name__, ""))
monitor_lock_path = storage_proxy_server_path / "filebrowser" / "monitor_lock.txt"
# BUGFIX: the lock object used to be created only when the lock file did not
# already exist; a leftover file from a crashed run then caused a NameError at
# every later `file_lock` reference.  Create it unconditionally — presumably
# FileLock copes with a pre-existing lock file (acquisition is attempted
# later, guarded by try/except) — TODO confirm against ai.backend.common.lock.
file_lock = FileLock(monitor_lock_path, timeout=3, debug=True)
def mangle_path(mount_path: str, vfid: str) -> Path:
    """Map a vfolder ID onto its two-level sharded directory under *mount_path*.

    E.g. vfid ``"0123abcd..."`` becomes ``<mount_path>/01/23/abcd...``.
    """
    prefix1 = vfid[0:2]
    prefix2 = vfid[2:4]
    rest = vfid[4:]
    return Path(mount_path, prefix1, prefix2, rest)


def is_port_in_use(port: int) -> bool:
    """Return True when something accepts TCP connections on localhost:*port*."""
    with socket(AF_INET, SOCK_STREAM) as s:
        return s.connect_ex(("localhost", port)) == 0


def get_available_port(port_range: List[int]) -> int:
    """Return a random free TCP port in the inclusive range ``[port_range[0], port_range[1]]``.

    Raises:
        RuntimeError: when every port in the range is already in use.
        (BUGFIX: the original sampled with replacement in a bare ``while True``
        and spun forever in that case; it also shadowed its own parameter.)
    """
    candidates = list(range(int(port_range[0]), int(port_range[1]) + 1))
    random.shuffle(candidates)  # preserve random selection, but probe each port once
    for candidate in candidates:
        if not is_port_in_use(candidate):
            return candidate
    raise RuntimeError(
        f"No available port in range {port_range[0]}-{port_range[1]}",
    )
from unittest import mock

import pytest
from aioresponses import aioresponses

from ai.backend.client.config import API_VERSION
from ai.backend.client.session import Session
from ai.backend.client.test_utils import AsyncMock


def build_url(config, path: str):
    """Join *path* onto the configured API endpoint, normalizing slashes."""
    base_url = config.endpoint.path.rstrip("/")
    query_path = path.lstrip("/") if len(path) > 0 else ""
    path = "{0}/{1}".format(base_url, query_path)
    canonical_url = config.endpoint.with_path(path)
    return canonical_url


@pytest.fixture(scope="module", autouse=True)
def api_version():
    """Bypass real API-version negotiation for every test in this module."""
    mock_nego_func = AsyncMock()
    mock_nego_func.return_value = API_VERSION
    with mock.patch("ai.backend.client.session._negotiate_api_version", mock_nego_func):
        yield


def test_create_vfolder():
    """FileBrowser.create_or_update_browser returns the server's payload."""
    host = "local:volume1"
    vfolders = ["mydata1"]
    with Session() as session, aioresponses() as m:
        payload = {
            "host": host,
            "vfolders": vfolders,
            "status": "ok",
            "addr": "127.0.0.1",
            "container_id": "00000000",
        }
        m.post(
            build_url(session.config, "/storage/filebrowser/create"),
            status=201,
            payload=payload,
        )
        resp = session.FileBrowser.create_or_update_browser(host, vfolders)
        assert resp == payload


# BUGFIX: this function was named `destroy_browser` (no `test_` prefix), so
# pytest never collected or ran it; it also called the nonexistent
# `FileBrowser.destroy` instead of `destroy_browser`, the method the CLI uses.
def test_destroy_browser():
    """FileBrowser.destroy_browser returns the server's payload."""
    container_id = "0000000"
    with Session() as session, aioresponses() as m:
        payload = {
            "container_id": container_id,
            "status": "ok",
        }
        m.delete(
            build_url(session.config, "/storage/filebrowser/destroy"),
            status=201,
            payload=payload,
        )
        resp = session.FileBrowser.destroy_browser(container_id)
        assert resp == payload