Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ jobs:
container:
image: lmsysorg/sglang:dev
options: --gpus all --shm-size=2g --rm -v /dev/shm
env:
CUDA_VISIBLE_DEVICES: 6,7
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
69 changes: 55 additions & 14 deletions tests/ci/gpu_lock_exec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import argparse
import fcntl
import os
Expand Down Expand Up @@ -38,21 +37,46 @@ def _os_execvp(args):

def _parse_args():
p = argparse.ArgumentParser()
p.add_argument("--count", type=int, default=None, help="Acquire this many GPUs (any free ones)")
p.add_argument("--devices", type=str, default=None, help="Comma separated explicit devices to acquire (e.g. 0,1)")
p.add_argument("--total-gpus", type=int, default=8, help="Total GPUs on the machine")
p.add_argument("--timeout", type=int, default=3600, help="Seconds to wait for locks before failing")
p.add_argument(
"--target-env-name", type=str, default="CUDA_VISIBLE_DEVICES", help="Which env var to set for devices"
"--count", type=int, default=None, help="Acquire this many GPUs (any free ones)"
)
p.add_argument(
"--devices",
type=str,
default=None,
help="Comma separated explicit devices to acquire (e.g. 0,1)",
)
p.add_argument(
"--total-gpus", type=int, default=8, help="Total GPUs on the machine"
)
p.add_argument(
"--timeout",
type=int,
default=3600,
help="Seconds to wait for locks before failing",
)
p.add_argument(
"--target-env-name",
type=str,
default="CUDA_VISIBLE_DEVICES",
help="Which env var to set for devices",
)
p.add_argument(
"--lock-path-pattern",
type=str,
default="/dev/shm/custom_gpu_lock_{gpu_id}.lock",
help='Filename pattern with "{gpu_id}" placeholder',
)
p.add_argument("--print-only", action="store_true", help="Probe free devices and print them (does NOT hold locks)")
p.add_argument("cmd", nargs=argparse.REMAINDER, help="Command to exec after '--' (required unless --print-only)")
p.add_argument(
"--print-only",
action="store_true",
help="Probe free devices and print them (does NOT hold locks)",
)
p.add_argument(
"cmd",
nargs=argparse.REMAINDER,
help="Command to exec after '--' (required unless --print-only)",
)
args = p.parse_args()

if "{gpu_id}" not in args.lock_path_pattern:
Expand All @@ -79,7 +103,9 @@ def _execute_print_only(args):
pass
fd_lock.close()
except Exception as e:
print(f"Warning: Error while probing lock: {e}", file=sys.stderr, flush=True)
print(
f"Warning: Error while probing lock: {e}", file=sys.stderr, flush=True
)

print("Free GPUs:", ",".join(str(x) for x in free), flush=True)

Expand All @@ -89,7 +115,9 @@ def _try_acquire(args):
devs = _parse_devices(args.devices)
return _try_acquire_specific(devs, args.lock_path_pattern, args.timeout)
else:
return _try_acquire_count(args.count, args.total_gpus, args.lock_path_pattern, args.timeout)
return _try_acquire_count(
args.count, args.total_gpus, args.lock_path_pattern, args.timeout
)


def _try_acquire_specific(devs: List[int], path_pattern: str, timeout: int):
Expand All @@ -111,7 +139,9 @@ def _try_acquire_specific(devs: List[int], path_pattern: str, timeout: int):
fd_locks.append(fd_lock)
return fd_locks
except Exception as e:
print(f"Error during specific GPU acquisition: {e}", file=sys.stderr, flush=True)
print(
f"Error during specific GPU acquisition: {e}", file=sys.stderr, flush=True
)
for fd_lock in fd_locks:
fd_lock.close()
raise
Expand Down Expand Up @@ -143,7 +173,10 @@ def _try_acquire_count(count: int, total_gpus: int, path_pattern: str, timeout:
if time.time() - start > timeout:
raise TimeoutError(f"Timeout acquiring {count} GPUs (out of {total_gpus})")

print(f"[gpu_lock_exec] try_acquire_count failed, sleep and retry (only got: {gotten_gpu_ids})", flush=True)
print(
f"[gpu_lock_exec] try_acquire_count failed, sleep and retry (only got: {gotten_gpu_ids})",
flush=True,
)
time.sleep(SLEEP_BACKOFF * random.random())


Expand All @@ -168,7 +201,11 @@ def close(self):
try:
self.fd.close()
except Exception as e:
print(f"Warning: Failed to close file descriptor: {e}", file=sys.stderr, flush=True)
print(
f"Warning: Failed to close file descriptor: {e}",
file=sys.stderr,
flush=True,
)
self.fd = None


Expand All @@ -181,7 +218,11 @@ def _ensure_lock_files(path_pattern: str, total_gpus: int):
try:
open(p, "a").close()
except Exception as e:
print(f"Warning: Could not create lock file {p}: {e}", file=sys.stderr, flush=True)
print(
f"Warning: Could not create lock file {p}: {e}",
file=sys.stderr,
flush=True,
)


def _get_lock_path(path_pattern: str, gpu_id: int) -> str:
Expand Down