- clawdforge_session_new / _turn / _close / _list / _get
- Wraps the v0.2 session HTTP surface (POST /sessions, POST
/sessions/{id}/turn, GET/DELETE /sessions/{id}, GET /sessions)
- Tool descriptions tuned for LLM consumption: when to prefer session_new
vs run, idempotency contract on close, file_token attachment via files[]
- session_turn returns two content blocks: prose text (concat'd text events)
for direct LLM consumption + structured trace JSON (turn_index,
stop_reason, duration_ms, events) for tool-calling agents
- 404/410/auth errors from upstream surface as MCP errors with actionable
messages; no Python tracebacks leak through
- tests/test_sessions.py: 22 new tests covering the 5 tools + 404 + schema
validation + idempotent close
- tests/test_server.py: new v0.1 schema-pin regression test
- README "Sessions (v0.2)" section with example open/turn/turn/close chain
- Bump version 0.1.0 -> 0.2.0
v0.1 tools (clawdforge_healthz / _run / _upload_file) are byte-identical.
Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
553 lines
18 KiB
Python
553 lines
18 KiB
Python
"""Tests for the v0.2 multi-turn session tools.

Mirrors the structure of test_server.py — HTTP layer mocked via
``responses``, the dispatch function exercised directly.

Covers:

- ``clawdforge_session_new`` happy path → returns session_id
- ``clawdforge_session_turn`` round-trip → text content present in block 1,
  structured events in block 2
- ``clawdforge_session_close`` idempotency (already_closed=True surfaces)
- ``clawdforge_session_list`` returns array of sessions
- ``clawdforge_session_get`` returns state
- 404 from /sessions/{id}/turn surfaces as MCP error with actionable text

The v0.1 tools' regression coverage lives in ``test_server.py`` and stays
intact; this file is purely additive.
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import unittest
|
|
|
|
import jsonschema
|
|
import responses
|
|
|
|
from clawdforge_mcp.client import ForgeClient
|
|
from clawdforge_mcp.server import (
|
|
TOOL_SESSION_CLOSE,
|
|
TOOL_SESSION_GET,
|
|
TOOL_SESSION_LIST,
|
|
TOOL_SESSION_NEW,
|
|
TOOL_SESSION_TURN,
|
|
_dispatch,
|
|
_tool_definitions,
|
|
)
|
|
|
|
|
|
# Base URL under which the mocked forge endpoints are registered.
BASE_URL = "http://192.168.0.5:8800"

# Dummy token handed to ForgeClient by the tests.
TOKEN = "cf_test_token_xxxxxxxx"

# ULID-ish; server returns whatever acpx hands back
SID = "01HV9P1234567890ABCDEFGHJK"
|
|
|
|
|
|
def _client() -> ForgeClient:
    """Build a ForgeClient pointed at the test base URL with the test token.

    The caller is responsible for calling ``close()`` on the returned client.
    """
    forge = ForgeClient(
        base_url=BASE_URL,
        token=TOKEN,
        default_timeout_secs=10,
    )
    return forge
|
|
|
|
|
|
def _run(coro):
    """Drive the coroutine *coro* to completion and return its result.

    Lets the synchronous ``unittest`` test methods call the async dispatch
    function.
    """
    outcome = asyncio.run(coro)
    return outcome
|
|
|
|
|
|
class TestSessionNewDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_new`` tool.

    Every test runs under ``responses`` so nothing can reach the real
    network: the happy-path tests register a mocked ``POST /sessions``,
    and the validation test asserts that no HTTP call was attempted.
    """

    def _dispatch_new(self, arguments: dict):
        """Dispatch TOOL_SESSION_NEW with *arguments*.

        Returns the ``(content, is_error)`` pair from ``_dispatch``. The
        client is closed even when the dispatch raises.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, TOOL_SESSION_NEW, arguments))
        finally:
            forge.close()

    @responses.activate
    def test_session_new_returns_session_id(self) -> None:
        """Happy path: the tool output carries the new session's fields."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/sessions",
            json={
                "ok": True,
                "session_id": SID,
                "agent": "claude",
                "created_at": 1714329600,
                "cwd": "/tmp/acpx-sessions/01HV.../cwd",
            },
            status=200,
        )
        content, is_error = self._dispatch_new({"agent": "claude"})
        self.assertFalse(is_error)
        self.assertEqual(len(content), 1)
        body = json.loads(content[0].text)
        self.assertEqual(body["session_id"], SID)
        self.assertEqual(body["agent"], "claude")
        self.assertEqual(body["created_at"], 1714329600)
        # Whitelist: only the four useful fields are surfaced.
        self.assertEqual(
            set(body.keys()), {"session_id", "agent", "created_at", "cwd"}
        )

    @responses.activate
    def test_session_new_with_meta_passes_through(self) -> None:
        """``agent`` and ``meta`` arrive unchanged in the upstream request body."""
        captured: dict = {}

        def cb(request):
            # Record the JSON body the client actually sent.
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps(
                    {
                        "ok": True,
                        "session_id": SID,
                        "agent": "claude",
                        "created_at": 1,
                    }
                ),
            )

        responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=cb)
        _, is_error = self._dispatch_new(
            {"agent": "claude", "meta": {"task": "debug"}}
        )
        self.assertFalse(is_error)
        self.assertEqual(captured["body"]["agent"], "claude")
        self.assertEqual(captured["body"]["meta"], {"task": "debug"})

    @responses.activate
    def test_session_new_rejects_non_dict_meta(self) -> None:
        """A non-dict ``meta`` is rejected before any HTTP request is made."""
        content, is_error = self._dispatch_new({"meta": "not-a-dict"})
        self.assertTrue(is_error)
        self.assertIn("meta", content[0].text)
        # No mock is registered: the guard must fire before the HTTP layer.
        self.assertEqual(len(responses.calls), 0)
|
|
|
|
|
|
class TestSessionTurnDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_turn`` tool.

    Every test runs under ``responses`` so nothing can reach the real
    network: round-trip and error tests register a mocked
    ``POST /sessions/{id}/turn``, and the validation tests assert that no
    HTTP call was attempted.
    """

    def _dispatch_turn(self, arguments: dict):
        """Dispatch TOOL_SESSION_TURN with *arguments*.

        Returns the ``(content, is_error)`` pair from ``_dispatch``. The
        client is closed even when the dispatch raises.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, TOOL_SESSION_TURN, arguments))
        finally:
            forge.close()

    @responses.activate
    def test_session_turn_round_trip(self) -> None:
        """The turn returns two content blocks: prose text + structured trace."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/sessions/{SID}/turn",
            json={
                "ok": True,
                "session_id": SID,
                "turn_index": 1,
                "events": [
                    {"type": "thinking", "content": "let me look..."},
                    {"type": "text", "content": "Hello back. "},
                    {"type": "text", "content": "Anything else?"},
                ],
                "stop_reason": "end_turn",
                "duration_ms": 4321,
            },
            status=200,
        )
        content, is_error = self._dispatch_turn(
            {"session_id": SID, "prompt": "say hi"}
        )
        self.assertFalse(is_error)
        self.assertEqual(len(content), 2)
        # Block 1 is the prose — concatenation of text events.
        self.assertEqual(content[0].text, "Hello back. Anything else?")
        # Block 2 is the structured trace as JSON.
        trace = json.loads(content[1].text)
        self.assertEqual(trace["session_id"], SID)
        self.assertEqual(trace["turn_index"], 1)
        self.assertEqual(trace["stop_reason"], "end_turn")
        self.assertEqual(trace["duration_ms"], 4321)
        self.assertEqual(len(trace["events"]), 3)
        # Thinking events ARE preserved in the trace block (callers may
        # want to introspect tool-calling behavior).
        self.assertEqual(trace["events"][0]["type"], "thinking")

    @responses.activate
    def test_session_turn_with_files_and_timeout_passes_through(self) -> None:
        """``files`` and ``timeout_secs`` arrive unchanged in the request body."""
        captured: dict = {}

        def cb(request):
            # Record the JSON body the client actually sent.
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps(
                    {
                        "ok": True,
                        "session_id": SID,
                        "turn_index": 2,
                        "events": [{"type": "text", "content": "ok"}],
                        "stop_reason": "end_turn",
                        "duration_ms": 1,
                    }
                ),
            )

        responses.add_callback(
            responses.POST, f"{BASE_URL}/sessions/{SID}/turn", callback=cb
        )
        _, is_error = self._dispatch_turn(
            {
                "session_id": SID,
                "prompt": "summarize",
                "files": ["ff_aaa", "ff_bbb"],
                "timeout_secs": 90,
            }
        )
        self.assertFalse(is_error)
        self.assertEqual(captured["body"]["files"], ["ff_aaa", "ff_bbb"])
        self.assertEqual(captured["body"]["timeout_secs"], 90)

    @responses.activate
    def test_session_turn_rejects_missing_session_id(self) -> None:
        """A missing ``session_id`` is rejected before any HTTP request."""
        content, is_error = self._dispatch_turn({"prompt": "hi"})
        self.assertTrue(is_error)
        self.assertIn("session_id", content[0].text)
        # No mock is registered: the guard must fire before the HTTP layer.
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_session_turn_rejects_empty_prompt(self) -> None:
        """An empty ``prompt`` is rejected before any HTTP request."""
        content, is_error = self._dispatch_turn({"session_id": SID, "prompt": ""})
        self.assertTrue(is_error)
        self.assertIn("prompt", content[0].text.lower())
        # No mock is registered: the guard must fire before the HTTP layer.
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_session_turn_strict_bool_guard_timeout_secs(self) -> None:
        """bool is a subclass of int — must not slip past the runtime guard."""
        content, is_error = self._dispatch_turn(
            {"session_id": SID, "prompt": "hi", "timeout_secs": True}
        )
        self.assertTrue(is_error)
        self.assertIn("timeout_secs", content[0].text)
        # No mock is registered: the guard must fire before the HTTP layer.
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_session_turn_404_surfaces_as_mcp_error(self) -> None:
        """404 from upstream → isError=True with actionable message."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/sessions/{SID}/turn",
            json={"detail": "session not found"},
            status=404,
        )
        content, is_error = self._dispatch_turn({"session_id": SID, "prompt": "hi"})
        self.assertTrue(is_error)
        self.assertIn("404", content[0].text)
        self.assertIn("session not found", content[0].text)
        # Defense-in-depth: no Python traceback leaks through.
        self.assertNotIn("Traceback", content[0].text)
|
|
|
|
|
|
class TestSessionCloseDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_close`` tool.

    Every test runs under ``responses`` so nothing can reach the real
    network: the close tests register a mocked ``DELETE /sessions/{id}``,
    and the validation test asserts that no HTTP call was attempted.
    """

    def _dispatch_close(self, arguments: dict):
        """Dispatch TOOL_SESSION_CLOSE with *arguments*.

        Returns the ``(content, is_error)`` pair from ``_dispatch``. The
        client is closed even when the dispatch raises.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, TOOL_SESSION_CLOSE, arguments))
        finally:
            forge.close()

    @responses.activate
    def test_session_close_first_call(self) -> None:
        """A plain ``{ok: true}`` reply is surfaced without ``already_closed``."""
        responses.add(
            responses.DELETE,
            f"{BASE_URL}/sessions/{SID}",
            json={"ok": True},
            status=200,
        )
        content, is_error = self._dispatch_close({"session_id": SID})
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertTrue(body["ok"])
        # First close: server doesn't set already_closed, so we don't either.
        self.assertNotIn("already_closed", body)

    @responses.activate
    def test_session_close_idempotent_second_call(self) -> None:
        """Server returns {ok: true, already_closed: true} on a re-close —
        we surface that flag verbatim. The MCP tool doesn't error on
        re-close (it's the documented contract for idempotency)."""
        responses.add(
            responses.DELETE,
            f"{BASE_URL}/sessions/{SID}",
            json={"ok": True, "already_closed": True},
            status=200,
        )
        content, is_error = self._dispatch_close({"session_id": SID})
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertTrue(body["ok"])
        self.assertTrue(body["already_closed"])

    @responses.activate
    def test_session_close_rejects_missing_session_id(self) -> None:
        """A missing ``session_id`` is rejected before any HTTP request."""
        content, is_error = self._dispatch_close({})
        self.assertTrue(is_error)
        self.assertIn("session_id", content[0].text)
        # No mock is registered: the guard must fire before the HTTP layer.
        self.assertEqual(len(responses.calls), 0)
|
|
|
|
|
|
class TestSessionListDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_list`` tool.

    Every test runs under ``responses`` so nothing can reach the real
    network: the listing tests register a mocked ``GET /sessions``, and the
    validation test asserts that no HTTP call was attempted.
    """

    def _dispatch_list(self, arguments: dict):
        """Dispatch TOOL_SESSION_LIST with *arguments*.

        Returns the ``(content, is_error)`` pair from ``_dispatch``. The
        client is closed even when the dispatch raises.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, TOOL_SESSION_LIST, arguments))
        finally:
            forge.close()

    @responses.activate
    def test_session_list_returns_array(self) -> None:
        """The sessions array and count from upstream are surfaced."""
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions",
            json={
                "ok": True,
                "sessions": [
                    {
                        "session_id": SID,
                        "agent": "claude",
                        "app_name": "mcp-test",
                        "created_at": 1714329600,
                        "last_turn_at": 1714329700,
                        "turn_count": 3,
                        "closed_at": None,
                    },
                    {
                        "session_id": "older-session-id",
                        "agent": "claude",
                        "app_name": "mcp-test",
                        "created_at": 1714329000,
                        "last_turn_at": 1714329100,
                        "turn_count": 1,
                        "closed_at": 1714329500,
                    },
                ],
                "count": 2,
            },
            status=200,
        )
        content, is_error = self._dispatch_list({})
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertIsInstance(body["sessions"], list)
        self.assertEqual(body["count"], 2)
        self.assertEqual(body["sessions"][0]["session_id"], SID)
        self.assertEqual(body["sessions"][1]["closed_at"], 1714329500)

    @responses.activate
    def test_session_list_include_closed_false_passes_query_param(self) -> None:
        """include_closed=false must turn into ?include_closed=false on the wire."""
        captured: dict = {}

        def cb(request):
            # Record the full request URL, including the query string.
            captured["url"] = request.url
            return (
                200,
                {},
                json.dumps({"ok": True, "sessions": [], "count": 0}),
            )

        responses.add_callback(
            responses.GET, f"{BASE_URL}/sessions", callback=cb
        )
        _, is_error = self._dispatch_list({"include_closed": False})
        self.assertFalse(is_error)
        self.assertIn("include_closed=false", captured["url"])

    @responses.activate
    def test_session_list_rejects_non_bool_include_closed(self) -> None:
        """A string ``include_closed`` is rejected before any HTTP request."""
        content, is_error = self._dispatch_list({"include_closed": "false"})
        self.assertTrue(is_error)
        self.assertIn("include_closed", content[0].text)
        # No mock is registered: the guard must fire before the HTTP layer.
        self.assertEqual(len(responses.calls), 0)
|
|
|
|
|
|
class TestSessionGetDispatch(unittest.TestCase):
    """Dispatch-level tests for the ``clawdforge_session_get`` tool.

    Both tests mock ``GET /sessions/{id}`` via ``responses`` and call
    ``_dispatch`` directly.
    """

    def _fetch_session(self):
        """Dispatch TOOL_SESSION_GET for ``SID``; the client is always closed.

        Returns the ``(content, is_error)`` pair from ``_dispatch``.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, TOOL_SESSION_GET, {"session_id": SID}))
        finally:
            forge.close()

    @responses.activate
    def test_session_get_returns_state(self) -> None:
        """A 200 reply is surfaced as JSON limited to the known state keys."""
        upstream_state = {
            "ok": True,
            "session_id": SID,
            "agent": "claude",
            "cwd": "/tmp/acpx/cwd",
            "created_at": 1714329600,
            "last_turn_at": 1714329700,
            "turn_count": 5,
            "closed_at": None,
            "live": True,
            "meta": {"task": "debug"},
        }
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions/{SID}",
            json=upstream_state,
            status=200,
        )

        blocks, failed = self._fetch_session()

        self.assertFalse(failed)
        state = json.loads(blocks[0].text)
        self.assertEqual(state["session_id"], SID)
        self.assertEqual(state["turn_count"], 5)
        self.assertTrue(state["live"])
        self.assertEqual(state["meta"], {"task": "debug"})
        # Whitelist: nine known keys, nothing else.
        expected_keys = {
            "session_id",
            "agent",
            "cwd",
            "created_at",
            "last_turn_at",
            "turn_count",
            "closed_at",
            "live",
            "meta",
        }
        self.assertEqual(set(state.keys()), expected_keys)

    @responses.activate
    def test_session_get_404_surfaces_as_mcp_error(self) -> None:
        """A 404 reply becomes an error result naming the status and detail."""
        responses.add(
            responses.GET,
            f"{BASE_URL}/sessions/{SID}",
            json={"detail": "session not found"},
            status=404,
        )

        blocks, failed = self._fetch_session()

        self.assertTrue(failed)
        message = blocks[0].text
        self.assertIn("404", message)
        self.assertIn("session not found", message)
|
|
|
|
|
|
class TestSessionToolSchemas(unittest.TestCase):
    """Schema-level checks — the MCP client validates input before dispatch."""

    def _tool(self, name: str):
        """Return the first tool definition whose ``name`` equals *name*."""
        matching = (tool for tool in _tool_definitions() if tool.name == name)
        return next(matching)

    def test_session_new_schema_optional_fields(self) -> None:
        """session_new has no required fields and forbids extra properties."""
        schema = self._tool(TOOL_SESSION_NEW).inputSchema
        # No required fields — agent defaults to claude, meta is optional.
        self.assertNotIn("required", schema)
        self.assertFalse(schema.get("additionalProperties", True))
        # Empty input is valid.
        jsonschema.validate({}, schema)
        jsonschema.validate({"agent": "claude"}, schema)
        jsonschema.validate({"agent": "claude", "meta": {"k": "v"}}, schema)

    def test_session_turn_schema_requires_session_id_and_prompt(self) -> None:
        """session_turn requires both session_id and prompt."""
        schema = self._tool(TOOL_SESSION_TURN).inputSchema
        self.assertEqual(sorted(schema["required"]), ["prompt", "session_id"])
        # files items still pinned to the v0.1 ff_-prefixed token pattern.
        file_item_pattern = schema["properties"]["files"]["items"]["pattern"]
        self.assertEqual(file_item_pattern, r"^ff_[A-Za-z0-9_-]+$")
        # missing session_id
        with self.assertRaises(jsonschema.ValidationError):
            jsonschema.validate({"prompt": "hi"}, schema)
        # missing prompt
        with self.assertRaises(jsonschema.ValidationError):
            jsonschema.validate({"session_id": SID}, schema)

    def test_session_close_schema_requires_session_id(self) -> None:
        """session_close requires exactly session_id; empty input is invalid."""
        schema = self._tool(TOOL_SESSION_CLOSE).inputSchema
        self.assertEqual(schema["required"], ["session_id"])
        with self.assertRaises(jsonschema.ValidationError):
            jsonschema.validate({}, schema)

    def test_session_list_schema_no_required_fields(self) -> None:
        """session_list accepts empty input and a boolean include_closed."""
        schema = self._tool(TOOL_SESSION_LIST).inputSchema
        self.assertNotIn("required", schema)
        jsonschema.validate({}, schema)
        jsonschema.validate({"include_closed": False}, schema)

    def test_session_get_schema_requires_session_id(self) -> None:
        """session_get requires exactly session_id; empty input is invalid."""
        schema = self._tool(TOOL_SESSION_GET).inputSchema
        self.assertEqual(schema["required"], ["session_id"])
        with self.assertRaises(jsonschema.ValidationError):
            jsonschema.validate({}, schema)
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":  # pragma: no cover
    unittest.main()
|