clawdforge/tests/test_sessions.py
Kayos 940861f70a v0.2: multi-turn /sessions endpoints backed by ACPX
- Dockerfile: install acpx@latest alongside @anthropic-ai/claude-code
- compose.yml: bind /mnt/user/appdata/clawdforge/acpx-sessions:/root/.acpx/sessions
- DB: additive sessions + session_events tables in store.py SCHEMA
- clawdforge/acpx_runner.py: AcpxManager + AcpxSession, bounded async pool,
  per-invocation subprocess model (acpx CLI itself owns the queue-owner
  lifecycle, so each turn = one fresh `acpx prompt -s <uuid>` call)
- server.py: POST/GET/DELETE /sessions + POST /sessions/{id}/turn + GET /sessions
- Per-app isolation: 404 (not 403) on cross-token session access — no
  existence leak across tokens
- Lifespan-managed TTL sweeper: every 60s soft-closes idle sessions past
  CLAWDFORGE_SESSION_TTL_SECS (1h default), hard-deletes ledger rows past
  CLAWDFORGE_SESSION_HARD_TTL_SECS (24h default)
- session_events audit table parallel to existing runs table
  (events: create, turn, close, sweep_close, hard_delete)
- /healthz now reports acpx_present + acpx_version + open_sessions count
- tests/test_sessions.py: 16 tests covering create/turn/close/list/isolation/
  sweep/pool-full/regression. The /run regression tests assert that the v0.1
  response shape is unchanged (exact key set on success, required keys on
  the 502 error path).
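The TTL sweeper bullet above (soft-close idle sessions past the 1h TTL, hard-delete ledger rows past the 24h hard TTL) can be sketched as a pure per-row decision function. This is a minimal sketch, not the code in server.py: `SessionRow`, `sweep_decision` and `sweep_once` are hypothetical names, and measuring idleness from `last_turn_at` (falling back to `created_at`) is an assumption. The grounded parts are the two thresholds, the `soft_closed` / `hard_deleted` count keys the tests read, and hard deletion being keyed on `closed_at`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

# Defaults stated in the commit message; clawdforge reads the real values from
# CLAWDFORGE_SESSION_TTL_SECS and CLAWDFORGE_SESSION_HARD_TTL_SECS.
DEFAULT_SESSION_TTL_SECS = 60 * 60            # 1h idle -> soft-close
DEFAULT_SESSION_HARD_TTL_SECS = 24 * 60 * 60  # 24h closed -> hard-delete


@dataclass
class SessionRow:
    # Hypothetical subset of a `sessions` ledger row; times are unix seconds.
    session_id: str
    created_at: int
    last_turn_at: Optional[int] = None
    closed_at: Optional[int] = None


def sweep_decision(
    row: SessionRow,
    now: int,
    ttl_secs: int = DEFAULT_SESSION_TTL_SECS,
    hard_ttl_secs: int = DEFAULT_SESSION_HARD_TTL_SECS,
) -> str:
    """Return 'hard_delete', 'soft_close', or 'keep' for one row (sketch)."""
    if row.closed_at is not None:
        # Already closed: drop the ledger row once it has been closed too long.
        if now - row.closed_at > hard_ttl_secs:
            return "hard_delete"
        return "keep"
    # Open session: idle time counts from the last turn, else from creation
    # (assumed rule; the tests only exercise the created_at path).
    last_activity = row.last_turn_at if row.last_turn_at is not None else row.created_at
    if now - last_activity > ttl_secs:
        return "soft_close"
    return "keep"


def sweep_once(rows: list[SessionRow], now: int) -> dict[str, int]:
    """Tally one pass of decisions into the counts the tests read."""
    counts = {"soft_closed": 0, "hard_deleted": 0}
    for row in rows:
        decision = sweep_decision(row, now)
        if decision == "soft_close":
            counts["soft_closed"] += 1
        elif decision == "hard_delete":
            counts["hard_deleted"] += 1
    return counts
```

In this sketch a row that is soft-closed during a pass is not hard-deleted in the same pass; it only becomes eligible once `closed_at` is older than the hard TTL.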

ACPX research notes (v0.6.1, openclaw/acpx):
- npm package is `acpx`, not `@openclaw/acpx`
- Sessions are scoped by (agentCommand, cwd, name?). We mint our own UUID
  as `--name` and give every session a unique cwd subdir, so the scope key
  is collision-free across apps.
- session_id source: ours. We pass --name <uuid>, ACPX records it under
  ~/.acpx/sessions/<encoded-id>.json. We never need to parse ACPX's
  acpxRecordId — our UUID is canonical.
- Subprocess lifetime: per-invocation, NOT per-session. The acpx CLI itself
  spawns/maintains a per-session "queue owner" process via local IPC; each
  `acpx prompt` call we make either elects itself owner or enqueues. The
  AcpxSession class is therefore a thin (uuid, cwd, asyncio.Lock) handle,
  not a long-lived stdio pipe. The spec's "owns one stdio pipe pair" model
  was rewritten to match reality — flagged here.
- Close semantics: soft-close via `acpx sessions close <name>`. The
  on-disk record stays (ACPX's `sessions prune` is the hard-delete path,
  not invoked from clawdforge). DELETE /sessions/<id> is documented as
  idempotent (200 with already_closed=true on second call) so SDKs can
  call close() in finally/Drop blocks safely.
- File uploads: ACPX has no file-attach ACP method exposed via the CLI.
  We prepend an [Attached files] header listing absolute paths; the agent
  uses its Read tool to open them. Same behavior as /run --files in v0.1.
- Permissions: --approve-all on the turn invocation since the container is
  unattended and callers are bearer-token-trusted. Future v0.3 may expose
  a per-session permission policy.
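The session model in these notes (a thin (uuid, cwd, asyncio.Lock) handle instead of a long-lived pipe, a bounded pool, idempotent close, 404 rather than 403 across tokens, 410 for a turn on a closed session) can be sketched in plain asyncio. This is an illustrative sketch, not AcpxManager/AcpxSession: `SessionHandle`, `SessionPool`, `PoolFull`, `SessionClosed`, the `base_dir / <uuid>` cwd layout and the return shapes are hypothetical, `create` is synchronous for brevity, and the places where the real code would shell out to `acpx prompt` / `acpx sessions close` are left as comments.

```python
from __future__ import annotations

import asyncio
import uuid
from dataclasses import dataclass, field
from pathlib import PurePosixPath
from typing import Optional


class PoolFull(Exception):
    """Live-session cap reached (the server maps this to 503)."""


class SessionClosed(Exception):
    """Turn requested on a closed session (the server maps this to 410)."""


@dataclass
class SessionHandle:
    # Thin handle: identity, a private cwd, and a lock that serializes turns.
    session_id: str
    app_name: str
    cwd: PurePosixPath
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    closed: bool = False


class SessionPool:
    def __init__(self, base_dir: str, max_live_sessions: int) -> None:
        self.base_dir = PurePosixPath(base_dir)
        self.max_live_sessions = max_live_sessions
        self._sessions: dict[str, SessionHandle] = {}

    def live_count(self) -> int:
        return sum(1 for s in self._sessions.values() if not s.closed)

    def create(self, app_name: str) -> SessionHandle:
        if self.live_count() >= self.max_live_sessions:
            raise PoolFull(f"{self.max_live_sessions} live sessions")
        # We mint the UUID; a unique cwd per session keeps the ACPX scope key
        # (agentCommand, cwd, name) collision-free across apps.
        sid = str(uuid.uuid4())
        handle = SessionHandle(sid, app_name, self.base_dir / sid)
        self._sessions[sid] = handle
        return handle

    def get(self, app_name: str, sid: str) -> Optional[SessionHandle]:
        # Another app's session looks exactly like a missing one (-> 404),
        # so a 403 can never leak that the id exists.
        handle = self._sessions.get(sid)
        if handle is None or handle.app_name != app_name:
            return None
        return handle

    async def run_turn(self, app_name: str, sid: str, prompt: str) -> Optional[str]:
        handle = self.get(app_name, sid)
        if handle is None:
            return None  # -> 404
        async with handle.lock:  # at most one turn per session at a time
            if handle.closed:
                raise SessionClosed(sid)  # -> 410
            # Real code would run one fresh `acpx prompt` subprocess here.
            return f"echo: {prompt}"

    async def close(self, app_name: str, sid: str) -> Optional[dict]:
        handle = self.get(app_name, sid)
        if handle is None:
            return None  # -> 404
        async with handle.lock:  # waits for an in-flight run_turn to finish
            if handle.closed:
                return {"ok": True, "already_closed": True}
            handle.closed = True  # real code would run `acpx sessions close`
            return {"ok": True, "already_closed": False}
```

Keeping closed handles in the map, rather than deleting them on close, is what lets a second close in this sketch answer `already_closed: true` instead of 404.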

/run endpoint unchanged — backwards compat verified by
test_run_endpoint_unchanged + test_run_endpoint_unchanged_error_shape.

Spec: memory/spec-clawdforge-v0.2.md
ACPX CLI ref: https://github.com/openclaw/acpx/blob/main/docs/CLI.md
2026-04-29 06:22:55 -07:00

"""Smoke tests for the v0.2 multi-turn /sessions surface.

Coverage:
- /sessions create requires bearer auth
- /sessions create returns a non-empty session_id (and persists optional meta)
- One full turn round-trip via FakeAcpxManager (real acpx not required)
- Per-app isolation: token A cannot see token B's session (404, NOT 403)
- /sessions/{id} DELETE is idempotent (second call is a no-op success)
- A turn on a closed session returns 410
- /sessions list filters strictly by app_name
- TTL sweeper soft-closes stale sessions and hard-deletes old closed rows
- Pool cap: creating past max_live_sessions returns 503
- /run regression: the v0.1 response shape stays intact
- acpx_runner helpers: NDJSON parsing, stop-reason extraction, file prompts
"""

from __future__ import annotations

import asyncio
import time
from unittest.mock import patch

import pytest


# ---- /sessions auth + create -----------------------------------------------
def test_create_session_requires_auth(client):
    tc, _ = client
    r = tc.post("/sessions", json={"agent": "claude"})
    assert r.status_code == 401, r.text


def test_create_session_returns_id(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    assert r.status_code == 200, r.text
    body = r.json()
    assert body["ok"] is True
    assert isinstance(body["session_id"], str) and len(body["session_id"]) >= 16
    assert body["agent"] == "claude"
    # Ledger row should exist
    row = ctx["store"].get_session(body["session_id"])
    assert row is not None
    assert row["app_name"] == ctx["app_name"]


def test_create_session_with_meta(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude", "meta": {"purpose": "smoke"}},
    )
    assert r.status_code == 200
    sid = r.json()["session_id"]
    row = ctx["store"].get_session(sid)
    assert row["meta"] == {"purpose": "smoke"}


# ---- turn round-trip --------------------------------------------------------
def test_turn_round_trip(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = r.json()["session_id"]
    # Inject a controlled set of events for the turn
    ctx["fake_acpx"].next_turn_events = [
        {
            "jsonrpc": "2.0",
            "method": "session/update",
            "params": {
                "sessionUpdate": "agent_message_chunk",
                "content": {"type": "text", "text": "hello"},
            },
        },
        {"jsonrpc": "2.0", "id": "req-1", "result": {"stopReason": "end_turn"}},
    ]
    r2 = tc.post(
        f"/sessions/{sid}/turn",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"prompt": "say hello"},
    )
    assert r2.status_code == 200, r2.text
    body = r2.json()
    assert body["ok"] is True
    assert body["session_id"] == sid
    assert body["stop_reason"] == "end_turn"
    assert body["turn_index"] == 1
    assert isinstance(body["events"], list) and len(body["events"]) == 2
    # Must contain the expected text event
    chunk = next(
        e for e in body["events"]
        if e.get("method") == "session/update"
    )
    assert chunk["params"]["content"]["text"] == "hello"
    # State endpoint reflects the turn
    r3 = tc.get(
        f"/sessions/{sid}",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
    )
    assert r3.status_code == 200
    assert r3.json()["turn_count"] == 1
    assert r3.json()["last_turn_at"] is not None


# ---- per-app isolation -----------------------------------------------------
def test_session_isolation_404(client):
    tc, ctx = client
    # token A creates a session
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = r.json()["session_id"]
    # token B asks for it: must be 404 (NOT 403, no existence leak)
    for path in [f"/sessions/{sid}", f"/sessions/{sid}/turn"]:
        if path.endswith("/turn"):
            r2 = tc.post(
                path,
                headers={"Authorization": f"Bearer {ctx['other_token']}"},
                json={"prompt": "hi"},
            )
        else:
            r2 = tc.get(
                path,
                headers={"Authorization": f"Bearer {ctx['other_token']}"},
            )
        assert r2.status_code == 404, f"{path} returned {r2.status_code}: {r2.text}"
    # And DELETE: same rule
    r3 = tc.delete(
        f"/sessions/{sid}",
        headers={"Authorization": f"Bearer {ctx['other_token']}"},
    )
    assert r3.status_code == 404


# ---- close idempotency -----------------------------------------------------
def test_close_session_idempotent(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = r.json()["session_id"]
    r1 = tc.delete(
        f"/sessions/{sid}",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
    )
    assert r1.status_code == 200
    assert r1.json()["ok"] is True
    # Second close: must be a 200 success no-op (we documented this in
    # server.py and SDKs rely on it for safe Drop/finally usage).
    r2 = tc.delete(
        f"/sessions/{sid}",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
    )
    assert r2.status_code == 200
    assert r2.json().get("already_closed") is True


def test_turn_after_close_returns_410(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = r.json()["session_id"]
    tc.delete(
        f"/sessions/{sid}",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
    )
    r2 = tc.post(
        f"/sessions/{sid}/turn",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"prompt": "hi"},
    )
    assert r2.status_code == 410


# ---- list filtering --------------------------------------------------------
def test_list_sessions_filters_by_app_name(client):
    tc, ctx = client
    # token A creates two
    sids_a = []
    for _ in range(2):
        r = tc.post(
            "/sessions",
            headers={"Authorization": f"Bearer {ctx['app_token']}"},
            json={"agent": "claude"},
        )
        sids_a.append(r.json()["session_id"])
    # token B creates one
    rb = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['other_token']}"},
        json={"agent": "claude"},
    )
    sid_b = rb.json()["session_id"]
    # token A list shows only its 2
    la = tc.get(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
    )
    assert la.status_code == 200
    body_a = la.json()
    assert body_a["count"] == 2
    listed_a = {s["session_id"] for s in body_a["sessions"]}
    assert listed_a == set(sids_a)
    assert sid_b not in listed_a
    # token B list shows only its 1
    lb = tc.get(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['other_token']}"},
    )
    assert lb.status_code == 200
    body_b = lb.json()
    assert body_b["count"] == 1
    assert body_b["sessions"][0]["session_id"] == sid_b


# ---- TTL sweeper -----------------------------------------------------------
def test_ttl_sweep_closes_stale(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = r.json()["session_id"]
    # Backdate created_at so the row is "stale" relative to the configured TTL
    store = ctx["store"]
    cfg = ctx["cfg"]
    fake_now = int(time.time())
    backdated = fake_now - cfg.session_ttl_secs - 60
    with store._conn() as c:
        c.execute(
            "UPDATE sessions SET created_at=? WHERE session_id=?",
            (backdated, sid),
        )
    # Run one sweep iteration directly (no need to wait for the task).
    # asyncio.run() owns a fresh loop; a bare get_event_loop() with no
    # current loop is deprecated on recent Pythons.
    server_mod = ctx["server"]
    counts = asyncio.run(server_mod._sweep_once())
    assert counts["soft_closed"] == 1
    row = store.get_session(sid)
    assert row["closed_at"] is not None
    # Audit event recorded
    events = store.list_session_events(sid)
    kinds = [e["event"] for e in events]
    assert "create" in kinds
    assert "sweep_close" in kinds


def test_ttl_sweep_hard_deletes_old_closed(client):
    tc, ctx = client
    r = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = r.json()["session_id"]
    store = ctx["store"]
    cfg = ctx["cfg"]
    closed_long_ago = int(time.time()) - cfg.session_hard_ttl_secs - 60
    with store._conn() as c:
        c.execute(
            "UPDATE sessions SET closed_at=? WHERE session_id=?",
            (closed_long_ago, sid),
        )
    server_mod = ctx["server"]
    counts = asyncio.run(server_mod._sweep_once())
    assert counts["hard_deleted"] == 1
    assert store.get_session(sid) is None


# ---- pool full -------------------------------------------------------------
def test_pool_full_returns_503(client, monkeypatch):
    tc, ctx = client
    fake = ctx["fake_acpx"]
    monkeypatch.setattr(fake, "max_live_sessions", 2)
    # First two create OK
    for _ in range(2):
        r = tc.post(
            "/sessions",
            headers={"Authorization": f"Bearer {ctx['app_token']}"},
            json={"agent": "claude"},
        )
        assert r.status_code == 200
    # Third hits the pool cap
    r3 = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    assert r3.status_code == 503


# ---- /run regression -------------------------------------------------------
def test_run_endpoint_unchanged(client, monkeypatch):
    """The /run path stays on the bare `claude -p` subprocess and the v0.1 shape."""
    tc, ctx = client
    # We don't actually want to invoke claude. Patch Runner.run to return a
    # canned RunResult that mirrors the v0.1 shape.
    from clawdforge.runner import RunResult
    server_mod = ctx["server"]
    canned = RunResult(
        ok=True,
        result={"hello": "world"},
        raw_stdout='{"type":"result","result":"{\\"hello\\":\\"world\\"}","stop_reason":"end_turn"}',
        raw_stderr="",
        duration_ms=123,
        stop_reason="end_turn",
        error=None,
    )
    monkeypatch.setattr(server_mod.runner, "run", lambda **kw: canned)
    r = tc.post(
        "/run",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"prompt": "Reply with JSON: {\"hello\":\"world\"}", "model": "sonnet"},
    )
    assert r.status_code == 200
    body = r.json()
    # Exact v0.1 shape: ok, result, duration_ms, stop_reason; nothing else added
    assert set(body.keys()) == {"ok", "result", "duration_ms", "stop_reason"}
    assert body["ok"] is True
    assert body["result"] == {"hello": "world"}
    assert body["stop_reason"] == "end_turn"
    assert body["duration_ms"] == 123


def test_run_endpoint_unchanged_error_shape(client, monkeypatch):
    """v0.1's error shape (502 with ok/error/stderr/duration_ms/stop_reason) preserved."""
    tc, ctx = client
    from clawdforge.runner import RunResult
    server_mod = ctx["server"]
    canned = RunResult(
        ok=False,
        result=None,
        raw_stdout="",
        raw_stderr="boom",
        duration_ms=10,
        stop_reason="error",
        error="claude exit 1",
    )
    monkeypatch.setattr(server_mod.runner, "run", lambda **kw: canned)
    r = tc.post(
        "/run",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"prompt": "x"},
    )
    assert r.status_code == 502
    body = r.json()
    assert set(body.keys()) >= {"ok", "error", "stderr", "duration_ms", "stop_reason"}
    assert body["ok"] is False


# ---- acpx_runner unit-level helpers ----------------------------------------
def test_extract_stop_reason_from_ndjson():
    from clawdforge.acpx_runner import _extract_stop_reason, _parse_ndjson
    raw = (
        '{"jsonrpc":"2.0","id":"req-1","method":"session/prompt","params":{}}\n'
        '{"jsonrpc":"2.0","method":"session/update","params":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"Hi"}}}\n'
        '{"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}}\n'
    )
    events = _parse_ndjson(raw)
    assert len(events) == 3
    assert _extract_stop_reason(events) == "end_turn"


def test_parse_ndjson_skips_garbage():
    from clawdforge.acpx_runner import _parse_ndjson
    raw = '{"a":1}\nnot json\n{"b":2}\n\n \n{"c":3}\n'
    events = _parse_ndjson(raw)
    assert events == [{"a": 1}, {"b": 2}, {"c": 3}]


def test_format_prompt_with_files():
    from clawdforge.acpx_runner import _format_prompt_with_files
    plain = _format_prompt_with_files("hello", None)
    assert plain == "hello"
    annotated = _format_prompt_with_files("hello", ["/data/x.txt", "/data/y.txt"])
    assert "/data/x.txt" in annotated and "/data/y.txt" in annotated
    assert annotated.endswith("hello")
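The last three tests pin down the contract of the acpx_runner helpers: skip blank and non-JSON NDJSON lines, read `result.stopReason` from the JSON-RPC response, and prepend an `[Attached files]` header while leaving the prompt at the end. A minimal sketch that satisfies those assertions follows. It is not the code in clawdforge/acpx_runner.py: `parse_ndjson`, `extract_stop_reason` and `format_prompt_with_files` are hypothetical stand-ins for the private helpers, and the exact header layout (one `- path` line per file, a blank line before the prompt) is an assumption.

```python
from __future__ import annotations

import json
from typing import Any, Optional, Sequence


def parse_ndjson(raw: str) -> list[Any]:
    """Decode one JSON value per line, skipping blank or invalid lines."""
    events: list[Any] = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # tolerate stray non-JSON output such as log lines
    return events


def extract_stop_reason(events: list[Any]) -> Optional[str]:
    """Return stopReason from the last JSON-RPC response that carries one."""
    for event in reversed(events):
        if not isinstance(event, dict):
            continue
        result = event.get("result")
        if isinstance(result, dict) and "stopReason" in result:
            return result["stopReason"]
    return None


def format_prompt_with_files(prompt: str, files: Optional[Sequence[str]]) -> str:
    """Prepend an [Attached files] header; the agent opens each path itself."""
    if not files:
        return prompt
    header = "\n".join(["[Attached files]", *(f"- {path}" for path in files)])
    return f"{header}\n\n{prompt}"
```

Returning `None` when no response carries a `stopReason` leaves it to the caller to decide how to report a turn that ended without one.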