clients/python: v0.2 multi-turn Session API

- Session class wrapping a clawdforge session_id; context-manager auto-close
- forge.session(agent=...) block form (preferred)
- forge.create_session() / forge.list_sessions() / forge.get_session() admin shapes
- TurnResult dataclass with .text() helper concatenating text events
- Idempotent Session.close() — safe in finally / __exit__
- tests/test_sessions.py: 16 tests covering block/manual/idempotency/exception/list/state/text-helper/v0.1-regression
- README "Multi-turn / Sessions (v0.2)" section
- pyproject version 0.1.0 -> 0.2.0; package __version__ matches

Architecture: matches the existing v0.1 client — sync, requests-based,
single Forge-owned requests.Session for connection pooling. Session holds
a back-reference to the Forge for HTTP work (no per-Session HTTP client).
This mirrors how Forge already exposes its own context-manager pattern, so
nothing about the threading/lifecycle story changes for callers.

v0.1 /run path unchanged — 49 existing tests still green, +16 new tests
for v0.2 (target was 9; covered the spec's 9 plus extras for empty-prompt
local validation, include_closed=false param, empty-id ValueError, and
s.state() round-trip).

mypy --strict src/clawdforge/ clean.

Spec: memory/spec-clawdforge-v0.2.md
Server core: 940861f
This commit is contained in:
Kayos 2026-04-29 06:35:11 -07:00
parent 41a522a469
commit 6a6fc8a67f
7 changed files with 1066 additions and 7 deletions

View file

@ -33,6 +33,103 @@ with Forge(base_url="...", token="...") as forge:
forge.run(prompt="...")
```
## Multi-turn / Sessions (v0.2)
For agent-shaped workflows that span multiple turns, use a `Session`. A session keeps acpx context across `turn()` calls — file reads, tool-call results, and conversational state persist across turns until the session is closed (or hits the server's TTL).
### Block form (preferred — auto-close)
```python
with forge.session(agent="claude") as s:
r1 = s.turn("Read README.md and summarize it")
print(r1.text()) # concatenated 'text' events from the model
r2 = s.turn("Now look at the auth flow")
print(r2.duration_ms) # 12345
for ev in r2.events:
print(ev["type"], ev.get("content"))
# Session is closed automatically here, even if an exception was raised
# inside the block.
```
### Manual form (explicit lifecycle)
For callers that need a Session to outlive a single block (background workers, cross-handler state):
```python
s = forge.create_session(agent="claude", meta={"caller": "petalparse"})
try:
r = s.turn("hello")
finally:
s.close() # idempotent — safe to call repeatedly, including on a
# session that was already closed by the server's TTL sweeper
```
`Session.close()` is idempotent: subsequent calls short-circuit without an HTTP request, and the server itself returns 200 (`already_closed: true`) on a repeat DELETE. Safe to drop in `finally` and `__exit__` blocks without `try/except`.
### TurnResult shape
```python
@dataclass(frozen=True)
class TurnResult:
ok: bool
session_id: str
turn_index: int # 1-based, monotonic per session
events: list[dict] # 'text' | 'tool_call' | 'thinking' | ...
stop_reason: str | None # 'end_turn' | 'timeout' | 'error' | ...
duration_ms: int
def text(self) -> str: ... # concat of all 'text' events' content
```
`events` is passed through verbatim from the v0.2 server — see the server doc for the exhaustive event-type schema. `text()` is sugar for "give me the model's prose reply, ignore thinking and tool calls."
### Listing & inspection
```python
# All sessions for the calling token (newest first):
sessions = forge.list_sessions()
for row in sessions:
print(row["session_id"], row["turn_count"], row["closed_at"])
# Drop closed ones:
live = forge.list_sessions(include_closed=False)
# Fresh state for a specific session:
state = forge.get_session("sess_abc")
# {'session_id': 'sess_abc', 'agent': 'claude', 'cwd': ...,
# 'created_at': ..., 'last_turn_at': ..., 'turn_count': 3,
# 'closed_at': None, 'live': True, 'meta': {...}}
# Same thing from inside a block:
with forge.session() as s:
s.turn("hi")
print(s.state()["turn_count"])
```
### Errors
The same exception classes apply as v0.1. Two cases worth calling out:
- **Cross-token access → `ForgeAPIError(404)`.** If token A asks for token B's session, the server returns 404 (not 403) so it can't be used to enumerate live session ids across token boundaries.
- **Session closed mid-flight → `ForgeAPIError(410)`.** The TTL sweeper or another caller's explicit `close()` may have ended the session between turns.
```python
from clawdforge import ForgeAPIError
try:
r = s.turn("hello")
except ForgeAPIError as e:
if e.status_code == 410:
# Session expired or was closed — start a fresh one.
...
elif e.status_code == 404:
# Server forgot us, or we're asking for someone else's id.
...
raise
```
## Construction
```python
@ -97,6 +194,22 @@ r = forge.run(prompt="Extract recipe data", files=[ft.file_token])
The server enforces `60 <= ttl_secs <= 86400` and returns 400 for out-of-range values, surfaced as `ForgeAPIError`.
### `session(*, agent="claude", meta=None) -> ContextManager[Session]` (v0.2)
Block-form session — auto-closes on `__exit__`. See the "Multi-turn / Sessions (v0.2)" section above.
### `create_session(*, agent="claude", meta=None) -> Session` (v0.2)
Manual-lifecycle session. Caller is responsible for `s.close()` (typically in `finally`). `close()` is idempotent.
### `list_sessions(*, include_closed=True) -> list[dict]` (v0.2)
List sessions for the calling token. Pass `include_closed=False` to skip closed rows.
### `get_session(session_id) -> dict` (v0.2)
Fetch fresh server-side state for a session. Raises `ForgeAPIError(404)` if the id is unknown or belongs to another token.
### Admin (admin-bootstrap-token gated)
```python

View file

@ -4,8 +4,8 @@ build-backend = "hatchling.build"
[project]
name = "clawdforge"
version = "0.1.0"
description = "Python SDK for the clawdforge LAN-only HTTP service (claude -p subprocess wrapper)."
version = "0.2.0"
description = "Python SDK for the clawdforge LAN-only HTTP service — single-turn /run plus multi-turn /sessions (v0.2)."
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Kayos", email = "kayos@sulkta.com" }]

View file

@ -1,13 +1,20 @@
"""clawdforge — Python SDK for the LAN-only clawdforge HTTP service.
Quickstart:
Quickstart (single-turn v0.1):
>>> from clawdforge import Forge
>>> forge = Forge(base_url="http://192.168.0.5:8800", token="cf_...")
>>> r = forge.run(prompt='Reply with JSON: {"hi": "ok"}')
>>> r.result
{'hi': 'ok'}
Multi-turn (v0.2):
>>> with forge.session(agent="claude") as s:
... r1 = s.turn("Read README.md and summarize")
... r2 = s.turn("Now look at the auth flow")
>>> # session auto-closed on __exit__, even on exception
"""
from ._models import AppToken, FileToken, RunResult
from ._models import AppToken, FileToken, RunResult, TurnResult
from ._session import Session
from .client import Forge
from .exceptions import (
ForgeAPIError,
@ -19,6 +26,8 @@ from .exceptions import (
__all__ = [
"Forge",
"RunResult",
"TurnResult",
"Session",
"FileToken",
"AppToken",
"ForgeError",
@ -27,4 +36,4 @@ __all__ = [
"ForgeTransportError",
]
__version__ = "0.1.0"
__version__ = "0.2.0"

View file

@ -11,6 +11,78 @@ from typing import Any
from .exceptions import ForgeAPIError, ForgeError
@dataclass(frozen=True, slots=True)
class TurnResult:
"""Result of a successful `POST /sessions/{id}/turn`.
`events` is the structured event stream the server collected from acpx
for this turn. Each event is a dict with at minimum a ``type`` key
(``"text"``, ``"tool_call"``, ``"thinking"``, ...) plus type-specific
payload fields. The SDK passes events through verbatim see the v0.2
server doc for the exhaustive shape.
"""
ok: bool
session_id: str
turn_index: int
events: list[dict[str, Any]]
stop_reason: str | None
duration_ms: int
def text(self) -> str:
"""Concatenate all ``type == "text"`` events' ``content`` fields.
Sugar for the common case where a caller wants the model's prose
reply and doesn't care about thinking / tool_call events. Returns
an empty string if no text events were emitted.
"""
parts: list[str] = []
for ev in self.events:
if not isinstance(ev, dict):
continue
if ev.get("type") == "text":
content = ev.get("content")
if isinstance(content, str):
parts.append(content)
return "".join(parts)
@classmethod
def from_response(cls, payload: dict[str, Any]) -> "TurnResult":
try:
ok = bool(payload.get("ok", True))
session_id = str(payload["session_id"])
turn_index = int(payload.get("turn_index", 0))
events_raw = payload.get("events") or []
if not isinstance(events_raw, list):
raise ForgeError(
f"malformed /sessions/turn response: events is "
f"{type(events_raw).__name__}, expected list"
)
stop_reason = payload.get("stop_reason")
duration_ms = int(payload.get("duration_ms", 0))
except (TypeError, ValueError, KeyError) as e:
raise ForgeError(f"malformed /sessions/turn response: {e}") from e
# Same contract guard as RunResult: ok=False on a 200 is a server
# bug, surface it as ApiError so callers can't silently treat the
# turn as successful.
if not ok:
raise ForgeAPIError(
"server returned ok=False on /sessions/{id}/turn (contract violation)",
status_code=200,
body=payload,
)
return cls(
ok=ok,
session_id=session_id,
turn_index=turn_index,
events=list(events_raw),
stop_reason=stop_reason,
duration_ms=duration_ms,
)
@dataclass(frozen=True, slots=True)
class RunResult:
"""Result of a successful `POST /run`.

View file

@ -0,0 +1,172 @@
"""Session class for clawdforge v0.2 multi-turn API.
A Session wraps a server-side session_id and a back-reference to the parent
Forge so all HTTP work goes through the Forge's pooled requests.Session.
There is no per-Session HTTP client.
Idiomatic use:
with forge.session(agent="claude") as s:
r = s.turn("hello")
print(r.text())
Manual use:
s = forge.create_session(agent="claude")
try:
s.turn("hello")
finally:
s.close()
`Session.close()` is idempotent both because the server's DELETE /sessions
returns 200 on a re-close (with ``already_closed: true``) and because the
client short-circuits after the first successful close. Safe to call from
``finally`` blocks and ``__exit__`` without try/except.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from ._models import TurnResult
if TYPE_CHECKING:
from .client import Forge
log = logging.getLogger(__name__)
class Session:
"""Handle to a server-side multi-turn session.
Construct via ``forge.create_session(...)`` (manual lifecycle) or
``forge.session(...)`` (context-manager auto-close preferred).
Attributes:
session_id: server-assigned id (also acpx's id).
agent: agent name this session is bound to (e.g. ``"claude"``).
created_at: unix-seconds timestamp from the server at create time.
cwd: working directory the server allocated for this session, or
None if the server did not return one.
"""
__slots__ = ("_forge", "session_id", "agent", "created_at", "cwd", "_closed")
def __init__(
self,
forge: "Forge",
*,
session_id: str,
agent: str,
created_at: int | None = None,
cwd: str | None = None,
) -> None:
self._forge = forge
self.session_id = session_id
self.agent = agent
self.created_at = created_at
self.cwd = cwd
self._closed = False
# -- context manager --------------------------------------------------
def __enter__(self) -> "Session":
return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
# Swallow close() failures — the `with` block already finished its
# work, and a network blip on close shouldn't mask the real result
# (or a real exception unwinding through __exit__).
try:
self.close()
except Exception as e:
log.warning(
"clawdforge: session %s close failed during __exit__: %s",
self.session_id,
e,
)
# -- public ops -------------------------------------------------------
@property
def closed(self) -> bool:
"""True after the first successful ``close()`` call.
Note: this is the *client's* view. The server may have GC'd the
session out from under us (TTL sweeper) without us knowing. A
subsequent ``turn()`` would surface that as ``ForgeAPIError(410)``.
"""
return self._closed
def turn(
self,
prompt: str,
*,
files: list[str] | None = None,
timeout_secs: int | None = None,
) -> TurnResult:
"""Send one turn. Returns a TurnResult.
Args:
prompt: required, must be non-empty.
files: optional list of ``ff_...`` file tokens from
``Forge.upload_file``. Server-side these resolve to staged
paths visible inside the session's cwd.
timeout_secs: optional per-turn subprocess timeout. Defaults
to the parent Forge's ``default_timeout_secs``.
Raises:
ValueError: empty prompt or already-closed session.
ForgeAPIError: 4xx/5xx notably 404 (session vanished or
cross-token access) and 410 (session is closed / no longer
live in the server process).
ForgeAuthError, ForgeTransportError, ForgeError: as v0.1.
"""
if not prompt:
raise ValueError("prompt must be non-empty")
if self._closed:
raise ValueError(
f"session {self.session_id} already closed by this client"
)
return self._forge._session_turn(
session_id=self.session_id,
prompt=prompt,
files=files,
timeout_secs=timeout_secs,
)
def state(self) -> dict[str, Any]:
"""Fetch fresh server-side state for this session.
Returns the raw ``GET /sessions/{id}`` payload a dict with
``session_id``, ``agent``, ``cwd``, ``created_at``, ``last_turn_at``,
``turn_count``, ``closed_at``, ``live``, ``meta``.
"""
return self._forge.get_session(self.session_id)
def close(self) -> None:
"""Close the session server-side. Idempotent.
Safe to call from ``finally`` blocks and ``__exit__``. Subsequent
calls short-circuit without an HTTP round-trip.
"""
if self._closed:
return
# Mark closed BEFORE the HTTP call so that even if the request
# raises, we don't enter a retry loop on every __exit__ pass. The
# server's DELETE is itself idempotent (200 on re-close) so the
# caller can also re-issue close() through a fresh client if they
# really need to.
self._closed = True
self._forge._session_close(self.session_id)
# -- repr -------------------------------------------------------------
def __repr__(self) -> str: # pragma: no cover - cosmetic
state = "closed" if self._closed else "open"
return (
f"Session(session_id={self.session_id!r}, agent={self.agent!r}, "
f"state={state})"
)

View file

@ -11,14 +11,17 @@ semantics of the prompt.
"""
from __future__ import annotations
import logging
import os
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Any
from typing import IO, Any, Iterator
from urllib.parse import quote
import requests
from ._models import AppToken, FileToken, RunResult
from ._models import AppToken, FileToken, RunResult, TurnResult
from ._session import Session
from .exceptions import (
ForgeAPIError,
ForgeAuthError,
@ -27,6 +30,9 @@ from .exceptions import (
)
log = logging.getLogger(__name__)
_DEFAULT_MODEL = "sonnet"
_DEFAULT_RUN_TIMEOUT_SECS = 120
# HTTP timeout = run's subprocess timeout + this margin so we don't bail
@ -393,3 +399,211 @@ class Forge:
if isinstance(payload, dict):
return bool(payload.get("ok", True))
return True
# -- /sessions (v0.2) -------------------------------------------------
#
# The session surface is purely additive on top of v0.1. v0.1 callers
# never touch /sessions and never see Session/TurnResult. The block
# form `forge.session(agent=...)` is the preferred entry point — it
# auto-closes on exit (including on exceptions) and matches the
# cross-language idiom in the v0.2 spec.
def create_session(
self,
*,
agent: str = "claude",
meta: dict[str, Any] | None = None,
) -> Session:
"""``POST /sessions``: create a multi-turn session.
Manual lifecycle. Caller is responsible for ``s.close()`` usually
via ``try/finally``. For most use cases, prefer the block form
``with forge.session(...) as s:`` which auto-closes.
Args:
agent: which agent the session binds to. Defaults to ``"claude"``.
meta: optional caller metadata persisted server-side in the
session ledger.
Returns:
Session
"""
body: dict[str, Any] = {"agent": agent}
if meta is not None:
body["meta"] = meta
# /sessions create is bounded by acpx session-create handshake —
# use the same generous default as /run so we don't bail mid-handshake.
http_timeout = self.default_timeout_secs + self.http_timeout_margin
payload = self._request("POST", "/sessions", json_body=body, timeout=http_timeout)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions response type: {type(payload).__name__}"
)
try:
session_id = str(payload["session_id"])
except KeyError as e:
raise ForgeError(f"malformed /sessions response: missing {e}") from e
return Session(
self,
session_id=session_id,
agent=str(payload.get("agent", agent)),
created_at=payload.get("created_at"),
cwd=payload.get("cwd"),
)
@contextmanager
def session(
self,
*,
agent: str = "claude",
meta: dict[str, Any] | None = None,
) -> Iterator[Session]:
"""Context-manager shortcut: create + auto-close a session.
Preferred form. Equivalent to::
s = forge.create_session(agent=agent, meta=meta)
try:
yield s
finally:
s.close() # idempotent; swallows close-time blips
Use ``forge.create_session(...)`` directly if you need the Session
to outlive the call site (e.g. background worker threads).
"""
s = self.create_session(agent=agent, meta=meta)
try:
yield s
finally:
try:
s.close()
except Exception as e:
# Already closed / network blip during teardown — log and
# swallow so we don't mask the real exception unwinding
# through this finally block.
log.warning(
"clawdforge: session %s close failed during context-manager exit: %s",
s.session_id,
e,
)
def get_session(self, session_id: str) -> dict[str, Any]:
"""``GET /sessions/{id}``: fetch session state.
Returns the raw payload from the server: a dict with
``session_id``, ``agent``, ``cwd``, ``created_at``, ``last_turn_at``,
``turn_count``, ``closed_at`` (nullable), ``live`` (bool), ``meta``.
Raises:
ValueError: empty ``session_id``.
ForgeAPIError(404): unknown id, or id belongs to another token
(the server intentionally returns 404 not 403 to avoid
leaking session existence across token boundaries).
"""
if not session_id:
raise ValueError("session_id must be non-empty")
slug = quote(session_id, safe="")
payload = self._request("GET", f"/sessions/{slug}", timeout=_HEALTHZ_TIMEOUT_SECS)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions/{{id}} response type: {type(payload).__name__}"
)
return payload
def list_sessions(self, *, include_closed: bool = True) -> list[dict[str, Any]]:
"""``GET /sessions``: list sessions for the calling token.
Returns:
List of session-row dicts (newest-first). Each row has
``session_id``, ``app_name``, ``agent``, ``cwd``, ``created_at``,
``last_turn_at``, ``turn_count``, ``closed_at``, ``meta``.
Args:
include_closed: if False, only sessions where ``closed_at IS
NULL`` are returned. Default True (mirrors server default).
"""
params: dict[str, Any] = {}
# Only forward the non-default to keep the URL stable for the
# default case.
if include_closed is False:
params["include_closed"] = "false"
# We can't pass `params=` through `_request` (no kwarg) without
# broadening that helper; use the session directly for this one
# GET. Mirrors the pattern of `_request` exactly otherwise.
try:
resp = self._session.request(
"GET",
self._url("/sessions"),
headers=self._headers(),
params=params or None,
timeout=_HEALTHZ_TIMEOUT_SECS,
)
except requests.RequestException as e:
raise ForgeTransportError(f"transport: {e}") from e
payload = self._parse(resp)
if not isinstance(payload, dict) or "sessions" not in payload:
raise ForgeError("unexpected /sessions response")
rows = payload["sessions"]
if not isinstance(rows, list):
raise ForgeError(
f"unexpected /sessions rows type: {type(rows).__name__}"
)
return rows
# -- internal session helpers (called from Session) -------------------
def _session_turn(
self,
*,
session_id: str,
prompt: str,
files: list[str] | None,
timeout_secs: int | None,
) -> TurnResult:
"""Internal: ``POST /sessions/{id}/turn``. Don't call directly — go
through ``Session.turn``.
"""
if timeout_secs is not None:
if not isinstance(timeout_secs, int) or isinstance(timeout_secs, bool):
raise ValueError("timeout_secs must be int")
if timeout_secs < _RUN_TIMEOUT_MIN or timeout_secs > _RUN_TIMEOUT_MAX:
raise ValueError(
f"timeout_secs out of range "
f"({_RUN_TIMEOUT_MIN}..{_RUN_TIMEOUT_MAX}), got {timeout_secs}"
)
body: dict[str, Any] = {"prompt": prompt}
if files:
body["files"] = list(files)
if timeout_secs is not None:
body["timeout_secs"] = timeout_secs
effective_run_timeout = (
timeout_secs if timeout_secs is not None else self.default_timeout_secs
)
http_timeout = effective_run_timeout + self.http_timeout_margin
slug = quote(session_id, safe="")
payload = self._request(
"POST",
f"/sessions/{slug}/turn",
json_body=body,
timeout=http_timeout,
)
if not isinstance(payload, dict):
raise ForgeError(
f"unexpected /sessions/turn response type: {type(payload).__name__}"
)
return TurnResult.from_response(payload)
def _session_close(self, session_id: str) -> None:
"""Internal: ``DELETE /sessions/{id}``. Idempotent server-side.
Don't call directly — go through ``Session.close``. Errors are
swallowed by the calling Session's ``__exit__``/``finally``
wrapping; raised here so direct callers can observe them.
"""
slug = quote(session_id, safe="")
self._request("DELETE", f"/sessions/{slug}", timeout=_HEALTHZ_TIMEOUT_SECS)

View file

@ -0,0 +1,479 @@
"""Tests for the v0.2 Session / TurnResult surface of the clawdforge SDK.
Same test framework as test_client.py `unittest` + `responses` to
intercept HTTP. No live network. v0.1 paths covered there are not
re-exercised here; the regression test_v01_run_unchanged is a smoke check
to catch accidental breakage of the existing /run path while we layered on
the session methods.
"""
from __future__ import annotations
import json
import unittest
from typing import Any
import requests
import responses
from clawdforge import (
Forge,
ForgeAPIError,
Session,
TurnResult,
)
BASE_URL = "http://192.168.0.5:8800"
TOKEN = "cf_test_token_xxxxxxxx"
def _forge() -> Forge:
return Forge(base_url=BASE_URL, token=TOKEN, default_timeout_secs=60)
def _create_payload(session_id: str = "sess_abc", agent: str = "claude") -> dict[str, Any]:
return {
"ok": True,
"session_id": session_id,
"agent": agent,
"created_at": 1_700_000_000,
"cwd": "/tmp/acpx-sessions/sess_abc",
}
def _turn_payload(
session_id: str = "sess_abc",
turn_index: int = 1,
events: list[dict] | None = None,
stop_reason: str = "end_turn",
duration_ms: int = 1234,
) -> dict[str, Any]:
return {
"ok": True,
"session_id": session_id,
"turn_index": turn_index,
"events": events if events is not None else [{"type": "text", "content": "hi"}],
"stop_reason": stop_reason,
"duration_ms": duration_ms,
}
class TestSessionBlock(unittest.TestCase):
"""Block / context-manager form."""
@responses.activate
def test_session_block_creates_and_closes(self) -> None:
"""`with forge.session() as s:` issues create on enter and close on exit."""
captured: dict[str, Any] = {"requests": []}
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["requests"].append(("POST", "/sessions"))
return (200, {}, json.dumps(_create_payload()))
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["requests"].append(("DELETE", "/sessions/sess_abc"))
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
with _forge() as f:
with f.session(agent="claude") as s:
self.assertIsInstance(s, Session)
self.assertEqual(s.session_id, "sess_abc")
self.assertEqual(s.agent, "claude")
self.assertFalse(s.closed)
# Outside the block: close was issued
self.assertTrue(s.closed)
self.assertEqual(
captured["requests"],
[("POST", "/sessions"), ("DELETE", "/sessions/sess_abc")],
)
@responses.activate
def test_session_create_sends_agent_and_meta(self) -> None:
captured: dict[str, Any] = {}
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["body"] = json.loads(request.body)
captured["auth"] = request.headers.get("Authorization")
return (200, {}, json.dumps(_create_payload(agent="claude")))
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
with _forge() as f, f.session(agent="claude", meta={"caller": "test"}) as s:
self.assertEqual(s.session_id, "sess_abc")
self.assertEqual(captured["auth"], f"Bearer {TOKEN}")
self.assertEqual(captured["body"], {"agent": "claude", "meta": {"caller": "test"}})
class TestSessionTurn(unittest.TestCase):
@responses.activate
def test_session_turn_round_trip(self) -> None:
"""Mock POST /sessions and POST /sessions/{id}/turn — assert TurnResult shape."""
captured: dict[str, Any] = {}
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps(_create_payload()))
def turn_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["body"] = json.loads(request.body)
return (
200,
{},
json.dumps(
_turn_payload(
turn_index=2,
events=[
{"type": "thinking", "content": "..."},
{"type": "tool_call", "name": "Read", "args": {}},
{"type": "text", "content": "hello"},
],
duration_ms=4321,
)
),
)
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=turn_cb
)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
with _forge() as f, f.session(agent="claude") as s:
r = s.turn("hello", files=["ff_xyz"], timeout_secs=42)
self.assertIsInstance(r, TurnResult)
self.assertTrue(r.ok)
self.assertEqual(r.session_id, "sess_abc")
self.assertEqual(r.turn_index, 2)
self.assertEqual(r.stop_reason, "end_turn")
self.assertEqual(r.duration_ms, 4321)
self.assertEqual(len(r.events), 3)
self.assertEqual(captured["body"]["prompt"], "hello")
self.assertEqual(captured["body"]["files"], ["ff_xyz"])
self.assertEqual(captured["body"]["timeout_secs"], 42)
@responses.activate
def test_session_turn_empty_prompt_rejected_locally(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/sessions",
json=_create_payload(),
status=200,
)
responses.add(
responses.DELETE,
f"{BASE_URL}/sessions/sess_abc",
json={"ok": True},
status=200,
)
with _forge() as f, f.session() as s, self.assertRaises(ValueError):
s.turn("")
class TestSessionCloseIdempotent(unittest.TestCase):
@responses.activate
def test_session_close_idempotent(self) -> None:
"""Calling close() twice — second is a no-op (no second HTTP request)."""
captured: dict[str, Any] = {"close_count": 0}
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps(_create_payload()))
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["close_count"] += 1
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
with _forge() as f:
s = f.create_session(agent="claude")
s.close()
self.assertTrue(s.closed)
s.close() # second call must NOT issue another DELETE
s.close() # ditto
self.assertEqual(captured["close_count"], 1)
class TestSessionCloseOnException(unittest.TestCase):
@responses.activate
def test_session_close_on_exception(self) -> None:
"""Exception raised inside `with` block — session still gets closed."""
captured: dict[str, Any] = {"closed": False}
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps(_create_payload()))
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["closed"] = True
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
class Boom(Exception):
pass
with _forge() as f:
with self.assertRaises(Boom):
with f.session() as s:
self.assertEqual(s.session_id, "sess_abc")
raise Boom("kaboom inside the with")
self.assertTrue(captured["closed"])
class TestListSessions(unittest.TestCase):
@responses.activate
def test_list_sessions(self) -> None:
rows = [
{
"session_id": "sess_a",
"app_name": "cauldron",
"agent": "claude",
"cwd": "/tmp/x",
"created_at": 100,
"last_turn_at": 200,
"turn_count": 3,
"closed_at": None,
"meta": None,
},
{
"session_id": "sess_b",
"app_name": "cauldron",
"agent": "claude",
"cwd": "/tmp/y",
"created_at": 50,
"last_turn_at": None,
"turn_count": 0,
"closed_at": 75,
"meta": {"x": 1},
},
]
responses.add(
responses.GET,
f"{BASE_URL}/sessions",
json={"ok": True, "sessions": rows, "count": len(rows)},
status=200,
)
with _forge() as f:
out = f.list_sessions()
self.assertEqual(len(out), 2)
self.assertEqual(out[0]["session_id"], "sess_a")
self.assertIsNone(out[0]["closed_at"])
self.assertEqual(out[1]["closed_at"], 75)
@responses.activate
def test_list_sessions_include_closed_false(self) -> None:
captured: dict[str, Any] = {}
def cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
captured["url"] = request.url
return (200, {}, json.dumps({"ok": True, "sessions": [], "count": 0}))
responses.add_callback(responses.GET, f"{BASE_URL}/sessions", callback=cb)
with _forge() as f:
f.list_sessions(include_closed=False)
self.assertIn("include_closed=false", captured["url"])
class TestGetSessionState(unittest.TestCase):
@responses.activate
def test_get_session_state(self) -> None:
responses.add(
responses.GET,
f"{BASE_URL}/sessions/sess_abc",
json={
"ok": True,
"session_id": "sess_abc",
"agent": "claude",
"cwd": "/tmp/x",
"created_at": 100,
"last_turn_at": 200,
"turn_count": 5,
"closed_at": None,
"live": True,
"meta": {"caller": "test"},
},
status=200,
)
with _forge() as f:
state = f.get_session("sess_abc")
self.assertEqual(state["session_id"], "sess_abc")
self.assertEqual(state["turn_count"], 5)
self.assertIsNone(state["closed_at"])
self.assertTrue(state["live"])
def test_get_session_empty_id_rejected(self) -> None:
with _forge() as f, self.assertRaises(ValueError):
f.get_session("")
@responses.activate
def test_session_state_method_round_trips(self) -> None:
"""Session.state() calls through to forge.get_session(self.session_id)."""
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps(_create_payload()))
def state_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (
200,
{},
json.dumps(
{
"ok": True,
"session_id": "sess_abc",
"agent": "claude",
"cwd": "/tmp/x",
"created_at": 100,
"last_turn_at": None,
"turn_count": 0,
"closed_at": None,
"live": True,
"meta": None,
}
),
)
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.GET, f"{BASE_URL}/sessions/sess_abc", callback=state_cb
)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
with _forge() as f, f.session() as s:
st = s.state()
self.assertEqual(st["session_id"], "sess_abc")
class TestSession404IsApiError(unittest.TestCase):
@responses.activate
def test_session_404_is_api_error(self) -> None:
"""Cross-token session access — server returns 404, SDK surfaces ForgeAPIError(404)."""
responses.add(
responses.GET,
f"{BASE_URL}/sessions/sess_other_token",
json={"detail": "session not found"},
status=404,
)
with _forge() as f, self.assertRaises(ForgeAPIError) as ctx:
f.get_session("sess_other_token")
self.assertEqual(ctx.exception.status_code, 404)
@responses.activate
def test_session_turn_404_is_api_error(self) -> None:
"""Same on /sessions/{id}/turn — 404 surfaces as ForgeAPIError(404)."""
def create_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps(_create_payload()))
def turn_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (404, {}, json.dumps({"detail": "session not found"}))
def delete_cb(request: requests.PreparedRequest) -> tuple[int, dict, str]:
return (200, {}, json.dumps({"ok": True}))
responses.add_callback(responses.POST, f"{BASE_URL}/sessions", callback=create_cb)
responses.add_callback(
responses.POST, f"{BASE_URL}/sessions/sess_abc/turn", callback=turn_cb
)
responses.add_callback(
responses.DELETE, f"{BASE_URL}/sessions/sess_abc", callback=delete_cb
)
with _forge() as f, f.session() as s, self.assertRaises(ForgeAPIError) as ctx:
s.turn("hello")
self.assertEqual(ctx.exception.status_code, 404)
class TestTurnResultText(unittest.TestCase):
def test_turn_result_text_concatenates_text_events(self) -> None:
"""`.text()` concatenates `type=='text'` events' content; ignores others."""
r = TurnResult.from_response(
{
"ok": True,
"session_id": "sess_x",
"turn_index": 1,
"events": [
{"type": "thinking", "content": "should not appear"},
{"type": "text", "content": "hello "},
{"type": "tool_call", "name": "Read", "args": {}, "result": {}},
{"type": "text", "content": "world"},
# malformed event must not blow up text()
{"type": "text"}, # no content
"not a dict", # type: ignore[list-item]
],
"stop_reason": "end_turn",
"duration_ms": 1,
}
)
self.assertEqual(r.text(), "hello world")
def test_turn_result_text_empty_when_no_text_events(self) -> None:
r = TurnResult.from_response(
{
"ok": True,
"session_id": "sess_x",
"turn_index": 1,
"events": [{"type": "tool_call", "name": "X"}],
"stop_reason": "end_turn",
"duration_ms": 1,
}
)
self.assertEqual(r.text(), "")
class TestV01RunUnchanged(unittest.TestCase):
"""Regression: existing /run path still works unchanged after v0.2 layer."""
@responses.activate
def test_v01_run_unchanged(self) -> None:
responses.add(
responses.POST,
f"{BASE_URL}/run",
json={
"ok": True,
"result": {"hello": "world"},
"duration_ms": 100,
"stop_reason": "end_turn",
},
status=200,
)
with _forge() as f:
r = f.run(prompt='Reply with JSON: {"hello": "world"}')
self.assertTrue(r.ok)
self.assertEqual(r.result, {"hello": "world"})
self.assertEqual(r.duration_ms, 100)
if __name__ == "__main__":
unittest.main()