- Session class wrapping a clawdforge session_id; context-manager auto-close
- forge.session(agent=...) block form (preferred)
- forge.create_session() / forge.list_sessions() / forge.get_session() admin shapes
- TurnResult dataclass with .text() helper concatenating text events
- Idempotent Session.close() — safe in finally / __exit__
- tests/test_sessions.py: 16 tests covering block/manual/idempotency/exception/list/state/text-helper/v0.1-regression
- README "Multi-turn / Sessions (v0.2)" section
- pyproject version 0.1.0 -> 0.2.0; package __version__ matches
Architecture: matches the existing v0.1 client — sync, requests-based,
single Forge-owned requests.Session for connection pooling. Session holds
a back-reference to the Forge for HTTP work (no per-Session HTTP client).
This mirrors how Forge already exposes its own context-manager pattern, so
nothing about the threading/lifecycle story changes for callers.
v0.1 /run path unchanged — 49 existing tests still green, +16 new tests
for v0.2 (target was 9; covered the spec's 9 plus extras for empty-prompt
local validation, include_closed=false param, empty-id ValueError, and
s.state() round-trip).
mypy --strict src/clawdforge/ clean.
Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
479 lines
17 KiB
Python
479 lines
17 KiB
Python
"""Tests for the v0.2 Session / TurnResult surface of the clawdforge SDK.

Same test framework as test_client.py — `unittest` + `responses` to
intercept HTTP. No live network. v0.1 paths covered there are not
re-exercised here; the regression test_v01_run_unchanged is a smoke check
to catch accidental breakage of the existing /run path while we layered on
the session methods.
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import unittest
|
|
from typing import Any
|
|
|
|
import requests
|
|
import responses
|
|
|
|
from clawdforge import (
|
|
Forge,
|
|
ForgeAPIError,
|
|
Session,
|
|
TurnResult,
|
|
)
|
|
|
|
|
|
# Address every mocked endpoint in this module is registered under.
BASE_URL = "http://192.168.0.5:8800"
# Placeholder bearer token handed to the Forge client built by the tests.
TOKEN = "cf_test_token_xxxxxxxx"
|
|
|
|
|
|
def _forge() -> Forge:
    """Build a `Forge` client aimed at BASE_URL with the test TOKEN and a 60 s default timeout."""
    client = Forge(
        base_url=BASE_URL,
        token=TOKEN,
        default_timeout_secs=60,
    )
    return client
|
|
|
|
|
|
def _create_payload(session_id: str = "sess_abc", agent: str = "claude") -> dict[str, Any]:
|
|
return {
|
|
"ok": True,
|
|
"session_id": session_id,
|
|
"agent": agent,
|
|
"created_at": 1_700_000_000,
|
|
"cwd": "/tmp/acpx-sessions/sess_abc",
|
|
}
|
|
|
|
|
|
def _turn_payload(
|
|
session_id: str = "sess_abc",
|
|
turn_index: int = 1,
|
|
events: list[dict] | None = None,
|
|
stop_reason: str = "end_turn",
|
|
duration_ms: int = 1234,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"ok": True,
|
|
"session_id": session_id,
|
|
"turn_index": turn_index,
|
|
"events": events if events is not None else [{"type": "text", "content": "hi"}],
|
|
"stop_reason": stop_reason,
|
|
"duration_ms": duration_ms,
|
|
}
|
|
|
|
|
|
class TestSessionBlock(unittest.TestCase):
    """Context-manager (`with forge.session(...)`) form of the session API."""

    @responses.activate
    def test_session_block_creates_and_closes(self) -> None:
        """Entering the block issues the create call; leaving it issues the close call."""
        call_log: list[tuple[str, str]] = []

        def on_create(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            call_log.append(("POST", "/sessions"))
            return (200, {}, json.dumps(_create_payload()))

        def on_delete(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            call_log.append(("DELETE", "/sessions/sess_abc"))
            return (200, {}, json.dumps({"ok": True}))

        session_url = f"{BASE_URL}/sessions/sess_abc"
        responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=on_create)
        responses.add_callback(responses.DELETE, session_url, callback=on_delete)

        with _forge() as f:
            with f.session(agent="claude") as s:
                self.assertIsInstance(s, Session)
                self.assertEqual(s.session_id, "sess_abc")
                self.assertEqual(s.agent, "claude")
                self.assertFalse(s.closed)
            # The inner block has exited, so the session must report closed.
            self.assertTrue(s.closed)

        # Exactly one create followed by exactly one delete, in that order.
        expected_calls = [("POST", "/sessions"), ("DELETE", "/sessions/sess_abc")]
        self.assertEqual(call_log, expected_calls)

    @responses.activate
    def test_session_create_sends_agent_and_meta(self) -> None:
        """The create request carries the bearer token plus the agent/meta JSON body."""
        seen: dict[str, Any] = {}

        def on_create(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            seen["body"] = json.loads(request.body)
            seen["auth"] = request.headers.get("Authorization")
            return (200, {}, json.dumps(_create_payload(agent="claude")))

        responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=on_create)
        responses.add_callback(
            responses.DELETE,
            f"{BASE_URL}/sessions/sess_abc",
            callback=lambda _request: (200, {}, json.dumps({"ok": True})),
        )

        with _forge() as f, f.session(agent="claude", meta={"caller": "test"}) as s:
            self.assertEqual(s.session_id, "sess_abc")

        self.assertEqual(seen["auth"], f"Bearer {TOKEN}")
        self.assertEqual(seen["body"], {"agent": "claude", "meta": {"caller": "test"}})
|
|
|
|
|
|
class TestSessionTurn(unittest.TestCase):
    """Submitting turns through `Session.turn()`."""

    @responses.activate
    def test_session_turn_round_trip(self) -> None:
        """Mock session create and turn endpoints; check the TurnResult and the sent body."""
        sent: dict[str, Any] = {}
        turn_events = [
            {"type": "thinking", "content": "..."},
            {"type": "tool_call", "name": "Read", "args": {}},
            {"type": "text", "content": "hello"},
        ]

        def on_turn(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            sent["body"] = json.loads(request.body)
            reply = _turn_payload(turn_index=2, events=turn_events, duration_ms=4321)
            return (200, {}, json.dumps(reply))

        responses.add_callback(
            responses.POST,
            f"{BASE_URL}/sessions",
            callback=lambda _request: (200, {}, json.dumps(_create_payload())),
        )
        responses.add_callback(
            responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=on_turn
        )
        responses.add_callback(
            responses.DELETE,
            f"{BASE_URL}/sessions/sess_abc",
            callback=lambda _request: (200, {}, json.dumps({"ok": True})),
        )

        with _forge() as f, f.session(agent="claude") as s:
            r = s.turn("hello", files=["ff_xyz"], timeout_secs=42)

        # Shape of the parsed result.
        self.assertIsInstance(r, TurnResult)
        self.assertTrue(r.ok)
        self.assertEqual(r.session_id, "sess_abc")
        self.assertEqual(r.turn_index, 2)
        self.assertEqual(r.stop_reason, "end_turn")
        self.assertEqual(r.duration_ms, 4321)
        self.assertEqual(len(r.events), 3)

        # What actually went over the wire.
        body = sent["body"]
        self.assertEqual(body["prompt"], "hello")
        self.assertEqual(body["files"], ["ff_xyz"])
        self.assertEqual(body["timeout_secs"], 42)

    @responses.activate
    def test_session_turn_empty_prompt_rejected_locally(self) -> None:
        """An empty prompt raises ValueError; no turn endpoint is registered at all."""
        responses.add(
            responses.POST, f"{BASE_URL}/sessions", json=_create_payload(), status=200
        )
        responses.add(
            responses.DELETE, f"{BASE_URL}/sessions/sess_abc", json={"ok": True}, status=200
        )
        with _forge() as f, f.session() as s, self.assertRaises(ValueError):
            s.turn("")
|
|
|
|
|
|
class TestSessionCloseIdempotent(unittest.TestCase):
    """Repeated `Session.close()` calls."""

    @responses.activate
    def test_session_close_idempotent(self) -> None:
        """Only the first close() reaches the server; later calls send nothing."""
        delete_hits: list[int] = []

        def on_delete(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            delete_hits.append(1)
            return (200, {}, json.dumps({"ok": True}))

        responses.add_callback(
            responses.POST,
            f"{BASE_URL}/sessions",
            callback=lambda _request: (200, {}, json.dumps(_create_payload())),
        )
        responses.add_callback(
            responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=on_delete
        )

        with _forge() as f:
            s = f.create_session(agent="claude")
            s.close()
            self.assertTrue(s.closed)
            # Two further calls: neither may trigger another DELETE.
            s.close()
            s.close()

        self.assertEqual(len(delete_hits), 1)
|
|
|
|
|
|
class TestSessionCloseOnException(unittest.TestCase):
    """Cleanup when the body of the `with` block raises."""

    @responses.activate
    def test_session_close_on_exception(self) -> None:
        """An exception escaping the block still results in the DELETE being sent."""

        class Boom(Exception):
            """Marker exception raised from inside the session block."""

        delete_seen = {"closed": False}

        def on_delete(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            delete_seen["closed"] = True
            return (200, {}, json.dumps({"ok": True}))

        responses.add_callback(
            responses.POST,
            f"{BASE_URL}/sessions",
            callback=lambda _request: (200, {}, json.dumps(_create_payload())),
        )
        responses.add_callback(
            responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=on_delete
        )

        with _forge() as f:
            with self.assertRaises(Boom):
                with f.session() as s:
                    self.assertEqual(s.session_id, "sess_abc")
                    raise Boom("kaboom inside the with")

        self.assertTrue(delete_seen["closed"])
|
|
|
|
|
|
class TestListSessions(unittest.TestCase):
    """`Forge.list_sessions()` against a mocked GET /sessions."""

    @responses.activate
    def test_list_sessions(self) -> None:
        """Rows from the response's "sessions" list are returned in order."""
        open_row = {
            "session_id": "sess_a",
            "app_name": "cauldron",
            "agent": "claude",
            "cwd": "/tmp/x",
            "created_at": 100,
            "last_turn_at": 200,
            "turn_count": 3,
            "closed_at": None,
            "meta": None,
        }
        closed_row = {
            "session_id": "sess_b",
            "app_name": "cauldron",
            "agent": "claude",
            "cwd": "/tmp/y",
            "created_at": 50,
            "last_turn_at": None,
            "turn_count": 0,
            "closed_at": 75,
            "meta": {"x": 1},
        }
        rows = [open_row, closed_row]
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions",
            json={"ok": True, "sessions": rows, "count": len(rows)},
            status=200,
        )

        with _forge() as f:
            out = f.list_sessions()

        self.assertEqual(len(out), 2)
        first, second = out[0], out[1]
        self.assertEqual(first["session_id"], "sess_a")
        self.assertIsNone(first["closed_at"])
        self.assertEqual(second["closed_at"], 75)

    @responses.activate
    def test_list_sessions_include_closed_false(self) -> None:
        """include_closed=False shows up in the request URL as include_closed=false."""
        seen: dict[str, Any] = {}

        def on_list(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]:
            seen["url"] = request.url
            empty_listing = {"ok": True, "sessions": [], "count": 0}
            return (200, {}, json.dumps(empty_listing))

        responses.add_callback(responses.GET, f"{BASE_URL}/sessions", callback=on_list)
        with _forge() as f:
            f.list_sessions(include_closed=False)
        self.assertIn("include_closed=false", seen["url"])
|
|
|
|
|
|
class TestGetSessionState(unittest.TestCase):
    """`Forge.get_session()` and `Session.state()`."""

    @responses.activate
    def test_get_session_state(self) -> None:
        """The state dict exposes the fields from the mocked GET /sessions/{id} body."""
        server_state = {
            "ok": True,
            "session_id": "sess_abc",
            "agent": "claude",
            "cwd": "/tmp/x",
            "created_at": 100,
            "last_turn_at": 200,
            "turn_count": 5,
            "closed_at": None,
            "live": True,
            "meta": {"caller": "test"},
        }
        responses.add(
            responses.GET, f"{BASE_URL}/sessions/sess_abc", json=server_state, status=200
        )
        with _forge() as f:
            state = f.get_session("sess_abc")
        self.assertEqual(state["session_id"], "sess_abc")
        self.assertEqual(state["turn_count"], 5)
        self.assertIsNone(state["closed_at"])
        self.assertTrue(state["live"])

    def test_get_session_empty_id_rejected(self) -> None:
        """An empty session id raises ValueError (no HTTP mock is active for this test)."""
        with _forge() as f, self.assertRaises(ValueError):
            f.get_session("")

    @responses.activate
    def test_session_state_method_round_trips(self) -> None:
        """Session.state() returns the state served for the session's own id."""
        server_state = {
            "ok": True,
            "session_id": "sess_abc",
            "agent": "claude",
            "cwd": "/tmp/x",
            "created_at": 100,
            "last_turn_at": None,
            "turn_count": 0,
            "closed_at": None,
            "live": True,
            "meta": None,
        }
        session_url = f"{BASE_URL}/sessions/sess_abc"

        responses.add_callback(
            responses.POST,
            f"{BASE_URL}/sessions",
            callback=lambda _request: (200, {}, json.dumps(_create_payload())),
        )
        responses.add_callback(
            responses.GET,
            session_url,
            callback=lambda _request: (200, {}, json.dumps(server_state)),
        )
        responses.add_callback(
            responses.DELETE,
            session_url,
            callback=lambda _request: (200, {}, json.dumps({"ok": True})),
        )

        with _forge() as f, f.session() as s:
            st = s.state()
            self.assertEqual(st["session_id"], "sess_abc")
|
|
|
|
|
|
class TestSession404IsApiError(unittest.TestCase):
    """HTTP 404 responses on session endpoints surface as ForgeAPIError."""

    @responses.activate
    def test_session_404_is_api_error(self) -> None:
        """Cross-token session access: a 404 on GET becomes ForgeAPIError with status 404."""
        not_found = {"detail": "session not found"}
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions/sess_other_token",
            json=not_found,
            status=404,
        )
        with _forge() as f, self.assertRaises(ForgeAPIError) as ctx:
            f.get_session("sess_other_token")
        self.assertEqual(ctx.exception.status_code, 404)

    @responses.activate
    def test_session_turn_404_is_api_error(self) -> None:
        """A 404 from the turn endpoint becomes ForgeAPIError with status 404 as well."""
        session_url = f"{BASE_URL}/sessions/sess_abc"

        responses.add_callback(
            responses.POST,
            f"{BASE_URL}/sessions",
            callback=lambda _request: (200, {}, json.dumps(_create_payload())),
        )
        responses.add_callback(
            responses.POST,
            f"{session_url}/turn",
            callback=lambda _request: (404, {}, json.dumps({"detail": "session not found"})),
        )
        responses.add_callback(
            responses.DELETE,
            session_url,
            callback=lambda _request: (200, {}, json.dumps({"ok": True})),
        )

        with _forge() as f, f.session() as s, self.assertRaises(ForgeAPIError) as ctx:
            s.turn("hello")
        self.assertEqual(ctx.exception.status_code, 404)
|
|
|
|
|
|
class TestTurnResultText(unittest.TestCase):
    """`TurnResult.text()` helper."""

    def test_turn_result_text_concatenates_text_events(self) -> None:
        """Only `type == "text"` events contribute; other and malformed entries are skipped."""
        mixed_events = [
            {"type": "thinking", "content": "should not appear"},
            {"type": "text", "content": "hello "},
            {"type": "tool_call", "name": "Read", "args": {}, "result": {}},
            {"type": "text", "content": "world"},
            # Malformed entries below must not make text() raise.
            {"type": "text"},  # no content
            "not a dict",  # type: ignore[list-item]
        ]
        response_body = {
            "ok": True,
            "session_id": "sess_x",
            "turn_index": 1,
            "events": mixed_events,
            "stop_reason": "end_turn",
            "duration_ms": 1,
        }
        r = TurnResult.from_response(response_body)
        self.assertEqual(r.text(), "hello world")

    def test_turn_result_text_empty_when_no_text_events(self) -> None:
        """With no text events at all, text() yields the empty string."""
        response_body = {
            "ok": True,
            "session_id": "sess_x",
            "turn_index": 1,
            "events": [{"type": "tool_call", "name": "X"}],
            "stop_reason": "end_turn",
            "duration_ms": 1,
        }
        r = TurnResult.from_response(response_body)
        self.assertEqual(r.text(), "")
|
|
|
|
|
|
class TestV01RunUnchanged(unittest.TestCase):
    """Regression: existing /run path still works unchanged after v0.2 layer."""

    @responses.activate
    def test_v01_run_unchanged(self) -> None:
        """Smoke check: a mocked POST /run still parses into ok/result/duration_ms."""
        run_reply = {
            "ok": True,
            "result": {"hello": "world"},
            "duration_ms": 100,
            "stop_reason": "end_turn",
        }
        responses.add(responses.POST, f"{BASE_URL}/run", json=run_reply, status=200)

        with _forge() as f:
            r = f.run(prompt='Reply with JSON: {"hello": "world"}')

        self.assertTrue(r.ok)
        self.assertEqual(r.result, {"hello": "world"})
        self.assertEqual(r.duration_ms, 100)
|
|
|
|
|
|
# Allow running this module directly as a script, in addition to via a test runner.
if __name__ == "__main__":
    unittest.main()
|