clients/mcp: v0.2 multi-turn session tools

- clawdforge_session_new / _turn / _close / _list / _get
- Wraps the v0.2 session HTTP surface (POST /sessions, POST
  /sessions/{id}/turn, GET/DELETE /sessions/{id}, GET /sessions)
- Tool descriptions tuned for LLM consumption: when to prefer session_new
  vs run, idempotency contract on close, file_token attachment via files[]
- session_turn returns two content blocks: prose text (concat'd text events)
  for direct LLM consumption + structured trace JSON (turn_index,
  stop_reason, duration_ms, events) for tool-calling agents
- 404/410/auth errors from upstream surface as MCP errors with actionable
  messages; no Python tracebacks leak through
- tests/test_sessions.py: 22 new tests covering the 5 tools + 404 + schema
  validation + idempotent close
- tests/test_server.py: new v0.1 schema-pin regression test
- README "Sessions (v0.2)" section with example open/turn/turn/close chain
- Bump version 0.1.0 -> 0.2.0

v0.1 tools (clawdforge_healthz / _run / _upload_file) are byte-identical.

Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
This commit is contained in:
Kayos 2026-04-29 06:49:28 -07:00
parent 99173353bb
commit 5737903217
7 changed files with 1216 additions and 10 deletions

View file

@ -10,11 +10,16 @@ Claude," with the auth living in one place on the LAN.
## What it exposes
| Tool | Backed by | Use it for |
| -------------------------- | -------------- | ----------------------------------------------------------------------------------------- |
| `clawdforge_healthz` | `GET /healthz` | Verify clawdforge is up and the host's `claude` CLI is authenticated. |
| `clawdforge_run` | `POST /run` | Run a one-shot prompt in a fresh Claude subprocess. Single-turn. Returns the parsed result. |
| `clawdforge_upload_file` | `POST /files` | Stage a local file on the clawdforge host and get back a `ff_...` token to attach to a `clawdforge_run` call. |
| Tool | Backed by | Use it for |
| --------------------------- | --------------------------------- | ----------------------------------------------------------------------------------------- |
| `clawdforge_healthz` | `GET /healthz` | Verify clawdforge is up and the host's `claude` CLI is authenticated. |
| `clawdforge_run` | `POST /run` | Run a one-shot prompt in a fresh Claude subprocess. Single-turn. Returns the parsed result. |
| `clawdforge_upload_file` | `POST /files` | Stage a local file on the clawdforge host and get back a `ff_...` token to attach to a `clawdforge_run` call. |
| `clawdforge_session_new` | `POST /sessions` | (v0.2) Open a multi-turn session against an agent (default `claude`). Returns a `session_id`. |
| `clawdforge_session_turn` | `POST /sessions/{id}/turn` | (v0.2) Send one turn to an existing session. Returns prose text + structured events. |
| `clawdforge_session_close` | `DELETE /sessions/{id}` | (v0.2) Close a session explicitly. Idempotent. |
| `clawdforge_session_list` | `GET /sessions` | (v0.2) List sessions visible to this server's bearer token. |
| `clawdforge_session_get` | `GET /sessions/{id}` | (v0.2) Fetch a session's state (turn_count, last_turn_at, closed_at, live, meta). |
The admin endpoints (`/admin/tokens`) are deliberately NOT exposed — token
minting is a human-gated operation.
@ -166,7 +171,126 @@ traversal are neutralized via `Path.resolve(strict=True)` followed by an
bytes are sent to the forge. Non-regular files (FIFOs, sockets, directories,
devices) are refused.
### Threat model — why these guards exist
## Sessions (v0.2)
v0.2 adds multi-turn session tools. Use them when you need context across
multiple turns; for one-shot calls, `clawdforge_run` is still cheaper (no
session-create overhead, no ACPX handshake). Sessions auto-close after
**1 hour of inactivity** server-side, but explicit close is preferred.
### `clawdforge_session_new`
```jsonc
// args:
{
"agent": "claude", // optional, default "claude"
"meta": { "task": "..." } // optional, free-form caller metadata
}
// returns: {session_id, agent, created_at, cwd?}
```
### `clawdforge_session_turn`
```jsonc
// args:
{
"session_id": "01HV...ABC", // required, from clawdforge_session_new
"prompt": "string", // required
"files": ["ff_..."], // optional, from clawdforge_upload_file
"timeout_secs": 90 // optional, 5..600
}
// returns: two content blocks
// [0] plain text — concatenated `text` events (the model's reply)
// [1] JSON — {session_id, turn_index, stop_reason, duration_ms, events[]}
```
The two-block layout means the LLM consumer sees the prose reply directly
without having to parse JSON, while tool-calling agents that want to
introspect the structured event trace can still get it from the second
block. Concurrent turns on the same session are serialized server-side.
### `clawdforge_session_close`
```jsonc
// args:
{
"session_id": "01HV...ABC"
}
// returns: {ok: true}
// or: {ok: true, already_closed: true} // idempotent re-close
```
Idempotent. Safe to call multiple times — the server returns
`already_closed: true` on a re-close, and we surface that flag verbatim.
### `clawdforge_session_list`
```jsonc
// args:
{
"include_closed": true // optional, default true
}
// returns: {sessions: [...rows], count}
```
Each row has `session_id`, `agent`, `app_name`, `created_at`,
`last_turn_at`, `turn_count`, `closed_at` (nullable), and optional `meta`.
### `clawdforge_session_get`
```jsonc
// args:
{
"session_id": "01HV...ABC"
}
// returns: {session_id, agent, cwd, created_at, last_turn_at,
// turn_count, closed_at, live, meta}
```
`live` is true while the underlying ACPX subprocess is still running.
### Example flow — open, turn, turn, close
A typical multi-turn debug session from an MCP client (Claude Desktop,
Claude Code, Cursor, etc.):
```
> clawdforge_session_new {"agent": "claude", "meta": {"task": "debug auth flow"}}
< {"session_id": "01HV9P1234...", "agent": "claude", "created_at": 1714329600}
> clawdforge_session_turn {"session_id": "01HV9P1234...",
"prompt": "Read auth.py and explain how the bearer token check works."}
< [0] "The auth check happens in `require_app(...)` at line 42..."
[1] {"turn_index": 1, "stop_reason": "end_turn", "duration_ms": 5410, "events": [...]}
> clawdforge_session_turn {"session_id": "01HV9P1234...",
"prompt": "Now show me where the IP CIDR allowlist is enforced."}
< [0] "The CIDR check is in `_check_cidr_match` at..."
[1] {"turn_index": 2, ...}
> clawdforge_session_close {"session_id": "01HV9P1234..."}
< {"ok": true}
```
The session_id ties the turns together — clawdforge holds the ACPX
context across them, so turn 2 has full awareness of what turn 1 read.
### When to prefer `session_new` over `run`
- **Multi-turn investigation.** "Read X… now look at Y… now Z" benefits
from accumulated context. With `clawdforge_run` you'd repeat the entire
context on every call.
- **Long-running agentic tasks.** ACPX exposes structured tool calls; the
agent inside the session can `Read`, `Bash`, `Edit` etc. and you'll see
those events in the second content block of `session_turn`.
- **Stateful prompts.** "We agreed on schema X earlier — now generate the
migration for it." Same context window across turns.
For one-off prompts ("parse this recipe", "summarize this log"),
`clawdforge_run` is still the right call — lower latency, no session
lifecycle to manage.
## Threat model — why the upload guards exist
The MCP-specific question is: **what can a malicious LLM-driven client do?**
A model that has earned the user's trust can socially-engineer them into

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "clawdforge-mcp"
version = "0.1.0"
version = "0.2.0"
description = "Model Context Protocol (MCP) server that bridges to clawdforge — lets MCP-aware clients (Claude Desktop, Claude Code, Cursor, Zed) call clawdforge tools as native tools."
readme = "README.md"
requires-python = ">=3.10"

View file

@ -9,4 +9,4 @@ Entry point: ``python -m clawdforge_mcp``.
from .server import build_server, run_stdio
__all__ = ["build_server", "run_stdio"]
__version__ = "0.1.0"
__version__ = "0.2.0"

View file

@ -18,6 +18,7 @@ from __future__ import annotations
import os
from pathlib import Path
from typing import Any
from urllib.parse import quote
import requests
@ -116,6 +117,7 @@ class ForgeClient:
json_body: dict | None = None,
data: dict | None = None,
files: dict | None = None,
params: dict | None = None,
timeout: float | tuple[float, float] | None = None,
) -> Any:
try:
@ -126,6 +128,7 @@ class ForgeClient:
json=json_body,
data=data,
files=files,
params=params,
timeout=timeout,
)
except requests.RequestException as e:
@ -275,3 +278,116 @@ class ForgeClient:
f"unexpected /files response type: {type(payload).__name__}"
)
return payload
# -- /sessions (v0.2) --------------------------------------------------
#
# Five thin wrappers over the v0.2 multi-turn surface. Same self-contained
# pattern as the rest of this client: we don't pull in the Python SDK
# (clients/python) just for these. The MCP server only needs the wire
# shape, which is stable per the v0.2 spec. Errors bubble up as
# ForgeError subclasses; the server's _dispatch reformats them for the
# LLM.
def session_new(
self,
*,
agent: str = "claude",
meta: dict[str, Any] | None = None,
) -> dict:
body: dict[str, Any] = {"agent": agent}
if meta is not None:
body["meta"] = meta
# Session create involves an acpx handshake — give it the same
# generous timeout we use for /run.
timeout = self.default_timeout_secs + self.http_timeout_margin
payload = self._request("POST", "/sessions", json_body=body, timeout=timeout)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions response type: {type(payload).__name__}"
)
return payload
def session_turn(
self,
*,
session_id: str,
prompt: str,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> dict:
if not session_id:
raise ValueError("session_id must be non-empty")
if not prompt:
raise ValueError("prompt must be non-empty")
body: dict[str, Any] = {"prompt": prompt}
if files:
body["files"] = list(files)
if timeout_secs is not None:
body["timeout_secs"] = timeout_secs
# HTTP timeout uses the same subprocess-timeout-plus-margin pattern
# as /run; per-turn ACPX work can be just as slow as a one-shot.
effective = timeout_secs if timeout_secs is not None else self.default_timeout_secs
http_timeout = effective + self.http_timeout_margin
slug = quote(session_id, safe="")
payload = self._request(
"POST",
f"/sessions/{slug}/turn",
json_body=body,
timeout=http_timeout,
)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions/{{id}}/turn response type: {type(payload).__name__}"
)
return payload
def session_close(self, session_id: str) -> dict:
if not session_id:
raise ValueError("session_id must be non-empty")
slug = quote(session_id, safe="")
payload = self._request(
"DELETE",
f"/sessions/{slug}",
timeout=_HEALTHZ_TIMEOUT_SECS,
)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions/{{id}} DELETE response type: {type(payload).__name__}"
)
return payload
def session_get(self, session_id: str) -> dict:
if not session_id:
raise ValueError("session_id must be non-empty")
slug = quote(session_id, safe="")
payload = self._request(
"GET",
f"/sessions/{slug}",
timeout=_HEALTHZ_TIMEOUT_SECS,
)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions/{{id}} GET response type: {type(payload).__name__}"
)
return payload
def session_list(self, *, include_closed: bool = True) -> dict:
# Server default is include_closed=true; only forward the override
# when False to keep the URL stable for the common case.
params: dict[str, Any] | None = None
if include_closed is False:
params = {"include_closed": "false"}
payload = self._request(
"GET",
"/sessions",
params=params,
timeout=_HEALTHZ_TIMEOUT_SECS,
)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions LIST response type: {type(payload).__name__}"
)
return payload

View file

@ -58,6 +58,14 @@ TOOL_HEALTHZ = "clawdforge_healthz"
TOOL_RUN = "clawdforge_run"
TOOL_UPLOAD = "clawdforge_upload_file"
# v0.2 multi-turn session tools. Additive on top of v0.1 — none of the
# v0.1 tools above have changed shape.
TOOL_SESSION_NEW = "clawdforge_session_new"
TOOL_SESSION_TURN = "clawdforge_session_turn"
TOOL_SESSION_CLOSE = "clawdforge_session_close"
TOOL_SESSION_LIST = "clawdforge_session_list"
TOOL_SESSION_GET = "clawdforge_session_get"
def _tool_definitions() -> list[types.Tool]:
"""Return the static MCP Tool definitions list.
@ -146,6 +154,165 @@ def _tool_definitions() -> list[types.Tool]:
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_SESSION_NEW,
description=(
"Open a new multi-turn session against the clawdforge agent. "
"Returns a session_id you can pass to clawdforge_session_turn "
"for follow-up turns. Sessions auto-close after 1h of "
"inactivity. Use this when you need context across multiple "
"turns; for one-shot calls prefer clawdforge_run (cheaper, "
"no session-create overhead). Returns "
"{session_id, agent, created_at, cwd?}."
),
inputSchema={
"type": "object",
"properties": {
"agent": {
"type": "string",
"minLength": 1,
"description": (
"Agent name to bind the session to. Defaults to "
"'claude'."
),
},
"meta": {
"type": "object",
"description": (
"Optional caller metadata persisted server-side "
"in the session ledger. Free-form JSON object."
),
"additionalProperties": True,
},
},
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_SESSION_TURN,
description=(
"Send a turn to an existing session. Returns the agent's "
"response — the human-readable text reply in the first "
"content block, plus structured events (text, tool_call, "
"thinking) and metadata (turn_index, stop_reason, "
"duration_ms) as JSON in a second block for tool-calling "
"traceability. Concurrent turns on the same session are "
"serialized server-side. Pass file_tokens (from "
"clawdforge_upload_file) in `files` to attach uploads. "
"404 if the session_id is unknown or belongs to another "
"token; 410 if the session has been closed/GC'd."
),
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"minLength": 1,
"maxLength": 200,
"description": (
"Session id from clawdforge_session_new."
),
},
"prompt": {
"type": "string",
"minLength": 1,
"description": "The user prompt for this turn.",
},
"files": {
"type": "array",
"items": {
"type": "string",
"pattern": "^ff_[A-Za-z0-9_-]+$",
},
"description": (
"Optional list of file_token strings (each "
"starting 'ff_') previously returned by "
"clawdforge_upload_file."
),
},
"timeout_secs": {
"type": "integer",
"minimum": 5,
"maximum": 600,
"description": (
"Per-turn subprocess timeout in seconds (5..600). "
"Defaults to the server's configured default."
),
},
},
"required": ["session_id", "prompt"],
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_SESSION_CLOSE,
description=(
"Close a session explicitly. Idempotent — safe to call "
"multiple times; the second call returns "
"{ok: true, already_closed: true}. Sessions also auto-close "
"after 1h inactivity, but explicit close is preferred when "
"you're done with it (frees the slot immediately and ends "
"the underlying acpx subprocess)."
),
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"minLength": 1,
"maxLength": 200,
"description": "Session id to close.",
},
},
"required": ["session_id"],
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_SESSION_LIST,
description=(
"List sessions visible to this MCP server's bearer token. "
"Each entry includes session_id, agent, created_at, "
"last_turn_at, turn_count, closed_at (nullable), and "
"optional meta. Newest-first."
),
inputSchema={
"type": "object",
"properties": {
"include_closed": {
"type": "boolean",
"description": (
"If false, only sessions where closed_at is null "
"are returned. Default true (mirrors server "
"default)."
),
},
},
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_SESSION_GET,
description=(
"Get the state of a specific session: turn_count, "
"last_turn_at timestamp, agent, closed_at, live flag, and "
"any meta the caller attached at creation. 404 if the "
"session_id is unknown or belongs to another token."
),
inputSchema={
"type": "object",
"properties": {
"session_id": {
"type": "string",
"minLength": 1,
"maxLength": 200,
"description": "Session id to fetch.",
},
},
"required": ["session_id"],
"additionalProperties": False,
},
),
types.Tool(
name=TOOL_UPLOAD,
description=(
@ -253,6 +420,123 @@ def _format_upload_response(payload: Any) -> dict[str, Any]:
}
def _format_session_new_response(payload: Any) -> dict[str, Any]:
"""Whitelist /sessions create fields returned to the LLM.
Server returns: ``ok``, ``session_id``, ``agent``, ``created_at``,
``cwd``. We pass through the four caller-useful keys; ``ok`` is
elided since reaching this branch already implies success.
"""
if not isinstance(payload, dict):
return {}
return {
"session_id": payload.get("session_id"),
"agent": payload.get("agent"),
"created_at": payload.get("created_at"),
"cwd": payload.get("cwd"),
}
def _format_session_get_response(payload: Any) -> dict[str, Any]:
"""Whitelist /sessions/{id} GET fields.
Server returns: ``session_id``, ``agent``, ``cwd``, ``created_at``,
``last_turn_at``, ``turn_count``, ``closed_at``, ``live``, ``meta``.
"""
if not isinstance(payload, dict):
return {}
return {
"session_id": payload.get("session_id"),
"agent": payload.get("agent"),
"cwd": payload.get("cwd"),
"created_at": payload.get("created_at"),
"last_turn_at": payload.get("last_turn_at"),
"turn_count": payload.get("turn_count"),
"closed_at": payload.get("closed_at"),
"live": payload.get("live"),
"meta": payload.get("meta"),
}
def _format_session_close_response(payload: Any) -> dict[str, Any]:
"""Whitelist DELETE /sessions/{id} response.
Server returns ``{ok: true}`` or ``{ok: true, already_closed: true}``.
"""
if not isinstance(payload, dict):
return {"ok": False}
out: dict[str, Any] = {"ok": bool(payload.get("ok", True))}
if payload.get("already_closed"):
out["already_closed"] = True
return out
def _format_session_list_response(payload: Any) -> dict[str, Any]:
"""Whitelist /sessions list response.
Server returns ``{ok, sessions: [...], count}``. Each row is passed
through verbatim the row shape is already known/stable per the v0.2
spec and doesn't carry any internal-only fields the way /healthz might.
"""
if not isinstance(payload, dict):
return {"sessions": [], "count": 0}
rows = payload.get("sessions") or []
if not isinstance(rows, list):
rows = []
return {"sessions": rows, "count": payload.get("count", len(rows))}
def _format_session_turn_content(payload: Any) -> list[types.TextContent]:
"""Render a /sessions/{id}/turn response as MCP content blocks.
Two-block layout:
1. The concatenated ``text`` events as plain prose this is what
the LLM will actually read into context. Most callers want this
and only this.
2. The structured turn metadata (turn_index, stop_reason, duration_ms,
events) as JSON useful for tool-calling traceability and for
agents that want to inspect the thinking / tool_call event stream.
If the upstream payload is malformed (not a dict, or events isn't a
list), we still return a sane structure so the caller doesn't get a
Python exception bubbled up.
"""
if not isinstance(payload, dict):
return [types.TextContent(type="text", text="")]
events = payload.get("events") or []
if not isinstance(events, list):
events = []
# Block 1: prose. Concatenate ``content`` from all type=="text" events.
parts: list[str] = []
for ev in events:
if isinstance(ev, dict) and ev.get("type") == "text":
content = ev.get("content")
if isinstance(content, str):
parts.append(content)
text = "".join(parts)
# Block 2: structured trace. Whitelisted to known keys so a future
# server addition can't smuggle internal fields through verbatim.
trace = {
"session_id": payload.get("session_id"),
"turn_index": payload.get("turn_index"),
"stop_reason": payload.get("stop_reason"),
"duration_ms": payload.get("duration_ms"),
"events": events,
}
try:
trace_json = json.dumps(trace, ensure_ascii=False, indent=2, default=str)
except (TypeError, ValueError):
trace_json = str(trace)
return [
types.TextContent(type="text", text=text),
types.TextContent(type="text", text=trace_json),
]
def _format_forge_error(e: ForgeError) -> str:
"""Render a ForgeError as a single short string for an LLM."""
if isinstance(e, ForgeAuthError):
@ -338,6 +622,78 @@ async def _dispatch(
return _err_content(str(ve)), True
return _ok_content(_format_upload_response(payload)), False
if name == TOOL_SESSION_NEW:
agent = args.get("agent", "claude")
if not isinstance(agent, str) or not agent:
agent = "claude"
meta = args.get("meta")
if meta is not None and not isinstance(meta, dict):
return _err_content("'meta' must be an object"), True
payload = await asyncio.to_thread(
forge.session_new,
agent=agent,
meta=meta,
)
return _ok_content(_format_session_new_response(payload)), False
if name == TOOL_SESSION_TURN:
session_id = args.get("session_id")
if not isinstance(session_id, str) or not session_id:
return _err_content("missing or empty 'session_id' argument"), True
prompt = args.get("prompt")
if not isinstance(prompt, str) or not prompt:
return _err_content("missing or empty 'prompt' argument"), True
files = args.get("files")
timeout_secs = args.get("timeout_secs")
if timeout_secs is not None and not (
isinstance(timeout_secs, int) and not isinstance(timeout_secs, bool)
):
return _err_content("'timeout_secs' must be an integer"), True
try:
payload = await asyncio.to_thread(
forge.session_turn,
session_id=session_id,
prompt=prompt,
files=list(files) if isinstance(files, list) else None,
timeout_secs=timeout_secs if timeout_secs is not None else None,
)
except ValueError as ve:
return _err_content(str(ve)), True
return _format_session_turn_content(payload), False
if name == TOOL_SESSION_CLOSE:
session_id = args.get("session_id")
if not isinstance(session_id, str) or not session_id:
return _err_content("missing or empty 'session_id' argument"), True
try:
payload = await asyncio.to_thread(
forge.session_close, session_id
)
except ValueError as ve:
return _err_content(str(ve)), True
return _ok_content(_format_session_close_response(payload)), False
if name == TOOL_SESSION_LIST:
include_closed = args.get("include_closed", True)
if not isinstance(include_closed, bool):
return _err_content("'include_closed' must be a boolean"), True
payload = await asyncio.to_thread(
forge.session_list, include_closed=include_closed
)
return _ok_content(_format_session_list_response(payload)), False
if name == TOOL_SESSION_GET:
session_id = args.get("session_id")
if not isinstance(session_id, str) or not session_id:
return _err_content("missing or empty 'session_id' argument"), True
try:
payload = await asyncio.to_thread(
forge.session_get, session_id
)
except ValueError as ve:
return _err_content(str(ve)), True
return _ok_content(_format_session_get_response(payload)), False
return _err_content(f"unknown tool: {name}"), True
except ForgeError as fe:
@ -402,6 +758,11 @@ __all__ = [
"TOOL_HEALTHZ",
"TOOL_RUN",
"TOOL_UPLOAD",
"TOOL_SESSION_NEW",
"TOOL_SESSION_TURN",
"TOOL_SESSION_CLOSE",
"TOOL_SESSION_LIST",
"TOOL_SESSION_GET",
]

View file

@ -28,6 +28,11 @@ from clawdforge_mcp.client import ForgeClient
from clawdforge_mcp.server import (
TOOL_HEALTHZ,
TOOL_RUN,
TOOL_SESSION_CLOSE,
TOOL_SESSION_GET,
TOOL_SESSION_LIST,
TOOL_SESSION_NEW,
TOOL_SESSION_TURN,
TOOL_UPLOAD,
_dispatch,
_tool_definitions,
@ -51,12 +56,24 @@ def _run(coro):
class TestToolDiscovery(unittest.TestCase):
"""The MCP client calls list_tools first to discover capabilities."""
def test_three_tools_with_valid_schemas(self) -> None:
def test_all_tools_with_valid_schemas(self) -> None:
tools = _tool_definitions()
names = [t.name for t in tools]
# v0.1 had 3 tools; v0.2 adds 5 session tools (additive).
self.assertEqual(
sorted(names),
sorted([TOOL_HEALTHZ, TOOL_RUN, TOOL_UPLOAD]),
sorted(
[
TOOL_HEALTHZ,
TOOL_RUN,
TOOL_UPLOAD,
TOOL_SESSION_NEW,
TOOL_SESSION_TURN,
TOOL_SESSION_CLOSE,
TOOL_SESSION_LIST,
TOOL_SESSION_GET,
]
),
)
for t in tools:
# Every tool must have a non-empty description (the LLM uses
@ -70,6 +87,41 @@ class TestToolDiscovery(unittest.TestCase):
f"{t.name} should set additionalProperties=False",
)
def test_v0_1_tool_schemas_unchanged(self) -> None:
"""Regression: v0.2 must not mutate the v0.1 tool surface.
We pin the exact (name, required-args, schema-version-distinguishing)
properties of healthz / run / upload_file so a future refactor
can't silently break v0.1 callers.
"""
tools = {t.name: t for t in _tool_definitions()}
# healthz: zero args, additionalProperties=False.
h = tools[TOOL_HEALTHZ].inputSchema
self.assertEqual(h.get("properties", {}), {})
self.assertFalse(h.get("additionalProperties", True))
# run: required=['prompt'], same five properties as v0.1, files
# pattern unchanged, timeout_secs range unchanged.
r = tools[TOOL_RUN].inputSchema
self.assertEqual(r["required"], ["prompt"])
self.assertEqual(
sorted(r["properties"].keys()),
sorted(["prompt", "model", "system", "files", "timeout_secs"]),
)
self.assertEqual(
r["properties"]["files"]["items"]["pattern"], r"^ff_[A-Za-z0-9_-]+$"
)
self.assertEqual(r["properties"]["timeout_secs"]["minimum"], 5)
self.assertEqual(r["properties"]["timeout_secs"]["maximum"], 600)
# upload_file: required=['path'], two properties.
u = tools[TOOL_UPLOAD].inputSchema
self.assertEqual(u["required"], ["path"])
self.assertEqual(
sorted(u["properties"].keys()), sorted(["path", "ttl_secs"])
)
def test_run_schema_requires_prompt(self) -> None:
run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN)
self.assertIn("prompt", run_tool.inputSchema["required"])

View file

@ -0,0 +1,553 @@
"""Tests for the v0.2 multi-turn session tools.
Mirrors the structure of test_server.py HTTP layer mocked via
``responses``, the dispatch function exercised directly.
Covers:
- ``clawdforge_session_new`` happy path returns session_id
- ``clawdforge_session_turn`` round-trip text content present in block 1,
structured events in block 2
- ``clawdforge_session_close`` idempotency (already_closed=True surfaces)
- ``clawdforge_session_list`` returns array of sessions
- ``clawdforge_session_get`` returns state
- 404 from /sessions/{id}/turn surfaces as MCP error with actionable text
The v0.1 tools' regression coverage lives in ``test_server.py`` and stays
intact; this file is purely additive.
"""
from __future__ import annotations
import asyncio
import json
import unittest
import jsonschema
import responses
from clawdforge_mcp.client import ForgeClient
from clawdforge_mcp.server import (
TOOL_SESSION_CLOSE,
TOOL_SESSION_GET,
TOOL_SESSION_LIST,
TOOL_SESSION_NEW,
TOOL_SESSION_TURN,
_dispatch,
_tool_definitions,
)
BASE_URL = "http://192.168.0.5:8800"
TOKEN = "cf_test_token_xxxxxxxx"
SID = "01HV9P1234567890ABCDEFGHJK" # ULID-ish; server returns whatever acpx hands back
def _client() -> ForgeClient:
return ForgeClient(base_url=BASE_URL, token=TOKEN, default_timeout_secs=10)
def _run(coro):
return asyncio.run(coro)
class TestSessionNewDispatch(unittest.TestCase):
@responses.activate
def test_session_new_returns_session_id(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/sessions",
json={
"ok": True,
"session_id": SID,
"agent": "claude",
"created_at": 1714329600,
"cwd": "/tmp/acpx-sessions/01HV.../cwd",
},
status=200,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_NEW, {"agent": "claude"})
)
finally:
forge.close()
self.assertFalse(is_error)
self.assertEqual(len(content), 1)
body = json.loads(content[0].text)
self.assertEqual(body["session_id"], SID)
self.assertEqual(body["agent"], "claude")
self.assertEqual(body["created_at"], 1714329600)
# Whitelist: only the four useful fields are surfaced.
self.assertEqual(
set(body.keys()), {"session_id", "agent", "created_at", "cwd"}
)
@responses.activate
def test_session_new_with_meta_passes_through(self) -> None:
captured: dict = {}
def cb(request):
captured["body"] = json.loads(request.body)
return (
200,
{},
json.dumps(
{
"ok": True,
"session_id": SID,
"agent": "claude",
"created_at": 1,
}
),
)
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=cb)
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_SESSION_NEW,
{"agent": "claude", "meta": {"task": "debug"}},
)
)
finally:
forge.close()
self.assertFalse(is_error)
self.assertEqual(captured["body"]["agent"], "claude")
self.assertEqual(captured["body"]["meta"], {"task": "debug"})
def test_session_new_rejects_non_dict_meta(self) -> None:
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_NEW, {"meta": "not-a-dict"})
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("meta", content[0].text)
class TestSessionTurnDispatch(unittest.TestCase):
@responses.activate
def test_session_turn_round_trip(self) -> None:
"""The turn returns two content blocks: prose text + structured trace."""
responses.add(
responses.POST,
f"{BASE_URL}/sessions/{SID}/turn",
json={
"ok": True,
"session_id": SID,
"turn_index": 1,
"events": [
{"type": "thinking", "content": "let me look..."},
{"type": "text", "content": "Hello back. "},
{"type": "text", "content": "Anything else?"},
],
"stop_reason": "end_turn",
"duration_ms": 4321,
},
status=200,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_SESSION_TURN,
{"session_id": SID, "prompt": "say hi"},
)
)
finally:
forge.close()
self.assertFalse(is_error)
self.assertEqual(len(content), 2)
# Block 1 is the prose — concatenation of text events.
self.assertEqual(content[0].text, "Hello back. Anything else?")
# Block 2 is the structured trace as JSON.
trace = json.loads(content[1].text)
self.assertEqual(trace["session_id"], SID)
self.assertEqual(trace["turn_index"], 1)
self.assertEqual(trace["stop_reason"], "end_turn")
self.assertEqual(trace["duration_ms"], 4321)
self.assertEqual(len(trace["events"]), 3)
# Thinking events ARE preserved in the trace block (callers may
# want to introspect tool-calling behavior).
self.assertEqual(trace["events"][0]["type"], "thinking")
@responses.activate
def test_session_turn_with_files_and_timeout_passes_through(self) -> None:
captured: dict = {}
def cb(request):
captured["body"] = json.loads(request.body)
return (
200,
{},
json.dumps(
{
"ok": True,
"session_id": SID,
"turn_index": 2,
"events": [{"type": "text", "content": "ok"}],
"stop_reason": "end_turn",
"duration_ms": 1,
}
),
)
responses.add_callback(
responses.POST, f"{BASE_URL}/sessions/{SID}/turn", callback=cb
)
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_SESSION_TURN,
{
"session_id": SID,
"prompt": "summarize",
"files": ["ff_aaa", "ff_bbb"],
"timeout_secs": 90,
},
)
)
finally:
forge.close()
self.assertFalse(is_error)
self.assertEqual(captured["body"]["files"], ["ff_aaa", "ff_bbb"])
self.assertEqual(captured["body"]["timeout_secs"], 90)
def test_session_turn_rejects_missing_session_id(self) -> None:
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_TURN, {"prompt": "hi"})
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("session_id", content[0].text)
def test_session_turn_rejects_empty_prompt(self) -> None:
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_SESSION_TURN,
{"session_id": SID, "prompt": ""},
)
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("prompt", content[0].text.lower())
def test_session_turn_strict_bool_guard_timeout_secs(self) -> None:
"""bool is a subclass of int — must not slip past the runtime guard."""
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_SESSION_TURN,
{"session_id": SID, "prompt": "hi", "timeout_secs": True},
)
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("timeout_secs", content[0].text)
@responses.activate
def test_session_turn_404_surfaces_as_mcp_error(self) -> None:
"""404 from upstream → isError=True with actionable message."""
responses.add(
responses.POST,
f"{BASE_URL}/sessions/{SID}/turn",
json={"detail": "session not found"},
status=404,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge,
TOOL_SESSION_TURN,
{"session_id": SID, "prompt": "hi"},
)
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("404", content[0].text)
self.assertIn("session not found", content[0].text)
# Defense-in-depth: no Python traceback leaks through.
self.assertNotIn("Traceback", content[0].text)
class TestSessionCloseDispatch(unittest.TestCase):
@responses.activate
def test_session_close_first_call(self) -> None:
responses.add(
responses.DELETE,
f"{BASE_URL}/sessions/{SID}",
json={"ok": True},
status=200,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_CLOSE, {"session_id": SID})
)
finally:
forge.close()
self.assertFalse(is_error)
body = json.loads(content[0].text)
self.assertTrue(body["ok"])
# First close: server doesn't set already_closed, so we don't either.
self.assertNotIn("already_closed", body)
@responses.activate
def test_session_close_idempotent_second_call(self) -> None:
"""Server returns {ok: true, already_closed: true} on a re-close —
we surface that flag verbatim. The MCP tool doesn't error on
re-close (it's the documented contract for idempotency)."""
responses.add(
responses.DELETE,
f"{BASE_URL}/sessions/{SID}",
json={"ok": True, "already_closed": True},
status=200,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_CLOSE, {"session_id": SID})
)
finally:
forge.close()
self.assertFalse(is_error)
body = json.loads(content[0].text)
self.assertTrue(body["ok"])
self.assertTrue(body["already_closed"])
def test_session_close_rejects_missing_session_id(self) -> None:
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_CLOSE, {})
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("session_id", content[0].text)
class TestSessionListDispatch(unittest.TestCase):
@responses.activate
def test_session_list_returns_array(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/sessions",
json={
"ok": True,
"sessions": [
{
"session_id": SID,
"agent": "claude",
"app_name": "mcp-test",
"created_at": 1714329600,
"last_turn_at": 1714329700,
"turn_count": 3,
"closed_at": None,
},
{
"session_id": "older-session-id",
"agent": "claude",
"app_name": "mcp-test",
"created_at": 1714329000,
"last_turn_at": 1714329100,
"turn_count": 1,
"closed_at": 1714329500,
},
],
"count": 2,
},
status=200,
)
forge = _client()
try:
content, is_error = _run(_dispatch(forge, TOOL_SESSION_LIST, {}))
finally:
forge.close()
self.assertFalse(is_error)
body = json.loads(content[0].text)
self.assertIsInstance(body["sessions"], list)
self.assertEqual(body["count"], 2)
self.assertEqual(body["sessions"][0]["session_id"], SID)
self.assertEqual(body["sessions"][1]["closed_at"], 1714329500)
@responses.activate
def test_session_list_include_closed_false_passes_query_param(self) -> None:
"""include_closed=false must turn into ?include_closed=false on the wire."""
captured: dict = {}
def cb(request):
captured["url"] = request.url
return (
200,
{},
json.dumps({"ok": True, "sessions": [], "count": 0}),
)
responses.add_callback(
responses.GET, f"{BASE_URL}/sessions", callback=cb
)
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge, TOOL_SESSION_LIST, {"include_closed": False}
)
)
finally:
forge.close()
self.assertFalse(is_error)
self.assertIn("include_closed=false", captured["url"])
def test_session_list_rejects_non_bool_include_closed(self) -> None:
forge = _client()
try:
content, is_error = _run(
_dispatch(
forge, TOOL_SESSION_LIST, {"include_closed": "false"}
)
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("include_closed", content[0].text)
class TestSessionGetDispatch(unittest.TestCase):
@responses.activate
def test_session_get_returns_state(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/sessions/{SID}",
json={
"ok": True,
"session_id": SID,
"agent": "claude",
"cwd": "/tmp/acpx/cwd",
"created_at": 1714329600,
"last_turn_at": 1714329700,
"turn_count": 5,
"closed_at": None,
"live": True,
"meta": {"task": "debug"},
},
status=200,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_GET, {"session_id": SID})
)
finally:
forge.close()
self.assertFalse(is_error)
body = json.loads(content[0].text)
self.assertEqual(body["session_id"], SID)
self.assertEqual(body["turn_count"], 5)
self.assertTrue(body["live"])
self.assertEqual(body["meta"], {"task": "debug"})
# Whitelist: nine known keys, nothing else.
self.assertEqual(
set(body.keys()),
{
"session_id",
"agent",
"cwd",
"created_at",
"last_turn_at",
"turn_count",
"closed_at",
"live",
"meta",
},
)
@responses.activate
def test_session_get_404_surfaces_as_mcp_error(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/sessions/{SID}",
json={"detail": "session not found"},
status=404,
)
forge = _client()
try:
content, is_error = _run(
_dispatch(forge, TOOL_SESSION_GET, {"session_id": SID})
)
finally:
forge.close()
self.assertTrue(is_error)
self.assertIn("404", content[0].text)
self.assertIn("session not found", content[0].text)
class TestSessionToolSchemas(unittest.TestCase):
"""Schema-level checks — the MCP client validates input before dispatch."""
def _tool(self, name: str):
return next(t for t in _tool_definitions() if t.name == name)
def test_session_new_schema_optional_fields(self) -> None:
s = self._tool(TOOL_SESSION_NEW).inputSchema
# No required fields — agent defaults to claude, meta is optional.
self.assertNotIn("required", s)
self.assertFalse(s.get("additionalProperties", True))
# Empty input is valid.
jsonschema.validate({}, s)
jsonschema.validate({"agent": "claude"}, s)
jsonschema.validate({"agent": "claude", "meta": {"k": "v"}}, s)
def test_session_turn_schema_requires_session_id_and_prompt(self) -> None:
s = self._tool(TOOL_SESSION_TURN).inputSchema
self.assertEqual(sorted(s["required"]), ["prompt", "session_id"])
# files items still pinned to the v0.1 ff_-prefixed token pattern.
self.assertEqual(
s["properties"]["files"]["items"]["pattern"], r"^ff_[A-Za-z0-9_-]+$"
)
with self.assertRaises(jsonschema.ValidationError):
jsonschema.validate({"prompt": "hi"}, s) # missing session_id
with self.assertRaises(jsonschema.ValidationError):
jsonschema.validate({"session_id": SID}, s) # missing prompt
def test_session_close_schema_requires_session_id(self) -> None:
s = self._tool(TOOL_SESSION_CLOSE).inputSchema
self.assertEqual(s["required"], ["session_id"])
with self.assertRaises(jsonschema.ValidationError):
jsonschema.validate({}, s)
def test_session_list_schema_no_required_fields(self) -> None:
s = self._tool(TOOL_SESSION_LIST).inputSchema
self.assertNotIn("required", s)
jsonschema.validate({}, s)
jsonschema.validate({"include_closed": False}, s)
def test_session_get_schema_requires_session_id(self) -> None:
s = self._tool(TOOL_SESSION_GET).inputSchema
self.assertEqual(s["required"], ["session_id"])
with self.assertRaises(jsonschema.ValidationError):
jsonschema.validate({}, s)
if __name__ == "__main__": # pragma: no cover
unittest.main()