"""Shared pytest fixtures for the clawdforge server suite. Provides: - Per-test temp DB + acpx-cwd-root via env overrides (config.load is import-time-cached in server.py, so we monkeypatch the module-level cfg/store/acpx_manager directly). - A FakeAcpxManager that conforms to AcpxManager's surface but never spawns a real subprocess. Use it to drive the FastAPI client without an installed `acpx` binary. - A TestClient configured with an admin token + one app token already minted, so tests can hit /sessions/* without going through the bootstrap dance. """ from __future__ import annotations import asyncio import os import tempfile import time import uuid from dataclasses import dataclass, field from pathlib import Path import pytest @dataclass class _FakeSession: session_id: str app_name: str agent: str cwd: Path created_at: int closed: bool = False last_turn_at: int | None = None turn_count: int = 0 class FakeAcpxManager: """Drop-in for AcpxManager that simulates acpx-side behavior in-process. Records calls so tests can assert on shape without checking acpx binaries. """ def __init__(self, *, max_live_sessions: int = 32, sessions_cwd_root: str | Path = "/tmp/fake-acpx"): self.max_live_sessions = max_live_sessions self.sessions_cwd_root = Path(sessions_cwd_root) self.sessions_cwd_root.mkdir(parents=True, exist_ok=True) self._sessions: dict[str, _FakeSession] = {} self.acpx_bin = "fake-acpx" # Test injectable: override the next turn() result, or raise. self.next_turn_events: list[dict] | None = None self.next_turn_stop_reason: str = "end_turn" self.next_turn_ok: bool = True self.next_turn_error: str | None = None self.calls: list[dict] = [] async def create(self, *, app_name: str, agent: str = "claude"): from clawdforge.acpx_runner import AcpxPoolFull if self._count_open() >= self.max_live_sessions: raise AcpxPoolFull(f"max_live_sessions={self.max_live_sessions} reached") sid = uuid.uuid4().hex cwd = self.sessions_cwd_root / sid cwd.mkdir(parents=True, exist_ok=True) sess = _FakeSession( session_id=sid, app_name=app_name, agent=agent, cwd=cwd, created_at=int(time.time()), ) self._sessions[sid] = sess self.calls.append({"op": "create", "session_id": sid, "app_name": app_name}) return sess async def turn(self, *, session_id: str, prompt: str, files=None, timeout_secs=None): from clawdforge.acpx_runner import ( AcpxSessionClosed, AcpxSessionNotFound, AcpxTurnResult, ) sess = self._sessions.get(session_id) if sess is None: raise AcpxSessionNotFound(session_id) if sess.closed: raise AcpxSessionClosed(session_id) self.calls.append({"op": "turn", "session_id": session_id, "prompt": prompt, "files": files}) events = self.next_turn_events if self.next_turn_events is not None else [ { "jsonrpc": "2.0", "method": "session/update", "params": { "sessionUpdate": "agent_message_chunk", "content": {"type": "text", "text": "hello"}, }, }, { "jsonrpc": "2.0", "id": "req-1", "result": {"stopReason": self.next_turn_stop_reason}, }, ] if self.next_turn_ok: sess.last_turn_at = int(time.time()) sess.turn_count += 1 return AcpxTurnResult( ok=self.next_turn_ok, events=events, stop_reason=self.next_turn_stop_reason, duration_ms=42, error=self.next_turn_error, ) async def close(self, session_id: str) -> bool: sess = self._sessions.get(session_id) if sess is None or sess.closed: return False sess.closed = True self.calls.append({"op": "close", "session_id": session_id}) return True async def forget(self, session_id: str) -> None: self._sessions.pop(session_id, None) def get(self, session_id: str): return self._sessions.get(session_id) def list_for_app(self, app_name: str): return [s for s in self._sessions.values() if s.app_name == app_name] def count_open(self) -> int: return self._count_open() def _count_open(self) -> int: return sum(1 for s in self._sessions.values() if not s.closed) @pytest.fixture def tmp_workspace(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): """Set every clawdforge env var pointing at a fresh tmp dir before importing the server.""" db_path = tmp_path / "clawdforge.db" runs_dir = tmp_path / "runs" acpx_cwds = tmp_path / "acpx-cwds" runs_dir.mkdir() acpx_cwds.mkdir() admin_token = "admin-test-bootstrap" monkeypatch.setenv("DB_PATH", str(db_path)) monkeypatch.setenv("RUNS_DIR", str(runs_dir)) monkeypatch.setenv("ACPX_SESSIONS_CWD", str(acpx_cwds)) monkeypatch.setenv("ADMIN_BOOTSTRAP_TOKEN", admin_token) monkeypatch.setenv("ALLOW_CIDRS", "127.0.0.0/8,::1/128") monkeypatch.setenv("CLAUDE_BIN", "/bin/true") # /run path won't be hit unless asked monkeypatch.setenv("ACPX_BIN", "/bin/true") monkeypatch.setenv("CLAWDFORGE_SESSION_TTL_SECS", "3600") monkeypatch.setenv("CLAWDFORGE_SESSION_HARD_TTL_SECS", "86400") monkeypatch.setenv("CLAWDFORGE_SWEEP_INTERVAL_SECS", "60") monkeypatch.setenv("CLAWDFORGE_MAX_LIVE_SESSIONS", "8") yield { "db_path": str(db_path), "runs_dir": str(runs_dir), "acpx_cwds": str(acpx_cwds), "admin_token": admin_token, } @pytest.fixture def client(tmp_workspace, monkeypatch: pytest.MonkeyPatch): """FastAPI TestClient with FakeAcpxManager swapped in. Pre-mints one app token. Yields (TestClient, dict) where dict has: admin_token, app_token, app_name, fake_acpx, store. """ # Force-reimport so module-level cfg/store/acpx_manager pick up our env. import importlib import sys for mod_name in [ "clawdforge.server", "clawdforge.acpx_runner", "clawdforge.config", "clawdforge.store", "clawdforge.runner", "clawdforge.auth", "clawdforge", ]: sys.modules.pop(mod_name, None) from clawdforge import server as srv # noqa: WPS433 from clawdforge import auth as auth_mod # FastAPI's TestClient reports `request.client.host == "testclient"` which # fails the IP allowlist check. Force loopback for tests. monkeypatch.setattr(auth_mod, "_client_ip", lambda _req: "127.0.0.1") # Replace the real AcpxManager with our fake. The server module already # holds a reference; rebind it. fake = FakeAcpxManager( max_live_sessions=srv.cfg.max_live_sessions, sessions_cwd_root=srv.cfg.acpx_sessions_cwd, ) srv.acpx_manager = fake from fastapi.testclient import TestClient with TestClient(srv.app) as tc: # Mint an app token via admin endpoint app_name = "testapp" r = tc.post( "/admin/tokens", headers={"Authorization": f"Bearer {tmp_workspace['admin_token']}"}, json={"name": app_name, "ip_cidrs": []}, ) assert r.status_code == 200, r.text app_token = r.json()["token"] # Mint a second app token for isolation tests r2 = tc.post( "/admin/tokens", headers={"Authorization": f"Bearer {tmp_workspace['admin_token']}"}, json={"name": "otherapp", "ip_cidrs": []}, ) assert r2.status_code == 200, r2.text other_token = r2.json()["token"] yield tc, { "admin_token": tmp_workspace["admin_token"], "app_token": app_token, "app_name": app_name, "other_token": other_token, "other_name": "otherapp", "fake_acpx": fake, "store": srv.store, "cfg": srv.cfg, "server": srv, }