clawdforge/clients/mcp/tests/test_server.py
Kayos 5737903217 clients/mcp: v0.2 multi-turn session tools
- clawdforge_session_new / _turn / _close / _list / _get
- Wraps the v0.2 session HTTP surface (POST /sessions, POST
  /sessions/{id}/turn, GET/DELETE /sessions/{id}, GET /sessions)
- Tool descriptions tuned for LLM consumption: when to prefer session_new
  vs run, idempotency contract on close, file_token attachment via files[]
- session_turn returns two content blocks: prose text (concat'd text events)
  for direct LLM consumption + structured trace JSON (turn_index,
  stop_reason, duration_ms, events) for tool-calling agents
- 404/410/auth errors from upstream surface as MCP errors with actionable
  messages; no Python tracebacks leak through
- tests/test_sessions.py: 22 new tests covering the 5 tools + 404 + schema
  validation + idempotent close
- tests/test_server.py: new v0.1 schema-pin regression test
- README "Sessions (v0.2)" section with example open/turn/turn/close chain
- Bump version 0.1.0 -> 0.2.0

v0.1 tools (clawdforge_healthz / _run / _upload_file) are byte-identical.

Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
2026-04-29 06:49:49 -07:00

609 lines
22 KiB
Python

"""Tests for clawdforge-mcp.
We test at two levels:
1. Tool-list discovery and tool-dispatch logic (``_dispatch``) — exercises
the actual call paths an MCP client would hit, with the HTTP layer
mocked via ``responses``.
2. The :func:`build_server` factory — sanity check that the SDK accepts
our wiring.
We deliberately avoid spinning up a real stdio transport; that's the
SDK's territory and adds nothing on top of testing dispatch directly.
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import tempfile
import unittest
from unittest import mock
import jsonschema
import responses
from clawdforge_mcp.client import ForgeClient
from clawdforge_mcp.server import (
TOOL_HEALTHZ,
TOOL_RUN,
TOOL_SESSION_CLOSE,
TOOL_SESSION_GET,
TOOL_SESSION_LIST,
TOOL_SESSION_NEW,
TOOL_SESSION_TURN,
TOOL_UPLOAD,
_dispatch,
_tool_definitions,
build_server,
)
# Base URL handed to ForgeClient by _client(); the `responses` mocks in these
# tests are registered against paths under it.
BASE_URL = "http://192.168.0.5:8800"
# Token handed to ForgeClient by _client().
TOKEN = "cf_test_token_xxxxxxxx"
def _client() -> ForgeClient:
    """Build a ForgeClient pointed at BASE_URL and authenticated with TOKEN."""
    # Keep the timeout short: a hung test should fail fast, not stall CI.
    return ForgeClient(
        base_url=BASE_URL,
        token=TOKEN,
        default_timeout_secs=10,
    )
def _run(coro):
    """Run *coro* to completion via ``asyncio.run`` and return its result."""
    outcome = asyncio.run(coro)
    return outcome
class TestToolDiscovery(unittest.TestCase):
    """Checks on the tool list an MCP client discovers via list_tools."""

    def test_all_tools_with_valid_schemas(self) -> None:
        definitions = _tool_definitions()
        # Three tools date from v0.1; v0.2 layers five session tools on top.
        expected_names = [
            TOOL_HEALTHZ,
            TOOL_RUN,
            TOOL_UPLOAD,
            TOOL_SESSION_NEW,
            TOOL_SESSION_TURN,
            TOOL_SESSION_CLOSE,
            TOOL_SESSION_LIST,
            TOOL_SESSION_GET,
        ]
        self.assertEqual(
            sorted(tool.name for tool in definitions),
            sorted(expected_names),
        )
        for tool in definitions:
            schema = tool.inputSchema
            # The LLM picks a tool by its description, so each one needs a
            # non-trivial description next to an object-typed JSON Schema.
            self.assertTrue(
                tool.description and len(tool.description) > 20, tool.name
            )
            self.assertEqual(schema.get("type"), "object", tool.name)
            # Unknown top-level keys must be forbidden so the LLM is not
            # encouraged to invent arguments.
            self.assertFalse(
                schema.get("additionalProperties", True),
                f"{tool.name} should set additionalProperties=False",
            )

    def test_v0_1_tool_schemas_unchanged(self) -> None:
        """Regression guard: the v0.1 tool surface must survive v0.2 intact.

        Pins the required arguments, property names, and the distinguishing
        schema constraints of healthz / run / upload_file, so a later
        refactor cannot silently break v0.1 callers.
        """
        by_name = {tool.name: tool for tool in _tool_definitions()}

        # healthz takes no arguments and forbids extras.
        healthz_schema = by_name[TOOL_HEALTHZ].inputSchema
        self.assertEqual(healthz_schema.get("properties", {}), {})
        self.assertFalse(healthz_schema.get("additionalProperties", True))

        # run requires only `prompt` and keeps its five v0.1 properties,
        # the files token pattern, and the timeout_secs bounds.
        run_schema = by_name[TOOL_RUN].inputSchema
        self.assertEqual(run_schema["required"], ["prompt"])
        run_props = run_schema["properties"]
        self.assertEqual(
            sorted(run_props.keys()),
            sorted(["prompt", "model", "system", "files", "timeout_secs"]),
        )
        self.assertEqual(
            run_props["files"]["items"]["pattern"], r"^ff_[A-Za-z0-9_-]+$"
        )
        timeout_schema = run_props["timeout_secs"]
        self.assertEqual(timeout_schema["minimum"], 5)
        self.assertEqual(timeout_schema["maximum"], 600)

        # upload_file requires only `path` and exposes two properties.
        upload_schema = by_name[TOOL_UPLOAD].inputSchema
        self.assertEqual(upload_schema["required"], ["path"])
        self.assertEqual(
            sorted(upload_schema["properties"].keys()),
            sorted(["path", "ttl_secs"]),
        )

    def test_run_schema_requires_prompt(self) -> None:
        run_schema = next(
            tool for tool in _tool_definitions() if tool.name == TOOL_RUN
        ).inputSchema
        self.assertIn("prompt", run_schema["required"])
        self.assertIn("files", run_schema["properties"])
        self.assertEqual(
            run_schema["properties"]["timeout_secs"]["maximum"], 600
        )
class TestHealthzDispatch(unittest.TestCase):
    """Dispatch of the healthz tool against a mocked GET /healthz."""

    @responses.activate
    def test_healthz_ok(self) -> None:
        upstream = {
            "ok": True,
            "claude_present": True,
            "claude_version": "1.2.3",
        }
        responses.add(
            responses.GET, f"{BASE_URL}/healthz", json=upstream, status=200
        )
        forge = _client()
        try:
            content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
        finally:
            forge.close()
        self.assertFalse(is_error)
        # Exactly one content block, carrying the JSON-encoded health body.
        self.assertEqual(len(content), 1)
        payload = json.loads(content[0].text)
        self.assertTrue(payload["ok"])
        self.assertEqual(payload["claude_version"], "1.2.3")
class TestRunDispatch(unittest.TestCase):
    """Dispatch of the run tool: success, argument pass-through, error paths.

    Every test runs under ``responses.activate`` so none of them can reach
    the real network, including the case that expects the call to be refused
    before any request is made.
    """

    @responses.activate
    def test_run_success(self) -> None:
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={
                "ok": True,
                "result": {"hello": "world"},
                "duration_ms": 1234,
                "stop_reason": "end_turn",
            },
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_RUN, {"prompt": "say hi"})
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        # The first content block carries the JSON-encoded run outcome.
        body = json.loads(content[0].text)
        self.assertEqual(body["result"], {"hello": "world"})
        self.assertEqual(body["duration_ms"], 1234)
        self.assertEqual(body["stop_reason"], "end_turn")

    @responses.activate
    def test_run_with_files_passes_through(self) -> None:
        captured: dict = {}

        def cb(request):
            # Record the JSON body the client actually sent upstream.
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps(
                    {
                        "ok": True,
                        "result": "fine",
                        "duration_ms": 10,
                        "stop_reason": "end_turn",
                    }
                ),
            )

        responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
        forge = _client()
        try:
            _content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_RUN,
                    {
                        "prompt": "summarize",
                        "files": ["ff_aaa", "ff_bbb"],
                        "model": "opus",
                        "system": "be terse",
                        "timeout_secs": 90,
                    },
                )
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        # Each optional argument must reach the request body unchanged.
        self.assertEqual(captured["body"]["files"], ["ff_aaa", "ff_bbb"])
        self.assertEqual(captured["body"]["model"], "opus")
        self.assertEqual(captured["body"]["system"], "be terse")
        self.assertEqual(captured["body"]["timeout_secs"], 90)

    @responses.activate
    def test_run_rejects_empty_prompt(self) -> None:
        # No mock is registered on purpose: under ``responses.activate`` a
        # stray request cannot reach the real network, so a regression in
        # the empty-prompt guard fails here instead of hitting BASE_URL.
        forge = _client()
        try:
            content, is_error = _run(_dispatch(forge, TOOL_RUN, {"prompt": ""}))
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("prompt", content[0].text.lower())
        # The rejection must happen without any HTTP request being attempted.
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_run_subprocess_error_surfaces_clean_message(self) -> None:
        # The clawdforge server returns 502 with an `error` body when the
        # subprocess fails. Verify we surface that without a traceback.
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={
                "ok": False,
                "error": "claude exited 1",
                "stderr": "...",
                "duration_ms": 200,
                "stop_reason": None,
            },
            status=502,
        )
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(forge, TOOL_RUN, {"prompt": "boom"})
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("claude exited 1", content[0].text)
        self.assertNotIn("Traceback", content[0].text)

    @responses.activate
    def test_run_auth_failure_surfaces_actionable_message(self) -> None:
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"detail": "bad token"},
            status=401,
        )
        forge = _client()
        try:
            content, is_error = _run(_dispatch(forge, TOOL_RUN, {"prompt": "hi"}))
        finally:
            forge.close()
        self.assertTrue(is_error)
        # The message must name the failure and the env var to fix.
        self.assertIn("auth failed", content[0].text)
        self.assertIn("CLAWDFORGE_TOKEN", content[0].text)
class TestUploadDispatch(unittest.TestCase):
    """Dispatch of the upload_file tool: happy path plus path/size guards.

    Every test runs under ``responses.activate`` so none can reach the real
    network; the rejection tests also assert that no HTTP request was
    attempted.
    """

    def _upload(self, root: str, arguments: dict, **extra_env: str):
        """Dispatch TOOL_UPLOAD with CLAWDFORGE_UPLOAD_ROOT patched to *root*.

        ``extra_env`` supplies further environment overrides that stay in
        effect for the duration of the call. Returns the
        ``(content, is_error)`` pair produced by ``_dispatch``; the client is
        always closed, even when dispatch raises.
        """
        env = {"CLAWDFORGE_UPLOAD_ROOT": root, **extra_env}
        with mock.patch.dict(os.environ, env):
            forge = _client()
            try:
                return _run(_dispatch(forge, TOOL_UPLOAD, arguments))
            finally:
                forge.close()

    @responses.activate
    def test_upload_happy_path(self) -> None:
        responses.add(
            responses.POST,
            f"{BASE_URL}/files",
            json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11},
            status=200,
        )
        with tempfile.TemporaryDirectory() as root:
            tmp_path = os.path.join(root, "hello.txt")
            with open(tmp_path, "wb") as fh:
                fh.write(b"hello world")
            content, is_error = self._upload(
                root, {"path": tmp_path, "ttl_secs": 600}
            )
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertEqual(body["file_token"], "ff_abc123")
        # Whitelist (S4): only file_token / ttl_secs / size are surfaced.
        self.assertEqual(set(body.keys()), {"file_token", "ttl_secs", "size"})

    @responses.activate
    def test_upload_missing_file_returns_error(self) -> None:
        with tempfile.TemporaryDirectory() as root:
            missing = os.path.join(root, "definitely-not-here-zzz.txt")
            content, is_error = self._upload(root, {"path": missing})
        self.assertTrue(is_error)
        self.assertIn("does not exist", content[0].text)
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_upload_path_outside_root_rejected_traversal(self) -> None:
        """A `..` path that resolves outside CLAWDFORGE_UPLOAD_ROOT is refused."""
        with tempfile.TemporaryDirectory() as outer:
            inner = os.path.join(outer, "inner")
            os.mkdir(inner)
            # Real file lives in outer (escape target).
            secret = os.path.join(outer, "secret.txt")
            with open(secret, "wb") as fh:
                fh.write(b"sekret")
            # Root is the inner dir; the escape goes via `..`.
            content, is_error = self._upload(
                inner, {"path": os.path.join(inner, "..", "secret.txt")}
            )
        self.assertTrue(is_error)
        self.assertIn("outside", content[0].text)
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_upload_path_outside_root_rejected_symlink(self) -> None:
        """A symlink whose target is outside the root is refused after resolve."""
        with tempfile.TemporaryDirectory() as outer:
            inner = os.path.join(outer, "inner")
            os.mkdir(inner)
            secret = os.path.join(outer, "id_rsa")
            with open(secret, "wb") as fh:
                fh.write(b"private-key-bytes")
            # Symlink lives inside root, target lives outside.
            link = os.path.join(inner, "looks-innocent.txt")
            try:
                os.symlink(secret, link)
            except (OSError, NotImplementedError) as exc:
                # Some platforms/accounts cannot create symlinks; that says
                # nothing about the guard under test, so skip, don't error.
                self.skipTest(f"cannot create symlinks here: {exc}")
            content, is_error = self._upload(inner, {"path": link})
        self.assertTrue(is_error)
        self.assertIn("outside", content[0].text)
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_upload_oversize_rejected(self) -> None:
        """File at limit+1 byte is rejected by CLAWDFORGE_UPLOAD_MAX_BYTES."""
        with tempfile.TemporaryDirectory() as root:
            target = os.path.join(root, "big.bin")
            with open(target, "wb") as fh:
                fh.write(b"x" * 16)  # 16 bytes, one over the 15-byte limit
            content, is_error = self._upload(
                root,
                {"path": target},
                CLAWDFORGE_UPLOAD_MAX_BYTES="15",
            )
        self.assertTrue(is_error)
        self.assertIn("CLAWDFORGE_UPLOAD_MAX_BYTES", content[0].text)
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_upload_non_file_rejected(self) -> None:
        """Pointing the upload at a directory (or FIFO etc.) is refused."""
        with tempfile.TemporaryDirectory() as root:
            sub = os.path.join(root, "subdir")
            os.mkdir(sub)
            content, is_error = self._upload(root, {"path": sub})
        self.assertTrue(is_error)
        self.assertIn("not a regular file", content[0].text)
        self.assertEqual(len(responses.calls), 0)
class TestDispatchCorrectness(unittest.TestCase):
    """C1 / C2 — defense-in-depth typing checks in _dispatch.

    Every test runs under ``responses.activate`` so a regression in a guard
    cannot turn into a real request against BASE_URL.
    """

    @responses.activate
    def test_dispatch_strict_bool_guard_timeout_secs(self) -> None:
        """bool is a subclass of int — must not slip past the runtime guard."""
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_RUN,
                    {"prompt": "hi", "timeout_secs": True},
                )
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("timeout_secs", content[0].text)
        # The guard must reject the call without attempting any request.
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_dispatch_strict_bool_guard_ttl_secs(self) -> None:
        forge = _client()
        try:
            content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_UPLOAD,
                    {"path": "/tmp/whatever", "ttl_secs": True},
                )
            )
        finally:
            forge.close()
        self.assertTrue(is_error)
        self.assertIn("ttl_secs", content[0].text)
        # The guard must reject the call without attempting any request.
        self.assertEqual(len(responses.calls), 0)

    @responses.activate
    def test_dispatch_empty_string_model_coerced(self) -> None:
        """Empty model/system string must be coerced to None upstream."""
        captured: dict = {}

        def cb(request):
            # Record the JSON body the client actually sent upstream.
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps(
                    {
                        "ok": True,
                        "result": "fine",
                        "duration_ms": 1,
                        "stop_reason": "end_turn",
                    }
                ),
            )

        responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
        forge = _client()
        try:
            _content, is_error = _run(
                _dispatch(
                    forge,
                    TOOL_RUN,
                    {"prompt": "hi", "model": "", "system": ""},
                )
            )
        finally:
            forge.close()
        self.assertFalse(is_error)
        # Empty strings must NOT have been forwarded — they should be absent
        # from the request body entirely (forge.run only sets the keys when
        # they are not None).
        self.assertNotIn("model", captured["body"])
        self.assertNotIn("system", captured["body"])
class TestErrorLeakProtection(unittest.TestCase):
    """S3 — the catch-all internal error must not echo str(e) (host paths)."""

    def test_unexpected_error_does_not_leak_path(self) -> None:
        # A builtin exception whose str() embeds a host path; the catch-all
        # in _dispatch has to collapse it rather than echo it.
        leaked_path = "/home/cobb/.ssh/id_rsa"
        leaked_reason = "No such file"
        forge = _client()
        try:
            boom = FileNotFoundError(2, leaked_reason, leaked_path)
            with mock.patch.object(forge, "healthz", side_effect=boom):
                content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
        finally:
            forge.close()
        self.assertTrue(is_error)
        message = content[0].text
        # The exception type may be named, but neither its path nor its
        # strerror text may appear.
        self.assertIn("unexpected internal error", message)
        self.assertIn("FileNotFoundError", message)
        self.assertNotIn(leaked_path, message)
        self.assertNotIn(leaked_reason, message)
class TestHealthzWhitelist(unittest.TestCase):
    """S4 — fields the upstream forge adds beyond the whitelist are dropped."""

    @responses.activate
    def test_healthz_response_whitelisted(self) -> None:
        allowed = {
            "ok": True,
            "claude_present": True,
            "claude_version": "1.2.3",
        }
        # Sneaky future fields the forge might add — none may pass through.
        unexpected = {
            "internal_hostname": "lucy.lan",
            "pid": 4242,
            "secret_kv": {"k": "v"},
        }
        responses.add(
            responses.GET,
            f"{BASE_URL}/healthz",
            json={**allowed, **unexpected},
            status=200,
        )
        forge = _client()
        try:
            content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
        finally:
            forge.close()
        self.assertFalse(is_error)
        rendered = content[0].text
        body = json.loads(rendered)
        self.assertEqual(set(body.keys()), set(allowed))
        # The raw text must not carry the stripped values either.
        self.assertNotIn("lucy.lan", rendered)
        self.assertNotIn("4242", rendered)
class TestFilesPatternValidation(unittest.TestCase):
    """S5 — `files` array items must match ^ff_[A-Za-z0-9_-]+$."""

    def _run_schema(self):
        """Return the input schema of the run tool."""
        run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN)
        return run_tool.inputSchema

    def _files_item_pattern(self) -> str:
        """Return the regex pattern applied to each `files` array item."""
        return self._run_schema()["properties"]["files"]["items"]["pattern"]

    def test_pattern_present(self) -> None:
        self.assertEqual(self._files_item_pattern(), r"^ff_[A-Za-z0-9_-]+$")

    def test_schema_rejects_malformed_token(self) -> None:
        schema = self._run_schema()
        # Malformed tokens — wrong prefix, invalid chars, empty body.
        for bad in ["bad", "ff_", "FF_aaa", "ff_with space", "../etc/passwd"]:
            # subTest keeps the loop going after a failure, so every
            # wrongly accepted token is reported, not just the first.
            with self.subTest(token=bad):
                with self.assertRaises(jsonschema.ValidationError, msg=bad):
                    jsonschema.validate(
                        {"prompt": "hi", "files": [bad]}, schema
                    )

    def test_schema_accepts_valid_token(self) -> None:
        schema = self._run_schema()
        for good in ["ff_abc123", "ff_AB-cd_EF-gh", "ff_a"]:
            # One sub-test per token so a rejection names the token.
            with self.subTest(token=good):
                jsonschema.validate(
                    {"prompt": "hi", "files": [good]}, schema
                )

    def test_pattern_compiles_as_regex(self) -> None:
        # Sanity: ensure the schema string is a valid regex that matches
        # the canonical token shape produced by the forge.
        self.assertIsNotNone(re.match(self._files_item_pattern(), "ff_abc_123"))
class TestUnknownTool(unittest.TestCase):
    """Dispatching a tool name that _dispatch does not recognise."""

    def test_unknown_tool_returns_error(self) -> None:
        forge = _client()
        try:
            outcome = _run(_dispatch(forge, "not_a_tool", {}))
        finally:
            forge.close()
        content, is_error = outcome
        self.assertTrue(is_error)
        self.assertIn("unknown tool", content[0].text)
class TestServerFactory(unittest.TestCase):
    """Smoke check that the SDK accepts the server wiring."""

    def test_build_server_returns_named_server(self) -> None:
        expected_name = "clawdforge-mcp"
        forge = _client()
        try:
            server = build_server(forge)
            # `name` is an attribute the Server class sets in __init__.
            self.assertEqual(server.name, expected_name)
            # Building the initialization options must succeed as well.
            options = server.create_initialization_options()
            self.assertEqual(options.server_name, expected_name)
        finally:
            forge.close()
if __name__ == "__main__":  # pragma: no cover
    # Let the module be executed directly, without an external test runner.
    unittest.main()