clawdforge/clients/mcp/tests/test_sessions.py
Kayos 5737903217 clients/mcp: v0.2 multi-turn session tools
- clawdforge_session_new / _turn / _close / _list / _get
- Wraps the v0.2 session HTTP surface (POST /sessions, POST
  /sessions/{id}/turn, GET/DELETE /sessions/{id}, GET /sessions)
- Tool descriptions tuned for LLM consumption: when to prefer session_new
  vs run, idempotency contract on close, file_token attachment via files[]
- session_turn returns two content blocks: prose text (concat'd text events)
  for direct LLM consumption + structured trace JSON (turn_index,
  stop_reason, duration_ms, events) for tool-calling agents
- 404/410/auth errors from upstream surface as MCP errors with actionable
  messages; no Python tracebacks leak through
- tests/test_sessions.py: 22 new tests covering the 5 tools + 404 + schema
  validation + idempotent close
- tests/test_server.py: new v0.1 schema-pin regression test
- README "Sessions (v0.2)" section with example open/turn/turn/close chain
- Bump version 0.1.0 -> 0.2.0

v0.1 tools (clawdforge_healthz / _run / _upload_file) are byte-identical.

Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
2026-04-29 06:49:49 -07:00

553 lines
18 KiB
Python

"""Tests for the v0.2 multi-turn session tools.
Mirrors the structure of test_server.py — HTTP layer mocked via
``responses``, the dispatch function exercised directly.
Covers:
- ``clawdforge_session_new`` happy path → returns session_id
- ``clawdforge_session_turn`` round-trip → text content present in block 1,
structured events in block 2
- ``clawdforge_session_close`` idempotency (already_closed=True surfaces)
- ``clawdforge_session_list`` returns array of sessions
- ``clawdforge_session_get`` returns state
- 404 from /sessions/{id}/turn surfaces as MCP error with actionable text
The v0.1 tools' regression coverage lives in ``test_server.py`` and stays
intact; this file is purely additive.
"""
from __future__ import annotations
import asyncio
import json
import unittest
import jsonschema
import responses
from clawdforge_mcp.client import ForgeClient
from clawdforge_mcp.server import (
TOOL_SESSION_CLOSE,
TOOL_SESSION_GET,
TOOL_SESSION_LIST,
TOOL_SESSION_NEW,
TOOL_SESSION_TURN,
_dispatch,
_tool_definitions,
)
# Base URL handed to ForgeClient; every mocked endpoint in this module is
# registered under this prefix.
BASE_URL = "http://192.168.0.5:8800"
# Placeholder token passed to ForgeClient by _client().
TOKEN = "cf_test_token_xxxxxxxx"
# Session id used in mocked URLs, request arguments and response payloads.
SID = "01HV9P1234567890ABCDEFGHJK" # ULID-ish; server returns whatever acpx hands back
def _client() -> ForgeClient:
    """Build a ForgeClient pointed at BASE_URL with the test token.

    The caller owns the returned client and is expected to ``close()`` it.
    """
    settings = {
        "base_url": BASE_URL,
        "token": TOKEN,
        "default_timeout_secs": 10,
    }
    return ForgeClient(**settings)
def _run(coro):
    """Run *coro* to completion via ``asyncio.run`` and return its result."""
    outcome = asyncio.run(coro)
    return outcome
class TestSessionNewDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_new`` tool."""

    @responses.activate
    def test_session_new_returns_session_id(self) -> None:
        """A 200 from POST /sessions yields one JSON block with the session."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/sessions",
            json={
                "ok": True,
                "session_id": SID,
                "agent": "claude",
                "created_at": 1714329600,
                "cwd": "/tmp/acpx-sessions/01HV.../cwd",
            },
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_SESSION_NEW, {"agent": "claude"})
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        self.assertEqual(len(content), 1)
        body = json.loads(content[0].text)
        self.assertEqual(body["session_id"], SID)
        self.assertEqual(body["agent"], "claude")
        self.assertEqual(body["created_at"], 1714329600)
        # Whitelist: only the four useful fields are surfaced.
        self.assertEqual(
            set(body.keys()), {"session_id", "agent", "created_at", "cwd"}
        )

    @responses.activate
    def test_session_new_with_meta_passes_through(self) -> None:
        """``agent`` and ``meta`` arguments reach the POST body unchanged."""
        captured: dict = {}

        def cb(request):
            # Record the outgoing JSON body so it can be asserted on below.
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps(
                    {
                        "ok": True,
                        "session_id": SID,
                        "agent": "claude",
                        "created_at": 1,
                    }
                ),
            )

        responses.add_callback(
            responses.POST, f"{BASE_URL}/sessions", callback=cb
        )
        forge = _client()
        try:
            # Only the error flag matters here; the content is not inspected.
            _, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_SESSION_NEW,
                    {"agent": "claude", "meta": {"task": "debug"}},
                )
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        self.assertEqual(captured["body"]["agent"], "claude")
        self.assertEqual(captured["body"]["meta"], {"task": "debug"})

    # No endpoint is registered: with ``responses`` active, a request that
    # slipped past the argument guard is intercepted instead of being sent
    # to BASE_URL over the real network.
    @responses.activate
    def test_session_new_rejects_non_dict_meta(self) -> None:
        """A non-dict ``meta`` is reported as an error that names the field."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_SESSION_NEW, {"meta": "not-a-dict"})
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("meta", content[0].text)
class TestSessionTurnDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_turn`` tool."""

    @responses.activate
    def test_session_turn_round_trip(self) -> None:
        """The turn returns two content blocks: prose text + structured trace."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/sessions/{SID}/turn",
            json={
                "ok": True,
                "session_id": SID,
                "turn_index": 1,
                "events": [
                    {"type": "thinking", "content": "let me look..."},
                    {"type": "text", "content": "Hello back. "},
                    {"type": "text", "content": "Anything else?"},
                ],
                "stop_reason": "end_turn",
                "duration_ms": 4321,
            },
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_SESSION_TURN,
                    {"session_id": SID, "prompt": "say hi"},
                )
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        self.assertEqual(len(content), 2)
        # Block 1 is the prose — concatenation of text events.
        self.assertEqual(content[0].text, "Hello back. Anything else?")
        # Block 2 is the structured trace as JSON.
        trace = json.loads(content[1].text)
        self.assertEqual(trace["session_id"], SID)
        self.assertEqual(trace["turn_index"], 1)
        self.assertEqual(trace["stop_reason"], "end_turn")
        self.assertEqual(trace["duration_ms"], 4321)
        self.assertEqual(len(trace["events"]), 3)
        # Thinking events ARE preserved in the trace block (callers may
        # want to introspect tool-calling behavior).
        self.assertEqual(trace["events"][0]["type"], "thinking")

    @responses.activate
    def test_session_turn_with_files_and_timeout_passes_through(self) -> None:
        """``files`` and ``timeout_secs`` reach the POST body unchanged."""
        captured: dict = {}

        def cb(request):
            # Record the outgoing JSON body so it can be asserted on below.
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps(
                    {
                        "ok": True,
                        "session_id": SID,
                        "turn_index": 2,
                        "events": [{"type": "text", "content": "ok"}],
                        "stop_reason": "end_turn",
                        "duration_ms": 1,
                    }
                ),
            )

        responses.add_callback(
            responses.POST, f"{BASE_URL}/sessions/{SID}/turn", callback=cb
        )
        forge = _client()
        try:
            # Only the error flag matters here; the content is not inspected.
            _, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_SESSION_TURN,
                    {
                        "session_id": SID,
                        "prompt": "summarize",
                        "files": ["ff_aaa", "ff_bbb"],
                        "timeout_secs": 90,
                    },
                )
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        self.assertEqual(captured["body"]["files"], ["ff_aaa", "ff_bbb"])
        self.assertEqual(captured["body"]["timeout_secs"], 90)

    # The three argument-validation tests below register no endpoint. Keeping
    # ``responses`` active means a request that slipped past a regressed guard
    # is intercepted instead of being sent to BASE_URL over the real network.
    @responses.activate
    def test_session_turn_rejects_missing_session_id(self) -> None:
        """Omitting ``session_id`` is an error that names the field."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_SESSION_TURN, {"prompt": "hi"})
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("session_id", content[0].text)

    @responses.activate
    def test_session_turn_rejects_empty_prompt(self) -> None:
        """An empty ``prompt`` is an error whose text mentions the prompt."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_SESSION_TURN,
                    {"session_id": SID, "prompt": ""},
                )
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("prompt", content[0].text.lower())

    @responses.activate
    def test_session_turn_strict_bool_guard_timeout_secs(self) -> None:
        """bool is a subclass of int — must not slip past the runtime guard."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_SESSION_TURN,
                    {"session_id": SID, "prompt": "hi", "timeout_secs": True},
                )
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("timeout_secs", content[0].text)

    @responses.activate
    def test_session_turn_404_surfaces_as_mcp_error(self) -> None:
        """404 from upstream → isError=True with actionable message."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/sessions/{SID}/turn",
            json={"detail": "session not found"},
            status=404,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_SESSION_TURN,
                    {"session_id": SID, "prompt": "hi"},
                )
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("404", content[0].text)
        self.assertIn("session not found", content[0].text)
        # Defense-in-depth: no Python traceback leaks through.
        self.assertNotIn("Traceback", content[0].text)
class TestSessionCloseDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_close`` tool."""

    @responses.activate
    def test_session_close_first_call(self) -> None:
        """A plain ``{ok: true}`` DELETE response is surfaced without extras."""
        responses.add(
            responses.DELETE,
            f"{BASE_URL}/sessions/{SID}",
            json={"ok": True},
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_SESSION_CLOSE, {"session_id": SID})
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertTrue(body["ok"])
        # First close: server doesn't set already_closed, so we don't either.
        self.assertNotIn("already_closed", body)

    @responses.activate
    def test_session_close_idempotent_second_call(self) -> None:
        """Server returns {ok: true, already_closed: true} on a re-close —
        we surface that flag verbatim. The MCP tool doesn't error on
        re-close (it's the documented contract for idempotency)."""
        responses.add(
            responses.DELETE,
            f"{BASE_URL}/sessions/{SID}",
            json={"ok": True, "already_closed": True},
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_SESSION_CLOSE, {"session_id": SID})
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertTrue(body["ok"])
        self.assertTrue(body["already_closed"])

    # No endpoint is registered: with ``responses`` active, a request that
    # slipped past the argument guard is intercepted instead of being sent
    # to BASE_URL over the real network.
    @responses.activate
    def test_session_close_rejects_missing_session_id(self) -> None:
        """Omitting ``session_id`` is an error that names the field."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_SESSION_CLOSE, {})
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("session_id", content[0].text)
class TestSessionListDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_list`` tool."""

    @responses.activate
    def test_session_list_returns_array(self) -> None:
        """GET /sessions is surfaced as JSON with ``sessions`` and ``count``."""
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions",
            json={
                "ok": True,
                "sessions": [
                    {
                        "session_id": SID,
                        "agent": "claude",
                        "app_name": "mcp-test",
                        "created_at": 1714329600,
                        "last_turn_at": 1714329700,
                        "turn_count": 3,
                        "closed_at": None,
                    },
                    {
                        "session_id": "older-session-id",
                        "agent": "claude",
                        "app_name": "mcp-test",
                        "created_at": 1714329000,
                        "last_turn_at": 1714329100,
                        "turn_count": 1,
                        "closed_at": 1714329500,
                    },
                ],
                "count": 2,
            },
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(_dispatch(forge, TOOL_SESSION_LIST, {}))
        finally:
            forge.close()
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertIsInstance(body["sessions"], list)
        self.assertEqual(body["count"], 2)
        self.assertEqual(body["sessions"][0]["session_id"], SID)
        self.assertEqual(body["sessions"][1]["closed_at"], 1714329500)

    @responses.activate
    def test_session_list_include_closed_false_passes_query_param(self) -> None:
        """include_closed=false must turn into ?include_closed=false on the wire."""
        captured: dict = {}

        def cb(request):
            # Record the full request URL, query string included.
            captured["url"] = request.url
            return (
                200,
                {},
                json.dumps({"ok": True, "sessions": [], "count": 0}),
            )

        responses.add_callback(
            responses.GET, f"{BASE_URL}/sessions", callback=cb
        )
        forge = _client()
        try:
            # Only the error flag matters here; the content is not inspected.
            _, is_error = _run(
                _dispatch(
                    forge, TOOL_SESSION_LIST, {"include_closed": False}
                )
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        self.assertIn("include_closed=false", captured["url"])

    # No endpoint is registered: with ``responses`` active, a request that
    # slipped past the argument guard is intercepted instead of being sent
    # to BASE_URL over the real network.
    @responses.activate
    def test_session_list_rejects_non_bool_include_closed(self) -> None:
        """A string ``include_closed`` is an error that names the field."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge, TOOL_SESSION_LIST, {"include_closed": "false"}
                )
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("include_closed", content[0].text)
class TestSessionGetDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_get`` tool."""

    @responses.activate
    def test_session_get_returns_state(self) -> None:
        """A 200 from GET /sessions/{id} is surfaced as a JSON state object."""
        upstream_state = {
            "ok": True,
            "session_id": SID,
            "agent": "claude",
            "cwd": "/tmp/acpx/cwd",
            "created_at": 1714329600,
            "last_turn_at": 1714329700,
            "turn_count": 5,
            "closed_at": None,
            "live": True,
            "meta": {"task": "debug"},
        }
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions/{SID}",
            json=upstream_state,
            status=200,
        )

        client = _client()
        try:
            call = _dispatch(client, TOOL_SESSION_GET, {"session_id": SID})
            blocks, failed = _run(call)
        finally:
            client.close()

        self.assertFalse(failed)
        state = json.loads(blocks[0].text)
        self.assertEqual(state["session_id"], SID)
        self.assertEqual(state["turn_count"], 5)
        self.assertTrue(state["live"])
        self.assertEqual(state["meta"], {"task": "debug"})
        # Whitelist: nine known keys, nothing else.
        allowed_keys = {
            "session_id",
            "agent",
            "cwd",
            "created_at",
            "last_turn_at",
            "turn_count",
            "closed_at",
            "live",
            "meta",
        }
        self.assertEqual(set(state.keys()), allowed_keys)

    @responses.activate
    def test_session_get_404_surfaces_as_mcp_error(self) -> None:
        """An upstream 404 becomes an error block carrying status and detail."""
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions/{SID}",
            json={"detail": "session not found"},
            status=404,
        )

        client = _client()
        try:
            call = _dispatch(client, TOOL_SESSION_GET, {"session_id": SID})
            blocks, failed = _run(call)
        finally:
            client.close()

        self.assertTrue(failed)
        message = blocks[0].text
        self.assertIn("404", message)
        self.assertIn("session not found", message)
class TestSessionToolSchemas(unittest.TestCase):
    """Schema-level checks — the MCP client validates input before dispatch."""

    def _tool(self, name: str):
        """Return the first tool definition whose ``name`` equals *name*."""
        matching = (tool for tool in _tool_definitions() if tool.name == name)
        return next(matching)

    def test_session_new_schema_optional_fields(self) -> None:
        """session_new has no required fields and forbids unknown ones."""
        schema = self._tool(TOOL_SESSION_NEW).inputSchema
        # No required fields — agent defaults to claude, meta is optional.
        self.assertNotIn("required", schema)
        self.assertFalse(schema.get("additionalProperties", True))
        # Empty input is valid, as are the optional fields.
        accepted_inputs = (
            {},
            {"agent": "claude"},
            {"agent": "claude", "meta": {"k": "v"}},
        )
        for candidate in accepted_inputs:
            jsonschema.validate(candidate, schema)

    def test_session_turn_schema_requires_session_id_and_prompt(self) -> None:
        """session_turn requires both fields and pins the file-token pattern."""
        schema = self._tool(TOOL_SESSION_TURN).inputSchema
        self.assertEqual(sorted(schema["required"]), ["prompt", "session_id"])
        # files items still pinned to the v0.1 ff_-prefixed token pattern.
        token_pattern = schema["properties"]["files"]["items"]["pattern"]
        self.assertEqual(token_pattern, r"^ff_[A-Za-z0-9_-]+$")
        # First input lacks session_id, second lacks prompt.
        incomplete_inputs = ({"prompt": "hi"}, {"session_id": SID})
        for candidate in incomplete_inputs:
            with self.assertRaises(jsonschema.ValidationError):
                jsonschema.validate(candidate, schema)

    def test_session_close_schema_requires_session_id(self) -> None:
        """session_close requires exactly ``session_id``."""
        schema = self._tool(TOOL_SESSION_CLOSE).inputSchema
        self.assertEqual(schema["required"], ["session_id"])
        with self.assertRaises(jsonschema.ValidationError):
            jsonschema.validate({}, schema)

    def test_session_list_schema_no_required_fields(self) -> None:
        """session_list accepts empty input and a boolean ``include_closed``."""
        schema = self._tool(TOOL_SESSION_LIST).inputSchema
        self.assertNotIn("required", schema)
        for candidate in ({}, {"include_closed": False}):
            jsonschema.validate(candidate, schema)

    def test_session_get_schema_requires_session_id(self) -> None:
        """session_get requires exactly ``session_id``."""
        schema = self._tool(TOOL_SESSION_GET).inputSchema
        self.assertEqual(schema["required"], ["session_id"])
        with self.assertRaises(jsonschema.ValidationError):
            jsonschema.validate({}, schema)
# Allow running this module directly as a script.
if __name__ == "__main__":  # pragma: no cover
    unittest.main()