From 104f49c441480e13aa52a200e3ada3be836bd6d7 Mon Sep 17 00:00:00 2001 From: Kayos Date: Tue, 28 Apr 2026 23:10:07 -0700 Subject: [PATCH] =?UTF-8?q?clients/mcp:=20apply=20audit=20findings=20?= =?UTF-8?q?=E2=80=94=20release-blocker=20fix=20on=20upload=20(093021c=20?= =?UTF-8?q?=E2=86=92=20new)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HIGH: - S1: upload_file allow-root + symlink-resolve + size-cap. Env: CLAWDFORGE_UPLOAD_ROOT (default cwd), CLAWDFORGE_UPLOAD_MAX_BYTES (default 100MiB). README updated with threat-model paragraph. LOW: - S2: logger.propagate = False (stdout discipline defense-in-depth) - S3: catch-all error message no longer echoes str(e) (host paths) - S4: whitelist healthz/upload tool response fields - S5: pattern-validate ff_* file tokens in run schema - C1: strict-bool guard on timeout_secs/ttl_secs - C2: coerce empty-string model/system to None Deps: - requests>=2.32 (CVE-2024-35195) - urllib3>=2.2.2 (CVE-2024-37891) - mcp>=1.2.0 Audit: memory/clawdforge-audits/mcp-093021c.md --- clients/mcp/README.md | 36 ++- clients/mcp/pyproject.toml | 5 +- clients/mcp/src/clawdforge_mcp/client.py | 81 ++++++- clients/mcp/src/clawdforge_mcp/server.py | 79 +++++- clients/mcp/tests/test_server.py | 296 +++++++++++++++++++++-- 5 files changed, 457 insertions(+), 40 deletions(-) diff --git a/clients/mcp/README.md b/clients/mcp/README.md index 2324918..a1dfee0 100644 --- a/clients/mcp/README.md +++ b/clients/mcp/README.md @@ -39,11 +39,13 @@ This installs: The server reads configuration from environment variables — your MCP client sets these via its `env` block when it spawns the subprocess. -| Variable | Default | Notes | -| -------------------- | ------------------------ | ----------------------------------------------------------- | -| `CLAWDFORGE_URL` | `http://localhost:8800` | Override to your forge host (e.g. `http://192.168.0.5:8800`). | -| `CLAWDFORGE_TOKEN` | (required) | App bearer token (`cf_...`). Mint with `/admin/tokens`. | -| `CLAWDFORGE_MCP_LOG` | `WARNING` | Optional. Set `INFO` or `DEBUG` for stderr logs. | +| Variable | Default | Notes | +| ------------------------------ | ------------------------ | ----------------------------------------------------------- | +| `CLAWDFORGE_URL` | `http://localhost:8800` | Override to your forge host (e.g. `http://192.168.0.5:8800`). | +| `CLAWDFORGE_TOKEN` | (required) | App bearer token (`cf_...`). Mint with `/admin/tokens`. | +| `CLAWDFORGE_MCP_LOG` | `WARNING` | Optional. Set `INFO` or `DEBUG` for stderr logs. | +| `CLAWDFORGE_UPLOAD_ROOT` | process cwd | Allow-root for `clawdforge_upload_file`. Paths must resolve INSIDE this directory after symlink resolution. | +| `CLAWDFORGE_UPLOAD_MAX_BYTES` | `104857600` (100 MiB) | Hard cap on the size of a single file upload. Files exceeding this are refused before any bytes are sent. | ### Claude Desktop @@ -156,6 +158,30 @@ When NOT to reach for it: Path is interpreted on the **host running the MCP server** (typically the user's workstation), not whatever sandbox the LLM thinks it's in. +The path is constrained to an allow-root: `CLAWDFORGE_UPLOAD_ROOT` (default +the MCP server process's current working directory). Both symlinks and `..` +traversal are neutralized via `Path.resolve(strict=True)` followed by an +`is_relative_to(root)` containment check. Files larger than +`CLAWDFORGE_UPLOAD_MAX_BYTES` (default 100 MiB) are refused before any +bytes are sent to the forge. Non-regular files (FIFOs, sockets, directories, +devices) are refused. + +### Threat model — why these guards exist + +The MCP-specific question is: **what can a malicious LLM-driven client do?** +A model that has earned the user's trust can socially-engineer them into +running a tool call against any path it can name — and an MCP server that +reaches the local filesystem is a perfect exfiltration channel. The classic +shape is "let me upload your config so I can debug" pointed at +`~/.ssh/id_rsa`, `~/.aws/credentials`, `/etc/shadow`, etc. + +The defaults pin the upload root to the process cwd, which means an MCP +server launched from your project directory cannot reach into your home +directory at all. If you need broader access, set `CLAWDFORGE_UPLOAD_ROOT` +explicitly to a directory you've thought about — never to `/`. Symlink +resolution is mandatory: a symlink inside the root that points to `/etc` +will be rejected, not followed. + ## Testing ```sh diff --git a/clients/mcp/pyproject.toml b/clients/mcp/pyproject.toml index 07cc1e0..4ccd197 100644 --- a/clients/mcp/pyproject.toml +++ b/clients/mcp/pyproject.toml @@ -20,8 +20,9 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ - "mcp>=1.0", - "requests>=2.28", + "mcp>=1.2.0", + "requests>=2.32.0", + "urllib3>=2.2.2", ] [project.optional-dependencies] diff --git a/clients/mcp/src/clawdforge_mcp/client.py b/clients/mcp/src/clawdforge_mcp/client.py index bc1c6c4..0531d32 100644 --- a/clients/mcp/src/clawdforge_mcp/client.py +++ b/clients/mcp/src/clawdforge_mcp/client.py @@ -26,6 +26,21 @@ _HEALTHZ_TIMEOUT_SECS = 10 _HTTP_TIMEOUT_MARGIN_SECS = 30 _DEFAULT_RUN_TIMEOUT_SECS = 120 +# Upload-side guards. The MCP-specific threat model is "what can a malicious +# LLM-driven client do?" — and the obvious win for an attacker is convincing +# a user to "let me upload this config so I can debug" and walking away with +# `~/.ssh/id_rsa`, `~/.aws/credentials`, etc. We defend in three ways: +# +# 1. ``CLAWDFORGE_UPLOAD_ROOT`` pins an allow-root (default: process cwd). +# Symlinks and ``..`` traversal both get neutralized via +# ``Path.resolve(strict=True)`` + ``is_relative_to``. +# 2. ``CLAWDFORGE_UPLOAD_MAX_BYTES`` caps the on-disk size we'll stream +# (default 100 MiB) so a runaway / malicious request can't pin the +# forge host on a bottomless file. +# 3. We refuse anything that isn't a regular file post-resolve — no FIFOs, +# sockets, devices, or directories. +_DEFAULT_UPLOAD_MAX_BYTES = 100 * 1024 * 1024 # 100 MiB + class ForgeError(Exception): """Base error for the clawdforge HTTP wrapper.""" @@ -175,11 +190,71 @@ class ForgeClient: path: str | os.PathLike[str], ttl_secs: int = 3600, ) -> dict: - p = Path(path) - if not p.exists(): - raise ValueError(f"file does not exist: {p}") + # Resolve the configured allow-root and size cap fresh on every call + # so tests / runtime env-toggles take effect without re-instantiating + # the client. Both fall back to safe defaults. + root_env = os.environ.get("CLAWDFORGE_UPLOAD_ROOT") + if root_env: + try: + root = Path(root_env).resolve(strict=True) + except (OSError, RuntimeError) as e: + raise ValueError( + f"CLAWDFORGE_UPLOAD_ROOT is not a valid directory: {e}" + ) from e + else: + root = Path.cwd().resolve() + if not root.is_dir(): + raise ValueError( + f"CLAWDFORGE_UPLOAD_ROOT must be a directory (got {root})" + ) + + max_bytes_env = os.environ.get("CLAWDFORGE_UPLOAD_MAX_BYTES") + if max_bytes_env: + try: + max_bytes = int(max_bytes_env) + except ValueError as e: + raise ValueError( + "CLAWDFORGE_UPLOAD_MAX_BYTES must be an integer" + ) from e + if max_bytes <= 0: + raise ValueError( + "CLAWDFORGE_UPLOAD_MAX_BYTES must be a positive integer" + ) + else: + max_bytes = _DEFAULT_UPLOAD_MAX_BYTES + + # ``resolve(strict=True)`` raises FileNotFoundError if the path (or + # any intermediate symlink target) is missing — that's the exact + # behaviour we want, just under one error type. + try: + p = Path(path).resolve(strict=True) + except FileNotFoundError as e: + raise ValueError(f"file does not exist: {path}") from e + except (OSError, RuntimeError) as e: + raise ValueError(f"cannot resolve path: {e}") from e + + # Containment check after symlink resolution. ``is_relative_to`` is + # the 3.9+ API; we require 3.10+ in pyproject so it's always present. + if not p.is_relative_to(root): + raise ValueError( + f"path is outside CLAWDFORGE_UPLOAD_ROOT ({root}): refusing to upload" + ) + + # Reject FIFOs, sockets, block/char devices, directories. ``is_file`` + # only returns True for regular files (after symlink resolution). if not p.is_file(): raise ValueError(f"path is not a regular file: {p}") + + try: + size = p.stat().st_size + except OSError as e: + raise ValueError(f"cannot stat {p}: {e}") from e + if size > max_bytes: + raise ValueError( + f"file exceeds CLAWDFORGE_UPLOAD_MAX_BYTES " + f"({size} > {max_bytes} bytes)" + ) + try: fh = p.open("rb") except OSError as e: diff --git a/clients/mcp/src/clawdforge_mcp/server.py b/clients/mcp/src/clawdforge_mcp/server.py index 286732c..3b8ae7b 100644 --- a/clients/mcp/src/clawdforge_mcp/server.py +++ b/clients/mcp/src/clawdforge_mcp/server.py @@ -121,7 +121,10 @@ def _tool_definitions() -> list[types.Tool]: }, "files": { "type": "array", - "items": {"type": "string"}, + "items": { + "type": "string", + "pattern": "^ff_[A-Za-z0-9_-]+$", + }, "description": ( "Optional list of file_token strings (each " "starting 'ff_') previously returned by " @@ -152,7 +155,11 @@ def _tool_definitions() -> list[types.Tool]: "interpreted relative to the host running this MCP server " "(typically the user's workstation), NOT the LLM's sandbox. " "Files auto-expire after ttl_secs (default 3600s, range " - "60..86400)." + "60..86400). Paths are constrained to an allow-root " + "(CLAWDFORGE_UPLOAD_ROOT, default: the MCP server process's " + "current working directory) AFTER symlink resolution — the " + "server will refuse paths that resolve outside the root or " + "exceed CLAWDFORGE_UPLOAD_MAX_BYTES (default 100 MiB)." ), inputSchema={ "type": "object", @@ -215,6 +222,37 @@ def _err_content(message: str) -> list[types.TextContent]: return [types.TextContent(type="text", text=f"clawdforge error: {message}")] +def _format_healthz_response(payload: Any) -> dict[str, Any]: + """Whitelist healthz fields returned to the LLM. + + The forge currently only emits ``ok`` / ``claude_present`` / + ``claude_version``, but if a future version adds e.g. internal hostnames + or pid info we don't want it leaking through this surface verbatim. + """ + if not isinstance(payload, dict): + return {"ok": False} + return { + "ok": payload.get("ok"), + "claude_present": payload.get("claude_present"), + "claude_version": payload.get("claude_version"), + } + + +def _format_upload_response(payload: Any) -> dict[str, Any]: + """Whitelist upload-file fields returned to the LLM. + + The forge currently emits ``file_token`` / ``ttl_secs`` / ``size``. + Same defense-in-depth rationale as ``_format_healthz_response``. + """ + if not isinstance(payload, dict): + return {} + return { + "file_token": payload.get("file_token"), + "ttl_secs": payload.get("ttl_secs"), + "size": payload.get("size"), + } + + def _format_forge_error(e: ForgeError) -> str: """Render a ForgeError as a single short string for an LLM.""" if isinstance(e, ForgeAuthError): @@ -246,7 +284,7 @@ async def _dispatch( try: if name == TOOL_HEALTHZ: payload = await asyncio.to_thread(forge.healthz) - return _ok_content(payload), False + return _ok_content(_format_healthz_response(payload)), False if name == TOOL_RUN: prompt = args.get("prompt") @@ -256,15 +294,25 @@ async def _dispatch( system = args.get("system") files = args.get("files") timeout_secs = args.get("timeout_secs") + # ``bool`` is a subclass of ``int`` in Python; without the + # ``not isinstance(..., bool)`` guard, ``timeout_secs=True`` + # silently slips through as 1 and gets forwarded to the forge. + # jsonschema rejects it upstream, but defense-in-depth. + if timeout_secs is not None and not ( + isinstance(timeout_secs, int) and not isinstance(timeout_secs, bool) + ): + return _err_content("'timeout_secs' must be an integer"), True payload = await asyncio.to_thread( forge.run, prompt=prompt, - model=model if isinstance(model, str) else None, - system=system if isinstance(system, str) else None, + # Treat empty-string model/system as "not set" — sending a + # bare empty string to the forge would override its default + # in confusing ways. Coerce to None so the forge picks its + # configured default. + model=model if isinstance(model, str) and model else None, + system=system if isinstance(system, str) and system else None, files=list(files) if isinstance(files, list) else None, - timeout_secs=int(timeout_secs) - if isinstance(timeout_secs, int) - else None, + timeout_secs=timeout_secs if timeout_secs is not None else None, ) # Surface result + the two metadata fields the LLM may want. shaped = { @@ -279,7 +327,8 @@ async def _dispatch( if not isinstance(path, str) or not path: return _err_content("missing or empty 'path' argument"), True ttl_secs = args.get("ttl_secs", 3600) - if not isinstance(ttl_secs, int): + # Same bool-as-int subclass trap as timeout_secs above. + if not (isinstance(ttl_secs, int) and not isinstance(ttl_secs, bool)): return _err_content("'ttl_secs' must be an integer"), True try: payload = await asyncio.to_thread( @@ -287,7 +336,7 @@ async def _dispatch( ) except ValueError as ve: return _err_content(str(ve)), True - return _ok_content(payload), False + return _ok_content(_format_upload_response(payload)), False return _err_content(f"unknown tool: {name}"), True @@ -295,8 +344,12 @@ async def _dispatch( logger.warning("forge error in tool %s: %s", name, fe) return _err_content(_format_forge_error(fe)), True except Exception as e: # pragma: no cover - defensive + # Full detail (including stack + str(e)) goes to stderr via + # logger.exception. The message we hand back to the LLM intentionally + # omits ``str(e)`` because Python builtin exceptions like + # FileNotFoundError / PermissionError stringify to host paths. logger.exception("unexpected error in tool %s", name) - return _err_content(f"unexpected: {type(e).__name__}: {e}"), True + return _err_content(f"unexpected internal error ({type(e).__name__})"), True # --------------------------------------------------------------------------- @@ -358,3 +411,7 @@ if not logger.handlers: _h.setFormatter(logging.Formatter("clawdforge-mcp [%(levelname)s] %(message)s")) logger.addHandler(_h) logger.setLevel(os.environ.get("CLAWDFORGE_MCP_LOG", "WARNING").upper()) + # Defense-in-depth against an embedding application reconfiguring the + # root logger to a stdout StreamHandler — that would corrupt our + # JSON-RPC framing. Our own handler is stderr-bound regardless. + logger.propagate = False diff --git a/clients/mcp/tests/test_server.py b/clients/mcp/tests/test_server.py index d0160f8..41438bc 100644 --- a/clients/mcp/tests/test_server.py +++ b/clients/mcp/tests/test_server.py @@ -16,9 +16,12 @@ from __future__ import annotations import asyncio import json import os +import re import tempfile import unittest +from unittest import mock +import jsonschema import responses from clawdforge_mcp.client import ForgeClient @@ -230,42 +233,297 @@ class TestUploadDispatch(unittest.TestCase): json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11}, status=200, ) - with tempfile.NamedTemporaryFile( - "wb", suffix=".txt", delete=False - ) as tf: - tf.write(b"hello world") - tf.flush() - tmp_path = tf.name - try: - forge = _client() - try: - content, is_error = _run( - _dispatch( - forge, TOOL_UPLOAD, {"path": tmp_path, "ttl_secs": 600} + with tempfile.TemporaryDirectory() as root: + tmp_path = os.path.join(root, "hello.txt") + with open(tmp_path, "wb") as fh: + fh.write(b"hello world") + with mock.patch.dict(os.environ, {"CLAWDFORGE_UPLOAD_ROOT": root}): + forge = _client() + try: + content, is_error = _run( + _dispatch( + forge, TOOL_UPLOAD, {"path": tmp_path, "ttl_secs": 600} + ) ) - ) - finally: - forge.close() - finally: - os.unlink(tmp_path) + finally: + forge.close() self.assertFalse(is_error) body = json.loads(content[0].text) self.assertEqual(body["file_token"], "ff_abc123") + # Whitelist (S4): only file_token / ttl_secs / size are surfaced. + self.assertEqual(set(body.keys()), {"file_token", "ttl_secs", "size"}) def test_upload_missing_file_returns_error(self) -> None: + with tempfile.TemporaryDirectory() as root: + with mock.patch.dict(os.environ, {"CLAWDFORGE_UPLOAD_ROOT": root}): + forge = _client() + try: + content, is_error = _run( + _dispatch( + forge, + TOOL_UPLOAD, + { + "path": os.path.join( + root, "definitely-not-here-zzz.txt" + ) + }, + ) + ) + finally: + forge.close() + self.assertTrue(is_error) + self.assertIn("does not exist", content[0].text) + + def test_upload_path_outside_root_rejected_traversal(self) -> None: + """A `..` path that resolves outside CLAWDFORGE_UPLOAD_ROOT is refused.""" + with tempfile.TemporaryDirectory() as outer: + inner = os.path.join(outer, "inner") + os.mkdir(inner) + # Real file lives in outer (escape target). + secret = os.path.join(outer, "secret.txt") + with open(secret, "wb") as fh: + fh.write(b"sekret") + # Root is the inner dir; the escape goes via `..`. + with mock.patch.dict(os.environ, {"CLAWDFORGE_UPLOAD_ROOT": inner}): + forge = _client() + try: + content, is_error = _run( + _dispatch( + forge, + TOOL_UPLOAD, + {"path": os.path.join(inner, "..", "secret.txt")}, + ) + ) + finally: + forge.close() + self.assertTrue(is_error) + self.assertIn("outside", content[0].text) + + def test_upload_path_outside_root_rejected_symlink(self) -> None: + """A symlink whose target is outside the root is refused after resolve.""" + with tempfile.TemporaryDirectory() as outer: + inner = os.path.join(outer, "inner") + os.mkdir(inner) + secret = os.path.join(outer, "id_rsa") + with open(secret, "wb") as fh: + fh.write(b"private-key-bytes") + # Symlink lives inside root, target lives outside. + link = os.path.join(inner, "looks-innocent.txt") + os.symlink(secret, link) + with mock.patch.dict(os.environ, {"CLAWDFORGE_UPLOAD_ROOT": inner}): + forge = _client() + try: + content, is_error = _run( + _dispatch(forge, TOOL_UPLOAD, {"path": link}) + ) + finally: + forge.close() + self.assertTrue(is_error) + self.assertIn("outside", content[0].text) + + def test_upload_oversize_rejected(self) -> None: + """File at limit+1 byte is rejected by CLAWDFORGE_UPLOAD_MAX_BYTES.""" + with tempfile.TemporaryDirectory() as root: + target = os.path.join(root, "big.bin") + with open(target, "wb") as fh: + fh.write(b"x" * 16) # 16 bytes + with mock.patch.dict( + os.environ, + { + "CLAWDFORGE_UPLOAD_ROOT": root, + "CLAWDFORGE_UPLOAD_MAX_BYTES": "15", + }, + ): + forge = _client() + try: + content, is_error = _run( + _dispatch(forge, TOOL_UPLOAD, {"path": target}) + ) + finally: + forge.close() + self.assertTrue(is_error) + self.assertIn("CLAWDFORGE_UPLOAD_MAX_BYTES", content[0].text) + + def test_upload_non_file_rejected(self) -> None: + """Pointing the upload at a directory (or FIFO etc.) is refused.""" + with tempfile.TemporaryDirectory() as root: + sub = os.path.join(root, "subdir") + os.mkdir(sub) + with mock.patch.dict(os.environ, {"CLAWDFORGE_UPLOAD_ROOT": root}): + forge = _client() + try: + content, is_error = _run( + _dispatch(forge, TOOL_UPLOAD, {"path": sub}) + ) + finally: + forge.close() + self.assertTrue(is_error) + self.assertIn("not a regular file", content[0].text) + + +class TestDispatchCorrectness(unittest.TestCase): + """C1 / C2 — defense-in-depth typing checks in _dispatch.""" + + def test_dispatch_strict_bool_guard_timeout_secs(self) -> None: + """bool is a subclass of int — must not slip past the runtime guard.""" + forge = _client() + try: + content, is_error = _run( + _dispatch( + forge, + TOOL_RUN, + {"prompt": "hi", "timeout_secs": True}, + ) + ) + finally: + forge.close() + self.assertTrue(is_error) + self.assertIn("timeout_secs", content[0].text) + + def test_dispatch_strict_bool_guard_ttl_secs(self) -> None: forge = _client() try: content, is_error = _run( _dispatch( forge, TOOL_UPLOAD, - {"path": "/nonexistent/definitely-not-here-zzz.txt"}, + {"path": "/tmp/whatever", "ttl_secs": True}, ) ) finally: forge.close() self.assertTrue(is_error) - self.assertIn("does not exist", content[0].text) + self.assertIn("ttl_secs", content[0].text) + + @responses.activate + def test_dispatch_empty_string_model_coerced(self) -> None: + """Empty model/system string must be coerced to None upstream.""" + captured: dict = {} + + def cb(request): + captured["body"] = json.loads(request.body) + return ( + 200, + {}, + json.dumps( + { + "ok": True, + "result": "fine", + "duration_ms": 1, + "stop_reason": "end_turn", + } + ), + ) + + responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) + forge = _client() + try: + content, is_error = _run( + _dispatch( + forge, + TOOL_RUN, + {"prompt": "hi", "model": "", "system": ""}, + ) + ) + finally: + forge.close() + self.assertFalse(is_error) + # Empty strings must NOT have been forwarded — they should be absent + # from the request body entirely (forge.run only sets the keys when + # they are not None). + self.assertNotIn("model", captured["body"]) + self.assertNotIn("system", captured["body"]) + + +class TestErrorLeakProtection(unittest.TestCase): + """S3 — catch-all internal error must not echo str(e) (host paths).""" + + def test_unexpected_error_does_not_leak_path(self) -> None: + # Force forge.healthz to raise a builtin whose str() contains a + # host path. The catch-all in _dispatch must collapse it. + forge = _client() + try: + with mock.patch.object( + forge, + "healthz", + side_effect=FileNotFoundError( + 2, "No such file", "/home/cobb/.ssh/id_rsa" + ), + ): + content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {})) + finally: + forge.close() + self.assertTrue(is_error) + text = content[0].text + self.assertIn("unexpected internal error", text) + self.assertIn("FileNotFoundError", text) + self.assertNotIn("/home/cobb/.ssh/id_rsa", text) + self.assertNotIn("No such file", text) + + +class TestHealthzWhitelist(unittest.TestCase): + """S4 — extra fields from the upstream forge must be stripped.""" + + @responses.activate + def test_healthz_response_whitelisted(self) -> None: + responses.add( + responses.GET, + f"{BASE_URL}/healthz", + json={ + "ok": True, + "claude_present": True, + "claude_version": "1.2.3", + # Sneaky future fields the forge might add — must NOT pass through. + "internal_hostname": "lucy.lan", + "pid": 4242, + "secret_kv": {"k": "v"}, + }, + status=200, + ) + forge = _client() + try: + content, is_error = _run(_dispatch(forge, TOOL_HEALTHZ, {})) + finally: + forge.close() + self.assertFalse(is_error) + body = json.loads(content[0].text) + self.assertEqual( + set(body.keys()), {"ok", "claude_present", "claude_version"} + ) + self.assertNotIn("lucy.lan", content[0].text) + self.assertNotIn("4242", content[0].text) + + +class TestFilesPatternValidation(unittest.TestCase): + """S5 — `files` array items must match ^ff_[A-Za-z0-9_-]+$.""" + + def _files_item_pattern(self) -> str: + run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN) + return run_tool.inputSchema["properties"]["files"]["items"]["pattern"] + + def test_pattern_present(self) -> None: + self.assertEqual(self._files_item_pattern(), r"^ff_[A-Za-z0-9_-]+$") + + def test_schema_rejects_malformed_token(self) -> None: + run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN) + # Malformed tokens — wrong prefix, invalid chars, empty body. + for bad in ["bad", "ff_", "FF_aaa", "ff_with space", "../etc/passwd"]: + with self.assertRaises(jsonschema.ValidationError, msg=bad): + jsonschema.validate( + {"prompt": "hi", "files": [bad]}, run_tool.inputSchema + ) + + def test_schema_accepts_valid_token(self) -> None: + run_tool = next(t for t in _tool_definitions() if t.name == TOOL_RUN) + for good in ["ff_abc123", "ff_AB-cd_EF-gh", "ff_a"]: + jsonschema.validate( + {"prompt": "hi", "files": [good]}, run_tool.inputSchema + ) + + def test_pattern_compiles_as_regex(self) -> None: + # Sanity: ensure the schema string is a valid regex that matches + # the canonical token shape produced by the forge. + self.assertIsNotNone(re.match(self._files_item_pattern(), "ff_abc_123")) class TestUnknownTool(unittest.TestCase):