"""Tests for the clawdforge Python SDK. Uses `responses` to intercept HTTP calls. No live network. """ from __future__ import annotations import io import json import unittest from pathlib import Path import requests import responses from clawdforge import ( AppToken, FileToken, Forge, ForgeAPIError, ForgeAuthError, ForgeError, ForgeTransportError, RunResult, ) BASE_URL = "http://192.168.0.5:8800" TOKEN = "cf_test_token_xxxxxxxx" def _forge() -> Forge: return Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60) class TestHealthz(unittest.TestCase): @responses.activate def test_healthz_ok(self) -> None: responses.add( responses.GET, f"{BASE_URL}/healthz", json={"ok": True, "claude_present": True, "claude_version": "1.2.3"}, status=200, ) with _forge() as f: out = f.healthz() self.assertEqual(out["ok"], True) self.assertEqual(out["claude_version"], "1.2.3") @responses.activate def test_healthz_no_auth_header_still_sent(self) -> None: # The server doesn't *require* auth on /healthz, but we still send the # bearer because we set it as a default header. Verify it's present. captured = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["auth"] = request.headers.get("Authorization") return (200, {}, json.dumps({"ok": True, "claude_present": True, "claude_version": "x"})) responses.add_callback(responses.GET, f"{BASE_URL}/healthz", callback=cb) with _forge() as f: f.healthz() self.assertEqual(captured["auth"], f"Bearer {TOKEN}") class TestRun(unittest.TestCase): @responses.activate def test_run_success_json_result(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": True, "result": {"hello": "world"}, "duration_ms": 1234, "stop_reason": "end_turn", }, status=200, ) with _forge() as f: r = f.run(prompt='Reply with JSON: {"hello": "world"}') self.assertIsInstance(r, RunResult) self.assertTrue(r.ok) self.assertEqual(r.result, {"hello": "world"}) self.assertEqual(r.duration_ms, 1234) self.assertEqual(r.stop_reason, "end_turn") @responses.activate def test_run_success_string_result(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": True, "result": "plain text reply", "duration_ms": 800, "stop_reason": "end_turn", }, status=200, ) with _forge() as f: r = f.run(prompt="hello") self.assertEqual(r.result, "plain text reply") @responses.activate def test_run_sends_expected_body(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["body"] = json.loads(request.body) captured["auth"] = request.headers.get("Authorization") return ( 200, {}, json.dumps({"ok": True, "result": {}, "duration_ms": 1, "stop_reason": "end_turn"}), ) responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) with _forge() as f: f.run( prompt="hi", model="opus", system="be terse", files=["ff_abc"], timeout_secs=42, ) self.assertEqual(captured["auth"], f"Bearer {TOKEN}") self.assertEqual( captured["body"], { "prompt": "hi", "model": "opus", "system": "be terse", "files": ["ff_abc"], "timeout_secs": 42, }, ) @responses.activate def test_run_uses_default_model_when_omitted(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["body"] = json.loads(request.body) return ( 200, {}, json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}), ) responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) with _forge() as f: f.run(prompt="hi") self.assertEqual(captured["body"]["model"], "sonnet") self.assertNotIn("system", captured["body"]) self.assertNotIn("files", captured["body"]) self.assertNotIn("timeout_secs", captured["body"]) @responses.activate def test_run_502_raises_api_error_with_body(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": False, "error": "subprocess timed out", "stderr": "...", "duration_ms": 60000, "stop_reason": "timeout", }, status=502, ) with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.run(prompt="hi", timeout_secs=60) err = ctx.exception self.assertEqual(err.status_code, 502) self.assertIsInstance(err.body, dict) self.assertEqual(err.body["stop_reason"], "timeout") self.assertIn("subprocess timed out", err.message) @responses.activate def test_run_401_raises_auth_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={"detail": "missing bearer"}, status=401, ) with _forge() as f, self.assertRaises(ForgeAuthError) as ctx: f.run(prompt="hi") self.assertEqual(ctx.exception.status_code, 401) # ForgeAuthError is-a ForgeAPIError is-a ForgeError self.assertIsInstance(ctx.exception, ForgeAPIError) self.assertIsInstance(ctx.exception, ForgeError) @responses.activate def test_run_403_raises_auth_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={"detail": "ip not in allowlist: 10.0.0.1"}, status=403, ) with _forge() as f, self.assertRaises(ForgeAuthError): f.run(prompt="hi") @responses.activate def test_run_transport_error(self) -> None: # No registered response → responses raises ConnectionError. with _forge() as f, self.assertRaises(ForgeTransportError): f.run(prompt="hi") def test_run_empty_prompt_rejected_locally(self) -> None: with _forge() as f, self.assertRaises(ValueError): f.run(prompt="") @responses.activate def test_run_http_timeout_uses_margin(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: # `responses` doesn't surface the timeout directly, but we can # check via the prepared request hook that we got here at all. captured["called"] = True return ( 200, {}, json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}), ) responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) with _forge() as f: # Verify the computation: timeout_secs=42 → http_timeout=72. # We poke at the private margin only to guard the formula stays # in sync with the readme/contract. self.assertEqual(f.http_timeout_margin, 30) f.run(prompt="hi", timeout_secs=42) self.assertTrue(captured.get("called")) class TestFiles(unittest.TestCase): @responses.activate def test_upload_file_from_path(self) -> None: responses.add( responses.POST, f"{BASE_URL}/files", json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11}, status=200, ) # Write a small temp file. import tempfile with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as tf: tf.write(b"hello world") tmp_path = tf.name try: with _forge() as f: ft = f.upload_file(tmp_path, ttl_secs=3600) self.assertIsInstance(ft, FileToken) self.assertEqual(ft.file_token, "ff_abc123") self.assertEqual(ft.ttl_secs, 3600) self.assertEqual(ft.size, 11) finally: Path(tmp_path).unlink(missing_ok=True) @responses.activate def test_upload_file_from_fileobj(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: ct = request.headers.get("Content-Type", "") captured["content_type"] = ct captured["body_starts_multipart"] = ct.startswith("multipart/form-data") return ( 200, {}, json.dumps({"file_token": "ff_xyz", "ttl_secs": 60, "size": 5}), ) responses.add_callback(responses.POST, f"{BASE_URL}/files", callback=cb) buf = io.BytesIO(b"hello") with _forge() as f: ft = f.upload_file(buf, ttl_secs=60, filename="snippet.txt") self.assertEqual(ft.file_token, "ff_xyz") self.assertTrue(captured["body_starts_multipart"]) @responses.activate def test_upload_file_400_raises_api_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/files", json={"detail": "ttl_secs out of range (60..86400)"}, status=400, ) buf = io.BytesIO(b"x") with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.upload_file(buf, ttl_secs=10, filename="x.bin") self.assertEqual(ctx.exception.status_code, 400) class TestAdminTokens(unittest.TestCase): @responses.activate def test_create_token(self) -> None: responses.add( responses.POST, f"{BASE_URL}/admin/tokens", json={ "name": "cauldron", "token": "cf_brandnew_xxx", "ip_cidrs": ["172.24.0.0/16"], }, status=200, ) with _forge() as f: t = f.create_token("cauldron", ip_cidrs=["172.24.0.0/16"]) self.assertIsInstance(t, AppToken) self.assertEqual(t.name, "cauldron") self.assertEqual(t.token, "cf_brandnew_xxx") self.assertEqual(t.ip_cidrs, ["172.24.0.0/16"]) @responses.activate def test_list_tokens(self) -> None: responses.add( responses.GET, f"{BASE_URL}/admin/tokens", json={ "tokens": [ { "name": "cauldron", "ip_cidrs": "172.24.0.0/16", "created_at": 100, "last_used": 200, "enabled": 1, }, { "name": "petalparse", "ip_cidrs": "", "created_at": 50, "last_used": None, "enabled": 0, }, ] }, status=200, ) with _forge() as f: toks = f.list_tokens() self.assertEqual(len(toks), 2) self.assertEqual(toks[0].name, "cauldron") self.assertEqual(toks[0].ip_cidrs, ["172.24.0.0/16"]) self.assertTrue(toks[0].enabled) self.assertIsNone(toks[0].token) self.assertEqual(toks[1].ip_cidrs, []) self.assertFalse(toks[1].enabled) @responses.activate def test_revoke_token_ok(self) -> None: responses.add( responses.DELETE, f"{BASE_URL}/admin/tokens/cauldron", json={"ok": True}, status=200, ) with _forge() as f: self.assertTrue(f.revoke_token("cauldron")) @responses.activate def test_revoke_token_404(self) -> None: responses.add( responses.DELETE, f"{BASE_URL}/admin/tokens/nosuch", json={"detail": "no such token"}, status=404, ) with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.revoke_token("nosuch") self.assertEqual(ctx.exception.status_code, 404) class TestForgeConstruction(unittest.TestCase): def test_requires_base_url(self) -> None: with self.assertRaises(ValueError): Forge(base_url="", token="cf_x") def test_requires_token(self) -> None: with self.assertRaises(ValueError): Forge(base_url="http://x", token="") def test_strips_trailing_slash(self) -> None: f = Forge(base_url="http://x:8800/", token="cf_x") self.assertEqual(f.base_url, "http://x:8800") def test_context_manager_closes_owned_session(self) -> None: f = Forge(base_url="http://x", token="cf_x") with f: self.assertTrue(f._owns_session) # After close(), session.close() was called — calling close() again is safe. f.close() def test_external_session_not_closed(self) -> None: sess = requests.Session() try: with Forge(base_url="http://x", token="cf_x", session=sess) as f: self.assertFalse(f._owns_session) # Externally-owned session should still be usable after Forge close. self.assertIsNotNone(sess.adapters) finally: sess.close() if __name__ == "__main__": unittest.main()