"""Tests for the clawdforge Python SDK. Uses `responses` to intercept HTTP calls. No live network. """ from __future__ import annotations import io import json import os import unittest from pathlib import Path from typing import Any import requests import responses from clawdforge import ( AppToken, FileToken, Forge, ForgeAPIError, ForgeAuthError, ForgeError, ForgeTransportError, RunResult, ) BASE_URL = "http://192.168.0.5:8800" TOKEN = "cf_test_token_xxxxxxxx" def _forge() -> Forge: return Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60) class TestHealthz(unittest.TestCase): @responses.activate def test_healthz_ok(self) -> None: responses.add( responses.GET, f"{BASE_URL}/healthz", json={"ok": True, "claude_present": True, "claude_version": "1.2.3"}, status=200, ) with _forge() as f: out = f.healthz() self.assertEqual(out["ok"], True) self.assertEqual(out["claude_version"], "1.2.3") @responses.activate def test_healthz_no_auth_header_still_sent(self) -> None: # The server doesn't *require* auth on /healthz, but we still send the # bearer because we set it as a default header. Verify it's present. captured = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["auth"] = request.headers.get("Authorization") return (200, {}, json.dumps({"ok": True, "claude_present": True, "claude_version": "x"})) responses.add_callback(responses.GET, f"{BASE_URL}/healthz", callback=cb) with _forge() as f: f.healthz() self.assertEqual(captured["auth"], f"Bearer {TOKEN}") class TestRun(unittest.TestCase): @responses.activate def test_run_success_json_result(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": True, "result": {"hello": "world"}, "duration_ms": 1234, "stop_reason": "end_turn", }, status=200, ) with _forge() as f: r = f.run(prompt='Reply with JSON: {"hello": "world"}') self.assertIsInstance(r, RunResult) self.assertTrue(r.ok) self.assertEqual(r.result, {"hello": "world"}) self.assertEqual(r.duration_ms, 1234) self.assertEqual(r.stop_reason, "end_turn") @responses.activate def test_run_success_string_result(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": True, "result": "plain text reply", "duration_ms": 800, "stop_reason": "end_turn", }, status=200, ) with _forge() as f: r = f.run(prompt="hello") self.assertEqual(r.result, "plain text reply") @responses.activate def test_run_sends_expected_body(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["body"] = json.loads(request.body) captured["auth"] = request.headers.get("Authorization") return ( 200, {}, json.dumps({"ok": True, "result": {}, "duration_ms": 1, "stop_reason": "end_turn"}), ) responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) with _forge() as f: f.run( prompt="hi", model="opus", system="be terse", files=["ff_abc"], timeout_secs=42, ) self.assertEqual(captured["auth"], f"Bearer {TOKEN}") self.assertEqual( captured["body"], { "prompt": "hi", "model": "opus", "system": "be terse", "files": ["ff_abc"], "timeout_secs": 42, }, ) @responses.activate def test_run_uses_default_model_when_omitted(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: captured["body"] = json.loads(request.body) return ( 200, {}, json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}), ) responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) with _forge() as f: f.run(prompt="hi") self.assertEqual(captured["body"]["model"], "sonnet") self.assertNotIn("system", captured["body"]) self.assertNotIn("files", captured["body"]) self.assertNotIn("timeout_secs", captured["body"]) @responses.activate def test_run_502_raises_api_error_with_body(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={ "ok": False, "error": "subprocess timed out", "stderr": "...", "duration_ms": 60000, "stop_reason": "timeout", }, status=502, ) with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.run(prompt="hi", timeout_secs=60) err = ctx.exception self.assertEqual(err.status_code, 502) self.assertIsInstance(err.body, dict) self.assertEqual(err.body["stop_reason"], "timeout") self.assertIn("subprocess timed out", err.message) @responses.activate def test_run_401_raises_auth_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={"detail": "missing bearer"}, status=401, ) with _forge() as f, self.assertRaises(ForgeAuthError) as ctx: f.run(prompt="hi") self.assertEqual(ctx.exception.status_code, 401) # ForgeAuthError is-a ForgeAPIError is-a ForgeError self.assertIsInstance(ctx.exception, ForgeAPIError) self.assertIsInstance(ctx.exception, ForgeError) @responses.activate def test_run_403_raises_auth_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={"detail": "ip not in allowlist: 10.0.0.1"}, status=403, ) with _forge() as f, self.assertRaises(ForgeAuthError): f.run(prompt="hi") @responses.activate def test_run_transport_error(self) -> None: # No registered response → responses raises ConnectionError. with _forge() as f, self.assertRaises(ForgeTransportError): f.run(prompt="hi") def test_run_empty_prompt_rejected_locally(self) -> None: with _forge() as f, self.assertRaises(ValueError): f.run(prompt="") @responses.activate def test_run_http_timeout_uses_margin(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: # `responses` doesn't surface the timeout directly, but we can # check via the prepared request hook that we got here at all. captured["called"] = True return ( 200, {}, json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}), ) responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb) with _forge() as f: # Verify the computation: timeout_secs=42 → http_timeout=72. # We poke at the private margin only to guard the formula stays # in sync with the readme/contract. self.assertEqual(f.http_timeout_margin, 30) f.run(prompt="hi", timeout_secs=42) self.assertTrue(captured.get("called")) class TestFiles(unittest.TestCase): @responses.activate def test_upload_file_from_path(self) -> None: responses.add( responses.POST, f"{BASE_URL}/files", json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11}, status=200, ) # Write a small temp file. import tempfile with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as tf: tf.write(b"hello world") tmp_path = tf.name try: with _forge() as f: ft = f.upload_file(tmp_path, ttl_secs=3600) self.assertIsInstance(ft, FileToken) self.assertEqual(ft.file_token, "ff_abc123") self.assertEqual(ft.ttl_secs, 3600) self.assertEqual(ft.size, 11) finally: Path(tmp_path).unlink(missing_ok=True) @responses.activate def test_upload_file_from_fileobj(self) -> None: captured: dict = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]: ct = request.headers.get("Content-Type", "") captured["content_type"] = ct captured["body_starts_multipart"] = ct.startswith("multipart/form-data") return ( 200, {}, json.dumps({"file_token": "ff_xyz", "ttl_secs": 60, "size": 5}), ) responses.add_callback(responses.POST, f"{BASE_URL}/files", callback=cb) buf = io.BytesIO(b"hello") with _forge() as f: ft = f.upload_file(buf, ttl_secs=60, filename="snippet.txt") self.assertEqual(ft.file_token, "ff_xyz") self.assertTrue(captured["body_starts_multipart"]) @responses.activate def test_upload_file_400_raises_api_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/files", json={"detail": "ttl_secs out of range (60..86400)"}, status=400, ) buf = io.BytesIO(b"x") with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.upload_file(buf, ttl_secs=10, filename="x.bin") self.assertEqual(ctx.exception.status_code, 400) class TestAdminTokens(unittest.TestCase): @responses.activate def test_create_token(self) -> None: responses.add( responses.POST, f"{BASE_URL}/admin/tokens", json={ "name": "cauldron", "token": "cf_brandnew_xxx", "ip_cidrs": ["172.24.0.0/16"], }, status=200, ) with _forge() as f: t = f.create_token("cauldron", ip_cidrs=["172.24.0.0/16"]) self.assertIsInstance(t, AppToken) self.assertEqual(t.name, "cauldron") self.assertEqual(t.token, "cf_brandnew_xxx") self.assertEqual(t.ip_cidrs, ("172.24.0.0/16",)) @responses.activate def test_list_tokens(self) -> None: responses.add( responses.GET, f"{BASE_URL}/admin/tokens", json={ "tokens": [ { "name": "cauldron", "ip_cidrs": "172.24.0.0/16", "created_at": 100, "last_used": 200, "enabled": 1, }, { "name": "petalparse", "ip_cidrs": "", "created_at": 50, "last_used": None, "enabled": 0, }, ] }, status=200, ) with _forge() as f: toks = f.list_tokens() self.assertEqual(len(toks), 2) self.assertEqual(toks[0].name, "cauldron") self.assertEqual(toks[0].ip_cidrs, ("172.24.0.0/16",)) self.assertTrue(toks[0].enabled) self.assertIsNone(toks[0].token) self.assertEqual(toks[1].ip_cidrs, ()) self.assertFalse(toks[1].enabled) @responses.activate def test_revoke_token_ok(self) -> None: responses.add( responses.DELETE, f"{BASE_URL}/admin/tokens/cauldron", json={"ok": True}, status=200, ) with _forge() as f: self.assertTrue(f.revoke_token("cauldron")) @responses.activate def test_revoke_token_404(self) -> None: responses.add( responses.DELETE, f"{BASE_URL}/admin/tokens/nosuch", json={"detail": "no such token"}, status=404, ) with _forge() as f, self.assertRaises(ForgeAPIError) as ctx: f.revoke_token("nosuch") self.assertEqual(ctx.exception.status_code, 404) class TestForgeConstruction(unittest.TestCase): def test_requires_base_url(self) -> None: with self.assertRaises(ValueError): Forge(base_url="", token="cf_x") def test_requires_token(self) -> None: with self.assertRaises(ValueError): Forge(base_url="http://x", token="") def test_strips_trailing_slash(self) -> None: f = Forge(base_url="http://x:8800/", token="cf_x") self.assertEqual(f.base_url, "http://x:8800") def test_context_manager_closes_owned_session(self) -> None: f = Forge(base_url="http://x", token="cf_x") with f: self.assertTrue(f._owns_session) # After close(), session.close() was called — calling close() again is safe. f.close() def test_external_session_not_closed(self) -> None: sess = requests.Session() try: with Forge(base_url="http://x", token="cf_x", session=sess) as f: self.assertFalse(f._owns_session) # Externally-owned session should still be usable after Forge close. self.assertIsNotNone(sess.adapters) finally: sess.close() class TestRevokeTokenSlugQuoting(unittest.TestCase): """H1: revoke_token must percent-encode the slug to defeat path traversal.""" def test_revoke_token_empty_name_rejected(self) -> None: with _forge() as f, self.assertRaises(ValueError): f.revoke_token("") @responses.activate def test_revoke_token_path_traversal_is_quoted(self) -> None: # Pre-fix this would issue DELETE /healthz; post-fix it must hit the # admin/tokens endpoint with the slug percent-encoded. captured: dict[str, Any] = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict[str, Any], str]: captured["url"] = request.url return (404, {}, json.dumps({"detail": "no such token"})) # Match any URL under /admin/tokens/ — the bug shape would route # somewhere else entirely. responses.add_callback( responses.DELETE, f"{BASE_URL}/admin/tokens/..%2F..%2Fhealthz", callback=cb, ) with _forge() as f, self.assertRaises(ForgeAPIError): f.revoke_token("../../healthz") # Must have been routed under /admin/tokens/, not /healthz. self.assertIn("/admin/tokens/", captured["url"]) self.assertNotIn("/healthz", captured["url"].split("/admin/tokens/")[0]) @responses.activate def test_revoke_token_slash_in_name_quoted(self) -> None: captured: dict[str, Any] = {} def cb(request: requests.PreparedRequest) -> tuple[int, dict[str, Any], str]: captured["url"] = request.url return (200, {}, json.dumps({"ok": True})) responses.add_callback( responses.DELETE, f"{BASE_URL}/admin/tokens/foo%2Fbar", callback=cb, ) with _forge() as f: self.assertTrue(f.revoke_token("foo/bar")) self.assertIn("foo%2Fbar", captured["url"]) class TestAppTokenRedaction(unittest.TestCase): """H2: AppToken repr/str must redact the plaintext bearer.""" def test_repr_redacts_token(self) -> None: t = AppToken(name="x", token="cf_secret_xxxxxxxx", ip_cidrs=("10.0.0.0/8",)) r = repr(t) self.assertNotIn("cf_secret_xxxxxxxx", r) self.assertIn("", r) self.assertIn("name='x'", r) def test_str_redacts_token(self) -> None: t = AppToken(name="x", token="cf_secret_xxxxxxxx") s = str(t) self.assertNotIn("cf_secret_xxxxxxxx", s) self.assertIn("", s) def test_repr_token_none_shows_none(self) -> None: t = AppToken(name="x", token=None) r = repr(t) self.assertIn("token=None", r) self.assertNotIn("", r) def test_format_string_doesnt_leak(self) -> None: # `log.info("token: %s", t)` is the worry case — uses __str__. t = AppToken(name="cauldron", token="cf_super_secret") formatted = f"token: {t}" self.assertNotIn("cf_super_secret", formatted) class TestModelExceptionWrapping(unittest.TestCase): """M1, M2: stdlib exceptions in *.from_response must surface as ForgeError.""" def test_run_result_malformed_duration(self) -> None: with self.assertRaises(ForgeError): RunResult.from_response( {"ok": True, "result": "x", "duration_ms": "not-an-int"} ) def test_run_result_ok_false_raises_api_error(self) -> None: # L7: ok=False should raise ForgeAPIError, not silently parse. with self.assertRaises(ForgeAPIError) as ctx: RunResult.from_response( {"ok": False, "result": None, "duration_ms": 0} ) self.assertEqual(ctx.exception.status_code, 200) def test_file_token_missing_field(self) -> None: with self.assertRaises(ForgeError): FileToken.from_response({"ttl_secs": 60, "size": 5}) # no file_token def test_file_token_bad_int(self) -> None: with self.assertRaises(ForgeError): FileToken.from_response( {"file_token": "ff_x", "ttl_secs": "lots", "size": 5} ) def test_app_token_create_missing_name(self) -> None: with self.assertRaises(ForgeError): AppToken.from_create_response({"token": "cf_x"}) def test_app_token_list_row_missing_name(self) -> None: with self.assertRaises(ForgeError): AppToken.from_list_row({"ip_cidrs": ""}) @responses.activate def test_run_payload_with_bad_duration_surfaces_forge_error(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={"ok": True, "result": "x", "duration_ms": "junk"}, status=200, ) with _forge() as f, self.assertRaises(ForgeError): f.run(prompt="hi") class TestUploadFileMissingPath(unittest.TestCase): """M3: missing path must surface as ForgeError, not FileNotFoundError.""" def test_missing_path_raises_forge_error(self) -> None: with _forge() as f, self.assertRaises(ForgeError): f.upload_file("/nonexistent/path/to/file.bin") def test_missing_path_does_not_leak_filenotfounderror(self) -> None: # FileNotFoundError is OSError; ForgeError is not. Verify caller can # rely on `except ForgeError` to catch this. with _forge() as f: try: f.upload_file("/nonexistent/path/to/file.bin") except ForgeError: pass except FileNotFoundError: self.fail("FileNotFoundError leaked through ForgeError boundary") class TestUploadFileSymlinkOption(unittest.TestCase): """M5: optional follow_symlinks=False kwarg refuses symlinks.""" @responses.activate def test_follow_symlinks_false_refuses_symlink(self) -> None: import tempfile with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as tf: tf.write(b"target") target = tf.name link = target + ".link" try: os.symlink(target, link) with _forge() as f, self.assertRaises(ForgeError): f.upload_file(link, follow_symlinks=False) finally: Path(link).unlink(missing_ok=True) Path(target).unlink(missing_ok=True) @responses.activate def test_follow_symlinks_true_uploads_target(self) -> None: import tempfile responses.add( responses.POST, f"{BASE_URL}/files", json={"file_token": "ff_sym", "ttl_secs": 60, "size": 6}, status=200, ) with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as tf: tf.write(b"target") target = tf.name link = target + ".link" try: os.symlink(target, link) with _forge() as f: ft = f.upload_file(link, ttl_secs=60, follow_symlinks=True) self.assertEqual(ft.file_token, "ff_sym") finally: Path(link).unlink(missing_ok=True) Path(target).unlink(missing_ok=True) class TestRunTimeoutValidation(unittest.TestCase): """M4: timeout_secs is range-validated locally.""" def test_negative_timeout_rejected(self) -> None: with _forge() as f, self.assertRaises(ValueError): f.run(prompt="hi", timeout_secs=-30) def test_zero_timeout_rejected(self) -> None: # 0 used to be falsy-substituted with the default; now it's a hard error. with _forge() as f, self.assertRaises(ValueError): f.run(prompt="hi", timeout_secs=0) def test_excessive_timeout_rejected(self) -> None: with _forge() as f, self.assertRaises(ValueError): f.run(prompt="hi", timeout_secs=10_000) @responses.activate def test_min_boundary_accepted(self) -> None: # Smoke: 5 is the minimum and must NOT trip local validation. responses.add( responses.POST, f"{BASE_URL}/run", json={"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}, status=200, ) with _forge() as f: r = f.run(prompt="hi", timeout_secs=5) self.assertTrue(r.ok) @responses.activate def test_max_boundary_accepted(self) -> None: responses.add( responses.POST, f"{BASE_URL}/run", json={"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}, status=200, ) with _forge() as f: r = f.run(prompt="hi", timeout_secs=600) self.assertTrue(r.ok) class TestHealthzValidation(unittest.TestCase): """M6: non-dict /healthz response raises ForgeError.""" @responses.activate def test_healthz_string_response_raises(self) -> None: responses.add( responses.GET, f"{BASE_URL}/healthz", body="OK", # plain string, not JSON status=200, content_type="text/plain", ) with _forge() as f, self.assertRaises(ForgeError): f.healthz() @responses.activate def test_healthz_json_list_raises(self) -> None: responses.add( responses.GET, f"{BASE_URL}/healthz", json=["not", "a", "dict"], status=200, ) with _forge() as f, self.assertRaises(ForgeError): f.healthz() if __name__ == "__main__": unittest.main()