clawdforge/clawdforge/acpx_runner.py
Kayos 940861f70a v0.2: multi-turn /sessions endpoints backed by ACPX
- Dockerfile: install acpx@latest alongside @anthropic-ai/claude-code
- compose.yml: bind /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions
- DB: additive sessions + session_events tables in store.py SCHEMA
- clawdforge/acpx_runner.py: AcpxManager + AcpxSession, bounded async pool,
  per-invocation subprocess model (acpx CLI itself owns the queue-owner
  lifecycle, so each turn = one fresh `acpx prompt -s <uuid>` call)
- server.py: POST/GET/DELETE /sessions + POST /sessions/{id}/turn + GET /sessions
- Per-app isolation: 404 (not 403) on cross-token session access — no
  existence leak across tokens
- Lifespan-managed TTL sweeper: every 60s soft-closes idle sessions past
  CLAWDFORGE_SESSION_TTL_SECS (1h default), hard-deletes ledger rows past
  CLAWDFORGE_SESSION_HARD_TTL_SECS (24h default)
- session_events audit table parallel to existing runs table
  (events: create, turn, close, sweep_close, hard_delete)
- /healthz now reports acpx_present + acpx_version + open_sessions count
- tests/test_sessions.py: 16 tests covering create/turn/close/list/isolation/
  sweep/pool-full/regression. /run regression test asserts byte-identical
  v0.1 response shape.
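
The sweeper rule in the list above can be sketched as a pure decision function. This is a minimal sketch, not the code in server.py: `classify_session` and its parameters are hypothetical, and measuring both TTLs from the last activity timestamp is an assumption. Only the two defaults (1h, 24h) and the event names `sweep_close` and `hard_delete` come from the notes above.

```python
SESSION_TTL_SECS = 3600        # 1h default, per the notes above
SESSION_HARD_TTL_SECS = 86400  # 24h default, per the notes above


def classify_session(
    now: int,
    last_activity: int,
    closed: bool,
    ttl: int = SESSION_TTL_SECS,
    hard_ttl: int = SESSION_HARD_TTL_SECS,
) -> str:
    """Return 'hard_delete', 'sweep_close', or 'keep' for one ledger row.

    Assumption: both TTLs are measured from the last activity timestamp.
    """
    idle = now - last_activity
    if idle > hard_ttl:
        return "hard_delete"      # ledger row is old enough to drop
    if not closed and idle > ttl:
        return "sweep_close"      # open but idle: soft-close it
    return "keep"
```

A sweeper loop would call this once per ledger row every 60s and act on the returned label.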

ACPX research notes (v0.6.1, openclaw/acpx):
- npm package is `acpx`, not `@openclaw/acpx`
- Sessions are scoped by (agentCommand, cwd, name?). We mint our own UUID
  as `--name` and give every session a unique cwd subdir, so the scope key
  is collision-free across apps.
- session_id source: ours. We pass --name <uuid>, ACPX records it under
  ~/.acpx/sessions/<encoded-id>.json. We never need to parse ACPX's
  acpxRecordId — our UUID is canonical.
- Subprocess lifetime: per-invocation, NOT per-session. The acpx CLI itself
  spawns/maintains a per-session "queue owner" process via local IPC; each
  `acpx prompt` call we make either elects itself owner or enqueues. The
  AcpxSession class is therefore a thin (uuid, cwd, asyncio.Lock) handle,
  not a long-lived stdio pipe. The spec's "owns one stdio pipe pair" model
  was rewritten to match reality — flagged here.
- Close semantics: soft-close via `acpx sessions close <name>`. The
  on-disk record stays (ACPX's `sessions prune` is the hard-delete path,
  not invoked from clawdforge). DELETE /sessions/<id> is documented as
  idempotent (200 with already_closed=true on second call) so SDKs can
  call close() in finally/Drop blocks safely.
- File uploads: ACPX has no file-attach ACP method exposed via the CLI.
  We prepend a [Attached files] header listing absolute paths; the agent
  uses its Read tool to open them. Same behavior as /run --files in v0.1.
- Permissions: --approve-all on the turn invocation since the container is
  unattended and callers are bearer-token-trusted. Future v0.3 may expose
  a per-session permission policy.
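
The close semantics and the per-app isolation rule can be illustrated with a toy in-memory ledger. This is a sketch under stated assumptions, not the server.py handler: `ToyLedger`, its row layout, and every response field except `already_closed` are hypothetical, and returning `already_closed=False` on the first call is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class ToyLedger:
    """Hypothetical stand-in for the sessions table: id -> {app, closed}."""

    rows: dict = field(default_factory=dict)

    def delete(self, session_id: str, app_name: str) -> tuple:
        row = self.rows.get(session_id)
        # An unknown id and an id owned by another app look identical to
        # the caller: both answer 404, so existence is never leaked.
        if row is None or row["app"] != app_name:
            return 404, {"error": "not found"}
        already_closed = row["closed"]
        row["closed"] = True  # the row stays; this is a soft close
        return 200, {"session_id": session_id, "already_closed": already_closed}
```

Because the second call still returns 200, a client can call close() unconditionally from a finally block.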

/run endpoint unchanged — backwards compat verified by
test_run_endpoint_unchanged + test_run_endpoint_unchanged_error_shape.

Spec: memory/spec-clawdforge-v0.2.md
ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md
2026-04-29 06:22:55 -07:00


"""ACPX-backed multi-turn session runner (v0.2).

ACPX (https://github.com/openclaw/acpx) is a headless ACP CLI. Its session
lifecycle is per-invocation: every command starts a fresh `acpx` subprocess
that talks to a per-session "queue owner" via local IPC. Sessions persist on
disk under ~/.acpx/sessions/<encoded-id>.json, scoped by
`(agentCommand, cwd, name?)`.

That means an `AcpxSession` here is NOT a long-lived stdio pipe; it is a
thin handle (cwd, name, asyncio.Lock) that mediates clawdforge's calls into
acpx. Each turn is one `await asyncio.create_subprocess_exec(acpx_bin, ...)`
call whose NDJSON stdout is read to completion.

We give every clawdforge session its own cwd subdirectory + UUID name so
acpx's `(agentCommand, cwd, name)` scope key is collision-free across apps.

Spec: memory/spec-clawdforge-v0.2.md
ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md
"""

from __future__ import annotations

import asyncio
import json
import os
import shutil
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

# Exit codes from acpx CLI (docs/CLI.md)
ACPX_EXIT_OK = 0
ACPX_EXIT_AGENT_ERROR = 1
ACPX_EXIT_USAGE = 2
ACPX_EXIT_TIMEOUT = 3
ACPX_EXIT_NO_SESSION = 4
ACPX_EXIT_PERMISSION_DENIED = 5


class AcpxError(Exception):
    """Base class for acpx-runner failures surfaced to the API layer."""


class AcpxNotInstalled(AcpxError):
    """ACPX_BIN not on PATH or not executable."""


class AcpxPoolFull(AcpxError):
    """The bounded live-session pool has no free slot."""


class AcpxSessionNotFound(AcpxError):
    """No live AcpxSession in memory for the given session_id."""


class AcpxSessionClosed(AcpxError):
    """The session has been closed; turn is not allowed."""


class AcpxTurnFailed(AcpxError):
    """ACPX subprocess exited non-zero or produced unparseable output."""

    def __init__(self, message: str, *, exit_code: int | None = None, stderr: str = ""):
        super().__init__(message)
        self.exit_code = exit_code
        self.stderr = stderr


@dataclass
class AcpxTurnResult:
    ok: bool
    events: list[dict]
    stop_reason: str | None
    duration_ms: int
    error: str | None = None
    stderr_tail: str = ""


@dataclass
class AcpxSession:
    """In-memory handle for one acpx session record.

    Owns: a per-session asyncio.Lock that serializes turns, the on-disk cwd
    where acpx stores its session state (because acpx scopes by cwd), and the
    UUID we passed as `--name` to acpx.
    """

    session_id: str  # also the acpx --name value
    app_name: str
    agent: str
    cwd: Path
    created_at: int
    closed: bool = False
    last_turn_at: int | None = None
    turn_count: int = 0
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)


class AcpxManager:
    """Process-pool-ish manager around acpx subprocess invocations.

    Holds the in-memory `AcpxSession` registry. The on-disk truth lives in
    two places: clawdforge's SQLite (the per-app ledger) and acpx's own
    ~/.acpx/sessions/*.json files (which acpx owns).

    The pool is bounded: `max_live_sessions` caps how many open sessions
    exist at a given moment across all apps. This protects against runaway
    callers exhausting file descriptors or filling /root/.acpx.
    """

    DEFAULT_TURN_TIMEOUT_SECS = 600  # generous; turns can be long-running

    def __init__(
        self,
        *,
        acpx_bin: str = "acpx",
        sessions_cwd_root: str | Path = "/data/acpx-cwds",
        max_live_sessions: int = 32,
        turn_timeout_secs: int | None = None,
    ):
        self.acpx_bin = acpx_bin
        self.sessions_cwd_root = Path(sessions_cwd_root)
        self.sessions_cwd_root.mkdir(parents=True, exist_ok=True)
        self.max_live_sessions = max_live_sessions
        self.turn_timeout_secs = turn_timeout_secs or self.DEFAULT_TURN_TIMEOUT_SECS
        self._sessions: dict[str, AcpxSession] = {}
        self._registry_lock = asyncio.Lock()

    # ---- public API ------------------------------------------------------

    async def create(self, *, app_name: str, agent: str = "claude") -> AcpxSession:
        """Create a new acpx-backed session.

        Mints a UUID, makes a per-session cwd, runs `acpx <agent> sessions
        new --name <uuid> --cwd <dir> --format json`, captures the
        session_ensured event, returns the in-memory handle.
        """
        # The registry lock is held for the whole call so the pool-size
        # check and the registration below are atomic.
        async with self._registry_lock:
            if self._count_open_locked() >= self.max_live_sessions:
                raise AcpxPoolFull(
                    f"max_live_sessions={self.max_live_sessions} reached"
                )
            session_id = uuid.uuid4().hex
            cwd = self.sessions_cwd_root / session_id
            cwd.mkdir(parents=True, exist_ok=True)
            # acpx sessions new --name <uuid> on this cwd. JSON output gives
            # us a `session_ensured` line we can sanity-check.
            cmd = [
                self.acpx_bin,
                "--format", "json",
                "--cwd", str(cwd),
                agent,
                "sessions", "new",
                "--name", session_id,
            ]
            try:
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
            except FileNotFoundError as e:
                shutil.rmtree(cwd, ignore_errors=True)
                raise AcpxNotInstalled(f"acpx binary not found: {self.acpx_bin}") from e
            try:
                stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=60)
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()
                shutil.rmtree(cwd, ignore_errors=True)
                raise AcpxTurnFailed("acpx sessions new timed out after 60s")
            stdout = stdout_b.decode("utf-8", "replace")
            stderr = stderr_b.decode("utf-8", "replace")
            if proc.returncode != ACPX_EXIT_OK:
                shutil.rmtree(cwd, ignore_errors=True)
                raise AcpxTurnFailed(
                    f"acpx sessions new exit {proc.returncode}",
                    exit_code=proc.returncode,
                    stderr=stderr[-2000:],
                )
            # We don't strictly need to parse the JSON (our --name is the
            # canonical id), but doing so catches weird API drift early.
            ensured = _first_json_line(stdout)
            if ensured is not None:
                action = ensured.get("action")
                if action not in ("session_ensured", "session_created"):
                    # Soft warning only; action name is implementation-defined and
                    # we have all we need (the --name we passed in).
                    pass
            sess = AcpxSession(
                session_id=session_id,
                app_name=app_name,
                agent=agent,
                cwd=cwd,
                created_at=int(time.time()),
            )
            self._sessions[session_id] = sess
            return sess

    async def turn(
        self,
        *,
        session_id: str,
        prompt: str,
        files: list[str] | None = None,
        timeout_secs: int | None = None,
    ) -> AcpxTurnResult:
        """Send a single turn to an existing session, await `end_turn`.

        Caller is responsible for ledger updates (session_events row,
        last_turn_at bump). This method only drives the subprocess and
        parses NDJSON.
        """
        sess = self._sessions.get(session_id)
        if sess is None:
            raise AcpxSessionNotFound(session_id)
        if sess.closed:
            raise AcpxSessionClosed(session_id)
        async with sess._lock:
            # Re-check under the lock: close() may have run while this turn
            # was waiting for an earlier turn to finish.
            if sess.closed:
                raise AcpxSessionClosed(session_id)
            # If files are passed, we prepend an [Attached files] header to
            # the prompt. ACPX's `claude` adapter is Claude Code's ACP
            # wrapper, which does not currently expose a file-attachment ACP
            # method that acpx surfaces on the CLI. The agent has Read tool
            # access in its session cwd, so passing absolute paths in the
            # prompt itself lets the agent open them via Read. This matches
            # the /run subprocess behavior (it just uses --files, which is
            # also a "tell the model to open these" pattern).
            full_prompt = _format_prompt_with_files(prompt, files)
            cmd = [
                self.acpx_bin,
                "--format", "json",
                "--json-strict",
                "--approve-all",  # non-interactive container; trusted callers
                "--cwd", str(sess.cwd),
                sess.agent,
                "prompt",
                "-s", sess.session_id,
                full_prompt,
            ]
            timeout = timeout_secs or self.turn_timeout_secs
            started = time.monotonic()
            try:
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
            except FileNotFoundError as e:
                raise AcpxNotInstalled(f"acpx binary not found: {self.acpx_bin}") from e
            try:
                stdout_b, stderr_b = await asyncio.wait_for(
                    proc.communicate(), timeout=timeout
                )
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()
                duration_ms = int((time.monotonic() - started) * 1000)
                return AcpxTurnResult(
                    ok=False,
                    events=[],
                    stop_reason="timeout",
                    duration_ms=duration_ms,
                    error=f"acpx turn timed out after {timeout}s",
                )
            duration_ms = int((time.monotonic() - started) * 1000)
            stdout = stdout_b.decode("utf-8", "replace")
            stderr = stderr_b.decode("utf-8", "replace")
            events = _parse_ndjson(stdout)
            stop_reason = _extract_stop_reason(events)
            if proc.returncode != ACPX_EXIT_OK:
                error_msg = _exit_code_to_error(proc.returncode)
                return AcpxTurnResult(
                    ok=False,
                    events=events,
                    stop_reason=stop_reason or "error",
                    duration_ms=duration_ms,
                    error=error_msg,
                    stderr_tail=stderr[-2000:],
                )
            sess.last_turn_at = int(time.time())
            sess.turn_count += 1
            return AcpxTurnResult(
                ok=True,
                events=events,
                stop_reason=stop_reason,
                duration_ms=duration_ms,
                stderr_tail=stderr[-1000:] if stderr else "",
            )

    async def close(self, session_id: str) -> bool:
        """Soft-close session via `acpx sessions close <name>`.

        Returns True if this call closed it, False if it was already closed
        or unknown. Idempotent.
        """
        sess = self._sessions.get(session_id)
        if sess is None:
            return False
        if sess.closed:
            return False
        async with sess._lock:
            if sess.closed:
                return False
            cmd = [
                self.acpx_bin,
                "--format", "json",
                "--cwd", str(sess.cwd),
                sess.agent,
                "sessions", "close",
                sess.session_id,
            ]
            try:
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                try:
                    await asyncio.wait_for(proc.communicate(), timeout=30)
                except asyncio.TimeoutError:
                    proc.kill()
                    await proc.wait()
            except FileNotFoundError:
                # No acpx binary: best-effort flag in memory and bail
                pass
            sess.closed = True
            # Clean up the per-session cwd. acpx keeps its own session
            # record under ~/.acpx/sessions/, the cwd was just a scope key
            # for us.
            shutil.rmtree(sess.cwd, ignore_errors=True)
            return True

    async def forget(self, session_id: str) -> None:
        """Drop the in-memory handle. Caller must have closed first."""
        async with self._registry_lock:
            self._sessions.pop(session_id, None)

    def get(self, session_id: str) -> AcpxSession | None:
        return self._sessions.get(session_id)

    def list_for_app(self, app_name: str) -> list[AcpxSession]:
        return [s for s in self._sessions.values() if s.app_name == app_name]

    def count_open(self) -> int:
        return self._count_open_locked()

    def _count_open_locked(self) -> int:
        return sum(1 for s in self._sessions.values() if not s.closed)


# ---- module-level helpers -------------------------------------------------


def _first_json_line(stdout: str) -> dict | None:
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            continue
    return None


def _parse_ndjson(stdout: str) -> list[dict]:
    """Parse acpx --format json NDJSON output.

    acpx emits raw ACP JSON-RPC messages, one per line. We collect every
    parseable dict; bad lines are skipped silently.
    """
    out: list[dict] = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(obj, dict):
            out.append(obj)
    return out


def _extract_stop_reason(events: list[dict]) -> str | None:
    """Find the result.stopReason from the JSON-RPC response envelope.

    ACP terminal envelope: `{"jsonrpc":"2.0","id":"req-N","result":{"stopReason":"end_turn"}}`
    """
    for ev in reversed(events):
        result = ev.get("result")
        if isinstance(result, dict) and "stopReason" in result:
            return result.get("stopReason")
    return None


def _exit_code_to_error(code: int) -> str:
    return {
        ACPX_EXIT_AGENT_ERROR: "agent or protocol error",
        ACPX_EXIT_USAGE: "acpx CLI usage error",
        ACPX_EXIT_TIMEOUT: "acpx-side timeout",
        ACPX_EXIT_NO_SESSION: "session not found by acpx",
        ACPX_EXIT_PERMISSION_DENIED: "permission denied during turn",
        130: "interrupted",
    }.get(code, f"acpx exit {code}")


def _format_prompt_with_files(prompt: str, files: list[str] | None) -> str:
    if not files:
        return prompt
    # Files are absolute paths inside the container; the agent has Read access
    # via its session cwd. Hint at the top of the prompt; the agent will
    # decide whether to open them.
    header_lines = ["[Attached files — open with Read tool as needed]"]
    for p in files:
        header_lines.append(f"  - {p}")
    return "\n".join(header_lines) + "\n\n" + prompt


def is_acpx_available(acpx_bin: str = "acpx") -> bool:
    """Cheap check used by /healthz + tests to skip if acpx isn't present."""
    return shutil.which(acpx_bin) is not None
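
The NDJSON handling in the helpers above can be tried without acpx installed. The following is a minimal standalone sketch that folds the parse and stop-reason steps into one function; `stop_reason_from_ndjson` and the sample lines are hypothetical (the `example/notification` method name is made up), and only the terminal envelope shape is taken from the `_extract_stop_reason` docstring.

```python
from __future__ import annotations

import json


def stop_reason_from_ndjson(stdout: str) -> str | None:
    """Return the last result.stopReason found in NDJSON text, else None."""
    reason = None
    for line in stdout.splitlines():
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue  # blank lines and non-JSON noise are skipped
        if isinstance(obj, dict) and isinstance(obj.get("result"), dict):
            # Keep the previous value when this result has no stopReason.
            reason = obj["result"].get("stopReason", reason)
    return reason


# Illustrative input only; real acpx output will contain other messages.
SAMPLE = "\n".join([
    '{"jsonrpc":"2.0","method":"example/notification","params":{}}',
    "not json at all",
    '{"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}}',
])
```

Scanning forward and keeping the last match gives the same answer as scanning the parsed events in reverse and returning the first match.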