clients/mcp: initial MCP server for clawdforge

Drops a Model Context Protocol server into clients/mcp/ that wraps the
clawdforge HTTP surface so MCP-aware clients (Claude Desktop, Claude Code,
Cursor, Zed, custom agents) can call it as a native tool — claude talking
to claude through the LAN bridge.

Three tools exposed:

  - clawdforge_healthz     -> GET /healthz
  - clawdforge_run         -> POST /run
  - clawdforge_upload_file -> POST /files

Admin endpoints intentionally NOT exposed; token minting stays human-gated.

Implementation notes:

  - Built on the official `mcp` Python SDK (>=1.0). asyncio-native server,
    stdio transport, low-level Server class with @list_tools / @call_tool
    handlers.
  - Self-contained `requests` HTTP wrapper rather than depending on the
    sibling clients/python SDK — keeps clawdforge-mcp installable
    standalone. Same error taxonomy (ForgeError / ForgeAPIError /
    ForgeAuthError / ForgeTransportError).
  - Sync HTTP calls offloaded via asyncio.to_thread so a slow `claude -p`
    can't stall the MCP event loop.
  - Errors are formatted into a single 'clawdforge error: ...' text block
    with isError=True; tracebacks never leak through the JSON-RPC pipe.
  - Logging goes to stderr (CLAWDFORGE_MCP_LOG=DEBUG to enable). stdout
    is reserved for JSON-RPC framing.
  - Config via env: CLAWDFORGE_URL (default http://localhost:8800) and
    CLAWDFORGE_TOKEN (required). MCP clients pass these via their `env`
    config block.

Tests: 12 unit tests covering tool discovery, healthz, run-success,
run-with-files, run-empty-prompt, run-subprocess-502, run-auth-401,
upload happy path, upload missing file, unknown tool, server factory.
HTTP layer mocked via `responses`. Plus a manual end-to-end stdio
smoke (initialize + tools/list round-trip) verified during build.

Includes ready-to-paste Claude Desktop and Claude Code config examples,
and a README documenting install, env, all three tools, and operational
notes (stdout-is-sacred, error wrapping, no streaming).
This commit is contained in:
Kayos 2026-04-28 22:36:36 -07:00
parent 3c62613c30
commit 093021cb36
10 changed files with 1205 additions and 0 deletions

191
clients/mcp/README.md Normal file
View file

@ -0,0 +1,191 @@
# clawdforge-mcp
Model Context Protocol (MCP) server that bridges to the
[clawdforge](http://192.168.0.5:3001/Sulkta-Coop/clawdforge) LAN HTTP service.
Drops the clawdforge tool surface into any MCP-aware client — Claude Desktop,
Claude Code, Cursor, Zed, custom agents — so the model can delegate sub-tasks
to a separate Claude context window via `claude -p`. "Claude talking to
Claude," with the auth living in one place on the LAN.
## What it exposes
| Tool | Backed by | Use it for |
| -------------------------- | -------------- | ----------------------------------------------------------------------------------------- |
| `clawdforge_healthz` | `GET /healthz` | Verify clawdforge is up and the host's `claude` CLI is authenticated. |
| `clawdforge_run` | `POST /run` | Run a one-shot prompt in a fresh Claude subprocess. Single-turn. Returns the parsed result. |
| `clawdforge_upload_file` | `POST /files` | Stage a local file on the clawdforge host and get back a `ff_...` token to attach to a `clawdforge_run` call. |
The admin endpoints (`/admin/tokens`) are deliberately NOT exposed — token
minting is a human-gated operation.
## Install
From a checkout of the clawdforge repo:
```sh
pip install -e clients/mcp
# or with test deps
pip install -e 'clients/mcp[test]'
```
This installs:
- the `clawdforge_mcp` Python package
- a `clawdforge-mcp` console script (alias for `python -m clawdforge_mcp`)
## Configure
The server reads configuration from environment variables — your MCP client
sets these via its `env` block when it spawns the subprocess.
| Variable | Default | Notes |
| -------------------- | ------------------------ | ----------------------------------------------------------- |
| `CLAWDFORGE_URL` | `http://localhost:8800` | Override to your forge host (e.g. `http://192.168.0.5:8800`). |
| `CLAWDFORGE_TOKEN` | (required) | App bearer token (`cf_...`). Mint with `/admin/tokens`. |
| `CLAWDFORGE_MCP_LOG` | `WARNING` | Optional. Set `INFO` or `DEBUG` for stderr logs. |
### Claude Desktop
Add to `~/Library/Application Support/Claude/claude_desktop_config.json`
(macOS) or `%APPDATA%\Claude\claude_desktop_config.json` (Windows):
```json
{
"mcpServers": {
"clawdforge": {
"command": "clawdforge-mcp",
"env": {
"CLAWDFORGE_URL": "http://192.168.0.5:8800",
"CLAWDFORGE_TOKEN": "cf_REPLACE_ME"
}
}
}
}
```
Or if you'd rather not rely on `clawdforge-mcp` being on `$PATH`:
```json
{
"mcpServers": {
"clawdforge": {
"command": "/usr/bin/python3",
"args": ["-m", "clawdforge_mcp"],
"env": {
"CLAWDFORGE_URL": "http://192.168.0.5:8800",
"CLAWDFORGE_TOKEN": "cf_REPLACE_ME"
}
}
}
}
```
A ready-to-paste version lives at `examples/claude-desktop.json`.
### Claude Code
Pass the config via `--mcp-config`:
```sh
claude --mcp-config examples/claude-code.json
```
`examples/claude-code.json` follows the same `mcpServers` schema as Claude
Desktop.
### Cursor / Zed / others
Any client that follows the MCP server-spawn convention works the same way —
point it at the `clawdforge-mcp` command and pass `CLAWDFORGE_URL` and
`CLAWDFORGE_TOKEN` in the env block.
## Tool reference
### `clawdforge_healthz`
```jsonc
// args: none
// returns: {ok, claude_present, claude_version}
```
### `clawdforge_run`
```jsonc
// args:
{
"prompt": "string (required)",
"model": "string (optional, default 'sonnet')",
"system": "string (optional system prompt)",
"files": ["ff_...", "..."], // optional, from clawdforge_upload_file
"timeout_secs": 60 // optional, 5..600
}
// returns: {result, duration_ms, stop_reason}
```
`result` is whatever `claude -p --output-format json` produced, auto-parsed
to JSON if possible, otherwise a string.
When to reach for it:
- **Bounded sub-tasks** that don't need to stay in your main conversation
context — recipe parsing, log summarization, diff classification.
- **Different system prompts** — e.g. spawn a strict JSON-only sub-Claude
for one extraction step.
- **Cheap parallelism in spirit** — a sequence of `clawdforge_run` calls is
fine; each gets its own context window.
When NOT to reach for it:
- Long multi-turn conversations.
- Anything that needs streaming or partial output.
- Trivial prompts where the model can just answer in-context — `claude -p`
takes seconds even for one-liners.
### `clawdforge_upload_file`
```jsonc
// args:
{
"path": "/abs/or/relative/path/on/host",
"ttl_secs": 3600 // optional, 60..86400
}
// returns: {file_token, ttl_secs, size}
```
Path is interpreted on the **host running the MCP server** (typically the
user's workstation), not whatever sandbox the LLM thinks it's in.
## Testing
```sh
pip install -e 'clients/mcp[test]'
python -m pytest clients/mcp/tests
```
The tests stub out the HTTP layer with `responses` — no live clawdforge
required.
## Operational notes
- **stdout is sacred.** The MCP transport pipes JSON-RPC frames over
stdin/stdout. Any stray `print()` in the server process corrupts the
stream. All diagnostics go to stderr via the `clawdforge_mcp` logger.
- **Errors are wrapped, not raised.** Auth failures, transport errors,
upstream 502s — all get formatted into a single short `clawdforge error:
...` text content with `isError=True`. Callers see a clean message, not
a Python traceback.
- **Sync calls under async.** The MCP SDK is asyncio; our HTTP client is
blocking `requests`. Each tool offloads to `asyncio.to_thread` so a slow
`claude -p` call doesn't stall heartbeats.
- **No streaming.** `clawdforge_run` blocks the MCP request until the
subprocess returns. MCP clients handle this fine — it's a normal
long-running tool call.
## Why this exists
clawdforge centralizes the Claude CLI subscription auth on one LAN host so
every Sulkta service doesn't need its own login. MCP is the natural
integration layer: any MCP client can now treat clawdforge as a native
tool surface and call `claude -p` indirectly. Cobb's framing: *"may as
well let claude talk to claude."*

View file

@ -0,0 +1,12 @@
{
"mcpServers": {
"clawdforge": {
"command": "python",
"args": ["-m", "clawdforge_mcp"],
"env": {
"CLAWDFORGE_URL": "http://192.168.0.5:8800",
"CLAWDFORGE_TOKEN": "cf_REPLACE_ME_WITH_AN_APP_TOKEN"
}
}
}
}

View file

@ -0,0 +1,11 @@
{
"mcpServers": {
"clawdforge": {
"command": "clawdforge-mcp",
"env": {
"CLAWDFORGE_URL": "http://192.168.0.5:8800",
"CLAWDFORGE_TOKEN": "cf_REPLACE_ME_WITH_AN_APP_TOKEN"
}
}
}
}

View file

@ -0,0 +1,53 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "clawdforge-mcp"
version = "0.1.0"
description = "Model Context Protocol (MCP) server that bridges to clawdforge — lets MCP-aware clients (Claude Desktop, Claude Code, Cursor, Zed) call clawdforge tools as native tools."
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Kayos", email = "kayos@sulkta.com" }]
keywords = ["clawdforge", "mcp", "model-context-protocol", "claude", "sulkta"]
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Operating System :: POSIX :: Linux",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"mcp>=1.0",
"requests>=2.28",
]
[project.optional-dependencies]
test = [
"responses>=0.23",
"pytest>=7",
"pytest-asyncio>=0.21",
]
[project.scripts]
clawdforge-mcp = "clawdforge_mcp.__main__:main"
[project.urls]
Homepage = "http://192.168.0.5:3001/Sulkta-Coop/clawdforge"
Source = "http://192.168.0.5:3001/Sulkta-Coop/clawdforge"
[tool.hatch.build.targets.wheel]
packages = ["src/clawdforge_mcp"]
[tool.hatch.build.targets.sdist]
include = [
"src/clawdforge_mcp",
"README.md",
"pyproject.toml",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

View file

@ -0,0 +1,12 @@
"""clawdforge-mcp — Model Context Protocol server that bridges to clawdforge.
Exposes the clawdforge HTTP service as MCP tools so that MCP-aware clients
(Claude Desktop, Claude Code, Cursor, Zed, custom agents) can invoke
``claude -p`` indirectly via the LAN-only forge.
Entry point: ``python -m clawdforge_mcp``.
"""
from .server import build_server, run_stdio
__all__ = ["build_server", "run_stdio"]
__version__ = "0.1.0"

View file

@ -0,0 +1,65 @@
"""Entry point: ``python -m clawdforge_mcp``.
Reads ``CLAWDFORGE_URL`` and ``CLAWDFORGE_TOKEN`` from the environment (the
MCP client is responsible for setting these via its ``env`` block) and runs
the MCP server over stdio. JSON-RPC frames flow on stdin/stdout; everything
else (including our own diagnostic prints) MUST go to stderr or it will
corrupt the protocol stream.
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from .server import run_stdio
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
prog="clawdforge-mcp",
description=(
"MCP server bridging to clawdforge. Set CLAWDFORGE_URL and "
"CLAWDFORGE_TOKEN in the environment. Speaks JSON-RPC on stdio."
),
)
parser.add_argument(
"--url",
default=None,
help="Override CLAWDFORGE_URL (default: env or http://localhost:8800).",
)
parser.add_argument(
"--check",
action="store_true",
help="Print resolved config to stderr and exit 0 without serving.",
)
args = parser.parse_args(argv)
url = args.url or os.environ.get("CLAWDFORGE_URL", "http://localhost:8800")
token = os.environ.get("CLAWDFORGE_TOKEN", "")
if args.check:
print(
f"clawdforge-mcp: url={url} token={'set' if token else 'MISSING'}",
file=sys.stderr,
)
return 0
if not token:
print(
"clawdforge-mcp: CLAWDFORGE_TOKEN is not set in env. "
"Configure your MCP client to pass it via the server's env block.",
file=sys.stderr,
)
return 2
try:
asyncio.run(run_stdio(base_url=url, token=token))
except KeyboardInterrupt:
return 130
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,202 @@
"""Thin sync HTTP wrapper around the clawdforge service.
We deliberately keep this self-contained (just ``requests``) rather than
depending on the sibling Python SDK at ``clients/python/``. Reasons:
- ``clawdforge-mcp`` ships independently and may be ``pip install``'d on a
host that doesn't have the Python SDK published anywhere reachable.
- The MCP server only needs three endpoints (healthz, run, files) and
trivial error wrapping pulling the full SDK is overkill.
Errors from this layer are surfaced as :class:`ForgeError` (and subclasses)
which the MCP server's tool handlers catch and reformat into MCP error
content. We never let a stack trace leak back through the JSON-RPC pipe
clients show that to the model verbatim and it pollutes the context.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any
import requests
_HEALTHZ_TIMEOUT_SECS = 10
_HTTP_TIMEOUT_MARGIN_SECS = 30
_DEFAULT_RUN_TIMEOUT_SECS = 120
class ForgeError(Exception):
"""Base error for the clawdforge HTTP wrapper."""
class ForgeTransportError(ForgeError):
"""Connection / TCP / DNS / TLS failure — no HTTP response."""
class ForgeAPIError(ForgeError):
"""4xx / 5xx response. ``status_code`` and ``body`` are populated."""
def __init__(
self,
message: str,
*,
status_code: int,
body: dict[str, Any] | str | None = None,
) -> None:
super().__init__(message)
self.status_code = status_code
self.body = body
self.message = message
class ForgeAuthError(ForgeAPIError):
"""401 / 403 — bad token or IP not allowed."""
class ForgeClient:
"""Minimal sync client for the three endpoints we expose via MCP.
One instance per MCP server process. Holds a ``requests.Session`` for
keep-alive across tool calls. Not thread-safe but the MCP server is
asyncio-single-threaded, and we only ever call this from
``asyncio.to_thread``, so a single shared instance is fine.
"""
def __init__(
self,
*,
base_url: str,
token: str,
default_timeout_secs: int = _DEFAULT_RUN_TIMEOUT_SECS,
http_timeout_margin: int = _HTTP_TIMEOUT_MARGIN_SECS,
session: requests.Session | None = None,
) -> None:
if not base_url:
raise ValueError("base_url is required")
if not token:
raise ValueError("token is required")
self.base_url = base_url.rstrip("/")
self.token = token
self.default_timeout_secs = default_timeout_secs
self.http_timeout_margin = http_timeout_margin
self._session = session or requests.Session()
self._owns_session = session is None
def close(self) -> None:
if self._owns_session:
self._session.close()
# -- internals ---------------------------------------------------------
def _headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}
def _request(
self,
method: str,
path: str,
*,
json_body: dict | None = None,
data: dict | None = None,
files: dict | None = None,
timeout: float | tuple[float, float] | None = None,
) -> Any:
try:
resp = self._session.request(
method,
f"{self.base_url}{path}",
headers=self._headers(),
json=json_body,
data=data,
files=files,
timeout=timeout,
)
except requests.RequestException as e:
raise ForgeTransportError(f"transport: {e}") from e
try:
body = resp.json()
except ValueError:
body = resp.text or None
if resp.status_code >= 400:
short = ""
if isinstance(body, dict):
short = body.get("error") or body.get("detail") or ""
elif isinstance(body, str):
short = body[:200]
msg = f"{resp.status_code} {resp.reason}: {short}".rstrip(": ")
if resp.status_code in (401, 403):
raise ForgeAuthError(msg, status_code=resp.status_code, body=body)
raise ForgeAPIError(msg, status_code=resp.status_code, body=body)
return body
# -- endpoints ---------------------------------------------------------
def healthz(self) -> dict:
return self._request("GET", "/healthz", timeout=_HEALTHZ_TIMEOUT_SECS)
def run(
self,
*,
prompt: str,
model: str | None = None,
system: str | None = None,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> dict:
if not prompt:
raise ValueError("prompt must be non-empty")
body: dict[str, Any] = {"prompt": prompt}
if model is not None:
body["model"] = model
if system is not None:
body["system"] = system
if files:
body["files"] = list(files)
if timeout_secs is not None:
body["timeout_secs"] = timeout_secs
effective_run_timeout = timeout_secs or self.default_timeout_secs
http_timeout = effective_run_timeout + self.http_timeout_margin
payload = self._request("POST", "/run", json_body=body, timeout=http_timeout)
if not isinstance(payload, dict):
raise ForgeError(f"unexpected /run response type: {type(payload).__name__}")
return payload
def upload_file(
self,
*,
path: str | os.PathLike[str],
ttl_secs: int = 3600,
) -> dict:
p = Path(path)
if not p.exists():
raise ValueError(f"file does not exist: {p}")
if not p.is_file():
raise ValueError(f"path is not a regular file: {p}")
try:
fh = p.open("rb")
except OSError as e:
raise ValueError(f"cannot open {p}: {e}") from e
try:
payload = self._request(
"POST",
"/files",
data={"ttl_secs": str(ttl_secs)},
files={"file": (p.name, fh)},
timeout=self.default_timeout_secs + self.http_timeout_margin,
)
finally:
fh.close()
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /files response type: {type(payload).__name__}"
)
return payload

View file

@ -0,0 +1,360 @@
"""MCP server implementation for clawdforge.
Three tools are exposed:
- ``clawdforge_healthz`` liveness + claude-CLI presence smoke test.
- ``clawdforge_run`` one-shot ``claude -p`` delegation.
- ``clawdforge_upload_file`` stage a local file and get back a token to
pass into a subsequent ``clawdforge_run`` call.
Admin endpoints (``/admin/tokens`` etc.) are intentionally NOT exposed.
Token minting is a human-gated operation; an LLM client has no business
poking at it.
Design notes
------------
- The MCP SDK is asyncio-native. Our underlying ``requests`` wrapper is
blocking, so each tool offloads HTTP work via ``asyncio.to_thread`` to
avoid stalling the event loop.
- Tool results are returned as a single :class:`mcp.types.TextContent`
block with a JSON body. That maximizes machine-readability for the
invoking LLM (the MCP spec lets clients pass content blocks straight
into the model's context). For ``clawdforge_run`` we surface
``result``, ``duration_ms``, and ``stop_reason`` together.
- Errors NEVER raise out of the tool handler; they are returned as
``isError=True`` results with a clean message. Raising would surface a
Python traceback through the JSON-RPC layer, which is both ugly and
potentially leaky (token strings, internal paths).
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from typing import Any
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
from .client import (
ForgeAPIError,
ForgeAuthError,
ForgeClient,
ForgeError,
ForgeTransportError,
)
logger = logging.getLogger("clawdforge_mcp")
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
TOOL_HEALTHZ = "clawdforge_healthz"
TOOL_RUN = "clawdforge_run"
TOOL_UPLOAD = "clawdforge_upload_file"
def _tool_definitions() -> list[types.Tool]:
"""Return the static MCP Tool definitions list.
Descriptions are written for an LLM consumer they should make it
obvious WHEN to use the tool, not just WHAT it does.
"""
return [
types.Tool(
name=TOOL_HEALTHZ,
description=(
"Check whether clawdforge is up and the underlying Claude CLI "
"is authenticated. Use this before invoking clawdforge_run if "
"you suspect the bridge may be down. Returns "
"{ok, claude_present, claude_version}. No arguments."
),
inputSchema={
"type": "object",
"properties": {},
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_RUN,
description=(
"Run a one-shot Claude prompt via the local clawdforge bridge. "
"Use this to delegate a bounded sub-task to a SEPARATE Claude "
"context window — e.g. 'parse this recipe text into JSON', "
"'summarize this 500-line log', 'extract the changed function "
"signatures from this diff'. Returns the parsed result "
"(usually a JSON object if the prompt asked for JSON, "
"otherwise a string). Single-turn only — NOT for streaming or "
"multi-turn chat. Cost: spawns a fresh `claude -p` subprocess "
"on the LAN host, so latency is several seconds even for "
"trivial prompts. Pass file_tokens (returned by "
"clawdforge_upload_file) in the `files` arg to attach uploads."
),
inputSchema={
"type": "object",
"properties": {
"prompt": {
"type": "string",
"minLength": 1,
"description": "The user prompt to send to claude -p.",
},
"model": {
"type": "string",
"description": (
"Optional model override (e.g. 'sonnet', 'opus', "
"'haiku'). Defaults to whatever clawdforge has "
"configured (typically 'sonnet')."
),
},
"system": {
"type": "string",
"description": (
"Optional system prompt prepended to the request "
"(e.g. 'You are a precise recipe parser. Always "
"reply with valid JSON.')."
),
},
"files": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional list of file_token strings (each "
"starting 'ff_') previously returned by "
"clawdforge_upload_file. They will be attached "
"to the claude run via --files."
),
},
"timeout_secs": {
"type": "integer",
"minimum": 5,
"maximum": 600,
"description": (
"Per-run subprocess timeout in seconds (5..600). "
"Defaults to the server's configured default."
),
},
},
"required": ["prompt"],
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_UPLOAD,
description=(
"Stage a local file on the clawdforge host so it can be "
"attached to a subsequent clawdforge_run call via the `files` "
"argument. Returns {file_token, ttl_secs, size}. The path is "
"interpreted relative to the host running this MCP server "
"(typically the user's workstation), NOT the LLM's sandbox. "
"Files auto-expire after ttl_secs (default 3600s, range "
"60..86400)."
),
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"minLength": 1,
"description": (
"Filesystem path to the file to upload. Must "
"exist and be readable by the MCP server "
"process."
),
},
"ttl_secs": {
"type": "integer",
"minimum": 60,
"maximum": 86400,
"description": (
"Server-side TTL in seconds (60..86400). "
"Default 3600."
),
},
},
"required": ["path"],
"additionalProperties": False,
},
),
]
# ---------------------------------------------------------------------------
# Tool result helpers
# ---------------------------------------------------------------------------
def _ok_content(payload: Any) -> list[types.TextContent]:
"""Wrap a successful tool result as MCP content.
We encode as JSON so structured results (dicts, lists) survive intact.
The MCP client typically passes the text into the LLM verbatim, and
JSON is the lowest-friction shape to re-parse on the model side.
"""
if isinstance(payload, str):
text = payload
else:
try:
text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
except (TypeError, ValueError):
text = str(payload)
return [types.TextContent(type="text", text=text)]
def _err_content(message: str) -> list[types.TextContent]:
"""Wrap a failure as a single text block.
The CallToolResult around this gets marked ``isError=True`` upstream
(see :func:`_call_tool`). We keep the message tight no traceback,
no token strings.
"""
return [types.TextContent(type="text", text=f"clawdforge error: {message}")]
def _format_forge_error(e: ForgeError) -> str:
"""Render a ForgeError as a single short string for an LLM."""
if isinstance(e, ForgeAuthError):
return f"auth failed ({e.status_code}). Check CLAWDFORGE_TOKEN and IP allowlist."
if isinstance(e, ForgeAPIError):
# The /run endpoint uses 502 for subprocess failures — surface its
# `error` field if present, since it's the most actionable signal.
if isinstance(e.body, dict) and e.body.get("error"):
return f"api {e.status_code}: {e.body['error']}"
return f"api {e.status_code}: {e.message}"
if isinstance(e, ForgeTransportError):
return f"transport: {e}"
return f"forge: {e}"
# ---------------------------------------------------------------------------
# Tool dispatch
# ---------------------------------------------------------------------------
async def _dispatch(
forge: ForgeClient,
name: str,
arguments: dict[str, Any] | None,
) -> tuple[list[types.TextContent], bool]:
"""Run a tool. Returns (content, is_error)."""
args = arguments or {}
try:
if name == TOOL_HEALTHZ:
payload = await asyncio.to_thread(forge.healthz)
return _ok_content(payload), False
if name == TOOL_RUN:
prompt = args.get("prompt")
if not isinstance(prompt, str) or not prompt:
return _err_content("missing or empty 'prompt' argument"), True
model = args.get("model")
system = args.get("system")
files = args.get("files")
timeout_secs = args.get("timeout_secs")
payload = await asyncio.to_thread(
forge.run,
prompt=prompt,
model=model if isinstance(model, str) else None,
system=system if isinstance(system, str) else None,
files=list(files) if isinstance(files, list) else None,
timeout_secs=int(timeout_secs)
if isinstance(timeout_secs, int)
else None,
)
# Surface result + the two metadata fields the LLM may want.
shaped = {
"result": payload.get("result"),
"duration_ms": payload.get("duration_ms"),
"stop_reason": payload.get("stop_reason"),
}
return _ok_content(shaped), False
if name == TOOL_UPLOAD:
path = args.get("path")
if not isinstance(path, str) or not path:
return _err_content("missing or empty 'path' argument"), True
ttl_secs = args.get("ttl_secs", 3600)
if not isinstance(ttl_secs, int):
return _err_content("'ttl_secs' must be an integer"), True
try:
payload = await asyncio.to_thread(
forge.upload_file, path=path, ttl_secs=ttl_secs
)
except ValueError as ve:
return _err_content(str(ve)), True
return _ok_content(payload), False
return _err_content(f"unknown tool: {name}"), True
except ForgeError as fe:
logger.warning("forge error in tool %s: %s", name, fe)
return _err_content(_format_forge_error(fe)), True
except Exception as e: # pragma: no cover - defensive
logger.exception("unexpected error in tool %s", name)
return _err_content(f"unexpected: {type(e).__name__}: {e}"), True
# ---------------------------------------------------------------------------
# Server wiring
# ---------------------------------------------------------------------------
def build_server(forge: ForgeClient) -> Server:
"""Build an MCP :class:`Server` bound to the given forge client.
Split out so tests can construct a server with a mock client without
touching stdio.
"""
server: Server = Server("clawdforge-mcp")
@server.list_tools()
async def _list_tools() -> list[types.Tool]:
return _tool_definitions()
@server.call_tool()
async def _call_tool(
name: str, arguments: dict[str, Any] | None
) -> list[types.TextContent]:
content, is_error = await _dispatch(forge, name, arguments)
if is_error:
# Raising lets the SDK marshal isError=True into the response.
# We use a plain Exception with the already-formatted message
# rather than letting an arbitrary traceback through.
raise RuntimeError(content[0].text)
return content
return server
async def run_stdio(*, base_url: str, token: str) -> None:
"""Run the MCP server forever on stdio. Returns when stdin closes."""
forge = ForgeClient(base_url=base_url, token=token)
server = build_server(forge)
init_options = server.create_initialization_options()
try:
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, init_options)
finally:
forge.close()
__all__ = [
"build_server",
"run_stdio",
"TOOL_HEALTHZ",
"TOOL_RUN",
"TOOL_UPLOAD",
]
# Configure logging to stderr (stdout is reserved for JSON-RPC framing).
if not logger.handlers:
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("clawdforge-mcp [%(levelname)s] %(message)s"))
logger.addHandler(_h)
logger.setLevel(os.environ.get("CLAWDFORGE_MCP_LOG", "WARNING").upper())

View file

View file

@ -0,0 +1,299 @@
"""Tests for clawdforge-mcp.
We test at two levels:
1. Tool-list discovery and tool-dispatch logic (``_dispatch``) exercises
the actual call paths an MCP client would hit, with the HTTP layer
mocked via ``responses``.
2. The :func:`build_server` factory sanity check that the SDK accepts
our wiring.
We deliberately avoid spinning up a real stdio transport; that's the
SDK's territory and adds nothing on top of testing dispatch directly.
"""
from __future__ import annotations
import asyncio
import json
import os
import tempfile
import unittest
import responses
from clawdforge_mcp.client import ForgeClient
from clawdforge_mcp.server import (
TOOL_HEALTHZ,
TOOL_RUN,
TOOL_UPLOAD,
_dispatch,
_tool_definitions,
build_server,
)
BASE_URL = "http://192.168.0.5:8800"
TOKEN = "cf_test_token_xxxxxxxx"
def _client() -> ForgeClient:
# Short timeout so a hung test fails fast rather than hanging CI.
return ForgeClient(base_url=BASE_URL, token=TOKEN, default_timeout_secs=10)
def _run(coro):
return asyncio.run(coro)
class TestToolDiscovery(unittest.TestCase):
"""The MCP client calls list_tools first to discover capabilities."""
def test_three_tools_with_valid_schemas(self) -> None:
tools = _tool_definitions()
names = [t.name for t in tools]
self.assertEqual(
sorted(names),
sorted([TOOL_HEALTHZ, TOOL_RUN, TOOL_UPLOAD]),
)
for t in tools:
# Every tool must have a non-empty description (the LLM uses
# this to pick the tool) and a JSON Schema input definition.
self.assertTrue(t.description and len(t.description) > 20, t.name)
self.assertEqual(t.inputSchema.get("type"), "object", t.name)
# Top-level should explicitly forbid extra args so the LLM
# doesn't get encouraged to invent keys.
self.assertFalse(
t.inputSchema.get("additionalProperties", True),
f"{t.name} should set additionalProperties=False",
)
def test_run_schema_requires_prompt(self) -> None:
run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN)
self.assertIn("prompt", run_tool.inputSchema["required"])
self.assertIn("files", run_tool.inputSchema["properties"])
self.assertEqual(
run_tool.inputSchema["properties"]["timeout_secs"]["maximum"], 600
)
class TestHealthzDispatch(unittest.TestCase):
@responses.activate
def test_healthz_ok(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/healthz",
json={"ok": True, "claude_present": True, "claude_version": "1.2.3"},
status=200,
)
forge = _client()
try:
content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {}))
finally:
forge.close()
self.assertFalse(is_error)
self.assertEqual(len(content), 1)
body = json.loads(content[0].text)
self.assertTrue(body["ok"])
self.assertEqual(body["claude_version"], "1.2.3")
class TestRunDispatch(unittest.TestCase):
@responses.activate
def test_run_success(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={
"ok": True,
"result": {"hello": "world"},
"duration_ms": 1234,
"stop_reason": "end_turn",
},
status=200,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_RUN, {"prompt": "say hi"})
)
finally:
forge.close()
self.assertFalse(is_error)
body = json.loads(content[0].text)
self.assertEqual(body["result"], {"hello": "world"})
self.assertEqual(body["duration_ms"], 1234)
self.assertEqual(body["stop_reason"], "end_turn")
@responses.activate
def test_run_with_files_passes_through(self) -> None:
captured = {}
def cb(request):
captured["body"] = json.loads(request.body)
return (
200,
{},
json.dumps(
{
"ok": True,
"result": "fine",
"duration_ms": 10,
"stop_reason": "end_turn",
}
),
)
responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_RUN,
{
"prompt": "summarize",
"files": ["ff_aaa", "ff_bbb"],
"model": "opus",
"system": "be terse",
"timeout_secs": 90,
},
)
)
finally:
forge.close()
self.assertFalse(is_error)
self.assertEqual(captured["body"]["files"], ["ff_aaa", "ff_bbb"])
self.assertEqual(captured["body"]["model"], "opus")
self.assertEqual(captured["body"]["system"], "be terse")
self.assertEqual(captured["body"]["timeout_secs"], 90)
def test_run_rejects_empty_prompt(self) -> None:
forge = _client()
try:
content, is_error = _run(_dispatch(forge, TOOL_RUN, {"prompt": ""}))
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("prompt", content[0].text.lower())
@responses.activate
def test_run_subprocess_error_surfaces_clean_message(self) -> None:
# The clawdforge server returns 502 with an `error` body when the
# subprocess fails. Verify we surface that without a traceback.
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={
"ok": False,
"error": "claude exited 1",
"stderr": "...",
"duration_ms": 200,
"stop_reason": None,
},
status=502,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_RUN, {"prompt": "boom"})
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("claude exited 1", content[0].text)
self.assertNotIn("Traceback", content[0].text)
@responses.activate
def test_run_auth_failure_surfaces_actionable_message(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={"detail": "bad token"},
status=401,
)
forge = _client()
try:
content, is_error = _run(_dispatch(forge, TOOL_RUN, {"prompt": "hi"}))
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("auth failed", content[0].text)
self.assertIn("CLAWDFORGE_TOKEN", content[0].text)
class TestUploadDispatch(unittest.TestCase):
@responses.activate
def test_upload_happy_path(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/files",
json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11},
status=200,
)
with tempfile.NamedTemporaryFile(
"wb", suffix=".txt", delete=False
) as tf:
tf.write(b"hello world")
tf.flush()
tmp_path = tf.name
try:
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge, TOOL_UPLOAD, {"path": tmp_path, "ttl_secs": 600}
)
)
finally:
forge.close()
finally:
os.unlink(tmp_path)
self.assertFalse(is_error)
body = json.loads(content[0].text)
self.assertEqual(body["file_token"], "ff_abc123")
def test_upload_missing_file_returns_error(self) -> None:
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_UPLOAD,
{"path": "/nonexistent/definitely-not-here-zzz.txt"},
)
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("does not exist", content[0].text)
class TestUnknownTool(unittest.TestCase):
def test_unknown_tool_returns_error(self) -> None:
forge = _client()
try:
content, is_error = _run(_dispatch(forge, "not_a_tool", {}))
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("unknown tool", content[0].text)
class TestServerFactory(unittest.TestCase):
"""Smoke test that the SDK accepts our wiring."""
def test_build_server_returns_named_server(self) -> None:
forge = _client()
try:
server = build_server(forge)
# The Server class exposes a `name` attribute set in __init__.
self.assertEqual(server.name, "clawdforge-mcp")
# Initialization options should populate without error.
init = server.create_initialization_options()
self.assertEqual(init.server_name, "clawdforge-mcp")
finally:
forge.close()
if __name__ == "__main__": # pragma: no cover
unittest.main()