clawdforge/clients/python/tests/test_client.py
Kayos 1b097a21be clients/python: apply audit findings (90e158f → next)
- H1: quote slug in revoke_token
- H2: redact AppToken.token in repr/str
- M1-M6: wrap stdlib exceptions in ForgeError, validate timeouts, document uploads
- L1/L5/L7: type-strict, immutable ip_cidrs, validate ok field
- Bump requests floor to 2.32

Audit: memory/clawdforge-audits/python-90e158f.md
2026-04-28 23:07:38 -07:00

668 lines
23 KiB
Python

"""Tests for the clawdforge Python SDK.
Uses `responses` to intercept HTTP calls. No live network.
"""
from __future__ import annotations
import io
import json
import os
import unittest
from pathlib import Path
from typing import Any
import requests
import responses
from clawdforge import (
AppToken,
FileToken,
Forge,
ForgeAPIError,
ForgeAuthError,
ForgeError,
ForgeTransportError,
RunResult,
)
# Endpoint and bearer token shared by every test in this module. Per the
# module docstring, HTTP is intercepted by `responses`; no live network.
BASE_URL = "http://192.168.0.5:8800"
TOKEN = "cf_test_token_xxxxxxxx"


def _forge() -> Forge:
    """Build a Forge client aimed at BASE_URL using the shared test TOKEN."""
    client = Forge(
        base_url=BASE_URL,
        token=TOKEN,
        default_timeout_secs=60,
    )
    return client
class TestHealthz(unittest.TestCase):
    """GET /healthz: payload passthrough and the default bearer header."""

    HEALTHZ_URL = f"{BASE_URL}/healthz"

    @responses.activate
    def test_healthz_ok(self) -> None:
        payload = {"ok": True, "claude_present": True, "claude_version": "1.2.3"}
        responses.add(responses.GET, self.HEALTHZ_URL, json=payload, status=200)

        with _forge() as client:
            health = client.healthz()

        self.assertEqual(health["ok"], True)
        self.assertEqual(health["claude_version"], "1.2.3")

    @responses.activate
    def test_healthz_no_auth_header_still_sent(self) -> None:
        # The server doesn't *require* auth on /healthz, but the client sets
        # the bearer as a default header, so it should be present here too.
        seen = {}

        def handler(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            # Record the outgoing Authorization header, then answer 200.
            seen["auth"] = request.headers.get("Authorization")
            reply = json.dumps(
                {"ok": True, "claude_present": True, "claude_version": "x"}
            )
            return (200, {}, reply)

        responses.add_callback(responses.GET, self.HEALTHZ_URL, callback=handler)

        with _forge() as client:
            client.healthz()

        self.assertEqual(seen["auth"], f"Bearer {TOKEN}")
class TestRun(unittest.TestCase):
    """POST /run: result parsing, outgoing request shape, and error mapping."""

    RUN_URL = f"{BASE_URL}/run"

    @staticmethod
    def _ok_body(result: Any) -> str:
        # Serialized minimal success payload, for use by add_callback handlers.
        return json.dumps(
            {"ok": True, "result": result, "duration_ms": 1, "stop_reason": "end_turn"}
        )

    @responses.activate
    def test_run_success_json_result(self) -> None:
        payload = {
            "ok": True,
            "result": {"hello": "world"},
            "duration_ms": 1234,
            "stop_reason": "end_turn",
        }
        responses.add(responses.POST, self.RUN_URL, json=payload, status=200)

        with _forge() as client:
            outcome = client.run(prompt='Reply with JSON: {"hello": "world"}')

        self.assertIsInstance(outcome, RunResult)
        self.assertTrue(outcome.ok)
        self.assertEqual(outcome.result, {"hello": "world"})
        self.assertEqual(outcome.duration_ms, 1234)
        self.assertEqual(outcome.stop_reason, "end_turn")

    @responses.activate
    def test_run_success_string_result(self) -> None:
        payload = {
            "ok": True,
            "result": "plain text reply",
            "duration_ms": 800,
            "stop_reason": "end_turn",
        }
        responses.add(responses.POST, self.RUN_URL, json=payload, status=200)

        with _forge() as client:
            outcome = client.run(prompt="hello")

        self.assertEqual(outcome.result, "plain text reply")

    @responses.activate
    def test_run_sends_expected_body(self) -> None:
        seen: dict = {}

        def handler(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            # Capture the decoded JSON body and the auth header of the request.
            seen["body"] = json.loads(request.body)
            seen["auth"] = request.headers.get("Authorization")
            return (200, {}, self._ok_body({}))

        responses.add_callback(responses.POST, self.RUN_URL, callback=handler)

        with _forge() as client:
            client.run(
                prompt="hi",
                model="opus",
                system="be terse",
                files=["ff_abc"],
                timeout_secs=42,
            )

        expected_body = {
            "prompt": "hi",
            "model": "opus",
            "system": "be terse",
            "files": ["ff_abc"],
            "timeout_secs": 42,
        }
        self.assertEqual(seen["auth"], f"Bearer {TOKEN}")
        self.assertEqual(seen["body"], expected_body)

    @responses.activate
    def test_run_uses_default_model_when_omitted(self) -> None:
        seen: dict = {}

        def handler(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            seen["body"] = json.loads(request.body)
            return (200, {}, self._ok_body("x"))

        responses.add_callback(responses.POST, self.RUN_URL, callback=handler)

        with _forge() as client:
            client.run(prompt="hi")

        sent = seen["body"]
        self.assertEqual(sent["model"], "sonnet")
        # Optional fields that were not passed must not appear in the body.
        for optional_key in ("system", "files", "timeout_secs"):
            self.assertNotIn(optional_key, sent)

    @responses.activate
    def test_run_502_raises_api_error_with_body(self) -> None:
        failure = {
            "ok": False,
            "error": "subprocess timed out",
            "stderr": "...",
            "duration_ms": 60000,
            "stop_reason": "timeout",
        }
        responses.add(responses.POST, self.RUN_URL, json=failure, status=502)

        with _forge() as client:
            with self.assertRaises(ForgeAPIError) as caught:
                client.run(prompt="hi", timeout_secs=60)

        raised = caught.exception
        self.assertEqual(raised.status_code, 502)
        self.assertIsInstance(raised.body, dict)
        self.assertEqual(raised.body["stop_reason"], "timeout")
        self.assertIn("subprocess timed out", raised.message)

    @responses.activate
    def test_run_401_raises_auth_error(self) -> None:
        responses.add(
            responses.POST,
            self.RUN_URL,
            json={"detail": "missing bearer"},
            status=401,
        )

        with _forge() as client:
            with self.assertRaises(ForgeAuthError) as caught:
                client.run(prompt="hi")

        raised = caught.exception
        self.assertEqual(raised.status_code, 401)
        # ForgeAuthError is-a ForgeAPIError is-a ForgeError
        for ancestor in (ForgeAPIError, ForgeError):
            self.assertIsInstance(raised, ancestor)

    @responses.activate
    def test_run_403_raises_auth_error(self) -> None:
        responses.add(
            responses.POST,
            self.RUN_URL,
            json={"detail": "ip not in allowlist: 10.0.0.1"},
            status=403,
        )

        with _forge() as client:
            with self.assertRaises(ForgeAuthError):
                client.run(prompt="hi")

    @responses.activate
    def test_run_transport_error(self) -> None:
        # Nothing is registered, so `responses` raises ConnectionError; the
        # client is expected to surface that as ForgeTransportError.
        with _forge() as client:
            with self.assertRaises(ForgeTransportError):
                client.run(prompt="hi")

    def test_run_empty_prompt_rejected_locally(self) -> None:
        with _forge() as client:
            with self.assertRaises(ValueError):
                client.run(prompt="")

    @responses.activate
    def test_run_http_timeout_uses_margin(self) -> None:
        seen: dict = {}

        def handler(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            # `responses` doesn't surface the HTTP timeout to the callback,
            # so all this can record is that the request reached it.
            seen["called"] = True
            return (200, {}, self._ok_body("x"))

        responses.add_callback(responses.POST, self.RUN_URL, callback=handler)

        with _forge() as client:
            # Intended formula: timeout_secs=42 -> http_timeout=72. Only the
            # margin constant is asserted, to keep it in sync with the
            # readme/contract; the timeout actually sent is not observable here.
            self.assertEqual(client.http_timeout_margin, 30)
            client.run(prompt="hi", timeout_secs=42)

        self.assertTrue(seen.get("called"))
class TestFiles(unittest.TestCase):
    """POST /files: uploads from a path and from a file object, plus errors."""

    FILES_URL = f"{BASE_URL}/files"

    @responses.activate
    def test_upload_file_from_path(self) -> None:
        import tempfile

        responses.add(
            responses.POST,
            self.FILES_URL,
            json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11},
            status=200,
        )

        # delete=False: the file has to outlive this `with` so it can be
        # uploaded by path below; it is removed again in the `finally`.
        with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as scratch:
            scratch.write(b"hello world")
        scratch_name = scratch.name

        try:
            with _forge() as client:
                uploaded = client.upload_file(scratch_name, ttl_secs=3600)
            self.assertIsInstance(uploaded, FileToken)
            self.assertEqual(uploaded.file_token, "ff_abc123")
            self.assertEqual(uploaded.ttl_secs, 3600)
            self.assertEqual(uploaded.size, 11)
        finally:
            Path(scratch_name).unlink(missing_ok=True)

    @responses.activate
    def test_upload_file_from_fileobj(self) -> None:
        seen: dict = {}

        def handler(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            # Record the Content-Type and whether it announces a multipart body.
            content_type = request.headers.get("Content-Type", "")
            seen["content_type"] = content_type
            seen["body_starts_multipart"] = content_type.startswith(
                "multipart/form-data"
            )
            reply = json.dumps({"file_token": "ff_xyz", "ttl_secs": 60, "size": 5})
            return (200, {}, reply)

        responses.add_callback(responses.POST, self.FILES_URL, callback=handler)

        stream = io.BytesIO(b"hello")
        with _forge() as client:
            uploaded = client.upload_file(stream, ttl_secs=60, filename="snippet.txt")

        self.assertEqual(uploaded.file_token, "ff_xyz")
        self.assertTrue(seen["body_starts_multipart"])

    @responses.activate
    def test_upload_file_400_raises_api_error(self) -> None:
        responses.add(
            responses.POST,
            self.FILES_URL,
            json={"detail": "ttl_secs out of range (60..86400)"},
            status=400,
        )

        stream = io.BytesIO(b"x")
        with _forge() as client:
            with self.assertRaises(ForgeAPIError) as caught:
                client.upload_file(stream, ttl_secs=10, filename="x.bin")

        self.assertEqual(caught.exception.status_code, 400)
class TestAdminTokens(unittest.TestCase):
    """/admin/tokens: create, list, and revoke."""

    TOKENS_URL = f"{BASE_URL}/admin/tokens"

    @responses.activate
    def test_create_token(self) -> None:
        created = {
            "name": "cauldron",
            "token": "cf_brandnew_xxx",
            "ip_cidrs": ["172.24.0.0/16"],
        }
        responses.add(responses.POST, self.TOKENS_URL, json=created, status=200)

        with _forge() as client:
            app_token = client.create_token("cauldron", ip_cidrs=["172.24.0.0/16"])

        self.assertIsInstance(app_token, AppToken)
        self.assertEqual(app_token.name, "cauldron")
        self.assertEqual(app_token.token, "cf_brandnew_xxx")
        # The list from the response is exposed as a tuple on the model.
        self.assertEqual(app_token.ip_cidrs, ("172.24.0.0/16",))

    @responses.activate
    def test_list_tokens(self) -> None:
        # In the listing, ip_cidrs arrives as a plain string and enabled as 0/1.
        rows = [
            {
                "name": "cauldron",
                "ip_cidrs": "172.24.0.0/16",
                "created_at": 100,
                "last_used": 200,
                "enabled": 1,
            },
            {
                "name": "petalparse",
                "ip_cidrs": "",
                "created_at": 50,
                "last_used": None,
                "enabled": 0,
            },
        ]
        responses.add(
            responses.GET, self.TOKENS_URL, json={"tokens": rows}, status=200
        )

        with _forge() as client:
            listed = client.list_tokens()

        self.assertEqual(len(listed), 2)
        first, second = listed[0], listed[1]
        self.assertEqual(first.name, "cauldron")
        self.assertEqual(first.ip_cidrs, ("172.24.0.0/16",))
        self.assertTrue(first.enabled)
        self.assertIsNone(first.token)
        self.assertEqual(second.ip_cidrs, ())
        self.assertFalse(second.enabled)

    @responses.activate
    def test_revoke_token_ok(self) -> None:
        responses.add(
            responses.DELETE,
            f"{self.TOKENS_URL}/cauldron",
            json={"ok": True},
            status=200,
        )

        with _forge() as client:
            self.assertTrue(client.revoke_token("cauldron"))

    @responses.activate
    def test_revoke_token_404(self) -> None:
        responses.add(
            responses.DELETE,
            f"{self.TOKENS_URL}/nosuch",
            json={"detail": "no such token"},
            status=404,
        )

        with _forge() as client:
            with self.assertRaises(ForgeAPIError) as caught:
                client.revoke_token("nosuch")

        self.assertEqual(caught.exception.status_code, 404)
class TestForgeConstruction(unittest.TestCase):
    """Forge constructor validation and session-ownership behaviour."""

    def test_requires_base_url(self) -> None:
        self.assertRaises(ValueError, Forge, base_url="", token="cf_x")

    def test_requires_token(self) -> None:
        self.assertRaises(ValueError, Forge, base_url="http://x", token="")

    def test_strips_trailing_slash(self) -> None:
        client = Forge(base_url="http://x:8800/", token="cf_x")
        self.assertEqual(client.base_url, "http://x:8800")

    def test_context_manager_closes_owned_session(self) -> None:
        client = Forge(base_url="http://x", token="cf_x")
        with client:
            self.assertTrue(client._owns_session)
        # Leaving the `with` block is expected to have closed the client
        # already; this extra close() checks that closing twice is safe.
        client.close()

    def test_external_session_not_closed(self) -> None:
        shared = requests.Session()
        try:
            with Forge(base_url="http://x", token="cf_x", session=shared) as client:
                self.assertFalse(client._owns_session)
            # A caller-supplied session should remain usable after Forge closes.
            self.assertIsNotNone(shared.adapters)
        finally:
            shared.close()
class TestRevokeTokenSlugQuoting(unittest.TestCase):
    """H1: revoke_token must percent-encode the slug to defeat path traversal."""

    TOKENS_PREFIX = "/admin/tokens/"

    def test_revoke_token_empty_name_rejected(self) -> None:
        with _forge() as f, self.assertRaises(ValueError):
            f.revoke_token("")

    @responses.activate
    def test_revoke_token_path_traversal_is_quoted(self) -> None:
        # Pre-fix this would issue DELETE /healthz; post-fix it must hit the
        # admin/tokens endpoint with the slug percent-encoded.
        captured: dict[str, Any] = {}

        def cb(request: requests.PreparedRequest) -> tuple[int, dict[str, Any], str]:
            captured["url"] = request.url
            return (404, {}, json.dumps({"detail": "no such token"}))

        # Only the exact percent-encoded URL is registered. A request routed
        # anywhere else finds no match, so no 404 is served and the
        # ForgeAPIError expectation below fails.
        responses.add_callback(
            responses.DELETE,
            f"{BASE_URL}{self.TOKENS_PREFIX}..%2F..%2Fhealthz",
            callback=cb,
        )
        with _forge() as f, self.assertRaises(ForgeAPIError):
            f.revoke_token("../../healthz")

        url = captured["url"]
        # Must have been routed under /admin/tokens/, not /healthz.
        self.assertIn(self.TOKENS_PREFIX, url)
        self.assertIn("..%2F..%2Fhealthz", url)
        # Inspect what follows the prefix: the whole name has to travel as a
        # single path segment, i.e. contain no raw "/". (Checking the part
        # *before* the prefix would only ever look at BASE_URL.)
        slug = url.split(self.TOKENS_PREFIX, 1)[1]
        self.assertNotIn("/", slug)

    @responses.activate
    def test_revoke_token_slash_in_name_quoted(self) -> None:
        captured: dict[str, Any] = {}

        def cb(request: requests.PreparedRequest) -> tuple[int, dict[str, Any], str]:
            captured["url"] = request.url
            return (200, {}, json.dumps({"ok": True}))

        responses.add_callback(
            responses.DELETE,
            f"{BASE_URL}{self.TOKENS_PREFIX}foo%2Fbar",
            callback=cb,
        )
        with _forge() as f:
            self.assertTrue(f.revoke_token("foo/bar"))
        self.assertIn("foo%2Fbar", captured["url"])
class TestAppTokenRedaction(unittest.TestCase):
    """H2: AppToken repr/str must redact the plaintext bearer."""

    SECRET = "cf_secret_xxxxxxxx"

    def test_repr_redacts_token(self) -> None:
        app_token = AppToken(name="x", token=self.SECRET, ip_cidrs=("10.0.0.0/8",))
        shown = repr(app_token)
        self.assertNotIn(self.SECRET, shown)
        self.assertIn("<redacted>", shown)
        self.assertIn("name='x'", shown)

    def test_str_redacts_token(self) -> None:
        app_token = AppToken(name="x", token=self.SECRET)
        shown = str(app_token)
        self.assertNotIn(self.SECRET, shown)
        self.assertIn("<redacted>", shown)

    def test_repr_token_none_shows_none(self) -> None:
        # With no token there is nothing to hide, so None is shown as-is.
        shown = repr(AppToken(name="x", token=None))
        self.assertIn("token=None", shown)
        self.assertNotIn("<redacted>", shown)

    def test_format_string_doesnt_leak(self) -> None:
        # `log.info("token: %s", t)` is the worry case — uses __str__.
        app_token = AppToken(name="cauldron", token="cf_super_secret")
        rendered = f"token: {app_token}"
        self.assertNotIn("cf_super_secret", rendered)
class TestModelExceptionWrapping(unittest.TestCase):
    """M1, M2: stdlib exceptions in *.from_response must surface as ForgeError."""

    def test_run_result_malformed_duration(self) -> None:
        malformed = {"ok": True, "result": "x", "duration_ms": "not-an-int"}
        with self.assertRaises(ForgeError):
            RunResult.from_response(malformed)

    def test_run_result_ok_false_raises_api_error(self) -> None:
        # L7: ok=False should raise ForgeAPIError, not silently parse.
        not_ok = {"ok": False, "result": None, "duration_ms": 0}
        with self.assertRaises(ForgeAPIError) as caught:
            RunResult.from_response(not_ok)
        self.assertEqual(caught.exception.status_code, 200)

    def test_file_token_missing_field(self) -> None:
        without_token = {"ttl_secs": 60, "size": 5}  # no file_token
        with self.assertRaises(ForgeError):
            FileToken.from_response(without_token)

    def test_file_token_bad_int(self) -> None:
        bad_ttl = {"file_token": "ff_x", "ttl_secs": "lots", "size": 5}
        with self.assertRaises(ForgeError):
            FileToken.from_response(bad_ttl)

    def test_app_token_create_missing_name(self) -> None:
        nameless = {"token": "cf_x"}
        with self.assertRaises(ForgeError):
            AppToken.from_create_response(nameless)

    def test_app_token_list_row_missing_name(self) -> None:
        nameless_row = {"ip_cidrs": ""}
        with self.assertRaises(ForgeError):
            AppToken.from_list_row(nameless_row)

    @responses.activate
    def test_run_payload_with_bad_duration_surfaces_forge_error(self) -> None:
        # Same malformed-duration case, but driven end to end through run().
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"ok": True, "result": "x", "duration_ms": "junk"},
            status=200,
        )
        with _forge() as client:
            with self.assertRaises(ForgeError):
                client.run(prompt="hi")
class TestUploadFileMissingPath(unittest.TestCase):
    """M3: missing path must surface as ForgeError, not FileNotFoundError."""

    MISSING_PATH = "/nonexistent/path/to/file.bin"

    def test_missing_path_raises_forge_error(self) -> None:
        with _forge() as f, self.assertRaises(ForgeError):
            f.upload_file(self.MISSING_PATH)

    def test_missing_path_does_not_leak_filenotfounderror(self) -> None:
        # FileNotFoundError is OSError; ForgeError is not. Verify caller can
        # rely on `except ForgeError` to catch this.
        with _forge() as f:
            try:
                f.upload_file(self.MISSING_PATH)
            except ForgeError:
                pass
            except FileNotFoundError:
                self.fail("FileNotFoundError leaked through ForgeError boundary")
            else:
                # Without this branch the test would pass vacuously if
                # upload_file stopped raising for a missing path.
                self.fail("upload_file raised nothing for a nonexistent path")
class TestUploadFileSymlinkOption(unittest.TestCase):
    """M5: optional follow_symlinks=False kwarg refuses symlinks."""

    def _make_symlinked_file(self) -> str:
        # Create a small temp file plus a symlink to it and return the link
        # path. Both are removed by registered cleanups (link first, then
        # target). The test is skipped, rather than errored, when the platform
        # or the current privileges do not allow creating symlinks.
        import tempfile

        with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as tf:
            tf.write(b"target")
            target = tf.name
        link = target + ".link"
        # Cleanups run last-in-first-out, so the link is unlinked before the
        # target.
        self.addCleanup(Path(target).unlink, missing_ok=True)
        self.addCleanup(Path(link).unlink, missing_ok=True)
        try:
            os.symlink(target, link)
        except (OSError, NotImplementedError) as exc:
            self.skipTest(f"cannot create symlinks here: {exc}")
        return link

    @responses.activate
    def test_follow_symlinks_false_refuses_symlink(self) -> None:
        # No response is registered: the refusal is expected to happen before
        # any request is made.
        link = self._make_symlinked_file()
        with _forge() as f, self.assertRaises(ForgeError):
            f.upload_file(link, follow_symlinks=False)

    @responses.activate
    def test_follow_symlinks_true_uploads_target(self) -> None:
        responses.add(
            responses.POST,
            f"{BASE_URL}/files",
            json={"file_token": "ff_sym", "ttl_secs": 60, "size": 6},
            status=200,
        )
        link = self._make_symlinked_file()
        with _forge() as f:
            ft = f.upload_file(link, ttl_secs=60, follow_symlinks=True)
        self.assertEqual(ft.file_token, "ff_sym")
class TestRunTimeoutValidation(unittest.TestCase):
    """M4: timeout_secs is range-validated locally."""

    def _expect_rejected(self, secs: int) -> None:
        # run() must raise ValueError for this timeout_secs value.
        with _forge() as client:
            with self.assertRaises(ValueError):
                client.run(prompt="hi", timeout_secs=secs)

    def _expect_accepted(self, secs: int) -> None:
        # Register a successful /run reply and check the call goes through.
        # Callers must already be inside an active `responses` context.
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"},
            status=200,
        )
        with _forge() as client:
            outcome = client.run(prompt="hi", timeout_secs=secs)
        self.assertTrue(outcome.ok)

    def test_negative_timeout_rejected(self) -> None:
        self._expect_rejected(-30)

    def test_zero_timeout_rejected(self) -> None:
        # 0 used to be falsy-substituted with the default; now it's a hard error.
        self._expect_rejected(0)

    def test_excessive_timeout_rejected(self) -> None:
        self._expect_rejected(10_000)

    @responses.activate
    def test_min_boundary_accepted(self) -> None:
        # Smoke: 5 is the minimum and must NOT trip local validation.
        self._expect_accepted(5)

    @responses.activate
    def test_max_boundary_accepted(self) -> None:
        self._expect_accepted(600)
class TestHealthzValidation(unittest.TestCase):
    """M6: non-dict /healthz response raises ForgeError."""

    HEALTHZ_URL = f"{BASE_URL}/healthz"

    def _expect_forge_error(self) -> None:
        # healthz() must raise ForgeError for whatever reply is registered.
        with _forge() as client:
            with self.assertRaises(ForgeError):
                client.healthz()

    @responses.activate
    def test_healthz_string_response_raises(self) -> None:
        responses.add(
            responses.GET,
            self.HEALTHZ_URL,
            body="OK",  # plain string, not JSON
            status=200,
            content_type="text/plain",
        )
        self._expect_forge_error()

    @responses.activate
    def test_healthz_json_list_raises(self) -> None:
        # Valid JSON, but a list rather than an object.
        responses.add(
            responses.GET,
            self.HEALTHZ_URL,
            json=["not", "a", "dict"],
            status=200,
        )
        self._expect_forge_error()
# Allow running this module directly as a script, outside a test runner.
if __name__ == "__main__":
    unittest.main()