diff --git a/changes/38.feature b/changes/38.feature
new file mode 100644
index 00000000..7a4f95e4
--- /dev/null
+++ b/changes/38.feature
@@ -0,0 +1 @@
+Move file browser to storage proxy.
diff --git a/config/filebrowser/branding/img/icons/logo.svg b/config/filebrowser/branding/img/icons/logo.svg
new file mode 100644
index 00000000..a31d1ebf
--- /dev/null
+++ b/config/filebrowser/branding/img/icons/logo.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/config/filebrowser/branding/img/logo.svg b/config/filebrowser/branding/img/logo.svg
new file mode 100644
index 00000000..a31d1ebf
--- /dev/null
+++ b/config/filebrowser/branding/img/logo.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/config/filebrowser/start.sh b/config/filebrowser/start.sh
new file mode 100755
index 00000000..156d7a40
--- /dev/null
+++ b/config/filebrowser/start.sh
@@ -0,0 +1,9 @@
+#!/bin/sh
+useradd -r -u $1 -g $2 work
+su work
+/bin/filebrowser config init -d /filebrowser_dir/db/filebrowser_$3.db -p $3
+/bin/filebrowser users add admin admin -d /filebrowser_dir/db/filebrowser_$3.db
+/bin/filebrowser config import /filebrowser_dir/config.json -d /filebrowser_dir/db/filebrowser_$3.db
+/bin/filebrowser -c /filebrowser_dir/settings.json -d /filebrowser_dir/db/filebrowser_$3.db -p $3
+
+exit 0
\ No newline at end of file
diff --git a/config/sample.toml b/config/sample.toml
index 0d751a71..c8e337ce 100644
--- a/config/sample.toml
+++ b/config/sample.toml
@@ -34,6 +34,50 @@ session-expire = "1d"
# user = 1000
# group = 1000
+[filebrowser]
+# A locally available container image to run as filebrowser containers
+image = "cr.backend.ai/stable/filebrowser:21.02-ubuntu20.04"
+
+# The IP address to expose the filebrowser ports
+service_ip = "127.0.0.1"
+
+# The service port number by which users will be able to connect to the filebrowser UI.
+service_port = 8080
+
+# Settings path used to configure the filebrowser application in the docker container.
+# The specific file 'settings.json' should be placed in the given path,
+# otherwise a default settings file will be generated in the following path.
+# For more configuration file info see https://filebrowser.org/installation
+settings_path = "./filebrowser_dir/"
+
+# The maximum number of CPU cores available to each filebrowser container
+max_cpu = 1
+
+# The maximum size of memory available to each filebrowser container
+max_mem = "1g"
+
+# The maximum limit of the number of filebrowser containers
+max_containers = 32
+
+# The SQLite db path and name
+db_path = './filebrowser_dir/containers.db'
+
+# The user and group IDs to be used by the filebrowser container
+user_id = 1001
+group_id = 100
+
+# The frequency to check network I/O status of containers.
+activity_check_freq = 1
+
+# The timeout to mark a container idle when there are no activities observed.
+activity_check_timeout = 30
+
+# The timeout to destroy a container after it is marked as idle.
+idle_timeout = 30
+
+# Define the port range that can be used by filebrowser containers.
+# For example, port_range = "4000-5000"
+port_range="4000-5000"
[api.client]
# Client-facing API
diff --git a/filebrowser_dir/start.sh b/filebrowser_dir/start.sh
new file mode 100755
index 00000000..236c0252
--- /dev/null
+++ b/filebrowser_dir/start.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+/bin/filebrowser config init -d /filebrowser_dir/db/filebrowser_$1.db -p $1;
+/bin/filebrowser users add admin admin -d /filebrowser_dir/db/filebrowser_$1.db;
+/bin/filebrowser config import /filebrowser_dir/config.json -d /filebrowser_dir/db/filebrowser_$1.db;
+/bin/filebrowser -c /filebrowser_dir/settings.json -d /filebrowser_dir/db/filebrowser_$1.db -p $1;
+
+exit 0;
diff --git a/setup.cfg b/setup.cfg
index d68e51e7..34a84830 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -33,6 +33,7 @@ python_requires = >=3.10
setup_requires =
setuptools>=50.3.2
install_requires =
+ aiodocker~=0.21
aiofiles>=0.7.0
types-aiofiles>=0.1.9
aiohttp~=3.8.0
@@ -45,6 +46,8 @@ install_requires =
uvloop>=0.16.0
zipstream~=1.1.4
backend.ai-common~=22.3.0
+ sqlalchemy~=1.4.32
+ types-setuptools~=57.4.11
zip_safe = false
include_package_data = true
diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py
index 4ca7c9ac..a7ec2887 100644
--- a/src/ai/backend/storage/api/manager.py
+++ b/src/ai/backend/storage/api/manager.py
@@ -22,6 +22,7 @@
from ..abc import AbstractVolume
from ..context import Context
from ..exception import InvalidSubpathError, VFolderNotFoundError
+from ..filebrowser import filebrowser
from ..types import VFolderCreationOptions
from ..utils import check_params, log_manager_api_entry
@@ -358,6 +359,7 @@ async def get_vfolder_usage(request: web.Request) -> web.Response:
},
),
) as params:
+
try:
await log_manager_api_entry(log, "get_vfolder_usage", params)
ctx: Context = request.app["ctx"]
@@ -626,6 +628,49 @@ async def delete_files(request: web.Request) -> web.Response:
)
+async def create_or_update_filebrowser(request: web.Request) -> web.Response:
+ ctx: Context = request.app["ctx"]
+ params = await request.json()
+ host: str
+ port: int
+ container_id: str
+ host, port, container_id = await filebrowser.create_or_update(
+ ctx,
+ params["host"],
+ params["vfolders"],
+ )
+ return web.json_response(
+ {
+ "addr": f"http://{host}:{port}", # TODO: SSL?
+ "container_id": f"{container_id}",
+ "status": "ok",
+ },
+ )
+
+
+async def destroy_filebrowser(request: web.Request) -> web.Response:
+ async with check_params(
+ request,
+ t.Dict(
+ {
+ t.Key("auth_token"): t.String,
+ t.Key("container_id"): t.String,
+ },
+ ),
+ ) as params:
+ await log_manager_api_entry(log, "destroy_filebrowser", params)
+ ctx: Context = request.app["ctx"]
+ try:
+ await filebrowser.destroy_container(ctx, params["container_id"])
+ return web.json_response(
+ {
+ "status": "ok",
+ },
+ )
+ except Exception:
+ raise Exception
+
+
async def init_manager_app(ctx: Context) -> web.Application:
app = web.Application(
middlewares=[
@@ -655,4 +700,10 @@ async def init_manager_app(ctx: Context) -> web.Application:
app.router.add_route("POST", "/folder/file/download", create_download_session)
app.router.add_route("POST", "/folder/file/upload", create_upload_session)
app.router.add_route("POST", "/folder/file/delete", delete_files)
+ app.router.add_route(
+ "POST",
+ "/storage/filebrowser/create",
+ create_or_update_filebrowser,
+ )
+ app.router.add_route("DELETE", "/storage/filebrowser/destroy", destroy_filebrowser)
return app
diff --git a/src/ai/backend/storage/config.py b/src/ai/backend/storage/config.py
index 1d432fcf..1c1b8341 100644
--- a/src/ai/backend/storage/config.py
+++ b/src/ai/backend/storage/config.py
@@ -1,4 +1,5 @@
import os
+from contextlib import asynccontextmanager
from pathlib import Path
import trafaret as t
@@ -13,6 +14,14 @@
_file_perm = (Path(__file__).parent / "server.py").stat()
+@asynccontextmanager
+async def closing_async(thing):
+ try:
+ yield thing
+ finally:
+ await thing.close()
+
+
local_config_iv = (
t.Dict(
{
@@ -38,6 +47,34 @@
),
},
),
+ t.Key("filebrowser"): t.Dict(
+ {
+ t.Key("image"): t.String,
+ t.Key("service_ip"): t.IP,
+ t.Key("max_cpu", default=1): t.Int[1:_max_cpu_count],
+ t.Key("max_mem", default="1g"): tx.BinarySize,
+ t.Key("max_containers", default=32): t.Int[1:],
+ t.Key("user_id", default=None): tx.UserID(
+ default_uid=_file_perm.st_uid,
+ ),
+ t.Key("group_id", default=None): tx.GroupID(
+ default_gid=_file_perm.st_gid,
+ ),
+ t.Key("settings_path", default=None): tx.Path(type="dir"),
+ t.Key("service_port", default=None): t.Int,
+ t.Key("mount_path", default=None): tx.Path(type="dir"),
+ t.Key("max_containers", default=None): t.Int,
+ t.Key("db_path", default=None): tx.Path(
+ type="file",
+ allow_nonexisting=True,
+ allow_devnull=True,
+ ),
+ t.Key("activity_check_timeout", default=30): t.Int,
+ t.Key("activity_check_freq", default=1): t.Int,
+ t.Key("idle_timeout", default=300): t.Int,
+ t.Key("port_range", default="4000-5000"): t.String,
+ },
+ ),
t.Key("logging"): logging_config_iv,
t.Key("api"): t.Dict(
{
diff --git a/src/ai/backend/storage/context.py b/src/ai/backend/storage/context.py
index 1f228af6..ec1ff8ef 100644
--- a/src/ai/backend/storage/context.py
+++ b/src/ai/backend/storage/context.py
@@ -29,6 +29,7 @@ class Context:
pid: int
etcd: AsyncEtcd
local_config: Mapping[str, Any]
+ container_id: Any
def __init__(
self,
diff --git a/src/ai/backend/storage/filebrowser/__init__.py b/src/ai/backend/storage/filebrowser/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/ai/backend/storage/filebrowser/config_browser_app.py b/src/ai/backend/storage/filebrowser/config_browser_app.py
new file mode 100644
index 00000000..c73f3af0
--- /dev/null
+++ b/src/ai/backend/storage/filebrowser/config_browser_app.py
@@ -0,0 +1,91 @@
+import json
+from pathlib import Path
+
+import aiofiles
+
+
+async def prepare_filebrowser_app_config(
+ settings_path: Path,
+ service_port: int,
+) -> None:
+ filebrowser_config = {
+ "settings": {
+ "key": (
+ "cdsnV117a7KLMN3GAbNWIB/b4w/P1zsVxmAZxAU"
+ "AKidhZG9418Fn5wjE+Zcv6C9eCEDlajcINFkvV+BFSWtVZw=="
+ ),
+ "signup": False,
+ "createUserDir": False,
+ "defaults": {
+ "scope": ".",
+ "locale": "en",
+ "viewMode": "list",
+ "singleClick": False,
+ "sorting": {
+ "by": "name",
+ "asc": False,
+ },
+ "perm": {
+ "admin": True,
+ "execute": True,
+ "create": True,
+ "rename": True,
+ "modify": True,
+ "delete": True,
+ "share": True,
+ "download": True,
+ },
+ "commands": [],
+ "hideDotfiles": False,
+ },
+ "authMethod": "noauth",
+ "branding": {
+ "name": "BACKEND.AI Web Browser",
+ "disableExternal": True,
+ "files": "/filebrowser_dir/branding/",
+ "theme": "",
+ },
+ "commands": {
+ "after_copy": [],
+ "after_delete": [],
+ "after_rename": [],
+ "after_save": [],
+ "after_upload": [],
+ "before_copy": [],
+ "before_delete": [],
+ "before_rename": [],
+ "before_save": [],
+ "before_upload": [],
+ },
+ "shell": [],
+ "rules": [],
+ },
+ "server": {
+ "root": "/data/",
+ "baseURL": "",
+ "socket": "",
+ "tlsKey": "",
+ "tlsCert": "",
+ "port": str(service_port),
+ "address": "",
+ "log": "stdout",
+ "enableThumbnails": False,
+ "resizePreview": False,
+ "enableExec": False,
+ },
+ "auther": {"recaptcha": None},
+ }
+
+ async with aiofiles.open(settings_path / "config.json", mode="w") as file:
+ await file.write(json.dumps(filebrowser_config))
+
+ filebrowser_app_settings = {
+ "port": service_port,
+ "baseURL": "",
+ "address": "",
+ "log": "stdout",
+ "database": f"/filebrowser_dir/filebrowser_{service_port}.db",
+ "root": "/data/",
+ }
+ async with aiofiles.open(settings_path / "settings.json", mode="w") as file:
+ await file.write(json.dumps(filebrowser_app_settings))
diff --git a/src/ai/backend/storage/filebrowser/database.py b/src/ai/backend/storage/filebrowser/database.py
new file mode 100644
index 00000000..b11cab6c
--- /dev/null
+++ b/src/ai/backend/storage/filebrowser/database.py
@@ -0,0 +1,70 @@
+from typing import Any
+
+import sqlalchemy as sa
+
+
+class FilebrowserTrackerDB:
+ def __init__(self, db_path: str):
+ self.meta = sa.MetaData()
+ self.containers = sa.Table(
+ "containers",
+ self.meta,
+ sa.Column("container_id", sa.String, primary_key=True),
+ sa.Column("container_name", sa.String),
+ sa.Column("service_ip", sa.String),
+ sa.Column("service_port", sa.Integer),
+ sa.Column("config", sa.Text),
+ sa.Column("status", sa.String),
+ sa.Column("timestamp", sa.String),
+ )
+ self.db_path = db_path
+ self.url = f"sqlite:///{str(db_path)}"
+ self._engine = sa.engine
+ self.engine = sa.create_engine(self.url)
+
+ insp = sa.inspect(self.engine)
+ if "containers" not in insp.get_table_names():
+ self.meta.create_all(self.engine)
+
+ async def get_all_containers(self) -> Any:
+ with self.engine.connect() as connection:
+ rows = connection.execute(self.containers.select())
+ return rows
+
+ async def get_filebrowser_by_container_id(self, container_id: str) -> Any:
+ with self.engine.connect() as connection:
+ rows = connection.execute(
+ self.containers.select().where(
+ self.containers.c.container_id == container_id,
+ ),
+ )
+ return rows
+
+ async def insert_new_container(
+ self,
+ container_id: str,
+ container_name: str,
+ service_ip: str,
+ service_port: int,
+ config: dict[str, Any],
+ status: str,
+ timestamp: str,
+ ):
+ with self.engine.connect() as connection:
+ ins = self.containers.insert().values(
+ container_id=container_id,
+ container_name=container_name,
+ service_ip=service_ip,
+ service_port=int(service_port),
+ config=str(config),
+ status=status,
+ timestamp=timestamp,
+ )
+ connection.execute(ins)
+
+ async def delete_container_record(self, container_id: str) -> None:
+ with self.engine.connect() as connection:
+ del_sql = self.containers.delete().where(
+ self.containers.c.container_id == container_id,
+ )
+ connection.execute(del_sql)
diff --git a/src/ai/backend/storage/filebrowser/filebrowser.py b/src/ai/backend/storage/filebrowser/filebrowser.py
new file mode 100644
index 00000000..083ca220
--- /dev/null
+++ b/src/ai/backend/storage/filebrowser/filebrowser.py
@@ -0,0 +1,218 @@
+from __future__ import annotations
+
+import logging
+from contextlib import asynccontextmanager
+from datetime import datetime
+from pathlib import Path
+from typing import Any, List, Mapping, Type
+from uuid import UUID
+
+import aiodocker
+import pkg_resources
+
+from ai.backend.common.logging import BraceStyleAdapter
+from ai.backend.common.validators import BinarySize
+from ai.backend.storage.abc import AbstractVolume
+from ai.backend.storage.context import Context
+from ai.backend.storage.utils import get_available_port
+from ai.backend.storage.vfs import BaseVolume
+
+from .config_browser_app import prepare_filebrowser_app_config
+from .database import FilebrowserTrackerDB
+
+BACKENDS: Mapping[str, Type[AbstractVolume]] = {
+ "vfs": BaseVolume,
+}
+
+
+log = BraceStyleAdapter(logging.getLogger(__name__))
+
+__all__ = (
+ "create_or_update",
+ "destroy_container",
+ "get_container_by_id",
+ "get_filebrowsers",
+ "get_network_stats",
+)
+
+
+@asynccontextmanager
+async def closing_async(thing):
+ try:
+ yield thing
+ finally:
+ await thing.close()
+
+
+async def create_or_update(
+ ctx: Context,
+ host: str,
+ vfolders: list[dict],
+) -> tuple[str, int, str]:
+ image = ctx.local_config["filebrowser"]["image"]
+ service_ip = ctx.local_config["filebrowser"]["service_ip"]
+ service_port = ctx.local_config["filebrowser"]["service_port"]
+ max_containers = ctx.local_config["filebrowser"]["max_containers"]
+ user_id = ctx.local_config["filebrowser"]["user_id"]
+ group_id = ctx.local_config["filebrowser"]["group_id"]
+ cpu_count = ctx.local_config["filebrowser"]["max_cpu"]
+ memory = ctx.local_config["filebrowser"]["max_mem"]
+ memory = int(BinarySize().check_and_return(memory))
+ db_path = ctx.local_config["filebrowser"]["db_path"]
+ p = Path(pkg_resources.resource_filename(__name__, ""))
+ storage_proxy_root_path_index = p.parts.index("storage-proxy")
+ settings_path = (
+ Path(*p.parts[0 : storage_proxy_root_path_index + 1]) / "config/filebrowser/"
+ )
+ _, requested_volume = host.split(":")
+ volumes = ctx.local_config["volume"]
+ for volume_name in volumes.keys():
+ if requested_volume == volume_name:
+ volume_cls: Type[AbstractVolume] = BACKENDS[
+ volumes.get(volume_name)["backend"]
+ ]
+ mount_path = Path(volumes.get(volume_name)["path"])
+ volume_obj = volume_cls(
+ local_config=ctx.local_config,
+ mount_path=mount_path,
+ fsprefix=None,
+ options={},
+ )
+ port_range = ctx.local_config["filebrowser"]["port_range"].split("-")
+ service_port = get_available_port(port_range)
+ running_docker_containers = await get_filebrowsers()
+ if len(running_docker_containers) >= max_containers:
+ print(
+ "Can't create new container. Number of containers exceed the maximum limit.",
+ )
+ return ("0", 0, "0")
+ await prepare_filebrowser_app_config(settings_path, service_port)
+ async with closing_async(aiodocker.Docker()) as docker:
+ config = {
+ "Cmd": [
+ "/filebrowser_dir/start.sh",
+ f"{user_id}",
+ f"{group_id}",
+ f"{service_port}",
+ ],
+ "ExposedPorts": {
+ f"{service_port}/tcp": {},
+ },
+ "Image": image,
+ "HostConfig": {
+ "PortBindings": {
+ f"{service_port}/tcp": [
+ {
+ "HostIp": f"{service_ip}",
+ "HostPort": f"{service_port}/tcp",
+ },
+ ],
+ },
+ "CpuCount": cpu_count,
+ "Memory": memory,
+ "Mounts": [
+ {
+ "Target": "/filebrowser_dir/",
+ "Source": f"{settings_path}",
+ "Type": "bind",
+ },
+ ],
+ },
+ }
+ for vfolder in vfolders:
+ filebrowser_mount_path = str(
+ volume_obj.mangle_vfpath(UUID(vfolder["vfid"])),
+ )
+ config["HostConfig"]["Mounts"].append(
+ {
+ "Target": f"/data/{str(vfolder['name'])}",
+ "Source": filebrowser_mount_path,
+ "Type": "bind",
+ },
+ )
+ container_name = f"ai.backend.container-filebrowser-{service_port}"
+ container = await docker.containers.create_or_replace(
+ config=config,
+ name=container_name,
+ )
+ container_id = container._id
+ await container.start()
+ tracker_db = FilebrowserTrackerDB(db_path)
+ await tracker_db.insert_new_container(
+ container_id,
+ container_name,
+ service_ip,
+ service_port,
+ config,
+ "RUNNING",
+ str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
+ )
+ return service_ip, service_port, container_id
+
+
+async def recreate_container(container_name: str, config: dict[str, Any]) -> None:
+ async with closing_async(aiodocker.Docker()) as docker:
+ try:
+ docker = aiodocker.Docker()
+ container = await docker.containers.create_or_replace(
+ config=config,
+ name=container_name,
+ )
+ await container.start()
+ except Exception as e:
+ print("Failure to recreate container ", e)
+
+
+async def destroy_container(ctx: Context, container_id: str) -> None:
+ db_path = ctx.local_config["filebrowser"]["db_path"]
+ tracker_db = FilebrowserTrackerDB(db_path)
+ async with closing_async(aiodocker.Docker()) as docker:
+ for container in await docker.containers.list():
+ if container._id == container_id:
+ try:
+ await container.stop()
+ await container.delete()
+ await tracker_db.delete_container_record(container_id)
+ except Exception as e:
+ print(f"Failure to destroy container {container_id[0:7]} ", e)
+ else:
+ break
+
+
+async def get_container_by_id(container_id: str) -> Any:
+ async with closing_async(aiodocker.Docker()) as docker:
+ container = aiodocker.docker.DockerContainers(docker).container(
+ container_id=container_id,
+ )
+ return container
+
+
+async def get_filebrowsers() -> List[str]:
+ container_list = []
+ async with closing_async(aiodocker.Docker()) as docker:
+ containers = await aiodocker.docker.DockerContainers(docker).list()
+ for container in containers:
+ stats = await container.stats(stream=False)
+ name = stats[0]["name"]
+ cnt_id = stats[0]["id"]
+ if "ai.backend.container-filebrowser" in name:
+ container_list.append(cnt_id)
+ return container_list
+
+
+async def get_network_stats(container_id: str) -> tuple[int, int]:
+ async with closing_async(aiodocker.Docker()) as docker:
+ container = aiodocker.docker.DockerContainers(docker).container(
+ container_id=container_id,
+ )
+ stats = await container.stats(stream=False)
+ return (
+ int(stats[0]["networks"]["eth0"]["rx_bytes"]),
+ int(stats[0]["networks"]["eth0"]["tx_bytes"]),
+ )
+
+
+async def _check_active_connections() -> bool:
+ if len(await get_filebrowsers()) > 0:
+ return True
+ return False
diff --git a/src/ai/backend/storage/filebrowser/monitor.py b/src/ai/backend/storage/filebrowser/monitor.py
new file mode 100644
index 00000000..62646881
--- /dev/null
+++ b/src/ai/backend/storage/filebrowser/monitor.py
@@ -0,0 +1,92 @@
+import asyncio
+import time
+
+import aiotools
+
+from ai.backend.storage.context import Context
+
+from .filebrowser import destroy_container, get_filebrowsers, get_network_stats
+
+
+async def network_monitor(
+ ctx: Context,
+ container_id: str,
+ activity_check_freq: int,
+ activity_check_timeout: int,
+) -> None:
+ start_time = time.monotonic()
+ network_window = []
+ while True:
+ current_time = time.monotonic()
+ try:
+ stats = await get_network_stats(container_id)
+ except Exception as e:
+ print("Failed to get network stats ", e)
+ break
+ network_total_transfer = stats[0] + stats[1]
+ network_window.append(network_total_transfer)
+ if current_time - start_time > activity_check_timeout:
+ network_utilization_change = network_window[-1] - network_window[0]
+ if network_utilization_change == 0:
+ start_time = current_time
+ try:
+ await destroy_container(ctx, container_id)
+ except Exception as e:
+ print(
+ f"Failure to destroy container based on networking timeout {container_id}",
+ e,
+ )
+ break
+ else:
+ network_window = []
+ start_time = current_time
+ await asyncio.sleep(activity_check_freq)
+
+
+async def idle_timeout_monitor(
+ ctx: Context,
+ container_id: str,
+ idle_timeout: int,
+) -> None:
+ start_time = time.monotonic()
+ while True:
+ current_time = time.monotonic()
+ if current_time - start_time >= idle_timeout:
+ try:
+ await destroy_container(ctx, container_id)
+ except Exception as e:
+ print(
+ f"Failure to destroy container based on Idle timeout {container_id}",
+ e,
+ )
+ break
+ await asyncio.sleep(1)
+
+
+async def keep_monitors_running(ctx: Context) -> None:
+ idle_timeout = ctx.local_config["filebrowser"]["idle_timeout"]
+ activity_check_freq = ctx.local_config["filebrowser"]["activity_check_freq"]
+ activity_check_timeout = ctx.local_config["filebrowser"]["activity_check_timeout"]
+ network_monitored_list = []
+ idle_time_monitored_list = []
+ while True:
+ browsers = await get_filebrowsers()
+ if len(browsers) > 0:
+ async with aiotools.TaskGroup() as tg:
+ for browser in browsers:
+ if browser not in network_monitored_list:
+ network_monitored_list.append(browser)
+ tg.create_task(
+ network_monitor(
+ ctx,
+ browser,
+ activity_check_freq,
+ activity_check_timeout,
+ ),
+ )
+ if (idle_timeout is not None) and (
+ browser not in idle_time_monitored_list
+ ):
+ idle_time_monitored_list.append(browser)
+ tg.create_task(idle_timeout_monitor(ctx, browser, idle_timeout))
+ await asyncio.sleep(10)
diff --git a/src/ai/backend/storage/server.py b/src/ai/backend/storage/server.py
index e91a9189..a6165ad5 100644
--- a/src/ai/backend/storage/server.py
+++ b/src/ai/backend/storage/server.py
@@ -1,4 +1,6 @@
import asyncio
+
+# from asyncio.subprocess import PIPE
import grp
import logging
import multiprocessing
@@ -12,11 +14,13 @@
import aiotools
import click
+import pkg_resources
from aiohttp import web
from setproctitle import setproctitle
from ai.backend.common import config
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
+from ai.backend.common.lock import FileLock
from ai.backend.common.logging import BraceStyleAdapter, Logger
from ai.backend.common.utils import env_info
@@ -25,6 +29,7 @@
from .api.manager import init_manager_app
from .config import local_config_iv
from .context import Context
+from .filebrowser.monitor import keep_monitors_running
log = BraceStyleAdapter(logging.getLogger("ai.backend.storage.server"))
@@ -43,6 +48,12 @@ async def server_main_logwrapper(loop, pidx, _args):
yield
+storage_proxy_server_path = Path(pkg_resources.resource_filename(__name__, ""))
+monitor_lock_path = Path(storage_proxy_server_path / "filebrowser/monitor_lock.txt")
+if not monitor_lock_path.exists():
+ file_lock = FileLock(monitor_lock_path, timeout=3, debug=True)
+
+
@aiotools.server
async def server_main(
loop: asyncio.AbstractEventLoop,
@@ -89,6 +100,13 @@ async def server_main(
manager_api_runner = web.AppRunner(manager_api_app)
await client_api_runner.setup()
await manager_api_runner.setup()
+ if not file_lock.locked:
+ try:
+ await file_lock.acquire()
+ if file_lock.locked:
+ asyncio.create_task(keep_monitors_running(ctx))
+ except Exception as e:
+ print(e)
client_service_addr = local_config["api"]["client"]["service-addr"]
manager_service_addr = local_config["api"]["manager"]["service-addr"]
client_api_site = web.TCPSite(
@@ -125,6 +143,11 @@ async def server_main(
log.info("Shutting down...")
await manager_api_runner.cleanup()
await client_api_runner.cleanup()
+ if monitor_lock_path.exists() and file_lock.locked:
+ file_lock.release()
+ monitor_lock_path.unlink()
+ if ctx.local_config["filebrowser"]["db_path"].is_file():
+ ctx.local_config["filebrowser"]["db_path"].unlink()
@click.group(invoke_without_command=True)
@@ -190,7 +213,6 @@ def main(cli_ctx, config_path, debug):
log_endpoint=log_endpoint,
)
with logger:
- setproctitle("backend.ai: storage-proxy")
log.info("Backend.AI Storage Proxy", VERSION)
log.info("Runtime: {0}", env_info())
log.info("Node ID: {0}", local_config["storage-proxy"]["node-id"])
@@ -215,6 +237,10 @@ def main(cli_ctx, config_path, debug):
if local_config["storage-proxy"]["pid-file"].is_file():
# check is_file() to prevent deleting /dev/null!
local_config["storage-proxy"]["pid-file"].unlink()
+ if monitor_lock_path.exists() and file_lock.locked:
+ file_lock.release()
+ monitor_lock_path.unlink()
+
return 0
diff --git a/src/ai/backend/storage/utils.py b/src/ai/backend/storage/utils.py
index 4d065d52..3e617f4f 100644
--- a/src/ai/backend/storage/utils.py
+++ b/src/ai/backend/storage/utils.py
@@ -1,10 +1,13 @@
import enum
import json
import logging
+import random
from contextlib import asynccontextmanager as actxmgr
from datetime import datetime
from datetime import timezone as tz
-from typing import Any, Optional, Union
+from pathlib import Path
+from socket import AF_INET, SOCK_STREAM, socket
+from typing import Any, List, Optional, Union
import trafaret as t
from aiohttp import web
@@ -127,3 +130,24 @@ async def log_manager_api_entry(
"ManagerAPI::{}()",
name.upper(),
)
+
+
+def mangle_path(mount_path: str, vfid: str) -> Path:
+ prefix1 = vfid[0:2]
+ prefix2 = vfid[2:4]
+ rest = vfid[4:]
+ return Path(mount_path, prefix1, prefix2, rest)
+
+
+def is_port_in_use(port: int) -> bool:
+ with socket(AF_INET, SOCK_STREAM) as s:
+ return s.connect_ex(("localhost", port)) == 0
+
+
+def get_available_port(port_range: List[int]) -> int:
+ port_range = list(range(int(port_range[0]), int(port_range[1]) + 1))
+ while True:
+ sample_port = random.sample(port_range, 1)[0]
+ if not is_port_in_use(int(sample_port)):
+ break
+ return sample_port