Sync requests-based SDK in clients/python/. Wraps /healthz, /run, /files, and /admin/tokens behind a Forge class with typed exceptions (ForgeError + Transport/API/Auth subclasses) and dataclass response shapes (RunResult, FileToken, AppToken). HTTP timeout = run timeout + 30s margin, matching the pattern cauldron has been running inline. No retries — caller's job since /run isn't idempotent. 24 unit tests via responses, all passing. Install with pip install -e clients/python/.
411 lines
14 KiB
Python
411 lines
14 KiB
Python
"""Tests for the clawdforge Python SDK.

Uses `responses` to intercept HTTP calls. No live network.
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
import responses
|
|
|
|
from clawdforge import (
|
|
AppToken,
|
|
FileToken,
|
|
Forge,
|
|
ForgeAPIError,
|
|
ForgeAuthError,
|
|
ForgeError,
|
|
ForgeTransportError,
|
|
RunResult,
|
|
)
|
|
|
|
|
|
# Address every stubbed endpoint is registered under; `responses` intercepts
# the calls, so nothing is actually sent to this host.
BASE_URL = "http://192.168.0.5:8800"
# Placeholder bearer token the tests expect to see echoed in Authorization.
TOKEN = "cf_test_token_xxxxxxxx"


def _forge() -> Forge:
    """Build the client used by every test: shared URL/token, 60s default timeout."""
    client = Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60)
    return client
|
|
|
|
|
|
class TestHealthz(unittest.TestCase):
    """Tests for ``Forge.healthz`` against a stubbed ``GET /healthz``."""

    @responses.activate
    def test_healthz_ok(self) -> None:
        """A 200 JSON reply is handed back with its fields readable by key."""
        payload = {"ok": True, "claude_present": True, "claude_version": "1.2.3"}
        responses.add(responses.GET, f"{BASE_URL}/healthz", json=payload, status=200)

        with _forge() as client:
            reply = client.healthz()

        self.assertEqual(reply["ok"], True)
        self.assertEqual(reply["claude_version"], "1.2.3")

    @responses.activate
    def test_healthz_no_auth_header_still_sent(self) -> None:
        """The bearer header accompanies /healthz requests as well."""
        # The server doesn't *require* auth on /healthz, but we still send the
        # bearer because we set it as a default header. Verify it's present.
        captured: dict = {}
        canned_body = json.dumps({"ok": True, "claude_present": True, "claude_version": "x"})

        def record_auth(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            # Remember the header the client attached, then answer 200.
            captured["auth"] = request.headers.get("Authorization")
            return (200, {}, canned_body)

        responses.add_callback(responses.GET, f"{BASE_URL}/healthz", callback=record_auth)

        with _forge() as client:
            client.healthz()

        self.assertEqual(captured["auth"], f"Bearer {TOKEN}")
|
|
|
|
|
|
class TestRun(unittest.TestCase):
    """Tests for ``Forge.run`` against a stubbed ``POST /run``."""

    @responses.activate
    def test_run_success_json_result(self) -> None:
        """A 200 reply with a JSON ``result`` is exposed through RunResult fields."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={
                "ok": True,
                "result": {"hello": "world"},
                "duration_ms": 1234,
                "stop_reason": "end_turn",
            },
            status=200,
        )
        with _forge() as f:
            r = f.run(prompt='Reply with JSON: {"hello": "world"}')
        self.assertIsInstance(r, RunResult)
        self.assertTrue(r.ok)
        self.assertEqual(r.result, {"hello": "world"})
        self.assertEqual(r.duration_ms, 1234)
        self.assertEqual(r.stop_reason, "end_turn")

    @responses.activate
    def test_run_success_string_result(self) -> None:
        """A plain-string ``result`` is passed through unchanged."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={
                "ok": True,
                "result": "plain text reply",
                "duration_ms": 800,
                "stop_reason": "end_turn",
            },
            status=200,
        )
        with _forge() as f:
            r = f.run(prompt="hello")
        self.assertEqual(r.result, "plain text reply")

    @responses.activate
    def test_run_sends_expected_body(self) -> None:
        """Every explicit argument appears in the JSON body, with the bearer header."""
        captured: dict = {}

        def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            captured["body"] = json.loads(request.body)
            captured["auth"] = request.headers.get("Authorization")
            return (
                200,
                {},
                json.dumps({"ok": True, "result": {}, "duration_ms": 1, "stop_reason": "end_turn"}),
            )

        responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
        with _forge() as f:
            f.run(
                prompt="hi",
                model="opus",
                system="be terse",
                files=["ff_abc"],
                timeout_secs=42,
            )
        self.assertEqual(captured["auth"], f"Bearer {TOKEN}")
        self.assertEqual(
            captured["body"],
            {
                "prompt": "hi",
                "model": "opus",
                "system": "be terse",
                "files": ["ff_abc"],
                "timeout_secs": 42,
            },
        )

    @responses.activate
    def test_run_uses_default_model_when_omitted(self) -> None:
        """With only a prompt, the body carries model "sonnet" and no optional keys."""
        captured: dict = {}

        def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            captured["body"] = json.loads(request.body)
            return (
                200,
                {},
                json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}),
            )

        responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
        with _forge() as f:
            f.run(prompt="hi")
        self.assertEqual(captured["body"]["model"], "sonnet")
        self.assertNotIn("system", captured["body"])
        self.assertNotIn("files", captured["body"])
        self.assertNotIn("timeout_secs", captured["body"])

    @responses.activate
    def test_run_502_raises_api_error_with_body(self) -> None:
        """A 502 raises ForgeAPIError carrying the status, parsed body and error text."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={
                "ok": False,
                "error": "subprocess timed out",
                "stderr": "...",
                "duration_ms": 60000,
                "stop_reason": "timeout",
            },
            status=502,
        )
        with _forge() as f, self.assertRaises(ForgeAPIError) as ctx:
            f.run(prompt="hi", timeout_secs=60)
        err = ctx.exception
        self.assertEqual(err.status_code, 502)
        self.assertIsInstance(err.body, dict)
        self.assertEqual(err.body["stop_reason"], "timeout")
        self.assertIn("subprocess timed out", err.message)

    @responses.activate
    def test_run_401_raises_auth_error(self) -> None:
        """A 401 raises ForgeAuthError, which sits under ForgeAPIError and ForgeError."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"detail": "missing bearer"},
            status=401,
        )
        with _forge() as f, self.assertRaises(ForgeAuthError) as ctx:
            f.run(prompt="hi")
        self.assertEqual(ctx.exception.status_code, 401)
        # ForgeAuthError is-a ForgeAPIError is-a ForgeError
        self.assertIsInstance(ctx.exception, ForgeAPIError)
        self.assertIsInstance(ctx.exception, ForgeError)

    @responses.activate
    def test_run_403_raises_auth_error(self) -> None:
        """A 403 is reported as ForgeAuthError too."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"detail": "ip not in allowlist: 10.0.0.1"},
            status=403,
        )
        with _forge() as f, self.assertRaises(ForgeAuthError):
            f.run(prompt="hi")

    @responses.activate
    def test_run_transport_error(self) -> None:
        """A connection failure surfaces as ForgeTransportError."""
        # No registered response → responses raises ConnectionError.
        with _forge() as f, self.assertRaises(ForgeTransportError):
            f.run(prompt="hi")

    def test_run_empty_prompt_rejected_locally(self) -> None:
        """An empty prompt raises ValueError; no stub is registered for this test."""
        with _forge() as f, self.assertRaises(ValueError):
            f.run(prompt="")

    @responses.activate
    def test_run_http_timeout_uses_margin(self) -> None:
        """The HTTP timeout handed to requests is ``timeout_secs`` plus the margin.

        The previous version only proved the request was made; a wrong formula
        would still have passed. Here ``requests.Session.send`` is wrapped so
        the ``timeout`` keyword can be inspected before the call continues on
        to the ``responses`` stub.
        """
        # Local import, in the same spirit as the local `import tempfile`
        # used elsewhere in this module.
        from unittest import mock

        responses.add(
            responses.POST,
            f"{BASE_URL}/run",
            json={"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"},
            status=200,
        )

        captured: dict = {}
        real_send = requests.Session.send

        def spy(session: requests.Session, request: requests.PreparedRequest, **kwargs):
            # Record what the SDK asked for, then let the request proceed.
            captured["called"] = True
            captured["timeout"] = kwargs.get("timeout")
            return real_send(session, request, **kwargs)

        with _forge() as f:
            # Guard that the margin constant stays in sync with the
            # readme/contract: timeout_secs=42 → http_timeout=72.
            self.assertEqual(f.http_timeout_margin, 30)
            with mock.patch.object(requests.Session, "send", autospec=True, side_effect=spy):
                f.run(prompt="hi", timeout_secs=42)

        self.assertTrue(captured.get("called"))
        timeout = captured.get("timeout")
        # requests accepts either a scalar or a (connect, read) tuple; the
        # read component is the one that must cover the run plus the margin.
        effective = timeout[-1] if isinstance(timeout, tuple) else timeout
        self.assertEqual(effective, 42 + 30)
|
|
|
|
|
|
class TestFiles(unittest.TestCase):
    """Tests for ``Forge.upload_file`` against a stubbed ``POST /files``."""

    @responses.activate
    def test_upload_file_from_path(self) -> None:
        """Uploading by filesystem path yields a FileToken built from the reply."""
        import tempfile

        reply = {"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11}
        responses.add(responses.POST, f"{BASE_URL}/files", json=reply, status=200)

        # Write a small temp file; delete=False so it can be reopened by path.
        with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as handle:
            handle.write(b"hello world")
            on_disk = handle.name

        try:
            with _forge() as client:
                token = client.upload_file(on_disk, ttl_secs=3600)
            self.assertIsInstance(token, FileToken)
            self.assertEqual(token.file_token, "ff_abc123")
            self.assertEqual(token.ttl_secs, 3600)
            self.assertEqual(token.size, 11)
        finally:
            # Always remove the scratch file, even when an assertion fails.
            Path(on_disk).unlink(missing_ok=True)

    @responses.activate
    def test_upload_file_from_fileobj(self) -> None:
        """Uploading a file-like object is sent as multipart/form-data."""
        captured: dict = {}
        canned_body = json.dumps({"file_token": "ff_xyz", "ttl_secs": 60, "size": 5})

        def inspect(request: requests.PreparedRequest) -> tuple[int, dict, str]:
            content_type = request.headers.get("Content-Type", "")
            captured["content_type"] = content_type
            captured["body_starts_multipart"] = content_type.startswith("multipart/form-data")
            return (200, {}, canned_body)

        responses.add_callback(responses.POST, f"{BASE_URL}/files", callback=inspect)

        stream = io.BytesIO(b"hello")
        with _forge() as client:
            token = client.upload_file(stream, ttl_secs=60, filename="snippet.txt")

        self.assertEqual(token.file_token, "ff_xyz")
        self.assertTrue(captured["body_starts_multipart"])

    @responses.activate
    def test_upload_file_400_raises_api_error(self) -> None:
        """A 400 from /files raises ForgeAPIError with that status code."""
        responses.add(
            responses.POST,
            f"{BASE_URL}/files",
            json={"detail": "ttl_secs out of range (60..86400)"},
            status=400,
        )
        stream = io.BytesIO(b"x")

        with _forge() as client, self.assertRaises(ForgeAPIError) as raised:
            client.upload_file(stream, ttl_secs=10, filename="x.bin")

        self.assertEqual(raised.exception.status_code, 400)
|
|
|
|
class TestAdminTokens(unittest.TestCase):
    """Tests for token create/list/revoke against stubbed ``/admin/tokens``."""

    @responses.activate
    def test_create_token(self) -> None:
        """Creating a token returns an AppToken mirroring the server reply."""
        reply = {
            "name": "cauldron",
            "token": "cf_brandnew_xxx",
            "ip_cidrs": ["172.24.0.0/16"],
        }
        responses.add(responses.POST, f"{BASE_URL}/admin/tokens", json=reply, status=200)

        with _forge() as client:
            created = client.create_token("cauldron", ip_cidrs=["172.24.0.0/16"])

        self.assertIsInstance(created, AppToken)
        self.assertEqual(created.name, "cauldron")
        self.assertEqual(created.token, "cf_brandnew_xxx")
        self.assertEqual(created.ip_cidrs, ["172.24.0.0/16"])

    @responses.activate
    def test_list_tokens(self) -> None:
        """Listed rows have string ``ip_cidrs`` turned into lists and 0/1 ``enabled`` read as truthy/falsy."""
        active_row = {
            "name": "cauldron",
            "ip_cidrs": "172.24.0.0/16",
            "created_at": 100,
            "last_used": 200,
            "enabled": 1,
        }
        disabled_row = {
            "name": "petalparse",
            "ip_cidrs": "",
            "created_at": 50,
            "last_used": None,
            "enabled": 0,
        }
        responses.add(
            responses.GET,
            f"{BASE_URL}/admin/tokens",
            json={"tokens": [active_row, disabled_row]},
            status=200,
        )

        with _forge() as client:
            listed = client.list_tokens()

        self.assertEqual(len(listed), 2)
        first, second = listed[0], listed[1]
        self.assertEqual(first.name, "cauldron")
        self.assertEqual(first.ip_cidrs, ["172.24.0.0/16"])
        self.assertTrue(first.enabled)
        # The stubbed listing carries no "token" field; the test expects None here.
        self.assertIsNone(first.token)
        self.assertEqual(second.ip_cidrs, [])
        self.assertFalse(second.enabled)

    @responses.activate
    def test_revoke_token_ok(self) -> None:
        """Revoking an existing token returns a truthy value."""
        responses.add(
            responses.DELETE,
            f"{BASE_URL}/admin/tokens/cauldron",
            json={"ok": True},
            status=200,
        )
        with _forge() as client:
            revoked = client.revoke_token("cauldron")
            self.assertTrue(revoked)

    @responses.activate
    def test_revoke_token_404(self) -> None:
        """Revoking an unknown token raises ForgeAPIError with status 404."""
        responses.add(
            responses.DELETE,
            f"{BASE_URL}/admin/tokens/nosuch",
            json={"detail": "no such token"},
            status=404,
        )
        with _forge() as client, self.assertRaises(ForgeAPIError) as raised:
            client.revoke_token("nosuch")

        self.assertEqual(raised.exception.status_code, 404)
|
|
|
|
|
|
class TestForgeConstruction(unittest.TestCase):
    """Tests for ``Forge`` constructor validation and session ownership."""

    def test_requires_base_url(self) -> None:
        """An empty base_url is rejected with ValueError."""
        with self.assertRaises(ValueError):
            Forge(base_url="", token="cf_x")

    def test_requires_token(self) -> None:
        """An empty token is rejected with ValueError."""
        with self.assertRaises(ValueError):
            Forge(base_url="http://x", token="")

    def test_strips_trailing_slash(self) -> None:
        """A trailing slash on base_url is removed."""
        f = Forge(base_url="http://x:8800/", token="cf_x")
        self.assertEqual(f.base_url, "http://x:8800")

    def test_context_manager_closes_owned_session(self) -> None:
        """Leaving the ``with`` block closes a session the client created itself.

        The earlier version only checked ``_owns_session`` and never observed
        the close. ``requests.Session.close`` is patched at class level so the
        call is visible; this assumes the owned session is a plain
        ``requests.Session``.
        """
        from unittest import mock

        with mock.patch.object(requests.Session, "close", autospec=True) as closer:
            f = Forge(base_url="http://x", token="cf_x")
            with f:
                self.assertTrue(f._owns_session)
            self.assertTrue(closer.called)
            # After close(), session.close() was called — calling close() again is safe.
            f.close()

    def test_external_session_not_closed(self) -> None:
        """A caller-supplied session is left open when the client exits.

        ``sess.adapters`` is non-None even after ``Session.close()``, so the
        old assertion could not fail. The session's ``close`` is replaced with
        a mock for the duration of the ``with`` so any call is detected.
        """
        from unittest import mock

        sess = requests.Session()
        try:
            with mock.patch.object(sess, "close") as closer:
                with Forge(base_url="http://x", token="cf_x", session=sess) as f:
                    self.assertFalse(f._owns_session)
                # Externally-owned session must not have been closed by Forge.
                closer.assert_not_called()
        finally:
            # The patch is undone by now, so this is the real close().
            sess.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the suite through unittest's command-line runner when this file is
    # executed directly rather than imported.
    unittest.main()
|