clients/python: initial Python SDK for clawdforge

Sync requests-based SDK in clients/python/. Wraps /healthz, /run, /files,
and /admin/tokens behind a Forge class with typed exceptions
(ForgeError + Transport/API/Auth subclasses) and dataclass response shapes
(RunResult, FileToken, AppToken). HTTP timeout = run timeout + 30s margin,
matching the pattern cauldron has been running inline. No retries —
caller's job since /run isn't idempotent.

24 unit tests via responses, all passing. Install with
pip install -e clients/python/.
This commit is contained in:
Kayos 2026-04-28 22:27:08 -07:00
parent 347fddea0f
commit 90e158f2fe
9 changed files with 1141 additions and 0 deletions

171
clients/python/README.md Normal file
View file

@ -0,0 +1,171 @@
# clawdforge — Python SDK
Sync Python client for the [clawdforge](http://192.168.0.5:3001/Sulkta-Coop/clawdforge) HTTP service. Wraps the bearer-token-gated REST API behind a typed dataclass surface so other Sulkta apps (cauldron, petalparse, johnny5) don't have to hand-roll `requests` calls.
## Install
```bash
pip install -e clients/python/
```
(LAN-only, not on PyPI.)
## Quickstart
```python
from clawdforge import Forge
forge = Forge(base_url="http://192.168.0.5:8800", token="cf_...")
print(forge.healthz())
# {'ok': True, 'claude_present': True, 'claude_version': '1.x.y'}
r = forge.run(prompt='Reply with JSON: {"hello": "world"}')
print(r.result) # {'hello': 'world'}
print(r.duration_ms) # 4321
print(r.stop_reason) # 'end_turn'
```
`Forge` is also a context manager — preferred when you have a script-shaped lifetime:
```python
with Forge(base_url="...", token="...") as forge:
forge.run(prompt="...")
```
## Construction
```python
Forge(
*,
base_url: str,
token: str,
default_model: str = "sonnet",
default_timeout_secs: int = 120,
http_timeout_margin: int = 30,
session: requests.Session | None = None,
)
```
- `base_url``http://host:port`. Trailing slash stripped.
- `token` — bearer. App token (`cf_...`) for `/run` and `/files`; admin bootstrap token for `/admin/*`.
- `default_model` — passed to `/run` when caller doesn't override. Server default is also `sonnet` so this matches.
- `default_timeout_secs``timeout_secs` for `/run` when caller doesn't override.
- `http_timeout_margin` — seconds added to the run subprocess timeout to derive the HTTP-level timeout. The HTTP deadline must outlast the server's subprocess deadline so we don't bail while clawdforge is still doing legitimate work for us. Default 30s — same pattern cauldron has been running inline.
- `session` — bring your own `requests.Session` if you want shared connection-pooling across multiple Forges. If omitted, the Forge owns one and closes it on exit.
## Methods
### `healthz() -> dict`
```python
forge.healthz()
# {'ok': True, 'claude_present': True, 'claude_version': '1.x.y'}
```
### `run(prompt, *, model=None, system=None, files=None, timeout_secs=None) -> RunResult`
```python
r = forge.run(
prompt="Sterilize this ingredient line: 'about 2 cups of cooked white rice'",
model="sonnet", # optional, defaults to forge.default_model
system="You are a precise recipe parser. Always reply with valid JSON.",
files=["ff_..."], # optional file tokens from upload_file()
timeout_secs=60, # optional, 5..600, defaults to forge.default_timeout_secs
)
r.ok # True
r.result # parsed JSON when claude returned valid JSON, else str
r.duration_ms # 4321
r.stop_reason # 'end_turn' | 'timeout' | 'error' | ...
```
### `upload_file(path_or_fileobj, *, ttl_secs=3600, filename=None, content_type=None) -> FileToken`
Either path-like or a binary file-object:
```python
ft = forge.upload_file("/path/to/recipe.png", ttl_secs=3600)
# FileToken(file_token='ff_...', ttl_secs=3600, size=12345)
# Or from a buffer:
import io
ft = forge.upload_file(io.BytesIO(b"..."), filename="snippet.txt")
r = forge.run(prompt="Extract recipe data", files=[ft.file_token])
```
The server enforces `60 <= ttl_secs <= 86400` and returns 400 for out-of-range values, surfaced as `ForgeAPIError`.
### Admin (admin-bootstrap-token gated)
```python
admin = Forge(base_url="http://192.168.0.5:8800", token=ADMIN_BOOTSTRAP_TOKEN)
t = admin.create_token("cauldron", ip_cidrs=["172.24.0.0/16"])
# AppToken(name='cauldron', token='cf_brandnew_xxx', ip_cidrs=['172.24.0.0/16'], ...)
# t.token is the plaintext bearer — store it now, the server only keeps a hash.
admin.list_tokens()
# [AppToken(name='cauldron', token=None, ip_cidrs=['172.24.0.0/16'], created_at=..., last_used=..., enabled=True), ...]
admin.revoke_token("cauldron") # True on success; raises ForgeAPIError(404) if no such token
```
## Errors
Everything the SDK raises descends from `ForgeError`:
```
ForgeError
├── ForgeTransportError — connection/TCP-timeout failures (server never responded)
└── ForgeAPIError — server returned 4xx/5xx; .status_code, .body available
└── ForgeAuthError — 401 or 403 specifically
```
`ForgeAPIError` carries:
- `.status_code: int`
- `.body: dict | str | None` — parsed JSON if available, else text
- `.message: str` — short summary
Typical pattern:
```python
from clawdforge import Forge, ForgeAuthError, ForgeAPIError, ForgeTransportError
try:
r = forge.run(prompt="...")
except ForgeAuthError:
# Re-mint your token, or check the IP allowlist.
raise
except ForgeAPIError as e:
if e.status_code == 502 and isinstance(e.body, dict):
# Subprocess failed/timed out. e.body has 'error', 'stderr',
# 'duration_ms', 'stop_reason'.
log.warning("clawdforge run failed: %s", e.body.get("error"))
raise
except ForgeTransportError:
# Network blip — clawdforge is down or unreachable. Caller decides
# whether to retry; the SDK does not retry.
raise
```
## No retries by default
Clawdforge runs are not idempotent — each `/run` spawns a real `claude -p` subprocess that costs tokens and time. If you want retries, wrap calls with whatever retry policy fits your call-site (tenacity, backoff, hand-rolled). The SDK won't double-charge you behind your back.
## Tests
```bash
cd clients/python
pip install -e .[test]
python -m unittest discover tests
```
Tests use `responses` to intercept HTTP calls — no live network, no live clawdforge.
## Notes
- The `result` field of `RunResult` matches whatever the server's `runner` parsed out of `claude -p --output-format json`: a dict when the prompt asked for JSON and the model complied, a string otherwise. The SDK passes it through as-is — your prompt determines the shape.
- The server's `GET /admin/tokens` returns `ip_cidrs` as a comma-joined string (it's stored that way in SQLite). The SDK normalizes that to `list[str]` on `AppToken`. The create response already returns a list, so both shapes round-trip cleanly.
- `revoke_token` on a non-existent name raises `ForgeAPIError(404)` rather than returning `False`. That matches the server's contract and lets callers distinguish "didn't exist" from "was disabled".

View file

@ -0,0 +1,6 @@
"""3-line clawdforge usage."""
import os
from clawdforge import Forge
with Forge(base_url=os.environ["CLAWDFORGE_URL"], token=os.environ["CLAWDFORGE_TOKEN"]) as forge:
print(forge.run(prompt='Reply with JSON: {"hello": "world"}').result)

View file

@ -0,0 +1,43 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "clawdforge"
version = "0.1.0"
description = "Python SDK for the clawdforge LAN-only HTTP service (claude -p subprocess wrapper)."
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Kayos", email = "kayos@sulkta.com" }]
keywords = ["clawdforge", "claude", "sulkta", "sdk"]
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Operating System :: POSIX :: Linux",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"requests>=2.28",
]
[project.optional-dependencies]
test = [
"responses>=0.23",
]
[project.urls]
Homepage = "http://192.168.0.5:3001/Sulkta-Coop/clawdforge"
Source = "http://192.168.0.5:3001/Sulkta-Coop/clawdforge"
[tool.hatch.build.targets.wheel]
packages = ["src/clawdforge"]
[tool.hatch.build.targets.sdist]
include = [
"src/clawdforge",
"README.md",
"pyproject.toml",
]

View file

@ -0,0 +1,30 @@
"""clawdforge — Python SDK for the LAN-only clawdforge HTTP service.
Quickstart:
>>> from clawdforge import Forge
>>> forge = Forge(base_url="http://192.168.0.5:8800", token="cf_...")
>>> r = forge.run(prompt='Reply with JSON: {"hi": "ok"}')
>>> r.result
{'hi': 'ok'}
"""
from ._models import AppToken, FileToken, RunResult
from .client import Forge
from .exceptions import (
ForgeAPIError,
ForgeAuthError,
ForgeError,
ForgeTransportError,
)
__all__ = [
"Forge",
"RunResult",
"FileToken",
"AppToken",
"ForgeError",
"ForgeAPIError",
"ForgeAuthError",
"ForgeTransportError",
]
__version__ = "0.1.0"

View file

@ -0,0 +1,98 @@
"""Dataclasses for clawdforge API response shapes.
Shapes mirror what `clawdforge/server.py` returns so callers get attribute
access instead of dict-poking.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True, slots=True)
class RunResult:
"""Result of a successful `POST /run`.
`result` is whatever clawdforge parsed out of `claude -p --output-format json`:
a dict if the model returned valid JSON, otherwise the raw string. `ok` is
always True for a RunResult (failures raise ForgeAPIError instead).
"""
ok: bool
result: Any
duration_ms: int
stop_reason: str | None = None
@classmethod
def from_response(cls, payload: dict) -> "RunResult":
return cls(
ok=bool(payload.get("ok", True)),
result=payload.get("result"),
duration_ms=int(payload.get("duration_ms", 0)),
stop_reason=payload.get("stop_reason"),
)
@dataclass(frozen=True, slots=True)
class FileToken:
"""Result of a successful `POST /files`.
Pass `file_token` to `Forge.run(files=[...])` to attach the upload to a
prompt. `ttl_secs` is the lifetime the server assigned (it may differ from
what the client requested if outside the 60..86400 range, though the server
rejects out-of-range values in current implementation).
"""
file_token: str
ttl_secs: int
size: int
@classmethod
def from_response(cls, payload: dict) -> "FileToken":
return cls(
file_token=payload["file_token"],
ttl_secs=int(payload["ttl_secs"]),
size=int(payload["size"]),
)
@dataclass(frozen=True, slots=True)
class AppToken:
"""A row from `GET /admin/tokens` or the result of `POST /admin/tokens`.
`token` is the plaintext bearer string, returned ONLY at create time.
On list responses it is None; the server stores only a sha256 hash.
"""
name: str
token: str | None = None
ip_cidrs: list[str] = field(default_factory=list)
created_at: int | None = None
last_used: int | None = None
enabled: bool = True
@classmethod
def from_create_response(cls, payload: dict) -> "AppToken":
return cls(
name=payload["name"],
token=payload.get("token"),
ip_cidrs=list(payload.get("ip_cidrs") or []),
)
@classmethod
def from_list_row(cls, row: dict) -> "AppToken":
# `ip_cidrs` from the server's list endpoint is a comma-joined string
# (see store.list_tokens) — we normalize to list[str] here.
raw = row.get("ip_cidrs", "")
if isinstance(raw, str):
cidrs = [s for s in raw.split(",") if s]
else:
cidrs = list(raw or [])
return cls(
name=row["name"],
token=None,
ip_cidrs=cidrs,
created_at=row.get("created_at"),
last_used=row.get("last_used"),
enabled=bool(row.get("enabled", 1)),
)

View file

@ -0,0 +1,326 @@
"""Sync HTTP client for clawdforge.
Built on `requests`. One `Forge` per (base_url, token) pair; reuse it across
calls it holds a `requests.Session` for connection pooling. Safe to use as
a context manager to close the session deterministically.
No retry logic is built in. Wrap calls in your own retry layer (tenacity,
backoff, hand-rolled) if you need it clawdforge runs are not idempotent
(they spawn `claude -p`), so retries belong with the caller who knows the
semantics of the prompt.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import IO, Any
import requests
from ._models import AppToken, FileToken, RunResult
from .exceptions import (
ForgeAPIError,
ForgeAuthError,
ForgeError,
ForgeTransportError,
)
_DEFAULT_MODEL = "sonnet"
_DEFAULT_RUN_TIMEOUT_SECS = 120
# HTTP timeout = run's subprocess timeout + this margin so we don't bail
# while clawdforge is still doing legitimate work for us. Mirrors the
# pattern in cauldron's inline Forge wrapper.
_HTTP_TIMEOUT_MARGIN_SECS = 30
_HEALTHZ_TIMEOUT_SECS = 10
class Forge:
"""Sync client for the clawdforge HTTP service.
Args:
base_url: e.g. ``"http://192.168.0.5:8800"``. Trailing slash is
stripped. Required.
token: bearer token. App token (``cf_...``) for ``/run`` and
``/files``; admin bootstrap token for ``/admin/*``. Required.
default_model: model passed to ``/run`` when caller doesn't supply
one. Defaults to ``"sonnet"``.
default_timeout_secs: ``timeout_secs`` passed to ``/run`` when caller
doesn't supply one. Defaults to 120.
http_timeout_margin: seconds added to the subprocess timeout to
derive the HTTP-level timeout. Defaults to 30.
session: optional pre-built ``requests.Session``. If None, a fresh
session is created and owned by this Forge instance.
Example:
>>> from clawdforge import Forge
>>> forge = Forge(base_url="http://192.168.0.5:8800", token="cf_...")
>>> r = forge.run(prompt='Reply with JSON: {"hi": "ok"}')
>>> r.result
{'hi': 'ok'}
"""
def __init__(
self,
*,
base_url: str,
token: str,
default_model: str = _DEFAULT_MODEL,
default_timeout_secs: int = _DEFAULT_RUN_TIMEOUT_SECS,
http_timeout_margin: int = _HTTP_TIMEOUT_MARGIN_SECS,
session: requests.Session | None = None,
) -> None:
if not base_url:
raise ValueError("base_url is required")
if not token:
raise ValueError("token is required")
self.base_url = base_url.rstrip("/")
self.token = token
self.default_model = default_model
self.default_timeout_secs = default_timeout_secs
self.http_timeout_margin = http_timeout_margin
self._session = session or requests.Session()
self._owns_session = session is None
# -- lifecycle ---------------------------------------------------------
def close(self) -> None:
"""Close the underlying ``requests.Session`` if we own it."""
if self._owns_session:
self._session.close()
def __enter__(self) -> "Forge":
return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
self.close()
# -- internal ----------------------------------------------------------
def _headers(self, extra: dict[str, str] | None = None) -> dict[str, str]:
h = {"Authorization": f"Bearer {self.token}"}
if extra:
h.update(extra)
return h
def _url(self, path: str) -> str:
return f"{self.base_url}{path}"
def _request(
self,
method: str,
path: str,
*,
json_body: dict | None = None,
data: dict | None = None,
files: dict | None = None,
timeout: float | tuple[float, float] | None = None,
extra_headers: dict[str, str] | None = None,
) -> Any:
"""Wrap a requests call, normalize errors, return parsed JSON.
Raises:
ForgeTransportError: connection / TCP-timeout failures.
ForgeAuthError: 401 / 403.
ForgeAPIError: other 4xx / 5xx.
"""
try:
resp = self._session.request(
method,
self._url(path),
headers=self._headers(extra_headers),
json=json_body,
data=data,
files=files,
timeout=timeout,
)
except requests.RequestException as e:
raise ForgeTransportError(f"transport: {e}") from e
return self._parse(resp)
@staticmethod
def _parse(resp: requests.Response) -> Any:
# Try JSON first; fall back to text. Some error responses (502 in
# particular) carry JSON bodies that we want to surface verbatim.
body: dict | str | None
try:
body = resp.json()
except ValueError:
body = resp.text or None
if resp.status_code >= 400:
short = ""
if isinstance(body, dict):
short = body.get("error") or body.get("detail") or ""
elif isinstance(body, str):
short = body[:200]
msg = f"{resp.status_code} {resp.reason}: {short}".rstrip(": ")
if resp.status_code in (401, 403):
raise ForgeAuthError(msg, status_code=resp.status_code, body=body)
raise ForgeAPIError(msg, status_code=resp.status_code, body=body)
return body
# -- /healthz ----------------------------------------------------------
def healthz(self) -> dict:
"""``GET /healthz``.
Returns:
``{"ok": True, "claude_present": bool, "claude_version": str | None}``
Raises:
ForgeTransportError, ForgeAPIError
"""
return self._request("GET", "/healthz", timeout=_HEALTHZ_TIMEOUT_SECS)
# -- /run --------------------------------------------------------------
def run(
self,
prompt: str,
*,
model: str | None = None,
system: str | None = None,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> RunResult:
"""``POST /run``: run a prompt through ``claude -p``.
Args:
prompt: the user prompt. Required, must be non-empty.
model: optional model override. Defaults to ``self.default_model``.
system: optional system prompt.
files: optional list of ``ff_...`` file tokens previously returned
from ``upload_file``.
timeout_secs: per-run subprocess timeout (5..600). Defaults to
``self.default_timeout_secs``.
Returns:
RunResult ``ok=True`` on success.
Raises:
ForgeAPIError: server returned 502 (subprocess failed/timed out)
or another 4xx/5xx. Inspect ``.body`` for ``error``,
``stderr``, ``duration_ms``, ``stop_reason``.
ForgeAuthError: bad token / IP not allowed.
ForgeTransportError: connection-level failure.
ValueError: empty prompt.
"""
if not prompt:
raise ValueError("prompt must be non-empty")
body: dict[str, Any] = {
"prompt": prompt,
"model": model or self.default_model,
}
if system is not None:
body["system"] = system
if files:
body["files"] = list(files)
if timeout_secs is not None:
body["timeout_secs"] = timeout_secs
effective_run_timeout = timeout_secs or self.default_timeout_secs
http_timeout = effective_run_timeout + self.http_timeout_margin
payload = self._request("POST", "/run", json_body=body, timeout=http_timeout)
if not isinstance(payload, dict):
raise ForgeError(f"unexpected /run response type: {type(payload).__name__}")
return RunResult.from_response(payload)
# -- /files ------------------------------------------------------------
def upload_file(
self,
path_or_fileobj: str | os.PathLike[str] | IO[bytes],
*,
ttl_secs: int = 3600,
filename: str | None = None,
content_type: str | None = None,
) -> FileToken:
"""``POST /files``: upload a file, get back a ``ff_...`` token.
Args:
path_or_fileobj: a filesystem path (``str`` / ``Path``) or an
already-opened binary file-like object.
ttl_secs: server-side TTL for the staged file (60..86400). The
server rejects out-of-range values with 400.
filename: override the filename advertised to the server.
Defaults to ``Path(path).name`` when given a path, or
``"upload"`` when given a raw file object without ``.name``.
content_type: optional MIME type. If None, requests/servers
default to ``application/octet-stream``.
Returns:
FileToken
"""
# Resolve to (filename, fileobj, opened-by-us-flag)
opened_here = False
fileobj: IO[bytes]
if isinstance(path_or_fileobj, (str, os.PathLike)):
p = Path(path_or_fileobj)
fileobj = p.open("rb")
opened_here = True
resolved_name = filename or p.name
else:
fileobj = path_or_fileobj
resolved_name = filename or getattr(fileobj, "name", None) or "upload"
# If `name` came from a real file, strip directory components.
resolved_name = Path(str(resolved_name)).name
try:
file_field: tuple[str, IO[bytes]] | tuple[str, IO[bytes], str] = (
(resolved_name, fileobj, content_type)
if content_type
else (resolved_name, fileobj)
)
payload = self._request(
"POST",
"/files",
data={"ttl_secs": str(ttl_secs)},
files={"file": file_field},
# Upload timeout: same generous margin as run's default.
timeout=self.default_timeout_secs + self.http_timeout_margin,
)
finally:
if opened_here:
fileobj.close()
if not isinstance(payload, dict):
raise ForgeError(f"unexpected /files response type: {type(payload).__name__}")
return FileToken.from_response(payload)
# -- /admin/tokens ----------------------------------------------------
def create_token(self, name: str, *, ip_cidrs: list[str] | None = None) -> AppToken:
"""``POST /admin/tokens`` (admin-bootstrap-token gated).
Returns an AppToken whose ``.token`` field holds the plaintext
bearer store it immediately, the server only keeps a hash.
"""
body: dict[str, Any] = {"name": name, "ip_cidrs": list(ip_cidrs or [])}
payload = self._request("POST", "/admin/tokens", json_body=body, timeout=10)
if not isinstance(payload, dict):
raise ForgeError("unexpected /admin/tokens response")
return AppToken.from_create_response(payload)
def list_tokens(self) -> list[AppToken]:
"""``GET /admin/tokens`` (admin-bootstrap-token gated)."""
payload = self._request("GET", "/admin/tokens", timeout=10)
if not isinstance(payload, dict) or "tokens" not in payload:
raise ForgeError("unexpected /admin/tokens response")
return [AppToken.from_list_row(row) for row in payload["tokens"]]
def revoke_token(self, name: str) -> bool:
"""``DELETE /admin/tokens/<name>`` (admin-bootstrap-token gated).
Returns True on success. Raises ForgeAPIError(404) if no such token
exists rather than returning False that matches the server's
contract and lets callers distinguish "missing" from "revoked".
"""
payload = self._request("DELETE", f"/admin/tokens/{name}", timeout=10)
if isinstance(payload, dict):
return bool(payload.get("ok", True))
return True

View file

@ -0,0 +1,56 @@
"""Typed exceptions for the clawdforge SDK.
Callers should catch `ForgeError` to handle anything this client raises.
The subclasses let you discriminate transport vs. auth vs. other API errors
without sniffing status codes by hand.
"""
from __future__ import annotations
class ForgeError(Exception):
"""Base exception for everything the clawdforge SDK raises.
All other ForgeXxxError subclasses inherit from this, so a single
`except ForgeError:` catches the whole family.
"""
class ForgeTransportError(ForgeError):
"""The HTTP request never produced a response.
Connection refused, DNS failure, TCP timeout, TLS handshake failure, etc.
The original `requests` exception is available as `__cause__`.
"""
class ForgeAPIError(ForgeError):
"""The server returned a 4xx or 5xx response.
Attributes:
status_code: HTTP status code returned by the server.
body: parsed response body (dict if JSON, else str).
message: short human-readable summary.
"""
def __init__(
self,
message: str,
*,
status_code: int,
body: dict | str | None = None,
) -> None:
super().__init__(message)
self.status_code = status_code
self.body = body
self.message = message
def __str__(self) -> str: # pragma: no cover - cosmetic
return f"{self.message} (status={self.status_code})"
class ForgeAuthError(ForgeAPIError):
"""401 / 403 from the server.
Either the bearer token is missing/wrong, or the IP is not in the
allowlist (global or per-app). Inspect `body` for the server's message.
"""

View file

View file

@ -0,0 +1,411 @@
"""Tests for the clawdforge Python SDK.
Uses `responses` to intercept HTTP calls. No live network.
"""
from __future__ import annotations
import io
import json
import unittest
from pathlib import Path
import requests
import responses
from clawdforge import (
AppToken,
FileToken,
Forge,
ForgeAPIError,
ForgeAuthError,
ForgeError,
ForgeTransportError,
RunResult,
)
BASE_URL = "http://192.168.0.5:8800"
TOKEN = "cf_test_token_xxxxxxxx"
def _forge() -> Forge:
return Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60)
class TestHealthz(unittest.TestCase):
@responses.activate
def test_healthz_ok(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/healthz",
json={"ok": True, "claude_present": True, "claude_version": "1.2.3"},
status=200,
)
with _forge() as f:
out = f.healthz()
self.assertEqual(out["ok"], True)
self.assertEqual(out["claude_version"], "1.2.3")
@responses.activate
def test_healthz_no_auth_header_still_sent(self) -> None:
# The server doesn't *require* auth on /healthz, but we still send the
# bearer because we set it as a default header. Verify it's present.
captured = {}
def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["auth"] = request.headers.get("Authorization")
return (200, {}, json.dumps({"ok": True, "claude_present": True, "claude_version": "x"}))
responses.add_callback(responses.GET, f"{BASE_URL}/healthz", callback=cb)
with _forge() as f:
f.healthz()
self.assertEqual(captured["auth"], f"Bearer {TOKEN}")
class TestRun(unittest.TestCase):
@responses.activate
def test_run_success_json_result(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={
"ok": True,
"result": {"hello": "world"},
"duration_ms": 1234,
"stop_reason": "end_turn",
},
status=200,
)
with _forge() as f:
r = f.run(prompt='Reply with JSON: {"hello": "world"}')
self.assertIsInstance(r, RunResult)
self.assertTrue(r.ok)
self.assertEqual(r.result, {"hello": "world"})
self.assertEqual(r.duration_ms, 1234)
self.assertEqual(r.stop_reason, "end_turn")
@responses.activate
def test_run_success_string_result(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={
"ok": True,
"result": "plain text reply",
"duration_ms": 800,
"stop_reason": "end_turn",
},
status=200,
)
with _forge() as f:
r = f.run(prompt="hello")
self.assertEqual(r.result, "plain text reply")
@responses.activate
def test_run_sends_expected_body(self) -> None:
captured: dict = {}
def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["body"] = json.loads(request.body)
captured["auth"] = request.headers.get("Authorization")
return (
200,
{},
json.dumps({"ok": True, "result": {}, "duration_ms": 1, "stop_reason": "end_turn"}),
)
responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
with _forge() as f:
f.run(
prompt="hi",
model="opus",
system="be terse",
files=["ff_abc"],
timeout_secs=42,
)
self.assertEqual(captured["auth"], f"Bearer {TOKEN}")
self.assertEqual(
captured["body"],
{
"prompt": "hi",
"model": "opus",
"system": "be terse",
"files": ["ff_abc"],
"timeout_secs": 42,
},
)
@responses.activate
def test_run_uses_default_model_when_omitted(self) -> None:
captured: dict = {}
def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["body"] = json.loads(request.body)
return (
200,
{},
json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}),
)
responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
with _forge() as f:
f.run(prompt="hi")
self.assertEqual(captured["body"]["model"], "sonnet")
self.assertNotIn("system", captured["body"])
self.assertNotIn("files", captured["body"])
self.assertNotIn("timeout_secs", captured["body"])
@responses.activate
def test_run_502_raises_api_error_with_body(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={
"ok": False,
"error": "subprocess timed out",
"stderr": "...",
"duration_ms": 60000,
"stop_reason": "timeout",
},
status=502,
)
with _forge() as f, self.assertRaises(ForgeAPIError) as ctx:
f.run(prompt="hi", timeout_secs=60)
err = ctx.exception
self.assertEqual(err.status_code, 502)
self.assertIsInstance(err.body, dict)
self.assertEqual(err.body["stop_reason"], "timeout")
self.assertIn("subprocess timed out", err.message)
@responses.activate
def test_run_401_raises_auth_error(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={"detail": "missing bearer"},
status=401,
)
with _forge() as f, self.assertRaises(ForgeAuthError) as ctx:
f.run(prompt="hi")
self.assertEqual(ctx.exception.status_code, 401)
# ForgeAuthError is-a ForgeAPIError is-a ForgeError
self.assertIsInstance(ctx.exception, ForgeAPIError)
self.assertIsInstance(ctx.exception, ForgeError)
@responses.activate
def test_run_403_raises_auth_error(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={"detail": "ip not in allowlist: 10.0.0.1"},
status=403,
)
with _forge() as f, self.assertRaises(ForgeAuthError):
f.run(prompt="hi")
@responses.activate
def test_run_transport_error(self) -> None:
# No registered response → responses raises ConnectionError.
with _forge() as f, self.assertRaises(ForgeTransportError):
f.run(prompt="hi")
def test_run_empty_prompt_rejected_locally(self) -> None:
with _forge() as f, self.assertRaises(ValueError):
f.run(prompt="")
@responses.activate
def test_run_http_timeout_uses_margin(self) -> None:
captured: dict = {}
def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
# `responses` doesn't surface the timeout directly, but we can
# check via the prepared request hook that we got here at all.
captured["called"] = True
return (
200,
{},
json.dumps({"ok": True, "result": "x", "duration_ms": 1, "stop_reason": "end_turn"}),
)
responses.add_callback(responses.POST, f"{BASE_URL}/run", callback=cb)
with _forge() as f:
# Verify the computation: timeout_secs=42 → http_timeout=72.
# We poke at the private margin only to guard the formula stays
# in sync with the readme/contract.
self.assertEqual(f.http_timeout_margin, 30)
f.run(prompt="hi", timeout_secs=42)
self.assertTrue(captured.get("called"))
class TestFiles(unittest.TestCase):
@responses.activate
def test_upload_file_from_path(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/files",
json={"file_token": "ff_abc123", "ttl_secs": 3600, "size": 11},
status=200,
)
# Write a small temp file.
import tempfile
with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".txt") as tf:
tf.write(b"hello world")
tmp_path = tf.name
try:
with _forge() as f:
ft = f.upload_file(tmp_path, ttl_secs=3600)
self.assertIsInstance(ft, FileToken)
self.assertEqual(ft.file_token, "ff_abc123")
self.assertEqual(ft.ttl_secs, 3600)
self.assertEqual(ft.size, 11)
finally:
Path(tmp_path).unlink(missing_ok=True)
@responses.activate
def test_upload_file_from_fileobj(self) -> None:
captured: dict = {}
def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
ct = request.headers.get("Content-Type", "")
captured["content_type"] = ct
captured["body_starts_multipart"] = ct.startswith("multipart/form-data")
return (
200,
{},
json.dumps({"file_token": "ff_xyz", "ttl_secs": 60, "size": 5}),
)
responses.add_callback(responses.POST, f"{BASE_URL}/files", callback=cb)
buf = io.BytesIO(b"hello")
with _forge() as f:
ft = f.upload_file(buf, ttl_secs=60, filename="snippet.txt")
self.assertEqual(ft.file_token, "ff_xyz")
self.assertTrue(captured["body_starts_multipart"])
@responses.activate
def test_upload_file_400_raises_api_error(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/files",
json={"detail": "ttl_secs out of range (60..86400)"},
status=400,
)
buf = io.BytesIO(b"x")
with _forge() as f, self.assertRaises(ForgeAPIError) as ctx:
f.upload_file(buf, ttl_secs=10, filename="x.bin")
self.assertEqual(ctx.exception.status_code, 400)
class TestAdminTokens(unittest.TestCase):
@responses.activate
def test_create_token(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/admin/tokens",
json={
"name": "cauldron",
"token": "cf_brandnew_xxx",
"ip_cidrs": ["172.24.0.0/16"],
},
status=200,
)
with _forge() as f:
t = f.create_token("cauldron", ip_cidrs=["172.24.0.0/16"])
self.assertIsInstance(t, AppToken)
self.assertEqual(t.name, "cauldron")
self.assertEqual(t.token, "cf_brandnew_xxx")
self.assertEqual(t.ip_cidrs, ["172.24.0.0/16"])
@responses.activate
def test_list_tokens(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/admin/tokens",
json={
"tokens": [
{
"name": "cauldron",
"ip_cidrs": "172.24.0.0/16",
"created_at": 100,
"last_used": 200,
"enabled": 1,
},
{
"name": "petalparse",
"ip_cidrs": "",
"created_at": 50,
"last_used": None,
"enabled": 0,
},
]
},
status=200,
)
with _forge() as f:
toks = f.list_tokens()
self.assertEqual(len(toks), 2)
self.assertEqual(toks[0].name, "cauldron")
self.assertEqual(toks[0].ip_cidrs, ["172.24.0.0/16"])
self.assertTrue(toks[0].enabled)
self.assertIsNone(toks[0].token)
self.assertEqual(toks[1].ip_cidrs, [])
self.assertFalse(toks[1].enabled)
@responses.activate
def test_revoke_token_ok(self) -> None:
responses.add(
responses.DELETE,
f"{BASE_URL}/admin/tokens/cauldron",
json={"ok": True},
status=200,
)
with _forge() as f:
self.assertTrue(f.revoke_token("cauldron"))
@responses.activate
def test_revoke_token_404(self) -> None:
responses.add(
responses.DELETE,
f"{BASE_URL}/admin/tokens/nosuch",
json={"detail": "no such token"},
status=404,
)
with _forge() as f, self.assertRaises(ForgeAPIError) as ctx:
f.revoke_token("nosuch")
self.assertEqual(ctx.exception.status_code, 404)
class TestForgeConstruction(unittest.TestCase):
def test_requires_base_url(self) -> None:
with self.assertRaises(ValueError):
Forge(base_url="", token="cf_x")
def test_requires_token(self) -> None:
with self.assertRaises(ValueError):
Forge(base_url="http://x", token="")
def test_strips_trailing_slash(self) -> None:
f = Forge(base_url="http://x:8800/", token="cf_x")
self.assertEqual(f.base_url, "http://x:8800")
def test_context_manager_closes_owned_session(self) -> None:
f = Forge(base_url="http://x", token="cf_x")
with f:
self.assertTrue(f._owns_session)
# After close(), session.close() was called — calling close() again is safe.
f.close()
def test_external_session_not_closed(self) -> None:
sess = requests.Session()
try:
with Forge(base_url="http://x", token="cf_x", session=sess) as f:
self.assertFalse(f._owns_session)
# Externally-owned session should still be usable after Forge close.
self.assertIsNotNone(sess.adapters)
finally:
sess.close()
if __name__ == "__main__":
unittest.main()