- Dockerfile: install acpx@latest alongside @anthropic-ai/claude-code
- compose.yml: bind /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions
- DB: additive sessions + session_events tables in store.py SCHEMA
- clawdforge/acpx_runner.py: AcpxManager + AcpxSession, bounded async pool,
per-invocation subprocess model (acpx CLI itself owns the queue-owner
lifecycle, so each turn = one fresh `acpx prompt -s <uuid>` call)
- server.py: POST/GET/DELETE /sessions + POST /sessions/{id}/turn + GET /sessions
- Per-app isolation: 404 (not 403) on cross-token session access — no
existence leak across tokens
- Lifespan-managed TTL sweeper: every 60s soft-closes idle sessions past
CLAWDFORGE_SESSION_TTL_SECS (1h default), hard-deletes ledger rows past
CLAWDFORGE_SESSION_HARD_TTL_SECS (24h default)
- session_events audit table parallel to existing runs table
(events: create, turn, close, sweep_close, hard_delete)
- /healthz now reports acpx_present + acpx_version + open_sessions count
- tests/test_sessions.py: 16 tests covering create/turn/close/list/isolation/
sweep/pool-full/regression. /run regression test asserts byte-identical
v0.1 response shape.
ACPX research notes (v0.6.1, openclaw/acpx):
- npm package is `acpx`, not `@openclaw/acpx`
- Sessions are scoped by (agentCommand, cwd, name?). We mint our own UUID
as `--name` and give every session a unique cwd subdir, so the scope key
is collision-free across apps.
- session_id source: ours. We pass --name <uuid>, ACPX records it under
~/.acpx/sessions/<encoded-id>.json. We never need to parse ACPX's
acpxRecordId — our UUID is canonical.
- Subprocess lifetime: per-invocation, NOT per-session. The acpx CLI itself
spawns/maintains a per-session "queue owner" process via local IPC; each
`acpx prompt` call we make either elects itself owner or enqueues. The
AcpxSession class is therefore a thin (uuid, cwd, asyncio.Lock) handle,
not a long-lived stdio pipe. The spec's "owns one stdio pipe pair" model
was rewritten to match reality — flagged here.
- Close semantics: soft-close via `acpx sessions close <name>`. The
on-disk record stays (ACPX's `sessions prune` is the hard-delete path,
not invoked from clawdforge). DELETE /sessions/<id> is documented as
idempotent (200 with already_closed=true on second call) so SDKs can
call close() in finally/Drop blocks safely.
- File uploads: ACPX has no file-attach ACP method exposed via the CLI.
We prepend a [Attached files] header listing absolute paths; the agent
uses its Read tool to open them. Same behavior as /run --files in v0.1.
- Permissions: --approve-all on the turn invocation since the container is
unattended and callers are bearer-token-trusted. Future v0.3 may expose
a per-session permission policy.
/run endpoint unchanged — backwards compat verified by
test_run_endpoint_unchanged + test_run_endpoint_unchanged_error_shape.
Spec: memory/spec-clawdforge-v0.2.md
ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md
234 lines
8.1 KiB
Python
234 lines
8.1 KiB
Python
"""Shared pytest fixtures for the clawdforge server suite.
|
|
|
|
Provides:
|
|
- Per-test temp DB + acpx-cwd-root via env overrides (config.load is import-time-cached
|
|
in server.py, so we monkeypatch the module-level cfg/store/acpx_manager directly).
|
|
- A FakeAcpxManager that conforms to AcpxManager's surface but never spawns a real
|
|
subprocess. Use it to drive the FastAPI client without an installed `acpx` binary.
|
|
- A TestClient configured with an admin token + one app token already minted, so
|
|
tests can hit /sessions/* without going through the bootstrap dance.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import tempfile
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
|
|
@dataclass
|
|
class _FakeSession:
|
|
session_id: str
|
|
app_name: str
|
|
agent: str
|
|
cwd: Path
|
|
created_at: int
|
|
closed: bool = False
|
|
last_turn_at: int | None = None
|
|
turn_count: int = 0
|
|
|
|
|
|
class FakeAcpxManager:
|
|
"""Drop-in for AcpxManager that simulates acpx-side behavior in-process.
|
|
|
|
Records calls so tests can assert on shape without checking acpx binaries.
|
|
"""
|
|
|
|
def __init__(self, *, max_live_sessions: int = 32, sessions_cwd_root: str | Path = "/tmp/fake-acpx"):
|
|
self.max_live_sessions = max_live_sessions
|
|
self.sessions_cwd_root = Path(sessions_cwd_root)
|
|
self.sessions_cwd_root.mkdir(parents=True, exist_ok=True)
|
|
self._sessions: dict[str, _FakeSession] = {}
|
|
self.acpx_bin = "fake-acpx"
|
|
# Test injectable: override the next turn() result, or raise.
|
|
self.next_turn_events: list[dict] | None = None
|
|
self.next_turn_stop_reason: str = "end_turn"
|
|
self.next_turn_ok: bool = True
|
|
self.next_turn_error: str | None = None
|
|
self.calls: list[dict] = []
|
|
|
|
async def create(self, *, app_name: str, agent: str = "claude"):
|
|
from clawdforge.acpx_runner import AcpxPoolFull
|
|
if self._count_open() >= self.max_live_sessions:
|
|
raise AcpxPoolFull(f"max_live_sessions={self.max_live_sessions} reached")
|
|
sid = uuid.uuid4().hex
|
|
cwd = self.sessions_cwd_root / sid
|
|
cwd.mkdir(parents=True, exist_ok=True)
|
|
sess = _FakeSession(
|
|
session_id=sid,
|
|
app_name=app_name,
|
|
agent=agent,
|
|
cwd=cwd,
|
|
created_at=int(time.time()),
|
|
)
|
|
self._sessions[sid] = sess
|
|
self.calls.append({"op": "create", "session_id": sid, "app_name": app_name})
|
|
return sess
|
|
|
|
async def turn(self, *, session_id: str, prompt: str, files=None, timeout_secs=None):
|
|
from clawdforge.acpx_runner import (
|
|
AcpxSessionClosed,
|
|
AcpxSessionNotFound,
|
|
AcpxTurnResult,
|
|
)
|
|
sess = self._sessions.get(session_id)
|
|
if sess is None:
|
|
raise AcpxSessionNotFound(session_id)
|
|
if sess.closed:
|
|
raise AcpxSessionClosed(session_id)
|
|
|
|
self.calls.append({"op": "turn", "session_id": session_id, "prompt": prompt, "files": files})
|
|
|
|
events = self.next_turn_events if self.next_turn_events is not None else [
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"method": "session/update",
|
|
"params": {
|
|
"sessionUpdate": "agent_message_chunk",
|
|
"content": {"type": "text", "text": "hello"},
|
|
},
|
|
},
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"id": "req-1",
|
|
"result": {"stopReason": self.next_turn_stop_reason},
|
|
},
|
|
]
|
|
|
|
if self.next_turn_ok:
|
|
sess.last_turn_at = int(time.time())
|
|
sess.turn_count += 1
|
|
return AcpxTurnResult(
|
|
ok=self.next_turn_ok,
|
|
events=events,
|
|
stop_reason=self.next_turn_stop_reason,
|
|
duration_ms=42,
|
|
error=self.next_turn_error,
|
|
)
|
|
|
|
async def close(self, session_id: str) -> bool:
|
|
sess = self._sessions.get(session_id)
|
|
if sess is None or sess.closed:
|
|
return False
|
|
sess.closed = True
|
|
self.calls.append({"op": "close", "session_id": session_id})
|
|
return True
|
|
|
|
async def forget(self, session_id: str) -> None:
|
|
self._sessions.pop(session_id, None)
|
|
|
|
def get(self, session_id: str):
|
|
return self._sessions.get(session_id)
|
|
|
|
def list_for_app(self, app_name: str):
|
|
return [s for s in self._sessions.values() if s.app_name == app_name]
|
|
|
|
def count_open(self) -> int:
|
|
return self._count_open()
|
|
|
|
def _count_open(self) -> int:
|
|
return sum(1 for s in self._sessions.values() if not s.closed)
|
|
|
|
|
|
@pytest.fixture
|
|
def tmp_workspace(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
|
|
"""Set every clawdforge env var pointing at a fresh tmp dir before importing the server."""
|
|
db_path = tmp_path / "clawdforge.db"
|
|
runs_dir = tmp_path / "runs"
|
|
acpx_cwds = tmp_path / "acpx-cwds"
|
|
runs_dir.mkdir()
|
|
acpx_cwds.mkdir()
|
|
admin_token = "admin-test-bootstrap"
|
|
|
|
monkeypatch.setenv("DB_PATH", str(db_path))
|
|
monkeypatch.setenv("RUNS_DIR", str(runs_dir))
|
|
monkeypatch.setenv("ACPX_SESSIONS_CWD", str(acpx_cwds))
|
|
monkeypatch.setenv("ADMIN_BOOTSTRAP_TOKEN", admin_token)
|
|
monkeypatch.setenv("ALLOW_CIDRS", "127.0.0.0/8,::1/128")
|
|
monkeypatch.setenv("CLAUDE_BIN", "/bin/true") # /run path won't be hit unless asked
|
|
monkeypatch.setenv("ACPX_BIN", "/bin/true")
|
|
monkeypatch.setenv("CLAWDFORGE_SESSION_TTL_SECS", "3600")
|
|
monkeypatch.setenv("CLAWDFORGE_SESSION_HARD_TTL_SECS", "86400")
|
|
monkeypatch.setenv("CLAWDFORGE_SWEEP_INTERVAL_SECS", "60")
|
|
monkeypatch.setenv("CLAWDFORGE_MAX_LIVE_SESSIONS", "8")
|
|
yield {
|
|
"db_path": str(db_path),
|
|
"runs_dir": str(runs_dir),
|
|
"acpx_cwds": str(acpx_cwds),
|
|
"admin_token": admin_token,
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def client(tmp_workspace, monkeypatch: pytest.MonkeyPatch):
|
|
"""FastAPI TestClient with FakeAcpxManager swapped in. Pre-mints one app token.
|
|
|
|
Yields (TestClient, dict) where dict has: admin_token, app_token, app_name, fake_acpx, store.
|
|
"""
|
|
# Force-reimport so module-level cfg/store/acpx_manager pick up our env.
|
|
import importlib
|
|
import sys
|
|
|
|
for mod_name in [
|
|
"clawdforge.server",
|
|
"clawdforge.acpx_runner",
|
|
"clawdforge.config",
|
|
"clawdforge.store",
|
|
"clawdforge.runner",
|
|
"clawdforge.auth",
|
|
"clawdforge",
|
|
]:
|
|
sys.modules.pop(mod_name, None)
|
|
|
|
from clawdforge import server as srv # noqa: WPS433
|
|
from clawdforge import auth as auth_mod
|
|
|
|
# FastAPI's TestClient reports `request.client.host == "testclient"` which
|
|
# fails the IP allowlist check. Force loopback for tests.
|
|
monkeypatch.setattr(auth_mod, "_client_ip", lambda _req: "127.0.0.1")
|
|
|
|
# Replace the real AcpxManager with our fake. The server module already
|
|
# holds a reference; rebind it.
|
|
fake = FakeAcpxManager(
|
|
max_live_sessions=srv.cfg.max_live_sessions,
|
|
sessions_cwd_root=srv.cfg.acpx_sessions_cwd,
|
|
)
|
|
srv.acpx_manager = fake
|
|
|
|
from fastapi.testclient import TestClient
|
|
with TestClient(srv.app) as tc:
|
|
# Mint an app token via admin endpoint
|
|
app_name = "testapp"
|
|
r = tc.post(
|
|
"/admin/tokens",
|
|
headers={"Authorization": f"Bearer {tmp_workspace['admin_token']}"},
|
|
json={"name": app_name, "ip_cidrs": []},
|
|
)
|
|
assert r.status_code == 200, r.text
|
|
app_token = r.json()["token"]
|
|
|
|
# Mint a second app token for isolation tests
|
|
r2 = tc.post(
|
|
"/admin/tokens",
|
|
headers={"Authorization": f"Bearer {tmp_workspace['admin_token']}"},
|
|
json={"name": "otherapp", "ip_cidrs": []},
|
|
)
|
|
assert r2.status_code == 200, r2.text
|
|
other_token = r2.json()["token"]
|
|
|
|
yield tc, {
|
|
"admin_token": tmp_workspace["admin_token"],
|
|
"app_token": app_token,
|
|
"app_name": app_name,
|
|
"other_token": other_token,
|
|
"other_name": "otherapp",
|
|
"fake_acpx": fake,
|
|
"store": srv.store,
|
|
"cfg": srv.cfg,
|
|
"server": srv,
|
|
}
|