clawdforge/tests/conftest.py
Kayos 940861f70a v0.2: multi-turn /sessions endpoints backed by ACPX
- Dockerfile: install acpx@latest alongside @anthropic-ai/claude-code
- compose.yml: bind /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions
- DB: additive sessions + session_events tables in store.py SCHEMA
- clawdforge/acpx_runner.py: AcpxManager + AcpxSession, bounded async pool,
  per-invocation subprocess model (acpx CLI itself owns the queue-owner
  lifecycle, so each turn = one fresh `acpx prompt -s <uuid>` call)
- server.py: POST/GET/DELETE /sessions + POST /sessions/{id}/turn + GET /sessions
- Per-app isolation: 404 (not 403) on cross-token session access — no
  existence leak across tokens
- Lifespan-managed TTL sweeper: every 60s soft-closes idle sessions past
  CLAWDFORGE_SESSION_TTL_SECS (1h default), hard-deletes ledger rows past
  CLAWDFORGE_SESSION_HARD_TTL_SECS (24h default)
- session_events audit table parallel to existing runs table
  (events: create, turn, close, sweep_close, hard_delete)
- /healthz now reports acpx_present + acpx_version + open_sessions count
- tests/test_sessions.py: 16 tests covering create/turn/close/list/isolation/
  sweep/pool-full/regression. /run regression test asserts byte-identical
  v0.1 response shape.

ACPX research notes (v0.6.1, openclaw/acpx):
- npm package is `acpx`, not `@openclaw/acpx`
- Sessions are scoped by (agentCommand, cwd, name?). We mint our own UUID
  as `--name` and give every session a unique cwd subdir, so the scope key
  is collision-free across apps.
- session_id source: ours. We pass --name <uuid>, ACPX records it under
  ~/.acpx/sessions/<encoded-id>.json. We never need to parse ACPX's
  acpxRecordId — our UUID is canonical.
- Subprocess lifetime: per-invocation, NOT per-session. The acpx CLI itself
  spawns/maintains a per-session "queue owner" process via local IPC; each
  `acpx prompt` call we make either elects itself owner or enqueues. The
  AcpxSession class is therefore a thin (uuid, cwd, asyncio.Lock) handle,
  not a long-lived stdio pipe. The spec's "owns one stdio pipe pair" model
  was rewritten to match reality — flagged here.
- Close semantics: soft-close via `acpx sessions close <name>`. The
  on-disk record stays (ACPX's `sessions prune` is the hard-delete path,
  not invoked from clawdforge). DELETE /sessions/<id> is documented as
  idempotent (200 with already_closed=true on second call) so SDKs can
  call close() in finally/Drop blocks safely.
- File uploads: ACPX has no file-attach ACP method exposed via the CLI.
  We prepend a [Attached files] header listing absolute paths; the agent
  uses its Read tool to open them. Same behavior as /run --files in v0.1.
- Permissions: --approve-all on the turn invocation since the container is
  unattended and callers are bearer-token-trusted. Future v0.3 may expose
  a per-session permission policy.

/run endpoint unchanged — backwards compat verified by
test_run_endpoint_unchanged + test_run_endpoint_unchanged_error_shape.

Spec: memory/spec-clawdforge-v0.2.md
ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md
2026-04-29 06:22:55 -07:00

234 lines
8.1 KiB
Python

"""Shared pytest fixtures for the clawdforge server suite.
Provides:
- Per-test temp DB + acpx-cwd-root via env overrides (config.load is import-time-cached
  in server.py, so the `client` fixture drops the clawdforge modules from sys.modules,
  re-imports the server so its module-level cfg/store see the test environment, and
  then rebinds the module-level acpx_manager directly).
- A FakeAcpxManager that conforms to AcpxManager's surface but never spawns a real
  subprocess. Use it to drive the FastAPI client without an installed `acpx` binary.
- A TestClient configured with an admin token + two app tokens already minted (the
  second one exists for cross-app isolation tests), so tests can hit /sessions/*
  without going through the bootstrap dance.
"""
from __future__ import annotations
import asyncio
import os
import tempfile
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
import pytest
@dataclass
class _FakeSession:
session_id: str
app_name: str
agent: str
cwd: Path
created_at: int
closed: bool = False
last_turn_at: int | None = None
turn_count: int = 0
class FakeAcpxManager:
"""Drop-in for AcpxManager that simulates acpx-side behavior in-process.
Records calls so tests can assert on shape without checking acpx binaries.
"""
def __init__(self, *, max_live_sessions: int = 32, sessions_cwd_root: str | Path = "/tmp/fake-acpx"):
self.max_live_sessions = max_live_sessions
self.sessions_cwd_root = Path(sessions_cwd_root)
self.sessions_cwd_root.mkdir(parents=True, exist_ok=True)
self._sessions: dict[str, _FakeSession] = {}
self.acpx_bin = "fake-acpx"
# Test injectable: override the next turn() result, or raise.
self.next_turn_events: list[dict] | None = None
self.next_turn_stop_reason: str = "end_turn"
self.next_turn_ok: bool = True
self.next_turn_error: str | None = None
self.calls: list[dict] = []
async def create(self, *, app_name: str, agent: str = "claude"):
from clawdforge.acpx_runner import AcpxPoolFull
if self._count_open() >= self.max_live_sessions:
raise AcpxPoolFull(f"max_live_sessions={self.max_live_sessions} reached")
sid = uuid.uuid4().hex
cwd = self.sessions_cwd_root / sid
cwd.mkdir(parents=True, exist_ok=True)
sess = _FakeSession(
session_id=sid,
app_name=app_name,
agent=agent,
cwd=cwd,
created_at=int(time.time()),
)
self._sessions[sid] = sess
self.calls.append({"op": "create", "session_id": sid, "app_name": app_name})
return sess
async def turn(self, *, session_id: str, prompt: str, files=None, timeout_secs=None):
from clawdforge.acpx_runner import (
AcpxSessionClosed,
AcpxSessionNotFound,
AcpxTurnResult,
)
sess = self._sessions.get(session_id)
if sess is None:
raise AcpxSessionNotFound(session_id)
if sess.closed:
raise AcpxSessionClosed(session_id)
self.calls.append({"op": "turn", "session_id": session_id, "prompt": prompt, "files": files})
events = self.next_turn_events if self.next_turn_events is not None else [
{
"jsonrpc": "2.0",
"method": "session/update",
"params": {
"sessionUpdate": "agent_message_chunk",
"content": {"type": "text", "text": "hello"},
},
},
{
"jsonrpc": "2.0",
"id": "req-1",
"result": {"stopReason": self.next_turn_stop_reason},
},
]
if self.next_turn_ok:
sess.last_turn_at = int(time.time())
sess.turn_count += 1
return AcpxTurnResult(
ok=self.next_turn_ok,
events=events,
stop_reason=self.next_turn_stop_reason,
duration_ms=42,
error=self.next_turn_error,
)
async def close(self, session_id: str) -> bool:
sess = self._sessions.get(session_id)
if sess is None or sess.closed:
return False
sess.closed = True
self.calls.append({"op": "close", "session_id": session_id})
return True
async def forget(self, session_id: str) -> None:
self._sessions.pop(session_id, None)
def get(self, session_id: str):
return self._sessions.get(session_id)
def list_for_app(self, app_name: str):
return [s for s in self._sessions.values() if s.app_name == app_name]
def count_open(self) -> int:
return self._count_open()
def _count_open(self) -> int:
return sum(1 for s in self._sessions.values() if not s.closed)
@pytest.fixture
def tmp_workspace(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
    """Set every clawdforge env var pointing at a fresh tmp dir before importing the server.

    Creates the ``runs`` and ``acpx-cwds`` directories under ``tmp_path``; the
    DB file itself is not created here, only its path is exported.

    Yields:
        dict with string values under ``db_path``, ``runs_dir``, ``acpx_cwds``
        and ``admin_token``.
    """
    db_path = tmp_path / "clawdforge.db"
    runs_dir = tmp_path / "runs"
    acpx_cwds = tmp_path / "acpx-cwds"
    runs_dir.mkdir()
    acpx_cwds.mkdir()
    admin_token = "admin-test-bootstrap"

    env_overrides = {
        "DB_PATH": str(db_path),
        "RUNS_DIR": str(runs_dir),
        "ACPX_SESSIONS_CWD": str(acpx_cwds),
        "ADMIN_BOOTSTRAP_TOKEN": admin_token,
        "ALLOW_CIDRS": "127.0.0.0/8,::1/128",
        # /run path won't be hit unless asked
        "CLAUDE_BIN": "/bin/true",
        "ACPX_BIN": "/bin/true",
        "CLAWDFORGE_SESSION_TTL_SECS": "3600",
        "CLAWDFORGE_SESSION_HARD_TTL_SECS": "86400",
        "CLAWDFORGE_SWEEP_INTERVAL_SECS": "60",
        "CLAWDFORGE_MAX_LIVE_SESSIONS": "8",
    }
    # monkeypatch restores the previous environment at test teardown.
    for key, value in env_overrides.items():
        monkeypatch.setenv(key, value)

    yield {
        "db_path": str(db_path),
        "runs_dir": str(runs_dir),
        "acpx_cwds": str(acpx_cwds),
        "admin_token": admin_token,
    }
@pytest.fixture
def client(tmp_workspace, monkeypatch: pytest.MonkeyPatch):
    """FastAPI TestClient with FakeAcpxManager swapped in. Pre-mints two app tokens.

    Yields:
        ``(TestClient, dict)`` where the dict has: ``admin_token``,
        ``app_token``, ``app_name``, ``other_token``, ``other_name``,
        ``fake_acpx``, ``store``, ``cfg`` and ``server``. The ``other_*``
        entries belong to a second app used by isolation tests.
    """
    import sys

    # Force-reimport so module-level cfg/store/acpx_manager pick up our env.
    for mod_name in (
        "clawdforge.server",
        "clawdforge.acpx_runner",
        "clawdforge.config",
        "clawdforge.store",
        "clawdforge.runner",
        "clawdforge.auth",
        "clawdforge",
    ):
        sys.modules.pop(mod_name, None)

    from clawdforge import server as srv  # noqa: WPS433
    from clawdforge import auth as auth_mod

    # FastAPI's TestClient reports `request.client.host == "testclient"` which
    # fails the IP allowlist check. Force loopback for tests.
    monkeypatch.setattr(auth_mod, "_client_ip", lambda _req: "127.0.0.1")

    # Replace the real AcpxManager with our fake. The server module already
    # holds a reference; rebind it through monkeypatch so it is undone at teardown.
    fake = FakeAcpxManager(
        max_live_sessions=srv.cfg.max_live_sessions,
        sessions_cwd_root=srv.cfg.acpx_sessions_cwd,
    )
    monkeypatch.setattr(srv, "acpx_manager", fake)

    from fastapi.testclient import TestClient

    admin_token = tmp_workspace["admin_token"]
    app_name = "testapp"
    other_name = "otherapp"

    def _mint_app_token(tc, name):
        # Mint an app token via the admin endpoint and return its secret.
        resp = tc.post(
            "/admin/tokens",
            headers={"Authorization": f"Bearer {admin_token}"},
            json={"name": name, "ip_cidrs": []},
        )
        assert resp.status_code == 200, resp.text
        return resp.json()["token"]

    with TestClient(srv.app) as tc:
        app_token = _mint_app_token(tc, app_name)
        # Second app token for isolation tests.
        other_token = _mint_app_token(tc, other_name)
        yield tc, {
            "admin_token": admin_token,
            "app_token": app_token,
            "app_name": app_name,
            "other_token": other_token,
            "other_name": other_name,
            "fake_acpx": fake,
            "store": srv.store,
            "cfg": srv.cfg,
            "server": srv,
        }