"""Smoke tests for the v0.2 multi-turn /sessions surface. Coverage: - /sessions create requires bearer auth - /sessions create returns a non-empty session_id - One full turn round-trip via FakeAcpxManager (real acpx not required) - Per-app isolation: token A cannot see token B's session (404, NOT 403) - /sessions/{id} DELETE is idempotent (second call is no-op success) - /sessions list filters strictly by app_name - TTL sweeper closes stale sessions - /run regression: existing v0.1 surface byte-shape stays intact """ from __future__ import annotations import asyncio import time from unittest.mock import patch import pytest # ---- /sessions auth + create ----------------------------------------------- def test_create_session_requires_auth(client): tc, _ = client r = tc.post("/sessions", json={"agent": "claude"}) assert r.status_code == 401, r.text def test_create_session_returns_id(client): tc, ctx = client r = tc.post( "/sessions", headers={"Authorization": f"Bearer {ctx['app_token']}"}, json={"agent": "claude"}, ) assert r.status_code == 200, r.text body = r.json() assert body["ok"] is True assert isinstance(body["session_id"], str) and len(body["session_id"]) >= 16 assert body["agent"] == "claude" # Ledger row should exist row = ctx["store"].get_session(body["session_id"]) assert row is not None assert row["app_name"] == ctx["app_name"] def test_create_session_with_meta(client): tc, ctx = client r = tc.post( "/sessions", headers={"Authorization": f"Bearer {ctx['app_token']}"}, json={"agent": "claude", "meta": {"purpose": "smoke"}}, ) assert r.status_code == 200 sid = r.json()["session_id"] row = ctx["store"].get_session(sid) assert row["meta"] == {"purpose": "smoke"} # ---- turn round-trip -------------------------------------------------------- def test_turn_round_trip(client): tc, ctx = client r = tc.post( "/sessions", headers={"Authorization": f"Bearer {ctx['app_token']}"}, json={"agent": "claude"}, ) sid = r.json()["session_id"] # Inject a controlled set of 
events for the turn ctx["fake_acpx"].next_turn_events = [ { "jsonrpc": "2.0", "method": "session/update", "params": { "sessionUpdate": "agent_message_chunk", "content": {"type": "text", "text": "hello"}, }, }, {"jsonrpc": "2.0", "id": "req-1", "result": {"stopReason": "end_turn"}}, ] r2 = tc.post( f"/sessions/{sid}/turn", headers={"Authorization": f"Bearer {ctx['app_token']}"}, json={"prompt": "say hello"}, ) assert r2.status_code == 200, r2.text body = r2.json() assert body["ok"] is True assert body["session_id"] == sid assert body["stop_reason"] == "end_turn" assert body["turn_index"] == 1 assert isinstance(body["events"], list) and len(body["events"]) == 2 # Must contain the expected text event chunk = next( e for e in body["events"] if e.get("method") == "session/update" ) assert chunk["params"]["content"]["text"] == "hello" # State endpoint reflects the turn r3 = tc.get( f"/sessions/{sid}", headers={"Authorization": f"Bearer {ctx['app_token']}"}, ) assert r3.status_code == 200 assert r3.json()["turn_count"] == 1 assert r3.json()["last_turn_at"] is not None # ---- per-app isolation ----------------------------------------------------- def test_session_isolation_404(client): tc, ctx = client # token A creates a session r = tc.post( "/sessions", headers={"Authorization": f"Bearer {ctx['app_token']}"}, json={"agent": "claude"}, ) sid = r.json()["session_id"] # token B asks for it: must be 404 (NOT 403, no existence leak) for path in [f"/sessions/{sid}", f"/sessions/{sid}/turn"]: if path.endswith("/turn"): r2 = tc.post( path, headers={"Authorization": f"Bearer {ctx['other_token']}"}, json={"prompt": "hi"}, ) else: r2 = tc.get( path, headers={"Authorization": f"Bearer {ctx['other_token']}"}, ) assert r2.status_code == 404, f"{path} returned {r2.status_code}: {r2.text}" # And DELETE: same rule r3 = tc.delete( f"/sessions/{sid}", headers={"Authorization": f"Bearer {ctx['other_token']}"}, ) assert r3.status_code == 404 # ---- close idempotency 
# ---- close idempotency -----------------------------------------------------


def test_close_session_idempotent(client):
    """A second DELETE of the same session is a 200 flagged ``already_closed``."""
    tc, ctx = client
    auth = {"Authorization": f"Bearer {ctx['app_token']}"}

    created = tc.post("/sessions", headers=auth, json={"agent": "claude"})
    sid = created.json()["session_id"]

    first = tc.delete(f"/sessions/{sid}", headers=auth)
    assert first.status_code == 200
    assert first.json()["ok"] is True

    # Second close: must be a 200 success no-op (we documented this in
    # server.py and SDKs rely on it for safe Drop/finally usage).
    second = tc.delete(f"/sessions/{sid}", headers=auth)
    assert second.status_code == 200
    assert second.json().get("already_closed") is True


def test_turn_after_close_returns_410(client):
    """Posting a turn to a session that was closed answers 410."""
    tc, ctx = client
    auth = {"Authorization": f"Bearer {ctx['app_token']}"}

    created = tc.post("/sessions", headers=auth, json={"agent": "claude"})
    sid = created.json()["session_id"]
    tc.delete(f"/sessions/{sid}", headers=auth)

    turn = tc.post(f"/sessions/{sid}/turn", headers=auth, json={"prompt": "hi"})
    assert turn.status_code == 410


# ---- list filtering --------------------------------------------------------


def test_list_sessions_filters_by_app_name(client):
    """Each token's GET /sessions lists only the sessions that token created."""
    tc, ctx = client
    auth_a = {"Authorization": f"Bearer {ctx['app_token']}"}

    # token A creates two
    sids_a = [
        tc.post("/sessions", headers=auth_a, json={"agent": "claude"}).json()["session_id"]
        for _ in range(2)
    ]

    # token B creates one
    auth_b = {"Authorization": f"Bearer {ctx['other_token']}"}
    rb = tc.post("/sessions", headers=auth_b, json={"agent": "claude"})
    sid_b = rb.json()["session_id"]

    # token A list shows only its 2
    la = tc.get("/sessions", headers=auth_a)
    assert la.status_code == 200
    body_a = la.json()
    assert body_a["count"] == 2
    listed_a = {s["session_id"] for s in body_a["sessions"]}
    assert listed_a == set(sids_a)
    assert sid_b not in listed_a

    # token B list shows only its 1
    lb = tc.get("/sessions", headers=auth_b)
    assert lb.status_code == 200
    body_b = lb.json()
    assert body_b["count"] == 1
    assert body_b["sessions"][0]["session_id"] == sid_b


# ---- TTL sweeper -----------------------------------------------------------


def _run_sweep_once(server_mod):
    """Run one ``server_mod._sweep_once()`` pass to completion and return its result.

    The tests are synchronous, so no event loop is running when this is
    called. ``asyncio.get_event_loop()`` is deprecated in that situation and
    newer Python releases raise ``RuntimeError`` instead of creating a loop,
    so a private loop is created explicitly and closed afterwards. The
    thread's current-loop setting is left untouched.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(server_mod._sweep_once())
    finally:
        # Always release the loop, even if the sweep raises.
        loop.close()


def test_ttl_sweep_closes_stale(client):
    """A session whose ``created_at`` is older than the TTL is soft-closed by a sweep."""
    tc, ctx = client
    created = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = created.json()["session_id"]

    # Backdate created_at so the row is "stale" relative to the configured TTL
    store = ctx["store"]
    cfg = ctx["cfg"]
    backdated = int(time.time()) - cfg.session_ttl_secs - 60
    with store._conn() as conn:
        conn.execute(
            "UPDATE sessions SET created_at=? WHERE session_id=?",
            (backdated, sid),
        )

    # Run one sweep iteration directly (no need to wait for the task)
    counts = _run_sweep_once(ctx["server"])
    assert counts["soft_closed"] == 1
    row = store.get_session(sid)
    assert row["closed_at"] is not None

    # Audit event recorded
    kinds = [e["event"] for e in store.list_session_events(sid)]
    assert "create" in kinds
    assert "sweep_close" in kinds


def test_ttl_sweep_hard_deletes_old_closed(client):
    """A session closed longer ago than the hard TTL is removed from the store."""
    tc, ctx = client
    created = tc.post(
        "/sessions",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"agent": "claude"},
    )
    sid = created.json()["session_id"]

    store = ctx["store"]
    cfg = ctx["cfg"]
    closed_long_ago = int(time.time()) - cfg.session_hard_ttl_secs - 60
    with store._conn() as conn:
        conn.execute(
            "UPDATE sessions SET closed_at=? WHERE session_id=?",
            (closed_long_ago, sid),
        )

    counts = _run_sweep_once(ctx["server"])
    assert counts["hard_deleted"] == 1
    assert store.get_session(sid) is None


# ---- pool full -------------------------------------------------------------


def test_pool_full_returns_503(client, monkeypatch):
    """With ``max_live_sessions`` patched to 2, the third create answers 503."""
    tc, ctx = client
    auth = {"Authorization": f"Bearer {ctx['app_token']}"}
    monkeypatch.setattr(ctx["fake_acpx"], "max_live_sessions", 2)

    # First two create OK
    for _ in range(2):
        ok_resp = tc.post("/sessions", headers=auth, json={"agent": "claude"})
        assert ok_resp.status_code == 200

    # Third hits the pool cap
    capped = tc.post("/sessions", headers=auth, json={"agent": "claude"})
    assert capped.status_code == 503


# ---- /run regression -------------------------------------------------------


def test_run_endpoint_unchanged(client, monkeypatch):
    """The /run path stays on the bare claude-p subprocess and v0.1 shape."""
    tc, ctx = client

    # We don't actually want to invoke claude. Patch Runner.run to return a
    # canned RunResult that mirrors the v0.1 shape.
    from clawdforge.runner import RunResult

    server_mod = ctx["server"]
    canned = RunResult(
        ok=True,
        result={"hello": "world"},
        raw_stdout='{"type":"result","result":"{\\"hello\\":\\"world\\"}","stop_reason":"end_turn"}',
        raw_stderr="",
        duration_ms=123,
        stop_reason="end_turn",
        error=None,
    )
    monkeypatch.setattr(server_mod.runner, "run", lambda **kw: canned)

    r = tc.post(
        "/run",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"prompt": "Reply with JSON: {\"hello\":\"world\"}", "model": "sonnet"},
    )
    assert r.status_code == 200
    body = r.json()

    # Exact v0.1 shape: ok, result, duration_ms, stop_reason — nothing else added
    assert set(body.keys()) == {"ok", "result", "duration_ms", "stop_reason"}
    assert body["ok"] is True
    assert body["result"] == {"hello": "world"}
    assert body["stop_reason"] == "end_turn"
    assert body["duration_ms"] == 123


def test_run_endpoint_unchanged_error_shape(client, monkeypatch):
    """v0.1's error shape (502 with ok/error/stderr/duration_ms/stop_reason) preserved."""
    tc, ctx = client
    from clawdforge.runner import RunResult

    server_mod = ctx["server"]
    canned = RunResult(
        ok=False,
        result=None,
        raw_stdout="",
        raw_stderr="boom",
        duration_ms=10,
        stop_reason="error",
        error="claude exit 1",
    )
    monkeypatch.setattr(server_mod.runner, "run", lambda **kw: canned)

    r = tc.post(
        "/run",
        headers={"Authorization": f"Bearer {ctx['app_token']}"},
        json={"prompt": "x"},
    )
    assert r.status_code == 502
    body = r.json()
    # Superset check: the error body may carry extra keys beyond these five.
    assert set(body.keys()) >= {"ok", "error", "stderr", "duration_ms", "stop_reason"}
    assert body["ok"] is False


# ---- acpx_runner unit-level helpers ----------------------------------------


def test_extract_stop_reason_from_ndjson():
    """Three NDJSON lines parse to three events and yield stop reason ``end_turn``."""
    from clawdforge.acpx_runner import _extract_stop_reason, _parse_ndjson

    raw = (
        '{"jsonrpc":"2.0","id":"req-1","method":"session/prompt","params":{}}\n'
        '{"jsonrpc":"2.0","method":"session/update","params":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"Hi"}}}\n'
        '{"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}}\n'
    )
    events = _parse_ndjson(raw)
    assert len(events) == 3
    assert _extract_stop_reason(events) == "end_turn"


def test_parse_ndjson_skips_garbage():
    """Non-JSON, empty and whitespace-only lines are dropped from the parsed events."""
    from clawdforge.acpx_runner import _parse_ndjson

    raw = '{"a":1}\nnot json\n{"b":2}\n\n \n{"c":3}\n'
    events = _parse_ndjson(raw)
    assert events == [{"a": 1}, {"b": 2}, {"c": 3}]


def test_format_prompt_with_files():
    """Without files the prompt is returned as-is; with files the paths appear before it."""
    from clawdforge.acpx_runner import _format_prompt_with_files

    plain = _format_prompt_with_files("hello", None)
    assert plain == "hello"

    annotated = _format_prompt_with_files("hello", ["/data/x.txt", "/data/y.txt"])
    assert "/data/x.txt" in annotated and "/data/y.txt" in annotated
    assert annotated.endswith("hello")