HIGH:
- S1: upload_file allow-root + symlink-resolve + size-cap.
  Env: CLAWDFORGE_UPLOAD_ROOT (default cwd), CLAWDFORGE_UPLOAD_MAX_BYTES (default 100MiB).
  README updated with threat-model paragraph.

LOW:
- S2: logger.propagate = False (stdout discipline defense-in-depth)
- S3: catch-all error message no longer echoes str(e) (host paths)
- S4: whitelist healthz/upload tool response fields
- S5: pattern-validate ff_* file tokens in run schema
- C1: strict-bool guard on timeout_secs/ttl_secs
- C2: coerce empty-string model/system to None

Deps:
- requests>=2.32 (CVE-2024-35195)
- urllib3>=2.2.2 (CVE-2024-37891)
- mcp>=1.2.0

Audit: memory/clawdforge-audits/mcp-093021c.md
557 lines
20 KiB
Python
557 lines
20 KiB
Python
"""Tests for clawdforge-mcp.
|
|
|
|
We test at two levels:
|
|
|
|
1. Tool-list discovery and tool-dispatch logic (``_dispatch``) — exercises
|
|
the actual call paths an MCP client would hit, with the HTTP layer
|
|
mocked via ``responses``.
|
|
2. The :func:`build_server` factory — sanity check that the SDK accepts
|
|
our wiring.
|
|
|
|
We deliberately avoid spinning up a real stdio transport; that's the
|
|
SDK's territory and adds nothing on top of testing dispatch directly.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import unittest
|
|
from unittest import mock
|
|
|
|
import jsonschema
|
|
import responses
|
|
|
|
from clawdforge_mcp.client import ForgeClient
|
|
from clawdforge_mcp.server import (
|
|
TOOL_HEALTHZ,
|
|
TOOL_RUN,
|
|
TOOL_UPLOAD,
|
|
_dispatch,
|
|
_tool_definitions,
|
|
build_server,
|
|
)
|
|
|
|
|
|
# Placeholder forge endpoint and bearer token handed to the test ForgeClient.
# HTTP-level tests register their ``responses`` mocks against BASE_URL.
BASE_URL = "http://192.168.0.5:8800"
TOKEN = "cf_test_token_xxxxxxxx"
|
|
|
|
|
|
def _client() -> ForgeClient:
    """Return a fresh ``ForgeClient`` built from ``BASE_URL`` and ``TOKEN``.

    The 10-second default timeout is deliberately short: a test that hangs
    should fail fast rather than stall CI.
    """
    settings = {"base_url": BASE_URL, "token": TOKEN, "default_timeout_secs": 10}
    return ForgeClient(**settings)
|
|
|
|
|
|
def _run(coro):
    """Run *coro* to completion via ``asyncio.run`` and hand back its result."""
    outcome = asyncio.run(coro)
    return outcome
|
|
|
|
|
|
class TestToolDiscovery(unittest.TestCase):
    """The MCP client calls list_tools first to discover capabilities."""

    def test_three_tools_with_valid_schemas(self) -> None:
        """Exactly the three known tools are advertised, each well-formed."""
        advertised = _tool_definitions()
        self.assertEqual(
            sorted(tool.name for tool in advertised),
            sorted([TOOL_HEALTHZ, TOOL_RUN, TOOL_UPLOAD]),
        )
        for tool in advertised:
            # The LLM picks a tool from its description, so it must be
            # present and more than a token gesture; the input definition
            # must be a JSON Schema object.
            described = bool(tool.description) and len(tool.description) > 20
            self.assertTrue(described, tool.name)
            self.assertEqual(tool.inputSchema.get("type"), "object", tool.name)
            # Extra top-level keys must be explicitly forbidden so the LLM
            # is not encouraged to invent arguments.
            extras_allowed = tool.inputSchema.get("additionalProperties", True)
            self.assertFalse(
                extras_allowed,
                f"{tool.name} should set additionalProperties=False",
            )

    def test_run_schema_requires_prompt(self) -> None:
        """The run schema requires ``prompt``, offers ``files``, caps timeout."""
        schema = next(
            tool for tool in _tool_definitions() if tool.name == TOOL_RUN
        ).inputSchema
        self.assertIn("prompt", schema["required"])
        self.assertIn("files", schema["properties"])
        timeout_spec = schema["properties"]["timeout_secs"]
        self.assertEqual(timeout_spec["maximum"], 600)
|
|
|
|
|
|
class TestHealthzDispatch(unittest.TestCase):
    """Dispatch of the healthz tool against a mocked ``GET /healthz``."""

    @responses.activate
    def test_healthz_ok(self) -> None:
        """A 200 reply comes back as one non-error content item holding JSON."""
        upstream = {"ok": True, "claude_present": True, "claude_version": "1.2.3"}
        responses.add(
            responses.GET, f"{BASE_URL}/healthz", json=upstream, status=200
        )
        forge = _client()
        try:
            outcome = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
        finally:
            forge.close()
        content, is_error = outcome
        self.assertFalse(is_error)
        self.assertEqual(len(content), 1)
        body = json.loads(content[0].text)
        self.assertTrue(body["ok"])
        self.assertEqual(body["claude_version"], "1.2.3")
|
|
|
|
|
|
class TestRunDispatch(unittest.TestCase):
    """Dispatch of the run tool against a mocked ``POST /run`` endpoint."""

    def _call_run(self, arguments):
        """Dispatch ``TOOL_RUN`` with *arguments* on a throwaway client.

        Returns the ``(content, is_error)`` pair from ``_dispatch``; the
        client is closed before returning, whether or not dispatch raised.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, TOOL_RUN, arguments))
        finally:
            forge.close()

    @responses.activate
    def test_run_success(self) -> None:
        """A 200 reply is relayed with result, duration and stop reason."""
        upstream = {
            "ok": True,
            "result": {"hello": "world"},
            "duration_ms": 1234,
            "stop_reason": "end_turn",
        }
        responses.add(responses.POST, f"{BASE_URL}/run", json=upstream, status=200)
        content, is_error = self._call_run({"prompt": "say hi"})
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertEqual(body["result"], {"hello": "world"})
        self.assertEqual(body["duration_ms"], 1234)
        self.assertEqual(body["stop_reason"], "end_turn")

    @responses.activate
    def test_run_with_files_passes_through(self) -> None:
        """Optional run arguments show up in the request body sent upstream."""
        sent = {}
        reply = json.dumps(
            {
                "ok": True,
                "result": "fine",
                "duration_ms": 10,
                "stop_reason": "end_turn",
            }
        )

        def record(request):
            # Keep the decoded request body so the test can inspect it.
            sent["body"] = json.loads(request.body)
            return (200, {}, reply)

        responses.add_callback(
            responses.POST, f"{BASE_URL}/run", callback=record
        )
        _, is_error = self._call_run(
            {
                "prompt": "summarize",
                "files": ["ff_aaa", "ff_bbb"],
                "model": "opus",
                "system": "be terse",
                "timeout_secs": 90,
            }
        )
        self.assertFalse(is_error)
        forwarded = sent["body"]
        self.assertEqual(forwarded["files"], ["ff_aaa", "ff_bbb"])
        self.assertEqual(forwarded["model"], "opus")
        self.assertEqual(forwarded["system"], "be terse")
        self.assertEqual(forwarded["timeout_secs"], 90)

    def test_run_rejects_empty_prompt(self) -> None:
        """An empty prompt yields an error result that mentions the prompt."""
        content, is_error = self._call_run({"prompt": ""})
        self.assertTrue(is_error)
        self.assertIn("prompt", content[0].text.lower())

    @responses.activate
    def test_run_subprocess_error_surfaces_clean_message(self) -> None:
        """A 502 with an ``error`` body is reported without a traceback."""
        # The clawdforge server returns 502 with an `error` body when the
        # subprocess fails; that message should surface as-is.
        failure = {
            "ok": False,
            "error": "claude exited 1",
            "stderr": "...",
            "duration_ms": 200,
            "stop_reason": None,
        }
        responses.add(responses.POST, f"{BASE_URL}/run", json=failure, status=502)
        content, is_error = self._call_run({"prompt": "boom"})
        self.assertTrue(is_error)
        message = content[0].text
        self.assertIn("claude exited 1", message)
        self.assertNotIn("Traceback", message)

    @responses.activate
    def test_run_auth_failure_surfaces_actionable_message(self) -> None:
        """A 401 produces an error that names the token setting to fix."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"detail": "bad token"},
            status=401,
        )
        content, is_error = self._call_run({"prompt": "hi"})
        self.assertTrue(is_error)
        message = content[0].text
        self.assertIn("auth failed", message)
        self.assertIn("CLAWDFORGE_TOKEN", message)
|
|
|
|
|
|
class TestUploadDispatch(unittest.TestCase):
    """Dispatch of the upload tool: happy path plus path and size guards."""

    def _upload(self, arguments, env):
        """Dispatch ``TOOL_UPLOAD`` with *env* overlaid on ``os.environ``.

        Returns the ``(content, is_error)`` pair from ``_dispatch``. The
        client is created inside the patched environment and closed before
        the patch is undone, whether or not dispatch raised.
        """
        with mock.patch.dict(os.environ, env):
            forge = _client()
            try:
                return _run(_dispatch(forge, TOOL_UPLOAD, arguments))
            finally:
                forge.close()

    @responses.activate
    def test_upload_happy_path(self) -> None:
        """A regular file under the root uploads and returns its token."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/files",
            json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11},
            status=200,
        )
        with tempfile.TemporaryDirectory() as root:
            tmp_path = os.path.join(root, "hello.txt")
            with open(tmp_path, "wb") as fh:
                fh.write(b"hello world")
            content, is_error = self._upload(
                {"path": tmp_path, "ttl_secs": 600},
                {"CLAWDFORGE_UPLOAD_ROOT": root},
            )
        self.assertFalse(is_error)
        body = json.loads(content[0].text)
        self.assertEqual(body["file_token"], "ff_abc123")
        # Whitelist (S4): only file_token / ttl_secs / size are surfaced.
        self.assertEqual(set(body.keys()), {"file_token", "ttl_secs", "size"})

    def test_upload_missing_file_returns_error(self) -> None:
        """A path under the root that does not exist is reported as such."""
        with tempfile.TemporaryDirectory() as root:
            missing = os.path.join(root, "definitely-not-here-zzz.txt")
            content, is_error = self._upload(
                {"path": missing}, {"CLAWDFORGE_UPLOAD_ROOT": root}
            )
        self.assertTrue(is_error)
        self.assertIn("does not exist", content[0].text)

    def test_upload_path_outside_root_rejected_traversal(self) -> None:
        """A `..` path that resolves outside CLAWDFORGE_UPLOAD_ROOT is refused."""
        with tempfile.TemporaryDirectory() as outer:
            inner = os.path.join(outer, "inner")
            os.mkdir(inner)
            # Real file lives in outer (escape target).
            secret = os.path.join(outer, "secret.txt")
            with open(secret, "wb") as fh:
                fh.write(b"sekret")
            # Root is the inner dir; the escape goes via `..`.
            content, is_error = self._upload(
                {"path": os.path.join(inner, "..", "secret.txt")},
                {"CLAWDFORGE_UPLOAD_ROOT": inner},
            )
        self.assertTrue(is_error)
        self.assertIn("outside", content[0].text)

    def test_upload_path_outside_root_rejected_symlink(self) -> None:
        """A symlink whose target is outside the root is refused after resolve."""
        with tempfile.TemporaryDirectory() as outer:
            inner = os.path.join(outer, "inner")
            os.mkdir(inner)
            secret = os.path.join(outer, "id_rsa")
            with open(secret, "wb") as fh:
                fh.write(b"private-key-bytes")
            # Symlink lives inside root, target lives outside.
            link = os.path.join(inner, "looks-innocent.txt")
            try:
                os.symlink(secret, link)
            except (OSError, NotImplementedError) as exc:
                # Symlink creation can be unsupported or unprivileged (for
                # example on Windows); that is an environment limitation,
                # not a failure of the guard under test.
                self.skipTest(f"cannot create symlinks here: {exc}")
            content, is_error = self._upload(
                {"path": link}, {"CLAWDFORGE_UPLOAD_ROOT": inner}
            )
        self.assertTrue(is_error)
        self.assertIn("outside", content[0].text)

    def test_upload_oversize_rejected(self) -> None:
        """File at limit+1 byte is rejected by CLAWDFORGE_UPLOAD_MAX_BYTES."""
        with tempfile.TemporaryDirectory() as root:
            target = os.path.join(root, "big.bin")
            with open(target, "wb") as fh:
                fh.write(b"x" * 16)  # 16 bytes against a 15-byte cap
            content, is_error = self._upload(
                {"path": target},
                {
                    "CLAWDFORGE_UPLOAD_ROOT": root,
                    "CLAWDFORGE_UPLOAD_MAX_BYTES": "15",
                },
            )
        self.assertTrue(is_error)
        self.assertIn("CLAWDFORGE_UPLOAD_MAX_BYTES", content[0].text)

    def test_upload_non_file_rejected(self) -> None:
        """Pointing the upload at a directory (or FIFO etc.) is refused."""
        with tempfile.TemporaryDirectory() as root:
            sub = os.path.join(root, "subdir")
            os.mkdir(sub)
            content, is_error = self._upload(
                {"path": sub}, {"CLAWDFORGE_UPLOAD_ROOT": root}
            )
        self.assertTrue(is_error)
        self.assertIn("not a regular file", content[0].text)
|
|
|
|
|
|
class TestDispatchCorrectness(unittest.TestCase):
    """C1 / C2 — defense-in-depth typing checks in _dispatch."""

    def _call(self, tool, arguments):
        """Dispatch *tool* with *arguments* on a throwaway client.

        Returns the ``(content, is_error)`` pair; the client is closed
        before returning, whether or not dispatch raised.
        """
        forge = _client()
        try:
            return _run(_dispatch(forge, tool, arguments))
        finally:
            forge.close()

    def test_dispatch_strict_bool_guard_timeout_secs(self) -> None:
        """bool is a subclass of int — must not slip past the runtime guard."""
        content, is_error = self._call(
            TOOL_RUN, {"prompt": "hi", "timeout_secs": True}
        )
        self.assertTrue(is_error)
        self.assertIn("timeout_secs", content[0].text)

    def test_dispatch_strict_bool_guard_ttl_secs(self) -> None:
        """A boolean ``ttl_secs`` on upload is rejected, naming the field."""
        content, is_error = self._call(
            TOOL_UPLOAD, {"path": "/tmp/whatever", "ttl_secs": True}
        )
        self.assertTrue(is_error)
        self.assertIn("ttl_secs", content[0].text)

    @responses.activate
    def test_dispatch_empty_string_model_coerced(self) -> None:
        """Empty model/system string must be coerced to None upstream."""
        sent: dict = {}
        reply = json.dumps(
            {
                "ok": True,
                "result": "fine",
                "duration_ms": 1,
                "stop_reason": "end_turn",
            }
        )

        def record(request):
            # Keep the decoded request body so the test can inspect it.
            sent["body"] = json.loads(request.body)
            return (200, {}, reply)

        responses.add_callback(
            responses.POST, f"{BASE_URL}/run", callback=record
        )
        _, is_error = self._call(
            TOOL_RUN, {"prompt": "hi", "model": "", "system": ""}
        )
        self.assertFalse(is_error)
        # Empty strings must NOT have been forwarded — they should be absent
        # from the request body entirely (forge.run only sets the keys when
        # they are not None).
        for key in ("model", "system"):
            self.assertNotIn(key, sent["body"])
|
|
|
|
|
|
class TestErrorLeakProtection(unittest.TestCase):
    """S3 — catch-all internal error must not echo str(e) (host paths)."""

    def test_unexpected_error_does_not_leak_path(self) -> None:
        """The exception type is reported, but not its message or filename."""
        # A builtin whose str() embeds a host path; the catch-all in
        # _dispatch must collapse it to a generic message.
        leaky = FileNotFoundError(2, "No such file", "/home/cobb/.ssh/id_rsa")
        forge = _client()
        try:
            with mock.patch.object(forge, "healthz", side_effect=leaky):
                content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
        finally:
            forge.close()
        self.assertTrue(is_error)
        text = content[0].text
        for expected in ("unexpected internal error", "FileNotFoundError"):
            self.assertIn(expected, text)
        for forbidden in ("/home/cobb/.ssh/id_rsa", "No such file"):
            self.assertNotIn(forbidden, text)
|
|
|
|
|
|
class TestHealthzWhitelist(unittest.TestCase):
    """S4 — extra fields from the upstream forge must be stripped."""

    @responses.activate
    def test_healthz_response_whitelisted(self) -> None:
        """Only ok / claude_present / claude_version survive dispatch."""
        upstream = {
            "ok": True,
            "claude_present": True,
            "claude_version": "1.2.3",
            # Sneaky future fields the forge might add — must NOT pass through.
            "internal_hostname": "lucy.lan",
            "pid": 4242,
            "secret_kv": {"k": "v"},
        }
        responses.add(
            responses.GET, f"{BASE_URL}/healthz", json=upstream, status=200
        )
        forge = _client()
        try:
            outcome = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
        finally:
            forge.close()
        content, is_error = outcome
        self.assertFalse(is_error)
        raw = content[0].text
        body = json.loads(raw)
        self.assertEqual(
            set(body.keys()), {"ok", "claude_present", "claude_version"}
        )
        # The stripped values must not appear anywhere in the raw text.
        self.assertNotIn("lucy.lan", raw)
        self.assertNotIn("4242", raw)
|
|
|
|
|
|
class TestFilesPatternValidation(unittest.TestCase):
    """S5 — `files` array items must match ^ff_[A-Za-z0-9_-]+$."""

    def _run_schema(self):
        """Return the input schema advertised for the run tool."""
        run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN)
        return run_tool.inputSchema

    def _files_item_pattern(self) -> str:
        """Return the regex string constraining each ``files`` item."""
        return self._run_schema()["properties"]["files"]["items"]["pattern"]

    def test_pattern_present(self) -> None:
        """The schema carries the exact expected token pattern."""
        self.assertEqual(self._files_item_pattern(), r"^ff_[A-Za-z0-9_-]+$")

    def test_schema_rejects_malformed_token(self) -> None:
        """Each malformed token fails schema validation."""
        schema = self._run_schema()
        # Malformed tokens — wrong prefix, invalid chars, empty body.
        for bad in ["bad", "ff_", "FF_aaa", "ff_with space", "../etc/passwd"]:
            # subTest keeps checking the remaining tokens after a failure
            # and labels each failure with the offending token.
            with self.subTest(token=bad):
                with self.assertRaises(jsonschema.ValidationError, msg=bad):
                    jsonschema.validate(
                        {"prompt": "hi", "files": [bad]}, schema
                    )

    def test_schema_accepts_valid_token(self) -> None:
        """Each well-formed token passes schema validation."""
        schema = self._run_schema()
        for good in ["ff_abc123", "ff_AB-cd_EF-gh", "ff_a"]:
            with self.subTest(token=good):
                jsonschema.validate({"prompt": "hi", "files": [good]}, schema)

    def test_pattern_compiles_as_regex(self) -> None:
        """The pattern is a valid regex matching a canonical token."""
        # Sanity: ensure the schema string is a valid regex that matches
        # the canonical token shape produced by the forge.
        self.assertIsNotNone(re.match(self._files_item_pattern(), "ff_abc_123"))
|
|
|
|
|
|
class TestUnknownTool(unittest.TestCase):
    """Dispatching a tool name the server does not define."""

    def test_unknown_tool_returns_error(self) -> None:
        """An unrecognised tool name produces an 'unknown tool' error result."""
        forge = _client()
        try:
            outcome = _run(_dispatch(forge, "not_a_tool", {}))
        finally:
            forge.close()
        content, is_error = outcome
        self.assertTrue(is_error)
        self.assertIn("unknown tool", content[0].text)
|
|
|
|
|
|
class TestServerFactory(unittest.TestCase):
    """Smoke test that the SDK accepts our wiring."""

    def test_build_server_returns_named_server(self) -> None:
        """``build_server`` yields a server whose name and init options agree."""
        forge = _client()
        try:
            server = build_server(forge)
            # The Server class exposes a `name` attribute set in __init__.
            self.assertEqual(server.name, "clawdforge-mcp")
            # Initialization options should populate without error.
            options = server.create_initialization_options()
            self.assertEqual(options.server_name, "clawdforge-mcp")
        finally:
            forge.close()
|
|
|
|
|
|
if __name__ == "__main__":  # pragma: no cover
    # Run the suite with unittest's own runner when executed as a script.
    unittest.main()
|