llmOS-Agent / src /vm.py
tech-envision
Fix VM command execution to capture output
3645430
from __future__ import annotations
import subprocess
import asyncio
from functools import partial
from pathlib import Path
from threading import Lock
from .config import UPLOAD_DIR, VM_IMAGE, PERSIST_VMS, VM_STATE_DIR
from .utils import limit_chars
from .log import get_logger
_LOG = get_logger(__name__)
def _sanitize(name: str) -> str:
"""Return a Docker-safe name fragment."""
allowed = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.")
return "".join(c if c in allowed else "_" for c in name)
class LinuxVM:
    """Manage a lightweight Docker-based VM.

    The default image provides Python and pip so packages can be installed
    immediately. A custom image can be supplied via ``VM_IMAGE``.

    The "VM" is a Docker container named ``chat-vm-<sanitized username>``
    that runs ``sleep infinity``; commands are run inside it with
    ``docker exec``. The instance can be used as a context manager, which
    calls :meth:`start` on entry and :meth:`stop` on exit.
    """

    def __init__(
        self,
        username: str,
        image: str = VM_IMAGE,
        host_dir: str = UPLOAD_DIR,
    ) -> None:
        """Prepare (but do not start) the container for ``username``.

        Parameters
        ----------
        username:
            Owner of the VM; sanitized to derive the container name and the
            per-user state directory under ``VM_STATE_DIR``.
        image:
            Docker image used when the container has to be created.
        host_dir:
            Host directory mounted at ``/data`` inside the container. It is
            created if it does not exist, as is the state directory.
        """
        self._image = image
        self._name = f"chat-vm-{_sanitize(username)}"
        # Local flag only; it is set by start() and cleared by stop().
        self._running = False

        self._host_dir = Path(host_dir)
        self._host_dir.mkdir(parents=True, exist_ok=True)

        self._state_dir = Path(VM_STATE_DIR) / _sanitize(username)
        self._state_dir.mkdir(parents=True, exist_ok=True)

    def start(self) -> None:
        """Start the VM if it is not already running.

        An existing container with the expected name is reused (and started
        if it is stopped). Otherwise the image is pulled on a best-effort
        basis and a new detached container is created with the host
        directory mounted at ``/data`` and the state directory at ``/state``.

        Raises
        ------
        RuntimeError
            If any Docker invocation fails; the original exception is chained.
        """
        if self._running:
            return

        try:
            inspect = subprocess.run(
                ["docker", "inspect", "-f", "{{.State.Running}}", self._name],
                capture_output=True,
                text=True,
            )
            if inspect.returncode == 0:
                # The container already exists; start it only if it is stopped.
                if inspect.stdout.strip() != "true":
                    subprocess.run(
                        ["docker", "start", self._name],
                        check=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                    )
                self._running = True
                return

            # Pull failures are tolerated (check=False): the image may already
            # be available locally, in which case ``docker run`` still works.
            subprocess.run(
                ["docker", "pull", self._image],
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            # Docker only bind-mounts a host source given as an absolute path;
            # anything else is interpreted as a named volume. Resolve both
            # directories so relative configuration values still mount.
            subprocess.run(
                [
                    "docker",
                    "run",
                    "-d",
                    "--name",
                    self._name,
                    "-v",
                    f"{self._host_dir.resolve()}:/data",
                    "-v",
                    f"{self._state_dir.resolve()}:/state",
                    self._image,
                    "sleep",
                    "infinity",
                ],
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            self._running = True
        except Exception as exc:  # pragma: no cover - runtime failures
            # CalledProcessError's string form omits the captured stderr, which
            # is where Docker reports the actual reason; log it when present.
            stderr = getattr(exc, "stderr", None)
            if stderr:
                _LOG.error("Failed to start VM: %s: %s", exc, stderr.strip())
            else:
                _LOG.error("Failed to start VM: %s", exc)
            raise RuntimeError(f"Failed to start VM: {exc}") from exc

    def execute(
        self, command: str, *, timeout: int | None = 3, detach: bool = False
    ) -> str:
        """Execute a command inside the running VM.

        Parameters
        ----------
        command:
            The shell command to run inside the container.
        timeout:
            Maximum time in seconds to wait for completion. Set to ``None``
            to wait indefinitely. Ignored when ``detach`` is ``True``.
        detach:
            Run the command in the background without waiting for it to finish.

        Returns
        -------
        str
            The command's stdout, followed by its stderr when there is any,
            passed through ``limit_chars``. Timeouts and unexpected errors are
            reported as a descriptive string instead of being raised.

        Raises
        ------
        RuntimeError
            If the VM has not been started.
        """
        if not self._running:
            raise RuntimeError("VM is not running")

        cmd = ["docker", "exec", "-i"]
        if detach:
            cmd.append("-d")
        # ``bash -lc`` runs the command through a login shell.
        cmd.extend([self._name, "bash", "-lc", command])

        try:
            completed = subprocess.run(
                cmd,
                # ``-i`` attaches stdin. Without an explicit stdin the child
                # inherits this process's stdin, so a command that reads input
                # could block on (or consume) the host's stdin. Give it EOF.
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                timeout=None if detach or timeout is None else timeout,
            )
        except subprocess.TimeoutExpired as exc:
            # NOTE(review): the timeout ends the local ``docker exec`` client;
            # whether the process inside the container also stops is not
            # established here - confirm if that matters to callers.
            return f"Command timed out after {timeout}s: {exc.cmd}"
        except Exception as exc:  # pragma: no cover - unforeseen errors
            return f"Failed to execute command: {exc}"

        output = completed.stdout
        if completed.stderr:
            output = f"{output}\n{completed.stderr}" if output else completed.stderr
        return limit_chars(output)

    async def execute_async(
        self, command: str, *, timeout: int | None = 3, detach: bool = False
    ) -> str:
        """Asynchronously execute ``command`` inside the running VM.

        The blocking :meth:`execute` call is run in the event loop's default
        executor; parameters and return value are those of :meth:`execute`.
        """
        loop = asyncio.get_running_loop()
        func = partial(self.execute, command, timeout=timeout, detach=detach)
        return await loop.run_in_executor(None, func)

    def stop(self) -> None:
        """Terminate the VM if running.

        With ``PERSIST_VMS`` enabled the container is only stopped so it can
        be started again later; otherwise it is force-removed. Docker
        failures are ignored (``check=False``).
        """
        if not self._running:
            return

        if PERSIST_VMS:
            docker_cmd = ["docker", "stop", self._name]
        else:
            docker_cmd = ["docker", "rm", "-f", self._name]
        subprocess.run(
            docker_cmd,
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        self._running = False

    def __enter__(self) -> "LinuxVM":
        """Start the VM and return it for use in a ``with`` block."""
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Stop the VM when the ``with`` block exits."""
        self.stop()
class VMRegistry:
    """Manage Linux VM instances on a per-user basis.

    VMs are shared per username and reference counted: every successful
    :meth:`acquire` should be paired with one :meth:`release`. All state is
    class-level and guarded by a single lock.
    """

    # username -> VM instance currently registered for that user
    _vms: dict[str, LinuxVM] = {}
    # username -> number of outstanding acquire() calls
    _counts: dict[str, int] = {}
    _lock = Lock()

    @classmethod
    def acquire(cls, username: str) -> LinuxVM:
        """Return a running VM for ``username``, creating it if needed.

        The user's reference count is incremented. If starting the VM fails,
        the increment is rolled back (and an entry left with no references is
        removed) before the exception propagates, so a failed acquire leaves
        no reference behind for a caller that never received a VM.
        """
        with cls._lock:
            vm = cls._vms.get(username)
            if vm is None:
                vm = LinuxVM(
                    username,
                    host_dir=str(Path(UPLOAD_DIR) / username),
                )
                cls._vms[username] = vm
                cls._counts[username] = 0
            cls._counts[username] += 1
            try:
                vm.start()
            except Exception:
                # Undo the reference taken above; the caller gets no VM and
                # therefore has nothing to release.
                cls._counts[username] -= 1
                if cls._counts[username] <= 0:
                    cls._vms.pop(username, None)
                    cls._counts.pop(username, None)
                raise
            return vm

    @classmethod
    def release(cls, username: str) -> None:
        """Release one reference to ``username``'s VM and stop it if unused.

        Unknown usernames are ignored. When the last reference is released
        the entry is dropped from the registry; the VM itself is stopped
        only when ``PERSIST_VMS`` is disabled.
        """
        with cls._lock:
            vm = cls._vms.get(username)
            if vm is None:
                return
            cls._counts[username] -= 1
            if cls._counts[username] <= 0:
                if not PERSIST_VMS:
                    vm.stop()
                del cls._vms[username]
                del cls._counts[username]

    @classmethod
    def shutdown_all(cls) -> None:
        """Stop and remove all managed VMs.

        The registry is always emptied; the VMs are stopped only when
        ``PERSIST_VMS`` is disabled.
        """
        with cls._lock:
            if not PERSIST_VMS:
                for vm in cls._vms.values():
                    vm.stop()
            cls._vms.clear()
            cls._counts.clear()