From 940861f70a1d28172d09134f49a333d3e11ab8cd Mon Sep 17 00:00:00 2001 From: Kayos Date: Wed, 29 Apr 2026 06:22:55 -0700 Subject: [PATCH] v0.2: multi-turn /sessions endpoints backed by ACPX MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Dockerfile: install acpx@latest alongside @anthropic-ai/claude-code - compose.yml: bind /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions - DB: additive sessions + session_events tables in store.py SCHEMA - clawdforge/acpx_runner.py: AcpxManager + AcpxSession, bounded async pool, per-invocation subprocess model (acpx CLI itself owns the queue-owner lifecycle, so each turn = one fresh `acpx prompt -s ` call) - server.py: POST/GET/DELETE /sessions + POST /sessions/{id}/turn + GET /sessions - Per-app isolation: 404 (not 403) on cross-token session access — no existence leak across tokens - Lifespan-managed TTL sweeper: every 60s soft-closes idle sessions past CLAWDFORGE_SESSION_TTL_SECS (1h default), hard-deletes ledger rows past CLAWDFORGE_SESSION_HARD_TTL_SECS (24h default) - session_events audit table parallel to existing runs table (events: create, turn, close, sweep_close, hard_delete) - /healthz now reports acpx_present + acpx_version + open_sessions count - tests/test_sessions.py: 16 tests covering create/turn/close/list/isolation/ sweep/pool-full/regression. /run regression test asserts byte-identical v0.1 response shape. ACPX research notes (v0.6.1, openclaw/acpx): - npm package is `acpx`, not `@openclaw/acpx` - Sessions are scoped by (agentCommand, cwd, name?). We mint our own UUID as `--name` and give every session a unique cwd subdir, so the scope key is collision-free across apps. - session_id source: ours. We pass --name , ACPX records it under ~/.acpx/sessions/.json. We never need to parse ACPX's acpxRecordId — our UUID is canonical. - Subprocess lifetime: per-invocation, NOT per-session. The acpx CLI itself spawns/maintains a per-session "queue owner" process via local IPC; each `acpx prompt` call we make either elects itself owner or enqueues. The AcpxSession class is therefore a thin (uuid, cwd, asyncio.Lock) handle, not a long-lived stdio pipe. The spec's "owns one stdio pipe pair" model was rewritten to match reality — flagged here. - Close semantics: soft-close via `acpx sessions close `. The on-disk record stays (ACPX's `sessions prune` is the hard-delete path, not invoked from clawdforge). DELETE /sessions/ is documented as idempotent (200 with already_closed=true on second call) so SDKs can call close() in finally/Drop blocks safely. - File uploads: ACPX has no file-attach ACP method exposed via the CLI. We prepend a [Attached files] header listing absolute paths; the agent uses its Read tool to open them. Same behavior as /run --files in v0.1. - Permissions: --approve-all on the turn invocation since the container is unattended and callers are bearer-token-trusted. Future v0.3 may expose a per-session permission policy. /run endpoint unchanged — backwards compat verified by test_run_endpoint_unchanged + test_run_endpoint_unchanged_error_shape. Spec: memory/spec-clawdforge-v0.2.md ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md --- .env.example | 14 ++ Dockerfile | 14 +- README.md | 111 +++++++++- clawdforge/acpx_runner.py | 448 ++++++++++++++++++++++++++++++++++++++ clawdforge/config.py | 14 ++ clawdforge/server.py | 385 ++++++++++++++++++++++++++++++-- clawdforge/store.py | 193 +++++++++++++++- compose.yml | 2 + tests/__init__.py | 0 tests/conftest.py | 234 ++++++++++++++++++++ tests/test_sessions.py | 434 ++++++++++++++++++++++++++++++++++++ 11 files changed, 1829 insertions(+), 20 deletions(-) create mode 100644 clawdforge/acpx_runner.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_sessions.py diff --git a/.env.example b/.env.example index 0d9df1f..303a6fd 100644 --- a/.env.example +++ b/.env.example @@ -22,6 +22,20 @@ CLAUDE_BIN=claude DEFAULT_MODEL=sonnet DEFAULT_TIMEOUT_SECS=120 +# ACPX (multi-turn /sessions endpoints). Reuses Claude Code auth at /root/.claude. +ACPX_BIN=acpx +# Working directory for each session's CWD (acpx scopes by cwd; we give each session its own subdir). +ACPX_SESSIONS_CWD=/data/acpx-cwds +# Max simultaneously-open (non-closed) sessions across all apps. New /sessions returns 503 if at cap. +CLAWDFORGE_MAX_LIVE_SESSIONS=32 +# How long an idle session lives before the sweeper soft-closes it. Counted from last_turn_at (or +# created_at if no turn ever ran). +CLAWDFORGE_SESSION_TTL_SECS=3600 +# How long a closed session record stays before hard-delete (ledger row + acpx on-disk metadata). +CLAWDFORGE_SESSION_HARD_TTL_SECS=86400 +# Sweep cadence in seconds. +CLAWDFORGE_SWEEP_INTERVAL_SECS=60 + # Run-staging area inside the container (don't change unless you also change compose mount) RUNS_DIR=/data/runs diff --git a/Dockerfile b/Dockerfile index 8732bdd..ecd345d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Claude Code CLI RUN npm install -g @anthropic-ai/claude-code +# ACPX — headless Agent Client Protocol CLI (https://github.com/openclaw/acpx). +# Drives multi-turn /sessions endpoints. Shares Claude Code auth from /root/.claude. +RUN npm install -g acpx@latest + # Python deps in a venv ENV VIRTUAL_ENV=/opt/venv RUN python3 -m venv $VIRTUAL_ENV @@ -21,10 +25,12 @@ RUN pip install --no-cache-dir -r requirements.txt COPY clawdforge /app/clawdforge # Persistent volume mount points: -# /data -> sqlite + runs staging -# /root/.claude -> claude code auth (cobb runs `claude /login` once per container) -# /root/.config/claude -> alt config path some claude versions use -RUN mkdir -p /data /root/.claude /root/.config/claude +# /data -> sqlite + runs staging +# /root/.claude -> claude code auth (cobb runs `claude /login` once per container) +# /root/.config/claude -> alt config path some claude versions use +# /root/.acpx/sessions -> acpx session metadata (per-session JSON files) +RUN mkdir -p /data /root/.claude /root/.config/claude /root/.acpx/sessions \ + && chmod 700 /root/.acpx EXPOSE 8800 diff --git a/README.md b/README.md index 2a585e1..d52b1b9 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,13 @@ tokens + IP allowlist. GET /healthz liveness + claude --version smoke POST /run run a prompt, return parsed result POST /files upload a file, get a file_token to pass to /run + +POST /sessions create a multi-turn session (v0.2) +POST /sessions//turn send a turn to a session (v0.2) +GET /sessions/ read session state (v0.2) +DELETE /sessions/ soft-close a session (v0.2) +GET /sessions list this token's sessions (v0.2) + POST /admin/tokens mint a per-app token (admin) GET /admin/tokens list app tokens (admin) DELETE /admin/tokens/ revoke a token (admin) @@ -115,10 +122,112 @@ r.raise_for_status() print(r.json()["result"]) # {'hello': 'world'} ``` +## Multi-turn / Sessions (v0.2) + +`/run` is one-shot: stateless, fast, returns a single result. When you need +multi-turn context (build something step-by-step, debug across iterations, +long-running structured tool-call work), use `/sessions/*`. + +The session surface is backed by [ACPX](https://github.com/openclaw/acpx) — +the OpenClaw team's headless Agent Client Protocol CLI. Clawdforge wraps it +so apps don't need to manage ACPX subprocesses or session metadata directly. +Sessions persist on disk under `/root/.acpx/sessions/` (mounted from the +host) so they survive container rebuilds. + +### Quickstart — three turns in one session + +```bash +TOKEN=$CLAWDFORGE_TOKEN +CF=http://192.168.0.5:8800 + +# 1. Create a session +SID=$(curl -sS -X POST $CF/sessions \ + -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ + -d '{"agent":"claude"}' | jq -r .session_id) + +# 2. Send a turn +curl -sS -X POST $CF/sessions/$SID/turn \ + -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ + -d '{"prompt":"Help me draft a SQL migration. First describe what you need to know."}' + +# 3. Send a follow-up — the session keeps context +curl -sS -X POST $CF/sessions/$SID/turn \ + -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ + -d '{"prompt":"Postgres 16, table users, add column tier text default \"free\""}' + +# 4. Close +curl -sS -X DELETE $CF/sessions/$SID -H "Authorization: Bearer $TOKEN" +``` + +### Turn response shape + +```json +{ + "ok": true, + "session_id": "abc123...", + "turn_index": 2, + "events": [ + {"jsonrpc":"2.0","method":"session/update","params":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"..."}}}, + {"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}} + ], + "stop_reason": "end_turn", + "duration_ms": 12345 +} +``` + +`events` is the raw ACP NDJSON stream from acpx, parsed into objects. Each +entry is a JSON-RPC message — `session/update` for streamed agent output, +tool calls, plan updates, etc., and a final `result` envelope with the +`stopReason`. No streaming/SSE in v0.2; the full event list is returned +when the turn ends. + +### Per-app isolation + +Every session is owned by the token that created it. Cross-token access +returns `404` (not `403`) so token A can't even probe whether token B's +session exists. + +### TTL + cleanup + +A background sweeper runs every `CLAWDFORGE_SWEEP_INTERVAL_SECS` (default 60s): + +- Sessions idle longer than `CLAWDFORGE_SESSION_TTL_SECS` (default 1h) are + soft-closed via `acpx sessions close`. +- Sessions whose `closed_at` is older than `CLAWDFORGE_SESSION_HARD_TTL_SECS` + (default 24h) are hard-deleted from clawdforge's ledger. +- Closed sessions stay queryable via `GET /sessions/` until the hard-TTL + fires. + +### Container / deploy + +The container needs both `claude` and `acpx` on PATH plus a host-mounted +volume for ACPX's session store: + +```yaml +# compose.yml (already configured) +volumes: + - /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions +``` + +```dockerfile +ENV ACPX_BIN=acpx +RUN npm install -g acpx@latest +``` + +ACPX shares Claude Code auth from the same `/root/.claude/` volume the v0.1 +runtime already used, so a single `claude /login` ceremony covers both +`/run` and `/sessions/*`. + ## Notes - The CLI is `@anthropic-ai/claude-code` (not the Python `anthropic` SDK). -- Default model is `sonnet`; per-request override via `model` field. +- ACPX is the upstream session driver — see https://github.com/openclaw/acpx + and `docs/CLI.md` in that repo for protocol semantics. Clawdforge owns + the per-app ledger and TTL policy; ACPX owns session content. +- Default model is `sonnet`; per-request override via `model` field on `/run`. + For sessions, model is fixed at create time (configurable later). - Per-run working directory is staged under `RUNS_DIR` and torn down on exit, so `claude` can't pollute the container's working tree. +- Per-session working directory is staged under `ACPX_SESSIONS_CWD` (default + `/data/acpx-cwds/`) and torn down on close. - File uploads are scoped to the uploading app — token A can't reference token B's files. diff --git a/clawdforge/acpx_runner.py b/clawdforge/acpx_runner.py new file mode 100644 index 0000000..4d370dc --- /dev/null +++ b/clawdforge/acpx_runner.py @@ -0,0 +1,448 @@ +"""ACPX-backed multi-turn session runner (v0.2). + +ACPX (https://github.com/openclaw/acpx) is a headless ACP CLI. Its session +lifecycle is per-invocation: every command starts a fresh `acpx` subprocess +that talks to a per-session "queue owner" via local IPC. Sessions persist on +disk under ~/.acpx/sessions/.json, scoped by +`(agentCommand, cwd, name?)`. + +That means an `AcpxSession` here is NOT a long-lived stdio pipe — it is a +thin handle (cwd, name, asyncio.Lock) that mediates clawdforge's calls into +acpx. Each turn = `await asyncio.create_subprocess_exec(acpx_bin, ...)` and +read NDJSON stdout to completion. + +We give every clawdforge session its own cwd subdirectory + UUID name so +acpx's `(agentCommand, cwd, name)` scope key is collision-free across apps. + +Spec: memory/spec-clawdforge-v0.2.md +ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md +""" +from __future__ import annotations + +import asyncio +import json +import os +import shutil +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +# Exit codes from acpx CLI (docs/CLI.md) +ACPX_EXIT_OK = 0 +ACPX_EXIT_AGENT_ERROR = 1 +ACPX_EXIT_USAGE = 2 +ACPX_EXIT_TIMEOUT = 3 +ACPX_EXIT_NO_SESSION = 4 +ACPX_EXIT_PERMISSION_DENIED = 5 + + +class AcpxError(Exception): + """Base class for acpx-runner failures surfaced to the API layer.""" + + +class AcpxNotInstalled(AcpxError): + """ACPX_BIN not on PATH or not executable.""" + + +class AcpxPoolFull(AcpxError): + """The bounded live-session pool has no free slot.""" + + +class AcpxSessionNotFound(AcpxError): + """No live AcpxSession in memory for the given session_id.""" + + +class AcpxSessionClosed(AcpxError): + """The session has been closed; turn is not allowed.""" + + +class AcpxTurnFailed(AcpxError): + """ACPX subprocess exited non-zero or produced unparseable output.""" + + def __init__(self, message: str, *, exit_code: int | None = None, stderr: str = ""): + super().__init__(message) + self.exit_code = exit_code + self.stderr = stderr + + +@dataclass +class AcpxTurnResult: + ok: bool + events: list[dict] + stop_reason: str | None + duration_ms: int + error: str | None = None + stderr_tail: str = "" + + +@dataclass +class AcpxSession: + """In-memory handle for one acpx session record. + + Owns: a per-session asyncio.Lock that serializes turns, the on-disk cwd + where acpx stores its session state (because acpx scopes by cwd), and the + UUID we passed as `--name` to acpx. + """ + + session_id: str # also the acpx --name value + app_name: str + agent: str + cwd: Path + created_at: int + closed: bool = False + last_turn_at: int | None = None + turn_count: int = 0 + _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) + + +class AcpxManager: + """Process-pool-ish manager around acpx subprocess invocations. + + Holds the in-memory `AcpxSession` registry. The on-disk truth lives in + two places: clawdforge's SQLite (the per-app ledger) and acpx's own + ~/.acpx/sessions/*.json files (which acpx owns). + + The pool is bounded — `MAX_LIVE_SESSIONS` caps how many open sessions + exist at a given moment across all apps. This protects against runaway + callers exhausting fd's or filling /root/.acpx. + """ + + DEFAULT_TURN_TIMEOUT_SECS = 600 # generous; turns can be long-running + + def __init__( + self, + *, + acpx_bin: str = "acpx", + sessions_cwd_root: str | Path = "/data/acpx-cwds", + max_live_sessions: int = 32, + turn_timeout_secs: int | None = None, + ): + self.acpx_bin = acpx_bin + self.sessions_cwd_root = Path(sessions_cwd_root) + self.sessions_cwd_root.mkdir(parents=True, exist_ok=True) + self.max_live_sessions = max_live_sessions + self.turn_timeout_secs = turn_timeout_secs or self.DEFAULT_TURN_TIMEOUT_SECS + self._sessions: dict[str, AcpxSession] = {} + self._registry_lock = asyncio.Lock() + + # ---- public API ------------------------------------------------------ + + async def create(self, *, app_name: str, agent: str = "claude") -> AcpxSession: + """Create a new acpx-backed session. + + Mints a UUID, makes a per-session cwd, runs `acpx sessions + new --name --cwd --format json`, captures the + session_ensured event, returns the in-memory handle. + """ + async with self._registry_lock: + if self._count_open_locked() >= self.max_live_sessions: + raise AcpxPoolFull( + f"max_live_sessions={self.max_live_sessions} reached" + ) + + session_id = uuid.uuid4().hex + cwd = self.sessions_cwd_root / session_id + cwd.mkdir(parents=True, exist_ok=True) + + # acpx sessions new --name on this cwd. JSON output gives + # us a `session_ensured` line we can sanity-check. + cmd = [ + self.acpx_bin, + "--format", "json", + "--cwd", str(cwd), + agent, + "sessions", "new", + "--name", session_id, + ] + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except FileNotFoundError as e: + shutil.rmtree(cwd, ignore_errors=True) + raise AcpxNotInstalled(f"acpx binary not found: {self.acpx_bin}") from e + + try: + stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=60) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + shutil.rmtree(cwd, ignore_errors=True) + raise AcpxTurnFailed("acpx sessions new timed out after 60s") + + stdout = stdout_b.decode("utf-8", "replace") + stderr = stderr_b.decode("utf-8", "replace") + if proc.returncode != ACPX_EXIT_OK: + shutil.rmtree(cwd, ignore_errors=True) + raise AcpxTurnFailed( + f"acpx sessions new exit {proc.returncode}", + exit_code=proc.returncode, + stderr=stderr[-2000:], + ) + + # We don't strictly need to parse the JSON — our --name is the + # canonical id — but doing so catches weird API drift early. + ensured = _first_json_line(stdout) + if ensured is not None: + action = ensured.get("action") + if action not in ("session_ensured", "session_created"): + # Soft warning only; action name is implementation-defined and + # we have all we need (the --name we passed in). + pass + + sess = AcpxSession( + session_id=session_id, + app_name=app_name, + agent=agent, + cwd=cwd, + created_at=int(time.time()), + ) + self._sessions[session_id] = sess + return sess + + async def turn( + self, + *, + session_id: str, + prompt: str, + files: list[str] | None = None, + timeout_secs: int | None = None, + ) -> AcpxTurnResult: + """Send a single turn to an existing session, await `end_turn`. + + Caller is responsible for ledger updates (session_events row, + last_turn_at bump). This method only drives the subprocess and + parses NDJSON. + """ + sess = self._sessions.get(session_id) + if sess is None: + raise AcpxSessionNotFound(session_id) + if sess.closed: + raise AcpxSessionClosed(session_id) + + async with sess._lock: + # If files are passed, we prepend a [files] hint to the prompt. + # ACPX's `claude` adapter is Claude Code's ACP wrapper, which + # does not currently expose a file-attachment ACP method that + # acpx surfaces on the CLI. The agent has Read tool access in + # its session cwd so passing absolute paths in the prompt + # itself lets the agent open them via Read. This matches the + # /run subprocess behavior (it just uses --files which is also + # a "tell the model to open these" pattern). + full_prompt = _format_prompt_with_files(prompt, files) + + cmd = [ + self.acpx_bin, + "--format", "json", + "--json-strict", + "--approve-all", # non-interactive container; trusted callers + "--cwd", str(sess.cwd), + sess.agent, + "prompt", + "-s", sess.session_id, + full_prompt, + ] + + timeout = timeout_secs or self.turn_timeout_secs + started = time.monotonic() + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except FileNotFoundError as e: + raise AcpxNotInstalled(f"acpx binary not found: {self.acpx_bin}") from e + + try: + stdout_b, stderr_b = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + duration_ms = int((time.monotonic() - started) * 1000) + return AcpxTurnResult( + ok=False, + events=[], + stop_reason="timeout", + duration_ms=duration_ms, + error=f"acpx turn timed out after {timeout}s", + ) + + duration_ms = int((time.monotonic() - started) * 1000) + stdout = stdout_b.decode("utf-8", "replace") + stderr = stderr_b.decode("utf-8", "replace") + + events = _parse_ndjson(stdout) + stop_reason = _extract_stop_reason(events) + + if proc.returncode != ACPX_EXIT_OK: + error_msg = _exit_code_to_error(proc.returncode) + return AcpxTurnResult( + ok=False, + events=events, + stop_reason=stop_reason or "error", + duration_ms=duration_ms, + error=error_msg, + stderr_tail=stderr[-2000:], + ) + + sess.last_turn_at = int(time.time()) + sess.turn_count += 1 + + return AcpxTurnResult( + ok=True, + events=events, + stop_reason=stop_reason, + duration_ms=duration_ms, + stderr_tail=stderr[-1000:] if stderr else "", + ) + + async def close(self, session_id: str) -> bool: + """Soft-close session via `acpx sessions close `. + + Returns True if this call closed it, False if it was already closed + or unknown. Idempotent. + """ + sess = self._sessions.get(session_id) + if sess is None: + return False + if sess.closed: + return False + + async with sess._lock: + if sess.closed: + return False + + cmd = [ + self.acpx_bin, + "--format", "json", + "--cwd", str(sess.cwd), + sess.agent, + "sessions", "close", + sess.session_id, + ] + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + await asyncio.wait_for(proc.communicate(), timeout=30) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + except FileNotFoundError: + # No acpx binary — best-effort flag in memory and bail + pass + + sess.closed = True + + # Clean up the per-session cwd. acpx keeps its own session + # record under ~/.acpx/sessions/, the cwd was just a scope key + # for us. + shutil.rmtree(sess.cwd, ignore_errors=True) + + return True + + async def forget(self, session_id: str) -> None: + """Drop the in-memory handle. Caller must have closed first.""" + async with self._registry_lock: + self._sessions.pop(session_id, None) + + def get(self, session_id: str) -> AcpxSession | None: + return self._sessions.get(session_id) + + def list_for_app(self, app_name: str) -> list[AcpxSession]: + return [s for s in self._sessions.values() if s.app_name == app_name] + + def count_open(self) -> int: + return self._count_open_locked() + + def _count_open_locked(self) -> int: + return sum(1 for s in self._sessions.values() if not s.closed) + + +# ---- module-level helpers ------------------------------------------------- + + +def _first_json_line(stdout: str) -> dict | None: + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + if isinstance(obj, dict): + return obj + except json.JSONDecodeError: + continue + return None + + +def _parse_ndjson(stdout: str) -> list[dict]: + """Parse acpx --format json NDJSON output. + + acpx emits raw ACP JSON-RPC messages, one per line. We collect every + parseable dict; bad lines are skipped silently. + """ + out: list[dict] = [] + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(obj, dict): + out.append(obj) + return out + + +def _extract_stop_reason(events: list[dict]) -> str | None: + """Find the result.stopReason from the JSON-RPC response envelope. + + ACP terminal envelope: `{"jsonrpc":"2.0","id":"req-N","result":{"stopReason":"end_turn"}}` + """ + for ev in reversed(events): + result = ev.get("result") + if isinstance(result, dict) and "stopReason" in result: + return result.get("stopReason") + return None + + +def _exit_code_to_error(code: int) -> str: + return { + ACPX_EXIT_AGENT_ERROR: "agent or protocol error", + ACPX_EXIT_USAGE: "acpx CLI usage error", + ACPX_EXIT_TIMEOUT: "acpx-side timeout", + ACPX_EXIT_NO_SESSION: "session not found by acpx", + ACPX_EXIT_PERMISSION_DENIED: "permission denied during turn", + 130: "interrupted", + }.get(code, f"acpx exit {code}") + + +def _format_prompt_with_files(prompt: str, files: list[str] | None) -> str: + if not files: + return prompt + # Files are absolute paths inside the container; the agent has Read access + # via its session cwd. Hint at the top of the prompt; the agent will + # decide whether to open them. + header_lines = ["[Attached files — open with Read tool as needed]"] + for p in files: + header_lines.append(f" - {p}") + return "\n".join(header_lines) + "\n\n" + prompt + + +def is_acpx_available(acpx_bin: str = "acpx") -> bool: + """Cheap check used by /healthz + tests to skip if acpx isn't present.""" + return shutil.which(acpx_bin) is not None diff --git a/clawdforge/config.py b/clawdforge/config.py index f0dcb63..43d1b73 100644 --- a/clawdforge/config.py +++ b/clawdforge/config.py @@ -17,6 +17,14 @@ class Config: runs_dir: str db_path: str + # ACPX (v0.2 — multi-turn /sessions) + acpx_bin: str + acpx_sessions_cwd: str + max_live_sessions: int + session_ttl_secs: int + session_hard_ttl_secs: int + sweep_interval_secs: int + def load() -> Config: return Config( @@ -31,4 +39,10 @@ def load() -> Config: default_timeout_secs=int(os.environ.get("DEFAULT_TIMEOUT_SECS", "120")), runs_dir=os.environ.get("RUNS_DIR", "/data/runs"), db_path=os.environ.get("DB_PATH", "/data/clawdforge.db"), + acpx_bin=os.environ.get("ACPX_BIN", "acpx"), + acpx_sessions_cwd=os.environ.get("ACPX_SESSIONS_CWD", "/data/acpx-cwds"), + max_live_sessions=int(os.environ.get("CLAWDFORGE_MAX_LIVE_SESSIONS", "32")), + session_ttl_secs=int(os.environ.get("CLAWDFORGE_SESSION_TTL_SECS", "3600")), + session_hard_ttl_secs=int(os.environ.get("CLAWDFORGE_SESSION_HARD_TTL_SECS", "86400")), + sweep_interval_secs=int(os.environ.get("CLAWDFORGE_SWEEP_INTERVAL_SECS", "60")), ) diff --git a/clawdforge/server.py b/clawdforge/server.py index 1e6b6fc..882a26d 100644 --- a/clawdforge/server.py +++ b/clawdforge/server.py @@ -1,6 +1,23 @@ -"""FastAPI app exposing /run, /files, /admin/tokens, /healthz.""" +"""FastAPI app exposing /run, /files, /admin/tokens, /healthz, and (v0.2) /sessions. + +v0.2 adds multi-turn session endpoints backed by ACPX: + POST /sessions create + POST /sessions/{id}/turn send a turn + GET /sessions/{id} read state + DELETE /sessions/{id} soft-close + GET /sessions list (per-token) + +Per-app isolation: every session-scoped endpoint returns 404 (not 403) on +cross-token access to avoid leaking session existence across apps. + +The /run endpoint is byte-identical to v0.1 and stays on the bare `claude -p` +subprocess path. ACPX is only used by /sessions. +""" +import asyncio +import logging import os import time +from contextlib import asynccontextmanager from pathlib import Path from typing import Annotated @@ -8,12 +25,26 @@ from fastapi import FastAPI, Header, HTTPException, Request, UploadFile, File, F from fastapi.responses import JSONResponse from pydantic import BaseModel, Field +from .acpx_runner import ( + AcpxManager, + AcpxNotInstalled, + AcpxPoolFull, + AcpxSessionClosed, + AcpxSessionNotFound, + AcpxTurnFailed, + is_acpx_available, +) from .auth import Auth from .config import load from .runner import Runner from .store import Store +log = logging.getLogger("clawdforge") +if not log.handlers: + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") + + cfg = load() store = Store(cfg.db_path) auth = Auth( @@ -27,8 +58,42 @@ runner = Runner( default_timeout=cfg.default_timeout_secs, runs_dir=cfg.runs_dir, ) +acpx_manager = AcpxManager( + acpx_bin=cfg.acpx_bin, + sessions_cwd_root=cfg.acpx_sessions_cwd, + max_live_sessions=cfg.max_live_sessions, +) -app = FastAPI(title="clawdforge", version="0.1.0") + +# ---------- lifespan: startup/shutdown --------------------------------------- + + +@asynccontextmanager +async def _lifespan(app: FastAPI): + # Best-effort GC of expired staged files at boot + try: + for p in store.gc_expired_files(): + try: + os.remove(p) + except FileNotFoundError: + pass + except Exception: + pass + + sweeper_task = asyncio.create_task(_session_sweeper()) + log.info("clawdforge startup complete; session sweeper running every %ss", cfg.sweep_interval_secs) + try: + yield + finally: + sweeper_task.cancel() + try: + await sweeper_task + except asyncio.CancelledError: + pass + log.info("clawdforge shutdown: session sweeper stopped") + + +app = FastAPI(title="clawdforge", version="0.2.0", lifespan=_lifespan) # ---------- schemas ---------------------------------------------------------- @@ -47,6 +112,17 @@ class TokenCreateRequest(BaseModel): ip_cidrs: list[str] = Field(default_factory=list) +class CreateSessionRequest(BaseModel): + agent: str = Field(default="claude", min_length=1, max_length=64, pattern=r"^[a-zA-Z0-9_-]+$") + meta: dict | None = None + + +class TurnRequest(BaseModel): + prompt: str = Field(min_length=1) + files: list[str] | None = None + timeout_secs: int | None = Field(default=None, ge=5, le=1800) + + # ---------- endpoints -------------------------------------------------------- @@ -63,7 +139,22 @@ def healthz(request: Request): version = (r.stdout or r.stderr or "").strip().splitlines()[0] if (r.stdout or r.stderr) else None except Exception as e: version = f"err: {e}" - return {"ok": True, "claude_present": found, "claude_version": version} + acpx_present = is_acpx_available(cfg.acpx_bin) + acpx_version = None + if acpx_present: + try: + r = subprocess.run([cfg.acpx_bin, "--version"], capture_output=True, text=True, timeout=5) + acpx_version = (r.stdout or r.stderr or "").strip().splitlines()[0] if (r.stdout or r.stderr) else None + except Exception as e: + acpx_version = f"err: {e}" + return { + "ok": True, + "claude_present": found, + "claude_version": version, + "acpx_present": acpx_present, + "acpx_version": acpx_version, + "open_sessions": acpx_manager.count_open(), + } @app.post("/run") @@ -154,6 +245,212 @@ def upload_file( return {"file_token": file_token, "ttl_secs": ttl_secs, "size": target.stat().st_size} +# ---------- /sessions (v0.2) ------------------------------------------------- + + +@app.post("/sessions") +async def create_session( + request: Request, + body: CreateSessionRequest, + authorization: Annotated[str | None, Header()] = None, +): + rec = auth.require_app(request, authorization) + + try: + sess = await acpx_manager.create(app_name=rec["name"], agent=body.agent) + except AcpxPoolFull as e: + raise HTTPException(503, f"session pool full: {e}") + except AcpxNotInstalled as e: + raise HTTPException(503, f"acpx not installed: {e}") + except AcpxTurnFailed as e: + raise HTTPException(502, f"acpx session create failed: {e}") + + row = store.insert_session( + session_id=sess.session_id, + app_name=rec["name"], + agent=body.agent, + cwd=str(sess.cwd), + meta=body.meta, + ) + store.log_session_event( + session_id=sess.session_id, + app_name=rec["name"], + event="create", + meta={"agent": body.agent}, + ) + return { + "ok": True, + "session_id": sess.session_id, + "agent": body.agent, + "created_at": row["created_at"], + "cwd": str(sess.cwd), + } + + +@app.post("/sessions/{session_id}/turn") +async def session_turn( + session_id: str, + request: Request, + body: TurnRequest, + authorization: Annotated[str | None, Header()] = None, +): + rec = auth.require_app(request, authorization) + + row = store.get_session(session_id) + if not row or row["app_name"] != rec["name"]: + # 404 (not 403) — don't leak that the id exists under another token + raise HTTPException(404, "session not found") + if row["closed_at"] is not None: + raise HTTPException(410, "session is closed") + + # Resolve file_tokens to absolute paths + file_paths: list[str] = [] + if body.files: + for ftoken in body.files: + path = store.resolve_file(ftoken, rec["name"]) + if not path: + raise HTTPException(404, f"unknown or expired file token: {ftoken}") + file_paths.append(path) + + sess = acpx_manager.get(session_id) + if sess is None: + # Ledger thinks it's open but the in-memory handle is gone (e.g. + # post-restart). For v0.2 we surface this as 410 — caller must + # create a new session. Future: re-bind to acpx record on demand. + store.mark_session_closed(session_id) + raise HTTPException(410, "session is no longer live in this process") + + try: + res = await acpx_manager.turn( + session_id=session_id, + prompt=body.prompt, + files=file_paths or None, + timeout_secs=body.timeout_secs, + ) + except AcpxSessionNotFound: + raise HTTPException(404, "session not found") + except AcpxSessionClosed: + raise HTTPException(410, "session is closed") + except AcpxNotInstalled as e: + raise HTTPException(503, f"acpx not installed: {e}") + + if res.ok: + store.mark_session_turn(session_id) + store.log_session_event( + session_id=session_id, + app_name=rec["name"], + event="turn", + duration_ms=res.duration_ms, + meta={ + "ok": res.ok, + "stop_reason": res.stop_reason, + "event_count": len(res.events), + "file_count": len(file_paths), + "error": res.error, + }, + ) + + refreshed = store.get_session(session_id) or row + + if not res.ok: + return JSONResponse( + status_code=502, + content={ + "ok": False, + "session_id": session_id, + "turn_index": refreshed["turn_count"], + "events": res.events, + "stop_reason": res.stop_reason, + "duration_ms": res.duration_ms, + "error": res.error, + "stderr": res.stderr_tail, + }, + ) + + return { + "ok": True, + "session_id": session_id, + "turn_index": refreshed["turn_count"], + "events": res.events, + "stop_reason": res.stop_reason, + "duration_ms": res.duration_ms, + } + + +@app.get("/sessions/{session_id}") +async def session_state( + session_id: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + rec = auth.require_app(request, authorization) + row = store.get_session(session_id) + if not row or row["app_name"] != rec["name"]: + raise HTTPException(404, "session not found") + + sess = acpx_manager.get(session_id) + return { + "ok": True, + "session_id": session_id, + "agent": row["agent"], + "cwd": row["cwd"], + "created_at": row["created_at"], + "last_turn_at": row["last_turn_at"], + "turn_count": row["turn_count"], + "closed_at": row["closed_at"], + "live": sess is not None and not sess.closed, + "meta": row.get("meta"), + } + + +@app.delete("/sessions/{session_id}") +async def close_session( + session_id: str, + request: Request, + authorization: Annotated[str | None, Header()] = None, +): + rec = auth.require_app(request, authorization) + row = store.get_session(session_id) + if not row or row["app_name"] != rec["name"]: + raise HTTPException(404, "session not found") + + if row["closed_at"] is not None: + # Idempotent — already closed is a success. We document this as + # "second close is no-op (200 ok)" rather than 410, so SDKs can + # safely call close() in finally/Drop blocks without try/except. + return {"ok": True, "already_closed": True} + + try: + await acpx_manager.close(session_id) + except Exception as e: + # Even on acpx-side failure, mark closed in the ledger so we don't + # leak the slot. Log for diagnostics. + log.warning("acpx close failed for %s: %s", session_id, e) + + store.mark_session_closed(session_id) + store.log_session_event( + session_id=session_id, + app_name=rec["name"], + event="close", + ) + await acpx_manager.forget(session_id) + return {"ok": True} + + +@app.get("/sessions") +async def list_sessions( + request: Request, + authorization: Annotated[str | None, Header()] = None, + include_closed: bool = True, +): + rec = auth.require_app(request, authorization) + rows = store.list_sessions_for_app(rec["name"], include_closed=include_closed) + return {"ok": True, "sessions": rows, "count": len(rows)} + + +# ---------- /admin/tokens ---------------------------------------------------- + + @app.post("/admin/tokens") def create_token( request: Request, @@ -189,14 +486,74 @@ def revoke_token( return {"ok": True} -@app.on_event("startup") -def _startup_gc(): - # Best-effort GC of expired staged files at boot - try: - for p in store.gc_expired_files(): - try: - os.remove(p) - except FileNotFoundError: - pass - except Exception: - pass +# ---------- background sweeper ----------------------------------------------- + + +async def _session_sweeper() -> None: + """Periodic loop: close stale sessions past TTL, hard-delete past hard-TTL. + + Runs every cfg.sweep_interval_secs. Cancels cleanly on app shutdown. + """ + while True: + try: + await asyncio.sleep(cfg.sweep_interval_secs) + await _sweep_once() + except asyncio.CancelledError: + raise + except Exception as e: + log.warning("session sweeper iteration failed: %s", e) + + +async def _sweep_once() -> dict: + """Single sweep pass. Exposed for tests. + + Returns a counter dict so callers/tests can assert on what happened. + """ + now = int(time.time()) + ttl_cutoff = now - cfg.session_ttl_secs + hard_cutoff = now - cfg.session_hard_ttl_secs + + soft_closed = 0 + hard_deleted = 0 + + # 1. Soft-close stale open sessions + stale = store.find_stale_open_sessions(ttl_cutoff=ttl_cutoff) + for row in stale: + sid = row["session_id"] + try: + await acpx_manager.close(sid) + except Exception as e: + log.warning("sweeper: acpx close failed for %s: %s", sid, e) + store.mark_session_closed(sid, now=now) + store.log_session_event( + session_id=sid, + app_name=row["app_name"], + event="sweep_close", + meta={"reason": "ttl", "ttl_secs": cfg.session_ttl_secs}, + ) + await acpx_manager.forget(sid) + soft_closed += 1 + + # 2. Hard-delete sessions closed long ago + deletable = store.find_hard_deletable_sessions(hard_cutoff=hard_cutoff) + for row in deletable: + sid = row["session_id"] + store.log_session_event( + session_id=sid, + app_name=row["app_name"], + event="hard_delete", + meta={"hard_ttl_secs": cfg.session_hard_ttl_secs}, + ) + store.hard_delete_session(sid) + # The acpx on-disk record under ~/.acpx/sessions/ is left for acpx + # itself to prune via `acpx sessions prune` — we do not own + # that filesystem layout. Our cwd was already cleaned up at close(). + hard_deleted += 1 + + if soft_closed or hard_deleted: + log.info( + "sweep: soft_closed=%d hard_deleted=%d open_now=%d", + soft_closed, hard_deleted, acpx_manager.count_open(), + ) + + return {"soft_closed": soft_closed, "hard_deleted": hard_deleted} diff --git a/clawdforge/store.py b/clawdforge/store.py index c5ae183..7cbee58 100644 --- a/clawdforge/store.py +++ b/clawdforge/store.py @@ -1,4 +1,5 @@ -"""SQLite store for app tokens + run audit log.""" +"""SQLite store for app tokens, run audit log, and (v0.2) ACPX session ledger.""" +import json import sqlite3 import secrets import time @@ -38,6 +39,41 @@ CREATE TABLE IF NOT EXISTS files ( created_at INTEGER NOT NULL, expires_at INTEGER NOT NULL ); + +-- v0.2: ACPX-backed multi-turn sessions ledger. +-- session_id is the value we pass to acpx as `--name` (a UUID we mint). +-- ACPX persists its own session metadata under ~/.acpx/sessions/*.json keyed by +-- (agentCommand, cwd, name); this table is clawdforge's per-app view of which +-- token spawned which session and when it was last used. +CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + app_name TEXT NOT NULL, + agent TEXT NOT NULL DEFAULT 'claude', + cwd TEXT NOT NULL, + created_at INTEGER NOT NULL, + last_turn_at INTEGER, + turn_count INTEGER NOT NULL DEFAULT 0, + closed_at INTEGER, + meta_json TEXT +); + +CREATE INDEX IF NOT EXISTS idx_sessions_app_name ON sessions(app_name); +CREATE INDEX IF NOT EXISTS idx_sessions_closed_at ON sessions(closed_at); + +-- v0.2: append-only audit of session-scoped events (parallel to `runs`). +CREATE TABLE IF NOT EXISTS session_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + app_name TEXT NOT NULL, + event TEXT NOT NULL, + duration_ms INTEGER, + meta_json TEXT, + created_at INTEGER NOT NULL, + FOREIGN KEY (session_id) REFERENCES sessions(session_id) +); + +CREATE INDEX IF NOT EXISTS idx_session_events_sid ON session_events(session_id); +CREATE INDEX IF NOT EXISTS idx_session_events_app ON session_events(app_name); """ @@ -165,3 +201,158 @@ class Store: paths = [r["path"] for r in rows] c.execute("DELETE FROM files WHERE expires_at dict: + now = int(time.time()) + meta_json = json.dumps(meta) if meta is not None else None + with self._conn() as c: + c.execute( + "INSERT INTO sessions (session_id, app_name, agent, cwd, created_at, meta_json) VALUES (?,?,?,?,?,?)", + (session_id, app_name, agent, cwd, now, meta_json), + ) + return { + "session_id": session_id, + "app_name": app_name, + "agent": agent, + "cwd": cwd, + "created_at": now, + "last_turn_at": None, + "turn_count": 0, + "closed_at": None, + "meta": meta, + } + + def get_session(self, session_id: str) -> dict | None: + with self._conn() as c: + row = c.execute( + "SELECT * FROM sessions WHERE session_id=?", (session_id,) + ).fetchone() + if not row: + return None + d = dict(row) + mj = d.pop("meta_json", None) + d["meta"] = json.loads(mj) if mj else None + return d + + def list_sessions_for_app(self, app_name: str, *, include_closed: bool = True) -> list[dict]: + sql = "SELECT * FROM sessions WHERE app_name=?" + params: list = [app_name] + if not include_closed: + sql += " AND closed_at IS NULL" + sql += " ORDER BY created_at DESC" + with self._conn() as c: + rows = c.execute(sql, params).fetchall() + out = [] + for r in rows: + d = dict(r) + mj = d.pop("meta_json", None) + d["meta"] = json.loads(mj) if mj else None + out.append(d) + return out + + def count_open_sessions(self) -> int: + with self._conn() as c: + r = c.execute("SELECT COUNT(*) AS n FROM sessions WHERE closed_at IS NULL").fetchone() + return int(r["n"]) if r else 0 + + def mark_session_turn(self, session_id: str, *, now: int | None = None) -> None: + now = now or int(time.time()) + with self._conn() as c: + c.execute( + "UPDATE sessions SET last_turn_at=?, turn_count=turn_count+1 WHERE session_id=?", + (now, session_id), + ) + + def mark_session_closed(self, session_id: str, *, now: int | None = None) -> bool: + """Returns True if this call was the one that flipped closed_at, False if it was already closed or missing.""" + now = now or int(time.time()) + with self._conn() as c: + cur = c.execute( + "UPDATE sessions SET closed_at=? WHERE session_id=? AND closed_at IS NULL", + (now, session_id), + ) + return cur.rowcount > 0 + + def hard_delete_session(self, session_id: str) -> bool: + with self._conn() as c: + cur = c.execute("DELETE FROM sessions WHERE session_id=?", (session_id,)) + return cur.rowcount > 0 + + def find_stale_open_sessions(self, *, ttl_cutoff: int) -> list[dict]: + """Open sessions where (last_turn_at OR created_at) < ttl_cutoff.""" + with self._conn() as c: + rows = c.execute( + """ + SELECT * FROM sessions + WHERE closed_at IS NULL + AND ( + (last_turn_at IS NOT NULL AND last_turn_at < ?) + OR (last_turn_at IS NULL AND created_at < ?) + ) + """, + (ttl_cutoff, ttl_cutoff), + ).fetchall() + out = [] + for r in rows: + d = dict(r) + mj = d.pop("meta_json", None) + d["meta"] = json.loads(mj) if mj else None + out.append(d) + return out + + def find_hard_deletable_sessions(self, *, hard_cutoff: int) -> list[dict]: + """Closed sessions whose closed_at < hard_cutoff.""" + with self._conn() as c: + rows = c.execute( + "SELECT * FROM sessions WHERE closed_at IS NOT NULL AND closed_at < ?", + (hard_cutoff,), + ).fetchall() + out = [] + for r in rows: + d = dict(r) + mj = d.pop("meta_json", None) + d["meta"] = json.loads(mj) if mj else None + out.append(d) + return out + + # --- session_events (v0.2) --------------------------------------------- + + def log_session_event( + self, + *, + session_id: str, + app_name: str, + event: str, + duration_ms: int | None = None, + meta: dict | None = None, + ) -> None: + with self._conn() as c: + c.execute( + "INSERT INTO session_events (session_id, app_name, event, duration_ms, meta_json, created_at) VALUES (?,?,?,?,?,?)", + ( + session_id, + app_name, + event, + duration_ms, + json.dumps(meta) if meta is not None else None, + int(time.time()), + ), + ) + + def list_session_events(self, session_id: str) -> list[dict]: + with self._conn() as c: + rows = c.execute( + "SELECT * FROM session_events WHERE session_id=? ORDER BY id ASC", + (session_id,), + ).fetchall() + return [dict(r) for r in rows] diff --git a/compose.yml b/compose.yml index 5c0d6e9..34bf159 100644 --- a/compose.yml +++ b/compose.yml @@ -14,6 +14,8 @@ services: - /mnt/user/appdata/clawdforge/data:/data - /mnt/user/appdata/clawdforge/claude-config:/root/.claude - /mnt/user/appdata/clawdforge/claude-alt-config:/root/.config/claude + # acpx persists session metadata to ~/.acpx/sessions/*.json — survives container rebuild + - /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions ports: # LAN-only bind. 8800 picked to live near other forge-y services; bump if collides. - "192.168.0.5:8800:8800" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..4c174d6 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,234 @@ +"""Shared pytest fixtures for the clawdforge server suite. + +Provides: +- Per-test temp DB + acpx-cwd-root via env overrides (config.load is import-time-cached + in server.py, so we monkeypatch the module-level cfg/store/acpx_manager directly). +- A FakeAcpxManager that conforms to AcpxManager's surface but never spawns a real + subprocess. Use it to drive the FastAPI client without an installed `acpx` binary. +- A TestClient configured with an admin token + one app token already minted, so + tests can hit /sessions/* without going through the bootstrap dance. +""" +from __future__ import annotations + +import asyncio +import os +import tempfile +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path + +import pytest + + +@dataclass +class _FakeSession: + session_id: str + app_name: str + agent: str + cwd: Path + created_at: int + closed: bool = False + last_turn_at: int | None = None + turn_count: int = 0 + + +class FakeAcpxManager: + """Drop-in for AcpxManager that simulates acpx-side behavior in-process. + + Records calls so tests can assert on shape without checking acpx binaries. + """ + + def __init__(self, *, max_live_sessions: int = 32, sessions_cwd_root: str | Path = "/tmp/fake-acpx"): + self.max_live_sessions = max_live_sessions + self.sessions_cwd_root = Path(sessions_cwd_root) + self.sessions_cwd_root.mkdir(parents=True, exist_ok=True) + self._sessions: dict[str, _FakeSession] = {} + self.acpx_bin = "fake-acpx" + # Test injectable: override the next turn() result, or raise. + self.next_turn_events: list[dict] | None = None + self.next_turn_stop_reason: str = "end_turn" + self.next_turn_ok: bool = True + self.next_turn_error: str | None = None + self.calls: list[dict] = [] + + async def create(self, *, app_name: str, agent: str = "claude"): + from clawdforge.acpx_runner import AcpxPoolFull + if self._count_open() >= self.max_live_sessions: + raise AcpxPoolFull(f"max_live_sessions={self.max_live_sessions} reached") + sid = uuid.uuid4().hex + cwd = self.sessions_cwd_root / sid + cwd.mkdir(parents=True, exist_ok=True) + sess = _FakeSession( + session_id=sid, + app_name=app_name, + agent=agent, + cwd=cwd, + created_at=int(time.time()), + ) + self._sessions[sid] = sess + self.calls.append({"op": "create", "session_id": sid, "app_name": app_name}) + return sess + + async def turn(self, *, session_id: str, prompt: str, files=None, timeout_secs=None): + from clawdforge.acpx_runner import ( + AcpxSessionClosed, + AcpxSessionNotFound, + AcpxTurnResult, + ) + sess = self._sessions.get(session_id) + if sess is None: + raise AcpxSessionNotFound(session_id) + if sess.closed: + raise AcpxSessionClosed(session_id) + + self.calls.append({"op": "turn", "session_id": session_id, "prompt": prompt, "files": files}) + + events = self.next_turn_events if self.next_turn_events is not None else [ + { + "jsonrpc": "2.0", + "method": "session/update", + "params": { + "sessionUpdate": "agent_message_chunk", + "content": {"type": "text", "text": "hello"}, + }, + }, + { + "jsonrpc": "2.0", + "id": "req-1", + "result": {"stopReason": self.next_turn_stop_reason}, + }, + ] + + if self.next_turn_ok: + sess.last_turn_at = int(time.time()) + sess.turn_count += 1 + return AcpxTurnResult( + ok=self.next_turn_ok, + events=events, + stop_reason=self.next_turn_stop_reason, + duration_ms=42, + error=self.next_turn_error, + ) + + async def close(self, session_id: str) -> bool: + sess = self._sessions.get(session_id) + if sess is None or sess.closed: + return False + sess.closed = True + self.calls.append({"op": "close", "session_id": session_id}) + return True + + async def forget(self, session_id: str) -> None: + self._sessions.pop(session_id, None) + + def get(self, session_id: str): + return self._sessions.get(session_id) + + def list_for_app(self, app_name: str): + return [s for s in self._sessions.values() if s.app_name == app_name] + + def count_open(self) -> int: + return self._count_open() + + def _count_open(self) -> int: + return sum(1 for s in self._sessions.values() if not s.closed) + + +@pytest.fixture +def tmp_workspace(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Set every clawdforge env var pointing at a fresh tmp dir before importing the server.""" + db_path = tmp_path / "clawdforge.db" + runs_dir = tmp_path / "runs" + acpx_cwds = tmp_path / "acpx-cwds" + runs_dir.mkdir() + acpx_cwds.mkdir() + admin_token = "admin-test-bootstrap" + + monkeypatch.setenv("DB_PATH", str(db_path)) + monkeypatch.setenv("RUNS_DIR", str(runs_dir)) + monkeypatch.setenv("ACPX_SESSIONS_CWD", str(acpx_cwds)) + monkeypatch.setenv("ADMIN_BOOTSTRAP_TOKEN", admin_token) + monkeypatch.setenv("ALLOW_CIDRS", "127.0.0.0/8,::1/128") + monkeypatch.setenv("CLAUDE_BIN", "/bin/true") # /run path won't be hit unless asked + monkeypatch.setenv("ACPX_BIN", "/bin/true") + monkeypatch.setenv("CLAWDFORGE_SESSION_TTL_SECS", "3600") + monkeypatch.setenv("CLAWDFORGE_SESSION_HARD_TTL_SECS", "86400") + monkeypatch.setenv("CLAWDFORGE_SWEEP_INTERVAL_SECS", "60") + monkeypatch.setenv("CLAWDFORGE_MAX_LIVE_SESSIONS", "8") + yield { + "db_path": str(db_path), + "runs_dir": str(runs_dir), + "acpx_cwds": str(acpx_cwds), + "admin_token": admin_token, + } + + +@pytest.fixture +def client(tmp_workspace, monkeypatch: pytest.MonkeyPatch): + """FastAPI TestClient with FakeAcpxManager swapped in. Pre-mints one app token. + + Yields (TestClient, dict) where dict has: admin_token, app_token, app_name, fake_acpx, store. + """ + # Force-reimport so module-level cfg/store/acpx_manager pick up our env. + import importlib + import sys + + for mod_name in [ + "clawdforge.server", + "clawdforge.acpx_runner", + "clawdforge.config", + "clawdforge.store", + "clawdforge.runner", + "clawdforge.auth", + "clawdforge", + ]: + sys.modules.pop(mod_name, None) + + from clawdforge import server as srv # noqa: WPS433 + from clawdforge import auth as auth_mod + + # FastAPI's TestClient reports `request.client.host == "testclient"` which + # fails the IP allowlist check. Force loopback for tests. + monkeypatch.setattr(auth_mod, "_client_ip", lambda _req: "127.0.0.1") + + # Replace the real AcpxManager with our fake. The server module already + # holds a reference; rebind it. + fake = FakeAcpxManager( + max_live_sessions=srv.cfg.max_live_sessions, + sessions_cwd_root=srv.cfg.acpx_sessions_cwd, + ) + srv.acpx_manager = fake + + from fastapi.testclient import TestClient + with TestClient(srv.app) as tc: + # Mint an app token via admin endpoint + app_name = "testapp" + r = tc.post( + "/admin/tokens", + headers={"Authorization": f"Bearer {tmp_workspace['admin_token']}"}, + json={"name": app_name, "ip_cidrs": []}, + ) + assert r.status_code == 200, r.text + app_token = r.json()["token"] + + # Mint a second app token for isolation tests + r2 = tc.post( + "/admin/tokens", + headers={"Authorization": f"Bearer {tmp_workspace['admin_token']}"}, + json={"name": "otherapp", "ip_cidrs": []}, + ) + assert r2.status_code == 200, r2.text + other_token = r2.json()["token"] + + yield tc, { + "admin_token": tmp_workspace["admin_token"], + "app_token": app_token, + "app_name": app_name, + "other_token": other_token, + "other_name": "otherapp", + "fake_acpx": fake, + "store": srv.store, + "cfg": srv.cfg, + "server": srv, + } diff --git a/tests/test_sessions.py b/tests/test_sessions.py new file mode 100644 index 0000000..e1fb6fb --- /dev/null +++ b/tests/test_sessions.py @@ -0,0 +1,434 @@ +"""Smoke tests for the v0.2 multi-turn /sessions surface. + +Coverage: +- /sessions create requires bearer auth +- /sessions create returns a non-empty session_id +- One full turn round-trip via FakeAcpxManager (real acpx not required) +- Per-app isolation: token A cannot see token B's session (404, NOT 403) +- /sessions/{id} DELETE is idempotent (second call is no-op success) +- /sessions list filters strictly by app_name +- TTL sweeper closes stale sessions +- /run regression: existing v0.1 surface byte-shape stays intact +""" +from __future__ import annotations + +import asyncio +import time +from unittest.mock import patch + +import pytest + + +# ---- /sessions auth + create ----------------------------------------------- + + +def test_create_session_requires_auth(client): + tc, _ = client + r = tc.post("/sessions", json={"agent": "claude"}) + assert r.status_code == 401, r.text + + +def test_create_session_returns_id(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["ok"] is True + assert isinstance(body["session_id"], str) and len(body["session_id"]) >= 16 + assert body["agent"] == "claude" + # Ledger row should exist + row = ctx["store"].get_session(body["session_id"]) + assert row is not None + assert row["app_name"] == ctx["app_name"] + + +def test_create_session_with_meta(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude", "meta": {"purpose": "smoke"}}, + ) + assert r.status_code == 200 + sid = r.json()["session_id"] + row = ctx["store"].get_session(sid) + assert row["meta"] == {"purpose": "smoke"} + + +# ---- turn round-trip -------------------------------------------------------- + + +def test_turn_round_trip(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sid = r.json()["session_id"] + + # Inject a controlled set of events for the turn + ctx["fake_acpx"].next_turn_events = [ + { + "jsonrpc": "2.0", + "method": "session/update", + "params": { + "sessionUpdate": "agent_message_chunk", + "content": {"type": "text", "text": "hello"}, + }, + }, + {"jsonrpc": "2.0", "id": "req-1", "result": {"stopReason": "end_turn"}}, + ] + + r2 = tc.post( + f"/sessions/{sid}/turn", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"prompt": "say hello"}, + ) + assert r2.status_code == 200, r2.text + body = r2.json() + assert body["ok"] is True + assert body["session_id"] == sid + assert body["stop_reason"] == "end_turn" + assert body["turn_index"] == 1 + assert isinstance(body["events"], list) and len(body["events"]) == 2 + # Must contain the expected text event + chunk = next( + e for e in body["events"] + if e.get("method") == "session/update" + ) + assert chunk["params"]["content"]["text"] == "hello" + + # State endpoint reflects the turn + r3 = tc.get( + f"/sessions/{sid}", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + ) + assert r3.status_code == 200 + assert r3.json()["turn_count"] == 1 + assert r3.json()["last_turn_at"] is not None + + +# ---- per-app isolation ----------------------------------------------------- + + +def test_session_isolation_404(client): + tc, ctx = client + # token A creates a session + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sid = r.json()["session_id"] + + # token B asks for it: must be 404 (NOT 403, no existence leak) + for path in [f"/sessions/{sid}", f"/sessions/{sid}/turn"]: + if path.endswith("/turn"): + r2 = tc.post( + path, + headers={"Authorization": f"Bearer {ctx['other_token']}"}, + json={"prompt": "hi"}, + ) + else: + r2 = tc.get( + path, + headers={"Authorization": f"Bearer {ctx['other_token']}"}, + ) + assert r2.status_code == 404, f"{path} returned {r2.status_code}: {r2.text}" + + # And DELETE: same rule + r3 = tc.delete( + f"/sessions/{sid}", + headers={"Authorization": f"Bearer {ctx['other_token']}"}, + ) + assert r3.status_code == 404 + + +# ---- close idempotency ----------------------------------------------------- + + +def test_close_session_idempotent(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sid = r.json()["session_id"] + + r1 = tc.delete( + f"/sessions/{sid}", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + ) + assert r1.status_code == 200 + assert r1.json()["ok"] is True + + # Second close: must be a 200 success no-op (we documented this in + # server.py and SDKs rely on it for safe Drop/finally usage). + r2 = tc.delete( + f"/sessions/{sid}", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + ) + assert r2.status_code == 200 + assert r2.json().get("already_closed") is True + + +def test_turn_after_close_returns_410(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sid = r.json()["session_id"] + + tc.delete( + f"/sessions/{sid}", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + ) + + r2 = tc.post( + f"/sessions/{sid}/turn", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"prompt": "hi"}, + ) + assert r2.status_code == 410 + + +# ---- list filtering -------------------------------------------------------- + + +def test_list_sessions_filters_by_app_name(client): + tc, ctx = client + # token A creates two + sids_a = [] + for _ in range(2): + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sids_a.append(r.json()["session_id"]) + + # token B creates one + rb = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['other_token']}"}, + json={"agent": "claude"}, + ) + sid_b = rb.json()["session_id"] + + # token A list shows only its 2 + la = tc.get( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + ) + assert la.status_code == 200 + body_a = la.json() + assert body_a["count"] == 2 + listed_a = {s["session_id"] for s in body_a["sessions"]} + assert listed_a == set(sids_a) + assert sid_b not in listed_a + + # token B list shows only its 1 + lb = tc.get( + "/sessions", + headers={"Authorization": f"Bearer {ctx['other_token']}"}, + ) + assert lb.status_code == 200 + body_b = lb.json() + assert body_b["count"] == 1 + assert body_b["sessions"][0]["session_id"] == sid_b + + +# ---- TTL sweeper ----------------------------------------------------------- + + +def test_ttl_sweep_closes_stale(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sid = r.json()["session_id"] + + # Backdate created_at so the row is "stale" relative to the configured TTL + store = ctx["store"] + cfg = ctx["cfg"] + fake_now = int(time.time()) + backdated = fake_now - cfg.session_ttl_secs - 60 + with store._conn() as c: + c.execute( + "UPDATE sessions SET created_at=? WHERE session_id=?", + (backdated, sid), + ) + + # Run one sweep iteration directly (no need to wait for the task) + server_mod = ctx["server"] + counts = asyncio.get_event_loop().run_until_complete(server_mod._sweep_once()) + assert counts["soft_closed"] == 1 + + row = store.get_session(sid) + assert row["closed_at"] is not None + + # Audit event recorded + events = store.list_session_events(sid) + kinds = [e["event"] for e in events] + assert "create" in kinds + assert "sweep_close" in kinds + + +def test_ttl_sweep_hard_deletes_old_closed(client): + tc, ctx = client + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + sid = r.json()["session_id"] + + store = ctx["store"] + cfg = ctx["cfg"] + closed_long_ago = int(time.time()) - cfg.session_hard_ttl_secs - 60 + with store._conn() as c: + c.execute( + "UPDATE sessions SET closed_at=? WHERE session_id=?", + (closed_long_ago, sid), + ) + + server_mod = ctx["server"] + counts = asyncio.get_event_loop().run_until_complete(server_mod._sweep_once()) + assert counts["hard_deleted"] == 1 + assert store.get_session(sid) is None + + +# ---- pool full ------------------------------------------------------------- + + +def test_pool_full_returns_503(client, monkeypatch): + tc, ctx = client + fake = ctx["fake_acpx"] + monkeypatch.setattr(fake, "max_live_sessions", 2) + # First two create OK + for _ in range(2): + r = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + assert r.status_code == 200 + + # Third hits the pool cap + r3 = tc.post( + "/sessions", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"agent": "claude"}, + ) + assert r3.status_code == 503 + + +# ---- /run regression ------------------------------------------------------- + + +def test_run_endpoint_unchanged(client, monkeypatch): + """The /run path stays on the bare claude-p subprocess and v0.1 shape.""" + tc, ctx = client + + # We don't actually want to invoke claude. Patch Runner.run to return a + # canned RunResult that mirrors the v0.1 shape. + from clawdforge.runner import RunResult + server_mod = ctx["server"] + + canned = RunResult( + ok=True, + result={"hello": "world"}, + raw_stdout='{"type":"result","result":"{\\"hello\\":\\"world\\"}","stop_reason":"end_turn"}', + raw_stderr="", + duration_ms=123, + stop_reason="end_turn", + error=None, + ) + monkeypatch.setattr(server_mod.runner, "run", lambda **kw: canned) + + r = tc.post( + "/run", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"prompt": "Reply with JSON: {\"hello\":\"world\"}", "model": "sonnet"}, + ) + assert r.status_code == 200 + body = r.json() + # Exact v0.1 shape: ok, result, duration_ms, stop_reason — nothing else added + assert set(body.keys()) == {"ok", "result", "duration_ms", "stop_reason"} + assert body["ok"] is True + assert body["result"] == {"hello": "world"} + assert body["stop_reason"] == "end_turn" + assert body["duration_ms"] == 123 + + +def test_run_endpoint_unchanged_error_shape(client, monkeypatch): + """v0.1's error shape (502 with ok/error/stderr/duration_ms/stop_reason) preserved.""" + tc, ctx = client + from clawdforge.runner import RunResult + server_mod = ctx["server"] + + canned = RunResult( + ok=False, + result=None, + raw_stdout="", + raw_stderr="boom", + duration_ms=10, + stop_reason="error", + error="claude exit 1", + ) + monkeypatch.setattr(server_mod.runner, "run", lambda **kw: canned) + + r = tc.post( + "/run", + headers={"Authorization": f"Bearer {ctx['app_token']}"}, + json={"prompt": "x"}, + ) + assert r.status_code == 502 + body = r.json() + assert set(body.keys()) >= {"ok", "error", "stderr", "duration_ms", "stop_reason"} + assert body["ok"] is False + + +# ---- acpx_runner unit-level helpers ---------------------------------------- + + +def test_extract_stop_reason_from_ndjson(): + from clawdforge.acpx_runner import _extract_stop_reason, _parse_ndjson + + raw = ( + '{"jsonrpc":"2.0","id":"req-1","method":"session/prompt","params":{}}\n' + '{"jsonrpc":"2.0","method":"session/update","params":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"Hi"}}}\n' + '{"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}}\n' + ) + events = _parse_ndjson(raw) + assert len(events) == 3 + assert _extract_stop_reason(events) == "end_turn" + + +def test_parse_ndjson_skips_garbage(): + from clawdforge.acpx_runner import _parse_ndjson + + raw = '{"a":1}\nnot json\n{"b":2}\n\n \n{"c":3}\n' + events = _parse_ndjson(raw) + assert events == [{"a": 1}, {"b": 2}, {"c": 3}] + + +def test_format_prompt_with_files(): + from clawdforge.acpx_runner import _format_prompt_with_files + + plain = _format_prompt_with_files("hello", None) + assert plain == "hello" + + annotated = _format_prompt_with_files("hello", ["/data/x.txt", "/data/y.txt"]) + assert "/data/x.txt" in annotated and "/data/y.txt" in annotated + assert annotated.endswith("hello")