"""Tests for the v0.2 Session / TurnResult surface of the clawdforge SDK. Same test framework as test_client.py — `unittest` + `responses` to intercept HTTP. No live network. v0.1 paths covered there are not re-exercised here; the regression test_v01_run_unchanged is a smoke check to catch accidental breakage of the existing /run path while we layered on the session methods. """ from __future__ import annotations import json import unittest from typing import Any import requests import responses from clawdforge import ( Forge, ForgeAPIError, Session, TurnResult, ) BASE_URL = "http://192.168.0.5:8800" TOKEN = "cf_test_token_xxxxxxxx" def _forge() -> Forge: return Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60) def _create_payload(session_id: str = "sess_abc", agent: str = "claude") -> dict[str, Any]: return { "ok": True, "session_id": session_id, "agent": agent, "created_at": 1_700_000_000, "cwd": "/tmp/acpx-sessions/sess_abc", } def _turn_payload( session_id: str = "sess_abc", turn_index: int = 1, events: list[dict] | None = None, stop_reason: str = "end_turn", duration_ms: int = 1234, ) -> dict[str, Any]: return { "ok": True, "session_id": session_id, "turn_index": turn_index, "events": events if events is not None else [{"type": "text", "content": "hi"}], "stop_reason": stop_reason, "duration_ms": duration_ms, } class TestSessionBlock(unittest.TestCase): """Block / context-manager form.""" @responses.activate def test_session_block_creates_and_closes(self) -> None: """`with forge.session() as s:` issues create on enter and close on exit.""" captured: dict[str, Any] = {"requests": []} def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["requests"].append(("POST", "/sessions")) return (200, {}, json.dumps(_create_payload())) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["requests"].append(("DELETE", "/sessions/sess_abc")) return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) with _forge() as f: with f.session(agent="claude") as s: self.assertIsInstance(s, Session) self.assertEqual(s.session_id, "sess_abc") self.assertEqual(s.agent, "claude") self.assertFalse(s.closed) # Outside the block: close was issued self.assertTrue(s.closed) self.assertEqual( captured["requests"], [("POST", "/sessions"), ("DELETE", "/sessions/sess_abc")], ) @responses.activate def test_session_create_sends_agent_and_meta(self) -> None: captured: dict[str, Any] = {} def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["body"] = json.loads(request.body) captured["auth"] = request.headers.get("Authorization") return (200, {}, json.dumps(_create_payload(agent="claude"))) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) with _forge() as f, f.session(agent="claude", meta={"caller": "test"}) as s: self.assertEqual(s.session_id, "sess_abc") self.assertEqual(captured["auth"], f"Bearer {TOKEN}") self.assertEqual(captured["body"], {"agent": "claude", "meta": {"caller": "test"}}) class TestSessionTurn(unittest.TestCase): @responses.activate def test_session_turn_round_trip(self) -> None: """Mock POST /sessions and POST /sessions/{id}/turn — assert TurnResult shape.""" captured: dict[str, Any] = {} def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps(_create_payload())) def turn_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["body"] = json.loads(request.body) return ( 200, {}, json.dumps( _turn_payload( turn_index=2, events=[ {"type": "thinking", "content": "..."}, {"type": "tool_call", "name": "Read", "args": {}}, {"type": "text", "content": "hello"}, ], duration_ms=4321, ) ), ) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=turn_cb ) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) with _forge() as f, f.session(agent="claude") as s: r = s.turn("hello", files=["ff_xyz"], timeout_secs=42) self.assertIsInstance(r, TurnResult) self.assertTrue(r.ok) self.assertEqual(r.session_id, "sess_abc") self.assertEqual(r.turn_index, 2) self.assertEqual(r.stop_reason, "end_turn") self.assertEqual(r.duration_ms, 4321) self.assertEqual(len(r.events), 3) self.assertEqual(captured["body"]["prompt"], "hello") self.assertEqual(captured["body"]["files"], ["ff_xyz"]) self.assertEqual(captured["body"]["timeout_secs"], 42) @responses.activate def test_session_turn_empty_prompt_rejected_locally(self) -> None: responses.add( responses.POST, f"{BASE_URL}/sessions", json=_create_payload(), status=200, ) responses.add( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", json={"ok": True}, status=200, ) with _forge() as f, f.session() as s, self.assertRaises(ValueError): s.turn("") class TestSessionCloseIdempotent(unittest.TestCase): @responses.activate def test_session_close_idempotent(self) -> None: """Calling close() twice — second is a no-op (no second HTTP request).""" captured: dict[str, Any] = {"close_count": 0} def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps(_create_payload())) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["close_count"] += 1 return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) with _forge() as f: s = f.create_session(agent="claude") s.close() self.assertTrue(s.closed) s.close() # second call must NOT issue another DELETE s.close() # ditto self.assertEqual(captured["close_count"], 1) class TestSessionCloseOnException(unittest.TestCase): @responses.activate def test_session_close_on_exception(self) -> None: """Exception raised inside `with` block — session still gets closed.""" captured: dict[str, Any] = {"closed": False} def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps(_create_payload())) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["closed"] = True return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) class Boom(Exception): pass with _forge() as f: with self.assertRaises(Boom): with f.session() as s: self.assertEqual(s.session_id, "sess_abc") raise Boom("kaboom inside the with") self.assertTrue(captured["closed"]) class TestListSessions(unittest.TestCase): @responses.activate def test_list_sessions(self) -> None: rows = [ { "session_id": "sess_a", "app_name": "cauldron", "agent": "claude", "cwd": "/tmp/x", "created_at": 100, "last_turn_at": 200, "turn_count": 3, "closed_at": None, "meta": None, }, { "session_id": "sess_b", "app_name": "cauldron", "agent": "claude", "cwd": "/tmp/y", "created_at": 50, "last_turn_at": None, "turn_count": 0, "closed_at": 75, "meta": {"x": 1}, }, ] responses.add( responses.GET, f"{BASE_URL}/sessions", json={"ok": True, "sessions": rows, "count": len(rows)}, status=200, ) with _forge() as f: out = f.list_sessions() self.assertEqual(len(out), 2) self.assertEqual(out[0]["session_id"], "sess_a") self.assertIsNone(out[0]["closed_at"]) self.assertEqual(out[1]["closed_at"], 75) @responses.activate def test_list_sessions_include_closed_false(self) -> None: captured: dict[str, Any] = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["url"] = request.url return (200, {}, json.dumps({"ok": True, "sessions": [], "count": 0})) responses.add_callback(responses.GET, f"{BASE_URL}/sessions", callback=cb) with _forge() as f: f.list_sessions(include_closed=False) self.assertIn("include_closed=false", captured["url"]) class TestGetSessionState(unittest.TestCase): @responses.activate def test_get_session_state(self) -> None: responses.add( responses.GET, f"{BASE_URL}/sessions/sess_abc", json={ "ok": True, "session_id": "sess_abc", "agent": "claude", "cwd": "/tmp/x", "created_at": 100, "last_turn_at": 200, "turn_count": 5, "closed_at": None, "live": True, "meta": {"caller": "test"}, }, status=200, ) with _forge() as f: state = f.get_session("sess_abc") self.assertEqual(state["session_id"], "sess_abc") self.assertEqual(state["turn_count"], 5) self.assertIsNone(state["closed_at"]) self.assertTrue(state["live"]) def test_get_session_empty_id_rejected(self) -> None: with _forge() as f, self.assertRaises(ValueError): f.get_session("") @responses.activate def test_session_state_method_round_trips(self) -> None: """Session.state() calls through to forge.get_session(self.session_id).""" def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps(_create_payload())) def state_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return ( 200, {}, json.dumps( { "ok": True, "session_id": "sess_abc", "agent": "claude", "cwd": "/tmp/x", "created_at": 100, "last_turn_at": None, "turn_count": 0, "closed_at": None, "live": True, "meta": None, } ), ) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.GET, f"{BASE_URL}/sessions/sess_abc", callback=state_cb ) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) with _forge() as f, f.session() as s: st = s.state() self.assertEqual(st["session_id"], "sess_abc") class TestSession404IsApiError(unittest.TestCase): @responses.activate def test_session_404_is_api_error(self) -> None: """Cross-token session access — server returns 404, SDK surfaces ForgeAPIError(404).""" responses.add( responses.GET, f"{BASE_URL}/sessions/sess_other_token", json={"detail": "session not found"}, status=404, ) with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.get_session("sess_other_token") self.assertEqual(ctx.exception.status_code, 404) @responses.activate def test_session_turn_404_is_api_error(self) -> None: """Same on /sessions/{id}/turn — 404 surfaces as ForgeAPIError(404).""" def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps(_create_payload())) def turn_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (404, {}, json.dumps({"detail": "session not found"})) def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: return (200, {}, json.dumps({"ok": True})) responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb) responses.add_callback( responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=turn_cb ) responses.add_callback( responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb ) with _forge() as f, f.session() as s, self.assertRaises(ForgeAPIError) as ctx: s.turn("hello") self.assertEqual(ctx.exception.status_code, 404) class TestTurnResultText(unittest.TestCase): def test_turn_result_text_concatenates_text_events(self) -> None: """`.text()` concatenates `type=='text'` events' content; ignores others.""" r = TurnResult.from_response( { "ok": True, "session_id": "sess_x", "turn_index": 1, "events": [ {"type": "thinking", "content": "should not appear"}, {"type": "text", "content": "hello "}, {"type": "tool_call", "name": "Read", "args": {}, "result": {}}, {"type": "text", "content": "world"}, # malformed event must not blow up text() {"type": "text"}, # no content "not a dict", # type: ignore[list-item] ], "stop_reason": "end_turn", "duration_ms": 1, } ) self.assertEqual(r.text(), "hello world") def test_turn_result_text_empty_when_no_text_events(self) -> None: r = TurnResult.from_response( { "ok": True, "session_id": "sess_x", "turn_index": 1, "events": [{"type": "tool_call", "name": "X"}], "stop_reason": "end_turn", "duration_ms": 1, } ) self.assertEqual(r.text(), "") class TestV01RunUnchanged(unittest.TestCase): """Regression: existing /run path still works unchanged after v0.2 layer.""" @responses.activate def test_v01_run_unchanged(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": True, "result": {"hello": "world"}, "duration_ms": 100, "stop_reason": "end_turn", }, status=200, ) with _forge() as f: r = f.run(prompt='Reply with JSON: {"hello": "world"}') self.assertTrue(r.ok) self.assertEqual(r.result, {"hello": "world"}) self.assertEqual(r.duration_ms, 100) if __name__ == "__main__": unittest.main()